# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cimport cpython
from cython.operator cimport dereference

import enum
import functools
import logging
import os
from threading import Thread
from typing import AnyStr, Dict, List, Mapping, Tuple, Union

from grpc_observability import _observability

# Time we wait for batch exporting census data
# TODO(xuanwn): change interval to a more appropriate number
CENSUS_EXPORT_BATCH_INTERVAL_SECS = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_BATCH_INTERVAL_SECS', 0.5))
# Maximum time (seconds) to wait for the export thread to join on shutdown.
GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT', 10))
# Names attached to the PyCapsules created in this module.
cdef const char* CLIENT_CALL_TRACER = "client_call_tracer"
cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory"
# Set to True to ask the export thread to leave its wait loop and exit.
cdef bint GLOBAL_SHUTDOWN_EXPORT_THREAD = False
cdef object GLOBAL_EXPORT_THREAD

# Separator used to split an identifier string into individual plugin identifiers.
PLUGIN_IDENTIFIER_SEP = ","

_LOGGER = logging.getLogger(__name__)


class _CyMetricsName:
    """Python-visible aliases for the native measure-name constants.

    NOTE: some attribute names contain typos (e.g. ``SNET_MESSSAGES``); they are
    kept as-is because they are referenced by name elsewhere.
    """
    CY_CLIENT_API_LATENCY = kRpcClientApiLatencyMeasureName
    CY_CLIENT_SNET_MESSSAGES_PER_RPC = kRpcClientSentMessagesPerRpcMeasureName
    CY_CLIENT_SEND_BYTES_PER_RPC = kRpcClientSentBytesPerRpcMeasureName
    CY_CLIENT_RECEIVED_MESSAGES_PER_RPC = kRpcClientReceivedMessagesPerRpcMeasureName
    CY_CLIENT_RECEIVED_BYTES_PER_RPC = kRpcClientReceivedBytesPerRpcMeasureName
    CY_CLIENT_ROUNDTRIP_LATENCY = kRpcClientRoundtripLatencyMeasureName
    CY_CLIENT_COMPLETED_RPC = kRpcClientCompletedRpcMeasureName
    CY_CLIENT_SERVER_LATENCY = kRpcClientServerLatencyMeasureName
    CY_CLIENT_STARTED_RPCS = kRpcClientStartedRpcsMeasureName
    CY_CLIENT_RETRIES_PER_CALL = kRpcClientRetriesPerCallMeasureName
    CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL = kRpcClientTransparentRetriesPerCallMeasureName
    CY_CLIENT_RETRY_DELAY_PER_CALL = kRpcClientRetryDelayPerCallMeasureName
    CY_CLIENT_TRANSPORT_LATENCY = kRpcClientTransportLatencyMeasureName
    CY_SERVER_SENT_MESSAGES_PER_RPC = kRpcServerSentMessagesPerRpcMeasureName
    CY_SERVER_SENT_BYTES_PER_RPC = kRpcServerSentBytesPerRpcMeasureName
    CY_SERVER_RECEIVED_MESSAGES_PER_RPC = kRpcServerReceivedMessagesPerRpcMeasureName
    CY_SERVER_RECEIVED_BYTES_PER_RPC = kRpcServerReceivedBytesPerRpcMeasureName
    CY_SERVER_SERVER_LATENCY = kRpcServerServerLatencyMeasureName
    CY_SERVER_COMPLETED_RPC = kRpcServerCompletedRpcMeasureName
    CY_SERVER_STARTED_RPCS = kRpcServerStartedRpcsMeasureName


@enum.unique
class MetricsName(enum.Enum):
    """Enum of metric names; each value is the matching ``_CyMetricsName`` constant."""
    CLIENT_STARTED_RPCS = _CyMetricsName.CY_CLIENT_STARTED_RPCS
    CLIENT_API_LATENCY = _CyMetricsName.CY_CLIENT_API_LATENCY
    CLIENT_SNET_MESSSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_SNET_MESSSAGES_PER_RPC
    CLIENT_SEND_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_SEND_BYTES_PER_RPC
    CLIENT_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_MESSAGES_PER_RPC
    CLIENT_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_BYTES_PER_RPC
    CLIENT_ROUNDTRIP_LATENCY = _CyMetricsName.CY_CLIENT_ROUNDTRIP_LATENCY
    CLIENT_COMPLETED_RPC = _CyMetricsName.CY_CLIENT_COMPLETED_RPC
    CLIENT_SERVER_LATENCY = _CyMetricsName.CY_CLIENT_SERVER_LATENCY
    CLIENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_RETRIES_PER_CALL
    CLIENT_TRANSPARENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL
    CLIENT_RETRY_DELAY_PER_CALL = _CyMetricsName.CY_CLIENT_RETRY_DELAY_PER_CALL
    SERVER_SENT_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_MESSAGES_PER_RPC
    SERVER_SENT_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_BYTES_PER_RPC
    SERVER_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_MESSAGES_PER_RPC
    SERVER_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_BYTES_PER_RPC
    SERVER_SERVER_LATENCY = _CyMetricsName.CY_SERVER_SERVER_LATENCY
    SERVER_COMPLETED_RPC = _CyMetricsName.CY_SERVER_COMPLETED_RPC
    SERVER_STARTED_RPCS = _CyMetricsName.CY_SERVER_STARTED_RPCS


