"""Utility functions for copying and archiving files and directory trees.

XXX The functions here don't copy the resource fork or other metadata on Mac.

"""

import os
import sys
import stat
import fnmatch
import collections
import errno

try:
    import zlib
    del zlib
    _ZLIB_SUPPORTED = True
except ImportError:
    _ZLIB_SUPPORTED = False

try:
    import bz2
    del bz2
    _BZ2_SUPPORTED = True
except ImportError:
    _BZ2_SUPPORTED = False

try:
    import lzma
    del lzma
    _LZMA_SUPPORTED = True
except ImportError:
    _LZMA_SUPPORTED = False

_WINDOWS = os.name == 'nt'
posix = nt = None
if os.name == 'posix':
    import posix
elif _WINDOWS:
    import nt

COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile")  # macOS

# CMD defaults in Windows 10
_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"

__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
           # disk_usage is added later, if available on the platform

class Error(OSError):
    pass

class SameFileError(Error):
    """Raised when source and destination are the same file."""

class SpecialFileError(OSError):
    """Raised when trying to do a kind of operation (e.g. copying) which is
    not supported on a special file (e.g. a named pipe)"""

class ExecError(OSError):
    """Raised when a command could not be executed"""

class ReadError(OSError):
    """Raised when an archive cannot be read"""

class RegistryError(Exception):
    """Raised when a registry operation with the archiving
    and unpacking registries fails"""

class _GiveupOnFastCopy(Exception):
    """Raised as a signal to fallback on using raw read()/write()
    file copy when fast-copy functions fail to do so.
    """

def _fastcopy_fcopyfile(fsrc, fdst, flags):
    """Copy a regular file content or metadata by using high-performance
    fcopyfile(3) syscall (macOS).
    """
    try:
        infd = fsrc.fileno()
        outfd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    try:
        posix._fcopyfile(infd, outfd, flags)
    except OSError as err:
        err.filename = fsrc.name
        err.filename2 = fdst.name
        if err.errno in {errno.EINVAL, errno.ENOTSUP}:
            raise _GiveupOnFastCopy(err)
        else:
            raise err from None

def _fastcopy_sendfile(fsrc, fdst):
    """Copy data from one regular mmap-like fd to another by using
    high-performance sendfile(2) syscall.
    This should work on Linux >= 2.6.33 only.
    """
    # Note: copyfileobj() is left alone in order to not introduce any
    # unexpected breakage. Possible risks by using zero-copy calls
    # in copyfileobj() are:
    # - fdst cannot be open in "a"(ppend) mode
    # - fsrc and fdst may be open in "t"(ext) mode
    # - fsrc may be a BufferedReader (which hides unread data in a buffer),
    #   GzipFile (which decompresses data), HTTPResponse (which decodes
    #   chunks).
    # - possibly others (e.g. encrypted fs/partition?)
    global _USE_CP_SENDFILE
    try:
        infd = fsrc.fileno()
        outfd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    # Hopefully the whole file will be copied in a single call.
    # sendfile() is called in a loop 'till EOF is reached (0 return)
    # so a bufsize smaller or bigger than the actual file size
    # should not make any difference, also in case the file content
    # changes while being copied.
    try:
        blocksize = max(os.fstat(infd).st_size, 2 ** 23)  # min 8MiB
    except OSError:
        blocksize = 2 ** 27  # 128MiB
    # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
    # see bpo-38319.
    if sys.maxsize < 2 ** 32:
        blocksize = min(blocksize, 2 ** 30)

    offset = 0
    while True:
        try:
            sent = os.sendfile(outfd, infd, offset, blocksize)
        except OSError as err:
            # ...in order to have a more informative exception.
            err.filename = fsrc.name
            err.filename2 = fdst.name

            if err.errno == errno.ENOTSOCK:
                # sendfile() on this platform (probably Linux < 2.6.33)
                # does not support copies between regular files (only
                # sockets).
                _USE_CP_SENDFILE = False
                raise _GiveupOnFastCopy(err)

            if err.errno == errno.ENOSPC:  # filesystem is full
                raise err from None

            # Give up on first call and if no data was copied.
            if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
                raise _GiveupOnFastCopy(err)

            raise err
        else:
            if sent == 0:
                break  # EOF
            offset += sent

