1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Uses :py:class:`FakeIoModule` to provide a 16fake ``io`` module replacement. 17""" 18 19import _io # pytype: disable=import-error 20import io 21import sys 22from enum import Enum 23from typing import ( 24 List, 25 Optional, 26 Callable, 27 Union, 28 Any, 29 AnyStr, 30 IO, 31 TYPE_CHECKING, 32) 33 34from pyfakefs.fake_file import AnyFileWrapper 35from pyfakefs.fake_open import fake_open 36from pyfakefs.helpers import IS_PYPY, is_called_from_skipped_module 37 38if TYPE_CHECKING: 39 from pyfakefs.fake_filesystem import FakeFilesystem 40 41 42class PatchMode(Enum): 43 """Defines if patching shall be on, off, or in automatic mode. 44 Currently only used for `patch_open_code` option. 45 """ 46 47 OFF = 1 48 AUTO = 2 49 ON = 3 50 51 52class FakeIoModule: 53 """Uses FakeFilesystem to provide a fake io module replacement. 54 55 You need a fake_filesystem to use this: 56 filesystem = fake_filesystem.FakeFilesystem() 57 my_io_module = fake_io.FakeIoModule(filesystem) 58 """ 59 60 @staticmethod 61 def dir() -> List[str]: 62 """Return the list of patched function names. Used for patching 63 functions imported from the module. 64 """ 65 _dir = ["open"] 66 if sys.version_info >= (3, 8): 67 _dir.append("open_code") 68 return _dir 69 70 def __init__(self, filesystem: "FakeFilesystem"): 71 """ 72 Args: 73 filesystem: FakeFilesystem used to provide file system information. 74 """ 75 self.filesystem = filesystem 76 self.skip_names: List[str] = [] 77 self._io_module = io 78 79 def open( 80 self, 81 file: Union[AnyStr, int], 82 mode: str = "r", 83 buffering: int = -1, 84 encoding: Optional[str] = None, 85 errors: Optional[str] = None, 86 newline: Optional[str] = None, 87 closefd: bool = True, 88 opener: Optional[Callable] = None, 89 ) -> Union[AnyFileWrapper, IO[Any]]: 90 """Redirect the call to FakeFileOpen. 91 See FakeFileOpen.call() for description. 92 """ 93 return fake_open( 94 self.filesystem, 95 self.skip_names, 96 file, 97 mode, 98 buffering, 99 encoding, 100 errors, 101 newline, 102 closefd, 103 opener, 104 ) 105 106 if sys.version_info >= (3, 8): 107 108 def open_code(self, path): 109 """Redirect the call to open. Note that the behavior of the real 110 function may be overridden by an earlier call to the 111 PyFile_SetOpenCodeHook(). This behavior is not reproduced here. 112 """ 113 if not isinstance(path, str) and not IS_PYPY: 114 raise TypeError("open_code() argument 'path' must be str, not int") 115 patch_mode = self.filesystem.patch_open_code 116 if ( 117 patch_mode == PatchMode.AUTO 118 and self.filesystem.exists(path) 119 or patch_mode == PatchMode.ON 120 ): 121 return self.open(path, mode="rb") 122 # mostly this is used for compiled code - 123 # don't patch these, as the files are probably in the real fs 124 return self._io_module.open_code(path) 125 126 def __getattr__(self, name): 127 """Forwards any unfaked calls to the standard io module.""" 128 return getattr(self._io_module, name) 129 130 131class FakeIoModule2(FakeIoModule): 132 """Similar to ``FakeIoModule``, but fakes `_io` instead of `io`.""" 133 134 def __init__(self, filesystem: "FakeFilesystem"): 135 """ 136 Args: 137 filesystem: FakeFilesystem used to provide file system information. 138 """ 139 super().__init__(filesystem) 140 self._io_module = _io 141 142 143if sys.platform != "win32": 144 import fcntl 145 146 class FakeFcntlModule: 147 """Replaces the fcntl module. Only valid under Linux/MacOS, 148 currently just mocks the functionality away. 149 """ 150 151 @staticmethod 152 def dir() -> List[str]: 153 """Return the list of patched function names. Used for patching 154 functions imported from the module. 155 """ 156 return ["fcntl", "ioctl", "flock", "lockf"] 157 158 def __init__(self, filesystem: "FakeFilesystem"): 159 """ 160 Args: 161 filesystem: FakeFilesystem used to provide file system 162 information (currently not used). 163 """ 164 self.filesystem = filesystem 165 self._fcntl_module = fcntl 166 167 def fcntl(self, fd: int, cmd: int, arg: int = 0) -> Union[int, bytes]: 168 return 0 if isinstance(arg, int) else arg 169 170 def ioctl( 171 self, fd: int, request: int, arg: int = 0, mutate_flag: bool = True 172 ) -> Union[int, bytes]: 173 return 0 if isinstance(arg, int) else arg 174 175 def flock(self, fd: int, operation: int) -> None: 176 pass 177 178 def lockf( 179 self, fd: int, cmd: int, len: int = 0, start: int = 0, whence=0 180 ) -> Any: 181 pass 182 183 def __getattribute__(self, name): 184 """Prevents patching of skipped modules.""" 185 fs: FakeFilesystem = object.__getattribute__(self, "filesystem") 186 fnctl_module = object.__getattribute__(self, "_fcntl_module") 187 if fs.patcher: 188 if is_called_from_skipped_module( 189 skip_names=fs.patcher.skip_names, 190 case_sensitive=fs.is_case_sensitive, 191 ): 192 # remove the `self` argument for FakeOsModule methods 193 return getattr(fnctl_module, name) 194 195 return object.__getattribute__(self, name) 196 197 def __getattr__(self, name): 198 """Forwards any unfaked calls to the standard fcntl module.""" 199 return getattr(self._fcntl_module, name) 200