#!/usr/bin/env python # # Copyright 2017 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Compression support for apitools.""" from collections import deque from apitools.base.py import gzip __all__ = [ 'CompressStream', ] # pylint: disable=invalid-name # Note: Apitools only uses the default chunksize when compressing. def CompressStream(in_stream, length=None, compresslevel=2, chunksize=16777216): """Compresses an input stream into a file-like buffer. This reads from the input stream until either we've stored at least length compressed bytes, or the input stream has been exhausted. This supports streams of unknown size. Args: in_stream: The input stream to read from. length: The target number of compressed bytes to buffer in the output stream. If length is none, the input stream will be compressed until it's exhausted. The actual length of the output buffer can vary from the target. If the input stream is exhaused, the output buffer may be smaller than expected. If the data is incompressible, the maximum length can be exceeded by can be calculated to be: chunksize + 5 * (floor((chunksize - 1) / 16383) + 1) + 17 This accounts for additional header data gzip adds. For the default 16MiB chunksize, this results in the max size of the output buffer being: length + 16Mib + 5142 bytes compresslevel: Optional, defaults to 2. The desired compression level. chunksize: Optional, defaults to 16MiB. The chunk size used when reading data from the input stream to write into the output buffer. Returns: A file-like output buffer of compressed bytes, the number of bytes read from the input stream, and a flag denoting if the input stream was exhausted. """ in_read = 0 in_exhausted = False out_stream = StreamingBuffer() with gzip.GzipFile(mode='wb', fileobj=out_stream, compresslevel=compresslevel) as compress_stream: # Read until we've written at least length bytes to the output stream. while not length or out_stream.length < length: data = in_stream.read(chunksize) data_length = len(data) compress_stream.write(data) in_read += data_length # If we read less than requested, the stream is exhausted. if data_length < chunksize: in_exhausted = True break return out_stream, in_read, in_exhausted class StreamingBuffer(object): """Provides a file-like object that writes to a temporary buffer. When data is read from the buffer, it is permanently removed. This is useful when there are memory constraints preventing the entire buffer from being stored in memory. """ def __init__(self): # The buffer of byte arrays. self.__buf = deque() # The number of bytes in __buf. self.__size = 0 def __len__(self): return self.__size def __nonzero__(self): # For 32-bit python2.x, len() cannot exceed a 32-bit number; avoid # accidental len() calls from httplib in the form of "if this_object:". return bool(self.__size) @property def length(self): # For 32-bit python2.x, len() cannot exceed a 32-bit number. return self.__size def write(self, data): # Gzip can write many 0 byte chunks for highly compressible data. # Prevent them from being added internally. if data is not None and data: self.__buf.append(data) self.__size += len(data) def read(self, size=None): """Read at most size bytes from this buffer. Bytes read from this buffer are consumed and are permanently removed. Args: size: If provided, read no more than size bytes from the buffer. Otherwise, this reads the entire buffer. Returns: The bytes read from this buffer. """ if size is None: size = self.__size ret_list = [] while size > 0 and self.__buf: data = self.__buf.popleft() size -= len(data) ret_list.append(data) if size < 0: ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:] self.__buf.appendleft(remainder) ret = b''.join(ret_list) self.__size -= len(ret) return ret