1""" 2This test suite exercises some system calls subject to interruption with EINTR, 3to check that it is actually handled transparently. 4It is intended to be run by the main test suite within a child process, to 5ensure there is no background thread running (so that signals are delivered to 6the correct thread). 7Signals are generated in-process using setitimer(ITIMER_REAL), which allows 8sub-second periodicity (contrarily to signal()). 9""" 10 11import contextlib 12import faulthandler 13import fcntl 14import os 15import platform 16import select 17import signal 18import socket 19import subprocess 20import sys 21import time 22import unittest 23 24from test import support 25from test.support import socket_helper 26 27@contextlib.contextmanager 28def kill_on_error(proc): 29 """Context manager killing the subprocess if a Python exception is raised.""" 30 with proc: 31 try: 32 yield proc 33 except: 34 proc.kill() 35 raise 36 37 38@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 39class EINTRBaseTest(unittest.TestCase): 40 """ Base class for EINTR tests. """ 41 42 # delay for initial signal delivery 43 signal_delay = 0.1 44 # signal delivery periodicity 45 signal_period = 0.1 46 # default sleep time for tests - should obviously have: 47 # sleep_time > signal_period 48 sleep_time = 0.2 49 50 def sighandler(self, signum, frame): 51 self.signals += 1 52 53 def setUp(self): 54 self.signals = 0 55 self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler) 56 signal.setitimer(signal.ITIMER_REAL, self.signal_delay, 57 self.signal_period) 58 59 # Use faulthandler as watchdog to debug when a test hangs 60 # (timeout of 10 minutes) 61 faulthandler.dump_traceback_later(10 * 60, exit=True, 62 file=sys.__stderr__) 63 64 @staticmethod 65 def stop_alarm(): 66 signal.setitimer(signal.ITIMER_REAL, 0, 0) 67 68 def tearDown(self): 69 self.stop_alarm() 70 signal.signal(signal.SIGALRM, self.orig_handler) 71 faulthandler.cancel_dump_traceback_later() 72 73 def subprocess(self, *args, **kw): 74 cmd_args = (sys.executable, '-c') + args 75 return subprocess.Popen(cmd_args, **kw) 76 77 78@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 79class OSEINTRTest(EINTRBaseTest): 80 """ EINTR tests for the os module. """ 81 82 def new_sleep_process(self): 83 code = 'import time; time.sleep(%r)' % self.sleep_time 84 return self.subprocess(code) 85 86 def _test_wait_multiple(self, wait_func): 87 num = 3 88 processes = [self.new_sleep_process() for _ in range(num)] 89 for _ in range(num): 90 wait_func() 91 # Call the Popen method to avoid a ResourceWarning 92 for proc in processes: 93 proc.wait() 94 95 def test_wait(self): 96 self._test_wait_multiple(os.wait) 97 98 @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()') 99 def test_wait3(self): 100 self._test_wait_multiple(lambda: os.wait3(0)) 101 102 def _test_wait_single(self, wait_func): 103 proc = self.new_sleep_process() 104 wait_func(proc.pid) 105 # Call the Popen method to avoid a ResourceWarning 106 proc.wait() 107 108 def test_waitpid(self): 109 self._test_wait_single(lambda pid: os.waitpid(pid, 0)) 110 111 @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()') 112 def test_wait4(self): 113 self._test_wait_single(lambda pid: os.wait4(pid, 0)) 114 115 def test_read(self): 116 rd, wr = os.pipe() 117 self.addCleanup(os.close, rd) 118 # wr closed explicitly by parent 119 120 # the payload below are smaller than PIPE_BUF, hence the writes will be 121 # atomic 122 datas = [b"hello", b"world", b"spam"] 123 124 code = '\n'.join(( 125 'import os, sys, time', 126 '', 127 'wr = int(sys.argv[1])', 128 'datas = %r' % datas, 129 'sleep_time = %r' % self.sleep_time, 130 '', 131 'for data in datas:', 132 ' # let the parent block on read()', 133 ' time.sleep(sleep_time)', 134 ' os.write(wr, data)', 135 )) 136 137 proc = self.subprocess(code, str(wr), pass_fds=[wr]) 138 with kill_on_error(proc): 139 os.close(wr) 140 for data in datas: 141 self.assertEqual(data, os.read(rd, len(data))) 142 self.assertEqual(proc.wait(), 0) 143 144 def test_write(self): 145 rd, wr = os.pipe() 146 self.addCleanup(os.close, wr) 147 # rd closed explicitly by parent 148 149 # we must write enough data for the write() to block 150 data = b"x" * support.PIPE_MAX_SIZE 151 152 code = '\n'.join(( 153 'import io, os, sys, time', 154 '', 155 'rd = int(sys.argv[1])', 156 'sleep_time = %r' % self.sleep_time, 157 'data = b"x" * %s' % support.PIPE_MAX_SIZE, 158 'data_len = len(data)', 159 '', 160 '# let the parent block on write()', 161 'time.sleep(sleep_time)', 162 '', 163 'read_data = io.BytesIO()', 164 'while len(read_data.getvalue()) < data_len:', 165 ' chunk = os.read(rd, 2 * data_len)', 166 ' read_data.write(chunk)', 167 '', 168 'value = read_data.getvalue()', 169 'if value != data:', 170 ' raise Exception("read error: %s vs %s bytes"', 171 ' % (len(value), data_len))', 172 )) 173 174 proc = self.subprocess(code, str(rd), pass_fds=[rd]) 175 with kill_on_error(proc): 176 os.close(rd) 177 written = 0 178 while written < len(data): 179 written += os.write(wr, memoryview(data)[written:]) 180 self.assertEqual(proc.wait(), 0) 181 182 183@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 184class SocketEINTRTest(EINTRBaseTest): 185 """ EINTR tests for the socket module. """ 186 187 @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()') 188 def _test_recv(self, recv_func): 189 rd, wr = socket.socketpair() 190 self.addCleanup(rd.close) 191 # wr closed explicitly by parent 192 193 # single-byte payload guard us against partial recv 194 datas = [b"x", b"y", b"z"] 195 196 code = '\n'.join(( 197 'import os, socket, sys, time', 198 '', 199 'fd = int(sys.argv[1])', 200 'family = %s' % int(wr.family), 201 'sock_type = %s' % int(wr.type), 202 'datas = %r' % datas, 203 'sleep_time = %r' % self.sleep_time, 204 '', 205 'wr = socket.fromfd(fd, family, sock_type)', 206 'os.close(fd)', 207 '', 208 'with wr:', 209 ' for data in datas:', 210 ' # let the parent block on recv()', 211 ' time.sleep(sleep_time)', 212 ' wr.sendall(data)', 213 )) 214 215 fd = wr.fileno() 216 proc = self.subprocess(code, str(fd), pass_fds=[fd]) 217 with kill_on_error(proc): 218 wr.close() 219 for data in datas: 220 self.assertEqual(data, recv_func(rd, len(data))) 221 self.assertEqual(proc.wait(), 0) 222 223 def test_recv(self): 224 self._test_recv(socket.socket.recv) 225 226 @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()') 227 def test_recvmsg(self): 228 self._test_recv(lambda sock, data: sock.recvmsg(data)[0]) 229 230 def _test_send(self, send_func): 231 rd, wr = socket.socketpair() 232 self.addCleanup(wr.close) 233 # rd closed explicitly by parent 234 235 # we must send enough data for the send() to block 236 data = b"xyz" * (support.SOCK_MAX_SIZE // 3) 237 238 code = '\n'.join(( 239 'import os, socket, sys, time', 240 '', 241 'fd = int(sys.argv[1])', 242 'family = %s' % int(rd.family), 243 'sock_type = %s' % int(rd.type), 244 'sleep_time = %r' % self.sleep_time, 245 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3), 246 'data_len = len(data)', 247 '', 248 'rd = socket.fromfd(fd, family, sock_type)', 249 'os.close(fd)', 250 '', 251 'with rd:', 252 ' # let the parent block on send()', 253 ' time.sleep(sleep_time)', 254 '', 255 ' received_data = bytearray(data_len)', 256 ' n = 0', 257 ' while n < data_len:', 258 ' n += rd.recv_into(memoryview(received_data)[n:])', 259 '', 260 'if received_data != data:', 261 ' raise Exception("recv error: %s vs %s bytes"', 262 ' % (len(received_data), data_len))', 263 )) 264 265 fd = rd.fileno() 266 proc = self.subprocess(code, str(fd), pass_fds=[fd]) 267 with kill_on_error(proc): 268 rd.close() 269 written = 0 270 while written < len(data): 271 sent = send_func(wr, memoryview(data)[written:]) 272 # sendall() returns None 273 written += len(data) if sent is None else sent 274 self.assertEqual(proc.wait(), 0) 275 276 def test_send(self): 277 self._test_send(socket.socket.send) 278 279 def test_sendall(self): 280 self._test_send(socket.socket.sendall) 281 282 @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()') 283 def test_sendmsg(self): 284 self._test_send(lambda sock, data: sock.sendmsg([data])) 285 286 def test_accept(self): 287 sock = socket.create_server((socket_helper.HOST, 0)) 288 self.addCleanup(sock.close) 289 port = sock.getsockname()[1] 290 291 code = '\n'.join(( 292 'import socket, time', 293 '', 294 'host = %r' % socket_helper.HOST, 295 'port = %s' % port, 296 'sleep_time = %r' % self.sleep_time, 297 '', 298 '# let parent block on accept()', 299 'time.sleep(sleep_time)', 300 'with socket.create_connection((host, port)):', 301 ' time.sleep(sleep_time)', 302 )) 303 304 proc = self.subprocess(code) 305 with kill_on_error(proc): 306 client_sock, _ = sock.accept() 307 client_sock.close() 308 self.assertEqual(proc.wait(), 0) 309 310 # Issue #25122: There is a race condition in the FreeBSD kernel on 311 # handling signals in the FIFO device. Skip the test until the bug is 312 # fixed in the kernel. 313 # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162 314 @support.requires_freebsd_version(10, 3) 315 @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()') 316 def _test_open(self, do_open_close_reader, do_open_close_writer): 317 filename = support.TESTFN 318 319 # Use a fifo: until the child opens it for reading, the parent will 320 # block when trying to open it for writing. 321 support.unlink(filename) 322 try: 323 os.mkfifo(filename) 324 except PermissionError as e: 325 self.skipTest('os.mkfifo(): %s' % e) 326 self.addCleanup(support.unlink, filename) 327 328 code = '\n'.join(( 329 'import os, time', 330 '', 331 'path = %a' % filename, 332 'sleep_time = %r' % self.sleep_time, 333 '', 334 '# let the parent block', 335 'time.sleep(sleep_time)', 336 '', 337 do_open_close_reader, 338 )) 339 340 proc = self.subprocess(code) 341 with kill_on_error(proc): 342 do_open_close_writer(filename) 343 self.assertEqual(proc.wait(), 0) 344 345 def python_open(self, path): 346 fp = open(path, 'w') 347 fp.close() 348 349 @unittest.skipIf(sys.platform == "darwin", 350 "hangs under macOS; see bpo-25234, bpo-35363") 351 def test_open(self): 352 self._test_open("fp = open(path, 'r')\nfp.close()", 353 self.python_open) 354 355 def os_open(self, path): 356 fd = os.open(path, os.O_WRONLY) 357 os.close(fd) 358 359 @unittest.skipIf(sys.platform == "darwin", 360 "hangs under macOS; see bpo-25234, bpo-35363") 361 def test_os_open(self): 362 self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)", 363 self.os_open) 364 365 366@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 367class TimeEINTRTest(EINTRBaseTest): 368 """ EINTR tests for the time module. """ 369 370 def test_sleep(self): 371 t0 = time.monotonic() 372 time.sleep(self.sleep_time) 373 self.stop_alarm() 374 dt = time.monotonic() - t0 375 self.assertGreaterEqual(dt, self.sleep_time) 376 377 378@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 379# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test 380# is vulnerable to a race condition between the child and the parent processes. 381@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 382 'need signal.pthread_sigmask()') 383class SignalEINTRTest(EINTRBaseTest): 384 """ EINTR tests for the signal module. """ 385 386 def check_sigwait(self, wait_func): 387 signum = signal.SIGUSR1 388 pid = os.getpid() 389 390 old_handler = signal.signal(signum, lambda *args: None) 391 self.addCleanup(signal.signal, signum, old_handler) 392 393 code = '\n'.join(( 394 'import os, time', 395 'pid = %s' % os.getpid(), 396 'signum = %s' % int(signum), 397 'sleep_time = %r' % self.sleep_time, 398 'time.sleep(sleep_time)', 399 'os.kill(pid, signum)', 400 )) 401 402 old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 403 self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum]) 404 405 t0 = time.monotonic() 406 proc = self.subprocess(code) 407 with kill_on_error(proc): 408 wait_func(signum) 409 dt = time.monotonic() - t0 410 411 self.assertEqual(proc.wait(), 0) 412 413 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 414 'need signal.sigwaitinfo()') 415 def test_sigwaitinfo(self): 416 def wait_func(signum): 417 signal.sigwaitinfo([signum]) 418 419 self.check_sigwait(wait_func) 420 421 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 422 'need signal.sigwaitinfo()') 423 def test_sigtimedwait(self): 424 def wait_func(signum): 425 signal.sigtimedwait([signum], 120.0) 426 427 self.check_sigwait(wait_func) 428 429 430@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") 431class SelectEINTRTest(EINTRBaseTest): 432 """ EINTR tests for the select module. """ 433 434 def test_select(self): 435 t0 = time.monotonic() 436 select.select([], [], [], self.sleep_time) 437 dt = time.monotonic() - t0 438 self.stop_alarm() 439 self.assertGreaterEqual(dt, self.sleep_time) 440 441 @unittest.skipIf(sys.platform == "darwin", 442 "poll may fail on macOS; see issue #28087") 443 @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll') 444 def test_poll(self): 445 poller = select.poll() 446 447 t0 = time.monotonic() 448 poller.poll(self.sleep_time * 1e3) 449 dt = time.monotonic() - t0 450 self.stop_alarm() 451 self.assertGreaterEqual(dt, self.sleep_time) 452 453 @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll') 454 def test_epoll(self): 455 poller = select.epoll() 456 self.addCleanup(poller.close) 457 458 t0 = time.monotonic() 459 poller.poll(self.sleep_time) 460 dt = time.monotonic() - t0 461 self.stop_alarm() 462 self.assertGreaterEqual(dt, self.sleep_time) 463 464 @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue') 465 def test_kqueue(self): 466 kqueue = select.kqueue() 467 self.addCleanup(kqueue.close) 468 469 t0 = time.monotonic() 470 kqueue.control(None, 1, self.sleep_time) 471 dt = time.monotonic() - t0 472 self.stop_alarm() 473 self.assertGreaterEqual(dt, self.sleep_time) 474 475 @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll') 476 def test_devpoll(self): 477 poller = select.devpoll() 478 self.addCleanup(poller.close) 479 480 t0 = time.monotonic() 481 poller.poll(self.sleep_time * 1e3) 482 dt = time.monotonic() - t0 483 self.stop_alarm() 484 self.assertGreaterEqual(dt, self.sleep_time) 485 486 487class FNTLEINTRTest(EINTRBaseTest): 488 def _lock(self, lock_func, lock_name): 489 self.addCleanup(support.unlink, support.TESTFN) 490 code = '\n'.join(( 491 "import fcntl, time", 492 "with open('%s', 'wb') as f:" % support.TESTFN, 493 " fcntl.%s(f, fcntl.LOCK_EX)" % lock_name, 494 " time.sleep(%s)" % self.sleep_time)) 495 start_time = time.monotonic() 496 proc = self.subprocess(code) 497 with kill_on_error(proc): 498 with open(support.TESTFN, 'wb') as f: 499 while True: # synchronize the subprocess 500 dt = time.monotonic() - start_time 501 if dt > 60.0: 502 raise Exception("failed to sync child in %.1f sec" % dt) 503 try: 504 lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB) 505 lock_func(f, fcntl.LOCK_UN) 506 time.sleep(0.01) 507 except BlockingIOError: 508 break 509 # the child locked the file just a moment ago for 'sleep_time' seconds 510 # that means that the lock below will block for 'sleep_time' minus some 511 # potential context switch delay 512 lock_func(f, fcntl.LOCK_EX) 513 dt = time.monotonic() - start_time 514 self.assertGreaterEqual(dt, self.sleep_time) 515 self.stop_alarm() 516 proc.wait() 517 518 # Issue 35633: See https://bugs.python.org/issue35633#msg333662 519 # skip test rather than accept PermissionError from all platforms 520 @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError") 521 def test_lockf(self): 522 self._lock(fcntl.lockf, "lockf") 523 524 def test_flock(self): 525 self._lock(fcntl.flock, "flock") 526 527 528if __name__ == "__main__": 529 unittest.main() 530