1# Copyright 2023 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from datetime import datetime 16import os 17from typing import List, Mapping, Optional, Tuple 18 19from google.rpc import code_pb2 20from grpc_observability import _observability # pytype: disable=pyi-error 21from grpc_observability import _observability_config 22from grpc_observability import _views 23from opencensus.common.transports import async_ 24from opencensus.ext.stackdriver import stats_exporter 25from opencensus.ext.stackdriver import trace_exporter 26from opencensus.stats import stats as stats_module 27from opencensus.stats.stats_recorder import StatsRecorder 28from opencensus.stats.view_manager import ViewManager 29from opencensus.tags.tag_key import TagKey 30from opencensus.tags.tag_map import TagMap 31from opencensus.tags.tag_value import TagValue 32from opencensus.trace import execution_context 33from opencensus.trace import samplers 34from opencensus.trace import span 35from opencensus.trace import span_context as span_context_module 36from opencensus.trace import span_data as span_data_module 37from opencensus.trace import status 38from opencensus.trace import time_event 39from opencensus.trace import trace_options 40from opencensus.trace import tracer 41 42# 60s is the default time for open census to call export. 
# Upload interval (in seconds) handed to both the stats exporter and the
# trace transport. Can be overridden with the
# GRPC_PYTHON_CENSUS_EXPORT_UPLOAD_INTERVAL_SECS environment variable.
CENSUS_UPLOAD_INTERVAL_SECS = int(
    os.environ.get("GRPC_PYTHON_CENSUS_EXPORT_UPLOAD_INTERVAL_SECS", 20)
)

# Status names that _status_to_span_status translates. Each name is also the
# name of the matching constant looked up on code_pb2.
_SPAN_STATUS_CODE_NAMES = frozenset(
    (
        "OK",
        "CANCELLED",
        "UNKNOWN",
        "INVALID_ARGUMENT",
        "DEADLINE_EXCEEDED",
        "NOT_FOUND",
        "ALREADY_EXISTS",
        "PERMISSION_DENIED",
        "UNAUTHENTICATED",
        "RESOURCE_EXHAUSTED",
        "FAILED_PRECONDITION",
        "ABORTED",
        "OUT_OF_RANGE",
        "UNIMPLEMENTED",
        "INTERNAL",
        "UNAVAILABLE",
        "DATA_LOSS",
    )
)


class StackDriverAsyncTransport(async_.AsyncTransport):
    """Wrapper class used to pass wait_period.

    This is required because the current StackDriver Tracing Exporter doesn't
    allow us to pass wait_period to AsyncTransport directly.

    Args:
      exporter: An opencensus.trace.base_exporter.Exporter object.
    """

    def __init__(self, exporter):
        super().__init__(exporter, wait_period=CENSUS_UPLOAD_INTERVAL_SECS)


