• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests xDS server and channel credentials."""
15
16from concurrent import futures
17import contextlib
18import logging
19import unittest
20
21import grpc
22import grpc.experimental
23
24from tests.unit import resources
25from tests.unit import test_common
26
# Service and method names under which the test handler is registered.
_SERVICE_NAME = "test"
_METHOD = "method"


def _echo(request, unused_context):
    """Return the request payload unchanged as the response."""
    return request


# A single unary-unary method that echoes back whatever it receives.
_METHOD_HANDLERS = {_METHOD: grpc.unary_unary_rpc_method_handler(_echo)}
35
36
@contextlib.contextmanager
def xds_channel_server_without_xds(server_fallback_creds):
    """Run a local test server secured with xDS server credentials.

    The server is created without ``xds=True`` and serves
    ``_METHOD_HANDLERS`` under ``_SERVICE_NAME``. It is stopped when the
    ``with`` body exits, whether normally or by exception.

    Args:
        server_fallback_creds: Server credentials passed to
            ``grpc.xds_server_credentials`` as the fallback credentials.

    Yields:
        The ``"localhost:<port>"`` address the server was bound to.
    """
    server = grpc.server(futures.ThreadPoolExecutor())
    server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
    # Only the caller-supplied fallback credentials are used; no additional
    # credentials object is built here.
    server_creds = grpc.xds_server_credentials(server_fallback_creds)
    # Port 0 lets the OS pick a free port; the bound port is returned.
    port = server.add_secure_port("localhost:0", server_creds)
    server.start()
    try:
        yield "localhost:{}".format(port)
    finally:
        # Always shut the server down, with no grace period.
        server.stop(None)
51
52
class XdsCredentialsTest(unittest.TestCase):
    """Exercises xDS channel and server credentials against a local server."""

    def test_xds_creds_fallback_ssl(self):
        """An echo RPC succeeds when both sides fall back to SSL credentials."""
        # Since there is no xDS server, the fallback credentials will be used.
        # In this case, SSL credentials.
        server_fallback_creds = grpc.ssl_server_credentials(
            ((resources.private_key(), resources.certificate_chain()),)
        )
        with xds_channel_server_without_xds(
            server_fallback_creds
        ) as server_address:
            # The channel dials "localhost", so the name checked against the
            # server certificate is overridden here.
            override_options = (
                ("grpc.ssl_target_name_override", "foo.test.google.fr"),
            )
            channel_fallback_creds = grpc.ssl_channel_credentials(
                root_certificates=resources.test_root_certificates(),
                private_key=resources.private_key(),
                certificate_chain=resources.certificate_chain(),
            )
            channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
            with grpc.secure_channel(
                server_address, channel_creds, options=override_options
            ) as channel:
                request = b"abc"
                response = channel.unary_unary(
                    grpc._common.fully_qualified_method(_SERVICE_NAME, _METHOD),
                    _registered_method=True,
                )(request, wait_for_ready=True)
                self.assertEqual(response, request)

    def test_xds_creds_fallback_insecure(self):
        """An echo RPC succeeds when both sides fall back to insecure credentials."""
        # Since there is no xDS server, the fallback credentials will be used.
        # In this case, insecure.
        server_fallback_creds = grpc.insecure_server_credentials()
        with xds_channel_server_without_xds(
            server_fallback_creds
        ) as server_address:
            channel_fallback_creds = (
                grpc.experimental.insecure_channel_credentials()
            )
            channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
            with grpc.secure_channel(server_address, channel_creds) as channel:
                request = b"abc"
                # Derive the method path from the same constants the server
                # registers its handlers with, rather than hard-coding it.
                response = channel.unary_unary(
                    grpc._common.fully_qualified_method(_SERVICE_NAME, _METHOD),
                    _registered_method=True,
                )(request, wait_for_ready=True)
                self.assertEqual(response, request)

    def test_start_xds_server(self):
        """An xDS-enabled server can be created, bound, started and stopped."""
        server = grpc.server(futures.ThreadPoolExecutor(), xds=True)
        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
        server_fallback_creds = grpc.insecure_server_credentials()
        server_creds = grpc.xds_server_credentials(server_fallback_creds)
        # The bound port is not needed: no client connects in this test.
        server.add_secure_port("localhost:0", server_creds)
        server.start()
        server.stop(None)
        # No exceptions thrown. A more comprehensive suite of tests will be
        # provided by the interop tests.
111
112
if __name__ == "__main__":
    # Set up default logging before handing control to the unittest runner.
    logging.basicConfig()
    unittest.main()
116