# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementations of fork support test methods."""

import collections
import enum
import json
import logging
import multiprocessing
import os
import threading
import time

import grpc

from six.moves import queue

from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc

_LOGGER = logging.getLogger(__name__)
_RPC_TIMEOUT_S = 10
_CHILD_FINISH_TIMEOUT_S = 60

# Sizes (in bytes) used by the unary RPC helpers below.
_UNARY_RESPONSE_SIZE = 314159
_UNARY_REQUEST_PAYLOAD_SIZE = 271828


def _channel(args):
    """Create a channel to the test server described by `args`.

    Args:
      args: Mapping providing the keys 'server_host', 'server_port' and
        'use_tls'.

    Returns:
      A secure channel built with default SSL channel credentials when
      args['use_tls'] is truthy, otherwise an insecure channel.
    """
    target = '{}:{}'.format(args['server_host'], args['server_port'])
    if args['use_tls']:
        return grpc.secure_channel(target, grpc.ssl_channel_credentials())
    return grpc.insecure_channel(target)


def _validate_payload_type_and_length(response, expected_type, expected_length):
    """Check the payload type and payload body size of a response.

    Args:
      response: A response message exposing `payload.type` and `payload.body`.
      expected_type: The payload type the response must carry.
      expected_length: The required length of `payload.body`.

    Raises:
      ValueError: If the payload type or the body length is not as expected.
    """
    # Compare by value rather than identity so the check does not depend on
    # object interning, and report the offending value (not its Python type).
    if response.payload.type != expected_type:
        raise ValueError('expected payload type %s, got %s' %
                         (expected_type, response.payload.type))
    if len(response.payload.body) != expected_length:
        raise ValueError('expected payload body size %d, got %d' %
                         (expected_length, len(response.payload.body)))


def _unary_request():
    """Build the SimpleRequest shared by the unary RPC helpers."""
    return messages_pb2.SimpleRequest(
        response_type=messages_pb2.COMPRESSABLE,
        response_size=_UNARY_RESPONSE_SIZE,
        payload=messages_pb2.Payload(body=b'\x00' *
                                     _UNARY_REQUEST_PAYLOAD_SIZE))


def _async_unary(stub):
    """Issue UnaryCall through its future API and validate the response.

    Raises:
      ValueError: If the response payload fails validation.
    """
    response_future = stub.UnaryCall.future(_unary_request(),
                                            timeout=_RPC_TIMEOUT_S)
    response = response_future.result()
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
                                      _UNARY_RESPONSE_SIZE)


def _blocking_unary(stub):
    """Issue a blocking UnaryCall and validate the response.

    Raises:
      ValueError: If the response payload fails validation.
    """
    response = stub.UnaryCall(_unary_request(), timeout=_RPC_TIMEOUT_S)
    _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
                                      _UNARY_RESPONSE_SIZE)


def _expect_inherited_channel_unusable(unary_call, stub):
    """Assert that `unary_call(stub)` fails with ValueError.

    Used inside forked children, where a stub built on a channel inherited
    from the parent is expected to be rejected.

    Args:
      unary_call: Callable taking a stub (e.g. _async_unary, _blocking_unary).
      stub: Stub bound to the channel inherited from the parent process.

    Raises:
      Exception: If the call did not raise ValueError.
    """
    try:
        unary_call(stub)
    except ValueError:
        return
    raise Exception('Child should not be able to re-use channel after fork')


