# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import AnyStr, Callable, Dict, Iterable, List, Optional

# pytype: disable=pyi-error
from grpc_observability import _open_telemetry_observability
from grpc_observability._observability import OptionalLabelType
from opentelemetry.metrics import MeterProvider

GRPC_METHOD_LABEL = "grpc.method"
GRPC_TARGET_LABEL = "grpc.target"
GRPC_CLIENT_METRIC_PREFIX = "grpc.client"
GRPC_OTHER_LABEL_VALUE = "other"


class OpenTelemetryLabelInjector:
    """
    An interface that allows you to add additional labels on the calls traced.
    """

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        """
        Get labels used for metadata exchange.

        Returns:
          A dict of labels, with a string as key representing label name, string or bytes
        as value representing label value.

        Raises:
          NotImplementedError: Always, in this base class; implementations must override.
        """
        raise NotImplementedError()

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        """
        Get additional labels added by this injector.

        The return value from this method will be added directly to metric data.

        Args:
          include_exchange_labels: Whether to add additional metadata exchange related labels.

        Returns:
          A dict of labels.

        Raises:
          NotImplementedError: Always, in this base class; implementations must override.
        """
        raise NotImplementedError()

    # pylint: disable=no-self-use
    def deserialize_labels(
        self, labels: Dict[str, AnyStr]
    ) -> Dict[str, AnyStr]:
        """
        Deserialize the labels if required.

        If this injector added labels for metadata exchange, this method will be called to
        deserialize the exchanged labels.

        For example, if this injector added xds_peer_metadata_label for exchange:

            labels: {"labelA": b"valueA", "xds_peer_metadata_label": b"exchanged_bytes"}

        This method should deserialize xds_peer_metadata_label and return labels as:

            labels: {"labelA": b"valueA", "xds_label_A": "xds_label_A",
                     "xds_label_B": "xds_label_B"}

        The base implementation performs no deserialization and returns the
        given dict unchanged.

        Args:
          labels: The labels to deserialize.

        Returns:
          A dict of deserialized labels.
        """
        return labels


class OpenTelemetryPluginOption:
    """
    An interface that allows you to add additional function to OpenTelemetryPlugin.
    """


# pylint: disable=no-self-use
class OpenTelemetryPlugin:
    """Describes a Plugin for OpenTelemetry observability."""

    plugin_options: Iterable[OpenTelemetryPluginOption]
    meter_provider: Optional[MeterProvider]
    target_attribute_filter: Callable[[str], bool]
    generic_method_attribute_filter: Callable[[str], bool]
    _plugins: List[_open_telemetry_observability._OpenTelemetryPlugin]

    def __init__(
        self,
        *,
        plugin_options: Optional[Iterable[OpenTelemetryPluginOption]] = None,
        meter_provider: Optional[MeterProvider] = None,
        target_attribute_filter: Optional[Callable[[str], bool]] = None,
        generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
    ):
        """
        Args:
          plugin_options: An Iterable of OpenTelemetryPluginOption which will be
        enabled for this OpenTelemetryPlugin. When omitted (or None), a new
        empty list is used for this instance.
          meter_provider: A MeterProvider which will be used to collect telemetry data,
        or None which means no metrics will be collected.
          target_attribute_filter: [DEPRECATED] This attribute is deprecated and should
        not be used.
        Once provided, this will be called per channel to decide whether to record the
        target attribute on client or to replace it with "other".
        This helps reduce the cardinality on metrics in cases where many channels
        are created with different targets in the same binary (which might happen
        for example, if the channel target string uses IP addresses directly).
        Return True means the original target string will be used, False means target string
        will be replaced with "other".
        When omitted, a filter that always returns True is used.
          generic_method_attribute_filter: Once provided, this will be called with a generic
        method type to decide whether to record the method name or to replace it with
        "other". Note that pre-registered methods will always be recorded no matter what
        this function returns.
        Return True means the original method name will be used, False means method name will
        be replaced with "other".
        When omitted, a filter that always returns False is used.
        """
        # A literal `[]` default would be created once and shared by every
        # instance constructed without plugin_options; build a fresh list
        # per instance instead.
        self.plugin_options = [] if plugin_options is None else plugin_options
        self.meter_provider = meter_provider
        self.target_attribute_filter = target_attribute_filter or (
            lambda target: True
        )
        self.generic_method_attribute_filter = (
            generic_method_attribute_filter or (lambda method: False)
        )
        # The internal plugin is constructed last so that it receives a fully
        # initialized `self`.
        self._plugins = [
            _open_telemetry_observability._OpenTelemetryPlugin(self)
        ]

    def register_global(self) -> None:
        """
        Registers a global plugin that acts on all channels and servers running on the process.

        Raises:
            RuntimeError: If a global plugin was already registered.
        """
        _open_telemetry_observability.start_open_telemetry_observability(
            plugins=self._plugins
        )

    def deregister_global(self) -> None:
        """
        De-register the global plugin that acts on all channels and servers running on the process.

        Raises:
            RuntimeError: If no global plugin was registered.
        """
        _open_telemetry_observability.end_open_telemetry_observability()

    def __enter__(self) -> None:
        """
        Start OpenTelemetry observability with this plugin when entering a `with` block.

        Nothing is returned, so `with plugin as name:` binds `name` to None.
        """
        _open_telemetry_observability.start_open_telemetry_observability(
            plugins=self._plugins
        )

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        End OpenTelemetry observability when leaving the `with` block.

        The exception arguments are ignored and None is returned, so an
        exception raised inside the block is not suppressed.
        """
        _open_telemetry_observability.end_open_telemetry_observability()

    def _get_enabled_optional_labels(self) -> List[OptionalLabelType]:
        """Return the optional labels enabled by this plugin; the base plugin enables none."""
        return []