• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15cimport cpython
16from cython.operator cimport dereference
17
18import enum
19import functools
20import logging
21import os
22from threading import Thread
23from typing import AnyStr, Dict, List, Mapping, Tuple, Union
24
25from grpc_observability import _observability
26
# Time we wait for batch exporting census data
# TODO(xuanwn): change interval to a more appropriate number
# Overridable through the GRPC_PYTHON_CENSUS_EXPORT_BATCH_INTERVAL_SECS
# environment variable; defaults to 0.5 and is multiplied by 1000 to get the
# millisecond wait used by the export thread.
CENSUS_EXPORT_BATCH_INTERVAL_SECS = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_BATCH_INTERVAL_SECS', 0.5))
# Timeout passed to Thread.join() when shutting down the export thread.
# Overridable through the environment variable of the same name; defaults to 10.
GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT', 10))
# Names given to the PyCapsules created by this module.
cdef const char* CLIENT_CALL_TRACER = "client_call_tracer"
cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory"
# Set to True by _shutdown_exporting_thread() to make the export loop exit.
cdef bint GLOBAL_SHUTDOWN_EXPORT_THREAD = False
# The threading.Thread created by _start_exporting_thread().
cdef object GLOBAL_EXPORT_THREAD

# Separator used to split an identifier string into individual plugin identifiers.
PLUGIN_IDENTIFIER_SEP = ","

_LOGGER = logging.getLogger(__name__)
39
40
class _CyMetricsName:
  """Python-visible aliases for the kRpc*MeasureName constants.

  The constants themselves are not defined in this file (presumably they are
  cMetricsName values declared in the accompanying .pxd -- confirm there).
  The values stored here back the members of the MetricsName enum.

  NOTE: CY_CLIENT_SNET_MESSSAGES_PER_RPC is a misspelling of
  "SENT_MESSAGES"; the name is referenced by MetricsName and is kept as is.
  NOTE: CY_CLIENT_TRANSPORT_LATENCY has no corresponding MetricsName member.
  """
  CY_CLIENT_API_LATENCY = kRpcClientApiLatencyMeasureName
  CY_CLIENT_SNET_MESSSAGES_PER_RPC = kRpcClientSentMessagesPerRpcMeasureName
  CY_CLIENT_SEND_BYTES_PER_RPC = kRpcClientSentBytesPerRpcMeasureName
  CY_CLIENT_RECEIVED_MESSAGES_PER_RPC = kRpcClientReceivedMessagesPerRpcMeasureName
  CY_CLIENT_RECEIVED_BYTES_PER_RPC = kRpcClientReceivedBytesPerRpcMeasureName
  CY_CLIENT_ROUNDTRIP_LATENCY = kRpcClientRoundtripLatencyMeasureName
  CY_CLIENT_COMPLETED_RPC = kRpcClientCompletedRpcMeasureName
  CY_CLIENT_SERVER_LATENCY = kRpcClientServerLatencyMeasureName
  CY_CLIENT_STARTED_RPCS = kRpcClientStartedRpcsMeasureName
  CY_CLIENT_RETRIES_PER_CALL = kRpcClientRetriesPerCallMeasureName
  CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL = kRpcClientTransparentRetriesPerCallMeasureName
  CY_CLIENT_RETRY_DELAY_PER_CALL = kRpcClientRetryDelayPerCallMeasureName
  CY_CLIENT_TRANSPORT_LATENCY = kRpcClientTransportLatencyMeasureName
  CY_SERVER_SENT_MESSAGES_PER_RPC = kRpcServerSentMessagesPerRpcMeasureName
  CY_SERVER_SENT_BYTES_PER_RPC = kRpcServerSentBytesPerRpcMeasureName
  CY_SERVER_RECEIVED_MESSAGES_PER_RPC = kRpcServerReceivedMessagesPerRpcMeasureName
  CY_SERVER_RECEIVED_BYTES_PER_RPC = kRpcServerReceivedBytesPerRpcMeasureName
  CY_SERVER_SERVER_LATENCY = kRpcServerServerLatencyMeasureName
  CY_SERVER_COMPLETED_RPC = kRpcServerCompletedRpcMeasureName
  CY_SERVER_STARTED_RPCS = kRpcServerStartedRpcsMeasureName
