• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#! /usr/bin/env python
2"""Compression/decompression utility using the Brotli algorithm."""
3
# Note: Python 2 was deprecated long ago, but some projects out in the
# wide world may still use it nevertheless. This should not deprive
# them of being able to run Brotli.
7from __future__ import print_function
8
9import argparse
10import os
11import platform
12import sys
13
14import brotli
15
16
# Default values of the encoder parameters. main() installs them as the
# argparse defaults, so each one applies whenever its flag is omitted.
_DEFAULT_PARAMS = {
    'mode': brotli.MODE_GENERIC,  # generic input (as opposed to text/font)
    'quality': 11,  # top of the 0..11 range accepted by --quality
    'lgwin': 22,  # base 2 logarithm of the sliding window size
    'lgblock': 0,  # 0 means: derive the block size from the quality
}
24
25
def get_binary_stdio(stream):
    """Return the specified stdin/stdout/stderr stream in binary form.

    If the stdio stream requested (i.e. sys.(stdin|stdout|stderr))
    has been replaced with a stream object that does not have a `.buffer`
    attribute, this will return the original stdio stream's buffer, i.e.
    `sys.__(stdin|stdout|stderr)__.buffer`.

    Args:
      stream: One of 'stdin', 'stdout', 'stderr'.

    Returns:
      The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass
      instance such as io.BufferedReader/io.BufferedWriter), suitable for
      reading/writing binary data from/to it. On Python 2 the stream object
      itself is returned (switched to binary mode on Windows).

    Raises:
      ValueError: If `stream` is not one of the three supported names.
    """
    if stream not in ('stdin', 'stdout', 'stderr'):
        raise ValueError('invalid stream name: %s' % (stream,))
    stdio = getattr(sys, stream)

    if sys.version_info[0] < 3:
        if sys.platform == 'win32':
            # set I/O stream binary flag on python2.x (Windows)
            runtime = platform.python_implementation()
            if runtime == 'PyPy':
                # the msvcrt trick doesn't work in pypy, so use fdopen().
                mode = 'rb' if stream == 'stdin' else 'wb'
                stdio = os.fdopen(stdio.fileno(), mode, 0)
            else:
                # this works with CPython -- untested on other implementations
                import msvcrt
                msvcrt.setmode(stdio.fileno(), os.O_BINARY)
        return stdio

    try:
        return stdio.buffer
    except AttributeError:
        # The Python reference explains
        # (-> https://docs.python.org/3/library/sys.html#sys.stdin)
        # that the `.buffer` attribute might not exist, since
        # the standard streams might have been replaced by something else
        # (such as an `io.StringIO()` - perhaps via
        # `contextlib.redirect_stdout()`).
        # We fall back to the original stdio in these cases.
        return getattr(sys, '__%s__' % (stream,)).buffer
75
76
def _build_arg_parser():
    """Build the argparse parser describing the command-line interface."""
    parser = argparse.ArgumentParser(
        prog=os.path.basename(__file__), description=__doc__)
    parser.add_argument(
        '--version', action='version', version=brotli.version)
    parser.add_argument(
        '-i',
        '--input',
        metavar='FILE',
        type=str,
        dest='infile',
        help='Input file',
        default=None)
    parser.add_argument(
        '-o',
        '--output',
        metavar='FILE',
        type=str,
        dest='outfile',
        help='Output file',
        default=None)
    parser.add_argument(
        '-f',
        '--force',
        action='store_true',
        help='Overwrite existing output file',
        default=False)
    parser.add_argument(
        '-d',
        '--decompress',
        action='store_true',
        help='Decompress input file',
        default=False)
    params = parser.add_argument_group('optional encoder parameters')
    params.add_argument(
        '-m',
        '--mode',
        metavar='MODE',
        type=int,
        choices=[0, 1, 2],
        help='The compression mode can be 0 for generic input, '
        '1 for UTF-8 encoded text, or 2 for WOFF 2.0 font data. '
        'Defaults to 0.')
    params.add_argument(
        '-q',
        '--quality',
        metavar='QUALITY',
        type=int,
        choices=list(range(0, 12)),
        help='Controls the compression-speed vs compression-density '
        'tradeoff. The higher the quality, the slower the '
        'compression. Range is 0 to 11. Defaults to 11.')
    params.add_argument(
        '--lgwin',
        metavar='LGWIN',
        type=int,
        choices=list(range(10, 25)),
        help='Base 2 logarithm of the sliding window size. Range is '
        '10 to 24. Defaults to 22.')
    params.add_argument(
        '--lgblock',
        metavar='LGBLOCK',
        type=int,
        choices=[0] + list(range(16, 25)),
        help='Base 2 logarithm of the maximum input block size. '
        'Range is 16 to 24. If set to 0, the value will be set based '
        'on the quality. Defaults to 0.')
    # set default values using global _DEFAULT_PARAMS dictionary
    parser.set_defaults(**_DEFAULT_PARAMS)
    return parser


def main(args=None):
    """Compress or decompress one input according to the command line.

    The whole input is read into memory (from --input, or from stdin when
    no input file is given), transformed, and then written to --output
    (or stdout when no output file is given).

    Args:
      args: Optional list of argument strings. When None, argparse reads
        the arguments from sys.argv.

    Raises:
      SystemExit: Via argparse, with status 2 for usage/file errors and
        status 1 when Brotli rejects the data.
    """
    parser = _build_arg_parser()
    options = parser.parse_args(args=args)

    if options.infile:
        try:
            with open(options.infile, 'rb') as infile:
                data = infile.read()
        except (IOError, OSError):
            # Python 2 raises IOError here, Python 3 raises OSError. Report
            # the *path*: the `infile` handle is unbound if open() failed.
            parser.error('Could not read --infile: %s' % (options.infile,))
    else:
        if sys.stdin.isatty():
            # interactive console, just quit
            parser.error('No input (called from interactive terminal).')
        data = get_binary_stdio('stdin').read()

    # Refuse to overwrite before doing any expensive work.
    # Caution! If `options.outfile` is a broken symlink, will try to
    # redirect the write according to symlink.
    if (options.outfile and os.path.exists(options.outfile)
            and not options.force):
        parser.error(('Target --outfile=%s already exists, '
                      'but --force was not requested.') % (options.outfile,))

    try:
        if options.decompress:
            data = brotli.decompress(data)
        else:
            data = brotli.compress(
                data,
                mode=options.mode,
                quality=options.quality,
                lgwin=options.lgwin,
                lgblock=options.lgblock)
    except brotli.error as e:
        # parser.exit() prints the message verbatim, hence the newline.
        parser.exit(1,
                    'bro: error: %s: %s\n' % (e, options.infile or '{stdin}'))

    # The output is opened only after the transformation succeeded, so a
    # failure never truncates an existing file or leaves an empty one.
    if options.outfile:
        with open(options.outfile, 'wb') as outfile:
            outfile.write(data)
    else:
        get_binary_stdio('stdout').write(data)
191
192
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
195