• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14"""Client-side fork interop tests as a unit test."""
15
16import os
17import subprocess
18import sys
19import tempfile
20import threading
21import time
22import unittest
23
24from grpc._cython import cygrpc
25
26from tests.fork import methods
27
28
def _dump_streams(name, streams):
    """Write a subprocess's captured stdout and stderr to ``sys.stderr``.

    Args:
      name: Label printed in front of each stream's header line.
      streams: A sequence of exactly two seekable binary file objects, in
        the order (stdout, stderr). Both are closed before returning, even
        if dumping one of them fails.
    """
    assert len(streams) == 2
    try:
        for stream_name, stream in zip(("STDOUT", "STDERR"), streams):
            stream.seek(0)
            # Subprocess output is not guaranteed to be pure ASCII. This
            # helper runs from `finally` blocks, so a strict decode error
            # here would hide the failure that is actually being reported.
            # Escaping undecodable bytes keeps the dump lossless and ASCII.
            text = stream.read().decode("ascii", errors="backslashreplace")
            sys.stderr.write("{} {}:\n{}\n".format(name, stream_name, text))
    finally:
        for stream in streams:
            stream.close()
        sys.stderr.flush()
40
41
# Upper bounds, in seconds, on how long the helper subprocesses may run.
_GDB_TIMEOUT_S = 60
_SUBPROCESS_TIMEOUT_S = 80

