• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from collections import defaultdict
16import datetime
17import logging
18import os
19import sys
20import time
21from typing import Any, AnyStr, Callable, Dict, List, Optional, Set
22import unittest
23
24from google.protobuf import struct_pb2
25import grpc_observability
26from grpc_observability import _open_telemetry_measures
27from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector
28from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption
29from opentelemetry.sdk.metrics import MeterProvider
30from opentelemetry.sdk.metrics.export import AggregationTemporality
31from opentelemetry.sdk.metrics.export import MetricExportResult
32from opentelemetry.sdk.metrics.export import MetricExporter
33from opentelemetry.sdk.metrics.export import MetricsData
34from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
35
36from tests.observability import _test_server
37
logger = logging.getLogger(__name__)

# NOTE(review): not referenced in the visible part of this file; presumably
# used by streaming tests elsewhere -- confirm before removing.
STREAM_LENGTH = 5
# Interval, in seconds, of the PeriodicExportingMetricReader built in setUp.
OTEL_EXPORT_INTERVAL_S = 0.5
# Key looked up inside the peer's "XEnvoyPeerMetadata" Struct, and the label
# key under which TestLabelInjector.deserialize_labels reports its value.
CSM_METADATA_EXCHANGE_LABEL_KEY = "exchange_labels_key"

# The following metrics should have optional labels when optional
# labels are enabled through OpenTelemetryPlugin.
# Every element needs a trailing comma: adjacent string literals are
# concatenated implicitly, which would merge two metric names into one.
METRIC_NAME_WITH_OPTIONAL_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
]
CSM_OPTIONAL_LABEL_KEYS = ["csm.service_name", "csm.service_namespace_name"]

# The following metrics should have metadata exchange labels when the
# metadata exchange flow is triggered.
METRIC_NAME_WITH_EXCHANGE_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
    "grpc.server.call.duration",
    "grpc.server.call.sent_total_compressed_message_size",
    "grpc.server.call.rcvd_total_compressed_message_size",
]
63
64
class OTelMetricExporter(MetricExporter):
    """Implementation of :class:`MetricExporter` that records metric labels
    into the provided ``all_metrics`` dict instead of sending them anywhere.

    all_metrics: A dict whose key is
        grpc_observability._open_telemetry_measures.Metric.name and whose
        value is a list of the labels recorded for that metric. Every export
        appends the attributes of each data point to the list for its metric;
        missing keys are created, so a plain dict or a defaultdict both work.
        An example item of this dict:
            {"grpc.client.attempt.started":
              [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
               {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ):
        """
        Args:
            all_metrics: Destination dict, mutated in place on every export.
            preferred_temporality: Forwarded unchanged to MetricExporter.
            preferred_aggregation: Forwarded unchanged to MetricExporter.
        """
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record ``metrics_data`` into ``all_metrics``; always succeeds."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Nothing to release: the exporter holds no external resources."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Nothing is buffered, so flushing trivially succeeds."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Append each data point's attributes under its metric's name."""
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    # setdefault keeps this working for plain dicts as well
                    # as defaultdict(list), matching the Dict annotation.
                    labels_for_metric = self.all_metrics.setdefault(
                        metric.name, []
                    )
                    for data_point in metric.data.data_points:
                        labels_for_metric.append(data_point.attributes)
