1"""Utility functions for copying and archiving files and directory trees. 2 3XXX The functions here don't copy the resource fork or other metadata on Mac. 4 5""" 6 7import os 8import sys 9import stat 10import fnmatch 11import collections 12import errno 13 14try: 15 import zlib 16 del zlib 17 _ZLIB_SUPPORTED = True 18except ImportError: 19 _ZLIB_SUPPORTED = False 20 21try: 22 import bz2 23 del bz2 24 _BZ2_SUPPORTED = True 25except ImportError: 26 _BZ2_SUPPORTED = False 27 28try: 29 import lzma 30 del lzma 31 _LZMA_SUPPORTED = True 32except ImportError: 33 _LZMA_SUPPORTED = False 34 35_WINDOWS = os.name == 'nt' 36posix = nt = None 37if os.name == 'posix': 38 import posix 39elif _WINDOWS: 40 import nt 41 42if sys.platform == 'win32': 43 import _winapi 44else: 45 _winapi = None 46 47COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 48# This should never be removed, see rationale in: 49# https://bugs.python.org/issue43743#msg393429 50_USE_CP_SENDFILE = (hasattr(os, "sendfile") 51 and sys.platform.startswith(("linux", "android"))) 52_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS 53 54# CMD defaults in Windows 10 55_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" 56 57__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", 58 "copytree", "move", "rmtree", "Error", "SpecialFileError", 59 "ExecError", "make_archive", "get_archive_formats", 60 "register_archive_format", "unregister_archive_format", 61 "get_unpack_formats", "register_unpack_format", 62 "unregister_unpack_format", "unpack_archive", 63 "ignore_patterns", "chown", "which", "get_terminal_size", 64 "SameFileError"] 65 # disk_usage is added later, if available on the platform 66 67class Error(OSError): 68 pass 69 70class SameFileError(Error): 71 """Raised when source and destination are the same file.""" 72 73class SpecialFileError(OSError): 74 """Raised when trying to do a kind of operation (e.g. copying) which is 75 not supported on a special file (e.g. 
a named pipe)""" 76 77class ExecError(OSError): 78 """Raised when a command could not be executed""" 79 80class ReadError(OSError): 81 """Raised when an archive cannot be read""" 82 83class RegistryError(Exception): 84 """Raised when a registry operation with the archiving 85 and unpacking registries fails""" 86 87class _GiveupOnFastCopy(Exception): 88 """Raised as a signal to fallback on using raw read()/write() 89 file copy when fast-copy functions fail to do so. 90 """ 91 92def _fastcopy_fcopyfile(fsrc, fdst, flags): 93 """Copy a regular file content or metadata by using high-performance 94 fcopyfile(3) syscall (macOS). 95 """ 96 try: 97 infd = fsrc.fileno() 98 outfd = fdst.fileno() 99 except Exception as err: 100 raise _GiveupOnFastCopy(err) # not a regular file 101 102 try: 103 posix._fcopyfile(infd, outfd, flags) 104 except OSError as err: 105 err.filename = fsrc.name 106 err.filename2 = fdst.name 107 if err.errno in {errno.EINVAL, errno.ENOTSUP}: 108 raise _GiveupOnFastCopy(err) 109 else: 110 raise err from None 111 112def _fastcopy_sendfile(fsrc, fdst): 113 """Copy data from one regular mmap-like fd to another by using 114 high-performance sendfile(2) syscall. 115 This should work on Linux >= 2.6.33 only. 116 """ 117 # Note: copyfileobj() is left alone in order to not introduce any 118 # unexpected breakage. Possible risks by using zero-copy calls 119 # in copyfileobj() are: 120 # - fdst cannot be open in "a"(ppend) mode 121 # - fsrc and fdst may be open in "t"(ext) mode 122 # - fsrc may be a BufferedReader (which hides unread data in a buffer), 123 # GzipFile (which decompresses data), HTTPResponse (which decodes 124 # chunks). 125 # - possibly others (e.g. encrypted fs/partition?) 126 global _USE_CP_SENDFILE 127 try: 128 infd = fsrc.fileno() 129 outfd = fdst.fileno() 130 except Exception as err: 131 raise _GiveupOnFastCopy(err) # not a regular file 132 133 # Hopefully the whole file will be copied in a single call. 
134 # sendfile() is called in a loop 'till EOF is reached (0 return) 135 # so a bufsize smaller or bigger than the actual file size 136 # should not make any difference, also in case the file content 137 # changes while being copied. 138 try: 139 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB 140 except OSError: 141 blocksize = 2 ** 27 # 128MiB 142 # On 32-bit architectures truncate to 1GiB to avoid OverflowError, 143 # see bpo-38319. 144 if sys.maxsize < 2 ** 32: 145 blocksize = min(blocksize, 2 ** 30) 146 147 offset = 0 148 while True: 149 try: 150 sent = os.sendfile(outfd, infd, offset, blocksize) 151 except OSError as err: 152 # ...in oder to have a more informative exception. 153 err.filename = fsrc.name 154 err.filename2 = fdst.name 155 156 if err.errno == errno.ENOTSOCK: 157 # sendfile() on this platform (probably Linux < 2.6.33) 158 # does not support copies between regular files (only 159 # sockets). 160 _USE_CP_SENDFILE = False 161 raise _GiveupOnFastCopy(err) 162 163 if err.errno == errno.ENOSPC: # filesystem is full 164 raise err from None 165 166 # Give up on first call and if no data was copied. 167 if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: 168 raise _GiveupOnFastCopy(err) 169 170 raise err 171 else: 172 if sent == 0: 173 break # EOF 174 offset += sent 175 176def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): 177 """readinto()/memoryview() based variant of copyfileobj(). 178 *fsrc* must support readinto() method and both files must be 179 open in binary mode. 180 """ 181 # Localize variable access to minimize overhead. 
182 fsrc_readinto = fsrc.readinto 183 fdst_write = fdst.write 184 with memoryview(bytearray(length)) as mv: 185 while True: 186 n = fsrc_readinto(mv) 187 if not n: 188 break 189 elif n < length: 190 with mv[:n] as smv: 191 fdst_write(smv) 192 break 193 else: 194 fdst_write(mv) 195 196def copyfileobj(fsrc, fdst, length=0): 197 """copy data from file-like object fsrc to file-like object fdst""" 198 if not length: 199 length = COPY_BUFSIZE 200 # Localize variable access to minimize overhead. 201 fsrc_read = fsrc.read 202 fdst_write = fdst.write 203 while buf := fsrc_read(length): 204 fdst_write(buf) 205 206def _samefile(src, dst): 207 # Macintosh, Unix. 208 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): 209 try: 210 return os.path.samestat(src.stat(), os.stat(dst)) 211 except OSError: 212 return False 213 214 if hasattr(os.path, 'samefile'): 215 try: 216 return os.path.samefile(src, dst) 217 except OSError: 218 return False 219 220 # All other platforms: check for same pathname. 221 return (os.path.normcase(os.path.abspath(src)) == 222 os.path.normcase(os.path.abspath(dst))) 223 224def _stat(fn): 225 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) 226 227def _islink(fn): 228 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) 229 230def copyfile(src, dst, *, follow_symlinks=True): 231 """Copy data from src to dst in the most efficient way possible. 232 233 If follow_symlinks is not set and src is a symbolic link, a new 234 symlink will be created instead of copying the file it points to. 235 236 """ 237 sys.audit("shutil.copyfile", src, dst) 238 239 if _samefile(src, dst): 240 raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) 241 242 file_size = 0 243 for i, fn in enumerate([src, dst]): 244 try: 245 st = _stat(fn) 246 except OSError: 247 # File most likely does not exist 248 pass 249 else: 250 # XXX What about other special files? (sockets, devices...) 
251 if stat.S_ISFIFO(st.st_mode): 252 fn = fn.path if isinstance(fn, os.DirEntry) else fn 253 raise SpecialFileError("`%s` is a named pipe" % fn) 254 if _WINDOWS and i == 0: 255 file_size = st.st_size 256 257 if not follow_symlinks and _islink(src): 258 os.symlink(os.readlink(src), dst) 259 else: 260 with open(src, 'rb') as fsrc: 261 try: 262 with open(dst, 'wb') as fdst: 263 # macOS 264 if _HAS_FCOPYFILE: 265 try: 266 _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) 267 return dst 268 except _GiveupOnFastCopy: 269 pass 270 # Linux 271 elif _USE_CP_SENDFILE: 272 try: 273 _fastcopy_sendfile(fsrc, fdst) 274 return dst 275 except _GiveupOnFastCopy: 276 pass 277 # Windows, see: 278 # https://github.com/python/cpython/pull/7160#discussion_r195405230 279 elif _WINDOWS and file_size > 0: 280 _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) 281 return dst 282 283 copyfileobj(fsrc, fdst) 284 285 # Issue 43219, raise a less confusing exception 286 except IsADirectoryError as e: 287 if not os.path.exists(dst): 288 raise FileNotFoundError(f'Directory does not exist: {dst}') from e 289 else: 290 raise 291 292 return dst 293 294def copymode(src, dst, *, follow_symlinks=True): 295 """Copy mode bits from src to dst. 296 297 If follow_symlinks is not set, symlinks aren't followed if and only 298 if both `src` and `dst` are symlinks. If `lchmod` isn't available 299 (e.g. Linux) this method does nothing. 
300 301 """ 302 sys.audit("shutil.copymode", src, dst) 303 304 if not follow_symlinks and _islink(src) and os.path.islink(dst): 305 if hasattr(os, 'lchmod'): 306 stat_func, chmod_func = os.lstat, os.lchmod 307 else: 308 return 309 else: 310 stat_func = _stat 311 if os.name == 'nt' and os.path.islink(dst): 312 def chmod_func(*args): 313 os.chmod(*args, follow_symlinks=True) 314 else: 315 chmod_func = os.chmod 316 317 st = stat_func(src) 318 chmod_func(dst, stat.S_IMODE(st.st_mode)) 319 320if hasattr(os, 'listxattr'): 321 def _copyxattr(src, dst, *, follow_symlinks=True): 322 """Copy extended filesystem attributes from `src` to `dst`. 323 324 Overwrite existing attributes. 325 326 If `follow_symlinks` is false, symlinks won't be followed. 327 328 """ 329 330 try: 331 names = os.listxattr(src, follow_symlinks=follow_symlinks) 332 except OSError as e: 333 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): 334 raise 335 return 336 for name in names: 337 try: 338 value = os.getxattr(src, name, follow_symlinks=follow_symlinks) 339 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) 340 except OSError as e: 341 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, 342 errno.EINVAL, errno.EACCES): 343 raise 344else: 345 def _copyxattr(*args, **kwargs): 346 pass 347 348def copystat(src, dst, *, follow_symlinks=True): 349 """Copy file metadata 350 351 Copy the permission bits, last access time, last modification time, and 352 flags from `src` to `dst`. On Linux, copystat() also copies the "extended 353 attributes" where possible. The file contents, owner, and group are 354 unaffected. `src` and `dst` are path-like objects or path names given as 355 strings. 356 357 If the optional flag `follow_symlinks` is not set, symlinks aren't 358 followed if and only if both `src` and `dst` are symlinks. 
359 """ 360 sys.audit("shutil.copystat", src, dst) 361 362 def _nop(*args, ns=None, follow_symlinks=None): 363 pass 364 365 # follow symlinks (aka don't not follow symlinks) 366 follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) 367 if follow: 368 # use the real function if it exists 369 def lookup(name): 370 return getattr(os, name, _nop) 371 else: 372 # use the real function only if it exists 373 # *and* it supports follow_symlinks 374 def lookup(name): 375 fn = getattr(os, name, _nop) 376 if fn in os.supports_follow_symlinks: 377 return fn 378 return _nop 379 380 if isinstance(src, os.DirEntry): 381 st = src.stat(follow_symlinks=follow) 382 else: 383 st = lookup("stat")(src, follow_symlinks=follow) 384 mode = stat.S_IMODE(st.st_mode) 385 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), 386 follow_symlinks=follow) 387 # We must copy extended attributes before the file is (potentially) 388 # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. 389 _copyxattr(src, dst, follow_symlinks=follow) 390 try: 391 lookup("chmod")(dst, mode, follow_symlinks=follow) 392 except NotImplementedError: 393 # if we got a NotImplementedError, it's because 394 # * follow_symlinks=False, 395 # * lchown() is unavailable, and 396 # * either 397 # * fchownat() is unavailable or 398 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. 399 # (it returned ENOSUP.) 400 # therefore we're out of options--we simply cannot chown the 401 # symlink. give up, suppress the error. 402 # (which is what shutil always did in this circumstance.) 403 pass 404 if hasattr(st, 'st_flags'): 405 try: 406 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) 407 except OSError as why: 408 for err in 'EOPNOTSUPP', 'ENOTSUP': 409 if hasattr(errno, err) and why.errno == getattr(errno, err): 410 break 411 else: 412 raise 413 414def copy(src, dst, *, follow_symlinks=True): 415 """Copy data and mode bits ("cp src dst"). Return the file's destination. 
416 417 The destination may be a directory. 418 419 If follow_symlinks is false, symlinks won't be followed. This 420 resembles GNU's "cp -P src dst". 421 422 If source and destination are the same file, a SameFileError will be 423 raised. 424 425 """ 426 if os.path.isdir(dst): 427 dst = os.path.join(dst, os.path.basename(src)) 428 copyfile(src, dst, follow_symlinks=follow_symlinks) 429 copymode(src, dst, follow_symlinks=follow_symlinks) 430 return dst 431 432def copy2(src, dst, *, follow_symlinks=True): 433 """Copy data and metadata. Return the file's destination. 434 435 Metadata is copied with copystat(). Please see the copystat function 436 for more information. 437 438 The destination may be a directory. 439 440 If follow_symlinks is false, symlinks won't be followed. This 441 resembles GNU's "cp -P src dst". 442 """ 443 if os.path.isdir(dst): 444 dst = os.path.join(dst, os.path.basename(src)) 445 446 if hasattr(_winapi, "CopyFile2"): 447 src_ = os.fsdecode(src) 448 dst_ = os.fsdecode(dst) 449 flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION # for compat 450 if not follow_symlinks: 451 flags |= _winapi.COPY_FILE_COPY_SYMLINK 452 try: 453 _winapi.CopyFile2(src_, dst_, flags) 454 return dst 455 except OSError as exc: 456 if (exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD 457 and not follow_symlinks): 458 # Likely encountered a symlink we aren't allowed to create. 459 # Fall back on the old code 460 pass 461 elif exc.winerror == _winapi.ERROR_ACCESS_DENIED: 462 # Possibly encountered a hidden or readonly file we can't 463 # overwrite. Fall back on old code 464 pass 465 else: 466 raise 467 468 copyfile(src, dst, follow_symlinks=follow_symlinks) 469 copystat(src, dst, follow_symlinks=follow_symlinks) 470 return dst 471 472def ignore_patterns(*patterns): 473 """Function that can be used as copytree() ignore parameter. 
474 475 Patterns is a sequence of glob-style patterns 476 that are used to exclude files""" 477 def _ignore_patterns(path, names): 478 ignored_names = [] 479 for pattern in patterns: 480 ignored_names.extend(fnmatch.filter(names, pattern)) 481 return set(ignored_names) 482 return _ignore_patterns 483 484def _copytree(entries, src, dst, symlinks, ignore, copy_function, 485 ignore_dangling_symlinks, dirs_exist_ok=False): 486 if ignore is not None: 487 ignored_names = ignore(os.fspath(src), [x.name for x in entries]) 488 else: 489 ignored_names = () 490 491 os.makedirs(dst, exist_ok=dirs_exist_ok) 492 errors = [] 493 use_srcentry = copy_function is copy2 or copy_function is copy 494 495 for srcentry in entries: 496 if srcentry.name in ignored_names: 497 continue 498 srcname = os.path.join(src, srcentry.name) 499 dstname = os.path.join(dst, srcentry.name) 500 srcobj = srcentry if use_srcentry else srcname 501 try: 502 is_symlink = srcentry.is_symlink() 503 if is_symlink and os.name == 'nt': 504 # Special check for directory junctions, which appear as 505 # symlinks but we want to recurse. 506 lstat = srcentry.stat(follow_symlinks=False) 507 if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: 508 is_symlink = False 509 if is_symlink: 510 linkto = os.readlink(srcname) 511 if symlinks: 512 # We can't just leave it to `copy_function` because legacy 513 # code with a custom `copy_function` may rely on copytree 514 # doing the right thing. 515 os.symlink(linkto, dstname) 516 copystat(srcobj, dstname, follow_symlinks=not symlinks) 517 else: 518 # ignore dangling symlink if the flag is on 519 if not os.path.exists(linkto) and ignore_dangling_symlinks: 520 continue 521 # otherwise let the copy occur. 
copy2 will raise an error 522 if srcentry.is_dir(): 523 copytree(srcobj, dstname, symlinks, ignore, 524 copy_function, ignore_dangling_symlinks, 525 dirs_exist_ok) 526 else: 527 copy_function(srcobj, dstname) 528 elif srcentry.is_dir(): 529 copytree(srcobj, dstname, symlinks, ignore, copy_function, 530 ignore_dangling_symlinks, dirs_exist_ok) 531 else: 532 # Will raise a SpecialFileError for unsupported file types 533 copy_function(srcobj, dstname) 534 # catch the Error from the recursive copytree so that we can 535 # continue with other files 536 except Error as err: 537 errors.extend(err.args[0]) 538 except OSError as why: 539 errors.append((srcname, dstname, str(why))) 540 try: 541 copystat(src, dst) 542 except OSError as why: 543 # Copying file access times may fail on Windows 544 if getattr(why, 'winerror', None) is None: 545 errors.append((src, dst, str(why))) 546 if errors: 547 raise Error(errors) 548 return dst 549 550def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, 551 ignore_dangling_symlinks=False, dirs_exist_ok=False): 552 """Recursively copy a directory tree and return the destination directory. 553 554 If exception(s) occur, an Error is raised with a list of reasons. 555 556 If the optional symlinks flag is true, symbolic links in the 557 source tree result in symbolic links in the destination tree; if 558 it is false, the contents of the files pointed to by symbolic 559 links are copied. If the file pointed to by the symlink doesn't 560 exist, an exception will be added in the list of errors raised in 561 an Error exception at the end of the copy process. 562 563 You can set the optional ignore_dangling_symlinks flag to true if you 564 want to silence this exception. Notice that this has no effect on 565 platforms that don't support os.symlink. 566 567 The optional ignore argument is a callable. 
If given, it 568 is called with the `src` parameter, which is the directory 569 being visited by copytree(), and `names` which is the list of 570 `src` contents, as returned by os.listdir(): 571 572 callable(src, names) -> ignored_names 573 574 Since copytree() is called recursively, the callable will be 575 called once for each directory that is copied. It returns a 576 list of names relative to the `src` directory that should 577 not be copied. 578 579 The optional copy_function argument is a callable that will be used 580 to copy each file. It will be called with the source path and the 581 destination path as arguments. By default, copy2() is used, but any 582 function that supports the same signature (like copy()) can be used. 583 584 If dirs_exist_ok is false (the default) and `dst` already exists, a 585 `FileExistsError` is raised. If `dirs_exist_ok` is true, the copying 586 operation will continue if it encounters existing directories, and files 587 within the `dst` tree will be overwritten by corresponding files from the 588 `src` tree. 
589 """ 590 sys.audit("shutil.copytree", src, dst) 591 with os.scandir(src) as itr: 592 entries = list(itr) 593 return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks, 594 ignore=ignore, copy_function=copy_function, 595 ignore_dangling_symlinks=ignore_dangling_symlinks, 596 dirs_exist_ok=dirs_exist_ok) 597 598if hasattr(os.stat_result, 'st_file_attributes'): 599 def _rmtree_islink(st): 600 return (stat.S_ISLNK(st.st_mode) or 601 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 602 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 603else: 604 def _rmtree_islink(st): 605 return stat.S_ISLNK(st.st_mode) 606 607# version vulnerable to race conditions 608def _rmtree_unsafe(path, onexc): 609 def onerror(err): 610 if not isinstance(err, FileNotFoundError): 611 onexc(os.scandir, err.filename, err) 612 results = os.walk(path, topdown=False, onerror=onerror, followlinks=os._walk_symlinks_as_files) 613 for dirpath, dirnames, filenames in results: 614 for name in dirnames: 615 fullname = os.path.join(dirpath, name) 616 try: 617 os.rmdir(fullname) 618 except FileNotFoundError: 619 continue 620 except OSError as err: 621 onexc(os.rmdir, fullname, err) 622 for name in filenames: 623 fullname = os.path.join(dirpath, name) 624 try: 625 os.unlink(fullname) 626 except FileNotFoundError: 627 continue 628 except OSError as err: 629 onexc(os.unlink, fullname, err) 630 try: 631 os.rmdir(path) 632 except FileNotFoundError: 633 pass 634 except OSError as err: 635 onexc(os.rmdir, path, err) 636 637# Version using fd-based APIs to protect against races 638def _rmtree_safe_fd(stack, onexc): 639 # Each stack item has four elements: 640 # * func: The first operation to perform: os.lstat, os.close or os.rmdir. 641 # Walking a directory starts with an os.lstat() to detect symlinks; in 642 # this case, func is updated before subsequent operations and passed to 643 # onexc() if an error occurs. 
644 # * dirfd: Open file descriptor, or None if we're processing the top-level 645 # directory given to rmtree() and the user didn't supply dir_fd. 646 # * path: Path of file to operate upon. This is passed to onexc() if an 647 # error occurs. 648 # * orig_entry: os.DirEntry, or None if we're processing the top-level 649 # directory given to rmtree(). We used the cached stat() of the entry to 650 # save a call to os.lstat() when walking subdirectories. 651 func, dirfd, path, orig_entry = stack.pop() 652 name = path if orig_entry is None else orig_entry.name 653 try: 654 if func is os.close: 655 os.close(dirfd) 656 return 657 if func is os.rmdir: 658 os.rmdir(name, dir_fd=dirfd) 659 return 660 661 # Note: To guard against symlink races, we use the standard 662 # lstat()/open()/fstat() trick. 663 assert func is os.lstat 664 if orig_entry is None: 665 orig_st = os.lstat(name, dir_fd=dirfd) 666 else: 667 orig_st = orig_entry.stat(follow_symlinks=False) 668 669 func = os.open # For error reporting. 670 topfd = os.open(name, os.O_RDONLY | os.O_NONBLOCK, dir_fd=dirfd) 671 672 func = os.path.islink # For error reporting. 673 try: 674 if not os.path.samestat(orig_st, os.fstat(topfd)): 675 # Symlinks to directories are forbidden, see GH-46010. 676 raise OSError("Cannot call rmtree on a symbolic link") 677 stack.append((os.rmdir, dirfd, path, orig_entry)) 678 finally: 679 stack.append((os.close, topfd, path, orig_entry)) 680 681 func = os.scandir # For error reporting. 682 with os.scandir(topfd) as scandir_it: 683 entries = list(scandir_it) 684 for entry in entries: 685 fullname = os.path.join(path, entry.name) 686 try: 687 if entry.is_dir(follow_symlinks=False): 688 # Traverse into sub-directory. 
689 stack.append((os.lstat, topfd, fullname, entry)) 690 continue 691 except FileNotFoundError: 692 continue 693 except OSError: 694 pass 695 try: 696 os.unlink(entry.name, dir_fd=topfd) 697 except FileNotFoundError: 698 continue 699 except OSError as err: 700 onexc(os.unlink, fullname, err) 701 except FileNotFoundError as err: 702 if orig_entry is None or func is os.close: 703 err.filename = path 704 onexc(func, path, err) 705 except OSError as err: 706 err.filename = path 707 onexc(func, path, err) 708 709_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 710 os.supports_dir_fd and 711 os.scandir in os.supports_fd and 712 os.stat in os.supports_follow_symlinks) 713 714def rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None): 715 """Recursively delete a directory tree. 716 717 If dir_fd is not None, it should be a file descriptor open to a directory; 718 path will then be relative to that directory. 719 dir_fd may not be implemented on your platform. 720 If it is unavailable, using it will raise a NotImplementedError. 721 722 If ignore_errors is set, errors are ignored; otherwise, if onexc or 723 onerror is set, it is called to handle the error with arguments (func, 724 path, exc_info) where func is platform and implementation dependent; 725 path is the argument to that function that caused it to fail; and 726 the value of exc_info describes the exception. For onexc it is the 727 exception instance, and for onerror it is a tuple as returned by 728 sys.exc_info(). If ignore_errors is false and both onexc and 729 onerror are None, the exception is reraised. 730 731 onerror is deprecated and only remains for backwards compatibility. 732 If both onerror and onexc are set, onerror is ignored and onexc is used. 
733 """ 734 735 sys.audit("shutil.rmtree", path, dir_fd) 736 if ignore_errors: 737 def onexc(*args): 738 pass 739 elif onerror is None and onexc is None: 740 def onexc(*args): 741 raise 742 elif onexc is None: 743 if onerror is None: 744 def onexc(*args): 745 raise 746 else: 747 # delegate to onerror 748 def onexc(*args): 749 func, path, exc = args 750 if exc is None: 751 exc_info = None, None, None 752 else: 753 exc_info = type(exc), exc, exc.__traceback__ 754 return onerror(func, path, exc_info) 755 756 if _use_fd_functions: 757 # While the unsafe rmtree works fine on bytes, the fd based does not. 758 if isinstance(path, bytes): 759 path = os.fsdecode(path) 760 stack = [(os.lstat, dir_fd, path, None)] 761 try: 762 while stack: 763 _rmtree_safe_fd(stack, onexc) 764 finally: 765 # Close any file descriptors still on the stack. 766 while stack: 767 func, fd, path, entry = stack.pop() 768 if func is not os.close: 769 continue 770 try: 771 os.close(fd) 772 except OSError as err: 773 onexc(os.close, path, err) 774 else: 775 if dir_fd is not None: 776 raise NotImplementedError("dir_fd unavailable on this platform") 777 try: 778 st = os.lstat(path) 779 except OSError as err: 780 onexc(os.lstat, path, err) 781 return 782 try: 783 if _rmtree_islink(st): 784 # symlinks to directories are forbidden, see bug #1669 785 raise OSError("Cannot call rmtree on a symbolic link") 786 except OSError as err: 787 onexc(os.path.islink, path, err) 788 # can't continue even if onexc hook returns 789 return 790 return _rmtree_unsafe(path, onexc) 791 792# Allow introspection of whether or not the hardening against symlink 793# attacks is supported on the current platform 794rmtree.avoids_symlink_attacks = _use_fd_functions 795 796def _basename(path): 797 """A basename() variant which first strips the trailing slash, if present. 798 Thus we always get the last component of the path, even for directories. 799 800 path: Union[PathLike, str] 801 802 e.g. 
803 >>> os.path.basename('/bar/foo') 804 'foo' 805 >>> os.path.basename('/bar/foo/') 806 '' 807 >>> _basename('/bar/foo/') 808 'foo' 809 """ 810 path = os.fspath(path) 811 sep = os.path.sep + (os.path.altsep or '') 812 return os.path.basename(path.rstrip(sep)) 813 814def move(src, dst, copy_function=copy2): 815 """Recursively move a file or directory to another location. This is 816 similar to the Unix "mv" command. Return the file or directory's 817 destination. 818 819 If dst is an existing directory or a symlink to a directory, then src is 820 moved inside that directory. The destination path in that directory must 821 not already exist. 822 823 If dst already exists but is not a directory, it may be overwritten 824 depending on os.rename() semantics. 825 826 If the destination is on our current filesystem, then rename() is used. 827 Otherwise, src is copied to the destination and then removed. Symlinks are 828 recreated under the new name if os.rename() fails because of cross 829 filesystem renames. 830 831 The optional `copy_function` argument is a callable that will be used 832 to copy the source or it will be delegated to `copytree`. 833 By default, copy2() is used, but any function that supports the same 834 signature (like copy()) can be used. 835 836 A lot more could be done here... A look at a mv.c shows a lot of 837 the issues this implementation glosses over. 838 839 """ 840 sys.audit("shutil.move", src, dst) 841 real_dst = dst 842 if os.path.isdir(dst): 843 if _samefile(src, dst) and not os.path.islink(src): 844 # We might be on a case insensitive filesystem, 845 # perform the rename anyway. 
846 os.rename(src, dst) 847 return 848 849 # Using _basename instead of os.path.basename is important, as we must 850 # ignore any trailing slash to avoid the basename returning '' 851 real_dst = os.path.join(dst, _basename(src)) 852 853 if os.path.exists(real_dst): 854 raise Error("Destination path '%s' already exists" % real_dst) 855 try: 856 os.rename(src, real_dst) 857 except OSError: 858 if os.path.islink(src): 859 linkto = os.readlink(src) 860 os.symlink(linkto, real_dst) 861 os.unlink(src) 862 elif os.path.isdir(src): 863 if _destinsrc(src, dst): 864 raise Error("Cannot move a directory '%s' into itself" 865 " '%s'." % (src, dst)) 866 if (_is_immutable(src) 867 or (not os.access(src, os.W_OK) and os.listdir(src) 868 and sys.platform == 'darwin')): 869 raise PermissionError("Cannot move the non-empty directory " 870 "'%s': Lacking write permission to '%s'." 871 % (src, src)) 872 copytree(src, real_dst, copy_function=copy_function, 873 symlinks=True) 874 rmtree(src) 875 else: 876 copy_function(src, real_dst) 877 os.unlink(src) 878 return real_dst 879 880def _destinsrc(src, dst): 881 src = os.path.abspath(src) 882 dst = os.path.abspath(dst) 883 if not src.endswith(os.path.sep): 884 src += os.path.sep 885 if not dst.endswith(os.path.sep): 886 dst += os.path.sep 887 return dst.startswith(src) 888 889def _is_immutable(src): 890 st = _stat(src) 891 immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE] 892 return hasattr(st, 'st_flags') and st.st_flags in immutable_states 893 894def _get_gid(name): 895 """Returns a gid, given a group name.""" 896 if name is None: 897 return None 898 899 try: 900 from grp import getgrnam 901 except ImportError: 902 return None 903 904 try: 905 result = getgrnam(name) 906 except KeyError: 907 result = None 908 if result is not None: 909 return result[2] 910 return None 911 912def _get_uid(name): 913 """Returns an uid, given a user name.""" 914 if name is None: 915 return None 916 917 try: 918 from pwd import getpwnam 919 except 
ImportError:
        return None

    try:
        result = getpwnam(name)
    except KeyError:
        result = None
    if result is not None:
        return result[2]
    return None

