• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Interface to the liblzma compression library.
2
3This module provides a class for reading and writing compressed files,
4classes for incremental (de)compression, and convenience functions for
5one-shot (de)compression.
6
7These classes and functions support both the XZ and legacy LZMA
8container formats, as well as raw compressed data streams.
9"""
10
# Public API of this module.  The constants and the (de)compressor classes
# are re-exported from the _lzma extension module; LZMAFile, open, compress
# and decompress are defined below.
__all__ = [
    # Integrity check identifiers
    "CHECK_NONE",
    "CHECK_CRC32",
    "CHECK_CRC64",
    "CHECK_SHA256",
    "CHECK_ID_MAX",
    "CHECK_UNKNOWN",
    # Filter identifiers
    "FILTER_LZMA1",
    "FILTER_LZMA2",
    "FILTER_DELTA",
    "FILTER_X86",
    "FILTER_IA64",
    "FILTER_ARM",
    "FILTER_ARMTHUMB",
    "FILTER_POWERPC",
    "FILTER_SPARC",
    # Container formats
    "FORMAT_AUTO",
    "FORMAT_XZ",
    "FORMAT_ALONE",
    "FORMAT_RAW",
    # Match finders
    "MF_HC3",
    "MF_HC4",
    "MF_BT2",
    "MF_BT3",
    "MF_BT4",
    # Compression modes and presets
    "MODE_FAST",
    "MODE_NORMAL",
    "PRESET_DEFAULT",
    "PRESET_EXTREME",
    # Classes and exceptions
    "LZMACompressor",
    "LZMADecompressor",
    "LZMAFile",
    "LZMAError",
    # Convenience functions
    "open",
    "compress",
    "decompress",
    "is_check_supported",
]
23
24import builtins
25import io
26import os
27from _lzma import *
28from _lzma import _encode_filter_properties, _decode_filter_properties
29import _compression
30
31
# Internal state codes stored in LZMAFile._mode.
_MODE_CLOSED = 0
_MODE_READ = 1
# Value 2 no longer used
_MODE_WRITE = 3


class LZMAFile(_compression.BaseStream):

    """A file object providing transparent LZMA (de)compression.

    An LZMAFile can act as a wrapper for an existing file object, or
    refer directly to a named file on disk.

    Note that LZMAFile provides a *binary* file interface - data read
    is returned as bytes, and data to be written must be given as bytes.
    """

    def __init__(self, filename=None, mode="r", *,
                 format=None, check=-1, preset=None, filters=None):
        """Open an LZMA-compressed file in binary mode.

        filename can be either an actual file name (given as a str,
        bytes, or PathLike object), in which case the named file is
        opened, or it can be an existing file object to read from or
        write to.

        mode can be "r" for reading (default), "w" for (over)writing,
        "x" for creating exclusively, or "a" for appending. These can
        equivalently be given as "rb", "wb", "xb" and "ab" respectively.

        format specifies the container format to use for the file.
        If mode is "r", this defaults to FORMAT_AUTO. Otherwise, the
        default is FORMAT_XZ.

        check specifies the integrity check to use. This argument can
        only be used when opening a file for writing. For FORMAT_XZ,
        the default is CHECK_CRC64. FORMAT_ALONE and FORMAT_RAW do not
        support integrity checks - for these formats, check must be
        omitted, or be CHECK_NONE.

        When opening a file for reading, the *preset* argument is not
        meaningful, and should be omitted. The *filters* argument should
        also be omitted, except when format is FORMAT_RAW (in which case
        it is required).

        When opening a file for writing, the settings used by the
        compressor can be specified either as a preset compression
        level (with the *preset* argument), or in detail as a custom
        filter chain (with the *filters* argument). For FORMAT_XZ and
        FORMAT_ALONE, the default is to use the PRESET_DEFAULT preset
        level. For FORMAT_RAW, the caller must always specify a filter
        chain; the raw compressor does not support preset compression
        levels.

        preset (if provided) should be an integer in the range 0-9,
        optionally OR-ed with the constant PRESET_EXTREME.

        filters (if provided) should be a sequence of dicts. Each dict
        should have an entry for "id" indicating ID of the filter, plus
        additional entries for options to the filter.

        Raises ValueError for an unknown mode or for check/preset given
        in read mode, and TypeError if filename is neither a path nor an
        object with a read() or write() method.
        """
        # Start out in a consistent "closed" state so that close() is safe
        # even if the rest of the constructor raises.
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED

        if mode in ("r", "rb"):
            if check != -1:
                raise ValueError("Cannot specify an integrity check "
                                 "when opening a file for reading")
            if preset is not None:
                raise ValueError("Cannot specify a preset compression "
                                 "level when opening a file for reading")
            if format is None:
                format = FORMAT_AUTO
            mode_code = _MODE_READ
        elif mode in ("w", "wb", "a", "ab", "x", "xb"):
            if format is None:
                format = FORMAT_XZ
            mode_code = _MODE_WRITE
            # Created before the file is opened, so invalid compression
            # settings are reported without touching the file system.
            self._compressor = LZMACompressor(format=format, check=check,
                                              preset=preset, filters=filters)
            self._pos = 0
        else:
            raise ValueError("Invalid mode: {!r}".format(mode))

        if isinstance(filename, (str, bytes, os.PathLike)):
            if "b" not in mode:
                mode += "b"
            self._fp = builtins.open(filename, mode)
            self._closefp = True  # We opened it, so we must close it.
            self._mode = mode_code
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
            self._mode = mode_code
        else:
            raise TypeError("filename must be a str, bytes, file or PathLike object")

        if self._mode == _MODE_READ:
            try:
                raw = _compression.DecompressReader(
                    self._fp, LZMADecompressor, trailing_error=LZMAError,
                    format=format, filters=filters)
                self._buffer = io.BufferedReader(raw)
            except BaseException:
                # Setting up the reader failed.  Without this cleanup a
                # file opened above would stay open, and the object would
                # claim to be readable while having no _buffer, making a
                # later close() fail with AttributeError.
                try:
                    if self._closefp:
                        self._fp.close()
                finally:
                    self._fp = None
                    self._closefp = False
                    self._mode = _MODE_CLOSED
                raise

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode == _MODE_READ:
                self._buffer.close()
                self._buffer = None
            elif self._mode == _MODE_WRITE:
                # Emit whatever the compressor is still holding back.
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            # Always release the underlying file and mark the object as
            # closed, even if flushing raised.
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    def fileno(self):
        """Return the file descriptor for the underlying file."""
        self._check_not_closed()
        return self._fp.fileno()

    def seekable(self):
        """Return whether the file supports seeking."""
        return self.readable() and self._buffer.seekable()

    def readable(self):
        """Return whether the file was opened for reading."""
        self._check_not_closed()
        return self._mode == _MODE_READ

    def writable(self):
        """Return whether the file was opened for writing."""
        self._check_not_closed()
        return self._mode == _MODE_WRITE

    def peek(self, size=-1):
        """Return buffered data without advancing the file position.

        Always returns at least one byte of data, unless at EOF.
        The exact number of bytes returned is unspecified.
        """
        self._check_can_read()
        # Relies on the undocumented fact that BufferedReader.peek() always
        # returns at least one byte (except at EOF)
        return self._buffer.peek(size)

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative or omitted, read until EOF is reached.
        Returns b"" if the file is already at EOF.
        """
        self._check_can_read()
        return self._buffer.read(size)

    def read1(self, size=-1):
        """Read up to size uncompressed bytes, while trying to avoid
        making multiple reads from the underlying stream. Reads up to a
        buffer's worth of data if size is negative.

        Returns b"" if the file is at EOF.
        """
        self._check_can_read()
        if size < 0:
            size = io.DEFAULT_BUFFER_SIZE
        return self._buffer.read1(size)

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        self._check_can_read()
        return self._buffer.readline(size)

    def write(self, data):
        """Write a bytes-like object to the file.

        Returns the number of uncompressed bytes written, which is
        always the length of data in bytes. Note that due to buffering,
        the file on disk may not reflect the data written until close()
        is called.
        """
        self._check_can_write()
        if isinstance(data, (bytes, bytearray)):
            length = len(data)
        else:
            # Accept any object supporting the buffer protocol.  len() of
            # such an object counts items, not bytes (e.g. an array of
            # 4-byte integers), so take the size from a memoryview.
            data = memoryview(data)
            length = data.nbytes

        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        self._pos += length
        return length

    def seek(self, offset, whence=io.SEEK_SET):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Possible values for whence are:

            0: start of stream (default): offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        self._check_can_seek()
        return self._buffer.seek(offset, whence)

    def tell(self):
        """Return the current file position."""
        self._check_not_closed()
        if self._mode == _MODE_READ:
            return self._buffer.tell()
        # In write mode the position is the count of uncompressed bytes
        # accepted by write() so far.
        return self._pos