62
@enum.unique
class MetricsName(enum.Enum):
  """Enum of the metrics that stats data can be reported under.

  Each member's value is the matching _CyMetricsName attribute, and
  @enum.unique guarantees that no two members share a value.

  NOTE: CLIENT_SNET_MESSSAGES_PER_RPC is a misspelling of "SENT_MESSAGES";
  the member name is part of this enum's interface and is kept as is.
  """
  CLIENT_STARTED_RPCS = _CyMetricsName.CY_CLIENT_STARTED_RPCS
  CLIENT_API_LATENCY = _CyMetricsName.CY_CLIENT_API_LATENCY
  CLIENT_SNET_MESSSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_SNET_MESSSAGES_PER_RPC
  CLIENT_SEND_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_SEND_BYTES_PER_RPC
  CLIENT_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_MESSAGES_PER_RPC
  CLIENT_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_BYTES_PER_RPC
  CLIENT_ROUNDTRIP_LATENCY = _CyMetricsName.CY_CLIENT_ROUNDTRIP_LATENCY
  CLIENT_COMPLETED_RPC = _CyMetricsName.CY_CLIENT_COMPLETED_RPC
  CLIENT_SERVER_LATENCY = _CyMetricsName.CY_CLIENT_SERVER_LATENCY
  CLIENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_RETRIES_PER_CALL
  CLIENT_TRANSPARENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL
  CLIENT_RETRY_DELAY_PER_CALL = _CyMetricsName.CY_CLIENT_RETRY_DELAY_PER_CALL
  SERVER_SENT_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_MESSAGES_PER_RPC
  SERVER_SENT_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_BYTES_PER_RPC
  SERVER_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_MESSAGES_PER_RPC
  SERVER_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_BYTES_PER_RPC
  SERVER_SERVER_LATENCY = _CyMetricsName.CY_SERVER_SERVER_LATENCY
  SERVER_COMPLETED_RPC = _CyMetricsName.CY_SERVER_COMPLETED_RPC
  SERVER_STARTED_RPCS = _CyMetricsName.CY_SERVER_STARTED_RPCS
84
# Delay map creation due to circular dependencies
# Reverse lookup table: maps each MetricsName member's value back to the
# member itself.
_CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING = {x.value: x for x in MetricsName}
87
def cyobservability_init(object exporter) -> None:
  """Initialize native observability and start the export thread.

  Args:
    exporter: The exporter handed to the export thread; annotated below as
      _observability.Exporter.
  """
  exporter: _observability.Exporter

  NativeObservabilityInit()
  _start_exporting_thread(exporter)
93
94
def _start_exporting_thread(object exporter) -> None:
  """Create and start the thread that runs _export_census_data.

  The thread object is stored in GLOBAL_EXPORT_THREAD and the shutdown flag
  is reset to False before the thread starts.

  Args:
    exporter: Passed as the only argument to _export_census_data.
  """
  exporter: _observability.Exporter

  global GLOBAL_EXPORT_THREAD
  global GLOBAL_SHUTDOWN_EXPORT_THREAD
  # Clear any shutdown request left over from a previous shutdown.
  GLOBAL_SHUTDOWN_EXPORT_THREAD = False
  # TODO(xuanwn): Change it to daemon thread.
  GLOBAL_EXPORT_THREAD = Thread(target=_export_census_data, args=(exporter,))
  GLOBAL_EXPORT_THREAD.start()
