# Copyright 2009 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Fake implementations for different file objects."""

import errno
import io
import os
import sys
import traceback
from stat import (
    S_IFREG,
    S_IFDIR,
)
from types import TracebackType
from typing import (
    List,
    Optional,
    Callable,
    Union,
    Any,
    Dict,
    cast,
    AnyStr,
    NoReturn,
    Iterator,
    TextIO,
    Type,
    TYPE_CHECKING,
)

from pyfakefs import helpers
from pyfakefs.helpers import (
    FakeStatResult,
    BinaryBufferIO,
    TextBufferIO,
    is_int_type,
    is_unicode_string,
    to_string,
    matching_string,
    real_encoding,
    AnyPath,
    AnyString,
    get_locale_encoding,
    _OpenModes,
    is_root,
)

if TYPE_CHECKING:
    from pyfakefs.fake_filesystem import FakeFilesystem


# Work around pyupgrade auto-rewriting `io.open()` to `open()`.
io_open = io.open

AnyFileWrapper = Union[
    "FakeFileWrapper",
    "FakeDirWrapper",
    "StandardStreamWrapper",
    "FakePipeWrapper",
]
AnyFile = Union["FakeFile", "FakeDirectory"]


class FakeLargeFileIoException(Exception):
    """Exception thrown on unsupported operations for fake large files.
    Fake large files have a size with no real content.
    """

    def __init__(self, file_path: str) -> None:
        super().__init__(
            "Read and write operations not supported for "
            "fake large file: %s" % file_path
        )


class FakeFile:
    """Provides the appearance of a real file.

    Attributes currently faked out:
    * `st_mode`: user-specified, otherwise S_IFREG
    * `st_ctime`: the time.time() timestamp of the file change time (updated
      each time a file's attributes are modified).
    * `st_atime`: the time.time() timestamp when the file was last accessed.
    * `st_mtime`: the time.time() timestamp when the file was last modified.
    * `st_size`: the size of the file
    * `st_nlink`: the number of hard links to the file
    * `st_ino`: the inode number - a unique number identifying the file
    * `st_dev`: a unique number identifying the (fake) file system device
      the file belongs to
    * `st_uid`: always set to USER_ID, which can be changed globally using
      `set_uid`
    * `st_gid`: always set to GROUP_ID, which can be changed globally using
      `set_gid`

    .. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the
        real file system depends on the used file system (for example it is
        only 1s for HFS+ and older Linux file systems, but much higher for
        ext4 and NTFS). This is currently ignored by pyfakefs, which uses
        the resolution of `time.time()`.

        Under Windows, `st_atime` is not updated for performance reasons by
        default. pyfakefs never updates `st_atime` under Windows, assuming
        the default setting.
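
    Example:
        A minimal usage sketch (assuming an active fake filesystem, e.g. the
        `fs` fixture provided by pyfakefs, which is a `FakeFilesystem`
        instance)::

            fake_file = fs.create_file("/foo/bar.txt", contents="hello")
            assert fake_file.st_size == 5
            assert fake_file.contents == "hello"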
    """

    stat_types = (
        "st_mode",
        "st_ino",
        "st_dev",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_size",
        "st_atime",
        "st_mtime",
        "st_ctime",
        "st_atime_ns",
        "st_mtime_ns",
        "st_ctime_ns",
    )

    def __init__(
        self,
        name: AnyStr,
        st_mode: int = S_IFREG | helpers.PERM_DEF_FILE,
        contents: Optional[AnyStr] = None,
        filesystem: Optional["FakeFilesystem"] = None,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        side_effect: Optional[Callable[["FakeFile"], None]] = None,
        open_modes: Optional[_OpenModes] = None,
    ):
        """
        Args:
            name: Name of the file/directory, without parent path information
            st_mode: The stat.S_IF* constant representing the file type (e.g.
                stat.S_IFREG, stat.S_IFDIR), and the file permissions.
                If no file type is set (e.g. permission flags only), a
                regular file type is assumed.
            contents: The contents of the filesystem object; should be a string
                or byte object for regular files, and a dict of other
                FakeFile or FakeDirectory objects with the file names as
                keys for FakeDirectory objects
            filesystem: The fake filesystem where the file is created.
            encoding: If contents is a unicode string, the encoding used
                for serialization.
            errors: The error mode used for encoding/decoding errors.
            side_effect: function handle that is executed when file is written,
                must accept the file object as an argument.
            open_modes: The modes the file was opened with (e.g. can read, write etc.)
        """
        # to be backwards compatible regarding argument order, we raise on None
        if filesystem is None:
            raise ValueError("filesystem shall not be None")
        self.filesystem: "FakeFilesystem" = filesystem
        self._side_effect: Optional[Callable] = side_effect
        self.name: AnyStr = name  # type: ignore[assignment]
        self.stat_result = FakeStatResult(
            filesystem.is_windows_fs,
            helpers.get_uid(),
            helpers.get_gid(),
            helpers.now(),
        )
        if st_mode >> 12 == 0:
            st_mode |= S_IFREG
        self.stat_result.st_mode = st_mode
        self.st_size: int = 0
        self.encoding: Optional[str] = real_encoding(encoding)
        self.errors: str = errors or "strict"
        self._byte_contents: Optional[bytes] = self._encode_contents(contents)
        self.stat_result.st_size = (
            len(self._byte_contents) if self._byte_contents is not None else 0
        )
        self.epoch: int = 0
        self.parent_dir: Optional[FakeDirectory] = None
        # Linux specific: extended file system attributes
        self.xattr: Dict = {}
        self.opened_as: AnyString = ""
        self.open_modes = open_modes

    @property
    def byte_contents(self) -> Optional[bytes]:
        """Return the contents as raw byte array."""
        return self._byte_contents

    @property
    def contents(self) -> Optional[str]:
        """Return the contents as string with the original encoding."""
        if isinstance(self.byte_contents, bytes):
            return self.byte_contents.decode(
                self.encoding or get_locale_encoding(),
                errors=self.errors,
            )
        return None

    @property
    def st_ctime(self) -> float:
        """Return the change time of the fake file."""
        return self.stat_result.st_ctime

    @st_ctime.setter
    def st_ctime(self, val: float) -> None:
        """Set the change time of the fake file."""
        self.stat_result.st_ctime = val

    @property
    def st_atime(self) -> float:
        """Return the access time of the fake file."""
        return self.stat_result.st_atime

    @st_atime.setter
    def st_atime(self, val: float) -> None:
        """Set the access time of the fake file."""
        self.stat_result.st_atime = val

    @property
    def st_mtime(self) -> float:
        """Return the modification time of the fake file."""
        return self.stat_result.st_mtime

    @st_mtime.setter
    def st_mtime(self, val: float) -> None:
        """Set the modification time of the fake file."""
        self.stat_result.st_mtime = val

    def set_large_file_size(self, st_size: int) -> None:
        """Sets the self.st_size attribute and replaces self.content with None.

        Provided specifically to simulate very large files without regards
        to their content (which wouldn't fit in memory).
        Note that read/write operations with such a file raise
        :py:class:`FakeLargeFileIoException`.

        Args:
            st_size: (int) The desired file size

        Raises:
            OSError: if the st_size is not a non-negative integer,
                or if st_size exceeds the available file system space
        """
        self._check_positive_int(st_size)
        if self.st_size:
            self.size = 0
        if self.filesystem:
            self.filesystem.change_disk_usage(st_size, self.name, self.st_dev)
        self.st_size = st_size
        self._byte_contents = None

    def _check_positive_int(self, size: int) -> None:
        # the size should be a non-negative integer value
        if not is_int_type(size) or size < 0:
            self.filesystem.raise_os_error(errno.ENOSPC, self.name)

    def is_large_file(self) -> bool:
        """Return `True` if this file was initialized with size
        but no contents.
        """
        return self._byte_contents is None

    def _encode_contents(self, contents: Union[str, bytes, None]) -> Optional[bytes]:
        if is_unicode_string(contents):
            contents = bytes(
                cast(str, contents),
                self.encoding or get_locale_encoding(),
                self.errors,
            )
        return cast(bytes, contents)

    def set_initial_contents(self, contents: AnyStr) -> bool:
        """Sets the file contents and size.
        Called internally after initial file creation.

        Args:
            contents: string, new content of file.

        Returns:
            True if the contents have been changed.

        Raises:
            OSError: if the st_size is not a non-negative integer,
                or if st_size exceeds the available file system space
        """
        byte_contents = self._encode_contents(contents)
        changed = self._byte_contents != byte_contents
        st_size = len(byte_contents) if byte_contents else 0

        current_size = self.st_size or 0
        self.filesystem.change_disk_usage(
            st_size - current_size, self.name, self.st_dev
        )
        self._byte_contents = byte_contents
        self.st_size = st_size
        self.epoch += 1
        return changed

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        """Sets the file contents and size and increases the modification time.
        Also executes the side_effects if available.

        Args:
            contents: (str, bytes) new content of file.
            encoding: (str) the encoding to be used for writing the contents
                if they are a unicode string.
                If not given, the locale preferred encoding is used.

        Returns:
            True if the contents have been changed.

        Raises:
            OSError: if `st_size` is not a non-negative integer,
                or if it exceeds the available file system space.
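
        Example:
            A minimal sketch (assuming `fake_file` is an existing `FakeFile`,
            e.g. one returned by `FakeFilesystem.create_file`)::

                fake_file.set_contents("hello world")
                assert fake_file.byte_contents == b"hello world"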
        """
        self.encoding = real_encoding(encoding)
        changed = self.set_initial_contents(contents)
        if self._side_effect is not None:
            self._side_effect(self)
        return changed

    @property
    def size(self) -> int:
        """Return the size in bytes of the file contents."""
        return self.st_size

    @size.setter
    def size(self, st_size: int) -> None:
        """Resizes file content, padding with nulls if new size exceeds the
        old size.

        Args:
            st_size: The desired size for the file.

        Raises:
            OSError: if the st_size arg is not a non-negative integer
                or if st_size exceeds the available file system space
        """

        self._check_positive_int(st_size)
        current_size = self.st_size or 0
        self.filesystem.change_disk_usage(
            st_size - current_size, self.name, self.st_dev
        )
        if self._byte_contents:
            if st_size < current_size:
                self._byte_contents = self._byte_contents[:st_size]
            else:
                self._byte_contents += b"\0" * (st_size - current_size)
        self.st_size = st_size
        self.epoch += 1

    @property
    def path(self) -> AnyStr:  # type: ignore[type-var]
        """Return the full path of the current object."""
        names: List[AnyStr] = []  # pytype: disable=invalid-annotation
        obj: Optional[FakeFile] = self
        while obj:
            names.insert(0, matching_string(self.name, obj.name))  # type: ignore
            obj = obj.parent_dir
        sep = self.filesystem.get_path_separator(names[0])
        if names[0] == sep:
            names.pop(0)
            dir_path = sep.join(names)
            drive = self.filesystem.splitdrive(dir_path)[0]
            # if a Windows path already starts with a drive or UNC path,
            # no extra separator is needed
            if not drive:
                dir_path = sep + dir_path
        else:
            dir_path = sep.join(names)
        return self.filesystem.absnormpath(dir_path)

    if sys.version_info >= (3, 12):

        @property
        def is_junction(self) -> bool:
            return self.filesystem.isjunction(self.path)

    def __getattr__(self, item: str) -> Any:
        """Forward some properties to stat_result."""
        if item in self.stat_types:
            return getattr(self.stat_result, item)
        return super().__getattribute__(item)

    def __setattr__(self, key: str, value: Any) -> None:
        """Forward some properties to stat_result."""
        if key in self.stat_types:
            return setattr(self.stat_result, key, value)
        return super().__setattr__(key, value)

    def __str__(self) -> str:
        return f"{self.name!r}({self.st_mode:o})"

    def has_permission(self, permission_bits: int) -> bool:
        """Checks if the given permissions are set in the fake file.

        Args:
            permission_bits: The permission bits as set for the user.

        Returns:
            True if the permissions are set in the correct class (user/group/other).
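
        Example:
            A minimal sketch (assuming `fake_file` is a `FakeFile` with mode
            0o644 that is owned by the current fake user)::

                assert fake_file.has_permission(helpers.PERM_WRITE)
                assert not fake_file.has_permission(helpers.PERM_EXE)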
        """
        if helpers.get_uid() == self.stat_result.st_uid:
            return self.st_mode & permission_bits == permission_bits
        if helpers.get_gid() == self.stat_result.st_gid:
            return self.st_mode & (permission_bits >> 3) == permission_bits >> 3
        return self.st_mode & (permission_bits >> 6) == permission_bits >> 6


class FakeNullFile(FakeFile):
    def __init__(self, filesystem: "FakeFilesystem") -> None:
        super().__init__(filesystem.devnull, filesystem=filesystem, contents="")

    @property
    def byte_contents(self) -> bytes:
        return b""

    def set_initial_contents(self, contents: AnyStr) -> bool:
        return False


class FakeFileFromRealFile(FakeFile):
    """Represents a fake file copied from the real file system.

    The contents of the file are read on demand only.
    """

    def __init__(
        self,
        file_path: str,
        filesystem: "FakeFilesystem",
        side_effect: Optional[Callable] = None,
    ) -> None:
        """
        Args:
            file_path: Path to the existing file.
            filesystem: The fake filesystem where the file is created.

        Raises:
            OSError: if the file does not exist in the real file system.
            OSError: if the file already exists in the fake file system.
        """
        super().__init__(
            name=os.path.basename(file_path),
            filesystem=filesystem,
            side_effect=side_effect,
        )
        self.contents_read = False

    @property
    def byte_contents(self) -> Optional[bytes]:
        if not self.contents_read:
            self.contents_read = True
            with io_open(self.file_path, "rb") as f:
                self._byte_contents = f.read()
        # On macOS and BSD, the above io.open() updates atime on the real file
        self.st_atime = os.stat(self.file_path).st_atime
        return self._byte_contents

    def set_contents(self, contents, encoding=None):
        self.contents_read = True
        super().set_contents(contents, encoding)

    def is_large_file(self):
        """The contents are never faked."""
        return False


class FakeDirectory(FakeFile):
    """Provides the appearance of a real directory."""

    def __init__(
        self,
        name: str,
        perm_bits: int = helpers.PERM_DEF,
        filesystem: Optional["FakeFilesystem"] = None,
    ):
        """
        Args:
            name: name of the file/directory, without parent path information
            perm_bits: permission bits. defaults to 0o777.
            filesystem: if set, the fake filesystem where the directory
                is created
        """
        FakeFile.__init__(self, name, S_IFDIR | perm_bits, "", filesystem=filesystem)
        # directories have the link count of contained entries,
        # including '.' and '..'
        self.st_nlink += 1
        self._entries: Dict[str, AnyFile] = {}

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)

    @property
    def entries(self) -> Dict[str, FakeFile]:
        """Return the dict of contained directory entries."""
        return self._entries

    @property
    def ordered_dirs(self) -> List[str]:
        """Return the list of contained directory entry names ordered by
        creation order.
        """
        return [
            item[0]
            for item in sorted(self._entries.items(), key=lambda entry: entry[1].st_ino)
        ]

    def add_entry(self, path_object: FakeFile) -> None:
        """Adds a child FakeFile to this directory.

        Args:
            path_object: FakeFile instance to add as a child of this directory.

        Raises:
            OSError: if the directory has no write permission (Posix only)
            OSError: if the file or directory to be added already exists
        """
        if (
            not helpers.is_root()
            and not self.filesystem.is_windows_fs
            and not self.has_permission(helpers.PERM_WRITE)
        ):
            raise OSError(errno.EACCES, "Permission Denied", self.path)

        path_object_name: str = to_string(path_object.name)
        if path_object_name in self.entries:
            self.filesystem.raise_os_error(errno.EEXIST, self.path)

        self._entries[path_object_name] = path_object
        path_object.parent_dir = self
        if path_object.st_ino is None:
            self.filesystem.last_ino += 1
            path_object.st_ino = self.filesystem.last_ino
        self.st_nlink += 1
        path_object.st_nlink += 1
        path_object.st_dev = self.st_dev
        if path_object.st_nlink == 1:
            self.filesystem.change_disk_usage(
                path_object.size, path_object.name, self.st_dev
            )

    def get_entry(self, pathname_name: str) -> AnyFile:
        """Retrieves the specified child file or directory entry.

        Args:
            pathname_name: The basename of the child object to retrieve.

        Returns:
            The fake file or directory object.

        Raises:
            KeyError: if no child exists by the specified name.
        """
        pathname_name = self._normalized_entryname(pathname_name)
        return self.entries[to_string(pathname_name)]

    def _normalized_entryname(self, pathname_name: str) -> str:
        if not self.filesystem.is_case_sensitive:
            matching_names = [
                name for name in self.entries if name.lower() == pathname_name.lower()
            ]
            if matching_names:
                pathname_name = matching_names[0]
        return pathname_name

    def remove_entry(self, pathname_name: str, recursive: bool = True) -> None:
        """Removes the specified child file or directory.

        Args:
            pathname_name: Basename of the child object to remove.
            recursive: If True (default), the entries in contained directories
                are deleted first. Used to propagate removal errors
                (e.g. permission problems) from contained entries.

        Raises:
            KeyError: if no child exists by the specified name.
            OSError: if user lacks permission to delete the file,
                or (Windows only) the file is open.
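
        Example:
            A minimal sketch (assuming `dir_object` is a `FakeDirectory` that
            contains an entry named "child.txt" and removal is permitted)::

                dir_object.remove_entry("child.txt")
                assert "child.txt" not in dir_object.entries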
        """
        pathname_name = self._normalized_entryname(pathname_name)
        entry = self.get_entry(pathname_name)
        if self.filesystem.is_windows_fs:
            if not is_root() and entry.st_mode & helpers.PERM_WRITE == 0:
                self.filesystem.raise_os_error(errno.EACCES, pathname_name)
            if self.filesystem.has_open_file(entry):
                raise_error = True
                if os.name == "posix" and not hasattr(os, "O_TMPFILE"):
                    # special handling for emulating Windows under macOS and PyPy:
                    # tempfile uses unlink based on the real OS while deleting
                    # a temporary file, so we ignore that error in this specific case
                    st = traceback.extract_stack(limit=6)
                    if sys.version_info < (3, 10):
                        if (
                            st[0].name == "TemporaryFile"
                            and st[0].line == "_os.unlink(name)"
                        ):
                            raise_error = False
                    else:
                        # TemporaryFile implementation has changed in Python 3.10
                        if st[0].name == "opener" and st[0].line == "_os.unlink(name)":
                            raise_error = False
                if raise_error:
                    self.filesystem.raise_os_error(errno.EACCES, pathname_name)
        else:
            if not helpers.is_root() and not self.has_permission(
                helpers.PERM_WRITE | helpers.PERM_EXE
            ):
                self.filesystem.raise_os_error(errno.EACCES, pathname_name)

        if recursive and isinstance(entry, FakeDirectory):
            while entry.entries:
                entry.remove_entry(list(entry.entries)[0])
        elif entry.st_nlink == 1:
            self.filesystem.change_disk_usage(-entry.size, pathname_name, entry.st_dev)

        self.st_nlink -= 1
        entry.st_nlink -= 1
        assert entry.st_nlink >= 0

        del self.entries[to_string(pathname_name)]

    @property
    def size(self) -> int:
        """Return the total size of all files contained
        in this directory tree.
        """
        return sum([item[1].size for item in self.entries.items()])

    @size.setter
    def size(self, st_size: int) -> None:
        """Setting the size is an error for a directory."""
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)

    def has_parent_object(self, dir_object: "FakeDirectory") -> bool:
        """Return `True` if dir_object is a direct or indirect parent
        directory, or if both are the same object."""
        obj: Optional[FakeDirectory] = self
        while obj:
            if obj == dir_object:
                return True
            obj = obj.parent_dir
        return False

    def __str__(self) -> str:
        description = super().__str__() + ":\n"
        for item in self.entries:
            item_desc = self.entries[item].__str__()
            for line in item_desc.split("\n"):
                if line:
                    description = description + " " + line + "\n"
        return description


class FakeDirectoryFromRealDirectory(FakeDirectory):
    """Represents a fake directory copied from the real file system.

    The contents of the directory are read on demand only.
    """

    def __init__(
        self,
        source_path: AnyPath,
        filesystem: "FakeFilesystem",
        read_only: bool,
        target_path: Optional[AnyPath] = None,
    ):
        """
        Args:
            source_path: Full directory path.
            filesystem: The fake filesystem where the directory is created.
            read_only: If set, all files under the directory are treated
                as read-only, e.g. a write access raises an exception;
                otherwise, writing to the files changes the fake files
                only, as usual.
            target_path: If given, the target path of the directory,
                otherwise the target is the same as `source_path`.

        Raises:
            OSError: if the directory does not exist in the real file system
        """
        target_path = target_path or source_path
        real_stat = os.stat(source_path)
        super().__init__(
            name=to_string(os.path.split(target_path)[1]),
            perm_bits=real_stat.st_mode,
            filesystem=filesystem,
        )

        self.st_ctime = real_stat.st_ctime
        self.st_atime = real_stat.st_atime
        self.st_mtime = real_stat.st_mtime
        self.st_gid = real_stat.st_gid
        self.st_uid = real_stat.st_uid
        self.source_path = source_path  # type: ignore
        self.read_only = read_only
        self.contents_read = False

    @property
    def entries(self) -> Dict[str, FakeFile]:
        """Return the dict of contained directory entries, loading them
        if not already loaded."""
        if not self.contents_read:
            self.contents_read = True
            base = self.path
            for entry in os.listdir(self.source_path):
                source_path = os.path.join(self.source_path, entry)
                target_path = os.path.join(base, entry)  # type: ignore
                if os.path.islink(source_path):
                    self.filesystem.add_real_symlink(source_path, target_path)
                elif os.path.isdir(source_path):
                    self.filesystem.add_real_directory(
                        source_path, self.read_only, target_path=target_path
                    )
                else:
                    self.filesystem.add_real_file(
                        source_path, self.read_only, target_path=target_path
                    )
        return self._entries

    @property
    def size(self) -> int:
        # we cannot get the size until the contents are loaded
        if not self.contents_read:
            return 0
        return super().size

    @size.setter
    def size(self, st_size: int) -> None:
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)


class FakeFileWrapper:
    """Wrapper for a stream object for use by a FakeFile object.

    If the wrapper has any data written to it, it will propagate to
    the FakeFile object on close() or flush().
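
    Example:
        A minimal sketch (with pyfakefs active, e.g. inside
        `fake_filesystem_unittest.Patcher`, the built-in `open()` returns a
        `FakeFileWrapper` for fake files)::

            with open("/bar.txt", "w") as f:
                f.write("hello")  # buffered in the wrapper
            # after close(), the data has been written to the FakeFile object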
    """

    def __init__(
        self,
        file_object: FakeFile,
        file_path: AnyStr,
        update: bool,
        read: bool,
        append: bool,
        delete_on_close: bool,
        filesystem: "FakeFilesystem",
        newline: Optional[str],
        binary: bool,
        closefd: bool,
        encoding: Optional[str],
        errors: Optional[str],
        buffering: int,
        raw_io: bool,
        opened_as_fd: bool,
        is_stream: bool = False,
    ):
        self.file_object = file_object
        self.file_path = file_path  # type: ignore[var-annotated]
        self._append = append
        self._read = read
        self.allow_update = update
        self._closefd = closefd
        self._file_epoch = file_object.epoch
        self.raw_io = raw_io
        self._binary = binary
        self.opened_as_fd = opened_as_fd
        self.is_stream = is_stream
        self._changed = False
        self._buffer_size = buffering
        if self._buffer_size == 0 and not binary:
            raise ValueError("can't have unbuffered text I/O")
        # buffer_size is ignored in text mode
        elif self._buffer_size == -1 or not binary:
            self._buffer_size = io.DEFAULT_BUFFER_SIZE
        self._use_line_buffer = not binary and buffering == 1

        contents = file_object.byte_contents
        self._encoding = encoding or get_locale_encoding()
        errors = errors or "strict"
        self._io: Union[BinaryBufferIO, TextBufferIO] = (
            BinaryBufferIO(contents)
            if binary
            else TextBufferIO(
                contents, encoding=encoding, newline=newline, errors=errors
            )
        )
        self._read_whence = 0
        self._read_seek = 0
        self._flush_pos = 0
        if contents:
            self._flush_pos = len(contents)
            if update:
                if not append:
                    self._io.seek(0)
                else:
                    self._io.seek(self._flush_pos)
                    self._read_seek = self._io.tell()

        if delete_on_close:
            assert filesystem, "delete_on_close=True requires filesystem"
        self._filesystem = filesystem
        self.delete_on_close = delete_on_close
        # override, don't modify FakeFile.name, as FakeFilesystem expects
        # it to be the file name only, no directories.
        self.name = file_object.opened_as
        self.filedes: Optional[int] = None

    def __enter__(self) -> "FakeFileWrapper":
        """To support usage of this fake file with the 'with' statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """To support usage of this fake file with the 'with' statement."""
        self.close()

    def _raise(self, message: str) -> NoReturn:
        if self.raw_io:
            self._filesystem.raise_os_error(errno.EBADF, self.file_path)
        raise io.UnsupportedOperation(message)

    def get_object(self) -> FakeFile:
        """Return the FakeFile object that is wrapped
        by the current instance.
        """
        return self.file_object

    def fileno(self) -> int:
        """Return the file descriptor of the file object."""
        if self.filedes is not None:
            return self.filedes
        raise OSError(errno.EBADF, "Invalid file descriptor")

    def close(self) -> None:
        """Close the file."""
        self.close_fd(self.filedes)

    def close_fd(self, fd: Optional[int]) -> None:
        """Close the file for the given file descriptor."""

        # ignore closing a closed file
        if not self._is_open():
            return

        # for raw io, all writes are flushed immediately
        if not self.raw_io:
            try:
                self.flush()
            except OSError as e:
                if e.errno == errno.EBADF:
                    # if we get here, we have an open file descriptor
                    # without write permission, which has to be closed
                    assert self.filedes
                    self._filesystem.close_open_file(self.filedes)
                raise

        if self._filesystem.is_windows_fs and self._changed:
            self.file_object.st_mtime = helpers.now()

        assert fd is not None
        if self._closefd:
            self._filesystem.close_open_file(fd)
        else:
            open_files = self._filesystem.open_files[fd]
            assert open_files is not None
            open_files.remove(self)
        if self.delete_on_close:
            self._filesystem.remove_object(
                self.get_object().path  # type: ignore[arg-type]
            )

    @property
    def closed(self) -> bool:
        """Simulate the `closed` attribute on file."""
        return not self._is_open()

    def _try_flush(self, old_pos: int) -> None:
        """Try to flush and reset the position if it fails."""
        flush_pos = self._flush_pos
        try:
            self.flush()
        except OSError:
            # write failed - reset to previous position
            self._io.seek(old_pos)
            self._io.truncate()
            self._flush_pos = flush_pos
            raise

    def flush(self) -> None:
        """Flush file contents to 'disk'."""
        if self.is_stream:
            return

        self._check_open_file()

        if self.allow_update:
            contents = self._io.getvalue()
            if self._append:
                self._sync_io()
                old_contents = self.file_object.byte_contents
                assert old_contents is not None
                contents = old_contents + contents[self._flush_pos :]
                self._set_stream_contents(contents)
            else:
                self._io.flush()
            changed = self.file_object.set_contents(contents, self._encoding)
            self.update_flush_pos()
            if changed:
                if self._filesystem.is_windows_fs:
                    self._changed = True
                else:
                    current_time = helpers.now()
                    self.file_object.st_ctime = current_time
                    self.file_object.st_mtime = current_time
            self._file_epoch = self.file_object.epoch
            self._flush_related_files()
        else:
            buf_length = len(self._io.getvalue())
            content_length = 0
            if self.file_object.byte_contents is not None:
                content_length = len(self.file_object.byte_contents)
            # an error is only raised if there is something to flush
            if content_length != buf_length:
                self._filesystem.raise_os_error(errno.EBADF)

    def update_flush_pos(self) -> None:
        self._flush_pos = self._io.tell()

    def _flush_related_files(self) -> None:
        for open_files in self._filesystem.open_files[3:]:
            if open_files is not None:
                for open_file in open_files:
                    if (
                        open_file is not self
                        and isinstance(open_file, FakeFileWrapper)
                        and self.file_object == open_file.file_object
                        and not open_file._append
                    ):
                        open_file._sync_io()

    def seek(self, offset: int, whence: int = 0) -> None:
        """Move read/write pointer in 'file'."""
        self._check_open_file()
        if not self._append:
            self._io.seek(offset, whence)
        else:
            self._read_seek = offset
            self._read_whence = whence
        if not self.is_stream:
            self.flush()

    def tell(self) -> int:
        """Return the file's current position.

        Returns:
            int, file's current position in bytes.
        """
        self._check_open_file()
        if not self.is_stream:
            self.flush()

        if not self._append:
            return self._io.tell()
        if self._read_whence:
            write_seek = self._io.tell()
            self._io.seek(self._read_seek, self._read_whence)
            self._read_seek = self._io.tell()
            self._read_whence = 0
            self._io.seek(write_seek)
        return self._read_seek

    def _sync_io(self) -> None:
        """Update the stream with changes to the file object contents."""
        if self._file_epoch == self.file_object.epoch:
            return

        contents = self.file_object.byte_contents
        assert contents is not None
        self._set_stream_contents(contents)
        self._file_epoch = self.file_object.epoch

    def _set_stream_contents(self, contents: bytes) -> None:
        whence = self._io.tell()
        self._io.seek(0)
        self._io.truncate()
        self._io.putvalue(contents)
        if not self._append:
            self._io.seek(whence)

    def _read_wrappers(self, name: str) -> Callable:
        """Wrap a stream attribute in a read wrapper.

        Returns a read_wrapper which tracks our own read pointer since the
        stream object has no concept of a different read and write pointer.

        Args:
            name: The name of the attribute to wrap. Should be a read call.

        Returns:
            The read_wrapper function.
        """
        io_attr = getattr(self._io, name)

        def read_wrapper(*args, **kwargs):
            """Wrap all read calls to the stream object.

            We do this to track the read pointer separate from the write
            pointer. Anything that wants to read from the stream object
            while we're in append mode goes through this.

            Args:
                *args: pass through args
                **kwargs: pass through kwargs
            Returns:
                Wrapped stream object method
            """
            self._io.seek(self._read_seek, self._read_whence)
            ret_value = io_attr(*args, **kwargs)
            self._read_seek = self._io.tell()
            self._read_whence = 0
            self._io.seek(0, 2)
            return ret_value

        return read_wrapper

    def _other_wrapper(self, name: str) -> Callable:
        """Wrap a stream attribute in an other_wrapper.

        Args:
            name: the name of the stream attribute to wrap.

        Returns:
            other_wrapper which is described below.
        """
        io_attr = getattr(self._io, name)

        def other_wrapper(*args, **kwargs):
            """Wrap all other calls to the stream object.

            We do this to track changes to the write pointer. Anything that
            moves the write pointer in a file open for appending should move
            the read pointer as well.

            Args:
                *args: Pass through args.
                **kwargs: Pass through kwargs.

            Returns:
                Wrapped stream object method.
            """
            write_seek = self._io.tell()
            ret_value = io_attr(*args, **kwargs)
            if write_seek != self._io.tell():
                self._read_seek = self._io.tell()
                self._read_whence = 0

            return ret_value

        return other_wrapper

    def _write_wrapper(self, name: str) -> Callable:
        """Wrap a stream attribute in a write_wrapper.

        Args:
            name: the name of the stream attribute to wrap.

        Returns:
            write_wrapper which is described below.
        """
        io_attr = getattr(self._io, name)

        def write_wrapper(*args, **kwargs):
            """Wrap all write calls to the stream object.

            We do this to track changes to the write pointer. Anything that
            moves the write pointer in a file open for appending should move
            the read pointer as well.

            Args:
                *args: Pass through args.
                **kwargs: Pass through kwargs.

            Returns:
                Wrapped stream object method.
            """
            old_pos = self._io.tell()
            ret_value = io_attr(*args, **kwargs)
            new_pos = self._io.tell()

            # if the buffer size is exceeded, we flush
            use_line_buf = self._use_line_buffer and "\n" in args[0]
            if new_pos - self._flush_pos > self._buffer_size or use_line_buf:
                flush_all = new_pos - old_pos > self._buffer_size or use_line_buf
                # if the current write does not exceed the buffer size,
                # we revert to the previous position and flush that,
                # otherwise we flush all
                if not flush_all:
                    self._io.seek(old_pos)
                    self._io.truncate()
                self._try_flush(old_pos)
                if not flush_all:
                    ret_value = io_attr(*args, **kwargs)
            if self._append:
                self._read_seek = self._io.tell()
                self._read_whence = 0
            return ret_value

        return write_wrapper

    def _adapt_size_for_related_files(self, size: int) -> None:
        for open_files in self._filesystem.open_files[3:]:
            if open_files is not None:
                for open_file in open_files:
                    if (
                        open_file is not self
                        and isinstance(open_file, FakeFileWrapper)
                        and self.file_object == open_file.file_object
                        and cast(FakeFileWrapper, open_file)._append
                    ):
                        open_file._read_seek += size

    def _truncate_wrapper(self) -> Callable:
        """Wrap truncate() to allow flush after truncate.

        Returns:
            Wrapper which is described below.
        """
        io_attr = self._io.truncate

        def truncate_wrapper(*args, **kwargs):
            """Wrap truncate call to call flush after truncate."""
            if self._append:
                self._io.seek(self._read_seek, self._read_whence)
            size = io_attr(*args, **kwargs)
            self.flush()
            if not self.is_stream:
                self.file_object.size = size
                buffer_size = len(self._io.getvalue())
                if buffer_size < size:
                    self._io.seek(buffer_size)
                    self._io.putvalue(b"\0" * (size - buffer_size))
                    self.file_object.set_contents(self._io.getvalue(), self._encoding)
                self._flush_pos = size
                self._adapt_size_for_related_files(size - buffer_size)

            self.flush()
            return size

        return truncate_wrapper

    def size(self) -> int:
        """Return the content size in bytes of the wrapped file."""
        return self.file_object.st_size

    def __getattr__(self, name: str) -> Any:
        if self.file_object.is_large_file():
            raise FakeLargeFileIoException(self.file_path)

        reading = name.startswith("read") or name == "next"
        truncate = name == "truncate"
        writing = name.startswith("write") or truncate

        if reading or writing:
            self._check_open_file()
        if not self._read and reading:
            return self._read_error()
        if not self.opened_as_fd and not self.allow_update and writing:
            return self._write_error()

        if reading:
            self._sync_io()
            if not self.is_stream:
                self.flush()
            if not self._filesystem.is_windows_fs:
                self.file_object.st_atime = helpers.now()
        if truncate:
            return self._truncate_wrapper()
        if self._append:
            if reading:
                return self._read_wrappers(name)
            elif not writing:
                return self._other_wrapper(name)
        if writing:
            return self._write_wrapper(name)

        return getattr(self._io, name)

    def _read_error(self) -> Callable:
        def read_error(*args, **kwargs):
            """Throw an error unless the argument is zero."""
            if args and args[0] == 0:
                if self._filesystem.is_windows_fs and self.raw_io:
                    return b"" if self._binary else ""
            self._raise("File is not open for reading.")

        return read_error

    def _write_error(self) -> Callable:
        def write_error(*args, **kwargs):
            """Throw an error."""
            if self.raw_io:
                if self._filesystem.is_windows_fs and args and len(args[0]) == 0:
                    return 0
            self._raise("File is not open for writing.")

        return write_error

    def _is_open(self) -> bool:
        if self.filedes is not None and self.filedes < len(self._filesystem.open_files):
            open_files = self._filesystem.open_files[self.filedes]
            if open_files is not None and self in open_files:
                return True
        return False

    def _check_open_file(self) -> None:
        if not self.is_stream and not self._is_open():
            raise ValueError("I/O operation on closed file")

    def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]:
        if not self._read:
            self._raise("File is not open for reading")
        return self._io.__iter__()

    def __next__(self):
        if not self._read:
            self._raise("File is not open for reading")
        return next(self._io)


class StandardStreamWrapper:
    """Wrapper for a system standard stream to be used in open files list."""

    def __init__(self, stream_object: TextIO):
        self._stream_object = stream_object
        self.filedes: Optional[int] = None

    def get_object(self) -> TextIO:
        return self._stream_object

    def fileno(self) -> int:
        """Return the file descriptor of the wrapped standard stream."""
        if self.filedes is not None:
            return self.filedes
        raise OSError(errno.EBADF, "Invalid file descriptor")

    def read(self, n: int = -1) -> bytes:
        return cast(bytes, self._stream_object.read())

    def write(self, contents: bytes) -> int:
        self._stream_object.write(cast(str, contents))
        return len(contents)

    def close(self) -> None:
        """We do not support closing standard streams."""

    def close_fd(self, fd: Optional[int]) -> None:
        """We do not support closing standard streams."""

    def is_stream(self) -> bool:
        return True

    def __enter__(self) -> "StandardStreamWrapper":
        """To support usage of this standard stream with the 'with' statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """To support usage of this standard stream with the 'with' statement."""
        self.close()


class FakeDirWrapper:
    """Wrapper for a FakeDirectory object to be used in open files list."""

    def __init__(
        self,
        file_object: FakeDirectory,
        file_path: AnyString,
        filesystem: "FakeFilesystem",
    ):
        self.file_object = file_object
        self.file_path = file_path
        self._filesystem = filesystem
        self.filedes: Optional[int] = None

    def get_object(self) -> FakeDirectory:
        """Return the FakeFile object that is wrapped by the current
        instance."""
        return self.file_object

    def fileno(self) -> int:
        """Return the file descriptor of the file object."""
        if self.filedes is not None:
            return self.filedes
        raise OSError(errno.EBADF, "Invalid file descriptor")

    def close(self) -> None:
        """Close the directory."""
        self.close_fd(self.filedes)

    def close_fd(self, fd: Optional[int]) -> None:
        """Close the directory."""
        assert fd is not None
        self._filesystem.close_open_file(fd)

    def read(self, numBytes: int = -1) -> bytes:
        """Read from the directory."""
        return self.file_object.read(numBytes)

    def write(self, contents: bytes) -> int:
        """Write to the directory."""
        self.file_object.write(contents)
        return len(contents)

    def __enter__(self) -> "FakeDirWrapper":
        """To support usage of this fake directory with the 'with' statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """To support usage of this fake directory with the 'with' statement."""
        self.close()


class FakePipeWrapper:
    """Wrapper for a read or write descriptor of a real pipe object to be
    used in open files list.
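
    Example:
        A minimal sketch (with pyfakefs active, `os.pipe()` returns fake
        descriptors that are backed by wrappers like this one)::

            read_fd, write_fd = os.pipe()
            os.write(write_fd, b"data")
            assert os.read(read_fd, 4) == b"data"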
    """

    def __init__(
        self,
        filesystem: "FakeFilesystem",
        fd: int,
        can_write: bool,
        mode: str = "",
    ):
        self._filesystem = filesystem
        self.fd = fd  # the real file descriptor
        self.can_write = can_write
        self.file_object = None
        self.filedes: Optional[int] = None
        self.real_file = None
        if mode:
            self.real_file = open(fd, mode)

    def __enter__(self) -> "FakePipeWrapper":
        """To support usage of this fake pipe with the 'with' statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """To support usage of this fake pipe with the 'with' statement."""
        self.close()

    def get_object(self) -> None:
        return self.file_object

    def fileno(self) -> int:
        """Return the fake file descriptor of the pipe object."""
        if self.filedes is not None:
            return self.filedes
        raise OSError(errno.EBADF, "Invalid file descriptor")

    def read(self, numBytes: int = -1) -> bytes:
        """Read from the real pipe."""
        if self.real_file:
            return self.real_file.read(numBytes)  # pytype: disable=bad-return-type
        return os.read(self.fd, numBytes)

    def flush(self) -> None:
        """Flush the real pipe - nothing to do here."""

    def write(self, contents: bytes) -> int:
        """Write to the real pipe."""
        if self.real_file:
            return self.real_file.write(contents)
        return os.write(self.fd, contents)

    def close(self) -> None:
        """Close the pipe descriptor."""
        self.close_fd(self.filedes)

    def close_fd(self, fd: Optional[int]) -> None:
        """Close the pipe descriptor with the given file descriptor."""
        assert fd is not None
        open_files = self._filesystem.open_files[fd]
        assert open_files is not None
        open_files.remove(self)
        if self.real_file:
            self.real_file.close()
        else:
            os.close(self.fd)

    def readable(self) -> bool:
        """The pipe end can either be readable or writable."""
        return not self.can_write

    def writable(self) -> bool:
        """The pipe end can either be readable or writable."""
        return self.can_write

    def seekable(self) -> bool:
        """A pipe is not seekable."""
        return False