• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from collections import defaultdict
16import datetime
17import logging
18import os
19import sys
20import time
21from typing import Any, Callable, Dict, List, Optional, Set
22import unittest
23
24import grpc
25import grpc_observability
26from grpc_observability import _open_telemetry_measures
27from grpc_observability._open_telemetry_observability import (
28    GRPC_OTHER_LABEL_VALUE,
29)
30from grpc_observability._open_telemetry_observability import GRPC_METHOD_LABEL
31from grpc_observability._open_telemetry_observability import GRPC_TARGET_LABEL
32from opentelemetry.sdk.metrics import MeterProvider
33from opentelemetry.sdk.metrics.export import AggregationTemporality
34from opentelemetry.sdk.metrics.export import MetricExportResult
35from opentelemetry.sdk.metrics.export import MetricExporter
36from opentelemetry.sdk.metrics.export import MetricsData
37from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
38
39from tests.observability import _test_server
40
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): STREAM_LENGTH is not referenced in the visible part of this
# file -- confirm whether it is still needed.
STREAM_LENGTH = 5
# Export interval, in seconds, handed (converted to milliseconds) to the
# PeriodicExportingMetricReader built in setUp().
OTEL_EXPORT_INTERVAL_S = 0.5
45
46
class OTelMetricExporter(MetricExporter):
    """A :class:`MetricExporter` that records exported metrics into a dict.

    Attributes:
      all_metrics: A dict whose keys are metric names
        (grpc_observability._open_telemetry_measures.Metric.name) and whose
        values are lists of the labels recorded for that metric.
        An example item of this dict:
            {"grpc.client.attempt.started":
              [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
               {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ):
        """Store the target dict and forward the preferences to the base class.

        Args:
          all_metrics: Dict that receives the attributes of every exported
            data point, keyed by metric name. It is mutated in place.
          preferred_temporality: Passed through to ``MetricExporter``.
          preferred_aggregation: Passed through to ``MetricExporter``.
        """
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record ``metrics_data`` into ``all_metrics`` and report success."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Do nothing; this exporter holds no resources of its own."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Report success immediately; nothing is buffered by this exporter."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append the attributes of every data point to ``all_metrics``.

        Walks resource metrics -> scope metrics -> metrics -> data points and
        appends each data point's ``attributes`` to the list stored under the
        metric's name.
        """
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    # setdefault keeps this working for a plain dict as well
                    # as for the defaultdict(list) the tests pass in.
                    recorded = self.all_metrics.setdefault(metric.name, [])
                    for data_point in metric.data.data_points:
                        recorded.append(data_point.attributes)