104
def activate_config(object py_config) -> None:
  """Turn on census tracing and/or stats as requested by py_config.

  Args:
    py_config: Configuration object; its tracing_enabled, sampling_rate and
      stats_enabled attributes are read.
  """
  py_config: "_observability_config.GcpObservabilityConfig"

  if py_config.tracing_enabled:
    EnablePythonCensusTracing(True)
    # Save sampling rate to global sampler.
    ProbabilitySampler.Get().SetThreshold(py_config.sampling_rate)

  if py_config.stats_enabled:
    EnablePythonCensusStats(True)
115
def activate_stats() -> None:
  """Turn on census stats collection."""
  EnablePythonCensusStats(True)
118
def create_client_call_tracer(bytes method_name, bytes target, bytes trace_id, str identifier,
                              dict exchange_labels, object enabled_optional_labels,
                              bint registered_method, bytes parent_span_id=b'') -> cpython.PyObject:
  """Create a ClientCallTracer and save to PyCapsule.

  Args:
    method_name: Method name, passed to the native tracer as a C string.
    target: Target, passed to the native tracer as a C string.
    trace_id: Trace id, passed to the native tracer as a C string.
    identifier: Identifier string; encoded with _encode before being passed on.
    exchange_labels: Dict of labels, converted with _labels_to_c_labels.
    enabled_optional_labels: Iterable of optional label types; CSM optional
      labels are requested when it contains
      _observability.OptionalLabelType.XDS_SERVICE_LABELS.
    registered_method: Forwarded unchanged to the native tracer.
    parent_span_id: Parent span id, passed as a C string; defaults to b''.

  Returns: A grpc_observability._observability.ClientCallTracerCapsule object.
  """
  # PyBytes_AsString returns a pointer into the bytes object's own buffer, so
  # each char* below is only valid while the corresponding bytes object lives.
  cdef char* c_method = cpython.PyBytes_AsString(method_name)
  cdef char* c_target = cpython.PyBytes_AsString(target)
  cdef char* c_trace_id = cpython.PyBytes_AsString(trace_id)
  cdef char* c_parent_span_id = cpython.PyBytes_AsString(parent_span_id)
  # Keep the encoded identifier in a local so c_identifier stays valid.
  identifier_bytes = _encode(identifier)
  cdef char* c_identifier = cpython.PyBytes_AsString(identifier_bytes)
  cdef vector[Label] c_labels = _labels_to_c_labels(exchange_labels)
  cdef bint add_csm_optional_labels = False

  for label_type in enabled_optional_labels:
    if label_type == _observability.OptionalLabelType.XDS_SERVICE_LABELS:
      add_csm_optional_labels = True

  cdef void* call_tracer = CreateClientCallTracer(c_method, c_target, c_trace_id, c_parent_span_id,
                                                  c_identifier, c_labels, add_csm_optional_labels,
                                                  registered_method)
  # No destructor is registered (NULL), so the capsule does not free the tracer.
  capsule = cpython.PyCapsule_New(call_tracer, CLIENT_CALL_TRACER, NULL)
  return capsule
144
145
def create_server_call_tracer_factory_capsule(dict exchange_labels, str identifier) -> cpython.PyObject:
  """Create a ServerCallTracerFactory and save to PyCapsule.

  Args:
    exchange_labels: Dict of labels, converted with _labels_to_c_labels.
    identifier: Identifier string; encoded with _encode before being passed
      to the native factory.

  Returns: A grpc_observability._observability.ServerCallTracerFactoryCapsule object.
  """
  cdef vector[Label] c_labels = _labels_to_c_labels(exchange_labels)
  # PyBytes_AsString returns a pointer into the bytes object's own buffer.
  # Hold the encoded bytes in a local so that buffer stays alive while
  # CreateServerCallTracerFactory reads it; taking the pointer from a
  # temporary would leave c_identifier dangling once the temporary is released.
  identifier_bytes = _encode(identifier)
  cdef char* c_identifier = cpython.PyBytes_AsString(identifier_bytes)
  cdef void* call_tracer_factory = CreateServerCallTracerFactory(c_labels, c_identifier)
  # No destructor is registered (NULL), so the capsule does not free the factory.
  capsule = cpython.PyCapsule_New(call_tracer_factory, SERVER_CALL_TRACER_FACTORY, NULL)
  return capsule
