"""Interface to the liblzma compression library.

This module provides a class for reading and writing compressed files,
classes for incremental (de)compression, and convenience functions for
one-shot (de)compression.

These classes and functions support both the XZ and legacy LZMA
container formats, as well as raw compressed data streams.
"""

__all__ = [
    "CHECK_NONE", "CHECK_CRC32", "CHECK_CRC64", "CHECK_SHA256",
    "CHECK_ID_MAX", "CHECK_UNKNOWN",
    "FILTER_LZMA1", "FILTER_LZMA2", "FILTER_DELTA", "FILTER_X86", "FILTER_IA64",
    "FILTER_ARM", "FILTER_ARMTHUMB", "FILTER_POWERPC", "FILTER_SPARC",
    "FORMAT_AUTO", "FORMAT_XZ", "FORMAT_ALONE", "FORMAT_RAW",
    "MF_HC3", "MF_HC4", "MF_BT2", "MF_BT3", "MF_BT4",
    "MODE_FAST", "MODE_NORMAL", "PRESET_DEFAULT", "PRESET_EXTREME",

    "LZMACompressor", "LZMADecompressor", "LZMAFile", "LZMAError",
    "open", "compress", "decompress", "is_check_supported",
]

import builtins
import io
import os
# The star import is deliberate: it supplies the constants and classes that
# are re-exported through __all__ above.
from _lzma import *
from _lzma import _encode_filter_properties, _decode_filter_properties
import _compression


# Internal state codes stored in LZMAFile._mode.
_MODE_CLOSED = 0
_MODE_READ = 1
# Value 2 no longer used
_MODE_WRITE = 3


