• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests exposure of SSL auth context"""
15
16import logging
17import pickle
18import unittest
19
20import grpc
21from grpc import _channel
22from grpc.experimental import session_cache
23
24from tests.unit import resources
25from tests.unit import test_common
26
27_REQUEST = b"\x00\x00\x00"
28_RESPONSE = b"\x00\x00\x00"
29
30_SERVICE_NAME = "test"
31_UNARY_UNARY = "UnaryUnary"
32
33_SERVER_HOST_OVERRIDE = "foo.test.google.fr"
34_CLIENT_IDS = (
35    b"*.test.google.fr",
36    b"waterzooi.test.google.be",
37    b"*.test.youtube.com",
38    b"192.168.1.3",
39)
40_ID = "id"
41_ID_KEY = "id_key"
42_AUTH_CTX = "auth_ctx"
43
44_PRIVATE_KEY = resources.private_key()
45_CERTIFICATE_CHAIN = resources.certificate_chain()
46_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
47_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
48_PROPERTY_OPTIONS = (
49    (
50        "grpc.ssl_target_name_override",
51        _SERVER_HOST_OVERRIDE,
52    ),
53)
54
55
56def handle_unary_unary(request, servicer_context):
57    return pickle.dumps(
58        {
59            _ID: servicer_context.peer_identities(),
60            _ID_KEY: servicer_context.peer_identity_key(),
61            _AUTH_CTX: servicer_context.auth_context(),
62        }
63    )
64
65
66_METHOD_HANDLERS = {
67    "UnaryUnary": grpc.unary_unary_rpc_method_handler(handle_unary_unary)
68}
69
70
71class AuthContextTest(unittest.TestCase):
72    def testInsecure(self):
73        server = test_common.test_server()
74        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
75        port = server.add_insecure_port("[::]:0")
76        server.start()
77
78        with grpc.insecure_channel("localhost:%d" % port) as channel:
79            response = channel.unary_unary(
80                grpc._common.fully_qualified_method(
81                    _SERVICE_NAME, _UNARY_UNARY
82                ),
83                _registered_method=True,
84            )(_REQUEST)
85        server.stop(None)
86
87        auth_data = pickle.loads(response)
88        self.assertIsNone(auth_data[_ID])
89        self.assertIsNone(auth_data[_ID_KEY])
90        self.assertDictEqual(
91            {
92                "security_level": [b"TSI_SECURITY_NONE"],
93                "transport_security_type": [b"insecure"],
94            },
95            auth_data[_AUTH_CTX],
96        )
97
98    def testSecureNoCert(self):
99        server = test_common.test_server()
100        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
101        server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
102        port = server.add_secure_port("[::]:0", server_cred)
103        server.start()
104
105        channel_creds = grpc.ssl_channel_credentials(
106            root_certificates=_TEST_ROOT_CERTIFICATES
107        )
108        channel = grpc.secure_channel(
109            "localhost:{}".format(port),
110            channel_creds,
111            options=_PROPERTY_OPTIONS,
112        )
113        response = channel.unary_unary(
114            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY),
115            _registered_method=True,
116        )(_REQUEST)
117        channel.close()
118        server.stop(None)
119
120        auth_data = pickle.loads(response)
121        self.assertIsNone(auth_data[_ID])
122        self.assertIsNone(auth_data[_ID_KEY])
123        self.assertDictEqual(
124            {
125                "security_level": [b"TSI_PRIVACY_AND_INTEGRITY"],
126                "transport_security_type": [b"ssl"],
127                "ssl_session_reused": [b"false"],
128            },
129            auth_data[_AUTH_CTX],
130        )
131
132    def testSecureClientCert(self):
133        server = test_common.test_server()
134        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
135        server_cred = grpc.ssl_server_credentials(
136            _SERVER_CERTS,
137            root_certificates=_TEST_ROOT_CERTIFICATES,
138            require_client_auth=True,
139        )
140        port = server.add_secure_port("[::]:0", server_cred)
141        server.start()
142
143        channel_creds = grpc.ssl_channel_credentials(
144            root_certificates=_TEST_ROOT_CERTIFICATES,
145            private_key=_PRIVATE_KEY,
146            certificate_chain=_CERTIFICATE_CHAIN,
147        )
148        channel = grpc.secure_channel(
149            "localhost:{}".format(port),
150            channel_creds,
151            options=_PROPERTY_OPTIONS,
152        )
153
154        response = channel.unary_unary(
155            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY),
156            _registered_method=True,
157        )(_REQUEST)
158        channel.close()
159        server.stop(None)
160
161        auth_data = pickle.loads(response)
162        auth_ctx = auth_data[_AUTH_CTX]
163        self.assertCountEqual(_CLIENT_IDS, auth_data[_ID])
164        self.assertEqual("x509_subject_alternative_name", auth_data[_ID_KEY])
165        self.assertSequenceEqual([b"ssl"], auth_ctx["transport_security_type"])
166        self.assertSequenceEqual(
167            [b"*.test.google.com"], auth_ctx["x509_common_name"]
168        )
169
170    def _do_one_shot_client_rpc(
171        self, channel_creds, channel_options, port, expect_ssl_session_reused
172    ):
173        channel = grpc.secure_channel(
174            "localhost:{}".format(port), channel_creds, options=channel_options
175        )
176        response = channel.unary_unary(
177            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY),
178            _registered_method=True,
179        )(_REQUEST)
180        auth_data = pickle.loads(response)
181        self.assertEqual(
182            expect_ssl_session_reused,
183            auth_data[_AUTH_CTX]["ssl_session_reused"],
184        )
185        channel.close()
186
187    def testSessionResumption(self):
188        # Set up a secure server
189        server = test_common.test_server()
190        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
191        server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
192        port = server.add_secure_port("[::]:0", server_cred)
193        server.start()
194
195        # Create a cache for TLS session tickets
196        cache = session_cache.ssl_session_cache_lru(1)
197        channel_creds = grpc.ssl_channel_credentials(
198            root_certificates=_TEST_ROOT_CERTIFICATES
199        )
200        channel_options = _PROPERTY_OPTIONS + (
201            ("grpc.ssl_session_cache", cache),
202        )
203
204        # Initial connection has no session to resume
205        self._do_one_shot_client_rpc(
206            channel_creds,
207            channel_options,
208            port,
209            expect_ssl_session_reused=[b"false"],
210        )
211
212        # Subsequent connections resume sessions
213        self._do_one_shot_client_rpc(
214            channel_creds,
215            channel_options,
216            port,
217            expect_ssl_session_reused=[b"true"],
218        )
219        server.stop(None)
220
221
222if __name__ == "__main__":
223    logging.basicConfig()
224    unittest.main(verbosity=2)
225