1# As a test suite for the os module, this is woefully inadequate, but this 2# does add tests for a few functions which have been determined to be more 3# portable than they had been thought to be. 4 5import asynchat 6import asyncore 7import codecs 8import contextlib 9import decimal 10import errno 11import fnmatch 12import fractions 13import itertools 14import locale 15import mmap 16import os 17import pickle 18import shutil 19import signal 20import socket 21import stat 22import subprocess 23import sys 24import sysconfig 25import tempfile 26import threading 27import time 28import types 29import unittest 30import uuid 31import warnings 32from test import support 33from test.support import socket_helper 34from platform import win32_is_iot 35 36try: 37 import resource 38except ImportError: 39 resource = None 40try: 41 import fcntl 42except ImportError: 43 fcntl = None 44try: 45 import _winapi 46except ImportError: 47 _winapi = None 48try: 49 import pwd 50 all_users = [u.pw_uid for u in pwd.getpwall()] 51except (ImportError, AttributeError): 52 all_users = [] 53try: 54 from _testcapi import INT_MAX, PY_SSIZE_T_MAX 55except ImportError: 56 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize 57 58from test.support.script_helper import assert_python_ok 59from test.support import unix_shell, FakePath 60 61 62root_in_posix = False 63if hasattr(os, 'geteuid'): 64 root_in_posix = (os.geteuid() == 0) 65 66# Detect whether we're on a Linux system that uses the (now outdated 67# and unmaintained) linuxthreads threading library. There's an issue 68# when combining linuxthreads with a failed execv call: see 69# http://bugs.python.org/issue4970. 70if hasattr(sys, 'thread_info') and sys.thread_info.version: 71 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 72else: 73 USING_LINUXTHREADS = False 74 75# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. 76HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 77 78 79def requires_os_func(name): 80 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name) 81 82 83def create_file(filename, content=b'content'): 84 with open(filename, "xb", 0) as fp: 85 fp.write(content) 86 87 88class MiscTests(unittest.TestCase): 89 def test_getcwd(self): 90 cwd = os.getcwd() 91 self.assertIsInstance(cwd, str) 92 93 def test_getcwd_long_path(self): 94 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On 95 # Windows, MAX_PATH is defined as 260 characters, but Windows supports 96 # longer path if longer paths support is enabled. Internally, the os 97 # module uses MAXPATHLEN which is at least 1024. 98 # 99 # Use a directory name of 200 characters to fit into Windows MAX_PATH 100 # limit. 101 # 102 # On Windows, the test can stop when trying to create a path longer 103 # than MAX_PATH if long paths support is disabled: 104 # see RtlAreLongPathsEnabled(). 105 min_len = 2000 # characters 106 dirlen = 200 # characters 107 dirname = 'python_test_dir_' 108 dirname = dirname + ('a' * (dirlen - len(dirname))) 109 110 with tempfile.TemporaryDirectory() as tmpdir: 111 with support.change_cwd(tmpdir) as path: 112 expected = path 113 114 while True: 115 cwd = os.getcwd() 116 self.assertEqual(cwd, expected) 117 118 need = min_len - (len(cwd) + len(os.path.sep)) 119 if need <= 0: 120 break 121 if len(dirname) > need and need > 0: 122 dirname = dirname[:need] 123 124 path = os.path.join(path, dirname) 125 try: 126 os.mkdir(path) 127 # On Windows, chdir() can fail 128 # even if mkdir() succeeded 129 os.chdir(path) 130 except FileNotFoundError: 131 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and 132 # ERROR_FILENAME_EXCED_RANGE (206) errors 133 # ("The filename or extension is too long") 134 break 135 except OSError as exc: 136 if exc.errno == errno.ENAMETOOLONG: 137 break 138 else: 139 raise 140 141 expected = path 142 143 if support.verbose: 144 print(f"Tested current directory length: {len(cwd)}") 145 146 def test_getcwdb(self): 147 cwd = os.getcwdb() 148 self.assertIsInstance(cwd, bytes) 149 self.assertEqual(os.fsdecode(cwd), os.getcwd()) 150 151 152# Tests creating TESTFN 153class FileTests(unittest.TestCase): 154 def setUp(self): 155 if os.path.lexists(support.TESTFN): 156 os.unlink(support.TESTFN) 157 tearDown = setUp 158 159 def test_access(self): 160 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 161 os.close(f) 162 self.assertTrue(os.access(support.TESTFN, os.W_OK)) 163 164 def test_closerange(self): 165 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 166 # We must allocate two consecutive file descriptors, otherwise 167 # it will mess up other file descriptors (perhaps even the three 168 # standard ones). 169 second = os.dup(first) 170 try: 171 retries = 0 172 while second != first + 1: 173 os.close(first) 174 retries += 1 175 if retries > 10: 176 # XXX test skipped 177 self.skipTest("couldn't allocate two consecutive fds") 178 first, second = second, os.dup(second) 179 finally: 180 os.close(second) 181 # close a fd that is open, and one that isn't 182 os.closerange(first, first + 2) 183 self.assertRaises(OSError, os.write, first, b"a") 184 185 @support.cpython_only 186 def test_rename(self): 187 path = support.TESTFN 188 old = sys.getrefcount(path) 189 self.assertRaises(TypeError, os.rename, path, 0) 190 new = sys.getrefcount(path) 191 self.assertEqual(old, new) 192 193 def test_read(self): 194 with open(support.TESTFN, "w+b") as fobj: 195 fobj.write(b"spam") 196 fobj.flush() 197 fd = fobj.fileno() 198 os.lseek(fd, 0, 0) 199 s = os.read(fd, 4) 200 self.assertEqual(type(s), bytes) 201 self.assertEqual(s, b"spam") 202 203 @support.cpython_only 204 # Skip the test on 32-bit platforms: the number of bytes must fit in a 205 # Py_ssize_t type 206 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX, 207 "needs INT_MAX < PY_SSIZE_T_MAX") 208 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) 209 def test_large_read(self, size): 210 self.addCleanup(support.unlink, support.TESTFN) 211 create_file(support.TESTFN, b'test') 212 213 # Issue #21932: Make sure that os.read() does not raise an 214 # OverflowError for size larger than INT_MAX 215 with open(support.TESTFN, "rb") as fp: 216 data = os.read(fp.fileno(), size) 217 218 # The test does not try to read more than 2 GiB at once because the 219 # operating system is free to return less bytes than requested. 220 self.assertEqual(data, b'test') 221 222 def test_write(self): 223 # os.write() accepts bytes- and buffer-like objects but not strings 224 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY) 225 self.assertRaises(TypeError, os.write, fd, "beans") 226 os.write(fd, b"bacon\n") 227 os.write(fd, bytearray(b"eggs\n")) 228 os.write(fd, memoryview(b"spam\n")) 229 os.close(fd) 230 with open(support.TESTFN, "rb") as fobj: 231 self.assertEqual(fobj.read().splitlines(), 232 [b"bacon", b"eggs", b"spam"]) 233 234 def write_windows_console(self, *args): 235 retcode = subprocess.call(args, 236 # use a new console to not flood the test output 237 creationflags=subprocess.CREATE_NEW_CONSOLE, 238 # use a shell to hide the console window (SW_HIDE) 239 shell=True) 240 self.assertEqual(retcode, 0) 241 242 @unittest.skipUnless(sys.platform == 'win32', 243 'test specific to the Windows console') 244 def test_write_windows_console(self): 245 # Issue #11395: the Windows console returns an error (12: not enough 246 # space error) on writing into stdout if stdout mode is binary and the 247 # length is greater than 66,000 bytes (or less, depending on heap 248 # usage). 249 code = "print('x' * 100000)" 250 self.write_windows_console(sys.executable, "-c", code) 251 self.write_windows_console(sys.executable, "-u", "-c", code) 252 253 def fdopen_helper(self, *args): 254 fd = os.open(support.TESTFN, os.O_RDONLY) 255 f = os.fdopen(fd, *args) 256 f.close() 257 258 def test_fdopen(self): 259 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) 260 os.close(fd) 261 262 self.fdopen_helper() 263 self.fdopen_helper('r') 264 self.fdopen_helper('r', 100) 265 266 def test_replace(self): 267 TESTFN2 = support.TESTFN + ".2" 268 self.addCleanup(support.unlink, support.TESTFN) 269 self.addCleanup(support.unlink, TESTFN2) 270 271 create_file(support.TESTFN, b"1") 272 create_file(TESTFN2, b"2") 273 274 os.replace(support.TESTFN, TESTFN2) 275 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN) 276 with open(TESTFN2, 'r') as f: 277 self.assertEqual(f.read(), "1") 278 279 def test_open_keywords(self): 280 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777, 281 dir_fd=None) 282 os.close(f) 283 284 def test_symlink_keywords(self): 285 symlink = support.get_attribute(os, "symlink") 286 try: 287 symlink(src='target', dst=support.TESTFN, 288 target_is_directory=False, dir_fd=None) 289 except (NotImplementedError, OSError): 290 pass # No OS support or unprivileged user 291 292 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 293 def test_copy_file_range_invalid_values(self): 294 with self.assertRaises(ValueError): 295 os.copy_file_range(0, 1, -10) 296 297 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 298 def test_copy_file_range(self): 299 TESTFN2 = support.TESTFN + ".3" 300 data = b'0123456789' 301 302 create_file(support.TESTFN, data) 303 self.addCleanup(support.unlink, support.TESTFN) 304 305 in_file = open(support.TESTFN, 'rb') 306 self.addCleanup(in_file.close) 307 in_fd = in_file.fileno() 308 309 out_file = open(TESTFN2, 'w+b') 310 self.addCleanup(support.unlink, TESTFN2) 311 self.addCleanup(out_file.close) 312 out_fd = out_file.fileno() 313 314 try: 315 i = os.copy_file_range(in_fd, out_fd, 5) 316 except OSError as e: 317 # Handle the case in which Python was compiled 318 # in a system with the syscall but without support 319 # in the kernel. 320 if e.errno != errno.ENOSYS: 321 raise 322 self.skipTest(e) 323 else: 324 # The number of copied bytes can be less than 325 # the number of bytes originally requested. 326 self.assertIn(i, range(0, 6)); 327 328 with open(TESTFN2, 'rb') as in_file: 329 self.assertEqual(in_file.read(), data[:i]) 330 331 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 332 def test_copy_file_range_offset(self): 333 TESTFN4 = support.TESTFN + ".4" 334 data = b'0123456789' 335 bytes_to_copy = 6 336 in_skip = 3 337 out_seek = 5 338 339 create_file(support.TESTFN, data) 340 self.addCleanup(support.unlink, support.TESTFN) 341 342 in_file = open(support.TESTFN, 'rb') 343 self.addCleanup(in_file.close) 344 in_fd = in_file.fileno() 345 346 out_file = open(TESTFN4, 'w+b') 347 self.addCleanup(support.unlink, TESTFN4) 348 self.addCleanup(out_file.close) 349 out_fd = out_file.fileno() 350 351 try: 352 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy, 353 offset_src=in_skip, 354 offset_dst=out_seek) 355 except OSError as e: 356 # Handle the case in which Python was compiled 357 # in a system with the syscall but without support 358 # in the kernel. 359 if e.errno != errno.ENOSYS: 360 raise 361 self.skipTest(e) 362 else: 363 # The number of copied bytes can be less than 364 # the number of bytes originally requested. 365 self.assertIn(i, range(0, bytes_to_copy+1)); 366 367 with open(TESTFN4, 'rb') as in_file: 368 read = in_file.read() 369 # seeked bytes (5) are zero'ed 370 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 371 # 012 are skipped (in_skip) 372 # 345678 are copied in the file (in_skip + bytes_to_copy) 373 self.assertEqual(read[out_seek:], 374 data[in_skip:in_skip+i]) 375 376# Test attributes on return values from os.*stat* family. 377class StatAttributeTests(unittest.TestCase): 378 def setUp(self): 379 self.fname = support.TESTFN 380 self.addCleanup(support.unlink, self.fname) 381 create_file(self.fname, b"ABC") 382 383 def check_stat_attributes(self, fname): 384 result = os.stat(fname) 385 386 # Make sure direct access works 387 self.assertEqual(result[stat.ST_SIZE], 3) 388 self.assertEqual(result.st_size, 3) 389 390 # Make sure all the attributes are there 391 members = dir(result) 392 for name in dir(stat): 393 if name[:3] == 'ST_': 394 attr = name.lower() 395 if name.endswith("TIME"): 396 def trunc(x): return int(x) 397 else: 398 def trunc(x): return x 399 self.assertEqual(trunc(getattr(result, attr)), 400 result[getattr(stat, name)]) 401 self.assertIn(attr, members) 402 403 # Make sure that the st_?time and st_?time_ns fields roughly agree 404 # (they should always agree up to around tens-of-microseconds) 405 for name in 'st_atime st_mtime st_ctime'.split(): 406 floaty = int(getattr(result, name) * 100000) 407 nanosecondy = getattr(result, name + "_ns") // 10000 408 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 409 410 try: 411 result[200] 412 self.fail("No exception raised") 413 except IndexError: 414 pass 415 416 # Make sure that assignment fails 417 try: 418 result.st_mode = 1 419 self.fail("No exception raised") 420 except AttributeError: 421 pass 422 423 try: 424 result.st_rdev = 1 425 self.fail("No exception raised") 426 except (AttributeError, TypeError): 427 pass 428 429 try: 430 result.parrot = 1 431 self.fail("No exception raised") 432 except AttributeError: 433 pass 434 435 # Use the stat_result constructor with a too-short tuple. 436 try: 437 result2 = os.stat_result((10,)) 438 self.fail("No exception raised") 439 except TypeError: 440 pass 441 442 # Use the constructor with a too-long tuple. 443 try: 444 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 445 except TypeError: 446 pass 447 448 def test_stat_attributes(self): 449 self.check_stat_attributes(self.fname) 450 451 def test_stat_attributes_bytes(self): 452 try: 453 fname = self.fname.encode(sys.getfilesystemencoding()) 454 except UnicodeEncodeError: 455 self.skipTest("cannot encode %a for the filesystem" % self.fname) 456 self.check_stat_attributes(fname) 457 458 def test_stat_result_pickle(self): 459 result = os.stat(self.fname) 460 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 461 p = pickle.dumps(result, proto) 462 self.assertIn(b'stat_result', p) 463 if proto < 4: 464 self.assertIn(b'cos\nstat_result\n', p) 465 unpickled = pickle.loads(p) 466 self.assertEqual(result, unpickled) 467 468 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 469 def test_statvfs_attributes(self): 470 result = os.statvfs(self.fname) 471 472 # Make sure direct access works 473 self.assertEqual(result.f_bfree, result[3]) 474 475 # Make sure all the attributes are there. 476 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 477 'ffree', 'favail', 'flag', 'namemax') 478 for value, member in enumerate(members): 479 self.assertEqual(getattr(result, 'f_' + member), result[value]) 480 481 self.assertTrue(isinstance(result.f_fsid, int)) 482 483 # Test that the size of the tuple doesn't change 484 self.assertEqual(len(result), 10) 485 486 # Make sure that assignment really fails 487 try: 488 result.f_bfree = 1 489 self.fail("No exception raised") 490 except AttributeError: 491 pass 492 493 try: 494 result.parrot = 1 495 self.fail("No exception raised") 496 except AttributeError: 497 pass 498 499 # Use the constructor with a too-short tuple. 500 try: 501 result2 = os.statvfs_result((10,)) 502 self.fail("No exception raised") 503 except TypeError: 504 pass 505 506 # Use the constructor with a too-long tuple. 507 try: 508 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 509 except TypeError: 510 pass 511 512 @unittest.skipUnless(hasattr(os, 'statvfs'), 513 "need os.statvfs()") 514 def test_statvfs_result_pickle(self): 515 result = os.statvfs(self.fname) 516 517 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 518 p = pickle.dumps(result, proto) 519 self.assertIn(b'statvfs_result', p) 520 if proto < 4: 521 self.assertIn(b'cos\nstatvfs_result\n', p) 522 unpickled = pickle.loads(p) 523 self.assertEqual(result, unpickled) 524 525 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 526 def test_1686475(self): 527 # Verify that an open file can be stat'ed 528 try: 529 os.stat(r"c:\pagefile.sys") 530 except FileNotFoundError: 531 self.skipTest(r'c:\pagefile.sys does not exist') 532 except OSError as e: 533 self.fail("Could not stat pagefile.sys") 534 535 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 536 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 537 def test_15261(self): 538 # Verify that stat'ing a closed fd does not cause crash 539 r, w = os.pipe() 540 try: 541 os.stat(r) # should not raise error 542 finally: 543 os.close(r) 544 os.close(w) 545 with self.assertRaises(OSError) as ctx: 546 os.stat(r) 547 self.assertEqual(ctx.exception.errno, errno.EBADF) 548 549 def check_file_attributes(self, result): 550 self.assertTrue(hasattr(result, 'st_file_attributes')) 551 self.assertTrue(isinstance(result.st_file_attributes, int)) 552 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) 553 554 @unittest.skipUnless(sys.platform == "win32", 555 "st_file_attributes is Win32 specific") 556 def test_file_attributes(self): 557 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) 558 result = os.stat(self.fname) 559 self.check_file_attributes(result) 560 self.assertEqual( 561 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 562 0) 563 564 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) 565 dirname = support.TESTFN + "dir" 566 os.mkdir(dirname) 567 self.addCleanup(os.rmdir, dirname) 568 569 result = os.stat(dirname) 570 self.check_file_attributes(result) 571 self.assertEqual( 572 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 573 stat.FILE_ATTRIBUTE_DIRECTORY) 574 575 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 576 def test_access_denied(self): 577 # Default to FindFirstFile WIN32_FIND_DATA when access is 578 # denied. See issue 28075. 579 # os.environ['TEMP'] should be located on a volume that 580 # supports file ACLs. 581 fname = os.path.join(os.environ['TEMP'], self.fname) 582 self.addCleanup(support.unlink, fname) 583 create_file(fname, b'ABC') 584 # Deny the right to [S]YNCHRONIZE on the file to 585 # force CreateFile to fail with ERROR_ACCESS_DENIED. 586 DETACHED_PROCESS = 8 587 subprocess.check_call( 588 # bpo-30584: Use security identifier *S-1-5-32-545 instead 589 # of localized "Users" to not depend on the locale. 590 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'], 591 creationflags=DETACHED_PROCESS 592 ) 593 result = os.stat(fname) 594 self.assertNotEqual(result.st_size, 0) 595 596 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 597 def test_stat_block_device(self): 598 # bpo-38030: os.stat fails for block devices 599 # Test a filename like "//./C:" 600 fname = "//./" + os.path.splitdrive(os.getcwd())[0] 601 result = os.stat(fname) 602 self.assertEqual(result.st_mode, stat.S_IFBLK) 603 604 605class UtimeTests(unittest.TestCase): 606 def setUp(self): 607 self.dirname = support.TESTFN 608 self.fname = os.path.join(self.dirname, "f1") 609 610 self.addCleanup(support.rmtree, self.dirname) 611 os.mkdir(self.dirname) 612 create_file(self.fname) 613 614 def support_subsecond(self, filename): 615 # Heuristic to check if the filesystem supports timestamp with 616 # subsecond resolution: check if float and int timestamps are different 617 st = os.stat(filename) 618 return ((st.st_atime != st[7]) 619 or (st.st_mtime != st[8]) 620 or (st.st_ctime != st[9])) 621 622 def _test_utime(self, set_time, filename=None): 623 if not filename: 624 filename = self.fname 625 626 support_subsecond = self.support_subsecond(filename) 627 if support_subsecond: 628 # Timestamp with a resolution of 1 microsecond (10^-6). 629 # 630 # The resolution of the C internal function used by os.utime() 631 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable 632 # test with a resolution of 1 ns requires more work: 633 # see the issue #15745. 634 atime_ns = 1002003000 # 1.002003 seconds 635 mtime_ns = 4005006000 # 4.005006 seconds 636 else: 637 # use a resolution of 1 second 638 atime_ns = 5 * 10**9 639 mtime_ns = 8 * 10**9 640 641 set_time(filename, (atime_ns, mtime_ns)) 642 st = os.stat(filename) 643 644 if support_subsecond: 645 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) 646 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) 647 else: 648 self.assertEqual(st.st_atime, atime_ns * 1e-9) 649 self.assertEqual(st.st_mtime, mtime_ns * 1e-9) 650 self.assertEqual(st.st_atime_ns, atime_ns) 651 self.assertEqual(st.st_mtime_ns, mtime_ns) 652 653 def test_utime(self): 654 def set_time(filename, ns): 655 # test the ns keyword parameter 656 os.utime(filename, ns=ns) 657 self._test_utime(set_time) 658 659 @staticmethod 660 def ns_to_sec(ns): 661 # Convert a number of nanosecond (int) to a number of seconds (float). 662 # Round towards infinity by adding 0.5 nanosecond to avoid rounding 663 # issue, os.utime() rounds towards minus infinity. 664 return (ns * 1e-9) + 0.5e-9 665 666 def test_utime_by_indexed(self): 667 # pass times as floating point seconds as the second indexed parameter 668 def set_time(filename, ns): 669 atime_ns, mtime_ns = ns 670 atime = self.ns_to_sec(atime_ns) 671 mtime = self.ns_to_sec(mtime_ns) 672 # test utimensat(timespec), utimes(timeval), utime(utimbuf) 673 # or utime(time_t) 674 os.utime(filename, (atime, mtime)) 675 self._test_utime(set_time) 676 677 def test_utime_by_times(self): 678 def set_time(filename, ns): 679 atime_ns, mtime_ns = ns 680 atime = self.ns_to_sec(atime_ns) 681 mtime = self.ns_to_sec(mtime_ns) 682 # test the times keyword parameter 683 os.utime(filename, times=(atime, mtime)) 684 self._test_utime(set_time) 685 686 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, 687 "follow_symlinks support for utime required " 688 "for this test.") 689 def test_utime_nofollow_symlinks(self): 690 def set_time(filename, ns): 691 # use follow_symlinks=False to test utimensat(timespec) 692 # or lutimes(timeval) 693 os.utime(filename, ns=ns, follow_symlinks=False) 694 self._test_utime(set_time) 695 696 @unittest.skipUnless(os.utime in os.supports_fd, 697 "fd support for utime required for this test.") 698 def test_utime_fd(self): 699 def set_time(filename, ns): 700 with open(filename, 'wb', 0) as fp: 701 # use a file descriptor to test futimens(timespec) 702 # or futimes(timeval) 703 os.utime(fp.fileno(), ns=ns) 704 self._test_utime(set_time) 705 706 @unittest.skipUnless(os.utime in os.supports_dir_fd, 707 "dir_fd support for utime required for this test.") 708 def test_utime_dir_fd(self): 709 def set_time(filename, ns): 710 dirname, name = os.path.split(filename) 711 dirfd = os.open(dirname, os.O_RDONLY) 712 try: 713 # pass dir_fd to test utimensat(timespec) or futimesat(timeval) 714 os.utime(name, dir_fd=dirfd, ns=ns) 715 finally: 716 os.close(dirfd) 717 self._test_utime(set_time) 718 719 def test_utime_directory(self): 720 def set_time(filename, ns): 721 # test calling os.utime() on a directory 722 os.utime(filename, ns=ns) 723 self._test_utime(set_time, filename=self.dirname) 724 725 def _test_utime_current(self, set_time): 726 # Get the system clock 727 current = time.time() 728 729 # Call os.utime() to set the timestamp to the current system clock 730 set_time(self.fname) 731 732 if not self.support_subsecond(self.fname): 733 delta = 1.0 734 else: 735 # On Windows, the usual resolution of time.time() is 15.6 ms. 736 # bpo-30649: Tolerate 50 ms for slow Windows buildbots. 737 # 738 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use 739 # also 50 ms on other platforms. 740 delta = 0.050 741 st = os.stat(self.fname) 742 msg = ("st_time=%r, current=%r, dt=%r" 743 % (st.st_mtime, current, st.st_mtime - current)) 744 self.assertAlmostEqual(st.st_mtime, current, 745 delta=delta, msg=msg) 746 747 def test_utime_current(self): 748 def set_time(filename): 749 # Set to the current time in the new way 750 os.utime(self.fname) 751 self._test_utime_current(set_time) 752 753 def test_utime_current_old(self): 754 def set_time(filename): 755 # Set to the current time in the old explicit way. 756 os.utime(self.fname, None) 757 self._test_utime_current(set_time) 758 759 def get_file_system(self, path): 760 if sys.platform == 'win32': 761 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 762 import ctypes 763 kernel32 = ctypes.windll.kernel32 764 buf = ctypes.create_unicode_buffer("", 100) 765 ok = kernel32.GetVolumeInformationW(root, None, 0, 766 None, None, None, 767 buf, len(buf)) 768 if ok: 769 return buf.value 770 # return None if the filesystem is unknown 771 772 def test_large_time(self): 773 # Many filesystems are limited to the year 2038. At least, the test 774 # pass with NTFS filesystem. 775 if self.get_file_system(self.dirname) != "NTFS": 776 self.skipTest("requires NTFS") 777 778 large = 5000000000 # some day in 2128 779 os.utime(self.fname, (large, large)) 780 self.assertEqual(os.stat(self.fname).st_mtime, large) 781 782 def test_utime_invalid_arguments(self): 783 # seconds and nanoseconds parameters are mutually exclusive 784 with self.assertRaises(ValueError): 785 os.utime(self.fname, (5, 5), ns=(5, 5)) 786 with self.assertRaises(TypeError): 787 os.utime(self.fname, [5, 5]) 788 with self.assertRaises(TypeError): 789 os.utime(self.fname, (5,)) 790 with self.assertRaises(TypeError): 791 os.utime(self.fname, (5, 5, 5)) 792 with self.assertRaises(TypeError): 793 os.utime(self.fname, ns=[5, 5]) 794 with self.assertRaises(TypeError): 795 os.utime(self.fname, ns=(5,)) 796 with self.assertRaises(TypeError): 797 os.utime(self.fname, ns=(5, 5, 5)) 798 799 if os.utime not in os.supports_follow_symlinks: 800 with self.assertRaises(NotImplementedError): 801 os.utime(self.fname, (5, 5), follow_symlinks=False) 802 if os.utime not in os.supports_fd: 803 with open(self.fname, 'wb', 0) as fp: 804 with self.assertRaises(TypeError): 805 os.utime(fp.fileno(), (5, 5)) 806 if os.utime not in os.supports_dir_fd: 807 with self.assertRaises(NotImplementedError): 808 os.utime(self.fname, (5, 5), dir_fd=0) 809 810 @support.cpython_only 811 def test_issue31577(self): 812 # The interpreter shouldn't crash in case utime() received a bad 813 # ns argument. 814 def get_bad_int(divmod_ret_val): 815 class BadInt: 816 def __divmod__(*args): 817 return divmod_ret_val 818 return BadInt() 819 with self.assertRaises(TypeError): 820 os.utime(self.fname, ns=(get_bad_int(42), 1)) 821 with self.assertRaises(TypeError): 822 os.utime(self.fname, ns=(get_bad_int(()), 1)) 823 with self.assertRaises(TypeError): 824 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1)) 825 826 827from test import mapping_tests 828 829class EnvironTests(mapping_tests.BasicTestMappingProtocol): 830 """check that os.environ object conform to mapping protocol""" 831 type2test = None 832 833 def setUp(self): 834 self.__save = dict(os.environ) 835 if os.supports_bytes_environ: 836 self.__saveb = dict(os.environb) 837 for key, value in self._reference().items(): 838 os.environ[key] = value 839 840 def tearDown(self): 841 os.environ.clear() 842 os.environ.update(self.__save) 843 if os.supports_bytes_environ: 844 os.environb.clear() 845 os.environb.update(self.__saveb) 846 847 def _reference(self): 848 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 849 850 def _empty_mapping(self): 851 os.environ.clear() 852 return os.environ 853 854 # Bug 1110478 855 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 856 'requires a shell') 857 def test_update2(self): 858 os.environ.clear() 859 os.environ.update(HELLO="World") 860 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: 861 value = popen.read().strip() 862 self.assertEqual(value, "World") 863 864 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 865 'requires a shell') 866 def test_os_popen_iter(self): 867 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" 868 % unix_shell) as popen: 869 it = iter(popen) 870 self.assertEqual(next(it), "line1\n") 871 self.assertEqual(next(it), "line2\n") 872 self.assertEqual(next(it), "line3\n") 873 self.assertRaises(StopIteration, next, it) 874 875 # Verify environ keys and values from the OS are of the 876 # correct str type. 877 def test_keyvalue_types(self): 878 for key, val in os.environ.items(): 879 self.assertEqual(type(key), str) 880 self.assertEqual(type(val), str) 881 882 def test_items(self): 883 for key, value in self._reference().items(): 884 self.assertEqual(os.environ.get(key), value) 885 886 # Issue 7310 887 def test___repr__(self): 888 """Check that the repr() of os.environ looks like environ({...}).""" 889 env = os.environ 890 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join( 891 '{!r}: {!r}'.format(key, value) 892 for key, value in env.items()))) 893 894 def test_get_exec_path(self): 895 defpath_list = os.defpath.split(os.pathsep) 896 test_path = ['/monty', '/python', '', '/flying/circus'] 897 test_env = {'PATH': os.pathsep.join(test_path)} 898 899 saved_environ = os.environ 900 try: 901 os.environ = dict(test_env) 902 # Test that defaulting to os.environ works. 903 self.assertSequenceEqual(test_path, os.get_exec_path()) 904 self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) 905 finally: 906 os.environ = saved_environ 907 908 # No PATH environment variable 909 self.assertSequenceEqual(defpath_list, os.get_exec_path({})) 910 # Empty PATH environment variable 911 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) 912 # Supplied PATH environment variable 913 self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) 914 915 if os.supports_bytes_environ: 916 # env cannot contain 'PATH' and b'PATH' keys 917 try: 918 # ignore BytesWarning warning 919 with warnings.catch_warnings(record=True): 920 mixed_env = {'PATH': '1', b'PATH': b'2'} 921 except BytesWarning: 922 # mixed_env cannot be created with python -bb 923 pass 924 else: 925 self.assertRaises(ValueError, os.get_exec_path, mixed_env) 926 927 # bytes key and/or value 928 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), 929 ['abc']) 930 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), 931 ['abc']) 932 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), 933 ['abc']) 934 935 @unittest.skipUnless(os.supports_bytes_environ, 936 "os.environb required for this test.") 937 def test_environb(self): 938 # os.environ -> os.environb 939 value = 'euro\u20ac' 940 try: 941 value_bytes = value.encode(sys.getfilesystemencoding(), 942 'surrogateescape') 943 except UnicodeEncodeError: 944 msg = "U+20AC character is not encodable to %s" % ( 945 sys.getfilesystemencoding(),) 946 self.skipTest(msg) 947 os.environ['unicode'] = value 948 self.assertEqual(os.environ['unicode'], value) 949 self.assertEqual(os.environb[b'unicode'], value_bytes) 950 951 # os.environb -> os.environ 952 value = b'\xff' 953 os.environb[b'bytes'] = value 954 self.assertEqual(os.environb[b'bytes'], value) 955 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') 956 self.assertEqual(os.environ['bytes'], value_str) 957 958 def test_putenv_unsetenv(self): 959 name = "PYTHONTESTVAR" 960 value = "testvalue" 961 code = f'import os; print(repr(os.environ.get({name!r})))' 962 963 with support.EnvironmentVarGuard() as env: 964 env.pop(name, None) 965 966 os.putenv(name, value) 967 proc = subprocess.run([sys.executable, '-c', code], check=True, 968 stdout=subprocess.PIPE, text=True) 969 self.assertEqual(proc.stdout.rstrip(), repr(value)) 970 971 os.unsetenv(name) 972 proc = subprocess.run([sys.executable, '-c', code], check=True, 973 stdout=subprocess.PIPE, text=True) 974 self.assertEqual(proc.stdout.rstrip(), repr(None)) 975 976 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). 977 @support.requires_mac_ver(10, 6) 978 def test_putenv_unsetenv_error(self): 979 # Empty variable name is invalid. 980 # "=" and null character are not allowed in a variable name. 981 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'): 982 self.assertRaises((OSError, ValueError), os.putenv, name, "value") 983 self.assertRaises((OSError, ValueError), os.unsetenv, name) 984 985 if sys.platform == "win32": 986 # On Windows, an environment variable string ("name=value" string) 987 # is limited to 32,767 characters 988 longstr = 'x' * 32_768 989 self.assertRaises(ValueError, os.putenv, longstr, "1") 990 self.assertRaises(ValueError, os.putenv, "X", longstr) 991 self.assertRaises(ValueError, os.unsetenv, longstr) 992 993 def test_key_type(self): 994 missing = 'missingkey' 995 self.assertNotIn(missing, os.environ) 996 997 with self.assertRaises(KeyError) as cm: 998 os.environ[missing] 999 self.assertIs(cm.exception.args[0], missing) 1000 self.assertTrue(cm.exception.__suppress_context__) 1001 1002 with self.assertRaises(KeyError) as cm: 1003 del os.environ[missing] 1004 self.assertIs(cm.exception.args[0], missing) 1005 self.assertTrue(cm.exception.__suppress_context__) 1006 1007 def _test_environ_iteration(self, collection): 1008 iterator = iter(collection) 1009 new_key = "__new_key__" 1010 1011 next(iterator) # start iteration over os.environ.items 1012 1013 # add a new key in os.environ mapping 1014 os.environ[new_key] = "test_environ_iteration" 1015 1016 try: 1017 next(iterator) # force iteration over modified mapping 1018 self.assertEqual(os.environ[new_key], "test_environ_iteration") 1019 finally: 1020 del os.environ[new_key] 1021 1022 def test_iter_error_when_changing_os_environ(self): 1023 self._test_environ_iteration(os.environ) 1024 1025 def test_iter_error_when_changing_os_environ_items(self): 1026 self._test_environ_iteration(os.environ.items()) 1027 1028 def test_iter_error_when_changing_os_environ_values(self): 1029 self._test_environ_iteration(os.environ.values()) 1030 1031 def _test_underlying_process_env(self, var, expected): 1032 if not (unix_shell and os.path.exists(unix_shell)): 1033 return 1034 1035 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen: 1036 value = popen.read().strip() 1037 1038 self.assertEqual(expected, value) 1039 1040 def test_or_operator(self): 1041 overridden_key = '_TEST_VAR_' 1042 original_value = 'original_value' 1043 os.environ[overridden_key] = original_value 1044 1045 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1046 expected = dict(os.environ) 1047 expected.update(new_vars_dict) 1048 1049 actual = os.environ | new_vars_dict 1050 self.assertDictEqual(expected, actual) 1051 self.assertEqual('3', actual[overridden_key]) 1052 1053 new_vars_items = new_vars_dict.items() 1054 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items)) 1055 1056 self._test_underlying_process_env('_A_', '') 1057 self._test_underlying_process_env(overridden_key, original_value) 1058 1059 def test_ior_operator(self): 1060 overridden_key = '_TEST_VAR_' 1061 os.environ[overridden_key] = 'original_value' 1062 1063 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1064 expected = dict(os.environ) 1065 expected.update(new_vars_dict) 1066 1067 os.environ |= new_vars_dict 1068 self.assertEqual(expected, os.environ) 1069 self.assertEqual('3', os.environ[overridden_key]) 1070 1071 self._test_underlying_process_env('_A_', '1') 1072 self._test_underlying_process_env(overridden_key, '3') 1073 1074 def test_ior_operator_invalid_dicts(self): 1075 os_environ_copy = os.environ.copy() 1076 with self.assertRaises(TypeError): 1077 dict_with_bad_key = {1: '_A_'} 1078 os.environ |= dict_with_bad_key 1079 1080 with self.assertRaises(TypeError): 1081 dict_with_bad_val = {'_A_': 1} 1082 os.environ |= dict_with_bad_val 1083 1084 # Check nothing was added. 1085 self.assertEqual(os_environ_copy, os.environ) 1086 1087 def test_ior_operator_key_value_iterable(self): 1088 overridden_key = '_TEST_VAR_' 1089 os.environ[overridden_key] = 'original_value' 1090 1091 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3')) 1092 expected = dict(os.environ) 1093 expected.update(new_vars_items) 1094 1095 os.environ |= new_vars_items 1096 self.assertEqual(expected, os.environ) 1097 self.assertEqual('3', os.environ[overridden_key]) 1098 1099 self._test_underlying_process_env('_A_', '1') 1100 self._test_underlying_process_env(overridden_key, '3') 1101 1102 def test_ror_operator(self): 1103 overridden_key = '_TEST_VAR_' 1104 original_value = 'original_value' 1105 os.environ[overridden_key] = original_value 1106 1107 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1108 expected = dict(new_vars_dict) 1109 expected.update(os.environ) 1110 1111 actual = new_vars_dict | os.environ 1112 self.assertDictEqual(expected, actual) 1113 self.assertEqual(original_value, actual[overridden_key]) 1114 1115 new_vars_items = new_vars_dict.items() 1116 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items)) 1117 1118 self._test_underlying_process_env('_A_', '') 1119 self._test_underlying_process_env(overridden_key, original_value) 1120 1121 1122class WalkTests(unittest.TestCase): 1123 """Tests for os.walk().""" 1124 1125 # Wrapper to hide minor differences between os.walk and os.fwalk 1126 # to tests both functions with the same code base 1127 def walk(self, top, **kwargs): 1128 if 'follow_symlinks' in kwargs: 1129 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1130 return os.walk(top, **kwargs) 1131 1132 def setUp(self): 1133 join = os.path.join 1134 self.addCleanup(support.rmtree, support.TESTFN) 1135 1136 # Build: 1137 # TESTFN/ 1138 # TEST1/ a file kid and two directory kids 1139 # tmp1 1140 # SUB1/ a file kid and a directory kid 1141 # tmp2 1142 # SUB11/ no kids 1143 # SUB2/ a file kid and a dirsymlink kid 1144 # tmp3 1145 # SUB21/ not readable 1146 # tmp5 1147 # link/ a symlink to TESTFN.2 1148 # broken_link 1149 # broken_link2 1150 # broken_link3 1151 # TEST2/ 1152 # tmp4 a lone file 1153 self.walk_path = join(support.TESTFN, "TEST1") 1154 self.sub1_path = join(self.walk_path, "SUB1") 1155 self.sub11_path = join(self.sub1_path, "SUB11") 1156 sub2_path = join(self.walk_path, "SUB2") 1157 sub21_path = join(sub2_path, "SUB21") 1158 tmp1_path = join(self.walk_path, "tmp1") 1159 tmp2_path = join(self.sub1_path, "tmp2") 1160 tmp3_path = join(sub2_path, "tmp3") 1161 tmp5_path = join(sub21_path, "tmp3") 1162 self.link_path = join(sub2_path, "link") 1163 t2_path = join(support.TESTFN, "TEST2") 1164 tmp4_path = join(support.TESTFN, "TEST2", "tmp4") 1165 broken_link_path = join(sub2_path, "broken_link") 1166 broken_link2_path = join(sub2_path, "broken_link2") 1167 broken_link3_path = join(sub2_path, "broken_link3") 1168 1169 # Create stuff. 1170 os.makedirs(self.sub11_path) 1171 os.makedirs(sub2_path) 1172 os.makedirs(sub21_path) 1173 os.makedirs(t2_path) 1174 1175 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: 1176 with open(path, "x", encoding='utf-8') as f: 1177 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 1178 1179 if support.can_symlink(): 1180 os.symlink(os.path.abspath(t2_path), self.link_path) 1181 os.symlink('broken', broken_link_path, True) 1182 os.symlink(join('tmp3', 'broken'), broken_link2_path, True) 1183 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True) 1184 self.sub2_tree = (sub2_path, ["SUB21", "link"], 1185 ["broken_link", "broken_link2", "broken_link3", 1186 "tmp3"]) 1187 else: 1188 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"]) 1189 1190 os.chmod(sub21_path, 0) 1191 try: 1192 os.listdir(sub21_path) 1193 except PermissionError: 1194 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU) 1195 else: 1196 os.chmod(sub21_path, stat.S_IRWXU) 1197 os.unlink(tmp5_path) 1198 os.rmdir(sub21_path) 1199 del self.sub2_tree[1][:1] 1200 1201 def test_walk_topdown(self): 1202 # Walk top-down. 1203 all = list(self.walk(self.walk_path)) 1204 1205 self.assertEqual(len(all), 4) 1206 # We can't know which order SUB1 and SUB2 will appear in. 1207 # Not flipped: TESTFN, SUB1, SUB11, SUB2 1208 # flipped: TESTFN, SUB2, SUB1, SUB11 1209 flipped = all[0][1][0] != "SUB1" 1210 all[0][1].sort() 1211 all[3 - 2 * flipped][-1].sort() 1212 all[3 - 2 * flipped][1].sort() 1213 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1214 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) 1215 self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) 1216 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) 1217 1218 def test_walk_prune(self, walk_path=None): 1219 if walk_path is None: 1220 walk_path = self.walk_path 1221 # Prune the search. 1222 all = [] 1223 for root, dirs, files in self.walk(walk_path): 1224 all.append((root, dirs, files)) 1225 # Don't descend into SUB1. 1226 if 'SUB1' in dirs: 1227 # Note that this also mutates the dirs we appended to all! 1228 dirs.remove('SUB1') 1229 1230 self.assertEqual(len(all), 2) 1231 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"])) 1232 1233 all[1][-1].sort() 1234 all[1][1].sort() 1235 self.assertEqual(all[1], self.sub2_tree) 1236 1237 def test_file_like_path(self): 1238 self.test_walk_prune(FakePath(self.walk_path)) 1239 1240 def test_walk_bottom_up(self): 1241 # Walk bottom-up. 1242 all = list(self.walk(self.walk_path, topdown=False)) 1243 1244 self.assertEqual(len(all), 4, all) 1245 # We can't know which order SUB1 and SUB2 will appear in. 1246 # Not flipped: SUB11, SUB1, SUB2, TESTFN 1247 # flipped: SUB2, SUB11, SUB1, TESTFN 1248 flipped = all[3][1][0] != "SUB1" 1249 all[3][1].sort() 1250 all[2 - 2 * flipped][-1].sort() 1251 all[2 - 2 * flipped][1].sort() 1252 self.assertEqual(all[3], 1253 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1254 self.assertEqual(all[flipped], 1255 (self.sub11_path, [], [])) 1256 self.assertEqual(all[flipped + 1], 1257 (self.sub1_path, ["SUB11"], ["tmp2"])) 1258 self.assertEqual(all[2 - 2 * flipped], 1259 self.sub2_tree) 1260 1261 def test_walk_symlink(self): 1262 if not support.can_symlink(): 1263 self.skipTest("need symlink support") 1264 1265 # Walk, following symlinks. 1266 walk_it = self.walk(self.walk_path, follow_symlinks=True) 1267 for root, dirs, files in walk_it: 1268 if root == self.link_path: 1269 self.assertEqual(dirs, []) 1270 self.assertEqual(files, ["tmp4"]) 1271 break 1272 else: 1273 self.fail("Didn't follow symlink with followlinks=True") 1274 1275 def test_walk_bad_dir(self): 1276 # Walk top-down. 1277 errors = [] 1278 walk_it = self.walk(self.walk_path, onerror=errors.append) 1279 root, dirs, files = next(walk_it) 1280 self.assertEqual(errors, []) 1281 dir1 = 'SUB1' 1282 path1 = os.path.join(root, dir1) 1283 path1new = os.path.join(root, dir1 + '.new') 1284 os.rename(path1, path1new) 1285 try: 1286 roots = [r for r, d, f in walk_it] 1287 self.assertTrue(errors) 1288 self.assertNotIn(path1, roots) 1289 self.assertNotIn(path1new, roots) 1290 for dir2 in dirs: 1291 if dir2 != dir1: 1292 self.assertIn(os.path.join(root, dir2), roots) 1293 finally: 1294 os.rename(path1new, path1) 1295 1296 def test_walk_many_open_files(self): 1297 depth = 30 1298 base = os.path.join(support.TESTFN, 'deep') 1299 p = os.path.join(base, *(['d']*depth)) 1300 os.makedirs(p) 1301 1302 iters = [self.walk(base, topdown=False) for j in range(100)] 1303 for i in range(depth + 1): 1304 expected = (p, ['d'] if i else [], []) 1305 for it in iters: 1306 self.assertEqual(next(it), expected) 1307 p = os.path.dirname(p) 1308 1309 iters = [self.walk(base, topdown=True) for j in range(100)] 1310 p = base 1311 for i in range(depth + 1): 1312 expected = (p, ['d'] if i < depth else [], []) 1313 for it in iters: 1314 self.assertEqual(next(it), expected) 1315 p = os.path.join(p, 'd') 1316 1317 1318@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1319class FwalkTests(WalkTests): 1320 """Tests for os.fwalk().""" 1321 1322 def walk(self, top, **kwargs): 1323 for root, dirs, files, root_fd in self.fwalk(top, **kwargs): 1324 yield (root, dirs, files) 1325 1326 def fwalk(self, *args, **kwargs): 1327 return os.fwalk(*args, **kwargs) 1328 1329 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): 1330 """ 1331 compare with walk() results. 1332 """ 1333 walk_kwargs = walk_kwargs.copy() 1334 fwalk_kwargs = fwalk_kwargs.copy() 1335 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1336 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks) 1337 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks) 1338 1339 expected = {} 1340 for root, dirs, files in os.walk(**walk_kwargs): 1341 expected[root] = (set(dirs), set(files)) 1342 1343 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs): 1344 self.assertIn(root, expected) 1345 self.assertEqual(expected[root], (set(dirs), set(files))) 1346 1347 def test_compare_to_walk(self): 1348 kwargs = {'top': support.TESTFN} 1349 self._compare_to_walk(kwargs, kwargs) 1350 1351 def test_dir_fd(self): 1352 try: 1353 fd = os.open(".", os.O_RDONLY) 1354 walk_kwargs = {'top': support.TESTFN} 1355 fwalk_kwargs = walk_kwargs.copy() 1356 fwalk_kwargs['dir_fd'] = fd 1357 self._compare_to_walk(walk_kwargs, fwalk_kwargs) 1358 finally: 1359 os.close(fd) 1360 1361 def test_yields_correct_dir_fd(self): 1362 # check returned file descriptors 1363 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1364 args = support.TESTFN, topdown, None 1365 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks): 1366 # check that the FD is valid 1367 os.fstat(rootfd) 1368 # redundant check 1369 os.stat(rootfd) 1370 # check that listdir() returns consistent information 1371 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) 1372 1373 def test_fd_leak(self): 1374 # Since we're opening a lot of FDs, we must be careful to avoid leaks: 1375 # we both check that calling fwalk() a large number of times doesn't 1376 # yield EMFILE, and that the minimum allocated FD hasn't changed. 1377 minfd = os.dup(1) 1378 os.close(minfd) 1379 for i in range(256): 1380 for x in self.fwalk(support.TESTFN): 1381 pass 1382 newfd = os.dup(1) 1383 self.addCleanup(os.close, newfd) 1384 self.assertEqual(newfd, minfd) 1385 1386 # fwalk() keeps file descriptors open 1387 test_walk_many_open_files = None 1388 1389 1390class BytesWalkTests(WalkTests): 1391 """Tests for os.walk() with bytes.""" 1392 def walk(self, top, **kwargs): 1393 if 'follow_symlinks' in kwargs: 1394 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1395 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs): 1396 root = os.fsdecode(broot) 1397 dirs = list(map(os.fsdecode, bdirs)) 1398 files = list(map(os.fsdecode, bfiles)) 1399 yield (root, dirs, files) 1400 bdirs[:] = list(map(os.fsencode, dirs)) 1401 bfiles[:] = list(map(os.fsencode, files)) 1402 1403@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1404class BytesFwalkTests(FwalkTests): 1405 """Tests for os.walk() with bytes.""" 1406 def fwalk(self, top='.', *args, **kwargs): 1407 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs): 1408 root = os.fsdecode(broot) 1409 dirs = list(map(os.fsdecode, bdirs)) 1410 files = list(map(os.fsdecode, bfiles)) 1411 yield (root, dirs, files, topfd) 1412 bdirs[:] = list(map(os.fsencode, dirs)) 1413 bfiles[:] = list(map(os.fsencode, files)) 1414 1415 1416class MakedirTests(unittest.TestCase): 1417 def setUp(self): 1418 os.mkdir(support.TESTFN) 1419 1420 def test_makedir(self): 1421 base = support.TESTFN 1422 path = os.path.join(base, 'dir1', 'dir2', 'dir3') 1423 os.makedirs(path) # Should work 1424 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4') 1425 os.makedirs(path) 1426 1427 # Try paths with a '.' in them 1428 self.assertRaises(OSError, os.makedirs, os.curdir) 1429 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir) 1430 os.makedirs(path) 1431 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4', 1432 'dir5', 'dir6') 1433 os.makedirs(path) 1434 1435 def test_mode(self): 1436 with support.temp_umask(0o002): 1437 base = support.TESTFN 1438 parent = os.path.join(base, 'dir1') 1439 path = os.path.join(parent, 'dir2') 1440 os.makedirs(path, 0o555) 1441 self.assertTrue(os.path.exists(path)) 1442 self.assertTrue(os.path.isdir(path)) 1443 if os.name != 'nt': 1444 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555) 1445 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) 1446 1447 def test_exist_ok_existing_directory(self): 1448 path = os.path.join(support.TESTFN, 'dir1') 1449 mode = 0o777 1450 old_mask = os.umask(0o022) 1451 os.makedirs(path, mode) 1452 self.assertRaises(OSError, os.makedirs, path, mode) 1453 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 1454 os.makedirs(path, 0o776, exist_ok=True) 1455 os.makedirs(path, mode=mode, exist_ok=True) 1456 os.umask(old_mask) 1457 1458 # Issue #25583: A drive root could raise PermissionError on Windows 1459 os.makedirs(os.path.abspath('/'), exist_ok=True) 1460 1461 def test_exist_ok_s_isgid_directory(self): 1462 path = os.path.join(support.TESTFN, 'dir1') 1463 S_ISGID = stat.S_ISGID 1464 mode = 0o777 1465 old_mask = os.umask(0o022) 1466 try: 1467 existing_testfn_mode = stat.S_IMODE( 1468 os.lstat(support.TESTFN).st_mode) 1469 try: 1470 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID) 1471 except PermissionError: 1472 raise unittest.SkipTest('Cannot set S_ISGID for dir.') 1473 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID): 1474 raise unittest.SkipTest('No support for S_ISGID dir mode.') 1475 # The os should apply S_ISGID from the parent dir for us, but 1476 # this test need not depend on that behavior. Be explicit. 1477 os.makedirs(path, mode | S_ISGID) 1478 # http://bugs.python.org/issue14992 1479 # Should not fail when the bit is already set. 1480 os.makedirs(path, mode, exist_ok=True) 1481 # remove the bit. 1482 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID) 1483 # May work even when the bit is not already set when demanded. 1484 os.makedirs(path, mode | S_ISGID, exist_ok=True) 1485 finally: 1486 os.umask(old_mask) 1487 1488 def test_exist_ok_existing_regular_file(self): 1489 base = support.TESTFN 1490 path = os.path.join(support.TESTFN, 'dir1') 1491 with open(path, 'w') as f: 1492 f.write('abc') 1493 self.assertRaises(OSError, os.makedirs, path) 1494 self.assertRaises(OSError, os.makedirs, path, exist_ok=False) 1495 self.assertRaises(OSError, os.makedirs, path, exist_ok=True) 1496 os.remove(path) 1497 1498 def tearDown(self): 1499 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3', 1500 'dir4', 'dir5', 'dir6') 1501 # If the tests failed, the bottom-most directory ('../dir6') 1502 # may not have been created, so we look for the outermost directory 1503 # that exists. 1504 while not os.path.exists(path) and path != support.TESTFN: 1505 path = os.path.dirname(path) 1506 1507 os.removedirs(path) 1508 1509 1510@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown") 1511class ChownFileTests(unittest.TestCase): 1512 1513 @classmethod 1514 def setUpClass(cls): 1515 os.mkdir(support.TESTFN) 1516 1517 def test_chown_uid_gid_arguments_must_be_index(self): 1518 stat = os.stat(support.TESTFN) 1519 uid = stat.st_uid 1520 gid = stat.st_gid 1521 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): 1522 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) 1523 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) 1524 self.assertIsNone(os.chown(support.TESTFN, uid, gid)) 1525 self.assertIsNone(os.chown(support.TESTFN, -1, -1)) 1526 1527 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups') 1528 def test_chown_gid(self): 1529 groups = os.getgroups() 1530 if len(groups) < 2: 1531 self.skipTest("test needs at least 2 groups") 1532 1533 gid_1, gid_2 = groups[:2] 1534 uid = os.stat(support.TESTFN).st_uid 1535 1536 os.chown(support.TESTFN, uid, gid_1) 1537 gid = os.stat(support.TESTFN).st_gid 1538 self.assertEqual(gid, gid_1) 1539 1540 os.chown(support.TESTFN, uid, gid_2) 1541 gid = os.stat(support.TESTFN).st_gid 1542 self.assertEqual(gid, gid_2) 1543 1544 @unittest.skipUnless(root_in_posix and len(all_users) > 1, 1545 "test needs root privilege and more than one user") 1546 def test_chown_with_root(self): 1547 uid_1, uid_2 = all_users[:2] 1548 gid = os.stat(support.TESTFN).st_gid 1549 os.chown(support.TESTFN, uid_1, gid) 1550 uid = os.stat(support.TESTFN).st_uid 1551 self.assertEqual(uid, uid_1) 1552 os.chown(support.TESTFN, uid_2, gid) 1553 uid = os.stat(support.TESTFN).st_uid 1554 self.assertEqual(uid, uid_2) 1555 1556 @unittest.skipUnless(not root_in_posix and len(all_users) > 1, 1557 "test needs non-root account and more than one user") 1558 def test_chown_without_permission(self): 1559 uid_1, uid_2 = all_users[:2] 1560 gid = os.stat(support.TESTFN).st_gid 1561 with self.assertRaises(PermissionError): 1562 os.chown(support.TESTFN, uid_1, gid) 1563 os.chown(support.TESTFN, uid_2, gid) 1564 1565 @classmethod 1566 def tearDownClass(cls): 1567 os.rmdir(support.TESTFN) 1568 1569 1570class RemoveDirsTests(unittest.TestCase): 1571 def setUp(self): 1572 os.makedirs(support.TESTFN) 1573 1574 def tearDown(self): 1575 support.rmtree(support.TESTFN) 1576 1577 def test_remove_all(self): 1578 dira = os.path.join(support.TESTFN, 'dira') 1579 os.mkdir(dira) 1580 dirb = os.path.join(dira, 'dirb') 1581 os.mkdir(dirb) 1582 os.removedirs(dirb) 1583 self.assertFalse(os.path.exists(dirb)) 1584 self.assertFalse(os.path.exists(dira)) 1585 self.assertFalse(os.path.exists(support.TESTFN)) 1586 1587 def test_remove_partial(self): 1588 dira = os.path.join(support.TESTFN, 'dira') 1589 os.mkdir(dira) 1590 dirb = os.path.join(dira, 'dirb') 1591 os.mkdir(dirb) 1592 create_file(os.path.join(dira, 'file.txt')) 1593 os.removedirs(dirb) 1594 self.assertFalse(os.path.exists(dirb)) 1595 self.assertTrue(os.path.exists(dira)) 1596 self.assertTrue(os.path.exists(support.TESTFN)) 1597 1598 def test_remove_nothing(self): 1599 dira = os.path.join(support.TESTFN, 'dira') 1600 os.mkdir(dira) 1601 dirb = os.path.join(dira, 'dirb') 1602 os.mkdir(dirb) 1603 create_file(os.path.join(dirb, 'file.txt')) 1604 with self.assertRaises(OSError): 1605 os.removedirs(dirb) 1606 self.assertTrue(os.path.exists(dirb)) 1607 self.assertTrue(os.path.exists(dira)) 1608 self.assertTrue(os.path.exists(support.TESTFN)) 1609 1610 1611class DevNullTests(unittest.TestCase): 1612 def test_devnull(self): 1613 with open(os.devnull, 'wb', 0) as f: 1614 f.write(b'hello') 1615 f.close() 1616 with open(os.devnull, 'rb') as f: 1617 self.assertEqual(f.read(), b'') 1618 1619 1620class URandomTests(unittest.TestCase): 1621 def test_urandom_length(self): 1622 self.assertEqual(len(os.urandom(0)), 0) 1623 self.assertEqual(len(os.urandom(1)), 1) 1624 self.assertEqual(len(os.urandom(10)), 10) 1625 self.assertEqual(len(os.urandom(100)), 100) 1626 self.assertEqual(len(os.urandom(1000)), 1000) 1627 1628 def test_urandom_value(self): 1629 data1 = os.urandom(16) 1630 self.assertIsInstance(data1, bytes) 1631 data2 = os.urandom(16) 1632 self.assertNotEqual(data1, data2) 1633 1634 def get_urandom_subprocess(self, count): 1635 code = '\n'.join(( 1636 'import os, sys', 1637 'data = os.urandom(%s)' % count, 1638 'sys.stdout.buffer.write(data)', 1639 'sys.stdout.buffer.flush()')) 1640 out = assert_python_ok('-c', code) 1641 stdout = out[1] 1642 self.assertEqual(len(stdout), count) 1643 return stdout 1644 1645 def test_urandom_subprocess(self): 1646 data1 = self.get_urandom_subprocess(16) 1647 data2 = self.get_urandom_subprocess(16) 1648 self.assertNotEqual(data1, data2) 1649 1650 1651@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()') 1652class GetRandomTests(unittest.TestCase): 1653 @classmethod 1654 def setUpClass(cls): 1655 try: 1656 os.getrandom(1) 1657 except OSError as exc: 1658 if exc.errno == errno.ENOSYS: 1659 # Python compiled on a more recent Linux version 1660 # than the current Linux kernel 1661 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS") 1662 else: 1663 raise 1664 1665 def test_getrandom_type(self): 1666 data = os.getrandom(16) 1667 self.assertIsInstance(data, bytes) 1668 self.assertEqual(len(data), 16) 1669 1670 def test_getrandom0(self): 1671 empty = os.getrandom(0) 1672 self.assertEqual(empty, b'') 1673 1674 def test_getrandom_random(self): 1675 self.assertTrue(hasattr(os, 'GRND_RANDOM')) 1676 1677 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare 1678 # resource /dev/random 1679 1680 def test_getrandom_nonblock(self): 1681 # The call must not fail. Check also that the flag exists 1682 try: 1683 os.getrandom(1, os.GRND_NONBLOCK) 1684 except BlockingIOError: 1685 # System urandom is not initialized yet 1686 pass 1687 1688 def test_getrandom_value(self): 1689 data1 = os.getrandom(16) 1690 data2 = os.getrandom(16) 1691 self.assertNotEqual(data1, data2) 1692 1693 1694# os.urandom() doesn't use a file descriptor when it is implemented with the 1695# getentropy() function, the getrandom() function or the getrandom() syscall 1696OS_URANDOM_DONT_USE_FD = ( 1697 sysconfig.get_config_var('HAVE_GETENTROPY') == 1 1698 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1 1699 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) 1700 1701@unittest.skipIf(OS_URANDOM_DONT_USE_FD , 1702 "os.random() does not use a file descriptor") 1703@unittest.skipIf(sys.platform == "vxworks", 1704 "VxWorks can't set RLIMIT_NOFILE to 1") 1705class URandomFDTests(unittest.TestCase): 1706 @unittest.skipUnless(resource, "test requires the resource module") 1707 def test_urandom_failure(self): 1708 # Check urandom() failing when it is not able to open /dev/random. 1709 # We spawn a new process to make the test more robust (if getrlimit() 1710 # failed to restore the file descriptor limit after this, the whole 1711 # test suite would crash; this actually happened on the OS X Tiger 1712 # buildbot). 1713 code = """if 1: 1714 import errno 1715 import os 1716 import resource 1717 1718 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) 1719 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) 1720 try: 1721 os.urandom(16) 1722 except OSError as e: 1723 assert e.errno == errno.EMFILE, e.errno 1724 else: 1725 raise AssertionError("OSError not raised") 1726 """ 1727 assert_python_ok('-c', code) 1728 1729 def test_urandom_fd_closed(self): 1730 # Issue #21207: urandom() should reopen its fd to /dev/urandom if 1731 # closed. 1732 code = """if 1: 1733 import os 1734 import sys 1735 import test.support 1736 os.urandom(4) 1737 with test.support.SuppressCrashReport(): 1738 os.closerange(3, 256) 1739 sys.stdout.buffer.write(os.urandom(4)) 1740 """ 1741 rc, out, err = assert_python_ok('-Sc', code) 1742 1743 def test_urandom_fd_reopened(self): 1744 # Issue #21207: urandom() should detect its fd to /dev/urandom 1745 # changed to something else, and reopen it. 1746 self.addCleanup(support.unlink, support.TESTFN) 1747 create_file(support.TESTFN, b"x" * 256) 1748 1749 code = """if 1: 1750 import os 1751 import sys 1752 import test.support 1753 os.urandom(4) 1754 with test.support.SuppressCrashReport(): 1755 for fd in range(3, 256): 1756 try: 1757 os.close(fd) 1758 except OSError: 1759 pass 1760 else: 1761 # Found the urandom fd (XXX hopefully) 1762 break 1763 os.closerange(3, 256) 1764 with open({TESTFN!r}, 'rb') as f: 1765 new_fd = f.fileno() 1766 # Issue #26935: posix allows new_fd and fd to be equal but 1767 # some libc implementations have dup2 return an error in this 1768 # case. 1769 if new_fd != fd: 1770 os.dup2(new_fd, fd) 1771 sys.stdout.buffer.write(os.urandom(4)) 1772 sys.stdout.buffer.write(os.urandom(4)) 1773 """.format(TESTFN=support.TESTFN) 1774 rc, out, err = assert_python_ok('-Sc', code) 1775 self.assertEqual(len(out), 8) 1776 self.assertNotEqual(out[0:4], out[4:8]) 1777 rc, out2, err2 = assert_python_ok('-Sc', code) 1778 self.assertEqual(len(out2), 8) 1779 self.assertNotEqual(out2, out) 1780 1781 1782@contextlib.contextmanager 1783def _execvpe_mockup(defpath=None): 1784 """ 1785 Stubs out execv and execve functions when used as context manager. 1786 Records exec calls. The mock execv and execve functions always raise an 1787 exception as they would normally never return. 1788 """ 1789 # A list of tuples containing (function name, first arg, args) 1790 # of calls to execv or execve that have been made. 1791 calls = [] 1792 1793 def mock_execv(name, *args): 1794 calls.append(('execv', name, args)) 1795 raise RuntimeError("execv called") 1796 1797 def mock_execve(name, *args): 1798 calls.append(('execve', name, args)) 1799 raise OSError(errno.ENOTDIR, "execve called") 1800 1801 try: 1802 orig_execv = os.execv 1803 orig_execve = os.execve 1804 orig_defpath = os.defpath 1805 os.execv = mock_execv 1806 os.execve = mock_execve 1807 if defpath is not None: 1808 os.defpath = defpath 1809 yield calls 1810 finally: 1811 os.execv = orig_execv 1812 os.execve = orig_execve 1813 os.defpath = orig_defpath 1814 1815@unittest.skipUnless(hasattr(os, 'execv'), 1816 "need os.execv()") 1817class ExecTests(unittest.TestCase): 1818 @unittest.skipIf(USING_LINUXTHREADS, 1819 "avoid triggering a linuxthreads bug: see issue #4970") 1820 def test_execvpe_with_bad_program(self): 1821 self.assertRaises(OSError, os.execvpe, 'no such app-', 1822 ['no such app-'], None) 1823 1824 def test_execv_with_bad_arglist(self): 1825 self.assertRaises(ValueError, os.execv, 'notepad', ()) 1826 self.assertRaises(ValueError, os.execv, 'notepad', []) 1827 self.assertRaises(ValueError, os.execv, 'notepad', ('',)) 1828 self.assertRaises(ValueError, os.execv, 'notepad', ['']) 1829 1830 def test_execvpe_with_bad_arglist(self): 1831 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) 1832 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {}) 1833 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {}) 1834 1835 @unittest.skipUnless(hasattr(os, '_execvpe'), 1836 "No internal os._execvpe function to test.") 1837 def _test_internal_execvpe(self, test_type): 1838 program_path = os.sep + 'absolutepath' 1839 if test_type is bytes: 1840 program = b'executable' 1841 fullpath = os.path.join(os.fsencode(program_path), program) 1842 native_fullpath = fullpath 1843 arguments = [b'progname', 'arg1', 'arg2'] 1844 else: 1845 program = 'executable' 1846 arguments = ['progname', 'arg1', 'arg2'] 1847 fullpath = os.path.join(program_path, program) 1848 if os.name != "nt": 1849 native_fullpath = os.fsencode(fullpath) 1850 else: 1851 native_fullpath = fullpath 1852 env = {'spam': 'beans'} 1853 1854 # test os._execvpe() with an absolute path 1855 with _execvpe_mockup() as calls: 1856 self.assertRaises(RuntimeError, 1857 os._execvpe, fullpath, arguments) 1858 self.assertEqual(len(calls), 1) 1859 self.assertEqual(calls[0], ('execv', fullpath, (arguments,))) 1860 1861 # test os._execvpe() with a relative path: 1862 # os.get_exec_path() returns defpath 1863 with _execvpe_mockup(defpath=program_path) as calls: 1864 self.assertRaises(OSError, 1865 os._execvpe, program, arguments, env=env) 1866 self.assertEqual(len(calls), 1) 1867 self.assertSequenceEqual(calls[0], 1868 ('execve', native_fullpath, (arguments, env))) 1869 1870 # test os._execvpe() with a relative path: 1871 # os.get_exec_path() reads the 'PATH' variable 1872 with _execvpe_mockup() as calls: 1873 env_path = env.copy() 1874 if test_type is bytes: 1875 env_path[b'PATH'] = program_path 1876 else: 1877 env_path['PATH'] = program_path 1878 self.assertRaises(OSError, 1879 os._execvpe, program, arguments, env=env_path) 1880 self.assertEqual(len(calls), 1) 1881 self.assertSequenceEqual(calls[0], 1882 ('execve', native_fullpath, (arguments, env_path))) 1883 1884 def test_internal_execvpe_str(self): 1885 self._test_internal_execvpe(str) 1886 if os.name != "nt": 1887 self._test_internal_execvpe(bytes) 1888 1889 def test_execve_invalid_env(self): 1890 args = [sys.executable, '-c', 'pass'] 1891 1892 # null character in the environment variable name 1893 newenv = os.environ.copy() 1894 newenv["FRUIT\0VEGETABLE"] = "cabbage" 1895 with self.assertRaises(ValueError): 1896 os.execve(args[0], args, newenv) 1897 1898 # null character in the environment variable value 1899 newenv = os.environ.copy() 1900 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 1901 with self.assertRaises(ValueError): 1902 os.execve(args[0], args, newenv) 1903 1904 # equal character in the environment variable name 1905 newenv = os.environ.copy() 1906 newenv["FRUIT=ORANGE"] = "lemon" 1907 with self.assertRaises(ValueError): 1908 os.execve(args[0], args, newenv) 1909 1910 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test") 1911 def test_execve_with_empty_path(self): 1912 # bpo-32890: Check GetLastError() misuse 1913 try: 1914 os.execve('', ['arg'], {}) 1915 except OSError as e: 1916 self.assertTrue(e.winerror is None or e.winerror != 0) 1917 else: 1918 self.fail('No OSError raised') 1919 1920 1921@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 1922class Win32ErrorTests(unittest.TestCase): 1923 def setUp(self): 1924 try: 1925 os.stat(support.TESTFN) 1926 except FileNotFoundError: 1927 exists = False 1928 except OSError as exc: 1929 exists = True 1930 self.fail("file %s must not exist; os.stat failed with %s" 1931 % (support.TESTFN, exc)) 1932 else: 1933 self.fail("file %s must not exist" % support.TESTFN) 1934 1935 def test_rename(self): 1936 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") 1937 1938 def test_remove(self): 1939 self.assertRaises(OSError, os.remove, support.TESTFN) 1940 1941 def test_chdir(self): 1942 self.assertRaises(OSError, os.chdir, support.TESTFN) 1943 1944 def test_mkdir(self): 1945 self.addCleanup(support.unlink, support.TESTFN) 1946 1947 with open(support.TESTFN, "x") as f: 1948 self.assertRaises(OSError, os.mkdir, support.TESTFN) 1949 1950 def test_utime(self): 1951 self.assertRaises(OSError, os.utime, support.TESTFN, None) 1952 1953 def test_chmod(self): 1954 self.assertRaises(OSError, os.chmod, support.TESTFN, 0) 1955 1956 1957class TestInvalidFD(unittest.TestCase): 1958 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", 1959 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 1960 #singles.append("close") 1961 #We omit close because it doesn't raise an exception on some platforms 1962 def get_single(f): 1963 def helper(self): 1964 if hasattr(os, f): 1965 self.check(getattr(os, f)) 1966 return helper 1967 for f in singles: 1968 locals()["test_"+f] = get_single(f) 1969 1970 def check(self, f, *args): 1971 try: 1972 f(support.make_bad_fd(), *args) 1973 except OSError as e: 1974 self.assertEqual(e.errno, errno.EBADF) 1975 else: 1976 self.fail("%r didn't raise an OSError with a bad file descriptor" 1977 % f) 1978 1979 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') 1980 def test_isatty(self): 1981 self.assertEqual(os.isatty(support.make_bad_fd()), False) 1982 1983 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') 1984 def test_closerange(self): 1985 fd = support.make_bad_fd() 1986 # Make sure none of the descriptors we are about to close are 1987 # currently valid (issue 6542). 1988 for i in range(10): 1989 try: os.fstat(fd+i) 1990 except OSError: 1991 pass 1992 else: 1993 break 1994 if i < 2: 1995 raise unittest.SkipTest( 1996 "Unable to acquire a range of invalid file descriptors") 1997 self.assertEqual(os.closerange(fd, fd + i-1), None) 1998 1999 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 2000 def test_dup2(self): 2001 self.check(os.dup2, 20) 2002 2003 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') 2004 def test_fchmod(self): 2005 self.check(os.fchmod, 0) 2006 2007 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') 2008 def test_fchown(self): 2009 self.check(os.fchown, -1, -1) 2010 2011 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') 2012 def test_fpathconf(self): 2013 self.check(os.pathconf, "PC_NAME_MAX") 2014 self.check(os.fpathconf, "PC_NAME_MAX") 2015 2016 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') 2017 def test_ftruncate(self): 2018 self.check(os.truncate, 0) 2019 self.check(os.ftruncate, 0) 2020 2021 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') 2022 def test_lseek(self): 2023 self.check(os.lseek, 0, 0) 2024 2025 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') 2026 def test_read(self): 2027 self.check(os.read, 1) 2028 2029 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()') 2030 def test_readv(self): 2031 buf = bytearray(10) 2032 self.check(os.readv, [buf]) 2033 2034 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') 2035 def test_tcsetpgrpt(self): 2036 self.check(os.tcsetpgrp, 0) 2037 2038 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 2039 def test_write(self): 2040 self.check(os.write, b" ") 2041 2042 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') 2043 def test_writev(self): 2044 self.check(os.writev, [b'abc']) 2045 2046 def test_inheritable(self): 2047 self.check(os.get_inheritable) 2048 self.check(os.set_inheritable, True) 2049 2050 @unittest.skipUnless(hasattr(os, 'get_blocking'), 2051 'needs os.get_blocking() and os.set_blocking()') 2052 def test_blocking(self): 2053 self.check(os.get_blocking) 2054 self.check(os.set_blocking, True) 2055 2056 2057class LinkTests(unittest.TestCase): 2058 def setUp(self): 2059 self.file1 = support.TESTFN 2060 self.file2 = os.path.join(support.TESTFN + "2") 2061 2062 def tearDown(self): 2063 for file in (self.file1, self.file2): 2064 if os.path.exists(file): 2065 os.unlink(file) 2066 2067 def _test_link(self, file1, file2): 2068 create_file(file1) 2069 2070 try: 2071 os.link(file1, file2) 2072 except PermissionError as e: 2073 self.skipTest('os.link(): %s' % e) 2074 with open(file1, "r") as f1, open(file2, "r") as f2: 2075 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) 2076 2077 def test_link(self): 2078 self._test_link(self.file1, self.file2) 2079 2080 def test_link_bytes(self): 2081 self._test_link(bytes(self.file1, sys.getfilesystemencoding()), 2082 bytes(self.file2, sys.getfilesystemencoding())) 2083 2084 def test_unicode_name(self): 2085 try: 2086 os.fsencode("\xf1") 2087 except UnicodeError: 2088 raise unittest.SkipTest("Unable to encode for this platform.") 2089 2090 self.file1 += "\xf1" 2091 self.file2 = self.file1 + "2" 2092 self._test_link(self.file1, self.file2) 2093 2094@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2095class PosixUidGidTests(unittest.TestCase): 2096 # uid_t and gid_t are 32-bit unsigned integers on Linux 2097 UID_OVERFLOW = (1 << 32) 2098 GID_OVERFLOW = (1 << 32) 2099 2100 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 2101 def test_setuid(self): 2102 if os.getuid() != 0: 2103 self.assertRaises(OSError, os.setuid, 0) 2104 self.assertRaises(TypeError, os.setuid, 'not an int') 2105 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW) 2106 2107 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 2108 def test_setgid(self): 2109 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2110 self.assertRaises(OSError, os.setgid, 0) 2111 self.assertRaises(TypeError, os.setgid, 'not an int') 2112 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW) 2113 2114 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 2115 def test_seteuid(self): 2116 if os.getuid() != 0: 2117 self.assertRaises(OSError, os.seteuid, 0) 2118 self.assertRaises(TypeError, os.setegid, 'not an int') 2119 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW) 2120 2121 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 2122 def test_setegid(self): 2123 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2124 self.assertRaises(OSError, os.setegid, 0) 2125 self.assertRaises(TypeError, os.setegid, 'not an int') 2126 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW) 2127 2128 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2129 def test_setreuid(self): 2130 if os.getuid() != 0: 2131 self.assertRaises(OSError, os.setreuid, 0, 0) 2132 self.assertRaises(TypeError, os.setreuid, 'not an int', 0) 2133 self.assertRaises(TypeError, os.setreuid, 0, 'not an int') 2134 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0) 2135 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW) 2136 2137 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2138 def test_setreuid_neg1(self): 2139 # Needs to accept -1. We run this in a subprocess to avoid 2140 # altering the test runner's process state (issue8045). 2141 subprocess.check_call([ 2142 sys.executable, '-c', 2143 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 2144 2145 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2146 def test_setregid(self): 2147 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2148 self.assertRaises(OSError, os.setregid, 0, 0) 2149 self.assertRaises(TypeError, os.setregid, 'not an int', 0) 2150 self.assertRaises(TypeError, os.setregid, 0, 'not an int') 2151 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0) 2152 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW) 2153 2154 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2155 def test_setregid_neg1(self): 2156 # Needs to accept -1. We run this in a subprocess to avoid 2157 # altering the test runner's process state (issue8045). 2158 subprocess.check_call([ 2159 sys.executable, '-c', 2160 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 2161 2162@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2163class Pep383Tests(unittest.TestCase): 2164 def setUp(self): 2165 if support.TESTFN_UNENCODABLE: 2166 self.dir = support.TESTFN_UNENCODABLE 2167 elif support.TESTFN_NONASCII: 2168 self.dir = support.TESTFN_NONASCII 2169 else: 2170 self.dir = support.TESTFN 2171 self.bdir = os.fsencode(self.dir) 2172 2173 bytesfn = [] 2174 def add_filename(fn): 2175 try: 2176 fn = os.fsencode(fn) 2177 except UnicodeEncodeError: 2178 return 2179 bytesfn.append(fn) 2180 add_filename(support.TESTFN_UNICODE) 2181 if support.TESTFN_UNENCODABLE: 2182 add_filename(support.TESTFN_UNENCODABLE) 2183 if support.TESTFN_NONASCII: 2184 add_filename(support.TESTFN_NONASCII) 2185 if not bytesfn: 2186 self.skipTest("couldn't create any non-ascii filename") 2187 2188 self.unicodefn = set() 2189 os.mkdir(self.dir) 2190 try: 2191 for fn in bytesfn: 2192 support.create_empty_file(os.path.join(self.bdir, fn)) 2193 fn = os.fsdecode(fn) 2194 if fn in self.unicodefn: 2195 raise ValueError("duplicate filename") 2196 self.unicodefn.add(fn) 2197 except: 2198 shutil.rmtree(self.dir) 2199 raise 2200 2201 def tearDown(self): 2202 shutil.rmtree(self.dir) 2203 2204 def test_listdir(self): 2205 expected = self.unicodefn 2206 found = set(os.listdir(self.dir)) 2207 self.assertEqual(found, expected) 2208 # test listdir without arguments 2209 current_directory = os.getcwd() 2210 try: 2211 os.chdir(os.sep) 2212 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 2213 finally: 2214 os.chdir(current_directory) 2215 2216 def test_open(self): 2217 for fn in self.unicodefn: 2218 f = open(os.path.join(self.dir, fn), 'rb') 2219 f.close() 2220 2221 @unittest.skipUnless(hasattr(os, 'statvfs'), 2222 "need os.statvfs()") 2223 def test_statvfs(self): 2224 # issue #9645 2225 for fn in self.unicodefn: 2226 # should not fail with file not found error 2227 fullname = os.path.join(self.dir, fn) 2228 os.statvfs(fullname) 2229 2230 def test_stat(self): 2231 for fn in self.unicodefn: 2232 os.stat(os.path.join(self.dir, fn)) 2233 2234@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2235class Win32KillTests(unittest.TestCase): 2236 def _kill(self, sig): 2237 # Start sys.executable as a subprocess and communicate from the 2238 # subprocess to the parent that the interpreter is ready. When it 2239 # becomes ready, send *sig* via os.kill to the subprocess and check 2240 # that the return code is equal to *sig*. 2241 import ctypes 2242 from ctypes import wintypes 2243 import msvcrt 2244 2245 # Since we can't access the contents of the process' stdout until the 2246 # process has exited, use PeekNamedPipe to see what's inside stdout 2247 # without waiting. This is done so we can tell that the interpreter 2248 # is started and running at a point where it could handle a signal. 2249 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe 2250 PeekNamedPipe.restype = wintypes.BOOL 2251 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle 2252 ctypes.POINTER(ctypes.c_char), # stdout buf 2253 wintypes.DWORD, # Buffer size 2254 ctypes.POINTER(wintypes.DWORD), # bytes read 2255 ctypes.POINTER(wintypes.DWORD), # bytes avail 2256 ctypes.POINTER(wintypes.DWORD)) # bytes left 2257 msg = "running" 2258 proc = subprocess.Popen([sys.executable, "-c", 2259 "import sys;" 2260 "sys.stdout.write('{}');" 2261 "sys.stdout.flush();" 2262 "input()".format(msg)], 2263 stdout=subprocess.PIPE, 2264 stderr=subprocess.PIPE, 2265 stdin=subprocess.PIPE) 2266 self.addCleanup(proc.stdout.close) 2267 self.addCleanup(proc.stderr.close) 2268 self.addCleanup(proc.stdin.close) 2269 2270 count, max = 0, 100 2271 while count < max and proc.poll() is None: 2272 # Create a string buffer to store the result of stdout from the pipe 2273 buf = ctypes.create_string_buffer(len(msg)) 2274 # Obtain the text currently in proc.stdout 2275 # Bytes read/avail/left are left as NULL and unused 2276 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), 2277 buf, ctypes.sizeof(buf), None, None, None) 2278 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") 2279 if buf.value: 2280 self.assertEqual(msg, buf.value.decode()) 2281 break 2282 time.sleep(0.1) 2283 count += 1 2284 else: 2285 self.fail("Did not receive communication from the subprocess") 2286 2287 os.kill(proc.pid, sig) 2288 self.assertEqual(proc.wait(), sig) 2289 2290 def test_kill_sigterm(self): 2291 # SIGTERM doesn't mean anything special, but make sure it works 2292 self._kill(signal.SIGTERM) 2293 2294 def test_kill_int(self): 2295 # os.kill on Windows can take an int which gets set as the exit code 2296 self._kill(100) 2297 2298 def _kill_with_event(self, event, name): 2299 tagname = "test_os_%s" % uuid.uuid1() 2300 m = mmap.mmap(-1, 1, tagname) 2301 m[0] = 0 2302 # Run a script which has console control handling enabled. 2303 proc = subprocess.Popen([sys.executable, 2304 os.path.join(os.path.dirname(__file__), 2305 "win_console_handler.py"), tagname], 2306 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) 2307 # Let the interpreter startup before we send signals. See #3137. 2308 count, max = 0, 100 2309 while count < max and proc.poll() is None: 2310 if m[0] == 1: 2311 break 2312 time.sleep(0.1) 2313 count += 1 2314 else: 2315 # Forcefully kill the process if we weren't able to signal it. 2316 os.kill(proc.pid, signal.SIGINT) 2317 self.fail("Subprocess didn't finish initialization") 2318 os.kill(proc.pid, event) 2319 # proc.send_signal(event) could also be done here. 2320 # Allow time for the signal to be passed and the process to exit. 2321 time.sleep(0.5) 2322 if not proc.poll(): 2323 # Forcefully kill the process if we weren't able to signal it. 2324 os.kill(proc.pid, signal.SIGINT) 2325 self.fail("subprocess did not stop on {}".format(name)) 2326 2327 @unittest.skip("subprocesses aren't inheriting Ctrl+C property") 2328 def test_CTRL_C_EVENT(self): 2329 from ctypes import wintypes 2330 import ctypes 2331 2332 # Make a NULL value by creating a pointer with no argument. 2333 NULL = ctypes.POINTER(ctypes.c_int)() 2334 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler 2335 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), 2336 wintypes.BOOL) 2337 SetConsoleCtrlHandler.restype = wintypes.BOOL 2338 2339 # Calling this with NULL and FALSE causes the calling process to 2340 # handle Ctrl+C, rather than ignore it. This property is inherited 2341 # by subprocesses. 2342 SetConsoleCtrlHandler(NULL, 0) 2343 2344 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 2345 2346 def test_CTRL_BREAK_EVENT(self): 2347 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 2348 2349 2350@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2351class Win32ListdirTests(unittest.TestCase): 2352 """Test listdir on Windows.""" 2353 2354 def setUp(self): 2355 self.created_paths = [] 2356 for i in range(2): 2357 dir_name = 'SUB%d' % i 2358 dir_path = os.path.join(support.TESTFN, dir_name) 2359 file_name = 'FILE%d' % i 2360 file_path = os.path.join(support.TESTFN, file_name) 2361 os.makedirs(dir_path) 2362 with open(file_path, 'w', encoding='utf-8') as f: 2363 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) 2364 self.created_paths.extend([dir_name, file_name]) 2365 self.created_paths.sort() 2366 2367 def tearDown(self): 2368 shutil.rmtree(support.TESTFN) 2369 2370 def test_listdir_no_extended_path(self): 2371 """Test when the path is not an "extended" path.""" 2372 # unicode 2373 self.assertEqual( 2374 sorted(os.listdir(support.TESTFN)), 2375 self.created_paths) 2376 2377 # bytes 2378 self.assertEqual( 2379 sorted(os.listdir(os.fsencode(support.TESTFN))), 2380 [os.fsencode(path) for path in self.created_paths]) 2381 2382 def test_listdir_extended_path(self): 2383 """Test when the path starts with '\\\\?\\'.""" 2384 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath 2385 # unicode 2386 path = '\\\\?\\' + os.path.abspath(support.TESTFN) 2387 self.assertEqual( 2388 sorted(os.listdir(path)), 2389 self.created_paths) 2390 2391 # bytes 2392 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) 2393 self.assertEqual( 2394 sorted(os.listdir(path)), 2395 [os.fsencode(path) for path in self.created_paths]) 2396 2397 2398@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()') 2399class ReadlinkTests(unittest.TestCase): 2400 filelink = 'readlinktest' 2401 filelink_target = os.path.abspath(__file__) 2402 filelinkb = os.fsencode(filelink) 2403 filelinkb_target = os.fsencode(filelink_target) 2404 2405 def assertPathEqual(self, left, right): 2406 left = os.path.normcase(left) 2407 right = os.path.normcase(right) 2408 if sys.platform == 'win32': 2409 # Bad practice to blindly strip the prefix as it may be required to 2410 # correctly refer to the file, but we're only comparing paths here. 2411 has_prefix = lambda p: p.startswith( 2412 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\') 2413 if has_prefix(left): 2414 left = left[4:] 2415 if has_prefix(right): 2416 right = right[4:] 2417 self.assertEqual(left, right) 2418 2419 def setUp(self): 2420 self.assertTrue(os.path.exists(self.filelink_target)) 2421 self.assertTrue(os.path.exists(self.filelinkb_target)) 2422 self.assertFalse(os.path.exists(self.filelink)) 2423 self.assertFalse(os.path.exists(self.filelinkb)) 2424 2425 def test_not_symlink(self): 2426 filelink_target = FakePath(self.filelink_target) 2427 self.assertRaises(OSError, os.readlink, self.filelink_target) 2428 self.assertRaises(OSError, os.readlink, filelink_target) 2429 2430 def test_missing_link(self): 2431 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link') 2432 self.assertRaises(FileNotFoundError, os.readlink, 2433 FakePath('missing-link')) 2434 2435 @support.skip_unless_symlink 2436 def test_pathlike(self): 2437 os.symlink(self.filelink_target, self.filelink) 2438 self.addCleanup(support.unlink, self.filelink) 2439 filelink = FakePath(self.filelink) 2440 self.assertPathEqual(os.readlink(filelink), self.filelink_target) 2441 2442 @support.skip_unless_symlink 2443 def test_pathlike_bytes(self): 2444 os.symlink(self.filelinkb_target, self.filelinkb) 2445 self.addCleanup(support.unlink, self.filelinkb) 2446 path = os.readlink(FakePath(self.filelinkb)) 2447 self.assertPathEqual(path, self.filelinkb_target) 2448 self.assertIsInstance(path, bytes) 2449 2450 @support.skip_unless_symlink 2451 def test_bytes(self): 2452 os.symlink(self.filelinkb_target, self.filelinkb) 2453 self.addCleanup(support.unlink, self.filelinkb) 2454 path = os.readlink(self.filelinkb) 2455 self.assertPathEqual(path, self.filelinkb_target) 2456 self.assertIsInstance(path, bytes) 2457 2458 2459@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2460@support.skip_unless_symlink 2461class Win32SymlinkTests(unittest.TestCase): 2462 filelink = 'filelinktest' 2463 filelink_target = os.path.abspath(__file__) 2464 dirlink = 'dirlinktest' 2465 dirlink_target = os.path.dirname(filelink_target) 2466 missing_link = 'missing link' 2467 2468 def setUp(self): 2469 assert os.path.exists(self.dirlink_target) 2470 assert os.path.exists(self.filelink_target) 2471 assert not os.path.exists(self.dirlink) 2472 assert not os.path.exists(self.filelink) 2473 assert not os.path.exists(self.missing_link) 2474 2475 def tearDown(self): 2476 if os.path.exists(self.filelink): 2477 os.remove(self.filelink) 2478 if os.path.exists(self.dirlink): 2479 os.rmdir(self.dirlink) 2480 if os.path.lexists(self.missing_link): 2481 os.remove(self.missing_link) 2482 2483 def test_directory_link(self): 2484 os.symlink(self.dirlink_target, self.dirlink) 2485 self.assertTrue(os.path.exists(self.dirlink)) 2486 self.assertTrue(os.path.isdir(self.dirlink)) 2487 self.assertTrue(os.path.islink(self.dirlink)) 2488 self.check_stat(self.dirlink, self.dirlink_target) 2489 2490 def test_file_link(self): 2491 os.symlink(self.filelink_target, self.filelink) 2492 self.assertTrue(os.path.exists(self.filelink)) 2493 self.assertTrue(os.path.isfile(self.filelink)) 2494 self.assertTrue(os.path.islink(self.filelink)) 2495 self.check_stat(self.filelink, self.filelink_target) 2496 2497 def _create_missing_dir_link(self): 2498 'Create a "directory" link to a non-existent target' 2499 linkname = self.missing_link 2500 if os.path.lexists(linkname): 2501 os.remove(linkname) 2502 target = r'c:\\target does not exist.29r3c740' 2503 assert not os.path.exists(target) 2504 target_is_dir = True 2505 os.symlink(target, linkname, target_is_dir) 2506 2507 def test_remove_directory_link_to_missing_target(self): 2508 self._create_missing_dir_link() 2509 # For compatibility with Unix, os.remove will check the 2510 # directory status and call RemoveDirectory if the symlink 2511 # was created with target_is_dir==True. 2512 os.remove(self.missing_link) 2513 2514 def test_isdir_on_directory_link_to_missing_target(self): 2515 self._create_missing_dir_link() 2516 self.assertFalse(os.path.isdir(self.missing_link)) 2517 2518 def test_rmdir_on_directory_link_to_missing_target(self): 2519 self._create_missing_dir_link() 2520 os.rmdir(self.missing_link) 2521 2522 def check_stat(self, link, target): 2523 self.assertEqual(os.stat(link), os.stat(target)) 2524 self.assertNotEqual(os.lstat(link), os.stat(link)) 2525 2526 bytes_link = os.fsencode(link) 2527 self.assertEqual(os.stat(bytes_link), os.stat(target)) 2528 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) 2529 2530 def test_12084(self): 2531 level1 = os.path.abspath(support.TESTFN) 2532 level2 = os.path.join(level1, "level2") 2533 level3 = os.path.join(level2, "level3") 2534 self.addCleanup(support.rmtree, level1) 2535 2536 os.mkdir(level1) 2537 os.mkdir(level2) 2538 os.mkdir(level3) 2539 2540 file1 = os.path.abspath(os.path.join(level1, "file1")) 2541 create_file(file1) 2542 2543 orig_dir = os.getcwd() 2544 try: 2545 os.chdir(level2) 2546 link = os.path.join(level2, "link") 2547 os.symlink(os.path.relpath(file1), "link") 2548 self.assertIn("link", os.listdir(os.getcwd())) 2549 2550 # Check os.stat calls from the same dir as the link 2551 self.assertEqual(os.stat(file1), os.stat("link")) 2552 2553 # Check os.stat calls from a dir below the link 2554 os.chdir(level1) 2555 self.assertEqual(os.stat(file1), 2556 os.stat(os.path.relpath(link))) 2557 2558 # Check os.stat calls from a dir above the link 2559 os.chdir(level3) 2560 self.assertEqual(os.stat(file1), 2561 os.stat(os.path.relpath(link))) 2562 finally: 2563 os.chdir(orig_dir) 2564 2565 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') 2566 and os.path.exists(r'C:\ProgramData'), 2567 'Test directories not found') 2568 def test_29248(self): 2569 # os.symlink() calls CreateSymbolicLink, which creates 2570 # the reparse data buffer with the print name stored 2571 # first, so the offset is always 0. CreateSymbolicLink 2572 # stores the "PrintName" DOS path (e.g. "C:\") first, 2573 # with an offset of 0, followed by the "SubstituteName" 2574 # NT path (e.g. "\??\C:\"). The "All Users" link, on 2575 # the other hand, seems to have been created manually 2576 # with an inverted order. 2577 target = os.readlink(r'C:\Users\All Users') 2578 self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) 2579 2580 def test_buffer_overflow(self): 2581 # Older versions would have a buffer overflow when detecting 2582 # whether a link source was a directory. This test ensures we 2583 # no longer crash, but does not otherwise validate the behavior 2584 segment = 'X' * 27 2585 path = os.path.join(*[segment] * 10) 2586 test_cases = [ 2587 # overflow with absolute src 2588 ('\\' + path, segment), 2589 # overflow dest with relative src 2590 (segment, path), 2591 # overflow when joining src 2592 (path[:180], path[:180]), 2593 ] 2594 for src, dest in test_cases: 2595 try: 2596 os.symlink(src, dest) 2597 except FileNotFoundError: 2598 pass 2599 else: 2600 try: 2601 os.remove(dest) 2602 except OSError: 2603 pass 2604 # Also test with bytes, since that is a separate code path. 2605 try: 2606 os.symlink(os.fsencode(src), os.fsencode(dest)) 2607 except FileNotFoundError: 2608 pass 2609 else: 2610 try: 2611 os.remove(dest) 2612 except OSError: 2613 pass 2614 2615 def test_appexeclink(self): 2616 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') 2617 if not os.path.isdir(root): 2618 self.skipTest("test requires a WindowsApps directory") 2619 2620 aliases = [os.path.join(root, a) 2621 for a in fnmatch.filter(os.listdir(root), '*.exe')] 2622 2623 for alias in aliases: 2624 if support.verbose: 2625 print() 2626 print("Testing with", alias) 2627 st = os.lstat(alias) 2628 self.assertEqual(st, os.stat(alias)) 2629 self.assertFalse(stat.S_ISLNK(st.st_mode)) 2630 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) 2631 # testing the first one we see is sufficient 2632 break 2633 else: 2634 self.skipTest("test requires an app execution alias") 2635 2636@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2637class Win32JunctionTests(unittest.TestCase): 2638 junction = 'junctiontest' 2639 junction_target = os.path.dirname(os.path.abspath(__file__)) 2640 2641 def setUp(self): 2642 assert os.path.exists(self.junction_target) 2643 assert not os.path.lexists(self.junction) 2644 2645 def tearDown(self): 2646 if os.path.lexists(self.junction): 2647 os.unlink(self.junction) 2648 2649 def test_create_junction(self): 2650 _winapi.CreateJunction(self.junction_target, self.junction) 2651 self.assertTrue(os.path.lexists(self.junction)) 2652 self.assertTrue(os.path.exists(self.junction)) 2653 self.assertTrue(os.path.isdir(self.junction)) 2654 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) 2655 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) 2656 2657 # bpo-37834: Junctions are not recognized as links. 2658 self.assertFalse(os.path.islink(self.junction)) 2659 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), 2660 os.path.normcase(os.readlink(self.junction))) 2661 2662 def test_unlink_removes_junction(self): 2663 _winapi.CreateJunction(self.junction_target, self.junction) 2664 self.assertTrue(os.path.exists(self.junction)) 2665 self.assertTrue(os.path.lexists(self.junction)) 2666 2667 os.unlink(self.junction) 2668 self.assertFalse(os.path.exists(self.junction)) 2669 2670@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2671class Win32NtTests(unittest.TestCase): 2672 def test_getfinalpathname_handles(self): 2673 nt = support.import_module('nt') 2674 ctypes = support.import_module('ctypes') 2675 import ctypes.wintypes 2676 2677 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) 2678 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE 2679 2680 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL 2681 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, 2682 ctypes.wintypes.LPDWORD) 2683 2684 # This is a pseudo-handle that doesn't need to be closed 2685 hproc = kernel.GetCurrentProcess() 2686 2687 handle_count = ctypes.wintypes.DWORD() 2688 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2689 self.assertEqual(1, ok) 2690 2691 before_count = handle_count.value 2692 2693 # The first two test the error path, __file__ tests the success path 2694 filenames = [ 2695 r'\\?\C:', 2696 r'\\?\NUL', 2697 r'\\?\CONIN', 2698 __file__, 2699 ] 2700 2701 for _ in range(10): 2702 for name in filenames: 2703 try: 2704 nt._getfinalpathname(name) 2705 except Exception: 2706 # Failure is expected 2707 pass 2708 try: 2709 os.stat(name) 2710 except Exception: 2711 pass 2712 2713 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2714 self.assertEqual(1, ok) 2715 2716 handle_delta = handle_count.value - before_count 2717 2718 self.assertEqual(0, handle_delta) 2719 2720@support.skip_unless_symlink 2721class NonLocalSymlinkTests(unittest.TestCase): 2722 2723 def setUp(self): 2724 r""" 2725 Create this structure: 2726 2727 base 2728 \___ some_dir 2729 """ 2730 os.makedirs('base/some_dir') 2731 2732 def tearDown(self): 2733 shutil.rmtree('base') 2734 2735 def test_directory_link_nonlocal(self): 2736 """ 2737 The symlink target should resolve relative to the link, not relative 2738 to the current directory. 2739 2740 Then, link base/some_link -> base/some_dir and ensure that some_link 2741 is resolved as a directory. 2742 2743 In issue13772, it was discovered that directory detection failed if 2744 the symlink target was not specified relative to the current 2745 directory, which was a defect in the implementation. 2746 """ 2747 src = os.path.join('base', 'some_link') 2748 os.symlink('some_dir', src) 2749 assert os.path.isdir(src) 2750 2751 2752class FSEncodingTests(unittest.TestCase): 2753 def test_nop(self): 2754 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 2755 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 2756 2757 def test_identity(self): 2758 # assert fsdecode(fsencode(x)) == x 2759 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 2760 try: 2761 bytesfn = os.fsencode(fn) 2762 except UnicodeEncodeError: 2763 continue 2764 self.assertEqual(os.fsdecode(bytesfn), fn) 2765 2766 2767 2768class DeviceEncodingTests(unittest.TestCase): 2769 2770 def test_bad_fd(self): 2771 # Return None when an fd doesn't actually exist. 2772 self.assertIsNone(os.device_encoding(123456)) 2773 2774 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or 2775 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))), 2776 'test requires a tty and either Windows or nl_langinfo(CODESET)') 2777 def test_device_encoding(self): 2778 encoding = os.device_encoding(0) 2779 self.assertIsNotNone(encoding) 2780 self.assertTrue(codecs.lookup(encoding)) 2781 2782 2783class PidTests(unittest.TestCase): 2784 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid") 2785 def test_getppid(self): 2786 p = subprocess.Popen([sys.executable, '-c', 2787 'import os; print(os.getppid())'], 2788 stdout=subprocess.PIPE) 2789 stdout, _ = p.communicate() 2790 # We are the parent of our subprocess 2791 self.assertEqual(int(stdout), os.getpid()) 2792 2793 def check_waitpid(self, code, exitcode, callback=None): 2794 if sys.platform == 'win32': 2795 # On Windows, os.spawnv() simply joins arguments with spaces: 2796 # arguments need to be quoted 2797 args = [f'"{sys.executable}"', '-c', f'"{code}"'] 2798 else: 2799 args = [sys.executable, '-c', code] 2800 pid = os.spawnv(os.P_NOWAIT, sys.executable, args) 2801 2802 if callback is not None: 2803 callback(pid) 2804 2805 # don't use support.wait_process() to test directly os.waitpid() 2806 # and os.waitstatus_to_exitcode() 2807 pid2, status = os.waitpid(pid, 0) 2808 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode) 2809 self.assertEqual(pid2, pid) 2810 2811 def test_waitpid(self): 2812 self.check_waitpid(code='pass', exitcode=0) 2813 2814 def test_waitstatus_to_exitcode(self): 2815 exitcode = 23 2816 code = f'import sys; sys.exit({exitcode})' 2817 self.check_waitpid(code, exitcode=exitcode) 2818 2819 with self.assertRaises(TypeError): 2820 os.waitstatus_to_exitcode(0.0) 2821 2822 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 2823 def test_waitpid_windows(self): 2824 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode() 2825 # with exit code larger than INT_MAX. 2826 STATUS_CONTROL_C_EXIT = 0xC000013A 2827 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' 2828 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) 2829 2830 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 2831 def test_waitstatus_to_exitcode_windows(self): 2832 max_exitcode = 2 ** 32 - 1 2833 for exitcode in (0, 1, 5, max_exitcode): 2834 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8), 2835 exitcode) 2836 2837 # invalid values 2838 with self.assertRaises(ValueError): 2839 os.waitstatus_to_exitcode((max_exitcode + 1) << 8) 2840 with self.assertRaises(OverflowError): 2841 os.waitstatus_to_exitcode(-1) 2842 2843 # Skip the test on Windows 2844 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL') 2845 def test_waitstatus_to_exitcode_kill(self): 2846 code = f'import time; time.sleep({support.LONG_TIMEOUT})' 2847 signum = signal.SIGKILL 2848 2849 def kill_process(pid): 2850 os.kill(pid, signum) 2851 2852 self.check_waitpid(code, exitcode=-signum, callback=kill_process) 2853 2854 2855class SpawnTests(unittest.TestCase): 2856 def create_args(self, *, with_env=False, use_bytes=False): 2857 self.exitcode = 17 2858 2859 filename = support.TESTFN 2860 self.addCleanup(support.unlink, filename) 2861 2862 if not with_env: 2863 code = 'import sys; sys.exit(%s)' % self.exitcode 2864 else: 2865 self.env = dict(os.environ) 2866 # create an unique key 2867 self.key = str(uuid.uuid4()) 2868 self.env[self.key] = self.key 2869 # read the variable from os.environ to check that it exists 2870 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' 2871 % (self.key, self.exitcode)) 2872 2873 with open(filename, "w") as fp: 2874 fp.write(code) 2875 2876 args = [sys.executable, filename] 2877 if use_bytes: 2878 args = [os.fsencode(a) for a in args] 2879 self.env = {os.fsencode(k): os.fsencode(v) 2880 for k, v in self.env.items()} 2881 2882 return args 2883 2884 @requires_os_func('spawnl') 2885 def test_spawnl(self): 2886 args = self.create_args() 2887 exitcode = os.spawnl(os.P_WAIT, args[0], *args) 2888 self.assertEqual(exitcode, self.exitcode) 2889 2890 @requires_os_func('spawnle') 2891 def test_spawnle(self): 2892 args = self.create_args(with_env=True) 2893 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env) 2894 self.assertEqual(exitcode, self.exitcode) 2895 2896 @requires_os_func('spawnlp') 2897 def test_spawnlp(self): 2898 args = self.create_args() 2899 exitcode = os.spawnlp(os.P_WAIT, args[0], *args) 2900 self.assertEqual(exitcode, self.exitcode) 2901 2902 @requires_os_func('spawnlpe') 2903 def test_spawnlpe(self): 2904 args = self.create_args(with_env=True) 2905 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env) 2906 self.assertEqual(exitcode, self.exitcode) 2907 2908 @requires_os_func('spawnv') 2909 def test_spawnv(self): 2910 args = self.create_args() 2911 exitcode = os.spawnv(os.P_WAIT, args[0], args) 2912 self.assertEqual(exitcode, self.exitcode) 2913 2914 # Test for PyUnicode_FSConverter() 2915 exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args) 2916 self.assertEqual(exitcode, self.exitcode) 2917 2918 @requires_os_func('spawnve') 2919 def test_spawnve(self): 2920 args = self.create_args(with_env=True) 2921 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 2922 self.assertEqual(exitcode, self.exitcode) 2923 2924 @requires_os_func('spawnvp') 2925 def test_spawnvp(self): 2926 args = self.create_args() 2927 exitcode = os.spawnvp(os.P_WAIT, args[0], args) 2928 self.assertEqual(exitcode, self.exitcode) 2929 2930 @requires_os_func('spawnvpe') 2931 def test_spawnvpe(self): 2932 args = self.create_args(with_env=True) 2933 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env) 2934 self.assertEqual(exitcode, self.exitcode) 2935 2936 @requires_os_func('spawnv') 2937 def test_nowait(self): 2938 args = self.create_args() 2939 pid = os.spawnv(os.P_NOWAIT, args[0], args) 2940 support.wait_process(pid, exitcode=self.exitcode) 2941 2942 @requires_os_func('spawnve') 2943 def test_spawnve_bytes(self): 2944 # Test bytes handling in parse_arglist and parse_envlist (#28114) 2945 args = self.create_args(with_env=True, use_bytes=True) 2946 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 2947 self.assertEqual(exitcode, self.exitcode) 2948 2949 @requires_os_func('spawnl') 2950 def test_spawnl_noargs(self): 2951 args = self.create_args() 2952 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0]) 2953 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '') 2954 2955 @requires_os_func('spawnle') 2956 def test_spawnle_noargs(self): 2957 args = self.create_args() 2958 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {}) 2959 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {}) 2960 2961 @requires_os_func('spawnv') 2962 def test_spawnv_noargs(self): 2963 args = self.create_args() 2964 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ()) 2965 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], []) 2966 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',)) 2967 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ['']) 2968 2969 @requires_os_func('spawnve') 2970 def test_spawnve_noargs(self): 2971 args = self.create_args() 2972 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {}) 2973 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {}) 2974 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {}) 2975 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {}) 2976 2977 def _test_invalid_env(self, spawn): 2978 args = [sys.executable, '-c', 'pass'] 2979 2980 # null character in the environment variable name 2981 newenv = os.environ.copy() 2982 newenv["FRUIT\0VEGETABLE"] = "cabbage" 2983 try: 2984 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2985 except ValueError: 2986 pass 2987 else: 2988 self.assertEqual(exitcode, 127) 2989 2990 # null character in the environment variable value 2991 newenv = os.environ.copy() 2992 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 2993 try: 2994 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 2995 except ValueError: 2996 pass 2997 else: 2998 self.assertEqual(exitcode, 127) 2999 3000 # equal character in the environment variable name 3001 newenv = os.environ.copy() 3002 newenv["FRUIT=ORANGE"] = "lemon" 3003 try: 3004 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 3005 except ValueError: 3006 pass 3007 else: 3008 self.assertEqual(exitcode, 127) 3009 3010 # equal character in the environment variable value 3011 filename = support.TESTFN 3012 self.addCleanup(support.unlink, filename) 3013 with open(filename, "w") as fp: 3014 fp.write('import sys, os\n' 3015 'if os.getenv("FRUIT") != "orange=lemon":\n' 3016 ' raise AssertionError') 3017 args = [sys.executable, filename] 3018 newenv = os.environ.copy() 3019 newenv["FRUIT"] = "orange=lemon" 3020 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 3021 self.assertEqual(exitcode, 0) 3022 3023 @requires_os_func('spawnve') 3024 def test_spawnve_invalid_env(self): 3025 self._test_invalid_env(os.spawnve) 3026 3027 @requires_os_func('spawnvpe') 3028 def test_spawnvpe_invalid_env(self): 3029 self._test_invalid_env(os.spawnvpe) 3030 3031 3032# The introduction of this TestCase caused at least two different errors on 3033# *nix buildbots. Temporarily skip this to let the buildbots move along. 3034@unittest.skip("Skip due to platform/environment differences on *NIX buildbots") 3035@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin") 3036class LoginTests(unittest.TestCase): 3037 def test_getlogin(self): 3038 user_name = os.getlogin() 3039 self.assertNotEqual(len(user_name), 0) 3040 3041 3042@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), 3043 "needs os.getpriority and os.setpriority") 3044class ProgramPriorityTests(unittest.TestCase): 3045 """Tests for os.getpriority() and os.setpriority().""" 3046 3047 def test_set_get_priority(self): 3048 3049 base = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3050 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) 3051 try: 3052 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3053 if base >= 19 and new_prio <= 19: 3054 raise unittest.SkipTest("unable to reliably test setpriority " 3055 "at current nice level of %s" % base) 3056 else: 3057 self.assertEqual(new_prio, base + 1) 3058 finally: 3059 try: 3060 os.setpriority(os.PRIO_PROCESS, os.getpid(), base) 3061 except OSError as err: 3062 if err.errno != errno.EACCES: 3063 raise 3064 3065 3066class SendfileTestServer(asyncore.dispatcher, threading.Thread): 3067 3068 class Handler(asynchat.async_chat): 3069 3070 def __init__(self, conn): 3071 asynchat.async_chat.__init__(self, conn) 3072 self.in_buffer = [] 3073 self.accumulate = True 3074 self.closed = False 3075 self.push(b"220 ready\r\n") 3076 3077 def handle_read(self): 3078 data = self.recv(4096) 3079 if self.accumulate: 3080 self.in_buffer.append(data) 3081 3082 def get_data(self): 3083 return b''.join(self.in_buffer) 3084 3085 def handle_close(self): 3086 self.close() 3087 self.closed = True 3088 3089 def handle_error(self): 3090 raise 3091 3092 def __init__(self, address): 3093 threading.Thread.__init__(self) 3094 asyncore.dispatcher.__init__(self) 3095 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 3096 self.bind(address) 3097 self.listen(5) 3098 self.host, self.port = self.socket.getsockname()[:2] 3099 self.handler_instance = None 3100 self._active = False 3101 self._active_lock = threading.Lock() 3102 3103 # --- public API 3104 3105 @property 3106 def running(self): 3107 return self._active 3108 3109 def start(self): 3110 assert not self.running 3111 self.__flag = threading.Event() 3112 threading.Thread.start(self) 3113 self.__flag.wait() 3114 3115 def stop(self): 3116 assert self.running 3117 self._active = False 3118 self.join() 3119 3120 def wait(self): 3121 # wait for handler connection to be closed, then stop the server 3122 while not getattr(self.handler_instance, "closed", False): 3123 time.sleep(0.001) 3124 self.stop() 3125 3126 # --- internals 3127 3128 def run(self): 3129 self._active = True 3130 self.__flag.set() 3131 while self._active and asyncore.socket_map: 3132 self._active_lock.acquire() 3133 asyncore.loop(timeout=0.001, count=1) 3134 self._active_lock.release() 3135 asyncore.close_all() 3136 3137 def handle_accept(self): 3138 conn, addr = self.accept() 3139 self.handler_instance = self.Handler(conn) 3140 3141 def handle_connect(self): 3142 self.close() 3143 handle_read = handle_connect 3144 3145 def writable(self): 3146 return 0 3147 3148 def handle_error(self): 3149 raise 3150 3151 3152@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") 3153class TestSendfile(unittest.TestCase): 3154 3155 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 3156 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 3157 not sys.platform.startswith("solaris") and \ 3158 not sys.platform.startswith("sunos") 3159 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, 3160 'requires headers and trailers support') 3161 requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 3162 'test is only meaningful on 32-bit builds') 3163 3164 @classmethod 3165 def setUpClass(cls): 3166 cls.key = support.threading_setup() 3167 create_file(support.TESTFN, cls.DATA) 3168 3169 @classmethod 3170 def tearDownClass(cls): 3171 support.threading_cleanup(*cls.key) 3172 support.unlink(support.TESTFN) 3173 3174 def setUp(self): 3175 self.server = SendfileTestServer((socket_helper.HOST, 0)) 3176 self.server.start() 3177 self.client = socket.socket() 3178 self.client.connect((self.server.host, self.server.port)) 3179 self.client.settimeout(1) 3180 # synchronize by waiting for "220 ready" response 3181 self.client.recv(1024) 3182 self.sockno = self.client.fileno() 3183 self.file = open(support.TESTFN, 'rb') 3184 self.fileno = self.file.fileno() 3185 3186 def tearDown(self): 3187 self.file.close() 3188 self.client.close() 3189 if self.server.running: 3190 self.server.stop() 3191 self.server = None 3192 3193 def sendfile_wrapper(self, *args, **kwargs): 3194 """A higher level wrapper representing how an application is 3195 supposed to use sendfile(). 3196 """ 3197 while True: 3198 try: 3199 return os.sendfile(*args, **kwargs) 3200 except OSError as err: 3201 if err.errno == errno.ECONNRESET: 3202 # disconnected 3203 raise 3204 elif err.errno in (errno.EAGAIN, errno.EBUSY): 3205 # we have to retry send data 3206 continue 3207 else: 3208 raise 3209 3210 def test_send_whole_file(self): 3211 # normal send 3212 total_sent = 0 3213 offset = 0 3214 nbytes = 4096 3215 while total_sent < len(self.DATA): 3216 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 3217 if sent == 0: 3218 break 3219 offset += sent 3220 total_sent += sent 3221 self.assertTrue(sent <= nbytes) 3222 self.assertEqual(offset, total_sent) 3223 3224 self.assertEqual(total_sent, len(self.DATA)) 3225 self.client.shutdown(socket.SHUT_RDWR) 3226 self.client.close() 3227 self.server.wait() 3228 data = self.server.handler_instance.get_data() 3229 self.assertEqual(len(data), len(self.DATA)) 3230 self.assertEqual(data, self.DATA) 3231 3232 def test_send_at_certain_offset(self): 3233 # start sending a file at a certain offset 3234 total_sent = 0 3235 offset = len(self.DATA) // 2 3236 must_send = len(self.DATA) - offset 3237 nbytes = 4096 3238 while total_sent < must_send: 3239 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 3240 if sent == 0: 3241 break 3242 offset += sent 3243 total_sent += sent 3244 self.assertTrue(sent <= nbytes) 3245 3246 self.client.shutdown(socket.SHUT_RDWR) 3247 self.client.close() 3248 self.server.wait() 3249 data = self.server.handler_instance.get_data() 3250 expected = self.DATA[len(self.DATA) // 2:] 3251 self.assertEqual(total_sent, len(expected)) 3252 self.assertEqual(len(data), len(expected)) 3253 self.assertEqual(data, expected) 3254 3255 def test_offset_overflow(self): 3256 # specify an offset > file size 3257 offset = len(self.DATA) + 4096 3258 try: 3259 sent = os.sendfile(self.sockno, self.fileno, offset, 4096) 3260 except OSError as e: 3261 # Solaris can raise EINVAL if offset >= file length, ignore. 3262 if e.errno != errno.EINVAL: 3263 raise 3264 else: 3265 self.assertEqual(sent, 0) 3266 self.client.shutdown(socket.SHUT_RDWR) 3267 self.client.close() 3268 self.server.wait() 3269 data = self.server.handler_instance.get_data() 3270 self.assertEqual(data, b'') 3271 3272 def test_invalid_offset(self): 3273 with self.assertRaises(OSError) as cm: 3274 os.sendfile(self.sockno, self.fileno, -1, 4096) 3275 self.assertEqual(cm.exception.errno, errno.EINVAL) 3276 3277 def test_keywords(self): 3278 # Keyword arguments should be supported 3279 os.sendfile(out_fd=self.sockno, in_fd=self.fileno, 3280 offset=0, count=4096) 3281 if self.SUPPORT_HEADERS_TRAILERS: 3282 os.sendfile(out_fd=self.sockno, in_fd=self.fileno, 3283 offset=0, count=4096, 3284 headers=(), trailers=(), flags=0) 3285 3286 # --- headers / trailers tests 3287 3288 @requires_headers_trailers 3289 def test_headers(self): 3290 total_sent = 0 3291 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1] 3292 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, 3293 headers=[b"x" * 512, b"y" * 256]) 3294 self.assertLessEqual(sent, 512 + 256 + 4096) 3295 total_sent += sent 3296 offset = 4096 3297 while total_sent < len(expected_data): 3298 nbytes = min(len(expected_data) - total_sent, 4096) 3299 sent = self.sendfile_wrapper(self.sockno, self.fileno, 3300 offset, nbytes) 3301 if sent == 0: 3302 break 3303 self.assertLessEqual(sent, nbytes) 3304 total_sent += sent 3305 offset += sent 3306 3307 self.assertEqual(total_sent, len(expected_data)) 3308 self.client.close() 3309 self.server.wait() 3310 data = self.server.handler_instance.get_data() 3311 self.assertEqual(hash(data), hash(expected_data)) 3312 3313 @requires_headers_trailers 3314 def test_trailers(self): 3315 TESTFN2 = support.TESTFN + "2" 3316 file_data = b"abcdef" 3317 3318 self.addCleanup(support.unlink, TESTFN2) 3319 create_file(TESTFN2, file_data) 3320 3321 with open(TESTFN2, 'rb') as f: 3322 os.sendfile(self.sockno, f.fileno(), 0, 5, 3323 trailers=[b"123456", b"789"]) 3324 self.client.close() 3325 self.server.wait() 3326 data = self.server.handler_instance.get_data() 3327 self.assertEqual(data, b"abcde123456789") 3328 3329 @requires_headers_trailers 3330 @requires_32b 3331 def test_headers_overflow_32bits(self): 3332 self.server.handler_instance.accumulate = False 3333 with self.assertRaises(OSError) as cm: 3334 os.sendfile(self.sockno, self.fileno, 0, 0, 3335 headers=[b"x" * 2**16] * 2**15) 3336 self.assertEqual(cm.exception.errno, errno.EINVAL) 3337 3338 @requires_headers_trailers 3339 @requires_32b 3340 def test_trailers_overflow_32bits(self): 3341 self.server.handler_instance.accumulate = False 3342 with self.assertRaises(OSError) as cm: 3343 os.sendfile(self.sockno, self.fileno, 0, 0, 3344 trailers=[b"x" * 2**16] * 2**15) 3345 self.assertEqual(cm.exception.errno, errno.EINVAL) 3346 3347 @requires_headers_trailers 3348 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), 3349 'test needs os.SF_NODISKIO') 3350 def test_flags(self): 3351 try: 3352 os.sendfile(self.sockno, self.fileno, 0, 4096, 3353 flags=os.SF_NODISKIO) 3354 except OSError as err: 3355 if err.errno not in (errno.EBUSY, errno.EAGAIN): 3356 raise 3357 3358 3359def supports_extended_attributes(): 3360 if not hasattr(os, "setxattr"): 3361 return False 3362 3363 try: 3364 with open(support.TESTFN, "xb", 0) as fp: 3365 try: 3366 os.setxattr(fp.fileno(), b"user.test", b"") 3367 except OSError: 3368 return False 3369 finally: 3370 support.unlink(support.TESTFN) 3371 3372 return True 3373 3374 3375@unittest.skipUnless(supports_extended_attributes(), 3376 "no non-broken extended attribute support") 3377# Kernels < 2.6.39 don't respect setxattr flags. 3378@support.requires_linux_version(2, 6, 39) 3379class ExtendedAttributeTests(unittest.TestCase): 3380 3381 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): 3382 fn = support.TESTFN 3383 self.addCleanup(support.unlink, fn) 3384 create_file(fn) 3385 3386 with self.assertRaises(OSError) as cm: 3387 getxattr(fn, s("user.test"), **kwargs) 3388 self.assertEqual(cm.exception.errno, errno.ENODATA) 3389 3390 init_xattr = listxattr(fn) 3391 self.assertIsInstance(init_xattr, list) 3392 3393 setxattr(fn, s("user.test"), b"", **kwargs) 3394 xattr = set(init_xattr) 3395 xattr.add("user.test") 3396 self.assertEqual(set(listxattr(fn)), xattr) 3397 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") 3398 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) 3399 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") 3400 3401 with self.assertRaises(OSError) as cm: 3402 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) 3403 self.assertEqual(cm.exception.errno, errno.EEXIST) 3404 3405 with self.assertRaises(OSError) as cm: 3406 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) 3407 self.assertEqual(cm.exception.errno, errno.ENODATA) 3408 3409 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) 3410 xattr.add("user.test2") 3411 self.assertEqual(set(listxattr(fn)), xattr) 3412 removexattr(fn, s("user.test"), **kwargs) 3413 3414 with self.assertRaises(OSError) as cm: 3415 getxattr(fn, s("user.test"), **kwargs) 3416 self.assertEqual(cm.exception.errno, errno.ENODATA) 3417 3418 xattr.remove("user.test") 3419 self.assertEqual(set(listxattr(fn)), xattr) 3420 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") 3421 setxattr(fn, s("user.test"), b"a"*1024, **kwargs) 3422 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) 3423 removexattr(fn, s("user.test"), **kwargs) 3424 many = sorted("user.test{}".format(i) for i in range(100)) 3425 for thing in many: 3426 setxattr(fn, thing, b"x", **kwargs) 3427 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) 3428 3429 def _check_xattrs(self, *args, **kwargs): 3430 self._check_xattrs_str(str, *args, **kwargs) 3431 support.unlink(support.TESTFN) 3432 3433 self._check_xattrs_str(os.fsencode, *args, **kwargs) 3434 support.unlink(support.TESTFN) 3435 3436 def test_simple(self): 3437 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3438 os.listxattr) 3439 3440 def test_lpath(self): 3441 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3442 os.listxattr, follow_symlinks=False) 3443 3444 def test_fds(self): 3445 def getxattr(path, *args): 3446 with open(path, "rb") as fp: 3447 return os.getxattr(fp.fileno(), *args) 3448 def setxattr(path, *args): 3449 with open(path, "wb", 0) as fp: 3450 os.setxattr(fp.fileno(), *args) 3451 def removexattr(path, *args): 3452 with open(path, "wb", 0) as fp: 3453 os.removexattr(fp.fileno(), *args) 3454 def listxattr(path, *args): 3455 with open(path, "rb") as fp: 3456 return os.listxattr(fp.fileno(), *args) 3457 self._check_xattrs(getxattr, setxattr, removexattr, listxattr) 3458 3459 3460@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size") 3461class TermsizeTests(unittest.TestCase): 3462 def test_does_not_crash(self): 3463 """Check if get_terminal_size() returns a meaningful value. 3464 3465 There's no easy portable way to actually check the size of the 3466 terminal, so let's check if it returns something sensible instead. 3467 """ 3468 try: 3469 size = os.get_terminal_size() 3470 except OSError as e: 3471 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3472 # Under win32 a generic OSError can be thrown if the 3473 # handle cannot be retrieved 3474 self.skipTest("failed to query terminal size") 3475 raise 3476 3477 self.assertGreaterEqual(size.columns, 0) 3478 self.assertGreaterEqual(size.lines, 0) 3479 3480 def test_stty_match(self): 3481 """Check if stty returns the same results 3482 3483 stty actually tests stdin, so get_terminal_size is invoked on 3484 stdin explicitly. If stty succeeded, then get_terminal_size() 3485 should work too. 3486 """ 3487 try: 3488 size = ( 3489 subprocess.check_output( 3490 ["stty", "size"], stderr=subprocess.DEVNULL, text=True 3491 ).split() 3492 ) 3493 except (FileNotFoundError, subprocess.CalledProcessError, 3494 PermissionError): 3495 self.skipTest("stty invocation failed") 3496 expected = (int(size[1]), int(size[0])) # reversed order 3497 3498 try: 3499 actual = os.get_terminal_size(sys.__stdin__.fileno()) 3500 except OSError as e: 3501 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3502 # Under win32 a generic OSError can be thrown if the 3503 # handle cannot be retrieved 3504 self.skipTest("failed to query terminal size") 3505 raise 3506 self.assertEqual(expected, actual) 3507 3508 3509@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create') 3510@support.requires_linux_version(3, 17) 3511class MemfdCreateTests(unittest.TestCase): 3512 def test_memfd_create(self): 3513 fd = os.memfd_create("Hi", os.MFD_CLOEXEC) 3514 self.assertNotEqual(fd, -1) 3515 self.addCleanup(os.close, fd) 3516 self.assertFalse(os.get_inheritable(fd)) 3517 with open(fd, "wb", closefd=False) as f: 3518 f.write(b'memfd_create') 3519 self.assertEqual(f.tell(), 12) 3520 3521 fd2 = os.memfd_create("Hi") 3522 self.addCleanup(os.close, fd2) 3523 self.assertFalse(os.get_inheritable(fd2)) 3524 3525 3526class OSErrorTests(unittest.TestCase): 3527 def setUp(self): 3528 class Str(str): 3529 pass 3530 3531 self.bytes_filenames = [] 3532 self.unicode_filenames = [] 3533 if support.TESTFN_UNENCODABLE is not None: 3534 decoded = support.TESTFN_UNENCODABLE 3535 else: 3536 decoded = support.TESTFN 3537 self.unicode_filenames.append(decoded) 3538 self.unicode_filenames.append(Str(decoded)) 3539 if support.TESTFN_UNDECODABLE is not None: 3540 encoded = support.TESTFN_UNDECODABLE 3541 else: 3542 encoded = os.fsencode(support.TESTFN) 3543 self.bytes_filenames.append(encoded) 3544 self.bytes_filenames.append(bytearray(encoded)) 3545 self.bytes_filenames.append(memoryview(encoded)) 3546 3547 self.filenames = self.bytes_filenames + self.unicode_filenames 3548 3549 def test_oserror_filename(self): 3550 funcs = [ 3551 (self.filenames, os.chdir,), 3552 (self.filenames, os.chmod, 0o777), 3553 (self.filenames, os.lstat,), 3554 (self.filenames, os.open, os.O_RDONLY), 3555 (self.filenames, os.rmdir,), 3556 (self.filenames, os.stat,), 3557 (self.filenames, os.unlink,), 3558 ] 3559 if sys.platform == "win32": 3560 funcs.extend(( 3561 (self.bytes_filenames, os.rename, b"dst"), 3562 (self.bytes_filenames, os.replace, b"dst"), 3563 (self.unicode_filenames, os.rename, "dst"), 3564 (self.unicode_filenames, os.replace, "dst"), 3565 (self.unicode_filenames, os.listdir, ), 3566 )) 3567 else: 3568 funcs.extend(( 3569 (self.filenames, os.listdir,), 3570 (self.filenames, os.rename, "dst"), 3571 (self.filenames, os.replace, "dst"), 3572 )) 3573 if hasattr(os, "chown"): 3574 funcs.append((self.filenames, os.chown, 0, 0)) 3575 if hasattr(os, "lchown"): 3576 funcs.append((self.filenames, os.lchown, 0, 0)) 3577 if hasattr(os, "truncate"): 3578 funcs.append((self.filenames, os.truncate, 0)) 3579 if hasattr(os, "chflags"): 3580 funcs.append((self.filenames, os.chflags, 0)) 3581 if hasattr(os, "lchflags"): 3582 funcs.append((self.filenames, os.lchflags, 0)) 3583 if hasattr(os, "chroot"): 3584 funcs.append((self.filenames, os.chroot,)) 3585 if hasattr(os, "link"): 3586 if sys.platform == "win32": 3587 funcs.append((self.bytes_filenames, os.link, b"dst")) 3588 funcs.append((self.unicode_filenames, os.link, "dst")) 3589 else: 3590 funcs.append((self.filenames, os.link, "dst")) 3591 if hasattr(os, "listxattr"): 3592 funcs.extend(( 3593 (self.filenames, os.listxattr,), 3594 (self.filenames, os.getxattr, "user.test"), 3595 (self.filenames, os.setxattr, "user.test", b'user'), 3596 (self.filenames, os.removexattr, "user.test"), 3597 )) 3598 if hasattr(os, "lchmod"): 3599 funcs.append((self.filenames, os.lchmod, 0o777)) 3600 if hasattr(os, "readlink"): 3601 funcs.append((self.filenames, os.readlink,)) 3602 3603 3604 for filenames, func, *func_args in funcs: 3605 for name in filenames: 3606 try: 3607 if isinstance(name, (str, bytes)): 3608 func(name, *func_args) 3609 else: 3610 with self.assertWarnsRegex(DeprecationWarning, 'should be'): 3611 func(name, *func_args) 3612 except OSError as err: 3613 self.assertIs(err.filename, name, str(func)) 3614 except UnicodeDecodeError: 3615 pass 3616 else: 3617 self.fail("No exception thrown by {}".format(func)) 3618 3619class CPUCountTests(unittest.TestCase): 3620 def test_cpu_count(self): 3621 cpus = os.cpu_count() 3622 if cpus is not None: 3623 self.assertIsInstance(cpus, int) 3624 self.assertGreater(cpus, 0) 3625 else: 3626 self.skipTest("Could not determine the number of CPUs") 3627 3628 3629class FDInheritanceTests(unittest.TestCase): 3630 def test_get_set_inheritable(self): 3631 fd = os.open(__file__, os.O_RDONLY) 3632 self.addCleanup(os.close, fd) 3633 self.assertEqual(os.get_inheritable(fd), False) 3634 3635 os.set_inheritable(fd, True) 3636 self.assertEqual(os.get_inheritable(fd), True) 3637 3638 @unittest.skipIf(fcntl is None, "need fcntl") 3639 def test_get_inheritable_cloexec(self): 3640 fd = os.open(__file__, os.O_RDONLY) 3641 self.addCleanup(os.close, fd) 3642 self.assertEqual(os.get_inheritable(fd), False) 3643 3644 # clear FD_CLOEXEC flag 3645 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 3646 flags &= ~fcntl.FD_CLOEXEC 3647 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 3648 3649 self.assertEqual(os.get_inheritable(fd), True) 3650 3651 @unittest.skipIf(fcntl is None, "need fcntl") 3652 def test_set_inheritable_cloexec(self): 3653 fd = os.open(__file__, os.O_RDONLY) 3654 self.addCleanup(os.close, fd) 3655 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3656 fcntl.FD_CLOEXEC) 3657 3658 os.set_inheritable(fd, True) 3659 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3660 0) 3661 3662 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH") 3663 def test_get_set_inheritable_o_path(self): 3664 fd = os.open(__file__, os.O_PATH) 3665 self.addCleanup(os.close, fd) 3666 self.assertEqual(os.get_inheritable(fd), False) 3667 3668 os.set_inheritable(fd, True) 3669 self.assertEqual(os.get_inheritable(fd), True) 3670 3671 os.set_inheritable(fd, False) 3672 self.assertEqual(os.get_inheritable(fd), False) 3673 3674 def test_get_set_inheritable_badf(self): 3675 fd = support.make_bad_fd() 3676 3677 with self.assertRaises(OSError) as ctx: 3678 os.get_inheritable(fd) 3679 self.assertEqual(ctx.exception.errno, errno.EBADF) 3680 3681 with self.assertRaises(OSError) as ctx: 3682 os.set_inheritable(fd, True) 3683 self.assertEqual(ctx.exception.errno, errno.EBADF) 3684 3685 with self.assertRaises(OSError) as ctx: 3686 os.set_inheritable(fd, False) 3687 self.assertEqual(ctx.exception.errno, errno.EBADF) 3688 3689 def test_open(self): 3690 fd = os.open(__file__, os.O_RDONLY) 3691 self.addCleanup(os.close, fd) 3692 self.assertEqual(os.get_inheritable(fd), False) 3693 3694 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") 3695 def test_pipe(self): 3696 rfd, wfd = os.pipe() 3697 self.addCleanup(os.close, rfd) 3698 self.addCleanup(os.close, wfd) 3699 self.assertEqual(os.get_inheritable(rfd), False) 3700 self.assertEqual(os.get_inheritable(wfd), False) 3701 3702 def test_dup(self): 3703 fd1 = os.open(__file__, os.O_RDONLY) 3704 self.addCleanup(os.close, fd1) 3705 3706 fd2 = os.dup(fd1) 3707 self.addCleanup(os.close, fd2) 3708 self.assertEqual(os.get_inheritable(fd2), False) 3709 3710 def test_dup_standard_stream(self): 3711 fd = os.dup(1) 3712 self.addCleanup(os.close, fd) 3713 self.assertGreater(fd, 0) 3714 3715 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3716 def test_dup_nul(self): 3717 # os.dup() was creating inheritable fds for character files. 3718 fd1 = os.open('NUL', os.O_RDONLY) 3719 self.addCleanup(os.close, fd1) 3720 fd2 = os.dup(fd1) 3721 self.addCleanup(os.close, fd2) 3722 self.assertFalse(os.get_inheritable(fd2)) 3723 3724 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") 3725 def test_dup2(self): 3726 fd = os.open(__file__, os.O_RDONLY) 3727 self.addCleanup(os.close, fd) 3728 3729 # inheritable by default 3730 fd2 = os.open(__file__, os.O_RDONLY) 3731 self.addCleanup(os.close, fd2) 3732 self.assertEqual(os.dup2(fd, fd2), fd2) 3733 self.assertTrue(os.get_inheritable(fd2)) 3734 3735 # force non-inheritable 3736 fd3 = os.open(__file__, os.O_RDONLY) 3737 self.addCleanup(os.close, fd3) 3738 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3) 3739 self.assertFalse(os.get_inheritable(fd3)) 3740 3741 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") 3742 def test_openpty(self): 3743 master_fd, slave_fd = os.openpty() 3744 self.addCleanup(os.close, master_fd) 3745 self.addCleanup(os.close, slave_fd) 3746 self.assertEqual(os.get_inheritable(master_fd), False) 3747 self.assertEqual(os.get_inheritable(slave_fd), False) 3748 3749 3750class PathTConverterTests(unittest.TestCase): 3751 # tuples of (function name, allows fd arguments, additional arguments to 3752 # function, cleanup function) 3753 functions = [ 3754 ('stat', True, (), None), 3755 ('lstat', False, (), None), 3756 ('access', False, (os.F_OK,), None), 3757 ('chflags', False, (0,), None), 3758 ('lchflags', False, (0,), None), 3759 ('open', False, (0,), getattr(os, 'close', None)), 3760 ] 3761 3762 def test_path_t_converter(self): 3763 str_filename = support.TESTFN 3764 if os.name == 'nt': 3765 bytes_fspath = bytes_filename = None 3766 else: 3767 bytes_filename = os.fsencode(support.TESTFN) 3768 bytes_fspath = FakePath(bytes_filename) 3769 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT) 3770 self.addCleanup(support.unlink, support.TESTFN) 3771 self.addCleanup(os.close, fd) 3772 3773 int_fspath = FakePath(fd) 3774 str_fspath = FakePath(str_filename) 3775 3776 for name, allow_fd, extra_args, cleanup_fn in self.functions: 3777 with self.subTest(name=name): 3778 try: 3779 fn = getattr(os, name) 3780 except AttributeError: 3781 continue 3782 3783 for path in (str_filename, bytes_filename, str_fspath, 3784 bytes_fspath): 3785 if path is None: 3786 continue 3787 with self.subTest(name=name, path=path): 3788 result = fn(path, *extra_args) 3789 if cleanup_fn is not None: 3790 cleanup_fn(result) 3791 3792 with self.assertRaisesRegex( 3793 TypeError, 'to return str or bytes'): 3794 fn(int_fspath, *extra_args) 3795 3796 if allow_fd: 3797 result = fn(fd, *extra_args) # should not fail 3798 if cleanup_fn is not None: 3799 cleanup_fn(result) 3800 else: 3801 with self.assertRaisesRegex( 3802 TypeError, 3803 'os.PathLike'): 3804 fn(fd, *extra_args) 3805 3806 def test_path_t_converter_and_custom_class(self): 3807 msg = r'__fspath__\(\) to return str or bytes, not %s' 3808 with self.assertRaisesRegex(TypeError, msg % r'int'): 3809 os.stat(FakePath(2)) 3810 with self.assertRaisesRegex(TypeError, msg % r'float'): 3811 os.stat(FakePath(2.34)) 3812 with self.assertRaisesRegex(TypeError, msg % r'object'): 3813 os.stat(FakePath(object())) 3814 3815 3816@unittest.skipUnless(hasattr(os, 'get_blocking'), 3817 'needs os.get_blocking() and os.set_blocking()') 3818class BlockingTests(unittest.TestCase): 3819 def test_blocking(self): 3820 fd = os.open(__file__, os.O_RDONLY) 3821 self.addCleanup(os.close, fd) 3822 self.assertEqual(os.get_blocking(fd), True) 3823 3824 os.set_blocking(fd, False) 3825 self.assertEqual(os.get_blocking(fd), False) 3826 3827 os.set_blocking(fd, True) 3828 self.assertEqual(os.get_blocking(fd), True) 3829 3830 3831 3832class ExportsTests(unittest.TestCase): 3833 def test_os_all(self): 3834 self.assertIn('open', os.__all__) 3835 self.assertIn('walk', os.__all__) 3836 3837 3838class TestDirEntry(unittest.TestCase): 3839 def setUp(self): 3840 self.path = os.path.realpath(support.TESTFN) 3841 self.addCleanup(support.rmtree, self.path) 3842 os.mkdir(self.path) 3843 3844 def test_uninstantiable(self): 3845 self.assertRaises(TypeError, os.DirEntry) 3846 3847 def test_unpickable(self): 3848 filename = create_file(os.path.join(self.path, "file.txt"), b'python') 3849 entry = [entry for entry in os.scandir(self.path)].pop() 3850 self.assertIsInstance(entry, os.DirEntry) 3851 self.assertEqual(entry.name, "file.txt") 3852 import pickle 3853 self.assertRaises(TypeError, pickle.dumps, entry, filename) 3854 3855 3856class TestScandir(unittest.TestCase): 3857 check_no_resource_warning = support.check_no_resource_warning 3858 3859 def setUp(self): 3860 self.path = os.path.realpath(support.TESTFN) 3861 self.bytes_path = os.fsencode(self.path) 3862 self.addCleanup(support.rmtree, self.path) 3863 os.mkdir(self.path) 3864 3865 def create_file(self, name="file.txt"): 3866 path = self.bytes_path if isinstance(name, bytes) else self.path 3867 filename = os.path.join(path, name) 3868 create_file(filename, b'python') 3869 return filename 3870 3871 def get_entries(self, names): 3872 entries = dict((entry.name, entry) 3873 for entry in os.scandir(self.path)) 3874 self.assertEqual(sorted(entries.keys()), names) 3875 return entries 3876 3877 def assert_stat_equal(self, stat1, stat2, skip_fields): 3878 if skip_fields: 3879 for attr in dir(stat1): 3880 if not attr.startswith("st_"): 3881 continue 3882 if attr in ("st_dev", "st_ino", "st_nlink"): 3883 continue 3884 self.assertEqual(getattr(stat1, attr), 3885 getattr(stat2, attr), 3886 (stat1, stat2, attr)) 3887 else: 3888 self.assertEqual(stat1, stat2) 3889 3890 def test_uninstantiable(self): 3891 scandir_iter = os.scandir(self.path) 3892 self.assertRaises(TypeError, type(scandir_iter)) 3893 scandir_iter.close() 3894 3895 def test_unpickable(self): 3896 filename = self.create_file("file.txt") 3897 scandir_iter = os.scandir(self.path) 3898 import pickle 3899 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename) 3900 scandir_iter.close() 3901 3902 def check_entry(self, entry, name, is_dir, is_file, is_symlink): 3903 self.assertIsInstance(entry, os.DirEntry) 3904 self.assertEqual(entry.name, name) 3905 self.assertEqual(entry.path, os.path.join(self.path, name)) 3906 self.assertEqual(entry.inode(), 3907 os.stat(entry.path, follow_symlinks=False).st_ino) 3908 3909 entry_stat = os.stat(entry.path) 3910 self.assertEqual(entry.is_dir(), 3911 stat.S_ISDIR(entry_stat.st_mode)) 3912 self.assertEqual(entry.is_file(), 3913 stat.S_ISREG(entry_stat.st_mode)) 3914 self.assertEqual(entry.is_symlink(), 3915 os.path.islink(entry.path)) 3916 3917 entry_lstat = os.stat(entry.path, follow_symlinks=False) 3918 self.assertEqual(entry.is_dir(follow_symlinks=False), 3919 stat.S_ISDIR(entry_lstat.st_mode)) 3920 self.assertEqual(entry.is_file(follow_symlinks=False), 3921 stat.S_ISREG(entry_lstat.st_mode)) 3922 3923 self.assert_stat_equal(entry.stat(), 3924 entry_stat, 3925 os.name == 'nt' and not is_symlink) 3926 self.assert_stat_equal(entry.stat(follow_symlinks=False), 3927 entry_lstat, 3928 os.name == 'nt') 3929 3930 def test_attributes(self): 3931 link = hasattr(os, 'link') 3932 symlink = support.can_symlink() 3933 3934 dirname = os.path.join(self.path, "dir") 3935 os.mkdir(dirname) 3936 filename = self.create_file("file.txt") 3937 if link: 3938 try: 3939 os.link(filename, os.path.join(self.path, "link_file.txt")) 3940 except PermissionError as e: 3941 self.skipTest('os.link(): %s' % e) 3942 if symlink: 3943 os.symlink(dirname, os.path.join(self.path, "symlink_dir"), 3944 target_is_directory=True) 3945 os.symlink(filename, os.path.join(self.path, "symlink_file.txt")) 3946 3947 names = ['dir', 'file.txt'] 3948 if link: 3949 names.append('link_file.txt') 3950 if symlink: 3951 names.extend(('symlink_dir', 'symlink_file.txt')) 3952 entries = self.get_entries(names) 3953 3954 entry = entries['dir'] 3955 self.check_entry(entry, 'dir', True, False, False) 3956 3957 entry = entries['file.txt'] 3958 self.check_entry(entry, 'file.txt', False, True, False) 3959 3960 if link: 3961 entry = entries['link_file.txt'] 3962 self.check_entry(entry, 'link_file.txt', False, True, False) 3963 3964 if symlink: 3965 entry = entries['symlink_dir'] 3966 self.check_entry(entry, 'symlink_dir', True, False, True) 3967 3968 entry = entries['symlink_file.txt'] 3969 self.check_entry(entry, 'symlink_file.txt', False, True, True) 3970 3971 def get_entry(self, name): 3972 path = self.bytes_path if isinstance(name, bytes) else self.path 3973 entries = list(os.scandir(path)) 3974 self.assertEqual(len(entries), 1) 3975 3976 entry = entries[0] 3977 self.assertEqual(entry.name, name) 3978 return entry 3979 3980 def create_file_entry(self, name='file.txt'): 3981 filename = self.create_file(name=name) 3982 return self.get_entry(os.path.basename(filename)) 3983 3984 def test_current_directory(self): 3985 filename = self.create_file() 3986 old_dir = os.getcwd() 3987 try: 3988 os.chdir(self.path) 3989 3990 # call scandir() without parameter: it must list the content 3991 # of the current directory 3992 entries = dict((entry.name, entry) for entry in os.scandir()) 3993 self.assertEqual(sorted(entries.keys()), 3994 [os.path.basename(filename)]) 3995 finally: 3996 os.chdir(old_dir) 3997 3998 def test_repr(self): 3999 entry = self.create_file_entry() 4000 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") 4001 4002 def test_fspath_protocol(self): 4003 entry = self.create_file_entry() 4004 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt')) 4005 4006 def test_fspath_protocol_bytes(self): 4007 bytes_filename = os.fsencode('bytesfile.txt') 4008 bytes_entry = self.create_file_entry(name=bytes_filename) 4009 fspath = os.fspath(bytes_entry) 4010 self.assertIsInstance(fspath, bytes) 4011 self.assertEqual(fspath, 4012 os.path.join(os.fsencode(self.path),bytes_filename)) 4013 4014 def test_removed_dir(self): 4015 path = os.path.join(self.path, 'dir') 4016 4017 os.mkdir(path) 4018 entry = self.get_entry('dir') 4019 os.rmdir(path) 4020 4021 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4022 if os.name == 'nt': 4023 self.assertTrue(entry.is_dir()) 4024 self.assertFalse(entry.is_file()) 4025 self.assertFalse(entry.is_symlink()) 4026 if os.name == 'nt': 4027 self.assertRaises(FileNotFoundError, entry.inode) 4028 # don't fail 4029 entry.stat() 4030 entry.stat(follow_symlinks=False) 4031 else: 4032 self.assertGreater(entry.inode(), 0) 4033 self.assertRaises(FileNotFoundError, entry.stat) 4034 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4035 4036 def test_removed_file(self): 4037 entry = self.create_file_entry() 4038 os.unlink(entry.path) 4039 4040 self.assertFalse(entry.is_dir()) 4041 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4042 if os.name == 'nt': 4043 self.assertTrue(entry.is_file()) 4044 self.assertFalse(entry.is_symlink()) 4045 if os.name == 'nt': 4046 self.assertRaises(FileNotFoundError, entry.inode) 4047 # don't fail 4048 entry.stat() 4049 entry.stat(follow_symlinks=False) 4050 else: 4051 self.assertGreater(entry.inode(), 0) 4052 self.assertRaises(FileNotFoundError, entry.stat) 4053 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4054 4055 def test_broken_symlink(self): 4056 if not support.can_symlink(): 4057 return self.skipTest('cannot create symbolic link') 4058 4059 filename = self.create_file("file.txt") 4060 os.symlink(filename, 4061 os.path.join(self.path, "symlink.txt")) 4062 entries = self.get_entries(['file.txt', 'symlink.txt']) 4063 entry = entries['symlink.txt'] 4064 os.unlink(filename) 4065 4066 self.assertGreater(entry.inode(), 0) 4067 self.assertFalse(entry.is_dir()) 4068 self.assertFalse(entry.is_file()) # broken symlink returns False 4069 self.assertFalse(entry.is_dir(follow_symlinks=False)) 4070 self.assertFalse(entry.is_file(follow_symlinks=False)) 4071 self.assertTrue(entry.is_symlink()) 4072 self.assertRaises(FileNotFoundError, entry.stat) 4073 # don't fail 4074 entry.stat(follow_symlinks=False) 4075 4076 def test_bytes(self): 4077 self.create_file("file.txt") 4078 4079 path_bytes = os.fsencode(self.path) 4080 entries = list(os.scandir(path_bytes)) 4081 self.assertEqual(len(entries), 1, entries) 4082 entry = entries[0] 4083 4084 self.assertEqual(entry.name, b'file.txt') 4085 self.assertEqual(entry.path, 4086 os.fsencode(os.path.join(self.path, 'file.txt'))) 4087 4088 def test_bytes_like(self): 4089 self.create_file("file.txt") 4090 4091 for cls in bytearray, memoryview: 4092 path_bytes = cls(os.fsencode(self.path)) 4093 with self.assertWarns(DeprecationWarning): 4094 entries = list(os.scandir(path_bytes)) 4095 self.assertEqual(len(entries), 1, entries) 4096 entry = entries[0] 4097 4098 self.assertEqual(entry.name, b'file.txt') 4099 self.assertEqual(entry.path, 4100 os.fsencode(os.path.join(self.path, 'file.txt'))) 4101 self.assertIs(type(entry.name), bytes) 4102 self.assertIs(type(entry.path), bytes) 4103 4104 @unittest.skipUnless(os.listdir in os.supports_fd, 4105 'fd support for listdir required for this test.') 4106 def test_fd(self): 4107 self.assertIn(os.scandir, os.supports_fd) 4108 self.create_file('file.txt') 4109 expected_names = ['file.txt'] 4110 if support.can_symlink(): 4111 os.symlink('file.txt', os.path.join(self.path, 'link')) 4112 expected_names.append('link') 4113 4114 fd = os.open(self.path, os.O_RDONLY) 4115 try: 4116 with os.scandir(fd) as it: 4117 entries = list(it) 4118 names = [entry.name for entry in entries] 4119 self.assertEqual(sorted(names), expected_names) 4120 self.assertEqual(names, os.listdir(fd)) 4121 for entry in entries: 4122 self.assertEqual(entry.path, entry.name) 4123 self.assertEqual(os.fspath(entry), entry.name) 4124 self.assertEqual(entry.is_symlink(), entry.name == 'link') 4125 if os.stat in os.supports_dir_fd: 4126 st = os.stat(entry.name, dir_fd=fd) 4127 self.assertEqual(entry.stat(), st) 4128 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) 4129 self.assertEqual(entry.stat(follow_symlinks=False), st) 4130 finally: 4131 os.close(fd) 4132 4133 def test_empty_path(self): 4134 self.assertRaises(FileNotFoundError, os.scandir, '') 4135 4136 def test_consume_iterator_twice(self): 4137 self.create_file("file.txt") 4138 iterator = os.scandir(self.path) 4139 4140 entries = list(iterator) 4141 self.assertEqual(len(entries), 1, entries) 4142 4143 # check than consuming the iterator twice doesn't raise exception 4144 entries2 = list(iterator) 4145 self.assertEqual(len(entries2), 0, entries2) 4146 4147 def test_bad_path_type(self): 4148 for obj in [1.234, {}, []]: 4149 self.assertRaises(TypeError, os.scandir, obj) 4150 4151 def test_close(self): 4152 self.create_file("file.txt") 4153 self.create_file("file2.txt") 4154 iterator = os.scandir(self.path) 4155 next(iterator) 4156 iterator.close() 4157 # multiple closes 4158 iterator.close() 4159 with self.check_no_resource_warning(): 4160 del iterator 4161 4162 def test_context_manager(self): 4163 self.create_file("file.txt") 4164 self.create_file("file2.txt") 4165 with os.scandir(self.path) as iterator: 4166 next(iterator) 4167 with self.check_no_resource_warning(): 4168 del iterator 4169 4170 def test_context_manager_close(self): 4171 self.create_file("file.txt") 4172 self.create_file("file2.txt") 4173 with os.scandir(self.path) as iterator: 4174 next(iterator) 4175 iterator.close() 4176 4177 def test_context_manager_exception(self): 4178 self.create_file("file.txt") 4179 self.create_file("file2.txt") 4180 with self.assertRaises(ZeroDivisionError): 4181 with os.scandir(self.path) as iterator: 4182 next(iterator) 4183 1/0 4184 with self.check_no_resource_warning(): 4185 del iterator 4186 4187 def test_resource_warning(self): 4188 self.create_file("file.txt") 4189 self.create_file("file2.txt") 4190 iterator = os.scandir(self.path) 4191 next(iterator) 4192 with self.assertWarns(ResourceWarning): 4193 del iterator 4194 support.gc_collect() 4195 # exhausted iterator 4196 iterator = os.scandir(self.path) 4197 list(iterator) 4198 with self.check_no_resource_warning(): 4199 del iterator 4200 4201 4202class TestPEP519(unittest.TestCase): 4203 4204 # Abstracted so it can be overridden to test pure Python implementation 4205 # if a C version is provided. 4206 fspath = staticmethod(os.fspath) 4207 4208 def test_return_bytes(self): 4209 for b in b'hello', b'goodbye', b'some/path/and/file': 4210 self.assertEqual(b, self.fspath(b)) 4211 4212 def test_return_string(self): 4213 for s in 'hello', 'goodbye', 'some/path/and/file': 4214 self.assertEqual(s, self.fspath(s)) 4215 4216 def test_fsencode_fsdecode(self): 4217 for p in "path/like/object", b"path/like/object": 4218 pathlike = FakePath(p) 4219 4220 self.assertEqual(p, self.fspath(pathlike)) 4221 self.assertEqual(b"path/like/object", os.fsencode(pathlike)) 4222 self.assertEqual("path/like/object", os.fsdecode(pathlike)) 4223 4224 def test_pathlike(self): 4225 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil'))) 4226 self.assertTrue(issubclass(FakePath, os.PathLike)) 4227 self.assertTrue(isinstance(FakePath('x'), os.PathLike)) 4228 4229 def test_garbage_in_exception_out(self): 4230 vapor = type('blah', (), {}) 4231 for o in int, type, os, vapor(): 4232 self.assertRaises(TypeError, self.fspath, o) 4233 4234 def test_argument_required(self): 4235 self.assertRaises(TypeError, self.fspath) 4236 4237 def test_bad_pathlike(self): 4238 # __fspath__ returns a value other than str or bytes. 4239 self.assertRaises(TypeError, self.fspath, FakePath(42)) 4240 # __fspath__ attribute that is not callable. 4241 c = type('foo', (), {}) 4242 c.__fspath__ = 1 4243 self.assertRaises(TypeError, self.fspath, c()) 4244 # __fspath__ raises an exception. 4245 self.assertRaises(ZeroDivisionError, self.fspath, 4246 FakePath(ZeroDivisionError())) 4247 4248 def test_pathlike_subclasshook(self): 4249 # bpo-38878: subclasshook causes subclass checks 4250 # true on abstract implementation. 4251 class A(os.PathLike): 4252 pass 4253 self.assertFalse(issubclass(FakePath, A)) 4254 self.assertTrue(issubclass(FakePath, os.PathLike)) 4255 4256 def test_pathlike_class_getitem(self): 4257 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias) 4258 4259 4260class TimesTests(unittest.TestCase): 4261 def test_times(self): 4262 times = os.times() 4263 self.assertIsInstance(times, os.times_result) 4264 4265 for field in ('user', 'system', 'children_user', 'children_system', 4266 'elapsed'): 4267 value = getattr(times, field) 4268 self.assertIsInstance(value, float) 4269 4270 if os.name == 'nt': 4271 self.assertEqual(times.children_user, 0) 4272 self.assertEqual(times.children_system, 0) 4273 self.assertEqual(times.elapsed, 0) 4274 4275 4276# Only test if the C version is provided, otherwise TestPEP519 already tested 4277# the pure Python implementation. 4278if hasattr(os, "_fspath"): 4279 class TestPEP519PurePython(TestPEP519): 4280 4281 """Explicitly test the pure Python implementation of os.fspath().""" 4282 4283 fspath = staticmethod(os._fspath) 4284 4285 4286if __name__ == "__main__": 4287 unittest.main() 4288