• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from collections import defaultdict
16import datetime
17import logging
18import os
19import sys
20import time
21from typing import Any, Callable, Dict, List, Optional, Set
22import unittest
23from unittest import mock
24
25from grpc_csm_observability import CsmOpenTelemetryPlugin
26from grpc_csm_observability._csm_observability_plugin import (
27    CSMOpenTelemetryLabelInjector,
28)
29from grpc_csm_observability._csm_observability_plugin import TYPE_GCE
30from grpc_csm_observability._csm_observability_plugin import TYPE_GKE
31from grpc_csm_observability._csm_observability_plugin import UNKNOWN_VALUE
32import grpc_observability
33from grpc_observability import _open_telemetry_measures
34from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector
35from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption
36from opentelemetry.sdk.metrics import MeterProvider
37from opentelemetry.sdk.metrics.export import AggregationTemporality
38from opentelemetry.sdk.metrics.export import MetricExportResult
39from opentelemetry.sdk.metrics.export import MetricExporter
40from opentelemetry.sdk.metrics.export import MetricsData
41from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
42from opentelemetry.sdk.resources import Resource
43
44from tests.observability import _test_server
45
logger = logging.getLogger(__name__)

# Export period, in seconds, for the PeriodicExportingMetricReader that the
# test cases below configure (converted to milliseconds at the call site).
OTEL_EXPORT_INTERVAL_S = 0.5

# Only these basic labels are expected to be exchanged.
CSM_METADATA_EXCHANGE_DEFAULT_LABELS = [
    "csm.remote_workload_type",
    "csm.remote_workload_canonical_service",
]

# Client metrics that should carry the optional CSM labels when optional
# labels are enabled through the OpenTelemetryPlugin.
METRIC_NAME_WITH_OPTIONAL_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
]
CSM_OPTIONAL_LABEL_KEYS = ["csm.service_name", "csm.service_namespace_name"]

# Metrics that should carry metadata-exchange labels once the exchange flow
# is triggered: the client metrics listed above plus their server-side
# counterparts (a new list; the one above is not modified).
METRIC_NAME_WITH_EXCHANGE_LABEL = METRIC_NAME_WITH_OPTIONAL_LABEL + [
    "grpc.server.call.duration",
    "grpc.server.call.sent_total_compressed_message_size",
    "grpc.server.call.rcvd_total_compressed_message_size",
]

# Fake resources returned by the mocked GCP resource detector.
MOCK_GKE_RESOURCE = Resource.create(
    {
        "gcp.resource_type": "gke_container",
        "k8s.pod.name": "pod",
        "k8s.container.name": "container",
        "k8s.namespace.name": "namespace",
        "k8s.cluster.name": "cluster",
        "cloud.region": "region",
        "cloud.account.id": "id",
    }
)

MOCK_GCE_RESOURCE = Resource.create(
    {
        "gcp.resource_type": "gce_instance",
        "cloud.zone": "zone",
        "cloud.account.id": "id",
    }
)

