1# Copyright 2024 The gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import argparse 16from concurrent import futures 17import logging 18from typing import Sequence 19 20import grpc 21from grpc_csm_observability import CsmOpenTelemetryPlugin 22from opentelemetry.exporter.prometheus import PrometheusMetricReader 23from opentelemetry.sdk.metrics import Histogram 24from opentelemetry.sdk.metrics import MeterProvider 25from opentelemetry.sdk.metrics import view 26from prometheus_client import start_http_server 27 28from examples.python.observability.csm import helloworld_pb2 29from examples.python.observability.csm import helloworld_pb2_grpc 30 31_LISTEN_HOST = "0.0.0.0" 32_THREAD_POOL_SIZE = 256 33 34logger = logging.getLogger() 35console_handler = logging.StreamHandler() 36formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s") 37console_handler.setFormatter(formatter) 38logger.addHandler(console_handler) 39 40 41class Greeter(helloworld_pb2_grpc.GreeterServicer): 42 def SayHello(self, request, context): 43 message = request.name 44 return helloworld_pb2.HelloReply(message=f"Hello {message}") 45 46 47def _run( 48 port: int, 49 secure_mode: bool, 50 server_id: str, 51 prometheus_endpoint: int, 52) -> None: 53 csm_plugin = _prepare_csm_observability_plugin(prometheus_endpoint) 54 csm_plugin.register_global() 55 server = grpc.server( 56 futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE), 57 xds=secure_mode, 58 ) 59 _configure_test_server(server, port, secure_mode, server_id) 60 server.start() 61 logger.info("Test server listening on port %d", port) 62 server.wait_for_termination() 63 csm_plugin.deregister_global() 64 65 66def _prepare_csm_observability_plugin( 67 prometheus_endpoint: int, 68) -> CsmOpenTelemetryPlugin: 69 # Start Prometheus client 70 start_http_server(port=prometheus_endpoint, addr="0.0.0.0") 71 reader = PrometheusMetricReader() 72 meter_provider = MeterProvider( 73 metric_readers=[reader], views=_create_views() 74 ) 75 csm_plugin = CsmOpenTelemetryPlugin( 76 meter_provider=meter_provider, 77 ) 78 return csm_plugin 79 80 81def _create_views() -> Sequence[view.View]: 82 """Create a list of views with config for specific metrics.""" 83 latency_boundaries = [ 84 0, 85 0.00001, 86 0.00005, 87 0.0001, 88 0.0003, 89 0.0006, 90 0.0008, 91 0.001, 92 0.002, 93 0.003, 94 0.004, 95 0.005, 96 0.006, 97 0.008, 98 0.01, 99 0.013, 100 0.016, 101 0.02, 102 0.025, 103 0.03, 104 0.04, 105 0.05, 106 0.065, 107 0.08, 108 0.1, 109 0.13, 110 0.16, 111 0.2, 112 0.25, 113 0.3, 114 0.4, 115 0.5, 116 0.65, 117 0.8, 118 1, 119 2, 120 5, 121 10, 122 20, 123 50, 124 100, 125 ] 126 size_boundaries = [ 127 0, 128 1024, 129 2048, 130 4096, 131 16384, 132 65536, 133 262144, 134 1048576, 135 4194304, 136 16777216, 137 67108864, 138 268435456, 139 1073741824, 140 4294967296, 141 ] 142 return [ 143 view.View( 144 instrument_type=Histogram, 145 instrument_unit="s", 146 aggregation=view.ExplicitBucketHistogramAggregation( 147 # Boundaries as defined in gRFC. See: 148 # https://github.com/grpc/proposal/blob/master/A66-otel-stats.md 149 boundaries=latency_boundaries 150 ), 151 ), 152 view.View( 153 instrument_type=Histogram, 154 instrument_unit="By", 155 aggregation=view.ExplicitBucketHistogramAggregation( 156 # Boundaries as defined in gRFC. See: 157 # https://github.com/grpc/proposal/blob/master/A66-otel-stats.md 158 boundaries=size_boundaries 159 ), 160 ), 161 ] 162 163 164def _configure_test_server( 165 server: grpc.Server, port: int, secure_mode: bool, server_id: str 166) -> None: 167 helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) 168 listen_address = f"{_LISTEN_HOST}:{port}" 169 if not secure_mode: 170 server.add_insecure_port(listen_address) 171 else: 172 logger.info("Running with xDS Server credentials") 173 server_fallback_creds = grpc.insecure_server_credentials() 174 server_creds = grpc.xds_server_credentials(server_fallback_creds) 175 server.add_secure_port(listen_address, server_creds) 176 177 178def bool_arg(arg: str) -> bool: 179 if arg.lower() in ("true", "yes", "y"): 180 return True 181 elif arg.lower() in ("false", "no", "n"): 182 return False 183 else: 184 raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.") 185 186 187if __name__ == "__main__": 188 logging.basicConfig() 189 logger.setLevel(logging.INFO) 190 parser = argparse.ArgumentParser( 191 description="Run Python CSM Observability Test server." 192 ) 193 parser.add_argument( 194 "--port", type=int, default=50051, help="Port for test server." 195 ) 196 parser.add_argument( 197 "--secure_mode", 198 type=bool_arg, 199 default="False", 200 help="If specified, uses xDS to retrieve server credentials.", 201 ) 202 parser.add_argument( 203 "--server_id", 204 type=str, 205 default="python_server", 206 help="The server ID to return in responses.", 207 ) 208 parser.add_argument( 209 "--prometheus_endpoint", 210 type=int, 211 default=9464, 212 help="Port for servers besides test server.", 213 ) 214 args = parser.parse_args() 215 _run( 216 args.port, 217 args.secure_mode, 218 args.server_id, 219 args.prometheus_endpoint, 220 ) 221