# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exercises all four RPC arities using empty byte strings as messages."""

import logging
import unittest

import grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants

# Both the request and the response payloads are zero-length byte strings.
_REQUEST = b""
_RESPONSE = b""

_SERVICE_NAME = "test"
_UNARY_UNARY = "UnaryUnary"
_UNARY_STREAM = "UnaryStream"
_STREAM_UNARY = "StreamUnary"
_STREAM_STREAM = "StreamStream"


def handle_unary_unary(request, servicer_context):
    """Reply to a single request with the single empty response."""
    return _RESPONSE


def handle_unary_stream(request, servicer_context):
    """Yield ``test_constants.STREAM_LENGTH`` empty responses for one request."""
    yield from [_RESPONSE] * test_constants.STREAM_LENGTH


def handle_stream_unary(request_iterator, servicer_context):
    """Consume every incoming request, then reply with one empty response."""
    for _ in request_iterator:
        continue
    return _RESPONSE


def handle_stream_stream(request_iterator, servicer_context):
    """Yield one empty response for each request pulled from the iterator."""
    yield from (_RESPONSE for _ in request_iterator)


class _MethodHandler(grpc.RpcMethodHandler):
    """Method handler that populates exactly one of the four behavior slots.

    No (de)serializers are installed; ``request_deserializer`` and
    ``response_serializer`` are both left as ``None``.
    """

    def __init__(self, request_streaming, response_streaming):
        """Record the streaming flags and bind the matching module handler.

        Args:
          request_streaming: Truthy when the handled method takes a stream
            of requests.
          response_streaming: Truthy when the handled method returns a
            stream of responses.
        """
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        self.request_deserializer = self.response_serializer = None

        # Start with every behavior slot empty; exactly one is filled below.
        self.unary_unary = self.unary_stream = None
        self.stream_unary = self.stream_stream = None

        if not request_streaming:
            if response_streaming:
                self.unary_stream = handle_unary_stream
            else:
                self.unary_unary = handle_unary_unary
        elif response_streaming:
            self.stream_stream = handle_stream_stream
        else:
            self.stream_unary = handle_stream_unary


_METHOD_HANDLERS = {
    _UNARY_UNARY: _MethodHandler(
        request_streaming=False, response_streaming=False
    ),
    _UNARY_STREAM: _MethodHandler(
        request_streaming=False, response_streaming=True
    ),
    _STREAM_UNARY: _MethodHandler(
        request_streaming=True, response_streaming=False
    ),
    _STREAM_STREAM: _MethodHandler(
        request_streaming=True, response_streaming=True
    ),
}


def _method_path(method_name):
    """Return the fully qualified name of ``method_name`` in the test service."""
    return grpc._common.fully_qualified_method(_SERVICE_NAME, method_name)


class EmptyMessageTest(unittest.TestCase):
    """Sends empty messages through each RPC arity and checks the replies."""

    def setUp(self):
        """Start a server with the registered handlers and open a channel."""
        server = test_common.test_server()
        server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
        # The value returned by add_insecure_port is used as the target port.
        port = server.add_insecure_port("[::]:0")
        server.start()
        self._server = server

        target = "localhost:%d" % port
        self._channel = grpc.insecure_channel(target)

    def tearDown(self):
        """Stop the server (passing 0 to ``stop``), then close the channel."""
        self._server.stop(0)
        self._channel.close()

    def testUnaryUnary(self):
        """One empty request yields one empty response."""
        multi_callable = self._channel.unary_unary(
            _method_path(_UNARY_UNARY),
            _registered_method=True,
        )
        response = multi_callable(_REQUEST)
        self.assertEqual(_RESPONSE, response)

    def testUnaryStream(self):
        """One empty request yields STREAM_LENGTH empty responses."""
        multi_callable = self._channel.unary_stream(
            _method_path(_UNARY_STREAM),
            _registered_method=True,
        )
        response_iterator = multi_callable(_REQUEST)
        expected_responses = [_RESPONSE] * test_constants.STREAM_LENGTH
        self.assertSequenceEqual(expected_responses, list(response_iterator))

    def testStreamUnary(self):
        """STREAM_LENGTH empty requests yield one empty response."""
        multi_callable = self._channel.stream_unary(
            _method_path(_STREAM_UNARY),
            _registered_method=True,
        )
        requests = iter([_REQUEST] * test_constants.STREAM_LENGTH)
        response = multi_callable(requests)
        self.assertEqual(_RESPONSE, response)

    def testStreamStream(self):
        """STREAM_LENGTH empty requests yield STREAM_LENGTH empty responses."""
        multi_callable = self._channel.stream_stream(
            _method_path(_STREAM_STREAM),
            _registered_method=True,
        )
        requests = iter([_REQUEST] * test_constants.STREAM_LENGTH)
        response_iterator = multi_callable(requests)
        expected_responses = [_RESPONSE] * test_constants.STREAM_LENGTH
        self.assertSequenceEqual(expected_responses, list(response_iterator))


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)