• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Licensed under the Apache License, Version 2.0 (the "License");
2# you may not use this file except in compliance with the License.
3# You may obtain a copy of the License at
4#
5#      http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10# See the License for the specific language governing permissions and
11# limitations under the License.
12
13"""A fake implementation for the `scandir` function working with
14FakeFilesystem.
15Works with both the function integrated into the `os` module since Python 3.5
16and the standalone function available in the standalone `scandir` python
17package.
18"""
19import os
20import sys
21
22from pyfakefs.extra_packages import use_scandir_package
23from pyfakefs.helpers import to_string
24
# `os.PathLike` is only used as the base class on Python 3.6 and later;
# older interpreters fall back to a plain `object` base.
BaseClass = os.PathLike if sys.version_info >= (3, 6) else object
29
30
class DirEntry(BaseClass):
    """Fake counterpart of `os.DirEntry` backed by a fake filesystem.

    Unlike the real class, keyword-only arguments are not enforced.
    Instances are created empty; the public `name` and `path` attributes
    and the private state are filled in by whoever creates the entry.
    """

    def __init__(self, filesystem):
        """Create an entry whose attributes are all still unset.

        Args:
            filesystem: the fake filesystem used to look up the entry when
                stat information is requested.
        """
        self._filesystem = filesystem
        # Public attributes mirroring `os.DirEntry`.
        self.name = ''
        self.path = ''
        # Absolute path used for lookups in the fake filesystem.
        self._abspath = ''
        self._inode = None
        self._islink = False
        self._isdir = False
        # Cached stat result of the entry itself (symlinks not followed).
        self._statresult = None
        # Cached stat result obtained with symlinks followed.
        self._statresult_symlink = None

    def _symlink_allowed(self, follow_symlinks):
        """Tell whether the symlink state permits a positive type answer.

        The answer is truthy if symlinks are followed, or if the entry is
        not a symlink in the first place.
        """
        return follow_symlinks or not self._islink

    def inode(self):
        """Return the inode number of the entry.

        The number is taken from the entry itself (symlinks are not
        followed) on first use and remembered afterwards.
        """
        if self._inode is not None:
            return self._inode
        # Populates `self._inode` as a side effect.
        self.stat(follow_symlinks=False)
        return self._inode

    def is_dir(self, follow_symlinks=True):
        """Tell whether this entry is a directory.

        Args:
            follow_symlinks: If True, a symlink pointing to a directory
                also counts as a directory.

        Returns:
            True if this entry is an existing directory entry, or if
                follow_symlinks is set, and this entry points to an existing
                directory entry.
        """
        return self._isdir and self._symlink_allowed(follow_symlinks)

    def is_file(self, follow_symlinks=True):
        """Tell whether this entry is a regular file.

        Args:
            follow_symlinks: If True, a symlink pointing to a regular file
                also counts as a file.

        Returns:
            True if this entry is an existing file entry, or if
                follow_symlinks is set, and this entry points to an existing
                file entry.
        """
        return not self._isdir and self._symlink_allowed(follow_symlinks)

    def is_symlink(self):
        """Tell whether this entry is a symbolic link (even if broken)."""
        return self._islink

    def stat(self, follow_symlinks=True):
        """Return a stat_result object for this entry.

        Both variants are computed on first request and cached.

        Args:
            follow_symlinks: If False and the entry is a symlink, return the
                result for the symlink, otherwise for the object it points to.
        """
        if not follow_symlinks:
            if self._statresult is None:
                link_object = self._filesystem.lresolve(self._abspath)
                # The inode always belongs to the entry itself.
                self._inode = link_object.st_ino
                self._statresult = link_object.stat_result.copy()
                if self._filesystem.is_windows_fs:
                    self._statresult.st_nlink = 0
            return self._statresult

        if self._statresult_symlink is None:
            target_object = self._filesystem.resolve(self._abspath)
            self._statresult_symlink = target_object.stat_result.copy()
            if self._filesystem.is_windows_fs:
                self._statresult_symlink.st_nlink = 0
        return self._statresult_symlink

    if sys.version_info >= (3, 6):
        def __fspath__(self):
            """Return the entry path, as required by `os.PathLike`."""
            return self.path