261
262
def open(filename, mode="rb", *,
         format=None, check=-1, preset=None, filters=None,
         encoding=None, errors=None, newline=None):
    """Open an LZMA-compressed file in binary or text mode.

    filename can be either an actual file name (given as a str, bytes,
    or PathLike object), in which case the named file is opened, or it
    can be an existing file object to read from or write to.

    The mode argument can be "r", "rb" (default), "w", "wb", "x", "xb",
    "a", or "ab" for binary mode, or "rt", "wt", "xt", or "at" for text
    mode.

    The format, check, preset and filters arguments specify the
    compression settings, as for LZMACompressor, LZMADecompressor and
    LZMAFile.

    For binary mode, this function is equivalent to the LZMAFile
    constructor: LZMAFile(filename, mode, ...). In this case, the
    encoding, errors and newline arguments must not be provided.

    For text mode, an LZMAFile object is created, and wrapped in an
    io.TextIOWrapper instance with the specified encoding, error
    handling behavior, and line ending(s).

    Raises ValueError if mode combines "t" with "b", or if encoding,
    errors or newline is given in binary mode.
    """
    text_mode = "t" in mode
    if text_mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    # LZMAFile only understands binary modes; the "t" is handled here.
    lz_mode = mode.replace("t", "")
    binary_file = LZMAFile(filename, lz_mode, format=format, check=check,
                           preset=preset, filters=filters)

    if not text_mode:
        return binary_file

    try:
        return io.TextIOWrapper(binary_file, encoding, errors, newline)
    except BaseException:
        # The text layer could not be created (e.g. unknown encoding).
        # The caller never receives binary_file, so close it here rather
        # than leaving the underlying file open.
        binary_file.close()
        raise