114
115
class TestLabelInjector(OpenTelemetryLabelInjector):
    """Label injector stub that hands out fixed labels for tests.

    The labels sent to the peer and the local labels are both supplied at
    construction. When deserializing peer labels, an "XEnvoyPeerMetadata"
    entry is parsed as a serialized ``struct_pb2.Struct``. The string stored
    under CSM_METADATA_EXCHANGE_LABEL_KEY is then reported under that same
    key; every other label passes through untouched.
    """

    _exchange_labels: Dict[str, AnyStr]
    _local_labels: Dict[str, str]

    def __init__(
        self, local_labels: Dict[str, str], exchange_labels: Dict[str, str]
    ):
        self._local_labels = local_labels
        self._exchange_labels = exchange_labels

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        """Return the exchange labels given at construction (same object)."""
        return self._exchange_labels

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        """Return the local labels; ``include_exchange_labels`` is ignored."""
        return self._local_labels

    def deserialize_labels(
        self, labels: Dict[str, AnyStr]
    ) -> Dict[str, AnyStr]:
        """Return a new dict with the peer-metadata entry decoded.

        Args:
            labels: Labels received from the peer. An "XEnvoyPeerMetadata"
                value must be a serialized ``struct_pb2.Struct``.
        """
        result = {}
        for label_key, label_value in labels.items():
            if label_key == "XEnvoyPeerMetadata":
                peer_metadata = struct_pb2.Struct()
                peer_metadata.ParseFromString(label_value)
                result[
                    CSM_METADATA_EXCHANGE_LABEL_KEY
                ] = self._get_value_from_struct(
                    CSM_METADATA_EXCHANGE_LABEL_KEY, peer_metadata
                )
                continue
            result[label_key] = label_value
        return result

    def _get_value_from_struct(
        self, key: str, struct: struct_pb2.Struct
    ) -> str:
        """Return ``struct.fields[key].string_value``, or "unknown" if absent."""
        field_value = struct.fields.get(key)
        if field_value:
            return field_value.string_value
        return "unknown"
161
162
class TestOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """Plugin option stub with fixed client/server activation.

    Whether the option applies to client channels or to servers is decided
    once, at construction. The ``target`` and ``xds`` arguments of the
    activation queries are ignored.
    """

    _label_injector: OpenTelemetryLabelInjector
    _active_on_client: bool
    _active_on_server: bool

    def __init__(
        self,
        label_injector: OpenTelemetryLabelInjector,
        active_on_client: Optional[bool] = True,
        active_on_server: Optional[bool] = True,
    ):
        # Flags are stored as given (not coerced to bool).
        self._active_on_client = active_on_client
        self._active_on_server = active_on_server
        self._label_injector = label_injector

    def is_active_on_client_channel(self, target: str) -> bool:
        """Return the client flag given at construction."""
        return self._active_on_client

    def is_active_on_server(self, xds: bool) -> bool:
        """Return the server flag given at construction."""
        return self._active_on_server

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        """Return the injector given at construction."""
        return self._label_injector