115
116
class ScanDirIter:
    """Iterator for DirEntry objects returned from `scandir()`
    function.

    The names of the directory entries are captured when the iterator is
    created. Creating or removing entries in the scanned directory while
    iterating therefore does not invalidate the iteration.
    """

    def __init__(self, filesystem, path):
        """Resolve the directory to scan and snapshot its entry names.

        Args:
            filesystem: The fake filesystem used for implementation.
            path: Path to the directory to scan, or an open file descriptor
                (int) referring to it.

        Raises:
            NotImplementedError: if `path` is a file descriptor, the
                `use_scandir_package` flag is not set, and either the
                Python version is older than 3.7 or the fake filesystem
                is a Windows filesystem.
            OSError: if the target is not a directory (raised by the
                filesystem's `confirmdir`).
        """
        self.filesystem = filesystem
        if isinstance(path, int):
            if not use_scandir_package and (
                    sys.version_info < (3, 7) or
                    self.filesystem.is_windows_fs):
                raise NotImplementedError(
                    'scandir does not support file descriptor '
                    'path argument')
            self.abspath = self.filesystem.absnormpath(
                self.filesystem.get_open_file(path).get_object().path)
            # With a file descriptor there is no path prefix to report,
            # so the entry paths are joined onto an empty string.
            self.path = ''
        else:
            self.abspath = self.filesystem.absnormpath(path)
            self.path = to_string(path)
        entries = self.filesystem.confirmdir(self.abspath).entries
        # Iterate over a snapshot of the names: iterating the live
        # container would break as soon as the caller adds or removes an
        # entry of this directory inside the loop (for example by deleting
        # each scanned file).
        self.entry_iter = iter(tuple(entries))

    def __iter__(self):
        """Return the iterator itself."""
        return self

    def __next__(self):
        """Build and return the DirEntry for the next entry name.

        Raises:
            StopIteration: if all entries have been returned.
        """
        entry = next(self.entry_iter)
        dir_entry = DirEntry(self.filesystem)
        dir_entry.name = entry
        # `path` is relative to what the caller passed in, `_abspath` is
        # the absolute path used for lookups in the fake filesystem.
        dir_entry.path = self.filesystem.joinpaths(self.path,
                                                   dir_entry.name)
        dir_entry._abspath = self.filesystem.joinpaths(self.abspath,
                                                       dir_entry.name)
        dir_entry._isdir = self.filesystem.isdir(dir_entry._abspath)
        dir_entry._islink = self.filesystem.islink(dir_entry._abspath)
        return dir_entry

    if sys.version_info >= (3, 6):
        def __enter__(self):
            """Enter the context manager; returns the iterator itself."""
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            """Leave the context manager, closing the iterator."""
            self.close()

        def close(self):
            """Close the iterator. Nothing has to be released here."""
            pass
163
164
def scandir(filesystem, path=''):
    """Create an iterator over the entries of a fake directory.

    Args:
        filesystem: The fake filesystem used for implementation
        path: Path to the target directory within the fake filesystem.

    Returns:
        an iterator to an unsorted list of os.DirEntry objects for
        each entry in path.

    Raises:
        OSError: if the target is not a directory.
    """
    entry_iterator = ScanDirIter(filesystem=filesystem, path=path)
    return entry_iterator