class LZMAFile(_compression.BaseStream):
    """A file object providing transparent LZMA (de)compression.

    An LZMAFile can act as a wrapper for an existing file object, or
    refer directly to a named file on disk.

    Note that LZMAFile provides a *binary* file interface - data read
    is returned as bytes, and data to be written must be given as bytes.
    """

    def __init__(self, filename=None, mode="r", *,
                 format=None, check=-1, preset=None, filters=None):
        """Open an LZMA-compressed file in binary mode.

        filename can be either an actual file name (given as a str,
        bytes, or PathLike object), in which case the named file is
        opened, or it can be an existing file object to read from or
        write to.

        mode can be "r" for reading (default), "w" for (over)writing,
        "x" for creating exclusively, or "a" for appending. These can
        equivalently be given as "rb", "wb", "xb" and "ab" respectively.

        format specifies the container format to use for the file.
        If mode is "r", this defaults to FORMAT_AUTO. Otherwise, the
        default is FORMAT_XZ.

        check specifies the integrity check to use. This argument can
        only be used when opening a file for writing. For FORMAT_XZ,
        the default is CHECK_CRC64. FORMAT_ALONE and FORMAT_RAW do not
        support integrity checks - for these formats, check must be
        omitted, or be CHECK_NONE.

        When opening a file for reading, the *preset* argument is not
        meaningful, and should be omitted. The *filters* argument should
        also be omitted, except when format is FORMAT_RAW (in which case
        it is required).

        When opening a file for writing, the settings used by the
        compressor can be specified either as a preset compression
        level (with the *preset* argument), or in detail as a custom
        filter chain (with the *filters* argument). For FORMAT_XZ and
        FORMAT_ALONE, the default is to use the PRESET_DEFAULT preset
        level. For FORMAT_RAW, the caller must always specify a filter
        chain; the raw compressor does not support preset compression
        levels.

        preset (if provided) should be an integer in the range 0-9,
        optionally OR-ed with the constant PRESET_EXTREME.

        filters (if provided) should be a sequence of dicts. Each dict
        should have an entry for "id" indicating ID of the filter, plus
        additional entries for options to the filter.

        Raises ValueError for an unrecognised mode, or when check or
        preset is given together with a read mode, and TypeError when
        filename is neither a path nor an object with a read or write
        attribute. If setting up the decompressing reader fails, the
        object is left in the closed state and a file opened here is
        closed before the error propagates.
        """
        # Start out "closed" so that close() is safe to call even if the
        # constructor bails out part-way through.
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED

        if mode in ("r", "rb"):
            if check != -1:
                raise ValueError("Cannot specify an integrity check "
                                 "when opening a file for reading")
            if preset is not None:
                raise ValueError("Cannot specify a preset compression "
                                 "level when opening a file for reading")
            if format is None:
                format = FORMAT_AUTO
            mode_code = _MODE_READ
        elif mode in ("w", "wb", "a", "ab", "x", "xb"):
            if format is None:
                format = FORMAT_XZ
            mode_code = _MODE_WRITE
            # The compressor is built before any file is opened, so an
            # invalid compression setting cannot leave a file open.
            self._compressor = LZMACompressor(format=format, check=check,
                                              preset=preset, filters=filters)
            self._pos = 0
        else:
            raise ValueError(f"Invalid mode: {mode!r}")

        if isinstance(filename, (str, bytes, os.PathLike)):
            # The underlying file is always opened in binary mode.
            if "b" not in mode:
                mode += "b"
            self._fp = builtins.open(filename, mode)
            # We opened it, so we are responsible for closing it.
            self._closefp = True
            self._mode = mode_code
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
            self._mode = mode_code
        else:
            raise TypeError("filename must be a str, bytes, file or PathLike object")

        if self._mode == _MODE_READ:
            try:
                raw = _compression.DecompressReader(
                    self._fp, LZMADecompressor,
                    trailing_error=LZMAError, format=format, filters=filters)
                self._buffer = io.BufferedReader(raw)
            except BaseException:
                # Building the reader failed (for example because the
                # decompressor rejected the format/filters combination).
                # Roll back to the closed state so that close() does not
                # trip over the missing _buffer, and release the file if
                # it was opened here rather than supplied by the caller.
                fp, closefp = self._fp, self._closefp
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED
                if closefp:
                    fp.close()
                raise

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode == _MODE_READ:
                self._buffer.close()
                self._buffer = None
            elif self._mode == _MODE_WRITE:
                # Emit whatever the compressor is still holding back.
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            # Whatever happened above, release the underlying file (only if
            # we opened it) and mark this object as closed.
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor for the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking."""
        # Only read mode has a _buffer; readable() short-circuits otherwise.
        return self.readable() and self._buffer.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode == _MODE_READ

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    def peek(self, size=-1):
        """Return buffered data without advancing the file position.

        Always returns at least one byte of data, unless at EOF.
        The exact number of bytes returned is unspecified.
        """
        self._check_can_read()
        # Relies on the undocumented fact that BufferedReader.peek() always
        # returns at least one byte (except at EOF)
        return self._buffer.peek(size)

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative or omitted, read until EOF is reached.
        Returns b"" if the file is already at EOF.
        """
        self._check_can_read()
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Read up to size uncompressed bytes, while trying to avoid
        making multiple reads from the underlying stream. Reads up to a
        buffer's worth of data if size is negative.

        Returns b"" if the file is at EOF.
        """
        self._check_can_read()
        if size < 0:
            # A negative size means "one buffer's worth", not "everything".
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        self._check_can_read()
        return self._buffer.readline(size)

    def write(self, data):
        """Write a bytes object to the file.

        Returns the number of uncompressed bytes written, which is
        always the length of data in bytes. Note that due to buffering,
        the file on disk may not reflect the data written until close()
        is called.
        """
        self._check_can_write()
        if isinstance(data, (bytes, bytearray)):
            length = len(data)
        else:
            # accept any data that supports the buffer protocol
            data = memoryview(data)
            # nbytes rather than len(): items may be wider than one byte.
            length = data.nbytes

        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        # _pos counts uncompressed bytes; tell() reports it in write mode.
        self._pos += length
        return length

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Possible values for whence are:

            0: start of stream (default): offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        self._check_can_seek()
        return self._buffer.seek(offset, whence)

    def tell(self):
        """Return the current file position."""
        self._check_not_closed()
        if self._mode == _MODE_READ:
            return self._buffer.tell()
        # Write mode: number of uncompressed bytes written so far.
        return self._pos


def open(filename, mode="rb", *,
         format=None, check=-1, preset=None, filters=None,
         encoding=None, errors=None, newline=None):
    """Open an LZMA-compressed file in binary or text mode.

    filename can be either an actual file name (given as a str, bytes,
    or PathLike object), in which case the named file is opened, or it
    can be an existing file object to read from or write to.

    The mode argument can be "r", "rb" (default), "w", "wb", "x", "xb",
    "a", or "ab" for binary mode, or "rt", "wt", "xt", or "at" for text
    mode.

    The format, check, preset and filters arguments specify the
    compression settings, as for LZMACompressor, LZMADecompressor and
    LZMAFile.

    For binary mode, this function is equivalent to the LZMAFile
    constructor: LZMAFile(filename, mode, ...). In this case, the
    encoding, errors and newline arguments must not be provided.

    For text mode, an LZMAFile object is created, and wrapped in an
    io.TextIOWrapper instance with the specified encoding, error
    handling behavior, and line ending(s).

    Raises ValueError if mode contains both "t" and "b", or if
    encoding, errors or newline is given in binary mode.
    """
    text_mode = "t" in mode
    if text_mode:
        if "b" in mode:
            raise ValueError(f"Invalid mode: {mode!r}")
    else:
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    # LZMAFile itself only understands binary modes; strip the text flag and
    # layer text decoding on top afterwards.
    lz_mode = mode.replace("t", "")
    binary_file = LZMAFile(filename, lz_mode, format=format, check=check,
                           preset=preset, filters=filters)

    if text_mode:
        encoding = io.text_encoding(encoding)
        return io.TextIOWrapper(binary_file, encoding, errors, newline)
    return binary_file


def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
    """Compress a block of data.

    Refer to LZMACompressor's docstring for a description of the
    optional arguments *format*, *check*, *preset* and *filters*.

    For incremental compression, use an LZMACompressor instead.
    """
    comp = LZMACompressor(format, check, preset, filters)
    # flush() is required to obtain the tail of the compressed stream.
    return comp.compress(data) + comp.flush()


def decompress(data, format=FORMAT_AUTO, memlimit=None, filters=None):
    """Decompress a block of data.

    Refer to LZMADecompressor's docstring for a description of the
    optional arguments *format*, *memlimit* and *filters*.

    If data holds several compressed streams back to back, each one is
    decompressed in turn and the results are concatenated. Leftover
    data that fails to decompress after at least one complete stream
    is ignored. LZMAError is raised if the first stream is invalid, or
    if the data ends before a stream's end-of-stream marker is reached.

    For incremental decompression, use an LZMADecompressor instead.
    """
    results = []
    while True:
        # A decompressor handles a single stream, so use a fresh one for
        # each stream found in the input.
        decomp = LZMADecompressor(format, memlimit, filters)
        try:
            res = decomp.decompress(data)
        except LZMAError:
            if results:
                break  # Leftover data is not a valid LZMA/XZ stream; ignore it.
            else:
                raise  # Error on the first iteration; bail out.
        results.append(res)
        if not decomp.eof:
            raise LZMAError("Compressed data ended before the "
                            "end-of-stream marker was reached")
        # Anything past the end of this stream may be another stream.
        data = decomp.unused_data
        if not data:
            break
    return b"".join(results)