• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Licensed under the Apache License, Version 2.0 (the "License");
2# you may not use this file except in compliance with the License.
3# You may obtain a copy of the License at
4#
5#      http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10# See the License for the specific language governing permissions and
11# limitations under the License.
12
"""Helper classes used for fake file system implementation."""
14import io
15import locale
16import platform
17import stat
18import sys
19from copy import copy
20from stat import S_IFLNK
21
22import os
23
# Flags describing the runtime environment; each is computed once on import.

# True if the running interpreter identifies itself as PyPy.
IS_PYPY = platform.python_implementation() == 'PyPy'

# True if `sys.platform` reports a Windows host.
IS_WIN = sys.platform == 'win32'

# True if the `/.dockerenv` marker path exists.
IN_DOCKER = os.path.exists('/.dockerenv')
27
28
def is_int_type(val):
    """Tell whether `val` is an integer.

    Returns:
        `True` if `val` is an instance of `int` (this includes `bool`),
        `False` otherwise.
    """
    is_integer = isinstance(val, int)
    return is_integer
32
33
def is_byte_string(val):
    """Duck-typed check for a bytes-like object.

    The decision is based solely on the absence of an `encode` attribute:
    unicode strings have one and yield `False`, everything else yields
    `True`.
    """
    can_encode = hasattr(val, 'encode')
    return not can_encode
38
39
def is_unicode_string(val):
    """Duck-typed check for a unicode string.

    The decision is based solely on the presence of an `encode` attribute:
    unicode strings have one and yield `True`, bytes-like objects do not
    and yield `False`.
    """
    if hasattr(val, 'encode'):
        return True
    return False
44
45
def make_string_path(dir_name):
    """Return the file system representation of `dir_name`.

    On Python 3.6 and later the value is passed through `os.fspath()`, so
    path-like objects are converted while `str` and `bytes` are returned
    unchanged. On older interpreters `dir_name` is returned as is.
    """
    if sys.version_info < (3, 6):
        # os.fspath() is only used from Python 3.6 on
        return dir_name
    return os.fspath(dir_name)
50
51
def to_string(path):
    """Convert a `bytes` path to `str`, leaving anything else untouched.

    Byte strings are decoded with the preferred encoding as reported by
    `locale.getpreferredencoding(False)`; every other value is returned
    unchanged.
    """
    if not isinstance(path, bytes):
        return path
    preferred_encoding = locale.getpreferredencoding(False)
    return path.decode(preferred_encoding)
