• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2This test suite exercises some system calls subject to interruption with EINTR,
3to check that it is actually handled transparently.
4It is intended to be run by the main test suite within a child process, to
5ensure there is no background thread running (so that signals are delivered to
6the correct thread).
Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (unlike signal.alarm(), which takes whole seconds).
9"""
10
11import contextlib
12import faulthandler
13import fcntl
14import os
15import platform
16import select
17import signal
18import socket
19import subprocess
20import sys
21import textwrap
22import time
23import unittest
24
25from test import support
26from test.support import os_helper
27from test.support import socket_helper
28
29
# gh-109592: Tolerate a difference of 20 ms when comparing timings
# (clock resolution). The value is expressed in seconds, like the
# time.monotonic() differences it is compared against.
CLOCK_RES = 0.020
33
34
@contextlib.contextmanager
def kill_on_error(proc):
    """Run the managed body inside ``with proc:``, killing *proc* on failure.

    *proc* is yielded unchanged.  Whatever exception escapes the managed
    body, the subprocess is killed first and the exception is then re-raised
    as is.
    """
    with proc:
        try:
            yield proc
        except BaseException:
            # Nothing is swallowed here: kill the child, then propagate.
            proc.kill()
            raise
44
45
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class EINTRBaseTest(unittest.TestCase):
    """Shared fixture for the EINTR tests.

    setUp() installs a counting SIGALRM handler, arms a periodic
    ITIMER_REAL timer and starts a faulthandler watchdog; tearDown()
    undoes all three.
    """

    # seconds before the first signal is delivered
    signal_delay = 0.1
    # seconds between two consecutive signals
    signal_period = 0.1
    # default blocking duration used by the tests; it should be greater
    # than signal_period
    sleep_time = 0.2

    def sighandler(self, signum, frame):
        """SIGALRM handler: count the deliveries."""
        self.signals += 1

    def setUp(self):
        """Install the handler, start the timer and arm the watchdog."""
        self.signals = 0
        previous = signal.signal(signal.SIGALRM, self.sighandler)
        self.orig_handler = previous
        signal.setitimer(
            signal.ITIMER_REAL,
            self.signal_delay,
            self.signal_period,
        )

        # faulthandler acts as a watchdog: if a test hangs, the traceback
        # is dumped to the real stderr after 10 minutes and the process
        # exits.
        watchdog_timeout = 10 * 60
        faulthandler.dump_traceback_later(
            watchdog_timeout, exit=True, file=sys.__stderr__)

    @staticmethod
    def stop_alarm():
        """Disarm the ITIMER_REAL timer."""
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    def tearDown(self):
        """Stop the timer, restore the SIGALRM handler, cancel the watchdog."""
        self.stop_alarm()
        signal.signal(signal.SIGALRM, self.orig_handler)
        faulthandler.cancel_dump_traceback_later()

    def subprocess(self, *args, **kw):
        """Start ``sys.executable -c <args...>`` and return the Popen object.

        Keyword arguments are forwarded to subprocess.Popen().
        """
        command = (sys.executable, '-c', *args)
        return subprocess.Popen(command, **kw)

    def check_elapsed_time(self, elapsed):
        """Assert that *elapsed* is at least sleep_time, minus CLOCK_RES."""
        shortest_allowed = self.sleep_time - CLOCK_RES
        self.assertGreaterEqual(elapsed, shortest_allowed)
87
88
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class OSEINTRTest(EINTRBaseTest):
    """EINTR tests for the blocking calls of the os module."""

    def new_sleep_process(self):
        """Start a child interpreter that sleeps for sleep_time seconds."""
        return self.subprocess(
            f'import time; time.sleep({self.sleep_time!r})')

    def _test_wait_multiple(self, wait_func):
        """Start three sleeping children and call *wait_func* once per child."""
        child_count = 3
        children = []
        for _ in range(child_count):
            children.append(self.new_sleep_process())
        for _ in children:
            wait_func()
        # Go through Popen.wait() as well, to avoid a ResourceWarning
        for child in children:
            child.wait()

    def test_wait(self):
        self._test_wait_multiple(os.wait)

    @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
    def test_wait3(self):
        def wait_any():
            return os.wait3(0)

        self._test_wait_multiple(wait_any)

    def _test_wait_single(self, wait_func):
        """Start one sleeping child and call *wait_func* with its pid."""
        child = self.new_sleep_process()
        wait_func(child.pid)
        # Go through Popen.wait() as well, to avoid a ResourceWarning
        child.wait()

    def test_waitpid(self):
        def wait_for(pid):
            return os.waitpid(pid, 0)

        self._test_wait_single(wait_for)

    @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
    def test_wait4(self):
        def wait_for(pid):
            return os.wait4(pid, 0)

        self._test_wait_single(wait_for)

    def test_read(self):
        """os.read() on a pipe that a child process feeds with pauses."""
        rd, wr = os.pipe()
        self.addCleanup(os.close, rd)
        # the parent closes wr itself once the child has been started

        # each payload is smaller than PIPE_BUF, hence every write is atomic
        datas = [b"hello", b"world", b"spam"]

        child_lines = [
            'import os, sys, time',
            '',
            'wr = int(sys.argv[1])',
            f'datas = {datas!r}',
            f'sleep_time = {self.sleep_time!r}',
            '',
            'for data in datas:',
            '    # let the parent block on read()',
            '    time.sleep(sleep_time)',
            '    os.write(wr, data)',
        ]
        code = '\n'.join(child_lines)

        proc = self.subprocess(code, str(wr), pass_fds=[wr])
        with kill_on_error(proc):
            os.close(wr)
            for expected in datas:
                self.assertEqual(expected, os.read(rd, len(expected)))
            self.assertEqual(proc.wait(), 0)

    def test_write(self):
        """os.write() on a pipe that a child process drains after a pause."""
        rd, wr = os.pipe()
        self.addCleanup(os.close, wr)
        # the parent closes rd itself once the child has been started

        # the payload has to be large enough for write() to block
        data = b"x" * support.PIPE_MAX_SIZE

        child_lines = [
            'import io, os, sys, time',
            '',
            'rd = int(sys.argv[1])',
            f'sleep_time = {self.sleep_time!r}',
            f'data = b"x" * {support.PIPE_MAX_SIZE!s}',
            'data_len = len(data)',
            '',
            '# let the parent block on write()',
            'time.sleep(sleep_time)',
            '',
            'read_data = io.BytesIO()',
            'while len(read_data.getvalue()) < data_len:',
            '    chunk = os.read(rd, 2 * data_len)',
            '    read_data.write(chunk)',
            '',
            'value = read_data.getvalue()',
            'if value != data:',
            '    raise Exception("read error: %s vs %s bytes"',
            '                    % (len(value), data_len))',
        ]
        code = '\n'.join(child_lines)

        proc = self.subprocess(code, str(rd), pass_fds=[rd])
        with kill_on_error(proc):
            os.close(rd)
            total = len(data)
            written = 0
            while written < total:
                remaining = memoryview(data)[written:]
                written += os.write(wr, remaining)
            self.assertEqual(proc.wait(), 0)
192
193
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SocketEINTRTest(EINTRBaseTest):
    """ EINTR tests for the socket module. """

    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_recv(self, recv_func):
        """Receive on a socketpair while a child process sends with pauses.

        *recv_func* is called as ``recv_func(sock, nbytes)`` and must return
        the bytes received.
        """
        rd, wr = socket.socketpair()
        self.addCleanup(rd.close)
        # wr closed explicitly by parent

        # single-byte payloads guard us against partial recv
        datas = [b"x", b"y", b"z"]

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(wr.family),
            'sock_type = %s' % int(wr.type),
            'datas = %r' % datas,
            'sleep_time = %r' % self.sleep_time,
            '',
            'wr = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with wr:',
            '    for data in datas:',
            '        # let the parent block on recv()',
            '        time.sleep(sleep_time)',
            '        wr.sendall(data)',
        ))

        fd = wr.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            wr.close()
            for data in datas:
                self.assertEqual(data, recv_func(rd, len(data)))
            self.assertEqual(proc.wait(), 0)

    def test_recv(self):
        self._test_recv(socket.socket.recv)

    @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
    def test_recvmsg(self):
        self._test_recv(lambda sock, data: sock.recvmsg(data)[0])

    # Same guard as _test_recv(): this helper relies on socket.socketpair()
    # too, so the send tests are skipped instead of failing without it.
    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_send(self, send_func):
        """Send on a socketpair while a child process reads after a pause.

        *send_func* is called as ``send_func(sock, data)``.  It returns the
        number of bytes sent, or None when the whole buffer was sent (as
        sendall() does).
        """
        rd, wr = socket.socketpair()
        self.addCleanup(wr.close)
        # rd closed explicitly by parent

        # we must send enough data for the send() to block
        data = b"xyz" * (support.SOCK_MAX_SIZE // 3)

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(rd.family),
            'sock_type = %s' % int(rd.type),
            'sleep_time = %r' % self.sleep_time,
            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
            'data_len = len(data)',
            '',
            'rd = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with rd:',
            '    # let the parent block on send()',
            '    time.sleep(sleep_time)',
            '',
            '    received_data = bytearray(data_len)',
            '    n = 0',
            '    while n < data_len:',
            '        n += rd.recv_into(memoryview(received_data)[n:])',
            '',
            'if received_data != data:',
            '    raise Exception("recv error: %s vs %s bytes"',
            '                    % (len(received_data), data_len))',
        ))

        fd = rd.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            rd.close()
            written = 0
            while written < len(data):
                sent = send_func(wr, memoryview(data)[written:])
                # sendall() returns None
                written += len(data) if sent is None else sent
            self.assertEqual(proc.wait(), 0)

    def test_send(self):
        self._test_send(socket.socket.send)

    def test_sendall(self):
        self._test_send(socket.socket.sendall)

    @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
    def test_sendmsg(self):
        self._test_send(lambda sock, data: sock.sendmsg([data]))

    def test_accept(self):
        """accept() on a listening socket a child connects to after a pause."""
        sock = socket.create_server((socket_helper.HOST, 0))
        self.addCleanup(sock.close)
        port = sock.getsockname()[1]

        code = '\n'.join((
            'import socket, time',
            '',
            'host = %r' % socket_helper.HOST,
            'port = %s' % port,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let parent block on accept()',
            'time.sleep(sleep_time)',
            'with socket.create_connection((host, port)):',
            '    time.sleep(sleep_time)',
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            client_sock, _ = sock.accept()
            client_sock.close()
            self.assertEqual(proc.wait(), 0)

    # Issue #25122: There is a race condition in the FreeBSD kernel on
    # handling signals in the FIFO device. Skip the test until the bug is
    # fixed in the kernel.
    # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
    @support.requires_freebsd_version(10, 3)
    @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
    def _test_open(self, do_open_close_reader, do_open_close_writer):
        """Open a FIFO for writing while a child opens it for reading late.

        *do_open_close_reader* is Python source appended to the child script;
        it can use the ``path`` variable defined there.
        *do_open_close_writer* is called in the parent with the FIFO path.
        """
        filename = os_helper.TESTFN

        # Use a fifo: until the child opens it for reading, the parent will
        # block when trying to open it for writing.
        os_helper.unlink(filename)
        try:
            os.mkfifo(filename)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        self.addCleanup(os_helper.unlink, filename)

        code = '\n'.join((
            'import os, time',
            '',
            'path = %a' % filename,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let the parent block',
            'time.sleep(sleep_time)',
            '',
            do_open_close_reader,
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            do_open_close_writer(filename)
            self.assertEqual(proc.wait(), 0)

    def python_open(self, path):
        """Open *path* for writing with the builtin open(), then close it."""
        fp = open(path, 'w')
        fp.close()

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_open(self):
        self._test_open("fp = open(path, 'r')\nfp.close()",
                        self.python_open)

    def os_open(self, path):
        """Open *path* write-only with os.open(), then close it."""
        fd = os.open(path, os.O_WRONLY)
        os.close(fd)

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_os_open(self):
        self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
                        self.os_open)
375
376
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class TimeEINTRTest(EINTRBaseTest):
    """EINTR tests for time.sleep()."""

    def test_sleep(self):
        """time.sleep() lasts at least sleep_time despite the signals."""
        started = time.monotonic()
        time.sleep(self.sleep_time)
        self.stop_alarm()
        elapsed = time.monotonic() - started
        self.check_elapsed_time(elapsed)
387
388
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
# is vulnerable to a race condition between the child and the parent processes.
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                     'need signal.pthread_sigmask()')
class SignalEINTRTest(EINTRBaseTest):
    """ EINTR tests for the signal module. """

    def check_sigwait(self, wait_func):
        """Wait for SIGUSR1 with *wait_func* while a child sends it late.

        *wait_func* is called with the signal number and is expected to
        block until the child process has sent that signal to this process.
        """
        signum = signal.SIGUSR1

        old_handler = signal.signal(signum, lambda *args: None)
        self.addCleanup(signal.signal, signum, old_handler)

        code = '\n'.join((
            'import os, time',
            'pid = %s' % os.getpid(),
            'signum = %s' % int(signum),
            'sleep_time = %r' % self.sleep_time,
            'time.sleep(sleep_time)',
            'os.kill(pid, signum)',
        ))

        # Block the signal before starting the child (see bpo-30320), and
        # put back the exact mask that was in effect before the test rather
        # than unconditionally unblocking the signal.
        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)

        proc = self.subprocess(code)
        with kill_on_error(proc):
            wait_func(signum)

        self.assertEqual(proc.wait(), 0)

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        def wait_func(signum):
            signal.sigwaitinfo([signum])

        self.check_sigwait(wait_func)

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        def wait_func(signum):
            # generous timeout: the child sends the signal after sleep_time
            signal.sigtimedwait([signum], 120.0)

        self.check_sigwait(wait_func)
437
438
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SelectEINTRTest(EINTRBaseTest):
    """EINTR tests for the timeouts of the select module."""

    def _check_full_wait(self, blocking_call):
        """Time *blocking_call*, stop the alarm, then check the duration."""
        started = time.monotonic()
        blocking_call()
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.check_elapsed_time(elapsed)

    def test_select(self):
        self._check_full_wait(
            lambda: select.select([], [], [], self.sleep_time))

    @unittest.skipIf(sys.platform == "darwin",
                     "poll may fail on macOS; see issue #28087")
    @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
    def test_poll(self):
        poller = select.poll()

        # sleep_time is multiplied by 1e3 before being passed to poll()
        self._check_full_wait(lambda: poller.poll(self.sleep_time * 1e3))

    @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
    def test_epoll(self):
        poller = select.epoll()
        self.addCleanup(poller.close)

        self._check_full_wait(lambda: poller.poll(self.sleep_time))

    @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
    def test_kqueue(self):
        kqueue = select.kqueue()
        self.addCleanup(kqueue.close)

        self._check_full_wait(
            lambda: kqueue.control(None, 1, self.sleep_time))

    @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
    def test_devpoll(self):
        poller = select.devpoll()
        self.addCleanup(poller.close)

        # sleep_time is multiplied by 1e3 before being passed to poll()
        self._check_full_wait(lambda: poller.poll(self.sleep_time * 1e3))
494
495
class FCNTLEINTRTest(EINTRBaseTest):
    """ EINTR tests for the fcntl module. """

    def _lock(self, lock_func, lock_name):
        """Take an exclusive lock on a file a child process already locked.

        *lock_func* is called in the parent as
        ``lock_func(file, fcntl.LOCK_EX)``; *lock_name* is the name of the
        fcntl function the child uses to take its own lock.  Two pipes
        synchronize the processes: the child reports b"ok" once it holds
        the lock, and the parent answers b"go" when it is about to lock.
        """
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        rd1, wr1 = os.pipe()
        rd2, wr2 = os.pipe()
        for fd in (rd1, wr1, rd2, wr2):
            self.addCleanup(os.close, fd)
        # The file name is embedded with !a (like %a in _test_open()) so
        # that quotes, backslashes and non-ASCII characters in it are
        # escaped in the child script.
        code = textwrap.dedent(f"""
            import fcntl, os, time
            with open({os_helper.TESTFN!a}, 'wb') as f:
                fcntl.{lock_name}(f, fcntl.LOCK_EX)
                os.write({wr1}, b"ok")
                _ = os.read({rd2}, 2)  # wait for parent process
                time.sleep({self.sleep_time})
        """)
        proc = self.subprocess(code, pass_fds=[wr1, rd2])
        with kill_on_error(proc):
            with open(os_helper.TESTFN, 'wb') as f:
                # synchronize the subprocess
                ok = os.read(rd1, 2)
                self.assertEqual(ok, b"ok")

                # notify the child that the parent is ready
                start_time = time.monotonic()
                os.write(wr2, b"go")

                # the child locked the file just a moment ago for 'sleep_time' seconds
                # that means that the lock below will block for 'sleep_time' minus some
                # potential context switch delay
                lock_func(f, fcntl.LOCK_EX)
                dt = time.monotonic() - start_time
                self.stop_alarm()
                self.check_elapsed_time(dt)
            # like the other subprocess-based tests, fail if the child did
            # not exit cleanly
            self.assertEqual(proc.wait(), 0)

    # Issue 35633: See https://bugs.python.org/issue35633#msg333662
    # skip test rather than accept PermissionError from all platforms
    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    def test_lockf(self):
        self._lock(fcntl.lockf, "lockf")

    def test_flock(self):
        self._lock(fcntl.flock, "flock")
539
540
# Run the test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
543