• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/ext/filters/logging/logging_filter.h"
20 
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/slice.h>
23 #include <grpc/status.h>
24 #include <grpc/support/port_platform.h>
25 #include <inttypes.h>
26 
27 #include <algorithm>
28 #include <cstddef>
29 #include <functional>
30 #include <map>
31 #include <memory>
32 #include <string>
33 #include <utility>
34 #include <vector>
35 
36 #include "absl/log/log.h"
37 #include "absl/numeric/int128.h"
38 #include "absl/random/random.h"
39 #include "absl/random/uniform_int_distribution.h"
40 #include "absl/status/statusor.h"
41 #include "absl/strings/numbers.h"
42 #include "absl/strings/str_cat.h"
43 #include "absl/strings/str_split.h"
44 #include "absl/strings/string_view.h"
45 #include "absl/strings/strip.h"
46 #include "absl/types/optional.h"
47 #include "src/core/client_channel/client_channel_filter.h"
48 #include "src/core/config/core_configuration.h"
49 #include "src/core/ext/filters/logging/logging_sink.h"
50 #include "src/core/lib/channel/channel_args.h"
51 #include "src/core/lib/channel/channel_fwd.h"
52 #include "src/core/lib/channel/channel_stack.h"
53 #include "src/core/lib/promise/arena_promise.h"
54 #include "src/core/lib/promise/cancel_callback.h"
55 #include "src/core/lib/promise/context.h"
56 #include "src/core/lib/promise/map.h"
57 #include "src/core/lib/promise/pipe.h"
58 #include "src/core/lib/resource_quota/arena.h"
59 #include "src/core/lib/slice/slice.h"
60 #include "src/core/lib/slice/slice_buffer.h"
61 #include "src/core/lib/surface/channel_stack_type.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/transport.h"
64 #include "src/core/resolver/resolver_registry.h"
65 #include "src/core/telemetry/call_tracer.h"
66 #include "src/core/util/host_port.h"
67 #include "src/core/util/latent_see.h"
68 #include "src/core/util/time.h"
69 #include "src/core/util/uri.h"
70 
71 namespace grpc_core {
72 
73 const NoInterceptor ClientLoggingFilter::Call::OnFinalize;
74 const NoInterceptor ServerLoggingFilter::Call::OnFinalize;
75 
76 namespace {
77 
78 LoggingSink* g_logging_sink = nullptr;
79 
GetCallId()80 absl::uint128 GetCallId() {
81   thread_local absl::InsecureBitGen gen;
82   return absl::uniform_int_distribution<absl::uint128>()(gen);
83 }
84 
85 class MetadataEncoder {
86  public:
MetadataEncoder(LoggingSink::Entry::Payload * payload,std::string * status_details_bin,uint64_t log_len)87   MetadataEncoder(LoggingSink::Entry::Payload* payload,
88                   std::string* status_details_bin, uint64_t log_len)
89       : payload_(payload),
90         status_details_bin_(status_details_bin),
91         log_len_(log_len) {}
92 
Encode(const Slice & key_slice,const Slice & value_slice)93   void Encode(const Slice& key_slice, const Slice& value_slice) {
94     auto key = key_slice.as_string_view();
95     auto value = value_slice.as_string_view();
96     if (status_details_bin_ != nullptr && key == "grpc-status-details-bin") {
97       *status_details_bin_ = std::string(value);
98       return;
99     }
100     if (absl::ConsumePrefix(&key, "grpc-")) {
101       // skip all other grpc- headers
102       return;
103     }
104     uint64_t mdentry_len = key.length() + value.length();
105     if (mdentry_len > log_len_) {
106       VLOG(2) << "Skipped metadata key because of max metadata logging bytes "
107               << mdentry_len << " (current) vs " << log_len_
108               << " (max less already accounted metadata)";
109       truncated_ = true;
110       return;
111     }
112 
113     payload_->metadata.emplace(std::string(key), std::string(value));
114     log_len_ -= mdentry_len;
115   }
116 
117   template <typename Which>
Encode(Which,const typename Which::ValueType &)118   void Encode(Which, const typename Which::ValueType&) {}
119 
Encode(GrpcStatusMetadata,grpc_status_code status)120   void Encode(GrpcStatusMetadata, grpc_status_code status) {
121     payload_->status_code = status;
122   }
123 
Encode(GrpcMessageMetadata,const Slice & status_message)124   void Encode(GrpcMessageMetadata, const Slice& status_message) {
125     payload_->status_message = std::string(status_message.as_string_view());
126   }
127 
truncated() const128   bool truncated() const { return truncated_; }
129 
130  private:
131   LoggingSink::Entry::Payload* const payload_;
132   std::string* const status_details_bin_;
133   uint64_t log_len_;
134   bool truncated_ = false;
135 };
136 
SetIpPort(absl::string_view s,LoggingSink::Entry::Address * peer)137 void SetIpPort(absl::string_view s, LoggingSink::Entry::Address* peer) {
138   absl::string_view host;
139   absl::string_view port;
140   if (SplitHostPort(absl::string_view(s.data(), s.length()), &host, &port) ==
141       1) {
142     if (!host.empty()) {
143       peer->address = std::string(host);
144     }
145     if (!port.empty()) {
146       if (!absl::SimpleAtoi(absl::string_view(port.data(), port.size()),
147                             &peer->ip_port)) {
148         peer->ip_port = 0;
149       }
150     }
151   }
152 }
153 
PeerStringToAddress(const Slice & peer_string)154 LoggingSink::Entry::Address PeerStringToAddress(const Slice& peer_string) {
155   LoggingSink::Entry::Address address;
156   absl::StatusOr<URI> uri = URI::Parse(peer_string.as_string_view());
157   if (!uri.ok()) {
158     VLOG(2) << "peer_string is in invalid format and cannot be logged";
159     return address;
160   }
161 
162   if (uri->scheme() == "ipv4") {
163     address.type = LoggingSink::Entry::Address::Type::kIpv4;
164     SetIpPort(uri->path(), &address);
165   } else if (uri->scheme() == "ipv6") {
166     address.type = LoggingSink::Entry::Address::Type::kIpv6;
167     // TODO(zpencer): per grfc, verify RFC5952 section 4 styled addrs in use
168     SetIpPort(uri->path(), &address);
169   } else if (uri->scheme() == "unix") {
170     address.type = LoggingSink::Entry::Address::Type::kUnix;
171     address.address = uri->path();
172   }
173   return address;
174 }
175 
EncodeMessageToPayload(const SliceBuffer * message,uint32_t log_len,LoggingSink::Entry * entry)176 void EncodeMessageToPayload(const SliceBuffer* message, uint32_t log_len,
177                             LoggingSink::Entry* entry) {
178   auto* sb = message->c_slice_buffer();
179   entry->payload.message_length = sb->length;
180   // Log the message to a max of the configured message length
181   for (size_t i = 0; i < sb->count; i++) {
182     absl::StrAppend(
183         &entry->payload.message,
184         absl::string_view(
185             reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(sb->slices[i])),
186             std::min(static_cast<size_t>(GRPC_SLICE_LENGTH(sb->slices[i])),
187                      static_cast<size_t>(log_len))));
188     if (log_len < GRPC_SLICE_LENGTH(sb->slices[i])) {
189       entry->payload_truncated = true;
190       break;
191     }
192     log_len -= GRPC_SLICE_LENGTH(sb->slices[i]);
193   }
194 }
195 
196 }  // namespace
197 
198 namespace logging_filter_detail {
199 
CallData(bool is_client,const ClientMetadata & client_initial_metadata,const std::string & authority)200 CallData::CallData(bool is_client,
201                    const ClientMetadata& client_initial_metadata,
202                    const std::string& authority)
203     : call_id_(GetCallId()) {
204   absl::string_view path;
205   if (auto* value = client_initial_metadata.get_pointer(HttpPathMetadata())) {
206     path = value->as_string_view();
207   }
208   std::vector<std::string> parts = absl::StrSplit(path, '/', absl::SkipEmpty());
209   if (parts.size() == 2) {
210     service_name_ = std::move(parts[0]);
211     method_name_ = std::move(parts[1]);
212   }
213   config_ = g_logging_sink->FindMatch(is_client, service_name_, method_name_);
214   if (config_.ShouldLog()) {
215     if (auto* value =
216             client_initial_metadata.get_pointer(HttpAuthorityMetadata())) {
217       authority_ = std::string(value->as_string_view());
218     } else {
219       authority_ = authority;
220     }
221   }
222 }
223 
LogClientHeader(bool is_client,CallTracerAnnotationInterface * tracer,const ClientMetadata & metadata)224 void CallData::LogClientHeader(bool is_client,
225                                CallTracerAnnotationInterface* tracer,
226                                const ClientMetadata& metadata) {
227   LoggingSink::Entry entry;
228   if (!is_client) {
229     if (auto* value = metadata.get_pointer(PeerString())) {
230       peer_ = PeerStringToAddress(*value);
231     }
232   }
233   SetCommonEntryFields(&entry, is_client, tracer,
234                        LoggingSink::Entry::EventType::kClientHeader);
235   MetadataEncoder encoder(&entry.payload, nullptr,
236                           config_.max_metadata_bytes());
237   metadata.Encode(&encoder);
238   entry.payload_truncated = encoder.truncated();
239   g_logging_sink->LogEntry(std::move(entry));
240 }
241 
LogClientHalfClose(bool is_client,CallTracerAnnotationInterface * tracer)242 void CallData::LogClientHalfClose(bool is_client,
243                                   CallTracerAnnotationInterface* tracer) {
244   LoggingSink::Entry entry;
245   SetCommonEntryFields(&entry, is_client, tracer,
246                        LoggingSink::Entry::EventType::kClientHalfClose);
247   g_logging_sink->LogEntry(std::move(entry));
248 }
249 
LogServerHeader(bool is_client,CallTracerAnnotationInterface * tracer,const ServerMetadata * metadata)250 void CallData::LogServerHeader(bool is_client,
251                                CallTracerAnnotationInterface* tracer,
252                                const ServerMetadata* metadata) {
253   LoggingSink::Entry entry;
254   if (metadata != nullptr) {
255     entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
256     if (is_client) {
257       if (auto* value = metadata->get_pointer(PeerString())) {
258         peer_ = PeerStringToAddress(*value);
259       }
260     }
261   }
262   SetCommonEntryFields(&entry, is_client, tracer,
263                        LoggingSink::Entry::EventType::kServerHeader);
264   if (metadata != nullptr) {
265     MetadataEncoder encoder(&entry.payload, nullptr,
266                             config_.max_metadata_bytes());
267     metadata->Encode(&encoder);
268     entry.payload_truncated = encoder.truncated();
269   }
270   g_logging_sink->LogEntry(std::move(entry));
271 }
272 
LogServerTrailer(bool is_client,CallTracerAnnotationInterface * tracer,const ServerMetadata * metadata)273 void CallData::LogServerTrailer(bool is_client,
274                                 CallTracerAnnotationInterface* tracer,
275                                 const ServerMetadata* metadata) {
276   LoggingSink::Entry entry;
277   SetCommonEntryFields(&entry, is_client, tracer,
278                        LoggingSink::Entry::EventType::kServerTrailer);
279   if (metadata != nullptr) {
280     entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
281     MetadataEncoder encoder(&entry.payload, &entry.payload.status_details,
282                             config_.max_metadata_bytes());
283     metadata->Encode(&encoder);
284     entry.payload_truncated = encoder.truncated();
285   }
286   g_logging_sink->LogEntry(std::move(entry));
287 }
288 
LogClientMessage(bool is_client,CallTracerAnnotationInterface * tracer,const SliceBuffer * message)289 void CallData::LogClientMessage(bool is_client,
290                                 CallTracerAnnotationInterface* tracer,
291                                 const SliceBuffer* message) {
292   LoggingSink::Entry entry;
293   SetCommonEntryFields(&entry, is_client, tracer,
294                        LoggingSink::Entry::EventType::kClientMessage);
295   EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
296   g_logging_sink->LogEntry(std::move(entry));
297 }
298 
LogServerMessage(bool is_client,CallTracerAnnotationInterface * tracer,const SliceBuffer * message)299 void CallData::LogServerMessage(bool is_client,
300                                 CallTracerAnnotationInterface* tracer,
301                                 const SliceBuffer* message) {
302   LoggingSink::Entry entry;
303   SetCommonEntryFields(&entry, is_client, tracer,
304                        LoggingSink::Entry::EventType::kServerMessage);
305   EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
306   g_logging_sink->LogEntry(std::move(entry));
307 }
308 
LogCancel(bool is_client,CallTracerAnnotationInterface * tracer)309 void CallData::LogCancel(bool is_client,
310                          CallTracerAnnotationInterface* tracer) {
311   LoggingSink::Entry entry;
312   SetCommonEntryFields(&entry, is_client, tracer,
313                        LoggingSink::Entry::EventType::kCancel);
314   g_logging_sink->LogEntry(std::move(entry));
315 }
316 
SetCommonEntryFields(LoggingSink::Entry * entry,bool is_client,CallTracerAnnotationInterface * tracer,LoggingSink::Entry::EventType event_type)317 void CallData::SetCommonEntryFields(LoggingSink::Entry* entry, bool is_client,
318                                     CallTracerAnnotationInterface* tracer,
319                                     LoggingSink::Entry::EventType event_type) {
320   entry->call_id = call_id_;
321   entry->sequence_id = sequence_id_++;
322   entry->type = event_type;
323   entry->logger = is_client ? LoggingSink::Entry::Logger::kClient
324                             : LoggingSink::Entry::Logger::kServer;
325   entry->authority = authority_;
326   entry->peer = peer_;
327   entry->service_name = service_name_;
328   entry->method_name = method_name_;
329   entry->timestamp = Timestamp::Now();
330   if (tracer != nullptr) {
331     entry->trace_id = tracer->TraceId();
332     entry->span_id = tracer->SpanId();
333     entry->is_sampled = tracer->IsSampled();
334   }
335 }
336 
337 }  // namespace logging_filter_detail
338 
339 absl::StatusOr<std::unique_ptr<ClientLoggingFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)340 ClientLoggingFilter::Create(const ChannelArgs& args,
341                             ChannelFilter::Args /*filter_args*/) {
342   absl::optional<absl::string_view> default_authority =
343       args.GetString(GRPC_ARG_DEFAULT_AUTHORITY);
344   if (default_authority.has_value()) {
345     return std::make_unique<ClientLoggingFilter>(
346         std::string(default_authority.value()));
347   }
348   absl::optional<std::string> server_uri =
349       args.GetOwnedString(GRPC_ARG_SERVER_URI);
350   if (server_uri.has_value()) {
351     return std::make_unique<ClientLoggingFilter>(
352         CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
353             *server_uri));
354   }
355   return std::make_unique<ClientLoggingFilter>("");
356 }
357 
OnClientInitialMetadata(ClientMetadata & md,ClientLoggingFilter * filter)358 void ClientLoggingFilter::Call::OnClientInitialMetadata(
359     ClientMetadata& md, ClientLoggingFilter* filter) {
360   GRPC_LATENT_SEE_INNER_SCOPE(
361       "ClientLoggingFilter::Call::OnClientInitialMetadata");
362   call_data_.emplace(true, md, filter->default_authority_);
363   if (!call_data_->ShouldLog()) {
364     call_data_.reset();
365     return;
366   }
367   call_data_->LogClientHeader(
368       /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(), md);
369 }
370 
OnServerInitialMetadata(ServerMetadata & md)371 void ClientLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
372   GRPC_LATENT_SEE_INNER_SCOPE(
373       "ClientLoggingFilter::Call::OnServerInitialMetadata");
374   if (!call_data_.has_value()) return;
375   call_data_->LogServerHeader(
376       /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
377       &md);
378 }
379 
OnServerTrailingMetadata(ServerMetadata & md)380 void ClientLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
381   GRPC_LATENT_SEE_INNER_SCOPE(
382       "ClientLoggingFilter::Call::OnServerTrailingMetadata");
383   if (!call_data_.has_value()) return;
384   if (md.get(GrpcCallWasCancelled()).value_or(false) &&
385       md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
386     call_data_->LogCancel(
387         /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>());
388     return;
389   }
390   call_data_->LogServerTrailer(
391       /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
392       &md);
393 }
394 
OnClientToServerMessage(const Message & message)395 void ClientLoggingFilter::Call::OnClientToServerMessage(
396     const Message& message) {
397   GRPC_LATENT_SEE_INNER_SCOPE(
398       "ClientLoggingFilter::Call::OnClientToServerMessage");
399   if (!call_data_.has_value()) return;
400   call_data_->LogClientMessage(
401       /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
402       message.payload());
403 }
404 
OnClientToServerHalfClose()405 void ClientLoggingFilter::Call::OnClientToServerHalfClose() {
406   GRPC_LATENT_SEE_INNER_SCOPE(
407       "ClientLoggingFilter::Call::OnClientToServerHalfClose");
408   if (!call_data_.has_value()) return;
409   call_data_->LogClientHalfClose(
410       /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>());
411 }
412 
OnServerToClientMessage(const Message & message)413 void ClientLoggingFilter::Call::OnServerToClientMessage(
414     const Message& message) {
415   GRPC_LATENT_SEE_INNER_SCOPE(
416       "ClientLoggingFilter::Call::OnServerToClientMessage");
417   if (!call_data_.has_value()) return;
418   call_data_->LogServerMessage(
419       /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
420       message.payload());
421 }
422 
423 const grpc_channel_filter ClientLoggingFilter::kFilter =
424     MakePromiseBasedFilter<ClientLoggingFilter, FilterEndpoint::kClient,
425                            kFilterExaminesServerInitialMetadata |
426                                kFilterExaminesInboundMessages |
427                                kFilterExaminesOutboundMessages>();
428 
429 absl::StatusOr<std::unique_ptr<ServerLoggingFilter>>
Create(const ChannelArgs &,ChannelFilter::Args)430 ServerLoggingFilter::Create(const ChannelArgs& /*args*/,
431                             ChannelFilter::Args /*filter_args*/) {
432   return std::make_unique<ServerLoggingFilter>();
433 }
434 
435 // Construct a promise for one call.
OnClientInitialMetadata(ClientMetadata & md)436 void ServerLoggingFilter::Call::OnClientInitialMetadata(ClientMetadata& md) {
437   GRPC_LATENT_SEE_INNER_SCOPE(
438       "ServerLoggingFilter::Call::OnClientInitialMetadata");
439   call_data_.emplace(false, md, "");
440   if (!call_data_->ShouldLog()) {
441     call_data_.reset();
442     return;
443   }
444   call_data_->LogClientHeader(
445       /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
446       md);
447 }
448 
OnServerInitialMetadata(ServerMetadata & md)449 void ServerLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
450   GRPC_LATENT_SEE_INNER_SCOPE(
451       "ServerLoggingFilter::Call::OnServerInitialMetadata");
452   if (!call_data_.has_value()) return;
453   call_data_->LogServerHeader(
454       /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
455       &md);
456 }
457 
OnServerTrailingMetadata(ServerMetadata & md)458 void ServerLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
459   GRPC_LATENT_SEE_INNER_SCOPE(
460       "ServerLoggingFilter::Call::OnServerTrailingMetadata");
461   if (!call_data_.has_value()) return;
462   if (md.get(GrpcCallWasCancelled()).value_or(false) &&
463       md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
464     call_data_->LogCancel(
465         /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>());
466     return;
467   }
468   call_data_->LogServerTrailer(
469       /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
470       &md);
471 }
472 
OnClientToServerMessage(const Message & message)473 void ServerLoggingFilter::Call::OnClientToServerMessage(
474     const Message& message) {
475   GRPC_LATENT_SEE_INNER_SCOPE(
476       "ServerLoggingFilter::Call::OnClientToServerMessage");
477   if (!call_data_.has_value()) return;
478   call_data_->LogClientMessage(
479       /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
480       message.payload());
481 }
482 
OnClientToServerHalfClose()483 void ServerLoggingFilter::Call::OnClientToServerHalfClose() {
484   GRPC_LATENT_SEE_INNER_SCOPE(
485       "ServerLoggingFilter::Call::OnClientToServerHalfClose");
486   if (!call_data_.has_value()) return;
487   call_data_->LogClientHalfClose(
488       /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>());
489 }
490 
OnServerToClientMessage(const Message & message)491 void ServerLoggingFilter::Call::OnServerToClientMessage(
492     const Message& message) {
493   GRPC_LATENT_SEE_INNER_SCOPE(
494       "ServerLoggingFilter::Call::OnServerToClientMessage");
495   if (!call_data_.has_value()) return;
496   call_data_->LogServerMessage(
497       /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
498       message.payload());
499 }
500 
501 const grpc_channel_filter ServerLoggingFilter::kFilter =
502     MakePromiseBasedFilter<ServerLoggingFilter, FilterEndpoint::kServer,
503                            kFilterExaminesServerInitialMetadata |
504                                kFilterExaminesInboundMessages |
505                                kFilterExaminesOutboundMessages>();
506 
RegisterLoggingFilter(LoggingSink * sink)507 void RegisterLoggingFilter(LoggingSink* sink) {
508   g_logging_sink = sink;
509   CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
510     builder->channel_init()
511         ->RegisterV2Filter<ServerLoggingFilter>(GRPC_SERVER_CHANNEL)
512         // TODO(yashykt) : Figure out a good place to place this channel arg
513         .IfChannelArg("grpc.experimental.enable_observability", true);
514     builder->channel_init()
515         ->RegisterV2Filter<ClientLoggingFilter>(GRPC_CLIENT_CHANNEL)
516         // TODO(yashykt) : Figure out a good place to place this channel arg
517         .IfChannelArg("grpc.experimental.enable_observability", true);
518   });
519 }
520 
521 }  // namespace grpc_core
522