156
157
def _c_label_to_labels(vector[Label] c_labels) -> Dict[str, AnyStr]:
  """Convert a vector of C labels into a Python dict.

  Keys are decoded with _decode; values are stored as they are.
  """
  converted = {}
  for c_label in c_labels:
    label_name = _decode(c_label.key)
    converted[label_name] = c_label.value
  return converted
163
164
def _labels_to_c_labels(dict py_labels) -> vector[Label]:
  """Convert a Python dict of labels into a vector of C labels.

  Both keys and values are passed through _encode.
  """
  cdef vector[Label] converted
  cdef Label entry

  for label_name, label_value in py_labels.items():
    entry.key = _encode(label_name)
    entry.value = _encode(label_value)
    converted.push_back(entry)

  return converted
175
176
def _c_measurement_to_measurement(object measurement
  ) -> Mapping[str, Union[enum, Mapping[str, Union[float, int], bool]]]:
  """Build a Python measurement mapping from a Cython Measurement.

  Args:
  measurement: The Cython Measurement, received as a plain object because
   Cython refuses to automatically convert a union with unsafe type
   combinations.

  Returns:
   A mapping with these keys and values:
    name -> cMetricsName
    type -> MeasurementType
    registered_method -> bool
    include_exchange_labels -> bool
    value -> {value_double: float | value_int: int}
  """
  measurement: Measurement

  py_measurement = {
      'name': measurement['name'],
      'type': measurement['type'],
      'registered_method': measurement['registered_method'],
      'include_exchange_labels': measurement['include_exchange_labels'],
  }
  # Only the union member that matches the measurement type is copied over.
  if measurement['type'] == kMeasurementDouble:
    value_key = 'value_double'
  else:
    value_key = 'value_int'
  py_measurement['value'] = {value_key: measurement['value'][value_key]}
  return py_measurement
205
206
def _c_annotation_to_annotations(vector[Annotation] c_annotations) -> List[Tuple[str, str]]:
  """Convert C annotations into a list of (time_stamp, description) tuples.

  Both fields are decoded with _decode.
  """
  converted = []
  for c_annotation in c_annotations:
    time_stamp = _decode(c_annotation.time_stamp)
    description = _decode(c_annotation.description)
    converted.append((time_stamp, description))
  return converted
213
214
def observability_deinit() -> None:
  """Shut down the export thread, then disable census stats and tracing."""
  _shutdown_exporting_thread()
  EnablePythonCensusStats(False)
  EnablePythonCensusTracing(False)
219
220
@functools.lru_cache(maxsize=None)
def _cy_metric_name_to_py_metric_name(cMetricsName metric_name) -> MetricsName:
  """Look up the MetricsName member for a Cython metric name.

  Results are memoized by functools.lru_cache.

  Raises:
    ValueError: If metric_name has no entry in the lookup table.
  """
  try:
    py_metric_name = _CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING[metric_name]
  except KeyError:
    raise ValueError('Invalid metric name %s' % metric_name)
  else:
    return py_metric_name
