# Copyright 2020 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import collections
import concurrent.futures
import datetime
import logging
import signal
import threading
import time
from typing import (
    DefaultDict,
    Dict,
    FrozenSet,
    Iterable,
    List,
    Mapping,
    Sequence,
    Set,
    Tuple,
)

import grpc
from grpc import _typing as grpc_typing
import grpc_admin
from grpc_channelz.v1 import channelz
from grpc_csm_observability import CsmOpenTelemetryPlugin
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from prometheus_client import start_http_server

from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2
from src.proto.grpc.testing import test_pb2_grpc
from src.python.grpcio_tests.tests.fork import native_debug

native_debug.install_failure_signal_handler()

logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

_SUPPORTED_METHODS = (
    "UnaryCall",
    "EmptyCall",
)

_METHOD_CAMEL_TO_CAPS_SNAKE = {
    "UnaryCall": "UNARY_CALL",
    "EmptyCall": "EMPTY_CALL",
}

_METHOD_STR_TO_ENUM = {
    "UnaryCall": messages_pb2.ClientConfigureRequest.UNARY_CALL,
    "EmptyCall": messages_pb2.ClientConfigureRequest.EMPTY_CALL,
}

_METHOD_ENUM_TO_STR = {v: k for k, v in _METHOD_STR_TO_ENUM.items()}

_PROMETHEUS_PORT = 9464

PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]]


# FutureFromCall is both a grpc.Call and a grpc.Future.
class FutureFromCallType(grpc.Call, grpc.Future):
    pass


_CONFIG_CHANGE_TIMEOUT = datetime.timedelta(milliseconds=500)

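# A _StatsWatcher tracks the outcome of RPCs whose global ids fall in
# [start, end): per-peer and per-method counts, plus (when metadata keys are
# requested) the initial/trailing metadata observed for each peer.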
class _StatsWatcher:
    _start: int
    _end: int
    _rpcs_needed: int
    _rpcs_by_peer: DefaultDict[str, int]
    _rpcs_by_method: DefaultDict[str, DefaultDict[str, int]]
    _no_remote_peer: int
    _lock: threading.Lock
    _condition: threading.Condition
    _metadata_keys: FrozenSet[str]
    _include_all_metadata: bool
    _metadata_by_peer: DefaultDict[
        str, messages_pb2.LoadBalancerStatsResponse.MetadataByPeer
    ]

    def __init__(self, start: int, end: int, metadata_keys: Iterable[str]):
        self._start = start
        self._end = end
        self._rpcs_needed = end - start
        self._rpcs_by_peer = collections.defaultdict(int)
        self._rpcs_by_method = collections.defaultdict(
            lambda: collections.defaultdict(int)
        )
        self._condition = threading.Condition()
        self._no_remote_peer = 0
        self._metadata_keys = frozenset(
            self._sanitize_metadata_key(key) for key in metadata_keys
        )
        self._include_all_metadata = "*" in self._metadata_keys
        self._metadata_by_peer = collections.defaultdict(
            messages_pb2.LoadBalancerStatsResponse.MetadataByPeer
        )

    @classmethod
    def _sanitize_metadata_key(cls, metadata_key: str) -> str:
        return metadata_key.strip().lower()

    def _add_metadata(
        self,
        rpc_metadata: messages_pb2.LoadBalancerStatsResponse.RpcMetadata,
        metadata_to_add: grpc_typing.MetadataType,
        metadata_type: messages_pb2.LoadBalancerStatsResponse.MetadataType,
    ) -> None:
        for key, value in metadata_to_add:
            if (
                self._include_all_metadata
                or self._sanitize_metadata_key(key) in self._metadata_keys
            ):
                rpc_metadata.metadata.append(
                    messages_pb2.LoadBalancerStatsResponse.MetadataEntry(
                        key=key, value=value, type=metadata_type
                    )
                )

    def on_rpc_complete(
        self,
        request_id: int,
        peer: str,
        method: str,
        *,
        initial_metadata: grpc_typing.MetadataType,
        trailing_metadata: grpc_typing.MetadataType,
    ) -> None:
        """Records statistics for a single RPC."""
        if self._start <= request_id < self._end:
            with self._condition:
                if not peer:
                    self._no_remote_peer += 1
                else:
                    self._rpcs_by_peer[peer] += 1
                    self._rpcs_by_method[method][peer] += 1
                    if self._metadata_keys:
                        rpc_metadata = (
                            messages_pb2.LoadBalancerStatsResponse.RpcMetadata()
                        )
                        self._add_metadata(
                            rpc_metadata,
                            initial_metadata,
                            messages_pb2.LoadBalancerStatsResponse.MetadataType.INITIAL,
                        )
                        self._add_metadata(
                            rpc_metadata,
                            trailing_metadata,
                            messages_pb2.LoadBalancerStatsResponse.MetadataType.TRAILING,
                        )
                        self._metadata_by_peer[peer].rpc_metadata.append(
                            rpc_metadata
                        )
                self._rpcs_needed -= 1
                self._condition.notify()

    def await_rpc_stats_response(
        self, timeout_sec: int
    ) -> messages_pb2.LoadBalancerStatsResponse:
        """Blocks until a full response has been collected."""
        with self._condition:
            self._condition.wait_for(
                lambda: not self._rpcs_needed, timeout=float(timeout_sec)
            )
            response = messages_pb2.LoadBalancerStatsResponse()
            for peer, count in self._rpcs_by_peer.items():
                response.rpcs_by_peer[peer] = count
            for method, count_by_peer in self._rpcs_by_method.items():
                for peer, count in count_by_peer.items():
                    response.rpcs_by_method[method].rpcs_by_peer[peer] = count
            for peer, metadata_by_peer in self._metadata_by_peer.items():
                response.metadatas_by_peer[peer].CopyFrom(metadata_by_peer)
            response.num_failures = self._no_remote_peer + self._rpcs_needed
        return response


_global_lock = threading.Lock()
_stop_event = threading.Event()
_global_rpc_id: int = 0
_watchers: Set[_StatsWatcher] = set()
_global_server = None
_global_rpcs_started: Mapping[str, int] = collections.defaultdict(int)
_global_rpcs_succeeded: Mapping[str, int] = collections.defaultdict(int)
_global_rpcs_failed: Mapping[str, int] = collections.defaultdict(int)

# Mapping[method, Mapping[status_code, count]]
_global_rpc_statuses: Mapping[str, Mapping[int, int]] = collections.defaultdict(
    lambda: collections.defaultdict(int)
)


def _handle_sigint(sig, frame) -> None:
    logger.warning("Received SIGINT")
    _stop_event.set()
    _global_server.stop(None)


class _LoadBalancerStatsServicer(
    test_pb2_grpc.LoadBalancerStatsServiceServicer
):
    def __init__(self):
        super().__init__()

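    # GetClientStats registers a fresh _StatsWatcher for the next
    # request.num_rpcs RPCs and blocks until they complete (or timeout_sec
    # elapses) before reporting the observed per-peer distribution.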
    def GetClientStats(
        self,
        request: messages_pb2.LoadBalancerStatsRequest,
        context: grpc.ServicerContext,
    ) -> messages_pb2.LoadBalancerStatsResponse:
        logger.info("Received stats request.")
        start = None
        end = None
        watcher = None
        with _global_lock:
            start = _global_rpc_id + 1
            end = start + request.num_rpcs
            watcher = _StatsWatcher(start, end, request.metadata_keys)
            _watchers.add(watcher)
        response = watcher.await_rpc_stats_response(request.timeout_sec)
        with _global_lock:
            _watchers.remove(watcher)
        logger.info("Returning stats response: %s", response)
        return response

    def GetClientAccumulatedStats(
        self,
        request: messages_pb2.LoadBalancerAccumulatedStatsRequest,
        context: grpc.ServicerContext,
    ) -> messages_pb2.LoadBalancerAccumulatedStatsResponse:
        logger.info("Received cumulative stats request.")
        response = messages_pb2.LoadBalancerAccumulatedStatsResponse()
        with _global_lock:
            for method in _SUPPORTED_METHODS:
                caps_method = _METHOD_CAMEL_TO_CAPS_SNAKE[method]
                response.num_rpcs_started_by_method[
                    caps_method
                ] = _global_rpcs_started[method]
                response.num_rpcs_succeeded_by_method[
                    caps_method
                ] = _global_rpcs_succeeded[method]
                response.num_rpcs_failed_by_method[
                    caps_method
                ] = _global_rpcs_failed[method]
                response.stats_per_method[
                    caps_method
                ].rpcs_started = _global_rpcs_started[method]
                for code, count in _global_rpc_statuses[method].items():
                    response.stats_per_method[caps_method].result[code] = count
        logger.info("Returning cumulative stats response.")
        return response


def _start_rpc(
    method: str,
    metadata: Sequence[Tuple[str, str]],
    request_id: int,
    stub: test_pb2_grpc.TestServiceStub,
    timeout: float,
    futures: Mapping[int, Tuple[FutureFromCallType, str]],
    request_payload_size: int,
    response_payload_size: int,
) -> None:
    logger.debug(f"Sending {method} request to backend: {request_id}")
    if method == "UnaryCall":
        request = messages_pb2.SimpleRequest(
            response_type=messages_pb2.COMPRESSABLE,
            response_size=response_payload_size,
            payload=messages_pb2.Payload(body=b"0" * request_payload_size),
        )
        future = stub.UnaryCall.future(
            request, metadata=metadata, timeout=timeout
        )
    elif method == "EmptyCall":
        if request_payload_size > 0:
            logger.error(
                "request_payload_size should not be set for EMPTY_CALL"
            )
        if response_payload_size > 0:
            logger.error(
                "response_payload_size should not be set for EMPTY_CALL"
            )
        future = stub.EmptyCall.future(
            empty_pb2.Empty(), metadata=metadata, timeout=timeout
        )
    else:
        raise ValueError(f"Unrecognized method '{method}'.")
    futures[request_id] = (future, method)

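# _on_rpc_done tallies per-method and per-status counters for a finished RPC
# and forwards its peer hostname and metadata to every active _StatsWatcher.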
def _on_rpc_done(
    rpc_id: int, future: FutureFromCallType, method: str, print_response: bool
) -> None:
    exception = future.exception()
    hostname = ""
    with _global_lock:
        _global_rpc_statuses[method][future.code().value[0]] += 1
    if exception is not None:
        with _global_lock:
            _global_rpcs_failed[method] += 1
        if exception.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            logger.error(f"RPC {rpc_id} timed out")
        else:
            logger.error(exception)
    else:
        response = future.result()
        hostname = None
        for metadatum in future.initial_metadata():
            if metadatum[0] == "hostname":
                hostname = metadatum[1]
                break
        else:
            hostname = response.hostname
        if future.code() == grpc.StatusCode.OK:
            with _global_lock:
                _global_rpcs_succeeded[method] += 1
        else:
            with _global_lock:
                _global_rpcs_failed[method] += 1
        if print_response:
            if future.code() == grpc.StatusCode.OK:
                logger.debug("Successful response.")
            else:
                logger.debug(f"RPC failed: {rpc_id}")
    with _global_lock:
        for watcher in _watchers:
            watcher.on_rpc_complete(
                rpc_id,
                hostname,
                method,
                initial_metadata=future.initial_metadata(),
                trailing_metadata=future.trailing_metadata(),
            )


def _remove_completed_rpcs(
    rpc_futures: Dict[int, Tuple[FutureFromCallType, str]],
    print_response: bool,
) -> None:
    logger.debug("Removing completed RPCs")
    done = []
    for future_id, (future, method) in rpc_futures.items():
        if future.done():
            _on_rpc_done(future_id, future, method, print_response)
            done.append(future_id)
    for rpc_id in done:
        del rpc_futures[rpc_id]


def _cancel_all_rpcs(futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
    logger.info("Cancelling all remaining RPCs")
    for future, _ in futures.values():
        future.cancel()


class _ChannelConfiguration:
    """Configuration for a single client channel.

    Instances of this class are meant to be dealt with as PODs. That is,
    data members should be accessed directly. This class is not thread-safe.
    When accessing any of its members, the condition member should be held.
    """

    def __init__(
        self,
        method: str,
        metadata: Sequence[Tuple[str, str]],
        qps: int,
        server: str,
        rpc_timeout_sec: int,
        print_response: bool,
        secure_mode: bool,
        request_payload_size: int,
        response_payload_size: int,
    ):
        # condition is signalled when a change is made to the config.
        self.condition = threading.Condition()

        self.method = method
        self.metadata = metadata
        self.qps = qps
        self.server = server
        self.rpc_timeout_sec = rpc_timeout_sec
        self.print_response = print_response
        self.secure_mode = secure_mode
        self.response_payload_size = response_payload_size
        self.request_payload_size = request_payload_size

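# Each channel thread runs this loop: it opens one channel to the target,
# paces RPCs at config.qps (sleeping out the remainder of each 1/qps window),
# and reaps completed futures until the global stop event is set.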
def _run_single_channel(config: _ChannelConfiguration) -> None:
    global _global_rpc_id  # pylint: disable=global-statement
    with config.condition:
        server = config.server
    channel = None
    if config.secure_mode:
        fallback_creds = grpc.experimental.insecure_channel_credentials()
        channel_creds = grpc.xds_channel_credentials(fallback_creds)
        channel = grpc.secure_channel(server, channel_creds)
    else:
        channel = grpc.insecure_channel(server)
    with channel:
        stub = test_pb2_grpc.TestServiceStub(channel)
        futures: Dict[int, Tuple[FutureFromCallType, str]] = {}
        while not _stop_event.is_set():
            with config.condition:
                if config.qps == 0:
                    config.condition.wait(
                        timeout=_CONFIG_CHANGE_TIMEOUT.total_seconds()
                    )
                    continue
                else:
                    duration_per_query = 1.0 / float(config.qps)
                request_id = None
                with _global_lock:
                    request_id = _global_rpc_id
                    _global_rpc_id += 1
                    _global_rpcs_started[config.method] += 1
                start = time.time()
                end = start + duration_per_query
                _start_rpc(
                    config.method,
                    config.metadata,
                    request_id,
                    stub,
                    float(config.rpc_timeout_sec),
                    futures,
                    config.request_payload_size,
                    config.response_payload_size,
                )
                print_response = config.print_response
            _remove_completed_rpcs(futures, print_response)
            logger.debug(f"Currently {len(futures)} in-flight RPCs")
            now = time.time()
            while now < end:
                time.sleep(end - now)
                now = time.time()
        _cancel_all_rpcs(futures)


class _XdsUpdateClientConfigureServicer(
    test_pb2_grpc.XdsUpdateClientConfigureServiceServicer
):
    def __init__(
        self, per_method_configs: Mapping[str, _ChannelConfiguration], qps: int
    ):
        super().__init__()
        self._per_method_configs = per_method_configs
        self._qps = qps

    def Configure(
        self,
        request: messages_pb2.ClientConfigureRequest,
        context: grpc.ServicerContext,
    ) -> messages_pb2.ClientConfigureResponse:
        logger.info("Received Configure RPC: %s", request)
        method_strs = [_METHOD_ENUM_TO_STR[t] for t in request.types]
        for method in _SUPPORTED_METHODS:
            method_enum = _METHOD_STR_TO_ENUM[method]
            channel_config = self._per_method_configs[method]
            if method in method_strs:
                qps = self._qps
                metadata = (
                    (md.key, md.value)
                    for md in request.metadata
                    if md.type == method_enum
                )
                # For backward compatibility, do not change the timeout when
                # we receive a default-valued timeout.
                if request.timeout_sec == 0:
                    timeout_sec = channel_config.rpc_timeout_sec
                else:
                    timeout_sec = request.timeout_sec
            else:
                qps = 0
                metadata = ()
                # Leave timeout unchanged for backward compatibility.
                timeout_sec = channel_config.rpc_timeout_sec
            with channel_config.condition:
                channel_config.qps = qps
                channel_config.metadata = list(metadata)
                channel_config.rpc_timeout_sec = timeout_sec
                channel_config.condition.notify_all()
        return messages_pb2.ClientConfigureResponse()


class _MethodHandle:
    """An object grouping together threads driving RPCs for a method."""

    _channel_threads: List[threading.Thread]

    def __init__(
        self, num_channels: int, channel_config: _ChannelConfiguration
    ):
        """Creates and starts a group of threads running the indicated method."""
        self._channel_threads = []
        for i in range(num_channels):
            thread = threading.Thread(
                target=_run_single_channel, args=(channel_config,)
            )
            thread.start()
            self._channel_threads.append(thread)

    def stop(self) -> None:
        """Joins all threads referenced by the handle."""
        for channel_thread in self._channel_threads:
            channel_thread.join()


def _run(
    args: argparse.Namespace,
    methods: Sequence[str],
    per_method_metadata: PerMethodMetadataType,
) -> None:
    logger.info("Starting python xDS Interop Client.")
    csm_plugin = None
    if args.enable_csm_observability:
        csm_plugin = _prepare_csm_observability_plugin()
        csm_plugin.register_global()
    global _global_server  # pylint: disable=global-statement
    method_handles = []
    channel_configs = {}
    for method in _SUPPORTED_METHODS:
        if method in methods:
            qps = args.qps
        else:
            qps = 0
        channel_config = _ChannelConfiguration(
            method,
            per_method_metadata.get(method, []),
            qps,
            args.server,
            args.rpc_timeout_sec,
            args.print_response,
            args.secure_mode,
            args.request_payload_size,
            args.response_payload_size,
        )
        channel_configs[method] = channel_config
        method_handles.append(_MethodHandle(args.num_channels, channel_config))
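    # Expose the LoadBalancerStats and XdsUpdateClientConfigure test services,
    # plus the gRPC admin services, on the stats port.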
    _global_server = grpc.server(concurrent.futures.ThreadPoolExecutor())
    _global_server.add_insecure_port(f"0.0.0.0:{args.stats_port}")
    test_pb2_grpc.add_LoadBalancerStatsServiceServicer_to_server(
        _LoadBalancerStatsServicer(), _global_server
    )
    test_pb2_grpc.add_XdsUpdateClientConfigureServiceServicer_to_server(
        _XdsUpdateClientConfigureServicer(channel_configs, args.qps),
        _global_server,
    )
    grpc_admin.add_admin_servicers(_global_server)
    _global_server.start()
    _global_server.wait_for_termination()
    for method_handle in method_handles:
        method_handle.stop()
    if csm_plugin:
        csm_plugin.deregister_global()


def parse_metadata_arg(metadata_arg: str) -> PerMethodMetadataType:
    metadata = metadata_arg.split(",") if metadata_arg else []
    per_method_metadata = collections.defaultdict(list)
    for metadatum in metadata:
        elems = metadatum.split(":")
        if len(elems) != 3:
            raise ValueError(
                f"'{metadatum}' was not in the form 'METHOD:KEY:VALUE'"
            )
        if elems[0] not in _SUPPORTED_METHODS:
            raise ValueError(f"Unrecognized method '{elems[0]}'")
        per_method_metadata[elems[0]].append((elems[1], elems[2]))
    return per_method_metadata


def parse_rpc_arg(rpc_arg: str) -> Sequence[str]:
    methods = rpc_arg.split(",")
    if set(methods) - set(_SUPPORTED_METHODS):
        raise ValueError(
            "--rpc supported methods: {}".format(", ".join(_SUPPORTED_METHODS))
        )
    return methods


def bool_arg(arg: str) -> bool:
    if arg.lower() in ("true", "yes", "y"):
        return True
    elif arg.lower() in ("false", "no", "n"):
        return False
    else:
        raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")


def _prepare_csm_observability_plugin() -> CsmOpenTelemetryPlugin:
    # Start the Prometheus client so exported metrics can be scraped.
    start_http_server(port=_PROMETHEUS_PORT, addr="0.0.0.0")
    reader = PrometheusMetricReader()
    meter_provider = MeterProvider(metric_readers=[reader])
    csm_plugin = CsmOpenTelemetryPlugin(
        meter_provider=meter_provider,
    )
    return csm_plugin


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run Python XDS interop client."
    )
    parser.add_argument(
        "--num_channels",
        default=1,
        type=int,
        help="The number of channels from which to send requests.",
    )
    parser.add_argument(
        "--print_response",
        default="False",
        type=bool_arg,
        help="Write RPC response to STDOUT.",
    )
    parser.add_argument(
        "--qps",
        default=1,
        type=int,
        help="The number of queries to send from each channel per second.",
    )
    parser.add_argument(
        "--rpc_timeout_sec",
        default=30,
        type=int,
        help="The per-RPC timeout in seconds.",
    )
    parser.add_argument(
        "--server", default="localhost:50051", help="The address of the server."
    )
    parser.add_argument(
        "--stats_port",
        default=50052,
        type=int,
        help="The port on which to expose the peer distribution stats service.",
    )
    parser.add_argument(
        "--secure_mode",
        default="False",
        type=bool_arg,
        help="If specified, uses xDS credentials to connect to the server.",
    )
    parser.add_argument(
        "--verbose",
        help="verbose log output",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--log_file", default=None, type=str, help="A file to log to."
    )
    parser.add_argument(
        "--enable_csm_observability",
        help="Whether to enable CSM Observability",
        default="False",
        type=bool_arg,
    )
    parser.add_argument(
        "--request_payload_size",
        default=0,
        type=int,
        help="Set the SimpleRequest.payload.body to a string of repeated 0 (zero) ASCII characters of the given size in bytes.",
    )
    parser.add_argument(
        "--response_payload_size",
        default=0,
        type=int,
        help="Ask the server to respond with SimpleResponse.payload.body of the given length (may not be implemented on the server).",
    )
    rpc_help = "A comma-delimited list of RPC methods to run. Must be one of "
    rpc_help += ", ".join(_SUPPORTED_METHODS)
    rpc_help += "."
    parser.add_argument("--rpc", default="UnaryCall", type=str, help=rpc_help)
    metadata_help = (
        "A comma-delimited list of 3-tuples of the form "
        + "METHOD:KEY:VALUE, e.g. "
        + "EmptyCall:key1:value1,UnaryCall:key2:value2,EmptyCall:k3:v3"
    )
    parser.add_argument("--metadata", default="", type=str, help=metadata_help)
    args = parser.parse_args()
    signal.signal(signal.SIGINT, _handle_sigint)
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    if args.log_file:
        file_handler = logging.FileHandler(args.log_file, mode="a")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    _run(args, parse_rpc_arg(args.rpc), parse_metadata_arg(args.metadata))
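
# Example invocation (illustrative values; assumes this file is saved as
# xds_interop_client.py and a test server is listening on localhost:50051):
#
#   python xds_interop_client.py \
#       --server=localhost:50051 \
#       --stats_port=50052 \
#       --qps=5 \
#       --rpc=UnaryCall,EmptyCall \
#       --metadata=EmptyCall:key1:value1,UnaryCall:key2:value2 \
#       --verbose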