• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Utility functions for copying and archiving files and directory trees.
2
3XXX The functions here don't copy the resource fork or other metadata on Mac.
4
5"""
6
7import os
8import sys
9import stat
10import fnmatch
11import collections
12import errno
13
# Probe for the optional compression modules. Each module is imported only
# to find out whether the import succeeds and is deleted again right away;
# the matching flag records the outcome.
try:
    import zlib
    del zlib
    _ZLIB_SUPPORTED = True
except ImportError:
    _ZLIB_SUPPORTED = False

try:
    import bz2
    del bz2
    _BZ2_SUPPORTED = True
except ImportError:
    _BZ2_SUPPORTED = False

try:
    import lzma
    del lzma
    _LZMA_SUPPORTED = True
except ImportError:
    _LZMA_SUPPORTED = False
34
_WINDOWS = os.name == 'nt'
# Bind both platform module names up front so that either one can be tested
# for truthiness regardless of which (if any) gets imported below.
posix = nt = None
if os.name == 'posix':
    import posix
elif _WINDOWS:
    import nt

# _winapi is imported on win32 only; elsewhere the name is bound to None.
if sys.platform == 'win32':
    import _winapi
else:
    _winapi = None

# Default chunk size of the buffered copy helpers: 1 MiB on Windows,
# 64 KiB everywhere else.
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
# This should never be removed, see rationale in:
# https://bugs.python.org/issue43743#msg393429
_USE_CP_SENDFILE = (hasattr(os, "sendfile")
                    and sys.platform.startswith(("linux", "android")))
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile")  # macOS

# CMD defaults in Windows 10
_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"
56
# Public names exported by a star-import of this module.
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
# disk_usage is added later, if available on the platform
66
class Error(OSError):
    """Generic error raised by the functions of this module.

    The directory-tree copy raises it with a list of
    (source, destination, reason) tuples as its single argument.
    """


class SameFileError(Error):
    """Source and destination turned out to be the same file."""


class SpecialFileError(OSError):
    """An operation (e.g. copying) was attempted on a special file
    (e.g. a named pipe) that does not support it."""


class ExecError(OSError):
    """A command could not be executed."""


class ReadError(OSError):
    """An archive could not be read."""


class RegistryError(Exception):
    """An operation on the archiving or unpacking registries failed."""


class _GiveupOnFastCopy(Exception):
    """Internal signal: a fast-copy function could not do the job and
    the caller should fall back on a raw read()/write() file copy.
    """
91
def _fastcopy_fcopyfile(fsrc, fdst, flags):
    """Copy the content or metadata of a regular file with the
    high-performance fcopyfile(3) syscall (macOS).

    Raise _GiveupOnFastCopy if the file objects expose no descriptor or
    if the syscall fails with EINVAL or ENOTSUP. Any other OSError is
    re-raised with `filename` and `filename2` set to the file names.
    """
    try:
        src_fd, dst_fd = fsrc.fileno(), fdst.fileno()
    except Exception as exc:
        raise _GiveupOnFastCopy(exc)  # not a regular file

    try:
        posix._fcopyfile(src_fd, dst_fd, flags)
    except OSError as exc:
        exc.filename = fsrc.name
        exc.filename2 = fdst.name
        if exc.errno not in {errno.EINVAL, errno.ENOTSUP}:
            raise exc from None
        raise _GiveupOnFastCopy(exc)
111
def _fastcopy_sendfile(fsrc, fdst):
    """Copy data from one regular mmap-like fd to another by using
    high-performance sendfile(2) syscall.
    This should work on Linux >= 2.6.33 only.

    Raise _GiveupOnFastCopy when the file objects expose no file
    descriptor, when sendfile() fails with ENOTSOCK, or when it fails on
    the first call before anything was written to the destination, so
    that the caller can fall back on a plain read()/write() copy. Other
    OSErrors propagate with `filename` and `filename2` set to the names
    of the two files.
    """
    # Note: copyfileobj() is left alone in order to not introduce any
    # unexpected breakage. Possible risks by using zero-copy calls
    # in copyfileobj() are:
    # - fdst cannot be open in "a"(ppend) mode
    # - fsrc and fdst may be open in "t"(ext) mode
    # - fsrc may be a BufferedReader (which hides unread data in a buffer),
    #   GzipFile (which decompresses data), HTTPResponse (which decodes
    #   chunks).
    # - possibly others (e.g. encrypted fs/partition?)
    global _USE_CP_SENDFILE
    try:
        infd = fsrc.fileno()
        outfd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    # Hopefully the whole file will be copied in a single call.
    # sendfile() is called in a loop 'till EOF is reached (0 return)
    # so a bufsize smaller or bigger than the actual file size
    # should not make any difference, also in case the file content
    # changes while being copied.
    try:
        blocksize = max(os.fstat(infd).st_size, 2 ** 23)  # min 8MiB
    except OSError:
        blocksize = 2 ** 27  # 128MiB
    # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
    # see bpo-38319.
    if sys.maxsize < 2 ** 32:
        blocksize = min(blocksize, 2 ** 30)

    offset = 0
    while True:
        try:
            sent = os.sendfile(outfd, infd, offset, blocksize)
        except OSError as err:
            # Attach both file names in order to have a more informative
            # exception.
            err.filename = fsrc.name
            err.filename2 = fdst.name

            if err.errno == errno.ENOTSOCK:
                # sendfile() on this platform (probably Linux < 2.6.33)
                # does not support copies between regular files (only
                # sockets). Switch the fast path off for later calls too.
                _USE_CP_SENDFILE = False
                raise _GiveupOnFastCopy(err)

            if err.errno == errno.ENOSPC:  # filesystem is full
                raise err from None

            # Give up on first call and if no data was copied.
            if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0:
                raise _GiveupOnFastCopy(err)

            raise err
        else:
            if sent == 0:
                break  # EOF
            offset += sent
175
176def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
177    """readinto()/memoryview() based variant of copyfileobj().
178    *fsrc* must support readinto() method and both files must be
179    open in binary mode.
180    """
181    # Localize variable access to minimize overhead.
182    fsrc_readinto = fsrc.readinto
183    fdst_write = fdst.write
184    with memoryview(bytearray(length)) as mv:
185        while True:
186            n = fsrc_readinto(mv)
187            if not n:
188                break
189            elif n < length:
190                with mv[:n] as smv:
191                    fdst_write(smv)
192                break
193            else:
194                fdst_write(mv)
195
def copyfileobj(fsrc, fdst, length=0):
    """Copy data from file-like object fsrc to file-like object fdst.

    Data is moved in chunks of *length*; a false *length* (the default)
    selects COPY_BUFSIZE. Copying stops at the first empty read.
    """
    chunk_size = length or COPY_BUFSIZE
    # Bind the bound methods once to keep the loop overhead low.
    read, write = fsrc.read, fdst.write
    while True:
        chunk = read(chunk_size)
        if not chunk:
            break
        write(chunk)
205
206def _samefile(src, dst):
207    # Macintosh, Unix.
208    if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
209        try:
210            return os.path.samestat(src.stat(), os.stat(dst))
211        except OSError:
212            return False
213
214    if hasattr(os.path, 'samefile'):
215        try:
216            return os.path.samefile(src, dst)
217        except OSError:
218            return False
219
220    # All other platforms: check for same pathname.
221    return (os.path.normcase(os.path.abspath(src)) ==
222            os.path.normcase(os.path.abspath(dst)))
223
224def _stat(fn):
225    return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn)
226
227def _islink(fn):
228    return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn)
229
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst in the most efficient way possible.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Return dst. Raise SameFileError if src and dst are the same file,
    SpecialFileError if either of them is a named pipe, and
    FileNotFoundError if opening dst fails with IsADirectoryError although
    dst does not exist.

    """
    sys.audit("shutil.copyfile", src, dst)

    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    # Size of src; only recorded on Windows, where it bounds the buffer of
    # the readinto() based copy below.
    file_size = 0
    for i, fn in enumerate([src, dst]):
        try:
            st = _stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                fn = fn.path if isinstance(fn, os.DirEntry) else fn
                raise SpecialFileError("`%s` is a named pipe" % fn)
            if _WINDOWS and i == 0:
                file_size = st.st_size

    if not follow_symlinks and _islink(src):
        # Recreate the link itself instead of copying what it points to.
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            try:
                with open(dst, 'wb') as fdst:
                    # Try the platform specific fast path first. When it
                    # gives up, fall through to the generic copyfileobj()
                    # call at the end of this block.
                    # macOS
                    if _HAS_FCOPYFILE:
                        try:
                            _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Linux
                    elif _USE_CP_SENDFILE:
                        try:
                            _fastcopy_sendfile(fsrc, fdst)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Windows, see:
                    # https://github.com/python/cpython/pull/7160#discussion_r195405230
                    elif _WINDOWS and file_size > 0:
                        _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
                        return dst

                    copyfileobj(fsrc, fdst)

            # Issue 43219, raise a less confusing exception
            except IsADirectoryError as e:
                if not os.path.exists(dst):
                    raise FileNotFoundError(f'Directory does not exist: {dst}') from e
                else:
                    raise

    return dst
