• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
This test suite exercises some system calls subject to interruption with EINTR,
to check that EINTR is actually handled transparently.
It is intended to be run by the main test suite within a child process, to
ensure there is no background thread running (so that signals are delivered to
the correct thread).
Signals are generated in-process using setitimer(ITIMER_REAL), which allows
sub-second periodicity (unlike signal.alarm(), which only takes whole seconds).
9"""
10
11import contextlib
12import faulthandler
13import fcntl
14import os
15import platform
16import select
17import signal
18import socket
19import subprocess
20import sys
21import time
22import unittest
23
24from test import support
25from test.support import socket_helper
26
@contextlib.contextmanager
def kill_on_error(proc):
    """Run the ``with`` body inside *proc*'s own context, killing it on error.

    *proc* is entered as a context manager and yielded to the caller.  If any
    exception escapes the body, ``proc.kill()`` is called and the exception is
    re-raised unchanged.
    """
    with proc:
        try:
            yield proc
        except BaseException:
            # Catch everything (KeyboardInterrupt and SystemExit included) so
            # the child is never left running after a failure.
            proc.kill()
            raise
36
37
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class EINTRBaseTest(unittest.TestCase):
    """Shared fixture for the EINTR tests.

    While a test runs, a periodic ITIMER_REAL timer delivers SIGALRM to this
    process and the handler merely counts the deliveries.
    """

    # seconds before the first SIGALRM is delivered
    signal_delay = 0.1
    # seconds between two consecutive SIGALRM deliveries
    signal_period = 0.1
    # how long tests sleep/block by default; must satisfy
    # sleep_time > signal_period so at least one signal lands meanwhile
    sleep_time = 0.2

    def sighandler(self, signum, frame):
        """SIGALRM handler: record that one more signal was received."""
        self.signals = self.signals + 1

    def setUp(self):
        """Install the counting handler, arm the timer and the watchdog."""
        self.signals = 0
        self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler)
        signal.setitimer(
            signal.ITIMER_REAL,
            self.signal_delay,
            self.signal_period,
        )

        # faulthandler acts as a watchdog: if a test hangs, dump the
        # traceback and exit after 10 minutes.
        watchdog_timeout = 10 * 60
        faulthandler.dump_traceback_later(
            watchdog_timeout,
            exit=True,
            file=sys.__stderr__,
        )

    @staticmethod
    def stop_alarm():
        """Disarm the ITIMER_REAL timer."""
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    def tearDown(self):
        """Disarm the timer, restore the SIGALRM handler, cancel the watchdog."""
        self.stop_alarm()
        signal.signal(signal.SIGALRM, self.orig_handler)
        faulthandler.cancel_dump_traceback_later()

    def subprocess(self, *args, **kw):
        """Start ``sys.executable -c *args`` and return the Popen object.

        Keyword arguments are forwarded to ``subprocess.Popen``.
        """
        command = (sys.executable, '-c', *args)
        return subprocess.Popen(command, **kw)
76
77
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class OSEINTRTest(EINTRBaseTest):
    """ EINTR tests for the os module. """

    def new_sleep_process(self):
        """Spawn a child that just sleeps for ``sleep_time`` seconds."""
        return self.subprocess('import time; time.sleep(%r)' % self.sleep_time)

    def _test_wait_multiple(self, wait_func):
        """Spawn three sleeping children and call *wait_func* once per child."""
        children = []
        for _ in range(3):
            children.append(self.new_sleep_process())
        for _ in children:
            wait_func()
        # Go through Popen.wait() as well so no ResourceWarning is emitted
        for child in children:
            child.wait()

    def test_wait(self):
        self._test_wait_multiple(os.wait)

    @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()')
    def test_wait3(self):
        def wait3():
            return os.wait3(0)

        self._test_wait_multiple(wait3)

    def _test_wait_single(self, wait_func):
        """Spawn one sleeping child and call *wait_func* with its pid."""
        child = self.new_sleep_process()
        wait_func(child.pid)
        # Go through Popen.wait() as well so no ResourceWarning is emitted
        child.wait()

    def test_waitpid(self):
        def waitpid(pid):
            return os.waitpid(pid, 0)

        self._test_wait_single(waitpid)

    @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()')
    def test_wait4(self):
        def wait4(pid):
            return os.wait4(pid, 0)

        self._test_wait_single(wait4)

    def test_read(self):
        """os.read() on a pipe returns each payload the child writes."""
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        # the write end is closed explicitly once the child has inherited it

        # each payload is smaller than PIPE_BUF, hence the child's writes
        # are atomic
        payloads = [b"hello", b"world", b"spam"]

        child_lines = [
            'import os, sys, time',
            '',
            'wr = int(sys.argv[1])',
            'datas = %r' % payloads,
            'sleep_time = %r' % self.sleep_time,
            '',
            'for data in datas:',
            '    # let the parent block on read()',
            '    time.sleep(sleep_time)',
            '    os.write(wr, data)',
        ]
        code = '\n'.join(child_lines)

        proc = self.subprocess(code, str(write_fd), pass_fds=[write_fd])
        with kill_on_error(proc):
            os.close(write_fd)
            for expected in payloads:
                self.assertEqual(expected, os.read(read_fd, len(expected)))
            self.assertEqual(proc.wait(), 0)

    def test_write(self):
        """os.write() of a large buffer completes while the child reads it."""
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, write_fd)
        # the read end is closed explicitly once the child has inherited it

        # the payload must be large enough for write() to block
        payload = b"x" * support.PIPE_MAX_SIZE

        child_lines = [
            'import io, os, sys, time',
            '',
            'rd = int(sys.argv[1])',
            'sleep_time = %r' % self.sleep_time,
            'data = b"x" * %s' % support.PIPE_MAX_SIZE,
            'data_len = len(data)',
            '',
            '# let the parent block on write()',
            'time.sleep(sleep_time)',
            '',
            'read_data = io.BytesIO()',
            'while len(read_data.getvalue()) < data_len:',
            '    chunk = os.read(rd, 2 * data_len)',
            '    read_data.write(chunk)',
            '',
            'value = read_data.getvalue()',
            'if value != data:',
            '    raise Exception("read error: %s vs %s bytes"',
            '                    % (len(value), data_len))',
        ]
        code = '\n'.join(child_lines)

        proc = self.subprocess(code, str(read_fd), pass_fds=[read_fd])
        with kill_on_error(proc):
            os.close(read_fd)
            total = len(payload)
            sent = 0
            while sent < total:
                # os.write() may write only part of the buffer; resume
                # from where the previous call stopped
                sent += os.write(write_fd, memoryview(payload)[sent:])
            self.assertEqual(proc.wait(), 0)
181
182
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SocketEINTRTest(EINTRBaseTest):
    """ EINTR tests for the socket module. """

    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_recv(self, recv_func):
        """Check that ``recv_func(sock, size)`` returns each byte the child sends.

        The child sleeps before every send so that the parent is blocked in
        *recv_func* while SIGALRM is being delivered.
        """
        rd, wr = socket.socketpair()
        self.addCleanup(rd.close)
        # wr closed explicitly by parent

        # single-byte payload guard us against partial recv
        datas = [b"x", b"y", b"z"]

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(wr.family),
            'sock_type = %s' % int(wr.type),
            'datas = %r' % datas,
            'sleep_time = %r' % self.sleep_time,
            '',
            'wr = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with wr:',
            '    for data in datas:',
            '        # let the parent block on recv()',
            '        time.sleep(sleep_time)',
            '        wr.sendall(data)',
        ))

        fd = wr.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            wr.close()
            for data in datas:
                self.assertEqual(data, recv_func(rd, len(data)))
            self.assertEqual(proc.wait(), 0)

    def test_recv(self):
        self._test_recv(socket.socket.recv)

    @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()')
    def test_recvmsg(self):
        self._test_recv(lambda sock, size: sock.recvmsg(size)[0])

    # Same guard as _test_recv(): this helper also relies on socketpair().
    @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()')
    def _test_send(self, send_func):
        """Check that ``send_func(sock, data)`` delivers a large payload.

        *send_func* may return the number of bytes sent, or None (as
        sendall() does), in which case the whole payload is considered sent.
        The child sleeps before reading so that the parent blocks in
        *send_func*, then verifies the bytes it received.
        """
        rd, wr = socket.socketpair()
        self.addCleanup(wr.close)
        # rd closed explicitly by parent

        # we must send enough data for the send() to block
        data = b"xyz" * (support.SOCK_MAX_SIZE // 3)

        code = '\n'.join((
            'import os, socket, sys, time',
            '',
            'fd = int(sys.argv[1])',
            'family = %s' % int(rd.family),
            'sock_type = %s' % int(rd.type),
            'sleep_time = %r' % self.sleep_time,
            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
            'data_len = len(data)',
            '',
            'rd = socket.fromfd(fd, family, sock_type)',
            'os.close(fd)',
            '',
            'with rd:',
            '    # let the parent block on send()',
            '    time.sleep(sleep_time)',
            '',
            '    received_data = bytearray(data_len)',
            '    n = 0',
            '    while n < data_len:',
            '        n += rd.recv_into(memoryview(received_data)[n:])',
            '',
            'if received_data != data:',
            '    raise Exception("recv error: %s vs %s bytes"',
            '                    % (len(received_data), data_len))',
        ))

        fd = rd.fileno()
        proc = self.subprocess(code, str(fd), pass_fds=[fd])
        with kill_on_error(proc):
            rd.close()
            written = 0
            while written < len(data):
                sent = send_func(wr, memoryview(data)[written:])
                # sendall() returns None
                written += len(data) if sent is None else sent
            self.assertEqual(proc.wait(), 0)

    def test_send(self):
        self._test_send(socket.socket.send)

    def test_sendall(self):
        self._test_send(socket.socket.sendall)

    @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()')
    def test_sendmsg(self):
        self._test_send(lambda sock, data: sock.sendmsg([data]))

    def test_accept(self):
        """accept() returns once the child connects after sleeping."""
        sock = socket.create_server((socket_helper.HOST, 0))
        self.addCleanup(sock.close)
        port = sock.getsockname()[1]

        code = '\n'.join((
            'import socket, time',
            '',
            'host = %r' % socket_helper.HOST,
            'port = %s' % port,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let parent block on accept()',
            'time.sleep(sleep_time)',
            'with socket.create_connection((host, port)):',
            '    time.sleep(sleep_time)',
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            client_sock, _ = sock.accept()
            client_sock.close()
            self.assertEqual(proc.wait(), 0)

    # Issue #25122: There is a race condition in the FreeBSD kernel on
    # handling signals in the FIFO device. Skip the test until the bug is
    # fixed in the kernel.
    # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162
    @support.requires_freebsd_version(10, 3)
    @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()')
    def _test_open(self, do_open_close_reader, do_open_close_writer):
        """Open a FIFO for writing while the child opens it for reading.

        *do_open_close_reader* is Python source appended to the child script;
        it can use the ``path`` variable defined there.
        *do_open_close_writer* is called in the parent with the FIFO path.
        """
        filename = support.TESTFN

        # Use a fifo: until the child opens it for reading, the parent will
        # block when trying to open it for writing.
        support.unlink(filename)
        try:
            os.mkfifo(filename)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        self.addCleanup(support.unlink, filename)

        code = '\n'.join((
            'import os, time',
            '',
            'path = %a' % filename,
            'sleep_time = %r' % self.sleep_time,
            '',
            '# let the parent block',
            'time.sleep(sleep_time)',
            '',
            do_open_close_reader,
        ))

        proc = self.subprocess(code)
        with kill_on_error(proc):
            do_open_close_writer(filename)
            self.assertEqual(proc.wait(), 0)

    def python_open(self, path):
        """Open *path* for writing with the builtin open(), then close it."""
        fp = open(path, 'w')
        fp.close()

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_open(self):
        self._test_open("fp = open(path, 'r')\nfp.close()",
                        self.python_open)

    def os_open(self, path):
        """Open *path* write-only with os.open(), then close it."""
        fd = os.open(path, os.O_WRONLY)
        os.close(fd)

    @unittest.skipIf(sys.platform == "darwin",
                     "hangs under macOS; see bpo-25234, bpo-35363")
    def test_os_open(self):
        self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)",
                        self.os_open)
364
365
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class TimeEINTRTest(EINTRBaseTest):
    """ EINTR tests for the time module. """

    def test_sleep(self):
        """time.sleep() lasts at least ``sleep_time`` despite the signals."""
        started = time.monotonic()
        time.sleep(self.sleep_time)
        self.stop_alarm()
        elapsed = time.monotonic() - started
        self.assertGreaterEqual(elapsed, self.sleep_time)
376
377
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test
# is vulnerable to a race condition between the child and the parent processes.
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                     'need signal.pthread_sigmask()')
class SignalEINTRTest(EINTRBaseTest):
    """ EINTR tests for the signal module. """

    def check_sigwait(self, wait_func):
        """Call ``wait_func(SIGUSR1)`` while a child sends SIGUSR1 to us.

        SIGUSR1 is blocked before the child is started; the child sleeps for
        ``sleep_time`` seconds and then sends the signal to this process.
        The previous handler and signal mask are restored on cleanup.
        """
        signum = signal.SIGUSR1
        pid = os.getpid()

        old_handler = signal.signal(signum, lambda *args: None)
        self.addCleanup(signal.signal, signum, old_handler)

        code = '\n'.join((
            'import os, time',
            'pid = %s' % pid,
            'signum = %s' % int(signum),
            'sleep_time = %r' % self.sleep_time,
            'time.sleep(sleep_time)',
            'os.kill(pid, signum)',
        ))

        # Block the signal before spawning the child, and restore exactly
        # the mask that was in effect before (rather than unconditionally
        # unblocking the signal).
        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)

        proc = self.subprocess(code)
        with kill_on_error(proc):
            wait_func(signum)

        self.assertEqual(proc.wait(), 0)

    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        def wait_func(signum):
            signal.sigwaitinfo([signum])

        self.check_sigwait(wait_func)

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        def wait_func(signum):
            # generous timeout: the signal is expected long before it expires
            signal.sigtimedwait([signum], 120.0)

        self.check_sigwait(wait_func)
428
429
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class SelectEINTRTest(EINTRBaseTest):
    """ EINTR tests for the select module. """

    def test_select(self):
        """select() with a timeout waits at least ``sleep_time`` seconds."""
        started = time.monotonic()
        select.select([], [], [], self.sleep_time)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipIf(sys.platform == "darwin",
                     "poll may fail on macOS; see issue #28087")
    @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll')
    def test_poll(self):
        """poll() with a timeout waits at least ``sleep_time`` seconds."""
        poll_obj = select.poll()

        started = time.monotonic()
        # the timeout is passed as sleep_time scaled by 1e3
        poll_obj.poll(self.sleep_time * 1e3)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll')
    def test_epoll(self):
        """epoll.poll() with a timeout waits at least ``sleep_time`` seconds."""
        epoll_obj = select.epoll()
        self.addCleanup(epoll_obj.close)

        started = time.monotonic()
        epoll_obj.poll(self.sleep_time)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue')
    def test_kqueue(self):
        """kqueue.control() with a timeout waits at least ``sleep_time``."""
        kq = select.kqueue()
        self.addCleanup(kq.close)

        started = time.monotonic()
        kq.control(None, 1, self.sleep_time)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)

    @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll')
    def test_devpoll(self):
        """devpoll.poll() with a timeout waits at least ``sleep_time``."""
        devpoll_obj = select.devpoll()
        self.addCleanup(devpoll_obj.close)

        started = time.monotonic()
        # the timeout is passed as sleep_time scaled by 1e3
        devpoll_obj.poll(self.sleep_time * 1e3)
        elapsed = time.monotonic() - started
        self.stop_alarm()
        self.assertGreaterEqual(elapsed, self.sleep_time)
485
486
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class FNTLEINTRTest(EINTRBaseTest):
    """ EINTR tests for the fcntl module. """

    def _lock(self, lock_func, lock_name):
        """Check that a blocking ``lock_func(f, LOCK_EX)`` waits for the child.

        *lock_func* is the fcntl function called in the parent; *lock_name*
        is the name of the fcntl function the child calls to take the lock.
        The child locks support.TESTFN exclusively and sleeps for
        ``sleep_time`` seconds while holding it.
        """
        self.addCleanup(support.unlink, support.TESTFN)
        code = '\n'.join((
            "import fcntl, time",
            # %a yields a properly escaped literal, whatever the file name
            "with open(%a, 'wb') as f:" % support.TESTFN,
            "   fcntl.%s(f, fcntl.LOCK_EX)" % lock_name,
            "   time.sleep(%s)" % self.sleep_time))
        start_time = time.monotonic()
        proc = self.subprocess(code)
        with kill_on_error(proc):
            with open(support.TESTFN, 'wb') as f:
                while True:  # synchronize the subprocess
                    dt = time.monotonic() - start_time
                    if dt > 60.0:
                        raise Exception("failed to sync child in %.1f sec" % dt)
                    try:
                        lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    except BlockingIOError:
                        # the child now holds the lock
                        break
                    else:
                        # we got the lock first: release it and retry shortly
                        lock_func(f, fcntl.LOCK_UN)
                        time.sleep(0.01)
                # the child locked the file just a moment ago for 'sleep_time' seconds
                # that means that the lock below will block for 'sleep_time' minus some
                # potential context switch delay
                lock_func(f, fcntl.LOCK_EX)
                dt = time.monotonic() - start_time
                self.assertGreaterEqual(dt, self.sleep_time)
                self.stop_alarm()
            proc.wait()

    # Issue 35633: See https://bugs.python.org/issue35633#msg333662
    # skip test rather than accept PermissionError from all platforms
    @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
    def test_lockf(self):
        self._lock(fcntl.lockf, "lockf")

    def test_flock(self):
        self._lock(fcntl.flock, "flock")
526
527
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner.
    unittest.main()
530