• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import os
16import re
17from typing import AnyStr, Callable, Dict, Iterable, List, Optional, Union
18
19from google.protobuf import struct_pb2
20from grpc_observability._observability import OptionalLabelType
21from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector
22from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
23from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption
24
25# pytype: disable=pyi-error
26from opentelemetry.metrics import MeterProvider
27from opentelemetry.resourcedetector.gcp_resource_detector import (
28    GoogleCloudResourceDetector,
29)
30from opentelemetry.sdk.resources import Resource
31from opentelemetry.semconv.resource import ResourceAttributes
32
# Authority that an xds:// client-channel target must name (when it names one
# at all) for the CSM plugin option to be active.
TRAFFIC_DIRECTOR_AUTHORITY = "traffic-director-global.xds.googleapis.com"
# Placeholder recorded for any label whose value could not be determined.
UNKNOWN_VALUE = "unknown"
# Workload "type" values carried in the exchanged peer metadata.
TYPE_GCE = "gcp_compute_engine"
TYPE_GKE = "gcp_kubernetes_engine"
MESH_ID_PREFIX = "mesh:"

# Local peer-metadata key -> remote metric label name.
# These keys are exchanged regardless of the workload type.
METADATA_EXCHANGE_KEY_FIXED_MAP = {
    "type": "csm.remote_workload_type",
    "canonical_service": "csm.remote_workload_canonical_service",
}

# Extra keys exchanged when the workload type is TYPE_GKE.
METADATA_EXCHANGE_KEY_GKE_MAP = {
    "workload_name": "csm.remote_workload_name",
    "namespace_name": "csm.remote_workload_namespace_name",
    "cluster_name": "csm.remote_workload_cluster_name",
    "location": "csm.remote_workload_location",
    "project_id": "csm.remote_workload_project_id",
}

