# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test a corner-case at the level of the Cython API."""

import threading
import unittest

from grpc._cython import cygrpc

from tests.unit._cython import test_utilities

_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()


class _ServerDriver(object):
    """Records events polled from a completion queue on a background thread.

    Polling stops once an event whose tag is (by identity) the shutdown tag
    has been recorded.
    """

    def __init__(self, completion_queue, shutdown_tag):
        """Store the queue to poll and the tag that ends polling.

        Args:
          completion_queue: Object whose ``poll()`` returns events exposing a
            ``tag`` attribute.
          shutdown_tag: Tag compared by identity against each event's tag.
        """
        self._condition = threading.Condition()
        self._completion_queue = completion_queue
        self._shutdown_tag = shutdown_tag
        self._events = []
        self._saw_shutdown_tag = False

    def start(self):
        """Start the polling thread and return immediately."""

        def in_thread():
            while True:
                event = self._completion_queue.poll()
                with self._condition:
                    self._events.append(event)
                    # Wake every waiter: first_event() and events() wait on
                    # different predicates, so a single notify() could be
                    # consumed by a waiter whose predicate is still false.
                    self._condition.notify_all()
                    if event.tag is self._shutdown_tag:
                        self._saw_shutdown_tag = True
                        break

        thread = threading.Thread(target=in_thread)
        thread.start()

    def done(self):
        """Return whether the shutdown-tagged event has been recorded."""
        with self._condition:
            return self._saw_shutdown_tag

    def first_event(self):
        """Block until at least one event is recorded; return the first."""
        with self._condition:
            while not self._events:
                self._condition.wait()
            return self._events[0]

    def events(self):
        """Block until the shutdown tag is seen; return all events as a tuple."""
        with self._condition:
            while not self._saw_shutdown_tag:
                self._condition.wait()
            return tuple(self._events)


class _QueueDriver(object):
    """Records events polled from a completion queue until all due tags arrive.

    The condition is supplied by the caller, so the caller can hold it to keep
    the polling thread from recording events for a while.
    """

    def __init__(self, condition, completion_queue, due):
        """Store the shared condition, the queue to poll and the due tags.

        Args:
          condition: ``threading.Condition`` guarding this driver's state.
          completion_queue: Object whose ``poll()`` returns events exposing a
            ``tag`` attribute.
          due: Mutable set of tags still expected. The polling thread removes
            each received tag from it and stops when it becomes empty.
        """
        self._condition = condition
        self._completion_queue = completion_queue
        self._due = due
        self._events = []
        self._returned = False

    def start(self):
        """Start the polling thread and return immediately."""

        def in_thread():
            while True:
                event = self._completion_queue.poll()
                with self._condition:
                    self._events.append(event)
                    # set.remove raises KeyError for a tag that is not due,
                    # which would end this thread.
                    self._due.remove(event.tag)
                    self._condition.notify_all()
                    if not self._due:
                        self._returned = True
                        return

        thread = threading.Thread(target=in_thread)
        thread.start()

    def done(self):
        """Return whether every due tag has been received."""
        with self._condition:
            return self._returned

    def event_with_tag(self, tag):
        """Block until an event whose tag is ``tag`` (by identity) is recorded.

        Returns:
          The first recorded event carrying that tag.
        """
        with self._condition:
            while True:
                for event in self._events:
                    if event.tag is tag:
                        return event
                self._condition.wait()

    def events(self):
        """Block until all due tags arrive; return all events as a tuple."""
        with self._condition:
            while not self._returned:
                self._condition.wait()
            return tuple(self._events)


