• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2# Copyright 2020 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run xDS integration tests on GCP using Traffic Director."""
16
17import argparse
18import googleapiclient.discovery
19import grpc
20import logging
21import os
22import random
23import shlex
24import socket
25import subprocess
26import sys
27import tempfile
28import time
29
30from oauth2client.client import GoogleCredentials
31
32import python_utils.jobset as jobset
33import python_utils.report_utils as report_utils
34
35from src.proto.grpc.testing import empty_pb2
36from src.proto.grpc.testing import messages_pb2
37from src.proto.grpc.testing import test_pb2_grpc
38
# Root logger configured with a single console handler; level defaults to
# WARNING and is raised to DEBUG later when --verbose is passed.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.handlers = []  # drop any handlers installed by imported modules
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)
46
# Test cases run by default when --test_case=all is requested.
_TEST_CASES = [
    'backends_restart',
    'change_backend_service',
    'gentle_failover',
    'new_instance_group_receives_traffic',
    'ping_pong',
    'remove_instance_group',
    'round_robin',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',
    'traffic_splitting',
]
# Valid test cases, but not in all. So the tests can only run manually, and
# aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = ['path_matching', 'header_matching']
64
65
def parse_test_cases(arg):
    """Parse the comma-separated --test_case argument into an ordered list.

    Args:
      arg: Comma-separated test case names; the special name 'all' expands
        to every test in _TEST_CASES (but not _ADDITIONAL_TEST_CASES).

    Returns:
      The requested test cases, ordered as in _TEST_CASES +
      _ADDITIONAL_TEST_CASES, with duplicates removed.

    Raises:
      Exception: if any requested test case name is unknown.
    """
    if arg == '':
        return []
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    test_cases = set()
    # Use a distinct loop variable: the original shadowed `arg`, so the
    # error below reported only the last token instead of the full argument.
    for test_case_arg in arg.split(','):
        if test_case_arg == "all":
            test_cases = test_cases.union(_TEST_CASES)
        else:
            test_cases = test_cases.union([test_case_arg])
    if not all(test_case in all_test_cases for test_case in test_cases):
        raise Exception('Failed to parse test cases %s' % arg)
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]
81
82
def parse_port_range(port_arg):
    """Parse a port argument into an inclusive range of candidate ports.

    Args:
      port_arg: Either a single port ('8080') or a 'min:max' range
        ('8080:8110').

    Returns:
      A range covering the requested ports (max inclusive).
    """
    try:
        port = int(port_arg)
        return range(port, port + 1)
    except ValueError:
        # Not a single integer: expect the 'min:max' form. The original
        # bare `except:` also swallowed KeyboardInterrupt/SystemExit.
        port_min, port_max = port_arg.split(':')
        return range(int(port_min), int(port_max) + 1)
90
91
# Command-line interface. All GCP resource names derive from the base names
# defined below plus --gcp_suffix; timeouts and sizes feed the module-level
# constants computed after parsing.
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
argp.add_argument('--project_id', help='GCP project id')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
argp.add_argument('--service_port_range',
                  default='8080:8110',
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
    'less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure',
                  default=False,
                  action='store_true')
args = argp.parse_args()

if args.verbose:
    logger.setLevel(logging.DEBUG)
214
# Port backends serve on when no --service_port_range port can be used.
_DEFAULT_SERVICE_PORT = 80
# Timeouts (seconds) for the various polling loops below.
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 300
_INSTANCE_GROUP_SIZE = args.instance_group_size
# Sample size for each stats poll: 10 seconds' worth of traffic at the
# configured QPS.
_NUM_TEST_RPCS = 10 * args.qps
_WAIT_FOR_STATS_SEC = 180
_WAIT_FOR_VALID_CONFIG_SEC = 60
_WAIT_FOR_URL_MAP_PATCH_SEC = 300
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# xDS bootstrap JSON skeleton. Doubled braces survive a later str.format()
# call (which fills {node_id}); the %s slots are filled immediately from the
# parsed flags.
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ]
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = [
    'new_instance_group_receives_traffic', 'ping_pong', 'round_robin'
]
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE = 'exact_match'
_PATH_MATCHER_NAME = 'path-matcher'
# Base names for generated GCP resources; --gcp_suffix is appended elsewhere.
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
# Report/log output locations, relative to this script's directory.
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
273
274
def get_client_stats(num_rpcs, timeout_sec):
    """Query the test client's local LoadBalancerStats service.

    Args:
      num_rpcs: Number of RPCs the client should report on.
      timeout_sec: How long the client may take to collect the stats.

    Returns:
      The LoadBalancerStatsResponse reported by the client.
    """
    target = 'localhost:%d' % args.stats_port
    with grpc.insecure_channel(target) as channel:
        stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
        request = messages_pb2.LoadBalancerStatsRequest(
            num_rpcs=num_rpcs, timeout_sec=timeout_sec)
        # Give the RPC itself extra headroom beyond the stats-collection
        # window so a slow connection does not cut the query short.
        response = stub.GetClientStats(
            request,
            wait_for_ready=True,
            timeout=timeout_sec + _CONNECTION_TIMEOUT_SEC)
        logger.debug('Invoked GetClientStats RPC: %s', response)
        return response
287
288
class RpcDistributionError(Exception):
    """Raised when RPCs fail to reach the expected backend distribution."""
291
292
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    """Poll client stats until exactly the given backends receive load.

    Args:
      backends: List of backend instance names expected to receive RPCs.
      timeout_sec: Overall deadline for the expected distribution to appear.
      num_rpcs: Number of RPCs sampled on each polling iteration.
      allow_failures: If False, any failed RPC in a sample is an error.

    Raises:
      RpcDistributionError: if the expected distribution was not observed
        before the deadline; carries the most recent mismatch description.
    """
    start_time = time.time()
    error_msg = None
    # Pass format args lazily; the original eagerly formatted with %, which
    # does the work even when debug logging is disabled.
    logger.debug('Waiting for %d sec until backends %s receive load',
                 timeout_sec, backends)
    while time.time() - start_time <= timeout_sec:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = 'Backend %s did not receive load' % backend
                break
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
314
315
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    """Wait until only the given backends receive load, tolerating RPC failures."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=True)
323
324
def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    """Wait until only the given backends receive load, with no failed RPCs."""
    _verify_rpcs_to_given_backends(
        backends, timeout_sec, num_rpcs, allow_failures=False)
332
333
def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Check that an actual distribution stays within a tolerance of expected.

    Args:
      actual_distribution: A list of floats, the observed distribution.
      expected_distribution: A list of floats, the expected distribution.
      threshold: Number within [0,100]; maximum allowed percentage deviation
        of each actual value from its expected counterpart.

    Returns:
      True if every actual value lies within the threshold of the
      corresponding expected value.

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: with details of the first out-of-tolerance entry, or a
        length mismatch between the two distributions.
    """
    expected_len = len(expected_distribution)
    actual_len = len(actual_distribution)
    if expected_len != actual_len:
        raise Exception(
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (expected_len, actual_len))
    if not 0 <= threshold <= 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        lower_bound = expected * (1 - threshold_fraction)
        upper_bound = expected * (1 + threshold_fraction)
        if actual < lower_bound:
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > upper_bound:
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
    return True