186
187
@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class ObservabilityPluginTest(unittest.TestCase):
    """Checks which metrics get the labels of a plugin option's injector.

    Each test builds an OpenTelemetryPlugin and registers it globally. It then
    makes one unary call against a fresh test server and inspects the labels
    that OTelMetricExporter recorded for every exported metric.
    """

    def setUp(self):
        # Metric name -> list of label sets, filled by OTelMetricExporter
        # whenever the periodic reader exports.
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        try:
            if self._server:
                self._server.stop(0)
        finally:
            # Shut down the provider (and thereby its periodic reader) so a
            # test does not leave a reader running after it finishes.
            self._provider.shutdown()

    def testLabelInjectorWithLocalLabels(self):
        """Local labels in label injector should be added to all metrics."""
        otel_plugin = self._create_plugin()
        self._register_and_make_unary_call(otel_plugin)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self._snapshot_metrics().items():
            self._validate_label_exist(name, label_list, ["local_labels_key"])

    def testClientSidePluginOption(self):
        """An option inactive on the server labels only client metrics."""
        otel_plugin = self._create_plugin(active_on_server=False)
        self._register_and_make_unary_call(otel_plugin)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self._snapshot_metrics().items():
            if "grpc.client" in name:
                self._validate_label_exist(
                    name, label_list, ["local_labels_key"]
                )
            if "grpc.server" in name:
                self._validate_label_not_exist(
                    name, label_list, ["local_labels_key"]
                )

    def testServerSidePluginOption(self):
        """An option inactive on the client labels only server metrics."""
        otel_plugin = self._create_plugin(active_on_client=False)
        self._register_and_make_unary_call(otel_plugin)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self._snapshot_metrics().items():
            if "grpc.client" in name:
                self._validate_label_not_exist(
                    name, label_list, ["local_labels_key"]
                )
            if "grpc.server" in name:
                self._validate_label_exist(
                    name, label_list, ["local_labels_key"]
                )

    def _create_plugin(
        self, active_on_client: bool = True, active_on_server: bool = True
    ):
        """Build a plugin whose single option injects ``local_labels_key``.

        The plugin reports through this test's meter provider.
        """
        label_injector = TestLabelInjector(
            local_labels={"local_labels_key": "local_labels_value"},
            exchange_labels={},
        )
        plugin_option = TestOpenTelemetryPluginOption(
            label_injector=label_injector,
            active_on_client=active_on_client,
            active_on_server=active_on_server,
        )
        return grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, plugin_options=[plugin_option]
        )

    def _register_and_make_unary_call(self, otel_plugin) -> None:
        """Start a server and make one unary call with the plugin registered.

        The plugin is deregistered even if starting the server or the call
        fails. This keeps a failing test from leaving a global plugin behind
        for the tests that run after it.
        """
        otel_plugin.register_global()
        try:
            self._server, self._port = _test_server.start_server()
            _test_server.unary_unary_call(port=self._port)
        finally:
            otel_plugin.deregister_global()

    def _snapshot_metrics(self) -> Dict[str, List[Any]]:
        """Return a shallow copy of ``all_metrics`` for iteration.

        Exports happen periodically (which is why _validate_metrics_exist
        polls), so new metric names may still be added while a test iterates.
        Iterating a copy avoids "dictionary changed size during iteration".
        """
        return dict(self.all_metrics)

    def assert_eventually(
        self,
        predicate: Callable[[], bool],
        *,
        timeout: Optional[datetime.timedelta] = None,
        message: Optional[Callable[[], str]] = None,
    ) -> None:
        """Poll ``predicate`` every 0.5s and fail if it stays false.

        Args:
            predicate: Condition to wait for.
            timeout: How long to wait; defaults to 5 seconds.
            message: Lazily builds the failure message.
        """
        message = message or (lambda: "Proposition did not evaluate to true")
        timeout = timeout or datetime.timedelta(seconds=5)
        # A monotonic clock is immune to wall-clock adjustments mid-wait.
        deadline = time.monotonic() + timeout.total_seconds()
        while time.monotonic() < deadline:
            if predicate():
                return
            time.sleep(0.5)
        # The predicate may have become true during the final sleep.
        if predicate():
            return
        self.fail(message() + " after " + str(timeout))

    def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
        # Poll until the exporter has recorded more than one metric name,
        # i.e. at least one periodic export has happened.
        self.assert_eventually(
            lambda: len(all_metrics) > 1,
            message=lambda: "No metrics was exported",
        )

    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_base_metrics_exported("grpc.server", metric_names)

    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_base_metrics_exported("grpc.client", metric_names)

    def _validate_base_metrics_exported(
        self, name_fragment: str, metric_names: Set[str]
    ) -> None:
        """Assert every base metric whose name contains ``name_fragment``
        appears in ``metric_names``."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if name_fragment in base_metric.name:
                self.assertIn(
                    base_metric.name,
                    metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )

    def _validate_label_exist(
        self,
        metric_name: str,
        metric_label_list: List[Any],
        labels_to_check: List[str],
    ) -> None:
        """Assert each recorded label set of ``metric_name`` has every key."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertIn(
                    label,
                    metric_label,
                    msg=f"label with key {label} not found in metric {metric_name}, found label list: {metric_label}",
                )

    def _validate_label_not_exist(
        self,
        metric_name: str,
        metric_label_list: List[Any],
        labels_to_check: List[str],
    ) -> None:
        """Assert no recorded label set of ``metric_name`` has any key."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertNotIn(
                    label,
                    metric_label,
                    msg=f"found unexpected label with key {label} in metric {metric_name}, found label list: {metric_label}",
                )
359
360
if __name__ == "__main__":
    # Configure root logging with defaults so log output from the code under
    # test is shown, then run the tests with per-test (verbosity=2) output.
    logging.basicConfig()
    unittest.main(verbosity=2)
364