• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2020 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run xDS integration tests on GCP using Traffic Director."""
16
17import argparse
18import datetime
19import googleapiclient.discovery
20import grpc
21import json
22import logging
23import os
24import random
25import shlex
26import socket
27import subprocess
28import sys
29import tempfile
30import time
31import uuid
32
33from oauth2client.client import GoogleCredentials
34from google.protobuf import json_format
35
36import python_utils.jobset as jobset
37import python_utils.report_utils as report_utils
38
39from src.proto.grpc.health.v1 import health_pb2
40from src.proto.grpc.health.v1 import health_pb2_grpc
41from src.proto.grpc.testing import empty_pb2
42from src.proto.grpc.testing import messages_pb2
43from src.proto.grpc.testing import test_pb2_grpc
44
45# Envoy protos provided by PyPI package xds-protos
46# Needs to import the generated Python file to load descriptors
47try:
48    from envoy.service.status.v3 import csds_pb2
49    from envoy.service.status.v3 import csds_pb2_grpc
50    from envoy.extensions.filters.network.http_connection_manager.v3 import http_connection_manager_pb2
51    from envoy.extensions.filters.common.fault.v3 import fault_pb2
52    from envoy.extensions.filters.http.fault.v3 import fault_pb2
53    from envoy.extensions.filters.http.router.v3 import router_pb2
54except ImportError:
55    # These protos are required by CSDS test. We should not fail the entire
56    # script for one test case.
57    pass
58
# Root logger configuration: a single stderr handler with timestamps.
# Default level is WARNING; bumped to DEBUG later if --verbose is passed.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
# Drop any handlers installed as a side effect of the imports above so all
# output goes through the single console handler.
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)

# Suppress excessive logs for gRPC Python
# NOTE(review): the popped values are kept — presumably re-applied to the
# test client subprocess environment later in the file; confirm there.
original_grpc_trace = os.environ.pop('GRPC_TRACE', None)
original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None)
# Suppress not-essential logs for GCP clients
logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING)
logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)
73
74_TEST_CASES = [
75    'backends_restart',
76    'change_backend_service',
77    'gentle_failover',
78    'load_report_based_failover',
79    'ping_pong',
80    'remove_instance_group',
81    'round_robin',
82    'secondary_locality_gets_no_requests_on_partial_primary_failure',
83    'secondary_locality_gets_requests_on_primary_failure',
84    'traffic_splitting',
85    'path_matching',
86    'header_matching',
87    'forwarding_rule_port_match',
88    'forwarding_rule_default_port',
89    'metadata_filter',
90]
91
92# Valid test cases, but not in all. So the tests can only run manually, and
93# aren't enabled automatically for all languages.
94#
95# TODO: Move them into _TEST_CASES when support is ready in all languages.
96_ADDITIONAL_TEST_CASES = [
97    'circuit_breaking',
98    'timeout',
99    'fault_injection',
100    'csds',
101    'api_listener',  # TODO(b/187352987) Relieve quota pressure
102]
103
104# Test cases that require the V3 API.  Skipped in older runs.
105_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])
106
107# Test cases that require the alpha API.  Skipped for stable API runs.
108_ALPHA_TEST_CASES = frozenset(['timeout'])
109
110
111def parse_test_cases(arg):
112    if arg == '':
113        return []
114    arg_split = arg.split(',')
115    test_cases = set()
116    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
117    for arg in arg_split:
118        if arg == "all":
119            test_cases = test_cases.union(_TEST_CASES)
120        else:
121            test_cases = test_cases.union([arg])
122    if not all([test_case in all_test_cases for test_case in test_cases]):
123        raise Exception('Failed to parse test cases %s' % arg)
124    # Perserve order.
125    return [x for x in all_test_cases if x in test_cases]
126
127
def parse_port_range(port_arg):
    """Parse a service port argument.

    Args:
      port_arg: Either a single port ('8080') or an inclusive range in the
        form 'min:max' (e.g. '8080:8280').

    Returns:
      A range covering the requested port(s), inclusive of both endpoints.
    """
    try:
        port = int(port_arg)
        return range(port, port + 1)
    except ValueError:
        # Not a plain integer; expect the 'min:max' form. The original bare
        # `except:` also swallowed KeyboardInterrupt/SystemExit.
        port_min, port_max = port_arg.split(':')
        return range(int(port_min), int(port_max) + 1)
135
136
# Command-line interface. Defaults target the shared grpc-testing project;
# see individual help strings for the semantics of each flag.
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument('--project_id', default='grpc-testing', help='GCP project id')
argp.add_argument('--project_num',
                  default='830293263384',
                  help='GCP project number')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
# Default backend port range; an available port is chosen from it at runtime.
_DEFAULT_PORT_RANGE = '8080:8280'
argp.add_argument('--service_port_range',
                  default=_DEFAULT_PORT_RANGE,
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
    'less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure',
                  default=False,
                  action='store_true')
args = argp.parse_args()
274
# Apply --verbose immediately so later setup steps are logged.
if args.verbose:
    logger.setLevel(logging.DEBUG)

# Hosts running externally-managed test clients. Empty means a client will be
# launched via --client_cmd; otherwise stats are fetched from these hosts.
CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Each of the config propagation in the control plane should finish within 600s.
# Otherwise, it indicates a bug in the control plane. The config propagation
# includes all kinds of traffic config update, like updating urlMap, creating
# the resources for the first time, updating BackendService, and changing the
# status of endpoints in BackendService.
_WAIT_FOR_URL_MAP_PATCH_SEC = 600
# In general, fetching load balancing stats only takes ~10s. However, slow
# config update could lead to empty EDS or similar symptoms causing the
# connection to hang for a long period of time. So, we want to extend the stats
# wait time to be the same as urlMap patch time.
_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
# Each test sends 10 seconds' worth of RPCs at the configured QPS.
_NUM_TEST_RPCS = 10 * args.qps
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# xDS bootstrap JSON. Doubled braces ({{ }}) are literal braces surviving a
# later str.format() call that fills {node_id} and {server_features}; the
# %-interpolation below fills the network name, zone and xDS server now.
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
      "com.googleapis.trafficdirector.config_time_trace": "TRUE"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)
324
# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
# Metadata key/value sentinels used by header-matching tests; the values are
# distinctive strings the server side can match on.
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
_PATH_MATCHER_NAME = 'path-matcher'
# Base names for generated GCP resources — presumably combined with
# --gcp_suffix to keep test runs distinct; confirm against the creation code.
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
# Report/log destinations, relative to this script's location.
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
353
354
def get_client_stats(num_rpcs, timeout_sec):
    """Fetch load-balancer stats from the test client.

    Note: although this iterates over the configured client hosts, it returns
    after querying the first one (matching the original behavior).
    """
    hosts = CLIENT_HOSTS if CLIENT_HOSTS else ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
                         args.stats_port)
            # Allow extra headroom beyond the stats collection window for
            # establishing the connection.
            response = stub.GetClientStats(
                request,
                wait_for_ready=True,
                timeout=timeout_sec + _CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientStats RPC to %s: %s', host,
                         json_format.MessageToJson(response))
            return response
376
377
def get_client_accumulated_stats():
    """Fetch accumulated load-balancer stats from the test client.

    Returns the response from the first configured host only, matching the
    original behavior.
    """
    hosts = CLIENT_HOSTS if CLIENT_HOSTS else ['localhost']
    for host in hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
            logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                         host, args.stats_port)
            response = stub.GetClientAccumulatedStats(
                request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
                         host, response)
            return response
395
396
def get_client_xds_config_dump():
    """Fetch the xDS config dump (CSDS) from the test client.

    Returns:
      The single ClientConfig converted to a dict, or None if the response
      does not contain exactly one config. Queries only the first configured
      host, matching the original behavior.
    """
    hosts = CLIENT_HOSTS if CLIENT_HOSTS else ['localhost']
    for host in hosts:
        server_address = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(server_address) as channel:
            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
            logger.debug('Fetching xDS config dump from %s', server_address)
            response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
                                              wait_for_ready=True,
                                              timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Fetched xDS config dump from %s', server_address)
            if len(response.config) != 1:
                logger.error('Unexpected number of ClientConfigs %d: %s',
                             len(response.config), response)
                return None
            # Convert to a dict: many fields arrive packed in
            # google.protobuf.Any, and JSON form is far easier to inspect
            # than unpacking each message by hand.
            return json_format.MessageToDict(
                response.config[0], preserving_proto_field_name=True)
421
422
def configure_client(rpc_types, metadata=(), timeout_sec=None):
    """Reconfigure the test client's RPC behavior at runtime.

    Args:
      rpc_types: iterable of ClientConfigureRequest.RpcType values the client
        should send.
      metadata: iterable of (rpc_type, key, value) tuples to attach to RPCs.
        The default is an immutable () instead of the original mutable [];
        it is only iterated, so behavior is unchanged.
      timeout_sec: optional per-RPC timeout to configure on the client.
    """
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            if timeout_sec:
                request.timeout_sec = timeout_sec
            logger.debug(
                'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
                host, args.stats_port, request)
            stub.Configure(request,
                           wait_for_ready=True,
                           timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
                         host)
449
450
class RpcDistributionError(Exception):
    """Raised when RPC load is not distributed across backends as expected."""
453
454
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    """Poll client stats until exactly `backends` receive load.

    Args:
      backends: list of backend instance names expected to receive traffic.
      timeout_sec: how long to keep polling before giving up.
      num_rpcs: number of RPCs each stats query should cover.
      allow_failures: if False, any failed RPC is treated as an error.

    Raises:
      RpcDistributionError: with the last observed problem when the desired
        distribution is not reached within timeout_sec.
    """
    start_time = time.time()
    error_msg = None
    # Lazy %-style args: the message is only formatted when DEBUG is enabled
    # (the original eagerly interpolated with `%` before the call).
    logger.debug('Waiting for %d sec until backends %s receive load',
                 timeout_sec, backends)
    while time.time() - start_time <= timeout_sec:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = 'Backend %s did not receive load' % backend
                break
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
476
477
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    """Wait for traffic to reach exactly `backends`, tolerating RPC failures."""
    _verify_rpcs_to_given_backends(backends=backends,
                                   timeout_sec=timeout_sec,
                                   num_rpcs=num_rpcs,
                                   allow_failures=True)
485
486
def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    """Wait for traffic to reach exactly `backends`; any RPC failure is an error."""
    _verify_rpcs_to_given_backends(backends=backends,
                                   timeout_sec=timeout_sec,
                                   num_rpcs=num_rpcs,
                                   allow_failures=False)
494
495
def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
    """Poll until none of `backends` receives any load, or raise on timeout."""
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
        peers_with_load = stats.rpcs_by_peer
        # Succeed as soon as a stats window shows zero load on these backends.
        offending = [b for b in backends if b in peers_with_load]
        if not offending:
            return
    raise Exception('Unexpected RPCs going to given backends')
509
510
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Block until the test client reaches the state with the given number
    of RPCs being outstanding stably.

    Args:
      rpc_type: A string indicating the RPC method to check for. Either
        'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait until the desired state
        is reached.
      num_rpcs: Expected number of RPCs to be in-flight.
      threshold: Number within [0,100], the tolerable percentage by which
        the actual number of RPCs in-flight can differ from the expected number.

    Raises:
      ValueError: if threshold is outside [0,100].
      Exception: if the in-flight count never stabilizes within tolerance.
    """
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    start_time = time.time()
    error_msg = None
    # Lazy %-style args: only formatted when DEBUG is enabled (the original
    # eagerly interpolated with `%` before the call).
    logger.debug(
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight',
        timeout_sec, num_rpcs, rpc_type, threshold)
    while time.time() - start_time <= timeout_sec:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
        if error_msg:
            logger.debug('Progress: %s', error_msg)
            time.sleep(2)
        else:
            break
    # Ensure the number of outstanding RPCs is stable.
    if not error_msg:
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
    if error_msg:
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))
548
549
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    """Return None if the in-flight RPC count is within tolerance, else a message."""
    stats = get_client_accumulated_stats()
    started = stats.num_rpcs_started_by_method[rpc_type]
    succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    failed = stats.num_rpcs_failed_by_method[rpc_type]
    # Outstanding RPCs = started minus all that have completed either way.
    in_flight = started - succeeded - failed
    lower_bound = num_rpcs * (1 - threshold_fraction)
    upper_bound = num_rpcs * (1 + threshold_fraction)
    if in_flight < lower_bound:
        return ('actual(%d) < expected(%d - %d%%)' %
                (in_flight, num_rpcs, threshold))
    if in_flight > upper_bound:
        return ('actual(%d) > expected(%d + %d%%)' %
                (in_flight, num_rpcs, threshold))
    return None
