# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests behavior around the Core channel arguments."""

import asyncio
import logging
import platform
import random
import errno
import unittest

import grpc
from grpc.experimental import aio

from src.proto.grpc.testing import messages_pb2, test_pb2_grpc
from tests.unit.framework import common
from tests_aio.unit._test_base import AioTestBase
from tests_aio.unit._test_server import start_test_server

# Fixed seed so the random option choices below are reproducible.
_RANDOM_SEED = 42

# Each entry pairs a human-readable "fact" with the server options that
# should produce it.
_ENABLE_REUSE_PORT = 'SO_REUSEPORT enabled'
_DISABLE_REUSE_PORT = 'SO_REUSEPORT disabled'
_SOCKET_OPT_SO_REUSEPORT = 'grpc.so_reuseport'
_OPTIONS = (
    (_ENABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 1),)),
    (_DISABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 0),)),
)

_NUM_SERVER_CREATED = 5

_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH = 'grpc.max_receive_message_length'
_MAX_MESSAGE_LENGTH = 1024

# errno values treated as "the port is already taken" when probing a port.
_ADDRESS_TOKEN_ERRNO = errno.EADDRINUSE, errno.ENOSR


class _TestPointerWrapper(object):
    """Channel-argument value whose ``int()`` conversion is a fixed number."""

    def __int__(self):
        return 123456


# Channel arguments mixing str/bytes keys with bytes, str, int and
# int-convertible object values.
_TEST_CHANNEL_ARGS = (
    ('arg1', b'bytes_val'),
    ('arg2', 'str_val'),
    ('arg3', 1),
    (b'arg4', 'str_val'),
    ('arg6', _TestPointerWrapper()),
)

# Values that are not a sequence of (key, value) pairs.
_INVALID_TEST_CHANNEL_ARGS = [
    {
        'foo': 'bar'
    },
    (('key',),),
    'str',
]


async def test_if_reuse_port_enabled(server: aio.Server):
    """Probe whether the port of a freshly started server can be bound again.

    The server is given an insecure port on ``localhost`` and started; the
    port number returned by ``add_insecure_port`` is then handed to
    ``common.bound_socket`` (without listening).

    Args:
        server: A not-yet-started AsyncIO server. It is started here and is
            NOT stopped; the caller owns its shutdown.

    Returns:
        True if the second socket could be set up on the same port, False if
        that attempt raised an ``OSError`` whose errno is listed in
        ``_ADDRESS_TOKEN_ERRNO``.

    Raises:
        OSError: Any other socket error; it is logged before being re-raised.
    """
    port = server.add_insecure_port('localhost:0')
    await server.start()

    try:
        with common.bound_socket(
                bind_address='localhost',
                port=port,
                listen=False,
        ) as (unused_host, bound_port):
            assert bound_port == port
    except OSError as e:
        if e.errno in _ADDRESS_TOKEN_ERRNO:
            return False
        else:
            logging.exception(e)
            raise
    else:
        return True


class TestChannelArgument(AioTestBase):
    """Checks that channel arguments are accepted and applied by grpc.aio."""

    async def setUp(self):
        random.seed(_RANDOM_SEED)

    @unittest.skipIf(platform.system() == 'Windows',
                     'SO_REUSEPORT only available in Linux-like OS.')
    async def test_server_so_reuse_port_is_set_properly(self):
        """The ``grpc.so_reuseport`` option must be observable on the port."""

        async def test_body():
            # Pick one of the two configurations and verify that what the
            # probe observes agrees with what was requested.
            fact, options = random.choice(_OPTIONS)
            server = aio.server(options=options)
            try:
                result = await test_if_reuse_port_enabled(server)
                if fact == _ENABLE_REUSE_PORT and not result:
                    self.fail(
                        'Enabled reuse port in options, but not observed in socket'
                    )
                elif fact == _DISABLE_REUSE_PORT and result:
                    self.fail(
                        'Disabled reuse port in options, but observed in socket'
                    )
            finally:
                await server.stop(None)

        # Creating a lot of servers concurrently
        await asyncio.gather(*(test_body() for _ in range(_NUM_SERVER_CREATED)))

    async def test_client(self):
        """Creating and closing a channel with mixed-type args must work."""
        # Do not segfault, or raise exception!
        channel = aio.insecure_channel('[::]:0', options=_TEST_CHANNEL_ARGS)
        await channel.close()

    async def test_server(self):
        """Creating and stopping a server with mixed-type args must work."""
        # Do not segfault, or raise exception!
        server = aio.server(options=_TEST_CHANNEL_ARGS)
        await server.stop(None)

    async def test_invalid_client_args(self):
        """Malformed ``options`` must be rejected with ValueError/TypeError."""
        for invalid_arg in _INVALID_TEST_CHANNEL_ARGS:
            self.assertRaises((ValueError, TypeError),
                              aio.insecure_channel,
                              '[::]:0',
                              options=invalid_arg)

    async def test_max_message_length_applied(self):
        """A response above the receive limit fails with RESOURCE_EXHAUSTED."""
        address, server = await start_test_server()

        # Stop the server even when an assertion below fails, so a failing
        # run does not leave a test server behind.
        try:
            async with aio.insecure_channel(
                    address,
                    options=((_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
                              _MAX_MESSAGE_LENGTH),)) as channel:
                stub = test_pb2_grpc.TestServiceStub(channel)

                request = messages_pb2.StreamingOutputCallRequest()
                # First response is below the limit and should be received
                request.response_parameters.append(
                    messages_pb2.ResponseParameters(
                        size=_MAX_MESSAGE_LENGTH // 2,))
                # Second response is above the limit and should fail
                request.response_parameters.append(
                    messages_pb2.ResponseParameters(
                        size=_MAX_MESSAGE_LENGTH * 2,))

                call = stub.StreamingOutputCall(request)

                response = await call.read()
                self.assertEqual(_MAX_MESSAGE_LENGTH // 2,
                                 len(response.payload.body))

                with self.assertRaises(aio.AioRpcError) as exception_context:
                    await call.read()
                rpc_error = exception_context.exception
                self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED,
                                 rpc_error.code())
                # The error details are expected to mention the limit.
                self.assertIn(str(_MAX_MESSAGE_LENGTH), rpc_error.details())

                self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await
                                 call.code())
        finally:
            await server.stop(None)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)