"""Helpers for writing command-line scripts.

This covers working out the program name, configuring logging, building
argparse CLIs out of reusable "argspec" pieces, post-processing the parsed
args, and iterating over filenames with optional progress tracking.
"""

import argparse
import contextlib
import fnmatch
import logging
import os
import os.path
import shutil
import sys

from . import fsutil, strutil, iterutil, logging as loggingutil


_NOT_SET = object()


def get_prog(spec=None, *, absolute=False, allowsuffix=True):
    """Return the text to use as the program name ("prog") in usage output.

    "spec" may be None (the running "__main__" script is looked up),
    a filename string, or an object with "origin" and "name" attributes
    (e.g. a module spec).

    The result is one of:

    * the script's basename, if the script is executable and is the file
      that shutil.which() finds for that basename
      (skipped for ".py" files when "allowsuffix" is false)
    * the script's filename, if it is executable
    * "<sys.executable> -m <module>", if a spec is available
    * "<sys.executable> <filename>" otherwise

    If "absolute" is true then a returned filename is made absolute.
    """
    if spec is None:
        _, spec = _find_script()
        # This is more natural for prog than __file__ would be.
        filename = sys.argv[0]
    elif isinstance(spec, str):
        filename = os.path.normpath(spec)
        spec = None
    else:
        filename = spec.origin
    if _is_standalone(filename):
        # Check if "installed".
        if allowsuffix or not filename.endswith('.py'):
            basename = os.path.basename(filename)
            found = shutil.which(basename)
            if found:
                script = os.path.abspath(filename)
                found = os.path.abspath(found)
                if os.path.normcase(script) == os.path.normcase(found):
                    return basename
        # It is only "standalone".
        if absolute:
            filename = os.path.abspath(filename)
        return filename
    elif spec is not None:
        module = spec.name
        if module.endswith('.__main__'):
            # Strip the trailing ".__main__".
            module = module[:-9]
        return f'{sys.executable} -m {module}'
    else:
        if absolute:
            filename = os.path.abspath(filename)
        return f'{sys.executable} {filename}'


def _find_script():
    """Return (filename, spec) from the globals of the "__main__" frame.

    The search starts two frames up, i.e. at the caller of the function
    that called this one, so this must be called directly from the public
    helper (see get_prog()).
    """
    frame = sys._getframe(2)
    while frame.f_globals['__name__'] != '__main__':
        frame = frame.f_back

    # This should match sys.argv[0].
    filename = frame.f_globals['__file__']
    # This will be None if -m wasn't used..
    spec = frame.f_globals['__spec__']
    return filename, spec


def is_installed(filename, *, allowsuffix=True):
    """Return True if the script is executable and is the one found on PATH.

    A ".py" file never counts as installed when "allowsuffix" is false.
    """
    if not allowsuffix and filename.endswith('.py'):
        return False
    filename = os.path.abspath(os.path.normpath(filename))
    found = shutil.which(os.path.basename(filename))
    if not found:
        return False
    # Compare the same way get_prog() does, since shutil.which() may
    # return a relative path or one that differs only by case.
    found = os.path.abspath(found)
    if os.path.normcase(found) != os.path.normcase(filename):
        return False
    return _is_standalone(filename)


def is_standalone(filename):
    """Return True if the (normalized, absolute) filename is executable."""
    filename = os.path.abspath(os.path.normpath(filename))
    return _is_standalone(filename)


def _is_standalone(filename):
    """Return the result of fsutil.is_executable() for the filename."""
    return fsutil.is_executable(filename)


##################################
# logging

VERBOSITY = 3

# Any non-empty value of $SHOW_TRACEBACK other than "0", "false", or "no"
# (case-insensitive) turns tracebacks on by default.
TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))


logger = logging.getLogger(__name__)


def configure_logger(verbosity, logger=None, **kwargs):
    """Configure the given logger (default: the root logger).

    The work is delegated to the package's logging helper, which receives
    the logger, the verbosity, and any extra keyword arguments.
    """
    if logger is None:
        # Configure the root logger.
        logger = logging.getLogger()
    loggingutil.configure_logger(logger, verbosity, **kwargs)


##################################
# selections

