• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14from __future__ import annotations
15
16import logging
17import time
18from typing import Any
19
20import grpc
21
22# pytype: disable=pyi-error
23from grpc_observability import _cyobservability
24from grpc_observability import _observability_config
25
26_LOGGER = logging.getLogger(__name__)
27
# Annotation-only placeholders. Each capsule alias appears in just one
# function signature, so no dedicated type is declared for it.
ClientCallTracerCapsule = Any
ServerCallTracerFactoryCapsule = Any
# grpc_observability.py imports this module; the name is aliased here so it
# can still be used inside string annotations (presumably to avoid a circular
# import -- confirm against grpc_observability.py).
grpc_observability = Any
33
# Maps each listed gRPC status code to its string form. Every string is
# identical to the enum member's own name, so it is derived from ``.name``.
GRPC_STATUS_CODE_TO_STRING = {
    code: code.name
    for code in (
        grpc.StatusCode.OK,
        grpc.StatusCode.CANCELLED,
        grpc.StatusCode.UNKNOWN,
        grpc.StatusCode.INVALID_ARGUMENT,
        grpc.StatusCode.DEADLINE_EXCEEDED,
        grpc.StatusCode.NOT_FOUND,
        grpc.StatusCode.ALREADY_EXISTS,
        grpc.StatusCode.PERMISSION_DENIED,
        grpc.StatusCode.UNAUTHENTICATED,
        grpc.StatusCode.RESOURCE_EXHAUSTED,
        grpc.StatusCode.FAILED_PRECONDITION,
        grpc.StatusCode.ABORTED,
        grpc.StatusCode.OUT_OF_RANGE,
        grpc.StatusCode.UNIMPLEMENTED,
        grpc.StatusCode.INTERNAL,
        grpc.StatusCode.UNAVAILABLE,
        grpc.StatusCode.DATA_LOSS,
    )
}
53
54
55# pylint: disable=no-self-use
class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
    """GCP OpenCensus based plugin implementation.

    An exporter must be supplied by the caller; constructing the plugin
    without one raises ``ValueError``.

    The plugin can be used as a context manager: entering it initializes
    observability and hands the plugin to
    ``grpc._observability.observability_init``; leaving it calls ``exit``.

    For more details, please refer to User Guide:
      * https://cloud.google.com/stackdriver/docs/solutions/grpc

    Attributes:
      config: Configuration for GCP OpenCensus Observability.
      exporter: Exporter used to export data.
    """

    config: _observability_config.GcpObservabilityConfig
    exporter: "grpc_observability.Exporter"

    def __init__(self, exporter: "grpc_observability.Exporter" = None):
        """Validate the exporter, then read and activate the configuration.

        Args:
          exporter: Exporter that receives the recorded data. Required; the
            ``None`` default is kept only for interface compatibility.

        Raises:
          ValueError: If no exporter is given, or if reading or activating
            the configuration fails.
        """
        self.exporter = None
        self.config = None

        # Reject a missing exporter before touching the configuration, so a
        # construction that is going to fail does not activate the config as
        # a side effect. Compare against None explicitly so that an exporter
        # object which happens to be falsy is still accepted.
        if exporter is None:
            raise ValueError("Please provide an exporter!")
        self.exporter = exporter

        try:
            self.config = _observability_config.read_config()
            _cyobservability.activate_config(self.config)
        except Exception as e:  # pylint: disable=broad-except
            # Chain the original error so its traceback is preserved.
            raise ValueError(
                f"Reading configuration failed with: {e}"
            ) from e

        if self.config.tracing_enabled:
            self.set_tracing(True)
        if self.config.stats_enabled:
            self.set_stats(True)

    def __enter__(self):
        """Initialize observability and return this plugin.

        A failure in ``_cyobservability.cyobservability_init`` is logged and
        otherwise ignored; ``grpc._observability.observability_init`` is
        called either way.
        """
        try:
            _cyobservability.cyobservability_init(self.exporter)
        # TODO(xuanwn): Use specific exceptions
        except Exception as e:  # pylint: disable=broad-except
            _LOGGER.exception("GCPOpenCensusObservability failed with: %s", e)

        grpc._observability.observability_init(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Shut the plugin down via ``exit``; exceptions are not suppressed."""
        self.exit()

    def exit(self) -> None:
        """Wait one export interval, disable tracing/stats and deinitialize."""
        # Sleep so we don't lose any data. If we shutdown export thread
        # immediately after exit, it's possible that core didn't call RecordEnd
        # in callTracer, and all data recorded by calling RecordEnd will be
        # lost.
        # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in
        # AwaitNextBatchLocked.
        # TODO(xuanwn): explicit synchronization
        # https://github.com/grpc/grpc/issues/33262
        time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
        self.set_tracing(False)
        self.set_stats(False)
        _cyobservability.observability_deinit()
        grpc._observability.observability_deinit()

    def create_client_call_tracer(
        self, method_name: bytes, target: bytes
    ) -> ClientCallTracerCapsule:
        """Return the client call tracer created by ``_cyobservability``.

        Args:
          method_name: Method name forwarded to the tracer.
          target: Target forwarded to the tracer.
        """
        # A fixed literal is passed as the trace id for every call.
        trace_id = b"TRACE_ID"
        return _cyobservability.create_client_call_tracer(
            method_name, target, trace_id
        )

    def create_server_call_tracer_factory(
        self,
    ) -> ServerCallTracerFactoryCapsule:
        """Return the server call tracer factory from ``_cyobservability``."""
        return _cyobservability.create_server_call_tracer_factory_capsule()

    def delete_client_call_tracer(
        self, client_call_tracer: ClientCallTracerCapsule
    ) -> None:
        """Hand ``client_call_tracer`` to ``_cyobservability`` for deletion."""
        _cyobservability.delete_client_call_tracer(client_call_tracer)

    def save_trace_context(
        self, trace_id: str, span_id: str, is_sampled: bool
    ) -> None:
        """Do nothing; this plugin does not store the trace context."""
        pass

    def record_rpc_latency(
        self,
        method: str,
        target: str,
        rpc_latency: float,
        status_code: grpc.StatusCode,
    ) -> None:
        """Record an RPC latency sample through ``_cyobservability``.

        Args:
          method: Method the latency belongs to.
          target: Target the latency belongs to.
          rpc_latency: Latency value to record.
          status_code: Status of the RPC; converted to its string name,
            falling back to "UNKNOWN" for codes missing from
            ``GRPC_STATUS_CODE_TO_STRING``.
        """
        # Use a separate local so the parameter keeps its annotated type.
        status_name = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
        _cyobservability._record_rpc_latency(
            self.exporter, method, target, rpc_latency, status_name
        )
156