# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A fake implementation for the `scandir` function working with
FakeFilesystem.
Works with both the function integrated into the `os` module since Python 3.5
and the standalone function available in the standalone `scandir` python
package.
"""

import os
import sys

from pyfakefs.helpers import to_string, make_string_path


class DirEntry(os.PathLike):
    """Emulates os.DirEntry. Note that we did not enforce keyword only
    arguments."""

    def __init__(self, filesystem):
        """Initialize the dir entry with unset values.

        Args:
            filesystem: the fake filesystem used for implementation.
        """
        self._filesystem = filesystem
        # Entry name and the path built from the scandir() argument.
        self.name = ""
        self.path = ""
        # Absolute path inside the fake filesystem, used for all lookups.
        self._abspath = ""
        # Filled lazily by stat(follow_symlinks=False).
        self._inode = None
        self._islink = False
        self._isdir = False
        # Cached stat result obtained via lresolve() (follow_symlinks=False).
        self._statresult = None
        # Cached stat result obtained via resolve() (follow_symlinks=True);
        # despite the attribute name this is the result for the link target.
        self._statresult_symlink = None

    def inode(self):
        """Return the inode number of the entry."""
        if self._inode is None:
            # stat() without following links records the inode as a
            # side effect.
            self.stat(follow_symlinks=False)
        return self._inode

    def is_dir(self, follow_symlinks=True):
        """Return True if this entry is a directory entry.

        Args:
            follow_symlinks: If True, also return True if this entry is a
                symlink pointing to a directory.

        Returns:
            True if this entry is an existing directory entry, or if
            follow_symlinks is set, and this entry points to an existing
            directory entry.
        """
        return self._isdir and (follow_symlinks or not self._islink)

    def is_file(self, follow_symlinks=True):
        """Return True if this entry is a regular file entry.

        Args:
            follow_symlinks: If True, also return True if this entry is a
                symlink pointing to a regular file.

        Returns:
            True if this entry is an existing file entry, or if
            follow_symlinks is set, and this entry points to an existing
            file entry. A symlink whose target does not exist is never
            reported as a file.
        """
        if self._isdir:
            return False
        if not self._islink:
            return True
        if not follow_symlinks:
            # The entry itself is a link, not a regular file.
            return False
        # "Not a directory" is not sufficient for a symlink: the target may
        # be missing (broken link), in which case os.DirEntry.is_file()
        # returns False. Ask the filesystem whether the target is a file.
        return self._filesystem.isfile(self._abspath)

    def is_symlink(self):
        """Return True if this entry is a symbolic link (even if broken)."""
        return self._islink

    def stat(self, follow_symlinks=True):
        """Return a stat_result object for this entry.

        The result is computed on first use and cached separately for each
        value of `follow_symlinks`.

        Args:
            follow_symlinks: If False and the entry is a symlink, return the
                result for the symlink, otherwise for the object it points to.
        """
        if follow_symlinks:
            if self._statresult_symlink is None:
                file_object = self._filesystem.resolve(self._abspath)
                self._statresult_symlink = file_object.stat_result.copy()
                if self._filesystem.is_windows_fs:
                    # The link count is reported as 0 on a Windows fake fs.
                    self._statresult_symlink.st_nlink = 0
            return self._statresult_symlink

        if self._statresult is None:
            file_object = self._filesystem.lresolve(self._abspath)
            # Remember the inode so that inode() does not need another lookup.
            self._inode = file_object.st_ino
            self._statresult = file_object.stat_result.copy()
            if self._filesystem.is_windows_fs:
                # The link count is reported as 0 on a Windows fake fs.
                self._statresult.st_nlink = 0
        return self._statresult

    def __fspath__(self):
        """Return the entry path, making the entry usable as os.PathLike."""
        return self.path

    if sys.version_info >= (3, 12):

        def is_junction(self) -> bool:
            """Return True if this entry is a junction.
            Junctions are not a part of posix semantic."""
            if not self._filesystem.is_windows_fs:
                return False
            file_object = self._filesystem.resolve(self._abspath)
            return file_object.is_junction


class ScanDirIter:
    """Iterator for DirEntry objects returned from `scandir()`
    function."""

    def __init__(self, filesystem, path):
        """Prepare the iteration over the entries of a directory.

        Args:
            filesystem: The fake filesystem used for implementation.
            path: Path to the directory, or (on non-Windows fake
                filesystems) an open file descriptor of the directory.

        Raises:
            NotImplementedError: if `path` is a file descriptor and the
                fake filesystem is a Windows filesystem.
        """
        self.filesystem = filesystem
        if isinstance(path, int):
            if self.filesystem.is_windows_fs:
                raise NotImplementedError(
                    "scandir does not support file descriptor path argument"
                )
            self.abspath = self.filesystem.absnormpath(
                self.filesystem.get_open_file(path).get_object().path
            )
            # With a file descriptor there is no path prefix for the entries.
            self.path = ""
        else:
            path = make_string_path(path)
            self.abspath = self.filesystem.absnormpath(path)
            self.path = to_string(path)
        # NOTE(review): confirmdir() presumably raises OSError if abspath is
        # not a directory - confirm against FakeFilesystem.
        entries = self.filesystem.confirmdir(self.abspath, check_exe_perm=False).entries
        # Iterate over a snapshot so that later changes to the directory
        # contents do not affect a running iteration.
        self.entry_iter = iter(tuple(entries))

    def __iter__(self):
        return self

    def __next__(self):
        """Return the DirEntry for the next name in the directory.

        Raises:
            StopIteration: if all entries have been returned.
        """
        entry = next(self.entry_iter)
        dir_entry = DirEntry(self.filesystem)
        dir_entry.name = entry
        dir_entry.path = self.filesystem.joinpaths(self.path, dir_entry.name)
        dir_entry._abspath = self.filesystem.joinpaths(self.abspath, dir_entry.name)
        dir_entry._isdir = self.filesystem.isdir(dir_entry._abspath)
        dir_entry._islink = self.filesystem.islink(dir_entry._abspath)
        return dir_entry

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Do nothing; only present to mirror the scandir iterator API."""
        pass