293
def copymode(src, dst, *, follow_symlinks=True):
    """Copy the permission bits of src onto dst.

    With follow_symlinks false, the links themselves are operated on if
    and only if both `src` and `dst` are symlinks. That requires
    `lchmod`; where it is missing (e.g. Linux) nothing is done.

    """
    sys.audit("shutil.copymode", src, dst)

    operate_on_links = (not follow_symlinks and _islink(src)
                        and os.path.islink(dst))
    if operate_on_links:
        if not hasattr(os, 'lchmod'):
            return
        read_stat, apply_mode = os.lstat, os.lchmod
    elif os.name == 'nt' and os.path.islink(dst):
        read_stat = _stat

        def apply_mode(*args):
            os.chmod(*args, follow_symlinks=True)
    else:
        read_stat, apply_mode = _stat, os.chmod

    apply_mode(dst, stat.S_IMODE(read_stat(src).st_mode))
319
320if hasattr(os, 'listxattr'):
321    def _copyxattr(src, dst, *, follow_symlinks=True):
322        """Copy extended filesystem attributes from `src` to `dst`.
323
324        Overwrite existing attributes.
325
326        If `follow_symlinks` is false, symlinks won't be followed.
327
328        """
329
330        try:
331            names = os.listxattr(src, follow_symlinks=follow_symlinks)
332        except OSError as e:
333            if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
334                raise
335            return
336        for name in names:
337            try:
338                value = os.getxattr(src, name, follow_symlinks=follow_symlinks)
339                os.setxattr(dst, name, value, follow_symlinks=follow_symlinks)
340            except OSError as e:
341                if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
342                                   errno.EINVAL, errno.EACCES):
343                    raise
344else:
345    def _copyxattr(*args, **kwargs):
346        pass
347
def copystat(src, dst, *, follow_symlinks=True):
    """Copy file metadata

    Copy the permission bits, last access time, last modification time, and
    flags from `src` to `dst`. On Linux, copystat() also copies the "extended
    attributes" where possible. The file contents, owner, and group are
    unaffected. `src` and `dst` are path-like objects or path names given as
    strings.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    sys.audit("shutil.copystat", src, dst)

    # Stand-in for an os function that is missing or unusable: accepts the
    # same call shapes as the real ones and does nothing.
    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # Follow symlinks unless the caller asked not to *and* both src and dst
    # are symlinks.
    follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    if isinstance(src, os.DirEntry):
        st = src.stat(follow_symlinks=follow)
    else:
        st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
        follow_symlinks=follow)
    # We must copy extended attributes before the file is (potentially)
    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
    _copyxattr(src, dst, follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # chmod() raised NotImplementedError. Presumably that happens
        # because
        #   * follow_symlinks=False,
        #   * lchmod() is unavailable, and
        #   * either
        #       * fchmodat() is unavailable or
        #       * fchmodat() doesn't implement AT_SYMLINK_NOFOLLOW.
        #         (it returned ENOTSUP.)
        # therefore we're out of options--we simply cannot chmod the
        # symlink.  give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Ignore the failure when the errno is EOPNOTSUPP or ENOTSUP
            # (whichever of the two this platform defines); re-raise
            # anything else.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise
413
def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    When dst is a directory, the file is copied into it under the base
    name of src.

    With follow_symlinks false, symlinks are not followed, which
    resembles GNU's "cp -P src dst".

    A SameFileError is raised if source and destination are the same
    file.

    """
    target = (os.path.join(dst, os.path.basename(src))
              if os.path.isdir(dst) else dst)
    copyfile(src, target, follow_symlinks=follow_symlinks)
    copymode(src, target, follow_symlinks=follow_symlinks)
    return target
