• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import argparse
2import contextlib
3import logging
4import os
5import os.path
6import shutil
7import sys
8
9from . import fsutil, strutil, iterutil, logging as loggingutil
10
11
# A unique sentinel for "no value was passed", used where None is itself
# a meaningful argument (e.g. the "lines" parameter of iter_marks()).
_NOT_SET = object()
13
14
def get_prog(spec=None, *, absolute=False, allowsuffix=True):
    """Return the command-line text that would run the given script.

    "spec" may be None (inspect the running "__main__" script), a
    filename string, or an object with "origin" and "name" attributes
    (e.g. a module spec).

    The result is one of:

    * the bare basename, if the file is executable and is the one
      found on $PATH under that name ("installed")
    * the filename itself, if the file is merely executable
    * "<python> -m <module>", if a spec is known
    * "<python> <filename>" otherwise

    If "absolute" is true then a returned filename is made absolute.
    If "allowsuffix" is false then a ".py" file is never treated
    as "installed".
    """
    if spec is None:
        _, spec = _find_script()
        # This is more natural for prog than __file__ would be.
        filename = sys.argv[0]
    elif isinstance(spec, str):
        filename, spec = os.path.normpath(spec), None
    else:
        filename = spec.origin

    if not _is_standalone(filename):
        if spec is not None:
            module = spec.name
            suffix = '.__main__'
            if module.endswith(suffix):
                module = module[:-len(suffix)]
            return f'{sys.executable} -m {module}'
        if absolute:
            filename = os.path.abspath(filename)
        return f'{sys.executable} {filename}'

    # The file is executable.  Check if it is also "installed".
    if allowsuffix or not filename.endswith('.py'):
        basename = os.path.basename(filename)
        onpath = shutil.which(basename)
        if onpath:
            script = os.path.normcase(os.path.abspath(filename))
            onpath = os.path.normcase(os.path.abspath(onpath))
            if script == onpath:
                return basename

    # It is only "standalone".
    return os.path.abspath(filename) if absolute else filename
48
49
def _find_script():
    """Return (filename, spec) taken from the "__main__" module's globals.

    The search starts two frames up from this function (i.e. at the
    caller of our caller) and walks outward through the call stack
    until it reaches a frame whose globals have __name__ == '__main__'.
    """
    # The hard-coded depth assumes this is called directly by a
    # function (e.g. get_prog()) which was itself called by other code.
    frame = sys._getframe(2)
    while frame.f_globals['__name__'] != '__main__':
        frame = frame.f_back

    # This should match sys.argv[0].
    filename = frame.f_globals['__file__']
    # This will be None if -m wasn't used.
    spec = frame.f_globals['__spec__']
    return filename, spec
60
61
def is_installed(filename, *, allowsuffix=True):
    """Return True if the file is the executable found on $PATH by its name.

    The file counts as "installed" when shutil.which() resolves its
    basename to that same file and the file is executable.  If
    "allowsuffix" is false then a ".py" file is never "installed".
    """
    if not allowsuffix and filename.endswith('.py'):
        return False
    # Note that os.path has normpath(), not normalize().
    filename = os.path.abspath(os.path.normpath(filename))
    found = shutil.which(os.path.basename(filename))
    if not found:
        return False
    # Compare the same way get_prog() does, so a relative $PATH entry
    # or (on case-insensitive platforms) different casing still matches.
    found = os.path.abspath(found)
    if os.path.normcase(found) != os.path.normcase(filename):
        return False
    return _is_standalone(filename)
72
73
def is_standalone(filename):
    """Return True if the (normalized, absolute) file is executable."""
    # Note that os.path has normpath(), not normalize().
    filename = os.path.abspath(os.path.normpath(filename))
    return _is_standalone(filename)
77
78
def _is_standalone(filename):
    """Return the result of fsutil.is_executable() for the file.

    Unlike is_standalone(), the filename is used as given.
    """
    return fsutil.is_executable(filename)
81
82
83##################################
84# logging
85
# The baseline verbosity.  Each -v raises it by one and each -q lowers it
# by one (see add_verbosity_cli()).
VERBOSITY = 3

# Whether or not tracebacks are shown by default.  This is controlled by
# the SHOW_TRACEBACK environment variable: unset, empty, "0", "false",
# and "no" (in any letter case) mean "off"; anything else means "on".
TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))


