# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OpenCensus view definitions for the gRPC client and server measures."""

from typing import Mapping

from grpc_observability import _measures
from grpc_observability._cyobservability import MetricsName
from opencensus.stats import aggregation
from opencensus.stats import view as view_module
from opencensus.tags.tag_key import TagKey

# Lookup table from each MetricsName member to the measure object defined
# in _measures that records it.
METRICS_NAME_TO_MEASURE = {
    MetricsName.CLIENT_STARTED_RPCS: _measures.CLIENT_STARTED_RPCS_MEASURE,
    MetricsName.CLIENT_ROUNDTRIP_LATENCY: _measures.CLIENT_ROUNDTRIP_LATENCY_MEASURE,
    MetricsName.CLIENT_COMPLETED_RPC: _measures.CLIENT_COMPLETED_RPCS_MEASURE,
    MetricsName.CLIENT_API_LATENCY: _measures.CLIENT_API_LATENCY_MEASURE,
    MetricsName.CLIENT_SEND_BYTES_PER_RPC: _measures.CLIENT_SEND_BYTES_PER_RPC_MEASURE,
    MetricsName.CLIENT_RECEIVED_BYTES_PER_RPC: _measures.CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE,
    MetricsName.SERVER_STARTED_RPCS: _measures.SERVER_STARTED_RPCS_MEASURE,
    MetricsName.SERVER_SENT_BYTES_PER_RPC: _measures.SERVER_SENT_BYTES_PER_RPC_MEASURE,
    MetricsName.SERVER_RECEIVED_BYTES_PER_RPC: _measures.SERVER_RECEIVED_BYTES_PER_RPC_MEASURE,
    MetricsName.SERVER_SERVER_LATENCY: _measures.SERVER_SERVER_LATENCY_MEASURE,
    MetricsName.SERVER_COMPLETED_RPC: _measures.SERVER_COMPLETED_RPCS_MEASURE,
}


# These measure definitions should be kept in sync across opencensus
# implementations--see
# https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java.
def client_method_tag_key():
    """Return the tag key under which the client-side method is recorded."""
    return TagKey("grpc_client_method")


def client_status_tag_key():
    """Return the tag key under which the client-side status is recorded."""
    return TagKey("grpc_client_status")


def server_method_tag_key():
    """Return the tag key under which the server-side method is recorded."""
    return TagKey("grpc_server_method")


def server_status_tag_key():
    """Return the tag key under which the server-side status is recorded."""
    # Fixed: this previously returned TagKey("server_status_tag_key") (the
    # function's own name), which broke the "grpc_<side>_<field>" naming
    # used by the three sibling tag keys above.
    return TagKey("grpc_server_status")


def count_distribution_aggregation() -> aggregation.DistributionAggregation:
    """Return a distribution aggregation with power-of-two count buckets.

    The boundaries are 17 values starting at 1.0 and doubling each step
    (1.0, 2.0, 4.0, ..., 65536.0).
    """
    exponential_boundaries = _get_exponential_boundaries(17, 1.0, 2.0)
    return aggregation.DistributionAggregation(exponential_boundaries)


def bytes_distribution_aggregation() -> aggregation.DistributionAggregation:
    """Return the distribution aggregation used by the byte-size views.

    Bucket boundaries run from 1024 up to 4294967296.
    """
    return aggregation.DistributionAggregation(
        [
            1024,
            2048,
            4096,
            16384,
            65536,
            262144,
            1048576,
            4194304,
            16777216,
            67108864,
            268435456,
            1073741824,
            4294967296,
        ]
    )


def millis_distribution_aggregation() -> aggregation.DistributionAggregation:
    """Return the distribution aggregation used by the latency views.

    Bucket boundaries run from 0.01 up to 100000; the function name
    indicates the unit is milliseconds.
    """
    return aggregation.DistributionAggregation(
        [
            0.01,
            0.05,
            0.1,
            0.3,
            0.6,
            0.8,
            1,
            2,
            3,
            4,
            5,
            6,
            8,
            10,
            13,
            16,
            20,
            25,
            30,
            40,
            50,
            65,
            80,
            100,
            130,
            160,
            200,
            250,
            300,
            400,
            500,
            650,
            800,
            1000,
            2000,
            5000,
            10000,
            20000,
            50000,
            100000,
        ]
    )


