• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2009 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Fake implementations for different file objects."""
16
17import errno
18import io
19import os
20import sys
21import traceback
22from stat import (
23    S_IFREG,
24    S_IFDIR,
25)
26from types import TracebackType
27from typing import (
28    List,
29    Optional,
30    Callable,
31    Union,
32    Any,
33    Dict,
34    cast,
35    AnyStr,
36    NoReturn,
37    Iterator,
38    TextIO,
39    Type,
40    TYPE_CHECKING,
41)
42
43from pyfakefs import helpers
44from pyfakefs.helpers import (
45    FakeStatResult,
46    BinaryBufferIO,
47    TextBufferIO,
48    is_int_type,
49    is_unicode_string,
50    to_string,
51    matching_string,
52    real_encoding,
53    AnyPath,
54    AnyString,
55    get_locale_encoding,
56    _OpenModes,
57    is_root,
58)
59
60if TYPE_CHECKING:
61    from pyfakefs.fake_filesystem import FakeFilesystem
62
63
# Keep a module-level alias of `io.open()`: pyupgrade would otherwise
# auto-rewrite a direct `io.open()` call to the builtin `open()`.
io_open = io.open

# Union of the wrapper classes (referenced by name, as forward references).
AnyFileWrapper = Union[
    "FakeFileWrapper",
    "FakeDirWrapper",
    "StandardStreamWrapper",
    "FakePipeWrapper",
]
# Union of the fake file system object classes defined in this module.
AnyFile = Union["FakeFile", "FakeDirectory"]
74
75
class FakeLargeFileIoException(Exception):
    """Raised on unsupported read/write operations for fake large files.

    A fake large file only has a size, but no real content.
    """

    def __init__(self, file_path: str) -> None:
        # Build the message first, then hand it to the base exception.
        message = (
            "Read and write operations not supported for fake large file: %s"
            % file_path
        )
        super().__init__(message)
86
87
class FakeFile:
    """Provides the appearance of a real file.

    Attributes currently faked out:
      * `st_mode`: user-specified, otherwise S_IFREG
      * `st_ctime`: the time.time() timestamp of the file change time (updated
        each time a file's attributes is modified).
      * `st_atime`: the time.time() timestamp when the file was last accessed.
      * `st_mtime`: the time.time() timestamp when the file was last modified.
      * `st_size`: the size of the file
      * `st_nlink`: the number of hard links to the file
      * `st_ino`: the inode number - a unique number identifying the file
      * `st_dev`: a unique number identifying the (fake) file system device
        the file belongs to
      * `st_uid`: always set to USER_ID, which can be changed globally using
            `set_uid`
      * `st_gid`: always set to GROUP_ID, which can be changed globally using
            `set_gid`

    .. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the
        real file system depends on the used file system (for example it is
        only 1s for HFS+ and older Linux file systems, but much higher for
        ext4 and NTFS). This is currently ignored by pyfakefs, which uses
        the resolution of `time.time()`.

        Under Windows, `st_atime` is not updated for performance reasons by
        default. pyfakefs never updates `st_atime` under Windows, assuming
        the default setting.
    """

    # Attribute names that are forwarded to `self.stat_result` by
    # `__getattr__` / `__setattr__`.
    stat_types = (
        "st_mode",
        "st_ino",
        "st_dev",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_size",
        "st_atime",
        "st_mtime",
        "st_ctime",
        "st_atime_ns",
        "st_mtime_ns",
        "st_ctime_ns",
    )

    def __init__(
        self,
        name: AnyStr,
        st_mode: int = S_IFREG | helpers.PERM_DEF_FILE,
        contents: Optional[AnyStr] = None,
        filesystem: Optional["FakeFilesystem"] = None,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        side_effect: Optional[Callable[["FakeFile"], None]] = None,
        open_modes: Optional[_OpenModes] = None,
    ) -> None:
        """
        Args:
            name: Name of the file/directory, without parent path information
            st_mode: The stat.S_IF* constant representing the file type (i.e.
                stat.S_IFREG, stat.S_IFDIR), and the file permissions.
                If no file type is set (e.g. permission flags only), a
                regular file type is assumed.
            contents: The contents of the filesystem object; should be a string
                or byte object for regular files, and a dict of other
                FakeFile or FakeDirectory objects with the file names as
                keys for FakeDirectory objects
            filesystem: The fake filesystem where the file is created.
            encoding: If contents is a unicode string, the encoding used
                for serialization.
            errors: The error mode used for encoding/decoding errors.
            side_effect: function handle that is executed when file is written,
                must accept the file object as an argument.
            open_modes: The modes the file was opened with (e.g. can read, write etc.)

        Raises:
            ValueError: if `filesystem` is None.
        """
        # to be backwards compatible regarding argument order, we raise on None
        if filesystem is None:
            raise ValueError("filesystem shall not be None")
        self.filesystem: "FakeFilesystem" = filesystem
        self._side_effect: Optional[Callable] = side_effect
        self.name: AnyStr = name  # type: ignore[assignment]
        self.stat_result = FakeStatResult(
            filesystem.is_windows_fs,
            helpers.get_uid(),
            helpers.get_gid(),
            helpers.now(),
        )
        # no file type bits set: assume a regular file
        if st_mode >> 12 == 0:
            st_mode |= S_IFREG
        self.stat_result.st_mode = st_mode
        self.st_size: int = 0
        self.encoding: Optional[str] = real_encoding(encoding)
        self.errors: str = errors or "strict"
        self._byte_contents: Optional[bytes] = self._encode_contents(contents)
        self.stat_result.st_size = (
            len(self._byte_contents) if self._byte_contents is not None else 0
        )
        self.epoch: int = 0
        self.parent_dir: Optional[FakeDirectory] = None
        # Linux specific: extended file system attributes
        self.xattr: Dict = {}
        self.opened_as: AnyString = ""
        self.open_modes = open_modes

    @property
    def byte_contents(self) -> Optional[bytes]:
        """Return the contents as raw byte array."""
        return self._byte_contents

    @property
    def contents(self) -> Optional[str]:
        """Return the contents as string with the original encoding,
        or `None` if there are no byte contents."""
        # Read the property only once: subclasses may do real work in it.
        byte_contents = self.byte_contents
        if isinstance(byte_contents, bytes):
            return byte_contents.decode(
                self.encoding or get_locale_encoding(),
                errors=self.errors,
            )
        return None

    @property
    def st_ctime(self) -> float:
        """Return the creation time of the fake file."""
        return self.stat_result.st_ctime

    @st_ctime.setter
    def st_ctime(self, val: float) -> None:
        """Set the creation time of the fake file."""
        self.stat_result.st_ctime = val

    @property
    def st_atime(self) -> float:
        """Return the access time of the fake file."""
        return self.stat_result.st_atime

    @st_atime.setter
    def st_atime(self, val: float) -> None:
        """Set the access time of the fake file."""
        self.stat_result.st_atime = val

    @property
    def st_mtime(self) -> float:
        """Return the modification time of the fake file."""
        return self.stat_result.st_mtime

    @st_mtime.setter
    def st_mtime(self, val: float) -> None:
        """Set the modification time of the fake file."""
        self.stat_result.st_mtime = val

    def set_large_file_size(self, st_size: int) -> None:
        """Sets the self.st_size attribute and replaces the contents with None.

        Provided specifically to simulate very large files without regards
        to their content (which wouldn't fit in memory).
        Note that read/write operations with such a file raise
            :py:class:`FakeLargeFileIoException`.

        Args:
          st_size: (int) The desired file size

        Raises:
          OSError: if the st_size is not a non-negative integer,
                   or if st_size exceeds the available file system space
        """
        self._check_positive_int(st_size)
        # release the disk usage of any existing content first
        if self.st_size:
            self.size = 0
        if self.filesystem:
            self.filesystem.change_disk_usage(st_size, self.name, self.st_dev)
        self.st_size = st_size
        self._byte_contents = None

    def _check_positive_int(self, size: int) -> None:
        """Raise an OSError (ENOSPC) if `size` is not a non-negative integer."""
        # the size should be a non-negative integer value
        if not is_int_type(size) or size < 0:
            self.filesystem.raise_os_error(errno.ENOSPC, self.name)

    def is_large_file(self) -> bool:
        """Return `True` if this file was initialized with size
        but no contents.
        """
        return self._byte_contents is None

    def _encode_contents(self, contents: Union[str, bytes, None]) -> Optional[bytes]:
        """Return `contents` as bytes; unicode strings are encoded using the
        file encoding (or the locale encoding) and the file error mode.
        Any other value is passed through unchanged."""
        if is_unicode_string(contents):
            contents = bytes(
                cast(str, contents),
                self.encoding or get_locale_encoding(),
                self.errors,
            )
        return cast(bytes, contents)

    def set_initial_contents(self, contents: AnyStr) -> bool:
        """Sets the file contents and size.
           Called internally after initial file creation.

        Args:
            contents: string, new content of file.

        Returns:
            True if the contents have been changed.

        Raises:
              OSError: if the st_size is not a non-negative integer,
                   or if st_size exceeds the available file system space
        """
        byte_contents = self._encode_contents(contents)
        changed = self._byte_contents != byte_contents
        st_size = len(byte_contents) if byte_contents else 0

        current_size = self.st_size or 0
        # account for the size difference before storing the new contents
        self.filesystem.change_disk_usage(
            st_size - current_size, self.name, self.st_dev
        )
        self._byte_contents = byte_contents
        self.st_size = st_size
        self.epoch += 1
        return changed

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        """Sets the file contents and size.
        Also executes the side_effects if available.

        Args:
          contents: (str, bytes) new content of file.
          encoding: (str) the encoding to be used for writing the contents
                    if they are a unicode string.
                    If not given, the locale preferred encoding is used.

        Returns:
            True if the contents have been changed.

        Raises:
          OSError: if `st_size` is not a non-negative integer,
                   or if it exceeds the available file system space.
        """
        self.encoding = real_encoding(encoding)
        changed = self.set_initial_contents(contents)
        if self._side_effect is not None:
            self._side_effect(self)
        return changed

    @property
    def size(self) -> int:
        """Return the size in bytes of the file contents."""
        return self.st_size

    @size.setter
    def size(self, st_size: int) -> None:
        """Resizes file content, padding with nulls if new size exceeds the
        old size.

        Args:
          st_size: The desired size for the file.

        Raises:
          OSError: if the st_size arg is not a non-negative integer
                   or if st_size exceeds the available file system space
        """

        self._check_positive_int(st_size)
        current_size = self.st_size or 0
        self.filesystem.change_disk_usage(
            st_size - current_size, self.name, self.st_dev
        )
        # Only `None` (a fake large file without content) is skipped here.
        # Empty content (b"") must still be padded when the file grows,
        # otherwise the recorded size and the stored bytes would disagree.
        if self._byte_contents is not None:
            if st_size < current_size:
                self._byte_contents = self._byte_contents[:st_size]
            else:
                self._byte_contents += b"\0" * (st_size - current_size)
        self.st_size = st_size
        self.epoch += 1

    @property
    def path(self) -> AnyStr:  # type: ignore[type-var]
        """Return the full path of the current object."""
        names: List[AnyStr] = []  # pytype: disable=invalid-annotation
        obj: Optional[FakeFile] = self
        # walk up the parent chain, collecting names from the root downwards
        while obj:
            names.insert(0, matching_string(self.name, obj.name))  # type: ignore
            obj = obj.parent_dir
        sep = self.filesystem.get_path_separator(names[0])
        if names[0] == sep:
            names.pop(0)
            dir_path = sep.join(names)
            drive = self.filesystem.splitdrive(dir_path)[0]
            # if a Windows path already starts with a drive or UNC path,
            # no extra separator is needed
            if not drive:
                dir_path = sep + dir_path
        else:
            dir_path = sep.join(names)
        return self.filesystem.absnormpath(dir_path)

    if sys.version_info >= (3, 12):

        @property
        def is_junction(self) -> bool:
            """Return the result of `filesystem.isjunction()` for this path."""
            return self.filesystem.isjunction(self.path)

    def __getattr__(self, item: str) -> Any:
        """Forward some properties to stat_result."""
        if item in self.stat_types:
            return getattr(self.stat_result, item)
        return super().__getattribute__(item)

    def __setattr__(self, key: str, value: Any) -> None:
        """Forward some properties to stat_result."""
        if key in self.stat_types:
            return setattr(self.stat_result, key, value)
        return super().__setattr__(key, value)

    def __str__(self) -> str:
        """Return the repr of the name followed by the octal mode."""
        return f"{self.name!r}({self.st_mode:o})"

    def has_permission(self, permission_bits: int) -> bool:
        """Checks if the given permissions are set in the fake file.

        Args:
            permission_bits: The permission bits as set for the user.

        Returns:
            True if the permissions are set in the correct class (user/group/other).
        """
        if helpers.get_uid() == self.stat_result.st_uid:
            return self.st_mode & permission_bits == permission_bits
        # shift the user bits down to the group / other permission class
        if helpers.get_gid() == self.stat_result.st_gid:
            return self.st_mode & (permission_bits >> 3) == permission_bits >> 3
        return self.st_mode & (permission_bits >> 6) == permission_bits >> 6
418
419
class FakeNullFile(FakeFile):
    """Fake file named after `filesystem.devnull` that never holds contents."""

    def __init__(self, filesystem: "FakeFilesystem") -> None:
        super().__init__(
            name=filesystem.devnull,
            contents="",
            filesystem=filesystem,
        )

    @property
    def byte_contents(self) -> bytes:
        """Always return empty bytes."""
        return b""

    def set_initial_contents(self, contents: AnyStr) -> bool:
        """Ignore the given contents; report that nothing has changed."""
        return False
430
431
class FakeFileFromRealFile(FakeFile):
    """Represents a fake file copied from the real file system.

    The contents of the file are read on demand only.
    """

    def __init__(
        self,
        file_path: str,
        filesystem: "FakeFilesystem",
        side_effect: Optional[Callable] = None,
    ) -> None:
        """
        Args:
            file_path: Path to the existing file.
            filesystem: The fake filesystem where the file is created.
            side_effect: function handle passed on to `FakeFile`.
        """
        super().__init__(
            name=os.path.basename(file_path),
            filesystem=filesystem,
            side_effect=side_effect,
        )
        # `byte_contents` reads from `self.file_path`; store the given path
        # so the attribute always exists. It can still be re-assigned later.
        self.file_path = file_path
        self.contents_read = False

    @property
    def byte_contents(self) -> Optional[bytes]:
        """Return the file contents, reading them from `file_path` on the
        first access."""
        if not self.contents_read:
            self.contents_read = True
            with io_open(self.file_path, "rb") as f:
                self._byte_contents = f.read()
        # On MacOS and BSD, the above io.open() updates atime on the real file
        self.st_atime = os.stat(self.file_path).st_atime
        return self._byte_contents

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        """Set the contents; they will no longer be read from `file_path`.

        Returns:
            True if the contents have been changed.
        """
        self.contents_read = True
        # propagate the result, as promised by `FakeFile.set_contents()`
        return super().set_contents(contents, encoding)

    def is_large_file(self) -> bool:
        """The contents are never faked."""
        return False
477
478
class FakeDirectory(FakeFile):
    """Provides the appearance of a real directory."""

    def __init__(
        self,
        name: str,
        perm_bits: int = helpers.PERM_DEF,
        filesystem: Optional["FakeFilesystem"] = None,
    ):
        """
        Args:
            name:  name of the file/directory, without parent path information
            perm_bits: permission bits. defaults to `helpers.PERM_DEF`.
            filesystem: if set, the fake filesystem where the directory
                is created
        """
        FakeFile.__init__(self, name, S_IFDIR | perm_bits, "", filesystem=filesystem)
        # directories have the link count of contained entries,
        # including '.' and '..'
        self.st_nlink += 1
        self._entries: Dict[str, AnyFile] = {}

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        """Setting the contents is an error for a directory (EISDIR)."""
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)

    @property
    def entries(self) -> Dict[str, FakeFile]:
        """Return the dictionary of contained directory entries by name."""
        return self._entries

    @property
    def ordered_dirs(self) -> List[str]:
        """Return the list of contained directory entry names ordered by
        their inode number (`st_ino`).
        """
        return [
            item[0]
            for item in sorted(self._entries.items(), key=lambda entry: entry[1].st_ino)
        ]

    def add_entry(self, path_object: FakeFile) -> None:
        """Adds a child FakeFile to this directory.

        Args:
            path_object: FakeFile instance to add as a child of this directory.

        Raises:
            OSError: if the directory has no write permission (Posix only)
            OSError: if the file or directory to be added already exists
        """
        if (
            not helpers.is_root()
            and not self.filesystem.is_windows_fs
            and not self.has_permission(helpers.PERM_WRITE)
        ):
            raise OSError(errno.EACCES, "Permission Denied", self.path)

        path_object_name: str = to_string(path_object.name)
        if path_object_name in self.entries:
            self.filesystem.raise_os_error(errno.EEXIST, self.path)

        self._entries[path_object_name] = path_object
        path_object.parent_dir = self
        # assign a fresh inode number if the object has none yet
        if path_object.st_ino is None:
            self.filesystem.last_ino += 1
            path_object.st_ino = self.filesystem.last_ino
        self.st_nlink += 1
        path_object.st_nlink += 1
        path_object.st_dev = self.st_dev
        # only the first link to the object adds to the disk usage
        if path_object.st_nlink == 1:
            self.filesystem.change_disk_usage(
                path_object.size, path_object.name, self.st_dev
            )

    def get_entry(self, pathname_name: str) -> AnyFile:
        """Retrieves the specified child file or directory entry.

        Args:
            pathname_name: The basename of the child object to retrieve.

        Returns:
            The fake file or directory object.

        Raises:
            KeyError: if no child exists by the specified name.
        """
        pathname_name = self._normalized_entryname(pathname_name)
        return self.entries[to_string(pathname_name)]

    def _normalized_entryname(self, pathname_name: str) -> str:
        """Return the name of the first entry matching `pathname_name`
        case-insensitively if the file system is not case-sensitive;
        otherwise (or if nothing matches) return `pathname_name` unchanged.
        """
        if not self.filesystem.is_case_sensitive:
            # lower-case the searched name once instead of once per entry
            lowered_name = pathname_name.lower()
            for name in self.entries:
                if name.lower() == lowered_name:
                    return name
        return pathname_name

    def remove_entry(self, pathname_name: str, recursive: bool = True) -> None:
        """Removes the specified child file or directory.

        Args:
            pathname_name: Basename of the child object to remove.
            recursive: If True (default), the entries in contained directories
                are deleted first. Used to propagate removal errors
                (e.g. permission problems) from contained entries.

        Raises:
            KeyError: if no child exists by the specified name.
            OSError: if user lacks permission to delete the file,
                or (Windows only) the file is open.
        """
        pathname_name = self._normalized_entryname(pathname_name)
        entry = self.get_entry(pathname_name)
        if self.filesystem.is_windows_fs:
            if not is_root() and entry.st_mode & helpers.PERM_WRITE == 0:
                self.filesystem.raise_os_error(errno.EACCES, pathname_name)
            if self.filesystem.has_open_file(entry):
                raise_error = True
                if os.name == "posix" and not hasattr(os, "O_TMPFILE"):
                    # special handling for emulating Windows under macOS and PyPy
                    # tempfile uses unlink based on the real OS while deleting
                    # a temporary file, so we ignore that error in this specific case
                    st = traceback.extract_stack(limit=6)
                    if sys.version_info < (3, 10):
                        if (
                            st[0].name == "TemporaryFile"
                            and st[0].line == "_os.unlink(name)"
                        ):
                            raise_error = False
                    else:
                        # TemporaryFile implementation has changed in Python 3.10
                        if st[0].name == "opener" and st[0].line == "_os.unlink(name)":
                            raise_error = False
                if raise_error:
                    self.filesystem.raise_os_error(errno.EACCES, pathname_name)
        else:
            if not helpers.is_root() and not self.has_permission(
                helpers.PERM_WRITE | helpers.PERM_EXE
            ):
                self.filesystem.raise_os_error(errno.EACCES, pathname_name)

        if recursive and isinstance(entry, FakeDirectory):
            # remove the children one by one until the directory is empty
            while entry.entries:
                entry.remove_entry(next(iter(entry.entries)))
        elif entry.st_nlink == 1:
            # the last link is removed: release the disk usage
            self.filesystem.change_disk_usage(-entry.size, pathname_name, entry.st_dev)

        self.st_nlink -= 1
        entry.st_nlink -= 1
        assert entry.st_nlink >= 0

        del self.entries[to_string(pathname_name)]

    @property
    def size(self) -> int:
        """Return the total size of all files contained
        in this directory tree.
        """
        return sum(entry.size for entry in self.entries.values())

    @size.setter
    def size(self, st_size: int) -> None:
        """Setting the size is an error for a directory."""
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)

    def has_parent_object(self, dir_object: "FakeDirectory") -> bool:
        """Return `True` if dir_object is a direct or indirect parent
        directory, or if both are the same object."""
        obj: Optional[FakeDirectory] = self
        while obj:
            if obj == dir_object:
                return True
            obj = obj.parent_dir
        return False

    def __str__(self) -> str:
        """Return the description of this directory, followed by the
        descriptions of all entries, indented by two spaces per level."""
        parts = [super().__str__() + ":\n"]
        for entry in self.entries.values():
            # empty lines in the entry description are skipped
            parts.extend(
                "  " + line + "\n" for line in str(entry).split("\n") if line
            )
        return "".join(parts)
663
664
class FakeDirectoryFromRealDirectory(FakeDirectory):
    """Fake directory that mirrors a directory of the real file system.

    The directory entries are only loaded when they are first requested.
    """

    def __init__(
        self,
        source_path: AnyPath,
        filesystem: "FakeFilesystem",
        read_only: bool,
        target_path: Optional[AnyPath] = None,
    ):
        """
        Args:
            source_path: Full path of the directory in the real file system.
            filesystem: The fake filesystem where the directory is created.
            read_only: Passed on to the file system for every entry that is
                added from the real directory.
            target_path: If given, the target path of the directory,
                otherwise the target is the same as `source_path`.

        Raises:
            OSError: if `os.stat()` fails for `source_path`.
        """
        target_path = target_path or source_path
        real_stat = os.stat(source_path)
        dir_name = to_string(os.path.split(target_path)[1])
        super().__init__(
            name=dir_name,
            perm_bits=real_stat.st_mode,
            filesystem=filesystem,
        )

        # take over times and ownership from the real directory
        for stat_name in ("st_ctime", "st_atime", "st_mtime", "st_gid", "st_uid"):
            setattr(self, stat_name, getattr(real_stat, stat_name))
        self.source_path = source_path  # type: ignore
        self.read_only = read_only
        self.contents_read = False

    @property
    def entries(self) -> Dict[str, FakeFile]:
        """Return the contained directory entries; on the first access,
        they are added from the real directory."""
        if self.contents_read:
            return self._entries

        # set the flag first, before any entry is added
        self.contents_read = True
        fake_root = self.path
        for entry_name in os.listdir(self.source_path):
            real_entry = os.path.join(self.source_path, entry_name)
            fake_entry = os.path.join(fake_root, entry_name)  # type: ignore
            if os.path.islink(real_entry):
                self.filesystem.add_real_symlink(real_entry, fake_entry)
                continue
            if os.path.isdir(real_entry):
                self.filesystem.add_real_directory(
                    real_entry, self.read_only, target_path=fake_entry
                )
                continue
            self.filesystem.add_real_file(
                real_entry, self.read_only, target_path=fake_entry
            )
        return self._entries

    @property
    def size(self) -> int:
        """Return 0 as long as the entries have not been loaded,
        afterwards the size as calculated by the base class."""
        # we cannot get the size until the contents are loaded
        return super().size if self.contents_read else 0

    @size.setter
    def size(self, st_size: int) -> None:
        """Setting the size is an error for a directory (EISDIR)."""
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)
741
742
743class FakeFileWrapper:
744    """Wrapper for a stream object for use by a FakeFile object.
745
746    If the wrapper has any data written to it, it will propagate to
747    the FakeFile object on close() or flush().
748    """
749
750    def __init__(
751        self,
752        file_object: FakeFile,
753        file_path: AnyStr,
754        update: bool,
755        read: bool,
756        append: bool,
757        delete_on_close: bool,
758        filesystem: "FakeFilesystem",
759        newline: Optional[str],
760        binary: bool,
761        closefd: bool,
762        encoding: Optional[str],
763        errors: Optional[str],
764        buffering: int,
765        raw_io: bool,
766        opened_as_fd: bool,
767        is_stream: bool = False,
768    ):
769        self.file_object = file_object
770        self.file_path = file_path  # type: ignore[var-annotated]
771        self._append = append
772        self._read = read
773        self.allow_update = update
774        self._closefd = closefd
775        self._file_epoch = file_object.epoch
776        self.raw_io = raw_io
777        self._binary = binary
778        self.opened_as_fd = opened_as_fd
779        self.is_stream = is_stream
780        self._changed = False
781        self._buffer_size = buffering
782        if self._buffer_size == 0 and not binary:
783            raise ValueError("can't have unbuffered text I/O")
784        # buffer_size is ignored in text mode
785        elif self._buffer_size == -1 or not binary:
786            self._buffer_size = io.DEFAULT_BUFFER_SIZE
787        self._use_line_buffer = not binary and buffering == 1
788
789        contents = file_object.byte_contents
790        self._encoding = encoding or get_locale_encoding()
791        errors = errors or "strict"
792        self._io: Union[BinaryBufferIO, TextBufferIO] = (
793            BinaryBufferIO(contents)
794            if binary
795            else TextBufferIO(
796                contents, encoding=encoding, newline=newline, errors=errors
797            )
798        )
799        self._read_whence = 0
800        self._read_seek = 0
801        self._flush_pos = 0
802        if contents:
803            self._flush_pos = len(contents)
804            if update:
805                if not append:
806                    self._io.seek(0)
807                else:
808                    self._io.seek(self._flush_pos)
809                    self._read_seek = self._io.tell()
810
811        if delete_on_close:
812            assert filesystem, "delete_on_close=True requires filesystem"
813        self._filesystem = filesystem
814        self.delete_on_close = delete_on_close
815        # override, don't modify FakeFile.name, as FakeFilesystem expects
816        # it to be the file name only, no directories.
817        self.name = file_object.opened_as
818        self.filedes: Optional[int] = None
819
820    def __enter__(self) -> "FakeFileWrapper":
821        """To support usage of this fake file with the 'with' statement."""
822        return self
823
824    def __exit__(
825        self,
826        exc_type: Optional[Type[BaseException]],
827        exc_val: Optional[BaseException],
828        exc_tb: Optional[TracebackType],
829    ) -> None:
830        """To support usage of this fake file with the 'with' statement."""
831        self.close()
832
833    def _raise(self, message: str) -> NoReturn:
834        if self.raw_io:
835            self._filesystem.raise_os_error(errno.EBADF, self.file_path)
836        raise io.UnsupportedOperation(message)
837
838    def get_object(self) -> FakeFile:
839        """Return the FakeFile object that is wrapped
840        by the current instance.
841        """
842        return self.file_object
843
844    def fileno(self) -> int:
845        """Return the file descriptor of the file object."""
846        if self.filedes is not None:
847            return self.filedes
848        raise OSError(errno.EBADF, "Invalid file descriptor")
849
850    def close(self) -> None:
851        """Close the file."""
852        self.close_fd(self.filedes)
853
854    def close_fd(self, fd: Optional[int]) -> None:
855        """Close the file for the given file descriptor."""
856
857        # ignore closing a closed file
858        if not self._is_open():
859            return
860
861        # for raw io, all writes are flushed immediately
862        if not self.raw_io:
863            try:
864                self.flush()
865            except OSError as e:
866                if e.errno == errno.EBADF:
867                    # if we get here, we have an open file descriptor
868                    # without write permission, which has to be closed
869                    assert self.filedes
870                    self._filesystem.close_open_file(self.filedes)
871                raise
872
873            if self._filesystem.is_windows_fs and self._changed:
874                self.file_object.st_mtime = helpers.now()
875
876        assert fd is not None
877        if self._closefd:
878            self._filesystem.close_open_file(fd)
879        else:
880            open_files = self._filesystem.open_files[fd]
881            assert open_files is not None
882            open_files.remove(self)
883        if self.delete_on_close:
884            self._filesystem.remove_object(
885                self.get_object().path  # type: ignore[arg-type]
886            )
887
888    @property
889    def closed(self) -> bool:
890        """Simulate the `closed` attribute on file."""
891        return not self._is_open()
892
893    def _try_flush(self, old_pos: int) -> None:
894        """Try to flush and reset the position if it fails."""
895        flush_pos = self._flush_pos
896        try:
897            self.flush()
898        except OSError:
899            # write failed - reset to previous position
900            self._io.seek(old_pos)
901            self._io.truncate()
902            self._flush_pos = flush_pos
903            raise
904
905    def flush(self) -> None:
906        """Flush file contents to 'disk'."""
907        if self.is_stream:
908            return
909
910        self._check_open_file()
911
912        if self.allow_update:
913            contents = self._io.getvalue()
914            if self._append:
915                self._sync_io()
916                old_contents = self.file_object.byte_contents
917                assert old_contents is not None
918                contents = old_contents + contents[self._flush_pos :]
919                self._set_stream_contents(contents)
920            else:
921                self._io.flush()
922            changed = self.file_object.set_contents(contents, self._encoding)
923            self.update_flush_pos()
924            if changed:
925                if self._filesystem.is_windows_fs:
926                    self._changed = True
927                else:
928                    current_time = helpers.now()
929                    self.file_object.st_ctime = current_time
930                    self.file_object.st_mtime = current_time
931            self._file_epoch = self.file_object.epoch
932            self._flush_related_files()
933        else:
934            buf_length = len(self._io.getvalue())
935            content_length = 0
936            if self.file_object.byte_contents is not None:
937                content_length = len(self.file_object.byte_contents)
938            # an error is only raised if there is something to flush
939            if content_length != buf_length:
940                self._filesystem.raise_os_error(errno.EBADF)
941
942    def update_flush_pos(self) -> None:
943        self._flush_pos = self._io.tell()
944
945    def _flush_related_files(self) -> None:
946        for open_files in self._filesystem.open_files[3:]:
947            if open_files is not None:
948                for open_file in open_files:
949                    if (
950                        open_file is not self
951                        and isinstance(open_file, FakeFileWrapper)
952                        and self.file_object == open_file.file_object
953                        and not open_file._append
954                    ):
955                        open_file._sync_io()
956
957    def seek(self, offset: int, whence: int = 0) -> None:
958        """Move read/write pointer in 'file'."""
959        self._check_open_file()
960        if not self._append:
961            self._io.seek(offset, whence)
962        else:
963            self._read_seek = offset
964            self._read_whence = whence
965        if not self.is_stream:
966            self.flush()
967
968    def tell(self) -> int:
969        """Return the file's current position.
970
971        Returns:
972          int, file's current position in bytes.
973        """
974        self._check_open_file()
975        if not self.is_stream:
976            self.flush()
977
978        if not self._append:
979            return self._io.tell()
980        if self._read_whence:
981            write_seek = self._io.tell()
982            self._io.seek(self._read_seek, self._read_whence)
983            self._read_seek = self._io.tell()
984            self._read_whence = 0
985            self._io.seek(write_seek)
986        return self._read_seek
987
988    def _sync_io(self) -> None:
989        """Update the stream with changes to the file object contents."""
990        if self._file_epoch == self.file_object.epoch:
991            return
992
993        contents = self.file_object.byte_contents
994        assert contents is not None
995        self._set_stream_contents(contents)
996        self._file_epoch = self.file_object.epoch
997
998    def _set_stream_contents(self, contents: bytes) -> None:
999        whence = self._io.tell()
1000        self._io.seek(0)
1001        self._io.truncate()
1002        self._io.putvalue(contents)
1003        if not self._append:
1004            self._io.seek(whence)
1005
1006    def _read_wrappers(self, name: str) -> Callable:
1007        """Wrap a stream attribute in a read wrapper.
1008
1009        Returns a read_wrapper which tracks our own read pointer since the
1010        stream object has no concept of a different read and write pointer.
1011
1012        Args:
1013            name: The name of the attribute to wrap. Should be a read call.
1014
1015        Returns:
1016            The read_wrapper function.
1017        """
1018        io_attr = getattr(self._io, name)
1019
1020        def read_wrapper(*args, **kwargs):
1021            """Wrap all read calls to the stream object.
1022
1023            We do this to track the read pointer separate from the write
1024            pointer.  Anything that wants to read from the stream object
1025            while we're in append mode goes through this.
1026
1027            Args:
1028                *args: pass through args
1029                **kwargs: pass through kwargs
1030            Returns:
1031                Wrapped stream object method
1032            """
1033            self._io.seek(self._read_seek, self._read_whence)
1034            ret_value = io_attr(*args, **kwargs)
1035            self._read_seek = self._io.tell()
1036            self._read_whence = 0
1037            self._io.seek(0, 2)
1038            return ret_value
1039
1040        return read_wrapper
1041
1042    def _other_wrapper(self, name: str) -> Callable:
1043        """Wrap a stream attribute in an other_wrapper.
1044
1045        Args:
1046          name: the name of the stream attribute to wrap.
1047
1048        Returns:
1049          other_wrapper which is described below.
1050        """
1051        io_attr = getattr(self._io, name)
1052
1053        def other_wrapper(*args, **kwargs):
1054            """Wrap all other calls to the stream Object.
1055
1056            We do this to track changes to the write pointer.  Anything that
1057            moves the write pointer in a file open for appending should move
1058            the read pointer as well.
1059
1060            Args:
1061                *args: Pass through args.
1062                **kwargs: Pass through kwargs.
1063
1064            Returns:
1065                Wrapped stream object method.
1066            """
1067            write_seek = self._io.tell()
1068            ret_value = io_attr(*args, **kwargs)
1069            if write_seek != self._io.tell():
1070                self._read_seek = self._io.tell()
1071                self._read_whence = 0
1072
1073            return ret_value
1074
1075        return other_wrapper
1076
    def _write_wrapper(self, name: str) -> Callable:
        """Wrap a stream attribute in a write_wrapper.

        Args:
          name: the name of the stream attribute to wrap.

        Returns:
          write_wrapper which is described below.
        """
        io_attr = getattr(self._io, name)

        def write_wrapper(*args, **kwargs):
            """Perform the wrapped write call and flush if needed.

            The write is first applied to the stream.  A flush is done if
            the stream position is more than the buffer size behind the
            flush position, or if line buffering is used and the first
            positional argument contains a newline.  In append mode, the
            read pointer is set to the stream position after the write.

            Args:
                *args: Pass through args.
                **kwargs: Pass through kwargs.

            Returns:
                The return value of the wrapped stream method.
            """
            old_pos = self._io.tell()
            ret_value = io_attr(*args, **kwargs)
            new_pos = self._io.tell()

            # if the buffer size is exceeded, we flush
            # NOTE(review): with line buffering active this reads args[0],
            # so the data is assumed to be passed positionally - confirm
            use_line_buf = self._use_line_buffer and "\n" in args[0]
            if new_pos - self._flush_pos > self._buffer_size or use_line_buf:
                flush_all = new_pos - old_pos > self._buffer_size or use_line_buf
                # if the current write does not exceed the buffer size,
                # we revert to the previous position and flush that,
                # otherwise we flush all
                if not flush_all:
                    self._io.seek(old_pos)
                    self._io.truncate()
                # on a failed flush, the stream is cut back to old_pos
                self._try_flush(old_pos)
                if not flush_all:
                    # the write that was reverted above is applied again
                    ret_value = io_attr(*args, **kwargs)
            if self._append:
                self._read_seek = self._io.tell()
                self._read_whence = 0
            return ret_value

        return write_wrapper
1125
1126    def _adapt_size_for_related_files(self, size: int) -> None:
1127        for open_files in self._filesystem.open_files[3:]:
1128            if open_files is not None:
1129                for open_file in open_files:
1130                    if (
1131                        open_file is not self
1132                        and isinstance(open_file, FakeFileWrapper)
1133                        and self.file_object == open_file.file_object
1134                        and cast(FakeFileWrapper, open_file)._append
1135                    ):
1136                        open_file._read_seek += size
1137
    def _truncate_wrapper(self) -> Callable:
        """Wrap truncate() to allow flush after truncate.

        Returns:
            Wrapper which is described below.
        """
        io_attr = self._io.truncate

        def truncate_wrapper(*args, **kwargs):
            """Wrap truncate call to call flush after truncate.

            Unless the file is a stream, the new size is also set in the
            file object, and a stream that is shorter than the new size is
            filled up with null bytes.

            Args:
                *args: Pass through args.
                **kwargs: Pass through kwargs.

            Returns:
                The size returned by the truncate call of the stream.
            """
            if self._append:
                # in append mode, the stream is first moved to the
                # separately tracked read position
                self._io.seek(self._read_seek, self._read_whence)
            size = io_attr(*args, **kwargs)
            self.flush()
            if not self.is_stream:
                self.file_object.size = size
                buffer_size = len(self._io.getvalue())
                if buffer_size < size:
                    # the stream is shorter than the new size:
                    # append the missing part as null bytes
                    self._io.seek(buffer_size)
                    self._io.putvalue(b"\0" * (size - buffer_size))
                    self.file_object.set_contents(self._io.getvalue(), self._encoding)
                    self._flush_pos = size
                    # other append-mode wrappers of the same file get
                    # their read position moved by the added size
                    self._adapt_size_for_related_files(size - buffer_size)

            self.flush()
            return size

        return truncate_wrapper
1166
1167    def size(self) -> int:
1168        """Return the content size in bytes of the wrapped file."""
1169        return self.file_object.st_size
1170
1171    def __getattr__(self, name: str) -> Any:
1172        if self.file_object.is_large_file():
1173            raise FakeLargeFileIoException(self.file_path)
1174
1175        reading = name.startswith("read") or name == "next"
1176        truncate = name == "truncate"
1177        writing = name.startswith("write") or truncate
1178
1179        if reading or writing:
1180            self._check_open_file()
1181        if not self._read and reading:
1182            return self._read_error()
1183        if not self.opened_as_fd and not self.allow_update and writing:
1184            return self._write_error()
1185
1186        if reading:
1187            self._sync_io()
1188            if not self.is_stream:
1189                self.flush()
1190            if not self._filesystem.is_windows_fs:
1191                self.file_object.st_atime = helpers.now()
1192        if truncate:
1193            return self._truncate_wrapper()
1194        if self._append:
1195            if reading:
1196                return self._read_wrappers(name)
1197            elif not writing:
1198                return self._other_wrapper(name)
1199        if writing:
1200            return self._write_wrapper(name)
1201
1202        return getattr(self._io, name)
1203
1204    def _read_error(self) -> Callable:
1205        def read_error(*args, **kwargs):
1206            """Throw an error unless the argument is zero."""
1207            if args and args[0] == 0:
1208                if self._filesystem.is_windows_fs and self.raw_io:
1209                    return b"" if self._binary else ""
1210            self._raise("File is not open for reading.")
1211
1212        return read_error
1213
1214    def _write_error(self) -> Callable:
1215        def write_error(*args, **kwargs):
1216            """Throw an error."""
1217            if self.raw_io:
1218                if self._filesystem.is_windows_fs and args and len(args[0]) == 0:
1219                    return 0
1220            self._raise("File is not open for writing.")
1221
1222        return write_error
1223
1224    def _is_open(self) -> bool:
1225        if self.filedes is not None and self.filedes < len(self._filesystem.open_files):
1226            open_files = self._filesystem.open_files[self.filedes]
1227            if open_files is not None and self in open_files:
1228                return True
1229        return False
1230
1231    def _check_open_file(self) -> None:
1232        if not self.is_stream and not self._is_open():
1233            raise ValueError("I/O operation on closed file")
1234
1235    def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]:
1236        if not self._read:
1237            self._raise("File is not open for reading")
1238        return self._io.__iter__()
1239
1240    def __next__(self):
1241        if not self._read:
1242            self._raise("File is not open for reading")
1243        return next(self._io)
1244
1245
class StandardStreamWrapper:
    """Wrapper for a system standard stream to be used in open files list."""

    def __init__(self, stream_object: TextIO):
        """Wrap the given stream; no file descriptor is assigned yet.

        Args:
            stream_object: The standard stream to be wrapped.
        """
        self._stream_object = stream_object
        self.filedes: Optional[int] = None

    def get_object(self) -> TextIO:
        """Return the wrapped stream object."""
        return self._stream_object

    def fileno(self) -> int:
        """Return the file descriptor of the wrapped standard stream.

        Raises:
            OSError: with `errno.EBADF` if no descriptor is assigned.
        """
        if self.filedes is not None:
            return self.filedes
        raise OSError(errno.EBADF, "Invalid file descriptor")

    def read(self, n: int = -1) -> bytes:
        """Read from the wrapped stream.

        Args:
            n: The maximum size to read, passed on to the stream; with
                the default of -1 the stream is read until its end.

        Returns:
            Whatever the wrapped stream returns; the cast does not convert
            the value, it only adapts the annotated type.
        """
        # the size has to be passed on - otherwise a bounded read would
        # always consume the whole stream
        return cast(bytes, self._stream_object.read(n))

    def write(self, contents: bytes) -> int:
        """Write the contents to the wrapped stream.

        Args:
            contents: The data handed unchanged to the stream.

        Returns:
            The length of the contents.
        """
        self._stream_object.write(cast(str, contents))
        return len(contents)

    def close(self) -> None:
        """We do not support closing standard streams."""

    def close_fd(self, fd: Optional[int]) -> None:
        """We do not support closing standard streams."""

    def is_stream(self) -> bool:
        """Return True, as this wrapper always represents a stream."""
        return True

    def __enter__(self) -> "StandardStreamWrapper":
        """To support usage of this standard stream with the 'with' statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """To support usage of this standard stream with the 'with' statement."""
        self.close()
1290
1291
class FakeDirWrapper:
    """Open files list entry that wraps a FakeDirectory object."""

    def __init__(
        self,
        file_object: FakeDirectory,
        file_path: AnyString,
        filesystem: "FakeFilesystem",
    ):
        """Store the wrapped directory; no file descriptor is assigned yet.

        Args:
            file_object: The directory object to be wrapped.
            file_path: The path belonging to the directory object.
            filesystem: The filesystem used to close the descriptor.
        """
        self._filesystem = filesystem
        self.file_object = file_object
        self.file_path = file_path
        self.filedes: Optional[int] = None

    def get_object(self) -> FakeDirectory:
        """Return the directory object wrapped by this instance."""
        return self.file_object

    def fileno(self) -> int:
        """Return the file descriptor assigned to this wrapper.

        Raises:
            OSError: with `errno.EBADF` if no descriptor is assigned.
        """
        if self.filedes is None:
            raise OSError(errno.EBADF, "Invalid file descriptor")
        return self.filedes

    def close(self) -> None:
        """Close the directory using the wrapper's own file descriptor."""
        self.close_fd(self.filedes)

    def close_fd(self, fd: Optional[int]) -> None:
        """Close the given file descriptor in the filesystem.

        Args:
            fd: The file descriptor to close; must not be None.
        """
        assert fd is not None
        self._filesystem.close_open_file(fd)

    def read(self, numBytes: int = -1) -> bytes:
        """Pass a read call on to the wrapped directory object."""
        return self.file_object.read(numBytes)

    def write(self, contents: bytes) -> int:
        """Pass a write call on to the wrapped directory object.

        Returns:
            The length of the contents.
        """
        self.file_object.write(contents)
        written = len(contents)
        return written

    def __enter__(self) -> "FakeDirWrapper":
        """Enter the 'with' block; the wrapper itself is the context object."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the 'with' block by closing the directory."""
        self.close()
1347
1348
class FakePipeWrapper:
    """Open files list entry that wraps the read or write descriptor of a
    real pipe object.
    """

    def __init__(
        self,
        filesystem: "FakeFilesystem",
        fd: int,
        can_write: bool,
        mode: str = "",
    ):
        """Store the real descriptor; no fake descriptor is assigned yet.

        Args:
            filesystem: The filesystem that holds the open files list.
            fd: The real file descriptor of the pipe end.
            can_write: True for the write end of the pipe.
            mode: If not empty, the real descriptor is opened as a file
                object with this mode, which is then used for I/O.
        """
        self._filesystem = filesystem
        self.fd = fd  # the real file descriptor
        self.can_write = can_write
        self.file_object = None
        self.filedes: Optional[int] = None
        self.real_file = open(fd, mode) if mode else None

    def __enter__(self) -> "FakePipeWrapper":
        """Enter the 'with' block; the wrapper itself is the context object."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the 'with' block by closing the pipe descriptor."""
        self.close()

    def get_object(self) -> None:
        """Return the `file_object` attribute, which is set to None."""
        return self.file_object

    def fileno(self) -> int:
        """Return the fake file descriptor of the pipe object.

        Raises:
            OSError: with `errno.EBADF` if no descriptor is assigned.
        """
        if self.filedes is None:
            raise OSError(errno.EBADF, "Invalid file descriptor")
        return self.filedes

    def read(self, numBytes: int = -1) -> bytes:
        """Read from the real pipe.

        The file object is used if one was opened, otherwise `os.read`
        is called with the real descriptor.
        """
        if not self.real_file:
            return os.read(self.fd, numBytes)
        return self.real_file.read(numBytes)  # pytype: disable=bad-return-type

    def flush(self) -> None:
        """Do nothing; no flush is passed on to the real pipe."""

    def write(self, contents: bytes) -> int:
        """Write to the real pipe.

        The file object is used if one was opened, otherwise `os.write`
        is called with the real descriptor.
        """
        if not self.real_file:
            return os.write(self.fd, contents)
        return self.real_file.write(contents)

    def close(self) -> None:
        """Close the pipe using the wrapper's own fake file descriptor."""
        self.close_fd(self.filedes)

    def close_fd(self, fd: Optional[int]) -> None:
        """Close the pipe descriptor with the given file descriptor.

        The wrapper is removed from the open files entry of the fake
        descriptor, then the real file object or descriptor is closed.

        Args:
            fd: The fake file descriptor; must not be None.
        """
        assert fd is not None
        wrappers = self._filesystem.open_files[fd]
        assert wrappers is not None
        wrappers.remove(self)
        if not self.real_file:
            os.close(self.fd)
        else:
            self.real_file.close()

    def readable(self) -> bool:
        """Return True for the end of the pipe that is not writable."""
        return not self.can_write

    def writable(self) -> bool:
        """Return True for the end of the pipe that is writable."""
        return self.can_write

    def seekable(self) -> bool:
        """Return False, as a pipe is not seekable."""
        return False
1433