368
369
def compare_expected_instances(stats, expected_instances):
    """Compare if stats have expected instances for each type of RPC.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      Returns true if the instances are expected. False if not.
    """
    for rpc_type, expected_peers in expected_instances.items():
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        # rpcs_by_peer is None when no RPCs of this type were recorded; the
        # original crashed here with AttributeError on None.keys(). An empty
        # peer list then fails the comparison below as intended.
        peers = list(rpcs_by_peer.keys()) if rpcs_by_peer else []
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True
391
392
def test_backends_restart(gcp, backend_service, instance_group):
    """Verify the per-backend RPC distribution recovers after a restart.

    Scales the instance group to zero (expecting no backend to receive
    load), restores it to its original size, and checks the new sorted
    per-backend distribution roughly matches the pre-restart one.
    """
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    # NOTE(review): start_time is never read below — confirm whether a
    # timing check was intended here.
    start_time = time.time()
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                         _WAIT_FOR_BACKEND_SEC)
    finally:
        # Always restore the group to its original size.
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
    new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    # Compare the sorted request counts rather than per-instance names, since
    # the restarted instances may have new names.
    original_distribution = list(stats.rpcs_by_peer.values())
    original_distribution.sort()
    new_distribution = list(new_stats.rpcs_by_peer.values())
    new_distribution.sort()
    # Maximum allowed absolute difference per position in the sorted counts.
    threshold = 3
    for i in range(len(original_distribution)):
        if abs(original_distribution[i] - new_distribution[i]) > threshold:
            raise Exception('Distributions do not match: ', stats, new_stats)
420
421
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    """Verify traffic shifts when the URL map points at a new backend service.

    Patches the URL map to route to alternate_backend_service and waits for
    all traffic to move to its backends; always restores the original URL
    map and detaches the alternate instance group afterwards.
    """
    logger.info('Running test_change_backend_service')
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_instances(gcp, alternate_backend_service,
                            [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    # Start with all traffic on the original backends.
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                             _WAIT_FOR_STATS_SEC)
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    finally:
        # Restore routing and detach the alternate group regardless of outcome.
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_instances(gcp, alternate_backend_service, [])
443
444
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Verify partial failover when most primary-locality backends go down.

    Stops serving on all but one primary instance and expects traffic to
    spread across the remaining primary plus the secondary locality. If the
    distribution is not observed and Traffic Director actually treats the
    secondary group as primary, retries once with the roles swapped.

    Args:
      swapped_primary_and_secondary: internal guard for the single
        role-swapped retry; not meant to be set by external callers.
    """
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    try:
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_instances(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Take down every primary instance except one, exceeding the 50%
        # failure fraction needed to trigger gentle failover.
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            # Resume serving on all primaries before any cleanup/retry.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Restore the original backend attachment and group size.
        patch_backend_instances(gcp, backend_service, [primary_instance_group])
        resize_instance_group(gcp, primary_instance_group,
                              num_primary_instances)
        instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
499
500
def test_new_instance_group_receives_traffic(gcp, backend_service,
                                             instance_group,
                                             same_zone_instance_group):
    """Verify a newly attached instance group starts receiving traffic.

    Adds same_zone_instance_group to the backend service (RATE balancing,
    so both same-zone groups can share load) and waits for traffic to reach
    the combined set of backends; always detaches the new group afterwards.
    """
    logger.info('Running test_new_instance_group_receives_traffic')
    instance_names = get_instance_names(gcp, instance_group)
    # TODO(ericgribkoff) Reduce this timeout. When running sequentially, this
    # occurs after patching the url map in test_change_backend_service, so we
    # need the extended timeout here as well.
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_URL_MAP_PATCH_SEC)
    try:
        patch_backend_instances(gcp,
                                backend_service,
                                [instance_group, same_zone_instance_group],
                                balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        combined_instance_names = instance_names + get_instance_names(
            gcp, same_zone_instance_group)
        wait_until_all_rpcs_go_to_given_backends(combined_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    finally:
        # Detach the extra group so later tests see the original topology.
        patch_backend_instances(gcp, backend_service, [instance_group])
525
526
def test_ping_pong(gcp, backend_service, instance_group):
    """Sanity check: every backend in the instance group receives traffic."""
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    backends = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(backends, _WAIT_FOR_STATS_SEC)
533
534
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    """Verify traffic drains from an instance group removed from the service.

    Attaches both groups, waits for combined load, removes the original
    group and expects all traffic on the same-zone group; the finally block
    restores the original single-group attachment.
    """
    logger.info('Running test_remove_instance_group')
    # Resolve the names before the try block: the original bound
    # instance_names inside the try, so an early failure made the finally
    # clause raise UnboundLocalError and mask the real error.
    instance_names = get_instance_names(gcp, instance_group)
    try:
        patch_backend_instances(gcp,
                                backend_service,
                                [instance_group, same_zone_instance_group],
                                balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        same_zone_instance_names = get_instance_names(gcp,
                                                      same_zone_instance_group)
        wait_until_all_rpcs_go_to_given_backends(
            instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC)
        patch_backend_instances(gcp,
                                backend_service, [same_zone_instance_group],
                                balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(same_zone_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    finally:
        patch_backend_instances(gcp, backend_service, [instance_group])
        wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
560
561
def test_round_robin(gcp, backend_service, instance_group):
    """Verify RPCs are spread evenly (round-robin) across all backends.

    Samples _NUM_TEST_RPCS requests and checks each backend's count is
    within `threshold` of the even split; any failed RPC fails the test.
    """
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    # Maximum allowed per-backend deviation from a perfectly even split.
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
    total_requests_received = sum(requests_received)
    if total_requests_received != _NUM_TEST_RPCS:
        raise Exception('Unexpected RPC failures', stats)
    expected_requests = total_requests_received / len(instance_names)
    for instance in instance_names:
        if abs(stats.rpcs_by_peer[instance] - expected_requests) > threshold:
            # The original passed logger-style args to Exception, leaving the
            # message unformatted; interpolate explicitly instead.
            raise Exception(
                'RPC peer distribution differs from expected by more than %d '
                'for instance %s (%s)' % (threshold, instance, stats))
580
581
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify the secondary locality stays idle on a partial primary failure.

    Stops serving on a single primary instance (a minority) and expects
    traffic to stay entirely within the remaining primary instances — the
    secondary locality must receive nothing. If the distribution is not
    observed and Traffic Director actually treats the secondary group as
    primary, retries once with the roles swapped.

    Args:
      swapped_primary_and_secondary: internal guard for the single
        role-swapped retry; not meant to be set by external callers.
    """
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    try:
        patch_backend_instances(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop only one primary instance — a minority, so no failover.
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Resume serving on all primaries before any cleanup/retry.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Detach the secondary group to restore the original topology.
        patch_backend_instances(gcp, backend_service, [primary_instance_group])
627
628
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify traffic fails over to the secondary locality on total primary failure.

    Stops serving on every primary instance and expects all traffic to move
    to the secondary locality's backends. If the distribution is not
    observed and Traffic Director actually treats the secondary group as
    primary, retries once with the roles swapped.

    Args:
      swapped_primary_and_secondary: internal guard for the single
        role-swapped retry; not meant to be set by external callers.
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    try:
        patch_backend_instances(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            # Take down the entire primary locality.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Resume serving on all primaries before any cleanup/retry.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            raise e
    finally:
        # Detach the secondary group to restore the original topology.
        patch_backend_instances(gcp, backend_service, [primary_instance_group])
672
673
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    """Prepare both backend services for tests that modify URL maps.

    Disables validate_for_proxyless on the target proxy, brings the original
    and alternate backend services healthy, and waits until all traffic goes
    to the original backends.

    Returns:
      Returns original and alternate backend names as lists of strings.
    """
    # The config validation for proxyless doesn't allow setting
    # default_route_action or route_rules. Disable validate
    # validate_for_proxyless for this test. This can be removed when validation
    # accepts default_route_action.
    logger.info('disabling validate_for_proxyless in target proxy')
    set_validate_for_proxyless(gcp, False)

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_instances(gcp, alternate_backend_service,
                            [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_backend_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_backend_instances)

    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_backend_instances)

    # Start with all traffic going to original_backend_service.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_backend_instances, alternate_backend_instances
712
713
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    """Verify that URL-map weighted traffic splitting is honored.

    Starts with all traffic going to original_backend_service, then updates
    the URL map to split traffic 20/80 between original and alternate. Waits
    for all backends in both services to receive traffic, then verifies that
    the observed per-instance distribution matches the configured weights.
    """
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # Patch urlmap, change route action to traffic splitting between
        # original and alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Split percentage between instances: [20,80] -> [10,10,40,40].
        expected_instance_percentage = [
            original_service_percentage * 1.0 / len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage * 1.0 /
            len(alternate_backend_instances)
        ] * len(alternate_backend_instances)

        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Verify that weights between two services are expected.
        retry_count = 10
        # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
        # seconds timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            # Use a distinct comprehension variable so the retry index `i`
            # is not shadowed (the original reused `i` here).
            got_instance_count = [
                stats.rpcs_by_peer[peer] for peer in original_backend_instances
            ] + [
                stats.rpcs_by_peer[peer]
                for peer in alternate_backend_instances
            ]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                x * 100.0 / total_count for x in got_instance_count
            ]

            try:
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                logger.info('attempt %d', i)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                if i == retry_count - 1:
                    # Format eagerly: Exception() does not apply
                    # %-interpolation to extra arguments, so the original
                    # raised an unformatted (msg, args...) tuple.
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage,
                         expected_instance_percentage))
            else:
                logger.info("success")
                break
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_instances(gcp, alternate_backend_service, [])
        set_validate_for_proxyless(gcp, True)
