• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import argparse
2import contextlib
3import fnmatch
4import logging
5import os
6import os.path
7import shutil
8import sys
9
10from . import fsutil, strutil, iterutil, logging as loggingutil
11
12
# Sentinel default used by iter_marks() to tell "lines was not passed"
# apart from an explicit falsy value such as None or 0.
_NOT_SET = object()
14
15
def get_prog(spec=None, *, absolute=False, allowsuffix=True):
    """Return the text to show as the program name for a script.

    *spec* may be None (look up the running "__main__" script), a filename
    string, or an object with "origin" and "name" attributes (e.g. a
    module spec).

    The result is one of:

    * the bare basename, if the file is standalone and shutil.which()
      resolves that basename to the very same file ("installed");
    * the filename (made absolute if *absolute* is true), if the file is
      standalone but not installed;
    * "<sys.executable> -m <module>" if a spec is available;
    * "<sys.executable> <filename>" otherwise.

    With allowsuffix=False a ".py" file is never reported as installed.
    """
    if spec is None:
        # NOTE: _find_script() inspects the call stack, so it must be
        # called directly from here.
        _, spec = _find_script()
        # This is more natural for prog than __file__ would be.
        filename = sys.argv[0]
    elif isinstance(spec, str):
        filename, spec = os.path.normpath(spec), None
    else:
        filename = spec.origin

    if not _is_standalone(filename):
        if spec is None:
            target = os.path.abspath(filename) if absolute else filename
            return f'{sys.executable} {target}'
        modname = spec.name
        suffix = '.__main__'
        if modname.endswith(suffix):
            modname = modname[:-len(suffix)]
        return f'{sys.executable} -m {modname}'

    # Check if "installed".
    if allowsuffix or not filename.endswith('.py'):
        basename = os.path.basename(filename)
        located = shutil.which(basename)
        if located:
            ours = os.path.normcase(os.path.abspath(filename))
            theirs = os.path.normcase(os.path.abspath(located))
            if ours == theirs:
                return basename

    # It is only "standalone".
    return os.path.abspath(filename) if absolute else filename
49
50
def _find_script():
    """Return (filename, spec) from the globals of the "__main__" module.

    The search starts two frames above this function (i.e. at the caller
    of the function that called us) and follows f_back outward until it
    reaches a frame whose globals have __name__ == '__main__'.
    """
    # The depth is significant: frame 0 is this function, frame 1 is our
    # direct caller, and frame 2 is where the search starts.
    frame = sys._getframe(2)
    while frame.f_globals['__name__'] != '__main__':
        frame = frame.f_back

    # This should match sys.argv[0].
    filename = frame.f_globals['__file__']
    # This will be None if -m wasn't used.
    spec = frame.f_globals['__spec__']
    return filename, spec
61
62
def is_installed(filename, *, allowsuffix=True):
    """Return True if *filename* is a standalone script found via PATH.

    The file counts as installed when shutil.which() resolves its basename
    to the same file and the file is standalone (see _is_standalone()).
    With allowsuffix=False a ".py" file is never considered installed.
    """
    if not allowsuffix and filename.endswith('.py'):
        return False
    # os.path has no normalize(); normpath() is the function that exists.
    filename = os.path.abspath(os.path.normpath(filename))
    found = shutil.which(os.path.basename(filename))
    if not found:
        return False
    # shutil.which() may return a relative or differently-cased path, so
    # compare the same way get_prog() does.
    found = os.path.abspath(found)
    if os.path.normcase(found) != os.path.normcase(filename):
        return False
    return _is_standalone(filename)
73
74
def is_standalone(filename):
    """Return True if *filename* is standalone (see _is_standalone()).

    The path is normalized and made absolute before it is checked.
    """
    # os.path has no normalize(); normpath() is the function that exists.
    filename = os.path.abspath(os.path.normpath(filename))
    return _is_standalone(filename)
