• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Implementations of fork support test methods."""
15
16import enum
17import json
18import logging
19import multiprocessing
20import os
21import threading
22import time
23
24import grpc
25
26from six.moves import queue
27
28from src.proto.grpc.testing import empty_pb2
29from src.proto.grpc.testing import messages_pb2
30from src.proto.grpc.testing import test_pb2_grpc
31
32_LOGGER = logging.getLogger(__name__)
33_RPC_TIMEOUT_S = 10
34_CHILD_FINISH_TIMEOUT_S = 60
35
36
37def _channel(args):
38    target = '{}:{}'.format(args['server_host'], args['server_port'])
39    if args['use_tls']:
40        channel_credentials = grpc.ssl_channel_credentials()
41        channel = grpc.secure_channel(target, channel_credentials)
42    else:
43        channel = grpc.insecure_channel(target)
44    return channel
45
46
47def _validate_payload_type_and_length(response, expected_type, expected_length):
48    if response.payload.type is not expected_type:
49        raise ValueError('expected payload type %s, got %s' %
50                         (expected_type, type(response.payload.type)))
51    elif len(response.payload.body) != expected_length:
52        raise ValueError('expected payload body size %d, got %d' %
53                         (expected_length, len(response.payload.body)))
54
55
56def _async_unary(stub):
57    size = 314159
58    request = messages_pb2.SimpleRequest(
59        response_type=messages_pb2.COMPRESSABLE,
60        response_size=size,
61        payload=messages_pb2.Payload(body=b'\x00' * 271828))
62    response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
63    response = response_future.result()
64    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
65
66
67def _blocking_unary(stub):
68    size = 314159
69    request = messages_pb2.SimpleRequest(
70        response_type=messages_pb2.COMPRESSABLE,
71        response_size=size,
72        payload=messages_pb2.Payload(body=b'\x00' * 271828))
73    response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
74    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
75
76
77class _Pipe(object):
78
79    def __init__(self):
80        self._condition = threading.Condition()
81        self._values = []
82        self._open = True
83
84    def __iter__(self):
85        return self
86
87    def __next__(self):
88        return self.next()
89
90    def next(self):
91        with self._condition:
92            while not self._values and self._open:
93                self._condition.wait()
94            if self._values:
95                return self._values.pop(0)
96            else:
97                raise StopIteration()
98
99    def add(self, value):
100        with self._condition:
101            self._values.append(value)
102            self._condition.notify()
103
104    def close(self):
105        with self._condition:
106            self._open = False
107            self._condition.notify()
108
109    def __enter__(self):
110        return self
111
112    def __exit__(self, type, value, traceback):
113        self.close()
114
115
116class _ChildProcess(object):
117
118    def __init__(self, task, args=None):
119        if args is None:
120            args = ()
121        self._exceptions = multiprocessing.Queue()
122
123        def record_exceptions():
124            try:
125                task(*args)
126            except grpc.RpcError as rpc_error:
127                self._exceptions.put('RpcError: %s' % rpc_error)
128            except Exception as e:  # pylint: disable=broad-except
129                self._exceptions.put(e)
130
131        self._process = multiprocessing.Process(target=record_exceptions)
132
133    def start(self):
134        self._process.start()
135
136    def finish(self):
137        self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
138        if self._process.is_alive():
139            raise RuntimeError('Child process did not terminate')
140        if self._process.exitcode != 0:
141            raise ValueError('Child process failed with exitcode %d' %
142                             self._process.exitcode)
143        try:
144            exception = self._exceptions.get(block=False)
145            raise ValueError('Child process failed: %s' % exception)
146        except queue.Empty:
147            pass
148
149
150def _async_unary_same_channel(channel):
151
152    def child_target():
153        try:
154            _async_unary(stub)
155            raise Exception(
156                'Child should not be able to re-use channel after fork')
157        except ValueError as expected_value_error:
158            pass
159
160    stub = test_pb2_grpc.TestServiceStub(channel)
161    _async_unary(stub)
162    child_process = _ChildProcess(child_target)
163    child_process.start()
164    _async_unary(stub)
165    child_process.finish()
166
167
168def _async_unary_new_channel(channel, args):
169
170    def child_target():
171        with _channel(args) as child_channel:
172            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
173            _async_unary(child_stub)
174            child_channel.close()
175
176    stub = test_pb2_grpc.TestServiceStub(channel)
177    _async_unary(stub)
178    child_process = _ChildProcess(child_target)
179    child_process.start()
180    _async_unary(stub)
181    child_process.finish()
182
183
184def _blocking_unary_same_channel(channel):
185
186    def child_target():
187        try:
188            _blocking_unary(stub)
189            raise Exception(
190                'Child should not be able to re-use channel after fork')
191        except ValueError as expected_value_error:
192            pass
193
194    stub = test_pb2_grpc.TestServiceStub(channel)
195    _blocking_unary(stub)
196    child_process = _ChildProcess(child_target)
197    child_process.start()
198    child_process.finish()
199
200
201def _blocking_unary_new_channel(channel, args):
202
203    def child_target():
204        with _channel(args) as child_channel:
205            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
206            _blocking_unary(child_stub)
207
208    stub = test_pb2_grpc.TestServiceStub(channel)
209    _blocking_unary(stub)
210    child_process = _ChildProcess(child_target)
211    child_process.start()
212    _blocking_unary(stub)
213    child_process.finish()
214
215
216# Verify that the fork channel registry can handle already closed channels
217def _close_channel_before_fork(channel, args):
218
219    def child_target():
220        new_channel.close()
221        with _channel(args) as child_channel:
222            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
223            _blocking_unary(child_stub)
224
225    stub = test_pb2_grpc.TestServiceStub(channel)
226    _blocking_unary(stub)
227    channel.close()
228
229    with _channel(args) as new_channel:
230        new_stub = test_pb2_grpc.TestServiceStub(new_channel)
231        child_process = _ChildProcess(child_target)
232        child_process.start()
233        _blocking_unary(new_stub)
234        child_process.finish()
235
236
237def _connectivity_watch(channel, args):
238
239    parent_states = []
240    parent_channel_ready_event = threading.Event()
241
242    def child_target():
243
244        child_channel_ready_event = threading.Event()
245
246        def child_connectivity_callback(state):
247            if state is grpc.ChannelConnectivity.READY:
248                child_channel_ready_event.set()
249
250        with _channel(args) as child_channel:
251            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
252            child_channel.subscribe(child_connectivity_callback)
253            _async_unary(child_stub)
254            if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
255                raise ValueError('Channel did not move to READY')
256            if len(parent_states) > 1:
257                raise ValueError(
258                    'Received connectivity updates on parent callback',
259                    parent_states)
260            child_channel.unsubscribe(child_connectivity_callback)
261
262    def parent_connectivity_callback(state):
263        parent_states.append(state)
264        if state is grpc.ChannelConnectivity.READY:
265            parent_channel_ready_event.set()
266
267    channel.subscribe(parent_connectivity_callback)
268    stub = test_pb2_grpc.TestServiceStub(channel)
269    child_process = _ChildProcess(child_target)
270    child_process.start()
271    _async_unary(stub)
272    if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
273        raise ValueError('Channel did not move to READY')
274    channel.unsubscribe(parent_connectivity_callback)
275    child_process.finish()
276
277
278def _ping_pong_with_child_processes_after_first_response(
279        channel, args, child_target, run_after_close=True):
280    request_response_sizes = (
281        31415,
282        9,
283        2653,
284        58979,
285    )
286    request_payload_sizes = (
287        27182,
288        8,
289        1828,
290        45904,
291    )
292    stub = test_pb2_grpc.TestServiceStub(channel)
293    pipe = _Pipe()
294    parent_bidi_call = stub.FullDuplexCall(pipe)
295    child_processes = []
296    first_message_received = False
297    for response_size, payload_size in zip(request_response_sizes,
298                                           request_payload_sizes):
299        request = messages_pb2.StreamingOutputCallRequest(
300            response_type=messages_pb2.COMPRESSABLE,
301            response_parameters=(messages_pb2.ResponseParameters(
302                size=response_size),),
303            payload=messages_pb2.Payload(body=b'\x00' * payload_size))
304        pipe.add(request)
305        if first_message_received:
306            child_process = _ChildProcess(child_target,
307                                          (parent_bidi_call, channel, args))
308            child_process.start()
309            child_processes.append(child_process)
310        response = next(parent_bidi_call)
311        first_message_received = True
312        child_process = _ChildProcess(child_target,
313                                      (parent_bidi_call, channel, args))
314        child_process.start()
315        child_processes.append(child_process)
316        _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
317                                          response_size)
318    pipe.close()
319    if run_after_close:
320        child_process = _ChildProcess(child_target,
321                                      (parent_bidi_call, channel, args))
322        child_process.start()
323        child_processes.append(child_process)
324    for child_process in child_processes:
325        child_process.finish()
326
327
328def _in_progress_bidi_continue_call(channel):
329
330    def child_target(parent_bidi_call, parent_channel, args):
331        stub = test_pb2_grpc.TestServiceStub(parent_channel)
332        try:
333            _async_unary(stub)
334            raise Exception(
335                'Child should not be able to re-use channel after fork')
336        except ValueError as expected_value_error:
337            pass
338        inherited_code = parent_bidi_call.code()
339        inherited_details = parent_bidi_call.details()
340        if inherited_code != grpc.StatusCode.CANCELLED:
341            raise ValueError('Expected inherited code CANCELLED, got %s' %
342                             inherited_code)
343        if inherited_details != 'Channel closed due to fork':
344            raise ValueError(
345                'Expected inherited details Channel closed due to fork, got %s'
346                % inherited_details)
347
348    # Don't run child_target after closing the parent call, as the call may have
349    # received a status from the  server before fork occurs.
350    _ping_pong_with_child_processes_after_first_response(channel,
351                                                         None,
352                                                         child_target,
353                                                         run_after_close=False)
354
355
356def _in_progress_bidi_same_channel_async_call(channel):
357
358    def child_target(parent_bidi_call, parent_channel, args):
359        stub = test_pb2_grpc.TestServiceStub(parent_channel)
360        try:
361            _async_unary(stub)
362            raise Exception(
363                'Child should not be able to re-use channel after fork')
364        except ValueError as expected_value_error:
365            pass
366
367    _ping_pong_with_child_processes_after_first_response(
368        channel, None, child_target)
369
370
371def _in_progress_bidi_same_channel_blocking_call(channel):
372
373    def child_target(parent_bidi_call, parent_channel, args):
374        stub = test_pb2_grpc.TestServiceStub(parent_channel)
375        try:
376            _blocking_unary(stub)
377            raise Exception(
378                'Child should not be able to re-use channel after fork')
379        except ValueError as expected_value_error:
380            pass
381
382    _ping_pong_with_child_processes_after_first_response(
383        channel, None, child_target)
384
385
386def _in_progress_bidi_new_channel_async_call(channel, args):
387
388    def child_target(parent_bidi_call, parent_channel, args):
389        with _channel(args) as channel:
390            stub = test_pb2_grpc.TestServiceStub(channel)
391            _async_unary(stub)
392
393    _ping_pong_with_child_processes_after_first_response(
394        channel, args, child_target)
395
396
397def _in_progress_bidi_new_channel_blocking_call(channel, args):
398
399    def child_target(parent_bidi_call, parent_channel, args):
400        with _channel(args) as channel:
401            stub = test_pb2_grpc.TestServiceStub(channel)
402            _blocking_unary(stub)
403
404    _ping_pong_with_child_processes_after_first_response(
405        channel, args, child_target)
406
407
408@enum.unique
409class TestCase(enum.Enum):
410
411    CONNECTIVITY_WATCH = 'connectivity_watch'
412    CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
413    ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
414    ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
415    BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
416    BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
417    IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
418    IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
419    IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
420    IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
421    IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'
422
423    def run_test(self, args):
424        _LOGGER.info("Running %s", self)
425        channel = _channel(args)
426        if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
427            _async_unary_same_channel(channel)
428        elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
429            _async_unary_new_channel(channel, args)
430        elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
431            _blocking_unary_same_channel(channel)
432        elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
433            _blocking_unary_new_channel(channel, args)
434        elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
435            _close_channel_before_fork(channel, args)
436        elif self is TestCase.CONNECTIVITY_WATCH:
437            _connectivity_watch(channel, args)
438        elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
439            _in_progress_bidi_continue_call(channel)
440        elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
441            _in_progress_bidi_same_channel_async_call(channel)
442        elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
443            _in_progress_bidi_same_channel_blocking_call(channel)
444        elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
445            _in_progress_bidi_new_channel_async_call(channel, args)
446        elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
447            _in_progress_bidi_new_channel_blocking_call(channel, args)
448        else:
449            raise NotImplementedError('Test case "%s" not implemented!' %
450                                      self.name)
451        channel.close()
452