• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test a corner-case at the level of the Cython API."""
15
16import threading
17import unittest
18
19from grpc._cython import cygrpc
20
21from tests.unit._cython import test_utilities
22
23_EMPTY_FLAGS = 0
24_EMPTY_METADATA = ()
25
26
27class _ServerDriver(object):
28    def __init__(self, completion_queue, shutdown_tag):
29        self._condition = threading.Condition()
30        self._completion_queue = completion_queue
31        self._shutdown_tag = shutdown_tag
32        self._events = []
33        self._saw_shutdown_tag = False
34
35    def start(self):
36        def in_thread():
37            while True:
38                event = self._completion_queue.poll()
39                with self._condition:
40                    self._events.append(event)
41                    self._condition.notify()
42                    if event.tag is self._shutdown_tag:
43                        self._saw_shutdown_tag = True
44                        break
45
46        thread = threading.Thread(target=in_thread)
47        thread.start()
48
49    def done(self):
50        with self._condition:
51            return self._saw_shutdown_tag
52
53    def first_event(self):
54        with self._condition:
55            while not self._events:
56                self._condition.wait()
57            return self._events[0]
58
59    def events(self):
60        with self._condition:
61            while not self._saw_shutdown_tag:
62                self._condition.wait()
63            return tuple(self._events)
64
65
66class _QueueDriver(object):
67    def __init__(self, condition, completion_queue, due):
68        self._condition = condition
69        self._completion_queue = completion_queue
70        self._due = due
71        self._events = []
72        self._returned = False
73
74    def start(self):
75        def in_thread():
76            while True:
77                event = self._completion_queue.poll()
78                with self._condition:
79                    self._events.append(event)
80                    self._due.remove(event.tag)
81                    self._condition.notify_all()
82                    if not self._due:
83                        self._returned = True
84                        return
85
86        thread = threading.Thread(target=in_thread)
87        thread.start()
88
89    def done(self):
90        with self._condition:
91            return self._returned
92
93    def event_with_tag(self, tag):
94        with self._condition:
95            while True:
96                for event in self._events:
97                    if event.tag is tag:
98                        return event
99                self._condition.wait()
100
101    def events(self):
102        with self._condition:
103            while not self._returned:
104                self._condition.wait()
105            return tuple(self._events)
106
107
108class ReadSomeButNotAllResponsesTest(unittest.TestCase):
109    def testReadSomeButNotAllResponses(self):
110        server_completion_queue = cygrpc.CompletionQueue()
111        server = cygrpc.Server(
112            [
113                (
114                    b"grpc.so_reuseport",
115                    0,
116                )
117            ],
118            False,
119        )
120        server.register_completion_queue(server_completion_queue)
121        port = server.add_http2_port(b"[::]:0")
122        server.start()
123        channel = cygrpc.Channel(
124            "localhost:{}".format(port).encode(), set(), None
125        )
126
127        server_shutdown_tag = "server_shutdown_tag"
128        server_driver = _ServerDriver(
129            server_completion_queue, server_shutdown_tag
130        )
131        server_driver.start()
132
133        client_condition = threading.Condition()
134        client_due = set()
135
136        server_call_condition = threading.Condition()
137        server_send_initial_metadata_tag = "server_send_initial_metadata_tag"
138        server_send_first_message_tag = "server_send_first_message_tag"
139        server_send_second_message_tag = "server_send_second_message_tag"
140        server_complete_rpc_tag = "server_complete_rpc_tag"
141        server_call_due = set(
142            (
143                server_send_initial_metadata_tag,
144                server_send_first_message_tag,
145                server_send_second_message_tag,
146                server_complete_rpc_tag,
147            )
148        )
149        server_call_completion_queue = cygrpc.CompletionQueue()
150        server_call_driver = _QueueDriver(
151            server_call_condition, server_call_completion_queue, server_call_due
152        )
153        server_call_driver.start()
154
155        server_rpc_tag = "server_rpc_tag"
156        request_call_result = server.request_call(
157            server_call_completion_queue,
158            server_completion_queue,
159            server_rpc_tag,
160        )
161
162        client_receive_initial_metadata_tag = (
163            "client_receive_initial_metadata_tag"
164        )
165        client_complete_rpc_tag = "client_complete_rpc_tag"
166        client_call = channel.segregated_call(
167            _EMPTY_FLAGS,
168            b"/twinkies",
169            None,
170            None,
171            _EMPTY_METADATA,
172            None,
173            (
174                (
175                    [
176                        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
177                    ],
178                    client_receive_initial_metadata_tag,
179                ),
180                (
181                    [
182                        cygrpc.SendInitialMetadataOperation(
183                            _EMPTY_METADATA, _EMPTY_FLAGS
184                        ),
185                        cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
186                        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
187                    ],
188                    client_complete_rpc_tag,
189                ),
190            ),
191        )
192        client_receive_initial_metadata_event_future = (
193            test_utilities.SimpleFuture(client_call.next_event)
194        )
195
196        server_rpc_event = server_driver.first_event()
197
198        with server_call_condition:
199            server_send_initial_metadata_start_batch_result = (
200                server_rpc_event.call.start_server_batch(
201                    [
202                        cygrpc.SendInitialMetadataOperation(
203                            _EMPTY_METADATA, _EMPTY_FLAGS
204                        ),
205                    ],
206                    server_send_initial_metadata_tag,
207                )
208            )
209            server_send_first_message_start_batch_result = (
210                server_rpc_event.call.start_server_batch(
211                    [
212                        cygrpc.SendMessageOperation(b"\x07", _EMPTY_FLAGS),
213                    ],
214                    server_send_first_message_tag,
215                )
216            )
217        server_send_initial_metadata_event = server_call_driver.event_with_tag(
218            server_send_initial_metadata_tag
219        )
220        server_send_first_message_event = server_call_driver.event_with_tag(
221            server_send_first_message_tag
222        )
223        with server_call_condition:
224            server_send_second_message_start_batch_result = (
225                server_rpc_event.call.start_server_batch(
226                    [
227                        cygrpc.SendMessageOperation(b"\x07", _EMPTY_FLAGS),
228                    ],
229                    server_send_second_message_tag,
230                )
231            )
232            server_complete_rpc_start_batch_result = (
233                server_rpc_event.call.start_server_batch(
234                    [
235                        cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
236                        cygrpc.SendStatusFromServerOperation(
237                            (),
238                            cygrpc.StatusCode.ok,
239                            b"test details",
240                            _EMPTY_FLAGS,
241                        ),
242                    ],
243                    server_complete_rpc_tag,
244                )
245            )
246        server_send_second_message_event = server_call_driver.event_with_tag(
247            server_send_second_message_tag
248        )
249        server_complete_rpc_event = server_call_driver.event_with_tag(
250            server_complete_rpc_tag
251        )
252        server_call_driver.events()
253
254        client_recieve_initial_metadata_event = (
255            client_receive_initial_metadata_event_future.result()
256        )
257
258        client_receive_first_message_tag = "client_receive_first_message_tag"
259        client_call.operate(
260            [
261                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
262            ],
263            client_receive_first_message_tag,
264        )
265        client_receive_first_message_event = client_call.next_event()
266
267        client_call_cancel_result = client_call.cancel(
268            cygrpc.StatusCode.cancelled, "Cancelled during test!"
269        )
270        client_complete_rpc_event = client_call.next_event()
271
272        channel.close(cygrpc.StatusCode.unknown, "Channel closed!")
273        server.shutdown(server_completion_queue, server_shutdown_tag)
274        server.cancel_all_calls()
275        server_driver.events()
276
277        self.assertEqual(cygrpc.CallError.ok, request_call_result)
278        self.assertEqual(
279            cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result
280        )
281        self.assertIs(server_rpc_tag, server_rpc_event.tag)
282        self.assertEqual(
283            cygrpc.CompletionType.operation_complete,
284            server_rpc_event.completion_type,
285        )
286        self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
287
288
289if __name__ == "__main__":
290    unittest.main(verbosity=2)
291