786
787
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    """Verify URL-map path matching (fullPathMatch and prefixMatch).

    Starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service, then updates the URL map to add routes sending
    UnaryCall and EmptyCall to different backends. Waits for all backends in
    both services to receive traffic, then verifies that traffic goes to the
    expected backends.
    """
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                })
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 10
            # Each attempt takes about 10 seconds, 10 retries is equivalent to
            # 100 seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
            else:
                # Bug fix: the retry loop previously exhausted silently,
                # letting the test pass even if routing never matched.
                raise Exception(
                    'timed out waiting for RPCs to be routed to the expected '
                    'instances: %s' % expected_instances)
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_instances(gcp, alternate_backend_service, [])
        set_validate_for_proxyless(gcp, True)
866
867
def test_header_matching(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify URL-map header matching.

    Starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service, then updates the URL map to add routes sending
    RPCs carrying the test metadata to different backends. Waits for all
    backends in both services to receive traffic, then verifies that traffic
    goes to the expected backends.
    """
    logger.info('Running test_header_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [(
            [{
                'priority': 0,
                # Header ExactMatch -> alternate_backend_service.
                # EmptyCall is sent with the metadata.
                'matchRules': [{
                    'prefixMatch':
                        '/',
                    'headerMatches': [{
                        'headerName': _TEST_METADATA_KEY,
                        'exactMatch': _TEST_METADATA_VALUE
                    }]
                }],
                'service': alternate_backend_service.url
            }],
            {
                "EmptyCall": alternate_backend_instances,
                "UnaryCall": original_backend_instances
            })]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 10
            # Each attempt takes about 10 seconds, 10 retries is equivalent to
            # 100 seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
            else:
                # Bug fix: the retry loop previously exhausted silently,
                # letting the test pass even if routing never matched.
                raise Exception(
                    'timed out waiting for RPCs to be routed to the expected '
                    'instances: %s' % expected_instances)
    finally:
        patch_url_map_backend_service(gcp, original_backend_service)
        patch_backend_instances(gcp, alternate_backend_service, [])
        set_validate_for_proxyless(gcp, True)
937
938
def set_serving_status(instances, service_port, serving):
    """Flip the xDS serving status of each test server.

    Args:
      instances: iterable of server host names.
      service_port: port the test servers listen on.
      serving: True to mark the servers serving, False for not-serving.
    """
    for host in instances:
        target = '%s:%d' % (host, service_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            request = empty_pb2.Empty()
            if serving:
                stub.SetServing(request)
            else:
                stub.SetNotServing(request)
948
949
def is_primary_instance_group(gcp, instance_group):
    """Return True if all client RPCs currently land in instance_group.

    Clients may connect to a TD instance in a different region than the
    client, in which case primary/secondary assignments may not be based on
    the client's actual locality — so this is determined empirically from
    client stats rather than assumed.
    """
    member_names = set(get_instance_names(gcp, instance_group))
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    for peer in stats.rpcs_by_peer:
        if peer not in member_names:
            return False
    return True
957
958
def get_startup_script(path_to_server_binary, service_port):
    """Build the VM startup script that launches the xDS test server.

    Args:
      path_to_server_binary: path to a prebuilt server binary on the VM, or a
        falsy value to build and run the Java interop server from source.
      service_port: port the server should listen on.

    Returns:
      A shell script string suitable for the GCE startup-script metadata key.
    """
    if not path_to_server_binary:
        # No prebuilt binary: clone and build the grpc-java interop server.
        return """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &""" % service_port
    return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
                                                 service_port)
976
977
def create_instance_template(gcp, name, network, source_image, machine_type,
                             startup_script):
    """Create a GCE instance template and record it on gcp.instance_template.

    The template tags instances for health-check firewall access, grants the
    cloud-platform scope to the default service account, and installs the
    given startup script.
    """
    properties = {
        'tags': {
            'items': ['allow-health-checks']
        },
        'machineType': machine_type,
        'serviceAccounts': [{
            'email': 'default',
            'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
        }],
        'networkInterfaces': [{
            'accessConfigs': [{
                'type': 'ONE_TO_ONE_NAT'
            }],
            'network': network
        }],
        'disks': [{
            'boot': True,
            'initializeParams': {
                'sourceImage': source_image
            }
        }],
        'metadata': {
            'items': [{
                'key': 'startup-script',
                'value': startup_script
            }]
        }
    }
    config = {'name': name, 'properties': properties}

    logger.debug('Sending GCP request with body=%s', config)
    response = gcp.compute.instanceTemplates().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
    gcp.instance_template = GcpResource(config['name'], response['targetLink'])
1017
1018
def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group and wait until it reaches `size`.

    The group is registered on gcp.instance_groups and returned.
    """
    config = {
        'name': name,
        'instanceTemplate': gcp.instance_template.url,
        'targetSize': size,
        'namedPorts': [{
            'name': 'grpc',
            'port': gcp.service_port
        }]
    }

    logger.debug('Sending GCP request with body=%s', config)
    managers = gcp.compute.instanceGroupManagers()
    create_op = managers.insert(
        project=gcp.project, zone=zone,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, create_op['name'])
    # Fetch the manager back to learn the URL of the instance group it owns.
    manager = managers.get(
        project=gcp.project, zone=zone,
        instanceGroupManager=config['name']).execute(
            num_retries=_GCP_API_RETRIES)
    group = InstanceGroup(config['name'], manager['instanceGroup'], zone)
    gcp.instance_groups.append(group)
    wait_for_instance_group_to_reach_expected_size(gcp, group, size,
                                                   _WAIT_FOR_OPERATION_SEC)
    return group
1045
1046
def create_health_check(gcp, name):
    """Create a health check and record it on gcp.health_check.

    Uses a native gRPC health check when the alpha compute API is available;
    otherwise falls back to a TCP check on the named 'grpc' port.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
        config = {
            'name': name,
            'type': 'GRPC',
            'grpcHealthCheck': {
                'portSpecification': 'USE_SERVING_PORT'
            }
        }
    else:
        compute_to_use = gcp.compute
        config = {
            'name': name,
            'type': 'TCP',
            'tcpHealthCheck': {
                'portName': 'grpc'
            }
        }
    logger.debug('Sending GCP request with body=%s', config)
    response = compute_to_use.healthChecks().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
    gcp.health_check = GcpResource(config['name'], response['targetLink'])
1071
1072
def create_health_check_firewall_rule(gcp, name):
    """Open ingress TCP from GCP health-checker ranges to tagged instances.

    The resulting rule is recorded on gcp.health_check_firewall_rule.
    """
    config = {
        'name': name,
        'direction': 'INGRESS',
        'allowed': [{
            'IPProtocol': 'tcp'
        }],
        # Source ranges used by GCP health checking systems.
        'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
        # Matches the tag set by create_instance_template.
        'targetTags': ['allow-health-checks'],
    }
    logger.debug('Sending GCP request with body=%s', config)
    response = gcp.compute.firewalls().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
    gcp.health_check_firewall_rule = GcpResource(config['name'],
                                                 response['targetLink'])
1089
1090
def add_backend_service(gcp, name):
    """Create a backend service, register it on gcp.backend_services, and
    return it.

    Uses the GRPC protocol when the alpha compute API is available, otherwise
    HTTP2.
    """
    use_alpha = bool(gcp.alpha_compute)
    compute_to_use = gcp.alpha_compute if use_alpha else gcp.compute
    config = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
        'healthChecks': [gcp.health_check.url],
        'portName': 'grpc',
        'protocol': 'GRPC' if use_alpha else 'HTTP2'
    }
    logger.debug('Sending GCP request with body=%s', config)
    response = compute_to_use.backendServices().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
    backend_service = GcpResource(config['name'], response['targetLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
1112
1113
def create_url_map(gcp, name, backend_service, host_name):
    """Create a URL map routing host_name to backend_service.

    The map is recorded on gcp.url_map.
    """
    path_matcher = {
        'name': _PATH_MATCHER_NAME,
        'defaultService': backend_service.url,
    }
    host_rule = {
        'hosts': [host_name],
        'pathMatcher': _PATH_MATCHER_NAME
    }
    config = {
        'name': name,
        'defaultService': backend_service.url,
        'pathMatchers': [path_matcher],
        'hostRules': [host_rule]
    }
    logger.debug('Sending GCP request with body=%s', config)
    response = gcp.compute.urlMaps().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
    gcp.url_map = GcpResource(config['name'], response['targetLink'])
1132
1133
def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Patch the URL map so its host rule matches host_name:service_port.

    Note: backend_service is unused but kept for signature compatibility.
    """
    host_with_port = '%s:%d' % (host_name, gcp.service_port)
    config = {
        'hostRules': [{
            'hosts': [host_with_port],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    response = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
1146
1147
def set_validate_for_proxyless(gcp, validate_for_proxyless):
    """Recreate the target proxy with the given validateForProxyless setting.

    Patching target_grpc_proxy isn't supported, so the global forwarding rule
    and target proxy are deleted and recreated with the new setting. This is
    a no-op when the alpha compute API is not enabled.
    """
    if gcp.alpha_compute:
        delete_global_forwarding_rule(gcp)
        delete_target_proxy(gcp)
        create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless)
        create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name,
                                      [gcp.service_port])
    else:
        logger.debug(
            'Not setting validateForProxy because alpha is not enabled')
1161
1162
def create_target_proxy(gcp, name, validate_for_proxyless=True):
    """Create a target proxy pointing at gcp.url_map.

    Creates a target gRPC proxy (honoring validate_for_proxyless) when the
    alpha compute API is available, otherwise a target HTTP proxy. The proxy
    is recorded on gcp.target_proxy.
    """
    if gcp.alpha_compute:
        config = {
            'name': name,
            'url_map': gcp.url_map.url,
            'validate_for_proxyless': validate_for_proxyless,
        }
        logger.debug('Sending GCP request with body=%s', config)
        request = gcp.alpha_compute.targetGrpcProxies().insert(
            project=gcp.project, body=config)
    else:
        # validate_for_proxyless is alpha-only and ignored on this path.
        config = {
            'name': name,
            'url_map': gcp.url_map.url,
        }
        logger.debug('Sending GCP request with body=%s', config)
        request = gcp.compute.targetHttpProxies().insert(
            project=gcp.project, body=config)
    response = request.execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, response['name'])
    gcp.target_proxy = GcpResource(config['name'], response['targetLink'])
1185
1186
def create_global_forwarding_rule(gcp, name, potential_ports):
    """Create a global forwarding rule on the first usable port.

    Tries each port in potential_ports in order. On success, records the rule
    on gcp.global_forwarding_rule, sets gcp.service_port, and returns.

    Raises:
      Exception: if every port in potential_ports fails. (Previously the
        function fell through silently, leaving gcp.service_port unset and
        causing confusing failures later.)
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        try:
            config = {
                'name': name,
                'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
                'portRange': str(port),
                'IPAddress': '0.0.0.0',
                'network': args.network,
                'target': gcp.target_proxy.url,
            }
            logger.debug('Sending GCP request with body=%s', config)
            result = compute_to_use.globalForwardingRules().insert(
                project=gcp.project,
                body=config).execute(num_retries=_GCP_API_RETRIES)
            wait_for_global_operation(gcp, result['name'])
            gcp.global_forwarding_rule = GcpResource(config['name'],
                                                     result['targetLink'])
            gcp.service_port = port
            return
        except googleapiclient.errors.HttpError as http_error:
            # Lazy %-args: let the logging module interpolate on demand.
            logger.warning(
                'Got error %s when attempting to create forwarding rule to '
                '0.0.0.0:%d. Retrying with another port.', http_error, port)
    raise Exception(
        'Failed to create a global forwarding rule on any port in %s' %
        potential_ports)
1215
1216
def get_health_check(gcp, health_check_name):
    """Look up an existing health check and record it on gcp.health_check."""
    response = gcp.compute.healthChecks().get(
        project=gcp.project, healthCheck=health_check_name).execute()
    gcp.health_check = GcpResource(health_check_name, response['selfLink'])
1221
1222
def get_health_check_firewall_rule(gcp, firewall_name):
    """Look up an existing firewall rule and record it on
    gcp.health_check_firewall_rule."""
    response = gcp.compute.firewalls().get(project=gcp.project,
                                           firewall=firewall_name).execute()
    gcp.health_check_firewall_rule = GcpResource(firewall_name,
                                                 response['selfLink'])
1228
1229
def get_backend_service(gcp, backend_service_name):
    """Look up an existing backend service, register it on
    gcp.backend_services, and return it."""
    response = gcp.compute.backendServices().get(
        project=gcp.project, backendService=backend_service_name).execute()
    service = GcpResource(backend_service_name, response['selfLink'])
    gcp.backend_services.append(service)
    return service
1236
1237
def get_url_map(gcp, url_map_name):
    """Look up an existing URL map and record it on gcp.url_map."""
    response = gcp.compute.urlMaps().get(project=gcp.project,
                                         urlMap=url_map_name).execute()
    gcp.url_map = GcpResource(url_map_name, response['selfLink'])
1242
1243
def get_target_proxy(gcp, target_proxy_name):
    """Look up an existing target proxy (gRPC on alpha, HTTP otherwise) and
    record it on gcp.target_proxy."""
    if gcp.alpha_compute:
        response = gcp.alpha_compute.targetGrpcProxies().get(
            project=gcp.project, targetGrpcProxy=target_proxy_name).execute()
    else:
        response = gcp.compute.targetHttpProxies().get(
            project=gcp.project, targetHttpProxy=target_proxy_name).execute()
    gcp.target_proxy = GcpResource(target_proxy_name, response['selfLink'])
1252
1253
def get_global_forwarding_rule(gcp, forwarding_rule_name):
    """Look up an existing global forwarding rule and record it on
    gcp.global_forwarding_rule."""
    response = gcp.compute.globalForwardingRules().get(
        project=gcp.project, forwardingRule=forwarding_rule_name).execute()
    gcp.global_forwarding_rule = GcpResource(forwarding_rule_name,
                                             response['selfLink'])
1259
1260
def get_instance_template(gcp, template_name):
    """Look up an existing instance template and record it on
    gcp.instance_template."""
    response = gcp.compute.instanceTemplates().get(
        project=gcp.project, instanceTemplate=template_name).execute()
    gcp.instance_template = GcpResource(template_name, response['selfLink'])
1265
1266
def get_instance_group(gcp, zone, instance_group_name):
    """Look up an existing instance group, register it on gcp.instance_groups,
    and return it.

    Also sets gcp.service_port from the group's first named port.
    """
    response = gcp.compute.instanceGroups().get(
        project=gcp.project, zone=zone,
        instanceGroup=instance_group_name).execute()
    # The group's first named port is taken as the serving port.
    gcp.service_port = response['namedPorts'][0]['port']
    group = InstanceGroup(instance_group_name, response['selfLink'], zone)
    gcp.instance_groups.append(group)
    return group
1276
1277
def delete_global_forwarding_rule(gcp):
    """Best-effort delete of the global forwarding rule; HTTP errors are
    logged and ignored."""
    try:
        response = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=gcp.global_forwarding_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1287
1288
def delete_target_proxy(gcp):
    """Best-effort delete of the target proxy (gRPC on alpha, HTTP
    otherwise); HTTP errors are logged and ignored."""
    try:
        if gcp.alpha_compute:
            request = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project, targetGrpcProxy=gcp.target_proxy.name)
        else:
            request = gcp.compute.targetHttpProxies().delete(
                project=gcp.project, targetHttpProxy=gcp.target_proxy.name)
        response = request.execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1304
1305
def delete_url_map(gcp):
    """Best-effort delete of the URL map; HTTP errors are logged and
    ignored."""
    try:
        response = gcp.compute.urlMaps().delete(
            project=gcp.project,
            urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1314
1315
def delete_backend_services(gcp):
    """Best-effort delete of every backend service registered on gcp;
    HTTP errors are logged and ignored per-service."""
    for service in gcp.backend_services:
        try:
            response = gcp.compute.backendServices().delete(
                project=gcp.project, backendService=service.name).execute(
                    num_retries=_GCP_API_RETRIES)
            wait_for_global_operation(gcp, response['name'])
        except googleapiclient.errors.HttpError as http_error:
            logger.info('Delete failed: %s', http_error)
1326
1327
def delete_firewall(gcp):
    """Best-effort delete of the health-check firewall rule; HTTP errors are
    logged and ignored."""
    try:
        response = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=gcp.health_check_firewall_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1337
1338
def delete_health_check(gcp):
    """Best-effort delete of the health check; HTTP errors are logged and
    ignored."""
    try:
        response = gcp.compute.healthChecks().delete(
            project=gcp.project, healthCheck=gcp.health_check.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1347
1348
def delete_instance_groups(gcp):
    """Best-effort delete of every managed instance group registered on gcp;
    HTTP errors are logged and ignored per-group."""
    for group in gcp.instance_groups:
        try:
            response = gcp.compute.instanceGroupManagers().delete(
                project=gcp.project,
                zone=group.zone,
                instanceGroupManager=group.name).execute(
                    num_retries=_GCP_API_RETRIES)
            # Uses the longer backend timeout for the zone operation.
            wait_for_zone_operation(gcp,
                                    group.zone,
                                    response['name'],
                                    timeout_sec=_WAIT_FOR_BACKEND_SEC)
        except googleapiclient.errors.HttpError as http_error:
            logger.info('Delete failed: %s', http_error)
1363
1364
def delete_instance_template(gcp):
    """Best-effort delete of the instance template; HTTP errors are logged
    and ignored."""
    try:
        response = gcp.compute.instanceTemplates().delete(
            project=gcp.project,
            instanceTemplate=gcp.instance_template.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, response['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
1374
1375
def patch_backend_instances(gcp,
                            backend_service,
                            instance_groups,
                            balancing_mode='UTILIZATION'):
    """Replace the backend service's backends with the given instance groups.

    Passing an empty list detaches all backends. maxRate is only set (to 1)
    for the RATE balancing mode, as the API requires it there.
    """
    compute_to_use = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    config = {
        'backends': [{
            'group': group.url,
            'balancingMode': balancing_mode,
            'maxRate': 1 if balancing_mode == 'RATE' else None
        } for group in instance_groups],
    }
    logger.debug('Sending GCP request with body=%s', config)
    response = compute_to_use.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              response['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)
1398
1399
def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Resize a managed instance group and wait for the change to land.

    Args:
      gcp: object holding the compute API client and project info.
      instance_group: the InstanceGroup to resize.
      new_size: target number of instances.
      timeout_sec: how long to wait for both the zone operation and the group
        to reach the new size.
    """
    result = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    # Honor the caller's timeout; this previously hard-coded 360s here,
    # silently ignoring the timeout_sec parameter for the zone operation.
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            result['name'],
                            timeout_sec=timeout_sec)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)
