• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test a corner-case at the level of the Cython API."""
15
16import threading
17import unittest
18
19from grpc._cython import cygrpc
20
21from tests.unit._cython import _common
22from tests.unit._cython import test_utilities
23
24
25class Test(_common.RpcTest, unittest.TestCase):
26
27    def _do_rpcs(self):
28        server_request_call_tag = 'server_request_call_tag'
29        server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
30        server_complete_rpc_tag = 'server_complete_rpc_tag'
31
32        with self.server_condition:
33            server_request_call_start_batch_result = self.server.request_call(
34                self.server_completion_queue, self.server_completion_queue,
35                server_request_call_tag)
36            self.server_driver.add_due({
37                server_request_call_tag,
38            })
39
40        client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
41        client_complete_rpc_tag = 'client_complete_rpc_tag'
42        client_call = self.channel.integrated_call(
43            _common.EMPTY_FLAGS, b'/twinkies', None, None,
44            _common.INVOCATION_METADATA, None, [
45                (
46                    [
47                        cygrpc.SendInitialMetadataOperation(
48                            _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
49                        cygrpc.SendCloseFromClientOperation(
50                            _common.EMPTY_FLAGS),
51                        cygrpc.ReceiveStatusOnClientOperation(
52                            _common.EMPTY_FLAGS),
53                    ],
54                    client_complete_rpc_tag,
55                ),
56            ])
57        client_call.operate([
58            cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
59        ], client_receive_initial_metadata_tag)
60
61        client_events_future = test_utilities.SimpleFuture(lambda: [
62            self.channel.next_call_event(),
63            self.channel.next_call_event(),
64        ])
65        server_request_call_event = self.server_driver.event_with_tag(
66            server_request_call_tag)
67
68        with self.server_condition:
69            server_send_initial_metadata_start_batch_result = (
70                server_request_call_event.call.start_server_batch([
71                    cygrpc.SendInitialMetadataOperation(
72                        _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
73                ], server_send_initial_metadata_tag))
74            self.server_driver.add_due({
75                server_send_initial_metadata_tag,
76            })
77        server_send_initial_metadata_event = self.server_driver.event_with_tag(
78            server_send_initial_metadata_tag)
79
80        with self.server_condition:
81            server_complete_rpc_start_batch_result = (
82                server_request_call_event.call.start_server_batch([
83                    cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
84                    cygrpc.SendStatusFromServerOperation(
85                        _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
86                        'test details', _common.EMPTY_FLAGS),
87                ], server_complete_rpc_tag))
88            self.server_driver.add_due({
89                server_complete_rpc_tag,
90            })
91        server_complete_rpc_event = self.server_driver.event_with_tag(
92            server_complete_rpc_tag)
93
94        client_events = client_events_future.result()
95        client_receive_initial_metadata_event = client_events[0]
96        client_complete_rpc_event = client_events[1]
97
98        return (
99            _common.OperationResult(server_request_call_start_batch_result,
100                                    server_request_call_event.completion_type,
101                                    server_request_call_event.success),
102            _common.OperationResult(
103                cygrpc.CallError.ok,
104                client_receive_initial_metadata_event.completion_type,
105                client_receive_initial_metadata_event.success),
106            _common.OperationResult(cygrpc.CallError.ok,
107                                    client_complete_rpc_event.completion_type,
108                                    client_complete_rpc_event.success),
109            _common.OperationResult(
110                server_send_initial_metadata_start_batch_result,
111                server_send_initial_metadata_event.completion_type,
112                server_send_initial_metadata_event.success),
113            _common.OperationResult(server_complete_rpc_start_batch_result,
114                                    server_complete_rpc_event.completion_type,
115                                    server_complete_rpc_event.success),
116        )
117
118    def test_rpcs(self):
119        expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * 5
120                    ] * _common.RPC_COUNT
121        actuallys = _common.execute_many_times(self._do_rpcs)
122        self.assertSequenceEqual(expecteds, actuallys)
123
124
125if __name__ == '__main__':
126    unittest.main(verbosity=2)
127