• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests exposure of SSL auth context"""
15
16import logging
17import pickle
18import unittest
19
20import grpc
21from grpc import _channel
22from grpc.experimental import session_cache
23
24from tests.unit import resources
25from tests.unit import test_common
26
# Opaque three-byte payloads. _REQUEST is the body sent by every RPC in this
# module; _RESPONSE is not referenced by the tests visible in this file.
_REQUEST = b"\x00" * 3
_RESPONSE = b"\x00" * 3

# Fully-qualified path of the single method served by the test server.
_UNARY_UNARY = "/test/UnaryUnary"

# Channel options that override the SSL target name used by secure channels.
_SERVER_HOST_OVERRIDE = "foo.test.google.fr"
_PROPERTY_OPTIONS = (("grpc.ssl_target_name_override", _SERVER_HOST_OVERRIDE),)

# Peer identities testSecureClientCert expects the server to report for the
# client certificate.
_CLIENT_IDS = (
    b"*.test.google.fr",
    b"waterzooi.test.google.be",
    b"*.test.youtube.com",
    b"192.168.1.3",
)

# Keys of the dict that handle_unary_unary pickles into its response.
_ID, _ID_KEY, _AUTH_CTX = "id", "id_key", "auth_ctx"

# Credential material loaded from the shared test resources.
_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
53
54
def handle_unary_unary(request, servicer_context):
    """Serve a unary-unary RPC by echoing back the caller's auth information.

    Args:
        request: The request payload; it is not inspected.
        servicer_context: The context of the RPC being served. Its peer
            identities, peer identity key and auth context are queried.

    Returns:
        A pickled dict keyed by ``_ID``, ``_ID_KEY`` and ``_AUTH_CTX``.
    """
    auth_report = {}
    auth_report[_ID] = servicer_context.peer_identities()
    auth_report[_ID_KEY] = servicer_context.peer_identity_key()
    auth_report[_AUTH_CTX] = servicer_context.auth_context()
    return pickle.dumps(auth_report)
63
64
class AuthContextTest(unittest.TestCase):
    """Checks the auth context a servicer observes over different transports.

    Every test starts a server that serves ``handle_unary_unary``, issues one
    or more RPCs against it and inspects the pickled auth information the
    handler sends back. Servers are stopped through ``addCleanup`` and
    channels are used as context managers, so both are released even when an
    RPC or an assertion fails.
    """

    def _start_server(self, server_credentials=None):
        """Start a test server serving ``handle_unary_unary``.

        Args:
            server_credentials: Credentials for a secure port, or ``None`` to
                listen on an insecure port.

        Returns:
            The port number the server was bound to.
        """
        handler = grpc.method_handlers_generic_handler(
            "test",
            {
                "UnaryUnary": grpc.unary_unary_rpc_method_handler(
                    handle_unary_unary
                )
            },
        )
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        if server_credentials is None:
            port = server.add_insecure_port("[::]:0")
        else:
            port = server.add_secure_port("[::]:0", server_credentials)
        server.start()
        # Registered right after start() so the server is stopped even if
        # the remainder of the test raises.
        self.addCleanup(server.stop, None)
        return port

    def _call_unary_unary(self, channel):
        """Invoke the test RPC on ``channel`` and return the decoded reply.

        Args:
            channel: An open channel connected to a server started by
                ``_start_server``.

        Returns:
            The dict produced by ``handle_unary_unary``.
        """
        response = channel.unary_unary(
            _UNARY_UNARY,
            _registered_method=True,
        )(_REQUEST)
        # The payload comes from this module's own handler, so unpickling it
        # does not process untrusted data.
        return pickle.loads(response)

    def testInsecure(self):
        """An insecure channel exposes no peer identity."""
        port = self._start_server()

        with grpc.insecure_channel("localhost:{}".format(port)) as channel:
            auth_data = self._call_unary_unary(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                "security_level": [b"TSI_SECURITY_NONE"],
                "transport_security_type": [b"insecure"],
            },
            auth_data[_AUTH_CTX],
        )

    def testSecureNoCert(self):
        """An SSL channel without a client cert exposes no peer identity."""
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        with grpc.secure_channel(
            "localhost:{}".format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS,
        ) as channel:
            auth_data = self._call_unary_unary(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                "security_level": [b"TSI_PRIVACY_AND_INTEGRITY"],
                "transport_security_type": [b"ssl"],
                "ssl_session_reused": [b"false"],
            },
            auth_data[_AUTH_CTX],
        )

    def testSecureClientCert(self):
        """A client certificate is surfaced as the peer identities."""
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True,
        )
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN,
        )
        with grpc.secure_channel(
            "localhost:{}".format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS,
        ) as channel:
            auth_data = self._call_unary_unary(channel)

        auth_ctx = auth_data[_AUTH_CTX]
        self.assertCountEqual(_CLIENT_IDS, auth_data[_ID])
        self.assertEqual("x509_subject_alternative_name", auth_data[_ID_KEY])
        self.assertSequenceEqual([b"ssl"], auth_ctx["transport_security_type"])
        self.assertSequenceEqual(
            [b"*.test.google.com"], auth_ctx["x509_common_name"]
        )

    def _do_one_shot_client_rpc(
        self, channel_creds, channel_options, port, expect_ssl_session_reused
    ):
        """Open a fresh secure channel, make one RPC and check session reuse.

        Args:
            channel_creds: Channel credentials for the secure channel.
            channel_options: Options passed to ``grpc.secure_channel``.
            port: Port of the server to connect to on localhost.
            expect_ssl_session_reused: Expected value of the
                ``ssl_session_reused`` entry of the reported auth context.
        """
        # The context manager closes the channel even if the assertion
        # below fails.
        with grpc.secure_channel(
            "localhost:{}".format(port), channel_creds, options=channel_options
        ) as channel:
            auth_data = self._call_unary_unary(channel)
            self.assertEqual(
                expect_ssl_session_reused,
                auth_data[_AUTH_CTX]["ssl_session_reused"],
            )

    def testSessionResumption(self):
        """A second connection sharing the session cache resumes the session."""
        # Set up a secure server
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        channel_options = _PROPERTY_OPTIONS + (
            ("grpc.ssl_session_cache", cache),
        )

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b"false"],
        )

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b"true"],
        )
244
245
if __name__ == "__main__":
    # Apply the default root-logger configuration, then run this module's
    # tests with verbose output.
    logging.basicConfig()
    unittest.main(verbosity=2)
249