227
228
def _get_stats_data(object measurement, object labels, object identifier) -> _observability.StatsData:
  """Build a StatsData from a Python measurement.

  Args:
  measurement: A dict of type Mapping[str, Union[enum, Mapping[str, Union[float, int]]]]
    with these keys and values:
      name -> cMetricsName
      type -> MeasurementType
      registered_method -> bool
      include_exchange_labels -> bool
      value -> {value_double: float | value_int: int}
  labels: Labels associated with the stats data, of type Mapping[str, AnyStr].
  identifier: Specifies the plugins associated with this stats data; it is
    split on PLUGIN_IDENTIFIER_SEP into a set of identifiers.
  """
  measurement: Measurement
  labels: Mapping[str, AnyStr]

  metric_name = _cy_metric_name_to_py_metric_name(measurement['name'])
  identifiers = set(identifier.split(PLUGIN_IDENTIFIER_SEP))

  # Only the value keyword differs between double and integer measurements.
  if measurement['type'] == kMeasurementDouble:
    measure_double = True
    value_kwargs = {'value_float': measurement['value']['value_double']}
  else:
    measure_double = False
    value_kwargs = {'value_int': measurement['value']['value_int']}

  return _observability.StatsData(
      name=metric_name,
      measure_double=measure_double,
      labels=labels,
      identifiers=identifiers,
      registered_method=measurement['registered_method'],
      include_exchange_labels=measurement['include_exchange_labels'],
      **value_kwargs)
263
264
def _get_tracing_data(SpanCensusData span_data, vector[Label] span_labels,
                      vector[Annotation] span_annotations) -> _observability.TracingData:
  """Build a TracingData from C span data, labels and annotations.

  String fields of span_data are decoded with _decode; should_sample and
  child_span_count are passed through unchanged.
  """
  labels = _c_label_to_labels(span_labels)
  annotations = _c_annotation_to_annotations(span_annotations)
  return _observability.TracingData(
      name=_decode(span_data.name),
      start_time=_decode(span_data.start_time),
      end_time=_decode(span_data.end_time),
      trace_id=_decode(span_data.trace_id),
      span_id=_decode(span_data.span_id),
      parent_span_id=_decode(span_data.parent_span_id),
      status=_decode(span_data.status),
      should_sample=span_data.should_sample,
      child_span_count=span_data.child_span_count,
      span_labels=labels,
      span_annotations=annotations,
  )
280
281
def _record_rpc_latency(object exporter, str method, str target, float rpc_latency,
                        str status_code, str identifier, bint registered_method) -> None:
  """Export one client API latency measurement through exporter.

  Args:
    exporter: Object whose export_stats_data method receives the metric.
    method: Method name; leading and trailing "/" characters are stripped.
    target: Value stored under the client target label.
    rpc_latency: Latency value recorded as a double measurement.
    status_code: Value stored under the client status label.
    identifier: Plugin identifier string forwarded to _get_stats_data.
    registered_method: Stored in the measurement unchanged.
  """
  exporter: _observability.Exporter

  latency_measurement = {
      'name': kRpcClientApiLatencyMeasureName,
      'type': kMeasurementDouble,
      'value': {'value_double': rpc_latency},
      'registered_method': registered_method,
      'include_exchange_labels': False,
  }
  latency_labels = {
      _decode(kClientMethod): method.strip("/"),
      _decode(kClientTarget): target,
      _decode(kClientStatus): status_code,
  }
  exporter.export_stats_data(
      [_get_stats_data(latency_measurement, latency_labels, identifier)])
299
300
cdef void _export_census_data(object exporter):
  """Main function running in export thread.

  Repeatedly waits (without the GIL) until the census data buffer is
  non-empty or shutdown has been requested, then flushes the buffer through
  exporter. Once shutdown has been requested the loop exits and the buffer is
  flushed one final time.

  Args:
    exporter: Passed to _flush_census_data for every flush.
  """
  exporter: _observability.Exporter

  cdef int export_interval_ms = CENSUS_EXPORT_BATCH_INTERVAL_SECS * 1000
  while True:
    with nogil:
      while not GLOBAL_SHUTDOWN_EXPORT_THREAD:
        # The lock is heap-allocated and explicitly deleted on both branches
        # below before the next iteration or the break.
        lk = new unique_lock[mutex](g_census_data_buffer_mutex)
        # Wait for next batch of census data OR timeout at fixed interval.
        # Batch export census data to minimize the time we acquire the GIL.
        AwaitNextBatchLocked(dereference(lk), export_interval_ms)

        # Break only when the buffer has data
        if not g_census_data_buffer.empty():
          del lk
          break
        else:
          del lk

    _flush_census_data(exporter)

    if GLOBAL_SHUTDOWN_EXPORT_THREAD:
      break # Break to shut down the exporting thread

  # Flush one last time before shutting down the thread
  _flush_census_data(exporter)