class _Pipe(object):
    """A closable, blocking FIFO iterator guarded by a condition variable.

    Values are queued with add(); iteration blocks until a value is available
    and stops once the pipe has been closed and drained. Also usable as a
    context manager that closes the pipe on exit.
    """

    def __init__(self):
        self._condition = threading.Condition()
        # deque gives O(1) pops from the left end.
        self._values = collections.deque()
        self._open = True

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Return the oldest queued value, blocking while empty and open.

        Raises:
          StopIteration: If the pipe is closed and no values remain.
        """
        with self._condition:
            while not self._values and self._open:
                self._condition.wait()
            if self._values:
                return self._values.popleft()
            raise StopIteration()

    def add(self, value):
        """Queue `value` and wake a waiting consumer."""
        with self._condition:
            self._values.append(value)
            self._condition.notify()

    def close(self):
        """Mark the pipe closed and wake every waiting consumer."""
        with self._condition:
            self._open = False
            # Closing affects all waiters, so wake all of them.
            self._condition.notify_all()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()


class _ChildProcess(object):
    """Runs a task in a multiprocessing.Process and reports its failures.

    Exceptions raised by the task are caught inside the child and pushed onto
    a multiprocessing.Queue so that finish() can surface them in the parent.
    """

    def __init__(self, task, args=None):
        """Prepare (but do not start) the child process.

        Args:
          task: Callable to run in the child.
          args: Optional positional arguments for `task`; defaults to none.
        """
        # NOTE(review): callers pass nested functions as `task`, which
        # presumably relies on the 'fork' start method -- confirm.
        if args is None:
            args = ()
        self._exceptions = multiprocessing.Queue()

        def record_exceptions():
            try:
                task(*args)
            except grpc.RpcError as rpc_error:
                # Recorded as a string rather than as the exception object.
                self._exceptions.put('RpcError: %s' % rpc_error)
            except Exception as e:  # pylint: disable=broad-except
                self._exceptions.put(e)

        self._process = multiprocessing.Process(target=record_exceptions)

    def start(self):
        """Start the child process."""
        self._process.start()

    def finish(self):
        """Wait for the child and raise if it did not succeed.

        Raises:
          RuntimeError: If the child is still alive after
            _CHILD_FINISH_TIMEOUT_S seconds.
          ValueError: If the child exited with a non-zero exit code or
            recorded an exception.
        """
        self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
        if self._process.is_alive():
            raise RuntimeError('Child process did not terminate')
        if self._process.exitcode != 0:
            raise ValueError('Child process failed with exitcode %d' %
                             self._process.exitcode)
        try:
            exception = self._exceptions.get(block=False)
        except queue.Empty:
            return
        raise ValueError('Child process failed: "%s": "%s"' %
                         (repr(exception), exception))


def _async_unary_same_channel(channel):
    """Parent issues async unary RPCs; the child must fail to reuse the stub."""

    def child_target():
        _expect_inherited_channel_unusable(_async_unary, stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _async_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    child_process.finish()


def _async_unary_new_channel(channel, args):
    """Parent and child each issue async unary RPCs on their own channels."""

    def child_target():
        # The context manager closes the channel on exit.
        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            _async_unary(child_stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _async_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    child_process.finish()


def _blocking_unary_same_channel(channel):
    """Parent issues a blocking RPC; the child must fail to reuse the stub."""

    def child_target():
        _expect_inherited_channel_unusable(_blocking_unary, stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    child_process.finish()


def _blocking_unary_new_channel(channel, args):
    """Parent and child each issue blocking RPCs on their own channels."""

    def child_target():
        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            _blocking_unary(child_stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _blocking_unary(stub)
    child_process.finish()


# Verify that the fork channel registry can handle already closed channels
def _close_channel_before_fork(channel, args):
    """Close the original channel, then fork while a second channel is open.

    The child closes the channel it inherited and performs a blocking RPC on
    a channel of its own; the parent performs a blocking RPC on the second
    channel while the child runs.
    """

    def child_target():
        new_channel.close()
        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            _blocking_unary(child_stub)

    stub = test_pb2_grpc.TestServiceStub(channel)
    _blocking_unary(stub)
    channel.close()

    with _channel(args) as new_channel:
        new_stub = test_pb2_grpc.TestServiceStub(new_channel)
        child_process = _ChildProcess(child_target)
        child_process.start()
        _blocking_unary(new_stub)
        child_process.finish()


def _connectivity_watch(channel, args):
    """Check connectivity subscriptions in both parent and child.

    Both processes subscribe a callback on their own channel, issue an async
    unary RPC and require a READY notification within _RPC_TIMEOUT_S seconds.
    The child additionally fails if its copy of the parent's state list holds
    more than one entry.

    Raises:
      ValueError: If the parent channel is not reported READY in time.
    """
    parent_states = []
    parent_channel_ready_event = threading.Event()

    def child_target():
        child_channel_ready_event = threading.Event()

        def child_connectivity_callback(state):
            if state is grpc.ChannelConnectivity.READY:
                child_channel_ready_event.set()

        with _channel(args) as child_channel:
            child_stub = test_pb2_grpc.TestServiceStub(child_channel)
            child_channel.subscribe(child_connectivity_callback)
            _async_unary(child_stub)
            if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
                raise ValueError('Channel did not move to READY')
            if len(parent_states) > 1:
                raise ValueError(
                    'Received connectivity updates on parent callback',
                    parent_states)
            child_channel.unsubscribe(child_connectivity_callback)

    def parent_connectivity_callback(state):
        parent_states.append(state)
        if state is grpc.ChannelConnectivity.READY:
            parent_channel_ready_event.set()

    channel.subscribe(parent_connectivity_callback)
    stub = test_pb2_grpc.TestServiceStub(channel)
    child_process = _ChildProcess(child_target)
    child_process.start()
    _async_unary(stub)
    if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
        raise ValueError('Channel did not move to READY')
    channel.unsubscribe(parent_connectivity_callback)
    child_process.finish()


def _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target, run_after_close=True):
    """Run a FullDuplexCall ping-pong, starting children while it is active.

    A child is started after every response is received and, from the second
    request onwards, also right after each request is queued. Every child is
    waited on at the end.

    Args:
      channel: Channel on which the parent's FullDuplexCall is made.
      args: Forwarded unchanged to `child_target` as its third argument.
      child_target: Callable invoked in each child as
        child_target(parent_bidi_call, channel, args).
      run_after_close: Whether to start one more child after the request
        pipe has been closed.
    """
    request_response_sizes = (
        31415,
        9,
        2653,
        58979,
    )
    request_payload_sizes = (
        27182,
        8,
        1828,
        45904,
    )
    stub = test_pb2_grpc.TestServiceStub(channel)
    pipe = _Pipe()
    parent_bidi_call = stub.FullDuplexCall(pipe)
    child_processes = []

    def start_child():
        child_process = _ChildProcess(child_target,
                                      (parent_bidi_call, channel, args))
        child_process.start()
        child_processes.append(child_process)

    first_message_received = False
    for response_size, payload_size in zip(request_response_sizes,
                                           request_payload_sizes):
        request = messages_pb2.StreamingOutputCallRequest(
            response_type=messages_pb2.COMPRESSABLE,
            response_parameters=(messages_pb2.ResponseParameters(
                size=response_size),),
            payload=messages_pb2.Payload(body=b'\x00' * payload_size))
        pipe.add(request)
        if first_message_received:
            start_child()
        response = next(parent_bidi_call)
        first_message_received = True
        start_child()
        _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
                                          response_size)
    pipe.close()
    if run_after_close:
        start_child()
    for child_process in child_processes:
        child_process.finish()


def _in_progress_bidi_continue_call(channel):
    """Children must see the inherited bidi call as cancelled by the fork."""

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_inherited_channel_unusable(_async_unary, stub)
        inherited_code = parent_bidi_call.code()
        inherited_details = parent_bidi_call.details()
        if inherited_code != grpc.StatusCode.CANCELLED:
            raise ValueError('Expected inherited code CANCELLED, got %s' %
                             inherited_code)
        if inherited_details != 'Channel closed due to fork':
            raise ValueError(
                'Expected inherited details Channel closed due to fork, got %s'
                % inherited_details)

    # Don't run child_target after closing the parent call, as the call may have
    # received a status from the server before fork occurs.
    _ping_pong_with_child_processes_after_first_response(channel,
                                                         None,
                                                         child_target,
                                                         run_after_close=False)


def _in_progress_bidi_same_channel_async_call(channel):
    """Children must fail an async unary RPC on the inherited channel."""

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_inherited_channel_unusable(_async_unary, stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)


def _in_progress_bidi_same_channel_blocking_call(channel):
    """Children must fail a blocking unary RPC on the inherited channel."""

    def child_target(parent_bidi_call, parent_channel, args):
        stub = test_pb2_grpc.TestServiceStub(parent_channel)
        _expect_inherited_channel_unusable(_blocking_unary, stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, None, child_target)


def _in_progress_bidi_new_channel_async_call(channel, args):
    """Children issue an async unary RPC on a channel of their own."""

    def child_target(parent_bidi_call, parent_channel, args):
        with _channel(args) as child_channel:
            stub = test_pb2_grpc.TestServiceStub(child_channel)
            _async_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)


def _in_progress_bidi_new_channel_blocking_call(channel, args):
    """Children issue a blocking unary RPC on a channel of their own."""

    def child_target(parent_bidi_call, parent_channel, args):
        with _channel(args) as child_channel:
            stub = test_pb2_grpc.TestServiceStub(child_channel)
            _blocking_unary(stub)

    _ping_pong_with_child_processes_after_first_response(
        channel, args, child_target)


@enum.unique
class TestCase(enum.Enum):
    """The available fork support test cases."""

    CONNECTIVITY_WATCH = 'connectivity_watch'
    CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
    ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
    ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
    BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
    BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
    IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
    IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
    IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'

    def run_test(self, args):
        """Run this test case against the server described by `args`.

        Args:
          args: Mapping consumed by _channel ('server_host', 'server_port',
            'use_tls'); also passed on to the test methods that open
            additional channels.

        Raises:
          NotImplementedError: If no test method exists for this member.
        """
        _LOGGER.info("Running %s", self)
        channel = _channel(args)
        # Always release the channel, including when the test method raises.
        try:
            if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
                _async_unary_same_channel(channel)
            elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
                _async_unary_new_channel(channel, args)
            elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
                _blocking_unary_same_channel(channel)
            elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
                _blocking_unary_new_channel(channel, args)
            elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
                _close_channel_before_fork(channel, args)
            elif self is TestCase.CONNECTIVITY_WATCH:
                _connectivity_watch(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
                _in_progress_bidi_continue_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_same_channel_async_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_same_channel_blocking_call(channel)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
                _in_progress_bidi_new_channel_async_call(channel, args)
            elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
                _in_progress_bidi_new_channel_blocking_call(channel, args)
            else:
                raise NotImplementedError('Test case "%s" not implemented!' %
                                          self.name)
        finally:
            channel.close()