1# Copyright 2024 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from collections import defaultdict 16import datetime 17import logging 18import os 19import sys 20import time 21from typing import Any, AnyStr, Callable, Dict, List, Optional, Set 22import unittest 23 24from google.protobuf import struct_pb2 25import grpc_observability 26from grpc_observability import _open_telemetry_measures 27from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector 28from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption 29from opentelemetry.sdk.metrics import MeterProvider 30from opentelemetry.sdk.metrics.export import AggregationTemporality 31from opentelemetry.sdk.metrics.export import MetricExportResult 32from opentelemetry.sdk.metrics.export import MetricExporter 33from opentelemetry.sdk.metrics.export import MetricsData 34from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader 35 36from tests.observability import _test_server 37 38logger = logging.getLogger(__name__) 39 40STREAM_LENGTH = 5 41OTEL_EXPORT_INTERVAL_S = 0.5 42CSM_METADATA_EXCHANGE_LABEL_KEY = "exchange_labels_key" 43 44# The following metrics should have optional labels when optional 45# labels is enabled through OpenTelemetryPlugin. 
METRIC_NAME_WITH_OPTIONAL_LABEL = [
    # Every entry needs its own trailing comma: Python implicitly concatenates
    # adjacent string literals, which would merge two metric names into one.
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
]
CSM_OPTIONAL_LABEL_KEYS = ["csm.service_name", "csm.service_namespace_name"]

# The following metrics should have metadata exchange labels when the
# metadata exchange flow is triggered.
METRIC_NAME_WITH_EXCHANGE_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
    "grpc.server.call.duration",
    "grpc.server.call.sent_total_compressed_message_size",
    "grpc.server.call.rcvd_total_compressed_message_size",
]


class OTelMetricExporter(MetricExporter):
    """Implementation of :class:`MetricExporter` that records exported metrics
    into the provided ``all_metrics`` dict.

    all_metrics: A dict whose key is
      grpc_observability._open_telemetry_measures.Metric.name and whose value
      is a list of the label sets (data point attributes) recorded for that
      metric. An example item of this dict:
        {"grpc.client.attempt.started":
          [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
           {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Records metrics_data into all_metrics; always reports SUCCESS."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        # Nothing to release: the exporter only holds a reference to a dict.
        pass

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        # Exports are recorded synchronously, so there is never pending data.
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Appends the attributes of every data point to all_metrics, keyed by
        the metric name. A key is only created once the metric has at least
        one data point."""
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for data_point in metric.data.data_points:
                        # setdefault works for a plain dict as well as the
                        # defaultdict(list) the tests pass in.
                        self.all_metrics.setdefault(metric.name, []).append(
                            data_point.attributes
                        )


class TestLabelInjector(OpenTelemetryLabelInjector):
    """Label injector test double that returns fixed local/exchange labels."""

    _exchange_labels: Dict[str, AnyStr]
    _local_labels: Dict[str, str]

    def __init__(
        self, local_labels: Dict[str, str], exchange_labels: Dict[str, str]
    ):
        self._exchange_labels = exchange_labels
        self._local_labels = local_labels

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        return self._exchange_labels

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        # This test double returns the local labels unconditionally and
        # ignores include_exchange_labels.
        return self._local_labels

    def deserialize_labels(
        self, labels: Dict[str, AnyStr]
    ) -> Dict[str, AnyStr]:
        """Returns a new dict with the "XEnvoyPeerMetadata" entry decoded.

        The serialized ``struct_pb2.Struct`` stored under "XEnvoyPeerMetadata"
        is replaced by an entry under CSM_METADATA_EXCHANGE_LABEL_KEY holding
        that struct's CSM_METADATA_EXCHANGE_LABEL_KEY string field ("unknown"
        if the field is absent). All other labels are copied unchanged.
        """
        deserialized_labels = {}
        for key, value in labels.items():
            if key == "XEnvoyPeerMetadata":
                struct = struct_pb2.Struct()
                struct.ParseFromString(value)
                deserialized_labels[
                    CSM_METADATA_EXCHANGE_LABEL_KEY
                ] = self._get_value_from_struct(
                    CSM_METADATA_EXCHANGE_LABEL_KEY, struct
                )
            else:
                deserialized_labels[key] = value
        return deserialized_labels

    def _get_value_from_struct(
        self, key: str, struct: struct_pb2.Struct
    ) -> str:
        """Returns struct.fields[key].string_value, or "unknown" if the key is
        missing from the struct."""
        value = struct.fields.get(key)
        if value is None:
            return "unknown"
        return value.string_value


class TestOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """Plugin option test double whose client/server activation is fixed at
    construction time, independent of the target or xds arguments."""

    _label_injector: OpenTelemetryLabelInjector
    _active_on_client: bool
    _active_on_server: bool

    def __init__(
        self,
        label_injector: OpenTelemetryLabelInjector,
        active_on_client: bool = True,
        active_on_server: bool = True,
    ):
        self._label_injector = label_injector
        self._active_on_client = active_on_client
        self._active_on_server = active_on_server

    def is_active_on_client_channel(self, target: str) -> bool:
        return self._active_on_client

    def is_active_on_server(self, xds: bool) -> bool:
        return self._active_on_server

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        return self._label_injector


