1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
#include "observability_util.h"

#include <cerrno>
#include <chrono>
#include <climits>
#include <cstdlib>
#include <map>
#include <string>

#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "client_call_tracer.h"
#include "constants.h"
#include "python_observability_context.h"
#include "server_call_tracer.h"
29
30 namespace grpc_observability {
31
32 std::queue<CensusData>* g_census_data_buffer;
33 std::mutex g_census_data_buffer_mutex;
34 std::condition_variable g_census_data_buffer_cv;
// TODO(xuanwn): Change below to a more appropriate number.
// By default the buffer holds at most kMaxExportBufferSize (10000) CensusData
// entries, and the export condition variable is notified once the buffer is
// kExportThreshold (70%) full. Both defaults can be overridden through the
// environment variables read below.
constexpr float kExportThreshold = 0.7f;
constexpr int kMaxExportBufferSize = 10000;

namespace {

// Returns the fill ratio at which buffered data should be exported.
//
// Reads GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD. When the variable is unset, does
// not start with a parsable floating point number, or is out of the range of
// float, kExportThreshold is returned instead. Never throws: this runs every
// time an entry is added to the buffer, so a malformed variable must not
// break metric recording.
float GetExportThreadHold() {
  const char* value = std::getenv("GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD");
  if (value == nullptr) {
    return kExportThreshold;
  }
  errno = 0;
  char* end = nullptr;
  const float parsed = std::strtof(value, &end);
  // end == value means no characters were converted; ERANGE signals overflow
  // or underflow.
  if (end == value || errno == ERANGE) {
    return kExportThreshold;
  }
  return parsed;
}

// Returns the maximum number of entries the export buffer may hold.
//
// Reads GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE as a base-10 integer. When
// the variable is unset, not a number, negative, or does not fit in an int,
// kMaxExportBufferSize is returned instead. Never throws.
int GetMaxExportBufferSize() {
  const char* value = std::getenv("GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE");
  if (value == nullptr) {
    return kMaxExportBufferSize;
  }
  errno = 0;
  char* end = nullptr;
  const long parsed = std::strtol(value, &end, 10);
  if (end == value || errno == ERANGE) {
    return kMaxExportBufferSize;
  }
  // The result is compared against an unsigned container size, where a
  // negative value would wrap around and silently disable the limit.
  if (parsed < 0 || parsed > INT_MAX) {
    return kMaxExportBufferSize;
  }
  return static_cast<int>(parsed);
}

}  // namespace
60
RecordIntMetric(MetricsName name,int64_t value,const std::vector<Label> & labels,std::string identifier,const bool registered_method,const bool include_exchange_labels)61 void RecordIntMetric(MetricsName name, int64_t value,
62 const std::vector<Label>& labels, std::string identifier,
63 const bool registered_method,
64 const bool include_exchange_labels) {
65 Measurement measurement_data;
66 measurement_data.type = kMeasurementInt;
67 measurement_data.name = name;
68 measurement_data.registered_method = registered_method;
69 measurement_data.include_exchange_labels = include_exchange_labels;
70 measurement_data.value.value_int = value;
71
72 CensusData data = CensusData(measurement_data, labels, identifier);
73 AddCensusDataToBuffer(data);
74 }
75
RecordDoubleMetric(MetricsName name,double value,const std::vector<Label> & labels,std::string identifier,const bool registered_method,const bool include_exchange_labels)76 void RecordDoubleMetric(MetricsName name, double value,
77 const std::vector<Label>& labels,
78 std::string identifier, const bool registered_method,
79 const bool include_exchange_labels) {
80 Measurement measurement_data;
81 measurement_data.type = kMeasurementDouble;
82 measurement_data.name = name;
83 measurement_data.registered_method = registered_method;
84 measurement_data.include_exchange_labels = include_exchange_labels;
85 measurement_data.value.value_double = value;
86
87 CensusData data = CensusData(measurement_data, labels, identifier);
88 AddCensusDataToBuffer(data);
89 }
90
RecordSpan(const SpanCensusData & span_census_data)91 void RecordSpan(const SpanCensusData& span_census_data) {
92 CensusData data = CensusData(span_census_data);
93 AddCensusDataToBuffer(data);
94 }
95
NativeObservabilityInit()96 void NativeObservabilityInit() {
97 g_census_data_buffer = new std::queue<CensusData>;
98 }
99
CreateClientCallTracer(const char * method,const char * target,const char * trace_id,const char * parent_span_id,const char * identifier,const std::vector<Label> exchange_labels,bool add_csm_optional_labels,bool registered_method)100 void* CreateClientCallTracer(const char* method, const char* target,
101 const char* trace_id, const char* parent_span_id,
102 const char* identifier,
103 const std::vector<Label> exchange_labels,
104 bool add_csm_optional_labels,
105 bool registered_method) {
106 void* client_call_tracer = new PythonOpenCensusCallTracer(
107 method, target, trace_id, parent_span_id, identifier, exchange_labels,
108 PythonCensusTracingEnabled(), add_csm_optional_labels, registered_method);
109 return client_call_tracer;
110 }
111
CreateServerCallTracerFactory(const std::vector<Label> exchange_labels,const char * identifier)112 void* CreateServerCallTracerFactory(const std::vector<Label> exchange_labels,
113 const char* identifier) {
114 void* server_call_tracer_factory =
115 new PythonOpenCensusServerCallTracerFactory(exchange_labels, identifier);
116 return server_call_tracer_factory;
117 }
118
AwaitNextBatchLocked(std::unique_lock<std::mutex> & lock,int timeout_ms)119 void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms) {
120 auto now = std::chrono::system_clock::now();
121 g_census_data_buffer_cv.wait_until(
122 lock, now + std::chrono::milliseconds(timeout_ms));
123 }
124
AddCensusDataToBuffer(const CensusData & data)125 void AddCensusDataToBuffer(const CensusData& data) {
126 std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex);
127 if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) {
128 VLOG(2) << "Reached maximum census data buffer size, discarding this "
129 "CensusData entry";
130 } else {
131 g_census_data_buffer->push(data);
132 }
133 if (g_census_data_buffer->size() >=
134 (GetExportThreadHold() * GetMaxExportBufferSize())) {
135 g_census_data_buffer_cv.notify_all();
136 }
137 }
138
StatusCodeToString(grpc_status_code code)139 absl::string_view StatusCodeToString(grpc_status_code code) {
140 switch (code) {
141 case GRPC_STATUS_OK:
142 return "OK";
143 case GRPC_STATUS_CANCELLED:
144 return "CANCELLED";
145 case GRPC_STATUS_UNKNOWN:
146 return "UNKNOWN";
147 case GRPC_STATUS_INVALID_ARGUMENT:
148 return "INVALID_ARGUMENT";
149 case GRPC_STATUS_DEADLINE_EXCEEDED:
150 return "DEADLINE_EXCEEDED";
151 case GRPC_STATUS_NOT_FOUND:
152 return "NOT_FOUND";
153 case GRPC_STATUS_ALREADY_EXISTS:
154 return "ALREADY_EXISTS";
155 case GRPC_STATUS_PERMISSION_DENIED:
156 return "PERMISSION_DENIED";
157 case GRPC_STATUS_UNAUTHENTICATED:
158 return "UNAUTHENTICATED";
159 case GRPC_STATUS_RESOURCE_EXHAUSTED:
160 return "RESOURCE_EXHAUSTED";
161 case GRPC_STATUS_FAILED_PRECONDITION:
162 return "FAILED_PRECONDITION";
163 case GRPC_STATUS_ABORTED:
164 return "ABORTED";
165 case GRPC_STATUS_OUT_OF_RANGE:
166 return "OUT_OF_RANGE";
167 case GRPC_STATUS_UNIMPLEMENTED:
168 return "UNIMPLEMENTED";
169 case GRPC_STATUS_INTERNAL:
170 return "INTERNAL";
171 case GRPC_STATUS_UNAVAILABLE:
172 return "UNAVAILABLE";
173 case GRPC_STATUS_DATA_LOSS:
174 return "DATA_LOSS";
175 default:
176 // gRPC wants users of this enum to include a default branch so that
177 // adding values is not a breaking change.
178 return "UNKNOWN_STATUS";
179 }
180 }
181
182 } // namespace grpc_observability
183