# A multiprocessing.Process created with fork (and no exec) can and will
# freeze when the forking Python process has other threads running, and that
# includes the extra thread spawned by our _runner.py class. To test
# compatibility with multiprocessing we therefore fork+exec a fresh
# interpreter first, so that no conflicting background threads exist.
#
# The template takes two %-substitutions, in order: the name of the
# methods.TestCase member to run (%s) and the server port (%d).
_CLIENT_FORK_SCRIPT_TEMPLATE = "\n".join(
    (
        "if True:",
        "    import os",
        "    from grpc._cython import cygrpc",
        "    from tests.fork import methods",
        "",
        "    from src.python.grpcio_tests.tests.fork import native_debug",
        "",
        "    native_debug.install_failure_signal_handler()",
        "",
        "    cygrpc._GRPC_ENABLE_FORK_SUPPORT = True",
        "    os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'",
        "    os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true'",
        "    methods.TestCase.%s.run_test({",
        "      'server_host': 'localhost',",
        "      'server_port': %d,",
        "      'use_tls': False",
        "    })",
        "",  # The script ends with a trailing newline.
    )
)
67
68
@unittest.skipUnless(
    sys.platform.startswith("linux"),
    "not supported on windows, and fork+exec networking blocked on mac",
)
@unittest.skipUnless(
    os.getenv("GRPC_ENABLE_FORK_SUPPORT") is not None,
    "Core must be built with fork support to run this test.",
)
class ForkInteropTest(unittest.TestCase):
    """Runs each fork interop client test case against a local test server.

    Every test starts its own server subprocess in `setUp`, runs one
    `methods.TestCase` member in a freshly exec'ed client interpreter, and
    dumps the captured output of both processes to stderr.
    """

    def setUp(self):
        """Start the interop server subprocess and read the port it prints.

        Raises:
          RuntimeError: If the server does not report a usable port.
        """
        self._port = None
        self._server_process = None
        start_server_script = """if True:
            import sys
            import time

            import grpc
            from src.proto.grpc.testing import test_pb2_grpc
            from tests.interop import service as interop_service
            from tests.unit import test_common

            server = test_common.test_server()
            test_pb2_grpc.add_TestServiceServicer_to_server(
                interop_service.TestService(), server)
            port = server.add_insecure_port('[::]:0')
            server.start()
            print(port)
            sys.stdout.flush()
            while True:
                time.sleep(1)
        """
        self._streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        try:
            self._server_process = subprocess.Popen(
                [sys.executable, "-c", start_server_script],
                stdout=self._streams[0],
                stderr=self._streams[1],
            )
            self._port = self._wait_for_server_port()
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so the
            # server and its temporary files must be released here or they
            # leak. Dump the server output first to explain the failure.
            self._stop_server(dump_output=True)
            raise

    def _wait_for_server_port(self):
        """Poll the server's stdout until it has printed its port.

        Returns:
          The port number parsed from the first line of server output.

        Raises:
          RuntimeError: If the first line is not an integer, if the server
            exits before printing a port, or if no port appears within
            `_SUBPROCESS_TIMEOUT_S` seconds.
        """
        stdout = self._streams[0]
        interval_secs = 2.0
        cumulative_secs = 0.0
        while cumulative_secs < _SUBPROCESS_TIMEOUT_S:
            # Sample the exit status before reading: anything the server
            # wrote before exiting is then guaranteed to be visible below.
            exited = self._server_process.poll() is not None
            stdout.seek(0)
            line = stdout.readline()
            # Only accept a complete line so a partially written port
            # number is never parsed.
            if line.endswith(b"\n"):
                try:
                    return int(line)
                except ValueError as err:
                    raise RuntimeError(
                        "Failed to get port from server"
                    ) from err
            if exited:
                # The server died without reporting a port; waiting out the
                # remaining timeout cannot help.
                break
            time.sleep(interval_secs)
            cumulative_secs += interval_secs
        raise RuntimeError("Failed to get port from server.")

    def _stop_server(self, dump_output=False):
        """Kill and reap the server process, then close its output streams.

        Safe to call more than once and when the server never started.

        Args:
          dump_output: If true, write the captured server output to stderr
            before the streams are closed.
        """
        process = self._server_process
        if process is not None:
            process.kill()
            # Reap the child so it does not linger as a zombie.
            process.wait()
        try:
            if dump_output:
                _dump_streams("Server", self._streams)
        finally:
            for stream in self._streams:
                stream.close()

    def testConnectivityWatch(self):
        self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH)

    def testCloseChannelBeforeFork(self):
        self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK)

    def testAsyncUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL)

    def testAsyncUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL)

    def testBlockingUnarySameChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL)

    def testBlockingUnaryNewChannel(self):
        self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL)

    def testInProgressBidiContinueCall(self):
        self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL)

    def testInProgressBidiSameChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL
        )

    def testInProgressBidiSameChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL
        )

    def testInProgressBidiNewChannelAsyncCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL
        )

    def testInProgressBidiNewChannelBlockingCall(self):
        self._verifyTestCase(
            methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL
        )

    def tearDown(self):
        """Stop the server subprocess and release its output streams."""
        self._stop_server()

    def _print_backtraces(self, pid):
        """Best-effort dump of all thread backtraces of `pid` using gdb.

        This is a diagnostic aid only: failing to launch gdb is reported on
        stderr instead of being raised, so it cannot hide the failure that
        prompted the dump.

        Args:
          pid: Process id that gdb should attach to.
        """
        cmd = [
            "gdb",
            "-ex",
            "set confirm off",
            "-ex",
            "echo attaching",
            "-ex",
            "attach {}".format(pid),
            "-ex",
            "echo print_backtrace",
            "-ex",
            "thread apply all bt",
            "-ex",
            "echo printed_backtrace",
            "-ex",
            "quit",
        ]
        streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        sys.stderr.write("Invoking gdb\n")
        sys.stderr.flush()
        try:
            process = subprocess.Popen(
                cmd, stdout=streams[0], stderr=streams[1]
            )
        except OSError as err:
            # Typically gdb is not installed on this machine.
            sys.stderr.write("Unable to launch gdb: {}\n".format(err))
            sys.stderr.flush()
            for stream in streams:
                stream.close()
            return
        try:
            process.wait(timeout=_GDB_TIMEOUT_S)
        except subprocess.TimeoutExpired:
            sys.stderr.write("gdb stacktrace generation timed out.\n")
            # Do not leave a hung gdb attached to the client process.
            process.kill()
            process.wait()
        finally:
            _dump_streams("gdb", streams)

    def _verifyTestCase(self, test_case):
        """Run one client test case in a fresh interpreter and check it.

        Args:
          test_case: The `methods.TestCase` member to run; its `name` is
            substituted into the client script.

        Raises:
          AssertionError: If the client exits with a non-zero status or
            does not finish within `_SUBPROCESS_TIMEOUT_S` seconds.
        """
        script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
        streams = tuple(tempfile.TemporaryFile() for _ in range(2))
        process = subprocess.Popen(
            [sys.executable, "-c", script], stdout=streams[0], stderr=streams[1]
        )
        try:
            process.wait(timeout=_SUBPROCESS_TIMEOUT_S)
            self.assertEqual(0, process.returncode)
        except subprocess.TimeoutExpired:
            try:
                self._print_backtraces(process.pid)
            finally:
                # Always kill and reap the hung client, even if collecting
                # backtraces failed.
                process.kill()
                process.wait()
            raise AssertionError("Parent process timed out.")
        finally:
            try:
                _dump_streams("Parent", streams)
            finally:
                # Dump the server output even if the client dump failed.
                _dump_streams("Server", self._streams)
229
230
# Run the tests in this module with verbose output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)
233