def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None, root_dir=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    'root_dir', if not None, is joined in front of 'base_dir' to locate the
    files on disk; the names stored in the archive are still based on
    'base_dir' alone. In that case the returned file name is made absolute.

    If 'dry_run' is true, neither the parent directory of the archive nor
    the archive itself is created, but the file name is still computed and
    returned.

    'logger', if not None, receives info() messages about the progress.
    'verbose' is accepted but not used by this function.

    The output tar file will be named 'base_name' + ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    Raises ValueError if 'compress' is not one of the values listed above,
    or if the matching compression module could not be imported.

    Returns the output filename.
    """
    # Each compressed branch is guarded by the module-level *_SUPPORTED flag,
    # so a known but unavailable compressor also ends up in the ValueError.
    if compress is None:
        tar_compression = ''
    elif _ZLIB_SUPPORTED and compress == 'gzip':
        tar_compression = 'gz'
    elif _BZ2_SUPPORTED and compress == 'bzip2':
        tar_compression = 'bz2'
    elif _LZMA_SUPPORTED and compress == 'xz':
        tar_compression = 'xz'
    else:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    # Parsed as ('.' + tar_compression) if compress else '': no extension
    # at all is appended for an uncompressed archive.
    compress_ext = '.' + tar_compression if compress else ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    # Create the directory that will hold the archive (skipped on dry runs).
    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    # Either id may be None, in which case the filter below leaves the
    # corresponding fields of each member untouched.
    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        """Override the owner/group fields of 'tarinfo' for the ids that
        are not None, then return it."""
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
        # Remember the name to store in the archive before base_dir is
        # rewritten to point below root_dir.
        arcname = base_dir
        if root_dir is not None:
            base_dir = os.path.join(root_dir, base_dir)
        try:
            tar.add(base_dir, arcname, filter=_set_uid_gid)
        finally:
            # Always close the archive, even if adding a member failed.
            tar.close()

    if root_dir is not None:
        archive_name = os.path.abspath(archive_name)
    return archive_name

def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
                  logger=None, owner=None, group=None, root_dir=None):
    """Create a zip file from all the files under 'base_dir'.

    'root_dir', if not None, is joined in front of 'base_dir' to locate the
    files on disk; the names stored in the archive are computed relative to
    'root_dir'. In that case the returned file name is made absolute.

    If 'dry_run' is true, neither the parent directory of the archive nor
    the archive itself is created, but the file name is still computed and
    returned.

    'logger', if not None, receives info() messages about the progress.
    'verbose', 'owner' and 'group' are accepted but not used by this
    function.

    The output zip file will be named 'base_name' + ".zip". Returns the
    name of the output zip file.
    """
    import zipfile  # late import for breaking circular dependency

    zip_filename = base_name + ".zip"
    archive_dir = os.path.dirname(base_name)

    # Create the directory that will hold the archive (skipped on dry runs).
    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    if logger is not None:
        logger.info("creating '%s' and adding '%s' to it",
                    zip_filename, base_dir)

    if not dry_run:
        with zipfile.ZipFile(zip_filename, "w",
                             compression=zipfile.ZIP_DEFLATED) as zf:
            # Compute the archive-side name before base_dir is rewritten
            # to point below root_dir.
            arcname = os.path.normpath(base_dir)
            if root_dir is not None:
                base_dir = os.path.join(root_dir, base_dir)
            base_dir = os.path.normpath(base_dir)
            # The top directory gets its own entry unless it is the
            # current directory.
            if arcname != os.curdir:
                zf.write(base_dir, arcname)
                if logger is not None:
                    logger.info("adding '%s'", base_dir)
            for dirpath, dirnames, filenames in os.walk(base_dir):
                # Archive-side counterpart of dirpath.
                arcdirpath = dirpath
                if root_dir is not None:
                    arcdirpath = os.path.relpath(arcdirpath, root_dir)
                arcdirpath = os.path.normpath(arcdirpath)
                # Sub-directories are written as entries of their own,
                # in sorted order.
                for name in sorted(dirnames):
                    path = os.path.join(dirpath, name)
                    arcname = os.path.join(arcdirpath, name)
                    zf.write(path, arcname)
                    if logger is not None:
                        logger.info("adding '%s'", path)
                for name in filenames:
                    path = os.path.join(dirpath, name)
                    path = os.path.normpath(path)
                    # Anything os.path.isfile() rejects is left out.
                    if os.path.isfile(path):
                        arcname = os.path.join(arcdirpath, name)
                        zf.write(path, arcname)
                        if logger is not None:
                            logger.info("adding '%s'", path)

    if root_dir is not None:
        zip_filename = os.path.abspath(zip_filename)
    return zip_filename

# make_archive() reads this attribute with getattr(): archivers that set it
# receive 'root_dir' as a keyword argument; for the others make_archive()
# changes the working directory instead.
_make_tarball.supports_root_dir = True
_make_zipfile.supports_root_dir = True

# Maps the name of the archive format to a tuple containing:
# * the archiving function
# * extra keyword arguments
# * description
_ARCHIVE_FORMATS = {
    'tar': (_make_tarball, [('compress', None)],
            "uncompressed tar file"),
}

# The remaining formats are registered only when the corresponding
# compression module could be imported. Note that 'zip' is registered
# under the zlib check as well.
if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
                                 "gzip'ed tar-file")
    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file")

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
                                 "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
                                 "xz'ed tar-file")

def get_archive_formats():
    """Returns a list of supported formats for archiving.

    Each element of the returned sequence is a tuple (name, description).
    The list is sorted.
    """
    formats = [(name, registry[2]) for name, registry in
               _ARCHIVE_FORMATS.items()]
    formats.sort()
    return formats

def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable, if extra_args is not a
    tuple or list, or if one of its elements is not a 2-item tuple or list.
    An existing entry registered under the same name is replaced.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) !=2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)

def unregister_archive_format(name):
    """Removes the archive format from the registry.

    Raises KeyError if no format is registered under `name`.
    """
    del _ARCHIVE_FORMATS[name]

def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    'dry_run', 'logger', 'owner' and 'group' are passed on to the archiving
    function as keyword arguments, together with the extra arguments the
    format was registered with. 'verbose' is accepted but is not passed on.

    Raises ValueError if 'format' is not a registered archive format, and
    NotADirectoryError if 'root_dir' is given but is not a directory.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    kwargs = {'dry_run': dry_run, 'logger': logger,
              'owner': owner, 'group': group}

    # Registered extra arguments are applied after the common ones, so
    # they win if the same key appears in both.
    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if base_dir is None:
        base_dir = os.curdir

    supports_root_dir = getattr(func, 'supports_root_dir', False)
    # Stays None unless the working directory has to be restored afterwards.
    save_cwd = None
    if root_dir is not None:
        stmd = os.stat(root_dir).st_mode
        if not stat.S_ISDIR(stmd):
            raise NotADirectoryError(errno.ENOTDIR, 'Not a directory', root_dir)

        if supports_root_dir:
            # Support path-like base_name here for backwards-compatibility.
            base_name = os.fspath(base_name)
            kwargs['root_dir'] = root_dir
        else:
            # The archiver cannot handle root_dir itself: emulate it by
            # changing the working directory. base_name is made absolute
            # first so that it keeps referring to the original location.
            save_cwd = os.getcwd()
            if logger is not None:
                logger.debug("changing into '%s'", root_dir)
            base_name = os.path.abspath(base_name)
            if not dry_run:
                os.chdir(root_dir)

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        # Restore the working directory even if the archiver raised.
        if save_cwd is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename


def get_unpack_formats():
    """Returns a list of supported formats for unpacking.

    Each element of the returned sequence is a tuple
    (name, extensions, description). The list is sorted.
    """
    formats = [(name, info[0], info[3]) for name, info in
               _UNPACK_FORMATS.items()]
    formats.sort()
    return formats

def _check_unpack_options(extensions, function, extra_args):
    """Checks what gets registered as an unpacker.

    Raises RegistryError if one of `extensions` is already claimed by a
    registered unpack format, and TypeError if `function` is not callable.
    `extra_args` is accepted but not inspected here.
    """
    # first make sure no other unpacker is registered for this extension
    existing_extensions = {}
    for name, info in _UNPACK_FORMATS.items():
        for ext in info[0]:
            existing_extensions[ext] = name

    for extension in extensions:
        if extension in existing_extensions:
            msg = '%s is already registered for "%s"'
            raise RegistryError(msg % (extension,
                                       existing_extensions[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')


def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Registers an unpack format.

    `name` is the name of the format. `extensions` is a list of extensions
    corresponding to the format.

    `function` is the callable that will be
    used to unpack archives. The callable will receive archives to unpack.
    If it's unable to handle an archive, it needs to raise a ReadError
    exception.

    If provided, `extra_args` is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_unpack_formats() function.

    Raises RegistryError if one of the extensions is already registered,
    and TypeError if `function` is not callable.
    """
    if extra_args is None:
        extra_args = []
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = extensions, function, extra_args, description