class UnsupportedSelectionError(Exception):
    """Raised by normalize_selection() for values not in the allowed set.

    "values" holds the rejected values (in the order encountered) and
    "possible" holds the allowed ones.
    """

    def __init__(self, values, possible):
        self.values = tuple(values)
        self.possible = tuple(possible)
        super().__init__(f'unsupported selections {self.unique}')

    @property
    def unique(self):
        """The rejected values, de-duplicated and sorted."""
        return tuple(sorted(set(self.values)))


def normalize_selection(selected: str, *, possible=None):
    """Return the normalized form of the given selection.

    "selected" may be None/True/False (returned as-is), a single string,
    or an iterable of strings.  Each string may hold multiple values
    separated by commas and/or whitespace.

    Returns True if "all" was one of the values, an empty tuple if
    "selected" is an empty non-string, and otherwise a frozenset of the
    individual values.

    Raises UnsupportedSelectionError if "possible" is provided (non-empty)
    and any value other than "all" is not in it.
    """
    if selected in (None, True, False):
        return selected
    elif isinstance(selected, str):
        selected = [selected]
    elif not selected:
        return ()

    unsupported = []
    _selected = set()
    for item in selected:
        if not item:
            continue
        for value in item.strip().replace(',', ' ').split():
            if not value:
                continue
            # XXX Handle subtraction (leading "-").
            if possible and value not in possible and value != 'all':
                unsupported.append(value)
            _selected.add(value)
    if unsupported:
        raise UnsupportedSelectionError(unsupported, tuple(possible))
    if 'all' in _selected:
        return True
    # Return the parsed values, not the raw (unsplit) input items.
    return frozenset(_selected)


##################################
# CLI parsing helpers

class CLIArgSpec(tuple):
    """The (args, kwargs) for a single parser.add_argument() call.

    An instance is callable with a parser, which makes it usable anywhere
    an "add_*_cli()" style function is expected.
    """

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, (args, kwargs))

    def __repr__(self):
        args, kwargs = self
        args = [repr(arg) for arg in args]
        for name, value in kwargs.items():
            args.append(f'{name}={value!r}')
        return f'{type(self).__name__}({", ".join(args)})'

    def __call__(self, parser, *, _noop=(lambda a: None)):
        """Add the argument to the parser and return a no-op callable."""
        self.apply(parser)
        return _noop

    def apply(self, parser):
        """Add the argument to the parser."""
        args, kwargs = self
        parser.add_argument(*args, **kwargs)


def apply_cli_argspecs(parser, specs):
    """Apply each spec to the parser and return the flat list of processors.

    A callable spec is called with the parser and whatever it returns is
    flattened into the result.  Any other spec must be an (args, kwargs)
    pair for parser.add_argument().
    """
    processors = []
    for spec in specs:
        if callable(spec):
            procs = spec(parser)
            _add_procs(processors, procs)
        else:
            args, kwargs = spec
            # The pair must be unpacked; passing the tuple and dict as two
            # positional arguments is not a valid add_argument() call.
            parser.add_argument(*args, **kwargs)
    return processors


def _add_procs(flattened, procs):
    """Append the callables in "procs" to "flattened", recursively.

    "procs" may be falsy (ignored), a single callable, or an arbitrarily
    nested iterable of those.
    """
    # XXX Fail on non-empty, non-callable procs?
    if not procs:
        return
    if callable(procs):
        flattened.append(procs)
    else:
        #processors.extend(p for p in procs if callable(p))
        for proc in procs:
            _add_procs(flattened, proc)


def add_verbosity_cli(parser):
    """Add -q/--quiet and -v/--verbose and return the args processor.

    The processor replaces "verbose" and "quiet" in the namespace with
    "verbosity" (VERBOSITY + verbose - quiet, but never below 0) and
    returns that key.
    """
    parser.add_argument('-q', '--quiet', action='count', default=0)
    parser.add_argument('-v', '--verbose', action='count', default=0)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'verbosity'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
        return key
    return process_args


