• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import argparse
16import collections
17import concurrent.futures
18import datetime
19import logging
20import signal
21import threading
22import time
23from typing import (
24    DefaultDict,
25    Dict,
26    FrozenSet,
27    Iterable,
28    List,
29    Mapping,
30    Sequence,
31    Set,
32    Tuple,
33)
34
35import grpc
36from grpc import _typing as grpc_typing
37import grpc_admin
38from grpc_channelz.v1 import channelz
39from grpc_csm_observability import CsmOpenTelemetryPlugin
40from opentelemetry.exporter.prometheus import PrometheusMetricReader
41from opentelemetry.sdk.metrics import MeterProvider
42from prometheus_client import start_http_server
43
44from src.proto.grpc.testing import empty_pb2
45from src.proto.grpc.testing import messages_pb2
46from src.proto.grpc.testing import test_pb2
47from src.proto.grpc.testing import test_pb2_grpc
48from src.python.grpcio_tests.tests.fork import native_debug
49
50native_debug.install_failure_signal_handler()
51
52logger = logging.getLogger()
53console_handler = logging.StreamHandler()
54formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
55console_handler.setFormatter(formatter)
56logger.addHandler(console_handler)
57
58_SUPPORTED_METHODS = (
59    "UnaryCall",
60    "EmptyCall",
61)
62
63_METHOD_CAMEL_TO_CAPS_SNAKE = {
64    "UnaryCall": "UNARY_CALL",
65    "EmptyCall": "EMPTY_CALL",
66}
67
68_METHOD_STR_TO_ENUM = {
69    "UnaryCall": messages_pb2.ClientConfigureRequest.UNARY_CALL,
70    "EmptyCall": messages_pb2.ClientConfigureRequest.EMPTY_CALL,
71}
72
73_METHOD_ENUM_TO_STR = {v: k for k, v in _METHOD_STR_TO_ENUM.items()}
74
75_PROMETHEUS_PORT = 9464
76
77PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]]
78
79
80# FutureFromCall is both a grpc.Call and grpc.Future
81class FutureFromCallType(grpc.Call, grpc.Future):
82    pass
83
84
85_CONFIG_CHANGE_TIMEOUT = datetime.timedelta(milliseconds=500)
86
87
88class _StatsWatcher:
89    _start: int
90    _end: int
91    _rpcs_needed: int
92    _rpcs_by_peer: DefaultDict[str, int]
93    _rpcs_by_method: DefaultDict[str, DefaultDict[str, int]]
94    _no_remote_peer: int
95    _lock: threading.Lock
96    _condition: threading.Condition
97    _metadata_keys: FrozenSet[str]
98    _include_all_metadata: bool
99    _metadata_by_peer: DefaultDict[
100        str, messages_pb2.LoadBalancerStatsResponse.MetadataByPeer
101    ]
102
103    def __init__(self, start: int, end: int, metadata_keys: Iterable[str]):
104        self._start = start
105        self._end = end
106        self._rpcs_needed = end - start
107        self._rpcs_by_peer = collections.defaultdict(int)
108        self._rpcs_by_method = collections.defaultdict(
109            lambda: collections.defaultdict(int)
110        )
111        self._condition = threading.Condition()
112        self._no_remote_peer = 0
113        self._metadata_keys = frozenset(
114            self._sanitize_metadata_key(key) for key in metadata_keys
115        )
116        self._include_all_metadata = "*" in self._metadata_keys
117        self._metadata_by_peer = collections.defaultdict(
118            messages_pb2.LoadBalancerStatsResponse.MetadataByPeer
119        )
120
121    @classmethod
122    def _sanitize_metadata_key(cls, metadata_key: str) -> str:
123        return metadata_key.strip().lower()
124
125    def _add_metadata(
126        self,
127        rpc_metadata: messages_pb2.LoadBalancerStatsResponse.RpcMetadata,
128        metadata_to_add: grpc_typing.MetadataType,
129        metadata_type: messages_pb2.LoadBalancerStatsResponse.MetadataType,
130    ) -> None:
131        for key, value in metadata_to_add:
132            if (
133                self._include_all_metadata
134                or self._sanitize_metadata_key(key) in self._metadata_keys
135            ):
136                rpc_metadata.metadata.append(
137                    messages_pb2.LoadBalancerStatsResponse.MetadataEntry(
138                        key=key, value=value, type=metadata_type
139                    )
140                )
141
142    def on_rpc_complete(
143        self,
144        request_id: int,
145        peer: str,
146        method: str,
147        *,
148        initial_metadata: grpc_typing.MetadataType,
149        trailing_metadata: grpc_typing.MetadataType,
150    ) -> None:
151        """Records statistics for a single RPC."""
152        if self._start <= request_id < self._end:
153            with self._condition:
154                if not peer:
155                    self._no_remote_peer += 1
156                else:
157                    self._rpcs_by_peer[peer] += 1
158                    self._rpcs_by_method[method][peer] += 1
159                    if self._metadata_keys:
160                        rpc_metadata = (
161                            messages_pb2.LoadBalancerStatsResponse.RpcMetadata()
162                        )
163                        self._add_metadata(
164                            rpc_metadata,
165                            initial_metadata,
166                            messages_pb2.LoadBalancerStatsResponse.MetadataType.INITIAL,
167                        )
168                        self._add_metadata(
169                            rpc_metadata,
170                            trailing_metadata,
171                            messages_pb2.LoadBalancerStatsResponse.MetadataType.TRAILING,
172                        )
173                        self._metadata_by_peer[peer].rpc_metadata.append(
174                            rpc_metadata
175                        )
176                self._rpcs_needed -= 1
177                self._condition.notify()
178
179    def await_rpc_stats_response(
180        self, timeout_sec: int
181    ) -> messages_pb2.LoadBalancerStatsResponse:
182        """Blocks until a full response has been collected."""
183        with self._condition:
184            self._condition.wait_for(
185                lambda: not self._rpcs_needed, timeout=float(timeout_sec)
186            )
187            response = messages_pb2.LoadBalancerStatsResponse()
188            for peer, count in self._rpcs_by_peer.items():
189                response.rpcs_by_peer[peer] = count
190            for method, count_by_peer in self._rpcs_by_method.items():
191                for peer, count in count_by_peer.items():
192                    response.rpcs_by_method[method].rpcs_by_peer[peer] = count
193            for peer, metadata_by_peer in self._metadata_by_peer.items():
194                response.metadatas_by_peer[peer].CopyFrom(metadata_by_peer)
195            response.num_failures = self._no_remote_peer + self._rpcs_needed
196        return response
197
198
199_global_lock = threading.Lock()
200_stop_event = threading.Event()
201_global_rpc_id: int = 0
202_watchers: Set[_StatsWatcher] = set()
203_global_server = None
204_global_rpcs_started: Mapping[str, int] = collections.defaultdict(int)
205_global_rpcs_succeeded: Mapping[str, int] = collections.defaultdict(int)
206_global_rpcs_failed: Mapping[str, int] = collections.defaultdict(int)
207
208# Mapping[method, Mapping[status_code, count]]
209_global_rpc_statuses: Mapping[str, Mapping[int, int]] = collections.defaultdict(
210    lambda: collections.defaultdict(int)
211)
212
213
214def _handle_sigint(sig, frame) -> None:
215    logger.warning("Received SIGINT")
216    _stop_event.set()
217    _global_server.stop(None)
218
219
220class _LoadBalancerStatsServicer(
221    test_pb2_grpc.LoadBalancerStatsServiceServicer
222):
223    def __init__(self):
224        super(_LoadBalancerStatsServicer).__init__()
225
226    def GetClientStats(
227        self,
228        request: messages_pb2.LoadBalancerStatsRequest,
229        context: grpc.ServicerContext,
230    ) -> messages_pb2.LoadBalancerStatsResponse:
231        logger.info("Received stats request.")
232        start = None
233        end = None
234        watcher = None
235        with _global_lock:
236            start = _global_rpc_id + 1
237            end = start + request.num_rpcs
238            watcher = _StatsWatcher(start, end, request.metadata_keys)
239            _watchers.add(watcher)
240        response = watcher.await_rpc_stats_response(request.timeout_sec)
241        with _global_lock:
242            _watchers.remove(watcher)
243        logger.info("Returning stats response: %s", response)
244        return response
245
246    def GetClientAccumulatedStats(
247        self,
248        request: messages_pb2.LoadBalancerAccumulatedStatsRequest,
249        context: grpc.ServicerContext,
250    ) -> messages_pb2.LoadBalancerAccumulatedStatsResponse:
251        logger.info("Received cumulative stats request.")
252        response = messages_pb2.LoadBalancerAccumulatedStatsResponse()
253        with _global_lock:
254            for method in _SUPPORTED_METHODS:
255                caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method]
256                response.num_rpcs_started_by_method[
257                    caps_method
258                ] = _global_rpcs_started[method]
259                response.num_rpcs_succeeded_by_method[
260                    caps_method
261                ] = _global_rpcs_succeeded[method]
262                response.num_rpcs_failed_by_method[
263                    caps_method
264                ] = _global_rpcs_failed[method]
265                response.stats_per_method[
266                    caps_method
267                ].rpcs_started = _global_rpcs_started[method]
268                for code, count in _global_rpc_statuses[method].items():
269                    response.stats_per_method[caps_method].result[code] = count
270        logger.info("Returning cumulative stats response.")
271        return response
272
273
274def _start_rpc(
275    method: str,
276    metadata: Sequence[Tuple[str, str]],
277    request_id: int,
278    stub: test_pb2_grpc.TestServiceStub,
279    timeout: float,
280    futures: Mapping[int, Tuple[FutureFromCallType, str]],
281    request_payload_size: int,
282    response_payload_size: int,
283) -> None:
284    logger.debug(f"Sending {method} request to backend: {request_id}")
285    if method == "UnaryCall":
286        request = messages_pb2.SimpleRequest(
287            response_type=messages_pb2.COMPRESSABLE,
288            response_size=response_payload_size,
289            payload=messages_pb2.Payload(body=b"0" * request_payload_size),
290        )
291        future = stub.UnaryCall.future(
292            request, metadata=metadata, timeout=timeout
293        )
294    elif method == "EmptyCall":
295        if request_payload_size > 0:
296            logger.error(
297                f"request_payload_size should not be set for EMPTY_CALL"
298            )
299        if response_payload_size > 0:
300            logger.error(
301                f"response_payload_size should not be set for EMPTY_CALL"
302            )
303        future = stub.EmptyCall.future(
304            empty_pb2.Empty(), metadata=metadata, timeout=timeout
305        )
306    else:
307        raise ValueError(f"Unrecognized method '{method}'.")
308    futures[request_id] = (future, method)
309
310
311def _on_rpc_done(
312    rpc_id: int, future: FutureFromCallType, method: str, print_response: bool
313) -> None:
314    exception = future.exception()
315    hostname = ""
316    with _global_lock:
317        _global_rpc_statuses[method][future.code().value[0]] += 1
318    if exception is not None:
319        with _global_lock:
320            _global_rpcs_failed[method] += 1
321        if exception.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
322            logger.error(f"RPC {rpc_id} timed out")
323        else:
324            logger.error(exception)
325    else:
326        response = future.result()
327        hostname = None
328        for metadatum in future.initial_metadata():
329            if metadatum[0] == "hostname":
330                hostname = metadatum[1]
331                break
332        else:
333            hostname = response.hostname
334        if future.code() == grpc.StatusCode.OK:
335            with _global_lock:
336                _global_rpcs_succeeded[method] += 1
337        else:
338            with _global_lock:
339                _global_rpcs_failed[method] += 1
340        if print_response:
341            if future.code() == grpc.StatusCode.OK:
342                logger.debug("Successful response.")
343            else:
344                logger.debug(f"RPC failed: {rpc_id}")
345    with _global_lock:
346        for watcher in _watchers:
347            watcher.on_rpc_complete(
348                rpc_id,
349                hostname,
350                method,
351                initial_metadata=future.initial_metadata(),
352                trailing_metadata=future.trailing_metadata(),
353            )
354
355
356def _remove_completed_rpcs(
357    rpc_futures: Mapping[int, FutureFromCallType], print_response: bool
358) -> None:
359    logger.debug("Removing completed RPCs")
360    done = []
361    for future_id, (future, method) in rpc_futures.items():
362        if future.done():
363            _on_rpc_done(future_id, future, method, args.print_response)
364            done.append(future_id)
365    for rpc_id in done:
366        del rpc_futures[rpc_id]
367
368
369def _cancel_all_rpcs(futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
370    logger.info("Cancelling all remaining RPCs")
371    for future, _ in futures.values():
372        future.cancel()
373
374
375class _ChannelConfiguration:
376    """Configuration for a single client channel.
377
378    Instances of this class are meant to be dealt with as PODs. That is,
379    data member should be accessed directly. This class is not thread-safe.
380    When accessing any of its members, the lock member should be held.
381    """
382
383    def __init__(
384        self,
385        method: str,
386        metadata: Sequence[Tuple[str, str]],
387        qps: int,
388        server: str,
389        rpc_timeout_sec: int,
390        print_response: bool,
391        secure_mode: bool,
392        request_payload_size: int,
393        response_payload_size: int,
394    ):
395        # condition is signalled when a change is made to the config.
396        self.condition = threading.Condition()
397
398        self.method = method
399        self.metadata = metadata
400        self.qps = qps
401        self.server = server
402        self.rpc_timeout_sec = rpc_timeout_sec
403        self.print_response = print_response
404        self.secure_mode = secure_mode
405        self.response_payload_size = response_payload_size
406        self.request_payload_size = request_payload_size
407
408
409def _run_single_channel(config: _ChannelConfiguration) -> None:
410    global _global_rpc_id  # pylint: disable=global-statement
411    with config.condition:
412        server = config.server
413    channel = None
414    if config.secure_mode:
415        fallback_creds = grpc.experimental.insecure_channel_credentials()
416        channel_creds = grpc.xds_channel_credentials(fallback_creds)
417        channel = grpc.secure_channel(server, channel_creds)
418    else:
419        channel = grpc.insecure_channel(server)
420    with channel:
421        stub = test_pb2_grpc.TestServiceStub(channel)
422        futures: Dict[int, Tuple[FutureFromCallType, str]] = {}
423        while not _stop_event.is_set():
424            with config.condition:
425                if config.qps == 0:
426                    config.condition.wait(
427                        timeout=_CONFIG_CHANGE_TIMEOUT.total_seconds()
428                    )
429                    continue
430                else:
431                    duration_per_query = 1.0 / float(config.qps)
432                request_id = None
433                with _global_lock:
434                    request_id = _global_rpc_id
435                    _global_rpc_id += 1
436                    _global_rpcs_started[config.method] += 1
437                start = time.time()
438                end = start + duration_per_query
439                _start_rpc(
440                    config.method,
441                    config.metadata,
442                    request_id,
443                    stub,
444                    float(config.rpc_timeout_sec),
445                    futures,
446                    config.request_payload_size,
447                    config.response_payload_size,
448                )
449                print_response = config.print_response
450            _remove_completed_rpcs(futures, config.print_response)
451            logger.debug(f"Currently {len(futures)} in-flight RPCs")
452            now = time.time()
453            while now < end:
454                time.sleep(end - now)
455                now = time.time()
456        _cancel_all_rpcs(futures)
457
458
459class _XdsUpdateClientConfigureServicer(
460    test_pb2_grpc.XdsUpdateClientConfigureServiceServicer
461):
462    def __init__(
463        self, per_method_configs: Mapping[str, _ChannelConfiguration], qps: int
464    ):
465        super(_XdsUpdateClientConfigureServicer).__init__()
466        self._per_method_configs = per_method_configs
467        self._qps = qps
468
469    def Configure(
470        self,
471        request: messages_pb2.ClientConfigureRequest,
472        context: grpc.ServicerContext,
473    ) -> messages_pb2.ClientConfigureResponse:
474        logger.info("Received Configure RPC: %s", request)
475        method_strs = [_METHOD_ENUM_TO_STR[t] for t in request.types]
476        for method in _SUPPORTED_METHODS:
477            method_enum = _METHOD_STR_TO_ENUM[method]
478            channel_config = self._per_method_configs[method]
479            if method in method_strs:
480                qps = self._qps
481                metadata = (
482                    (md.key, md.value)
483                    for md in request.metadata
484                    if md.type == method_enum
485                )
486                # For backward compatibility, do not change timeout when we
487                # receive a default value timeout.
488                if request.timeout_sec == 0:
489                    timeout_sec = channel_config.rpc_timeout_sec
490                else:
491                    timeout_sec = request.timeout_sec
492            else:
493                qps = 0
494                metadata = ()
495                # Leave timeout unchanged for backward compatibility.
496                timeout_sec = channel_config.rpc_timeout_sec
497            with channel_config.condition:
498                channel_config.qps = qps
499                channel_config.metadata = list(metadata)
500                channel_config.rpc_timeout_sec = timeout_sec
501                channel_config.condition.notify_all()
502        return messages_pb2.ClientConfigureResponse()
503
504
505class _MethodHandle:
506    """An object grouping together threads driving RPCs for a method."""
507
508    _channel_threads: List[threading.Thread]
509
510    def __init__(
511        self, num_channels: int, channel_config: _ChannelConfiguration
512    ):
513        """Creates and starts a group of threads running the indicated method."""
514        self._channel_threads = []
515        for i in range(num_channels):
516            thread = threading.Thread(
517                target=_run_single_channel, args=(channel_config,)
518            )
519            thread.start()
520            self._channel_threads.append(thread)
521
522    def stop(self) -> None:
523        """Joins all threads referenced by the handle."""
524        for channel_thread in self._channel_threads:
525            channel_thread.join()
526
527
528def _run(
529    args: argparse.Namespace,
530    methods: Sequence[str],
531    per_method_metadata: PerMethodMetadataType,
532) -> None:
533    logger.info("Starting python xDS Interop Client.")
534    csm_plugin = None
535    if args.enable_csm_observability:
536        csm_plugin = _prepare_csm_observability_plugin()
537        csm_plugin.register_global()
538    global _global_server  # pylint: disable=global-statement
539    method_handles = []
540    channel_configs = {}
541    for method in _SUPPORTED_METHODS:
542        if method in methods:
543            qps = args.qps
544        else:
545            qps = 0
546        channel_config = _ChannelConfiguration(
547            method,
548            per_method_metadata.get(method, []),
549            qps,
550            args.server,
551            args.rpc_timeout_sec,
552            args.print_response,
553            args.secure_mode,
554            args.request_payload_size,
555            args.response_payload_size,
556        )
557        channel_configs[method] = channel_config
558        method_handles.append(_MethodHandle(args.num_channels, channel_config))
559    _global_server = grpc.server(concurrent.futures.ThreadPoolExecutor())
560    _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}")
561    test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server(
562        _LoadBalancerStatsServicer(), _global_server
563    )
564    test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server(
565        _XdsUpdateClientConfigureServicer(channel_configs, args.qps),
566        _global_server,
567    )
568    grpc_admin.add_admin_servicers(_global_server)
569    _global_server.start()
570    _global_server.wait_for_termination()
571    for method_handle in method_handles:
572        method_handle.stop()
573    if csm_plugin:
574        csm_plugin.deregister_global()
575
576
577def parse_metadata_arg(metadata_arg: str) -> PerMethodMetadataType:
578    metadata = metadata_arg.split(",") if args.metadata else []
579    per_method_metadata = collections.defaultdict(list)
580    for metadatum in metadata:
581        elems = metadatum.split(":")
582        if len(elems) != 3:
583            raise ValueError(
584                f"'{metadatum}' was not in the form 'METHOD:KEY:VALUE'"
585            )
586        if elems[0] not in _SUPPORTED_METHODS:
587            raise ValueError(f"Unrecognized method '{elems[0]}'")
588        per_method_metadata[elems[0]].append((elems[1], elems[2]))
589    return per_method_metadata
590
591
592def parse_rpc_arg(rpc_arg: str) -> Sequence[str]:
593    methods = rpc_arg.split(",")
594    if set(methods) - set(_SUPPORTED_METHODS):
595        raise ValueError(
596            "--rpc supported methods: {}".format(", ".join(_SUPPORTED_METHODS))
597        )
598    return methods
599
600
601def bool_arg(arg: str) -> bool:
602    if arg.lower() in ("true", "yes", "y"):
603        return True
604    elif arg.lower() in ("false", "no", "n"):
605        return False
606    else:
607        raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
608
609
610def _prepare_csm_observability_plugin() -> CsmOpenTelemetryPlugin:
611    # Start Prometheus client
612    start_http_server(port=_PROMETHEUS_PORT, addr="0.0.0.0")
613    reader = PrometheusMetricReader()
614    meter_provider = MeterProvider(metric_readers=[reader])
615    csm_plugin = CsmOpenTelemetryPlugin(
616        meter_provider=meter_provider,
617    )
618    return csm_plugin
619
620
621if __name__ == "__main__":
622    parser = argparse.ArgumentParser(
623        description="Run Python XDS interop client."
624    )
625    parser.add_argument(
626        "--num_channels",
627        default=1,
628        type=int,
629        help="The number of channels from which to send requests.",
630    )
631    parser.add_argument(
632        "--print_response",
633        default="False",
634        type=bool_arg,
635        help="Write RPC response to STDOUT.",
636    )
637    parser.add_argument(
638        "--qps",
639        default=1,
640        type=int,
641        help="The number of queries to send from each channel per second.",
642    )
643    parser.add_argument(
644        "--rpc_timeout_sec",
645        default=30,
646        type=int,
647        help="The per-RPC timeout in seconds.",
648    )
649    parser.add_argument(
650        "--server", default="localhost:50051", help="The address of the server."
651    )
652    parser.add_argument(
653        "--stats_port",
654        default=50052,
655        type=int,
656        help="The port on which to expose the peer distribution stats service.",
657    )
658    parser.add_argument(
659        "--secure_mode",
660        default="False",
661        type=bool_arg,
662        help="If specified, uses xDS credentials to connect to the server.",
663    )
664    parser.add_argument(
665        "--verbose",
666        help="verbose log output",
667        default=False,
668        action="store_true",
669    )
670    parser.add_argument(
671        "--log_file", default=None, type=str, help="A file to log to."
672    )
673    parser.add_argument(
674        "--enable_csm_observability",
675        help="Whether to enable CSM Observability",
676        default="False",
677        type=bool_arg,
678    )
679    parser.add_argument(
680        "--request_payload_size",
681        default=0,
682        type=int,
683        help="Set the SimpleRequest.payload.body to a string of repeated 0 (zero) ASCII characters of the given size in bytes.",
684    )
685    parser.add_argument(
686        "--response_payload_size",
687        default=0,
688        type=int,
689        help="Ask the server to respond with SimpleResponse.payload.body of the given length (may not be implemented on the server).",
690    )
691    rpc_help = "A comma-delimited list of RPC methods to run. Must be one of "
692    rpc_help += ", ".join(_SUPPORTED_METHODS)
693    rpc_help += "."
694    parser.add_argument("--rpc", default="UnaryCall", type=str, help=rpc_help)
695    metadata_help = (
696        "A comma-delimited list of 3-tuples of the form "
697        + "METHOD:KEY:VALUE, e.g. "
698        + "EmptyCall:key1:value1,UnaryCall:key2:value2,EmptyCall:k3:v3"
699    )
700    parser.add_argument("--metadata", default="", type=str, help=metadata_help)
701    args = parser.parse_args()
702    signal.signal(signal.SIGINT, _handle_sigint)
703    if args.verbose:
704        logger.setLevel(logging.DEBUG)
705    if args.log_file:
706        file_handler = logging.FileHandler(args.log_file, mode="a")
707        file_handler.setFormatter(formatter)
708        logger.addHandler(file_handler)
709    _run(args, parse_rpc_arg(args.rpc), parse_metadata_arg(args.metadata))
710