# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests exposure of SSL auth context"""

import logging
import pickle
import unittest

import grpc
from grpc import _channel
from grpc.experimental import session_cache

from tests.unit import resources
from tests.unit import test_common

_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"

_SERVICE_NAME = "test"
_UNARY_UNARY = "UnaryUnary"

_SERVER_HOST_OVERRIDE = "foo.test.google.fr"
_CLIENT_IDS = (
    b"*.test.google.fr",
    b"waterzooi.test.google.be",
    b"*.test.youtube.com",
    b"192.168.1.3",
)
_ID = "id"
_ID_KEY = "id_key"
_AUTH_CTX = "auth_ctx"

_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
_PROPERTY_OPTIONS = (
    (
        "grpc.ssl_target_name_override",
        _SERVER_HOST_OVERRIDE,
    ),
)


def handle_unary_unary(request, servicer_context):
    """Return a pickled snapshot of the caller's auth information.

    Args:
        request: The request payload; it is not inspected.
        servicer_context: The servicer context of the in-flight RPC.

    Returns:
        A pickled dict holding the peer identities (``_ID``), the peer
        identity key (``_ID_KEY``) and the auth context (``_AUTH_CTX``)
        reported by ``servicer_context``.
    """
    return pickle.dumps(
        {
            _ID: servicer_context.peer_identities(),
            _ID_KEY: servicer_context.peer_identity_key(),
            _AUTH_CTX: servicer_context.auth_context(),
        }
    )


# Keyed by the shared method-name constant so the handler registration and
# the client-side method path cannot drift apart.
_METHOD_HANDLERS = {
    _UNARY_UNARY: grpc.unary_unary_rpc_method_handler(handle_unary_unary)
}


def _fetch_auth_data(channel):
    """Invoke the registered unary-unary method and unpickle its response.

    Args:
        channel: An open channel connected to a server that registered
            ``_METHOD_HANDLERS`` under ``_SERVICE_NAME``.

    Returns:
        The dict produced by ``handle_unary_unary``.
    """
    multi_callable = channel.unary_unary(
        grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY),
        _registered_method=True,
    )
    # The payload is produced by handle_unary_unary on the server started by
    # the test itself, so unpickling it here does not touch untrusted data.
    return pickle.loads(multi_callable(_REQUEST))


class AuthContextTest(unittest.TestCase):
    """Checks the auth context a server observes for different channels."""

    def _start_server(self, server_credentials=None):
        """Start a test server and return the port it listens on.

        Args:
            server_credentials: Credentials for a secure port, or ``None``
                to open an insecure port.

        Returns:
            The port number bound by the server.
        """
        server = test_common.test_server()
        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
        if server_credentials is None:
            port = server.add_insecure_port("[::]:0")
        else:
            port = server.add_secure_port("[::]:0", server_credentials)
        server.start()
        # Registered as a cleanup so the server is stopped even when the RPC
        # or a later assertion fails.
        self.addCleanup(server.stop, None)
        return port

    def testInsecure(self):
        port = self._start_server()

        with grpc.insecure_channel("localhost:%d" % port) as channel:
            auth_data = _fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                "security_level": [b"TSI_SECURITY_NONE"],
                "transport_security_type": [b"insecure"],
            },
            auth_data[_AUTH_CTX],
        )

    def testSecureNoCert(self):
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        # The context manager closes the channel even if the RPC raises.
        with grpc.secure_channel(
            "localhost:{}".format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS,
        ) as channel:
            auth_data = _fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                "security_level": [b"TSI_PRIVACY_AND_INTEGRITY"],
                "transport_security_type": [b"ssl"],
                "ssl_session_reused": [b"false"],
            },
            auth_data[_AUTH_CTX],
        )

    def testSecureClientCert(self):
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True,
        )
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN,
        )
        # The context manager closes the channel even if the RPC raises.
        with grpc.secure_channel(
            "localhost:{}".format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS,
        ) as channel:
            auth_data = _fetch_auth_data(channel)

        auth_ctx = auth_data[_AUTH_CTX]
        self.assertCountEqual(_CLIENT_IDS, auth_data[_ID])
        self.assertEqual("x509_subject_alternative_name", auth_data[_ID_KEY])
        self.assertSequenceEqual([b"ssl"], auth_ctx["transport_security_type"])
        self.assertSequenceEqual(
            [b"*.test.google.com"], auth_ctx["x509_common_name"]
        )

    def _do_one_shot_client_rpc(
        self, channel_creds, channel_options, port, expect_ssl_session_reused
    ):
        """Run one RPC on a fresh secure channel and check session reuse.

        Args:
            channel_creds: Channel credentials for the secure channel.
            channel_options: Options passed to ``grpc.secure_channel``.
            port: Port of the server on localhost.
            expect_ssl_session_reused: Expected value of the
                ``ssl_session_reused`` auth-context entry.
        """
        with grpc.secure_channel(
            "localhost:{}".format(port), channel_creds, options=channel_options
        ) as channel:
            auth_data = _fetch_auth_data(channel)
            # Asserting inside the ``with`` keeps the original order (check,
            # then close) while still closing the channel on failure.
            self.assertEqual(
                expect_ssl_session_reused,
                auth_data[_AUTH_CTX]["ssl_session_reused"],
            )

    def testSessionResumption(self):
        # Set up a secure server
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        channel_options = _PROPERTY_OPTIONS + (
            ("grpc.ssl_session_cache", cache),
        )

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b"false"],
        )

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b"true"],
        )


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)