• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Licensed under the Apache License, Version 2.0 (the "License");
2# you may not use this file except in compliance with the License.
3# You may obtain a copy of the License at
4#
5#      http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10# See the License for the specific language governing permissions and
11# limitations under the License.
12
13"""Helper classes use for fake file system implementation."""
14import io
15import locale
16import os
17import platform
18import stat
19import sys
20import time
21from copy import copy
22from stat import S_IFLNK
23from typing import Union, Optional, Any, AnyStr, overload, cast
24
25IS_PYPY = platform.python_implementation() == 'PyPy'
26IS_WIN = sys.platform == 'win32'
27IN_DOCKER = os.path.exists('/.dockerenv')
28
29AnyPath = Union[AnyStr, os.PathLike]
30
31
32def is_int_type(val: Any) -> bool:
33    """Return True if `val` is of integer type."""
34    return isinstance(val, int)
35
36
37def is_byte_string(val: Any) -> bool:
38    """Return True if `val` is a bytes-like object, False for a unicode
39    string."""
40    return not hasattr(val, 'encode')
41
42
43def is_unicode_string(val: Any) -> bool:
44    """Return True if `val` is a unicode string, False for a bytes-like
45    object."""
46    return hasattr(val, 'encode')
47
48
49@overload
50def make_string_path(dir_name: AnyStr) -> AnyStr: ...
51
52
53@overload
54def make_string_path(dir_name: os.PathLike) -> str: ...
55
56
57def make_string_path(dir_name: AnyPath) -> AnyStr:
58    return cast(AnyStr, os.fspath(dir_name))
59
60
61def to_string(path: Union[AnyStr, Union[str, bytes]]) -> str:
62    """Return the string representation of a byte string using the preferred
63     encoding, or the string itself if path is a str."""
64    if isinstance(path, bytes):
65        return path.decode(locale.getpreferredencoding(False))
66    return path
67
68
69def to_bytes(path: Union[AnyStr, Union[str, bytes]]) -> bytes:
70    """Return the bytes representation of a string using the preferred
71     encoding, or the byte string itself if path is a byte string."""
72    if isinstance(path, str):
73        return bytes(path, locale.getpreferredencoding(False))
74    return path
75
76
77def join_strings(s1: AnyStr, s2: AnyStr) -> AnyStr:
78    """This is a bit of a hack to satisfy mypy - may be refactored."""
79    return s1 + s2
80
81
82def real_encoding(encoding: Optional[str]) -> Optional[str]:
83    """Since Python 3.10, the new function ``io.text_encoding`` returns
84    "locale" as the encoding if None is defined. This will be handled
85    as no encoding in pyfakefs."""
86    if sys.version_info >= (3, 10):
87        return encoding if encoding != "locale" else None
88    return encoding
89
90
91def now():
92    return time.time()
93
94
95@overload
96def matching_string(matched: bytes, string: AnyStr) -> bytes: ...
97
98
99@overload
100def matching_string(matched: str, string: AnyStr) -> str: ...
101
102
103@overload
104def matching_string(matched: AnyStr, string: None) -> None: ...
105
106
107def matching_string(  # type: ignore[misc]
108        matched: AnyStr, string: Optional[AnyStr]) -> Optional[AnyStr]:
109    """Return the string as byte or unicode depending
110    on the type of matched, assuming string is an ASCII string.
111    """
112    if string is None:
113        return string
114    if isinstance(matched, bytes) and isinstance(string, str):
115        return string.encode(locale.getpreferredencoding(False))
116    return string
117
118
119class FakeStatResult:
120    """Mimics os.stat_result for use as return type of `stat()` and similar.
121    This is needed as `os.stat_result` has no possibility to set
122    nanosecond times directly.
123    """
124    _stat_float_times: bool = True
125
126    def __init__(self, is_windows: bool, user_id: int, group_id: int,
127                 initial_time: Optional[float] = None):
128        self._use_float: Optional[bool] = None
129        self.st_mode: int = 0
130        self.st_ino: Optional[int] = None
131        self.st_dev: int = 0
132        self.st_nlink: int = 0
133        self.st_uid: int = user_id
134        self.st_gid: int = group_id
135        self._st_size: int = 0
136        self.is_windows: bool = is_windows
137        self._st_atime_ns: int = int((initial_time or 0) * 1e9)
138        self._st_mtime_ns: int = self._st_atime_ns
139        self._st_ctime_ns: int = self._st_atime_ns
140
141    @property
142    def use_float(self) -> bool:
143        if self._use_float is None:
144            return self.stat_float_times()
145        return self._use_float
146
147    @use_float.setter
148    def use_float(self, val: bool) -> None:
149        self._use_float = val
150
151    def __eq__(self, other: Any) -> bool:
152        return (
153                isinstance(other, FakeStatResult) and
154                self._st_atime_ns == other._st_atime_ns and
155                self._st_ctime_ns == other._st_ctime_ns and
156                self._st_mtime_ns == other._st_mtime_ns and
157                self.st_size == other.st_size and
158                self.st_gid == other.st_gid and
159                self.st_uid == other.st_uid and
160                self.st_nlink == other.st_nlink and
161                self.st_dev == other.st_dev and
162                self.st_ino == other.st_ino and
163                self.st_mode == other.st_mode
164        )
165
166    def __ne__(self, other: Any) -> bool:
167        return not self == other
168
169    def copy(self) -> "FakeStatResult":
170        """Return a copy where the float usage is hard-coded to mimic the
171        behavior of the real os.stat_result.
172        """
173        stat_result = copy(self)
174        stat_result.use_float = self.use_float
175        return stat_result
176
177    def set_from_stat_result(self, stat_result: os.stat_result) -> None:
178        """Set values from a real os.stat_result.
179        Note: values that are controlled by the fake filesystem are not set.
180        This includes st_ino, st_dev and st_nlink.
181        """
182        self.st_mode = stat_result.st_mode
183        self.st_uid = stat_result.st_uid
184        self.st_gid = stat_result.st_gid
185        self._st_size = stat_result.st_size
186        self._st_atime_ns = stat_result.st_atime_ns
187        self._st_mtime_ns = stat_result.st_mtime_ns
188        self._st_ctime_ns = stat_result.st_ctime_ns
189
190    @classmethod
191    def stat_float_times(cls, newvalue: Optional[bool] = None) -> bool:
192        """Determine whether a file's time stamps are reported as floats
193        or ints.
194
195        Calling without arguments returns the current value.
196        The value is shared by all instances of FakeOsModule.
197
198        Args:
199            newvalue: If `True`, mtime, ctime, atime are reported as floats.
200                Otherwise, they are returned as ints (rounding down).
201        """
202        if newvalue is not None:
203            cls._stat_float_times = bool(newvalue)
204        return cls._stat_float_times
205
206    @property
207    def st_ctime(self) -> Union[int, float]:
208        """Return the creation time in seconds."""
209        ctime = self._st_ctime_ns / 1e9
210        return ctime if self.use_float else int(ctime)
211
212    @st_ctime.setter
213    def st_ctime(self, val: Union[int, float]) -> None:
214        """Set the creation time in seconds."""
215        self._st_ctime_ns = int(val * 1e9)
216
217    @property
218    def st_atime(self) -> Union[int, float]:
219        """Return the access time in seconds."""
220        atime = self._st_atime_ns / 1e9
221        return atime if self.use_float else int(atime)
222
223    @st_atime.setter
224    def st_atime(self, val: Union[int, float]) -> None:
225        """Set the access time in seconds."""
226        self._st_atime_ns = int(val * 1e9)
227
228    @property
229    def st_mtime(self) -> Union[int, float]:
230        """Return the modification time in seconds."""
231        mtime = self._st_mtime_ns / 1e9
232        return mtime if self.use_float else int(mtime)
233
234    @st_mtime.setter
235    def st_mtime(self, val: Union[int, float]) -> None:
236        """Set the modification time in seconds."""
237        self._st_mtime_ns = int(val * 1e9)
238
239    @property
240    def st_size(self) -> int:
241        if self.st_mode & S_IFLNK == S_IFLNK and self.is_windows:
242            return 0
243        return self._st_size
244
245    @st_size.setter
246    def st_size(self, val: int) -> None:
247        self._st_size = val
248
249    @property
250    def st_file_attributes(self) -> int:
251        if not self.is_windows:
252            raise AttributeError("module 'os.stat_result' "
253                                 "has no attribute 'st_file_attributes'")
254        mode = 0
255        st_mode = self.st_mode
256        if st_mode & stat.S_IFDIR:
257            mode |= stat.FILE_ATTRIBUTE_DIRECTORY
258        if st_mode & stat.S_IFREG:
259            mode |= stat.FILE_ATTRIBUTE_NORMAL
260        if st_mode & (stat.S_IFCHR | stat.S_IFBLK):
261            mode |= stat.FILE_ATTRIBUTE_DEVICE
262        if st_mode & stat.S_IFLNK:
263            mode |= stat.FILE_ATTRIBUTE_REPARSE_POINT
264        return mode
265
266    @property
267    def st_reparse_tag(self) -> int:
268        if not self.is_windows or sys.version_info < (3, 8):
269            raise AttributeError("module 'os.stat_result' "
270                                 "has no attribute 'st_reparse_tag'")
271        if self.st_mode & stat.S_IFLNK:
272            return stat.IO_REPARSE_TAG_SYMLINK  # type: ignore[attr-defined]
273        return 0
274
275    def __getitem__(self, item: int) -> Optional[int]:
276        """Implement item access to mimic `os.stat_result` behavior."""
277        import stat
278
279        if item == stat.ST_MODE:
280            return self.st_mode
281        if item == stat.ST_INO:
282            return self.st_ino
283        if item == stat.ST_DEV:
284            return self.st_dev
285        if item == stat.ST_NLINK:
286            return self.st_nlink
287        if item == stat.ST_UID:
288            return self.st_uid
289        if item == stat.ST_GID:
290            return self.st_gid
291        if item == stat.ST_SIZE:
292            return self.st_size
293        if item == stat.ST_ATIME:
294            # item access always returns int for backward compatibility
295            return int(self.st_atime)
296        if item == stat.ST_MTIME:
297            return int(self.st_mtime)
298        if item == stat.ST_CTIME:
299            return int(self.st_ctime)
300        raise ValueError('Invalid item')
301
302    @property
303    def st_atime_ns(self) -> int:
304        """Return the access time in nanoseconds."""
305        return self._st_atime_ns
306
307    @st_atime_ns.setter
308    def st_atime_ns(self, val: int) -> None:
309        """Set the access time in nanoseconds."""
310        self._st_atime_ns = val
311
312    @property
313    def st_mtime_ns(self) -> int:
314        """Return the modification time in nanoseconds."""
315        return self._st_mtime_ns
316
317    @st_mtime_ns.setter
318    def st_mtime_ns(self, val: int) -> None:
319        """Set the modification time of the fake file in nanoseconds."""
320        self._st_mtime_ns = val
321
322    @property
323    def st_ctime_ns(self) -> int:
324        """Return the creation time in nanoseconds."""
325        return self._st_ctime_ns
326
327    @st_ctime_ns.setter
328    def st_ctime_ns(self, val: int) -> None:
329        """Set the creation time of the fake file in nanoseconds."""
330        self._st_ctime_ns = val
331
332
333class BinaryBufferIO(io.BytesIO):
334    """Stream class that handles byte contents for files."""
335
336    def __init__(self, contents: Optional[bytes]):
337        super().__init__(contents or b'')
338
339    def putvalue(self, value: bytes) -> None:
340        self.write(value)
341
342
343class TextBufferIO(io.TextIOWrapper):
344    """Stream class that handles Python string contents for files.
345    """
346
347    def __init__(self, contents: Optional[bytes] = None,
348                 newline: Optional[str] = None,
349                 encoding: Optional[str] = None,
350                 errors: str = 'strict'):
351        self._bytestream = io.BytesIO(contents or b'')
352        super().__init__(self._bytestream, encoding, errors, newline)
353
354    def getvalue(self) -> bytes:
355        return self._bytestream.getvalue()
356
357    def putvalue(self, value: bytes) -> None:
358        self._bytestream.write(value)
359