# The logger for this module.
logger = logging.getLogger(__name__)
93
94
def configure_logger(verbosity, logger=None, **kwargs):
    """Configure the given logger (by default, the root logger).

    The actual work is delegated to loggingutil.configure_logger(),
    which also receives any extra keyword arguments.
    """
    target = logging.getLogger() if logger is None else logger
    loggingutil.configure_logger(target, verbosity, **kwargs)
100
101
102##################################
103# selections
104
class UnsupportedSelectionError(Exception):
    """Some of the selected values are not among the possible ones.

    "values" holds the offending values (as given, possibly with
    duplicates) and "possible" holds the values that were allowed.
    """

    def __init__(self, values, possible):
        self.values = tuple(values)
        self.possible = tuple(possible)
        message = f'unsupported selections {self.unique}'
        super().__init__(message)

    @property
    def unique(self):
        """The offending values, de-duplicated and sorted."""
        deduped = set(self.values)
        return tuple(sorted(deduped))
114
115
def normalize_selection(selected: str, *, possible=None):
    """Return the normalized form of the given selection.

    "selected" may be None, True, or False (returned as-is), a string,
    or an iterable of strings.  Each string may hold several values
    separated by commas and/or whitespace.  Empty strings are skipped.

    Returns:
        * "selected" itself, if it is None, True, or False
        * () if "selected" is some other empty value
        * True if the special value "all" was selected
        * otherwise a frozenset of the individual selected values

    Raises:
        UnsupportedSelectionError: "possible" was provided (non-empty)
            and at least one value (other than "all") is not in it.
    """
    if selected in (None, True, False):
        return selected
    elif isinstance(selected, str):
        selected = [selected]
    elif not selected:
        return ()

    unsupported = []
    _selected = set()
    for item in selected:
        if not item:
            continue
        # str.split() with no separator never produces empty strings.
        for value in item.strip().replace(',', ' ').split():
            # XXX Handle subtraction (leading "-").
            if possible and value not in possible and value != 'all':
                unsupported.append(value)
            _selected.add(value)
    if unsupported:
        raise UnsupportedSelectionError(unsupported, tuple(possible))
    if 'all' in _selected:
        return True
    # Return the parsed values, not the raw (unsplit) input strings.
    return frozenset(_selected)
141
142
143##################################
144# CLI parsing helpers
145
class CLIArgSpec(tuple):
    """The recorded arguments for one parser.add_argument() call.

    An instance is the pair (args, kwargs).  It is also callable
    with a parser, which makes it usable wherever an "add_*_cli()"
    style function is expected.
    """

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, (args, kwargs))

    def __repr__(self):
        args, kwargs = self
        parts = [repr(arg) for arg in args]
        parts.extend(f'{name}={value!r}' for name, value in kwargs.items())
        joined = ", ".join(parts)
        return f'{type(self).__name__}({joined})'

    def __call__(self, parser, *, _noop=(lambda a: None)):
        """Add the argument to the parser and return a no-op processor."""
        self.apply(parser)
        return _noop

    def apply(self, parser):
        """Add the argument to the parser."""
        positional, keywords = self
        parser.add_argument(*positional, **keywords)
164
165
def apply_cli_argspecs(parser, specs):
    """Apply each of the arg specs to the parser.

    Each spec is either a callable, which is called with the parser
    and may return arg processors, or an (args, kwargs) pair that is
    passed through to parser.add_argument().

    Returns the flat list of arg processors returned by the callables.
    """
    processors = []
    for spec in specs:
        if callable(spec):
            procs = spec(parser)
            _add_procs(processors, procs)
        else:
            args, kwargs = spec
            # The pair must be unpacked; add_argument() expects the
            # option strings and settings, not two container objects.
            parser.add_argument(*args, **kwargs)
    return processors
176
177
178def _add_procs(flattened, procs):
179    # XXX Fail on non-empty, non-callable procs?
180    if not procs:
181        return
182    if callable(procs):
183        flattened.append(procs)
184    else:
185        #processors.extend(p for p in procs if callable(p))
186        for proc in procs:
187            _add_procs(flattened, proc)
188
189
def add_verbosity_cli(parser):
    """Add the -q/--quiet and -v/--verbose options to the parser.

    Returns an arg processor that replaces the "verbose" and "quiet"
    counts on the namespace with a single "verbosity" value (never
    below 0), relative to VERBOSITY, and returns that key.
    """
    for flags in (('-q', '--quiet'), ('-v', '--verbose')):
        parser.add_argument(*flags, action='count', default=0)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'verbosity'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        louder = ns.pop('verbose')
        quieter = ns.pop('quiet')
        ns[key] = max(0, VERBOSITY + louder - quieter)
        return key
    return process_args