1415
1416
def patch_url_map_backend_service(gcp,
                                  backend_service=None,
                                  services_with_weights=None,
                                  route_rules=None):
    '''Change url_map's backend service.

    Args:
      gcp: object holding the compute API client, project, and url_map.
      backend_service: if set, becomes the path matcher's default service.
      services_with_weights: dict mapping backend services to weights,
        translated into a weightedBackendServices default route action.
      route_rules: optional routeRules list for the path matcher.

    Only one of backend_service and services_with_weights can be not None.

    Raises:
      ValueError: if both backend_service and services_with_weights are set.
    '''
    if backend_service and services_with_weights:
        # Fixed error message: it previously referred to a non-existent
        # 'service_with_weights' parameter.
        raise ValueError(
            'both backend_service and services_with_weights are not None.')

    default_service = backend_service.url if backend_service else None
    default_route_action = {
        'weightedBackendServices': [{
            'backendService': service.url,
            'weight': w,
        } for service, w in services_with_weights.items()]
    } if services_with_weights else None

    config = {
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': default_service,
            'defaultRouteAction': default_route_action,
            'routeRules': route_rules,
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=gcp.url_map.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
1450
1451
def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    """Poll every 2s until the instance group reports expected_size members.

    Raises an Exception if the group has not reached the expected size
    within timeout_sec seconds.
    """
    deadline = time.time() + timeout_sec
    while True:
        actual_size = len(get_instance_names(gcp, instance_group))
        if actual_size == expected_size:
            return
        if time.time() > deadline:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, actual_size))
        time.sleep(2)
