1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import time 16import unittest 17 18import grpc 19import grpc_testing 20 21from tests.testing import _application_common 22from tests.testing import _application_testing_common 23from tests.testing import _server_application 24from tests.testing.proto import services_pb2 25 26 27class FirstServiceServicerTest(unittest.TestCase): 28 def setUp(self): 29 self._real_time = grpc_testing.strict_real_time() 30 self._fake_time = grpc_testing.strict_fake_time(time.time()) 31 servicer = _server_application.FirstServiceServicer() 32 descriptors_to_servicers = { 33 _application_testing_common.FIRST_SERVICE: servicer 34 } 35 self._real_time_server = grpc_testing.server_from_dictionary( 36 descriptors_to_servicers, self._real_time 37 ) 38 self._fake_time_server = grpc_testing.server_from_dictionary( 39 descriptors_to_servicers, self._fake_time 40 ) 41 42 def test_successful_unary_unary(self): 43 rpc = self._real_time_server.invoke_unary_unary( 44 _application_testing_common.FIRST_SERVICE_UNUN, 45 (), 46 _application_common.UNARY_UNARY_REQUEST, 47 None, 48 ) 49 initial_metadata = rpc.initial_metadata() 50 response, trailing_metadata, code, details = rpc.termination() 51 52 self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response) 53 self.assertIs(code, grpc.StatusCode.OK) 54 55 def test_successful_unary_stream(self): 56 rpc = self._real_time_server.invoke_unary_stream( 57 _application_testing_common.FIRST_SERVICE_UNSTRE, 58 (), 59 _application_common.UNARY_STREAM_REQUEST, 60 None, 61 ) 62 initial_metadata = rpc.initial_metadata() 63 trailing_metadata, code, details = rpc.termination() 64 65 self.assertIs(code, grpc.StatusCode.OK) 66 67 def test_successful_stream_unary(self): 68 rpc = self._real_time_server.invoke_stream_unary( 69 _application_testing_common.FIRST_SERVICE_STREUN, (), None 70 ) 71 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 72 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 73 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 74 rpc.requests_closed() 75 initial_metadata = rpc.initial_metadata() 76 response, trailing_metadata, code, details = rpc.termination() 77 78 self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response) 79 self.assertIs(code, grpc.StatusCode.OK) 80 81 def test_successful_stream_stream(self): 82 rpc = self._real_time_server.invoke_stream_stream( 83 _application_testing_common.FIRST_SERVICE_STRESTRE, (), None 84 ) 85 rpc.send_request(_application_common.STREAM_STREAM_REQUEST) 86 initial_metadata = rpc.initial_metadata() 87 responses = [ 88 rpc.take_response(), 89 rpc.take_response(), 90 ] 91 rpc.send_request(_application_common.STREAM_STREAM_REQUEST) 92 rpc.send_request(_application_common.STREAM_STREAM_REQUEST) 93 responses.extend( 94 [ 95 rpc.take_response(), 96 rpc.take_response(), 97 rpc.take_response(), 98 rpc.take_response(), 99 ] 100 ) 101 rpc.requests_closed() 102 trailing_metadata, code, details = rpc.termination() 103 104 for response in responses: 105 self.assertEqual( 106 _application_common.STREAM_STREAM_RESPONSE, response 107 ) 108 self.assertIs(code, grpc.StatusCode.OK) 109 110 def test_mutating_stream_stream(self): 111 rpc = self._real_time_server.invoke_stream_stream( 112 _application_testing_common.FIRST_SERVICE_STRESTRE, (), None 113 ) 114 rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST) 115 initial_metadata = rpc.initial_metadata() 116 responses = [ 117 rpc.take_response() 118 for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT) 119 ] 120 rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST) 121 responses.extend( 122 [ 123 rpc.take_response() 124 for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT) 125 ] 126 ) 127 rpc.requests_closed() 128 _, _, _ = rpc.termination() 129 expected_responses = ( 130 services_pb2.Bottom(first_bottom_field=0), 131 services_pb2.Bottom(first_bottom_field=1), 132 services_pb2.Bottom(first_bottom_field=0), 133 services_pb2.Bottom(first_bottom_field=1), 134 ) 135 self.assertSequenceEqual(expected_responses, responses) 136 137 def test_server_rpc_idempotence(self): 138 rpc = self._real_time_server.invoke_unary_unary( 139 _application_testing_common.FIRST_SERVICE_UNUN, 140 (), 141 _application_common.UNARY_UNARY_REQUEST, 142 None, 143 ) 144 first_initial_metadata = rpc.initial_metadata() 145 second_initial_metadata = rpc.initial_metadata() 146 third_initial_metadata = rpc.initial_metadata() 147 first_termination = rpc.termination() 148 second_termination = rpc.termination() 149 third_termination = rpc.termination() 150 151 for later_initial_metadata in ( 152 second_initial_metadata, 153 third_initial_metadata, 154 ): 155 self.assertEqual(first_initial_metadata, later_initial_metadata) 156 response = first_termination[0] 157 terminal_metadata = first_termination[1] 158 code = first_termination[2] 159 details = first_termination[3] 160 for later_termination in ( 161 second_termination, 162 third_termination, 163 ): 164 self.assertEqual(response, later_termination[0]) 165 self.assertEqual(terminal_metadata, later_termination[1]) 166 self.assertIs(code, later_termination[2]) 167 self.assertEqual(details, later_termination[3]) 168 self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response) 169 self.assertIs(code, grpc.StatusCode.OK) 170 171 def test_misbehaving_client_unary_unary(self): 172 rpc = self._real_time_server.invoke_unary_unary( 173 _application_testing_common.FIRST_SERVICE_UNUN, 174 (), 175 _application_common.ERRONEOUS_UNARY_UNARY_REQUEST, 176 None, 177 ) 178 initial_metadata = rpc.initial_metadata() 179 response, trailing_metadata, code, details = rpc.termination() 180 181 self.assertIsNot(code, grpc.StatusCode.OK) 182 183 def test_infinite_request_stream_real_time(self): 184 rpc = self._real_time_server.invoke_stream_unary( 185 _application_testing_common.FIRST_SERVICE_STREUN, 186 (), 187 _application_common.INFINITE_REQUEST_STREAM_TIMEOUT, 188 ) 189 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 190 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 191 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 192 initial_metadata = rpc.initial_metadata() 193 self._real_time.sleep_for( 194 _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2 195 ) 196 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 197 response, trailing_metadata, code, details = rpc.termination() 198 199 self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED) 200 201 def test_infinite_request_stream_fake_time(self): 202 rpc = self._fake_time_server.invoke_stream_unary( 203 _application_testing_common.FIRST_SERVICE_STREUN, 204 (), 205 _application_common.INFINITE_REQUEST_STREAM_TIMEOUT, 206 ) 207 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 208 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 209 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 210 initial_metadata = rpc.initial_metadata() 211 self._fake_time.sleep_for( 212 _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2 213 ) 214 rpc.send_request(_application_common.STREAM_UNARY_REQUEST) 215 response, trailing_metadata, code, details = rpc.termination() 216 217 self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED) 218 219 def test_servicer_context_abort(self): 220 rpc = self._real_time_server.invoke_unary_unary( 221 _application_testing_common.FIRST_SERVICE_UNUN, 222 (), 223 _application_common.ABORT_REQUEST, 224 None, 225 ) 226 _, _, code, _ = rpc.termination() 227 self.assertIs(code, grpc.StatusCode.PERMISSION_DENIED) 228 rpc = self._real_time_server.invoke_unary_unary( 229 _application_testing_common.FIRST_SERVICE_UNUN, 230 (), 231 _application_common.ABORT_SUCCESS_QUERY, 232 None, 233 ) 234 response, _, code, _ = rpc.termination() 235 self.assertEqual(_application_common.ABORT_SUCCESS_RESPONSE, response) 236 self.assertIs(code, grpc.StatusCode.OK) 237 238 239if __name__ == "__main__": 240 unittest.main(verbosity=2) 241