import collections.abc
import contextlib
import errno
import os
import re
import stat
import sys
import time
import unittest
import warnings


# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN_ASCII = '$test'
else:
    TESTFN_ASCII = '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())

# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)

# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem '
                  'encoding (%s). Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN_ASCII \
            + b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can encode
        # the byte 0xff. Skip some unicode filename tests.
        pass

# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
    # First try printable and common characters to have a readable filename.
    # For each character, the encoding list is just an example of encodings
    # able to encode the character (the list is not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then try more "special" characters. "special" because they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage
        if os.fsdecode(os.fsencode(character)) != character:
            raise UnicodeError
    except UnicodeError:
        pass
    else:
        FS_NONASCII = character
        break

# Save the initial cwd
SAVEDCWD = os.getcwd()

# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or don't accept to enter to
    # such directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is implicitly
    # concatenated with the next one and neither candidate is tried alone.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break

if FS_NONASCII:
    TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
    TESTFN_NONASCII = None
# Prefer the non-ASCII name when the filesystem encoding supports one.
TESTFN = TESTFN_NONASCII or TESTFN_ASCII


def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)


# Cached result of can_symlink(); None means "not probed yet".
_can_symlink = None


def can_symlink():
    """Return True if os.symlink() works in the current directory.

    The probe creates (and removes) a symlink named TESTFN + "can_symlink"
    pointing at TESTFN. The result is cached in the module-level
    _can_symlink, so the probe runs at most once per process.
    """
    global _can_symlink
    if _can_symlink is not None:
        return _can_symlink
    symlink_path = TESTFN + "can_symlink"
    try:
        os.symlink(TESTFN, symlink_path)
        can = True
    except (OSError, NotImplementedError, AttributeError):
        can = False
    else:
        os.remove(symlink_path)
    _can_symlink = can
    return can


def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    ok = can_symlink()
    msg = "Requires functional symlink implementation"
    return test if ok else unittest.skip(msg)(test)


# Cached result of can_xattr(); None means "not probed yet".
_can_xattr = None


def can_xattr():
    """Return True if extended attributes are usable for tests.

    Returns False when os.setxattr is missing or when any probing
    os.setxattr() call raises OSError. Otherwise the result is False only
    for kernel releases matching 2.6.N with N < 39. The result is cached in
    the module-level _can_xattr.
    """
    import tempfile
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        import platform
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Dots are escaped so that only a literal "2.6." prefix
                    # matches.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() hands back an open descriptor that the caller owns;
            # close it so the probe does not leak an fd.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can


def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    ok = can_xattr()
    msg = "no non-broken extended attribute support"
    return test if ok else unittest.skip(msg)(test)


def unlink(filename):
    """Remove *filename*, ignoring FileNotFoundError and NotADirectoryError."""
    try:
        _unlink(filename)
    except (FileNotFoundError, NotADirectoryError):
        pass


if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the deletion to become visible.

        With waitall=False, waits until the last component of *pathname*
        no longer appears in its parent directory listing. With
        waitall=True, waits until the directory *pathname* itself lists as
        empty. Issues a RuntimeWarning if the condition is not met before
        the backoff loop ends.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights. If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """os.unlink() that waits for the entry to disappear."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """os.rmdir() that waits for the entry to disappear."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively remove *path*, waiting for each pending deletion."""
        # NOTE(review): _force_run lives in test.support and is not visible
        # here; presumably it retries the call after fixing permissions --
        # confirm against its definition.
        from test.support import _force_run

        def _rmtree_inner(path):
            # Empty the directory *path*; the directory itself is removed by
            # the caller.
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s"
                          % (fullname, exc),
                          file=sys.__stderr__)
                    # Fall back to treating the entry as a non-directory.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)

    def _longpath(path):
        """Return GetLongPathNameW's result for *path*.

        Returns *path* unchanged when ctypes is unavailable or the API call
        returns 0.
        """
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expand paths.
            pass
        else:
            buffer = ctypes.create_unicode_buffer(len(path) * 2)
            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                             len(buffer))
            if length:
                return buffer[:length]
        return path
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Recursively remove *path*.

        Tries shutil.rmtree() first; if that raises OSError, falls back to a
        manual walk that routes each removal through _force_run.
        """
        import shutil
        try:
            shutil.rmtree(path)
            return
        except OSError:
            pass

        def _rmtree_inner(path):
            # NOTE(review): _force_run is defined in test.support; its first
            # argument here is the *parent* directory, presumably so that
            # permissions are fixed on the directory holding the entry --
            # confirm against its definition.
            from test.support import _force_run
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError:
                    # Fall back to treating the entry as a non-directory.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        """Return *path* unchanged (no long-path expansion off Windows)."""
        return path


def rmdir(dirname):
    """Remove the directory *dirname*, ignoring FileNotFoundError."""
    try:
        _rmdir(dirname)
    except FileNotFoundError:
        pass


def rmtree(path):
    """Recursively remove *path*, ignoring FileNotFoundError."""
    try:
        _rmtree(path)
    except FileNotFoundError:
        pass


@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        defaults to creating a temporary directory using tempfile.mkdtemp.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, if the path is specified and cannot be
        created, only a warning is issued.

    """
    import tempfile
    dir_created = False
    if path is None:
        path = tempfile.mkdtemp()
        dir_created = True
        path = os.path.realpath(path)
    else:
        try:
            os.mkdir(path)
            dir_created = True
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
    if dir_created:
        pid = os.getpid()
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if dir_created and pid == os.getpid():
            rmtree(path)


@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.

    """
    saved_dir = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)


@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    The function temporarily changes the current working directory
    after creating a temporary directory in the current directory with
    name *name*.  If *name* is None, the temporary directory is
    created using tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path:
        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
            yield cwd_dir


def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""
    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    os.close(fd)


def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory
    is case-insensitive."""
    import tempfile
    with tempfile.NamedTemporaryFile(dir=directory) as base:
        base_path = base.name
        case_path = base_path.upper()
        if case_path == base_path:
            # The name had no lowercase letters; try the other case instead.
            case_path = base_path.lower()
        try:
            return os.path.samefile(base_path, case_path)
        except FileNotFoundError:
            return False


class FakePath:
    """Simple implementation of the path protocol.

    If the wrapped value is an exception instance or exception class,
    __fspath__() raises it instead of returning it.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        if (isinstance(self.path, BaseException) or
            isinstance(self.path, type) and
                issubclass(self.path, BaseException)):
            raise self.path
        else:
            return self.path


def fd_count():
    """Count the number of open file descriptors.
    """
    if sys.platform.startswith(('linux', 'freebsd')):
        try:
            names = os.listdir("/proc/self/fd")
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the /proc/self/fd/ directory.
            return len(names) - 1
        except FileNotFoundError:
            pass

    MAXFD = 256
    if hasattr(os, 'sysconf'):
        try:
            MAXFD = os.sysconf("SC_OPEN_MAX")
        except OSError:
            pass

    old_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            old_modes = {}
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
                                                                 0)

    try:
        count = 0
        for fd in range(MAXFD):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                fd2 = os.dup(fd)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
            else:
                os.close(fd2)
                count += 1
    finally:
        if old_modes is not None:
            # Restore the CRT report modes saved above.
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])

    return count


if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask."""
        oldmask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(oldmask)


class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its value before the first change
        # (None if it was unset).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        """Set *envvar* to *value*, recording the original for restore."""
        self[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* (no error if absent), recording the original."""
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put every touched variable back to its recorded initial state.
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        os.environ = self._environ