# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
import datetime
import logging
import os
import sys
import time
from typing import Any, Callable, Dict, List, Optional, Set
import unittest

import grpc
import grpc_observability
from grpc_observability import _open_telemetry_measures
from grpc_observability._open_telemetry_observability import (
    GRPC_OTHER_LABEL_VALUE,
)
from grpc_observability._open_telemetry_observability import GRPC_METHOD_LABEL
from grpc_observability._open_telemetry_observability import GRPC_TARGET_LABEL
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import AggregationTemporality
from opentelemetry.sdk.metrics.export import MetricExportResult
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.export import MetricsData
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

from tests.observability import _test_server

logger = logging.getLogger(__name__)

STREAM_LENGTH = 5
OTEL_EXPORT_INTERVAL_S = 0.5

# Substring the tests expect in the RuntimeError raised on double
# initialization. The "gPRC" spelling is intentional here: it is the exact
# text the original assertions matched against.
_ALREADY_INITIALIZED_MESSAGE = (
    "gPRC Python observability was already initialized"
)


class OTelMetricExporter(MetricExporter):
    """Implementation of :class:`MetricExporter` that records exported metrics
    into the provided ``all_metrics`` dict.

    all_metrics: A dict whose key is the exported metric name (compared by the
      tests against grpc_observability._open_telemetry_measures base metric
      names) and whose value is a list of labels recorded for that metric.
      An example item of this dict:
        {"grpc.client.attempt.started":
          [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
           {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}

    The dict is stored by reference and indexed with ``[metric.name]`` before
    appending, so callers should pass a ``defaultdict(list)`` (or pre-populate
    the keys).
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record every data point of ``metrics_data`` and report success."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """No-op: this exporter holds no resources of its own."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """No-op flush; always returns True."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append the attributes of each data point to ``all_metrics``,
        keyed by the metric name."""
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for data_point in metric.data.data_points:
                        self.all_metrics[metric.name].append(
                            data_point.attributes
                        )


class _ClientUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Pass-through client interceptor: forwards the call unchanged."""

    def intercept_unary_unary(
        self, continuation, client_call_details, request_or_iterator
    ):
        response = continuation(client_call_details, request_or_iterator)
        return response


class _ServerInterceptor(grpc.ServerInterceptor):
    """Pass-through server interceptor: forwards the call unchanged."""

    def intercept_service(self, continuation, handler_call_details):
        return continuation(handler_call_details)