431
def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata. Return the file's destination.

    Metadata is copied with copystat(); see that function for details.

    When dst is a directory, the file is copied into it under the base
    name of src.

    With follow_symlinks false, symlinks are not followed, which
    resembles GNU's "cp -P src dst".
    """
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    if hasattr(_winapi, "CopyFile2"):
        native_src, native_dst = os.fsdecode(src), os.fsdecode(dst)
        copy_flags = _winapi.COPY_FILE_ALLOW_DECRYPTED_DESTINATION  # for compat
        if not follow_symlinks:
            copy_flags |= _winapi.COPY_FILE_COPY_SYMLINK
        try:
            _winapi.CopyFile2(native_src, native_dst, copy_flags)
        except OSError as exc:
            # Likely encountered a symlink we aren't allowed to create.
            symlink_refused = (
                not follow_symlinks
                and exc.winerror == _winapi.ERROR_PRIVILEGE_NOT_HELD)
            # Possibly encountered a hidden or readonly file we can't
            # overwrite.
            overwrite_refused = exc.winerror == _winapi.ERROR_ACCESS_DENIED
            if not (symlink_refused or overwrite_refused):
                raise
            # Otherwise fall back on the old code below.
        else:
            return dst

    copyfile(src, dst, follow_symlinks=follow_symlinks)
    copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst
471
def ignore_patterns(*patterns):
    """Build a callable suitable as the copytree() ignore parameter.

    `patterns` are glob-style patterns; the returned callable reports
    the set of names that match at least one of them."""
    def _ignore_patterns(path, names):
        return {
            matched
            for pattern in patterns
            for matched in fnmatch.filter(names, pattern)
        }
    return _ignore_patterns
483
484def _copytree(entries, src, dst, symlinks, ignore, copy_function,
485              ignore_dangling_symlinks, dirs_exist_ok=False):
486    if ignore is not None:
487        ignored_names = ignore(os.fspath(src), [x.name for x in entries])
488    else:
489        ignored_names = ()
490
491    os.makedirs(dst, exist_ok=dirs_exist_ok)
492    errors = []
493    use_srcentry = copy_function is copy2 or copy_function is copy
494
495    for srcentry in entries:
496        if srcentry.name in ignored_names:
497            continue
498        srcname = os.path.join(src, srcentry.name)
499        dstname = os.path.join(dst, srcentry.name)
500        srcobj = srcentry if use_srcentry else srcname
501        try:
502            is_symlink = srcentry.is_symlink()
503            if is_symlink and os.name == 'nt':
504                # Special check for directory junctions, which appear as
505                # symlinks but we want to recurse.
506                lstat = srcentry.stat(follow_symlinks=False)
507                if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
508                    is_symlink = False
509            if is_symlink:
510                linkto = os.readlink(srcname)
511                if symlinks:
512                    # We can't just leave it to `copy_function` because legacy
513                    # code with a custom `copy_function` may rely on copytree
514                    # doing the right thing.
515                    os.symlink(linkto, dstname)
516                    copystat(srcobj, dstname, follow_symlinks=not symlinks)
517                else:
518                    # ignore dangling symlink if the flag is on
519                    if not os.path.exists(linkto) and ignore_dangling_symlinks:
520                        continue
521                    # otherwise let the copy occur. copy2 will raise an error
522                    if srcentry.is_dir():
523                        copytree(srcobj, dstname, symlinks, ignore,
524                                 copy_function, ignore_dangling_symlinks,
525                                 dirs_exist_ok)
526                    else:
527                        copy_function(srcobj, dstname)
528            elif srcentry.is_dir():
529                copytree(srcobj, dstname, symlinks, ignore, copy_function,
530                         ignore_dangling_symlinks, dirs_exist_ok)
531            else:
532                # Will raise a SpecialFileError for unsupported file types
533                copy_function(srcobj, dstname)
534        # catch the Error from the recursive copytree so that we can
535        # continue with other files
536        except Error as err:
537            errors.extend(err.args[0])
538        except OSError as why:
539            errors.append((srcname, dstname, str(why)))
540    try:
541        copystat(src, dst)
542    except OSError as why:
543        # Copying file access times may fail on Windows
544        if getattr(why, 'winerror', None) is None:
545            errors.append((src, dst, str(why)))
546    if errors:
547        raise Error(errors)
548    return dst
549
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False, dirs_exist_ok=False):
    """Recursively copy a directory tree and return the destination directory.

    Problems met on the way are collected and reported together: an
    Error is raised with a list of reasons.

    symlinks: when true, symbolic links in the source tree become
    symbolic links in the destination tree; when false, the contents of
    the files the links point to are copied. A link whose target does
    not exist then adds an exception to the list of errors raised in the
    Error exception at the end of the copy process.

    ignore_dangling_symlinks: set it to true to silence that exception.
    Notice that this has no effect on platforms that don't support
    os.symlink.

    ignore: optional callable. If given, it is called with the `src`
    parameter, which is the directory being visited by copytree(), and
    `names` which is the list of `src` contents, as returned by
    os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable is called once
    for each directory that is copied. It returns a list of names
    relative to the `src` directory that should not be copied.

    copy_function: optional callable used to copy each file. It is
    called with the source path and the destination path as arguments.
    copy2() is the default, but any function that supports the same
    signature (like copy()) can be used.

    dirs_exist_ok: when false (the default) and `dst` already exists, a
    `FileExistsError` is raised. When true, the copying operation
    continues if it encounters existing directories, and files within
    the `dst` tree are overwritten by corresponding files from the `src`
    tree.
    """
    sys.audit("shutil.copytree", src, dst)
    # Read the whole directory first so the scandir handle is closed
    # before the copy starts.
    with os.scandir(src) as scanner:
        children = list(scanner)
    return _copytree(children, src, dst, symlinks, ignore, copy_function,
                     ignore_dangling_symlinks, dirs_exist_ok)
597
598if hasattr(os.stat_result, 'st_file_attributes'):
599    def _rmtree_islink(st):
600        return (stat.S_ISLNK(st.st_mode) or
601            (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
602                and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
603else:
604    def _rmtree_islink(st):
605        return stat.S_ISLNK(st.st_mode)
606
# version vulnerable to race conditions
def _rmtree_unsafe(path, onexc):
    """Delete the tree rooted at `path` with a bottom-up os.walk().

    A FileNotFoundError is ignored everywhere; any other OSError is
    handed to `onexc(func, path, exc)`.
    """
    def onerror(err):
        if isinstance(err, FileNotFoundError):
            return
        onexc(os.scandir, err.filename, err)

    def remove(func, target):
        # Apply os.rmdir/os.unlink to one path with the shared error policy.
        try:
            func(target)
        except FileNotFoundError:
            pass
        except OSError as err:
            onexc(func, target, err)

    walker = os.walk(path, topdown=False, onerror=onerror,
                     followlinks=os._walk_symlinks_as_files)
    for dirpath, dirnames, filenames in walker:
        for name in dirnames:
            remove(os.rmdir, os.path.join(dirpath, name))
        for name in filenames:
            remove(os.unlink, os.path.join(dirpath, name))
    remove(os.rmdir, path)
636
# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(stack, onexc):
    """Pop one work item off `stack` and process it.

    An item either closes a descriptor, removes an (emptied) directory,
    or walks a directory: the directory is opened, its non-directory
    entries are unlinked and follow-up items (sub-directories to walk,
    then the rmdir and close of this directory) are pushed on `stack`.
    Errors are reported through `onexc(func, path, exc)`.
    """
    # Each stack item has four elements:
    # * func: The first operation to perform: os.lstat, os.close or os.rmdir.
    #   Walking a directory starts with an os.lstat() to detect symlinks; in
    #   this case, func is updated before subsequent operations and passed to
    #   onexc() if an error occurs.
    # * dirfd: Open file descriptor, or None if we're processing the top-level
    #   directory given to rmtree() and the user didn't supply dir_fd.
    # * path: Path of file to operate upon. This is passed to onexc() if an
    #   error occurs.
    # * orig_entry: os.DirEntry, or None if we're processing the top-level
    #   directory given to rmtree(). We used the cached stat() of the entry to
    #   save a call to os.lstat() when walking subdirectories.
    func, dirfd, path, orig_entry = stack.pop()
    # The name is relative to dirfd for sub-directories; the top-level item
    # uses the path as given.
    name = path if orig_entry is None else orig_entry.name
    try:
        if func is os.close:
            os.close(dirfd)
            return
        if func is os.rmdir:
            os.rmdir(name, dir_fd=dirfd)
            return

        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        assert func is os.lstat
        if orig_entry is None:
            orig_st = os.lstat(name, dir_fd=dirfd)
        else:
            orig_st = orig_entry.stat(follow_symlinks=False)

        func = os.open  # For error reporting.
        topfd = os.open(name, os.O_RDONLY | os.O_NONBLOCK, dir_fd=dirfd)

        func = os.path.islink  # For error reporting.
        try:
            if not os.path.samestat(orig_st, os.fstat(topfd)):
                # Symlinks to directories are forbidden, see GH-46010.
                raise OSError("Cannot call rmtree on a symbolic link")
            stack.append((os.rmdir, dirfd, path, orig_entry))
        finally:
            # Pushed last, and therefore popped before the rmdir item and
            # pushed even when the check above raised, so topfd always
            # gets closed.
            stack.append((os.close, topfd, path, orig_entry))

        func = os.scandir  # For error reporting.
        with os.scandir(topfd) as scandir_it:
            entries = list(scandir_it)
        for entry in entries:
            fullname = os.path.join(path, entry.name)
            try:
                if entry.is_dir(follow_symlinks=False):
                    # Traverse into sub-directory.
                    stack.append((os.lstat, topfd, fullname, entry))
                    continue
            except FileNotFoundError:
                continue
            except OSError:
                # Could not tell whether it is a directory: fall through
                # and try to unlink it.
                pass
            try:
                os.unlink(entry.name, dir_fd=topfd)
            except FileNotFoundError:
                continue
            except OSError as err:
                onexc(os.unlink, fullname, err)
    except FileNotFoundError as err:
        # A missing file is only reported for the top-level directory or
        # for a failed close; otherwise it is silently ignored.
        if orig_entry is None or func is os.close:
            err.filename = path
            onexc(func, path, err)
    except OSError as err:
        err.filename = path
        onexc(func, path, err)
708
# True when the platform offers everything the fd-based rmtree needs:
# dir_fd support in open/stat/unlink/rmdir, scandir() on a file descriptor
# and stat() with follow_symlinks.
_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                     os.supports_dir_fd and
                     os.scandir in os.supports_fd and
                     os.stat in os.supports_follow_symlinks)
713
def rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, dir_fd=None):
    """Recursively delete a directory tree.

    If dir_fd is not None, it should be a file descriptor open to a directory;
    path will then be relative to that directory.
    dir_fd may not be implemented on your platform.
    If it is unavailable, using it will raise a NotImplementedError.

    If ignore_errors is set, errors are ignored; otherwise, if onexc or
    onerror is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    the value of exc_info describes the exception. For onexc it is the
    exception instance, and for onerror it is a tuple as returned by
    sys.exc_info().  If ignore_errors is false and both onexc and
    onerror are None, the exception is reraised.

    onerror is deprecated and only remains for backwards compatibility.
    If both onerror and onexc are set, onerror is ignored and onexc is used.
    """

    sys.audit("shutil.rmtree", path, dir_fd)
    # Settle on a single error handler: ignore_errors wins, then an
    # explicit onexc, then the legacy onerror, then "re-raise".
    if ignore_errors:
        def onexc(*args):
            pass
    elif onexc is None:
        if onerror is None:
            def onexc(*args):
                # Called from inside an except block: re-raise the
                # exception currently being handled.
                raise
        else:
            # delegate to onerror
            def onexc(*args):
                func, path, exc = args
                if exc is None:
                    exc_info = None, None, None
                else:
                    exc_info = type(exc), exc, exc.__traceback__
                return onerror(func, path, exc_info)

    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        stack = [(os.lstat, dir_fd, path, None)]
        try:
            while stack:
                _rmtree_safe_fd(stack, onexc)
        finally:
            # Close any file descriptors still on the stack.
            while stack:
                func, fd, entry_path, entry = stack.pop()
                if func is not os.close:
                    continue
                try:
                    os.close(fd)
                except OSError as err:
                    onexc(os.close, entry_path, err)
    else:
        if dir_fd is not None:
            raise NotImplementedError("dir_fd unavailable on this platform")
        try:
            st = os.lstat(path)
        except OSError as err:
            onexc(os.lstat, path, err)
            return
        try:
            if _rmtree_islink(st):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError as err:
            onexc(os.path.islink, path, err)
            # can't continue even if onexc hook returns
            return
        return _rmtree_unsafe(path, onexc)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions
795
796def _basename(path):
797    """A basename() variant which first strips the trailing slash, if present.
798    Thus we always get the last component of the path, even for directories.
799
800    path: Union[PathLike, str]
801
802    e.g.
803    >>> os.path.basename('/bar/foo')
804    'foo'
805    >>> os.path.basename('/bar/foo/')
806    ''
807    >>> _basename('/bar/foo/')
808    'foo'
809    """
810    path = os.fspath(path)
811    sep = os.path.sep + (os.path.altsep or '')
812    return os.path.basename(path.rstrip(sep))
813
def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If dst is an existing directory or a symlink to a directory, then src is
    moved inside that directory. The destination path in that directory must
    not already exist.

    If dst already exists but is not a directory, it may be overwritten
    depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    Raises Error if the computed destination inside an existing directory
    already exists, or if a directory would be moved into itself.  Raises
    PermissionError if a directory that cannot be renamed is immutable, or
    (on macOS) is non-empty and not writable.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    sys.audit("shutil.move", src, dst)
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst) and not os.path.islink(src):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            # Return the destination here as well, as every other path
            # through this function does (this used to return None).
            return real_dst

        # Using _basename instead of os.path.basename is important, as we must
        # ignore any trailing slash to avoid the basename returning ''
        real_dst = os.path.join(dst, _basename(src))

        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed (e.g. a cross-filesystem move): fall back to
        # copying the source and then removing it.
        if os.path.islink(src):
            # Recreate the link itself rather than copying its target.
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            # Refuse up front when the source could not be removed after
            # the copy, instead of leaving two copies of the tree behind.
            if (_is_immutable(src)
                    or (not os.access(src, os.W_OK) and os.listdir(src)
                        and sys.platform == 'darwin')):
                raise PermissionError("Cannot move the non-empty directory "
                                      "'%s': Lacking write permission to '%s'."
                                      % (src, src))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst
879
880def _destinsrc(src, dst):
881    src = os.path.abspath(src)
882    dst = os.path.abspath(dst)
883    if not src.endswith(os.path.sep):
884        src += os.path.sep
885    if not dst.endswith(os.path.sep):
886        dst += os.path.sep
887    return dst.startswith(src)
888
def _is_immutable(src):
    """Return True if *src* has a user or system "immutable" flag set.

    The stat result comes from the module's _stat() helper.  Platforms
    whose stat results have no st_flags attribute always yield False.
    """
    st = _stat(src)
    # st_flags is a bit field: an immutable file may have other flags set
    # at the same time, so test the immutable bits with a mask rather than
    # comparing the whole value for equality.
    immutable_mask = stat.UF_IMMUTABLE | stat.SF_IMMUTABLE
    return hasattr(st, 'st_flags') and bool(st.st_flags & immutable_mask)
893
894def _get_gid(name):
895    """Returns a gid, given a group name."""
896    if name is None:
897        return None
898
899    try:
900        from grp import getgrnam
901    except ImportError:
902        return None
903
904    try:
905        result = getgrnam(name)
906    except KeyError:
907        result = None
908    if result is not None:
909        return result[2]
910    return None
911
912def _get_uid(name):
913    """Returns an uid, given a user name."""
914    if name is None:
915        return None
916
917    try:
918        from pwd import getpwnam
919    except ImportError:
920        return None
921
922    try:
923        result = getpwnam(name)
924    except KeyError:
925        result = None
926    if result is not None:
927        return result[2]
928    return None
929
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None, root_dir=None):
    """Build a tar archive, optionally compressed, from 'base_dir'.

    'compress' selects the compression: "gzip" (the default), "bzip2",
    "xz", or None for an uncompressed archive.  A ValueError is raised for
    any other value, and also when the requested compression module is not
    available.

    'owner' and 'group' name the user and group recorded for every member
    of the archive.  When they are not given (or cannot be resolved to an
    id) the members keep their current owner and group.

    When 'root_dir' is given, 'base_dir' is looked up relative to it while
    the member names inside the archive still start with 'base_dir'.

    If 'dry_run' is true nothing is written.  'verbose' is accepted but
    not used here.

    The archive is written to 'base_name' + ".tar" plus the compression
    extension, if any (".gz", ".bz2" or ".xz").  Returns that file name,
    made absolute when 'root_dir' was given.
    """
    # (value of 'compress', tarfile mode suffix, module available?)
    known_compressions = (
        ('gzip', 'gz', _ZLIB_SUPPORTED),
        ('bzip2', 'bz2', _BZ2_SUPPORTED),
        ('xz', 'xz', _LZMA_SUPPORTED),
    )
    if compress is None:
        tar_compression = ''
    else:
        for label, mode_suffix, available in known_compressions:
            if available and compress == label:
                tar_compression = mode_suffix
                break
        else:
            raise ValueError("bad value for 'compress', or compression format not "
                             "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    if compress:
        compress_ext = '.' + tar_compression
    else:
        compress_ext = ''
    archive_name = base_name + '.tar' + compress_ext

    # Make sure the directory that will hold the archive exists.
    archive_dir = os.path.dirname(archive_name)
    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _apply_ownership(member):
        # tarfile filter: override ownership only for the ids that could
        # be resolved.
        if gid is not None:
            member.gid = gid
            member.gname = group
        if uid is not None:
            member.uid = uid
            member.uname = owner
        return member

    if not dry_run:
        archive = tarfile.open(archive_name, 'w|%s' % tar_compression)
        if root_dir is None:
            source = base_dir
        else:
            source = os.path.join(root_dir, base_dir)
        try:
            # Members are stored under the caller's 'base_dir', not under
            # the root_dir-qualified path.
            archive.add(source, base_dir, filter=_apply_ownership)
        finally:
            archive.close()

    if root_dir is not None:
        archive_name = os.path.abspath(archive_name)
    return archive_name
999
1000def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
1001                  logger=None, owner=None, group=None, root_dir=None):
1002    """Create a zip file from all the files under 'base_dir'.
1003
1004    The output zip file will be named 'base_name' + ".zip".  Returns the
1005    name of the output zip file.
1006    """
1007    import zipfile  # late import for breaking circular dependency
1008
1009    zip_filename = base_name + ".zip"
1010    archive_dir = os.path.dirname(base_name)
1011
1012    if archive_dir and not os.path.exists(archive_dir):
1013        if logger is not None:
1014            logger.info("creating %s", archive_dir)
1015        if not dry_run:
1016            os.makedirs(archive_dir)
1017
1018    if logger is not None:
1019        logger.info("creating '%s' and adding '%s' to it",
1020                    zip_filename, base_dir)
1021
1022    if not dry_run:
1023        with zipfile.ZipFile(zip_filename, "w",
1024                             compression=zipfile.ZIP_DEFLATED) as zf:
1025            arcname = os.path.normpath(base_dir)
1026            if root_dir is not None:
1027                base_dir = os.path.join(root_dir, base_dir)
1028            base_dir = os.path.normpath(base_dir)
1029            if arcname != os.curdir:
1030                zf.write(base_dir, arcname)
1031                if logger is not None:
1032                    logger.info("adding '%s'", base_dir)
1033            for dirpath, dirnames, filenames in os.walk(base_dir):
1034                arcdirpath = dirpath
1035                if root_dir is not None:
1036                    arcdirpath = os.path.relpath(arcdirpath, root_dir)
1037                arcdirpath = os.path.normpath(arcdirpath)
1038                for name in sorted(dirnames):
1039                    path = os.path.join(dirpath, name)
1040                    arcname = os.path.join(arcdirpath, name)
1041                    zf.write(path, arcname)
1042                    if logger is not None:
1043                        logger.info("adding '%s'", path)
1044                for name in filenames:
1045                    path = os.path.join(dirpath, name)
1046                    path = os.path.normpath(path)
1047                    if os.path.isfile(path):
1048                        arcname = os.path.join(arcdirpath, name)
1049                        zf.write(path, arcname)
1050                        if logger is not None:
1051                            logger.info("adding '%s'", path)
1052
1053    if root_dir is not None:
1054        zip_filename = os.path.abspath(zip_filename)
1055    return zip_filename
1056
# Mark the built-in archivers as accepting a 'root_dir' keyword argument;
# make_archive() looks this attribute up before deciding whether it has to
# change the working directory itself.
_make_zipfile.supports_root_dir = True
_make_tarball.supports_root_dir = True

# Registry of archive formats.  Each format name maps to a tuple of:
# * the archiving function
# * extra keyword arguments, as a list of (name, value) pairs
# * description
_ARCHIVE_FORMATS = {
    'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
}

# Compressed formats are registered only when the matching compression
# module could be imported.
if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS.update({
        'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
        'zip': (_make_zipfile, [], "ZIP file"),
    })

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS.update({
        'bztar': (_make_tarball, [('compress', 'bzip2')],
                  "bzip2'ed tar-file"),
    })

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS.update({
        'xztar': (_make_tarball, [('compress', 'xz')], "xz'ed tar-file"),
    })
1081
def get_archive_formats():
    """Return the formats that can be used to create archives.

    The result is a sorted list of (name, description) tuples, one per
    registered archive format.
    """
    return sorted((name, entry[2])
                  for name, entry in _ARCHIVE_FORMATS.items())
1091
def register_archive_format(name, function, extra_args=None, description=''):
    """Register (or replace) an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. extra_args, if given, is a list or tuple of
    (name, value) pairs that will be passed to the callable as keyword
    arguments. description is a free-form text that is reported by
    get_archive_formats().

    Raises TypeError if function is not callable or extra_args does not
    have the expected shape.
    """
    extra_args = [] if extra_args is None else extra_args

    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')

    # Every element must itself be a two-item tuple or list.
    malformed = any(
        not isinstance(item, (tuple, list)) or len(item) != 2
        for item in extra_args
    )
    if malformed:
        raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)
1112
def unregister_archive_format(name):
    """Remove the archive format *name* from the registry.

    Raises KeyError if no format is registered under that name.
    """
    registry = _ARCHIVE_FORMATS
    del registry[name]
1115
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar) and return its name.

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar", or any other registered format.  An unknown format
    raises ValueError.

    'root_dir' is a directory that will be the root directory of the
    archive; it must be an existing directory, otherwise NotADirectoryError
    is raised.  Archivers that advertise 'supports_root_dir' receive it as
    a keyword argument; for any other archiver we chdir into 'root_dir'
    around the call (unless 'dry_run' is true) and restore the previous
    working directory afterwards.

    'base_dir' is the directory where we start archiving from; ie.
    'base_dir' will be the common prefix of all files and directories in
    the archive.  It defaults to the current directory.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.  'verbose' is accepted but not used
    by this function.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
    try:
        entry = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    archiver = entry[0]
    call_kwargs = dict(dry_run=dry_run, logger=logger,
                       owner=owner, group=group)
    # Format-specific keyword arguments come from the registry entry.
    for key, value in entry[1]:
        call_kwargs[key] = value

    if base_dir is None:
        base_dir = os.curdir

    handles_root_dir = getattr(archiver, 'supports_root_dir', False)
    previous_cwd = None
    if root_dir is not None:
        if not stat.S_ISDIR(os.stat(root_dir).st_mode):
            raise NotADirectoryError(errno.ENOTDIR, 'Not a directory', root_dir)

        if handles_root_dir:
            # Support path-like base_name here for backwards-compatibility.
            base_name = os.fspath(base_name)
            call_kwargs['root_dir'] = root_dir
        else:
            previous_cwd = os.getcwd()
            if logger is not None:
                logger.debug("changing into '%s'", root_dir)
            # Resolve base_name before the working directory changes.
            base_name = os.path.abspath(base_name)
            if not dry_run:
                os.chdir(root_dir)

    try:
        return archiver(base_name, base_dir, **call_kwargs)
    finally:
        if previous_cwd is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", previous_cwd)
            os.chdir(previous_cwd)
1178
1179
def get_unpack_formats():
    """Return the formats that can be unpacked.

    The result is a sorted list of (name, extensions, description)
    tuples, one per registered unpack format.
    """
    return sorted((name, entry[0], entry[3])
                  for name, entry in _UNPACK_FORMATS.items())
1190
def _check_unpack_options(extensions, function, extra_args):
    """Validate the arguments used to register an unpacker.

    Raises RegistryError if one of *extensions* is already claimed by a
    registered format, and TypeError if *function* is not callable.
    *extra_args* is accepted but not inspected here.
    """
    # Map every extension that is already registered to its format name.
    claimed = {
        ext: fmt_name
        for fmt_name, entry in _UNPACK_FORMATS.items()
        for ext in entry[0]
    }

    for ext in extensions:
        if ext in claimed:
            raise RegistryError('%s is already registered for "%s"'
                                % (ext, claimed[ext]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')
1207
1208
def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Register an unpack format.

    `name` is the name of the format. `extensions` is a list of file name
    extensions that identify the format.

    `function` is the callable that will be used to unpack archives. It
    receives the archive to unpack and must raise a ReadError exception
    when it is unable to handle that archive.

    `extra_args`, if given, is a sequence of (name, value) tuples that will
    be passed to the callable as keyword arguments. `description` is a
    free-form text that is reported by get_unpack_formats().

    The arguments are validated by _check_unpack_options() before the
    format is stored.
    """
    extra_args = [] if extra_args is None else extra_args
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = (extensions, function, extra_args, description)
1230
def unregister_unpack_format(name):
    """Remove the unpack format *name* from the registry.

    Raises KeyError if no format is registered under that name.
    """
    registry = _UNPACK_FORMATS
    del registry[name]