1464
1465
def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a global compute operation every 2s until it reaches DONE.

    Args:
        gcp: GcpState with the compute client and project id.
        operation: name of the global operation to wait for.
        timeout_sec: how long to poll before giving up.

    Raises:
        Exception: if the operation finished with an error, or did not
            reach DONE within timeout_sec.
    """
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        result = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    # Bug fix: the args were previously passed to Exception() separately
    # (logging-style), so the message was never %-formatted.
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
1481
1482
def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a zonal compute operation every 2s until it reaches DONE.

    Args:
        gcp: GcpState with the compute client and project id.
        zone: GCE zone the operation belongs to.
        operation: name of the zone operation to wait for.
        timeout_sec: how long to poll before giving up.

    Raises:
        Exception: if the operation finished with an error, or did not
            reach DONE within timeout_sec.
    """
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        result = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    # Bug fix: the args were previously passed to Exception() separately
    # (logging-style), so the message was never %-formatted.
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
1499
1500
def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    """Poll getHealth every 2s until every instance in the group is HEALTHY.

    Success requires both that every reported instance is HEALTHY and that
    the report covers the whole group (instances may register late).

    Raises:
        Exception: if the backends did not all become healthy within
            timeout_sec, including the last health report for diagnosis.
    """
    start_time = time.time()
    config = {'group': instance_group.url}
    expected_size = len(get_instance_names(gcp, instance_group))
    # Initialized so the timeout message below never hits a NameError when
    # the loop body was never entered (e.g. a non-positive timeout_sec).
    result = None
    while time.time() - start_time <= timeout_sec:
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in result:
            logger.info('received healthStatus: %s', result['healthStatus'])
            statuses = result['healthStatus']
            if len(statuses) == expected_size and all(
                    instance['healthState'] == 'HEALTHY'
                    for instance in statuses):
                return
        time.sleep(2)
    raise Exception('Not all backends became healthy within %d seconds: %s' %
                    (timeout_sec, result))
