1# Licensed under the Apache License, Version 2.0 (the "License"); 2# you may not use this file except in compliance with the License. 3# You may obtain a copy of the License at 4# 5# http://www.apache.org/licenses/LICENSE-2.0 6# 7# Unless required by applicable law or agreed to in writing, software 8# distributed under the License is distributed on an "AS IS" BASIS, 9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 10# See the License for the specific language governing permissions and 11# limitations under the License. 12 13"""A fake implementation for the `scandir` function working with 14FakeFilesystem. 15Works with both the function integrated into the `os` module since Python 3.5 16and the standalone function available in the standalone `scandir` python 17package. 18""" 19import os 20import sys 21 22from pyfakefs.extra_packages import use_scandir_package 23from pyfakefs.helpers import to_string 24 25if sys.version_info >= (3, 6): 26 BaseClass = os.PathLike 27else: 28 BaseClass = object 29 30 31class DirEntry(BaseClass): 32 """Emulates os.DirEntry. Note that we did not enforce keyword only 33 arguments.""" 34 35 def __init__(self, filesystem): 36 """Initialize the dir entry with unset values. 37 38 Args: 39 filesystem: the fake filesystem used for implementation. 40 """ 41 self._filesystem = filesystem 42 self.name = '' 43 self.path = '' 44 self._abspath = '' 45 self._inode = None 46 self._islink = False 47 self._isdir = False 48 self._statresult = None 49 self._statresult_symlink = None 50 51 def inode(self): 52 """Return the inode number of the entry.""" 53 if self._inode is None: 54 self.stat(follow_symlinks=False) 55 return self._inode 56 57 def is_dir(self, follow_symlinks=True): 58 """Return True if this entry is a directory entry. 59 60 Args: 61 follow_symlinks: If True, also return True if this entry is a 62 symlink pointing to a directory. 63 64 Returns: 65 True if this entry is an existing directory entry, or if 66 follow_symlinks is set, and this entry points to an existing 67 directory entry. 68 """ 69 return self._isdir and (follow_symlinks or not self._islink) 70 71 def is_file(self, follow_symlinks=True): 72 """Return True if this entry is a regular file entry. 73 74 Args: 75 follow_symlinks: If True, also return True if this entry is a 76 symlink pointing to a regular file. 77 78 Returns: 79 True if this entry is an existing file entry, or if 80 follow_symlinks is set, and this entry points to an existing 81 file entry. 82 """ 83 return not self._isdir and (follow_symlinks or not self._islink) 84 85 def is_symlink(self): 86 """Return True if this entry is a symbolic link (even if broken).""" 87 return self._islink 88 89 def stat(self, follow_symlinks=True): 90 """Return a stat_result object for this entry. 91 92 Args: 93 follow_symlinks: If False and the entry is a symlink, return the 94 result for the symlink, otherwise for the object it points to. 95 """ 96 if follow_symlinks: 97 if self._statresult_symlink is None: 98 file_object = self._filesystem.resolve(self._abspath) 99 self._statresult_symlink = file_object.stat_result.copy() 100 if self._filesystem.is_windows_fs: 101 self._statresult_symlink.st_nlink = 0 102 return self._statresult_symlink 103 104 if self._statresult is None: 105 file_object = self._filesystem.lresolve(self._abspath) 106 self._inode = file_object.st_ino 107 self._statresult = file_object.stat_result.copy() 108 if self._filesystem.is_windows_fs: 109 self._statresult.st_nlink = 0 110 return self._statresult 111 112 if sys.version_info >= (3, 6): 113 def __fspath__(self): 114 return self.path 115 116 117class ScanDirIter: 118 """Iterator for DirEntry objects returned from `scandir()` 119 function.""" 120 121 def __init__(self, filesystem, path): 122 self.filesystem = filesystem 123 if isinstance(path, int): 124 if not use_scandir_package and ( 125 sys.version_info < (3, 7) or 126 self.filesystem.is_windows_fs): 127 raise NotImplementedError( 128 'scandir does not support file descriptor ' 129 'path argument') 130 self.abspath = self.filesystem.absnormpath( 131 self.filesystem.get_open_file(path).get_object().path) 132 self.path = '' 133 else: 134 self.abspath = self.filesystem.absnormpath(path) 135 self.path = to_string(path) 136 entries = self.filesystem.confirmdir(self.abspath).entries 137 self.entry_iter = iter(entries) 138 139 def __iter__(self): 140 return self 141 142 def __next__(self): 143 entry = self.entry_iter.__next__() 144 dir_entry = DirEntry(self.filesystem) 145 dir_entry.name = entry 146 dir_entry.path = self.filesystem.joinpaths(self.path, 147 dir_entry.name) 148 dir_entry._abspath = self.filesystem.joinpaths(self.abspath, 149 dir_entry.name) 150 dir_entry._isdir = self.filesystem.isdir(dir_entry._abspath) 151 dir_entry._islink = self.filesystem.islink(dir_entry._abspath) 152 return dir_entry 153 154 if sys.version_info >= (3, 6): 155 def __enter__(self): 156 return self 157 158 def __exit__(self, exc_type, exc_val, exc_tb): 159 self.close() 160 161 def close(self): 162 pass 163 164 165def scandir(filesystem, path=''): 166 """Return an iterator of DirEntry objects corresponding to the entries 167 in the directory given by path. 168 169 Args: 170 filesystem: The fake filesystem used for implementation 171 path: Path to the target directory within the fake filesystem. 172 173 Returns: 174 an iterator to an unsorted list of os.DirEntry objects for 175 each entry in path. 176 177 Raises: 178 OSError: if the target is not a directory. 179 """ 180 return ScanDirIter(filesystem, path) 181 182 183def _classify_directory_contents(filesystem, root): 184 """Classify contents of a directory as files/directories. 185 186 Args: 187 filesystem: The fake filesystem used for implementation 188 root: (str) Directory to examine. 189 190 Returns: 191 (tuple) A tuple consisting of three values: the directory examined, 192 a list containing all of the directory entries, and a list 193 containing all of the non-directory entries. 194 (This is the same format as returned by the `os.walk` generator.) 195 196 Raises: 197 Nothing on its own, but be ready to catch exceptions generated by 198 underlying mechanisms like `os.listdir`. 199 """ 200 dirs = [] 201 files = [] 202 for entry in filesystem.listdir(root): 203 if filesystem.isdir(filesystem.joinpaths(root, entry)): 204 dirs.append(entry) 205 else: 206 files.append(entry) 207 return root, dirs, files 208 209 210def walk(filesystem, top, topdown=True, onerror=None, followlinks=False): 211 """Perform an os.walk operation over the fake filesystem. 212 213 Args: 214 filesystem: The fake filesystem used for implementation 215 top: The root directory from which to begin walk. 216 topdown: Determines whether to return the tuples with the root as 217 the first entry (`True`) or as the last, after all the child 218 directory tuples (`False`). 219 onerror: If not `None`, function which will be called to handle the 220 `os.error` instance provided when `os.listdir()` fails. 221 followlinks: If `True`, symbolic links are followed. 222 223 Yields: 224 (path, directories, nondirectories) for top and each of its 225 subdirectories. See the documentation for the builtin os module 226 for further details. 227 """ 228 229 def do_walk(top_dir, top_most=False): 230 if not top_most and not followlinks and filesystem.islink(top_dir): 231 return 232 try: 233 top_contents = _classify_directory_contents(filesystem, top_dir) 234 except OSError as exc: 235 top_contents = None 236 if onerror is not None: 237 onerror(exc) 238 239 if top_contents is not None: 240 if topdown: 241 yield top_contents 242 243 for directory in top_contents[1]: 244 path = filesystem.joinpaths(top_dir, directory) 245 if not followlinks and filesystem.islink(path): 246 continue 247 for contents in do_walk(path): 248 yield contents 249 if not topdown: 250 yield top_contents 251 252 return do_walk(to_string(top), top_most=True) 253 254 255class FakeScanDirModule: 256 """Uses FakeFilesystem to provide a fake `scandir` module replacement. 257 258 .. Note:: The ``scandir`` function is a part of the standard ``os`` module 259 since Python 3.5. This class handles the separate ``scandir`` module 260 that is available on pypi. 261 262 You need a fake_filesystem to use this: 263 `filesystem = fake_filesystem.FakeFilesystem()` 264 `fake_scandir_module = fake_filesystem.FakeScanDirModule(filesystem)` 265 """ 266 267 @staticmethod 268 def dir(): 269 """Return the list of patched function names. Used for patching 270 functions imported from the module. 271 """ 272 return 'scandir', 'walk' 273 274 def __init__(self, filesystem): 275 self.filesystem = filesystem 276 277 def scandir(self, path='.'): 278 """Return an iterator of DirEntry objects corresponding to the entries 279 in the directory given by path. 280 281 Args: 282 path: Path to the target directory within the fake filesystem. 283 284 Returns: 285 an iterator to an unsorted list of os.DirEntry objects for 286 each entry in path. 287 288 Raises: 289 OSError: if the target is not a directory. 290 """ 291 return scandir(self.filesystem, path) 292 293 def walk(self, top, topdown=True, onerror=None, followlinks=False): 294 """Perform a walk operation over the fake filesystem. 295 296 Args: 297 top: The root directory from which to begin walk. 298 topdown: Determines whether to return the tuples with the root as 299 the first entry (`True`) or as the last, after all the child 300 directory tuples (`False`). 301 onerror: If not `None`, function which will be called to handle the 302 `os.error` instance provided when `os.listdir()` fails. 303 followlinks: If `True`, symbolic links are followed. 304 305 Yields: 306 (path, directories, nondirectories) for top and each of its 307 subdirectories. See the documentation for the builtin os module 308 for further details. 309 """ 310 return walk(self.filesystem, top, topdown, onerror, followlinks) 311