# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
import datetime
import logging
import os
import sys
import time
from typing import Any, Callable, Dict, List, Mapping, Optional, Set
import unittest
from unittest import mock

from grpc_csm_observability import CsmOpenTelemetryPlugin
from grpc_csm_observability._csm_observability_plugin import (
    CSMOpenTelemetryLabelInjector,
)
from grpc_csm_observability._csm_observability_plugin import TYPE_GCE
from grpc_csm_observability._csm_observability_plugin import TYPE_GKE
from grpc_csm_observability._csm_observability_plugin import UNKNOWN_VALUE
import grpc_observability
from grpc_observability import _open_telemetry_measures
from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector
from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import AggregationTemporality
from opentelemetry.sdk.metrics.export import MetricExportResult
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.export import MetricsData
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource

from tests.observability import _test_server

logger = logging.getLogger(__name__)

OTEL_EXPORT_INTERVAL_S = 0.5
# We only expect basic labels to be exchanged.
CSM_METADATA_EXCHANGE_DEFAULT_LABELS = [
    "csm.remote_workload_type",
    "csm.remote_workload_canonical_service",
]

# The following metrics should have optional labels when optional
# labels is enabled through OpenTelemetryPlugin.
METRIC_NAME_WITH_OPTIONAL_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
]
CSM_OPTIONAL_LABEL_KEYS = ["csm.service_name", "csm.service_namespace_name"]

# The following metrics should have metadata exchange labels when metadata
# exchange flow is triggered.
METRIC_NAME_WITH_EXCHANGE_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
    "grpc.server.call.duration",
    "grpc.server.call.sent_total_compressed_message_size",
    "grpc.server.call.rcvd_total_compressed_message_size",
]

MOCK_GKE_RESOURCE = Resource.create(
    attributes={
        "gcp.resource_type": "gke_container",
        "k8s.pod.name": "pod",
        "k8s.container.name": "container",
        "k8s.namespace.name": "namespace",
        "k8s.cluster.name": "cluster",
        "cloud.region": "region",
        "cloud.account.id": "id",
    }
)

MOCK_GCE_RESOURCE = Resource.create(
    attributes={
        "gcp.resource_type": "gce_instance",
        "cloud.zone": "zone",
        "cloud.account.id": "id",
    }
)

MOCK_UNKNOWN_RESOURCE = Resource.create(
    attributes={
        "gcp.resource_type": "random",
    }
)

# Patch target shared by every test that fakes GCP resource detection.
_GCP_RESOURCE_DETECTOR_DETECT = "opentelemetry.resourcedetector.gcp_resource_detector.GoogleCloudResourceDetector.detect"

# Environment consulted while the CSM label injector is being constructed.
_CSM_TEST_ENVIRON = {
    "CSM_CANONICAL_SERVICE_NAME": "canonical_service",
    "CSM_WORKLOAD_NAME": "workload",
}

_STARTED_METRIC_NAMES = (
    "grpc.client.attempt.started",
    "grpc.server.call.started",
)
_DURATION_METRIC_NAMES = (
    "grpc.client.attempt.duration",
    "grpc.server.call.duration",
)

# Seconds between two evaluations of the predicate in assert_eventually().
_POLL_INTERVAL_S = 0.5


class OTelMetricExporter(MetricExporter):
    """Implementation of :class:`MetricExporter` that export metrics to the
    provided metric_list.

    all_metrics: A dict whose key is the exported metric name (see
        grpc_observability._open_telemetry_measures) and whose value is the
        list of labels recorded for that metric, one entry per data point.
        An example item of this dict:
            {"grpc.client.attempt.started":
              [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
               {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        # The caller keeps a reference to this dict and reads the exported
        # labels from it; it is mutated in place by record_metric().
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record every data point of `metrics_data` and report success."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Nothing to release; metrics are only kept in memory."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Nothing is buffered, so flushing always succeeds."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append the attributes of each exported data point to all_metrics."""
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    # setdefault() keeps this working when the caller hands
                    # in a plain dict rather than a defaultdict(list).
                    recorded = self.all_metrics.setdefault(metric.name, [])
                    for data_point in metric.data.data_points:
                        recorded.append(data_point.attributes)


class TestOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """Plugin option whose client/server activation is fixed by the test."""

    _label_injector: OpenTelemetryLabelInjector
    _active_on_client: bool
    _active_on_server: bool

    def __init__(
        self,
        label_injector: OpenTelemetryLabelInjector,
        active_on_client: bool = True,
        active_on_server: bool = True,
    ):
        self._label_injector = label_injector
        self._active_on_client = active_on_client
        self._active_on_server = active_on_server

    def is_active_on_client_channel(self, target: str) -> bool:
        # The target is ignored on purpose: activation is test-controlled.
        return self._active_on_client

    def is_active_on_server(self, xds: bool) -> bool:
        # The xds flag is ignored on purpose: activation is test-controlled.
        return self._active_on_server

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        return self._label_injector


class _OTelMetricsTestMixin:
    """Fixture shared by the test cases in this file.

    Must be mixed into a unittest.TestCase. It wires an in-memory
    OTelMetricExporter into a MeterProvider and offers helpers to drive RPCs
    with a globally registered plugin and to read the exported metrics.
    """

    def setUp(self):
        super().setUp()
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server:
            self._server.stop(0)
        super().tearDown()

    def _run_unary_calls(self, plugin: Any, num_calls: int = 1) -> None:
        """Issue `num_calls` unary RPCs while `plugin` is globally registered.

        The plugin is deregistered even when starting the server or issuing
        an RPC raises, so a failing test cannot leak the global plugin into
        the tests that run after it.
        """
        plugin.register_global()
        try:
            # Stored on self right away so tearDown() stops the server even
            # if one of the calls below fails.
            self._server, port = _test_server.start_server()
            for _ in range(num_calls):
                _test_server.unary_unary_call(port=port)
        finally:
            plugin.deregister_global()

    def _wait_for_exported_metrics(self) -> Dict[str, List[Mapping[str, Any]]]:
        """Wait for metrics to be exported and return a snapshot of them.

        The exporter appends to self.all_metrics from the metric reader's
        export path, which may run concurrently with the test; iterating a
        copy avoids "dictionary changed size during iteration" errors.
        """
        validate_metrics_exist(self, self.all_metrics)
        return dict(self.all_metrics)