1234
1235def _ensure_directory(path):
1236    """Ensure that the parent directory of `path` exists"""
1237    dirname = os.path.dirname(path)
1238    if not os.path.isdir(dirname):
1239        os.makedirs(dirname)
1240
def _unpack_zipfile(filename, extract_dir):
    """Extract the zip archive `filename` into `extract_dir`.

    Raises ReadError if `filename` is not a zip file.  Members whose name
    is absolute or contains '..' are skipped.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    with zipfile.ZipFile(filename) as archive:
        for member in archive.infolist():
            member_name = member.filename

            # don't extract absolute paths or ones with .. in them
            if member_name.startswith('/') or '..' in member_name:
                continue

            destination = os.path.join(extract_dir, *member_name.split('/'))
            if not destination:
                continue

            _ensure_directory(destination)
            if member_name.endswith('/'):
                # Directory entry: making sure it exists was all there
                # was to do.
                continue

            # Regular file: stream its contents to the destination.
            with archive.open(member_name, 'r') as source, \
                    open(destination, 'wb') as target:
                copyfileobj(source, target)
1270
def _unpack_tarfile(filename, extract_dir, *, filter=None):
    """Extract the tar/tar.gz/tar.bz2/tar.xz archive `filename` into
    `extract_dir`.

    `filter` is handed to TarFile.extractall() unchanged.  Raises ReadError
    if `filename` cannot be opened as a tar archive.
    """
    import tarfile  # late import for breaking circular dependency

    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)

    # Close the archive whether or not extraction succeeds.
    try:
        archive.extractall(extract_dir, filter=filter)
    finally:
        archive.close()
1284
# Registry of unpack formats.  Each format name maps to a tuple of:
# * the file name extensions that identify the format
# * the unpacking function
# * extra keyword arguments, as a list of (name, value) pairs
# * description
_UNPACK_FORMATS = {
    'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

# Compressed tar formats are registered only when the matching compression
# module could be imported.
if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS.update({
        'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [],
                  "gzip'ed tar-file"),
    })

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS.update({
        'bztar': (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
                  "bzip2'ed tar-file"),
    })

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS.update({
        'xztar': (['.tar.xz', '.txz'], _unpack_tarfile, [],
                  "xz'ed tar-file"),
    })
1306
def _find_unpack_format(filename):
    """Return the name of the first registered unpack format that has an
    extension matching the end of `filename`, or None if there is none.
    """
    for fmt_name, entry in _UNPACK_FORMATS.items():
        if any(filename.endswith(ext) for ext in entry[0]):
            return fmt_name
    return None
1313
def unpack_archive(filename, extract_dir=None, format=None, *, filter=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar", or any other registered format.  A `format` that is not
    registered raises ValueError.  If `format` is not provided, the
    unpacker is chosen from the extension of `filename`; when no
    registered unpacker claims that extension, ReadError is raised.

    If `filter` is given, it is passed to the underlying
    extraction function.
    """
    sys.audit("shutil.unpack_archive", filename, extract_dir, format)

    if extract_dir is None:
        extract_dir = os.getcwd()

    extract_dir = os.fspath(extract_dir)
    filename = os.fspath(filename)

    # Only forward 'filter' when the caller actually supplied one.
    filter_kwargs = {} if filter is None else {'filter': filter}

    if format is None:
        # we need to look at the registered unpackers supported extensions
        detected = _find_unpack_format(filename)
        if detected is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))

        entry = _UNPACK_FORMATS[detected]
        unpacker = entry[1]
        call_kwargs = dict(entry[2]) | filter_kwargs
        unpacker(filename, extract_dir, **call_kwargs)
        return

    try:
        entry = _UNPACK_FORMATS[format]
    except KeyError:
        raise ValueError("Unknown unpack format '{0}'".format(format)) from None

    unpacker = entry[1]
    unpacker(filename, extract_dir, **dict(entry[2]), **filter_kwargs)