def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
    """readinto()/memoryview() based variant of copyfileobj().
    *fsrc* must support readinto() method and both files must be
    open in binary mode.
    """
    # Localize variable access to minimize overhead.
    fsrc_readinto = fsrc.readinto
    fdst_write = fdst.write
    with memoryview(bytearray(length)) as mv:
        while True:
            n = fsrc_readinto(mv)
            if not n:
                break
            elif n < length:
                # Short read: write only the filled prefix of the buffer.
                with mv[:n] as smv:
                    fdst_write(smv)
            else:
                fdst_write(mv)

def copyfileobj(fsrc, fdst, length=0):
    """copy data from file-like object fsrc to file-like object fdst"""
    # Localize variable access to minimize overhead.
    if not length:
        length = COPY_BUFSIZE
    fsrc_read = fsrc.read
    fdst_write = fdst.write
    while True:
        buf = fsrc_read(length)
        if not buf:
            break
        fdst_write(buf)

def _samefile(src, dst):
    """Return True if *src* and *dst* refer to the same file.

    An OSError raised while stat'ing either path is reported as False.
    """
    # Macintosh, Unix.
    if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
        try:
            return os.path.samestat(src.stat(), os.stat(dst))
        except OSError:
            return False

    if hasattr(os.path, 'samefile'):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            return False

    # All other platforms: check for same pathname.
    return (os.path.normcase(os.path.abspath(src)) ==
            os.path.normcase(os.path.abspath(dst)))

def _stat(fn):
    """stat() *fn*, which may be a path or an os.DirEntry."""
    return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)

def _islink(fn):
    """Return True if *fn* (a path or an os.DirEntry) is a symbolic link."""
    return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)

def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst in the most efficient way possible.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    """
    sys.audit("shutil.copyfile", src, dst)

    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    file_size = 0
    for i, fn in enumerate([src, dst]):
        try:
            st = _stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                fn = fn.path if isinstance(fn, os.DirEntry) else fn
                raise SpecialFileError("`%s` is a named pipe" % fn)
            if _WINDOWS and i == 0:
                file_size = st.st_size

    if not follow_symlinks and _islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            try:
                with open(dst, 'wb') as fdst:
                    # macOS
                    if _HAS_FCOPYFILE:
                        try:
                            _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Linux
                    elif _USE_CP_SENDFILE:
                        try:
                            _fastcopy_sendfile(fsrc, fdst)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Windows, see:
                    # https://github.com/python/cpython/pull/7160#discussion_r195405230
                    elif _WINDOWS and file_size > 0:
                        _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
                        return dst

                    copyfileobj(fsrc, fdst)

            # Issue 43219, raise a less confusing exception
            except IsADirectoryError as e:
                if not os.path.exists(dst):
                    raise FileNotFoundError(f'Directory does not exist: {dst}') from e
                else:
                    raise

    return dst

def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks.  If `lchmod` isn't available
    (e.g. Linux) this method does nothing.

    """
    sys.audit("shutil.copymode", src, dst)

    if not follow_symlinks and _islink(src) and os.path.islink(dst):
        if hasattr(os, 'lchmod'):
            stat_func, chmod_func = os.lstat, os.lchmod
        else:
            return
    else:
        stat_func, chmod_func = _stat, os.chmod

    st = stat_func(src)
    chmod_func(dst, stat.S_IMODE(st.st_mode))

if hasattr(os, 'listxattr'):
    def _copyxattr(src, dst, *, follow_symlinks=True):
        """Copy extended filesystem attributes from `src` to `dst`.

        Overwrite existing attributes.

        If `follow_symlinks` is false, symlinks won't be followed.

        """

        try:
            names = os.listxattr(src, follow_symlinks=follow_symlinks)
        except OSError as e:
            if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
                raise
            return
        for name in names:
            try:
                value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
                os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
            except OSError as e:
                if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
                                   errno.EINVAL):
                    raise
else:
    def _copyxattr(*args, **kwargs):
        pass

