# Copyright 2021 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests xDS server and channel credentials."""

from concurrent import futures
import contextlib
import logging
import unittest

import grpc
import grpc.experimental

from tests.unit import resources
from tests.unit import test_common

_SERVICE_NAME = "test"
_METHOD = "method"

# Single unary-unary echo handler: the response is the request, unchanged.
_METHOD_HANDLERS = {
    _METHOD: grpc.unary_unary_rpc_method_handler(
        lambda request, unused_context: request
    )
}


@contextlib.contextmanager
def xds_channel_server_without_xds(server_fallback_creds):
    """Run an echo server secured with xDS credentials, without xDS enabled.

    The server is created without ``xds=True`` and registers
    ``_METHOD_HANDLERS`` under ``_SERVICE_NAME``. It listens on a port picked
    by ``add_secure_port("localhost:0", ...)`` and is stopped when the context
    exits, whether or not the body raised.

    Args:
        server_fallback_creds: Server credentials wrapped by
            ``grpc.xds_server_credentials`` as its fallback credentials.

    Yields:
        The server address as a ``"localhost:<port>"`` string.
    """
    server = grpc.server(futures.ThreadPoolExecutor())
    server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
    # The caller-supplied fallback credentials are the only ones used; no
    # credentials are constructed here.
    server_creds = grpc.xds_server_credentials(server_fallback_creds)
    port = server.add_secure_port("localhost:0", server_creds)
    server.start()
    try:
        yield "localhost:{}".format(port)
    finally:
        server.stop(None)


class XdsCredentialsTest(unittest.TestCase):
    """Exercises xDS credentials on the server and channel side."""

    def test_xds_creds_fallback_ssl(self):
        """Echo RPC succeeds with SSL fallback credentials on both sides."""
        # Since there is no xDS server, the fallback credentials will be used.
        # In this case, SSL credentials.
        server_fallback_creds = grpc.ssl_server_credentials(
            ((resources.private_key(), resources.certificate_chain()),)
        )
        with xds_channel_server_without_xds(
            server_fallback_creds
        ) as server_address:
            # The channel dials "localhost:<port>", so the TLS target name is
            # overridden for the handshake.
            override_options = (
                ("grpc.ssl_target_name_override", "foo.test.google.fr"),
            )
            channel_fallback_creds = grpc.ssl_channel_credentials(
                root_certificates=resources.test_root_certificates(),
                private_key=resources.private_key(),
                certificate_chain=resources.certificate_chain(),
            )
            channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
            with grpc.secure_channel(
                server_address, channel_creds, options=override_options
            ) as channel:
                request = b"abc"
                response = channel.unary_unary(
                    grpc._common.fully_qualified_method(_SERVICE_NAME, _METHOD),
                    _registered_method=True,
                )(request, wait_for_ready=True)
                self.assertEqual(response, request)

    def test_xds_creds_fallback_insecure(self):
        """Echo RPC succeeds with insecure fallback credentials on both sides."""
        # Since there is no xDS server, the fallback credentials will be used.
        # In this case, insecure.
        server_fallback_creds = grpc.insecure_server_credentials()
        with xds_channel_server_without_xds(
            server_fallback_creds
        ) as server_address:
            channel_fallback_creds = (
                grpc.experimental.insecure_channel_credentials()
            )
            channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
            with grpc.secure_channel(server_address, channel_creds) as channel:
                request = b"abc"
                # Build the method path from the same constants used to
                # register the handler, so the two cannot drift apart.
                response = channel.unary_unary(
                    grpc._common.fully_qualified_method(_SERVICE_NAME, _METHOD),
                    _registered_method=True,
                )(request, wait_for_ready=True)
                self.assertEqual(response, request)

    def test_start_xds_server(self):
        """An xDS-enabled server can be started and stopped without raising."""
        server = grpc.server(futures.ThreadPoolExecutor(), xds=True)
        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
        server_fallback_creds = grpc.insecure_server_credentials()
        server_creds = grpc.xds_server_credentials(server_fallback_creds)
        server.add_secure_port("localhost:0", server_creds)
        server.start()
        server.stop(None)
        # No exceptions thrown. A more comprehensive suite of tests will be
        # provided by the interop tests.


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main()