"""Module/script to byte-compile all .py files to .pyc files.

When called as a script with arguments, this compiles the directories
given as arguments recursively; the -l option prevents it from
recursing into directories.

Without arguments, it compiles all modules on sys.path, without
recursing into subdirectories.  (Even though it should do so for
packages -- for now, you'll have to deal with packages separately.)

See module py_compile for details of the actual byte-compilation.
"""
import os
import sys
import importlib.util
import py_compile
import struct
import filecmp

from functools import partial
from pathlib import Path

__all__ = ["compile_dir", "compile_file", "compile_path"]


def _walk_dir(dir, maxlevels, quiet=0):
    """Yield the paths of all non-directory entries below *dir*.

    Entries are visited in sorted order.  '__pycache__' entries are
    skipped, symlinked directories are not followed, and recursion stops
    once *maxlevels* reaches 0.  A directory that cannot be listed is
    reported (unless quiet >= 2) and treated as empty.
    """
    # The conversion is only needed so that the messages below show a
    # plain string instead of the repr of a path object.
    if quiet < 2 and isinstance(dir, os.PathLike):
        dir = os.fspath(dir)
    if not quiet:
        print('Listing {!r}...'.format(dir))
    try:
        names = os.listdir(dir)
    except OSError:
        if quiet < 2:
            print("Can't list {!r}".format(dir))
        names = []
    names.sort()
    for name in names:
        if name == '__pycache__':
            continue
        fullname = os.path.join(dir, name)
        if not os.path.isdir(fullname):
            yield fullname
        elif (maxlevels > 0 and name != os.curdir and name != os.pardir and
              os.path.isdir(fullname) and not os.path.islink(fullname)):
            yield from _walk_dir(fullname, maxlevels=maxlevels - 1,
                                 quiet=quiet)


def compile_dir(dir, maxlevels=None, ddir=None, force=False,
                rx=None, quiet=0, legacy=False, optimize=-1, workers=1,
                invalidation_mode=None, *, stripdir=None,
                prependdir=None, limit_sl_dest=None, hardlink_dupes=False):
    """Byte-compile all modules in the given directory tree.

    Arguments (only dir is required):

    dir:       the directory to byte-compile
    maxlevels: maximum recursion level (default `sys.getrecursionlimit()`)
    ddir:      the directory that will be prepended to the path to the
               file as it is compiled into each byte-code file.
    force:     if True, force compilation, even if timestamps are up-to-date
    rx:        if given, an object with a search() method; files whose
               full path it matches are skipped
    quiet:     full output with False or 0, errors only with 1,
               no output with 2
    legacy:    if True, produce legacy pyc paths instead of PEP 3147 paths
    optimize:  int or list of optimization levels or -1 for level of
               the interpreter. Multiple levels leads to multiple compiled
               files each with one optimization level.
    workers:   maximum number of parallel workers
    invalidation_mode: how the up-to-dateness of the pyc will be checked
    stripdir:  part of path to left-strip from source file path
    prependdir: path to prepend to beginning of original file path, applied
               after stripdir
    limit_sl_dest: ignore symlinks if they are pointing outside of
                   the defined path
    hardlink_dupes: hardlink duplicated pyc files

    Returns a true value if every file was handled successfully and a
    false value otherwise.  Raises ValueError if ddir is combined with
    stripdir/prependdir or if workers is negative.
    """
    ProcessPoolExecutor = None
    if ddir is not None and (stripdir is not None or prependdir is not None):
        raise ValueError(("Destination dir (ddir) cannot be used "
                          "in combination with stripdir or prependdir"))
    if ddir is not None:
        # ddir is expressed in terms of stripdir/prependdir: strip the
        # directory being compiled and prepend ddir instead.
        stripdir = dir
        prependdir = ddir
        ddir = None
    if workers < 0:
        raise ValueError('workers must be greater or equal to 0')
    if workers != 1:
        # Check if this is a system where ProcessPoolExecutor can function.
        from concurrent.futures.process import _check_system_limits
        try:
            _check_system_limits()
        except NotImplementedError:
            workers = 1
        else:
            from concurrent.futures import ProcessPoolExecutor
    if maxlevels is None:
        maxlevels = sys.getrecursionlimit()
    files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)
    success = True
    if workers != 1 and ProcessPoolExecutor is not None:
        import multiprocessing
        if multiprocessing.get_start_method() == 'fork':
            mp_context = multiprocessing.get_context('forkserver')
        else:
            mp_context = None
        # If workers == 0, let ProcessPoolExecutor choose
        workers = workers or None
        with ProcessPoolExecutor(max_workers=workers,
                                 mp_context=mp_context) as executor:
            results = executor.map(partial(compile_file,
                                           ddir=ddir, force=force,
                                           rx=rx, quiet=quiet,
                                           legacy=legacy,
                                           optimize=optimize,
                                           invalidation_mode=invalidation_mode,
                                           stripdir=stripdir,
                                           prependdir=prependdir,
                                           limit_sl_dest=limit_sl_dest,
                                           hardlink_dupes=hardlink_dupes),
                                   files,
                                   chunksize=4)
            # Any False result sorts below True, so min() is False as
            # soon as a single file failed.
            success = min(results, default=True)
    else:
        for file in files:
            if not compile_file(file, ddir, force, rx, quiet,
                                legacy, optimize, invalidation_mode,
                                stripdir=stripdir, prependdir=prependdir,
                                limit_sl_dest=limit_sl_dest,
                                hardlink_dupes=hardlink_dupes):
                success = False
    return success


