1"Test posix functions" 2 3from test import support 4from test.support.script_helper import assert_python_ok 5 6# Skip these tests if there is no posix module. 7posix = support.import_module('posix') 8 9import errno 10import sys 11import signal 12import time 13import os 14import platform 15import pwd 16import stat 17import tempfile 18import unittest 19import warnings 20import textwrap 21 22_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 23 support.TESTFN + '-dummy-symlink') 24 25requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 26 'test is only meaningful on 32-bit builds') 27 28def _supports_sched(): 29 if not hasattr(posix, 'sched_getscheduler'): 30 return False 31 try: 32 posix.sched_getscheduler(0) 33 except OSError as e: 34 if e.errno == errno.ENOSYS: 35 return False 36 return True 37 38requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API') 39 40class PosixTester(unittest.TestCase): 41 42 def setUp(self): 43 # create empty file 44 fp = open(support.TESTFN, 'w+') 45 fp.close() 46 self.teardown_files = [ support.TESTFN ] 47 self._warnings_manager = support.check_warnings() 48 self._warnings_manager.__enter__() 49 warnings.filterwarnings('ignore', '.* potential security risk .*', 50 RuntimeWarning) 51 52 def tearDown(self): 53 for teardown_file in self.teardown_files: 54 support.unlink(teardown_file) 55 self._warnings_manager.__exit__(None, None, None) 56 57 def testNoArgFunctions(self): 58 # test posix functions which take no arguments and have 59 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 60 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", 61 "times", "getloadavg", 62 "getegid", "geteuid", "getgid", "getgroups", 63 "getpid", "getpgrp", "getppid", "getuid", "sync", 64 ] 65 66 for name in NO_ARG_FUNCTIONS: 67 posix_func = getattr(posix, name, None) 68 if posix_func is not None: 69 posix_func() 70 self.assertRaises(TypeError, posix_func, 1) 71 72 @unittest.skipUnless(hasattr(posix, 'getresuid'), 73 'test needs posix.getresuid()') 74 def test_getresuid(self): 75 user_ids = posix.getresuid() 76 self.assertEqual(len(user_ids), 3) 77 for val in user_ids: 78 self.assertGreaterEqual(val, 0) 79 80 @unittest.skipUnless(hasattr(posix, 'getresgid'), 81 'test needs posix.getresgid()') 82 def test_getresgid(self): 83 group_ids = posix.getresgid() 84 self.assertEqual(len(group_ids), 3) 85 for val in group_ids: 86 self.assertGreaterEqual(val, 0) 87 88 @unittest.skipUnless(hasattr(posix, 'setresuid'), 89 'test needs posix.setresuid()') 90 def test_setresuid(self): 91 current_user_ids = posix.getresuid() 92 self.assertIsNone(posix.setresuid(*current_user_ids)) 93 # -1 means don't change that value. 94 self.assertIsNone(posix.setresuid(-1, -1, -1)) 95 96 @unittest.skipUnless(hasattr(posix, 'setresuid'), 97 'test needs posix.setresuid()') 98 def test_setresuid_exception(self): 99 # Don't do this test if someone is silly enough to run us as root. 100 current_user_ids = posix.getresuid() 101 if 0 not in current_user_ids: 102 new_user_ids = (current_user_ids[0]+1, -1, -1) 103 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 104 105 @unittest.skipUnless(hasattr(posix, 'setresgid'), 106 'test needs posix.setresgid()') 107 def test_setresgid(self): 108 current_group_ids = posix.getresgid() 109 self.assertIsNone(posix.setresgid(*current_group_ids)) 110 # -1 means don't change that value. 111 self.assertIsNone(posix.setresgid(-1, -1, -1)) 112 113 @unittest.skipUnless(hasattr(posix, 'setresgid'), 114 'test needs posix.setresgid()') 115 def test_setresgid_exception(self): 116 # Don't do this test if someone is silly enough to run us as root. 117 current_group_ids = posix.getresgid() 118 if 0 not in current_group_ids: 119 new_group_ids = (current_group_ids[0]+1, -1, -1) 120 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 121 122 @unittest.skipUnless(hasattr(posix, 'initgroups'), 123 "test needs os.initgroups()") 124 def test_initgroups(self): 125 # It takes a string and an integer; check that it raises a TypeError 126 # for other argument lists. 127 self.assertRaises(TypeError, posix.initgroups) 128 self.assertRaises(TypeError, posix.initgroups, None) 129 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 130 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 131 132 # If a non-privileged user invokes it, it should fail with OSError 133 # EPERM. 134 if os.getuid() != 0: 135 try: 136 name = pwd.getpwuid(posix.getuid()).pw_name 137 except KeyError: 138 # the current UID may not have a pwd entry 139 raise unittest.SkipTest("need a pwd entry") 140 try: 141 posix.initgroups(name, 13) 142 except OSError as e: 143 self.assertEqual(e.errno, errno.EPERM) 144 else: 145 self.fail("Expected OSError to be raised by initgroups") 146 147 @unittest.skipUnless(hasattr(posix, 'statvfs'), 148 'test needs posix.statvfs()') 149 def test_statvfs(self): 150 self.assertTrue(posix.statvfs(os.curdir)) 151 152 @unittest.skipUnless(hasattr(posix, 'fstatvfs'), 153 'test needs posix.fstatvfs()') 154 def test_fstatvfs(self): 155 fp = open(support.TESTFN) 156 try: 157 self.assertTrue(posix.fstatvfs(fp.fileno())) 158 self.assertTrue(posix.statvfs(fp.fileno())) 159 finally: 160 fp.close() 161 162 @unittest.skipUnless(hasattr(posix, 'ftruncate'), 163 'test needs posix.ftruncate()') 164 def test_ftruncate(self): 165 fp = open(support.TESTFN, 'w+') 166 try: 167 # we need to have some data to truncate 168 fp.write('test') 169 fp.flush() 170 posix.ftruncate(fp.fileno(), 0) 171 finally: 172 fp.close() 173 174 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") 175 def test_truncate(self): 176 with open(support.TESTFN, 'w') as fp: 177 fp.write('test') 178 fp.flush() 179 posix.truncate(support.TESTFN, 0) 180 181 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") 182 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 183 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 184 def test_fexecve(self): 185 fp = os.open(sys.executable, os.O_RDONLY) 186 try: 187 pid = os.fork() 188 if pid == 0: 189 os.chdir(os.path.split(sys.executable)[0]) 190 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) 191 else: 192 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 193 finally: 194 os.close(fp) 195 196 197 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") 198 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 199 def test_waitid(self): 200 pid = os.fork() 201 if pid == 0: 202 os.chdir(os.path.split(sys.executable)[0]) 203 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) 204 else: 205 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) 206 self.assertEqual(pid, res.si_pid) 207 208 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 209 def test_register_at_fork(self): 210 with self.assertRaises(TypeError, msg="Positional args not allowed"): 211 os.register_at_fork(lambda: None) 212 with self.assertRaises(TypeError, msg="Args must be callable"): 213 os.register_at_fork(before=2) 214 with self.assertRaises(TypeError, msg="Args must be callable"): 215 os.register_at_fork(after_in_child="three") 216 with self.assertRaises(TypeError, msg="Args must be callable"): 217 os.register_at_fork(after_in_parent=b"Five") 218 with self.assertRaises(TypeError, msg="Args must not be None"): 219 os.register_at_fork(before=None) 220 with self.assertRaises(TypeError, msg="Args must not be None"): 221 os.register_at_fork(after_in_child=None) 222 with self.assertRaises(TypeError, msg="Args must not be None"): 223 os.register_at_fork(after_in_parent=None) 224 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 225 # Ensure a combination of valid and invalid is an error. 226 os.register_at_fork(before=None, after_in_parent=lambda: 3) 227 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 228 # Ensure a combination of valid and invalid is an error. 229 os.register_at_fork(before=lambda: None, after_in_child='') 230 # We test actual registrations in their own process so as not to 231 # pollute this one. There is no way to unregister for cleanup. 232 code = """if 1: 233 import os 234 235 r, w = os.pipe() 236 fin_r, fin_w = os.pipe() 237 238 os.register_at_fork(before=lambda: os.write(w, b'A')) 239 os.register_at_fork(after_in_parent=lambda: os.write(w, b'C')) 240 os.register_at_fork(after_in_child=lambda: os.write(w, b'E')) 241 os.register_at_fork(before=lambda: os.write(w, b'B'), 242 after_in_parent=lambda: os.write(w, b'D'), 243 after_in_child=lambda: os.write(w, b'F')) 244 245 pid = os.fork() 246 if pid == 0: 247 # At this point, after-forkers have already been executed 248 os.close(w) 249 # Wait for parent to tell us to exit 250 os.read(fin_r, 1) 251 os._exit(0) 252 else: 253 try: 254 os.close(w) 255 with open(r, "rb") as f: 256 data = f.read() 257 assert len(data) == 6, data 258 # Check before-fork callbacks 259 assert data[:2] == b'BA', data 260 # Check after-fork callbacks 261 assert sorted(data[2:]) == list(b'CDEF'), data 262 assert data.index(b'C') < data.index(b'D'), data 263 assert data.index(b'E') < data.index(b'F'), data 264 finally: 265 os.write(fin_w, b'!') 266 """ 267 assert_python_ok('-c', code) 268 269 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") 270 def test_lockf(self): 271 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) 272 try: 273 os.write(fd, b'test') 274 os.lseek(fd, 0, os.SEEK_SET) 275 posix.lockf(fd, posix.F_LOCK, 4) 276 # section is locked 277 posix.lockf(fd, posix.F_ULOCK, 4) 278 finally: 279 os.close(fd) 280 281 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") 282 def test_pread(self): 283 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 284 try: 285 os.write(fd, b'test') 286 os.lseek(fd, 0, os.SEEK_SET) 287 self.assertEqual(b'es', posix.pread(fd, 2, 1)) 288 # the first pread() shouldn't disturb the file offset 289 self.assertEqual(b'te', posix.read(fd, 2)) 290 finally: 291 os.close(fd) 292 293 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 294 def test_preadv(self): 295 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 296 try: 297 os.write(fd, b'test1tt2t3t5t6t6t8') 298 buf = [bytearray(i) for i in [5, 3, 2]] 299 self.assertEqual(posix.preadv(fd, buf, 3), 10) 300 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 301 finally: 302 os.close(fd) 303 304 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 305 @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI") 306 def test_preadv_flags(self): 307 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 308 try: 309 os.write(fd, b'test1tt2t3t5t6t6t8') 310 buf = [bytearray(i) for i in [5, 3, 2]] 311 self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10) 312 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 313 except NotImplementedError: 314 self.skipTest("preadv2 not available") 315 except OSError as inst: 316 # Is possible that the macro RWF_HIPRI was defined at compilation time 317 # but the option is not supported by the kernel or the runtime libc shared 318 # library. 319 if inst.errno in {errno.EINVAL, errno.ENOTSUP}: 320 raise unittest.SkipTest("RWF_HIPRI is not supported by the current system") 321 else: 322 raise 323 finally: 324 os.close(fd) 325 326 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 327 @requires_32b 328 def test_preadv_overflow_32bits(self): 329 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 330 try: 331 buf = [bytearray(2**16)] * 2**15 332 with self.assertRaises(OSError) as cm: 333 os.preadv(fd, buf, 0) 334 self.assertEqual(cm.exception.errno, errno.EINVAL) 335 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 336 finally: 337 os.close(fd) 338 339 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") 340 def test_pwrite(self): 341 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 342 try: 343 os.write(fd, b'test') 344 os.lseek(fd, 0, os.SEEK_SET) 345 posix.pwrite(fd, b'xx', 1) 346 self.assertEqual(b'txxt', posix.read(fd, 4)) 347 finally: 348 os.close(fd) 349 350 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 351 def test_pwritev(self): 352 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 353 try: 354 os.write(fd, b"xx") 355 os.lseek(fd, 0, os.SEEK_SET) 356 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2) 357 self.assertEqual(n, 10) 358 359 os.lseek(fd, 0, os.SEEK_SET) 360 self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100)) 361 finally: 362 os.close(fd) 363 364 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 365 @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC") 366 def test_pwritev_flags(self): 367 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 368 try: 369 os.write(fd,b"xx") 370 os.lseek(fd, 0, os.SEEK_SET) 371 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC) 372 self.assertEqual(n, 10) 373 374 os.lseek(fd, 0, os.SEEK_SET) 375 self.assertEqual(b'xxtest1tt2', posix.read(fd, 100)) 376 finally: 377 os.close(fd) 378 379 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 380 @requires_32b 381 def test_pwritev_overflow_32bits(self): 382 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 383 try: 384 with self.assertRaises(OSError) as cm: 385 os.pwritev(fd, [b"x" * 2**16] * 2**15, 0) 386 self.assertEqual(cm.exception.errno, errno.EINVAL) 387 finally: 388 os.close(fd) 389 390 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 391 "test needs posix.posix_fallocate()") 392 def test_posix_fallocate(self): 393 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) 394 try: 395 posix.posix_fallocate(fd, 0, 10) 396 except OSError as inst: 397 # issue10812, ZFS doesn't appear to support posix_fallocate, 398 # so skip Solaris-based since they are likely to have ZFS. 399 # issue33655: Also ignore EINVAL on *BSD since ZFS is also 400 # often used there. 401 if inst.errno == errno.EINVAL and sys.platform.startswith( 402 ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')): 403 raise unittest.SkipTest("test may fail on ZFS filesystems") 404 else: 405 raise 406 finally: 407 os.close(fd) 408 409 # issue31106 - posix_fallocate() does not set error in errno. 410 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 411 "test needs posix.posix_fallocate()") 412 def test_posix_fallocate_errno(self): 413 try: 414 posix.posix_fallocate(-42, 0, 10) 415 except OSError as inst: 416 if inst.errno != errno.EBADF: 417 raise 418 419 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 420 "test needs posix.posix_fadvise()") 421 def test_posix_fadvise(self): 422 fd = os.open(support.TESTFN, os.O_RDONLY) 423 try: 424 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) 425 finally: 426 os.close(fd) 427 428 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 429 "test needs posix.posix_fadvise()") 430 def test_posix_fadvise_errno(self): 431 try: 432 posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED) 433 except OSError as inst: 434 if inst.errno != errno.EBADF: 435 raise 436 437 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") 438 def test_utime_with_fd(self): 439 now = time.time() 440 fd = os.open(support.TESTFN, os.O_RDONLY) 441 try: 442 posix.utime(fd) 443 posix.utime(fd, None) 444 self.assertRaises(TypeError, posix.utime, fd, (None, None)) 445 self.assertRaises(TypeError, posix.utime, fd, (now, None)) 446 self.assertRaises(TypeError, posix.utime, fd, (None, now)) 447 posix.utime(fd, (int(now), int(now))) 448 posix.utime(fd, (now, now)) 449 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) 450 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) 451 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) 452 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) 453 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) 454 455 finally: 456 os.close(fd) 457 458 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") 459 def test_utime_nofollow_symlinks(self): 460 now = time.time() 461 posix.utime(support.TESTFN, None, follow_symlinks=False) 462 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False) 463 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False) 464 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False) 465 posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False) 466 posix.utime(support.TESTFN, (now, now), follow_symlinks=False) 467 posix.utime(support.TESTFN, follow_symlinks=False) 468 469 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 470 def test_writev(self): 471 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 472 try: 473 n = os.writev(fd, (b'test1', b'tt2', b't3')) 474 self.assertEqual(n, 10) 475 476 os.lseek(fd, 0, os.SEEK_SET) 477 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) 478 479 # Issue #20113: empty list of buffers should not crash 480 try: 481 size = posix.writev(fd, []) 482 except OSError: 483 # writev(fd, []) raises OSError(22, "Invalid argument") 484 # on OpenIndiana 485 pass 486 else: 487 self.assertEqual(size, 0) 488 finally: 489 os.close(fd) 490 491 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 492 @requires_32b 493 def test_writev_overflow_32bits(self): 494 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 495 try: 496 with self.assertRaises(OSError) as cm: 497 os.writev(fd, [b"x" * 2**16] * 2**15) 498 self.assertEqual(cm.exception.errno, errno.EINVAL) 499 finally: 500 os.close(fd) 501 502 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 503 def test_readv(self): 504 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 505 try: 506 os.write(fd, b'test1tt2t3') 507 os.lseek(fd, 0, os.SEEK_SET) 508 buf = [bytearray(i) for i in [5, 3, 2]] 509 self.assertEqual(posix.readv(fd, buf), 10) 510 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) 511 512 # Issue #20113: empty list of buffers should not crash 513 try: 514 size = posix.readv(fd, []) 515 except OSError: 516 # readv(fd, []) raises OSError(22, "Invalid argument") 517 # on OpenIndiana 518 pass 519 else: 520 self.assertEqual(size, 0) 521 finally: 522 os.close(fd) 523 524 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 525 @requires_32b 526 def test_readv_overflow_32bits(self): 527 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 528 try: 529 buf = [bytearray(2**16)] * 2**15 530 with self.assertRaises(OSError) as cm: 531 os.readv(fd, buf) 532 self.assertEqual(cm.exception.errno, errno.EINVAL) 533 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 534 finally: 535 os.close(fd) 536 537 @unittest.skipUnless(hasattr(posix, 'dup'), 538 'test needs posix.dup()') 539 def test_dup(self): 540 fp = open(support.TESTFN) 541 try: 542 fd = posix.dup(fp.fileno()) 543 self.assertIsInstance(fd, int) 544 os.close(fd) 545 finally: 546 fp.close() 547 548 @unittest.skipUnless(hasattr(posix, 'confstr'), 549 'test needs posix.confstr()') 550 def test_confstr(self): 551 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 552 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 553 554 @unittest.skipUnless(hasattr(posix, 'dup2'), 555 'test needs posix.dup2()') 556 def test_dup2(self): 557 fp1 = open(support.TESTFN) 558 fp2 = open(support.TESTFN) 559 try: 560 posix.dup2(fp1.fileno(), fp2.fileno()) 561 finally: 562 fp1.close() 563 fp2.close() 564 565 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") 566 @support.requires_linux_version(2, 6, 23) 567 def test_oscloexec(self): 568 fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC) 569 self.addCleanup(os.close, fd) 570 self.assertFalse(os.get_inheritable(fd)) 571 572 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), 573 'test needs posix.O_EXLOCK') 574 def test_osexlock(self): 575 fd = os.open(support.TESTFN, 576 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 577 self.assertRaises(OSError, os.open, support.TESTFN, 578 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 579 os.close(fd) 580 581 if hasattr(posix, "O_SHLOCK"): 582 fd = os.open(support.TESTFN, 583 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 584 self.assertRaises(OSError, os.open, support.TESTFN, 585 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 586 os.close(fd) 587 588 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), 589 'test needs posix.O_SHLOCK') 590 def test_osshlock(self): 591 fd1 = os.open(support.TESTFN, 592 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 593 fd2 = os.open(support.TESTFN, 594 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 595 os.close(fd2) 596 os.close(fd1) 597 598 if hasattr(posix, "O_EXLOCK"): 599 fd = os.open(support.TESTFN, 600 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 601 self.assertRaises(OSError, os.open, support.TESTFN, 602 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 603 os.close(fd) 604 605 @unittest.skipUnless(hasattr(posix, 'fstat'), 606 'test needs posix.fstat()') 607 def test_fstat(self): 608 fp = open(support.TESTFN) 609 try: 610 self.assertTrue(posix.fstat(fp.fileno())) 611 self.assertTrue(posix.stat(fp.fileno())) 612 613 self.assertRaisesRegex(TypeError, 614 'should be string, bytes, os.PathLike or integer, not', 615 posix.stat, float(fp.fileno())) 616 finally: 617 fp.close() 618 619 def test_stat(self): 620 self.assertTrue(posix.stat(support.TESTFN)) 621 self.assertTrue(posix.stat(os.fsencode(support.TESTFN))) 622 623 self.assertWarnsRegex(DeprecationWarning, 624 'should be string, bytes, os.PathLike or integer, not', 625 posix.stat, bytearray(os.fsencode(support.TESTFN))) 626 self.assertRaisesRegex(TypeError, 627 'should be string, bytes, os.PathLike or integer, not', 628 posix.stat, None) 629 self.assertRaisesRegex(TypeError, 630 'should be string, bytes, os.PathLike or integer, not', 631 posix.stat, list(support.TESTFN)) 632 self.assertRaisesRegex(TypeError, 633 'should be string, bytes, os.PathLike or integer, not', 634 posix.stat, list(os.fsencode(support.TESTFN))) 635 636 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") 637 def test_mkfifo(self): 638 support.unlink(support.TESTFN) 639 try: 640 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) 641 except PermissionError as e: 642 self.skipTest('posix.mkfifo(): %s' % e) 643 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 644 645 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'), 646 "don't have mknod()/S_IFIFO") 647 def test_mknod(self): 648 # Test using mknod() to create a FIFO (the only use specified 649 # by POSIX). 650 support.unlink(support.TESTFN) 651 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 652 try: 653 posix.mknod(support.TESTFN, mode, 0) 654 except OSError as e: 655 # Some old systems don't allow unprivileged users to use 656 # mknod(), or only support creating device nodes. 657 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 658 else: 659 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 660 661 # Keyword arguments are also supported 662 support.unlink(support.TESTFN) 663 try: 664 posix.mknod(path=support.TESTFN, mode=mode, device=0, 665 dir_fd=None) 666 except OSError as e: 667 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 668 669 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()') 670 def test_makedev(self): 671 st = posix.stat(support.TESTFN) 672 dev = st.st_dev 673 self.assertIsInstance(dev, int) 674 self.assertGreaterEqual(dev, 0) 675 676 major = posix.major(dev) 677 self.assertIsInstance(major, int) 678 self.assertGreaterEqual(major, 0) 679 self.assertEqual(posix.major(dev), major) 680 self.assertRaises(TypeError, posix.major, float(dev)) 681 self.assertRaises(TypeError, posix.major) 682 self.assertRaises((ValueError, OverflowError), posix.major, -1) 683 684 minor = posix.minor(dev) 685 self.assertIsInstance(minor, int) 686 self.assertGreaterEqual(minor, 0) 687 self.assertEqual(posix.minor(dev), minor) 688 self.assertRaises(TypeError, posix.minor, float(dev)) 689 self.assertRaises(TypeError, posix.minor) 690 self.assertRaises((ValueError, OverflowError), posix.minor, -1) 691 692 self.assertEqual(posix.makedev(major, minor), dev) 693 self.assertRaises(TypeError, posix.makedev, float(major), minor) 694 self.assertRaises(TypeError, posix.makedev, major, float(minor)) 695 self.assertRaises(TypeError, posix.makedev, major) 696 self.assertRaises(TypeError, posix.makedev) 697 698 def _test_all_chown_common(self, chown_func, first_param, stat_func): 699 """Common code for chown, fchown and lchown tests.""" 700 def check_stat(uid, gid): 701 if stat_func is not None: 702 stat = stat_func(first_param) 703 self.assertEqual(stat.st_uid, uid) 704 self.assertEqual(stat.st_gid, gid) 705 uid = os.getuid() 706 gid = os.getgid() 707 # test a successful chown call 708 chown_func(first_param, uid, gid) 709 check_stat(uid, gid) 710 chown_func(first_param, -1, gid) 711 check_stat(uid, gid) 712 chown_func(first_param, uid, -1) 713 check_stat(uid, gid) 714 715 if uid == 0: 716 # Try an amusingly large uid/gid to make sure we handle 717 # large unsigned values. (chown lets you use any 718 # uid/gid you like, even if they aren't defined.) 719 # 720 # This problem keeps coming up: 721 # http://bugs.python.org/issue1747858 722 # http://bugs.python.org/issue4591 723 # http://bugs.python.org/issue15301 724 # Hopefully the fix in 4591 fixes it for good! 725 # 726 # This part of the test only runs when run as root. 727 # Only scary people run their tests as root. 728 729 big_value = 2**31 730 chown_func(first_param, big_value, big_value) 731 check_stat(big_value, big_value) 732 chown_func(first_param, -1, -1) 733 check_stat(big_value, big_value) 734 chown_func(first_param, uid, gid) 735 check_stat(uid, gid) 736 elif platform.system() in ('HP-UX', 'SunOS'): 737 # HP-UX and Solaris can allow a non-root user to chown() to root 738 # (issue #5113) 739 raise unittest.SkipTest("Skipping because of non-standard chown() " 740 "behavior") 741 else: 742 # non-root cannot chown to root, raises OSError 743 self.assertRaises(OSError, chown_func, first_param, 0, 0) 744 check_stat(uid, gid) 745 self.assertRaises(OSError, chown_func, first_param, 0, -1) 746 check_stat(uid, gid) 747 if 0 not in os.getgroups(): 748 self.assertRaises(OSError, chown_func, first_param, -1, 0) 749 check_stat(uid, gid) 750 # test illegal types 751 for t in str, float: 752 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 753 check_stat(uid, gid) 754 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 755 check_stat(uid, gid) 756 757 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") 758 def test_chown(self): 759 # raise an OSError if the file does not exist 760 os.unlink(support.TESTFN) 761 self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1) 762 763 # re-create the file 764 support.create_empty_file(support.TESTFN) 765 self._test_all_chown_common(posix.chown, support.TESTFN, posix.stat) 766 767 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 768 def test_fchown(self): 769 os.unlink(support.TESTFN) 770 771 # re-create the file 772 test_file = open(support.TESTFN, 'w') 773 try: 774 fd = test_file.fileno() 775 self._test_all_chown_common(posix.fchown, fd, 776 getattr(posix, 'fstat', None)) 777 finally: 778 test_file.close() 779 780 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 781 def test_lchown(self): 782 os.unlink(support.TESTFN) 783 # create a symlink 784 os.symlink(_DUMMY_SYMLINK, support.TESTFN) 785 self._test_all_chown_common(posix.lchown, support.TESTFN, 786 getattr(posix, 'lstat', None)) 787 788 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') 789 def test_chdir(self): 790 posix.chdir(os.curdir) 791 self.assertRaises(OSError, posix.chdir, support.TESTFN) 792 793 def test_listdir(self): 794 self.assertIn(support.TESTFN, posix.listdir(os.curdir)) 795 796 def test_listdir_default(self): 797 # When listdir is called without argument, 798 # it's the same as listdir(os.curdir). 799 self.assertIn(support.TESTFN, posix.listdir()) 800 801 def test_listdir_bytes(self): 802 # When listdir is called with a bytes object, 803 # the returned strings are of type bytes. 804 self.assertIn(os.fsencode(support.TESTFN), posix.listdir(b'.')) 805 806 def test_listdir_bytes_like(self): 807 for cls in bytearray, memoryview: 808 with self.assertWarns(DeprecationWarning): 809 names = posix.listdir(cls(b'.')) 810 self.assertIn(os.fsencode(support.TESTFN), names) 811 for name in names: 812 self.assertIs(type(name), bytes) 813 814 @unittest.skipUnless(posix.listdir in os.supports_fd, 815 "test needs fd support for posix.listdir()") 816 def test_listdir_fd(self): 817 f = posix.open(posix.getcwd(), posix.O_RDONLY) 818 self.addCleanup(posix.close, f) 819 self.assertEqual( 820 sorted(posix.listdir('.')), 821 sorted(posix.listdir(f)) 822 ) 823 # Check that the fd offset was reset (issue #13739) 824 self.assertEqual( 825 sorted(posix.listdir('.')), 826 sorted(posix.listdir(f)) 827 ) 828 829 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') 830 def test_access(self): 831 self.assertTrue(posix.access(support.TESTFN, os.R_OK)) 832 833 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') 834 def test_umask(self): 835 old_mask = posix.umask(0) 836 self.assertIsInstance(old_mask, int) 837 posix.umask(old_mask) 838 839 @unittest.skipUnless(hasattr(posix, 'strerror'), 840 'test needs posix.strerror()') 841 def test_strerror(self): 842 self.assertTrue(posix.strerror(0)) 843 844 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') 845 def test_pipe(self): 846 reader, writer = posix.pipe() 847 os.close(reader) 848 os.close(writer) 849 850 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 851 @support.requires_linux_version(2, 6, 27) 852 def test_pipe2(self): 853 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') 854 self.assertRaises(TypeError, os.pipe2, 0, 0) 855 856 # try calling with flags = 0, like os.pipe() 857 r, w = os.pipe2(0) 858 os.close(r) 859 os.close(w) 860 861 # test flags 862 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) 863 self.addCleanup(os.close, r) 864 self.addCleanup(os.close, w) 865 self.assertFalse(os.get_inheritable(r)) 866 self.assertFalse(os.get_inheritable(w)) 867 self.assertFalse(os.get_blocking(r)) 868 self.assertFalse(os.get_blocking(w)) 869 # try reading from an empty pipe: this should fail, not block 870 self.assertRaises(OSError, os.read, r, 1) 871 # try a write big enough to fill-up the pipe: this should either 872 # fail or perform a partial write, not block 873 try: 874 os.write(w, b'x' * support.PIPE_MAX_SIZE) 875 except OSError: 876 pass 877 878 @support.cpython_only 879 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 880 @support.requires_linux_version(2, 6, 27) 881 def test_pipe2_c_limits(self): 882 # Issue 15989 883 import _testcapi 884 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) 885 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) 886 887 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') 888 def test_utime(self): 889 now = time.time() 890 posix.utime(support.TESTFN, None) 891 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) 892 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) 893 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) 894 posix.utime(support.TESTFN, (int(now), int(now))) 895 posix.utime(support.TESTFN, (now, now)) 896 897 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): 898 st = os.stat(target_file) 899 self.assertTrue(hasattr(st, 'st_flags')) 900 901 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 902 flags = st.st_flags | stat.UF_IMMUTABLE 903 try: 904 chflags_func(target_file, flags, **kwargs) 905 except OSError as err: 906 if err.errno != errno.EOPNOTSUPP: 907 raise 908 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 909 self.skipTest(msg) 910 911 try: 912 new_st = os.stat(target_file) 913 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 914 try: 915 fd = open(target_file, 'w+') 916 except OSError as e: 917 self.assertEqual(e.errno, errno.EPERM) 918 finally: 919 posix.chflags(target_file, st.st_flags) 920 921 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 922 def test_chflags(self): 923 self._test_chflags_regular_file(posix.chflags, support.TESTFN) 924 925 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 926 def test_lchflags_regular_file(self): 927 self._test_chflags_regular_file(posix.lchflags, support.TESTFN) 928 self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False) 929 930 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 931 def test_lchflags_symlink(self): 932 testfn_st = os.stat(support.TESTFN) 933 934 self.assertTrue(hasattr(testfn_st, 'st_flags')) 935 936 os.symlink(support.TESTFN, _DUMMY_SYMLINK) 937 self.teardown_files.append(_DUMMY_SYMLINK) 938 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 939 940 def chflags_nofollow(path, flags): 941 return posix.chflags(path, flags, follow_symlinks=False) 942 943 for fn in (posix.lchflags, chflags_nofollow): 944 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 945 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE 946 try: 947 fn(_DUMMY_SYMLINK, flags) 948 except OSError as err: 949 if err.errno != errno.EOPNOTSUPP: 950 raise 951 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 952 self.skipTest(msg) 953 try: 954 new_testfn_st = os.stat(support.TESTFN) 955 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 956 957 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 958 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 959 new_dummy_symlink_st.st_flags) 960 finally: 961 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 962 963 def test_environ(self): 964 if os.name == "nt": 965 item_type = str 966 else: 967 item_type = bytes 968 for k, v in posix.environ.items(): 969 self.assertEqual(type(k), item_type) 970 self.assertEqual(type(v), item_type) 971 972 @unittest.skipUnless(hasattr(os, "putenv"), "requires os.putenv()") 973 def test_putenv(self): 974 with self.assertRaises(ValueError): 975 os.putenv('FRUIT\0VEGETABLE', 'cabbage') 976 with self.assertRaises(ValueError): 977 os.putenv(b'FRUIT\0VEGETABLE', b'cabbage') 978 with self.assertRaises(ValueError): 979 os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage') 980 with self.assertRaises(ValueError): 981 os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage') 982 with self.assertRaises(ValueError): 983 os.putenv('FRUIT=ORANGE', 'lemon') 984 with self.assertRaises(ValueError): 985 os.putenv(b'FRUIT=ORANGE', b'lemon') 986 987 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') 988 def test_getcwd_long_pathnames(self): 989 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 990 curdir = os.getcwd() 991 base_path = os.path.abspath(support.TESTFN) + '.getcwd' 992 993 try: 994 os.mkdir(base_path) 995 os.chdir(base_path) 996 except: 997 # Just returning nothing instead of the SkipTest exception, because 998 # the test results in Error in that case. Is that ok? 999 # raise unittest.SkipTest("cannot create directory for testing") 1000 return 1001 1002 def _create_and_do_getcwd(dirname, current_path_length = 0): 1003 try: 1004 os.mkdir(dirname) 1005 except: 1006 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test") 1007 1008 os.chdir(dirname) 1009 try: 1010 os.getcwd() 1011 if current_path_length < 1027: 1012 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 1013 finally: 1014 os.chdir('..') 1015 os.rmdir(dirname) 1016 1017 _create_and_do_getcwd(dirname) 1018 1019 finally: 1020 os.chdir(curdir) 1021 support.rmtree(base_path) 1022 1023 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") 1024 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 1025 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") 1026 def test_getgrouplist(self): 1027 user = pwd.getpwuid(os.getuid())[0] 1028 group = pwd.getpwuid(os.getuid())[3] 1029 self.assertIn(group, posix.getgrouplist(user, group)) 1030 1031 1032 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 1033 def test_getgroups(self): 1034 with os.popen('id -G 2>/dev/null') as idg: 1035 groups = idg.read().strip() 1036 ret = idg.close() 1037 1038 try: 1039 idg_groups = set(int(g) for g in groups.split()) 1040 except ValueError: 1041 idg_groups = set() 1042 if ret is not None or not idg_groups: 1043 raise unittest.SkipTest("need working 'id -G'") 1044 1045 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 1046 if sys.platform == 'darwin': 1047 import sysconfig 1048 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0' 1049 if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6): 1050 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 1051 1052 # 'id -G' and 'os.getgroups()' should return the same 1053 # groups, ignoring order, duplicates, and the effective gid. 1054 # #10822/#26944 - It is implementation defined whether 1055 # posix.getgroups() includes the effective gid. 1056 symdiff = idg_groups.symmetric_difference(posix.getgroups()) 1057 self.assertTrue(not symdiff or symdiff == {posix.getegid()}) 1058 1059 # tests for the posix *at functions follow 1060 1061 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") 1062 def test_access_dir_fd(self): 1063 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1064 try: 1065 self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) 1066 finally: 1067 posix.close(f) 1068 1069 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") 1070 def test_chmod_dir_fd(self): 1071 os.chmod(support.TESTFN, stat.S_IRUSR) 1072 1073 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1074 try: 1075 posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 1076 1077 s = posix.stat(support.TESTFN) 1078 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) 1079 finally: 1080 posix.close(f) 1081 1082 @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()") 1083 def test_chown_dir_fd(self): 1084 support.unlink(support.TESTFN) 1085 support.create_empty_file(support.TESTFN) 1086 1087 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1088 try: 1089 posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) 1090 finally: 1091 posix.close(f) 1092 1093 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") 1094 def test_stat_dir_fd(self): 1095 support.unlink(support.TESTFN) 1096 with open(support.TESTFN, 'w') as outfile: 1097 outfile.write("testline\n") 1098 1099 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1100 try: 1101 s1 = posix.stat(support.TESTFN) 1102 s2 = posix.stat(support.TESTFN, dir_fd=f) 1103 self.assertEqual(s1, s2) 1104 s2 = posix.stat(support.TESTFN, dir_fd=None) 1105 self.assertEqual(s1, s2) 1106 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1107 posix.stat, support.TESTFN, dir_fd=posix.getcwd()) 1108 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1109 posix.stat, support.TESTFN, dir_fd=float(f)) 1110 self.assertRaises(OverflowError, 1111 posix.stat, support.TESTFN, dir_fd=10**20) 1112 finally: 1113 posix.close(f) 1114 1115 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") 1116 def test_utime_dir_fd(self): 1117 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1118 try: 1119 now = time.time() 1120 posix.utime(support.TESTFN, None, dir_fd=f) 1121 posix.utime(support.TESTFN, dir_fd=f) 1122 self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f) 1123 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f) 1124 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f) 1125 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f) 1126 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f) 1127 posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) 1128 posix.utime(support.TESTFN, (now, now), dir_fd=f) 1129 posix.utime(support.TESTFN, 1130 (int(now), int((now - int(now)) * 1e9)), dir_fd=f) 1131 posix.utime(support.TESTFN, dir_fd=f, 1132 times=(int(now), int((now - int(now)) * 1e9))) 1133 1134 # try dir_fd and follow_symlinks together 1135 if os.utime in os.supports_follow_symlinks: 1136 try: 1137 posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) 1138 except ValueError: 1139 # whoops! using both together not supported on this platform. 1140 pass 1141 1142 finally: 1143 posix.close(f) 1144 1145 @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") 1146 def test_link_dir_fd(self): 1147 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1148 try: 1149 posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f) 1150 except PermissionError as e: 1151 self.skipTest('posix.link(): %s' % e) 1152 else: 1153 # should have same inodes 1154 self.assertEqual(posix.stat(support.TESTFN)[1], 1155 posix.stat(support.TESTFN + 'link')[1]) 1156 finally: 1157 posix.close(f) 1158 support.unlink(support.TESTFN + 'link') 1159 1160 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") 1161 def test_mkdir_dir_fd(self): 1162 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1163 try: 1164 posix.mkdir(support.TESTFN + 'dir', dir_fd=f) 1165 posix.stat(support.TESTFN + 'dir') # should not raise exception 1166 finally: 1167 posix.close(f) 1168 support.rmtree(support.TESTFN + 'dir') 1169 1170 @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'), 1171 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") 1172 def test_mknod_dir_fd(self): 1173 # Test using mknodat() to create a FIFO (the only use specified 1174 # by POSIX). 1175 support.unlink(support.TESTFN) 1176 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 1177 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1178 try: 1179 posix.mknod(support.TESTFN, mode, 0, dir_fd=f) 1180 except OSError as e: 1181 # Some old systems don't allow unprivileged users to use 1182 # mknod(), or only support creating device nodes. 1183 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 1184 else: 1185 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 1186 finally: 1187 posix.close(f) 1188 1189 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") 1190 def test_open_dir_fd(self): 1191 support.unlink(support.TESTFN) 1192 with open(support.TESTFN, 'w') as outfile: 1193 outfile.write("testline\n") 1194 a = posix.open(posix.getcwd(), posix.O_RDONLY) 1195 b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) 1196 try: 1197 res = posix.read(b, 9).decode(encoding="utf-8") 1198 self.assertEqual("testline\n", res) 1199 finally: 1200 posix.close(a) 1201 posix.close(b) 1202 1203 @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()") 1204 def test_readlink_dir_fd(self): 1205 os.symlink(support.TESTFN, support.TESTFN + 'link') 1206 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1207 try: 1208 self.assertEqual(posix.readlink(support.TESTFN + 'link'), 1209 posix.readlink(support.TESTFN + 'link', dir_fd=f)) 1210 finally: 1211 support.unlink(support.TESTFN + 'link') 1212 posix.close(f) 1213 1214 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") 1215 def test_rename_dir_fd(self): 1216 support.unlink(support.TESTFN) 1217 support.create_empty_file(support.TESTFN + 'ren') 1218 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1219 try: 1220 posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f) 1221 except: 1222 posix.rename(support.TESTFN + 'ren', support.TESTFN) 1223 raise 1224 else: 1225 posix.stat(support.TESTFN) # should not raise exception 1226 finally: 1227 posix.close(f) 1228 1229 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") 1230 def test_symlink_dir_fd(self): 1231 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1232 try: 1233 posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) 1234 self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) 1235 finally: 1236 posix.close(f) 1237 support.unlink(support.TESTFN + 'link') 1238 1239 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") 1240 def test_unlink_dir_fd(self): 1241 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1242 support.create_empty_file(support.TESTFN + 'del') 1243 posix.stat(support.TESTFN + 'del') # should not raise exception 1244 try: 1245 posix.unlink(support.TESTFN + 'del', dir_fd=f) 1246 except: 1247 support.unlink(support.TESTFN + 'del') 1248 raise 1249 else: 1250 self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') 1251 finally: 1252 posix.close(f) 1253 1254 @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") 1255 def test_mkfifo_dir_fd(self): 1256 support.unlink(support.TESTFN) 1257 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1258 try: 1259 try: 1260 posix.mkfifo(support.TESTFN, 1261 stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 1262 except PermissionError as e: 1263 self.skipTest('posix.mkfifo(): %s' % e) 1264 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 1265 finally: 1266 posix.close(f) 1267 1268 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), 1269 "don't have scheduling support") 1270 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), 1271 "don't have sched affinity support") 1272 1273 @requires_sched_h 1274 def test_sched_yield(self): 1275 # This has no error conditions (at least on Linux). 1276 posix.sched_yield() 1277 1278 @requires_sched_h 1279 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), 1280 "requires sched_get_priority_max()") 1281 def test_sched_priority(self): 1282 # Round-robin usually has interesting priorities. 1283 pol = posix.SCHED_RR 1284 lo = posix.sched_get_priority_min(pol) 1285 hi = posix.sched_get_priority_max(pol) 1286 self.assertIsInstance(lo, int) 1287 self.assertIsInstance(hi, int) 1288 self.assertGreaterEqual(hi, lo) 1289 # OSX evidently just returns 15 without checking the argument. 1290 if sys.platform != "darwin": 1291 self.assertRaises(OSError, posix.sched_get_priority_min, -23) 1292 self.assertRaises(OSError, posix.sched_get_priority_max, -23) 1293 1294 @requires_sched 1295 def test_get_and_set_scheduler_and_param(self): 1296 possible_schedulers = [sched for name, sched in posix.__dict__.items() 1297 if name.startswith("SCHED_")] 1298 mine = posix.sched_getscheduler(0) 1299 self.assertIn(mine, possible_schedulers) 1300 try: 1301 parent = posix.sched_getscheduler(os.getppid()) 1302 except OSError as e: 1303 if e.errno != errno.EPERM: 1304 raise 1305 else: 1306 self.assertIn(parent, possible_schedulers) 1307 self.assertRaises(OSError, posix.sched_getscheduler, -1) 1308 self.assertRaises(OSError, posix.sched_getparam, -1) 1309 param = posix.sched_getparam(0) 1310 self.assertIsInstance(param.sched_priority, int) 1311 1312 # POSIX states that calling sched_setparam() or sched_setscheduler() on 1313 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR 1314 # is implementation-defined: NetBSD and FreeBSD can return EINVAL. 1315 if not sys.platform.startswith(('freebsd', 'netbsd')): 1316 try: 1317 posix.sched_setscheduler(0, mine, param) 1318 posix.sched_setparam(0, param) 1319 except OSError as e: 1320 if e.errno != errno.EPERM: 1321 raise 1322 self.assertRaises(OSError, posix.sched_setparam, -1, param) 1323 1324 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) 1325 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) 1326 self.assertRaises(TypeError, posix.sched_setparam, 0, 43) 1327 param = posix.sched_param(None) 1328 self.assertRaises(TypeError, posix.sched_setparam, 0, param) 1329 large = 214748364700 1330 param = posix.sched_param(large) 1331 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1332 param = posix.sched_param(sched_priority=-large) 1333 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1334 1335 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") 1336 def test_sched_rr_get_interval(self): 1337 try: 1338 interval = posix.sched_rr_get_interval(0) 1339 except OSError as e: 1340 # This likely means that sched_rr_get_interval is only valid for 1341 # processes with the SCHED_RR scheduler in effect. 1342 if e.errno != errno.EINVAL: 1343 raise 1344 self.skipTest("only works on SCHED_RR processes") 1345 self.assertIsInstance(interval, float) 1346 # Reasonable constraints, I think. 1347 self.assertGreaterEqual(interval, 0.) 1348 self.assertLess(interval, 1.) 1349 1350 @requires_sched_affinity 1351 def test_sched_getaffinity(self): 1352 mask = posix.sched_getaffinity(0) 1353 self.assertIsInstance(mask, set) 1354 self.assertGreaterEqual(len(mask), 1) 1355 self.assertRaises(OSError, posix.sched_getaffinity, -1) 1356 for cpu in mask: 1357 self.assertIsInstance(cpu, int) 1358 self.assertGreaterEqual(cpu, 0) 1359 self.assertLess(cpu, 1 << 32) 1360 1361 @requires_sched_affinity 1362 def test_sched_setaffinity(self): 1363 mask = posix.sched_getaffinity(0) 1364 if len(mask) > 1: 1365 # Empty masks are forbidden 1366 mask.pop() 1367 posix.sched_setaffinity(0, mask) 1368 self.assertEqual(posix.sched_getaffinity(0), mask) 1369 self.assertRaises(OSError, posix.sched_setaffinity, 0, []) 1370 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) 1371 self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X")) 1372 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) 1373 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) 1374 1375 def test_rtld_constants(self): 1376 # check presence of major RTLD_* constants 1377 posix.RTLD_LAZY 1378 posix.RTLD_NOW 1379 posix.RTLD_GLOBAL 1380 posix.RTLD_LOCAL 1381 1382 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), 1383 "test needs an OS that reports file holes") 1384 def test_fs_holes(self): 1385 # Even if the filesystem doesn't report holes, 1386 # if the OS supports it the SEEK_* constants 1387 # will be defined and will have a consistent 1388 # behaviour: 1389 # os.SEEK_DATA = current position 1390 # os.SEEK_HOLE = end of file position 1391 with open(support.TESTFN, 'r+b') as fp: 1392 fp.write(b"hello") 1393 fp.flush() 1394 size = fp.tell() 1395 fno = fp.fileno() 1396 try : 1397 for i in range(size): 1398 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) 1399 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) 1400 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) 1401 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) 1402 except OSError : 1403 # Some OSs claim to support SEEK_HOLE/SEEK_DATA 1404 # but it is not true. 1405 # For instance: 1406 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html 1407 raise unittest.SkipTest("OSError raised!") 1408 1409 def test_path_error2(self): 1410 """ 1411 Test functions that call path_error2(), providing two filenames in their exceptions. 1412 """ 1413 for name in ("rename", "replace", "link"): 1414 function = getattr(os, name, None) 1415 if function is None: 1416 continue 1417 1418 for dst in ("noodly2", support.TESTFN): 1419 try: 1420 function('doesnotexistfilename', dst) 1421 except OSError as e: 1422 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) 1423 break 1424 else: 1425 self.fail("No valid path_error2() test for os." + name) 1426 1427 def test_path_with_null_character(self): 1428 fn = support.TESTFN 1429 fn_with_NUL = fn + '\0' 1430 self.addCleanup(support.unlink, fn) 1431 support.unlink(fn) 1432 fd = None 1433 try: 1434 with self.assertRaises(ValueError): 1435 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1436 finally: 1437 if fd is not None: 1438 os.close(fd) 1439 self.assertFalse(os.path.exists(fn)) 1440 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1441 self.assertFalse(os.path.exists(fn)) 1442 open(fn, 'wb').close() 1443 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1444 1445 def test_path_with_null_byte(self): 1446 fn = os.fsencode(support.TESTFN) 1447 fn_with_NUL = fn + b'\0' 1448 self.addCleanup(support.unlink, fn) 1449 support.unlink(fn) 1450 fd = None 1451 try: 1452 with self.assertRaises(ValueError): 1453 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1454 finally: 1455 if fd is not None: 1456 os.close(fd) 1457 self.assertFalse(os.path.exists(fn)) 1458 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1459 self.assertFalse(os.path.exists(fn)) 1460 open(fn, 'wb').close() 1461 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1462 1463class PosixGroupsTester(unittest.TestCase): 1464 1465 def setUp(self): 1466 if posix.getuid() != 0: 1467 raise unittest.SkipTest("not enough privileges") 1468 if not hasattr(posix, 'getgroups'): 1469 raise unittest.SkipTest("need posix.getgroups") 1470 if sys.platform == 'darwin': 1471 raise unittest.SkipTest("getgroups(2) is broken on OSX") 1472 self.saved_groups = posix.getgroups() 1473 1474 def tearDown(self): 1475 if hasattr(posix, 'setgroups'): 1476 posix.setgroups(self.saved_groups) 1477 elif hasattr(posix, 'initgroups'): 1478 name = pwd.getpwuid(posix.getuid()).pw_name 1479 posix.initgroups(name, self.saved_groups[0]) 1480 1481 @unittest.skipUnless(hasattr(posix, 'initgroups'), 1482 "test needs posix.initgroups()") 1483 def test_initgroups(self): 1484 # find missing group 1485 1486 g = max(self.saved_groups or [0]) + 1 1487 name = pwd.getpwuid(posix.getuid()).pw_name 1488 posix.initgroups(name, g) 1489 self.assertIn(g, posix.getgroups()) 1490 1491 @unittest.skipUnless(hasattr(posix, 'setgroups'), 1492 "test needs posix.setgroups()") 1493 def test_setgroups(self): 1494 for groups in [[0], list(range(16))]: 1495 posix.setgroups(groups) 1496 self.assertListEqual(groups, posix.getgroups()) 1497 1498 1499class _PosixSpawnMixin: 1500 # Program which does nothing and exits with status 0 (success) 1501 NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass') 1502 spawn_func = None 1503 1504 def python_args(self, *args): 1505 # Disable site module to avoid side effects. For example, 1506 # on Fedora 28, if the HOME environment variable is not set, 1507 # site._getuserbase() calls pwd.getpwuid() which opens 1508 # /var/lib/sss/mc/passwd but then leaves the file open which makes 1509 # test_close_file() to fail. 1510 return (sys.executable, '-I', '-S', *args) 1511 1512 def test_returns_pid(self): 1513 pidfile = support.TESTFN 1514 self.addCleanup(support.unlink, pidfile) 1515 script = f"""if 1: 1516 import os 1517 with open({pidfile!r}, "w") as pidfile: 1518 pidfile.write(str(os.getpid())) 1519 """ 1520 args = self.python_args('-c', script) 1521 pid = self.spawn_func(args[0], args, os.environ) 1522 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1523 with open(pidfile) as f: 1524 self.assertEqual(f.read(), str(pid)) 1525 1526 def test_no_such_executable(self): 1527 no_such_executable = 'no_such_executable' 1528 try: 1529 pid = self.spawn_func(no_such_executable, 1530 [no_such_executable], 1531 os.environ) 1532 # bpo-35794: PermissionError can be raised if there are 1533 # directories in the $PATH that are not accessible. 1534 except (FileNotFoundError, PermissionError) as exc: 1535 self.assertEqual(exc.filename, no_such_executable) 1536 else: 1537 pid2, status = os.waitpid(pid, 0) 1538 self.assertEqual(pid2, pid) 1539 self.assertNotEqual(status, 0) 1540 1541 def test_specify_environment(self): 1542 envfile = support.TESTFN 1543 self.addCleanup(support.unlink, envfile) 1544 script = f"""if 1: 1545 import os 1546 with open({envfile!r}, "w") as envfile: 1547 envfile.write(os.environ['foo']) 1548 """ 1549 args = self.python_args('-c', script) 1550 pid = self.spawn_func(args[0], args, 1551 {**os.environ, 'foo': 'bar'}) 1552 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1553 with open(envfile) as f: 1554 self.assertEqual(f.read(), 'bar') 1555 1556 def test_none_file_actions(self): 1557 pid = self.spawn_func( 1558 self.NOOP_PROGRAM[0], 1559 self.NOOP_PROGRAM, 1560 os.environ, 1561 file_actions=None 1562 ) 1563 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1564 1565 def test_empty_file_actions(self): 1566 pid = self.spawn_func( 1567 self.NOOP_PROGRAM[0], 1568 self.NOOP_PROGRAM, 1569 os.environ, 1570 file_actions=[] 1571 ) 1572 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1573 1574 def test_resetids_explicit_default(self): 1575 pid = self.spawn_func( 1576 sys.executable, 1577 [sys.executable, '-c', 'pass'], 1578 os.environ, 1579 resetids=False 1580 ) 1581 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1582 1583 def test_resetids(self): 1584 pid = self.spawn_func( 1585 sys.executable, 1586 [sys.executable, '-c', 'pass'], 1587 os.environ, 1588 resetids=True 1589 ) 1590 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1591 1592 def test_resetids_wrong_type(self): 1593 with self.assertRaises(TypeError): 1594 self.spawn_func(sys.executable, 1595 [sys.executable, "-c", "pass"], 1596 os.environ, resetids=None) 1597 1598 def test_setpgroup(self): 1599 pid = self.spawn_func( 1600 sys.executable, 1601 [sys.executable, '-c', 'pass'], 1602 os.environ, 1603 setpgroup=os.getpgrp() 1604 ) 1605 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1606 1607 def test_setpgroup_wrong_type(self): 1608 with self.assertRaises(TypeError): 1609 self.spawn_func(sys.executable, 1610 [sys.executable, "-c", "pass"], 1611 os.environ, setpgroup="023") 1612 1613 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1614 'need signal.pthread_sigmask()') 1615 def test_setsigmask(self): 1616 code = textwrap.dedent("""\ 1617 import signal 1618 signal.raise_signal(signal.SIGUSR1)""") 1619 1620 pid = self.spawn_func( 1621 sys.executable, 1622 [sys.executable, '-c', code], 1623 os.environ, 1624 setsigmask=[signal.SIGUSR1] 1625 ) 1626 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1627 1628 def test_setsigmask_wrong_type(self): 1629 with self.assertRaises(TypeError): 1630 self.spawn_func(sys.executable, 1631 [sys.executable, "-c", "pass"], 1632 os.environ, setsigmask=34) 1633 with self.assertRaises(TypeError): 1634 self.spawn_func(sys.executable, 1635 [sys.executable, "-c", "pass"], 1636 os.environ, setsigmask=["j"]) 1637 with self.assertRaises(ValueError): 1638 self.spawn_func(sys.executable, 1639 [sys.executable, "-c", "pass"], 1640 os.environ, setsigmask=[signal.NSIG, 1641 signal.NSIG+1]) 1642 1643 def test_setsid(self): 1644 rfd, wfd = os.pipe() 1645 self.addCleanup(os.close, rfd) 1646 try: 1647 os.set_inheritable(wfd, True) 1648 1649 code = textwrap.dedent(f""" 1650 import os 1651 fd = {wfd} 1652 sid = os.getsid(0) 1653 os.write(fd, str(sid).encode()) 1654 """) 1655 1656 try: 1657 pid = self.spawn_func(sys.executable, 1658 [sys.executable, "-c", code], 1659 os.environ, setsid=True) 1660 except NotImplementedError as exc: 1661 self.skipTest(f"setsid is not supported: {exc!r}") 1662 except PermissionError as exc: 1663 self.skipTest(f"setsid failed with: {exc!r}") 1664 finally: 1665 os.close(wfd) 1666 1667 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1668 output = os.read(rfd, 100) 1669 child_sid = int(output) 1670 parent_sid = os.getsid(os.getpid()) 1671 self.assertNotEqual(parent_sid, child_sid) 1672 1673 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1674 'need signal.pthread_sigmask()') 1675 def test_setsigdef(self): 1676 original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN) 1677 code = textwrap.dedent("""\ 1678 import signal 1679 signal.raise_signal(signal.SIGUSR1)""") 1680 try: 1681 pid = self.spawn_func( 1682 sys.executable, 1683 [sys.executable, '-c', code], 1684 os.environ, 1685 setsigdef=[signal.SIGUSR1] 1686 ) 1687 finally: 1688 signal.signal(signal.SIGUSR1, original_handler) 1689 1690 pid2, status = os.waitpid(pid, 0) 1691 self.assertEqual(pid2, pid) 1692 self.assertTrue(os.WIFSIGNALED(status), status) 1693 self.assertEqual(os.WTERMSIG(status), signal.SIGUSR1) 1694 1695 def test_setsigdef_wrong_type(self): 1696 with self.assertRaises(TypeError): 1697 self.spawn_func(sys.executable, 1698 [sys.executable, "-c", "pass"], 1699 os.environ, setsigdef=34) 1700 with self.assertRaises(TypeError): 1701 self.spawn_func(sys.executable, 1702 [sys.executable, "-c", "pass"], 1703 os.environ, setsigdef=["j"]) 1704 with self.assertRaises(ValueError): 1705 self.spawn_func(sys.executable, 1706 [sys.executable, "-c", "pass"], 1707 os.environ, setsigdef=[signal.NSIG, signal.NSIG+1]) 1708 1709 @requires_sched 1710 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1711 "bpo-34685: test can fail on BSD") 1712 def test_setscheduler_only_param(self): 1713 policy = os.sched_getscheduler(0) 1714 priority = os.sched_get_priority_min(policy) 1715 code = textwrap.dedent(f"""\ 1716 import os, sys 1717 if os.sched_getscheduler(0) != {policy}: 1718 sys.exit(101) 1719 if os.sched_getparam(0).sched_priority != {priority}: 1720 sys.exit(102)""") 1721 pid = self.spawn_func( 1722 sys.executable, 1723 [sys.executable, '-c', code], 1724 os.environ, 1725 scheduler=(None, os.sched_param(priority)) 1726 ) 1727 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1728 1729 @requires_sched 1730 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1731 "bpo-34685: test can fail on BSD") 1732 def test_setscheduler_with_policy(self): 1733 policy = os.sched_getscheduler(0) 1734 priority = os.sched_get_priority_min(policy) 1735 code = textwrap.dedent(f"""\ 1736 import os, sys 1737 if os.sched_getscheduler(0) != {policy}: 1738 sys.exit(101) 1739 if os.sched_getparam(0).sched_priority != {priority}: 1740 sys.exit(102)""") 1741 pid = self.spawn_func( 1742 sys.executable, 1743 [sys.executable, '-c', code], 1744 os.environ, 1745 scheduler=(policy, os.sched_param(priority)) 1746 ) 1747 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1748 1749 def test_multiple_file_actions(self): 1750 file_actions = [ 1751 (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0), 1752 (os.POSIX_SPAWN_CLOSE, 0), 1753 (os.POSIX_SPAWN_DUP2, 1, 4), 1754 ] 1755 pid = self.spawn_func(self.NOOP_PROGRAM[0], 1756 self.NOOP_PROGRAM, 1757 os.environ, 1758 file_actions=file_actions) 1759 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1760 1761 def test_bad_file_actions(self): 1762 args = self.NOOP_PROGRAM 1763 with self.assertRaises(TypeError): 1764 self.spawn_func(args[0], args, os.environ, 1765 file_actions=[None]) 1766 with self.assertRaises(TypeError): 1767 self.spawn_func(args[0], args, os.environ, 1768 file_actions=[()]) 1769 with self.assertRaises(TypeError): 1770 self.spawn_func(args[0], args, os.environ, 1771 file_actions=[(None,)]) 1772 with self.assertRaises(TypeError): 1773 self.spawn_func(args[0], args, os.environ, 1774 file_actions=[(12345,)]) 1775 with self.assertRaises(TypeError): 1776 self.spawn_func(args[0], args, os.environ, 1777 file_actions=[(os.POSIX_SPAWN_CLOSE,)]) 1778 with self.assertRaises(TypeError): 1779 self.spawn_func(args[0], args, os.environ, 1780 file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)]) 1781 with self.assertRaises(TypeError): 1782 self.spawn_func(args[0], args, os.environ, 1783 file_actions=[(os.POSIX_SPAWN_CLOSE, None)]) 1784 with self.assertRaises(ValueError): 1785 self.spawn_func(args[0], args, os.environ, 1786 file_actions=[(os.POSIX_SPAWN_OPEN, 1787 3, __file__ + '\0', 1788 os.O_RDONLY, 0)]) 1789 1790 def test_open_file(self): 1791 outfile = support.TESTFN 1792 self.addCleanup(support.unlink, outfile) 1793 script = """if 1: 1794 import sys 1795 sys.stdout.write("hello") 1796 """ 1797 file_actions = [ 1798 (os.POSIX_SPAWN_OPEN, 1, outfile, 1799 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 1800 stat.S_IRUSR | stat.S_IWUSR), 1801 ] 1802 args = self.python_args('-c', script) 1803 pid = self.spawn_func(args[0], args, os.environ, 1804 file_actions=file_actions) 1805 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1806 with open(outfile) as f: 1807 self.assertEqual(f.read(), 'hello') 1808 1809 def test_close_file(self): 1810 closefile = support.TESTFN 1811 self.addCleanup(support.unlink, closefile) 1812 script = f"""if 1: 1813 import os 1814 try: 1815 os.fstat(0) 1816 except OSError as e: 1817 with open({closefile!r}, 'w') as closefile: 1818 closefile.write('is closed %d' % e.errno) 1819 """ 1820 args = self.python_args('-c', script) 1821 pid = self.spawn_func(args[0], args, os.environ, 1822 file_actions=[(os.POSIX_SPAWN_CLOSE, 0)]) 1823 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1824 with open(closefile) as f: 1825 self.assertEqual(f.read(), 'is closed %d' % errno.EBADF) 1826 1827 def test_dup2(self): 1828 dupfile = support.TESTFN 1829 self.addCleanup(support.unlink, dupfile) 1830 script = """if 1: 1831 import sys 1832 sys.stdout.write("hello") 1833 """ 1834 with open(dupfile, "wb") as childfile: 1835 file_actions = [ 1836 (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1), 1837 ] 1838 args = self.python_args('-c', script) 1839 pid = self.spawn_func(args[0], args, os.environ, 1840 file_actions=file_actions) 1841 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 1842 with open(dupfile) as f: 1843 self.assertEqual(f.read(), 'hello') 1844 1845 1846@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn") 1847class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin): 1848 spawn_func = getattr(posix, 'posix_spawn', None) 1849 1850 1851@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp") 1852class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin): 1853 spawn_func = getattr(posix, 'posix_spawnp', None) 1854 1855 @support.skip_unless_symlink 1856 def test_posix_spawnp(self): 1857 # Use a symlink to create a program in its own temporary directory 1858 temp_dir = tempfile.mkdtemp() 1859 self.addCleanup(support.rmtree, temp_dir) 1860 1861 program = 'posix_spawnp_test_program.exe' 1862 program_fullpath = os.path.join(temp_dir, program) 1863 os.symlink(sys.executable, program_fullpath) 1864 1865 try: 1866 path = os.pathsep.join((temp_dir, os.environ['PATH'])) 1867 except KeyError: 1868 path = temp_dir # PATH is not set 1869 1870 spawn_args = (program, '-I', '-S', '-c', 'pass') 1871 code = textwrap.dedent(""" 1872 import os 1873 args = %a 1874 pid = os.posix_spawnp(args[0], args, os.environ) 1875 pid2, status = os.waitpid(pid, 0) 1876 if pid2 != pid: 1877 raise Exception(f"pid {pid2} != {pid}") 1878 if status != 0: 1879 raise Exception(f"status {status} != 0") 1880 """ % (spawn_args,)) 1881 1882 # Use a subprocess to test os.posix_spawnp() with a modified PATH 1883 # environment variable: posix_spawnp() uses the current environment 1884 # to locate the program, not its environment argument. 1885 args = ('-c', code) 1886 assert_python_ok(*args, PATH=path) 1887 1888 1889def test_main(): 1890 try: 1891 support.run_unittest( 1892 PosixTester, 1893 PosixGroupsTester, 1894 TestPosixSpawn, 1895 TestPosixSpawnP, 1896 ) 1897 finally: 1898 support.reap_children() 1899 1900if __name__ == '__main__': 1901 test_main() 1902