@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class CSMObservabilityPluginTest(_OTelMetricsTestMixin, unittest.TestCase):
    def testOptionalXdsServiceLabelExist(self):
        csm_plugin = CsmOpenTelemetryPlugin(
            meter_provider=self._provider,
        )

        self._run_unary_calls(csm_plugin)

        for name, label_list in self._wait_for_exported_metrics().items():
            if name in METRIC_NAME_WITH_OPTIONAL_LABEL:
                self._validate_label_exist(
                    name, label_list, CSM_OPTIONAL_LABEL_KEYS
                )
            else:
                self._validate_label_not_exist(
                    name, label_list, CSM_OPTIONAL_LABEL_KEYS
                )

    def testPluginOptionOnlyEnabledForXdsTargets(self):
        csm_plugin = CsmOpenTelemetryPlugin(
            meter_provider=self._provider,
        )
        csm_plugin_option = csm_plugin.plugin_options[0]

        inactive_targets = (
            "foo.bar.google.com",
            "dns:///foo.bar.google.com",
            "dns:///foo.bar.google.com:1234",
            "dns://authority/foo.bar.google.com:1234",
            "xds://authority/foo",
        )
        active_targets = (
            "xds:///foo",
            "xds://traffic-director-global.xds.googleapis.com/foo",
            "xds://traffic-director-global.xds.googleapis.com/foo.bar",
        )

        # subTest reports every offending target instead of stopping at the
        # first failing one.
        for target in inactive_targets:
            with self.subTest(target=target):
                self.assertFalse(
                    csm_plugin_option.is_active_on_client_channel(target)
                )
        for target in active_targets:
            with self.subTest(target=target):
                self.assertTrue(
                    csm_plugin_option.is_active_on_client_channel(target)
                )

    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_metrics_names_containing("grpc.server", metric_names)

    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_metrics_names_containing("grpc.client", metric_names)

    def _validate_metrics_names_containing(
        self, name_fragment: str, metric_names: Set[str]
    ) -> None:
        """Assert that each base metric whose name contains `name_fragment`
        is present in `metric_names`."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if name_fragment in base_metric.name:
                self.assertIn(
                    base_metric.name,
                    metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )

    def _validate_label_exist(
        self,
        metric_name: str,
        metric_label_list: List[Mapping[str, Any]],
        labels_to_check: List[str],
    ) -> None:
        """Assert every data point of the metric carries all given labels."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertIn(
                    label,
                    metric_label,
                    msg=f"label with key {label} not found in metric {metric_name}, found label list: {metric_label}",
                )

    def _validate_label_not_exist(
        self,
        metric_name: str,
        metric_label_list: List[Mapping[str, Any]],
        labels_to_check: List[str],
    ) -> None:
        """Assert no data point of the metric carries any of the labels."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertNotIn(
                    label,
                    metric_label,
                    msg=f"found unexpected label with key {label} in metric {metric_name}, found label list: {metric_label}",
                )


@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class MetadataExchangeTest(_OTelMetricsTestMixin, unittest.TestCase):
    @mock.patch(_GCP_RESOURCE_DETECTOR_DETECT)
    def testMetadataExchangeClientDoesNotSendMetadata(self, mock_detector):
        mock_detector.return_value = MOCK_GKE_RESOURCE

        # Manually create csm_plugin so that it's always disabled on client.
        csm_plugin = self._create_csm_plugin(active_on_client=False)
        self._run_unary_calls(csm_plugin)

        for name, label_list in self._wait_for_exported_metrics().items():
            for labels in label_list:
                # Verifies that the server records unknown when the client
                # does not send metadata.
                if name in ["grpc.server.call.duration"]:
                    self.assertEqual(
                        labels["csm.workload_canonical_service"],
                        "canonical_service",
                    )
                    self.assertEqual(
                        labels["csm.remote_workload_canonical_service"],
                        "unknown",
                    )
                # Client metric should not have CSM labels.
                elif "grpc.client" in name:
                    self.assertNotIn("csm.workload_canonical_service", labels)
                    self.assertNotIn(
                        "csm.remote_workload_canonical_service", labels
                    )

    @mock.patch(_GCP_RESOURCE_DETECTOR_DETECT)
    def testResourceDetectorGCE(self, mock_detector):
        mock_detector.return_value = MOCK_GCE_RESOURCE
        self._run_unary_calls(self._create_csm_plugin(), num_calls=2)
        self._validate_csm_labels(TYPE_GCE)

    @mock.patch(_GCP_RESOURCE_DETECTOR_DETECT)
    def testResourceDetectorGKE(self, mock_detector):
        mock_detector.return_value = MOCK_GKE_RESOURCE
        self._run_unary_calls(self._create_csm_plugin())
        self._validate_csm_labels(TYPE_GKE)

    @mock.patch(_GCP_RESOURCE_DETECTOR_DETECT)
    def testResourceDetectorUnknown(self, mock_detector):
        mock_detector.return_value = MOCK_UNKNOWN_RESOURCE
        self._run_unary_calls(self._create_csm_plugin())
        self._validate_csm_labels(UNKNOWN_VALUE)

    def _create_csm_plugin(self, *, active_on_client: bool = True):
        """Build an OpenTelemetryPlugin carrying the CSM label injector.

        The plugin is created manually (instead of via CsmOpenTelemetryPlugin)
        so that the test decides on which channels the option is active.
        """
        # The environment is patched only while the injector is constructed.
        # NOTE(review): presumably the injector reads the CSM_* variables in
        # its constructor - confirm against CSMOpenTelemetryLabelInjector.
        with mock.patch.dict(os.environ, _CSM_TEST_ENVIRON):
            plugin_option = TestOpenTelemetryPluginOption(
                label_injector=CSMOpenTelemetryLabelInjector(),
                active_on_client=active_on_client,
            )
        return grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, plugin_options=[plugin_option]
        )

    def _validate_csm_labels(self, resource_type: str) -> None:
        """Check CSM labels on the exported started/duration metrics."""
        for name, label_list in self._wait_for_exported_metrics().items():
            # started metrics shouldn't have any csm labels.
            if name in _STARTED_METRIC_NAMES:
                self._verify_no_service_mesh_attributes(label_list)
            # duration metrics should have all csm related labels.
            elif name in _DURATION_METRIC_NAMES:
                self._verify_service_mesh_attributes(label_list, resource_type)

    def _verify_service_mesh_attributes(
        self, label_list: List[Mapping[str, Any]], resource_type: str
    ):
        """Assert each label mapping carries the CSM labels expected for
        `resource_type` (one of TYPE_GKE, TYPE_GCE or UNKNOWN_VALUE)."""
        expected_labels = {
            "csm.workload_canonical_service": "canonical_service",
            "csm.remote_workload_canonical_service": "canonical_service",
        }
        if resource_type == TYPE_GKE:
            expected_labels.update(
                {
                    "csm.remote_workload_type": "gcp_kubernetes_engine",
                    "csm.remote_workload_name": "workload",
                    "csm.remote_workload_namespace_name": "namespace",
                    "csm.remote_workload_cluster_name": "cluster",
                    "csm.remote_workload_location": "region",
                    "csm.remote_workload_project_id": "id",
                }
            )
        elif resource_type == TYPE_GCE:
            expected_labels.update(
                {
                    "csm.remote_workload_type": "gcp_compute_engine",
                    "csm.remote_workload_name": "workload",
                    "csm.remote_workload_location": "zone",
                    "csm.remote_workload_project_id": "id",
                }
            )
        elif resource_type == UNKNOWN_VALUE:
            expected_labels["csm.remote_workload_type"] = "random"

        for labels in label_list:
            for key, expected_value in expected_labels.items():
                # Indexing (not .get) so that a missing label is an error.
                self.assertEqual(labels[key], expected_value)

    def _verify_no_service_mesh_attributes(
        self, label_list: List[Mapping[str, Any]]
    ):
        """Assert none of the label mappings carries a CSM label."""
        unexpected_keys = (
            "csm.remote_workload_canonical_service",
            "csm.remote_workload_type",
            "csm.workload_canonical_service",
            "csm.workload_type",
            "csm.mesh_id",
        )
        for labels in label_list:
            for key in unexpected_keys:
                self.assertNotIn(key, labels)


def validate_metrics_exist(
    testCase: unittest.TestCase, all_metrics: Dict[str, Any]
) -> None:
    """Fail `testCase` unless more than one metric name gets exported.

    Polls `all_metrics` (filled by OTelMetricExporter) rather than sleeping
    for a fixed time, so the test continues as soon as an export happened.
    """
    assert_eventually(
        testCase=testCase,
        predicate=lambda: len(all_metrics) > 1,
        message=lambda: "No metrics was exported",
    )


def assert_eventually(
    testCase: unittest.TestCase,
    predicate: Callable[[], bool],
    *,
    timeout: Optional[datetime.timedelta] = None,
    message: Optional[Callable[[], str]] = None,
) -> None:
    """Poll `predicate` until it returns a true value or `timeout` elapses.

    Args:
      testCase: Test case used to report the failure.
      predicate: Condition to wait for. It is always evaluated at least once.
      timeout: Maximum time to wait; defaults to 5 seconds when None.
      message: Produces the failure text; evaluated only on failure.

    Raises:
      testCase.failureException: If the predicate is still false once the
        timeout has elapsed.
    """
    if timeout is None:
        timeout = datetime.timedelta(seconds=5)
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout.total_seconds()
    while not predicate():
        remaining_s = deadline - time.monotonic()
        if remaining_s <= 0:
            failure_text = (
                message()
                if message is not None
                else "Proposition did not evaluate to true"
            )
            testCase.fail(failure_text + " after " + str(timeout))
        time.sleep(min(_POLL_INTERVAL_S, remaining_s))


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)