def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
                 legacy=False, optimize=-1,
                 invalidation_mode=None, *, stripdir=None, prependdir=None,
                 limit_sl_dest=None, hardlink_dupes=False):
    """Byte-compile one file.

    Arguments (only fullname is required):

    fullname:  the file to byte-compile
    ddir:      if given, the directory name compiled in to the
               byte-code file.
    force:     if True, force compilation, even if timestamps are up-to-date
    rx:        if given, an object with a search() method; the file is
               skipped when it matches fullname
    quiet:     full output with False or 0, errors only with 1,
               no output with 2
    legacy:    if True, produce legacy pyc paths instead of PEP 3147 paths
    optimize:  int or list of optimization levels or -1 for level of
               the interpreter. Multiple levels leads to multiple compiled
               files each with one optimization level.
    invalidation_mode: how the up-to-dateness of the pyc will be checked
    stripdir:  part of path to left-strip from source file path
    prependdir: path to prepend to beginning of original file path, applied
               after stripdir
    limit_sl_dest: ignore symlinks if they are pointing outside of
                   the defined path.
    hardlink_dupes: hardlink duplicated pyc files

    Returns True if the file was compiled, was already up to date, or was
    skipped; False if compilation failed.  Raises ValueError if ddir is
    combined with stripdir/prependdir, or if hardlink_dupes is requested
    with fewer than two distinct optimization levels.
    """

    if ddir is not None and (stripdir is not None or prependdir is not None):
        raise ValueError(("Destination dir (ddir) cannot be used "
                          "in combination with stripdir or prependdir"))

    success = True
    fullname = os.fspath(fullname)
    stripdir = os.fspath(stripdir) if stripdir is not None else None
    name = os.path.basename(fullname)

    # dfile is the source path recorded inside the byte-code file; None
    # lets py_compile fall back to the real path.
    dfile = None

    if ddir is not None:
        dfile = os.path.join(ddir, name)

    if stripdir is not None:
        fullname_parts = fullname.split(os.path.sep)
        stripdir_parts = stripdir.split(os.path.sep)

        if stripdir_parts != fullname_parts[:len(stripdir_parts)]:
            if quiet < 2:
                print("The stripdir path {!r} is not a valid prefix for "
                      "source path {!r}; ignoring".format(stripdir, fullname))
        else:
            dfile = os.path.join(*fullname_parts[len(stripdir_parts):])

    if prependdir is not None:
        if dfile is None:
            dfile = os.path.join(prependdir, fullname)
        else:
            dfile = os.path.join(prependdir, dfile)

    if isinstance(optimize, int):
        optimize = [optimize]

    # Use set() to remove duplicates.
    # Use sorted() to create pyc files in a deterministic order.
    optimize = sorted(set(optimize))

    if hardlink_dupes and len(optimize) < 2:
        raise ValueError("Hardlinking of duplicated bytecode makes sense "
                         "only for more than one optimization level")

    if rx is not None:
        mo = rx.search(fullname)
        if mo:
            return success

    if limit_sl_dest is not None and os.path.islink(fullname):
        if Path(limit_sl_dest).resolve() not in Path(fullname).resolve().parents:
            return success

    # Maps each optimization level to the pyc path it is written to.
    opt_cfiles = {}

    if os.path.isfile(fullname):
        for opt_level in optimize:
            if legacy:
                opt_cfiles[opt_level] = fullname + 'c'
            else:
                if opt_level >= 0:
                    opt = opt_level if opt_level >= 1 else ''
                    cfile = (importlib.util.cache_from_source(
                             fullname, optimization=opt))
                    opt_cfiles[opt_level] = cfile
                else:
                    cfile = importlib.util.cache_from_source(fullname)
                    opt_cfiles[opt_level] = cfile

        if name.endswith('.py'):
            if not force:
                try:
                    # Compare the expected 12-byte pyc header (magic
                    # number, zero flags, source mtime) with each
                    # existing pyc; skip the file when all of them match.
                    mtime = int(os.stat(fullname).st_mtime)
                    expect = struct.pack('<4sLL', importlib.util.MAGIC_NUMBER,
                                         0, mtime & 0xFFFF_FFFF)
                    for cfile in opt_cfiles.values():
                        with open(cfile, 'rb') as chandle:
                            actual = chandle.read(12)
                        if expect != actual:
                            break
                    else:
                        return success
                except OSError:
                    # A missing or unreadable pyc simply means the file
                    # has to be (re)compiled.
                    pass
            if not quiet:
                print('Compiling {!r}...'.format(fullname))
            try:
                for index, opt_level in enumerate(optimize):
                    cfile = opt_cfiles[opt_level]
                    ok = py_compile.compile(fullname, cfile, dfile, True,
                                            optimize=opt_level,
                                            invalidation_mode=invalidation_mode)
                    if index > 0 and hardlink_dupes:
                        previous_cfile = opt_cfiles[optimize[index - 1]]
                        if filecmp.cmp(cfile, previous_cfile, shallow=False):
                            os.unlink(cfile)
                            os.link(previous_cfile, cfile)
            except py_compile.PyCompileError as err:
                success = False
                if quiet >= 2:
                    return success
                elif quiet:
                    print('*** Error compiling {!r}...'.format(fullname))
                else:
                    print('*** ', end='')
                # escape non-printable characters in msg
                encoding = sys.stdout.encoding or sys.getdefaultencoding()
                msg = err.msg.encode(encoding, errors='backslashreplace').decode(encoding)
                print(msg)
            except (SyntaxError, UnicodeError, OSError) as e:
                success = False
                if quiet >= 2:
                    return success
                elif quiet:
                    print('*** Error compiling {!r}...'.format(fullname))
                else:
                    print('*** ', end='')
                print(e.__class__.__name__ + ':', e)
            else:
                if ok == 0:
                    success = False
    return success