# Delay map creation due to circular dependencies
_CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING = {x.value: x for x in MetricsName}


def cyobservability_init(object exporter) -> None:
    """Run native observability initialization and start the export thread.

    Args:
      exporter: The exporter handed to the export thread.
    """
    exporter: _observability.Exporter

    NativeObservabilityInit()
    _start_exporting_thread(exporter)


def _start_exporting_thread(object exporter) -> None:
    """Clear the shutdown flag and start a thread running _export_census_data."""
    exporter: _observability.Exporter

    global GLOBAL_EXPORT_THREAD
    global GLOBAL_SHUTDOWN_EXPORT_THREAD
    GLOBAL_SHUTDOWN_EXPORT_THREAD = False
    # TODO(xuanwn): Change it to daemon thread.
    GLOBAL_EXPORT_THREAD = Thread(target=_export_census_data, args=(exporter,))
    GLOBAL_EXPORT_THREAD.start()


def activate_config(object py_config) -> None:
    """Enable census tracing and/or stats according to py_config.

    Args:
      py_config: Object exposing `tracing_enabled`, `sampling_rate` and
        `stats_enabled` attributes.
    """
    py_config: "_observability_config.GcpObservabilityConfig"

    if py_config.tracing_enabled:
        EnablePythonCensusTracing(True)
        # Save sampling rate to global sampler.
        ProbabilitySampler.Get().SetThreshold(py_config.sampling_rate)

    if py_config.stats_enabled:
        EnablePythonCensusStats(True)


def activate_stats() -> None:
    """Enable census stats."""
    EnablePythonCensusStats(True)


def create_client_call_tracer(bytes method_name, bytes target, bytes trace_id, str identifier,
                              dict exchange_labels, object enabled_optional_labels,
                              bint registered_method, bytes parent_span_id=b'') -> cpython.PyObject:
    """Create a ClientCallTracer and save to PyCapsule.

    Returns: A grpc_observability._observability.ClientCallTracerCapsule object.
    """
    cdef char* c_method = cpython.PyBytes_AsString(method_name)
    cdef char* c_target = cpython.PyBytes_AsString(target)
    cdef char* c_trace_id = cpython.PyBytes_AsString(trace_id)
    cdef char* c_parent_span_id = cpython.PyBytes_AsString(parent_span_id)
    # Keep the encoded bytes in a local so the char* below stays valid.
    identifier_bytes = _encode(identifier)
    cdef char* c_identifier = cpython.PyBytes_AsString(identifier_bytes)
    cdef vector[Label] c_labels = _labels_to_c_labels(exchange_labels)
    cdef bint add_csm_optional_labels = False

    for label_type in enabled_optional_labels:
        if label_type == _observability.OptionalLabelType.XDS_SERVICE_LABELS:
            add_csm_optional_labels = True

    cdef void* call_tracer = CreateClientCallTracer(c_method, c_target, c_trace_id, c_parent_span_id,
                                                    c_identifier, c_labels, add_csm_optional_labels,
                                                    registered_method)
    capsule = cpython.PyCapsule_New(call_tracer, CLIENT_CALL_TRACER, NULL)
    return capsule


def create_server_call_tracer_factory_capsule(dict exchange_labels, str identifier) -> cpython.PyObject:
    """Create a ServerCallTracerFactory and save to PyCapsule.

    Returns: A grpc_observability._observability.ServerCallTracerFactoryCapsule object.
    """
    cdef vector[Label] c_labels = _labels_to_c_labels(exchange_labels)
    # Bind the encoded identifier to a local first: taking a char* directly from
    # the temporary returned by _encode() would leave the pointer dangling once
    # the temporary is released at the end of that statement.
    identifier_bytes = _encode(identifier)
    cdef char* c_identifier = cpython.PyBytes_AsString(identifier_bytes)
    cdef void* call_tracer_factory = CreateServerCallTracerFactory(c_labels, c_identifier)
    capsule = cpython.PyCapsule_New(call_tracer_factory, SERVER_CALL_TRACER_FACTORY, NULL)
    return capsule


def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]:
    """Convert native labels to a dict; keys are decoded, values are stored as-is."""
    py_labels = {}
    for label in c_labels:
        py_labels[_decode(label.key)] = label.value
    return py_labels