202
203
def add_traceback_cli(parser):
    """Add the --traceback/--tb and --no-traceback/--no-tb options.

    The default comes from the module-level TRACEBACK setting.

    Returns an arg processor that replaces "traceback" on the namespace
    with "traceback_cm", a context manager for running the command, and
    returns that key.  When tracebacks are not shown, errors raised
    inside the context manager are turned into short messages passed
    to sys.exit().
    """
    parser.add_argument('--traceback', '--tb', action='store_true',
                        default=TRACEBACK)
    # Both options share one dest; this one only ever sets it to False.
    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
                        action='store_const', const=False)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'traceback_cm'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        showtb = ns.pop('traceback')

        @contextlib.contextmanager
        def traceback_cm():
            # hide_emit_errors() returns the callable that undoes it.
            restore = loggingutil.hide_emit_errors()
            # The order of the handlers below matters: the first
            # matching "except" clause wins.
            try:
                yield
            except BrokenPipeError:
                # It was piped to "head" or something similar.
                pass
            except NotImplementedError:
                # Always propagate, regardless of the traceback setting.
                raise  # re-raise
            except Exception as exc:
                if not showtb:
                    sys.exit(f'ERROR: {exc}')
                raise  # re-raise
            except KeyboardInterrupt:
                if not showtb:
                    sys.exit('\nINTERRUPTED')
                raise  # re-raise
            except BaseException as exc:
                # Anything else that is not an Exception subclass.
                if not showtb:
                    sys.exit(f'{type(exc).__name__}: {exc}')
                raise  # re-raise
            finally:
                restore()
        # Note that this stores a context manager instance (created
        # once here), not the factory function.
        ns[key] = traceback_cm()
        return key
    return process_args
244
245
def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
    """Add a CLI argument that takes "sep"-separated values.

    If "opt" starts with "-" then it is added as a repeatable option
    stored under "dest".  Otherwise a positional argument is added,
    stored under "dest" and shown as "opt" in the usage text (unless a
    "metavar" is given); by default it takes one or more values.  Any
    extra keyword arguments are passed through to add_argument().

    Returns an arg processor that splits each provided value on "sep",
    checks every resulting item against "choices" (reporting unknown
    ones via parser.error()), and stores the flat list under "dest".
    """
    if not isinstance(opt, str):
        parser.error(f'opt must be a string, got {opt!r}')
    elif opt.startswith('-'):
        parser.add_argument(opt, dest=dest, action='append', **kwargs)
    else:
        # argparse rejects an explicit dest= for positional arguments
        # ("dest supplied twice"), so the dest is passed as the name
        # and "opt" is only used for display.  The default "store"
        # action is used since with nargs it already produces a list.
        kwargs.setdefault('nargs', '+')
        kwargs.setdefault('metavar', opt)
        parser.add_argument(dest, **kwargs)

    def process_args(args, *, argv=None):
        ns = vars(args)

        # XXX Use normalize_selection()?
        provided = ns[dest]
        if isinstance(provided, str):
            provided = [provided]
        selections = []
        for many in provided or ():
            # Tolerate one level of nesting (e.g. "append" with nargs).
            for text in ([many] if isinstance(many, str) else many):
                for value in text.split(sep):
                    if value not in choices:
                        parser.error(f'unknown {dest} {value!r}')
                    selections.append(value)
        ns[dest] = selections
    return process_args
278
279
def add_files_cli(parser, *, excluded=None, nargs=None):
    """Add the file-filtering options and the "filenames" positional.

    "nargs" defaults to "+" (one or more filenames).  Returns the list
    of arg processors to run after parsing.
    """
    file_filter_processor = add_file_filtering_cli(parser, excluded=excluded)
    count = nargs if nargs else '+'
    parser.add_argument('filenames', nargs=count, metavar='FILENAME')
    return [file_filter_processor]