def add_traceback_cli(parser):
    """Add --traceback/--no-traceback and return the args processor.

    The processor replaces "traceback" in the namespace with "traceback_cm"
    and returns that key.  "traceback_cm" is a context manager which, when
    tracebacks are disabled, turns an exception raised in its body into
    sys.exit() with a short message.  BrokenPipeError is always swallowed
    and NotImplementedError is always re-raised.
    """
    parser.add_argument('--traceback', '--tb', action='store_true',
                        default=TRACEBACK)
    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
                        action='store_const', const=False)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'traceback_cm'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        showtb = ns.pop('traceback')

        @contextlib.contextmanager
        def traceback_cm():
            # The helper returns a callable that we call on the way out.
            restore = loggingutil.hide_emit_errors()
            try:
                yield
            except BrokenPipeError:
                # It was piped to "head" or something similar.
                pass
            except NotImplementedError:
                raise  # re-raise
            except Exception as exc:
                if not showtb:
                    sys.exit(f'ERROR: {exc}')
                raise  # re-raise
            except KeyboardInterrupt:
                if not showtb:
                    sys.exit('\nINTERRUPTED')
                raise  # re-raise
            except BaseException as exc:
                if not showtb:
                    sys.exit(f'{type(exc).__name__}: {exc}')
                raise  # re-raise
            finally:
                restore()
        ns[key] = traceback_cm()
        return key
    return process_args


def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
    """Add an argument that takes "sep"-separated values from "choices".

    If "opt" starts with "-" then it is added as an option (which may be
    repeated).  Otherwise a positional argument is added, with "opt" as
    the default metavar and (by default) nargs="+".  Either way the values
    are stored under "dest".

    The returned processor splits every provided string on "sep", reports
    a parser error for any value not in "choices", and stores the flat
    list of values back under "dest".
    """
#    if opt is True:
#        parser.add_argument(f'--{dest}', action='append', **kwargs)
#    elif isinstance(opt, str) and opt.startswith('-'):
#        parser.add_argument(opt, dest=dest, action='append', **kwargs)
#    else:
#        arg = dest if not opt else opt
#        kwargs.setdefault('nargs', '+')
#        parser.add_argument(arg, dest=dest, action='append', **kwargs)
    if not isinstance(opt, str):
        parser.error(f'opt must be a string, got {opt!r}')
    elif opt.startswith('-'):
        parser.add_argument(opt, dest=dest, action='append', **kwargs)
    else:
        kwargs.setdefault('nargs', '+')
        # argparse rejects an explicit "dest" for a positional argument
        # (the name *is* the dest), so we register it under "dest" and
        # only use "opt" for display.
        kwargs.setdefault('metavar', opt)
        parser.add_argument(dest, action='append', **kwargs)

    def process_args(args, *, argv=None):
        ns = vars(args)

        # XXX Use normalize_selection()?
        provided = ns[dest]
        if isinstance(provided, str):
            provided = [provided]
        selections = []
        for many in provided or ():
            # With action="append" plus nargs, each entry is itself
            # a list of strings rather than a single string.
            parts = [many] if isinstance(many, str) else many
            for part in parts:
                for value in part.split(sep):
                    if value not in choices:
                        parser.error(f'unknown {dest} {value!r}')
                    selections.append(value)
        ns[dest] = selections
    return process_args


def add_files_cli(parser, *, excluded=None, nargs=None):
    """Add the file filtering options plus a "filenames" positional.

    "nargs" defaults to "+".  Returns the list of args processors.
    """
    process_files = add_file_filtering_cli(parser, excluded=excluded)
    parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
    return [
        process_files,
    ]


def add_file_filtering_cli(parser, *, excluded=None):
    """Add --start, --include and --exclude and return the args processor.

    The processor replaces those three entries in the namespace with
    "iter_filenames", a function "(filenames, relroot=None)" that passes
    the parsed filters on to fsutil.process_filenames().  Any names in
    "excluded" are always added to the excludes.
    """
    parser.add_argument('--start')
    parser.add_argument('--include', action='append')
    parser.add_argument('--exclude', action='append')

    excluded = tuple(excluded or ())

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'iter_filenames'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')

        _include = tuple(ns.pop('include') or ())
        _exclude = excluded + tuple(ns.pop('exclude') or ())
        kwargs = dict(
            start=ns.pop('start'),
            include=tuple(_parse_files(_include)),
            exclude=tuple(_parse_files(_exclude)),
            # We use the default for "show_header"
        )
        def process_filenames(filenames, relroot=None):
            return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
        ns[key] = process_filenames
    return process_args