def copystat(src, dst, *, follow_symlinks=True):
    """Copy file metadata

    Copy the permission bits, last access time, last modification time, and
    flags from `src` to `dst`. On Linux, copystat() also copies the "extended
    attributes" where possible. The file contents, owner, and group are
    unaffected. `src` and `dst` are path-like objects or path names given as
    strings.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    sys.audit("shutil.copystat", src, dst)

    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # follow symlinks (aka don't not follow symlinks)
    follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    if isinstance(src, os.DirEntry):
        st = src.stat(follow_symlinks=follow)
    else:
        st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
        follow_symlinks=follow)
    # We must copy extended attributes before the file is (potentially)
    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
    _copyxattr(src, dst, follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # NOTE(review): the call guarded here is chmod(); the original
        # wording below names the chown family -- presumably the chmod
        # equivalents are meant.  Confirm before relying on the details.
        #
        # if we got a NotImplementedError, it's because
        #   * follow_symlinks=False,
        #   * lchown() is unavailable, and
        #   * either
        #       * fchownat() is unavailable or
        #       * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
        #         (it returned ENOSUP.)
        # therefore we're out of options--we simply cannot chown the
        # symlink.  give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Only "operation not supported" style errors are tolerated.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise

def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    If source and destination are the same file, a SameFileError will be
    raised.

    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copymode(src, dst, follow_symlinks=follow_symlinks)
    return dst

def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata. Return the file's destination.

    Metadata is copied with copystat(). Please see the copystat function
    for more information.

    The destination may be a directory.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst

def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files"""
    def _ignore_patterns(path, names):
        ignored_names = []
        for pattern in patterns:
            ignored_names.extend(fnmatch.filter(names, pattern))
        return set(ignored_names)
    return _ignore_patterns

def _copytree(entries, src, dst, symlinks, ignore, copy_function,
              ignore_dangling_symlinks, dirs_exist_ok=False):
    """Copy the already-scanned *entries* of directory *src* into *dst*.

    Worker for copytree(); the parameters have the meaning documented
    there.  Per-entry failures are collected and raised together as a
    single Error once every entry has been attempted.  Returns *dst*.
    """
    if ignore is not None:
        ignored_names = ignore(os.fspath(src), [x.name for x in entries])
    else:
        ignored_names = set()

    os.makedirs(dst, exist_ok=dirs_exist_ok)
    errors = []
    # copy() and copy2() accept os.DirEntry objects, which saves stat calls.
    use_srcentry = copy_function is copy2 or copy_function is copy

    for srcentry in entries:
        if srcentry.name in ignored_names:
            continue
        srcname = os.path.join(src, srcentry.name)
        dstname = os.path.join(dst, srcentry.name)
        srcobj = srcentry if use_srcentry else srcname
        try:
            is_symlink = srcentry.is_symlink()
            if is_symlink and os.name == 'nt':
                # Special check for directory junctions, which appear as
                # symlinks but we want to recurse.
                lstat = srcentry.stat(follow_symlinks=False)
                if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
                    is_symlink = False
            if is_symlink:
                linkto = os.readlink(srcname)
                if symlinks:
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcobj, dstname, follow_symlinks=not symlinks)
                else:
                    # ignore dangling symlink if the flag is on.
                    # Test the link itself (exists() follows it) rather than
                    # `linkto`: a relative target must be resolved against
                    # the link's directory, not the current directory.
                    if not os.path.exists(srcname) and ignore_dangling_symlinks:
                        continue
                    # otherwise let the copy occur. copy2 will raise an error
                    if srcentry.is_dir():
                        copytree(srcobj, dstname, symlinks, ignore,
                                 copy_function, ignore_dangling_symlinks,
                                 dirs_exist_ok=dirs_exist_ok)
                    else:
                        copy_function(srcobj, dstname)
            elif srcentry.is_dir():
                # Pass ignore_dangling_symlinks down so the flag applies to
                # the whole tree, not only to the top-level directory.
                copytree(srcobj, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks,
                         dirs_exist_ok=dirs_exist_ok)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcobj, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst

def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False, dirs_exist_ok=False):
    """Recursively copy a directory tree and return the destination directory.

    dirs_exist_ok dictates whether to raise an exception in case dst or any
    missing parent directory already exists.

    If exception(s) occur, an Error is raised with a list of reasons.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied. If the file pointed by the symlink doesn't
    exist, an exception will be added in the list of errors raised in
    an Error exception at the end of the copy process.

    You can set the optional ignore_dangling_symlinks flag to true if you
    want to silence this exception. Notice that this has no effect on
    platforms that don't support os.symlink.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    The optional copy_function argument is a callable that will be used
    to copy each file. It will be called with the source path and the
    destination path as arguments. By default, copy2() is used, but any
    function that supports the same signature (like copy()) can be used.

    """
    sys.audit("shutil.copytree", src, dst)
    with os.scandir(src) as itr:
        entries = list(itr)
    return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
                     ignore=ignore, copy_function=copy_function,
                     ignore_dangling_symlinks=ignore_dangling_symlinks,
                     dirs_exist_ok=dirs_exist_ok)