286
287
def add_file_filtering_cli(parser, *, excluded=None):
    """Add the --start, --include, and --exclude options to the parser.

    "excluded" holds exclusions that always apply, in addition to any
    given on the command line.

    Returns an arg processor that replaces those three options on the
    namespace with "iter_filenames", a function that passes the
    filtering settings on to fsutil.process_filenames().
    """
    parser.add_argument('--start')
    for option in ('--include', '--exclude'):
        parser.add_argument(option, action='append')

    excluded = tuple(excluded or ())

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'iter_filenames'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')

        raw_include = tuple(ns.pop('include') or ())
        raw_exclude = excluded + tuple(ns.pop('exclude') or ())
        # We use the default for "show_header"
        kwargs = {
            'start': ns.pop('start'),
            'include': tuple(_parse_files(raw_include)),
            'exclude': tuple(_parse_files(raw_exclude)),
        }

        def process_filenames(filenames, relroot=None):
            return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
        ns[key] = process_filenames
    return process_args
313
314
def _parse_files(filenames):
    """Yield each filename, stripped, from strutil.parse_entries()."""
    entries = strutil.parse_entries(filenames)
    yield from (name.strip() for name, _ in entries)
318
319
def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
    """Add the --progress and --no-progress options (default: on).

    Returns an arg processor that, when progress tracking is enabled,
    replaces "track_progress" on the namespace with the function to
    use: the compact tracker if the verbosity is at or below
    "threshold" and the flat (one item per line) tracker otherwise.
    Extra keyword arguments are accepted but not used.
    """
    parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
    parser.add_argument('--no-progress', dest='track_progress', action='store_false')
    parser.set_defaults(track_progress=True)

    def process_args(args, *, argv=None):
        if not args.track_progress:
            return
        # "verbosity" is only there if its processor already ran.
        verbosity = vars(args).get('verbosity', VERBOSITY)
        args.track_progress = (
            track_progress_compact
            if verbosity <= threshold
            else track_progress_flat
        )
    return process_args
334
335
def add_failure_filtering_cli(parser, pool, *, default=False):
    """Add the --fail and --no-fail options to the parser.

    "pool" maps each selectable name to an exception type.

    Returns an arg processor that replaces "fail" on the namespace
    with "ignore_exc", a function that takes an exception and returns
    True if it should be ignored.  Exceptions are matched by exact
    type.  "default" is used when neither option was given.
    """
    parser.add_argument('--fail', action='append',
                        metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())

    def process_args(args, *, argv=None):
        ns = vars(args)

        fail = ns.pop('fail')
        try:
            fail = normalize_selection(fail, possible=pool)
        except UnsupportedSelectionError as exc:
            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
            return

        if fail is None:
            fail = default

        if fail is True:
            # Everything fails, so nothing is ignored.
            def ignore_exc(_exc):
                return False
        elif fail is False:
            # Nothing fails, so everything is ignored.
            def ignore_exc(_exc):
                return True
        else:
            def ignore_exc(exc):
                return not any(type(exc) == pool[err] for err in fail)
        args.ignore_exc = ignore_exc
    return process_args
368
369
def add_kind_filtering_cli(parser, *, default=None):
    """Add the (repeatable) --kinds option to the parser.

    Each value may hold several kinds separated by commas and/or
    whitespace.  A leading "-" on a kind excludes it.  "default" is
    used when the option was not given.

    Returns an arg processor that replaces "kinds" on the namespace
    with "match_kind", a predicate for a single kind:

    * with no kinds at all, everything matches
    * with any exclusions, everything but the excluded kinds matches
    * otherwise only the included kinds match
    """
    parser.add_argument('--kinds', action='append')

    def process_args(args, *, argv=None):
        ns = vars(args)

        requested = [
            kind
            for chunk in (ns.pop('kinds') or default or ())
            for kind in chunk.strip().replace(',', ' ').split()
        ]

        if not requested:
            args.match_kind = (lambda k: True)
            return

        # A later mention of a kind overrides an earlier one.
        included = set()
        excluded = set()
        for kind in requested:
            if kind.startswith('-'):
                kind = kind[1:]
                excluded.add(kind)
                included.discard(kind)
            else:
                included.add(kind)
                excluded.discard(kind)

        if excluded:
            if included:
                ...  # XXX fail?

            def match_kind(kind, *, _excluded=excluded):
                return kind not in _excluded
        else:
            def match_kind(kind, *, _included=included):
                return kind in _included
        args.match_kind = match_kind
    return process_args