564
565
def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Check that an observed distribution matches an expected one.

    Args:
      actual_distribution: list of floats, the observed distribution.
      expected_distribution: list of floats, the expected distribution.
      threshold: number within [0,100]; each actual value may deviate from
        its expected value by at most this percentage.

    Returns:
      True when every actual value lies within the threshold of its expected
      counterpart.

    Raises:
      ValueError: if threshold is outside [0,100].
      Exception: on a size mismatch or any out-of-tolerance value, with the
        details in the message.
    """
    if len(expected_distribution) != len(actual_distribution):
        raise Exception(
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (len(expected_distribution), len(actual_distribution)))
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    margin = threshold / 100.0
    for want, got in zip(expected_distribution, actual_distribution):
        if got < want * (1 - margin):
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (got, want, threshold))
        if got > want * (1 + margin):
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (got, want, threshold))
    return True
600
601
def compare_expected_instances(stats, expected_instances):
    """Compare if stats have expected instances for each type of RPC.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      Returns true if the instances are expected. False if not.
    """
    for rpc_type, expected_peers in expected_instances.items():
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        # Treat a missing/empty entry for this RPC type as "no peers" instead
        # of crashing: the original assigned None here and then called
        # None.keys() below, raising AttributeError.
        rpcs_by_peer = (rpcs_by_peer_for_type.rpcs_by_peer
                        if rpcs_by_peer_for_type else {})
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True
623
624
def test_backends_restart(gcp, backend_service, instance_group):
    """Scale the instance group down to zero and back, verifying traffic
    drains and then returns to all restored backends.

    (The original assigned an unused `start_time` local; removed.)
    """
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                         _WAIT_FOR_BACKEND_SEC)
    finally:
        # Always restore the original group size, even if the drain check
        # above failed.
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
642
643
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    """Point the URL map at an alternate backend service, verify traffic
    follows, then restore the original mapping.
    """
    logger.info('Running test_change_backend_service')
    primary_instances = get_instance_names(gcp, instance_group)
    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    # Both services must report healthy before measuring distribution.
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    wait_until_all_rpcs_go_to_given_backends(primary_instances,
                                             _WAIT_FOR_STATS_SEC)
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_instances,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    finally:
        # Revert the URL map and detach the alternate service's backends.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
665
666
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Stop most primary instances and verify traffic fails over gently to
    the remaining primary instance plus the secondary locality.

    Args:
      gcp: GCP resource handle used by the helper functions.
      backend_service: the backend service under test.
      primary_instance_group: instance group expected to receive traffic
        initially.
      secondary_instance_group: instance group expected to absorb failover
        traffic.
      swapped_primary_and_secondary: internal flag — set on the single
        recursive retry when TD considers the "secondary" group primary, to
        prevent infinite recursion.
    """
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    try:
        # Grow the primary group if needed so stopping all-but-one instance
        # crosses the >50% failure threshold.
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop serving on all primary instances except the last one.
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            # Re-enable serving regardless of the outcome above.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Restore the original topology and size, and wait for traffic to
        # settle back onto the primary group before the next test.
        patch_backend_service(gcp, backend_service, [primary_instance_group])
        resize_instance_group(gcp, primary_instance_group,
                              num_primary_instances)
        instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
721
722
def test_load_report_based_failover(gcp, backend_service,
                                    primary_instance_group,
                                    secondary_instance_group):
    """Verify failover driven by RATE-based load reporting.

    Caps the primary locality's RPS below the client QPS so traffic spills
    into the secondary locality, then raises the cap above the client QPS
    and checks traffic returns to the primary locality only.
    """
    logger.info('Running test_load_report_based_failover')
    all_groups = [primary_instance_group, secondary_instance_group]
    try:
        patch_backend_service(gcp, backend_service, all_groups)
        primary_names = get_instance_names(gcp, primary_instance_group)
        secondary_names = get_instance_names(gcp, secondary_instance_group)
        for group in all_groups:
            wait_for_healthy_backends(gcp, backend_service, group)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Cap the primary locality at 20% of the client's QPS; the secondary
        # locality must absorb the overflow.
        max_rate = int(args.qps * 1 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(gcp,
                              backend_service,
                              all_groups,
                              balancing_mode='RATE',
                              max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(
            primary_names + secondary_names, _WAIT_FOR_BACKEND_SEC)

        # Raise the cap to 120% of the client's QPS; the primary locality
        # alone should now carry all traffic.
        max_rate = int(args.qps * 6 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(gcp,
                              backend_service,
                              all_groups,
                              balancing_mode='RATE',
                              max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_BACKEND_SEC)
        logger.info("success")
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
        wait_until_all_rpcs_go_to_given_backends(
            get_instance_names(gcp, primary_instance_group),
            _WAIT_FOR_BACKEND_SEC)
771
772
def test_ping_pong(gcp, backend_service, instance_group):
    """Smoke test: every RPC reaches a backend of the single instance group."""
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    wait_until_all_rpcs_go_to_given_backends(
        get_instance_names(gcp, instance_group), _WAIT_FOR_STATS_SEC)
779
780
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    """Verify traffic shifts to the remaining group after one is removed.

    Attaches both instance groups to the backend service, determines which
    group(s) Traffic Director actually routes to, removes one group, and
    checks that all RPCs land on the group that was kept. The backend
    service is restored to only ``instance_group`` on exit.
    """
    logger.info('Running test_remove_instance_group')
    try:
        patch_backend_service(gcp,
                              backend_service,
                              [instance_group, same_zone_instance_group],
                              balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(gcp,
                                                      same_zone_instance_group)
        try:
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC)
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to continue
            # with the remainder of the test case.
            try:
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError:
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        patch_backend_service(gcp,
                              backend_service, [remaining_instance_group],
                              balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    finally:
        patch_backend_service(gcp, backend_service, [instance_group])
        # Look up the names here instead of reusing the local from the try
        # block: if an exception was raised before that local was assigned,
        # referencing it would raise NameError and mask the original error.
        instance_names = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
824
825
def test_round_robin(gcp, backend_service, instance_group):
    """Verify RPCs are balanced evenly (round robin) across all backends."""
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests may
    # briefly yield an empty EDS update, causing failed RPCs; retry the
    # distribution validation when that happens. The long-term fix is fresh
    # backend resources per test case. Each attempt takes ~10 seconds and
    # config propagation can take minutes, hence the generous retry budget.
    max_attempts = 40
    for _ in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        total_requests_received = sum(
            stats.rpcs_by_peer[peer] for peer in stats.rpcs_by_peer)
        if total_requests_received != _NUM_TEST_RPCS:
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            continue
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            deviation = abs(stats.rpcs_by_peer[instance] - expected_requests)
            if deviation > threshold:
                raise Exception(
                    'RPC peer distribution differs from expected by more than %d '
                    'for instance %s (%s)' % (threshold, instance, stats))
        return
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
856
857
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """With one primary instance down, traffic stays in the primary locality.

    Stops serving on a single primary instance and verifies the remaining
    primary instances absorb all RPCs (the secondary locality gets none).
    If the groups' primary/secondary roles turn out to be reversed, retries
    once with the roles swapped.
    """
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop exactly one primary instance; the rest should keep the load.
        stopped, still_serving = primary_names[:1], primary_names[1:]
        try:
            set_serving_status(stopped, gcp.service_port, serving=False)
            wait_until_all_rpcs_go_to_given_backends(still_serving,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_names, gcp.service_port, serving=True)
    except RpcDistributionError:
        if swapped_primary_and_secondary or not is_primary_instance_group(
                gcp, secondary_instance_group):
            raise
        # Roles were reversed: rerun once with the groups swapped.
        test_secondary_locality_gets_no_requests_on_partial_primary_failure(
            gcp,
            backend_service,
            secondary_instance_group,
            primary_instance_group,
            swapped_primary_and_secondary=True)
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
903
904
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """With the whole primary locality down, traffic fails over to secondary.

    Stops serving on every primary instance and verifies all RPCs land on
    the secondary locality. If the groups' primary/secondary roles turn out
    to be reversed, retries once with the roles swapped.
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_names = get_instance_names(gcp, primary_instance_group)
        secondary_names = get_instance_names(gcp, secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            # Take the entire primary locality out of service.
            set_serving_status(primary_names, gcp.service_port, serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            set_serving_status(primary_names, gcp.service_port, serving=True)
    except RpcDistributionError:
        if swapped_primary_and_secondary or not is_primary_instance_group(
                gcp, secondary_instance_group):
            raise
        # Roles were reversed: rerun once with the groups swapped.
        test_secondary_locality_gets_requests_on_primary_failure(
            gcp,
            backend_service,
            secondary_instance_group,
            primary_instance_group,
            swapped_primary_and_secondary=True)
    finally:
        patch_backend_service(gcp, backend_service, [primary_instance_group])
948
949
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    """Bring both backend services to a known-good baseline for url-map tests.

    Attaches the same-zone instance group to the alternate backend service,
    waits for every backend to become healthy, and verifies that all traffic
    initially flows to the original backend service.

    Returns:
      Tuple of (original, alternate) backend instance-name lists.
    """
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_instances)

    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_instances)

    # Baseline: every RPC should currently hit original_backend_service.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_instances, alternate_instances
981
982
def test_metadata_filter(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify url-map routeRules metadataFilters (MATCH_ALL / MATCH_ANY).

    For each rule set below, patches the url map so that non-matching
    filters keep traffic on the original service while matching filters
    send it to the alternate service, then confirms all traffic moves to
    the alternate backends. The alternate service is detached on exit.
    """
    logger.info("Running test_metadata_filter")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    try:
        # Build filter labels from the client's xDS bootstrap node metadata,
        # so `match_labels` is exactly what TD sees from this client.
        # NOTE(review): `bootstrap_path` is a module-level global — assumed
        # to be set before this test runs.
        with open(bootstrap_path) as f:
            md = json.load(f)['node']['metadata']
            match_labels = []
            for k, v in md.items():
                match_labels.append({'name': k, 'value': v})

        # Labels guaranteed not to match the client's node metadata.
        not_match_labels = [{'name': 'fake', 'value': 'fail'}]
        test_route_rules = [
            # test MATCH_ALL
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test mixing MATCH_ALL and MATCH_ANY
            # test MATCH_ALL: super set labels won't match
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test MATCH_ANY
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test match multiple route rules
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
            ]
        ]

        # For each rule set: start from the all-original baseline, patch the
        # url map, then expect all traffic to shift to the alternate
        # backends before resetting the url map for the next iteration.
        for route_rules in test_route_rules:
            wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)
            wait_until_no_rpcs_go_to_given_backends(original_backend_instances,
                                                    _WAIT_FOR_STATS_SEC)
            wait_until_all_rpcs_go_to_given_backends(
                alternate_backend_instances, _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp, original_backend_service)
    finally:
        patch_backend_service(gcp, alternate_backend_service, [])
1127
1128
def test_api_listener(gcp, backend_service, instance_group,
                      alternate_backend_service):
    """Verify a second API listener (url-map + proxy + forwarding rule) works.

    Creates a duplicate config suite sharing the same host name, confirms
    traffic still flows, deletes the original suite, and finally checks that
    removing the host rule stops traffic. The original resources are
    recreated in the ``finally`` clause so later tests start clean.

    Returns:
      The server URI the client should target after resources are restored.
    """
    logger.info("Running api_listener")
    # Assigned before the try block so the cleanup in `finally` can never
    # hit a NameError (which would mask the original exception) when an
    # early step inside the try fails.
    new_config_suffix = '2'
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        # create a second suite of map+tp+fr with the same host name in host rule
        # and we have to disable proxyless validation because it needs `0.0.0.0`
        # ip address in fr for proxyless and also we violate ip:port uniqueness
        # for test purpose. See https://github.com/grpc/grpc-java/issues/8009
        create_url_map(gcp, url_map_name + new_config_suffix, backend_service,
                       service_host_name)
        create_target_proxy(gcp, target_proxy_name + new_config_suffix, False)
        if not gcp.service_port:
            raise Exception(
                'Failed to find a valid port for the forwarding rule')
        # Offer several random candidate IPs so forwarding-rule creation can
        # retry on ip:port collisions.
        potential_ip_addresses = [
            '10.10.10.%d' % random.randint(0, 255) for _ in range(10)
        ]
        create_global_forwarding_rule(gcp,
                                      forwarding_rule_name + new_config_suffix,
                                      [gcp.service_port],
                                      potential_ip_addresses)
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp,
                                              url_map_name + new_config_suffix,
                                              backend_service,
                                              service_host_name)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        delete_global_forwarding_rule(gcp, forwarding_rule_name)
        delete_target_proxy(gcp, target_proxy_name)
        delete_url_map(gcp, url_map_name)
        # Keep validating long enough for the deletion to have propagated if
        # it was going to break routing.
        verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS *
                              args.qps)
        for i in range(verify_attempts):
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
        # delete host rule for the original host name
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)

    finally:
        delete_global_forwarding_rule(gcp,
                                      forwarding_rule_name + new_config_suffix)
        delete_target_proxy(gcp, target_proxy_name + new_config_suffix)
        delete_url_map(gcp, url_map_name + new_config_suffix)
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      potential_service_ports)
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
            server_uri = service_host_name + ':' + str(gcp.service_port)
        else:
            server_uri = service_host_name
        # NOTE: returning from `finally` suppresses any in-flight exception;
        # kept for consistency with the sibling forwarding-rule tests.
        return server_uri
