• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from typing import AnyStr, Callable, Dict, Iterable, List, Optional
16
17# pytype: disable=pyi-error
18from grpc_observability import _open_telemetry_observability
19from grpc_observability._observability import OptionalLabelType
20from opentelemetry.metrics import MeterProvider
21
22GRPC_METHOD_LABEL = "grpc.method"
23GRPC_TARGET_LABEL = "grpc.target"
24GRPC_CLIENT_METRIC_PREFIX = "grpc.client"
25GRPC_OTHER_LABEL_VALUE = "other"
26
27
class OpenTelemetryLabelInjector:
    """
    Interface for attaching extra labels to the calls being traced.

    Implementations override get_labels_for_exchange and
    get_additional_labels; deserialize_labels has a pass-through default.
    """

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        """
        Return the labels this injector contributes to metadata exchange.

        Returns:
          A dict mapping each label name (str) to its label value
          (str or bytes).

        Raises:
          NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        """
        Return the extra labels contributed by this injector.

        Whatever this method returns is added directly to metric data.

        Args:
          include_exchange_labels: Whether the metadata exchange related
            labels should be included as well.

        Returns:
          A dict of labels.

        Raises:
          NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    # pylint: disable=no-self-use
    def deserialize_labels(
        self, labels: Dict[str, AnyStr]
    ) -> Dict[str, AnyStr]:
        """
        Deserialize exchanged labels when that is required.

        This is called when the injector added labels for metadata exchange,
        so that the exchanged labels can be turned back into plain labels.

        For instance, given an injector that added xds_peer_metadata_label
        for exchange, the input could be:

            labels: {"labelA": b"valueA", "xds_peer_metadata_label": b"exchanged_bytes"}

        and the method is expected to deserialize xds_peer_metadata_label
        and hand back:

            labels: {"labelA": b"valueA", "xds_label_A": "xds_label_A",
                     "xds_label_B": "xds_label_B"}

        The default implementation performs no deserialization and returns
        the very same dict it was given.

        Args:
          labels: The labels to deserialize.

        Returns:
          A dict of deserialized labels.
        """
        return labels
82
83
class OpenTelemetryPluginOption:
    """
    Interface for options that add extra functionality to OpenTelemetryPlugin.

    The class declares no members of its own.
    """
88
89
90# pylint: disable=no-self-use
class OpenTelemetryPlugin:
    """Describes a Plugin for OpenTelemetry observability.

    An instance can be installed process-wide either explicitly, through
    register_global() / deregister_global(), or for the duration of a
    ``with`` block by using the instance as a context manager.
    """

    plugin_options: Iterable[OpenTelemetryPluginOption]
    meter_provider: Optional[MeterProvider]
    target_attribute_filter: Callable[[str], bool]
    generic_method_attribute_filter: Callable[[str], bool]
    _plugins: List[_open_telemetry_observability._OpenTelemetryPlugin]

    def __init__(
        self,
        *,
        plugin_options: Optional[Iterable[OpenTelemetryPluginOption]] = None,
        meter_provider: Optional[MeterProvider] = None,
        target_attribute_filter: Optional[Callable[[str], bool]] = None,
        generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
    ):
        """
        Args:
          plugin_options: An Iterable of OpenTelemetryPluginOption which will be
            enabled for this OpenTelemetryPlugin. When omitted (or None), a new
            empty list is used.
          meter_provider: A MeterProvider which will be used to collect telemetry
            data, or None which means no metrics will be collected.
          target_attribute_filter: [DEPRECATED] This attribute is deprecated and
            should not be used.
            Once provided, this will be called per channel to decide whether to
            record the target attribute on client or to replace it with "other".
            This helps reduce the cardinality on metrics in cases where many
            channels are created with different targets in the same binary
            (which might happen for example, if the channel target string uses
            IP addresses directly).
            Return True means the original target string will be used, False
            means target string will be replaced with "other".
            When omitted, a filter that always returns True is used.
          generic_method_attribute_filter: Once provided, this will be called
            with a generic method type to decide whether to record the method
            name or to replace it with "other". Note that pre-registered methods
            will always be recorded no matter what this function returns.
            Return True means the original method name will be used, False means
            method name will be replaced with "other".
            When omitted, a filter that always returns False is used.
        """
        # Build the empty default per instance: a `[]` literal in the signature
        # would be a single list object shared by every plugin created without
        # an explicit plugin_options argument.
        self.plugin_options = [] if plugin_options is None else plugin_options
        self.meter_provider = meter_provider
        self.target_attribute_filter = target_attribute_filter or (
            lambda target: True
        )
        self.generic_method_attribute_filter = (
            generic_method_attribute_filter or (lambda method: False)
        )
        # The internal plugin wraps this instance; it is constructed last so
        # that every public attribute above is already set when it is created.
        self._plugins = [
            _open_telemetry_observability._OpenTelemetryPlugin(self)
        ]

    def register_global(self) -> None:
        """
        Registers a global plugin that acts on all channels and servers running on the process.

        Raises:
            RuntimeError: If a global plugin was already registered.
        """
        _open_telemetry_observability.start_open_telemetry_observability(
            plugins=self._plugins
        )

    def deregister_global(self) -> None:
        """
        De-register the global plugin that acts on all channels and servers running on the process.

        Raises:
            RuntimeError: If no global plugin was registered.
        """
        _open_telemetry_observability.end_open_telemetry_observability()

    def __enter__(self) -> None:
        """
        Start OpenTelemetry observability with this plugin.

        Makes the same call as register_global(). Nothing is returned, so
        ``with plugin as name`` binds ``name`` to None.
        """
        _open_telemetry_observability.start_open_telemetry_observability(
            plugins=self._plugins
        )

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        End OpenTelemetry observability when the ``with`` block is left.

        Makes the same call as deregister_global(). The exception arguments
        are ignored and None is returned, so an exception raised inside the
        block is not suppressed.
        """
        _open_telemetry_observability.end_open_telemetry_observability()

    def _get_enabled_optional_labels(self) -> List[OptionalLabelType]:
        """
        Return the optional labels enabled for this plugin.

        Returns:
          An empty list in this implementation.
        """
        return []
172