1"Test posix functions" 2 3from test import support 4from test.support import import_helper 5from test.support import os_helper 6from test.support import warnings_helper 7from test.support.script_helper import assert_python_ok 8 9# Skip these tests if there is no posix module. 10posix = import_helper.import_module('posix') 11 12import errno 13import sys 14import signal 15import time 16import os 17import platform 18import pwd 19import stat 20import tempfile 21import unittest 22import warnings 23import textwrap 24from contextlib import contextmanager 25 26_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 27 os_helper.TESTFN + '-dummy-symlink') 28 29requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 30 'test is only meaningful on 32-bit builds') 31 32def _supports_sched(): 33 if not hasattr(posix, 'sched_getscheduler'): 34 return False 35 try: 36 posix.sched_getscheduler(0) 37 except OSError as e: 38 if e.errno == errno.ENOSYS: 39 return False 40 return True 41 42requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API') 43 44 45class PosixTester(unittest.TestCase): 46 47 def setUp(self): 48 # create empty file 49 with open(os_helper.TESTFN, "wb"): 50 pass 51 self.teardown_files = [ os_helper.TESTFN ] 52 self._warnings_manager = warnings_helper.check_warnings() 53 self._warnings_manager.__enter__() 54 warnings.filterwarnings('ignore', '.* potential security risk .*', 55 RuntimeWarning) 56 57 def tearDown(self): 58 for teardown_file in self.teardown_files: 59 os_helper.unlink(teardown_file) 60 self._warnings_manager.__exit__(None, None, None) 61 62 def testNoArgFunctions(self): 63 # test posix functions which take no arguments and have 64 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 65 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", 66 "times", "getloadavg", 67 "getegid", "geteuid", "getgid", "getgroups", 68 "getpid", "getpgrp", "getppid", "getuid", "sync", 69 ] 70 71 for name in NO_ARG_FUNCTIONS: 72 posix_func = getattr(posix, name, None) 73 if posix_func is not None: 74 posix_func() 75 self.assertRaises(TypeError, posix_func, 1) 76 77 @unittest.skipUnless(hasattr(posix, 'getresuid'), 78 'test needs posix.getresuid()') 79 def test_getresuid(self): 80 user_ids = posix.getresuid() 81 self.assertEqual(len(user_ids), 3) 82 for val in user_ids: 83 self.assertGreaterEqual(val, 0) 84 85 @unittest.skipUnless(hasattr(posix, 'getresgid'), 86 'test needs posix.getresgid()') 87 def test_getresgid(self): 88 group_ids = posix.getresgid() 89 self.assertEqual(len(group_ids), 3) 90 for val in group_ids: 91 self.assertGreaterEqual(val, 0) 92 93 @unittest.skipUnless(hasattr(posix, 'setresuid'), 94 'test needs posix.setresuid()') 95 def test_setresuid(self): 96 current_user_ids = posix.getresuid() 97 self.assertIsNone(posix.setresuid(*current_user_ids)) 98 # -1 means don't change that value. 99 self.assertIsNone(posix.setresuid(-1, -1, -1)) 100 101 @unittest.skipUnless(hasattr(posix, 'setresuid'), 102 'test needs posix.setresuid()') 103 def test_setresuid_exception(self): 104 # Don't do this test if someone is silly enough to run us as root. 105 current_user_ids = posix.getresuid() 106 if 0 not in current_user_ids: 107 new_user_ids = (current_user_ids[0]+1, -1, -1) 108 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 109 110 @unittest.skipUnless(hasattr(posix, 'setresgid'), 111 'test needs posix.setresgid()') 112 def test_setresgid(self): 113 current_group_ids = posix.getresgid() 114 self.assertIsNone(posix.setresgid(*current_group_ids)) 115 # -1 means don't change that value. 116 self.assertIsNone(posix.setresgid(-1, -1, -1)) 117 118 @unittest.skipUnless(hasattr(posix, 'setresgid'), 119 'test needs posix.setresgid()') 120 def test_setresgid_exception(self): 121 # Don't do this test if someone is silly enough to run us as root. 122 current_group_ids = posix.getresgid() 123 if 0 not in current_group_ids: 124 new_group_ids = (current_group_ids[0]+1, -1, -1) 125 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 126 127 @unittest.skipUnless(hasattr(posix, 'initgroups'), 128 "test needs os.initgroups()") 129 def test_initgroups(self): 130 # It takes a string and an integer; check that it raises a TypeError 131 # for other argument lists. 132 self.assertRaises(TypeError, posix.initgroups) 133 self.assertRaises(TypeError, posix.initgroups, None) 134 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 135 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 136 137 # If a non-privileged user invokes it, it should fail with OSError 138 # EPERM. 139 if os.getuid() != 0: 140 try: 141 name = pwd.getpwuid(posix.getuid()).pw_name 142 except KeyError: 143 # the current UID may not have a pwd entry 144 raise unittest.SkipTest("need a pwd entry") 145 try: 146 posix.initgroups(name, 13) 147 except OSError as e: 148 self.assertEqual(e.errno, errno.EPERM) 149 else: 150 self.fail("Expected OSError to be raised by initgroups") 151 152 @unittest.skipUnless(hasattr(posix, 'statvfs'), 153 'test needs posix.statvfs()') 154 def test_statvfs(self): 155 self.assertTrue(posix.statvfs(os.curdir)) 156 157 @unittest.skipUnless(hasattr(posix, 'fstatvfs'), 158 'test needs posix.fstatvfs()') 159 def test_fstatvfs(self): 160 fp = open(os_helper.TESTFN) 161 try: 162 self.assertTrue(posix.fstatvfs(fp.fileno())) 163 self.assertTrue(posix.statvfs(fp.fileno())) 164 finally: 165 fp.close() 166 167 @unittest.skipUnless(hasattr(posix, 'ftruncate'), 168 'test needs posix.ftruncate()') 169 def test_ftruncate(self): 170 fp = open(os_helper.TESTFN, 'w+') 171 try: 172 # we need to have some data to truncate 173 fp.write('test') 174 fp.flush() 175 posix.ftruncate(fp.fileno(), 0) 176 finally: 177 fp.close() 178 179 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") 180 def test_truncate(self): 181 with open(os_helper.TESTFN, 'w') as fp: 182 fp.write('test') 183 fp.flush() 184 posix.truncate(os_helper.TESTFN, 0) 185 186 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") 187 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 188 def test_fexecve(self): 189 fp = os.open(sys.executable, os.O_RDONLY) 190 try: 191 pid = os.fork() 192 if pid == 0: 193 os.chdir(os.path.split(sys.executable)[0]) 194 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) 195 else: 196 support.wait_process(pid, exitcode=0) 197 finally: 198 os.close(fp) 199 200 201 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") 202 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 203 def test_waitid(self): 204 pid = os.fork() 205 if pid == 0: 206 os.chdir(os.path.split(sys.executable)[0]) 207 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) 208 else: 209 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) 210 self.assertEqual(pid, res.si_pid) 211 212 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 213 def test_register_at_fork(self): 214 with self.assertRaises(TypeError, msg="Positional args not allowed"): 215 os.register_at_fork(lambda: None) 216 with self.assertRaises(TypeError, msg="Args must be callable"): 217 os.register_at_fork(before=2) 218 with self.assertRaises(TypeError, msg="Args must be callable"): 219 os.register_at_fork(after_in_child="three") 220 with self.assertRaises(TypeError, msg="Args must be callable"): 221 os.register_at_fork(after_in_parent=b"Five") 222 with self.assertRaises(TypeError, msg="Args must not be None"): 223 os.register_at_fork(before=None) 224 with self.assertRaises(TypeError, msg="Args must not be None"): 225 os.register_at_fork(after_in_child=None) 226 with self.assertRaises(TypeError, msg="Args must not be None"): 227 os.register_at_fork(after_in_parent=None) 228 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 229 # Ensure a combination of valid and invalid is an error. 230 os.register_at_fork(before=None, after_in_parent=lambda: 3) 231 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 232 # Ensure a combination of valid and invalid is an error. 233 os.register_at_fork(before=lambda: None, after_in_child='') 234 # We test actual registrations in their own process so as not to 235 # pollute this one. There is no way to unregister for cleanup. 236 code = """if 1: 237 import os 238 239 r, w = os.pipe() 240 fin_r, fin_w = os.pipe() 241 242 os.register_at_fork(before=lambda: os.write(w, b'A')) 243 os.register_at_fork(after_in_parent=lambda: os.write(w, b'C')) 244 os.register_at_fork(after_in_child=lambda: os.write(w, b'E')) 245 os.register_at_fork(before=lambda: os.write(w, b'B'), 246 after_in_parent=lambda: os.write(w, b'D'), 247 after_in_child=lambda: os.write(w, b'F')) 248 249 pid = os.fork() 250 if pid == 0: 251 # At this point, after-forkers have already been executed 252 os.close(w) 253 # Wait for parent to tell us to exit 254 os.read(fin_r, 1) 255 os._exit(0) 256 else: 257 try: 258 os.close(w) 259 with open(r, "rb") as f: 260 data = f.read() 261 assert len(data) == 6, data 262 # Check before-fork callbacks 263 assert data[:2] == b'BA', data 264 # Check after-fork callbacks 265 assert sorted(data[2:]) == list(b'CDEF'), data 266 assert data.index(b'C') < data.index(b'D'), data 267 assert data.index(b'E') < data.index(b'F'), data 268 finally: 269 os.write(fin_w, b'!') 270 """ 271 assert_python_ok('-c', code) 272 273 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") 274 def test_lockf(self): 275 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 276 try: 277 os.write(fd, b'test') 278 os.lseek(fd, 0, os.SEEK_SET) 279 posix.lockf(fd, posix.F_LOCK, 4) 280 # section is locked 281 posix.lockf(fd, posix.F_ULOCK, 4) 282 finally: 283 os.close(fd) 284 285 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") 286 def test_pread(self): 287 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 288 try: 289 os.write(fd, b'test') 290 os.lseek(fd, 0, os.SEEK_SET) 291 self.assertEqual(b'es', posix.pread(fd, 2, 1)) 292 # the first pread() shouldn't disturb the file offset 293 self.assertEqual(b'te', posix.read(fd, 2)) 294 finally: 295 os.close(fd) 296 297 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 298 def test_preadv(self): 299 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 300 try: 301 os.write(fd, b'test1tt2t3t5t6t6t8') 302 buf = [bytearray(i) for i in [5, 3, 2]] 303 self.assertEqual(posix.preadv(fd, buf, 3), 10) 304 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 305 finally: 306 os.close(fd) 307 308 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 309 @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI") 310 def test_preadv_flags(self): 311 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 312 try: 313 os.write(fd, b'test1tt2t3t5t6t6t8') 314 buf = [bytearray(i) for i in [5, 3, 2]] 315 self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10) 316 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 317 except NotImplementedError: 318 self.skipTest("preadv2 not available") 319 except OSError as inst: 320 # Is possible that the macro RWF_HIPRI was defined at compilation time 321 # but the option is not supported by the kernel or the runtime libc shared 322 # library. 323 if inst.errno in {errno.EINVAL, errno.ENOTSUP}: 324 raise unittest.SkipTest("RWF_HIPRI is not supported by the current system") 325 else: 326 raise 327 finally: 328 os.close(fd) 329 330 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 331 @requires_32b 332 def test_preadv_overflow_32bits(self): 333 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 334 try: 335 buf = [bytearray(2**16)] * 2**15 336 with self.assertRaises(OSError) as cm: 337 os.preadv(fd, buf, 0) 338 self.assertEqual(cm.exception.errno, errno.EINVAL) 339 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 340 finally: 341 os.close(fd) 342 343 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") 344 def test_pwrite(self): 345 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 346 try: 347 os.write(fd, b'test') 348 os.lseek(fd, 0, os.SEEK_SET) 349 posix.pwrite(fd, b'xx', 1) 350 self.assertEqual(b'txxt', posix.read(fd, 4)) 351 finally: 352 os.close(fd) 353 354 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 355 def test_pwritev(self): 356 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 357 try: 358 os.write(fd, b"xx") 359 os.lseek(fd, 0, os.SEEK_SET) 360 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2) 361 self.assertEqual(n, 10) 362 363 os.lseek(fd, 0, os.SEEK_SET) 364 self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100)) 365 finally: 366 os.close(fd) 367 368 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 369 @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC") 370 def test_pwritev_flags(self): 371 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 372 try: 373 os.write(fd,b"xx") 374 os.lseek(fd, 0, os.SEEK_SET) 375 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC) 376 self.assertEqual(n, 10) 377 378 os.lseek(fd, 0, os.SEEK_SET) 379 self.assertEqual(b'xxtest1tt2', posix.read(fd, 100)) 380 finally: 381 os.close(fd) 382 383 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 384 @requires_32b 385 def test_pwritev_overflow_32bits(self): 386 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 387 try: 388 with self.assertRaises(OSError) as cm: 389 os.pwritev(fd, [b"x" * 2**16] * 2**15, 0) 390 self.assertEqual(cm.exception.errno, errno.EINVAL) 391 finally: 392 os.close(fd) 393 394 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 395 "test needs posix.posix_fallocate()") 396 def test_posix_fallocate(self): 397 fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_CREAT) 398 try: 399 posix.posix_fallocate(fd, 0, 10) 400 except OSError as inst: 401 # issue10812, ZFS doesn't appear to support posix_fallocate, 402 # so skip Solaris-based since they are likely to have ZFS. 403 # issue33655: Also ignore EINVAL on *BSD since ZFS is also 404 # often used there. 405 if inst.errno == errno.EINVAL and sys.platform.startswith( 406 ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')): 407 raise unittest.SkipTest("test may fail on ZFS filesystems") 408 else: 409 raise 410 finally: 411 os.close(fd) 412 413 # issue31106 - posix_fallocate() does not set error in errno. 414 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 415 "test needs posix.posix_fallocate()") 416 def test_posix_fallocate_errno(self): 417 try: 418 posix.posix_fallocate(-42, 0, 10) 419 except OSError as inst: 420 if inst.errno != errno.EBADF: 421 raise 422 423 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 424 "test needs posix.posix_fadvise()") 425 def test_posix_fadvise(self): 426 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 427 try: 428 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) 429 finally: 430 os.close(fd) 431 432 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 433 "test needs posix.posix_fadvise()") 434 def test_posix_fadvise_errno(self): 435 try: 436 posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED) 437 except OSError as inst: 438 if inst.errno != errno.EBADF: 439 raise 440 441 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") 442 def test_utime_with_fd(self): 443 now = time.time() 444 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 445 try: 446 posix.utime(fd) 447 posix.utime(fd, None) 448 self.assertRaises(TypeError, posix.utime, fd, (None, None)) 449 self.assertRaises(TypeError, posix.utime, fd, (now, None)) 450 self.assertRaises(TypeError, posix.utime, fd, (None, now)) 451 posix.utime(fd, (int(now), int(now))) 452 posix.utime(fd, (now, now)) 453 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) 454 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) 455 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) 456 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) 457 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) 458 459 finally: 460 os.close(fd) 461 462 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") 463 def test_utime_nofollow_symlinks(self): 464 now = time.time() 465 posix.utime(os_helper.TESTFN, None, follow_symlinks=False) 466 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 467 (None, None), follow_symlinks=False) 468 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 469 (now, None), follow_symlinks=False) 470 self.assertRaises(TypeError, posix.utime, os_helper.TESTFN, 471 (None, now), follow_symlinks=False) 472 posix.utime(os_helper.TESTFN, (int(now), int(now)), 473 follow_symlinks=False) 474 posix.utime(os_helper.TESTFN, (now, now), follow_symlinks=False) 475 posix.utime(os_helper.TESTFN, follow_symlinks=False) 476 477 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 478 def test_writev(self): 479 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 480 try: 481 n = os.writev(fd, (b'test1', b'tt2', b't3')) 482 self.assertEqual(n, 10) 483 484 os.lseek(fd, 0, os.SEEK_SET) 485 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) 486 487 # Issue #20113: empty list of buffers should not crash 488 try: 489 size = posix.writev(fd, []) 490 except OSError: 491 # writev(fd, []) raises OSError(22, "Invalid argument") 492 # on OpenIndiana 493 pass 494 else: 495 self.assertEqual(size, 0) 496 finally: 497 os.close(fd) 498 499 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 500 @requires_32b 501 def test_writev_overflow_32bits(self): 502 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 503 try: 504 with self.assertRaises(OSError) as cm: 505 os.writev(fd, [b"x" * 2**16] * 2**15) 506 self.assertEqual(cm.exception.errno, errno.EINVAL) 507 finally: 508 os.close(fd) 509 510 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 511 def test_readv(self): 512 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 513 try: 514 os.write(fd, b'test1tt2t3') 515 os.lseek(fd, 0, os.SEEK_SET) 516 buf = [bytearray(i) for i in [5, 3, 2]] 517 self.assertEqual(posix.readv(fd, buf), 10) 518 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) 519 520 # Issue #20113: empty list of buffers should not crash 521 try: 522 size = posix.readv(fd, []) 523 except OSError: 524 # readv(fd, []) raises OSError(22, "Invalid argument") 525 # on OpenIndiana 526 pass 527 else: 528 self.assertEqual(size, 0) 529 finally: 530 os.close(fd) 531 532 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 533 @requires_32b 534 def test_readv_overflow_32bits(self): 535 fd = os.open(os_helper.TESTFN, os.O_RDWR | os.O_CREAT) 536 try: 537 buf = [bytearray(2**16)] * 2**15 538 with self.assertRaises(OSError) as cm: 539 os.readv(fd, buf) 540 self.assertEqual(cm.exception.errno, errno.EINVAL) 541 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 542 finally: 543 os.close(fd) 544 545 @unittest.skipUnless(hasattr(posix, 'dup'), 546 'test needs posix.dup()') 547 def test_dup(self): 548 fp = open(os_helper.TESTFN) 549 try: 550 fd = posix.dup(fp.fileno()) 551 self.assertIsInstance(fd, int) 552 os.close(fd) 553 finally: 554 fp.close() 555 556 @unittest.skipUnless(hasattr(posix, 'confstr'), 557 'test needs posix.confstr()') 558 def test_confstr(self): 559 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 560 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 561 562 @unittest.skipUnless(hasattr(posix, 'dup2'), 563 'test needs posix.dup2()') 564 def test_dup2(self): 565 fp1 = open(os_helper.TESTFN) 566 fp2 = open(os_helper.TESTFN) 567 try: 568 posix.dup2(fp1.fileno(), fp2.fileno()) 569 finally: 570 fp1.close() 571 fp2.close() 572 573 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") 574 @support.requires_linux_version(2, 6, 23) 575 def test_oscloexec(self): 576 fd = os.open(os_helper.TESTFN, os.O_RDONLY|os.O_CLOEXEC) 577 self.addCleanup(os.close, fd) 578 self.assertFalse(os.get_inheritable(fd)) 579 580 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), 581 'test needs posix.O_EXLOCK') 582 def test_osexlock(self): 583 fd = os.open(os_helper.TESTFN, 584 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 585 self.assertRaises(OSError, os.open, os_helper.TESTFN, 586 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 587 os.close(fd) 588 589 if hasattr(posix, "O_SHLOCK"): 590 fd = os.open(os_helper.TESTFN, 591 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 592 self.assertRaises(OSError, os.open, os_helper.TESTFN, 593 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 594 os.close(fd) 595 596 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), 597 'test needs posix.O_SHLOCK') 598 def test_osshlock(self): 599 fd1 = os.open(os_helper.TESTFN, 600 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 601 fd2 = os.open(os_helper.TESTFN, 602 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 603 os.close(fd2) 604 os.close(fd1) 605 606 if hasattr(posix, "O_EXLOCK"): 607 fd = os.open(os_helper.TESTFN, 608 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 609 self.assertRaises(OSError, os.open, os_helper.TESTFN, 610 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 611 os.close(fd) 612 613 @unittest.skipUnless(hasattr(posix, 'fstat'), 614 'test needs posix.fstat()') 615 def test_fstat(self): 616 fp = open(os_helper.TESTFN) 617 try: 618 self.assertTrue(posix.fstat(fp.fileno())) 619 self.assertTrue(posix.stat(fp.fileno())) 620 621 self.assertRaisesRegex(TypeError, 622 'should be string, bytes, os.PathLike or integer, not', 623 posix.stat, float(fp.fileno())) 624 finally: 625 fp.close() 626 627 def test_stat(self): 628 self.assertTrue(posix.stat(os_helper.TESTFN)) 629 self.assertTrue(posix.stat(os.fsencode(os_helper.TESTFN))) 630 631 self.assertWarnsRegex(DeprecationWarning, 632 'should be string, bytes, os.PathLike or integer, not', 633 posix.stat, bytearray(os.fsencode(os_helper.TESTFN))) 634 self.assertRaisesRegex(TypeError, 635 'should be string, bytes, os.PathLike or integer, not', 636 posix.stat, None) 637 self.assertRaisesRegex(TypeError, 638 'should be string, bytes, os.PathLike or integer, not', 639 posix.stat, list(os_helper.TESTFN)) 640 self.assertRaisesRegex(TypeError, 641 'should be string, bytes, os.PathLike or integer, not', 642 posix.stat, list(os.fsencode(os_helper.TESTFN))) 643 644 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") 645 def test_mkfifo(self): 646 if sys.platform == "vxworks": 647 fifo_path = os.path.join("/fifos/", os_helper.TESTFN) 648 else: 649 fifo_path = os_helper.TESTFN 650 os_helper.unlink(fifo_path) 651 self.addCleanup(os_helper.unlink, fifo_path) 652 try: 653 posix.mkfifo(fifo_path, stat.S_IRUSR | stat.S_IWUSR) 654 except PermissionError as e: 655 self.skipTest('posix.mkfifo(): %s' % e) 656 self.assertTrue(stat.S_ISFIFO(posix.stat(fifo_path).st_mode)) 657 658 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'), 659 "don't have mknod()/S_IFIFO") 660 def test_mknod(self): 661 # Test using mknod() to create a FIFO (the only use specified 662 # by POSIX). 663 os_helper.unlink(os_helper.TESTFN) 664 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 665 try: 666 posix.mknod(os_helper.TESTFN, mode, 0) 667 except OSError as e: 668 # Some old systems don't allow unprivileged users to use 669 # mknod(), or only support creating device nodes. 670 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 671 else: 672 self.assertTrue(stat.S_ISFIFO(posix.stat(os_helper.TESTFN).st_mode)) 673 674 # Keyword arguments are also supported 675 os_helper.unlink(os_helper.TESTFN) 676 try: 677 posix.mknod(path=os_helper.TESTFN, mode=mode, device=0, 678 dir_fd=None) 679 except OSError as e: 680 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 681 682 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()') 683 def test_makedev(self): 684 st = posix.stat(os_helper.TESTFN) 685 dev = st.st_dev 686 self.assertIsInstance(dev, int) 687 self.assertGreaterEqual(dev, 0) 688 689 major = posix.major(dev) 690 self.assertIsInstance(major, int) 691 self.assertGreaterEqual(major, 0) 692 self.assertEqual(posix.major(dev), major) 693 self.assertRaises(TypeError, posix.major, float(dev)) 694 self.assertRaises(TypeError, posix.major) 695 self.assertRaises((ValueError, OverflowError), posix.major, -1) 696 697 minor = posix.minor(dev) 698 self.assertIsInstance(minor, int) 699 self.assertGreaterEqual(minor, 0) 700 self.assertEqual(posix.minor(dev), minor) 701 self.assertRaises(TypeError, posix.minor, float(dev)) 702 self.assertRaises(TypeError, posix.minor) 703 self.assertRaises((ValueError, OverflowError), posix.minor, -1) 704 705 self.assertEqual(posix.makedev(major, minor), dev) 706 self.assertRaises(TypeError, posix.makedev, float(major), minor) 707 self.assertRaises(TypeError, posix.makedev, major, float(minor)) 708 self.assertRaises(TypeError, posix.makedev, major) 709 self.assertRaises(TypeError, posix.makedev) 710 711 def _test_all_chown_common(self, chown_func, first_param, stat_func): 712 """Common code for chown, fchown and lchown tests.""" 713 def check_stat(uid, gid): 714 if stat_func is not None: 715 stat = stat_func(first_param) 716 self.assertEqual(stat.st_uid, uid) 717 self.assertEqual(stat.st_gid, gid) 718 uid = os.getuid() 719 gid = os.getgid() 720 # test a successful chown call 721 chown_func(first_param, uid, gid) 722 check_stat(uid, gid) 723 chown_func(first_param, -1, gid) 724 check_stat(uid, gid) 725 chown_func(first_param, uid, -1) 726 check_stat(uid, gid) 727 728 if sys.platform == "vxworks": 729 # On VxWorks, root user id is 1 and 0 means no login user: 730 # both are super users. 731 is_root = (uid in (0, 1)) 732 else: 733 is_root = (uid == 0) 734 if is_root: 735 # Try an amusingly large uid/gid to make sure we handle 736 # large unsigned values. (chown lets you use any 737 # uid/gid you like, even if they aren't defined.) 738 # 739 # On VxWorks uid_t is defined as unsigned short. A big 740 # value greater than 65535 will result in underflow error. 741 # 742 # This problem keeps coming up: 743 # http://bugs.python.org/issue1747858 744 # http://bugs.python.org/issue4591 745 # http://bugs.python.org/issue15301 746 # Hopefully the fix in 4591 fixes it for good! 747 # 748 # This part of the test only runs when run as root. 749 # Only scary people run their tests as root. 750 751 big_value = (2**31 if sys.platform != "vxworks" else 2**15) 752 chown_func(first_param, big_value, big_value) 753 check_stat(big_value, big_value) 754 chown_func(first_param, -1, -1) 755 check_stat(big_value, big_value) 756 chown_func(first_param, uid, gid) 757 check_stat(uid, gid) 758 elif platform.system() in ('HP-UX', 'SunOS'): 759 # HP-UX and Solaris can allow a non-root user to chown() to root 760 # (issue #5113) 761 raise unittest.SkipTest("Skipping because of non-standard chown() " 762 "behavior") 763 else: 764 # non-root cannot chown to root, raises OSError 765 self.assertRaises(OSError, chown_func, first_param, 0, 0) 766 check_stat(uid, gid) 767 self.assertRaises(OSError, chown_func, first_param, 0, -1) 768 check_stat(uid, gid) 769 if 0 not in os.getgroups(): 770 self.assertRaises(OSError, chown_func, first_param, -1, 0) 771 check_stat(uid, gid) 772 # test illegal types 773 for t in str, float: 774 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 775 check_stat(uid, gid) 776 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 777 check_stat(uid, gid) 778 779 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") 780 def test_chown(self): 781 # raise an OSError if the file does not exist 782 os.unlink(os_helper.TESTFN) 783 self.assertRaises(OSError, posix.chown, os_helper.TESTFN, -1, -1) 784 785 # re-create the file 786 os_helper.create_empty_file(os_helper.TESTFN) 787 self._test_all_chown_common(posix.chown, os_helper.TESTFN, posix.stat) 788 789 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 790 def test_fchown(self): 791 os.unlink(os_helper.TESTFN) 792 793 # re-create the file 794 test_file = open(os_helper.TESTFN, 'w') 795 try: 796 fd = test_file.fileno() 797 self._test_all_chown_common(posix.fchown, fd, 798 getattr(posix, 'fstat', None)) 799 finally: 800 test_file.close() 801 802 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 803 def test_lchown(self): 804 os.unlink(os_helper.TESTFN) 805 # create a symlink 806 os.symlink(_DUMMY_SYMLINK, os_helper.TESTFN) 807 self._test_all_chown_common(posix.lchown, os_helper.TESTFN, 808 getattr(posix, 'lstat', None)) 809 810 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') 811 def test_chdir(self): 812 posix.chdir(os.curdir) 813 self.assertRaises(OSError, posix.chdir, os_helper.TESTFN) 814 815 def test_listdir(self): 816 self.assertIn(os_helper.TESTFN, posix.listdir(os.curdir)) 817 818 def test_listdir_default(self): 819 # When listdir is called without argument, 820 # it's the same as listdir(os.curdir). 821 self.assertIn(os_helper.TESTFN, posix.listdir()) 822 823 def test_listdir_bytes(self): 824 # When listdir is called with a bytes object, 825 # the returned strings are of type bytes. 826 self.assertIn(os.fsencode(os_helper.TESTFN), posix.listdir(b'.')) 827 828 def test_listdir_bytes_like(self): 829 for cls in bytearray, memoryview: 830 with self.assertWarns(DeprecationWarning): 831 names = posix.listdir(cls(b'.')) 832 self.assertIn(os.fsencode(os_helper.TESTFN), names) 833 for name in names: 834 self.assertIs(type(name), bytes) 835 836 @unittest.skipUnless(posix.listdir in os.supports_fd, 837 "test needs fd support for posix.listdir()") 838 def test_listdir_fd(self): 839 f = posix.open(posix.getcwd(), posix.O_RDONLY) 840 self.addCleanup(posix.close, f) 841 self.assertEqual( 842 sorted(posix.listdir('.')), 843 sorted(posix.listdir(f)) 844 ) 845 # Check that the fd offset was reset (issue #13739) 846 self.assertEqual( 847 sorted(posix.listdir('.')), 848 sorted(posix.listdir(f)) 849 ) 850 851 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') 852 def test_access(self): 853 self.assertTrue(posix.access(os_helper.TESTFN, os.R_OK)) 854 855 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') 856 def test_umask(self): 857 old_mask = posix.umask(0) 858 self.assertIsInstance(old_mask, int) 859 posix.umask(old_mask) 860 861 @unittest.skipUnless(hasattr(posix, 'strerror'), 862 'test needs posix.strerror()') 863 def test_strerror(self): 864 self.assertTrue(posix.strerror(0)) 865 866 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') 867 def test_pipe(self): 868 reader, writer = posix.pipe() 869 os.close(reader) 870 os.close(writer) 871 872 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 873 @support.requires_linux_version(2, 6, 27) 874 def test_pipe2(self): 875 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') 876 self.assertRaises(TypeError, os.pipe2, 0, 0) 877 878 # try calling with flags = 0, like os.pipe() 879 r, w = os.pipe2(0) 880 os.close(r) 881 os.close(w) 882 883 # test flags 884 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) 885 self.addCleanup(os.close, r) 886 self.addCleanup(os.close, w) 887 self.assertFalse(os.get_inheritable(r)) 888 self.assertFalse(os.get_inheritable(w)) 889 self.assertFalse(os.get_blocking(r)) 890 self.assertFalse(os.get_blocking(w)) 891 # try reading from an empty pipe: this should fail, not block 892 self.assertRaises(OSError, os.read, r, 1) 893 # try a write big enough to fill-up the pipe: this should either 894 # fail or perform a partial write, not block 895 try: 896 os.write(w, b'x' * support.PIPE_MAX_SIZE) 897 except OSError: 898 pass 899 900 @support.cpython_only 901 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 902 @support.requires_linux_version(2, 6, 27) 903 def test_pipe2_c_limits(self): 904 # Issue 15989 905 import _testcapi 906 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) 907 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) 908 909 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') 910 def test_utime(self): 911 now = time.time() 912 posix.utime(os_helper.TESTFN, None) 913 self.assertRaises(TypeError, posix.utime, 914 os_helper.TESTFN, (None, None)) 915 self.assertRaises(TypeError, posix.utime, 916 os_helper.TESTFN, (now, None)) 917 self.assertRaises(TypeError, posix.utime, 918 os_helper.TESTFN, (None, now)) 919 posix.utime(os_helper.TESTFN, (int(now), int(now))) 920 posix.utime(os_helper.TESTFN, (now, now)) 921 922 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): 923 st = os.stat(target_file) 924 self.assertTrue(hasattr(st, 'st_flags')) 925 926 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 927 flags = st.st_flags | stat.UF_IMMUTABLE 928 try: 929 chflags_func(target_file, flags, **kwargs) 930 except OSError as err: 931 if err.errno != errno.EOPNOTSUPP: 932 raise 933 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 934 self.skipTest(msg) 935 936 try: 937 new_st = os.stat(target_file) 938 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 939 try: 940 fd = open(target_file, 'w+') 941 except OSError as e: 942 self.assertEqual(e.errno, errno.EPERM) 943 finally: 944 posix.chflags(target_file, st.st_flags) 945 946 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 947 def test_chflags(self): 948 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN) 949 950 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 951 def test_lchflags_regular_file(self): 952 self._test_chflags_regular_file(posix.lchflags, os_helper.TESTFN) 953 self._test_chflags_regular_file(posix.chflags, os_helper.TESTFN, 954 follow_symlinks=False) 955 956 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 957 def test_lchflags_symlink(self): 958 testfn_st = os.stat(os_helper.TESTFN) 959 960 self.assertTrue(hasattr(testfn_st, 'st_flags')) 961 962 os.symlink(os_helper.TESTFN, _DUMMY_SYMLINK) 963 self.teardown_files.append(_DUMMY_SYMLINK) 964 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 965 966 def chflags_nofollow(path, flags): 967 return posix.chflags(path, flags, follow_symlinks=False) 968 969 for fn in (posix.lchflags, chflags_nofollow): 970 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 971 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE 972 try: 973 fn(_DUMMY_SYMLINK, flags) 974 except OSError as err: 975 if err.errno != errno.EOPNOTSUPP: 976 raise 977 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 978 self.skipTest(msg) 979 try: 980 new_testfn_st = os.stat(os_helper.TESTFN) 981 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 982 983 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 984 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 985 new_dummy_symlink_st.st_flags) 986 finally: 987 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 988 989 def test_environ(self): 990 if os.name == "nt": 991 item_type = str 992 else: 993 item_type = bytes 994 for k, v in posix.environ.items(): 995 self.assertEqual(type(k), item_type) 996 self.assertEqual(type(v), item_type) 997 998 def test_putenv(self): 999 with self.assertRaises(ValueError): 1000 os.putenv('FRUIT\0VEGETABLE', 'cabbage') 1001 with self.assertRaises(ValueError): 1002 os.putenv(b'FRUIT\0VEGETABLE', b'cabbage') 1003 with self.assertRaises(ValueError): 1004 os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage') 1005 with self.assertRaises(ValueError): 1006 os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage') 1007 with self.assertRaises(ValueError): 1008 os.putenv('FRUIT=ORANGE', 'lemon') 1009 with self.assertRaises(ValueError): 1010 os.putenv(b'FRUIT=ORANGE', b'lemon') 1011 1012 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') 1013 def test_getcwd_long_pathnames(self): 1014 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 1015 curdir = os.getcwd() 1016 base_path = os.path.abspath(os_helper.TESTFN) + '.getcwd' 1017 1018 try: 1019 os.mkdir(base_path) 1020 os.chdir(base_path) 1021 except: 1022 # Just returning nothing instead of the SkipTest exception, because 1023 # the test results in Error in that case. Is that ok? 1024 # raise unittest.SkipTest("cannot create directory for testing") 1025 return 1026 1027 def _create_and_do_getcwd(dirname, current_path_length = 0): 1028 try: 1029 os.mkdir(dirname) 1030 except: 1031 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test") 1032 1033 os.chdir(dirname) 1034 try: 1035 os.getcwd() 1036 if current_path_length < 1027: 1037 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 1038 finally: 1039 os.chdir('..') 1040 os.rmdir(dirname) 1041 1042 _create_and_do_getcwd(dirname) 1043 1044 finally: 1045 os.chdir(curdir) 1046 os_helper.rmtree(base_path) 1047 1048 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") 1049 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 1050 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") 1051 def test_getgrouplist(self): 1052 user = pwd.getpwuid(os.getuid())[0] 1053 group = pwd.getpwuid(os.getuid())[3] 1054 self.assertIn(group, posix.getgrouplist(user, group)) 1055 1056 1057 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 1058 @unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()") 1059 def test_getgroups(self): 1060 with os.popen('id -G 2>/dev/null') as idg: 1061 groups = idg.read().strip() 1062 ret = idg.close() 1063 1064 try: 1065 idg_groups = set(int(g) for g in groups.split()) 1066 except ValueError: 1067 idg_groups = set() 1068 if ret is not None or not idg_groups: 1069 raise unittest.SkipTest("need working 'id -G'") 1070 1071 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 1072 if sys.platform == 'darwin': 1073 import sysconfig 1074 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.3' 1075 if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6): 1076 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 1077 1078 # 'id -G' and 'os.getgroups()' should return the same 1079 # groups, ignoring order, duplicates, and the effective gid. 1080 # #10822/#26944 - It is implementation defined whether 1081 # posix.getgroups() includes the effective gid. 1082 symdiff = idg_groups.symmetric_difference(posix.getgroups()) 1083 self.assertTrue(not symdiff or symdiff == {posix.getegid()}) 1084 1085 @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal') 1086 @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result") 1087 def test_cld_xxxx_constants(self): 1088 os.CLD_EXITED 1089 os.CLD_KILLED 1090 os.CLD_DUMPED 1091 os.CLD_TRAPPED 1092 os.CLD_STOPPED 1093 os.CLD_CONTINUED 1094 1095 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), 1096 "don't have scheduling support") 1097 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), 1098 "don't have sched affinity support") 1099 1100 @requires_sched_h 1101 def test_sched_yield(self): 1102 # This has no error conditions (at least on Linux). 1103 posix.sched_yield() 1104 1105 @requires_sched_h 1106 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), 1107 "requires sched_get_priority_max()") 1108 def test_sched_priority(self): 1109 # Round-robin usually has interesting priorities. 1110 pol = posix.SCHED_RR 1111 lo = posix.sched_get_priority_min(pol) 1112 hi = posix.sched_get_priority_max(pol) 1113 self.assertIsInstance(lo, int) 1114 self.assertIsInstance(hi, int) 1115 self.assertGreaterEqual(hi, lo) 1116 # OSX evidently just returns 15 without checking the argument. 1117 if sys.platform != "darwin": 1118 self.assertRaises(OSError, posix.sched_get_priority_min, -23) 1119 self.assertRaises(OSError, posix.sched_get_priority_max, -23) 1120 1121 @requires_sched 1122 def test_get_and_set_scheduler_and_param(self): 1123 possible_schedulers = [sched for name, sched in posix.__dict__.items() 1124 if name.startswith("SCHED_")] 1125 mine = posix.sched_getscheduler(0) 1126 self.assertIn(mine, possible_schedulers) 1127 try: 1128 parent = posix.sched_getscheduler(os.getppid()) 1129 except OSError as e: 1130 if e.errno != errno.EPERM: 1131 raise 1132 else: 1133 self.assertIn(parent, possible_schedulers) 1134 self.assertRaises(OSError, posix.sched_getscheduler, -1) 1135 self.assertRaises(OSError, posix.sched_getparam, -1) 1136 param = posix.sched_getparam(0) 1137 self.assertIsInstance(param.sched_priority, int) 1138 1139 # POSIX states that calling sched_setparam() or sched_setscheduler() on 1140 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR 1141 # is implementation-defined: NetBSD and FreeBSD can return EINVAL. 1142 if not sys.platform.startswith(('freebsd', 'netbsd')): 1143 try: 1144 posix.sched_setscheduler(0, mine, param) 1145 posix.sched_setparam(0, param) 1146 except OSError as e: 1147 if e.errno != errno.EPERM: 1148 raise 1149 self.assertRaises(OSError, posix.sched_setparam, -1, param) 1150 1151 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) 1152 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) 1153 self.assertRaises(TypeError, posix.sched_setparam, 0, 43) 1154 param = posix.sched_param(None) 1155 self.assertRaises(TypeError, posix.sched_setparam, 0, param) 1156 large = 214748364700 1157 param = posix.sched_param(large) 1158 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1159 param = posix.sched_param(sched_priority=-large) 1160 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1161 1162 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") 1163 def test_sched_rr_get_interval(self): 1164 try: 1165 interval = posix.sched_rr_get_interval(0) 1166 except OSError as e: 1167 # This likely means that sched_rr_get_interval is only valid for 1168 # processes with the SCHED_RR scheduler in effect. 1169 if e.errno != errno.EINVAL: 1170 raise 1171 self.skipTest("only works on SCHED_RR processes") 1172 self.assertIsInstance(interval, float) 1173 # Reasonable constraints, I think. 1174 self.assertGreaterEqual(interval, 0.) 1175 self.assertLess(interval, 1.) 1176 1177 @requires_sched_affinity 1178 def test_sched_getaffinity(self): 1179 mask = posix.sched_getaffinity(0) 1180 self.assertIsInstance(mask, set) 1181 self.assertGreaterEqual(len(mask), 1) 1182 self.assertRaises(OSError, posix.sched_getaffinity, -1) 1183 for cpu in mask: 1184 self.assertIsInstance(cpu, int) 1185 self.assertGreaterEqual(cpu, 0) 1186 self.assertLess(cpu, 1 << 32) 1187 1188 @requires_sched_affinity 1189 def test_sched_setaffinity(self): 1190 mask = posix.sched_getaffinity(0) 1191 if len(mask) > 1: 1192 # Empty masks are forbidden 1193 mask.pop() 1194 posix.sched_setaffinity(0, mask) 1195 self.assertEqual(posix.sched_getaffinity(0), mask) 1196 self.assertRaises(OSError, posix.sched_setaffinity, 0, []) 1197 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) 1198 self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X")) 1199 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) 1200 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) 1201 1202 def test_rtld_constants(self): 1203 # check presence of major RTLD_* constants 1204 posix.RTLD_LAZY 1205 posix.RTLD_NOW 1206 posix.RTLD_GLOBAL 1207 posix.RTLD_LOCAL 1208 1209 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), 1210 "test needs an OS that reports file holes") 1211 def test_fs_holes(self): 1212 # Even if the filesystem doesn't report holes, 1213 # if the OS supports it the SEEK_* constants 1214 # will be defined and will have a consistent 1215 # behaviour: 1216 # os.SEEK_DATA = current position 1217 # os.SEEK_HOLE = end of file position 1218 with open(os_helper.TESTFN, 'r+b') as fp: 1219 fp.write(b"hello") 1220 fp.flush() 1221 size = fp.tell() 1222 fno = fp.fileno() 1223 try : 1224 for i in range(size): 1225 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) 1226 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) 1227 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) 1228 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) 1229 except OSError : 1230 # Some OSs claim to support SEEK_HOLE/SEEK_DATA 1231 # but it is not true. 1232 # For instance: 1233 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html 1234 raise unittest.SkipTest("OSError raised!") 1235 1236 def test_path_error2(self): 1237 """ 1238 Test functions that call path_error2(), providing two filenames in their exceptions. 1239 """ 1240 for name in ("rename", "replace", "link"): 1241 function = getattr(os, name, None) 1242 if function is None: 1243 continue 1244 1245 for dst in ("noodly2", os_helper.TESTFN): 1246 try: 1247 function('doesnotexistfilename', dst) 1248 except OSError as e: 1249 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) 1250 break 1251 else: 1252 self.fail("No valid path_error2() test for os." + name) 1253 1254 def test_path_with_null_character(self): 1255 fn = os_helper.TESTFN 1256 fn_with_NUL = fn + '\0' 1257 self.addCleanup(os_helper.unlink, fn) 1258 os_helper.unlink(fn) 1259 fd = None 1260 try: 1261 with self.assertRaises(ValueError): 1262 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1263 finally: 1264 if fd is not None: 1265 os.close(fd) 1266 self.assertFalse(os.path.exists(fn)) 1267 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1268 self.assertFalse(os.path.exists(fn)) 1269 open(fn, 'wb').close() 1270 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1271 1272 def test_path_with_null_byte(self): 1273 fn = os.fsencode(os_helper.TESTFN) 1274 fn_with_NUL = fn + b'\0' 1275 self.addCleanup(os_helper.unlink, fn) 1276 os_helper.unlink(fn) 1277 fd = None 1278 try: 1279 with self.assertRaises(ValueError): 1280 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1281 finally: 1282 if fd is not None: 1283 os.close(fd) 1284 self.assertFalse(os.path.exists(fn)) 1285 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1286 self.assertFalse(os.path.exists(fn)) 1287 open(fn, 'wb').close() 1288 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1289 1290 @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable") 1291 def test_pidfd_open(self): 1292 with self.assertRaises(OSError) as cm: 1293 os.pidfd_open(-1) 1294 if cm.exception.errno == errno.ENOSYS: 1295 self.skipTest("system does not support pidfd_open") 1296 if isinstance(cm.exception, PermissionError): 1297 self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}") 1298 self.assertEqual(cm.exception.errno, errno.EINVAL) 1299 os.close(os.pidfd_open(os.getpid(), 0)) 1300 1301 1302# tests for the posix *at functions follow 1303class TestPosixDirFd(unittest.TestCase): 1304 count = 0 1305 1306 @contextmanager 1307 def prepare(self): 1308 TestPosixDirFd.count += 1 1309 name = f'{os_helper.TESTFN}_{self.count}' 1310 base_dir = f'{os_helper.TESTFN}_{self.count}base' 1311 posix.mkdir(base_dir) 1312 self.addCleanup(posix.rmdir, base_dir) 1313 fullname = os.path.join(base_dir, name) 1314 assert not os.path.exists(fullname) 1315 with os_helper.open_dir_fd(base_dir) as dir_fd: 1316 yield (dir_fd, name, fullname) 1317 1318 @contextmanager 1319 def prepare_file(self): 1320 with self.prepare() as (dir_fd, name, fullname): 1321 os_helper.create_empty_file(fullname) 1322 self.addCleanup(posix.unlink, fullname) 1323 yield (dir_fd, name, fullname) 1324 1325 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") 1326 def test_access_dir_fd(self): 1327 with self.prepare_file() as (dir_fd, name, fullname): 1328 self.assertTrue(posix.access(name, os.R_OK, dir_fd=dir_fd)) 1329 1330 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") 1331 def test_chmod_dir_fd(self): 1332 with self.prepare_file() as (dir_fd, name, fullname): 1333 posix.chmod(fullname, stat.S_IRUSR) 1334 posix.chmod(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) 1335 s = posix.stat(fullname) 1336 self.assertEqual(s.st_mode & stat.S_IRWXU, 1337 stat.S_IRUSR | stat.S_IWUSR) 1338 1339 @unittest.skipUnless(hasattr(os, 'chown') and (os.chown in os.supports_dir_fd), 1340 "test needs dir_fd support in os.chown()") 1341 def test_chown_dir_fd(self): 1342 with self.prepare_file() as (dir_fd, name, fullname): 1343 posix.chown(name, os.getuid(), os.getgid(), dir_fd=dir_fd) 1344 1345 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") 1346 def test_stat_dir_fd(self): 1347 with self.prepare() as (dir_fd, name, fullname): 1348 with open(fullname, 'w') as outfile: 1349 outfile.write("testline\n") 1350 self.addCleanup(posix.unlink, fullname) 1351 1352 s1 = posix.stat(fullname) 1353 s2 = posix.stat(name, dir_fd=dir_fd) 1354 self.assertEqual(s1, s2) 1355 s2 = posix.stat(fullname, dir_fd=None) 1356 self.assertEqual(s1, s2) 1357 1358 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1359 posix.stat, name, dir_fd=posix.getcwd()) 1360 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1361 posix.stat, name, dir_fd=float(dir_fd)) 1362 self.assertRaises(OverflowError, 1363 posix.stat, name, dir_fd=10**20) 1364 1365 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") 1366 def test_utime_dir_fd(self): 1367 with self.prepare_file() as (dir_fd, name, fullname): 1368 now = time.time() 1369 posix.utime(name, None, dir_fd=dir_fd) 1370 posix.utime(name, dir_fd=dir_fd) 1371 self.assertRaises(TypeError, posix.utime, name, 1372 now, dir_fd=dir_fd) 1373 self.assertRaises(TypeError, posix.utime, name, 1374 (None, None), dir_fd=dir_fd) 1375 self.assertRaises(TypeError, posix.utime, name, 1376 (now, None), dir_fd=dir_fd) 1377 self.assertRaises(TypeError, posix.utime, name, 1378 (None, now), dir_fd=dir_fd) 1379 self.assertRaises(TypeError, posix.utime, name, 1380 (now, "x"), dir_fd=dir_fd) 1381 posix.utime(name, (int(now), int(now)), dir_fd=dir_fd) 1382 posix.utime(name, (now, now), dir_fd=dir_fd) 1383 posix.utime(name, 1384 (int(now), int((now - int(now)) * 1e9)), dir_fd=dir_fd) 1385 posix.utime(name, dir_fd=dir_fd, 1386 times=(int(now), int((now - int(now)) * 1e9))) 1387 1388 # try dir_fd and follow_symlinks together 1389 if os.utime in os.supports_follow_symlinks: 1390 try: 1391 posix.utime(name, follow_symlinks=False, dir_fd=dir_fd) 1392 except ValueError: 1393 # whoops! using both together not supported on this platform. 1394 pass 1395 1396 @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") 1397 def test_link_dir_fd(self): 1398 with self.prepare_file() as (dir_fd, name, fullname), \ 1399 self.prepare() as (dir_fd2, linkname, fulllinkname): 1400 try: 1401 posix.link(name, linkname, src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) 1402 except PermissionError as e: 1403 self.skipTest('posix.link(): %s' % e) 1404 self.addCleanup(posix.unlink, fulllinkname) 1405 # should have same inodes 1406 self.assertEqual(posix.stat(fullname)[1], 1407 posix.stat(fulllinkname)[1]) 1408 1409 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") 1410 def test_mkdir_dir_fd(self): 1411 with self.prepare() as (dir_fd, name, fullname): 1412 posix.mkdir(name, dir_fd=dir_fd) 1413 self.addCleanup(posix.rmdir, fullname) 1414 posix.stat(fullname) # should not raise exception 1415 1416 @unittest.skipUnless(hasattr(os, 'mknod') 1417 and (os.mknod in os.supports_dir_fd) 1418 and hasattr(stat, 'S_IFIFO'), 1419 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") 1420 def test_mknod_dir_fd(self): 1421 # Test using mknodat() to create a FIFO (the only use specified 1422 # by POSIX). 1423 with self.prepare() as (dir_fd, name, fullname): 1424 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 1425 try: 1426 posix.mknod(name, mode, 0, dir_fd=dir_fd) 1427 except OSError as e: 1428 # Some old systems don't allow unprivileged users to use 1429 # mknod(), or only support creating device nodes. 1430 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 1431 else: 1432 self.addCleanup(posix.unlink, fullname) 1433 self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) 1434 1435 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") 1436 def test_open_dir_fd(self): 1437 with self.prepare() as (dir_fd, name, fullname): 1438 with open(fullname, 'wb') as outfile: 1439 outfile.write(b"testline\n") 1440 self.addCleanup(posix.unlink, fullname) 1441 fd = posix.open(name, posix.O_RDONLY, dir_fd=dir_fd) 1442 try: 1443 res = posix.read(fd, 9) 1444 self.assertEqual(b"testline\n", res) 1445 finally: 1446 posix.close(fd) 1447 1448 @unittest.skipUnless(hasattr(os, 'readlink') and (os.readlink in os.supports_dir_fd), 1449 "test needs dir_fd support in os.readlink()") 1450 def test_readlink_dir_fd(self): 1451 with self.prepare() as (dir_fd, name, fullname): 1452 os.symlink('symlink', fullname) 1453 self.addCleanup(posix.unlink, fullname) 1454 self.assertEqual(posix.readlink(name, dir_fd=dir_fd), 'symlink') 1455 1456 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") 1457 def test_rename_dir_fd(self): 1458 with self.prepare_file() as (dir_fd, name, fullname), \ 1459 self.prepare() as (dir_fd2, name2, fullname2): 1460 posix.rename(name, name2, 1461 src_dir_fd=dir_fd, dst_dir_fd=dir_fd2) 1462 posix.stat(fullname2) # should not raise exception 1463 posix.rename(fullname2, fullname) 1464 1465 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") 1466 def test_symlink_dir_fd(self): 1467 with self.prepare() as (dir_fd, name, fullname): 1468 posix.symlink('symlink', name, dir_fd=dir_fd) 1469 self.addCleanup(posix.unlink, fullname) 1470 self.assertEqual(posix.readlink(fullname), 'symlink') 1471 1472 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") 1473 def test_unlink_dir_fd(self): 1474 with self.prepare() as (dir_fd, name, fullname): 1475 os_helper.create_empty_file(fullname) 1476 posix.stat(fullname) # should not raise exception 1477 try: 1478 posix.unlink(name, dir_fd=dir_fd) 1479 self.assertRaises(OSError, posix.stat, fullname) 1480 except: 1481 self.addCleanup(posix.unlink, fullname) 1482 raise 1483 1484 @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") 1485 def test_mkfifo_dir_fd(self): 1486 with self.prepare() as (dir_fd, name, fullname): 1487 try: 1488 posix.mkfifo(name, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) 1489 except PermissionError as e: 1490 self.skipTest('posix.mkfifo(): %s' % e) 1491 self.addCleanup(posix.unlink, fullname) 1492 self.assertTrue(stat.S_ISFIFO(posix.stat(fullname).st_mode)) 1493 1494 1495class PosixGroupsTester(unittest.TestCase): 1496 1497 def setUp(self): 1498 if posix.getuid() != 0: 1499 raise unittest.SkipTest("not enough privileges") 1500 if not hasattr(posix, 'getgroups'): 1501 raise unittest.SkipTest("need posix.getgroups") 1502 if sys.platform == 'darwin': 1503 raise unittest.SkipTest("getgroups(2) is broken on OSX") 1504 self.saved_groups = posix.getgroups() 1505 1506 def tearDown(self): 1507 if hasattr(posix, 'setgroups'): 1508 posix.setgroups(self.saved_groups) 1509 elif hasattr(posix, 'initgroups'): 1510 name = pwd.getpwuid(posix.getuid()).pw_name 1511 posix.initgroups(name, self.saved_groups[0]) 1512 1513 @unittest.skipUnless(hasattr(posix, 'initgroups'), 1514 "test needs posix.initgroups()") 1515 def test_initgroups(self): 1516 # find missing group 1517 1518 g = max(self.saved_groups or [0]) + 1 1519 name = pwd.getpwuid(posix.getuid()).pw_name 1520 posix.initgroups(name, g) 1521 self.assertIn(g, posix.getgroups()) 1522 1523 @unittest.skipUnless(hasattr(posix, 'setgroups'), 1524 "test needs posix.setgroups()") 1525 def test_setgroups(self): 1526 for groups in [[0], list(range(16))]: 1527 posix.setgroups(groups) 1528 self.assertListEqual(groups, posix.getgroups()) 1529 1530 1531class _PosixSpawnMixin: 1532 # Program which does nothing and exits with status 0 (success) 1533 NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass') 1534 spawn_func = None 1535 1536 def python_args(self, *args): 1537 # Disable site module to avoid side effects. For example, 1538 # on Fedora 28, if the HOME environment variable is not set, 1539 # site._getuserbase() calls pwd.getpwuid() which opens 1540 # /var/lib/sss/mc/passwd but then leaves the file open which makes 1541 # test_close_file() to fail. 1542 return (sys.executable, '-I', '-S', *args) 1543 1544 def test_returns_pid(self): 1545 pidfile = os_helper.TESTFN 1546 self.addCleanup(os_helper.unlink, pidfile) 1547 script = f"""if 1: 1548 import os 1549 with open({pidfile!r}, "w") as pidfile: 1550 pidfile.write(str(os.getpid())) 1551 """ 1552 args = self.python_args('-c', script) 1553 pid = self.spawn_func(args[0], args, os.environ) 1554 support.wait_process(pid, exitcode=0) 1555 with open(pidfile, encoding="utf-8") as f: 1556 self.assertEqual(f.read(), str(pid)) 1557 1558 def test_no_such_executable(self): 1559 no_such_executable = 'no_such_executable' 1560 try: 1561 pid = self.spawn_func(no_such_executable, 1562 [no_such_executable], 1563 os.environ) 1564 # bpo-35794: PermissionError can be raised if there are 1565 # directories in the $PATH that are not accessible. 1566 except (FileNotFoundError, PermissionError) as exc: 1567 self.assertEqual(exc.filename, no_such_executable) 1568 else: 1569 pid2, status = os.waitpid(pid, 0) 1570 self.assertEqual(pid2, pid) 1571 self.assertNotEqual(status, 0) 1572 1573 def test_specify_environment(self): 1574 envfile = os_helper.TESTFN 1575 self.addCleanup(os_helper.unlink, envfile) 1576 script = f"""if 1: 1577 import os 1578 with open({envfile!r}, "w", encoding="utf-8") as envfile: 1579 envfile.write(os.environ['foo']) 1580 """ 1581 args = self.python_args('-c', script) 1582 pid = self.spawn_func(args[0], args, 1583 {**os.environ, 'foo': 'bar'}) 1584 support.wait_process(pid, exitcode=0) 1585 with open(envfile, encoding="utf-8") as f: 1586 self.assertEqual(f.read(), 'bar') 1587 1588 def test_none_file_actions(self): 1589 pid = self.spawn_func( 1590 self.NOOP_PROGRAM[0], 1591 self.NOOP_PROGRAM, 1592 os.environ, 1593 file_actions=None 1594 ) 1595 support.wait_process(pid, exitcode=0) 1596 1597 def test_empty_file_actions(self): 1598 pid = self.spawn_func( 1599 self.NOOP_PROGRAM[0], 1600 self.NOOP_PROGRAM, 1601 os.environ, 1602 file_actions=[] 1603 ) 1604 support.wait_process(pid, exitcode=0) 1605 1606 def test_resetids_explicit_default(self): 1607 pid = self.spawn_func( 1608 sys.executable, 1609 [sys.executable, '-c', 'pass'], 1610 os.environ, 1611 resetids=False 1612 ) 1613 support.wait_process(pid, exitcode=0) 1614 1615 def test_resetids(self): 1616 pid = self.spawn_func( 1617 sys.executable, 1618 [sys.executable, '-c', 'pass'], 1619 os.environ, 1620 resetids=True 1621 ) 1622 support.wait_process(pid, exitcode=0) 1623 1624 def test_resetids_wrong_type(self): 1625 with self.assertRaises(TypeError): 1626 self.spawn_func(sys.executable, 1627 [sys.executable, "-c", "pass"], 1628 os.environ, resetids=None) 1629 1630 def test_setpgroup(self): 1631 pid = self.spawn_func( 1632 sys.executable, 1633 [sys.executable, '-c', 'pass'], 1634 os.environ, 1635 setpgroup=os.getpgrp() 1636 ) 1637 support.wait_process(pid, exitcode=0) 1638 1639 def test_setpgroup_wrong_type(self): 1640 with self.assertRaises(TypeError): 1641 self.spawn_func(sys.executable, 1642 [sys.executable, "-c", "pass"], 1643 os.environ, setpgroup="023") 1644 1645 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1646 'need signal.pthread_sigmask()') 1647 def test_setsigmask(self): 1648 code = textwrap.dedent("""\ 1649 import signal 1650 signal.raise_signal(signal.SIGUSR1)""") 1651 1652 pid = self.spawn_func( 1653 sys.executable, 1654 [sys.executable, '-c', code], 1655 os.environ, 1656 setsigmask=[signal.SIGUSR1] 1657 ) 1658 support.wait_process(pid, exitcode=0) 1659 1660 def test_setsigmask_wrong_type(self): 1661 with self.assertRaises(TypeError): 1662 self.spawn_func(sys.executable, 1663 [sys.executable, "-c", "pass"], 1664 os.environ, setsigmask=34) 1665 with self.assertRaises(TypeError): 1666 self.spawn_func(sys.executable, 1667 [sys.executable, "-c", "pass"], 1668 os.environ, setsigmask=["j"]) 1669 with self.assertRaises(ValueError): 1670 self.spawn_func(sys.executable, 1671 [sys.executable, "-c", "pass"], 1672 os.environ, setsigmask=[signal.NSIG, 1673 signal.NSIG+1]) 1674 1675 def test_setsid(self): 1676 rfd, wfd = os.pipe() 1677 self.addCleanup(os.close, rfd) 1678 try: 1679 os.set_inheritable(wfd, True) 1680 1681 code = textwrap.dedent(f""" 1682 import os 1683 fd = {wfd} 1684 sid = os.getsid(0) 1685 os.write(fd, str(sid).encode()) 1686 """) 1687 1688 try: 1689 pid = self.spawn_func(sys.executable, 1690 [sys.executable, "-c", code], 1691 os.environ, setsid=True) 1692 except NotImplementedError as exc: 1693 self.skipTest(f"setsid is not supported: {exc!r}") 1694 except PermissionError as exc: 1695 self.skipTest(f"setsid failed with: {exc!r}") 1696 finally: 1697 os.close(wfd) 1698 1699 support.wait_process(pid, exitcode=0) 1700 1701 output = os.read(rfd, 100) 1702 child_sid = int(output) 1703 parent_sid = os.getsid(os.getpid()) 1704 self.assertNotEqual(parent_sid, child_sid) 1705 1706 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1707 'need signal.pthread_sigmask()') 1708 def test_setsigdef(self): 1709 original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN) 1710 code = textwrap.dedent("""\ 1711 import signal 1712 signal.raise_signal(signal.SIGUSR1)""") 1713 try: 1714 pid = self.spawn_func( 1715 sys.executable, 1716 [sys.executable, '-c', code], 1717 os.environ, 1718 setsigdef=[signal.SIGUSR1] 1719 ) 1720 finally: 1721 signal.signal(signal.SIGUSR1, original_handler) 1722 1723 support.wait_process(pid, exitcode=-signal.SIGUSR1) 1724 1725 def test_setsigdef_wrong_type(self): 1726 with self.assertRaises(TypeError): 1727 self.spawn_func(sys.executable, 1728 [sys.executable, "-c", "pass"], 1729 os.environ, setsigdef=34) 1730 with self.assertRaises(TypeError): 1731 self.spawn_func(sys.executable, 1732 [sys.executable, "-c", "pass"], 1733 os.environ, setsigdef=["j"]) 1734 with self.assertRaises(ValueError): 1735 self.spawn_func(sys.executable, 1736 [sys.executable, "-c", "pass"], 1737 os.environ, setsigdef=[signal.NSIG, signal.NSIG+1]) 1738 1739 @requires_sched 1740 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1741 "bpo-34685: test can fail on BSD") 1742 def test_setscheduler_only_param(self): 1743 policy = os.sched_getscheduler(0) 1744 priority = os.sched_get_priority_min(policy) 1745 code = textwrap.dedent(f"""\ 1746 import os, sys 1747 if os.sched_getscheduler(0) != {policy}: 1748 sys.exit(101) 1749 if os.sched_getparam(0).sched_priority != {priority}: 1750 sys.exit(102)""") 1751 pid = self.spawn_func( 1752 sys.executable, 1753 [sys.executable, '-c', code], 1754 os.environ, 1755 scheduler=(None, os.sched_param(priority)) 1756 ) 1757 support.wait_process(pid, exitcode=0) 1758 1759 @requires_sched 1760 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1761 "bpo-34685: test can fail on BSD") 1762 def test_setscheduler_with_policy(self): 1763 policy = os.sched_getscheduler(0) 1764 priority = os.sched_get_priority_min(policy) 1765 code = textwrap.dedent(f"""\ 1766 import os, sys 1767 if os.sched_getscheduler(0) != {policy}: 1768 sys.exit(101) 1769 if os.sched_getparam(0).sched_priority != {priority}: 1770 sys.exit(102)""") 1771 pid = self.spawn_func( 1772 sys.executable, 1773 [sys.executable, '-c', code], 1774 os.environ, 1775 scheduler=(policy, os.sched_param(priority)) 1776 ) 1777 support.wait_process(pid, exitcode=0) 1778 1779 def test_multiple_file_actions(self): 1780 file_actions = [ 1781 (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0), 1782 (os.POSIX_SPAWN_CLOSE, 0), 1783 (os.POSIX_SPAWN_DUP2, 1, 4), 1784 ] 1785 pid = self.spawn_func(self.NOOP_PROGRAM[0], 1786 self.NOOP_PROGRAM, 1787 os.environ, 1788 file_actions=file_actions) 1789 support.wait_process(pid, exitcode=0) 1790 1791 def test_bad_file_actions(self): 1792 args = self.NOOP_PROGRAM 1793 with self.assertRaises(TypeError): 1794 self.spawn_func(args[0], args, os.environ, 1795 file_actions=[None]) 1796 with self.assertRaises(TypeError): 1797 self.spawn_func(args[0], args, os.environ, 1798 file_actions=[()]) 1799 with self.assertRaises(TypeError): 1800 self.spawn_func(args[0], args, os.environ, 1801 file_actions=[(None,)]) 1802 with self.assertRaises(TypeError): 1803 self.spawn_func(args[0], args, os.environ, 1804 file_actions=[(12345,)]) 1805 with self.assertRaises(TypeError): 1806 self.spawn_func(args[0], args, os.environ, 1807 file_actions=[(os.POSIX_SPAWN_CLOSE,)]) 1808 with self.assertRaises(TypeError): 1809 self.spawn_func(args[0], args, os.environ, 1810 file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)]) 1811 with self.assertRaises(TypeError): 1812 self.spawn_func(args[0], args, os.environ, 1813 file_actions=[(os.POSIX_SPAWN_CLOSE, None)]) 1814 with self.assertRaises(ValueError): 1815 self.spawn_func(args[0], args, os.environ, 1816 file_actions=[(os.POSIX_SPAWN_OPEN, 1817 3, __file__ + '\0', 1818 os.O_RDONLY, 0)]) 1819 1820 def test_open_file(self): 1821 outfile = os_helper.TESTFN 1822 self.addCleanup(os_helper.unlink, outfile) 1823 script = """if 1: 1824 import sys 1825 sys.stdout.write("hello") 1826 """ 1827 file_actions = [ 1828 (os.POSIX_SPAWN_OPEN, 1, outfile, 1829 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 1830 stat.S_IRUSR | stat.S_IWUSR), 1831 ] 1832 args = self.python_args('-c', script) 1833 pid = self.spawn_func(args[0], args, os.environ, 1834 file_actions=file_actions) 1835 1836 support.wait_process(pid, exitcode=0) 1837 with open(outfile, encoding="utf-8") as f: 1838 self.assertEqual(f.read(), 'hello') 1839 1840 def test_close_file(self): 1841 closefile = os_helper.TESTFN 1842 self.addCleanup(os_helper.unlink, closefile) 1843 script = f"""if 1: 1844 import os 1845 try: 1846 os.fstat(0) 1847 except OSError as e: 1848 with open({closefile!r}, 'w', encoding='utf-8') as closefile: 1849 closefile.write('is closed %d' % e.errno) 1850 """ 1851 args = self.python_args('-c', script) 1852 pid = self.spawn_func(args[0], args, os.environ, 1853 file_actions=[(os.POSIX_SPAWN_CLOSE, 0)]) 1854 1855 support.wait_process(pid, exitcode=0) 1856 with open(closefile, encoding="utf-8") as f: 1857 self.assertEqual(f.read(), 'is closed %d' % errno.EBADF) 1858 1859 def test_dup2(self): 1860 dupfile = os_helper.TESTFN 1861 self.addCleanup(os_helper.unlink, dupfile) 1862 script = """if 1: 1863 import sys 1864 sys.stdout.write("hello") 1865 """ 1866 with open(dupfile, "wb") as childfile: 1867 file_actions = [ 1868 (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1), 1869 ] 1870 args = self.python_args('-c', script) 1871 pid = self.spawn_func(args[0], args, os.environ, 1872 file_actions=file_actions) 1873 support.wait_process(pid, exitcode=0) 1874 with open(dupfile, encoding="utf-8") as f: 1875 self.assertEqual(f.read(), 'hello') 1876 1877 1878@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn") 1879class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin): 1880 spawn_func = getattr(posix, 'posix_spawn', None) 1881 1882 1883@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp") 1884class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin): 1885 spawn_func = getattr(posix, 'posix_spawnp', None) 1886 1887 @os_helper.skip_unless_symlink 1888 def test_posix_spawnp(self): 1889 # Use a symlink to create a program in its own temporary directory 1890 temp_dir = tempfile.mkdtemp() 1891 self.addCleanup(os_helper.rmtree, temp_dir) 1892 1893 program = 'posix_spawnp_test_program.exe' 1894 program_fullpath = os.path.join(temp_dir, program) 1895 os.symlink(sys.executable, program_fullpath) 1896 1897 try: 1898 path = os.pathsep.join((temp_dir, os.environ['PATH'])) 1899 except KeyError: 1900 path = temp_dir # PATH is not set 1901 1902 spawn_args = (program, '-I', '-S', '-c', 'pass') 1903 code = textwrap.dedent(""" 1904 import os 1905 from test import support 1906 1907 args = %a 1908 pid = os.posix_spawnp(args[0], args, os.environ) 1909 1910 support.wait_process(pid, exitcode=0) 1911 """ % (spawn_args,)) 1912 1913 # Use a subprocess to test os.posix_spawnp() with a modified PATH 1914 # environment variable: posix_spawnp() uses the current environment 1915 # to locate the program, not its environment argument. 1916 args = ('-c', code) 1917 assert_python_ok(*args, PATH=path) 1918 1919 1920@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS") 1921class TestPosixWeaklinking(unittest.TestCase): 1922 # These test cases verify that weak linking support on macOS works 1923 # as expected. These cases only test new behaviour introduced by weak linking, 1924 # regular behaviour is tested by the normal test cases. 1925 # 1926 # See the section on Weak Linking in Mac/README.txt for more information. 1927 def setUp(self): 1928 import sysconfig 1929 import platform 1930 1931 config_vars = sysconfig.get_config_vars() 1932 self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] } 1933 self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split(".")) 1934 1935 def _verify_available(self, name): 1936 if name not in self.available: 1937 raise unittest.SkipTest(f"{name} not weak-linked") 1938 1939 def test_pwritev(self): 1940 self._verify_available("HAVE_PWRITEV") 1941 if self.mac_ver >= (10, 16): 1942 self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available") 1943 self.assertTrue(hasattr(os, "preadv"), "os.readv is not available") 1944 1945 else: 1946 self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available") 1947 self.assertFalse(hasattr(os, "preadv"), "os.readv is available") 1948 1949 def test_stat(self): 1950 self._verify_available("HAVE_FSTATAT") 1951 if self.mac_ver >= (10, 10): 1952 self.assertIn("HAVE_FSTATAT", posix._have_functions) 1953 1954 else: 1955 self.assertNotIn("HAVE_FSTATAT", posix._have_functions) 1956 1957 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1958 os.stat("file", dir_fd=0) 1959 1960 def test_access(self): 1961 self._verify_available("HAVE_FACCESSAT") 1962 if self.mac_ver >= (10, 10): 1963 self.assertIn("HAVE_FACCESSAT", posix._have_functions) 1964 1965 else: 1966 self.assertNotIn("HAVE_FACCESSAT", posix._have_functions) 1967 1968 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1969 os.access("file", os.R_OK, dir_fd=0) 1970 1971 with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"): 1972 os.access("file", os.R_OK, follow_symlinks=False) 1973 1974 with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"): 1975 os.access("file", os.R_OK, effective_ids=True) 1976 1977 def test_chmod(self): 1978 self._verify_available("HAVE_FCHMODAT") 1979 if self.mac_ver >= (10, 10): 1980 self.assertIn("HAVE_FCHMODAT", posix._have_functions) 1981 1982 else: 1983 self.assertNotIn("HAVE_FCHMODAT", posix._have_functions) 1984 self.assertIn("HAVE_LCHMOD", posix._have_functions) 1985 1986 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1987 os.chmod("file", 0o644, dir_fd=0) 1988 1989 def test_chown(self): 1990 self._verify_available("HAVE_FCHOWNAT") 1991 if self.mac_ver >= (10, 10): 1992 self.assertIn("HAVE_FCHOWNAT", posix._have_functions) 1993 1994 else: 1995 self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions) 1996 self.assertIn("HAVE_LCHOWN", posix._have_functions) 1997 1998 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1999 os.chown("file", 0, 0, dir_fd=0) 2000 2001 def test_link(self): 2002 self._verify_available("HAVE_LINKAT") 2003 if self.mac_ver >= (10, 10): 2004 self.assertIn("HAVE_LINKAT", posix._have_functions) 2005 2006 else: 2007 self.assertNotIn("HAVE_LINKAT", posix._have_functions) 2008 2009 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 2010 os.link("source", "target", src_dir_fd=0) 2011 2012 with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"): 2013 os.link("source", "target", dst_dir_fd=0) 2014 2015 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 2016 os.link("source", "target", src_dir_fd=0, dst_dir_fd=0) 2017 2018 # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag 2019 with os_helper.temp_dir() as base_path: 2020 link_path = os.path.join(base_path, "link") 2021 target_path = os.path.join(base_path, "target") 2022 source_path = os.path.join(base_path, "source") 2023 2024 with open(source_path, "w") as fp: 2025 fp.write("data") 2026 2027 os.symlink("target", link_path) 2028 2029 # Calling os.link should fail in the link(2) call, and 2030 # should not reject *follow_symlinks* (to match the 2031 # behaviour you'd get when building on a platform without 2032 # linkat) 2033 with self.assertRaises(FileExistsError): 2034 os.link(source_path, link_path, follow_symlinks=True) 2035 2036 with self.assertRaises(FileExistsError): 2037 os.link(source_path, link_path, follow_symlinks=False) 2038 2039 2040 def test_listdir_scandir(self): 2041 self._verify_available("HAVE_FDOPENDIR") 2042 if self.mac_ver >= (10, 10): 2043 self.assertIn("HAVE_FDOPENDIR", posix._have_functions) 2044 2045 else: 2046 self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions) 2047 2048 with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"): 2049 os.listdir(0) 2050 2051 with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"): 2052 os.scandir(0) 2053 2054 def test_mkdir(self): 2055 self._verify_available("HAVE_MKDIRAT") 2056 if self.mac_ver >= (10, 10): 2057 self.assertIn("HAVE_MKDIRAT", posix._have_functions) 2058 2059 else: 2060 self.assertNotIn("HAVE_MKDIRAT", posix._have_functions) 2061 2062 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2063 os.mkdir("dir", dir_fd=0) 2064 2065 def test_rename_replace(self): 2066 self._verify_available("HAVE_RENAMEAT") 2067 if self.mac_ver >= (10, 10): 2068 self.assertIn("HAVE_RENAMEAT", posix._have_functions) 2069 2070 else: 2071 self.assertNotIn("HAVE_RENAMEAT", posix._have_functions) 2072 2073 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2074 os.rename("a", "b", src_dir_fd=0) 2075 2076 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2077 os.rename("a", "b", dst_dir_fd=0) 2078 2079 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2080 os.replace("a", "b", src_dir_fd=0) 2081 2082 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2083 os.replace("a", "b", dst_dir_fd=0) 2084 2085 def test_unlink_rmdir(self): 2086 self._verify_available("HAVE_UNLINKAT") 2087 if self.mac_ver >= (10, 10): 2088 self.assertIn("HAVE_UNLINKAT", posix._have_functions) 2089 2090 else: 2091 self.assertNotIn("HAVE_UNLINKAT", posix._have_functions) 2092 2093 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2094 os.unlink("path", dir_fd=0) 2095 2096 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2097 os.rmdir("path", dir_fd=0) 2098 2099 def test_open(self): 2100 self._verify_available("HAVE_OPENAT") 2101 if self.mac_ver >= (10, 10): 2102 self.assertIn("HAVE_OPENAT", posix._have_functions) 2103 2104 else: 2105 self.assertNotIn("HAVE_OPENAT", posix._have_functions) 2106 2107 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2108 os.open("path", os.O_RDONLY, dir_fd=0) 2109 2110 def test_readlink(self): 2111 self._verify_available("HAVE_READLINKAT") 2112 if self.mac_ver >= (10, 10): 2113 self.assertIn("HAVE_READLINKAT", posix._have_functions) 2114 2115 else: 2116 self.assertNotIn("HAVE_READLINKAT", posix._have_functions) 2117 2118 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2119 os.readlink("path", dir_fd=0) 2120 2121 def test_symlink(self): 2122 self._verify_available("HAVE_SYMLINKAT") 2123 if self.mac_ver >= (10, 10): 2124 self.assertIn("HAVE_SYMLINKAT", posix._have_functions) 2125 2126 else: 2127 self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions) 2128 2129 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2130 os.symlink("a", "b", dir_fd=0) 2131 2132 def test_utime(self): 2133 self._verify_available("HAVE_FUTIMENS") 2134 self._verify_available("HAVE_UTIMENSAT") 2135 if self.mac_ver >= (10, 13): 2136 self.assertIn("HAVE_FUTIMENS", posix._have_functions) 2137 self.assertIn("HAVE_UTIMENSAT", posix._have_functions) 2138 2139 else: 2140 self.assertNotIn("HAVE_FUTIMENS", posix._have_functions) 2141 self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions) 2142 2143 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2144 os.utime("path", dir_fd=0) 2145 2146 2147def tearDownModule(): 2148 support.reap_children() 2149 2150 2151if __name__ == '__main__': 2152 unittest.main() 2153