def unregister_unpack_format(name):
    """Removes the unpack format from the registry.

    Raises KeyError if no format is registered under `name`.
    """
    del _UNPACK_FORMATS[name]

def _ensure_directory(path):
    """Ensure that the parent directory of `path` exists"""
    dirname = os.path.dirname(path)
    if not os.path.isdir(dirname):
        os.makedirs(dirname)

def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`

    Members whose name starts with '/' or contains '..' anywhere are
    silently skipped. Raises ReadError if `filename` is not a zip file.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    zip = zipfile.ZipFile(filename)
    try:
        for info in zip.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            # (this is a substring test, so a name such as 'a..b' is
            # skipped as well)
            if name.startswith('/') or '..' in name:
                continue

            targetpath = os.path.join(extract_dir, *name.split('/'))
            if not targetpath:
                continue

            # Creates the parent directory of targetpath. For a name
            # ending in '/' the last path component is empty, so the
            # directory named by the member itself is what gets created.
            _ensure_directory(targetpath)
            if not name.endswith('/'):
                # file
                with zip.open(name, 'r') as source, \
                        open(targetpath, 'wb') as target:
                    copyfileobj(source, target)
    finally:
        zip.close()

def _unpack_tarfile(filename, extract_dir, *, filter=None):
    """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`

    `filter` is passed on to the extractall() call unchanged. Raises
    ReadError if opening the archive fails with tarfile.TarError.
    """
    import tarfile  # late import for breaking circular dependency
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise ReadError(
            "%s is not a compressed or uncompressed tar file" % filename)
    try:
        tarobj.extractall(extract_dir, filter=filter)
    finally:
        tarobj.close()

# Maps the name of the unpack format to a tuple containing:
# * extensions
# * the unpacking function
# * extra keyword arguments
# * description
_UNPACK_FORMATS = {
    'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

# Compressed tar formats are registered only when the corresponding
# compression module could be imported.
if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
                                "gzip'ed tar-file")

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
                                "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
                                "xz'ed tar-file")

def _find_unpack_format(filename):
    """Return the name of the first registered unpack format that has an
    extension `filename` ends with, or None if there is no such format."""
    for name, info in _UNPACK_FORMATS.items():
        for extension in info[0]:
            if filename.endswith(extension):
                return name
    return None

def unpack_archive(filename, extract_dir=None, format=None, *, filter=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar".  Or any other registered format.  If not provided,
    unpack_archive will use the filename extension and see if an unpacker
    was registered for that extension.

    If `format` is given but is not a registered format, a ValueError is
    raised. If `format` is not given and no registered extension matches
    `filename`, a ReadError is raised.

    If `filter` is given (i.e. is not None), it is passed to the underlying
    extraction function as the `filter` keyword argument.
    """
    sys.audit("shutil.unpack_archive", filename, extract_dir, format)

    if extract_dir is None:
        extract_dir = os.getcwd()

    extract_dir = os.fspath(extract_dir)
    filename = os.fspath(filename)

    # Forward 'filter' only when the caller supplied one, so that unpack
    # functions without such a parameter keep working.
    if filter is None:
        filter_kwargs = {}
    else:
        filter_kwargs = {'filter': filter}
    if format is not None:
        try:
            format_info = _UNPACK_FORMATS[format]
        except KeyError:
            raise ValueError("Unknown unpack format '{0}'".format(format)) from None

        func = format_info[1]
        func(filename, extract_dir, **dict(format_info[2]), **filter_kwargs)
    else:
        # we need to look at the registered unpackers supported extensions
        format = _find_unpack_format(filename)
        if format is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))

        func = _UNPACK_FORMATS[format][1]
        kwargs = dict(_UNPACK_FORMATS[format][2]) | filter_kwargs
        func(filename, extract_dir, **kwargs)