def _labels_to_c_labels(dict py_labels) -> vector[Label]:
    """Convert a Python dict of labels to a vector of native Label (keys/values encoded)."""
    cdef vector[Label] c_labels
    cdef Label label

    for key, value in py_labels.items():
        label.key = _encode(key)
        label.value = _encode(value)
        c_labels.push_back(label)

    return c_labels


def _c_measurement_to_measurement(object measurement
    ) -> Mapping[str, Union[enum, Mapping[str, Union[float, int], bool]]]:
    """Convert Cython Measurement to Python measurement.

    Args:
      measurement: Actual measurement represented by Cython type Measurement, using object here
        since Cython refuses to automatically convert a union with unsafe type combinations.

    Returns:
      A mapping object with keys and values as following:
        name -> cMetricsName
        type -> MeasurementType
        registered_method -> bool
        include_exchange_labels -> bool
        value -> {value_double: float | value_int: int}
    """
    measurement: Measurement

    py_measurement = {}
    py_measurement['name'] = measurement['name']
    py_measurement['type'] = measurement['type']
    py_measurement['registered_method'] = measurement['registered_method']
    py_measurement['include_exchange_labels'] = measurement['include_exchange_labels']
    if measurement['type'] == kMeasurementDouble:
        py_measurement['value'] = {'value_double': measurement['value']['value_double']}
    else:
        py_measurement['value'] = {'value_int': measurement['value']['value_int']}
    return py_measurement


def _c_annotation_to_annotations(vector[Annotation] c_annotations) -> List[Tuple[str, str]]:
    """Convert native annotations to a list of (time_stamp, description) string tuples."""
    py_annotations = []
    for annotation in c_annotations:
        py_annotations.append((_decode(annotation.time_stamp),
                               _decode(annotation.description)))
    return py_annotations


def observability_deinit() -> None:
    """Stop the export thread, then disable census stats and tracing."""
    _shutdown_exporting_thread()
    EnablePythonCensusStats(False)
    EnablePythonCensusTracing(False)


@functools.lru_cache(maxsize=None)
def _cy_metric_name_to_py_metric_name(cMetricsName metric_name) -> MetricsName:
    """Map a native metric name to its MetricsName member.

    Raises:
      ValueError: If metric_name has no entry in the mapping.
    """
    try:
        return _CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING[metric_name]
    except KeyError:
        raise ValueError('Invalid metric name %s' % metric_name)


def _get_stats_data(object measurement, object labels, object identifier) -> _observability.StatsData:
    """Convert a Python measurement to StatsData.

    Args:
      measurement: A dict of type Mapping[str, Union[enum, Mapping[str, Union[float, int]]]]
        with keys and values as following:
          name -> cMetricsName
          type -> MeasurementType
          registered_method -> bool
          include_exchange_labels -> bool
          value -> {value_double: float | value_int: int}
      labels: Labels associated with stats data with type of Mapping[str, AnyStr].
      identifier: Specifies the plugins associated with this stats data.
    """
    measurement: Measurement
    labels: Mapping[str, AnyStr]

    metric_name = _cy_metric_name_to_py_metric_name(measurement['name'])
    identifiers = set(identifier.split(PLUGIN_IDENTIFIER_SEP))
    if measurement['type'] == kMeasurementDouble:
        py_stat = _observability.StatsData(name=metric_name, measure_double=True,
                                           value_float=measurement['value']['value_double'],
                                           labels=labels,
                                           identifiers=identifiers,
                                           registered_method=measurement['registered_method'],
                                           include_exchange_labels=measurement['include_exchange_labels'],)
    else:
        py_stat = _observability.StatsData(name=metric_name, measure_double=False,
                                           value_int=measurement['value']['value_int'],
                                           labels=labels,
                                           identifiers=identifiers,
                                           registered_method=measurement['registered_method'],
                                           include_exchange_labels=measurement['include_exchange_labels'],)
    return py_stat


def _get_tracing_data(SpanCensusData span_data, vector[Label] span_labels,
                      vector[Annotation] span_annotations) -> _observability.TracingData:
    """Build a TracingData from native span data, labels and annotations."""
    py_span_labels = _c_label_to_labels(span_labels)
    py_span_annotations = _c_annotation_to_annotations(span_annotations)
    return _observability.TracingData(name=_decode(span_data.name),
                                      start_time=_decode(span_data.start_time),
                                      end_time=_decode(span_data.end_time),
                                      trace_id=_decode(span_data.trace_id),
                                      span_id=_decode(span_data.span_id),
                                      parent_span_id=_decode(span_data.parent_span_id),
                                      status=_decode(span_data.status),
                                      should_sample=span_data.should_sample,
                                      child_span_count=span_data.child_span_count,
                                      span_labels=py_span_labels,
                                      span_annotations=py_span_annotations)


