# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementations of fork support test methods."""

import collections
import enum
import json
import logging
import multiprocessing
import os
import threading
import time

import grpc

from six.moves import queue

from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

_LOGGER = logging.getLogger(__name__)
_RPC_TIMEOUT_S = 10
_CHILD_FINISH_TIMEOUT_S = 60


def _channel(args):
    """Creates a channel to the test server described by `args`.

    Args:
      args: A mapping with the keys 'server_host', 'server_port' and
        'use_tls'.

    Returns:
      A secure channel (default SSL credentials) when args['use_tls'] is
      truthy, otherwise an insecure channel.
    """
    target = '{}:{}'.format(args['server_host'], args['server_port'])
    if args['use_tls']:
        channel_credentials = grpc.ssl_channel_credentials()
        channel = grpc.secure_channel(target, channel_credentials)
    else:
        channel = grpc.insecure_channel(target)
    return channel


def _validate_payload_type_and_length(response, expected_type, expected_length):
    """Checks the payload type and body size of a response.

    Args:
      response: A message exposing `payload.type` and `payload.body`.
      expected_type: The payload type the response must carry.
      expected_length: The required length of `payload.body`.

    Raises:
      ValueError: If the type or the body length does not match.
    """
    # Payload types are plain enum integers: compare by value, not identity,
    # and report the value that was actually received.
    if response.payload.type != expected_type:
        raise ValueError('expected payload type %s, got %s' %
                         (expected_type, response.payload.type))
    elif len(response.payload.body) != expected_length:
        raise ValueError('expected payload body size %d, got %d' %
                         (expected_length, len(response.payload.body)))


def _async_unary(stub):
    """Issues one UnaryCall through the future API and validates the reply.

    Raises:
      ValueError: If the response payload has the wrong type or size.
    """
    size = 314159
    request = messages_pb2.SimpleRequest(
        response_type=messages_pb2.COMPRESSABLE,
        response_size=size,
        payload=messages_pb2.Payload(body=b'\x00' * 271828))
    response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
    response = response_future.result()
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)


def _blocking_unary(stub):
    """Issues one blocking UnaryCall and validates the reply.

    Raises:
      ValueError: If the response payload has the wrong type or size.
    """
    size = 314159
    request = messages_pb2.SimpleRequest(
        response_type=messages_pb2.COMPRESSABLE,
        response_size=size,
        payload=messages_pb2.Payload(body=b'\x00' * 271828))
    response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)


def _expect_inherited_channel_unusable(rpc, stub):
    """Runs `rpc(stub)` and requires it to fail with ValueError.

    Used by child processes to check that a channel inherited from the parent
    across a fork cannot be used.

    Args:
      rpc: A callable taking the stub, e.g. `_async_unary`.
      stub: A stub bound to the channel inherited from the parent.

    Raises:
      Exception: If the RPC unexpectedly completes without a ValueError.
    """
    try:
        rpc(stub)
    except ValueError:
        return
    raise Exception('Child should not be able to re-use channel after fork')