78
79
def _is_standalone(filename):
    """Return whatever fsutil.is_executable() reports for *filename*."""
    executable = fsutil.is_executable(filename)
    return executable
82
83
84##################################
85# logging
86
# The baseline verbosity; -v/-q adjust relative to it.
VERBOSITY = 3

# Tracebacks are shown by default only if SHOW_TRACEBACK is set to
# something other than (case-insensitively) "0", "false" or "no".
TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip().upper()
TRACEBACK = TRACEBACK not in ('', '0', 'FALSE', 'NO')


logger = logging.getLogger(__name__)
94
95
def configure_logger(verbosity, logger=None, **kwargs):
    """Configure *logger* (the root logger if None) for *verbosity*.

    The work is delegated to loggingutil.configure_logger(); any extra
    keyword arguments are passed through to it.
    """
    # With no explicit logger we configure the root logger.
    target = logging.getLogger() if logger is None else logger
    loggingutil.configure_logger(target, verbosity, **kwargs)
101
102
103##################################
104# selections
105
class UnsupportedSelectionError(Exception):
    """Raised when requested selections are not among the possible ones.

    "values" holds the offending values as given (possibly with
    duplicates) and "possible" holds the allowed ones, both as tuples.
    """

    def __init__(self, values, possible):
        self.values = tuple(values)
        self.possible = tuple(possible)
        message = f'unsupported selections {self.unique}'
        super().__init__(message)

    @property
    def unique(self):
        """The offending values, de-duplicated and sorted, as a tuple."""
        distinct = set(self.values)
        return tuple(sorted(distinct))
115
116
def normalize_selection(selected, *, possible=None):
    """Normalize a user-provided selection into a canonical form.

    *selected* may be None/True/False (returned unchanged), a single
    string, or an iterable of strings.  Each string may hold several
    values separated by commas and/or whitespace.

    Returns:
        * *selected* itself if it is None, True or False;
        * () if it is some other empty value;
        * True if any of the values is "all";
        * otherwise a frozenset of the individual values.

    Raises:
        UnsupportedSelectionError: if *possible* is given (and non-empty)
            and some value other than "all" is not in it.
    """
    if selected in (None, True, False):
        return selected
    elif isinstance(selected, str):
        selected = [selected]
    elif not selected:
        return ()

    unsupported = []
    _selected = set()
    for item in selected:
        if not item:
            continue
        # split() with no argument never yields empty strings.
        for value in item.strip().replace(',', ' ').split():
            # XXX Handle subtraction (leading "-").
            if possible and value not in possible and value != 'all':
                unsupported.append(value)
            _selected.add(value)
    if unsupported:
        raise UnsupportedSelectionError(unsupported, tuple(possible))
    if 'all' in _selected:
        return True
    # Return the parsed values, not the raw (unsplit) input items.
    return frozenset(_selected)
142
143
144##################################
145# CLI parsing helpers
146
class CLIArgSpec(tuple):
    """An (args, kwargs) pair describing one parser.add_argument() call.

    Instances are callable with a parser, so they can be used wherever an
    "add CLI" function is expected.
    """

    def __new__(cls, *args, **kwargs):
        pair = (args, kwargs)
        return super().__new__(cls, pair)

    def __repr__(self):
        positional, keywords = self
        parts = [repr(arg) for arg in positional]
        parts.extend(f'{name}={value!r}' for name, value in keywords.items())
        return f'{type(self).__name__}({", ".join(parts)})'

    def __call__(self, parser, *, _noop=(lambda a: None)):
        """Add the argument to *parser* and return a do-nothing processor."""
        self.apply(parser)
        return _noop

    def apply(self, parser):
        """Call parser.add_argument() with the stored args and kwargs."""
        positional, keywords = self
        parser.add_argument(*positional, **keywords)
