• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Licensed under the Apache License, Version 2.0 (the "License");
2# you may not use this file except in compliance with the License.
3# You may obtain a copy of the License at
4#
5#      http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10# See the License for the specific language governing permissions and
11# limitations under the License.
12
13"""A fake implementation for the `scandir` function working with
14FakeFilesystem.
Works with both the function integrated into the `os` module since Python 3.5
and the function provided by the standalone `scandir` Python package.
18"""
19
20import os
21import sys
22
23from pyfakefs.helpers import to_string, make_string_path
24
25
class DirEntry(os.PathLike):
    """Emulates os.DirEntry. Note that we did not enforce keyword only
    arguments.

    A new instance carries only unset values; whoever creates it is
    expected to fill in `name`, `path` and the private `_abspath`,
    `_isdir` and `_islink` attributes afterwards. Stat results are looked
    up lazily in the fake filesystem and cached on the instance.
    """

    def __init__(self, filesystem):
        """Initialize the dir entry with unset values.

        Args:
            filesystem: the fake filesystem used for implementation.
        """
        self._filesystem = filesystem
        self.name = ""
        self.path = ""
        self._abspath = ""
        self._inode = None
        self._islink = False
        self._isdir = False
        # Cache for stat(follow_symlinks=False), i.e. the entry itself.
        self._statresult = None
        # Cache for stat(follow_symlinks=True), i.e. the resolved object
        # (despite the attribute name, this is *not* the link's own stat).
        self._statresult_symlink = None

    def __repr__(self) -> str:
        """Return a representation in the style of `os.DirEntry`,
        e.g. ``<DirEntry 'name'>``."""
        return f"<{type(self).__name__} {self.name!r}>"

    def inode(self):
        """Return the inode number of the entry."""
        if self._inode is None:
            # The inode is recorded as a side effect of the
            # non-following stat call.
            self.stat(follow_symlinks=False)
        return self._inode

    def is_dir(self, follow_symlinks=True):
        """Return True if this entry is a directory entry.

        Args:
            follow_symlinks: If True, also return True if this entry is a
                symlink pointing to a directory.

        Returns:
            True if this entry is an existing directory entry, or if
                follow_symlinks is set, and this entry points to an existing
                directory entry.
        """
        return self._isdir and (follow_symlinks or not self._islink)

    def is_file(self, follow_symlinks=True):
        """Return True if this entry is a regular file entry.

        Args:
            follow_symlinks: If True, also return True if this entry is a
                symlink pointing to a regular file.

        Returns:
            True if this entry is an existing file entry, or if
                follow_symlinks is set, and this entry points to an existing
                file entry.
        """
        # NOTE(review): everything that is not flagged as a directory counts
        # as a file here, so an entry with `_isdir` False and `_islink` True
        # (which may be a broken link) yields True when following links -
        # confirm whether that matches the intended os.DirEntry behavior.
        return not self._isdir and (follow_symlinks or not self._islink)

    def is_symlink(self):
        """Return True if this entry is a symbolic link (even if broken)."""
        return self._islink

    def _copy_stat_result(self, file_object):
        """Return a private copy of the stat result of `file_object`.

        On a Windows fake filesystem `st_nlink` of the copy is set to 0.
        """
        stat_result = file_object.stat_result.copy()
        if self._filesystem.is_windows_fs:
            stat_result.st_nlink = 0
        return stat_result

    def stat(self, follow_symlinks=True):
        """Return a stat_result object for this entry.

        The result is computed on first use and cached separately for the
        following and the non-following case.

        Args:
            follow_symlinks: If False and the entry is a symlink, return the
                result for the symlink, otherwise for the object it points to.
        """
        if follow_symlinks:
            if self._statresult_symlink is None:
                file_object = self._filesystem.resolve(self._abspath)
                self._statresult_symlink = self._copy_stat_result(file_object)
            return self._statresult_symlink

        if self._statresult is None:
            file_object = self._filesystem.lresolve(self._abspath)
            # Remember the inode of the entry itself for `inode()`.
            self._inode = file_object.st_ino
            self._statresult = self._copy_stat_result(file_object)
        return self._statresult

    def __fspath__(self):
        """Return the entry's `path` attribute (os.PathLike protocol)."""
        return self.path

    if sys.version_info >= (3, 12):

        def is_junction(self) -> bool:
            """Return True if this entry is a junction.
            Junctions are not a part of posix semantic."""
            if not self._filesystem.is_windows_fs:
                return False
            file_object = self._filesystem.resolve(self._abspath)
            return file_object.is_junction
