• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests behavior around the Core channel arguments."""
15
16import asyncio
17import logging
18import platform
19import random
20import errno
21import unittest
22
23import grpc
24from grpc.experimental import aio
25
26from src.proto.grpc.testing import messages_pb2, test_pb2_grpc
27from tests.unit.framework import common
28from tests_aio.unit._test_base import AioTestBase
29from tests_aio.unit._test_server import start_test_server
30
# Fixed seed so the randomly chosen server options are reproducible.
_RANDOM_SEED = 42

# Human-readable labels, each paired below with the 'grpc.so_reuseport'
# option value it stands for.
_ENABLE_REUSE_PORT = 'SO_REUSEPORT enabled'
_DISABLE_REUSE_PORT = 'SO_REUSEPORT disabled'
_SOCKET_OPT_SO_REUSEPORT = 'grpc.so_reuseport'
_OPTIONS = (
    (_ENABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 1),)),
    (_DISABLE_REUSE_PORT, ((_SOCKET_OPT_SO_REUSEPORT, 0),)),
)

# Number of servers created concurrently by the SO_REUSEPORT test.
_NUM_SERVER_CREATED = 5

_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH = 'grpc.max_receive_message_length'
_MAX_MESSAGE_LENGTH = 1024

# errno values that are interpreted as "the port could not be bound again".
# The errno module only defines the symbols that exist on the running
# platform, so each name is looked up defensively: a platform without ENOSR
# must not make this module fail at import time.  Where both symbols exist
# the resulting tuple is identical to (errno.EADDRINUSE, errno.ENOSR).
_ADDRESS_TOKEN_ERRNO = tuple(
    getattr(errno, _errno_name)
    for _errno_name in ('EADDRINUSE', 'ENOSR')
    if hasattr(errno, _errno_name))
47
48
class _TestPointerWrapper:
    """Object whose only behaviour is converting to a fixed integer.

    Used as a channel-argument value that is neither bytes, str nor int.
    """

    def __int__(self):
        """Return the constant integer this wrapper converts to."""
        return 123456
53
54
# Channel arguments mixing bytes/str keys with bytes, str, int and
# int-convertible values; the client and server tests pass them as options.
_TEST_CHANNEL_ARGS = (
    ('arg1', b'bytes_val'),
    ('arg2', 'str_val'),
    ('arg3', 1),
    (b'arg4', 'str_val'),
    ('arg6', _TestPointerWrapper()),
)

# Option values that channel creation is expected to reject: a dict, a
# tuple holding a one-element entry, and a bare string.
_INVALID_TEST_CHANNEL_ARGS = [
    {'foo': 'bar'},
    (('key',),),
    'str',
]
70
71
async def test_if_reuse_port_enabled(server: aio.Server):
    """Start ``server`` on a fresh localhost port and try to bind it again.

    Args:
        server: An aio server that has not been started yet; it is started
            here and left running for the caller to stop.

    Returns:
        True when ``common.bound_socket`` succeeds on the server's port,
        False when it raises an OSError whose errno is listed in
        ``_ADDRESS_TOKEN_ERRNO``.

    Raises:
        OSError: Any other OS-level failure, logged before being re-raised.
    """
    port = server.add_insecure_port('localhost:0')
    await server.start()

    try:
        with common.bound_socket(
                bind_address='localhost',
                port=port,
                listen=False,
        ) as (_, rebound_port):
            assert rebound_port == port
    except OSError as bind_error:
        # Only the errno values in _ADDRESS_TOKEN_ERRNO mean "port not
        # reusable"; everything else is unexpected and must surface.
        if bind_error.errno not in _ADDRESS_TOKEN_ERRNO:
            logging.exception(bind_error)
            raise
        return False
    return True
91
92
class TestChannelArgument(AioTestBase):
    """Checks how aio channels and servers handle Core channel arguments."""

    async def setUp(self):
        # Fixed seed keeps the random option selection reproducible.
        random.seed(_RANDOM_SEED)

    @unittest.skipIf(platform.system() == 'Windows',
                     'SO_REUSEPORT only available in Linux-like OS.')
    async def test_server_so_reuse_port_is_set_properly(self):
        # Each server gets a randomly chosen SO_REUSEPORT setting; the
        # observed ability to re-bind its port must match that setting.

        async def test_body():
            fact, options = random.choice(_OPTIONS)
            server = aio.server(options=options)
            try:
                result = await test_if_reuse_port_enabled(server)
                if fact == _ENABLE_REUSE_PORT and not result:
                    self.fail(
                        'Enabled reuse port in options, but not observed in socket'
                    )
                elif fact == _DISABLE_REUSE_PORT and result:
                    self.fail(
                        'Disabled reuse port in options, but observed in socket'
                    )
            finally:
                # Always release the server, even when the check fails.
                await server.stop(None)

        # Creating a lot of servers concurrently
        await asyncio.gather(*(test_body() for _ in range(_NUM_SERVER_CREATED)))

    async def test_client(self):
        # Do not segfault, or raise exception!
        channel = aio.insecure_channel('[::]:0', options=_TEST_CHANNEL_ARGS)
        await channel.close()

    async def test_server(self):
        # Do not segfault, or raise exception!
        server = aio.server(options=_TEST_CHANNEL_ARGS)
        await server.stop(None)

    async def test_invalid_client_args(self):
        # Every malformed options value must be rejected at channel creation.
        for invalid_arg in _INVALID_TEST_CHANNEL_ARGS:
            self.assertRaises((ValueError, TypeError),
                              aio.insecure_channel,
                              '[::]:0',
                              options=invalid_arg)

    async def test_max_message_length_applied(self):
        address, server = await start_test_server()

        # The server is stopped in ``finally`` so that a failing assertion
        # or an unexpected RPC error cannot leave it running.
        try:
            async with aio.insecure_channel(
                    address,
                    options=((_GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
                              _MAX_MESSAGE_LENGTH),)) as channel:
                stub = test_pb2_grpc.TestServiceStub(channel)

                request = messages_pb2.StreamingOutputCallRequest()
                # First response fits within the receive limit and will pass
                request.response_parameters.append(
                    messages_pb2.ResponseParameters(
                        size=_MAX_MESSAGE_LENGTH // 2,))
                # Second response exceeds the receive limit and should fail
                request.response_parameters.append(
                    messages_pb2.ResponseParameters(
                        size=_MAX_MESSAGE_LENGTH * 2,))

                call = stub.StreamingOutputCall(request)

                response = await call.read()
                self.assertEqual(_MAX_MESSAGE_LENGTH // 2,
                                 len(response.payload.body))

                with self.assertRaises(aio.AioRpcError) as exception_context:
                    await call.read()
                rpc_error = exception_context.exception
                self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED,
                                 rpc_error.code())
                self.assertIn(str(_MAX_MESSAGE_LENGTH), rpc_error.details())

                self.assertEqual(grpc.StatusCode.RESOURCE_EXHAUSTED, await
                                 call.code())
        finally:
            await server.stop(None)
172
173
# Script entry point: enable DEBUG-level logging, then hand control to the
# unittest runner with verbose per-test output.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
177