1"""Utility functions for copying and archiving files and directory trees. 2 3XXX The functions here don't copy the resource fork or other metadata on Mac. 4 5""" 6 7import os 8import sys 9import stat 10import fnmatch 11import collections 12import errno 13 14try: 15 import zlib 16 del zlib 17 _ZLIB_SUPPORTED = True 18except ImportError: 19 _ZLIB_SUPPORTED = False 20 21try: 22 import bz2 23 del bz2 24 _BZ2_SUPPORTED = True 25except ImportError: 26 _BZ2_SUPPORTED = False 27 28try: 29 import lzma 30 del lzma 31 _LZMA_SUPPORTED = True 32except ImportError: 33 _LZMA_SUPPORTED = False 34 35try: 36 from pwd import getpwnam 37except ImportError: 38 getpwnam = None 39 40try: 41 from grp import getgrnam 42except ImportError: 43 getgrnam = None 44 45_WINDOWS = os.name == 'nt' 46posix = nt = None 47if os.name == 'posix': 48 import posix 49elif _WINDOWS: 50 import nt 51 52COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024 53_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux") 54_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS 55 56# CMD defaults in Windows 10 57_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" 58 59__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", 60 "copytree", "move", "rmtree", "Error", "SpecialFileError", 61 "ExecError", "make_archive", "get_archive_formats", 62 "register_archive_format", "unregister_archive_format", 63 "get_unpack_formats", "register_unpack_format", 64 "unregister_unpack_format", "unpack_archive", 65 "ignore_patterns", "chown", "which", "get_terminal_size", 66 "SameFileError"] 67 # disk_usage is added later, if available on the platform 68 69class Error(OSError): 70 pass 71 72class SameFileError(Error): 73 """Raised when source and destination are the same file.""" 74 75class SpecialFileError(OSError): 76 """Raised when trying to do a kind of operation (e.g. copying) which is 77 not supported on a special file (e.g. a named pipe)""" 78 79class ExecError(OSError): 80 """Raised when a command could not be executed""" 81 82class ReadError(OSError): 83 """Raised when an archive cannot be read""" 84 85class RegistryError(Exception): 86 """Raised when a registry operation with the archiving 87 and unpacking registries fails""" 88 89class _GiveupOnFastCopy(Exception): 90 """Raised as a signal to fallback on using raw read()/write() 91 file copy when fast-copy functions fail to do so. 92 """ 93 94def _fastcopy_fcopyfile(fsrc, fdst, flags): 95 """Copy a regular file content or metadata by using high-performance 96 fcopyfile(3) syscall (macOS). 97 """ 98 try: 99 infd = fsrc.fileno() 100 outfd = fdst.fileno() 101 except Exception as err: 102 raise _GiveupOnFastCopy(err) # not a regular file 103 104 try: 105 posix._fcopyfile(infd, outfd, flags) 106 except OSError as err: 107 err.filename = fsrc.name 108 err.filename2 = fdst.name 109 if err.errno in {errno.EINVAL, errno.ENOTSUP}: 110 raise _GiveupOnFastCopy(err) 111 else: 112 raise err from None 113 114def _fastcopy_sendfile(fsrc, fdst): 115 """Copy data from one regular mmap-like fd to another by using 116 high-performance sendfile(2) syscall. 117 This should work on Linux >= 2.6.33 only. 118 """ 119 # Note: copyfileobj() is left alone in order to not introduce any 120 # unexpected breakage. Possible risks by using zero-copy calls 121 # in copyfileobj() are: 122 # - fdst cannot be open in "a"(ppend) mode 123 # - fsrc and fdst may be open in "t"(ext) mode 124 # - fsrc may be a BufferedReader (which hides unread data in a buffer), 125 # GzipFile (which decompresses data), HTTPResponse (which decodes 126 # chunks). 127 # - possibly others (e.g. encrypted fs/partition?) 128 global _USE_CP_SENDFILE 129 try: 130 infd = fsrc.fileno() 131 outfd = fdst.fileno() 132 except Exception as err: 133 raise _GiveupOnFastCopy(err) # not a regular file 134 135 # Hopefully the whole file will be copied in a single call. 136 # sendfile() is called in a loop 'till EOF is reached (0 return) 137 # so a bufsize smaller or bigger than the actual file size 138 # should not make any difference, also in case the file content 139 # changes while being copied. 140 try: 141 blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB 142 except OSError: 143 blocksize = 2 ** 27 # 128MiB 144 # On 32-bit architectures truncate to 1GiB to avoid OverflowError, 145 # see bpo-38319. 146 if sys.maxsize < 2 ** 32: 147 blocksize = min(blocksize, 2 ** 30) 148 149 offset = 0 150 while True: 151 try: 152 sent = os.sendfile(outfd, infd, offset, blocksize) 153 except OSError as err: 154 # ...in oder to have a more informative exception. 155 err.filename = fsrc.name 156 err.filename2 = fdst.name 157 158 if err.errno == errno.ENOTSOCK: 159 # sendfile() on this platform (probably Linux < 2.6.33) 160 # does not support copies between regular files (only 161 # sockets). 162 _USE_CP_SENDFILE = False 163 raise _GiveupOnFastCopy(err) 164 165 if err.errno == errno.ENOSPC: # filesystem is full 166 raise err from None 167 168 # Give up on first call and if no data was copied. 169 if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: 170 raise _GiveupOnFastCopy(err) 171 172 raise err 173 else: 174 if sent == 0: 175 break # EOF 176 offset += sent 177 178def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): 179 """readinto()/memoryview() based variant of copyfileobj(). 180 *fsrc* must support readinto() method and both files must be 181 open in binary mode. 182 """ 183 # Localize variable access to minimize overhead. 184 fsrc_readinto = fsrc.readinto 185 fdst_write = fdst.write 186 with memoryview(bytearray(length)) as mv: 187 while True: 188 n = fsrc_readinto(mv) 189 if not n: 190 break 191 elif n < length: 192 with mv[:n] as smv: 193 fdst.write(smv) 194 else: 195 fdst_write(mv) 196 197def copyfileobj(fsrc, fdst, length=0): 198 """copy data from file-like object fsrc to file-like object fdst""" 199 # Localize variable access to minimize overhead. 200 if not length: 201 length = COPY_BUFSIZE 202 fsrc_read = fsrc.read 203 fdst_write = fdst.write 204 while True: 205 buf = fsrc_read(length) 206 if not buf: 207 break 208 fdst_write(buf) 209 210def _samefile(src, dst): 211 # Macintosh, Unix. 212 if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): 213 try: 214 return os.path.samestat(src.stat(), os.stat(dst)) 215 except OSError: 216 return False 217 218 if hasattr(os.path, 'samefile'): 219 try: 220 return os.path.samefile(src, dst) 221 except OSError: 222 return False 223 224 # All other platforms: check for same pathname. 225 return (os.path.normcase(os.path.abspath(src)) == 226 os.path.normcase(os.path.abspath(dst))) 227 228def _stat(fn): 229 return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) 230 231def _islink(fn): 232 return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) 233 234def copyfile(src, dst, *, follow_symlinks=True): 235 """Copy data from src to dst in the most efficient way possible. 236 237 If follow_symlinks is not set and src is a symbolic link, a new 238 symlink will be created instead of copying the file it points to. 239 240 """ 241 sys.audit("shutil.copyfile", src, dst) 242 243 if _samefile(src, dst): 244 raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) 245 246 file_size = 0 247 for i, fn in enumerate([src, dst]): 248 try: 249 st = _stat(fn) 250 except OSError: 251 # File most likely does not exist 252 pass 253 else: 254 # XXX What about other special files? (sockets, devices...) 255 if stat.S_ISFIFO(st.st_mode): 256 fn = fn.path if isinstance(fn, os.DirEntry) else fn 257 raise SpecialFileError("`%s` is a named pipe" % fn) 258 if _WINDOWS and i == 0: 259 file_size = st.st_size 260 261 if not follow_symlinks and _islink(src): 262 os.symlink(os.readlink(src), dst) 263 else: 264 with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst: 265 # macOS 266 if _HAS_FCOPYFILE: 267 try: 268 _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) 269 return dst 270 except _GiveupOnFastCopy: 271 pass 272 # Linux 273 elif _USE_CP_SENDFILE: 274 try: 275 _fastcopy_sendfile(fsrc, fdst) 276 return dst 277 except _GiveupOnFastCopy: 278 pass 279 # Windows, see: 280 # https://github.com/python/cpython/pull/7160#discussion_r195405230 281 elif _WINDOWS and file_size > 0: 282 _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) 283 return dst 284 285 copyfileobj(fsrc, fdst) 286 287 return dst 288 289def copymode(src, dst, *, follow_symlinks=True): 290 """Copy mode bits from src to dst. 291 292 If follow_symlinks is not set, symlinks aren't followed if and only 293 if both `src` and `dst` are symlinks. If `lchmod` isn't available 294 (e.g. Linux) this method does nothing. 295 296 """ 297 sys.audit("shutil.copymode", src, dst) 298 299 if not follow_symlinks and _islink(src) and os.path.islink(dst): 300 if hasattr(os, 'lchmod'): 301 stat_func, chmod_func = os.lstat, os.lchmod 302 else: 303 return 304 else: 305 stat_func, chmod_func = _stat, os.chmod 306 307 st = stat_func(src) 308 chmod_func(dst, stat.S_IMODE(st.st_mode)) 309 310if hasattr(os, 'listxattr'): 311 def _copyxattr(src, dst, *, follow_symlinks=True): 312 """Copy extended filesystem attributes from `src` to `dst`. 313 314 Overwrite existing attributes. 315 316 If `follow_symlinks` is false, symlinks won't be followed. 317 318 """ 319 320 try: 321 names = os.listxattr(src, follow_symlinks=follow_symlinks) 322 except OSError as e: 323 if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): 324 raise 325 return 326 for name in names: 327 try: 328 value = os.getxattr(src, name, follow_symlinks=follow_symlinks) 329 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) 330 except OSError as e: 331 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, 332 errno.EINVAL): 333 raise 334else: 335 def _copyxattr(*args, **kwargs): 336 pass 337 338def copystat(src, dst, *, follow_symlinks=True): 339 """Copy file metadata 340 341 Copy the permission bits, last access time, last modification time, and 342 flags from `src` to `dst`. On Linux, copystat() also copies the "extended 343 attributes" where possible. The file contents, owner, and group are 344 unaffected. `src` and `dst` are path-like objects or path names given as 345 strings. 346 347 If the optional flag `follow_symlinks` is not set, symlinks aren't 348 followed if and only if both `src` and `dst` are symlinks. 349 """ 350 sys.audit("shutil.copystat", src, dst) 351 352 def _nop(*args, ns=None, follow_symlinks=None): 353 pass 354 355 # follow symlinks (aka don't not follow symlinks) 356 follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) 357 if follow: 358 # use the real function if it exists 359 def lookup(name): 360 return getattr(os, name, _nop) 361 else: 362 # use the real function only if it exists 363 # *and* it supports follow_symlinks 364 def lookup(name): 365 fn = getattr(os, name, _nop) 366 if fn in os.supports_follow_symlinks: 367 return fn 368 return _nop 369 370 if isinstance(src, os.DirEntry): 371 st = src.stat(follow_symlinks=follow) 372 else: 373 st = lookup("stat")(src, follow_symlinks=follow) 374 mode = stat.S_IMODE(st.st_mode) 375 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), 376 follow_symlinks=follow) 377 # We must copy extended attributes before the file is (potentially) 378 # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. 379 _copyxattr(src, dst, follow_symlinks=follow) 380 try: 381 lookup("chmod")(dst, mode, follow_symlinks=follow) 382 except NotImplementedError: 383 # if we got a NotImplementedError, it's because 384 # * follow_symlinks=False, 385 # * lchown() is unavailable, and 386 # * either 387 # * fchownat() is unavailable or 388 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. 389 # (it returned ENOSUP.) 390 # therefore we're out of options--we simply cannot chown the 391 # symlink. give up, suppress the error. 392 # (which is what shutil always did in this circumstance.) 393 pass 394 if hasattr(st, 'st_flags'): 395 try: 396 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) 397 except OSError as why: 398 for err in 'EOPNOTSUPP', 'ENOTSUP': 399 if hasattr(errno, err) and why.errno == getattr(errno, err): 400 break 401 else: 402 raise 403 404def copy(src, dst, *, follow_symlinks=True): 405 """Copy data and mode bits ("cp src dst"). Return the file's destination. 406 407 The destination may be a directory. 408 409 If follow_symlinks is false, symlinks won't be followed. This 410 resembles GNU's "cp -P src dst". 411 412 If source and destination are the same file, a SameFileError will be 413 raised. 414 415 """ 416 if os.path.isdir(dst): 417 dst = os.path.join(dst, os.path.basename(src)) 418 copyfile(src, dst, follow_symlinks=follow_symlinks) 419 copymode(src, dst, follow_symlinks=follow_symlinks) 420 return dst 421 422def copy2(src, dst, *, follow_symlinks=True): 423 """Copy data and metadata. Return the file's destination. 424 425 Metadata is copied with copystat(). Please see the copystat function 426 for more information. 427 428 The destination may be a directory. 429 430 If follow_symlinks is false, symlinks won't be followed. This 431 resembles GNU's "cp -P src dst". 432 """ 433 if os.path.isdir(dst): 434 dst = os.path.join(dst, os.path.basename(src)) 435 copyfile(src, dst, follow_symlinks=follow_symlinks) 436 copystat(src, dst, follow_symlinks=follow_symlinks) 437 return dst 438 439def ignore_patterns(*patterns): 440 """Function that can be used as copytree() ignore parameter. 441 442 Patterns is a sequence of glob-style patterns 443 that are used to exclude files""" 444 def _ignore_patterns(path, names): 445 ignored_names = [] 446 for pattern in patterns: 447 ignored_names.extend(fnmatch.filter(names, pattern)) 448 return set(ignored_names) 449 return _ignore_patterns 450 451def _copytree(entries, src, dst, symlinks, ignore, copy_function, 452 ignore_dangling_symlinks, dirs_exist_ok=False): 453 if ignore is not None: 454 ignored_names = ignore(os.fspath(src), [x.name for x in entries]) 455 else: 456 ignored_names = set() 457 458 os.makedirs(dst, exist_ok=dirs_exist_ok) 459 errors = [] 460 use_srcentry = copy_function is copy2 or copy_function is copy 461 462 for srcentry in entries: 463 if srcentry.name in ignored_names: 464 continue 465 srcname = os.path.join(src, srcentry.name) 466 dstname = os.path.join(dst, srcentry.name) 467 srcobj = srcentry if use_srcentry else srcname 468 try: 469 is_symlink = srcentry.is_symlink() 470 if is_symlink and os.name == 'nt': 471 # Special check for directory junctions, which appear as 472 # symlinks but we want to recurse. 473 lstat = srcentry.stat(follow_symlinks=False) 474 if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: 475 is_symlink = False 476 if is_symlink: 477 linkto = os.readlink(srcname) 478 if symlinks: 479 # We can't just leave it to `copy_function` because legacy 480 # code with a custom `copy_function` may rely on copytree 481 # doing the right thing. 482 os.symlink(linkto, dstname) 483 copystat(srcobj, dstname, follow_symlinks=not symlinks) 484 else: 485 # ignore dangling symlink if the flag is on 486 if not os.path.exists(linkto) and ignore_dangling_symlinks: 487 continue 488 # otherwise let the copy occur. copy2 will raise an error 489 if srcentry.is_dir(): 490 copytree(srcobj, dstname, symlinks, ignore, 491 copy_function, dirs_exist_ok=dirs_exist_ok) 492 else: 493 copy_function(srcobj, dstname) 494 elif srcentry.is_dir(): 495 copytree(srcobj, dstname, symlinks, ignore, copy_function, 496 dirs_exist_ok=dirs_exist_ok) 497 else: 498 # Will raise a SpecialFileError for unsupported file types 499 copy_function(srcobj, dstname) 500 # catch the Error from the recursive copytree so that we can 501 # continue with other files 502 except Error as err: 503 errors.extend(err.args[0]) 504 except OSError as why: 505 errors.append((srcname, dstname, str(why))) 506 try: 507 copystat(src, dst) 508 except OSError as why: 509 # Copying file access times may fail on Windows 510 if getattr(why, 'winerror', None) is None: 511 errors.append((src, dst, str(why))) 512 if errors: 513 raise Error(errors) 514 return dst 515 516def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, 517 ignore_dangling_symlinks=False, dirs_exist_ok=False): 518 """Recursively copy a directory tree and return the destination directory. 519 520 dirs_exist_ok dictates whether to raise an exception in case dst or any 521 missing parent directory already exists. 522 523 If exception(s) occur, an Error is raised with a list of reasons. 524 525 If the optional symlinks flag is true, symbolic links in the 526 source tree result in symbolic links in the destination tree; if 527 it is false, the contents of the files pointed to by symbolic 528 links are copied. If the file pointed by the symlink doesn't 529 exist, an exception will be added in the list of errors raised in 530 an Error exception at the end of the copy process. 531 532 You can set the optional ignore_dangling_symlinks flag to true if you 533 want to silence this exception. Notice that this has no effect on 534 platforms that don't support os.symlink. 535 536 The optional ignore argument is a callable. If given, it 537 is called with the `src` parameter, which is the directory 538 being visited by copytree(), and `names` which is the list of 539 `src` contents, as returned by os.listdir(): 540 541 callable(src, names) -> ignored_names 542 543 Since copytree() is called recursively, the callable will be 544 called once for each directory that is copied. It returns a 545 list of names relative to the `src` directory that should 546 not be copied. 547 548 The optional copy_function argument is a callable that will be used 549 to copy each file. It will be called with the source path and the 550 destination path as arguments. By default, copy2() is used, but any 551 function that supports the same signature (like copy()) can be used. 552 553 """ 554 sys.audit("shutil.copytree", src, dst) 555 with os.scandir(src) as itr: 556 entries = list(itr) 557 return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks, 558 ignore=ignore, copy_function=copy_function, 559 ignore_dangling_symlinks=ignore_dangling_symlinks, 560 dirs_exist_ok=dirs_exist_ok) 561 562if hasattr(os.stat_result, 'st_file_attributes'): 563 # Special handling for directory junctions to make them behave like 564 # symlinks for shutil.rmtree, since in general they do not appear as 565 # regular links. 566 def _rmtree_isdir(entry): 567 try: 568 st = entry.stat(follow_symlinks=False) 569 return (stat.S_ISDIR(st.st_mode) and not 570 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 571 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 572 except OSError: 573 return False 574 575 def _rmtree_islink(path): 576 try: 577 st = os.lstat(path) 578 return (stat.S_ISLNK(st.st_mode) or 579 (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT 580 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)) 581 except OSError: 582 return False 583else: 584 def _rmtree_isdir(entry): 585 try: 586 return entry.is_dir(follow_symlinks=False) 587 except OSError: 588 return False 589 590 def _rmtree_islink(path): 591 return os.path.islink(path) 592 593# version vulnerable to race conditions 594def _rmtree_unsafe(path, onerror): 595 try: 596 with os.scandir(path) as scandir_it: 597 entries = list(scandir_it) 598 except OSError: 599 onerror(os.scandir, path, sys.exc_info()) 600 entries = [] 601 for entry in entries: 602 fullname = entry.path 603 if _rmtree_isdir(entry): 604 try: 605 if entry.is_symlink(): 606 # This can only happen if someone replaces 607 # a directory with a symlink after the call to 608 # os.scandir or entry.is_dir above. 609 raise OSError("Cannot call rmtree on a symbolic link") 610 except OSError: 611 onerror(os.path.islink, fullname, sys.exc_info()) 612 continue 613 _rmtree_unsafe(fullname, onerror) 614 else: 615 try: 616 os.unlink(fullname) 617 except OSError: 618 onerror(os.unlink, fullname, sys.exc_info()) 619 try: 620 os.rmdir(path) 621 except OSError: 622 onerror(os.rmdir, path, sys.exc_info()) 623 624# Version using fd-based APIs to protect against races 625def _rmtree_safe_fd(topfd, path, onerror): 626 try: 627 with os.scandir(topfd) as scandir_it: 628 entries = list(scandir_it) 629 except OSError as err: 630 err.filename = path 631 onerror(os.scandir, path, sys.exc_info()) 632 return 633 for entry in entries: 634 fullname = os.path.join(path, entry.name) 635 try: 636 is_dir = entry.is_dir(follow_symlinks=False) 637 except OSError: 638 is_dir = False 639 else: 640 if is_dir: 641 try: 642 orig_st = entry.stat(follow_symlinks=False) 643 is_dir = stat.S_ISDIR(orig_st.st_mode) 644 except OSError: 645 onerror(os.lstat, fullname, sys.exc_info()) 646 continue 647 if is_dir: 648 try: 649 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) 650 except OSError: 651 onerror(os.open, fullname, sys.exc_info()) 652 else: 653 try: 654 if os.path.samestat(orig_st, os.fstat(dirfd)): 655 _rmtree_safe_fd(dirfd, fullname, onerror) 656 try: 657 os.rmdir(entry.name, dir_fd=topfd) 658 except OSError: 659 onerror(os.rmdir, fullname, sys.exc_info()) 660 else: 661 try: 662 # This can only happen if someone replaces 663 # a directory with a symlink after the call to 664 # os.scandir or stat.S_ISDIR above. 665 raise OSError("Cannot call rmtree on a symbolic " 666 "link") 667 except OSError: 668 onerror(os.path.islink, fullname, sys.exc_info()) 669 finally: 670 os.close(dirfd) 671 else: 672 try: 673 os.unlink(entry.name, dir_fd=topfd) 674 except OSError: 675 onerror(os.unlink, fullname, sys.exc_info()) 676 677_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 678 os.supports_dir_fd and 679 os.scandir in os.supports_fd and 680 os.stat in os.supports_follow_symlinks) 681 682def rmtree(path, ignore_errors=False, onerror=None): 683 """Recursively delete a directory tree. 684 685 If ignore_errors is set, errors are ignored; otherwise, if onerror 686 is set, it is called to handle the error with arguments (func, 687 path, exc_info) where func is platform and implementation dependent; 688 path is the argument to that function that caused it to fail; and 689 exc_info is a tuple returned by sys.exc_info(). If ignore_errors 690 is false and onerror is None, an exception is raised. 691 692 """ 693 sys.audit("shutil.rmtree", path) 694 if ignore_errors: 695 def onerror(*args): 696 pass 697 elif onerror is None: 698 def onerror(*args): 699 raise 700 if _use_fd_functions: 701 # While the unsafe rmtree works fine on bytes, the fd based does not. 702 if isinstance(path, bytes): 703 path = os.fsdecode(path) 704 # Note: To guard against symlink races, we use the standard 705 # lstat()/open()/fstat() trick. 706 try: 707 orig_st = os.lstat(path) 708 except Exception: 709 onerror(os.lstat, path, sys.exc_info()) 710 return 711 try: 712 fd = os.open(path, os.O_RDONLY) 713 except Exception: 714 onerror(os.open, path, sys.exc_info()) 715 return 716 try: 717 if os.path.samestat(orig_st, os.fstat(fd)): 718 _rmtree_safe_fd(fd, path, onerror) 719 try: 720 os.rmdir(path) 721 except OSError: 722 onerror(os.rmdir, path, sys.exc_info()) 723 else: 724 try: 725 # symlinks to directories are forbidden, see bug #1669 726 raise OSError("Cannot call rmtree on a symbolic link") 727 except OSError: 728 onerror(os.path.islink, path, sys.exc_info()) 729 finally: 730 os.close(fd) 731 else: 732 try: 733 if _rmtree_islink(path): 734 # symlinks to directories are forbidden, see bug #1669 735 raise OSError("Cannot call rmtree on a symbolic link") 736 except OSError: 737 onerror(os.path.islink, path, sys.exc_info()) 738 # can't continue even if onerror hook returns 739 return 740 return _rmtree_unsafe(path, onerror) 741 742# Allow introspection of whether or not the hardening against symlink 743# attacks is supported on the current platform 744rmtree.avoids_symlink_attacks = _use_fd_functions 745 746def _basename(path): 747 """A basename() variant which first strips the trailing slash, if present. 748 Thus we always get the last component of the path, even for directories. 749 750 path: Union[PathLike, str] 751 752 e.g. 753 >>> os.path.basename('/bar/foo') 754 'foo' 755 >>> os.path.basename('/bar/foo/') 756 '' 757 >>> _basename('/bar/foo/') 758 'foo' 759 """ 760 path = os.fspath(path) 761 sep = os.path.sep + (os.path.altsep or '') 762 return os.path.basename(path.rstrip(sep)) 763 764def move(src, dst, copy_function=copy2): 765 """Recursively move a file or directory to another location. This is 766 similar to the Unix "mv" command. Return the file or directory's 767 destination. 768 769 If the destination is a directory or a symlink to a directory, the source 770 is moved inside the directory. The destination path must not already 771 exist. 772 773 If the destination already exists but is not a directory, it may be 774 overwritten depending on os.rename() semantics. 775 776 If the destination is on our current filesystem, then rename() is used. 777 Otherwise, src is copied to the destination and then removed. Symlinks are 778 recreated under the new name if os.rename() fails because of cross 779 filesystem renames. 780 781 The optional `copy_function` argument is a callable that will be used 782 to copy the source or it will be delegated to `copytree`. 783 By default, copy2() is used, but any function that supports the same 784 signature (like copy()) can be used. 785 786 A lot more could be done here... A look at a mv.c shows a lot of 787 the issues this implementation glosses over. 788 789 """ 790 sys.audit("shutil.move", src, dst) 791 real_dst = dst 792 if os.path.isdir(dst): 793 if _samefile(src, dst): 794 # We might be on a case insensitive filesystem, 795 # perform the rename anyway. 796 os.rename(src, dst) 797 return 798 799 # Using _basename instead of os.path.basename is important, as we must 800 # ignore any trailing slash to avoid the basename returning '' 801 real_dst = os.path.join(dst, _basename(src)) 802 803 if os.path.exists(real_dst): 804 raise Error("Destination path '%s' already exists" % real_dst) 805 try: 806 os.rename(src, real_dst) 807 except OSError: 808 if os.path.islink(src): 809 linkto = os.readlink(src) 810 os.symlink(linkto, real_dst) 811 os.unlink(src) 812 elif os.path.isdir(src): 813 if _destinsrc(src, dst): 814 raise Error("Cannot move a directory '%s' into itself" 815 " '%s'." % (src, dst)) 816 copytree(src, real_dst, copy_function=copy_function, 817 symlinks=True) 818 rmtree(src) 819 else: 820 copy_function(src, real_dst) 821 os.unlink(src) 822 return real_dst 823 824def _destinsrc(src, dst): 825 src = os.path.abspath(src) 826 dst = os.path.abspath(dst) 827 if not src.endswith(os.path.sep): 828 src += os.path.sep 829 if not dst.endswith(os.path.sep): 830 dst += os.path.sep 831 return dst.startswith(src) 832 833def _get_gid(name): 834 """Returns a gid, given a group name.""" 835 if getgrnam is None or name is None: 836 return None 837 try: 838 result = getgrnam(name) 839 except KeyError: 840 result = None 841 if result is not None: 842 return result[2] 843 return None 844 845def _get_uid(name): 846 """Returns an uid, given a user name.""" 847 if getpwnam is None or name is None: 848 return None 849 try: 850 result = getpwnam(name) 851 except KeyError: 852 result = None 853 if result is not None: 854 return result[2] 855 return None 856 857def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, 858 owner=None, group=None, logger=None): 859 """Create a (possibly compressed) tar file from all the files under 860 'base_dir'. 861 862 'compress' must be "gzip" (the default), "bzip2", "xz", or None. 863 864 'owner' and 'group' can be used to define an owner and a group for the 865 archive that is being built. If not provided, the current owner and group 866 will be used. 867 868 The output tar file will be named 'base_name' + ".tar", possibly plus 869 the appropriate compression extension (".gz", ".bz2", or ".xz"). 870 871 Returns the output filename. 872 """ 873 if compress is None: 874 tar_compression = '' 875 elif _ZLIB_SUPPORTED and compress == 'gzip': 876 tar_compression = 'gz' 877 elif _BZ2_SUPPORTED and compress == 'bzip2': 878 tar_compression = 'bz2' 879 elif _LZMA_SUPPORTED and compress == 'xz': 880 tar_compression = 'xz' 881 else: 882 raise ValueError("bad value for 'compress', or compression format not " 883 "supported : {0}".format(compress)) 884 885 import tarfile # late import for breaking circular dependency 886 887 compress_ext = '.' + tar_compression if compress else '' 888 archive_name = base_name + '.tar' + compress_ext 889 archive_dir = os.path.dirname(archive_name) 890 891 if archive_dir and not os.path.exists(archive_dir): 892 if logger is not None: 893 logger.info("creating %s", archive_dir) 894 if not dry_run: 895 os.makedirs(archive_dir) 896 897 # creating the tarball 898 if logger is not None: 899 logger.info('Creating tar archive') 900 901 uid = _get_uid(owner) 902 gid = _get_gid(group) 903 904 def _set_uid_gid(tarinfo): 905 if gid is not None: 906 tarinfo.gid = gid 907 tarinfo.gname = group 908 if uid is not None: 909 tarinfo.uid = uid 910 tarinfo.uname = owner 911 return tarinfo 912 913 if not dry_run: 914 tar = tarfile.open(archive_name, 'w|%s' % tar_compression) 915 try: 916 tar.add(base_dir, filter=_set_uid_gid) 917 finally: 918 tar.close() 919 920 return archive_name 921 922def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): 923 """Create a zip file from all the files under 'base_dir'. 924 925 The output zip file will be named 'base_name' + ".zip". Returns the 926 name of the output zip file. 927 """ 928 import zipfile # late import for breaking circular dependency 929 930 zip_filename = base_name + ".zip" 931 archive_dir = os.path.dirname(base_name) 932 933 if archive_dir and not os.path.exists(archive_dir): 934 if logger is not None: 935 logger.info("creating %s", archive_dir) 936 if not dry_run: 937 os.makedirs(archive_dir) 938 939 if logger is not None: 940 logger.info("creating '%s' and adding '%s' to it", 941 zip_filename, base_dir) 942 943 if not dry_run: 944 with zipfile.ZipFile(zip_filename, "w", 945 compression=zipfile.ZIP_DEFLATED) as zf: 946 path = os.path.normpath(base_dir) 947 if path != os.curdir: 948 zf.write(path, path) 949 if logger is not None: 950 logger.info("adding '%s'", path) 951 for dirpath, dirnames, filenames in os.walk(base_dir): 952 for name in sorted(dirnames): 953 path = os.path.normpath(os.path.join(dirpath, name)) 954 zf.write(path, path) 955 if logger is not None: 956 logger.info("adding '%s'", path) 957 for name in filenames: 958 path = os.path.normpath(os.path.join(dirpath, name)) 959 if os.path.isfile(path): 960 zf.write(path, path) 961 if logger is not None: 962 logger.info("adding '%s'", path) 963 964 return zip_filename 965 966_ARCHIVE_FORMATS = { 967 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"), 968} 969 970if _ZLIB_SUPPORTED: 971 _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')], 972 "gzip'ed tar-file") 973 _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file") 974 975if _BZ2_SUPPORTED: 976 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')], 977 "bzip2'ed tar-file") 978 979if _LZMA_SUPPORTED: 980 _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')], 981 "xz'ed tar-file") 982 983def get_archive_formats(): 984 """Returns a list of supported formats for archiving and unarchiving. 985 986 Each element of the returned sequence is a tuple (name, description) 987 """ 988 formats = [(name, registry[2]) for name, registry in 989 _ARCHIVE_FORMATS.items()] 990 formats.sort() 991 return formats 992 993def register_archive_format(name, function, extra_args=None, description=''): 994 """Registers an archive format. 995 996 name is the name of the format. function is the callable that will be 997 used to create archives. If provided, extra_args is a sequence of 998 (name, value) tuples that will be passed as arguments to the callable. 999 description can be provided to describe the format, and will be returned 1000 by the get_archive_formats() function. 1001 """ 1002 if extra_args is None: 1003 extra_args = [] 1004 if not callable(function): 1005 raise TypeError('The %s object is not callable' % function) 1006 if not isinstance(extra_args, (tuple, list)): 1007 raise TypeError('extra_args needs to be a sequence') 1008 for element in extra_args: 1009 if not isinstance(element, (tuple, list)) or len(element) !=2: 1010 raise TypeError('extra_args elements are : (arg_name, value)') 1011 1012 _ARCHIVE_FORMATS[name] = (function, extra_args, description) 1013 1014def unregister_archive_format(name): 1015 del _ARCHIVE_FORMATS[name] 1016 1017def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, 1018 dry_run=0, owner=None, group=None, logger=None): 1019 """Create an archive file (eg. zip or tar). 1020 1021 'base_name' is the name of the file to create, minus any format-specific 1022 extension; 'format' is the archive format: one of "zip", "tar", "gztar", 1023 "bztar", or "xztar". Or any other registered format. 1024 1025 'root_dir' is a directory that will be the root directory of the 1026 archive; ie. we typically chdir into 'root_dir' before creating the 1027 archive. 'base_dir' is the directory where we start archiving from; 1028 ie. 'base_dir' will be the common prefix of all files and 1029 directories in the archive. 'root_dir' and 'base_dir' both default 1030 to the current directory. Returns the name of the archive file. 1031 1032 'owner' and 'group' are used when creating a tar archive. By default, 1033 uses the current owner and group. 1034 """ 1035 sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir) 1036 save_cwd = os.getcwd() 1037 if root_dir is not None: 1038 if logger is not None: 1039 logger.debug("changing into '%s'", root_dir) 1040 base_name = os.path.abspath(base_name) 1041 if not dry_run: 1042 os.chdir(root_dir) 1043 1044 if base_dir is None: 1045 base_dir = os.curdir 1046 1047 kwargs = {'dry_run': dry_run, 'logger': logger} 1048 1049 try: 1050 format_info = _ARCHIVE_FORMATS[format] 1051 except KeyError: 1052 raise ValueError("unknown archive format '%s'" % format) from None 1053 1054 func = format_info[0] 1055 for arg, val in format_info[1]: 1056 kwargs[arg] = val 1057 1058 if format != 'zip': 1059 kwargs['owner'] = owner 1060 kwargs['group'] = group 1061 1062 try: 1063 filename = func(base_name, base_dir, **kwargs) 1064 finally: 1065 if root_dir is not None: 1066 if logger is not None: 1067 logger.debug("changing back to '%s'", save_cwd) 1068 os.chdir(save_cwd) 1069 1070 return filename 1071 1072 1073def get_unpack_formats(): 1074 """Returns a list of supported formats for unpacking. 1075 1076 Each element of the returned sequence is a tuple 1077 (name, extensions, description) 1078 """ 1079 formats = [(name, info[0], info[3]) for name, info in 1080 _UNPACK_FORMATS.items()] 1081 formats.sort() 1082 return formats 1083 1084def _check_unpack_options(extensions, function, extra_args): 1085 """Checks what gets registered as an unpacker.""" 1086 # first make sure no other unpacker is registered for this extension 1087 existing_extensions = {} 1088 for name, info in _UNPACK_FORMATS.items(): 1089 for ext in info[0]: 1090 existing_extensions[ext] = name 1091 1092 for extension in extensions: 1093 if extension in existing_extensions: 1094 msg = '%s is already registered for "%s"' 1095 raise RegistryError(msg % (extension, 1096 existing_extensions[extension])) 1097 1098 if not callable(function): 1099 raise TypeError('The registered function must be a callable') 1100 1101 1102def register_unpack_format(name, extensions, function, extra_args=None, 1103 description=''): 1104 """Registers an unpack format. 1105 1106 `name` is the name of the format. `extensions` is a list of extensions 1107 corresponding to the format. 1108 1109 `function` is the callable that will be 1110 used to unpack archives. The callable will receive archives to unpack. 1111 If it's unable to handle an archive, it needs to raise a ReadError 1112 exception. 1113 1114 If provided, `extra_args` is a sequence of 1115 (name, value) tuples that will be passed as arguments to the callable. 1116 description can be provided to describe the format, and will be returned 1117 by the get_unpack_formats() function. 1118 """ 1119 if extra_args is None: 1120 extra_args = [] 1121 _check_unpack_options(extensions, function, extra_args) 1122 _UNPACK_FORMATS[name] = extensions, function, extra_args, description 1123 1124def unregister_unpack_format(name): 1125 """Removes the pack format from the registry.""" 1126 del _UNPACK_FORMATS[name] 1127 1128def _ensure_directory(path): 1129 """Ensure that the parent directory of `path` exists""" 1130 dirname = os.path.dirname(path) 1131 if not os.path.isdir(dirname): 1132 os.makedirs(dirname) 1133 1134def _unpack_zipfile(filename, extract_dir): 1135 """Unpack zip `filename` to `extract_dir` 1136 """ 1137 import zipfile # late import for breaking circular dependency 1138 1139 if not zipfile.is_zipfile(filename): 1140 raise ReadError("%s is not a zip file" % filename) 1141 1142 zip = zipfile.ZipFile(filename) 1143 try: 1144 for info in zip.infolist(): 1145 name = info.filename 1146 1147 # don't extract absolute paths or ones with .. in them 1148 if name.startswith('/') or '..' in name: 1149 continue 1150 1151 target = os.path.join(extract_dir, *name.split('/')) 1152 if not target: 1153 continue 1154 1155 _ensure_directory(target) 1156 if not name.endswith('/'): 1157 # file 1158 data = zip.read(info.filename) 1159 f = open(target, 'wb') 1160 try: 1161 f.write(data) 1162 finally: 1163 f.close() 1164 del data 1165 finally: 1166 zip.close() 1167 1168def _unpack_tarfile(filename, extract_dir): 1169 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` 1170 """ 1171 import tarfile # late import for breaking circular dependency 1172 try: 1173 tarobj = tarfile.open(filename) 1174 except tarfile.TarError: 1175 raise ReadError( 1176 "%s is not a compressed or uncompressed tar file" % filename) 1177 try: 1178 tarobj.extractall(extract_dir) 1179 finally: 1180 tarobj.close() 1181 1182_UNPACK_FORMATS = { 1183 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), 1184 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"), 1185} 1186 1187if _ZLIB_SUPPORTED: 1188 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [], 1189 "gzip'ed tar-file") 1190 1191if _BZ2_SUPPORTED: 1192 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [], 1193 "bzip2'ed tar-file") 1194 1195if _LZMA_SUPPORTED: 1196 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [], 1197 "xz'ed tar-file") 1198 1199def _find_unpack_format(filename): 1200 for name, info in _UNPACK_FORMATS.items(): 1201 for extension in info[0]: 1202 if filename.endswith(extension): 1203 return name 1204 return None 1205 1206def unpack_archive(filename, extract_dir=None, format=None): 1207 """Unpack an archive. 1208 1209 `filename` is the name of the archive. 1210 1211 `extract_dir` is the name of the target directory, where the archive 1212 is unpacked. If not provided, the current working directory is used. 1213 1214 `format` is the archive format: one of "zip", "tar", "gztar", "bztar", 1215 or "xztar". Or any other registered format. If not provided, 1216 unpack_archive will use the filename extension and see if an unpacker 1217 was registered for that extension. 1218 1219 In case none is found, a ValueError is raised. 1220 """ 1221 sys.audit("shutil.unpack_archive", filename, extract_dir, format) 1222 1223 if extract_dir is None: 1224 extract_dir = os.getcwd() 1225 1226 extract_dir = os.fspath(extract_dir) 1227 filename = os.fspath(filename) 1228 1229 if format is not None: 1230 try: 1231 format_info = _UNPACK_FORMATS[format] 1232 except KeyError: 1233 raise ValueError("Unknown unpack format '{0}'".format(format)) from None 1234 1235 func = format_info[1] 1236 func(filename, extract_dir, **dict(format_info[2])) 1237 else: 1238 # we need to look at the registered unpackers supported extensions 1239 format = _find_unpack_format(filename) 1240 if format is None: 1241 raise ReadError("Unknown archive format '{0}'".format(filename)) 1242 1243 func = _UNPACK_FORMATS[format][1] 1244 kwargs = dict(_UNPACK_FORMATS[format][2]) 1245 func(filename, extract_dir, **kwargs) 1246 1247 1248if hasattr(os, 'statvfs'): 1249 1250 __all__.append('disk_usage') 1251 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1252 _ntuple_diskusage.total.__doc__ = 'Total space in bytes' 1253 _ntuple_diskusage.used.__doc__ = 'Used space in bytes' 1254 _ntuple_diskusage.free.__doc__ = 'Free space in bytes' 1255 1256 def disk_usage(path): 1257 """Return disk usage statistics about the given path. 1258 1259 Returned value is a named tuple with attributes 'total', 'used' and 1260 'free', which are the amount of total, used and free space, in bytes. 1261 """ 1262 st = os.statvfs(path) 1263 free = st.f_bavail * st.f_frsize 1264 total = st.f_blocks * st.f_frsize 1265 used = (st.f_blocks - st.f_bfree) * st.f_frsize 1266 return _ntuple_diskusage(total, used, free) 1267 1268elif _WINDOWS: 1269 1270 __all__.append('disk_usage') 1271 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1272 1273 def disk_usage(path): 1274 """Return disk usage statistics about the given path. 1275 1276 Returned values is a named tuple with attributes 'total', 'used' and 1277 'free', which are the amount of total, used and free space, in bytes. 1278 """ 1279 total, free = nt._getdiskusage(path) 1280 used = total - free 1281 return _ntuple_diskusage(total, used, free) 1282 1283 1284def chown(path, user=None, group=None): 1285 """Change owner user and group of the given path. 1286 1287 user and group can be the uid/gid or the user/group names, and in that case, 1288 they are converted to their respective uid/gid. 1289 """ 1290 sys.audit('shutil.chown', path, user, group) 1291 1292 if user is None and group is None: 1293 raise ValueError("user and/or group must be set") 1294 1295 _user = user 1296 _group = group 1297 1298 # -1 means don't change it 1299 if user is None: 1300 _user = -1 1301 # user can either be an int (the uid) or a string (the system username) 1302 elif isinstance(user, str): 1303 _user = _get_uid(user) 1304 if _user is None: 1305 raise LookupError("no such user: {!r}".format(user)) 1306 1307 if group is None: 1308 _group = -1 1309 elif not isinstance(group, int): 1310 _group = _get_gid(group) 1311 if _group is None: 1312 raise LookupError("no such group: {!r}".format(group)) 1313 1314 os.chown(path, _user, _group) 1315 1316def get_terminal_size(fallback=(80, 24)): 1317 """Get the size of the terminal window. 1318 1319 For each of the two dimensions, the environment variable, COLUMNS 1320 and LINES respectively, is checked. If the variable is defined and 1321 the value is a positive integer, it is used. 1322 1323 When COLUMNS or LINES is not defined, which is the common case, 1324 the terminal connected to sys.__stdout__ is queried 1325 by invoking os.get_terminal_size. 1326 1327 If the terminal size cannot be successfully queried, either because 1328 the system doesn't support querying, or because we are not 1329 connected to a terminal, the value given in fallback parameter 1330 is used. Fallback defaults to (80, 24) which is the default 1331 size used by many terminal emulators. 1332 1333 The value returned is a named tuple of type os.terminal_size. 1334 """ 1335 # columns, lines are the working values 1336 try: 1337 columns = int(os.environ['COLUMNS']) 1338 except (KeyError, ValueError): 1339 columns = 0 1340 1341 try: 1342 lines = int(os.environ['LINES']) 1343 except (KeyError, ValueError): 1344 lines = 0 1345 1346 # only query if necessary 1347 if columns <= 0 or lines <= 0: 1348 try: 1349 size = os.get_terminal_size(sys.__stdout__.fileno()) 1350 except (AttributeError, ValueError, OSError): 1351 # stdout is None, closed, detached, or not a terminal, or 1352 # os.get_terminal_size() is unsupported 1353 size = os.terminal_size(fallback) 1354 if columns <= 0: 1355 columns = size.columns 1356 if lines <= 0: 1357 lines = size.lines 1358 1359 return os.terminal_size((columns, lines)) 1360 1361 1362# Check that a given file can be accessed with the correct mode. 1363# Additionally check that `file` is not a directory, as on Windows 1364# directories pass the os.access check. 1365def _access_check(fn, mode): 1366 return (os.path.exists(fn) and os.access(fn, mode) 1367 and not os.path.isdir(fn)) 1368 1369 1370def which(cmd, mode=os.F_OK | os.X_OK, path=None): 1371 """Given a command, mode, and a PATH string, return the path which 1372 conforms to the given mode on the PATH, or None if there is no such 1373 file. 1374 1375 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result 1376 of os.environ.get("PATH"), or can be overridden with a custom search 1377 path. 1378 1379 """ 1380 # If we're given a path with a directory part, look it up directly rather 1381 # than referring to PATH directories. This includes checking relative to the 1382 # current directory, e.g. ./script 1383 if os.path.dirname(cmd): 1384 if _access_check(cmd, mode): 1385 return cmd 1386 return None 1387 1388 use_bytes = isinstance(cmd, bytes) 1389 1390 if path is None: 1391 path = os.environ.get("PATH", None) 1392 if path is None: 1393 try: 1394 path = os.confstr("CS_PATH") 1395 except (AttributeError, ValueError): 1396 # os.confstr() or CS_PATH is not available 1397 path = os.defpath 1398 # bpo-35755: Don't use os.defpath if the PATH environment variable is 1399 # set to an empty string 1400 1401 # PATH='' doesn't match, whereas PATH=':' looks in the current directory 1402 if not path: 1403 return None 1404 1405 if use_bytes: 1406 path = os.fsencode(path) 1407 path = path.split(os.fsencode(os.pathsep)) 1408 else: 1409 path = os.fsdecode(path) 1410 path = path.split(os.pathsep) 1411 1412 if sys.platform == "win32": 1413 # The current directory takes precedence on Windows. 1414 curdir = os.curdir 1415 if use_bytes: 1416 curdir = os.fsencode(curdir) 1417 if curdir not in path: 1418 path.insert(0, curdir) 1419 1420 # PATHEXT is necessary to check on Windows. 1421 pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT 1422 pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] 1423 1424 if use_bytes: 1425 pathext = [os.fsencode(ext) for ext in pathext] 1426 # See if the given file matches any of the expected path extensions. 1427 # This will allow us to short circuit when given "python.exe". 1428 # If it does match, only test that one, otherwise we have to try 1429 # others. 1430 if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 1431 files = [cmd] 1432 else: 1433 files = [cmd + ext for ext in pathext] 1434 else: 1435 # On other platforms you don't have things like PATHEXT to tell you 1436 # what file suffixes are executable, so just pass on cmd as-is. 1437 files = [cmd] 1438 1439 seen = set() 1440 for dir in path: 1441 normdir = os.path.normcase(dir) 1442 if not normdir in seen: 1443 seen.add(normdir) 1444 for thefile in files: 1445 name = os.path.join(dir, thefile) 1446 if _access_check(name, mode): 1447 return name 1448 return None 1449