def _record_rpc_latency(object exporter, str method, str target, float rpc_latency,
                        str status_code, str identifier, bint registered_method) -> None:
    """Build a client API latency metric and export it immediately via exporter."""
    exporter: _observability.Exporter

    measurement = {}
    measurement['name'] = kRpcClientApiLatencyMeasureName
    measurement['type'] = kMeasurementDouble
    measurement['value'] = {'value_double': rpc_latency}
    measurement['registered_method'] = registered_method
    measurement['include_exchange_labels'] = False

    labels = {}
    labels[_decode(kClientMethod)] = method.strip("/")
    labels[_decode(kClientTarget)] = target
    labels[_decode(kClientStatus)] = status_code
    metric = _get_stats_data(measurement, labels, identifier)
    exporter.export_stats_data([metric])


cdef void _export_census_data(object exporter):
    """Main function running in export thread."""
    exporter: _observability.Exporter

    cdef int export_interval_ms = CENSUS_EXPORT_BATCH_INTERVAL_SECS * 1000
    while True:
        with nogil:
            while not GLOBAL_SHUTDOWN_EXPORT_THREAD:
                lk = new unique_lock[mutex](g_census_data_buffer_mutex)
                # Wait for next batch of census data OR timeout at fixed interval.
                # Batch export census data to minimize the time we acquire the GIL.
                AwaitNextBatchLocked(dereference(lk), export_interval_ms)

                # Break only when buffer has data
                if not g_census_data_buffer.empty():
                    del lk
                    break
                else:
                    del lk

        _flush_census_data(exporter)

        if GLOBAL_SHUTDOWN_EXPORT_THREAD:
            break  # Break to shutdown exporting thread

    # Flush one last time before shutdown thread
    _flush_census_data(exporter)


cdef void _flush_census_data(object exporter):
    """Drain the census data buffer and hand the converted batches to exporter.

    The buffer mutex is held while draining and released before the exporter
    is called.
    """
    exporter: _observability.Exporter

    lk = new unique_lock[mutex](g_census_data_buffer_mutex)
    py_metrics_batch = []
    py_spans_batch = []
    try:
        if g_census_data_buffer.empty():
            return
        while not g_census_data_buffer.empty():
            c_census_data = g_census_data_buffer.front()
            if c_census_data.type == kMetricData:
                py_labels = _c_label_to_labels(c_census_data.labels)
                py_identifier = _decode(c_census_data.identifier)
                py_measurement = _c_measurement_to_measurement(c_census_data.measurement_data)
                py_metric = _get_stats_data(py_measurement, py_labels, py_identifier)
                py_metrics_batch.append(py_metric)
            else:
                py_span = _get_tracing_data(c_census_data.span_data, c_census_data.span_data.span_labels,
                                            c_census_data.span_data.span_annotations)
                py_spans_batch.append(py_span)
            g_census_data_buffer.pop()
    finally:
        # Always destroy the lock object, even when a conversion above raises;
        # otherwise the heap-allocated unique_lock is leaked and never deleted.
        # NOTE(review): a record whose conversion raises is not popped and stays
        # at the front of the buffer - confirm whether it should be dropped.
        del lk

    exporter.export_stats_data(py_metrics_batch)
    exporter.export_tracing_data(py_spans_batch)


cdef void _shutdown_exporting_thread():
    """Set the shutdown flag, wake waiters on the buffer condition variable and join the thread."""
    with nogil:
        global GLOBAL_SHUTDOWN_EXPORT_THREAD
        GLOBAL_SHUTDOWN_EXPORT_THREAD = True
        g_census_data_buffer_cv.notify_all()
    GLOBAL_EXPORT_THREAD.join(timeout=GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT)


cdef str _decode(bytes bytestring):
    """Decode bytes as UTF-8, falling back to latin1 (and logging) on decode errors."""
    if isinstance(bytestring, (str,)):
        return <str>bytestring
    else:
        try:
            return bytestring.decode('utf8')
        except UnicodeDecodeError:
            _LOGGER.exception('Invalid encoding on %s', bytestring)
            return bytestring.decode('latin1')


cdef bytes _encode(object string_or_none):
    """Encode a str as UTF-8 bytes; None becomes b'' and bytes pass through.

    Raises:
      TypeError: If the argument is not None, bytes or str.
    """
    if string_or_none is None:
        return b''
    elif isinstance(string_or_none, (bytes,)):
        return <bytes>string_or_none
    elif isinstance(string_or_none, (unicode,)):
        return string_or_none.encode('utf8')
    else:
        raise TypeError('Expected str, not {}'.format(type(string_or_none)))