1 import argparse
2 import contextlib
3 import fnmatch
4 import logging
5 import os
6 import os.path
7 import shutil
8 import sys
9
10 from . import fsutil, strutil, iterutil, logging as loggingutil
11
12
13 _NOT_SET = object()
14
15
16 def get_prog(spec=None, *, absolute=False, allowsuffix=True):
17 if spec is None:
18 _, spec = _find_script()
19 # This is more natural for prog than __file__ would be.
20 filename = sys.argv[0]
21 elif isinstance(spec, str):
22 filename = os.path.normpath(spec)
23 spec = None
24 else:
25 filename = spec.origin
26 if _is_standalone(filename):
27 # Check if "installed".
28 if allowsuffix or not filename.endswith('.py'):
29 basename = os.path.basename(filename)
30 found = shutil.which(basename)
31 if found:
32 script = os.path.abspath(filename)
33 found = os.path.abspath(found)
34 if os.path.normcase(script) == os.path.normcase(found):
35 return basename
36 # It is only "standalone".
37 if absolute:
38 filename = os.path.abspath(filename)
39 return filename
40 elif spec is not None:
41 module = spec.name
42 if module.endswith('.__main__'):
43 module = module[:-9]
44 return f'{sys.executable} -m {module}'
45 else:
46 if absolute:
47 filename = os.path.abspath(filename)
48 return f'{sys.executable} {filename}'
49
50
51 def _find_script():
52 frame = sys._getframe(2)
53 while frame.f_globals['__name__'] != '__main__':
54 frame = frame.f_back
55
56 # This should match sys.argv[0].
57 filename = frame.f_globals['__file__']
58 # This will be None if -m wasn't used..
59 spec = frame.f_globals['__spec__']
60 return filename, spec
61
62
63 def is_installed(filename, *, allowsuffix=True):
64 if not allowsuffix and filename.endswith('.py'):
65 return False
66 filename = os.path.abspath(os.path.normalize(filename))
67 found = shutil.which(os.path.basename(filename))
68 if not found:
69 return False
70 if found != filename:
71 return False
72 return _is_standalone(filename)
73
74
75 def is_standalone(filename):
76 filename = os.path.abspath(os.path.normalize(filename))
77 return _is_standalone(filename)
78
79
80 def _is_standalone(filename):
81 return fsutil.is_executable(filename)
82
83
84 ##################################
85 # logging
86
87 VERBOSITY = 3
88
89 TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
90 TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))
91
92
93 logger = logging.getLogger(__name__)
94
95
96 def configure_logger(verbosity, logger=None, **kwargs):
97 if logger is None:
98 # Configure the root logger.
99 logger = logging.getLogger()
100 loggingutil.configure_logger(logger, verbosity, **kwargs)
101
102
103 ##################################
104 # selections
105
106 class ESC[4;38;5;81mUnsupportedSelectionError(ESC[4;38;5;149mException):
107 def __init__(self, values, possible):
108 self.values = tuple(values)
109 self.possible = tuple(possible)
110 super().__init__(f'unsupported selections {self.unique}')
111
112 @property
113 def unique(self):
114 return tuple(sorted(set(self.values)))
115
116
117 def normalize_selection(selected: str, *, possible=None):
118 if selected in (None, True, False):
119 return selected
120 elif isinstance(selected, str):
121 selected = [selected]
122 elif not selected:
123 return ()
124
125 unsupported = []
126 _selected = set()
127 for item in selected:
128 if not item:
129 continue
130 for value in item.strip().replace(',', ' ').split():
131 if not value:
132 continue
133 # XXX Handle subtraction (leading "-").
134 if possible and value not in possible and value != 'all':
135 unsupported.append(value)
136 _selected.add(value)
137 if unsupported:
138 raise UnsupportedSelectionError(unsupported, tuple(possible))
139 if 'all' in _selected:
140 return True
141 return frozenset(selected)
142
143
144 ##################################
145 # CLI parsing helpers
146
147 class ESC[4;38;5;81mCLIArgSpec(ESC[4;38;5;149mtuple):
148 def __new__(cls, *args, **kwargs):
149 return super().__new__(cls, (args, kwargs))
150
151 def __repr__(self):
152 args, kwargs = self
153 args = [repr(arg) for arg in args]
154 for name, value in kwargs.items():
155 args.append(f'{name}={value!r}')
156 return f'{type(self).__name__}({", ".join(args)})'
157
158 def __call__(self, parser, *, _noop=(lambda a: None)):
159 self.apply(parser)
160 return _noop
161
162 def apply(self, parser):
163 args, kwargs = self
164 parser.add_argument(*args, **kwargs)
165
166
167 def apply_cli_argspecs(parser, specs):
168 processors = []
169 for spec in specs:
170 if callable(spec):
171 procs = spec(parser)
172 _add_procs(processors, procs)
173 else:
174 args, kwargs = spec
175 parser.add_argument(args, kwargs)
176 return processors
177
178
179 def _add_procs(flattened, procs):
180 # XXX Fail on non-empty, non-callable procs?
181 if not procs:
182 return
183 if callable(procs):
184 flattened.append(procs)
185 else:
186 #processors.extend(p for p in procs if callable(p))
187 for proc in procs:
188 _add_procs(flattened, proc)
189
190
191 def add_verbosity_cli(parser):
192 parser.add_argument('-q', '--quiet', action='count', default=0)
193 parser.add_argument('-v', '--verbose', action='count', default=0)
194
195 def process_args(args, *, argv=None):
196 ns = vars(args)
197 key = 'verbosity'
198 if key in ns:
199 parser.error(f'duplicate arg {key!r}')
200 ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
201 return key
202 return process_args
203
204
205 def add_traceback_cli(parser):
206 parser.add_argument('--traceback', '--tb', action='store_true',
207 default=TRACEBACK)
208 parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
209 action='store_const', const=False)
210
211 def process_args(args, *, argv=None):
212 ns = vars(args)
213 key = 'traceback_cm'
214 if key in ns:
215 parser.error(f'duplicate arg {key!r}')
216 showtb = ns.pop('traceback')
217
218 @contextlib.contextmanager
219 def traceback_cm():
220 restore = loggingutil.hide_emit_errors()
221 try:
222 yield
223 except BrokenPipeError:
224 # It was piped to "head" or something similar.
225 pass
226 except NotImplementedError:
227 raise # re-raise
228 except Exception as exc:
229 if not showtb:
230 sys.exit(f'ERROR: {exc}')
231 raise # re-raise
232 except KeyboardInterrupt:
233 if not showtb:
234 sys.exit('\nINTERRUPTED')
235 raise # re-raise
236 except BaseException as exc:
237 if not showtb:
238 sys.exit(f'{type(exc).__name__}: {exc}')
239 raise # re-raise
240 finally:
241 restore()
242 ns[key] = traceback_cm()
243 return key
244 return process_args
245
246
247 def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
248 # if opt is True:
249 # parser.add_argument(f'--{dest}', action='append', **kwargs)
250 # elif isinstance(opt, str) and opt.startswith('-'):
251 # parser.add_argument(opt, dest=dest, action='append', **kwargs)
252 # else:
253 # arg = dest if not opt else opt
254 # kwargs.setdefault('nargs', '+')
255 # parser.add_argument(arg, dest=dest, action='append', **kwargs)
256 if not isinstance(opt, str):
257 parser.error(f'opt must be a string, got {opt!r}')
258 elif opt.startswith('-'):
259 parser.add_argument(opt, dest=dest, action='append', **kwargs)
260 else:
261 kwargs.setdefault('nargs', '+')
262 #kwargs.setdefault('metavar', opt.upper())
263 parser.add_argument(opt, dest=dest, action='append', **kwargs)
264
265 def process_args(args, *, argv=None):
266 ns = vars(args)
267
268 # XXX Use normalize_selection()?
269 if isinstance(ns[dest], str):
270 ns[dest] = [ns[dest]]
271 selections = []
272 for many in ns[dest] or ():
273 for value in many.split(sep):
274 if value not in choices:
275 parser.error(f'unknown {dest} {value!r}')
276 selections.append(value)
277 ns[dest] = selections
278 return process_args
279
280
281 def add_files_cli(parser, *, excluded=None, nargs=None):
282 process_files = add_file_filtering_cli(parser, excluded=excluded)
283 parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
284 return [
285 process_files,
286 ]
287
288
289 def add_file_filtering_cli(parser, *, excluded=None):
290 parser.add_argument('--start')
291 parser.add_argument('--include', action='append')
292 parser.add_argument('--exclude', action='append')
293
294 excluded = tuple(excluded or ())
295
296 def process_args(args, *, argv=None):
297 ns = vars(args)
298 key = 'iter_filenames'
299 if key in ns:
300 parser.error(f'duplicate arg {key!r}')
301
302 _include = tuple(ns.pop('include') or ())
303 _exclude = excluded + tuple(ns.pop('exclude') or ())
304 kwargs = dict(
305 start=ns.pop('start'),
306 include=tuple(_parse_files(_include)),
307 exclude=tuple(_parse_files(_exclude)),
308 # We use the default for "show_header"
309 )
310 def process_filenames(filenames, relroot=None):
311 return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
312 ns[key] = process_filenames
313 return process_args
314
315
316 def _parse_files(filenames):
317 for filename, _ in strutil.parse_entries(filenames):
318 yield filename.strip()
319
320
321 def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
322 parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
323 parser.add_argument('--no-progress', dest='track_progress', action='store_false')
324 parser.set_defaults(track_progress=True)
325
326 def process_args(args, *, argv=None):
327 if args.track_progress:
328 ns = vars(args)
329 verbosity = ns.get('verbosity', VERBOSITY)
330 if verbosity <= threshold:
331 args.track_progress = track_progress_compact
332 else:
333 args.track_progress = track_progress_flat
334 return process_args
335
336
337 def add_failure_filtering_cli(parser, pool, *, default=False):
338 parser.add_argument('--fail', action='append',
339 metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
340 parser.add_argument('--no-fail', dest='fail', action='store_const', const=())
341
342 def process_args(args, *, argv=None):
343 ns = vars(args)
344
345 fail = ns.pop('fail')
346 try:
347 fail = normalize_selection(fail, possible=pool)
348 except UnsupportedSelectionError as exc:
349 parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
350 else:
351 if fail is None:
352 fail = default
353
354 if fail is True:
355 def ignore_exc(_exc):
356 return False
357 elif fail is False:
358 def ignore_exc(_exc):
359 return True
360 else:
361 def ignore_exc(exc):
362 for err in fail:
363 if type(exc) == pool[err]:
364 return False
365 else:
366 return True
367 args.ignore_exc = ignore_exc
368 return process_args
369
370
371 def add_kind_filtering_cli(parser, *, default=None):
372 parser.add_argument('--kinds', action='append')
373
374 def process_args(args, *, argv=None):
375 ns = vars(args)
376
377 kinds = []
378 for kind in ns.pop('kinds') or default or ():
379 kinds.extend(kind.strip().replace(',', ' ').split())
380
381 if not kinds:
382 match_kind = (lambda k: True)
383 else:
384 included = set()
385 excluded = set()
386 for kind in kinds:
387 if kind.startswith('-'):
388 kind = kind[1:]
389 excluded.add(kind)
390 if kind in included:
391 included.remove(kind)
392 else:
393 included.add(kind)
394 if kind in excluded:
395 excluded.remove(kind)
396 if excluded:
397 if included:
398 ... # XXX fail?
399 def match_kind(kind, *, _excluded=excluded):
400 return kind not in _excluded
401 else:
402 def match_kind(kind, *, _included=included):
403 return kind in _included
404 args.match_kind = match_kind
405 return process_args
406
407
408 COMMON_CLI = [
409 add_verbosity_cli,
410 add_traceback_cli,
411 #add_dryrun_cli,
412 ]
413
414
415 def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
416 arg_processors = {}
417 if isinstance(subset, str):
418 cmdname = subset
419 try:
420 _, argspecs, _ = commands[cmdname]
421 except KeyError:
422 raise ValueError(f'unsupported subset {subset!r}')
423 parser.set_defaults(cmd=cmdname)
424 arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
425 else:
426 if subset is None:
427 cmdnames = subset = list(commands)
428 elif not subset:
429 raise NotImplementedError
430 elif isinstance(subset, set):
431 cmdnames = [k for k in commands if k in subset]
432 subset = sorted(subset)
433 else:
434 cmdnames = [n for n in subset if n in commands]
435 if len(cmdnames) < len(subset):
436 bad = tuple(n for n in subset if n not in commands)
437 raise ValueError(f'unsupported subset {bad}')
438
439 common = argparse.ArgumentParser(add_help=False)
440 common_processors = apply_cli_argspecs(common, commonspecs)
441 subs = parser.add_subparsers(dest='cmd')
442 for cmdname in cmdnames:
443 description, argspecs, _ = commands[cmdname]
444 sub = subs.add_parser(
445 cmdname,
446 description=description,
447 parents=[common],
448 )
449 cmd_processors = _add_cmd_cli(sub, (), argspecs)
450 arg_processors[cmdname] = common_processors + cmd_processors
451 return arg_processors
452
453
454 def _add_cmd_cli(parser, commonspecs, argspecs):
455 processors = []
456 argspecs = list(commonspecs or ()) + list(argspecs or ())
457 for argspec in argspecs:
458 if callable(argspec):
459 procs = argspec(parser)
460 _add_procs(processors, procs)
461 else:
462 if not argspec:
463 raise NotImplementedError
464 args = list(argspec)
465 if not isinstance(args[-1], str):
466 kwargs = args.pop()
467 if not isinstance(args[0], str):
468 try:
469 args, = args
470 except (TypeError, ValueError):
471 parser.error(f'invalid cmd args {argspec!r}')
472 else:
473 kwargs = {}
474 parser.add_argument(*args, **kwargs)
475 # There will be nothing to process.
476 return processors
477
478
479 def _flatten_processors(processors):
480 for proc in processors:
481 if proc is None:
482 continue
483 if callable(proc):
484 yield proc
485 else:
486 yield from _flatten_processors(proc)
487
488
489 def process_args(args, argv, processors, *, keys=None):
490 processors = _flatten_processors(processors)
491 ns = vars(args)
492 extracted = {}
493 if keys is None:
494 for process_args in processors:
495 for key in process_args(args, argv=argv):
496 extracted[key] = ns.pop(key)
497 else:
498 remainder = set(keys)
499 for process_args in processors:
500 hanging = process_args(args, argv=argv)
501 if isinstance(hanging, str):
502 hanging = [hanging]
503 for key in hanging or ():
504 if key not in remainder:
505 raise NotImplementedError(key)
506 extracted[key] = ns.pop(key)
507 remainder.remove(key)
508 if remainder:
509 raise NotImplementedError(sorted(remainder))
510 return extracted
511
512
513 def process_args_by_key(args, argv, processors, keys):
514 extracted = process_args(args, argv, processors, keys=keys)
515 return [extracted[key] for key in keys]
516
517
518 ##################################
519 # commands
520
521 def set_command(name, add_cli):
522 """A decorator factory to set CLI info."""
523 def decorator(func):
524 if hasattr(func, '__cli__'):
525 raise Exception(f'already set')
526 func.__cli__ = (name, add_cli)
527 return func
528 return decorator
529
530
531 ##################################
532 # main() helpers
533
534 def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
535 # We expect each filename to be a normalized, absolute path.
536 for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot):
537 if (reason := check()):
538 logger.debug(f'{filename}: {reason}')
539 continue
540 yield filename
541
542
543 def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
544 filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
545 for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot):
546 if show:
547 print()
548 print(relfile)
549 print('-------------------------------------------')
550 if (reason := check()):
551 print(reason)
552 continue
553 yield filename, relfile
554
555
556 def _iter_filenames(filenames, process, relroot):
557 if process is None:
558 yield from fsutil.process_filenames(filenames, relroot=relroot)
559 return
560
561 onempty = Exception('no filenames provided')
562 items = process(filenames, relroot=relroot)
563 items, peeked = iterutil.peek_and_iter(items)
564 if not items:
565 raise onempty
566 if isinstance(peeked, str):
567 if relroot and relroot is not fsutil.USE_CWD:
568 relroot = os.path.abspath(relroot)
569 check = (lambda: True)
570 for filename, ismany in iterutil.iter_many(items, onempty):
571 relfile = fsutil.format_filename(filename, relroot, fixroot=False)
572 yield filename, relfile, check, ismany
573 elif len(peeked) == 4:
574 yield from items
575 else:
576 raise NotImplementedError
577
578
579 def track_progress_compact(items, *, groups=5, **mark_kwargs):
580 last = os.linesep
581 marks = iter_marks(groups=groups, **mark_kwargs)
582 for item in items:
583 last = next(marks)
584 print(last, end='', flush=True)
585 yield item
586 if not last.endswith(os.linesep):
587 print()
588
589
590 def track_progress_flat(items, fmt='<{}>'):
591 for item in items:
592 print(fmt.format(item), flush=True)
593 yield item
594
595
596 def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
597 mark = mark or ''
598 group = group if group and group > 1 else 1
599 groups = groups if groups and groups > 1 else 1
600
601 sep = f'{mark}{sep}' if sep else mark
602 end = f'{mark}{os.linesep}'
603 div = os.linesep
604 perline = group * groups
605 if lines is _NOT_SET:
606 # By default we try to put about 100 in each line group.
607 perlines = 100 // perline * perline
608 elif not lines or lines < 0:
609 perlines = None
610 else:
611 perlines = perline * lines
612
613 if perline == 1:
614 yield end
615 elif group == 1:
616 yield sep
617
618 count = 1
619 while True:
620 if count % perline == 0:
621 yield end
622 if perlines and count % perlines == 0:
623 yield div
624 elif count % group == 0:
625 yield sep
626 else:
627 yield mark
628 count += 1