# disk_usage() is defined, and added to __all__, only where one of the two
# implementations below is available.
if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        st = os.statvfs(path)
        # 'free' is computed from f_bavail, whereas 'used' is derived
        # from f_blocks - f_bfree.
        free = st.f_bavail * st.f_frsize
        total = st.f_blocks * st.f_frsize
        used = (st.f_blocks - st.f_bfree) * st.f_frsize
        return _ntuple_diskusage(total, used, free)

elif _WINDOWS:

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        total, free = nt._getdiskusage(path)
        used = total - free
        return _ntuple_diskusage(total, used, free)


def chown(path, user=None, group=None, *, dir_fd=None, follow_symlinks=True):
    """Change owner user and group of the given path.

    user and group can be the uid/gid or the user/group names, and in that case,
    they are converted to their respective uid/gid.

    If dir_fd is set, it should be an open file descriptor to the directory to
    be used as the root of *path* if it is relative.

    If follow_symlinks is set to False and the last element of the path is a
    symbolic link, chown will modify the link itself and not the file being
    referenced by the link.

    Raises ValueError if both user and group are None, and LookupError if
    a name that has to be looked up cannot be resolved to an id.
    """
    sys.audit('shutil.chown', path, user, group)

    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    _user = user
    _group = group

    # -1 means don't change it
    if user is None:
        _user = -1
    # user can either be an int (the uid) or a string (the system username)
    # (only str values are looked up; anything else is passed on as is)
    elif isinstance(user, str):
        _user = _get_uid(user)
        if _user is None:
            raise LookupError("no such user: {!r}".format(user))

    if group is None:
        _group = -1
    # unlike for user, every non-int group value is looked up
    elif not isinstance(group, int):
        _group = _get_gid(group)
        if _group is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, _user, _group, dir_fd=dir_fd,
             follow_symlinks=follow_symlinks)