if hasattr(os.stat_result, 'st_file_attributes'):
    # Special handling for directory junctions to make them behave like
    # symlinks for shutil.rmtree, since in general they do not appear as
    # regular links.
    def _rmtree_isdir(entry):
        """Return True if *entry* is a directory that is not a junction."""
        try:
            st = entry.stat(follow_symlinks=False)
            return (stat.S_ISDIR(st.st_mode) and not
                (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
        except OSError:
            return False

    def _rmtree_islink(path):
        """Return True if *path* is a symlink or a directory junction."""
        try:
            st = os.lstat(path)
            return (stat.S_ISLNK(st.st_mode) or
                (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
        except OSError:
            return False
else:
    def _rmtree_isdir(entry):
        """Return True if *entry* is a directory (symlinks not followed)."""
        try:
            return entry.is_dir(follow_symlinks=False)
        except OSError:
            return False

    def _rmtree_islink(path):
        """Return True if *path* is a symbolic link."""
        return os.path.islink(path)

# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Path-based worker for rmtree(); failures are reported to *onerror*."""
    try:
        with os.scandir(path) as scandir_it:
            entries = list(scandir_it)
    except OSError:
        onerror(os.scandir, path, sys.exc_info())
        entries = []
    for entry in entries:
        fullname = entry.path
        if _rmtree_isdir(entry):
            try:
                if entry.is_symlink():
                    # This can only happen if someone replaces
                    # a directory with a symlink after the call to
                    # os.scandir or entry.is_dir above.
                    raise OSError("Cannot call rmtree on a symbolic link")
            except OSError:
                onerror(os.path.islink, fullname, sys.exc_info())
                continue
            _rmtree_unsafe(fullname, onerror)
        else:
            try:
                os.unlink(fullname)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())

# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """fd-based worker for rmtree().

    Removes the contents of the directory open as *topfd*; *path* is only
    used to build the names passed to *onerror*.
    """
    try:
        with os.scandir(topfd) as scandir_it:
            entries = list(scandir_it)
    except OSError as err:
        err.filename = path
        onerror(os.scandir, path, sys.exc_info())
        return
    for entry in entries:
        fullname = os.path.join(path, entry.name)
        try:
            is_dir = entry.is_dir(follow_symlinks=False)
        except OSError:
            is_dir = False
        else:
            if is_dir:
                try:
                    orig_st = entry.stat(follow_symlinks=False)
                    is_dir = stat.S_ISDIR(orig_st.st_mode)
                except OSError:
                    onerror(os.lstat, fullname, sys.exc_info())
                    continue
        if is_dir:
            try:
                dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
                dirfd_closed = False
            except OSError:
                onerror(os.open, fullname, sys.exc_info())
            else:
                try:
                    # Recurse only if what we opened is what we stat'ed.
                    if os.path.samestat(orig_st, os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onerror)
                        try:
                            os.close(dirfd)
                            dirfd_closed = True
                            os.rmdir(entry.name, dir_fd=topfd)
                        except OSError:
                            onerror(os.rmdir, fullname, sys.exc_info())
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # os.scandir or stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError:
                            onerror(os.path.islink, fullname, sys.exc_info())
                finally:
                    if not dirfd_closed:
                        os.close(dirfd)
        else:
            try:
                os.unlink(entry.name, dir_fd=topfd)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())

_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                     os.supports_dir_fd and
                     os.scandir in os.supports_fd and
                     os.stat in os.supports_follow_symlinks)

def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    sys.audit("shutil.rmtree", path)
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY)
            fd_closed = False
        except Exception:
            onerror(os.open, path, sys.exc_info())
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    os.close(fd)
                    fd_closed = True
                    os.rmdir(path)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            if not fd_closed:
                os.close(fd)
    else:
        try:
            if _rmtree_islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, path, sys.exc_info())
            # can't continue even if onerror hook returns
            return
        return _rmtree_unsafe(path, onerror)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions

def _basename(path):
    """A basename() variant which first strips the trailing slash, if present.
    Thus we always get the last component of the path, even for directories.

    path: Union[PathLike, str]

    e.g.
    >>> os.path.basename('/bar/foo')
    'foo'
    >>> os.path.basename('/bar/foo/')
    ''
    >>> _basename('/bar/foo/')
    'foo'
    """
    path = os.fspath(path)
    sep = os.path.sep + (os.path.altsep or '')
    return os.path.basename(path.rstrip(sep))

def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    sys.audit("shutil.move", src, dst)
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            # Honour the documented contract: always return the destination.
            return real_dst

        # Using _basename instead of os.path.basename is important, as we must
        # ignore any trailing slash to avoid the basename returning ''
        real_dst = os.path.join(dst, _basename(src))

        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            if (_is_immutable(src)
                    or (not os.access(src, os.W_OK) and os.listdir(src)
                        and sys.platform == 'darwin')):
                raise PermissionError("Cannot move the non-empty directory "
                                      "'%s': Lacking write permission to '%s'."
                                      % (src, src))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst

def _destinsrc(src, dst):
    """Return True if *dst* is *src* itself or lies inside *src*."""
    src = os.path.abspath(src)
    dst = os.path.abspath(dst)
    # Compare with a trailing separator so '/a/bc' is not "inside" '/a/b'.
    if not src.endswith(os.path.sep):
        src += os.path.sep
    if not dst.endswith(os.path.sep):
        dst += os.path.sep
    return dst.startswith(src)

def _is_immutable(src):
    """Return True if *src* has the user or system immutable flag set.

    Always False when the stat result has no `st_flags` attribute.
    """
    st = _stat(src)
    # st_flags is a bit field: test the immutable bits rather than comparing
    # the whole value, so a file carrying additional flags is still detected.
    immutable_mask = stat.UF_IMMUTABLE | stat.SF_IMMUTABLE
    return bool(getattr(st, 'st_flags', 0) & immutable_mask)

def _get_gid(name):
    """Returns a gid, given a group name."""
    if name is None:
        return None

    try:
        from grp import getgrnam
    except ImportError:
        return None

    try:
        result = getgrnam(name)
    except KeyError:
        result = None
    if result is not None:
        return result[2]
    return None

def _get_uid(name):
    """Returns an uid, given a user name."""
    if name is None:
        return None

    try:
        from pwd import getpwnam
    except ImportError:
        return None

    try:
        result = getpwnam(name)
    except KeyError:
        result = None
    if result is not None:
        return result[2]
    return None

def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    Returns the output filename.
    """
    if compress is None:
        tar_compression = ''
    elif _ZLIB_SUPPORTED and compress == 'gzip':
        tar_compression = 'gz'
    elif _BZ2_SUPPORTED and compress == 'bzip2':
        tar_compression = 'bz2'
    elif _LZMA_SUPPORTED and compress == 'xz':
        tar_compression = 'xz'
    else:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    compress_ext = '.' + tar_compression if compress else ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
        try:
            tar.add(base_dir, filter=_set_uid_gid)
        finally:
            tar.close()

    return archive_name

def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
    """Create a zip file from all the files under 'base_dir'.

    The output zip file will be named 'base_name' + ".zip".  Returns the
    name of the output zip file.
    """
    import zipfile  # late import for breaking circular dependency

    zip_filename = base_name + ".zip"
    archive_dir = os.path.dirname(base_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    if logger is not None:
        logger.info("creating '%s' and adding '%s' to it",
                    zip_filename, base_dir)

    if not dry_run:
        with zipfile.ZipFile(zip_filename, "w",
                             compression=zipfile.ZIP_DEFLATED) as zf:
            path = os.path.normpath(base_dir)
            if path != os.curdir:
                zf.write(path, path)
                if logger is not None:
                    logger.info("adding '%s'", path)
            for dirpath, dirnames, filenames in os.walk(base_dir):
                for name in sorted(dirnames):
                    path = os.path.normpath(os.path.join(dirpath, name))
                    zf.write(path, path)
                    if logger is not None:
                        logger.info("adding '%s'", path)
                for name in filenames:
                    path = os.path.normpath(os.path.join(dirpath, name))
                    if os.path.isfile(path):
                        zf.write(path, path)
                        if logger is not None:
                            logger.info("adding '%s'", path)

    return zip_filename

_ARCHIVE_FORMATS = {
    'tar':   (_make_tarball, [('compress', None)], "uncompressed tar file"),
}

if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
                                "gzip'ed tar-file")
    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
                                "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
                                "xz'ed tar-file")

def get_archive_formats():
    """Returns a list of supported formats for archiving and unarchiving.

    Each element of the returned sequence is a tuple (name, description)
    """
    formats = [(name, registry[2]) for name, registry in
               _ARCHIVE_FORMATS.items()]
    formats.sort()
    return formats

def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) !=2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)

