# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client-side fork interop tests as a unit test."""

import os
import subprocess
import sys
import tempfile
import threading
import time
import unittest

from grpc._cython import cygrpc

from tests.fork import methods


def _dump_streams(name, streams):
    """Write a subprocess's captured output to stderr, then close the streams.

    Args:
        name: Label for the process; it prefixes each section header.
        streams: A pair of seekable binary file objects holding the captured
            stdout and stderr, in that order. Both are closed before this
            function returns, even if reading or writing one of them fails.

    Raises:
        ValueError: If `streams` does not contain exactly two streams.
    """
    # An explicit check rather than `assert`, which is stripped under -O and
    # would let zip() silently drop a stream.
    if len(streams) != 2:
        raise ValueError(
            "Expected a (stdout, stderr) pair, got {} stream(s).".format(
                len(streams)
            )
        )
    for stream_name, stream in zip(("STDOUT", "STDERR"), streams):
        try:
            stream.seek(0)
            # Decode leniently: this runs while reporting a test result, and a
            # stray non-ASCII byte in the output must not replace the real
            # failure with a UnicodeDecodeError.
            text = stream.read().decode("utf-8", errors="replace")
            sys.stderr.write("{} {}:\n{}\n".format(name, stream_name, text))
        finally:
            stream.close()
    sys.stderr.flush()


# New instance of multiprocessing.Process using fork without exec can and will
# freeze if the Python process has any other threads running. This includes the
# additional thread spawned by our _runner.py class. So in order to test our
# compatibility with multiprocessing, we first fork+exec a new process to ensure
# we don't have any conflicting background threads.
# Script run by each test in a fresh interpreter. The two placeholders are the
# name of the methods.TestCase member to run and the port of the test server.
_CLIENT_FORK_SCRIPT_TEMPLATE = """if True:
    import os
    from grpc._cython import cygrpc
    from tests.fork import methods

    from src.python.grpcio_tests.tests.fork import native_debug

    native_debug.install_failure_signal_handler()

    cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
    os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'
    os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true'
    methods.TestCase.%s.run_test({
      'server_host': 'localhost',
      'server_port': %d,
      'use_tls': False
    })
"""
# Upper bound, in seconds, on server start-up and on each client run.
_SUBPROCESS_TIMEOUT_S = 80
# Upper bound, in seconds, on how long gdb may take to print backtraces.
_GDB_TIMEOUT_S = 60


def _kill_and_wait(process):
    """Kill `process` and reap it so that no zombie is left behind."""
    process.kill()
    process.wait()