96
97
class _ClientUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Client interceptor that forwards each unary-unary call unchanged."""

    def intercept_unary_unary(
        self, continuation, client_call_details, request_or_iterator
    ):
        # Pure pass-through: hand the call to the continuation and return
        # whatever it produces.
        return continuation(client_call_details, request_or_iterator)
104
105
class _ServerInterceptor(grpc.ServerInterceptor):
    """Server interceptor that delegates to the continuation unchanged."""

    def intercept_service(self, continuation, handler_call_details):
        # Pure pass-through: return exactly what the continuation returns.
        handler = continuation(handler_call_details)
        return handler
109
110
111@unittest.skipIf(
112    os.name == "nt" or "darwin" in sys.platform,
113    "Observability is not supported in Windows and MacOS",
114)
115class OpenTelemetryObservabilityTest(unittest.TestCase):
116    def setUp(self):
117        self.all_metrics = defaultdict(list)
118        otel_exporter = OTelMetricExporter(self.all_metrics)
119        reader = PeriodicExportingMetricReader(
120            exporter=otel_exporter,
121            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
122        )
123        self._provider = MeterProvider(metric_readers=[reader])
124        self._server = None
125        self._port = None
126
127    def tearDown(self):
128        if self._server:
129            self._server.stop(0)
130
131    def testRecordUnaryUnaryUseContextManager(self):
132        with grpc_observability.OpenTelemetryPlugin(
133            meter_provider=self._provider
134        ):
135            server, port = _test_server.start_server()
136            self._server = server
137            _test_server.unary_unary_call(port=port)
138
139        self._validate_metrics_exist(self.all_metrics)
140        self._validate_all_metrics_names(self.all_metrics.keys())
141
142    def testRecordUnaryUnaryUseGlobalInit(self):
143        otel_plugin = grpc_observability.OpenTelemetryPlugin(
144            meter_provider=self._provider
145        )
146        otel_plugin.register_global()
147
148        server, port = _test_server.start_server()
149        self._server = server
150        _test_server.unary_unary_call(port=port)
151
152        self._validate_metrics_exist(self.all_metrics)
153        self._validate_all_metrics_names(self.all_metrics.keys())
154        otel_plugin.deregister_global()
155
156    def testCallGlobalInitThrowErrorWhenGlobalCalled(self):
157        otel_plugin = grpc_observability.OpenTelemetryPlugin(
158            meter_provider=self._provider
159        )
160        otel_plugin.register_global()
161        try:
162            otel_plugin.register_global()
163        except RuntimeError as exp:
164            self.assertIn(
165                "gPRC Python observability was already initialized", str(exp)
166            )
167
168        otel_plugin.deregister_global()
169
170    def testCallGlobalInitThrowErrorWhenContextManagerCalled(self):
171        with grpc_observability.OpenTelemetryPlugin(
172            meter_provider=self._provider
173        ):
174            try:
175                otel_plugin = grpc_observability.OpenTelemetryPlugin(
176                    meter_provider=self._provider
177                )
178                otel_plugin.register_global()
179            except RuntimeError as exp:
180                self.assertIn(
181                    "gPRC Python observability was already initialized",
182                    str(exp),
183                )
184
185    def testCallContextManagerThrowErrorWhenGlobalInitCalled(self):
186        otel_plugin = grpc_observability.OpenTelemetryPlugin(
187            meter_provider=self._provider
188        )
189        otel_plugin.register_global()
190        try:
191            with grpc_observability.OpenTelemetryPlugin(
192                meter_provider=self._provider
193            ):
194                pass
195        except RuntimeError as exp:
196            self.assertIn(
197                "gPRC Python observability was already initialized", str(exp)
198            )
199        otel_plugin.deregister_global()
200
201    def testContextManagerThrowErrorWhenContextManagerCalled(self):
202        with grpc_observability.OpenTelemetryPlugin(
203            meter_provider=self._provider
204        ):
205            try:
206                with grpc_observability.OpenTelemetryPlugin(
207                    meter_provider=self._provider
208                ):
209                    pass
210            except RuntimeError as exp:
211                self.assertIn(
212                    "gPRC Python observability was already initialized",
213                    str(exp),
214                )
215
216    def testNoErrorCallGlobalInitThenContextManager(self):
217        otel_plugin = grpc_observability.OpenTelemetryPlugin(
218            meter_provider=self._provider
219        )
220        otel_plugin.register_global()
221        otel_plugin.deregister_global()
222
223        with grpc_observability.OpenTelemetryPlugin(
224            meter_provider=self._provider
225        ):
226            pass
227
228    def testNoErrorCallContextManagerThenGlobalInit(self):
229        with grpc_observability.OpenTelemetryPlugin(
230            meter_provider=self._provider
231        ):
232            pass
233        otel_plugin = grpc_observability.OpenTelemetryPlugin(
234            meter_provider=self._provider
235        )
236        otel_plugin.register_global()
237        otel_plugin.deregister_global()
238
239    def testRecordUnaryUnaryWithClientInterceptor(self):
240        interceptor = _ClientUnaryUnaryInterceptor()
241        with grpc_observability.OpenTelemetryPlugin(
242            meter_provider=self._provider
243        ):
244            server, port = _test_server.start_server()
245            self._server = server
246            _test_server.intercepted_unary_unary_call(
247                port=port, interceptors=interceptor
248            )
249
250        self._validate_metrics_exist(self.all_metrics)
251        self._validate_all_metrics_names(self.all_metrics.keys())
252
253    def testRecordUnaryUnaryWithServerInterceptor(self):
254        interceptor = _ServerInterceptor()
255        with grpc_observability.OpenTelemetryPlugin(
256            meter_provider=self._provider
257        ):
258            server, port = _test_server.start_server(interceptors=[interceptor])
259            self._server = server
260            _test_server.unary_unary_call(port=port)
261
262        self._validate_metrics_exist(self.all_metrics)
263        self._validate_all_metrics_names(self.all_metrics.keys())
264
265    def testRecordUnaryUnaryClientOnly(self):
266        server, port = _test_server.start_server()
267        self._server = server
268
269        with grpc_observability.OpenTelemetryPlugin(
270            meter_provider=self._provider
271        ):
272            _test_server.unary_unary_call(port=port)
273
274        self._validate_metrics_exist(self.all_metrics)
275        self._validate_client_metrics_names(self.all_metrics)
276
277    def testNoRecordBeforeInit(self):
278        server, port = _test_server.start_server()
279        _test_server.unary_unary_call(port=port)
280        self.assertEqual(len(self.all_metrics), 0)
281        server.stop(0)
282
283        with grpc_observability.OpenTelemetryPlugin(
284            meter_provider=self._provider
285        ):
286            server, port = _test_server.start_server()
287            self._server = server
288            _test_server.unary_unary_call(port=port)
289
290        self._validate_metrics_exist(self.all_metrics)
291        self._validate_all_metrics_names(self.all_metrics.keys())
292
293    def testNoRecordAfterExitUseContextManager(self):
294        with grpc_observability.OpenTelemetryPlugin(
295            meter_provider=self._provider
296        ):
297            server, port = _test_server.start_server()
298            self._server = server
299            self._port = port
300            _test_server.unary_unary_call(port=port)
301
302        self._validate_metrics_exist(self.all_metrics)
303        self._validate_all_metrics_names(self.all_metrics.keys())
304
305        self.all_metrics = defaultdict(list)
306        _test_server.unary_unary_call(port=self._port)
307        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
308            self._validate_metrics_exist(self.all_metrics)
309
310    def testNoRecordAfterExitUseGlobal(self):
311        otel_plugin = grpc_observability.OpenTelemetryPlugin(
312            meter_provider=self._provider
313        )
314        otel_plugin.register_global()
315
316        server, port = _test_server.start_server()
317        self._server = server
318        self._port = port
319        _test_server.unary_unary_call(port=port)
320        otel_plugin.deregister_global()
321
322        self._validate_metrics_exist(self.all_metrics)
323        self._validate_all_metrics_names(self.all_metrics.keys())
324
325        self.all_metrics = defaultdict(list)
326        _test_server.unary_unary_call(port=self._port)
327        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
328            self._validate_metrics_exist(self.all_metrics)
329
330    def testRecordUnaryStream(self):
331        with grpc_observability.OpenTelemetryPlugin(
332            meter_provider=self._provider
333        ):
334            server, port = _test_server.start_server()
335            self._server = server
336            _test_server.unary_stream_call(port=port)
337
338        self._validate_metrics_exist(self.all_metrics)
339        self._validate_all_metrics_names(self.all_metrics.keys())
340
341    def testRecordStreamUnary(self):
342        with grpc_observability.OpenTelemetryPlugin(
343            meter_provider=self._provider
344        ):
345            server, port = _test_server.start_server()
346            self._server = server
347            _test_server.stream_unary_call(port=port)
348
349        self._validate_metrics_exist(self.all_metrics)
350        self._validate_all_metrics_names(self.all_metrics.keys())
351
352    def testRecordStreamStream(self):
353        with grpc_observability.OpenTelemetryPlugin(
354            meter_provider=self._provider
355        ):
356            server, port = _test_server.start_server()
357            self._server = server
358            _test_server.stream_stream_call(port=port)
359
360        self._validate_metrics_exist(self.all_metrics)
361        self._validate_all_metrics_names(self.all_metrics.keys())
362
363    def testTargetAttributeFilter(self):
364        main_server, main_port = _test_server.start_server()
365        backup_server, backup_port = _test_server.start_server()
366        main_target = f"localhost:{main_port}"
367        backup_target = f"localhost:{backup_port}"
368
369        # Replace target label with 'other' for main_server.
370        def target_filter(target: str) -> bool:
371            if main_target in target:
372                return False
373            return True
374
375        with grpc_observability.OpenTelemetryPlugin(
376            meter_provider=self._provider, target_attribute_filter=target_filter
377        ):
378            _test_server.unary_unary_call(port=main_port)
379            _test_server.unary_unary_call(port=backup_port)
380
381        self._validate_metrics_exist(self.all_metrics)
382        self._validate_client_metrics_names(self.all_metrics)
383
384        target_values = set()
385        for label_list in self.all_metrics.values():
386            for labels in label_list:
387                if GRPC_TARGET_LABEL in labels:
388                    target_values.add(labels[GRPC_TARGET_LABEL])
389        self.assertTrue(GRPC_OTHER_LABEL_VALUE in target_values)
390        self.assertTrue(backup_target in target_values)
391
392        main_server.stop(0)
393        backup_server.stop(0)
394
395    def testMethodAttributeFilter(self):
396        # method_filter should replace method name 'test/UnaryUnaryFiltered' with 'other'.
397        FILTERED_METHOD_NAME = "test/UnaryUnaryFiltered"
398
399        def method_filter(method: str) -> bool:
400            if FILTERED_METHOD_NAME in method:
401                return False
402            return True
403
404        with grpc_observability.OpenTelemetryPlugin(
405            meter_provider=self._provider,
406            generic_method_attribute_filter=method_filter,
407        ):
408            server, port = _test_server.start_server(register_method=False)
409            self._server = server
410            _test_server.unary_unary_call(port=port, registered_method=True)
411            _test_server.unary_unary_filtered_call(port=port)
412
413        self._validate_metrics_exist(self.all_metrics)
414        self._validate_all_metrics_names(self.all_metrics.keys())
415        method_values = set()
416        for label_list in self.all_metrics.values():
417            for labels in label_list:
418                if GRPC_METHOD_LABEL in labels:
419                    method_values.add(labels[GRPC_METHOD_LABEL])
420        self.assertTrue(GRPC_OTHER_LABEL_VALUE in method_values)
421        self.assertTrue(FILTERED_METHOD_NAME not in method_values)
422
423    def testClientNonRegisteredMethod(self):
424        UNARY_METHOD_NAME = "test/UnaryUnary"
425
426        with grpc_observability.OpenTelemetryPlugin(
427            meter_provider=self._provider
428        ):
429            server, port = _test_server.start_server(register_method=True)
430            self._server = server
431            _test_server.unary_unary_call(port=port, registered_method=False)
432
433        self._validate_metrics_exist(self.all_metrics)
434        self._validate_all_metrics_names(self.all_metrics.keys())
435        client_method_values = set()
436        server_method_values = set()
437        for metric_name, label_list in self.all_metrics.items():
438            for labels in label_list:
439                if GRPC_METHOD_LABEL in labels:
440                    if "grpc.client" in metric_name:
441                        client_method_values.add(labels[GRPC_METHOD_LABEL])
442                    elif "grpc.server" in metric_name:
443                        server_method_values.add(labels[GRPC_METHOD_LABEL])
444        # For client metrics, all method name should be replaced with 'other'.
445        self.assertTrue(GRPC_OTHER_LABEL_VALUE in client_method_values)
446        self.assertTrue(UNARY_METHOD_NAME not in client_method_values)
447
448        # For server metrics, all method name should be 'test/UnaryUnary'.
449        self.assertTrue(GRPC_OTHER_LABEL_VALUE not in server_method_values)
450        self.assertTrue(UNARY_METHOD_NAME in server_method_values)
451
452    def testServerNonRegisteredMethod(self):
453        UNARY_METHOD_NAME = "test/UnaryUnary"
454
455        with grpc_observability.OpenTelemetryPlugin(
456            meter_provider=self._provider
457        ):
458            server, port = _test_server.start_server(register_method=False)
459            self._server = server
460            _test_server.unary_unary_call(port=port, registered_method=True)
461
462        self._validate_metrics_exist(self.all_metrics)
463        self._validate_all_metrics_names(self.all_metrics.keys())
464        client_method_values = set()
465        server_method_values = set()
466        for metric_name, label_list in self.all_metrics.items():
467            for labels in label_list:
468                if GRPC_METHOD_LABEL in labels:
469                    if "grpc.client" in metric_name:
470                        client_method_values.add(labels[GRPC_METHOD_LABEL])
471                    elif "grpc.server" in metric_name:
472                        server_method_values.add(labels[GRPC_METHOD_LABEL])
473        # For client metrics, all method name should be 'test/UnaryUnary'.
474        self.assertTrue(GRPC_OTHER_LABEL_VALUE not in client_method_values)
475        self.assertTrue(UNARY_METHOD_NAME in client_method_values)
476
477        # For server metrics, all method name should be replaced with 'other'.
478        self.assertTrue(GRPC_OTHER_LABEL_VALUE in server_method_values)
479        self.assertTrue(UNARY_METHOD_NAME not in server_method_values)
480
481    def assert_eventually(
482        self,
483        predicate: Callable[[], bool],
484        *,
485        timeout: Optional[datetime.timedelta] = None,
486        message: Optional[Callable[[], str]] = None,
487    ) -> None:
488        message = message or (lambda: "Proposition did not evaluate to true")
489        timeout = timeout or datetime.timedelta(seconds=5)
490        end = datetime.datetime.now() + timeout
491        while datetime.datetime.now() < end:
492            if predicate():
493                break
494            time.sleep(0.5)
495        else:
496            self.fail(message() + " after " + str(timeout))
497
498    def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
499        # Sleep here to make sure we have at least one export from OTel MetricExporter.
500        self.assert_eventually(
501            lambda: len(all_metrics.keys()) > 1,
502            message=lambda: f"No metrics was exported",
503        )
504
505    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
506        self._validate_server_metrics_names(metric_names)
507        self._validate_client_metrics_names(metric_names)
508
509    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
510        for base_metric in _open_telemetry_measures.base_metrics():
511            if "grpc.server" in base_metric.name:
512                self.assertTrue(
513                    base_metric.name in metric_names,
514                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
515                )
516
517    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
518        for base_metric in _open_telemetry_measures.base_metrics():
519            if "grpc.client" in base_metric.name:
520                self.assertTrue(
521                    base_metric.name in metric_names,
522                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
523                )
524
525
# Script entry point: configure default logging, then run the tests in this
# module with verbose (per-test) output.
if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)
529