• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test a corner-case at the level of the Cython API."""
15
16import threading
17import unittest
18
19from grpc._cython import cygrpc
20
21from tests.unit._cython import _common
22from tests.unit._cython import test_utilities
23
24
25class Test(_common.RpcTest, unittest.TestCase):
26    def _do_rpcs(self):
27        server_request_call_tag = "server_request_call_tag"
28        server_send_initial_metadata_tag = "server_send_initial_metadata_tag"
29        server_complete_rpc_tag = "server_complete_rpc_tag"
30
31        with self.server_condition:
32            server_request_call_start_batch_result = self.server.request_call(
33                self.server_completion_queue,
34                self.server_completion_queue,
35                server_request_call_tag,
36            )
37            self.server_driver.add_due(
38                {
39                    server_request_call_tag,
40                }
41            )
42
43        client_receive_initial_metadata_tag = (
44            "client_receive_initial_metadata_tag"
45        )
46        client_complete_rpc_tag = "client_complete_rpc_tag"
47        client_call = self.channel.integrated_call(
48            _common.EMPTY_FLAGS,
49            b"/twinkies",
50            None,
51            None,
52            _common.INVOCATION_METADATA,
53            None,
54            [
55                (
56                    [
57                        cygrpc.SendInitialMetadataOperation(
58                            _common.INVOCATION_METADATA, _common.EMPTY_FLAGS
59                        ),
60                        cygrpc.SendCloseFromClientOperation(
61                            _common.EMPTY_FLAGS
62                        ),
63                        cygrpc.ReceiveStatusOnClientOperation(
64                            _common.EMPTY_FLAGS
65                        ),
66                    ],
67                    client_complete_rpc_tag,
68                ),
69            ],
70        )
71        client_call.operate(
72            [
73                cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
74            ],
75            client_receive_initial_metadata_tag,
76        )
77
78        client_events_future = test_utilities.SimpleFuture(
79            lambda: [
80                self.channel.next_call_event(),
81                self.channel.next_call_event(),
82            ]
83        )
84        server_request_call_event = self.server_driver.event_with_tag(
85            server_request_call_tag
86        )
87
88        with self.server_condition:
89            server_send_initial_metadata_start_batch_result = (
90                server_request_call_event.call.start_server_batch(
91                    [
92                        cygrpc.SendInitialMetadataOperation(
93                            _common.INITIAL_METADATA, _common.EMPTY_FLAGS
94                        ),
95                    ],
96                    server_send_initial_metadata_tag,
97                )
98            )
99            self.server_driver.add_due(
100                {
101                    server_send_initial_metadata_tag,
102                }
103            )
104        server_send_initial_metadata_event = self.server_driver.event_with_tag(
105            server_send_initial_metadata_tag
106        )
107
108        with self.server_condition:
109            server_complete_rpc_start_batch_result = (
110                server_request_call_event.call.start_server_batch(
111                    [
112                        cygrpc.ReceiveCloseOnServerOperation(
113                            _common.EMPTY_FLAGS
114                        ),
115                        cygrpc.SendStatusFromServerOperation(
116                            _common.TRAILING_METADATA,
117                            cygrpc.StatusCode.ok,
118                            "test details",
119                            _common.EMPTY_FLAGS,
120                        ),
121                    ],
122                    server_complete_rpc_tag,
123                )
124            )
125            self.server_driver.add_due(
126                {
127                    server_complete_rpc_tag,
128                }
129            )
130        server_complete_rpc_event = self.server_driver.event_with_tag(
131            server_complete_rpc_tag
132        )
133
134        client_events = client_events_future.result()
135        client_receive_initial_metadata_event = client_events[0]
136        client_complete_rpc_event = client_events[1]
137
138        return (
139            _common.OperationResult(
140                server_request_call_start_batch_result,
141                server_request_call_event.completion_type,
142                server_request_call_event.success,
143            ),
144            _common.OperationResult(
145                cygrpc.CallError.ok,
146                client_receive_initial_metadata_event.completion_type,
147                client_receive_initial_metadata_event.success,
148            ),
149            _common.OperationResult(
150                cygrpc.CallError.ok,
151                client_complete_rpc_event.completion_type,
152                client_complete_rpc_event.success,
153            ),
154            _common.OperationResult(
155                server_send_initial_metadata_start_batch_result,
156                server_send_initial_metadata_event.completion_type,
157                server_send_initial_metadata_event.success,
158            ),
159            _common.OperationResult(
160                server_complete_rpc_start_batch_result,
161                server_complete_rpc_event.completion_type,
162                server_complete_rpc_event.success,
163            ),
164        )
165
166    def test_rpcs(self):
167        expecteds = [
168            (_common.SUCCESSFUL_OPERATION_RESULT,) * 5
169        ] * _common.RPC_COUNT
170        actuallys = _common.execute_many_times(self._do_rpcs)
171        self.assertSequenceEqual(expecteds, actuallys)
172
173
174if __name__ == "__main__":
175    unittest.main(verbosity=2)
176