MOCK_UNKNOWN_RESOURCE = Resource.create({"gcp.resource_type": "random"})
100
101
class OTelMetricExporter(MetricExporter):
    """A :class:`MetricExporter` that records exported metrics in memory.

    Nothing is sent anywhere; every exported data point's attributes are
    appended to ``all_metrics`` so tests can inspect them.

    all_metrics: A dict whose keys are metric names (as in
        grpc_observability._open_telemetry_measures.Metric.name) and whose
        values are the lists of label sets recorded for that metric. A plain
        dict or a defaultdict(list) both work.
        An example item of this dict:
            {"grpc.client.attempt.started":
              [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
               {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Records ``metrics_data`` into ``all_metrics``; always succeeds."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """No resources to release."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Nothing is buffered, so flushing trivially succeeds."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Appends the attributes of every data point, keyed by metric name."""
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    # setdefault (not indexing) so a plain dict also works.
                    bucket = self.all_metrics.setdefault(metric.name, [])
                    for data_point in metric.data.data_points:
                        bucket.append(data_point.attributes)
151
152
class TestOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """Plugin option with a fixed label injector and fixed activation flags.

    Lets tests decide explicitly whether the option is active on clients
    and/or servers, independent of the channel target or xDS mode.
    """

    # Helper, not a test: the "Test" name prefix would otherwise make pytest
    # try to collect this class (and warn because it defines __init__).
    __test__ = False

    _label_injector: OpenTelemetryLabelInjector
    _active_on_client: bool
    _active_on_server: bool

    def __init__(
        self,
        label_injector: OpenTelemetryLabelInjector,
        active_on_client: bool = True,
        active_on_server: bool = True,
    ):
        self._label_injector = label_injector
        self._active_on_client = active_on_client
        self._active_on_server = active_on_server

    def is_active_on_client_channel(self, target: str) -> bool:
        """Returns the fixed client flag; ``target`` is ignored."""
        return self._active_on_client

    def is_active_on_server(self, xds: bool) -> bool:
        """Returns the fixed server flag; ``xds`` is ignored."""
        return self._active_on_server

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        """Returns the label injector passed to the constructor."""
        return self._label_injector
176
177
@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class CSMObservabilityPluginTest(unittest.TestCase):
    """Tests CsmOpenTelemetryPlugin's optional labels and target filtering."""

    def setUp(self):
        # OTelMetricExporter appends into this dict: metric name -> list of
        # attribute sets, one per exported data point.
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server is not None:
            self._server.stop(0)
        # Shut down the provider (and with it the periodic reader) so metric
        # export does not keep running in the background after this test.
        self._provider.shutdown()

    def testOptionalXdsServiceLabelExist(self):
        csm_plugin = CsmOpenTelemetryPlugin(
            meter_provider=self._provider,
        )

        csm_plugin.register_global()
        # Always deregister, even if the server or the RPC fails, so the
        # global plugin cannot leak into later tests.
        try:
            self._server, port = _test_server.start_server()
            _test_server.unary_unary_call(port=port)
        finally:
            csm_plugin.deregister_global()

        validate_metrics_exist(self, self.all_metrics)
        # Iterate a snapshot: the periodic exporter may still insert keys.
        for name, label_list in list(self.all_metrics.items()):
            if name in METRIC_NAME_WITH_OPTIONAL_LABEL:
                self._validate_label_exist(
                    name, label_list, CSM_OPTIONAL_LABEL_KEYS
                )
            else:
                self._validate_label_not_exist(
                    name, label_list, CSM_OPTIONAL_LABEL_KEYS
                )

    def testPluginOptionOnlyEnabledForXdsTargets(self):
        csm_plugin = CsmOpenTelemetryPlugin(
            meter_provider=self._provider,
        )
        csm_plugin_option = csm_plugin.plugin_options[0]

        # Non-xDS targets, and the xDS target with an arbitrary authority,
        # must not activate the option.
        inactive_targets = (
            "foo.bar.google.com",
            "dns:///foo.bar.google.com",
            "dns:///foo.bar.google.com:1234",
            "dns://authority/foo.bar.google.com:1234",
            "xds://authority/foo",
        )
        # xDS targets with no authority or the Traffic Director authority
        # must activate it.
        active_targets = (
            "xds:///foo",
            "xds://traffic-director-global.xds.googleapis.com/foo",
            "xds://traffic-director-global.xds.googleapis.com/foo.bar",
        )

        for target in inactive_targets:
            with self.subTest(target=target):
                self.assertFalse(
                    csm_plugin_option.is_active_on_client_channel(target)
                )
        for target in active_targets:
            with self.subTest(target=target):
                self.assertTrue(
                    csm_plugin_option.is_active_on_client_channel(target)
                )

    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
        """Asserts every base client and server metric name was exported."""
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
        """Asserts every base metric containing "grpc.server" was exported."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if "grpc.server" in base_metric.name:
                self.assertTrue(
                    base_metric.name in metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )

    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
        """Asserts every base metric containing "grpc.client" was exported."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if "grpc.client" in base_metric.name:
                self.assertTrue(
                    base_metric.name in metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )

    def _validate_label_exist(
        self,
        metric_name: str,
        metric_label_list: List[Dict[str, Any]],
        labels_to_check: List[str],
    ) -> None:
        """Asserts every recorded label set of metric_name has all keys."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertIn(
                    label,
                    metric_label,
                    msg=f"label with key {label} not found in metric {metric_name}, found label list: {metric_label}",
                )

    def _validate_label_not_exist(
        self,
        metric_name: str,
        metric_label_list: List[Dict[str, Any]],
        labels_to_check: List[str],
    ) -> None:
        """Asserts no recorded label set of metric_name has any of the keys."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertNotIn(
                    label,
                    metric_label,
                    msg=f"found unexpected label with key {label} in metric {metric_name}, found label list: {metric_label}",
                )