405
406
# The arg specs that add_commands_cli() applies to every command
# by default (its "commonspecs" parameter).
COMMON_CLI = [
    add_verbosity_cli,
    add_traceback_cli,
    #add_dryrun_cli,
]
412
413
def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
    """Add the CLI for the given commands to the parser.

    "commands" maps each command name to a 3-tuple whose first two
    items are the description and the arg specs.  "subset" restricts
    which commands are added:

    * a string: only that command, added directly to the parser
      (no subcommand), with "cmd" defaulting to its name
    * None: every command, each as a subcommand
    * any other non-empty collection of names: just those commands,
      each as a subcommand

    Returns a dict mapping each added command name to its list of
    arg processors.  ValueError is raised for unknown names.
    """
    arg_processors = {}

    if isinstance(subset, str):
        cmdname = subset
        try:
            _, argspecs, _ = commands[cmdname]
        except KeyError:
            raise ValueError(f'unsupported subset {subset!r}')
        parser.set_defaults(cmd=cmdname)
        arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
        return arg_processors

    if subset is None:
        subset = list(commands)
        cmdnames = subset
    elif not subset:
        raise NotImplementedError
    elif isinstance(subset, set):
        # Sets are unordered, so follow the order of "commands".
        cmdnames = [name for name in commands if name in subset]
        subset = sorted(subset)
    else:
        cmdnames = [name for name in subset if name in commands]
    if len(cmdnames) < len(subset):
        bad = tuple(name for name in subset if name not in commands)
        raise ValueError(f'unsupported subset {bad}')

    # The common args go on a shared parent parser.
    common = argparse.ArgumentParser(add_help=False)
    common_processors = apply_cli_argspecs(common, commonspecs)
    subs = parser.add_subparsers(dest='cmd')
    for cmdname in cmdnames:
        description, argspecs, _ = commands[cmdname]
        sub = subs.add_parser(cmdname, description=description, parents=[common])
        cmd_processors = _add_cmd_cli(sub, (), argspecs)
        arg_processors[cmdname] = common_processors + cmd_processors
    return arg_processors
451
452
453def _add_cmd_cli(parser, commonspecs, argspecs):
454    processors = []
455    argspecs = list(commonspecs or ()) + list(argspecs or ())
456    for argspec in argspecs:
457        if callable(argspec):
458            procs = argspec(parser)
459            _add_procs(processors, procs)
460        else:
461            if not argspec:
462                raise NotImplementedError
463            args = list(argspec)
464            if not isinstance(args[-1], str):
465                kwargs = args.pop()
466                if not isinstance(args[0], str):
467                    try:
468                        args, = args
469                    except (TypeError, ValueError):
470                        parser.error(f'invalid cmd args {argspec!r}')
471            else:
472                kwargs = {}
473            parser.add_argument(*args, **kwargs)
474            # There will be nothing to process.
475    return processors
476
477
478def _flatten_processors(processors):
479    for proc in processors:
480        if proc is None:
481            continue
482        if callable(proc):
483            yield proc
484        else:
485            yield from _flatten_processors(proc)
486
487
def process_args(args, argv, processors, *, keys=None):
    """Run the arg processors and extract the values they produced.

    Each processor is called with the namespace (and argv) and may
    return the key (or an iterable of keys) of values it added to the
    namespace, or None.  Those values are removed from the namespace
    and returned in a dict, by key.

    If "keys" is provided then the processors must produce exactly
    those keys; otherwise NotImplementedError is raised, either with
    the unexpected key or with the sorted list of missing keys.
    """
    processors = _flatten_processors(processors)
    ns = vars(args)
    extracted = {}
    remainder = None if keys is None else set(keys)
    for proc in processors:
        hanging = proc(args, argv=argv)
        # A processor may return a lone key or nothing at all, so this
        # normalization must happen whether or not "keys" was given.
        # (Iterating a str would yield its characters.)
        if isinstance(hanging, str):
            hanging = [hanging]
        for key in hanging or ():
            if remainder is not None and key not in remainder:
                raise NotImplementedError(key)
            extracted[key] = ns.pop(key)
            if remainder is not None:
                remainder.remove(key)
    if remainder:
        raise NotImplementedError(sorted(remainder))
    return extracted
510
511
def process_args_by_key(args, argv, processors, keys):
    """Run the arg processors and return the values for the given keys.

    The values are returned as a list, in the same order as "keys".
    """
    by_key = process_args(args, argv, processors, keys=keys)
    values = []
    for key in keys:
        values.append(by_key[key])
    return values
515
516
517##################################
518# commands
519
def set_command(name, add_cli):
    """A decorator factory to set CLI info.

    The decorated function gets a "__cli__" attribute holding
    (name, add_cli).  Decorating a function that already has the
    attribute is an error.
    """
    def decorator(func):
        if hasattr(func, '__cli__'):
            raise Exception('already set')
        setattr(func, '__cli__', (name, add_cli))
        return func
    return decorator
