1#! /usr/bin/env python 2"""Compression/decompression utility using the Brotli algorithm.""" 3 4# Note: Python2 has been deprecated long ago, but some projects out in 5# the wide world may still use it nevertheless. This should not 6# deprive them from being able to run Brotli. 7from __future__ import print_function 8 9import argparse 10import os 11import platform 12import sys 13 14import brotli 15 16 17# default values of encoder parameters 18_DEFAULT_PARAMS = { 19 'mode': brotli.MODE_GENERIC, 20 'quality': 11, 21 'lgwin': 22, 22 'lgblock': 0, 23} 24 25 26def get_binary_stdio(stream): 27 """Return the specified stdin/stdout/stderr stream. 28 29 If the stdio stream requested (i.e. sys.(stdin|stdout|stderr)) 30 has been replaced with a stream object that does not have a `.buffer` 31 attribute, this will return the original stdio stream's buffer, i.e. 32 `sys.__(stdin|stdout|stderr)__.buffer`. 33 34 Args: 35 stream: One of 'stdin', 'stdout', 'stderr'. 36 37 Returns: 38 The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass 39 instance such as io.Bufferedreader/io.BufferedWriter), suitable for 40 reading/writing binary data from/to it. 41 """ 42 if stream == 'stdin': stdio = sys.stdin 43 elif stream == 'stdout': stdio = sys.stdout 44 elif stream == 'stderr': stdio = sys.stderr 45 else: 46 raise ValueError('invalid stream name: %s' % (stream,)) 47 if sys.version_info[0] < 3: 48 if sys.platform == 'win32': 49 # set I/O stream binary flag on python2.x (Windows) 50 runtime = platform.python_implementation() 51 if runtime == 'PyPy': 52 # the msvcrt trick doesn't work in pypy, so use fdopen(). 53 mode = 'rb' if stream == 'stdin' else 'wb' 54 stdio = os.fdopen(stdio.fileno(), mode, 0) 55 else: 56 # this works with CPython -- untested on other implementations 57 import msvcrt 58 msvcrt.setmode(stdio.fileno(), os.O_BINARY) 59 return stdio 60 else: 61 try: 62 return stdio.buffer 63 except AttributeError: 64 # The Python reference explains 65 # (-> https://docs.python.org/3/library/sys.html#sys.stdin) 66 # that the `.buffer` attribute might not exist, since 67 # the standard streams might have been replaced by something else 68 # (such as an `io.StringIO()` - perhaps via 69 # `contextlib.redirect_stdout()`). 70 # We fall back to the original stdio in these cases. 71 if stream == 'stdin': return sys.__stdin__.buffer 72 if stream == 'stdout': return sys.__stdout__.buffer 73 if stream == 'stderr': return sys.__stderr__.buffer 74 assert False, 'Impossible Situation.' 75 76 77def main(args=None): 78 79 parser = argparse.ArgumentParser( 80 prog=os.path.basename(__file__), description=__doc__) 81 parser.add_argument( 82 '--version', action='version', version=brotli.version) 83 parser.add_argument( 84 '-i', 85 '--input', 86 metavar='FILE', 87 type=str, 88 dest='infile', 89 help='Input file', 90 default=None) 91 parser.add_argument( 92 '-o', 93 '--output', 94 metavar='FILE', 95 type=str, 96 dest='outfile', 97 help='Output file', 98 default=None) 99 parser.add_argument( 100 '-f', 101 '--force', 102 action='store_true', 103 help='Overwrite existing output file', 104 default=False) 105 parser.add_argument( 106 '-d', 107 '--decompress', 108 action='store_true', 109 help='Decompress input file', 110 default=False) 111 params = parser.add_argument_group('optional encoder parameters') 112 params.add_argument( 113 '-m', 114 '--mode', 115 metavar='MODE', 116 type=int, 117 choices=[0, 1, 2], 118 help='The compression mode can be 0 for generic input, ' 119 '1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. ' 120 'Defaults to 0.') 121 params.add_argument( 122 '-q', 123 '--quality', 124 metavar='QUALITY', 125 type=int, 126 choices=list(range(0, 12)), 127 help='Controls the compression-speed vs compression-density ' 128 'tradeoff. The higher the quality, the slower the ' 129 'compression. Range is 0 to 11. Defaults to 11.') 130 params.add_argument( 131 '--lgwin', 132 metavar='LGWIN', 133 type=int, 134 choices=list(range(10, 25)), 135 help='Base 2 logarithm of the sliding window size. Range is ' 136 '10 to 24. Defaults to 22.') 137 params.add_argument( 138 '--lgblock', 139 metavar='LGBLOCK', 140 type=int, 141 choices=[0] + list(range(16, 25)), 142 help='Base 2 logarithm of the maximum input block size. ' 143 'Range is 16 to 24. If set to 0, the value will be set based ' 144 'on the quality. Defaults to 0.') 145 # set default values using global _DEFAULT_PARAMS dictionary 146 parser.set_defaults(**_DEFAULT_PARAMS) 147 148 options = parser.parse_args(args=args) 149 150 if options.infile: 151 try: 152 with open(options.infile, 'rb') as infile: 153 data = infile.read() 154 except OSError: 155 parser.error('Could not read --infile: %s' % (infile,)) 156 else: 157 if sys.stdin.isatty(): 158 # interactive console, just quit 159 parser.error('No input (called from interactive terminal).') 160 infile = get_binary_stdio('stdin') 161 data = infile.read() 162 163 if options.outfile: 164 # Caution! If `options.outfile` is a broken symlink, will try to 165 # redirect the write according to symlink. 166 if os.path.exists(options.outfile) and not options.force: 167 parser.error(('Target --outfile=%s already exists, ' 168 'but --force was not requested.') % (outfile,)) 169 outfile = open(options.outfile, 'wb') 170 did_open_outfile = True 171 else: 172 outfile = get_binary_stdio('stdout') 173 did_open_outfile = False 174 try: 175 try: 176 if options.decompress: 177 data = brotli.decompress(data) 178 else: 179 data = brotli.compress( 180 data, 181 mode=options.mode, 182 quality=options.quality, 183 lgwin=options.lgwin, 184 lgblock=options.lgblock) 185 outfile.write(data) 186 finally: 187 if did_open_outfile: outfile.close() 188 except brotli.error as e: 189 parser.exit(1, 190 'bro: error: %s: %s' % (e, options.infile or '{stdin}')) 191 192 193if __name__ == '__main__': 194 main() 195