1195
1196
def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
    """Verify RPCs stop when the forwarding rule excludes the service port.

    Recreates the forwarding rule on every default-range port except the
    one the client targets, and expects traffic to dry up. Restores the
    original forwarding rule on exit and returns the server URI the client
    should use afterwards.
    """
    logger.info("Running test_forwarding_rule_port_match")
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        delete_global_forwarding_rule(gcp)
        # Every port in the default range except the one the client uses.
        other_ports = [
            port for port in parse_port_range(_DEFAULT_PORT_RANGE)
            if port != gcp.service_port
        ]
        create_global_forwarding_rule(gcp, forwarding_rule_name, other_ports)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    finally:
        delete_global_forwarding_rule(gcp)
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      potential_service_ports)
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            server_uri = service_host_name
        else:
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
            server_uri = service_host_name + ':' + str(gcp.service_port)
        return server_uri
1223
1224
def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
    """Check client/url-map port-defaulting behavior of the forwarding rule.

    Exercises three configurations: (1) forwarding rule spans the whole
    default port range with a port in the url map — RPCs must fail;
    (2) no port in the client URI and none in the url map — RPCs succeed;
    (3) no port in the client URI but a port in the url map — RPCs fail.
    Restores the original url map, proxy, and forwarding rule on exit and
    returns the server URI the client should use afterwards.
    """
    logger.info("Running test_forwarding_rule_default_port")
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        # This first phase only applies when the client targets the default
        # service port; otherwise skip straight to the no-RPC check.
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            delete_global_forwarding_rule(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          parse_port_range(_DEFAULT_PORT_RANGE))
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
        # expect success when no port in client request service uri, and no port in url-map
        delete_global_forwarding_rule(gcp)
        delete_target_proxy(gcp)
        delete_url_map(gcp)
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        # False disables proxyless validation — needed because the rule
        # below uses port 80 with arbitrary 10.10.10.x addresses.
        create_target_proxy(gcp, gcp.target_proxy.name, False)
        # Candidate IPs for the forwarding rule; creation presumably retries
        # across them on collision — TODO confirm in create_global_forwarding_rule.
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append('10.10.10.%d' %
                                          (random.randint(0, 255)))
        create_global_forwarding_rule(gcp, forwarding_rule_name, [80],
                                      potential_ip_addresses)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        # expect failure when no port in client request uri, but specify port in url-map
        patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service,
                                          service_host_name)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    finally:
        # Recreate the original url map / proxy / forwarding rule so later
        # tests run against the standard configuration.
        delete_global_forwarding_rule(gcp)
        delete_target_proxy(gcp)
        delete_url_map(gcp)
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      potential_service_ports)
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
            server_uri = service_host_name + ':' + str(gcp.service_port)
        else:
            server_uri = service_host_name
        # Returning from `finally` suppresses any in-flight exception.
        return server_uri
1278
1279
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    """Verify weighted traffic splitting between two backend services.

    Starts from a baseline where all traffic hits original_backend_service,
    patches the url map to split traffic 20/80 between the original and
    alternate services, then checks the observed per-instance RPC
    percentages match the configured weights within tolerance.
    """
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # Route 20% of traffic to the original service, 80% to the alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Per-instance expectation: each service's share is divided evenly
        # among its instances, e.g. [20, 80] over 2+2 -> [10, 10, 40, 40].
        original_share = (original_service_percentage * 1.0 /
                          len(original_backend_instances))
        alternate_share = (alternate_service_percentage * 1.0 /
                           len(alternate_backend_instances))
        expected_instance_percentage = (
            [original_share] * len(original_backend_instances) +
            [alternate_share] * len(alternate_backend_instances))

        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Each verification attempt takes ~10 seconds; 10 retries bounds
        # this check at roughly 100 seconds.
        retry_count = 10
        for attempt in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            observed_counts = [
                stats.rpcs_by_peer[name]
                for name in original_backend_instances +
                alternate_backend_instances
            ]
            total_count = sum(observed_counts)
            got_instance_percentage = [
                count * 100.0 / total_count for count in observed_counts
            ]

            try:
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                logger.info('attempt %d', attempt)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                if attempt == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage,
                         expected_instance_percentage))
            else:
                logger.info("success")
                break
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
1351
1352
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    """Verify URL-map path matchers route RPCs to the expected backends.

    The test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then patches the URL map with route rules
    exercising fullPathMatch, prefixMatch, regexMatch, and ignoreCase
    matchers so that UnaryCall and EmptyCall go to different backend
    services. For each rule set it waits for all backends in both services
    to receive traffic, then verifies that each RPC method lands on the
    expected instances. The URL map and the alternate backend service are
    restored in the finally block regardless of the outcome.
    """
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Regex UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        # Unary methods with any services. The raw string
                        # avoids the invalid '\/' escape-sequence warning
                        # while sending the identical pattern to GCP.
                        'regexMatch': r'^\/.*\/UnaryCall$'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # ignoreCase EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        # Case insensitive matching.
                        'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
                        'ignoreCase': True,
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    finally:
        # Restore the default URL map and detach all backends from the
        # alternate service so later tests start from a clean state.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