1525
1526
def get_instance_names(gcp, instance_group):
    """Return the names of all instances currently in instance_group.

    Returns an empty list when the group has no members.
    """
    result = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={
            'instanceState': 'ALL'
        }).execute(num_retries=_GCP_API_RETRIES)
    if 'items' not in result:
        return []
    # listInstances() returns the full URL of the instance, which ends with
    # the instance name. compute.instances().get() requires using the
    # instance name (not the full URL) to look up instance details, so we
    # just extract the name manually.
    instance_names = [
        item['instance'].split('/')[-1] for item in result['items']
    ]
    logger.info('retrieved instance names: %s', instance_names)
    return instance_names
1547
1548
def clean_up(gcp):
    """Delete every GCP resource recorded in gcp, skipping unset ones.

    The deletion order appears deliberate: resources created later (and
    that presumably reference earlier ones, e.g. the forwarding rule and
    target proxy created after the url map) are deleted first — do not
    reorder these calls.
    """
    if gcp.global_forwarding_rule:
        delete_global_forwarding_rule(gcp)
    if gcp.target_proxy:
        delete_target_proxy(gcp)
    if gcp.url_map:
        delete_url_map(gcp)
    delete_backend_services(gcp)
    if gcp.health_check_firewall_rule:
        delete_firewall(gcp)
    if gcp.health_check:
        delete_health_check(gcp)
    delete_instance_groups(gcp)
    if gcp.instance_template:
        delete_instance_template(gcp)
