• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import argparse
16from concurrent import futures
17import logging
18from typing import Sequence
19
20import grpc
21from grpc_csm_observability import CsmOpenTelemetryPlugin
22from opentelemetry.exporter.prometheus import PrometheusMetricReader
23from opentelemetry.sdk.metrics import Histogram
24from opentelemetry.sdk.metrics import MeterProvider
25from opentelemetry.sdk.metrics import view
26from prometheus_client import start_http_server
27
28from examples.python.observability.csm import helloworld_pb2
29from examples.python.observability.csm import helloworld_pb2_grpc
30
31_LISTEN_HOST = "0.0.0.0"
32_THREAD_POOL_SIZE = 256
33
34logger = logging.getLogger()
35console_handler = logging.StreamHandler()
36formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
37console_handler.setFormatter(formatter)
38logger.addHandler(console_handler)
39
40
41class Greeter(helloworld_pb2_grpc.GreeterServicer):
42    def SayHello(self, request, context):
43        message = request.name
44        return helloworld_pb2.HelloReply(message=f"Hello {message}")
45
46
47def _run(
48    port: int,
49    secure_mode: bool,
50    server_id: str,
51    prometheus_endpoint: int,
52) -> None:
53    csm_plugin = _prepare_csm_observability_plugin(prometheus_endpoint)
54    csm_plugin.register_global()
55    server = grpc.server(
56        futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE),
57        xds=secure_mode,
58    )
59    _configure_test_server(server, port, secure_mode, server_id)
60    server.start()
61    logger.info("Test server listening on port %d", port)
62    server.wait_for_termination()
63    csm_plugin.deregister_global()
64
65
66def _prepare_csm_observability_plugin(
67    prometheus_endpoint: int,
68) -> CsmOpenTelemetryPlugin:
69    # Start Prometheus client
70    start_http_server(port=prometheus_endpoint, addr="0.0.0.0")
71    reader = PrometheusMetricReader()
72    meter_provider = MeterProvider(
73        metric_readers=[reader], views=_create_views()
74    )
75    csm_plugin = CsmOpenTelemetryPlugin(
76        meter_provider=meter_provider,
77    )
78    return csm_plugin
79
80
81def _create_views() -> Sequence[view.View]:
82    """Create a list of views with config for specific metrics."""
83    latency_boundaries = [
84        0,
85        0.00001,
86        0.00005,
87        0.0001,
88        0.0003,
89        0.0006,
90        0.0008,
91        0.001,
92        0.002,
93        0.003,
94        0.004,
95        0.005,
96        0.006,
97        0.008,
98        0.01,
99        0.013,
100        0.016,
101        0.02,
102        0.025,
103        0.03,
104        0.04,
105        0.05,
106        0.065,
107        0.08,
108        0.1,
109        0.13,
110        0.16,
111        0.2,
112        0.25,
113        0.3,
114        0.4,
115        0.5,
116        0.65,
117        0.8,
118        1,
119        2,
120        5,
121        10,
122        20,
123        50,
124        100,
125    ]
126    size_boundaries = [
127        0,
128        1024,
129        2048,
130        4096,
131        16384,
132        65536,
133        262144,
134        1048576,
135        4194304,
136        16777216,
137        67108864,
138        268435456,
139        1073741824,
140        4294967296,
141    ]
142    return [
143        view.View(
144            instrument_type=Histogram,
145            instrument_unit="s",
146            aggregation=view.ExplicitBucketHistogramAggregation(
147                # Boundaries as defined in gRFC. See:
148                # https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
149                boundaries=latency_boundaries
150            ),
151        ),
152        view.View(
153            instrument_type=Histogram,
154            instrument_unit="By",
155            aggregation=view.ExplicitBucketHistogramAggregation(
156                # Boundaries as defined in gRFC. See:
157                # https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
158                boundaries=size_boundaries
159            ),
160        ),
161    ]
162
163
164def _configure_test_server(
165    server: grpc.Server, port: int, secure_mode: bool, server_id: str
166) -> None:
167    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
168    listen_address = f"{_LISTEN_HOST}:{port}"
169    if not secure_mode:
170        server.add_insecure_port(listen_address)
171    else:
172        logger.info("Running with xDS Server credentials")
173        server_fallback_creds = grpc.insecure_server_credentials()
174        server_creds = grpc.xds_server_credentials(server_fallback_creds)
175        server.add_secure_port(listen_address, server_creds)
176
177
178def bool_arg(arg: str) -> bool:
179    if arg.lower() in ("true", "yes", "y"):
180        return True
181    elif arg.lower() in ("false", "no", "n"):
182        return False
183    else:
184        raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
185
186
187if __name__ == "__main__":
188    logging.basicConfig()
189    logger.setLevel(logging.INFO)
190    parser = argparse.ArgumentParser(
191        description="Run Python CSM Observability Test server."
192    )
193    parser.add_argument(
194        "--port", type=int, default=50051, help="Port for test server."
195    )
196    parser.add_argument(
197        "--secure_mode",
198        type=bool_arg,
199        default="False",
200        help="If specified, uses xDS to retrieve server credentials.",
201    )
202    parser.add_argument(
203        "--server_id",
204        type=str,
205        default="python_server",
206        help="The server ID to return in responses.",
207    )
208    parser.add_argument(
209        "--prometheus_endpoint",
210        type=int,
211        default=9464,
212        help="Port for servers besides test server.",
213    )
214    args = parser.parse_args()
215    _run(
216        args.port,
217        args.secure_mode,
218        args.server_id,
219        args.prometheus_endpoint,
220    )
221