1491
1492
def test_header_matching(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify URL-map header matchers route RPCs to the expected backends.

    The test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service. It then patches the URL map with route rules
    exercising exactMatch, prefixMatch, suffixMatch, presentMatch,
    invertMatch, rangeMatch, and regexMatch header matchers, so that RPCs
    carrying the test metadata go to the alternate backend service. For
    each rule set it waits for all backends in both services to receive
    traffic, then verifies that each RPC method lands on the expected
    instances. The URL map and the alternate backend service are restored
    in the finally block regardless of the outcome.
    """
    logger.info('Running test_header_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # Header ExactMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_EMPTY
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header PrefixMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            # Match only the first two characters of the value.
                            'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header SuffixMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            # Match only the last two characters of the value.
                            'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' present -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'presentMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header invert ExactMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to
                    # original. EmptyCall will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_UNARY,
                            'invertMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
                    # UnaryCall is sent with the metadata in range.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'rangeMatch': {
                                'rangeStart': '100',
                                'rangeEnd': '200'
                            }
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header RegexMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName':
                                _TEST_METADATA_KEY,
                            # Anchored regex built from the first and last two
                            # characters of the expected metadata value.
                            'regexMatch':
                                "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
                                              _TEST_METADATA_VALUE_EMPTY[-2:])
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    finally:
        # Restore the default URL map and detach all backends from the
        # alternate service so later tests start from a clean state.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, alternate_backend_service, [])
1689
1690
def test_circuit_breaking(gcp, original_backend_service, instance_group,
                          same_zone_instance_group):
    '''
    Verify that circuit-breaker maxRequests limits are enforced per backend
    service, and that raising the limit takes effect.

    Since backend service circuit_breakers configuration cannot be unset,
    which causes trouble for restoring validate_for_proxy flag in target
    proxy/global forwarding rule. This test uses dedicated backend services.
    The url_map and backend services undergo the following state changes:

    Before test:
       original_backend_service -> [instance_group]
       extra_backend_service -> []
       more_extra_backend_service -> []

       url_map -> [original_backend_service]

    In test:
       extra_backend_service (with circuit_breakers) -> [instance_group]
       more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]

       url_map -> [extra_backend_service, more_extra_backend_service]

    After test:
       original_backend_service -> [instance_group]
       extra_backend_service (with circuit_breakers) -> []
       more_extra_backend_service (with circuit_breakers) -> []

       url_map -> [original_backend_service]
    '''
    logger.info('Running test_circuit_breaking')
    # Services created below are tracked here so the finally block can
    # delete them even if setup fails partway through.
    additional_backend_services = []
    try:
        # TODO(chengyuanzhang): Dedicated backend services created for circuit
        # breaking test. Once the issue for unsetting backend service circuit
        # breakers is resolved or configuring backend service circuit breakers is
        # enabled for config validation, these dedicated backend services can be
        # eliminated.
        extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
        more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
        extra_backend_service = add_backend_service(gcp,
                                                    extra_backend_service_name)
        additional_backend_services.append(extra_backend_service)
        more_extra_backend_service = add_backend_service(
            gcp, more_extra_backend_service_name)
        additional_backend_services.append(more_extra_backend_service)
        # The config validation for proxyless doesn't allow setting
        # circuit_breakers. Disable validate validate_for_proxyless
        # for this test. This can be removed when validation
        # accepts circuit_breakers.
        logger.info('disabling validate_for_proxyless in target proxy')
        set_validate_for_proxyless(gcp, False)
        # Different limits per service so each call type saturates at a
        # distinct, verifiable in-flight count.
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        logger.info('Waiting for extra backends to become healthy')
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(gcp,
                              more_extra_backend_service,
                              [same_zone_instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      more_extra_backend_service_max_requests
                              })
        logger.info('Waiting for more extra backend to become healthy')
        wait_for_healthy_backends(gcp, more_extra_backend_service,
                                  same_zone_instance_group)
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group)
        route_rules = [
            {
                'priority': 0,
                # UnaryCall -> extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
                }],
                'service': extra_backend_service.url
            },
            {
                'priority': 1,
                # EmptyCall -> more_extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                }],
                'service': more_extra_backend_service.url
            },
        ]

        # Make client send UNARY_CALL and EMPTY_CALL.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ])
        logger.info('Patching url map with %s', route_rules)
        patch_url_map_backend_service(gcp,
                                      extra_backend_service,
                                      route_rules=route_rules)
        logger.info('Waiting for traffic to go to all backends')
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Make all calls keep-open.
        # The 'rpc-behavior: keep-open' metadata keeps RPCs outstanding so
        # in-flight counts climb until the circuit breaker caps them.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
             'rpc-behavior', 'keep-open'),
            (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
             'rpc-behavior', 'keep-open')])
        # Timeout scales with the limit: it takes max_requests/qps seconds
        # for in-flight RPCs to accumulate up to the cap.
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state (%d)',
                    extra_backend_service_max_requests)
        wait_until_rpcs_in_flight(
            'EMPTY_CALL',
            (_WAIT_FOR_BACKEND_SEC +
             int(more_extra_backend_service_max_requests / args.qps)),
            more_extra_backend_service_max_requests, 1)
        logger.info('EMPTY_CALL reached stable state (%d)',
                    more_extra_backend_service_max_requests)

        # Increment circuit breakers max_requests threshold.
        extra_backend_service_max_requests = 800
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state after increase (%d)',
                    extra_backend_service_max_requests)
        logger.info('success')
        # Avoid new RPCs being outstanding (some test clients create threads
        # for sending RPCs) after restoring backend services.
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
    finally:
        # Restore the pre-test topology and remove the dedicated services.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_service(gcp, original_backend_service, [instance_group])
        for backend_service in additional_backend_services:
            delete_backend_service(gcp, backend_service)
        set_validate_for_proxyless(gcp, True)
1844
1845
def test_timeout(gcp, original_backend_service, instance_group):
    """Verify URL-map maxStreamDuration and application deadline behavior.

    Patches the URL map so UnaryCall has a routeAction maxStreamDuration of
    3 seconds, then runs three scenarios: a server-side sleep-4 that should
    make UnaryCall hit the stream duration limit (DEADLINE_EXCEEDED) while
    EmptyCall, on a different route, succeeds; a 1-second application
    timeout with sleep-2 that should also produce DEADLINE_EXCEEDED; and a
    no-sleep case that should succeed. Results are checked against the
    client's accumulated per-method status counts. The URL map is restored
    in the finally block.
    """
    logger.info('Running test_timeout')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # UnaryCall -> maxStreamDuration:3s
    route_rules = [{
        'priority': 0,
        'matchRules': [{
            'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
        }],
        'service': original_backend_service.url,
        'routeAction': {
            'maxStreamDuration': {
                'seconds': 3,
            },
        },
    }]
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {expected_results}).
    # client_config is passed to configure_client as keyword arguments;
    # expected_results maps RPC method name -> expected gRPC status code.
    test_cases = [
        (
            'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
            # UnaryCall and EmptyCall both sleep-4.
            # UnaryCall timeouts, EmptyCall succeeds.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-4'),
                    (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                     'rpc-behavior', 'sleep-4'),
                ],
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
                'EMPTY_CALL': 0,  # OK
            },
        ),
        (
            'app_timeout_exceeded',
            # UnaryCall only with sleep-2; timeout=1s; calls timeout.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-2'),
                ],
                'timeout_sec': 1,
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
            },
        ),
        (
            'timeout_not_exceeded',
            # UnaryCall only with no sleep; calls succeed.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
            },
            {
                'UNARY_CALL': 0,  # OK
            },
        )
    ]

    try:
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            if first_case:
                # The first case gets a much longer budget (1200s),
                # presumably to absorb initial config propagation — confirm
                # against the test environment before tightening.
                attempt_count = 120
                first_case = False
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for rpc, status in expected_results.items():
                    # Count of RPCs that finished with the expected status
                    # during this window, from the accumulated-stats delta.
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = test_runtime_secs * args.qps
                    # Allow 10% deviation from expectation to reduce flakiness
                    if qty < (want * .9) or qty > (want * 1.1):
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                # Use this window's totals as the baseline for the next one.
                before_stats = after_stats
            else:
                # for/else: only reached when no attempt succeeded (no break).
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
1972
1973
def test_fault_injection(gcp, original_backend_service, instance_group):
    """Verify Traffic Director fault-injection policies are honored.

    Installs route rules that inject aborts/delays at various percentages,
    keyed on the 'fi_testcase' request header, then checks that the client's
    observed RPC status distribution matches each policy's expectation.
    The original URL map and proxyless validation are restored on exit.
    """
    logger.info('Running test_fault_injection')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Requests carry this header so the URL map can route each test case to
    # its matching fault-injection policy below.
    testcase_header = 'fi_testcase'

    def _route(pri, name, fi_policy):
        # Build one route rule matching requests whose testcase header
        # exactly equals `name`, applying the given fault-injection policy.
        return {
            'priority': pri,
            'matchRules': [{
                'prefixMatch':
                    '/',
                'headerMatches': [{
                    'headerName': testcase_header,
                    'exactMatch': name,
                }],
            }],
            'service': original_backend_service.url,
            'routeAction': {
                'faultInjectionPolicy': fi_policy
            },
        }

    def _abort(pct):
        # Abort policy: fail `pct` percent of requests with HTTP 401
        # (surfaces to the gRPC client as UNAUTHENTICATED, code 16).
        return {
            'abort': {
                'httpStatus': 401,
                'percentage': pct,
            }
        }

    def _delay(pct):
        # Delay policy: hold `pct` percent of requests for 20 seconds,
        # longer than any client timeout configured below.
        return {
            'delay': {
                'fixedDelay': {
                    'seconds': '20'
                },
                'percentage': pct,
            }
        }

    # A combined abort+delay policy at 0% each — should behave exactly like
    # having no fault injection at all.
    zero_route = _abort(0)
    zero_route.update(_delay(0))
    route_rules = [
        _route(0, 'zero_percent_fault_injection', zero_route),
        _route(1, 'always_delay', _delay(100)),
        _route(2, 'always_abort', _abort(100)),
        _route(3, 'delay_half', _delay(50)),
        _route(4, 'abort_half', _abort(50)),
        {
            'priority': 5,
            'matchRules': [{
                'prefixMatch': '/'
            }],
            'service': original_backend_service.url,
        },
    ]
    set_validate_for_proxyless(gcp, False)
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {code: percent}).  Each
    # test case will set the testcase_header with the testcase_name for routing
    # to the appropriate config for the case, defined above.
    test_cases = [
        (
            'zero_percent_fault_injection',
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'non_matching_fault_injection',  # Not in route_rules, above.
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'always_delay',
            {
                'timeout_sec': 2
            },
            {
                4: 1
            },  # DEADLINE_EXCEEDED
        ),
        (
            'always_abort',
            {},
            {
                16: 1
            },  # UNAUTHENTICATED
        ),
        (
            'delay_half',
            {
                'timeout_sec': 2
            },
            {
                4: .5,
                0: .5
            },  # DEADLINE_EXCEEDED / OK: 50% / 50%
        ),
        (
            'abort_half',
            {},
            {
                16: .5,
                0: .5
            },  # UNAUTHENTICATED / OK: 50% / 50%
        )
    ]

    try:
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)

            # Attach the routing header to UNARY_CALLs and restrict the
            # client to that RPC type, so expectations below apply cleanly.
            client_config['metadata'] = [
                (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                 testcase_header, testcase_name)
            ]
            client_config['rpc_types'] = [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            ]
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            if first_case:
                # The first case additionally absorbs xDS config propagation
                # delay, so it gets a much larger attempt budget.
                attempt_count = 120
                first_case = False
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for status, pct in expected_results.items():
                    rpc = 'UNARY_CALL'
                    # Per-status delta accumulated over this attempt's window.
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = pct * args.qps * test_runtime_secs
                    # Allow 10% deviation from expectation to reduce flakiness
                    VARIANCE_ALLOWED = 0.1
                    if abs(qty - want) > want * VARIANCE_ALLOWED:
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                before_stats = after_stats
            else:
                # for/else: every attempt failed to match expectations.
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    finally:
        # Restore the original URL map and re-enable proxyless validation.
        patch_url_map_backend_service(gcp, original_backend_service)
        set_validate_for_proxyless(gcp, True)
2152
2153
def test_csds(gcp, original_backend_service, instance_group, server_uri):
    """Validate the client's CSDS xDS config dump.

    Repeatedly fetches the client's config dump and checks the node zone and
    that consistent LDS/RDS/CDS/EDS entries are all present, retrying until
    success or a 5-minute deadline.

    Raises:
        RuntimeError: if no valid config dump is observed before the deadline.
    """
    test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
    sleep_interval_between_attempts_s = datetime.timedelta(
        seconds=2).total_seconds()
    logger.info('Running test_csds')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Test case timeout: 5 minutes
    deadline = time.time() + test_csds_timeout_s
    cnt = 0
    while time.time() <= deadline:
        client_config = get_client_xds_config_dump()
        logger.info('test_csds attempt %d: received xDS config %s', cnt,
                    json.dumps(client_config, indent=2))
        if client_config is not None:
            # Got the xDS config dump, now validate it
            ok = True
            try:
                if client_config['node']['locality']['zone'] != args.zone:
                    logger.info('Invalid zone %s != %s',
                                client_config['node']['locality']['zone'],
                                args.zone)
                    ok = False
                seen = set()
                for xds_config in client_config['xds_config']:
                    if 'listener_config' in xds_config:
                        listener_name = xds_config['listener_config'][
                            'dynamic_listeners'][0]['active_state']['listener'][
                                'name']
                        if listener_name != server_uri:
                            logger.info('Invalid Listener name %s != %s',
                                        listener_name, server_uri)
                            ok = False
                        else:
                            seen.add('lds')
                    elif 'route_config' in xds_config:
                        num_vh = len(
                            xds_config['route_config']['dynamic_route_configs']
                            [0]['route_config']['virtual_hosts'])
                        if num_vh <= 0:
                            logger.info('Invalid number of VirtualHosts %s',
                                        num_vh)
                            ok = False
                        else:
                            seen.add('rds')
                    elif 'cluster_config' in xds_config:
                        cluster_type = xds_config['cluster_config'][
                            'dynamic_active_clusters'][0]['cluster']['type']
                        if cluster_type != 'EDS':
                            logger.info('Invalid cluster type %s != EDS',
                                        cluster_type)
                            ok = False
                        else:
                            seen.add('cds')
                    elif 'endpoint_config' in xds_config:
                        sub_zone = xds_config["endpoint_config"][
                            "dynamic_endpoint_configs"][0]["endpoint_config"][
                                "endpoints"][0]["locality"]["sub_zone"]
                        if args.zone not in sub_zone:
                            logger.info('Invalid endpoint sub_zone %s',
                                        sub_zone)
                            ok = False
                        else:
                            seen.add('eds')
                want = {'lds', 'rds', 'cds', 'eds'}
                if seen != want:
                    logger.info('Incomplete xDS config dump, seen=%s', seen)
                    ok = False
            except Exception:
                # A malformed/partial dump (missing keys, wrong shapes) just
                # counts as a failed attempt.  Catch Exception, not bare
                # except, so KeyboardInterrupt/SystemExit still propagate.
                logger.exception('Error in xDS config dump:')
                ok = False
            finally:
                if ok:
                    # Successfully fetched xDS config, and they looks good.
                    logger.info('success')
                    return
        logger.info('test_csds attempt %d failed', cnt)
        # Give the client some time to fetch xDS resources
        time.sleep(sleep_interval_between_attempts_s)
        cnt += 1

    raise RuntimeError('failed to receive a valid xDS config in %s seconds' %
                       test_csds_timeout_s)
2239
2240
def maybe_write_sponge_properties():
    """Writing test infos to enable more advanced testgrid searches."""
    # All of these must be present, otherwise silently skip writing.
    for required_var in ('KOKORO_ARTIFACTS_DIR', 'GIT_ORIGIN_URL',
                         'GIT_COMMIT_SHORT'):
        if required_var not in os.environ:
            return
    properties = [
        # Technically, 'TESTS_FORMAT_VERSION' is not required for run_xds_tests.
        # We keep it here so one day we may merge the process of writing sponge
        # properties.
        'TESTS_FORMAT_VERSION,2',
        'TESTGRID_EXCLUDE,%s' % os.environ.get('TESTGRID_EXCLUDE', 0),
        'GIT_ORIGIN_URL,%s' % os.environ['GIT_ORIGIN_URL'],
        'GIT_COMMIT_SHORT,%s' % os.environ['GIT_COMMIT_SHORT'],
    ]
    logger.info('Writing Sponge configs: %s', properties)
    output_path = os.path.join(os.environ['KOKORO_ARTIFACTS_DIR'],
                               "custom_sponge_config.csv")
    with open(output_path, 'w') as f:
        f.write("\n".join(properties))
        f.write("\n")
2264
2265
def set_validate_for_proxyless(gcp, validate_for_proxyless):
    """Recreate the target proxy with the given validateForProxyless value.

    Patching a target gRPC proxy is not supported, so the global forwarding
    rule and proxy are deleted and recreated in place.  No-op unless alpha
    compute is enabled.
    """
    if not gcp.alpha_compute:
        logger.debug(
            'Not setting validateForProxy because alpha is not enabled')
        return
    proxy_name = gcp.target_proxy.name
    rule_name = gcp.global_forwarding_rule.name
    delete_global_forwarding_rule(gcp)
    delete_target_proxy(gcp)
    create_target_proxy(gcp, proxy_name, validate_for_proxyless)
    create_global_forwarding_rule(gcp, rule_name, [gcp.service_port])
2279
2280
def get_serving_status(instance, service_port):
    """Query the standard gRPC health service on a single backend instance."""
    target = '%s:%d' % (instance, service_port)
    with grpc.insecure_channel(target) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        return stub.Check(health_pb2.HealthCheckRequest())
2285
2286
def set_serving_status(instances, service_port, serving):
    """Set each backend's health status and verify the change took effect.

    Args:
        instances: iterable of instance host names/addresses to update.
        service_port: port the test server's gRPC services listen on.
        serving: True to set SERVING, False to set NOT_SERVING.

    Raises:
        Exception: if an instance does not report the requested status after
            the configured number of retries.
    """
    logger.info('setting %s serving status to %s', instances, serving)
    retry_count = 5
    for instance in instances:
        with grpc.insecure_channel('%s:%d' %
                                   (instance, service_port)) as channel:
            logger.info('setting %s serving status to %s', instance, serving)
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            # Bug fix: the retry loop previously hard-coded range(5) even
            # though retry_count was declared; keep them in sync.
            for i in range(retry_count):
                if serving:
                    stub.SetServing(empty_pb2.Empty())
                else:
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info('got instance service status %s', serving_status)
                want_status = (health_pb2.HealthCheckResponse.SERVING
                               if serving else
                               health_pb2.HealthCheckResponse.NOT_SERVING)
                if serving_status.status == want_status:
                    break
            else:
                # for/else: status never matched within retry_count attempts.
                raise Exception(
                    'failed to set instance service status after %d retries' %
                    retry_count)
2309
2310
def is_primary_instance_group(gcp, instance_group):
    """Return True when every client RPC landed on this group's instances.

    Clients may connect to a TD instance in a different region than the
    client, in which case primary/secondary assignments may not be based on
    the client's actual locality.
    """
    known_instances = set(get_instance_names(gcp, instance_group))
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    for peer in stats.rpcs_by_peer.keys():
        if peer not in known_instances:
            return False
    return True
2318
2319
def get_startup_script(path_to_server_binary, service_port):
    """Return the VM startup script used to launch the xDS test server.

    When a prebuilt server binary path is supplied, the script launches it
    in the background; otherwise the script clones and builds the grpc-java
    interop test server from source before running it.
    """
    if not path_to_server_binary:
        return """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &""" % service_port
    launch_template = 'nohup %s --port=%d 1>/dev/null &'
    return launch_template % (path_to_server_binary, service_port)
2337
2338
def create_instance_template(gcp, name, network, source_image, machine_type,
                             startup_script):
    """Create a GCE instance template and record it on the gcp state object."""
    properties = {
        'tags': {
            'items': ['allow-health-checks']
        },
        'machineType': machine_type,
        'serviceAccounts': [{
            'email': 'default',
            'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
        }],
        'networkInterfaces': [{
            'accessConfigs': [{
                'type': 'ONE_TO_ONE_NAT'
            }],
            'network': network
        }],
        'disks': [{
            'boot': True,
            'initializeParams': {
                'sourceImage': source_image
            }
        }],
        # The startup script is what actually launches the test server.
        'metadata': {
            'items': [{
                'key': 'startup-script',
                'value': startup_script
            }]
        }
    }
    config = {'name': name, 'properties': properties}

    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.instanceTemplates().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.instance_template = GcpResource(config['name'], result['targetLink'])
2378
2379
def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group and wait for it to reach `size`."""
    config = {
        'name': name,
        'instanceTemplate': gcp.instance_template.url,
        'targetSize': size,
        'namedPorts': [{
            'name': 'grpc',
            'port': gcp.service_port
        }]
    }

    logger.debug('Sending GCP request with body=%s', config)
    managers = gcp.compute.instanceGroupManagers()
    insert_op = managers.insert(
        project=gcp.project, zone=zone,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, insert_op['name'])
    created = managers.get(project=gcp.project,
                           zone=zone,
                           instanceGroupManager=config['name']).execute(
                               num_retries=_GCP_API_RETRIES)
    instance_group = InstanceGroup(config['name'], created['instanceGroup'],
                                   zone)
    gcp.instance_groups.append(instance_group)
    # Instances come up asynchronously; block until the group is full.
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
                                                   _WAIT_FOR_OPERATION_SEC)
    return instance_group
