# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests exposure of SSL auth context"""

import logging
import pickle
import unittest

import grpc
from grpc import _channel
from grpc.experimental import session_cache

from tests.unit import resources
from tests.unit import test_common

_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"

_UNARY_UNARY = "/test/UnaryUnary"

_SERVER_HOST_OVERRIDE = "foo.test.google.fr"
_CLIENT_IDS = (
    b"*.test.google.fr",
    b"waterzooi.test.google.be",
    b"*.test.youtube.com",
    b"192.168.1.3",
)
# Keys of the dictionary that the test handler pickles into its response.
_ID = "id"
_ID_KEY = "id_key"
_AUTH_CTX = "auth_ctx"

_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
_PROPERTY_OPTIONS = (
    (
        "grpc.ssl_target_name_override",
        _SERVER_HOST_OVERRIDE,
    ),
)


def handle_unary_unary(request, servicer_context):
    """Report what the server knows about the calling peer.

    Args:
        request: The request payload; it is not inspected.
        servicer_context: The context of the RPC being served.

    Returns:
        A pickled dictionary holding the peer identities (`_ID`), the peer
        identity key (`_ID_KEY`) and the auth context (`_AUTH_CTX`) read
        from `servicer_context`.
    """
    return pickle.dumps(
        {
            _ID: servicer_context.peer_identities(),
            _ID_KEY: servicer_context.peer_identity_key(),
            _AUTH_CTX: servicer_context.auth_context(),
        }
    )


class AuthContextTest(unittest.TestCase):
    """Checks the auth context a servicer sees for various channel setups."""

    def _start_server(self, server_credentials=None):
        """Start a test server that serves `handle_unary_unary`.

        The server is stopped through `addCleanup`, so it is shut down even
        when the calling test fails or raises part-way through.

        Args:
            server_credentials: Credentials for a secure port, or None to
                listen on an insecure port.

        Returns:
            The port the server was bound to.
        """
        handler = grpc.method_handlers_generic_handler(
            "test",
            {
                "UnaryUnary": grpc.unary_unary_rpc_method_handler(
                    handle_unary_unary
                )
            },
        )
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        if server_credentials is None:
            port = server.add_insecure_port("[::]:0")
        else:
            port = server.add_secure_port("[::]:0", server_credentials)
        server.start()
        self.addCleanup(server.stop, None)
        return port

    def _fetch_auth_data(self, channel):
        """Invoke the unary-unary test method and decode its response.

        Args:
            channel: An open channel connected to the test server.

        Returns:
            The dictionary produced by `handle_unary_unary`.
        """
        response = channel.unary_unary(
            _UNARY_UNARY,
            _registered_method=True,
        )(_REQUEST)
        # NOTE: unpickling is acceptable only because the peer is the
        # in-process test server started by this test case.
        return pickle.loads(response)

    def testInsecure(self):
        """An insecure channel exposes no identity and no transport security."""
        port = self._start_server()

        with grpc.insecure_channel("localhost:{}".format(port)) as channel:
            auth_data = self._fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                "security_level": [b"TSI_SECURITY_NONE"],
                "transport_security_type": [b"insecure"],
            },
            auth_data[_AUTH_CTX],
        )

    def testSecureNoCert(self):
        """An SSL channel without a client certificate has no peer identity."""
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        with grpc.secure_channel(
            "localhost:{}".format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS,
        ) as channel:
            auth_data = self._fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                "security_level": [b"TSI_PRIVACY_AND_INTEGRITY"],
                "transport_security_type": [b"ssl"],
                "ssl_session_reused": [b"false"],
            },
            auth_data[_AUTH_CTX],
        )

    def testSecureClientCert(self):
        """A client certificate is surfaced as the peer identity."""
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True,
        )
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN,
        )
        with grpc.secure_channel(
            "localhost:{}".format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS,
        ) as channel:
            auth_data = self._fetch_auth_data(channel)

        auth_ctx = auth_data[_AUTH_CTX]
        self.assertCountEqual(_CLIENT_IDS, auth_data[_ID])
        self.assertEqual("x509_subject_alternative_name", auth_data[_ID_KEY])
        self.assertSequenceEqual([b"ssl"], auth_ctx["transport_security_type"])
        self.assertSequenceEqual(
            [b"*.test.google.com"], auth_ctx["x509_common_name"]
        )

    def _do_one_shot_client_rpc(
        self, channel_creds, channel_options, port, expect_ssl_session_reused
    ):
        """Run one RPC on a fresh secure channel and check session reuse.

        Args:
            channel_creds: Channel credentials for the secure channel.
            channel_options: Options passed to `grpc.secure_channel`.
            port: Port of the server on localhost.
            expect_ssl_session_reused: Expected value of the
                "ssl_session_reused" entry of the server-side auth context.
        """
        # The `with` block closes the channel even if the RPC or the
        # assertion below fails.
        with grpc.secure_channel(
            "localhost:{}".format(port), channel_creds, options=channel_options
        ) as channel:
            auth_data = self._fetch_auth_data(channel)
            self.assertEqual(
                expect_ssl_session_reused,
                auth_data[_AUTH_CTX]["ssl_session_reused"],
            )

    def testSessionResumption(self):
        """A second connection sharing the session cache resumes the session."""
        # Set up a secure server
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        channel_options = _PROPERTY_OPTIONS + (
            ("grpc.ssl_session_cache", cache),
        )

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b"false"],
        )

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b"true"],
        )


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)