1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/ext/filters/logging/logging_filter.h"
20
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/slice.h>
23 #include <grpc/status.h>
24 #include <grpc/support/port_platform.h>
25 #include <inttypes.h>
26
27 #include <algorithm>
28 #include <cstddef>
29 #include <functional>
30 #include <map>
31 #include <memory>
32 #include <string>
33 #include <utility>
34 #include <vector>
35
36 #include "absl/log/log.h"
37 #include "absl/numeric/int128.h"
38 #include "absl/random/random.h"
39 #include "absl/random/uniform_int_distribution.h"
40 #include "absl/status/statusor.h"
41 #include "absl/strings/numbers.h"
42 #include "absl/strings/str_cat.h"
43 #include "absl/strings/str_split.h"
44 #include "absl/strings/string_view.h"
45 #include "absl/strings/strip.h"
46 #include "absl/types/optional.h"
47 #include "src/core/client_channel/client_channel_filter.h"
48 #include "src/core/config/core_configuration.h"
49 #include "src/core/ext/filters/logging/logging_sink.h"
50 #include "src/core/lib/channel/channel_args.h"
51 #include "src/core/lib/channel/channel_fwd.h"
52 #include "src/core/lib/channel/channel_stack.h"
53 #include "src/core/lib/promise/arena_promise.h"
54 #include "src/core/lib/promise/cancel_callback.h"
55 #include "src/core/lib/promise/context.h"
56 #include "src/core/lib/promise/map.h"
57 #include "src/core/lib/promise/pipe.h"
58 #include "src/core/lib/resource_quota/arena.h"
59 #include "src/core/lib/slice/slice.h"
60 #include "src/core/lib/slice/slice_buffer.h"
61 #include "src/core/lib/surface/channel_stack_type.h"
62 #include "src/core/lib/transport/metadata_batch.h"
63 #include "src/core/lib/transport/transport.h"
64 #include "src/core/resolver/resolver_registry.h"
65 #include "src/core/telemetry/call_tracer.h"
66 #include "src/core/util/host_port.h"
67 #include "src/core/util/latent_see.h"
68 #include "src/core/util/time.h"
69 #include "src/core/util/uri.h"
70
71 namespace grpc_core {
72
// Namespace-scope definitions of the OnFinalize members of the client and
// server logging filter call classes; both are of type NoInterceptor.
73 const NoInterceptor ClientLoggingFilter::Call::OnFinalize;
74 const NoInterceptor ServerLoggingFilter::Call::OnFinalize;
75
76 namespace {
77
// Sink used by this file for config lookup (FindMatch) and for emitting log
// entries (LogEntry). Starts out null and is assigned by
// RegisterLoggingFilter().
78 LoggingSink* g_logging_sink = nullptr;
79
GetCallId()80 absl::uint128 GetCallId() {
81 thread_local absl::InsecureBitGen gen;
82 return absl::uniform_int_distribution<absl::uint128>()(gen);
83 }
84
85 class MetadataEncoder {
86 public:
MetadataEncoder(LoggingSink::Entry::Payload * payload,std::string * status_details_bin,uint64_t log_len)87 MetadataEncoder(LoggingSink::Entry::Payload* payload,
88 std::string* status_details_bin, uint64_t log_len)
89 : payload_(payload),
90 status_details_bin_(status_details_bin),
91 log_len_(log_len) {}
92
Encode(const Slice & key_slice,const Slice & value_slice)93 void Encode(const Slice& key_slice, const Slice& value_slice) {
94 auto key = key_slice.as_string_view();
95 auto value = value_slice.as_string_view();
96 if (status_details_bin_ != nullptr && key == "grpc-status-details-bin") {
97 *status_details_bin_ = std::string(value);
98 return;
99 }
100 if (absl::ConsumePrefix(&key, "grpc-")) {
101 // skip all other grpc- headers
102 return;
103 }
104 uint64_t mdentry_len = key.length() + value.length();
105 if (mdentry_len > log_len_) {
106 VLOG(2) << "Skipped metadata key because of max metadata logging bytes "
107 << mdentry_len << " (current) vs " << log_len_
108 << " (max less already accounted metadata)";
109 truncated_ = true;
110 return;
111 }
112
113 payload_->metadata.emplace(std::string(key), std::string(value));
114 log_len_ -= mdentry_len;
115 }
116
117 template <typename Which>
Encode(Which,const typename Which::ValueType &)118 void Encode(Which, const typename Which::ValueType&) {}
119
Encode(GrpcStatusMetadata,grpc_status_code status)120 void Encode(GrpcStatusMetadata, grpc_status_code status) {
121 payload_->status_code = status;
122 }
123
Encode(GrpcMessageMetadata,const Slice & status_message)124 void Encode(GrpcMessageMetadata, const Slice& status_message) {
125 payload_->status_message = std::string(status_message.as_string_view());
126 }
127
truncated() const128 bool truncated() const { return truncated_; }
129
130 private:
131 LoggingSink::Entry::Payload* const payload_;
132 std::string* const status_details_bin_;
133 uint64_t log_len_;
134 bool truncated_ = false;
135 };
136
SetIpPort(absl::string_view s,LoggingSink::Entry::Address * peer)137 void SetIpPort(absl::string_view s, LoggingSink::Entry::Address* peer) {
138 absl::string_view host;
139 absl::string_view port;
140 if (SplitHostPort(absl::string_view(s.data(), s.length()), &host, &port) ==
141 1) {
142 if (!host.empty()) {
143 peer->address = std::string(host);
144 }
145 if (!port.empty()) {
146 if (!absl::SimpleAtoi(absl::string_view(port.data(), port.size()),
147 &peer->ip_port)) {
148 peer->ip_port = 0;
149 }
150 }
151 }
152 }
153
PeerStringToAddress(const Slice & peer_string)154 LoggingSink::Entry::Address PeerStringToAddress(const Slice& peer_string) {
155 LoggingSink::Entry::Address address;
156 absl::StatusOr<URI> uri = URI::Parse(peer_string.as_string_view());
157 if (!uri.ok()) {
158 VLOG(2) << "peer_string is in invalid format and cannot be logged";
159 return address;
160 }
161
162 if (uri->scheme() == "ipv4") {
163 address.type = LoggingSink::Entry::Address::Type::kIpv4;
164 SetIpPort(uri->path(), &address);
165 } else if (uri->scheme() == "ipv6") {
166 address.type = LoggingSink::Entry::Address::Type::kIpv6;
167 // TODO(zpencer): per grfc, verify RFC5952 section 4 styled addrs in use
168 SetIpPort(uri->path(), &address);
169 } else if (uri->scheme() == "unix") {
170 address.type = LoggingSink::Entry::Address::Type::kUnix;
171 address.address = uri->path();
172 }
173 return address;
174 }
175
EncodeMessageToPayload(const SliceBuffer * message,uint32_t log_len,LoggingSink::Entry * entry)176 void EncodeMessageToPayload(const SliceBuffer* message, uint32_t log_len,
177 LoggingSink::Entry* entry) {
178 auto* sb = message->c_slice_buffer();
179 entry->payload.message_length = sb->length;
180 // Log the message to a max of the configured message length
181 for (size_t i = 0; i < sb->count; i++) {
182 absl::StrAppend(
183 &entry->payload.message,
184 absl::string_view(
185 reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(sb->slices[i])),
186 std::min(static_cast<size_t>(GRPC_SLICE_LENGTH(sb->slices[i])),
187 static_cast<size_t>(log_len))));
188 if (log_len < GRPC_SLICE_LENGTH(sb->slices[i])) {
189 entry->payload_truncated = true;
190 break;
191 }
192 log_len -= GRPC_SLICE_LENGTH(sb->slices[i]);
193 }
194 }
195
196 } // namespace
197
198 namespace logging_filter_detail {
199
CallData(bool is_client,const ClientMetadata & client_initial_metadata,const std::string & authority)200 CallData::CallData(bool is_client,
201 const ClientMetadata& client_initial_metadata,
202 const std::string& authority)
203 : call_id_(GetCallId()) {
204 absl::string_view path;
205 if (auto* value = client_initial_metadata.get_pointer(HttpPathMetadata())) {
206 path = value->as_string_view();
207 }
208 std::vector<std::string> parts = absl::StrSplit(path, '/', absl::SkipEmpty());
209 if (parts.size() == 2) {
210 service_name_ = std::move(parts[0]);
211 method_name_ = std::move(parts[1]);
212 }
213 config_ = g_logging_sink->FindMatch(is_client, service_name_, method_name_);
214 if (config_.ShouldLog()) {
215 if (auto* value =
216 client_initial_metadata.get_pointer(HttpAuthorityMetadata())) {
217 authority_ = std::string(value->as_string_view());
218 } else {
219 authority_ = authority;
220 }
221 }
222 }
223
LogClientHeader(bool is_client,CallTracerAnnotationInterface * tracer,const ClientMetadata & metadata)224 void CallData::LogClientHeader(bool is_client,
225 CallTracerAnnotationInterface* tracer,
226 const ClientMetadata& metadata) {
227 LoggingSink::Entry entry;
228 if (!is_client) {
229 if (auto* value = metadata.get_pointer(PeerString())) {
230 peer_ = PeerStringToAddress(*value);
231 }
232 }
233 SetCommonEntryFields(&entry, is_client, tracer,
234 LoggingSink::Entry::EventType::kClientHeader);
235 MetadataEncoder encoder(&entry.payload, nullptr,
236 config_.max_metadata_bytes());
237 metadata.Encode(&encoder);
238 entry.payload_truncated = encoder.truncated();
239 g_logging_sink->LogEntry(std::move(entry));
240 }
241
LogClientHalfClose(bool is_client,CallTracerAnnotationInterface * tracer)242 void CallData::LogClientHalfClose(bool is_client,
243 CallTracerAnnotationInterface* tracer) {
244 LoggingSink::Entry entry;
245 SetCommonEntryFields(&entry, is_client, tracer,
246 LoggingSink::Entry::EventType::kClientHalfClose);
247 g_logging_sink->LogEntry(std::move(entry));
248 }
249
LogServerHeader(bool is_client,CallTracerAnnotationInterface * tracer,const ServerMetadata * metadata)250 void CallData::LogServerHeader(bool is_client,
251 CallTracerAnnotationInterface* tracer,
252 const ServerMetadata* metadata) {
253 LoggingSink::Entry entry;
254 if (metadata != nullptr) {
255 entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
256 if (is_client) {
257 if (auto* value = metadata->get_pointer(PeerString())) {
258 peer_ = PeerStringToAddress(*value);
259 }
260 }
261 }
262 SetCommonEntryFields(&entry, is_client, tracer,
263 LoggingSink::Entry::EventType::kServerHeader);
264 if (metadata != nullptr) {
265 MetadataEncoder encoder(&entry.payload, nullptr,
266 config_.max_metadata_bytes());
267 metadata->Encode(&encoder);
268 entry.payload_truncated = encoder.truncated();
269 }
270 g_logging_sink->LogEntry(std::move(entry));
271 }
272
LogServerTrailer(bool is_client,CallTracerAnnotationInterface * tracer,const ServerMetadata * metadata)273 void CallData::LogServerTrailer(bool is_client,
274 CallTracerAnnotationInterface* tracer,
275 const ServerMetadata* metadata) {
276 LoggingSink::Entry entry;
277 SetCommonEntryFields(&entry, is_client, tracer,
278 LoggingSink::Entry::EventType::kServerTrailer);
279 if (metadata != nullptr) {
280 entry.is_trailer_only = metadata->get(GrpcTrailersOnly()).value_or(false);
281 MetadataEncoder encoder(&entry.payload, &entry.payload.status_details,
282 config_.max_metadata_bytes());
283 metadata->Encode(&encoder);
284 entry.payload_truncated = encoder.truncated();
285 }
286 g_logging_sink->LogEntry(std::move(entry));
287 }
288
LogClientMessage(bool is_client,CallTracerAnnotationInterface * tracer,const SliceBuffer * message)289 void CallData::LogClientMessage(bool is_client,
290 CallTracerAnnotationInterface* tracer,
291 const SliceBuffer* message) {
292 LoggingSink::Entry entry;
293 SetCommonEntryFields(&entry, is_client, tracer,
294 LoggingSink::Entry::EventType::kClientMessage);
295 EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
296 g_logging_sink->LogEntry(std::move(entry));
297 }
298
LogServerMessage(bool is_client,CallTracerAnnotationInterface * tracer,const SliceBuffer * message)299 void CallData::LogServerMessage(bool is_client,
300 CallTracerAnnotationInterface* tracer,
301 const SliceBuffer* message) {
302 LoggingSink::Entry entry;
303 SetCommonEntryFields(&entry, is_client, tracer,
304 LoggingSink::Entry::EventType::kServerMessage);
305 EncodeMessageToPayload(message, config_.max_message_bytes(), &entry);
306 g_logging_sink->LogEntry(std::move(entry));
307 }
308
LogCancel(bool is_client,CallTracerAnnotationInterface * tracer)309 void CallData::LogCancel(bool is_client,
310 CallTracerAnnotationInterface* tracer) {
311 LoggingSink::Entry entry;
312 SetCommonEntryFields(&entry, is_client, tracer,
313 LoggingSink::Entry::EventType::kCancel);
314 g_logging_sink->LogEntry(std::move(entry));
315 }
316
SetCommonEntryFields(LoggingSink::Entry * entry,bool is_client,CallTracerAnnotationInterface * tracer,LoggingSink::Entry::EventType event_type)317 void CallData::SetCommonEntryFields(LoggingSink::Entry* entry, bool is_client,
318 CallTracerAnnotationInterface* tracer,
319 LoggingSink::Entry::EventType event_type) {
320 entry->call_id = call_id_;
321 entry->sequence_id = sequence_id_++;
322 entry->type = event_type;
323 entry->logger = is_client ? LoggingSink::Entry::Logger::kClient
324 : LoggingSink::Entry::Logger::kServer;
325 entry->authority = authority_;
326 entry->peer = peer_;
327 entry->service_name = service_name_;
328 entry->method_name = method_name_;
329 entry->timestamp = Timestamp::Now();
330 if (tracer != nullptr) {
331 entry->trace_id = tracer->TraceId();
332 entry->span_id = tracer->SpanId();
333 entry->is_sampled = tracer->IsSampled();
334 }
335 }
336
337 } // namespace logging_filter_detail
338
339 absl::StatusOr<std::unique_ptr<ClientLoggingFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)340 ClientLoggingFilter::Create(const ChannelArgs& args,
341 ChannelFilter::Args /*filter_args*/) {
342 absl::optional<absl::string_view> default_authority =
343 args.GetString(GRPC_ARG_DEFAULT_AUTHORITY);
344 if (default_authority.has_value()) {
345 return std::make_unique<ClientLoggingFilter>(
346 std::string(default_authority.value()));
347 }
348 absl::optional<std::string> server_uri =
349 args.GetOwnedString(GRPC_ARG_SERVER_URI);
350 if (server_uri.has_value()) {
351 return std::make_unique<ClientLoggingFilter>(
352 CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
353 *server_uri));
354 }
355 return std::make_unique<ClientLoggingFilter>("");
356 }
357
OnClientInitialMetadata(ClientMetadata & md,ClientLoggingFilter * filter)358 void ClientLoggingFilter::Call::OnClientInitialMetadata(
359 ClientMetadata& md, ClientLoggingFilter* filter) {
360 GRPC_LATENT_SEE_INNER_SCOPE(
361 "ClientLoggingFilter::Call::OnClientInitialMetadata");
362 call_data_.emplace(true, md, filter->default_authority_);
363 if (!call_data_->ShouldLog()) {
364 call_data_.reset();
365 return;
366 }
367 call_data_->LogClientHeader(
368 /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(), md);
369 }
370
OnServerInitialMetadata(ServerMetadata & md)371 void ClientLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
372 GRPC_LATENT_SEE_INNER_SCOPE(
373 "ClientLoggingFilter::Call::OnServerInitialMetadata");
374 if (!call_data_.has_value()) return;
375 call_data_->LogServerHeader(
376 /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
377 &md);
378 }
379
OnServerTrailingMetadata(ServerMetadata & md)380 void ClientLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
381 GRPC_LATENT_SEE_INNER_SCOPE(
382 "ClientLoggingFilter::Call::OnServerTrailingMetadata");
383 if (!call_data_.has_value()) return;
384 if (md.get(GrpcCallWasCancelled()).value_or(false) &&
385 md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
386 call_data_->LogCancel(
387 /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>());
388 return;
389 }
390 call_data_->LogServerTrailer(
391 /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
392 &md);
393 }
394
OnClientToServerMessage(const Message & message)395 void ClientLoggingFilter::Call::OnClientToServerMessage(
396 const Message& message) {
397 GRPC_LATENT_SEE_INNER_SCOPE(
398 "ClientLoggingFilter::Call::OnClientToServerMessage");
399 if (!call_data_.has_value()) return;
400 call_data_->LogClientMessage(
401 /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
402 message.payload());
403 }
404
OnClientToServerHalfClose()405 void ClientLoggingFilter::Call::OnClientToServerHalfClose() {
406 GRPC_LATENT_SEE_INNER_SCOPE(
407 "ClientLoggingFilter::Call::OnClientToServerHalfClose");
408 if (!call_data_.has_value()) return;
409 call_data_->LogClientHalfClose(
410 /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>());
411 }
412
OnServerToClientMessage(const Message & message)413 void ClientLoggingFilter::Call::OnServerToClientMessage(
414 const Message& message) {
415 GRPC_LATENT_SEE_INNER_SCOPE(
416 "ClientLoggingFilter::Call::OnServerToClientMessage");
417 if (!call_data_.has_value()) return;
418 call_data_->LogServerMessage(
419 /*is_client=*/true, MaybeGetContext<CallTracerAnnotationInterface>(),
420 message.payload());
421 }
422
423 const grpc_channel_filter ClientLoggingFilter::kFilter =
424 MakePromiseBasedFilter<ClientLoggingFilter, FilterEndpoint::kClient,
425 kFilterExaminesServerInitialMetadata |
426 kFilterExaminesInboundMessages |
427 kFilterExaminesOutboundMessages>();
428
429 absl::StatusOr<std::unique_ptr<ServerLoggingFilter>>
Create(const ChannelArgs &,ChannelFilter::Args)430 ServerLoggingFilter::Create(const ChannelArgs& /*args*/,
431 ChannelFilter::Args /*filter_args*/) {
432 return std::make_unique<ServerLoggingFilter>();
433 }
434
// Handles the client's initial metadata for one server-side call.
OnClientInitialMetadata(ClientMetadata & md)436 void ServerLoggingFilter::Call::OnClientInitialMetadata(ClientMetadata& md) {
437 GRPC_LATENT_SEE_INNER_SCOPE(
438 "ServerLoggingFilter::Call::OnClientInitialMetadata");
439 call_data_.emplace(false, md, "");
440 if (!call_data_->ShouldLog()) {
441 call_data_.reset();
442 return;
443 }
444 call_data_->LogClientHeader(
445 /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
446 md);
447 }
448
OnServerInitialMetadata(ServerMetadata & md)449 void ServerLoggingFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
450 GRPC_LATENT_SEE_INNER_SCOPE(
451 "ServerLoggingFilter::Call::OnServerInitialMetadata");
452 if (!call_data_.has_value()) return;
453 call_data_->LogServerHeader(
454 /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
455 &md);
456 }
457
OnServerTrailingMetadata(ServerMetadata & md)458 void ServerLoggingFilter::Call::OnServerTrailingMetadata(ServerMetadata& md) {
459 GRPC_LATENT_SEE_INNER_SCOPE(
460 "ServerLoggingFilter::Call::OnServerTrailingMetadata");
461 if (!call_data_.has_value()) return;
462 if (md.get(GrpcCallWasCancelled()).value_or(false) &&
463 md.get(GrpcStatusMetadata()) == GRPC_STATUS_CANCELLED) {
464 call_data_->LogCancel(
465 /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>());
466 return;
467 }
468 call_data_->LogServerTrailer(
469 /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
470 &md);
471 }
472
OnClientToServerMessage(const Message & message)473 void ServerLoggingFilter::Call::OnClientToServerMessage(
474 const Message& message) {
475 GRPC_LATENT_SEE_INNER_SCOPE(
476 "ServerLoggingFilter::Call::OnClientToServerMessage");
477 if (!call_data_.has_value()) return;
478 call_data_->LogClientMessage(
479 /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
480 message.payload());
481 }
482
OnClientToServerHalfClose()483 void ServerLoggingFilter::Call::OnClientToServerHalfClose() {
484 GRPC_LATENT_SEE_INNER_SCOPE(
485 "ServerLoggingFilter::Call::OnClientToServerHalfClose");
486 if (!call_data_.has_value()) return;
487 call_data_->LogClientHalfClose(
488 /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>());
489 }
490
OnServerToClientMessage(const Message & message)491 void ServerLoggingFilter::Call::OnServerToClientMessage(
492 const Message& message) {
493 GRPC_LATENT_SEE_INNER_SCOPE(
494 "ServerLoggingFilter::Call::OnServerToClientMessage");
495 if (!call_data_.has_value()) return;
496 call_data_->LogServerMessage(
497 /*is_client=*/false, MaybeGetContext<CallTracerAnnotationInterface>(),
498 message.payload());
499 }
500
501 const grpc_channel_filter ServerLoggingFilter::kFilter =
502 MakePromiseBasedFilter<ServerLoggingFilter, FilterEndpoint::kServer,
503 kFilterExaminesServerInitialMetadata |
504 kFilterExaminesInboundMessages |
505 kFilterExaminesOutboundMessages>();
506
RegisterLoggingFilter(LoggingSink * sink)507 void RegisterLoggingFilter(LoggingSink* sink) {
508 g_logging_sink = sink;
509 CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
510 builder->channel_init()
511 ->RegisterV2Filter<ServerLoggingFilter>(GRPC_SERVER_CHANNEL)
512 // TODO(yashykt) : Figure out a good place to place this channel arg
513 .IfChannelArg("grpc.experimental.enable_observability", true);
514 builder->channel_init()
515 ->RegisterV2Filter<ClientLoggingFilter>(GRPC_CLIENT_CHANNEL)
516 // TODO(yashykt) : Figure out a good place to place this channel arg
517 .IfChannelArg("grpc.experimental.enable_observability", true);
518 });
519 }
520
521 } // namespace grpc_core
522