1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""The Python implementation of the GRPC interoperability test client.""" 15 16import argparse 17import os 18 19from google import auth as google_auth 20from google.auth import jwt as google_auth_jwt 21import grpc 22from src.proto.grpc.testing import test_pb2_grpc 23 24from tests.interop import methods 25from tests.interop import resources 26 27 28def _args(): 29 parser = argparse.ArgumentParser() 30 parser.add_argument( 31 '--server_host', 32 default="localhost", 33 type=str, 34 help='the host to which to connect') 35 parser.add_argument( 36 '--server_port', 37 type=int, 38 required=True, 39 help='the port to which to connect') 40 parser.add_argument( 41 '--test_case', 42 default='large_unary', 43 type=str, 44 help='the test case to execute') 45 parser.add_argument( 46 '--use_tls', 47 default=False, 48 type=resources.parse_bool, 49 help='require a secure connection') 50 parser.add_argument( 51 '--use_test_ca', 52 default=False, 53 type=resources.parse_bool, 54 help='replace platform root CAs with ca.pem') 55 parser.add_argument( 56 '--server_host_override', 57 default="foo.test.google.fr", 58 type=str, 59 help='the server host to which to claim to connect') 60 parser.add_argument( 61 '--oauth_scope', type=str, help='scope for OAuth tokens') 62 parser.add_argument( 63 '--default_service_account', 64 type=str, 65 help='email address of the default service account') 66 return parser.parse_args() 67 68 69def _stub(args): 70 target = '{}:{}'.format(args.server_host, args.server_port) 71 if args.test_case == 'oauth2_auth_token': 72 google_credentials, unused_project_id = google_auth.default( 73 scopes=[args.oauth_scope]) 74 google_credentials.refresh(google_auth.transport.requests.Request()) 75 call_credentials = grpc.access_token_call_credentials( 76 google_credentials.token) 77 elif args.test_case == 'compute_engine_creds': 78 google_credentials, unused_project_id = google_auth.default( 79 scopes=[args.oauth_scope]) 80 call_credentials = grpc.metadata_call_credentials( 81 google_auth.transport.grpc.AuthMetadataPlugin( 82 credentials=google_credentials, 83 request=google_auth.transport.requests.Request())) 84 elif args.test_case == 'jwt_token_creds': 85 google_credentials = google_auth_jwt.OnDemandCredentials.from_service_account_file( 86 os.environ[google_auth.environment_vars.CREDENTIALS]) 87 call_credentials = grpc.metadata_call_credentials( 88 google_auth.transport.grpc.AuthMetadataPlugin( 89 credentials=google_credentials, request=None)) 90 else: 91 call_credentials = None 92 if args.use_tls: 93 if args.use_test_ca: 94 root_certificates = resources.test_root_certificates() 95 else: 96 root_certificates = None # will load default roots. 97 98 channel_credentials = grpc.ssl_channel_credentials(root_certificates) 99 if call_credentials is not None: 100 channel_credentials = grpc.composite_channel_credentials( 101 channel_credentials, call_credentials) 102 103 channel = grpc.secure_channel(target, channel_credentials, (( 104 'grpc.ssl_target_name_override', 105 args.server_host_override, 106 ),)) 107 else: 108 channel = grpc.insecure_channel(target) 109 if args.test_case == "unimplemented_service": 110 return test_pb2_grpc.UnimplementedServiceStub(channel) 111 else: 112 return test_pb2_grpc.TestServiceStub(channel) 113 114 115def _test_case_from_arg(test_case_arg): 116 for test_case in methods.TestCase: 117 if test_case_arg == test_case.value: 118 return test_case 119 else: 120 raise ValueError('No test case "%s"!' % test_case_arg) 121 122 123def test_interoperability(): 124 args = _args() 125 stub = _stub(args) 126 test_case = _test_case_from_arg(args.test_case) 127 test_case.test_interoperability(stub, args) 128 129 130if __name__ == '__main__': 131 test_interoperability() 132