• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 import contextlib
2 import os
3 import pathlib
4 import shutil
5 import stat
6 import sys
7 import zipfile
8 
# Names exported by a star-import of this module.
__all__ = ['ZipAppError', 'create_archive', 'get_interpreter']
10 
11 
# Source of the __main__.py that is generated when the user specifies
# "-m module:fn".  It is always written as UTF-8, since module and
# function names can be non-ASCII in Python 3.  The coding cookie is
# redundant under Python 3 (where UTF-8 is the default) but is kept
# because the resulting archive may be intended to be run under Python 2.
MAIN_TEMPLATE = (
    "# -*- coding: utf-8 -*-\n"
    "import {module}\n"
    "{module}.{fn}()\n"
)
22 
23 
# Encoding used for the shebang line.  The Windows launcher defaults to
# UTF-8 when parsing shebang lines if the file has no BOM, so use UTF-8
# on Windows; on Unix, use the filesystem encoding.
shebang_encoding = ('utf-8' if sys.platform.startswith('win')
                    else sys.getfilesystemencoding())
31 
32 
class ZipAppError(ValueError):
    """Raised when an application archive cannot be created as requested."""
35 
36 
@contextlib.contextmanager
def _maybe_open(archive, mode):
    """Yield a file object for ARCHIVE, opening it only if it is a path.

    When ARCHIVE is a str or os.PathLike it is opened with MODE and closed
    again when the context exits.  Anything else is assumed to already be
    a file-like object: it is yielded unchanged and is not closed here.
    """
    if not isinstance(archive, (str, os.PathLike)):
        # Caller-owned object: hand it back and leave its lifetime alone.
        yield archive
        return

    with open(archive, mode) as stream:
        yield stream
44 
45 
def _write_file_prefix(f, interpreter):
    """Write a shebang line naming INTERPRETER to F.

    Nothing is written when INTERPRETER is empty or None.  The interpreter
    name is encoded with the module-level shebang_encoding.
    """
    if not interpreter:
        return
    f.write(b'#!' + interpreter.encode(shebang_encoding) + b'\n')
51 
52 
def _copy_archive(archive, new_archive, interpreter=None):
    """Copy an application archive, modifying the shebang line.

    ARCHIVE and NEW_ARCHIVE may each be a path (str or os.PathLike) or an
    already-open binary file object.  Any shebang line at the start of
    ARCHIVE is dropped; if INTERPRETER is given, a new shebang line naming
    it is written at the start of NEW_ARCHIVE.  The rest of the content
    is copied unchanged.

    When INTERPRETER is given and NEW_ARCHIVE is a path, the new file is
    also made executable for its owner.
    """
    with _maybe_open(archive, 'rb') as src:
        # Skip the shebang line from the source.
        # Read 2 bytes of the source and check if they are #!.
        first_2 = src.read(2)
        if first_2 == b'#!':
            # Discard the initial 2 bytes and the rest of the shebang line.
            first_2 = b''
            src.readline()

        with _maybe_open(new_archive, 'wb') as dst:
            _write_file_prefix(dst, interpreter)
            # If there was no shebang, "first_2" contains the first 2 bytes
            # of the source file, so write them before copying the rest
            # of the file.
            dst.write(first_2)
            shutil.copyfileobj(src, dst)

    # _maybe_open() treats both str and os.PathLike as filesystem paths, so
    # the executable bit must be set for both; checking for str alone left
    # pathlib.Path targets with a shebang line but no exec permission.
    if interpreter and isinstance(new_archive, (str, os.PathLike)):
        os.chmod(new_archive, os.stat(new_archive).st_mode | stat.S_IEXEC)