1361
1362
# disk_usage() is only defined (and exported) where the platform offers a
# way to query it: os.statvfs() where available, otherwise the nt helper
# on Windows.
if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage',
                                               ['total', 'used', 'free'])
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        The result is a named tuple with the attributes 'total', 'used'
        and 'free', each an amount of space in bytes.
        """
        vfs = os.statvfs(path)
        block_size = vfs.f_frsize
        total_bytes = vfs.f_blocks * block_size
        used_bytes = (vfs.f_blocks - vfs.f_bfree) * block_size
        free_bytes = vfs.f_bavail * block_size
        return _ntuple_diskusage(total_bytes, used_bytes, free_bytes)

elif _WINDOWS:

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage',
                                               ['total', 'used', 'free'])

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        The result is a named tuple with the attributes 'total', 'used'
        and 'free', each an amount of space in bytes.
        """
        total_bytes, free_bytes = nt._getdiskusage(path)
        return _ntuple_diskusage(total_bytes, total_bytes - free_bytes,
                                 free_bytes)
1397
1398
def chown(path, user=None, group=None, *, dir_fd=None, follow_symlinks=True):
    """Change the owner user and/or group of the given path.

    user and group may each be given as a numeric id or as a name; names
    are converted to the corresponding uid/gid.  At least one of them must
    be set, otherwise ValueError is raised.  A name that cannot be resolved
    raises LookupError.

    If dir_fd is set, it should be an open file descriptor to the directory to
    be used as the root of *path* if it is relative.

    If follow_symlinks is set to False and the last element of the path is a
    symbolic link, chown will modify the link itself and not the file being
    referenced by the link.
    """
    sys.audit('shutil.chown', path, user, group)

    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    # -1 means don't change it
    if user is None:
        uid = -1
    elif isinstance(user, str):
        # a string is the system username; anything else is used as the uid
        uid = _get_uid(user)
        if uid is None:
            raise LookupError("no such user: {!r}".format(user))
    else:
        uid = user

    if group is None:
        gid = -1
    elif isinstance(group, int):
        gid = group
    else:
        # anything that is not an int is looked up as a group name
        gid = _get_gid(group)
        if gid is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, uid, gid, dir_fd=dir_fd,
             follow_symlinks=follow_symlinks)