class ReadSomeButNotAllResponsesTest(unittest.TestCase):
    """The server sends two messages; the client reads one, then cancels."""

    def testReadSomeButNotAllResponses(self):
        """Drive one RPC in which the client cancels after a single read.

        All assertions come after server shutdown, once both polling threads
        have been waited on via their ``events()`` methods.
        """
        server_completion_queue = cygrpc.CompletionQueue()
        server = cygrpc.Server(
            [
                (
                    # NOTE(review): presumably turns SO_REUSEPORT off for the
                    # listening socket -- confirm against gRPC channel args.
                    b"grpc.so_reuseport",
                    0,
                )
            ],
            False,
        )
        server.register_completion_queue(server_completion_queue)
        # Port 0 is requested; the returned port is what the channel dials.
        port = server.add_http2_port(b"[::]:0")
        server.start()
        channel = cygrpc.Channel(
            "localhost:{}".format(port).encode(), set(), None
        )

        server_shutdown_tag = "server_shutdown_tag"
        server_driver = _ServerDriver(
            server_completion_queue, server_shutdown_tag
        )
        server_driver.start()

        server_call_condition = threading.Condition()
        server_send_initial_metadata_tag = "server_send_initial_metadata_tag"
        server_send_first_message_tag = "server_send_first_message_tag"
        server_send_second_message_tag = "server_send_second_message_tag"
        server_complete_rpc_tag = "server_complete_rpc_tag"
        # The call driver stops polling once all four of these tags arrive.
        server_call_due = {
            server_send_initial_metadata_tag,
            server_send_first_message_tag,
            server_send_second_message_tag,
            server_complete_rpc_tag,
        }
        server_call_completion_queue = cygrpc.CompletionQueue()
        server_call_driver = _QueueDriver(
            server_call_condition, server_call_completion_queue, server_call_due
        )
        server_call_driver.start()

        server_rpc_tag = "server_rpc_tag"
        request_call_result = server.request_call(
            server_call_completion_queue,
            server_completion_queue,
            server_rpc_tag,
        )

        client_receive_initial_metadata_tag = (
            "client_receive_initial_metadata_tag"
        )
        client_complete_rpc_tag = "client_complete_rpc_tag"
        # Two batches are started with the call: one receiving initial
        # metadata, one sending metadata + close and receiving the status.
        client_call = channel.segregated_call(
            _EMPTY_FLAGS,
            b"/twinkies",
            None,
            None,
            _EMPTY_METADATA,
            None,
            (
                (
                    [
                        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
                    ],
                    client_receive_initial_metadata_tag,
                ),
                (
                    [
                        cygrpc.SendInitialMetadataOperation(
                            _EMPTY_METADATA, _EMPTY_FLAGS
                        ),
                        cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
                    ],
                    client_complete_rpc_tag,
                ),
            ),
        )
        # NOTE(review): SimpleFuture presumably runs next_event off this
        # thread; its result is collected further down -- confirm in
        # test_utilities.
        client_receive_initial_metadata_event_future = (
            test_utilities.SimpleFuture(client_call.next_event)
        )

        # The first event on the server queue is the incoming RPC.
        server_rpc_event = server_driver.first_event()

        # Holding the condition keeps the call driver's thread from recording
        # completions until both batches have been started.
        with server_call_condition:
            server_send_initial_metadata_start_batch_result = (
                server_rpc_event.call.start_server_batch(
                    [
                        cygrpc.SendInitialMetadataOperation(
                            _EMPTY_METADATA, _EMPTY_FLAGS
                        ),
                    ],
                    server_send_initial_metadata_tag,
                )
            )
            server_send_first_message_start_batch_result = (
                server_rpc_event.call.start_server_batch(
                    [
                        cygrpc.SendMessageOperation(b"\x07", _EMPTY_FLAGS),
                    ],
                    server_send_first_message_tag,
                )
            )
        server_send_initial_metadata_event = server_call_driver.event_with_tag(
            server_send_initial_metadata_tag
        )
        server_send_first_message_event = server_call_driver.event_with_tag(
            server_send_first_message_tag
        )
        # Second message and final status are started only after the first
        # message's completion has been observed.
        with server_call_condition:
            server_send_second_message_start_batch_result = (
                server_rpc_event.call.start_server_batch(
                    [
                        cygrpc.SendMessageOperation(b"\x07", _EMPTY_FLAGS),
                    ],
                    server_send_second_message_tag,
                )
            )
            server_complete_rpc_start_batch_result = (
                server_rpc_event.call.start_server_batch(
                    [
                        cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
                        cygrpc.SendStatusFromServerOperation(
                            (),
                            cygrpc.StatusCode.ok,
                            b"test details",
                            _EMPTY_FLAGS,
                        ),
                    ],
                    server_complete_rpc_tag,
                )
            )
        server_send_second_message_event = server_call_driver.event_with_tag(
            server_send_second_message_tag
        )
        server_complete_rpc_event = server_call_driver.event_with_tag(
            server_complete_rpc_tag
        )
        # Wait for the call driver's thread to see every due tag.
        server_call_driver.events()

        client_receive_initial_metadata_event = (
            client_receive_initial_metadata_event_future.result()
        )

        # Read exactly one of the two messages the server sent...
        client_receive_first_message_tag = "client_receive_first_message_tag"
        client_call.operate(
            [
                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            ],
            client_receive_first_message_tag,
        )
        client_receive_first_message_event = client_call.next_event()

        # ...then cancel, leaving the second message unread.
        client_call_cancel_result = client_call.cancel(
            cygrpc.StatusCode.cancelled, "Cancelled during test!"
        )
        client_complete_rpc_event = client_call.next_event()

        channel.close(cygrpc.StatusCode.unknown, "Channel closed!")
        server.shutdown(server_completion_queue, server_shutdown_tag)
        server.cancel_all_calls()
        # Blocks until the server driver has recorded the shutdown tag.
        server_driver.events()

        self.assertEqual(cygrpc.CallError.ok, request_call_result)
        self.assertEqual(
            cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result
        )
        self.assertIs(server_rpc_tag, server_rpc_event.tag)
        self.assertEqual(
            cygrpc.CompletionType.operation_complete,
            server_rpc_event.completion_type,
        )
        self.assertIsInstance(server_rpc_event.call, cygrpc.Call)


if __name__ == "__main__":
    unittest.main(verbosity=2)