• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from collections import defaultdict
16import datetime
17import logging
18import os
19import sys
20import time
21from typing import Any, Callable, Dict, List, Optional, Set
22import unittest
23
24import grpc
25import grpc_observability
26from grpc_observability import _open_telemetry_measures
27from grpc_observability._open_telemetry_observability import (
28    GRPC_OTHER_LABEL_VALUE,
29)
30from grpc_observability._open_telemetry_observability import GRPC_METHOD_LABEL
31from grpc_observability._open_telemetry_observability import GRPC_TARGET_LABEL
32from opentelemetry.sdk.metrics import MeterProvider
33from opentelemetry.sdk.metrics.export import AggregationTemporality
34from opentelemetry.sdk.metrics.export import MetricExportResult
35from opentelemetry.sdk.metrics.export import MetricExporter
36from opentelemetry.sdk.metrics.export import MetricsData
37from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
38
39from tests.observability import _test_server
40
41logger = logging.getLogger(__name__)
42
43STREAM_LENGTH = 5
44OTEL_EXPORT_INTERVAL_S = 0.5
45
46
47class OTelMetricExporter(MetricExporter):
48    """Implementation of :class:`MetricExporter` that export metrics to the
49    provided metric_list.
50
51    all_metrics: A dict which key is grpc_observability._opentelemetry_measures.Metric.name,
52        value is a list of labels recorded for that metric.
53        An example item of this dict:
54            {"grpc.client.attempt.started":
55              [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
56               {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
57    """
58
59    def __init__(
60        self,
61        all_metrics: Dict[str, List],
62        preferred_temporality: Dict[type, AggregationTemporality] = None,
63        preferred_aggregation: Dict[
64            type, "opentelemetry.sdk.metrics.view.Aggregation"
65        ] = None,
66    ):
67        super().__init__(
68            preferred_temporality=preferred_temporality,
69            preferred_aggregation=preferred_aggregation,
70        )
71        self.all_metrics = all_metrics
72
73    def export(
74        self,
75        metrics_data: MetricsData,
76        timeout_millis: float = 10_000,
77        **kwargs,
78    ) -> MetricExportResult:
79        self.record_metric(metrics_data)
80        return MetricExportResult.SUCCESS
81
82    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
83        pass
84
85    def force_flush(self, timeout_millis: float = 10_000) -> bool:
86        return True
87
88    def record_metric(self, metrics_data: MetricsData) -> None:
89        for resource_metric in metrics_data.resource_metrics:
90            for scope_metric in resource_metric.scope_metrics:
91                for metric in scope_metric.metrics:
92                    for data_point in metric.data.data_points:
93                        self.all_metrics[metric.name].append(
94                            data_point.attributes
95                        )
96
97
98class _ClientUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor):
99    def intercept_unary_unary(
100        self, continuation, client_call_details, request_or_iterator
101    ):
102        response = continuation(client_call_details, request_or_iterator)
103        return response
104
105
106class _ServerInterceptor(grpc.ServerInterceptor):
107    def intercept_service(self, continuation, handler_call_details):
108        return continuation(handler_call_details)
109
110
111@unittest.skipIf(
112    os.name == "nt" or "darwin" in sys.platform,
113    "Observability is not supported in Windows and MacOS",
114)
115class OpenTelemetryObservabilityTest(unittest.TestCase):
116    def setUp(self):
117        self.all_metrics = defaultdict(list)
118        otel_exporter = OTelMetricExporter(self.all_metrics)
119        reader = PeriodicExportingMetricReader(
120            exporter=otel_exporter,
121            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
122        )
123        self._provider = MeterProvider(metric_readers=[reader])
124        self._server = None
125        self._port = None
126
127    def tearDown(self):
128        if self._server:
129            self._server.stop(0)
130
131    def testRecordUnaryUnaryUseContextManager(self):
132        with grpc_observability.OpenTelemetryPlugin(
133            meter_provider=self._provider
134        ):
135            server, port = _test_server.start_server()
136            self._server = server
137            _test_server.unary_unary_call(port=port)
138
139        self._validate_metrics_exist(self.all_metrics)
140        self._validate_all_metrics_names(self.all_metrics)
141
142    def testRecordUnaryUnaryUseGlobalInit(self):
143        otel_plugin = grpc_observability.OpenTelemetryPlugin(
144            meter_provider=self._provider
145        )
146        otel_plugin.register_global()
147
148        server, port = _test_server.start_server()
149        self._server = server
150        _test_server.unary_unary_call(port=port)
151
152        self._validate_metrics_exist(self.all_metrics)
153        self._validate_all_metrics_names(self.all_metrics)
154        otel_plugin.deregister_global()
155
156    def testCallGlobalInitThrowErrorWhenGlobalCalled(self):
157        otel_plugin = grpc_observability.OpenTelemetryPlugin(
158            meter_provider=self._provider
159        )
160        otel_plugin.register_global()
161        try:
162            otel_plugin.register_global()
163        except RuntimeError as exp:
164            self.assertIn(
165                "gPRC Python observability was already initialized", str(exp)
166            )
167
168        otel_plugin.deregister_global()
169
170    def testCallGlobalInitThrowErrorWhenContextManagerCalled(self):
171        with grpc_observability.OpenTelemetryPlugin(
172            meter_provider=self._provider
173        ):
174            try:
175                otel_plugin = grpc_observability.OpenTelemetryPlugin(
176                    meter_provider=self._provider
177                )
178                otel_plugin.register_global()
179            except RuntimeError as exp:
180                self.assertIn(
181                    "gPRC Python observability was already initialized",
182                    str(exp),
183                )
184
185    def testCallContextManagerThrowErrorWhenGlobalInitCalled(self):
186        otel_plugin = grpc_observability.OpenTelemetryPlugin(
187            meter_provider=self._provider
188        )
189        otel_plugin.register_global()
190        try:
191            with grpc_observability.OpenTelemetryPlugin(
192                meter_provider=self._provider
193            ):
194                pass
195        except RuntimeError as exp:
196            self.assertIn(
197                "gPRC Python observability was already initialized", str(exp)
198            )
199        otel_plugin.deregister_global()
200
201    def testContextManagerThrowErrorWhenContextManagerCalled(self):
202        with grpc_observability.OpenTelemetryPlugin(
203            meter_provider=self._provider
204        ):
205            try:
206                with grpc_observability.OpenTelemetryPlugin(
207                    meter_provider=self._provider
208                ):
209                    pass
210            except RuntimeError as exp:
211                self.assertIn(
212                    "gPRC Python observability was already initialized",
213                    str(exp),
214                )
215
216    def testNoErrorCallGlobalInitThenContextManager(self):
217        otel_plugin = grpc_observability.OpenTelemetryPlugin(
218            meter_provider=self._provider
219        )
220        otel_plugin.register_global()
221        otel_plugin.deregister_global()
222
223        with grpc_observability.OpenTelemetryPlugin(
224            meter_provider=self._provider
225        ):
226            pass
227
228    def testNoErrorCallContextManagerThenGlobalInit(self):
229        with grpc_observability.OpenTelemetryPlugin(
230            meter_provider=self._provider
231        ):
232            pass
233        otel_plugin = grpc_observability.OpenTelemetryPlugin(
234            meter_provider=self._provider
235        )
236        otel_plugin.register_global()
237        otel_plugin.deregister_global()
238
239    def testRecordUnaryUnaryWithClientInterceptor(self):
240        interceptor = _ClientUnaryUnaryInterceptor()
241        with grpc_observability.OpenTelemetryPlugin(
242            meter_provider=self._provider
243        ):
244            server, port = _test_server.start_server()
245            self._server = server
246            _test_server.intercepted_unary_unary_call(
247                port=port, interceptors=interceptor
248            )
249
250        self._validate_metrics_exist(self.all_metrics)
251        self._validate_all_metrics_names(self.all_metrics)
252
253    def testRecordUnaryUnaryWithServerInterceptor(self):
254        interceptor = _ServerInterceptor()
255        with grpc_observability.OpenTelemetryPlugin(
256            meter_provider=self._provider
257        ):
258            server, port = _test_server.start_server(interceptors=[interceptor])
259            self._server = server
260            _test_server.unary_unary_call(port=port)
261
262        self._validate_metrics_exist(self.all_metrics)
263        self._validate_all_metrics_names(self.all_metrics)
264
265    def testRecordUnaryUnaryClientOnly(self):
266        server, port = _test_server.start_server()
267        self._server = server
268
269        with grpc_observability.OpenTelemetryPlugin(
270            meter_provider=self._provider
271        ):
272            _test_server.unary_unary_call(port=port)
273
274        self._validate_metrics_exist(self.all_metrics)
275        self._validate_client_metrics_names(self.all_metrics)
276
277    def testNoRecordBeforeInit(self):
278        server, port = _test_server.start_server()
279        _test_server.unary_unary_call(port=port)
280        self.assertEqual(len(self.all_metrics), 0)
281        server.stop(0)
282
283        with grpc_observability.OpenTelemetryPlugin(
284            meter_provider=self._provider
285        ):
286            server, port = _test_server.start_server()
287            self._server = server
288            _test_server.unary_unary_call(port=port)
289
290        self._validate_metrics_exist(self.all_metrics)
291        self._validate_all_metrics_names(self.all_metrics)
292
293    def testNoRecordAfterExitUseContextManager(self):
294        with grpc_observability.OpenTelemetryPlugin(
295            meter_provider=self._provider
296        ):
297            server, port = _test_server.start_server()
298            self._server = server
299            self._port = port
300            _test_server.unary_unary_call(port=port)
301
302        self._validate_metrics_exist(self.all_metrics)
303        self._validate_all_metrics_names(self.all_metrics)
304
305        self.all_metrics = defaultdict(list)
306        _test_server.unary_unary_call(port=self._port)
307        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
308            self._validate_metrics_exist(self.all_metrics)
309
310    def testNoRecordAfterExitUseGlobal(self):
311        otel_plugin = grpc_observability.OpenTelemetryPlugin(
312            meter_provider=self._provider
313        )
314        otel_plugin.register_global()
315
316        server, port = _test_server.start_server()
317        self._server = server
318        self._port = port
319        _test_server.unary_unary_call(port=port)
320        otel_plugin.deregister_global()
321
322        self._validate_metrics_exist(self.all_metrics)
323        self._validate_all_metrics_names(self.all_metrics)
324
325        self.all_metrics = defaultdict(list)
326        _test_server.unary_unary_call(port=self._port)
327        with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
328            self._validate_metrics_exist(self.all_metrics)
329
330    def testRecordUnaryStream(self):
331        with grpc_observability.OpenTelemetryPlugin(
332            meter_provider=self._provider
333        ):
334            server, port = _test_server.start_server()
335            self._server = server
336            _test_server.unary_stream_call(port=port)
337
338        self._validate_metrics_exist(self.all_metrics)
339        self._validate_all_metrics_names(self.all_metrics)
340
341    def testRecordStreamUnary(self):
342        with grpc_observability.OpenTelemetryPlugin(
343            meter_provider=self._provider
344        ):
345            server, port = _test_server.start_server()
346            self._server = server
347            _test_server.stream_unary_call(port=port)
348
349        self._validate_metrics_exist(self.all_metrics)
350        self._validate_all_metrics_names(self.all_metrics)
351
352    def testRecordStreamStream(self):
353        with grpc_observability.OpenTelemetryPlugin(
354            meter_provider=self._provider
355        ):
356            server, port = _test_server.start_server()
357            self._server = server
358            _test_server.stream_stream_call(port=port)
359
360        self._validate_metrics_exist(self.all_metrics)
361        self._validate_all_metrics_names(self.all_metrics)
362
363    def testTargetAttributeFilter(self):
364        main_server, main_port = _test_server.start_server()
365        backup_server, backup_port = _test_server.start_server()
366        main_target = f"localhost:{main_port}"
367        backup_target = f"localhost:{backup_port}"
368
369        # Replace target label with 'other' for main_server.
370        def target_filter(target: str) -> bool:
371            if main_target in target:
372                return False
373            return True
374
375        with grpc_observability.OpenTelemetryPlugin(
376            meter_provider=self._provider, target_attribute_filter=target_filter
377        ):
378            _test_server.unary_unary_call(port=main_port)
379            _test_server.unary_unary_call(port=backup_port)
380
381        self._validate_metrics_exist(self.all_metrics)
382        self._validate_client_metrics_names(self.all_metrics)
383
384        target_values = set()
385        for label_list in self.all_metrics.values():
386            for labels in label_list:
387                if GRPC_TARGET_LABEL in labels:
388                    target_values.add(labels[GRPC_TARGET_LABEL])
389        self.assertTrue(GRPC_OTHER_LABEL_VALUE in target_values)
390        self.assertTrue(backup_target in target_values)
391
392        main_server.stop(0)
393        backup_server.stop(0)
394
395    def testMethodAttributeFilter(self):
396        # If method name is 'test/UnaryUnaryFiltered', is should be replaced with 'other'.
397        FILTERED_METHOD_NAME = "test/UnaryUnaryFiltered"
398
399        def method_filter(method: str) -> bool:
400            if FILTERED_METHOD_NAME in method:
401                return False
402            return True
403
404        with grpc_observability.OpenTelemetryPlugin(
405            meter_provider=self._provider,
406            generic_method_attribute_filter=method_filter,
407        ):
408            server, port = _test_server.start_server()
409            self._server = server
410            _test_server.unary_unary_call(port=port)
411            _test_server.unary_unary_filtered_call(port=port)
412
413        self._validate_metrics_exist(self.all_metrics)
414        self._validate_all_metrics_names(self.all_metrics)
415        method_values = set()
416        for label_list in self.all_metrics.values():
417            for labels in label_list:
418                if GRPC_METHOD_LABEL in labels:
419                    method_values.add(labels[GRPC_METHOD_LABEL])
420        self.assertTrue(GRPC_OTHER_LABEL_VALUE in method_values)
421        self.assertTrue(FILTERED_METHOD_NAME not in method_values)
422
423    def assert_eventually(
424        self,
425        predicate: Callable[[], bool],
426        *,
427        timeout: Optional[datetime.timedelta] = None,
428        message: Optional[Callable[[], str]] = None,
429    ) -> None:
430        message = message or (lambda: "Proposition did not evaluate to true")
431        timeout = timeout or datetime.timedelta(seconds=5)
432        end = datetime.datetime.now() + timeout
433        while datetime.datetime.now() < end:
434            if predicate():
435                break
436            time.sleep(0.5)
437        else:
438            self.fail(message() + " after " + str(timeout))
439
440    def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
441        # Sleep here to make sure we have at least one export from OTel MetricExporter.
442        self.assert_eventually(
443            lambda: len(all_metrics.keys()) > 1,
444            message=lambda: f"No metrics was exported",
445        )
446
447    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
448        self._validate_server_metrics_names(metric_names)
449        self._validate_client_metrics_names(metric_names)
450
451    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
452        for base_metric in _open_telemetry_measures.base_metrics():
453            if "grpc.server" in base_metric.name:
454                self.assertTrue(
455                    base_metric.name in metric_names,
456                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
457                )
458
459    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
460        for base_metric in _open_telemetry_measures.base_metrics():
461            if "grpc.client" in base_metric.name:
462                self.assertTrue(
463                    base_metric.name in metric_names,
464                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
465                )
466
467
468if __name__ == "__main__":
469    logging.basicConfig()
470    unittest.main(verbosity=2)
471