• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import os
3import pathlib
4import shutil
5import stat
6import sys
7import zipfile
8
9__all__ = ['ZipAppError', 'create_archive', 'get_interpreter']
10
11
12# The __main__.py used if the users specifies "-m module:fn".
13# Note that this will always be written as UTF-8 (module and
14# function names can be non-ASCII in Python 3).
15# We add a coding cookie even though UTF-8 is the default in Python 3
16# because the resulting archive may be intended to be run under Python 2.
17MAIN_TEMPLATE = """\
18# -*- coding: utf-8 -*-
19import {module}
20{module}.{fn}()
21"""
22
23
24# The Windows launcher defaults to UTF-8 when parsing shebang lines if the
25# file has no BOM. So use UTF-8 on Windows.
26# On Unix, use the filesystem encoding.
27if sys.platform.startswith('win'):
28    shebang_encoding = 'utf-8'
29else:
30    shebang_encoding = sys.getfilesystemencoding()
31
32
33class ZipAppError(ValueError):
34    pass
35
36
37@contextlib.contextmanager
38def _maybe_open(archive, mode):
39    if isinstance(archive, (str, os.PathLike)):
40        with open(archive, mode) as f:
41            yield f
42    else:
43        yield archive
44
45
46def _write_file_prefix(f, interpreter):
47    """Write a shebang line."""
48    if interpreter:
49        shebang = b'#!' + interpreter.encode(shebang_encoding) + b'\n'
50        f.write(shebang)
51
52
53def _copy_archive(archive, new_archive, interpreter=None):
54    """Copy an application archive, modifying the shebang line."""
55    with _maybe_open(archive, 'rb') as src:
56        # Skip the shebang line from the source.
57        # Read 2 bytes of the source and check if they are #!.
58        first_2 = src.read(2)
59        if first_2 == b'#!':
60            # Discard the initial 2 bytes and the rest of the shebang line.
61            first_2 = b''
62            src.readline()
63
64        with _maybe_open(new_archive, 'wb') as dst:
65            _write_file_prefix(dst, interpreter)
66            # If there was no shebang, "first_2" contains the first 2 bytes
67            # of the source file, so write them before copying the rest
68            # of the file.
69            dst.write(first_2)
70            shutil.copyfileobj(src, dst)
71
72    if interpreter and isinstance(new_archive, str):
73        os.chmod(new_archive, os.stat(new_archive).st_mode | stat.S_IEXEC)
74
75
76def create_archive(source, target=None, interpreter=None, main=None,
77                   filter=None, compressed=False):
78    """Create an application archive from SOURCE.
79
80    The SOURCE can be the name of a directory, or a filename or a file-like
81    object referring to an existing archive.
82
83    The content of SOURCE is packed into an application archive in TARGET,
84    which can be a filename or a file-like object.  If SOURCE is a directory,
85    TARGET can be omitted and will default to the name of SOURCE with .pyz
86    appended.
87
88    The created application archive will have a shebang line specifying
89    that it should run with INTERPRETER (there will be no shebang line if
90    INTERPRETER is None), and a __main__.py which runs MAIN (if MAIN is
91    not specified, an existing __main__.py will be used).  It is an error
92    to specify MAIN for anything other than a directory source with no
93    __main__.py, and it is an error to omit MAIN if the directory has no
94    __main__.py.
95    """
96    # Are we copying an existing archive?
97    source_is_file = False
98    if hasattr(source, 'read') and hasattr(source, 'readline'):
99        source_is_file = True
100    else:
101        source = pathlib.Path(source)
102        if source.is_file():
103            source_is_file = True
104
105    if source_is_file:
106        _copy_archive(source, target, interpreter)
107        return
108
109    # We are creating a new archive from a directory.
110    if not source.exists():
111        raise ZipAppError("Source does not exist")
112    has_main = (source / '__main__.py').is_file()
113    if main and has_main:
114        raise ZipAppError(
115            "Cannot specify entry point if the source has __main__.py")
116    if not (main or has_main):
117        raise ZipAppError("Archive has no entry point")
118
119    main_py = None
120    if main:
121        # Check that main has the right format.
122        mod, sep, fn = main.partition(':')
123        mod_ok = all(part.isidentifier() for part in mod.split('.'))
124        fn_ok = all(part.isidentifier() for part in fn.split('.'))
125        if not (sep == ':' and mod_ok and fn_ok):
126            raise ZipAppError("Invalid entry point: " + main)
127        main_py = MAIN_TEMPLATE.format(module=mod, fn=fn)
128
129    if target is None:
130        target = source.with_suffix('.pyz')
131    elif not hasattr(target, 'write'):
132        target = pathlib.Path(target)
133
134    with _maybe_open(target, 'wb') as fd:
135        _write_file_prefix(fd, interpreter)
136        compression = (zipfile.ZIP_DEFLATED if compressed else
137                       zipfile.ZIP_STORED)
138        with zipfile.ZipFile(fd, 'w', compression=compression) as z:
139            for child in source.rglob('*'):
140                arcname = child.relative_to(source)
141                if filter is None or filter(arcname):
142                    z.write(child, arcname.as_posix())
143            if main_py:
144                z.writestr('__main__.py', main_py.encode('utf-8'))
145
146    if interpreter and not hasattr(target, 'write'):
147        target.chmod(target.stat().st_mode | stat.S_IEXEC)
148
149
150def get_interpreter(archive):
151    with _maybe_open(archive, 'rb') as f:
152        if f.read(2) == b'#!':
153            return f.readline().strip().decode(shebang_encoding)
154
155
156def main(args=None):
157    """Run the zipapp command line interface.
158
159    The ARGS parameter lets you specify the argument list directly.
160    Omitting ARGS (or setting it to None) works as for argparse, using
161    sys.argv[1:] as the argument list.
162    """
163    import argparse
164
165    parser = argparse.ArgumentParser()
166    parser.add_argument('--output', '-o', default=None,
167            help="The name of the output archive. "
168                 "Required if SOURCE is an archive.")
169    parser.add_argument('--python', '-p', default=None,
170            help="The name of the Python interpreter to use "
171                 "(default: no shebang line).")
172    parser.add_argument('--main', '-m', default=None,
173            help="The main function of the application "
174                 "(default: use an existing __main__.py).")
175    parser.add_argument('--compress', '-c', action='store_true',
176            help="Compress files with the deflate method. "
177                 "Files are stored uncompressed by default.")
178    parser.add_argument('--info', default=False, action='store_true',
179            help="Display the interpreter from the archive.")
180    parser.add_argument('source',
181            help="Source directory (or existing archive).")
182
183    args = parser.parse_args(args)
184
185    # Handle `python -m zipapp archive.pyz --info`.
186    if args.info:
187        if not os.path.isfile(args.source):
188            raise SystemExit("Can only get info for an archive file")
189        interpreter = get_interpreter(args.source)
190        print("Interpreter: {}".format(interpreter or "<none>"))
191        sys.exit(0)
192
193    if os.path.isfile(args.source):
194        if args.output is None or (os.path.exists(args.output) and
195                                   os.path.samefile(args.source, args.output)):
196            raise SystemExit("In-place editing of archives is not supported")
197        if args.main:
198            raise SystemExit("Cannot change the main function when copying")
199
200    create_archive(args.source, args.output,
201                   interpreter=args.python, main=args.main,
202                   compressed=args.compress)
203
204
205if __name__ == '__main__':
206    main()
207