def unregister_archive_format(name):
    """Remove the archive format *name* from the registry.

    Raises KeyError if no such format is registered.
    """
    del _ARCHIVE_FORMATS[name]

def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    Raises ValueError if 'format' is not a registered archive format.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)

    # Validate the format before touching the working directory: raising
    # after chdir() but outside the try/finally below would leave the
    # process in 'root_dir'.
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    save_cwd = os.getcwd()
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    if base_dir is None:
        base_dir = os.curdir

    kwargs = {'dry_run': dry_run, 'logger': logger}

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if format != 'zip':
        kwargs['owner'] = owner
        kwargs['group'] = group

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if root_dir is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename
1105 1106 Each element of the returned sequence is a tuple 1107 (name, extensions, description) 1108 """ 1109 formats = [(name, info[0], info[3]) for name, info in 1110 _UNPACK_FORMATS.items()] 1111 formats.sort() 1112 return formats 1113 1114def _check_unpack_options(extensions, function, extra_args): 1115 """Checks what gets registered as an unpacker.""" 1116 # first make sure no other unpacker is registered for this extension 1117 existing_extensions = {} 1118 for name, info in _UNPACK_FORMATS.items(): 1119 for ext in info[0]: 1120 existing_extensions[ext] = name 1121 1122 for extension in extensions: 1123 if extension in existing_extensions: 1124 msg = '%s is already registered for "%s"' 1125 raise RegistryError(msg % (extension, 1126 existing_extensions[extension])) 1127 1128 if not callable(function): 1129 raise TypeError('The registered function must be a callable') 1130 1131 1132def register_unpack_format(name, extensions, function, extra_args=None, 1133 description=''): 1134 """Registers an unpack format. 1135 1136 `name` is the name of the format. `extensions` is a list of extensions 1137 corresponding to the format. 1138 1139 `function` is the callable that will be 1140 used to unpack archives. The callable will receive archives to unpack. 1141 If it's unable to handle an archive, it needs to raise a ReadError 1142 exception. 1143 1144 If provided, `extra_args` is a sequence of 1145 (name, value) tuples that will be passed as arguments to the callable. 1146 description can be provided to describe the format, and will be returned 1147 by the get_unpack_formats() function. 
1148 """ 1149 if extra_args is None: 1150 extra_args = [] 1151 _check_unpack_options(extensions, function, extra_args) 1152 _UNPACK_FORMATS[name] = extensions, function, extra_args, description 1153 1154def unregister_unpack_format(name): 1155 """Removes the pack format from the registry.""" 1156 del _UNPACK_FORMATS[name] 1157 1158def _ensure_directory(path): 1159 """Ensure that the parent directory of `path` exists""" 1160 dirname = os.path.dirname(path) 1161 if not os.path.isdir(dirname): 1162 os.makedirs(dirname) 1163 1164def _unpack_zipfile(filename, extract_dir): 1165 """Unpack zip `filename` to `extract_dir` 1166 """ 1167 import zipfile # late import for breaking circular dependency 1168 1169 if not zipfile.is_zipfile(filename): 1170 raise ReadError("%s is not a zip file" % filename) 1171 1172 zip = zipfile.ZipFile(filename) 1173 try: 1174 for info in zip.infolist(): 1175 name = info.filename 1176 1177 # don't extract absolute paths or ones with .. in them 1178 if name.startswith('/') or '..' 
in name: 1179 continue 1180 1181 targetpath = os.path.join(extract_dir, *name.split('/')) 1182 if not targetpath: 1183 continue 1184 1185 _ensure_directory(targetpath) 1186 if not name.endswith('/'): 1187 # file 1188 with zip.open(name, 'r') as source, \ 1189 open(targetpath, 'wb') as target: 1190 copyfileobj(source, target) 1191 finally: 1192 zip.close() 1193 1194def _unpack_tarfile(filename, extract_dir): 1195 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` 1196 """ 1197 import tarfile # late import for breaking circular dependency 1198 try: 1199 tarobj = tarfile.open(filename) 1200 except tarfile.TarError: 1201 raise ReadError( 1202 "%s is not a compressed or uncompressed tar file" % filename) 1203 try: 1204 tarobj.extractall(extract_dir) 1205 finally: 1206 tarobj.close() 1207 1208_UNPACK_FORMATS = { 1209 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), 1210 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"), 1211} 1212 1213if _ZLIB_SUPPORTED: 1214 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [], 1215 "gzip'ed tar-file") 1216 1217if _BZ2_SUPPORTED: 1218 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [], 1219 "bzip2'ed tar-file") 1220 1221if _LZMA_SUPPORTED: 1222 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [], 1223 "xz'ed tar-file") 1224 1225def _find_unpack_format(filename): 1226 for name, info in _UNPACK_FORMATS.items(): 1227 for extension in info[0]: 1228 if filename.endswith(extension): 1229 return name 1230 return None 1231 1232def unpack_archive(filename, extract_dir=None, format=None): 1233 """Unpack an archive. 1234 1235 `filename` is the name of the archive. 1236 1237 `extract_dir` is the name of the target directory, where the archive 1238 is unpacked. If not provided, the current working directory is used. 1239 1240 `format` is the archive format: one of "zip", "tar", "gztar", "bztar", 1241 or "xztar". Or any other registered format. 
If not provided, 1242 unpack_archive will use the filename extension and see if an unpacker 1243 was registered for that extension. 1244 1245 In case none is found, a ValueError is raised. 1246 """ 1247 sys.audit("shutil.unpack_archive", filename, extract_dir, format) 1248 1249 if extract_dir is None: 1250 extract_dir = os.getcwd() 1251 1252 extract_dir = os.fspath(extract_dir) 1253 filename = os.fspath(filename) 1254 1255 if format is not None: 1256 try: 1257 format_info = _UNPACK_FORMATS[format] 1258 except KeyError: 1259 raise ValueError("Unknown unpack format '{0}'".format(format)) from None 1260 1261 func = format_info[1] 1262 func(filename, extract_dir, **dict(format_info[2])) 1263 else: 1264 # we need to look at the registered unpackers supported extensions 1265 format = _find_unpack_format(filename) 1266 if format is None: 1267 raise ReadError("Unknown archive format '{0}'".format(filename)) 1268 1269 func = _UNPACK_FORMATS[format][1] 1270 kwargs = dict(_UNPACK_FORMATS[format][2]) 1271 func(filename, extract_dir, **kwargs) 1272 1273 1274if hasattr(os, 'statvfs'): 1275 1276 __all__.append('disk_usage') 1277 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1278 _ntuple_diskusage.total.__doc__ = 'Total space in bytes' 1279 _ntuple_diskusage.used.__doc__ = 'Used space in bytes' 1280 _ntuple_diskusage.free.__doc__ = 'Free space in bytes' 1281 1282 def disk_usage(path): 1283 """Return disk usage statistics about the given path. 1284 1285 Returned value is a named tuple with attributes 'total', 'used' and 1286 'free', which are the amount of total, used and free space, in bytes. 
1287 """ 1288 st = os.statvfs(path) 1289 free = st.f_bavail * st.f_frsize 1290 total = st.f_blocks * st.f_frsize 1291 used = (st.f_blocks - st.f_bfree) * st.f_frsize 1292 return _ntuple_diskusage(total, used, free) 1293 1294elif _WINDOWS: 1295 1296 __all__.append('disk_usage') 1297 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1298 1299 def disk_usage(path): 1300 """Return disk usage statistics about the given path. 1301 1302 Returned values is a named tuple with attributes 'total', 'used' and 1303 'free', which are the amount of total, used and free space, in bytes. 1304 """ 1305 total, free = nt._getdiskusage(path) 1306 used = total - free 1307 return _ntuple_diskusage(total, used, free) 1308 1309 1310def chown(path, user=None, group=None): 1311 """Change owner user and group of the given path. 1312 1313 user and group can be the uid/gid or the user/group names, and in that case, 1314 they are converted to their respective uid/gid. 1315 """ 1316 sys.audit('shutil.chown', path, user, group) 1317 1318 if user is None and group is None: 1319 raise ValueError("user and/or group must be set") 1320 1321 _user = user 1322 _group = group 1323 1324 # -1 means don't change it 1325 if user is None: 1326 _user = -1 1327 # user can either be an int (the uid) or a string (the system username) 1328 elif isinstance(user, str): 1329 _user = _get_uid(user) 1330 if _user is None: 1331 raise LookupError("no such user: {!r}".format(user)) 1332 1333 if group is None: 1334 _group = -1 1335 elif not isinstance(group, int): 1336 _group = _get_gid(group) 1337 if _group is None: 1338 raise LookupError("no such group: {!r}".format(group)) 1339 1340 os.chown(path, _user, _group) 1341 1342def get_terminal_size(fallback=(80, 24)): 1343 """Get the size of the terminal window. 1344 1345 For each of the two dimensions, the environment variable, COLUMNS 1346 and LINES respectively, is checked. If the variable is defined and 1347 the value is a positive integer, it is used. 
1348 1349 When COLUMNS or LINES is not defined, which is the common case, 1350 the terminal connected to sys.__stdout__ is queried 1351 by invoking os.get_terminal_size. 1352 1353 If the terminal size cannot be successfully queried, either because 1354 the system doesn't support querying, or because we are not 1355 connected to a terminal, the value given in fallback parameter 1356 is used. Fallback defaults to (80, 24) which is the default 1357 size used by many terminal emulators. 1358 1359 The value returned is a named tuple of type os.terminal_size. 1360 """ 1361 # columns, lines are the working values 1362 try: 1363 columns = int(os.environ['COLUMNS']) 1364 except (KeyError, ValueError): 1365 columns = 0 1366 1367 try: 1368 lines = int(os.environ['LINES']) 1369 except (KeyError, ValueError): 1370 lines = 0 1371 1372 # only query if necessary 1373 if columns <= 0 or lines <= 0: 1374 try: 1375 size = os.get_terminal_size(sys.__stdout__.fileno()) 1376 except (AttributeError, ValueError, OSError): 1377 # stdout is None, closed, detached, or not a terminal, or 1378 # os.get_terminal_size() is unsupported 1379 size = os.terminal_size(fallback) 1380 if columns <= 0: 1381 columns = size.columns 1382 if lines <= 0: 1383 lines = size.lines 1384 1385 return os.terminal_size((columns, lines)) 1386 1387 1388# Check that a given file can be accessed with the correct mode. 1389# Additionally check that `file` is not a directory, as on Windows 1390# directories pass the os.access check. 1391def _access_check(fn, mode): 1392 return (os.path.exists(fn) and os.access(fn, mode) 1393 and not os.path.isdir(fn)) 1394 1395 1396def which(cmd, mode=os.F_OK | os.X_OK, path=None): 1397 """Given a command, mode, and a PATH string, return the path which 1398 conforms to the given mode on the PATH, or None if there is no such 1399 file. 1400 1401 `mode` defaults to os.F_OK | os.X_OK. 
`path` defaults to the result 1402 of os.environ.get("PATH"), or can be overridden with a custom search 1403 path. 1404 1405 """ 1406 # If we're given a path with a directory part, look it up directly rather 1407 # than referring to PATH directories. This includes checking relative to the 1408 # current directory, e.g. ./script 1409 if os.path.dirname(cmd): 1410 if _access_check(cmd, mode): 1411 return cmd 1412 return None 1413 1414 use_bytes = isinstance(cmd, bytes) 1415 1416 if path is None: 1417 path = os.environ.get("PATH", None) 1418 if path is None: 1419 try: 1420 path = os.confstr("CS_PATH") 1421 except (AttributeError, ValueError): 1422 # os.confstr() or CS_PATH is not available 1423 path = os.defpath 1424 # bpo-35755: Don't use os.defpath if the PATH environment variable is 1425 # set to an empty string 1426 1427 # PATH='' doesn't match, whereas PATH=':' looks in the current directory 1428 if not path: 1429 return None 1430 1431 if use_bytes: 1432 path = os.fsencode(path) 1433 path = path.split(os.fsencode(os.pathsep)) 1434 else: 1435 path = os.fsdecode(path) 1436 path = path.split(os.pathsep) 1437 1438 if sys.platform == "win32": 1439 # The current directory takes precedence on Windows. 1440 curdir = os.curdir 1441 if use_bytes: 1442 curdir = os.fsencode(curdir) 1443 if curdir not in path: 1444 path.insert(0, curdir) 1445 1446 # PATHEXT is necessary to check on Windows. 1447 pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT 1448 pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] 1449 1450 if use_bytes: 1451 pathext = [os.fsencode(ext) for ext in pathext] 1452 # See if the given file matches any of the expected path extensions. 1453 # This will allow us to short circuit when given "python.exe". 1454 # If it does match, only test that one, otherwise we have to try 1455 # others. 
1456 if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 1457 files = [cmd] 1458 else: 1459 files = [cmd + ext for ext in pathext] 1460 else: 1461 # On other platforms you don't have things like PATHEXT to tell you 1462 # what file suffixes are executable, so just pass on cmd as-is. 1463 files = [cmd] 1464 1465 seen = set() 1466 for dir in path: 1467 normdir = os.path.normcase(dir) 1468 if not normdir in seen: 1469 seen.add(normdir) 1470 for thefile in files: 1471 name = os.path.join(dir, thefile) 1472 if _access_check(name, mode): 1473 return name 1474 return None 1475