1import argparse 2import contextlib 3import logging 4import os 5import os.path 6import shutil 7import sys 8 9from . import fsutil, strutil, iterutil, logging as loggingutil 10 11 12_NOT_SET = object() 13 14 15def get_prog(spec=None, *, absolute=False, allowsuffix=True): 16 if spec is None: 17 _, spec = _find_script() 18 # This is more natural for prog than __file__ would be. 19 filename = sys.argv[0] 20 elif isinstance(spec, str): 21 filename = os.path.normpath(spec) 22 spec = None 23 else: 24 filename = spec.origin 25 if _is_standalone(filename): 26 # Check if "installed". 27 if allowsuffix or not filename.endswith('.py'): 28 basename = os.path.basename(filename) 29 found = shutil.which(basename) 30 if found: 31 script = os.path.abspath(filename) 32 found = os.path.abspath(found) 33 if os.path.normcase(script) == os.path.normcase(found): 34 return basename 35 # It is only "standalone". 36 if absolute: 37 filename = os.path.abspath(filename) 38 return filename 39 elif spec is not None: 40 module = spec.name 41 if module.endswith('.__main__'): 42 module = module[:-9] 43 return f'{sys.executable} -m {module}' 44 else: 45 if absolute: 46 filename = os.path.abspath(filename) 47 return f'{sys.executable} {filename}' 48 49 50def _find_script(): 51 frame = sys._getframe(2) 52 while frame.f_globals['__name__'] != '__main__': 53 frame = frame.f_back 54 55 # This should match sys.argv[0]. 56 filename = frame.f_globals['__file__'] 57 # This will be None if -m wasn't used.. 58 spec = frame.f_globals['__spec__'] 59 return filename, spec 60 61 62def is_installed(filename, *, allowsuffix=True): 63 if not allowsuffix and filename.endswith('.py'): 64 return False 65 filename = os.path.abspath(os.path.normalize(filename)) 66 found = shutil.which(os.path.basename(filename)) 67 if not found: 68 return False 69 if found != filename: 70 return False 71 return _is_standalone(filename) 72 73 74def is_standalone(filename): 75 filename = os.path.abspath(os.path.normalize(filename)) 76 return _is_standalone(filename) 77 78 79def _is_standalone(filename): 80 return fsutil.is_executable(filename) 81 82 83################################## 84# logging 85 86VERBOSITY = 3 87 88TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip() 89TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO')) 90 91 92logger = logging.getLogger(__name__) 93 94 95def configure_logger(verbosity, logger=None, **kwargs): 96 if logger is None: 97 # Configure the root logger. 98 logger = logging.getLogger() 99 loggingutil.configure_logger(logger, verbosity, **kwargs) 100 101 102################################## 103# selections 104 105class UnsupportedSelectionError(Exception): 106 def __init__(self, values, possible): 107 self.values = tuple(values) 108 self.possible = tuple(possible) 109 super().__init__(f'unsupported selections {self.unique}') 110 111 @property 112 def unique(self): 113 return tuple(sorted(set(self.values))) 114 115 116def normalize_selection(selected: str, *, possible=None): 117 if selected in (None, True, False): 118 return selected 119 elif isinstance(selected, str): 120 selected = [selected] 121 elif not selected: 122 return () 123 124 unsupported = [] 125 _selected = set() 126 for item in selected: 127 if not item: 128 continue 129 for value in item.strip().replace(',', ' ').split(): 130 if not value: 131 continue 132 # XXX Handle subtraction (leading "-"). 133 if possible and value not in possible and value != 'all': 134 unsupported.append(value) 135 _selected.add(value) 136 if unsupported: 137 raise UnsupportedSelectionError(unsupported, tuple(possible)) 138 if 'all' in _selected: 139 return True 140 return frozenset(selected) 141 142 143################################## 144# CLI parsing helpers 145 146class CLIArgSpec(tuple): 147 def __new__(cls, *args, **kwargs): 148 return super().__new__(cls, (args, kwargs)) 149 150 def __repr__(self): 151 args, kwargs = self 152 args = [repr(arg) for arg in args] 153 for name, value in kwargs.items(): 154 args.append(f'{name}={value!r}') 155 return f'{type(self).__name__}({", ".join(args)})' 156 157 def __call__(self, parser, *, _noop=(lambda a: None)): 158 self.apply(parser) 159 return _noop 160 161 def apply(self, parser): 162 args, kwargs = self 163 parser.add_argument(*args, **kwargs) 164 165 166def apply_cli_argspecs(parser, specs): 167 processors = [] 168 for spec in specs: 169 if callable(spec): 170 procs = spec(parser) 171 _add_procs(processors, procs) 172 else: 173 args, kwargs = spec 174 parser.add_argument(args, kwargs) 175 return processors 176 177 178def _add_procs(flattened, procs): 179 # XXX Fail on non-empty, non-callable procs? 180 if not procs: 181 return 182 if callable(procs): 183 flattened.append(procs) 184 else: 185 #processors.extend(p for p in procs if callable(p)) 186 for proc in procs: 187 _add_procs(flattened, proc) 188 189 190def add_verbosity_cli(parser): 191 parser.add_argument('-q', '--quiet', action='count', default=0) 192 parser.add_argument('-v', '--verbose', action='count', default=0) 193 194 def process_args(args, *, argv=None): 195 ns = vars(args) 196 key = 'verbosity' 197 if key in ns: 198 parser.error(f'duplicate arg {key!r}') 199 ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet')) 200 return key 201 return process_args 202 203 204def add_traceback_cli(parser): 205 parser.add_argument('--traceback', '--tb', action='store_true', 206 default=TRACEBACK) 207 parser.add_argument('--no-traceback', '--no-tb', dest='traceback', 208 action='store_const', const=False) 209 210 def process_args(args, *, argv=None): 211 ns = vars(args) 212 key = 'traceback_cm' 213 if key in ns: 214 parser.error(f'duplicate arg {key!r}') 215 showtb = ns.pop('traceback') 216 217 @contextlib.contextmanager 218 def traceback_cm(): 219 restore = loggingutil.hide_emit_errors() 220 try: 221 yield 222 except BrokenPipeError: 223 # It was piped to "head" or something similar. 224 pass 225 except NotImplementedError: 226 raise # re-raise 227 except Exception as exc: 228 if not showtb: 229 sys.exit(f'ERROR: {exc}') 230 raise # re-raise 231 except KeyboardInterrupt: 232 if not showtb: 233 sys.exit('\nINTERRUPTED') 234 raise # re-raise 235 except BaseException as exc: 236 if not showtb: 237 sys.exit(f'{type(exc).__name__}: {exc}') 238 raise # re-raise 239 finally: 240 restore() 241 ns[key] = traceback_cm() 242 return key 243 return process_args 244 245 246def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs): 247# if opt is True: 248# parser.add_argument(f'--{dest}', action='append', **kwargs) 249# elif isinstance(opt, str) and opt.startswith('-'): 250# parser.add_argument(opt, dest=dest, action='append', **kwargs) 251# else: 252# arg = dest if not opt else opt 253# kwargs.setdefault('nargs', '+') 254# parser.add_argument(arg, dest=dest, action='append', **kwargs) 255 if not isinstance(opt, str): 256 parser.error(f'opt must be a string, got {opt!r}') 257 elif opt.startswith('-'): 258 parser.add_argument(opt, dest=dest, action='append', **kwargs) 259 else: 260 kwargs.setdefault('nargs', '+') 261 #kwargs.setdefault('metavar', opt.upper()) 262 parser.add_argument(opt, dest=dest, action='append', **kwargs) 263 264 def process_args(args, *, argv=None): 265 ns = vars(args) 266 267 # XXX Use normalize_selection()? 268 if isinstance(ns[dest], str): 269 ns[dest] = [ns[dest]] 270 selections = [] 271 for many in ns[dest] or (): 272 for value in many.split(sep): 273 if value not in choices: 274 parser.error(f'unknown {dest} {value!r}') 275 selections.append(value) 276 ns[dest] = selections 277 return process_args 278 279 280def add_files_cli(parser, *, excluded=None, nargs=None): 281 process_files = add_file_filtering_cli(parser, excluded=excluded) 282 parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME') 283 return [ 284 process_files, 285 ] 286 287 288def add_file_filtering_cli(parser, *, excluded=None): 289 parser.add_argument('--start') 290 parser.add_argument('--include', action='append') 291 parser.add_argument('--exclude', action='append') 292 293 excluded = tuple(excluded or ()) 294 295 def process_args(args, *, argv=None): 296 ns = vars(args) 297 key = 'iter_filenames' 298 if key in ns: 299 parser.error(f'duplicate arg {key!r}') 300 301 _include = tuple(ns.pop('include') or ()) 302 _exclude = excluded + tuple(ns.pop('exclude') or ()) 303 kwargs = dict( 304 start=ns.pop('start'), 305 include=tuple(_parse_files(_include)), 306 exclude=tuple(_parse_files(_exclude)), 307 # We use the default for "show_header" 308 ) 309 def process_filenames(filenames, relroot=None): 310 return fsutil.process_filenames(filenames, relroot=relroot, **kwargs) 311 ns[key] = process_filenames 312 return process_args 313 314 315def _parse_files(filenames): 316 for filename, _ in strutil.parse_entries(filenames): 317 yield filename.strip() 318 319 320def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs): 321 parser.add_argument('--progress', dest='track_progress', action='store_const', const=True) 322 parser.add_argument('--no-progress', dest='track_progress', action='store_false') 323 parser.set_defaults(track_progress=True) 324 325 def process_args(args, *, argv=None): 326 if args.track_progress: 327 ns = vars(args) 328 verbosity = ns.get('verbosity', VERBOSITY) 329 if verbosity <= threshold: 330 args.track_progress = track_progress_compact 331 else: 332 args.track_progress = track_progress_flat 333 return process_args 334 335 336def add_failure_filtering_cli(parser, pool, *, default=False): 337 parser.add_argument('--fail', action='append', 338 metavar=f'"{{all|{"|".join(sorted(pool))}}},..."') 339 parser.add_argument('--no-fail', dest='fail', action='store_const', const=()) 340 341 def process_args(args, *, argv=None): 342 ns = vars(args) 343 344 fail = ns.pop('fail') 345 try: 346 fail = normalize_selection(fail, possible=pool) 347 except UnsupportedSelectionError as exc: 348 parser.error(f'invalid --fail values: {", ".join(exc.unique)}') 349 else: 350 if fail is None: 351 fail = default 352 353 if fail is True: 354 def ignore_exc(_exc): 355 return False 356 elif fail is False: 357 def ignore_exc(_exc): 358 return True 359 else: 360 def ignore_exc(exc): 361 for err in fail: 362 if type(exc) == pool[err]: 363 return False 364 else: 365 return True 366 args.ignore_exc = ignore_exc 367 return process_args 368 369 370def add_kind_filtering_cli(parser, *, default=None): 371 parser.add_argument('--kinds', action='append') 372 373 def process_args(args, *, argv=None): 374 ns = vars(args) 375 376 kinds = [] 377 for kind in ns.pop('kinds') or default or (): 378 kinds.extend(kind.strip().replace(',', ' ').split()) 379 380 if not kinds: 381 match_kind = (lambda k: True) 382 else: 383 included = set() 384 excluded = set() 385 for kind in kinds: 386 if kind.startswith('-'): 387 kind = kind[1:] 388 excluded.add(kind) 389 if kind in included: 390 included.remove(kind) 391 else: 392 included.add(kind) 393 if kind in excluded: 394 excluded.remove(kind) 395 if excluded: 396 if included: 397 ... # XXX fail? 398 def match_kind(kind, *, _excluded=excluded): 399 return kind not in _excluded 400 else: 401 def match_kind(kind, *, _included=included): 402 return kind in _included 403 args.match_kind = match_kind 404 return process_args 405 406 407COMMON_CLI = [ 408 add_verbosity_cli, 409 add_traceback_cli, 410 #add_dryrun_cli, 411] 412 413 414def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None): 415 arg_processors = {} 416 if isinstance(subset, str): 417 cmdname = subset 418 try: 419 _, argspecs, _ = commands[cmdname] 420 except KeyError: 421 raise ValueError(f'unsupported subset {subset!r}') 422 parser.set_defaults(cmd=cmdname) 423 arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs) 424 else: 425 if subset is None: 426 cmdnames = subset = list(commands) 427 elif not subset: 428 raise NotImplementedError 429 elif isinstance(subset, set): 430 cmdnames = [k for k in commands if k in subset] 431 subset = sorted(subset) 432 else: 433 cmdnames = [n for n in subset if n in commands] 434 if len(cmdnames) < len(subset): 435 bad = tuple(n for n in subset if n not in commands) 436 raise ValueError(f'unsupported subset {bad}') 437 438 common = argparse.ArgumentParser(add_help=False) 439 common_processors = apply_cli_argspecs(common, commonspecs) 440 subs = parser.add_subparsers(dest='cmd') 441 for cmdname in cmdnames: 442 description, argspecs, _ = commands[cmdname] 443 sub = subs.add_parser( 444 cmdname, 445 description=description, 446 parents=[common], 447 ) 448 cmd_processors = _add_cmd_cli(sub, (), argspecs) 449 arg_processors[cmdname] = common_processors + cmd_processors 450 return arg_processors 451 452 453def _add_cmd_cli(parser, commonspecs, argspecs): 454 processors = [] 455 argspecs = list(commonspecs or ()) + list(argspecs or ()) 456 for argspec in argspecs: 457 if callable(argspec): 458 procs = argspec(parser) 459 _add_procs(processors, procs) 460 else: 461 if not argspec: 462 raise NotImplementedError 463 args = list(argspec) 464 if not isinstance(args[-1], str): 465 kwargs = args.pop() 466 if not isinstance(args[0], str): 467 try: 468 args, = args 469 except (TypeError, ValueError): 470 parser.error(f'invalid cmd args {argspec!r}') 471 else: 472 kwargs = {} 473 parser.add_argument(*args, **kwargs) 474 # There will be nothing to process. 475 return processors 476 477 478def _flatten_processors(processors): 479 for proc in processors: 480 if proc is None: 481 continue 482 if callable(proc): 483 yield proc 484 else: 485 yield from _flatten_processors(proc) 486 487 488def process_args(args, argv, processors, *, keys=None): 489 processors = _flatten_processors(processors) 490 ns = vars(args) 491 extracted = {} 492 if keys is None: 493 for process_args in processors: 494 for key in process_args(args, argv=argv): 495 extracted[key] = ns.pop(key) 496 else: 497 remainder = set(keys) 498 for process_args in processors: 499 hanging = process_args(args, argv=argv) 500 if isinstance(hanging, str): 501 hanging = [hanging] 502 for key in hanging or (): 503 if key not in remainder: 504 raise NotImplementedError(key) 505 extracted[key] = ns.pop(key) 506 remainder.remove(key) 507 if remainder: 508 raise NotImplementedError(sorted(remainder)) 509 return extracted 510 511 512def process_args_by_key(args, argv, processors, keys): 513 extracted = process_args(args, argv, processors, keys=keys) 514 return [extracted[key] for key in keys] 515 516 517################################## 518# commands 519 520def set_command(name, add_cli): 521 """A decorator factory to set CLI info.""" 522 def decorator(func): 523 if hasattr(func, '__cli__'): 524 raise Exception(f'already set') 525 func.__cli__ = (name, add_cli) 526 return func 527 return decorator 528 529 530################################## 531# main() helpers 532 533def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD): 534 # We expect each filename to be a normalized, absolute path. 535 for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot): 536 if (reason := check()): 537 logger.debug(f'{filename}: {reason}') 538 continue 539 yield filename 540 541 542def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD): 543 filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot) 544 for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot): 545 if show: 546 print() 547 print(relfile) 548 print('-------------------------------------------') 549 if (reason := check()): 550 print(reason) 551 continue 552 yield filename, relfile 553 554 555def _iter_filenames(filenames, process, relroot): 556 if process is None: 557 yield from fsutil.process_filenames(filenames, relroot=relroot) 558 return 559 560 onempty = Exception('no filenames provided') 561 items = process(filenames, relroot=relroot) 562 items, peeked = iterutil.peek_and_iter(items) 563 if not items: 564 raise onempty 565 if isinstance(peeked, str): 566 if relroot and relroot is not fsutil.USE_CWD: 567 relroot = os.path.abspath(relroot) 568 check = (lambda: True) 569 for filename, ismany in iterutil.iter_many(items, onempty): 570 relfile = fsutil.format_filename(filename, relroot, fixroot=False) 571 yield filename, relfile, check, ismany 572 elif len(peeked) == 4: 573 yield from items 574 else: 575 raise NotImplementedError 576 577 578def track_progress_compact(items, *, groups=5, **mark_kwargs): 579 last = os.linesep 580 marks = iter_marks(groups=groups, **mark_kwargs) 581 for item in items: 582 last = next(marks) 583 print(last, end='', flush=True) 584 yield item 585 if not last.endswith(os.linesep): 586 print() 587 588 589def track_progress_flat(items, fmt='<{}>'): 590 for item in items: 591 print(fmt.format(item), flush=True) 592 yield item 593 594 595def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '): 596 mark = mark or '' 597 group = group if group and group > 1 else 1 598 groups = groups if groups and groups > 1 else 1 599 600 sep = f'{mark}{sep}' if sep else mark 601 end = f'{mark}{os.linesep}' 602 div = os.linesep 603 perline = group * groups 604 if lines is _NOT_SET: 605 # By default we try to put about 100 in each line group. 606 perlines = 100 // perline * perline 607 elif not lines or lines < 0: 608 perlines = None 609 else: 610 perlines = perline * lines 611 612 if perline == 1: 613 yield end 614 elif group == 1: 615 yield sep 616 617 count = 1 618 while True: 619 if count % perline == 0: 620 yield end 621 if perlines and count % perlines == 0: 622 yield div 623 elif count % group == 0: 624 yield sep 625 else: 626 yield mark 627 count += 1 628