1# Licensed under the Apache License, Version 2.0 (the "License"); 2# you may not use this file except in compliance with the License. 3# You may obtain a copy of the License at 4# 5# http://www.apache.org/licenses/LICENSE-2.0 6# 7# Unless required by applicable law or agreed to in writing, software 8# distributed under the License is distributed on an "AS IS" BASIS, 9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 10# See the License for the specific language governing permissions and 11# limitations under the License. 12 13"""Helper classes use for fake file system implementation.""" 14import io 15import locale 16import platform 17import stat 18import sys 19from copy import copy 20from stat import S_IFLNK 21 22import os 23 24IS_PYPY = platform.python_implementation() == 'PyPy' 25IS_WIN = sys.platform == 'win32' 26IN_DOCKER = os.path.exists('/.dockerenv') 27 28 29def is_int_type(val): 30 """Return True if `val` is of integer type.""" 31 return isinstance(val, int) 32 33 34def is_byte_string(val): 35 """Return True if `val` is a bytes-like object, False for a unicode 36 string.""" 37 return not hasattr(val, 'encode') 38 39 40def is_unicode_string(val): 41 """Return True if `val` is a unicode string, False for a bytes-like 42 object.""" 43 return hasattr(val, 'encode') 44 45 46def make_string_path(dir_name): 47 if sys.version_info >= (3, 6): 48 dir_name = os.fspath(dir_name) 49 return dir_name 50 51 52def to_string(path): 53 """Return the string representation of a byte string using the preferred 54 encoding, or the string itself if path is a str.""" 55 if isinstance(path, bytes): 56 return path.decode(locale.getpreferredencoding(False)) 57 return path 58 59 60class FakeStatResult: 61 """Mimics os.stat_result for use as return type of `stat()` and similar. 62 This is needed as `os.stat_result` has no possibility to set 63 nanosecond times directly. 64 """ 65 _stat_float_times = True 66 67 def __init__(self, is_windows, user_id, group_id, initial_time=None): 68 self._use_float = None 69 self.st_mode = None 70 self.st_ino = None 71 self.st_dev = None 72 self.st_nlink = 0 73 self.st_uid = user_id 74 self.st_gid = group_id 75 self._st_size = None 76 self.is_windows = is_windows 77 if initial_time is not None: 78 self._st_atime_ns = int(initial_time * 1e9) 79 else: 80 self._st_atime_ns = None 81 self._st_mtime_ns = self._st_atime_ns 82 self._st_ctime_ns = self._st_atime_ns 83 84 @property 85 def use_float(self): 86 if self._use_float is None: 87 return self.stat_float_times() 88 return self._use_float 89 90 @use_float.setter 91 def use_float(self, val): 92 self._use_float = val 93 94 def __eq__(self, other): 95 return ( 96 isinstance(other, FakeStatResult) and 97 self._st_atime_ns == other._st_atime_ns and 98 self._st_ctime_ns == other._st_ctime_ns and 99 self._st_mtime_ns == other._st_mtime_ns and 100 self.st_size == other.st_size and 101 self.st_gid == other.st_gid and 102 self.st_uid == other.st_uid and 103 self.st_nlink == other.st_nlink and 104 self.st_dev == other.st_dev and 105 self.st_ino == other.st_ino and 106 self.st_mode == other.st_mode 107 ) 108 109 def __ne__(self, other): 110 return not self == other 111 112 def copy(self): 113 """Return a copy where the float usage is hard-coded to mimic the 114 behavior of the real os.stat_result. 115 """ 116 stat_result = copy(self) 117 stat_result.use_float = self.use_float 118 return stat_result 119 120 def set_from_stat_result(self, stat_result): 121 """Set values from a real os.stat_result. 122 Note: values that are controlled by the fake filesystem are not set. 123 This includes st_ino, st_dev and st_nlink. 124 """ 125 self.st_mode = stat_result.st_mode 126 self.st_uid = stat_result.st_uid 127 self.st_gid = stat_result.st_gid 128 self._st_size = stat_result.st_size 129 self._st_atime_ns = stat_result.st_atime_ns 130 self._st_mtime_ns = stat_result.st_mtime_ns 131 self._st_ctime_ns = stat_result.st_ctime_ns 132 133 @classmethod 134 def stat_float_times(cls, newvalue=None): 135 """Determine whether a file's time stamps are reported as floats 136 or ints. 137 138 Calling without arguments returns the current value. 139 The value is shared by all instances of FakeOsModule. 140 141 Args: 142 newvalue: If `True`, mtime, ctime, atime are reported as floats. 143 Otherwise, they are returned as ints (rounding down). 144 """ 145 if newvalue is not None: 146 cls._stat_float_times = bool(newvalue) 147 return cls._stat_float_times 148 149 @property 150 def st_ctime(self): 151 """Return the creation time in seconds.""" 152 ctime = self._st_ctime_ns / 1e9 153 return ctime if self.use_float else int(ctime) 154 155 @property 156 def st_atime(self): 157 """Return the access time in seconds.""" 158 atime = self._st_atime_ns / 1e9 159 return atime if self.use_float else int(atime) 160 161 @property 162 def st_mtime(self): 163 """Return the modification time in seconds.""" 164 mtime = self._st_mtime_ns / 1e9 165 return mtime if self.use_float else int(mtime) 166 167 @st_ctime.setter 168 def st_ctime(self, val): 169 """Set the creation time in seconds.""" 170 self._st_ctime_ns = int(val * 1e9) 171 172 @st_atime.setter 173 def st_atime(self, val): 174 """Set the access time in seconds.""" 175 self._st_atime_ns = int(val * 1e9) 176 177 @st_mtime.setter 178 def st_mtime(self, val): 179 """Set the modification time in seconds.""" 180 self._st_mtime_ns = int(val * 1e9) 181 182 @property 183 def st_size(self): 184 if self.st_mode & S_IFLNK == S_IFLNK and self.is_windows: 185 return 0 186 return self._st_size 187 188 @st_size.setter 189 def st_size(self, val): 190 self._st_size = val 191 192 @property 193 def st_file_attributes(self): 194 if not self.is_windows: 195 raise AttributeError("module 'os.stat_result' " 196 "has no attribute 'st_file_attributes'") 197 mode = 0 198 st_mode = self.st_mode 199 if st_mode & stat.S_IFDIR: 200 mode |= stat.FILE_ATTRIBUTE_DIRECTORY 201 if st_mode & stat.S_IFREG: 202 mode |= stat.FILE_ATTRIBUTE_NORMAL 203 if st_mode & (stat.S_IFCHR | stat.S_IFBLK): 204 mode |= stat.FILE_ATTRIBUTE_DEVICE 205 if st_mode & stat.S_IFLNK: 206 mode |= stat.FILE_ATTRIBUTE_REPARSE_POINT 207 return mode 208 209 @property 210 def st_reparse_tag(self): 211 if not self.is_windows or sys.version_info < (3, 8): 212 raise AttributeError("module 'os.stat_result' " 213 "has no attribute 'st_reparse_tag'") 214 if self.st_mode & stat.S_IFLNK: 215 return stat.IO_REPARSE_TAG_SYMLINK 216 return 0 217 218 def __getitem__(self, item): 219 """Implement item access to mimic `os.stat_result` behavior.""" 220 import stat 221 222 if item == stat.ST_MODE: 223 return self.st_mode 224 if item == stat.ST_INO: 225 return self.st_ino 226 if item == stat.ST_DEV: 227 return self.st_dev 228 if item == stat.ST_NLINK: 229 return self.st_nlink 230 if item == stat.ST_UID: 231 return self.st_uid 232 if item == stat.ST_GID: 233 return self.st_gid 234 if item == stat.ST_SIZE: 235 return self.st_size 236 if item == stat.ST_ATIME: 237 # item access always returns int for backward compatibility 238 return int(self.st_atime) 239 if item == stat.ST_MTIME: 240 return int(self.st_mtime) 241 if item == stat.ST_CTIME: 242 return int(self.st_ctime) 243 raise ValueError('Invalid item') 244 245 @property 246 def st_atime_ns(self): 247 """Return the access time in nanoseconds.""" 248 return self._st_atime_ns 249 250 @property 251 def st_mtime_ns(self): 252 """Return the modification time in nanoseconds.""" 253 return self._st_mtime_ns 254 255 @property 256 def st_ctime_ns(self): 257 """Return the creation time in nanoseconds.""" 258 return self._st_ctime_ns 259 260 @st_atime_ns.setter 261 def st_atime_ns(self, val): 262 """Set the access time in nanoseconds.""" 263 self._st_atime_ns = val 264 265 @st_mtime_ns.setter 266 def st_mtime_ns(self, val): 267 """Set the modification time of the fake file in nanoseconds.""" 268 self._st_mtime_ns = val 269 270 @st_ctime_ns.setter 271 def st_ctime_ns(self, val): 272 """Set the creation time of the fake file in nanoseconds.""" 273 self._st_ctime_ns = val 274 275 276class FileBufferIO: 277 """Stream class that handles Python string and byte contents for files. 278 The standard io.StringIO cannot be used for strings due to the slightly 279 different handling of newline mode. 280 Uses an io.BytesIO stream for the raw data and adds handling of encoding 281 and newlines. 282 """ 283 284 def __init__(self, contents=None, linesep='\n', binary=False, 285 newline=None, encoding=None, errors='strict'): 286 self._newline = newline 287 self._encoding = encoding 288 self.errors = errors 289 self._linesep = linesep 290 self.binary = binary 291 self._bytestream = io.BytesIO() 292 if contents is not None: 293 self.putvalue(contents) 294 self._bytestream.seek(0) 295 296 def encoding(self): 297 return self._encoding or locale.getpreferredencoding(False) 298 299 def encoded_string(self, contents): 300 if is_byte_string(contents): 301 return contents 302 return contents.encode(self.encoding(), self.errors) 303 304 def decoded_string(self, contents): 305 return contents.decode(self.encoding(), self.errors) 306 307 def convert_newlines_for_writing(self, s): 308 if self.binary: 309 return s 310 if self._newline in (None, '-'): 311 return s.replace('\n', self._linesep) 312 if self._newline in ('', '\n'): 313 return s 314 return s.replace('\n', self._newline) 315 316 def convert_newlines_after_reading(self, s): 317 if self._newline is None: 318 return s.replace('\r\n', '\n').replace('\r', '\n') 319 if self._newline == '-': 320 return s.replace(self._linesep, '\n') 321 return s 322 323 def read(self, size=-1): 324 contents = self._bytestream.read(size) 325 if self.binary: 326 return contents 327 return self.convert_newlines_after_reading( 328 self.decoded_string(contents)) 329 330 def readline(self, size=-1): 331 seek_pos = self._bytestream.tell() 332 byte_contents = self._bytestream.read(size) 333 if self.binary: 334 read_contents = byte_contents 335 LF = b'\n' 336 else: 337 read_contents = self.convert_newlines_after_reading( 338 self.decoded_string(byte_contents)) 339 LF = '\n' 340 end_pos = 0 341 342 if self._newline is None: 343 end_pos = self._linelen_for_universal_newlines(byte_contents) 344 if end_pos > 0: 345 length = read_contents.find(LF) + 1 346 elif self._newline == '': 347 end_pos = self._linelen_for_universal_newlines(byte_contents) 348 if end_pos > 0: 349 if byte_contents[end_pos - 1] == ord(b'\r'): 350 newline = '\r' 351 elif end_pos > 1 and byte_contents[end_pos - 2] == ord(b'\r'): 352 newline = '\r\n' 353 else: 354 newline = '\n' 355 length = read_contents.find(newline) + len(newline) 356 else: 357 newline = '\n' if self._newline == '-' else self._newline 358 length = read_contents.find(newline) 359 if length >= 0: 360 nl_len = len(newline) 361 end_pos = byte_contents.find(newline.encode()) + nl_len 362 length += nl_len 363 364 if end_pos == 0: 365 length = len(read_contents) 366 end_pos = len(byte_contents) 367 368 self._bytestream.seek(seek_pos + end_pos) 369 return (byte_contents[:end_pos] if self.binary 370 else read_contents[:length]) 371 372 def _linelen_for_universal_newlines(self, byte_contents): 373 if self.binary: 374 return byte_contents.find(b'\n') + 1 375 pos_lf = byte_contents.find(b'\n') 376 pos_cr = byte_contents.find(b'\r') 377 if pos_lf == -1 and pos_cr == -1: 378 return 0 379 if pos_lf != -1 and (pos_lf < pos_cr or pos_cr == -1): 380 end_pos = pos_lf 381 else: 382 end_pos = pos_cr 383 if end_pos == pos_cr and end_pos + 1 == pos_lf: 384 end_pos = pos_lf 385 return end_pos + 1 386 387 def readlines(self, size=-1): 388 remaining_size = size 389 lines = [] 390 while True: 391 line = self.readline(remaining_size) 392 if not line: 393 return lines 394 lines.append(line) 395 if size > 0: 396 remaining_size -= len(line) 397 if remaining_size <= 0: 398 return lines 399 400 def putvalue(self, s): 401 self._bytestream.write(self.encoded_string(s)) 402 403 def write(self, s): 404 if self.binary != is_byte_string(s): 405 raise TypeError('Incorrect type for writing') 406 contents = self.convert_newlines_for_writing(s) 407 length = len(contents) 408 self.putvalue(contents) 409 return length 410 411 def writelines(self, lines): 412 for line in lines: 413 self.write(line) 414 415 def __iter__(self): 416 return self 417 418 def __next__(self): 419 line = self.readline() 420 if not line: 421 raise StopIteration 422 return line 423 424 def __getattr__(self, name): 425 return getattr(self._bytestream, name) 426 427 428class NullFileBufferIO(FileBufferIO): 429 """Special stream for null device. Does nothing on writing.""" 430 431 def putvalue(self, s): 432 pass 433