class _Pipe(object):
    """A closable, blocking FIFO that can be iterated as a request stream.

    Iteration blocks until a value is added or the pipe is closed, and stops
    once the pipe is closed and drained.
    """

    def __init__(self):
        self._condition = threading.Condition()
        # deque gives O(1) removal from the head.
        self._values = collections.deque()
        self._open = True

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Returns the oldest value, blocking while the open pipe is empty.

        Raises:
          StopIteration: If the pipe is closed and holds no more values.
        """
        with self._condition:
            while not self._values and self._open:
                self._condition.wait()
            if self._values:
                return self._values.popleft()
            else:
                raise StopIteration()

    def add(self, value):
        """Appends a value and wakes a waiting consumer."""
        with self._condition:
            self._values.append(value)
            self._condition.notify()

    def close(self):
        """Marks the pipe closed and wakes every waiting consumer."""
        with self._condition:
            self._open = False
            # All blocked consumers must observe the close, not just one.
            self._condition.notify_all()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()


class _ChildProcess(object):
    """Runs a task in a child process and reports its failures to the parent.

    Exceptions raised by the task are captured in the child and forwarded
    through a multiprocessing queue, so `finish` can re-raise them in the
    parent.
    """

    def __init__(self, task, args=None):
        """Prepares, but does not start, the child process.

        Args:
          task: The callable to run in the child.
          args: Optional sequence of positional arguments for `task`.
        """
        if args is None:
            args = ()
        self._exceptions = multiprocessing.Queue()

        # NOTE(review): the target is a local closure, which presumably
        # requires the 'fork' start method (closures cannot be pickled for
        # 'spawn') -- confirm if the start method is ever changed.
        def record_exceptions():
            try:
                task(*args)
            except grpc.RpcError as rpc_error:
                # Forwarded as text rather than as the exception object.
                self._exceptions.put('RpcError: %s' % rpc_error)
            except Exception as e:  # pylint: disable=broad-except
                self._exceptions.put(e)

        self._process = multiprocessing.Process(target=record_exceptions)

    def start(self):
        """Starts the child process."""
        self._process.start()

    def finish(self):
        """Waits for the child and surfaces any failure.

        Raises:
          RuntimeError: If the child is still alive after
            _CHILD_FINISH_TIMEOUT_S seconds.
          ValueError: If the child exited with a non-zero exit code, or if it
            recorded an exception while running the task.
        """
        self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
        if self._process.is_alive():
            raise RuntimeError('Child process did not terminate')
        if self._process.exitcode != 0:
            raise ValueError('Child process failed with exitcode %d' %
                             self._process.exitcode)
        try:
            exception = self._exceptions.get(block=False)
        except queue.Empty:
            # Nothing was recorded: the task completed cleanly.
            return
        raise ValueError('Child process failed: %s' % exception)


def _async_unary_same_channel(channel):
    """Child must fail to issue a future-based unary call on the parent's channel."""

    def child_target():
        _expect_inherited_channel_unusable(_async_unary, stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _async_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    child_process.finish()


def _async_unary_new_channel(channel, args):
    """Child issues a future-based unary call on a channel it creates itself."""

    def child_target():
        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            _async_unary(child_stub)
            # Explicit close while still inside the `with` block, as in the
            # original test.
            child_channel.close()

    stub = test_pb2_grpc.TestServiceStub(channel)
    _async_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    child_process.finish()


def _blocking_unary_same_channel(channel):
    """Child must fail to issue a blocking unary call on the parent's channel."""

    def child_target():
        _expect_inherited_channel_unusable(_blocking_unary, stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    child_process.finish()


def _blocking_unary_new_channel(channel, args):
    """Child issues a blocking unary call on a channel it creates itself."""

    def child_target():
        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            _blocking_unary(child_stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _blocking_unary(stub)
    child_process.finish()


# Verify that the fork channel registry can handle already closed channels
def _close_channel_before_fork(channel, args):
    """Forks after the first channel was closed and a second one was opened."""

    def child_target():
        new_channel.close()
        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            _blocking_unary(child_stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    channel.close()

    with _channel(args) as new_channel:
        new_stub = test_pb2_grpc.TestServiceStub(new_channel)
        child_process = _ChildProcess(child_target)
        child_process.start()
        _blocking_unary(new_stub)
        child_process.finish()


def _connectivity_watch(channel, args):
    """Checks connectivity subscriptions in parent and child after a fork.

    Both processes must see their own channel reach READY; the child also
    fails if more than one state update was recorded by the parent's callback
    in the child's copy of `parent_states`.
    """

    parent_states = []
    parent_channel_ready_event = threading.Event()

    def child_target():

        child_channel_ready_event = threading.Event()

        def child_connectivity_callback(state):
            if state is grpc.ChannelConnectivity.READY:
                child_channel_ready_event.set()

        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            child_channel.subscribe(child_connectivity_callback)
            _async_unary(child_stub)
            if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
                raise ValueError('Channel did not move to READY')
            if len(parent_states) > 1:
                raise ValueError(
                    'Received connectivity updates on parent callback',
                    parent_states)
            child_channel.unsubscribe(child_connectivity_callback)

    def parent_connectivity_callback(state):
        parent_states.append(state)
        if state is grpc.ChannelConnectivity.READY:
            parent_channel_ready_event.set()

    channel.subscribe(parent_connectivity_callback)
    stub = test_pb2_grpc.TestServiceStub(channel)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
        raise ValueError('Channel did not move to READY')
    channel.unsubscribe(parent_connectivity_callback)
    child_process.finish()


def _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target, run_after_close=True):
    """Runs a bidi ping-pong in the parent, forking children along the way.

    A child is forked after every response received and, from the second
    request on, also between sending a request and reading its response.

    Args:
      channel: The parent's channel, used for the FullDuplexCall.
      args: Passed through to `child_target` as its third argument.
      child_target: Callable run in every child as
        `child_target(parent_bidi_call, channel, args)`.
      run_after_close: Whether to fork one more child after the request
        stream has been closed.
    """
    request_response_sizes = (
        31415,
        9,
        2653,
        58979,
    )
    request_payload_sizes = (
        27182,
        8,
        1828,
        45904,
    )
    stub = test_pb2_grpc.TestServiceStub(channel)
    pipe = _Pipe()
    parent_bidi_call = stub.FullDuplexCall(pipe)
    child_processes = []

    def start_child():
        # Forks a child running child_target and tracks it for finish().
        child_process = _ChildProcess(child_target,
                                      (parent_bidi_call, channel, args))
        child_process.start()
        child_processes.append(child_process)

    first_message_received = False
    for response_size, payload_size in zip(request_response_sizes,
                                           request_payload_sizes):
        request = messages_pb2.StreamingOutputCallRequest(
            response_type=messages_pb2.COMPRESSABLE,
            response_parameters=(messages_pb2.ResponseParameters(
                size=response_size),),
            payload=messages_pb2.Payload(body=b'\x00' * payload_size))
        pipe.add(request)
        if first_message_received:
            start_child()
        response = next(parent_bidi_call)
        first_message_received = True
        start_child()
        _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
                                          response_size)
    pipe.close()
    if run_after_close:
        start_child()
    for child_process in child_processes:
        child_process.finish()


def _in_progress_bidi_continue_call(channel):
    """Child checks the status of the bidi call it inherited from the parent."""

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_inherited_channel_unusable(_async_unary, stub)
        inherited_code = parent_bidi_call.code()
        inherited_details = parent_bidi_call.details()
        if inherited_code != grpc.StatusCode.CANCELLED:
            raise ValueError('Expected inherited code CANCELLED, got %s' %
                             inherited_code)
        if inherited_details != 'Channel closed due to fork':
            raise ValueError(
                'Expected inherited details Channel closed due to fork, got %s'
                % inherited_details)

    # Don't run child_target after closing the parent call, as the call may have
    # received a status from the server before fork occurs.
    _ping_pong_with_child_processes_after_first_response(channel,
                                                         None,
                                                         child_target,
                                                         run_after_close=False)


def _in_progress_bidi_same_channel_async_call(channel):
    """Children forked mid-stream must fail a future-based unary call on the parent's channel."""

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_inherited_channel_unusable(_async_unary, stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)


def _in_progress_bidi_same_channel_blocking_call(channel):
    """Children forked mid-stream must fail a blocking unary call on the parent's channel."""

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_inherited_channel_unusable(_blocking_unary, stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)


def _in_progress_bidi_new_channel_async_call(channel, args):
    """Children forked mid-stream issue a future-based unary call on a fresh channel."""

    def child_target(parent_bidi_call, parent_channel, args):
        with _channel(args) as child_channel:
            stub = test_pb2_grpc.TestServiceStub(child_channel)
            _async_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)


def _in_progress_bidi_new_channel_blocking_call(channel, args):
    """Children forked mid-stream issue a blocking unary call on a fresh channel."""

    def child_target(parent_bidi_call, parent_channel, args):
        with _channel(args) as child_channel:
            stub = test_pb2_grpc.TestServiceStub(child_channel)
            _blocking_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)