165
166
def apply_cli_argspecs(parser, specs):
    """Apply each of *specs* to *parser* and return the arg processors.

    A callable spec is called with the parser; whatever processors it
    returns (possibly nested) are flattened into the result.  Any other
    spec must be an (args, kwargs) pair for parser.add_argument().
    """
    processors = []
    for spec in specs:
        if callable(spec):
            procs = spec(parser)
            _add_procs(processors, procs)
        else:
            args, kwargs = spec
            # The pair must be unpacked; passing the tuple and dict
            # positionally is not a valid add_argument() call.
            parser.add_argument(*args, **kwargs)
    return processors
177
178
def _add_procs(flattened, procs):
    """Append the callables found in *procs* to the list *flattened*.

    *procs* may be a callable, an arbitrarily nested iterable of
    callables, or anything falsy (which is ignored).
    """
    # XXX Fail on non-empty, non-callable procs?
    if not procs:
        return
    if not callable(procs):
        for nested in procs:
            _add_procs(flattened, nested)
        return
    flattened.append(procs)
189
190
def add_verbosity_cli(parser):
    """Add -q/--quiet and -v/--verbose (both counted) to *parser*.

    The returned processor replaces "verbose" and "quiet" in the parsed
    namespace with a single "verbosity" value (never below 0) and returns
    that key.
    """
    for flags in (('-q', '--quiet'), ('-v', '--verbose')):
        parser.add_argument(*flags, action='count', default=0)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'verbosity'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        louder = ns.pop('verbose')
        quieter = ns.pop('quiet')
        ns[key] = max(0, VERBOSITY + louder - quieter)
        return key
    return process_args
203
204
def add_traceback_cli(parser):
    """Add --traceback/--tb and --no-traceback/--no-tb to *parser*.

    Both options store into "traceback"; its default is TRACEBACK.

    The returned processor removes "traceback" from the parsed namespace,
    stores a context manager under "traceback_cm" and returns that key.
    Inside that context manager:

    * BrokenPipeError is swallowed;
    * NotImplementedError always propagates;
    * any other exception propagates if tracebacks were requested, and
      otherwise is turned into sys.exit() with a short message.
    """
    parser.add_argument('--traceback', '--tb', action='store_true',
                        default=TRACEBACK)
    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
                        action='store_const', const=False)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'traceback_cm'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        showtb = ns.pop('traceback')

        @contextlib.contextmanager
        def traceback_cm():
            # hide_emit_errors() returns a callable that is invoked on
            # exit (presumably to undo whatever it changed -- see
            # loggingutil to confirm).
            restore = loggingutil.hide_emit_errors()
            try:
                yield
            except BrokenPipeError:
                # It was piped to "head" or something similar.
                pass
            except NotImplementedError:
                # This handler must come before "except Exception" so
                # that the traceback is never suppressed for it.
                raise  # re-raise
            except Exception as exc:
                if not showtb:
                    sys.exit(f'ERROR: {exc}')
                raise  # re-raise
            except KeyboardInterrupt:
                if not showtb:
                    sys.exit('\nINTERRUPTED')
                raise  # re-raise
            except BaseException as exc:
                if not showtb:
                    sys.exit(f'{type(exc).__name__}: {exc}')
                raise  # re-raise
            finally:
                restore()
        # Note that this stores an already-created context manager
        # object, not the factory function.
        ns[key] = traceback_cm()
        return key
    return process_args
245
246
def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
    """Add an argument whose values are *sep*-separated lists of choices.

    If *opt* starts with "-" it is added as an option (which may be given
    more than once).  Otherwise a positional argument is added, shown as
    *opt* in the help text and accepting one or more values by default.
    Either way the raw values are stored under *dest*.

    The returned processor replaces the raw values under *dest* with a
    flat list of the individual values, calling parser.error() for any
    value that is not in *choices*.
    """
    if not isinstance(opt, str):
        parser.error(f'opt must be a string, got {opt!r}')
    elif opt.startswith('-'):
        parser.add_argument(opt, dest=dest, action='append', **kwargs)
    else:
        kwargs.setdefault('nargs', '+')
        # argparse rejects an explicit dest= for positionals, so the
        # positional is named after dest and opt is only used for display.
        kwargs.setdefault('metavar', opt)
        parser.add_argument(dest, action='append', **kwargs)

    def process_args(args, *, argv=None):
        ns = vars(args)

        # XXX Use normalize_selection()?
        raw = ns[dest]
        if isinstance(raw, str):
            raw = [raw]
        selections = []
        for many in raw or ():
            # With nargs set, "append" collects a list per occurrence.
            group = [many] if isinstance(many, str) else many
            for entry in group:
                for value in entry.split(sep):
                    if value not in choices:
                        parser.error(f'unknown {dest} {value!r}')
                    selections.append(value)
        ns[dest] = selections
    return process_args
