# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test a corner-case at the level of the Cython API."""

import threading
import unittest

from grpc._cython import cygrpc

from tests.unit._cython import _common
from tests.unit._cython import test_utilities


def _outcome(start_result, event):
    """Bundle a batch-start result with the completion data of its event."""
    return _common.OperationResult(start_result, event.completion_type,
                                   event.success)


class Test(_common.RpcTest, unittest.TestCase):
    """Message-free RPCs driven directly through the cygrpc API.

    The server requests calls passing the same completion queue for both
    queue arguments of ``request_call``.
    """

    def _do_rpcs(self):
        """Run one RPC that carries metadata and status but no messages.

        The client sends its invocation metadata, half-closes, and waits for
        the status; in a separate batch it receives the initial metadata. The
        server accepts the call, sends initial metadata, then receives the
        close and sends an OK status.

        Returns:
          A 5-tuple of _common.OperationResult, in this order: server
          request-call, client receive-initial-metadata, client complete-RPC,
          server send-initial-metadata, server complete-RPC.
        """
        request_tag = 'server_request_call_tag'
        send_metadata_tag = 'server_send_initial_metadata_tag'
        finish_tag = 'server_complete_rpc_tag'

        # Ask the server for an incoming call and register the tag as due
        # while holding the server condition.
        with self.server_condition:
            request_start_result = self.server.request_call(
                self.server_completion_queue,
                self.server_completion_queue,
                request_tag,
            )
            self.server_driver.add_due({request_tag})

        client_metadata_tag = 'client_receive_initial_metadata_tag'
        client_finish_tag = 'client_complete_rpc_tag'

        # The batch handed to integrated_call: send metadata, half-close, and
        # receive the final status, all under the client completion tag.
        client_finish_operations = [
            cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA,
                                                _common.EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
        ]
        call = self.channel.integrated_call(
            _common.EMPTY_FLAGS,
            b'/twinkies',
            None,
            None,
            _common.INVOCATION_METADATA,
            None,
            [(client_finish_operations, client_finish_tag)],
        )
        # Receiving the initial metadata is a separate batch on the same call.
        call.operate(
            [cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS)],
            client_metadata_tag,
        )

        def _next_two_client_events():
            # Two batches were started on the client call, so two events are
            # pulled from the channel.
            return [
                self.channel.next_call_event(),
                self.channel.next_call_event(),
            ]

        # NOTE(review): SimpleFuture presumably evaluates the callable
        # concurrently with the server steps below -- confirm in
        # test_utilities.
        pending_client_events = test_utilities.SimpleFuture(
            _next_two_client_events)
        request_event = self.server_driver.event_with_tag(request_tag)

        # Server step 1: send the initial metadata.
        with self.server_condition:
            send_metadata_operations = [
                cygrpc.SendInitialMetadataOperation(_common.INITIAL_METADATA,
                                                    _common.EMPTY_FLAGS),
            ]
            send_metadata_start_result = request_event.call.start_server_batch(
                send_metadata_operations, send_metadata_tag)
            self.server_driver.add_due({send_metadata_tag})
        send_metadata_event = self.server_driver.event_with_tag(
            send_metadata_tag)

        # Server step 2: receive the client's close and send the status.
        with self.server_condition:
            finish_operations = [
                cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
                cygrpc.SendStatusFromServerOperation(
                    _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
                    'test details', _common.EMPTY_FLAGS),
            ]
            finish_start_result = request_event.call.start_server_batch(
                finish_operations, finish_tag)
            self.server_driver.add_due({finish_tag})
        finish_event = self.server_driver.event_with_tag(finish_tag)

        # The first collected event is treated as the receive-initial-metadata
        # completion and the second as the RPC completion.
        client_events = pending_client_events.result()
        client_metadata_event = client_events[0]
        client_finish_event = client_events[1]

        # Client batches have no captured start result here, so
        # cygrpc.CallError.ok is reported for them.
        return (
            _outcome(request_start_result, request_event),
            _outcome(cygrpc.CallError.ok, client_metadata_event),
            _outcome(cygrpc.CallError.ok, client_finish_event),
            _outcome(send_metadata_start_result, send_metadata_event),
            _outcome(finish_start_result, finish_event),
        )

    def test_rpcs(self):
        # Deliberately documented with comments rather than a docstring:
        # unittest prints a test method's docstring in verbose output.
        #
        # Every RPC yields five operation results and all of them are
        # expected to be successful.
        expected_per_rpc = (_common.SUCCESSFUL_OPERATION_RESULT,) * 5
        expecteds = [expected_per_rpc] * _common.RPC_COUNT
        actuallys = _common.execute_many_times(self._do_rpcs)
        self.assertSequenceEqual(expecteds, actuallys)


if __name__ == '__main__':
    unittest.main(verbosity=2)