# Copyright 2023 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import Mapping from grpc_observability import _measures from grpc_observability._cyobservability import MetricsName from opencensus.stats import aggregation from opencensus.stats import view as view_module from opencensus.tags.tag_key import TagKey METRICS_NAME_TO_MEASURE = { MetricsName.CLIENT_STARTED_RPCS: _measures.CLIENT_STARTED_RPCS_MEASURE, MetricsName.CLIENT_ROUNDTRIP_LATENCY: _measures.CLIENT_ROUNDTRIP_LATENCY_MEASURE, MetricsName.CLIENT_COMPLETED_RPC: _measures.CLIENT_COMPLETED_RPCS_MEASURE, MetricsName.CLIENT_API_LATENCY: _measures.CLIENT_API_LATENCY_MEASURE, MetricsName.CLIENT_SEND_BYTES_PER_RPC: _measures.CLIENT_SEND_BYTES_PER_RPC_MEASURE, MetricsName.CLIENT_RECEIVED_BYTES_PER_RPC: _measures.CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE, MetricsName.SERVER_STARTED_RPCS: _measures.SERVER_STARTED_RPCS_MEASURE, MetricsName.SERVER_SENT_BYTES_PER_RPC: _measures.SERVER_SENT_BYTES_PER_RPC_MEASURE, MetricsName.SERVER_RECEIVED_BYTES_PER_RPC: _measures.SERVER_RECEIVED_BYTES_PER_RPC_MEASURE, MetricsName.SERVER_SERVER_LATENCY: _measures.SERVER_SERVER_LATENCY_MEASURE, MetricsName.SERVER_COMPLETED_RPC: _measures.SERVER_COMPLETED_RPCS_MEASURE, } # These measure definitions should be kept in sync across opencensus # implementations--see # https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java. def client_method_tag_key(): return TagKey("grpc_client_method") def client_status_tag_key(): return TagKey("grpc_client_status") def server_method_tag_key(): return TagKey("grpc_server_method") def server_status_tag_key(): return TagKey("server_status_tag_key") def count_distribution_aggregation() -> aggregation.DistributionAggregation: exponential_boundaries = _get_exponential_boundaries(17, 1.0, 2.0) return aggregation.DistributionAggregation(exponential_boundaries) def bytes_distribution_aggregation() -> aggregation.DistributionAggregation: return aggregation.DistributionAggregation( [ 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296, ] ) def millis_distribution_aggregation() -> aggregation.DistributionAggregation: return aggregation.DistributionAggregation( [ 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000, ] ) # Client def client_started_rpcs(labels: Mapping[str, str]) -> view_module.View: view = view_module.View( "grpc.io/client/started_rpcs", "The count of RPCs ever received at the server, including RPCs" + " that have not completed.", [TagKey(key) for key in labels.keys()] + [client_method_tag_key()], _measures.CLIENT_STARTED_RPCS_MEASURE, aggregation.CountAggregation(), ) return view def client_completed_rpcs(labels: Mapping[str, str]) -> view_module.View: view = view_module.View( "grpc.io/client/completed_rpcs", "The total count of RPCs completed, for example, when a response" + " is sent by the server.", [TagKey(key) for key in labels.keys()] + [client_method_tag_key(), client_status_tag_key()], _measures.CLIENT_COMPLETED_RPCS_MEASURE, aggregation.CountAggregation(), ) return view def client_roundtrip_latency(labels: Mapping[str, str]) -> view_module.View: view = view_module.View( "grpc.io/client/roundtrip_latency", "End-to-end time taken to complete an RPC attempt including the time" + " it takes to pick a subchannel.", [TagKey(key) for key in labels.keys()] + [client_method_tag_key()], _measures.CLIENT_ROUNDTRIP_LATENCY_MEASURE, millis_distribution_aggregation(), ) return view def client_api_latency(labels: Mapping[str, str]) -> view_module.View: view = view_module.View( "grpc.io/client/api_latency", "The total time taken by the gRPC library to complete an RPC from" + " the application's perspective.", [TagKey(key) for key in labels.keys()] + [client_method_tag_key(), client_status_tag_key()], _measures.CLIENT_API_LATENCY_MEASURE, millis_distribution_aggregation(), ) return view def client_sent_compressed_message_bytes_per_rpc( labels: Mapping[str, str] ) -> view_module.View: view = view_module.View( "grpc.io/client/sent_compressed_message_bytes_per_rpc", "The total bytes (compressed, not encrypted) sent across all" + " request messages per RPC attempt.", [TagKey(key) for key in labels.keys()] + [client_method_tag_key(), client_status_tag_key()], _measures.CLIENT_SEND_BYTES_PER_RPC_MEASURE, bytes_distribution_aggregation(), ) return view def client_received_compressed_message_bytes_per_rpc( labels: Mapping[str, str] ) -> view_module.View: view = view_module.View( "grpc.io/client/received_compressed_message_bytes_per_rpc", "The total bytes (compressed, not encrypted) received across" + " all response messages per RPC attempt.", [TagKey(key) for key in labels.keys()] + [client_method_tag_key(), client_status_tag_key()], _measures.CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE, bytes_distribution_aggregation(), ) return view # Server def server_started_rpcs(labels: Mapping[str, str]) -> view_module.View: view = view_module.View( "grpc.io/server/started_rpcs", "The count of RPCs ever received at the server, including RPCs" + " that have not completed.", [TagKey(key) for key in labels.keys()] + [server_method_tag_key()], _measures.SERVER_STARTED_RPCS_MEASURE, aggregation.CountAggregation(), ) return view def server_completed_rpcs(labels: Mapping[str, str]) -> view_module.View: view = view_module.View( "grpc.io/server/completed_rpcs", "The total count of RPCs completed, for example, when a response" + " is sent by the server.", [TagKey(key) for key in labels.keys()] + [server_method_tag_key(), server_status_tag_key()], _measures.SERVER_COMPLETED_RPCS_MEASURE, aggregation.CountAggregation(), ) return view def server_sent_compressed_message_bytes_per_rpc( labels: Mapping[str, str] ) -> view_module.View: view = view_module.View( "grpc.io/server/sent_compressed_message_bytes_per_rpc", "The total bytes (compressed not encrypted) sent across all response" + " messages per RPC.", [TagKey(key) for key in labels.keys()] + [server_method_tag_key(), server_status_tag_key()], _measures.SERVER_SENT_BYTES_PER_RPC_MEASURE, bytes_distribution_aggregation(), ) return view def server_received_compressed_message_bytes_per_rpc( labels: Mapping[str, str] ) -> view_module.View: view = view_module.View( "grpc.io/server/received_compressed_message_bytes_per_rpc", "The total bytes (compressed not encrypted) received across all" + " request messages per RPC.", [TagKey(key) for key in labels.keys()] + [server_method_tag_key(), server_status_tag_key()], _measures.SERVER_RECEIVED_BYTES_PER_RPC_MEASURE, bytes_distribution_aggregation(), ) return view def server_server_latency(labels: Mapping[str, str]) -> view_module.View: view = view_module.View( "grpc.io/server/server_latency", "The total time taken by an RPC from server transport's" + " (HTTP2 / inproc / cronet) perspective.", [TagKey(key) for key in labels.keys()] + [server_method_tag_key(), server_status_tag_key()], _measures.SERVER_SERVER_LATENCY_MEASURE, millis_distribution_aggregation(), ) return view def _get_exponential_boundaries( num_finite_buckets: int, scale: float, grrowth_factor: float ) -> list: boundaries = [] upper_bound = scale for _ in range(num_finite_buckets): boundaries.append(upper_bound) upper_bound *= grrowth_factor return boundaries