• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2009 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Uses :py:class:`FakeIoModule` to provide a
16fake ``io`` module replacement.
17"""
18
19import _io  # pytype: disable=import-error
20import io
21import sys
22from enum import Enum
23from typing import (
24    List,
25    Optional,
26    Callable,
27    Union,
28    Any,
29    AnyStr,
30    IO,
31    TYPE_CHECKING,
32)
33
34from pyfakefs.fake_file import AnyFileWrapper
35from pyfakefs.fake_open import fake_open
36from pyfakefs.helpers import IS_PYPY, is_called_from_skipped_module
37
38if TYPE_CHECKING:
39    from pyfakefs.fake_filesystem import FakeFilesystem
40
41
class PatchMode(Enum):
    """Tri-state switch selecting whether a patch is applied.

    The only consumer at present is the `patch_open_code` option.
    """

    # Never patch: `open_code` always goes to the real module.
    OFF = 1
    # Patch only if the requested path exists in the fake filesystem.
    AUTO = 2
    # Always patch: `open_code` is always served from the fake filesystem.
    ON = 3
50
51
class FakeIoModule:
    """Uses FakeFilesystem to provide a fake io module replacement.

    ``open`` (and ``open_code`` on Python 3.8+) are served by the fake
    filesystem; every other attribute is forwarded to the real module.

    You need a fake_filesystem to use this:
    filesystem = fake_filesystem.FakeFilesystem()
    my_io_module = fake_io.FakeIoModule(filesystem)
    """

    @staticmethod
    def dir() -> List[str]:
        """Return the list of patched function names. Used for patching
        functions imported from the module.
        """
        _dir = ["open"]
        # `open_code` is only defined on this class for Python 3.8+
        if sys.version_info >= (3, 8):
            _dir.append("open_code")
        return _dir

    def __init__(self, filesystem: "FakeFilesystem"):
        """
        Args:
            filesystem: FakeFilesystem used to provide file system information.
        """
        self.filesystem = filesystem
        # handed to `fake_open` on every `open` call
        self.skip_names: List[str] = []
        # real module that receives all attribute lookups not faked here
        self._io_module = io

    def open(
        self,
        file: Union[AnyStr, int],
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        closefd: bool = True,
        opener: Optional[Callable] = None,
    ) -> Union[AnyFileWrapper, IO[Any]]:
        """Redirect the call to `fake_open`.

        All arguments are passed on unchanged, preceded by the fake
        filesystem and the `skip_names` list of this instance.

        Returns:
            Whatever `fake_open` returns for the given arguments.
        """
        return fake_open(
            self.filesystem,
            self.skip_names,
            file,
            mode,
            buffering,
            encoding,
            errors,
            newline,
            closefd,
            opener,
        )

    if sys.version_info >= (3, 8):

        def open_code(self, path):
            """Redirect the call to open. Note that the behavior of the real
            function may be overridden by an earlier call to the
            PyFile_SetOpenCodeHook(). This behavior is not reproduced here.

            Args:
                path: Path of the file to open; must be a `str`
                    (not checked under PyPy).

            Returns:
                The file opened in binary read mode in the fake filesystem
                if patching applies, otherwise the result of the real
                `open_code`.

            Raises:
                TypeError: if `path` is not a string (except under PyPy).
            """
            if not isinstance(path, str) and not IS_PYPY:
                # name the offending type instead of always claiming "int"
                raise TypeError(
                    "open_code() argument 'path' must be str, "
                    f"not {type(path).__name__}"
                )
            patch_mode = self.filesystem.patch_open_code
            # ON always patches; AUTO patches only paths that exist in the
            # fake filesystem
            if patch_mode == PatchMode.ON or (
                patch_mode == PatchMode.AUTO and self.filesystem.exists(path)
            ):
                return self.open(path, mode="rb")
            # mostly this is used for compiled code -
            # don't patch these, as the files are probably in the real fs
            return self._io_module.open_code(path)

    def __getattr__(self, name):
        """Forwards any unfaked calls to the standard io module."""
        # Bypass normal lookup: if `_io_module` is not set on the instance,
        # `self._io_module` would re-enter `__getattr__` and recurse until
        # RecursionError instead of raising AttributeError.
        io_module = object.__getattribute__(self, "_io_module")
        return getattr(io_module, name)
129
130
class FakeIoModule2(FakeIoModule):
    """Variant of ``FakeIoModule`` that stands in for `_io` rather than `io`."""

    def __init__(self, filesystem: "FakeFilesystem"):
        """
        Args:
            filesystem: FakeFilesystem used to provide file system information.
        """
        super().__init__(filesystem)
        # The base initializer selects `io`; replace it so that unfaked
        # lookups are forwarded to `_io` instead.
        self._io_module = _io
141
142
143if sys.platform != "win32":
144    import fcntl
145
146    class FakeFcntlModule:
147        """Replaces the fcntl module. Only valid under Linux/MacOS,
148        currently just mocks the functionality away.
149        """
150
151        @staticmethod
152        def dir() -> List[str]:
153            """Return the list of patched function names. Used for patching
154            functions imported from the module.
155            """
156            return ["fcntl", "ioctl", "flock", "lockf"]
157
158        def __init__(self, filesystem: "FakeFilesystem"):
159            """
160            Args:
161                filesystem: FakeFilesystem used to provide file system
162                    information (currently not used).
163            """
164            self.filesystem = filesystem
165            self._fcntl_module = fcntl
166
167        def fcntl(self, fd: int, cmd: int, arg: int = 0) -> Union[int, bytes]:
168            return 0 if isinstance(arg, int) else arg
169
170        def ioctl(
171            self, fd: int, request: int, arg: int = 0, mutate_flag: bool = True
172        ) -> Union[int, bytes]:
173            return 0 if isinstance(arg, int) else arg
174
175        def flock(self, fd: int, operation: int) -> None:
176            pass
177
178        def lockf(
179            self, fd: int, cmd: int, len: int = 0, start: int = 0, whence=0
180        ) -> Any:
181            pass
182
183        def __getattribute__(self, name):
184            """Prevents patching of skipped modules."""
185            fs: FakeFilesystem = object.__getattribute__(self, "filesystem")
186            fnctl_module = object.__getattribute__(self, "_fcntl_module")
187            if fs.patcher:
188                if is_called_from_skipped_module(
189                    skip_names=fs.patcher.skip_names,
190                    case_sensitive=fs.is_case_sensitive,
191                ):
192                    # remove the `self` argument for FakeOsModule methods
193                    return getattr(fnctl_module, name)
194
195            return object.__getattribute__(self, name)
196
197        def __getattr__(self, name):
198            """Forwards any unfaked calls to the standard fcntl module."""
199            return getattr(self._fcntl_module, name)
200