279
280
def add_files_cli(parser, *, excluded=None, nargs=None):
    """Add the file-filtering options plus a "filenames" positional.

    *nargs* applies to the positional and defaults to '+'.  Returns a list
    holding the processor from add_file_filtering_cli().
    """
    processors = [add_file_filtering_cli(parser, excluded=excluded)]
    count = nargs if nargs else '+'
    parser.add_argument('filenames', nargs=count, metavar='FILENAME')
    return processors
287
288
def add_file_filtering_cli(parser, *, excluded=None):
    """Add --start, --include and --exclude to *parser*.

    The returned processor removes those three values from the parsed
    namespace and stores, under "iter_filenames", a function
    process_filenames(filenames, relroot=None) that forwards to
    fsutil.process_filenames() with the collected filters.  The entries
    in *excluded* are always added ahead of any --exclude values.
    """
    parser.add_argument('--start')
    for flag in ('--include', '--exclude'):
        parser.add_argument(flag, action='append')

    excluded = tuple(excluded) if excluded else ()

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'iter_filenames'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')

        included = tuple(ns.pop('include') or ())
        skipped = excluded + tuple(ns.pop('exclude') or ())
        # We use the default for "show_header"
        filters = {
            'start': ns.pop('start'),
            'include': tuple(_parse_files(included)),
            'exclude': tuple(_parse_files(skipped)),
        }

        def process_filenames(filenames, relroot=None):
            return fsutil.process_filenames(filenames, relroot=relroot, **filters)
        ns[key] = process_filenames
    return process_args
314
315
def _parse_files(filenames):
    """Yield the stripped filename of each entry parsed from *filenames*.

    strutil.parse_entries() yields pairs; only the first item of each
    pair is used.
    """
    entries = strutil.parse_entries(filenames)
    yield from (filename.strip() for filename, _ in entries)
319
320
def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
    """Add --progress and --no-progress (progress is on by default).

    The returned processor leaves a falsy "track_progress" alone.
    Otherwise it replaces it with a tracker function:
    track_progress_compact if the namespace's "verbosity" (VERBOSITY if
    missing) is at most *threshold*, else track_progress_flat.
    """
    parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
    parser.add_argument('--no-progress', dest='track_progress', action='store_false')
    parser.set_defaults(track_progress=True)

    def process_args(args, *, argv=None):
        if not args.track_progress:
            return
        verbosity = vars(args).get('verbosity', VERBOSITY)
        compact = verbosity <= threshold
        args.track_progress = (track_progress_compact if compact
                               else track_progress_flat)
    return process_args
335
336
def add_failure_filtering_cli(parser, pool, *, default=False):
    """Add --fail and --no-fail to *parser*.

    *pool* maps each selectable name to an exception type.

    The returned processor removes "fail" from the parsed namespace and
    sets args.ignore_exc to a function that takes an exception and
    returns True if it should be ignored:

    * selection True ("all" was given): nothing is ignored;
    * selection False: everything is ignored;
    * otherwise: an exception is ignored unless its exact type is the
      pool entry of one of the selected names.

    *default* is used as the selection when --fail was not given.
    """
    names = "|".join(sorted(pool))
    parser.add_argument('--fail', action='append',
                        metavar=f'"{{all|{names}}},..."')
    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())

    def process_args(args, *, argv=None):
        ns = vars(args)

        fail = ns.pop('fail')
        try:
            fail = normalize_selection(fail, possible=pool)
        except UnsupportedSelectionError as exc:
            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
            return

        if fail is None:
            fail = default

        if fail is True:
            def ignore_exc(_exc):
                return False
        elif fail is False:
            def ignore_exc(_exc):
                return True
        else:
            def ignore_exc(exc):
                return not any(type(exc) == pool[err] for err in fail)
        args.ignore_exc = ignore_exc
    return process_args
