• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import argparse
16import logging
17import time
18from typing import Sequence
19
20import grpc
21from grpc_csm_observability import CsmOpenTelemetryPlugin
22from opentelemetry.exporter.prometheus import PrometheusMetricReader
23from opentelemetry.sdk.metrics import Histogram
24from opentelemetry.sdk.metrics import MeterProvider
25from opentelemetry.sdk.metrics import view
26from prometheus_client import start_http_server
27
28from examples.python.observability.csm import helloworld_pb2
29from examples.python.observability.csm import helloworld_pb2_grpc
30
31logger = logging.getLogger()
32console_handler = logging.StreamHandler()
33formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
34console_handler.setFormatter(formatter)
35logger.addHandler(console_handler)
36
37
38def _run(target: str, secure_mode: bool, prometheus_endpoint: int):
39    csm_plugin = _prepare_csm_observability_plugin(prometheus_endpoint)
40    csm_plugin.register_global()
41    if secure_mode:
42        fallback_creds = grpc.experimental.insecure_channel_credentials()
43        channel_creds = grpc.xds_channel_credentials(fallback_creds)
44        channel = grpc.secure_channel(target, channel_creds)
45    else:
46        channel = grpc.insecure_channel(target)
47    with channel:
48        stub = helloworld_pb2_grpc.GreeterStub(channel)
49        # Continuously send RPCs every second.
50        while True:
51            request = helloworld_pb2.HelloRequest(name="You")
52            logger.info("Sending request to server")
53            try:
54                response = stub.SayHello(request)
55                print(f"Greeter client received: {response.message}")
56                time.sleep(1)
57            except Exception:  # pylint: disable=broad-except
58                logger.info(
59                    "Request failed, this is normal during initial setup."
60                )
61    # Deregister is not called in this example, but this is required to clean up.
62    csm_plugin.deregister_global()
63
64
65def _prepare_csm_observability_plugin(
66    prometheus_endpoint: int,
67) -> CsmOpenTelemetryPlugin:
68    # Start Prometheus client
69    start_http_server(port=prometheus_endpoint, addr="0.0.0.0")
70    reader = PrometheusMetricReader()
71    meter_provider = MeterProvider(
72        metric_readers=[reader], views=_create_views()
73    )
74    csm_plugin = CsmOpenTelemetryPlugin(
75        meter_provider=meter_provider,
76    )
77    return csm_plugin
78
79
80def _create_views() -> Sequence[view.View]:
81    """Create a list of views with config for specific metrics."""
82    latency_boundaries = [
83        0,
84        0.00001,
85        0.00005,
86        0.0001,
87        0.0003,
88        0.0006,
89        0.0008,
90        0.001,
91        0.002,
92        0.003,
93        0.004,
94        0.005,
95        0.006,
96        0.008,
97        0.01,
98        0.013,
99        0.016,
100        0.02,
101        0.025,
102        0.03,
103        0.04,
104        0.05,
105        0.065,
106        0.08,
107        0.1,
108        0.13,
109        0.16,
110        0.2,
111        0.25,
112        0.3,
113        0.4,
114        0.5,
115        0.65,
116        0.8,
117        1,
118        2,
119        5,
120        10,
121        20,
122        50,
123        100,
124    ]
125    size_boundaries = [
126        0,
127        1024,
128        2048,
129        4096,
130        16384,
131        65536,
132        262144,
133        1048576,
134        4194304,
135        16777216,
136        67108864,
137        268435456,
138        1073741824,
139        4294967296,
140    ]
141    return [
142        view.View(
143            instrument_type=Histogram,
144            instrument_unit="s",
145            aggregation=view.ExplicitBucketHistogramAggregation(
146                # Boundaries as defined in gRFC. See:
147                # https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
148                boundaries=latency_boundaries
149            ),
150        ),
151        view.View(
152            instrument_type=Histogram,
153            instrument_unit="By",
154            aggregation=view.ExplicitBucketHistogramAggregation(
155                # Boundaries as defined in gRFC. See:
156                # https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
157                boundaries=size_boundaries
158            ),
159        ),
160    ]
161
162
163def bool_arg(arg: str) -> bool:
164    if arg.lower() in ("true", "yes", "y"):
165        return True
166    elif arg.lower() in ("false", "no", "n"):
167        return False
168    else:
169        raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
170
171
172if __name__ == "__main__":
173    logging.basicConfig()
174    logger.setLevel(logging.INFO)
175    parser = argparse.ArgumentParser(
176        description="Run Python CSM Observability Test client."
177    )
178    parser.add_argument(
179        "--target",
180        default="localhost:50051",
181        help="The address of the server.",
182    )
183    parser.add_argument(
184        "--secure_mode",
185        default="False",
186        type=bool_arg,
187        help="If specified, uses xDS credentials to connect to the server.",
188    )
189    parser.add_argument(
190        "--prometheus_endpoint",
191        type=int,
192        default=9464,
193        help="Port for servers besides test server.",
194    )
195    args = parser.parse_args()
196    _run(args.target, args.secure_mode, args.prometheus_endpoint)
197