• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Implementations of fork support test methods."""
15
16import enum
17import json
18import logging
19import multiprocessing
20import os
21import threading
22import time
23
24import grpc
25
26from six.moves import queue
27
28from src.proto.grpc.testing import empty_pb2
29from src.proto.grpc.testing import messages_pb2
30from src.proto.grpc.testing import test_pb2_grpc
31
32_LOGGER = logging.getLogger(__name__)
33_RPC_TIMEOUT_S = 10
34_CHILD_FINISH_TIMEOUT_S = 60
35
36
37def _channel(args):
38    target = '{}:{}'.format(args['server_host'], args['server_port'])
39    if args['use_tls']:
40        channel_credentials = grpc.ssl_channel_credentials()
41        channel = grpc.secure_channel(target, channel_credentials)
42    else:
43        channel = grpc.insecure_channel(target)
44    return channel
45
46
47def _validate_payload_type_and_length(response, expected_type, expected_length):
48    if response.payload.type is not expected_type:
49        raise ValueError('expected payload type %s, got %s' %
50                         (expected_type, type(response.payload.type)))
51    elif len(response.payload.body) != expected_length:
52        raise ValueError('expected payload body size %d, got %d' %
53                         (expected_length, len(response.payload.body)))
54
55
56def _async_unary(stub):
57    size = 314159
58    request = messages_pb2.SimpleRequest(
59        response_type=messages_pb2.COMPRESSABLE,
60        response_size=size,
61        payload=messages_pb2.Payload(body=b'\x00' * 271828))
62    response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
63    response = response_future.result()
64    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
65
66
67def _blocking_unary(stub):
68    size = 314159
69    request = messages_pb2.SimpleRequest(
70        response_type=messages_pb2.COMPRESSABLE,
71        response_size=size,
72        payload=messages_pb2.Payload(body=b'\x00' * 271828))
73    response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
74    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
75
76
77class _Pipe(object):
78
79    def __init__(self):
80        self._condition = threading.Condition()
81        self._values = []
82        self._open = True
83
84    def __iter__(self):
85        return self
86
87    def __next__(self):
88        return self.next()
89
90    def next(self):
91        with self._condition:
92            while not self._values and self._open:
93                self._condition.wait()
94            if self._values:
95                return self._values.pop(0)
96            else:
97                raise StopIteration()
98
99    def add(self, value):
100        with self._condition:
101            self._values.append(value)
102            self._condition.notify()
103
104    def close(self):
105        with self._condition:
106            self._open = False
107            self._condition.notify()
108
109    def __enter__(self):
110        return self
111
112    def __exit__(self, type, value, traceback):
113        self.close()
114
115
116class _ChildProcess(object):
117
118    def __init__(self, task, args=None):
119        if args is None:
120            args = ()
121        self._exceptions = multiprocessing.Queue()
122
123        def record_exceptions():
124            try:
125                task(*args)
126            except grpc.RpcError as rpc_error:
127                self._exceptions.put('RpcError: %s' % rpc_error)
128            except Exception as e:  # pylint: disable=broad-except
129                self._exceptions.put(e)
130
131        self._process = multiprocessing.Process(target=record_exceptions)
132
133    def start(self):
134        self._process.start()
135
136    def finish(self):
137        self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
138        if self._process.is_alive():
139            raise RuntimeError('Child process did not terminate')
140        if self._process.exitcode != 0:
141            raise ValueError('Child process failed with exitcode %d' %
142                             self._process.exitcode)
143        try:
144            exception = self._exceptions.get(block=False)
145            raise ValueError('Child process failed: "%s": "%s"' %
146                             (repr(exception), exception))
147        except queue.Empty:
148            pass
149
150
151def _async_unary_same_channel(channel):
152
153    def child_target():
154        try:
155            _async_unary(stub)
156            raise Exception(
157                'Child should not be able to re-use channel after fork')
158        except ValueError as expected_value_error:
159            pass
160
161    stub = test_pb2_grpc.TestServiceStub(channel)
162    _async_unary(stub)
163    child_process = _ChildProcess(child_target)
164    child_process.start()
165    _async_unary(stub)
166    child_process.finish()
167
168
169def _async_unary_new_channel(channel, args):
170
171    def child_target():
172        with _channel(args) as child_channel:
173            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
174            _async_unary(child_stub)
175            child_channel.close()
176
177    stub = test_pb2_grpc.TestServiceStub(channel)
178    _async_unary(stub)
179    child_process = _ChildProcess(child_target)
180    child_process.start()
181    _async_unary(stub)
182    child_process.finish()
183
184
185def _blocking_unary_same_channel(channel):
186
187    def child_target():
188        try:
189            _blocking_unary(stub)
190            raise Exception(
191                'Child should not be able to re-use channel after fork')
192        except ValueError as expected_value_error:
193            pass
194
195    stub = test_pb2_grpc.TestServiceStub(channel)
196    _blocking_unary(stub)
197    child_process = _ChildProcess(child_target)
198    child_process.start()
199    child_process.finish()
200
201
202def _blocking_unary_new_channel(channel, args):
203
204    def child_target():
205        with _channel(args) as child_channel:
206            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
207            _blocking_unary(child_stub)
208
209    stub = test_pb2_grpc.TestServiceStub(channel)
210    _blocking_unary(stub)
211    child_process = _ChildProcess(child_target)
212    child_process.start()
213    _blocking_unary(stub)
214    child_process.finish()
215
216
217# Verify that the fork channel registry can handle already closed channels
218def _close_channel_before_fork(channel, args):
219
220    def child_target():
221        new_channel.close()
222        with _channel(args) as child_channel:
223            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
224            _blocking_unary(child_stub)
225
226    stub = test_pb2_grpc.TestServiceStub(channel)
227    _blocking_unary(stub)
228    channel.close()
229
230    with _channel(args) as new_channel:
231        new_stub = test_pb2_grpc.TestServiceStub(new_channel)
232        child_process = _ChildProcess(child_target)
233        child_process.start()
234        _blocking_unary(new_stub)
235        child_process.finish()
236
237
238def _connectivity_watch(channel, args):
239
240    parent_states = []
241    parent_channel_ready_event = threading.Event()
242
243    def child_target():
244
245        child_channel_ready_event = threading.Event()
246
247        def child_connectivity_callback(state):
248            if state is grpc.ChannelConnectivity.READY:
249                child_channel_ready_event.set()
250
251        with _channel(args) as child_channel:
252            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
253            child_channel.subscribe(child_connectivity_callback)
254            _async_unary(child_stub)
255            if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
256                raise ValueError('Channel did not move to READY')
257            if len(parent_states) > 1:
258                raise ValueError(
259                    'Received connectivity updates on parent callback',
260                    parent_states)
261            child_channel.unsubscribe(child_connectivity_callback)
262
263    def parent_connectivity_callback(state):
264        parent_states.append(state)
265        if state is grpc.ChannelConnectivity.READY:
266            parent_channel_ready_event.set()
267
268    channel.subscribe(parent_connectivity_callback)
269    stub = test_pb2_grpc.TestServiceStub(channel)
270    child_process = _ChildProcess(child_target)
271    child_process.start()
272    _async_unary(stub)
273    if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
274        raise ValueError('Channel did not move to READY')
275    channel.unsubscribe(parent_connectivity_callback)
276    child_process.finish()
277
278
279def _ping_pong_with_child_processes_after_first_response(
280        channel, args, child_target, run_after_close=True):
281    request_response_sizes = (
282        31415,
283        9,
284        2653,
285        58979,
286    )
287    request_payload_sizes = (
288        27182,
289        8,
290        1828,
291        45904,
292    )
293    stub = test_pb2_grpc.TestServiceStub(channel)
294    pipe = _Pipe()
295    parent_bidi_call = stub.FullDuplexCall(pipe)
296    child_processes = []
297    first_message_received = False
298    for response_size, payload_size in zip(request_response_sizes,
299                                           request_payload_sizes):
300        request = messages_pb2.StreamingOutputCallRequest(
301            response_type=messages_pb2.COMPRESSABLE,
302            response_parameters=(messages_pb2.ResponseParameters(
303                size=response_size),),
304            payload=messages_pb2.Payload(body=b'\x00' * payload_size))
305        pipe.add(request)
306        if first_message_received:
307            child_process = _ChildProcess(child_target,
308                                          (parent_bidi_call, channel, args))
309            child_process.start()
310            child_processes.append(child_process)
311        response = next(parent_bidi_call)
312        first_message_received = True
313        child_process = _ChildProcess(child_target,
314                                      (parent_bidi_call, channel, args))
315        child_process.start()
316        child_processes.append(child_process)
317        _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
318                                          response_size)
319    pipe.close()
320    if run_after_close:
321        child_process = _ChildProcess(child_target,
322                                      (parent_bidi_call, channel, args))
323        child_process.start()
324        child_processes.append(child_process)
325    for child_process in child_processes:
326        child_process.finish()
327
328
329def _in_progress_bidi_continue_call(channel):
330
331    def child_target(parent_bidi_call, parent_channel, args):
332        stub = test_pb2_grpc.TestServiceStub(parent_channel)
333        try:
334            _async_unary(stub)
335            raise Exception(
336                'Child should not be able to re-use channel after fork')
337        except ValueError as expected_value_error:
338            pass
339        inherited_code = parent_bidi_call.code()
340        inherited_details = parent_bidi_call.details()
341        if inherited_code != grpc.StatusCode.CANCELLED:
342            raise ValueError('Expected inherited code CANCELLED, got %s' %
343                             inherited_code)
344        if inherited_details != 'Channel closed due to fork':
345            raise ValueError(
346                'Expected inherited details Channel closed due to fork, got %s'
347                % inherited_details)
348
349    # Don't run child_target after closing the parent call, as the call may have
350    # received a status from the  server before fork occurs.
351    _ping_pong_with_child_processes_after_first_response(channel,
352                                                         None,
353                                                         child_target,
354                                                         run_after_close=False)
355
356
357def _in_progress_bidi_same_channel_async_call(channel):
358
359    def child_target(parent_bidi_call, parent_channel, args):
360        stub = test_pb2_grpc.TestServiceStub(parent_channel)
361        try:
362            _async_unary(stub)
363            raise Exception(
364                'Child should not be able to re-use channel after fork')
365        except ValueError as expected_value_error:
366            pass
367
368    _ping_pong_with_child_processes_after_first_response(
369        channel, None, child_target)
370
371
372def _in_progress_bidi_same_channel_blocking_call(channel):
373
374    def child_target(parent_bidi_call, parent_channel, args):
375        stub = test_pb2_grpc.TestServiceStub(parent_channel)
376        try:
377            _blocking_unary(stub)
378            raise Exception(
379                'Child should not be able to re-use channel after fork')
380        except ValueError as expected_value_error:
381            pass
382
383    _ping_pong_with_child_processes_after_first_response(
384        channel, None, child_target)
385
386
387def _in_progress_bidi_new_channel_async_call(channel, args):
388
389    def child_target(parent_bidi_call, parent_channel, args):
390        with _channel(args) as channel:
391            stub = test_pb2_grpc.TestServiceStub(channel)
392            _async_unary(stub)
393
394    _ping_pong_with_child_processes_after_first_response(
395        channel, args, child_target)
396
397
398def _in_progress_bidi_new_channel_blocking_call(channel, args):
399
400    def child_target(parent_bidi_call, parent_channel, args):
401        with _channel(args) as channel:
402            stub = test_pb2_grpc.TestServiceStub(channel)
403            _blocking_unary(stub)
404
405    _ping_pong_with_child_processes_after_first_response(
406        channel, args, child_target)
407
408
409@enum.unique
410class TestCase(enum.Enum):
411
412    CONNECTIVITY_WATCH = 'connectivity_watch'
413    CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
414    ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
415    ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
416    BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
417    BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
418    IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
419    IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
420    IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
421    IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
422    IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'
423
424    def run_test(self, args):
425        _LOGGER.info("Running %s", self)
426        channel = _channel(args)
427        if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
428            _async_unary_same_channel(channel)
429        elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
430            _async_unary_new_channel(channel, args)
431        elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
432            _blocking_unary_same_channel(channel)
433        elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
434            _blocking_unary_new_channel(channel, args)
435        elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
436            _close_channel_before_fork(channel, args)
437        elif self is TestCase.CONNECTIVITY_WATCH:
438            _connectivity_watch(channel, args)
439        elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
440            _in_progress_bidi_continue_call(channel)
441        elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
442            _in_progress_bidi_same_channel_async_call(channel)
443        elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
444            _in_progress_bidi_same_channel_blocking_call(channel)
445        elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
446            _in_progress_bidi_new_channel_async_call(channel, args)
447        elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
448            _in_progress_bidi_new_channel_blocking_call(channel, args)
449        else:
450            raise NotImplementedError('Test case "%s" not implemented!' %
451                                      self.name)
452        channel.close()
453