• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Module/script to byte-compile all .py files to .pyc files.
2
3When called as a script with arguments, this compiles the directories
4given as arguments recursively; the -l option prevents it from
5recursing into directories.
6
7Without arguments, it compiles all modules on sys.path, without
8recursing into subdirectories.  (Even though it should do so for
9packages -- for now, you'll have to deal with packages separately.)
10
11See module py_compile for details of the actual byte-compilation.
12"""
13import os
14import sys
15import importlib.util
16import py_compile
17import struct
18import filecmp
19
20from functools import partial
21from pathlib import Path
22
23__all__ = ["compile_dir","compile_file","compile_path"]
24
25def _walk_dir(dir, maxlevels, quiet=0):
26    if quiet < 2 and isinstance(dir, os.PathLike):
27        dir = os.fspath(dir)
28    if not quiet:
29        print('Listing {!r}...'.format(dir))
30    try:
31        names = os.listdir(dir)
32    except OSError:
33        if quiet < 2:
34            print("Can't list {!r}".format(dir))
35        names = []
36    names.sort()
37    for name in names:
38        if name == '__pycache__':
39            continue
40        fullname = os.path.join(dir, name)
41        if not os.path.isdir(fullname):
42            yield fullname
43        elif (maxlevels > 0 and name != os.curdir and name != os.pardir and
44              os.path.isdir(fullname) and not os.path.islink(fullname)):
45            yield from _walk_dir(fullname, maxlevels=maxlevels - 1,
46                                 quiet=quiet)
47
def compile_dir(dir, maxlevels=None, ddir=None, force=False,
                rx=None, quiet=0, legacy=False, optimize=-1, workers=1,
                invalidation_mode=None, *, stripdir=None,
                prependdir=None, limit_sl_dest=None, hardlink_dupes=False):
    """Byte-compile every module found in a directory tree.

    Only *dir* is required.  Returns a true value when every file was
    compiled (or skipped) without error, a false value otherwise.

    dir:        root directory to byte-compile
    maxlevels:  how deep to recurse; None means `sys.getrecursionlimit()`
    ddir:       directory to prepend to the path recorded in each
                byte-code file; shorthand for stripdir=dir, prependdir=ddir
    force:      if True, recompile even when timestamps are up-to-date
    rx:         passed through to compile_file() for every file
    quiet:      False or 0 for full output, 1 for errors only,
                2 for no output
    legacy:     if True, write legacy pyc paths rather than PEP 3147 paths
    optimize:   int or list of optimization levels, -1 meaning the level
                of the running interpreter; each level gets its own file
    workers:    maximum number of parallel worker processes; 0 lets the
                executor choose, 1 compiles in-process
    invalidation_mode: how the up-to-dateness of the pyc will be checked
    stripdir:   part of path to left-strip from the source file path
    prependdir: path to prepend to the source file path, applied after
                stripdir
    limit_sl_dest: ignore symlinks pointing outside of this path
    hardlink_dupes: hardlink duplicated pyc files

    Raises ValueError if ddir is combined with stripdir/prependdir, or if
    workers is negative.
    """
    if ddir is not None:
        if stripdir is not None or prependdir is not None:
            raise ValueError(("Destination dir (ddir) cannot be used "
                              "in combination with stripdir or prependdir"))
        # Express ddir in terms of the strip/prepend pair from here on.
        stripdir, prependdir, ddir = dir, ddir, None
    if workers < 0:
        raise ValueError('workers must be greater or equal to 0')

    pool_cls = None
    if workers != 1:
        # Check if this is a system where ProcessPoolExecutor can function.
        from concurrent.futures.process import _check_system_limits
        try:
            _check_system_limits()
        except NotImplementedError:
            workers = 1
        else:
            from concurrent.futures import ProcessPoolExecutor as pool_cls

    if maxlevels is None:
        maxlevels = sys.getrecursionlimit()
    sources = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)

    # Options forwarded unchanged to every compile_file() call.
    shared_options = dict(
        ddir=ddir, force=force, rx=rx, quiet=quiet, legacy=legacy,
        optimize=optimize, invalidation_mode=invalidation_mode,
        stripdir=stripdir, prependdir=prependdir,
        limit_sl_dest=limit_sl_dest, hardlink_dupes=hardlink_dupes,
    )

    if workers == 1 or pool_cls is None:
        success = True
        for source in sources:
            # Keep going after a failure so every file gets its chance.
            if not compile_file(source, **shared_options):
                success = False
        return success

    # If workers == 0, let ProcessPoolExecutor choose
    with pool_cls(max_workers=workers or None) as executor:
        outcomes = executor.map(partial(compile_file, **shared_options),
                                sources)
        return min(outcomes, default=True)
124
def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
                 legacy=False, optimize=-1,
                 invalidation_mode=None, *, stripdir=None, prependdir=None,
                 limit_sl_dest=None, hardlink_dupes=False):
    """Byte-compile one file.

    Arguments (only fullname is required):

    fullname:  the file to byte-compile (str or os.PathLike)
    ddir:      if given, the directory name compiled in to the
               byte-code file.
    force:     if True, force compilation, even if timestamps are up-to-date
    rx:        if given, an object with a search() method; the file is
               skipped when rx.search(fullname) matches
    quiet:     full output with False or 0, errors only with 1,
               no output with 2
    legacy:    if True, produce legacy pyc paths instead of PEP 3147 paths
    optimize:  int or list of optimization levels or -1 for level of
               the interpreter. Multiple levels leads to multiple compiled
               files each with one optimization level.
    invalidation_mode: how the up-to-dateness of the pyc will be checked
    stripdir:  leading part of the source file path to strip (str or
               os.PathLike); only the leading path components shared with
               fullname are removed
    prependdir: path to prepend to beginning of original file path, applied
               after stripdir
    limit_sl_dest: ignore symlinks if they are pointing outside of
                   the defined path.
    hardlink_dupes: hardlink duplicated pyc files

    Returns True if the file was compiled, was already up to date, or was
    skipped; False if compilation failed.

    Raises ValueError if ddir is combined with stripdir/prependdir, or if
    hardlink_dupes is requested with fewer than two optimization levels.
    """

    if ddir is not None and (stripdir is not None or prependdir is not None):
        raise ValueError(("Destination dir (ddir) cannot be used "
                          "in combination with stripdir or prependdir"))

    success = True
    # Normalise path-like arguments unconditionally: the string operations
    # below (split, rx.search, fullname + 'c') need real strings no matter
    # what the quiet level is.
    fullname = os.fspath(fullname)
    if stripdir is not None:
        stripdir = os.fspath(stripdir)
    name = os.path.basename(fullname)

    dfile = None

    if ddir is not None:
        dfile = os.path.join(ddir, name)

    if stripdir is not None:
        fullname_parts = fullname.split(os.path.sep)
        stripdir_parts = stripdir.split(os.path.sep)

        # Left-strip only the leading run of components that both paths
        # share; stop at the first difference so components further along
        # the path are never removed by accident.
        common = 0
        for spart, opart in zip(stripdir_parts, fullname_parts):
            if spart != opart:
                break
            common += 1

        dfile = os.path.join(*fullname_parts[common:])

    if prependdir is not None:
        if dfile is None:
            dfile = os.path.join(prependdir, fullname)
        else:
            dfile = os.path.join(prependdir, dfile)

    if isinstance(optimize, int):
        optimize = [optimize]

    # Use set() to remove duplicates.
    # Use sorted() to create pyc files in a deterministic order.
    optimize = sorted(set(optimize))

    if hardlink_dupes and len(optimize) < 2:
        raise ValueError("Hardlinking of duplicated bytecode makes sense "
                          "only for more than one optimization level")

    if rx is not None:
        mo = rx.search(fullname)
        if mo:
            return success

    if limit_sl_dest is not None and os.path.islink(fullname):
        if Path(limit_sl_dest).resolve() not in Path(fullname).resolve().parents:
            return success

    opt_cfiles = {}

    if os.path.isfile(fullname):
        # Work out the target pyc path for every requested level.
        for opt_level in optimize:
            if legacy:
                opt_cfiles[opt_level] = fullname + 'c'
            else:
                if opt_level >= 0:
                    opt = opt_level if opt_level >= 1 else ''
                    cfile = (importlib.util.cache_from_source(
                             fullname, optimization=opt))
                    opt_cfiles[opt_level] = cfile
                else:
                    cfile = importlib.util.cache_from_source(fullname)
                    opt_cfiles[opt_level] = cfile

        tail = name[-3:]
        if tail == '.py':
            if not force:
                # Skip the file when every target pyc already starts with
                # the header expected for the source's current mtime.
                try:
                    mtime = int(os.stat(fullname).st_mtime)
                    expect = struct.pack('<4sLL', importlib.util.MAGIC_NUMBER,
                                         0, mtime & 0xFFFF_FFFF)
                    for cfile in opt_cfiles.values():
                        with open(cfile, 'rb') as chandle:
                            actual = chandle.read(12)
                        if expect != actual:
                            break
                    else:
                        return success
                except OSError:
                    # Missing or unreadable pyc: fall through and compile.
                    pass
            if not quiet:
                print('Compiling {!r}...'.format(fullname))
            try:
                for index, opt_level in enumerate(optimize):
                    cfile = opt_cfiles[opt_level]
                    ok = py_compile.compile(fullname, cfile, dfile, True,
                                            optimize=opt_level,
                                            invalidation_mode=invalidation_mode)
                    if index > 0 and hardlink_dupes:
                        # Identical output for adjacent levels is stored
                        # once and hardlinked.
                        previous_cfile = opt_cfiles[optimize[index - 1]]
                        if filecmp.cmp(cfile, previous_cfile, shallow=False):
                            os.unlink(cfile)
                            os.link(previous_cfile, cfile)
            except py_compile.PyCompileError as err:
                success = False
                if quiet >= 2:
                    return success
                elif quiet:
                    print('*** Error compiling {!r}...'.format(fullname))
                else:
                    print('*** ', end='')
                # escape non-printable characters in msg
                encoding = sys.stdout.encoding or sys.getdefaultencoding()
                msg = err.msg.encode(encoding, errors='backslashreplace').decode(encoding)
                print(msg)
            except (SyntaxError, UnicodeError, OSError) as e:
                success = False
                if quiet >= 2:
                    return success
                elif quiet:
                    print('*** Error compiling {!r}...'.format(fullname))
                else:
                    print('*** ', end='')
                print(e.__class__.__name__ + ':', e)
            else:
                if ok == 0:
                    success = False
    return success
273
def compile_path(skip_curdir=1, maxlevels=0, force=False, quiet=0,
                 legacy=False, optimize=-1,
                 invalidation_mode=None):
    """Byte-compile all modules on sys.path.

    Arguments (all optional):

    skip_curdir: if true, skip current directory (default True)
    maxlevels:   max recursion level (default 0)
    force: as for compile_dir() (default False)
    quiet: as for compile_dir() (default 0)
    legacy: as for compile_dir() (default False)
    optimize: as for compile_dir() (default -1)
    invalidation_mode: as for compile_dir()

    Every sys.path entry is attempted even if an earlier one failed.
    Returns True if all of them compiled successfully, False otherwise.
    """
    success = True
    for dir in sys.path:
        if (not dir or dir == os.curdir) and skip_curdir:
            if quiet < 2:
                print('Skipping current directory')
            continue
        # Test the result of the call itself rather than chaining it with
        # `success and ...`, which would stop compiling the remaining
        # entries after the first failure.
        if not compile_dir(
            dir,
            maxlevels,
            None,
            force,
            quiet=quiet,
            legacy=legacy,
            optimize=optimize,
            invalidation_mode=invalidation_mode,
        ):
            success = False
    return success
306
307
def main():
    """Script main program.

    Parses the command line, then byte-compiles each FILE|DIR given (plus
    any names read from the `-i` list), or sys.path when none is given.

    Returns True if everything compiled successfully, False on any
    compilation failure, an unreadable `-i` file list, or Ctrl-C.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Utilities to support installing Python libraries.')
    parser.add_argument('-l', action='store_const', const=0,
                        default=None, dest='maxlevels',
                        help="don't recurse into subdirectories")
    parser.add_argument('-r', type=int, dest='recursion',
                        help=('control the maximum recursion level. '
                              'if `-l` and `-r` options are specified, '
                              'then `-r` takes precedence.'))
    parser.add_argument('-f', action='store_true', dest='force',
                        help='force rebuild even if timestamps are up to date')
    parser.add_argument('-q', action='count', dest='quiet', default=0,
                        help='output only error messages; -qq will suppress '
                             'the error messages as well.')
    parser.add_argument('-b', action='store_true', dest='legacy',
                        help='use legacy (pre-PEP3147) compiled file locations')
    parser.add_argument('-d', metavar='DESTDIR',  dest='ddir', default=None,
                        help=('directory to prepend to file paths for use in '
                              'compile-time tracebacks and in runtime '
                              'tracebacks in cases where the source file is '
                              'unavailable'))
    parser.add_argument('-s', metavar='STRIPDIR',  dest='stripdir',
                        default=None,
                        help=('part of path to left-strip from path '
                              'to source file - for example buildroot. '
                              '`-d` and `-s` options cannot be '
                              'specified together.'))
    parser.add_argument('-p', metavar='PREPENDDIR',  dest='prependdir',
                        default=None,
                        help=('path to add as prefix to path '
                              'to source file - for example / to make '
                              'it absolute when some part is removed '
                              'by `-s` option. '
                              '`-d` and `-p` options cannot be '
                              'specified together.'))
    parser.add_argument('-x', metavar='REGEXP', dest='rx', default=None,
                        help=('skip files matching the regular expression; '
                              'the regexp is searched for in the full path '
                              'of each file considered for compilation'))
    parser.add_argument('-i', metavar='FILE', dest='flist',
                        help=('add all the files and directories listed in '
                              'FILE to the list considered for compilation; '
                              'if "-", names are read from stdin'))
    parser.add_argument('compile_dest', metavar='FILE|DIR', nargs='*',
                        help=('zero or more file and directory names '
                              'to compile; if no arguments given, defaults '
                              'to the equivalent of -l sys.path'))
    parser.add_argument('-j', '--workers', default=1,
                        type=int, help='Run compileall concurrently')
    invalidation_modes = [mode.name.lower().replace('_', '-')
                          for mode in py_compile.PycInvalidationMode]
    parser.add_argument('--invalidation-mode',
                        choices=sorted(invalidation_modes),
                        help=('set .pyc invalidation mode; defaults to '
                              '"checked-hash" if the SOURCE_DATE_EPOCH '
                              'environment variable is set, and '
                              '"timestamp" otherwise.'))
    parser.add_argument('-o', action='append', type=int, dest='opt_levels',
                        help=('Optimization levels to run compilation with. '
                              'Default is -1 which uses the optimization level '
                              'of the Python interpreter itself (see -O).'))
    parser.add_argument('-e', metavar='DIR', dest='limit_sl_dest',
                        help='Ignore symlinks pointing outside of the DIR')
    parser.add_argument('--hardlink-dupes', action='store_true',
                        dest='hardlink_dupes',
                        help='Hardlink duplicated pyc files')

    args = parser.parse_args()
    compile_dests = args.compile_dest

    if args.rx:
        import re
        args.rx = re.compile(args.rx)

    # An empty -e value means "no limit".
    if args.limit_sl_dest == "":
        args.limit_sl_dest = None

    # -r wins over -l when both are given.
    if args.recursion is not None:
        maxlevels = args.recursion
    else:
        maxlevels = args.maxlevels

    if args.opt_levels is None:
        args.opt_levels = [-1]

    # Count distinct levels: compile_file() de-duplicates them, so
    # "-o 1 -o 1" is really a single level and must be rejected here
    # instead of surfacing later as a ValueError traceback.
    if len(set(args.opt_levels)) < 2 and args.hardlink_dupes:
        parser.error(("Hardlinking of duplicated bytecode makes sense "
                      "only for more than one optimization level."))

    if args.ddir is not None and (
        args.stripdir is not None or args.prependdir is not None
    ):
        parser.error("-d cannot be used in combination with -s or -p")

    # if flist is provided then load it
    if args.flist:
        try:
            with (sys.stdin if args.flist=='-' else
                    open(args.flist, encoding="utf-8")) as f:
                for line in f:
                    entry = line.strip()
                    # A blank line names nothing to compile; skip it
                    # rather than queueing an empty path.
                    if entry:
                        compile_dests.append(entry)
        except OSError:
            if args.quiet < 2:
                print("Error reading file list {}".format(args.flist))
            return False

    # Map the command-line spelling (e.g. "checked-hash") to the enum member.
    if args.invalidation_mode:
        ivl_mode = args.invalidation_mode.replace('-', '_').upper()
        invalidation_mode = py_compile.PycInvalidationMode[ivl_mode]
    else:
        invalidation_mode = None

    success = True
    try:
        if compile_dests:
            for dest in compile_dests:
                if os.path.isfile(dest):
                    if not compile_file(dest, args.ddir, args.force, args.rx,
                                        args.quiet, args.legacy,
                                        invalidation_mode=invalidation_mode,
                                        stripdir=args.stripdir,
                                        prependdir=args.prependdir,
                                        optimize=args.opt_levels,
                                        limit_sl_dest=args.limit_sl_dest,
                                        hardlink_dupes=args.hardlink_dupes):
                        success = False
                else:
                    if not compile_dir(dest, maxlevels, args.ddir,
                                       args.force, args.rx, args.quiet,
                                       args.legacy, workers=args.workers,
                                       invalidation_mode=invalidation_mode,
                                       stripdir=args.stripdir,
                                       prependdir=args.prependdir,
                                       optimize=args.opt_levels,
                                       limit_sl_dest=args.limit_sl_dest,
                                       hardlink_dupes=args.hardlink_dupes):
                        success = False
            return success
        else:
            # Forward the -o levels too, so they are honoured when
            # compiling sys.path.
            return compile_path(legacy=args.legacy, force=args.force,
                                quiet=args.quiet,
                                optimize=args.opt_levels,
                                invalidation_mode=invalidation_mode)
    except KeyboardInterrupt:
        if args.quiet < 2:
            print("\n[interrupted]")
        return False
459
460
if __name__ == '__main__':
    # main() returns a true value on success; exit with status 0 in that
    # case and 1 otherwise.
    sys.exit(0 if main() else 1)
464