# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
import datetime
import logging
import os
import sys
import time
from typing import Any, Callable, Dict, List, Optional, Set
import unittest

import grpc
import grpc_observability
from grpc_observability import _open_telemetry_measures
from grpc_observability._open_telemetry_observability import (
    GRPC_OTHER_LABEL_VALUE,
)
from grpc_observability._open_telemetry_observability import GRPC_METHOD_LABEL
from grpc_observability._open_telemetry_observability import GRPC_TARGET_LABEL
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import AggregationTemporality
from opentelemetry.sdk.metrics.export import MetricExportResult
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.export import MetricsData
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

from tests.observability import _test_server

logger = logging.getLogger(__name__)

STREAM_LENGTH = 5
OTEL_EXPORT_INTERVAL_S = 0.5

_ALREADY_INITIALIZED_MSG = "gPRC Python observability was already initialized"


class OTelMetricExporter(MetricExporter):
    """A :class:`MetricExporter` that records exported metrics into a dict.

    all_metrics: A dict whose key is
      grpc_observability._open_telemetry_measures.Metric.name and whose value
      is a list of the labels recorded for that metric.
      An example item of this dict:
        {"grpc.client.attempt.started":
          [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
           {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ):
        """Store the caller-owned dict that `record_metric` appends to.

        Args:
          all_metrics: Mapping from metric name to a list of recorded labels.
            It is kept by reference, so the caller observes every export.
          preferred_temporality: Forwarded unchanged to `MetricExporter`.
          preferred_aggregation: Forwarded unchanged to `MetricExporter`.
        """
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record `metrics_data` and always report success."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """No-op: this exporter holds nothing that needs releasing."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """No-op: metrics are recorded synchronously in `export`."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append each data point's attributes under its metric's name."""
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    for data_point in metric.data.data_points:
                        self.all_metrics[metric.name].append(
                            data_point.attributes
                        )