74 
75 
def create_archive(source, target=None, interpreter=None, main=None,
                   filter=None, compressed=False):
    """Create an application archive from SOURCE.

    The SOURCE can be the name of a directory, or a filename or a file-like
    object referring to an existing archive.

    The content of SOURCE is packed into an application archive in TARGET,
    which can be a filename or a file-like object.  If SOURCE is a directory,
    TARGET can be omitted and will default to SOURCE with its suffix
    replaced by .pyz.

    The created application archive will have a shebang line specifying
    that it should run with INTERPRETER (there will be no shebang line if
    INTERPRETER is None), and a __main__.py which runs MAIN, given as
    "module:function" (if MAIN is not specified, an existing __main__.py
    will be used).  For a directory source it is an error to specify MAIN
    if the directory already has a __main__.py, and it is an error to omit
    MAIN if it has none.

    FILTER, if given, is called with the path of each entry found under
    SOURCE, relative to SOURCE, as a pathlib.Path; only entries for which
    it returns a true value are added.  If COMPRESSED is true the entries
    are deflated, otherwise they are stored uncompressed.

    When SOURCE is an existing archive it is copied with only the shebang
    line changed; MAIN, FILTER and COMPRESSED are not consulted, and TARGET
    is required.

    Raises ZipAppError if the source directory does not exist, if the entry
    point is missing, conflicting or malformed, if TARGET is omitted when
    copying an archive, or if TARGET is one of the files being archived.
    """
    # Are we copying an existing archive?
    source_is_file = False
    if hasattr(source, 'read') and hasattr(source, 'readline'):
        source_is_file = True
    else:
        source = pathlib.Path(source)
        if source.is_file():
            source_is_file = True

    if source_is_file:
        if target is None:
            # There is no sensible default here; without this check the
            # copy fails later with an AttributeError on None.
            raise ZipAppError(
                "Target must be specified when copying an existing archive")
        _copy_archive(source, target, interpreter)
        return

    # We are creating a new archive from a directory.
    if not source.exists():
        raise ZipAppError("Source does not exist")
    has_main = (source / '__main__.py').is_file()
    if main and has_main:
        raise ZipAppError(
            "Cannot specify entry point if the source has __main__.py")
    if not (main or has_main):
        raise ZipAppError("Archive has no entry point")

    main_py = None
    if main:
        # Check that main has the right format.
        mod, sep, fn = main.partition(':')
        mod_ok = all(part.isidentifier() for part in mod.split('.'))
        fn_ok = all(part.isidentifier() for part in fn.split('.'))
        if not (sep == ':' and mod_ok and fn_ok):
            raise ZipAppError("Invalid entry point: " + main)
        main_py = MAIN_TEMPLATE.format(module=mod, fn=fn)

    if target is None:
        target = source.with_suffix('.pyz')
    elif not hasattr(target, 'write'):
        target = pathlib.Path(target)

    # Build the list of files to add *before* the target is opened, in case
    # the target is being created inside the source directory - we don't
    # want the archive to be added to itself.  Sorting makes the member
    # order independent of the directory scan order.
    files_to_add = sorted(source.rglob('*'))

    # If the target is one of the files to add, opening it for writing
    # would destroy that source file and then archive the half-written
    # result.  This is a simple path equality check: it will not catch
    # every alias of the same file, but it does catch the common case.
    if isinstance(target, pathlib.Path) and target in files_to_add:
        raise ZipAppError(
            "The target archive {} overwrites one of the source files."
            .format(target))

    with _maybe_open(target, 'wb') as fd:
        _write_file_prefix(fd, interpreter)
        compression = (zipfile.ZIP_DEFLATED if compressed else
                       zipfile.ZIP_STORED)
        with zipfile.ZipFile(fd, 'w', compression=compression) as z:
            for child in files_to_add:
                arcname = child.relative_to(source)
                if filter is None or filter(arcname):
                    z.write(child, arcname.as_posix())
            if main_py:
                z.writestr('__main__.py', main_py.encode('utf-8'))

    # Only a target we opened ourselves is a path that can be chmod'ed.
    if interpreter and not hasattr(target, 'write'):
        target.chmod(target.stat().st_mode | stat.S_IEXEC)
148 
149 
def get_interpreter(archive):
    """Return the interpreter named in ARCHIVE's shebang line.

    ARCHIVE may be a path or a binary file object.  The first two bytes
    read from it are checked for "#!"; if they match, the remainder of that
    line is stripped, decoded with shebang_encoding and returned.
    Otherwise None is returned.
    """
    with _maybe_open(archive, 'rb') as stream:
        if stream.read(2) != b'#!':
            return None
        rest_of_line = stream.readline()
        return rest_of_line.strip().decode(shebang_encoding)
154 
155 
def main(args=None):
    """Run the zipapp command line interface.

    ARGS is the list of command line arguments to parse.  When it is
    omitted (or None), argparse falls back to sys.argv[1:].

    With --info the interpreter of an existing archive is printed and the
    process exits with status 0.  Otherwise the arguments are handed to
    create_archive(); invalid combinations terminate via SystemExit with
    an explanatory message.
    """
    import argparse

    # (names, add_argument keyword settings), registered in this order.
    option_table = (
        (('--output', '-o'), dict(
            default=None,
            help="The name of the output archive. "
                 "Required if SOURCE is an archive.")),
        (('--python', '-p'), dict(
            default=None,
            help="The name of the Python interpreter to use "
                 "(default: no shebang line).")),
        (('--main', '-m'), dict(
            default=None,
            help="The main function of the application "
                 "(default: use an existing __main__.py).")),
        (('--compress', '-c'), dict(
            action='store_true',
            help="Compress files with the deflate method. "
                 "Files are stored uncompressed by default.")),
        (('--info',), dict(
            default=False, action='store_true',
            help="Display the interpreter from the archive.")),
        (('source',), dict(
            help="Source directory (or existing archive).")),
    )

    cli = argparse.ArgumentParser()
    for names, settings in option_table:
        cli.add_argument(*names, **settings)

    opts = cli.parse_args(args)

    # Handle `python -m zipapp archive.pyz --info`.
    if opts.info:
        if not os.path.isfile(opts.source):
            raise SystemExit("Can only get info for an archive file")
        found = get_interpreter(opts.source)
        print("Interpreter: {}".format(found or "<none>"))
        sys.exit(0)

    if os.path.isfile(opts.source):
        # Copying an existing archive: it must go to a different file, and
        # its entry point cannot be replaced.
        destination = opts.output
        in_place = destination is None or (
            os.path.exists(destination)
            and os.path.samefile(opts.source, destination))
        if in_place:
            raise SystemExit("In-place editing of archives is not supported")
        if opts.main:
            raise SystemExit("Cannot change the main function when copying")

    create_archive(opts.source, opts.output,
                   interpreter=opts.python, main=opts.main,
                   compressed=opts.compress)
203 
204 
# Run the command line interface when this file is executed as a script.
if __name__ == '__main__':
    main()
207