• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "observability_util.h"

#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <limits>
#include <map>
#include <string>

#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "client_call_tracer.h"
#include "constants.h"
#include "python_observability_context.h"
#include "server_call_tracer.h"
29 
30 namespace grpc_observability {
31 
32 std::queue<CensusData>* g_census_data_buffer;
33 std::mutex g_census_data_buffer_mutex;
34 std::condition_variable g_census_data_buffer_cv;
// TODO(xuanwn): Change below to a more appropriate number.
// Defaults used when the matching environment variable is unset or cannot be
// parsed: the buffer stores at most 10000 CensusData entries and export is
// triggered once the buffer is 70% full.
constexpr float kExportThreshold = 0.7;
constexpr int kMaxExportBufferSize = 10000;

namespace {

// Returns the fill fraction at which an export should be triggered.
//
// The value is read from GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD on every call.
// Like std::stof, parsing accepts a leading number and ignores trailing
// characters. Unlike std::stof, a value that does not start with a number or
// that is out of range for float does not throw: kExportThreshold is returned
// instead, so a malformed environment cannot take the process down.
float GetExportThreadHold() {
  const char* value = std::getenv("GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD");
  if (value == nullptr) {
    return kExportThreshold;
  }
  // strtof reports range errors only through errno; preserve the caller's
  // errno across the probe.
  const int saved_errno = errno;
  errno = 0;
  char* end = nullptr;
  const float parsed = std::strtof(value, &end);
  const bool valid = (end != value) && (errno != ERANGE);
  errno = saved_errno;
  return valid ? parsed : kExportThreshold;
}

// Returns the maximum number of entries the export buffer may hold.
//
// The value is read from GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE on every
// call and parsed as a base-10 integer (leading number accepted, trailing
// characters ignored, as with std::stoi). If the value does not start with a
// number or does not fit in an int, kMaxExportBufferSize is returned instead
// of throwing.
int GetMaxExportBufferSize() {
  const char* value = std::getenv("GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE");
  if (value == nullptr) {
    return kMaxExportBufferSize;
  }
  const int saved_errno = errno;
  errno = 0;
  char* end = nullptr;
  const long parsed = std::strtol(value, &end, 10);
  const bool valid = (end != value) && (errno != ERANGE) &&
                     parsed >= std::numeric_limits<int>::min() &&
                     parsed <= std::numeric_limits<int>::max();
  errno = saved_errno;
  return valid ? static_cast<int>(parsed) : kMaxExportBufferSize;
}

}  // namespace
60 
RecordIntMetric(MetricsName name,int64_t value,const std::vector<Label> & labels,std::string identifier,const bool registered_method,const bool include_exchange_labels)61 void RecordIntMetric(MetricsName name, int64_t value,
62                      const std::vector<Label>& labels, std::string identifier,
63                      const bool registered_method,
64                      const bool include_exchange_labels) {
65   Measurement measurement_data;
66   measurement_data.type = kMeasurementInt;
67   measurement_data.name = name;
68   measurement_data.registered_method = registered_method;
69   measurement_data.include_exchange_labels = include_exchange_labels;
70   measurement_data.value.value_int = value;
71 
72   CensusData data = CensusData(measurement_data, labels, identifier);
73   AddCensusDataToBuffer(data);
74 }
75 
RecordDoubleMetric(MetricsName name,double value,const std::vector<Label> & labels,std::string identifier,const bool registered_method,const bool include_exchange_labels)76 void RecordDoubleMetric(MetricsName name, double value,
77                         const std::vector<Label>& labels,
78                         std::string identifier, const bool registered_method,
79                         const bool include_exchange_labels) {
80   Measurement measurement_data;
81   measurement_data.type = kMeasurementDouble;
82   measurement_data.name = name;
83   measurement_data.registered_method = registered_method;
84   measurement_data.include_exchange_labels = include_exchange_labels;
85   measurement_data.value.value_double = value;
86 
87   CensusData data = CensusData(measurement_data, labels, identifier);
88   AddCensusDataToBuffer(data);
89 }
90 
RecordSpan(const SpanCensusData & span_census_data)91 void RecordSpan(const SpanCensusData& span_census_data) {
92   CensusData data = CensusData(span_census_data);
93   AddCensusDataToBuffer(data);
94 }
95 
NativeObservabilityInit()96 void NativeObservabilityInit() {
97   g_census_data_buffer = new std::queue<CensusData>;
98 }
99 
CreateClientCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id,const char * identifier,const std::vector<Label> exchange_labels,bool add_csm_optional_labels,bool registered_method)100 void* CreateClientCallTracer(const char* method, const char* target,
101                              const char* trace_id, const char* parent_span_id,
102                              const char* identifier,
103                              const std::vector<Label> exchange_labels,
104                              bool add_csm_optional_labels,
105                              bool registered_method) {
106   void* client_call_tracer = new PythonOpenCensusCallTracer(
107       method, target, trace_id, parent_span_id, identifier, exchange_labels,
108       PythonCensusTracingEnabled(), add_csm_optional_labels, registered_method);
109   return client_call_tracer;
110 }
111 
CreateServerCallTracerFactory(const std::vector<Label> exchange_labels,const char * identifier)112 void* CreateServerCallTracerFactory(const std::vector<Label> exchange_labels,
113                                     const char* identifier) {
114   void* server_call_tracer_factory =
115       new PythonOpenCensusServerCallTracerFactory(exchange_labels, identifier);
116   return server_call_tracer_factory;
117 }
118 
AwaitNextBatchLocked(std::unique_lock<std::mutex> & lock,int timeout_ms)119 void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms) {
120   auto now = std::chrono::system_clock::now();
121   g_census_data_buffer_cv.wait_until(
122       lock, now + std::chrono::milliseconds(timeout_ms));
123 }
124 
AddCensusDataToBuffer(const CensusData & data)125 void AddCensusDataToBuffer(const CensusData& data) {
126   std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex);
127   if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) {
128     VLOG(2) << "Reached maximum census data buffer size, discarding this "
129                "CensusData entry";
130   } else {
131     g_census_data_buffer->push(data);
132   }
133   if (g_census_data_buffer->size() >=
134       (GetExportThreadHold() * GetMaxExportBufferSize())) {
135     g_census_data_buffer_cv.notify_all();
136   }
137 }
138 
StatusCodeToString(grpc_status_code code)139 absl::string_view StatusCodeToString(grpc_status_code code) {
140   switch (code) {
141     case GRPC_STATUS_OK:
142       return "OK";
143     case GRPC_STATUS_CANCELLED:
144       return "CANCELLED";
145     case GRPC_STATUS_UNKNOWN:
146       return "UNKNOWN";
147     case GRPC_STATUS_INVALID_ARGUMENT:
148       return "INVALID_ARGUMENT";
149     case GRPC_STATUS_DEADLINE_EXCEEDED:
150       return "DEADLINE_EXCEEDED";
151     case GRPC_STATUS_NOT_FOUND:
152       return "NOT_FOUND";
153     case GRPC_STATUS_ALREADY_EXISTS:
154       return "ALREADY_EXISTS";
155     case GRPC_STATUS_PERMISSION_DENIED:
156       return "PERMISSION_DENIED";
157     case GRPC_STATUS_UNAUTHENTICATED:
158       return "UNAUTHENTICATED";
159     case GRPC_STATUS_RESOURCE_EXHAUSTED:
160       return "RESOURCE_EXHAUSTED";
161     case GRPC_STATUS_FAILED_PRECONDITION:
162       return "FAILED_PRECONDITION";
163     case GRPC_STATUS_ABORTED:
164       return "ABORTED";
165     case GRPC_STATUS_OUT_OF_RANGE:
166       return "OUT_OF_RANGE";
167     case GRPC_STATUS_UNIMPLEMENTED:
168       return "UNIMPLEMENTED";
169     case GRPC_STATUS_INTERNAL:
170       return "INTERNAL";
171     case GRPC_STATUS_UNAVAILABLE:
172       return "UNAVAILABLE";
173     case GRPC_STATUS_DATA_LOSS:
174       return "DATA_LOSS";
175     default:
176       // gRPC wants users of this enum to include a default branch so that
177       // adding values is not a breaking change.
178       return "UNKNOWN_STATUS";
179   }
180 }
181 
182 }  // namespace grpc_observability
183