58
59
60class FakeStatResult:
61    """Mimics os.stat_result for use as return type of `stat()` and similar.
62    This is needed as `os.stat_result` has no possibility to set
63    nanosecond times directly.
64    """
65    _stat_float_times = True
66
67    def __init__(self, is_windows, user_id, group_id, initial_time=None):
68        self._use_float = None
69        self.st_mode = None
70        self.st_ino = None
71        self.st_dev = None
72        self.st_nlink = 0
73        self.st_uid = user_id
74        self.st_gid = group_id
75        self._st_size = None
76        self.is_windows = is_windows
77        if initial_time is not None:
78            self._st_atime_ns = int(initial_time * 1e9)
79        else:
80            self._st_atime_ns = None
81        self._st_mtime_ns = self._st_atime_ns
82        self._st_ctime_ns = self._st_atime_ns
83
84    @property
85    def use_float(self):
86        if self._use_float is None:
87            return self.stat_float_times()
88        return self._use_float
89
90    @use_float.setter
91    def use_float(self, val):
92        self._use_float = val
93
94    def __eq__(self, other):
95        return (
96                isinstance(other, FakeStatResult) and
97                self._st_atime_ns == other._st_atime_ns and
98                self._st_ctime_ns == other._st_ctime_ns and
99                self._st_mtime_ns == other._st_mtime_ns and
100                self.st_size == other.st_size and
101                self.st_gid == other.st_gid and
102                self.st_uid == other.st_uid and
103                self.st_nlink == other.st_nlink and
104                self.st_dev == other.st_dev and
105                self.st_ino == other.st_ino and
106                self.st_mode == other.st_mode
107        )
108
109    def __ne__(self, other):
110        return not self == other
111
112    def copy(self):
113        """Return a copy where the float usage is hard-coded to mimic the
114        behavior of the real os.stat_result.
115        """
116        stat_result = copy(self)
117        stat_result.use_float = self.use_float
118        return stat_result
119
120    def set_from_stat_result(self, stat_result):
121        """Set values from a real os.stat_result.
122        Note: values that are controlled by the fake filesystem are not set.
123        This includes st_ino, st_dev and st_nlink.
124        """
125        self.st_mode = stat_result.st_mode
126        self.st_uid = stat_result.st_uid
127        self.st_gid = stat_result.st_gid
128        self._st_size = stat_result.st_size
129        self._st_atime_ns = stat_result.st_atime_ns
130        self._st_mtime_ns = stat_result.st_mtime_ns
131        self._st_ctime_ns = stat_result.st_ctime_ns
132
133    @classmethod
134    def stat_float_times(cls, newvalue=None):
135        """Determine whether a file's time stamps are reported as floats
136        or ints.
137
138        Calling without arguments returns the current value.
139        The value is shared by all instances of FakeOsModule.
140
141        Args:
142            newvalue: If `True`, mtime, ctime, atime are reported as floats.
143                Otherwise, they are returned as ints (rounding down).
144        """
145        if newvalue is not None:
146            cls._stat_float_times = bool(newvalue)
147        return cls._stat_float_times
148
149    @property
150    def st_ctime(self):
151        """Return the creation time in seconds."""
152        ctime = self._st_ctime_ns / 1e9
153        return ctime if self.use_float else int(ctime)
154
155    @property
156    def st_atime(self):
157        """Return the access time in seconds."""
158        atime = self._st_atime_ns / 1e9
159        return atime if self.use_float else int(atime)
160
161    @property
162    def st_mtime(self):
163        """Return the modification time in seconds."""
164        mtime = self._st_mtime_ns / 1e9
165        return mtime if self.use_float else int(mtime)
166
167    @st_ctime.setter
168    def st_ctime(self, val):
169        """Set the creation time in seconds."""
170        self._st_ctime_ns = int(val * 1e9)
171
172    @st_atime.setter
173    def st_atime(self, val):
174        """Set the access time in seconds."""
175        self._st_atime_ns = int(val * 1e9)
176
177    @st_mtime.setter
178    def st_mtime(self, val):
179        """Set the modification time in seconds."""
180        self._st_mtime_ns = int(val * 1e9)
181
182    @property
183    def st_size(self):
184        if self.st_mode & S_IFLNK == S_IFLNK and self.is_windows:
185            return 0
186        return self._st_size
187
188    @st_size.setter
189    def st_size(self, val):
190        self._st_size = val
191
192    @property
193    def st_file_attributes(self):
194        if not self.is_windows:
195            raise AttributeError("module 'os.stat_result' "
196                                 "has no attribute 'st_file_attributes'")
197        mode = 0
198        st_mode = self.st_mode
199        if st_mode & stat.S_IFDIR:
200            mode |= stat.FILE_ATTRIBUTE_DIRECTORY
201        if st_mode & stat.S_IFREG:
202            mode |= stat.FILE_ATTRIBUTE_NORMAL
203        if st_mode & (stat.S_IFCHR | stat.S_IFBLK):
204            mode |= stat.FILE_ATTRIBUTE_DEVICE
205        if st_mode & stat.S_IFLNK:
206            mode |= stat.FILE_ATTRIBUTE_REPARSE_POINT
207        return mode
208
209    @property
210    def st_reparse_tag(self):
211        if not self.is_windows or sys.version_info < (3, 8):
212            raise AttributeError("module 'os.stat_result' "
213                                 "has no attribute 'st_reparse_tag'")
214        if self.st_mode & stat.S_IFLNK:
215            return stat.IO_REPARSE_TAG_SYMLINK
216        return 0
217
218    def __getitem__(self, item):
219        """Implement item access to mimic `os.stat_result` behavior."""
220        import stat
221
222        if item == stat.ST_MODE:
223            return self.st_mode
224        if item == stat.ST_INO:
225            return self.st_ino
226        if item == stat.ST_DEV:
227            return self.st_dev
228        if item == stat.ST_NLINK:
229            return self.st_nlink
230        if item == stat.ST_UID:
231            return self.st_uid
232        if item == stat.ST_GID:
233            return self.st_gid
234        if item == stat.ST_SIZE:
235            return self.st_size
236        if item == stat.ST_ATIME:
237            # item access always returns int for backward compatibility
238            return int(self.st_atime)
239        if item == stat.ST_MTIME:
240            return int(self.st_mtime)
241        if item == stat.ST_CTIME:
242            return int(self.st_ctime)
243        raise ValueError('Invalid item')
244
245    @property
246    def st_atime_ns(self):
247        """Return the access time in nanoseconds."""
248        return self._st_atime_ns
249
250    @property
251    def st_mtime_ns(self):
252        """Return the modification time in nanoseconds."""
253        return self._st_mtime_ns
254
255    @property
256    def st_ctime_ns(self):
257        """Return the creation time in nanoseconds."""
258        return self._st_ctime_ns
259
260    @st_atime_ns.setter
261    def st_atime_ns(self, val):
262        """Set the access time in nanoseconds."""
263        self._st_atime_ns = val
264
265    @st_mtime_ns.setter
266    def st_mtime_ns(self, val):
267        """Set the modification time of the fake file in nanoseconds."""
268        self._st_mtime_ns = val
269
270    @st_ctime_ns.setter
271    def st_ctime_ns(self, val):
272        """Set the creation time of the fake file in nanoseconds."""
273        self._st_ctime_ns = val
274
275
class FileBufferIO:
    """Stream class that handles Python string and byte contents for files.
    The standard io.StringIO cannot be used for strings due to the slightly
    different handling of newline mode.
    Uses an io.BytesIO stream for the raw data and adds handling of encoding
    and newlines.
    Attributes that are not defined here are delegated to the underlying
    io.BytesIO stream.
    """

    def __init__(self, contents=None, linesep='\n', binary=False,
                 newline=None, encoding=None, errors='strict'):
        """Create the stream and optionally fill it with initial contents.

        Args:
            contents: Initial contents (string or bytes); if given, the
                stream is positioned at the start afterwards.
            linesep: Line separator used for newline translation if
                `newline` is `None` or `'-'`.
            binary: If `True`, data is read and written as bytes without
                decoding or newline translation.
            newline: Newline mode: `None`, `''`, an explicit separator, or
                the special marker `'-'`.
            encoding: Encoding used for text; falls back to the preferred
                locale encoding if not set.
            errors: Error handling scheme passed to encode and decode.
        """
        self._newline = newline
        self._encoding = encoding
        self.errors = errors
        self._linesep = linesep
        self.binary = binary
        self._bytestream = io.BytesIO()
        if contents is not None:
            self.putvalue(contents)
            self._bytestream.seek(0)

    def encoding(self):
        """Return the configured encoding, or the preferred locale encoding
        if none was configured."""
        return self._encoding or locale.getpreferredencoding(False)

    def encoded_string(self, contents):
        """Return `contents` as bytes; byte strings are returned as is."""
        if is_byte_string(contents):
            return contents
        return contents.encode(self.encoding(), self.errors)

    def decoded_string(self, contents):
        """Return the byte string `contents` decoded to a string."""
        return contents.decode(self.encoding(), self.errors)

    def convert_newlines_for_writing(self, s):
        """Return `s` with '\\n' translated according to the newline mode.

        Binary data and the modes `''` and `'\\n'` leave `s` unchanged,
        `None` and `'-'` translate to the configured line separator, any
        other mode translates to that mode's value.
        """
        if self.binary:
            return s
        if self._newline in (None, '-'):
            return s.replace('\n', self._linesep)
        if self._newline in ('', '\n'):
            return s
        return s.replace('\n', self._newline)

    def convert_newlines_after_reading(self, s):
        """Return `s` with line endings translated to '\\n' according to the
        newline mode.

        With mode `None`, '\\r\\n' and '\\r' are translated, with mode `'-'`
        the configured line separator is translated; otherwise `s` is
        returned unchanged.
        """
        if self._newline is None:
            return s.replace('\r\n', '\n').replace('\r', '\n')
        if self._newline == '-':
            return s.replace(self._linesep, '\n')
        return s

    def read(self, size=-1):
        """Read up to `size` bytes from the underlying stream.

        Returns:
            The bytes read in binary mode, otherwise the decoded string
            with converted newlines.
        """
        contents = self._bytestream.read(size)
        if self.binary:
            return contents
        return self.convert_newlines_after_reading(
            self.decoded_string(contents))

    def readline(self, size=-1):
        """Read a single line from the current position.

        At most `size` bytes are taken from the underlying stream. The
        stream is positioned behind the returned line afterwards.

        Returns:
            The line including its line ending (bytes in binary mode,
            otherwise a string), or everything that was read if no line
            ending was found.
        """
        seek_pos = self._bytestream.tell()
        byte_contents = self._bytestream.read(size)
        if self.binary:
            read_contents = byte_contents
            LF = b'\n'
        else:
            read_contents = self.convert_newlines_after_reading(
                self.decoded_string(byte_contents))
            LF = '\n'
        # `end_pos` is the line length in the raw bytes, `length` the line
        # length in the converted contents; they differ if line endings
        # have been translated
        end_pos = 0

        if self._newline is None:
            end_pos = self._linelen_for_universal_newlines(byte_contents)
            if end_pos > 0:
                length = read_contents.find(LF) + 1
        elif self._newline == '':
            # any line ending terminates the line and is not translated,
            # so find out which one has been found
            end_pos = self._linelen_for_universal_newlines(byte_contents)
            if end_pos > 0:
                if byte_contents[end_pos - 1] == ord(b'\r'):
                    newline = '\r'
                elif end_pos > 1 and byte_contents[end_pos - 2] == ord(b'\r'):
                    newline = '\r\n'
                else:
                    newline = '\n'
                length = read_contents.find(newline) + len(newline)
        else:
            newline = '\n' if self._newline == '-' else self._newline
            length = read_contents.find(newline)
            if length >= 0:
                nl_len = len(newline)
                end_pos = byte_contents.find(newline.encode()) + nl_len
                length += nl_len

        if end_pos == 0:
            # no line ending found - return all that has been read
            length = len(read_contents)
            end_pos = len(byte_contents)

        self._bytestream.seek(seek_pos + end_pos)
        return (byte_contents[:end_pos] if self.binary
                else read_contents[:length])

    def _linelen_for_universal_newlines(self, byte_contents):
        """Return the length of the first line in `byte_contents` including
        its line ending, or 0 if there is no line ending.

        In binary mode only b'\\n' ends a line, otherwise any of b'\\n',
        b'\\r' and b'\\r\\n' does.
        """
        if self.binary:
            return byte_contents.find(b'\n') + 1
        pos_lf = byte_contents.find(b'\n')
        pos_cr = byte_contents.find(b'\r')
        if pos_lf == -1 and pos_cr == -1:
            return 0
        if pos_lf != -1 and (pos_lf < pos_cr or pos_cr == -1):
            end_pos = pos_lf
        else:
            end_pos = pos_cr
        if end_pos == pos_cr and end_pos + 1 == pos_lf:
            # '\r' directly followed by '\n' counts as one line ending
            end_pos = pos_lf
        return end_pos + 1

    def readlines(self, size=-1):
        """Read and return a list of lines from the current position.

        Args:
            size: Size hint. If it is `None`, zero or negative, all
                remaining lines are read; otherwise no more lines are read
                as soon as the total length of the lines read so far has
                reached the hint. Lines are never truncated.
        """
        unlimited = size is None or size <= 0
        lines = []
        total_length = 0
        while True:
            line = self.readline()
            if not line:
                return lines
            lines.append(line)
            if not unlimited:
                total_length += len(line)
                if total_length >= size:
                    return lines

    def putvalue(self, s):
        """Write `s` to the underlying stream, encoding it if needed,
        without newline translation."""
        self._bytestream.write(self.encoded_string(s))

    def write(self, s):
        """Write `s` at the current position, translating newlines in text
        mode.

        Returns:
            The length of `s` as passed in, regardless of any newline
            translation - this is what real file objects report.

        Raises:
            TypeError: if a string is written in binary mode or bytes are
                written in text mode.
        """
        if self.binary != is_byte_string(s):
            raise TypeError('Incorrect type for writing')
        self.putvalue(self.convert_newlines_for_writing(s))
        return len(s)

    def writelines(self, lines):
        """Write each item of `lines` using `write()`."""
        for line in lines:
            self.write(line)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next line; raise StopIteration at the end."""
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def __getattr__(self, name):
        """Delegate all attributes not found here to the byte stream."""
        if name == '_bytestream':
            # Only reached if the stream has not been set yet (e.g. for an
            # instance created without calling __init__); without this
            # guard the lookup below would recurse endlessly.
            raise AttributeError(name)
        return getattr(self._bytestream, name)
426
427
class NullFileBufferIO(FileBufferIO):
    """Stream variant for the null device: written data is discarded."""

    def putvalue(self, s):
        """Ignore `s` - nothing is stored in the underlying stream."""
433