# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests that a channel will reconnect if a connection is dropped"""

import socket
import time
import unittest

import grpc
from grpc.framework.foundation import logging_pool

from tests.unit.framework.common import test_constants

_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x01'

_UNARY_UNARY = '/test/UnaryUnary'


def _handle_unary_unary(unused_request, unused_servicer_context):
    """Unary-unary handler that ignores its arguments and returns _RESPONSE."""
    return _RESPONSE


def _get_reuse_socket_option():
    """Return the socket option used to allow re-binding to a port.

    Returns:
      socket.SO_REUSEPORT when the socket module defines it, otherwise
      socket.SO_REUSEADDR.
    """
    # SO_REUSEPORT is unavailable on Windows, but SO_REUSEADDR
    # allows forcibly re-binding to a port
    return getattr(socket, 'SO_REUSEPORT', socket.SO_REUSEADDR)


def _pick_and_bind_port(sock_opt):
    """Pick a port that can be bound on 'localhost' for each address family.

    An ephemeral port is chosen with the first address family for which a
    socket can be created; the same port number is then bound with the
    remaining family. If a bind fails, the whole selection is retried with a
    fresh ephemeral port. Every socket opened here is closed before returning.

    Args:
      sock_opt: The SOL_SOCKET-level option to enable on each probing socket
        (see _get_reuse_socket_option).

    Returns:
      The selected port number, or None if no socket could be created for
      either address family.
    """
    # Reserve a port, when we restart the server we want
    # to hold onto the port
    port = 0
    # Tracked explicitly: testing the socket variable itself would raise
    # UnboundLocalError when no socket was ever created, and a socket object
    # is always truthy anyway.
    family_available = False
    for address_family in (socket.AF_INET6, socket.AF_INET):
        try:
            s = socket.socket(address_family, socket.SOCK_STREAM)
        except socket.error:
            continue  # this address family is unavailable
        family_available = True
        try:
            # Inside the try/finally so the socket is closed even when
            # setting the option raises.
            s.setsockopt(socket.SOL_SOCKET, sock_opt, 1)
            try:
                s.bind(('localhost', port))
                # for socket.SOCK_STREAM sockets, it is necessary to call
                # listen to get the desired behavior.
                s.listen(1)
                port = s.getsockname()[1]
            except socket.error:
                # port was not available on the current address family
                # try again
                port = 0
                break
        finally:
            s.close()
    if not family_available:
        return None  # no address family was available
    return port if port != 0 else _pick_and_bind_port(sock_opt)


class ReconnectTest(unittest.TestCase):
    """Checks that a channel keeps working across a server restart."""

    def test_reconnect(self):
        """Restart the server on the same port and reuse the same channel."""
        server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
        })
        sock_opt = _get_reuse_socket_option()
        port = _pick_and_bind_port(sock_opt)
        self.assertIsNotNone(port)

        server = grpc.server(server_pool, (handler,))
        server.add_insecure_port('[::]:{}'.format(port))
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)
        try:
            multi_callable = channel.unary_unary(_UNARY_UNARY)
            self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
            server.stop(None)
            # By default, the channel connectivity is checked every 5s
            # GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS can be set to change
            # this.
            time.sleep(5.1)
            server = grpc.server(server_pool, (handler,))
            server.add_insecure_port('[::]:{}'.format(port))
            server.start()
            self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
        finally:
            # Release the server and channel even when an assertion fails so
            # they do not outlive the test.
            server.stop(None)
            channel.close()


if __name__ == '__main__':
    unittest.main(verbosity=2)