def compile_path(skip_curdir=1, maxlevels=0, force=False, quiet=0,
                 legacy=False, optimize=-1,
                 invalidation_mode=None):
    """Byte-compile all modules on sys.path.

    Arguments (all optional):

    skip_curdir: if true, skip current directory (default True)
    maxlevels:   max recursion level (default 0)
    force: as for compile_dir() (default False)
    quiet: as for compile_dir() (default 0)
    legacy: as for compile_dir() (default False)
    optimize: as for compile_dir() (default -1)
    invalidation_mode: as for compile_dir()

    Every entry of sys.path is processed even if an earlier one failed.
    Returns True if all of them compiled successfully, False otherwise.
    """
    success = True
    for dir in sys.path:
        if (not dir or dir == os.curdir) and skip_curdir:
            if quiet < 2:
                print('Skipping current directory')
            continue
        # Do not fold this into "success and compile_dir(...)": that
        # short-circuits and would skip every entry after the first failure.
        if not compile_dir(
            dir,
            maxlevels,
            None,
            force,
            quiet=quiet,
            legacy=legacy,
            optimize=optimize,
            invalidation_mode=invalidation_mode,
        ):
            success = False
    return success


def main():
    """Script main program.

    Parses the command line, compiles the requested files/directories
    (or sys.path when none are given) and returns True on success,
    False on any failure or when interrupted.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Utilities to support installing Python libraries.')
    parser.add_argument('-l', action='store_const', const=0,
                        default=None, dest='maxlevels',
                        help="don't recurse into subdirectories")
    parser.add_argument('-r', type=int, dest='recursion',
                        help=('control the maximum recursion level. '
                              'if `-l` and `-r` options are specified, '
                              'then `-r` takes precedence.'))
    parser.add_argument('-f', action='store_true', dest='force',
                        help='force rebuild even if timestamps are up to date')
    parser.add_argument('-q', action='count', dest='quiet', default=0,
                        help='output only error messages; -qq will suppress '
                             'the error messages as well.')
    parser.add_argument('-b', action='store_true', dest='legacy',
                        help='use legacy (pre-PEP3147) compiled file locations')
    parser.add_argument('-d', metavar='DESTDIR', dest='ddir', default=None,
                        help=('directory to prepend to file paths for use in '
                              'compile-time tracebacks and in runtime '
                              'tracebacks in cases where the source file is '
                              'unavailable'))
    parser.add_argument('-s', metavar='STRIPDIR', dest='stripdir',
                        default=None,
                        help=('part of path to left-strip from path '
                              'to source file - for example buildroot. '
                              '`-d` and `-s` options cannot be '
                              'specified together.'))
    parser.add_argument('-p', metavar='PREPENDDIR', dest='prependdir',
                        default=None,
                        help=('path to add as prefix to path '
                              'to source file - for example / to make '
                              'it absolute when some part is removed '
                              'by `-s` option. '
                              '`-d` and `-p` options cannot be '
                              'specified together.'))
    parser.add_argument('-x', metavar='REGEXP', dest='rx', default=None,
                        help=('skip files matching the regular expression; '
                              'the regexp is searched for in the full path '
                              'of each file considered for compilation'))
    parser.add_argument('-i', metavar='FILE', dest='flist',
                        help=('add all the files and directories listed in '
                              'FILE to the list considered for compilation; '
                              'if "-", names are read from stdin'))
    parser.add_argument('compile_dest', metavar='FILE|DIR', nargs='*',
                        help=('zero or more file and directory names '
                              'to compile; if no arguments given, defaults '
                              'to the equivalent of -l sys.path'))
    parser.add_argument('-j', '--workers', default=1,
                        type=int, help='Run compileall concurrently')
    invalidation_modes = [mode.name.lower().replace('_', '-')
                          for mode in py_compile.PycInvalidationMode]
    parser.add_argument('--invalidation-mode',
                        choices=sorted(invalidation_modes),
                        help=('set .pyc invalidation mode; defaults to '
                              '"checked-hash" if the SOURCE_DATE_EPOCH '
                              'environment variable is set, and '
                              '"timestamp" otherwise.'))
    parser.add_argument('-o', action='append', type=int, dest='opt_levels',
                        help=('Optimization levels to run compilation with. '
                              'Default is -1 which uses the optimization level '
                              'of the Python interpreter itself (see -O).'))
    parser.add_argument('-e', metavar='DIR', dest='limit_sl_dest',
                        help='Ignore symlinks pointing outside of the DIR')
    parser.add_argument('--hardlink-dupes', action='store_true',
                        dest='hardlink_dupes',
                        help='Hardlink duplicated pyc files')

    args = parser.parse_args()
    compile_dests = args.compile_dest

    if args.rx:
        import re
        args.rx = re.compile(args.rx)

    if args.limit_sl_dest == "":
        args.limit_sl_dest = None

    if args.recursion is not None:
        maxlevels = args.recursion
    else:
        maxlevels = args.maxlevels

    if args.opt_levels is None:
        args.opt_levels = [-1]

    # Count distinct levels: compile_file() de-duplicates them, so
    # "-o 1 -o 1" is a single level and must be rejected here rather than
    # surfacing later as a ValueError traceback.
    if args.hardlink_dupes and len(set(args.opt_levels)) < 2:
        parser.error(("Hardlinking of duplicated bytecode makes sense "
                      "only for more than one optimization level."))

    if args.ddir is not None and (
        args.stripdir is not None or args.prependdir is not None
    ):
        parser.error("-d cannot be used in combination with -s or -p")

    # if flist is provided then load it
    if args.flist:
        try:
            with (sys.stdin if args.flist=='-' else
                    open(args.flist, encoding="utf-8")) as f:
                for line in f:
                    compile_dests.append(line.strip())
        except OSError:
            if args.quiet < 2:
                print("Error reading file list {}".format(args.flist))
            return False

    if args.invalidation_mode:
        ivl_mode = args.invalidation_mode.replace('-', '_').upper()
        invalidation_mode = py_compile.PycInvalidationMode[ivl_mode]
    else:
        invalidation_mode = None

    success = True
    try:
        if compile_dests:
            for dest in compile_dests:
                if os.path.isfile(dest):
                    if not compile_file(dest, args.ddir, args.force, args.rx,
                                        args.quiet, args.legacy,
                                        invalidation_mode=invalidation_mode,
                                        stripdir=args.stripdir,
                                        prependdir=args.prependdir,
                                        optimize=args.opt_levels,
                                        limit_sl_dest=args.limit_sl_dest,
                                        hardlink_dupes=args.hardlink_dupes):
                        success = False
                else:
                    if not compile_dir(dest, maxlevels, args.ddir,
                                       args.force, args.rx, args.quiet,
                                       args.legacy, workers=args.workers,
                                       invalidation_mode=invalidation_mode,
                                       stripdir=args.stripdir,
                                       prependdir=args.prependdir,
                                       optimize=args.opt_levels,
                                       limit_sl_dest=args.limit_sl_dest,
                                       hardlink_dupes=args.hardlink_dupes):
                        success = False
            return success
        else:
            # Forward the requested optimization levels so that -o is
            # honoured when compiling sys.path as well.
            return compile_path(legacy=args.legacy, force=args.force,
                                quiet=args.quiet,
                                optimize=args.opt_levels,
                                invalidation_mode=invalidation_mode)
    except KeyboardInterrupt:
        if args.quiet < 2:
            print("\n[interrupted]")
        return False


if __name__ == '__main__':
    exit_status = int(not main())
    sys.exit(exit_status)