# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests experimental TLS Session Resumption API"""

import logging
import pickle
import unittest

import grpc
from grpc import _channel
from grpc.experimental import session_cache

from tests.unit import resources
from tests.unit import test_common

_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"

_SERVICE_NAME = "test"
_UNARY_UNARY = "UnaryUnary"

_SERVER_HOST_OVERRIDE = "foo.test.google.fr"
# Keys of the dict that the test handler pickles back to the client.
_ID = "id"
_ID_KEY = "id_key"
_AUTH_CTX = "auth_ctx"

_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
_PROPERTY_OPTIONS = (
    (
        "grpc.ssl_target_name_override",
        _SERVER_HOST_OVERRIDE,
    ),
)


def handle_unary_unary(request, servicer_context):
    """Report the connection's auth details back to the caller.

    Args:
        request: The incoming request payload; it is not inspected.
        servicer_context: The servicer context of the RPC being handled.

    Returns:
        A pickled dict holding the peer identities, the peer identity key
        and the auth context exposed by ``servicer_context``.
    """
    return pickle.dumps(
        {
            _ID: servicer_context.peer_identities(),
            _ID_KEY: servicer_context.peer_identity_key(),
            _AUTH_CTX: servicer_context.auth_context(),
        }
    )


_METHOD_HANDLERS = {
    _UNARY_UNARY: grpc.unary_unary_rpc_method_handler(handle_unary_unary)
}


def start_secure_server():
    """Start a TLS-secured test server exposing the unary-unary handler.

    Returns:
        A ``(server, port)`` tuple: the started server and the port number
        returned by ``add_secure_port`` for the ``"[::]:0"`` address.
    """
    server = test_common.test_server()
    server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
    server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
    port = server.add_secure_port("[::]:0", server_cred)
    server.start()

    return server, port


class SSLSessionCacheTest(unittest.TestCase):
    """Checks TLS session resumption through the LRU SSL session cache."""

    def _do_one_shot_client_rpc(
        self, channel_creds, channel_options, port, expect_ssl_session_reused
    ):
        """Issue one RPC over a fresh channel and check session reuse.

        Args:
            channel_creds: Channel credentials for the secure channel.
            channel_options: Channel options passed to ``grpc.secure_channel``.
            port: Port on ``localhost`` of the server to connect to.
            expect_ssl_session_reused: Expected value of the
                ``"ssl_session_reused"`` entry of the auth context that the
                server reports for this connection.
        """
        # The context manager closes the channel even when the RPC or the
        # assertion below fails, so a failing test does not leak the channel.
        with grpc.secure_channel(
            "localhost:{}".format(port), channel_creds, options=channel_options
        ) as channel:
            response = channel.unary_unary(
                grpc._common.fully_qualified_method(
                    _SERVICE_NAME, _UNARY_UNARY
                ),
                _registered_method=True,
            )(_REQUEST)
            # The payload is produced by handle_unary_unary on the server
            # started by this test, so unpickling it is safe here; do not
            # reuse this pattern for data from an untrusted peer.
            auth_data = pickle.loads(response)
            self.assertEqual(
                expect_ssl_session_reused,
                auth_data[_AUTH_CTX]["ssl_session_reused"],
            )

    def testSSLSessionCacheLRU(self):
        """Exercise resumption, overwrite and invalidation of cached sessions."""
        server_1, port_1 = start_secure_server()
        # Guarantee shutdown if an assertion fails before the explicit stop;
        # stopping a server that is already stopped is harmless.
        self.addCleanup(server_1.stop, None)

        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        channel_options = _PROPERTY_OPTIONS + (
            ("grpc.ssl_session_cache", cache),
        )

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"false"],
        )

        # Connection to server_1 resumes from initial session
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"true"],
        )

        # Connection to a different server with the same name overwrites the
        # cache entry
        server_2, port_2 = start_secure_server()
        self.addCleanup(server_2.stop, None)
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_2,
            expect_ssl_session_reused=[b"false"],
        )
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_2,
            expect_ssl_session_reused=[b"true"],
        )
        server_2.stop(None)

        # Connection to server_1 now falls back to full TLS handshake
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"false"],
        )

        # Re-creating server_1 causes old sessions to become invalid
        server_1.stop(None)
        server_1, port_1 = start_secure_server()
        # The earlier cleanup is bound to the previous server object, so the
        # replacement needs its own.
        self.addCleanup(server_1.stop, None)

        # Old sessions should no longer be valid
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"false"],
        )

        # Resumption should work for subsequent connections
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"true"],
        )
        server_1.stop(None)


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)