1564
1565
class InstanceGroup(object):
    """Lightweight record for a GCE instance group.

    Attributes:
        name: the instance group's name.
        url: full resource URL (used when patching backend services).
        zone: GCE zone the group lives in.
    """

    def __init__(self, name, url, zone):
        self.name = name
        self.url = url
        self.zone = zone

    def __repr__(self):
        # Useful in log output / debugging; the default object repr hides
        # which group is being operated on.
        return 'InstanceGroup(name=%r, url=%r, zone=%r)' % (self.name,
                                                            self.url,
                                                            self.zone)
1572
1573
class GcpResource(object):
    """Lightweight record for a generic named GCP resource.

    Attributes:
        name: the resource's name.
        url: full resource URL.
    """

    def __init__(self, name, url):
        self.name = name
        self.url = url

    def __repr__(self):
        # Useful in log output / debugging; the default object repr hides
        # which resource is being operated on.
        return 'GcpResource(name=%r, url=%r)' % (self.name, self.url)
1579
1580
class GcpState(object):
    """Mutable holder for the GCP clients and resources used by a test run."""

    def __init__(self, compute, alpha_compute, project):
        """Initialize with API clients; resource fields start empty.

        Args:
            compute: v1 compute API client.
            alpha_compute: alpha compute API client, or None when the run is
                restricted to stable GCP APIs.
            project: GCP project id string.
        """
        self.compute = compute
        self.alpha_compute = alpha_compute
        self.project = project
        # The fields below are filled in as resources are created (or looked
        # up when reusing existing resources) and are consumed by clean_up().
        self.health_check = None
        self.health_check_firewall_rule = None
        self.backend_services = []
        self.url_map = None
        self.target_proxy = None
        self.global_forwarding_rule = None
        # Port chosen during forwarding-rule creation; None until then.
        self.service_port = None
        self.instance_template = None
        self.instance_groups = []
1596
1597
# Build the GCP compute API clients. A local discovery document may be
# supplied (e.g. to exercise an API surface not yet published); otherwise the
# clients are built from the live discovery service.
alpha_compute = None
if args.compute_discovery_document:
    with open(args.compute_discovery_document, 'r') as discovery_doc:
        compute = googleapiclient.discovery.build_from_document(
            discovery_doc.read())
    # The alpha client is optional: skipped when the run is restricted to
    # stable GCP APIs or when no alpha discovery document was provided.
    if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
        with open(args.alpha_compute_discovery_document, 'r') as discovery_doc:
            alpha_compute = googleapiclient.discovery.build_from_document(
                discovery_doc.read())
else:
    compute = googleapiclient.discovery.build('compute', 'v1')
    if not args.only_stable_gcp_apis:
        alpha_compute = googleapiclient.discovery.build('compute', 'alpha')