328
329
cdef void _flush_census_data(object exporter):
  """Drain the census data buffer and hand its contents to exporter.

  Every buffered record is converted to a Python StatsData or TracingData
  object while the buffer mutex is held. The mutex is released before the
  exporter is called. If the buffer is empty the exporter is not called.

  Args:
    exporter: Object whose export_stats_data and export_tracing_data methods
      receive the converted metrics and spans.
  """
  exporter: _observability.Exporter

  py_metrics_batch = []
  py_spans_batch = []
  lk = new unique_lock[mutex](g_census_data_buffer_mutex)
  try:
    if g_census_data_buffer.empty():
      return
    while not g_census_data_buffer.empty():
      c_census_data = g_census_data_buffer.front()
      if c_census_data.type == kMetricData:
        py_labels = _c_label_to_labels(c_census_data.labels)
        py_identifier = _decode(c_census_data.identifier)
        py_measurement = _c_measurement_to_measurement(c_census_data.measurement_data)
        py_metric = _get_stats_data(py_measurement, py_labels, py_identifier)
        py_metrics_batch.append(py_metric)
      else:
        py_span = _get_tracing_data(c_census_data.span_data, c_census_data.span_data.span_labels,
                                    c_census_data.span_data.span_annotations)
        py_spans_batch.append(py_span)
      g_census_data_buffer.pop()
  finally:
    # Always destroy the heap-allocated lock, even when a conversion above
    # raises; otherwise the buffer mutex would stay locked and the lock
    # object would leak.
    del lk

  exporter.export_stats_data(py_metrics_batch)
  exporter.export_tracing_data(py_spans_batch)
356
357
cdef void _shutdown_exporting_thread():
  """Ask the export thread to stop and wait for it to finish.

  Sets the shutdown flag, wakes any waiter on the census buffer condition
  variable, and joins the export thread for at most
  GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT. Does not join when no export
  thread has been started.
  """
  global GLOBAL_SHUTDOWN_EXPORT_THREAD
  with nogil:
    GLOBAL_SHUTDOWN_EXPORT_THREAD = True
    g_census_data_buffer_cv.notify_all()
  # GLOBAL_EXPORT_THREAD is still None if shutdown is requested before any
  # export thread was started; there is nothing to join in that case.
  if GLOBAL_EXPORT_THREAD is not None:
    GLOBAL_EXPORT_THREAD.join(timeout=GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT)
364
365
cdef str _decode(bytes bytestring):
  """Decode bytestring to str, preferring UTF-8.

  When the input is not valid UTF-8 the failure is logged and the input is
  decoded as latin1 instead.
  """
  if isinstance(bytestring, (str,)):
    return <str>bytestring
  try:
    return bytestring.decode('utf8')
  except UnicodeDecodeError:
    _LOGGER.exception('Invalid encoding on %s', bytestring)
    return bytestring.decode('latin1')
375
376
cdef bytes _encode(object string_or_none):
  """Encode a text value to bytes.

  None becomes b'', bytes are returned unchanged and unicode strings are
  encoded as UTF-8.

  Raises:
    TypeError: If the argument is none of the above.
  """
  if string_or_none is None:
    return b''
  if isinstance(string_or_none, (bytes,)):
    return <bytes>string_or_none
  if isinstance(string_or_none, (unicode,)):
    return string_or_none.encode('utf8')
  raise TypeError('Expected str, not {}'.format(type(string_or_none)))
386