1"""Utility functions for copying and archiving files and directory trees. 2 3XXX The functions here don't copy the resource fork or other metadata on Mac. 4 5""" 6 7import os 8import sys 9import stat 10import fnmatch 11import collections 12import errno 13 14try: 15 import zlib 16 del zlib 17 _ZLIB_SUPPORTED = True 18except ImportError: 19 _ZLIB_SUPPORTED = False 20 21try: 22 import bz2 23 del bz2 24 _BZ2_SUPPORTED = True 25except ImportError: 26 _BZ2_SUPPORTED = False 27 28try: 29 import lzma 30 del lzma 31 _LZMA_SUPPORTED = True 32except ImportError: 33 _LZMA_SUPPORTED = False 34 35_WINDOWS = os.name == 'nt' 36posix = nt = None 37if os.name == 'posix': 38 import posix 39elif _WINDOWS: 40 import nt 41 42COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 43_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") 44_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS 45 46# CMD defaults in Windows 10 47_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" 48 49__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", 50 "copytree", "move", "rmtree", "Error", "SpecialFileError", 51 "ExecError", "make_archive", "get_archive_formats", 52 "register_archive_format", "unregister_archive_format", 53 "get_unpack_formats", "register_unpack_format", 54 "unregister_unpack_format", "unpack_archive", 55 "ignore_patterns", "chown", "which", "get_terminal_size", 56 "SameFileError"] 57 # disk_usage is added later, if available on the platform 58 59class Error(OSError): 60 pass 61 62class SameFileError(Error): 63 """Raised when source and destination are the same file.""" 64 65class SpecialFileError(OSError): 66 """Raised when trying to do a kind of operation (e.g. copying) which is 67 not supported on a special file (e.g. a named pipe)""" 68 69class ExecError(OSError): 70 """Raised when a command could not be executed""" 71 72class ReadError(OSError): 73 """Raised when an archive cannot be read""" 74 75class RegistryError(Exception): 76 """Raised when a registry operation with the archiving 77 and unpacking registries fails""" 78 79class _GiveupOnFastCopy(Exception): 80 """Raised as a signal to fallback on using raw read()/write() 81 file copy when fast-copy functions fail to do so. 82 """ 83 84def _fastcopy_fcopyfile(fsrc, fdst, flags): 85 """Copy a regular file content or metadata by using high-performance 86 fcopyfile(3) syscall (macOS). 87 """ 88 try: 89 infd = fsrc.fileno() 90 outfd = fdst.fileno() 91 except Exception as err: 92 raise _GiveupOnFastCopy(err) # not a regular file 93 94 try: 95 posix._fcopyfile(infd, outfd, flags) 96 except OSError as err: 97 err.filename = fsrc.name 98 err.filename2 = fdst.name 99 if err.errno in {errno.EINVAL, errno.ENOTSUP}: 100 raise _GiveupOnFastCopy(err) 101 else: 102 raise err from None 103 104def _fastcopy_sendfile(fsrc, fdst): 105 """Copy data from one regular mmap-like fd to another by using 106 high-performance sendfile(2) syscall. 107 This should work on Linux >= 2.6.33 only. 108 """ 109 # Note: copyfileobj() is left alone in order to not introduce any 110 # unexpected breakage. Possible risks by using zero-copy calls 111 # in copyfileobj() are: 112 # - fdst cannot be open in "a"(ppend) mode 113 # - fsrc and fdst may be open in "t"(ext) mode 114 # - fsrc may be a BufferedReader (which hides unread data in a buffer), 115 # GzipFile (which decompresses data), HTTPResponse (which decodes 116 # chunks). 117 # - possibly others (e.g. encrypted fs/partition?) 118 global _USE_CP_SENDFILE 119 try: 120 infd = fsrc.fileno() 121 outfd = fdst.fileno() 122 except Exception as err: 123 raise _GiveupOnFastCopy(err) # not a regular file 124 125 # Hopefully the whole file will be copied in a single call. 126 # sendfile() is called in a loop 'till EOF is reached (0 return) 127 # so a bufsize smaller or bigger than the actual file size 128 # should not make any difference, also in case the file content 129 # changes while being copied. 130 try: 131 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB 132 except OSError: 133 blocksize = 2 ** 27 # 128MiB 134 # On 32-bit architectures truncate to 1GiB to avoid OverflowError, 135 # see bpo-38319. 136 if sys.maxsize < 2 ** 32: 137 blocksize = min(blocksize, 2 ** 30) 138 139 offset = 0 140 while True: 141 try: 142 sent = os.sendfile(outfd, infd, offset, blocksize) 143 except OSError as err: 144 # ...in oder to have a more informative exception. 145 err.filename = fsrc.name 146 err.filename2 = fdst.name 147 148 if err.errno == errno.ENOTSOCK: 149 # sendfile() on this platform (probably Linux < 2.6.33) 150 # does not support copies between regular files (only 151 # sockets). 152 _USE_CP_SENDFILE = False 153 raise _GiveupOnFastCopy(err) 154 155 if err.errno == errno.ENOSPC: # filesystem is full 156 raise err from None 157 158 # Give up on first call and if no data was copied. 159 if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: 160 raise _GiveupOnFastCopy(err) 161 162 raise err 163 else: 164 if sent == 0: 165 break # EOF 166 offset += sent 167 168def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): 169 """readinto()/memoryview() based variant of copyfileobj(). 170 *fsrc* must support readinto() method and both files must be 171 open in binary mode. 172 """ 173 # Localize variable access to minimize overhead. 174 fsrc_readinto = fsrc.readinto 175 fdst_write = fdst.write 176 with memoryview(bytearray(length)) as mv: 177 while True: 178 n = fsrc_readinto(mv) 179 if not n: 180 break 181 elif n < length: 182 with mv[:n] as smv: 183 fdst.write(smv) 184 else: 185 fdst_write(mv) 186 187def copyfileobj(fsrc, fdst, length=0): 188 """copy data from file-like object fsrc to file-like object fdst""" 189 # Localize variable access to minimize overhead. 190 if not length: 191 length = COPY_BUFSIZE 192 fsrc_read = fsrc.read 193 fdst_write = fdst.write 194 while True: 195 buf = fsrc_read(length) 196 if not buf: 197 break 198 fdst_write(buf) 199 200def _samefile(src, dst): 201 # Macintosh, Unix. 202 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): 203 try: 204 return os.path.samestat(src.stat(), os.stat(dst)) 205 except OSError: 206 return False 207 208 if hasattr(os.path, 'samefile'): 209 try: 210 return os.path.samefile(src, dst) 211 except OSError: 212 return False 213 214 # All other platforms: check for same pathname. 215 return (os.path.normcase(os.path.abspath(src)) == 216 os.path.normcase(os.path.abspath(dst))) 217 218def _stat(fn): 219 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) 220 221def _islink(fn): 222 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) 223 224def copyfile(src, dst, *, follow_symlinks=True): 225 """Copy data from src to dst in the most efficient way possible. 226 227 If follow_symlinks is not set and src is a symbolic link, a new 228 symlink will be created instead of copying the file it points to. 229 230 """ 231 sys.audit("shutil.copyfile", src, dst) 232 233 if _samefile(src, dst): 234 raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) 235 236 file_size = 0 237 for i, fn in enumerate([src, dst]): 238 try: 239 st = _stat(fn) 240 except OSError: 241 # File most likely does not exist 242 pass 243 else: 244 # XXX What about other special files? (sockets, devices...) 245 if stat.S_ISFIFO(st.st_mode): 246 fn = fn.path if isinstance(fn, os.DirEntry) else fn 247 raise SpecialFileError("`%s` is a named pipe" % fn) 248 if _WINDOWS and i == 0: 249 file_size = st.st_size 250 251 if not follow_symlinks and _islink(src): 252 os.symlink(os.readlink(src), dst) 253 else: 254 with open(src, 'rb') as fsrc: 255 try: 256 with open(dst, 'wb') as fdst: 257 # macOS 258 if _HAS_FCOPYFILE: 259 try: 260 _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) 261 return dst 262 except _GiveupOnFastCopy: 263 pass 264 # Linux 265 elif _USE_CP_SENDFILE: 266 try: 267 _fastcopy_sendfile(fsrc, fdst) 268 return dst 269 except _GiveupOnFastCopy: 270 pass 271 # Windows, see: 272 # https://github.com/python/cpython/pull/7160#discussion_r195405230 273 elif _WINDOWS and file_size > 0: 274 _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) 275 return dst 276 277 copyfileobj(fsrc, fdst) 278 279 # Issue 43219, raise a less confusing exception 280 except IsADirectoryError as e: 281 if not os.path.exists(dst): 282 raise FileNotFoundError(f'Directory does not exist: {dst}') from e 283 else: 284 raise 285 286 return dst 287 288def copymode(src, dst, *, follow_symlinks=True): 289 """Copy mode bits from src to dst. 290 291 If follow_symlinks is not set, symlinks aren't followed if and only 292 if both `src` and `dst` are symlinks. If `lchmod` isn't available 293 (e.g. Linux) this method does nothing. 294 295 """ 296 sys.audit("shutil.copymode", src, dst) 297 298 if not follow_symlinks and _islink(src) and os.path.islink(dst): 299 if hasattr(os, 'lchmod'): 300 stat_func, chmod_func = os.lstat, os.lchmod 301 else: 302 return 303 else: 304 stat_func, chmod_func = _stat, os.chmod 305 306 st = stat_func(src) 307 chmod_func(dst, stat.S_IMODE(st.st_mode)) 308 309if hasattr(os, 'listxattr'): 310 def _copyxattr(src, dst, *, follow_symlinks=True): 311 """Copy extended filesystem attributes from `src` to `dst`. 312 313 Overwrite existing attributes. 314 315 If `follow_symlinks` is false, symlinks won't be followed. 316 317 """ 318 319 try: 320 names = os.listxattr(src, follow_symlinks=follow_symlinks) 321 except OSError as e: 322 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): 323 raise 324 return 325 for name in names: 326 try: 327 value = os.getxattr(src, name, follow_symlinks=follow_symlinks) 328 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) 329 except OSError as e: 330 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, 331 errno.EINVAL): 332 raise 333else: 334 def _copyxattr(*args, **kwargs): 335 pass 336 337def copystat(src, dst, *, follow_symlinks=True): 338 """Copy file metadata 339 340 Copy the permission bits, last access time, last modification time, and 341 flags from `src` to `dst`. On Linux, copystat() also copies the "extended 342 attributes" where possible. The file contents, owner, and group are 343 unaffected. `src` and `dst` are path-like objects or path names given as 344 strings. 345 346 If the optional flag `follow_symlinks` is not set, symlinks aren't 347 followed if and only if both `src` and `dst` are symlinks. 348 """ 349 sys.audit("shutil.copystat", src, dst) 350 351 def _nop(*args, ns=None, follow_symlinks=None): 352 pass 353 354 # follow symlinks (aka don't not follow symlinks) 355 follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) 356 if follow: 357 # use the real function if it exists 358 def lookup(name): 359 return getattr(os, name, _nop) 360 else: 361 # use the real function only if it exists 362 # *and* it supports follow_symlinks 363 def lookup(name): 364 fn = getattr(os, name, _nop) 365 if fn in os.supports_follow_symlinks: 366 return fn 367 return _nop 368 369 if isinstance(src, os.DirEntry): 370 st = src.stat(follow_symlinks=follow) 371 else: 372 st = lookup("stat")(src, follow_symlinks=follow) 373 mode = stat.S_IMODE(st.st_mode) 374 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), 375 follow_symlinks=follow) 376 # We must copy extended attributes before the file is (potentially) 377 # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. 378 _copyxattr(src, dst, follow_symlinks=follow) 379 try: 380 lookup("chmod")(dst, mode, follow_symlinks=follow) 381 except NotImplementedError: 382 # if we got a NotImplementedError, it's because 383 # * follow_symlinks=False, 384 # * lchown() is unavailable, and 385 # * either 386 # * fchownat() is unavailable or 387 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. 388 # (it returned ENOSUP.) 389 # therefore we're out of options--we simply cannot chown the 390 # symlink. give up, suppress the error. 391 # (which is what shutil always did in this circumstance.) 392 pass 393 if hasattr(st, 'st_flags'): 394 try: 395 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) 396 except OSError as why: 397 for err in 'EOPNOTSUPP', 'ENOTSUP': 398 if hasattr(errno, err) and why.errno == getattr(errno, err): 399 break 400 else: 401 raise 402 403def copy(src, dst, *, follow_symlinks=True): 404 """Copy data and mode bits ("cp src dst"). Return the file's destination. 405 406 The destination may be a directory. 407 408 If follow_symlinks is false, symlinks won't be followed. This 409 resembles GNU's "cp -P src dst". 410 411 If source and destination are the same file, a SameFileError will be 412 raised. 413 414 """ 415 if os.path.isdir(dst): 416 dst = os.path.join(dst, os.path.basename(src)) 417 copyfile(src, dst, follow_symlinks=follow_symlinks) 418 copymode(src, dst, follow_symlinks=follow_symlinks) 419 return dst 420 421def copy2(src, dst, *, follow_symlinks=True): 422 """Copy data and metadata. Return the file's destination. 423 424 Metadata is copied with copystat(). Please see the copystat function 425 for more information. 426 427 The destination may be a directory. 428 429 If follow_symlinks is false, symlinks won't be followed. This 430 resembles GNU's "cp -P src dst". 431 """ 432 if os.path.isdir(dst): 433 dst = os.path.join(dst, os.path.basename(src)) 434 copyfile(src, dst, follow_symlinks=follow_symlinks) 435 copystat(src, dst, follow_symlinks=follow_symlinks) 436 return dst 437 438def ignore_patterns(*patterns): 439 """Function that can be used as copytree() ignore parameter. 440 441 Patterns is a sequence of glob-style patterns 442 that are used to exclude files""" 443 def _ignore_patterns(path, names): 444 ignored_names = [] 445 for pattern in patterns: 446 ignored_names.extend(fnmatch.filter(names, pattern)) 447 return set(ignored_names) 448 return _ignore_patterns 449 450def _copytree(entries, src, dst, symlinks, ignore, copy_function, 451 ignore_dangling_symlinks, dirs_exist_ok=False): 452 if ignore is not None: 453 ignored_names = ignore(os.fspath(src), [x.name for x in entries]) 454 else: 455 ignored_names = set() 456 457 os.makedirs(dst, exist_ok=dirs_exist_ok) 458 errors = [] 459 use_srcentry = copy_function is copy2 or copy_function is copy 460 461 for srcentry in entries: 462 if srcentry.name in ignored_names: 463 continue 464 srcname = os.path.join(src, srcentry.name) 465 dstname = os.path.join(dst, srcentry.name) 466 srcobj = srcentry if use_srcentry else srcname 467 try: 468 is_symlink = srcentry.is_symlink() 469 if is_symlink and os.name == 'nt': 470 # Special check for directory junctions, which appear as 471 # symlinks but we want to recurse. 472 lstat = srcentry.stat(follow_symlinks=False) 473 if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: 474 is_symlink = False 475 if is_symlink: 476 linkto = os.readlink(srcname) 477 if symlinks: 478 # We can't just leave it to `copy_function` because legacy 479 # code with a custom `copy_function` may rely on copytree 480 # doing the right thing. 481 os.symlink(linkto, dstname) 482 copystat(srcobj, dstname, follow_symlinks=not symlinks) 483 else: 484 # ignore dangling symlink if the flag is on 485 if not os.path.exists(linkto) and ignore_dangling_symlinks: 486 continue 487 # otherwise let the copy occur. copy2 will raise an error 488 if srcentry.is_dir(): 489 copytree(srcobj, dstname, symlinks, ignore, 490 copy_function, dirs_exist_ok=dirs_exist_ok) 491 else: 492 copy_function(srcobj, dstname) 493 elif srcentry.is_dir(): 494 copytree(srcobj, dstname, symlinks, ignore, copy_function, 495 dirs_exist_ok=dirs_exist_ok) 496 else: 497 # Will raise a SpecialFileError for unsupported file types 498 copy_function(srcobj, dstname) 499 # catch the Error from the recursive copytree so that we can 500 # continue with other files 501 except Error as err: 502 errors.extend(err.args[0]) 503 except OSError as why: 504 errors.append((srcname, dstname, str(why))) 505 try: 506 copystat(src, dst) 507 except OSError as why: 508 # Copying file access times may fail on Windows 509 if getattr(why, 'winerror', None) is None: 510 errors.append((src, dst, str(why))) 511 if errors: 512 raise Error(errors) 513 return dst 514 515def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, 516 ignore_dangling_symlinks=False, dirs_exist_ok=False): 517 """Recursively copy a directory tree and return the destination directory. 518 519 dirs_exist_ok dictates whether to raise an exception in case dst or any 520 missing parent directory already exists. 521 522 If exception(s) occur, an Error is raised with a list of reasons. 523 524 If the optional symlinks flag is true, symbolic links in the 525 source tree result in symbolic links in the destination tree; if 526 it is false, the contents of the files pointed to by symbolic 527 links are copied. If the file pointed by the symlink doesn't 528 exist, an exception will be added in the list of errors raised in 529 an Error exception at the end of the copy process. 530 531 You can set the optional ignore_dangling_symlinks flag to true if you 532 want to silence this exception. Notice that this has no effect on 533 platforms that don't support os.symlink. 534 535 The optional ignore argument is a callable. If given, it 536 is called with the `src` parameter, which is the directory 537 being visited by copytree(), and `names` which is the list of 538 `src` contents, as returned by os.listdir(): 539 540 callable(src, names) -> ignored_names 541 542 Since copytree() is called recursively, the callable will be 543 called once for each directory that is copied. It returns a 544 list of names relative to the `src` directory that should 545 not be copied. 546 547 The optional copy_function argument is a callable that will be used 548 to copy each file. It will be called with the source path and the 549 destination path as arguments. By default, copy2() is used, but any 550 function that supports the same signature (like copy()) can be used. 551 552 """ 553 sys.audit("shutil.copytree", src, dst) 554 with os.scandir(src) as itr: 555 entries = list(itr) 556 return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks, 557 ignore=ignore, copy_function=copy_function, 558 ignore_dangling_symlinks=ignore_dangling_symlinks, 559 dirs_exist_ok=dirs_exist_ok) 560 561if hasattr(os.stat_result, 'st_file_attributes'): 562 # Special handling for directory junctions to make them behave like 563 # symlinks for shutil.rmtree, since in general they do not appear as 564 # regular links. 565 def _rmtree_isdir(entry): 566 try: 567 st = entry.stat(follow_symlinks=False) 568 return (stat.S_ISDIR(st.st_mode) and not 569 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 570 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 571 except OSError: 572 return False 573 574 def _rmtree_islink(path): 575 try: 576 st = os.lstat(path) 577 return (stat.S_ISLNK(st.st_mode) or 578 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 579 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 580 except OSError: 581 return False 582else: 583 def _rmtree_isdir(entry): 584 try: 585 return entry.is_dir(follow_symlinks=False) 586 except OSError: 587 return False 588 589 def _rmtree_islink(path): 590 return os.path.islink(path) 591 592# version vulnerable to race conditions 593def _rmtree_unsafe(path, onerror): 594 try: 595 with os.scandir(path) as scandir_it: 596 entries = list(scandir_it) 597 except OSError: 598 onerror(os.scandir, path, sys.exc_info()) 599 entries = [] 600 for entry in entries: 601 fullname = entry.path 602 if _rmtree_isdir(entry): 603 try: 604 if entry.is_symlink(): 605 # This can only happen if someone replaces 606 # a directory with a symlink after the call to 607 # os.scandir or entry.is_dir above. 608 raise OSError("Cannot call rmtree on a symbolic link") 609 except OSError: 610 onerror(os.path.islink, fullname, sys.exc_info()) 611 continue 612 _rmtree_unsafe(fullname, onerror) 613 else: 614 try: 615 os.unlink(fullname) 616 except OSError: 617 onerror(os.unlink, fullname, sys.exc_info()) 618 try: 619 os.rmdir(path) 620 except OSError: 621 onerror(os.rmdir, path, sys.exc_info()) 622 623# Version using fd-based APIs to protect against races 624def _rmtree_safe_fd(topfd, path, onerror): 625 try: 626 with os.scandir(topfd) as scandir_it: 627 entries = list(scandir_it) 628 except OSError as err: 629 err.filename = path 630 onerror(os.scandir, path, sys.exc_info()) 631 return 632 for entry in entries: 633 fullname = os.path.join(path, entry.name) 634 try: 635 is_dir = entry.is_dir(follow_symlinks=False) 636 except OSError: 637 is_dir = False 638 else: 639 if is_dir: 640 try: 641 orig_st = entry.stat(follow_symlinks=False) 642 is_dir = stat.S_ISDIR(orig_st.st_mode) 643 except OSError: 644 onerror(os.lstat, fullname, sys.exc_info()) 645 continue 646 if is_dir: 647 try: 648 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) 649 except OSError: 650 onerror(os.open, fullname, sys.exc_info()) 651 else: 652 try: 653 if os.path.samestat(orig_st, os.fstat(dirfd)): 654 _rmtree_safe_fd(dirfd, fullname, onerror) 655 try: 656 os.rmdir(entry.name, dir_fd=topfd) 657 except OSError: 658 onerror(os.rmdir, fullname, sys.exc_info()) 659 else: 660 try: 661 # This can only happen if someone replaces 662 # a directory with a symlink after the call to 663 # os.scandir or stat.S_ISDIR above. 664 raise OSError("Cannot call rmtree on a symbolic " 665 "link") 666 except OSError: 667 onerror(os.path.islink, fullname, sys.exc_info()) 668 finally: 669 os.close(dirfd) 670 else: 671 try: 672 os.unlink(entry.name, dir_fd=topfd) 673 except OSError: 674 onerror(os.unlink, fullname, sys.exc_info()) 675 676_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 677 os.supports_dir_fd and 678 os.scandir in os.supports_fd and 679 os.stat in os.supports_follow_symlinks) 680 681def rmtree(path, ignore_errors=False, onerror=None): 682 """Recursively delete a directory tree. 683 684 If ignore_errors is set, errors are ignored; otherwise, if onerror 685 is set, it is called to handle the error with arguments (func, 686 path, exc_info) where func is platform and implementation dependent; 687 path is the argument to that function that caused it to fail; and 688 exc_info is a tuple returned by sys.exc_info(). If ignore_errors 689 is false and onerror is None, an exception is raised. 690 691 """ 692 sys.audit("shutil.rmtree", path) 693 if ignore_errors: 694 def onerror(*args): 695 pass 696 elif onerror is None: 697 def onerror(*args): 698 raise 699 if _use_fd_functions: 700 # While the unsafe rmtree works fine on bytes, the fd based does not. 701 if isinstance(path, bytes): 702 path = os.fsdecode(path) 703 # Note: To guard against symlink races, we use the standard 704 # lstat()/open()/fstat() trick. 705 try: 706 orig_st = os.lstat(path) 707 except Exception: 708 onerror(os.lstat, path, sys.exc_info()) 709 return 710 try: 711 fd = os.open(path, os.O_RDONLY) 712 except Exception: 713 onerror(os.open, path, sys.exc_info()) 714 return 715 try: 716 if os.path.samestat(orig_st, os.fstat(fd)): 717 _rmtree_safe_fd(fd, path, onerror) 718 try: 719 os.rmdir(path) 720 except OSError: 721 onerror(os.rmdir, path, sys.exc_info()) 722 else: 723 try: 724 # symlinks to directories are forbidden, see bug #1669 725 raise OSError("Cannot call rmtree on a symbolic link") 726 except OSError: 727 onerror(os.path.islink, path, sys.exc_info()) 728 finally: 729 os.close(fd) 730 else: 731 try: 732 if _rmtree_islink(path): 733 # symlinks to directories are forbidden, see bug #1669 734 raise OSError("Cannot call rmtree on a symbolic link") 735 except OSError: 736 onerror(os.path.islink, path, sys.exc_info()) 737 # can't continue even if onerror hook returns 738 return 739 return _rmtree_unsafe(path, onerror) 740 741# Allow introspection of whether or not the hardening against symlink 742# attacks is supported on the current platform 743rmtree.avoids_symlink_attacks = _use_fd_functions 744 745def _basename(path): 746 """A basename() variant which first strips the trailing slash, if present. 747 Thus we always get the last component of the path, even for directories. 748 749 path: Union[PathLike, str] 750 751 e.g. 752 >>> os.path.basename('/bar/foo') 753 'foo' 754 >>> os.path.basename('/bar/foo/') 755 '' 756 >>> _basename('/bar/foo/') 757 'foo' 758 """ 759 path = os.fspath(path) 760 sep = os.path.sep + (os.path.altsep or '') 761 return os.path.basename(path.rstrip(sep)) 762 763def move(src, dst, copy_function=copy2): 764 """Recursively move a file or directory to another location. This is 765 similar to the Unix "mv" command. Return the file or directory's 766 destination. 767 768 If the destination is a directory or a symlink to a directory, the source 769 is moved inside the directory. The destination path must not already 770 exist. 771 772 If the destination already exists but is not a directory, it may be 773 overwritten depending on os.rename() semantics. 774 775 If the destination is on our current filesystem, then rename() is used. 776 Otherwise, src is copied to the destination and then removed. Symlinks are 777 recreated under the new name if os.rename() fails because of cross 778 filesystem renames. 779 780 The optional `copy_function` argument is a callable that will be used 781 to copy the source or it will be delegated to `copytree`. 782 By default, copy2() is used, but any function that supports the same 783 signature (like copy()) can be used. 784 785 A lot more could be done here... A look at a mv.c shows a lot of 786 the issues this implementation glosses over. 787 788 """ 789 sys.audit("shutil.move", src, dst) 790 real_dst = dst 791 if os.path.isdir(dst): 792 if _samefile(src, dst): 793 # We might be on a case insensitive filesystem, 794 # perform the rename anyway. 795 os.rename(src, dst) 796 return 797 798 # Using _basename instead of os.path.basename is important, as we must 799 # ignore any trailing slash to avoid the basename returning '' 800 real_dst = os.path.join(dst, _basename(src)) 801 802 if os.path.exists(real_dst): 803 raise Error("Destination path '%s' already exists" % real_dst) 804 try: 805 os.rename(src, real_dst) 806 except OSError: 807 if os.path.islink(src): 808 linkto = os.readlink(src) 809 os.symlink(linkto, real_dst) 810 os.unlink(src) 811 elif os.path.isdir(src): 812 if _destinsrc(src, dst): 813 raise Error("Cannot move a directory '%s' into itself" 814 " '%s'." % (src, dst)) 815 if (_is_immutable(src) 816 or (not os.access(src, os.W_OK) and os.listdir(src) 817 and sys.platform == 'darwin')): 818 raise PermissionError("Cannot move the non-empty directory " 819 "'%s': Lacking write permission to '%s'." 820 % (src, src)) 821 copytree(src, real_dst, copy_function=copy_function, 822 symlinks=True) 823 rmtree(src) 824 else: 825 copy_function(src, real_dst) 826 os.unlink(src) 827 return real_dst 828 829def _destinsrc(src, dst): 830 src = os.path.abspath(src) 831 dst = os.path.abspath(dst) 832 if not src.endswith(os.path.sep): 833 src += os.path.sep 834 if not dst.endswith(os.path.sep): 835 dst += os.path.sep 836 return dst.startswith(src) 837 838def _is_immutable(src): 839 st = _stat(src) 840 immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE] 841 return hasattr(st, 'st_flags') and st.st_flags in immutable_states 842 843def _get_gid(name): 844 """Returns a gid, given a group name.""" 845 if name is None: 846 return None 847 848 try: 849 from grp import getgrnam 850 except ImportError: 851 return None 852 853 try: 854 result = getgrnam(name) 855 except KeyError: 856 result = None 857 if result is not None: 858 return result[2] 859 return None 860 861def _get_uid(name): 862 """Returns an uid, given a user name.""" 863 if name is None: 864 return None 865 866 try: 867 from pwd import getpwnam 868 except ImportError: 869 return None 870 871 try: 872 result = getpwnam(name) 873 except KeyError: 874 result = None 875 if result is not None: 876 return result[2] 877 return None 878 879def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, 880 owner=None, group=None, logger=None): 881 """Create a (possibly compressed) tar file from all the files under 882 'base_dir'. 883 884 'compress' must be "gzip" (the default), "bzip2", "xz", or None. 885 886 'owner' and 'group' can be used to define an owner and a group for the 887 archive that is being built. If not provided, the current owner and group 888 will be used. 889 890 The output tar file will be named 'base_name' + ".tar", possibly plus 891 the appropriate compression extension (".gz", ".bz2", or ".xz"). 892 893 Returns the output filename. 894 """ 895 if compress is None: 896 tar_compression = '' 897 elif _ZLIB_SUPPORTED and compress == 'gzip': 898 tar_compression = 'gz' 899 elif _BZ2_SUPPORTED and compress == 'bzip2': 900 tar_compression = 'bz2' 901 elif _LZMA_SUPPORTED and compress == 'xz': 902 tar_compression = 'xz' 903 else: 904 raise ValueError("bad value for 'compress', or compression format not " 905 "supported : {0}".format(compress)) 906 907 import tarfile # late import for breaking circular dependency 908 909 compress_ext = '.' + tar_compression if compress else '' 910 archive_name = base_name + '.tar' + compress_ext 911 archive_dir = os.path.dirname(archive_name) 912 913 if archive_dir and not os.path.exists(archive_dir): 914 if logger is not None: 915 logger.info("creating %s", archive_dir) 916 if not dry_run: 917 os.makedirs(archive_dir) 918 919 # creating the tarball 920 if logger is not None: 921 logger.info('Creating tar archive') 922 923 uid = _get_uid(owner) 924 gid = _get_gid(group) 925 926 def _set_uid_gid(tarinfo): 927 if gid is not None: 928 tarinfo.gid = gid 929 tarinfo.gname = group 930 if uid is not None: 931 tarinfo.uid = uid 932 tarinfo.uname = owner 933 return tarinfo 934 935 if not dry_run: 936 tar = tarfile.open(archive_name, 'w|%s' % tar_compression) 937 try: 938 tar.add(base_dir, filter=_set_uid_gid) 939 finally: 940 tar.close() 941 942 return archive_name 943 944def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): 945 """Create a zip file from all the files under 'base_dir'. 946 947 The output zip file will be named 'base_name' + ".zip". Returns the 948 name of the output zip file. 949 """ 950 import zipfile # late import for breaking circular dependency 951 952 zip_filename = base_name + ".zip" 953 archive_dir = os.path.dirname(base_name) 954 955 if archive_dir and not os.path.exists(archive_dir): 956 if logger is not None: 957 logger.info("creating %s", archive_dir) 958 if not dry_run: 959 os.makedirs(archive_dir) 960 961 if logger is not None: 962 logger.info("creating '%s' and adding '%s' to it", 963 zip_filename, base_dir) 964 965 if not dry_run: 966 with zipfile.ZipFile(zip_filename, "w", 967 compression=zipfile.ZIP_DEFLATED) as zf: 968 path = os.path.normpath(base_dir) 969 if path != os.curdir: 970 zf.write(path, path) 971 if logger is not None: 972 logger.info("adding '%s'", path) 973 for dirpath, dirnames, filenames in os.walk(base_dir): 974 for name in sorted(dirnames): 975 path = os.path.normpath(os.path.join(dirpath, name)) 976 zf.write(path, path) 977 if logger is not None: 978 logger.info("adding '%s'", path) 979 for name in filenames: 980 path = os.path.normpath(os.path.join(dirpath, name)) 981 if os.path.isfile(path): 982 zf.write(path, path) 983 if logger is not None: 984 logger.info("adding '%s'", path) 985 986 return zip_filename 987 988_ARCHIVE_FORMATS = { 989 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"), 990} 991 992if _ZLIB_SUPPORTED: 993 _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')], 994 "gzip'ed tar-file") 995 _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file") 996 997if _BZ2_SUPPORTED: 998 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')], 999 "bzip2'ed tar-file") 1000 1001if _LZMA_SUPPORTED: 1002 _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')], 1003 "xz'ed tar-file") 1004 1005def get_archive_formats(): 1006 """Returns a list of supported formats for archiving and unarchiving. 1007 1008 Each element of the returned sequence is a tuple (name, description) 1009 """ 1010 formats = [(name, registry[2]) for name, registry in 1011 _ARCHIVE_FORMATS.items()] 1012 formats.sort() 1013 return formats 1014 1015def register_archive_format(name, function, extra_args=None, description=''): 1016 """Registers an archive format. 1017 1018 name is the name of the format. function is the callable that will be 1019 used to create archives. If provided, extra_args is a sequence of 1020 (name, value) tuples that will be passed as arguments to the callable. 1021 description can be provided to describe the format, and will be returned 1022 by the get_archive_formats() function. 1023 """ 1024 if extra_args is None: 1025 extra_args = [] 1026 if not callable(function): 1027 raise TypeError('The %s object is not callable' % function) 1028 if not isinstance(extra_args, (tuple, list)): 1029 raise TypeError('extra_args needs to be a sequence') 1030 for element in extra_args: 1031 if not isinstance(element, (tuple, list)) or len(element) !=2: 1032 raise TypeError('extra_args elements are : (arg_name, value)') 1033 1034 _ARCHIVE_FORMATS[name] = (function, extra_args, description) 1035 1036def unregister_archive_format(name): 1037 del _ARCHIVE_FORMATS[name] 1038 1039def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, 1040 dry_run=0, owner=None, group=None, logger=None): 1041 """Create an archive file (eg. zip or tar). 1042 1043 'base_name' is the name of the file to create, minus any format-specific 1044 extension; 'format' is the archive format: one of "zip", "tar", "gztar", 1045 "bztar", or "xztar". Or any other registered format. 1046 1047 'root_dir' is a directory that will be the root directory of the 1048 archive; ie. we typically chdir into 'root_dir' before creating the 1049 archive. 'base_dir' is the directory where we start archiving from; 1050 ie. 'base_dir' will be the common prefix of all files and 1051 directories in the archive. 'root_dir' and 'base_dir' both default 1052 to the current directory. Returns the name of the archive file. 1053 1054 'owner' and 'group' are used when creating a tar archive. By default, 1055 uses the current owner and group. 1056 """ 1057 sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir) 1058 save_cwd = os.getcwd() 1059 if root_dir is not None: 1060 if logger is not None: 1061 logger.debug("changing into '%s'", root_dir) 1062 base_name = os.path.abspath(base_name) 1063 if not dry_run: 1064 os.chdir(root_dir) 1065 1066 if base_dir is None: 1067 base_dir = os.curdir 1068 1069 kwargs = {'dry_run': dry_run, 'logger': logger} 1070 1071 try: 1072 format_info = _ARCHIVE_FORMATS[format] 1073 except KeyError: 1074 raise ValueError("unknown archive format '%s'" % format) from None 1075 1076 func = format_info[0] 1077 for arg, val in format_info[1]: 1078 kwargs[arg] = val 1079 1080 if format != 'zip': 1081 kwargs['owner'] = owner 1082 kwargs['group'] = group 1083 1084 try: 1085 filename = func(base_name, base_dir, **kwargs) 1086 finally: 1087 if root_dir is not None: 1088 if logger is not None: 1089 logger.debug("changing back to '%s'", save_cwd) 1090 os.chdir(save_cwd) 1091 1092 return filename 1093 1094 1095def get_unpack_formats(): 1096 """Returns a list of supported formats for unpacking. 1097 1098 Each element of the returned sequence is a tuple 1099 (name, extensions, description) 1100 """ 1101 formats = [(name, info[0], info[3]) for name, info in 1102 _UNPACK_FORMATS.items()] 1103 formats.sort() 1104 return formats 1105 1106def _check_unpack_options(extensions, function, extra_args): 1107 """Checks what gets registered as an unpacker.""" 1108 # first make sure no other unpacker is registered for this extension 1109 existing_extensions = {} 1110 for name, info in _UNPACK_FORMATS.items(): 1111 for ext in info[0]: 1112 existing_extensions[ext] = name 1113 1114 for extension in extensions: 1115 if extension in existing_extensions: 1116 msg = '%s is already registered for "%s"' 1117 raise RegistryError(msg % (extension, 1118 existing_extensions[extension])) 1119 1120 if not callable(function): 1121 raise TypeError('The registered function must be a callable') 1122 1123 1124def register_unpack_format(name, extensions, function, extra_args=None, 1125 description=''): 1126 """Registers an unpack format. 1127 1128 `name` is the name of the format. `extensions` is a list of extensions 1129 corresponding to the format. 1130 1131 `function` is the callable that will be 1132 used to unpack archives. The callable will receive archives to unpack. 1133 If it's unable to handle an archive, it needs to raise a ReadError 1134 exception. 1135 1136 If provided, `extra_args` is a sequence of 1137 (name, value) tuples that will be passed as arguments to the callable. 1138 description can be provided to describe the format, and will be returned 1139 by the get_unpack_formats() function. 1140 """ 1141 if extra_args is None: 1142 extra_args = [] 1143 _check_unpack_options(extensions, function, extra_args) 1144 _UNPACK_FORMATS[name] = extensions, function, extra_args, description 1145 1146def unregister_unpack_format(name): 1147 """Removes the pack format from the registry.""" 1148 del _UNPACK_FORMATS[name] 1149 1150def _ensure_directory(path): 1151 """Ensure that the parent directory of `path` exists""" 1152 dirname = os.path.dirname(path) 1153 if not os.path.isdir(dirname): 1154 os.makedirs(dirname) 1155 1156def _unpack_zipfile(filename, extract_dir): 1157 """Unpack zip `filename` to `extract_dir` 1158 """ 1159 import zipfile # late import for breaking circular dependency 1160 1161 if not zipfile.is_zipfile(filename): 1162 raise ReadError("%s is not a zip file" % filename) 1163 1164 zip = zipfile.ZipFile(filename) 1165 try: 1166 for info in zip.infolist(): 1167 name = info.filename 1168 1169 # don't extract absolute paths or ones with .. in them 1170 if name.startswith('/') or '..' in name: 1171 continue 1172 1173 targetpath = os.path.join(extract_dir, *name.split('/')) 1174 if not targetpath: 1175 continue 1176 1177 _ensure_directory(targetpath) 1178 if not name.endswith('/'): 1179 # file 1180 with zip.open(name, 'r') as source, \ 1181 open(targetpath, 'wb') as target: 1182 copyfileobj(source, target) 1183 finally: 1184 zip.close() 1185 1186def _unpack_tarfile(filename, extract_dir): 1187 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` 1188 """ 1189 import tarfile # late import for breaking circular dependency 1190 try: 1191 tarobj = tarfile.open(filename) 1192 except tarfile.TarError: 1193 raise ReadError( 1194 "%s is not a compressed or uncompressed tar file" % filename) 1195 try: 1196 tarobj.extractall(extract_dir) 1197 finally: 1198 tarobj.close() 1199 1200_UNPACK_FORMATS = { 1201 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), 1202 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"), 1203} 1204 1205if _ZLIB_SUPPORTED: 1206 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [], 1207 "gzip'ed tar-file") 1208 1209if _BZ2_SUPPORTED: 1210 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [], 1211 "bzip2'ed tar-file") 1212 1213if _LZMA_SUPPORTED: 1214 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [], 1215 "xz'ed tar-file") 1216 1217def _find_unpack_format(filename): 1218 for name, info in _UNPACK_FORMATS.items(): 1219 for extension in info[0]: 1220 if filename.endswith(extension): 1221 return name 1222 return None 1223 1224def unpack_archive(filename, extract_dir=None, format=None): 1225 """Unpack an archive. 1226 1227 `filename` is the name of the archive. 1228 1229 `extract_dir` is the name of the target directory, where the archive 1230 is unpacked. If not provided, the current working directory is used. 1231 1232 `format` is the archive format: one of "zip", "tar", "gztar", "bztar", 1233 or "xztar". Or any other registered format. If not provided, 1234 unpack_archive will use the filename extension and see if an unpacker 1235 was registered for that extension. 1236 1237 In case none is found, a ValueError is raised. 1238 """ 1239 sys.audit("shutil.unpack_archive", filename, extract_dir, format) 1240 1241 if extract_dir is None: 1242 extract_dir = os.getcwd() 1243 1244 extract_dir = os.fspath(extract_dir) 1245 filename = os.fspath(filename) 1246 1247 if format is not None: 1248 try: 1249 format_info = _UNPACK_FORMATS[format] 1250 except KeyError: 1251 raise ValueError("Unknown unpack format '{0}'".format(format)) from None 1252 1253 func = format_info[1] 1254 func(filename, extract_dir, **dict(format_info[2])) 1255 else: 1256 # we need to look at the registered unpackers supported extensions 1257 format = _find_unpack_format(filename) 1258 if format is None: 1259 raise ReadError("Unknown archive format '{0}'".format(filename)) 1260 1261 func = _UNPACK_FORMATS[format][1] 1262 kwargs = dict(_UNPACK_FORMATS[format][2]) 1263 func(filename, extract_dir, **kwargs) 1264 1265 1266if hasattr(os, 'statvfs'): 1267 1268 __all__.append('disk_usage') 1269 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1270 _ntuple_diskusage.total.__doc__ = 'Total space in bytes' 1271 _ntuple_diskusage.used.__doc__ = 'Used space in bytes' 1272 _ntuple_diskusage.free.__doc__ = 'Free space in bytes' 1273 1274 def disk_usage(path): 1275 """Return disk usage statistics about the given path. 1276 1277 Returned value is a named tuple with attributes 'total', 'used' and 1278 'free', which are the amount of total, used and free space, in bytes. 1279 """ 1280 st = os.statvfs(path) 1281 free = st.f_bavail * st.f_frsize 1282 total = st.f_blocks * st.f_frsize 1283 used = (st.f_blocks - st.f_bfree) * st.f_frsize 1284 return _ntuple_diskusage(total, used, free) 1285 1286elif _WINDOWS: 1287 1288 __all__.append('disk_usage') 1289 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1290 1291 def disk_usage(path): 1292 """Return disk usage statistics about the given path. 1293 1294 Returned values is a named tuple with attributes 'total', 'used' and 1295 'free', which are the amount of total, used and free space, in bytes. 1296 """ 1297 total, free = nt._getdiskusage(path) 1298 used = total - free 1299 return _ntuple_diskusage(total, used, free) 1300 1301 1302def chown(path, user=None, group=None): 1303 """Change owner user and group of the given path. 1304 1305 user and group can be the uid/gid or the user/group names, and in that case, 1306 they are converted to their respective uid/gid. 1307 """ 1308 sys.audit('shutil.chown', path, user, group) 1309 1310 if user is None and group is None: 1311 raise ValueError("user and/or group must be set") 1312 1313 _user = user 1314 _group = group 1315 1316 # -1 means don't change it 1317 if user is None: 1318 _user = -1 1319 # user can either be an int (the uid) or a string (the system username) 1320 elif isinstance(user, str): 1321 _user = _get_uid(user) 1322 if _user is None: 1323 raise LookupError("no such user: {!r}".format(user)) 1324 1325 if group is None: 1326 _group = -1 1327 elif not isinstance(group, int): 1328 _group = _get_gid(group) 1329 if _group is None: 1330 raise LookupError("no such group: {!r}".format(group)) 1331 1332 os.chown(path, _user, _group) 1333 1334def get_terminal_size(fallback=(80, 24)): 1335 """Get the size of the terminal window. 1336 1337 For each of the two dimensions, the environment variable, COLUMNS 1338 and LINES respectively, is checked. If the variable is defined and 1339 the value is a positive integer, it is used. 1340 1341 When COLUMNS or LINES is not defined, which is the common case, 1342 the terminal connected to sys.__stdout__ is queried 1343 by invoking os.get_terminal_size. 1344 1345 If the terminal size cannot be successfully queried, either because 1346 the system doesn't support querying, or because we are not 1347 connected to a terminal, the value given in fallback parameter 1348 is used. Fallback defaults to (80, 24) which is the default 1349 size used by many terminal emulators. 1350 1351 The value returned is a named tuple of type os.terminal_size. 1352 """ 1353 # columns, lines are the working values 1354 try: 1355 columns = int(os.environ['COLUMNS']) 1356 except (KeyError, ValueError): 1357 columns = 0 1358 1359 try: 1360 lines = int(os.environ['LINES']) 1361 except (KeyError, ValueError): 1362 lines = 0 1363 1364 # only query if necessary 1365 if columns <= 0 or lines <= 0: 1366 try: 1367 size = os.get_terminal_size(sys.__stdout__.fileno()) 1368 except (AttributeError, ValueError, OSError): 1369 # stdout is None, closed, detached, or not a terminal, or 1370 # os.get_terminal_size() is unsupported 1371 size = os.terminal_size(fallback) 1372 if columns <= 0: 1373 columns = size.columns 1374 if lines <= 0: 1375 lines = size.lines 1376 1377 return os.terminal_size((columns, lines)) 1378 1379 1380# Check that a given file can be accessed with the correct mode. 1381# Additionally check that `file` is not a directory, as on Windows 1382# directories pass the os.access check. 1383def _access_check(fn, mode): 1384 return (os.path.exists(fn) and os.access(fn, mode) 1385 and not os.path.isdir(fn)) 1386 1387 1388def which(cmd, mode=os.F_OK | os.X_OK, path=None): 1389 """Given a command, mode, and a PATH string, return the path which 1390 conforms to the given mode on the PATH, or None if there is no such 1391 file. 1392 1393 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result 1394 of os.environ.get("PATH"), or can be overridden with a custom search 1395 path. 1396 1397 """ 1398 # If we're given a path with a directory part, look it up directly rather 1399 # than referring to PATH directories. This includes checking relative to the 1400 # current directory, e.g. ./script 1401 if os.path.dirname(cmd): 1402 if _access_check(cmd, mode): 1403 return cmd 1404 return None 1405 1406 use_bytes = isinstance(cmd, bytes) 1407 1408 if path is None: 1409 path = os.environ.get("PATH", None) 1410 if path is None: 1411 try: 1412 path = os.confstr("CS_PATH") 1413 except (AttributeError, ValueError): 1414 # os.confstr() or CS_PATH is not available 1415 path = os.defpath 1416 # bpo-35755: Don't use os.defpath if the PATH environment variable is 1417 # set to an empty string 1418 1419 # PATH='' doesn't match, whereas PATH=':' looks in the current directory 1420 if not path: 1421 return None 1422 1423 if use_bytes: 1424 path = os.fsencode(path) 1425 path = path.split(os.fsencode(os.pathsep)) 1426 else: 1427 path = os.fsdecode(path) 1428 path = path.split(os.pathsep) 1429 1430 if sys.platform == "win32": 1431 # The current directory takes precedence on Windows. 1432 curdir = os.curdir 1433 if use_bytes: 1434 curdir = os.fsencode(curdir) 1435 if curdir not in path: 1436 path.insert(0, curdir) 1437 1438 # PATHEXT is necessary to check on Windows. 1439 pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT 1440 pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] 1441 1442 if use_bytes: 1443 pathext = [os.fsencode(ext) for ext in pathext] 1444 # See if the given file matches any of the expected path extensions. 1445 # This will allow us to short circuit when given "python.exe". 1446 # If it does match, only test that one, otherwise we have to try 1447 # others. 1448 if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 1449 files = [cmd] 1450 else: 1451 files = [cmd + ext for ext in pathext] 1452 else: 1453 # On other platforms you don't have things like PATHEXT to tell you 1454 # what file suffixes are executable, so just pass on cmd as-is. 1455 files = [cmd] 1456 1457 seen = set() 1458 for dir in path: 1459 normdir = os.path.normcase(dir) 1460 if not normdir in seen: 1461 seen.add(normdir) 1462 for thefile in files: 1463 name = os.path.join(dir, thefile) 1464 if _access_check(name, mode): 1465 return name 1466 return None 1467