# Copyright 2024 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import time
from typing import Sequence

import grpc
from grpc_csm_observability import CsmOpenTelemetryPlugin
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import Histogram
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics import view
from prometheus_client import start_http_server

from examples.python.observability.csm import helloworld_pb2
from examples.python.observability.csm import helloworld_pb2_grpc

# The root logger is configured with a timestamped console handler.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


def _run(target: str, secure_mode: bool, prometheus_endpoint: int):
    """Send a SayHello RPC to `target` once per second until interrupted.

    Args:
      target: Address of the Greeter server to connect to.
      secure_mode: When true, connect with xDS channel credentials (with
        insecure credentials as the fallback); otherwise use an insecure
        channel.
      prometheus_endpoint: Port on which the Prometheus metrics HTTP server
        is started.
    """
    csm_plugin = _prepare_csm_observability_plugin(prometheus_endpoint)
    csm_plugin.register_global()
    try:
        if secure_mode:
            fallback_creds = grpc.experimental.insecure_channel_credentials()
            channel_creds = grpc.xds_channel_credentials(fallback_creds)
            channel = grpc.secure_channel(target, channel_creds)
        else:
            channel = grpc.insecure_channel(target)
        with channel:
            stub = helloworld_pb2_grpc.GreeterStub(channel)
            # Continuously send RPCs every second.
            while True:
                request = helloworld_pb2.HelloRequest(name="You")
                logger.info("Sending request to server")
                try:
                    response = stub.SayHello(request)
                    print(f"Greeter client received: {response.message}")
                except Exception:  # pylint: disable=broad-except
                    logger.info(
                        "Request failed, this is normal during initial setup."
                    )
                # Pause after failures as well as successes; otherwise a
                # failing RPC would be retried (and logged) in a tight loop.
                time.sleep(1)
    finally:
        # The loop above only ends via an exception (e.g. Ctrl-C), so the
        # required cleanup has to live in a `finally` to be reachable.
        csm_plugin.deregister_global()


def _prepare_csm_observability_plugin(
    prometheus_endpoint: int,
) -> CsmOpenTelemetryPlugin:
    """Start the Prometheus exporter and build the CSM observability plugin.

    Args:
      prometheus_endpoint: Port the Prometheus HTTP server listens on; it is
        bound on all interfaces ("0.0.0.0").

    Returns:
      A CsmOpenTelemetryPlugin backed by a MeterProvider that exports through
      a PrometheusMetricReader and uses the views from `_create_views()`.
    """
    # Start Prometheus client
    start_http_server(port=prometheus_endpoint, addr="0.0.0.0")
    reader = PrometheusMetricReader()
    meter_provider = MeterProvider(
        metric_readers=[reader], views=_create_views()
    )
    csm_plugin = CsmOpenTelemetryPlugin(
        meter_provider=meter_provider,
    )
    return csm_plugin


def _create_views() -> Sequence[view.View]:
    """Create a list of views with config for specific metrics.

    Returns:
      Two views: one applying the latency bucket boundaries to histograms
      with unit "s", and one applying the size bucket boundaries to
      histograms with unit "By".
    """
    # Boundaries as defined in gRFC. See:
    # https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
    latency_boundaries = [
        0,
        0.00001,
        0.00005,
        0.0001,
        0.0003,
        0.0006,
        0.0008,
        0.001,
        0.002,
        0.003,
        0.004,
        0.005,
        0.006,
        0.008,
        0.01,
        0.013,
        0.016,
        0.02,
        0.025,
        0.03,
        0.04,
        0.05,
        0.065,
        0.08,
        0.1,
        0.13,
        0.16,
        0.2,
        0.25,
        0.3,
        0.4,
        0.5,
        0.65,
        0.8,
        1,
        2,
        5,
        10,
        20,
        50,
        100,
    ]
    size_boundaries = [
        0,
        1024,
        2048,
        4096,
        16384,
        65536,
        262144,
        1048576,
        4194304,
        16777216,
        67108864,
        268435456,
        1073741824,
        4294967296,
    ]
    latency_view = view.View(
        instrument_type=Histogram,
        instrument_unit="s",
        aggregation=view.ExplicitBucketHistogramAggregation(
            boundaries=latency_boundaries
        ),
    )
    size_view = view.View(
        instrument_type=Histogram,
        instrument_unit="By",
        aggregation=view.ExplicitBucketHistogramAggregation(
            boundaries=size_boundaries
        ),
    )
    return [latency_view, size_view]


def bool_arg(arg: str) -> bool:
    """Parse a command-line string into a bool (case-insensitive).

    Args:
      arg: The raw argument text.

    Returns:
      True for "true"/"yes"/"y", False for "false"/"no"/"n".

    Raises:
      argparse.ArgumentTypeError: If `arg` is none of the accepted spellings.
    """
    lowered = arg.lower()
    if lowered in ("true", "yes", "y"):
        return True
    if lowered in ("false", "no", "n"):
        return False
    raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")


if __name__ == "__main__":
    logging.basicConfig()
    logger.setLevel(logging.INFO)
    parser = argparse.ArgumentParser(
        description="Run Python CSM Observability Test client."
    )
    parser.add_argument(
        "--target",
        default="localhost:50051",
        help="The address of the server.",
    )
    # argparse passes string defaults through `type`, so "False" is parsed by
    # bool_arg just like a value given on the command line.
    parser.add_argument(
        "--secure_mode",
        default="False",
        type=bool_arg,
        help="If specified, uses xDS credentials to connect to the server.",
    )
    # Port on which this client's Prometheus metrics HTTP server listens.
    parser.add_argument(
        "--prometheus_endpoint",
        type=int,
        default=9464,
        help="Port for servers besides test server.",
    )
    args = parser.parse_args()
    _run(args.target, args.secure_mode, args.prometheus_endpoint)