@unittest.skipUnless(
    sys.platform.startswith("linux"),
    "not supported on windows, and fork+exec networking blocked on mac",
)
@unittest.skipUnless(
    os.getenv("GRPC_ENABLE_FORK_SUPPORT") is not None,
    "Core must be built with fork support to run this test.",
)
class ForkInteropTest(unittest.TestCase):
    """Runs each fork interop test case in a subprocess against a local server.

    setUp() starts an interop server in its own process and reads the port it
    prints on stdout. Every test then runs a client script in a further
    subprocess and expects it to exit with status 0 before the timeout.
    """

    def setUp(self):
        """Start the server subprocess and record the port it listens on.

        Raises:
            RuntimeError: If no valid port could be read from the server.
        """
        self._port = None
        start_server_script = """if True:
            import sys
            import time

            import grpc
            from src.proto.grpc.testing import test_pb2_grpc
            from tests.interop import service as interop_service
            from tests.unit import test_common

            server = test_common.test_server()
            test_pb2_grpc.add_TestServiceServicer_to_server(
                interop_service.TestService(), server)
            port = server.add_insecure_port('[::]:0')
            server.start()
            print(port)
            sys.stdout.flush()
            while True:
                time.sleep(1)
        """
        self._streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        self._server_process = subprocess.Popen(
            [sys.executable, "-c", start_server_script],
            stdout=self._streams[0],
            stderr=self._streams[1],
        )
        # Watchdog that kills the server if start-up exceeds the timeout.
        timer = threading.Timer(
            _SUBPROCESS_TIMEOUT_S, self._server_process.kill
        )
        try:
            timer.start()
            self._port = self._wait_for_server_port()
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so the
            # server process and its streams must be released here.
            self.tearDown()
            raise
        finally:
            timer.cancel()

    def testConnectivityWatch(self):
        self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH)

    def testCloseChannelBeforeFork(self):
        self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK)

    def testAsyncUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL)

    def testAsyncUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL)

    def testBlockingUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL)

    def testBlockingUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL)

    def testInProgressBidiContinueCall(self):
        self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL)

    def testInProgressBidiSameChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL
        )

    def testInProgressBidiSameChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL
        )

    def testInProgressBidiNewChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL
        )

    def testInProgressBidiNewChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL
        )

    def tearDown(self):
        """Stop the server process, reap it, and close its output streams."""
        _kill_and_wait(self._server_process)
        for stream in self._streams:
            stream.close()

    def _wait_for_server_port(self):
        """Poll the server's stdout until it reports its port.

        Returns:
            The port number printed by the server.

        Raises:
            RuntimeError: If the server exits, prints something that is not
                an integer, or prints nothing within _SUBPROCESS_TIMEOUT_S.
                The captured server output is dumped to stderr first.
        """
        stdout = self._streams[0]
        poll_interval_secs = 2.0
        waited_secs = 0.0
        while waited_secs < _SUBPROCESS_TIMEOUT_S:
            stdout.seek(0)
            line = stdout.readline()
            # Only trust a newline-terminated line; a partially written one
            # could parse as a different, wrong port number.
            if line.endswith(b"\n"):
                try:
                    return int(line)
                except ValueError as error:
                    _dump_streams("Server", self._streams)
                    raise RuntimeError(
                        "Failed to get port from server"
                    ) from error
            # A dead server will never print its port; stop waiting.
            if self._server_process.poll() is not None:
                break
            time.sleep(poll_interval_secs)
            waited_secs += poll_interval_secs
        _dump_streams("Server", self._streams)
        raise RuntimeError("Failed to get port from server.")

    def _print_backtraces(self, pid):
        """Attach gdb to process `pid` and dump all thread backtraces.

        This is best-effort diagnostics: failure to launch gdb, or a gdb run
        that exceeds _GDB_TIMEOUT_S, is reported on stderr and not raised.
        """
        cmd = [
            "gdb",
            "-ex",
            "set confirm off",
            "-ex",
            "echo attaching",
            "-ex",
            "attach {}".format(pid),
            "-ex",
            "echo print_backtrace",
            "-ex",
            "thread apply all bt",
            "-ex",
            "echo printed_backtrace",
            "-ex",
            "quit",
        ]
        streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        sys.stderr.write("Invoking gdb\n")
        sys.stderr.flush()
        try:
            try:
                process = subprocess.Popen(
                    cmd, stdout=streams[0], stderr=streams[1]
                )
            except OSError as error:
                # E.g. gdb is not installed. The caller is already handling
                # a timeout, so do not hide that failure behind this one.
                sys.stderr.write("Failed to invoke gdb: {}\n".format(error))
                return
            try:
                process.wait(timeout=_GDB_TIMEOUT_S)
            except subprocess.TimeoutExpired:
                sys.stderr.write("gdb stacktrace generation timed out.\n")
                _kill_and_wait(process)
        finally:
            _dump_streams("gdb", streams)

    def _verifyTestCase(self, test_case):
        """Run `test_case` in a client subprocess and require a clean exit.

        The client's and the server's captured output are always dumped to
        stderr afterwards, which also closes the server's streams.

        Raises:
            AssertionError: If the client exits with a non-zero status or is
                still running after _SUBPROCESS_TIMEOUT_S.
        """
        script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
        streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        process = subprocess.Popen(
            [sys.executable, "-c", script], stdout=streams[0], stderr=streams[1]
        )
        try:
            process.wait(timeout=_SUBPROCESS_TIMEOUT_S)
            self.assertEqual(0, process.returncode)
        except subprocess.TimeoutExpired:
            try:
                self._print_backtraces(process.pid)
            finally:
                # Kill the hung client even if collecting backtraces failed.
                _kill_and_wait(process)
            raise AssertionError("Parent process timed out.")
        finally:
            _dump_streams("Parent", streams)
            _dump_streams("Server", self._streams)


if __name__ == "__main__":
    unittest.main(verbosity=2)