# Extra keys exchanged when the workload type is TYPE_GCE: the GKE keys minus
# the Kubernetes-only namespace and cluster entries (order preserved).
METADATA_EXCHANGE_KEY_GCE_MAP = {
    local_key: METADATA_EXCHANGE_KEY_GKE_MAP[local_key]
    for local_key in ("workload_name", "location", "project_id")
}
57
58
class CSMOpenTelemetryLabelInjector(OpenTelemetryLabelInjector):
    """
    An implementation of OpenTelemetryLabelInjector for CSM.

    This injector fetches labels from the GCP resource detector and the
    environment, and is responsible for serializing and deserializing the
    metadata exchange labels.
    """

    # Labels sent to the peer: a single "XEnvoyPeerMetadata" entry holding a
    # serialized google.protobuf.Struct.
    _exchange_labels: Dict[str, AnyStr]
    # Local labels returned by get_additional_labels(True).
    _additional_exchange_labels: Dict[str, str]

    def __init__(self):
        # Labels from environment
        canonical_service_value = os.getenv(
            "CSM_CANONICAL_SERVICE_NAME", UNKNOWN_VALUE
        )
        workload_name_value = os.getenv("CSM_WORKLOAD_NAME", UNKNOWN_VALUE)
        mesh_id = os.getenv("CSM_MESH_ID", UNKNOWN_VALUE)

        # Labels from the GCP resource detector.
        gcp_resource = GoogleCloudResourceDetector().detect()
        resource_type_value = get_resource_type(gcp_resource)
        namespace_value = get_str_value_from_resource(
            ResourceAttributes.K8S_NAMESPACE_NAME, gcp_resource
        )
        cluster_name_value = get_str_value_from_resource(
            ResourceAttributes.K8S_CLUSTER_NAME, gcp_resource
        )
        # ResourceAttributes.CLOUD_AVAILABILITY_ZONE are called "zones" on
        # Google Cloud; fall back to the region when no zone was detected.
        location_value = get_str_value_from_resource("cloud.zone", gcp_resource)
        if location_value == UNKNOWN_VALUE:
            location_value = get_str_value_from_resource(
                ResourceAttributes.CLOUD_REGION, gcp_resource
            )
        project_id_value = get_str_value_from_resource(
            ResourceAttributes.CLOUD_ACCOUNT_ID, gcp_resource
        )

        # Every value this workload can advertise, keyed by the same local
        # keys the METADATA_EXCHANGE_KEY_*_MAP tables use, so the keys written
        # here always match the keys deserialize_labels() reads.
        local_values = {
            "type": resource_type_value,
            "canonical_service": canonical_service_value,
            "workload_name": workload_name_value,
            "namespace_name": namespace_value,
            "cluster_name": cluster_name_value,
            "location": location_value,
            "project_id": project_id_value,
        }
        # The fixed keys are always sent; GKE and GCE workloads additionally
        # send their platform-specific keys (other types send none).
        exchanged_keys = list(METADATA_EXCHANGE_KEY_FIXED_MAP) + list(
            self._type_specific_key_map(resource_type_value)
        )
        fields = {
            key: struct_pb2.Value(string_value=local_values[key])
            for key in exchanged_keys
        }
        serialized_str = struct_pb2.Struct(fields=fields).SerializeToString()

        self._exchange_labels = {"XEnvoyPeerMetadata": serialized_str}
        self._additional_exchange_labels = {
            "csm.workload_canonical_service": canonical_service_value,
            "csm.mesh_id": mesh_id,
        }

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        """Return the labels to send to the peer (serialized peer metadata)."""
        return self._exchange_labels

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        """Return the local CSM labels, or {} when exchange labels are excluded."""
        if include_exchange_labels:
            return self._additional_exchange_labels
        else:
            return {}

    @staticmethod
    def _type_specific_key_map(resource_type: str) -> Dict[str, str]:
        """Return the platform-specific key map for ``resource_type``.

        Returns METADATA_EXCHANGE_KEY_GKE_MAP for TYPE_GKE,
        METADATA_EXCHANGE_KEY_GCE_MAP for TYPE_GCE and an empty dict for any
        other type. The returned mapping must not be mutated.
        """
        if resource_type == TYPE_GKE:
            return METADATA_EXCHANGE_KEY_GKE_MAP
        if resource_type == TYPE_GCE:
            return METADATA_EXCHANGE_KEY_GCE_MAP
        return {}

    @staticmethod
    def _parse_peer_metadata(serialized: AnyStr) -> Dict[str, str]:
        """Decode a serialized peer-metadata Struct into remote CSM labels.

        The fixed remote labels are always produced; the GKE/GCE labels are
        added according to the peer's "type" field. Missing fields become
        UNKNOWN_VALUE via get_value_from_struct.
        """
        pb_struct = struct_pb2.Struct()
        pb_struct.ParseFromString(serialized)

        remote_type = get_value_from_struct("type", pb_struct)
        key_map = {
            **METADATA_EXCHANGE_KEY_FIXED_MAP,
            **CSMOpenTelemetryLabelInjector._type_specific_key_map(remote_type),
        }
        return {
            remote_key: get_value_from_struct(local_key, pb_struct)
            for local_key, remote_key in key_map.items()
        }

    @staticmethod
    def deserialize_labels(labels: Dict[str, AnyStr]) -> Dict[str, AnyStr]:
        """Translate labels received from a peer into remote CSM labels.

        Args:
          labels: Exchanged labels; presumably built from the peer's
            metadata (verify against the caller). An "XEnvoyPeerMetadata"
            entry, if present, holds a serialized google.protobuf.Struct.

        Returns:
          A new dict. Every label other than "XEnvoyPeerMetadata" is passed
          through unchanged. When "XEnvoyPeerMetadata" is present, its
          decoded "csm.remote_workload_*" labels are added and take
          precedence, regardless of where the entry appears in ``labels``.
          If CSM label injector is enabled on server side but the client
          didn't send XEnvoyPeerMetadata (and ``labels`` is non-empty), the
          fixed remote labels are recorded as unknown.
        """
        peer_metadata_key = "XEnvoyPeerMetadata"
        has_peer_metadata = peer_metadata_key in labels

        deserialized_labels = {}
        # Set the "unknown" defaults only when there is no peer metadata.
        # Writing them per-key (as before) let a label that followed
        # XEnvoyPeerMetadata overwrite the values just parsed from it.
        if labels and not has_peer_metadata:
            for remote_key in METADATA_EXCHANGE_KEY_FIXED_MAP.values():
                deserialized_labels[remote_key] = UNKNOWN_VALUE

        for key, value in labels.items():
            if key != peer_metadata_key:
                deserialized_labels[key] = value

        if has_peer_metadata:
            deserialized_labels.update(
                CSMOpenTelemetryLabelInjector._parse_peer_metadata(
                    labels[peer_metadata_key]
                )
            )

        return deserialized_labels