class OpenCensusExporter(_observability.Exporter):
    """Exporter that forwards stats and tracing data to OpenCensus.

    Stats are recorded through the OpenCensus stats recorder and traces are
    exported through a StackdriverExporter. Each half is only set up when the
    corresponding flag (stats_enabled / tracing_enabled) is set in the config.
    """

    config: _observability_config.GcpObservabilityConfig
    default_labels: Optional[Mapping[str, str]]
    project_id: str
    tracer: Optional[tracer.Tracer]
    stats_recorder: Optional[StatsRecorder]
    view_manager: Optional[ViewManager]
    trace_exporter: Optional[trace_exporter.StackdriverExporter]

    def __init__(self, config: _observability_config.GcpObservabilityConfig):
        """Stores the configuration and sets up the enabled exporters.

        Args:
          config: Observability config; the object returned by its get()
            method is the one kept and read from afterwards.
        """
        self.config = config.get()
        self.default_labels = self.config.labels
        self.project_id = self.config.project_id
        self.tracer = None
        self.stats_recorder = None
        self.view_manager = None
        # Always define the attribute so that it exists even when tracing is
        # disabled and the setup below never assigns it.
        self.trace_exporter = None
        self._setup_open_census_stackdriver_exporter()

    def _setup_open_census_stackdriver_exporter(self) -> None:
        """Creates the stats and/or trace exporters enabled in the config."""
        if self.config.stats_enabled:
            stats = stats_module.stats
            self.stats_recorder = stats.stats_recorder
            self.view_manager = stats.view_manager
            # If testing locally please add resource="global" to Options,
            # otherwise StackDriver might override project_id based on the
            # detected resource.
            options = stats_exporter.Options(project_id=self.project_id)
            metrics_exporter = stats_exporter.new_stats_exporter(
                options, interval=CENSUS_UPLOAD_INTERVAL_SECS
            )
            self.view_manager.register_exporter(metrics_exporter)
            self._register_open_census_views()

        if self.config.tracing_enabled:
            current_tracer = execution_context.get_opencensus_tracer()
            trace_id = current_tracer.span_context.trace_id
            span_id = current_tracer.span_context.span_id
            if not span_id:
                span_id = span_context_module.generate_span_id()
            span_context = span_context_module.SpanContext(
                trace_id=trace_id, span_id=span_id
            )
            # Creates and saves the Tracer and Sampler to ContextVar.
            sampler = samplers.ProbabilitySampler(
                rate=self.config.sampling_rate
            )
            self.trace_exporter = trace_exporter.StackdriverExporter(
                project_id=self.project_id,
                transport=StackDriverAsyncTransport,
            )
            self.tracer = tracer.Tracer(
                sampler=sampler,
                span_context=span_context,
                exporter=self.trace_exporter,
            )

    def export_stats_data(
        self, stats_data: List[_observability.StatsData]
    ) -> None:
        """Records every known metric in stats_data.

        Does nothing when stats are disabled. Entries whose name has no
        measure in _views.METRICS_NAME_TO_MEASURE are skipped.

        Args:
          stats_data: The stats entries to record.
        """
        if not self.config.stats_enabled:
            return
        for data in stats_data:
            measure = _views.METRICS_NAME_TO_MEASURE.get(data.name, None)
            if not measure:
                continue
            # Create a measurement map for each metric, otherwise metrics will
            # be overridden instead of accumulated.
            measurement_map = self.stats_recorder.new_measurement_map()
            # Merge into a copy so the caller's labels are not modified;
            # default labels win when a key appears in both.
            labels = dict(data.labels)
            labels.update(self.default_labels or {})
            tag_map = TagMap()
            for key, value in labels.items():
                tag_map.insert(TagKey(key), TagValue(value))

            if data.measure_double:
                measurement_map.measure_float_put(measure, data.value_float)
            else:
                measurement_map.measure_int_put(measure, data.value_int)
            measurement_map.record(tag_map)

    def export_tracing_data(
        self, tracing_data: List[_observability.TracingData]
    ) -> None:
        """Converts each tracing entry to SpanData and exports it.

        Does nothing when tracing is disabled.

        Args:
          tracing_data: The tracing entries to export.
        """
        if not self.config.tracing_enabled:
            return
        for span_data in tracing_data:
            # Only traced data will be exported, thus TraceOptions=1.
            span_context = span_context_module.SpanContext(
                trace_id=span_data.trace_id,
                span_id=span_data.span_id,
                trace_options=trace_options.TraceOptions(1),
            )
            span_datas = _get_span_data(
                span_data, span_context, self.default_labels
            )
            self.trace_exporter.export(span_datas)

    def _register_open_census_views(self) -> None:
        """Registers the client and server views with the view manager."""
        view_factories = (
            # Client
            _views.client_started_rpcs,
            _views.client_completed_rpcs,
            _views.client_roundtrip_latency,
            _views.client_api_latency,
            _views.client_sent_compressed_message_bytes_per_rpc,
            _views.client_received_compressed_message_bytes_per_rpc,
            # Server
            _views.server_started_rpcs,
            _views.server_completed_rpcs,
            _views.server_sent_compressed_message_bytes_per_rpc,
            _views.server_received_compressed_message_bytes_per_rpc,
            _views.server_server_latency,
        )
        for make_view in view_factories:
            self.view_manager.register_view(make_view(self.default_labels))


def _get_span_annotations(
    span_annotations: List[Tuple[str, str]]
) -> List[time_event.Annotation]:
    """Builds Annotation objects from (timestamp, description) pairs.

    Args:
      span_annotations: Pairs whose first item is a timestamp string accepted
        by datetime.fromisoformat and whose second item is the description.

    Returns:
      One time_event.Annotation per input pair, in the same order.
    """
    return [
        time_event.Annotation(datetime.fromisoformat(time_stamp), description)
        for time_stamp, description in span_annotations
    ]


def _status_to_span_status(span_status: str) -> Optional[status.Status]:
    """Translates a status name into an OpenCensus trace Status.

    Args:
      span_status: Status name such as "OK" or "NOT_FOUND".

    Returns:
      A status.Status whose code is the code_pb2 constant of the same name and
      whose message is span_status, or None when the name is not one of the
      names in _SPAN_STATUS_CODE_NAMES.
    """
    # The lookup must use the span_status argument: `status` is the imported
    # opencensus module and never compares equal to a status name.
    if span_status not in _SPAN_STATUS_CODE_NAMES:
        return None
    return status.Status(getattr(code_pb2, span_status), message=span_status)


def _get_span_data(
    span_data: _observability.TracingData,
    span_context: span_context_module.SpanContext,
    labels: Mapping[str, str],
) -> List[span_data_module.SpanData]:
    """Extracts a list of SpanData tuples from a span.

    Args:
      span_data: _observability.TracingData to convert.
      span_context: The context related to the span_data.
      labels: Labels to be added to SpanData.

    Returns:
      A list of opencensus.trace.span_data.SpanData.
    """
    # Merge into a copy so span_data.span_labels is left untouched; the extra
    # labels win when a key appears in both.
    span_attributes = dict(span_data.span_labels)
    span_attributes.update(labels or {})
    span_status = _status_to_span_status(span_data.status)
    span_annotations = _get_span_annotations(span_data.span_annotations)
    has_parent = bool(span_data.parent_span_id)
    span_datas = [
        span_data_module.SpanData(
            name=span_data.name,
            context=span_context,
            span_id=span_data.span_id,
            parent_span_id=span_data.parent_span_id if has_parent else None,
            attributes=span_attributes,
            start_time=span_data.start_time,
            end_time=span_data.end_time,
            child_span_count=span_data.child_span_count,
            stack_trace=None,
            annotations=span_annotations,
            message_events=None,
            links=None,
            status=span_status,
            same_process_as_parent_span=True if has_parent else None,
            span_kind=span.SpanKind.UNSPECIFIED,
        )
    ]

    return span_datas