# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import re
from typing import AnyStr, Callable, Dict, Iterable, List, Optional, Union

from google.protobuf import struct_pb2
from grpc_observability._observability import OptionalLabelType
from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption

# pytype: disable=pyi-error
from opentelemetry.metrics import MeterProvider
from opentelemetry.resourcedetector.gcp_resource_detector import (
    GoogleCloudResourceDetector,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes

# Only xds targets whose authority is empty or equal to this value are
# treated as CSM channels.
TRAFFIC_DIRECTOR_AUTHORITY = "traffic-director-global.xds.googleapis.com"
# Placeholder used whenever a label value cannot be determined.
UNKNOWN_VALUE = "unknown"
# Resource type names used on the wire for metadata exchange.
TYPE_GCE = "gcp_compute_engine"
TYPE_GKE = "gcp_kubernetes_engine"
MESH_ID_PREFIX = "mesh:"

# Each map below translates a field name inside the exchanged protobuf Struct
# into the metric label name recorded for the remote peer.

# Fields that are always exchanged, regardless of resource type.
METADATA_EXCHANGE_KEY_FIXED_MAP = {
    "type": "csm.remote_workload_type",
    "canonical_service": "csm.remote_workload_canonical_service",
}

# Extra fields exchanged when the peer's type is TYPE_GKE.
METADATA_EXCHANGE_KEY_GKE_MAP = {
    "workload_name": "csm.remote_workload_name",
    "namespace_name": "csm.remote_workload_namespace_name",
    "cluster_name": "csm.remote_workload_cluster_name",
    "location": "csm.remote_workload_location",
    "project_id": "csm.remote_workload_project_id",
}

# Extra fields exchanged when the peer's type is TYPE_GCE.
METADATA_EXCHANGE_KEY_GCE_MAP = {
    "workload_name": "csm.remote_workload_name",
    "location": "csm.remote_workload_location",
    "project_id": "csm.remote_workload_project_id",
}


class CSMOpenTelemetryLabelInjector(OpenTelemetryLabelInjector):
    """
    An implementation of OpenTelemetryLabelInjector for CSM.

    This injector fetches labels from the GCP resource detector and the
    environment. It is also responsible for serializing and deserializing
    metadata exchange labels.
    """

    # Labels sent to the peer: a single "XEnvoyPeerMetadata" entry holding a
    # serialized protobuf Struct.
    _exchange_labels: Dict[str, AnyStr]
    # Local-only labels returned by get_additional_labels().
    _additional_exchange_labels: Dict[str, str]

    def __init__(self):
        """Collects local workload labels and pre-serializes them.

        Values come from the CSM_CANONICAL_SERVICE_NAME, CSM_WORKLOAD_NAME and
        CSM_MESH_ID environment variables and from the resource returned by
        GoogleCloudResourceDetector; anything missing becomes UNKNOWN_VALUE.
        """
        self._exchange_labels = {}
        self._additional_exchange_labels = {}

        # Labels from environment
        canonical_service_value = os.getenv(
            "CSM_CANONICAL_SERVICE_NAME", UNKNOWN_VALUE
        )
        workload_name_value = os.getenv("CSM_WORKLOAD_NAME", UNKNOWN_VALUE)
        mesh_id = os.getenv("CSM_MESH_ID", UNKNOWN_VALUE)

        # Labels from the detected GCP resource
        gcp_resource = GoogleCloudResourceDetector().detect()
        resource_type_value = get_resource_type(gcp_resource)
        namespace_value = get_str_value_from_resource(
            ResourceAttributes.K8S_NAMESPACE_NAME, gcp_resource
        )
        cluster_name_value = get_str_value_from_resource(
            ResourceAttributes.K8S_CLUSTER_NAME, gcp_resource
        )
        # ResourceAttributes.CLOUD_AVAILABILITY_ZONE are called
        # "zones" on Google Cloud.
        location_value = get_str_value_from_resource("cloud.zone", gcp_resource)
        if UNKNOWN_VALUE == location_value:
            # No zone on the resource: fall back to the region.
            location_value = get_str_value_from_resource(
                ResourceAttributes.CLOUD_REGION, gcp_resource
            )
        project_id_value = get_str_value_from_resource(
            ResourceAttributes.CLOUD_ACCOUNT_ID, gcp_resource
        )

        # Plain string view of the Struct we are about to build. The set of
        # fields depends on the local resource type and mirrors the
        # METADATA_EXCHANGE_KEY_* maps used when deserializing.
        string_fields = {
            "type": resource_type_value,
            "canonical_service": canonical_service_value,
        }
        if resource_type_value == TYPE_GKE:
            string_fields["workload_name"] = workload_name_value
            string_fields["namespace_name"] = namespace_value
            string_fields["cluster_name"] = cluster_name_value
            string_fields["location"] = location_value
            string_fields["project_id"] = project_id_value
        elif resource_type_value == TYPE_GCE:
            string_fields["workload_name"] = workload_name_value
            string_fields["location"] = location_value
            string_fields["project_id"] = project_id_value

        fields = {
            name: struct_pb2.Value(string_value=text)
            for name, text in string_fields.items()
        }
        serialized_struct = struct_pb2.Struct(fields=fields)
        serialized_str = serialized_struct.SerializeToString()

        self._exchange_labels = {"XEnvoyPeerMetadata": serialized_str}
        self._additional_exchange_labels[
            "csm.workload_canonical_service"
        ] = canonical_service_value
        self._additional_exchange_labels["csm.mesh_id"] = mesh_id

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        """Returns the labels to send to the peer (the serialized Struct)."""
        return self._exchange_labels

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        """Returns the local CSM labels, or an empty dict when not requested.

        Args:
          include_exchange_labels: Whether the caller wants the additional
            labels computed at construction time.
        """
        if include_exchange_labels:
            return self._additional_exchange_labels
        else:
            return {}

    @staticmethod
    def deserialize_labels(labels: Dict[str, AnyStr]) -> Dict[str, AnyStr]:
        """Expands a received "XEnvoyPeerMetadata" label into remote labels.

        The "XEnvoyPeerMetadata" entry is parsed as a protobuf Struct and
        replaced by "csm.remote_workload_*" labels; every other entry is
        copied through unchanged.

        Args:
          labels: Labels as received, possibly containing the serialized
            peer metadata.

        Returns:
          A new dict containing the pass-through labels plus the remote
          workload labels.
        """
        deserialized_labels = {}
        for key, value in labels.items():
            if "XEnvoyPeerMetadata" == key:
                pb_struct = struct_pb2.Struct()
                pb_struct.ParseFromString(value)

                remote_type = get_value_from_struct("type", pb_struct)

                # The fixed fields are always read; the type-specific ones
                # only when the peer reported a type we know about.
                key_maps = [METADATA_EXCHANGE_KEY_FIXED_MAP]
                if remote_type == TYPE_GKE:
                    key_maps.append(METADATA_EXCHANGE_KEY_GKE_MAP)
                elif remote_type == TYPE_GCE:
                    key_maps.append(METADATA_EXCHANGE_KEY_GCE_MAP)

                for key_map in key_maps:
                    for local_key, remote_key in key_map.items():
                        deserialized_labels[
                            remote_key
                        ] = get_value_from_struct(local_key, pb_struct)
            # If CSM label injector is enabled on server side but client didn't send
            # XEnvoyPeerMetadata, we'll record remote label as unknown.
            else:
                # setdefault (not plain assignment) so that values already
                # decoded from XEnvoyPeerMetadata are not clobbered when that
                # key happens to be iterated before this one.
                for remote_key in METADATA_EXCHANGE_KEY_FIXED_MAP.values():
                    deserialized_labels.setdefault(remote_key, UNKNOWN_VALUE)
                deserialized_labels[key] = value

        return deserialized_labels


class CsmOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """
    An implementation of OpenTelemetryPluginOption for CSM.
    """

    _label_injector: CSMOpenTelemetryLabelInjector

    def __init__(self):
        self._label_injector = CSMOpenTelemetryLabelInjector()

    @staticmethod
    def is_active_on_client_channel(target: str) -> bool:
        """Determines whether this plugin option is active on a channel based on target.

        Args:
          target: Required. The target for the RPC.

        Returns:
          True if this plugin option is active on the channel, false otherwise.
        """
        # CSM channels should have an "xds" scheme
        if not target.startswith("xds:"):
            return False
        # If scheme is correct, the authority should be TD if exist
        authority_pattern = r"^xds:\/\/([^/]+)"
        match = re.search(authority_pattern, target)
        if match is None:
            # Return True if the authority doesn't exist
            return True
        # Exact comparison: a substring test would also accept any unrelated
        # authority that merely contains the Traffic Director host name.
        return match.group(1) == TRAFFIC_DIRECTOR_AUTHORITY

    @staticmethod
    def is_active_on_server(
        xds: bool,  # pylint: disable=unused-argument
    ) -> bool:
        """Determines whether this plugin option is active on a given server.

        Since servers don't need to be xds enabled to work as part of a service
        mesh, we're returning True and enable this PluginOption for all servers.

        Note: This always returns true because server can be part of the mesh even
        if it's not xds-enabled. And we want CSM labels for those servers too.

        Args:
          xds: Required. if this server is built for xds.

        Returns:
          True if this plugin option is active on the server, false otherwise.
        """
        return True

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        """Returns the CSM label injector created for this option."""
        return self._label_injector


# pylint: disable=no-self-use
class CsmOpenTelemetryPlugin(OpenTelemetryPlugin):
    """Describes a Plugin for CSM OpenTelemetry observability.

    This class is part of an EXPERIMENTAL API.
    """

    plugin_options: Iterable[OpenTelemetryPluginOption]
    meter_provider: Optional[MeterProvider]
    generic_method_attribute_filter: Callable[[str], bool]

    def __init__(
        self,
        *,
        # Immutable default: avoids the shared-mutable-default pitfall.
        plugin_options: Iterable[OpenTelemetryPluginOption] = (),
        meter_provider: Optional[MeterProvider] = None,
        generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
    ):
        """Builds the plugin, always appending a CsmOpenTelemetryPluginOption.

        Args:
          plugin_options: Extra plugin options; a CSM option is added after
            them.
          meter_provider: Forwarded unchanged to OpenTelemetryPlugin.
          generic_method_attribute_filter: Forwarded unchanged to
            OpenTelemetryPlugin.
        """
        new_options = list(plugin_options) + [CsmOpenTelemetryPluginOption()]
        super().__init__(
            plugin_options=new_options,
            meter_provider=meter_provider,
            generic_method_attribute_filter=generic_method_attribute_filter,
        )

    def _get_enabled_optional_labels(self) -> List[OptionalLabelType]:
        return [OptionalLabelType.XDS_SERVICE_LABELS]


def get_value_from_struct(key: str, struct: struct_pb2.Struct) -> str:
    """Returns the string value stored under `key`, or UNKNOWN_VALUE if absent.

    Args:
      key: Field name to look up in the Struct.
      struct: The parsed metadata exchange Struct.
    """
    value = struct.fields.get(key)
    # Explicit None check: only a missing key should map to UNKNOWN_VALUE,
    # independent of how message objects evaluate in a boolean context.
    if value is None:
        return UNKNOWN_VALUE
    return value.string_value


def get_str_value_from_resource(
    attribute: Union[ResourceAttributes, str], resource: Resource
) -> str:
    """Returns `resource.attributes[attribute]` as a str.

    Falls back to UNKNOWN_VALUE when the attribute is not present.
    """
    value = resource.attributes.get(attribute, UNKNOWN_VALUE)
    return str(value)


# pylint: disable=line-too-long
def get_resource_type(gcp_resource: Resource) -> str:
    """Maps the detector's "gcp.resource_type" to the metadata exchange type.

    "gke_container" becomes TYPE_GKE and "gce_instance" becomes TYPE_GCE; any
    other value (including UNKNOWN_VALUE when the attribute is missing) is
    returned as is.
    """
    # Convert resource type from GoogleCloudResourceDetector to the value we used for
    # metadata exchange.
    # Reference: https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/cc61f23a5ff2f16f4aa2c38d07e55153828849cc/opentelemetry-resourcedetector-gcp/src/opentelemetry/resourcedetector/gcp_resource_detector/__init__.py#L96
    gcp_resource_type = get_str_value_from_resource(
        "gcp.resource_type", gcp_resource
    )
    if gcp_resource_type == "gke_container":
        return TYPE_GKE
    if gcp_resource_type == "gce_instance":
        return TYPE_GCE
    return gcp_resource_type