308
309
def compress(data, format=FORMAT_XZ, check=-1, preset=None, filters=None):
    """Compress *data* in one shot and return the result as bytes.

    The optional arguments *format*, *check*, *preset* and *filters*
    are passed on to LZMACompressor; see its docstring for details.

    Use an LZMACompressor directly for incremental compression.
    """
    encoder = LZMACompressor(format, check, preset, filters)
    body = encoder.compress(data)
    # flush() finishes the stream and returns whatever was still buffered.
    trailer = encoder.flush()
    return body + trailer
320
321
def decompress(data, format=FORMAT_AUTO, memlimit=None, filters=None):
    """Decompress a block of data, which may hold several streams.

    Refer to LZMADecompressor's docstring for a description of the
    optional arguments *format*, *memlimit* and *filters*.

    For incremental decompression, use an LZMADecompressor instead.
    """
    pieces = []
    pending = data
    while True:
        # A fresh decompressor is created for every stream in the input.
        decoder = LZMADecompressor(format, memlimit, filters)
        try:
            piece = decoder.decompress(pending)
        except LZMAError:
            if not pieces:
                raise  # Error on the first iteration; bail out.
            break  # Leftover data is not a valid LZMA/XZ stream; ignore it.
        pieces.append(piece)
        if not decoder.eof:
            raise LZMAError("Compressed data ended before the "
                            "end-of-stream marker was reached")
        # Anything after the end-of-stream marker is tried as a new stream.
        pending = decoder.unused_data
        if not pending:
            break
    return b"".join(pieces)
348