1#!/usr/bin/env python 2# Copyright 2020 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Run xDS integration tests on GCP using Traffic Director.""" 16 17import argparse 18import googleapiclient.discovery 19import grpc 20import logging 21import os 22import random 23import shlex 24import socket 25import subprocess 26import sys 27import tempfile 28import time 29 30from oauth2client.client import GoogleCredentials 31 32import python_utils.jobset as jobset 33import python_utils.report_utils as report_utils 34 35from src.proto.grpc.testing import empty_pb2 36from src.proto.grpc.testing import messages_pb2 37from src.proto.grpc.testing import test_pb2_grpc 38 39logger = logging.getLogger() 40console_handler = logging.StreamHandler() 41formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') 42console_handler.setFormatter(formatter) 43logger.handlers = [] 44logger.addHandler(console_handler) 45logger.setLevel(logging.WARNING) 46 47_TEST_CASES = [ 48 'backends_restart', 49 'change_backend_service', 50 'gentle_failover', 51 'new_instance_group_receives_traffic', 52 'ping_pong', 53 'remove_instance_group', 54 'round_robin', 55 'secondary_locality_gets_no_requests_on_partial_primary_failure', 56 'secondary_locality_gets_requests_on_primary_failure', 57 'traffic_splitting', 58] 59# Valid test cases, but not in all. So the tests can only run manually, and 60# aren't enabled automatically for all languages. 61# 62# TODO: Move them into _TEST_CASES when support is ready in all languages. 63_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching'] 64 65 66def parse_test_cases(arg): 67 if arg == '': 68 return [] 69 arg_split = arg.split(',') 70 test_cases = set() 71 all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES 72 for arg in arg_split: 73 if arg == "all": 74 test_cases = test_cases.union(_TEST_CASES) 75 else: 76 test_cases = test_cases.union([arg]) 77 if not all([test_case in all_test_cases for test_case in test_cases]): 78 raise Exception('Failed to parse test cases %s' % arg) 79 # Perserve order. 80 return [x for x in all_test_cases if x in test_cases] 81 82 83def parse_port_range(port_arg): 84 try: 85 port = int(port_arg) 86 return range(port, port + 1) 87 except: 88 port_min, port_max = port_arg.split(':') 89 return range(int(port_min), int(port_max) + 1) 90 91 92argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP') 93argp.add_argument('--project_id', help='GCP project id') 94argp.add_argument( 95 '--gcp_suffix', 96 default='', 97 help='Optional suffix for all generated GCP resource names. Useful to ' 98 'ensure distinct names across test runs.') 99argp.add_argument( 100 '--test_case', 101 default='ping_pong', 102 type=parse_test_cases, 103 help='Comma-separated list of test cases to run. Available tests: %s, ' 104 '(or \'all\' to run every test). ' 105 'Alternative tests not included in \'all\': %s' % 106 (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES))) 107argp.add_argument( 108 '--bootstrap_file', 109 default='', 110 help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in ' 111 'bootstrap generation') 112argp.add_argument( 113 '--client_cmd', 114 default=None, 115 help='Command to launch xDS test client. {server_uri}, {stats_port} and ' 116 '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP ' 117 'will be set for the command') 118argp.add_argument('--zone', default='us-central1-a') 119argp.add_argument('--secondary_zone', 120 default='us-west1-b', 121 help='Zone to use for secondary TD locality tests') 122argp.add_argument('--qps', default=100, type=int, help='Client QPS') 123argp.add_argument( 124 '--wait_for_backend_sec', 125 default=1200, 126 type=int, 127 help='Time limit for waiting for created backend services to report ' 128 'healthy when launching or updated GCP resources') 129argp.add_argument( 130 '--use_existing_gcp_resources', 131 default=False, 132 action='store_true', 133 help= 134 'If set, find and use already created GCP resources instead of creating new' 135 ' ones.') 136argp.add_argument( 137 '--keep_gcp_resources', 138 default=False, 139 action='store_true', 140 help= 141 'Leave GCP VMs and configuration running after test. Default behavior is ' 142 'to delete when tests complete.') 143argp.add_argument( 144 '--compute_discovery_document', 145 default=None, 146 type=str, 147 help= 148 'If provided, uses this file instead of retrieving via the GCP discovery ' 149 'API') 150argp.add_argument( 151 '--alpha_compute_discovery_document', 152 default=None, 153 type=str, 154 help='If provided, uses this file instead of retrieving via the alpha GCP ' 155 'discovery API') 156argp.add_argument('--network', 157 default='global/networks/default', 158 help='GCP network to use') 159argp.add_argument('--service_port_range', 160 default='8080:8110', 161 type=parse_port_range, 162 help='Listening port for created gRPC backends. Specified as ' 163 'either a single int or as a range in the format min:max, in ' 164 'which case an available port p will be chosen s.t. min <= p ' 165 '<= max') 166argp.add_argument( 167 '--stats_port', 168 default=8079, 169 type=int, 170 help='Local port for the client process to expose the LB stats service') 171argp.add_argument('--xds_server', 172 default='trafficdirector.googleapis.com:443', 173 help='xDS server') 174argp.add_argument('--source_image', 175 default='projects/debian-cloud/global/images/family/debian-9', 176 help='Source image for VMs created during the test') 177argp.add_argument('--path_to_server_binary', 178 default=None, 179 type=str, 180 help='If set, the server binary must already be pre-built on ' 181 'the specified source image') 182argp.add_argument('--machine_type', 183 default='e2-standard-2', 184 help='Machine type for VMs created during the test') 185argp.add_argument( 186 '--instance_group_size', 187 default=2, 188 type=int, 189 help='Number of VMs to create per instance group. Certain test cases (e.g., ' 190 'round_robin) may not give meaningful results if this is set to a value ' 191 'less than 2.') 192argp.add_argument('--verbose', 193 help='verbose log output', 194 default=False, 195 action='store_true') 196# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are 197# visible in all test environments. 198argp.add_argument('--log_client_output', 199 help='Log captured client output', 200 default=False, 201 action='store_true') 202# TODO(ericgribkoff) Remove this flag once all test environments are verified to 203# have access to the alpha compute APIs. 204argp.add_argument('--only_stable_gcp_apis', 205 help='Do not use alpha compute APIs. Some tests may be ' 206 'incompatible with this option (gRPC health checks are ' 207 'currently alpha and required for simulating server failure', 208 default=False, 209 action='store_true') 210args = argp.parse_args() 211 212if args.verbose: 213 logger.setLevel(logging.DEBUG) 214 215_DEFAULT_SERVICE_PORT = 80 216_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec 217_WAIT_FOR_OPERATION_SEC = 300 218_INSTANCE_GROUP_SIZE = args.instance_group_size 219_NUM_TEST_RPCS = 10 * args.qps 220_WAIT_FOR_STATS_SEC = 180 221_WAIT_FOR_VALID_CONFIG_SEC = 60 222_WAIT_FOR_URL_MAP_PATCH_SEC = 300 223_CONNECTION_TIMEOUT_SEC = 60 224_GCP_API_RETRIES = 5 225_BOOTSTRAP_TEMPLATE = """ 226{{ 227 "node": {{ 228 "id": "{node_id}", 229 "metadata": {{ 230 "TRAFFICDIRECTOR_NETWORK_NAME": "%s" 231 }}, 232 "locality": {{ 233 "zone": "%s" 234 }} 235 }}, 236 "xds_servers": [{{ 237 "server_uri": "%s", 238 "channel_creds": [ 239 {{ 240 "type": "google_default", 241 "config": {{}} 242 }} 243 ] 244 }}] 245}}""" % (args.network.split('/')[-1], args.zone, args.xds_server) 246 247# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer 248# sends an update with no localities when adding the MIG to the backend service 249# can race with the URL map patch. 250_TESTS_TO_FAIL_ON_RPC_FAILURE = [ 251 'new_instance_group_receives_traffic', 'ping_pong', 'round_robin' 252] 253# Tests that run UnaryCall and EmptyCall. 254_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching'] 255# Tests that make UnaryCall with test metadata. 256_TESTS_TO_SEND_METADATA = ['header_matching'] 257_TEST_METADATA_KEY = 'xds_md' 258_TEST_METADATA_VALUE = 'exact_match' 259_PATH_MATCHER_NAME = 'path-matcher' 260_BASE_TEMPLATE_NAME = 'test-template' 261_BASE_INSTANCE_GROUP_NAME = 'test-ig' 262_BASE_HEALTH_CHECK_NAME = 'test-hc' 263_BASE_FIREWALL_RULE_NAME = 'test-fw-rule' 264_BASE_BACKEND_SERVICE_NAME = 'test-backend-service' 265_BASE_URL_MAP_NAME = 'test-map' 266_BASE_SERVICE_HOST = 'grpc-test' 267_BASE_TARGET_PROXY_NAME = 'test-target-proxy' 268_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule' 269_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 270 '../../reports') 271_SPONGE_LOG_NAME = 'sponge_log.log' 272_SPONGE_XML_NAME = 'sponge_log.xml' 273 274 275def get_client_stats(num_rpcs, timeout_sec): 276 with grpc.insecure_channel('localhost:%d' % args.stats_port) as channel: 277 stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) 278 request = messages_pb2.LoadBalancerStatsRequest() 279 request.num_rpcs = num_rpcs 280 request.timeout_sec = timeout_sec 281 rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC 282 response = stub.GetClientStats(request, 283 wait_for_ready=True, 284 timeout=rpc_timeout) 285 logger.debug('Invoked GetClientStats RPC: %s', response) 286 return response 287 288 289class RpcDistributionError(Exception): 290 pass 291 292 293def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, 294 allow_failures): 295 start_time = time.time() 296 error_msg = None 297 logger.debug('Waiting for %d sec until backends %s receive load' % 298 (timeout_sec, backends)) 299 while time.time() - start_time <= timeout_sec: 300 error_msg = None 301 stats = get_client_stats(num_rpcs, timeout_sec) 302 rpcs_by_peer = stats.rpcs_by_peer 303 for backend in backends: 304 if backend not in rpcs_by_peer: 305 error_msg = 'Backend %s did not receive load' % backend 306 break 307 if not error_msg and len(rpcs_by_peer) > len(backends): 308 error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer 309 if not allow_failures and stats.num_failures > 0: 310 error_msg = '%d RPCs failed' % stats.num_failures 311 if not error_msg: 312 return 313 raise RpcDistributionError(error_msg) 314 315 316def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, 317 timeout_sec, 318 num_rpcs=_NUM_TEST_RPCS): 319 _verify_rpcs_to_given_backends(backends, 320 timeout_sec, 321 num_rpcs, 322 allow_failures=True) 323 324 325def wait_until_all_rpcs_go_to_given_backends(backends, 326 timeout_sec, 327 num_rpcs=_NUM_TEST_RPCS): 328 _verify_rpcs_to_given_backends(backends, 329 timeout_sec, 330 num_rpcs, 331 allow_failures=False) 332 333 334def compare_distributions(actual_distribution, expected_distribution, 335 threshold): 336 """Compare if two distributions are similar. 337 338 Args: 339 actual_distribution: A list of floats, contains the actual distribution. 340 expected_distribution: A list of floats, contains the expected distribution. 341 threshold: Number within [0,100], the threshold percentage by which the 342 actual distribution can differ from the expected distribution. 343 344 Returns: 345 The similarity between the distributions as a boolean. Returns true if the 346 actual distribution lies within the threshold of the expected 347 distribution, false otherwise. 348 349 Raises: 350 ValueError: if threshold is not with in [0,100]. 351 Exception: containing detailed error messages. 352 """ 353 if len(expected_distribution) != len(actual_distribution): 354 raise Exception( 355 'Error: expected and actual distributions have different size (%d vs %d)' 356 % (len(expected_distribution), len(actual_distribution))) 357 if threshold < 0 or threshold > 100: 358 raise ValueError('Value error: Threshold should be between 0 to 100') 359 threshold_fraction = threshold / 100.0 360 for expected, actual in zip(expected_distribution, actual_distribution): 361 if actual < (expected * (1 - threshold_fraction)): 362 raise Exception("actual(%f) < expected(%f-%d%%)" % 363 (actual, expected, threshold)) 364 if actual > (expected * (1 + threshold_fraction)): 365 raise Exception("actual(%f) > expected(%f+%d%%)" % 366 (actual, expected, threshold)) 367 return True 368 369 370def compare_expected_instances(stats, expected_instances): 371 """Compare if stats have expected instances for each type of RPC. 372 373 Args: 374 stats: LoadBalancerStatsResponse reported by interop client. 375 expected_instances: a dict with key as the RPC type (string), value as 376 the expected backend instances (list of strings). 377 378 Returns: 379 Returns true if the instances are expected. False if not. 380 """ 381 for rpc_type, expected_peers in expected_instances.items(): 382 rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type] 383 rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None 384 logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer) 385 peers = list(rpcs_by_peer.keys()) 386 if set(peers) != set(expected_peers): 387 logger.info('unexpected peers for %s, got %s, want %s', rpc_type, 388 peers, expected_peers) 389 return False 390 return True 391 392 393def test_backends_restart(gcp, backend_service, instance_group): 394 logger.info('Running test_backends_restart') 395 instance_names = get_instance_names(gcp, instance_group) 396 num_instances = len(instance_names) 397 start_time = time.time() 398 wait_until_all_rpcs_go_to_given_backends(instance_names, 399 _WAIT_FOR_STATS_SEC) 400 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 401 try: 402 resize_instance_group(gcp, instance_group, 0) 403 wait_until_all_rpcs_go_to_given_backends_or_fail([], 404 _WAIT_FOR_BACKEND_SEC) 405 finally: 406 resize_instance_group(gcp, instance_group, num_instances) 407 wait_for_healthy_backends(gcp, backend_service, instance_group) 408 new_instance_names = get_instance_names(gcp, instance_group) 409 wait_until_all_rpcs_go_to_given_backends(new_instance_names, 410 _WAIT_FOR_BACKEND_SEC) 411 new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 412 original_distribution = list(stats.rpcs_by_peer.values()) 413 original_distribution.sort() 414 new_distribution = list(new_stats.rpcs_by_peer.values()) 415 new_distribution.sort() 416 threshold = 3 417 for i in range(len(original_distribution)): 418 if abs(original_distribution[i] - new_distribution[i]) > threshold: 419 raise Exception('Distributions do not match: ', stats, new_stats) 420 421 422def test_change_backend_service(gcp, original_backend_service, instance_group, 423 alternate_backend_service, 424 same_zone_instance_group): 425 logger.info('Running test_change_backend_service') 426 original_backend_instances = get_instance_names(gcp, instance_group) 427 alternate_backend_instances = get_instance_names(gcp, 428 same_zone_instance_group) 429 patch_backend_instances(gcp, alternate_backend_service, 430 [same_zone_instance_group]) 431 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 432 wait_for_healthy_backends(gcp, alternate_backend_service, 433 same_zone_instance_group) 434 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 435 _WAIT_FOR_STATS_SEC) 436 try: 437 patch_url_map_backend_service(gcp, alternate_backend_service) 438 wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, 439 _WAIT_FOR_URL_MAP_PATCH_SEC) 440 finally: 441 patch_url_map_backend_service(gcp, original_backend_service) 442 patch_backend_instances(gcp, alternate_backend_service, []) 443 444 445def test_gentle_failover(gcp, 446 backend_service, 447 primary_instance_group, 448 secondary_instance_group, 449 swapped_primary_and_secondary=False): 450 logger.info('Running test_gentle_failover') 451 num_primary_instances = len(get_instance_names(gcp, primary_instance_group)) 452 min_instances_for_gentle_failover = 3 # Need >50% failure to start failover 453 try: 454 if num_primary_instances < min_instances_for_gentle_failover: 455 resize_instance_group(gcp, primary_instance_group, 456 min_instances_for_gentle_failover) 457 patch_backend_instances( 458 gcp, backend_service, 459 [primary_instance_group, secondary_instance_group]) 460 primary_instance_names = get_instance_names(gcp, primary_instance_group) 461 secondary_instance_names = get_instance_names(gcp, 462 secondary_instance_group) 463 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 464 wait_for_healthy_backends(gcp, backend_service, 465 secondary_instance_group) 466 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 467 _WAIT_FOR_STATS_SEC) 468 instances_to_stop = primary_instance_names[:-1] 469 remaining_instances = primary_instance_names[-1:] 470 try: 471 set_serving_status(instances_to_stop, 472 gcp.service_port, 473 serving=False) 474 wait_until_all_rpcs_go_to_given_backends( 475 remaining_instances + secondary_instance_names, 476 _WAIT_FOR_BACKEND_SEC) 477 finally: 478 set_serving_status(primary_instance_names, 479 gcp.service_port, 480 serving=True) 481 except RpcDistributionError as e: 482 if not swapped_primary_and_secondary and is_primary_instance_group( 483 gcp, secondary_instance_group): 484 # Swap expectation of primary and secondary instance groups. 485 test_gentle_failover(gcp, 486 backend_service, 487 secondary_instance_group, 488 primary_instance_group, 489 swapped_primary_and_secondary=True) 490 else: 491 raise e 492 finally: 493 patch_backend_instances(gcp, backend_service, [primary_instance_group]) 494 resize_instance_group(gcp, primary_instance_group, 495 num_primary_instances) 496 instance_names = get_instance_names(gcp, primary_instance_group) 497 wait_until_all_rpcs_go_to_given_backends(instance_names, 498 _WAIT_FOR_BACKEND_SEC) 499 500 501def test_new_instance_group_receives_traffic(gcp, backend_service, 502 instance_group, 503 same_zone_instance_group): 504 logger.info('Running test_new_instance_group_receives_traffic') 505 instance_names = get_instance_names(gcp, instance_group) 506 # TODO(ericgribkoff) Reduce this timeout. When running sequentially, this 507 # occurs after patching the url map in test_change_backend_service, so we 508 # need the extended timeout here as well. 509 wait_until_all_rpcs_go_to_given_backends(instance_names, 510 _WAIT_FOR_URL_MAP_PATCH_SEC) 511 try: 512 patch_backend_instances(gcp, 513 backend_service, 514 [instance_group, same_zone_instance_group], 515 balancing_mode='RATE') 516 wait_for_healthy_backends(gcp, backend_service, instance_group) 517 wait_for_healthy_backends(gcp, backend_service, 518 same_zone_instance_group) 519 combined_instance_names = instance_names + get_instance_names( 520 gcp, same_zone_instance_group) 521 wait_until_all_rpcs_go_to_given_backends(combined_instance_names, 522 _WAIT_FOR_BACKEND_SEC) 523 finally: 524 patch_backend_instances(gcp, backend_service, [instance_group]) 525 526 527def test_ping_pong(gcp, backend_service, instance_group): 528 logger.info('Running test_ping_pong') 529 wait_for_healthy_backends(gcp, backend_service, instance_group) 530 instance_names = get_instance_names(gcp, instance_group) 531 wait_until_all_rpcs_go_to_given_backends(instance_names, 532 _WAIT_FOR_STATS_SEC) 533 534 535def test_remove_instance_group(gcp, backend_service, instance_group, 536 same_zone_instance_group): 537 logger.info('Running test_remove_instance_group') 538 try: 539 patch_backend_instances(gcp, 540 backend_service, 541 [instance_group, same_zone_instance_group], 542 balancing_mode='RATE') 543 wait_for_healthy_backends(gcp, backend_service, instance_group) 544 wait_for_healthy_backends(gcp, backend_service, 545 same_zone_instance_group) 546 instance_names = get_instance_names(gcp, instance_group) 547 same_zone_instance_names = get_instance_names(gcp, 548 same_zone_instance_group) 549 wait_until_all_rpcs_go_to_given_backends( 550 instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC) 551 patch_backend_instances(gcp, 552 backend_service, [same_zone_instance_group], 553 balancing_mode='RATE') 554 wait_until_all_rpcs_go_to_given_backends(same_zone_instance_names, 555 _WAIT_FOR_BACKEND_SEC) 556 finally: 557 patch_backend_instances(gcp, backend_service, [instance_group]) 558 wait_until_all_rpcs_go_to_given_backends(instance_names, 559 _WAIT_FOR_BACKEND_SEC) 560 561 562def test_round_robin(gcp, backend_service, instance_group): 563 logger.info('Running test_round_robin') 564 wait_for_healthy_backends(gcp, backend_service, instance_group) 565 instance_names = get_instance_names(gcp, instance_group) 566 threshold = 1 567 wait_until_all_rpcs_go_to_given_backends(instance_names, 568 _WAIT_FOR_STATS_SEC) 569 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 570 requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer] 571 total_requests_received = sum(requests_received) 572 if total_requests_received != _NUM_TEST_RPCS: 573 raise Exception('Unexpected RPC failures', stats) 574 expected_requests = total_requests_received / len(instance_names) 575 for instance in instance_names: 576 if abs(stats.rpcs_by_peer[instance] - expected_requests) > threshold: 577 raise Exception( 578 'RPC peer distribution differs from expected by more than %d ' 579 'for instance %s (%s)', threshold, instance, stats) 580 581 582def test_secondary_locality_gets_no_requests_on_partial_primary_failure( 583 gcp, 584 backend_service, 585 primary_instance_group, 586 secondary_instance_group, 587 swapped_primary_and_secondary=False): 588 logger.info( 589 'Running secondary_locality_gets_no_requests_on_partial_primary_failure' 590 ) 591 try: 592 patch_backend_instances( 593 gcp, backend_service, 594 [primary_instance_group, secondary_instance_group]) 595 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 596 wait_for_healthy_backends(gcp, backend_service, 597 secondary_instance_group) 598 primary_instance_names = get_instance_names(gcp, primary_instance_group) 599 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 600 _WAIT_FOR_STATS_SEC) 601 instances_to_stop = primary_instance_names[:1] 602 remaining_instances = primary_instance_names[1:] 603 try: 604 set_serving_status(instances_to_stop, 605 gcp.service_port, 606 serving=False) 607 wait_until_all_rpcs_go_to_given_backends(remaining_instances, 608 _WAIT_FOR_BACKEND_SEC) 609 finally: 610 set_serving_status(primary_instance_names, 611 gcp.service_port, 612 serving=True) 613 except RpcDistributionError as e: 614 if not swapped_primary_and_secondary and is_primary_instance_group( 615 gcp, secondary_instance_group): 616 # Swap expectation of primary and secondary instance groups. 617 test_secondary_locality_gets_no_requests_on_partial_primary_failure( 618 gcp, 619 backend_service, 620 secondary_instance_group, 621 primary_instance_group, 622 swapped_primary_and_secondary=True) 623 else: 624 raise e 625 finally: 626 patch_backend_instances(gcp, backend_service, [primary_instance_group]) 627 628 629def test_secondary_locality_gets_requests_on_primary_failure( 630 gcp, 631 backend_service, 632 primary_instance_group, 633 secondary_instance_group, 634 swapped_primary_and_secondary=False): 635 logger.info('Running secondary_locality_gets_requests_on_primary_failure') 636 try: 637 patch_backend_instances( 638 gcp, backend_service, 639 [primary_instance_group, secondary_instance_group]) 640 wait_for_healthy_backends(gcp, backend_service, primary_instance_group) 641 wait_for_healthy_backends(gcp, backend_service, 642 secondary_instance_group) 643 primary_instance_names = get_instance_names(gcp, primary_instance_group) 644 secondary_instance_names = get_instance_names(gcp, 645 secondary_instance_group) 646 wait_until_all_rpcs_go_to_given_backends(primary_instance_names, 647 _WAIT_FOR_STATS_SEC) 648 try: 649 set_serving_status(primary_instance_names, 650 gcp.service_port, 651 serving=False) 652 wait_until_all_rpcs_go_to_given_backends(secondary_instance_names, 653 _WAIT_FOR_BACKEND_SEC) 654 finally: 655 set_serving_status(primary_instance_names, 656 gcp.service_port, 657 serving=True) 658 except RpcDistributionError as e: 659 if not swapped_primary_and_secondary and is_primary_instance_group( 660 gcp, secondary_instance_group): 661 # Swap expectation of primary and secondary instance groups. 662 test_secondary_locality_gets_requests_on_primary_failure( 663 gcp, 664 backend_service, 665 secondary_instance_group, 666 primary_instance_group, 667 swapped_primary_and_secondary=True) 668 else: 669 raise e 670 finally: 671 patch_backend_instances(gcp, backend_service, [primary_instance_group]) 672 673 674def prepare_services_for_urlmap_tests(gcp, original_backend_service, 675 instance_group, alternate_backend_service, 676 same_zone_instance_group): 677 ''' 678 This function prepares the services to be ready for tests that modifies 679 urlmaps. 680 681 Returns: 682 Returns original and alternate backend names as lists of strings. 683 ''' 684 # The config validation for proxyless doesn't allow setting 685 # default_route_action or route_rules. Disable validate 686 # validate_for_proxyless for this test. This can be removed when validation 687 # accepts default_route_action. 688 logger.info('disabling validate_for_proxyless in target proxy') 689 set_validate_for_proxyless(gcp, False) 690 691 logger.info('waiting for original backends to become healthy') 692 wait_for_healthy_backends(gcp, original_backend_service, instance_group) 693 694 patch_backend_instances(gcp, alternate_backend_service, 695 [same_zone_instance_group]) 696 logger.info('waiting for alternate to become healthy') 697 wait_for_healthy_backends(gcp, alternate_backend_service, 698 same_zone_instance_group) 699 700 original_backend_instances = get_instance_names(gcp, instance_group) 701 logger.info('original backends instances: %s', original_backend_instances) 702 703 alternate_backend_instances = get_instance_names(gcp, 704 same_zone_instance_group) 705 logger.info('alternate backends instances: %s', alternate_backend_instances) 706 707 # Start with all traffic going to original_backend_service. 708 logger.info('waiting for traffic to all go to original backends') 709 wait_until_all_rpcs_go_to_given_backends(original_backend_instances, 710 _WAIT_FOR_STATS_SEC) 711 return original_backend_instances, alternate_backend_instances 712 713 714def test_traffic_splitting(gcp, original_backend_service, instance_group, 715 alternate_backend_service, same_zone_instance_group): 716 # This test start with all traffic going to original_backend_service. Then 717 # it updates URL-map to set default action to traffic splitting between 718 # original and alternate. It waits for all backends in both services to 719 # receive traffic, then verifies that weights are expected. 720 logger.info('Running test_traffic_splitting') 721 722 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 723 gcp, original_backend_service, instance_group, 724 alternate_backend_service, same_zone_instance_group) 725 726 try: 727 # Patch urlmap, change route action to traffic splitting between 728 # original and alternate. 729 logger.info('patching url map with traffic splitting') 730 original_service_percentage, alternate_service_percentage = 20, 80 731 patch_url_map_backend_service( 732 gcp, 733 services_with_weights={ 734 original_backend_service: original_service_percentage, 735 alternate_backend_service: alternate_service_percentage, 736 }) 737 # Split percentage between instances: [20,80] -> [10,10,40,40]. 738 expected_instance_percentage = [ 739 original_service_percentage * 1.0 / len(original_backend_instances) 740 ] * len(original_backend_instances) + [ 741 alternate_service_percentage * 1.0 / 742 len(alternate_backend_instances) 743 ] * len(alternate_backend_instances) 744 745 # Wait for traffic to go to both services. 746 logger.info( 747 'waiting for traffic to go to all backends (including alternate)') 748 wait_until_all_rpcs_go_to_given_backends( 749 original_backend_instances + alternate_backend_instances, 750 _WAIT_FOR_STATS_SEC) 751 752 # Verify that weights between two services are expected. 753 retry_count = 10 754 # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 755 # seconds timeout. 756 for i in range(retry_count): 757 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 758 got_instance_count = [ 759 stats.rpcs_by_peer[i] for i in original_backend_instances 760 ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances] 761 total_count = sum(got_instance_count) 762 got_instance_percentage = [ 763 x * 100.0 / total_count for x in got_instance_count 764 ] 765 766 try: 767 compare_distributions(got_instance_percentage, 768 expected_instance_percentage, 5) 769 except Exception as e: 770 logger.info('attempt %d', i) 771 logger.info('got percentage: %s', got_instance_percentage) 772 logger.info('expected percentage: %s', 773 expected_instance_percentage) 774 logger.info(e) 775 if i == retry_count - 1: 776 raise Exception( 777 'RPC distribution (%s) differs from expected (%s)', 778 got_instance_percentage, expected_instance_percentage) 779 else: 780 logger.info("success") 781 break 782 finally: 783 patch_url_map_backend_service(gcp, original_backend_service) 784 patch_backend_instances(gcp, alternate_backend_service, []) 785 set_validate_for_proxyless(gcp, True) 786 787 788def test_path_matching(gcp, original_backend_service, instance_group, 789 alternate_backend_service, same_zone_instance_group): 790 # This test start with all traffic (UnaryCall and EmptyCall) going to 791 # original_backend_service. 792 # 793 # Then it updates URL-map to add routes, to make UnaryCall and EmptyCall to 794 # go different backends. It waits for all backends in both services to 795 # receive traffic, then verifies that traffic goes to the expected 796 # backends. 797 logger.info('Running test_path_matching') 798 799 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 800 gcp, original_backend_service, instance_group, 801 alternate_backend_service, same_zone_instance_group) 802 803 try: 804 # A list of tuples (route_rules, expected_instances). 805 test_cases = [ 806 ( 807 [{ 808 'priority': 0, 809 # FullPath EmptyCall -> alternate_backend_service. 810 'matchRules': [{ 811 'fullPathMatch': '/grpc.testing.TestService/EmptyCall' 812 }], 813 'service': alternate_backend_service.url 814 }], 815 { 816 "EmptyCall": alternate_backend_instances, 817 "UnaryCall": original_backend_instances 818 }), 819 ( 820 [{ 821 'priority': 0, 822 # Prefix UnaryCall -> alternate_backend_service. 823 'matchRules': [{ 824 'prefixMatch': '/grpc.testing.TestService/Unary' 825 }], 826 'service': alternate_backend_service.url 827 }], 828 { 829 "UnaryCall": alternate_backend_instances, 830 "EmptyCall": original_backend_instances 831 }) 832 ] 833 834 for (route_rules, expected_instances) in test_cases: 835 logger.info('patching url map with %s -> alternative', 836 route_rules[0]['matchRules']) 837 patch_url_map_backend_service(gcp, 838 original_backend_service, 839 route_rules=route_rules) 840 841 # Wait for traffic to go to both services. 842 logger.info( 843 'waiting for traffic to go to all backends (including alternate)' 844 ) 845 wait_until_all_rpcs_go_to_given_backends( 846 original_backend_instances + alternate_backend_instances, 847 _WAIT_FOR_STATS_SEC) 848 849 retry_count = 10 850 # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 851 # seconds timeout. 852 for i in range(retry_count): 853 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 854 if not stats.rpcs_by_method: 855 raise ValueError( 856 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' 857 ) 858 logger.info('attempt %d', i) 859 if compare_expected_instances(stats, expected_instances): 860 logger.info("success") 861 break 862 finally: 863 patch_url_map_backend_service(gcp, original_backend_service) 864 patch_backend_instances(gcp, alternate_backend_service, []) 865 set_validate_for_proxyless(gcp, True) 866 867 868def test_header_matching(gcp, original_backend_service, instance_group, 869 alternate_backend_service, same_zone_instance_group): 870 # This test start with all traffic (UnaryCall and EmptyCall) going to 871 # original_backend_service. 872 # 873 # Then it updates URL-map to add routes, to make RPCs with test headers to 874 # go to different backends. It waits for all backends in both services to 875 # receive traffic, then verifies that traffic goes to the expected 876 # backends. 877 logger.info('Running test_header_matching') 878 879 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests( 880 gcp, original_backend_service, instance_group, 881 alternate_backend_service, same_zone_instance_group) 882 883 try: 884 # A list of tuples (route_rules, expected_instances). 885 test_cases = [( 886 [{ 887 'priority': 0, 888 # Header ExactMatch -> alternate_backend_service. 889 # EmptyCall is sent with the metadata. 890 'matchRules': [{ 891 'prefixMatch': 892 '/', 893 'headerMatches': [{ 894 'headerName': _TEST_METADATA_KEY, 895 'exactMatch': _TEST_METADATA_VALUE 896 }] 897 }], 898 'service': alternate_backend_service.url 899 }], 900 { 901 "EmptyCall": alternate_backend_instances, 902 "UnaryCall": original_backend_instances 903 })] 904 905 for (route_rules, expected_instances) in test_cases: 906 logger.info('patching url map with %s -> alternative', 907 route_rules[0]['matchRules']) 908 patch_url_map_backend_service(gcp, 909 original_backend_service, 910 route_rules=route_rules) 911 912 # Wait for traffic to go to both services. 913 logger.info( 914 'waiting for traffic to go to all backends (including alternate)' 915 ) 916 wait_until_all_rpcs_go_to_given_backends( 917 original_backend_instances + alternate_backend_instances, 918 _WAIT_FOR_STATS_SEC) 919 920 retry_count = 10 921 # Each attempt takes about 10 seconds, 10 retries is equivalent to 100 922 # seconds timeout. 923 for i in range(retry_count): 924 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 925 if not stats.rpcs_by_method: 926 raise ValueError( 927 'stats.rpcs_by_method is None, the interop client stats service does not support this test case' 928 ) 929 logger.info('attempt %d', i) 930 if compare_expected_instances(stats, expected_instances): 931 logger.info("success") 932 break 933 finally: 934 patch_url_map_backend_service(gcp, original_backend_service) 935 patch_backend_instances(gcp, alternate_backend_service, []) 936 set_validate_for_proxyless(gcp, True) 937 938 939def set_serving_status(instances, service_port, serving): 940 for instance in instances: 941 with grpc.insecure_channel('%s:%d' % 942 (instance, service_port)) as channel: 943 stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel) 944 if serving: 945 stub.SetServing(empty_pb2.Empty()) 946 else: 947 stub.SetNotServing(empty_pb2.Empty()) 948 949 950def is_primary_instance_group(gcp, instance_group): 951 # Clients may connect to a TD instance in a different region than the 952 # client, in which case primary/secondary assignments may not be based on 953 # the client's actual locality. 954 instance_names = get_instance_names(gcp, instance_group) 955 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) 956 return all(peer in instance_names for peer in stats.rpcs_by_peer.keys()) 957 958 959def get_startup_script(path_to_server_binary, service_port): 960 if path_to_server_binary: 961 return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary, 962 service_port) 963 else: 964 return """#!/bin/bash 965sudo apt update 966sudo apt install -y git default-jdk 967mkdir java_server 968pushd java_server 969git clone https://github.com/grpc/grpc-java.git 970pushd grpc-java 971pushd interop-testing 972../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true 973 974nohup build/install/grpc-interop-testing/bin/xds-test-server \ 975 --port=%d 1>/dev/null &""" % service_port 976 977 978def create_instance_template(gcp, name, network, source_image, machine_type, 979 startup_script): 980 config = { 981 'name': name, 982 'properties': { 983 'tags': { 984 'items': ['allow-health-checks'] 985 }, 986 'machineType': machine_type, 987 'serviceAccounts': [{ 988 'email': 'default', 989 'scopes': ['https://www.googleapis.com/auth/cloud-platform',] 990 }], 991 'networkInterfaces': [{ 992 'accessConfigs': [{ 993 'type': 'ONE_TO_ONE_NAT' 994 }], 995 'network': network 996 }], 997 'disks': [{ 998 'boot': True, 999 'initializeParams': { 1000 'sourceImage': source_image 1001 } 1002 }], 1003 'metadata': { 1004 'items': [{ 1005 'key': 'startup-script', 1006 'value': startup_script 1007 }] 1008 } 1009 } 1010 } 1011 1012 logger.debug('Sending GCP request with body=%s', config) 1013 result = gcp.compute.instanceTemplates().insert( 1014 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1015 wait_for_global_operation(gcp, result['name']) 1016 gcp.instance_template = GcpResource(config['name'], result['targetLink']) 1017 1018 1019def add_instance_group(gcp, zone, name, size): 1020 config = { 1021 'name': name, 1022 'instanceTemplate': gcp.instance_template.url, 1023 'targetSize': size, 1024 'namedPorts': [{ 1025 'name': 'grpc', 1026 'port': gcp.service_port 1027 }] 1028 } 1029 1030 logger.debug('Sending GCP request with body=%s', config) 1031 result = gcp.compute.instanceGroupManagers().insert( 1032 project=gcp.project, zone=zone, 1033 body=config).execute(num_retries=_GCP_API_RETRIES) 1034 wait_for_zone_operation(gcp, zone, result['name']) 1035 result = gcp.compute.instanceGroupManagers().get( 1036 project=gcp.project, zone=zone, 1037 instanceGroupManager=config['name']).execute( 1038 num_retries=_GCP_API_RETRIES) 1039 instance_group = InstanceGroup(config['name'], result['instanceGroup'], 1040 zone) 1041 gcp.instance_groups.append(instance_group) 1042 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size, 1043 _WAIT_FOR_OPERATION_SEC) 1044 return instance_group 1045 1046 1047def create_health_check(gcp, name): 1048 if gcp.alpha_compute: 1049 config = { 1050 'name': name, 1051 'type': 'GRPC', 1052 'grpcHealthCheck': { 1053 'portSpecification': 'USE_SERVING_PORT' 1054 } 1055 } 1056 compute_to_use = gcp.alpha_compute 1057 else: 1058 config = { 1059 'name': name, 1060 'type': 'TCP', 1061 'tcpHealthCheck': { 1062 'portName': 'grpc' 1063 } 1064 } 1065 compute_to_use = gcp.compute 1066 logger.debug('Sending GCP request with body=%s', config) 1067 result = compute_to_use.healthChecks().insert( 1068 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1069 wait_for_global_operation(gcp, result['name']) 1070 gcp.health_check = GcpResource(config['name'], result['targetLink']) 1071 1072 1073def create_health_check_firewall_rule(gcp, name): 1074 config = { 1075 'name': name, 1076 'direction': 'INGRESS', 1077 'allowed': [{ 1078 'IPProtocol': 'tcp' 1079 }], 1080 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'], 1081 'targetTags': ['allow-health-checks'], 1082 } 1083 logger.debug('Sending GCP request with body=%s', config) 1084 result = gcp.compute.firewalls().insert( 1085 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1086 wait_for_global_operation(gcp, result['name']) 1087 gcp.health_check_firewall_rule = GcpResource(config['name'], 1088 result['targetLink']) 1089 1090 1091def add_backend_service(gcp, name): 1092 if gcp.alpha_compute: 1093 protocol = 'GRPC' 1094 compute_to_use = gcp.alpha_compute 1095 else: 1096 protocol = 'HTTP2' 1097 compute_to_use = gcp.compute 1098 config = { 1099 'name': name, 1100 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 1101 'healthChecks': [gcp.health_check.url], 1102 'portName': 'grpc', 1103 'protocol': protocol 1104 } 1105 logger.debug('Sending GCP request with body=%s', config) 1106 result = compute_to_use.backendServices().insert( 1107 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1108 wait_for_global_operation(gcp, result['name']) 1109 backend_service = GcpResource(config['name'], result['targetLink']) 1110 gcp.backend_services.append(backend_service) 1111 return backend_service 1112 1113 1114def create_url_map(gcp, name, backend_service, host_name): 1115 config = { 1116 'name': name, 1117 'defaultService': backend_service.url, 1118 'pathMatchers': [{ 1119 'name': _PATH_MATCHER_NAME, 1120 'defaultService': backend_service.url, 1121 }], 1122 'hostRules': [{ 1123 'hosts': [host_name], 1124 'pathMatcher': _PATH_MATCHER_NAME 1125 }] 1126 } 1127 logger.debug('Sending GCP request with body=%s', config) 1128 result = gcp.compute.urlMaps().insert( 1129 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES) 1130 wait_for_global_operation(gcp, result['name']) 1131 gcp.url_map = GcpResource(config['name'], result['targetLink']) 1132 1133 1134def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name): 1135 config = { 1136 'hostRules': [{ 1137 'hosts': ['%s:%d' % (host_name, gcp.service_port)], 1138 'pathMatcher': _PATH_MATCHER_NAME 1139 }] 1140 } 1141 logger.debug('Sending GCP request with body=%s', config) 1142 result = gcp.compute.urlMaps().patch( 1143 project=gcp.project, urlMap=name, 1144 body=config).execute(num_retries=_GCP_API_RETRIES) 1145 wait_for_global_operation(gcp, result['name']) 1146 1147 1148def set_validate_for_proxyless(gcp, validate_for_proxyless): 1149 if not gcp.alpha_compute: 1150 logger.debug( 1151 'Not setting validateForProxy because alpha is not enabled') 1152 return 1153 # This function deletes global_forwarding_rule and target_proxy, then 1154 # recreate target_proxy with validateForProxyless=False. This is necessary 1155 # because patching target_grpc_proxy isn't supported. 1156 delete_global_forwarding_rule(gcp) 1157 delete_target_proxy(gcp) 1158 create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless) 1159 create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name, 1160 [gcp.service_port]) 1161 1162 1163def create_target_proxy(gcp, name, validate_for_proxyless=True): 1164 if gcp.alpha_compute: 1165 config = { 1166 'name': name, 1167 'url_map': gcp.url_map.url, 1168 'validate_for_proxyless': validate_for_proxyless, 1169 } 1170 logger.debug('Sending GCP request with body=%s', config) 1171 result = gcp.alpha_compute.targetGrpcProxies().insert( 1172 project=gcp.project, 1173 body=config).execute(num_retries=_GCP_API_RETRIES) 1174 else: 1175 config = { 1176 'name': name, 1177 'url_map': gcp.url_map.url, 1178 } 1179 logger.debug('Sending GCP request with body=%s', config) 1180 result = gcp.compute.targetHttpProxies().insert( 1181 project=gcp.project, 1182 body=config).execute(num_retries=_GCP_API_RETRIES) 1183 wait_for_global_operation(gcp, result['name']) 1184 gcp.target_proxy = GcpResource(config['name'], result['targetLink']) 1185 1186 1187def create_global_forwarding_rule(gcp, name, potential_ports): 1188 if gcp.alpha_compute: 1189 compute_to_use = gcp.alpha_compute 1190 else: 1191 compute_to_use = gcp.compute 1192 for port in potential_ports: 1193 try: 1194 config = { 1195 'name': name, 1196 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', 1197 'portRange': str(port), 1198 'IPAddress': '0.0.0.0', 1199 'network': args.network, 1200 'target': gcp.target_proxy.url, 1201 } 1202 logger.debug('Sending GCP request with body=%s', config) 1203 result = compute_to_use.globalForwardingRules().insert( 1204 project=gcp.project, 1205 body=config).execute(num_retries=_GCP_API_RETRIES) 1206 wait_for_global_operation(gcp, result['name']) 1207 gcp.global_forwarding_rule = GcpResource(config['name'], 1208 result['targetLink']) 1209 gcp.service_port = port 1210 return 1211 except googleapiclient.errors.HttpError as http_error: 1212 logger.warning( 1213 'Got error %s when attempting to create forwarding rule to ' 1214 '0.0.0.0:%d. Retrying with another port.' % (http_error, port)) 1215 1216 1217def get_health_check(gcp, health_check_name): 1218 result = gcp.compute.healthChecks().get( 1219 project=gcp.project, healthCheck=health_check_name).execute() 1220 gcp.health_check = GcpResource(health_check_name, result['selfLink']) 1221 1222 1223def get_health_check_firewall_rule(gcp, firewall_name): 1224 result = gcp.compute.firewalls().get(project=gcp.project, 1225 firewall=firewall_name).execute() 1226 gcp.health_check_firewall_rule = GcpResource(firewall_name, 1227 result['selfLink']) 1228 1229 1230def get_backend_service(gcp, backend_service_name): 1231 result = gcp.compute.backendServices().get( 1232 project=gcp.project, backendService=backend_service_name).execute() 1233 backend_service = GcpResource(backend_service_name, result['selfLink']) 1234 gcp.backend_services.append(backend_service) 1235 return backend_service 1236 1237 1238def get_url_map(gcp, url_map_name): 1239 result = gcp.compute.urlMaps().get(project=gcp.project, 1240 urlMap=url_map_name).execute() 1241 gcp.url_map = GcpResource(url_map_name, result['selfLink']) 1242 1243 1244def get_target_proxy(gcp, target_proxy_name): 1245 if gcp.alpha_compute: 1246 result = gcp.alpha_compute.targetGrpcProxies().get( 1247 project=gcp.project, targetGrpcProxy=target_proxy_name).execute() 1248 else: 1249 result = gcp.compute.targetHttpProxies().get( 1250 project=gcp.project, targetHttpProxy=target_proxy_name).execute() 1251 gcp.target_proxy = GcpResource(target_proxy_name, result['selfLink']) 1252 1253 1254def get_global_forwarding_rule(gcp, forwarding_rule_name): 1255 result = gcp.compute.globalForwardingRules().get( 1256 project=gcp.project, forwardingRule=forwarding_rule_name).execute() 1257 gcp.global_forwarding_rule = GcpResource(forwarding_rule_name, 1258 result['selfLink']) 1259 1260 1261def get_instance_template(gcp, template_name): 1262 result = gcp.compute.instanceTemplates().get( 1263 project=gcp.project, instanceTemplate=template_name).execute() 1264 gcp.instance_template = GcpResource(template_name, result['selfLink']) 1265 1266 1267def get_instance_group(gcp, zone, instance_group_name): 1268 result = gcp.compute.instanceGroups().get( 1269 project=gcp.project, zone=zone, 1270 instanceGroup=instance_group_name).execute() 1271 gcp.service_port = result['namedPorts'][0]['port'] 1272 instance_group = InstanceGroup(instance_group_name, result['selfLink'], 1273 zone) 1274 gcp.instance_groups.append(instance_group) 1275 return instance_group 1276 1277 1278def delete_global_forwarding_rule(gcp): 1279 try: 1280 result = gcp.compute.globalForwardingRules().delete( 1281 project=gcp.project, 1282 forwardingRule=gcp.global_forwarding_rule.name).execute( 1283 num_retries=_GCP_API_RETRIES) 1284 wait_for_global_operation(gcp, result['name']) 1285 except googleapiclient.errors.HttpError as http_error: 1286 logger.info('Delete failed: %s', http_error) 1287 1288 1289def delete_target_proxy(gcp): 1290 try: 1291 if gcp.alpha_compute: 1292 result = gcp.alpha_compute.targetGrpcProxies().delete( 1293 project=gcp.project, 1294 targetGrpcProxy=gcp.target_proxy.name).execute( 1295 num_retries=_GCP_API_RETRIES) 1296 else: 1297 result = gcp.compute.targetHttpProxies().delete( 1298 project=gcp.project, 1299 targetHttpProxy=gcp.target_proxy.name).execute( 1300 num_retries=_GCP_API_RETRIES) 1301 wait_for_global_operation(gcp, result['name']) 1302 except googleapiclient.errors.HttpError as http_error: 1303 logger.info('Delete failed: %s', http_error) 1304 1305 1306def delete_url_map(gcp): 1307 try: 1308 result = gcp.compute.urlMaps().delete( 1309 project=gcp.project, 1310 urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES) 1311 wait_for_global_operation(gcp, result['name']) 1312 except googleapiclient.errors.HttpError as http_error: 1313 logger.info('Delete failed: %s', http_error) 1314 1315 1316def delete_backend_services(gcp): 1317 for backend_service in gcp.backend_services: 1318 try: 1319 result = gcp.compute.backendServices().delete( 1320 project=gcp.project, 1321 backendService=backend_service.name).execute( 1322 num_retries=_GCP_API_RETRIES) 1323 wait_for_global_operation(gcp, result['name']) 1324 except googleapiclient.errors.HttpError as http_error: 1325 logger.info('Delete failed: %s', http_error) 1326 1327 1328def delete_firewall(gcp): 1329 try: 1330 result = gcp.compute.firewalls().delete( 1331 project=gcp.project, 1332 firewall=gcp.health_check_firewall_rule.name).execute( 1333 num_retries=_GCP_API_RETRIES) 1334 wait_for_global_operation(gcp, result['name']) 1335 except googleapiclient.errors.HttpError as http_error: 1336 logger.info('Delete failed: %s', http_error) 1337 1338 1339def delete_health_check(gcp): 1340 try: 1341 result = gcp.compute.healthChecks().delete( 1342 project=gcp.project, healthCheck=gcp.health_check.name).execute( 1343 num_retries=_GCP_API_RETRIES) 1344 wait_for_global_operation(gcp, result['name']) 1345 except googleapiclient.errors.HttpError as http_error: 1346 logger.info('Delete failed: %s', http_error) 1347 1348 1349def delete_instance_groups(gcp): 1350 for instance_group in gcp.instance_groups: 1351 try: 1352 result = gcp.compute.instanceGroupManagers().delete( 1353 project=gcp.project, 1354 zone=instance_group.zone, 1355 instanceGroupManager=instance_group.name).execute( 1356 num_retries=_GCP_API_RETRIES) 1357 wait_for_zone_operation(gcp, 1358 instance_group.zone, 1359 result['name'], 1360 timeout_sec=_WAIT_FOR_BACKEND_SEC) 1361 except googleapiclient.errors.HttpError as http_error: 1362 logger.info('Delete failed: %s', http_error) 1363 1364 1365def delete_instance_template(gcp): 1366 try: 1367 result = gcp.compute.instanceTemplates().delete( 1368 project=gcp.project, 1369 instanceTemplate=gcp.instance_template.name).execute( 1370 num_retries=_GCP_API_RETRIES) 1371 wait_for_global_operation(gcp, result['name']) 1372 except googleapiclient.errors.HttpError as http_error: 1373 logger.info('Delete failed: %s', http_error) 1374 1375 1376def patch_backend_instances(gcp, 1377 backend_service, 1378 instance_groups, 1379 balancing_mode='UTILIZATION'): 1380 if gcp.alpha_compute: 1381 compute_to_use = gcp.alpha_compute 1382 else: 1383 compute_to_use = gcp.compute 1384 config = { 1385 'backends': [{ 1386 'group': instance_group.url, 1387 'balancingMode': balancing_mode, 1388 'maxRate': 1 if balancing_mode == 'RATE' else None 1389 } for instance_group in instance_groups], 1390 } 1391 logger.debug('Sending GCP request with body=%s', config) 1392 result = compute_to_use.backendServices().patch( 1393 project=gcp.project, backendService=backend_service.name, 1394 body=config).execute(num_retries=_GCP_API_RETRIES) 1395 wait_for_global_operation(gcp, 1396 result['name'], 1397 timeout_sec=_WAIT_FOR_BACKEND_SEC) 1398 1399 1400def resize_instance_group(gcp, 1401 instance_group, 1402 new_size, 1403 timeout_sec=_WAIT_FOR_OPERATION_SEC): 1404 result = gcp.compute.instanceGroupManagers().resize( 1405 project=gcp.project, 1406 zone=instance_group.zone, 1407 instanceGroupManager=instance_group.name, 1408 size=new_size).execute(num_retries=_GCP_API_RETRIES) 1409 wait_for_zone_operation(gcp, 1410 instance_group.zone, 1411 result['name'], 1412 timeout_sec=360) 1413 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 1414 new_size, timeout_sec) 1415 1416 1417def patch_url_map_backend_service(gcp, 1418 backend_service=None, 1419 services_with_weights=None, 1420 route_rules=None): 1421 '''change url_map's backend service 1422 1423 Only one of backend_service and service_with_weights can be not None. 1424 ''' 1425 if backend_service and services_with_weights: 1426 raise ValueError( 1427 'both backend_service and service_with_weights are not None.') 1428 1429 default_service = backend_service.url if backend_service else None 1430 default_route_action = { 1431 'weightedBackendServices': [{ 1432 'backendService': service.url, 1433 'weight': w, 1434 } for service, w in services_with_weights.items()] 1435 } if services_with_weights else None 1436 1437 config = { 1438 'pathMatchers': [{ 1439 'name': _PATH_MATCHER_NAME, 1440 'defaultService': default_service, 1441 'defaultRouteAction': default_route_action, 1442 'routeRules': route_rules, 1443 }] 1444 } 1445 logger.debug('Sending GCP request with body=%s', config) 1446 result = gcp.compute.urlMaps().patch( 1447 project=gcp.project, urlMap=gcp.url_map.name, 1448 body=config).execute(num_retries=_GCP_API_RETRIES) 1449 wait_for_global_operation(gcp, result['name']) 1450 1451 1452def wait_for_instance_group_to_reach_expected_size(gcp, instance_group, 1453 expected_size, timeout_sec): 1454 start_time = time.time() 1455 while True: 1456 current_size = len(get_instance_names(gcp, instance_group)) 1457 if current_size == expected_size: 1458 break 1459 if time.time() - start_time > timeout_sec: 1460 raise Exception( 1461 'Instance group had expected size %d but actual size %d' % 1462 (expected_size, current_size)) 1463 time.sleep(2) 1464 1465 1466def wait_for_global_operation(gcp, 1467 operation, 1468 timeout_sec=_WAIT_FOR_OPERATION_SEC): 1469 start_time = time.time() 1470 while time.time() - start_time <= timeout_sec: 1471 result = gcp.compute.globalOperations().get( 1472 project=gcp.project, 1473 operation=operation).execute(num_retries=_GCP_API_RETRIES) 1474 if result['status'] == 'DONE': 1475 if 'error' in result: 1476 raise Exception(result['error']) 1477 return 1478 time.sleep(2) 1479 raise Exception('Operation %s did not complete within %d', operation, 1480 timeout_sec) 1481 1482 1483def wait_for_zone_operation(gcp, 1484 zone, 1485 operation, 1486 timeout_sec=_WAIT_FOR_OPERATION_SEC): 1487 start_time = time.time() 1488 while time.time() - start_time <= timeout_sec: 1489 result = gcp.compute.zoneOperations().get( 1490 project=gcp.project, zone=zone, 1491 operation=operation).execute(num_retries=_GCP_API_RETRIES) 1492 if result['status'] == 'DONE': 1493 if 'error' in result: 1494 raise Exception(result['error']) 1495 return 1496 time.sleep(2) 1497 raise Exception('Operation %s did not complete within %d', operation, 1498 timeout_sec) 1499 1500 1501def wait_for_healthy_backends(gcp, 1502 backend_service, 1503 instance_group, 1504 timeout_sec=_WAIT_FOR_BACKEND_SEC): 1505 start_time = time.time() 1506 config = {'group': instance_group.url} 1507 expected_size = len(get_instance_names(gcp, instance_group)) 1508 while time.time() - start_time <= timeout_sec: 1509 result = gcp.compute.backendServices().getHealth( 1510 project=gcp.project, 1511 backendService=backend_service.name, 1512 body=config).execute(num_retries=_GCP_API_RETRIES) 1513 if 'healthStatus' in result: 1514 logger.info('received healthStatus: %s', result['healthStatus']) 1515 healthy = True 1516 for instance in result['healthStatus']: 1517 if instance['healthState'] != 'HEALTHY': 1518 healthy = False 1519 break 1520 if healthy and expected_size == len(result['healthStatus']): 1521 return 1522 time.sleep(2) 1523 raise Exception('Not all backends became healthy within %d seconds: %s' % 1524 (timeout_sec, result)) 1525 1526 1527def get_instance_names(gcp, instance_group): 1528 instance_names = [] 1529 result = gcp.compute.instanceGroups().listInstances( 1530 project=gcp.project, 1531 zone=instance_group.zone, 1532 instanceGroup=instance_group.name, 1533 body={ 1534 'instanceState': 'ALL' 1535 }).execute(num_retries=_GCP_API_RETRIES) 1536 if 'items' not in result: 1537 return [] 1538 for item in result['items']: 1539 # listInstances() returns the full URL of the instance, which ends with 1540 # the instance name. compute.instances().get() requires using the 1541 # instance name (not the full URL) to look up instance details, so we 1542 # just extract the name manually. 1543 instance_name = item['instance'].split('/')[-1] 1544 instance_names.append(instance_name) 1545 logger.info('retrieved instance names: %s', instance_names) 1546 return instance_names 1547 1548 1549def clean_up(gcp): 1550 if gcp.global_forwarding_rule: 1551 delete_global_forwarding_rule(gcp) 1552 if gcp.target_proxy: 1553 delete_target_proxy(gcp) 1554 if gcp.url_map: 1555 delete_url_map(gcp) 1556 delete_backend_services(gcp) 1557 if gcp.health_check_firewall_rule: 1558 delete_firewall(gcp) 1559 if gcp.health_check: 1560 delete_health_check(gcp) 1561 delete_instance_groups(gcp) 1562 if gcp.instance_template: 1563 delete_instance_template(gcp) 1564 1565 1566class InstanceGroup(object): 1567 1568 def __init__(self, name, url, zone): 1569 self.name = name 1570 self.url = url 1571 self.zone = zone 1572 1573 1574class GcpResource(object): 1575 1576 def __init__(self, name, url): 1577 self.name = name 1578 self.url = url 1579 1580 1581class GcpState(object): 1582 1583 def __init__(self, compute, alpha_compute, project): 1584 self.compute = compute 1585 self.alpha_compute = alpha_compute 1586 self.project = project 1587 self.health_check = None 1588 self.health_check_firewall_rule = None 1589 self.backend_services = [] 1590 self.url_map = None 1591 self.target_proxy = None 1592 self.global_forwarding_rule = None 1593 self.service_port = None 1594 self.instance_template = None 1595 self.instance_groups = [] 1596 1597 1598alpha_compute = None 1599if args.compute_discovery_document: 1600 with open(args.compute_discovery_document, 'r') as discovery_doc: 1601 compute = googleapiclient.discovery.build_from_document( 1602 discovery_doc.read()) 1603 if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document: 1604 with open(args.alpha_compute_discovery_document, 'r') as discovery_doc: 1605 alpha_compute = googleapiclient.discovery.build_from_document( 1606 discovery_doc.read()) 1607else: 1608 compute = googleapiclient.discovery.build('compute', 'v1') 1609 if not args.only_stable_gcp_apis: 1610 alpha_compute = googleapiclient.discovery.build('compute', 'alpha') 1611 1612try: 1613 gcp = GcpState(compute, alpha_compute, args.project_id) 1614 health_check_name = _BASE_HEALTH_CHECK_NAME + args.gcp_suffix 1615 firewall_name = _BASE_FIREWALL_RULE_NAME + args.gcp_suffix 1616 backend_service_name = _BASE_BACKEND_SERVICE_NAME + args.gcp_suffix 1617 alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + args.gcp_suffix 1618 url_map_name = _BASE_URL_MAP_NAME + args.gcp_suffix 1619 service_host_name = _BASE_SERVICE_HOST + args.gcp_suffix 1620 target_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix 1621 forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + args.gcp_suffix 1622 template_name = _BASE_TEMPLATE_NAME + args.gcp_suffix 1623 instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix 1624 same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix 1625 secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix 1626 if args.use_existing_gcp_resources: 1627 logger.info('Reusing existing GCP resources') 1628 get_health_check(gcp, health_check_name) 1629 try: 1630 get_health_check_firewall_rule(gcp, firewall_name) 1631 except googleapiclient.errors.HttpError as http_error: 1632 # Firewall rule may be auto-deleted periodically depending on GCP 1633 # project settings. 1634 logger.exception('Failed to find firewall rule, recreating') 1635 create_health_check_firewall_rule(gcp, firewall_name) 1636 backend_service = get_backend_service(gcp, backend_service_name) 1637 alternate_backend_service = get_backend_service( 1638 gcp, alternate_backend_service_name) 1639 get_url_map(gcp, url_map_name) 1640 get_target_proxy(gcp, target_proxy_name) 1641 get_global_forwarding_rule(gcp, forwarding_rule_name) 1642 get_instance_template(gcp, template_name) 1643 instance_group = get_instance_group(gcp, args.zone, instance_group_name) 1644 same_zone_instance_group = get_instance_group( 1645 gcp, args.zone, same_zone_instance_group_name) 1646 secondary_zone_instance_group = get_instance_group( 1647 gcp, args.secondary_zone, secondary_zone_instance_group_name) 1648 else: 1649 create_health_check(gcp, health_check_name) 1650 create_health_check_firewall_rule(gcp, firewall_name) 1651 backend_service = add_backend_service(gcp, backend_service_name) 1652 alternate_backend_service = add_backend_service( 1653 gcp, alternate_backend_service_name) 1654 create_url_map(gcp, url_map_name, backend_service, service_host_name) 1655 create_target_proxy(gcp, target_proxy_name) 1656 potential_service_ports = list(args.service_port_range) 1657 random.shuffle(potential_service_ports) 1658 create_global_forwarding_rule(gcp, forwarding_rule_name, 1659 potential_service_ports) 1660 if not gcp.service_port: 1661 raise Exception( 1662 'Failed to find a valid ip:port for the forwarding rule') 1663 if gcp.service_port != _DEFAULT_SERVICE_PORT: 1664 patch_url_map_host_rule_with_port(gcp, url_map_name, 1665 backend_service, 1666 service_host_name) 1667 startup_script = get_startup_script(args.path_to_server_binary, 1668 gcp.service_port) 1669 create_instance_template(gcp, template_name, args.network, 1670 args.source_image, args.machine_type, 1671 startup_script) 1672 instance_group = add_instance_group(gcp, args.zone, instance_group_name, 1673 _INSTANCE_GROUP_SIZE) 1674 patch_backend_instances(gcp, backend_service, [instance_group]) 1675 same_zone_instance_group = add_instance_group( 1676 gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) 1677 secondary_zone_instance_group = add_instance_group( 1678 gcp, args.secondary_zone, secondary_zone_instance_group_name, 1679 _INSTANCE_GROUP_SIZE) 1680 1681 wait_for_healthy_backends(gcp, backend_service, instance_group) 1682 1683 if args.test_case: 1684 if gcp.service_port == _DEFAULT_SERVICE_PORT: 1685 server_uri = service_host_name 1686 else: 1687 server_uri = service_host_name + ':' + str(gcp.service_port) 1688 if args.bootstrap_file: 1689 bootstrap_path = os.path.abspath(args.bootstrap_file) 1690 else: 1691 with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: 1692 bootstrap_file.write( 1693 _BOOTSTRAP_TEMPLATE.format( 1694 node_id=socket.gethostname()).encode('utf-8')) 1695 bootstrap_path = bootstrap_file.name 1696 client_env = dict(os.environ, GRPC_XDS_BOOTSTRAP=bootstrap_path) 1697 1698 test_results = {} 1699 failed_tests = [] 1700 for test_case in args.test_case: 1701 result = jobset.JobResult() 1702 log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case) 1703 if not os.path.exists(log_dir): 1704 os.makedirs(log_dir) 1705 test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME) 1706 test_log_file = open(test_log_filename, 'w+') 1707 client_process = None 1708 1709 if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS: 1710 rpcs_to_send = '--rpc="UnaryCall,EmptyCall"' 1711 else: 1712 rpcs_to_send = '--rpc="UnaryCall"' 1713 1714 if test_case in _TESTS_TO_SEND_METADATA: 1715 metadata_to_send = '--metadata="EmptyCall:{key}:{value}"'.format( 1716 key=_TEST_METADATA_KEY, value=_TEST_METADATA_VALUE) 1717 else: 1718 metadata_to_send = '--metadata=""' 1719 1720 if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE: 1721 fail_on_failed_rpc = '--fail_on_failed_rpc=true' 1722 else: 1723 fail_on_failed_rpc = '--fail_on_failed_rpc=false' 1724 1725 client_cmd_formatted = args.client_cmd.format( 1726 server_uri=server_uri, 1727 stats_port=args.stats_port, 1728 qps=args.qps, 1729 fail_on_failed_rpc=fail_on_failed_rpc, 1730 rpcs_to_send=rpcs_to_send, 1731 metadata_to_send=metadata_to_send) 1732 logger.debug('running client: %s', client_cmd_formatted) 1733 client_cmd = shlex.split(client_cmd_formatted) 1734 try: 1735 client_process = subprocess.Popen(client_cmd, 1736 env=client_env, 1737 stderr=subprocess.STDOUT, 1738 stdout=test_log_file) 1739 if test_case == 'backends_restart': 1740 test_backends_restart(gcp, backend_service, instance_group) 1741 elif test_case == 'change_backend_service': 1742 test_change_backend_service(gcp, backend_service, 1743 instance_group, 1744 alternate_backend_service, 1745 same_zone_instance_group) 1746 elif test_case == 'gentle_failover': 1747 test_gentle_failover(gcp, backend_service, instance_group, 1748 secondary_zone_instance_group) 1749 elif test_case == 'new_instance_group_receives_traffic': 1750 test_new_instance_group_receives_traffic( 1751 gcp, backend_service, instance_group, 1752 same_zone_instance_group) 1753 elif test_case == 'ping_pong': 1754 test_ping_pong(gcp, backend_service, instance_group) 1755 elif test_case == 'remove_instance_group': 1756 test_remove_instance_group(gcp, backend_service, 1757 instance_group, 1758 same_zone_instance_group) 1759 elif test_case == 'round_robin': 1760 test_round_robin(gcp, backend_service, instance_group) 1761 elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': 1762 test_secondary_locality_gets_no_requests_on_partial_primary_failure( 1763 gcp, backend_service, instance_group, 1764 secondary_zone_instance_group) 1765 elif test_case == 'secondary_locality_gets_requests_on_primary_failure': 1766 test_secondary_locality_gets_requests_on_primary_failure( 1767 gcp, backend_service, instance_group, 1768 secondary_zone_instance_group) 1769 elif test_case == 'traffic_splitting': 1770 test_traffic_splitting(gcp, backend_service, instance_group, 1771 alternate_backend_service, 1772 same_zone_instance_group) 1773 elif test_case == 'path_matching': 1774 test_path_matching(gcp, backend_service, instance_group, 1775 alternate_backend_service, 1776 same_zone_instance_group) 1777 elif test_case == 'header_matching': 1778 test_header_matching(gcp, backend_service, instance_group, 1779 alternate_backend_service, 1780 same_zone_instance_group) 1781 else: 1782 logger.error('Unknown test case: %s', test_case) 1783 sys.exit(1) 1784 if client_process.poll() is not None: 1785 raise Exception( 1786 'Client process exited prematurely with exit code %d' % 1787 client_process.returncode) 1788 result.state = 'PASSED' 1789 result.returncode = 0 1790 except Exception as e: 1791 logger.exception('Test case %s failed', test_case) 1792 failed_tests.append(test_case) 1793 result.state = 'FAILED' 1794 result.message = str(e) 1795 finally: 1796 if client_process and not client_process.returncode: 1797 client_process.terminate() 1798 test_log_file.close() 1799 # Workaround for Python 3, as report_utils will invoke decode() on 1800 # result.message, which has a default value of ''. 1801 result.message = result.message.encode('UTF-8') 1802 test_results[test_case] = [result] 1803 if args.log_client_output: 1804 logger.info('Client output:') 1805 with open(test_log_filename, 'r') as client_output: 1806 logger.info(client_output.read()) 1807 if not os.path.exists(_TEST_LOG_BASE_DIR): 1808 os.makedirs(_TEST_LOG_BASE_DIR) 1809 report_utils.render_junit_xml_report(test_results, 1810 os.path.join( 1811 _TEST_LOG_BASE_DIR, 1812 _SPONGE_XML_NAME), 1813 suite_name='xds_tests', 1814 multi_target=True) 1815 if failed_tests: 1816 logger.error('Test case(s) %s failed', failed_tests) 1817 sys.exit(1) 1818finally: 1819 if not args.keep_gcp_resources: 1820 logger.info('Cleaning up GCP resources. This may take some time.') 1821 clean_up(gcp) 1822