1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""A fake filesystem implementation for unit testing. 16 17:Includes: 18 * :py:class:`FakeFile`: Provides the appearance of a real file. 19 * :py:class:`FakeDirectory`: Provides the appearance of a real directory. 20 * :py:class:`FakeFilesystem`: Provides the appearance of a real directory 21 hierarchy. 22 * :py:class:`FakeOsModule`: Uses :py:class:`FakeFilesystem` to provide a 23 fake :py:mod:`os` module replacement. 24 * :py:class:`FakeIoModule`: Uses :py:class:`FakeFilesystem` to provide a 25 fake ``io`` module replacement. 26 * :py:class:`FakePathModule`: Faked ``os.path`` module replacement. 27 * :py:class:`FakeFileOpen`: Faked ``file()`` and ``open()`` function 28 replacements. 29 30:Usage: 31 32>>> from pyfakefs import fake_filesystem 33>>> filesystem = fake_filesystem.FakeFilesystem() 34>>> os_module = fake_filesystem.FakeOsModule(filesystem) 35>>> pathname = '/a/new/dir/new-file' 36 37Create a new file object, creating parent directory objects as needed: 38 39>>> os_module.path.exists(pathname) 40False 41>>> new_file = filesystem.create_file(pathname) 42 43File objects can't be overwritten: 44 45>>> os_module.path.exists(pathname) 46True 47>>> try: 48... filesystem.create_file(pathname) 49... except OSError as e: 50... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno 51... 
assert e.strerror == 'File exists in the fake filesystem' 52 53Remove a file object: 54 55>>> filesystem.remove_object(pathname) 56>>> os_module.path.exists(pathname) 57False 58 59Create a new file object at the previous path: 60 61>>> beatles_file = filesystem.create_file(pathname, 62... contents='Dear Prudence\\nWon\\'t you come out to play?\\n') 63>>> os_module.path.exists(pathname) 64True 65 66Use the FakeFileOpen class to read fake file objects: 67 68>>> file_module = fake_filesystem.FakeFileOpen(filesystem) 69>>> for line in file_module(pathname): 70... print(line.rstrip()) 71... 72Dear Prudence 73Won't you come out to play? 74 75File objects cannot be treated like directory objects: 76 77>>> try: 78... os_module.listdir(pathname) 79... except OSError as e: 80... assert e.errno == errno.ENOTDIR, 'unexpected errno: %d' % e.errno 81... assert e.strerror == 'Not a directory in the fake filesystem' 82 83The FakeOsModule can list fake directory objects: 84 85>>> os_module.listdir(os_module.path.dirname(pathname)) 86['new-file'] 87 88The FakeOsModule also supports stat operations: 89 90>>> import stat 91>>> stat.S_ISREG(os_module.stat(pathname).st_mode) 92True 93>>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode) 94True 95""" 96import errno 97import heapq 98import io 99import locale 100import os 101import sys 102import time 103import uuid 104from collections import namedtuple 105from stat import ( 106 S_IFREG, S_IFDIR, S_ISLNK, S_IFMT, S_ISDIR, S_IFLNK, S_ISREG, S_IFSOCK 107) 108 109from pyfakefs.deprecator import Deprecator 110from pyfakefs.extra_packages import use_scandir 111from pyfakefs.fake_scandir import scandir, walk 112from pyfakefs.helpers import ( 113 FakeStatResult, FileBufferIO, NullFileBufferIO, 114 is_int_type, is_byte_string, is_unicode_string, 115 make_string_path, IS_WIN, to_string) 116 117__pychecker__ = 'no-reimportself' 118 119__version__ = '4.1dev' 120 121PERM_READ = 0o400 # Read permission bit. 
122PERM_WRITE = 0o200 # Write permission bit. 123PERM_EXE = 0o100 # Execute permission bit. 124PERM_DEF = 0o777 # Default permission bits. 125PERM_DEF_FILE = 0o666 # Default permission bits (regular file) 126PERM_ALL = 0o7777 # All permission bits. 127 128_OpenModes = namedtuple( 129 'open_modes', 130 'must_exist can_read can_write truncate append must_not_exist' 131) 132 133_OPEN_MODE_MAP = { 134 # mode name:(file must exist, can read, can write, 135 # truncate, append, must not exist) 136 'r': (True, True, False, False, False, False), 137 'w': (False, False, True, True, False, False), 138 'a': (False, False, True, False, True, False), 139 'r+': (True, True, True, False, False, False), 140 'w+': (False, True, True, True, False, False), 141 'a+': (False, True, True, False, True, False), 142 'x': (False, False, True, False, False, True), 143 'x+': (False, True, True, False, False, True) 144} 145 146if sys.platform.startswith('linux'): 147 # on newer Linux system, the default maximum recursion depth is 40 148 # we ignore older systems here 149 _MAX_LINK_DEPTH = 40 150else: 151 # on MacOS and Windows, the maximum recursion depth is 32 152 _MAX_LINK_DEPTH = 32 153 154NR_STD_STREAMS = 3 155USER_ID = 1 if IS_WIN else os.getuid() 156GROUP_ID = 1 if IS_WIN else os.getgid() 157 158 159def set_uid(uid): 160 """Set the global user id. This is used as st_uid for new files 161 and to differentiate between a normal user and the root user (uid 0). 162 For the root user, some permission restrictions are ignored. 163 164 Args: 165 uid: (int) the user ID of the user calling the file system functions. 166 """ 167 global USER_ID 168 USER_ID = uid 169 170 171def set_gid(gid): 172 """Set the global group id. This is only used to set st_gid for new files, 173 no permision checks are performed. 174 175 Args: 176 gid: (int) the group ID of the user calling the file system functions. 
177 """ 178 global GROUP_ID 179 GROUP_ID = gid 180 181 182def reset_ids(): 183 """Set the global user ID and group ID back to default values.""" 184 set_uid(1 if IS_WIN else os.getuid()) 185 set_gid(1 if IS_WIN else os.getgid()) 186 187 188def is_root(): 189 """Return True if the current user is the root user.""" 190 return USER_ID == 0 191 192 193class FakeLargeFileIoException(Exception): 194 """Exception thrown on unsupported operations for fake large files. 195 Fake large files have a size with no real content. 196 """ 197 198 def __init__(self, file_path): 199 super(FakeLargeFileIoException, self).__init__( 200 'Read and write operations not supported for ' 201 'fake large file: %s' % file_path) 202 203 204def _copy_module(old): 205 """Recompiles and creates new module object.""" 206 saved = sys.modules.pop(old.__name__, None) 207 new = __import__(old.__name__) 208 sys.modules[old.__name__] = saved 209 return new 210 211 212class FakeFile: 213 """Provides the appearance of a real file. 214 215 Attributes currently faked out: 216 * `st_mode`: user-specified, otherwise S_IFREG 217 * `st_ctime`: the time.time() timestamp of the file change time (updated 218 each time a file's attributes is modified). 219 * `st_atime`: the time.time() timestamp when the file was last accessed. 220 * `st_mtime`: the time.time() timestamp when the file was last modified. 221 * `st_size`: the size of the file 222 * `st_nlink`: the number of hard links to the file 223 * `st_ino`: the inode number - a unique number identifying the file 224 * `st_dev`: a unique number identifying the (fake) file system device 225 the file belongs to 226 * `st_uid`: always set to USER_ID, which can be changed globally using 227 `set_uid` 228 * `st_gid`: always set to GROUP_ID, which can be changed globally using 229 `set_gid` 230 231 .. 
note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the 232 real file system depends on the used file system (for example it is 233 only 1s for HFS+ and older Linux file systems, but much higher for 234 ext4 and NTFS). This is currently ignored by pyfakefs, which uses 235 the resolution of `time.time()`. 236 237 Under Windows, `st_atime` is not updated for performance reasons by 238 default. pyfakefs never updates `st_atime` under Windows, assuming 239 the default setting. 240 """ 241 stat_types = ( 242 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 243 'st_size', 'st_atime', 'st_mtime', 'st_ctime', 244 'st_atime_ns', 'st_mtime_ns', 'st_ctime_ns' 245 ) 246 247 def __init__(self, name, st_mode=S_IFREG | PERM_DEF_FILE, 248 contents=None, filesystem=None, encoding=None, errors=None, 249 side_effect=None): 250 """ 251 Args: 252 name: Name of the file/directory, without parent path information 253 st_mode: The stat.S_IF* constant representing the file type (i.e. 254 stat.S_IFREG, stat.S_IFDIR) 255 contents: The contents of the filesystem object; should be a string 256 or byte object for regular files, and a list of other 257 FakeFile or FakeDirectory objects for FakeDirectory objects 258 filesystem: The fake filesystem where the file is created. 259 encoding: If contents is a unicode string, the encoding used 260 for serialization. 261 errors: The error mode used for encoding/decoding errors. 262 side_effect: function handle that is executed when file is written, 263 must accept the file object as an argument. 
264 """ 265 # to be backwards compatible regarding argument order, we raise on None 266 if filesystem is None: 267 raise ValueError('filesystem shall not be None') 268 self.filesystem = filesystem 269 self._side_effect = side_effect 270 self.name = name 271 self.stat_result = FakeStatResult( 272 filesystem.is_windows_fs, USER_ID, GROUP_ID, time.time()) 273 self.stat_result.st_mode = st_mode 274 self.encoding = encoding 275 self.errors = errors or 'strict' 276 self._byte_contents = self._encode_contents(contents) 277 self.stat_result.st_size = ( 278 len(self._byte_contents) if self._byte_contents is not None else 0) 279 self.epoch = 0 280 self.parent_dir = None 281 # Linux specific: extended file system attributes 282 self.xattr = {} 283 284 @property 285 def byte_contents(self): 286 """Return the contents as raw byte array.""" 287 return self._byte_contents 288 289 @property 290 def contents(self): 291 """Return the contents as string with the original encoding.""" 292 if isinstance(self.byte_contents, bytes): 293 return self.byte_contents.decode( 294 self.encoding or locale.getpreferredencoding(False), 295 errors=self.errors) 296 return self.byte_contents 297 298 @property 299 def st_ctime(self): 300 """Return the creation time of the fake file.""" 301 return self.stat_result.st_ctime 302 303 @property 304 def st_atime(self): 305 """Return the access time of the fake file.""" 306 return self.stat_result.st_atime 307 308 @property 309 def st_mtime(self): 310 """Return the modification time of the fake file.""" 311 return self.stat_result.st_mtime 312 313 @st_ctime.setter 314 def st_ctime(self, val): 315 """Set the creation time of the fake file.""" 316 self.stat_result.st_ctime = val 317 318 @st_atime.setter 319 def st_atime(self, val): 320 """Set the access time of the fake file.""" 321 self.stat_result.st_atime = val 322 323 @st_mtime.setter 324 def st_mtime(self, val): 325 """Set the modification time of the fake file.""" 326 self.stat_result.st_mtime = val 327 
328 def set_large_file_size(self, st_size): 329 """Sets the self.st_size attribute and replaces self.content with None. 330 331 Provided specifically to simulate very large files without regards 332 to their content (which wouldn't fit in memory). 333 Note that read/write operations with such a file raise 334 :py:class:`FakeLargeFileIoException`. 335 336 Args: 337 st_size: (int) The desired file size 338 339 Raises: 340 OSError: if the st_size is not a non-negative integer, 341 or if st_size exceeds the available file system space 342 """ 343 self._check_positive_int(st_size) 344 if self.st_size: 345 self.size = 0 346 if self.filesystem: 347 self.filesystem.change_disk_usage(st_size, self.name, self.st_dev) 348 self.st_size = st_size 349 self._byte_contents = None 350 351 def _check_positive_int(self, size): 352 # the size should be an positive integer value 353 if not is_int_type(size) or size < 0: 354 self.filesystem.raise_os_error(errno.ENOSPC, self.name) 355 356 def is_large_file(self): 357 """Return `True` if this file was initialized with size but no contents. 358 """ 359 return self._byte_contents is None 360 361 def _encode_contents(self, contents): 362 if is_unicode_string(contents): 363 contents = bytes( 364 contents, 365 self.encoding or locale.getpreferredencoding(False), 366 self.errors) 367 return contents 368 369 def _set_initial_contents(self, contents): 370 """Sets the file contents and size. 371 Called internally after initial file creation. 372 373 Args: 374 contents: string, new content of file. 375 376 Returns: 377 True if the contents have been changed. 
378 379 Raises: 380 OSError: if the st_size is not a non-negative integer, 381 or if st_size exceeds the available file system space 382 """ 383 contents = self._encode_contents(contents) 384 changed = self._byte_contents != contents 385 st_size = len(contents) 386 387 if self._byte_contents: 388 self.size = 0 389 current_size = self.st_size or 0 390 self.filesystem.change_disk_usage( 391 st_size - current_size, self.name, self.st_dev) 392 self._byte_contents = contents 393 self.st_size = st_size 394 self.epoch += 1 395 return changed 396 397 def set_contents(self, contents, encoding=None): 398 """Sets the file contents and size and increases the modification time. 399 Also executes the side_effects if available. 400 401 Args: 402 contents: (str, bytes, unicode) new content of file. 403 encoding: (str) the encoding to be used for writing the contents 404 if they are a unicode string. 405 If not given, the locale preferred encoding is used. 406 407 Raises: 408 OSError: if `st_size` is not a non-negative integer, 409 or if it exceeds the available file system space. 410 """ 411 self.encoding = encoding 412 changed = self._set_initial_contents(contents) 413 if self._side_effect is not None: 414 self._side_effect(self) 415 return changed 416 417 @property 418 def size(self): 419 """Return the size in bytes of the file contents. 
420 """ 421 return self.st_size 422 423 @property 424 def path(self): 425 """Return the full path of the current object.""" 426 names = [] 427 obj = self 428 while obj: 429 names.insert(0, obj.name) 430 obj = obj.parent_dir 431 sep = self.filesystem._path_separator(self.name) 432 if names[0] == sep: 433 names.pop(0) 434 dir_path = sep.join(names) 435 # Windows paths with drive have a root separator entry 436 # which should be removed 437 is_drive = names and len(names[0]) == 2 and names[0][1] == ':' 438 if not is_drive: 439 dir_path = sep + dir_path 440 else: 441 dir_path = sep.join(names) 442 dir_path = self.filesystem.absnormpath(dir_path) 443 return dir_path 444 445 @Deprecator('property path') 446 def GetPath(self): 447 return self.path 448 449 @Deprecator('property size') 450 def GetSize(self): 451 return self.size 452 453 @size.setter 454 def size(self, st_size): 455 """Resizes file content, padding with nulls if new size exceeds the 456 old size. 457 458 Args: 459 st_size: The desired size for the file. 460 461 Raises: 462 OSError: if the st_size arg is not a non-negative integer 463 or if st_size exceeds the available file system space 464 """ 465 466 self._check_positive_int(st_size) 467 current_size = self.st_size or 0 468 self.filesystem.change_disk_usage( 469 st_size - current_size, self.name, self.st_dev) 470 if self._byte_contents: 471 if st_size < current_size: 472 self._byte_contents = self._byte_contents[:st_size] 473 else: 474 self._byte_contents += b'\0' * (st_size - current_size) 475 self.st_size = st_size 476 self.epoch += 1 477 478 @Deprecator('property size') 479 def SetSize(self, value): 480 self.size = value 481 482 @Deprecator('property st_atime') 483 def SetATime(self, st_atime): 484 """Set the self.st_atime attribute. 485 486 Args: 487 st_atime: The desired access time. 488 """ 489 self.st_atime = st_atime 490 491 @Deprecator('property st_mtime') 492 def SetMTime(self, st_mtime): 493 """Set the self.st_mtime attribute. 
494 495 Args: 496 st_mtime: The desired modification time. 497 """ 498 self.st_mtime = st_mtime 499 500 @Deprecator('property st_ctime') 501 def SetCTime(self, st_ctime): 502 """Set the self.st_ctime attribute. 503 504 Args: 505 st_ctime: The desired creation time. 506 """ 507 self.st_ctime = st_ctime 508 509 def __getattr__(self, item): 510 """Forward some properties to stat_result.""" 511 if item in self.stat_types: 512 return getattr(self.stat_result, item) 513 return super(FakeFile, self).__getattr__(item) 514 515 def __setattr__(self, key, value): 516 """Forward some properties to stat_result.""" 517 if key in self.stat_types: 518 return setattr(self.stat_result, key, value) 519 return super(FakeFile, self).__setattr__(key, value) 520 521 def __str__(self): 522 return '%s(%o)' % (self.name, self.st_mode) 523 524 @Deprecator('st_ino') 525 def SetIno(self, st_ino): 526 """Set the self.st_ino attribute. 527 Note that a unique inode is assigned automatically to a new fake file. 528 This function does not guarantee uniqueness and should be used with 529 caution. 530 531 Args: 532 st_ino: (int) The desired inode. 533 """ 534 self.st_ino = st_ino 535 536 537class FakeNullFile(FakeFile): 538 def __init__(self, filesystem): 539 devnull = '/dev/nul' if filesystem.is_windows_fs else '/dev/nul' 540 super(FakeNullFile, self).__init__( 541 devnull, filesystem=filesystem, contents=b'') 542 543 @property 544 def byte_contents(self): 545 return b'' 546 547 def _set_initial_contents(self, contents): 548 pass 549 550 551Deprecator.add(FakeFile, FakeFile.set_large_file_size, 'SetLargeFileSize') 552Deprecator.add(FakeFile, FakeFile.set_contents, 'SetContents') 553Deprecator.add(FakeFile, FakeFile.is_large_file, 'IsLargeFile') 554 555 556class FakeFileFromRealFile(FakeFile): 557 """Represents a fake file copied from the real file system. 558 559 The contents of the file are read on demand only. 
560 """ 561 562 def __init__(self, file_path, filesystem, side_effect=None): 563 """ 564 Args: 565 file_path: Path to the existing file. 566 filesystem: The fake filesystem where the file is created. 567 568 Raises: 569 OSError: if the file does not exist in the real file system. 570 OSError: if the file already exists in the fake file system. 571 """ 572 super(FakeFileFromRealFile, self).__init__( 573 name=os.path.basename(file_path), filesystem=filesystem, 574 side_effect=side_effect) 575 self.contents_read = False 576 577 @property 578 def byte_contents(self): 579 if not self.contents_read: 580 self.contents_read = True 581 with io.open(self.file_path, 'rb') as f: 582 self._byte_contents = f.read() 583 # On MacOS and BSD, the above io.open() updates atime on the real file 584 self.st_atime = os.stat(self.file_path).st_atime 585 return self._byte_contents 586 587 def set_contents(self, contents, encoding=None): 588 self.contents_read = True 589 super(FakeFileFromRealFile, self).set_contents(contents, encoding) 590 591 def is_large_file(self): 592 """The contents are never faked.""" 593 return False 594 595 596class FakeDirectory(FakeFile): 597 """Provides the appearance of a real directory.""" 598 599 def __init__(self, name, perm_bits=PERM_DEF, filesystem=None): 600 """ 601 Args: 602 name: name of the file/directory, without parent path information 603 perm_bits: permission bits. defaults to 0o777. 604 filesystem: if set, the fake filesystem where the directory 605 is created 606 """ 607 FakeFile.__init__( 608 self, name, S_IFDIR | perm_bits, {}, filesystem=filesystem) 609 # directories have the link count of contained entries, 610 # inclusing '.' and '..' 
611 self.st_nlink += 1 612 613 def set_contents(self, contents, encoding=None): 614 raise self.filesystem.raise_os_error(errno.EISDIR, self.path) 615 616 @property 617 def contents(self): 618 """Return the list of contained directory entries.""" 619 return self.byte_contents 620 621 @property 622 def ordered_dirs(self): 623 """Return the list of contained directory entry names ordered by 624 creation order. 625 """ 626 return [item[0] for item in sorted( 627 self.byte_contents.items(), key=lambda entry: entry[1].st_ino)] 628 629 def add_entry(self, path_object): 630 """Adds a child FakeFile to this directory. 631 632 Args: 633 path_object: FakeFile instance to add as a child of this directory. 634 635 Raises: 636 OSError: if the directory has no write permission (Posix only) 637 OSError: if the file or directory to be added already exists 638 """ 639 if (not is_root() and not self.st_mode & PERM_WRITE and 640 not self.filesystem.is_windows_fs): 641 raise OSError(errno.EACCES, 'Permission Denied', self.path) 642 643 path_object_name = to_string(path_object.name) 644 if path_object_name in self.contents: 645 self.filesystem.raise_os_error(errno.EEXIST, self.path) 646 647 self.contents[path_object_name] = path_object 648 path_object.parent_dir = self 649 if path_object.st_ino is None: 650 self.filesystem.last_ino += 1 651 path_object.st_ino = self.filesystem.last_ino 652 self.st_nlink += 1 653 path_object.st_nlink += 1 654 path_object.st_dev = self.st_dev 655 if path_object.st_nlink == 1: 656 self.filesystem.change_disk_usage( 657 path_object.size, path_object.name, self.st_dev) 658 659 def get_entry(self, pathname_name): 660 """Retrieves the specified child file or directory entry. 661 662 Args: 663 pathname_name: The basename of the child object to retrieve. 664 665 Returns: 666 The fake file or directory object. 667 668 Raises: 669 KeyError: if no child exists by the specified name. 
670 """ 671 pathname_name = self._normalized_entryname(pathname_name) 672 return self.contents[to_string(pathname_name)] 673 674 def _normalized_entryname(self, pathname_name): 675 if not self.filesystem.is_case_sensitive: 676 matching_names = [name for name in self.contents 677 if name.lower() == pathname_name.lower()] 678 if matching_names: 679 pathname_name = matching_names[0] 680 return pathname_name 681 682 def remove_entry(self, pathname_name, recursive=True): 683 """Removes the specified child file or directory. 684 685 Args: 686 pathname_name: Basename of the child object to remove. 687 recursive: If True (default), the entries in contained directories 688 are deleted first. Used to propagate removal errors 689 (e.g. permission problems) from contained entries. 690 691 Raises: 692 KeyError: if no child exists by the specified name. 693 OSError: if user lacks permission to delete the file, 694 or (Windows only) the file is open. 695 """ 696 pathname_name = self._normalized_entryname(pathname_name) 697 entry = self.get_entry(pathname_name) 698 if self.filesystem.is_windows_fs: 699 if entry.st_mode & PERM_WRITE == 0: 700 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 701 if self.filesystem.has_open_file(entry): 702 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 703 else: 704 if (not is_root() and (self.st_mode & (PERM_WRITE | PERM_EXE) != 705 PERM_WRITE | PERM_EXE)): 706 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 707 708 if recursive and isinstance(entry, FakeDirectory): 709 while entry.contents: 710 entry.remove_entry(list(entry.contents)[0]) 711 elif entry.st_nlink == 1: 712 self.filesystem.change_disk_usage( 713 -entry.size, pathname_name, entry.st_dev) 714 715 self.st_nlink -= 1 716 entry.st_nlink -= 1 717 assert entry.st_nlink >= 0 718 719 del self.contents[to_string(pathname_name)] 720 721 @property 722 def size(self): 723 """Return the total size of all files contained in this directory tree. 
724 """ 725 return sum([item[1].size for item in self.contents.items()]) 726 727 @Deprecator('property size') 728 def GetSize(self): 729 return self.size 730 731 def has_parent_object(self, dir_object): 732 """Return `True` if dir_object is a direct or indirect parent 733 directory, or if both are the same object.""" 734 obj = self 735 while obj: 736 if obj == dir_object: 737 return True 738 obj = obj.parent_dir 739 return False 740 741 def __str__(self): 742 description = super(FakeDirectory, self).__str__() + ':\n' 743 for item in self.contents: 744 item_desc = self.contents[item].__str__() 745 for line in item_desc.split('\n'): 746 if line: 747 description = description + ' ' + line + '\n' 748 return description 749 750 751Deprecator.add(FakeDirectory, FakeDirectory.add_entry, 'AddEntry') 752Deprecator.add(FakeDirectory, FakeDirectory.get_entry, 'GetEntry') 753Deprecator.add(FakeDirectory, FakeDirectory.set_contents, 'SetContents') 754Deprecator.add(FakeDirectory, FakeDirectory.remove_entry, 'RemoveEntry') 755 756 757class FakeDirectoryFromRealDirectory(FakeDirectory): 758 """Represents a fake directory copied from the real file system. 759 760 The contents of the directory are read on demand only. 761 """ 762 763 def __init__(self, source_path, filesystem, read_only, 764 target_path=None): 765 """ 766 Args: 767 source_path: Full directory path. 768 filesystem: The fake filesystem where the directory is created. 769 read_only: If set, all files under the directory are treated 770 as read-only, e.g. a write access raises an exception; 771 otherwise, writing to the files changes the fake files 772 only as usually. 773 target_path: If given, the target path of the directory, 774 otherwise the target is the same as `source_path`. 
775 776 Raises: 777 OSError: if the directory does not exist in the real file system 778 """ 779 target_path = target_path or source_path 780 real_stat = os.stat(source_path) 781 super(FakeDirectoryFromRealDirectory, self).__init__( 782 name=os.path.split(target_path)[1], 783 perm_bits=real_stat.st_mode, 784 filesystem=filesystem) 785 786 self.st_ctime = real_stat.st_ctime 787 self.st_atime = real_stat.st_atime 788 self.st_mtime = real_stat.st_mtime 789 self.st_gid = real_stat.st_gid 790 self.st_uid = real_stat.st_uid 791 self.source_path = source_path 792 self.read_only = read_only 793 self.contents_read = False 794 795 @property 796 def contents(self): 797 """Return the list of contained directory entries, loading them 798 if not already loaded.""" 799 if not self.contents_read: 800 self.contents_read = True 801 base = self.path 802 for entry in os.listdir(self.source_path): 803 source_path = os.path.join(self.source_path, entry) 804 target_path = os.path.join(base, entry) 805 if os.path.islink(source_path): 806 self.filesystem.add_real_symlink(source_path, target_path) 807 elif os.path.isdir(source_path): 808 self.filesystem.add_real_directory( 809 source_path, self.read_only, target_path=target_path) 810 else: 811 self.filesystem.add_real_file( 812 source_path, self.read_only, target_path=target_path) 813 return self.byte_contents 814 815 @property 816 def size(self): 817 # we cannot get the size until the contents are loaded 818 if not self.contents_read: 819 return 0 820 return super(FakeDirectoryFromRealDirectory, self).size 821 822 823class FakeFilesystem: 824 """Provides the appearance of a real directory tree for unit testing. 825 826 Attributes: 827 path_separator: The path separator, corresponds to `os.path.sep`. 828 alternative_path_separator: Corresponds to `os.path.altsep`. 829 is_windows_fs: `True` in a real or faked Windows file system. 830 is_macos: `True` under MacOS, or if we are faking it. 
831 is_case_sensitive: `True` if a case-sensitive file system is assumed. 832 root: The root :py:class:`FakeDirectory` entry of the file system. 833 cwd: The current working directory path. 834 umask: The umask used for newly created files, see `os.umask`. 835 patcher: Holds the Patcher object if created from it. Allows access 836 to the patcher object if using the pytest fs fixture. 837 """ 838 839 def __init__(self, path_separator=os.path.sep, total_size=None, 840 patcher=None): 841 """ 842 Args: 843 path_separator: optional substitute for os.path.sep 844 total_size: if not None, the total size in bytes of the 845 root filesystem. 846 847 Example usage to use the same path separator under all systems: 848 849 >>> filesystem = FakeFilesystem(path_separator='/') 850 851 """ 852 self.path_separator = path_separator 853 self.alternative_path_separator = os.path.altsep 854 self.patcher = patcher 855 if path_separator != os.sep: 856 self.alternative_path_separator = None 857 858 # is_windows_fs can be used to test the behavior of pyfakefs under 859 # Windows fs on non-Windows systems and vice verse; 860 # is it used to support drive letters, UNC paths and some other 861 # Windows-specific features 862 self.is_windows_fs = sys.platform == 'win32' 863 864 # can be used to test some MacOS-specific behavior under other systems 865 self.is_macos = sys.platform == 'darwin' 866 867 # is_case_sensitive can be used to test pyfakefs for case-sensitive 868 # file systems on non-case-sensitive systems and vice verse 869 self.is_case_sensitive = not (self.is_windows_fs or self.is_macos) 870 871 self.root = FakeDirectory(self.path_separator, filesystem=self) 872 self.cwd = self.root.name 873 874 # We can't query the current value without changing it: 875 self.umask = os.umask(0o22) 876 os.umask(self.umask) 877 878 # A list of open file objects. 
Their position in the list is their 879 # file descriptor number 880 self.open_files = [] 881 # A heap containing all free positions in self.open_files list 882 self._free_fd_heap = [] 883 # last used numbers for inodes (st_ino) and devices (st_dev) 884 self.last_ino = 0 885 self.last_dev = 0 886 self.mount_points = {} 887 self.add_mount_point(self.root.name, total_size) 888 self._add_standard_streams() 889 self.dev_null = FakeNullFile(self) 890 891 @property 892 def is_linux(self): 893 return not self.is_windows_fs and not self.is_macos 894 895 def reset(self, total_size=None): 896 """Remove all file system contents and reset the root.""" 897 self.root = FakeDirectory(self.path_separator, filesystem=self) 898 self.cwd = self.root.name 899 900 self.open_files = [] 901 self._free_fd_heap = [] 902 self.last_ino = 0 903 self.last_dev = 0 904 self.mount_points = {} 905 self.add_mount_point(self.root.name, total_size) 906 self._add_standard_streams() 907 908 def pause(self): 909 """Pause the patching of the file system modules until `resume` is 910 called. After that call, all file system calls are executed in the 911 real file system. 912 Calling pause() twice is silently ignored. 913 Only allowed if the file system object was created by a 914 Patcher object. This is also the case for the pytest `fs` fixture. 915 916 Raises: 917 RuntimeError: if the file system was not created by a Patcher. 918 """ 919 if self.patcher is None: 920 raise RuntimeError('pause() can only be called from a fake file ' 921 'system object created by a Patcher object') 922 self.patcher.pause() 923 924 def resume(self): 925 """Resume the patching of the file system modules if `pause` has 926 been called before. After that call, all file system calls are 927 executed in the fake file system. 928 Does nothing if patching is not paused. 929 Raises: 930 RuntimeError: if the file system has not been created by `Patcher`. 
931 """ 932 if self.patcher is None: 933 raise RuntimeError('resume() can only be called from a fake file ' 934 'system object created by a Patcher object') 935 self.patcher.resume() 936 937 def line_separator(self): 938 return '\r\n' if self.is_windows_fs else '\n' 939 940 def _error_message(self, errno): 941 return os.strerror(errno) + ' in the fake filesystem' 942 943 def raise_os_error(self, errno, filename=None, winerror=None): 944 """Raises OSError. 945 The error message is constructed from the given error code and shall 946 start with the error string issued in the real system. 947 Note: this is not true under Windows if winerror is given - in this 948 case a localized message specific to winerror will be shown in the 949 real file system. 950 951 Args: 952 errno: A numeric error code from the C variable errno. 953 filename: The name of the affected file, if any. 954 winerror: Windows only - the specific Windows error code. 955 """ 956 message = self._error_message(errno) 957 if (winerror is not None and sys.platform == 'win32' and 958 self.is_windows_fs): 959 raise OSError(errno, message, filename, winerror) 960 raise OSError(errno, message, filename) 961 962 @staticmethod 963 def _matching_string(matched, string): 964 """Return the string as byte or unicode depending 965 on the type of matched, assuming string is an ASCII string. 
def _alternative_path_separator(self, path):
    """Return the alternative path separator as the same type as path.

    The conversion is done by `_matching_string`, which hands back `None`
    unchanged if no alternative separator is configured.
    """
    alt_sep = self.alternative_path_separator
    return self._matching_string(path, alt_sep)
def _mount_point_for_path(self, path):
    """Return the mount point dict of the device where `path` resides.

    The path is normalized first. An exact match in `self.mount_points`
    wins; otherwise the longest mount path that is a prefix of the
    normalized path (and, if the path has a drive part, starts with that
    drive) is used. If nothing matches, auto-mounting the drive is forced.

    NOTE(review): the prefix test is a plain `startswith`, so a mount at
    '/foo' also matches '/foobar/x' - confirm whether that is intended.

    Args:
        path: The path (str or bytes) to look up.

    Returns:
        The mount point dict for `path`.
    """
    normalized = self.absnormpath(self._original_path(path))
    mounts = self.mount_points
    if normalized in mounts:
        return mounts[normalized]

    drive = self.splitdrive(normalized)[0]
    best_match = self._matching_string(normalized, '')
    for mount_key in mounts:
        candidate = self._matching_string(normalized, mount_key)
        on_same_drive = not drive or candidate.startswith(drive)
        if (on_same_drive and normalized.startswith(candidate)
                and len(candidate) > len(best_match)):
            best_match = candidate

    if best_match:
        # the keys of mount_points are str, so convert a bytes match back
        if not isinstance(best_match, str):
            best_match = best_match.decode(
                locale.getpreferredencoding(False))
        return mounts[best_match]

    mount_point = self._auto_mount_drive_if_needed(normalized, force=True)
    assert mount_point
    return mount_point
def set_disk_usage(self, total_size, path=None):
    """Change the total size of the file system, preserving the used space.

    Example usage: set the size of an auto-mounted Windows drive.

    Args:
        total_size: The new total size of the filesystem in bytes, or
            `None` to make the device unlimited.

        path: The disk space is changed for the file system device where
            `path` resides.
            Defaults to the root path (e.g. '/' on Unix systems).

    Raises:
        OSError: if the new space is smaller than the used size.
    """
    if path is None:
        path = self.root.name
    mount_point = self._mount_point_for_path(path)
    # The *new* size decides whether the used space still fits. Checking
    # the previous size instead let an unlimited device be shrunk below
    # its used size, and failed with TypeError for total_size=None.
    if total_size is not None and mount_point['used_size'] > total_size:
        self.raise_os_error(errno.ENOSPC, path)
    mount_point['total_size'] = total_size
def stat(self, entry_path, follow_symlinks=True):
    """Return the os.stat-like tuple for the FakeFile object of entry_path.

    Args:
        entry_path: Path to filesystem object to retrieve.
        follow_symlinks: If False and entry_path points to a symlink,
            the link itself is inspected instead of the linked object.

    Returns:
        A copy of the FakeStatResult object corresponding to entry_path.

    Raises:
        OSError: if the filesystem object doesn't exist.
    """
    target = self.resolve(entry_path, follow_symlinks,
                          allow_fd=True, check_read_perm=False)
    # for non-root users, looking up the parent directory makes sure
    # that stat raises if a parent dir is not readable
    parent = None if is_root() else target.parent_dir
    if parent:
        self.get_object(parent.path)

    self.raise_for_filepath_ending_with_separator(
        entry_path, target, follow_symlinks)

    # hand out a copy of the stat result
    return target.stat_result.copy()
def utime(self, path, times=None, *, ns=None, follow_symlinks=True):
    """Change the access and modified times of a file.

    Args:
        path: (str) Path to the file.
        times: 2-tuple of int or float numbers, of the form (atime, mtime)
            which is used to set the access and modified times in seconds.
            If None, both times are set to the current time.
        ns: 2-tuple of int numbers, of the form (atime, mtime) which is
            used to set the access and modified times in nanoseconds.
            If `None`, both times are set to the current time.
        follow_symlinks: If `False` and entry_path points to a symlink,
            the link itself is queried instead of the linked object.

    Raises:
        TypeError: If anything other than the expected types is
            specified in the passed `times` or `ns` tuple,
            or if the tuple length is not equal to 2.
        ValueError: If both times and ns are specified.
    """
    self._handle_utime_arg_errors(ns, times)

    target = self.resolve(path, follow_symlinks, allow_fd=True)
    if times is not None:
        if not all(isinstance(value, (int, float)) for value in times):
            raise TypeError('atime and mtime must be numbers')
        target.st_atime = times[0]
        target.st_mtime = times[1]
        return
    if ns is not None:
        if not all(isinstance(value, int) for value in ns):
            raise TypeError('atime and mtime must be ints')
        target.st_atime_ns = ns[0]
        target.st_mtime_ns = ns[1]
        return
    # neither given: use the current time for both
    now = time.time()
    target.st_atime = now
    target.st_mtime = now
def get_open_file(self, file_des):
    """Return an open file.

    Args:
        file_des: File descriptor of the open file.

    Raises:
        OSError: an invalid file descriptor (negative, out of range, or
            pointing to an entry that has been closed).
        TypeError: filedes is not an integer.

    Returns:
        Open file object.
    """
    if not is_int_type(file_des):
        raise TypeError('an integer is required')
    # A negative descriptor must be rejected explicitly - otherwise it
    # would index open_files from the end and return an unrelated file.
    if (file_des < 0 or file_des >= len(self.open_files) or
            self.open_files[file_des] is None):
        self.raise_os_error(errno.EBADF, str(file_des))
    # each entry is a list of wrappers; the first one is returned
    return self.open_files[file_des][0]
def normpath(self, path):
    """Mimic os.path.normpath using the specified path_separator.

    Normalizes the path using the path separator of this FakeFilesystem,
    but unlike the method absnormpath, does not make it absolute.
    Dot components (. and ..) are eliminated and repeated path separators
    (//) are combined. Initial .. components are left in place for
    relative paths and dropped for absolute paths.
    If the result is an empty path, '.' is returned instead.

    Alternative path separators are replaced by the path separator first
    (via `normcase`), and a drive part as found by `splitdrive` is kept
    in front of the result.

    Args:
        path: (str) The path to normalize.

    Returns:
        (str) A copy of path with empty components and dot components
        removed.
    """
    drive, tail = self.splitdrive(self.normcase(path))
    sep = self._path_separator(tail)
    dot = self._matching_string(tail, '.')
    dotdot = self._matching_string(tail, '..')
    rooted = tail.startswith(sep)

    kept = []
    for name in tail.split(sep):
        if not name or name == dot:
            continue
        if name != dotdot:
            kept.append(name)
        elif kept and kept[-1] != dotdot:
            # Remove an up-reference: directory/..
            kept.pop()
        elif not rooted:
            # Leading .. components survive in relative paths only
            kept.append(name)

    result = sep.join(kept)
    if rooted:
        result = sep + result
    return (drive + result) or dot
def splitpath(self, path):
    """Mimic os.path.split using the specified path_separator.

    Mimics os.path.split using the path_separator that was specified
    for this FakeFilesystem.

    Args:
        path: (str) The path to split.

    Returns:
        (str) A duple (pathname, basename) for which pathname does not
        end with a slash, and basename does not contain a slash.
        Both parts have the same type (str or bytes) as `path`.
    """
    path = self.normcase(path)
    sep = self._path_separator(path)
    colon = self._matching_string(path, ':')
    empty = self._matching_string(path, '')
    starts_with_drive = self._starts_with_drive_letter(path)

    path_components = path.split(sep)
    basename = path_components.pop()
    if not path_components:
        # The path contains no separator at all.
        if starts_with_drive:
            # Split at the first colon only, so that a further colon in
            # the name (e.g. 'C:foo:bar') stays part of the basename.
            drive, _, rest = basename.partition(colon)
            return (drive + colon, rest)
        return (empty, basename)

    if not any(path_components):
        # Root path. Collapse all leading separators.
        return (sep, basename)

    # The path is not the root; it contains a non-separator
    # component. Strip all trailing separators.
    while not path_components[-1]:
        path_components.pop()
    if (starts_with_drive and len(path_components) == 1 and
            path_components[0].endswith(colon)):
        # only the drive is left: keep it as a drive root ('C:' + sep)
        return (path_components[0] + sep, basename)
    return (sep.join(path_components), basename)
def joinpaths(self, *paths):
    """Mimic os.path.join using the specified path_separator.

    A single path is returned unchanged. For a Windows file system the
    join is delegated to `_join_paths_with_drive_support`.

    Args:
        *paths: (str) Zero or more paths to join.

    Returns:
        (str) The paths joined by the path separator, starting with
        the last absolute path in paths.
    """
    if sys.version_info >= (3, 6):
        paths = [os.fspath(item) for item in paths]
    if len(paths) == 1:
        return paths[0]
    if self.is_windows_fs:
        return self._join_paths_with_drive_support(*paths)

    first = paths[0]
    sep = self._path_separator(first)
    pieces = []
    for segment in paths:
        if self._starts_with_root_path(segment):
            # An absolute path discards everything collected so far
            pieces = [segment]
            continue
        if pieces and not pieces[-1].endswith(sep):
            pieces.append(sep)
        if segment:
            pieces.append(segment)
    return self._matching_string(first, '').join(pieces)
def _starts_with_drive_letter(self, file_path):
    """Return True if file_path starts with a drive letter.

    Args:
        file_path: the full path to be examined (str or bytes).

    Returns:
        `True` if drive letter support is enabled in the filesystem and
        the path starts with a drive letter, i.e. an alphabetic
        character followed by a colon.
    """
    colon = self._matching_string(file_path, ':')
    # isalpha must be *called*: the bare bound method is always truthy,
    # which made any first character pass as a drive letter.
    return (self.is_windows_fs and len(file_path) >= 2 and
            file_path[:1].isalpha() and file_path[1:2] == colon)
def _directory_content(self, directory, component):
    """Look up the entry named `component` in `directory`.

    An exact name match is preferred. In a case-insensitive file system,
    the first entry whose lower-cased name equals the lower-cased
    `component` is used as fallback.

    Args:
        directory: The object to search in; anything that is not a
            FakeDirectory yields no match.
        component: The entry name to look for.

    Returns:
        A tuple (name as stored in the directory, entry object), or
        (None, None) if there is no matching entry.
    """
    if not isinstance(directory, FakeDirectory):
        return None, None
    if component in directory.contents:
        return component, directory.contents[component]
    if self.is_case_sensitive:
        return None, None
    case_insensitive_matches = (
        (name, directory.contents[name])
        for name in directory.contents
        if name.lower() == component.lower())
    return next(case_insensitive_matches, (None, None))
1755 If all the elements are not consumed, they just get appended to the 1756 path resolved so far. 1757 This gives us the path which is as resolved as it can be, even if the 1758 file does not exist. 1759 1760 This behavior mimics Unix semantics, and is best shown by example. 1761 Given a file system that looks like this: 1762 1763 /a/b/ 1764 /a/b/c -> /a/b2 c is a symlink to /a/b2 1765 /a/b2/x 1766 /a/c -> ../d 1767 /a/x -> y 1768 1769 Then: 1770 /a/b/x => /a/b/x 1771 /a/c => /a/d 1772 /a/x => /a/y 1773 /a/b/c/d/e => /a/b2/d/e 1774 1775 Args: 1776 file_path: The path to examine. 1777 allow_fd: If `True`, `file_path` may be open file descriptor. 1778 raw_io: `True` if called from low-level I/O functions. 1779 1780 Returns: 1781 The resolved_path (string) or None. 1782 1783 Raises: 1784 TypeError: if `file_path` is `None`. 1785 OSError: if `file_path` is '' or a part of the path doesn't exist. 1786 """ 1787 1788 if allow_fd and isinstance(file_path, int): 1789 return self.get_open_file(file_path).get_object().path 1790 file_path = make_string_path(file_path) 1791 if file_path is None: 1792 # file.open(None) raises TypeError, so mimic that. 1793 raise TypeError('Expected file system path string, received None') 1794 if not file_path or not self._valid_relative_path(file_path): 1795 # file.open('') raises OSError, so mimic that, and validate that 1796 # all parts of a relative path exist. 
def _components_to_path(self, component_folders):
    """Join path components to a path that starts with a root path.

    Args:
        component_folders: List of path components (may be empty).

    Returns:
        The components joined by the path separator; the separator is
        prepended if `_starts_with_root_path` does not accept the
        joined path as it is.
    """
    if component_folders:
        # use a separator of the same type as the components
        sep = self._path_separator(component_folders[0])
    else:
        sep = self.path_separator
    joined = sep.join(component_folders)
    if self._starts_with_root_path(joined):
        return joined
    return sep + joined
def _valid_relative_path(self, file_path):
    """Check that every prefix in front of a '<sep>..' part exists.

    The path is cut at the last occurrence of the path separator followed
    by '..', and the remaining front part must exist in the fake
    filesystem; this is repeated until no such occurrence is left.
    For a Windows file system no check is done.

    Args:
        file_path: The path to check.

    Returns:
        `False` if one of the examined prefixes does not exist,
        `True` otherwise.
    """
    if self.is_windows_fs:
        return True
    up_reference = self._matching_string(
        file_path, self.path_separator + '..')
    remaining = file_path
    while remaining and up_reference in remaining:
        # keep only what precedes the last up-reference
        remaining = remaining.rpartition(up_reference)[0]
        if not self.exists(self.absnormpath(remaining)):
            return False
    return True
1883 1884 Raises: 1885 OSError: if there are too many levels of symbolic link 1886 """ 1887 link_path = link.contents 1888 # ignore UNC prefix for local files 1889 if self.is_windows_fs and link_path.startswith('\\\\?\\'): 1890 link_path = link_path[4:] 1891 sep = self._path_separator(link_path) 1892 # For links to absolute paths, we want to throw out everything 1893 # in the path built so far and replace with the link. For relative 1894 # links, we have to append the link to what we have so far, 1895 if not self._starts_with_root_path(link_path): 1896 # Relative path. Append remainder of path to what we have 1897 # processed so far, excluding the name of the link itself. 1898 # /a/b => ../c should yield /a/../c 1899 # (which will normalize to /c) 1900 # /a/b => d should yield a/d 1901 components = link_path_components[:-1] 1902 components.append(link_path) 1903 link_path = sep.join(components) 1904 # Don't call self.NormalizePath(), as we don't want to prepend 1905 # self.cwd. 1906 return self.normpath(link_path) 1907 1908 def get_object_from_normpath(self, file_path, check_read_perm=True): 1909 """Search for the specified filesystem object within the fake 1910 filesystem. 1911 1912 Args: 1913 file_path: Specifies target FakeFile object to retrieve, with a 1914 path that has already been normalized/resolved. 1915 check_read_perm: If True, raises OSError if a parent directory 1916 does not have read permission 1917 1918 Returns: 1919 The FakeFile object corresponding to file_path. 1920 1921 Raises: 1922 OSError: if the object is not found. 
def resolve(self, file_path, follow_symlinks=True, allow_fd=False,
            check_read_perm=True):
    """Look up a filesystem object, resolving the links in its path.

    Args:
        file_path: Specifies the target FakeFile object to retrieve.
        follow_symlinks: If `False`, the lookup is delegated to
            `lresolve()`, so a link in the last path component is
            returned itself instead of the object it points to.
        allow_fd: If `True`, `file_path` may be an open file descriptor.
        check_read_perm: If True, raises OSError if a parent directory
            does not have read permission.

    Returns:
        The FakeFile object corresponding to `file_path`.

    Raises:
        TypeError: if `file_path` is an int and `allow_fd` is not set.
        OSError: if the object is not found.
    """
    if isinstance(file_path, int):
        if not allow_fd:
            raise TypeError('path should be string, bytes or '
                            'os.PathLike (if supported), not int')
        return self.get_open_file(file_path).get_object()

    if not follow_symlinks:
        return self.lresolve(file_path)

    string_path = make_string_path(file_path)
    # NOTE(review): check_read_perm is handed to resolve_path() as its
    # second positional argument - confirm that this is the parameter
    # that is meant to receive it.
    resolved_path = self.resolve_path(string_path, check_read_perm)
    return self.get_object_from_normpath(resolved_path, check_read_perm)
def rename(self, old_file_path, new_file_path, force_replace=False):
    """Rename the filesystem object at old_file_path to new_file_path,
    preserving all properties.

    The object is removed from the directory containing `old_file_path`,
    gets the base name of `new_file_path` as its new name and is added
    to the directory containing `new_file_path`.

    Args:
        old_file_path: Path to filesystem object to rename.
        new_file_path: Path to where the filesystem object will live
            after this call.
        force_replace: If set and destination is an existing file, it
            will be replaced even under Windows if the user has
            permissions, otherwise replacement happens under Unix only.

    Raises:
        OSError: if old_file_path does not exist.
        OSError: if new_file_path is an existing directory
            (Windows, or Posix if old_file_path points to a regular file)
        OSError: if old_file_path is a directory and new_file_path a file
        OSError: if new_file_path is an existing file and force_replace
            not set (Windows only).
        OSError: if new_file_path is an existing file and could not be
            removed (Posix, or Windows with force_replace set).
        OSError: if dirname(new_file_path) does not exist.
        OSError: if the file would be moved to another filesystem
            (e.g. mount point).
    """
    # The trailing separator has to be detected on the path as given by
    # the caller, before the path is normalized below.
    ends_with_sep = self.ends_with_path_separator(old_file_path)
    old_file_path = self.absnormpath(old_file_path)
    new_file_path = self.absnormpath(new_file_path)
    if not self.exists(old_file_path, check_link=True):
        # NOTE(review): the meaning of the third argument (2) is defined
        # by raise_os_error() - confirm against its signature.
        self.raise_os_error(errno.ENOENT, old_file_path, 2)
    if ends_with_sep:
        self._handle_broken_link_with_trailing_sep(old_file_path)

    # lresolve() is used so that a link given as the source is renamed
    # itself, instead of the object it points to.
    old_object = self.lresolve(old_file_path)
    if not self.is_windows_fs:
        self._handle_posix_dir_link_errors(
            new_file_path, old_file_path, ends_with_sep)

    if self.exists(new_file_path, check_link=True):
        # The helper either raises, or returns the path to rename to,
        # or returns a false value if there is nothing to do.
        new_file_path = self._rename_to_existing_path(
            force_replace, new_file_path, old_file_path,
            old_object, ends_with_sep)

    if not new_file_path:
        return

    old_dir, old_name = self.splitpath(old_file_path)
    new_dir, new_name = self.splitpath(new_file_path)
    if not self.exists(new_dir):
        self.raise_os_error(errno.ENOENT, new_dir)
    old_dir_object = self.resolve(old_dir)
    new_dir_object = self.resolve(new_dir)
    # Source and target directory have to be located on the same device.
    if old_dir_object.st_dev != new_dir_object.st_dev:
        self.raise_os_error(errno.EXDEV, old_file_path)
    if not S_ISDIR(new_dir_object.st_mode):
        self.raise_os_error(
            errno.EACCES if self.is_windows_fs else errno.ENOTDIR,
            new_file_path)
    # presumably this prevents moving a directory into itself or into
    # one of its own subdirectories - verify against has_parent_object()
    if new_dir_object.has_parent_object(old_object):
        self.raise_os_error(errno.EINVAL, new_file_path)

    # Move the entry: detach it from the old directory, rename it and
    # attach it to the new directory.
    object_to_rename = old_dir_object.get_entry(old_name)
    old_dir_object.remove_entry(old_name, recursive=False)
    object_to_rename.name = new_name
    # The lookup of an existing entry uses the entry name as normalized
    # by the target directory.
    new_name = new_dir_object._normalized_entryname(new_name)
    if new_name in new_dir_object.contents:
        # in case of overwriting remove the old entry first
        new_dir_object.remove_entry(new_name)
    new_dir_object.add_entry(object_to_rename)
def _rename_same_object(self, new_file_path, old_file_path):
    """Decide how to rename if both paths refer to the same object.

    Args:
        new_file_path: The path to rename to.
        old_file_path: The path to rename from.

    Returns:
        The path that shall be used as the rename target, or `None`
        if no rename is to be done. If the paths differ only by case,
        `new_file_path` is returned unchanged. If the need for a rename
        is found only after resolving both paths, the returned path is
        composed of the original path of the parent of `new_file_path`
        and its file name.
    """
    # Paths that differ only by case are always renamed.
    do_rename = old_file_path.lower() == new_file_path.lower()
    if not do_rename:
        try:
            real_old_path = self.resolve_path(old_file_path)
            original_old_path = self._original_path(real_old_path)
            real_new_path = self.resolve_path(new_file_path)
            # The first branch is taken if the new path resolves to the
            # original path of the old object, and the new path and the
            # resolved old path are either identical or differ by more
            # than case.
            if (real_new_path == original_old_path and
                    (new_file_path == real_old_path) ==
                    (new_file_path.lower() ==
                     real_old_path.lower())):
                real_object = self.resolve(old_file_path,
                                           follow_symlinks=False)
                # Under macOS the rename is done only if the base name
                # of the old path equals the name of the object itself.
                do_rename = (os.path.basename(old_file_path) ==
                             real_object.name or not self.is_macos)
            else:
                do_rename = (real_new_path.lower() ==
                             real_old_path.lower())
            if do_rename:
                # only case is changed in case-insensitive file
                # system - do the rename
                parent, file_name = self.splitpath(new_file_path)
                new_file_path = self.joinpaths(
                    self._original_path(parent), file_name)
        except OSError:
            # resolve_path() may fail due to symlink loop issues or
            # similar - in this case just assume the paths
            # to be different
            pass
        if not do_rename:
            # hard links to the same file - nothing to do
            new_file_path = None
    return new_file_path
def create_dir(self, directory_path, perm_bits=PERM_DEF):
    """Create `directory_path`, and all the parent directories.

    Helper method to set up your test faster.

    Args:
        directory_path: The full directory path to create.
        perm_bits: The permission bits as set by `chmod`.

    Returns:
        The newly created FakeDirectory object.

    Raises:
        OSError: if the directory already exists.
        OSError: (with errno=ENOTDIR) if an existing component of the
            path is not a directory.
    """
    directory_path = self.make_string_path(directory_path)
    directory_path = self.absnormpath(directory_path)
    self._auto_mount_drive_if_needed(directory_path)
    if self.exists(directory_path, check_link=True):
        self.raise_os_error(errno.EEXIST, directory_path)
    path_components = self._path_components(directory_path)
    current_dir = self.root

    new_dirs = []
    for component in path_components:
        directory = self._directory_content(current_dir, component)[1]
        if not directory:
            new_dir = FakeDirectory(component, filesystem=self)
            new_dirs.append(new_dir)
            current_dir.add_entry(new_dir)
            current_dir = new_dir
        else:
            if S_ISLNK(directory.st_mode):
                directory = self.resolve(directory.contents)
            current_dir = directory
            # S_ISDIR compares the complete file type field. Testing
            # only the S_IFDIR bit would also accept sockets and block
            # devices, as their type values include that bit.
            if not S_ISDIR(directory.st_mode):
                self.raise_os_error(errno.ENOTDIR, current_dir.path)

    # set the permission after creating the directories
    # to allow directory creation inside a read-only directory
    for new_dir in new_dirs:
        new_dir.st_mode = S_IFDIR | perm_bits

    return current_dir
def add_real_file(self, source_path, read_only=True, target_path=None):
    """Add a fake file that is backed by an existing real file, creating
    all the parent directories along the way. The contents of the real
    file are read only on demand.

    Args:
        source_path: Path to an existing file in the real file system.
        read_only: If `True` (the default), writing to the fake file
            raises an exception. Otherwise, writing to the file changes
            the fake file only.
        target_path: If given, the path of the file in the fake
            filesystem, otherwise it is equal to `source_path`.

    Returns:
        the newly created fake file object.

    Raises:
        OSError: if the file does not exist in the real file system.
        OSError: if the file already exists in the fake file system.

    .. note:: On most systems, accessing the fake file's contents may
        update both the real and fake files' `atime` (access time).
        In this particular case, `add_real_file()` violates the rule
        that `pyfakefs` must not modify the real file system.
    """
    if not target_path:
        # the default is taken from the path as passed by the caller
        target_path = source_path
    source_path = make_string_path(source_path)
    target_path = self.make_string_path(target_path)
    source_stat = os.stat(source_path)
    new_file = self.create_file_internally(
        target_path, read_from_real_fs=True)

    # take over the stat values of the real file
    new_file.stat_result.set_from_stat_result(source_stat)
    if read_only:
        # for read-only mode, remove the write/executable permission bits
        new_file.st_mode &= 0o777444
    new_file.file_path = source_path
    self.change_disk_usage(
        new_file.size, new_file.name, new_file.st_dev)
    return new_file
def add_real_directory(self, source_path, read_only=True, lazy_read=True,
                       target_path=None):
    """Map a directory of the real file system into the fake filesystem,
    including the entries of the real directory. Symlinks are supported.

    Args:
        source_path: The path to the existing directory.
        read_only: If set, all files under the directory are treated as
            read-only, e.g. a write access raises an exception;
            otherwise, writing to the files changes the fake files only
            as usually.
        lazy_read: If set (default), directory contents are only read when
            accessed, and only until the needed subdirectory level.

            .. note:: This means that the file system size is only updated
              at the time the directory contents are read; set this to
              `False` only if you are dependent on accurate file system
              size in your test
        target_path: If given, the target directory, otherwise,
            the target directory is the same as `source_path`.

    Returns:
        the newly created FakeDirectory object.

    Raises:
        OSError: if the directory does not exist in the real file system.
        OSError: if the directory already exists in the fake file system.
    """
    source_path = self._path_without_trailing_separators(source_path)
    if not os.path.exists(source_path):
        self.raise_os_error(errno.ENOENT, source_path)
    target_path = target_path or source_path

    if lazy_read:
        # Only the directory object itself is added to its (possibly
        # newly created) parent directory here.
        parent_path = os.path.split(target_path)[0]
        parent_dir = (self.get_object(parent_path)
                      if self.exists(parent_path)
                      else self.create_dir(parent_path))
        new_dir = FakeDirectoryFromRealDirectory(
            source_path, self, read_only, target_path)
        parent_dir.add_entry(new_dir)
        return new_dir

    # Eager mode: walk the real directory tree and add each entry now.
    new_dir = self.create_dir(target_path)
    for real_base, _, file_names in os.walk(source_path):
        fake_base = os.path.join(
            new_dir.path, os.path.relpath(real_base, source_path))
        # first all symlinks found in the directory listing...
        for entry_name in os.listdir(real_base):
            real_entry = os.path.join(real_base, entry_name)
            if os.path.islink(real_entry):
                self.add_real_symlink(
                    real_entry, os.path.join(fake_base, entry_name))
        # ...then all files that are not symlinks
        for file_name in file_names:
            real_file = os.path.join(real_base, file_name)
            if not os.path.islink(real_file):
                self.add_real_file(
                    real_file, read_only,
                    os.path.join(fake_base, file_name))
    return new_dir
def create_file_internally(self, file_path,
                           st_mode=S_IFREG | PERM_DEF_FILE,
                           contents='', st_size=None,
                           create_missing_dirs=True,
                           apply_umask=False, encoding=None, errors=None,
                           read_from_real_fs=False, raw_io=False,
                           side_effect=None):
    """Create a fake file - either a normal fake file, or a fake file
    that is based on a real file - and add it to the filesystem.

    Args:
        file_path: path to the file to create.
        st_mode: the stat.S_IF constant representing the file type.
        contents: the contents of the file. If not given and st_size is
            None, an empty file is assumed.
        st_size: file size; only valid if contents not given. If given,
            the file is considered to be in "large file mode" and trying
            to read from or write to the file will result in an exception.
        create_missing_dirs: if True, auto create missing directories.
        apply_umask: whether or not the current umask must be applied
            on st_mode.
        encoding: if contents is a unicode string, the encoding used for
            serialization.
        errors: the error mode used for encoding/decoding errors
        read_from_real_fs: if True, the contents are read from the real
            file system on demand.
        raw_io: `True` if called from low-level API (`os.open`);
            not used inside of this method.
        side_effect: function handle that is executed when file is written,
            must accept the file object as an argument.

    Returns:
        The newly created file object.

    Raises:
        TypeError: if `st_mode` is rejected by `is_int_type()`.
        OSError: if the file already exists.
        OSError: if the parent directory is missing and
            `create_missing_dirs` is not set.
    """
    file_path = self.absnormpath(self.make_string_path(file_path))
    if not is_int_type(st_mode):
        raise TypeError(
            'st_mode must be of int type - did you mean to set contents?')
    if self.exists(file_path, check_link=True):
        self.raise_os_error(errno.EEXIST, file_path)

    parent_path, file_name = self.splitpath(file_path)
    parent_path = parent_path or self.cwd
    self._auto_mount_drive_if_needed(parent_path)
    if self.exists(parent_path):
        parent_path = self._original_path(parent_path)
    else:
        if not create_missing_dirs:
            self.raise_os_error(errno.ENOENT, parent_path)
        self.create_dir(parent_path)

    if apply_umask:
        st_mode = st_mode & ~self.umask
    if read_from_real_fs:
        new_object = FakeFileFromRealFile(file_path, filesystem=self,
                                          side_effect=side_effect)
    else:
        new_object = FakeFile(file_name, st_mode, filesystem=self,
                              encoding=encoding, errors=errors,
                              side_effect=side_effect)
    self.add_object(parent_path, new_object)

    if read_from_real_fs:
        # nothing to initialize - the contents are read on demand
        return new_object

    try:
        if st_size is None:
            # no contents at all are handled as an empty file
            new_object._set_initial_contents(
                '' if contents is None else contents)
        else:
            new_object.set_large_file_size(st_size)
    except OSError:
        # do not leave the file behind if it could not be initialized
        self.remove_object(file_path)
        raise
    return new_object
def link(self, old_path, new_path, follow_symlinks=True):
    """Create a hard link at new_path, pointing at old_path.

    Args:
        old_path: An existing link to the target file.
        new_path: The destination path to create a new link at.
        follow_symlinks: If False and old_path is a symlink, link the
            symlink instead of the object it points to.

    Returns:
        The FakeFile object referred to by old_path.

    Raises:
        OSError: if something already exists at new_path.
        OSError: if old_path is a directory.
        OSError: if the parent directory doesn't exist.
        OSError: if old_path ends with a path separator, or new_path
            ends with a path separator (Posix only), or old_path
            cannot be resolved.
    """
    new_path_normalized = self.absnormpath(new_path)
    if self.exists(new_path_normalized, check_link=True):
        self.raise_os_error(errno.EEXIST, new_path)

    new_parent_directory, new_basename = self.splitpath(
        new_path_normalized)
    if not new_parent_directory:
        new_parent_directory = self.cwd

    if not self.exists(new_parent_directory):
        self.raise_os_error(errno.ENOENT, new_parent_directory)

    if self.ends_with_path_separator(old_path):
        error = errno.EINVAL if self.is_windows_fs else errno.ENOTDIR
        self.raise_os_error(error, old_path)

    if not self.is_windows_fs and self.ends_with_path_separator(new_path):
        self.raise_os_error(errno.ENOENT, old_path)

    # Retrieve the target file
    try:
        old_file = self.resolve(old_path, follow_symlinks=follow_symlinks)
    except OSError:
        self.raise_os_error(errno.ENOENT, old_path)

    # S_ISDIR compares the complete file type field. Testing only the
    # S_IFDIR bit would also reject sockets and block devices, as their
    # type values include that bit.
    if S_ISDIR(old_file.st_mode):
        self.raise_os_error(
            errno.EACCES if self.is_windows_fs else errno.EPERM, old_path)

    # abuse the name field to control the filename of the
    # newly created link
    old_file.name = new_basename
    self.add_object(new_parent_directory, old_file)
    return old_file
2699 2700 Raises: 2701 TypeError: if path is None 2702 OSError: (with errno=ENOENT) if path is not a valid path, or 2703 (with errno=EINVAL) if path is valid, but is not a symlink, 2704 or if the path ends with a path separator (Posix only) 2705 """ 2706 if path is None: 2707 raise TypeError 2708 link_obj = self.lresolve(path) 2709 if S_IFMT(link_obj.st_mode) != S_IFLNK: 2710 self.raise_os_error(errno.EINVAL, path) 2711 2712 if self.ends_with_path_separator(path): 2713 if not self.is_windows_fs and self.exists(path): 2714 self.raise_os_error(errno.EINVAL, path) 2715 if not self.exists(link_obj.path): 2716 if self.is_windows_fs: 2717 error = errno.EINVAL 2718 elif self._is_circular_link(link_obj): 2719 if self.is_macos: 2720 return link_obj.path 2721 error = errno.ELOOP 2722 else: 2723 error = errno.ENOENT 2724 self.raise_os_error(error, link_obj.path) 2725 2726 return link_obj.contents 2727 2728 def makedir(self, dir_name, mode=PERM_DEF): 2729 """Create a leaf Fake directory. 2730 2731 Args: 2732 dir_name: (str) Name of directory to create. 2733 Relative paths are assumed to be relative to '/'. 2734 mode: (int) Mode to create directory with. This argument defaults 2735 to 0o777. The umask is applied to this mode. 2736 2737 Raises: 2738 OSError: if the directory name is invalid or parent directory is 2739 read only or as per :py:meth:`add_object`. 
def makedirs(self, dir_name, mode=PERM_DEF, exist_ok=False):
    """Create a leaf Fake directory and create any non-existent
    parent dirs.

    Args:
        dir_name: (str) Name of directory to create.
        mode: (int) Mode to create directory (and any necessary parent
            directories) with. This argument defaults to 0o777.
            The umask is applied to this mode.
        exist_ok: (boolean) If exist_ok is False (the default), an OSError
            is raised if the target directory already exists.

    Raises:
        OSError: if the directory already exists and exist_ok=False,
            or as per :py:meth:`create_dir`.
    """
    if not dir_name:
        self.raise_os_error(errno.ENOENT, '')
    dir_name = to_string(dir_name)
    # The trailing separator is lost by normalization, so remember it.
    ends_with_sep = self.ends_with_path_separator(dir_name)
    dir_name = self.absnormpath(dir_name)
    if (ends_with_sep and self.is_macos and
            self.exists(dir_name, check_link=True) and
            not self.exists(dir_name)):
        # to avoid EEXIST exception, remove the link
        self.remove_object(dir_name)

    # NOTE: a former scan over the existing path components was removed
    # here: its result was never used, so it performed no permission
    # check, and it could fail with a TypeError on non-dict contents.
    # Permission problems are reported by create_dir() and propagated
    # below.
    try:
        self.create_dir(dir_name, mode & ~self.umask)
    except OSError as e:
        if e.errno == errno.EACCES:
            # permission denied - propagate exception
            raise
        # Any other error is only swallowed if the caller accepts an
        # existing target and the target really is a directory.
        if (not exist_ok or
                not isinstance(self.resolve(dir_name), FakeDirectory)):
            if self.is_windows_fs and e.errno == errno.ENOTDIR:
                e.errno = errno.ENOENT
            self.raise_os_error(e.errno, e.filename)
def confirmdir(self, target_directory):
    """Test that the target is actually a directory, raising OSError
    if not.

    Args:
        target_directory: Path to the target directory within the fake
            filesystem.

    Returns:
        The FakeDirectory object corresponding to target_directory.

    Raises:
        OSError: if the target is not a directory.
    """
    directory = self.resolve(target_directory)
    # Compare the file type field instead of testing a single bit:
    # the S_IFDIR bit is also set in S_IFSOCK and S_IFBLK modes, so a
    # plain `st_mode & S_IFDIR` accepts sockets and block devices.
    if not S_ISDIR(directory.st_mode):
        self.raise_os_error(errno.ENOTDIR, target_directory, 267)
    return directory
def listdir(self, target_directory):
    """List the names of the entries in a fake directory.

    Args:
        target_directory: Path to the target directory within the
            fake filesystem.

    Returns:
        A list of file names within the target directory in arbitrary
        order.

    Raises:
        OSError: if the target is not a directory.
    """
    resolved_path = self.resolve_path(target_directory, allow_fd=True)
    entries = self.confirmdir(resolved_path).contents
    return [entry_name for entry_name in entries.keys()]
class FakePathModule:
    """Faked os.path module replacement.

    FakePathModule should *only* be instantiated by FakeOsModule.  See the
    FakeOsModule docstring for details.
    """
    # Result of `_copy_module(os.path)`; shared by all instances and used
    # for every call that is not faked here (see `__getattr__`).
    _OS_PATH_COPY = _copy_module(os.path)

    @staticmethod
    def dir():
        """Return the list of patched function names. Used for patching
        functions imported from the module.
        """
        return [
            'abspath', 'dirname', 'exists', 'expanduser', 'getatime',
            'getctime', 'getmtime', 'getsize', 'isabs', 'isdir', 'isfile',
            'islink', 'ismount', 'join', 'lexists', 'normcase', 'normpath',
            'realpath', 'relpath', 'split', 'splitdrive', 'samefile'
        ]

    def __init__(self, filesystem, os_module):
        """Init.

        Args:
            filesystem: FakeFilesystem used to provide file system
                information
            os_module: the fake `os` module this path module belongs to;
                it is also set as the `os` attribute of the shared
                os.path copy.
        """
        self.filesystem = filesystem
        self._os_path = self._OS_PATH_COPY
        self._os_path.os = self.os = os_module
        self.sep = self.filesystem.path_separator
        self.altsep = self.filesystem.alternative_path_separator

    def exists(self, path):
        """Determine whether the file object exists within the fake
        filesystem.

        Args:
            path: The path to the file object.

        Returns:
            (bool) `True` if the file exists.
        """
        return self.filesystem.exists(path)

    def lexists(self, path):
        """Test whether a path exists. Returns True for broken symbolic
        links.

        Args:
            path: path to the symlink object.

        Returns:
            bool (if file exists).
        """
        return self.filesystem.exists(path, check_link=True)

    def getsize(self, path):
        """Return the file object size in bytes.

        Args:
            path: path to the file object.

        Returns:
            file size in bytes.

        Raises:
            OSError: if `path` ends with a path separator but does not
                point to a directory (EINVAL for a Windows file system,
                ENOTDIR otherwise), or as raised by resolving the path.
        """
        file_obj = self.filesystem.resolve(path)
        if (self.filesystem.ends_with_path_separator(path) and
                S_IFMT(file_obj.st_mode) != S_IFDIR):
            error_nr = (errno.EINVAL if self.filesystem.is_windows_fs
                        else errno.ENOTDIR)
            self.filesystem.raise_os_error(error_nr, path)
        return file_obj.st_size

    def isabs(self, path):
        """Return True if path is an absolute pathname."""
        if self.filesystem.is_windows_fs:
            # the drive part is irrelevant for the check
            path = self.splitdrive(path)[1]
        path = make_string_path(path)
        sep = self.filesystem._path_separator(path)
        altsep = self.filesystem._alternative_path_separator(path)
        if self.filesystem.is_windows_fs:
            return len(path) > 0 and path[:1] in (sep, altsep)
        else:
            return (path.startswith(sep) or
                    altsep is not None and path.startswith(altsep))

    def isdir(self, path):
        """Determine if path identifies a directory."""
        return self.filesystem.isdir(path)

    def isfile(self, path):
        """Determine if path identifies a regular file."""
        return self.filesystem.isfile(path)

    def islink(self, path):
        """Determine if path identifies a symbolic link.

        Args:
            path: Path to filesystem object.

        Returns:
            `True` if path points to a symbolic link.

        Raises:
            TypeError: if path is None.
        """
        return self.filesystem.islink(path)

    def getmtime(self, path):
        """Returns the modification time of the fake file.

        Args:
            path: the path to fake file.

        Returns:
            (int, float) the modification time of the fake file
                         in number of seconds since the epoch.

        Raises:
            OSError: if the file does not exist.
        """
        try:
            file_obj = self.filesystem.resolve(path)
            return file_obj.st_mtime
        except OSError:
            # report the offending path together with the error
            self.filesystem.raise_os_error(errno.ENOENT, path, winerror=3)

    def getatime(self, path):
        """Returns the last access time of the fake file.

        Note: Access time is not set automatically in fake filesystem
            on access.

        Args:
            path: the path to fake file.

        Returns:
            (int, float) the access time of the fake file in number of
                seconds since the epoch.

        Raises:
            OSError: if the file does not exist.
        """
        try:
            file_obj = self.filesystem.resolve(path)
        except OSError:
            # report the offending path together with the error
            self.filesystem.raise_os_error(errno.ENOENT, path)
        return file_obj.st_atime

    def getctime(self, path):
        """Returns the creation time of the fake file.

        Args:
            path: the path to fake file.

        Returns:
            (int, float) the creation time of the fake file in number of
                seconds since the epoch.

        Raises:
            OSError: if the file does not exist.
        """
        try:
            file_obj = self.filesystem.resolve(path)
        except OSError:
            # report the offending path together with the error
            self.filesystem.raise_os_error(errno.ENOENT, path)
        return file_obj.st_ctime

    def abspath(self, path):
        """Return the absolute version of a path."""

        def getcwd():
            """Return the current working directory."""
            # pylint: disable=undefined-variable
            if isinstance(path, bytes):
                return self.os.getcwdb()
            else:
                return self.os.getcwd()

        path = make_string_path(path)
        sep = self.filesystem._path_separator(path)
        altsep = self.filesystem._alternative_path_separator(path)
        starts_with_sep = (
            path.startswith(sep) or
            altsep is not None and path.startswith(altsep))
        if not self.isabs(path):
            path = self.join(getcwd(), path)
        elif self.filesystem.is_windows_fs and starts_with_sep:
            # Windows only: a rooted path without drive gets the drive
            # of the current working directory.  The separator checks
            # are grouped explicitly so that the alternative separator
            # is also only considered for Windows file systems.
            cwd = getcwd()
            if self.filesystem._starts_with_drive_letter(cwd):
                path = self.join(cwd[:2], path)
        return self.normpath(path)

    def join(self, *p):
        """Return the completed path with a separator of the parts."""
        return self.filesystem.joinpaths(*p)

    def split(self, path):
        """Split the path into the directory and the filename of the path.
        """
        return self.filesystem.splitpath(path)

    def splitdrive(self, path):
        """Split the path into the drive part and the rest of the path, if
        supported."""
        return self.filesystem.splitdrive(path)

    def normpath(self, path):
        """Normalize path, eliminating double slashes, etc."""
        return self.filesystem.normpath(path)

    def normcase(self, path):
        """Convert to lower case under windows, replaces additional path
        separator."""
        path = self.filesystem.normcase(path)
        if self.filesystem.is_windows_fs:
            path = path.lower()
        return path

    def relpath(self, path, start=None):
        """We mostly rely on the native implementation and adapt the
        path separator.

        Raises:
            ValueError: if `path` is empty.
        """
        if not path:
            raise ValueError("no path specified")
        path = make_string_path(path)
        if start is not None:
            start = make_string_path(start)
        else:
            start = self.filesystem.cwd
        # Convert both paths to the separator of the os.path copy,
        # as the native relpath() is used for the calculation.
        if self.filesystem.alternative_path_separator is not None:
            path = path.replace(self.filesystem.alternative_path_separator,
                                self._os_path.sep)
            start = start.replace(
                self.filesystem.alternative_path_separator,
                self._os_path.sep)
        path = path.replace(self.filesystem.path_separator,
                            self._os_path.sep)
        start = start.replace(
            self.filesystem.path_separator, self._os_path.sep)
        path = self._os_path.relpath(path, start)
        # ... and convert the result back to the fake separator
        return path.replace(self._os_path.sep,
                            self.filesystem.path_separator)

    def realpath(self, filename):
        """Return the canonical path of the specified filename, eliminating
        any symbolic links encountered in the path.
        """
        if self.filesystem.is_windows_fs:
            return self.abspath(filename)
        filename = make_string_path(filename)
        # The "resolved" flag is not needed here: the path is returned
        # regardless of whether a symlink loop was encountered.
        path, _ = self._joinrealpath(filename[:0], filename, {})
        return self.abspath(path)

    def samefile(self, path1, path2):
        """Return whether path1 and path2 point to the same file.

        Args:
            path1: first file path or path object (Python >=3.6)
            path2: second file path or path object (Python >=3.6)

        Returns:
            `True` if both paths have the same `st_ino` and `st_dev`.

        Raises:
            OSError: if one of the paths does not point to an existing
                file system object.
        """
        stat1 = self.filesystem.stat(path1)
        stat2 = self.filesystem.stat(path2)
        return (stat1.st_ino == stat2.st_ino and
                stat1.st_dev == stat2.st_dev)

    def _joinrealpath(self, path, rest, seen):
        """Join two paths, normalizing and eliminating any symbolic links
        encountered in the second path.
        Taken from Python source and adapted.

        Args:
            path: the already resolved part of the path.
            rest: the part of the path that still has to be resolved.
            seen: dictionary mapping each symlink path visited so far to
                its resolved path, or to `None` while it is being
                resolved.

        Returns:
            A tuple of the joined path and a flag that is `False` if a
            symlink loop was detected.
        """
        curdir = self.filesystem._matching_string(path, '.')
        pardir = self.filesystem._matching_string(path, '..')

        sep = self.filesystem._path_separator(path)
        if self.isabs(rest):
            rest = rest[1:]
            path = sep

        while rest:
            name, _, rest = rest.partition(sep)
            if not name or name == curdir:
                # current dir
                continue
            if name == pardir:
                # parent dir
                if path:
                    path, name = self.filesystem.splitpath(path)
                    if name == pardir:
                        path = self.filesystem.joinpaths(
                            path, pardir, pardir)
                else:
                    path = pardir
                continue
            newpath = self.filesystem.joinpaths(path, name)
            if not self.filesystem.islink(newpath):
                path = newpath
                continue
            # Resolve the symbolic link
            if newpath in seen:
                # Already seen this path
                path = seen[newpath]
                if path is not None:
                    # use cached value
                    continue
                # The symlink is not resolved, so we must have a symlink
                # loop. Return already resolved part + rest of the path
                # unchanged.
                return self.filesystem.joinpaths(newpath, rest), False
            seen[newpath] = None  # not resolved symlink
            path, ok = self._joinrealpath(
                path, self.filesystem.readlink(newpath), seen)
            if not ok:
                return self.filesystem.joinpaths(path, rest), False
            seen[newpath] = path  # resolved symlink
        return path, True

    def dirname(self, path):
        """Returns the first part of the result of `split()`."""
        return self.split(path)[0]

    def expanduser(self, path):
        """Return the argument with an initial component of ~ or ~user
        replaced by that user's home directory.

        The expansion is done by the os.path copy; its separator is
        replaced by the separator of the fake filesystem in the result.
        """
        return self._os_path.expanduser(path).replace(
            self._os_path.sep, self.sep)

    def ismount(self, path):
        """Return true if the given path is a mount point.

        Args:
            path: Path to filesystem object to be checked

        Returns:
            `True` if path is a mount point added to the fake file system.
            Under Windows also returns True for drive and UNC roots
            (independent of their existence).
        """
        path = make_string_path(path)
        if not path:
            return False
        normed_path = self.filesystem.absnormpath(path)
        sep = self.filesystem._path_separator(path)
        if self.filesystem.is_windows_fs:
            if self.filesystem.alternative_path_separator is not None:
                path_seps = (
                    sep, self.filesystem._alternative_path_separator(path)
                )
            else:
                path_seps = (sep,)
            drive, rest = self.filesystem.splitdrive(normed_path)
            if drive and drive[:1] in path_seps:
                # drive part starting with a separator
                return (not rest) or (rest in path_seps)
            if rest in path_seps:
                return True
        for mount_point in self.filesystem.mount_points:
            if normed_path.rstrip(sep) == mount_point.rstrip(sep):
                return True
        return False

    def __getattr__(self, name):
        """Forwards any non-faked calls to the real os.path."""
        return getattr(self._os_path, name)
def __init__(self, filesystem):
    """Also exposes self.path (to fake os.path).

    Args:
        filesystem: FakeFilesystem used to provide file system information
    """
    self.filesystem = filesystem
    # Separators are taken over from the fake filesystem.
    self.sep = filesystem.path_separator
    self.altsep = filesystem.alternative_path_separator
    self.linesep = filesystem.line_separator()
    # Reference to the real `os` module.
    self._os_module = os
    # The fake `os.path` gets a back reference to this fake `os` module.
    self.path = FakePathModule(self.filesystem, self)
    # `devnull` is set on the class, not on the instance.
    # NOTE(review): both branches of the conditional yield '/dev/nul' -
    # looks unintended; confirm against the name used for the filesystem's
    # null device before changing the value.
    self.__class__.devnull = ('/dev/nul' if filesystem.is_windows_fs
                              else '/dev/nul')
3495 """ 3496 if not is_int_type(fd): 3497 raise TypeError('an integer is required') 3498 return FakeFileOpen(self.filesystem)(fd, *args, **kwargs) 3499 3500 def _umask(self): 3501 """Return the current umask.""" 3502 if self.filesystem.is_windows_fs: 3503 # windows always returns 0 - it has no real notion of umask 3504 return 0 3505 if sys.platform == 'win32': 3506 # if we are testing Unix under Windows we assume a default mask 3507 return 0o002 3508 else: 3509 # under Unix, we return the real umask; 3510 # as there is no pure getter for umask, so we have to first 3511 # set a mode to get the previous one and then re-set that 3512 mask = os.umask(0) 3513 os.umask(mask) 3514 return mask 3515 3516 def open(self, path, flags, mode=None, *, dir_fd=None): 3517 """Return the file descriptor for a FakeFile. 3518 3519 Args: 3520 path: the path to the file 3521 flags: low-level bits to indicate io operation 3522 mode: bits to define default permissions 3523 Note: only basic modes are supported, OS-specific modes are 3524 ignored 3525 dir_fd: If not `None`, the file descriptor of a directory, 3526 with `file_path` being relative to this directory. 3527 3528 Returns: 3529 A file descriptor. 
3530 3531 Raises: 3532 OSError: if the path cannot be found 3533 ValueError: if invalid mode is given 3534 NotImplementedError: if `os.O_EXCL` is used without `os.O_CREAT` 3535 """ 3536 path = self._path_with_dir_fd(path, self.open, dir_fd) 3537 if mode is None: 3538 if self.filesystem.is_windows_fs: 3539 mode = 0o666 3540 else: 3541 mode = 0o777 & ~self._umask() 3542 3543 has_tmpfile_flag = (hasattr(os, 'O_TMPFILE') and 3544 flags & getattr(os, 'O_TMPFILE')) 3545 open_modes = _OpenModes( 3546 must_exist=not flags & os.O_CREAT and not has_tmpfile_flag, 3547 can_read=not flags & os.O_WRONLY, 3548 can_write=flags & (os.O_RDWR | os.O_WRONLY) != 0, 3549 truncate=flags & os.O_TRUNC != 0, 3550 append=flags & os.O_APPEND != 0, 3551 must_not_exist=flags & os.O_EXCL != 0 3552 ) 3553 if open_modes.must_not_exist and open_modes.must_exist: 3554 raise NotImplementedError( 3555 'O_EXCL without O_CREAT mode is not supported') 3556 if has_tmpfile_flag: 3557 # this is a workaround for tempfiles that do not have a filename 3558 # as we do not support this directly, we just add a unique filename 3559 # and set the file to delete on close 3560 path = self.filesystem.joinpaths( 3561 path, str(uuid.uuid4())) 3562 3563 if (not self.filesystem.is_windows_fs and 3564 self.filesystem.exists(path)): 3565 # handle opening directory - only allowed under Posix 3566 # with read-only mode 3567 obj = self.filesystem.resolve(path) 3568 if isinstance(obj, FakeDirectory): 3569 if ((not open_modes.must_exist and 3570 not self.filesystem.is_macos) 3571 or open_modes.can_write): 3572 self.filesystem.raise_os_error(errno.EISDIR, path) 3573 dir_wrapper = FakeDirWrapper(obj, path, self.filesystem) 3574 file_des = self.filesystem._add_open_file(dir_wrapper) 3575 dir_wrapper.filedes = file_des 3576 return file_des 3577 3578 # low level open is always binary 3579 str_flags = 'b' 3580 delete_on_close = has_tmpfile_flag 3581 if hasattr(os, 'O_TEMPORARY'): 3582 delete_on_close = flags & os.O_TEMPORARY == 
def read(self, fd, n):
    """Read up to `n` bytes from the open file with descriptor `fd`.

    Args:
        fd: An integer file descriptor for the file object requested.
        n: Number of bytes to read from file.

    Returns:
        Bytes read from file.

    Raises:
        OSError: bad file descriptor.
        TypeError: if file descriptor is not an integer.
    """
    opened_file = self.filesystem.get_open_file(fd)
    # switch the handle to raw I/O before reading through it
    opened_file.raw_io = True
    data = opened_file.read(n)
    return data
def fstat(self, fd):
    """Return the os.stat-like result for the open file with the given
    file descriptor.

    Args:
        fd: The file descriptor of filesystem object to retrieve.

    Returns:
        A copy of the stat result of the file object behind `fd`.

    Raises:
        OSError: if the filesystem object doesn't exist.
    """
    open_file = self.filesystem.get_open_file(fd)
    # hand out a copy of the stat result of the underlying file object
    return open_file.get_object().stat_result.copy()
def chdir(self, path):
    """Change current working directory to target directory.

    Args:
        path: The path to new current working directory. A file
            descriptor is accepted as well (`allow_fd=True` is passed
            to the path resolution).

    Raises:
        OSError: (errno.EACCES) if the user is not root and the target
            directory lacks the `PERM_EXE` permission bit.
        OSError: as raised by `FakeFilesystem.confirmdir` if the target
            is not a directory.
    """
    path = self.filesystem.resolve_path(path, allow_fd=True)
    self.filesystem.confirmdir(path)
    directory = self.filesystem.resolve(path)
    # A full implementation would check permissions all the way
    # up the tree.
    # The permission bit has to be masked with `&`; `st_mode | PERM_EXE`
    # is never zero, which silently disabled this check.
    if not is_root() and not directory.st_mode & PERM_EXE:
        # report the offending path, not the directory object
        self.filesystem.raise_os_error(errno.EACCES, path)
    self.filesystem.cwd = path
def listxattr(self, path=None, *, follow_symlinks=True):
    """List the names of the extended filesystem attributes of `path`.

    Args:
        path: File path, file descriptor or path-like object (for
            Python >= 3.6). If None, the current directory is used.
        follow_symlinks: (bool) If True (the default), symlinks in the
            path are traversed.

    Returns:
        A list with the keys of the `xattr` mapping of the resolved
        file object.

    Raises:
        AttributeError: if the fake filesystem does not emulate Linux.
        OSError: if the path does not exist (raised while resolving).
    """
    if not self.filesystem.is_linux:
        raise AttributeError(
            "module 'os' has no attribute 'listxattr'")

    target = self.getcwd() if path is None else path
    resolved = self.filesystem.resolve(
        target, follow_symlinks, allow_fd=True)
    attribute_names = resolved.xattr.keys()
    return list(attribute_names)
def setxattr(self, path, attribute, value,
             flags=0, *, follow_symlinks=True):
    """Sets the value of the given extended filesystem attribute for
    `path`.

    Args:
        path: File path, file descriptor or path-like object (for
            Python >= 3.6).
        attribute: The attribute name (str or bytes). Bytes names are
            decoded using the file system encoding.
        value: (byte-like) The value to be set.
        flags: (int) If `XATTR_CREATE`, the attribute must not exist
            yet; if `XATTR_REPLACE`, the attribute must already exist.
            With the default of 0 the attribute is created or replaced
            as needed.
        follow_symlinks: (bool) If True (the default), symlinks in the
            path are traversed.

    Raises:
        AttributeError: if the fake filesystem does not emulate Linux.
        OSError: if the path does not exist (raised while resolving).
        OSError: (errno.EEXIST) if `flags` is `XATTR_CREATE` and the
            attribute already exists.
        OSError: (errno.ENODATA) if `flags` is `XATTR_REPLACE` and the
            attribute does not exist.
        TypeError: if `value` is not a byte-like object.
    """
    if not self.filesystem.is_linux:
        raise AttributeError(
            "module 'os' has no attribute 'setxattr'")

    if isinstance(attribute, bytes):
        attribute = attribute.decode(sys.getfilesystemencoding())
    if not is_byte_string(value):
        raise TypeError('a bytes-like object is required')
    file_obj = self.filesystem.resolve(path, follow_symlinks,
                                       allow_fd=True)
    exists = attribute in file_obj.xattr
    # error codes as documented for setxattr(2): a pure create fails
    # with EEXIST, a pure replace fails with ENODATA
    if exists and flags == self.XATTR_CREATE:
        self.filesystem.raise_os_error(errno.EEXIST, file_obj.path)
    if not exists and flags == self.XATTR_REPLACE:
        self.filesystem.raise_os_error(errno.ENODATA, file_obj.path)
    file_obj.xattr[attribute] = value
def readlink(self, path, dir_fd=None):
    """Return the target of the symlink at `path`.

    Args:
        path: Symlink to read the target of.
        dir_fd: If not `None`, the file descriptor of a directory,
            with `path` being relative to this directory.

    Returns:
        The result of `FakeFilesystem.readlink` for the path after
        `dir_fd` has been taken into account.

    Raises:
        TypeError: if `path` is None
        OSError: (with errno=ENOENT) if path is not a valid path, or
            (with errno=EINVAL) if path is valid, but is not a symlink
    """
    link_path = self._path_with_dir_fd(path, self.readlink, dir_fd)
    link_target = self.filesystem.readlink(link_path)
    return link_target
def lstat(self, path, *, dir_fd=None):
    """Stat the entry at `path` without following a final symlink.

    Args:
        path: path to filesystem object to retrieve.
        dir_fd: If not `None`, the file descriptor of a directory, with
            `path` being relative to this directory.

    Returns:
        The result of `FakeFilesystem.stat` called with
        `follow_symlinks=False` for the resulting path.

    Raises:
        OSError: if the filesystem object doesn't exist.
    """
    entry_path = self._path_with_dir_fd(path, self.lstat, dir_fd)
    # never dereference the link itself
    stat_result = self.filesystem.stat(entry_path, follow_symlinks=False)
    return stat_result
def replace(self, src, dst, *, src_dir_fd=None, dst_dir_fd=None):
    """Renames a FakeFile object at old_file_path to new_file_path,
    preserving all properties.
    Also replaces existing new_file_path object, if one existed.

    Args:
        src: Path to filesystem object to rename.
        dst: Path to where the filesystem object will live
            after this call.
        src_dir_fd: If not `None`, the file descriptor of a directory,
            with `src` being relative to this directory.
        dst_dir_fd: If not `None`, the file descriptor of a directory,
            with `dst` being relative to this directory.

    Raises:
        OSError: if old_file_path does not exist.
        OSError: if new_file_path is an existing directory.
        OSError: if new_file_path is an existing file and could
            not be removed
        OSError: if `dirname(new_file)` does not exist
        OSError: if the file would be moved to another filesystem
            (e.g. mount point)
    """
    # Pass `self.replace` (not `self.rename`) so that the dir_fd support
    # check and any error message refer to the function actually called.
    src = self._path_with_dir_fd(src, self.replace, src_dir_fd)
    dst = self._path_with_dir_fd(dst, self.replace, dst_dir_fd)
    self.filesystem.rename(src, dst, force_replace=True)
4071 """ 4072 name = self.filesystem.absnormpath(name) 4073 directory = self.filesystem.confirmdir(name) 4074 if directory.contents: 4075 self.filesystem.raise_os_error( 4076 errno.ENOTEMPTY, self.path.basename(name)) 4077 else: 4078 self.rmdir(name) 4079 head, tail = self.path.split(name) 4080 if not tail: 4081 head, tail = self.path.split(head) 4082 while head and tail: 4083 head_dir = self.filesystem.confirmdir(head) 4084 if head_dir.contents: 4085 break 4086 # only the top-level dir may not be a symlink 4087 self.filesystem.rmdir(head, allow_symlink=True) 4088 head, tail = self.path.split(head) 4089 4090 def mkdir(self, path, mode=PERM_DEF, *, dir_fd=None): 4091 """Create a leaf Fake directory. 4092 4093 Args: 4094 path: (str) Name of directory to create. 4095 Relative paths are assumed to be relative to '/'. 4096 mode: (int) Mode to create directory with. This argument defaults 4097 to 0o777. The umask is applied to this mode. 4098 dir_fd: If not `None`, the file descriptor of a directory, 4099 with `path` being relative to this directory. 4100 4101 Raises: 4102 OSError: if the directory name is invalid or parent directory is 4103 read only or as per FakeFilesystem.add_object. 4104 """ 4105 path = self._path_with_dir_fd(path, self.mkdir, dir_fd) 4106 try: 4107 self.filesystem.makedir(path, mode) 4108 except OSError as e: 4109 if e.errno == errno.EACCES: 4110 self.filesystem.raise_os_error(e.errno, path) 4111 raise 4112 4113 def makedirs(self, name, mode=PERM_DEF, exist_ok=None): 4114 """Create a leaf Fake directory + create any non-existent parent dirs. 4115 4116 Args: 4117 name: (str) Name of directory to create. 4118 mode: (int) Mode to create directory (and any necessary parent 4119 directories) with. This argument defaults to 0o777. 4120 The umask is applied to this mode. 4121 exist_ok: (boolean) If exist_ok is False (the default), an OSError 4122 is raised if the target directory already exists. 
4123 4124 Raises: 4125 OSError: if the directory already exists and exist_ok=False, or as 4126 per :py:meth:`FakeFilesystem.create_dir`. 4127 """ 4128 if exist_ok is None: 4129 exist_ok = False 4130 self.filesystem.makedirs(name, mode, exist_ok) 4131 4132 def _path_with_dir_fd(self, path, fct, dir_fd): 4133 """Return the path considering dir_fd. Raise on invalid parameters.""" 4134 path = to_string(path) 4135 if dir_fd is not None: 4136 # check if fd is supported for the built-in real function 4137 real_fct = getattr(os, fct.__name__) 4138 if real_fct not in self.supports_dir_fd: 4139 raise NotImplementedError( 4140 'dir_fd unavailable on this platform') 4141 if isinstance(path, int): 4142 raise ValueError("%s: Can't specify dir_fd without " 4143 "matching path" % fct.__name__) 4144 if not self.path.isabs(path): 4145 return self.path.join( 4146 self.filesystem.get_open_file( 4147 dir_fd).get_object().path, path) 4148 return path 4149 4150 def access(self, path, mode, *, dir_fd=None, follow_symlinks=True): 4151 """Check if a file exists and has the specified permissions. 4152 4153 Args: 4154 path: (str) Path to the file. 4155 mode: (int) Permissions represented as a bitwise-OR combination of 4156 os.F_OK, os.R_OK, os.W_OK, and os.X_OK. 4157 dir_fd: If not `None`, the file descriptor of a directory, with 4158 `path` being relative to this directory. 4159 follow_symlinks: (bool) If `False` and `path` points to a symlink, 4160 the link itself is queried instead of the linked object. 4161 4162 Returns: 4163 bool, `True` if file is accessible, `False` otherwise. 
def lchmod(self, path, mode):
    """Change the permissions of a file as encoded in integer mode.
    If the file is a link, the permissions of the link are changed.

    Args:
        path: (str) Path to the file.
        mode: (int) Permissions.

    Raises:
        NameError: if the fake filesystem emulates a Windows file
            system, where `lchmod` is not available.
    """
    if self.filesystem.is_windows_fs:
        # Raise a real exception instance - raising a tuple is itself a
        # TypeError under Python 3.
        raise NameError("name 'lchmod' is not defined")
    self.filesystem.chmod(path, mode, follow_symlinks=False)
def mknod(self, path, mode=None, device=0, *, dir_fd=None):
    """Create a filesystem node named 'filename'.

    Does not support device special files or named pipes as the real os
    module does.

    Args:
        path: (str) Name of the file to create
        mode: (int) Permissions to use and type of file to be created.
            Defaults to `stat.S_IFREG | 0o600`. Only the stat.S_IFREG
            file type is supported by the fake implementation. The
            umask is applied to this mode.
        device: not supported in fake implementation
        dir_fd: If not `None`, the file descriptor of a directory,
            with `path` being relative to this directory.

    Raises:
        AttributeError: if the fake filesystem emulates a Windows file
            system, where `mknod` is not available.
        OSError: if called with unsupported options or the file can not
            be created.
    """
    if self.filesystem.is_windows_fs:
        # Raise a real exception instance - raising a tuple is itself a
        # TypeError under Python 3.
        raise AttributeError("module 'os' has no attribute 'mknod'")
    if mode is None:
        # note that a default value of 0o600 without a device type is
        # documented - this is not how it seems to work
        mode = S_IFREG | 0o600
    # `and` binds tighter than `or`: a device is always rejected, a
    # non-regular file type only for non-root users
    if device or (not mode & S_IFREG and not is_root()):
        self.filesystem.raise_os_error(errno.EPERM)

    path = self._path_with_dir_fd(path, self.mknod, dir_fd)
    head, tail = self.path.split(path)
    if not tail:
        # a path with a trailing separator cannot name a new file
        if self.filesystem.exists(head, check_link=True):
            self.filesystem.raise_os_error(errno.EEXIST, path)
        self.filesystem.raise_os_error(errno.ENOENT, path)
    if tail in (b'.', u'.', b'..', u'..'):
        self.filesystem.raise_os_error(errno.ENOENT, path)
    if self.filesystem.exists(path, check_link=True):
        self.filesystem.raise_os_error(errno.EEXIST, path)
    self.filesystem.add_object(head, FakeFile(
        tail, mode & ~self.filesystem.umask,
        filesystem=self.filesystem))
def link(self, src, dst, *, src_dir_fd=None, dst_dir_fd=None):
    """Create a hard link at `dst` that points at `src`.

    Both paths are first adapted to their respective `*_dir_fd`
    argument, then the work is delegated to `FakeFilesystem.link`.
    Nothing is returned.

    Args:
        src: An existing path to the target file.
        dst: The destination path to create a new link at.
        src_dir_fd: If not `None`, the file descriptor of a directory,
            with `src` being relative to this directory.
        dst_dir_fd: If not `None`, the file descriptor of a directory,
            with `dst` being relative to this directory.

    Raises:
        OSError: if something already exists at new_path.
        OSError: if the parent directory doesn't exist.
    """
    existing_path = self._path_with_dir_fd(src, self.link, src_dir_fd)
    new_link_path = self._path_with_dir_fd(dst, self.link, dst_dir_fd)
    self.filesystem.link(existing_path, new_link_path)
4362 4363 Args: 4364 fd: The file descriptor of the open file. 4365 4366 Raises: 4367 OSError: `fd` is an invalid file descriptor. 4368 TypeError: `fd` is not an integer. 4369 """ 4370 if self.filesystem.is_windows_fs or self.filesystem.is_macos: 4371 raise AttributeError("module 'os' has no attribute 'fdatasync'") 4372 # Throw an error if file_des isn't valid 4373 if 0 <= fd < NR_STD_STREAMS: 4374 self.filesystem.raise_os_error(errno.EINVAL) 4375 self.filesystem.get_open_file(fd) 4376 4377 def sendfile(self, fd_out, fd_in, offset, count): 4378 """Copy count bytes from file descriptor fd_in to file descriptor 4379 fd_out starting at offset. 4380 4381 Args: 4382 fd_out: The file descriptor of the destination file. 4383 fd_in: The file descriptor of the source file. 4384 offset: The offset in bytes where to start the copy in the 4385 source file. If `None` (Linux only), copying is started at 4386 the current position, and the position is updated. 4387 count: The number of bytes to copy. If 0, all remaining bytes 4388 are copied (MacOs only). 4389 4390 Raises: 4391 OSError: If `fd_in` or `fd_out` is an invalid file descriptor. 4392 TypeError: If `fd_in` or `fd_out` is not an integer. 4393 TypeError: If `offset` is None under MacOs. 
class FakeIoModule:
    """Fake replacement for the `io` module backed by a FakeFilesystem.

    Only `open()` is faked; every other attribute is looked up in the
    real `io` module.

    You need a fake_filesystem to use this:
    filesystem = fake_filesystem.FakeFilesystem()
    my_io_module = fake_filesystem.FakeIoModule(filesystem)
    """

    @staticmethod
    def dir():
        """Return the names of the patched functions as a tuple. Used
        for patching functions imported from the module.
        """
        return ('open',)

    def __init__(self, filesystem):
        """
        Args:
            filesystem: FakeFilesystem used to provide file system
                information.
        """
        self._io_module = io
        self.filesystem = filesystem

    def open(self, file, mode='r', buffering=-1, encoding=None,
             errors=None, newline=None, closefd=True, opener=None):
        """Open `file` in the fake filesystem by delegating to a new
        FakeFileOpen instance; all arguments are passed on unchanged.
        """
        return FakeFileOpen(self.filesystem)(
            file, mode, buffering, encoding, errors,
            newline, closefd, opener)

    def __getattr__(self, name):
        """Look up anything that is not faked in the standard io module.
        """
        real_attribute = getattr(self._io_module, name)
        return real_attribute
else: 4508 self._io.seek(self._flush_pos) 4509 self._read_seek = self._io.tell() 4510 4511 if delete_on_close: 4512 assert filesystem, 'delete_on_close=True requires filesystem' 4513 self._filesystem = filesystem 4514 self.delete_on_close = delete_on_close 4515 # override, don't modify FakeFile.name, as FakeFilesystem expects 4516 # it to be the file name only, no directories. 4517 self.name = file_object.opened_as 4518 self.filedes = None 4519 4520 def __enter__(self): 4521 """To support usage of this fake file with the 'with' statement.""" 4522 return self 4523 4524 def __exit__(self, type, value, traceback): 4525 """To support usage of this fake file with the 'with' statement.""" 4526 self.close() 4527 4528 def _raise(self, message): 4529 if self.raw_io: 4530 self._filesystem.raise_os_error(errno.EBADF, self.file_path) 4531 raise io.UnsupportedOperation(message) 4532 4533 def get_object(self): 4534 """Return the FakeFile object that is wrapped by the current instance. 4535 """ 4536 return self.file_object 4537 4538 def fileno(self): 4539 """Return the file descriptor of the file object.""" 4540 return self.filedes 4541 4542 def close(self): 4543 """Close the file.""" 4544 # ignore closing a closed file 4545 if not self._is_open(): 4546 return 4547 4548 # for raw io, all writes are flushed immediately 4549 if self.allow_update and not self.raw_io: 4550 self.flush() 4551 if self._filesystem.is_windows_fs and self._changed: 4552 self.file_object.st_mtime = time.time() 4553 if self._closefd: 4554 self._filesystem._close_open_file(self.filedes) 4555 else: 4556 self._filesystem.open_files[self.filedes].remove(self) 4557 if self.delete_on_close: 4558 self._filesystem.remove_object(self.get_object().path) 4559 4560 @property 4561 def closed(self): 4562 """Simulate the `closed` attribute on file.""" 4563 return not self._is_open() 4564 4565 def flush(self): 4566 """Flush file contents to 'disk'.""" 4567 self._check_open_file() 4568 if self.allow_update and not 
def seek(self, offset, whence=0):
    """Move read/write pointer in 'file'.

    In append mode only the requested read position is remembered; the
    underlying stream is not moved. Unless the wrapper is a stream, the
    file is flushed afterwards.

    Args:
        offset: The offset to seek to.
        whence: Reference point for `offset`, passed on to the stream
            (or remembered in append mode). Defaults to 0.
    """
    self._check_open_file()
    if self._append:
        # the write pointer stays put, only the read pointer is tracked
        self._read_seek, self._read_whence = offset, whence
    else:
        self._io.seek(offset, whence)
    if self.is_stream:
        return
    self.flush()
def _sync_io(self):
    """Reload the stream when the wrapped file object has changed.

    Nothing happens while the epoch recorded by this wrapper equals the
    file object's epoch. Otherwise the file object's contents are pushed
    into the stream through ``_set_stream_contents`` (bytes for a binary
    stream, text for any other) and the file object's epoch is recorded.
    """
    if not self._file_epoch == self.file_object.epoch:
        source = self.file_object
        fresh = source.byte_contents if self._io.binary else source.contents
        self._set_stream_contents(fresh)
        self._file_epoch = source.epoch
def _adapt_size_for_related_files(self, size):
    """Shift the read position of append-mode wrappers on the same file.

    Walks the filesystem's open-files table from descriptor 3 upwards
    and adds ``size`` to ``_read_seek`` of every wrapper that is not
    this one, wraps an equal ``file_object`` and is in append mode.

    Args:
        size: Amount added to each matching wrapper's ``_read_seek``.
    """
    for slot in self._filesystem.open_files[3:]:
        if slot is None:
            continue
        for candidate in slot:
            if candidate is self:
                continue
            if not self.file_object == candidate.file_object:
                continue
            if candidate._append:
                candidate._read_seek += size
def size(self):
    """Return the ``st_size`` value reported by the wrapped file object."""
    wrapped = self.file_object
    return wrapped.st_size
def _is_open(self):
    """Return whether this wrapper is registered as an open file.

    The wrapper counts as open when its descriptor is set, lies within
    the filesystem's open-files table, and the entry stored for that
    descriptor contains this wrapper.

    Returns:
        True if the wrapper is registered under its descriptor,
        False otherwise.
    """
    # ``filedes`` is None until a descriptor is assigned after
    # construction. Comparing None with an int raises TypeError on
    # Python 3, so treat a missing descriptor as "not open".
    if self.filedes is None:
        return False
    open_files = self._filesystem.open_files
    if self.filedes >= len(open_files):
        return False
    wrappers = open_files[self.filedes]
    return wrappers is not None and self in wrappers
class FakePipeWrapper:
    """Open-files entry standing in for one end of a real pipe.

    Reads, writes and the final close are forwarded to the real
    descriptor through :py:mod:`os`; the fake descriptor is only used to
    locate this wrapper in the filesystem's open-files table.
    """

    def __init__(self, filesystem, fd):
        """
        Args:
            filesystem: Filesystem object whose ``open_files`` table
                holds this wrapper.
            fd: The real file descriptor of the pipe end.
        """
        # Fake descriptor; not set here and expected to be assigned
        # before ``close()`` is called.
        self.filedes = None
        self.file_object = None
        self.fd = fd  # the real file descriptor
        self._filesystem = filesystem

    def get_object(self):
        """Return ``file_object`` (``None`` unless assigned elsewhere)."""
        return self.file_object

    def fileno(self):
        """Return the fake file descriptor of the pipe object."""
        return self.filedes

    def read(self, numBytes):
        """Read up to ``numBytes`` bytes from the real pipe descriptor."""
        return os.read(self.fd, numBytes)

    def write(self, contents):
        """Write ``contents`` to the real pipe descriptor.

        Returns:
            The value returned by ``os.write``.
        """
        return os.write(self.fd, contents)

    def close(self):
        """Unregister the wrapper and close the real descriptor."""
        registered = self._filesystem.open_files[self.filedes]
        registered.remove(self)
        os.close(self.fd)
def call(self, file_, mode='r', buffering=-1, encoding=None,
         errors=None, newline=None, closefd=True, opener=None,
         open_modes=None):
    """Return a file-like object with the contents of the target
    file object.

    The file object is looked up (or created, depending on the mode),
    wrapped in a `FakeFileWrapper` and registered in the filesystem's
    open-files table.

    Args:
        file_: Path to target file or a file descriptor.
        mode: Additional file modes (all modes in `open()` are supported).
            A `'b'` in the mode selects binary access.
        buffering: ignored; not referenced in the body. (Used for
            signature compliance with __builtin__.open)
        encoding: The encoding used to encode unicode strings / decode
            bytes.
        errors: (str) Defines how encoding errors are handled.
        newline: Controls universal newlines, passed to stream object.
        closefd: If a file descriptor rather than file name is passed,
            and this is set to `False`, then the file descriptor is kept
            open when file is closed. Reset to `True` whenever the
            resolved descriptor is falsy, i.e. a path was passed or
            the descriptor is 0.
        opener: not supported; not referenced in the body.
        open_modes: Modes for opening files if called from low-level API.
            Replaced by the modes derived from `mode` unless this
            opener was created with `raw_io` set.

    Returns:
        A file-like object containing the contents of the target file.

    Raises:
        OSError depending on Python version / call mode:
            - if the target object is a directory
            - on an invalid path
            - if the file does not exist when it should
            - if the file exists but should not
            - if permission is denied
        ValueError: for an invalid mode or mode combination
    """
    binary = 'b' in mode
    newline, open_modes = self._handle_file_mode(mode, newline, open_modes)

    file_object, file_path, filedes, real_path = self._handle_file_arg(
        file_)
    # NOTE: this is a truthiness test, so descriptor 0 is treated like
    # "no descriptor" here, unlike the `is not None` test further down.
    if not filedes:
        closefd = True

    # Exclusive creation fails if the object exists already, or, on
    # non-Windows filesystems only, if the path is a symlink
    # (`and` binds tighter than `or`).
    if (open_modes.must_not_exist and
            (file_object or self.filesystem.islink(file_path) and
             not self.filesystem.is_windows_fs)):
        self.filesystem.raise_os_error(errno.EEXIST, file_path)

    # Checks permissions on / truncates an existing object, or creates
    # a new one; afterwards `file_object` is always set.
    file_object = self._init_file_object(file_object,
                                         file_path, open_modes,
                                         real_path)

    # Directories cannot be opened as files; the error code depends on
    # the emulated filesystem.
    if S_ISDIR(file_object.st_mode):
        if self.filesystem.is_windows_fs:
            self.filesystem.raise_os_error(errno.EACCES, file_path)
        else:
            self.filesystem.raise_os_error(errno.EISDIR, file_path)

    # If you print obj.name, the argument to open() must be printed.
    # Not the abspath, not the filename, but the actual argument.
    file_object.opened_as = file_path
    if open_modes.truncate:
        # Truncating counts as a modification; the change time is only
        # touched on non-Windows filesystems.
        current_time = time.time()
        file_object.st_mtime = current_time
        if not self.filesystem.is_windows_fs:
            file_object.st_ctime = current_time

    fakefile = FakeFileWrapper(file_object,
                               file_path,
                               update=open_modes.can_write,
                               read=open_modes.can_read,
                               append=open_modes.append,
                               delete_on_close=self._delete_on_close,
                               filesystem=self.filesystem,
                               newline=newline,
                               binary=binary,
                               closefd=closefd,
                               encoding=encoding,
                               errors=errors,
                               raw_io=self.raw_io)
    if filedes is not None:
        fakefile.filedes = filedes
        # Opened via an existing descriptor: add the new wrapper to the
        # wrappers already registered under it (existing entries are
        # kept, not replaced).
        self.filesystem.open_files[filedes].append(fakefile)
    else:
        # Opened via a path: register the wrapper under a descriptor
        # handed out by the filesystem.
        fakefile.filedes = self.filesystem._add_open_file(fakefile)
    return fakefile
def _handle_file_arg(self, file_):
    """Resolve the first argument of ``open()`` to a file object and paths.

    Args:
        file_: Either an open file descriptor (``int``) of the fake
            filesystem, or a path.

    Returns:
        A tuple ``(file_object, file_path, filedes, real_path)``:

        * ``file_object``: the object behind the descriptor or path, or
          ``None`` if the path does not exist.
        * ``file_path``: the path as passed in, or, for a descriptor,
          the ``name`` of the object behind it.
        * ``filedes``: the descriptor passed in, or ``None`` for a path.
        * ``real_path``: the path resolved by the filesystem; identical
          to ``file_path`` for descriptors and for the null device.
    """
    if isinstance(file_, int):
        # Opening an already open file descriptor.
        wrapper = self.filesystem.get_open_file(file_)
        # Only regular file wrappers carry ``delete_on_close``. The
        # stream, directory and pipe wrappers do not, so keep the
        # current setting for them instead of raising AttributeError.
        # NOTE: this overwrites the opener's own setting and so also
        # affects later calls made through the same instance.
        self._delete_on_close = getattr(
            wrapper, 'delete_on_close', self._delete_on_close)
        file_object = wrapper.get_object()
        file_path = file_object.name
        return file_object, file_path, file_, file_path

    # Opening a file by path.
    file_object = None
    file_path = file_
    if file_path == self.filesystem.dev_null.name:
        file_object = self.filesystem.dev_null
        real_path = file_path
    else:
        real_path = self.filesystem.resolve_path(
            file_path, raw_io=self.raw_io)
        if self.filesystem.exists(file_path):
            file_object = self.filesystem.get_object_from_normpath(
                real_path, check_read_perm=False)
    return file_object, file_path, None, real_path