119
120
class ScanDirIter:
    """Iterator for DirEntry objects returned from `scandir()`
    function.

    The entry names of the directory are snapshotted when the iterator is
    created. The object is also a context manager; leaving the `with` block
    or calling `close()` exhausts the iterator.
    """

    def __init__(self, filesystem, path):
        """Prepare the iteration over the directory at `path`.

        Args:
            filesystem: the fake filesystem used for implementation.
            path: the directory to scan, either a path or an open file
                descriptor (int). File descriptors are not supported for
                a Windows fake filesystem.

        Raises:
            NotImplementedError: if `path` is a file descriptor and the
                fake filesystem is a Windows filesystem.
        """
        self.filesystem = filesystem
        if isinstance(path, int):
            if self.filesystem.is_windows_fs:
                raise NotImplementedError(
                    "scandir does not support file descriptor path argument"
                )
            self.abspath = self.filesystem.absnormpath(
                self.filesystem.get_open_file(path).get_object().path
            )
            # With a file descriptor there is no path prefix, so the `path`
            # of each entry is built from the entry name alone.
            self.path = ""
        else:
            path = make_string_path(path)
            self.abspath = self.filesystem.absnormpath(path)
            self.path = to_string(path)
        directory = self.filesystem.confirmdir(self.abspath, check_exe_perm=False)
        # Iterate over a snapshot, so later changes of the directory do not
        # disturb a running iteration.
        self.entry_iter = iter(tuple(directory.entries))

    def __iter__(self):
        return self

    def __next__(self):
        """Return a DirEntry for the next entry name.

        Raises:
            StopIteration: if all entries have been consumed or the
                iterator has been closed.
        """
        entry = next(self.entry_iter)
        dir_entry = DirEntry(self.filesystem)
        dir_entry.name = entry
        dir_entry.path = self.filesystem.joinpaths(self.path, dir_entry.name)
        dir_entry._abspath = self.filesystem.joinpaths(self.abspath, dir_entry.name)
        dir_entry._isdir = self.filesystem.isdir(dir_entry._abspath)
        dir_entry._islink = self.filesystem.islink(dir_entry._abspath)
        return dir_entry

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Close the iterator; no more entries are returned afterwards.

        This mirrors the real `os.scandir()` iterator, which is exhausted
        once it has been closed. Calling `close()` repeatedly is harmless.
        """
        self.entry_iter = iter(())
164
165
def scandir(filesystem, path=""):
    """Create the iterator over the entries of a fake directory.

    Args:
        filesystem: The fake filesystem used for implementation
        path: Path to the target directory within the fake filesystem.

    Returns:
        an iterator to an unsorted list of os.DirEntry objects for
        each entry in path.

    Raises:
        OSError: if the target is not a directory.
    """
    # All the work, including validation of the path, happens in the
    # constructor of the iterator.
    entry_iterator = ScanDirIter(filesystem, path)
    return entry_iterator
182
183
184def _classify_directory_contents(filesystem, root):
185    """Classify contents of a directory as files/directories.
186
187    Args:
188        filesystem: The fake filesystem used for implementation
189        root: (str) Directory to examine.
190
191    Returns:
192        (tuple) A tuple consisting of three values: the directory examined,
193        a list containing all of the directory entries, and a list
194        containing all of the non-directory entries.
195        (This is the same format as returned by the `os.walk` generator.)
196
197    Raises:
198        Nothing on its own, but be ready to catch exceptions generated by
199        underlying mechanisms like `os.listdir`.
200    """
201    dirs = []
202    files = []
203    for entry in filesystem.listdir(root):
204        if filesystem.isdir(filesystem.joinpaths(root, entry)):
205            dirs.append(entry)
206        else:
207            files.append(entry)
208    return root, dirs, files
209
210
def walk(filesystem, top, topdown=True, onerror=None, followlinks=False):
    """Perform an os.walk operation over the fake filesystem.

    Args:
        filesystem: The fake filesystem used for implementation
        top: The root directory from which to begin walk.
        topdown: Determines whether to return the tuples with the root as
            the first entry (`True`) or as the last, after all the child
            directory tuples (`False`).
        onerror: If not `None`, function which will be called to handle the
            `os.error` instance provided when `os.listdir()` fails.
        followlinks: If `True`, symbolic links are followed.

    Yields:
        (path, directories, nondirectories) for top and each of its
        subdirectories.  See the documentation for the builtin os module
        for further details.
    """

    def walk_directory(current_dir, is_start=False):
        # Only the start directory may be a link if links are not followed.
        if not (is_start or followlinks) and filesystem.islink(current_dir):
            return
        try:
            listing = _classify_directory_contents(filesystem, current_dir)
        except OSError as error:
            # A directory that cannot be listed is reported (if requested)
            # and contributes nothing to the walk.
            if onerror is not None:
                onerror(error)
            return

        if topdown:
            yield listing

        # The directory list is read after the top-down yield, so the
        # consumer may have modified it in place in the meantime.
        for subdir_name in listing[1]:
            subdir_path = filesystem.joinpaths(current_dir, subdir_name)
            if followlinks or not filesystem.islink(subdir_path):
                yield from walk_directory(subdir_path)

        if not topdown:
            yield listing

    start_dir = make_string_path(to_string(top))
    return walk_directory(start_dir, is_start=True)
253