1#!/usr/bin/env python 2# Copyright 2020 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Run xDS integration tests on GCP using Traffic Director.""" 16 17import argparse 18import datetime 19import googleapiclient.discovery 20import grpc 21import json 22import logging 23import os 24import random 25import shlex 26import socket 27import subprocess 28import sys 29import tempfile 30import time 31import uuid 32 33from oauth2client.client import GoogleCredentials 34from google.protobuf import json_format 35 36import python_utils.jobset as jobset 37import python_utils.report_utils as report_utils 38 39from src.proto.grpc.health.v1 import health_pb2 40from src.proto.grpc.health.v1 import health_pb2_grpc 41from src.proto.grpc.testing import empty_pb2 42from src.proto.grpc.testing import messages_pb2 43from src.proto.grpc.testing import test_pb2_grpc 44 45# Envoy protos provided by PyPI package xds-protos 46# Needs to import the generated Python file to load descriptors 47try: 48 from envoy.service.status.v3 import csds_pb2 49 from envoy.service.status.v3 import csds_pb2_grpc 50 from envoy.extensions.filters.network.http_connection_manager.v3 import http_connection_manager_pb2 51 from envoy.extensions.filters.common.fault.v3 import fault_pb2 52 from envoy.extensions.filters.http.fault.v3 import fault_pb2 53 from envoy.extensions.filters.http.router.v3 import router_pb2 54except ImportError: 55 # These protos are required by CSDS test. We should not fail the entire 56 # script for one test case. 57 pass 58 59logger = logging.getLogger() 60console_handler = logging.StreamHandler() 61formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') 62console_handler.setFormatter(formatter) 63logger.handlers = [] 64logger.addHandler(console_handler) 65logger.setLevel(logging.WARNING) 66 67# Suppress excessive logs for gRPC Python 68original_grpc_trace = os.environ.pop('GRPC_TRACE', None) 69original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None) 70# Suppress not-essential logs for GCP clients 71logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING) 72logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING) 73 74_TEST_CASES = [ 75 'backends_restart', 76 'change_backend_service', 77 'gentle_failover', 78 'load_report_based_failover', 79 'ping_pong', 80 'remove_instance_group', 81 'round_robin', 82 'secondary_locality_gets_no_requests_on_partial_primary_failure', 83 'secondary_locality_gets_requests_on_primary_failure', 84 'traffic_splitting', 85 'path_matching', 86 'header_matching', 87 'forwarding_rule_port_match', 88 'forwarding_rule_default_port', 89 'metadata_filter', 90] 91 92# Valid test cases, but not in all. So the tests can only run manually, and 93# aren't enabled automatically for all languages. 94# 95# TODO: Move them into _TEST_CASES when support is ready in all languages. 96_ADDITIONAL_TEST_CASES = [ 97 'circuit_breaking', 98 'timeout', 99 'fault_injection', 100 'csds', 101 'api_listener', # TODO(b/187352987) Relieve quota pressure 102] 103 104# Test cases that require the V3 API. Skipped in older runs. 105_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds']) 106 107# Test cases that require the alpha API. Skipped for stable API runs. 108_ALPHA_TEST_CASES = frozenset(['timeout']) 109 110 111def parse_test_cases(arg): 112 if arg == '': 113 return [] 114 arg_split = arg.split(',') 115 test_cases = set() 116 all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES 117 for arg in arg_split: 118 if arg == "all": 119 test_cases = test_cases.union(_TEST_CASES) 120 else: 121 test_cases = test_cases.union([arg]) 122 if not all([test_case in all_test_cases for test_case in test_cases]): 123 raise Exception('Failed to parse test cases %s' % arg) 124 # Perserve order. 125 return [x for x in all_test_cases if x in test_cases] 126 127 128def parse_port_range(port_arg): 129 try: 130 port = int(port_arg) 131 return range(port, port + 1) 132 except: 133 port_min, port_max = port_arg.split(':') 134 return range(int(port_min), int(port_max) + 1) 135 136 137argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP') 138# TODO(zdapeng): remove default value of project_id and project_num 139argp.add_argument('--project_id', default='grpc-testing', help='GCP project id') 140argp.add_argument('--project_num', 141 default='830293263384', 142 help='GCP project number') 143argp.add_argument( 144 '--gcp_suffix', 145 default='', 146 help='Optional suffix for all generated GCP resource names. Useful to ' 147 'ensure distinct names across test runs.') 148argp.add_argument( 149 '--test_case', 150 default='ping_pong', 151 type=parse_test_cases, 152 help='Comma-separated list of test cases to run. Available tests: %s, ' 153 '(or \'all\' to run every test). ' 154 'Alternative tests not included in \'all\': %s' % 155 (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES))) 156argp.add_argument( 157 '--bootstrap_file', 158 default='', 159 help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in ' 160 'bootstrap generation') 161argp.add_argument( 162 '--xds_v3_support', 163 default=False, 164 action='store_true', 165 help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. ' 166 'If a pre-created bootstrap file is provided via the --bootstrap_file ' 167 'parameter, it should include xds_v3 in its server_features field.') 168argp.add_argument( 169 '--client_cmd', 170 default=None, 171 help='Command to launch xDS test client. {server_uri}, {stats_port} and ' 172 '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP ' 173 'will be set for the command') 174argp.add_argument( 175 '--client_hosts', 176 default=None, 177 help='Comma-separated list of hosts running client processes. If set, ' 178 '--client_cmd is ignored and client processes are assumed to be running on ' 179 'the specified hosts.') 180argp.add_argument('--zone', default='us-central1-a') 181argp.add_argument('--secondary_zone', 182 default='us-west1-b', 183 help='Zone to use for secondary TD locality tests') 184argp.add_argument('--qps', default=100, type=int, help='Client QPS') 185argp.add_argument( 186 '--wait_for_backend_sec', 187 default=1200, 188 type=int, 189 help='Time limit for waiting for created backend services to report ' 190 'healthy when launching or updated GCP resources') 191argp.add_argument( 192 '--use_existing_gcp_resources', 193 default=False, 194 action='store_true', 195 help= 196 'If set, find and use already created GCP resources instead of creating new' 197 ' ones.') 198argp.add_argument( 199 '--keep_gcp_resources', 200 default=False, 201 action='store_true', 202 help= 203 'Leave GCP VMs and configuration running after test. Default behavior is ' 204 'to delete when tests complete.') 205argp.add_argument( 206 '--compute_discovery_document', 207 default=None, 208 type=str, 209 help= 210 'If provided, uses this file instead of retrieving via the GCP discovery ' 211 'API') 212argp.add_argument( 213 '--alpha_compute_discovery_document', 214 default=None, 215 type=str, 216 help='If provided, uses this file instead of retrieving via the alpha GCP ' 217 'discovery API') 218argp.add_argument('--network', 219 default='global/networks/default', 220 help='GCP network to use') 221_DEFAULT_PORT_RANGE = '8080:8280' 222argp.add_argument('--service_port_range', 223 default=_DEFAULT_PORT_RANGE, 224 type=parse_port_range, 225 help='Listening port for created gRPC backends. Specified as ' 226 'either a single int or as a range in the format min:max, in ' 227 'which case an available port p will be chosen s.t. min <= p ' 228 '<= max') 229argp.add_argument( 230 '--stats_port', 231 default=8079, 232 type=int, 233 help='Local port for the client process to expose the LB stats service') 234argp.add_argument('--xds_server', 235 default='trafficdirector.googleapis.com:443', 236 help='xDS server') 237argp.add_argument('--source_image', 238 default='projects/debian-cloud/global/images/family/debian-9', 239 help='Source image for VMs created during the test') 240argp.add_argument('--path_to_server_binary', 241 default=None, 242 type=str, 243 help='If set, the server binary must already be pre-built on ' 244 'the specified source image') 245argp.add_argument('--machine_type', 246 default='e2-standard-2', 247 help='Machine type for VMs created during the test') 248argp.add_argument( 249 '--instance_group_size', 250 default=2, 251 type=int, 252 help='Number of VMs to create per instance group. Certain test cases (e.g., ' 253 'round_robin) may not give meaningful results if this is set to a value ' 254 'less than 2.') 255argp.add_argument('--verbose', 256 help='verbose log output', 257 default=False, 258 action='store_true') 259# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are 260# visible in all test environments. 261argp.add_argument('--log_client_output', 262 help='Log captured client output', 263 default=False, 264 action='store_true') 265# TODO(ericgribkoff) Remove this flag once all test environments are verified to 266# have access to the alpha compute APIs. 267argp.add_argument('--only_stable_gcp_apis', 268 help='Do not use alpha compute APIs. Some tests may be ' 269 'incompatible with this option (gRPC health checks are ' 270 'currently alpha and required for simulating server failure', 271 default=False, 272 action='store_true') 273args = argp.parse_args() 274 275if args.verbose: 276 logger.setLevel(logging.DEBUG) 277 278CLIENT_HOSTS = [] 279if args.client_hosts: 280 CLIENT_HOSTS = args.client_hosts.split(',') 281 282# Each of the config propagation in the control plane should finish within 600s. 283# Otherwise, it indicates a bug in the control plane. The config propagation 284# includes all kinds of traffic config update, like updating urlMap, creating 285# the resources for the first time, updating BackendService, and changing the 286# status of endpoints in BackendService. 287_WAIT_FOR_URL_MAP_PATCH_SEC = 600 288# In general, fetching load balancing stats only takes ~10s. However, slow 289# config update could lead to empty EDS or similar symptoms causing the 290# connection to hang for a long period of time. So, we want to extend the stats 291# wait time to be the same as urlMap patch time. 292_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC 293 294_DEFAULT_SERVICE_PORT = 80 295_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec 296_WAIT_FOR_OPERATION_SEC = 1200 297_INSTANCE_GROUP_SIZE = args.instance_group_size 298_NUM_TEST_RPCS = 10 * args.qps 299_CONNECTION_TIMEOUT_SEC = 60 300_GCP_API_RETRIES = 5 301_BOOTSTRAP_TEMPLATE = """ 302{{ 303 "node": {{ 304 "id": "{node_id}", 305 "metadata": {{ 306 "TRAFFICDIRECTOR_NETWORK_NAME": "%s", 307 "com.googleapis.trafficdirector.config_time_trace": "TRUE" 308 }}, 309 "locality": {{ 310 "zone": "%s" 311 }} 312 }}, 313 "xds_servers": [{{ 314 "server_uri": "%s", 315 "channel_creds": [ 316 {{ 317 "type": "google_default", 318 "config": {{}} 319 }} 320 ], 321 "server_features": {server_features} 322 }}] 323}}""" % (args.network.split('/')[-1], args.zone, args.xds_server) 324 325# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer 326# sends an update with no localities when adding the MIG to the backend service 327# can race with the URL map patch. 328_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin'] 329# Tests that run UnaryCall and EmptyCall. 330_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching'] 331# Tests that make UnaryCall with test metadata. 332_TESTS_TO_SEND_METADATA = ['header_matching'] 333_TEST_METADATA_KEY = 'xds_md' 334_TEST_METADATA_VALUE_UNARY = 'unary_yranu' 335_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme' 336# Extra RPC metadata whose value is a number, sent with UnaryCall only. 337_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric' 338_TEST_METADATA_NUMERIC_VALUE = '159' 339_PATH_MATCHER_NAME = 'path-matcher' 340_BASE_TEMPLATE_NAME = 'test-template' 341_BASE_INSTANCE_GROUP_NAME = 'test-ig' 342_BASE_HEALTH_CHECK_NAME = 'test-hc' 343_BASE_FIREWALL_RULE_NAME = 'test-fw-rule' 344_BASE_BACKEND_SERVICE_NAME = 'test-backend-service' 345_BASE_URL_MAP_NAME = 'test-map' 346_BASE_SERVICE_HOST = 'grpc-test' 347_BASE_TARGET_PROXY_NAME = 'test-target-proxy' 348_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule' 349_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 350 '../../reports') 351_SPONGE_LOG_NAME = 'sponge_log.log' 352_SPONGE_XML_NAME = 'sponge_log.xml' 353 354 355def get_client_stats(num_rpcs, timeout_sec): 356 if CLIENT_HOSTS: 357 hosts = CLIENT_HOSTS 358 else: 359 hosts = ['localhost'] 360 for host in hosts: 361 with grpc.insecure_channel('%s:%d' % 362 (host, args.stats_port)) as channel: 363 stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) 364 request = messages_pb2.LoadBalancerStatsRequest() 365 request.num_rpcs = num_rpcs 366 request.timeout_sec = timeout_sec 367 rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC 368 logger.debug('Invoking GetClientStats RPC to %s:%d:', host, 369 args.stats_port) 370 response = stub.GetClientStats(request, 371 wait_for_ready=True, 372 timeout=rpc_timeout) 373 logger.debug('Invoked GetClientStats RPC to %s: %s', host, 374 json_format.MessageToJson(response)) 375 return response 376 377 378def get_client_accumulated_stats(): 379 if CLIENT_HOSTS: 380 hosts = CLIENT_HOSTS 381 else: 382 hosts = ['localhost'] 383 for host in hosts: 384 with grpc.insecure_channel('%s:%d' % 385 (host, args.stats_port)) as channel: 386 stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) 387 request = messages_pb2.LoadBalancerAccumulatedStatsRequest() 388 logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', 389 host, args.stats_port) 390 response = stub.GetClientAccumulatedStats( 391 request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC) 392 logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', 393 host, response) 394 return response 395 396 397def get_client_xds_config_dump(): 398 if CLIENT_HOSTS: 399 hosts = CLIENT_HOSTS 400 else: 401 hosts = ['localhost'] 402 for host in hosts: 403 server_address = '%s:%d' % (host, args.stats_port) 404 with grpc.insecure_channel(server_address) as channel: 405 stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel) 406 logger.debug('Fetching xDS config dump from %s', server_address) 407 response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(), 408 wait_for_ready=True, 409 timeout=_CONNECTION_TIMEOUT_SEC) 410 logger.debug('Fetched xDS config dump from %s', server_address) 411 if len(response.config) != 1: 412 logger.error('Unexpected number of ClientConfigs %d: %s', 413 len(response.config), response) 414 return None 415 else: 416 # Converting the ClientStatusResponse into JSON, because many 417 # fields are packed in google.protobuf.Any. It will require many 418 # duplicated code to unpack proto message and inspect values. 419 return json_format.MessageToDict( 420 response.config[0], preserving_proto_field_name=True) 421 422 423def configure_client(rpc_types, metadata=[], timeout_sec=None): 424 if CLIENT_HOSTS: 425 hosts = CLIENT_HOSTS 426 else: 427 hosts = ['localhost'] 428 for host in hosts: 429 with grpc.insecure_channel('%s:%d' % 430 (host, args.stats_port)) as channel: 431 stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel) 432 request = messages_pb2.ClientConfigureRequest() 433 request.types.extend(rpc_types) 434 for rpc_type, md_key, md_value in metadata: 435 md = request.metadata.add() 436 md.type = rpc_type 437 md.key = md_key 438 md.value = md_value 439 if timeout_sec: 440 request.timeout_sec = timeout_sec 441 logger.debug( 442 'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', 443 host, args.stats_port, request) 444 stub.Configure(request, 445 wait_for_ready=True, 446 timeout=_CONNECTION_TIMEOUT_SEC) 447 logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', 448 host) 449 450 451class RpcDistributionError(Exception): 452 pass 453 454 455def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, 456 allow_failures): 457 start_time = time.time() 458 error_msg = None 459 logger.debug('Waiting for %d sec until backends %s receive load' % 460 (timeout_sec, backends)) 461 while time.time() - start_time <= timeout_sec: 462 error_msg = None 463 stats = get_client_stats(num_rpcs, timeout_sec) 464 rpcs_by_peer = stats.rpcs_by_peer 465 for backend in backends: 466 if backend not in rpcs_by_peer: 467 error_msg = 'Backend %s did not receive load' % backend 468 break 469 if not error_msg and len(rpcs_by_peer) > len(backends): 470 error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer 471 if not allow_failures and stats.num_failures > 0: 472 error_msg = '%d RPCs failed' % stats.num_failures 473 if not error_msg: 474 return 475 raise RpcDistributionError(error_msg) 476 477 478def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, 479 timeout_sec, 480 num_rpcs=_NUM_TEST_RPCS): 481 _verify_rpcs_to_given_backends(backends, 482 timeout_sec, 483 num_rpcs, 484 allow_failures=True) 485 486 487def wait_until_all_rpcs_go_to_given_backends(backends, 488 timeout_sec, 489 num_rpcs=_NUM_TEST_RPCS): 490 _verify_rpcs_to_given_backends(backends, 491 timeout_sec, 492 num_rpcs, 493 allow_failures=False) 494 495 496def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec): 497 start_time = time.time() 498 while time.time() - start_time <= timeout_sec: 499 stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec) 500 error_msg = None 501 rpcs_by_peer = stats.rpcs_by_peer 502 for backend in backends: 503 if backend in rpcs_by_peer: 504 error_msg = 'Unexpected backend %s receives load' % backend 505 break 506 if not error_msg: 507 return 508 raise Exception('Unexpected RPCs going to given backends') 509 510 511def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): 512 '''Block until the test client reaches the state with the given number 513 of RPCs being outstanding stably. 514 515 Args: 516 rpc_type: A string indicating the RPC method to check for. Either 517 'UnaryCall' or 'EmptyCall'. 518 timeout_sec: Maximum number of seconds to wait until the desired state 519 is reached. 520 num_rpcs: Expected number of RPCs to be in-flight. 521 threshold: Number within [0,100], the tolerable percentage by which 522 the actual number of RPCs in-flight can differ from the expected number. 523 ''' 524 if threshold < 0 or threshold > 100: 525 raise ValueError('Value error: Threshold should be between 0 to 100') 526 threshold_fraction = threshold / 100.0 527 start_time = time.time() 528 error_msg = None 529 logger.debug( 530 'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' % 531 (timeout_sec, num_rpcs, rpc_type, threshold)) 532 while time.time() - start_time <= timeout_sec: 533 error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, 534 threshold_fraction) 535 if error_msg: 536 logger.debug('Progress: %s', error_msg) 537 time.sleep(2) 538 else: 539 break 540 # Ensure the number of outstanding RPCs is stable. 541 if not error_msg: 542 time.sleep(5) 543 error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, 544 threshold_fraction) 545 if error_msg: 546 raise Exception("Wrong number of %s RPCs in-flight: %s" % 547 (rpc_type, error_msg)) 548 549 550def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): 551 error_msg = None 552 stats = get_client_accumulated_stats() 553 rpcs_started = stats.num_rpcs_started_by_method[rpc_type] 554 rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type] 555 rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type] 556 rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed 557 if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)): 558 error_msg = ('actual(%d) < expected(%d - %d%%)' % 559 (rpcs_in_flight, num_rpcs, threshold)) 560 elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)): 561 error_msg = ('actual(%d) > expected(%d + %d%%)' % 562 (rpcs_in_flight, num_rpcs, threshold)) 563 return error_msg 564 565 566def compare_distributions(actual_distribution, expected_distribution, 567 threshold): 568 """Compare if two distributions are similar. 569 570 Args: 571 actual_distribution: A list of floats, contains the actual distribution. 572 expected_distribution: A list of floats, contains the expected distribution. 573 threshold: Number within [0,100], the threshold percentage by which the 574 actual distribution can differ from the expected distribution. 575 576 Returns: 577 The similarity between the distributions as a boolean. Returns true if the 578 actual distribution lies within the threshold of the expected 579 distribution, false otherwise. 580 581 Raises: 582 ValueError: if threshold is not with in [0,100]. 583 Exception: containing detailed error messages. 584 """ 585 if len(expected_distribution) != len(actual_distribution): 586 raise Exception( 587 'Error: expected and actual distributions have different size (%d vs %d)' 588 % (len(expected_distribution), len(actual_distribution))) 589 if threshold < 0 or threshold > 100: 590 raise ValueError('Value error: Threshold should be between 0 to 100') 591 threshold_fraction = threshold / 100.0 592 for expected, actual in zip(expected_distribution, actual_distribution): 593 if actual < (expected * (1 - threshold_fraction)): 594 raise Exception("actual(%f) < expected(%f-%d%%)" % 595 (actual, expected, threshold)) 596 if actual > (expected * (1 + threshold_fraction)): 597 raise Exception("actual(%f) > expected(%f+%d%%)" % 598 (actual, expected, threshold)) 599 return True 600 601 602def compare_expected_instances(stats, expected_instances): 603 """Compare if stats have expected instances for each type of RPC. 604 605 Args: 606 stats: LoadBalancerStatsResponse reported by interop client. 607 expected_instances: a dict with key as the RPC type (string), value as 608 the expected backend instances (list of strings). 609 610 Returns: 611 Returns true if the instances are expected. False if not. 612 """ 613 for rpc_type, expected_peers in expected_instances.items(): 614 rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type] 615 rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None 616 logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer) 617 peers = list(rpcs_by_peer.keys()) 618 if set(peers) != set(expected_peers): 619 logger.info('unexpected peers for %s, got %s, want %s', rpc_type, 620 peers, expected_peers) 621 return False 622 return True 623 624 625def test_backends_restart(gcp, backend_service, instance_group): 626 logger.info('Running test_backends_restart') 627 instance_names = get_instance_names(gcp, instance_group) 628 num_instances = len(instance_names) 629 start_time = time.time() 630 wait_until_all_rpcs_go_to_given_backends(instance_names, 631 _WAIT_FOR_STATS_SEC) 632 try: 633 resize_instance_group(gcp, instance_group, 0) 634 wait_until_all_rpcs_go_to_given_backends_or_fail([], 635 _WAIT_FOR_BACKEND_SEC) 636 finally: 637 resize_instance_group(gcp, instance_group, num_instances) 638 wait_for_healthy_backends(gcp, backend_service, instance_group) 639 new_instance_names = get_instance_names(gcp, instance_group) 640 wait_until_all_rpcs_go_to_given_backends(new_instance_names, 641 _WAIT_FOR_BACKEND_SEC) 642 643 644def test_change_backend_service(gcp, original_backend_service, instance_group, 645 alternate_backend_service, 646 same_zone_instance_group): 647 logger.info('Running test_change_backend_service') 648 original_backend_instances = get_instance_names(gcp, instance_group) 649 alternate_backend_instances = get_instance_names(gcp, 650 same_zone_instance_group) 651 patch_backend_service(gcp, alternate_backend_service, 652 [same_zone_instance_group]) 653 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 654 wait_for_healthy_backends(gcp, alternate_backend_service, 655 same_zone_instance_group) 656 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 657 _WAIT_FOR_STATS_SEC) 658 try: 659 patch_url_map_backend_service(gcp, alternate_backend_service) 660 wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, 661 _WAIT_FOR_URL_MAP_PATCH_SEC) 662 finally: 663 patch_url_map_backend_service(gcp, original_backend_service) 664 patch_backend_service(gcp, alternate_backend_service, []) 665 666 667def test_gentle_failover(gcp, 668 backend_service, 669 primary_instance_group, 670 secondary_instance_group, 671 swapped_primary_and_secondary=False): 672 logger.info('Running test_gentle_failover') 673 num_primary_instances = len(get_instance_names(gcp, primary_instance_group)) 674 min_instances_for_gentle_failover = 3 # Need >50% failure to start failover 675 try: 676 if num_primary_instances < min_instances_for_gentle_failover: 677 resize_instance_group(gcp, primary_instance_group, 678 min_instances_for_gentle_failover) 679 patch_backend_service( 680 gcp, backend_service, 681 [primary_instance_group, secondary_instance_group]) 682 primary_instance_names = get_instance_names(gcp, primary_instance_group) 683 secondary_instance_names = get_instance_names(gcp, 684 secondary_instance_group) 685 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 686 wait_for_healthy_backends(gcp, backend_service, 687 secondary_instance_group) 688 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 689 _WAIT_FOR_STATS_SEC) 690 instances_to_stop = primary_instance_names[:-1] 691 remaining_instances = primary_instance_names[-1:] 692 try: 693 set_serving_status(instances_to_stop, 694 gcp.service_port, 695 serving=False) 696 wait_until_all_rpcs_go_to_given_backends( 697 remaining_instances + secondary_instance_names, 698 _WAIT_FOR_BACKEND_SEC) 699 finally: 700 set_serving_status(primary_instance_names, 701 gcp.service_port, 702 serving=True) 703 except RpcDistributionError as e: 704 if not swapped_primary_and_secondary and is_primary_instance_group( 705 gcp, secondary_instance_group): 706 # Swap expectation of primary and secondary instance groups. 707 test_gentle_failover(gcp, 708 backend_service, 709 secondary_instance_group, 710 primary_instance_group, 711 swapped_primary_and_secondary=True) 712 else: 713 raise e 714 finally: 715 patch_backend_service(gcp, backend_service, [primary_instance_group]) 716 resize_instance_group(gcp, primary_instance_group, 717 num_primary_instances) 718 instance_names = get_instance_names(gcp, primary_instance_group) 719 wait_until_all_rpcs_go_to_given_backends(instance_names, 720 _WAIT_FOR_BACKEND_SEC) 721 722 723def test_load_report_based_failover(gcp, backend_service, 724 primary_instance_group, 725 secondary_instance_group): 726 logger.info('Running test_load_report_based_failover') 727 try: 728 patch_backend_service( 729 gcp, backend_service, 730 [primary_instance_group, secondary_instance_group]) 731 primary_instance_names = get_instance_names(gcp, primary_instance_group) 732 secondary_instance_names = get_instance_names(gcp, 733 secondary_instance_group) 734 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 735 wait_for_healthy_backends(gcp, backend_service, 736 secondary_instance_group) 737 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 738 _WAIT_FOR_STATS_SEC) 739 # Set primary locality's balance mode to RATE, and RPS to 20% of the 740 # client's QPS. The secondary locality will be used. 741 max_rate = int(args.qps * 1 / 5) 742 logger.info('Patching backend service to RATE with %d max_rate', 743 max_rate) 744 patch_backend_service( 745 gcp, 746 backend_service, [primary_instance_group, secondary_instance_group], 747 balancing_mode='RATE', 748 max_rate=max_rate) 749 wait_until_all_rpcs_go_to_given_backends( 750 primary_instance_names + secondary_instance_names, 751 _WAIT_FOR_BACKEND_SEC) 752 753 # Set primary locality's balance mode to RATE, and RPS to 120% of the 754 # client's QPS. Only the primary locality will be used. 755 max_rate = int(args.qps * 6 / 5) 756 logger.info('Patching backend service to RATE with %d max_rate', 757 max_rate) 758 patch_backend_service( 759 gcp, 760 backend_service, [primary_instance_group, secondary_instance_group], 761 balancing_mode='RATE', 762 max_rate=max_rate) 763 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 764 _WAIT_FOR_BACKEND_SEC) 765 logger.info("success") 766 finally: 767 patch_backend_service(gcp, backend_service, [primary_instance_group]) 768 instance_names = get_instance_names(gcp, primary_instance_group) 769 wait_until_all_rpcs_go_to_given_backends(instance_names, 770 _WAIT_FOR_BACKEND_SEC) 771 772 773def test_ping_pong(gcp, backend_service, instance_group): 774 logger.info('Running test_ping_pong') 775 wait_for_healthy_backends(gcp, backend_service, instance_group) 776 instance_names = get_instance_names(gcp, instance_group) 777 wait_until_all_rpcs_go_to_given_backends(instance_names, 778 _WAIT_FOR_STATS_SEC) 779 780 781def test_remove_instance_group(gcp, backend_service, instance_group, 782 same_zone_instance_group): 783 logger.info('Running test_remove_instance_group') 784 try: 785 patch_backend_service(gcp, 786 backend_service, 787 [instance_group, same_zone_instance_group], 788 balancing_mode='RATE') 789 wait_for_healthy_backends(gcp, backend_service, instance_group) 790 wait_for_healthy_backends(gcp, backend_service, 791 same_zone_instance_group) 792 instance_names = get_instance_names(gcp, instance_group) 793 same_zone_instance_names = get_instance_names(gcp, 794 same_zone_instance_group) 795 try: 796 wait_until_all_rpcs_go_to_given_backends( 797 instance_names + same_zone_instance_names, 798 _WAIT_FOR_OPERATION_SEC) 799 remaining_instance_group = same_zone_instance_group 800 remaining_instance_names = same_zone_instance_names 801 except RpcDistributionError as e: 802 # If connected to TD in a different zone, we may route traffic to 803 # only one instance group. Determine which group that is to continue 804 # with the remainder of the test case. 805 try: 806 wait_until_all_rpcs_go_to_given_backends( 807 instance_names, _WAIT_FOR_STATS_SEC) 808 remaining_instance_group = same_zone_instance_group 809 remaining_instance_names = same_zone_instance_names 810 except RpcDistributionError as e: 811 wait_until_all_rpcs_go_to_given_backends( 812 same_zone_instance_names, _WAIT_FOR_STATS_SEC) 813 remaining_instance_group = instance_group 814 remaining_instance_names = instance_names 815 patch_backend_service(gcp, 816 backend_service, [remaining_instance_group], 817 balancing_mode='RATE') 818 wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, 819 _WAIT_FOR_BACKEND_SEC) 820 finally: 821 patch_backend_service(gcp, backend_service, [instance_group]) 822 wait_until_all_rpcs_go_to_given_backends(instance_names, 823 _WAIT_FOR_BACKEND_SEC) 824 825 826def test_round_robin(gcp, backend_service, instance_group): 827 logger.info('Running test_round_robin') 828 wait_for_healthy_backends(gcp, backend_service, instance_group) 829 instance_names = get_instance_names(gcp, instance_group) 830 threshold = 1 831 wait_until_all_rpcs_go_to_given_backends(instance_names, 832 _WAIT_FOR_STATS_SEC) 833 # TODO(ericgribkoff) Delayed config propagation from earlier tests 834 # may result in briefly receiving an empty EDS update, resulting in failed 835 # RPCs. Retry distribution validation if this occurs; long-term fix is 836 # creating new backend resources for each individual test case. 837 # Each attempt takes 10 seconds. Config propagation can take several 838 # minutes. 839 max_attempts = 40 840 for i in range(max_attempts): 841 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 842 requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer] 843 total_requests_received = sum(requests_received) 844 if total_requests_received != _NUM_TEST_RPCS: 845 logger.info('Unexpected RPC failures, retrying: %s', stats) 846 continue 847 expected_requests = total_requests_received / len(instance_names) 848 for instance in instance_names: 849 if abs(stats.rpcs_by_peer[instance] - 850 expected_requests) > threshold: 851 raise Exception( 852 'RPC peer distribution differs from expected by more than %d ' 853 'for instance %s (%s)' % (threshold, instance, stats)) 854 return 855 raise Exception('RPC failures persisted through %d retries' % max_attempts) 856 857 858def test_secondary_locality_gets_no_requests_on_partial_primary_failure( 859 gcp, 860 backend_service, 861 primary_instance_group, 862 secondary_instance_group, 863 swapped_primary_and_secondary=False): 864 logger.info( 865 'Running secondary_locality_gets_no_requests_on_partial_primary_failure' 866 ) 867 try: 868 patch_backend_service( 869 gcp, backend_service, 870 [primary_instance_group, secondary_instance_group]) 871 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 872 wait_for_healthy_backends(gcp, backend_service, 873 secondary_instance_group) 874 primary_instance_names = get_instance_names(gcp, primary_instance_group) 875 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 876 _WAIT_FOR_STATS_SEC) 877 instances_to_stop = primary_instance_names[:1] 878 remaining_instances = primary_instance_names[1:] 879 try: 880 set_serving_status(instances_to_stop, 881 gcp.service_port, 882 serving=False) 883 wait_until_all_rpcs_go_to_given_backends(remaining_instances, 884 _WAIT_FOR_BACKEND_SEC) 885 finally: 886 set_serving_status(primary_instance_names, 887 gcp.service_port, 888 serving=True) 889 except RpcDistributionError as e: 890 if not swapped_primary_and_secondary and is_primary_instance_group( 891 gcp, secondary_instance_group): 892 # Swap expectation of primary and secondary instance groups. 893 test_secondary_locality_gets_no_requests_on_partial_primary_failure( 894 gcp, 895 backend_service, 896 secondary_instance_group, 897 primary_instance_group, 898 swapped_primary_and_secondary=True) 899 else: 900 raise e 901 finally: 902 patch_backend_service(gcp, backend_service, [primary_instance_group]) 903 904 905def test_secondary_locality_gets_requests_on_primary_failure( 906 gcp, 907 backend_service, 908 primary_instance_group, 909 secondary_instance_group, 910 swapped_primary_and_secondary=False): 911 logger.info('Running secondary_locality_gets_requests_on_primary_failure') 912 try: 913 patch_backend_service( 914 gcp, backend_service, 915 [primary_instance_group, secondary_instance_group]) 916 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 917 wait_for_healthy_backends(gcp, backend_service, 918 secondary_instance_group) 919 primary_instance_names = get_instance_names(gcp, primary_instance_group) 920 secondary_instance_names = get_instance_names(gcp, 921 secondary_instance_group) 922 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 923 _WAIT_FOR_STATS_SEC) 924 try: 925 set_serving_status(primary_instance_names, 926 gcp.service_port, 927 serving=False) 928 wait_until_all_rpcs_go_to_given_backends(secondary_instance_names, 929 _WAIT_FOR_BACKEND_SEC) 930 finally: 931 set_serving_status(primary_instance_names, 932 gcp.service_port, 933 serving=True) 934 except RpcDistributionError as e: 935 if not swapped_primary_and_secondary and is_primary_instance_group( 936 gcp, secondary_instance_group): 937 # Swap expectation of primary and secondary instance groups. 938 test_secondary_locality_gets_requests_on_primary_failure( 939 gcp, 940 backend_service, 941 secondary_instance_group, 942 primary_instance_group, 943 swapped_primary_and_secondary=True) 944 else: 945 raise e 946 finally: 947 patch_backend_service(gcp, backend_service, [primary_instance_group]) 948 949 950def prepare_services_for_urlmap_tests(gcp, original_backend_service, 951 instance_group, alternate_backend_service, 952 same_zone_instance_group): 953 ''' 954 This function prepares the services to be ready for tests that modifies 955 urlmaps. 956 957 Returns: 958 Returns original and alternate backend names as lists of strings. 959 ''' 960 logger.info('waiting for original backends to become healthy') 961 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 962 963 patch_backend_service(gcp, alternate_backend_service, 964 [same_zone_instance_group]) 965 logger.info('waiting for alternate to become healthy') 966 wait_for_healthy_backends(gcp, alternate_backend_service, 967 same_zone_instance_group) 968 969 original_backend_instances = get_instance_names(gcp, instance_group) 970 logger.info('original backends instances: %s', original_backend_instances) 971 972 alternate_backend_instances = get_instance_names(gcp, 973 same_zone_instance_group) 974 logger.info('alternate backends instances: %s', alternate_backend_instances) 975 976 # Start with all traffic going to original_backend_service. 977 logger.info('waiting for traffic to all go to original backends') 978 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 979 _WAIT_FOR_STATS_SEC) 980 return original_backend_instances, alternate_backend_instances 981 982 983def test_metadata_filter(gcp, original_backend_service, instance_group, 984 alternate_backend_service, same_zone_instance_group): 985 logger.info("Running test_metadata_filter") 986 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 987 original_backend_instances = get_instance_names(gcp, instance_group) 988 alternate_backend_instances = get_instance_names(gcp, 989 same_zone_instance_group) 990 patch_backend_service(gcp, alternate_backend_service, 991 [same_zone_instance_group]) 992 wait_for_healthy_backends(gcp, alternate_backend_service, 993 same_zone_instance_group) 994 try: 995 with open(bootstrap_path) as f: 996 md = json.load(f)['node']['metadata'] 997 match_labels = [] 998 for k, v in md.items(): 999 match_labels.append({'name': k, 'value': v}) 1000 1001 not_match_labels = [{'name': 'fake', 'value': 'fail'}] 1002 test_route_rules = [ 1003 # test MATCH_ALL 1004 [ 1005 { 1006 'priority': 0, 1007 'matchRules': [{ 1008 'prefixMatch': 1009 '/', 1010 'metadataFilters': [{ 1011 'filterMatchCriteria': 'MATCH_ALL', 1012 'filterLabels': not_match_labels 1013 }] 1014 }], 1015 'service': original_backend_service.url 1016 }, 1017 { 1018 'priority': 1, 1019 'matchRules': [{ 1020 'prefixMatch': 1021 '/', 1022 'metadataFilters': [{ 1023 'filterMatchCriteria': 'MATCH_ALL', 1024 'filterLabels': match_labels 1025 }] 1026 }], 1027 'service': alternate_backend_service.url 1028 }, 1029 ], 1030 # test mixing MATCH_ALL and MATCH_ANY 1031 # test MATCH_ALL: super set labels won't match 1032 [ 1033 { 1034 'priority': 0, 1035 'matchRules': [{ 1036 'prefixMatch': 1037 '/', 1038 'metadataFilters': [{ 1039 'filterMatchCriteria': 'MATCH_ALL', 1040 'filterLabels': not_match_labels + match_labels 1041 }] 1042 }], 1043 'service': original_backend_service.url 1044 }, 1045 { 1046 'priority': 1, 1047 'matchRules': [{ 1048 'prefixMatch': 1049 '/', 1050 'metadataFilters': [{ 1051 'filterMatchCriteria': 'MATCH_ANY', 1052 'filterLabels': not_match_labels + match_labels 1053 }] 1054 }], 1055 'service': alternate_backend_service.url 1056 }, 1057 ], 1058 # test MATCH_ANY 1059 [ 1060 { 1061 'priority': 0, 1062 'matchRules': [{ 1063 'prefixMatch': 1064 '/', 1065 'metadataFilters': [{ 1066 'filterMatchCriteria': 'MATCH_ANY', 1067 'filterLabels': not_match_labels 1068 }] 1069 }], 1070 'service': original_backend_service.url 1071 }, 1072 { 1073 'priority': 1, 1074 'matchRules': [{ 1075 'prefixMatch': 1076 '/', 1077 'metadataFilters': [{ 1078 'filterMatchCriteria': 'MATCH_ANY', 1079 'filterLabels': not_match_labels + match_labels 1080 }] 1081 }], 1082 'service': alternate_backend_service.url 1083 }, 1084 ], 1085 # test match multiple route rules 1086 [ 1087 { 1088 'priority': 0, 1089 'matchRules': [{ 1090 'prefixMatch': 1091 '/', 1092 'metadataFilters': [{ 1093 'filterMatchCriteria': 'MATCH_ANY', 1094 'filterLabels': match_labels 1095 }] 1096 }], 1097 'service': alternate_backend_service.url 1098 }, 1099 { 1100 'priority': 1, 1101 'matchRules': [{ 1102 'prefixMatch': 1103 '/', 1104 'metadataFilters': [{ 1105 'filterMatchCriteria': 'MATCH_ALL', 1106 'filterLabels': match_labels 1107 }] 1108 }], 1109 'service': original_backend_service.url 1110 }, 1111 ] 1112 ] 1113 1114 for route_rules in test_route_rules: 1115 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 1116 _WAIT_FOR_STATS_SEC) 1117 patch_url_map_backend_service(gcp, 1118 original_backend_service, 1119 route_rules=route_rules) 1120 wait_until_no_rpcs_go_to_given_backends(original_backend_instances, 1121 _WAIT_FOR_STATS_SEC) 1122 wait_until_all_rpcs_go_to_given_backends( 1123 alternate_backend_instances, _WAIT_FOR_STATS_SEC) 1124 patch_url_map_backend_service(gcp, original_backend_service) 1125 finally: 1126 patch_backend_service(gcp, alternate_backend_service, []) 1127 1128 1129def test_api_listener(gcp, backend_service, instance_group, 1130 alternate_backend_service): 1131 logger.info("Running api_listener") 1132 try: 1133 wait_for_healthy_backends(gcp, backend_service, instance_group) 1134 backend_instances = get_instance_names(gcp, instance_group) 1135 wait_until_all_rpcs_go_to_given_backends(backend_instances, 1136 _WAIT_FOR_STATS_SEC) 1137 # create a second suite of map+tp+fr with the same host name in host rule 1138 # and we have to disable proxyless validation because it needs `0.0.0.0` 1139 # ip address in fr for proxyless and also we violate ip:port uniqueness 1140 # for test purpose. See https://github.com/grpc/grpc-java/issues/8009 1141 new_config_suffix = '2' 1142 create_url_map(gcp, url_map_name + new_config_suffix, backend_service, 1143 service_host_name) 1144 create_target_proxy(gcp, target_proxy_name + new_config_suffix, False) 1145 if not gcp.service_port: 1146 raise Exception( 1147 'Faied to find a valid port for the forwarding rule') 1148 potential_ip_addresses = [] 1149 max_attempts = 10 1150 for i in range(max_attempts): 1151 potential_ip_addresses.append('10.10.10.%d' % 1152 (random.randint(0, 255))) 1153 create_global_forwarding_rule(gcp, 1154 forwarding_rule_name + new_config_suffix, 1155 [gcp.service_port], 1156 potential_ip_addresses) 1157 if gcp.service_port != _DEFAULT_SERVICE_PORT: 1158 patch_url_map_host_rule_with_port(gcp, 1159 url_map_name + new_config_suffix, 1160 backend_service, 1161 service_host_name) 1162 wait_until_all_rpcs_go_to_given_backends(backend_instances, 1163 _WAIT_FOR_STATS_SEC) 1164 1165 delete_global_forwarding_rule(gcp, forwarding_rule_name) 1166 delete_target_proxy(gcp, target_proxy_name) 1167 delete_url_map(gcp, url_map_name) 1168 verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS * 1169 args.qps) 1170 for i in range(verify_attempts): 1171 wait_until_all_rpcs_go_to_given_backends(backend_instances, 1172 _WAIT_FOR_STATS_SEC) 1173 # delete host rule for the original host name 1174 patch_url_map_backend_service(gcp, alternate_backend_service) 1175 wait_until_no_rpcs_go_to_given_backends(backend_instances, 1176 _WAIT_FOR_STATS_SEC) 1177 1178 finally: 1179 delete_global_forwarding_rule(gcp, 1180 forwarding_rule_name + new_config_suffix) 1181 delete_target_proxy(gcp, target_proxy_name + new_config_suffix) 1182 delete_url_map(gcp, url_map_name + new_config_suffix) 1183 create_url_map(gcp, url_map_name, backend_service, service_host_name) 1184 create_target_proxy(gcp, target_proxy_name) 1185 create_global_forwarding_rule(gcp, forwarding_rule_name, 1186 potential_service_ports) 1187 if gcp.service_port != _DEFAULT_SERVICE_PORT: 1188 patch_url_map_host_rule_with_port(gcp, url_map_name, 1189 backend_service, 1190 service_host_name) 1191 server_uri = service_host_name + ':' + str(gcp.service_port) 1192 else: 1193 server_uri = service_host_name 1194 return server_uri 1195 1196 1197def test_forwarding_rule_port_match(gcp, backend_service, instance_group): 1198 logger.info("Running test_forwarding_rule_port_match") 1199 try: 1200 wait_for_healthy_backends(gcp, backend_service, instance_group) 1201 backend_instances = get_instance_names(gcp, instance_group) 1202 wait_until_all_rpcs_go_to_given_backends(backend_instances, 1203 _WAIT_FOR_STATS_SEC) 1204 delete_global_forwarding_rule(gcp) 1205 create_global_forwarding_rule(gcp, forwarding_rule_name, [ 1206 x for x in parse_port_range(_DEFAULT_PORT_RANGE) 1207 if x != gcp.service_port 1208 ]) 1209 wait_until_no_rpcs_go_to_given_backends(backend_instances, 1210 _WAIT_FOR_STATS_SEC) 1211 finally: 1212 delete_global_forwarding_rule(gcp) 1213 create_global_forwarding_rule(gcp, forwarding_rule_name, 1214 potential_service_ports) 1215 if gcp.service_port != _DEFAULT_SERVICE_PORT: 1216 patch_url_map_host_rule_with_port(gcp, url_map_name, 1217 backend_service, 1218 service_host_name) 1219 server_uri = service_host_name + ':' + str(gcp.service_port) 1220 else: 1221 server_uri = service_host_name 1222 return server_uri 1223 1224 1225def test_forwarding_rule_default_port(gcp, backend_service, instance_group): 1226 logger.info("Running test_forwarding_rule_default_port") 1227 try: 1228 wait_for_healthy_backends(gcp, backend_service, instance_group) 1229 backend_instances = get_instance_names(gcp, instance_group) 1230 if gcp.service_port == _DEFAULT_SERVICE_PORT: 1231 wait_until_all_rpcs_go_to_given_backends(backend_instances, 1232 _WAIT_FOR_STATS_SEC) 1233 delete_global_forwarding_rule(gcp) 1234 create_global_forwarding_rule(gcp, forwarding_rule_name, 1235 parse_port_range(_DEFAULT_PORT_RANGE)) 1236 patch_url_map_host_rule_with_port(gcp, url_map_name, 1237 backend_service, 1238 service_host_name) 1239 wait_until_no_rpcs_go_to_given_backends(backend_instances, 1240 _WAIT_FOR_STATS_SEC) 1241 # expect success when no port in client request service uri, and no port in url-map 1242 delete_global_forwarding_rule(gcp) 1243 delete_target_proxy(gcp) 1244 delete_url_map(gcp) 1245 create_url_map(gcp, url_map_name, backend_service, service_host_name) 1246 create_target_proxy(gcp, gcp.target_proxy.name, False) 1247 potential_ip_addresses = [] 1248 max_attempts = 10 1249 for i in range(max_attempts): 1250 potential_ip_addresses.append('10.10.10.%d' % 1251 (random.randint(0, 255))) 1252 create_global_forwarding_rule(gcp, forwarding_rule_name, [80], 1253 potential_ip_addresses) 1254 wait_until_all_rpcs_go_to_given_backends(backend_instances, 1255 _WAIT_FOR_STATS_SEC) 1256 1257 # expect failure when no port in client request uri, but specify port in url-map 1258 patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service, 1259 service_host_name) 1260 wait_until_no_rpcs_go_to_given_backends(backend_instances, 1261 _WAIT_FOR_STATS_SEC) 1262 finally: 1263 delete_global_forwarding_rule(gcp) 1264 delete_target_proxy(gcp) 1265 delete_url_map(gcp) 1266 create_url_map(gcp, url_map_name, backend_service, service_host_name) 1267 create_target_proxy(gcp, target_proxy_name) 1268 create_global_forwarding_rule(gcp, forwarding_rule_name, 1269 potential_service_ports) 1270 if gcp.service_port != _DEFAULT_SERVICE_PORT: 1271 patch_url_map_host_rule_with_port(gcp, url_map_name, 1272 backend_service, 1273 service_host_name) 1274 server_uri = service_host_name + ':' + str(gcp.service_port) 1275 else: 1276 server_uri = service_host_name 1277 return server_uri 1278 1279 1280def test_traffic_splitting(gcp, original_backend_service, instance_group, 1281 alternate_backend_service, same_zone_instance_group): 1282 # This test start with all traffic going to original_backend_service. Then 1283 # it updates URL-map to set default action to traffic splitting between 1284 # original and alternate. It waits for all backends in both services to 1285 # receive traffic, then verifies that weights are expected. 1286 logger.info('Running test_traffic_splitting') 1287 1288 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 1289 gcp, original_backend_service, instance_group, 1290 alternate_backend_service, same_zone_instance_group) 1291 1292 try: 1293 # Patch urlmap, change route action to traffic splitting between 1294 # original and alternate. 1295 logger.info('patching url map with traffic splitting') 1296 original_service_percentage, alternate_service_percentage = 20, 80 1297 patch_url_map_backend_service( 1298 gcp, 1299 services_with_weights={ 1300 original_backend_service: original_service_percentage, 1301 alternate_backend_service: alternate_service_percentage, 1302 }) 1303 # Split percentage between instances: [20,80] -> [10,10,40,40]. 1304 expected_instance_percentage = [ 1305 original_service_percentage * 1.0 / len(original_backend_instances) 1306 ] * len(original_backend_instances) + [ 1307 alternate_service_percentage * 1.0 / 1308 len(alternate_backend_instances) 1309 ] * len(alternate_backend_instances) 1310 1311 # Wait for traffic to go to both services. 1312 logger.info( 1313 'waiting for traffic to go to all backends (including alternate)') 1314 wait_until_all_rpcs_go_to_given_backends( 1315 original_backend_instances + alternate_backend_instances, 1316 _WAIT_FOR_STATS_SEC) 1317 1318 # Verify that weights between two services are expected. 1319 retry_count = 10 1320 # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 1321 # seconds timeout. 1322 for i in range(retry_count): 1323 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 1324 got_instance_count = [ 1325 stats.rpcs_by_peer[i] for i in original_backend_instances 1326 ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances] 1327 total_count = sum(got_instance_count) 1328 got_instance_percentage = [ 1329 x * 100.0 / total_count for x in got_instance_count 1330 ] 1331 1332 try: 1333 compare_distributions(got_instance_percentage, 1334 expected_instance_percentage, 5) 1335 except Exception as e: 1336 logger.info('attempt %d', i) 1337 logger.info('got percentage: %s', got_instance_percentage) 1338 logger.info('expected percentage: %s', 1339 expected_instance_percentage) 1340 logger.info(e) 1341 if i == retry_count - 1: 1342 raise Exception( 1343 'RPC distribution (%s) differs from expected (%s)' % 1344 (got_instance_percentage, expected_instance_percentage)) 1345 else: 1346 logger.info("success") 1347 break 1348 finally: 1349 patch_url_map_backend_service(gcp, original_backend_service) 1350 patch_backend_service(gcp, alternate_backend_service, []) 1351 1352 1353def test_path_matching(gcp, original_backend_service, instance_group, 1354 alternate_backend_service, same_zone_instance_group): 1355 # This test start with all traffic (UnaryCall and EmptyCall) going to 1356 # original_backend_service. 1357 # 1358 # Then it updates URL-map to add routes, to make UnaryCall and EmptyCall to 1359 # go different backends. It waits for all backends in both services to 1360 # receive traffic, then verifies that traffic goes to the expected 1361 # backends. 1362 logger.info('Running test_path_matching') 1363 1364 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 1365 gcp, original_backend_service, instance_group, 1366 alternate_backend_service, same_zone_instance_group) 1367 1368 try: 1369 # A list of tuples (route_rules, expected_instances). 1370 test_cases = [ 1371 ( 1372 [{ 1373 'priority': 0, 1374 # FullPath EmptyCall -> alternate_backend_service. 1375 'matchRules': [{ 1376 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' 1377 }], 1378 'service': alternate_backend_service.url 1379 }], 1380 { 1381 "EmptyCall": alternate_backend_instances, 1382 "UnaryCall": original_backend_instances 1383 }), 1384 ( 1385 [{ 1386 'priority': 0, 1387 # Prefix UnaryCall -> alternate_backend_service. 1388 'matchRules': [{ 1389 'prefixMatch': '/grpc.testing.TestService/Unary' 1390 }], 1391 'service': alternate_backend_service.url 1392 }], 1393 { 1394 "UnaryCall": alternate_backend_instances, 1395 "EmptyCall": original_backend_instances 1396 }), 1397 ( 1398 # This test case is similar to the one above (but with route 1399 # services swapped). This test has two routes (full_path and 1400 # the default) to match EmptyCall, and both routes set 1401 # alternative_backend_service as the action. This forces the 1402 # client to handle duplicate Clusters in the RDS response. 1403 [ 1404 { 1405 'priority': 0, 1406 # Prefix UnaryCall -> original_backend_service. 1407 'matchRules': [{ 1408 'prefixMatch': '/grpc.testing.TestService/Unary' 1409 }], 1410 'service': original_backend_service.url 1411 }, 1412 { 1413 'priority': 1, 1414 # FullPath EmptyCall -> alternate_backend_service. 1415 'matchRules': [{ 1416 'fullPathMatch': 1417 '/grpc.testing.TestService/EmptyCall' 1418 }], 1419 'service': alternate_backend_service.url 1420 } 1421 ], 1422 { 1423 "UnaryCall": original_backend_instances, 1424 "EmptyCall": alternate_backend_instances 1425 }), 1426 ( 1427 [{ 1428 'priority': 0, 1429 # Regex UnaryCall -> alternate_backend_service. 1430 'matchRules': [{ 1431 'regexMatch': 1432 '^\/.*\/UnaryCall$' # Unary methods with any services. 1433 }], 1434 'service': alternate_backend_service.url 1435 }], 1436 { 1437 "UnaryCall": alternate_backend_instances, 1438 "EmptyCall": original_backend_instances 1439 }), 1440 ( 1441 [{ 1442 'priority': 0, 1443 # ignoreCase EmptyCall -> alternate_backend_service. 1444 'matchRules': [{ 1445 # Case insensitive matching. 1446 'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl', 1447 'ignoreCase': True, 1448 }], 1449 'service': alternate_backend_service.url 1450 }], 1451 { 1452 "UnaryCall": original_backend_instances, 1453 "EmptyCall": alternate_backend_instances 1454 }), 1455 ] 1456 1457 for (route_rules, expected_instances) in test_cases: 1458 logger.info('patching url map with %s', route_rules) 1459 patch_url_map_backend_service(gcp, 1460 original_backend_service, 1461 route_rules=route_rules) 1462 1463 # Wait for traffic to go to both services. 1464 logger.info( 1465 'waiting for traffic to go to all backends (including alternate)' 1466 ) 1467 wait_until_all_rpcs_go_to_given_backends( 1468 original_backend_instances + alternate_backend_instances, 1469 _WAIT_FOR_STATS_SEC) 1470 1471 retry_count = 80 1472 # Each attempt takes about 5 seconds, 80 retries is equivalent to 400 1473 # seconds timeout. 1474 for i in range(retry_count): 1475 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 1476 if not stats.rpcs_by_method: 1477 raise ValueError( 1478 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' 1479 ) 1480 logger.info('attempt %d', i) 1481 if compare_expected_instances(stats, expected_instances): 1482 logger.info("success") 1483 break 1484 elif i == retry_count - 1: 1485 raise Exception( 1486 'timeout waiting for RPCs to the expected instances: %s' 1487 % expected_instances) 1488 finally: 1489 patch_url_map_backend_service(gcp, original_backend_service) 1490 patch_backend_service(gcp, alternate_backend_service, []) 1491 1492 1493def test_header_matching(gcp, original_backend_service, instance_group, 1494 alternate_backend_service, same_zone_instance_group): 1495 # This test start with all traffic (UnaryCall and EmptyCall) going to 1496 # original_backend_service. 1497 # 1498 # Then it updates URL-map to add routes, to make RPCs with test headers to 1499 # go to different backends. It waits for all backends in both services to 1500 # receive traffic, then verifies that traffic goes to the expected 1501 # backends. 1502 logger.info('Running test_header_matching') 1503 1504 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 1505 gcp, original_backend_service, instance_group, 1506 alternate_backend_service, same_zone_instance_group) 1507 1508 try: 1509 # A list of tuples (route_rules, expected_instances). 1510 test_cases = [ 1511 ( 1512 [{ 1513 'priority': 0, 1514 # Header ExactMatch -> alternate_backend_service. 1515 # EmptyCall is sent with the metadata. 1516 'matchRules': [{ 1517 'prefixMatch': 1518 '/', 1519 'headerMatches': [{ 1520 'headerName': _TEST_METADATA_KEY, 1521 'exactMatch': _TEST_METADATA_VALUE_EMPTY 1522 }] 1523 }], 1524 'service': alternate_backend_service.url 1525 }], 1526 { 1527 "EmptyCall": alternate_backend_instances, 1528 "UnaryCall": original_backend_instances 1529 }), 1530 ( 1531 [{ 1532 'priority': 0, 1533 # Header PrefixMatch -> alternate_backend_service. 1534 # UnaryCall is sent with the metadata. 1535 'matchRules': [{ 1536 'prefixMatch': 1537 '/', 1538 'headerMatches': [{ 1539 'headerName': _TEST_METADATA_KEY, 1540 'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2] 1541 }] 1542 }], 1543 'service': alternate_backend_service.url 1544 }], 1545 { 1546 "EmptyCall": original_backend_instances, 1547 "UnaryCall": alternate_backend_instances 1548 }), 1549 ( 1550 [{ 1551 'priority': 0, 1552 # Header SuffixMatch -> alternate_backend_service. 1553 # EmptyCall is sent with the metadata. 1554 'matchRules': [{ 1555 'prefixMatch': 1556 '/', 1557 'headerMatches': [{ 1558 'headerName': _TEST_METADATA_KEY, 1559 'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:] 1560 }] 1561 }], 1562 'service': alternate_backend_service.url 1563 }], 1564 { 1565 "EmptyCall": alternate_backend_instances, 1566 "UnaryCall": original_backend_instances 1567 }), 1568 ( 1569 [{ 1570 'priority': 0, 1571 # Header 'xds_md_numeric' present -> alternate_backend_service. 1572 # UnaryCall is sent with the metadata, so will be sent to alternative. 1573 'matchRules': [{ 1574 'prefixMatch': 1575 '/', 1576 'headerMatches': [{ 1577 'headerName': _TEST_METADATA_NUMERIC_KEY, 1578 'presentMatch': True 1579 }] 1580 }], 1581 'service': alternate_backend_service.url 1582 }], 1583 { 1584 "EmptyCall": original_backend_instances, 1585 "UnaryCall": alternate_backend_instances 1586 }), 1587 ( 1588 [{ 1589 'priority': 0, 1590 # Header invert ExactMatch -> alternate_backend_service. 1591 # UnaryCall is sent with the metadata, so will be sent to 1592 # original. EmptyCall will be sent to alternative. 1593 'matchRules': [{ 1594 'prefixMatch': 1595 '/', 1596 'headerMatches': [{ 1597 'headerName': _TEST_METADATA_KEY, 1598 'exactMatch': _TEST_METADATA_VALUE_UNARY, 1599 'invertMatch': True 1600 }] 1601 }], 1602 'service': alternate_backend_service.url 1603 }], 1604 { 1605 "EmptyCall": alternate_backend_instances, 1606 "UnaryCall": original_backend_instances 1607 }), 1608 ( 1609 [{ 1610 'priority': 0, 1611 # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service. 1612 # UnaryCall is sent with the metadata in range. 1613 'matchRules': [{ 1614 'prefixMatch': 1615 '/', 1616 'headerMatches': [{ 1617 'headerName': _TEST_METADATA_NUMERIC_KEY, 1618 'rangeMatch': { 1619 'rangeStart': '100', 1620 'rangeEnd': '200' 1621 } 1622 }] 1623 }], 1624 'service': alternate_backend_service.url 1625 }], 1626 { 1627 "EmptyCall": original_backend_instances, 1628 "UnaryCall": alternate_backend_instances 1629 }), 1630 ( 1631 [{ 1632 'priority': 0, 1633 # Header RegexMatch -> alternate_backend_service. 1634 # EmptyCall is sent with the metadata. 1635 'matchRules': [{ 1636 'prefixMatch': 1637 '/', 1638 'headerMatches': [{ 1639 'headerName': 1640 _TEST_METADATA_KEY, 1641 'regexMatch': 1642 "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2], 1643 _TEST_METADATA_VALUE_EMPTY[-2:]) 1644 }] 1645 }], 1646 'service': alternate_backend_service.url 1647 }], 1648 { 1649 "EmptyCall": alternate_backend_instances, 1650 "UnaryCall": original_backend_instances 1651 }), 1652 ] 1653 1654 for (route_rules, expected_instances) in test_cases: 1655 logger.info('patching url map with %s -> alternative', 1656 route_rules[0]['matchRules']) 1657 patch_url_map_backend_service(gcp, 1658 original_backend_service, 1659 route_rules=route_rules) 1660 1661 # Wait for traffic to go to both services. 1662 logger.info( 1663 'waiting for traffic to go to all backends (including alternate)' 1664 ) 1665 wait_until_all_rpcs_go_to_given_backends( 1666 original_backend_instances + alternate_backend_instances, 1667 _WAIT_FOR_STATS_SEC) 1668 1669 retry_count = 80 1670 # Each attempt takes about 5 seconds, 80 retries is equivalent to 400 1671 # seconds timeout. 1672 for i in range(retry_count): 1673 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 1674 if not stats.rpcs_by_method: 1675 raise ValueError( 1676 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' 1677 ) 1678 logger.info('attempt %d', i) 1679 if compare_expected_instances(stats, expected_instances): 1680 logger.info("success") 1681 break 1682 elif i == retry_count - 1: 1683 raise Exception( 1684 'timeout waiting for RPCs to the expected instances: %s' 1685 % expected_instances) 1686 finally: 1687 patch_url_map_backend_service(gcp, original_backend_service) 1688 patch_backend_service(gcp, alternate_backend_service, []) 1689 1690 1691def test_circuit_breaking(gcp, original_backend_service, instance_group, 1692 same_zone_instance_group): 1693 ''' 1694 Since backend service circuit_breakers configuration cannot be unset, 1695 which causes trouble for restoring validate_for_proxy flag in target 1696 proxy/global forwarding rule. This test uses dedicated backend sevices. 1697 The url_map and backend services undergoes the following state changes: 1698 1699 Before test: 1700 original_backend_service -> [instance_group] 1701 extra_backend_service -> [] 1702 more_extra_backend_service -> [] 1703 1704 url_map -> [original_backend_service] 1705 1706 In test: 1707 extra_backend_service (with circuit_breakers) -> [instance_group] 1708 more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group] 1709 1710 url_map -> [extra_backend_service, more_extra_backend_service] 1711 1712 After test: 1713 original_backend_service -> [instance_group] 1714 extra_backend_service (with circuit_breakers) -> [] 1715 more_extra_backend_service (with circuit_breakers) -> [] 1716 1717 url_map -> [original_backend_service] 1718 ''' 1719 logger.info('Running test_circuit_breaking') 1720 additional_backend_services = [] 1721 try: 1722 # TODO(chengyuanzhang): Dedicated backend services created for circuit 1723 # breaking test. Once the issue for unsetting backend service circuit 1724 # breakers is resolved or configuring backend service circuit breakers is 1725 # enabled for config validation, these dedicated backend services can be 1726 # eliminated. 1727 extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix 1728 more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix 1729 extra_backend_service = add_backend_service(gcp, 1730 extra_backend_service_name) 1731 additional_backend_services.append(extra_backend_service) 1732 more_extra_backend_service = add_backend_service( 1733 gcp, more_extra_backend_service_name) 1734 additional_backend_services.append(more_extra_backend_service) 1735 # The config validation for proxyless doesn't allow setting 1736 # circuit_breakers. Disable validate validate_for_proxyless 1737 # for this test. This can be removed when validation 1738 # accepts circuit_breakers. 1739 logger.info('disabling validate_for_proxyless in target proxy') 1740 set_validate_for_proxyless(gcp, False) 1741 extra_backend_service_max_requests = 500 1742 more_extra_backend_service_max_requests = 1000 1743 patch_backend_service(gcp, 1744 extra_backend_service, [instance_group], 1745 circuit_breakers={ 1746 'maxRequests': 1747 extra_backend_service_max_requests 1748 }) 1749 logger.info('Waiting for extra backends to become healthy') 1750 wait_for_healthy_backends(gcp, extra_backend_service, instance_group) 1751 patch_backend_service(gcp, 1752 more_extra_backend_service, 1753 [same_zone_instance_group], 1754 circuit_breakers={ 1755 'maxRequests': 1756 more_extra_backend_service_max_requests 1757 }) 1758 logger.info('Waiting for more extra backend to become healthy') 1759 wait_for_healthy_backends(gcp, more_extra_backend_service, 1760 same_zone_instance_group) 1761 extra_backend_instances = get_instance_names(gcp, instance_group) 1762 more_extra_backend_instances = get_instance_names( 1763 gcp, same_zone_instance_group) 1764 route_rules = [ 1765 { 1766 'priority': 0, 1767 # UnaryCall -> extra_backend_service 1768 'matchRules': [{ 1769 'fullPathMatch': '/grpc.testing.TestService/UnaryCall' 1770 }], 1771 'service': extra_backend_service.url 1772 }, 1773 { 1774 'priority': 1, 1775 # EmptyCall -> more_extra_backend_service 1776 'matchRules': [{ 1777 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' 1778 }], 1779 'service': more_extra_backend_service.url 1780 }, 1781 ] 1782 1783 # Make client send UNARY_CALL and EMPTY_CALL. 1784 configure_client([ 1785 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1786 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL 1787 ]) 1788 logger.info('Patching url map with %s', route_rules) 1789 patch_url_map_backend_service(gcp, 1790 extra_backend_service, 1791 route_rules=route_rules) 1792 logger.info('Waiting for traffic to go to all backends') 1793 wait_until_all_rpcs_go_to_given_backends( 1794 extra_backend_instances + more_extra_backend_instances, 1795 _WAIT_FOR_STATS_SEC) 1796 1797 # Make all calls keep-open. 1798 configure_client([ 1799 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1800 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL 1801 ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1802 'rpc-behavior', 'keep-open'), 1803 (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 1804 'rpc-behavior', 'keep-open')]) 1805 wait_until_rpcs_in_flight( 1806 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC + 1807 int(extra_backend_service_max_requests / args.qps)), 1808 extra_backend_service_max_requests, 1) 1809 logger.info('UNARY_CALL reached stable state (%d)', 1810 extra_backend_service_max_requests) 1811 wait_until_rpcs_in_flight( 1812 'EMPTY_CALL', 1813 (_WAIT_FOR_BACKEND_SEC + 1814 int(more_extra_backend_service_max_requests / args.qps)), 1815 more_extra_backend_service_max_requests, 1) 1816 logger.info('EMPTY_CALL reached stable state (%d)', 1817 more_extra_backend_service_max_requests) 1818 1819 # Increment circuit breakers max_requests threshold. 1820 extra_backend_service_max_requests = 800 1821 patch_backend_service(gcp, 1822 extra_backend_service, [instance_group], 1823 circuit_breakers={ 1824 'maxRequests': 1825 extra_backend_service_max_requests 1826 }) 1827 wait_until_rpcs_in_flight( 1828 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC + 1829 int(extra_backend_service_max_requests / args.qps)), 1830 extra_backend_service_max_requests, 1) 1831 logger.info('UNARY_CALL reached stable state after increase (%d)', 1832 extra_backend_service_max_requests) 1833 logger.info('success') 1834 # Avoid new RPCs being outstanding (some test clients create threads 1835 # for sending RPCs) after restoring backend services. 1836 configure_client( 1837 [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL]) 1838 finally: 1839 patch_url_map_backend_service(gcp, original_backend_service) 1840 patch_backend_service(gcp, original_backend_service, [instance_group]) 1841 for backend_service in additional_backend_services: 1842 delete_backend_service(gcp, backend_service) 1843 set_validate_for_proxyless(gcp, True) 1844 1845 1846def test_timeout(gcp, original_backend_service, instance_group): 1847 logger.info('Running test_timeout') 1848 1849 logger.info('waiting for original backends to become healthy') 1850 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 1851 1852 # UnaryCall -> maxStreamDuration:3s 1853 route_rules = [{ 1854 'priority': 0, 1855 'matchRules': [{ 1856 'fullPathMatch': '/grpc.testing.TestService/UnaryCall' 1857 }], 1858 'service': original_backend_service.url, 1859 'routeAction': { 1860 'maxStreamDuration': { 1861 'seconds': 3, 1862 }, 1863 }, 1864 }] 1865 patch_url_map_backend_service(gcp, 1866 original_backend_service, 1867 route_rules=route_rules) 1868 # A list of tuples (testcase_name, {client_config}, {expected_results}) 1869 test_cases = [ 1870 ( 1871 'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)', 1872 # UnaryCall and EmptyCall both sleep-4. 1873 # UnaryCall timeouts, EmptyCall succeeds. 1874 { 1875 'rpc_types': [ 1876 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1877 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 1878 ], 1879 'metadata': [ 1880 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1881 'rpc-behavior', 'sleep-4'), 1882 (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, 1883 'rpc-behavior', 'sleep-4'), 1884 ], 1885 }, 1886 { 1887 'UNARY_CALL': 4, # DEADLINE_EXCEEDED 1888 'EMPTY_CALL': 0, 1889 }, 1890 ), 1891 ( 1892 'app_timeout_exceeded', 1893 # UnaryCall only with sleep-2; timeout=1s; calls timeout. 1894 { 1895 'rpc_types': [ 1896 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1897 ], 1898 'metadata': [ 1899 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1900 'rpc-behavior', 'sleep-2'), 1901 ], 1902 'timeout_sec': 1, 1903 }, 1904 { 1905 'UNARY_CALL': 4, # DEADLINE_EXCEEDED 1906 }, 1907 ), 1908 ( 1909 'timeout_not_exceeded', 1910 # UnaryCall only with no sleep; calls succeed. 1911 { 1912 'rpc_types': [ 1913 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 1914 ], 1915 }, 1916 { 1917 'UNARY_CALL': 0, 1918 }, 1919 ) 1920 ] 1921 1922 try: 1923 first_case = True 1924 for (testcase_name, client_config, expected_results) in test_cases: 1925 logger.info('starting case %s', testcase_name) 1926 configure_client(**client_config) 1927 # wait a second to help ensure the client stops sending RPCs with 1928 # the old config. We will make multiple attempts if it is failing, 1929 # but this improves confidence that the test is valid if the 1930 # previous client_config would lead to the same results. 1931 time.sleep(1) 1932 # Each attempt takes 10 seconds; 20 attempts is equivalent to 200 1933 # second timeout. 1934 attempt_count = 20 1935 if first_case: 1936 attempt_count = 120 1937 first_case = False 1938 before_stats = get_client_accumulated_stats() 1939 if not before_stats.stats_per_method: 1940 raise ValueError( 1941 'stats.stats_per_method is None, the interop client stats service does not support this test case' 1942 ) 1943 for i in range(attempt_count): 1944 logger.info('%s: attempt %d', testcase_name, i) 1945 1946 test_runtime_secs = 10 1947 time.sleep(test_runtime_secs) 1948 after_stats = get_client_accumulated_stats() 1949 1950 success = True 1951 for rpc, status in expected_results.items(): 1952 qty = (after_stats.stats_per_method[rpc].result[status] - 1953 before_stats.stats_per_method[rpc].result[status]) 1954 want = test_runtime_secs * args.qps 1955 # Allow 10% deviation from expectation to reduce flakiness 1956 if qty < (want * .9) or qty > (want * 1.1): 1957 logger.info('%s: failed due to %s[%s]: got %d want ~%d', 1958 testcase_name, rpc, status, qty, want) 1959 success = False 1960 if success: 1961 logger.info('success') 1962 break 1963 logger.info('%s attempt %d failed', testcase_name, i) 1964 before_stats = after_stats 1965 else: 1966 raise Exception( 1967 '%s: timeout waiting for expected results: %s; got %s' % 1968 (testcase_name, expected_results, 1969 after_stats.stats_per_method)) 1970 finally: 1971 patch_url_map_backend_service(gcp, original_backend_service) 1972 1973 1974def test_fault_injection(gcp, original_backend_service, instance_group): 1975 logger.info('Running test_fault_injection') 1976 1977 logger.info('waiting for original backends to become healthy') 1978 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 1979 1980 testcase_header = 'fi_testcase' 1981 1982 def _route(pri, name, fi_policy): 1983 return { 1984 'priority': pri, 1985 'matchRules': [{ 1986 'prefixMatch': 1987 '/', 1988 'headerMatches': [{ 1989 'headerName': testcase_header, 1990 'exactMatch': name, 1991 }], 1992 }], 1993 'service': original_backend_service.url, 1994 'routeAction': { 1995 'faultInjectionPolicy': fi_policy 1996 }, 1997 } 1998 1999 def _abort(pct): 2000 return { 2001 'abort': { 2002 'httpStatus': 401, 2003 'percentage': pct, 2004 } 2005 } 2006 2007 def _delay(pct): 2008 return { 2009 'delay': { 2010 'fixedDelay': { 2011 'seconds': '20' 2012 }, 2013 'percentage': pct, 2014 } 2015 } 2016 2017 zero_route = _abort(0) 2018 zero_route.update(_delay(0)) 2019 route_rules = [ 2020 _route(0, 'zero_percent_fault_injection', zero_route), 2021 _route(1, 'always_delay', _delay(100)), 2022 _route(2, 'always_abort', _abort(100)), 2023 _route(3, 'delay_half', _delay(50)), 2024 _route(4, 'abort_half', _abort(50)), 2025 { 2026 'priority': 5, 2027 'matchRules': [{ 2028 'prefixMatch': '/' 2029 }], 2030 'service': original_backend_service.url, 2031 }, 2032 ] 2033 set_validate_for_proxyless(gcp, False) 2034 patch_url_map_backend_service(gcp, 2035 original_backend_service, 2036 route_rules=route_rules) 2037 # A list of tuples (testcase_name, {client_config}, {code: percent}). Each 2038 # test case will set the testcase_header with the testcase_name for routing 2039 # to the appropriate config for the case, defined above. 2040 test_cases = [ 2041 ( 2042 'zero_percent_fault_injection', 2043 {}, 2044 { 2045 0: 1 2046 }, # OK 2047 ), 2048 ( 2049 'non_matching_fault_injection', # Not in route_rules, above. 2050 {}, 2051 { 2052 0: 1 2053 }, # OK 2054 ), 2055 ( 2056 'always_delay', 2057 { 2058 'timeout_sec': 2 2059 }, 2060 { 2061 4: 1 2062 }, # DEADLINE_EXCEEDED 2063 ), 2064 ( 2065 'always_abort', 2066 {}, 2067 { 2068 16: 1 2069 }, # UNAUTHENTICATED 2070 ), 2071 ( 2072 'delay_half', 2073 { 2074 'timeout_sec': 2 2075 }, 2076 { 2077 4: .5, 2078 0: .5 2079 }, # DEADLINE_EXCEEDED / OK: 50% / 50% 2080 ), 2081 ( 2082 'abort_half', 2083 {}, 2084 { 2085 16: .5, 2086 0: .5 2087 }, # UNAUTHENTICATED / OK: 50% / 50% 2088 ) 2089 ] 2090 2091 try: 2092 first_case = True 2093 for (testcase_name, client_config, expected_results) in test_cases: 2094 logger.info('starting case %s', testcase_name) 2095 2096 client_config['metadata'] = [ 2097 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2098 testcase_header, testcase_name) 2099 ] 2100 client_config['rpc_types'] = [ 2101 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, 2102 ] 2103 configure_client(**client_config) 2104 # wait a second to help ensure the client stops sending RPCs with 2105 # the old config. We will make multiple attempts if it is failing, 2106 # but this improves confidence that the test is valid if the 2107 # previous client_config would lead to the same results. 2108 time.sleep(1) 2109 # Each attempt takes 10 seconds; 20 attempts is equivalent to 200 2110 # second timeout. 2111 attempt_count = 20 2112 if first_case: 2113 attempt_count = 120 2114 first_case = False 2115 before_stats = get_client_accumulated_stats() 2116 if not before_stats.stats_per_method: 2117 raise ValueError( 2118 'stats.stats_per_method is None, the interop client stats service does not support this test case' 2119 ) 2120 for i in range(attempt_count): 2121 logger.info('%s: attempt %d', testcase_name, i) 2122 2123 test_runtime_secs = 10 2124 time.sleep(test_runtime_secs) 2125 after_stats = get_client_accumulated_stats() 2126 2127 success = True 2128 for status, pct in expected_results.items(): 2129 rpc = 'UNARY_CALL' 2130 qty = (after_stats.stats_per_method[rpc].result[status] - 2131 before_stats.stats_per_method[rpc].result[status]) 2132 want = pct * args.qps * test_runtime_secs 2133 # Allow 10% deviation from expectation to reduce flakiness 2134 VARIANCE_ALLOWED = 0.1 2135 if abs(qty - want) > want * VARIANCE_ALLOWED: 2136 logger.info('%s: failed due to %s[%s]: got %d want ~%d', 2137 testcase_name, rpc, status, qty, want) 2138 success = False 2139 if success: 2140 logger.info('success') 2141 break 2142 logger.info('%s attempt %d failed', testcase_name, i) 2143 before_stats = after_stats 2144 else: 2145 raise Exception( 2146 '%s: timeout waiting for expected results: %s; got %s' % 2147 (testcase_name, expected_results, 2148 after_stats.stats_per_method)) 2149 finally: 2150 patch_url_map_backend_service(gcp, original_backend_service) 2151 set_validate_for_proxyless(gcp, True) 2152 2153 2154def test_csds(gcp, original_backend_service, instance_group, server_uri): 2155 test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds() 2156 sleep_interval_between_attempts_s = datetime.timedelta( 2157 seconds=2).total_seconds() 2158 logger.info('Running test_csds') 2159 2160 logger.info('waiting for original backends to become healthy') 2161 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 2162 2163 # Test case timeout: 5 minutes 2164 deadline = time.time() + test_csds_timeout_s 2165 cnt = 0 2166 while time.time() <= deadline: 2167 client_config = get_client_xds_config_dump() 2168 logger.info('test_csds attempt %d: received xDS config %s', cnt, 2169 json.dumps(client_config, indent=2)) 2170 if client_config is not None: 2171 # Got the xDS config dump, now validate it 2172 ok = True 2173 try: 2174 if client_config['node']['locality']['zone'] != args.zone: 2175 logger.info('Invalid zone %s != %s', 2176 client_config['node']['locality']['zone'], 2177 args.zone) 2178 ok = False 2179 seen = set() 2180 for xds_config in client_config['xds_config']: 2181 if 'listener_config' in xds_config: 2182 listener_name = xds_config['listener_config'][ 2183 'dynamic_listeners'][0]['active_state']['listener'][ 2184 'name'] 2185 if listener_name != server_uri: 2186 logger.info('Invalid Listener name %s != %s', 2187 listener_name, server_uri) 2188 ok = False 2189 else: 2190 seen.add('lds') 2191 elif 'route_config' in xds_config: 2192 num_vh = len( 2193 xds_config['route_config']['dynamic_route_configs'] 2194 [0]['route_config']['virtual_hosts']) 2195 if num_vh <= 0: 2196 logger.info('Invalid number of VirtualHosts %s', 2197 num_vh) 2198 ok = False 2199 else: 2200 seen.add('rds') 2201 elif 'cluster_config' in xds_config: 2202 cluster_type = xds_config['cluster_config'][ 2203 'dynamic_active_clusters'][0]['cluster']['type'] 2204 if cluster_type != 'EDS': 2205 logger.info('Invalid cluster type %s != EDS', 2206 cluster_type) 2207 ok = False 2208 else: 2209 seen.add('cds') 2210 elif 'endpoint_config' in xds_config: 2211 sub_zone = xds_config["endpoint_config"][ 2212 "dynamic_endpoint_configs"][0]["endpoint_config"][ 2213 "endpoints"][0]["locality"]["sub_zone"] 2214 if args.zone not in sub_zone: 2215 logger.info('Invalid endpoint sub_zone %s', 2216 sub_zone) 2217 ok = False 2218 else: 2219 seen.add('eds') 2220 want = {'lds', 'rds', 'cds', 'eds'} 2221 if seen != want: 2222 logger.info('Incomplete xDS config dump, seen=%s', seen) 2223 ok = False 2224 except: 2225 logger.exception('Error in xDS config dump:') 2226 ok = False 2227 finally: 2228 if ok: 2229 # Successfully fetched xDS config, and they looks good. 2230 logger.info('success') 2231 return 2232 logger.info('test_csds attempt %d failed', cnt) 2233 # Give the client some time to fetch xDS resources 2234 time.sleep(sleep_interval_between_attempts_s) 2235 cnt += 1 2236 2237 raise RuntimeError('failed to receive a valid xDS config in %s seconds' % 2238 test_csds_timeout_s) 2239 2240 2241def maybe_write_sponge_properties(): 2242 """Writing test infos to enable more advanced testgrid searches.""" 2243 if 'KOKORO_ARTIFACTS_DIR' not in os.environ: 2244 return 2245 if 'GIT_ORIGIN_URL' not in os.environ: 2246 return 2247 if 'GIT_COMMIT_SHORT' not in os.environ: 2248 return 2249 properties = [ 2250 # Technically, 'TESTS_FORMAT_VERSION' is not required for run_xds_tests. 2251 # We keep it here so one day we may merge the process of writing sponge 2252 # properties. 2253 'TESTS_FORMAT_VERSION,2', 2254 'TESTGRID_EXCLUDE,%s' % os.environ.get('TESTGRID_EXCLUDE', 0), 2255 'GIT_ORIGIN_URL,%s' % os.environ['GIT_ORIGIN_URL'], 2256 'GIT_COMMIT_SHORT,%s' % os.environ['GIT_COMMIT_SHORT'], 2257 ] 2258 logger.info('Writing Sponge configs: %s', properties) 2259 with open( 2260 os.path.join(os.environ['KOKORO_ARTIFACTS_DIR'], 2261 "custom_sponge_config.csv"), 'w') as f: 2262 f.write("\n".join(properties)) 2263 f.write("\n") 2264 2265 2266def set_validate_for_proxyless(gcp, validate_for_proxyless): 2267 if not gcp.alpha_compute: 2268 logger.debug( 2269 'Not setting validateForProxy because alpha is not enabled') 2270 return 2271 # This function deletes global_forwarding_rule and target_proxy, then 2272 # recreate target_proxy with validateForProxyless=False. This is necessary 2273 # because patching target_grpc_proxy isn't supported. 2274 delete_global_forwarding_rule(gcp) 2275 delete_target_proxy(gcp) 2276 create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless) 2277 create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name, 2278 [gcp.service_port]) 2279 2280 2281def get_serving_status(instance, service_port): 2282 with grpc.insecure_channel('%s:%d' % (instance, service_port)) as channel: 2283 health_stub = health_pb2_grpc.HealthStub(channel) 2284 return health_stub.Check(health_pb2.HealthCheckRequest()) 2285 2286 2287def set_serving_status(instances, service_port, serving): 2288 logger.info('setting %s serving status to %s', instances, serving) 2289 for instance in instances: 2290 with grpc.insecure_channel('%s:%d' % 2291 (instance, service_port)) as channel: 2292 logger.info('setting %s serving status to %s', instance, serving) 2293 stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel) 2294 retry_count = 5 2295 for i in range(5): 2296 if serving: 2297 stub.SetServing(empty_pb2.Empty()) 2298 else: 2299 stub.SetNotServing(empty_pb2.Empty()) 2300 serving_status = get_serving_status(instance, service_port) 2301 logger.info('got instance service status %s', serving_status) 2302 want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING 2303 if serving_status.status == want_status: 2304 break 2305 if i == retry_count - 1: 2306 raise Exception( 2307 'failed to set instance service status after %d retries' 2308 % retry_count) 2309 2310 2311def is_primary_instance_group(gcp, instance_group): 2312 # Clients may connect to a TD instance in a different region than the 2313 # client, in which case primary/secondary assignments may not be based on 2314 # the client's actual locality. 2315 instance_names = get_instance_names(gcp, instance_group) 2316 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 2317 return all(peer in instance_names for peer in stats.rpcs_by_peer.keys()) 2318 2319 2320def get_startup_script(path_to_server_binary, service_port): 2321 if path_to_server_binary: 2322 return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary, 2323 service_port) 2324 else: 2325 return """#!/bin/bash 2326sudo apt update 2327sudo apt install -y git default-jdk 2328mkdir java_server 2329pushd java_server 2330git clone https://github.com/grpc/grpc-java.git 2331pushd grpc-java 2332pushd interop-testing 2333../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true 2334 2335nohup build/install/grpc-interop-testing/bin/xds-test-server \ 2336 --port=%d 1>/dev/null &""" % service_port 2337 2338 2339def create_instance_template(gcp, name, network, source_image, machine_type, 2340 startup_script): 2341 config = { 2342 'name': name, 2343 'properties': { 2344 'tags': { 2345 'items': ['allow-health-checks'] 2346 }, 2347 'machineType': machine_type, 2348 'serviceAccounts': [{ 2349 'email': 'default', 2350 'scopes': ['https://www.googleapis.com/auth/cloud-platform',] 2351 }], 2352 'networkInterfaces': [{ 2353 'accessConfigs': [{ 2354 'type': 'ONE_TO_ONE_NAT' 2355 }], 2356 'network': network 2357 }], 2358 'disks': [{ 2359 'boot': True, 2360 'initializeParams': { 2361 'sourceImage': source_image 2362 } 2363 }], 2364 'metadata': { 2365 'items': [{ 2366 'key': 'startup-script', 2367 'value': startup_script 2368 }] 2369 } 2370 } 2371 } 2372 2373 logger.debug('Sending GCP request with body=%s', config) 2374 result = gcp.compute.instanceTemplates().insert( 2375 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2376 wait_for_global_operation(gcp, result['name']) 2377 gcp.instance_template = GcpResource(config['name'], result['targetLink']) 2378 2379 2380def add_instance_group(gcp, zone, name, size): 2381 config = { 2382 'name': name, 2383 'instanceTemplate': gcp.instance_template.url, 2384 'targetSize': size, 2385 'namedPorts': [{ 2386 'name': 'grpc', 2387 'port': gcp.service_port 2388 }] 2389 } 2390 2391 logger.debug('Sending GCP request with body=%s', config) 2392 result = gcp.compute.instanceGroupManagers().insert( 2393 project=gcp.project, zone=zone, 2394 body=config).execute(num_retries=_GCP_API_RETRIES) 2395 wait_for_zone_operation(gcp, zone, result['name']) 2396 result = gcp.compute.instanceGroupManagers().get( 2397 project=gcp.project, zone=zone, 2398 instanceGroupManager=config['name']).execute( 2399 num_retries=_GCP_API_RETRIES) 2400 instance_group = InstanceGroup(config['name'], result['instanceGroup'], 2401 zone) 2402 gcp.instance_groups.append(instance_group) 2403 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size, 2404 _WAIT_FOR_OPERATION_SEC) 2405 return instance_group 2406 2407 2408def create_health_check(gcp, name): 2409 if gcp.alpha_compute: 2410 config = { 2411 'name': name, 2412 'type': 'GRPC', 2413 'grpcHealthCheck': { 2414 'portSpecification': 'USE_SERVING_PORT' 2415 } 2416 } 2417 compute_to_use = gcp.alpha_compute 2418 else: 2419 config = { 2420 'name': name, 2421 'type': 'TCP', 2422 'tcpHealthCheck': { 2423 'portName': 'grpc' 2424 } 2425 } 2426 compute_to_use = gcp.compute 2427 logger.debug('Sending GCP request with body=%s', config) 2428 result = compute_to_use.healthChecks().insert( 2429 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2430 wait_for_global_operation(gcp, result['name']) 2431 gcp.health_check = GcpResource(config['name'], result['targetLink']) 2432 2433 2434def create_health_check_firewall_rule(gcp, name): 2435 config = { 2436 'name': name, 2437 'direction': 'INGRESS', 2438 'allowed': [{ 2439 'IPProtocol': 'tcp' 2440 }], 2441 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'], 2442 'targetTags': ['allow-health-checks'], 2443 } 2444 logger.debug('Sending GCP request with body=%s', config) 2445 result = gcp.compute.firewalls().insert( 2446 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2447 wait_for_global_operation(gcp, result['name']) 2448 gcp.health_check_firewall_rule = GcpResource(config['name'], 2449 result['targetLink']) 2450 2451 2452def add_backend_service(gcp, name): 2453 if gcp.alpha_compute: 2454 protocol = 'GRPC' 2455 compute_to_use = gcp.alpha_compute 2456 else: 2457 protocol = 'HTTP2' 2458 compute_to_use = gcp.compute 2459 config = { 2460 'name': name, 2461 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 2462 'healthChecks': [gcp.health_check.url], 2463 'portName': 'grpc', 2464 'protocol': protocol 2465 } 2466 logger.debug('Sending GCP request with body=%s', config) 2467 result = compute_to_use.backendServices().insert( 2468 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2469 wait_for_global_operation(gcp, result['name']) 2470 backend_service = GcpResource(config['name'], result['targetLink']) 2471 gcp.backend_services.append(backend_service) 2472 return backend_service 2473 2474 2475def create_url_map(gcp, name, backend_service, host_name): 2476 config = { 2477 'name': name, 2478 'defaultService': backend_service.url, 2479 'pathMatchers': [{ 2480 'name': _PATH_MATCHER_NAME, 2481 'defaultService': backend_service.url, 2482 }], 2483 'hostRules': [{ 2484 'hosts': [host_name], 2485 'pathMatcher': _PATH_MATCHER_NAME 2486 }] 2487 } 2488 logger.debug('Sending GCP request with body=%s', config) 2489 result = gcp.compute.urlMaps().insert( 2490 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 2491 wait_for_global_operation(gcp, result['name']) 2492 gcp.url_map = GcpResource(config['name'], result['targetLink']) 2493 2494 2495def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): 2496 config = { 2497 'hostRules': [{ 2498 'hosts': ['%s:%d' % (host_name, gcp.service_port)], 2499 'pathMatcher': _PATH_MATCHER_NAME 2500 }] 2501 } 2502 logger.debug('Sending GCP request with body=%s', config) 2503 result = gcp.compute.urlMaps().patch( 2504 project=gcp.project, urlMap=name, 2505 body=config).execute(num_retries=_GCP_API_RETRIES) 2506 wait_for_global_operation(gcp, result['name']) 2507 2508 2509def create_target_proxy(gcp, name, validate_for_proxyless=True): 2510 if gcp.alpha_compute: 2511 config = { 2512 'name': name, 2513 'url_map': gcp.url_map.url, 2514 'validate_for_proxyless': validate_for_proxyless 2515 } 2516 logger.debug('Sending GCP request with body=%s', config) 2517 result = gcp.alpha_compute.targetGrpcProxies().insert( 2518 project=gcp.project, 2519 body=config).execute(num_retries=_GCP_API_RETRIES) 2520 else: 2521 config = { 2522 'name': name, 2523 'url_map': gcp.url_map.url, 2524 } 2525 logger.debug('Sending GCP request with body=%s', config) 2526 result = gcp.compute.targetHttpProxies().insert( 2527 project=gcp.project, 2528 body=config).execute(num_retries=_GCP_API_RETRIES) 2529 wait_for_global_operation(gcp, result['name']) 2530 gcp.target_proxy = GcpResource(config['name'], result['targetLink']) 2531 2532 2533def create_global_forwarding_rule(gcp, 2534 name, 2535 potential_ports, 2536 potential_ip_addresses=['0.0.0.0']): 2537 if gcp.alpha_compute: 2538 compute_to_use = gcp.alpha_compute 2539 else: 2540 compute_to_use = gcp.compute 2541 for port in potential_ports: 2542 for ip_address in potential_ip_addresses: 2543 try: 2544 config = { 2545 'name': name, 2546 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 2547 'portRange': str(port), 2548 'IPAddress': ip_address, 2549 'network': args.network, 2550 'target': gcp.target_proxy.url, 2551 } 2552 logger.debug('Sending GCP request with body=%s', config) 2553 result = compute_to_use.globalForwardingRules().insert( 2554 project=gcp.project, 2555 body=config).execute(num_retries=_GCP_API_RETRIES) 2556 wait_for_global_operation(gcp, result['name']) 2557 gcp.global_forwarding_rule = GcpResource( 2558 config['name'], result['targetLink']) 2559 gcp.service_port = port 2560 return 2561 except googleapiclient.errors.HttpError as http_error: 2562 logger.warning( 2563 'Got error %s when attempting to create forwarding rule to ' 2564 '%s:%d. Retrying with another port.' % 2565 (http_error, ip_address, port)) 2566 2567 2568def get_health_check(gcp, health_check_name): 2569 result = gcp.compute.healthChecks().get( 2570 project=gcp.project, healthCheck=health_check_name).execute() 2571 gcp.health_check = GcpResource(health_check_name, result['selfLink']) 2572 2573 2574def get_health_check_firewall_rule(gcp, firewall_name): 2575 result = gcp.compute.firewalls().get(project=gcp.project, 2576 firewall=firewall_name).execute() 2577 gcp.health_check_firewall_rule = GcpResource(firewall_name, 2578 result['selfLink']) 2579 2580 2581def get_backend_service(gcp, backend_service_name): 2582 result = gcp.compute.backendServices().get( 2583 project=gcp.project, backendService=backend_service_name).execute() 2584 backend_service = GcpResource(backend_service_name, result['selfLink']) 2585 gcp.backend_services.append(backend_service) 2586 return backend_service 2587 2588 2589def get_url_map(gcp, url_map_name): 2590 result = gcp.compute.urlMaps().get(project=gcp.project, 2591 urlMap=url_map_name).execute() 2592 gcp.url_map = GcpResource(url_map_name, result['selfLink']) 2593 2594 2595def get_target_proxy(gcp, target_proxy_name): 2596 if gcp.alpha_compute: 2597 result = gcp.alpha_compute.targetGrpcProxies().get( 2598 project=gcp.project, targetGrpcProxy=target_proxy_name).execute() 2599 else: 2600 result = gcp.compute.targetHttpProxies().get( 2601 project=gcp.project, targetHttpProxy=target_proxy_name).execute() 2602 gcp.target_proxy = GcpResource(target_proxy_name, result['selfLink']) 2603 2604 2605def get_global_forwarding_rule(gcp, forwarding_rule_name): 2606 result = gcp.compute.globalForwardingRules().get( 2607 project=gcp.project, forwardingRule=forwarding_rule_name).execute() 2608 gcp.global_forwarding_rule = GcpResource(forwarding_rule_name, 2609 result['selfLink']) 2610 2611 2612def get_instance_template(gcp, template_name): 2613 result = gcp.compute.instanceTemplates().get( 2614 project=gcp.project, instanceTemplate=template_name).execute() 2615 gcp.instance_template = GcpResource(template_name, result['selfLink']) 2616 2617 2618def get_instance_group(gcp, zone, instance_group_name): 2619 result = gcp.compute.instanceGroups().get( 2620 project=gcp.project, zone=zone, 2621 instanceGroup=instance_group_name).execute() 2622 gcp.service_port = result['namedPorts'][0]['port'] 2623 instance_group = InstanceGroup(instance_group_name, result['selfLink'], 2624 zone) 2625 gcp.instance_groups.append(instance_group) 2626 return instance_group 2627 2628 2629def delete_global_forwarding_rule(gcp, name=None): 2630 if name: 2631 forwarding_rule_to_delete = name 2632 else: 2633 forwarding_rule_to_delete = gcp.global_forwarding_rule.name 2634 try: 2635 result = gcp.compute.globalForwardingRules().delete( 2636 project=gcp.project, 2637 forwardingRule=forwarding_rule_to_delete).execute( 2638 num_retries=_GCP_API_RETRIES) 2639 wait_for_global_operation(gcp, result['name']) 2640 except googleapiclient.errors.HttpError as http_error: 2641 logger.info('Delete failed: %s', http_error) 2642 2643 2644def delete_target_proxy(gcp, name=None): 2645 if name: 2646 proxy_to_delete = name 2647 else: 2648 proxy_to_delete = gcp.target_proxy.name 2649 try: 2650 if gcp.alpha_compute: 2651 result = gcp.alpha_compute.targetGrpcProxies().delete( 2652 project=gcp.project, targetGrpcProxy=proxy_to_delete).execute( 2653 num_retries=_GCP_API_RETRIES) 2654 else: 2655 result = gcp.compute.targetHttpProxies().delete( 2656 project=gcp.project, targetHttpProxy=proxy_to_delete).execute( 2657 num_retries=_GCP_API_RETRIES) 2658 wait_for_global_operation(gcp, result['name']) 2659 except googleapiclient.errors.HttpError as http_error: 2660 logger.info('Delete failed: %s', http_error) 2661 2662 2663def delete_url_map(gcp, name=None): 2664 if name: 2665 url_map_to_delete = name 2666 else: 2667 url_map_to_delete = gcp.url_map.name 2668 try: 2669 result = gcp.compute.urlMaps().delete( 2670 project=gcp.project, 2671 urlMap=url_map_to_delete).execute(num_retries=_GCP_API_RETRIES) 2672 wait_for_global_operation(gcp, result['name']) 2673 except googleapiclient.errors.HttpError as http_error: 2674 logger.info('Delete failed: %s', http_error) 2675 2676 2677def delete_backend_service(gcp, backend_service): 2678 try: 2679 result = gcp.compute.backendServices().delete( 2680 project=gcp.project, backendService=backend_service.name).execute( 2681 num_retries=_GCP_API_RETRIES) 2682 wait_for_global_operation(gcp, result['name']) 2683 except googleapiclient.errors.HttpError as http_error: 2684 logger.info('Delete failed: %s', http_error) 2685 2686 2687def delete_backend_services(gcp): 2688 for backend_service in gcp.backend_services: 2689 delete_backend_service(gcp, backend_service) 2690 2691 2692def delete_firewall(gcp): 2693 try: 2694 result = gcp.compute.firewalls().delete( 2695 project=gcp.project, 2696 firewall=gcp.health_check_firewall_rule.name).execute( 2697 num_retries=_GCP_API_RETRIES) 2698 wait_for_global_operation(gcp, result['name']) 2699 except googleapiclient.errors.HttpError as http_error: 2700 logger.info('Delete failed: %s', http_error) 2701 2702 2703def delete_health_check(gcp): 2704 try: 2705 result = gcp.compute.healthChecks().delete( 2706 project=gcp.project, healthCheck=gcp.health_check.name).execute( 2707 num_retries=_GCP_API_RETRIES) 2708 wait_for_global_operation(gcp, result['name']) 2709 except googleapiclient.errors.HttpError as http_error: 2710 logger.info('Delete failed: %s', http_error) 2711 2712 2713def delete_instance_groups(gcp): 2714 for instance_group in gcp.instance_groups: 2715 try: 2716 result = gcp.compute.instanceGroupManagers().delete( 2717 project=gcp.project, 2718 zone=instance_group.zone, 2719 instanceGroupManager=instance_group.name).execute( 2720 num_retries=_GCP_API_RETRIES) 2721 wait_for_zone_operation(gcp, 2722 instance_group.zone, 2723 result['name'], 2724 timeout_sec=_WAIT_FOR_BACKEND_SEC) 2725 except googleapiclient.errors.HttpError as http_error: 2726 logger.info('Delete failed: %s', http_error) 2727 2728 2729def delete_instance_template(gcp): 2730 try: 2731 result = gcp.compute.instanceTemplates().delete( 2732 project=gcp.project, 2733 instanceTemplate=gcp.instance_template.name).execute( 2734 num_retries=_GCP_API_RETRIES) 2735 wait_for_global_operation(gcp, result['name']) 2736 except googleapiclient.errors.HttpError as http_error: 2737 logger.info('Delete failed: %s', http_error) 2738 2739 2740def patch_backend_service(gcp, 2741 backend_service, 2742 instance_groups, 2743 balancing_mode='UTILIZATION', 2744 max_rate=1, 2745 circuit_breakers=None): 2746 if gcp.alpha_compute: 2747 compute_to_use = gcp.alpha_compute 2748 else: 2749 compute_to_use = gcp.compute 2750 config = { 2751 'backends': [{ 2752 'group': instance_group.url, 2753 'balancingMode': balancing_mode, 2754 'maxRate': max_rate if balancing_mode == 'RATE' else None 2755 } for instance_group in instance_groups], 2756 'circuitBreakers': circuit_breakers, 2757 } 2758 logger.debug('Sending GCP request with body=%s', config) 2759 result = compute_to_use.backendServices().patch( 2760 project=gcp.project, backendService=backend_service.name, 2761 body=config).execute(num_retries=_GCP_API_RETRIES) 2762 wait_for_global_operation(gcp, 2763 result['name'], 2764 timeout_sec=_WAIT_FOR_BACKEND_SEC) 2765 2766 2767def resize_instance_group(gcp, 2768 instance_group, 2769 new_size, 2770 timeout_sec=_WAIT_FOR_OPERATION_SEC): 2771 result = gcp.compute.instanceGroupManagers().resize( 2772 project=gcp.project, 2773 zone=instance_group.zone, 2774 instanceGroupManager=instance_group.name, 2775 size=new_size).execute(num_retries=_GCP_API_RETRIES) 2776 wait_for_zone_operation(gcp, 2777 instance_group.zone, 2778 result['name'], 2779 timeout_sec=360) 2780 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 2781 new_size, timeout_sec) 2782 2783 2784def patch_url_map_backend_service(gcp, 2785 backend_service=None, 2786 services_with_weights=None, 2787 route_rules=None): 2788 '''change url_map's backend service 2789 2790 Only one of backend_service and service_with_weights can be not None. 2791 ''' 2792 if gcp.alpha_compute: 2793 compute_to_use = gcp.alpha_compute 2794 else: 2795 compute_to_use = gcp.compute 2796 2797 if backend_service and services_with_weights: 2798 raise ValueError( 2799 'both backend_service and service_with_weights are not None.') 2800 2801 default_service = backend_service.url if backend_service else None 2802 default_route_action = { 2803 'weightedBackendServices': [{ 2804 'backendService': service.url, 2805 'weight': w, 2806 } for service, w in services_with_weights.items()] 2807 } if services_with_weights else None 2808 2809 config = { 2810 'pathMatchers': [{ 2811 'name': _PATH_MATCHER_NAME, 2812 'defaultService': default_service, 2813 'defaultRouteAction': default_route_action, 2814 'routeRules': route_rules, 2815 }] 2816 } 2817 logger.debug('Sending GCP request with body=%s', config) 2818 result = compute_to_use.urlMaps().patch( 2819 project=gcp.project, urlMap=gcp.url_map.name, 2820 body=config).execute(num_retries=_GCP_API_RETRIES) 2821 wait_for_global_operation(gcp, result['name']) 2822 2823 2824def wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 2825 expected_size, timeout_sec): 2826 start_time = time.time() 2827 while True: 2828 current_size = len(get_instance_names(gcp, instance_group)) 2829 if current_size == expected_size: 2830 break 2831 if time.time() - start_time > timeout_sec: 2832 raise Exception( 2833 'Instance group had expected size %d but actual size %d' % 2834 (expected_size, current_size)) 2835 time.sleep(2) 2836 2837 2838def wait_for_global_operation(gcp, 2839 operation, 2840 timeout_sec=_WAIT_FOR_OPERATION_SEC): 2841 start_time = time.time() 2842 while time.time() - start_time <= timeout_sec: 2843 result = gcp.compute.globalOperations().get( 2844 project=gcp.project, 2845 operation=operation).execute(num_retries=_GCP_API_RETRIES) 2846 if result['status'] == 'DONE': 2847 if 'error' in result: 2848 raise Exception(result['error']) 2849 return 2850 time.sleep(2) 2851 raise Exception('Operation %s did not complete within %d' % 2852 (operation, timeout_sec)) 2853 2854 2855def wait_for_zone_operation(gcp, 2856 zone, 2857 operation, 2858 timeout_sec=_WAIT_FOR_OPERATION_SEC): 2859 start_time = time.time() 2860 while time.time() - start_time <= timeout_sec: 2861 result = gcp.compute.zoneOperations().get( 2862 project=gcp.project, zone=zone, 2863 operation=operation).execute(num_retries=_GCP_API_RETRIES) 2864 if result['status'] == 'DONE': 2865 if 'error' in result: 2866 raise Exception(result['error']) 2867 return 2868 time.sleep(2) 2869 raise Exception('Operation %s did not complete within %d' % 2870 (operation, timeout_sec)) 2871 2872 2873def wait_for_healthy_backends(gcp, 2874 backend_service, 2875 instance_group, 2876 timeout_sec=_WAIT_FOR_BACKEND_SEC): 2877 start_time = time.time() 2878 config = {'group': instance_group.url} 2879 instance_names = get_instance_names(gcp, instance_group) 2880 expected_size = len(instance_names) 2881 while time.time() - start_time <= timeout_sec: 2882 for instance_name in instance_names: 2883 try: 2884 status = get_serving_status(instance_name, gcp.service_port) 2885 logger.info('serving status response from %s: %s', 2886 instance_name, status) 2887 except grpc.RpcError as rpc_error: 2888 logger.info('checking serving status of %s failed: %s', 2889 instance_name, rpc_error) 2890 result = gcp.compute.backendServices().getHealth( 2891 project=gcp.project, 2892 backendService=backend_service.name, 2893 body=config).execute(num_retries=_GCP_API_RETRIES) 2894 if 'healthStatus' in result: 2895 logger.info('received GCP healthStatus: %s', result['healthStatus']) 2896 healthy = True 2897 for instance in result['healthStatus']: 2898 if instance['healthState'] != 'HEALTHY': 2899 healthy = False 2900 break 2901 if healthy and expected_size == len(result['healthStatus']): 2902 return 2903 else: 2904 logger.info('no healthStatus received from GCP') 2905 time.sleep(5) 2906 raise Exception('Not all backends became healthy within %d seconds: %s' % 2907 (timeout_sec, result)) 2908 2909 2910def get_instance_names(gcp, instance_group): 2911 instance_names = [] 2912 result = gcp.compute.instanceGroups().listInstances( 2913 project=gcp.project, 2914 zone=instance_group.zone, 2915 instanceGroup=instance_group.name, 2916 body={ 2917 'instanceState': 'ALL' 2918 }).execute(num_retries=_GCP_API_RETRIES) 2919 if 'items' not in result: 2920 return [] 2921 for item in result['items']: 2922 # listInstances() returns the full URL of the instance, which ends with 2923 # the instance name. compute.instances().get() requires using the 2924 # instance name (not the full URL) to look up instance details, so we 2925 # just extract the name manually. 2926 instance_name = item['instance'].split('/')[-1] 2927 instance_names.append(instance_name) 2928 logger.info('retrieved instance names: %s', instance_names) 2929 return instance_names 2930 2931 2932def clean_up(gcp): 2933 if gcp.global_forwarding_rule: 2934 delete_global_forwarding_rule(gcp) 2935 if gcp.target_proxy: 2936 delete_target_proxy(gcp) 2937 if gcp.url_map: 2938 delete_url_map(gcp) 2939 delete_backend_services(gcp) 2940 if gcp.health_check_firewall_rule: 2941 delete_firewall(gcp) 2942 if gcp.health_check: 2943 delete_health_check(gcp) 2944 delete_instance_groups(gcp) 2945 if gcp.instance_template: 2946 delete_instance_template(gcp) 2947 2948 2949class InstanceGroup(object): 2950 2951 def __init__(self, name, url, zone): 2952 self.name = name 2953 self.url = url 2954 self.zone = zone 2955 2956 2957class GcpResource(object): 2958 2959 def __init__(self, name, url): 2960 self.name = name 2961 self.url = url 2962 2963 2964class GcpState(object): 2965 2966 def __init__(self, compute, alpha_compute, project, project_num): 2967 self.compute = compute 2968 self.alpha_compute = alpha_compute 2969 self.project = project 2970 self.project_num = project_num 2971 self.health_check = None 2972 self.health_check_firewall_rule = None 2973 self.backend_services = [] 2974 self.url_map = None 2975 self.target_proxy = None 2976 self.global_forwarding_rule = None 2977 self.service_port = None 2978 self.instance_template = None 2979 self.instance_groups = [] 2980 2981 2982maybe_write_sponge_properties() 2983alpha_compute = None 2984if args.compute_discovery_document: 2985 with open(args.compute_discovery_document, 'r') as discovery_doc: 2986 compute = googleapiclient.discovery.build_from_document( 2987 discovery_doc.read()) 2988 if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document: 2989 with open(args.alpha_compute_discovery_document, 'r') as discovery_doc: 2990 alpha_compute = googleapiclient.discovery.build_from_document( 2991 discovery_doc.read()) 2992else: 2993 compute = googleapiclient.discovery.build('compute', 'v1') 2994 if not args.only_stable_gcp_apis: 2995 alpha_compute = googleapiclient.discovery.build('compute', 'alpha') 2996 2997try: 2998 gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num) 2999 gcp_suffix = args.gcp_suffix 3000 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 3001 if not args.use_existing_gcp_resources: 3002 if args.keep_gcp_resources: 3003 # Auto-generating a unique suffix in case of conflict should not be 3004 # combined with --keep_gcp_resources, as the suffix actually used 3005 # for GCP resources will not match the provided --gcp_suffix value. 3006 num_attempts = 1 3007 else: 3008 num_attempts = 5 3009 for i in range(num_attempts): 3010 try: 3011 logger.info('Using GCP suffix %s', gcp_suffix) 3012 create_health_check(gcp, health_check_name) 3013 break 3014 except googleapiclient.errors.HttpError as http_error: 3015 gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999)) 3016 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix 3017 logger.exception('HttpError when creating health check') 3018 if gcp.health_check is None: 3019 raise Exception('Failed to create health check name after %d ' 3020 'attempts' % num_attempts) 3021 firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix 3022 backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix 3023 alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix 3024 url_map_name = _BASE_URL_MAP_NAME + gcp_suffix 3025 service_host_name = _BASE_SERVICE_HOST + gcp_suffix 3026 target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix 3027 forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix 3028 template_name = _BASE_TEMPLATE_NAME + gcp_suffix 3029 instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix 3030 same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix 3031 secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix 3032 if args.use_existing_gcp_resources: 3033 logger.info('Reusing existing GCP resources') 3034 get_health_check(gcp, health_check_name) 3035 try: 3036 get_health_check_firewall_rule(gcp, firewall_name) 3037 except googleapiclient.errors.HttpError as http_error: 3038 # Firewall rule may be auto-deleted periodically depending on GCP 3039 # project settings. 3040 logger.exception('Failed to find firewall rule, recreating') 3041 create_health_check_firewall_rule(gcp, firewall_name) 3042 backend_service = get_backend_service(gcp, backend_service_name) 3043 alternate_backend_service = get_backend_service( 3044 gcp, alternate_backend_service_name) 3045 get_url_map(gcp, url_map_name) 3046 get_target_proxy(gcp, target_proxy_name) 3047 get_global_forwarding_rule(gcp, forwarding_rule_name) 3048 get_instance_template(gcp, template_name) 3049 instance_group = get_instance_group(gcp, args.zone, instance_group_name) 3050 same_zone_instance_group = get_instance_group( 3051 gcp, args.zone, same_zone_instance_group_name) 3052 secondary_zone_instance_group = get_instance_group( 3053 gcp, args.secondary_zone, secondary_zone_instance_group_name) 3054 else: 3055 create_health_check_firewall_rule(gcp, firewall_name) 3056 backend_service = add_backend_service(gcp, backend_service_name) 3057 alternate_backend_service = add_backend_service( 3058 gcp, alternate_backend_service_name) 3059 create_url_map(gcp, url_map_name, backend_service, service_host_name) 3060 create_target_proxy(gcp, target_proxy_name) 3061 potential_service_ports = list(args.service_port_range) 3062 random.shuffle(potential_service_ports) 3063 create_global_forwarding_rule(gcp, forwarding_rule_name, 3064 potential_service_ports) 3065 if not gcp.service_port: 3066 raise Exception( 3067 'Failed to find a valid ip:port for the forwarding rule') 3068 if gcp.service_port != _DEFAULT_SERVICE_PORT: 3069 patch_url_map_host_rule_with_port(gcp, url_map_name, 3070 backend_service, 3071 service_host_name) 3072 startup_script = get_startup_script(args.path_to_server_binary, 3073 gcp.service_port) 3074 create_instance_template(gcp, template_name, args.network, 3075 args.source_image, args.machine_type, 3076 startup_script) 3077 instance_group = add_instance_group(gcp, args.zone, instance_group_name, 3078 _INSTANCE_GROUP_SIZE) 3079 patch_backend_service(gcp, backend_service, [instance_group]) 3080 same_zone_instance_group = add_instance_group( 3081 gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) 3082 secondary_zone_instance_group = add_instance_group( 3083 gcp, args.secondary_zone, secondary_zone_instance_group_name, 3084 _INSTANCE_GROUP_SIZE) 3085 3086 wait_for_healthy_backends(gcp, backend_service, instance_group) 3087 3088 if args.test_case: 3089 client_env = dict(os.environ) 3090 if original_grpc_trace: 3091 client_env['GRPC_TRACE'] = original_grpc_trace 3092 if original_grpc_verbosity: 3093 client_env['GRPC_VERBOSITY'] = original_grpc_verbosity 3094 bootstrap_server_features = [] 3095 3096 if gcp.service_port == _DEFAULT_SERVICE_PORT: 3097 server_uri = service_host_name 3098 else: 3099 server_uri = service_host_name + ':' + str(gcp.service_port) 3100 if args.xds_v3_support: 3101 client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true' 3102 bootstrap_server_features.append('xds_v3') 3103 if args.bootstrap_file: 3104 bootstrap_path = os.path.abspath(args.bootstrap_file) 3105 else: 3106 with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: 3107 bootstrap_file.write( 3108 _BOOTSTRAP_TEMPLATE.format( 3109 node_id='projects/%s/networks/%s/nodes/%s' % 3110 (gcp.project_num, args.network.split('/')[-1], 3111 uuid.uuid1()), 3112 server_features=json.dumps( 3113 bootstrap_server_features)).encode('utf-8')) 3114 bootstrap_path = bootstrap_file.name 3115 client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path 3116 client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true' 3117 client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true' 3118 client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true' 3119 test_results = {} 3120 failed_tests = [] 3121 for test_case in args.test_case: 3122 if test_case in _V3_TEST_CASES and not args.xds_v3_support: 3123 logger.info('skipping test %s due to missing v3 support', 3124 test_case) 3125 continue 3126 if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute: 3127 logger.info('skipping test %s due to missing alpha support', 3128 test_case) 3129 continue 3130 if test_case in [ 3131 'api_listener', 'forwarding_rule_port_match', 3132 'forwarding_rule_default_port' 3133 ] and CLIENT_HOSTS: 3134 logger.info( 3135 'skipping test %s because test configuration is' 3136 'not compatible with client processes on existing' 3137 'client hosts', test_case) 3138 continue 3139 if test_case == 'forwarding_rule_default_port': 3140 server_uri = service_host_name 3141 result = jobset.JobResult() 3142 log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) 3143 if not os.path.exists(log_dir): 3144 os.makedirs(log_dir) 3145 test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) 3146 test_log_file = open(test_log_filename, 'w+') 3147 client_process = None 3148 3149 if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS: 3150 rpcs_to_send = '--rpc="UnaryCall,EmptyCall"' 3151 else: 3152 rpcs_to_send = '--rpc="UnaryCall"' 3153 3154 if test_case in _TESTS_TO_SEND_METADATA: 3155 metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format( 3156 keyE=_TEST_METADATA_KEY, 3157 valueE=_TEST_METADATA_VALUE_EMPTY, 3158 keyU=_TEST_METADATA_KEY, 3159 valueU=_TEST_METADATA_VALUE_UNARY, 3160 keyNU=_TEST_METADATA_NUMERIC_KEY, 3161 valueNU=_TEST_METADATA_NUMERIC_VALUE) 3162 else: 3163 # Setting the arg explicitly to empty with '--metadata=""' 3164 # makes C# client fail 3165 # (see https://github.com/commandlineparser/commandline/issues/412), 3166 # so instead we just rely on clients using the default when 3167 # metadata arg is not specified. 3168 metadata_to_send = '' 3169 3170 # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks 3171 # in the client. This means we will ignore intermittent RPC 3172 # failures (but this framework still checks that the final result 3173 # is as expected). 3174 # 3175 # Reason for disabling this is, the resources are shared by 3176 # multiple tests, and a change in previous test could be delayed 3177 # until the second test starts. The second test may see 3178 # intermittent failures because of that. 3179 # 3180 # A fix is to not share resources between tests (though that does 3181 # mean the tests will be significantly slower due to creating new 3182 # resources). 3183 fail_on_failed_rpc = '' 3184 3185 try: 3186 if not CLIENT_HOSTS: 3187 client_cmd_formatted = args.client_cmd.format( 3188 server_uri=server_uri, 3189 stats_port=args.stats_port, 3190 qps=args.qps, 3191 fail_on_failed_rpc=fail_on_failed_rpc, 3192 rpcs_to_send=rpcs_to_send, 3193 metadata_to_send=metadata_to_send) 3194 logger.debug('running client: %s', client_cmd_formatted) 3195 client_cmd = shlex.split(client_cmd_formatted) 3196 client_process = subprocess.Popen(client_cmd, 3197 env=client_env, 3198 stderr=subprocess.STDOUT, 3199 stdout=test_log_file) 3200 if test_case == 'backends_restart': 3201 test_backends_restart(gcp, backend_service, instance_group) 3202 elif test_case == 'change_backend_service': 3203 test_change_backend_service(gcp, backend_service, 3204 instance_group, 3205 alternate_backend_service, 3206 same_zone_instance_group) 3207 elif test_case == 'gentle_failover': 3208 test_gentle_failover(gcp, backend_service, instance_group, 3209 secondary_zone_instance_group) 3210 elif test_case == 'load_report_based_failover': 3211 test_load_report_based_failover( 3212 gcp, backend_service, instance_group, 3213 secondary_zone_instance_group) 3214 elif test_case == 'ping_pong': 3215 test_ping_pong(gcp, backend_service, instance_group) 3216 elif test_case == 'remove_instance_group': 3217 test_remove_instance_group(gcp, backend_service, 3218 instance_group, 3219 same_zone_instance_group) 3220 elif test_case == 'round_robin': 3221 test_round_robin(gcp, backend_service, instance_group) 3222 elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': 3223 test_secondary_locality_gets_no_requests_on_partial_primary_failure( 3224 gcp, backend_service, instance_group, 3225 secondary_zone_instance_group) 3226 elif test_case == 'secondary_locality_gets_requests_on_primary_failure': 3227 test_secondary_locality_gets_requests_on_primary_failure( 3228 gcp, backend_service, instance_group, 3229 secondary_zone_instance_group) 3230 elif test_case == 'traffic_splitting': 3231 test_traffic_splitting(gcp, backend_service, instance_group, 3232 alternate_backend_service, 3233 same_zone_instance_group) 3234 elif test_case == 'path_matching': 3235 test_path_matching(gcp, backend_service, instance_group, 3236 alternate_backend_service, 3237 same_zone_instance_group) 3238 elif test_case == 'header_matching': 3239 test_header_matching(gcp, backend_service, instance_group, 3240 alternate_backend_service, 3241 same_zone_instance_group) 3242 elif test_case == 'circuit_breaking': 3243 test_circuit_breaking(gcp, backend_service, instance_group, 3244 same_zone_instance_group) 3245 elif test_case == 'timeout': 3246 test_timeout(gcp, backend_service, instance_group) 3247 elif test_case == 'fault_injection': 3248 test_fault_injection(gcp, backend_service, instance_group) 3249 elif test_case == 'api_listener': 3250 server_uri = test_api_listener(gcp, backend_service, 3251 instance_group, 3252 alternate_backend_service) 3253 elif test_case == 'forwarding_rule_port_match': 3254 server_uri = test_forwarding_rule_port_match( 3255 gcp, backend_service, instance_group) 3256 elif test_case == 'forwarding_rule_default_port': 3257 server_uri = test_forwarding_rule_default_port( 3258 gcp, backend_service, instance_group) 3259 elif test_case == 'metadata_filter': 3260 test_metadata_filter(gcp, backend_service, instance_group, 3261 alternate_backend_service, 3262 same_zone_instance_group) 3263 elif test_case == 'csds': 3264 test_csds(gcp, backend_service, instance_group, server_uri) 3265 else: 3266 logger.error('Unknown test case: %s', test_case) 3267 sys.exit(1) 3268 if client_process and client_process.poll() is not None: 3269 raise Exception( 3270 'Client process exited prematurely with exit code %d' % 3271 client_process.returncode) 3272 result.state = 'PASSED' 3273 result.returncode = 0 3274 except Exception as e: 3275 logger.exception('Test case %s failed', test_case) 3276 failed_tests.append(test_case) 3277 result.state = 'FAILED' 3278 result.message = str(e) 3279 finally: 3280 if client_process: 3281 if client_process.returncode: 3282 logger.info('Client exited with code %d' % 3283 client_process.returncode) 3284 else: 3285 client_process.terminate() 3286 test_log_file.close() 3287 # Workaround for Python 3, as report_utils will invoke decode() on 3288 # result.message, which has a default value of ''. 3289 result.message = result.message.encode('UTF-8') 3290 test_results[test_case] = [result] 3291 if args.log_client_output: 3292 logger.info('Client output:') 3293 with open(test_log_filename, 'r') as client_output: 3294 logger.info(client_output.read()) 3295 if not os.path.exists(_TEST_LOG_BASE_DIR): 3296 os.makedirs(_TEST_LOG_BASE_DIR) 3297 report_utils.render_junit_xml_report(test_results, 3298 os.path.join( 3299 _TEST_LOG_BASE_DIR, 3300 _SPONGE_XML_NAME), 3301 suite_name='xds_tests', 3302 multi_target=True) 3303 if failed_tests: 3304 logger.error('Test case(s) %s failed', failed_tests) 3305 sys.exit(1) 3306finally: 3307 if not args.keep_gcp_resources: 3308 logger.info('Cleaning up GCP resources. This may take some time.') 3309 clean_up(gcp) 3310