# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test a corner-case at the level of the Cython API."""

import threading
import unittest

from grpc._cython import cygrpc

from tests.unit._cython import _common
from tests.unit._cython import test_utilities


class Test(_common.RpcTest, unittest.TestCase):
    """Message-less RPC driven through one server completion queue.

    The server requests its call passing ``self.server_completion_queue``
    for both queue arguments; no send-message or receive-message operation
    is ever started on either side.
    """

    def _do_rpcs(self):
        """Run one RPC and report how each of its five batches went.

        Returns:
            A 5-tuple of ``_common.OperationResult`` in this order: server
            request-call, client receive-initial-metadata, client
            complete-RPC, server send-initial-metadata, server complete-RPC.
        """

        def _outcome(start_result, event):
            # Pair the value returned when a batch was started with the
            # completion type and success flag of the event that ended it.
            return _common.OperationResult(
                start_result, event.completion_type, event.success
            )

        request_tag = "server_request_call_tag"
        send_metadata_tag = "server_send_initial_metadata_tag"
        finish_tag = "server_complete_rpc_tag"

        # The same completion queue is deliberately passed twice here.
        with self.server_condition:
            request_start = self.server.request_call(
                self.server_completion_queue,
                self.server_completion_queue,
                request_tag,
            )
            self.server_driver.add_due({request_tag})

        client_metadata_tag = "client_receive_initial_metadata_tag"
        client_finish_tag = "client_complete_rpc_tag"

        # Client side: a single batch sends metadata, half-closes and waits
        # for status.
        client_finish_ops = [
            cygrpc.SendInitialMetadataOperation(
                _common.INVOCATION_METADATA, _common.EMPTY_FLAGS
            ),
            cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
        ]
        rpc = self.channel.integrated_call(
            _common.EMPTY_FLAGS,
            b"/twinkies",
            None,
            None,
            _common.INVOCATION_METADATA,
            None,
            [(client_finish_ops, client_finish_tag)],
        )
        # Receiving initial metadata is started as its own, second batch.
        rpc.operate(
            [cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS)],
            client_metadata_tag,
        )

        # Two client batches were started, so two call events are collected.
        pending_client_events = test_utilities.SimpleFuture(
            lambda: [self.channel.next_call_event() for _ in range(2)]
        )
        request_event = self.server_driver.event_with_tag(request_tag)

        with self.server_condition:
            send_metadata_start = request_event.call.start_server_batch(
                [
                    cygrpc.SendInitialMetadataOperation(
                        _common.INITIAL_METADATA, _common.EMPTY_FLAGS
                    ),
                ],
                send_metadata_tag,
            )
            self.server_driver.add_due({send_metadata_tag})
        send_metadata_event = self.server_driver.event_with_tag(
            send_metadata_tag
        )

        with self.server_condition:
            finish_start = request_event.call.start_server_batch(
                [
                    cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
                    cygrpc.SendStatusFromServerOperation(
                        _common.TRAILING_METADATA,
                        cygrpc.StatusCode.ok,
                        "test details",
                        _common.EMPTY_FLAGS,
                    ),
                ],
                finish_tag,
            )
            self.server_driver.add_due({finish_tag})
        finish_event = self.server_driver.event_with_tag(finish_tag)

        # The first collected event is reported in the receive-initial-
        # metadata slot and the second in the complete-RPC slot.
        client_first_event, client_second_event = (
            pending_client_events.result()
        )

        # The client-side results are reported with a fixed
        # cygrpc.CallError.ok in the start-result slot.
        return (
            _outcome(request_start, request_event),
            _outcome(cygrpc.CallError.ok, client_first_event),
            _outcome(cygrpc.CallError.ok, client_second_event),
            _outcome(send_metadata_start, send_metadata_event),
            _outcome(finish_start, finish_event),
        )

    def test_rpcs(self):
        """Every batch of every repeated RPC must be a successful result."""
        one_rpc_expected = (_common.SUCCESSFUL_OPERATION_RESULT,) * 5
        expecteds = [one_rpc_expected] * _common.RPC_COUNT
        actuallys = _common.execute_many_times(self._do_rpcs)
        self.assertSequenceEqual(expecteds, actuallys)


if __name__ == "__main__":
    unittest.main(verbosity=2)