@enum.unique
class TestCase(enum.Enum):
    """The fork support test cases, keyed by their command-line style names."""

    CONNECTIVITY_WATCH = 'connectivity_watch'
    CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
    ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
    ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
    BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
    BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
    IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'

    def run_test(self, args):
        """Runs this test case against the server described by `args`.

        Args:
          args: A mapping with the keys 'server_host', 'server_port' and
            'use_tls'.

        Raises:
          NotImplementedError: If no implementation exists for this member.
        """
        _LOGGER.info("Running %s", self)
        channel = _channel(args)
        # Close the channel even when the test body raises, so a failing case
        # does not leak the connection.
        try:
            if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
                _async_unary_same_channel(channel)
            elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
                _async_unary_new_channel(channel, args)
            elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
                _blocking_unary_same_channel(channel)
            elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
                _blocking_unary_new_channel(channel, args)
            elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
                _close_channel_before_fork(channel, args)
            elif self is TestCase.CONNECTIVITY_WATCH:
                _connectivity_watch(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
                _in_progress_bidi_continue_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_same_channel_async_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_same_channel_blocking_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_new_channel_async_call(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_new_channel_blocking_call(channel, args)
            else:
                raise NotImplementedError('Test case "%s" not implemented!' %
                                          self.name)
        finally:
            channel.close()