// // // Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #include "src/cpp/ext/gcp/observability_logging_sink.h" #include #include #include #include #include #include #include #include #include #include #include "absl/log/log.h" #include "absl/numeric/int128.h" #include "absl/strings/escaping.h" #include "absl/strings/match.h" #include "absl/strings/str_format.h" #include "absl/types/optional.h" #include "google/api/monitored_resource.pb.h" #include "google/logging/v2/log_entry.pb.h" #include "google/logging/v2/logging.grpc.pb.h" #include "google/logging/v2/logging.pb.h" #include "google/protobuf/text_format.h" #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/util/env.h" #include "src/core/util/json/json.h" #include "src/core/util/time.h" #include "src/core/util/uuid_v4.h" #include "src/cpp/ext/filters/census/open_census_call_tracer.h" // IWYU pragma: no_include "google/protobuf/struct.pb.h" // IWYU pragma: no_include "google/protobuf/timestamp.pb.h" namespace grpc { namespace internal { using grpc_core::LoggingSink; ObservabilityLoggingSink::ObservabilityLoggingSink( GcpObservabilityConfig::CloudLogging logging_config, std::string project_id, std::map labels) : project_id_(std::move(project_id)), labels_(labels.begin(), labels.end()) { for (auto& client_rpc_event_config : logging_config.client_rpc_events) { client_configs_.emplace_back(client_rpc_event_config); } for (auto& server_rpc_event_config : logging_config.server_rpc_events) { server_configs_.emplace_back(server_rpc_event_config); } absl::optional authority_env = grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); absl::optional endpoint_env = grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); if (authority_env.has_value() && !authority_env->empty()) { authority_ = std::move(*endpoint_env); } } LoggingSink::Config ObservabilityLoggingSink::FindMatch( bool is_client, absl::string_view service, absl::string_view method) { const auto& configs = is_client ? client_configs_ : server_configs_; if (service.empty() || method.empty()) { return LoggingSink::Config(); } for (const auto& config : configs) { for (const auto& config_method : config.parsed_methods) { if ((config_method.service == "*") || ((service == config_method.service) && ((config_method.method == "*") || (method == config_method.method)))) { if (config.exclude) { return LoggingSink::Config(); } return LoggingSink::Config(config.max_metadata_bytes, config.max_message_bytes); } } } return LoggingSink::Config(); } namespace { std::string EventTypeToString(LoggingSink::Entry::EventType type) { switch (type) { case LoggingSink::Entry::EventType::kClientHeader: return "CLIENT_HEADER"; case LoggingSink::Entry::EventType::kServerHeader: return "SERVER_HEADER"; case LoggingSink::Entry::EventType::kClientMessage: return "CLIENT_MESSAGE"; case LoggingSink::Entry::EventType::kServerMessage: return "SERVER_MESSAGE"; case LoggingSink::Entry::EventType::kClientHalfClose: return "CLIENT_HALF_CLOSE"; case LoggingSink::Entry::EventType::kServerTrailer: return "SERVER_TRAILER"; case LoggingSink::Entry::EventType::kCancel: return "CANCEL"; case LoggingSink::Entry::EventType::kUnknown: default: return "EVENT_TYPE_UNKNOWN"; } } std::string LoggerToString(LoggingSink::Entry::Logger type) { switch (type) { case LoggingSink::Entry::Logger::kClient: return "CLIENT"; case LoggingSink::Entry::Logger::kServer: return "SERVER"; case LoggingSink::Entry::Logger::kUnknown: default: return "LOGGER_UNKNOWN"; } } void PayloadToJsonStructProto(LoggingSink::Entry::Payload payload, ::google::protobuf::Struct* payload_proto) { grpc_core::Json::Object payload_json; if (!payload.metadata.empty()) { auto* metadata_proto = (*payload_proto->mutable_fields())["metadata"].mutable_struct_value(); for (auto& metadata : payload.metadata) { if (absl::EndsWith(metadata.first, "-bin")) { (*metadata_proto->mutable_fields())[metadata.first].set_string_value( absl::WebSafeBase64Escape(metadata.second)); } else { (*metadata_proto->mutable_fields())[metadata.first].set_string_value( std::move(metadata.second)); } } } if (payload.timeout != grpc_core::Duration::Zero()) { (*payload_proto->mutable_fields())["timeout"].set_string_value( payload.timeout.ToJsonString()); } if (payload.status_code != 0) { (*payload_proto->mutable_fields())["statusCode"].set_number_value( payload.status_code); } if (!payload.status_message.empty()) { (*payload_proto->mutable_fields())["statusMessage"].set_string_value( std::move(payload.status_message)); } if (!payload.status_details.empty()) { (*payload_proto->mutable_fields())["statusDetails"].set_string_value( absl::Base64Escape(payload.status_details)); } if (payload.message_length != 0) { (*payload_proto->mutable_fields())["messageLength"].set_number_value( payload.message_length); } if (!payload.message.empty()) { (*payload_proto->mutable_fields())["message"].set_string_value( absl::Base64Escape(payload.message)); } } std::string AddressTypeToString(LoggingSink::Entry::Address::Type type) { switch (type) { case LoggingSink::Entry::Address::Type::kIpv4: return "TYPE_IPV4"; case LoggingSink::Entry::Address::Type::kIpv6: return "TYPE_IPV6"; case LoggingSink::Entry::Address::Type::kUnix: return "TYPE_UNIX"; case LoggingSink::Entry::Address::Type::kUnknown: default: return "TYPE_UNKNOWN"; } } void PeerToJsonStructProto(LoggingSink::Entry::Address peer, ::google::protobuf::Struct* peer_json) { (*peer_json->mutable_fields())["type"].set_string_value( AddressTypeToString(peer.type)); if (peer.type != LoggingSink::Entry::Address::Type::kUnknown) { (*peer_json->mutable_fields())["address"].set_string_value( std::move(peer.address)); (*peer_json->mutable_fields())["ipPort"].set_number_value(peer.ip_port); } } } // namespace void EntryToJsonStructProto(LoggingSink::Entry entry, ::google::protobuf::Struct* json_payload) { (*json_payload->mutable_fields())["callId"].set_string_value( grpc_core::GenerateUUIDv4(absl::Uint128High64(entry.call_id), absl::Uint128Low64(entry.call_id))); (*json_payload->mutable_fields())["sequenceId"].set_number_value( entry.sequence_id); (*json_payload->mutable_fields())["type"].set_string_value( EventTypeToString(entry.type)); (*json_payload->mutable_fields())["logger"].set_string_value( LoggerToString(entry.logger)); PayloadToJsonStructProto( std::move(entry.payload), (*json_payload->mutable_fields())["payload"].mutable_struct_value()); if (entry.payload_truncated) { (*json_payload->mutable_fields())["payloadTruncated"].set_bool_value( entry.payload_truncated); } PeerToJsonStructProto( std::move(entry.peer), (*json_payload->mutable_fields())["peer"].mutable_struct_value()); (*json_payload->mutable_fields())["authority"].set_string_value( std::move(entry.authority)); (*json_payload->mutable_fields())["serviceName"].set_string_value( std::move(entry.service_name)); (*json_payload->mutable_fields())["methodName"].set_string_value( std::move(entry.method_name)); } namespace { uint64_t EstimateEntrySize(const LoggingSink::Entry& entry) { uint64_t size = sizeof(entry); for (const auto& pair : entry.payload.metadata) { size += pair.first.size() + pair.second.size(); } size += entry.payload.status_message.size(); size += entry.payload.status_details.size(); size += entry.payload.message.size(); size += entry.authority.size(); size += entry.service_name.size(); size += entry.method_name.size(); return size; } } // namespace void ObservabilityLoggingSink::LogEntry(Entry entry) { auto entry_size = EstimateEntrySize(entry); grpc_core::MutexLock lock(&mu_); if (sink_closed_) return; entries_.push_back(std::move(entry)); entries_memory_footprint_ += entry_size; MaybeTriggerFlushLocked(); } void ObservabilityLoggingSink::RegisterEnvironmentResource( const EnvironmentAutoDetect::ResourceType* resource) { grpc_core::MutexLock lock(&mu_); resource_ = resource; MaybeTriggerFlushLocked(); } void ObservabilityLoggingSink::FlushAndClose() { grpc_core::MutexLock lock(&mu_); sink_closed_ = true; if (entries_.empty()) return; MaybeTriggerFlushLocked(); sink_flushed_after_close_.Wait(&mu_); } void ObservabilityLoggingSink::Flush() { std::vector entries; google::logging::v2::LoggingServiceV2::StubInterface* stub = nullptr; const EnvironmentAutoDetect::ResourceType* resource = nullptr; { grpc_core::MutexLock lock(&mu_); if (flush_in_progress_) { return; } flush_in_progress_ = true; flush_timer_in_progress_ = false; flush_triggered_ = false; if (stub_ == nullptr) { std::string endpoint; absl::optional endpoint_env = grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT"); if (endpoint_env.has_value() && !endpoint_env->empty()) { endpoint = std::move(*endpoint_env); } else { endpoint = "logging.googleapis.com"; } ChannelArguments args; // Disable observability for RPCs on this channel args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0); // Set keepalive time to 24 hrs to effectively disable keepalive ping, but // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect. args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 24 * 60 * 60 * 1000 /* 24 hours */); args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */); stub_ = google::logging::v2::LoggingServiceV2::NewStub( CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args)); } stub = stub_.get(); entries = std::move(entries_); entries_memory_footprint_ = 0; resource = resource_; } FlushEntriesHelper(stub, std::move(entries), resource); } void ObservabilityLoggingSink::FlushEntriesHelper( google::logging::v2::LoggingServiceV2::StubInterface* stub, std::vector entries, const EnvironmentAutoDetect::ResourceType* resource) { if (entries.empty()) { return; } struct CallContext { ClientContext context; google::logging::v2::WriteLogEntriesRequest request; google::logging::v2::WriteLogEntriesResponse response; }; CallContext* call = new CallContext; call->context.set_authority(authority_); call->context.set_deadline( (grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30)) .as_timespec(GPR_CLOCK_MONOTONIC)); call->request.set_log_name( absl::StrFormat("projects/%s/logs/" "microservices.googleapis.com%%2Fobservability%%2fgrpc", project_id_)); (*call->request.mutable_labels()).insert(labels_.begin(), labels_.end()); // Set the proper resource type and labels. call->request.mutable_resource()->set_type(resource->resource_type); call->request.mutable_resource()->mutable_labels()->insert( resource->labels.begin(), resource->labels.end()); for (auto& entry : entries) { auto* proto_entry = call->request.add_entries(); gpr_timespec timespec = entry.timestamp.as_timespec(GPR_CLOCK_REALTIME); proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec); proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec); // Add tracing details proto_entry->set_span_id(entry.span_id); proto_entry->set_trace( absl::StrFormat("projects/%s/traces/%s", project_id_, entry.trace_id)); proto_entry->set_trace_sampled(entry.is_sampled); // TODO(yashykt): Check if we need to fill receive timestamp EntryToJsonStructProto(std::move(entry), proto_entry->mutable_json_payload()); } stub->async()->WriteLogEntries( &(call->context), &(call->request), &(call->response), [this, call](Status status) { if (!status.ok()) { LOG(ERROR) << "GCP Observability Logging Error " << status.error_code() << ": " << status.error_message() << ". Dumping log entries."; for (auto& entry : call->request.entries()) { std::string output; ::google::protobuf::TextFormat::PrintToString(entry.json_payload(), &output); LOG(INFO) << "Log Entry recorded at time: " << grpc_core::Timestamp::FromTimespecRoundUp( gpr_timespec{entry.timestamp().seconds(), entry.timestamp().nanos(), GPR_CLOCK_REALTIME}) .ToString() << " : " << output; } } delete call; grpc_core::MutexLock lock(&mu_); flush_in_progress_ = false; if (sink_closed_ && entries_.empty()) { sink_flushed_after_close_.SignalAll(); } else { MaybeTriggerFlushLocked(); } }); } void ObservabilityLoggingSink::MaybeTriggerFlush() { grpc_core::MutexLock lock(&mu_); return MaybeTriggerFlushLocked(); } void ObservabilityLoggingSink::MaybeTriggerFlushLocked() { constexpr int kMaxEntriesBeforeDump = 100000; constexpr int kMaxMemoryFootprintBeforeDump = 10 * 1024 * 1024; constexpr int kMinEntriesBeforeFlush = 1000; constexpr int kMinMemoryFootprintBeforeFlush = 1 * 1024 * 1024; // Use this opportunity to fetch environment resource if not fetched already if (resource_ == nullptr && !registered_env_fetch_notification_) { auto& env_autodetect = EnvironmentAutoDetect::Get(); resource_ = env_autodetect.resource(); event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine(); if (resource_ == nullptr) { registered_env_fetch_notification_ = true; env_autodetect.NotifyOnDone([this]() { RegisterEnvironmentResource(EnvironmentAutoDetect::Get().resource()); }); } } if (entries_.empty()) return; if (entries_.size() > kMaxEntriesBeforeDump || entries_memory_footprint_ > kMaxMemoryFootprintBeforeDump) { // Buffer limits have been reached. Dump entries with LOG LOG(INFO) << "Buffer limit reached. Dumping log entries."; for (auto& entry : entries_) { google::protobuf::Struct proto; std::string timestamp = entry.timestamp.ToString(); EntryToJsonStructProto(std::move(entry), &proto); std::string output; ::google::protobuf::TextFormat::PrintToString(proto, &output); LOG(INFO) << "Log Entry recorded at time: " << timestamp << " : " << output; } entries_.clear(); entries_memory_footprint_ = 0; } else if (resource_ != nullptr && !flush_in_progress_) { // Environment resource has been detected. Trigger flush if conditions // suffice. if ((entries_.size() >= kMinEntriesBeforeFlush || entries_memory_footprint_ >= kMinMemoryFootprintBeforeFlush || sink_closed_) && !flush_triggered_) { // It is fine even if there were a flush with a timer in progress. What is // important is that a flush is triggered. flush_triggered_ = true; event_engine_->Run([this]() { Flush(); }); } else if (!flush_timer_in_progress_) { flush_timer_in_progress_ = true; event_engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() { Flush(); }); } } } ObservabilityLoggingSink::Configuration::Configuration( const GcpObservabilityConfig::CloudLogging::RpcEventConfiguration& rpc_event_config) : exclude(rpc_event_config.exclude), max_metadata_bytes(rpc_event_config.max_metadata_bytes), max_message_bytes(rpc_event_config.max_message_bytes) { for (auto& parsed_method : rpc_event_config.parsed_methods) { parsed_methods.emplace_back(ParsedMethod{ std::string(parsed_method.service), std::string(parsed_method.method)}); } } } // namespace internal } // namespace grpc