def _parse_files(filenames):
    """Yield each filename (stripped) that strutil.parse_entries() produces."""
    for filename, _ in strutil.parse_entries(filenames):
        yield filename.strip()


def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
    """Add --progress/--no-progress and return the args processor.

    Progress tracking is on by default.  When it is on, the processor
    replaces "args.track_progress" with track_progress_compact() if the
    namespace's "verbosity" (default: VERBOSITY) is at or below
    "threshold", and with track_progress_flat() otherwise.

    Extra keyword arguments are accepted but not used.
    """
    parser.add_argument('--progress', dest='track_progress',
                        action='store_const', const=True)
    parser.add_argument('--no-progress', dest='track_progress',
                        action='store_false')
    parser.set_defaults(track_progress=True)

    def process_args(args, *, argv=None):
        if args.track_progress:
            ns = vars(args)
            verbosity = ns.get('verbosity', VERBOSITY)
            if verbosity <= threshold:
                args.track_progress = track_progress_compact
            else:
                args.track_progress = track_progress_flat
    return process_args


def add_failure_filtering_cli(parser, pool, *, default=False):
    """Add --fail/--no-fail and return the args processor.

    "pool" maps each supported name to an exception type.  The processor
    replaces "fail" in the namespace with "ignore_exc", a function that
    takes an exception and returns True if it should be ignored:

    * "all" selected: nothing is ignored
    * nothing selected and "default" is False: everything is ignored
    * otherwise: only exceptions whose exact type was selected
      are not ignored
    """
    parser.add_argument('--fail', action='append',
                        metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())

    def process_args(args, *, argv=None):
        ns = vars(args)

        fail = ns.pop('fail')
        try:
            fail = normalize_selection(fail, possible=pool)
        except UnsupportedSelectionError as exc:
            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
        else:
            if fail is None:
                fail = default

            if fail is True:
                def ignore_exc(_exc):
                    return False
            elif fail is False:
                def ignore_exc(_exc):
                    return True
            else:
                def ignore_exc(exc):
                    for err in fail:
                        # This is an exact type match (no subclasses).
                        if type(exc) == pool[err]:
                            return False
                    else:
                        return True
            args.ignore_exc = ignore_exc
    return process_args


def add_kind_filtering_cli(parser, *, default=None):
    """Add --kinds and return the args processor.

    Each --kinds value may hold multiple kinds separated by commas and/or
    whitespace.  A leading "-" marks a kind as excluded.  The processor
    replaces "kinds" in the namespace with "match_kind", a predicate:

    * no kinds given: matches everything
    * any exclusions: matches everything not excluded
      (explicit inclusions are then ignored)
    * otherwise: matches only the included kinds
    """
    parser.add_argument('--kinds', action='append')

    def process_args(args, *, argv=None):
        ns = vars(args)

        kinds = []
        for kind in ns.pop('kinds') or default or ():
            kinds.extend(kind.strip().replace(',', ' ').split())

        if not kinds:
            match_kind = (lambda k: True)
        else:
            included = set()
            excluded = set()
            # A later mention of a kind overrides an earlier one.
            for kind in kinds:
                if kind.startswith('-'):
                    kind = kind[1:]
                    excluded.add(kind)
                    if kind in included:
                        included.remove(kind)
                else:
                    included.add(kind)
                    if kind in excluded:
                        excluded.remove(kind)
            if excluded:
                if included:
                    ...  # XXX fail?
                def match_kind(kind, *, _excluded=excluded):
                    return kind not in _excluded
            else:
                def match_kind(kind, *, _included=included):
                    return kind in _included
        args.match_kind = match_kind
    return process_args


COMMON_CLI = [
    add_verbosity_cli,
    add_traceback_cli,
    #add_dryrun_cli,
]


