1"Test posix functions" 2 3from test import support 4android_not_root = support.android_not_root 5 6# Skip these tests if there is no posix module. 7posix = support.import_module('posix') 8 9import errno 10import sys 11import time 12import os 13import platform 14import pwd 15import stat 16import tempfile 17import unittest 18import warnings 19 20_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 21 support.TESTFN + '-dummy-symlink') 22 23class PosixTester(unittest.TestCase): 24 25 def setUp(self): 26 # create empty file 27 fp = open(support.TESTFN, 'w+') 28 fp.close() 29 self.teardown_files = [ support.TESTFN ] 30 self._warnings_manager = support.check_warnings() 31 self._warnings_manager.__enter__() 32 warnings.filterwarnings('ignore', '.* potential security risk .*', 33 RuntimeWarning) 34 35 def tearDown(self): 36 for teardown_file in self.teardown_files: 37 support.unlink(teardown_file) 38 self._warnings_manager.__exit__(None, None, None) 39 40 def testNoArgFunctions(self): 41 # test posix functions which take no arguments and have 42 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 43 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", 44 "times", "getloadavg", 45 "getegid", "geteuid", "getgid", "getgroups", 46 "getpid", "getpgrp", "getppid", "getuid", "sync", 47 ] 48 49 for name in NO_ARG_FUNCTIONS: 50 posix_func = getattr(posix, name, None) 51 if posix_func is not None: 52 posix_func() 53 self.assertRaises(TypeError, posix_func, 1) 54 55 @unittest.skipUnless(hasattr(posix, 'getresuid'), 56 'test needs posix.getresuid()') 57 def test_getresuid(self): 58 user_ids = posix.getresuid() 59 self.assertEqual(len(user_ids), 3) 60 for val in user_ids: 61 self.assertGreaterEqual(val, 0) 62 63 @unittest.skipUnless(hasattr(posix, 'getresgid'), 64 'test needs posix.getresgid()') 65 def test_getresgid(self): 66 group_ids = posix.getresgid() 67 self.assertEqual(len(group_ids), 3) 68 for val in group_ids: 69 self.assertGreaterEqual(val, 0) 70 71 @unittest.skipUnless(hasattr(posix, 'setresuid'), 72 'test needs posix.setresuid()') 73 def test_setresuid(self): 74 current_user_ids = posix.getresuid() 75 self.assertIsNone(posix.setresuid(*current_user_ids)) 76 # -1 means don't change that value. 77 self.assertIsNone(posix.setresuid(-1, -1, -1)) 78 79 @unittest.skipUnless(hasattr(posix, 'setresuid'), 80 'test needs posix.setresuid()') 81 def test_setresuid_exception(self): 82 # Don't do this test if someone is silly enough to run us as root. 83 current_user_ids = posix.getresuid() 84 if 0 not in current_user_ids: 85 new_user_ids = (current_user_ids[0]+1, -1, -1) 86 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 87 88 @unittest.skipUnless(hasattr(posix, 'setresgid'), 89 'test needs posix.setresgid()') 90 def test_setresgid(self): 91 current_group_ids = posix.getresgid() 92 self.assertIsNone(posix.setresgid(*current_group_ids)) 93 # -1 means don't change that value. 94 self.assertIsNone(posix.setresgid(-1, -1, -1)) 95 96 @unittest.skipUnless(hasattr(posix, 'setresgid'), 97 'test needs posix.setresgid()') 98 def test_setresgid_exception(self): 99 # Don't do this test if someone is silly enough to run us as root. 100 current_group_ids = posix.getresgid() 101 if 0 not in current_group_ids: 102 new_group_ids = (current_group_ids[0]+1, -1, -1) 103 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 104 105 @unittest.skipUnless(hasattr(posix, 'initgroups'), 106 "test needs os.initgroups()") 107 def test_initgroups(self): 108 # It takes a string and an integer; check that it raises a TypeError 109 # for other argument lists. 110 self.assertRaises(TypeError, posix.initgroups) 111 self.assertRaises(TypeError, posix.initgroups, None) 112 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 113 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 114 115 # If a non-privileged user invokes it, it should fail with OSError 116 # EPERM. 117 if os.getuid() != 0: 118 try: 119 name = pwd.getpwuid(posix.getuid()).pw_name 120 except KeyError: 121 # the current UID may not have a pwd entry 122 raise unittest.SkipTest("need a pwd entry") 123 try: 124 posix.initgroups(name, 13) 125 except OSError as e: 126 self.assertEqual(e.errno, errno.EPERM) 127 else: 128 self.fail("Expected OSError to be raised by initgroups") 129 130 @unittest.skipUnless(hasattr(posix, 'statvfs'), 131 'test needs posix.statvfs()') 132 def test_statvfs(self): 133 self.assertTrue(posix.statvfs(os.curdir)) 134 135 @unittest.skipUnless(hasattr(posix, 'fstatvfs'), 136 'test needs posix.fstatvfs()') 137 def test_fstatvfs(self): 138 fp = open(support.TESTFN) 139 try: 140 self.assertTrue(posix.fstatvfs(fp.fileno())) 141 self.assertTrue(posix.statvfs(fp.fileno())) 142 finally: 143 fp.close() 144 145 @unittest.skipUnless(hasattr(posix, 'ftruncate'), 146 'test needs posix.ftruncate()') 147 def test_ftruncate(self): 148 fp = open(support.TESTFN, 'w+') 149 try: 150 # we need to have some data to truncate 151 fp.write('test') 152 fp.flush() 153 posix.ftruncate(fp.fileno(), 0) 154 finally: 155 fp.close() 156 157 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") 158 def test_truncate(self): 159 with open(support.TESTFN, 'w') as fp: 160 fp.write('test') 161 fp.flush() 162 posix.truncate(support.TESTFN, 0) 163 164 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") 165 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 166 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 167 def test_fexecve(self): 168 fp = os.open(sys.executable, os.O_RDONLY) 169 try: 170 pid = os.fork() 171 if pid == 0: 172 os.chdir(os.path.split(sys.executable)[0]) 173 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) 174 else: 175 self.assertEqual(os.waitpid(pid, 0), (pid, 0)) 176 finally: 177 os.close(fp) 178 179 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") 180 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 181 def test_waitid(self): 182 pid = os.fork() 183 if pid == 0: 184 os.chdir(os.path.split(sys.executable)[0]) 185 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) 186 else: 187 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) 188 self.assertEqual(pid, res.si_pid) 189 190 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") 191 def test_lockf(self): 192 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) 193 try: 194 os.write(fd, b'test') 195 os.lseek(fd, 0, os.SEEK_SET) 196 posix.lockf(fd, posix.F_LOCK, 4) 197 # section is locked 198 posix.lockf(fd, posix.F_ULOCK, 4) 199 finally: 200 os.close(fd) 201 202 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") 203 def test_pread(self): 204 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 205 try: 206 os.write(fd, b'test') 207 os.lseek(fd, 0, os.SEEK_SET) 208 self.assertEqual(b'es', posix.pread(fd, 2, 1)) 209 # the first pread() shouldn't disturb the file offset 210 self.assertEqual(b'te', posix.read(fd, 2)) 211 finally: 212 os.close(fd) 213 214 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") 215 def test_pwrite(self): 216 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 217 try: 218 os.write(fd, b'test') 219 os.lseek(fd, 0, os.SEEK_SET) 220 posix.pwrite(fd, b'xx', 1) 221 self.assertEqual(b'txxt', posix.read(fd, 4)) 222 finally: 223 os.close(fd) 224 225 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 226 "test needs posix.posix_fallocate()") 227 def test_posix_fallocate(self): 228 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) 229 try: 230 posix.posix_fallocate(fd, 0, 10) 231 except OSError as inst: 232 # issue10812, ZFS doesn't appear to support posix_fallocate, 233 # so skip Solaris-based since they are likely to have ZFS. 234 if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"): 235 raise 236 finally: 237 os.close(fd) 238 239 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 240 "test needs posix.posix_fadvise()") 241 def test_posix_fadvise(self): 242 fd = os.open(support.TESTFN, os.O_RDONLY) 243 try: 244 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) 245 finally: 246 os.close(fd) 247 248 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") 249 def test_utime_with_fd(self): 250 now = time.time() 251 fd = os.open(support.TESTFN, os.O_RDONLY) 252 try: 253 posix.utime(fd) 254 posix.utime(fd, None) 255 self.assertRaises(TypeError, posix.utime, fd, (None, None)) 256 self.assertRaises(TypeError, posix.utime, fd, (now, None)) 257 self.assertRaises(TypeError, posix.utime, fd, (None, now)) 258 posix.utime(fd, (int(now), int(now))) 259 posix.utime(fd, (now, now)) 260 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) 261 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) 262 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) 263 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) 264 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) 265 266 finally: 267 os.close(fd) 268 269 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") 270 def test_utime_nofollow_symlinks(self): 271 now = time.time() 272 posix.utime(support.TESTFN, None, follow_symlinks=False) 273 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False) 274 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False) 275 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False) 276 posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False) 277 posix.utime(support.TESTFN, (now, now), follow_symlinks=False) 278 posix.utime(support.TESTFN, follow_symlinks=False) 279 280 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 281 def test_writev(self): 282 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 283 try: 284 n = os.writev(fd, (b'test1', b'tt2', b't3')) 285 self.assertEqual(n, 10) 286 287 os.lseek(fd, 0, os.SEEK_SET) 288 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) 289 290 # Issue #20113: empty list of buffers should not crash 291 try: 292 size = posix.writev(fd, []) 293 except OSError: 294 # writev(fd, []) raises OSError(22, "Invalid argument") 295 # on OpenIndiana 296 pass 297 else: 298 self.assertEqual(size, 0) 299 finally: 300 os.close(fd) 301 302 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 303 def test_readv(self): 304 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 305 try: 306 os.write(fd, b'test1tt2t3') 307 os.lseek(fd, 0, os.SEEK_SET) 308 buf = [bytearray(i) for i in [5, 3, 2]] 309 self.assertEqual(posix.readv(fd, buf), 10) 310 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) 311 312 # Issue #20113: empty list of buffers should not crash 313 try: 314 size = posix.readv(fd, []) 315 except OSError: 316 # readv(fd, []) raises OSError(22, "Invalid argument") 317 # on OpenIndiana 318 pass 319 else: 320 self.assertEqual(size, 0) 321 finally: 322 os.close(fd) 323 324 @unittest.skipUnless(hasattr(posix, 'dup'), 325 'test needs posix.dup()') 326 def test_dup(self): 327 fp = open(support.TESTFN) 328 try: 329 fd = posix.dup(fp.fileno()) 330 self.assertIsInstance(fd, int) 331 os.close(fd) 332 finally: 333 fp.close() 334 335 @unittest.skipUnless(hasattr(posix, 'confstr'), 336 'test needs posix.confstr()') 337 def test_confstr(self): 338 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 339 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 340 341 @unittest.skipUnless(hasattr(posix, 'dup2'), 342 'test needs posix.dup2()') 343 def test_dup2(self): 344 fp1 = open(support.TESTFN) 345 fp2 = open(support.TESTFN) 346 try: 347 posix.dup2(fp1.fileno(), fp2.fileno()) 348 finally: 349 fp1.close() 350 fp2.close() 351 352 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") 353 @support.requires_linux_version(2, 6, 23) 354 def test_oscloexec(self): 355 fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC) 356 self.addCleanup(os.close, fd) 357 self.assertFalse(os.get_inheritable(fd)) 358 359 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), 360 'test needs posix.O_EXLOCK') 361 def test_osexlock(self): 362 fd = os.open(support.TESTFN, 363 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 364 self.assertRaises(OSError, os.open, support.TESTFN, 365 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 366 os.close(fd) 367 368 if hasattr(posix, "O_SHLOCK"): 369 fd = os.open(support.TESTFN, 370 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 371 self.assertRaises(OSError, os.open, support.TESTFN, 372 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 373 os.close(fd) 374 375 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), 376 'test needs posix.O_SHLOCK') 377 def test_osshlock(self): 378 fd1 = os.open(support.TESTFN, 379 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 380 fd2 = os.open(support.TESTFN, 381 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 382 os.close(fd2) 383 os.close(fd1) 384 385 if hasattr(posix, "O_EXLOCK"): 386 fd = os.open(support.TESTFN, 387 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 388 self.assertRaises(OSError, os.open, support.TESTFN, 389 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 390 os.close(fd) 391 392 @unittest.skipUnless(hasattr(posix, 'fstat'), 393 'test needs posix.fstat()') 394 def test_fstat(self): 395 fp = open(support.TESTFN) 396 try: 397 self.assertTrue(posix.fstat(fp.fileno())) 398 self.assertTrue(posix.stat(fp.fileno())) 399 400 self.assertRaisesRegex(TypeError, 401 'should be string, bytes, os.PathLike or integer, not', 402 posix.stat, float(fp.fileno())) 403 finally: 404 fp.close() 405 406 @unittest.skipUnless(hasattr(posix, 'stat'), 407 'test needs posix.stat()') 408 def test_stat(self): 409 self.assertTrue(posix.stat(support.TESTFN)) 410 self.assertTrue(posix.stat(os.fsencode(support.TESTFN))) 411 412 self.assertWarnsRegex(DeprecationWarning, 413 'should be string, bytes, os.PathLike or integer, not', 414 posix.stat, bytearray(os.fsencode(support.TESTFN))) 415 self.assertRaisesRegex(TypeError, 416 'should be string, bytes, os.PathLike or integer, not', 417 posix.stat, None) 418 self.assertRaisesRegex(TypeError, 419 'should be string, bytes, os.PathLike or integer, not', 420 posix.stat, list(support.TESTFN)) 421 self.assertRaisesRegex(TypeError, 422 'should be string, bytes, os.PathLike or integer, not', 423 posix.stat, list(os.fsencode(support.TESTFN))) 424 425 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") 426 @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user") 427 def test_mkfifo(self): 428 support.unlink(support.TESTFN) 429 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) 430 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 431 432 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'), 433 "don't have mknod()/S_IFIFO") 434 @unittest.skipIf(android_not_root, "mknod not allowed, non root user") 435 def test_mknod(self): 436 # Test using mknod() to create a FIFO (the only use specified 437 # by POSIX). 438 support.unlink(support.TESTFN) 439 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 440 try: 441 posix.mknod(support.TESTFN, mode, 0) 442 except OSError as e: 443 # Some old systems don't allow unprivileged users to use 444 # mknod(), or only support creating device nodes. 445 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) 446 else: 447 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 448 449 # Keyword arguments are also supported 450 support.unlink(support.TESTFN) 451 try: 452 posix.mknod(path=support.TESTFN, mode=mode, device=0, 453 dir_fd=None) 454 except OSError as e: 455 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) 456 457 @unittest.skipUnless(hasattr(posix, 'stat'), 'test needs posix.stat()') 458 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()') 459 def test_makedev(self): 460 st = posix.stat(support.TESTFN) 461 dev = st.st_dev 462 self.assertIsInstance(dev, int) 463 self.assertGreaterEqual(dev, 0) 464 465 major = posix.major(dev) 466 self.assertIsInstance(major, int) 467 self.assertGreaterEqual(major, 0) 468 self.assertEqual(posix.major(dev), major) 469 self.assertRaises(TypeError, posix.major, float(dev)) 470 self.assertRaises(TypeError, posix.major) 471 self.assertRaises((ValueError, OverflowError), posix.major, -1) 472 473 minor = posix.minor(dev) 474 self.assertIsInstance(minor, int) 475 self.assertGreaterEqual(minor, 0) 476 self.assertEqual(posix.minor(dev), minor) 477 self.assertRaises(TypeError, posix.minor, float(dev)) 478 self.assertRaises(TypeError, posix.minor) 479 self.assertRaises((ValueError, OverflowError), posix.minor, -1) 480 481 self.assertEqual(posix.makedev(major, minor), dev) 482 self.assertRaises(TypeError, posix.makedev, float(major), minor) 483 self.assertRaises(TypeError, posix.makedev, major, float(minor)) 484 self.assertRaises(TypeError, posix.makedev, major) 485 self.assertRaises(TypeError, posix.makedev) 486 487 def _test_all_chown_common(self, chown_func, first_param, stat_func): 488 """Common code for chown, fchown and lchown tests.""" 489 def check_stat(uid, gid): 490 if stat_func is not None: 491 stat = stat_func(first_param) 492 self.assertEqual(stat.st_uid, uid) 493 self.assertEqual(stat.st_gid, gid) 494 uid = os.getuid() 495 gid = os.getgid() 496 # test a successful chown call 497 chown_func(first_param, uid, gid) 498 check_stat(uid, gid) 499 chown_func(first_param, -1, gid) 500 check_stat(uid, gid) 501 chown_func(first_param, uid, -1) 502 check_stat(uid, gid) 503 504 if uid == 0: 505 # Try an amusingly large uid/gid to make sure we handle 506 # large unsigned values. (chown lets you use any 507 # uid/gid you like, even if they aren't defined.) 508 # 509 # This problem keeps coming up: 510 # http://bugs.python.org/issue1747858 511 # http://bugs.python.org/issue4591 512 # http://bugs.python.org/issue15301 513 # Hopefully the fix in 4591 fixes it for good! 514 # 515 # This part of the test only runs when run as root. 516 # Only scary people run their tests as root. 517 518 big_value = 2**31 519 chown_func(first_param, big_value, big_value) 520 check_stat(big_value, big_value) 521 chown_func(first_param, -1, -1) 522 check_stat(big_value, big_value) 523 chown_func(first_param, uid, gid) 524 check_stat(uid, gid) 525 elif platform.system() in ('HP-UX', 'SunOS'): 526 # HP-UX and Solaris can allow a non-root user to chown() to root 527 # (issue #5113) 528 raise unittest.SkipTest("Skipping because of non-standard chown() " 529 "behavior") 530 else: 531 # non-root cannot chown to root, raises OSError 532 self.assertRaises(OSError, chown_func, first_param, 0, 0) 533 check_stat(uid, gid) 534 self.assertRaises(OSError, chown_func, first_param, 0, -1) 535 check_stat(uid, gid) 536 if 0 not in os.getgroups(): 537 self.assertRaises(OSError, chown_func, first_param, -1, 0) 538 check_stat(uid, gid) 539 # test illegal types 540 for t in str, float: 541 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 542 check_stat(uid, gid) 543 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 544 check_stat(uid, gid) 545 546 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") 547 def test_chown(self): 548 # raise an OSError if the file does not exist 549 os.unlink(support.TESTFN) 550 self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1) 551 552 # re-create the file 553 support.create_empty_file(support.TESTFN) 554 self._test_all_chown_common(posix.chown, support.TESTFN, 555 getattr(posix, 'stat', None)) 556 557 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 558 def test_fchown(self): 559 os.unlink(support.TESTFN) 560 561 # re-create the file 562 test_file = open(support.TESTFN, 'w') 563 try: 564 fd = test_file.fileno() 565 self._test_all_chown_common(posix.fchown, fd, 566 getattr(posix, 'fstat', None)) 567 finally: 568 test_file.close() 569 570 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 571 def test_lchown(self): 572 os.unlink(support.TESTFN) 573 # create a symlink 574 os.symlink(_DUMMY_SYMLINK, support.TESTFN) 575 self._test_all_chown_common(posix.lchown, support.TESTFN, 576 getattr(posix, 'lstat', None)) 577 578 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') 579 def test_chdir(self): 580 posix.chdir(os.curdir) 581 self.assertRaises(OSError, posix.chdir, support.TESTFN) 582 583 def test_listdir(self): 584 self.assertTrue(support.TESTFN in posix.listdir(os.curdir)) 585 586 def test_listdir_default(self): 587 # When listdir is called without argument, 588 # it's the same as listdir(os.curdir). 589 self.assertTrue(support.TESTFN in posix.listdir()) 590 591 def test_listdir_bytes(self): 592 # When listdir is called with a bytes object, 593 # the returned strings are of type bytes. 594 self.assertTrue(os.fsencode(support.TESTFN) in posix.listdir(b'.')) 595 596 @unittest.skipUnless(posix.listdir in os.supports_fd, 597 "test needs fd support for posix.listdir()") 598 def test_listdir_fd(self): 599 f = posix.open(posix.getcwd(), posix.O_RDONLY) 600 self.addCleanup(posix.close, f) 601 self.assertEqual( 602 sorted(posix.listdir('.')), 603 sorted(posix.listdir(f)) 604 ) 605 # Check that the fd offset was reset (issue #13739) 606 self.assertEqual( 607 sorted(posix.listdir('.')), 608 sorted(posix.listdir(f)) 609 ) 610 611 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') 612 def test_access(self): 613 self.assertTrue(posix.access(support.TESTFN, os.R_OK)) 614 615 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') 616 def test_umask(self): 617 old_mask = posix.umask(0) 618 self.assertIsInstance(old_mask, int) 619 posix.umask(old_mask) 620 621 @unittest.skipUnless(hasattr(posix, 'strerror'), 622 'test needs posix.strerror()') 623 def test_strerror(self): 624 self.assertTrue(posix.strerror(0)) 625 626 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') 627 def test_pipe(self): 628 reader, writer = posix.pipe() 629 os.close(reader) 630 os.close(writer) 631 632 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 633 @support.requires_linux_version(2, 6, 27) 634 def test_pipe2(self): 635 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') 636 self.assertRaises(TypeError, os.pipe2, 0, 0) 637 638 # try calling with flags = 0, like os.pipe() 639 r, w = os.pipe2(0) 640 os.close(r) 641 os.close(w) 642 643 # test flags 644 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) 645 self.addCleanup(os.close, r) 646 self.addCleanup(os.close, w) 647 self.assertFalse(os.get_inheritable(r)) 648 self.assertFalse(os.get_inheritable(w)) 649 self.assertFalse(os.get_blocking(r)) 650 self.assertFalse(os.get_blocking(w)) 651 # try reading from an empty pipe: this should fail, not block 652 self.assertRaises(OSError, os.read, r, 1) 653 # try a write big enough to fill-up the pipe: this should either 654 # fail or perform a partial write, not block 655 try: 656 os.write(w, b'x' * support.PIPE_MAX_SIZE) 657 except OSError: 658 pass 659 660 @support.cpython_only 661 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 662 @support.requires_linux_version(2, 6, 27) 663 def test_pipe2_c_limits(self): 664 # Issue 15989 665 import _testcapi 666 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) 667 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) 668 669 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') 670 def test_utime(self): 671 now = time.time() 672 posix.utime(support.TESTFN, None) 673 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) 674 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) 675 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) 676 posix.utime(support.TESTFN, (int(now), int(now))) 677 posix.utime(support.TESTFN, (now, now)) 678 679 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): 680 st = os.stat(target_file) 681 self.assertTrue(hasattr(st, 'st_flags')) 682 683 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 684 flags = st.st_flags | stat.UF_IMMUTABLE 685 try: 686 chflags_func(target_file, flags, **kwargs) 687 except OSError as err: 688 if err.errno != errno.EOPNOTSUPP: 689 raise 690 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 691 self.skipTest(msg) 692 693 try: 694 new_st = os.stat(target_file) 695 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 696 try: 697 fd = open(target_file, 'w+') 698 except OSError as e: 699 self.assertEqual(e.errno, errno.EPERM) 700 finally: 701 posix.chflags(target_file, st.st_flags) 702 703 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 704 def test_chflags(self): 705 self._test_chflags_regular_file(posix.chflags, support.TESTFN) 706 707 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 708 def test_lchflags_regular_file(self): 709 self._test_chflags_regular_file(posix.lchflags, support.TESTFN) 710 self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False) 711 712 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 713 def test_lchflags_symlink(self): 714 testfn_st = os.stat(support.TESTFN) 715 716 self.assertTrue(hasattr(testfn_st, 'st_flags')) 717 718 os.symlink(support.TESTFN, _DUMMY_SYMLINK) 719 self.teardown_files.append(_DUMMY_SYMLINK) 720 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 721 722 def chflags_nofollow(path, flags): 723 return posix.chflags(path, flags, follow_symlinks=False) 724 725 for fn in (posix.lchflags, chflags_nofollow): 726 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 727 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE 728 try: 729 fn(_DUMMY_SYMLINK, flags) 730 except OSError as err: 731 if err.errno != errno.EOPNOTSUPP: 732 raise 733 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 734 self.skipTest(msg) 735 try: 736 new_testfn_st = os.stat(support.TESTFN) 737 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 738 739 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 740 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 741 new_dummy_symlink_st.st_flags) 742 finally: 743 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 744 745 def test_environ(self): 746 if os.name == "nt": 747 item_type = str 748 else: 749 item_type = bytes 750 for k, v in posix.environ.items(): 751 self.assertEqual(type(k), item_type) 752 self.assertEqual(type(v), item_type) 753 754 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') 755 def test_getcwd_long_pathnames(self): 756 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 757 curdir = os.getcwd() 758 base_path = os.path.abspath(support.TESTFN) + '.getcwd' 759 760 try: 761 os.mkdir(base_path) 762 os.chdir(base_path) 763 except: 764 # Just returning nothing instead of the SkipTest exception, because 765 # the test results in Error in that case. Is that ok? 766 # raise unittest.SkipTest("cannot create directory for testing") 767 return 768 769 def _create_and_do_getcwd(dirname, current_path_length = 0): 770 try: 771 os.mkdir(dirname) 772 except: 773 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test") 774 775 os.chdir(dirname) 776 try: 777 os.getcwd() 778 if current_path_length < 1027: 779 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 780 finally: 781 os.chdir('..') 782 os.rmdir(dirname) 783 784 _create_and_do_getcwd(dirname) 785 786 finally: 787 os.chdir(curdir) 788 support.rmtree(base_path) 789 790 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") 791 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 792 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") 793 def test_getgrouplist(self): 794 user = pwd.getpwuid(os.getuid())[0] 795 group = pwd.getpwuid(os.getuid())[3] 796 self.assertIn(group, posix.getgrouplist(user, group)) 797 798 799 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 800 def test_getgroups(self): 801 with os.popen('id -G 2>/dev/null') as idg: 802 groups = idg.read().strip() 803 ret = idg.close() 804 805 try: 806 idg_groups = set(int(g) for g in groups.split()) 807 except ValueError: 808 idg_groups = set() 809 if ret is not None or not idg_groups: 810 raise unittest.SkipTest("need working 'id -G'") 811 812 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 813 if sys.platform == 'darwin': 814 import sysconfig 815 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0' 816 if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6): 817 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 818 819 # 'id -G' and 'os.getgroups()' should return the same 820 # groups, ignoring order, duplicates, and the effective gid. 821 # #10822/#26944 - It is implementation defined whether 822 # posix.getgroups() includes the effective gid. 823 symdiff = idg_groups.symmetric_difference(posix.getgroups()) 824 self.assertTrue(not symdiff or symdiff == {posix.getegid()}) 825 826 # tests for the posix *at functions follow 827 828 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") 829 def test_access_dir_fd(self): 830 f = posix.open(posix.getcwd(), posix.O_RDONLY) 831 try: 832 self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) 833 finally: 834 posix.close(f) 835 836 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") 837 def test_chmod_dir_fd(self): 838 os.chmod(support.TESTFN, stat.S_IRUSR) 839 840 f = posix.open(posix.getcwd(), posix.O_RDONLY) 841 try: 842 posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 843 844 s = posix.stat(support.TESTFN) 845 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) 846 finally: 847 posix.close(f) 848 849 @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()") 850 def test_chown_dir_fd(self): 851 support.unlink(support.TESTFN) 852 support.create_empty_file(support.TESTFN) 853 854 f = posix.open(posix.getcwd(), posix.O_RDONLY) 855 try: 856 posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) 857 finally: 858 posix.close(f) 859 860 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") 861 def test_stat_dir_fd(self): 862 support.unlink(support.TESTFN) 863 with open(support.TESTFN, 'w') as outfile: 864 outfile.write("testline\n") 865 866 f = posix.open(posix.getcwd(), posix.O_RDONLY) 867 try: 868 s1 = posix.stat(support.TESTFN) 869 s2 = posix.stat(support.TESTFN, dir_fd=f) 870 self.assertEqual(s1, s2) 871 s2 = posix.stat(support.TESTFN, dir_fd=None) 872 self.assertEqual(s1, s2) 873 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 874 posix.stat, support.TESTFN, dir_fd=posix.getcwd()) 875 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 876 posix.stat, support.TESTFN, dir_fd=float(f)) 877 self.assertRaises(OverflowError, 878 posix.stat, support.TESTFN, dir_fd=10**20) 879 finally: 880 posix.close(f) 881 882 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") 883 def test_utime_dir_fd(self): 884 f = posix.open(posix.getcwd(), posix.O_RDONLY) 885 try: 886 now = time.time() 887 posix.utime(support.TESTFN, None, dir_fd=f) 888 posix.utime(support.TESTFN, dir_fd=f) 889 self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f) 890 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f) 891 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f) 892 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f) 893 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f) 894 posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) 895 posix.utime(support.TESTFN, (now, now), dir_fd=f) 896 posix.utime(support.TESTFN, 897 (int(now), int((now - int(now)) * 1e9)), dir_fd=f) 898 posix.utime(support.TESTFN, dir_fd=f, 899 times=(int(now), int((now - int(now)) * 1e9))) 900 901 # try dir_fd and follow_symlinks together 902 if os.utime in os.supports_follow_symlinks: 903 try: 904 posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) 905 except ValueError: 906 # whoops! using both together not supported on this platform. 907 pass 908 909 finally: 910 posix.close(f) 911 912 @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") 913 @unittest.skipIf(android_not_root, "hard link not allowed, non root user") 914 def test_link_dir_fd(self): 915 f = posix.open(posix.getcwd(), posix.O_RDONLY) 916 try: 917 posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f) 918 # should have same inodes 919 self.assertEqual(posix.stat(support.TESTFN)[1], 920 posix.stat(support.TESTFN + 'link')[1]) 921 finally: 922 posix.close(f) 923 support.unlink(support.TESTFN + 'link') 924 925 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") 926 def test_mkdir_dir_fd(self): 927 f = posix.open(posix.getcwd(), posix.O_RDONLY) 928 try: 929 posix.mkdir(support.TESTFN + 'dir', dir_fd=f) 930 posix.stat(support.TESTFN + 'dir') # should not raise exception 931 finally: 932 posix.close(f) 933 support.rmtree(support.TESTFN + 'dir') 934 935 @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'), 936 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") 937 @unittest.skipIf(android_not_root, "mknod not allowed, non root user") 938 def test_mknod_dir_fd(self): 939 # Test using mknodat() to create a FIFO (the only use specified 940 # by POSIX). 941 support.unlink(support.TESTFN) 942 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 943 f = posix.open(posix.getcwd(), posix.O_RDONLY) 944 try: 945 posix.mknod(support.TESTFN, mode, 0, dir_fd=f) 946 except OSError as e: 947 # Some old systems don't allow unprivileged users to use 948 # mknod(), or only support creating device nodes. 949 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL)) 950 else: 951 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 952 finally: 953 posix.close(f) 954 955 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") 956 def test_open_dir_fd(self): 957 support.unlink(support.TESTFN) 958 with open(support.TESTFN, 'w') as outfile: 959 outfile.write("testline\n") 960 a = posix.open(posix.getcwd(), posix.O_RDONLY) 961 b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) 962 try: 963 res = posix.read(b, 9).decode(encoding="utf-8") 964 self.assertEqual("testline\n", res) 965 finally: 966 posix.close(a) 967 posix.close(b) 968 969 @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()") 970 def test_readlink_dir_fd(self): 971 os.symlink(support.TESTFN, support.TESTFN + 'link') 972 f = posix.open(posix.getcwd(), posix.O_RDONLY) 973 try: 974 self.assertEqual(posix.readlink(support.TESTFN + 'link'), 975 posix.readlink(support.TESTFN + 'link', dir_fd=f)) 976 finally: 977 support.unlink(support.TESTFN + 'link') 978 posix.close(f) 979 980 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") 981 def test_rename_dir_fd(self): 982 support.unlink(support.TESTFN) 983 support.create_empty_file(support.TESTFN + 'ren') 984 f = posix.open(posix.getcwd(), posix.O_RDONLY) 985 try: 986 posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f) 987 except: 988 posix.rename(support.TESTFN + 'ren', support.TESTFN) 989 raise 990 else: 991 posix.stat(support.TESTFN) # should not raise exception 992 finally: 993 posix.close(f) 994 995 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") 996 def test_symlink_dir_fd(self): 997 f = posix.open(posix.getcwd(), posix.O_RDONLY) 998 try: 999 posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) 1000 self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) 1001 finally: 1002 posix.close(f) 1003 support.unlink(support.TESTFN + 'link') 1004 1005 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") 1006 def test_unlink_dir_fd(self): 1007 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1008 support.create_empty_file(support.TESTFN + 'del') 1009 posix.stat(support.TESTFN + 'del') # should not raise exception 1010 try: 1011 posix.unlink(support.TESTFN + 'del', dir_fd=f) 1012 except: 1013 support.unlink(support.TESTFN + 'del') 1014 raise 1015 else: 1016 self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') 1017 finally: 1018 posix.close(f) 1019 1020 @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") 1021 @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user") 1022 def test_mkfifo_dir_fd(self): 1023 support.unlink(support.TESTFN) 1024 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1025 try: 1026 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 1027 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 1028 finally: 1029 posix.close(f) 1030 1031 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), 1032 "don't have scheduling support") 1033 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), 1034 "don't have sched affinity support") 1035 1036 @requires_sched_h 1037 def test_sched_yield(self): 1038 # This has no error conditions (at least on Linux). 1039 posix.sched_yield() 1040 1041 @requires_sched_h 1042 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), 1043 "requires sched_get_priority_max()") 1044 def test_sched_priority(self): 1045 # Round-robin usually has interesting priorities. 1046 pol = posix.SCHED_RR 1047 lo = posix.sched_get_priority_min(pol) 1048 hi = posix.sched_get_priority_max(pol) 1049 self.assertIsInstance(lo, int) 1050 self.assertIsInstance(hi, int) 1051 self.assertGreaterEqual(hi, lo) 1052 # OSX evidently just returns 15 without checking the argument. 1053 if sys.platform != "darwin": 1054 self.assertRaises(OSError, posix.sched_get_priority_min, -23) 1055 self.assertRaises(OSError, posix.sched_get_priority_max, -23) 1056 1057 @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler") 1058 def test_get_and_set_scheduler_and_param(self): 1059 possible_schedulers = [sched for name, sched in posix.__dict__.items() 1060 if name.startswith("SCHED_")] 1061 mine = posix.sched_getscheduler(0) 1062 self.assertIn(mine, possible_schedulers) 1063 try: 1064 parent = posix.sched_getscheduler(os.getppid()) 1065 except OSError as e: 1066 if e.errno != errno.EPERM: 1067 raise 1068 else: 1069 self.assertIn(parent, possible_schedulers) 1070 self.assertRaises(OSError, posix.sched_getscheduler, -1) 1071 self.assertRaises(OSError, posix.sched_getparam, -1) 1072 param = posix.sched_getparam(0) 1073 self.assertIsInstance(param.sched_priority, int) 1074 1075 # POSIX states that calling sched_setparam() or sched_setscheduler() on 1076 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR 1077 # is implementation-defined: NetBSD and FreeBSD can return EINVAL. 1078 if not sys.platform.startswith(('freebsd', 'netbsd')): 1079 try: 1080 posix.sched_setscheduler(0, mine, param) 1081 posix.sched_setparam(0, param) 1082 except OSError as e: 1083 if e.errno != errno.EPERM: 1084 raise 1085 self.assertRaises(OSError, posix.sched_setparam, -1, param) 1086 1087 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) 1088 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) 1089 self.assertRaises(TypeError, posix.sched_setparam, 0, 43) 1090 param = posix.sched_param(None) 1091 self.assertRaises(TypeError, posix.sched_setparam, 0, param) 1092 large = 214748364700 1093 param = posix.sched_param(large) 1094 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1095 param = posix.sched_param(sched_priority=-large) 1096 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1097 1098 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") 1099 def test_sched_rr_get_interval(self): 1100 try: 1101 interval = posix.sched_rr_get_interval(0) 1102 except OSError as e: 1103 # This likely means that sched_rr_get_interval is only valid for 1104 # processes with the SCHED_RR scheduler in effect. 1105 if e.errno != errno.EINVAL: 1106 raise 1107 self.skipTest("only works on SCHED_RR processes") 1108 self.assertIsInstance(interval, float) 1109 # Reasonable constraints, I think. 1110 self.assertGreaterEqual(interval, 0.) 1111 self.assertLess(interval, 1.) 1112 1113 @requires_sched_affinity 1114 def test_sched_getaffinity(self): 1115 mask = posix.sched_getaffinity(0) 1116 self.assertIsInstance(mask, set) 1117 self.assertGreaterEqual(len(mask), 1) 1118 self.assertRaises(OSError, posix.sched_getaffinity, -1) 1119 for cpu in mask: 1120 self.assertIsInstance(cpu, int) 1121 self.assertGreaterEqual(cpu, 0) 1122 self.assertLess(cpu, 1 << 32) 1123 1124 @requires_sched_affinity 1125 def test_sched_setaffinity(self): 1126 mask = posix.sched_getaffinity(0) 1127 if len(mask) > 1: 1128 # Empty masks are forbidden 1129 mask.pop() 1130 posix.sched_setaffinity(0, mask) 1131 self.assertEqual(posix.sched_getaffinity(0), mask) 1132 self.assertRaises(OSError, posix.sched_setaffinity, 0, []) 1133 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) 1134 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) 1135 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) 1136 1137 def test_rtld_constants(self): 1138 # check presence of major RTLD_* constants 1139 posix.RTLD_LAZY 1140 posix.RTLD_NOW 1141 posix.RTLD_GLOBAL 1142 posix.RTLD_LOCAL 1143 1144 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), 1145 "test needs an OS that reports file holes") 1146 def test_fs_holes(self): 1147 # Even if the filesystem doesn't report holes, 1148 # if the OS supports it the SEEK_* constants 1149 # will be defined and will have a consistent 1150 # behaviour: 1151 # os.SEEK_DATA = current position 1152 # os.SEEK_HOLE = end of file position 1153 with open(support.TESTFN, 'r+b') as fp: 1154 fp.write(b"hello") 1155 fp.flush() 1156 size = fp.tell() 1157 fno = fp.fileno() 1158 try : 1159 for i in range(size): 1160 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) 1161 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) 1162 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) 1163 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) 1164 except OSError : 1165 # Some OSs claim to support SEEK_HOLE/SEEK_DATA 1166 # but it is not true. 1167 # For instance: 1168 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html 1169 raise unittest.SkipTest("OSError raised!") 1170 1171 def test_path_error2(self): 1172 """ 1173 Test functions that call path_error2(), providing two filenames in their exceptions. 1174 """ 1175 for name in ("rename", "replace", "link"): 1176 function = getattr(os, name, None) 1177 if function is None: 1178 continue 1179 1180 for dst in ("noodly2", support.TESTFN): 1181 try: 1182 function('doesnotexistfilename', dst) 1183 except OSError as e: 1184 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) 1185 break 1186 else: 1187 self.fail("No valid path_error2() test for os." + name) 1188 1189 def test_path_with_null_character(self): 1190 fn = support.TESTFN 1191 fn_with_NUL = fn + '\0' 1192 self.addCleanup(support.unlink, fn) 1193 support.unlink(fn) 1194 fd = None 1195 try: 1196 with self.assertRaises(ValueError): 1197 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1198 finally: 1199 if fd is not None: 1200 os.close(fd) 1201 self.assertFalse(os.path.exists(fn)) 1202 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1203 self.assertFalse(os.path.exists(fn)) 1204 open(fn, 'wb').close() 1205 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1206 1207 def test_path_with_null_byte(self): 1208 fn = os.fsencode(support.TESTFN) 1209 fn_with_NUL = fn + b'\0' 1210 self.addCleanup(support.unlink, fn) 1211 support.unlink(fn) 1212 fd = None 1213 try: 1214 with self.assertRaises(ValueError): 1215 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1216 finally: 1217 if fd is not None: 1218 os.close(fd) 1219 self.assertFalse(os.path.exists(fn)) 1220 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1221 self.assertFalse(os.path.exists(fn)) 1222 open(fn, 'wb').close() 1223 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1224 1225class PosixGroupsTester(unittest.TestCase): 1226 1227 def setUp(self): 1228 if posix.getuid() != 0: 1229 raise unittest.SkipTest("not enough privileges") 1230 if not hasattr(posix, 'getgroups'): 1231 raise unittest.SkipTest("need posix.getgroups") 1232 if sys.platform == 'darwin': 1233 raise unittest.SkipTest("getgroups(2) is broken on OSX") 1234 self.saved_groups = posix.getgroups() 1235 1236 def tearDown(self): 1237 if hasattr(posix, 'setgroups'): 1238 posix.setgroups(self.saved_groups) 1239 elif hasattr(posix, 'initgroups'): 1240 name = pwd.getpwuid(posix.getuid()).pw_name 1241 posix.initgroups(name, self.saved_groups[0]) 1242 1243 @unittest.skipUnless(hasattr(posix, 'initgroups'), 1244 "test needs posix.initgroups()") 1245 def test_initgroups(self): 1246 # find missing group 1247 1248 g = max(self.saved_groups or [0]) + 1 1249 name = pwd.getpwuid(posix.getuid()).pw_name 1250 posix.initgroups(name, g) 1251 self.assertIn(g, posix.getgroups()) 1252 1253 @unittest.skipUnless(hasattr(posix, 'setgroups'), 1254 "test needs posix.setgroups()") 1255 def test_setgroups(self): 1256 for groups in [[0], list(range(16))]: 1257 posix.setgroups(groups) 1258 self.assertListEqual(groups, posix.getgroups()) 1259 1260def test_main(): 1261 try: 1262 support.run_unittest(PosixTester, PosixGroupsTester) 1263 finally: 1264 support.reap_children() 1265 1266if __name__ == '__main__': 1267 test_main() 1268