@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class ObservabilityPluginTest(unittest.TestCase):
    def setUp(self):
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server:
            self._server.stop(0)
        # Shut down the provider (and with it the periodic reader) so that
        # exporting into this test's all_metrics does not continue after the
        # test has finished.
        self._provider.shutdown()

    def testLabelInjectorWithLocalLabels(self):
        """Local labels in label injector should be added to all metrics."""
        label_injector = TestLabelInjector(
            local_labels={"local_labels_key": "local_labels_value"},
            exchange_labels={},
        )
        plugin_option = TestOpenTelemetryPluginOption(
            label_injector=label_injector
        )
        self._run_unary_call_with_plugin_option(plugin_option)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self._snapshot_metrics():
            self._validate_label_exist(name, label_list, ["local_labels_key"])

    def testClientSidePluginOption(self):
        """With the option inactive on the server, local labels should appear
        on client metrics only."""
        label_injector = TestLabelInjector(
            local_labels={"local_labels_key": "local_labels_value"},
            exchange_labels={},
        )
        plugin_option = TestOpenTelemetryPluginOption(
            label_injector=label_injector, active_on_server=False
        )
        self._run_unary_call_with_plugin_option(plugin_option)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self._snapshot_metrics():
            if "grpc.client" in name:
                self._validate_label_exist(
                    name, label_list, ["local_labels_key"]
                )
            if "grpc.server" in name:
                self._validate_label_not_exist(
                    name, label_list, ["local_labels_key"]
                )

    def testServerSidePluginOption(self):
        """With the option inactive on the client, local labels should appear
        on server metrics only."""
        label_injector = TestLabelInjector(
            local_labels={"local_labels_key": "local_labels_value"},
            exchange_labels={},
        )
        plugin_option = TestOpenTelemetryPluginOption(
            label_injector=label_injector, active_on_client=False
        )
        self._run_unary_call_with_plugin_option(plugin_option)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self._snapshot_metrics():
            if "grpc.client" in name:
                self._validate_label_not_exist(
                    name, label_list, ["local_labels_key"]
                )
            if "grpc.server" in name:
                self._validate_label_exist(
                    name, label_list, ["local_labels_key"]
                )

    def _run_unary_call_with_plugin_option(
        self, plugin_option: OpenTelemetryPluginOption
    ) -> None:
        """Registers a global OpenTelemetryPlugin built from plugin_option,
        starts the test server, makes one unary call, then deregisters.

        Deregistration is in ``finally`` so that a failure while starting the
        server or making the call cannot leave the plugin registered globally
        for the tests that run afterwards.
        """
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, plugin_options=[plugin_option]
        )
        otel_plugin.register_global()
        try:
            self._server, self._port = _test_server.start_server()
            _test_server.unary_unary_call(port=self._port)
        finally:
            otel_plugin.deregister_global()

    def _snapshot_metrics(self) -> List[Any]:
        """Returns a list of (metric name, copied label list) pairs.

        The metric reader may still be exporting into all_metrics while a
        test inspects it, so tests iterate this copy instead of the live dict
        to avoid "dictionary changed size during iteration" errors.
        """
        items = list(self.all_metrics.items())
        return [(name, list(label_list)) for name, label_list in items]

    def assert_eventually(
        self,
        predicate: Callable[[], bool],
        *,
        timeout: Optional[datetime.timedelta] = None,
        message: Optional[Callable[[], str]] = None,
    ) -> None:
        """Polls predicate every 0.5s until it returns True or timeout expires.

        Args:
          predicate: Zero-argument callable that is polled.
          timeout: Maximum time to wait; defaults to 5 seconds.
          message: Zero-argument callable producing the failure message.

        Fails the test via ``self.fail`` if predicate never returns True.
        """
        message = message or (lambda: "Proposition did not evaluate to true")
        timeout = timeout or datetime.timedelta(seconds=5)
        # Monotonic clock: wall-clock adjustments cannot stretch or shorten
        # the wait.
        deadline = time.monotonic() + timeout.total_seconds()
        while True:
            if predicate():
                return
            if time.monotonic() >= deadline:
                break
            time.sleep(0.5)
        self.fail(message() + " after " + str(timeout))

    def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
        # Poll until the periodic reader has exported more than one distinct
        # metric name.
        self.assert_eventually(
            lambda: len(all_metrics) > 1,
            message=lambda: (
                "Expected more than one exported metric, got:"
                f" {sorted(all_metrics)}"
            ),
        )

    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_base_metrics_names(metric_names, "grpc.server")

    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_base_metrics_names(metric_names, "grpc.client")

    def _validate_base_metrics_names(
        self, metric_names: Set[str], name_fragment: str
    ) -> None:
        """Asserts every base metric whose name contains name_fragment is
        present in metric_names."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if name_fragment in base_metric.name:
                self.assertIn(
                    base_metric.name,
                    metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )

    def _validate_label_exist(
        self,
        metric_name: str,
        metric_label_list: List[Any],
        labels_to_check: List[str],
    ) -> None:
        """Asserts every recorded label set (data point attributes) of
        metric_name contains every key in labels_to_check."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertIn(
                    label,
                    metric_label,
                    msg=f"label with key {label} not found in metric {metric_name}, found label list: {metric_label}",
                )

    def _validate_label_not_exist(
        self,
        metric_name: str,
        metric_label_list: List[Any],
        labels_to_check: List[str],
    ) -> None:
        """Asserts no recorded label set (data point attributes) of
        metric_name contains any key in labels_to_check."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertNotIn(
                    label,
                    metric_label,
                    msg=f"found unexpected label with key {label} in metric {metric_name}, found label list: {metric_label}",
                )


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)