def scandir(filesystem, path=""):
    """Return an iterator of DirEntry objects corresponding to the entries
    in the directory given by path.

    Args:
        filesystem: The fake filesystem used for implementation
        path: Path to the target directory within the fake filesystem.

    Returns:
        an iterator to an unsorted list of os.DirEntry objects for
        each entry in path.

    Raises:
        OSError: if the target is not a directory.
    """
    return ScanDirIter(filesystem, path)


def _classify_directory_contents(filesystem, root):
    """Classify contents of a directory as files/directories.

    Args:
        filesystem: The fake filesystem used for implementation
        root: (str) Directory to examine.

    Returns:
        (tuple) A tuple consisting of three values: the directory examined,
        a list containing all of the directory entries, and a list
        containing all of the non-directory entries.
        (This is the same format as returned by the `os.walk` generator.)

    Raises:
        Nothing on its own, but be ready to catch exceptions generated by
        underlying mechanisms like `os.listdir`.
    """
    dirs = []
    files = []
    for entry in filesystem.listdir(root):
        if filesystem.isdir(filesystem.joinpaths(root, entry)):
            dirs.append(entry)
        else:
            files.append(entry)
    return root, dirs, files


def walk(filesystem, top, topdown=True, onerror=None, followlinks=False):
    """Perform an os.walk operation over the fake filesystem.

    Args:
        filesystem: The fake filesystem used for implementation
        top: The root directory from which to begin walk.
        topdown: Determines whether to return the tuples with the root as
            the first entry (`True`) or as the last, after all the child
            directory tuples (`False`).
        onerror: If not `None`, function which will be called to handle the
            `os.error` instance provided when `os.listdir()` fails.
        followlinks: If `True`, symbolic links are followed.

    Yields:
        (path, directories, nondirectories) for top and each of its
        subdirectories.  See the documentation for the builtin os module
        for further details.
    """

    def do_walk(top_dir, top_most=False):
        # The starting directory is always walked, even if it is a link.
        if not top_most and not followlinks and filesystem.islink(top_dir):
            return
        try:
            top_contents = _classify_directory_contents(filesystem, top_dir)
        except OSError as exc:
            # An unreadable directory is skipped; the error is only reported.
            top_contents = None
            if onerror is not None:
                onerror(exc)

        if top_contents is not None:
            if topdown:
                yield top_contents

            # The list iterated here is the very object that was yielded
            # above, so in top-down mode in-place changes made by the
            # consumer determine which subdirectories are visited.
            for directory in top_contents[1]:
                path = filesystem.joinpaths(top_dir, directory)
                if not followlinks and filesystem.islink(path):
                    continue
                yield from do_walk(path)
            if not topdown:
                yield top_contents

    return do_walk(make_string_path(to_string(top)), top_most=True)