1""" 2This test suite exercises some system calls subject to interruption with EINTR, 3to check that it is actually handled transparently. 4It is intended to be run by the main test suite within a child process, to 5ensure there is no background thread running (so that signals are delivered to 6the correct thread). 7Signals are generated in-process using setitimer(ITIMER_REAL), which allows 8sub-second periodicity (contrarily to signal()). 9""" 10 11import contextlib 12import faulthandler 13import fcntl 14import os 15import platform 16import select 17import signal 18import socket 19import subprocess 20import sys 21import textwrap 22import time 23import unittest 24 25from test import support 26from test.support import os_helper 27from test.support import socket_helper 28 29 30# gh-109592: Tolerate a difference of 20 ms when comparing timings 31# (clock resolution) 32CLOCK_RES = 0.020 33 34 35@contextlib.contextmanager 36def kill_on_error(proc): 37 """Context manager killing the subprocess if a Python exception is raised.""" 38 with proc: 39 try: 40 yield proc 41 except: 42 proc.kill() 43 raise 44 45 46@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 47class EINTRBaseTest(unittest.TestCase): 48 """ Base class for EINTR tests. """ 49 50 # delay for initial signal delivery 51 signal_delay = 0.1 52 # signal delivery periodicity 53 signal_period = 0.1 54 # default sleep time for tests - should obviously have: 55 # sleep_time > signal_period 56 sleep_time = 0.2 57 58 def sighandler(self, signum, frame): 59 self.signals += 1 60 61 def setUp(self): 62 self.signals = 0 63 self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler) 64 signal.setitimer(signal.ITIMER_REAL, self.signal_delay, 65 self.signal_period) 66 67 # Use faulthandler as watchdog to debug when a test hangs 68 # (timeout of 10 minutes) 69 faulthandler.dump_traceback_later(10 * 60, exit=True, 70 file=sys.__stderr__) 71 72 @staticmethod 73 def stop_alarm(): 74 signal.setitimer(signal.ITIMER_REAL, 0, 0) 75 76 def tearDown(self): 77 self.stop_alarm() 78 signal.signal(signal.SIGALRM, self.orig_handler) 79 faulthandler.cancel_dump_traceback_later() 80 81 def subprocess(self, *args, **kw): 82 cmd_args = (sys.executable, '-c') + args 83 return subprocess.Popen(cmd_args, **kw) 84 85 def check_elapsed_time(self, elapsed): 86 self.assertGreaterEqual(elapsed, self.sleep_time - CLOCK_RES) 87 88 89@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 90class OSEINTRTest(EINTRBaseTest): 91 """ EINTR tests for the os module. """ 92 93 def new_sleep_process(self): 94 code = 'import time; time.sleep(%r)' % self.sleep_time 95 return self.subprocess(code) 96 97 def _test_wait_multiple(self, wait_func): 98 num = 3 99 processes = [self.new_sleep_process() for _ in range(num)] 100 for _ in range(num): 101 wait_func() 102 # Call the Popen method to avoid a ResourceWarning 103 for proc in processes: 104 proc.wait() 105 106 def test_wait(self): 107 self._test_wait_multiple(os.wait) 108 109 @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()') 110 def test_wait3(self): 111 self._test_wait_multiple(lambda: os.wait3(0)) 112 113 def _test_wait_single(self, wait_func): 114 proc = self.new_sleep_process() 115 wait_func(proc.pid) 116 # Call the Popen method to avoid a ResourceWarning 117 proc.wait() 118 119 def test_waitpid(self): 120 self._test_wait_single(lambda pid: os.waitpid(pid, 0)) 121 122 @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()') 123 def test_wait4(self): 124 self._test_wait_single(lambda pid: os.wait4(pid, 0)) 125 126 def test_read(self): 127 rd, wr = os.pipe() 128 self.addCleanup(os.close, rd) 129 # wr closed explicitly by parent 130 131 # the payload below are smaller than PIPE_BUF, hence the writes will be 132 # atomic 133 datas = [b"hello", b"world", b"spam"] 134 135 code = '\n'.join(( 136 'import os, sys, time', 137 '', 138 'wr = int(sys.argv[1])', 139 'datas = %r' % datas, 140 'sleep_time = %r' % self.sleep_time, 141 '', 142 'for data in datas:', 143 ' # let the parent block on read()', 144 ' time.sleep(sleep_time)', 145 ' os.write(wr, data)', 146 )) 147 148 proc = self.subprocess(code, str(wr), pass_fds=[wr]) 149 with kill_on_error(proc): 150 os.close(wr) 151 for data in datas: 152 self.assertEqual(data, os.read(rd, len(data))) 153 self.assertEqual(proc.wait(), 0) 154 155 def test_write(self): 156 rd, wr = os.pipe() 157 self.addCleanup(os.close, wr) 158 # rd closed explicitly by parent 159 160 # we must write enough data for the write() to block 161 data = b"x" * support.PIPE_MAX_SIZE 162 163 code = '\n'.join(( 164 'import io, os, sys, time', 165 '', 166 'rd = int(sys.argv[1])', 167 'sleep_time = %r' % self.sleep_time, 168 'data = b"x" * %s' % support.PIPE_MAX_SIZE, 169 'data_len = len(data)', 170 '', 171 '# let the parent block on write()', 172 'time.sleep(sleep_time)', 173 '', 174 'read_data = io.BytesIO()', 175 'while len(read_data.getvalue()) < data_len:', 176 ' chunk = os.read(rd, 2 * data_len)', 177 ' read_data.write(chunk)', 178 '', 179 'value = read_data.getvalue()', 180 'if value != data:', 181 ' raise Exception("read error: %s vs %s bytes"', 182 ' % (len(value), data_len))', 183 )) 184 185 proc = self.subprocess(code, str(rd), pass_fds=[rd]) 186 with kill_on_error(proc): 187 os.close(rd) 188 written = 0 189 while written < len(data): 190 written += os.write(wr, memoryview(data)[written:]) 191 self.assertEqual(proc.wait(), 0) 192 193 194@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 195class SocketEINTRTest(EINTRBaseTest): 196 """ EINTR tests for the socket module. """ 197 198 @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()') 199 def _test_recv(self, recv_func): 200 rd, wr = socket.socketpair() 201 self.addCleanup(rd.close) 202 # wr closed explicitly by parent 203 204 # single-byte payload guard us against partial recv 205 datas = [b"x", b"y", b"z"] 206 207 code = '\n'.join(( 208 'import os, socket, sys, time', 209 '', 210 'fd = int(sys.argv[1])', 211 'family = %s' % int(wr.family), 212 'sock_type = %s' % int(wr.type), 213 'datas = %r' % datas, 214 'sleep_time = %r' % self.sleep_time, 215 '', 216 'wr = socket.fromfd(fd, family, sock_type)', 217 'os.close(fd)', 218 '', 219 'with wr:', 220 ' for data in datas:', 221 ' # let the parent block on recv()', 222 ' time.sleep(sleep_time)', 223 ' wr.sendall(data)', 224 )) 225 226 fd = wr.fileno() 227 proc = self.subprocess(code, str(fd), pass_fds=[fd]) 228 with kill_on_error(proc): 229 wr.close() 230 for data in datas: 231 self.assertEqual(data, recv_func(rd, len(data))) 232 self.assertEqual(proc.wait(), 0) 233 234 def test_recv(self): 235 self._test_recv(socket.socket.recv) 236 237 @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()') 238 def test_recvmsg(self): 239 self._test_recv(lambda sock, data: sock.recvmsg(data)[0]) 240 241 def _test_send(self, send_func): 242 rd, wr = socket.socketpair() 243 self.addCleanup(wr.close) 244 # rd closed explicitly by parent 245 246 # we must send enough data for the send() to block 247 data = b"xyz" * (support.SOCK_MAX_SIZE // 3) 248 249 code = '\n'.join(( 250 'import os, socket, sys, time', 251 '', 252 'fd = int(sys.argv[1])', 253 'family = %s' % int(rd.family), 254 'sock_type = %s' % int(rd.type), 255 'sleep_time = %r' % self.sleep_time, 256 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3), 257 'data_len = len(data)', 258 '', 259 'rd = socket.fromfd(fd, family, sock_type)', 260 'os.close(fd)', 261 '', 262 'with rd:', 263 ' # let the parent block on send()', 264 ' time.sleep(sleep_time)', 265 '', 266 ' received_data = bytearray(data_len)', 267 ' n = 0', 268 ' while n < data_len:', 269 ' n += rd.recv_into(memoryview(received_data)[n:])', 270 '', 271 'if received_data != data:', 272 ' raise Exception("recv error: %s vs %s bytes"', 273 ' % (len(received_data), data_len))', 274 )) 275 276 fd = rd.fileno() 277 proc = self.subprocess(code, str(fd), pass_fds=[fd]) 278 with kill_on_error(proc): 279 rd.close() 280 written = 0 281 while written < len(data): 282 sent = send_func(wr, memoryview(data)[written:]) 283 # sendall() returns None 284 written += len(data) if sent is None else sent 285 self.assertEqual(proc.wait(), 0) 286 287 def test_send(self): 288 self._test_send(socket.socket.send) 289 290 def test_sendall(self): 291 self._test_send(socket.socket.sendall) 292 293 @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()') 294 def test_sendmsg(self): 295 self._test_send(lambda sock, data: sock.sendmsg([data])) 296 297 def test_accept(self): 298 sock = socket.create_server((socket_helper.HOST, 0)) 299 self.addCleanup(sock.close) 300 port = sock.getsockname()[1] 301 302 code = '\n'.join(( 303 'import socket, time', 304 '', 305 'host = %r' % socket_helper.HOST, 306 'port = %s' % port, 307 'sleep_time = %r' % self.sleep_time, 308 '', 309 '# let parent block on accept()', 310 'time.sleep(sleep_time)', 311 'with socket.create_connection((host, port)):', 312 ' time.sleep(sleep_time)', 313 )) 314 315 proc = self.subprocess(code) 316 with kill_on_error(proc): 317 client_sock, _ = sock.accept() 318 client_sock.close() 319 self.assertEqual(proc.wait(), 0) 320 321 # Issue #25122: There is a race condition in the FreeBSD kernel on 322 # handling signals in the FIFO device. Skip the test until the bug is 323 # fixed in the kernel. 324 # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162 325 @support.requires_freebsd_version(10, 3) 326 @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()') 327 def _test_open(self, do_open_close_reader, do_open_close_writer): 328 filename = os_helper.TESTFN 329 330 # Use a fifo: until the child opens it for reading, the parent will 331 # block when trying to open it for writing. 332 os_helper.unlink(filename) 333 try: 334 os.mkfifo(filename) 335 except PermissionError as e: 336 self.skipTest('os.mkfifo(): %s' % e) 337 self.addCleanup(os_helper.unlink, filename) 338 339 code = '\n'.join(( 340 'import os, time', 341 '', 342 'path = %a' % filename, 343 'sleep_time = %r' % self.sleep_time, 344 '', 345 '# let the parent block', 346 'time.sleep(sleep_time)', 347 '', 348 do_open_close_reader, 349 )) 350 351 proc = self.subprocess(code) 352 with kill_on_error(proc): 353 do_open_close_writer(filename) 354 self.assertEqual(proc.wait(), 0) 355 356 def python_open(self, path): 357 fp = open(path, 'w') 358 fp.close() 359 360 @unittest.skipIf(sys.platform == "darwin", 361 "hangs under macOS; see bpo-25234, bpo-35363") 362 def test_open(self): 363 self._test_open("fp = open(path, 'r')\nfp.close()", 364 self.python_open) 365 366 def os_open(self, path): 367 fd = os.open(path, os.O_WRONLY) 368 os.close(fd) 369 370 @unittest.skipIf(sys.platform == "darwin", 371 "hangs under macOS; see bpo-25234, bpo-35363") 372 def test_os_open(self): 373 self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)", 374 self.os_open) 375 376 377@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 378class TimeEINTRTest(EINTRBaseTest): 379 """ EINTR tests for the time module. """ 380 381 def test_sleep(self): 382 t0 = time.monotonic() 383 time.sleep(self.sleep_time) 384 self.stop_alarm() 385 dt = time.monotonic() - t0 386 self.check_elapsed_time(dt) 387 388 389@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 390# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test 391# is vulnerable to a race condition between the child and the parent processes. 392@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 393 'need signal.pthread_sigmask()') 394class SignalEINTRTest(EINTRBaseTest): 395 """ EINTR tests for the signal module. """ 396 397 def check_sigwait(self, wait_func): 398 signum = signal.SIGUSR1 399 pid = os.getpid() 400 401 old_handler = signal.signal(signum, lambda *args: None) 402 self.addCleanup(signal.signal, signum, old_handler) 403 404 code = '\n'.join(( 405 'import os, time', 406 'pid = %s' % os.getpid(), 407 'signum = %s' % int(signum), 408 'sleep_time = %r' % self.sleep_time, 409 'time.sleep(sleep_time)', 410 'os.kill(pid, signum)', 411 )) 412 413 old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 414 self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum]) 415 416 proc = self.subprocess(code) 417 with kill_on_error(proc): 418 wait_func(signum) 419 420 self.assertEqual(proc.wait(), 0) 421 422 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 423 'need signal.sigwaitinfo()') 424 def test_sigwaitinfo(self): 425 def wait_func(signum): 426 signal.sigwaitinfo([signum]) 427 428 self.check_sigwait(wait_func) 429 430 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 431 'need signal.sigwaitinfo()') 432 def test_sigtimedwait(self): 433 def wait_func(signum): 434 signal.sigtimedwait([signum], 120.0) 435 436 self.check_sigwait(wait_func) 437 438 439@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 440class SelectEINTRTest(EINTRBaseTest): 441 """ EINTR tests for the select module. """ 442 443 def test_select(self): 444 t0 = time.monotonic() 445 select.select([], [], [], self.sleep_time) 446 dt = time.monotonic() - t0 447 self.stop_alarm() 448 self.check_elapsed_time(dt) 449 450 @unittest.skipIf(sys.platform == "darwin", 451 "poll may fail on macOS; see issue #28087") 452 @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll') 453 def test_poll(self): 454 poller = select.poll() 455 456 t0 = time.monotonic() 457 poller.poll(self.sleep_time * 1e3) 458 dt = time.monotonic() - t0 459 self.stop_alarm() 460 self.check_elapsed_time(dt) 461 462 @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll') 463 def test_epoll(self): 464 poller = select.epoll() 465 self.addCleanup(poller.close) 466 467 t0 = time.monotonic() 468 poller.poll(self.sleep_time) 469 dt = time.monotonic() - t0 470 self.stop_alarm() 471 self.check_elapsed_time(dt) 472 473 @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue') 474 def test_kqueue(self): 475 kqueue = select.kqueue() 476 self.addCleanup(kqueue.close) 477 478 t0 = time.monotonic() 479 kqueue.control(None, 1, self.sleep_time) 480 dt = time.monotonic() - t0 481 self.stop_alarm() 482 self.check_elapsed_time(dt) 483 484 @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll') 485 def test_devpoll(self): 486 poller = select.devpoll() 487 self.addCleanup(poller.close) 488 489 t0 = time.monotonic() 490 poller.poll(self.sleep_time * 1e3) 491 dt = time.monotonic() - t0 492 self.stop_alarm() 493 self.check_elapsed_time(dt) 494 495 496class FCNTLEINTRTest(EINTRBaseTest): 497 def _lock(self, lock_func, lock_name): 498 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 499 rd1, wr1 = os.pipe() 500 rd2, wr2 = os.pipe() 501 for fd in (rd1, wr1, rd2, wr2): 502 self.addCleanup(os.close, fd) 503 code = textwrap.dedent(f""" 504 import fcntl, os, time 505 with open('{os_helper.TESTFN}', 'wb') as f: 506 fcntl.{lock_name}(f, fcntl.LOCK_EX) 507 os.write({wr1}, b"ok") 508 _ = os.read({rd2}, 2) # wait for parent process 509 time.sleep({self.sleep_time}) 510 """) 511 proc = self.subprocess(code, pass_fds=[wr1, rd2]) 512 with kill_on_error(proc): 513 with open(os_helper.TESTFN, 'wb') as f: 514 # synchronize the subprocess 515 ok = os.read(rd1, 2) 516 self.assertEqual(ok, b"ok") 517 518 # notify the child that the parent is ready 519 start_time = time.monotonic() 520 os.write(wr2, b"go") 521 522 # the child locked the file just a moment ago for 'sleep_time' seconds 523 # that means that the lock below will block for 'sleep_time' minus some 524 # potential context switch delay 525 lock_func(f, fcntl.LOCK_EX) 526 dt = time.monotonic() - start_time 527 self.stop_alarm() 528 self.check_elapsed_time(dt) 529 proc.wait() 530 531 # Issue 35633: See https://bugs.python.org/issue35633#msg333662 532 # skip test rather than accept PermissionError from all platforms 533 @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError") 534 def test_lockf(self): 535 self._lock(fcntl.lockf, "lockf") 536 537 def test_flock(self): 538 self._lock(fcntl.flock, "flock") 539 540 541if __name__ == "__main__": 542 unittest.main() 543