1"Test posix functions" 2 3from test import support 4from test.support import is_apple 5from test.support import os_helper 6from test.support import warnings_helper 7from test.support.script_helper import assert_python_ok 8 9import copy 10import errno 11import sys 12import signal 13import time 14import os 15import platform 16import pickle 17import stat 18import tempfile 19import unittest 20import warnings 21import textwrap 22from contextlib import contextmanager 23 24try: 25 import posix 26except ImportError: 27 import nt as posix 28 29try: 30 import pwd 31except ImportError: 32 pwd = None 33 34_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 35 os_helper.TESTFN + '-dummy-symlink') 36 37requires_32b = unittest.skipUnless( 38 # Emscripten/WASI have 32 bits pointers, but support 64 bits syscall args. 39 sys.maxsize < 2**32 and not (support.is_emscripten or support.is_wasi), 40 'test is only meaningful on 32-bit builds' 41) 42 43def _supports_sched(): 44 if not hasattr(posix, 'sched_getscheduler'): 45 return False 46 try: 47 posix.sched_getscheduler(0) 48 except OSError as e: 49 if e.errno == errno.ENOSYS: 50 return False 51 return True 52 53requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API') 54 55 56class PosixTester(unittest.TestCase): 57 58 def setUp(self): 59 # create empty file 60 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 61 with open(os_helper.TESTFN, "wb"): 62 pass 63 self.enterContext(warnings_helper.check_warnings()) 64 warnings.filterwarnings('ignore', '.* potential security risk .*', 65 RuntimeWarning) 66 67 def testNoArgFunctions(self): 68 # test posix functions which take no arguments and have 69 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 70 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", 71 "times", "getloadavg", 72 "getegid", "geteuid", "getgid", "getgroups", 73 "getpid", "getpgrp", "getppid", "getuid", "sync", 74 ] 75 76 for name in NO_ARG_FUNCTIONS: 77 posix_func = getattr(posix, name, None) 78 if posix_func is not None: 79 with self.subTest(name): 80 posix_func() 81 self.assertRaises(TypeError, posix_func, 1) 82 83 @unittest.skipUnless(hasattr(posix, 'getresuid'), 84 'test needs posix.getresuid()') 85 def test_getresuid(self): 86 user_ids = posix.getresuid() 87 self.assertEqual(len(user_ids), 3) 88 for val in user_ids: 89 self.assertGreaterEqual(val, 0) 90 91 @unittest.skipUnless(hasattr(posix, 'getresgid'), 92 'test needs posix.getresgid()') 93 def test_getresgid(self): 94 group_ids = posix.getresgid() 95 self.assertEqual(len(group_ids), 3) 96 for val in group_ids: 97 self.assertGreaterEqual(val, 0) 98 99 @unittest.skipUnless(hasattr(posix, 'setresuid'), 100 'test needs posix.setresuid()') 101 def test_setresuid(self): 102 current_user_ids = posix.getresuid() 103 self.assertIsNone(posix.setresuid(*current_user_ids)) 104 # -1 means don't change that value. 105 self.assertIsNone(posix.setresuid(-1, -1, -1)) 106 107 @unittest.skipUnless(hasattr(posix, 'setresuid'), 108 'test needs posix.setresuid()') 109 def test_setresuid_exception(self): 110 # Don't do this test if someone is silly enough to run us as root. 111 current_user_ids = posix.getresuid() 112 if 0 not in current_user_ids: 113 new_user_ids = (current_user_ids[0]+1, -1, -1) 114 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 115 116 @unittest.skipUnless(hasattr(posix, 'setresgid'), 117 'test needs posix.setresgid()') 118 def test_setresgid(self): 119 current_group_ids = posix.getresgid() 120 self.assertIsNone(posix.setresgid(*current_group_ids)) 121 # -1 means don't change that value. 122 self.assertIsNone(posix.setresgid(-1, -1, -1)) 123 124 @unittest.skipUnless(hasattr(posix, 'setresgid'), 125 'test needs posix.setresgid()') 126 def test_setresgid_exception(self): 127 # Don't do this test if someone is silly enough to run us as root. 128 current_group_ids = posix.getresgid() 129 if 0 not in current_group_ids: 130 new_group_ids = (current_group_ids[0]+1, -1, -1) 131 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 132 133 @unittest.skipUnless(hasattr(posix, 'initgroups'), 134 "test needs os.initgroups()") 135 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 136 def test_initgroups(self): 137 # It takes a string and an integer; check that it raises a TypeError 138 # for other argument lists. 139 self.assertRaises(TypeError, posix.initgroups) 140 self.assertRaises(TypeError, posix.initgroups, None) 141 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 142 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 143 144 # If a non-privileged user invokes it, it should fail with OSError 145 # EPERM. 146 if os.getuid() != 0: 147 try: 148 name = pwd.getpwuid(posix.getuid()).pw_name 149 except KeyError: 150 # the current UID may not have a pwd entry 151 raise unittest.SkipTest("need a pwd entry") 152 try: 153 posix.initgroups(name, 13) 154 except OSError as e: 155 self.assertEqual(e.errno, errno.EPERM) 156 else: 157 self.fail("Expected OSError to be raised by initgroups") 158 159 @unittest.skipUnless(hasattr(posix, 'statvfs'), 160 'test needs posix.statvfs()') 161 def test_statvfs(self): 162 self.assertTrue(posix.statvfs(os.curdir)) 163 164 @unittest.skipUnless(hasattr(posix, 'fstatvfs'), 165 'test needs posix.fstatvfs()') 166 def test_fstatvfs(self): 167 fp = open(os_helper.TESTFN) 168 try: 169 self.assertTrue(posix.fstatvfs(fp.fileno())) 170 self.assertTrue(posix.statvfs(fp.fileno())) 171 finally: 172 fp.close() 173 174 @unittest.skipUnless(hasattr(posix, 'ftruncate'), 175 'test needs posix.ftruncate()') 176 def test_ftruncate(self): 177 fp = open(os_helper.TESTFN, 'w+') 178 try: 179 # we need to have some data to truncate 180 fp.write('test') 181 fp.flush() 182 posix.ftruncate(fp.fileno(), 0) 183 finally: 184 fp.close() 185 186 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") 187 def test_truncate(self): 188 with open(os_helper.TESTFN, 'w') as fp: 189 fp.write('test') 190 fp.flush() 191 posix.truncate(os_helper.TESTFN, 0) 192 193 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") 194 @support.requires_fork() 195 def test_fexecve(self): 196 fp = os.open(sys.executable, os.O_RDONLY) 197 try: 198 pid = os.fork() 199 if pid == 0: 200 os.chdir(os.path.split(sys.executable)[0]) 201 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) 202 else: 203 support.wait_process(pid, exitcode=0) 204 finally: 205 os.close(fp) 206 207 208 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") 209 @support.requires_fork() 210 def test_waitid(self): 211 pid = os.fork() 212 if pid == 0: 213 os.chdir(os.path.split(sys.executable)[0]) 214 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) 215 else: 216 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) 217 self.assertEqual(pid, res.si_pid) 218 219 @support.requires_fork() 220 def test_register_at_fork(self): 221 with self.assertRaises(TypeError, msg="Positional args not allowed"): 222 os.register_at_fork(lambda: None) 223 with self.assertRaises(TypeError, msg="Args must be callable"): 224 os.register_at_fork(before=2) 225 with self.assertRaises(TypeError, msg="Args must be callable"): 226 os.register_at_fork(after_in_child="three") 227 with self.assertRaises(TypeError, msg="Args must be callable"): 228 os.register_at_fork(after_in_parent=b"Five") 229 with self.assertRaises(TypeError, msg="Args must not be None"): 230 os.register_at_fork(before=None) 231 with self.assertRaises(TypeError, msg="Args must not be None"): 232 os.register_at_fork(after_in_child=None) 233 with self.assertRaises(TypeError, msg="Args must not be None"): 234 os.register_at_fork(after_in_parent=None) 235 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 236 # Ensure a combination of valid and invalid is an error. 237 os.register_at_fork(before=None, after_in_parent=lambda: 3) 238 with self.assertRaises(TypeError, msg="At least one argument is required"): 239 # when no arg is passed 240 os.register_at_fork() 241 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 242 # Ensure a combination of valid and invalid is an error. 243 os.register_at_fork(before=lambda: None, after_in_child='') 244 # We test actual registrations in their own process so as not to 245 # pollute this one. There is no way to unregister for cleanup. 246 code = """if 1: 247 import os 248 249 r, w = os.pipe() 250 fin_r, fin_w = os.pipe() 251 252 os.register_at_fork(before=lambda: os.write(w, b'A')) 253 os.register_at_fork(after_in_parent=lambda: os.write(w, b'C')) 254 os.register_at_fork(after_in_child=lambda: os.write(w, b'E')) 255 os.register_at_fork(before=lambda: os.write(w, b'B'), 256 after_in_parent=lambda: os.write(w, b'D'), 257 after_in_child=lambda: os.write(w, b'F')) 258 259 pid = os.fork() 260 if pid == 0: 261 # At this point, after-forkers have already been executed 262 os.close(w) 263 # Wait for parent to tell us to exit 264 os.read(fin_r, 1) 265 os._exit(0) 266 else: 267 try: 268 os.close(w) 269 with open(r, "rb") as f: 270 data = f.read() 271 assert len(data) == 6, data 272 # Check before-fork callbacks 273 assert data[:2] == b'BA', data 274 # Check after-fork callbacks 275 assert sorted(data[2:]) == list(b'CDEF'), data 276 assert data.index(b'C') < data.index(b'D'), data 277 assert data.index(b'E') < data.index(b'F'), data 278 finally: 279 os.write(fin_w, b'!') 280 """ 281 assert_python_ok('-c', code) 282 283 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") 284 def test_lockf(self): 285 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 286 try: 287 os.write(fd, b'test') 288 os.lseek(fd, 0, os.SEEK_SET) 289 posix.lockf(fd, posix.F_LOCK, 4) 290 # section is locked 291 posix.lockf(fd, posix.F_ULOCK, 4) 292 finally: 293 os.close(fd) 294 295 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") 296 def test_pread(self): 297 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 298 try: 299 os.write(fd, b'test') 300 os.lseek(fd, 0, os.SEEK_SET) 301 self.assertEqual(b'es', posix.pread(fd, 2, 1)) 302 # the first pread() shouldn't disturb the file offset 303 self.assertEqual(b'te', posix.read(fd, 2)) 304 finally: 305 os.close(fd) 306 307 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 308 def test_preadv(self): 309 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 310 try: 311 os.write(fd, b'test1tt2t3t5t6t6t8') 312 buf = [bytearray(i) for i in [5, 3, 2]] 313 self.assertEqual(posix.preadv(fd, buf, 3), 10) 314 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 315 finally: 316 os.close(fd) 317 318 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 319 @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI") 320 def test_preadv_flags(self): 321 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 322 try: 323 os.write(fd, b'test1tt2t3t5t6t6t8') 324 buf = [bytearray(i) for i in [5, 3, 2]] 325 self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10) 326 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 327 except NotImplementedError: 328 self.skipTest("preadv2 not available") 329 except OSError as inst: 330 # Is possible that the macro RWF_HIPRI was defined at compilation time 331 # but the option is not supported by the kernel or the runtime libc shared 332 # library. 333 if inst.errno in {errno.EINVAL, errno.ENOTSUP}: 334 raise unittest.SkipTest("RWF_HIPRI is not supported by the current system") 335 else: 336 raise 337 finally: 338 os.close(fd) 339 340 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 341 @requires_32b 342 def test_preadv_overflow_32bits(self): 343 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 344 try: 345 buf = [bytearray(2**16)] * 2**15 346 with self.assertRaises(OSError) as cm: 347 os.preadv(fd, buf, 0) 348 self.assertEqual(cm.exception.errno, errno.EINVAL) 349 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 350 finally: 351 os.close(fd) 352 353 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") 354 def test_pwrite(self): 355 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 356 try: 357 os.write(fd, b'test') 358 os.lseek(fd, 0, os.SEEK_SET) 359 posix.pwrite(fd, b'xx', 1) 360 self.assertEqual(b'txxt', posix.read(fd, 4)) 361 finally: 362 os.close(fd) 363 364 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 365 def test_pwritev(self): 366 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 367 try: 368 os.write(fd, b"xx") 369 os.lseek(fd, 0, os.SEEK_SET) 370 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2) 371 self.assertEqual(n, 10) 372 373 os.lseek(fd, 0, os.SEEK_SET) 374 self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100)) 375 finally: 376 os.close(fd) 377 378 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 379 @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC") 380 def test_pwritev_flags(self): 381 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 382 try: 383 os.write(fd,b"xx") 384 os.lseek(fd, 0, os.SEEK_SET) 385 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC) 386 self.assertEqual(n, 10) 387 388 os.lseek(fd, 0, os.SEEK_SET) 389 self.assertEqual(b'xxtest1tt2', posix.read(fd, 100)) 390 finally: 391 os.close(fd) 392 393 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 394 @requires_32b 395 def test_pwritev_overflow_32bits(self): 396 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 397 try: 398 with self.assertRaises(OSError) as cm: 399 os.pwritev(fd, [b"x" * 2**16] * 2**15, 0) 400 self.assertEqual(cm.exception.errno, errno.EINVAL) 401 finally: 402 os.close(fd) 403 404 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 405 "test needs posix.posix_fallocate()") 406 def test_posix_fallocate(self): 407 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 408 try: 409 posix.posix_fallocate(fd, 0, 10) 410 except OSError as inst: 411 # issue10812, ZFS doesn't appear to support posix_fallocate, 412 # so skip Solaris-based since they are likely to have ZFS. 413 # issue33655: Also ignore EINVAL on *BSD since ZFS is also 414 # often used there. 415 if inst.errno == errno.EINVAL and sys.platform.startswith( 416 ('sunos', 'freebsd', 'openbsd', 'gnukfreebsd')): 417 raise unittest.SkipTest("test may fail on ZFS filesystems") 418 elif inst.errno == errno.EOPNOTSUPP and sys.platform.startswith("netbsd"): 419 raise unittest.SkipTest("test may fail on FFS filesystems") 420 else: 421 raise 422 finally: 423 os.close(fd) 424 425 # issue31106 - posix_fallocate() does not set error in errno. 426 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 427 "test needs posix.posix_fallocate()") 428 def test_posix_fallocate_errno(self): 429 try: 430 posix.posix_fallocate(-42, 0, 10) 431 except OSError as inst: 432 if inst.errno != errno.EBADF: 433 raise 434 435 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 436 "test needs posix.posix_fadvise()") 437 def test_posix_fadvise(self): 438 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 439 try: 440 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) 441 finally: 442 os.close(fd) 443 444 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 445 "test needs posix.posix_fadvise()") 446 def test_posix_fadvise_errno(self): 447 try: 448 posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED) 449 except OSError as inst: 450 if inst.errno != errno.EBADF: 451 raise 452 453 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") 454 def test_utime_with_fd(self): 455 now = time.time() 456 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 457 try: 458 posix.utime(fd) 459 posix.utime(fd, None) 460 self.assertRaises(TypeError, posix.utime, fd, (None, None)) 461 self.assertRaises(TypeError, posix.utime, fd, (now, None)) 462 self.assertRaises(TypeError, posix.utime, fd, (None, now)) 463 posix.utime(fd, (int(now), int(now))) 464 posix.utime(fd, (now, now)) 465 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) 466 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) 467 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) 468 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) 469 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) 470 471 finally: 472 os.close(fd) 473 474 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") 475 def test_utime_nofollow_symlinks(self): 476 now = time.time() 477 posix.utime(os_helper.TESTFN, None, follow_symlinks=False) 478 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 479 (None, None), follow_symlinks=False) 480 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 481 (now, None), follow_symlinks=False) 482 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 483 (None, now), follow_symlinks=False) 484 posix.utime(os_helper.TESTFN, (int(now), int(now)), 485 follow_symlinks=False) 486 posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False) 487 posix.utime(os_helper.TESTFN, follow_symlinks=False) 488 489 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 490 def test_writev(self): 491 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 492 try: 493 n = os.writev(fd, (b'test1', b'tt2', b't3')) 494 self.assertEqual(n, 10) 495 496 os.lseek(fd, 0, os.SEEK_SET) 497 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) 498 499 # Issue #20113: empty list of buffers should not crash 500 try: 501 size = posix.writev(fd, []) 502 except OSError: 503 # writev(fd, []) raises OSError(22, "Invalid argument") 504 # on OpenIndiana 505 pass 506 else: 507 self.assertEqual(size, 0) 508 finally: 509 os.close(fd) 510 511 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 512 @requires_32b 513 def test_writev_overflow_32bits(self): 514 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 515 try: 516 with self.assertRaises(OSError) as cm: 517 os.writev(fd, [b"x" * 2**16] * 2**15) 518 self.assertEqual(cm.exception.errno, errno.EINVAL) 519 finally: 520 os.close(fd) 521 522 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 523 def test_readv(self): 524 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 525 try: 526 os.write(fd, b'test1tt2t3') 527 os.lseek(fd, 0, os.SEEK_SET) 528 buf = [bytearray(i) for i in [5, 3, 2]] 529 self.assertEqual(posix.readv(fd, buf), 10) 530 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) 531 532 # Issue #20113: empty list of buffers should not crash 533 try: 534 size = posix.readv(fd, []) 535 except OSError: 536 # readv(fd, []) raises OSError(22, "Invalid argument") 537 # on OpenIndiana 538 pass 539 else: 540 self.assertEqual(size, 0) 541 finally: 542 os.close(fd) 543 544 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 545 @requires_32b 546 def test_readv_overflow_32bits(self): 547 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 548 try: 549 buf = [bytearray(2**16)] * 2**15 550 with self.assertRaises(OSError) as cm: 551 os.readv(fd, buf) 552 self.assertEqual(cm.exception.errno, errno.EINVAL) 553 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 554 finally: 555 os.close(fd) 556 557 @unittest.skipUnless(hasattr(posix, 'dup'), 558 'test needs posix.dup()') 559 @unittest.skipIf(support.is_wasi, "WASI does not have dup()") 560 def test_dup(self): 561 fp = open(os_helper.TESTFN) 562 try: 563 fd = posix.dup(fp.fileno()) 564 self.assertIsInstance(fd, int) 565 os.close(fd) 566 finally: 567 fp.close() 568 569 @unittest.skipUnless(hasattr(posix, 'confstr'), 570 'test needs posix.confstr()') 571 @unittest.skipIf(support.is_apple_mobile, "gh-118201: Test is flaky on iOS") 572 def test_confstr(self): 573 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 574 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 575 576 @unittest.skipUnless(hasattr(posix, 'dup2'), 577 'test needs posix.dup2()') 578 @unittest.skipIf(support.is_wasi, "WASI does not have dup2()") 579 def test_dup2(self): 580 fp1 = open(os_helper.TESTFN) 581 fp2 = open(os_helper.TESTFN) 582 try: 583 posix.dup2(fp1.fileno(), fp2.fileno()) 584 finally: 585 fp1.close() 586 fp2.close() 587 588 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") 589 @support.requires_linux_version(2, 6, 23) 590 @support.requires_subprocess() 591 def test_oscloexec(self): 592 fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC) 593 self.addCleanup(os.close, fd) 594 self.assertFalse(os.get_inheritable(fd)) 595 596 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), 597 'test needs posix.O_EXLOCK') 598 def test_osexlock(self): 599 fd = os.open(os_helper.TESTFN, 600 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 601 self.assertRaises(OSError, os.open, os_helper.TESTFN, 602 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 603 os.close(fd) 604 605 if hasattr(posix, "O_SHLOCK"): 606 fd = os.open(os_helper.TESTFN, 607 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 608 self.assertRaises(OSError, os.open, os_helper.TESTFN, 609 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 610 os.close(fd) 611 612 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), 613 'test needs posix.O_SHLOCK') 614 def test_osshlock(self): 615 fd1 = os.open(os_helper.TESTFN, 616 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 617 fd2 = os.open(os_helper.TESTFN, 618 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 619 os.close(fd2) 620 os.close(fd1) 621 622 if hasattr(posix, "O_EXLOCK"): 623 fd = os.open(os_helper.TESTFN, 624 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 625 self.assertRaises(OSError, os.open, os_helper.TESTFN, 626 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 627 os.close(fd) 628 629 @unittest.skipUnless(hasattr(posix, 'fstat'), 630 'test needs posix.fstat()') 631 def test_fstat(self): 632 fp = open(os_helper.TESTFN) 633 try: 634 self.assertTrue(posix.fstat(fp.fileno())) 635 self.assertTrue(posix.stat(fp.fileno())) 636 637 self.assertRaisesRegex(TypeError, 638 'should be string, bytes, os.PathLike or integer, not', 639 posix.stat, float(fp.fileno())) 640 finally: 641 fp.close() 642 643 def test_stat(self): 644 self.assertTrue(posix.stat(os_helper.TESTFN)) 645 self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN))) 646 647 self.assertRaisesRegex(TypeError, 648 'should be string, bytes, os.PathLike or integer, not', 649 posix.stat, bytearray(os.fsencode(os_helper.TESTFN))) 650 self.assertRaisesRegex(TypeError, 651 'should be string, bytes, os.PathLike or integer, not', 652 posix.stat, None) 653 self.assertRaisesRegex(TypeError, 654 'should be string, bytes, os.PathLike or integer, not', 655 posix.stat, list(os_helper.TESTFN)) 656 self.assertRaisesRegex(TypeError, 657 'should be string, bytes, os.PathLike or integer, not', 658 posix.stat, list(os.fsencode(os_helper.TESTFN))) 659 660 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") 661 def test_mkfifo(self): 662 if sys.platform == "vxworks": 663 fifo_path = os.path.join("/fifos/", os_helper.TESTFN) 664 else: 665 fifo_path = os_helper.TESTFN 666 os_helper.unlink(fifo_path) 667 self.addCleanup(os_helper.unlink, fifo_path) 668 try: 669 posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR) 670 except PermissionError as e: 671 self.skipTest('posix.mkfifo(): %s' % e) 672 self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode)) 673 674 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'), 675 "don't have mknod()/S_IFIFO") 676 def test_mknod(self): 677 # Test using mknod() to create a FIFO (the only use specified 678 # by POSIX). 679 os_helper.unlink(os_helper.TESTFN) 680 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 681 try: 682 posix.mknod(os_helper.TESTFN, mode, 0) 683 except OSError as e: 684 # Some old systems don't allow unprivileged users to use 685 # mknod(), or only support creating device nodes. 686 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 687 else: 688 self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) 689 690 # Keyword arguments are also supported 691 os_helper.unlink(os_helper.TESTFN) 692 try: 693 posix.mknod(path=os_helper.TESTFN, mode=mode, device=0, 694 dir_fd=None) 695 except OSError as e: 696 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 697 698 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()') 699 def test_makedev(self): 700 st = posix.stat(os_helper.TESTFN) 701 dev = st.st_dev 702 self.assertIsInstance(dev, int) 703 self.assertGreaterEqual(dev, 0) 704 705 major = posix.major(dev) 706 self.assertIsInstance(major, int) 707 self.assertGreaterEqual(major, 0) 708 self.assertEqual(posix.major(dev), major) 709 self.assertRaises(TypeError, posix.major, float(dev)) 710 self.assertRaises(TypeError, posix.major) 711 for x in -2, 2**64, -2**63-1: 712 self.assertRaises((ValueError, OverflowError), posix.major, x) 713 714 minor = posix.minor(dev) 715 self.assertIsInstance(minor, int) 716 self.assertGreaterEqual(minor, 0) 717 self.assertEqual(posix.minor(dev), minor) 718 self.assertRaises(TypeError, posix.minor, float(dev)) 719 self.assertRaises(TypeError, posix.minor) 720 for x in -2, 2**64, -2**63-1: 721 self.assertRaises((ValueError, OverflowError), posix.minor, x) 722 723 self.assertEqual(posix.makedev(major, minor), dev) 724 self.assertRaises(TypeError, posix.makedev, float(major), minor) 725 self.assertRaises(TypeError, posix.makedev, major, float(minor)) 726 self.assertRaises(TypeError, posix.makedev, major) 727 self.assertRaises(TypeError, posix.makedev) 728 for x in -2, 2**32, 2**64, -2**63-1: 729 self.assertRaises((ValueError, OverflowError), posix.makedev, x, minor) 730 self.assertRaises((ValueError, OverflowError), posix.makedev, major, x) 731 732 if sys.platform == 'linux': 733 NODEV = -1 734 self.assertEqual(posix.major(NODEV), NODEV) 735 self.assertEqual(posix.minor(NODEV), NODEV) 736 self.assertEqual(posix.makedev(NODEV, NODEV), NODEV) 737 738 def _test_all_chown_common(self, chown_func, first_param, stat_func): 739 """Common code for chown, fchown and lchown tests.""" 740 def check_stat(uid, gid): 741 if stat_func is not None: 742 stat = stat_func(first_param) 743 self.assertEqual(stat.st_uid, uid) 744 self.assertEqual(stat.st_gid, gid) 745 uid = os.getuid() 746 gid = os.getgid() 747 # test a successful chown call 748 chown_func(first_param, uid, gid) 749 check_stat(uid, gid) 750 chown_func(first_param, -1, gid) 751 check_stat(uid, gid) 752 chown_func(first_param, uid, -1) 753 check_stat(uid, gid) 754 755 if sys.platform == "vxworks": 756 # On VxWorks, root user id is 1 and 0 means no login user: 757 # both are super users. 758 is_root = (uid in (0, 1)) 759 else: 760 is_root = (uid == 0) 761 if support.is_emscripten: 762 # Emscripten getuid() / geteuid() always return 0 (root), but 763 # cannot chown uid/gid to random value. 764 pass 765 elif is_root: 766 # Try an amusingly large uid/gid to make sure we handle 767 # large unsigned values. (chown lets you use any 768 # uid/gid you like, even if they aren't defined.) 769 # 770 # On VxWorks uid_t is defined as unsigned short. A big 771 # value greater than 65535 will result in underflow error. 772 # 773 # This problem keeps coming up: 774 # http://bugs.python.org/issue1747858 775 # http://bugs.python.org/issue4591 776 # http://bugs.python.org/issue15301 777 # Hopefully the fix in 4591 fixes it for good! 778 # 779 # This part of the test only runs when run as root. 780 # Only scary people run their tests as root. 781 782 big_value = (2**31 if sys.platform != "vxworks" else 2**15) 783 chown_func(first_param, big_value, big_value) 784 check_stat(big_value, big_value) 785 chown_func(first_param, -1, -1) 786 check_stat(big_value, big_value) 787 chown_func(first_param, uid, gid) 788 check_stat(uid, gid) 789 elif platform.system() in ('HP-UX', 'SunOS'): 790 # HP-UX and Solaris can allow a non-root user to chown() to root 791 # (issue #5113) 792 raise unittest.SkipTest("Skipping because of non-standard chown() " 793 "behavior") 794 else: 795 # non-root cannot chown to root, raises OSError 796 self.assertRaises(OSError, chown_func, first_param, 0, 0) 797 check_stat(uid, gid) 798 self.assertRaises(OSError, chown_func, first_param, 0, -1) 799 check_stat(uid, gid) 800 if hasattr(os, 'getgroups'): 801 if 0 not in os.getgroups(): 802 self.assertRaises(OSError, chown_func, first_param, -1, 0) 803 check_stat(uid, gid) 804 # test illegal types 805 for t in str, float: 806 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 807 check_stat(uid, gid) 808 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 809 check_stat(uid, gid) 810 811 @unittest.skipUnless(hasattr(os, "chown"), "requires os.chown()") 812 @unittest.skipIf(support.is_emscripten, "getgid() is a stub") 813 def test_chown(self): 814 # raise an OSError if the file does not exist 815 os.unlink(os_helper.TESTFN) 816 self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1) 817 818 # re-create the file 819 os_helper.create_empty_file(os_helper.TESTFN) 820 self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat) 821 822 @os_helper.skip_unless_working_chmod 823 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 824 @unittest.skipIf(support.is_emscripten, "getgid() is a stub") 825 def test_fchown(self): 826 os.unlink(os_helper.TESTFN) 827 828 # re-create the file 829 test_file = open(os_helper.TESTFN, 'w') 830 try: 831 fd = test_file.fileno() 832 self._test_all_chown_common(posix.fchown, fd, 833 getattr(posix, 'fstat', None)) 834 finally: 835 test_file.close() 836 837 @os_helper.skip_unless_working_chmod 838 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 839 def test_lchown(self): 840 os.unlink(os_helper.TESTFN) 841 # create a symlink 842 os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN) 843 self._test_all_chown_common(posix.lchown, os_helper.TESTFN, 844 getattr(posix, 'lstat', None)) 845 846 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') 847 def test_chdir(self): 848 posix.chdir(os.curdir) 849 self.assertRaises(OSError, posix.chdir, os_helper.TESTFN) 850 851 def test_listdir(self): 852 self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir)) 853 854 def test_listdir_default(self): 855 # When listdir is called without argument, 856 # it's the same as listdir(os.curdir). 857 self.assertIn(os_helper.TESTFN, posix.listdir()) 858 859 def test_listdir_bytes(self): 860 # When listdir is called with a bytes object, 861 # the returned strings are of type bytes. 862 self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.')) 863 864 def test_listdir_bytes_like(self): 865 for cls in bytearray, memoryview: 866 with self.assertRaises(TypeError): 867 posix.listdir(cls(b'.')) 868 869 @unittest.skipUnless(posix.listdir in os.supports_fd, 870 "test needs fd support for posix.listdir()") 871 def test_listdir_fd(self): 872 f = posix.open(posix.getcwd(), posix.O_RDONLY) 873 self.addCleanup(posix.close, f) 874 self.assertEqual( 875 sorted(posix.listdir('.')), 876 sorted(posix.listdir(f)) 877 ) 878 # Check that the fd offset was reset (issue #13739) 879 self.assertEqual( 880 sorted(posix.listdir('.')), 881 sorted(posix.listdir(f)) 882 ) 883 884 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') 885 def test_access(self): 886 self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK)) 887 888 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') 889 def test_umask(self): 890 old_mask = posix.umask(0) 891 self.assertIsInstance(old_mask, int) 892 posix.umask(old_mask) 893 894 @unittest.skipUnless(hasattr(posix, 'strerror'), 895 'test needs posix.strerror()') 896 def test_strerror(self): 897 self.assertTrue(posix.strerror(0)) 898 899 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') 900 def test_pipe(self): 901 reader, writer = posix.pipe() 902 os.close(reader) 903 os.close(writer) 904 905 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 906 @support.requires_linux_version(2, 6, 27) 907 def test_pipe2(self): 908 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') 909 self.assertRaises(TypeError, os.pipe2, 0, 0) 910 911 # try calling with flags = 0, like os.pipe() 912 r, w = os.pipe2(0) 913 os.close(r) 914 os.close(w) 915 916 # test flags 917 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) 918 self.addCleanup(os.close, r) 919 self.addCleanup(os.close, w) 920 self.assertFalse(os.get_inheritable(r)) 921 self.assertFalse(os.get_inheritable(w)) 922 self.assertFalse(os.get_blocking(r)) 923 self.assertFalse(os.get_blocking(w)) 924 # try reading from an empty pipe: this should fail, not block 925 self.assertRaises(OSError, os.read, r, 1) 926 # try a write big enough to fill-up the pipe: this should either 927 # fail or perform a partial write, not block 928 try: 929 os.write(w, b'x' * support.PIPE_MAX_SIZE) 930 except OSError: 931 pass 932 933 @support.cpython_only 934 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 935 @support.requires_linux_version(2, 6, 27) 936 def test_pipe2_c_limits(self): 937 # Issue 15989 938 import _testcapi 939 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) 940 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) 941 942 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') 943 def test_utime(self): 944 now = time.time() 945 posix.utime(os_helper.TESTFN, None) 946 self.assertRaises(TypeError, posix.utime, 947 os_helper.TESTFN, (None, None)) 948 self.assertRaises(TypeError, posix.utime, 949 os_helper.TESTFN, (now, None)) 950 self.assertRaises(TypeError, posix.utime, 951 os_helper.TESTFN, (None, now)) 952 posix.utime(os_helper.TESTFN, (int(now), int(now))) 953 posix.utime(os_helper.TESTFN, (now, now)) 954 955 def check_chmod(self, chmod_func, target, **kwargs): 956 closefd = not isinstance(target, int) 957 mode = os.stat(target).st_mode 958 try: 959 new_mode = mode & ~(stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR) 960 chmod_func(target, new_mode, **kwargs) 961 self.assertEqual(os.stat(target).st_mode, new_mode) 962 if stat.S_ISREG(mode): 963 try: 964 with open(target, 'wb+', closefd=closefd): 965 pass 966 except PermissionError: 967 pass 968 new_mode = mode | (stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR) 969 chmod_func(target, new_mode, **kwargs) 970 self.assertEqual(os.stat(target).st_mode, new_mode) 971 if stat.S_ISREG(mode): 972 with open(target, 'wb+', closefd=closefd): 973 pass 974 finally: 975 chmod_func(target, mode) 976 977 @os_helper.skip_unless_working_chmod 978 def test_chmod_file(self): 979 self.check_chmod(posix.chmod, os_helper.TESTFN) 980 981 def tempdir(self): 982 target = os_helper.TESTFN + 'd' 983 posix.mkdir(target) 984 self.addCleanup(posix.rmdir, target) 985 return target 986 987 @os_helper.skip_unless_working_chmod 988 def test_chmod_dir(self): 989 target = self.tempdir() 990 self.check_chmod(posix.chmod, target) 991 992 @os_helper.skip_unless_working_chmod 993 def test_fchmod_file(self): 994 with open(os_helper.TESTFN, 'wb+') as f: 995 self.check_chmod(posix.fchmod, f.fileno()) 996 self.check_chmod(posix.chmod, f.fileno()) 997 998 @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()') 999 def test_lchmod_file(self): 1000 self.check_chmod(posix.lchmod, os_helper.TESTFN) 1001 self.check_chmod(posix.chmod, os_helper.TESTFN, follow_symlinks=False) 1002 1003 @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()') 1004 def test_lchmod_dir(self): 1005 target = self.tempdir() 1006 self.check_chmod(posix.lchmod, target) 1007 self.check_chmod(posix.chmod, target, follow_symlinks=False) 1008 1009 def check_chmod_link(self, chmod_func, target, link, **kwargs): 1010 target_mode = os.stat(target).st_mode 1011 link_mode = os.lstat(link).st_mode 1012 try: 1013 new_mode = target_mode & ~(stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR) 1014 chmod_func(link, new_mode, **kwargs) 1015 self.assertEqual(os.stat(target).st_mode, new_mode) 1016 self.assertEqual(os.lstat(link).st_mode, link_mode) 1017 new_mode = target_mode | (stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR) 1018 chmod_func(link, new_mode, **kwargs) 1019 self.assertEqual(os.stat(target).st_mode, new_mode) 1020 self.assertEqual(os.lstat(link).st_mode, link_mode) 1021 finally: 1022 posix.chmod(target, target_mode) 1023 1024 def check_lchmod_link(self, chmod_func, target, link, **kwargs): 1025 target_mode = os.stat(target).st_mode 1026 link_mode = os.lstat(link).st_mode 1027 new_mode = link_mode & ~(stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR) 1028 chmod_func(link, new_mode, **kwargs) 1029 self.assertEqual(os.stat(target).st_mode, target_mode) 1030 self.assertEqual(os.lstat(link).st_mode, new_mode) 1031 new_mode = link_mode | (stat.S_IWOTH | stat.S_IWGRP | stat.S_IWUSR) 1032 chmod_func(link, new_mode, **kwargs) 1033 self.assertEqual(os.stat(target).st_mode, target_mode) 1034 self.assertEqual(os.lstat(link).st_mode, new_mode) 1035 1036 @os_helper.skip_unless_symlink 1037 def test_chmod_file_symlink(self): 1038 target = os_helper.TESTFN 1039 link = os_helper.TESTFN + '-link' 1040 os.symlink(target, link) 1041 self.addCleanup(posix.unlink, link) 1042 if os.name == 'nt': 1043 self.check_lchmod_link(posix.chmod, target, link) 1044 else: 1045 self.check_chmod_link(posix.chmod, target, link) 1046 self.check_chmod_link(posix.chmod, target, link, follow_symlinks=True) 1047 1048 @os_helper.skip_unless_symlink 1049 def test_chmod_dir_symlink(self): 1050 target = self.tempdir() 1051 link = os_helper.TESTFN + '-link' 1052 os.symlink(target, link, target_is_directory=True) 1053 self.addCleanup(posix.unlink, link) 1054 if os.name == 'nt': 1055 self.check_lchmod_link(posix.chmod, target, link) 1056 else: 1057 self.check_chmod_link(posix.chmod, target, link) 1058 self.check_chmod_link(posix.chmod, target, link, follow_symlinks=True) 1059 1060 @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()') 1061 @os_helper.skip_unless_symlink 1062 def test_lchmod_file_symlink(self): 1063 target = os_helper.TESTFN 1064 link = os_helper.TESTFN + '-link' 1065 os.symlink(target, link) 1066 self.addCleanup(posix.unlink, link) 1067 self.check_lchmod_link(posix.chmod, target, link, follow_symlinks=False) 1068 self.check_lchmod_link(posix.lchmod, target, link) 1069 1070 @unittest.skipUnless(hasattr(posix, 'lchmod'), 'test needs os.lchmod()') 1071 @os_helper.skip_unless_symlink 1072 def test_lchmod_dir_symlink(self): 1073 target = self.tempdir() 1074 link = os_helper.TESTFN + '-link' 1075 os.symlink(target, link) 1076 self.addCleanup(posix.unlink, link) 1077 self.check_lchmod_link(posix.chmod, target, link, follow_symlinks=False) 1078 self.check_lchmod_link(posix.lchmod, target, link) 1079 1080 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): 1081 st = os.stat(target_file) 1082 self.assertTrue(hasattr(st, 'st_flags')) 1083 1084 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 1085 flags = st.st_flags | stat.UF_IMMUTABLE 1086 try: 1087 chflags_func(target_file, flags, **kwargs) 1088 except OSError as err: 1089 if err.errno != errno.EOPNOTSUPP: 1090 raise 1091 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 1092 self.skipTest(msg) 1093 1094 try: 1095 new_st = os.stat(target_file) 1096 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 1097 try: 1098 fd = open(target_file, 'w+') 1099 except OSError as e: 1100 self.assertEqual(e.errno, errno.EPERM) 1101 finally: 1102 posix.chflags(target_file, st.st_flags) 1103 1104 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 1105 def test_chflags(self): 1106 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN) 1107 1108 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 1109 def test_lchflags_regular_file(self): 1110 self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN) 1111 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN, 1112 follow_symlinks=False) 1113 1114 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 1115 def test_lchflags_symlink(self): 1116 testfn_st = os.stat(os_helper.TESTFN) 1117 1118 self.assertTrue(hasattr(testfn_st, 'st_flags')) 1119 1120 self.addCleanup(os_helper.unlink, _DUMMY_SYMLINK) 1121 os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK) 1122 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 1123 1124 def chflags_nofollow(path, flags): 1125 return posix.chflags(path, flags, follow_symlinks=False) 1126 1127 for fn in (posix.lchflags, chflags_nofollow): 1128 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 1129 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE 1130 try: 1131 fn(_DUMMY_SYMLINK, flags) 1132 except OSError as err: 1133 if err.errno != errno.EOPNOTSUPP: 1134 raise 1135 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 1136 self.skipTest(msg) 1137 try: 1138 new_testfn_st = os.stat(os_helper.TESTFN) 1139 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 1140 1141 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 1142 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 1143 new_dummy_symlink_st.st_flags) 1144 finally: 1145 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 1146 1147 def test_environ(self): 1148 if os.name == "nt": 1149 item_type = str 1150 else: 1151 item_type = bytes 1152 for k, v in posix.environ.items(): 1153 self.assertEqual(type(k), item_type) 1154 self.assertEqual(type(v), item_type) 1155 1156 def test_putenv(self): 1157 with self.assertRaises(ValueError): 1158 os.putenv('FRUIT\0VEGETABLE', 'cabbage') 1159 with self.assertRaises(ValueError): 1160 os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage') 1161 with self.assertRaises(ValueError): 1162 os.putenv('FRUIT=ORANGE', 'lemon') 1163 if os.name == 'posix': 1164 with self.assertRaises(ValueError): 1165 os.putenv(b'FRUIT\0VEGETABLE', b'cabbage') 1166 with self.assertRaises(ValueError): 1167 os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage') 1168 with self.assertRaises(ValueError): 1169 os.putenv(b'FRUIT=ORANGE', b'lemon') 1170 1171 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') 1172 def test_getcwd_long_pathnames(self): 1173 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 1174 curdir = os.getcwd() 1175 base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd' 1176 1177 try: 1178 os.mkdir(base_path) 1179 os.chdir(base_path) 1180 except: 1181 # Just returning nothing instead of the SkipTest exception, because 1182 # the test results in Error in that case. Is that ok? 1183 # raise unittest.SkipTest("cannot create directory for testing") 1184 return 1185 1186 def _create_and_do_getcwd(dirname, current_path_length = 0): 1187 try: 1188 os.mkdir(dirname) 1189 except: 1190 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test") 1191 1192 os.chdir(dirname) 1193 try: 1194 os.getcwd() 1195 if current_path_length < 1027: 1196 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 1197 finally: 1198 os.chdir('..') 1199 os.rmdir(dirname) 1200 1201 _create_and_do_getcwd(dirname) 1202 1203 finally: 1204 os.chdir(curdir) 1205 os_helper.rmtree(base_path) 1206 1207 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") 1208 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 1209 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") 1210 def test_getgrouplist(self): 1211 user = pwd.getpwuid(os.getuid())[0] 1212 group = pwd.getpwuid(os.getuid())[3] 1213 self.assertIn(group, posix.getgrouplist(user, group)) 1214 1215 1216 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 1217 @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()") 1218 @support.requires_subprocess() 1219 def test_getgroups(self): 1220 with os.popen('id -G 2>/dev/null') as idg: 1221 groups = idg.read().strip() 1222 ret = idg.close() 1223 1224 try: 1225 idg_groups = set(int(g) for g in groups.split()) 1226 except ValueError: 1227 idg_groups = set() 1228 if ret is not None or not idg_groups: 1229 raise unittest.SkipTest("need working 'id -G'") 1230 1231 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 1232 if sys.platform == 'darwin': 1233 import sysconfig 1234 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3' 1235 if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6): 1236 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 1237 1238 # 'id -G' and 'os.getgroups()' should return the same 1239 # groups, ignoring order, duplicates, and the effective gid. 1240 # #10822/#26944 - It is implementation defined whether 1241 # posix.getgroups() includes the effective gid. 1242 symdiff = idg_groups.symmetric_difference(posix.getgroups()) 1243 self.assertTrue(not symdiff or symdiff == {posix.getegid()}) 1244 1245 @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal') 1246 @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result") 1247 def test_cld_xxxx_constants(self): 1248 os.CLD_EXITED 1249 os.CLD_KILLED 1250 os.CLD_DUMPED 1251 os.CLD_TRAPPED 1252 os.CLD_STOPPED 1253 os.CLD_CONTINUED 1254 1255 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), 1256 "don't have scheduling support") 1257 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), 1258 "don't have sched affinity support") 1259 1260 @requires_sched_h 1261 def test_sched_yield(self): 1262 # This has no error conditions (at least on Linux). 1263 posix.sched_yield() 1264 1265 @requires_sched_h 1266 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), 1267 "requires sched_get_priority_max()") 1268 def test_sched_priority(self): 1269 # Round-robin usually has interesting priorities. 1270 pol = posix.SCHED_RR 1271 lo = posix.sched_get_priority_min(pol) 1272 hi = posix.sched_get_priority_max(pol) 1273 self.assertIsInstance(lo, int) 1274 self.assertIsInstance(hi, int) 1275 self.assertGreaterEqual(hi, lo) 1276 # Apple platforms return 15 without checking the argument. 1277 if not is_apple: 1278 self.assertRaises(OSError, posix.sched_get_priority_min, -23) 1279 self.assertRaises(OSError, posix.sched_get_priority_max, -23) 1280 1281 @requires_sched 1282 def test_get_and_set_scheduler_and_param(self): 1283 possible_schedulers = [sched for name, sched in posix.__dict__.items() 1284 if name.startswith("SCHED_")] 1285 mine = posix.sched_getscheduler(0) 1286 self.assertIn(mine, possible_schedulers) 1287 try: 1288 parent = posix.sched_getscheduler(os.getppid()) 1289 except PermissionError: 1290 # POSIX specifies EPERM, but Android returns EACCES. Both errno 1291 # values are mapped to PermissionError. 1292 pass 1293 else: 1294 self.assertIn(parent, possible_schedulers) 1295 self.assertRaises(OSError, posix.sched_getscheduler, -1) 1296 self.assertRaises(OSError, posix.sched_getparam, -1) 1297 param = posix.sched_getparam(0) 1298 self.assertIsInstance(param.sched_priority, int) 1299 1300 # POSIX states that calling sched_setparam() or sched_setscheduler() on 1301 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR 1302 # is implementation-defined: NetBSD and FreeBSD can return EINVAL. 1303 if not sys.platform.startswith(('freebsd', 'netbsd')): 1304 try: 1305 posix.sched_setscheduler(0, mine, param) 1306 posix.sched_setparam(0, param) 1307 except PermissionError: 1308 pass 1309 self.assertRaises(OSError, posix.sched_setparam, -1, param) 1310 1311 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) 1312 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) 1313 self.assertRaises(TypeError, posix.sched_setparam, 0, 43) 1314 param = posix.sched_param(None) 1315 self.assertRaises(TypeError, posix.sched_setparam, 0, param) 1316 large = 214748364700 1317 param = posix.sched_param(large) 1318 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1319 param = posix.sched_param(sched_priority=-large) 1320 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1321 1322 @requires_sched 1323 def test_sched_param(self): 1324 param = posix.sched_param(1) 1325 for proto in range(pickle.HIGHEST_PROTOCOL+1): 1326 newparam = pickle.loads(pickle.dumps(param, proto)) 1327 self.assertEqual(newparam, param) 1328 newparam = copy.copy(param) 1329 self.assertIsNot(newparam, param) 1330 self.assertEqual(newparam, param) 1331 newparam = copy.deepcopy(param) 1332 self.assertIsNot(newparam, param) 1333 self.assertEqual(newparam, param) 1334 newparam = copy.replace(param) 1335 self.assertIsNot(newparam, param) 1336 self.assertEqual(newparam, param) 1337 newparam = copy.replace(param, sched_priority=0) 1338 self.assertNotEqual(newparam, param) 1339 self.assertEqual(newparam.sched_priority, 0) 1340 1341 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") 1342 def test_sched_rr_get_interval(self): 1343 try: 1344 interval = posix.sched_rr_get_interval(0) 1345 except OSError as e: 1346 # This likely means that sched_rr_get_interval is only valid for 1347 # processes with the SCHED_RR scheduler in effect. 1348 if e.errno != errno.EINVAL: 1349 raise 1350 self.skipTest("only works on SCHED_RR processes") 1351 self.assertIsInstance(interval, float) 1352 # Reasonable constraints, I think. 1353 self.assertGreaterEqual(interval, 0.) 1354 self.assertLess(interval, 1.) 1355 1356 @requires_sched_affinity 1357 def test_sched_getaffinity(self): 1358 mask = posix.sched_getaffinity(0) 1359 self.assertIsInstance(mask, set) 1360 self.assertGreaterEqual(len(mask), 1) 1361 if not sys.platform.startswith("freebsd"): 1362 # bpo-47205: does not raise OSError on FreeBSD 1363 self.assertRaises(OSError, posix.sched_getaffinity, -1) 1364 for cpu in mask: 1365 self.assertIsInstance(cpu, int) 1366 self.assertGreaterEqual(cpu, 0) 1367 self.assertLess(cpu, 1 << 32) 1368 1369 @requires_sched_affinity 1370 def test_sched_setaffinity(self): 1371 mask = posix.sched_getaffinity(0) 1372 self.addCleanup(posix.sched_setaffinity, 0, list(mask)) 1373 1374 if len(mask) > 1: 1375 # Empty masks are forbidden 1376 mask.pop() 1377 posix.sched_setaffinity(0, mask) 1378 self.assertEqual(posix.sched_getaffinity(0), mask) 1379 1380 try: 1381 posix.sched_setaffinity(0, []) 1382 # gh-117061: On RHEL9, sched_setaffinity(0, []) does not fail 1383 except OSError: 1384 # sched_setaffinity() manual page documents EINVAL error 1385 # when the mask is empty. 1386 pass 1387 1388 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) 1389 self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X")) 1390 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) 1391 if not sys.platform.startswith("freebsd"): 1392 # bpo-47205: does not raise OSError on FreeBSD 1393 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) 1394 1395 @unittest.skipIf(support.is_wasi, "No dynamic linking on WASI") 1396 @unittest.skipUnless(os.name == 'posix', "POSIX-only test") 1397 def test_rtld_constants(self): 1398 # check presence of major RTLD_* constants 1399 posix.RTLD_LAZY 1400 posix.RTLD_NOW 1401 posix.RTLD_GLOBAL 1402 posix.RTLD_LOCAL 1403 1404 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), 1405 "test needs an OS that reports file holes") 1406 def test_fs_holes(self): 1407 # Even if the filesystem doesn't report holes, 1408 # if the OS supports it the SEEK_* constants 1409 # will be defined and will have a consistent 1410 # behaviour: 1411 # os.SEEK_DATA = current position 1412 # os.SEEK_HOLE = end of file position 1413 with open(os_helper.TESTFN, 'r+b') as fp: 1414 fp.write(b"hello") 1415 fp.flush() 1416 size = fp.tell() 1417 fno = fp.fileno() 1418 try : 1419 for i in range(size): 1420 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) 1421 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) 1422 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) 1423 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) 1424 except OSError : 1425 # Some OSs claim to support SEEK_HOLE/SEEK_DATA 1426 # but it is not true. 1427 # For instance: 1428 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html 1429 raise unittest.SkipTest("OSError raised!") 1430 1431 def test_path_error2(self): 1432 """ 1433 Test functions that call path_error2(), providing two filenames in their exceptions. 1434 """ 1435 for name in ("rename", "replace", "link"): 1436 function = getattr(os, name, None) 1437 if function is None: 1438 continue 1439 1440 for dst in ("noodly2", os_helper.TESTFN): 1441 try: 1442 function('doesnotexistfilename', dst) 1443 except OSError as e: 1444 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) 1445 break 1446 else: 1447 self.fail("No valid path_error2() test for os." + name) 1448 1449 def test_path_with_null_character(self): 1450 fn = os_helper.TESTFN 1451 fn_with_NUL = fn + '\0' 1452 self.addCleanup(os_helper.unlink, fn) 1453 os_helper.unlink(fn) 1454 fd = None 1455 try: 1456 with self.assertRaises(ValueError): 1457 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1458 finally: 1459 if fd is not None: 1460 os.close(fd) 1461 self.assertFalse(os.path.exists(fn)) 1462 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1463 self.assertFalse(os.path.exists(fn)) 1464 open(fn, 'wb').close() 1465 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1466 1467 def test_path_with_null_byte(self): 1468 fn = os.fsencode(os_helper.TESTFN) 1469 fn_with_NUL = fn + b'\0' 1470 self.addCleanup(os_helper.unlink, fn) 1471 os_helper.unlink(fn) 1472 fd = None 1473 try: 1474 with self.assertRaises(ValueError): 1475 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1476 finally: 1477 if fd is not None: 1478 os.close(fd) 1479 self.assertFalse(os.path.exists(fn)) 1480 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1481 self.assertFalse(os.path.exists(fn)) 1482 open(fn, 'wb').close() 1483 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1484 1485 @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable") 1486 def test_pidfd_open(self): 1487 with self.assertRaises(OSError) as cm: 1488 os.pidfd_open(-1) 1489 if cm.exception.errno == errno.ENOSYS: 1490 self.skipTest("system does not support pidfd_open") 1491 if isinstance(cm.exception, PermissionError): 1492 self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}") 1493 self.assertEqual(cm.exception.errno, errno.EINVAL) 1494 os.close(os.pidfd_open(os.getpid(), 0)) 1495 1496 1497# tests for the posix *at functions follow 1498class TestPosixDirFd(unittest.TestCase): 1499 count = 0 1500 1501 @contextmanager 1502 def prepare(self): 1503 TestPosixDirFd.count += 1 1504 name = f'{os_helper.TESTFN}_{self.count}' 1505 base_dir = f'{os_helper.TESTFN}_{self.count}base' 1506 posix.mkdir(base_dir) 1507 self.addCleanup(posix.rmdir, base_dir) 1508 fullname = os.path.join(base_dir, name) 1509 assert not os.path.exists(fullname) 1510 with os_helper.open_dir_fd(base_dir) as dir_fd: 1511 yield (dir_fd, name, fullname) 1512 1513 @contextmanager 1514 def prepare_file(self): 1515 with self.prepare() as (dir_fd, name, fullname): 1516 os_helper.create_empty_file(fullname) 1517 self.addCleanup(posix.unlink, fullname) 1518 yield (dir_fd, name, fullname) 1519 1520 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") 1521 def test_access_dir_fd(self): 1522 with self.prepare_file() as (dir_fd, name, fullname): 1523 self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd)) 1524 1525 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") 1526 def test_chmod_dir_fd(self): 1527 with self.prepare_file() as (dir_fd, name, fullname): 1528 posix.chmod(fullname, stat.S_IRUSR) 1529 posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) 1530 s = posix.stat(fullname) 1531 self.assertEqual(s.st_mode & stat.S_IRWXU, 1532 stat.S_IRUSR | stat.S_IWUSR) 1533 1534 @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd), 1535 "test needs dir_fd support in os.chown()") 1536 @unittest.skipIf(support.is_emscripten, "getgid() is a stub") 1537 def test_chown_dir_fd(self): 1538 with self.prepare_file() as (dir_fd, name, fullname): 1539 posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd) 1540 1541 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") 1542 def test_stat_dir_fd(self): 1543 with self.prepare() as (dir_fd, name, fullname): 1544 with open(fullname, 'w') as outfile: 1545 outfile.write("testline\n") 1546 self.addCleanup(posix.unlink, fullname) 1547 1548 s1 = posix.stat(fullname) 1549 s2 = posix.stat(name, dir_fd=dir_fd) 1550 self.assertEqual(s1, s2) 1551 s2 = posix.stat(fullname, dir_fd=None) 1552 self.assertEqual(s1, s2) 1553 1554 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1555 posix.stat, name, dir_fd=posix.getcwd()) 1556 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1557 posix.stat, name, dir_fd=float(dir_fd)) 1558 self.assertRaises(OverflowError, 1559 posix.stat, name, dir_fd=10**20) 1560 1561 for fd in False, True: 1562 with self.assertWarnsRegex(RuntimeWarning, 1563 'bool is used as a file descriptor') as cm: 1564 with self.assertRaises(OSError): 1565 posix.stat('nonexisting', dir_fd=fd) 1566 self.assertEqual(cm.filename, __file__) 1567 1568 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") 1569 def test_utime_dir_fd(self): 1570 with self.prepare_file() as (dir_fd, name, fullname): 1571 now = time.time() 1572 posix.utime(name, None, dir_fd=dir_fd) 1573 posix.utime(name, dir_fd=dir_fd) 1574 self.assertRaises(TypeError, posix.utime, name, 1575 now, dir_fd=dir_fd) 1576 self.assertRaises(TypeError, posix.utime, name, 1577 (None, None), dir_fd=dir_fd) 1578 self.assertRaises(TypeError, posix.utime, name, 1579 (now, None), dir_fd=dir_fd) 1580 self.assertRaises(TypeError, posix.utime, name, 1581 (None, now), dir_fd=dir_fd) 1582 self.assertRaises(TypeError, posix.utime, name, 1583 (now, "x"), dir_fd=dir_fd) 1584 posix.utime(name, (int(now), int(now)), dir_fd=dir_fd) 1585 posix.utime(name, (now, now), dir_fd=dir_fd) 1586 posix.utime(name, 1587 (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd) 1588 posix.utime(name, dir_fd=dir_fd, 1589 times=(int(now), int((now - int(now)) * 1e9))) 1590 1591 # try dir_fd and follow_symlinks together 1592 if os.utime in os.supports_follow_symlinks: 1593 try: 1594 posix.utime(name, follow_symlinks=False, dir_fd=dir_fd) 1595 except ValueError: 1596 # whoops! using both together not supported on this platform. 1597 pass 1598 1599 @unittest.skipIf( 1600 support.is_wasi, 1601 "WASI: symlink following on path_link is not supported" 1602 ) 1603 @unittest.skipUnless( 1604 hasattr(os, "link") and os.link in os.supports_dir_fd, 1605 "test needs dir_fd support in os.link()" 1606 ) 1607 def test_link_dir_fd(self): 1608 with self.prepare_file() as (dir_fd, name, fullname), \ 1609 self.prepare() as (dir_fd2, linkname, fulllinkname): 1610 try: 1611 posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) 1612 except PermissionError as e: 1613 self.skipTest('posix.link(): %s' % e) 1614 self.addCleanup(posix.unlink, fulllinkname) 1615 # should have same inodes 1616 self.assertEqual(posix.stat(fullname)[1], 1617 posix.stat(fulllinkname)[1]) 1618 1619 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") 1620 def test_mkdir_dir_fd(self): 1621 with self.prepare() as (dir_fd, name, fullname): 1622 posix.mkdir(name, dir_fd=dir_fd) 1623 self.addCleanup(posix.rmdir, fullname) 1624 posix.stat(fullname) # should not raise exception 1625 1626 @unittest.skipUnless(hasattr(os, 'mknod') 1627 and (os.mknod in os.supports_dir_fd) 1628 and hasattr(stat, 'S_IFIFO'), 1629 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") 1630 def test_mknod_dir_fd(self): 1631 # Test using mknodat() to create a FIFO (the only use specified 1632 # by POSIX). 1633 with self.prepare() as (dir_fd, name, fullname): 1634 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 1635 try: 1636 posix.mknod(name, mode, 0, dir_fd=dir_fd) 1637 except OSError as e: 1638 # Some old systems don't allow unprivileged users to use 1639 # mknod(), or only support creating device nodes. 1640 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 1641 else: 1642 self.addCleanup(posix.unlink, fullname) 1643 self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) 1644 1645 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") 1646 def test_open_dir_fd(self): 1647 with self.prepare() as (dir_fd, name, fullname): 1648 with open(fullname, 'wb') as outfile: 1649 outfile.write(b"testline\n") 1650 self.addCleanup(posix.unlink, fullname) 1651 fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd) 1652 try: 1653 res = posix.read(fd, 9) 1654 self.assertEqual(b"testline\n", res) 1655 finally: 1656 posix.close(fd) 1657 1658 @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd), 1659 "test needs dir_fd support in os.readlink()") 1660 def test_readlink_dir_fd(self): 1661 with self.prepare() as (dir_fd, name, fullname): 1662 os.symlink('symlink', fullname) 1663 self.addCleanup(posix.unlink, fullname) 1664 self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink') 1665 1666 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") 1667 def test_rename_dir_fd(self): 1668 with self.prepare_file() as (dir_fd, name, fullname), \ 1669 self.prepare() as (dir_fd2, name2, fullname2): 1670 posix.rename(name, name2, 1671 src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) 1672 posix.stat(fullname2) # should not raise exception 1673 posix.rename(fullname2, fullname) 1674 1675 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") 1676 def test_symlink_dir_fd(self): 1677 with self.prepare() as (dir_fd, name, fullname): 1678 posix.symlink('symlink', name, dir_fd=dir_fd) 1679 self.addCleanup(posix.unlink, fullname) 1680 self.assertEqual(posix.readlink(fullname), 'symlink') 1681 1682 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") 1683 def test_unlink_dir_fd(self): 1684 with self.prepare() as (dir_fd, name, fullname): 1685 os_helper.create_empty_file(fullname) 1686 posix.stat(fullname) # should not raise exception 1687 try: 1688 posix.unlink(name, dir_fd=dir_fd) 1689 self.assertRaises(OSError, posix.stat, fullname) 1690 except: 1691 self.addCleanup(posix.unlink, fullname) 1692 raise 1693 1694 @unittest.skipUnless(hasattr(os, 'mkfifo') and os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") 1695 def test_mkfifo_dir_fd(self): 1696 with self.prepare() as (dir_fd, name, fullname): 1697 try: 1698 posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) 1699 except PermissionError as e: 1700 self.skipTest('posix.mkfifo(): %s' % e) 1701 self.addCleanup(posix.unlink, fullname) 1702 self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) 1703 1704 1705class PosixGroupsTester(unittest.TestCase): 1706 1707 def setUp(self): 1708 if posix.getuid() != 0: 1709 raise unittest.SkipTest("not enough privileges") 1710 if not hasattr(posix, 'getgroups'): 1711 raise unittest.SkipTest("need posix.getgroups") 1712 if sys.platform == 'darwin': 1713 raise unittest.SkipTest("getgroups(2) is broken on OSX") 1714 self.saved_groups = posix.getgroups() 1715 1716 def tearDown(self): 1717 if hasattr(posix, 'setgroups'): 1718 posix.setgroups(self.saved_groups) 1719 elif hasattr(posix, 'initgroups'): 1720 name = pwd.getpwuid(posix.getuid()).pw_name 1721 posix.initgroups(name, self.saved_groups[0]) 1722 1723 @unittest.skipUnless(hasattr(posix, 'initgroups'), 1724 "test needs posix.initgroups()") 1725 def test_initgroups(self): 1726 # find missing group 1727 1728 g = max(self.saved_groups or [0]) + 1 1729 name = pwd.getpwuid(posix.getuid()).pw_name 1730 posix.initgroups(name, g) 1731 self.assertIn(g, posix.getgroups()) 1732 1733 @unittest.skipUnless(hasattr(posix, 'setgroups'), 1734 "test needs posix.setgroups()") 1735 def test_setgroups(self): 1736 for groups in [[0], list(range(16))]: 1737 posix.setgroups(groups) 1738 self.assertListEqual(groups, posix.getgroups()) 1739 1740 1741class _PosixSpawnMixin: 1742 # Program which does nothing and exits with status 0 (success) 1743 NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass') 1744 spawn_func = None 1745 1746 def python_args(self, *args): 1747 # Disable site module to avoid side effects. For example, 1748 # on Fedora 28, if the HOME environment variable is not set, 1749 # site._getuserbase() calls pwd.getpwuid() which opens 1750 # /var/lib/sss/mc/passwd but then leaves the file open which makes 1751 # test_close_file() to fail. 1752 return (sys.executable, '-I', '-S', *args) 1753 1754 def test_returns_pid(self): 1755 pidfile = os_helper.TESTFN 1756 self.addCleanup(os_helper.unlink, pidfile) 1757 script = f"""if 1: 1758 import os 1759 with open({pidfile!r}, "w") as pidfile: 1760 pidfile.write(str(os.getpid())) 1761 """ 1762 args = self.python_args('-c', script) 1763 pid = self.spawn_func(args[0], args, os.environ) 1764 support.wait_process(pid, exitcode=0) 1765 with open(pidfile, encoding="utf-8") as f: 1766 self.assertEqual(f.read(), str(pid)) 1767 1768 def test_no_such_executable(self): 1769 no_such_executable = 'no_such_executable' 1770 try: 1771 pid = self.spawn_func(no_such_executable, 1772 [no_such_executable], 1773 os.environ) 1774 # bpo-35794: PermissionError can be raised if there are 1775 # directories in the $PATH that are not accessible. 1776 except (FileNotFoundError, PermissionError) as exc: 1777 self.assertEqual(exc.filename, no_such_executable) 1778 else: 1779 pid2, status = os.waitpid(pid, 0) 1780 self.assertEqual(pid2, pid) 1781 self.assertNotEqual(status, 0) 1782 1783 def test_specify_environment(self): 1784 envfile = os_helper.TESTFN 1785 self.addCleanup(os_helper.unlink, envfile) 1786 script = f"""if 1: 1787 import os 1788 with open({envfile!r}, "w", encoding="utf-8") as envfile: 1789 envfile.write(os.environ['foo']) 1790 """ 1791 args = self.python_args('-c', script) 1792 pid = self.spawn_func(args[0], args, 1793 {**os.environ, 'foo': 'bar'}) 1794 support.wait_process(pid, exitcode=0) 1795 with open(envfile, encoding="utf-8") as f: 1796 self.assertEqual(f.read(), 'bar') 1797 1798 def test_none_file_actions(self): 1799 pid = self.spawn_func( 1800 self.NOOP_PROGRAM[0], 1801 self.NOOP_PROGRAM, 1802 os.environ, 1803 file_actions=None 1804 ) 1805 support.wait_process(pid, exitcode=0) 1806 1807 def test_empty_file_actions(self): 1808 pid = self.spawn_func( 1809 self.NOOP_PROGRAM[0], 1810 self.NOOP_PROGRAM, 1811 os.environ, 1812 file_actions=[] 1813 ) 1814 support.wait_process(pid, exitcode=0) 1815 1816 def test_resetids_explicit_default(self): 1817 pid = self.spawn_func( 1818 sys.executable, 1819 [sys.executable, '-c', 'pass'], 1820 os.environ, 1821 resetids=False 1822 ) 1823 support.wait_process(pid, exitcode=0) 1824 1825 def test_resetids(self): 1826 pid = self.spawn_func( 1827 sys.executable, 1828 [sys.executable, '-c', 'pass'], 1829 os.environ, 1830 resetids=True 1831 ) 1832 support.wait_process(pid, exitcode=0) 1833 1834 def test_setpgroup(self): 1835 pid = self.spawn_func( 1836 sys.executable, 1837 [sys.executable, '-c', 'pass'], 1838 os.environ, 1839 setpgroup=os.getpgrp() 1840 ) 1841 support.wait_process(pid, exitcode=0) 1842 1843 def test_setpgroup_wrong_type(self): 1844 with self.assertRaises(TypeError): 1845 self.spawn_func(sys.executable, 1846 [sys.executable, "-c", "pass"], 1847 os.environ, setpgroup="023") 1848 1849 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1850 'need signal.pthread_sigmask()') 1851 def test_setsigmask(self): 1852 code = textwrap.dedent("""\ 1853 import signal 1854 signal.raise_signal(signal.SIGUSR1)""") 1855 1856 pid = self.spawn_func( 1857 sys.executable, 1858 [sys.executable, '-c', code], 1859 os.environ, 1860 setsigmask=[signal.SIGUSR1] 1861 ) 1862 support.wait_process(pid, exitcode=0) 1863 1864 def test_setsigmask_wrong_type(self): 1865 with self.assertRaises(TypeError): 1866 self.spawn_func(sys.executable, 1867 [sys.executable, "-c", "pass"], 1868 os.environ, setsigmask=34) 1869 with self.assertRaises(TypeError): 1870 self.spawn_func(sys.executable, 1871 [sys.executable, "-c", "pass"], 1872 os.environ, setsigmask=["j"]) 1873 with self.assertRaises(ValueError): 1874 self.spawn_func(sys.executable, 1875 [sys.executable, "-c", "pass"], 1876 os.environ, setsigmask=[signal.NSIG, 1877 signal.NSIG+1]) 1878 1879 def test_setsid(self): 1880 rfd, wfd = os.pipe() 1881 self.addCleanup(os.close, rfd) 1882 try: 1883 os.set_inheritable(wfd, True) 1884 1885 code = textwrap.dedent(f""" 1886 import os 1887 fd = {wfd} 1888 sid = os.getsid(0) 1889 os.write(fd, str(sid).encode()) 1890 """) 1891 1892 try: 1893 pid = self.spawn_func(sys.executable, 1894 [sys.executable, "-c", code], 1895 os.environ, setsid=True) 1896 except NotImplementedError as exc: 1897 self.skipTest(f"setsid is not supported: {exc!r}") 1898 except PermissionError as exc: 1899 self.skipTest(f"setsid failed with: {exc!r}") 1900 finally: 1901 os.close(wfd) 1902 1903 support.wait_process(pid, exitcode=0) 1904 1905 output = os.read(rfd, 100) 1906 child_sid = int(output) 1907 parent_sid = os.getsid(os.getpid()) 1908 self.assertNotEqual(parent_sid, child_sid) 1909 1910 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1911 'need signal.pthread_sigmask()') 1912 def test_setsigdef(self): 1913 original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN) 1914 code = textwrap.dedent("""\ 1915 import signal 1916 signal.raise_signal(signal.SIGUSR1)""") 1917 try: 1918 pid = self.spawn_func( 1919 sys.executable, 1920 [sys.executable, '-c', code], 1921 os.environ, 1922 setsigdef=[signal.SIGUSR1] 1923 ) 1924 finally: 1925 signal.signal(signal.SIGUSR1, original_handler) 1926 1927 support.wait_process(pid, exitcode=-signal.SIGUSR1) 1928 1929 def test_setsigdef_wrong_type(self): 1930 with self.assertRaises(TypeError): 1931 self.spawn_func(sys.executable, 1932 [sys.executable, "-c", "pass"], 1933 os.environ, setsigdef=34) 1934 with self.assertRaises(TypeError): 1935 self.spawn_func(sys.executable, 1936 [sys.executable, "-c", "pass"], 1937 os.environ, setsigdef=["j"]) 1938 with self.assertRaises(ValueError): 1939 self.spawn_func(sys.executable, 1940 [sys.executable, "-c", "pass"], 1941 os.environ, setsigdef=[signal.NSIG, signal.NSIG+1]) 1942 1943 @requires_sched 1944 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1945 "bpo-34685: test can fail on BSD") 1946 def test_setscheduler_only_param(self): 1947 policy = os.sched_getscheduler(0) 1948 priority = os.sched_get_priority_min(policy) 1949 code = textwrap.dedent(f"""\ 1950 import os, sys 1951 if os.sched_getscheduler(0) != {policy}: 1952 sys.exit(101) 1953 if os.sched_getparam(0).sched_priority != {priority}: 1954 sys.exit(102)""") 1955 pid = self.spawn_func( 1956 sys.executable, 1957 [sys.executable, '-c', code], 1958 os.environ, 1959 scheduler=(None, os.sched_param(priority)) 1960 ) 1961 support.wait_process(pid, exitcode=0) 1962 1963 @requires_sched 1964 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1965 "bpo-34685: test can fail on BSD") 1966 def test_setscheduler_with_policy(self): 1967 policy = os.sched_getscheduler(0) 1968 priority = os.sched_get_priority_min(policy) 1969 code = textwrap.dedent(f"""\ 1970 import os, sys 1971 if os.sched_getscheduler(0) != {policy}: 1972 sys.exit(101) 1973 if os.sched_getparam(0).sched_priority != {priority}: 1974 sys.exit(102)""") 1975 pid = self.spawn_func( 1976 sys.executable, 1977 [sys.executable, '-c', code], 1978 os.environ, 1979 scheduler=(policy, os.sched_param(priority)) 1980 ) 1981 support.wait_process(pid, exitcode=0) 1982 1983 def test_multiple_file_actions(self): 1984 file_actions = [ 1985 (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0), 1986 (os.POSIX_SPAWN_CLOSE, 0), 1987 (os.POSIX_SPAWN_DUP2, 1, 4), 1988 ] 1989 pid = self.spawn_func(self.NOOP_PROGRAM[0], 1990 self.NOOP_PROGRAM, 1991 os.environ, 1992 file_actions=file_actions) 1993 support.wait_process(pid, exitcode=0) 1994 1995 def test_bad_file_actions(self): 1996 args = self.NOOP_PROGRAM 1997 with self.assertRaises(TypeError): 1998 self.spawn_func(args[0], args, os.environ, 1999 file_actions=[None]) 2000 with self.assertRaises(TypeError): 2001 self.spawn_func(args[0], args, os.environ, 2002 file_actions=[()]) 2003 with self.assertRaises(TypeError): 2004 self.spawn_func(args[0], args, os.environ, 2005 file_actions=[(None,)]) 2006 with self.assertRaises(TypeError): 2007 self.spawn_func(args[0], args, os.environ, 2008 file_actions=[(12345,)]) 2009 with self.assertRaises(TypeError): 2010 self.spawn_func(args[0], args, os.environ, 2011 file_actions=[(os.POSIX_SPAWN_CLOSE,)]) 2012 with self.assertRaises(TypeError): 2013 self.spawn_func(args[0], args, os.environ, 2014 file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)]) 2015 with self.assertRaises(TypeError): 2016 self.spawn_func(args[0], args, os.environ, 2017 file_actions=[(os.POSIX_SPAWN_CLOSE, None)]) 2018 with self.assertRaises(ValueError): 2019 self.spawn_func(args[0], args, os.environ, 2020 file_actions=[(os.POSIX_SPAWN_OPEN, 2021 3, __file__ + '\0', 2022 os.O_RDONLY, 0)]) 2023 2024 def test_open_file(self): 2025 outfile = os_helper.TESTFN 2026 self.addCleanup(os_helper.unlink, outfile) 2027 script = """if 1: 2028 import sys 2029 sys.stdout.write("hello") 2030 """ 2031 file_actions = [ 2032 (os.POSIX_SPAWN_OPEN, 1, outfile, 2033 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 2034 stat.S_IRUSR | stat.S_IWUSR), 2035 ] 2036 args = self.python_args('-c', script) 2037 pid = self.spawn_func(args[0], args, os.environ, 2038 file_actions=file_actions) 2039 2040 support.wait_process(pid, exitcode=0) 2041 with open(outfile, encoding="utf-8") as f: 2042 self.assertEqual(f.read(), 'hello') 2043 2044 def test_close_file(self): 2045 closefile = os_helper.TESTFN 2046 self.addCleanup(os_helper.unlink, closefile) 2047 script = f"""if 1: 2048 import os 2049 try: 2050 os.fstat(0) 2051 except OSError as e: 2052 with open({closefile!r}, 'w', encoding='utf-8') as closefile: 2053 closefile.write('is closed %d' % e.errno) 2054 """ 2055 args = self.python_args('-c', script) 2056 pid = self.spawn_func(args[0], args, os.environ, 2057 file_actions=[(os.POSIX_SPAWN_CLOSE, 0)]) 2058 2059 support.wait_process(pid, exitcode=0) 2060 with open(closefile, encoding="utf-8") as f: 2061 self.assertEqual(f.read(), 'is closed %d' % errno.EBADF) 2062 2063 def test_dup2(self): 2064 dupfile = os_helper.TESTFN 2065 self.addCleanup(os_helper.unlink, dupfile) 2066 script = """if 1: 2067 import sys 2068 sys.stdout.write("hello") 2069 """ 2070 with open(dupfile, "wb") as childfile: 2071 file_actions = [ 2072 (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1), 2073 ] 2074 args = self.python_args('-c', script) 2075 pid = self.spawn_func(args[0], args, os.environ, 2076 file_actions=file_actions) 2077 support.wait_process(pid, exitcode=0) 2078 with open(dupfile, encoding="utf-8") as f: 2079 self.assertEqual(f.read(), 'hello') 2080 2081 2082@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn") 2083@support.requires_subprocess() 2084class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin): 2085 spawn_func = getattr(posix, 'posix_spawn', None) 2086 2087 2088@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp") 2089@support.requires_subprocess() 2090class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin): 2091 spawn_func = getattr(posix, 'posix_spawnp', None) 2092 2093 @os_helper.skip_unless_symlink 2094 def test_posix_spawnp(self): 2095 # Use a symlink to create a program in its own temporary directory 2096 temp_dir = tempfile.mkdtemp() 2097 self.addCleanup(os_helper.rmtree, temp_dir) 2098 2099 program = 'posix_spawnp_test_program.exe' 2100 program_fullpath = os.path.join(temp_dir, program) 2101 os.symlink(sys.executable, program_fullpath) 2102 2103 try: 2104 path = os.pathsep.join((temp_dir, os.environ['PATH'])) 2105 except KeyError: 2106 path = temp_dir # PATH is not set 2107 2108 spawn_args = (program, '-I', '-S', '-c', 'pass') 2109 code = textwrap.dedent(""" 2110 import os 2111 from test import support 2112 2113 args = %a 2114 pid = os.posix_spawnp(args[0], args, os.environ) 2115 2116 support.wait_process(pid, exitcode=0) 2117 """ % (spawn_args,)) 2118 2119 # Use a subprocess to test os.posix_spawnp() with a modified PATH 2120 # environment variable: posix_spawnp() uses the current environment 2121 # to locate the program, not its environment argument. 2122 args = ('-c', code) 2123 assert_python_ok(*args, PATH=path) 2124 2125 2126@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS") 2127class TestPosixWeaklinking(unittest.TestCase): 2128 # These test cases verify that weak linking support on macOS works 2129 # as expected. These cases only test new behaviour introduced by weak linking, 2130 # regular behaviour is tested by the normal test cases. 2131 # 2132 # See the section on Weak Linking in Mac/README.txt for more information. 2133 def setUp(self): 2134 import sysconfig 2135 import platform 2136 2137 config_vars = sysconfig.get_config_vars() 2138 self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] } 2139 self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split(".")) 2140 2141 def _verify_available(self, name): 2142 if name not in self.available: 2143 raise unittest.SkipTest(f"{name} not weak-linked") 2144 2145 def test_pwritev(self): 2146 self._verify_available("HAVE_PWRITEV") 2147 if self.mac_ver >= (10, 16): 2148 self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available") 2149 self.assertTrue(hasattr(os, "preadv"), "os.readv is not available") 2150 2151 else: 2152 self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available") 2153 self.assertFalse(hasattr(os, "preadv"), "os.readv is available") 2154 2155 def test_stat(self): 2156 self._verify_available("HAVE_FSTATAT") 2157 if self.mac_ver >= (10, 10): 2158 self.assertIn("HAVE_FSTATAT", posix._have_functions) 2159 2160 else: 2161 self.assertNotIn("HAVE_FSTATAT", posix._have_functions) 2162 2163 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2164 os.stat("file", dir_fd=0) 2165 2166 def test_ptsname_r(self): 2167 self._verify_available("HAVE_PTSNAME_R") 2168 if self.mac_ver >= (10, 13, 4): 2169 self.assertIn("HAVE_PTSNAME_R", posix._have_functions) 2170 else: 2171 self.assertNotIn("HAVE_PTSNAME_R", posix._have_functions) 2172 2173 def test_access(self): 2174 self._verify_available("HAVE_FACCESSAT") 2175 if self.mac_ver >= (10, 10): 2176 self.assertIn("HAVE_FACCESSAT", posix._have_functions) 2177 2178 else: 2179 self.assertNotIn("HAVE_FACCESSAT", posix._have_functions) 2180 2181 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2182 os.access("file", os.R_OK, dir_fd=0) 2183 2184 with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"): 2185 os.access("file", os.R_OK, follow_symlinks=False) 2186 2187 with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"): 2188 os.access("file", os.R_OK, effective_ids=True) 2189 2190 def test_chmod(self): 2191 self._verify_available("HAVE_FCHMODAT") 2192 if self.mac_ver >= (10, 10): 2193 self.assertIn("HAVE_FCHMODAT", posix._have_functions) 2194 2195 else: 2196 self.assertNotIn("HAVE_FCHMODAT", posix._have_functions) 2197 self.assertIn("HAVE_LCHMOD", posix._have_functions) 2198 2199 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2200 os.chmod("file", 0o644, dir_fd=0) 2201 2202 def test_chown(self): 2203 self._verify_available("HAVE_FCHOWNAT") 2204 if self.mac_ver >= (10, 10): 2205 self.assertIn("HAVE_FCHOWNAT", posix._have_functions) 2206 2207 else: 2208 self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions) 2209 self.assertIn("HAVE_LCHOWN", posix._have_functions) 2210 2211 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2212 os.chown("file", 0, 0, dir_fd=0) 2213 2214 def test_link(self): 2215 self._verify_available("HAVE_LINKAT") 2216 if self.mac_ver >= (10, 10): 2217 self.assertIn("HAVE_LINKAT", posix._have_functions) 2218 2219 else: 2220 self.assertNotIn("HAVE_LINKAT", posix._have_functions) 2221 2222 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 2223 os.link("source", "target", src_dir_fd=0) 2224 2225 with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"): 2226 os.link("source", "target", dst_dir_fd=0) 2227 2228 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 2229 os.link("source", "target", src_dir_fd=0, dst_dir_fd=0) 2230 2231 # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag 2232 with os_helper.temp_dir() as base_path: 2233 link_path = os.path.join(base_path, "link") 2234 target_path = os.path.join(base_path, "target") 2235 source_path = os.path.join(base_path, "source") 2236 2237 with open(source_path, "w") as fp: 2238 fp.write("data") 2239 2240 os.symlink("target", link_path) 2241 2242 # Calling os.link should fail in the link(2) call, and 2243 # should not reject *follow_symlinks* (to match the 2244 # behaviour you'd get when building on a platform without 2245 # linkat) 2246 with self.assertRaises(FileExistsError): 2247 os.link(source_path, link_path, follow_symlinks=True) 2248 2249 with self.assertRaises(FileExistsError): 2250 os.link(source_path, link_path, follow_symlinks=False) 2251 2252 2253 def test_listdir_scandir(self): 2254 self._verify_available("HAVE_FDOPENDIR") 2255 if self.mac_ver >= (10, 10): 2256 self.assertIn("HAVE_FDOPENDIR", posix._have_functions) 2257 2258 else: 2259 self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions) 2260 2261 with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"): 2262 os.listdir(0) 2263 2264 with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"): 2265 os.scandir(0) 2266 2267 def test_mkdir(self): 2268 self._verify_available("HAVE_MKDIRAT") 2269 if self.mac_ver >= (10, 10): 2270 self.assertIn("HAVE_MKDIRAT", posix._have_functions) 2271 2272 else: 2273 self.assertNotIn("HAVE_MKDIRAT", posix._have_functions) 2274 2275 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2276 os.mkdir("dir", dir_fd=0) 2277 2278 def test_mkfifo(self): 2279 self._verify_available("HAVE_MKFIFOAT") 2280 if self.mac_ver >= (13, 0): 2281 self.assertIn("HAVE_MKFIFOAT", posix._have_functions) 2282 2283 else: 2284 self.assertNotIn("HAVE_MKFIFOAT", posix._have_functions) 2285 2286 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2287 os.mkfifo("path", dir_fd=0) 2288 2289 def test_mknod(self): 2290 self._verify_available("HAVE_MKNODAT") 2291 if self.mac_ver >= (13, 0): 2292 self.assertIn("HAVE_MKNODAT", posix._have_functions) 2293 2294 else: 2295 self.assertNotIn("HAVE_MKNODAT", posix._have_functions) 2296 2297 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2298 os.mknod("path", dir_fd=0) 2299 2300 def test_rename_replace(self): 2301 self._verify_available("HAVE_RENAMEAT") 2302 if self.mac_ver >= (10, 10): 2303 self.assertIn("HAVE_RENAMEAT", posix._have_functions) 2304 2305 else: 2306 self.assertNotIn("HAVE_RENAMEAT", posix._have_functions) 2307 2308 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2309 os.rename("a", "b", src_dir_fd=0) 2310 2311 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2312 os.rename("a", "b", dst_dir_fd=0) 2313 2314 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2315 os.replace("a", "b", src_dir_fd=0) 2316 2317 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2318 os.replace("a", "b", dst_dir_fd=0) 2319 2320 def test_unlink_rmdir(self): 2321 self._verify_available("HAVE_UNLINKAT") 2322 if self.mac_ver >= (10, 10): 2323 self.assertIn("HAVE_UNLINKAT", posix._have_functions) 2324 2325 else: 2326 self.assertNotIn("HAVE_UNLINKAT", posix._have_functions) 2327 2328 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2329 os.unlink("path", dir_fd=0) 2330 2331 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2332 os.rmdir("path", dir_fd=0) 2333 2334 def test_open(self): 2335 self._verify_available("HAVE_OPENAT") 2336 if self.mac_ver >= (10, 10): 2337 self.assertIn("HAVE_OPENAT", posix._have_functions) 2338 2339 else: 2340 self.assertNotIn("HAVE_OPENAT", posix._have_functions) 2341 2342 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2343 os.open("path", os.O_RDONLY, dir_fd=0) 2344 2345 def test_readlink(self): 2346 self._verify_available("HAVE_READLINKAT") 2347 if self.mac_ver >= (10, 10): 2348 self.assertIn("HAVE_READLINKAT", posix._have_functions) 2349 2350 else: 2351 self.assertNotIn("HAVE_READLINKAT", posix._have_functions) 2352 2353 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2354 os.readlink("path", dir_fd=0) 2355 2356 def test_symlink(self): 2357 self._verify_available("HAVE_SYMLINKAT") 2358 if self.mac_ver >= (10, 10): 2359 self.assertIn("HAVE_SYMLINKAT", posix._have_functions) 2360 2361 else: 2362 self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions) 2363 2364 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2365 os.symlink("a", "b", dir_fd=0) 2366 2367 def test_utime(self): 2368 self._verify_available("HAVE_FUTIMENS") 2369 self._verify_available("HAVE_UTIMENSAT") 2370 if self.mac_ver >= (10, 13): 2371 self.assertIn("HAVE_FUTIMENS", posix._have_functions) 2372 self.assertIn("HAVE_UTIMENSAT", posix._have_functions) 2373 2374 else: 2375 self.assertNotIn("HAVE_FUTIMENS", posix._have_functions) 2376 self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions) 2377 2378 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2379 os.utime("path", dir_fd=0) 2380 2381 2382class NamespacesTests(unittest.TestCase): 2383 """Tests for os.unshare() and os.setns().""" 2384 2385 @unittest.skipUnless(hasattr(os, 'unshare'), 'needs os.unshare()') 2386 @unittest.skipUnless(hasattr(os, 'setns'), 'needs os.setns()') 2387 @unittest.skipUnless(os.path.exists('/proc/self/ns/uts'), 'need /proc/self/ns/uts') 2388 @support.requires_linux_version(3, 0, 0) 2389 def test_unshare_setns(self): 2390 code = """if 1: 2391 import errno 2392 import os 2393 import sys 2394 fd = os.open('/proc/self/ns/uts', os.O_RDONLY) 2395 try: 2396 original = os.readlink('/proc/self/ns/uts') 2397 try: 2398 os.unshare(os.CLONE_NEWUTS) 2399 except OSError as e: 2400 if e.errno == errno.ENOSPC: 2401 # skip test if limit is exceeded 2402 sys.exit() 2403 raise 2404 new = os.readlink('/proc/self/ns/uts') 2405 if original == new: 2406 raise Exception('os.unshare failed') 2407 os.setns(fd, os.CLONE_NEWUTS) 2408 restored = os.readlink('/proc/self/ns/uts') 2409 if original != restored: 2410 raise Exception('os.setns failed') 2411 except PermissionError: 2412 # The calling process did not have the required privileges 2413 # for this operation 2414 pass 2415 except OSError as e: 2416 # Skip the test on these errors: 2417 # - ENOSYS: syscall not available 2418 # - EINVAL: kernel was not configured with the CONFIG_UTS_NS option 2419 # - ENOMEM: not enough memory 2420 if e.errno not in (errno.ENOSYS, errno.EINVAL, errno.ENOMEM): 2421 raise 2422 finally: 2423 os.close(fd) 2424 """ 2425 2426 assert_python_ok("-c", code) 2427 2428 2429def tearDownModule(): 2430 support.reap_children() 2431 2432 2433if __name__ == '__main__': 2434 unittest.main() 2435