1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""A fake open() function replacement. See ``fake_filesystem`` for usage.""" 16 17import errno 18import io 19import os 20import sys 21from stat import ( 22 S_ISDIR, 23) 24from typing import ( 25 Optional, 26 Union, 27 Any, 28 Tuple, 29 cast, 30 AnyStr, 31 TYPE_CHECKING, 32 Callable, 33 IO, 34 List, 35) 36 37from pyfakefs import helpers 38from pyfakefs.fake_file import ( 39 FakePipeWrapper, 40 FakeFileWrapper, 41 FakeFile, 42 AnyFileWrapper, 43) 44from pyfakefs.helpers import ( 45 AnyString, 46 is_called_from_skipped_module, 47 is_root, 48 PERM_READ, 49 PERM_WRITE, 50 _OpenModes, 51) 52 53if TYPE_CHECKING: 54 from pyfakefs.fake_filesystem import FakeFilesystem 55 56 57# Work around pyupgrade auto-rewriting `io.open()` to `open()`. 
# Direct reference to the real ``io.open``; ``fake_open`` passes calls
# through to it when the caller lives in a skipped module.
io_open = io.open

_OPEN_MODE_MAP = {
    # mode name:(file must exist, can read, can write,
    #            truncate, append, must not exist)
    "r": (True, True, False, False, False, False),
    "w": (False, False, True, True, False, False),
    "a": (False, False, True, False, True, False),
    "r+": (True, True, True, False, False, False),
    "w+": (False, True, True, True, False, False),
    "a+": (False, True, True, False, True, False),
    "x": (False, False, True, False, False, True),
    "x+": (False, True, True, False, False, True),
}


def fake_open(
    filesystem: "FakeFilesystem",
    skip_names: List[str],
    file: Union[AnyStr, int],
    mode: str = "r",
    buffering: int = -1,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
    closefd: bool = True,
    opener: Optional[Callable] = None,
) -> Union[AnyFileWrapper, IO[Any]]:
    """Redirect the call to FakeFileOpen.

    If the caller belongs to one of the modules in `skip_names`, the call is
    passed through unchanged to the real `io.open()`; otherwise a new
    `FakeFileOpen` bound to `filesystem` handles it.
    See FakeFileOpen.call() for a description of the remaining arguments.
    """
    if is_called_from_skipped_module(
        skip_names=skip_names,
        case_sensitive=filesystem.is_case_sensitive,
        check_open_code=sys.version_info >= (3, 12),
    ):
        return io_open(  # pytype: disable=wrong-arg-count
            file,
            mode,
            buffering,
            encoding,
            errors,
            newline,
            closefd,
            opener,
        )
    fake_file_open = FakeFileOpen(filesystem)
    return fake_file_open(
        file, mode, buffering, encoding, errors, newline, closefd, opener
    )


class FakeFileOpen:
    """Faked `file()` and `open()` function replacements.

    Returns FakeFile objects in a FakeFilesystem in place of the `file()`
    or `open()` function.
    """

    __name__ = "FakeFileOpen"

    def __init__(
        self,
        filesystem: "FakeFilesystem",
        delete_on_close: bool = False,
        raw_io: bool = False,
    ):
        """
        Args:
            filesystem: FakeFilesystem used to provide file system information
            delete_on_close: optional boolean, deletes file on close()
            raw_io: optional boolean; if set, the mode string is not
                translated into open modes, so `call()` has to be given
                `open_modes` explicitly. The flag is also forwarded to the
                created file wrapper.
        """
        self.filesystem = filesystem
        self._delete_on_close = delete_on_close
        self.raw_io = raw_io

    def __call__(self, *args: Any, **kwargs: Any) -> AnyFileWrapper:
        """Redirects calls to file() or open() to appropriate method."""
        return self.call(*args, **kwargs)

    def call(
        self,
        file_: Union[AnyStr, int],
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        closefd: bool = True,
        opener: Any = None,
        open_modes: Optional[_OpenModes] = None,
    ) -> AnyFileWrapper:
        """Return a file-like object with the contents of the target
        file object.

        Args:
            file_: Path to target file or a file descriptor.
            mode: Additional file modes (all modes in `open()` are supported).
            buffering: the buffer size used for writing. Data will only be
                flushed if buffer size is exceeded. The default (-1) uses a
                system specific default buffer size. Text line mode (e.g.
                buffering=1 in text mode) is not supported.
            encoding: The encoding used to encode unicode strings / decode
                bytes.
            errors: (str) Defines how encoding errors are handled.
            newline: Controls universal newlines, passed to stream object.
            closefd: If a file descriptor rather than file name is passed,
                and this is set to `False`, then the file descriptor is kept
                open when file is closed. Ignored (treated as `True`) if the
                file is opened by path.
            opener: an optional function object that will be called with
                `file_` and the open flags (derived from `mode`) and returns
                a file descriptor.
            open_modes: Modes for opening files if called from low-level API.

        Returns:
            A file-like object containing the contents of the target file.

        Raises:
            OSError depending on Python version / call mode:
                - if the target object is a directory
                - on an invalid path
                - if the file does not exist when it should
                - if the file exists but should not
                - if permission is denied
            ValueError: for an invalid mode or mode combination
        """
        binary = "b" in mode

        if binary and encoding:
            raise ValueError("binary mode doesn't take an encoding argument")

        newline, open_modes = self._handle_file_mode(mode, newline, open_modes)
        opened_as_fd = isinstance(file_, int)

        # the pathlib opener is defined in a Path instance that may not be
        # patched under some circumstances; as it just calls standard open(),
        # we may ignore it, as it would not change the behavior
        if opener is not None and opener.__module__ not in (
            "pathlib",
            "pathlib._local",
        ):
            # opener shall return a file descriptor, which will be handled
            # here as if directly passed
            file_ = opener(file_, self._open_flags_from_open_modes(open_modes))

        file_object, file_path, filedes, real_path, can_write = self._handle_file_arg(
            file_
        )
        if file_object is None and file_path is None:
            # file must be a fake pipe wrapper, find it...
            return self._open_pipe_by_fd(filedes, mode)

        assert file_path is not None
        # `closefd` is only meaningful for files opened by descriptor.
        # Compare against None explicitly: descriptor 0 is falsy but valid.
        if filedes is None:
            closefd = True

        if (
            not opener
            and open_modes.must_not_exist
            and (
                file_object
                or self.filesystem.islink(file_path)
                and not self.filesystem.is_windows_fs
            )
        ):
            self.filesystem.raise_os_error(errno.EEXIST, file_path)

        assert real_path is not None
        file_object = self._init_file_object(
            file_object,
            file_path,
            open_modes,
            real_path,
            check_file_permission=not opened_as_fd,
        )

        if S_ISDIR(file_object.st_mode):
            if self.filesystem.is_windows_fs:
                self.filesystem.raise_os_error(errno.EACCES, file_path)
            else:
                self.filesystem.raise_os_error(errno.EISDIR, file_path)

        # If you print obj.name, the argument to open() must be printed.
        # Not the abspath, not the filename, but the actual argument.
        file_object.opened_as = file_path
        if open_modes.truncate:
            current_time = helpers.now()
            file_object.st_mtime = current_time
            if not self.filesystem.is_windows_fs:
                file_object.st_ctime = current_time

        fakefile = FakeFileWrapper(
            file_object,
            file_path,
            update=open_modes.can_write and can_write,
            read=open_modes.can_read,
            append=open_modes.append,
            delete_on_close=self._delete_on_close,
            filesystem=self.filesystem,
            newline=newline,
            binary=binary,
            closefd=closefd,
            encoding=encoding,
            errors=errors,
            buffering=buffering,
            raw_io=self.raw_io,
            opened_as_fd=opened_as_fd,
        )
        if filedes is not None:
            fakefile.filedes = filedes
            # register the new wrapper under the already existing descriptor
            open_files_list = self.filesystem.open_files[filedes]
            assert open_files_list is not None
            open_files_list.append(fakefile)
        else:
            fakefile.filedes = self.filesystem.add_open_file(fakefile)
        return fakefile

    def _open_pipe_by_fd(self, filedes: Optional[int], mode: str) -> AnyFileWrapper:
        """Create a new pipe wrapper for the fake pipe open at `filedes`.

        The new wrapper reuses the `fd` and `can_write` attributes of the
        first wrapper registered for `filedes` and is itself registered as a
        new open file.

        Raises:
            OSError: with `errno.EBADF` if `filedes` is None, out of range,
                or has no open wrappers.
        """
        open_files = self.filesystem.open_files
        if filedes is None or len(open_files) <= filedes or not open_files[filedes]:
            raise OSError(errno.EBADF, "invalid pipe file descriptor")
        wrappers = open_files[filedes]
        assert wrappers is not None
        existing_wrapper = wrappers[0]
        assert isinstance(existing_wrapper, FakePipeWrapper)
        wrapper = FakePipeWrapper(
            self.filesystem,
            existing_wrapper.fd,
            existing_wrapper.can_write,
            mode,
        )
        wrapper.filedes = self.filesystem.add_open_file(wrapper)
        return wrapper

    @staticmethod
    def _open_flags_from_open_modes(open_modes: _OpenModes) -> int:
        """Return the `os.O_*` flag bitmask that corresponds to `open_modes`."""
        flags = 0
        if open_modes.can_read and open_modes.can_write:
            flags |= os.O_RDWR
        elif open_modes.can_read:
            flags |= os.O_RDONLY
        elif open_modes.can_write:
            flags |= os.O_WRONLY

        if open_modes.append:
            flags |= os.O_APPEND
        if open_modes.truncate:
            flags |= os.O_TRUNC
        if not open_modes.must_exist and open_modes.can_write:
            flags |= os.O_CREAT
        if open_modes.must_not_exist and open_modes.can_write:
            flags |= os.O_EXCL
        return flags

    def _init_file_object(
        self,
        file_object: Optional[FakeFile],
        file_path: AnyStr,
        open_modes: _OpenModes,
        real_path: AnyString,
        check_file_permission: bool,
    ) -> FakeFile:
        """Prepare the file object to be opened, creating it if needed.

        For an existing object, the read/write permissions required by
        `open_modes` are checked (unless `check_file_permission` is False or
        the user is root), and the contents are cleared in truncating modes.
        For a missing object, a new file is created at `real_path`.

        Errors are reported via `filesystem.raise_os_error` with:
            - `EACCES` if a needed permission is missing
            - `ENOENT` if the file is missing but the mode requires it
            - `EINVAL` (Windows), `ENOENT` (macOS) or `EISDIR` (otherwise)
              if the path of the file to create ends with a path separator
        """
        if file_object:
            if (
                check_file_permission
                and not is_root()
                and (
                    (open_modes.can_read and not file_object.has_permission(PERM_READ))
                    or (
                        open_modes.can_write
                        and not file_object.has_permission(PERM_WRITE)
                    )
                )
            ):
                self.filesystem.raise_os_error(errno.EACCES, file_path)
            if open_modes.can_write:
                if open_modes.truncate:
                    file_object.set_contents("")
        else:
            if open_modes.must_exist:
                self.filesystem.raise_os_error(errno.ENOENT, file_path)
            if self.filesystem.islink(file_path):
                # a dangling link: the trailing-separator check below has
                # to look at the link target, not at the link itself
                link_object = self.filesystem.resolve(file_path, follow_symlinks=False)
                assert link_object.contents is not None
                target_path = cast(
                    AnyStr, link_object.contents
                )  # pytype: disable=invalid-annotation
            else:
                target_path = file_path
            if self.filesystem.ends_with_path_separator(target_path):
                error = (
                    errno.EINVAL
                    if self.filesystem.is_windows_fs
                    else errno.ENOENT
                    if self.filesystem.is_macos
                    else errno.EISDIR
                )
                self.filesystem.raise_os_error(error, file_path)
            file_object = self.filesystem.create_file_internally(
                real_path, create_missing_dirs=False, apply_umask=True
            )
        return file_object

    def _handle_file_arg(
        self, file_: Union[AnyStr, int]
    ) -> Tuple[
        Optional[FakeFile], Optional[AnyStr], Optional[int], Optional[AnyStr], bool
    ]:
        """Resolve the `file_` argument of `call()`.

        Returns:
            A tuple `(file_object, file_path, filedes, real_path, can_write)`:
            - for a descriptor of a fake pipe: `(None, None, filedes, None,
              True)`
            - for any other descriptor: the object behind the descriptor, its
              name as both `file_path` and `real_path`, the descriptor, and
              whether the existing wrapper allows updates
            - for a path: the existing file object (or None if there is
              none), the path as passed, `None` as descriptor, the resolved
              path, and `True`

        Side effect: if the descriptor belongs to a `FakeFileWrapper`, its
        `delete_on_close` setting is taken over by this instance.
        """
        file_object = None
        if isinstance(file_, int):
            # opening a file descriptor
            filedes: int = file_
            wrapper = self.filesystem.get_open_file(filedes)
            can_write = True
            if isinstance(wrapper, FakePipeWrapper):
                return None, None, filedes, None, can_write
            if isinstance(wrapper, FakeFileWrapper):
                self._delete_on_close = wrapper.delete_on_close
                can_write = wrapper.allow_update

            # reuse the wrapper fetched above instead of a second lookup
            file_object = cast(FakeFile, wrapper.get_object())
            assert file_object is not None
            path = file_object.name
            return (  # pytype: disable=bad-return-type
                file_object,
                cast(AnyStr, path),  # pytype: disable=invalid-annotation
                filedes,
                cast(AnyStr, path),  # pytype: disable=invalid-annotation
                can_write,
            )

        # open a file by path
        file_path = cast(AnyStr, file_)  # pytype: disable=invalid-annotation
        if file_path == self.filesystem.dev_null.name:
            file_object = self.filesystem.dev_null
            real_path = file_path
        else:
            real_path = self.filesystem.resolve_path(file_path)
            if self.filesystem.exists(file_path):
                file_object = self.filesystem.get_object_from_normpath(
                    real_path, check_read_perm=False
                )
        return file_object, file_path, None, real_path, True

    def _handle_file_mode(
        self,
        mode: str,
        newline: Optional[str],
        open_modes: Optional[_OpenModes],
    ) -> Tuple[Optional[str], _OpenModes]:
        """Validate `mode` and derive the open modes from it.

        Unless `raw_io` is set, the passed `open_modes` are replaced by the
        entry of `_OPEN_MODE_MAP` for the normalized mode string; with
        `raw_io` set, the passed `open_modes` are returned unchanged.

        Returns:
            The tuple `(newline, open_modes)`; `newline` is passed through.

        Raises:
            ValueError: if both "b" and "t" are given, if "U" is given under
                Python 3.11 or newer, or if the mode string is not known.
        """
        orig_modes = mode  # Save original modes for error messages.
        # Normalize modes. Handle 't' and 'U'.
        # The 'U' flag was removed in Python 3.11 and is still accepted in
        # 3.10; note that `sys.version_info > (3, 10)` would already be true
        # for every 3.10.x release.
        if ("b" in mode and "t" in mode) or (
            sys.version_info >= (3, 11) and "U" in mode
        ):
            raise ValueError("Invalid mode: " + mode)
        mode = mode.replace("t", "").replace("b", "")
        mode = mode.replace("rU", "r").replace("U", "r")
        if not self.raw_io:
            if mode not in _OPEN_MODE_MAP:
                raise ValueError("Invalid mode: %r" % orig_modes)
            open_modes = _OpenModes(*_OPEN_MODE_MAP[mode])
        assert open_modes is not None
        return newline, open_modes