def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
    """Add the CLI for the given commands and return their args processors.

    "commands" maps each command name to a 3-tuple of which the first two
    items are the description and the argspecs.

    If "subset" is a string then only that command is supported and its
    arguments are added directly to the parser.  Otherwise a sub-parser is
    added for each command (optionally limited to the names in "subset"),
    sharing the arguments from "commonspecs".

    Returns a dict mapping each command name to its list of processors.

    Raises ValueError for unknown names in "subset" and NotImplementedError
    if "subset" is empty (but not None).
    """
    arg_processors = {}
    if isinstance(subset, str):
        cmdname = subset
        try:
            _, argspecs, _ = commands[cmdname]
        except KeyError:
            raise ValueError(f'unsupported subset {subset!r}')
        parser.set_defaults(cmd=cmdname)
        arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
    else:
        if subset is None:
            cmdnames = subset = list(commands)
        elif not subset:
            raise NotImplementedError
        elif isinstance(subset, set):
            cmdnames = [k for k in commands if k in subset]
            subset = sorted(subset)
        else:
            cmdnames = [n for n in subset if n in commands]
        if len(cmdnames) < len(subset):
            bad = tuple(n for n in subset if n not in commands)
            raise ValueError(f'unsupported subset {bad}')

        # The common args go on a parent parser shared by all sub-parsers.
        common = argparse.ArgumentParser(add_help=False)
        common_processors = apply_cli_argspecs(common, commonspecs)
        subs = parser.add_subparsers(dest='cmd')
        for cmdname in cmdnames:
            description, argspecs, _ = commands[cmdname]
            sub = subs.add_parser(
                cmdname,
                description=description,
                parents=[common],
            )
            cmd_processors = _add_cmd_cli(sub, (), argspecs)
            arg_processors[cmdname] = common_processors + cmd_processors
    return arg_processors


def _add_cmd_cli(parser, commonspecs, argspecs):
    """Apply the common and command-specific argspecs to the parser.

    A callable argspec is called with the parser and its result is
    flattened into the returned list of processors.  Any other argspec is
    a sequence of add_argument() positional args, optionally followed by
    a (non-string) kwargs mapping; alternatively it may be an
    "(args, kwargs)" pair.
    """
    processors = []
    argspecs = list(commonspecs or ()) + list(argspecs or ())
    for argspec in argspecs:
        if callable(argspec):
            procs = argspec(parser)
            _add_procs(processors, procs)
        else:
            if not argspec:
                raise NotImplementedError
            args = list(argspec)
            if not isinstance(args[-1], str):
                kwargs = args.pop()
                if not isinstance(args[0], str):
                    # It must be the "(args, kwargs)" form.
                    try:
                        args, = args
                    except (TypeError, ValueError):
                        parser.error(f'invalid cmd args {argspec!r}')
            else:
                kwargs = {}
            parser.add_argument(*args, **kwargs)
            # There will be nothing to process.
    return processors


def _flatten_processors(processors):
    """Yield each callable in the (arbitrarily nested) processors.

    None entries are skipped.
    """
    for proc in processors:
        if proc is None:
            continue
        if callable(proc):
            yield proc
        else:
            yield from _flatten_processors(proc)


def process_args(args, argv, processors, *, keys=None):
    """Run the processors on the args and return the extracted values.

    Each processor is called as "proc(args, argv=argv)" and may return
    None, a single key, or an iterable of keys.  Every returned key is
    popped from the args namespace into the returned dict.

    If "keys" is provided then the extracted keys must match it exactly;
    otherwise NotImplementedError is raised (with the unexpected key, or
    with the sorted list of missing keys).
    """
    ns = vars(args)
    extracted = {}
    remainder = None if keys is None else set(keys)
    for proc in _flatten_processors(processors):
        hanging = proc(args, argv=argv)
        # Many processors return nothing and some return a bare key,
        # so normalize before iterating.
        if isinstance(hanging, str):
            hanging = [hanging]
        for key in hanging or ():
            if remainder is not None and key not in remainder:
                raise NotImplementedError(key)
            extracted[key] = ns.pop(key)
            if remainder is not None:
                remainder.remove(key)
    if remainder:
        raise NotImplementedError(sorted(remainder))
    return extracted


def process_args_by_key(args, argv, processors, keys):
    """Like process_args() but return the values as a list ordered by keys."""
    extracted = process_args(args, argv, processors, keys=keys)
    return [extracted[key] for key in keys]