190
191
class CsmOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """
    An implementation of OpenTelemetryPluginOption for CSM.
    """

    _label_injector: CSMOpenTelemetryLabelInjector

    def __init__(self):
        self._label_injector = CSMOpenTelemetryLabelInjector()

    @staticmethod
    def is_active_on_client_channel(target: str) -> bool:
        """Determines whether this plugin option is active on a channel based on target.

        The option is active for targets with an "xds" scheme that either
        name no authority (e.g. "xds:///service") or whose authority host is
        exactly TRAFFIC_DIRECTOR_AUTHORITY (an optional ":port" is ignored).

        Args:
          target: Required. The target for the RPC.

        Returns:
          True if this plugin option is active on the channel, false otherwise.
        """
        # CSM channels should have an "xds" scheme
        if not target.startswith("xds:"):
            return False
        # If scheme is correct, the authority should be TD if it exists.
        match = re.search(r"^xds:\/\/([^/]+)", target)
        if not match:
            # No authority present.
            return True
        # Compare the host exactly. The previous substring test also
        # accepted authorities that merely contain the TD hostname, e.g.
        # "foo.traffic-director-global.xds.googleapis.com.example".
        host, _, _ = match.group(1).partition(":")
        return host == TRAFFIC_DIRECTOR_AUTHORITY

    @staticmethod
    def is_active_on_server(
        xds: bool,  # pylint: disable=unused-argument
    ) -> bool:
        """Determines whether this plugin option is active on a given server.

        Since servers don't need to be xds enabled to work as part of a service
        mesh, we're returning True and enable this PluginOption for all servers.

        Note: This always returns true because server can be part of the mesh even
        if it's not xds-enabled. And we want CSM labels for those servers too.

        Args:
          xds: Required. Whether this server is built for xds.

        Returns:
          True if this plugin option is active on the server, false otherwise.
        """
        return True

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        """Return the CSM label injector created in __init__."""
        return self._label_injector
246
247
# pylint: disable=no-self-use
class CsmOpenTelemetryPlugin(OpenTelemetryPlugin):
    """Describes a Plugin for CSM OpenTelemetry observability.

    This class is part of an EXPERIMENTAL API.

    Args:
      plugin_options: Additional plugin options. A CsmOpenTelemetryPluginOption
        is always appended after them.
      meter_provider: Forwarded unchanged to OpenTelemetryPlugin.
      generic_method_attribute_filter: Forwarded unchanged to
        OpenTelemetryPlugin.
    """

    plugin_options: Iterable[OpenTelemetryPluginOption]
    meter_provider: Optional[MeterProvider]
    generic_method_attribute_filter: Callable[[str], bool]

    def __init__(
        self,
        *,
        # Immutable default: a shared mutable `[]` default is a latent bug.
        plugin_options: Iterable[OpenTelemetryPluginOption] = (),
        meter_provider: Optional[MeterProvider] = None,
        generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
    ):
        # Copy the caller's iterable so it is never mutated, then append the
        # CSM option.
        new_options = list(plugin_options) + [CsmOpenTelemetryPluginOption()]
        super().__init__(
            plugin_options=new_options,
            meter_provider=meter_provider,
            generic_method_attribute_filter=generic_method_attribute_filter,
        )

    def _get_enabled_optional_labels(self) -> List[OptionalLabelType]:
        """CSM always enables the xDS service optional labels."""
        return [OptionalLabelType.XDS_SERVICE_LABELS]
275
276
def get_value_from_struct(key: str, struct: struct_pb2.Struct) -> str:
    """Return the string stored under ``key`` in a protobuf ``Struct``.

    Args:
      key: Field name to look up.
      struct: Decoded peer-metadata Struct.

    Returns:
      The field's ``string_value``, or UNKNOWN_VALUE when the field is
      missing or holds a non-string kind (number, bool, list, ...).
    """
    value = struct.fields.get(key)
    # Check for None explicitly instead of relying on message truthiness.
    # Reading ``string_value`` from a Value of another kind would silently
    # yield "", so treat that the same way as a missing field.
    if value is None or value.WhichOneof("kind") != "string_value":
        return UNKNOWN_VALUE
    return value.string_value
282
283
def get_str_value_from_resource(
    attribute: Union[ResourceAttributes, str], resource: Resource
) -> str:
    """Look up ``attribute`` on ``resource`` and return it as a string.

    A missing attribute yields UNKNOWN_VALUE; a present value is converted
    with ``str()``.
    """
    attributes = resource.attributes
    if attribute not in attributes:
        return UNKNOWN_VALUE
    return str(attributes[attribute])
289
290
# pylint: disable=line-too-long
def get_resource_type(gcp_resource: Resource) -> str:
    """Map the detector's "gcp.resource_type" to the metadata-exchange type.

    "gke_container" becomes TYPE_GKE and "gce_instance" becomes TYPE_GCE.
    Any other value (including UNKNOWN_VALUE when the attribute is absent)
    is returned unchanged.
    """
    # Reference: https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/cc61f23a5ff2f16f4aa2c38d07e55153828849cc/opentelemetry-resourcedetector-gcp/src/opentelemetry/resourcedetector/gcp_resource_detector/__init__.py#L96
    detected_type = get_str_value_from_resource(
        "gcp.resource_type", gcp_resource
    )
    exchange_type_by_detected_type = {
        "gke_container": TYPE_GKE,
        "gce_instance": TYPE_GCE,
    }
    return exchange_type_by_detected_type.get(detected_type, detected_type)
305