305
306
@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class MetadataExchangeTest(unittest.TestCase):
    """Tests CSM metadata-exchange labels for various detected resources."""

    # Patch target for the GCP resource detector used by every test below.
    _GCP_DETECT_TARGET = "opentelemetry.resourcedetector.gcp_resource_detector.GoogleCloudResourceDetector.detect"
    # Environment variables set while the CSM label injector is created.
    _CSM_ENV = {
        "CSM_CANONICAL_SERVICE_NAME": "canonical_service",
        "CSM_WORKLOAD_NAME": "workload",
    }
    # "started" metrics must not carry any CSM labels.
    _STARTED_METRICS = (
        "grpc.client.attempt.started",
        "grpc.server.call.started",
    )
    # "duration" metrics must carry all CSM related labels.
    _DURATION_METRICS = (
        "grpc.client.attempt.duration",
        "grpc.server.call.duration",
    )

    def setUp(self):
        # OTelMetricExporter appends into this dict: metric name -> list of
        # attribute sets, one per exported data point.
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server is not None:
            self._server.stop(0)
        # Shut down the provider (and with it the periodic reader) so metric
        # export does not keep running in the background after this test.
        self._provider.shutdown()

    def _create_plugin_option(
        self, **option_kwargs
    ) -> TestOpenTelemetryPluginOption:
        """Builds a TestOpenTelemetryPluginOption under the CSM environment.

        The CSMOpenTelemetryLabelInjector is constructed while the variables
        in _CSM_ENV are set; option_kwargs (e.g. active_on_client) are
        forwarded to TestOpenTelemetryPluginOption.
        """
        with mock.patch.dict(os.environ, self._CSM_ENV):
            return TestOpenTelemetryPluginOption(
                label_injector=CSMOpenTelemetryLabelInjector(),
                **option_kwargs,
            )

    def _run_unary_calls(
        self,
        plugin_option: OpenTelemetryPluginOption,
        num_calls: int = 1,
    ) -> None:
        """Registers a plugin built only from plugin_option, makes num_calls
        unary calls to a freshly started test server, then deregisters it.
        """
        # The plugin is created manually (instead of via
        # CsmOpenTelemetryPlugin) so the test option fully controls on which
        # sides it is active.
        csm_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, plugin_options=[plugin_option]
        )
        csm_plugin.register_global()
        # Always deregister so the global plugin cannot leak into later tests.
        try:
            self._server, port = _test_server.start_server()
            for _ in range(num_calls):
                _test_server.unary_unary_call(port=port)
        finally:
            csm_plugin.deregister_global()

    def _verify_labels_for_resource(self, resource_type: str) -> None:
        """Checks started metrics have no CSM labels and duration metrics
        have the labels expected for resource_type."""
        validate_metrics_exist(self, self.all_metrics)
        # Iterate a snapshot: the periodic exporter may still insert keys.
        for name, label_list in list(self.all_metrics.items()):
            if name in self._STARTED_METRICS:
                self._verify_no_service_mesh_attributes(label_list)
            elif name in self._DURATION_METRICS:
                self._verify_service_mesh_attributes(label_list, resource_type)

    @mock.patch(_GCP_DETECT_TARGET)
    def testMetadataExchangeClientDoesNotSendMetadata(self, mock_detector):
        mock_detector.return_value = MOCK_GKE_RESOURCE
        # Disabled on the client, so the client does not send metadata.
        plugin_option = self._create_plugin_option(active_on_client=False)
        self._run_unary_calls(plugin_option)

        validate_metrics_exist(self, self.all_metrics)
        for name, label_list in list(self.all_metrics.items()):
            for labels in label_list:
                if name == "grpc.server.call.duration":
                    # The server records "unknown" for the remote side when
                    # the client does not send metadata.
                    self.assertEqual(
                        labels["csm.workload_canonical_service"],
                        "canonical_service",
                    )
                    self.assertEqual(
                        labels["csm.remote_workload_canonical_service"],
                        "unknown",
                    )
                elif "grpc.client" in name:
                    # Client metrics must not have CSM labels.
                    self.assertNotIn("csm.workload_canonical_service", labels)
                    self.assertNotIn(
                        "csm.remote_workload_canonical_service", labels
                    )

    @mock.patch(_GCP_DETECT_TARGET)
    def testResourceDetectorGCE(self, mock_detector):
        mock_detector.return_value = MOCK_GCE_RESOURCE
        self._run_unary_calls(self._create_plugin_option(), num_calls=2)
        self._verify_labels_for_resource(TYPE_GCE)

    @mock.patch(_GCP_DETECT_TARGET)
    def testResourceDetectorGKE(self, mock_detector):
        mock_detector.return_value = MOCK_GKE_RESOURCE
        self._run_unary_calls(self._create_plugin_option())
        self._verify_labels_for_resource(TYPE_GKE)

    @mock.patch(_GCP_DETECT_TARGET)
    def testResourceDetectorUnknown(self, mock_detector):
        mock_detector.return_value = MOCK_UNKNOWN_RESOURCE
        self._run_unary_calls(self._create_plugin_option())
        self._verify_labels_for_resource(UNKNOWN_VALUE)

    def _verify_service_mesh_attributes(
        self, label_list: List[Dict[str, str]], resource_type: str
    ):
        """Asserts each label set has the CSM labels for resource_type.

        Client and server share the same CSM environment, so local and
        remote canonical services are both "canonical_service".
        """
        for labels in label_list:
            self.assertEqual(
                labels["csm.workload_canonical_service"], "canonical_service"
            )
            self.assertEqual(
                labels["csm.remote_workload_canonical_service"],
                "canonical_service",
            )

            if resource_type == TYPE_GKE:
                self.assertEqual(
                    labels["csm.remote_workload_type"], "gcp_kubernetes_engine"
                )
                self.assertEqual(labels["csm.remote_workload_name"], "workload")
                self.assertEqual(
                    labels["csm.remote_workload_namespace_name"], "namespace"
                )
                self.assertEqual(
                    labels["csm.remote_workload_cluster_name"], "cluster"
                )
                self.assertEqual(
                    labels["csm.remote_workload_location"], "region"
                )
                self.assertEqual(labels["csm.remote_workload_project_id"], "id")
            elif resource_type == TYPE_GCE:
                self.assertEqual(
                    labels["csm.remote_workload_type"], "gcp_compute_engine"
                )
                self.assertEqual(labels["csm.remote_workload_name"], "workload")
                self.assertEqual(labels["csm.remote_workload_location"], "zone")
                self.assertEqual(labels["csm.remote_workload_project_id"], "id")
            elif resource_type == UNKNOWN_VALUE:
                # The raw "gcp.resource_type" of MOCK_UNKNOWN_RESOURCE.
                self.assertEqual(labels["csm.remote_workload_type"], "random")

    def _verify_no_service_mesh_attributes(
        self, label_list: List[Dict[str, str]]
    ):
        """Asserts no label set contains any CSM service-mesh label."""
        forbidden_keys = (
            "csm.remote_workload_canonical_service",
            "csm.remote_workload_type",
            "csm.workload_canonical_service",
            "csm.workload_type",
            "csm.mesh_id",
        )
        for labels in label_list:
            for key in forbidden_keys:
                self.assertNotIn(key, labels)
