• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/ext/gcp/observability_logging_sink.h"
20 
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/grpcpp.h>
25 #include <grpcpp/security/credentials.h>
26 #include <grpcpp/support/channel_arguments.h>
27 #include <grpcpp/support/status.h>
28 
29 #include <algorithm>
30 #include <map>
31 #include <utility>
32 
33 #include "absl/log/log.h"
34 #include "absl/numeric/int128.h"
35 #include "absl/strings/escaping.h"
36 #include "absl/strings/match.h"
37 #include "absl/strings/str_format.h"
38 #include "absl/types/optional.h"
39 #include "google/api/monitored_resource.pb.h"
40 #include "google/logging/v2/log_entry.pb.h"
41 #include "google/logging/v2/logging.grpc.pb.h"
42 #include "google/logging/v2/logging.pb.h"
43 #include "google/protobuf/text_format.h"
44 #include "src/core/lib/event_engine/default_event_engine.h"
45 #include "src/core/util/env.h"
46 #include "src/core/util/json/json.h"
47 #include "src/core/util/time.h"
48 #include "src/core/util/uuid_v4.h"
49 #include "src/cpp/ext/filters/census/open_census_call_tracer.h"
50 
51 // IWYU pragma: no_include "google/protobuf/struct.pb.h"
52 // IWYU pragma: no_include "google/protobuf/timestamp.pb.h"
53 
54 namespace grpc {
55 namespace internal {
56 
57 using grpc_core::LoggingSink;
58 
ObservabilityLoggingSink(GcpObservabilityConfig::CloudLogging logging_config,std::string project_id,std::map<std::string,std::string> labels)59 ObservabilityLoggingSink::ObservabilityLoggingSink(
60     GcpObservabilityConfig::CloudLogging logging_config, std::string project_id,
61     std::map<std::string, std::string> labels)
62     : project_id_(std::move(project_id)),
63       labels_(labels.begin(), labels.end()) {
64   for (auto& client_rpc_event_config : logging_config.client_rpc_events) {
65     client_configs_.emplace_back(client_rpc_event_config);
66   }
67   for (auto& server_rpc_event_config : logging_config.server_rpc_events) {
68     server_configs_.emplace_back(server_rpc_event_config);
69   }
70   absl::optional<std::string> authority_env =
71       grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
72   absl::optional<std::string> endpoint_env =
73       grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
74   if (authority_env.has_value() && !authority_env->empty()) {
75     authority_ = std::move(*endpoint_env);
76   }
77 }
78 
FindMatch(bool is_client,absl::string_view service,absl::string_view method)79 LoggingSink::Config ObservabilityLoggingSink::FindMatch(
80     bool is_client, absl::string_view service, absl::string_view method) {
81   const auto& configs = is_client ? client_configs_ : server_configs_;
82   if (service.empty() || method.empty()) {
83     return LoggingSink::Config();
84   }
85   for (const auto& config : configs) {
86     for (const auto& config_method : config.parsed_methods) {
87       if ((config_method.service == "*") ||
88           ((service == config_method.service) &&
89            ((config_method.method == "*") ||
90             (method == config_method.method)))) {
91         if (config.exclude) {
92           return LoggingSink::Config();
93         }
94         return LoggingSink::Config(config.max_metadata_bytes,
95                                    config.max_message_bytes);
96       }
97     }
98   }
99   return LoggingSink::Config();
100 }
101 
102 namespace {
103 
EventTypeToString(LoggingSink::Entry::EventType type)104 std::string EventTypeToString(LoggingSink::Entry::EventType type) {
105   switch (type) {
106     case LoggingSink::Entry::EventType::kClientHeader:
107       return "CLIENT_HEADER";
108     case LoggingSink::Entry::EventType::kServerHeader:
109       return "SERVER_HEADER";
110     case LoggingSink::Entry::EventType::kClientMessage:
111       return "CLIENT_MESSAGE";
112     case LoggingSink::Entry::EventType::kServerMessage:
113       return "SERVER_MESSAGE";
114     case LoggingSink::Entry::EventType::kClientHalfClose:
115       return "CLIENT_HALF_CLOSE";
116     case LoggingSink::Entry::EventType::kServerTrailer:
117       return "SERVER_TRAILER";
118     case LoggingSink::Entry::EventType::kCancel:
119       return "CANCEL";
120     case LoggingSink::Entry::EventType::kUnknown:
121     default:
122       return "EVENT_TYPE_UNKNOWN";
123   }
124 }
125 
LoggerToString(LoggingSink::Entry::Logger type)126 std::string LoggerToString(LoggingSink::Entry::Logger type) {
127   switch (type) {
128     case LoggingSink::Entry::Logger::kClient:
129       return "CLIENT";
130     case LoggingSink::Entry::Logger::kServer:
131       return "SERVER";
132     case LoggingSink::Entry::Logger::kUnknown:
133     default:
134       return "LOGGER_UNKNOWN";
135   }
136 }
137 
PayloadToJsonStructProto(LoggingSink::Entry::Payload payload,::google::protobuf::Struct * payload_proto)138 void PayloadToJsonStructProto(LoggingSink::Entry::Payload payload,
139                               ::google::protobuf::Struct* payload_proto) {
140   grpc_core::Json::Object payload_json;
141   if (!payload.metadata.empty()) {
142     auto* metadata_proto =
143         (*payload_proto->mutable_fields())["metadata"].mutable_struct_value();
144     for (auto& metadata : payload.metadata) {
145       if (absl::EndsWith(metadata.first, "-bin")) {
146         (*metadata_proto->mutable_fields())[metadata.first].set_string_value(
147             absl::WebSafeBase64Escape(metadata.second));
148       } else {
149         (*metadata_proto->mutable_fields())[metadata.first].set_string_value(
150             std::move(metadata.second));
151       }
152     }
153   }
154   if (payload.timeout != grpc_core::Duration::Zero()) {
155     (*payload_proto->mutable_fields())["timeout"].set_string_value(
156         payload.timeout.ToJsonString());
157   }
158   if (payload.status_code != 0) {
159     (*payload_proto->mutable_fields())["statusCode"].set_number_value(
160         payload.status_code);
161   }
162   if (!payload.status_message.empty()) {
163     (*payload_proto->mutable_fields())["statusMessage"].set_string_value(
164         std::move(payload.status_message));
165   }
166   if (!payload.status_details.empty()) {
167     (*payload_proto->mutable_fields())["statusDetails"].set_string_value(
168         absl::Base64Escape(payload.status_details));
169   }
170   if (payload.message_length != 0) {
171     (*payload_proto->mutable_fields())["messageLength"].set_number_value(
172         payload.message_length);
173   }
174   if (!payload.message.empty()) {
175     (*payload_proto->mutable_fields())["message"].set_string_value(
176         absl::Base64Escape(payload.message));
177   }
178 }
179 
AddressTypeToString(LoggingSink::Entry::Address::Type type)180 std::string AddressTypeToString(LoggingSink::Entry::Address::Type type) {
181   switch (type) {
182     case LoggingSink::Entry::Address::Type::kIpv4:
183       return "TYPE_IPV4";
184     case LoggingSink::Entry::Address::Type::kIpv6:
185       return "TYPE_IPV6";
186     case LoggingSink::Entry::Address::Type::kUnix:
187       return "TYPE_UNIX";
188     case LoggingSink::Entry::Address::Type::kUnknown:
189     default:
190       return "TYPE_UNKNOWN";
191   }
192 }
193 
PeerToJsonStructProto(LoggingSink::Entry::Address peer,::google::protobuf::Struct * peer_json)194 void PeerToJsonStructProto(LoggingSink::Entry::Address peer,
195                            ::google::protobuf::Struct* peer_json) {
196   (*peer_json->mutable_fields())["type"].set_string_value(
197       AddressTypeToString(peer.type));
198   if (peer.type != LoggingSink::Entry::Address::Type::kUnknown) {
199     (*peer_json->mutable_fields())["address"].set_string_value(
200         std::move(peer.address));
201     (*peer_json->mutable_fields())["ipPort"].set_number_value(peer.ip_port);
202   }
203 }
204 
205 }  // namespace
206 
EntryToJsonStructProto(LoggingSink::Entry entry,::google::protobuf::Struct * json_payload)207 void EntryToJsonStructProto(LoggingSink::Entry entry,
208                             ::google::protobuf::Struct* json_payload) {
209   (*json_payload->mutable_fields())["callId"].set_string_value(
210       grpc_core::GenerateUUIDv4(absl::Uint128High64(entry.call_id),
211                                 absl::Uint128Low64(entry.call_id)));
212   (*json_payload->mutable_fields())["sequenceId"].set_number_value(
213       entry.sequence_id);
214   (*json_payload->mutable_fields())["type"].set_string_value(
215       EventTypeToString(entry.type));
216   (*json_payload->mutable_fields())["logger"].set_string_value(
217       LoggerToString(entry.logger));
218   PayloadToJsonStructProto(
219       std::move(entry.payload),
220       (*json_payload->mutable_fields())["payload"].mutable_struct_value());
221   if (entry.payload_truncated) {
222     (*json_payload->mutable_fields())["payloadTruncated"].set_bool_value(
223         entry.payload_truncated);
224   }
225   PeerToJsonStructProto(
226       std::move(entry.peer),
227       (*json_payload->mutable_fields())["peer"].mutable_struct_value());
228   (*json_payload->mutable_fields())["authority"].set_string_value(
229       std::move(entry.authority));
230   (*json_payload->mutable_fields())["serviceName"].set_string_value(
231       std::move(entry.service_name));
232   (*json_payload->mutable_fields())["methodName"].set_string_value(
233       std::move(entry.method_name));
234 }
235 
236 namespace {
237 
EstimateEntrySize(const LoggingSink::Entry & entry)238 uint64_t EstimateEntrySize(const LoggingSink::Entry& entry) {
239   uint64_t size = sizeof(entry);
240   for (const auto& pair : entry.payload.metadata) {
241     size += pair.first.size() + pair.second.size();
242   }
243   size += entry.payload.status_message.size();
244   size += entry.payload.status_details.size();
245   size += entry.payload.message.size();
246   size += entry.authority.size();
247   size += entry.service_name.size();
248   size += entry.method_name.size();
249   return size;
250 }
251 
252 }  // namespace
253 
LogEntry(Entry entry)254 void ObservabilityLoggingSink::LogEntry(Entry entry) {
255   auto entry_size = EstimateEntrySize(entry);
256   grpc_core::MutexLock lock(&mu_);
257   if (sink_closed_) return;
258   entries_.push_back(std::move(entry));
259   entries_memory_footprint_ += entry_size;
260   MaybeTriggerFlushLocked();
261 }
262 
RegisterEnvironmentResource(const EnvironmentAutoDetect::ResourceType * resource)263 void ObservabilityLoggingSink::RegisterEnvironmentResource(
264     const EnvironmentAutoDetect::ResourceType* resource) {
265   grpc_core::MutexLock lock(&mu_);
266   resource_ = resource;
267   MaybeTriggerFlushLocked();
268 }
269 
FlushAndClose()270 void ObservabilityLoggingSink::FlushAndClose() {
271   grpc_core::MutexLock lock(&mu_);
272   sink_closed_ = true;
273   if (entries_.empty()) return;
274   MaybeTriggerFlushLocked();
275   sink_flushed_after_close_.Wait(&mu_);
276 }
277 
Flush()278 void ObservabilityLoggingSink::Flush() {
279   std::vector<Entry> entries;
280   google::logging::v2::LoggingServiceV2::StubInterface* stub = nullptr;
281   const EnvironmentAutoDetect::ResourceType* resource = nullptr;
282   {
283     grpc_core::MutexLock lock(&mu_);
284     if (flush_in_progress_) {
285       return;
286     }
287     flush_in_progress_ = true;
288     flush_timer_in_progress_ = false;
289     flush_triggered_ = false;
290     if (stub_ == nullptr) {
291       std::string endpoint;
292       absl::optional<std::string> endpoint_env =
293           grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
294       if (endpoint_env.has_value() && !endpoint_env->empty()) {
295         endpoint = std::move(*endpoint_env);
296       } else {
297         endpoint = "logging.googleapis.com";
298       }
299       ChannelArguments args;
300       // Disable observability for RPCs on this channel
301       args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
302       // Set keepalive time to 24 hrs to effectively disable keepalive ping, but
303       // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect.
304       args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,
305                   24 * 60 * 60 * 1000 /* 24 hours */);
306       args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */);
307       stub_ = google::logging::v2::LoggingServiceV2::NewStub(
308           CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args));
309     }
310     stub = stub_.get();
311     entries = std::move(entries_);
312     entries_memory_footprint_ = 0;
313     resource = resource_;
314   }
315   FlushEntriesHelper(stub, std::move(entries), resource);
316 }
317 
FlushEntriesHelper(google::logging::v2::LoggingServiceV2::StubInterface * stub,std::vector<Entry> entries,const EnvironmentAutoDetect::ResourceType * resource)318 void ObservabilityLoggingSink::FlushEntriesHelper(
319     google::logging::v2::LoggingServiceV2::StubInterface* stub,
320     std::vector<Entry> entries,
321     const EnvironmentAutoDetect::ResourceType* resource) {
322   if (entries.empty()) {
323     return;
324   }
325   struct CallContext {
326     ClientContext context;
327     google::logging::v2::WriteLogEntriesRequest request;
328     google::logging::v2::WriteLogEntriesResponse response;
329   };
330   CallContext* call = new CallContext;
331   call->context.set_authority(authority_);
332   call->context.set_deadline(
333       (grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30))
334           .as_timespec(GPR_CLOCK_MONOTONIC));
335   call->request.set_log_name(
336       absl::StrFormat("projects/%s/logs/"
337                       "microservices.googleapis.com%%2Fobservability%%2fgrpc",
338                       project_id_));
339   (*call->request.mutable_labels()).insert(labels_.begin(), labels_.end());
340   // Set the proper resource type and labels.
341   call->request.mutable_resource()->set_type(resource->resource_type);
342   call->request.mutable_resource()->mutable_labels()->insert(
343       resource->labels.begin(), resource->labels.end());
344   for (auto& entry : entries) {
345     auto* proto_entry = call->request.add_entries();
346     gpr_timespec timespec = entry.timestamp.as_timespec(GPR_CLOCK_REALTIME);
347     proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec);
348     proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec);
349     // Add tracing details
350     proto_entry->set_span_id(entry.span_id);
351     proto_entry->set_trace(
352         absl::StrFormat("projects/%s/traces/%s", project_id_, entry.trace_id));
353     proto_entry->set_trace_sampled(entry.is_sampled);
354     // TODO(yashykt): Check if we need to fill receive timestamp
355     EntryToJsonStructProto(std::move(entry),
356                            proto_entry->mutable_json_payload());
357   }
358   stub->async()->WriteLogEntries(
359       &(call->context), &(call->request), &(call->response),
360       [this, call](Status status) {
361         if (!status.ok()) {
362           LOG(ERROR) << "GCP Observability Logging Error "
363                      << status.error_code() << ": " << status.error_message()
364                      << ". Dumping log entries.";
365           for (auto& entry : call->request.entries()) {
366             std::string output;
367             ::google::protobuf::TextFormat::PrintToString(entry.json_payload(),
368                                                           &output);
369             LOG(INFO) << "Log Entry recorded at time: "
370                       << grpc_core::Timestamp::FromTimespecRoundUp(
371                              gpr_timespec{entry.timestamp().seconds(),
372                                           entry.timestamp().nanos(),
373                                           GPR_CLOCK_REALTIME})
374                              .ToString()
375                       << " : " << output;
376           }
377         }
378         delete call;
379         grpc_core::MutexLock lock(&mu_);
380         flush_in_progress_ = false;
381         if (sink_closed_ && entries_.empty()) {
382           sink_flushed_after_close_.SignalAll();
383         } else {
384           MaybeTriggerFlushLocked();
385         }
386       });
387 }
388 
MaybeTriggerFlush()389 void ObservabilityLoggingSink::MaybeTriggerFlush() {
390   grpc_core::MutexLock lock(&mu_);
391   return MaybeTriggerFlushLocked();
392 }
393 
MaybeTriggerFlushLocked()394 void ObservabilityLoggingSink::MaybeTriggerFlushLocked() {
395   constexpr int kMaxEntriesBeforeDump = 100000;
396   constexpr int kMaxMemoryFootprintBeforeDump = 10 * 1024 * 1024;
397   constexpr int kMinEntriesBeforeFlush = 1000;
398   constexpr int kMinMemoryFootprintBeforeFlush = 1 * 1024 * 1024;
399   // Use this opportunity to fetch environment resource if not fetched already
400   if (resource_ == nullptr && !registered_env_fetch_notification_) {
401     auto& env_autodetect = EnvironmentAutoDetect::Get();
402     resource_ = env_autodetect.resource();
403     event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
404     if (resource_ == nullptr) {
405       registered_env_fetch_notification_ = true;
406       env_autodetect.NotifyOnDone([this]() {
407         RegisterEnvironmentResource(EnvironmentAutoDetect::Get().resource());
408       });
409     }
410   }
411   if (entries_.empty()) return;
412   if (entries_.size() > kMaxEntriesBeforeDump ||
413       entries_memory_footprint_ > kMaxMemoryFootprintBeforeDump) {
414     // Buffer limits have been reached. Dump entries with LOG
415     LOG(INFO) << "Buffer limit reached. Dumping log entries.";
416     for (auto& entry : entries_) {
417       google::protobuf::Struct proto;
418       std::string timestamp = entry.timestamp.ToString();
419       EntryToJsonStructProto(std::move(entry), &proto);
420       std::string output;
421       ::google::protobuf::TextFormat::PrintToString(proto, &output);
422       LOG(INFO) << "Log Entry recorded at time: " << timestamp << " : "
423                 << output;
424     }
425     entries_.clear();
426     entries_memory_footprint_ = 0;
427   } else if (resource_ != nullptr && !flush_in_progress_) {
428     // Environment resource has been detected. Trigger flush if conditions
429     // suffice.
430     if ((entries_.size() >= kMinEntriesBeforeFlush ||
431          entries_memory_footprint_ >= kMinMemoryFootprintBeforeFlush ||
432          sink_closed_) &&
433         !flush_triggered_) {
434       // It is fine even if there were a flush with a timer in progress. What is
435       // important is that a flush is triggered.
436       flush_triggered_ = true;
437       event_engine_->Run([this]() { Flush(); });
438     } else if (!flush_timer_in_progress_) {
439       flush_timer_in_progress_ = true;
440       event_engine_->RunAfter(grpc_core::Duration::Seconds(1),
441                               [this]() { Flush(); });
442     }
443   }
444 }
445 
Configuration(const GcpObservabilityConfig::CloudLogging::RpcEventConfiguration & rpc_event_config)446 ObservabilityLoggingSink::Configuration::Configuration(
447     const GcpObservabilityConfig::CloudLogging::RpcEventConfiguration&
448         rpc_event_config)
449     : exclude(rpc_event_config.exclude),
450       max_metadata_bytes(rpc_event_config.max_metadata_bytes),
451       max_message_bytes(rpc_event_config.max_message_bytes) {
452   for (auto& parsed_method : rpc_event_config.parsed_methods) {
453     parsed_methods.emplace_back(ParsedMethod{
454         std::string(parsed_method.service), std::string(parsed_method.method)});
455   }
456 }
457 
458 }  // namespace internal
459 }  // namespace grpc
460