1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""A fake filesystem implementation for unit testing. 16 17:Includes: 18 * :py:class:`FakeFile`: Provides the appearance of a real file. 19 * :py:class:`FakeDirectory`: Provides the appearance of a real directory. 20 * :py:class:`FakeFilesystem`: Provides the appearance of a real directory 21 hierarchy. 22 * :py:class:`FakeOsModule`: Uses :py:class:`FakeFilesystem` to provide a 23 fake :py:mod:`os` module replacement. 24 * :py:class:`FakeIoModule`: Uses :py:class:`FakeFilesystem` to provide a 25 fake ``io`` module replacement. 26 * :py:class:`FakePathModule`: Faked ``os.path`` module replacement. 27 * :py:class:`FakeFileOpen`: Faked ``file()`` and ``open()`` function 28 replacements. 29 30:Usage: 31 32>>> from pyfakefs import fake_filesystem 33>>> filesystem = fake_filesystem.FakeFilesystem() 34>>> os_module = fake_filesystem.FakeOsModule(filesystem) 35>>> pathname = '/a/new/dir/new-file' 36 37Create a new file object, creating parent directory objects as needed: 38 39>>> os_module.path.exists(pathname) 40False 41>>> new_file = filesystem.create_file(pathname) 42 43File objects can't be overwritten: 44 45>>> os_module.path.exists(pathname) 46True 47>>> try: 48... filesystem.create_file(pathname) 49... except OSError as e: 50... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno 51... assert e.strerror == 'File exists in the fake filesystem' 52 53Remove a file object: 54 55>>> filesystem.remove_object(pathname) 56>>> os_module.path.exists(pathname) 57False 58 59Create a new file object at the previous path: 60 61>>> beatles_file = filesystem.create_file(pathname, 62... contents='Dear Prudence\\nWon\\'t you come out to play?\\n') 63>>> os_module.path.exists(pathname) 64True 65 66Use the FakeFileOpen class to read fake file objects: 67 68>>> file_module = fake_filesystem.FakeFileOpen(filesystem) 69>>> for line in file_module(pathname): 70... print(line.rstrip()) 71... 72Dear Prudence 73Won't you come out to play? 74 75File objects cannot be treated like directory objects: 76 77>>> try: 78... os_module.listdir(pathname) 79... except OSError as e: 80... assert e.errno == errno.ENOTDIR, 'unexpected errno: %d' % e.errno 81... assert e.strerror == 'Not a directory in the fake filesystem' 82 83The FakeOsModule can list fake directory objects: 84 85>>> os_module.listdir(os_module.path.dirname(pathname)) 86['new-file'] 87 88The FakeOsModule also supports stat operations: 89 90>>> import stat 91>>> stat.S_ISREG(os_module.stat(pathname).st_mode) 92True 93>>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode) 94True 95""" 96import errno 97import heapq 98import io 99import locale 100import os 101import random 102import sys 103import traceback 104import uuid 105from collections import namedtuple 106from doctest import TestResults 107from enum import Enum 108from stat import ( 109 S_IFREG, S_IFDIR, S_ISLNK, S_IFMT, S_ISDIR, S_IFLNK, S_ISREG, S_IFSOCK 110) 111from types import ModuleType, TracebackType 112from typing import ( 113 List, Optional, Callable, Union, Any, Dict, Tuple, cast, AnyStr, overload, 114 NoReturn, ClassVar, IO, Iterator, TextIO, Type 115) 116from pyfakefs.deprecator import Deprecator 117from pyfakefs.extra_packages import use_scandir 118from pyfakefs.fake_scandir import scandir, walk, ScanDirIter 119from pyfakefs.helpers import ( 120 FakeStatResult, BinaryBufferIO, TextBufferIO, 121 is_int_type, is_byte_string, is_unicode_string, make_string_path, 122 IS_PYPY, to_string, matching_string, real_encoding, now, AnyPath, to_bytes 123) 124from pyfakefs import __version__ # noqa: F401 for upwards compatibility 125 126PERM_READ = 0o400 # Read permission bit. 127PERM_WRITE = 0o200 # Write permission bit. 128PERM_EXE = 0o100 # Execute permission bit. 129PERM_DEF = 0o777 # Default permission bits. 130PERM_DEF_FILE = 0o666 # Default permission bits (regular file) 131PERM_ALL = 0o7777 # All permission bits. 132 133_OpenModes = namedtuple( 134 '_OpenModes', 135 'must_exist can_read can_write truncate append must_not_exist' 136) 137 138_OPEN_MODE_MAP = { 139 # mode name:(file must exist, can read, can write, 140 # truncate, append, must not exist) 141 'r': (True, True, False, False, False, False), 142 'w': (False, False, True, True, False, False), 143 'a': (False, False, True, False, True, False), 144 'r+': (True, True, True, False, False, False), 145 'w+': (False, True, True, True, False, False), 146 'a+': (False, True, True, False, True, False), 147 'x': (False, False, True, False, False, True), 148 'x+': (False, True, True, False, False, True) 149} 150 151AnyFileWrapper = Union[ 152 "FakeFileWrapper", "FakeDirWrapper", 153 "StandardStreamWrapper", "FakePipeWrapper" 154] 155 156AnyString = Union[str, bytes] 157 158AnyFile = Union["FakeFile", "FakeDirectory"] 159 160if sys.platform.startswith('linux'): 161 # on newer Linux system, the default maximum recursion depth is 40 162 # we ignore older systems here 163 _MAX_LINK_DEPTH = 40 164else: 165 # on MacOS and Windows, the maximum recursion depth is 32 166 _MAX_LINK_DEPTH = 32 167 168NR_STD_STREAMS = 3 169if sys.platform == 'win32': 170 USER_ID = 1 171 GROUP_ID = 1 172else: 173 USER_ID = os.getuid() 174 GROUP_ID = os.getgid() 175 176 177class OSType(Enum): 178 """Defines the real or simulated OS of the underlying file system.""" 179 LINUX = "linux" 180 MACOS = "macos" 181 WINDOWS = "windows" 182 183 184class PatchMode(Enum): 185 """Defines if patching shall be on, off, or in automatic mode. 186 Currently only used for `patch_open_code` option. 187 """ 188 OFF = 1 189 AUTO = 2 190 ON = 3 191 192 193def set_uid(uid: int) -> None: 194 """Set the global user id. This is used as st_uid for new files 195 and to differentiate between a normal user and the root user (uid 0). 196 For the root user, some permission restrictions are ignored. 197 198 Args: 199 uid: (int) the user ID of the user calling the file system functions. 200 """ 201 global USER_ID 202 USER_ID = uid 203 204 205def set_gid(gid: int) -> None: 206 """Set the global group id. This is only used to set st_gid for new files, 207 no permision checks are performed. 208 209 Args: 210 gid: (int) the group ID of the user calling the file system functions. 211 """ 212 global GROUP_ID 213 GROUP_ID = gid 214 215 216def reset_ids() -> None: 217 """Set the global user ID and group ID back to default values.""" 218 if sys.platform == 'win32': 219 set_uid(1) 220 set_gid(1) 221 else: 222 set_uid(os.getuid()) 223 set_gid(os.getgid()) 224 225 226def is_root() -> bool: 227 """Return True if the current user is the root user.""" 228 return USER_ID == 0 229 230 231class FakeLargeFileIoException(Exception): 232 """Exception thrown on unsupported operations for fake large files. 233 Fake large files have a size with no real content. 234 """ 235 236 def __init__(self, file_path: str) -> None: 237 super(FakeLargeFileIoException, self).__init__( 238 'Read and write operations not supported for ' 239 'fake large file: %s' % file_path) 240 241 242def _copy_module(old: ModuleType) -> ModuleType: 243 """Recompiles and creates new module object.""" 244 saved = sys.modules.pop(old.__name__, None) 245 new = __import__(old.__name__) 246 if saved is not None: 247 sys.modules[old.__name__] = saved 248 return new 249 250 251class FakeFile: 252 """Provides the appearance of a real file. 253 254 Attributes currently faked out: 255 * `st_mode`: user-specified, otherwise S_IFREG 256 * `st_ctime`: the time.time() timestamp of the file change time (updated 257 each time a file's attributes is modified). 258 * `st_atime`: the time.time() timestamp when the file was last accessed. 259 * `st_mtime`: the time.time() timestamp when the file was last modified. 260 * `st_size`: the size of the file 261 * `st_nlink`: the number of hard links to the file 262 * `st_ino`: the inode number - a unique number identifying the file 263 * `st_dev`: a unique number identifying the (fake) file system device 264 the file belongs to 265 * `st_uid`: always set to USER_ID, which can be changed globally using 266 `set_uid` 267 * `st_gid`: always set to GROUP_ID, which can be changed globally using 268 `set_gid` 269 270 .. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the 271 real file system depends on the used file system (for example it is 272 only 1s for HFS+ and older Linux file systems, but much higher for 273 ext4 and NTFS). This is currently ignored by pyfakefs, which uses 274 the resolution of `time.time()`. 275 276 Under Windows, `st_atime` is not updated for performance reasons by 277 default. pyfakefs never updates `st_atime` under Windows, assuming 278 the default setting. 279 """ 280 stat_types = ( 281 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 282 'st_size', 'st_atime', 'st_mtime', 'st_ctime', 283 'st_atime_ns', 'st_mtime_ns', 'st_ctime_ns' 284 ) 285 286 def __init__(self, name: AnyStr, 287 st_mode: int = S_IFREG | PERM_DEF_FILE, 288 contents: Optional[AnyStr] = None, 289 filesystem: Optional["FakeFilesystem"] = None, 290 encoding: Optional[str] = None, 291 errors: Optional[str] = None, 292 side_effect: Optional[Callable[["FakeFile"], None]] = None): 293 """ 294 Args: 295 name: Name of the file/directory, without parent path information 296 st_mode: The stat.S_IF* constant representing the file type (i.e. 297 stat.S_IFREG, stat.S_IFDIR), and the file permissions. 298 If no file type is set (e.g. permission flags only), a 299 regular file type is assumed. 300 contents: The contents of the filesystem object; should be a string 301 or byte object for regular files, and a dict of other 302 FakeFile or FakeDirectory objects wih the file names as 303 keys for FakeDirectory objects 304 filesystem: The fake filesystem where the file is created. 305 encoding: If contents is a unicode string, the encoding used 306 for serialization. 307 errors: The error mode used for encoding/decoding errors. 308 side_effect: function handle that is executed when file is written, 309 must accept the file object as an argument. 310 """ 311 # to be backwards compatible regarding argument order, we raise on None 312 if filesystem is None: 313 raise ValueError('filesystem shall not be None') 314 self.filesystem: FakeFilesystem = filesystem 315 self._side_effect: Optional[Callable] = side_effect 316 self.name: AnyStr = name # type: ignore[assignment] 317 self.stat_result = FakeStatResult( 318 filesystem.is_windows_fs, USER_ID, GROUP_ID, now()) 319 if st_mode >> 12 == 0: 320 st_mode |= S_IFREG 321 self.stat_result.st_mode = st_mode 322 self.st_size: int = 0 323 self.encoding: Optional[str] = real_encoding(encoding) 324 self.errors: str = errors or 'strict' 325 self._byte_contents: Optional[bytes] = self._encode_contents(contents) 326 self.stat_result.st_size = ( 327 len(self._byte_contents) if self._byte_contents is not None else 0) 328 self.epoch: int = 0 329 self.parent_dir: Optional[FakeDirectory] = None 330 # Linux specific: extended file system attributes 331 self.xattr: Dict = {} 332 self.opened_as: AnyString = '' 333 334 @property 335 def byte_contents(self) -> Optional[bytes]: 336 """Return the contents as raw byte array.""" 337 return self._byte_contents 338 339 @property 340 def contents(self) -> Optional[str]: 341 """Return the contents as string with the original encoding.""" 342 if isinstance(self.byte_contents, bytes): 343 return self.byte_contents.decode( 344 self.encoding or locale.getpreferredencoding(False), 345 errors=self.errors) 346 return None 347 348 @property 349 def st_ctime(self) -> float: 350 """Return the creation time of the fake file.""" 351 return self.stat_result.st_ctime 352 353 @st_ctime.setter 354 def st_ctime(self, val: float) -> None: 355 """Set the creation time of the fake file.""" 356 self.stat_result.st_ctime = val 357 358 @property 359 def st_atime(self) -> float: 360 """Return the access time of the fake file.""" 361 return self.stat_result.st_atime 362 363 @st_atime.setter 364 def st_atime(self, val: float) -> None: 365 """Set the access time of the fake file.""" 366 self.stat_result.st_atime = val 367 368 @property 369 def st_mtime(self) -> float: 370 """Return the modification time of the fake file.""" 371 return self.stat_result.st_mtime 372 373 @st_mtime.setter 374 def st_mtime(self, val: float) -> None: 375 """Set the modification time of the fake file.""" 376 self.stat_result.st_mtime = val 377 378 def set_large_file_size(self, st_size: int) -> None: 379 """Sets the self.st_size attribute and replaces self.content with None. 380 381 Provided specifically to simulate very large files without regards 382 to their content (which wouldn't fit in memory). 383 Note that read/write operations with such a file raise 384 :py:class:`FakeLargeFileIoException`. 385 386 Args: 387 st_size: (int) The desired file size 388 389 Raises: 390 OSError: if the st_size is not a non-negative integer, 391 or if st_size exceeds the available file system space 392 """ 393 self._check_positive_int(st_size) 394 if self.st_size: 395 self.size = 0 396 if self.filesystem: 397 self.filesystem.change_disk_usage(st_size, self.name, self.st_dev) 398 self.st_size = st_size 399 self._byte_contents = None 400 401 def _check_positive_int(self, size: int) -> None: 402 # the size should be an positive integer value 403 if not is_int_type(size) or size < 0: 404 self.filesystem.raise_os_error(errno.ENOSPC, self.name) 405 406 def is_large_file(self) -> bool: 407 """Return `True` if this file was initialized with size 408 but no contents. 409 """ 410 return self._byte_contents is None 411 412 def _encode_contents( 413 self, contents: Union[str, bytes, None]) -> Optional[bytes]: 414 if is_unicode_string(contents): 415 contents = bytes( 416 cast(str, contents), 417 self.encoding or locale.getpreferredencoding(False), 418 self.errors) 419 return cast(bytes, contents) 420 421 def set_initial_contents(self, contents: AnyStr) -> bool: 422 """Sets the file contents and size. 423 Called internally after initial file creation. 424 425 Args: 426 contents: string, new content of file. 427 428 Returns: 429 True if the contents have been changed. 430 431 Raises: 432 OSError: if the st_size is not a non-negative integer, 433 or if st_size exceeds the available file system space 434 """ 435 byte_contents = self._encode_contents(contents) 436 changed = self._byte_contents != byte_contents 437 st_size = len(byte_contents) if byte_contents else 0 438 439 current_size = self.st_size or 0 440 self.filesystem.change_disk_usage( 441 st_size - current_size, self.name, self.st_dev) 442 self._byte_contents = byte_contents 443 self.st_size = st_size 444 self.epoch += 1 445 return changed 446 447 def set_contents(self, contents: AnyStr, 448 encoding: Optional[str] = None) -> bool: 449 """Sets the file contents and size and increases the modification time. 450 Also executes the side_effects if available. 451 452 Args: 453 contents: (str, bytes) new content of file. 454 encoding: (str) the encoding to be used for writing the contents 455 if they are a unicode string. 456 If not given, the locale preferred encoding is used. 457 458 Returns: 459 True if the contents have been changed. 460 461 Raises: 462 OSError: if `st_size` is not a non-negative integer, 463 or if it exceeds the available file system space. 464 """ 465 self.encoding = real_encoding(encoding) 466 changed = self.set_initial_contents(contents) 467 if self._side_effect is not None: 468 self._side_effect(self) 469 return changed 470 471 @property 472 def size(self) -> int: 473 """Return the size in bytes of the file contents. 474 """ 475 return self.st_size 476 477 @size.setter 478 def size(self, st_size: int) -> None: 479 """Resizes file content, padding with nulls if new size exceeds the 480 old size. 481 482 Args: 483 st_size: The desired size for the file. 484 485 Raises: 486 OSError: if the st_size arg is not a non-negative integer 487 or if st_size exceeds the available file system space 488 """ 489 490 self._check_positive_int(st_size) 491 current_size = self.st_size or 0 492 self.filesystem.change_disk_usage( 493 st_size - current_size, self.name, self.st_dev) 494 if self._byte_contents: 495 if st_size < current_size: 496 self._byte_contents = self._byte_contents[:st_size] 497 else: 498 self._byte_contents += b'\0' * (st_size - current_size) 499 self.st_size = st_size 500 self.epoch += 1 501 502 @property 503 def path(self) -> AnyStr: 504 """Return the full path of the current object.""" 505 names: List[AnyStr] = [] 506 obj: Optional[FakeFile] = self 507 while obj: 508 names.insert( 509 0, matching_string(self.name, obj.name)) # type: ignore 510 obj = obj.parent_dir 511 sep = self.filesystem.get_path_separator(names[0]) 512 if names[0] == sep: 513 names.pop(0) 514 dir_path = sep.join(names) 515 drive = self.filesystem.splitdrive(dir_path)[0] 516 # if a Windows path already starts with a drive or UNC path, 517 # no extra separator is needed 518 if not drive: 519 dir_path = sep + dir_path 520 else: 521 dir_path = sep.join(names) 522 return self.filesystem.absnormpath(dir_path) 523 524 @Deprecator('property path') 525 def GetPath(self): 526 return self.path 527 528 @Deprecator('property size') 529 def GetSize(self): 530 return self.size 531 532 @Deprecator('property size') 533 def SetSize(self, value): 534 self.size = value 535 536 @Deprecator('property st_atime') 537 def SetATime(self, st_atime): 538 """Set the self.st_atime attribute. 539 540 Args: 541 st_atime: The desired access time. 542 """ 543 self.st_atime = st_atime 544 545 @Deprecator('property st_mtime') 546 def SetMTime(self, st_mtime): 547 """Set the self.st_mtime attribute. 548 549 Args: 550 st_mtime: The desired modification time. 551 """ 552 self.st_mtime = st_mtime 553 554 @Deprecator('property st_ctime') 555 def SetCTime(self, st_ctime): 556 """Set the self.st_ctime attribute. 557 558 Args: 559 st_ctime: The desired creation time. 560 """ 561 self.st_ctime = st_ctime 562 563 def __getattr__(self, item: str) -> Any: 564 """Forward some properties to stat_result.""" 565 if item in self.stat_types: 566 return getattr(self.stat_result, item) 567 return super().__getattribute__(item) 568 569 def __setattr__(self, key: str, value: Any) -> None: 570 """Forward some properties to stat_result.""" 571 if key in self.stat_types: 572 return setattr(self.stat_result, key, value) 573 return super().__setattr__(key, value) 574 575 def __str__(self) -> str: 576 return '%r(%o)' % (self.name, self.st_mode) 577 578 @Deprecator('st_ino') 579 def SetIno(self, st_ino): 580 """Set the self.st_ino attribute. 581 Note that a unique inode is assigned automatically to a new fake file. 582 This function does not guarantee uniqueness and should be used with 583 caution. 584 585 Args: 586 st_ino: (int) The desired inode. 587 """ 588 self.st_ino = st_ino 589 590 591class FakeNullFile(FakeFile): 592 def __init__(self, filesystem: "FakeFilesystem") -> None: 593 devnull = 'nul' if filesystem.is_windows_fs else '/dev/null' 594 super(FakeNullFile, self).__init__( 595 devnull, filesystem=filesystem, contents='') 596 597 @property 598 def byte_contents(self) -> bytes: 599 return b'' 600 601 def set_initial_contents(self, contents: AnyStr) -> bool: 602 return False 603 604 605Deprecator.add(FakeFile, FakeFile.set_large_file_size, 'SetLargeFileSize') 606Deprecator.add(FakeFile, FakeFile.set_contents, 'SetContents') 607Deprecator.add(FakeFile, FakeFile.is_large_file, 'IsLargeFile') 608 609 610class FakeFileFromRealFile(FakeFile): 611 """Represents a fake file copied from the real file system. 612 613 The contents of the file are read on demand only. 614 """ 615 616 def __init__(self, file_path: str, filesystem: "FakeFilesystem", 617 side_effect: Optional[Callable] = None) -> None: 618 """ 619 Args: 620 file_path: Path to the existing file. 621 filesystem: The fake filesystem where the file is created. 622 623 Raises: 624 OSError: if the file does not exist in the real file system. 625 OSError: if the file already exists in the fake file system. 626 """ 627 super().__init__( 628 name=os.path.basename(file_path), filesystem=filesystem, 629 side_effect=side_effect) 630 self.contents_read = False 631 632 @property 633 def byte_contents(self) -> Optional[bytes]: 634 if not self.contents_read: 635 self.contents_read = True 636 with io.open(self.file_path, 'rb') as f: 637 self._byte_contents = f.read() 638 # On MacOS and BSD, the above io.open() updates atime on the real file 639 self.st_atime = os.stat(self.file_path).st_atime 640 return self._byte_contents 641 642 def set_contents(self, contents, encoding=None): 643 self.contents_read = True 644 super(FakeFileFromRealFile, self).set_contents(contents, encoding) 645 646 def is_large_file(self): 647 """The contents are never faked.""" 648 return False 649 650 651class FakeDirectory(FakeFile): 652 """Provides the appearance of a real directory.""" 653 654 def __init__(self, name: str, perm_bits: int = PERM_DEF, 655 filesystem: Optional["FakeFilesystem"] = None): 656 """ 657 Args: 658 name: name of the file/directory, without parent path information 659 perm_bits: permission bits. defaults to 0o777. 660 filesystem: if set, the fake filesystem where the directory 661 is created 662 """ 663 FakeFile.__init__( 664 self, name, S_IFDIR | perm_bits, '', filesystem=filesystem) 665 # directories have the link count of contained entries, 666 # including '.' and '..' 667 self.st_nlink += 1 668 self._entries: Dict[str, AnyFile] = {} 669 670 def set_contents(self, contents: AnyStr, 671 encoding: Optional[str] = None) -> bool: 672 raise self.filesystem.raise_os_error(errno.EISDIR, self.path) 673 674 @property 675 def entries(self) -> Dict[str, FakeFile]: 676 """Return the list of contained directory entries.""" 677 return self._entries 678 679 @property 680 def ordered_dirs(self) -> List[str]: 681 """Return the list of contained directory entry names ordered by 682 creation order. 683 """ 684 return [item[0] for item in sorted( 685 self._entries.items(), key=lambda entry: entry[1].st_ino)] 686 687 def add_entry(self, path_object: FakeFile) -> None: 688 """Adds a child FakeFile to this directory. 689 690 Args: 691 path_object: FakeFile instance to add as a child of this directory. 692 693 Raises: 694 OSError: if the directory has no write permission (Posix only) 695 OSError: if the file or directory to be added already exists 696 """ 697 if (not is_root() and not self.st_mode & PERM_WRITE and 698 not self.filesystem.is_windows_fs): 699 raise OSError(errno.EACCES, 'Permission Denied', self.path) 700 701 path_object_name: str = to_string(path_object.name) 702 if path_object_name in self.entries: 703 self.filesystem.raise_os_error(errno.EEXIST, self.path) 704 705 self._entries[path_object_name] = path_object 706 path_object.parent_dir = self 707 if path_object.st_ino is None: 708 self.filesystem.last_ino += 1 709 path_object.st_ino = self.filesystem.last_ino 710 self.st_nlink += 1 711 path_object.st_nlink += 1 712 path_object.st_dev = self.st_dev 713 if path_object.st_nlink == 1: 714 self.filesystem.change_disk_usage( 715 path_object.size, path_object.name, self.st_dev) 716 717 def get_entry(self, pathname_name: str) -> AnyFile: 718 """Retrieves the specified child file or directory entry. 719 720 Args: 721 pathname_name: The basename of the child object to retrieve. 722 723 Returns: 724 The fake file or directory object. 725 726 Raises: 727 KeyError: if no child exists by the specified name. 728 """ 729 pathname_name = self._normalized_entryname(pathname_name) 730 return self.entries[to_string(pathname_name)] 731 732 def _normalized_entryname(self, pathname_name: str) -> str: 733 if not self.filesystem.is_case_sensitive: 734 matching_names = [name for name in self.entries 735 if name.lower() == pathname_name.lower()] 736 if matching_names: 737 pathname_name = matching_names[0] 738 return pathname_name 739 740 def remove_entry(self, pathname_name: str, recursive: bool = True) -> None: 741 """Removes the specified child file or directory. 742 743 Args: 744 pathname_name: Basename of the child object to remove. 745 recursive: If True (default), the entries in contained directories 746 are deleted first. Used to propagate removal errors 747 (e.g. permission problems) from contained entries. 748 749 Raises: 750 KeyError: if no child exists by the specified name. 751 OSError: if user lacks permission to delete the file, 752 or (Windows only) the file is open. 753 """ 754 pathname_name = self._normalized_entryname(pathname_name) 755 entry = self.get_entry(pathname_name) 756 if self.filesystem.is_windows_fs: 757 if entry.st_mode & PERM_WRITE == 0: 758 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 759 if self.filesystem.has_open_file(entry): 760 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 761 else: 762 if (not is_root() and (self.st_mode & (PERM_WRITE | PERM_EXE) != 763 PERM_WRITE | PERM_EXE)): 764 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 765 766 if recursive and isinstance(entry, FakeDirectory): 767 while entry.entries: 768 entry.remove_entry(list(entry.entries)[0]) 769 elif entry.st_nlink == 1: 770 self.filesystem.change_disk_usage( 771 -entry.size, pathname_name, entry.st_dev) 772 773 self.st_nlink -= 1 774 entry.st_nlink -= 1 775 assert entry.st_nlink >= 0 776 777 del self.entries[to_string(pathname_name)] 778 779 @property 780 def size(self) -> int: 781 """Return the total size of all files contained in this directory tree. 782 """ 783 return sum([item[1].size for item in self.entries.items()]) 784 785 @size.setter 786 def size(self, st_size: int) -> None: 787 """Setting the size is an error for a directory.""" 788 raise self.filesystem.raise_os_error(errno.EISDIR, self.path) 789 790 @Deprecator('property size') 791 def GetSize(self): 792 return self.size 793 794 def has_parent_object(self, dir_object: "FakeDirectory") -> bool: 795 """Return `True` if dir_object is a direct or indirect parent 796 directory, or if both are the same object.""" 797 obj: Optional[FakeDirectory] = self 798 while obj: 799 if obj == dir_object: 800 return True 801 obj = obj.parent_dir 802 return False 803 804 def __str__(self) -> str: 805 description = super(FakeDirectory, self).__str__() + ':\n' 806 for item in self.entries: 807 item_desc = self.entries[item].__str__() 808 for line in item_desc.split('\n'): 809 if line: 810 description = description + ' ' + line + '\n' 811 return description 812 813 814Deprecator.add(FakeDirectory, FakeDirectory.add_entry, 'AddEntry') 815Deprecator.add(FakeDirectory, FakeDirectory.get_entry, 'GetEntry') 816Deprecator.add(FakeDirectory, FakeDirectory.set_contents, 'SetContents') 817Deprecator.add(FakeDirectory, FakeDirectory.remove_entry, 'RemoveEntry') 818 819 820class FakeDirectoryFromRealDirectory(FakeDirectory): 821 """Represents a fake directory copied from the real file system. 822 823 The contents of the directory are read on demand only. 824 """ 825 826 def __init__(self, source_path: AnyPath, filesystem: "FakeFilesystem", 827 read_only: bool, target_path: Optional[AnyPath] = None): 828 """ 829 Args: 830 source_path: Full directory path. 831 filesystem: The fake filesystem where the directory is created. 832 read_only: If set, all files under the directory are treated 833 as read-only, e.g. a write access raises an exception; 834 otherwise, writing to the files changes the fake files 835 only as usually. 836 target_path: If given, the target path of the directory, 837 otherwise the target is the same as `source_path`. 838 839 Raises: 840 OSError: if the directory does not exist in the real file system 841 """ 842 target_path = target_path or source_path 843 real_stat = os.stat(source_path) 844 super(FakeDirectoryFromRealDirectory, self).__init__( 845 name=to_string(os.path.split(target_path)[1]), 846 perm_bits=real_stat.st_mode, 847 filesystem=filesystem) 848 849 self.st_ctime = real_stat.st_ctime 850 self.st_atime = real_stat.st_atime 851 self.st_mtime = real_stat.st_mtime 852 self.st_gid = real_stat.st_gid 853 self.st_uid = real_stat.st_uid 854 self.source_path = source_path # type: ignore 855 self.read_only = read_only 856 self.contents_read = False 857 858 @property 859 def entries(self) -> Dict[str, FakeFile]: 860 """Return the list of contained directory entries, loading them 861 if not already loaded.""" 862 if not self.contents_read: 863 self.contents_read = True 864 base = self.path 865 for entry in os.listdir(self.source_path): 866 source_path = os.path.join(self.source_path, entry) 867 target_path = os.path.join(base, entry) # type: ignore 868 if os.path.islink(source_path): 869 self.filesystem.add_real_symlink(source_path, target_path) 870 elif os.path.isdir(source_path): 871 self.filesystem.add_real_directory( 872 source_path, self.read_only, target_path=target_path) 873 else: 874 self.filesystem.add_real_file( 875 source_path, self.read_only, target_path=target_path) 876 return self._entries 877 878 @property 879 def size(self) -> int: 880 # we cannot get the size until the contents are loaded 881 if not self.contents_read: 882 return 0 883 return super(FakeDirectoryFromRealDirectory, self).size 884 885 @size.setter 886 def size(self, st_size: int) -> None: 887 raise self.filesystem.raise_os_error(errno.EISDIR, self.path) 888 889 890class FakeFilesystem: 891 """Provides the appearance of a real directory tree for unit testing. 892 893 Attributes: 894 path_separator: The path separator, corresponds to `os.path.sep`. 895 alternative_path_separator: Corresponds to `os.path.altsep`. 896 is_windows_fs: `True` in a real or faked Windows file system. 897 is_macos: `True` under MacOS, or if we are faking it. 898 is_case_sensitive: `True` if a case-sensitive file system is assumed. 899 root: The root :py:class:`FakeDirectory` entry of the file system. 900 cwd: The current working directory path. 901 umask: The umask used for newly created files, see `os.umask`. 902 patcher: Holds the Patcher object if created from it. Allows access 903 to the patcher object if using the pytest fs fixture. 904 """ 905 906 def __init__(self, path_separator: str = os.path.sep, 907 total_size: int = None, 908 patcher: Any = None) -> None: 909 """ 910 Args: 911 path_separator: optional substitute for os.path.sep 912 total_size: if not None, the total size in bytes of the 913 root filesystem. 914 915 Example usage to use the same path separator under all systems: 916 917 >>> filesystem = FakeFilesystem(path_separator='/') 918 919 """ 920 self.path_separator: str = path_separator 921 self.alternative_path_separator: Optional[str] = os.path.altsep 922 self.patcher = patcher 923 if path_separator != os.sep: 924 self.alternative_path_separator = None 925 926 # is_windows_fs can be used to test the behavior of pyfakefs under 927 # Windows fs on non-Windows systems and vice verse; 928 # is it used to support drive letters, UNC paths and some other 929 # Windows-specific features 930 self.is_windows_fs = sys.platform == 'win32' 931 932 # can be used to test some MacOS-specific behavior under other systems 933 self.is_macos = sys.platform == 'darwin' 934 935 # is_case_sensitive can be used to test pyfakefs for case-sensitive 936 # file systems on non-case-sensitive systems and vice verse 937 self.is_case_sensitive = not (self.is_windows_fs or self.is_macos) 938 939 self.root = FakeDirectory(self.path_separator, filesystem=self) 940 self.cwd = self.root.name 941 942 # We can't query the current value without changing it: 943 self.umask = os.umask(0o22) 944 os.umask(self.umask) 945 946 # A list of open file objects. Their position in the list is their 947 # file descriptor number 948 self.open_files: List[Optional[List[AnyFileWrapper]]] = [] 949 # A heap containing all free positions in self.open_files list 950 self._free_fd_heap: List[int] = [] 951 # last used numbers for inodes (st_ino) and devices (st_dev) 952 self.last_ino = 0 953 self.last_dev = 0 954 self.mount_points: Dict[AnyString, Dict] = {} 955 self.add_mount_point(self.root.name, total_size) 956 self._add_standard_streams() 957 self.dev_null = FakeNullFile(self) 958 # set from outside if needed 959 self.patch_open_code = PatchMode.OFF 960 self.shuffle_listdir_results = False 961 962 @property 963 def is_linux(self) -> bool: 964 return not self.is_windows_fs and not self.is_macos 965 966 @property 967 def os(self) -> OSType: 968 """Return the real or simulated type of operating system.""" 969 return (OSType.WINDOWS if self.is_windows_fs else 970 OSType.MACOS if self.is_macos else OSType.LINUX) 971 972 @os.setter 973 def os(self, value: OSType) -> None: 974 """Set the simulated type of operating system underlying the fake 975 file system.""" 976 self.is_windows_fs = value == OSType.WINDOWS 977 self.is_macos = value == OSType.MACOS 978 self.is_case_sensitive = value == OSType.LINUX 979 self.path_separator = '\\' if value == OSType.WINDOWS else '/' 980 self.alternative_path_separator = ('/' if value == OSType.WINDOWS 981 else None) 982 self.reset() 983 FakePathModule.reset(self) 984 985 def reset(self, total_size: Optional[int] = None): 986 """Remove all file system contents and reset the root.""" 987 self.root = FakeDirectory(self.path_separator, filesystem=self) 988 self.cwd = self.root.name 989 990 self.open_files = [] 991 self._free_fd_heap = [] 992 self.last_ino = 0 993 self.last_dev = 0 994 self.mount_points = {} 995 self.add_mount_point(self.root.name, total_size) 996 self._add_standard_streams() 997 from pyfakefs import fake_pathlib 998 fake_pathlib.init_module(self) 999 1000 def pause(self) -> None: 1001 """Pause the patching of the file system modules until `resume` is 1002 called. After that call, all file system calls are executed in the 1003 real file system. 1004 Calling pause() twice is silently ignored. 1005 Only allowed if the file system object was created by a 1006 Patcher object. This is also the case for the pytest `fs` fixture. 1007 1008 Raises: 1009 RuntimeError: if the file system was not created by a Patcher. 1010 """ 1011 if self.patcher is None: 1012 raise RuntimeError('pause() can only be called from a fake file ' 1013 'system object created by a Patcher object') 1014 self.patcher.pause() 1015 1016 def resume(self) -> None: 1017 """Resume the patching of the file system modules if `pause` has 1018 been called before. After that call, all file system calls are 1019 executed in the fake file system. 1020 Does nothing if patching is not paused. 1021 Raises: 1022 RuntimeError: if the file system has not been created by `Patcher`. 1023 """ 1024 if self.patcher is None: 1025 raise RuntimeError('resume() can only be called from a fake file ' 1026 'system object created by a Patcher object') 1027 self.patcher.resume() 1028 1029 def clear_cache(self) -> None: 1030 """Clear the cache of non-patched modules.""" 1031 if self.patcher: 1032 self.patcher.clear_cache() 1033 1034 def line_separator(self) -> str: 1035 return '\r\n' if self.is_windows_fs else '\n' 1036 1037 def _error_message(self, err_no: int) -> str: 1038 return os.strerror(err_no) + ' in the fake filesystem' 1039 1040 def raise_os_error(self, err_no: int, 1041 filename: Optional[AnyString] = None, 1042 winerror: Optional[int] = None) -> NoReturn: 1043 """Raises OSError. 1044 The error message is constructed from the given error code and shall 1045 start with the error string issued in the real system. 1046 Note: this is not true under Windows if winerror is given - in this 1047 case a localized message specific to winerror will be shown in the 1048 real file system. 1049 1050 Args: 1051 err_no: A numeric error code from the C variable errno. 1052 filename: The name of the affected file, if any. 1053 winerror: Windows only - the specific Windows error code. 1054 """ 1055 message = self._error_message(err_no) 1056 if (winerror is not None and sys.platform == 'win32' and 1057 self.is_windows_fs): 1058 raise OSError(err_no, message, filename, winerror) 1059 raise OSError(err_no, message, filename) 1060 1061 def get_path_separator(self, path: AnyStr) -> AnyStr: 1062 """Return the path separator as the same type as path""" 1063 return matching_string(path, self.path_separator) 1064 1065 def _alternative_path_separator( 1066 self, path: AnyStr) -> Optional[AnyStr]: 1067 """Return the alternative path separator as the same type as path""" 1068 return matching_string(path, self.alternative_path_separator) 1069 1070 def _starts_with_sep(self, path: AnyStr) -> bool: 1071 """Return True if path starts with a path separator.""" 1072 sep = self.get_path_separator(path) 1073 altsep = self._alternative_path_separator(path) 1074 return (path.startswith(sep) or altsep is not None and 1075 path.startswith(altsep)) 1076 1077 def add_mount_point(self, path: AnyStr, 1078 total_size: Optional[int] = None) -> Dict: 1079 """Add a new mount point for a filesystem device. 1080 The mount point gets a new unique device number. 1081 1082 Args: 1083 path: The root path for the new mount path. 1084 1085 total_size: The new total size of the added filesystem device 1086 in bytes. Defaults to infinite size. 1087 1088 Returns: 1089 The newly created mount point dict. 1090 1091 Raises: 1092 OSError: if trying to mount an existing mount point again. 1093 """ 1094 path = self.absnormpath(path) 1095 if path in self.mount_points: 1096 self.raise_os_error(errno.EEXIST, path) 1097 self.last_dev += 1 1098 self.mount_points[path] = { 1099 'idev': self.last_dev, 'total_size': total_size, 'used_size': 0 1100 } 1101 # special handling for root path: has been created before 1102 if path == self.root.name: 1103 root_dir = self.root 1104 self.last_ino += 1 1105 root_dir.st_ino = self.last_ino 1106 else: 1107 root_dir = self.create_dir(path) 1108 root_dir.st_dev = self.last_dev 1109 return self.mount_points[path] 1110 1111 def _auto_mount_drive_if_needed(self, path: AnyStr, 1112 force: bool = False) -> Optional[Dict]: 1113 if (self.is_windows_fs and 1114 (force or not self._mount_point_for_path(path))): 1115 drive = self.splitdrive(path)[0] 1116 if drive: 1117 return self.add_mount_point(path=drive) 1118 return None 1119 1120 def _mount_point_for_path(self, path: AnyStr) -> Dict: 1121 path = self.absnormpath(self._original_path(path)) 1122 for mount_path in self.mount_points: 1123 if path == matching_string(path, mount_path): 1124 return self.mount_points[mount_path] 1125 mount_path = matching_string(path, '') 1126 drive = self.splitdrive(path)[0] 1127 for root_path in self.mount_points: 1128 root_path = matching_string(path, root_path) 1129 if drive and not root_path.startswith(drive): 1130 continue 1131 if path.startswith(root_path) and len(root_path) > len(mount_path): 1132 mount_path = root_path 1133 if mount_path: 1134 return self.mount_points[to_string(mount_path)] 1135 mount_point = self._auto_mount_drive_if_needed(path, force=True) 1136 assert mount_point 1137 return mount_point 1138 1139 def _mount_point_for_device(self, idev: int) -> Optional[Dict]: 1140 for mount_point in self.mount_points.values(): 1141 if mount_point['idev'] == idev: 1142 return mount_point 1143 return None 1144 1145 def get_disk_usage( 1146 self, path: AnyStr = None) -> Tuple[int, int, int]: 1147 """Return the total, used and free disk space in bytes as named tuple, 1148 or placeholder values simulating unlimited space if not set. 1149 1150 .. note:: This matches the return value of shutil.disk_usage(). 1151 1152 Args: 1153 path: The disk space is returned for the file system device where 1154 `path` resides. 1155 Defaults to the root path (e.g. '/' on Unix systems). 1156 """ 1157 DiskUsage = namedtuple('DiskUsage', 'total, used, free') 1158 if path is None: 1159 mount_point = self.mount_points[self.root.name] 1160 else: 1161 mount_point = self._mount_point_for_path(path) 1162 if mount_point and mount_point['total_size'] is not None: 1163 return DiskUsage(mount_point['total_size'], 1164 mount_point['used_size'], 1165 mount_point['total_size'] - 1166 mount_point['used_size']) 1167 return DiskUsage( 1168 1024 * 1024 * 1024 * 1024, 0, 1024 * 1024 * 1024 * 1024) 1169 1170 def set_disk_usage( 1171 self, total_size: int, path: Optional[AnyStr] = None) -> None: 1172 """Changes the total size of the file system, preserving the 1173 used space. 1174 Example usage: set the size of an auto-mounted Windows drive. 1175 1176 Args: 1177 total_size: The new total size of the filesystem in bytes. 1178 1179 path: The disk space is changed for the file system device where 1180 `path` resides. 1181 Defaults to the root path (e.g. '/' on Unix systems). 1182 1183 Raises: 1184 OSError: if the new space is smaller than the used size. 1185 """ 1186 file_path: AnyStr = (path if path is not None # type: ignore 1187 else self.root.name) 1188 mount_point = self._mount_point_for_path(file_path) 1189 if (mount_point['total_size'] is not None and 1190 mount_point['used_size'] > total_size): 1191 self.raise_os_error(errno.ENOSPC, path) 1192 mount_point['total_size'] = total_size 1193 1194 def change_disk_usage(self, usage_change: int, 1195 file_path: AnyStr, st_dev: int) -> None: 1196 """Change the used disk space by the given amount. 1197 1198 Args: 1199 usage_change: Number of bytes added to the used space. 1200 If negative, the used space will be decreased. 1201 1202 file_path: The path of the object needing the disk space. 1203 1204 st_dev: The device ID for the respective file system. 1205 1206 Raises: 1207 OSError: if usage_change exceeds the free file system space 1208 """ 1209 mount_point = self._mount_point_for_device(st_dev) 1210 if mount_point: 1211 total_size = mount_point['total_size'] 1212 if total_size is not None: 1213 if total_size - mount_point['used_size'] < usage_change: 1214 self.raise_os_error(errno.ENOSPC, file_path) 1215 mount_point['used_size'] += usage_change 1216 1217 def stat(self, entry_path: AnyStr, 1218 follow_symlinks: bool = True): 1219 """Return the os.stat-like tuple for the FakeFile object of entry_path. 1220 1221 Args: 1222 entry_path: Path to filesystem object to retrieve. 1223 follow_symlinks: If False and entry_path points to a symlink, 1224 the link itself is inspected instead of the linked object. 1225 1226 Returns: 1227 The FakeStatResult object corresponding to entry_path. 1228 1229 Raises: 1230 OSError: if the filesystem object doesn't exist. 1231 """ 1232 # stat should return the tuple representing return value of os.stat 1233 try: 1234 file_object = self.resolve( 1235 entry_path, follow_symlinks, 1236 allow_fd=True, check_read_perm=False) 1237 except TypeError: 1238 file_object = self.resolve(entry_path) 1239 if not is_root(): 1240 # make sure stat raises if a parent dir is not readable 1241 parent_dir = file_object.parent_dir 1242 if parent_dir: 1243 self.get_object(parent_dir.path) # type: ignore[arg-type] 1244 1245 self.raise_for_filepath_ending_with_separator( 1246 entry_path, file_object, follow_symlinks) 1247 1248 return file_object.stat_result.copy() 1249 1250 def raise_for_filepath_ending_with_separator( 1251 self, entry_path: AnyStr, 1252 file_object: FakeFile, 1253 follow_symlinks: bool = True, 1254 macos_handling: bool = False) -> None: 1255 if self.ends_with_path_separator(entry_path): 1256 if S_ISLNK(file_object.st_mode): 1257 try: 1258 link_object = self.resolve(entry_path) 1259 except OSError as exc: 1260 if self.is_macos and exc.errno != errno.ENOENT: 1261 return 1262 if self.is_windows_fs: 1263 self.raise_os_error(errno.EINVAL, entry_path) 1264 raise 1265 if not follow_symlinks or self.is_windows_fs or self.is_macos: 1266 file_object = link_object 1267 if self.is_windows_fs: 1268 is_error = S_ISREG(file_object.st_mode) 1269 elif self.is_macos and macos_handling: 1270 is_error = not S_ISLNK(file_object.st_mode) 1271 else: 1272 is_error = not S_ISDIR(file_object.st_mode) 1273 if is_error: 1274 error_nr = (errno.EINVAL if self.is_windows_fs 1275 else errno.ENOTDIR) 1276 self.raise_os_error(error_nr, entry_path) 1277 1278 def chmod(self, path: AnyStr, mode: int, 1279 follow_symlinks: bool = True) -> None: 1280 """Change the permissions of a file as encoded in integer mode. 1281 1282 Args: 1283 path: (str) Path to the file. 1284 mode: (int) Permissions. 1285 follow_symlinks: If `False` and `path` points to a symlink, 1286 the link itself is affected instead of the linked object. 1287 """ 1288 file_object = self.resolve(path, follow_symlinks, allow_fd=True) 1289 if self.is_windows_fs: 1290 if mode & PERM_WRITE: 1291 file_object.st_mode = file_object.st_mode | 0o222 1292 else: 1293 file_object.st_mode = file_object.st_mode & 0o777555 1294 else: 1295 file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) | 1296 (mode & PERM_ALL)) 1297 file_object.st_ctime = now() 1298 1299 def utime(self, path: AnyStr, 1300 times: Optional[Tuple[Union[int, float], Union[int, float]]] = 1301 None, *, ns: Optional[Tuple[int, int]] = None, 1302 follow_symlinks: bool = True) -> None: 1303 """Change the access and modified times of a file. 1304 1305 Args: 1306 path: (str) Path to the file. 1307 times: 2-tuple of int or float numbers, of the form (atime, mtime) 1308 which is used to set the access and modified times in seconds. 1309 If None, both times are set to the current time. 1310 ns: 2-tuple of int numbers, of the form (atime, mtime) which is 1311 used to set the access and modified times in nanoseconds. 1312 If `None`, both times are set to the current time. 1313 follow_symlinks: If `False` and entry_path points to a symlink, 1314 the link itself is queried instead of the linked object. 1315 1316 Raises: 1317 TypeError: If anything other than the expected types is 1318 specified in the passed `times` or `ns` tuple, 1319 or if the tuple length is not equal to 2. 1320 ValueError: If both times and ns are specified. 1321 """ 1322 self._handle_utime_arg_errors(ns, times) 1323 1324 file_object = self.resolve(path, follow_symlinks, allow_fd=True) 1325 if times is not None: 1326 for file_time in times: 1327 if not isinstance(file_time, (int, float)): 1328 raise TypeError('atime and mtime must be numbers') 1329 1330 file_object.st_atime = times[0] 1331 file_object.st_mtime = times[1] 1332 elif ns is not None: 1333 for file_time in ns: 1334 if not isinstance(file_time, int): 1335 raise TypeError('atime and mtime must be ints') 1336 1337 file_object.st_atime_ns = ns[0] 1338 file_object.st_mtime_ns = ns[1] 1339 else: 1340 current_time = now() 1341 file_object.st_atime = current_time 1342 file_object.st_mtime = current_time 1343 1344 def _handle_utime_arg_errors( 1345 self, ns: Optional[Tuple[int, int]], 1346 times: Optional[Tuple[Union[int, float], Union[int, float]]]): 1347 if times is not None and ns is not None: 1348 raise ValueError( 1349 "utime: you may specify either 'times' or 'ns' but not both") 1350 if times is not None and len(times) != 2: 1351 raise TypeError( 1352 "utime: 'times' must be either a tuple of two ints or None") 1353 if ns is not None and len(ns) != 2: 1354 raise TypeError("utime: 'ns' must be a tuple of two ints") 1355 1356 @Deprecator 1357 def SetIno(self, path, st_ino): 1358 """Set the self.st_ino attribute of file at 'path'. 1359 Note that a unique inode is assigned automatically to a new fake file. 1360 Using this function does not guarantee uniqueness and should used 1361 with caution. 1362 1363 Args: 1364 path: Path to file. 1365 st_ino: The desired inode. 1366 """ 1367 self.get_object(path).st_ino = st_ino 1368 1369 def _add_open_file( 1370 self, 1371 file_obj: AnyFileWrapper) -> int: 1372 """Add file_obj to the list of open files on the filesystem. 1373 Used internally to manage open files. 1374 1375 The position in the open_files array is the file descriptor number. 1376 1377 Args: 1378 file_obj: File object to be added to open files list. 1379 1380 Returns: 1381 File descriptor number for the file object. 1382 """ 1383 if self._free_fd_heap: 1384 open_fd = heapq.heappop(self._free_fd_heap) 1385 self.open_files[open_fd] = [file_obj] 1386 return open_fd 1387 1388 self.open_files.append([file_obj]) 1389 return len(self.open_files) - 1 1390 1391 def _close_open_file(self, file_des: int) -> None: 1392 """Remove file object with given descriptor from the list 1393 of open files. 1394 1395 Sets the entry in open_files to None. 1396 1397 Args: 1398 file_des: Descriptor of file object to be removed from 1399 open files list. 1400 """ 1401 self.open_files[file_des] = None 1402 heapq.heappush(self._free_fd_heap, file_des) 1403 1404 def get_open_file(self, file_des: int) -> AnyFileWrapper: 1405 """Return an open file. 1406 1407 Args: 1408 file_des: File descriptor of the open file. 1409 1410 Raises: 1411 OSError: an invalid file descriptor. 1412 TypeError: filedes is not an integer. 1413 1414 Returns: 1415 Open file object. 1416 """ 1417 if not is_int_type(file_des): 1418 raise TypeError('an integer is required') 1419 valid = file_des < len(self.open_files) 1420 if valid: 1421 file_list = self.open_files[file_des] 1422 if file_list is not None: 1423 return file_list[0] 1424 self.raise_os_error(errno.EBADF, str(file_des)) 1425 1426 def has_open_file(self, file_object: FakeFile) -> bool: 1427 """Return True if the given file object is in the list of open files. 1428 1429 Args: 1430 file_object: The FakeFile object to be checked. 1431 1432 Returns: 1433 `True` if the file is open. 1434 """ 1435 return (file_object in [wrappers[0].get_object() 1436 for wrappers in self.open_files if wrappers]) 1437 1438 def _normalize_path_sep(self, path: AnyStr) -> AnyStr: 1439 alt_sep = self._alternative_path_separator(path) 1440 if alt_sep is not None: 1441 return path.replace(alt_sep, self.get_path_separator(path)) 1442 return path 1443 1444 def normcase(self, path: AnyStr) -> AnyStr: 1445 """Replace all appearances of alternative path separator 1446 with path separator. 1447 1448 Do nothing if no alternative separator is set. 1449 1450 Args: 1451 path: The path to be normalized. 1452 1453 Returns: 1454 The normalized path that will be used internally. 1455 """ 1456 file_path = make_string_path(path) 1457 return self._normalize_path_sep(file_path) 1458 1459 def normpath(self, path: AnyStr) -> AnyStr: 1460 """Mimic os.path.normpath using the specified path_separator. 1461 1462 Mimics os.path.normpath using the path_separator that was specified 1463 for this FakeFilesystem. Normalizes the path, but unlike the method 1464 absnormpath, does not make it absolute. Eliminates dot components 1465 (. and ..) and combines repeated path separators (//). Initial .. 1466 components are left in place for relative paths. 1467 If the result is an empty path, '.' is returned instead. 1468 1469 This also replaces alternative path separator with path separator. 1470 That is, it behaves like the real os.path.normpath on Windows if 1471 initialized with '\\' as path separator and '/' as alternative 1472 separator. 1473 1474 Args: 1475 path: (str) The path to normalize. 1476 1477 Returns: 1478 (str) A copy of path with empty components and dot components 1479 removed. 1480 """ 1481 path_str = self.normcase(path) 1482 drive, path_str = self.splitdrive(path_str) 1483 sep = self.get_path_separator(path_str) 1484 is_absolute_path = path_str.startswith(sep) 1485 path_components: List[AnyStr] = path_str.split(sep) 1486 collapsed_path_components: List[AnyStr] = [] 1487 dot = matching_string(path_str, '.') 1488 dotdot = matching_string(path_str, '..') 1489 for component in path_components: 1490 if (not component) or (component == dot): 1491 continue 1492 if component == dotdot: 1493 if collapsed_path_components and ( 1494 collapsed_path_components[-1] != dotdot): 1495 # Remove an up-reference: directory/.. 1496 collapsed_path_components.pop() 1497 continue 1498 elif is_absolute_path: 1499 # Ignore leading .. components if starting from the 1500 # root directory. 1501 continue 1502 collapsed_path_components.append(component) 1503 collapsed_path = sep.join(collapsed_path_components) 1504 if is_absolute_path: 1505 collapsed_path = sep + collapsed_path 1506 return drive + collapsed_path or dot 1507 1508 def _original_path(self, path: AnyStr) -> AnyStr: 1509 """Return a normalized case version of the given path for 1510 case-insensitive file systems. For case-sensitive file systems, 1511 return path unchanged. 1512 1513 Args: 1514 path: the file path to be transformed 1515 1516 Returns: 1517 A version of path matching the case of existing path elements. 1518 """ 1519 1520 def components_to_path(): 1521 if len(path_components) > len(normalized_components): 1522 normalized_components.extend( 1523 to_string(p) for p in path_components[len( 1524 normalized_components):]) 1525 sep = self.path_separator 1526 normalized_path = sep.join(normalized_components) 1527 if (self._starts_with_sep(path) 1528 and not self._starts_with_sep(normalized_path)): 1529 normalized_path = sep + normalized_path 1530 return normalized_path 1531 1532 if self.is_case_sensitive or not path: 1533 return path 1534 path_components = self._path_components(path) 1535 normalized_components = [] 1536 current_dir = self.root 1537 for component in path_components: 1538 if not isinstance(current_dir, FakeDirectory): 1539 return components_to_path() 1540 dir_name, directory = self._directory_content( 1541 current_dir, to_string(component)) 1542 if directory is None or ( 1543 isinstance(directory, FakeDirectory) and 1544 directory._byte_contents is None and 1545 directory.st_size == 0): 1546 return components_to_path() 1547 current_dir = cast(FakeDirectory, directory) 1548 normalized_components.append(dir_name) 1549 return components_to_path() 1550 1551 def absnormpath(self, path: AnyStr) -> AnyStr: 1552 """Absolutize and minimalize the given path. 1553 1554 Forces all relative paths to be absolute, and normalizes the path to 1555 eliminate dot and empty components. 1556 1557 Args: 1558 path: Path to normalize. 1559 1560 Returns: 1561 The normalized path relative to the current working directory, 1562 or the root directory if path is empty. 1563 """ 1564 path = self.normcase(path) 1565 cwd = matching_string(path, self.cwd) 1566 if not path: 1567 path = matching_string(path, self.path_separator) 1568 if path == matching_string(path, '.'): 1569 path = cwd 1570 elif not self._starts_with_root_path(path): 1571 # Prefix relative paths with cwd, if cwd is not root. 1572 root_name = matching_string(path, self.root.name) 1573 empty = matching_string(path, '') 1574 path = self.get_path_separator(path).join( 1575 (cwd != root_name and cwd or empty, path)) 1576 if path == matching_string(path, '.'): 1577 path = cwd 1578 return self.normpath(path) 1579 1580 def splitpath(self, path: AnyStr) -> Tuple[AnyStr, AnyStr]: 1581 """Mimic os.path.split using the specified path_separator. 1582 1583 Mimics os.path.split using the path_separator that was specified 1584 for this FakeFilesystem. 1585 1586 Args: 1587 path: (str) The path to split. 1588 1589 Returns: 1590 (str) A duple (pathname, basename) for which pathname does not 1591 end with a slash, and basename does not contain a slash. 1592 """ 1593 path = make_string_path(path) 1594 sep = self.get_path_separator(path) 1595 alt_sep = self._alternative_path_separator(path) 1596 seps = sep if alt_sep is None else sep + alt_sep 1597 drive, path = self.splitdrive(path) 1598 i = len(path) 1599 while i and path[i-1] not in seps: 1600 i -= 1 1601 head, tail = path[:i], path[i:] # now tail has no slashes 1602 # remove trailing slashes from head, unless it's all slashes 1603 head = head.rstrip(seps) or head 1604 return drive + head, tail 1605 1606 def splitdrive(self, path: AnyStr) -> Tuple[AnyStr, AnyStr]: 1607 """Splits the path into the drive part and the rest of the path. 1608 1609 Taken from Windows specific implementation in Python 3.5 1610 and slightly adapted. 1611 1612 Args: 1613 path: the full path to be splitpath. 1614 1615 Returns: 1616 A tuple of the drive part and the rest of the path, or of 1617 an empty string and the full path if drive letters are 1618 not supported or no drive is present. 1619 """ 1620 path_str = make_string_path(path) 1621 if self.is_windows_fs: 1622 if len(path_str) >= 2: 1623 norm_str = self.normcase(path_str) 1624 sep = self.get_path_separator(path_str) 1625 # UNC path_str handling 1626 if (norm_str[0:2] == sep * 2) and ( 1627 norm_str[2:3] != sep): 1628 # UNC path_str handling - splits off the mount point 1629 # instead of the drive 1630 sep_index = norm_str.find(sep, 2) 1631 if sep_index == -1: 1632 return path_str[:0], path_str 1633 sep_index2 = norm_str.find(sep, sep_index + 1) 1634 if sep_index2 == sep_index + 1: 1635 return path_str[:0], path_str 1636 if sep_index2 == -1: 1637 sep_index2 = len(path_str) 1638 return path_str[:sep_index2], path_str[sep_index2:] 1639 if path_str[1:2] == matching_string(path_str, ':'): 1640 return path_str[:2], path_str[2:] 1641 return path_str[:0], path_str 1642 1643 def _join_paths_with_drive_support( 1644 self, *all_paths: AnyStr) -> AnyStr: 1645 """Taken from Python 3.5 os.path.join() code in ntpath.py 1646 and slightly adapted""" 1647 base_path = all_paths[0] 1648 paths_to_add = all_paths[1:] 1649 sep = self.get_path_separator(base_path) 1650 seps = [sep, self._alternative_path_separator(base_path)] 1651 result_drive, result_path = self.splitdrive(base_path) 1652 for path in paths_to_add: 1653 drive_part, path_part = self.splitdrive(path) 1654 if path_part and path_part[:1] in seps: 1655 # Second path is absolute 1656 if drive_part or not result_drive: 1657 result_drive = drive_part 1658 result_path = path_part 1659 continue 1660 elif drive_part and drive_part != result_drive: 1661 if (self.is_case_sensitive or 1662 drive_part.lower() != result_drive.lower()): 1663 # Different drives => ignore the first path entirely 1664 result_drive = drive_part 1665 result_path = path_part 1666 continue 1667 # Same drive in different case 1668 result_drive = drive_part 1669 # Second path is relative to the first 1670 if result_path and result_path[-1:] not in seps: 1671 result_path = result_path + sep 1672 result_path = result_path + path_part 1673 # add separator between UNC and non-absolute path 1674 colon = matching_string(base_path, ':') 1675 if (result_path and result_path[:1] not in seps and 1676 result_drive and result_drive[-1:] != colon): 1677 return result_drive + sep + result_path 1678 return result_drive + result_path 1679 1680 def joinpaths(self, *paths: AnyStr) -> AnyStr: 1681 """Mimic os.path.join using the specified path_separator. 1682 1683 Args: 1684 *paths: (str) Zero or more paths to join. 1685 1686 Returns: 1687 (str) The paths joined by the path separator, starting with 1688 the last absolute path in paths. 1689 """ 1690 file_paths = [os.fspath(path) for path in paths] 1691 if len(file_paths) == 1: 1692 return paths[0] 1693 if self.is_windows_fs: 1694 return self._join_paths_with_drive_support(*file_paths) 1695 joined_path_segments = [] 1696 sep = self.get_path_separator(file_paths[0]) 1697 for path_segment in file_paths: 1698 if self._starts_with_root_path(path_segment): 1699 # An absolute path 1700 joined_path_segments = [path_segment] 1701 else: 1702 if (joined_path_segments and 1703 not joined_path_segments[-1].endswith(sep)): 1704 joined_path_segments.append(sep) 1705 if path_segment: 1706 joined_path_segments.append(path_segment) 1707 return matching_string(file_paths[0], '').join(joined_path_segments) 1708 1709 @overload 1710 def _path_components(self, path: str) -> List[str]: ... 1711 1712 @overload 1713 def _path_components(self, path: bytes) -> List[bytes]: ... 1714 1715 def _path_components(self, path: AnyStr) -> List[AnyStr]: 1716 """Breaks the path into a list of component names. 1717 1718 Does not include the root directory as a component, as all paths 1719 are considered relative to the root directory for the FakeFilesystem. 1720 Callers should basically follow this pattern: 1721 1722 .. code:: python 1723 1724 file_path = self.absnormpath(file_path) 1725 path_components = self._path_components(file_path) 1726 current_dir = self.root 1727 for component in path_components: 1728 if component not in current_dir.entries: 1729 raise OSError 1730 _do_stuff_with_component(current_dir, component) 1731 current_dir = current_dir.get_entry(component) 1732 1733 Args: 1734 path: Path to tokenize. 1735 1736 Returns: 1737 The list of names split from path. 1738 """ 1739 if not path or path == self.get_path_separator(path): 1740 return [] 1741 drive, path = self.splitdrive(path) 1742 path_components = path.split(self.get_path_separator(path)) 1743 assert drive or path_components 1744 if not path_components[0]: 1745 if len(path_components) > 1 and not path_components[1]: 1746 path_components = [] 1747 else: 1748 # This is an absolute path. 1749 path_components = path_components[1:] 1750 if drive: 1751 path_components.insert(0, drive) 1752 return path_components 1753 1754 def _starts_with_drive_letter(self, file_path: AnyStr) -> bool: 1755 """Return True if file_path starts with a drive letter. 1756 1757 Args: 1758 file_path: the full path to be examined. 1759 1760 Returns: 1761 `True` if drive letter support is enabled in the filesystem and 1762 the path starts with a drive letter. 1763 """ 1764 colon = matching_string(file_path, ':') 1765 if (len(file_path) >= 2 and 1766 file_path[:1].isalpha and file_path[1:2] == colon): 1767 if self.is_windows_fs: 1768 return True 1769 if os.name == 'nt': 1770 # special case if we are emulating Posix under Windows 1771 # check if the path exists because it has been mapped in 1772 # this is not foolproof, but handles most cases 1773 try: 1774 self.get_object_from_normpath(file_path) 1775 return True 1776 except OSError: 1777 return False 1778 return False 1779 1780 def _starts_with_root_path(self, file_path: AnyStr) -> bool: 1781 root_name = matching_string(file_path, self.root.name) 1782 file_path = self._normalize_path_sep(file_path) 1783 return (file_path.startswith(root_name) or 1784 not self.is_case_sensitive and file_path.lower().startswith( 1785 root_name.lower()) or 1786 self._starts_with_drive_letter(file_path)) 1787 1788 def _is_root_path(self, file_path: AnyStr) -> bool: 1789 root_name = matching_string(file_path, self.root.name) 1790 return (file_path == root_name or not self.is_case_sensitive and 1791 file_path.lower() == root_name.lower() or 1792 2 <= len(file_path) <= 3 and 1793 self._starts_with_drive_letter(file_path)) 1794 1795 def ends_with_path_separator(self, path: Union[int, AnyPath]) -> bool: 1796 """Return True if ``file_path`` ends with a valid path separator.""" 1797 if isinstance(path, int): 1798 return False 1799 file_path = make_string_path(path) 1800 if not file_path: 1801 return False 1802 sep = self.get_path_separator(file_path) 1803 altsep = self._alternative_path_separator(file_path) 1804 return (file_path not in (sep, altsep) and 1805 (file_path.endswith(sep) or 1806 altsep is not None and file_path.endswith(altsep))) 1807 1808 def is_filepath_ending_with_separator(self, path: AnyStr) -> bool: 1809 if not self.ends_with_path_separator(path): 1810 return False 1811 return self.isfile(self._path_without_trailing_separators(path)) 1812 1813 def _directory_content(self, directory: FakeDirectory, 1814 component: str) -> Tuple[Optional[str], 1815 Optional[AnyFile]]: 1816 if not isinstance(directory, FakeDirectory): 1817 return None, None 1818 if component in directory.entries: 1819 return component, directory.entries[component] 1820 if not self.is_case_sensitive: 1821 matching_content = [(subdir, directory.entries[subdir]) for 1822 subdir in directory.entries 1823 if subdir.lower() == component.lower()] 1824 if matching_content: 1825 return matching_content[0] 1826 1827 return None, None 1828 1829 def exists(self, file_path: AnyPath, check_link: bool = False) -> bool: 1830 """Return true if a path points to an existing file system object. 1831 1832 Args: 1833 file_path: The path to examine. 1834 check_link: If True, links are not followed 1835 1836 Returns: 1837 (bool) True if the corresponding object exists. 1838 1839 Raises: 1840 TypeError: if file_path is None. 1841 """ 1842 if check_link and self.islink(file_path): 1843 return True 1844 path = to_string(make_string_path(file_path)) 1845 if path is None: 1846 raise TypeError 1847 if not path: 1848 return False 1849 if path == self.dev_null.name: 1850 return not self.is_windows_fs or sys.version_info >= (3, 8) 1851 try: 1852 if self.is_filepath_ending_with_separator(path): 1853 return False 1854 path = self.resolve_path(path) 1855 except OSError: 1856 return False 1857 if path == self.root.name: 1858 return True 1859 1860 path_components: List[str] = self._path_components(path) 1861 current_dir = self.root 1862 for component in path_components: 1863 directory = self._directory_content( 1864 current_dir, to_string(component))[1] 1865 if directory is None: 1866 return False 1867 current_dir = cast(FakeDirectory, directory) 1868 return True 1869 1870 def resolve_path(self, 1871 file_path: AnyStr, allow_fd: bool = False) -> AnyStr: 1872 """Follow a path, resolving symlinks. 1873 1874 ResolvePath traverses the filesystem along the specified file path, 1875 resolving file names and symbolic links until all elements of the path 1876 are exhausted, or we reach a file which does not exist. 1877 If all the elements are not consumed, they just get appended to the 1878 path resolved so far. 1879 This gives us the path which is as resolved as it can be, even if the 1880 file does not exist. 1881 1882 This behavior mimics Unix semantics, and is best shown by example. 1883 Given a file system that looks like this: 1884 1885 /a/b/ 1886 /a/b/c -> /a/b2 c is a symlink to /a/b2 1887 /a/b2/x 1888 /a/c -> ../d 1889 /a/x -> y 1890 1891 Then: 1892 /a/b/x => /a/b/x 1893 /a/c => /a/d 1894 /a/x => /a/y 1895 /a/b/c/d/e => /a/b2/d/e 1896 1897 Args: 1898 file_path: The path to examine. 1899 allow_fd: If `True`, `file_path` may be open file descriptor. 1900 1901 Returns: 1902 The resolved_path (str or byte). 1903 1904 Raises: 1905 TypeError: if `file_path` is `None`. 1906 OSError: if `file_path` is '' or a part of the path doesn't exist. 1907 """ 1908 1909 if allow_fd and isinstance(file_path, int): 1910 return self.get_open_file(file_path).get_object().path 1911 path = make_string_path(file_path) 1912 if path is None: 1913 # file.open(None) raises TypeError, so mimic that. 1914 raise TypeError('Expected file system path string, received None') 1915 if not path or not self._valid_relative_path(path): 1916 # file.open('') raises OSError, so mimic that, and validate that 1917 # all parts of a relative path exist. 1918 self.raise_os_error(errno.ENOENT, path) 1919 path = self.absnormpath(self._original_path(path)) 1920 if self._is_root_path(path): 1921 return path 1922 if path == matching_string(path, self.dev_null.name): 1923 return path 1924 path_components = self._path_components(path) 1925 resolved_components = self._resolve_components(path_components) 1926 return self._components_to_path(resolved_components) 1927 1928 def _components_to_path(self, component_folders): 1929 sep = (self.get_path_separator(component_folders[0]) 1930 if component_folders else self.path_separator) 1931 path = sep.join(component_folders) 1932 if not self._starts_with_root_path(path): 1933 path = sep + path 1934 return path 1935 1936 def _resolve_components(self, components: List[AnyStr]) -> List[str]: 1937 current_dir = self.root 1938 link_depth = 0 1939 path_components = [to_string(comp) for comp in components] 1940 resolved_components: List[str] = [] 1941 while path_components: 1942 component = path_components.pop(0) 1943 resolved_components.append(component) 1944 directory = self._directory_content(current_dir, component)[1] 1945 if directory is None: 1946 # The component of the path at this point does not actually 1947 # exist in the folder. We can't resolve the path any more. 1948 # It is legal to link to a file that does not yet exist, so 1949 # rather than raise an error, we just append the remaining 1950 # components to what return path we have built so far and 1951 # return that. 1952 resolved_components.extend(path_components) 1953 break 1954 # Resolve any possible symlinks in the current path component. 1955 elif S_ISLNK(directory.st_mode): 1956 # This link_depth check is not really meant to be an accurate 1957 # check. It is just a quick hack to prevent us from looping 1958 # forever on cycles. 1959 if link_depth > _MAX_LINK_DEPTH: 1960 self.raise_os_error(errno.ELOOP, 1961 self._components_to_path( 1962 resolved_components)) 1963 link_path = self._follow_link(resolved_components, directory) 1964 1965 # Following the link might result in the complete replacement 1966 # of the current_dir, so we evaluate the entire resulting path. 1967 target_components = self._path_components(link_path) 1968 path_components = target_components + path_components 1969 resolved_components = [] 1970 current_dir = self.root 1971 link_depth += 1 1972 else: 1973 current_dir = cast(FakeDirectory, directory) 1974 return resolved_components 1975 1976 def _valid_relative_path(self, file_path: AnyStr) -> bool: 1977 if self.is_windows_fs: 1978 return True 1979 slash_dotdot = matching_string( 1980 file_path, self.path_separator + '..') 1981 while file_path and slash_dotdot in file_path: 1982 file_path = file_path[:file_path.rfind(slash_dotdot)] 1983 if not self.exists(self.absnormpath(file_path)): 1984 return False 1985 return True 1986 1987 def _follow_link(self, link_path_components: List[str], 1988 link: AnyFile) -> str: 1989 """Follow a link w.r.t. a path resolved so far. 1990 1991 The component is either a real file, which is a no-op, or a 1992 symlink. In the case of a symlink, we have to modify the path 1993 as built up so far 1994 /a/b => ../c should yield /a/../c (which will normalize to /a/c) 1995 /a/b => x should yield /a/x 1996 /a/b => /x/y/z should yield /x/y/z 1997 The modified path may land us in a new spot which is itself a 1998 link, so we may repeat the process. 1999 2000 Args: 2001 link_path_components: The resolved path built up to the link 2002 so far. 2003 link: The link object itself. 2004 2005 Returns: 2006 (string) The updated path resolved after following the link. 2007 2008 Raises: 2009 OSError: if there are too many levels of symbolic link 2010 """ 2011 link_path = link.contents 2012 if link_path is not None: 2013 # ignore UNC prefix for local files 2014 if self.is_windows_fs and link_path.startswith('\\\\?\\'): 2015 link_path = link_path[4:] 2016 sep = self.get_path_separator(link_path) 2017 # For links to absolute paths, we want to throw out everything 2018 # in the path built so far and replace with the link. For relative 2019 # links, we have to append the link to what we have so far, 2020 if not self._starts_with_root_path(link_path): 2021 # Relative path. Append remainder of path to what we have 2022 # processed so far, excluding the name of the link itself. 2023 # /a/b => ../c should yield /a/../c 2024 # (which will normalize to /c) 2025 # /a/b => d should yield a/d 2026 components = link_path_components[:-1] 2027 components.append(link_path) 2028 link_path = sep.join(components) 2029 # Don't call self.NormalizePath(), as we don't want to prepend 2030 # self.cwd. 2031 return self.normpath(link_path) 2032 raise ValueError("Invalid link") 2033 2034 def get_object_from_normpath(self, 2035 file_path: AnyPath, 2036 check_read_perm: bool = True) -> AnyFile: 2037 """Search for the specified filesystem object within the fake 2038 filesystem. 2039 2040 Args: 2041 file_path: Specifies target FakeFile object to retrieve, with a 2042 path that has already been normalized/resolved. 2043 check_read_perm: If True, raises OSError if a parent directory 2044 does not have read permission 2045 2046 Returns: 2047 The FakeFile object corresponding to file_path. 2048 2049 Raises: 2050 OSError: if the object is not found. 2051 """ 2052 path = make_string_path(file_path) 2053 if path == matching_string(path, self.root.name): 2054 return self.root 2055 if path == matching_string(path, self.dev_null.name): 2056 return self.dev_null 2057 2058 path = self._original_path(path) 2059 path_components = self._path_components(path) 2060 target = self.root 2061 try: 2062 for component in path_components: 2063 if S_ISLNK(target.st_mode): 2064 if target.contents: 2065 target = cast(FakeDirectory, 2066 self.resolve(target.contents)) 2067 if not S_ISDIR(target.st_mode): 2068 if not self.is_windows_fs: 2069 self.raise_os_error(errno.ENOTDIR, path) 2070 self.raise_os_error(errno.ENOENT, path) 2071 target = target.get_entry(component) # type: ignore 2072 if (not is_root() and check_read_perm and target and 2073 not target.st_mode & PERM_READ): 2074 self.raise_os_error(errno.EACCES, target.path) 2075 except KeyError: 2076 self.raise_os_error(errno.ENOENT, path) 2077 return target 2078 2079 def get_object(self, file_path: AnyPath, 2080 check_read_perm: bool = True) -> FakeFile: 2081 """Search for the specified filesystem object within the fake 2082 filesystem. 2083 2084 Args: 2085 file_path: Specifies the target FakeFile object to retrieve. 2086 check_read_perm: If True, raises OSError if a parent directory 2087 does not have read permission 2088 2089 Returns: 2090 The FakeFile object corresponding to `file_path`. 2091 2092 Raises: 2093 OSError: if the object is not found. 2094 """ 2095 path = make_string_path(file_path) 2096 path = self.absnormpath(self._original_path(path)) 2097 return self.get_object_from_normpath(path, check_read_perm) 2098 2099 def resolve(self, file_path: AnyStr, 2100 follow_symlinks: bool = True, 2101 allow_fd: bool = False, 2102 check_read_perm: bool = True) -> FakeFile: 2103 """Search for the specified filesystem object, resolving all links. 2104 2105 Args: 2106 file_path: Specifies the target FakeFile object to retrieve. 2107 follow_symlinks: If `False`, the link itself is resolved, 2108 otherwise the object linked to. 2109 allow_fd: If `True`, `file_path` may be an open file descriptor 2110 check_read_perm: If True, raises OSError if a parent directory 2111 does not have read permission 2112 2113 Returns: 2114 The FakeFile object corresponding to `file_path`. 2115 2116 Raises: 2117 OSError: if the object is not found. 2118 """ 2119 if isinstance(file_path, int): 2120 if allow_fd: 2121 return self.get_open_file(file_path).get_object() 2122 raise TypeError('path should be string, bytes or ' 2123 'os.PathLike, not int') 2124 2125 if follow_symlinks: 2126 return self.get_object_from_normpath(self.resolve_path( 2127 file_path, check_read_perm), check_read_perm) 2128 return self.lresolve(file_path) 2129 2130 def lresolve(self, path: AnyPath) -> FakeFile: 2131 """Search for the specified object, resolving only parent links. 2132 2133 This is analogous to the stat/lstat difference. This resolves links 2134 *to* the object but not of the final object itself. 2135 2136 Args: 2137 path: Specifies target FakeFile object to retrieve. 2138 2139 Returns: 2140 The FakeFile object corresponding to path. 2141 2142 Raises: 2143 OSError: if the object is not found. 2144 """ 2145 path_str = make_string_path(path) 2146 if not path_str: 2147 raise OSError(errno.ENOENT, path_str) 2148 if path_str == matching_string(path_str, self.root.name): 2149 # The root directory will never be a link 2150 return self.root 2151 2152 # remove trailing separator 2153 path_str = self._path_without_trailing_separators(path_str) 2154 if path_str == matching_string(path_str, '.'): 2155 path_str = matching_string(path_str, self.cwd) 2156 path_str = self._original_path(path_str) 2157 2158 parent_directory, child_name = self.splitpath(path_str) 2159 if not parent_directory: 2160 parent_directory = matching_string(path_str, self.cwd) 2161 try: 2162 parent_obj = self.resolve(parent_directory) 2163 assert parent_obj 2164 if not isinstance(parent_obj, FakeDirectory): 2165 if not self.is_windows_fs and isinstance(parent_obj, FakeFile): 2166 self.raise_os_error(errno.ENOTDIR, path_str) 2167 self.raise_os_error(errno.ENOENT, path_str) 2168 if not parent_obj.st_mode & PERM_READ: 2169 self.raise_os_error(errno.EACCES, parent_directory) 2170 return (parent_obj.get_entry(to_string(child_name)) if child_name 2171 else parent_obj) 2172 except KeyError: 2173 self.raise_os_error(errno.ENOENT, path_str) 2174 2175 def add_object(self, file_path: AnyStr, file_object: AnyFile) -> None: 2176 """Add a fake file or directory into the filesystem at file_path. 2177 2178 Args: 2179 file_path: The path to the file to be added relative to self. 2180 file_object: File or directory to add. 2181 2182 Raises: 2183 OSError: if file_path does not correspond to a 2184 directory. 2185 """ 2186 if not file_path: 2187 target_directory = self.root 2188 else: 2189 target_directory = cast(FakeDirectory, self.resolve(file_path)) 2190 if not S_ISDIR(target_directory.st_mode): 2191 error = errno.ENOENT if self.is_windows_fs else errno.ENOTDIR 2192 self.raise_os_error(error, file_path) 2193 target_directory.add_entry(file_object) 2194 2195 def rename(self, old_file_path: AnyPath, 2196 new_file_path: AnyPath, 2197 force_replace: bool = False) -> None: 2198 """Renames a FakeFile object at old_file_path to new_file_path, 2199 preserving all properties. 2200 2201 Args: 2202 old_file_path: Path to filesystem object to rename. 2203 new_file_path: Path to where the filesystem object will live 2204 after this call. 2205 force_replace: If set and destination is an existing file, it 2206 will be replaced even under Windows if the user has 2207 permissions, otherwise replacement happens under Unix only. 2208 2209 Raises: 2210 OSError: if old_file_path does not exist. 2211 OSError: if new_file_path is an existing directory 2212 (Windows, or Posix if old_file_path points to a regular file) 2213 OSError: if old_file_path is a directory and new_file_path a file 2214 OSError: if new_file_path is an existing file and force_replace 2215 not set (Windows only). 2216 OSError: if new_file_path is an existing file and could not be 2217 removed (Posix, or Windows with force_replace set). 2218 OSError: if dirname(new_file_path) does not exist. 2219 OSError: if the file would be moved to another filesystem 2220 (e.g. mount point). 2221 """ 2222 old_path = make_string_path(old_file_path) 2223 new_path = make_string_path(new_file_path) 2224 ends_with_sep = self.ends_with_path_separator(old_path) 2225 old_path = self.absnormpath(old_path) 2226 new_path = self.absnormpath(new_path) 2227 if not self.exists(old_path, check_link=True): 2228 self.raise_os_error(errno.ENOENT, old_path, 2) 2229 if ends_with_sep: 2230 self._handle_broken_link_with_trailing_sep(old_path) 2231 2232 old_object = self.lresolve(old_path) 2233 if not self.is_windows_fs: 2234 self._handle_posix_dir_link_errors( 2235 new_path, old_path, ends_with_sep) 2236 2237 if self.exists(new_path, check_link=True): 2238 renamed_path = self._rename_to_existing_path( 2239 force_replace, new_path, old_path, 2240 old_object, ends_with_sep) 2241 2242 if renamed_path is None: 2243 return 2244 else: 2245 new_path = renamed_path 2246 2247 old_dir, old_name = self.splitpath(old_path) 2248 new_dir, new_name = self.splitpath(new_path) 2249 if not self.exists(new_dir): 2250 self.raise_os_error(errno.ENOENT, new_dir) 2251 old_dir_object = self.resolve(old_dir) 2252 new_dir_object = self.resolve(new_dir) 2253 if old_dir_object.st_dev != new_dir_object.st_dev: 2254 self.raise_os_error(errno.EXDEV, old_path) 2255 if not S_ISDIR(new_dir_object.st_mode): 2256 self.raise_os_error( 2257 errno.EACCES if self.is_windows_fs else errno.ENOTDIR, 2258 new_path) 2259 if new_dir_object.has_parent_object(old_object): 2260 self.raise_os_error(errno.EINVAL, new_path) 2261 2262 self._do_rename(old_dir_object, old_name, new_dir_object, new_name) 2263 2264 def _do_rename(self, old_dir_object, old_name, new_dir_object, new_name): 2265 object_to_rename = old_dir_object.get_entry(old_name) 2266 old_dir_object.remove_entry(old_name, recursive=False) 2267 object_to_rename.name = new_name 2268 new_name = new_dir_object._normalized_entryname(new_name) 2269 old_entry = (new_dir_object.get_entry(new_name) 2270 if new_name in new_dir_object.entries else None) 2271 try: 2272 if old_entry: 2273 # in case of overwriting remove the old entry first 2274 new_dir_object.remove_entry(new_name) 2275 new_dir_object.add_entry(object_to_rename) 2276 except OSError: 2277 # adding failed, roll back the changes before re-raising 2278 if old_entry and new_name not in new_dir_object.entries: 2279 new_dir_object.add_entry(old_entry) 2280 object_to_rename.name = old_name 2281 old_dir_object.add_entry(object_to_rename) 2282 raise 2283 2284 def _handle_broken_link_with_trailing_sep(self, path: AnyStr) -> None: 2285 # note that the check for trailing sep has to be done earlier 2286 if self.islink(path): 2287 if not self.exists(path): 2288 error = (errno.ENOENT if self.is_macos else 2289 errno.EINVAL if self.is_windows_fs else errno.ENOTDIR) 2290 self.raise_os_error(error, path) 2291 2292 def _handle_posix_dir_link_errors(self, new_file_path: AnyStr, 2293 old_file_path: AnyStr, 2294 ends_with_sep: bool) -> None: 2295 if (self.isdir(old_file_path, follow_symlinks=False) and 2296 self.islink(new_file_path)): 2297 self.raise_os_error(errno.ENOTDIR, new_file_path) 2298 if (self.isdir(new_file_path, follow_symlinks=False) and 2299 self.islink(old_file_path)): 2300 if ends_with_sep and self.is_macos: 2301 return 2302 error = errno.ENOTDIR if ends_with_sep else errno.EISDIR 2303 self.raise_os_error(error, new_file_path) 2304 if (ends_with_sep and self.islink(old_file_path) and 2305 old_file_path == new_file_path and not self.is_windows_fs): 2306 self.raise_os_error(errno.ENOTDIR, new_file_path) 2307 2308 def _rename_to_existing_path(self, force_replace: bool, 2309 new_file_path: AnyStr, 2310 old_file_path: AnyStr, 2311 old_object: FakeFile, 2312 ends_with_sep: bool) -> Optional[AnyStr]: 2313 new_object = self.get_object(new_file_path) 2314 if old_file_path == new_file_path: 2315 if not S_ISLNK(new_object.st_mode) and ends_with_sep: 2316 error = errno.EINVAL if self.is_windows_fs else errno.ENOTDIR 2317 self.raise_os_error(error, old_file_path) 2318 return None # Nothing to do here 2319 2320 if old_object == new_object: 2321 return self._rename_same_object(new_file_path, old_file_path) 2322 if S_ISDIR(new_object.st_mode) or S_ISLNK(new_object.st_mode): 2323 self._handle_rename_error_for_dir_or_link( 2324 force_replace, new_file_path, 2325 new_object, old_object, ends_with_sep) 2326 elif S_ISDIR(old_object.st_mode): 2327 error = errno.EEXIST if self.is_windows_fs else errno.ENOTDIR 2328 self.raise_os_error(error, new_file_path) 2329 elif self.is_windows_fs and not force_replace: 2330 self.raise_os_error(errno.EEXIST, new_file_path) 2331 else: 2332 self.remove_object(new_file_path) 2333 return new_file_path 2334 2335 def _handle_rename_error_for_dir_or_link(self, force_replace: bool, 2336 new_file_path: AnyStr, 2337 new_object: FakeFile, 2338 old_object: FakeFile, 2339 ends_with_sep: bool) -> None: 2340 if self.is_windows_fs: 2341 if force_replace: 2342 self.raise_os_error(errno.EACCES, new_file_path) 2343 else: 2344 self.raise_os_error(errno.EEXIST, new_file_path) 2345 if not S_ISLNK(new_object.st_mode): 2346 if new_object.entries: 2347 if (not S_ISLNK(old_object.st_mode) or 2348 not ends_with_sep or not self.is_macos): 2349 self.raise_os_error(errno.ENOTEMPTY, new_file_path) 2350 if S_ISREG(old_object.st_mode): 2351 self.raise_os_error(errno.EISDIR, new_file_path) 2352 2353 def _rename_same_object(self, new_file_path: AnyStr, 2354 old_file_path: AnyStr) -> Optional[AnyStr]: 2355 do_rename = old_file_path.lower() == new_file_path.lower() 2356 if not do_rename: 2357 try: 2358 real_old_path = self.resolve_path(old_file_path) 2359 original_old_path = self._original_path(real_old_path) 2360 real_new_path = self.resolve_path(new_file_path) 2361 if (real_new_path == original_old_path and 2362 (new_file_path == real_old_path) == 2363 (new_file_path.lower() == 2364 real_old_path.lower())): 2365 real_object = self.resolve(old_file_path, 2366 follow_symlinks=False) 2367 do_rename = (os.path.basename(old_file_path) == 2368 real_object.name or not self.is_macos) 2369 else: 2370 do_rename = (real_new_path.lower() == 2371 real_old_path.lower()) 2372 if do_rename: 2373 # only case is changed in case-insensitive file 2374 # system - do the rename 2375 parent, file_name = self.splitpath(new_file_path) 2376 new_file_path = self.joinpaths( 2377 self._original_path(parent), file_name) 2378 except OSError: 2379 # ResolvePath may fail due to symlink loop issues or 2380 # similar - in this case just assume the paths 2381 # to be different 2382 pass 2383 if not do_rename: 2384 # hard links to the same file - nothing to do 2385 return None 2386 return new_file_path 2387 2388 def remove_object(self, file_path: AnyStr) -> None: 2389 """Remove an existing file or directory. 2390 2391 Args: 2392 file_path: The path to the file relative to self. 2393 2394 Raises: 2395 OSError: if file_path does not correspond to an existing file, or 2396 if part of the path refers to something other than a directory. 2397 OSError: if the directory is in use (eg, if it is '/'). 2398 """ 2399 file_path = self.absnormpath(self._original_path(file_path)) 2400 if self._is_root_path(file_path): 2401 self.raise_os_error(errno.EBUSY, file_path) 2402 try: 2403 dirname, basename = self.splitpath(file_path) 2404 target_directory = self.resolve(dirname, check_read_perm=False) 2405 target_directory.remove_entry(basename) 2406 except KeyError: 2407 self.raise_os_error(errno.ENOENT, file_path) 2408 except AttributeError: 2409 self.raise_os_error(errno.ENOTDIR, file_path) 2410 2411 def make_string_path(self, path: AnyPath) -> AnyStr: 2412 path_str = make_string_path(path) 2413 os_sep = matching_string(path_str, os.sep) 2414 fake_sep = matching_string(path_str, self.path_separator) 2415 return path_str.replace(os_sep, fake_sep) # type: ignore[return-value] 2416 2417 def create_dir(self, directory_path: AnyPath, 2418 perm_bits: int = PERM_DEF) -> FakeDirectory: 2419 """Create `directory_path`, and all the parent directories. 2420 2421 Helper method to set up your test faster. 2422 2423 Args: 2424 directory_path: The full directory path to create. 2425 perm_bits: The permission bits as set by `chmod`. 2426 2427 Returns: 2428 The newly created FakeDirectory object. 2429 2430 Raises: 2431 OSError: if the directory already exists. 2432 """ 2433 dir_path = self.make_string_path(directory_path) 2434 dir_path = self.absnormpath(dir_path) 2435 self._auto_mount_drive_if_needed(dir_path) 2436 if (self.exists(dir_path, check_link=True) and 2437 dir_path not in self.mount_points): 2438 self.raise_os_error(errno.EEXIST, dir_path) 2439 path_components = self._path_components(dir_path) 2440 current_dir = self.root 2441 2442 new_dirs = [] 2443 for component in [to_string(p) for p in path_components]: 2444 directory = self._directory_content( 2445 current_dir, to_string(component))[1] 2446 if not directory: 2447 new_dir = FakeDirectory(component, filesystem=self) 2448 new_dirs.append(new_dir) 2449 current_dir.add_entry(new_dir) 2450 current_dir = new_dir 2451 else: 2452 if S_ISLNK(directory.st_mode): 2453 assert directory.contents 2454 directory = self.resolve(directory.contents) 2455 assert directory 2456 current_dir = cast(FakeDirectory, directory) 2457 if directory.st_mode & S_IFDIR != S_IFDIR: 2458 self.raise_os_error(errno.ENOTDIR, current_dir.path) 2459 2460 # set the permission after creating the directories 2461 # to allow directory creation inside a read-only directory 2462 for new_dir in new_dirs: 2463 new_dir.st_mode = S_IFDIR | perm_bits 2464 2465 return current_dir 2466 2467 def create_file(self, file_path: AnyPath, 2468 st_mode: int = S_IFREG | PERM_DEF_FILE, 2469 contents: AnyString = '', 2470 st_size: Optional[int] = None, 2471 create_missing_dirs: bool = True, 2472 apply_umask: bool = False, 2473 encoding: Optional[str] = None, 2474 errors: Optional[str] = None, 2475 side_effect: Optional[Callable] = None) -> FakeFile: 2476 """Create `file_path`, including all the parent directories along 2477 the way. 2478 2479 This helper method can be used to set up tests more easily. 2480 2481 Args: 2482 file_path: The path to the file to create. 2483 st_mode: The stat constant representing the file type. 2484 contents: the contents of the file. If not given and st_size is 2485 None, an empty file is assumed. 2486 st_size: file size; only valid if contents not given. If given, 2487 the file is considered to be in "large file mode" and trying 2488 to read from or write to the file will result in an exception. 2489 create_missing_dirs: If `True`, auto create missing directories. 2490 apply_umask: `True` if the current umask must be applied 2491 on `st_mode`. 2492 encoding: If `contents` is a unicode string, the encoding used 2493 for serialization. 2494 errors: The error mode used for encoding/decoding errors. 2495 side_effect: function handle that is executed when file is written, 2496 must accept the file object as an argument. 2497 2498 Returns: 2499 The newly created FakeFile object. 2500 2501 Raises: 2502 OSError: if the file already exists. 2503 OSError: if the containing directory is required and missing. 2504 """ 2505 return self.create_file_internally( 2506 file_path, st_mode, contents, st_size, create_missing_dirs, 2507 apply_umask, encoding, errors, side_effect=side_effect) 2508 2509 def add_real_file(self, source_path: AnyPath, 2510 read_only: bool = True, 2511 target_path: Optional[AnyPath] = None) -> FakeFile: 2512 """Create `file_path`, including all the parent directories along the 2513 way, for an existing real file. The contents of the real file are read 2514 only on demand. 2515 2516 Args: 2517 source_path: Path to an existing file in the real file system 2518 read_only: If `True` (the default), writing to the fake file 2519 raises an exception. Otherwise, writing to the file changes 2520 the fake file only. 2521 target_path: If given, the path of the target direction, 2522 otherwise it is equal to `source_path`. 2523 2524 Returns: 2525 the newly created FakeFile object. 2526 2527 Raises: 2528 OSError: if the file does not exist in the real file system. 2529 OSError: if the file already exists in the fake file system. 2530 2531 .. note:: On most systems, accessing the fake file's contents may 2532 update both the real and fake files' `atime` (access time). 2533 In this particular case, `add_real_file()` violates the rule 2534 that `pyfakefs` must not modify the real file system. 2535 """ 2536 target_path = target_path or source_path 2537 source_path_str = make_string_path(source_path) 2538 real_stat = os.stat(source_path_str) 2539 fake_file = self.create_file_internally(target_path, 2540 read_from_real_fs=True) 2541 2542 # for read-only mode, remove the write/executable permission bits 2543 fake_file.stat_result.set_from_stat_result(real_stat) 2544 if read_only: 2545 fake_file.st_mode &= 0o777444 2546 fake_file.file_path = source_path_str 2547 self.change_disk_usage(fake_file.size, fake_file.name, 2548 fake_file.st_dev) 2549 return fake_file 2550 2551 def add_real_symlink(self, source_path: AnyPath, 2552 target_path: Optional[AnyPath] = None) -> FakeFile: 2553 """Create a symlink at source_path (or target_path, if given). It will 2554 point to the same path as the symlink on the real filesystem. Relative 2555 symlinks will point relative to their new location. Absolute symlinks 2556 will point to the same, absolute path as on the real filesystem. 2557 2558 Args: 2559 source_path: The path to the existing symlink. 2560 target_path: If given, the name of the symlink in the fake 2561 filesystem, otherwise, the same as `source_path`. 2562 2563 Returns: 2564 the newly created FakeFile object. 2565 2566 Raises: 2567 OSError: if the directory does not exist in the real file system. 2568 OSError: if the symlink could not be created 2569 (see :py:meth:`create_file`). 2570 OSError: if the directory already exists in the fake file system. 2571 """ 2572 source_path_str = make_string_path(source_path) # TODO: add test 2573 source_path_str = self._path_without_trailing_separators( 2574 source_path_str) 2575 if (not os.path.exists(source_path_str) and 2576 not os.path.islink(source_path_str)): 2577 self.raise_os_error(errno.ENOENT, source_path_str) 2578 2579 target = os.readlink(source_path_str) 2580 2581 if target_path: 2582 return self.create_symlink(target_path, target) 2583 else: 2584 return self.create_symlink(source_path_str, target) 2585 2586 def add_real_directory( 2587 self, source_path: AnyPath, 2588 read_only: bool = True, 2589 lazy_read: bool = True, 2590 target_path: Optional[AnyPath] = None) -> FakeDirectory: 2591 """Create a fake directory corresponding to the real directory at the 2592 specified path. Add entries in the fake directory corresponding to 2593 the entries in the real directory. Symlinks are supported. 2594 2595 Args: 2596 source_path: The path to the existing directory. 2597 read_only: If set, all files under the directory are treated as 2598 read-only, e.g. a write access raises an exception; 2599 otherwise, writing to the files changes the fake files only 2600 as usually. 2601 lazy_read: If set (default), directory contents are only read when 2602 accessed, and only until the needed subdirectory level. 2603 2604 .. note:: This means that the file system size is only updated 2605 at the time the directory contents are read; set this to 2606 `False` only if you are dependent on accurate file system 2607 size in your test 2608 target_path: If given, the target directory, otherwise, 2609 the target directory is the same as `source_path`. 2610 2611 Returns: 2612 the newly created FakeDirectory object. 2613 2614 Raises: 2615 OSError: if the directory does not exist in the real file system. 2616 OSError: if the directory already exists in the fake file system. 2617 """ 2618 source_path_str = make_string_path(source_path) # TODO: add test 2619 source_path_str = self._path_without_trailing_separators( 2620 source_path_str) 2621 if not os.path.exists(source_path_str): 2622 self.raise_os_error(errno.ENOENT, source_path_str) 2623 target_path = target_path or source_path_str 2624 new_dir: FakeDirectory 2625 if lazy_read: 2626 parent_path = os.path.split(target_path)[0] 2627 if self.exists(parent_path): 2628 parent_dir = self.get_object(parent_path) 2629 else: 2630 parent_dir = self.create_dir(parent_path) 2631 new_dir = FakeDirectoryFromRealDirectory( 2632 source_path_str, self, read_only, target_path) 2633 parent_dir.add_entry(new_dir) 2634 else: 2635 new_dir = self.create_dir(target_path) 2636 for base, _, files in os.walk(source_path_str): 2637 new_base = os.path.join(new_dir.path, # type: ignore[arg-type] 2638 os.path.relpath(base, source_path_str)) 2639 for fileEntry in os.listdir(base): 2640 abs_fileEntry = os.path.join(base, fileEntry) 2641 2642 if not os.path.islink(abs_fileEntry): 2643 continue 2644 2645 self.add_real_symlink( 2646 abs_fileEntry, os.path.join(new_base, fileEntry)) 2647 for fileEntry in files: 2648 path = os.path.join(base, fileEntry) 2649 if os.path.islink(path): 2650 continue 2651 self.add_real_file(path, 2652 read_only, 2653 os.path.join(new_base, fileEntry)) 2654 return new_dir 2655 2656 def add_real_paths(self, path_list: List[AnyStr], 2657 read_only: bool = True, 2658 lazy_dir_read: bool = True) -> None: 2659 """This convenience method adds multiple files and/or directories from 2660 the real file system to the fake file system. See `add_real_file()` and 2661 `add_real_directory()`. 2662 2663 Args: 2664 path_list: List of file and directory paths in the real file 2665 system. 2666 read_only: If set, all files and files under under the directories 2667 are treated as read-only, e.g. a write access raises an 2668 exception; otherwise, writing to the files changes the fake 2669 files only as usually. 2670 lazy_dir_read: Uses lazy reading of directory contents if set 2671 (see `add_real_directory`) 2672 2673 Raises: 2674 OSError: if any of the files and directories in the list 2675 does not exist in the real file system. 2676 OSError: if any of the files and directories in the list 2677 already exists in the fake file system. 2678 """ 2679 for path in path_list: 2680 if os.path.isdir(path): 2681 self.add_real_directory(path, read_only, lazy_dir_read) 2682 else: 2683 self.add_real_file(path, read_only) 2684 2685 def create_file_internally( 2686 self, file_path: AnyPath, 2687 st_mode: int = S_IFREG | PERM_DEF_FILE, 2688 contents: AnyString = '', 2689 st_size: Optional[int] = None, 2690 create_missing_dirs: bool = True, 2691 apply_umask: bool = False, 2692 encoding: Optional[str] = None, 2693 errors: Optional[str] = None, 2694 read_from_real_fs: bool = False, 2695 side_effect: Optional[Callable] = None) -> FakeFile: 2696 """Internal fake file creator that supports both normal fake files 2697 and fake files based on real files. 2698 2699 Args: 2700 file_path: path to the file to create. 2701 st_mode: the stat.S_IF constant representing the file type. 2702 contents: the contents of the file. If not given and st_size is 2703 None, an empty file is assumed. 2704 st_size: file size; only valid if contents not given. If given, 2705 the file is considered to be in "large file mode" and trying 2706 to read from or write to the file will result in an exception. 2707 create_missing_dirs: if True, auto create missing directories. 2708 apply_umask: whether or not the current umask must be applied 2709 on st_mode. 2710 encoding: if contents is a unicode string, the encoding used for 2711 serialization. 2712 errors: the error mode used for encoding/decoding errors 2713 read_from_real_fs: if True, the contents are read from the real 2714 file system on demand. 2715 side_effect: function handle that is executed when file is written, 2716 must accept the file object as an argument. 2717 """ 2718 path = self.make_string_path(file_path) 2719 path = self.absnormpath(path) 2720 if not is_int_type(st_mode): 2721 raise TypeError( 2722 'st_mode must be of int type - did you mean to set contents?') 2723 2724 if self.exists(path, check_link=True): 2725 self.raise_os_error(errno.EEXIST, path) 2726 parent_directory, new_file = self.splitpath(path) 2727 if not parent_directory: 2728 parent_directory = matching_string(path, self.cwd) 2729 self._auto_mount_drive_if_needed(parent_directory) 2730 if not self.exists(parent_directory): 2731 if not create_missing_dirs: 2732 self.raise_os_error(errno.ENOENT, parent_directory) 2733 self.create_dir(parent_directory) 2734 else: 2735 parent_directory = self._original_path(parent_directory) 2736 if apply_umask: 2737 st_mode &= ~self.umask 2738 file_object: FakeFile 2739 if read_from_real_fs: 2740 file_object = FakeFileFromRealFile(to_string(path), 2741 filesystem=self, 2742 side_effect=side_effect) 2743 else: 2744 file_object = FakeFile(new_file, st_mode, filesystem=self, 2745 encoding=encoding, errors=errors, 2746 side_effect=side_effect) 2747 2748 self.add_object(parent_directory, file_object) 2749 2750 if st_size is None and contents is None: 2751 contents = '' 2752 if (not read_from_real_fs and 2753 (contents is not None or st_size is not None)): 2754 try: 2755 if st_size is not None: 2756 file_object.set_large_file_size(st_size) 2757 else: 2758 file_object.set_initial_contents(contents) # type: ignore 2759 except OSError: 2760 self.remove_object(path) 2761 raise 2762 2763 return file_object 2764 2765 def create_symlink(self, file_path: AnyPath, 2766 link_target: AnyPath, 2767 create_missing_dirs: bool = True) -> FakeFile: 2768 """Create the specified symlink, pointed at the specified link target. 2769 2770 Args: 2771 file_path: path to the symlink to create 2772 link_target: the target of the symlink 2773 create_missing_dirs: If `True`, any missing parent directories of 2774 file_path will be created 2775 2776 Returns: 2777 The newly created FakeFile object. 2778 2779 Raises: 2780 OSError: if the symlink could not be created 2781 (see :py:meth:`create_file`). 2782 """ 2783 link_path = self.make_string_path(file_path) 2784 link_target_path = self.make_string_path(link_target) 2785 link_path = self.normcase(link_path) 2786 # the link path cannot end with a path separator 2787 if self.ends_with_path_separator(link_path): 2788 if self.exists(link_path): 2789 self.raise_os_error(errno.EEXIST, link_path) 2790 if self.exists(link_target_path): 2791 if not self.is_windows_fs: 2792 self.raise_os_error(errno.ENOENT, link_path) 2793 else: 2794 if self.is_windows_fs: 2795 self.raise_os_error(errno.EINVAL, link_target_path) 2796 if not self.exists( 2797 self._path_without_trailing_separators(link_path), 2798 check_link=True): 2799 self.raise_os_error(errno.ENOENT, link_target_path) 2800 if self.is_macos: 2801 # to avoid EEXIST exception, remove the link 2802 # if it already exists 2803 if self.exists(link_path, check_link=True): 2804 self.remove_object(link_path) 2805 else: 2806 self.raise_os_error(errno.EEXIST, link_target_path) 2807 2808 # resolve the link path only if it is not a link itself 2809 if not self.islink(link_path): 2810 link_path = self.resolve_path(link_path) 2811 return self.create_file_internally( 2812 link_path, st_mode=S_IFLNK | PERM_DEF, 2813 contents=link_target_path, 2814 create_missing_dirs=create_missing_dirs) 2815 2816 def create_link(self, old_path: AnyPath, 2817 new_path: AnyPath, 2818 follow_symlinks: bool = True, 2819 create_missing_dirs: bool = True) -> FakeFile: 2820 """Create a hard link at new_path, pointing at old_path. 2821 2822 Args: 2823 old_path: An existing link to the target file. 2824 new_path: The destination path to create a new link at. 2825 follow_symlinks: If False and old_path is a symlink, link the 2826 symlink instead of the object it points to. 2827 create_missing_dirs: If `True`, any missing parent directories of 2828 file_path will be created 2829 2830 Returns: 2831 The FakeFile object referred to by old_path. 2832 2833 Raises: 2834 OSError: if something already exists at new_path. 2835 OSError: if old_path is a directory. 2836 OSError: if the parent directory doesn't exist. 2837 """ 2838 old_path_str = make_string_path(old_path) 2839 new_path_str = make_string_path(new_path) 2840 new_path_normalized = self.absnormpath(new_path_str) 2841 if self.exists(new_path_normalized, check_link=True): 2842 self.raise_os_error(errno.EEXIST, new_path_str) 2843 2844 new_parent_directory, new_basename = self.splitpath( 2845 new_path_normalized) 2846 if not new_parent_directory: 2847 new_parent_directory = matching_string(new_path_str, self.cwd) 2848 2849 if not self.exists(new_parent_directory): 2850 if create_missing_dirs: 2851 self.create_dir(new_parent_directory) 2852 else: 2853 self.raise_os_error(errno.ENOENT, new_parent_directory) 2854 2855 if self.ends_with_path_separator(old_path_str): 2856 error = errno.EINVAL if self.is_windows_fs else errno.ENOTDIR 2857 self.raise_os_error(error, old_path_str) 2858 2859 if not self.is_windows_fs and self.ends_with_path_separator(new_path): 2860 self.raise_os_error(errno.ENOENT, old_path_str) 2861 2862 # Retrieve the target file 2863 try: 2864 old_file = self.resolve(old_path_str, 2865 follow_symlinks=follow_symlinks) 2866 except OSError: 2867 self.raise_os_error(errno.ENOENT, old_path_str) 2868 2869 if old_file.st_mode & S_IFDIR: 2870 self.raise_os_error( 2871 errno.EACCES if self.is_windows_fs 2872 else errno.EPERM, old_path_str 2873 ) 2874 2875 # abuse the name field to control the filename of the 2876 # newly created link 2877 old_file.name = new_basename # type: ignore[assignment] 2878 self.add_object(new_parent_directory, old_file) 2879 return old_file 2880 2881 def link(self, old_path: AnyPath, 2882 new_path: AnyPath, 2883 follow_symlinks: bool = True) -> FakeFile: 2884 """Create a hard link at new_path, pointing at old_path. 2885 2886 Args: 2887 old_path: An existing link to the target file. 2888 new_path: The destination path to create a new link at. 2889 follow_symlinks: If False and old_path is a symlink, link the 2890 symlink instead of the object it points to. 2891 2892 Returns: 2893 The FakeFile object referred to by old_path. 2894 2895 Raises: 2896 OSError: if something already exists at new_path. 2897 OSError: if old_path is a directory. 2898 OSError: if the parent directory doesn't exist. 2899 """ 2900 return self.create_link(old_path, new_path, follow_symlinks, 2901 create_missing_dirs=False) 2902 2903 def _is_circular_link(self, link_obj: FakeFile) -> bool: 2904 try: 2905 assert link_obj.contents 2906 self.resolve_path(link_obj.contents) 2907 except OSError as exc: 2908 return exc.errno == errno.ELOOP 2909 return False 2910 2911 def readlink(self, path: AnyPath) -> str: 2912 """Read the target of a symlink. 2913 2914 Args: 2915 path: symlink to read the target of. 2916 2917 Returns: 2918 the string representing the path to which the symbolic link points. 2919 2920 Raises: 2921 TypeError: if path is None 2922 OSError: (with errno=ENOENT) if path is not a valid path, or 2923 (with errno=EINVAL) if path is valid, but is not a symlink, 2924 or if the path ends with a path separator (Posix only) 2925 """ 2926 if path is None: 2927 raise TypeError 2928 link_path = make_string_path(path) 2929 link_obj = self.lresolve(link_path) 2930 if S_IFMT(link_obj.st_mode) != S_IFLNK: 2931 self.raise_os_error(errno.EINVAL, link_path) 2932 2933 if self.ends_with_path_separator(link_path): 2934 if not self.is_windows_fs and self.exists(link_path): 2935 self.raise_os_error(errno.EINVAL, link_path) 2936 if not self.exists(link_obj.path): # type: ignore 2937 if self.is_windows_fs: 2938 error = errno.EINVAL 2939 elif self._is_circular_link(link_obj): 2940 if self.is_macos: 2941 return link_obj.path # type: ignore[return-value] 2942 error = errno.ELOOP 2943 else: 2944 error = errno.ENOENT 2945 self.raise_os_error(error, link_obj.path) 2946 2947 assert link_obj.contents 2948 return link_obj.contents 2949 2950 def makedir(self, dir_path: AnyPath, mode: int = PERM_DEF) -> None: 2951 """Create a leaf Fake directory. 2952 2953 Args: 2954 dir_path: (str) Name of directory to create. 2955 Relative paths are assumed to be relative to '/'. 2956 mode: (int) Mode to create directory with. This argument defaults 2957 to 0o777. The umask is applied to this mode. 2958 2959 Raises: 2960 OSError: if the directory name is invalid or parent directory is 2961 read only or as per :py:meth:`add_object`. 2962 """ 2963 dir_name = make_string_path(dir_path) 2964 ends_with_sep = self.ends_with_path_separator(dir_name) 2965 dir_name = self._path_without_trailing_separators(dir_name) 2966 if not dir_name: 2967 self.raise_os_error(errno.ENOENT, '') 2968 2969 if self.is_windows_fs: 2970 dir_name = self.absnormpath(dir_name) 2971 parent_dir, _ = self.splitpath(dir_name) 2972 if parent_dir: 2973 base_dir = self.normpath(parent_dir) 2974 ellipsis = matching_string(parent_dir, self.path_separator + '..') 2975 if parent_dir.endswith(ellipsis) and not self.is_windows_fs: 2976 base_dir, dummy_dotdot, _ = parent_dir.partition(ellipsis) 2977 if not self.exists(base_dir): 2978 self.raise_os_error(errno.ENOENT, base_dir) 2979 2980 dir_name = self.absnormpath(dir_name) 2981 if self.exists(dir_name, check_link=True): 2982 if self.is_windows_fs and dir_name == self.path_separator: 2983 error_nr = errno.EACCES 2984 else: 2985 error_nr = errno.EEXIST 2986 if ends_with_sep and self.is_macos and not self.exists(dir_name): 2987 # to avoid EEXIST exception, remove the link 2988 self.remove_object(dir_name) 2989 else: 2990 self.raise_os_error(error_nr, dir_name) 2991 head, tail = self.splitpath(dir_name) 2992 2993 self.add_object( 2994 to_string(head), 2995 FakeDirectory(to_string(tail), mode & ~self.umask, 2996 filesystem=self)) 2997 2998 def _path_without_trailing_separators(self, path: AnyStr) -> AnyStr: 2999 while self.ends_with_path_separator(path): 3000 path = path[:-1] 3001 return path 3002 3003 def makedirs(self, dir_name: AnyStr, mode: int = PERM_DEF, 3004 exist_ok: bool = False) -> None: 3005 """Create a leaf Fake directory and create any non-existent 3006 parent dirs. 3007 3008 Args: 3009 dir_name: (str) Name of directory to create. 3010 mode: (int) Mode to create directory (and any necessary parent 3011 directories) with. This argument defaults to 0o777. 3012 The umask is applied to this mode. 3013 exist_ok: (boolean) If exist_ok is False (the default), an OSError is 3014 raised if the target directory already exists. 3015 3016 Raises: 3017 OSError: if the directory already exists and exist_ok=False, 3018 or as per :py:meth:`create_dir`. 3019 """ 3020 if not dir_name: 3021 self.raise_os_error(errno.ENOENT, '') 3022 ends_with_sep = self.ends_with_path_separator(dir_name) 3023 dir_name = self.absnormpath(dir_name) 3024 if (ends_with_sep and self.is_macos and 3025 self.exists(dir_name, check_link=True) and 3026 not self.exists(dir_name)): 3027 # to avoid EEXIST exception, remove the link 3028 self.remove_object(dir_name) 3029 3030 dir_name_str = to_string(dir_name) 3031 path_components = self._path_components(dir_name_str) 3032 3033 # Raise a permission denied error if the first existing directory 3034 # is not writeable. 3035 current_dir = self.root 3036 for component in path_components: 3037 if (not hasattr(current_dir, "entries") or 3038 component not in current_dir.entries): 3039 break 3040 else: 3041 current_dir = cast(FakeDirectory, 3042 current_dir.entries[component]) 3043 try: 3044 self.create_dir(dir_name, mode & ~self.umask) 3045 except OSError as e: 3046 if e.errno == errno.EACCES: 3047 # permission denied - propagate exception 3048 raise 3049 if (not exist_ok or 3050 not isinstance(self.resolve(dir_name), FakeDirectory)): 3051 if self.is_windows_fs and e.errno == errno.ENOTDIR: 3052 e.errno = errno.ENOENT 3053 self.raise_os_error(e.errno, e.filename) 3054 3055 def _is_of_type(self, path: AnyPath, st_flag: int, 3056 follow_symlinks: bool = True, 3057 check_read_perm: bool = True) -> bool: 3058 """Helper function to implement isdir(), islink(), etc. 3059 3060 See the stat(2) man page for valid stat.S_I* flag values 3061 3062 Args: 3063 path: Path to file to stat and test 3064 st_flag: The stat.S_I* flag checked for the file's st_mode 3065 check_read_perm: If True (default) False is returned for 3066 existing but unreadable file paths. 3067 3068 Returns: 3069 (boolean) `True` if the st_flag is set in path's st_mode. 3070 3071 Raises: 3072 TypeError: if path is None 3073 """ 3074 if path is None: 3075 raise TypeError 3076 file_path = make_string_path(path) 3077 try: 3078 obj = self.resolve(file_path, follow_symlinks, 3079 check_read_perm=check_read_perm) 3080 if obj: 3081 self.raise_for_filepath_ending_with_separator( 3082 file_path, obj, macos_handling=not follow_symlinks) 3083 return S_IFMT(obj.st_mode) == st_flag 3084 except OSError: 3085 return False 3086 return False 3087 3088 def isdir(self, path: AnyPath, follow_symlinks: bool = True) -> bool: 3089 """Determine if path identifies a directory. 3090 3091 Args: 3092 path: Path to filesystem object. 3093 3094 Returns: 3095 `True` if path points to a directory (following symlinks). 3096 3097 Raises: 3098 TypeError: if path is None. 3099 """ 3100 return self._is_of_type(path, S_IFDIR, follow_symlinks) 3101 3102 def isfile(self, path: AnyPath, follow_symlinks: bool = True) -> bool: 3103 """Determine if path identifies a regular file. 3104 3105 Args: 3106 path: Path to filesystem object. 3107 3108 Returns: 3109 `True` if path points to a regular file (following symlinks). 3110 3111 Raises: 3112 TypeError: if path is None. 3113 """ 3114 return self._is_of_type(path, S_IFREG, follow_symlinks, 3115 check_read_perm=False) 3116 3117 def islink(self, path: AnyPath) -> bool: 3118 """Determine if path identifies a symbolic link. 3119 3120 Args: 3121 path: Path to filesystem object. 3122 3123 Returns: 3124 `True` if path points to a symlink (S_IFLNK set in st_mode) 3125 3126 Raises: 3127 TypeError: if path is None. 3128 """ 3129 return self._is_of_type(path, S_IFLNK, follow_symlinks=False) 3130 3131 def confirmdir(self, target_directory: AnyStr) -> FakeDirectory: 3132 """Test that the target is actually a directory, raising OSError 3133 if not. 3134 3135 Args: 3136 target_directory: Path to the target directory within the fake 3137 filesystem. 3138 3139 Returns: 3140 The FakeDirectory object corresponding to target_directory. 3141 3142 Raises: 3143 OSError: if the target is not a directory. 3144 """ 3145 directory = cast(FakeDirectory, self.resolve(target_directory)) 3146 if not directory.st_mode & S_IFDIR: 3147 self.raise_os_error(errno.ENOTDIR, target_directory, 267) 3148 return directory 3149 3150 def remove(self, path: AnyStr) -> None: 3151 """Remove the FakeFile object at the specified file path. 3152 3153 Args: 3154 path: Path to file to be removed. 3155 3156 Raises: 3157 OSError: if path points to a directory. 3158 OSError: if path does not exist. 3159 OSError: if removal failed. 3160 """ 3161 norm_path = make_string_path(path) 3162 norm_path = self.absnormpath(norm_path) 3163 if self.ends_with_path_separator(path): 3164 self._handle_broken_link_with_trailing_sep(norm_path) 3165 if self.exists(norm_path): 3166 obj = self.resolve(norm_path, check_read_perm=False) 3167 if S_IFMT(obj.st_mode) == S_IFDIR: 3168 link_obj = self.lresolve(norm_path) 3169 if S_IFMT(link_obj.st_mode) != S_IFLNK: 3170 if self.is_windows_fs: 3171 error = errno.EACCES 3172 elif self.is_macos: 3173 error = errno.EPERM 3174 else: 3175 error = errno.EISDIR 3176 self.raise_os_error(error, norm_path) 3177 3178 if path.endswith(matching_string(path, self.path_separator)): 3179 if self.is_windows_fs: 3180 error = errno.EACCES 3181 elif self.is_macos: 3182 error = errno.EPERM 3183 else: 3184 error = errno.ENOTDIR 3185 self.raise_os_error(error, norm_path) 3186 else: 3187 self.raise_for_filepath_ending_with_separator(path, obj) 3188 3189 self.remove_object(norm_path) 3190 3191 def rmdir(self, target_directory: AnyStr, 3192 allow_symlink: bool = False) -> None: 3193 """Remove a leaf Fake directory. 3194 3195 Args: 3196 target_directory: (str) Name of directory to remove. 3197 allow_symlink: (bool) if `target_directory` is a symlink, 3198 the function just returns, otherwise it raises (Posix only) 3199 3200 Raises: 3201 OSError: if target_directory does not exist. 3202 OSError: if target_directory does not point to a directory. 3203 OSError: if removal failed per FakeFilesystem.RemoveObject. 3204 Cannot remove '.'. 3205 """ 3206 if target_directory == matching_string(target_directory, '.'): 3207 error_nr = errno.EACCES if self.is_windows_fs else errno.EINVAL 3208 self.raise_os_error(error_nr, target_directory) 3209 ends_with_sep = self.ends_with_path_separator(target_directory) 3210 target_directory = self.absnormpath(target_directory) 3211 if self.confirmdir(target_directory): 3212 if not self.is_windows_fs and self.islink(target_directory): 3213 if allow_symlink: 3214 return 3215 if not ends_with_sep or not self.is_macos: 3216 self.raise_os_error(errno.ENOTDIR, target_directory) 3217 3218 dir_object = self.resolve(target_directory) 3219 if dir_object.entries: 3220 self.raise_os_error(errno.ENOTEMPTY, target_directory) 3221 self.remove_object(target_directory) 3222 3223 def listdir(self, target_directory: AnyStr) -> List[AnyStr]: 3224 """Return a list of file names in target_directory. 3225 3226 Args: 3227 target_directory: Path to the target directory within the 3228 fake filesystem. 3229 3230 Returns: 3231 A list of file names within the target directory in arbitrary 3232 order. If `shuffle_listdir_results` is set, the order is not the 3233 same in subsequent calls to avoid tests relying on any ordering. 3234 3235 Raises: 3236 OSError: if the target is not a directory. 3237 """ 3238 target_directory = self.resolve_path(target_directory, allow_fd=True) 3239 directory = self.confirmdir(target_directory) 3240 directory_contents = list(directory.entries.keys()) 3241 if self.shuffle_listdir_results: 3242 random.shuffle(directory_contents) 3243 return directory_contents # type: ignore[return-value] 3244 3245 def __str__(self) -> str: 3246 return str(self.root) 3247 3248 def _add_standard_streams(self) -> None: 3249 self._add_open_file(StandardStreamWrapper(sys.stdin)) 3250 self._add_open_file(StandardStreamWrapper(sys.stdout)) 3251 self._add_open_file(StandardStreamWrapper(sys.stderr)) 3252 3253 3254Deprecator.add(FakeFilesystem, FakeFilesystem.get_disk_usage, 'GetDiskUsage') 3255Deprecator.add(FakeFilesystem, FakeFilesystem.set_disk_usage, 'SetDiskUsage') 3256Deprecator.add(FakeFilesystem, 3257 FakeFilesystem.change_disk_usage, 'ChangeDiskUsage') 3258Deprecator.add(FakeFilesystem, FakeFilesystem.add_mount_point, 'AddMountPoint') 3259Deprecator.add(FakeFilesystem, FakeFilesystem.stat, 'GetStat') 3260Deprecator.add(FakeFilesystem, FakeFilesystem.chmod, 'ChangeMode') 3261Deprecator.add(FakeFilesystem, FakeFilesystem.utime, 'UpdateTime') 3262Deprecator.add(FakeFilesystem, FakeFilesystem._add_open_file, 'AddOpenFile') 3263Deprecator.add(FakeFilesystem, 3264 FakeFilesystem._close_open_file, 'CloseOpenFile') 3265Deprecator.add(FakeFilesystem, FakeFilesystem.has_open_file, 'HasOpenFile') 3266Deprecator.add(FakeFilesystem, FakeFilesystem.get_open_file, 'GetOpenFile') 3267Deprecator.add(FakeFilesystem, 3268 FakeFilesystem.normcase, 'NormalizePathSeparator') 3269Deprecator.add(FakeFilesystem, FakeFilesystem.normpath, 'CollapsePath') 3270Deprecator.add(FakeFilesystem, FakeFilesystem._original_path, 'NormalizeCase') 3271Deprecator.add(FakeFilesystem, FakeFilesystem.absnormpath, 'NormalizePath') 3272Deprecator.add(FakeFilesystem, FakeFilesystem.splitpath, 'SplitPath') 3273Deprecator.add(FakeFilesystem, FakeFilesystem.splitdrive, 'SplitDrive') 3274Deprecator.add(FakeFilesystem, FakeFilesystem.joinpaths, 'JoinPaths') 3275Deprecator.add(FakeFilesystem, 3276 FakeFilesystem._path_components, 'GetPathComponents') 3277Deprecator.add(FakeFilesystem, FakeFilesystem._starts_with_drive_letter, 3278 'StartsWithDriveLetter') 3279Deprecator.add(FakeFilesystem, FakeFilesystem.exists, 'Exists') 3280Deprecator.add(FakeFilesystem, FakeFilesystem.resolve_path, 'ResolvePath') 3281Deprecator.add(FakeFilesystem, FakeFilesystem.get_object_from_normpath, 3282 'GetObjectFromNormalizedPath') 3283Deprecator.add(FakeFilesystem, FakeFilesystem.get_object, 'GetObject') 3284Deprecator.add(FakeFilesystem, FakeFilesystem.resolve, 'ResolveObject') 3285Deprecator.add(FakeFilesystem, FakeFilesystem.lresolve, 'LResolveObject') 3286Deprecator.add(FakeFilesystem, FakeFilesystem.add_object, 'AddObject') 3287Deprecator.add(FakeFilesystem, FakeFilesystem.remove_object, 'RemoveObject') 3288Deprecator.add(FakeFilesystem, FakeFilesystem.rename, 'RenameObject') 3289Deprecator.add(FakeFilesystem, FakeFilesystem.create_dir, 'CreateDirectory') 3290Deprecator.add(FakeFilesystem, FakeFilesystem.create_file, 'CreateFile') 3291Deprecator.add(FakeFilesystem, FakeFilesystem.create_symlink, 'CreateLink') 3292Deprecator.add(FakeFilesystem, FakeFilesystem.link, 'CreateHardLink') 3293Deprecator.add(FakeFilesystem, FakeFilesystem.readlink, 'ReadLink') 3294Deprecator.add(FakeFilesystem, FakeFilesystem.makedir, 'MakeDirectory') 3295Deprecator.add(FakeFilesystem, FakeFilesystem.makedirs, 'MakeDirectories') 3296Deprecator.add(FakeFilesystem, FakeFilesystem.isdir, 'IsDir') 3297Deprecator.add(FakeFilesystem, FakeFilesystem.isfile, 'IsFile') 3298Deprecator.add(FakeFilesystem, FakeFilesystem.islink, 'IsLink') 3299Deprecator.add(FakeFilesystem, FakeFilesystem.confirmdir, 'ConfirmDir') 3300Deprecator.add(FakeFilesystem, FakeFilesystem.remove, 'RemoveFile') 3301Deprecator.add(FakeFilesystem, FakeFilesystem.rmdir, 'RemoveDirectory') 3302Deprecator.add(FakeFilesystem, FakeFilesystem.listdir, 'ListDir') 3303 3304 3305class FakePathModule: 3306 """Faked os.path module replacement. 3307 3308 FakePathModule should *only* be instantiated by FakeOsModule. See the 3309 FakeOsModule docstring for details. 3310 """ 3311 _OS_PATH_COPY: Any = _copy_module(os.path) 3312 3313 devnull: ClassVar[str] = '' 3314 sep: ClassVar[str] = '' 3315 altsep: ClassVar[Optional[str]] = None 3316 linesep: ClassVar[str] = '' 3317 pathsep: ClassVar[str] = '' 3318 3319 @staticmethod 3320 def dir() -> List[str]: 3321 """Return the list of patched function names. Used for patching 3322 functions imported from the module. 3323 """ 3324 return [ 3325 'abspath', 'dirname', 'exists', 'expanduser', 'getatime', 3326 'getctime', 'getmtime', 'getsize', 'isabs', 'isdir', 'isfile', 3327 'islink', 'ismount', 'join', 'lexists', 'normcase', 'normpath', 3328 'realpath', 'relpath', 'split', 'splitdrive', 'samefile' 3329 ] 3330 3331 def __init__(self, filesystem: FakeFilesystem, os_module: 'FakeOsModule'): 3332 """Init. 3333 3334 Args: 3335 filesystem: FakeFilesystem used to provide file system information 3336 """ 3337 self.filesystem = filesystem 3338 self._os_path = self._OS_PATH_COPY 3339 self._os_path.os = self.os = os_module # type: ignore[attr-defined] 3340 self.reset(filesystem) 3341 3342 @classmethod 3343 def reset(cls, filesystem: FakeFilesystem) -> None: 3344 cls.sep = filesystem.path_separator 3345 cls.altsep = filesystem.alternative_path_separator 3346 cls.linesep = filesystem.line_separator() 3347 cls.devnull = 'nul' if filesystem.is_windows_fs else '/dev/null' 3348 cls.pathsep = ';' if filesystem.is_windows_fs else ':' 3349 3350 def exists(self, path: AnyStr) -> bool: 3351 """Determine whether the file object exists within the fake filesystem. 3352 3353 Args: 3354 path: The path to the file object. 3355 3356 Returns: 3357 (bool) `True` if the file exists. 3358 """ 3359 return self.filesystem.exists(path) 3360 3361 def lexists(self, path: AnyStr) -> bool: 3362 """Test whether a path exists. Returns True for broken symbolic links. 3363 3364 Args: 3365 path: path to the symlink object. 3366 3367 Returns: 3368 bool (if file exists). 3369 """ 3370 return self.filesystem.exists(path, check_link=True) 3371 3372 def getsize(self, path: AnyStr): 3373 """Return the file object size in bytes. 3374 3375 Args: 3376 path: path to the file object. 3377 3378 Returns: 3379 file size in bytes. 3380 """ 3381 file_obj = self.filesystem.resolve(path) 3382 if (self.filesystem.ends_with_path_separator(path) and 3383 S_IFMT(file_obj.st_mode) != S_IFDIR): 3384 error_nr = (errno.EINVAL if self.filesystem.is_windows_fs 3385 else errno.ENOTDIR) 3386 self.filesystem.raise_os_error(error_nr, path) 3387 return file_obj.st_size 3388 3389 def isabs(self, path: AnyStr) -> bool: 3390 """Return True if path is an absolute pathname.""" 3391 if self.filesystem.is_windows_fs: 3392 path = self.splitdrive(path)[1] 3393 path = make_string_path(path) 3394 return self.filesystem._starts_with_sep(path) 3395 3396 def isdir(self, path: AnyStr) -> bool: 3397 """Determine if path identifies a directory.""" 3398 return self.filesystem.isdir(path) 3399 3400 def isfile(self, path: AnyStr) -> bool: 3401 """Determine if path identifies a regular file.""" 3402 return self.filesystem.isfile(path) 3403 3404 def islink(self, path: AnyStr) -> bool: 3405 """Determine if path identifies a symbolic link. 3406 3407 Args: 3408 path: Path to filesystem object. 3409 3410 Returns: 3411 `True` if path points to a symbolic link. 3412 3413 Raises: 3414 TypeError: if path is None. 3415 """ 3416 return self.filesystem.islink(path) 3417 3418 def getmtime(self, path: AnyStr) -> float: 3419 """Returns the modification time of the fake file. 3420 3421 Args: 3422 path: the path to fake file. 3423 3424 Returns: 3425 (int, float) the modification time of the fake file 3426 in number of seconds since the epoch. 3427 3428 Raises: 3429 OSError: if the file does not exist. 3430 """ 3431 try: 3432 file_obj = self.filesystem.resolve(path) 3433 return file_obj.st_mtime 3434 except OSError: 3435 self.filesystem.raise_os_error(errno.ENOENT, winerror=3) 3436 3437 def getatime(self, path: AnyStr) -> float: 3438 """Returns the last access time of the fake file. 3439 3440 Note: Access time is not set automatically in fake filesystem 3441 on access. 3442 3443 Args: 3444 path: the path to fake file. 3445 3446 Returns: 3447 (int, float) the access time of the fake file in number of seconds 3448 since the epoch. 3449 3450 Raises: 3451 OSError: if the file does not exist. 3452 """ 3453 try: 3454 file_obj = self.filesystem.resolve(path) 3455 except OSError: 3456 self.filesystem.raise_os_error(errno.ENOENT) 3457 return file_obj.st_atime 3458 3459 def getctime(self, path: AnyStr) -> float: 3460 """Returns the creation time of the fake file. 3461 3462 Args: 3463 path: the path to fake file. 3464 3465 Returns: 3466 (int, float) the creation time of the fake file in number of 3467 seconds since the epoch. 3468 3469 Raises: 3470 OSError: if the file does not exist. 3471 """ 3472 try: 3473 file_obj = self.filesystem.resolve(path) 3474 except OSError: 3475 self.filesystem.raise_os_error(errno.ENOENT) 3476 return file_obj.st_ctime 3477 3478 def abspath(self, path: AnyStr) -> AnyStr: 3479 """Return the absolute version of a path.""" 3480 3481 def getcwd(): 3482 """Return the current working directory.""" 3483 # pylint: disable=undefined-variable 3484 if isinstance(path, bytes): 3485 return self.os.getcwdb() 3486 else: 3487 return self.os.getcwd() 3488 3489 path = make_string_path(path) 3490 if not self.isabs(path): 3491 path = self.join(getcwd(), path) 3492 elif (self.filesystem.is_windows_fs and 3493 self.filesystem._starts_with_sep(path)): 3494 cwd = getcwd() 3495 if self.filesystem._starts_with_drive_letter(cwd): 3496 path = self.join(cwd[:2], path) 3497 return self.normpath(path) 3498 3499 def join(self, *p: AnyStr) -> AnyStr: 3500 """Return the completed path with a separator of the parts.""" 3501 return self.filesystem.joinpaths(*p) 3502 3503 def split(self, path: AnyStr) -> Tuple[AnyStr, AnyStr]: 3504 """Split the path into the directory and the filename of the path. 3505 """ 3506 return self.filesystem.splitpath(path) 3507 3508 def splitdrive(self, path: AnyStr) -> Tuple[AnyStr, AnyStr]: 3509 """Split the path into the drive part and the rest of the path, if 3510 supported.""" 3511 return self.filesystem.splitdrive(path) 3512 3513 def normpath(self, path: AnyStr) -> AnyStr: 3514 """Normalize path, eliminating double slashes, etc.""" 3515 return self.filesystem.normpath(path) 3516 3517 def normcase(self, path: AnyStr) -> AnyStr: 3518 """Convert to lower case under windows, replaces additional path 3519 separator.""" 3520 path = self.filesystem.normcase(path) 3521 if self.filesystem.is_windows_fs: 3522 path = path.lower() 3523 return path 3524 3525 def relpath(self, path: AnyStr, start: Optional[AnyStr] = None) -> AnyStr: 3526 """We mostly rely on the native implementation and adapt the 3527 path separator.""" 3528 if not path: 3529 raise ValueError("no path specified") 3530 path = make_string_path(path) 3531 if start is not None: 3532 start = make_string_path(start) 3533 else: 3534 start = matching_string(path, self.filesystem.cwd) 3535 system_sep = matching_string(path, self._os_path.sep) 3536 if self.filesystem.alternative_path_separator is not None: 3537 altsep = matching_string( 3538 path, self.filesystem.alternative_path_separator) 3539 path = path.replace(altsep, system_sep) 3540 start = start.replace(altsep, system_sep) 3541 sep = matching_string(path, self.filesystem.path_separator) 3542 path = path.replace(sep, system_sep) 3543 start = start.replace(sep, system_sep) 3544 path = self._os_path.relpath(path, start) 3545 return path.replace(system_sep, sep) 3546 3547 def realpath(self, filename: AnyStr, strict: bool = None) -> AnyStr: 3548 """Return the canonical path of the specified filename, eliminating any 3549 symbolic links encountered in the path. 3550 """ 3551 if strict is not None and sys.version_info < (3, 10): 3552 raise TypeError("realpath() got an unexpected " 3553 "keyword argument 'strict'") 3554 if strict: 3555 # raises in strict mode if the file does not exist 3556 self.filesystem.resolve(filename) 3557 if self.filesystem.is_windows_fs: 3558 return self.abspath(filename) 3559 filename = make_string_path(filename) 3560 path, ok = self._join_real_path(filename[:0], filename, {}) 3561 path = self.abspath(path) 3562 return path 3563 3564 def samefile(self, path1: AnyStr, path2: AnyStr) -> bool: 3565 """Return whether path1 and path2 point to the same file. 3566 3567 Args: 3568 path1: first file path or path object (Python >=3.6) 3569 path2: second file path or path object (Python >=3.6) 3570 3571 Raises: 3572 OSError: if one of the paths does not point to an existing 3573 file system object. 3574 """ 3575 stat1 = self.filesystem.stat(path1) 3576 stat2 = self.filesystem.stat(path2) 3577 return (stat1.st_ino == stat2.st_ino and 3578 stat1.st_dev == stat2.st_dev) 3579 3580 @overload 3581 def _join_real_path( 3582 self, path: str, 3583 rest: str, 3584 seen: Dict[str, Optional[str]]) -> Tuple[str, bool]: ... 3585 3586 @overload 3587 def _join_real_path( 3588 self, path: bytes, 3589 rest: bytes, 3590 seen: Dict[bytes, Optional[bytes]]) -> Tuple[bytes, bool]: ... 3591 3592 def _join_real_path( 3593 self, path: AnyStr, 3594 rest: AnyStr, 3595 seen: Dict[AnyStr, Optional[AnyStr]]) -> Tuple[AnyStr, bool]: 3596 """Join two paths, normalizing and eliminating any symbolic links 3597 encountered in the second path. 3598 Taken from Python source and adapted. 3599 """ 3600 curdir = matching_string(path, '.') 3601 pardir = matching_string(path, '..') 3602 3603 sep = self.filesystem.get_path_separator(path) 3604 if self.isabs(rest): 3605 rest = rest[1:] 3606 path = sep 3607 3608 while rest: 3609 name, _, rest = rest.partition(sep) 3610 if not name or name == curdir: 3611 # current dir 3612 continue 3613 if name == pardir: 3614 # parent dir 3615 if path: 3616 path, name = self.filesystem.splitpath(path) 3617 if name == pardir: 3618 path = self.filesystem.joinpaths(path, pardir, pardir) 3619 else: 3620 path = pardir 3621 continue 3622 newpath = self.filesystem.joinpaths(path, name) 3623 if not self.filesystem.islink(newpath): 3624 path = newpath 3625 continue 3626 # Resolve the symbolic link 3627 if newpath in seen: 3628 # Already seen this path 3629 seen_path = seen[newpath] 3630 if seen_path is not None: 3631 # use cached value 3632 path = seen_path 3633 continue 3634 # The symlink is not resolved, so we must have a symlink loop. 3635 # Return already resolved part + rest of the path unchanged. 3636 return self.filesystem.joinpaths(newpath, rest), False 3637 seen[newpath] = None # not resolved symlink 3638 path, ok = self._join_real_path( 3639 path, matching_string(path, self.filesystem.readlink( 3640 newpath)), seen) 3641 if not ok: 3642 return self.filesystem.joinpaths(path, rest), False 3643 seen[newpath] = path # resolved symlink 3644 return path, True 3645 3646 def dirname(self, path: AnyStr) -> AnyStr: 3647 """Returns the first part of the result of `split()`.""" 3648 return self.split(path)[0] 3649 3650 def expanduser(self, path: AnyStr) -> AnyStr: 3651 """Return the argument with an initial component of ~ or ~user 3652 replaced by that user's home directory. 3653 """ 3654 path = self._os_path.expanduser(path) 3655 return path.replace( 3656 matching_string(path, self._os_path.sep), 3657 matching_string(path, self.sep)) 3658 3659 def ismount(self, path: AnyStr) -> bool: 3660 """Return true if the given path is a mount point. 3661 3662 Args: 3663 path: Path to filesystem object to be checked 3664 3665 Returns: 3666 `True` if path is a mount point added to the fake file system. 3667 Under Windows also returns True for drive and UNC roots 3668 (independent of their existence). 3669 """ 3670 if not path: 3671 return False 3672 path_str = to_string(make_string_path(path)) 3673 normed_path = self.filesystem.absnormpath(path_str) 3674 sep = self.filesystem.path_separator 3675 if self.filesystem.is_windows_fs: 3676 path_seps: Union[Tuple[str, Optional[str]], Tuple[str]] 3677 if self.filesystem.alternative_path_separator is not None: 3678 path_seps = ( 3679 sep, self.filesystem.alternative_path_separator 3680 ) 3681 else: 3682 path_seps = (sep,) 3683 drive, rest = self.filesystem.splitdrive(normed_path) 3684 if drive and drive[:1] in path_seps: 3685 return (not rest) or (rest in path_seps) 3686 if rest in path_seps: 3687 return True 3688 for mount_point in self.filesystem.mount_points: 3689 if (to_string(normed_path).rstrip(sep) == 3690 to_string(mount_point).rstrip(sep)): 3691 return True 3692 return False 3693 3694 def __getattr__(self, name: str) -> Any: 3695 """Forwards any non-faked calls to the real os.path.""" 3696 return getattr(self._os_path, name) 3697 3698 3699class FakeOsModule: 3700 """Uses FakeFilesystem to provide a fake os module replacement. 3701 3702 Do not create os.path separately from os, as there is a necessary circular 3703 dependency between os and os.path to replicate the behavior of the standard 3704 Python modules. What you want to do is to just let FakeOsModule take care 3705 of `os.path` setup itself. 3706 3707 # You always want to do this. 3708 filesystem = fake_filesystem.FakeFilesystem() 3709 my_os_module = fake_filesystem.FakeOsModule(filesystem) 3710 """ 3711 3712 @staticmethod 3713 def dir() -> List[str]: 3714 """Return the list of patched function names. Used for patching 3715 functions imported from the module. 3716 """ 3717 _dir = [ 3718 'access', 'chdir', 'chmod', 'chown', 'close', 'fstat', 'fsync', 3719 'getcwd', 'lchmod', 'link', 'listdir', 'lstat', 'makedirs', 3720 'mkdir', 'mknod', 'open', 'read', 'readlink', 'remove', 3721 'removedirs', 'rename', 'rmdir', 'stat', 'symlink', 'umask', 3722 'unlink', 'utime', 'walk', 'write', 'getcwdb', 'replace' 3723 ] 3724 if sys.platform.startswith('linux'): 3725 _dir += [ 3726 'fdatasync', 'getxattr', 'listxattr', 3727 'removexattr', 'setxattr' 3728 ] 3729 if use_scandir: 3730 _dir += ['scandir'] 3731 return _dir 3732 3733 def __init__(self, filesystem: FakeFilesystem): 3734 """Also exposes self.path (to fake os.path). 3735 3736 Args: 3737 filesystem: FakeFilesystem used to provide file system information 3738 """ 3739 self.filesystem = filesystem 3740 self._os_module: Any = os 3741 self.path = FakePathModule(self.filesystem, self) 3742 3743 @property 3744 def devnull(self) -> str: 3745 return self.path.devnull 3746 3747 @property 3748 def sep(self) -> str: 3749 return self.path.sep 3750 3751 @property 3752 def altsep(self) -> Optional[str]: 3753 return self.path.altsep 3754 3755 @property 3756 def linesep(self) -> str: 3757 return self.path.linesep 3758 3759 @property 3760 def pathsep(self) -> str: 3761 return self.path.pathsep 3762 3763 def fdopen(self, fd: int, *args: Any, **kwargs: Any) -> AnyFileWrapper: 3764 """Redirector to open() builtin function. 3765 3766 Args: 3767 fd: The file descriptor of the file to open. 3768 *args: Pass through args. 3769 **kwargs: Pass through kwargs. 3770 3771 Returns: 3772 File object corresponding to file_des. 3773 3774 Raises: 3775 TypeError: if file descriptor is not an integer. 3776 """ 3777 if not is_int_type(fd): 3778 raise TypeError('an integer is required') 3779 return FakeFileOpen(self.filesystem)(fd, *args, **kwargs) 3780 3781 def _umask(self) -> int: 3782 """Return the current umask.""" 3783 if self.filesystem.is_windows_fs: 3784 # windows always returns 0 - it has no real notion of umask 3785 return 0 3786 if sys.platform == 'win32': 3787 # if we are testing Unix under Windows we assume a default mask 3788 return 0o002 3789 else: 3790 # under Unix, we return the real umask; 3791 # as there is no pure getter for umask, so we have to first 3792 # set a mode to get the previous one and then re-set that 3793 mask = os.umask(0) 3794 os.umask(mask) 3795 return mask 3796 3797 def open(self, path: AnyStr, flags: int, mode: Optional[int] = None, *, 3798 dir_fd: Optional[int] = None) -> int: 3799 """Return the file descriptor for a FakeFile. 3800 3801 Args: 3802 path: the path to the file 3803 flags: low-level bits to indicate io operation 3804 mode: bits to define default permissions 3805 Note: only basic modes are supported, OS-specific modes are 3806 ignored 3807 dir_fd: If not `None`, the file descriptor of a directory, 3808 with `file_path` being relative to this directory. 3809 3810 Returns: 3811 A file descriptor. 3812 3813 Raises: 3814 OSError: if the path cannot be found 3815 ValueError: if invalid mode is given 3816 NotImplementedError: if `os.O_EXCL` is used without `os.O_CREAT` 3817 """ 3818 path = self._path_with_dir_fd(path, self.open, dir_fd) 3819 if mode is None: 3820 if self.filesystem.is_windows_fs: 3821 mode = 0o666 3822 else: 3823 mode = 0o777 & ~self._umask() 3824 3825 has_tmpfile_flag = (hasattr(os, 'O_TMPFILE') and 3826 flags & getattr(os, 'O_TMPFILE')) 3827 open_modes = _OpenModes( 3828 must_exist=not flags & os.O_CREAT and not has_tmpfile_flag, 3829 can_read=not flags & os.O_WRONLY, 3830 can_write=flags & (os.O_RDWR | os.O_WRONLY) != 0, 3831 truncate=flags & os.O_TRUNC != 0, 3832 append=flags & os.O_APPEND != 0, 3833 must_not_exist=flags & os.O_EXCL != 0 3834 ) 3835 if open_modes.must_not_exist and open_modes.must_exist: 3836 raise NotImplementedError( 3837 'O_EXCL without O_CREAT mode is not supported') 3838 if has_tmpfile_flag: 3839 # this is a workaround for tempfiles that do not have a filename 3840 # as we do not support this directly, we just add a unique filename 3841 # and set the file to delete on close 3842 path = self.filesystem.joinpaths( 3843 path, matching_string(path, str(uuid.uuid4()))) 3844 3845 if (not self.filesystem.is_windows_fs and 3846 self.filesystem.exists(path)): 3847 # handle opening directory - only allowed under Posix 3848 # with read-only mode 3849 obj = self.filesystem.resolve(path) 3850 if isinstance(obj, FakeDirectory): 3851 if ((not open_modes.must_exist and 3852 not self.filesystem.is_macos) 3853 or open_modes.can_write): 3854 self.filesystem.raise_os_error(errno.EISDIR, path) 3855 dir_wrapper = FakeDirWrapper(obj, path, self.filesystem) 3856 file_des = self.filesystem._add_open_file(dir_wrapper) 3857 dir_wrapper.filedes = file_des 3858 return file_des 3859 3860 # low level open is always binary 3861 str_flags = 'b' 3862 delete_on_close = has_tmpfile_flag 3863 if hasattr(os, 'O_TEMPORARY'): 3864 delete_on_close = flags & os.O_TEMPORARY == os.O_TEMPORARY 3865 fake_file = FakeFileOpen( 3866 self.filesystem, delete_on_close=delete_on_close, raw_io=True)( 3867 path, str_flags, open_modes=open_modes) 3868 assert not isinstance(fake_file, StandardStreamWrapper) 3869 if fake_file.file_object != self.filesystem.dev_null: 3870 self.chmod(path, mode) 3871 return fake_file.fileno() 3872 3873 def close(self, fd: int) -> None: 3874 """Close a file descriptor. 3875 3876 Args: 3877 fd: An integer file descriptor for the file object requested. 3878 3879 Raises: 3880 OSError: bad file descriptor. 3881 TypeError: if file descriptor is not an integer. 3882 """ 3883 file_handle = self.filesystem.get_open_file(fd) 3884 file_handle.close() 3885 3886 def read(self, fd: int, n: int) -> bytes: 3887 """Read number of bytes from a file descriptor, returns bytes read. 3888 3889 Args: 3890 fd: An integer file descriptor for the file object requested. 3891 n: Number of bytes to read from file. 3892 3893 Returns: 3894 Bytes read from file. 3895 3896 Raises: 3897 OSError: bad file descriptor. 3898 TypeError: if file descriptor is not an integer. 3899 """ 3900 file_handle = self.filesystem.get_open_file(fd) 3901 if isinstance(file_handle, FakeFileWrapper): 3902 file_handle.raw_io = True 3903 if isinstance(file_handle, FakeDirWrapper): 3904 self.filesystem.raise_os_error(errno.EBADF, file_handle.file_path) 3905 return file_handle.read(n) 3906 3907 def write(self, fd: int, contents: bytes) -> int: 3908 """Write string to file descriptor, returns number of bytes written. 3909 3910 Args: 3911 fd: An integer file descriptor for the file object requested. 3912 contents: String of bytes to write to file. 3913 3914 Returns: 3915 Number of bytes written. 3916 3917 Raises: 3918 OSError: bad file descriptor. 3919 TypeError: if file descriptor is not an integer. 3920 """ 3921 file_handle = cast(FakeFileWrapper, self.filesystem.get_open_file(fd)) 3922 if isinstance(file_handle, FakeDirWrapper): 3923 self.filesystem.raise_os_error(errno.EBADF, file_handle.file_path) 3924 3925 if isinstance(file_handle, FakePipeWrapper): 3926 return file_handle.write(contents) 3927 3928 file_handle.raw_io = True 3929 file_handle._sync_io() 3930 file_handle.update_flush_pos() 3931 file_handle.write(contents) 3932 file_handle.flush() 3933 return len(contents) 3934 3935 def pipe(self) -> Tuple[int, int]: 3936 read_fd, write_fd = os.pipe() 3937 read_wrapper = FakePipeWrapper(self.filesystem, read_fd, False) 3938 file_des = self.filesystem._add_open_file(read_wrapper) 3939 read_wrapper.filedes = file_des 3940 write_wrapper = FakePipeWrapper(self.filesystem, write_fd, True) 3941 file_des = self.filesystem._add_open_file(write_wrapper) 3942 write_wrapper.filedes = file_des 3943 return read_wrapper.filedes, write_wrapper.filedes 3944 3945 @staticmethod 3946 def stat_float_times(newvalue: Optional[bool] = None) -> bool: 3947 """Determine whether a file's time stamps are reported as floats 3948 or ints. 3949 3950 Calling without arguments returns the current value. The value is 3951 shared by all instances of FakeOsModule. 3952 3953 Args: 3954 newvalue: If `True`, mtime, ctime, atime are reported as floats. 3955 Otherwise, they are returned as ints (rounding down). 3956 """ 3957 return FakeStatResult.stat_float_times(newvalue) 3958 3959 def fstat(self, fd: int) -> FakeStatResult: 3960 """Return the os.stat-like tuple for the FakeFile object of file_des. 3961 3962 Args: 3963 fd: The file descriptor of filesystem object to retrieve. 3964 3965 Returns: 3966 The FakeStatResult object corresponding to entry_path. 3967 3968 Raises: 3969 OSError: if the filesystem object doesn't exist. 3970 """ 3971 # stat should return the tuple representing return value of os.stat 3972 file_object = self.filesystem.get_open_file(fd).get_object() 3973 assert isinstance(file_object, FakeFile) 3974 return file_object.stat_result.copy() 3975 3976 def umask(self, mask: int) -> int: 3977 """Change the current umask. 3978 3979 Args: 3980 mask: (int) The new umask value. 3981 3982 Returns: 3983 The old umask. 3984 3985 Raises: 3986 TypeError: if new_mask is of an invalid type. 3987 """ 3988 if not is_int_type(mask): 3989 raise TypeError('an integer is required') 3990 old_umask = self.filesystem.umask 3991 self.filesystem.umask = mask 3992 return old_umask 3993 3994 def chdir(self, path: AnyStr) -> None: 3995 """Change current working directory to target directory. 3996 3997 Args: 3998 path: The path to new current working directory. 3999 4000 Raises: 4001 OSError: if user lacks permission to enter the argument directory 4002 or if the target is not a directory. 4003 """ 4004 try: 4005 path = self.filesystem.resolve_path( 4006 path, allow_fd=True) 4007 except OSError as exc: 4008 if self.filesystem.is_macos and exc.errno == errno.EBADF: 4009 raise OSError(errno.ENOTDIR, "Not a directory: " + str(path)) 4010 raise 4011 self.filesystem.confirmdir(path) 4012 directory = self.filesystem.resolve(path) 4013 # A full implementation would check permissions all the way 4014 # up the tree. 4015 if not is_root() and not directory.st_mode | PERM_EXE: 4016 self.filesystem.raise_os_error(errno.EACCES, directory.name) 4017 self.filesystem.cwd = path # type: ignore[assignment] 4018 4019 def getcwd(self) -> str: 4020 """Return current working directory.""" 4021 return to_string(self.filesystem.cwd) 4022 4023 def getcwdb(self) -> bytes: 4024 """Return current working directory as bytes.""" 4025 return to_bytes(self.filesystem.cwd) 4026 4027 def listdir(self, path: AnyStr) -> List[AnyStr]: 4028 """Return a list of file names in target_directory. 4029 4030 Args: 4031 path: Path to the target directory within the fake 4032 filesystem. 4033 4034 Returns: 4035 A list of file names within the target directory in arbitrary 4036 order. 4037 4038 Raises: 4039 OSError: if the target is not a directory. 4040 """ 4041 return self.filesystem.listdir(path) 4042 4043 XATTR_CREATE = 1 4044 XATTR_REPLACE = 2 4045 4046 def getxattr(self, path: AnyStr, attribute: AnyString, *, 4047 follow_symlinks: bool = True) -> Optional[bytes]: 4048 """Return the value of the given extended filesystem attribute for 4049 `path`. 4050 4051 Args: 4052 path: File path, file descriptor or path-like object (for 4053 Python >= 3.6). 4054 attribute: (str or bytes) The attribute name. 4055 follow_symlinks: (bool) If True (the default), symlinks in the 4056 path are traversed. 4057 4058 Returns: 4059 The contents of the extended attribute as bytes or None if 4060 the attribute does not exist. 4061 4062 Raises: 4063 OSError: if the path does not exist. 4064 """ 4065 if not self.filesystem.is_linux: 4066 raise AttributeError( 4067 "module 'os' has no attribute 'getxattr'") 4068 4069 if isinstance(attribute, bytes): 4070 attribute = attribute.decode(sys.getfilesystemencoding()) 4071 file_obj = self.filesystem.resolve(path, follow_symlinks, 4072 allow_fd=True) 4073 return file_obj.xattr.get(attribute) 4074 4075 def listxattr(self, path: Optional[AnyStr] = None, *, 4076 follow_symlinks: bool = True) -> List[str]: 4077 """Return a list of the extended filesystem attributes on `path`. 4078 4079 Args: 4080 path: File path, file descriptor or path-like object (for 4081 Python >= 3.6). If None, the current directory is used. 4082 follow_symlinks: (bool) If True (the default), symlinks in the 4083 path are traversed. 4084 4085 Returns: 4086 A list of all attribute names for the given path as str. 4087 4088 Raises: 4089 OSError: if the path does not exist. 4090 """ 4091 if not self.filesystem.is_linux: 4092 raise AttributeError( 4093 "module 'os' has no attribute 'listxattr'") 4094 4095 path_str = self.filesystem.cwd if path is None else path 4096 file_obj = self.filesystem.resolve( 4097 cast(AnyStr, path_str), follow_symlinks, allow_fd=True) 4098 return list(file_obj.xattr.keys()) 4099 4100 def removexattr(self, path: AnyStr, attribute: AnyString, *, 4101 follow_symlinks: bool = True) -> None: 4102 """Removes the extended filesystem attribute attribute from `path`. 4103 4104 Args: 4105 path: File path, file descriptor or path-like object (for 4106 Python >= 3.6). 4107 attribute: (str or bytes) The attribute name. 4108 follow_symlinks: (bool) If True (the default), symlinks in the 4109 path are traversed. 4110 4111 Raises: 4112 OSError: if the path does not exist. 4113 """ 4114 if not self.filesystem.is_linux: 4115 raise AttributeError( 4116 "module 'os' has no attribute 'removexattr'") 4117 4118 if isinstance(attribute, bytes): 4119 attribute = attribute.decode(sys.getfilesystemencoding()) 4120 file_obj = self.filesystem.resolve(path, follow_symlinks, 4121 allow_fd=True) 4122 if attribute in file_obj.xattr: 4123 del file_obj.xattr[attribute] 4124 4125 def setxattr(self, path: AnyStr, attribute: AnyString, value: bytes, 4126 flags: int = 0, *, follow_symlinks: bool = True) -> None: 4127 """Sets the value of the given extended filesystem attribute for 4128 `path`. 4129 4130 Args: 4131 path: File path, file descriptor or path-like object (for 4132 Python >= 3.6). 4133 attribute: The attribute name (str or bytes). 4134 value: (byte-like) The value to be set. 4135 follow_symlinks: (bool) If True (the default), symlinks in the 4136 path are traversed. 4137 4138 Raises: 4139 OSError: if the path does not exist. 4140 TypeError: if `value` is not a byte-like object. 4141 """ 4142 if not self.filesystem.is_linux: 4143 raise AttributeError( 4144 "module 'os' has no attribute 'setxattr'") 4145 4146 if isinstance(attribute, bytes): 4147 attribute = attribute.decode(sys.getfilesystemencoding()) 4148 if not is_byte_string(value): 4149 raise TypeError('a bytes-like object is required') 4150 file_obj = self.filesystem.resolve(path, follow_symlinks, 4151 allow_fd=True) 4152 exists = attribute in file_obj.xattr 4153 if exists and flags == self.XATTR_CREATE: 4154 self.filesystem.raise_os_error(errno.ENODATA, file_obj.path) 4155 if not exists and flags == self.XATTR_REPLACE: 4156 self.filesystem.raise_os_error(errno.EEXIST, file_obj.path) 4157 file_obj.xattr[attribute] = value 4158 4159 def scandir(self, path: str = '.') -> ScanDirIter: 4160 """Return an iterator of DirEntry objects corresponding to the 4161 entries in the directory given by path. 4162 4163 Args: 4164 path: Path to the target directory within the fake filesystem. 4165 4166 Returns: 4167 An iterator to an unsorted list of os.DirEntry objects for 4168 each entry in path. 4169 4170 Raises: 4171 OSError: if the target is not a directory. 4172 """ 4173 return scandir(self.filesystem, path) 4174 4175 def walk(self, top: AnyStr, topdown: bool = True, 4176 onerror: Optional[bool] = None, 4177 followlinks: bool = False): 4178 """Perform an os.walk operation over the fake filesystem. 4179 4180 Args: 4181 top: The root directory from which to begin walk. 4182 topdown: Determines whether to return the tuples with the root as 4183 the first entry (`True`) or as the last, after all the child 4184 directory tuples (`False`). 4185 onerror: If not `None`, function which will be called to handle the 4186 `os.error` instance provided when `os.listdir()` fails. 4187 followlinks: If `True`, symbolic links are followed. 4188 4189 Yields: 4190 (path, directories, nondirectories) for top and each of its 4191 subdirectories. See the documentation for the builtin os module 4192 for further details. 4193 """ 4194 return walk(self.filesystem, top, topdown, onerror, followlinks) 4195 4196 def readlink(self, path: AnyStr, dir_fd: Optional[int] = None) -> str: 4197 """Read the target of a symlink. 4198 4199 Args: 4200 path: Symlink to read the target of. 4201 dir_fd: If not `None`, the file descriptor of a directory, 4202 with `path` being relative to this directory. 4203 4204 Returns: 4205 the string representing the path to which the symbolic link points. 4206 4207 Raises: 4208 TypeError: if `path` is None 4209 OSError: (with errno=ENOENT) if path is not a valid path, or 4210 (with errno=EINVAL) if path is valid, but is not a symlink 4211 """ 4212 path = self._path_with_dir_fd(path, self.readlink, dir_fd) 4213 return self.filesystem.readlink(path) 4214 4215 def stat(self, path: AnyStr, *, dir_fd: Optional[int] = None, 4216 follow_symlinks: bool = True) -> FakeStatResult: 4217 """Return the os.stat-like tuple for the FakeFile object of entry_path. 4218 4219 Args: 4220 path: path to filesystem object to retrieve. 4221 dir_fd: (int) If not `None`, the file descriptor of a directory, 4222 with `entry_path` being relative to this directory. 4223 follow_symlinks: (bool) If `False` and `entry_path` points to a 4224 symlink, the link itself is changed instead of the linked 4225 object. 4226 4227 Returns: 4228 The FakeStatResult object corresponding to entry_path. 4229 4230 Raises: 4231 OSError: if the filesystem object doesn't exist. 4232 """ 4233 path = self._path_with_dir_fd(path, self.stat, dir_fd) 4234 return self.filesystem.stat(path, follow_symlinks) 4235 4236 def lstat(self, path: AnyStr, *, 4237 dir_fd: Optional[int] = None) -> FakeStatResult: 4238 """Return the os.stat-like tuple for entry_path, not following symlinks. 4239 4240 Args: 4241 path: path to filesystem object to retrieve. 4242 dir_fd: If not `None`, the file descriptor of a directory, with 4243 `path` being relative to this directory. 4244 4245 Returns: 4246 the FakeStatResult object corresponding to `path`. 4247 4248 Raises: 4249 OSError: if the filesystem object doesn't exist. 4250 """ 4251 # stat should return the tuple representing return value of os.stat 4252 path = self._path_with_dir_fd(path, self.lstat, dir_fd) 4253 return self.filesystem.stat(path, follow_symlinks=False) 4254 4255 def remove(self, path: AnyStr, dir_fd: Optional[int] = None) -> None: 4256 """Remove the FakeFile object at the specified file path. 4257 4258 Args: 4259 path: Path to file to be removed. 4260 dir_fd: If not `None`, the file descriptor of a directory, 4261 with `path` being relative to this directory. 4262 4263 Raises: 4264 OSError: if path points to a directory. 4265 OSError: if path does not exist. 4266 OSError: if removal failed. 4267 """ 4268 path = self._path_with_dir_fd(path, self.remove, dir_fd) 4269 self.filesystem.remove(path) 4270 4271 def unlink(self, path: AnyStr, *, dir_fd: Optional[int] = None) -> None: 4272 """Remove the FakeFile object at the specified file path. 4273 4274 Args: 4275 path: Path to file to be removed. 4276 dir_fd: If not `None`, the file descriptor of a directory, 4277 with `path` being relative to this directory. 4278 4279 Raises: 4280 OSError: if path points to a directory. 4281 OSError: if path does not exist. 4282 OSError: if removal failed. 4283 """ 4284 path = self._path_with_dir_fd(path, self.unlink, dir_fd) 4285 self.filesystem.remove(path) 4286 4287 def rename(self, src: AnyStr, dst: AnyStr, *, 4288 src_dir_fd: Optional[int] = None, 4289 dst_dir_fd: Optional[int] = None) -> None: 4290 """Rename a FakeFile object at old_file_path to new_file_path, 4291 preserving all properties. 4292 Also replaces existing new_file_path object, if one existed 4293 (Unix only). 4294 4295 Args: 4296 src: Path to filesystem object to rename. 4297 dst: Path to where the filesystem object will live 4298 after this call. 4299 src_dir_fd: If not `None`, the file descriptor of a directory, 4300 with `src` being relative to this directory. 4301 dst_dir_fd: If not `None`, the file descriptor of a directory, 4302 with `dst` being relative to this directory. 4303 4304 Raises: 4305 OSError: if old_file_path does not exist. 4306 OSError: if new_file_path is an existing directory. 4307 OSError: if new_file_path is an existing file (Windows only) 4308 OSError: if new_file_path is an existing file and could not 4309 be removed (Unix) 4310 OSError: if `dirname(new_file)` does not exist 4311 OSError: if the file would be moved to another filesystem 4312 (e.g. mount point) 4313 """ 4314 src = self._path_with_dir_fd(src, self.rename, src_dir_fd) 4315 dst = self._path_with_dir_fd(dst, self.rename, dst_dir_fd) 4316 self.filesystem.rename(src, dst) 4317 4318 def replace(self, src: AnyStr, dst: AnyStr, *, 4319 src_dir_fd: Optional[int] = None, 4320 dst_dir_fd: Optional[int] = None) -> None: 4321 """Renames a FakeFile object at old_file_path to new_file_path, 4322 preserving all properties. 4323 Also replaces existing new_file_path object, if one existed. 4324 4325 Arg 4326 src: Path to filesystem object to rename. 4327 dst: Path to where the filesystem object will live 4328 after this call. 4329 src_dir_fd: If not `None`, the file descriptor of a directory, 4330 with `src` being relative to this directory. 4331 dst_dir_fd: If not `None`, the file descriptor of a directory, 4332 with `dst` being relative to this directory. 4333 4334 Raises: 4335 OSError: if old_file_path does not exist. 4336 OSError: if new_file_path is an existing directory. 4337 OSError: if new_file_path is an existing file and could 4338 not be removed 4339 OSError: if `dirname(new_file)` does not exist 4340 OSError: if the file would be moved to another filesystem 4341 (e.g. mount point) 4342 """ 4343 src = self._path_with_dir_fd(src, self.rename, src_dir_fd) 4344 dst = self._path_with_dir_fd(dst, self.rename, dst_dir_fd) 4345 self.filesystem.rename(src, dst, force_replace=True) 4346 4347 def rmdir(self, path: AnyStr, *, dir_fd: Optional[int] = None) -> None: 4348 """Remove a leaf Fake directory. 4349 4350 Args: 4351 path: (str) Name of directory to remove. 4352 dir_fd: If not `None`, the file descriptor of a directory, 4353 with `path` being relative to this directory. 4354 4355 Raises: 4356 OSError: if `path` does not exist or is not a directory, 4357 or as per FakeFilesystem.remove_object. Cannot remove '.'. 4358 """ 4359 path = self._path_with_dir_fd(path, self.rmdir, dir_fd) 4360 self.filesystem.rmdir(path) 4361 4362 def removedirs(self, name: AnyStr) -> None: 4363 """Remove a leaf fake directory and all empty intermediate ones. 4364 4365 Args: 4366 name: the directory to be removed. 4367 4368 Raises: 4369 OSError: if target_directory does not exist or is not a directory. 4370 OSError: if target_directory is not empty. 4371 """ 4372 name = self.filesystem.absnormpath(name) 4373 directory = self.filesystem.confirmdir(name) 4374 if directory.entries: 4375 self.filesystem.raise_os_error( 4376 errno.ENOTEMPTY, self.path.basename(name)) 4377 else: 4378 self.rmdir(name) 4379 head, tail = self.path.split(name) 4380 if not tail: 4381 head, tail = self.path.split(head) 4382 while head and tail: 4383 head_dir = self.filesystem.confirmdir(head) 4384 if head_dir.entries: 4385 break 4386 # only the top-level dir may not be a symlink 4387 self.filesystem.rmdir(head, allow_symlink=True) 4388 head, tail = self.path.split(head) 4389 4390 def mkdir(self, path: AnyStr, mode: int = PERM_DEF, *, 4391 dir_fd: Optional[int] = None) -> None: 4392 """Create a leaf Fake directory. 4393 4394 Args: 4395 path: (str) Name of directory to create. 4396 Relative paths are assumed to be relative to '/'. 4397 mode: (int) Mode to create directory with. This argument defaults 4398 to 0o777. The umask is applied to this mode. 4399 dir_fd: If not `None`, the file descriptor of a directory, 4400 with `path` being relative to this directory. 4401 4402 Raises: 4403 OSError: if the directory name is invalid or parent directory is 4404 read only or as per FakeFilesystem.add_object. 4405 """ 4406 path = self._path_with_dir_fd(path, self.mkdir, dir_fd) 4407 try: 4408 self.filesystem.makedir(path, mode) 4409 except OSError as e: 4410 if e.errno == errno.EACCES: 4411 self.filesystem.raise_os_error(e.errno, path) 4412 raise 4413 4414 def makedirs(self, name: AnyStr, mode: int = PERM_DEF, 4415 exist_ok: bool = None) -> None: 4416 """Create a leaf Fake directory + create any non-existent parent dirs. 4417 4418 Args: 4419 name: (str) Name of directory to create. 4420 mode: (int) Mode to create directory (and any necessary parent 4421 directories) with. This argument defaults to 0o777. 4422 The umask is applied to this mode. 4423 exist_ok: (boolean) If exist_ok is False (the default), an OSError 4424 is raised if the target directory already exists. 4425 4426 Raises: 4427 OSError: if the directory already exists and exist_ok=False, or as 4428 per :py:meth:`FakeFilesystem.create_dir`. 4429 """ 4430 if exist_ok is None: 4431 exist_ok = False 4432 self.filesystem.makedirs(name, mode, exist_ok) 4433 4434 def _path_with_dir_fd(self, path: AnyStr, fct: Callable, 4435 dir_fd: Optional[int]) -> AnyStr: 4436 """Return the path considering dir_fd. Raise on invalid parameters.""" 4437 try: 4438 path = make_string_path(path) 4439 except TypeError: 4440 # the error is handled later 4441 path = path 4442 if dir_fd is not None: 4443 # check if fd is supported for the built-in real function 4444 real_fct = getattr(os, fct.__name__) 4445 if real_fct not in self.supports_dir_fd: 4446 raise NotImplementedError( 4447 'dir_fd unavailable on this platform') 4448 if isinstance(path, int): 4449 raise ValueError("%s: Can't specify dir_fd without " 4450 "matching path_str" % fct.__name__) 4451 if not self.path.isabs(path): 4452 open_file = self.filesystem.get_open_file(dir_fd) 4453 return self.path.join( # type: ignore[type-var, return-value] 4454 cast(FakeFile, open_file.get_object()).path, path) 4455 return path 4456 4457 def truncate(self, path: AnyStr, length: int) -> None: 4458 """Truncate the file corresponding to path, so that it is 4459 length bytes in size. If length is larger than the current size, 4460 the file is filled up with zero bytes. 4461 4462 Args: 4463 path: (str or int) Path to the file, or an integer file 4464 descriptor for the file object. 4465 length: (int) Length of the file after truncating it. 4466 4467 Raises: 4468 OSError: if the file does not exist or the file descriptor is 4469 invalid. 4470 """ 4471 file_object = self.filesystem.resolve(path, allow_fd=True) 4472 file_object.size = length 4473 4474 def ftruncate(self, fd: int, length: int) -> None: 4475 """Truncate the file corresponding to fd, so that it is 4476 length bytes in size. If length is larger than the current size, 4477 the file is filled up with zero bytes. 4478 4479 Args: 4480 fd: (int) File descriptor for the file object. 4481 length: (int) Maximum length of the file after truncating it. 4482 4483 Raises: 4484 OSError: if the file descriptor is invalid 4485 """ 4486 file_object = self.filesystem.get_open_file(fd).get_object() 4487 if isinstance(file_object, FakeFileWrapper): 4488 file_object.size = length 4489 else: 4490 raise OSError(errno.EBADF, 'Invalid file descriptor') 4491 4492 def access(self, path: AnyStr, mode: int, *, 4493 dir_fd: Optional[int] = None, 4494 effective_ids: bool = False, 4495 follow_symlinks: bool = True) -> bool: 4496 """Check if a file exists and has the specified permissions. 4497 4498 Args: 4499 path: (str) Path to the file. 4500 mode: (int) Permissions represented as a bitwise-OR combination of 4501 os.F_OK, os.R_OK, os.W_OK, and os.X_OK. 4502 dir_fd: If not `None`, the file descriptor of a directory, with 4503 `path` being relative to this directory. 4504 effective_ids: (bool) Unused. Only here to match the signature. 4505 follow_symlinks: (bool) If `False` and `path` points to a symlink, 4506 the link itself is queried instead of the linked object. 4507 4508 Returns: 4509 bool, `True` if file is accessible, `False` otherwise. 4510 """ 4511 if effective_ids and self.filesystem.is_windows_fs: 4512 raise NotImplementedError( 4513 'access: effective_ids unavailable on this platform') 4514 path = self._path_with_dir_fd(path, self.access, dir_fd) 4515 try: 4516 stat_result = self.stat(path, follow_symlinks=follow_symlinks) 4517 except OSError as os_error: 4518 if os_error.errno == errno.ENOENT: 4519 return False 4520 raise 4521 if is_root(): 4522 mode &= ~os.W_OK 4523 return (mode & ((stat_result.st_mode >> 6) & 7)) == mode 4524 4525 def chmod(self, path: AnyStr, mode: int, *, 4526 dir_fd: Optional[int] = None, 4527 follow_symlinks: bool = True) -> None: 4528 """Change the permissions of a file as encoded in integer mode. 4529 4530 Args: 4531 path: (str) Path to the file. 4532 mode: (int) Permissions. 4533 dir_fd: If not `None`, the file descriptor of a directory, with 4534 `path` being relative to this directory. 4535 follow_symlinks: (bool) If `False` and `path` points to a symlink, 4536 the link itself is queried instead of the linked object. 4537 """ 4538 if (not follow_symlinks and 4539 (os.chmod not in os.supports_follow_symlinks or IS_PYPY)): 4540 raise NotImplementedError( 4541 "`follow_symlinks` for chmod() is not available " 4542 "on this system") 4543 path = self._path_with_dir_fd(path, self.chmod, dir_fd) 4544 self.filesystem.chmod(path, mode, follow_symlinks) 4545 4546 def lchmod(self, path: AnyStr, mode: int) -> None: 4547 """Change the permissions of a file as encoded in integer mode. 4548 If the file is a link, the permissions of the link are changed. 4549 4550 Args: 4551 path: (str) Path to the file. 4552 mode: (int) Permissions. 4553 """ 4554 if self.filesystem.is_windows_fs: 4555 raise NameError("name 'lchmod' is not defined") 4556 self.filesystem.chmod(path, mode, follow_symlinks=False) 4557 4558 def utime(self, path: AnyStr, 4559 times: Optional[Tuple[Union[int, float], Union[int, float]]] = 4560 None, ns: Optional[Tuple[int, int]] = None, 4561 dir_fd: Optional[int] = None, 4562 follow_symlinks: bool = True) -> None: 4563 """Change the access and modified times of a file. 4564 4565 Args: 4566 path: (str) Path to the file. 4567 times: 2-tuple of int or float numbers, of the form (atime, mtime) 4568 which is used to set the access and modified times in seconds. 4569 If None, both times are set to the current time. 4570 ns: 2-tuple of int numbers, of the form (atime, mtime) which is 4571 used to set the access and modified times in nanoseconds. 4572 If None, both times are set to the current time. 4573 dir_fd: If not `None`, the file descriptor of a directory, 4574 with `path` being relative to this directory. 4575 follow_symlinks: (bool) If `False` and `path` points to a symlink, 4576 the link itself is queried instead of the linked object. 4577 4578 Raises: 4579 TypeError: If anything other than the expected types is 4580 specified in the passed `times` or `ns` tuple, 4581 or if the tuple length is not equal to 2. 4582 ValueError: If both times and ns are specified. 4583 """ 4584 path = self._path_with_dir_fd(path, self.utime, dir_fd) 4585 self.filesystem.utime( 4586 path, times=times, ns=ns, follow_symlinks=follow_symlinks) 4587 4588 def chown(self, path: AnyStr, uid: int, gid: int, *, 4589 dir_fd: Optional[int] = None, 4590 follow_symlinks: bool = True) -> None: 4591 """Set ownership of a faked file. 4592 4593 Args: 4594 path: (str) Path to the file or directory. 4595 uid: (int) Numeric uid to set the file or directory to. 4596 gid: (int) Numeric gid to set the file or directory to. 4597 dir_fd: (int) If not `None`, the file descriptor of a directory, 4598 with `path` being relative to this directory. 4599 follow_symlinks: (bool) If `False` and path points to a symlink, 4600 the link itself is changed instead of the linked object. 4601 4602 Raises: 4603 OSError: if path does not exist. 4604 4605 `None` is also allowed for `uid` and `gid`. This permits `os.rename` 4606 to use `os.chown` even when the source file `uid` and `gid` are 4607 `None` (unset). 4608 """ 4609 path = self._path_with_dir_fd(path, self.chown, dir_fd) 4610 file_object = self.filesystem.resolve( 4611 path, follow_symlinks, allow_fd=True) 4612 if not ((is_int_type(uid) or uid is None) and 4613 (is_int_type(gid) or gid is None)): 4614 raise TypeError("An integer is required") 4615 if uid != -1: 4616 file_object.st_uid = uid 4617 if gid != -1: 4618 file_object.st_gid = gid 4619 4620 def mknod(self, path: AnyStr, mode: Optional[int] = None, 4621 device: int = 0, *, 4622 dir_fd: Optional[int] = None) -> None: 4623 """Create a filesystem node named 'filename'. 4624 4625 Does not support device special files or named pipes as the real os 4626 module does. 4627 4628 Args: 4629 path: (str) Name of the file to create 4630 mode: (int) Permissions to use and type of file to be created. 4631 Default permissions are 0o666. Only the stat.S_IFREG file type 4632 is supported by the fake implementation. The umask is applied 4633 to this mode. 4634 device: not supported in fake implementation 4635 dir_fd: If not `None`, the file descriptor of a directory, 4636 with `path` being relative to this directory. 4637 4638 Raises: 4639 OSError: if called with unsupported options or the file can not be 4640 created. 4641 """ 4642 if self.filesystem.is_windows_fs: 4643 raise AttributeError("module 'os' has no attribute 'mknode'") 4644 if mode is None: 4645 # note that a default value of 0o600 without a device type is 4646 # documented - this is not how it seems to work 4647 mode = S_IFREG | 0o600 4648 if device or not mode & S_IFREG and not is_root(): 4649 self.filesystem.raise_os_error(errno.EPERM) 4650 4651 path = self._path_with_dir_fd(path, self.mknod, dir_fd) 4652 head, tail = self.path.split(path) 4653 if not tail: 4654 if self.filesystem.exists(head, check_link=True): 4655 self.filesystem.raise_os_error(errno.EEXIST, path) 4656 self.filesystem.raise_os_error(errno.ENOENT, path) 4657 if tail in (matching_string(tail, '.'), matching_string(tail, '..')): 4658 self.filesystem.raise_os_error(errno.ENOENT, path) 4659 if self.filesystem.exists(path, check_link=True): 4660 self.filesystem.raise_os_error(errno.EEXIST, path) 4661 self.filesystem.add_object(head, FakeFile( 4662 tail, mode & ~self.filesystem.umask, 4663 filesystem=self.filesystem)) 4664 4665 def symlink(self, src: AnyStr, dst: AnyStr, *, 4666 dir_fd: Optional[int] = None) -> None: 4667 """Creates the specified symlink, pointed at the specified link target. 4668 4669 Args: 4670 src: The target of the symlink. 4671 dst: Path to the symlink to create. 4672 dir_fd: If not `None`, the file descriptor of a directory, 4673 with `src` being relative to this directory. 4674 4675 Raises: 4676 OSError: if the file already exists. 4677 """ 4678 src = self._path_with_dir_fd(src, self.symlink, dir_fd) 4679 self.filesystem.create_symlink( 4680 dst, src, create_missing_dirs=False) 4681 4682 def link(self, src: AnyStr, dst: AnyStr, *, 4683 src_dir_fd: Optional[int] = None, 4684 dst_dir_fd: Optional[int] = None) -> None: 4685 """Create a hard link at new_path, pointing at old_path. 4686 4687 Args: 4688 src: An existing path to the target file. 4689 dst: The destination path to create a new link at. 4690 src_dir_fd: If not `None`, the file descriptor of a directory, 4691 with `src` being relative to this directory. 4692 dst_dir_fd: If not `None`, the file descriptor of a directory, 4693 with `dst` being relative to this directory. 4694 4695 Raises: 4696 OSError: if something already exists at new_path. 4697 OSError: if the parent directory doesn't exist. 4698 """ 4699 src = self._path_with_dir_fd(src, self.link, src_dir_fd) 4700 dst = self._path_with_dir_fd(dst, self.link, dst_dir_fd) 4701 self.filesystem.link(src, dst) 4702 4703 def fsync(self, fd: int) -> None: 4704 """Perform fsync for a fake file (in other words, do nothing). 4705 4706 Args: 4707 fd: The file descriptor of the open file. 4708 4709 Raises: 4710 OSError: file_des is an invalid file descriptor. 4711 TypeError: file_des is not an integer. 4712 """ 4713 # Throw an error if file_des isn't valid 4714 if 0 <= fd < NR_STD_STREAMS: 4715 self.filesystem.raise_os_error(errno.EINVAL) 4716 file_object = cast(FakeFileWrapper, self.filesystem.get_open_file(fd)) 4717 if self.filesystem.is_windows_fs: 4718 if (not hasattr(file_object, 'allow_update') or 4719 not file_object.allow_update): 4720 self.filesystem.raise_os_error( 4721 errno.EBADF, file_object.file_path) 4722 4723 def fdatasync(self, fd: int) -> None: 4724 """Perform fdatasync for a fake file (in other words, do nothing). 4725 4726 Args: 4727 fd: The file descriptor of the open file. 4728 4729 Raises: 4730 OSError: `fd` is an invalid file descriptor. 4731 TypeError: `fd` is not an integer. 4732 """ 4733 if self.filesystem.is_windows_fs or self.filesystem.is_macos: 4734 raise AttributeError("module 'os' has no attribute 'fdatasync'") 4735 # Throw an error if file_des isn't valid 4736 if 0 <= fd < NR_STD_STREAMS: 4737 self.filesystem.raise_os_error(errno.EINVAL) 4738 self.filesystem.get_open_file(fd) 4739 4740 def sendfile(self, fd_out: int, fd_in: int, 4741 offset: int, count: int) -> int: 4742 """Copy count bytes from file descriptor fd_in to file descriptor 4743 fd_out starting at offset. 4744 4745 Args: 4746 fd_out: The file descriptor of the destination file. 4747 fd_in: The file descriptor of the source file. 4748 offset: The offset in bytes where to start the copy in the 4749 source file. If `None` (Linux only), copying is started at 4750 the current position, and the position is updated. 4751 count: The number of bytes to copy. If 0, all remaining bytes 4752 are copied (MacOs only). 4753 4754 Raises: 4755 OSError: If `fd_in` or `fd_out` is an invalid file descriptor. 4756 TypeError: If `fd_in` or `fd_out` is not an integer. 4757 TypeError: If `offset` is None under MacOs. 4758 """ 4759 if self.filesystem.is_windows_fs: 4760 raise AttributeError("module 'os' has no attribute 'sendfile'") 4761 if 0 <= fd_in < NR_STD_STREAMS: 4762 self.filesystem.raise_os_error(errno.EINVAL) 4763 if 0 <= fd_out < NR_STD_STREAMS: 4764 self.filesystem.raise_os_error(errno.EINVAL) 4765 source = cast(FakeFileWrapper, self.filesystem.get_open_file(fd_in)) 4766 dest = cast(FakeFileWrapper, self.filesystem.get_open_file(fd_out)) 4767 if self.filesystem.is_macos: 4768 if dest.get_object().stat_result.st_mode & 0o777000 != S_IFSOCK: 4769 raise OSError('Socket operation on non-socket') 4770 if offset is None: 4771 if self.filesystem.is_macos: 4772 raise TypeError('None is not a valid offset') 4773 contents = source.read(count) 4774 else: 4775 position = source.tell() 4776 source.seek(offset) 4777 if count == 0 and self.filesystem.is_macos: 4778 contents = source.read() 4779 else: 4780 contents = source.read(count) 4781 source.seek(position) 4782 if contents: 4783 written = dest.write(contents) 4784 dest.flush() 4785 return written 4786 return 0 4787 4788 def __getattr__(self, name: str) -> Any: 4789 """Forwards any unfaked calls to the standard os module.""" 4790 return getattr(self._os_module, name) 4791 4792 4793class FakeIoModule: 4794 """Uses FakeFilesystem to provide a fake io module replacement. 4795 4796 Currently only used to wrap `io.open()` which is an alias to `open()`. 4797 4798 You need a fake_filesystem to use this: 4799 filesystem = fake_filesystem.FakeFilesystem() 4800 my_io_module = fake_filesystem.FakeIoModule(filesystem) 4801 """ 4802 4803 @staticmethod 4804 def dir() -> List[str]: 4805 """Return the list of patched function names. Used for patching 4806 functions imported from the module. 4807 """ 4808 _dir = ['open'] 4809 if sys.version_info >= (3, 8): 4810 _dir.append('open_code') 4811 return _dir 4812 4813 def __init__(self, filesystem: FakeFilesystem): 4814 """ 4815 Args: 4816 filesystem: FakeFilesystem used to provide file system information. 4817 """ 4818 self.filesystem = filesystem 4819 self.skip_names: List[str] = [] 4820 self._io_module = io 4821 4822 def open(self, file: Union[AnyStr, int], 4823 mode: str = 'r', buffering: int = -1, 4824 encoding: Optional[str] = None, 4825 errors: Optional[str] = None, 4826 newline: Optional[str] = None, 4827 closefd: bool = True, 4828 opener: Optional[Callable] = None) -> Union[AnyFileWrapper, 4829 IO[Any]]: 4830 """Redirect the call to FakeFileOpen. 4831 See FakeFileOpen.call() for description. 4832 """ 4833 # workaround for built-in open called from skipped modules (see #552) 4834 # as open is not imported explicitly, we cannot patch it for 4835 # specific modules; instead we check if the caller is a skipped 4836 # module (should work in most cases) 4837 stack = traceback.extract_stack(limit=2) 4838 module_name = os.path.splitext(stack[0].filename)[0] 4839 module_name = module_name.replace(os.sep, '.') 4840 if any([module_name == sn or module_name.endswith('.' + sn) 4841 for sn in self.skip_names]): 4842 return io.open(file, mode, buffering, encoding, errors, 4843 newline, closefd, opener) 4844 fake_open = FakeFileOpen(self.filesystem) 4845 return fake_open(file, mode, buffering, encoding, errors, 4846 newline, closefd, opener) 4847 4848 if sys.version_info >= (3, 8): 4849 def open_code(self, path): 4850 """Redirect the call to open. Note that the behavior of the real 4851 function may be overridden by an earlier call to the 4852 PyFile_SetOpenCodeHook(). This behavior is not reproduced here. 4853 """ 4854 if not isinstance(path, str): 4855 raise TypeError( 4856 "open_code() argument 'path' must be str, not int") 4857 patch_mode = self.filesystem.patch_open_code 4858 if (patch_mode == PatchMode.AUTO and self.filesystem.exists(path) 4859 or patch_mode == PatchMode.ON): 4860 return self.open(path, mode='rb') 4861 # mostly this is used for compiled code - 4862 # don't patch these, as the files are probably in the real fs 4863 return self._io_module.open_code(path) 4864 4865 def __getattr__(self, name): 4866 """Forwards any unfaked calls to the standard io module.""" 4867 return getattr(self._io_module, name) 4868 4869 4870if sys.platform != 'win32': 4871 import fcntl 4872 4873 class FakeFcntlModule: 4874 """Replaces the fcntl module. Only valid under Linux/MacOS, 4875 currently just mocks the functionality away. 4876 """ 4877 4878 @staticmethod 4879 def dir() -> List[str]: 4880 """Return the list of patched function names. Used for patching 4881 functions imported from the module. 4882 """ 4883 return ['fcntl', 'ioctl', 'flock', 'lockf'] 4884 4885 def __init__(self, filesystem: FakeFilesystem): 4886 """ 4887 Args: 4888 filesystem: FakeFilesystem used to provide file system 4889 information (currently not used). 4890 """ 4891 self.filesystem = filesystem 4892 self._fcntl_module = fcntl 4893 4894 def fcntl(self, fd: int, cmd: int, arg: int = 0) -> Union[int, bytes]: 4895 return 0 4896 4897 def ioctl(self, fd: int, request: int, arg: int = 0, 4898 mutate_flag: bool = True) -> Union[int, bytes]: 4899 return 0 4900 4901 def flock(self, fd: int, operation: int) -> None: 4902 pass 4903 4904 def lockf(self, fd: int, cmd: int, len: int = 0, 4905 start: int = 0, whence=0) -> Any: 4906 pass 4907 4908 def __getattr__(self, name): 4909 """Forwards any unfaked calls to the standard fcntl module.""" 4910 return getattr(self._fcntl_module, name) 4911 4912 4913class FakeFileWrapper: 4914 """Wrapper for a stream object for use by a FakeFile object. 4915 4916 If the wrapper has any data written to it, it will propagate to 4917 the FakeFile object on close() or flush(). 4918 """ 4919 4920 def __init__(self, file_object: FakeFile, 4921 file_path: AnyStr, 4922 update: bool, read: bool, append: bool, delete_on_close: bool, 4923 filesystem: FakeFilesystem, 4924 newline: Optional[str], binary: bool, closefd: bool, 4925 encoding: Optional[str], errors: Optional[str], 4926 buffering: int, raw_io: bool, is_stream: bool = False): 4927 self.file_object = file_object 4928 self.file_path = file_path # type: ignore[var-annotated] 4929 self._append = append 4930 self._read = read 4931 self.allow_update = update 4932 self._closefd = closefd 4933 self._file_epoch = file_object.epoch 4934 self.raw_io = raw_io 4935 self._binary = binary 4936 self.is_stream = is_stream 4937 self._changed = False 4938 self._buffer_size = buffering 4939 if self._buffer_size == 0 and not binary: 4940 raise ValueError("can't have unbuffered text I/O") 4941 # buffer_size is ignored in text mode 4942 elif self._buffer_size == -1 or not binary: 4943 self._buffer_size = io.DEFAULT_BUFFER_SIZE 4944 self._use_line_buffer = not binary and buffering == 1 4945 4946 contents = file_object.byte_contents 4947 self._encoding = encoding or locale.getpreferredencoding(False) 4948 errors = errors or 'strict' 4949 self._io: Union[BinaryBufferIO, TextBufferIO] = ( 4950 BinaryBufferIO(contents) if binary 4951 else TextBufferIO(contents, encoding=encoding, 4952 newline=newline, errors=errors) 4953 ) 4954 self._read_whence = 0 4955 self._read_seek = 0 4956 self._flush_pos = 0 4957 if contents: 4958 self._flush_pos = len(contents) 4959 if update: 4960 if not append: 4961 self._io.seek(0) 4962 else: 4963 self._io.seek(self._flush_pos) 4964 self._read_seek = self._io.tell() 4965 4966 if delete_on_close: 4967 assert filesystem, 'delete_on_close=True requires filesystem' 4968 self._filesystem = filesystem 4969 self.delete_on_close = delete_on_close 4970 # override, don't modify FakeFile.name, as FakeFilesystem expects 4971 # it to be the file name only, no directories. 4972 self.name = file_object.opened_as 4973 self.filedes: Optional[int] = None 4974 4975 def __enter__(self) -> 'FakeFileWrapper': 4976 """To support usage of this fake file with the 'with' statement.""" 4977 return self 4978 4979 def __exit__(self, exc_type: Optional[Type[BaseException]], 4980 exc_val: Optional[BaseException], 4981 exc_tb: Optional[TracebackType] 4982 ) -> None: 4983 """To support usage of this fake file with the 'with' statement.""" 4984 self.close() 4985 4986 def _raise(self, message: str) -> NoReturn: 4987 if self.raw_io: 4988 self._filesystem.raise_os_error(errno.EBADF, self.file_path) 4989 raise io.UnsupportedOperation(message) 4990 4991 def get_object(self) -> FakeFile: 4992 """Return the FakeFile object that is wrapped by the current instance. 4993 """ 4994 return self.file_object 4995 4996 def fileno(self) -> int: 4997 """Return the file descriptor of the file object.""" 4998 if self.filedes is not None: 4999 return self.filedes 5000 raise OSError(errno.EBADF, 'Invalid file descriptor') 5001 5002 def close(self) -> None: 5003 """Close the file.""" 5004 # ignore closing a closed file 5005 if not self._is_open(): 5006 return 5007 5008 # for raw io, all writes are flushed immediately 5009 if self.allow_update and not self.raw_io: 5010 self.flush() 5011 if self._filesystem.is_windows_fs and self._changed: 5012 self.file_object.st_mtime = now() 5013 5014 assert self.filedes is not None 5015 if self._closefd: 5016 self._filesystem._close_open_file(self.filedes) 5017 else: 5018 open_files = self._filesystem.open_files[self.filedes] 5019 assert open_files is not None 5020 open_files.remove(self) 5021 if self.delete_on_close: 5022 self._filesystem.remove_object( 5023 self.get_object().path) # type: ignore[arg-type] 5024 5025 @property 5026 def closed(self) -> bool: 5027 """Simulate the `closed` attribute on file.""" 5028 return not self._is_open() 5029 5030 def _try_flush(self, old_pos: int) -> None: 5031 """Try to flush and reset the position if it fails.""" 5032 flush_pos = self._flush_pos 5033 try: 5034 self.flush() 5035 except OSError: 5036 # write failed - reset to previous position 5037 self._io.seek(old_pos) 5038 self._io.truncate() 5039 self._flush_pos = flush_pos 5040 raise 5041 5042 def flush(self) -> None: 5043 """Flush file contents to 'disk'.""" 5044 self._check_open_file() 5045 if self.allow_update and not self.is_stream: 5046 contents = self._io.getvalue() 5047 if self._append: 5048 self._sync_io() 5049 old_contents = self.file_object.byte_contents 5050 assert old_contents is not None 5051 contents = old_contents + contents[self._flush_pos:] 5052 self._set_stream_contents(contents) 5053 else: 5054 self._io.flush() 5055 changed = self.file_object.set_contents(contents, self._encoding) 5056 self.update_flush_pos() 5057 if changed: 5058 if self._filesystem.is_windows_fs: 5059 self._changed = True 5060 else: 5061 current_time = now() 5062 self.file_object.st_ctime = current_time 5063 self.file_object.st_mtime = current_time 5064 self._file_epoch = self.file_object.epoch 5065 5066 if not self.is_stream: 5067 self._flush_related_files() 5068 5069 def update_flush_pos(self) -> None: 5070 self._flush_pos = self._io.tell() 5071 5072 def _flush_related_files(self) -> None: 5073 for open_files in self._filesystem.open_files[3:]: 5074 if open_files is not None: 5075 for open_file in open_files: 5076 if (open_file is not self and 5077 isinstance(open_file, FakeFileWrapper) and 5078 self.file_object == open_file.file_object and 5079 not open_file._append): 5080 open_file._sync_io() 5081 5082 def seek(self, offset: int, whence: int = 0) -> None: 5083 """Move read/write pointer in 'file'.""" 5084 self._check_open_file() 5085 if not self._append: 5086 self._io.seek(offset, whence) 5087 else: 5088 self._read_seek = offset 5089 self._read_whence = whence 5090 if not self.is_stream: 5091 self.flush() 5092 5093 def tell(self) -> int: 5094 """Return the file's current position. 5095 5096 Returns: 5097 int, file's current position in bytes. 5098 """ 5099 self._check_open_file() 5100 if not self.is_stream: 5101 self.flush() 5102 5103 if not self._append: 5104 return self._io.tell() 5105 if self._read_whence: 5106 write_seek = self._io.tell() 5107 self._io.seek(self._read_seek, self._read_whence) 5108 self._read_seek = self._io.tell() 5109 self._read_whence = 0 5110 self._io.seek(write_seek) 5111 return self._read_seek 5112 5113 def _sync_io(self) -> None: 5114 """Update the stream with changes to the file object contents.""" 5115 if self._file_epoch == self.file_object.epoch: 5116 return 5117 5118 contents = self.file_object.byte_contents 5119 assert contents is not None 5120 self._set_stream_contents(contents) 5121 self._file_epoch = self.file_object.epoch 5122 5123 def _set_stream_contents(self, contents: bytes) -> None: 5124 whence = self._io.tell() 5125 self._io.seek(0) 5126 self._io.truncate() 5127 self._io.putvalue(contents) 5128 if not self._append: 5129 self._io.seek(whence) 5130 5131 def _read_wrappers(self, name: str) -> Callable: 5132 """Wrap a stream attribute in a read wrapper. 5133 5134 Returns a read_wrapper which tracks our own read pointer since the 5135 stream object has no concept of a different read and write pointer. 5136 5137 Args: 5138 name: The name of the attribute to wrap. Should be a read call. 5139 5140 Returns: 5141 The read_wrapper function. 5142 """ 5143 io_attr = getattr(self._io, name) 5144 5145 def read_wrapper(*args, **kwargs): 5146 """Wrap all read calls to the stream object. 5147 5148 We do this to track the read pointer separate from the write 5149 pointer. Anything that wants to read from the stream object 5150 while we're in append mode goes through this. 5151 5152 Args: 5153 *args: pass through args 5154 **kwargs: pass through kwargs 5155 Returns: 5156 Wrapped stream object method 5157 """ 5158 self._io.seek(self._read_seek, self._read_whence) 5159 ret_value = io_attr(*args, **kwargs) 5160 self._read_seek = self._io.tell() 5161 self._read_whence = 0 5162 self._io.seek(0, 2) 5163 return ret_value 5164 5165 return read_wrapper 5166 5167 def _other_wrapper(self, name: str) -> Callable: 5168 """Wrap a stream attribute in an other_wrapper. 5169 5170 Args: 5171 name: the name of the stream attribute to wrap. 5172 5173 Returns: 5174 other_wrapper which is described below. 5175 """ 5176 io_attr = getattr(self._io, name) 5177 5178 def other_wrapper(*args, **kwargs): 5179 """Wrap all other calls to the stream Object. 5180 5181 We do this to track changes to the write pointer. Anything that 5182 moves the write pointer in a file open for appending should move 5183 the read pointer as well. 5184 5185 Args: 5186 *args: Pass through args. 5187 **kwargs: Pass through kwargs. 5188 5189 Returns: 5190 Wrapped stream object method. 5191 """ 5192 write_seek = self._io.tell() 5193 ret_value = io_attr(*args, **kwargs) 5194 if write_seek != self._io.tell(): 5195 self._read_seek = self._io.tell() 5196 self._read_whence = 0 5197 5198 return ret_value 5199 5200 return other_wrapper 5201 5202 def _write_wrapper(self, name: str) -> Callable: 5203 """Wrap a stream attribute in a write_wrapper. 5204 5205 Args: 5206 name: the name of the stream attribute to wrap. 5207 5208 Returns: 5209 write_wrapper which is described below. 5210 """ 5211 io_attr = getattr(self._io, name) 5212 5213 def write_wrapper(*args, **kwargs): 5214 """Wrap all other calls to the stream Object. 5215 5216 We do this to track changes to the write pointer. Anything that 5217 moves the write pointer in a file open for appending should move 5218 the read pointer as well. 5219 5220 Args: 5221 *args: Pass through args. 5222 **kwargs: Pass through kwargs. 5223 5224 Returns: 5225 Wrapped stream object method. 5226 """ 5227 old_pos = self._io.tell() 5228 ret_value = io_attr(*args, **kwargs) 5229 new_pos = self._io.tell() 5230 5231 # if the buffer size is exceeded, we flush 5232 use_line_buf = self._use_line_buffer and '\n' in args[0] 5233 if new_pos - self._flush_pos > self._buffer_size or use_line_buf: 5234 flush_all = (new_pos - old_pos > self._buffer_size or 5235 use_line_buf) 5236 # if the current write does not exceed the buffer size, 5237 # we revert to the previous position and flush that, 5238 # otherwise we flush all 5239 if not flush_all: 5240 self._io.seek(old_pos) 5241 self._io.truncate() 5242 self._try_flush(old_pos) 5243 if not flush_all: 5244 ret_value = io_attr(*args, **kwargs) 5245 if self._append: 5246 self._read_seek = self._io.tell() 5247 self._read_whence = 0 5248 return ret_value 5249 5250 return write_wrapper 5251 5252 def _adapt_size_for_related_files(self, size: int) -> None: 5253 for open_files in self._filesystem.open_files[3:]: 5254 if open_files is not None: 5255 for open_file in open_files: 5256 if (open_file is not self and 5257 isinstance(open_file, FakeFileWrapper) and 5258 self.file_object == open_file.file_object and 5259 cast(FakeFileWrapper, open_file)._append): 5260 open_file._read_seek += size 5261 5262 def _truncate_wrapper(self) -> Callable: 5263 """Wrap truncate() to allow flush after truncate. 5264 5265 Returns: 5266 Wrapper which is described below. 5267 """ 5268 io_attr = getattr(self._io, 'truncate') 5269 5270 def truncate_wrapper(*args, **kwargs): 5271 """Wrap truncate call to call flush after truncate.""" 5272 if self._append: 5273 self._io.seek(self._read_seek, self._read_whence) 5274 size = io_attr(*args, **kwargs) 5275 self.flush() 5276 if not self.is_stream: 5277 self.file_object.size = size 5278 buffer_size = len(self._io.getvalue()) 5279 if buffer_size < size: 5280 self._io.seek(buffer_size) 5281 self._io.putvalue(b'\0' * (size - buffer_size)) 5282 self.file_object.set_contents( 5283 self._io.getvalue(), self._encoding) 5284 self._flush_pos = size 5285 self._adapt_size_for_related_files(size - buffer_size) 5286 5287 self.flush() 5288 return size 5289 5290 return truncate_wrapper 5291 5292 def size(self) -> int: 5293 """Return the content size in bytes of the wrapped file.""" 5294 return self.file_object.st_size 5295 5296 def __getattr__(self, name: str) -> Any: 5297 if self.file_object.is_large_file(): 5298 raise FakeLargeFileIoException(self.file_path) 5299 5300 reading = name.startswith('read') or name == 'next' 5301 truncate = name == 'truncate' 5302 writing = name.startswith('write') or truncate 5303 5304 if reading or writing: 5305 self._check_open_file() 5306 if not self._read and reading: 5307 return self._read_error() 5308 if not self.allow_update and writing: 5309 return self._write_error() 5310 5311 if reading: 5312 self._sync_io() 5313 if not self.is_stream: 5314 self.flush() 5315 if not self._filesystem.is_windows_fs: 5316 self.file_object.st_atime = now() 5317 if truncate: 5318 return self._truncate_wrapper() 5319 if self._append: 5320 if reading: 5321 return self._read_wrappers(name) 5322 elif not writing: 5323 return self._other_wrapper(name) 5324 if writing: 5325 return self._write_wrapper(name) 5326 5327 return getattr(self._io, name) 5328 5329 def _read_error(self) -> Callable: 5330 def read_error(*args, **kwargs): 5331 """Throw an error unless the argument is zero.""" 5332 if args and args[0] == 0: 5333 if self._filesystem.is_windows_fs and self.raw_io: 5334 return b'' if self._binary else u'' 5335 self._raise('File is not open for reading.') 5336 5337 return read_error 5338 5339 def _write_error(self) -> Callable: 5340 def write_error(*args, **kwargs): 5341 """Throw an error.""" 5342 if self.raw_io: 5343 if (self._filesystem.is_windows_fs and args 5344 and len(args[0]) == 0): 5345 return 0 5346 self._raise('File is not open for writing.') 5347 5348 return write_error 5349 5350 def _is_open(self) -> bool: 5351 if (self.filedes is not None and 5352 self.filedes < len(self._filesystem.open_files)): 5353 open_files = self._filesystem.open_files[self.filedes] 5354 if open_files is not None and self in open_files: 5355 return True 5356 return False 5357 5358 def _check_open_file(self) -> None: 5359 if not self.is_stream and not self._is_open(): 5360 raise ValueError('I/O operation on closed file') 5361 5362 def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]: 5363 if not self._read: 5364 self._raise('File is not open for reading') 5365 return self._io.__iter__() 5366 5367 def __next__(self): 5368 if not self._read: 5369 self._raise('File is not open for reading') 5370 return next(self._io) 5371 5372 5373class StandardStreamWrapper: 5374 """Wrapper for a system standard stream to be used in open files list. 5375 """ 5376 5377 def __init__(self, stream_object: TextIO): 5378 self._stream_object = stream_object 5379 self.filedes: Optional[int] = None 5380 5381 def get_object(self) -> TextIO: 5382 return self._stream_object 5383 5384 def fileno(self) -> int: 5385 """Return the file descriptor of the wrapped standard stream.""" 5386 if self.filedes is not None: 5387 return self.filedes 5388 raise OSError(errno.EBADF, 'Invalid file descriptor') 5389 5390 def read(self, n: int = -1) -> bytes: 5391 return cast(bytes, self._stream_object.read()) 5392 5393 def close(self) -> None: 5394 """We do not support closing standard streams.""" 5395 pass 5396 5397 def is_stream(self) -> bool: 5398 return True 5399 5400 5401class FakeDirWrapper: 5402 """Wrapper for a FakeDirectory object to be used in open files list. 5403 """ 5404 5405 def __init__(self, file_object: FakeDirectory, 5406 file_path: AnyString, filesystem: FakeFilesystem): 5407 self.file_object = file_object 5408 self.file_path = file_path 5409 self._filesystem = filesystem 5410 self.filedes: Optional[int] = None 5411 5412 def get_object(self) -> FakeDirectory: 5413 """Return the FakeFile object that is wrapped by the current instance. 5414 """ 5415 return self.file_object 5416 5417 def fileno(self) -> int: 5418 """Return the file descriptor of the file object.""" 5419 if self.filedes is not None: 5420 return self.filedes 5421 raise OSError(errno.EBADF, 'Invalid file descriptor') 5422 5423 def close(self) -> None: 5424 """Close the directory.""" 5425 assert self.filedes is not None 5426 self._filesystem._close_open_file(self.filedes) 5427 5428 5429class FakePipeWrapper: 5430 """Wrapper for a read or write descriptor of a real pipe object to be 5431 used in open files list. 5432 """ 5433 5434 def __init__(self, filesystem: FakeFilesystem, 5435 fd: int, can_write: bool, mode: str = ''): 5436 self._filesystem = filesystem 5437 self.fd = fd # the real file descriptor 5438 self.can_write = can_write 5439 self.file_object = None 5440 self.filedes: Optional[int] = None 5441 self.real_file = None 5442 if mode: 5443 self.real_file = open(fd, mode) 5444 5445 def __enter__(self) -> 'FakePipeWrapper': 5446 """To support usage of this fake pipe with the 'with' statement.""" 5447 return self 5448 5449 def __exit__(self, exc_type: Optional[Type[BaseException]], 5450 exc_val: Optional[BaseException], 5451 exc_tb: Optional[TracebackType] 5452 ) -> None: 5453 """To support usage of this fake pipe with the 'with' statement.""" 5454 self.close() 5455 5456 def get_object(self) -> None: 5457 return self.file_object 5458 5459 def fileno(self) -> int: 5460 """Return the fake file descriptor of the pipe object.""" 5461 if self.filedes is not None: 5462 return self.filedes 5463 raise OSError(errno.EBADF, 'Invalid file descriptor') 5464 5465 def read(self, numBytes: int = -1) -> bytes: 5466 """Read from the real pipe.""" 5467 if self.real_file: 5468 return self.real_file.read(numBytes) 5469 return os.read(self.fd, numBytes) 5470 5471 def flush(self) -> None: 5472 """Flush the real pipe?""" 5473 pass 5474 5475 def write(self, contents: bytes) -> int: 5476 """Write to the real pipe.""" 5477 if self.real_file: 5478 return self.real_file.write(contents) 5479 return os.write(self.fd, contents) 5480 5481 def close(self) -> None: 5482 """Close the pipe descriptor.""" 5483 assert self.filedes is not None 5484 open_files = self._filesystem.open_files[self.filedes] 5485 assert open_files is not None 5486 open_files.remove(self) 5487 if self.real_file: 5488 self.real_file.close() 5489 else: 5490 os.close(self.fd) 5491 5492 def readable(self) -> bool: 5493 """The pipe end can either be readable or writable.""" 5494 return not self.can_write 5495 5496 def writable(self) -> bool: 5497 """The pipe end can either be readable or writable.""" 5498 return self.can_write 5499 5500 def seekable(self) -> bool: 5501 """A pipe is not seekable.""" 5502 return False 5503 5504 5505Deprecator.add(FakeFileWrapper, FakeFileWrapper.get_object, 'GetObject') 5506Deprecator.add(FakeFileWrapper, FakeFileWrapper.size, 'Size') 5507 5508 5509class FakeFileOpen: 5510 """Faked `file()` and `open()` function replacements. 5511 5512 Returns FakeFile objects in a FakeFilesystem in place of the `file()` 5513 or `open()` function. 5514 """ 5515 __name__ = 'FakeFileOpen' 5516 5517 def __init__(self, filesystem: FakeFilesystem, 5518 delete_on_close: bool = False, raw_io: bool = False): 5519 """ 5520 Args: 5521 filesystem: FakeFilesystem used to provide file system information 5522 delete_on_close: optional boolean, deletes file on close() 5523 """ 5524 self.filesystem = filesystem 5525 self._delete_on_close = delete_on_close 5526 self.raw_io = raw_io 5527 5528 def __call__(self, *args: Any, **kwargs: Any) -> AnyFileWrapper: 5529 """Redirects calls to file() or open() to appropriate method.""" 5530 return self.call(*args, **kwargs) 5531 5532 def call(self, file_: Union[AnyStr, int], 5533 mode: str = 'r', 5534 buffering: int = -1, 5535 encoding: Optional[str] = None, 5536 errors: Optional[str] = None, 5537 newline: Optional[str] = None, 5538 closefd: bool = True, 5539 opener: Any = None, 5540 open_modes: Optional[_OpenModes] = None) -> AnyFileWrapper: 5541 """Return a file-like object with the contents of the target 5542 file object. 5543 5544 Args: 5545 file_: Path to target file or a file descriptor. 5546 mode: Additional file modes (all modes in `open()` are supported). 5547 buffering: the buffer size used for writing. Data will only be 5548 flushed if buffer size is exceeded. The default (-1) uses a 5549 system specific default buffer size. Text line mode (e.g. 5550 buffering=1 in text mode) is not supported. 5551 encoding: The encoding used to encode unicode strings / decode 5552 bytes. 5553 errors: (str) Defines how encoding errors are handled. 5554 newline: Controls universal newlines, passed to stream object. 5555 closefd: If a file descriptor rather than file name is passed, 5556 and this is set to `False`, then the file descriptor is kept 5557 open when file is closed. 5558 opener: not supported. 5559 open_modes: Modes for opening files if called from low-level API. 5560 5561 Returns: 5562 A file-like object containing the contents of the target file. 5563 5564 Raises: 5565 OSError depending on Python version / call mode: 5566 - if the target object is a directory 5567 - on an invalid path 5568 - if the file does not exist when it should 5569 - if the file exists but should not 5570 - if permission is denied 5571 ValueError: for an invalid mode or mode combination 5572 """ 5573 binary = 'b' in mode 5574 5575 if binary and encoding: 5576 raise ValueError("binary mode doesn't take an encoding argument") 5577 5578 newline, open_modes = self._handle_file_mode(mode, newline, open_modes) 5579 5580 file_object, file_path, filedes, real_path = self._handle_file_arg( 5581 file_) 5582 if file_object is None and file_path is None: 5583 # file must be a fake pipe wrapper, find it... 5584 if (filedes is None or 5585 len(self.filesystem.open_files) <= filedes or 5586 not self.filesystem.open_files[filedes]): 5587 raise OSError(errno.EBADF, 'invalid pipe file descriptor') 5588 wrappers = self.filesystem.open_files[filedes] 5589 assert wrappers is not None 5590 existing_wrapper = wrappers[0] 5591 assert isinstance(existing_wrapper, FakePipeWrapper) 5592 wrapper = FakePipeWrapper(self.filesystem, existing_wrapper.fd, 5593 existing_wrapper.can_write, mode) 5594 file_des = self.filesystem._add_open_file(wrapper) 5595 wrapper.filedes = file_des 5596 return wrapper 5597 5598 assert file_path is not None 5599 if not filedes: 5600 closefd = True 5601 5602 if (open_modes.must_not_exist and 5603 (file_object or self.filesystem.islink(file_path) and 5604 not self.filesystem.is_windows_fs)): 5605 self.filesystem.raise_os_error(errno.EEXIST, file_path) 5606 5607 assert real_path is not None 5608 file_object = self._init_file_object(file_object, 5609 file_path, open_modes, 5610 real_path) 5611 5612 if S_ISDIR(file_object.st_mode): 5613 if self.filesystem.is_windows_fs: 5614 self.filesystem.raise_os_error(errno.EACCES, file_path) 5615 else: 5616 self.filesystem.raise_os_error(errno.EISDIR, file_path) 5617 5618 # If you print obj.name, the argument to open() must be printed. 5619 # Not the abspath, not the filename, but the actual argument. 5620 file_object.opened_as = file_path 5621 if open_modes.truncate: 5622 current_time = now() 5623 file_object.st_mtime = current_time 5624 if not self.filesystem.is_windows_fs: 5625 file_object.st_ctime = current_time 5626 5627 fakefile = FakeFileWrapper(file_object, 5628 file_path, 5629 update=open_modes.can_write, 5630 read=open_modes.can_read, 5631 append=open_modes.append, 5632 delete_on_close=self._delete_on_close, 5633 filesystem=self.filesystem, 5634 newline=newline, 5635 binary=binary, 5636 closefd=closefd, 5637 encoding=encoding, 5638 errors=errors, 5639 buffering=buffering, 5640 raw_io=self.raw_io) 5641 if filedes is not None: 5642 fakefile.filedes = filedes 5643 # replace the file wrapper 5644 open_files_list = self.filesystem.open_files[filedes] 5645 assert open_files_list is not None 5646 open_files_list.append(fakefile) 5647 else: 5648 fakefile.filedes = self.filesystem._add_open_file(fakefile) 5649 return fakefile 5650 5651 def _init_file_object(self, file_object: Optional[FakeFile], 5652 file_path: AnyStr, 5653 open_modes: _OpenModes, 5654 real_path: AnyString) -> FakeFile: 5655 if file_object: 5656 if (not is_root() and 5657 ((open_modes.can_read and 5658 not file_object.st_mode & PERM_READ) 5659 or (open_modes.can_write and 5660 not file_object.st_mode & PERM_WRITE))): 5661 self.filesystem.raise_os_error(errno.EACCES, file_path) 5662 if open_modes.can_write: 5663 if open_modes.truncate: 5664 file_object.set_contents('') 5665 else: 5666 if open_modes.must_exist: 5667 self.filesystem.raise_os_error(errno.ENOENT, file_path) 5668 if self.filesystem.islink(file_path): 5669 link_object = self.filesystem.resolve(file_path, 5670 follow_symlinks=False) 5671 assert link_object.contents is not None 5672 target_path = cast(AnyStr, link_object.contents) 5673 else: 5674 target_path = file_path 5675 if self.filesystem.ends_with_path_separator(target_path): 5676 error = ( 5677 errno.EINVAL if self.filesystem.is_windows_fs 5678 else errno.ENOENT if self.filesystem.is_macos 5679 else errno.EISDIR 5680 ) 5681 self.filesystem.raise_os_error(error, file_path) 5682 file_object = self.filesystem.create_file_internally( 5683 real_path, create_missing_dirs=False, 5684 apply_umask=True) 5685 return file_object 5686 5687 def _handle_file_arg(self, file_: Union[AnyStr, int]) -> Tuple[ 5688 Optional[FakeFile], Optional[AnyStr], 5689 Optional[int], Optional[AnyStr]]: 5690 file_object = None 5691 if isinstance(file_, int): 5692 # opening a file descriptor 5693 filedes: int = file_ 5694 wrapper = self.filesystem.get_open_file(filedes) 5695 if isinstance(wrapper, FakePipeWrapper): 5696 return None, None, filedes, None 5697 if isinstance(wrapper, FakeFileWrapper): 5698 self._delete_on_close = wrapper.delete_on_close 5699 file_object = cast(FakeFile, self.filesystem.get_open_file( 5700 filedes).get_object()) 5701 assert file_object is not None 5702 path = file_object.name 5703 return file_object, cast(AnyStr, path), filedes, cast(AnyStr, path) 5704 5705 # open a file file by path 5706 file_path = cast(AnyStr, file_) 5707 if file_path == self.filesystem.dev_null.name: 5708 file_object = self.filesystem.dev_null 5709 real_path = file_path 5710 else: 5711 real_path = self.filesystem.resolve_path(file_path) 5712 if self.filesystem.exists(file_path): 5713 file_object = self.filesystem.get_object_from_normpath( 5714 real_path, check_read_perm=False) 5715 return file_object, file_path, None, real_path 5716 5717 def _handle_file_mode( 5718 self, mode: str, 5719 newline: Optional[str], 5720 open_modes: Optional[_OpenModes]) -> Tuple[Optional[str], 5721 _OpenModes]: 5722 orig_modes = mode # Save original modes for error messages. 5723 # Normalize modes. Handle 't' and 'U'. 5724 if 'b' in mode and 't' in mode: 5725 raise ValueError('Invalid mode: ' + mode) 5726 mode = mode.replace('t', '').replace('b', '') 5727 mode = mode.replace('rU', 'r').replace('U', 'r') 5728 if not self.raw_io: 5729 if mode not in _OPEN_MODE_MAP: 5730 raise ValueError('Invalid mode: %r' % orig_modes) 5731 open_modes = _OpenModes(*_OPEN_MODE_MAP[mode]) 5732 assert open_modes is not None 5733 return newline, open_modes 5734 5735 5736def _run_doctest() -> TestResults: 5737 import doctest 5738 import pyfakefs 5739 return doctest.testmod(pyfakefs.fake_filesystem) 5740 5741 5742if __name__ == '__main__': 5743 _run_doctest() 5744