1438
def get_terminal_size(fallback=(80, 24)):
    """Get the size of the terminal window.

    Each dimension is first looked up in the environment: COLUMNS for the
    width and LINES for the height.  A variable that is defined and holds
    a positive integer is used as is.

    For any dimension not settled that way (the common case), the terminal
    connected to sys.__stdout__ is queried through os.get_terminal_size.

    If that query fails, either because the system doesn't support it or
    because we are not connected to a terminal, the corresponding value of
    the fallback parameter is used.  Fallback defaults to (80, 24), which
    is the default size used by many terminal emulators.

    The value returned is a named tuple of type os.terminal_size.
    """
    def _from_environment(variable):
        # A missing or non-integer variable counts as "not set" (0).
        try:
            return int(os.environ[variable])
        except (KeyError, ValueError):
            return 0

    columns = _from_environment('COLUMNS')
    lines = _from_environment('LINES')

    # only query if necessary
    if columns > 0 and lines > 0:
        return os.terminal_size((columns, lines))

    try:
        queried = os.get_terminal_size(sys.__stdout__.fileno())
    except (AttributeError, ValueError, OSError):
        # stdout is None, closed, detached, or not a terminal, or
        # os.get_terminal_size() is unsupported
        queried = os.terminal_size(fallback)

    # A dimension reported as 0 also falls back.
    if columns <= 0:
        columns = queried.columns or fallback[0]
    if lines <= 0:
        lines = queried.lines or fallback[1]

    return os.terminal_size((columns, lines))
