1"Test posix functions" 2 3from test import support 4from test.support import import_helper 5from test.support import os_helper 6from test.support import warnings_helper 7from test.support.script_helper import assert_python_ok 8 9# Skip these tests if there is no posix module. 10posix = import_helper.import_module('posix') 11 12import errno 13import sys 14import signal 15import time 16import os 17import platform 18import pwd 19import stat 20import tempfile 21import unittest 22import warnings 23import textwrap 24 25_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 26 os_helper.TESTFN + '-dummy-symlink') 27 28requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 29 'test is only meaningful on 32-bit builds') 30 31def _supports_sched(): 32 if not hasattr(posix, 'sched_getscheduler'): 33 return False 34 try: 35 posix.sched_getscheduler(0) 36 except OSError as e: 37 if e.errno == errno.ENOSYS: 38 return False 39 return True 40 41requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API') 42 43 44class PosixTester(unittest.TestCase): 45 46 def setUp(self): 47 # create empty file 48 with open(os_helper.TESTFN, "wb"): 49 pass 50 self.teardown_files = [ os_helper.TESTFN ] 51 self._warnings_manager = warnings_helper.check_warnings() 52 self._warnings_manager.__enter__() 53 warnings.filterwarnings('ignore', '.* potential security risk .*', 54 RuntimeWarning) 55 56 def tearDown(self): 57 for teardown_file in self.teardown_files: 58 os_helper.unlink(teardown_file) 59 self._warnings_manager.__exit__(None, None, None) 60 61 def testNoArgFunctions(self): 62 # test posix functions which take no arguments and have 63 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 64 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", 65 "times", "getloadavg", 66 "getegid", "geteuid", "getgid", "getgroups", 67 "getpid", "getpgrp", "getppid", "getuid", "sync", 68 ] 69 70 for name in NO_ARG_FUNCTIONS: 71 posix_func = getattr(posix, name, None) 72 if posix_func is not None: 73 posix_func() 74 self.assertRaises(TypeError, posix_func, 1) 75 76 @unittest.skipUnless(hasattr(posix, 'getresuid'), 77 'test needs posix.getresuid()') 78 def test_getresuid(self): 79 user_ids = posix.getresuid() 80 self.assertEqual(len(user_ids), 3) 81 for val in user_ids: 82 self.assertGreaterEqual(val, 0) 83 84 @unittest.skipUnless(hasattr(posix, 'getresgid'), 85 'test needs posix.getresgid()') 86 def test_getresgid(self): 87 group_ids = posix.getresgid() 88 self.assertEqual(len(group_ids), 3) 89 for val in group_ids: 90 self.assertGreaterEqual(val, 0) 91 92 @unittest.skipUnless(hasattr(posix, 'setresuid'), 93 'test needs posix.setresuid()') 94 def test_setresuid(self): 95 current_user_ids = posix.getresuid() 96 self.assertIsNone(posix.setresuid(*current_user_ids)) 97 # -1 means don't change that value. 98 self.assertIsNone(posix.setresuid(-1, -1, -1)) 99 100 @unittest.skipUnless(hasattr(posix, 'setresuid'), 101 'test needs posix.setresuid()') 102 def test_setresuid_exception(self): 103 # Don't do this test if someone is silly enough to run us as root. 104 current_user_ids = posix.getresuid() 105 if 0 not in current_user_ids: 106 new_user_ids = (current_user_ids[0]+1, -1, -1) 107 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 108 109 @unittest.skipUnless(hasattr(posix, 'setresgid'), 110 'test needs posix.setresgid()') 111 def test_setresgid(self): 112 current_group_ids = posix.getresgid() 113 self.assertIsNone(posix.setresgid(*current_group_ids)) 114 # -1 means don't change that value. 115 self.assertIsNone(posix.setresgid(-1, -1, -1)) 116 117 @unittest.skipUnless(hasattr(posix, 'setresgid'), 118 'test needs posix.setresgid()') 119 def test_setresgid_exception(self): 120 # Don't do this test if someone is silly enough to run us as root. 121 current_group_ids = posix.getresgid() 122 if 0 not in current_group_ids: 123 new_group_ids = (current_group_ids[0]+1, -1, -1) 124 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 125 126 @unittest.skipUnless(hasattr(posix, 'initgroups'), 127 "test needs os.initgroups()") 128 def test_initgroups(self): 129 # It takes a string and an integer; check that it raises a TypeError 130 # for other argument lists. 131 self.assertRaises(TypeError, posix.initgroups) 132 self.assertRaises(TypeError, posix.initgroups, None) 133 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 134 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 135 136 # If a non-privileged user invokes it, it should fail with OSError 137 # EPERM. 138 if os.getuid() != 0: 139 try: 140 name = pwd.getpwuid(posix.getuid()).pw_name 141 except KeyError: 142 # the current UID may not have a pwd entry 143 raise unittest.SkipTest("need a pwd entry") 144 try: 145 posix.initgroups(name, 13) 146 except OSError as e: 147 self.assertEqual(e.errno, errno.EPERM) 148 else: 149 self.fail("Expected OSError to be raised by initgroups") 150 151 @unittest.skipUnless(hasattr(posix, 'statvfs'), 152 'test needs posix.statvfs()') 153 def test_statvfs(self): 154 self.assertTrue(posix.statvfs(os.curdir)) 155 156 @unittest.skipUnless(hasattr(posix, 'fstatvfs'), 157 'test needs posix.fstatvfs()') 158 def test_fstatvfs(self): 159 fp = open(os_helper.TESTFN) 160 try: 161 self.assertTrue(posix.fstatvfs(fp.fileno())) 162 self.assertTrue(posix.statvfs(fp.fileno())) 163 finally: 164 fp.close() 165 166 @unittest.skipUnless(hasattr(posix, 'ftruncate'), 167 'test needs posix.ftruncate()') 168 def test_ftruncate(self): 169 fp = open(os_helper.TESTFN, 'w+') 170 try: 171 # we need to have some data to truncate 172 fp.write('test') 173 fp.flush() 174 posix.ftruncate(fp.fileno(), 0) 175 finally: 176 fp.close() 177 178 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") 179 def test_truncate(self): 180 with open(os_helper.TESTFN, 'w') as fp: 181 fp.write('test') 182 fp.flush() 183 posix.truncate(os_helper.TESTFN, 0) 184 185 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") 186 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 187 def test_fexecve(self): 188 fp = os.open(sys.executable, os.O_RDONLY) 189 try: 190 pid = os.fork() 191 if pid == 0: 192 os.chdir(os.path.split(sys.executable)[0]) 193 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) 194 else: 195 support.wait_process(pid, exitcode=0) 196 finally: 197 os.close(fp) 198 199 200 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") 201 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 202 def test_waitid(self): 203 pid = os.fork() 204 if pid == 0: 205 os.chdir(os.path.split(sys.executable)[0]) 206 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) 207 else: 208 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) 209 self.assertEqual(pid, res.si_pid) 210 211 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 212 def test_register_at_fork(self): 213 with self.assertRaises(TypeError, msg="Positional args not allowed"): 214 os.register_at_fork(lambda: None) 215 with self.assertRaises(TypeError, msg="Args must be callable"): 216 os.register_at_fork(before=2) 217 with self.assertRaises(TypeError, msg="Args must be callable"): 218 os.register_at_fork(after_in_child="three") 219 with self.assertRaises(TypeError, msg="Args must be callable"): 220 os.register_at_fork(after_in_parent=b"Five") 221 with self.assertRaises(TypeError, msg="Args must not be None"): 222 os.register_at_fork(before=None) 223 with self.assertRaises(TypeError, msg="Args must not be None"): 224 os.register_at_fork(after_in_child=None) 225 with self.assertRaises(TypeError, msg="Args must not be None"): 226 os.register_at_fork(after_in_parent=None) 227 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 228 # Ensure a combination of valid and invalid is an error. 229 os.register_at_fork(before=None, after_in_parent=lambda: 3) 230 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 231 # Ensure a combination of valid and invalid is an error. 232 os.register_at_fork(before=lambda: None, after_in_child='') 233 # We test actual registrations in their own process so as not to 234 # pollute this one. There is no way to unregister for cleanup. 235 code = """if 1: 236 import os 237 238 r, w = os.pipe() 239 fin_r, fin_w = os.pipe() 240 241 os.register_at_fork(before=lambda: os.write(w, b'A')) 242 os.register_at_fork(after_in_parent=lambda: os.write(w, b'C')) 243 os.register_at_fork(after_in_child=lambda: os.write(w, b'E')) 244 os.register_at_fork(before=lambda: os.write(w, b'B'), 245 after_in_parent=lambda: os.write(w, b'D'), 246 after_in_child=lambda: os.write(w, b'F')) 247 248 pid = os.fork() 249 if pid == 0: 250 # At this point, after-forkers have already been executed 251 os.close(w) 252 # Wait for parent to tell us to exit 253 os.read(fin_r, 1) 254 os._exit(0) 255 else: 256 try: 257 os.close(w) 258 with open(r, "rb") as f: 259 data = f.read() 260 assert len(data) == 6, data 261 # Check before-fork callbacks 262 assert data[:2] == b'BA', data 263 # Check after-fork callbacks 264 assert sorted(data[2:]) == list(b'CDEF'), data 265 assert data.index(b'C') < data.index(b'D'), data 266 assert data.index(b'E') < data.index(b'F'), data 267 finally: 268 os.write(fin_w, b'!') 269 """ 270 assert_python_ok('-c', code) 271 272 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") 273 def test_lockf(self): 274 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 275 try: 276 os.write(fd, b'test') 277 os.lseek(fd, 0, os.SEEK_SET) 278 posix.lockf(fd, posix.F_LOCK, 4) 279 # section is locked 280 posix.lockf(fd, posix.F_ULOCK, 4) 281 finally: 282 os.close(fd) 283 284 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") 285 def test_pread(self): 286 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 287 try: 288 os.write(fd, b'test') 289 os.lseek(fd, 0, os.SEEK_SET) 290 self.assertEqual(b'es', posix.pread(fd, 2, 1)) 291 # the first pread() shouldn't disturb the file offset 292 self.assertEqual(b'te', posix.read(fd, 2)) 293 finally: 294 os.close(fd) 295 296 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 297 def test_preadv(self): 298 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 299 try: 300 os.write(fd, b'test1tt2t3t5t6t6t8') 301 buf = [bytearray(i) for i in [5, 3, 2]] 302 self.assertEqual(posix.preadv(fd, buf, 3), 10) 303 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 304 finally: 305 os.close(fd) 306 307 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 308 @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI") 309 def test_preadv_flags(self): 310 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 311 try: 312 os.write(fd, b'test1tt2t3t5t6t6t8') 313 buf = [bytearray(i) for i in [5, 3, 2]] 314 self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10) 315 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 316 except NotImplementedError: 317 self.skipTest("preadv2 not available") 318 except OSError as inst: 319 # Is possible that the macro RWF_HIPRI was defined at compilation time 320 # but the option is not supported by the kernel or the runtime libc shared 321 # library. 322 if inst.errno in {errno.EINVAL, errno.ENOTSUP}: 323 raise unittest.SkipTest("RWF_HIPRI is not supported by the current system") 324 else: 325 raise 326 finally: 327 os.close(fd) 328 329 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 330 @requires_32b 331 def test_preadv_overflow_32bits(self): 332 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 333 try: 334 buf = [bytearray(2**16)] * 2**15 335 with self.assertRaises(OSError) as cm: 336 os.preadv(fd, buf, 0) 337 self.assertEqual(cm.exception.errno, errno.EINVAL) 338 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 339 finally: 340 os.close(fd) 341 342 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") 343 def test_pwrite(self): 344 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 345 try: 346 os.write(fd, b'test') 347 os.lseek(fd, 0, os.SEEK_SET) 348 posix.pwrite(fd, b'xx', 1) 349 self.assertEqual(b'txxt', posix.read(fd, 4)) 350 finally: 351 os.close(fd) 352 353 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 354 def test_pwritev(self): 355 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 356 try: 357 os.write(fd, b"xx") 358 os.lseek(fd, 0, os.SEEK_SET) 359 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2) 360 self.assertEqual(n, 10) 361 362 os.lseek(fd, 0, os.SEEK_SET) 363 self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100)) 364 finally: 365 os.close(fd) 366 367 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 368 @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC") 369 def test_pwritev_flags(self): 370 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 371 try: 372 os.write(fd,b"xx") 373 os.lseek(fd, 0, os.SEEK_SET) 374 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC) 375 self.assertEqual(n, 10) 376 377 os.lseek(fd, 0, os.SEEK_SET) 378 self.assertEqual(b'xxtest1tt2', posix.read(fd, 100)) 379 finally: 380 os.close(fd) 381 382 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 383 @requires_32b 384 def test_pwritev_overflow_32bits(self): 385 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 386 try: 387 with self.assertRaises(OSError) as cm: 388 os.pwritev(fd, [b"x" * 2**16] * 2**15, 0) 389 self.assertEqual(cm.exception.errno, errno.EINVAL) 390 finally: 391 os.close(fd) 392 393 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 394 "test needs posix.posix_fallocate()") 395 def test_posix_fallocate(self): 396 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 397 try: 398 posix.posix_fallocate(fd, 0, 10) 399 except OSError as inst: 400 # issue10812, ZFS doesn't appear to support posix_fallocate, 401 # so skip Solaris-based since they are likely to have ZFS. 402 # issue33655: Also ignore EINVAL on *BSD since ZFS is also 403 # often used there. 404 if inst.errno == errno.EINVAL and sys.platform.startswith( 405 ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')): 406 raise unittest.SkipTest("test may fail on ZFS filesystems") 407 else: 408 raise 409 finally: 410 os.close(fd) 411 412 # issue31106 - posix_fallocate() does not set error in errno. 413 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 414 "test needs posix.posix_fallocate()") 415 def test_posix_fallocate_errno(self): 416 try: 417 posix.posix_fallocate(-42, 0, 10) 418 except OSError as inst: 419 if inst.errno != errno.EBADF: 420 raise 421 422 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 423 "test needs posix.posix_fadvise()") 424 def test_posix_fadvise(self): 425 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 426 try: 427 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) 428 finally: 429 os.close(fd) 430 431 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 432 "test needs posix.posix_fadvise()") 433 def test_posix_fadvise_errno(self): 434 try: 435 posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED) 436 except OSError as inst: 437 if inst.errno != errno.EBADF: 438 raise 439 440 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") 441 def test_utime_with_fd(self): 442 now = time.time() 443 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 444 try: 445 posix.utime(fd) 446 posix.utime(fd, None) 447 self.assertRaises(TypeError, posix.utime, fd, (None, None)) 448 self.assertRaises(TypeError, posix.utime, fd, (now, None)) 449 self.assertRaises(TypeError, posix.utime, fd, (None, now)) 450 posix.utime(fd, (int(now), int(now))) 451 posix.utime(fd, (now, now)) 452 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) 453 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) 454 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) 455 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) 456 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) 457 458 finally: 459 os.close(fd) 460 461 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") 462 def test_utime_nofollow_symlinks(self): 463 now = time.time() 464 posix.utime(os_helper.TESTFN, None, follow_symlinks=False) 465 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 466 (None, None), follow_symlinks=False) 467 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 468 (now, None), follow_symlinks=False) 469 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 470 (None, now), follow_symlinks=False) 471 posix.utime(os_helper.TESTFN, (int(now), int(now)), 472 follow_symlinks=False) 473 posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False) 474 posix.utime(os_helper.TESTFN, follow_symlinks=False) 475 476 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 477 def test_writev(self): 478 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 479 try: 480 n = os.writev(fd, (b'test1', b'tt2', b't3')) 481 self.assertEqual(n, 10) 482 483 os.lseek(fd, 0, os.SEEK_SET) 484 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) 485 486 # Issue #20113: empty list of buffers should not crash 487 try: 488 size = posix.writev(fd, []) 489 except OSError: 490 # writev(fd, []) raises OSError(22, "Invalid argument") 491 # on OpenIndiana 492 pass 493 else: 494 self.assertEqual(size, 0) 495 finally: 496 os.close(fd) 497 498 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 499 @requires_32b 500 def test_writev_overflow_32bits(self): 501 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 502 try: 503 with self.assertRaises(OSError) as cm: 504 os.writev(fd, [b"x" * 2**16] * 2**15) 505 self.assertEqual(cm.exception.errno, errno.EINVAL) 506 finally: 507 os.close(fd) 508 509 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 510 def test_readv(self): 511 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 512 try: 513 os.write(fd, b'test1tt2t3') 514 os.lseek(fd, 0, os.SEEK_SET) 515 buf = [bytearray(i) for i in [5, 3, 2]] 516 self.assertEqual(posix.readv(fd, buf), 10) 517 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) 518 519 # Issue #20113: empty list of buffers should not crash 520 try: 521 size = posix.readv(fd, []) 522 except OSError: 523 # readv(fd, []) raises OSError(22, "Invalid argument") 524 # on OpenIndiana 525 pass 526 else: 527 self.assertEqual(size, 0) 528 finally: 529 os.close(fd) 530 531 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 532 @requires_32b 533 def test_readv_overflow_32bits(self): 534 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 535 try: 536 buf = [bytearray(2**16)] * 2**15 537 with self.assertRaises(OSError) as cm: 538 os.readv(fd, buf) 539 self.assertEqual(cm.exception.errno, errno.EINVAL) 540 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 541 finally: 542 os.close(fd) 543 544 @unittest.skipUnless(hasattr(posix, 'dup'), 545 'test needs posix.dup()') 546 def test_dup(self): 547 fp = open(os_helper.TESTFN) 548 try: 549 fd = posix.dup(fp.fileno()) 550 self.assertIsInstance(fd, int) 551 os.close(fd) 552 finally: 553 fp.close() 554 555 @unittest.skipUnless(hasattr(posix, 'confstr'), 556 'test needs posix.confstr()') 557 def test_confstr(self): 558 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 559 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 560 561 @unittest.skipUnless(hasattr(posix, 'dup2'), 562 'test needs posix.dup2()') 563 def test_dup2(self): 564 fp1 = open(os_helper.TESTFN) 565 fp2 = open(os_helper.TESTFN) 566 try: 567 posix.dup2(fp1.fileno(), fp2.fileno()) 568 finally: 569 fp1.close() 570 fp2.close() 571 572 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") 573 @support.requires_linux_version(2, 6, 23) 574 def test_oscloexec(self): 575 fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC) 576 self.addCleanup(os.close, fd) 577 self.assertFalse(os.get_inheritable(fd)) 578 579 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), 580 'test needs posix.O_EXLOCK') 581 def test_osexlock(self): 582 fd = os.open(os_helper.TESTFN, 583 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 584 self.assertRaises(OSError, os.open, os_helper.TESTFN, 585 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 586 os.close(fd) 587 588 if hasattr(posix, "O_SHLOCK"): 589 fd = os.open(os_helper.TESTFN, 590 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 591 self.assertRaises(OSError, os.open, os_helper.TESTFN, 592 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 593 os.close(fd) 594 595 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), 596 'test needs posix.O_SHLOCK') 597 def test_osshlock(self): 598 fd1 = os.open(os_helper.TESTFN, 599 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 600 fd2 = os.open(os_helper.TESTFN, 601 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 602 os.close(fd2) 603 os.close(fd1) 604 605 if hasattr(posix, "O_EXLOCK"): 606 fd = os.open(os_helper.TESTFN, 607 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 608 self.assertRaises(OSError, os.open, os_helper.TESTFN, 609 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 610 os.close(fd) 611 612 @unittest.skipUnless(hasattr(posix, 'fstat'), 613 'test needs posix.fstat()') 614 def test_fstat(self): 615 fp = open(os_helper.TESTFN) 616 try: 617 self.assertTrue(posix.fstat(fp.fileno())) 618 self.assertTrue(posix.stat(fp.fileno())) 619 620 self.assertRaisesRegex(TypeError, 621 'should be string, bytes, os.PathLike or integer, not', 622 posix.stat, float(fp.fileno())) 623 finally: 624 fp.close() 625 626 def test_stat(self): 627 self.assertTrue(posix.stat(os_helper.TESTFN)) 628 self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN))) 629 630 self.assertWarnsRegex(DeprecationWarning, 631 'should be string, bytes, os.PathLike or integer, not', 632 posix.stat, bytearray(os.fsencode(os_helper.TESTFN))) 633 self.assertRaisesRegex(TypeError, 634 'should be string, bytes, os.PathLike or integer, not', 635 posix.stat, None) 636 self.assertRaisesRegex(TypeError, 637 'should be string, bytes, os.PathLike or integer, not', 638 posix.stat, list(os_helper.TESTFN)) 639 self.assertRaisesRegex(TypeError, 640 'should be string, bytes, os.PathLike or integer, not', 641 posix.stat, list(os.fsencode(os_helper.TESTFN))) 642 643 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") 644 def test_mkfifo(self): 645 if sys.platform == "vxworks": 646 fifo_path = os.path.join("/fifos/", os_helper.TESTFN) 647 else: 648 fifo_path = os_helper.TESTFN 649 os_helper.unlink(fifo_path) 650 self.addCleanup(os_helper.unlink, fifo_path) 651 try: 652 posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR) 653 except PermissionError as e: 654 self.skipTest('posix.mkfifo(): %s' % e) 655 self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode)) 656 657 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'), 658 "don't have mknod()/S_IFIFO") 659 def test_mknod(self): 660 # Test using mknod() to create a FIFO (the only use specified 661 # by POSIX). 662 os_helper.unlink(os_helper.TESTFN) 663 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 664 try: 665 posix.mknod(os_helper.TESTFN, mode, 0) 666 except OSError as e: 667 # Some old systems don't allow unprivileged users to use 668 # mknod(), or only support creating device nodes. 669 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 670 else: 671 self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) 672 673 # Keyword arguments are also supported 674 os_helper.unlink(os_helper.TESTFN) 675 try: 676 posix.mknod(path=os_helper.TESTFN, mode=mode, device=0, 677 dir_fd=None) 678 except OSError as e: 679 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 680 681 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()') 682 def test_makedev(self): 683 st = posix.stat(os_helper.TESTFN) 684 dev = st.st_dev 685 self.assertIsInstance(dev, int) 686 self.assertGreaterEqual(dev, 0) 687 688 major = posix.major(dev) 689 self.assertIsInstance(major, int) 690 self.assertGreaterEqual(major, 0) 691 self.assertEqual(posix.major(dev), major) 692 self.assertRaises(TypeError, posix.major, float(dev)) 693 self.assertRaises(TypeError, posix.major) 694 self.assertRaises((ValueError, OverflowError), posix.major, -1) 695 696 minor = posix.minor(dev) 697 self.assertIsInstance(minor, int) 698 self.assertGreaterEqual(minor, 0) 699 self.assertEqual(posix.minor(dev), minor) 700 self.assertRaises(TypeError, posix.minor, float(dev)) 701 self.assertRaises(TypeError, posix.minor) 702 self.assertRaises((ValueError, OverflowError), posix.minor, -1) 703 704 self.assertEqual(posix.makedev(major, minor), dev) 705 self.assertRaises(TypeError, posix.makedev, float(major), minor) 706 self.assertRaises(TypeError, posix.makedev, major, float(minor)) 707 self.assertRaises(TypeError, posix.makedev, major) 708 self.assertRaises(TypeError, posix.makedev) 709 710 def _test_all_chown_common(self, chown_func, first_param, stat_func): 711 """Common code for chown, fchown and lchown tests.""" 712 def check_stat(uid, gid): 713 if stat_func is not None: 714 stat = stat_func(first_param) 715 self.assertEqual(stat.st_uid, uid) 716 self.assertEqual(stat.st_gid, gid) 717 uid = os.getuid() 718 gid = os.getgid() 719 # test a successful chown call 720 chown_func(first_param, uid, gid) 721 check_stat(uid, gid) 722 chown_func(first_param, -1, gid) 723 check_stat(uid, gid) 724 chown_func(first_param, uid, -1) 725 check_stat(uid, gid) 726 727 if sys.platform == "vxworks": 728 # On VxWorks, root user id is 1 and 0 means no login user: 729 # both are super users. 730 is_root = (uid in (0, 1)) 731 else: 732 is_root = (uid == 0) 733 if is_root: 734 # Try an amusingly large uid/gid to make sure we handle 735 # large unsigned values. (chown lets you use any 736 # uid/gid you like, even if they aren't defined.) 737 # 738 # On VxWorks uid_t is defined as unsigned short. A big 739 # value greater than 65535 will result in underflow error. 740 # 741 # This problem keeps coming up: 742 # http://bugs.python.org/issue1747858 743 # http://bugs.python.org/issue4591 744 # http://bugs.python.org/issue15301 745 # Hopefully the fix in 4591 fixes it for good! 746 # 747 # This part of the test only runs when run as root. 748 # Only scary people run their tests as root. 749 750 big_value = (2**31 if sys.platform != "vxworks" else 2**15) 751 chown_func(first_param, big_value, big_value) 752 check_stat(big_value, big_value) 753 chown_func(first_param, -1, -1) 754 check_stat(big_value, big_value) 755 chown_func(first_param, uid, gid) 756 check_stat(uid, gid) 757 elif platform.system() in ('HP-UX', 'SunOS'): 758 # HP-UX and Solaris can allow a non-root user to chown() to root 759 # (issue #5113) 760 raise unittest.SkipTest("Skipping because of non-standard chown() " 761 "behavior") 762 else: 763 # non-root cannot chown to root, raises OSError 764 self.assertRaises(OSError, chown_func, first_param, 0, 0) 765 check_stat(uid, gid) 766 self.assertRaises(OSError, chown_func, first_param, 0, -1) 767 check_stat(uid, gid) 768 if 0 not in os.getgroups(): 769 self.assertRaises(OSError, chown_func, first_param, -1, 0) 770 check_stat(uid, gid) 771 # test illegal types 772 for t in str, float: 773 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 774 check_stat(uid, gid) 775 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 776 check_stat(uid, gid) 777 778 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") 779 def test_chown(self): 780 # raise an OSError if the file does not exist 781 os.unlink(os_helper.TESTFN) 782 self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1) 783 784 # re-create the file 785 os_helper.create_empty_file(os_helper.TESTFN) 786 self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat) 787 788 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 789 def test_fchown(self): 790 os.unlink(os_helper.TESTFN) 791 792 # re-create the file 793 test_file = open(os_helper.TESTFN, 'w') 794 try: 795 fd = test_file.fileno() 796 self._test_all_chown_common(posix.fchown, fd, 797 getattr(posix, 'fstat', None)) 798 finally: 799 test_file.close() 800 801 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 802 def test_lchown(self): 803 os.unlink(os_helper.TESTFN) 804 # create a symlink 805 os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN) 806 self._test_all_chown_common(posix.lchown, os_helper.TESTFN, 807 getattr(posix, 'lstat', None)) 808 809 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') 810 def test_chdir(self): 811 posix.chdir(os.curdir) 812 self.assertRaises(OSError, posix.chdir, os_helper.TESTFN) 813 814 def test_listdir(self): 815 self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir)) 816 817 def test_listdir_default(self): 818 # When listdir is called without argument, 819 # it's the same as listdir(os.curdir). 820 self.assertIn(os_helper.TESTFN, posix.listdir()) 821 822 def test_listdir_bytes(self): 823 # When listdir is called with a bytes object, 824 # the returned strings are of type bytes. 825 self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.')) 826 827 def test_listdir_bytes_like(self): 828 for cls in bytearray, memoryview: 829 with self.assertWarns(DeprecationWarning): 830 names = posix.listdir(cls(b'.')) 831 self.assertIn(os.fsencode(os_helper.TESTFN), names) 832 for name in names: 833 self.assertIs(type(name), bytes) 834 835 @unittest.skipUnless(posix.listdir in os.supports_fd, 836 "test needs fd support for posix.listdir()") 837 def test_listdir_fd(self): 838 f = posix.open(posix.getcwd(), posix.O_RDONLY) 839 self.addCleanup(posix.close, f) 840 self.assertEqual( 841 sorted(posix.listdir('.')), 842 sorted(posix.listdir(f)) 843 ) 844 # Check that the fd offset was reset (issue #13739) 845 self.assertEqual( 846 sorted(posix.listdir('.')), 847 sorted(posix.listdir(f)) 848 ) 849 850 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') 851 def test_access(self): 852 self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK)) 853 854 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') 855 def test_umask(self): 856 old_mask = posix.umask(0) 857 self.assertIsInstance(old_mask, int) 858 posix.umask(old_mask) 859 860 @unittest.skipUnless(hasattr(posix, 'strerror'), 861 'test needs posix.strerror()') 862 def test_strerror(self): 863 self.assertTrue(posix.strerror(0)) 864 865 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') 866 def test_pipe(self): 867 reader, writer = posix.pipe() 868 os.close(reader) 869 os.close(writer) 870 871 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 872 @support.requires_linux_version(2, 6, 27) 873 def test_pipe2(self): 874 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') 875 self.assertRaises(TypeError, os.pipe2, 0, 0) 876 877 # try calling with flags = 0, like os.pipe() 878 r, w = os.pipe2(0) 879 os.close(r) 880 os.close(w) 881 882 # test flags 883 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) 884 self.addCleanup(os.close, r) 885 self.addCleanup(os.close, w) 886 self.assertFalse(os.get_inheritable(r)) 887 self.assertFalse(os.get_inheritable(w)) 888 self.assertFalse(os.get_blocking(r)) 889 self.assertFalse(os.get_blocking(w)) 890 # try reading from an empty pipe: this should fail, not block 891 self.assertRaises(OSError, os.read, r, 1) 892 # try a write big enough to fill-up the pipe: this should either 893 # fail or perform a partial write, not block 894 try: 895 os.write(w, b'x' * support.PIPE_MAX_SIZE) 896 except OSError: 897 pass 898 899 @support.cpython_only 900 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 901 @support.requires_linux_version(2, 6, 27) 902 def test_pipe2_c_limits(self): 903 # Issue 15989 904 import _testcapi 905 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) 906 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) 907 908 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') 909 def test_utime(self): 910 now = time.time() 911 posix.utime(os_helper.TESTFN, None) 912 self.assertRaises(TypeError, posix.utime, 913 os_helper.TESTFN, (None, None)) 914 self.assertRaises(TypeError, posix.utime, 915 os_helper.TESTFN, (now, None)) 916 self.assertRaises(TypeError, posix.utime, 917 os_helper.TESTFN, (None, now)) 918 posix.utime(os_helper.TESTFN, (int(now), int(now))) 919 posix.utime(os_helper.TESTFN, (now, now)) 920 921 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): 922 st = os.stat(target_file) 923 self.assertTrue(hasattr(st, 'st_flags')) 924 925 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 926 flags = st.st_flags | stat.UF_IMMUTABLE 927 try: 928 chflags_func(target_file, flags, **kwargs) 929 except OSError as err: 930 if err.errno != errno.EOPNOTSUPP: 931 raise 932 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 933 self.skipTest(msg) 934 935 try: 936 new_st = os.stat(target_file) 937 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 938 try: 939 fd = open(target_file, 'w+') 940 except OSError as e: 941 self.assertEqual(e.errno, errno.EPERM) 942 finally: 943 posix.chflags(target_file, st.st_flags) 944 945 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 946 def test_chflags(self): 947 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN) 948 949 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 950 def test_lchflags_regular_file(self): 951 self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN) 952 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN, 953 follow_symlinks=False) 954 955 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 956 def test_lchflags_symlink(self): 957 testfn_st = os.stat(os_helper.TESTFN) 958 959 self.assertTrue(hasattr(testfn_st, 'st_flags')) 960 961 os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK) 962 self.teardown_files.append(_DUMMY_SYMLINK) 963 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 964 965 def chflags_nofollow(path, flags): 966 return posix.chflags(path, flags, follow_symlinks=False) 967 968 for fn in (posix.lchflags, chflags_nofollow): 969 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 970 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE 971 try: 972 fn(_DUMMY_SYMLINK, flags) 973 except OSError as err: 974 if err.errno != errno.EOPNOTSUPP: 975 raise 976 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 977 self.skipTest(msg) 978 try: 979 new_testfn_st = os.stat(os_helper.TESTFN) 980 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 981 982 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 983 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 984 new_dummy_symlink_st.st_flags) 985 finally: 986 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 987 988 def test_environ(self): 989 if os.name == "nt": 990 item_type = str 991 else: 992 item_type = bytes 993 for k, v in posix.environ.items(): 994 self.assertEqual(type(k), item_type) 995 self.assertEqual(type(v), item_type) 996 997 def test_putenv(self): 998 with self.assertRaises(ValueError): 999 os.putenv('FRUIT\0VEGETABLE', 'cabbage') 1000 with self.assertRaises(ValueError): 1001 os.putenv(b'FRUIT\0VEGETABLE', b'cabbage') 1002 with self.assertRaises(ValueError): 1003 os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage') 1004 with self.assertRaises(ValueError): 1005 os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage') 1006 with self.assertRaises(ValueError): 1007 os.putenv('FRUIT=ORANGE', 'lemon') 1008 with self.assertRaises(ValueError): 1009 os.putenv(b'FRUIT=ORANGE', b'lemon') 1010 1011 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') 1012 def test_getcwd_long_pathnames(self): 1013 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 1014 curdir = os.getcwd() 1015 base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd' 1016 1017 try: 1018 os.mkdir(base_path) 1019 os.chdir(base_path) 1020 except: 1021 # Just returning nothing instead of the SkipTest exception, because 1022 # the test results in Error in that case. Is that ok? 1023 # raise unittest.SkipTest("cannot create directory for testing") 1024 return 1025 1026 def _create_and_do_getcwd(dirname, current_path_length = 0): 1027 try: 1028 os.mkdir(dirname) 1029 except: 1030 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test") 1031 1032 os.chdir(dirname) 1033 try: 1034 os.getcwd() 1035 if current_path_length < 1027: 1036 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 1037 finally: 1038 os.chdir('..') 1039 os.rmdir(dirname) 1040 1041 _create_and_do_getcwd(dirname) 1042 1043 finally: 1044 os.chdir(curdir) 1045 os_helper.rmtree(base_path) 1046 1047 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") 1048 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 1049 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") 1050 def test_getgrouplist(self): 1051 user = pwd.getpwuid(os.getuid())[0] 1052 group = pwd.getpwuid(os.getuid())[3] 1053 self.assertIn(group, posix.getgrouplist(user, group)) 1054 1055 1056 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 1057 @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()") 1058 def test_getgroups(self): 1059 with os.popen('id -G 2>/dev/null') as idg: 1060 groups = idg.read().strip() 1061 ret = idg.close() 1062 1063 try: 1064 idg_groups = set(int(g) for g in groups.split()) 1065 except ValueError: 1066 idg_groups = set() 1067 if ret is not None or not idg_groups: 1068 raise unittest.SkipTest("need working 'id -G'") 1069 1070 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 1071 if sys.platform == 'darwin': 1072 import sysconfig 1073 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3' 1074 if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6): 1075 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 1076 1077 # 'id -G' and 'os.getgroups()' should return the same 1078 # groups, ignoring order, duplicates, and the effective gid. 1079 # #10822/#26944 - It is implementation defined whether 1080 # posix.getgroups() includes the effective gid. 1081 symdiff = idg_groups.symmetric_difference(posix.getgroups()) 1082 self.assertTrue(not symdiff or symdiff == {posix.getegid()}) 1083 1084 # tests for the posix *at functions follow 1085 1086 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") 1087 def test_access_dir_fd(self): 1088 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1089 try: 1090 self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK, dir_fd=f)) 1091 finally: 1092 posix.close(f) 1093 1094 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") 1095 def test_chmod_dir_fd(self): 1096 os.chmod(os_helper.TESTFN, stat.S_IRUSR) 1097 1098 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1099 try: 1100 posix.chmod(os_helper.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 1101 1102 s = posix.stat(os_helper.TESTFN) 1103 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) 1104 finally: 1105 posix.close(f) 1106 1107 @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd), 1108 "test needs dir_fd support in os.chown()") 1109 def test_chown_dir_fd(self): 1110 os_helper.unlink(os_helper.TESTFN) 1111 os_helper.create_empty_file(os_helper.TESTFN) 1112 1113 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1114 try: 1115 posix.chown(os_helper.TESTFN, os.getuid(), os.getgid(), dir_fd=f) 1116 finally: 1117 posix.close(f) 1118 1119 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") 1120 def test_stat_dir_fd(self): 1121 os_helper.unlink(os_helper.TESTFN) 1122 with open(os_helper.TESTFN, 'w') as outfile: 1123 outfile.write("testline\n") 1124 1125 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1126 try: 1127 s1 = posix.stat(os_helper.TESTFN) 1128 s2 = posix.stat(os_helper.TESTFN, dir_fd=f) 1129 self.assertEqual(s1, s2) 1130 s2 = posix.stat(os_helper.TESTFN, dir_fd=None) 1131 self.assertEqual(s1, s2) 1132 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1133 posix.stat, os_helper.TESTFN, dir_fd=posix.getcwd()) 1134 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1135 posix.stat, os_helper.TESTFN, dir_fd=float(f)) 1136 self.assertRaises(OverflowError, 1137 posix.stat, os_helper.TESTFN, dir_fd=10**20) 1138 finally: 1139 posix.close(f) 1140 1141 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") 1142 def test_utime_dir_fd(self): 1143 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1144 try: 1145 now = time.time() 1146 posix.utime(os_helper.TESTFN, None, dir_fd=f) 1147 posix.utime(os_helper.TESTFN, dir_fd=f) 1148 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 1149 now, dir_fd=f) 1150 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 1151 (None, None), dir_fd=f) 1152 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 1153 (now, None), dir_fd=f) 1154 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 1155 (None, now), dir_fd=f) 1156 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 1157 (now, "x"), dir_fd=f) 1158 posix.utime(os_helper.TESTFN, (int(now), int(now)), dir_fd=f) 1159 posix.utime(os_helper.TESTFN, (now, now), dir_fd=f) 1160 posix.utime(os_helper.TESTFN, 1161 (int(now), int((now - int(now)) * 1e9)), dir_fd=f) 1162 posix.utime(os_helper.TESTFN, dir_fd=f, 1163 times=(int(now), int((now - int(now)) * 1e9))) 1164 1165 # try dir_fd and follow_symlinks together 1166 if os.utime in os.supports_follow_symlinks: 1167 try: 1168 posix.utime(os_helper.TESTFN, follow_symlinks=False, 1169 dir_fd=f) 1170 except ValueError: 1171 # whoops! using both together not supported on this platform. 1172 pass 1173 1174 finally: 1175 posix.close(f) 1176 1177 @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") 1178 def test_link_dir_fd(self): 1179 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1180 try: 1181 posix.link(os_helper.TESTFN, os_helper.TESTFN + 'link', 1182 src_dir_fd=f, dst_dir_fd=f) 1183 except PermissionError as e: 1184 self.skipTest('posix.link(): %s' % e) 1185 else: 1186 # should have same inodes 1187 self.assertEqual(posix.stat(os_helper.TESTFN)[1], 1188 posix.stat(os_helper.TESTFN + 'link')[1]) 1189 finally: 1190 posix.close(f) 1191 os_helper.unlink(os_helper.TESTFN + 'link') 1192 1193 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") 1194 def test_mkdir_dir_fd(self): 1195 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1196 try: 1197 posix.mkdir(os_helper.TESTFN + 'dir', dir_fd=f) 1198 posix.stat(os_helper.TESTFN + 'dir') # should not raise exception 1199 finally: 1200 posix.close(f) 1201 os_helper.rmtree(os_helper.TESTFN + 'dir') 1202 1203 @unittest.skipUnless(hasattr(os, 'mknod') 1204 and (os.mknod in os.supports_dir_fd) 1205 and hasattr(stat, 'S_IFIFO'), 1206 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") 1207 def test_mknod_dir_fd(self): 1208 # Test using mknodat() to create a FIFO (the only use specified 1209 # by POSIX). 1210 os_helper.unlink(os_helper.TESTFN) 1211 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 1212 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1213 try: 1214 posix.mknod(os_helper.TESTFN, mode, 0, dir_fd=f) 1215 except OSError as e: 1216 # Some old systems don't allow unprivileged users to use 1217 # mknod(), or only support creating device nodes. 1218 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 1219 else: 1220 self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) 1221 finally: 1222 posix.close(f) 1223 1224 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") 1225 def test_open_dir_fd(self): 1226 os_helper.unlink(os_helper.TESTFN) 1227 with open(os_helper.TESTFN, 'w') as outfile: 1228 outfile.write("testline\n") 1229 a = posix.open(posix.getcwd(), posix.O_RDONLY) 1230 b = posix.open(os_helper.TESTFN, posix.O_RDONLY, dir_fd=a) 1231 try: 1232 res = posix.read(b, 9).decode(encoding="utf-8") 1233 self.assertEqual("testline\n", res) 1234 finally: 1235 posix.close(a) 1236 posix.close(b) 1237 1238 @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd), 1239 "test needs dir_fd support in os.readlink()") 1240 def test_readlink_dir_fd(self): 1241 os.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link') 1242 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1243 try: 1244 self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'), 1245 posix.readlink(os_helper.TESTFN + 'link', dir_fd=f)) 1246 finally: 1247 os_helper.unlink(os_helper.TESTFN + 'link') 1248 posix.close(f) 1249 1250 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") 1251 def test_rename_dir_fd(self): 1252 os_helper.unlink(os_helper.TESTFN) 1253 os_helper.create_empty_file(os_helper.TESTFN + 'ren') 1254 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1255 try: 1256 posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN, src_dir_fd=f, dst_dir_fd=f) 1257 except: 1258 posix.rename(os_helper.TESTFN + 'ren', os_helper.TESTFN) 1259 raise 1260 else: 1261 posix.stat(os_helper.TESTFN) # should not raise exception 1262 finally: 1263 posix.close(f) 1264 1265 @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal') 1266 @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result") 1267 def test_cld_xxxx_constants(self): 1268 os.CLD_EXITED 1269 os.CLD_KILLED 1270 os.CLD_DUMPED 1271 os.CLD_TRAPPED 1272 os.CLD_STOPPED 1273 os.CLD_CONTINUED 1274 1275 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") 1276 def test_symlink_dir_fd(self): 1277 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1278 try: 1279 posix.symlink(os_helper.TESTFN, os_helper.TESTFN + 'link', 1280 dir_fd=f) 1281 self.assertEqual(posix.readlink(os_helper.TESTFN + 'link'), 1282 os_helper.TESTFN) 1283 finally: 1284 posix.close(f) 1285 os_helper.unlink(os_helper.TESTFN + 'link') 1286 1287 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") 1288 def test_unlink_dir_fd(self): 1289 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1290 os_helper.create_empty_file(os_helper.TESTFN + 'del') 1291 posix.stat(os_helper.TESTFN + 'del') # should not raise exception 1292 try: 1293 posix.unlink(os_helper.TESTFN + 'del', dir_fd=f) 1294 except: 1295 os_helper.unlink(os_helper.TESTFN + 'del') 1296 raise 1297 else: 1298 self.assertRaises(OSError, posix.stat, os_helper.TESTFN + 'link') 1299 finally: 1300 posix.close(f) 1301 1302 @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") 1303 def test_mkfifo_dir_fd(self): 1304 os_helper.unlink(os_helper.TESTFN) 1305 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1306 try: 1307 try: 1308 posix.mkfifo(os_helper.TESTFN, 1309 stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 1310 except PermissionError as e: 1311 self.skipTest('posix.mkfifo(): %s' % e) 1312 self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) 1313 finally: 1314 posix.close(f) 1315 1316 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), 1317 "don't have scheduling support") 1318 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), 1319 "don't have sched affinity support") 1320 1321 @requires_sched_h 1322 def test_sched_yield(self): 1323 # This has no error conditions (at least on Linux). 1324 posix.sched_yield() 1325 1326 @requires_sched_h 1327 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), 1328 "requires sched_get_priority_max()") 1329 def test_sched_priority(self): 1330 # Round-robin usually has interesting priorities. 1331 pol = posix.SCHED_RR 1332 lo = posix.sched_get_priority_min(pol) 1333 hi = posix.sched_get_priority_max(pol) 1334 self.assertIsInstance(lo, int) 1335 self.assertIsInstance(hi, int) 1336 self.assertGreaterEqual(hi, lo) 1337 # OSX evidently just returns 15 without checking the argument. 1338 if sys.platform != "darwin": 1339 self.assertRaises(OSError, posix.sched_get_priority_min, -23) 1340 self.assertRaises(OSError, posix.sched_get_priority_max, -23) 1341 1342 @requires_sched 1343 def test_get_and_set_scheduler_and_param(self): 1344 possible_schedulers = [sched for name, sched in posix.__dict__.items() 1345 if name.startswith("SCHED_")] 1346 mine = posix.sched_getscheduler(0) 1347 self.assertIn(mine, possible_schedulers) 1348 try: 1349 parent = posix.sched_getscheduler(os.getppid()) 1350 except OSError as e: 1351 if e.errno != errno.EPERM: 1352 raise 1353 else: 1354 self.assertIn(parent, possible_schedulers) 1355 self.assertRaises(OSError, posix.sched_getscheduler, -1) 1356 self.assertRaises(OSError, posix.sched_getparam, -1) 1357 param = posix.sched_getparam(0) 1358 self.assertIsInstance(param.sched_priority, int) 1359 1360 # POSIX states that calling sched_setparam() or sched_setscheduler() on 1361 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR 1362 # is implementation-defined: NetBSD and FreeBSD can return EINVAL. 1363 if not sys.platform.startswith(('freebsd', 'netbsd')): 1364 try: 1365 posix.sched_setscheduler(0, mine, param) 1366 posix.sched_setparam(0, param) 1367 except OSError as e: 1368 if e.errno != errno.EPERM: 1369 raise 1370 self.assertRaises(OSError, posix.sched_setparam, -1, param) 1371 1372 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) 1373 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) 1374 self.assertRaises(TypeError, posix.sched_setparam, 0, 43) 1375 param = posix.sched_param(None) 1376 self.assertRaises(TypeError, posix.sched_setparam, 0, param) 1377 large = 214748364700 1378 param = posix.sched_param(large) 1379 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1380 param = posix.sched_param(sched_priority=-large) 1381 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1382 1383 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") 1384 def test_sched_rr_get_interval(self): 1385 try: 1386 interval = posix.sched_rr_get_interval(0) 1387 except OSError as e: 1388 # This likely means that sched_rr_get_interval is only valid for 1389 # processes with the SCHED_RR scheduler in effect. 1390 if e.errno != errno.EINVAL: 1391 raise 1392 self.skipTest("only works on SCHED_RR processes") 1393 self.assertIsInstance(interval, float) 1394 # Reasonable constraints, I think. 1395 self.assertGreaterEqual(interval, 0.) 1396 self.assertLess(interval, 1.) 1397 1398 @requires_sched_affinity 1399 def test_sched_getaffinity(self): 1400 mask = posix.sched_getaffinity(0) 1401 self.assertIsInstance(mask, set) 1402 self.assertGreaterEqual(len(mask), 1) 1403 self.assertRaises(OSError, posix.sched_getaffinity, -1) 1404 for cpu in mask: 1405 self.assertIsInstance(cpu, int) 1406 self.assertGreaterEqual(cpu, 0) 1407 self.assertLess(cpu, 1 << 32) 1408 1409 @requires_sched_affinity 1410 def test_sched_setaffinity(self): 1411 mask = posix.sched_getaffinity(0) 1412 if len(mask) > 1: 1413 # Empty masks are forbidden 1414 mask.pop() 1415 posix.sched_setaffinity(0, mask) 1416 self.assertEqual(posix.sched_getaffinity(0), mask) 1417 self.assertRaises(OSError, posix.sched_setaffinity, 0, []) 1418 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) 1419 self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X")) 1420 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) 1421 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) 1422 1423 def test_rtld_constants(self): 1424 # check presence of major RTLD_* constants 1425 posix.RTLD_LAZY 1426 posix.RTLD_NOW 1427 posix.RTLD_GLOBAL 1428 posix.RTLD_LOCAL 1429 1430 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), 1431 "test needs an OS that reports file holes") 1432 def test_fs_holes(self): 1433 # Even if the filesystem doesn't report holes, 1434 # if the OS supports it the SEEK_* constants 1435 # will be defined and will have a consistent 1436 # behaviour: 1437 # os.SEEK_DATA = current position 1438 # os.SEEK_HOLE = end of file position 1439 with open(os_helper.TESTFN, 'r+b') as fp: 1440 fp.write(b"hello") 1441 fp.flush() 1442 size = fp.tell() 1443 fno = fp.fileno() 1444 try : 1445 for i in range(size): 1446 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) 1447 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) 1448 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) 1449 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) 1450 except OSError : 1451 # Some OSs claim to support SEEK_HOLE/SEEK_DATA 1452 # but it is not true. 1453 # For instance: 1454 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html 1455 raise unittest.SkipTest("OSError raised!") 1456 1457 def test_path_error2(self): 1458 """ 1459 Test functions that call path_error2(), providing two filenames in their exceptions. 1460 """ 1461 for name in ("rename", "replace", "link"): 1462 function = getattr(os, name, None) 1463 if function is None: 1464 continue 1465 1466 for dst in ("noodly2", os_helper.TESTFN): 1467 try: 1468 function('doesnotexistfilename', dst) 1469 except OSError as e: 1470 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) 1471 break 1472 else: 1473 self.fail("No valid path_error2() test for os." + name) 1474 1475 def test_path_with_null_character(self): 1476 fn = os_helper.TESTFN 1477 fn_with_NUL = fn + '\0' 1478 self.addCleanup(os_helper.unlink, fn) 1479 os_helper.unlink(fn) 1480 fd = None 1481 try: 1482 with self.assertRaises(ValueError): 1483 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1484 finally: 1485 if fd is not None: 1486 os.close(fd) 1487 self.assertFalse(os.path.exists(fn)) 1488 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1489 self.assertFalse(os.path.exists(fn)) 1490 open(fn, 'wb').close() 1491 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1492 1493 def test_path_with_null_byte(self): 1494 fn = os.fsencode(os_helper.TESTFN) 1495 fn_with_NUL = fn + b'\0' 1496 self.addCleanup(os_helper.unlink, fn) 1497 os_helper.unlink(fn) 1498 fd = None 1499 try: 1500 with self.assertRaises(ValueError): 1501 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1502 finally: 1503 if fd is not None: 1504 os.close(fd) 1505 self.assertFalse(os.path.exists(fn)) 1506 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1507 self.assertFalse(os.path.exists(fn)) 1508 open(fn, 'wb').close() 1509 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1510 1511 @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable") 1512 def test_pidfd_open(self): 1513 with self.assertRaises(OSError) as cm: 1514 os.pidfd_open(-1) 1515 if cm.exception.errno == errno.ENOSYS: 1516 self.skipTest("system does not support pidfd_open") 1517 if isinstance(cm.exception, PermissionError): 1518 self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}") 1519 self.assertEqual(cm.exception.errno, errno.EINVAL) 1520 os.close(os.pidfd_open(os.getpid(), 0)) 1521 1522class PosixGroupsTester(unittest.TestCase): 1523 1524 def setUp(self): 1525 if posix.getuid() != 0: 1526 raise unittest.SkipTest("not enough privileges") 1527 if not hasattr(posix, 'getgroups'): 1528 raise unittest.SkipTest("need posix.getgroups") 1529 if sys.platform == 'darwin': 1530 raise unittest.SkipTest("getgroups(2) is broken on OSX") 1531 self.saved_groups = posix.getgroups() 1532 1533 def tearDown(self): 1534 if hasattr(posix, 'setgroups'): 1535 posix.setgroups(self.saved_groups) 1536 elif hasattr(posix, 'initgroups'): 1537 name = pwd.getpwuid(posix.getuid()).pw_name 1538 posix.initgroups(name, self.saved_groups[0]) 1539 1540 @unittest.skipUnless(hasattr(posix, 'initgroups'), 1541 "test needs posix.initgroups()") 1542 def test_initgroups(self): 1543 # find missing group 1544 1545 g = max(self.saved_groups or [0]) + 1 1546 name = pwd.getpwuid(posix.getuid()).pw_name 1547 posix.initgroups(name, g) 1548 self.assertIn(g, posix.getgroups()) 1549 1550 @unittest.skipUnless(hasattr(posix, 'setgroups'), 1551 "test needs posix.setgroups()") 1552 def test_setgroups(self): 1553 for groups in [[0], list(range(16))]: 1554 posix.setgroups(groups) 1555 self.assertListEqual(groups, posix.getgroups()) 1556 1557 1558class _PosixSpawnMixin: 1559 # Program which does nothing and exits with status 0 (success) 1560 NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass') 1561 spawn_func = None 1562 1563 def python_args(self, *args): 1564 # Disable site module to avoid side effects. For example, 1565 # on Fedora 28, if the HOME environment variable is not set, 1566 # site._getuserbase() calls pwd.getpwuid() which opens 1567 # /var/lib/sss/mc/passwd but then leaves the file open which makes 1568 # test_close_file() to fail. 1569 return (sys.executable, '-I', '-S', *args) 1570 1571 def test_returns_pid(self): 1572 pidfile = os_helper.TESTFN 1573 self.addCleanup(os_helper.unlink, pidfile) 1574 script = f"""if 1: 1575 import os 1576 with open({pidfile!r}, "w") as pidfile: 1577 pidfile.write(str(os.getpid())) 1578 """ 1579 args = self.python_args('-c', script) 1580 pid = self.spawn_func(args[0], args, os.environ) 1581 support.wait_process(pid, exitcode=0) 1582 with open(pidfile, encoding="utf-8") as f: 1583 self.assertEqual(f.read(), str(pid)) 1584 1585 def test_no_such_executable(self): 1586 no_such_executable = 'no_such_executable' 1587 try: 1588 pid = self.spawn_func(no_such_executable, 1589 [no_such_executable], 1590 os.environ) 1591 # bpo-35794: PermissionError can be raised if there are 1592 # directories in the $PATH that are not accessible. 1593 except (FileNotFoundError, PermissionError) as exc: 1594 self.assertEqual(exc.filename, no_such_executable) 1595 else: 1596 pid2, status = os.waitpid(pid, 0) 1597 self.assertEqual(pid2, pid) 1598 self.assertNotEqual(status, 0) 1599 1600 def test_specify_environment(self): 1601 envfile = os_helper.TESTFN 1602 self.addCleanup(os_helper.unlink, envfile) 1603 script = f"""if 1: 1604 import os 1605 with open({envfile!r}, "w", encoding="utf-8") as envfile: 1606 envfile.write(os.environ['foo']) 1607 """ 1608 args = self.python_args('-c', script) 1609 pid = self.spawn_func(args[0], args, 1610 {**os.environ, 'foo': 'bar'}) 1611 support.wait_process(pid, exitcode=0) 1612 with open(envfile, encoding="utf-8") as f: 1613 self.assertEqual(f.read(), 'bar') 1614 1615 def test_none_file_actions(self): 1616 pid = self.spawn_func( 1617 self.NOOP_PROGRAM[0], 1618 self.NOOP_PROGRAM, 1619 os.environ, 1620 file_actions=None 1621 ) 1622 support.wait_process(pid, exitcode=0) 1623 1624 def test_empty_file_actions(self): 1625 pid = self.spawn_func( 1626 self.NOOP_PROGRAM[0], 1627 self.NOOP_PROGRAM, 1628 os.environ, 1629 file_actions=[] 1630 ) 1631 support.wait_process(pid, exitcode=0) 1632 1633 def test_resetids_explicit_default(self): 1634 pid = self.spawn_func( 1635 sys.executable, 1636 [sys.executable, '-c', 'pass'], 1637 os.environ, 1638 resetids=False 1639 ) 1640 support.wait_process(pid, exitcode=0) 1641 1642 def test_resetids(self): 1643 pid = self.spawn_func( 1644 sys.executable, 1645 [sys.executable, '-c', 'pass'], 1646 os.environ, 1647 resetids=True 1648 ) 1649 support.wait_process(pid, exitcode=0) 1650 1651 def test_resetids_wrong_type(self): 1652 with self.assertRaises(TypeError): 1653 self.spawn_func(sys.executable, 1654 [sys.executable, "-c", "pass"], 1655 os.environ, resetids=None) 1656 1657 def test_setpgroup(self): 1658 pid = self.spawn_func( 1659 sys.executable, 1660 [sys.executable, '-c', 'pass'], 1661 os.environ, 1662 setpgroup=os.getpgrp() 1663 ) 1664 support.wait_process(pid, exitcode=0) 1665 1666 def test_setpgroup_wrong_type(self): 1667 with self.assertRaises(TypeError): 1668 self.spawn_func(sys.executable, 1669 [sys.executable, "-c", "pass"], 1670 os.environ, setpgroup="023") 1671 1672 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1673 'need signal.pthread_sigmask()') 1674 def test_setsigmask(self): 1675 code = textwrap.dedent("""\ 1676 import signal 1677 signal.raise_signal(signal.SIGUSR1)""") 1678 1679 pid = self.spawn_func( 1680 sys.executable, 1681 [sys.executable, '-c', code], 1682 os.environ, 1683 setsigmask=[signal.SIGUSR1] 1684 ) 1685 support.wait_process(pid, exitcode=0) 1686 1687 def test_setsigmask_wrong_type(self): 1688 with self.assertRaises(TypeError): 1689 self.spawn_func(sys.executable, 1690 [sys.executable, "-c", "pass"], 1691 os.environ, setsigmask=34) 1692 with self.assertRaises(TypeError): 1693 self.spawn_func(sys.executable, 1694 [sys.executable, "-c", "pass"], 1695 os.environ, setsigmask=["j"]) 1696 with self.assertRaises(ValueError): 1697 self.spawn_func(sys.executable, 1698 [sys.executable, "-c", "pass"], 1699 os.environ, setsigmask=[signal.NSIG, 1700 signal.NSIG+1]) 1701 1702 def test_setsid(self): 1703 rfd, wfd = os.pipe() 1704 self.addCleanup(os.close, rfd) 1705 try: 1706 os.set_inheritable(wfd, True) 1707 1708 code = textwrap.dedent(f""" 1709 import os 1710 fd = {wfd} 1711 sid = os.getsid(0) 1712 os.write(fd, str(sid).encode()) 1713 """) 1714 1715 try: 1716 pid = self.spawn_func(sys.executable, 1717 [sys.executable, "-c", code], 1718 os.environ, setsid=True) 1719 except NotImplementedError as exc: 1720 self.skipTest(f"setsid is not supported: {exc!r}") 1721 except PermissionError as exc: 1722 self.skipTest(f"setsid failed with: {exc!r}") 1723 finally: 1724 os.close(wfd) 1725 1726 support.wait_process(pid, exitcode=0) 1727 1728 output = os.read(rfd, 100) 1729 child_sid = int(output) 1730 parent_sid = os.getsid(os.getpid()) 1731 self.assertNotEqual(parent_sid, child_sid) 1732 1733 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1734 'need signal.pthread_sigmask()') 1735 def test_setsigdef(self): 1736 original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN) 1737 code = textwrap.dedent("""\ 1738 import signal 1739 signal.raise_signal(signal.SIGUSR1)""") 1740 try: 1741 pid = self.spawn_func( 1742 sys.executable, 1743 [sys.executable, '-c', code], 1744 os.environ, 1745 setsigdef=[signal.SIGUSR1] 1746 ) 1747 finally: 1748 signal.signal(signal.SIGUSR1, original_handler) 1749 1750 support.wait_process(pid, exitcode=-signal.SIGUSR1) 1751 1752 def test_setsigdef_wrong_type(self): 1753 with self.assertRaises(TypeError): 1754 self.spawn_func(sys.executable, 1755 [sys.executable, "-c", "pass"], 1756 os.environ, setsigdef=34) 1757 with self.assertRaises(TypeError): 1758 self.spawn_func(sys.executable, 1759 [sys.executable, "-c", "pass"], 1760 os.environ, setsigdef=["j"]) 1761 with self.assertRaises(ValueError): 1762 self.spawn_func(sys.executable, 1763 [sys.executable, "-c", "pass"], 1764 os.environ, setsigdef=[signal.NSIG, signal.NSIG+1]) 1765 1766 @requires_sched 1767 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1768 "bpo-34685: test can fail on BSD") 1769 def test_setscheduler_only_param(self): 1770 policy = os.sched_getscheduler(0) 1771 priority = os.sched_get_priority_min(policy) 1772 code = textwrap.dedent(f"""\ 1773 import os, sys 1774 if os.sched_getscheduler(0) != {policy}: 1775 sys.exit(101) 1776 if os.sched_getparam(0).sched_priority != {priority}: 1777 sys.exit(102)""") 1778 pid = self.spawn_func( 1779 sys.executable, 1780 [sys.executable, '-c', code], 1781 os.environ, 1782 scheduler=(None, os.sched_param(priority)) 1783 ) 1784 support.wait_process(pid, exitcode=0) 1785 1786 @requires_sched 1787 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1788 "bpo-34685: test can fail on BSD") 1789 def test_setscheduler_with_policy(self): 1790 policy = os.sched_getscheduler(0) 1791 priority = os.sched_get_priority_min(policy) 1792 code = textwrap.dedent(f"""\ 1793 import os, sys 1794 if os.sched_getscheduler(0) != {policy}: 1795 sys.exit(101) 1796 if os.sched_getparam(0).sched_priority != {priority}: 1797 sys.exit(102)""") 1798 pid = self.spawn_func( 1799 sys.executable, 1800 [sys.executable, '-c', code], 1801 os.environ, 1802 scheduler=(policy, os.sched_param(priority)) 1803 ) 1804 support.wait_process(pid, exitcode=0) 1805 1806 def test_multiple_file_actions(self): 1807 file_actions = [ 1808 (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0), 1809 (os.POSIX_SPAWN_CLOSE, 0), 1810 (os.POSIX_SPAWN_DUP2, 1, 4), 1811 ] 1812 pid = self.spawn_func(self.NOOP_PROGRAM[0], 1813 self.NOOP_PROGRAM, 1814 os.environ, 1815 file_actions=file_actions) 1816 support.wait_process(pid, exitcode=0) 1817 1818 def test_bad_file_actions(self): 1819 args = self.NOOP_PROGRAM 1820 with self.assertRaises(TypeError): 1821 self.spawn_func(args[0], args, os.environ, 1822 file_actions=[None]) 1823 with self.assertRaises(TypeError): 1824 self.spawn_func(args[0], args, os.environ, 1825 file_actions=[()]) 1826 with self.assertRaises(TypeError): 1827 self.spawn_func(args[0], args, os.environ, 1828 file_actions=[(None,)]) 1829 with self.assertRaises(TypeError): 1830 self.spawn_func(args[0], args, os.environ, 1831 file_actions=[(12345,)]) 1832 with self.assertRaises(TypeError): 1833 self.spawn_func(args[0], args, os.environ, 1834 file_actions=[(os.POSIX_SPAWN_CLOSE,)]) 1835 with self.assertRaises(TypeError): 1836 self.spawn_func(args[0], args, os.environ, 1837 file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)]) 1838 with self.assertRaises(TypeError): 1839 self.spawn_func(args[0], args, os.environ, 1840 file_actions=[(os.POSIX_SPAWN_CLOSE, None)]) 1841 with self.assertRaises(ValueError): 1842 self.spawn_func(args[0], args, os.environ, 1843 file_actions=[(os.POSIX_SPAWN_OPEN, 1844 3, __file__ + '\0', 1845 os.O_RDONLY, 0)]) 1846 1847 def test_open_file(self): 1848 outfile = os_helper.TESTFN 1849 self.addCleanup(os_helper.unlink, outfile) 1850 script = """if 1: 1851 import sys 1852 sys.stdout.write("hello") 1853 """ 1854 file_actions = [ 1855 (os.POSIX_SPAWN_OPEN, 1, outfile, 1856 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 1857 stat.S_IRUSR | stat.S_IWUSR), 1858 ] 1859 args = self.python_args('-c', script) 1860 pid = self.spawn_func(args[0], args, os.environ, 1861 file_actions=file_actions) 1862 1863 support.wait_process(pid, exitcode=0) 1864 with open(outfile, encoding="utf-8") as f: 1865 self.assertEqual(f.read(), 'hello') 1866 1867 def test_close_file(self): 1868 closefile = os_helper.TESTFN 1869 self.addCleanup(os_helper.unlink, closefile) 1870 script = f"""if 1: 1871 import os 1872 try: 1873 os.fstat(0) 1874 except OSError as e: 1875 with open({closefile!r}, 'w', encoding='utf-8') as closefile: 1876 closefile.write('is closed %d' % e.errno) 1877 """ 1878 args = self.python_args('-c', script) 1879 pid = self.spawn_func(args[0], args, os.environ, 1880 file_actions=[(os.POSIX_SPAWN_CLOSE, 0)]) 1881 1882 support.wait_process(pid, exitcode=0) 1883 with open(closefile, encoding="utf-8") as f: 1884 self.assertEqual(f.read(), 'is closed %d' % errno.EBADF) 1885 1886 def test_dup2(self): 1887 dupfile = os_helper.TESTFN 1888 self.addCleanup(os_helper.unlink, dupfile) 1889 script = """if 1: 1890 import sys 1891 sys.stdout.write("hello") 1892 """ 1893 with open(dupfile, "wb") as childfile: 1894 file_actions = [ 1895 (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1), 1896 ] 1897 args = self.python_args('-c', script) 1898 pid = self.spawn_func(args[0], args, os.environ, 1899 file_actions=file_actions) 1900 support.wait_process(pid, exitcode=0) 1901 with open(dupfile, encoding="utf-8") as f: 1902 self.assertEqual(f.read(), 'hello') 1903 1904 1905@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn") 1906class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin): 1907 spawn_func = getattr(posix, 'posix_spawn', None) 1908 1909 1910@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp") 1911class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin): 1912 spawn_func = getattr(posix, 'posix_spawnp', None) 1913 1914 @os_helper.skip_unless_symlink 1915 def test_posix_spawnp(self): 1916 # Use a symlink to create a program in its own temporary directory 1917 temp_dir = tempfile.mkdtemp() 1918 self.addCleanup(os_helper.rmtree, temp_dir) 1919 1920 program = 'posix_spawnp_test_program.exe' 1921 program_fullpath = os.path.join(temp_dir, program) 1922 os.symlink(sys.executable, program_fullpath) 1923 1924 try: 1925 path = os.pathsep.join((temp_dir, os.environ['PATH'])) 1926 except KeyError: 1927 path = temp_dir # PATH is not set 1928 1929 spawn_args = (program, '-I', '-S', '-c', 'pass') 1930 code = textwrap.dedent(""" 1931 import os 1932 from test import support 1933 1934 args = %a 1935 pid = os.posix_spawnp(args[0], args, os.environ) 1936 1937 support.wait_process(pid, exitcode=0) 1938 """ % (spawn_args,)) 1939 1940 # Use a subprocess to test os.posix_spawnp() with a modified PATH 1941 # environment variable: posix_spawnp() uses the current environment 1942 # to locate the program, not its environment argument. 1943 args = ('-c', code) 1944 assert_python_ok(*args, PATH=path) 1945 1946 1947@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS") 1948class TestPosixWeaklinking(unittest.TestCase): 1949 # These test cases verify that weak linking support on macOS works 1950 # as expected. These cases only test new behaviour introduced by weak linking, 1951 # regular behaviour is tested by the normal test cases. 1952 # 1953 # See the section on Weak Linking in Mac/README.txt for more information. 1954 def setUp(self): 1955 import sysconfig 1956 import platform 1957 1958 config_vars = sysconfig.get_config_vars() 1959 self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] } 1960 self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split(".")) 1961 1962 def _verify_available(self, name): 1963 if name not in self.available: 1964 raise unittest.SkipTest(f"{name} not weak-linked") 1965 1966 def test_pwritev(self): 1967 self._verify_available("HAVE_PWRITEV") 1968 if self.mac_ver >= (10, 16): 1969 self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available") 1970 self.assertTrue(hasattr(os, "preadv"), "os.readv is not available") 1971 1972 else: 1973 self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available") 1974 self.assertFalse(hasattr(os, "preadv"), "os.readv is available") 1975 1976 def test_stat(self): 1977 self._verify_available("HAVE_FSTATAT") 1978 if self.mac_ver >= (10, 10): 1979 self.assertIn("HAVE_FSTATAT", posix._have_functions) 1980 1981 else: 1982 self.assertNotIn("HAVE_FSTATAT", posix._have_functions) 1983 1984 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1985 os.stat("file", dir_fd=0) 1986 1987 def test_access(self): 1988 self._verify_available("HAVE_FACCESSAT") 1989 if self.mac_ver >= (10, 10): 1990 self.assertIn("HAVE_FACCESSAT", posix._have_functions) 1991 1992 else: 1993 self.assertNotIn("HAVE_FACCESSAT", posix._have_functions) 1994 1995 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1996 os.access("file", os.R_OK, dir_fd=0) 1997 1998 with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"): 1999 os.access("file", os.R_OK, follow_symlinks=False) 2000 2001 with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"): 2002 os.access("file", os.R_OK, effective_ids=True) 2003 2004 def test_chmod(self): 2005 self._verify_available("HAVE_FCHMODAT") 2006 if self.mac_ver >= (10, 10): 2007 self.assertIn("HAVE_FCHMODAT", posix._have_functions) 2008 2009 else: 2010 self.assertNotIn("HAVE_FCHMODAT", posix._have_functions) 2011 self.assertIn("HAVE_LCHMOD", posix._have_functions) 2012 2013 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2014 os.chmod("file", 0o644, dir_fd=0) 2015 2016 def test_chown(self): 2017 self._verify_available("HAVE_FCHOWNAT") 2018 if self.mac_ver >= (10, 10): 2019 self.assertIn("HAVE_FCHOWNAT", posix._have_functions) 2020 2021 else: 2022 self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions) 2023 self.assertIn("HAVE_LCHOWN", posix._have_functions) 2024 2025 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2026 os.chown("file", 0, 0, dir_fd=0) 2027 2028 def test_link(self): 2029 self._verify_available("HAVE_LINKAT") 2030 if self.mac_ver >= (10, 10): 2031 self.assertIn("HAVE_LINKAT", posix._have_functions) 2032 2033 else: 2034 self.assertNotIn("HAVE_LINKAT", posix._have_functions) 2035 2036 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 2037 os.link("source", "target", src_dir_fd=0) 2038 2039 with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"): 2040 os.link("source", "target", dst_dir_fd=0) 2041 2042 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 2043 os.link("source", "target", src_dir_fd=0, dst_dir_fd=0) 2044 2045 # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag 2046 with os_helper.temp_dir() as base_path: 2047 link_path = os.path.join(base_path, "link") 2048 target_path = os.path.join(base_path, "target") 2049 source_path = os.path.join(base_path, "source") 2050 2051 with open(source_path, "w") as fp: 2052 fp.write("data") 2053 2054 os.symlink("target", link_path) 2055 2056 # Calling os.link should fail in the link(2) call, and 2057 # should not reject *follow_symlinks* (to match the 2058 # behaviour you'd get when building on a platform without 2059 # linkat) 2060 with self.assertRaises(FileExistsError): 2061 os.link(source_path, link_path, follow_symlinks=True) 2062 2063 with self.assertRaises(FileExistsError): 2064 os.link(source_path, link_path, follow_symlinks=False) 2065 2066 2067 def test_listdir_scandir(self): 2068 self._verify_available("HAVE_FDOPENDIR") 2069 if self.mac_ver >= (10, 10): 2070 self.assertIn("HAVE_FDOPENDIR", posix._have_functions) 2071 2072 else: 2073 self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions) 2074 2075 with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"): 2076 os.listdir(0) 2077 2078 with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"): 2079 os.scandir(0) 2080 2081 def test_mkdir(self): 2082 self._verify_available("HAVE_MKDIRAT") 2083 if self.mac_ver >= (10, 10): 2084 self.assertIn("HAVE_MKDIRAT", posix._have_functions) 2085 2086 else: 2087 self.assertNotIn("HAVE_MKDIRAT", posix._have_functions) 2088 2089 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2090 os.mkdir("dir", dir_fd=0) 2091 2092 def test_rename_replace(self): 2093 self._verify_available("HAVE_RENAMEAT") 2094 if self.mac_ver >= (10, 10): 2095 self.assertIn("HAVE_RENAMEAT", posix._have_functions) 2096 2097 else: 2098 self.assertNotIn("HAVE_RENAMEAT", posix._have_functions) 2099 2100 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2101 os.rename("a", "b", src_dir_fd=0) 2102 2103 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2104 os.rename("a", "b", dst_dir_fd=0) 2105 2106 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2107 os.replace("a", "b", src_dir_fd=0) 2108 2109 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2110 os.replace("a", "b", dst_dir_fd=0) 2111 2112 def test_unlink_rmdir(self): 2113 self._verify_available("HAVE_UNLINKAT") 2114 if self.mac_ver >= (10, 10): 2115 self.assertIn("HAVE_UNLINKAT", posix._have_functions) 2116 2117 else: 2118 self.assertNotIn("HAVE_UNLINKAT", posix._have_functions) 2119 2120 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2121 os.unlink("path", dir_fd=0) 2122 2123 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2124 os.rmdir("path", dir_fd=0) 2125 2126 def test_open(self): 2127 self._verify_available("HAVE_OPENAT") 2128 if self.mac_ver >= (10, 10): 2129 self.assertIn("HAVE_OPENAT", posix._have_functions) 2130 2131 else: 2132 self.assertNotIn("HAVE_OPENAT", posix._have_functions) 2133 2134 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2135 os.open("path", os.O_RDONLY, dir_fd=0) 2136 2137 def test_readlink(self): 2138 self._verify_available("HAVE_READLINKAT") 2139 if self.mac_ver >= (10, 10): 2140 self.assertIn("HAVE_READLINKAT", posix._have_functions) 2141 2142 else: 2143 self.assertNotIn("HAVE_READLINKAT", posix._have_functions) 2144 2145 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2146 os.readlink("path", dir_fd=0) 2147 2148 def test_symlink(self): 2149 self._verify_available("HAVE_SYMLINKAT") 2150 if self.mac_ver >= (10, 10): 2151 self.assertIn("HAVE_SYMLINKAT", posix._have_functions) 2152 2153 else: 2154 self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions) 2155 2156 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2157 os.symlink("a", "b", dir_fd=0) 2158 2159 def test_utime(self): 2160 self._verify_available("HAVE_FUTIMENS") 2161 self._verify_available("HAVE_UTIMENSAT") 2162 if self.mac_ver >= (10, 13): 2163 self.assertIn("HAVE_FUTIMENS", posix._have_functions) 2164 self.assertIn("HAVE_UTIMENSAT", posix._have_functions) 2165 2166 else: 2167 self.assertNotIn("HAVE_FUTIMENS", posix._have_functions) 2168 self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions) 2169 2170 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2171 os.utime("path", dir_fd=0) 2172 2173 2174def tearDownModule(): 2175 support.reap_children() 2176 2177 2178if __name__ == '__main__': 2179 unittest.main() 2180