528
529
530##################################
531# main() helpers
532
def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield each filename that passes its check.

    For each skipped filename, the reason reported by its check
    is logged at the debug level.
    """
    # We expect each filename to be a normalized, absolute path.
    entries = _iter_filenames(filenames, process_filenames, relroot)
    for filename, _, check, _ in entries:
        reason = check()
        if reason:
            logger.debug(f'{filename}: {reason}')
        else:
            yield filename
540
541
def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield (filename, relfile) for each filename that passes its check.

    The filenames are first fixed up with fsutil.fix_filenames().
    When an entry is flagged to be shown, a header with its relative
    filename is printed.  For each skipped filename, the reason
    reported by its check is printed instead of yielding.
    """
    filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
    entries = _iter_filenames(filenames, process_filenames, relroot)
    for filename, relfile, check, show in entries:
        if show:
            print()
            print(relfile)
            print('-------------------------------------------')
        reason = check()
        if reason:
            print(reason)
        else:
            yield filename, relfile
553
554
def _iter_filenames(filenames, process, relroot):
    """Yield (filename, relfile, check, show) for each filename.

    "check" is a callable that returns the reason the file should be
    skipped, or a false value if it should not be skipped.

    If "process" is None then fsutil.process_filenames() provides the
    entries.  Otherwise "process" is called with the filenames and may
    produce either plain filename strings or ready-made 4-tuples.

    Raises Exception('no filenames provided') if "process" produced
    nothing, and NotImplementedError for any other kind of item.
    """
    if process is None:
        yield from fsutil.process_filenames(filenames, relroot=relroot)
        return

    onempty = Exception('no filenames provided')
    items = process(filenames, relroot=relroot)
    # NOTE(review): presumably "items" comes back falsy when there was
    # nothing to peek at -- confirm against iterutil.peek_and_iter().
    items, peeked = iterutil.peek_and_iter(items)
    if not items:
        raise onempty
    if isinstance(peeked, str):
        if relroot and relroot is not fsutil.USE_CWD:
            relroot = os.path.abspath(relroot)
        # Callers treat a truthy result as the reason to skip the file,
        # so "nothing wrong" must be reported as a false value.
        check = (lambda: None)
        for filename, ismany in iterutil.iter_many(items, onempty):
            relfile = fsutil.format_filename(filename, relroot, fixroot=False)
            yield filename, relfile, check, ismany
    elif len(peeked) == 4:
        yield from items
    else:
        raise NotImplementedError
576
577
def track_progress_compact(items, *, groups=5, **mark_kwargs):
    """Yield each item, first printing a compact progress mark for it.

    The marks come from iter_marks().  Once the items run out, a final
    newline is printed unless the last mark already ended the line.
    """
    last = os.linesep
    marks = iter_marks(groups=groups, **mark_kwargs)
    for item, last in zip(items, marks):
        print(last, end='', flush=True)
        yield item
    if not last.endswith(os.linesep):
        print()
587
588
def track_progress_flat(items, fmt='<{}>'):
    """Yield each item, first printing it (formatted) on its own line."""
    for item in items:
        line = fmt.format(item)
        print(line, flush=True)
        yield item
593
594
def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
    """Yield an endless series of progress marks.

    The marks are arranged in lines of "groups" groups, each of
    "group" marks: the last mark of a group is followed by "sep" and
    the last mark of a line by a newline.  After every "lines" lines
    a bare newline is yielded too.  By default that happens about
    every 100 marks; a false or negative "lines" disables it.
    """
    mark = mark or ''
    if not (group and group > 1):
        group = 1
    if not (groups and groups > 1):
        groups = 1

    group_end = f'{mark}{sep}' if sep else mark
    line_end = f'{mark}{os.linesep}'
    divider = os.linesep
    perline = group * groups
    if lines is _NOT_SET:
        # By default we try to put about 100 in each line group.
        perlines = 100 // perline * perline
    elif not lines or lines < 0:
        perlines = None
    else:
        perlines = perline * lines

    # For the smallest sizes one extra mark is yielded up front.
    if perline == 1:
        yield line_end
    elif group == 1:
        yield group_end

    count = 0
    while True:
        count += 1
        if count % perline == 0:
            yield line_end
            if perlines and count % perlines == 0:
                yield divider
        elif count % group == 0:
            yield group_end
        else:
            yield mark
628