• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Internal classes used by the gzip, lzma and bz2 modules"""
2
3import io
4import sys
5
# Number of compressed bytes requested from the underlying file per read;
# reusing io's default keeps chunking aligned with the buffered I/O layer.
BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE  # Compressed data read chunk size
7
8
class BaseStream(io.BufferedIOBase):
    """Shared state/mode validation helpers for compressed-file streams.

    Subclasses call these guards at the top of their public I/O methods
    to turn misuse into the conventional ``io``-module exceptions.
    """

    def _check_not_closed(self):
        # Any operation on a closed file is a ValueError, matching the
        # behaviour of the built-in file objects.
        if not self.closed:
            return
        raise ValueError("I/O operation on closed file")

    def _check_can_read(self):
        # Guard clause: nothing to verify when the stream is readable.
        if self.readable():
            return
        raise io.UnsupportedOperation("File not open for reading")

    def _check_can_write(self):
        if self.writable():
            return
        raise io.UnsupportedOperation("File not open for writing")

    def _check_can_seek(self):
        # Seeking needs both a readable mode and a seekable underlying
        # file object; report whichever requirement is missing.
        if not self.readable():
            raise io.UnsupportedOperation("Seeking is only supported "
                                          "on files open for reading")
        if not self.seekable():
            raise io.UnsupportedOperation("The underlying file object "
                                          "does not support seeking")
32
class DecompressReader(io.RawIOBase):
    """Adapts the decompressor API to a RawIOBase reader API"""

    def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args):
        """Wrap *fp* (a binary file of compressed data) as a raw reader.

        *decomp_factory* (called with **decomp_args) must produce
        decompressor objects with the bz2/lzma-style interface
        (decompress, eof, needs_input, unused_data).
        """
        self._fp = fp
        self._eof = False
        self._pos = 0       # current offset in the decompressed stream
        self._size = -1     # decompressed size, filled in once known (for SEEK_END)

        # Keep the factory and its arguments: a fresh decompressor object
        # is required for each concatenated stream in the file, and again
        # whenever a backwards seek() rewinds to the start.
        self._decomp_factory = decomp_factory
        self._decomp_args = decomp_args
        self._decompressor = self._decomp_factory(**self._decomp_args)

        # Exception type(s) the decompressor raises when bytes trailing a
        # stream are not themselves a valid stream; such data is ignored.
        self._trailing_error = trailing_error

    def readable(self):
        return True

    def close(self):
        # Drop the decompressor (and its internal buffers) eagerly.
        self._decompressor = None
        return super().close()

    def seekable(self):
        # Seekability is entirely determined by the underlying file.
        return self._fp.seekable()

    def readinto(self, b):
        # Fulfil the RawIOBase contract in terms of read().
        with memoryview(b) as mv, mv.cast("B") as dest:
            got = self.read(len(dest))
            dest[:len(got)] = got
        return len(got)

    def read(self, size=-1):
        """Return up to *size* decompressed bytes (b"" at EOF)."""
        if size < 0:
            return self.readall()
        if size == 0 or self._eof:
            return b""

        out = None  # stays None if the stream ends before producing bytes
        # A single decompress() call may legitimately yield no output;
        # keep feeding input blocks until some appears or the data runs out.
        while True:
            if not self._decompressor.eof:
                if self._decompressor.needs_input:
                    chunk = self._fp.read(BUFFER_SIZE)
                    if not chunk:
                        raise EOFError("Compressed file ended before the "
                                       "end-of-stream marker was reached")
                else:
                    chunk = b""
                out = self._decompressor.decompress(chunk, size)
            else:
                # Current stream is exhausted; look for a concatenated
                # stream in the leftover input (or further file data).
                chunk = (self._decompressor.unused_data or
                         self._fp.read(BUFFER_SIZE))
                if not chunk:
                    break
                self._decompressor = self._decomp_factory(
                    **self._decomp_args)
                try:
                    out = self._decompressor.decompress(chunk, size)
                except self._trailing_error:
                    # Trailing bytes are not a valid stream; treat as EOF.
                    break
            if out:
                break

        if not out:
            # Genuine end of data: remember the total size for SEEK_END.
            self._eof = True
            self._size = self._pos
            return b""
        self._pos += len(out)
        return out

    def readall(self):
        pieces = []
        # Passing sys.maxsize as the output limit lets each call
        # decompress its entire input buffer in one .decompress() call.
        while True:
            piece = self.read(sys.maxsize)
            if not piece:
                return b"".join(pieces)
            pieces.append(piece)

    def _rewind(self):
        # Reset to the very beginning of the compressed data stream;
        # decompression state cannot be cloned, so start a fresh object.
        self._fp.seek(0)
        self._eof = False
        self._pos = 0
        self._decompressor = self._decomp_factory(**self._decomp_args)

    def seek(self, offset, whence=io.SEEK_SET):
        # Translate the request into an absolute decompressed position.
        if whence == io.SEEK_CUR:
            offset += self._pos
        elif whence == io.SEEK_END:
            if self._size < 0:
                # Size unknown: decompress to the end to discover it.
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset += self._size
        elif whence != io.SEEK_SET:
            raise ValueError("Invalid value for whence: {}".format(whence))

        # Turn the absolute target into a forward skip; a backwards
        # target forces a rewind and re-decompression from the start.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Decompress and discard until the desired position is reached.
        while offset > 0:
            skipped = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not skipped:
                break
            offset -= len(skipped)

        return self._pos

    def tell(self):
        """Return the current file position."""
        return self._pos
163