181
182
183def _classify_directory_contents(filesystem, root):
184    """Classify contents of a directory as files/directories.
185
186    Args:
187        filesystem: The fake filesystem used for implementation
188        root: (str) Directory to examine.
189
190    Returns:
191        (tuple) A tuple consisting of three values: the directory examined,
192        a list containing all of the directory entries, and a list
193        containing all of the non-directory entries.
194        (This is the same format as returned by the `os.walk` generator.)
195
196    Raises:
197        Nothing on its own, but be ready to catch exceptions generated by
198        underlying mechanisms like `os.listdir`.
199    """
200    dirs = []
201    files = []
202    for entry in filesystem.listdir(root):
203        if filesystem.isdir(filesystem.joinpaths(root, entry)):
204            dirs.append(entry)
205        else:
206            files.append(entry)
207    return root, dirs, files
208
209
def walk(filesystem, top, topdown=True, onerror=None, followlinks=False):
    """Perform an os.walk operation over the fake filesystem.

    Args:
        filesystem: The fake filesystem used for implementation
        top: The root directory from which to begin walk.
        topdown: Determines whether to return the tuples with the root as
            the first entry (`True`) or as the last, after all the child
            directory tuples (`False`).
        onerror: If not `None`, function which will be called to handle the
            `os.error` instance provided when `os.listdir()` fails.
        followlinks: If `True`, symbolic links are followed.

    Yields:
        (path, directories, nondirectories) for top and each of its
        subdirectories.  See the documentation for the builtin os module
        for further details.
    """

    def do_walk(top_dir, top_most=False):
        # The starting directory is always walked, even if it is a link;
        # below it, links are only entered when `followlinks` is set.
        may_skip_link = not top_most and not followlinks
        if may_skip_link and filesystem.islink(top_dir):
            return

        try:
            top_contents = _classify_directory_contents(filesystem, top_dir)
        except OSError as exc:
            # Report the failure if requested and give up on this subtree.
            if onerror is not None:
                onerror(exc)
            return

        if topdown:
            yield top_contents

        # The directory list is read after the top-down yield, so changes
        # the caller made to it are honoured here.
        for directory in top_contents[1]:
            path = filesystem.joinpaths(top_dir, directory)
            if followlinks or not filesystem.islink(path):
                yield from do_walk(path)

        if not topdown:
            yield top_contents

    return do_walk(to_string(top), top_most=True)
253
254
class FakeScanDirModule:
    """Uses FakeFilesystem to provide a fake `scandir` module replacement.

    .. Note:: The ``scandir`` function is a part of the standard ``os`` module
      since Python 3.5. This class handles the separate ``scandir`` module
      that is available on pypi.

    You need a fake_filesystem to use this:
    `filesystem = fake_filesystem.FakeFilesystem()`
    `fake_scandir_module = fake_filesystem.FakeScanDirModule(filesystem)`
    """

    @staticmethod
    def dir():
        """Return the names of the functions this fake module patches.

        Used for patching functions imported from the module.
        """
        patched_names = ('scandir', 'walk')
        return patched_names

    def __init__(self, filesystem):
        """Remember the fake filesystem that all calls are forwarded to.

        Args:
            filesystem: The fake filesystem used for implementation.
        """
        self.filesystem = filesystem

    def scandir(self, path='.'):
        """Create an iterator over the entries of a fake directory.

        Args:
            path: Path to the target directory within the fake filesystem.

        Returns:
            an iterator to an unsorted list of os.DirEntry objects for
            each entry in path.

        Raises:
            OSError: if the target is not a directory.
        """
        fake_fs = self.filesystem
        return scandir(fake_fs, path)

    def walk(self, top, topdown=True, onerror=None, followlinks=False):
        """Perform a walk operation over the fake filesystem.

        Args:
            top: The root directory from which to begin walk.
            topdown: Determines whether to return the tuples with the root as
                the first entry (`True`) or as the last, after all the child
                directory tuples (`False`).
            onerror: If not `None`, function which will be called to handle
                the `os.error` instance provided when `os.listdir()` fails.
            followlinks: If `True`, symbolic links are followed.

        Yields:
            (path, directories, nondirectories) for top and each of its
            subdirectories.  See the documentation for the builtin os module
            for further details.
        """
        return walk(self.filesystem, top, topdown=topdown, onerror=onerror,
                    followlinks=followlinks)
311