"""Filesystem and environment helpers shared by tests.

The module computes a family of scratch file names (TESTFN and friends)
at import time and provides helpers to remove files and directories,
create temporary directories, change the working directory, count open
file descriptors and temporarily modify os.environ.
"""
import collections.abc
import contextlib
import errno
import os
import re
import stat
import sys
import time
import unittest
import warnings


# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN_ASCII = '$test'
else:
    TESTFN_ASCII = '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())

# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)

# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem '
                  'encoding (%s). Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN_ASCII \
            + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can encode
        # the byte 0xff. Skip some unicode filename tests.
        pass

# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
    # First try printable and common characters to have a readable filename.
    # For each character, the encoding list are just example of encodings able
    # to encode the character (the list is not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then try more "special" characters. "special" because they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage
        if os.fsdecode(os.fsencode(character)) != character:
            raise UnicodeError
    except UnicodeError:
        pass
    else:
        FS_NONASCII = character
        break

# Save the initial cwd
SAVEDCWD = os.getcwd()

# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or don't accept to enter to
    # such directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is implicitly
    # concatenated with the next one and both candidates are lost.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break

if FS_NONASCII:
    TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
    TESTFN_NONASCII = None
# Prefer the non-ASCII name when the filesystem encoding supports one.
TESTFN = TESTFN_NONASCII or TESTFN_ASCII


def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)


# Cached result of can_symlink(); None until the first call.
_can_symlink = None


def can_symlink():
    """Return True if os.symlink() can create a link next to TESTFN.

    The probe is run once; its result is cached in the module-level
    _can_symlink and returned by every later call.
    """
    global _can_symlink
    if _can_symlink is not None:
        return _can_symlink
    symlink_path = TESTFN + "can_symlink"
    try:
        os.symlink(TESTFN, symlink_path)
        can = True
    except (OSError, NotImplementedError, AttributeError):
        can = False
    else:
        os.remove(symlink_path)
    _can_symlink = can
    return can


def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    ok = can_symlink()
    msg = "Requires functional symlink implementation"
    return test if ok else unittest.skip(msg)(test)


# Cached result of can_xattr(); None until the first call.
_can_xattr = None


def can_xattr():
    """Return True if extended attributes can be set and are usable.

    Returns False when os.setxattr is missing, when any of the probe
    setxattr() calls raises OSError, or when platform.release() reports
    a 2.6.x kernel older than 2.6.39.  The result is cached in the
    module-level _can_xattr.
    """
    import tempfile
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        import platform
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Dots are escaped so only a literal "2.6." prefix matches.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() returns an already-open descriptor that the caller
            # owns; close it so the probe does not leak a file descriptor.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can


def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    ok = can_xattr()
    msg = "no non-broken extended attribute support"
    return test if ok else unittest.skip(msg)(test)


def unlink(filename):
    """Remove *filename*, ignoring FileNotFoundError and NotADirectoryError."""
    try:
        _unlink(filename)
    except (FileNotFoundError, NotADirectoryError):
        pass


if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the removal to become visible.

        With waitall false, wait until the last component of *pathname*
        is no longer listed in its parent directory; with waitall true,
        wait until the directory *pathname* itself lists as empty.  A
        RuntimeWarning is issued if that has not happened once the
        backoff below is exhausted.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Unlink *filename* and wait for it to leave its directory."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove directory *dirname* and wait for it to leave its parent."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively delete *path*, waiting after each directory level."""
        from test.support import _force_run

        def _rmtree_inner(path):
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s"
                          % (fullname, exc),
                          file=sys.__stderr__)
                    # Mode 0 is not a directory, so the entry is unlinked.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)

    def _longpath(path):
        """Return GetLongPathNameW(path) if available, else *path* unchanged."""
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expands paths.
            pass
        else:
            buffer = ctypes.create_unicode_buffer(len(path) * 2)
            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                             len(buffer))
            if length:
                return buffer[:length]
        return path
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Recursively delete *path*.

        shutil.rmtree() is tried first; if it raises OSError the tree is
        walked manually, running each listdir/rmdir/unlink through
        test.support._force_run.
        """
        import shutil
        try:
            shutil.rmtree(path)
            return
        except OSError:
            pass

        def _rmtree_inner(path):
            from test.support import _force_run
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError:
                    # Mode 0 is not a directory, so the entry is unlinked.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        """Return *path* unchanged (only the Windows variant expands it)."""
        return path


def rmdir(dirname):
    """Remove the directory *dirname*, ignoring FileNotFoundError."""
    try:
        _rmdir(dirname)
    except FileNotFoundError:
        pass


def rmtree(path):
    """Recursively remove *path*, ignoring FileNotFoundError."""
    try:
        _rmtree(path)
    except FileNotFoundError:
        pass


@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        defaults to creating a temporary directory using tempfile.mkdtemp.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, if the path is specified and cannot be
        created, only a warning is issued.

    """
    import tempfile
    dir_created = False
    if path is None:
        path = tempfile.mkdtemp()
        dir_created = True
        path = os.path.realpath(path)
    else:
        try:
            os.mkdir(path)
            dir_created = True
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
    if dir_created:
        # Remember who created the directory; see the fork note below.
        pid = os.getpid()
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if dir_created and pid == os.getpid():
            rmtree(path)


@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.

    """
    saved_dir = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)


@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    The function temporarily changes the current working directory
    after creating a temporary directory in the current directory with
    name *name*.  If *name* is None, the temporary directory is
    created using tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path:
        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
            yield cwd_dir


def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""
    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    os.close(fd)


@contextlib.contextmanager
def open_dir_fd(path):
    """Open a file descriptor to a directory."""
    assert os.path.isdir(path)
    dir_fd = os.open(path, os.O_RDONLY)
    try:
        yield dir_fd
    finally:
        os.close(dir_fd)


def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory
    is case-insensitive."""
    import tempfile
    with tempfile.NamedTemporaryFile(dir=directory) as base:
        base_path = base.name
        case_path = base_path.upper()
        if case_path == base_path:
            # The name had no lowercase letters; try the other direction.
            case_path = base_path.lower()
        try:
            return os.path.samefile(base_path, case_path)
        except FileNotFoundError:
            return False


class FakePath:
    """Simple implementing of the path protocol.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        # An exception instance or exception class stored as the path is
        # raised instead of returned.
        if (isinstance(self.path, BaseException) or
            isinstance(self.path, type) and
                issubclass(self.path, BaseException)):
            raise self.path
        else:
            return self.path


def fd_count():
    """Count the number of open file descriptors.
    """
    if sys.platform.startswith(('linux', 'freebsd')):
        try:
            names = os.listdir("/proc/self/fd")
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the /proc/self/fd/ directory.
            return len(names) - 1
        except FileNotFoundError:
            pass

    # Fallback: probe every descriptor number below MAXFD with dup().
    MAXFD = 256
    if hasattr(os, 'sysconf'):
        try:
            MAXFD = os.sysconf("SC_OPEN_MAX")
        except OSError:
            pass

    old_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            old_modes = {}
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
                                                                 0)

    try:
        count = 0
        for fd in range(MAXFD):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                fd2 = os.dup(fd)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
            else:
                os.close(fd2)
                count += 1
    finally:
        if old_modes is not None:
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])

    return count


if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask."""
        oldmask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(oldmask)


class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its value before the first change
        # (None when it was not set).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        """Set *envvar* to *value*, recording its previous value."""
        self[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* if present, recording its previous value."""
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        """Restore every variable changed through this guard."""
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        os.environ = self._environ