1483
1484
1485# Check that a given file can be accessed with the correct mode.
1486# Additionally check that `file` is not a directory, as on Windows
1487# directories pass the os.access check.
1488def _access_check(fn, mode):
1489    return (os.path.exists(fn) and os.access(fn, mode)
1490            and not os.path.isdir(fn))
1491
1492
1493def _win_path_needs_curdir(cmd, mode):
1494    """
1495    On Windows, we can use NeedCurrentDirectoryForExePath to figure out
1496    if we should add the cwd to PATH when searching for executables if
1497    the mode is executable.
1498    """
1499    return (not (mode & os.X_OK)) or _winapi.NeedCurrentDirectoryForExePath(
1500                os.fsdecode(cmd))
1501
1502
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Locate `cmd` on a search path.

    Returns the first path built from the search directories that passes
    the access check for `mode`, or None if there is no such file.

    `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
    of os.environ.get("PATH"), or can be overridden with a custom search
    path.  If `cmd` contains a directory part, only that directory is
    searched.  The result is bytes if `cmd` is bytes.

    """
    use_bytes = isinstance(cmd, bytes)
    on_windows = sys.platform == "win32"

    # If we're given a path with a directory part, look it up directly rather
    # than referring to PATH directories. This includes checking relative to
    # the current directory, e.g. ./script
    dirname, cmd = os.path.split(cmd)
    if dirname:
        search_dirs = [dirname]
    else:
        if path is None:
            path = os.environ.get("PATH", None)
            if path is None:
                try:
                    path = os.confstr("CS_PATH")
                except (AttributeError, ValueError):
                    # os.confstr() or CS_PATH is not available
                    path = os.defpath
            # bpo-35755: Don't use os.defpath if the PATH environment variable
            # is set to an empty string

        # PATH='' doesn't match, whereas PATH=':' looks in the current
        # directory
        if not path:
            return None

        # Split the search path, keeping it in the same type as cmd.
        if use_bytes:
            search_dirs = os.fsencode(path).split(os.fsencode(os.pathsep))
        else:
            search_dirs = os.fsdecode(path).split(os.pathsep)

        if on_windows and _win_path_needs_curdir(cmd, mode):
            here = os.fsencode(os.curdir) if use_bytes else os.curdir
            search_dirs.insert(0, here)

    if on_windows:
        # PATHEXT is necessary to check on Windows.
        raw_pathext = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
        pathext = [ext.rstrip('.')
                   for ext in raw_pathext.split(os.pathsep) if ext]

        if use_bytes:
            pathext = [os.fsencode(ext) for ext in pathext]

        candidates = [cmd + ext for ext in pathext]

        # If X_OK in mode, simulate the cmd.exe behavior: look at direct
        # match if and only if the extension is in PATHEXT.
        # If X_OK not in mode, simulate the first result of where.exe:
        # always look at direct match before a PATHEXT match.
        upper_cmd = cmd.upper()
        try_direct_match = (
            not (mode & os.X_OK)
            or any(upper_cmd.endswith(ext.upper()) for ext in pathext)
        )
        if try_direct_match:
            candidates.insert(0, cmd)
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        candidates = [cmd]

    # Visit each directory once, comparing case-normalised names.
    visited = set()
    for directory in search_dirs:
        key = os.path.normcase(directory)
        if key in visited:
            continue
        visited.add(key)
        for candidate in candidates:
            full_path = os.path.join(directory, candidate)
            if _access_check(full_path, mode):
                return full_path
    return None
1584