@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class OpenTelemetryObservabilityTest(unittest.TestCase):
    """Tests for the OpenTelemetry plugin, driven through a periodic reader
    that exports into ``self.all_metrics``."""

    def setUp(self):
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server:
            self._server.stop(0)
        # Shut the provider down so the periodic reader created in setUp does
        # not keep exporting in the background after the test has finished.
        self._provider.shutdown()

    def testRecordUnaryUnaryUseContextManager(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

    def testRecordUnaryUnaryUseGlobalInit(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        # try/finally so a failed validation cannot leave the plugin
        # registered globally for the tests that run afterwards.
        try:
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_unary_call(port=port)

            self._validate_metrics_exist(self.all_metrics)
            self._validate_all_metrics_names(self.all_metrics.keys())
        finally:
            otel_plugin.deregister_global()

    def testCallGlobalInitThrowErrorWhenGlobalCalled(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        try:
            # assertRaisesRegex fails the test when nothing is raised; the
            # previous try/except form passed silently in that case.
            with self.assertRaisesRegex(
                RuntimeError, _ALREADY_INITIALIZED_MESSAGE
            ):
                otel_plugin.register_global()
        finally:
            otel_plugin.deregister_global()

    def testCallGlobalInitThrowErrorWhenContextManagerCalled(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            with self.assertRaisesRegex(
                RuntimeError, _ALREADY_INITIALIZED_MESSAGE
            ):
                otel_plugin = grpc_observability.OpenTelemetryPlugin(
                    meter_provider=self._provider
                )
                otel_plugin.register_global()

    def testCallContextManagerThrowErrorWhenGlobalInitCalled(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        try:
            with self.assertRaisesRegex(
                RuntimeError, _ALREADY_INITIALIZED_MESSAGE
            ):
                with grpc_observability.OpenTelemetryPlugin(
                    meter_provider=self._provider
                ):
                    pass
        finally:
            otel_plugin.deregister_global()

    def testContextManagerThrowErrorWhenContextManagerCalled(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            with self.assertRaisesRegex(
                RuntimeError, _ALREADY_INITIALIZED_MESSAGE
            ):
                with grpc_observability.OpenTelemetryPlugin(
                    meter_provider=self._provider
                ):
                    pass

    def testNoErrorCallGlobalInitThenContextManager(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        otel_plugin.deregister_global()

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            pass

    def testNoErrorCallContextManagerThenGlobalInit(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            pass
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        otel_plugin.deregister_global()

    def testRecordUnaryUnaryWithClientInterceptor(self):
        interceptor = _ClientUnaryUnaryInterceptor()
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.intercepted_unary_unary_call(
                port=port, interceptors=interceptor
            )

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

    def testRecordUnaryUnaryWithServerInterceptor(self):
        interceptor = _ServerInterceptor()
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server(interceptors=[interceptor])
            self._server = server
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

    def testRecordUnaryUnaryClientOnly(self):
        # The server is started before the plugin is enabled, so only client
        # metric names are validated here.
        server, port = _test_server.start_server()
        self._server = server

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_client_metrics_names(self.all_metrics.keys())

    def testNoRecordBeforeInit(self):
        server, port = _test_server.start_server()
        try:
            _test_server.unary_unary_call(port=port)
            self.assertEqual(len(self.all_metrics), 0)
        finally:
            # Stop the first server even when the assertion above fails.
            server.stop(0)

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

    def testNoRecordAfterExitUseContextManager(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            self._port = port
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

        # NOTE(review): this rebinds the attribute to a fresh dict, while the
        # exporter built in setUp still holds the original one. The check
        # below therefore inspects a dict the exporter never writes to --
        # confirm this is the intended way to ignore later re-exports.
        self.all_metrics = defaultdict(list)
        _test_server.unary_unary_call(port=self._port)
        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
            self._validate_metrics_exist(self.all_metrics)

    def testNoRecordAfterExitUseGlobal(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        try:
            server, port = _test_server.start_server()
            self._server = server
            self._port = port
            _test_server.unary_unary_call(port=port)
        finally:
            otel_plugin.deregister_global()

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

        # NOTE(review): this rebinds the attribute to a fresh dict, while the
        # exporter built in setUp still holds the original one. The check
        # below therefore inspects a dict the exporter never writes to --
        # confirm this is the intended way to ignore later re-exports.
        self.all_metrics = defaultdict(list)
        _test_server.unary_unary_call(port=self._port)
        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
            self._validate_metrics_exist(self.all_metrics)

    def testRecordUnaryStream(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_stream_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

    def testRecordStreamUnary(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.stream_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

    def testRecordStreamStream(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.stream_stream_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())

    def testTargetAttributeFilter(self):
        # Register cleanups right away so both servers are stopped even when
        # an assertion below fails.
        main_server, main_port = _test_server.start_server()
        self.addCleanup(main_server.stop, 0)
        backup_server, backup_port = _test_server.start_server()
        self.addCleanup(backup_server.stop, 0)
        main_target = f"localhost:{main_port}"
        backup_target = f"localhost:{backup_port}"

        # Replace target label with 'other' for main_server.
        def target_filter(target: str) -> bool:
            return main_target not in target

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, target_attribute_filter=target_filter
        ):
            _test_server.unary_unary_call(port=main_port)
            _test_server.unary_unary_call(port=backup_port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_client_metrics_names(self.all_metrics.keys())

        target_values = self._collect_label_values(GRPC_TARGET_LABEL)
        self.assertIn(GRPC_OTHER_LABEL_VALUE, target_values)
        self.assertIn(backup_target, target_values)

    def testMethodAttributeFilter(self):
        # method_filter should replace method name 'test/UnaryUnaryFiltered'
        # with 'other'.
        FILTERED_METHOD_NAME = "test/UnaryUnaryFiltered"

        def method_filter(method: str) -> bool:
            return FILTERED_METHOD_NAME not in method

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider,
            generic_method_attribute_filter=method_filter,
        ):
            server, port = _test_server.start_server(register_method=False)
            self._server = server
            _test_server.unary_unary_call(port=port, registered_method=True)
            _test_server.unary_unary_filtered_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())
        method_values = self._collect_label_values(GRPC_METHOD_LABEL)
        self.assertIn(GRPC_OTHER_LABEL_VALUE, method_values)
        self.assertNotIn(FILTERED_METHOD_NAME, method_values)

    def testClientNonRegisteredMethod(self):
        UNARY_METHOD_NAME = "test/UnaryUnary"

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server(register_method=True)
            self._server = server
            _test_server.unary_unary_call(port=port, registered_method=False)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())
        client_method_values = self._collect_label_values(
            GRPC_METHOD_LABEL, metric_name_part="grpc.client"
        )
        server_method_values = self._collect_label_values(
            GRPC_METHOD_LABEL, metric_name_part="grpc.server"
        )
        # For client metrics, all method name should be replaced with 'other'.
        self.assertIn(GRPC_OTHER_LABEL_VALUE, client_method_values)
        self.assertNotIn(UNARY_METHOD_NAME, client_method_values)

        # For server metrics, all method name should be 'test/UnaryUnary'.
        self.assertNotIn(GRPC_OTHER_LABEL_VALUE, server_method_values)
        self.assertIn(UNARY_METHOD_NAME, server_method_values)

    def testServerNonRegisteredMethod(self):
        UNARY_METHOD_NAME = "test/UnaryUnary"

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server(register_method=False)
            self._server = server
            _test_server.unary_unary_call(port=port, registered_method=True)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics.keys())
        client_method_values = self._collect_label_values(
            GRPC_METHOD_LABEL, metric_name_part="grpc.client"
        )
        server_method_values = self._collect_label_values(
            GRPC_METHOD_LABEL, metric_name_part="grpc.server"
        )
        # For client metrics, all method name should be 'test/UnaryUnary'.
        self.assertNotIn(GRPC_OTHER_LABEL_VALUE, client_method_values)
        self.assertIn(UNARY_METHOD_NAME, client_method_values)

        # For server metrics, all method name should be replaced with 'other'.
        self.assertIn(GRPC_OTHER_LABEL_VALUE, server_method_values)
        self.assertNotIn(UNARY_METHOD_NAME, server_method_values)

    def assert_eventually(
        self,
        predicate: Callable[[], bool],
        *,
        timeout: Optional[datetime.timedelta] = None,
        message: Optional[Callable[[], str]] = None,
    ) -> None:
        """Poll ``predicate`` every 0.5s until it is true or ``timeout`` ends.

        Args:
          predicate: Zero-argument callable; polling stops once it is truthy.
          timeout: How long to keep polling. A falsy value (including None)
            selects the default of 5 seconds.
          message: Zero-argument callable producing the failure text. Defaults
            to "Proposition did not evaluate to true".

        Raises:
          AssertionError: (via ``self.fail``) when the predicate never became
            true; the text is ``message()`` followed by " after <timeout>".
        """
        message = message or (lambda: "Proposition did not evaluate to true")
        timeout = timeout or datetime.timedelta(seconds=5)
        # Monotonic clock: the deadline is unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout.total_seconds()
        while True:
            if predicate():
                return
            # Checked after the predicate so it is evaluated at least once,
            # and once more after the final sleep.
            if time.monotonic() >= deadline:
                break
            time.sleep(0.5)
        self.fail(message() + " after " + str(timeout))

    def _collect_label_values(
        self, label: str, metric_name_part: Optional[str] = None
    ) -> Set[Any]:
        """Return the distinct values recorded for ``label``.

        Args:
          label: Attribute key to look up in each recorded label mapping.
          metric_name_part: When given, only metrics whose name contains this
            substring are considered.
        """
        # Snapshot the items first: the exporter may add new metric names from
        # its own thread while the test is iterating.
        recorded = list(self.all_metrics.items())
        return {
            labels[label]
            for metric_name, label_list in recorded
            if metric_name_part is None or metric_name_part in metric_name
            for labels in label_list
            if label in labels
        }

    def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
        """Wait until ``all_metrics`` holds more than one metric name."""
        # Poll instead of sleeping a fixed time: the periodic reader exports
        # every OTEL_EXPORT_INTERVAL_S seconds.
        self.assert_eventually(
            lambda: len(all_metrics) > 1,
            message=lambda: "No metrics was exported",
        )

    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
        """Check that every server and client base metric was exported."""
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
        """Check that every "grpc.server" base metric is in ``metric_names``."""
        self._validate_metrics_names_containing("grpc.server", metric_names)

    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
        """Check that every "grpc.client" base metric is in ``metric_names``."""
        self._validate_metrics_names_containing("grpc.client", metric_names)

    def _validate_metrics_names_containing(
        self, name_part: str, metric_names: Set[str]
    ) -> None:
        """Assert each base metric whose name contains ``name_part`` appears
        in ``metric_names``."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if name_part in base_metric.name:
                self.assertIn(
                    base_metric.name,
                    metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)