369
370
def add_kind_filtering_cli(parser, *, default=None):
    """Add --kinds (repeatable) to *parser*.

    Each value may hold several kinds separated by commas and/or
    whitespace; a leading "-" marks a kind as excluded.  A later mention
    of a kind overrides an earlier one.

    The returned processor removes "kinds" from the parsed namespace and
    sets args.match_kind to a predicate over kinds:

    * no kinds at all: everything matches;
    * any exclusions: everything except the excluded kinds matches;
    * otherwise: only the included kinds match.

    *default* is used when --kinds was not given.
    """
    parser.add_argument('--kinds', action='append')

    def process_args(args, *, argv=None):
        ns = vars(args)

        requested = ns.pop('kinds') or default or ()
        kinds = [
            name
            for chunk in requested
            for name in chunk.strip().replace(',', ' ').split()
        ]

        if not kinds:
            args.match_kind = (lambda k: True)
            return

        included, excluded = set(), set()
        for kind in kinds:
            if kind.startswith('-'):
                kind = kind[1:]
                excluded.add(kind)
                included.discard(kind)
            else:
                included.add(kind)
                excluded.discard(kind)

        if excluded:
            # XXX Fail if there are also included kinds?
            def match_kind(kind, *, _excluded=excluded):
                return kind not in _excluded
        else:
            def match_kind(kind, *, _included=included):
                return kind in _included
        args.match_kind = match_kind
    return process_args
406
407
# The "add CLI" functions that add_commands_cli() applies to commands
# when no other "commonspecs" is passed.
COMMON_CLI = [
    add_verbosity_cli,
    add_traceback_cli,
    #add_dryrun_cli,
]
413
414
def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
    """Add the CLI for *commands* to *parser*.

    *commands* maps each command name to a 3-tuple whose first two items
    are the command's description and its argspecs.

    *subset* selects which commands to support:

    * a string: only that command; its arguments are added directly to
      *parser* (no subcommands) and "cmd" defaults to the name;
    * None: every command, each as a subcommand;
    * a set or other iterable of names: just those, as subcommands.

    Returns a dict mapping each command name to its list of arg
    processors.

    Raises:
        ValueError: if *subset* names a command not in *commands*.
        NotImplementedError: if *subset* is empty (but not None).
    """
    arg_processors = {}

    if isinstance(subset, str):
        # A single command: no subparsers are needed.
        cmdname = subset
        try:
            _, argspecs, _ = commands[cmdname]
        except KeyError:
            raise ValueError(f'unsupported subset {subset!r}')
        parser.set_defaults(cmd=cmdname)
        arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
        return arg_processors

    if subset is None:
        cmdnames = subset = list(commands)
    elif not subset:
        raise NotImplementedError
    elif isinstance(subset, set):
        # A set has no order, so follow the order of "commands".
        cmdnames = [name for name in commands if name in subset]
        subset = sorted(subset)
    else:
        cmdnames = [name for name in subset if name in commands]
    if len(cmdnames) < len(subset):
        bad = tuple(name for name in subset if name not in commands)
        raise ValueError(f'unsupported subset {bad}')

    # The common args live on a shared parent parser.
    common = argparse.ArgumentParser(add_help=False)
    common_processors = apply_cli_argspecs(common, commonspecs)
    subs = parser.add_subparsers(dest='cmd')
    for cmdname in cmdnames:
        description, argspecs, _ = commands[cmdname]
        sub = subs.add_parser(cmdname, description=description, parents=[common])
        own_processors = _add_cmd_cli(sub, (), argspecs)
        arg_processors[cmdname] = common_processors + own_processors
    return arg_processors