1611
try:
    gcp = GcpState(compute, alpha_compute, args.project_id)
    # Every resource name carries the per-run suffix so that concurrent runs
    # in the same project do not collide.
    health_check_name = _BASE_HEALTH_CHECK_NAME + args.gcp_suffix
    firewall_name = _BASE_FIREWALL_RULE_NAME + args.gcp_suffix
    backend_service_name = _BASE_BACKEND_SERVICE_NAME + args.gcp_suffix
    alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + args.gcp_suffix
    url_map_name = _BASE_URL_MAP_NAME + args.gcp_suffix
    service_host_name = _BASE_SERVICE_HOST + args.gcp_suffix
    target_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix
    forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + args.gcp_suffix
    template_name = _BASE_TEMPLATE_NAME + args.gcp_suffix
    instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix
    same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix
    secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix
    if args.use_existing_gcp_resources:
        # Look up resources created by a previous run instead of creating
        # them; the get_* helpers populate the corresponding gcp.* fields.
        logger.info('Reusing existing GCP resources')
        get_health_check(gcp, health_check_name)
        try:
            get_health_check_firewall_rule(gcp, firewall_name)
        except googleapiclient.errors.HttpError as http_error:
            # Firewall rule may be auto-deleted periodically depending on GCP
            # project settings.
            logger.exception('Failed to find firewall rule, recreating')
            create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = get_backend_service(gcp, backend_service_name)
        alternate_backend_service = get_backend_service(
            gcp, alternate_backend_service_name)
        get_url_map(gcp, url_map_name)
        get_target_proxy(gcp, target_proxy_name)
        get_global_forwarding_rule(gcp, forwarding_rule_name)
        get_instance_template(gcp, template_name)
        instance_group = get_instance_group(gcp, args.zone, instance_group_name)
        same_zone_instance_group = get_instance_group(
            gcp, args.zone, same_zone_instance_group_name)
        secondary_zone_instance_group = get_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name)
    else:
        # Create the full resource stack from scratch; creation order matters
        # (later resources reference earlier ones).
        create_health_check(gcp, health_check_name)
        create_health_check_firewall_rule(gcp, firewall_name)
        backend_service = add_backend_service(gcp, backend_service_name)
        alternate_backend_service = add_backend_service(
            gcp, alternate_backend_service_name)
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name)
        # Ports are tried in random order; the one that works ends up in
        # gcp.service_port (set inside create_global_forwarding_rule).
        potential_service_ports = list(args.service_port_range)
        random.shuffle(potential_service_ports)
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      potential_service_ports)
        if not gcp.service_port:
            raise Exception(
                'Failed to find a valid ip:port for the forwarding rule')
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            # A non-default port must also be reflected in the url map's
            # host rule.
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
        startup_script = get_startup_script(args.path_to_server_binary,
                                            gcp.service_port)
        create_instance_template(gcp, template_name, args.network,
                                 args.source_image, args.machine_type,
                                 startup_script)
        instance_group = add_instance_group(gcp, args.zone, instance_group_name,
                                            _INSTANCE_GROUP_SIZE)
        # Only the primary group backs the service initially; the other two
        # groups are attached/detached by individual test cases.
        patch_backend_instances(gcp, backend_service, [instance_group])
        same_zone_instance_group = add_instance_group(
            gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
        secondary_zone_instance_group = add_instance_group(
            gcp, args.secondary_zone, secondary_zone_instance_group_name,
            _INSTANCE_GROUP_SIZE)

    wait_for_healthy_backends(gcp, backend_service, instance_group)

    if args.test_case:
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            server_uri = service_host_name
        else:
            server_uri = service_host_name + ':' + str(gcp.service_port)
        # The xDS bootstrap file tells the client how to reach the control
        # plane; generate a temporary one unless the caller supplied a path.
        if args.bootstrap_file:
            bootstrap_path = os.path.abspath(args.bootstrap_file)
        else:
            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
                bootstrap_file.write(
                    _BOOTSTRAP_TEMPLATE.format(
                        node_id=socket.gethostname()).encode('utf-8'))
                bootstrap_path = bootstrap_file.name
        client_env = dict(os.environ, GRPC_XDS_BOOTSTRAP=bootstrap_path)

        test_results = {}
        failed_tests = []
        for test_case in args.test_case:
            result = jobset.JobResult()
            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
            test_log_file = open(test_log_filename, 'w+')
            client_process = None

            # The flags below tailor the interop client's behavior to the
            # requirements of the individual test case.
            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
            else:
                rpcs_to_send = '--rpc="UnaryCall"'

            if test_case in _TESTS_TO_SEND_METADATA:
                metadata_to_send = '--metadata="EmptyCall:{key}:{value}"'.format(
                    key=_TEST_METADATA_KEY, value=_TEST_METADATA_VALUE)
            else:
                metadata_to_send = '--metadata=""'

            if test_case in _TESTS_TO_FAIL_ON_RPC_FAILURE:
                fail_on_failed_rpc = '--fail_on_failed_rpc=true'
            else:
                fail_on_failed_rpc = '--fail_on_failed_rpc=false'

            client_cmd_formatted = args.client_cmd.format(
                server_uri=server_uri,
                stats_port=args.stats_port,
                qps=args.qps,
                fail_on_failed_rpc=fail_on_failed_rpc,
                rpcs_to_send=rpcs_to_send,
                metadata_to_send=metadata_to_send)
            logger.debug('running client: %s', client_cmd_formatted)
            client_cmd = shlex.split(client_cmd_formatted)
            try:
                # The client runs concurrently for the whole test case; its
                # output goes to the per-test log file.
                client_process = subprocess.Popen(client_cmd,
                                                  env=client_env,
                                                  stderr=subprocess.STDOUT,
                                                  stdout=test_log_file)
                if test_case == 'backends_restart':
                    test_backends_restart(gcp, backend_service, instance_group)
                elif test_case == 'change_backend_service':
                    test_change_backend_service(gcp, backend_service,
                                                instance_group,
                                                alternate_backend_service,
                                                same_zone_instance_group)
                elif test_case == 'gentle_failover':
                    test_gentle_failover(gcp, backend_service, instance_group,
                                         secondary_zone_instance_group)
                elif test_case == 'new_instance_group_receives_traffic':
                    test_new_instance_group_receives_traffic(
                        gcp, backend_service, instance_group,
                        same_zone_instance_group)
                elif test_case == 'ping_pong':
                    test_ping_pong(gcp, backend_service, instance_group)
                elif test_case == 'remove_instance_group':
                    test_remove_instance_group(gcp, backend_service,
                                               instance_group,
                                               same_zone_instance_group)
                elif test_case == 'round_robin':
                    test_round_robin(gcp, backend_service, instance_group)
                elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
                    test_secondary_locality_gets_requests_on_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'traffic_splitting':
                    test_traffic_splitting(gcp, backend_service, instance_group,
                                           alternate_backend_service,
                                           same_zone_instance_group)
                elif test_case == 'path_matching':
                    test_path_matching(gcp, backend_service, instance_group,
                                       alternate_backend_service,
                                       same_zone_instance_group)
                elif test_case == 'header_matching':
                    test_header_matching(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                else:
                    logger.error('Unknown test case: %s', test_case)
                    sys.exit(1)
                # A client that died mid-test invalidates the result even if
                # the server-side checks above passed.
                if client_process.poll() is not None:
                    raise Exception(
                        'Client process exited prematurely with exit code %d' %
                        client_process.returncode)
                result.state = 'PASSED'
                result.returncode = 0
            except Exception as e:
                logger.exception('Test case %s failed', test_case)
                failed_tests.append(test_case)
                result.state = 'FAILED'
                result.message = str(e)
            finally:
                # NOTE(review): Popen.returncode is None while the client is
                # still running and 0 after a clean exit, so this terminates
                # a live (or cleanly-exited) client but skips terminate() for
                # a non-zero exit, which was already reported above.
                if client_process and not client_process.returncode:
                    client_process.terminate()
                test_log_file.close()
                # Workaround for Python 3, as report_utils will invoke decode() on
                # result.message, which has a default value of ''.
                result.message = result.message.encode('UTF-8')
                test_results[test_case] = [result]
                if args.log_client_output:
                    logger.info('Client output:')
                    with open(test_log_filename, 'r') as client_output:
                        logger.info(client_output.read())
        if not os.path.exists(_TEST_LOG_BASE_DIR):
            os.makedirs(_TEST_LOG_BASE_DIR)
        report_utils.render_junit_xml_report(test_results,
                                             os.path.join(
                                                 _TEST_LOG_BASE_DIR,
                                                 _SPONGE_XML_NAME),
                                             suite_name='xds_tests',
                                             multi_target=True)
        if failed_tests:
            logger.error('Test case(s) %s failed', failed_tests)
            sys.exit(1)
finally:
    # Always attempt cleanup unless the caller explicitly asked to keep the
    # resources (e.g. for debugging, or for reuse via
    # --use_existing_gcp_resources on a later run).
    if not args.keep_gcp_resources:
        logger.info('Cleaning up GCP resources. This may take some time.')
        clean_up(gcp)
1822