# Client
def client_started_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the "grpc.io/client/started_rpcs" count view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over CLIENT_STARTED_RPCS_MEASURE, tagged with the label keys
        plus the client method, using a count aggregation.
    """
    # NOTE(review): the description below talks about the server although
    # this is a client view (it is the same text as server_started_rpcs).
    # It is emitted at runtime, so it is left unchanged here; confirm the
    # intended wording before editing it.
    view = view_module.View(
        "grpc.io/client/started_rpcs",
        "The count of RPCs ever received at the server, including RPCs"
        " that have not completed.",
        [TagKey(key) for key in labels] + [client_method_tag_key()],
        _measures.CLIENT_STARTED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
    return view


def client_completed_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the "grpc.io/client/completed_rpcs" count view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over CLIENT_COMPLETED_RPCS_MEASURE, tagged with the label
        keys plus client method and status, using a count aggregation.
    """
    view = view_module.View(
        "grpc.io/client/completed_rpcs",
        "The total count of RPCs completed, for example, when a response"
        " is sent by the server.",
        [TagKey(key) for key in labels]
        + [client_method_tag_key(), client_status_tag_key()],
        _measures.CLIENT_COMPLETED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
    return view


def client_roundtrip_latency(labels: Mapping[str, str]) -> view_module.View:
    """Build the "grpc.io/client/roundtrip_latency" distribution view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over CLIENT_ROUNDTRIP_LATENCY_MEASURE, tagged with the label
        keys plus the client method (no status tag), using the latency
        distribution buckets.
    """
    view = view_module.View(
        "grpc.io/client/roundtrip_latency",
        "End-to-end time taken to complete an RPC attempt including the time"
        " it takes to pick a subchannel.",
        [TagKey(key) for key in labels] + [client_method_tag_key()],
        _measures.CLIENT_ROUNDTRIP_LATENCY_MEASURE,
        millis_distribution_aggregation(),
    )
    return view


def client_api_latency(labels: Mapping[str, str]) -> view_module.View:
    """Build the "grpc.io/client/api_latency" distribution view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over CLIENT_API_LATENCY_MEASURE, tagged with the label keys
        plus client method and status, using the latency distribution
        buckets.
    """
    view = view_module.View(
        "grpc.io/client/api_latency",
        "The total time taken by the gRPC library to complete an RPC from"
        " the application's perspective.",
        [TagKey(key) for key in labels]
        + [client_method_tag_key(), client_status_tag_key()],
        _measures.CLIENT_API_LATENCY_MEASURE,
        millis_distribution_aggregation(),
    )
    return view


def client_sent_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the client "sent_compressed_message_bytes_per_rpc" view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over CLIENT_SEND_BYTES_PER_RPC_MEASURE, tagged with the
        label keys plus client method and status, using the byte-size
        distribution buckets.
    """
    view = view_module.View(
        "grpc.io/client/sent_compressed_message_bytes_per_rpc",
        "The total bytes (compressed, not encrypted) sent across all"
        " request messages per RPC attempt.",
        [TagKey(key) for key in labels]
        + [client_method_tag_key(), client_status_tag_key()],
        _measures.CLIENT_SEND_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
    return view


def client_received_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the client "received_compressed_message_bytes_per_rpc" view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE, tagged with the
        label keys plus client method and status, using the byte-size
        distribution buckets.
    """
    view = view_module.View(
        "grpc.io/client/received_compressed_message_bytes_per_rpc",
        "The total bytes (compressed, not encrypted) received across"
        " all response messages per RPC attempt.",
        [TagKey(key) for key in labels]
        + [client_method_tag_key(), client_status_tag_key()],
        _measures.CLIENT_RECEIVED_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
    return view


# Server
def server_started_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the "grpc.io/server/started_rpcs" count view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over SERVER_STARTED_RPCS_MEASURE, tagged with the label keys
        plus the server method, using a count aggregation.
    """
    view = view_module.View(
        "grpc.io/server/started_rpcs",
        "The count of RPCs ever received at the server, including RPCs"
        " that have not completed.",
        [TagKey(key) for key in labels] + [server_method_tag_key()],
        _measures.SERVER_STARTED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
    return view


def server_completed_rpcs(labels: Mapping[str, str]) -> view_module.View:
    """Build the "grpc.io/server/completed_rpcs" count view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over SERVER_COMPLETED_RPCS_MEASURE, tagged with the label
        keys plus server method and status, using a count aggregation.
    """
    view = view_module.View(
        "grpc.io/server/completed_rpcs",
        "The total count of RPCs completed, for example, when a response"
        " is sent by the server.",
        [TagKey(key) for key in labels]
        + [server_method_tag_key(), server_status_tag_key()],
        _measures.SERVER_COMPLETED_RPCS_MEASURE,
        aggregation.CountAggregation(),
    )
    return view


def server_sent_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the server "sent_compressed_message_bytes_per_rpc" view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over SERVER_SENT_BYTES_PER_RPC_MEASURE, tagged with the
        label keys plus server method and status, using the byte-size
        distribution buckets.
    """
    view = view_module.View(
        "grpc.io/server/sent_compressed_message_bytes_per_rpc",
        "The total bytes (compressed not encrypted) sent across all response"
        " messages per RPC.",
        [TagKey(key) for key in labels]
        + [server_method_tag_key(), server_status_tag_key()],
        _measures.SERVER_SENT_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
    return view


def server_received_compressed_message_bytes_per_rpc(
    labels: Mapping[str, str]
) -> view_module.View:
    """Build the server "received_compressed_message_bytes_per_rpc" view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over SERVER_RECEIVED_BYTES_PER_RPC_MEASURE, tagged with the
        label keys plus server method and status, using the byte-size
        distribution buckets.
    """
    view = view_module.View(
        "grpc.io/server/received_compressed_message_bytes_per_rpc",
        "The total bytes (compressed not encrypted) received across all"
        " request messages per RPC.",
        [TagKey(key) for key in labels]
        + [server_method_tag_key(), server_status_tag_key()],
        _measures.SERVER_RECEIVED_BYTES_PER_RPC_MEASURE,
        bytes_distribution_aggregation(),
    )
    return view


def server_server_latency(labels: Mapping[str, str]) -> view_module.View:
    """Build the "grpc.io/server/server_latency" distribution view.

    Args:
        labels: Mapping whose keys become additional tag keys on the view;
            the values are not used here.

    Returns:
        A View over SERVER_SERVER_LATENCY_MEASURE, tagged with the label
        keys plus server method and status, using the latency distribution
        buckets.
    """
    view = view_module.View(
        "grpc.io/server/server_latency",
        "The total time taken by an RPC from server transport's"
        " (HTTP2 / inproc / cronet) perspective.",
        [TagKey(key) for key in labels]
        + [server_method_tag_key(), server_status_tag_key()],
        _measures.SERVER_SERVER_LATENCY_MEASURE,
        millis_distribution_aggregation(),
    )
    return view


def _get_exponential_boundaries(
    num_finite_buckets: int, scale: float, grrowth_factor: float
) -> list:
    """Return geometrically growing bucket boundaries.

    Args:
        num_finite_buckets: Number of boundaries to generate.
        scale: Value of the first boundary.
        grrowth_factor: Multiplier applied to get each following boundary.
            (The spelling is kept as-is so keyword callers keep working.)

    Returns:
        A list of length ``num_finite_buckets``: ``scale``,
        ``scale * grrowth_factor``, ``scale * grrowth_factor ** 2``, ...
        computed by repeated multiplication.
    """
    boundaries = []
    upper_bound = scale
    for _ in range(num_finite_buckets):
        boundaries.append(upper_bound)
        upper_bound *= grrowth_factor
    return boundaries