1# As a test suite for the os module, this is woefully inadequate, but this 2# does add tests for a few functions which have been determined to be more 3# portable than they had been thought to be. 4 5import codecs 6import contextlib 7import decimal 8import errno 9import fnmatch 10import fractions 11import itertools 12import locale 13import mmap 14import os 15import pickle 16import select 17import shutil 18import signal 19import socket 20import stat 21import struct 22import subprocess 23import sys 24import sysconfig 25import tempfile 26import threading 27import time 28import types 29import unittest 30import uuid 31import warnings 32from test import support 33from test.support import import_helper 34from test.support import os_helper 35from test.support import socket_helper 36from test.support import threading_helper 37from test.support import warnings_helper 38from platform import win32_is_iot 39 40with warnings.catch_warnings(): 41 warnings.simplefilter('ignore', DeprecationWarning) 42 import asynchat 43 import asyncore 44 45try: 46 import resource 47except ImportError: 48 resource = None 49try: 50 import fcntl 51except ImportError: 52 fcntl = None 53try: 54 import _winapi 55except ImportError: 56 _winapi = None 57try: 58 import pwd 59 all_users = [u.pw_uid for u in pwd.getpwall()] 60except (ImportError, AttributeError): 61 all_users = [] 62try: 63 from _testcapi import INT_MAX, PY_SSIZE_T_MAX 64except ImportError: 65 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize 66 67 68from test.support.script_helper import assert_python_ok 69from test.support import unix_shell 70from test.support.os_helper import FakePath 71 72 73root_in_posix = False 74if hasattr(os, 'geteuid'): 75 root_in_posix = (os.geteuid() == 0) 76 77# Detect whether we're on a Linux system that uses the (now outdated 78# and unmaintained) linuxthreads threading library. There's an issue 79# when combining linuxthreads with a failed execv call: see 80# http://bugs.python.org/issue4970. 81if hasattr(sys, 'thread_info') and sys.thread_info.version: 82 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads") 83else: 84 USING_LINUXTHREADS = False 85 86# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group. 87HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0 88 89 90def requires_os_func(name): 91 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name) 92 93 94def create_file(filename, content=b'content'): 95 with open(filename, "xb", 0) as fp: 96 fp.write(content) 97 98 99# bpo-41625: On AIX, splice() only works with a socket, not with a pipe. 100requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"), 101 'on AIX, splice() only accepts sockets') 102 103 104class MiscTests(unittest.TestCase): 105 def test_getcwd(self): 106 cwd = os.getcwd() 107 self.assertIsInstance(cwd, str) 108 109 def test_getcwd_long_path(self): 110 # bpo-37412: On Linux, PATH_MAX is usually around 4096 bytes. On 111 # Windows, MAX_PATH is defined as 260 characters, but Windows supports 112 # longer path if longer paths support is enabled. Internally, the os 113 # module uses MAXPATHLEN which is at least 1024. 114 # 115 # Use a directory name of 200 characters to fit into Windows MAX_PATH 116 # limit. 117 # 118 # On Windows, the test can stop when trying to create a path longer 119 # than MAX_PATH if long paths support is disabled: 120 # see RtlAreLongPathsEnabled(). 121 min_len = 2000 # characters 122 # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path 123 # longer than PATH_MAX will fail. 124 if sys.platform == 'vxworks': 125 min_len = 1000 126 dirlen = 200 # characters 127 dirname = 'python_test_dir_' 128 dirname = dirname + ('a' * (dirlen - len(dirname))) 129 130 with tempfile.TemporaryDirectory() as tmpdir: 131 with os_helper.change_cwd(tmpdir) as path: 132 expected = path 133 134 while True: 135 cwd = os.getcwd() 136 self.assertEqual(cwd, expected) 137 138 need = min_len - (len(cwd) + len(os.path.sep)) 139 if need <= 0: 140 break 141 if len(dirname) > need and need > 0: 142 dirname = dirname[:need] 143 144 path = os.path.join(path, dirname) 145 try: 146 os.mkdir(path) 147 # On Windows, chdir() can fail 148 # even if mkdir() succeeded 149 os.chdir(path) 150 except FileNotFoundError: 151 # On Windows, catch ERROR_PATH_NOT_FOUND (3) and 152 # ERROR_FILENAME_EXCED_RANGE (206) errors 153 # ("The filename or extension is too long") 154 break 155 except OSError as exc: 156 if exc.errno == errno.ENAMETOOLONG: 157 break 158 else: 159 raise 160 161 expected = path 162 163 if support.verbose: 164 print(f"Tested current directory length: {len(cwd)}") 165 166 def test_getcwdb(self): 167 cwd = os.getcwdb() 168 self.assertIsInstance(cwd, bytes) 169 self.assertEqual(os.fsdecode(cwd), os.getcwd()) 170 171 172# Tests creating TESTFN 173class FileTests(unittest.TestCase): 174 def setUp(self): 175 if os.path.lexists(os_helper.TESTFN): 176 os.unlink(os_helper.TESTFN) 177 tearDown = setUp 178 179 def test_access(self): 180 f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 181 os.close(f) 182 self.assertTrue(os.access(os_helper.TESTFN, os.W_OK)) 183 184 def test_closerange(self): 185 first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 186 # We must allocate two consecutive file descriptors, otherwise 187 # it will mess up other file descriptors (perhaps even the three 188 # standard ones). 189 second = os.dup(first) 190 try: 191 retries = 0 192 while second != first + 1: 193 os.close(first) 194 retries += 1 195 if retries > 10: 196 # XXX test skipped 197 self.skipTest("couldn't allocate two consecutive fds") 198 first, second = second, os.dup(second) 199 finally: 200 os.close(second) 201 # close a fd that is open, and one that isn't 202 os.closerange(first, first + 2) 203 self.assertRaises(OSError, os.write, first, b"a") 204 205 @support.cpython_only 206 def test_rename(self): 207 path = os_helper.TESTFN 208 old = sys.getrefcount(path) 209 self.assertRaises(TypeError, os.rename, path, 0) 210 new = sys.getrefcount(path) 211 self.assertEqual(old, new) 212 213 def test_read(self): 214 with open(os_helper.TESTFN, "w+b") as fobj: 215 fobj.write(b"spam") 216 fobj.flush() 217 fd = fobj.fileno() 218 os.lseek(fd, 0, 0) 219 s = os.read(fd, 4) 220 self.assertEqual(type(s), bytes) 221 self.assertEqual(s, b"spam") 222 223 @support.cpython_only 224 # Skip the test on 32-bit platforms: the number of bytes must fit in a 225 # Py_ssize_t type 226 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX, 227 "needs INT_MAX < PY_SSIZE_T_MAX") 228 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) 229 def test_large_read(self, size): 230 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 231 create_file(os_helper.TESTFN, b'test') 232 233 # Issue #21932: Make sure that os.read() does not raise an 234 # OverflowError for size larger than INT_MAX 235 with open(os_helper.TESTFN, "rb") as fp: 236 data = os.read(fp.fileno(), size) 237 238 # The test does not try to read more than 2 GiB at once because the 239 # operating system is free to return less bytes than requested. 240 self.assertEqual(data, b'test') 241 242 def test_write(self): 243 # os.write() accepts bytes- and buffer-like objects but not strings 244 fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY) 245 self.assertRaises(TypeError, os.write, fd, "beans") 246 os.write(fd, b"bacon\n") 247 os.write(fd, bytearray(b"eggs\n")) 248 os.write(fd, memoryview(b"spam\n")) 249 os.close(fd) 250 with open(os_helper.TESTFN, "rb") as fobj: 251 self.assertEqual(fobj.read().splitlines(), 252 [b"bacon", b"eggs", b"spam"]) 253 254 def write_windows_console(self, *args): 255 retcode = subprocess.call(args, 256 # use a new console to not flood the test output 257 creationflags=subprocess.CREATE_NEW_CONSOLE, 258 # use a shell to hide the console window (SW_HIDE) 259 shell=True) 260 self.assertEqual(retcode, 0) 261 262 @unittest.skipUnless(sys.platform == 'win32', 263 'test specific to the Windows console') 264 def test_write_windows_console(self): 265 # Issue #11395: the Windows console returns an error (12: not enough 266 # space error) on writing into stdout if stdout mode is binary and the 267 # length is greater than 66,000 bytes (or less, depending on heap 268 # usage). 269 code = "print('x' * 100000)" 270 self.write_windows_console(sys.executable, "-c", code) 271 self.write_windows_console(sys.executable, "-u", "-c", code) 272 273 def fdopen_helper(self, *args): 274 fd = os.open(os_helper.TESTFN, os.O_RDONLY) 275 f = os.fdopen(fd, *args, encoding="utf-8") 276 f.close() 277 278 def test_fdopen(self): 279 fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) 280 os.close(fd) 281 282 self.fdopen_helper() 283 self.fdopen_helper('r') 284 self.fdopen_helper('r', 100) 285 286 def test_replace(self): 287 TESTFN2 = os_helper.TESTFN + ".2" 288 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 289 self.addCleanup(os_helper.unlink, TESTFN2) 290 291 create_file(os_helper.TESTFN, b"1") 292 create_file(TESTFN2, b"2") 293 294 os.replace(os_helper.TESTFN, TESTFN2) 295 self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN) 296 with open(TESTFN2, 'r', encoding='utf-8') as f: 297 self.assertEqual(f.read(), "1") 298 299 def test_open_keywords(self): 300 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777, 301 dir_fd=None) 302 os.close(f) 303 304 def test_symlink_keywords(self): 305 symlink = support.get_attribute(os, "symlink") 306 try: 307 symlink(src='target', dst=os_helper.TESTFN, 308 target_is_directory=False, dir_fd=None) 309 except (NotImplementedError, OSError): 310 pass # No OS support or unprivileged user 311 312 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 313 def test_copy_file_range_invalid_values(self): 314 with self.assertRaises(ValueError): 315 os.copy_file_range(0, 1, -10) 316 317 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 318 def test_copy_file_range(self): 319 TESTFN2 = os_helper.TESTFN + ".3" 320 data = b'0123456789' 321 322 create_file(os_helper.TESTFN, data) 323 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 324 325 in_file = open(os_helper.TESTFN, 'rb') 326 self.addCleanup(in_file.close) 327 in_fd = in_file.fileno() 328 329 out_file = open(TESTFN2, 'w+b') 330 self.addCleanup(os_helper.unlink, TESTFN2) 331 self.addCleanup(out_file.close) 332 out_fd = out_file.fileno() 333 334 try: 335 i = os.copy_file_range(in_fd, out_fd, 5) 336 except OSError as e: 337 # Handle the case in which Python was compiled 338 # in a system with the syscall but without support 339 # in the kernel. 340 if e.errno != errno.ENOSYS: 341 raise 342 self.skipTest(e) 343 else: 344 # The number of copied bytes can be less than 345 # the number of bytes originally requested. 346 self.assertIn(i, range(0, 6)); 347 348 with open(TESTFN2, 'rb') as in_file: 349 self.assertEqual(in_file.read(), data[:i]) 350 351 @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') 352 def test_copy_file_range_offset(self): 353 TESTFN4 = os_helper.TESTFN + ".4" 354 data = b'0123456789' 355 bytes_to_copy = 6 356 in_skip = 3 357 out_seek = 5 358 359 create_file(os_helper.TESTFN, data) 360 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 361 362 in_file = open(os_helper.TESTFN, 'rb') 363 self.addCleanup(in_file.close) 364 in_fd = in_file.fileno() 365 366 out_file = open(TESTFN4, 'w+b') 367 self.addCleanup(os_helper.unlink, TESTFN4) 368 self.addCleanup(out_file.close) 369 out_fd = out_file.fileno() 370 371 try: 372 i = os.copy_file_range(in_fd, out_fd, bytes_to_copy, 373 offset_src=in_skip, 374 offset_dst=out_seek) 375 except OSError as e: 376 # Handle the case in which Python was compiled 377 # in a system with the syscall but without support 378 # in the kernel. 379 if e.errno != errno.ENOSYS: 380 raise 381 self.skipTest(e) 382 else: 383 # The number of copied bytes can be less than 384 # the number of bytes originally requested. 385 self.assertIn(i, range(0, bytes_to_copy+1)); 386 387 with open(TESTFN4, 'rb') as in_file: 388 read = in_file.read() 389 # seeked bytes (5) are zero'ed 390 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 391 # 012 are skipped (in_skip) 392 # 345678 are copied in the file (in_skip + bytes_to_copy) 393 self.assertEqual(read[out_seek:], 394 data[in_skip:in_skip+i]) 395 396 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 397 def test_splice_invalid_values(self): 398 with self.assertRaises(ValueError): 399 os.splice(0, 1, -10) 400 401 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 402 @requires_splice_pipe 403 def test_splice(self): 404 TESTFN2 = os_helper.TESTFN + ".3" 405 data = b'0123456789' 406 407 create_file(os_helper.TESTFN, data) 408 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 409 410 in_file = open(os_helper.TESTFN, 'rb') 411 self.addCleanup(in_file.close) 412 in_fd = in_file.fileno() 413 414 read_fd, write_fd = os.pipe() 415 self.addCleanup(lambda: os.close(read_fd)) 416 self.addCleanup(lambda: os.close(write_fd)) 417 418 try: 419 i = os.splice(in_fd, write_fd, 5) 420 except OSError as e: 421 # Handle the case in which Python was compiled 422 # in a system with the syscall but without support 423 # in the kernel. 424 if e.errno != errno.ENOSYS: 425 raise 426 self.skipTest(e) 427 else: 428 # The number of copied bytes can be less than 429 # the number of bytes originally requested. 430 self.assertIn(i, range(0, 6)); 431 432 self.assertEqual(os.read(read_fd, 100), data[:i]) 433 434 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 435 @requires_splice_pipe 436 def test_splice_offset_in(self): 437 TESTFN4 = os_helper.TESTFN + ".4" 438 data = b'0123456789' 439 bytes_to_copy = 6 440 in_skip = 3 441 442 create_file(os_helper.TESTFN, data) 443 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 444 445 in_file = open(os_helper.TESTFN, 'rb') 446 self.addCleanup(in_file.close) 447 in_fd = in_file.fileno() 448 449 read_fd, write_fd = os.pipe() 450 self.addCleanup(lambda: os.close(read_fd)) 451 self.addCleanup(lambda: os.close(write_fd)) 452 453 try: 454 i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip) 455 except OSError as e: 456 # Handle the case in which Python was compiled 457 # in a system with the syscall but without support 458 # in the kernel. 459 if e.errno != errno.ENOSYS: 460 raise 461 self.skipTest(e) 462 else: 463 # The number of copied bytes can be less than 464 # the number of bytes originally requested. 465 self.assertIn(i, range(0, bytes_to_copy+1)); 466 467 read = os.read(read_fd, 100) 468 # 012 are skipped (in_skip) 469 # 345678 are copied in the file (in_skip + bytes_to_copy) 470 self.assertEqual(read, data[in_skip:in_skip+i]) 471 472 @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') 473 @requires_splice_pipe 474 def test_splice_offset_out(self): 475 TESTFN4 = os_helper.TESTFN + ".4" 476 data = b'0123456789' 477 bytes_to_copy = 6 478 out_seek = 3 479 480 create_file(os_helper.TESTFN, data) 481 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 482 483 read_fd, write_fd = os.pipe() 484 self.addCleanup(lambda: os.close(read_fd)) 485 self.addCleanup(lambda: os.close(write_fd)) 486 os.write(write_fd, data) 487 488 out_file = open(TESTFN4, 'w+b') 489 self.addCleanup(os_helper.unlink, TESTFN4) 490 self.addCleanup(out_file.close) 491 out_fd = out_file.fileno() 492 493 try: 494 i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek) 495 except OSError as e: 496 # Handle the case in which Python was compiled 497 # in a system with the syscall but without support 498 # in the kernel. 499 if e.errno != errno.ENOSYS: 500 raise 501 self.skipTest(e) 502 else: 503 # The number of copied bytes can be less than 504 # the number of bytes originally requested. 505 self.assertIn(i, range(0, bytes_to_copy+1)); 506 507 with open(TESTFN4, 'rb') as in_file: 508 read = in_file.read() 509 # seeked bytes (5) are zero'ed 510 self.assertEqual(read[:out_seek], b'\x00'*out_seek) 511 # 012 are skipped (in_skip) 512 # 345678 are copied in the file (in_skip + bytes_to_copy) 513 self.assertEqual(read[out_seek:], data[:i]) 514 515 516# Test attributes on return values from os.*stat* family. 517class StatAttributeTests(unittest.TestCase): 518 def setUp(self): 519 self.fname = os_helper.TESTFN 520 self.addCleanup(os_helper.unlink, self.fname) 521 create_file(self.fname, b"ABC") 522 523 def check_stat_attributes(self, fname): 524 result = os.stat(fname) 525 526 # Make sure direct access works 527 self.assertEqual(result[stat.ST_SIZE], 3) 528 self.assertEqual(result.st_size, 3) 529 530 # Make sure all the attributes are there 531 members = dir(result) 532 for name in dir(stat): 533 if name[:3] == 'ST_': 534 attr = name.lower() 535 if name.endswith("TIME"): 536 def trunc(x): return int(x) 537 else: 538 def trunc(x): return x 539 self.assertEqual(trunc(getattr(result, attr)), 540 result[getattr(stat, name)]) 541 self.assertIn(attr, members) 542 543 # Make sure that the st_?time and st_?time_ns fields roughly agree 544 # (they should always agree up to around tens-of-microseconds) 545 for name in 'st_atime st_mtime st_ctime'.split(): 546 floaty = int(getattr(result, name) * 100000) 547 nanosecondy = getattr(result, name + "_ns") // 10000 548 self.assertAlmostEqual(floaty, nanosecondy, delta=2) 549 550 try: 551 result[200] 552 self.fail("No exception raised") 553 except IndexError: 554 pass 555 556 # Make sure that assignment fails 557 try: 558 result.st_mode = 1 559 self.fail("No exception raised") 560 except AttributeError: 561 pass 562 563 try: 564 result.st_rdev = 1 565 self.fail("No exception raised") 566 except (AttributeError, TypeError): 567 pass 568 569 try: 570 result.parrot = 1 571 self.fail("No exception raised") 572 except AttributeError: 573 pass 574 575 # Use the stat_result constructor with a too-short tuple. 576 try: 577 result2 = os.stat_result((10,)) 578 self.fail("No exception raised") 579 except TypeError: 580 pass 581 582 # Use the constructor with a too-long tuple. 583 try: 584 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 585 except TypeError: 586 pass 587 588 def test_stat_attributes(self): 589 self.check_stat_attributes(self.fname) 590 591 def test_stat_attributes_bytes(self): 592 try: 593 fname = self.fname.encode(sys.getfilesystemencoding()) 594 except UnicodeEncodeError: 595 self.skipTest("cannot encode %a for the filesystem" % self.fname) 596 self.check_stat_attributes(fname) 597 598 def test_stat_result_pickle(self): 599 result = os.stat(self.fname) 600 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 601 p = pickle.dumps(result, proto) 602 self.assertIn(b'stat_result', p) 603 if proto < 4: 604 self.assertIn(b'cos\nstat_result\n', p) 605 unpickled = pickle.loads(p) 606 self.assertEqual(result, unpickled) 607 608 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()') 609 def test_statvfs_attributes(self): 610 result = os.statvfs(self.fname) 611 612 # Make sure direct access works 613 self.assertEqual(result.f_bfree, result[3]) 614 615 # Make sure all the attributes are there. 616 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files', 617 'ffree', 'favail', 'flag', 'namemax') 618 for value, member in enumerate(members): 619 self.assertEqual(getattr(result, 'f_' + member), result[value]) 620 621 self.assertTrue(isinstance(result.f_fsid, int)) 622 623 # Test that the size of the tuple doesn't change 624 self.assertEqual(len(result), 10) 625 626 # Make sure that assignment really fails 627 try: 628 result.f_bfree = 1 629 self.fail("No exception raised") 630 except AttributeError: 631 pass 632 633 try: 634 result.parrot = 1 635 self.fail("No exception raised") 636 except AttributeError: 637 pass 638 639 # Use the constructor with a too-short tuple. 640 try: 641 result2 = os.statvfs_result((10,)) 642 self.fail("No exception raised") 643 except TypeError: 644 pass 645 646 # Use the constructor with a too-long tuple. 647 try: 648 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)) 649 except TypeError: 650 pass 651 652 @unittest.skipUnless(hasattr(os, 'statvfs'), 653 "need os.statvfs()") 654 def test_statvfs_result_pickle(self): 655 result = os.statvfs(self.fname) 656 657 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 658 p = pickle.dumps(result, proto) 659 self.assertIn(b'statvfs_result', p) 660 if proto < 4: 661 self.assertIn(b'cos\nstatvfs_result\n', p) 662 unpickled = pickle.loads(p) 663 self.assertEqual(result, unpickled) 664 665 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 666 def test_1686475(self): 667 # Verify that an open file can be stat'ed 668 try: 669 os.stat(r"c:\pagefile.sys") 670 except FileNotFoundError: 671 self.skipTest(r'c:\pagefile.sys does not exist') 672 except OSError as e: 673 self.fail("Could not stat pagefile.sys") 674 675 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 676 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 677 def test_15261(self): 678 # Verify that stat'ing a closed fd does not cause crash 679 r, w = os.pipe() 680 try: 681 os.stat(r) # should not raise error 682 finally: 683 os.close(r) 684 os.close(w) 685 with self.assertRaises(OSError) as ctx: 686 os.stat(r) 687 self.assertEqual(ctx.exception.errno, errno.EBADF) 688 689 def check_file_attributes(self, result): 690 self.assertTrue(hasattr(result, 'st_file_attributes')) 691 self.assertTrue(isinstance(result.st_file_attributes, int)) 692 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF) 693 694 @unittest.skipUnless(sys.platform == "win32", 695 "st_file_attributes is Win32 specific") 696 def test_file_attributes(self): 697 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set) 698 result = os.stat(self.fname) 699 self.check_file_attributes(result) 700 self.assertEqual( 701 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 702 0) 703 704 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) 705 dirname = os_helper.TESTFN + "dir" 706 os.mkdir(dirname) 707 self.addCleanup(os.rmdir, dirname) 708 709 result = os.stat(dirname) 710 self.check_file_attributes(result) 711 self.assertEqual( 712 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY, 713 stat.FILE_ATTRIBUTE_DIRECTORY) 714 715 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 716 def test_access_denied(self): 717 # Default to FindFirstFile WIN32_FIND_DATA when access is 718 # denied. See issue 28075. 719 # os.environ['TEMP'] should be located on a volume that 720 # supports file ACLs. 721 fname = os.path.join(os.environ['TEMP'], self.fname) 722 self.addCleanup(os_helper.unlink, fname) 723 create_file(fname, b'ABC') 724 # Deny the right to [S]YNCHRONIZE on the file to 725 # force CreateFile to fail with ERROR_ACCESS_DENIED. 726 DETACHED_PROCESS = 8 727 subprocess.check_call( 728 # bpo-30584: Use security identifier *S-1-5-32-545 instead 729 # of localized "Users" to not depend on the locale. 730 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'], 731 creationflags=DETACHED_PROCESS 732 ) 733 result = os.stat(fname) 734 self.assertNotEqual(result.st_size, 0) 735 736 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 737 def test_stat_block_device(self): 738 # bpo-38030: os.stat fails for block devices 739 # Test a filename like "//./C:" 740 fname = "//./" + os.path.splitdrive(os.getcwd())[0] 741 result = os.stat(fname) 742 self.assertEqual(result.st_mode, stat.S_IFBLK) 743 744 745class UtimeTests(unittest.TestCase): 746 def setUp(self): 747 self.dirname = os_helper.TESTFN 748 self.fname = os.path.join(self.dirname, "f1") 749 750 self.addCleanup(os_helper.rmtree, self.dirname) 751 os.mkdir(self.dirname) 752 create_file(self.fname) 753 754 def support_subsecond(self, filename): 755 # Heuristic to check if the filesystem supports timestamp with 756 # subsecond resolution: check if float and int timestamps are different 757 st = os.stat(filename) 758 return ((st.st_atime != st[7]) 759 or (st.st_mtime != st[8]) 760 or (st.st_ctime != st[9])) 761 762 def _test_utime(self, set_time, filename=None): 763 if not filename: 764 filename = self.fname 765 766 support_subsecond = self.support_subsecond(filename) 767 if support_subsecond: 768 # Timestamp with a resolution of 1 microsecond (10^-6). 769 # 770 # The resolution of the C internal function used by os.utime() 771 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable 772 # test with a resolution of 1 ns requires more work: 773 # see the issue #15745. 774 atime_ns = 1002003000 # 1.002003 seconds 775 mtime_ns = 4005006000 # 4.005006 seconds 776 else: 777 # use a resolution of 1 second 778 atime_ns = 5 * 10**9 779 mtime_ns = 8 * 10**9 780 781 set_time(filename, (atime_ns, mtime_ns)) 782 st = os.stat(filename) 783 784 if support_subsecond: 785 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6) 786 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6) 787 else: 788 self.assertEqual(st.st_atime, atime_ns * 1e-9) 789 self.assertEqual(st.st_mtime, mtime_ns * 1e-9) 790 self.assertEqual(st.st_atime_ns, atime_ns) 791 self.assertEqual(st.st_mtime_ns, mtime_ns) 792 793 def test_utime(self): 794 def set_time(filename, ns): 795 # test the ns keyword parameter 796 os.utime(filename, ns=ns) 797 self._test_utime(set_time) 798 799 @staticmethod 800 def ns_to_sec(ns): 801 # Convert a number of nanosecond (int) to a number of seconds (float). 802 # Round towards infinity by adding 0.5 nanosecond to avoid rounding 803 # issue, os.utime() rounds towards minus infinity. 804 return (ns * 1e-9) + 0.5e-9 805 806 def test_utime_by_indexed(self): 807 # pass times as floating point seconds as the second indexed parameter 808 def set_time(filename, ns): 809 atime_ns, mtime_ns = ns 810 atime = self.ns_to_sec(atime_ns) 811 mtime = self.ns_to_sec(mtime_ns) 812 # test utimensat(timespec), utimes(timeval), utime(utimbuf) 813 # or utime(time_t) 814 os.utime(filename, (atime, mtime)) 815 self._test_utime(set_time) 816 817 def test_utime_by_times(self): 818 def set_time(filename, ns): 819 atime_ns, mtime_ns = ns 820 atime = self.ns_to_sec(atime_ns) 821 mtime = self.ns_to_sec(mtime_ns) 822 # test the times keyword parameter 823 os.utime(filename, times=(atime, mtime)) 824 self._test_utime(set_time) 825 826 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, 827 "follow_symlinks support for utime required " 828 "for this test.") 829 def test_utime_nofollow_symlinks(self): 830 def set_time(filename, ns): 831 # use follow_symlinks=False to test utimensat(timespec) 832 # or lutimes(timeval) 833 os.utime(filename, ns=ns, follow_symlinks=False) 834 self._test_utime(set_time) 835 836 @unittest.skipUnless(os.utime in os.supports_fd, 837 "fd support for utime required for this test.") 838 def test_utime_fd(self): 839 def set_time(filename, ns): 840 with open(filename, 'wb', 0) as fp: 841 # use a file descriptor to test futimens(timespec) 842 # or futimes(timeval) 843 os.utime(fp.fileno(), ns=ns) 844 self._test_utime(set_time) 845 846 @unittest.skipUnless(os.utime in os.supports_dir_fd, 847 "dir_fd support for utime required for this test.") 848 def test_utime_dir_fd(self): 849 def set_time(filename, ns): 850 dirname, name = os.path.split(filename) 851 dirfd = os.open(dirname, os.O_RDONLY) 852 try: 853 # pass dir_fd to test utimensat(timespec) or futimesat(timeval) 854 os.utime(name, dir_fd=dirfd, ns=ns) 855 finally: 856 os.close(dirfd) 857 self._test_utime(set_time) 858 859 def test_utime_directory(self): 860 def set_time(filename, ns): 861 # test calling os.utime() on a directory 862 os.utime(filename, ns=ns) 863 self._test_utime(set_time, filename=self.dirname) 864 865 def _test_utime_current(self, set_time): 866 # Get the system clock 867 current = time.time() 868 869 # Call os.utime() to set the timestamp to the current system clock 870 set_time(self.fname) 871 872 if not self.support_subsecond(self.fname): 873 delta = 1.0 874 else: 875 # On Windows, the usual resolution of time.time() is 15.6 ms. 876 # bpo-30649: Tolerate 50 ms for slow Windows buildbots. 877 # 878 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use 879 # also 50 ms on other platforms. 880 delta = 0.050 881 st = os.stat(self.fname) 882 msg = ("st_time=%r, current=%r, dt=%r" 883 % (st.st_mtime, current, st.st_mtime - current)) 884 self.assertAlmostEqual(st.st_mtime, current, 885 delta=delta, msg=msg) 886 887 def test_utime_current(self): 888 def set_time(filename): 889 # Set to the current time in the new way 890 os.utime(self.fname) 891 self._test_utime_current(set_time) 892 893 def test_utime_current_old(self): 894 def set_time(filename): 895 # Set to the current time in the old explicit way. 896 os.utime(self.fname, None) 897 self._test_utime_current(set_time) 898 899 def get_file_system(self, path): 900 if sys.platform == 'win32': 901 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\' 902 import ctypes 903 kernel32 = ctypes.windll.kernel32 904 buf = ctypes.create_unicode_buffer("", 100) 905 ok = kernel32.GetVolumeInformationW(root, None, 0, 906 None, None, None, 907 buf, len(buf)) 908 if ok: 909 return buf.value 910 # return None if the filesystem is unknown 911 912 def test_large_time(self): 913 # Many filesystems are limited to the year 2038. At least, the test 914 # pass with NTFS filesystem. 915 if self.get_file_system(self.dirname) != "NTFS": 916 self.skipTest("requires NTFS") 917 918 large = 5000000000 # some day in 2128 919 os.utime(self.fname, (large, large)) 920 self.assertEqual(os.stat(self.fname).st_mtime, large) 921 922 def test_utime_invalid_arguments(self): 923 # seconds and nanoseconds parameters are mutually exclusive 924 with self.assertRaises(ValueError): 925 os.utime(self.fname, (5, 5), ns=(5, 5)) 926 with self.assertRaises(TypeError): 927 os.utime(self.fname, [5, 5]) 928 with self.assertRaises(TypeError): 929 os.utime(self.fname, (5,)) 930 with self.assertRaises(TypeError): 931 os.utime(self.fname, (5, 5, 5)) 932 with self.assertRaises(TypeError): 933 os.utime(self.fname, ns=[5, 5]) 934 with self.assertRaises(TypeError): 935 os.utime(self.fname, ns=(5,)) 936 with self.assertRaises(TypeError): 937 os.utime(self.fname, ns=(5, 5, 5)) 938 939 if os.utime not in os.supports_follow_symlinks: 940 with self.assertRaises(NotImplementedError): 941 os.utime(self.fname, (5, 5), follow_symlinks=False) 942 if os.utime not in os.supports_fd: 943 with open(self.fname, 'wb', 0) as fp: 944 with self.assertRaises(TypeError): 945 os.utime(fp.fileno(), (5, 5)) 946 if os.utime not in os.supports_dir_fd: 947 with self.assertRaises(NotImplementedError): 948 os.utime(self.fname, (5, 5), dir_fd=0) 949 950 @support.cpython_only 951 def test_issue31577(self): 952 # The interpreter shouldn't crash in case utime() received a bad 953 # ns argument. 954 def get_bad_int(divmod_ret_val): 955 class BadInt: 956 def __divmod__(*args): 957 return divmod_ret_val 958 return BadInt() 959 with self.assertRaises(TypeError): 960 os.utime(self.fname, ns=(get_bad_int(42), 1)) 961 with self.assertRaises(TypeError): 962 os.utime(self.fname, ns=(get_bad_int(()), 1)) 963 with self.assertRaises(TypeError): 964 os.utime(self.fname, ns=(get_bad_int((1, 2, 3)), 1)) 965 966 967from test import mapping_tests 968 969class EnvironTests(mapping_tests.BasicTestMappingProtocol): 970 """check that os.environ object conform to mapping protocol""" 971 type2test = None 972 973 def setUp(self): 974 self.__save = dict(os.environ) 975 if os.supports_bytes_environ: 976 self.__saveb = dict(os.environb) 977 for key, value in self._reference().items(): 978 os.environ[key] = value 979 980 def tearDown(self): 981 os.environ.clear() 982 os.environ.update(self.__save) 983 if os.supports_bytes_environ: 984 os.environb.clear() 985 os.environb.update(self.__saveb) 986 987 def _reference(self): 988 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} 989 990 def _empty_mapping(self): 991 os.environ.clear() 992 return os.environ 993 994 # Bug 1110478 995 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 996 'requires a shell') 997 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") 998 def test_update2(self): 999 os.environ.clear() 1000 os.environ.update(HELLO="World") 1001 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: 1002 value = popen.read().strip() 1003 self.assertEqual(value, "World") 1004 1005 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), 1006 'requires a shell') 1007 @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") 1008 def test_os_popen_iter(self): 1009 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" 1010 % unix_shell) as popen: 1011 it = iter(popen) 1012 self.assertEqual(next(it), "line1\n") 1013 self.assertEqual(next(it), "line2\n") 1014 self.assertEqual(next(it), "line3\n") 1015 self.assertRaises(StopIteration, next, it) 1016 1017 # Verify environ keys and values from the OS are of the 1018 # correct str type. 1019 def test_keyvalue_types(self): 1020 for key, val in os.environ.items(): 1021 self.assertEqual(type(key), str) 1022 self.assertEqual(type(val), str) 1023 1024 def test_items(self): 1025 for key, value in self._reference().items(): 1026 self.assertEqual(os.environ.get(key), value) 1027 1028 # Issue 7310 1029 def test___repr__(self): 1030 """Check that the repr() of os.environ looks like environ({...}).""" 1031 env = os.environ 1032 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join( 1033 '{!r}: {!r}'.format(key, value) 1034 for key, value in env.items()))) 1035 1036 def test_get_exec_path(self): 1037 defpath_list = os.defpath.split(os.pathsep) 1038 test_path = ['/monty', '/python', '', '/flying/circus'] 1039 test_env = {'PATH': os.pathsep.join(test_path)} 1040 1041 saved_environ = os.environ 1042 try: 1043 os.environ = dict(test_env) 1044 # Test that defaulting to os.environ works. 1045 self.assertSequenceEqual(test_path, os.get_exec_path()) 1046 self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) 1047 finally: 1048 os.environ = saved_environ 1049 1050 # No PATH environment variable 1051 self.assertSequenceEqual(defpath_list, os.get_exec_path({})) 1052 # Empty PATH environment variable 1053 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) 1054 # Supplied PATH environment variable 1055 self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) 1056 1057 if os.supports_bytes_environ: 1058 # env cannot contain 'PATH' and b'PATH' keys 1059 try: 1060 # ignore BytesWarning warning 1061 with warnings.catch_warnings(record=True): 1062 mixed_env = {'PATH': '1', b'PATH': b'2'} 1063 except BytesWarning: 1064 # mixed_env cannot be created with python -bb 1065 pass 1066 else: 1067 self.assertRaises(ValueError, os.get_exec_path, mixed_env) 1068 1069 # bytes key and/or value 1070 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), 1071 ['abc']) 1072 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), 1073 ['abc']) 1074 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), 1075 ['abc']) 1076 1077 @unittest.skipUnless(os.supports_bytes_environ, 1078 "os.environb required for this test.") 1079 def test_environb(self): 1080 # os.environ -> os.environb 1081 value = 'euro\u20ac' 1082 try: 1083 value_bytes = value.encode(sys.getfilesystemencoding(), 1084 'surrogateescape') 1085 except UnicodeEncodeError: 1086 msg = "U+20AC character is not encodable to %s" % ( 1087 sys.getfilesystemencoding(),) 1088 self.skipTest(msg) 1089 os.environ['unicode'] = value 1090 self.assertEqual(os.environ['unicode'], value) 1091 self.assertEqual(os.environb[b'unicode'], value_bytes) 1092 1093 # os.environb -> os.environ 1094 value = b'\xff' 1095 os.environb[b'bytes'] = value 1096 self.assertEqual(os.environb[b'bytes'], value) 1097 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') 1098 self.assertEqual(os.environ['bytes'], value_str) 1099 1100 def test_putenv_unsetenv(self): 1101 name = "PYTHONTESTVAR" 1102 value = "testvalue" 1103 code = f'import os; print(repr(os.environ.get({name!r})))' 1104 1105 with os_helper.EnvironmentVarGuard() as env: 1106 env.pop(name, None) 1107 1108 os.putenv(name, value) 1109 proc = subprocess.run([sys.executable, '-c', code], check=True, 1110 stdout=subprocess.PIPE, text=True) 1111 self.assertEqual(proc.stdout.rstrip(), repr(value)) 1112 1113 os.unsetenv(name) 1114 proc = subprocess.run([sys.executable, '-c', code], check=True, 1115 stdout=subprocess.PIPE, text=True) 1116 self.assertEqual(proc.stdout.rstrip(), repr(None)) 1117 1118 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). 1119 @support.requires_mac_ver(10, 6) 1120 def test_putenv_unsetenv_error(self): 1121 # Empty variable name is invalid. 1122 # "=" and null character are not allowed in a variable name. 1123 for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'): 1124 self.assertRaises((OSError, ValueError), os.putenv, name, "value") 1125 self.assertRaises((OSError, ValueError), os.unsetenv, name) 1126 1127 if sys.platform == "win32": 1128 # On Windows, an environment variable string ("name=value" string) 1129 # is limited to 32,767 characters 1130 longstr = 'x' * 32_768 1131 self.assertRaises(ValueError, os.putenv, longstr, "1") 1132 self.assertRaises(ValueError, os.putenv, "X", longstr) 1133 self.assertRaises(ValueError, os.unsetenv, longstr) 1134 1135 def test_key_type(self): 1136 missing = 'missingkey' 1137 self.assertNotIn(missing, os.environ) 1138 1139 with self.assertRaises(KeyError) as cm: 1140 os.environ[missing] 1141 self.assertIs(cm.exception.args[0], missing) 1142 self.assertTrue(cm.exception.__suppress_context__) 1143 1144 with self.assertRaises(KeyError) as cm: 1145 del os.environ[missing] 1146 self.assertIs(cm.exception.args[0], missing) 1147 self.assertTrue(cm.exception.__suppress_context__) 1148 1149 def _test_environ_iteration(self, collection): 1150 iterator = iter(collection) 1151 new_key = "__new_key__" 1152 1153 next(iterator) # start iteration over os.environ.items 1154 1155 # add a new key in os.environ mapping 1156 os.environ[new_key] = "test_environ_iteration" 1157 1158 try: 1159 next(iterator) # force iteration over modified mapping 1160 self.assertEqual(os.environ[new_key], "test_environ_iteration") 1161 finally: 1162 del os.environ[new_key] 1163 1164 def test_iter_error_when_changing_os_environ(self): 1165 self._test_environ_iteration(os.environ) 1166 1167 def test_iter_error_when_changing_os_environ_items(self): 1168 self._test_environ_iteration(os.environ.items()) 1169 1170 def test_iter_error_when_changing_os_environ_values(self): 1171 self._test_environ_iteration(os.environ.values()) 1172 1173 def _test_underlying_process_env(self, var, expected): 1174 if not (unix_shell and os.path.exists(unix_shell)): 1175 return 1176 1177 with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen: 1178 value = popen.read().strip() 1179 1180 self.assertEqual(expected, value) 1181 1182 def test_or_operator(self): 1183 overridden_key = '_TEST_VAR_' 1184 original_value = 'original_value' 1185 os.environ[overridden_key] = original_value 1186 1187 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1188 expected = dict(os.environ) 1189 expected.update(new_vars_dict) 1190 1191 actual = os.environ | new_vars_dict 1192 self.assertDictEqual(expected, actual) 1193 self.assertEqual('3', actual[overridden_key]) 1194 1195 new_vars_items = new_vars_dict.items() 1196 self.assertIs(NotImplemented, os.environ.__or__(new_vars_items)) 1197 1198 self._test_underlying_process_env('_A_', '') 1199 self._test_underlying_process_env(overridden_key, original_value) 1200 1201 def test_ior_operator(self): 1202 overridden_key = '_TEST_VAR_' 1203 os.environ[overridden_key] = 'original_value' 1204 1205 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1206 expected = dict(os.environ) 1207 expected.update(new_vars_dict) 1208 1209 os.environ |= new_vars_dict 1210 self.assertEqual(expected, os.environ) 1211 self.assertEqual('3', os.environ[overridden_key]) 1212 1213 self._test_underlying_process_env('_A_', '1') 1214 self._test_underlying_process_env(overridden_key, '3') 1215 1216 def test_ior_operator_invalid_dicts(self): 1217 os_environ_copy = os.environ.copy() 1218 with self.assertRaises(TypeError): 1219 dict_with_bad_key = {1: '_A_'} 1220 os.environ |= dict_with_bad_key 1221 1222 with self.assertRaises(TypeError): 1223 dict_with_bad_val = {'_A_': 1} 1224 os.environ |= dict_with_bad_val 1225 1226 # Check nothing was added. 1227 self.assertEqual(os_environ_copy, os.environ) 1228 1229 def test_ior_operator_key_value_iterable(self): 1230 overridden_key = '_TEST_VAR_' 1231 os.environ[overridden_key] = 'original_value' 1232 1233 new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3')) 1234 expected = dict(os.environ) 1235 expected.update(new_vars_items) 1236 1237 os.environ |= new_vars_items 1238 self.assertEqual(expected, os.environ) 1239 self.assertEqual('3', os.environ[overridden_key]) 1240 1241 self._test_underlying_process_env('_A_', '1') 1242 self._test_underlying_process_env(overridden_key, '3') 1243 1244 def test_ror_operator(self): 1245 overridden_key = '_TEST_VAR_' 1246 original_value = 'original_value' 1247 os.environ[overridden_key] = original_value 1248 1249 new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} 1250 expected = dict(new_vars_dict) 1251 expected.update(os.environ) 1252 1253 actual = new_vars_dict | os.environ 1254 self.assertDictEqual(expected, actual) 1255 self.assertEqual(original_value, actual[overridden_key]) 1256 1257 new_vars_items = new_vars_dict.items() 1258 self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items)) 1259 1260 self._test_underlying_process_env('_A_', '') 1261 self._test_underlying_process_env(overridden_key, original_value) 1262 1263 1264class WalkTests(unittest.TestCase): 1265 """Tests for os.walk().""" 1266 1267 # Wrapper to hide minor differences between os.walk and os.fwalk 1268 # to tests both functions with the same code base 1269 def walk(self, top, **kwargs): 1270 if 'follow_symlinks' in kwargs: 1271 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1272 return os.walk(top, **kwargs) 1273 1274 def setUp(self): 1275 join = os.path.join 1276 self.addCleanup(os_helper.rmtree, os_helper.TESTFN) 1277 1278 # Build: 1279 # TESTFN/ 1280 # TEST1/ a file kid and two directory kids 1281 # tmp1 1282 # SUB1/ a file kid and a directory kid 1283 # tmp2 1284 # SUB11/ no kids 1285 # SUB2/ a file kid and a dirsymlink kid 1286 # tmp3 1287 # SUB21/ not readable 1288 # tmp5 1289 # link/ a symlink to TESTFN.2 1290 # broken_link 1291 # broken_link2 1292 # broken_link3 1293 # TEST2/ 1294 # tmp4 a lone file 1295 self.walk_path = join(os_helper.TESTFN, "TEST1") 1296 self.sub1_path = join(self.walk_path, "SUB1") 1297 self.sub11_path = join(self.sub1_path, "SUB11") 1298 sub2_path = join(self.walk_path, "SUB2") 1299 sub21_path = join(sub2_path, "SUB21") 1300 tmp1_path = join(self.walk_path, "tmp1") 1301 tmp2_path = join(self.sub1_path, "tmp2") 1302 tmp3_path = join(sub2_path, "tmp3") 1303 tmp5_path = join(sub21_path, "tmp3") 1304 self.link_path = join(sub2_path, "link") 1305 t2_path = join(os_helper.TESTFN, "TEST2") 1306 tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4") 1307 broken_link_path = join(sub2_path, "broken_link") 1308 broken_link2_path = join(sub2_path, "broken_link2") 1309 broken_link3_path = join(sub2_path, "broken_link3") 1310 1311 # Create stuff. 1312 os.makedirs(self.sub11_path) 1313 os.makedirs(sub2_path) 1314 os.makedirs(sub21_path) 1315 os.makedirs(t2_path) 1316 1317 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: 1318 with open(path, "x", encoding='utf-8') as f: 1319 f.write("I'm " + path + " and proud of it. Blame test_os.\n") 1320 1321 if os_helper.can_symlink(): 1322 os.symlink(os.path.abspath(t2_path), self.link_path) 1323 os.symlink('broken', broken_link_path, True) 1324 os.symlink(join('tmp3', 'broken'), broken_link2_path, True) 1325 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True) 1326 self.sub2_tree = (sub2_path, ["SUB21", "link"], 1327 ["broken_link", "broken_link2", "broken_link3", 1328 "tmp3"]) 1329 else: 1330 self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"]) 1331 1332 os.chmod(sub21_path, 0) 1333 try: 1334 os.listdir(sub21_path) 1335 except PermissionError: 1336 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU) 1337 else: 1338 os.chmod(sub21_path, stat.S_IRWXU) 1339 os.unlink(tmp5_path) 1340 os.rmdir(sub21_path) 1341 del self.sub2_tree[1][:1] 1342 1343 def test_walk_topdown(self): 1344 # Walk top-down. 1345 all = list(self.walk(self.walk_path)) 1346 1347 self.assertEqual(len(all), 4) 1348 # We can't know which order SUB1 and SUB2 will appear in. 1349 # Not flipped: TESTFN, SUB1, SUB11, SUB2 1350 # flipped: TESTFN, SUB2, SUB1, SUB11 1351 flipped = all[0][1][0] != "SUB1" 1352 all[0][1].sort() 1353 all[3 - 2 * flipped][-1].sort() 1354 all[3 - 2 * flipped][1].sort() 1355 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1356 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"])) 1357 self.assertEqual(all[2 + flipped], (self.sub11_path, [], [])) 1358 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree) 1359 1360 def test_walk_prune(self, walk_path=None): 1361 if walk_path is None: 1362 walk_path = self.walk_path 1363 # Prune the search. 1364 all = [] 1365 for root, dirs, files in self.walk(walk_path): 1366 all.append((root, dirs, files)) 1367 # Don't descend into SUB1. 1368 if 'SUB1' in dirs: 1369 # Note that this also mutates the dirs we appended to all! 1370 dirs.remove('SUB1') 1371 1372 self.assertEqual(len(all), 2) 1373 self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"])) 1374 1375 all[1][-1].sort() 1376 all[1][1].sort() 1377 self.assertEqual(all[1], self.sub2_tree) 1378 1379 def test_file_like_path(self): 1380 self.test_walk_prune(FakePath(self.walk_path)) 1381 1382 def test_walk_bottom_up(self): 1383 # Walk bottom-up. 1384 all = list(self.walk(self.walk_path, topdown=False)) 1385 1386 self.assertEqual(len(all), 4, all) 1387 # We can't know which order SUB1 and SUB2 will appear in. 1388 # Not flipped: SUB11, SUB1, SUB2, TESTFN 1389 # flipped: SUB2, SUB11, SUB1, TESTFN 1390 flipped = all[3][1][0] != "SUB1" 1391 all[3][1].sort() 1392 all[2 - 2 * flipped][-1].sort() 1393 all[2 - 2 * flipped][1].sort() 1394 self.assertEqual(all[3], 1395 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) 1396 self.assertEqual(all[flipped], 1397 (self.sub11_path, [], [])) 1398 self.assertEqual(all[flipped + 1], 1399 (self.sub1_path, ["SUB11"], ["tmp2"])) 1400 self.assertEqual(all[2 - 2 * flipped], 1401 self.sub2_tree) 1402 1403 def test_walk_symlink(self): 1404 if not os_helper.can_symlink(): 1405 self.skipTest("need symlink support") 1406 1407 # Walk, following symlinks. 1408 walk_it = self.walk(self.walk_path, follow_symlinks=True) 1409 for root, dirs, files in walk_it: 1410 if root == self.link_path: 1411 self.assertEqual(dirs, []) 1412 self.assertEqual(files, ["tmp4"]) 1413 break 1414 else: 1415 self.fail("Didn't follow symlink with followlinks=True") 1416 1417 def test_walk_bad_dir(self): 1418 # Walk top-down. 1419 errors = [] 1420 walk_it = self.walk(self.walk_path, onerror=errors.append) 1421 root, dirs, files = next(walk_it) 1422 self.assertEqual(errors, []) 1423 dir1 = 'SUB1' 1424 path1 = os.path.join(root, dir1) 1425 path1new = os.path.join(root, dir1 + '.new') 1426 os.rename(path1, path1new) 1427 try: 1428 roots = [r for r, d, f in walk_it] 1429 self.assertTrue(errors) 1430 self.assertNotIn(path1, roots) 1431 self.assertNotIn(path1new, roots) 1432 for dir2 in dirs: 1433 if dir2 != dir1: 1434 self.assertIn(os.path.join(root, dir2), roots) 1435 finally: 1436 os.rename(path1new, path1) 1437 1438 def test_walk_many_open_files(self): 1439 depth = 30 1440 base = os.path.join(os_helper.TESTFN, 'deep') 1441 p = os.path.join(base, *(['d']*depth)) 1442 os.makedirs(p) 1443 1444 iters = [self.walk(base, topdown=False) for j in range(100)] 1445 for i in range(depth + 1): 1446 expected = (p, ['d'] if i else [], []) 1447 for it in iters: 1448 self.assertEqual(next(it), expected) 1449 p = os.path.dirname(p) 1450 1451 iters = [self.walk(base, topdown=True) for j in range(100)] 1452 p = base 1453 for i in range(depth + 1): 1454 expected = (p, ['d'] if i < depth else [], []) 1455 for it in iters: 1456 self.assertEqual(next(it), expected) 1457 p = os.path.join(p, 'd') 1458 1459 1460@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1461class FwalkTests(WalkTests): 1462 """Tests for os.fwalk().""" 1463 1464 def walk(self, top, **kwargs): 1465 for root, dirs, files, root_fd in self.fwalk(top, **kwargs): 1466 yield (root, dirs, files) 1467 1468 def fwalk(self, *args, **kwargs): 1469 return os.fwalk(*args, **kwargs) 1470 1471 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs): 1472 """ 1473 compare with walk() results. 1474 """ 1475 walk_kwargs = walk_kwargs.copy() 1476 fwalk_kwargs = fwalk_kwargs.copy() 1477 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1478 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks) 1479 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks) 1480 1481 expected = {} 1482 for root, dirs, files in os.walk(**walk_kwargs): 1483 expected[root] = (set(dirs), set(files)) 1484 1485 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs): 1486 self.assertIn(root, expected) 1487 self.assertEqual(expected[root], (set(dirs), set(files))) 1488 1489 def test_compare_to_walk(self): 1490 kwargs = {'top': os_helper.TESTFN} 1491 self._compare_to_walk(kwargs, kwargs) 1492 1493 def test_dir_fd(self): 1494 try: 1495 fd = os.open(".", os.O_RDONLY) 1496 walk_kwargs = {'top': os_helper.TESTFN} 1497 fwalk_kwargs = walk_kwargs.copy() 1498 fwalk_kwargs['dir_fd'] = fd 1499 self._compare_to_walk(walk_kwargs, fwalk_kwargs) 1500 finally: 1501 os.close(fd) 1502 1503 def test_yields_correct_dir_fd(self): 1504 # check returned file descriptors 1505 for topdown, follow_symlinks in itertools.product((True, False), repeat=2): 1506 args = os_helper.TESTFN, topdown, None 1507 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks): 1508 # check that the FD is valid 1509 os.fstat(rootfd) 1510 # redundant check 1511 os.stat(rootfd) 1512 # check that listdir() returns consistent information 1513 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files)) 1514 1515 def test_fd_leak(self): 1516 # Since we're opening a lot of FDs, we must be careful to avoid leaks: 1517 # we both check that calling fwalk() a large number of times doesn't 1518 # yield EMFILE, and that the minimum allocated FD hasn't changed. 1519 minfd = os.dup(1) 1520 os.close(minfd) 1521 for i in range(256): 1522 for x in self.fwalk(os_helper.TESTFN): 1523 pass 1524 newfd = os.dup(1) 1525 self.addCleanup(os.close, newfd) 1526 self.assertEqual(newfd, minfd) 1527 1528 # fwalk() keeps file descriptors open 1529 test_walk_many_open_files = None 1530 1531 1532class BytesWalkTests(WalkTests): 1533 """Tests for os.walk() with bytes.""" 1534 def walk(self, top, **kwargs): 1535 if 'follow_symlinks' in kwargs: 1536 kwargs['followlinks'] = kwargs.pop('follow_symlinks') 1537 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs): 1538 root = os.fsdecode(broot) 1539 dirs = list(map(os.fsdecode, bdirs)) 1540 files = list(map(os.fsdecode, bfiles)) 1541 yield (root, dirs, files) 1542 bdirs[:] = list(map(os.fsencode, dirs)) 1543 bfiles[:] = list(map(os.fsencode, files)) 1544 1545@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()") 1546class BytesFwalkTests(FwalkTests): 1547 """Tests for os.walk() with bytes.""" 1548 def fwalk(self, top='.', *args, **kwargs): 1549 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs): 1550 root = os.fsdecode(broot) 1551 dirs = list(map(os.fsdecode, bdirs)) 1552 files = list(map(os.fsdecode, bfiles)) 1553 yield (root, dirs, files, topfd) 1554 bdirs[:] = list(map(os.fsencode, dirs)) 1555 bfiles[:] = list(map(os.fsencode, files)) 1556 1557 1558class MakedirTests(unittest.TestCase): 1559 def setUp(self): 1560 os.mkdir(os_helper.TESTFN) 1561 1562 def test_makedir(self): 1563 base = os_helper.TESTFN 1564 path = os.path.join(base, 'dir1', 'dir2', 'dir3') 1565 os.makedirs(path) # Should work 1566 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4') 1567 os.makedirs(path) 1568 1569 # Try paths with a '.' in them 1570 self.assertRaises(OSError, os.makedirs, os.curdir) 1571 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir) 1572 os.makedirs(path) 1573 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4', 1574 'dir5', 'dir6') 1575 os.makedirs(path) 1576 1577 def test_mode(self): 1578 with os_helper.temp_umask(0o002): 1579 base = os_helper.TESTFN 1580 parent = os.path.join(base, 'dir1') 1581 path = os.path.join(parent, 'dir2') 1582 os.makedirs(path, 0o555) 1583 self.assertTrue(os.path.exists(path)) 1584 self.assertTrue(os.path.isdir(path)) 1585 if os.name != 'nt': 1586 self.assertEqual(os.stat(path).st_mode & 0o777, 0o555) 1587 self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) 1588 1589 def test_exist_ok_existing_directory(self): 1590 path = os.path.join(os_helper.TESTFN, 'dir1') 1591 mode = 0o777 1592 old_mask = os.umask(0o022) 1593 os.makedirs(path, mode) 1594 self.assertRaises(OSError, os.makedirs, path, mode) 1595 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False) 1596 os.makedirs(path, 0o776, exist_ok=True) 1597 os.makedirs(path, mode=mode, exist_ok=True) 1598 os.umask(old_mask) 1599 1600 # Issue #25583: A drive root could raise PermissionError on Windows 1601 os.makedirs(os.path.abspath('/'), exist_ok=True) 1602 1603 def test_exist_ok_s_isgid_directory(self): 1604 path = os.path.join(os_helper.TESTFN, 'dir1') 1605 S_ISGID = stat.S_ISGID 1606 mode = 0o777 1607 old_mask = os.umask(0o022) 1608 try: 1609 existing_testfn_mode = stat.S_IMODE( 1610 os.lstat(os_helper.TESTFN).st_mode) 1611 try: 1612 os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID) 1613 except PermissionError: 1614 raise unittest.SkipTest('Cannot set S_ISGID for dir.') 1615 if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID): 1616 raise unittest.SkipTest('No support for S_ISGID dir mode.') 1617 # The os should apply S_ISGID from the parent dir for us, but 1618 # this test need not depend on that behavior. Be explicit. 1619 os.makedirs(path, mode | S_ISGID) 1620 # http://bugs.python.org/issue14992 1621 # Should not fail when the bit is already set. 1622 os.makedirs(path, mode, exist_ok=True) 1623 # remove the bit. 1624 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID) 1625 # May work even when the bit is not already set when demanded. 1626 os.makedirs(path, mode | S_ISGID, exist_ok=True) 1627 finally: 1628 os.umask(old_mask) 1629 1630 def test_exist_ok_existing_regular_file(self): 1631 base = os_helper.TESTFN 1632 path = os.path.join(os_helper.TESTFN, 'dir1') 1633 with open(path, 'w', encoding='utf-8') as f: 1634 f.write('abc') 1635 self.assertRaises(OSError, os.makedirs, path) 1636 self.assertRaises(OSError, os.makedirs, path, exist_ok=False) 1637 self.assertRaises(OSError, os.makedirs, path, exist_ok=True) 1638 os.remove(path) 1639 1640 def tearDown(self): 1641 path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3', 1642 'dir4', 'dir5', 'dir6') 1643 # If the tests failed, the bottom-most directory ('../dir6') 1644 # may not have been created, so we look for the outermost directory 1645 # that exists. 1646 while not os.path.exists(path) and path != os_helper.TESTFN: 1647 path = os.path.dirname(path) 1648 1649 os.removedirs(path) 1650 1651 1652@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown") 1653class ChownFileTests(unittest.TestCase): 1654 1655 @classmethod 1656 def setUpClass(cls): 1657 os.mkdir(os_helper.TESTFN) 1658 1659 def test_chown_uid_gid_arguments_must_be_index(self): 1660 stat = os.stat(os_helper.TESTFN) 1661 uid = stat.st_uid 1662 gid = stat.st_gid 1663 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): 1664 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid) 1665 self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value) 1666 self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid)) 1667 self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1)) 1668 1669 @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups') 1670 def test_chown_gid(self): 1671 groups = os.getgroups() 1672 if len(groups) < 2: 1673 self.skipTest("test needs at least 2 groups") 1674 1675 gid_1, gid_2 = groups[:2] 1676 uid = os.stat(os_helper.TESTFN).st_uid 1677 1678 os.chown(os_helper.TESTFN, uid, gid_1) 1679 gid = os.stat(os_helper.TESTFN).st_gid 1680 self.assertEqual(gid, gid_1) 1681 1682 os.chown(os_helper.TESTFN, uid, gid_2) 1683 gid = os.stat(os_helper.TESTFN).st_gid 1684 self.assertEqual(gid, gid_2) 1685 1686 @unittest.skipUnless(root_in_posix and len(all_users) > 1, 1687 "test needs root privilege and more than one user") 1688 def test_chown_with_root(self): 1689 uid_1, uid_2 = all_users[:2] 1690 gid = os.stat(os_helper.TESTFN).st_gid 1691 os.chown(os_helper.TESTFN, uid_1, gid) 1692 uid = os.stat(os_helper.TESTFN).st_uid 1693 self.assertEqual(uid, uid_1) 1694 os.chown(os_helper.TESTFN, uid_2, gid) 1695 uid = os.stat(os_helper.TESTFN).st_uid 1696 self.assertEqual(uid, uid_2) 1697 1698 @unittest.skipUnless(not root_in_posix and len(all_users) > 1, 1699 "test needs non-root account and more than one user") 1700 def test_chown_without_permission(self): 1701 uid_1, uid_2 = all_users[:2] 1702 gid = os.stat(os_helper.TESTFN).st_gid 1703 with self.assertRaises(PermissionError): 1704 os.chown(os_helper.TESTFN, uid_1, gid) 1705 os.chown(os_helper.TESTFN, uid_2, gid) 1706 1707 @classmethod 1708 def tearDownClass(cls): 1709 os.rmdir(os_helper.TESTFN) 1710 1711 1712class RemoveDirsTests(unittest.TestCase): 1713 def setUp(self): 1714 os.makedirs(os_helper.TESTFN) 1715 1716 def tearDown(self): 1717 os_helper.rmtree(os_helper.TESTFN) 1718 1719 def test_remove_all(self): 1720 dira = os.path.join(os_helper.TESTFN, 'dira') 1721 os.mkdir(dira) 1722 dirb = os.path.join(dira, 'dirb') 1723 os.mkdir(dirb) 1724 os.removedirs(dirb) 1725 self.assertFalse(os.path.exists(dirb)) 1726 self.assertFalse(os.path.exists(dira)) 1727 self.assertFalse(os.path.exists(os_helper.TESTFN)) 1728 1729 def test_remove_partial(self): 1730 dira = os.path.join(os_helper.TESTFN, 'dira') 1731 os.mkdir(dira) 1732 dirb = os.path.join(dira, 'dirb') 1733 os.mkdir(dirb) 1734 create_file(os.path.join(dira, 'file.txt')) 1735 os.removedirs(dirb) 1736 self.assertFalse(os.path.exists(dirb)) 1737 self.assertTrue(os.path.exists(dira)) 1738 self.assertTrue(os.path.exists(os_helper.TESTFN)) 1739 1740 def test_remove_nothing(self): 1741 dira = os.path.join(os_helper.TESTFN, 'dira') 1742 os.mkdir(dira) 1743 dirb = os.path.join(dira, 'dirb') 1744 os.mkdir(dirb) 1745 create_file(os.path.join(dirb, 'file.txt')) 1746 with self.assertRaises(OSError): 1747 os.removedirs(dirb) 1748 self.assertTrue(os.path.exists(dirb)) 1749 self.assertTrue(os.path.exists(dira)) 1750 self.assertTrue(os.path.exists(os_helper.TESTFN)) 1751 1752 1753class DevNullTests(unittest.TestCase): 1754 def test_devnull(self): 1755 with open(os.devnull, 'wb', 0) as f: 1756 f.write(b'hello') 1757 f.close() 1758 with open(os.devnull, 'rb') as f: 1759 self.assertEqual(f.read(), b'') 1760 1761 1762class URandomTests(unittest.TestCase): 1763 def test_urandom_length(self): 1764 self.assertEqual(len(os.urandom(0)), 0) 1765 self.assertEqual(len(os.urandom(1)), 1) 1766 self.assertEqual(len(os.urandom(10)), 10) 1767 self.assertEqual(len(os.urandom(100)), 100) 1768 self.assertEqual(len(os.urandom(1000)), 1000) 1769 1770 def test_urandom_value(self): 1771 data1 = os.urandom(16) 1772 self.assertIsInstance(data1, bytes) 1773 data2 = os.urandom(16) 1774 self.assertNotEqual(data1, data2) 1775 1776 def get_urandom_subprocess(self, count): 1777 code = '\n'.join(( 1778 'import os, sys', 1779 'data = os.urandom(%s)' % count, 1780 'sys.stdout.buffer.write(data)', 1781 'sys.stdout.buffer.flush()')) 1782 out = assert_python_ok('-c', code) 1783 stdout = out[1] 1784 self.assertEqual(len(stdout), count) 1785 return stdout 1786 1787 def test_urandom_subprocess(self): 1788 data1 = self.get_urandom_subprocess(16) 1789 data2 = self.get_urandom_subprocess(16) 1790 self.assertNotEqual(data1, data2) 1791 1792 1793@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()') 1794class GetRandomTests(unittest.TestCase): 1795 @classmethod 1796 def setUpClass(cls): 1797 try: 1798 os.getrandom(1) 1799 except OSError as exc: 1800 if exc.errno == errno.ENOSYS: 1801 # Python compiled on a more recent Linux version 1802 # than the current Linux kernel 1803 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS") 1804 else: 1805 raise 1806 1807 def test_getrandom_type(self): 1808 data = os.getrandom(16) 1809 self.assertIsInstance(data, bytes) 1810 self.assertEqual(len(data), 16) 1811 1812 def test_getrandom0(self): 1813 empty = os.getrandom(0) 1814 self.assertEqual(empty, b'') 1815 1816 def test_getrandom_random(self): 1817 self.assertTrue(hasattr(os, 'GRND_RANDOM')) 1818 1819 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare 1820 # resource /dev/random 1821 1822 def test_getrandom_nonblock(self): 1823 # The call must not fail. Check also that the flag exists 1824 try: 1825 os.getrandom(1, os.GRND_NONBLOCK) 1826 except BlockingIOError: 1827 # System urandom is not initialized yet 1828 pass 1829 1830 def test_getrandom_value(self): 1831 data1 = os.getrandom(16) 1832 data2 = os.getrandom(16) 1833 self.assertNotEqual(data1, data2) 1834 1835 1836# os.urandom() doesn't use a file descriptor when it is implemented with the 1837# getentropy() function, the getrandom() function or the getrandom() syscall 1838OS_URANDOM_DONT_USE_FD = ( 1839 sysconfig.get_config_var('HAVE_GETENTROPY') == 1 1840 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1 1841 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1) 1842 1843@unittest.skipIf(OS_URANDOM_DONT_USE_FD , 1844 "os.random() does not use a file descriptor") 1845@unittest.skipIf(sys.platform == "vxworks", 1846 "VxWorks can't set RLIMIT_NOFILE to 1") 1847class URandomFDTests(unittest.TestCase): 1848 @unittest.skipUnless(resource, "test requires the resource module") 1849 def test_urandom_failure(self): 1850 # Check urandom() failing when it is not able to open /dev/random. 1851 # We spawn a new process to make the test more robust (if getrlimit() 1852 # failed to restore the file descriptor limit after this, the whole 1853 # test suite would crash; this actually happened on the OS X Tiger 1854 # buildbot). 1855 code = """if 1: 1856 import errno 1857 import os 1858 import resource 1859 1860 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE) 1861 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit)) 1862 try: 1863 os.urandom(16) 1864 except OSError as e: 1865 assert e.errno == errno.EMFILE, e.errno 1866 else: 1867 raise AssertionError("OSError not raised") 1868 """ 1869 assert_python_ok('-c', code) 1870 1871 def test_urandom_fd_closed(self): 1872 # Issue #21207: urandom() should reopen its fd to /dev/urandom if 1873 # closed. 1874 code = """if 1: 1875 import os 1876 import sys 1877 import test.support 1878 os.urandom(4) 1879 with test.support.SuppressCrashReport(): 1880 os.closerange(3, 256) 1881 sys.stdout.buffer.write(os.urandom(4)) 1882 """ 1883 rc, out, err = assert_python_ok('-Sc', code) 1884 1885 def test_urandom_fd_reopened(self): 1886 # Issue #21207: urandom() should detect its fd to /dev/urandom 1887 # changed to something else, and reopen it. 1888 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1889 create_file(os_helper.TESTFN, b"x" * 256) 1890 1891 code = """if 1: 1892 import os 1893 import sys 1894 import test.support 1895 os.urandom(4) 1896 with test.support.SuppressCrashReport(): 1897 for fd in range(3, 256): 1898 try: 1899 os.close(fd) 1900 except OSError: 1901 pass 1902 else: 1903 # Found the urandom fd (XXX hopefully) 1904 break 1905 os.closerange(3, 256) 1906 with open({TESTFN!r}, 'rb') as f: 1907 new_fd = f.fileno() 1908 # Issue #26935: posix allows new_fd and fd to be equal but 1909 # some libc implementations have dup2 return an error in this 1910 # case. 1911 if new_fd != fd: 1912 os.dup2(new_fd, fd) 1913 sys.stdout.buffer.write(os.urandom(4)) 1914 sys.stdout.buffer.write(os.urandom(4)) 1915 """.format(TESTFN=os_helper.TESTFN) 1916 rc, out, err = assert_python_ok('-Sc', code) 1917 self.assertEqual(len(out), 8) 1918 self.assertNotEqual(out[0:4], out[4:8]) 1919 rc, out2, err2 = assert_python_ok('-Sc', code) 1920 self.assertEqual(len(out2), 8) 1921 self.assertNotEqual(out2, out) 1922 1923 1924@contextlib.contextmanager 1925def _execvpe_mockup(defpath=None): 1926 """ 1927 Stubs out execv and execve functions when used as context manager. 1928 Records exec calls. The mock execv and execve functions always raise an 1929 exception as they would normally never return. 1930 """ 1931 # A list of tuples containing (function name, first arg, args) 1932 # of calls to execv or execve that have been made. 1933 calls = [] 1934 1935 def mock_execv(name, *args): 1936 calls.append(('execv', name, args)) 1937 raise RuntimeError("execv called") 1938 1939 def mock_execve(name, *args): 1940 calls.append(('execve', name, args)) 1941 raise OSError(errno.ENOTDIR, "execve called") 1942 1943 try: 1944 orig_execv = os.execv 1945 orig_execve = os.execve 1946 orig_defpath = os.defpath 1947 os.execv = mock_execv 1948 os.execve = mock_execve 1949 if defpath is not None: 1950 os.defpath = defpath 1951 yield calls 1952 finally: 1953 os.execv = orig_execv 1954 os.execve = orig_execve 1955 os.defpath = orig_defpath 1956 1957@unittest.skipUnless(hasattr(os, 'execv'), 1958 "need os.execv()") 1959class ExecTests(unittest.TestCase): 1960 @unittest.skipIf(USING_LINUXTHREADS, 1961 "avoid triggering a linuxthreads bug: see issue #4970") 1962 def test_execvpe_with_bad_program(self): 1963 self.assertRaises(OSError, os.execvpe, 'no such app-', 1964 ['no such app-'], None) 1965 1966 def test_execv_with_bad_arglist(self): 1967 self.assertRaises(ValueError, os.execv, 'notepad', ()) 1968 self.assertRaises(ValueError, os.execv, 'notepad', []) 1969 self.assertRaises(ValueError, os.execv, 'notepad', ('',)) 1970 self.assertRaises(ValueError, os.execv, 'notepad', ['']) 1971 1972 def test_execvpe_with_bad_arglist(self): 1973 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None) 1974 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {}) 1975 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {}) 1976 1977 @unittest.skipUnless(hasattr(os, '_execvpe'), 1978 "No internal os._execvpe function to test.") 1979 def _test_internal_execvpe(self, test_type): 1980 program_path = os.sep + 'absolutepath' 1981 if test_type is bytes: 1982 program = b'executable' 1983 fullpath = os.path.join(os.fsencode(program_path), program) 1984 native_fullpath = fullpath 1985 arguments = [b'progname', 'arg1', 'arg2'] 1986 else: 1987 program = 'executable' 1988 arguments = ['progname', 'arg1', 'arg2'] 1989 fullpath = os.path.join(program_path, program) 1990 if os.name != "nt": 1991 native_fullpath = os.fsencode(fullpath) 1992 else: 1993 native_fullpath = fullpath 1994 env = {'spam': 'beans'} 1995 1996 # test os._execvpe() with an absolute path 1997 with _execvpe_mockup() as calls: 1998 self.assertRaises(RuntimeError, 1999 os._execvpe, fullpath, arguments) 2000 self.assertEqual(len(calls), 1) 2001 self.assertEqual(calls[0], ('execv', fullpath, (arguments,))) 2002 2003 # test os._execvpe() with a relative path: 2004 # os.get_exec_path() returns defpath 2005 with _execvpe_mockup(defpath=program_path) as calls: 2006 self.assertRaises(OSError, 2007 os._execvpe, program, arguments, env=env) 2008 self.assertEqual(len(calls), 1) 2009 self.assertSequenceEqual(calls[0], 2010 ('execve', native_fullpath, (arguments, env))) 2011 2012 # test os._execvpe() with a relative path: 2013 # os.get_exec_path() reads the 'PATH' variable 2014 with _execvpe_mockup() as calls: 2015 env_path = env.copy() 2016 if test_type is bytes: 2017 env_path[b'PATH'] = program_path 2018 else: 2019 env_path['PATH'] = program_path 2020 self.assertRaises(OSError, 2021 os._execvpe, program, arguments, env=env_path) 2022 self.assertEqual(len(calls), 1) 2023 self.assertSequenceEqual(calls[0], 2024 ('execve', native_fullpath, (arguments, env_path))) 2025 2026 def test_internal_execvpe_str(self): 2027 self._test_internal_execvpe(str) 2028 if os.name != "nt": 2029 self._test_internal_execvpe(bytes) 2030 2031 def test_execve_invalid_env(self): 2032 args = [sys.executable, '-c', 'pass'] 2033 2034 # null character in the environment variable name 2035 newenv = os.environ.copy() 2036 newenv["FRUIT\0VEGETABLE"] = "cabbage" 2037 with self.assertRaises(ValueError): 2038 os.execve(args[0], args, newenv) 2039 2040 # null character in the environment variable value 2041 newenv = os.environ.copy() 2042 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 2043 with self.assertRaises(ValueError): 2044 os.execve(args[0], args, newenv) 2045 2046 # equal character in the environment variable name 2047 newenv = os.environ.copy() 2048 newenv["FRUIT=ORANGE"] = "lemon" 2049 with self.assertRaises(ValueError): 2050 os.execve(args[0], args, newenv) 2051 2052 @unittest.skipUnless(sys.platform == "win32", "Win32-specific test") 2053 def test_execve_with_empty_path(self): 2054 # bpo-32890: Check GetLastError() misuse 2055 try: 2056 os.execve('', ['arg'], {}) 2057 except OSError as e: 2058 self.assertTrue(e.winerror is None or e.winerror != 0) 2059 else: 2060 self.fail('No OSError raised') 2061 2062 2063@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2064class Win32ErrorTests(unittest.TestCase): 2065 def setUp(self): 2066 try: 2067 os.stat(os_helper.TESTFN) 2068 except FileNotFoundError: 2069 exists = False 2070 except OSError as exc: 2071 exists = True 2072 self.fail("file %s must not exist; os.stat failed with %s" 2073 % (os_helper.TESTFN, exc)) 2074 else: 2075 self.fail("file %s must not exist" % os_helper.TESTFN) 2076 2077 def test_rename(self): 2078 self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak") 2079 2080 def test_remove(self): 2081 self.assertRaises(OSError, os.remove, os_helper.TESTFN) 2082 2083 def test_chdir(self): 2084 self.assertRaises(OSError, os.chdir, os_helper.TESTFN) 2085 2086 def test_mkdir(self): 2087 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 2088 2089 with open(os_helper.TESTFN, "x") as f: 2090 self.assertRaises(OSError, os.mkdir, os_helper.TESTFN) 2091 2092 def test_utime(self): 2093 self.assertRaises(OSError, os.utime, os_helper.TESTFN, None) 2094 2095 def test_chmod(self): 2096 self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0) 2097 2098 2099class TestInvalidFD(unittest.TestCase): 2100 singles = ["fchdir", "dup", "fdatasync", "fstat", 2101 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] 2102 #singles.append("close") 2103 #We omit close because it doesn't raise an exception on some platforms 2104 def get_single(f): 2105 def helper(self): 2106 if hasattr(os, f): 2107 self.check(getattr(os, f)) 2108 return helper 2109 for f in singles: 2110 locals()["test_"+f] = get_single(f) 2111 2112 def check(self, f, *args, **kwargs): 2113 try: 2114 f(os_helper.make_bad_fd(), *args, **kwargs) 2115 except OSError as e: 2116 self.assertEqual(e.errno, errno.EBADF) 2117 else: 2118 self.fail("%r didn't raise an OSError with a bad file descriptor" 2119 % f) 2120 2121 def test_fdopen(self): 2122 self.check(os.fdopen, encoding="utf-8") 2123 2124 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') 2125 def test_isatty(self): 2126 self.assertEqual(os.isatty(os_helper.make_bad_fd()), False) 2127 2128 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') 2129 def test_closerange(self): 2130 fd = os_helper.make_bad_fd() 2131 # Make sure none of the descriptors we are about to close are 2132 # currently valid (issue 6542). 2133 for i in range(10): 2134 try: os.fstat(fd+i) 2135 except OSError: 2136 pass 2137 else: 2138 break 2139 if i < 2: 2140 raise unittest.SkipTest( 2141 "Unable to acquire a range of invalid file descriptors") 2142 self.assertEqual(os.closerange(fd, fd + i-1), None) 2143 2144 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()') 2145 def test_dup2(self): 2146 self.check(os.dup2, 20) 2147 2148 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()') 2149 def test_fchmod(self): 2150 self.check(os.fchmod, 0) 2151 2152 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()') 2153 def test_fchown(self): 2154 self.check(os.fchown, -1, -1) 2155 2156 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()') 2157 def test_fpathconf(self): 2158 self.check(os.pathconf, "PC_NAME_MAX") 2159 self.check(os.fpathconf, "PC_NAME_MAX") 2160 2161 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()') 2162 def test_ftruncate(self): 2163 self.check(os.truncate, 0) 2164 self.check(os.ftruncate, 0) 2165 2166 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()') 2167 def test_lseek(self): 2168 self.check(os.lseek, 0, 0) 2169 2170 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()') 2171 def test_read(self): 2172 self.check(os.read, 1) 2173 2174 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()') 2175 def test_readv(self): 2176 buf = bytearray(10) 2177 self.check(os.readv, [buf]) 2178 2179 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()') 2180 def test_tcsetpgrpt(self): 2181 self.check(os.tcsetpgrp, 0) 2182 2183 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()') 2184 def test_write(self): 2185 self.check(os.write, b" ") 2186 2187 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()') 2188 def test_writev(self): 2189 self.check(os.writev, [b'abc']) 2190 2191 def test_inheritable(self): 2192 self.check(os.get_inheritable) 2193 self.check(os.set_inheritable, True) 2194 2195 @unittest.skipUnless(hasattr(os, 'get_blocking'), 2196 'needs os.get_blocking() and os.set_blocking()') 2197 def test_blocking(self): 2198 self.check(os.get_blocking) 2199 self.check(os.set_blocking, True) 2200 2201 2202class LinkTests(unittest.TestCase): 2203 def setUp(self): 2204 self.file1 = os_helper.TESTFN 2205 self.file2 = os.path.join(os_helper.TESTFN + "2") 2206 2207 def tearDown(self): 2208 for file in (self.file1, self.file2): 2209 if os.path.exists(file): 2210 os.unlink(file) 2211 2212 def _test_link(self, file1, file2): 2213 create_file(file1) 2214 2215 try: 2216 os.link(file1, file2) 2217 except PermissionError as e: 2218 self.skipTest('os.link(): %s' % e) 2219 with open(file1, "rb") as f1, open(file2, "rb") as f2: 2220 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) 2221 2222 def test_link(self): 2223 self._test_link(self.file1, self.file2) 2224 2225 def test_link_bytes(self): 2226 self._test_link(bytes(self.file1, sys.getfilesystemencoding()), 2227 bytes(self.file2, sys.getfilesystemencoding())) 2228 2229 def test_unicode_name(self): 2230 try: 2231 os.fsencode("\xf1") 2232 except UnicodeError: 2233 raise unittest.SkipTest("Unable to encode for this platform.") 2234 2235 self.file1 += "\xf1" 2236 self.file2 = self.file1 + "2" 2237 self._test_link(self.file1, self.file2) 2238 2239@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2240class PosixUidGidTests(unittest.TestCase): 2241 # uid_t and gid_t are 32-bit unsigned integers on Linux 2242 UID_OVERFLOW = (1 << 32) 2243 GID_OVERFLOW = (1 << 32) 2244 2245 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') 2246 def test_setuid(self): 2247 if os.getuid() != 0: 2248 self.assertRaises(OSError, os.setuid, 0) 2249 self.assertRaises(TypeError, os.setuid, 'not an int') 2250 self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW) 2251 2252 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') 2253 def test_setgid(self): 2254 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2255 self.assertRaises(OSError, os.setgid, 0) 2256 self.assertRaises(TypeError, os.setgid, 'not an int') 2257 self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW) 2258 2259 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') 2260 def test_seteuid(self): 2261 if os.getuid() != 0: 2262 self.assertRaises(OSError, os.seteuid, 0) 2263 self.assertRaises(TypeError, os.setegid, 'not an int') 2264 self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW) 2265 2266 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') 2267 def test_setegid(self): 2268 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2269 self.assertRaises(OSError, os.setegid, 0) 2270 self.assertRaises(TypeError, os.setegid, 'not an int') 2271 self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW) 2272 2273 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2274 def test_setreuid(self): 2275 if os.getuid() != 0: 2276 self.assertRaises(OSError, os.setreuid, 0, 0) 2277 self.assertRaises(TypeError, os.setreuid, 'not an int', 0) 2278 self.assertRaises(TypeError, os.setreuid, 0, 'not an int') 2279 self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0) 2280 self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW) 2281 2282 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') 2283 def test_setreuid_neg1(self): 2284 # Needs to accept -1. We run this in a subprocess to avoid 2285 # altering the test runner's process state (issue8045). 2286 subprocess.check_call([ 2287 sys.executable, '-c', 2288 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) 2289 2290 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2291 def test_setregid(self): 2292 if os.getuid() != 0 and not HAVE_WHEEL_GROUP: 2293 self.assertRaises(OSError, os.setregid, 0, 0) 2294 self.assertRaises(TypeError, os.setregid, 'not an int', 0) 2295 self.assertRaises(TypeError, os.setregid, 0, 'not an int') 2296 self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0) 2297 self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW) 2298 2299 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') 2300 def test_setregid_neg1(self): 2301 # Needs to accept -1. We run this in a subprocess to avoid 2302 # altering the test runner's process state (issue8045). 2303 subprocess.check_call([ 2304 sys.executable, '-c', 2305 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) 2306 2307@unittest.skipIf(sys.platform == "win32", "Posix specific tests") 2308class Pep383Tests(unittest.TestCase): 2309 def setUp(self): 2310 if os_helper.TESTFN_UNENCODABLE: 2311 self.dir = os_helper.TESTFN_UNENCODABLE 2312 elif os_helper.TESTFN_NONASCII: 2313 self.dir = os_helper.TESTFN_NONASCII 2314 else: 2315 self.dir = os_helper.TESTFN 2316 self.bdir = os.fsencode(self.dir) 2317 2318 bytesfn = [] 2319 def add_filename(fn): 2320 try: 2321 fn = os.fsencode(fn) 2322 except UnicodeEncodeError: 2323 return 2324 bytesfn.append(fn) 2325 add_filename(os_helper.TESTFN_UNICODE) 2326 if os_helper.TESTFN_UNENCODABLE: 2327 add_filename(os_helper.TESTFN_UNENCODABLE) 2328 if os_helper.TESTFN_NONASCII: 2329 add_filename(os_helper.TESTFN_NONASCII) 2330 if not bytesfn: 2331 self.skipTest("couldn't create any non-ascii filename") 2332 2333 self.unicodefn = set() 2334 os.mkdir(self.dir) 2335 try: 2336 for fn in bytesfn: 2337 os_helper.create_empty_file(os.path.join(self.bdir, fn)) 2338 fn = os.fsdecode(fn) 2339 if fn in self.unicodefn: 2340 raise ValueError("duplicate filename") 2341 self.unicodefn.add(fn) 2342 except: 2343 shutil.rmtree(self.dir) 2344 raise 2345 2346 def tearDown(self): 2347 shutil.rmtree(self.dir) 2348 2349 def test_listdir(self): 2350 expected = self.unicodefn 2351 found = set(os.listdir(self.dir)) 2352 self.assertEqual(found, expected) 2353 # test listdir without arguments 2354 current_directory = os.getcwd() 2355 try: 2356 os.chdir(os.sep) 2357 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) 2358 finally: 2359 os.chdir(current_directory) 2360 2361 def test_open(self): 2362 for fn in self.unicodefn: 2363 f = open(os.path.join(self.dir, fn), 'rb') 2364 f.close() 2365 2366 @unittest.skipUnless(hasattr(os, 'statvfs'), 2367 "need os.statvfs()") 2368 def test_statvfs(self): 2369 # issue #9645 2370 for fn in self.unicodefn: 2371 # should not fail with file not found error 2372 fullname = os.path.join(self.dir, fn) 2373 os.statvfs(fullname) 2374 2375 def test_stat(self): 2376 for fn in self.unicodefn: 2377 os.stat(os.path.join(self.dir, fn)) 2378 2379@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2380class Win32KillTests(unittest.TestCase): 2381 def _kill(self, sig): 2382 # Start sys.executable as a subprocess and communicate from the 2383 # subprocess to the parent that the interpreter is ready. When it 2384 # becomes ready, send *sig* via os.kill to the subprocess and check 2385 # that the return code is equal to *sig*. 2386 import ctypes 2387 from ctypes import wintypes 2388 import msvcrt 2389 2390 # Since we can't access the contents of the process' stdout until the 2391 # process has exited, use PeekNamedPipe to see what's inside stdout 2392 # without waiting. This is done so we can tell that the interpreter 2393 # is started and running at a point where it could handle a signal. 2394 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe 2395 PeekNamedPipe.restype = wintypes.BOOL 2396 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle 2397 ctypes.POINTER(ctypes.c_char), # stdout buf 2398 wintypes.DWORD, # Buffer size 2399 ctypes.POINTER(wintypes.DWORD), # bytes read 2400 ctypes.POINTER(wintypes.DWORD), # bytes avail 2401 ctypes.POINTER(wintypes.DWORD)) # bytes left 2402 msg = "running" 2403 proc = subprocess.Popen([sys.executable, "-c", 2404 "import sys;" 2405 "sys.stdout.write('{}');" 2406 "sys.stdout.flush();" 2407 "input()".format(msg)], 2408 stdout=subprocess.PIPE, 2409 stderr=subprocess.PIPE, 2410 stdin=subprocess.PIPE) 2411 self.addCleanup(proc.stdout.close) 2412 self.addCleanup(proc.stderr.close) 2413 self.addCleanup(proc.stdin.close) 2414 2415 count, max = 0, 100 2416 while count < max and proc.poll() is None: 2417 # Create a string buffer to store the result of stdout from the pipe 2418 buf = ctypes.create_string_buffer(len(msg)) 2419 # Obtain the text currently in proc.stdout 2420 # Bytes read/avail/left are left as NULL and unused 2421 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), 2422 buf, ctypes.sizeof(buf), None, None, None) 2423 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") 2424 if buf.value: 2425 self.assertEqual(msg, buf.value.decode()) 2426 break 2427 time.sleep(0.1) 2428 count += 1 2429 else: 2430 self.fail("Did not receive communication from the subprocess") 2431 2432 os.kill(proc.pid, sig) 2433 self.assertEqual(proc.wait(), sig) 2434 2435 def test_kill_sigterm(self): 2436 # SIGTERM doesn't mean anything special, but make sure it works 2437 self._kill(signal.SIGTERM) 2438 2439 def test_kill_int(self): 2440 # os.kill on Windows can take an int which gets set as the exit code 2441 self._kill(100) 2442 2443 def _kill_with_event(self, event, name): 2444 tagname = "test_os_%s" % uuid.uuid1() 2445 m = mmap.mmap(-1, 1, tagname) 2446 m[0] = 0 2447 # Run a script which has console control handling enabled. 2448 proc = subprocess.Popen([sys.executable, 2449 os.path.join(os.path.dirname(__file__), 2450 "win_console_handler.py"), tagname], 2451 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) 2452 # Let the interpreter startup before we send signals. See #3137. 2453 count, max = 0, 100 2454 while count < max and proc.poll() is None: 2455 if m[0] == 1: 2456 break 2457 time.sleep(0.1) 2458 count += 1 2459 else: 2460 # Forcefully kill the process if we weren't able to signal it. 2461 os.kill(proc.pid, signal.SIGINT) 2462 self.fail("Subprocess didn't finish initialization") 2463 os.kill(proc.pid, event) 2464 # proc.send_signal(event) could also be done here. 2465 # Allow time for the signal to be passed and the process to exit. 2466 time.sleep(0.5) 2467 if not proc.poll(): 2468 # Forcefully kill the process if we weren't able to signal it. 2469 os.kill(proc.pid, signal.SIGINT) 2470 self.fail("subprocess did not stop on {}".format(name)) 2471 2472 @unittest.skip("subprocesses aren't inheriting Ctrl+C property") 2473 def test_CTRL_C_EVENT(self): 2474 from ctypes import wintypes 2475 import ctypes 2476 2477 # Make a NULL value by creating a pointer with no argument. 2478 NULL = ctypes.POINTER(ctypes.c_int)() 2479 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler 2480 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), 2481 wintypes.BOOL) 2482 SetConsoleCtrlHandler.restype = wintypes.BOOL 2483 2484 # Calling this with NULL and FALSE causes the calling process to 2485 # handle Ctrl+C, rather than ignore it. This property is inherited 2486 # by subprocesses. 2487 SetConsoleCtrlHandler(NULL, 0) 2488 2489 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") 2490 2491 def test_CTRL_BREAK_EVENT(self): 2492 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") 2493 2494 2495@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2496class Win32ListdirTests(unittest.TestCase): 2497 """Test listdir on Windows.""" 2498 2499 def setUp(self): 2500 self.created_paths = [] 2501 for i in range(2): 2502 dir_name = 'SUB%d' % i 2503 dir_path = os.path.join(os_helper.TESTFN, dir_name) 2504 file_name = 'FILE%d' % i 2505 file_path = os.path.join(os_helper.TESTFN, file_name) 2506 os.makedirs(dir_path) 2507 with open(file_path, 'w', encoding='utf-8') as f: 2508 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) 2509 self.created_paths.extend([dir_name, file_name]) 2510 self.created_paths.sort() 2511 2512 def tearDown(self): 2513 shutil.rmtree(os_helper.TESTFN) 2514 2515 def test_listdir_no_extended_path(self): 2516 """Test when the path is not an "extended" path.""" 2517 # unicode 2518 self.assertEqual( 2519 sorted(os.listdir(os_helper.TESTFN)), 2520 self.created_paths) 2521 2522 # bytes 2523 self.assertEqual( 2524 sorted(os.listdir(os.fsencode(os_helper.TESTFN))), 2525 [os.fsencode(path) for path in self.created_paths]) 2526 2527 def test_listdir_extended_path(self): 2528 """Test when the path starts with '\\\\?\\'.""" 2529 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath 2530 # unicode 2531 path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN) 2532 self.assertEqual( 2533 sorted(os.listdir(path)), 2534 self.created_paths) 2535 2536 # bytes 2537 path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN)) 2538 self.assertEqual( 2539 sorted(os.listdir(path)), 2540 [os.fsencode(path) for path in self.created_paths]) 2541 2542 2543@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()') 2544class ReadlinkTests(unittest.TestCase): 2545 filelink = 'readlinktest' 2546 filelink_target = os.path.abspath(__file__) 2547 filelinkb = os.fsencode(filelink) 2548 filelinkb_target = os.fsencode(filelink_target) 2549 2550 def assertPathEqual(self, left, right): 2551 left = os.path.normcase(left) 2552 right = os.path.normcase(right) 2553 if sys.platform == 'win32': 2554 # Bad practice to blindly strip the prefix as it may be required to 2555 # correctly refer to the file, but we're only comparing paths here. 2556 has_prefix = lambda p: p.startswith( 2557 b'\\\\?\\' if isinstance(p, bytes) else '\\\\?\\') 2558 if has_prefix(left): 2559 left = left[4:] 2560 if has_prefix(right): 2561 right = right[4:] 2562 self.assertEqual(left, right) 2563 2564 def setUp(self): 2565 self.assertTrue(os.path.exists(self.filelink_target)) 2566 self.assertTrue(os.path.exists(self.filelinkb_target)) 2567 self.assertFalse(os.path.exists(self.filelink)) 2568 self.assertFalse(os.path.exists(self.filelinkb)) 2569 2570 def test_not_symlink(self): 2571 filelink_target = FakePath(self.filelink_target) 2572 self.assertRaises(OSError, os.readlink, self.filelink_target) 2573 self.assertRaises(OSError, os.readlink, filelink_target) 2574 2575 def test_missing_link(self): 2576 self.assertRaises(FileNotFoundError, os.readlink, 'missing-link') 2577 self.assertRaises(FileNotFoundError, os.readlink, 2578 FakePath('missing-link')) 2579 2580 @os_helper.skip_unless_symlink 2581 def test_pathlike(self): 2582 os.symlink(self.filelink_target, self.filelink) 2583 self.addCleanup(os_helper.unlink, self.filelink) 2584 filelink = FakePath(self.filelink) 2585 self.assertPathEqual(os.readlink(filelink), self.filelink_target) 2586 2587 @os_helper.skip_unless_symlink 2588 def test_pathlike_bytes(self): 2589 os.symlink(self.filelinkb_target, self.filelinkb) 2590 self.addCleanup(os_helper.unlink, self.filelinkb) 2591 path = os.readlink(FakePath(self.filelinkb)) 2592 self.assertPathEqual(path, self.filelinkb_target) 2593 self.assertIsInstance(path, bytes) 2594 2595 @os_helper.skip_unless_symlink 2596 def test_bytes(self): 2597 os.symlink(self.filelinkb_target, self.filelinkb) 2598 self.addCleanup(os_helper.unlink, self.filelinkb) 2599 path = os.readlink(self.filelinkb) 2600 self.assertPathEqual(path, self.filelinkb_target) 2601 self.assertIsInstance(path, bytes) 2602 2603 2604@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2605@os_helper.skip_unless_symlink 2606class Win32SymlinkTests(unittest.TestCase): 2607 filelink = 'filelinktest' 2608 filelink_target = os.path.abspath(__file__) 2609 dirlink = 'dirlinktest' 2610 dirlink_target = os.path.dirname(filelink_target) 2611 missing_link = 'missing link' 2612 2613 def setUp(self): 2614 assert os.path.exists(self.dirlink_target) 2615 assert os.path.exists(self.filelink_target) 2616 assert not os.path.exists(self.dirlink) 2617 assert not os.path.exists(self.filelink) 2618 assert not os.path.exists(self.missing_link) 2619 2620 def tearDown(self): 2621 if os.path.exists(self.filelink): 2622 os.remove(self.filelink) 2623 if os.path.exists(self.dirlink): 2624 os.rmdir(self.dirlink) 2625 if os.path.lexists(self.missing_link): 2626 os.remove(self.missing_link) 2627 2628 def test_directory_link(self): 2629 os.symlink(self.dirlink_target, self.dirlink) 2630 self.assertTrue(os.path.exists(self.dirlink)) 2631 self.assertTrue(os.path.isdir(self.dirlink)) 2632 self.assertTrue(os.path.islink(self.dirlink)) 2633 self.check_stat(self.dirlink, self.dirlink_target) 2634 2635 def test_file_link(self): 2636 os.symlink(self.filelink_target, self.filelink) 2637 self.assertTrue(os.path.exists(self.filelink)) 2638 self.assertTrue(os.path.isfile(self.filelink)) 2639 self.assertTrue(os.path.islink(self.filelink)) 2640 self.check_stat(self.filelink, self.filelink_target) 2641 2642 def _create_missing_dir_link(self): 2643 'Create a "directory" link to a non-existent target' 2644 linkname = self.missing_link 2645 if os.path.lexists(linkname): 2646 os.remove(linkname) 2647 target = r'c:\\target does not exist.29r3c740' 2648 assert not os.path.exists(target) 2649 target_is_dir = True 2650 os.symlink(target, linkname, target_is_dir) 2651 2652 def test_remove_directory_link_to_missing_target(self): 2653 self._create_missing_dir_link() 2654 # For compatibility with Unix, os.remove will check the 2655 # directory status and call RemoveDirectory if the symlink 2656 # was created with target_is_dir==True. 2657 os.remove(self.missing_link) 2658 2659 def test_isdir_on_directory_link_to_missing_target(self): 2660 self._create_missing_dir_link() 2661 self.assertFalse(os.path.isdir(self.missing_link)) 2662 2663 def test_rmdir_on_directory_link_to_missing_target(self): 2664 self._create_missing_dir_link() 2665 os.rmdir(self.missing_link) 2666 2667 def check_stat(self, link, target): 2668 self.assertEqual(os.stat(link), os.stat(target)) 2669 self.assertNotEqual(os.lstat(link), os.stat(link)) 2670 2671 bytes_link = os.fsencode(link) 2672 self.assertEqual(os.stat(bytes_link), os.stat(target)) 2673 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) 2674 2675 def test_12084(self): 2676 level1 = os.path.abspath(os_helper.TESTFN) 2677 level2 = os.path.join(level1, "level2") 2678 level3 = os.path.join(level2, "level3") 2679 self.addCleanup(os_helper.rmtree, level1) 2680 2681 os.mkdir(level1) 2682 os.mkdir(level2) 2683 os.mkdir(level3) 2684 2685 file1 = os.path.abspath(os.path.join(level1, "file1")) 2686 create_file(file1) 2687 2688 orig_dir = os.getcwd() 2689 try: 2690 os.chdir(level2) 2691 link = os.path.join(level2, "link") 2692 os.symlink(os.path.relpath(file1), "link") 2693 self.assertIn("link", os.listdir(os.getcwd())) 2694 2695 # Check os.stat calls from the same dir as the link 2696 self.assertEqual(os.stat(file1), os.stat("link")) 2697 2698 # Check os.stat calls from a dir below the link 2699 os.chdir(level1) 2700 self.assertEqual(os.stat(file1), 2701 os.stat(os.path.relpath(link))) 2702 2703 # Check os.stat calls from a dir above the link 2704 os.chdir(level3) 2705 self.assertEqual(os.stat(file1), 2706 os.stat(os.path.relpath(link))) 2707 finally: 2708 os.chdir(orig_dir) 2709 2710 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') 2711 and os.path.exists(r'C:\ProgramData'), 2712 'Test directories not found') 2713 def test_29248(self): 2714 # os.symlink() calls CreateSymbolicLink, which creates 2715 # the reparse data buffer with the print name stored 2716 # first, so the offset is always 0. CreateSymbolicLink 2717 # stores the "PrintName" DOS path (e.g. "C:\") first, 2718 # with an offset of 0, followed by the "SubstituteName" 2719 # NT path (e.g. "\??\C:\"). The "All Users" link, on 2720 # the other hand, seems to have been created manually 2721 # with an inverted order. 2722 target = os.readlink(r'C:\Users\All Users') 2723 self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) 2724 2725 def test_buffer_overflow(self): 2726 # Older versions would have a buffer overflow when detecting 2727 # whether a link source was a directory. This test ensures we 2728 # no longer crash, but does not otherwise validate the behavior 2729 segment = 'X' * 27 2730 path = os.path.join(*[segment] * 10) 2731 test_cases = [ 2732 # overflow with absolute src 2733 ('\\' + path, segment), 2734 # overflow dest with relative src 2735 (segment, path), 2736 # overflow when joining src 2737 (path[:180], path[:180]), 2738 ] 2739 for src, dest in test_cases: 2740 try: 2741 os.symlink(src, dest) 2742 except FileNotFoundError: 2743 pass 2744 else: 2745 try: 2746 os.remove(dest) 2747 except OSError: 2748 pass 2749 # Also test with bytes, since that is a separate code path. 2750 try: 2751 os.symlink(os.fsencode(src), os.fsencode(dest)) 2752 except FileNotFoundError: 2753 pass 2754 else: 2755 try: 2756 os.remove(dest) 2757 except OSError: 2758 pass 2759 2760 def test_appexeclink(self): 2761 root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') 2762 if not os.path.isdir(root): 2763 self.skipTest("test requires a WindowsApps directory") 2764 2765 aliases = [os.path.join(root, a) 2766 for a in fnmatch.filter(os.listdir(root), '*.exe')] 2767 2768 for alias in aliases: 2769 if support.verbose: 2770 print() 2771 print("Testing with", alias) 2772 st = os.lstat(alias) 2773 self.assertEqual(st, os.stat(alias)) 2774 self.assertFalse(stat.S_ISLNK(st.st_mode)) 2775 self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) 2776 # testing the first one we see is sufficient 2777 break 2778 else: 2779 self.skipTest("test requires an app execution alias") 2780 2781@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2782class Win32JunctionTests(unittest.TestCase): 2783 junction = 'junctiontest' 2784 junction_target = os.path.dirname(os.path.abspath(__file__)) 2785 2786 def setUp(self): 2787 assert os.path.exists(self.junction_target) 2788 assert not os.path.lexists(self.junction) 2789 2790 def tearDown(self): 2791 if os.path.lexists(self.junction): 2792 os.unlink(self.junction) 2793 2794 def test_create_junction(self): 2795 _winapi.CreateJunction(self.junction_target, self.junction) 2796 self.assertTrue(os.path.lexists(self.junction)) 2797 self.assertTrue(os.path.exists(self.junction)) 2798 self.assertTrue(os.path.isdir(self.junction)) 2799 self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) 2800 self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) 2801 2802 # bpo-37834: Junctions are not recognized as links. 2803 self.assertFalse(os.path.islink(self.junction)) 2804 self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), 2805 os.path.normcase(os.readlink(self.junction))) 2806 2807 def test_unlink_removes_junction(self): 2808 _winapi.CreateJunction(self.junction_target, self.junction) 2809 self.assertTrue(os.path.exists(self.junction)) 2810 self.assertTrue(os.path.lexists(self.junction)) 2811 2812 os.unlink(self.junction) 2813 self.assertFalse(os.path.exists(self.junction)) 2814 2815@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") 2816class Win32NtTests(unittest.TestCase): 2817 def test_getfinalpathname_handles(self): 2818 nt = import_helper.import_module('nt') 2819 ctypes = import_helper.import_module('ctypes') 2820 import ctypes.wintypes 2821 2822 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) 2823 kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE 2824 2825 kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL 2826 kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, 2827 ctypes.wintypes.LPDWORD) 2828 2829 # This is a pseudo-handle that doesn't need to be closed 2830 hproc = kernel.GetCurrentProcess() 2831 2832 handle_count = ctypes.wintypes.DWORD() 2833 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2834 self.assertEqual(1, ok) 2835 2836 before_count = handle_count.value 2837 2838 # The first two test the error path, __file__ tests the success path 2839 filenames = [ 2840 r'\\?\C:', 2841 r'\\?\NUL', 2842 r'\\?\CONIN', 2843 __file__, 2844 ] 2845 2846 for _ in range(10): 2847 for name in filenames: 2848 try: 2849 nt._getfinalpathname(name) 2850 except Exception: 2851 # Failure is expected 2852 pass 2853 try: 2854 os.stat(name) 2855 except Exception: 2856 pass 2857 2858 ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) 2859 self.assertEqual(1, ok) 2860 2861 handle_delta = handle_count.value - before_count 2862 2863 self.assertEqual(0, handle_delta) 2864 2865@os_helper.skip_unless_symlink 2866class NonLocalSymlinkTests(unittest.TestCase): 2867 2868 def setUp(self): 2869 r""" 2870 Create this structure: 2871 2872 base 2873 \___ some_dir 2874 """ 2875 os.makedirs('base/some_dir') 2876 2877 def tearDown(self): 2878 shutil.rmtree('base') 2879 2880 def test_directory_link_nonlocal(self): 2881 """ 2882 The symlink target should resolve relative to the link, not relative 2883 to the current directory. 2884 2885 Then, link base/some_link -> base/some_dir and ensure that some_link 2886 is resolved as a directory. 2887 2888 In issue13772, it was discovered that directory detection failed if 2889 the symlink target was not specified relative to the current 2890 directory, which was a defect in the implementation. 2891 """ 2892 src = os.path.join('base', 'some_link') 2893 os.symlink('some_dir', src) 2894 assert os.path.isdir(src) 2895 2896 2897class FSEncodingTests(unittest.TestCase): 2898 def test_nop(self): 2899 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff') 2900 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141') 2901 2902 def test_identity(self): 2903 # assert fsdecode(fsencode(x)) == x 2904 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'): 2905 try: 2906 bytesfn = os.fsencode(fn) 2907 except UnicodeEncodeError: 2908 continue 2909 self.assertEqual(os.fsdecode(bytesfn), fn) 2910 2911 2912 2913class DeviceEncodingTests(unittest.TestCase): 2914 2915 def test_bad_fd(self): 2916 # Return None when an fd doesn't actually exist. 2917 self.assertIsNone(os.device_encoding(123456)) 2918 2919 @unittest.skipUnless(os.isatty(0) and not win32_is_iot() and (sys.platform.startswith('win') or 2920 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))), 2921 'test requires a tty and either Windows or nl_langinfo(CODESET)') 2922 def test_device_encoding(self): 2923 encoding = os.device_encoding(0) 2924 self.assertIsNotNone(encoding) 2925 self.assertTrue(codecs.lookup(encoding)) 2926 2927 2928class PidTests(unittest.TestCase): 2929 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid") 2930 def test_getppid(self): 2931 p = subprocess.Popen([sys.executable, '-c', 2932 'import os; print(os.getppid())'], 2933 stdout=subprocess.PIPE) 2934 stdout, _ = p.communicate() 2935 # We are the parent of our subprocess 2936 self.assertEqual(int(stdout), os.getpid()) 2937 2938 def check_waitpid(self, code, exitcode, callback=None): 2939 if sys.platform == 'win32': 2940 # On Windows, os.spawnv() simply joins arguments with spaces: 2941 # arguments need to be quoted 2942 args = [f'"{sys.executable}"', '-c', f'"{code}"'] 2943 else: 2944 args = [sys.executable, '-c', code] 2945 pid = os.spawnv(os.P_NOWAIT, sys.executable, args) 2946 2947 if callback is not None: 2948 callback(pid) 2949 2950 # don't use support.wait_process() to test directly os.waitpid() 2951 # and os.waitstatus_to_exitcode() 2952 pid2, status = os.waitpid(pid, 0) 2953 self.assertEqual(os.waitstatus_to_exitcode(status), exitcode) 2954 self.assertEqual(pid2, pid) 2955 2956 def test_waitpid(self): 2957 self.check_waitpid(code='pass', exitcode=0) 2958 2959 def test_waitstatus_to_exitcode(self): 2960 exitcode = 23 2961 code = f'import sys; sys.exit({exitcode})' 2962 self.check_waitpid(code, exitcode=exitcode) 2963 2964 with self.assertRaises(TypeError): 2965 os.waitstatus_to_exitcode(0.0) 2966 2967 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 2968 def test_waitpid_windows(self): 2969 # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode() 2970 # with exit code larger than INT_MAX. 2971 STATUS_CONTROL_C_EXIT = 0xC000013A 2972 code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' 2973 self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) 2974 2975 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 2976 def test_waitstatus_to_exitcode_windows(self): 2977 max_exitcode = 2 ** 32 - 1 2978 for exitcode in (0, 1, 5, max_exitcode): 2979 self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8), 2980 exitcode) 2981 2982 # invalid values 2983 with self.assertRaises(ValueError): 2984 os.waitstatus_to_exitcode((max_exitcode + 1) << 8) 2985 with self.assertRaises(OverflowError): 2986 os.waitstatus_to_exitcode(-1) 2987 2988 # Skip the test on Windows 2989 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL') 2990 def test_waitstatus_to_exitcode_kill(self): 2991 code = f'import time; time.sleep({support.LONG_TIMEOUT})' 2992 signum = signal.SIGKILL 2993 2994 def kill_process(pid): 2995 os.kill(pid, signum) 2996 2997 self.check_waitpid(code, exitcode=-signum, callback=kill_process) 2998 2999 3000class SpawnTests(unittest.TestCase): 3001 def create_args(self, *, with_env=False, use_bytes=False): 3002 self.exitcode = 17 3003 3004 filename = os_helper.TESTFN 3005 self.addCleanup(os_helper.unlink, filename) 3006 3007 if not with_env: 3008 code = 'import sys; sys.exit(%s)' % self.exitcode 3009 else: 3010 self.env = dict(os.environ) 3011 # create an unique key 3012 self.key = str(uuid.uuid4()) 3013 self.env[self.key] = self.key 3014 # read the variable from os.environ to check that it exists 3015 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' 3016 % (self.key, self.exitcode)) 3017 3018 with open(filename, "w", encoding="utf-8") as fp: 3019 fp.write(code) 3020 3021 args = [sys.executable, filename] 3022 if use_bytes: 3023 args = [os.fsencode(a) for a in args] 3024 self.env = {os.fsencode(k): os.fsencode(v) 3025 for k, v in self.env.items()} 3026 3027 return args 3028 3029 @requires_os_func('spawnl') 3030 def test_spawnl(self): 3031 args = self.create_args() 3032 exitcode = os.spawnl(os.P_WAIT, args[0], *args) 3033 self.assertEqual(exitcode, self.exitcode) 3034 3035 @requires_os_func('spawnle') 3036 def test_spawnle(self): 3037 args = self.create_args(with_env=True) 3038 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env) 3039 self.assertEqual(exitcode, self.exitcode) 3040 3041 @requires_os_func('spawnlp') 3042 def test_spawnlp(self): 3043 args = self.create_args() 3044 exitcode = os.spawnlp(os.P_WAIT, args[0], *args) 3045 self.assertEqual(exitcode, self.exitcode) 3046 3047 @requires_os_func('spawnlpe') 3048 def test_spawnlpe(self): 3049 args = self.create_args(with_env=True) 3050 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env) 3051 self.assertEqual(exitcode, self.exitcode) 3052 3053 @requires_os_func('spawnv') 3054 def test_spawnv(self): 3055 args = self.create_args() 3056 exitcode = os.spawnv(os.P_WAIT, args[0], args) 3057 self.assertEqual(exitcode, self.exitcode) 3058 3059 # Test for PyUnicode_FSConverter() 3060 exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args) 3061 self.assertEqual(exitcode, self.exitcode) 3062 3063 @requires_os_func('spawnve') 3064 def test_spawnve(self): 3065 args = self.create_args(with_env=True) 3066 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 3067 self.assertEqual(exitcode, self.exitcode) 3068 3069 @requires_os_func('spawnvp') 3070 def test_spawnvp(self): 3071 args = self.create_args() 3072 exitcode = os.spawnvp(os.P_WAIT, args[0], args) 3073 self.assertEqual(exitcode, self.exitcode) 3074 3075 @requires_os_func('spawnvpe') 3076 def test_spawnvpe(self): 3077 args = self.create_args(with_env=True) 3078 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env) 3079 self.assertEqual(exitcode, self.exitcode) 3080 3081 @requires_os_func('spawnv') 3082 def test_nowait(self): 3083 args = self.create_args() 3084 pid = os.spawnv(os.P_NOWAIT, args[0], args) 3085 support.wait_process(pid, exitcode=self.exitcode) 3086 3087 @requires_os_func('spawnve') 3088 def test_spawnve_bytes(self): 3089 # Test bytes handling in parse_arglist and parse_envlist (#28114) 3090 args = self.create_args(with_env=True, use_bytes=True) 3091 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env) 3092 self.assertEqual(exitcode, self.exitcode) 3093 3094 @requires_os_func('spawnl') 3095 def test_spawnl_noargs(self): 3096 args = self.create_args() 3097 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0]) 3098 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '') 3099 3100 @requires_os_func('spawnle') 3101 def test_spawnle_noargs(self): 3102 args = self.create_args() 3103 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {}) 3104 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {}) 3105 3106 @requires_os_func('spawnv') 3107 def test_spawnv_noargs(self): 3108 args = self.create_args() 3109 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ()) 3110 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], []) 3111 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',)) 3112 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ['']) 3113 3114 @requires_os_func('spawnve') 3115 def test_spawnve_noargs(self): 3116 args = self.create_args() 3117 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {}) 3118 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {}) 3119 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {}) 3120 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {}) 3121 3122 def _test_invalid_env(self, spawn): 3123 args = [sys.executable, '-c', 'pass'] 3124 3125 # null character in the environment variable name 3126 newenv = os.environ.copy() 3127 newenv["FRUIT\0VEGETABLE"] = "cabbage" 3128 try: 3129 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 3130 except ValueError: 3131 pass 3132 else: 3133 self.assertEqual(exitcode, 127) 3134 3135 # null character in the environment variable value 3136 newenv = os.environ.copy() 3137 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 3138 try: 3139 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 3140 except ValueError: 3141 pass 3142 else: 3143 self.assertEqual(exitcode, 127) 3144 3145 # equal character in the environment variable name 3146 newenv = os.environ.copy() 3147 newenv["FRUIT=ORANGE"] = "lemon" 3148 try: 3149 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 3150 except ValueError: 3151 pass 3152 else: 3153 self.assertEqual(exitcode, 127) 3154 3155 # equal character in the environment variable value 3156 filename = os_helper.TESTFN 3157 self.addCleanup(os_helper.unlink, filename) 3158 with open(filename, "w", encoding="utf-8") as fp: 3159 fp.write('import sys, os\n' 3160 'if os.getenv("FRUIT") != "orange=lemon":\n' 3161 ' raise AssertionError') 3162 args = [sys.executable, filename] 3163 newenv = os.environ.copy() 3164 newenv["FRUIT"] = "orange=lemon" 3165 exitcode = spawn(os.P_WAIT, args[0], args, newenv) 3166 self.assertEqual(exitcode, 0) 3167 3168 @requires_os_func('spawnve') 3169 def test_spawnve_invalid_env(self): 3170 self._test_invalid_env(os.spawnve) 3171 3172 @requires_os_func('spawnvpe') 3173 def test_spawnvpe_invalid_env(self): 3174 self._test_invalid_env(os.spawnvpe) 3175 3176 3177# The introduction of this TestCase caused at least two different errors on 3178# *nix buildbots. Temporarily skip this to let the buildbots move along. 3179@unittest.skip("Skip due to platform/environment differences on *NIX buildbots") 3180@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin") 3181class LoginTests(unittest.TestCase): 3182 def test_getlogin(self): 3183 user_name = os.getlogin() 3184 self.assertNotEqual(len(user_name), 0) 3185 3186 3187@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'), 3188 "needs os.getpriority and os.setpriority") 3189class ProgramPriorityTests(unittest.TestCase): 3190 """Tests for os.getpriority() and os.setpriority().""" 3191 3192 def test_set_get_priority(self): 3193 3194 base = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3195 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1) 3196 try: 3197 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid()) 3198 if base >= 19 and new_prio <= 19: 3199 raise unittest.SkipTest("unable to reliably test setpriority " 3200 "at current nice level of %s" % base) 3201 else: 3202 self.assertEqual(new_prio, base + 1) 3203 finally: 3204 try: 3205 os.setpriority(os.PRIO_PROCESS, os.getpid(), base) 3206 except OSError as err: 3207 if err.errno != errno.EACCES: 3208 raise 3209 3210 3211class SendfileTestServer(asyncore.dispatcher, threading.Thread): 3212 3213 class Handler(asynchat.async_chat): 3214 3215 def __init__(self, conn): 3216 asynchat.async_chat.__init__(self, conn) 3217 self.in_buffer = [] 3218 self.accumulate = True 3219 self.closed = False 3220 self.push(b"220 ready\r\n") 3221 3222 def handle_read(self): 3223 data = self.recv(4096) 3224 if self.accumulate: 3225 self.in_buffer.append(data) 3226 3227 def get_data(self): 3228 return b''.join(self.in_buffer) 3229 3230 def handle_close(self): 3231 self.close() 3232 self.closed = True 3233 3234 def handle_error(self): 3235 raise 3236 3237 def __init__(self, address): 3238 threading.Thread.__init__(self) 3239 asyncore.dispatcher.__init__(self) 3240 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 3241 self.bind(address) 3242 self.listen(5) 3243 self.host, self.port = self.socket.getsockname()[:2] 3244 self.handler_instance = None 3245 self._active = False 3246 self._active_lock = threading.Lock() 3247 3248 # --- public API 3249 3250 @property 3251 def running(self): 3252 return self._active 3253 3254 def start(self): 3255 assert not self.running 3256 self.__flag = threading.Event() 3257 threading.Thread.start(self) 3258 self.__flag.wait() 3259 3260 def stop(self): 3261 assert self.running 3262 self._active = False 3263 self.join() 3264 3265 def wait(self): 3266 # wait for handler connection to be closed, then stop the server 3267 while not getattr(self.handler_instance, "closed", False): 3268 time.sleep(0.001) 3269 self.stop() 3270 3271 # --- internals 3272 3273 def run(self): 3274 self._active = True 3275 self.__flag.set() 3276 while self._active and asyncore.socket_map: 3277 self._active_lock.acquire() 3278 asyncore.loop(timeout=0.001, count=1) 3279 self._active_lock.release() 3280 asyncore.close_all() 3281 3282 def handle_accept(self): 3283 conn, addr = self.accept() 3284 self.handler_instance = self.Handler(conn) 3285 3286 def handle_connect(self): 3287 self.close() 3288 handle_read = handle_connect 3289 3290 def writable(self): 3291 return 0 3292 3293 def handle_error(self): 3294 raise 3295 3296 3297@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()") 3298class TestSendfile(unittest.TestCase): 3299 3300 DATA = b"12345abcde" * 16 * 1024 # 160 KiB 3301 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ 3302 not sys.platform.startswith("solaris") and \ 3303 not sys.platform.startswith("sunos") 3304 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, 3305 'requires headers and trailers support') 3306 requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 3307 'test is only meaningful on 32-bit builds') 3308 3309 @classmethod 3310 def setUpClass(cls): 3311 cls.key = threading_helper.threading_setup() 3312 create_file(os_helper.TESTFN, cls.DATA) 3313 3314 @classmethod 3315 def tearDownClass(cls): 3316 threading_helper.threading_cleanup(*cls.key) 3317 os_helper.unlink(os_helper.TESTFN) 3318 3319 def setUp(self): 3320 self.server = SendfileTestServer((socket_helper.HOST, 0)) 3321 self.server.start() 3322 self.client = socket.socket() 3323 self.client.connect((self.server.host, self.server.port)) 3324 self.client.settimeout(1) 3325 # synchronize by waiting for "220 ready" response 3326 self.client.recv(1024) 3327 self.sockno = self.client.fileno() 3328 self.file = open(os_helper.TESTFN, 'rb') 3329 self.fileno = self.file.fileno() 3330 3331 def tearDown(self): 3332 self.file.close() 3333 self.client.close() 3334 if self.server.running: 3335 self.server.stop() 3336 self.server = None 3337 3338 def sendfile_wrapper(self, *args, **kwargs): 3339 """A higher level wrapper representing how an application is 3340 supposed to use sendfile(). 3341 """ 3342 while True: 3343 try: 3344 return os.sendfile(*args, **kwargs) 3345 except OSError as err: 3346 if err.errno == errno.ECONNRESET: 3347 # disconnected 3348 raise 3349 elif err.errno in (errno.EAGAIN, errno.EBUSY): 3350 # we have to retry send data 3351 continue 3352 else: 3353 raise 3354 3355 def test_send_whole_file(self): 3356 # normal send 3357 total_sent = 0 3358 offset = 0 3359 nbytes = 4096 3360 while total_sent < len(self.DATA): 3361 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 3362 if sent == 0: 3363 break 3364 offset += sent 3365 total_sent += sent 3366 self.assertTrue(sent <= nbytes) 3367 self.assertEqual(offset, total_sent) 3368 3369 self.assertEqual(total_sent, len(self.DATA)) 3370 self.client.shutdown(socket.SHUT_RDWR) 3371 self.client.close() 3372 self.server.wait() 3373 data = self.server.handler_instance.get_data() 3374 self.assertEqual(len(data), len(self.DATA)) 3375 self.assertEqual(data, self.DATA) 3376 3377 def test_send_at_certain_offset(self): 3378 # start sending a file at a certain offset 3379 total_sent = 0 3380 offset = len(self.DATA) // 2 3381 must_send = len(self.DATA) - offset 3382 nbytes = 4096 3383 while total_sent < must_send: 3384 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes) 3385 if sent == 0: 3386 break 3387 offset += sent 3388 total_sent += sent 3389 self.assertTrue(sent <= nbytes) 3390 3391 self.client.shutdown(socket.SHUT_RDWR) 3392 self.client.close() 3393 self.server.wait() 3394 data = self.server.handler_instance.get_data() 3395 expected = self.DATA[len(self.DATA) // 2:] 3396 self.assertEqual(total_sent, len(expected)) 3397 self.assertEqual(len(data), len(expected)) 3398 self.assertEqual(data, expected) 3399 3400 def test_offset_overflow(self): 3401 # specify an offset > file size 3402 offset = len(self.DATA) + 4096 3403 try: 3404 sent = os.sendfile(self.sockno, self.fileno, offset, 4096) 3405 except OSError as e: 3406 # Solaris can raise EINVAL if offset >= file length, ignore. 3407 if e.errno != errno.EINVAL: 3408 raise 3409 else: 3410 self.assertEqual(sent, 0) 3411 self.client.shutdown(socket.SHUT_RDWR) 3412 self.client.close() 3413 self.server.wait() 3414 data = self.server.handler_instance.get_data() 3415 self.assertEqual(data, b'') 3416 3417 def test_invalid_offset(self): 3418 with self.assertRaises(OSError) as cm: 3419 os.sendfile(self.sockno, self.fileno, -1, 4096) 3420 self.assertEqual(cm.exception.errno, errno.EINVAL) 3421 3422 def test_keywords(self): 3423 # Keyword arguments should be supported 3424 os.sendfile(out_fd=self.sockno, in_fd=self.fileno, 3425 offset=0, count=4096) 3426 if self.SUPPORT_HEADERS_TRAILERS: 3427 os.sendfile(out_fd=self.sockno, in_fd=self.fileno, 3428 offset=0, count=4096, 3429 headers=(), trailers=(), flags=0) 3430 3431 # --- headers / trailers tests 3432 3433 @requires_headers_trailers 3434 def test_headers(self): 3435 total_sent = 0 3436 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1] 3437 sent = os.sendfile(self.sockno, self.fileno, 0, 4096, 3438 headers=[b"x" * 512, b"y" * 256]) 3439 self.assertLessEqual(sent, 512 + 256 + 4096) 3440 total_sent += sent 3441 offset = 4096 3442 while total_sent < len(expected_data): 3443 nbytes = min(len(expected_data) - total_sent, 4096) 3444 sent = self.sendfile_wrapper(self.sockno, self.fileno, 3445 offset, nbytes) 3446 if sent == 0: 3447 break 3448 self.assertLessEqual(sent, nbytes) 3449 total_sent += sent 3450 offset += sent 3451 3452 self.assertEqual(total_sent, len(expected_data)) 3453 self.client.close() 3454 self.server.wait() 3455 data = self.server.handler_instance.get_data() 3456 self.assertEqual(hash(data), hash(expected_data)) 3457 3458 @requires_headers_trailers 3459 def test_trailers(self): 3460 TESTFN2 = os_helper.TESTFN + "2" 3461 file_data = b"abcdef" 3462 3463 self.addCleanup(os_helper.unlink, TESTFN2) 3464 create_file(TESTFN2, file_data) 3465 3466 with open(TESTFN2, 'rb') as f: 3467 os.sendfile(self.sockno, f.fileno(), 0, 5, 3468 trailers=[b"123456", b"789"]) 3469 self.client.close() 3470 self.server.wait() 3471 data = self.server.handler_instance.get_data() 3472 self.assertEqual(data, b"abcde123456789") 3473 3474 @requires_headers_trailers 3475 @requires_32b 3476 def test_headers_overflow_32bits(self): 3477 self.server.handler_instance.accumulate = False 3478 with self.assertRaises(OSError) as cm: 3479 os.sendfile(self.sockno, self.fileno, 0, 0, 3480 headers=[b"x" * 2**16] * 2**15) 3481 self.assertEqual(cm.exception.errno, errno.EINVAL) 3482 3483 @requires_headers_trailers 3484 @requires_32b 3485 def test_trailers_overflow_32bits(self): 3486 self.server.handler_instance.accumulate = False 3487 with self.assertRaises(OSError) as cm: 3488 os.sendfile(self.sockno, self.fileno, 0, 0, 3489 trailers=[b"x" * 2**16] * 2**15) 3490 self.assertEqual(cm.exception.errno, errno.EINVAL) 3491 3492 @requires_headers_trailers 3493 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), 3494 'test needs os.SF_NODISKIO') 3495 def test_flags(self): 3496 try: 3497 os.sendfile(self.sockno, self.fileno, 0, 4096, 3498 flags=os.SF_NODISKIO) 3499 except OSError as err: 3500 if err.errno not in (errno.EBUSY, errno.EAGAIN): 3501 raise 3502 3503 3504def supports_extended_attributes(): 3505 if not hasattr(os, "setxattr"): 3506 return False 3507 3508 try: 3509 with open(os_helper.TESTFN, "xb", 0) as fp: 3510 try: 3511 os.setxattr(fp.fileno(), b"user.test", b"") 3512 except OSError: 3513 return False 3514 finally: 3515 os_helper.unlink(os_helper.TESTFN) 3516 3517 return True 3518 3519 3520@unittest.skipUnless(supports_extended_attributes(), 3521 "no non-broken extended attribute support") 3522# Kernels < 2.6.39 don't respect setxattr flags. 3523@support.requires_linux_version(2, 6, 39) 3524class ExtendedAttributeTests(unittest.TestCase): 3525 3526 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): 3527 fn = os_helper.TESTFN 3528 self.addCleanup(os_helper.unlink, fn) 3529 create_file(fn) 3530 3531 with self.assertRaises(OSError) as cm: 3532 getxattr(fn, s("user.test"), **kwargs) 3533 self.assertEqual(cm.exception.errno, errno.ENODATA) 3534 3535 init_xattr = listxattr(fn) 3536 self.assertIsInstance(init_xattr, list) 3537 3538 setxattr(fn, s("user.test"), b"", **kwargs) 3539 xattr = set(init_xattr) 3540 xattr.add("user.test") 3541 self.assertEqual(set(listxattr(fn)), xattr) 3542 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"") 3543 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs) 3544 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello") 3545 3546 with self.assertRaises(OSError) as cm: 3547 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs) 3548 self.assertEqual(cm.exception.errno, errno.EEXIST) 3549 3550 with self.assertRaises(OSError) as cm: 3551 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs) 3552 self.assertEqual(cm.exception.errno, errno.ENODATA) 3553 3554 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs) 3555 xattr.add("user.test2") 3556 self.assertEqual(set(listxattr(fn)), xattr) 3557 removexattr(fn, s("user.test"), **kwargs) 3558 3559 with self.assertRaises(OSError) as cm: 3560 getxattr(fn, s("user.test"), **kwargs) 3561 self.assertEqual(cm.exception.errno, errno.ENODATA) 3562 3563 xattr.remove("user.test") 3564 self.assertEqual(set(listxattr(fn)), xattr) 3565 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo") 3566 setxattr(fn, s("user.test"), b"a"*1024, **kwargs) 3567 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024) 3568 removexattr(fn, s("user.test"), **kwargs) 3569 many = sorted("user.test{}".format(i) for i in range(100)) 3570 for thing in many: 3571 setxattr(fn, thing, b"x", **kwargs) 3572 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many)) 3573 3574 def _check_xattrs(self, *args, **kwargs): 3575 self._check_xattrs_str(str, *args, **kwargs) 3576 os_helper.unlink(os_helper.TESTFN) 3577 3578 self._check_xattrs_str(os.fsencode, *args, **kwargs) 3579 os_helper.unlink(os_helper.TESTFN) 3580 3581 def test_simple(self): 3582 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3583 os.listxattr) 3584 3585 def test_lpath(self): 3586 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, 3587 os.listxattr, follow_symlinks=False) 3588 3589 def test_fds(self): 3590 def getxattr(path, *args): 3591 with open(path, "rb") as fp: 3592 return os.getxattr(fp.fileno(), *args) 3593 def setxattr(path, *args): 3594 with open(path, "wb", 0) as fp: 3595 os.setxattr(fp.fileno(), *args) 3596 def removexattr(path, *args): 3597 with open(path, "wb", 0) as fp: 3598 os.removexattr(fp.fileno(), *args) 3599 def listxattr(path, *args): 3600 with open(path, "rb") as fp: 3601 return os.listxattr(fp.fileno(), *args) 3602 self._check_xattrs(getxattr, setxattr, removexattr, listxattr) 3603 3604 3605@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size") 3606class TermsizeTests(unittest.TestCase): 3607 def test_does_not_crash(self): 3608 """Check if get_terminal_size() returns a meaningful value. 3609 3610 There's no easy portable way to actually check the size of the 3611 terminal, so let's check if it returns something sensible instead. 3612 """ 3613 try: 3614 size = os.get_terminal_size() 3615 except OSError as e: 3616 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3617 # Under win32 a generic OSError can be thrown if the 3618 # handle cannot be retrieved 3619 self.skipTest("failed to query terminal size") 3620 raise 3621 3622 self.assertGreaterEqual(size.columns, 0) 3623 self.assertGreaterEqual(size.lines, 0) 3624 3625 def test_stty_match(self): 3626 """Check if stty returns the same results 3627 3628 stty actually tests stdin, so get_terminal_size is invoked on 3629 stdin explicitly. If stty succeeded, then get_terminal_size() 3630 should work too. 3631 """ 3632 try: 3633 size = ( 3634 subprocess.check_output( 3635 ["stty", "size"], stderr=subprocess.DEVNULL, text=True 3636 ).split() 3637 ) 3638 except (FileNotFoundError, subprocess.CalledProcessError, 3639 PermissionError): 3640 self.skipTest("stty invocation failed") 3641 expected = (int(size[1]), int(size[0])) # reversed order 3642 3643 try: 3644 actual = os.get_terminal_size(sys.__stdin__.fileno()) 3645 except OSError as e: 3646 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY): 3647 # Under win32 a generic OSError can be thrown if the 3648 # handle cannot be retrieved 3649 self.skipTest("failed to query terminal size") 3650 raise 3651 self.assertEqual(expected, actual) 3652 3653 3654@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create') 3655@support.requires_linux_version(3, 17) 3656class MemfdCreateTests(unittest.TestCase): 3657 def test_memfd_create(self): 3658 fd = os.memfd_create("Hi", os.MFD_CLOEXEC) 3659 self.assertNotEqual(fd, -1) 3660 self.addCleanup(os.close, fd) 3661 self.assertFalse(os.get_inheritable(fd)) 3662 with open(fd, "wb", closefd=False) as f: 3663 f.write(b'memfd_create') 3664 self.assertEqual(f.tell(), 12) 3665 3666 fd2 = os.memfd_create("Hi") 3667 self.addCleanup(os.close, fd2) 3668 self.assertFalse(os.get_inheritable(fd2)) 3669 3670 3671@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd') 3672@support.requires_linux_version(2, 6, 30) 3673class EventfdTests(unittest.TestCase): 3674 def test_eventfd_initval(self): 3675 def pack(value): 3676 """Pack as native uint64_t 3677 """ 3678 return struct.pack("@Q", value) 3679 size = 8 # read/write 8 bytes 3680 initval = 42 3681 fd = os.eventfd(initval) 3682 self.assertNotEqual(fd, -1) 3683 self.addCleanup(os.close, fd) 3684 self.assertFalse(os.get_inheritable(fd)) 3685 3686 # test with raw read/write 3687 res = os.read(fd, size) 3688 self.assertEqual(res, pack(initval)) 3689 3690 os.write(fd, pack(23)) 3691 res = os.read(fd, size) 3692 self.assertEqual(res, pack(23)) 3693 3694 os.write(fd, pack(40)) 3695 os.write(fd, pack(2)) 3696 res = os.read(fd, size) 3697 self.assertEqual(res, pack(42)) 3698 3699 # test with eventfd_read/eventfd_write 3700 os.eventfd_write(fd, 20) 3701 os.eventfd_write(fd, 3) 3702 res = os.eventfd_read(fd) 3703 self.assertEqual(res, 23) 3704 3705 def test_eventfd_semaphore(self): 3706 initval = 2 3707 flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK 3708 fd = os.eventfd(initval, flags) 3709 self.assertNotEqual(fd, -1) 3710 self.addCleanup(os.close, fd) 3711 3712 # semaphore starts has initval 2, two reads return '1' 3713 res = os.eventfd_read(fd) 3714 self.assertEqual(res, 1) 3715 res = os.eventfd_read(fd) 3716 self.assertEqual(res, 1) 3717 # third read would block 3718 with self.assertRaises(BlockingIOError): 3719 os.eventfd_read(fd) 3720 with self.assertRaises(BlockingIOError): 3721 os.read(fd, 8) 3722 3723 # increase semaphore counter, read one 3724 os.eventfd_write(fd, 1) 3725 res = os.eventfd_read(fd) 3726 self.assertEqual(res, 1) 3727 # next read would block, too 3728 with self.assertRaises(BlockingIOError): 3729 os.eventfd_read(fd) 3730 3731 def test_eventfd_select(self): 3732 flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK 3733 fd = os.eventfd(0, flags) 3734 self.assertNotEqual(fd, -1) 3735 self.addCleanup(os.close, fd) 3736 3737 # counter is zero, only writeable 3738 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3739 self.assertEqual((rfd, wfd, xfd), ([], [fd], [])) 3740 3741 # counter is non-zero, read and writeable 3742 os.eventfd_write(fd, 23) 3743 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3744 self.assertEqual((rfd, wfd, xfd), ([fd], [fd], [])) 3745 self.assertEqual(os.eventfd_read(fd), 23) 3746 3747 # counter at max, only readable 3748 os.eventfd_write(fd, (2**64) - 2) 3749 rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) 3750 self.assertEqual((rfd, wfd, xfd), ([fd], [], [])) 3751 os.eventfd_read(fd) 3752 3753 3754class OSErrorTests(unittest.TestCase): 3755 def setUp(self): 3756 class Str(str): 3757 pass 3758 3759 self.bytes_filenames = [] 3760 self.unicode_filenames = [] 3761 if os_helper.TESTFN_UNENCODABLE is not None: 3762 decoded = os_helper.TESTFN_UNENCODABLE 3763 else: 3764 decoded = os_helper.TESTFN 3765 self.unicode_filenames.append(decoded) 3766 self.unicode_filenames.append(Str(decoded)) 3767 if os_helper.TESTFN_UNDECODABLE is not None: 3768 encoded = os_helper.TESTFN_UNDECODABLE 3769 else: 3770 encoded = os.fsencode(os_helper.TESTFN) 3771 self.bytes_filenames.append(encoded) 3772 self.bytes_filenames.append(bytearray(encoded)) 3773 self.bytes_filenames.append(memoryview(encoded)) 3774 3775 self.filenames = self.bytes_filenames + self.unicode_filenames 3776 3777 def test_oserror_filename(self): 3778 funcs = [ 3779 (self.filenames, os.chdir,), 3780 (self.filenames, os.chmod, 0o777), 3781 (self.filenames, os.lstat,), 3782 (self.filenames, os.open, os.O_RDONLY), 3783 (self.filenames, os.rmdir,), 3784 (self.filenames, os.stat,), 3785 (self.filenames, os.unlink,), 3786 ] 3787 if sys.platform == "win32": 3788 funcs.extend(( 3789 (self.bytes_filenames, os.rename, b"dst"), 3790 (self.bytes_filenames, os.replace, b"dst"), 3791 (self.unicode_filenames, os.rename, "dst"), 3792 (self.unicode_filenames, os.replace, "dst"), 3793 (self.unicode_filenames, os.listdir, ), 3794 )) 3795 else: 3796 funcs.extend(( 3797 (self.filenames, os.listdir,), 3798 (self.filenames, os.rename, "dst"), 3799 (self.filenames, os.replace, "dst"), 3800 )) 3801 if hasattr(os, "chown"): 3802 funcs.append((self.filenames, os.chown, 0, 0)) 3803 if hasattr(os, "lchown"): 3804 funcs.append((self.filenames, os.lchown, 0, 0)) 3805 if hasattr(os, "truncate"): 3806 funcs.append((self.filenames, os.truncate, 0)) 3807 if hasattr(os, "chflags"): 3808 funcs.append((self.filenames, os.chflags, 0)) 3809 if hasattr(os, "lchflags"): 3810 funcs.append((self.filenames, os.lchflags, 0)) 3811 if hasattr(os, "chroot"): 3812 funcs.append((self.filenames, os.chroot,)) 3813 if hasattr(os, "link"): 3814 if sys.platform == "win32": 3815 funcs.append((self.bytes_filenames, os.link, b"dst")) 3816 funcs.append((self.unicode_filenames, os.link, "dst")) 3817 else: 3818 funcs.append((self.filenames, os.link, "dst")) 3819 if hasattr(os, "listxattr"): 3820 funcs.extend(( 3821 (self.filenames, os.listxattr,), 3822 (self.filenames, os.getxattr, "user.test"), 3823 (self.filenames, os.setxattr, "user.test", b'user'), 3824 (self.filenames, os.removexattr, "user.test"), 3825 )) 3826 if hasattr(os, "lchmod"): 3827 funcs.append((self.filenames, os.lchmod, 0o777)) 3828 if hasattr(os, "readlink"): 3829 funcs.append((self.filenames, os.readlink,)) 3830 3831 3832 for filenames, func, *func_args in funcs: 3833 for name in filenames: 3834 try: 3835 if isinstance(name, (str, bytes)): 3836 func(name, *func_args) 3837 else: 3838 with self.assertWarnsRegex(DeprecationWarning, 'should be'): 3839 func(name, *func_args) 3840 except OSError as err: 3841 self.assertIs(err.filename, name, str(func)) 3842 except UnicodeDecodeError: 3843 pass 3844 else: 3845 self.fail("No exception thrown by {}".format(func)) 3846 3847class CPUCountTests(unittest.TestCase): 3848 def test_cpu_count(self): 3849 cpus = os.cpu_count() 3850 if cpus is not None: 3851 self.assertIsInstance(cpus, int) 3852 self.assertGreater(cpus, 0) 3853 else: 3854 self.skipTest("Could not determine the number of CPUs") 3855 3856 3857class FDInheritanceTests(unittest.TestCase): 3858 def test_get_set_inheritable(self): 3859 fd = os.open(__file__, os.O_RDONLY) 3860 self.addCleanup(os.close, fd) 3861 self.assertEqual(os.get_inheritable(fd), False) 3862 3863 os.set_inheritable(fd, True) 3864 self.assertEqual(os.get_inheritable(fd), True) 3865 3866 @unittest.skipIf(fcntl is None, "need fcntl") 3867 def test_get_inheritable_cloexec(self): 3868 fd = os.open(__file__, os.O_RDONLY) 3869 self.addCleanup(os.close, fd) 3870 self.assertEqual(os.get_inheritable(fd), False) 3871 3872 # clear FD_CLOEXEC flag 3873 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 3874 flags &= ~fcntl.FD_CLOEXEC 3875 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 3876 3877 self.assertEqual(os.get_inheritable(fd), True) 3878 3879 @unittest.skipIf(fcntl is None, "need fcntl") 3880 def test_set_inheritable_cloexec(self): 3881 fd = os.open(__file__, os.O_RDONLY) 3882 self.addCleanup(os.close, fd) 3883 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3884 fcntl.FD_CLOEXEC) 3885 3886 os.set_inheritable(fd, True) 3887 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 3888 0) 3889 3890 @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH") 3891 def test_get_set_inheritable_o_path(self): 3892 fd = os.open(__file__, os.O_PATH) 3893 self.addCleanup(os.close, fd) 3894 self.assertEqual(os.get_inheritable(fd), False) 3895 3896 os.set_inheritable(fd, True) 3897 self.assertEqual(os.get_inheritable(fd), True) 3898 3899 os.set_inheritable(fd, False) 3900 self.assertEqual(os.get_inheritable(fd), False) 3901 3902 def test_get_set_inheritable_badf(self): 3903 fd = os_helper.make_bad_fd() 3904 3905 with self.assertRaises(OSError) as ctx: 3906 os.get_inheritable(fd) 3907 self.assertEqual(ctx.exception.errno, errno.EBADF) 3908 3909 with self.assertRaises(OSError) as ctx: 3910 os.set_inheritable(fd, True) 3911 self.assertEqual(ctx.exception.errno, errno.EBADF) 3912 3913 with self.assertRaises(OSError) as ctx: 3914 os.set_inheritable(fd, False) 3915 self.assertEqual(ctx.exception.errno, errno.EBADF) 3916 3917 def test_open(self): 3918 fd = os.open(__file__, os.O_RDONLY) 3919 self.addCleanup(os.close, fd) 3920 self.assertEqual(os.get_inheritable(fd), False) 3921 3922 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()") 3923 def test_pipe(self): 3924 rfd, wfd = os.pipe() 3925 self.addCleanup(os.close, rfd) 3926 self.addCleanup(os.close, wfd) 3927 self.assertEqual(os.get_inheritable(rfd), False) 3928 self.assertEqual(os.get_inheritable(wfd), False) 3929 3930 def test_dup(self): 3931 fd1 = os.open(__file__, os.O_RDONLY) 3932 self.addCleanup(os.close, fd1) 3933 3934 fd2 = os.dup(fd1) 3935 self.addCleanup(os.close, fd2) 3936 self.assertEqual(os.get_inheritable(fd2), False) 3937 3938 def test_dup_standard_stream(self): 3939 fd = os.dup(1) 3940 self.addCleanup(os.close, fd) 3941 self.assertGreater(fd, 0) 3942 3943 @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') 3944 def test_dup_nul(self): 3945 # os.dup() was creating inheritable fds for character files. 3946 fd1 = os.open('NUL', os.O_RDONLY) 3947 self.addCleanup(os.close, fd1) 3948 fd2 = os.dup(fd1) 3949 self.addCleanup(os.close, fd2) 3950 self.assertFalse(os.get_inheritable(fd2)) 3951 3952 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()") 3953 def test_dup2(self): 3954 fd = os.open(__file__, os.O_RDONLY) 3955 self.addCleanup(os.close, fd) 3956 3957 # inheritable by default 3958 fd2 = os.open(__file__, os.O_RDONLY) 3959 self.addCleanup(os.close, fd2) 3960 self.assertEqual(os.dup2(fd, fd2), fd2) 3961 self.assertTrue(os.get_inheritable(fd2)) 3962 3963 # force non-inheritable 3964 fd3 = os.open(__file__, os.O_RDONLY) 3965 self.addCleanup(os.close, fd3) 3966 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3) 3967 self.assertFalse(os.get_inheritable(fd3)) 3968 3969 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()") 3970 def test_openpty(self): 3971 master_fd, slave_fd = os.openpty() 3972 self.addCleanup(os.close, master_fd) 3973 self.addCleanup(os.close, slave_fd) 3974 self.assertEqual(os.get_inheritable(master_fd), False) 3975 self.assertEqual(os.get_inheritable(slave_fd), False) 3976 3977 3978class PathTConverterTests(unittest.TestCase): 3979 # tuples of (function name, allows fd arguments, additional arguments to 3980 # function, cleanup function) 3981 functions = [ 3982 ('stat', True, (), None), 3983 ('lstat', False, (), None), 3984 ('access', False, (os.F_OK,), None), 3985 ('chflags', False, (0,), None), 3986 ('lchflags', False, (0,), None), 3987 ('open', False, (0,), getattr(os, 'close', None)), 3988 ] 3989 3990 def test_path_t_converter(self): 3991 str_filename = os_helper.TESTFN 3992 if os.name == 'nt': 3993 bytes_fspath = bytes_filename = None 3994 else: 3995 bytes_filename = os.fsencode(os_helper.TESTFN) 3996 bytes_fspath = FakePath(bytes_filename) 3997 fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT) 3998 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 3999 self.addCleanup(os.close, fd) 4000 4001 int_fspath = FakePath(fd) 4002 str_fspath = FakePath(str_filename) 4003 4004 for name, allow_fd, extra_args, cleanup_fn in self.functions: 4005 with self.subTest(name=name): 4006 try: 4007 fn = getattr(os, name) 4008 except AttributeError: 4009 continue 4010 4011 for path in (str_filename, bytes_filename, str_fspath, 4012 bytes_fspath): 4013 if path is None: 4014 continue 4015 with self.subTest(name=name, path=path): 4016 result = fn(path, *extra_args) 4017 if cleanup_fn is not None: 4018 cleanup_fn(result) 4019 4020 with self.assertRaisesRegex( 4021 TypeError, 'to return str or bytes'): 4022 fn(int_fspath, *extra_args) 4023 4024 if allow_fd: 4025 result = fn(fd, *extra_args) # should not fail 4026 if cleanup_fn is not None: 4027 cleanup_fn(result) 4028 else: 4029 with self.assertRaisesRegex( 4030 TypeError, 4031 'os.PathLike'): 4032 fn(fd, *extra_args) 4033 4034 def test_path_t_converter_and_custom_class(self): 4035 msg = r'__fspath__\(\) to return str or bytes, not %s' 4036 with self.assertRaisesRegex(TypeError, msg % r'int'): 4037 os.stat(FakePath(2)) 4038 with self.assertRaisesRegex(TypeError, msg % r'float'): 4039 os.stat(FakePath(2.34)) 4040 with self.assertRaisesRegex(TypeError, msg % r'object'): 4041 os.stat(FakePath(object())) 4042 4043 4044@unittest.skipUnless(hasattr(os, 'get_blocking'), 4045 'needs os.get_blocking() and os.set_blocking()') 4046class BlockingTests(unittest.TestCase): 4047 def test_blocking(self): 4048 fd = os.open(__file__, os.O_RDONLY) 4049 self.addCleanup(os.close, fd) 4050 self.assertEqual(os.get_blocking(fd), True) 4051 4052 os.set_blocking(fd, False) 4053 self.assertEqual(os.get_blocking(fd), False) 4054 4055 os.set_blocking(fd, True) 4056 self.assertEqual(os.get_blocking(fd), True) 4057 4058 4059 4060class ExportsTests(unittest.TestCase): 4061 def test_os_all(self): 4062 self.assertIn('open', os.__all__) 4063 self.assertIn('walk', os.__all__) 4064 4065 4066class TestDirEntry(unittest.TestCase): 4067 def setUp(self): 4068 self.path = os.path.realpath(os_helper.TESTFN) 4069 self.addCleanup(os_helper.rmtree, self.path) 4070 os.mkdir(self.path) 4071 4072 def test_uninstantiable(self): 4073 self.assertRaises(TypeError, os.DirEntry) 4074 4075 def test_unpickable(self): 4076 filename = create_file(os.path.join(self.path, "file.txt"), b'python') 4077 entry = [entry for entry in os.scandir(self.path)].pop() 4078 self.assertIsInstance(entry, os.DirEntry) 4079 self.assertEqual(entry.name, "file.txt") 4080 import pickle 4081 self.assertRaises(TypeError, pickle.dumps, entry, filename) 4082 4083 4084class TestScandir(unittest.TestCase): 4085 check_no_resource_warning = warnings_helper.check_no_resource_warning 4086 4087 def setUp(self): 4088 self.path = os.path.realpath(os_helper.TESTFN) 4089 self.bytes_path = os.fsencode(self.path) 4090 self.addCleanup(os_helper.rmtree, self.path) 4091 os.mkdir(self.path) 4092 4093 def create_file(self, name="file.txt"): 4094 path = self.bytes_path if isinstance(name, bytes) else self.path 4095 filename = os.path.join(path, name) 4096 create_file(filename, b'python') 4097 return filename 4098 4099 def get_entries(self, names): 4100 entries = dict((entry.name, entry) 4101 for entry in os.scandir(self.path)) 4102 self.assertEqual(sorted(entries.keys()), names) 4103 return entries 4104 4105 def assert_stat_equal(self, stat1, stat2, skip_fields): 4106 if skip_fields: 4107 for attr in dir(stat1): 4108 if not attr.startswith("st_"): 4109 continue 4110 if attr in ("st_dev", "st_ino", "st_nlink"): 4111 continue 4112 self.assertEqual(getattr(stat1, attr), 4113 getattr(stat2, attr), 4114 (stat1, stat2, attr)) 4115 else: 4116 self.assertEqual(stat1, stat2) 4117 4118 def test_uninstantiable(self): 4119 scandir_iter = os.scandir(self.path) 4120 self.assertRaises(TypeError, type(scandir_iter)) 4121 scandir_iter.close() 4122 4123 def test_unpickable(self): 4124 filename = self.create_file("file.txt") 4125 scandir_iter = os.scandir(self.path) 4126 import pickle 4127 self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename) 4128 scandir_iter.close() 4129 4130 def check_entry(self, entry, name, is_dir, is_file, is_symlink): 4131 self.assertIsInstance(entry, os.DirEntry) 4132 self.assertEqual(entry.name, name) 4133 self.assertEqual(entry.path, os.path.join(self.path, name)) 4134 self.assertEqual(entry.inode(), 4135 os.stat(entry.path, follow_symlinks=False).st_ino) 4136 4137 entry_stat = os.stat(entry.path) 4138 self.assertEqual(entry.is_dir(), 4139 stat.S_ISDIR(entry_stat.st_mode)) 4140 self.assertEqual(entry.is_file(), 4141 stat.S_ISREG(entry_stat.st_mode)) 4142 self.assertEqual(entry.is_symlink(), 4143 os.path.islink(entry.path)) 4144 4145 entry_lstat = os.stat(entry.path, follow_symlinks=False) 4146 self.assertEqual(entry.is_dir(follow_symlinks=False), 4147 stat.S_ISDIR(entry_lstat.st_mode)) 4148 self.assertEqual(entry.is_file(follow_symlinks=False), 4149 stat.S_ISREG(entry_lstat.st_mode)) 4150 4151 self.assert_stat_equal(entry.stat(), 4152 entry_stat, 4153 os.name == 'nt' and not is_symlink) 4154 self.assert_stat_equal(entry.stat(follow_symlinks=False), 4155 entry_lstat, 4156 os.name == 'nt') 4157 4158 def test_attributes(self): 4159 link = hasattr(os, 'link') 4160 symlink = os_helper.can_symlink() 4161 4162 dirname = os.path.join(self.path, "dir") 4163 os.mkdir(dirname) 4164 filename = self.create_file("file.txt") 4165 if link: 4166 try: 4167 os.link(filename, os.path.join(self.path, "link_file.txt")) 4168 except PermissionError as e: 4169 self.skipTest('os.link(): %s' % e) 4170 if symlink: 4171 os.symlink(dirname, os.path.join(self.path, "symlink_dir"), 4172 target_is_directory=True) 4173 os.symlink(filename, os.path.join(self.path, "symlink_file.txt")) 4174 4175 names = ['dir', 'file.txt'] 4176 if link: 4177 names.append('link_file.txt') 4178 if symlink: 4179 names.extend(('symlink_dir', 'symlink_file.txt')) 4180 entries = self.get_entries(names) 4181 4182 entry = entries['dir'] 4183 self.check_entry(entry, 'dir', True, False, False) 4184 4185 entry = entries['file.txt'] 4186 self.check_entry(entry, 'file.txt', False, True, False) 4187 4188 if link: 4189 entry = entries['link_file.txt'] 4190 self.check_entry(entry, 'link_file.txt', False, True, False) 4191 4192 if symlink: 4193 entry = entries['symlink_dir'] 4194 self.check_entry(entry, 'symlink_dir', True, False, True) 4195 4196 entry = entries['symlink_file.txt'] 4197 self.check_entry(entry, 'symlink_file.txt', False, True, True) 4198 4199 def get_entry(self, name): 4200 path = self.bytes_path if isinstance(name, bytes) else self.path 4201 entries = list(os.scandir(path)) 4202 self.assertEqual(len(entries), 1) 4203 4204 entry = entries[0] 4205 self.assertEqual(entry.name, name) 4206 return entry 4207 4208 def create_file_entry(self, name='file.txt'): 4209 filename = self.create_file(name=name) 4210 return self.get_entry(os.path.basename(filename)) 4211 4212 def test_current_directory(self): 4213 filename = self.create_file() 4214 old_dir = os.getcwd() 4215 try: 4216 os.chdir(self.path) 4217 4218 # call scandir() without parameter: it must list the content 4219 # of the current directory 4220 entries = dict((entry.name, entry) for entry in os.scandir()) 4221 self.assertEqual(sorted(entries.keys()), 4222 [os.path.basename(filename)]) 4223 finally: 4224 os.chdir(old_dir) 4225 4226 def test_repr(self): 4227 entry = self.create_file_entry() 4228 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>") 4229 4230 def test_fspath_protocol(self): 4231 entry = self.create_file_entry() 4232 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt')) 4233 4234 def test_fspath_protocol_bytes(self): 4235 bytes_filename = os.fsencode('bytesfile.txt') 4236 bytes_entry = self.create_file_entry(name=bytes_filename) 4237 fspath = os.fspath(bytes_entry) 4238 self.assertIsInstance(fspath, bytes) 4239 self.assertEqual(fspath, 4240 os.path.join(os.fsencode(self.path),bytes_filename)) 4241 4242 def test_removed_dir(self): 4243 path = os.path.join(self.path, 'dir') 4244 4245 os.mkdir(path) 4246 entry = self.get_entry('dir') 4247 os.rmdir(path) 4248 4249 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4250 if os.name == 'nt': 4251 self.assertTrue(entry.is_dir()) 4252 self.assertFalse(entry.is_file()) 4253 self.assertFalse(entry.is_symlink()) 4254 if os.name == 'nt': 4255 self.assertRaises(FileNotFoundError, entry.inode) 4256 # don't fail 4257 entry.stat() 4258 entry.stat(follow_symlinks=False) 4259 else: 4260 self.assertGreater(entry.inode(), 0) 4261 self.assertRaises(FileNotFoundError, entry.stat) 4262 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4263 4264 def test_removed_file(self): 4265 entry = self.create_file_entry() 4266 os.unlink(entry.path) 4267 4268 self.assertFalse(entry.is_dir()) 4269 # On POSIX, is_dir() result depends if scandir() filled d_type or not 4270 if os.name == 'nt': 4271 self.assertTrue(entry.is_file()) 4272 self.assertFalse(entry.is_symlink()) 4273 if os.name == 'nt': 4274 self.assertRaises(FileNotFoundError, entry.inode) 4275 # don't fail 4276 entry.stat() 4277 entry.stat(follow_symlinks=False) 4278 else: 4279 self.assertGreater(entry.inode(), 0) 4280 self.assertRaises(FileNotFoundError, entry.stat) 4281 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) 4282 4283 def test_broken_symlink(self): 4284 if not os_helper.can_symlink(): 4285 return self.skipTest('cannot create symbolic link') 4286 4287 filename = self.create_file("file.txt") 4288 os.symlink(filename, 4289 os.path.join(self.path, "symlink.txt")) 4290 entries = self.get_entries(['file.txt', 'symlink.txt']) 4291 entry = entries['symlink.txt'] 4292 os.unlink(filename) 4293 4294 self.assertGreater(entry.inode(), 0) 4295 self.assertFalse(entry.is_dir()) 4296 self.assertFalse(entry.is_file()) # broken symlink returns False 4297 self.assertFalse(entry.is_dir(follow_symlinks=False)) 4298 self.assertFalse(entry.is_file(follow_symlinks=False)) 4299 self.assertTrue(entry.is_symlink()) 4300 self.assertRaises(FileNotFoundError, entry.stat) 4301 # don't fail 4302 entry.stat(follow_symlinks=False) 4303 4304 def test_bytes(self): 4305 self.create_file("file.txt") 4306 4307 path_bytes = os.fsencode(self.path) 4308 entries = list(os.scandir(path_bytes)) 4309 self.assertEqual(len(entries), 1, entries) 4310 entry = entries[0] 4311 4312 self.assertEqual(entry.name, b'file.txt') 4313 self.assertEqual(entry.path, 4314 os.fsencode(os.path.join(self.path, 'file.txt'))) 4315 4316 def test_bytes_like(self): 4317 self.create_file("file.txt") 4318 4319 for cls in bytearray, memoryview: 4320 path_bytes = cls(os.fsencode(self.path)) 4321 with self.assertWarns(DeprecationWarning): 4322 entries = list(os.scandir(path_bytes)) 4323 self.assertEqual(len(entries), 1, entries) 4324 entry = entries[0] 4325 4326 self.assertEqual(entry.name, b'file.txt') 4327 self.assertEqual(entry.path, 4328 os.fsencode(os.path.join(self.path, 'file.txt'))) 4329 self.assertIs(type(entry.name), bytes) 4330 self.assertIs(type(entry.path), bytes) 4331 4332 @unittest.skipUnless(os.listdir in os.supports_fd, 4333 'fd support for listdir required for this test.') 4334 def test_fd(self): 4335 self.assertIn(os.scandir, os.supports_fd) 4336 self.create_file('file.txt') 4337 expected_names = ['file.txt'] 4338 if os_helper.can_symlink(): 4339 os.symlink('file.txt', os.path.join(self.path, 'link')) 4340 expected_names.append('link') 4341 4342 fd = os.open(self.path, os.O_RDONLY) 4343 try: 4344 with os.scandir(fd) as it: 4345 entries = list(it) 4346 names = [entry.name for entry in entries] 4347 self.assertEqual(sorted(names), expected_names) 4348 self.assertEqual(names, os.listdir(fd)) 4349 for entry in entries: 4350 self.assertEqual(entry.path, entry.name) 4351 self.assertEqual(os.fspath(entry), entry.name) 4352 self.assertEqual(entry.is_symlink(), entry.name == 'link') 4353 if os.stat in os.supports_dir_fd: 4354 st = os.stat(entry.name, dir_fd=fd) 4355 self.assertEqual(entry.stat(), st) 4356 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False) 4357 self.assertEqual(entry.stat(follow_symlinks=False), st) 4358 finally: 4359 os.close(fd) 4360 4361 def test_empty_path(self): 4362 self.assertRaises(FileNotFoundError, os.scandir, '') 4363 4364 def test_consume_iterator_twice(self): 4365 self.create_file("file.txt") 4366 iterator = os.scandir(self.path) 4367 4368 entries = list(iterator) 4369 self.assertEqual(len(entries), 1, entries) 4370 4371 # check than consuming the iterator twice doesn't raise exception 4372 entries2 = list(iterator) 4373 self.assertEqual(len(entries2), 0, entries2) 4374 4375 def test_bad_path_type(self): 4376 for obj in [1.234, {}, []]: 4377 self.assertRaises(TypeError, os.scandir, obj) 4378 4379 def test_close(self): 4380 self.create_file("file.txt") 4381 self.create_file("file2.txt") 4382 iterator = os.scandir(self.path) 4383 next(iterator) 4384 iterator.close() 4385 # multiple closes 4386 iterator.close() 4387 with self.check_no_resource_warning(): 4388 del iterator 4389 4390 def test_context_manager(self): 4391 self.create_file("file.txt") 4392 self.create_file("file2.txt") 4393 with os.scandir(self.path) as iterator: 4394 next(iterator) 4395 with self.check_no_resource_warning(): 4396 del iterator 4397 4398 def test_context_manager_close(self): 4399 self.create_file("file.txt") 4400 self.create_file("file2.txt") 4401 with os.scandir(self.path) as iterator: 4402 next(iterator) 4403 iterator.close() 4404 4405 def test_context_manager_exception(self): 4406 self.create_file("file.txt") 4407 self.create_file("file2.txt") 4408 with self.assertRaises(ZeroDivisionError): 4409 with os.scandir(self.path) as iterator: 4410 next(iterator) 4411 1/0 4412 with self.check_no_resource_warning(): 4413 del iterator 4414 4415 def test_resource_warning(self): 4416 self.create_file("file.txt") 4417 self.create_file("file2.txt") 4418 iterator = os.scandir(self.path) 4419 next(iterator) 4420 with self.assertWarns(ResourceWarning): 4421 del iterator 4422 support.gc_collect() 4423 # exhausted iterator 4424 iterator = os.scandir(self.path) 4425 list(iterator) 4426 with self.check_no_resource_warning(): 4427 del iterator 4428 4429 4430class TestPEP519(unittest.TestCase): 4431 4432 # Abstracted so it can be overridden to test pure Python implementation 4433 # if a C version is provided. 4434 fspath = staticmethod(os.fspath) 4435 4436 def test_return_bytes(self): 4437 for b in b'hello', b'goodbye', b'some/path/and/file': 4438 self.assertEqual(b, self.fspath(b)) 4439 4440 def test_return_string(self): 4441 for s in 'hello', 'goodbye', 'some/path/and/file': 4442 self.assertEqual(s, self.fspath(s)) 4443 4444 def test_fsencode_fsdecode(self): 4445 for p in "path/like/object", b"path/like/object": 4446 pathlike = FakePath(p) 4447 4448 self.assertEqual(p, self.fspath(pathlike)) 4449 self.assertEqual(b"path/like/object", os.fsencode(pathlike)) 4450 self.assertEqual("path/like/object", os.fsdecode(pathlike)) 4451 4452 def test_pathlike(self): 4453 self.assertEqual('#feelthegil', self.fspath(FakePath('#feelthegil'))) 4454 self.assertTrue(issubclass(FakePath, os.PathLike)) 4455 self.assertTrue(isinstance(FakePath('x'), os.PathLike)) 4456 4457 def test_garbage_in_exception_out(self): 4458 vapor = type('blah', (), {}) 4459 for o in int, type, os, vapor(): 4460 self.assertRaises(TypeError, self.fspath, o) 4461 4462 def test_argument_required(self): 4463 self.assertRaises(TypeError, self.fspath) 4464 4465 def test_bad_pathlike(self): 4466 # __fspath__ returns a value other than str or bytes. 4467 self.assertRaises(TypeError, self.fspath, FakePath(42)) 4468 # __fspath__ attribute that is not callable. 4469 c = type('foo', (), {}) 4470 c.__fspath__ = 1 4471 self.assertRaises(TypeError, self.fspath, c()) 4472 # __fspath__ raises an exception. 4473 self.assertRaises(ZeroDivisionError, self.fspath, 4474 FakePath(ZeroDivisionError())) 4475 4476 def test_pathlike_subclasshook(self): 4477 # bpo-38878: subclasshook causes subclass checks 4478 # true on abstract implementation. 4479 class A(os.PathLike): 4480 pass 4481 self.assertFalse(issubclass(FakePath, A)) 4482 self.assertTrue(issubclass(FakePath, os.PathLike)) 4483 4484 def test_pathlike_class_getitem(self): 4485 self.assertIsInstance(os.PathLike[bytes], types.GenericAlias) 4486 4487 4488class TimesTests(unittest.TestCase): 4489 def test_times(self): 4490 times = os.times() 4491 self.assertIsInstance(times, os.times_result) 4492 4493 for field in ('user', 'system', 'children_user', 'children_system', 4494 'elapsed'): 4495 value = getattr(times, field) 4496 self.assertIsInstance(value, float) 4497 4498 if os.name == 'nt': 4499 self.assertEqual(times.children_user, 0) 4500 self.assertEqual(times.children_system, 0) 4501 self.assertEqual(times.elapsed, 0) 4502 4503 4504@requires_os_func('fork') 4505class ForkTests(unittest.TestCase): 4506 def test_fork(self): 4507 # bpo-42540: ensure os.fork() with non-default memory allocator does 4508 # not crash on exit. 4509 code = """if 1: 4510 import os 4511 from test import support 4512 pid = os.fork() 4513 if pid != 0: 4514 support.wait_process(pid, exitcode=0) 4515 """ 4516 assert_python_ok("-c", code) 4517 assert_python_ok("-c", code, PYTHONMALLOC="malloc_debug") 4518 4519 4520# Only test if the C version is provided, otherwise TestPEP519 already tested 4521# the pure Python implementation. 4522if hasattr(os, "_fspath"): 4523 class TestPEP519PurePython(TestPEP519): 4524 4525 """Explicitly test the pure Python implementation of os.fspath().""" 4526 4527 fspath = staticmethod(os._fspath) 4528 4529 4530if __name__ == "__main__": 4531 unittest.main() 4532