2406
2407
def create_health_check(gcp, name):
    """Create a health check: gRPC-type on alpha compute, TCP otherwise."""
    if gcp.alpha_compute:
        compute_api = gcp.alpha_compute
        body = {
            'name': name,
            'type': 'GRPC',
            'grpcHealthCheck': {
                'portSpecification': 'USE_SERVING_PORT'
            }
        }
    else:
        compute_api = gcp.compute
        body = {
            'name': name,
            'type': 'TCP',
            'tcpHealthCheck': {
                'portName': 'grpc'
            }
        }
    logger.debug('Sending GCP request with body=%s', body)
    operation = compute_api.healthChecks().insert(
        project=gcp.project, body=body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    gcp.health_check = GcpResource(body['name'], operation['targetLink'])
2432
2433
def create_health_check_firewall_rule(gcp, name):
    """Allow ingress TCP from GCP health-checker ranges to tagged VMs."""
    body = {
        'name': name,
        'direction': 'INGRESS',
        'allowed': [{
            'IPProtocol': 'tcp'
        }],
        'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
        'targetTags': ['allow-health-checks'],
    }
    logger.debug('Sending GCP request with body=%s', body)
    operation = gcp.compute.firewalls().insert(
        project=gcp.project, body=body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    gcp.health_check_firewall_rule = GcpResource(body['name'],
                                                 operation['targetLink'])
2450
2451
def add_backend_service(gcp, name):
    """Create a backend service (GRPC protocol on alpha, HTTP2 otherwise)."""
    use_alpha = gcp.alpha_compute
    compute_api = gcp.alpha_compute if use_alpha else gcp.compute
    body = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
        'healthChecks': [gcp.health_check.url],
        'portName': 'grpc',
        'protocol': 'GRPC' if use_alpha else 'HTTP2'
    }
    logger.debug('Sending GCP request with body=%s', body)
    operation = compute_api.backendServices().insert(
        project=gcp.project, body=body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    backend_service = GcpResource(body['name'], operation['targetLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
2473
2474
def create_url_map(gcp, name, backend_service, host_name):
    """Create a URL map routing host_name to backend_service via the shared
    path matcher, and record it on the gcp state object."""
    body = {
        'name': name,
        'defaultService': backend_service.url,
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': backend_service.url,
        }],
        'hostRules': [{
            'hosts': [host_name],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', body)
    operation = gcp.compute.urlMaps().insert(
        project=gcp.project, body=body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
    gcp.url_map = GcpResource(body['name'], operation['targetLink'])
2493
2494
def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Rewrite the URL map's host rule to match host_name with the service
    port appended.

    Note: backend_service is accepted for call-site symmetry but unused.
    """
    host_with_port = '%s:%d' % (host_name, gcp.service_port)
    body = {
        'hostRules': [{
            'hosts': [host_with_port],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', body)
    operation = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=name,
        body=body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, operation['name'])
2507
2508
def create_target_proxy(gcp, name, validate_for_proxyless=True):
    """Create a target proxy pointing at gcp.url_map.

    On alpha compute a target gRPC proxy is created (carrying the proxyless
    validation flag); otherwise a target HTTP proxy is used and the flag is
    not applicable.
    """
    body = {
        'name': name,
        'url_map': gcp.url_map.url,
    }
    if gcp.alpha_compute:
        body['validate_for_proxyless'] = validate_for_proxyless
        logger.debug('Sending GCP request with body=%s', body)
        result = gcp.alpha_compute.targetGrpcProxies().insert(
            project=gcp.project,
            body=body).execute(num_retries=_GCP_API_RETRIES)
    else:
        logger.debug('Sending GCP request with body=%s', body)
        result = gcp.compute.targetHttpProxies().insert(
            project=gcp.project,
            body=body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.target_proxy = GcpResource(body['name'], result['targetLink'])
2531
2532
def create_global_forwarding_rule(gcp,
                                  name,
                                  potential_ports,
                                  potential_ip_addresses=('0.0.0.0',)):
    """Create a global forwarding rule targeting gcp.target_proxy.

    Tries each (port, ip_address) combination until one succeeds; on success
    records the rule and the chosen port on the gcp state object.

    Note: the default is a tuple rather than a list to avoid the
    mutable-default-argument pitfall; it is behaviorally identical since the
    value is only iterated.

    NOTE(review): if every combination fails, this returns without raising,
    leaving gcp.global_forwarding_rule and gcp.service_port unset — callers
    appear to rely on later failures to surface this.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        for ip_address in potential_ip_addresses:
            try:
                config = {
                    'name': name,
                    'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
                    'portRange': str(port),
                    'IPAddress': ip_address,
                    'network': args.network,
                    'target': gcp.target_proxy.url,
                }
                logger.debug('Sending GCP request with body=%s', config)
                result = compute_to_use.globalForwardingRules().insert(
                    project=gcp.project,
                    body=config).execute(num_retries=_GCP_API_RETRIES)
                wait_for_global_operation(gcp, result['name'])
                gcp.global_forwarding_rule = GcpResource(
                    config['name'], result['targetLink'])
                gcp.service_port = port
                return
            except googleapiclient.errors.HttpError as http_error:
                # Use lazy %-style logger args instead of eager % formatting.
                logger.warning(
                    'Got error %s when attempting to create forwarding rule to '
                    '%s:%d. Retrying with another port.', http_error,
                    ip_address, port)
2566
2567
def get_health_check(gcp, health_check_name):
    """Look up an existing health check and record it on the gcp state."""
    response = gcp.compute.healthChecks().get(
        project=gcp.project, healthCheck=health_check_name).execute()
    gcp.health_check = GcpResource(health_check_name, response['selfLink'])
2572
2573
def get_health_check_firewall_rule(gcp, firewall_name):
    """Look up an existing firewall rule and record it on the gcp state."""
    response = gcp.compute.firewalls().get(project=gcp.project,
                                           firewall=firewall_name).execute()
    gcp.health_check_firewall_rule = GcpResource(firewall_name,
                                                 response['selfLink'])
2579
2580
def get_backend_service(gcp, backend_service_name):
    """Look up an existing backend service, track it on gcp, and return it."""
    response = gcp.compute.backendServices().get(
        project=gcp.project, backendService=backend_service_name).execute()
    backend_service = GcpResource(backend_service_name, response['selfLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
2587
2588
def get_url_map(gcp, url_map_name):
    """Look up an existing URL map and record it on the gcp state object."""
    response = gcp.compute.urlMaps().get(project=gcp.project,
                                         urlMap=url_map_name).execute()
    gcp.url_map = GcpResource(url_map_name, response['selfLink'])
2593
2594
def get_target_proxy(gcp, target_proxy_name):
    """Look up an existing target proxy (gRPC-type on alpha, HTTP otherwise)
    and record it on the gcp state object."""
    if gcp.alpha_compute:
        response = gcp.alpha_compute.targetGrpcProxies().get(
            project=gcp.project, targetGrpcProxy=target_proxy_name).execute()
    else:
        response = gcp.compute.targetHttpProxies().get(
            project=gcp.project, targetHttpProxy=target_proxy_name).execute()
    gcp.target_proxy = GcpResource(target_proxy_name, response['selfLink'])
2603
2604
def get_global_forwarding_rule(gcp, forwarding_rule_name):
    """Look up an existing global forwarding rule and record it on gcp."""
    response = gcp.compute.globalForwardingRules().get(
        project=gcp.project, forwardingRule=forwarding_rule_name).execute()
    gcp.global_forwarding_rule = GcpResource(forwarding_rule_name,
                                             response['selfLink'])
2610
2611
def get_instance_template(gcp, template_name):
    """Look up an existing instance template and record it on the gcp state."""
    response = gcp.compute.instanceTemplates().get(
        project=gcp.project, instanceTemplate=template_name).execute()
    gcp.instance_template = GcpResource(template_name, response['selfLink'])
2616
2617
def get_instance_group(gcp, zone, instance_group_name):
    """Look up an existing instance group, adopt its named port as the
    service port, track it on the gcp state object, and return it."""
    response = gcp.compute.instanceGroups().get(
        project=gcp.project, zone=zone,
        instanceGroup=instance_group_name).execute()
    # The first named port is the gRPC serving port for the test servers.
    gcp.service_port = response['namedPorts'][0]['port']
    instance_group = InstanceGroup(instance_group_name, response['selfLink'],
                                   zone)
    gcp.instance_groups.append(instance_group)
    return instance_group
2627
2628
def delete_global_forwarding_rule(gcp, name=None):
    """Delete a global forwarding rule (the tracked one when name is None);
    delete errors are logged and swallowed for best-effort cleanup."""
    target = name if name else gcp.global_forwarding_rule.name
    try:
        operation = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=target).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2642
2643
def delete_target_proxy(gcp, name=None):
    """Delete a target proxy (the tracked one when name is None), choosing
    the gRPC-proxy API on alpha; delete errors are logged and swallowed."""
    target = name if name else gcp.target_proxy.name
    try:
        if gcp.alpha_compute:
            operation = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project, targetGrpcProxy=target).execute(
                    num_retries=_GCP_API_RETRIES)
        else:
            operation = gcp.compute.targetHttpProxies().delete(
                project=gcp.project, targetHttpProxy=target).execute(
                    num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2661
2662
def delete_url_map(gcp, name=None):
    """Delete a URL map (the tracked one when name is None); delete errors
    are logged and swallowed for best-effort cleanup."""
    target = name if name else gcp.url_map.name
    try:
        operation = gcp.compute.urlMaps().delete(
            project=gcp.project,
            urlMap=target).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2675
2676
def delete_backend_service(gcp, backend_service):
    """Delete a single backend service; delete errors are logged and
    swallowed for best-effort cleanup."""
    try:
        operation = gcp.compute.backendServices().delete(
            project=gcp.project, backendService=backend_service.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2685
2686
def delete_backend_services(gcp):
    """Delete every backend service tracked on the gcp state object."""
    for service in gcp.backend_services:
        delete_backend_service(gcp, service)
2690
2691
def delete_firewall(gcp):
    """Delete the tracked health-check firewall rule; delete errors are
    logged and swallowed for best-effort cleanup."""
    try:
        operation = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=gcp.health_check_firewall_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2701
2702
def delete_health_check(gcp):
    """Delete the tracked health check; delete errors are logged and
    swallowed for best-effort cleanup."""
    try:
        operation = gcp.compute.healthChecks().delete(
            project=gcp.project, healthCheck=gcp.health_check.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2711
2712
def delete_instance_groups(gcp):
    """Delete every tracked managed instance group; per-group delete errors
    are logged and swallowed for best-effort cleanup."""
    for group in gcp.instance_groups:
        try:
            operation = gcp.compute.instanceGroupManagers().delete(
                project=gcp.project,
                zone=group.zone,
                instanceGroupManager=group.name).execute(
                    num_retries=_GCP_API_RETRIES)
            wait_for_zone_operation(gcp,
                                    group.zone,
                                    operation['name'],
                                    timeout_sec=_WAIT_FOR_BACKEND_SEC)
        except googleapiclient.errors.HttpError as http_error:
            logger.info('Delete failed: %s', http_error)
2727
2728
def delete_instance_template(gcp):
    """Delete the tracked instance template; delete errors are logged and
    swallowed for best-effort cleanup."""
    try:
        operation = gcp.compute.instanceTemplates().delete(
            project=gcp.project,
            instanceTemplate=gcp.instance_template.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, operation['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2738
2739
def patch_backend_service(gcp,
                          backend_service,
                          instance_groups,
                          balancing_mode='UTILIZATION',
                          max_rate=1,
                          circuit_breakers=None):
    """Point the backend service at the given instance groups.

    maxRate is only meaningful for RATE balancing; for any other mode it is
    sent as None.
    """
    compute_api = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    backends = []
    for group in instance_groups:
        backends.append({
            'group': group.url,
            'balancingMode': balancing_mode,
            'maxRate': max_rate if balancing_mode == 'RATE' else None
        })
    body = {
        'backends': backends,
        'circuitBreakers': circuit_breakers,
    }
    logger.debug('Sending GCP request with body=%s', body)
    operation = compute_api.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=body).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              operation['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)
2765
2766
def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Resize a managed instance group and wait until it reaches new_size.

    timeout_sec bounds only the wait for the group to actually reach the
    requested size; the resize operation itself gets a fixed 360s budget.
    """
    resize_op = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            resize_op['name'],
                            timeout_sec=360)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)
2782
2783
def patch_url_map_backend_service(gcp,
                                  backend_service=None,
                                  services_with_weights=None,
                                  route_rules=None):
    """Change url_map's backend service.

    Only one of backend_service and services_with_weights can be not None.

    Args:
        gcp: GcpState holding the compute clients and the url_map to patch.
        backend_service: if set, installed as the path matcher's default
            service.
        services_with_weights: dict mapping backend service -> weight, used
            for weighted traffic splitting; mutually exclusive with
            backend_service.
        route_rules: optional route rules for the path matcher.

    Raises:
        ValueError: if both backend_service and services_with_weights are
            provided.
    """
    # Fail fast on conflicting arguments before touching any GCP state.
    # (Fixed: the message previously referred to a nonexistent
    # 'service_with_weights' parameter.)
    if backend_service and services_with_weights:
        raise ValueError(
            'both backend_service and services_with_weights are not None.')

    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute

    default_service = backend_service.url if backend_service else None
    default_route_action = {
        'weightedBackendServices': [{
            'backendService': service.url,
            'weight': w,
        } for service, w in services_with_weights.items()]
    } if services_with_weights else None

    config = {
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': default_service,
            'defaultRouteAction': default_route_action,
            'routeRules': route_rules,
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.urlMaps().patch(
        project=gcp.project, urlMap=gcp.url_map.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
2822
2823
def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    """Poll (every 2s) until the group reports exactly expected_size instances.

    Raises if the size does not match within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            return
        if time.time() > deadline:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, current_size))
        time.sleep(2)
2836
2837
def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Block until the named global compute operation reaches DONE.

    Polls every 2 seconds. Raises if the operation reports an error, or if
    it is still pending after timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        status = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if status['status'] == 'DONE':
            if 'error' in status:
                raise Exception(status['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
2853
2854
def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Block until the named zonal compute operation reaches DONE.

    Polls every 2 seconds. Raises if the operation reports an error, or if
    it is still pending after timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        status = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if status['status'] == 'DONE':
            if 'error' in status:
                raise Exception(status['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
2871
2872
def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    """Poll GCP until every backend in the group is reported HEALTHY.

    Also requires the number of reported backends to match the group's
    current instance count. Raises on timeout.
    """
    start_time = time.time()
    config = {'group': instance_group.url}
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    # Fixed: initialize so the final raise cannot hit a NameError when the
    # loop body never runs (e.g. a non-positive timeout_sec).
    result = None
    while time.time() - start_time <= timeout_sec:
        # Best-effort debugging aid: log each backend's own serving status.
        # Failures here are logged only; GCP's health report below is what
        # actually gates success.
        for instance_name in instance_names:
            try:
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info('serving status response from %s: %s',
                            instance_name, status)
            except grpc.RpcError as rpc_error:
                logger.info('checking serving status of %s failed: %s',
                            instance_name, rpc_error)
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in result:
            logger.info('received GCP healthStatus: %s', result['healthStatus'])
            all_healthy = all(instance['healthState'] == 'HEALTHY'
                              for instance in result['healthStatus'])
            if all_healthy and expected_size == len(result['healthStatus']):
                return
        else:
            logger.info('no healthStatus received from GCP')
        time.sleep(5)
    raise Exception('Not all backends became healthy within %d seconds: %s' %
                    (timeout_sec, result))
2908
2909
def get_instance_names(gcp, instance_group):
    """Return the short names of all instances in the given instance group."""
    listing = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={
            'instanceState': 'ALL'
        }).execute(num_retries=_GCP_API_RETRIES)
    if 'items' not in listing:
        return []
    # listInstances() returns each instance's full URL, which ends with the
    # instance name. compute.instances().get() requires the bare name (not
    # the URL) to look up instance details, so keep only the last segment.
    instance_names = [
        item['instance'].split('/')[-1] for item in listing['items']
    ]
    logger.info('retrieved instance names: %s', instance_names)
    return instance_names
2930
2931
def clean_up(gcp):
    """Delete every GCP resource recorded on the GcpState.

    Resources are removed roughly in reverse creation order (see the setup
    code below: health check -> firewall -> backend services -> URL map ->
    proxy -> forwarding rule -> template -> instance groups), so dependents
    such as the forwarding rule go before the things they reference.
    """
    if gcp.global_forwarding_rule:
        delete_global_forwarding_rule(gcp)
    if gcp.target_proxy:
        delete_target_proxy(gcp)
    if gcp.url_map:
        delete_url_map(gcp)
    # Backend services and instance groups are lists on GcpState; their
    # delete helpers handle empty lists, so no guard is needed here.
    delete_backend_services(gcp)
    if gcp.health_check_firewall_rule:
        delete_firewall(gcp)
    if gcp.health_check:
        delete_health_check(gcp)
    delete_instance_groups(gcp)
    if gcp.instance_template:
        delete_instance_template(gcp)
2947
2948
class InstanceGroup(object):
    """Lightweight record for a GCP managed instance group.

    Holds the group's short name, its full resource URL, and the zone it
    lives in.
    """

    def __init__(self, name, url, zone):
        self.name = name
        self.url = url
        self.zone = zone

    def __repr__(self):
        # Added for debuggability: log/traceback output now identifies the
        # group instead of showing a bare object address.
        return '%s(name=%r, url=%r, zone=%r)' % (type(self).__name__,
                                                 self.name, self.url,
                                                 self.zone)
2955
2956
class GcpResource(object):
    """Name/URL pair identifying a (global) GCP resource."""

    def __init__(self, name, url):
        self.name = name
        self.url = url

    def __repr__(self):
        # Added for debuggability: makes resources readable in logs.
        return '%s(name=%r, url=%r)' % (type(self).__name__, self.name,
                                        self.url)
2962
2963
class GcpState(object):
    """Mutable container tracking every GCP resource owned by this run.

    Created once at startup; setup code fills in the resource fields and
    clean_up() tears them down at exit.
    """

    def __init__(self, compute, alpha_compute, project, project_num):
        # API clients and project identity.
        self.compute = compute
        self.alpha_compute = alpha_compute
        self.project = project
        self.project_num = project_num
        # Singleton resources, populated during setup.
        self.health_check = None
        self.health_check_firewall_rule = None
        self.url_map = None
        self.target_proxy = None
        self.global_forwarding_rule = None
        self.service_port = None
        self.instance_template = None
        # Collections of resources (several backend services / groups).
        self.backend_services = []
        self.instance_groups = []
2980
2981
maybe_write_sponge_properties()


def _discovery_client(document_path):
    # Build a compute API client from a local discovery document instead of
    # fetching the API description over the network.
    with open(document_path, 'r') as discovery_doc:
        return googleapiclient.discovery.build_from_document(
            discovery_doc.read())


alpha_compute = None
if args.compute_discovery_document:
    compute = _discovery_client(args.compute_discovery_document)
    if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
        alpha_compute = _discovery_client(args.alpha_compute_discovery_document)
else:
    # No local documents supplied: fetch the API definitions from Google.
    compute = googleapiclient.discovery.build('compute', 'v1')
    if not args.only_stable_gcp_apis:
        alpha_compute = googleapiclient.discovery.build('compute', 'alpha')
2996
try:
    # Central record of every GCP resource this run creates or reuses; the
    # finally block at the bottom tears it down via clean_up().
    gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num)
    gcp_suffix = args.gcp_suffix
    health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
    if not args.use_existing_gcp_resources:
        if args.keep_gcp_resources:
            # Auto-generating a unique suffix in case of conflict should not be
            # combined with --keep_gcp_resources, as the suffix actually used
            # for GCP resources will not match the provided --gcp_suffix value.
            num_attempts = 1
        else:
            num_attempts = 5
        for i in range(num_attempts):
            try:
                logger.info('Using GCP suffix %s', gcp_suffix)
                create_health_check(gcp, health_check_name)
                break
            except googleapiclient.errors.HttpError as http_error:
                # HTTP failure (most likely a name collision): retry with a
                # random 4-digit suffix appended.
                gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999))
                health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
                logger.exception('HttpError when creating health check')
        if gcp.health_check is None:
            raise Exception('Failed to create health check name after %d '
                            'attempts' % num_attempts)
    # All remaining resource names derive from the (possibly randomized)
    # suffix settled on above.
    firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
    backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
    alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix
    url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
    service_host_name = _BASE_SERVICE_HOST + gcp_suffix
    target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
    forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
    template_name = _BASE_TEMPLATE_NAME + gcp_suffix
    instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
    same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix
    secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix
    if args.use_existing_gcp_resources:
        # Look up (don't create) previously provisioned resources by name.
        logger.info('Reusing existing GCP resources')
        get_health_check(gcp, health_check_name)
        try:
            get_health_check_firewall_rule(gcp, firewall_name)
        except googleapiclient.errors.HttpError as http_error:
            # Firewall rule may be auto-deleted periodically depending on GCP
            # project settings.
            logger.exception('Failed to find firewall rule, recreating')
            create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = get_backend_service(gcp, backend_service_name)
        alternate_backend_service = get_backend_service(
            gcp, alternate_backend_service_name)
        get_url_map(gcp, url_map_name)
        get_target_proxy(gcp, target_proxy_name)
        get_global_forwarding_rule(gcp, forwarding_rule_name)
        get_instance_template(gcp, template_name)
        instance_group = get_instance_group(gcp, args.zone, instance_group_name)
        same_zone_instance_group = get_instance_group(
            gcp, args.zone, same_zone_instance_group_name)
        secondary_zone_instance_group = get_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name)
    else:
        # Fresh setup: create the full resource chain (firewall -> backend
        # services -> URL map -> target proxy -> forwarding rule -> instance
        # template -> instance groups).
        create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = add_backend_service(gcp, backend_service_name)
        alternate_backend_service = add_backend_service(
            gcp, alternate_backend_service_name)
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        # Try ports from the configured range in random order until the
        # forwarding rule creation succeeds; the chosen port lands in
        # gcp.service_port.
        potential_service_ports = list(args.service_port_range)
        random.shuffle(potential_service_ports)
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      potential_service_ports)
        if not gcp.service_port:
            raise Exception(
                'Failed to find a valid ip:port for the forwarding rule')
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            # Non-default port must be reflected in the URL map host rule.
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
        startup_script = get_startup_script(args.path_to_server_binary,
                                            gcp.service_port)
        create_instance_template(gcp, template_name, args.network,
                                 args.source_image, args.machine_type,
                                 startup_script)
        instance_group = add_instance_group(gcp, args.zone, instance_group_name,
                                            _INSTANCE_GROUP_SIZE)
        patch_backend_service(gcp, backend_service, [instance_group])
        same_zone_instance_group = add_instance_group(
            gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
        secondary_zone_instance_group = add_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name,
            _INSTANCE_GROUP_SIZE)

    wait_for_healthy_backends(gcp, backend_service, instance_group)

    if args.test_case:
        # Build the test client's environment: inherit ours, but restore any
        # gRPC tracing/verbosity values captured earlier (presumably stripped
        # from the harness's own env before this chunk -- confirm upstream).
        client_env = dict(os.environ)
        if original_grpc_trace:
            client_env['GRPC_TRACE'] = original_grpc_trace
        if original_grpc_verbosity:
            client_env['GRPC_VERBOSITY'] = original_grpc_verbosity
        bootstrap_server_features = []

        # Only append an explicit port when it differs from the default.
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            server_uri = service_host_name
        else:
            server_uri = service_host_name + ':' + str(gcp.service_port)
        if args.xds_v3_support:
            client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
            bootstrap_server_features.append('xds_v3')
        if args.bootstrap_file:
            bootstrap_path = os.path.abspath(args.bootstrap_file)
        else:
            # Generate a one-off xDS bootstrap file. delete=False because the
            # client process opens it by path after this block closes it.
            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
                bootstrap_file.write(
                    _BOOTSTRAP_TEMPLATE.format(
                        node_id='projects/%s/networks/%s/nodes/%s' %
                        (gcp.project_num, args.network.split('/')[-1],
                         uuid.uuid1()),
                        server_features=json.dumps(
                            bootstrap_server_features)).encode('utf-8'))
                bootstrap_path = bootstrap_file.name
        client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
        client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
        client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true'
        client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true'
        test_results = {}
        failed_tests = []
        for test_case in args.test_case:
            # Skip tests whose prerequisites (v3 xDS, alpha APIs, locally
            # spawned client processes) are unavailable in this run.
            if test_case in _V3_TEST_CASES and not args.xds_v3_support:
                logger.info('skipping test %s due to missing v3 support',
                            test_case)
                continue
            if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
                logger.info('skipping test %s due to missing alpha support',
                            test_case)
                continue
            if test_case in [
                    'api_listener', 'forwarding_rule_port_match',
                    'forwarding_rule_default_port'
            ] and CLIENT_HOSTS:
                # NOTE(review): the adjacent string literals below concatenate
                # without spaces ("...is" + "not..." -> "isnot"), garbling the
                # logged message; string left byte-identical here.
                logger.info(
                    'skipping test %s because test configuration is'
                    'not compatible with client processes on existing'
                    'client hosts', test_case)
                continue
            if test_case == 'forwarding_rule_default_port':
                server_uri = service_host_name
            result = jobset.JobResult()
            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
            test_log_file = open(test_log_filename, 'w+')
            client_process = None

            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
            else:
                rpcs_to_send = '--rpc="UnaryCall"'

            if test_case in _TESTS_TO_SEND_METADATA:
                metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
                    keyE=_TEST_METADATA_KEY,
                    valueE=_TEST_METADATA_VALUE_EMPTY,
                    keyU=_TEST_METADATA_KEY,
                    valueU=_TEST_METADATA_VALUE_UNARY,
                    keyNU=_TEST_METADATA_NUMERIC_KEY,
                    valueNU=_TEST_METADATA_NUMERIC_VALUE)
            else:
                # Setting the arg explicitly to empty with '--metadata=""'
                # makes C# client fail
                # (see https://github.com/commandlineparser/commandline/issues/412),
                # so instead we just rely on clients using the default when
                # metadata arg is not specified.
                metadata_to_send = ''

            # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
            # in the client. This means we will ignore intermittent RPC
            # failures (but this framework still checks that the final result
            # is as expected).
            #
            # Reason for disabling this is, the resources are shared by
            # multiple tests, and a change in previous test could be delayed
            # until the second test starts. The second test may see
            # intermittent failures because of that.
            #
            # A fix is to not share resources between tests (though that does
            # mean the tests will be significantly slower due to creating new
            # resources).
            fail_on_failed_rpc = ''

            try:
                # Spawn a local test client unless remote client hosts are in
                # use; its output goes to the per-test sponge log file.
                if not CLIENT_HOSTS:
                    client_cmd_formatted = args.client_cmd.format(
                        server_uri=server_uri,
                        stats_port=args.stats_port,
                        qps=args.qps,
                        fail_on_failed_rpc=fail_on_failed_rpc,
                        rpcs_to_send=rpcs_to_send,
                        metadata_to_send=metadata_to_send)
                    logger.debug('running client: %s', client_cmd_formatted)
                    client_cmd = shlex.split(client_cmd_formatted)
                    client_process = subprocess.Popen(client_cmd,
                                                      env=client_env,
                                                      stderr=subprocess.STDOUT,
                                                      stdout=test_log_file)
                # Dispatch to the requested test case implementation. A few
                # cases return an updated server_uri used by later tests.
                if test_case == 'backends_restart':
                    test_backends_restart(gcp, backend_service, instance_group)
                elif test_case == 'change_backend_service':
                    test_change_backend_service(gcp, backend_service,
                                                instance_group,
                                                alternate_backend_service,
                                                same_zone_instance_group)
                elif test_case == 'gentle_failover':
                    test_gentle_failover(gcp, backend_service, instance_group,
                                         secondary_zone_instance_group)
                elif test_case == 'load_report_based_failover':
                    test_load_report_based_failover(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'ping_pong':
                    test_ping_pong(gcp, backend_service, instance_group)
                elif test_case == 'remove_instance_group':
                    test_remove_instance_group(gcp, backend_service,
                                               instance_group,
                                               same_zone_instance_group)
                elif test_case == 'round_robin':
                    test_round_robin(gcp, backend_service, instance_group)
                elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
                    test_secondary_locality_gets_requests_on_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'traffic_splitting':
                    test_traffic_splitting(gcp, backend_service, instance_group,
                                           alternate_backend_service,
                                           same_zone_instance_group)
                elif test_case == 'path_matching':
                    test_path_matching(gcp, backend_service, instance_group,
                                       alternate_backend_service,
                                       same_zone_instance_group)
                elif test_case == 'header_matching':
                    test_header_matching(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'circuit_breaking':
                    test_circuit_breaking(gcp, backend_service, instance_group,
                                          same_zone_instance_group)
                elif test_case == 'timeout':
                    test_timeout(gcp, backend_service, instance_group)
                elif test_case == 'fault_injection':
                    test_fault_injection(gcp, backend_service, instance_group)
                elif test_case == 'api_listener':
                    server_uri = test_api_listener(gcp, backend_service,
                                                   instance_group,
                                                   alternate_backend_service)
                elif test_case == 'forwarding_rule_port_match':
                    server_uri = test_forwarding_rule_port_match(
                        gcp, backend_service, instance_group)
                elif test_case == 'forwarding_rule_default_port':
                    server_uri = test_forwarding_rule_default_port(
                        gcp, backend_service, instance_group)
                elif test_case == 'metadata_filter':
                    test_metadata_filter(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'csds':
                    test_csds(gcp, backend_service, instance_group, server_uri)
                else:
                    logger.error('Unknown test case: %s', test_case)
                    sys.exit(1)
                # A client that died mid-test invalidates the result even if
                # the server-side checks passed.
                if client_process and client_process.poll() is not None:
                    raise Exception(
                        'Client process exited prematurely with exit code %d' %
                        client_process.returncode)
                result.state = 'PASSED'
                result.returncode = 0
            except Exception as e:
                logger.exception('Test case %s failed', test_case)
                failed_tests.append(test_case)
                result.state = 'FAILED'
                result.message = str(e)
            finally:
                if client_process:
                    # Nonzero returncode was already reported above via the
                    # premature-exit exception; otherwise stop the client.
                    if client_process.returncode:
                        logger.info('Client exited with code %d' %
                                    client_process.returncode)
                    else:
                        client_process.terminate()
                test_log_file.close()
                # Workaround for Python 3, as report_utils will invoke decode() on
                # result.message, which has a default value of ''.
                result.message = result.message.encode('UTF-8')
                test_results[test_case] = [result]
                if args.log_client_output:
                    logger.info('Client output:')
                    with open(test_log_filename, 'r') as client_output:
                        logger.info(client_output.read())
        if not os.path.exists(_TEST_LOG_BASE_DIR):
            os.makedirs(_TEST_LOG_BASE_DIR)
        report_utils.render_junit_xml_report(test_results,
                                             os.path.join(
                                                 _TEST_LOG_BASE_DIR,
                                                 _SPONGE_XML_NAME),
                                             suite_name='xds_tests',
                                             multi_target=True)
        if failed_tests:
            logger.error('Test case(s) %s failed', failed_tests)
            sys.exit(1)
finally:
    # Always attempt teardown unless the user explicitly asked to keep the
    # resources (e.g. for debugging or reuse via --use_existing_gcp_resources).
    if not args.keep_gcp_resources:
        logger.info('Cleaning up GCP resources. This may take some time.')
        clean_up(gcp)
3310