1# Copyright 2023 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14from __future__ import annotations 15 16import logging 17import time 18from typing import Any 19 20import grpc 21 22# pytype: disable=pyi-error 23from grpc_observability import _cyobservability 24from grpc_observability import _observability_config 25 26_LOGGER = logging.getLogger(__name__) 27 28ClientCallTracerCapsule = Any # it appears only once in the function signature 29ServerCallTracerFactoryCapsule = ( 30 Any # it appears only once in the function signature 31) 32grpc_observability = Any # grpc_observability.py imports this module. 
# String name for each grpc.StatusCode; used by record_rpc_latency to turn
# the status enum into the string handed to _cyobservability.
GRPC_STATUS_CODE_TO_STRING = {
    grpc.StatusCode.OK: "OK",
    grpc.StatusCode.CANCELLED: "CANCELLED",
    grpc.StatusCode.UNKNOWN: "UNKNOWN",
    grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
    grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
    grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
    grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
    grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
    grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
    grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
    grpc.StatusCode.ABORTED: "ABORTED",
    grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
    grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
    grpc.StatusCode.INTERNAL: "INTERNAL",
    grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
    grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}


# pylint: disable=no-self-use
class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
    """GCP OpenCensus based plugin implementation.

    An exporter must be supplied by the caller; constructing the plugin
    without one raises ValueError.

    For more details, please refer to User Guide:
      * https://cloud.google.com/stackdriver/docs/solutions/grpc

    Attributes:
      config: Configuration for GCP OpenCensus Observability.
      exporter: Exporter used to export data.
    """

    config: _observability_config.GcpObservabilityConfig
    exporter: "grpc_observability.Exporter"

    def __init__(self, exporter: "grpc_observability.Exporter" = None):
        """Reads and activates the observability config, then stores the exporter.

        Args:
          exporter: Exporter used to export data. Required, despite the
            None default.

        Raises:
          ValueError: If reading or activating the configuration fails, or
            if no (truthy) exporter is provided.
        """
        self.exporter = None
        self.config = None
        try:
            self.config = _observability_config.read_config()
            _cyobservability.activate_config(self.config)
        except Exception as e:  # pylint: disable=broad-except
            # Chain explicitly so the original failure is kept as __cause__.
            raise ValueError(
                f"Reading configuration failed with: {e}"
            ) from e

        if exporter:
            self.exporter = exporter
        else:
            raise ValueError("Please provide an exporter!")

        if self.config.tracing_enabled:
            self.set_tracing(True)
        if self.config.stats_enabled:
            self.set_stats(True)

    def __enter__(self):
        """Initializes the Cython observability layer and registers this plugin.

        A failure in cyobservability_init is logged and otherwise ignored;
        grpc._observability.observability_init(self) is called regardless.

        Returns:
          This plugin instance.
        """
        try:
            _cyobservability.cyobservability_init(self.exporter)
        # TODO(xuanwn): Use specific exceptions
        except Exception as e:  # pylint: disable=broad-except
            _LOGGER.exception("GCPOpenCensusObservability failed with: %s", e)

        grpc._observability.observability_init(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Shuts the plugin down by delegating to exit()."""
        self.exit()

    def exit(self) -> None:
        """Disables tracing and stats, then de-initializes observability.

        Blocks for _cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS
        seconds before shutting down.
        """
        # Sleep so we don't lose any data. If we shutdown export thread
        # immediately after exit, it's possible that core didn't call RecordEnd
        # in callTracer, and all data recorded by calling RecordEnd will be
        # lost.
        # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in
        # AwaitNextBatchLocked.
        # TODO(xuanwn): explicit synchronization
        # https://github.com/grpc/grpc/issues/33262
        time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
        self.set_tracing(False)
        self.set_stats(False)
        _cyobservability.observability_deinit()
        grpc._observability.observability_deinit()

    def create_client_call_tracer(
        self, method_name: bytes, target: bytes
    ) -> ClientCallTracerCapsule:
        """Creates a client call tracer capsule via _cyobservability.

        Args:
          method_name: Method name, forwarded unchanged.
          target: Target, forwarded unchanged.

        Returns:
          The capsule returned by _cyobservability.create_client_call_tracer.
        """
        # The trace id is currently a fixed placeholder value.
        trace_id = b"TRACE_ID"
        capsule = _cyobservability.create_client_call_tracer(
            method_name, target, trace_id
        )
        return capsule

    def create_server_call_tracer_factory(
        self,
    ) -> ServerCallTracerFactoryCapsule:
        """Returns the server call tracer factory capsule from _cyobservability."""
        capsule = _cyobservability.create_server_call_tracer_factory_capsule()
        return capsule

    def delete_client_call_tracer(
        self, client_call_tracer: ClientCallTracerCapsule
    ) -> None:
        """Hands the given client call tracer capsule to _cyobservability for deletion."""
        _cyobservability.delete_client_call_tracer(client_call_tracer)

    def save_trace_context(
        self, trace_id: str, span_id: str, is_sampled: bool
    ) -> None:
        """No-op in this plugin; the arguments are ignored."""
        pass

    def record_rpc_latency(
        self,
        method: str,
        target: str,
        rpc_latency: float,
        status_code: grpc.StatusCode,
    ) -> None:
        """Records the latency of an RPC through _cyobservability.

        Args:
          method: Method of the RPC, forwarded unchanged.
          target: Target of the RPC, forwarded unchanged.
          rpc_latency: Latency value, forwarded unchanged.
          status_code: Status of the RPC; converted to its string name,
            falling back to "UNKNOWN" for codes missing from
            GRPC_STATUS_CODE_TO_STRING.
        """
        # Use a separate local so the grpc.StatusCode parameter is not
        # rebound to a str.
        status_name = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
        _cyobservability._record_rpc_latency(
            self.exporter, method, target, rpc_latency, status_name
        )