class _ClientUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Pass-through client interceptor that forwards the call unchanged."""

    def intercept_unary_unary(
        self, continuation, client_call_details, request_or_iterator
    ):
        return continuation(client_call_details, request_or_iterator)


class _ServerInterceptor(grpc.ServerInterceptor):
    """Pass-through server interceptor that forwards the call unchanged."""

    def intercept_service(self, continuation, handler_call_details):
        return continuation(handler_call_details)


@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class OpenTelemetryObservabilityTest(unittest.TestCase):
    """Tests for `grpc_observability.OpenTelemetryPlugin` metric recording.

    NOTE(review): the "...ThrowError..." tests only check the error message
    when a RuntimeError is raised; they pass silently if nothing is raised.
    Consider `assertRaisesRegex` once the raising behavior is confirmed.
    """

    def setUp(self):
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        # NOTE(review): the provider is never shut down by these tests;
        # presumably its periodic reader keeps running until process exit.
        # Consider `self._provider.shutdown()` in tearDown - TODO confirm.
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server:
            self._server.stop(0)

    def testRecordUnaryUnaryUseContextManager(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

    def testRecordUnaryUnaryUseGlobalInit(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        # Deregister even when validation fails, so a failure here does not
        # leave the global plugin registered for the following tests.
        try:
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_unary_call(port=port)

            self._validate_metrics_exist(self.all_metrics)
            self._validate_all_metrics_names(self.all_metrics)
        finally:
            otel_plugin.deregister_global()

    def testCallGlobalInitThrowErrorWhenGlobalCalled(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        try:
            try:
                otel_plugin.register_global()
            except RuntimeError as exp:
                self.assertIn(_ALREADY_INITIALIZED_MSG, str(exp))
        finally:
            otel_plugin.deregister_global()

    def testCallGlobalInitThrowErrorWhenContextManagerCalled(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            try:
                otel_plugin = grpc_observability.OpenTelemetryPlugin(
                    meter_provider=self._provider
                )
                otel_plugin.register_global()
            except RuntimeError as exp:
                self.assertIn(_ALREADY_INITIALIZED_MSG, str(exp))

    def testCallContextManagerThrowErrorWhenGlobalInitCalled(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        try:
            try:
                with grpc_observability.OpenTelemetryPlugin(
                    meter_provider=self._provider
                ):
                    pass
            except RuntimeError as exp:
                self.assertIn(_ALREADY_INITIALIZED_MSG, str(exp))
        finally:
            otel_plugin.deregister_global()

    def testContextManagerThrowErrorWhenContextManagerCalled(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            try:
                with grpc_observability.OpenTelemetryPlugin(
                    meter_provider=self._provider
                ):
                    pass
            except RuntimeError as exp:
                self.assertIn(_ALREADY_INITIALIZED_MSG, str(exp))

    def testNoErrorCallGlobalInitThenContextManager(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        otel_plugin.deregister_global()

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            pass

    def testNoErrorCallContextManagerThenGlobalInit(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            pass
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        otel_plugin.deregister_global()

    def testRecordUnaryUnaryWithClientInterceptor(self):
        interceptor = _ClientUnaryUnaryInterceptor()
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.intercepted_unary_unary_call(
                port=port, interceptors=interceptor
            )

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

    def testRecordUnaryUnaryWithServerInterceptor(self):
        interceptor = _ServerInterceptor()
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server(interceptors=[interceptor])
            self._server = server
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

    def testRecordUnaryUnaryClientOnly(self):
        # The server is started before the plugin is enabled, so only the
        # client-side metric names are validated below.
        server, port = _test_server.start_server()
        self._server = server

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_client_metrics_names(self.all_metrics)

    def testNoRecordBeforeInit(self):
        server, port = _test_server.start_server()
        # Stop this first server even if the call or the assertion fails.
        try:
            _test_server.unary_unary_call(port=port)
            self.assertEqual(len(self.all_metrics), 0)
        finally:
            server.stop(0)

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

    def testNoRecordAfterExitUseContextManager(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            self._port = port
            _test_server.unary_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

        # NOTE(review): this rebinds only the test's attribute; the exporter
        # built in setUp still holds the original dict, so the exporter never
        # writes to this fresh dict. Confirm this is the intended check.
        self.all_metrics = defaultdict(list)
        _test_server.unary_unary_call(port=self._port)
        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
            self._validate_metrics_exist(self.all_metrics)

    def testNoRecordAfterExitUseGlobal(self):
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        )
        otel_plugin.register_global()
        # Deregister even if starting the server or the call fails.
        try:
            server, port = _test_server.start_server()
            self._server = server
            self._port = port
            _test_server.unary_unary_call(port=port)
        finally:
            otel_plugin.deregister_global()

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

        # NOTE(review): this rebinds only the test's attribute; the exporter
        # built in setUp still holds the original dict, so the exporter never
        # writes to this fresh dict. Confirm this is the intended check.
        self.all_metrics = defaultdict(list)
        _test_server.unary_unary_call(port=self._port)
        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
            self._validate_metrics_exist(self.all_metrics)

    def testRecordUnaryStream(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_stream_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

    def testRecordStreamUnary(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.stream_unary_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

    def testRecordStreamStream(self):
        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.stream_stream_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

    def testTargetAttributeFilter(self):
        # Register cleanups right after each start so neither server leaks
        # when a later step or assertion fails.
        main_server, main_port = _test_server.start_server()
        self.addCleanup(main_server.stop, 0)
        backup_server, backup_port = _test_server.start_server()
        self.addCleanup(backup_server.stop, 0)
        main_target = f"localhost:{main_port}"
        backup_target = f"localhost:{backup_port}"

        # Replace target label with 'other' for main_server.
        def target_filter(target: str) -> bool:
            return main_target not in target

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, target_attribute_filter=target_filter
        ):
            _test_server.unary_unary_call(port=main_port)
            _test_server.unary_unary_call(port=backup_port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_client_metrics_names(self.all_metrics)

        target_values = self._collect_label_values(GRPC_TARGET_LABEL)
        self.assertIn(GRPC_OTHER_LABEL_VALUE, target_values)
        self.assertIn(backup_target, target_values)

    def testMethodAttributeFilter(self):
        # If method name is 'test/UnaryUnaryFiltered', it should be replaced
        # with 'other'.
        FILTERED_METHOD_NAME = "test/UnaryUnaryFiltered"

        def method_filter(method: str) -> bool:
            return FILTERED_METHOD_NAME not in method

        with grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider,
            generic_method_attribute_filter=method_filter,
        ):
            server, port = _test_server.start_server()
            self._server = server
            _test_server.unary_unary_call(port=port)
            _test_server.unary_unary_filtered_call(port=port)

        self._validate_metrics_exist(self.all_metrics)
        self._validate_all_metrics_names(self.all_metrics)

        method_values = self._collect_label_values(GRPC_METHOD_LABEL)
        self.assertIn(GRPC_OTHER_LABEL_VALUE, method_values)
        self.assertNotIn(FILTERED_METHOD_NAME, method_values)

    def assert_eventually(
        self,
        predicate: Callable[[], bool],
        *,
        timeout: Optional[datetime.timedelta] = None,
        message: Optional[Callable[[], str]] = None,
    ) -> None:
        """Poll `predicate` every 0.5s until it is true or `timeout` elapses.

        Args:
          predicate: Zero-argument callable polled for a truthy result.
          timeout: Maximum time to keep polling; defaults to 5 seconds.
          message: Zero-argument callable producing the failure message.

        Raises:
          AssertionError: via `self.fail` if `predicate` never returned true
            before the deadline.
        """
        message = message or (lambda: "Proposition did not evaluate to true")
        timeout = timeout or datetime.timedelta(seconds=5)
        # Use a monotonic clock so wall-clock adjustments cannot shorten or
        # extend the wait.
        deadline = time.monotonic() + timeout.total_seconds()
        while time.monotonic() < deadline:
            if predicate():
                return
            time.sleep(0.5)
        self.fail(f"{message()} after {timeout}")

    def _collect_label_values(self, label_key: str) -> Set[str]:
        """Return every value recorded for `label_key` across all metrics."""
        # Snapshot the values first: the exporter may add new metric names
        # to the dict while we iterate (presumably from the reader's export
        # thread - TODO confirm).
        recorded = list(self.all_metrics.values())
        return {
            labels[label_key]
            for label_list in recorded
            for labels in label_list
            if label_key in labels
        }

    def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
        """Fail with "No metrics was exported" unless metrics show up.

        Polls (rather than checking once) so that at least one export from
        the OTel MetricExporter has a chance to happen. Requires more than
        one metric name to be present.
        """
        self.assert_eventually(
            lambda: len(all_metrics) > 1,
            message=lambda: "No metrics was exported",
        )

    def _validate_all_metrics_names(self, metric_names: Dict[str, Any]) -> None:
        """Check that every server and client base metric name is present."""
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(
        self, metric_names: Dict[str, Any]
    ) -> None:
        """Check that every "grpc.server" base metric name is present."""
        self._validate_metrics_names_containing("grpc.server", metric_names)

    def _validate_client_metrics_names(
        self, metric_names: Dict[str, Any]
    ) -> None:
        """Check that every "grpc.client" base metric name is present."""
        self._validate_metrics_names_containing("grpc.client", metric_names)

    def _validate_metrics_names_containing(
        self, name_fragment: str, metric_names: Dict[str, Any]
    ) -> None:
        """Assert each base metric whose name contains `name_fragment` is in
        `metric_names` (only membership is used, so any container works)."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if name_fragment in base_metric.name:
                self.assertIn(
                    base_metric.name,
                    metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)