452
453
def _add_cmd_cli(parser, commonspecs, argspecs):
    """Apply *commonspecs* and then *argspecs* to *parser*.

    A callable spec is called with the parser and its processors are
    collected.  Any other spec is a sequence for parser.add_argument(),
    in one of these forms:

    * (name, ...)            -- names only;
    * (name, ..., kwargs)    -- names followed by a kwargs mapping;
    * ((name, ...), kwargs)  -- the names wrapped in a single sequence.

    Returns the flat list of collected processors.
    """
    processors = []
    for argspec in [*(commonspecs or ()), *(argspecs or ())]:
        if callable(argspec):
            _add_procs(processors, argspec(parser))
            continue

        if not argspec:
            raise NotImplementedError
        args = list(argspec)
        kwargs = {}
        if not isinstance(args[-1], str):
            kwargs = args.pop()
            if not isinstance(args[0], str):
                # The names were given as a single nested sequence.
                try:
                    (args,) = args
                except (TypeError, ValueError):
                    parser.error(f'invalid cmd args {argspec!r}')
        parser.add_argument(*args, **kwargs)
        # There will be nothing to process.
    return processors
477
478
def _flatten_processors(processors):
    """Yield each callable in the (possibly nested) *processors*.

    None entries are skipped; any other non-callable entry is treated as
    a nested iterable of processors.
    """
    for proc in processors:
        if proc is None:
            continue
        if callable(proc):
            yield proc
        else:
            yield from _flatten_processors(proc)


def process_args(args, argv, processors, *, keys=None):
    """Run each arg processor and collect the values they produced.

    Each processor is called as proc(args, argv=argv).  It may return
    None, a single key (a string) or an iterable of keys; every returned
    key is popped from the namespace *args* into the result.

    If *keys* is given then the returned keys must match it exactly.

    Returns:
        A dict mapping each returned key to its popped value.

    Raises:
        NotImplementedError: if *keys* is given and a processor returns a
            key not in it, or one of *keys* was never returned.
    """
    ns = vars(args)
    extracted = {}
    remainder = None if keys is None else set(keys)
    for proc in _flatten_processors(processors):
        hanging = proc(args, argv=argv)
        # A lone string is a single key, not an iterable of 1-char keys.
        if isinstance(hanging, str):
            hanging = [hanging]
        # Processors that extract nothing return None.
        for key in hanging or ():
            if remainder is not None and key not in remainder:
                raise NotImplementedError(key)
            extracted[key] = ns.pop(key)
            if remainder is not None:
                remainder.remove(key)
    if remainder:
        raise NotImplementedError(sorted(remainder))
    return extracted
511
512
def process_args_by_key(args, argv, processors, keys):
    """Run process_args() and return the values in the order of *keys*."""
    by_key = process_args(args, argv, processors, keys=keys)
    return list(map(by_key.__getitem__, keys))
516
517
518##################################
519# commands
520
def set_command(name, add_cli):
    """A decorator factory to set CLI info.

    The decorated function gets a "__cli__" attribute holding
    (name, add_cli) and is otherwise returned unchanged.  Decorating a
    function that already has "__cli__" raises Exception.
    """
    def decorator(func):
        already_set = hasattr(func, '__cli__')
        if already_set:
            raise Exception('already set')
        func.__cli__ = (name, add_cli)
        return func
    return decorator