def get_terminal_size(fallback=(80, 24)):
    """Get the size of the terminal window.

    For each of the two dimensions, the environment variable, COLUMNS
    and LINES respectively, is checked. If the variable is defined and
    the value is a positive integer, it is used.

    When COLUMNS or LINES is not defined, which is the common case,
    the terminal connected to sys.__stdout__ is queried
    by invoking os.get_terminal_size.

    If the terminal size cannot be successfully queried, either because
    the system doesn't support querying, or because we are not
    connected to a terminal, the value given in fallback parameter
    is used. Fallback defaults to (80, 24) which is the default
    size used by many terminal emulators.

    The value returned is a named tuple of type os.terminal_size.
    """
    # columns, lines are the working values
    # (0 marks a dimension that still has to be determined)
    try:
        columns = int(os.environ['COLUMNS'])
    except (KeyError, ValueError):
        columns = 0

    try:
        lines = int(os.environ['LINES'])
    except (KeyError, ValueError):
        lines = 0

    # only query if necessary
    if columns <= 0 or lines <= 0:
        try:
            size = os.get_terminal_size(sys.__stdout__.fileno())
        except (AttributeError, ValueError, OSError):
            # stdout is None, closed, detached, or not a terminal, or
            # os.get_terminal_size() is unsupported
            size = os.terminal_size(fallback)
        # A queried dimension of 0 is replaced by the fallback as well.
        if columns <= 0:
            columns = size.columns or fallback[0]
        if lines <= 0:
            lines = size.lines or fallback[1]

    return os.terminal_size((columns, lines))


# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
    """Return True if `fn` exists, passes os.access(fn, mode) and is not
    a directory."""
    return (os.path.exists(fn) and os.access(fn, mode)
            and not os.path.isdir(fn))


def _win_path_needs_curdir(cmd, mode):
    """
    On Windows, we can use NeedCurrentDirectoryForExePath to figure out
    if we should add the cwd to PATH when searching for executables if
    the mode is executable.

    When os.X_OK is not part of `mode`, the answer is always true and the
    Windows API is not consulted.
    """
    return (not (mode & os.X_OK)) or _winapi.NeedCurrentDirectoryForExePath(
                os.fsdecode(cmd))


def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Given a command, mode, and a PATH string, return the path which
    conforms to the given mode on the PATH, or None if there is no such
    file.

    `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
    of os.environ.get("PATH"), or can be overridden with a custom search
    path.

    If `cmd` contains a directory part, only that directory is searched
    and `path` is ignored. If `cmd` is a bytes object, the search is
    carried out with bytes paths.

    """
    use_bytes = isinstance(cmd, bytes)

    # If we're given a path with a directory part, look it up directly rather
    # than referring to PATH directories. This includes checking relative to
    # the current directory, e.g. ./script
    dirname, cmd = os.path.split(cmd)
    if dirname:
        path = [dirname]
    else:
        if path is None:
            path = os.environ.get("PATH", None)
            if path is None:
                try:
                    path = os.confstr("CS_PATH")
                except (AttributeError, ValueError):
                    # os.confstr() or CS_PATH is not available
                    path = os.defpath
            # bpo-35755: Don't use os.defpath if the PATH environment variable
            # is set to an empty string

        # PATH='' doesn't match, whereas PATH=':' looks in the current
        # directory
        if not path:
            return None

        # Split the search path into a list of directories of the same
        # type (bytes or str) as cmd.
        if use_bytes:
            path = os.fsencode(path)
            path = path.split(os.fsencode(os.pathsep))
        else:
            path = os.fsdecode(path)
            path = path.split(os.pathsep)

        if sys.platform == "win32" and _win_path_needs_curdir(cmd, mode):
            # Search the current directory before any PATH entry.
            curdir = os.curdir
            if use_bytes:
                curdir = os.fsencode(curdir)
            path.insert(0, curdir)

    if sys.platform == "win32":
        # PATHEXT is necessary to check on Windows.
        pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
        pathext = pathext_source.split(os.pathsep)
        # Drop empty entries and strip trailing dots from the others.
        pathext = [ext.rstrip('.') for ext in pathext if ext]

        if use_bytes:
            pathext = [os.fsencode(ext) for ext in pathext]

        files = [cmd + ext for ext in pathext]

        # If X_OK in mode, simulate the cmd.exe behavior: look at direct
        # match if and only if the extension is in PATHEXT.
        # If X_OK not in mode, simulate the first result of where.exe:
        # always look at direct match before a PATHEXT match.
        normcmd = cmd.upper()
        if not (mode & os.X_OK) or any(normcmd.endswith(ext.upper()) for ext in pathext):
            files.insert(0, cmd)
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        files = [cmd]

    # Directories that normalize to the same name are searched only once;
    # the first candidate passing the access check wins.
    seen = set()
    for dir in path:
        normdir = os.path.normcase(dir)
        if normdir not in seen:
            seen.add(normdir)
            for thefile in files:
                name = os.path.join(dir, thefile)
                if _access_check(name, mode):
                    return name
    return None