555
556
def validate_metrics_exist(
    testCase: unittest.TestCase, all_metrics: Dict[str, Any]
) -> None:
    """Waits until more than one metric name has been exported.

    Metrics reach ``all_metrics`` only when the periodic reader exports, so
    this polls via assert_eventually instead of checking once.

    Args:
      testCase: The test case used to report a failure.
      all_metrics: The dict an OTelMetricExporter records into, keyed by
        metric name.
    """
    assert_eventually(
        testCase=testCase,
        predicate=lambda: len(all_metrics.keys()) > 1,
        message=lambda: (
            "Expected more than one exported metric, got: "
            f"{sorted(all_metrics)}"
        ),
    )
566
567
def assert_eventually(
    testCase: unittest.TestCase,
    predicate: Callable[[], bool],
    *,
    timeout: Optional[datetime.timedelta] = None,
    message: Optional[Callable[[], str]] = None,
    poll_interval: float = 0.5,
) -> None:
    """Fails ``testCase`` unless ``predicate`` becomes true within ``timeout``.

    The predicate is evaluated immediately, then every ``poll_interval``
    seconds, and once more when the deadline is reached.

    Args:
      testCase: Test case whose ``fail`` reports the timeout.
      predicate: Zero-argument callable polled until it returns truthy.
      timeout: Maximum time to wait; defaults to 5 seconds when None. An
        explicit zero timeout evaluates the predicate exactly once.
      message: Zero-argument callable producing the failure message; only
        called on failure.
      poll_interval: Seconds to sleep between evaluations.

    Raises:
      testCase.failureException (via ``testCase.fail``) on timeout.
    """
    describe = (
        message
        if message is not None
        else (lambda: "Proposition did not evaluate to true")
    )
    # Compare against None: timedelta(0) is falsy and must not become 5s.
    if timeout is None:
        timeout = datetime.timedelta(seconds=5)
    # Monotonic clock: immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        if predicate():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never oversleep the deadline; the loop re-checks right after.
        time.sleep(min(poll_interval, remaining))
    testCase.fail(describe() + " after " + str(timeout))
584
585
if __name__ == "__main__":
    # Configure default root logging (stderr handler) so log records from the
    # code under test are visible, then run this module's tests with
    # per-test verbose output.
    logging.basicConfig()
    unittest.main(verbosity=2)
589