• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from datetime import datetime
16import os
17from typing import List, Mapping, Optional, Tuple
18
19from google.rpc import code_pb2
20from grpc_observability import _observability  # pytype: disable=pyi-error
21from grpc_observability import _observability_config
22from grpc_observability import _views
23from opencensus.common.transports import async_
24from opencensus.ext.stackdriver import stats_exporter
25from opencensus.ext.stackdriver import trace_exporter
26from opencensus.stats import stats as stats_module
27from opencensus.stats.stats_recorder import StatsRecorder
28from opencensus.stats.view_manager import ViewManager
29from opencensus.tags.tag_key import TagKey
30from opencensus.tags.tag_map import TagMap
31from opencensus.tags.tag_value import TagValue
32from opencensus.trace import execution_context
33from opencensus.trace import samplers
34from opencensus.trace import span
35from opencensus.trace import span_context as span_context_module
36from opencensus.trace import span_data as span_data_module
37from opencensus.trace import status
38from opencensus.trace import time_event
39from opencensus.trace import trace_options
40from opencensus.trace import tracer
41
42# 60s is the default time for open census to call export.
43CENSUS_UPLOAD_INTERVAL_SECS = int(
44    os.environ.get("GRPC_PYTHON_CENSUS_EXPORT_UPLOAD_INTERVAL_SECS", 20)
45)
46
47
48class StackDriverAsyncTransport(async_.AsyncTransport):
49    """Wrapper class used to pass wait_period.
50
51    This is required because current StackDriver Tracing Exporter doesn't allow
52    us pass wait_period to AsyncTransport directly.
53
54    Args:
55      exporter: An opencensus.trace.base_exporter.Exporter object.
56    """
57
58    def __init__(self, exporter):
59        super().__init__(exporter, wait_period=CENSUS_UPLOAD_INTERVAL_SECS)
60
61
62class OpenCensusExporter(_observability.Exporter):
63    config: _observability_config.GcpObservabilityConfig
64    default_labels: Optional[Mapping[str, str]]
65    project_id: str
66    tracer: Optional[tracer.Tracer]
67    stats_recorder: Optional[StatsRecorder]
68    view_manager: Optional[ViewManager]
69
70    def __init__(self, config: _observability_config.GcpObservabilityConfig):
71        self.config = config.get()
72        self.default_labels = self.config.labels
73        self.project_id = self.config.project_id
74        self.tracer = None
75        self.stats_recorder = None
76        self.view_manager = None
77        self._setup_open_census_stackdriver_exporter()
78
79    def _setup_open_census_stackdriver_exporter(self) -> None:
80        if self.config.stats_enabled:
81            stats = stats_module.stats
82            self.stats_recorder = stats.stats_recorder
83            self.view_manager = stats.view_manager
84            # If testing locally please add resource="global" to Options, otherwise
85            # StackDriver might override project_id based on detected resource.
86            options = stats_exporter.Options(project_id=self.project_id)
87            metrics_exporter = stats_exporter.new_stats_exporter(
88                options, interval=CENSUS_UPLOAD_INTERVAL_SECS
89            )
90            self.view_manager.register_exporter(metrics_exporter)
91            self._register_open_census_views()
92
93        if self.config.tracing_enabled:
94            current_tracer = execution_context.get_opencensus_tracer()
95            trace_id = current_tracer.span_context.trace_id
96            span_id = current_tracer.span_context.span_id
97            if not span_id:
98                span_id = span_context_module.generate_span_id()
99            span_context = span_context_module.SpanContext(
100                trace_id=trace_id, span_id=span_id
101            )
102            # Create and Saves Tracer and Sampler to ContextVar
103            sampler = samplers.ProbabilitySampler(
104                rate=self.config.sampling_rate
105            )
106            self.trace_exporter = trace_exporter.StackdriverExporter(
107                project_id=self.project_id,
108                transport=StackDriverAsyncTransport,
109            )
110            self.tracer = tracer.Tracer(
111                sampler=sampler,
112                span_context=span_context,
113                exporter=self.trace_exporter,
114            )
115
116    def export_stats_data(
117        self, stats_data: List[_observability.StatsData]
118    ) -> None:
119        if not self.config.stats_enabled:
120            return
121        for data in stats_data:
122            measure = _views.METRICS_NAME_TO_MEASURE.get(data.name, None)
123            if not measure:
124                continue
125            # Create a measurement map for each metric, otherwise metrics will
126            # be overridden instead of accumulate.
127            measurement_map = self.stats_recorder.new_measurement_map()
128            # Add data label to default labels.
129            labels = data.labels
130            labels.update(self.default_labels)
131            tag_map = TagMap()
132            for key, value in labels.items():
133                tag_map.insert(TagKey(key), TagValue(value))
134
135            if data.measure_double:
136                measurement_map.measure_float_put(measure, data.value_float)
137            else:
138                measurement_map.measure_int_put(measure, data.value_int)
139            measurement_map.record(tag_map)
140
141    def export_tracing_data(
142        self, tracing_data: List[_observability.TracingData]
143    ) -> None:
144        if not self.config.tracing_enabled:
145            return
146        for span_data in tracing_data:
147            # Only traced data will be exported, thus TraceOptions=1.
148            span_context = span_context_module.SpanContext(
149                trace_id=span_data.trace_id,
150                span_id=span_data.span_id,
151                trace_options=trace_options.TraceOptions(1),
152            )
153            span_datas = _get_span_data(
154                span_data, span_context, self.default_labels
155            )
156            self.trace_exporter.export(span_datas)
157
158    def _register_open_census_views(self) -> None:
159        # Client
160        self.view_manager.register_view(
161            _views.client_started_rpcs(self.default_labels)
162        )
163        self.view_manager.register_view(
164            _views.client_completed_rpcs(self.default_labels)
165        )
166        self.view_manager.register_view(
167            _views.client_roundtrip_latency(self.default_labels)
168        )
169        self.view_manager.register_view(
170            _views.client_api_latency(self.default_labels)
171        )
172        self.view_manager.register_view(
173            _views.client_sent_compressed_message_bytes_per_rpc(
174                self.default_labels
175            )
176        )
177        self.view_manager.register_view(
178            _views.client_received_compressed_message_bytes_per_rpc(
179                self.default_labels
180            )
181        )
182
183        # Server
184        self.view_manager.register_view(
185            _views.server_started_rpcs(self.default_labels)
186        )
187        self.view_manager.register_view(
188            _views.server_completed_rpcs(self.default_labels)
189        )
190        self.view_manager.register_view(
191            _views.server_sent_compressed_message_bytes_per_rpc(
192                self.default_labels
193            )
194        )
195        self.view_manager.register_view(
196            _views.server_received_compressed_message_bytes_per_rpc(
197                self.default_labels
198            )
199        )
200        self.view_manager.register_view(
201            _views.server_server_latency(self.default_labels)
202        )
203
204
205def _get_span_annotations(
206    span_annotations: List[Tuple[str, str]]
207) -> List[time_event.Annotation]:
208    annotations = []
209
210    for time_stamp, description in span_annotations:
211        time = datetime.fromisoformat(time_stamp)
212        annotations.append(time_event.Annotation(time, description))
213
214    return annotations
215
216
217# pylint: disable=too-many-return-statements
218# pylint: disable=too-many-branches
219def _status_to_span_status(span_status: str) -> Optional[status.Status]:
220    if status == "OK":
221        return status.Status(code_pb2.OK, message=span_status)
222    elif status == "CANCELLED":
223        return status.Status(code_pb2.CANCELLED, message=span_status)
224    elif status == "UNKNOWN":
225        return status.Status(code_pb2.UNKNOWN, message=span_status)
226    elif status == "INVALID_ARGUMENT":
227        return status.Status(code_pb2.INVALID_ARGUMENT, message=span_status)
228    elif status == "DEADLINE_EXCEEDED":
229        return status.Status(code_pb2.DEADLINE_EXCEEDED, message=span_status)
230    elif status == "NOT_FOUND":
231        return status.Status(code_pb2.NOT_FOUND, message=span_status)
232    elif status == "ALREADY_EXISTS":
233        return status.Status(code_pb2.ALREADY_EXISTS, message=span_status)
234    elif status == "PERMISSION_DENIED":
235        return status.Status(code_pb2.PERMISSION_DENIED, message=span_status)
236    elif status == "UNAUTHENTICATED":
237        return status.Status(code_pb2.UNAUTHENTICATED, message=span_status)
238    elif status == "RESOURCE_EXHAUSTED":
239        return status.Status(code_pb2.RESOURCE_EXHAUSTED, message=span_status)
240    elif status == "FAILED_PRECONDITION":
241        return status.Status(code_pb2.FAILED_PRECONDITION, message=span_status)
242    elif status == "ABORTED":
243        return status.Status(code_pb2.ABORTED, message=span_status)
244    elif status == "OUT_OF_RANGE":
245        return status.Status(code_pb2.OUT_OF_RANGE, message=span_status)
246    elif status == "UNIMPLEMENTED":
247        return status.Status(code_pb2.UNIMPLEMENTED, message=span_status)
248    elif status == "INTERNAL":
249        return status.Status(code_pb2.INTERNAL, message=span_status)
250    elif status == "UNAVAILABLE":
251        return status.Status(code_pb2.UNAVAILABLE, message=span_status)
252    elif status == "DATA_LOSS":
253        return status.Status(code_pb2.DATA_LOSS, message=span_status)
254    else:
255        return None
256
257
258def _get_span_data(
259    span_data: _observability.TracingData,
260    span_context: span_context_module.SpanContext,
261    labels: Mapping[str, str],
262) -> List[span_data_module.SpanData]:
263    """Extracts a list of SpanData tuples from a span.
264
265    Args:
266    span_data: _observability.TracingData to convert.
267    span_context: The context related to the span_data.
268    labels: Labels to be added to SpanData.
269
270    Returns:
271    A list of opencensus.trace.span_data.SpanData.
272    """
273    span_attributes = span_data.span_labels
274    span_attributes.update(labels)
275    span_status = _status_to_span_status(span_data.status)
276    span_annotations = _get_span_annotations(span_data.span_annotations)
277    span_datas = [
278        span_data_module.SpanData(
279            name=span_data.name,
280            context=span_context,
281            span_id=span_data.span_id,
282            parent_span_id=span_data.parent_span_id
283            if span_data.parent_span_id
284            else None,
285            attributes=span_attributes,
286            start_time=span_data.start_time,
287            end_time=span_data.end_time,
288            child_span_count=span_data.child_span_count,
289            stack_trace=None,
290            annotations=span_annotations,
291            message_events=None,
292            links=None,
293            status=span_status,
294            same_process_as_parent_span=True
295            if span_data.parent_span_id
296            else None,
297            span_kind=span.SpanKind.UNSPECIFIED,
298        )
299    ]
300
301    return span_datas
302