• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""The Python implementation of the GRPC interoperability test client."""
15
16import argparse
17import os
18
19from google import auth as google_auth
20from google.auth import jwt as google_auth_jwt
21import grpc
22from src.proto.grpc.testing import test_pb2_grpc
23
24from tests.interop import methods
25from tests.interop import resources
26
27
28def _args():
29    parser = argparse.ArgumentParser()
30    parser.add_argument(
31        '--server_host',
32        default="localhost",
33        type=str,
34        help='the host to which to connect')
35    parser.add_argument(
36        '--server_port',
37        type=int,
38        required=True,
39        help='the port to which to connect')
40    parser.add_argument(
41        '--test_case',
42        default='large_unary',
43        type=str,
44        help='the test case to execute')
45    parser.add_argument(
46        '--use_tls',
47        default=False,
48        type=resources.parse_bool,
49        help='require a secure connection')
50    parser.add_argument(
51        '--use_test_ca',
52        default=False,
53        type=resources.parse_bool,
54        help='replace platform root CAs with ca.pem')
55    parser.add_argument(
56        '--server_host_override',
57        default="foo.test.google.fr",
58        type=str,
59        help='the server host to which to claim to connect')
60    parser.add_argument(
61        '--oauth_scope', type=str, help='scope for OAuth tokens')
62    parser.add_argument(
63        '--default_service_account',
64        type=str,
65        help='email address of the default service account')
66    return parser.parse_args()
67
68
69def _stub(args):
70    target = '{}:{}'.format(args.server_host, args.server_port)
71    if args.test_case == 'oauth2_auth_token':
72        google_credentials, unused_project_id = google_auth.default(
73            scopes=[args.oauth_scope])
74        google_credentials.refresh(google_auth.transport.requests.Request())
75        call_credentials = grpc.access_token_call_credentials(
76            google_credentials.token)
77    elif args.test_case == 'compute_engine_creds':
78        google_credentials, unused_project_id = google_auth.default(
79            scopes=[args.oauth_scope])
80        call_credentials = grpc.metadata_call_credentials(
81            google_auth.transport.grpc.AuthMetadataPlugin(
82                credentials=google_credentials,
83                request=google_auth.transport.requests.Request()))
84    elif args.test_case == 'jwt_token_creds':
85        google_credentials = google_auth_jwt.OnDemandCredentials.from_service_account_file(
86            os.environ[google_auth.environment_vars.CREDENTIALS])
87        call_credentials = grpc.metadata_call_credentials(
88            google_auth.transport.grpc.AuthMetadataPlugin(
89                credentials=google_credentials, request=None))
90    else:
91        call_credentials = None
92    if args.use_tls:
93        if args.use_test_ca:
94            root_certificates = resources.test_root_certificates()
95        else:
96            root_certificates = None  # will load default roots.
97
98        channel_credentials = grpc.ssl_channel_credentials(root_certificates)
99        if call_credentials is not None:
100            channel_credentials = grpc.composite_channel_credentials(
101                channel_credentials, call_credentials)
102
103        channel = grpc.secure_channel(target, channel_credentials, ((
104            'grpc.ssl_target_name_override',
105            args.server_host_override,
106        ),))
107    else:
108        channel = grpc.insecure_channel(target)
109    if args.test_case == "unimplemented_service":
110        return test_pb2_grpc.UnimplementedServiceStub(channel)
111    else:
112        return test_pb2_grpc.TestServiceStub(channel)
113
114
115def _test_case_from_arg(test_case_arg):
116    for test_case in methods.TestCase:
117        if test_case_arg == test_case.value:
118            return test_case
119    else:
120        raise ValueError('No test case "%s"!' % test_case_arg)
121
122
123def test_interoperability():
124    args = _args()
125    stub = _stub(args)
126    test_case = _test_case_from_arg(args.test_case)
127    test_case.test_interoperability(stub, args)
128
129
130if __name__ == '__main__':
131    test_interoperability()
132