529
530
531##################################
532# main() helpers
533
def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield each filename whose check() gives no reason to skip it.

    Skipped files are logged at debug level along with the reason.
    """
    # We expect each filename to be a normalized, absolute path.
    entries = _iter_filenames(filenames, process_filenames, relroot)
    for filename, _, check, _ in entries:
        reason = check()
        if not reason:
            yield filename
        else:
            logger.debug(f'{filename}: {reason}')
541
542
def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield (filename, relfile) for each file that should be handled.

    The filenames are first passed through fsutil.fix_filenames().  For
    entries flagged to be shown, a header (blank line, relative name,
    divider) is printed.  If an entry's check() returns a reason, the
    reason is printed and the entry is skipped.
    """
    filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
    entries = _iter_filenames(filenames, process_filenames, relroot)
    for filename, relfile, check, show in entries:
        if show:
            print()
            print(relfile)
            print('-------------------------------------------')
        reason = check()
        if not reason:
            yield filename, relfile
        else:
            print(reason)
554
555
def _iter_filenames(filenames, process, relroot):
    """Yield (filename, relfile, check, show) for each of *filenames*.

    With no *process* callable the items come straight from
    fsutil.process_filenames().  Otherwise process(filenames,
    relroot=relroot) is called and may produce either plain filename
    strings or ready-made 4-tuples (which are passed through untouched).

    "check" is a no-arg callable; the callers in this module skip the
    file when it returns a truthy reason.

    Raises:
        Exception: if *process* produced no items.
        NotImplementedError: if the items are neither strings nor
            4-tuples.
    """
    if process is None:
        yield from fsutil.process_filenames(filenames, relroot=relroot)
        return

    onempty = Exception('no filenames provided')
    items = process(filenames, relroot=relroot)
    items, peeked = iterutil.peek_and_iter(items)
    if not items:
        raise onempty
    if isinstance(peeked, str):
        if relroot and relroot is not fsutil.USE_CWD:
            relroot = os.path.abspath(relroot)
        # Plain filenames carry no reason to be skipped.  A truthy result
        # here would make filter_filenames() and main_for_filenames()
        # skip every file.
        check = (lambda: None)
        for filename, ismany in iterutil.iter_many(items, onempty):
            relfile = fsutil.format_filename(filename, relroot, fixroot=False)
            yield filename, relfile, check, ismany
    elif len(peeked) == 4:
        yield from items
    else:
        raise NotImplementedError
577
578
def track_progress_compact(items, *, groups=5, **mark_kwargs):
    """Yield each of *items*, printing one progress mark per item.

    The marks come from iter_marks(groups=groups, **mark_kwargs) and are
    printed without a trailing newline.  Once the items run out, a final
    newline is printed unless the last mark already ended with one.
    """
    newline = os.linesep
    last = newline
    for item, last in zip(items, iter_marks(groups=groups, **mark_kwargs)):
        print(last, end='', flush=True)
        yield item
    if not last.endswith(newline):
        print()
588
589
def track_progress_flat(items, fmt='<{}>'):
    """Yield each of *items*, first printing it (via *fmt*) on its own line."""
    render = fmt.format
    for item in items:
        print(render(item), flush=True)
        yield item
594
595
def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
    """Yield an endless stream of progress marks.

    Marks are arranged in groups of *group*, with *groups* groups per
    line: the mark that completes a group has *sep* appended and the one
    that completes a line has os.linesep appended.  After every *lines*
    lines an extra bare os.linesep is yielded; by default that happens
    after roughly every 100 marks, and a falsy or negative *lines*
    disables it.

    *group* and *groups* are clamped to a minimum of 1.
    """
    mark = mark or ''
    if not (group and group > 1):
        group = 1
    if not (groups and groups > 1):
        groups = 1

    newline = os.linesep
    sep = f'{mark}{sep}' if sep else mark
    end = f'{mark}{newline}'
    div = newline
    perline = group * groups
    if lines is _NOT_SET:
        # By default we try to put about 100 in each line group.
        perlines = (100 // perline) * perline
    else:
        perlines = None if (not lines or lines < 0) else perline * lines

    if perline == 1:
        yield end
    elif group == 1:
        yield sep

    count = 1
    while True:
        if count % perline:
            yield sep if count % group == 0 else mark
        else:
            yield end
            if perlines and count % perlines == 0:
                yield div
        count += 1
629