##################################
# commands

def set_command(name, add_cli):
    """A decorator factory to set CLI info.

    The decorated function gets a "__cli__" attribute set to
    "(name, add_cli)".  Decorating a function twice is an error.
    """
    def decorator(func):
        if hasattr(func, '__cli__'):
            raise Exception(f'already set')
        func.__cli__ = (name, add_cli)
        return func
    return decorator


##################################
# main() helpers

def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield each filename that passes its check.

    For a filename whose check returns a (truthy) reason, that reason is
    logged at debug level and the filename is skipped.
    """
    # We expect each filename to be a normalized, absolute path.
    for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot):
        if (reason := check()):
            logger.debug(f'{filename}: {reason}')
            continue
        yield filename


def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield (filename, relfile) for each filename that passes its check.

    The filenames are first fixed up with fsutil.fix_filenames().  Where
    indicated by the underlying iterator, a header with the relative
    filename is printed.  For a filename whose check returns a (truthy)
    reason, that reason is printed and the filename is skipped.
    """
    filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
    for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot):
        if show:
            print()
            print(relfile)
            print('-------------------------------------------')
        if (reason := check()):
            print(reason)
            continue
        yield filename, relfile


def _iter_filenames(filenames, process, relroot):
    """Yield (filename, relfile, check, show) for each filename.

    If "process" is None then fsutil.process_filenames() provides the
    items.  Otherwise "process(filenames, relroot=relroot)" must produce
    either plain filename strings or ready-made 4-tuples.

    "check" is a callable that returns the reason to skip the file,
    or a false value if it should not be skipped.
    """
    if process is None:
        yield from fsutil.process_filenames(filenames, relroot=relroot)
        return

    onempty = Exception('no filenames provided')
    items = process(filenames, relroot=relroot)
    items, peeked = iterutil.peek_and_iter(items)
    if not items:
        raise onempty
    if isinstance(peeked, str):
        if relroot and relroot is not fsutil.USE_CWD:
            relroot = os.path.abspath(relroot)
        # Callers treat a truthy result as the reason to skip the file,
        # so "no objection" must be a false value.
        check = (lambda: None)
        for filename, ismany in iterutil.iter_many(items, onempty):
            relfile = fsutil.format_filename(filename, relroot, fixroot=False)
            yield filename, relfile, check, ismany
    elif len(peeked) == 4:
        yield from items
    else:
        raise NotImplementedError


def track_progress_compact(items, *, groups=5, **mark_kwargs):
    """Yield each item, first printing a compact progress mark for it.

    The marks come from iter_marks(), which also receives "groups" and
    any extra keyword arguments.  A final newline is printed if the last
    mark did not already end the line.
    """
    last = os.linesep
    marks = iter_marks(groups=groups, **mark_kwargs)
    for item in items:
        last = next(marks)
        print(last, end='', flush=True)
        yield item
    if not last.endswith(os.linesep):
        print()


def track_progress_flat(items, fmt='<{}>'):
    """Yield each item, first printing it (formatted) on its own line."""
    for item in items:
        print(fmt.format(item), flush=True)
        yield item


def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
    """Yield an endless sequence of progress marks.

    Marks are arranged "group" to a group (followed by "sep") and "groups"
    groups to a line (followed by a line separator).  Every "lines" lines
    an extra blank line is yielded; by default that is roughly every 100
    marks, and a false or negative value disables it.
    """
    mark = mark or ''
    group = group if group and group > 1 else 1
    groups = groups if groups and groups > 1 else 1

    sep = f'{mark}{sep}' if sep else mark
    end = f'{mark}{os.linesep}'
    div = os.linesep
    perline = group * groups
    if lines is _NOT_SET:
        # By default we try to put about 100 in each line group.
        perlines = 100 // perline * perline
    elif not lines or lines < 0:
        perlines = None
    else:
        perlines = perline * lines

    if perline == 1:
        yield end
    elif group == 1:
        yield sep

    count = 1
    while True:
        if count % perline == 0:
            yield end
            if perlines and count % perlines == 0:
                yield div
        elif count % group == 0:
            yield sep
        else:
            yield mark
        count += 1