(root)/
Python-3.12.0/
Tools/
c-analyzer/
c_common/
tables.py
       1  from collections import namedtuple
       2  import csv
       3  import re
       4  import textwrap
       5  
       6  from . import NOT_SET, strutil, fsutil
       7  
       8  
# Placeholder that fix_row() substitutes for values matching an "empty" marker.
EMPTY = '-'
# Placeholder that fix_row() substitutes for values matching an "unknown" marker.
UNKNOWN = '???'
      11  
      12  
      13  def parse_markers(markers, default=None):
      14      if markers is NOT_SET:
      15          return default
      16      if not markers:
      17          return None
      18      if type(markers) is not str:
      19          return markers
      20      if markers == markers[0] * len(markers):
      21          return [markers]
      22      return list(markers)
      23  
      24  
      25  def fix_row(row, **markers):
      26      if isinstance(row, str):
      27          raise NotImplementedError(row)
      28      empty = parse_markers(markers.pop('empty', ('-',)))
      29      unknown = parse_markers(markers.pop('unknown', ('???',)))
      30      row = (val if val else None for val in row)
      31      if not empty:
      32          if unknown:
      33              row = (UNKNOWN if val in unknown else val for val in row)
      34      elif not unknown:
      35          row = (EMPTY if val in empty else val for val in row)
      36      else:
      37          row = (EMPTY if val in empty else (UNKNOWN if val in unknown else val)
      38                 for val in row)
      39      return tuple(row)
      40  
      41  
      42  def _fix_read_default(row):
      43      for value in row:
      44          yield value.strip()
      45  
      46  
      47  def _fix_write_default(row, empty=''):
      48      for value in row:
      49          yield empty if value is None else str(value)
      50  
      51  
      52  def _normalize_fix_read(fix):
      53      if fix is None:
      54          fix = ''
      55      if callable(fix):
      56          def fix_row(row):
      57              values = fix(row)
      58              return _fix_read_default(values)
      59      elif isinstance(fix, str):
      60          def fix_row(row):
      61              values = _fix_read_default(row)
      62              return (None if v == fix else v
      63                      for v in values)
      64      else:
      65          raise NotImplementedError(fix)
      66      return fix_row
      67  
      68  
      69  def _normalize_fix_write(fix, empty=''):
      70      if fix is None:
      71          fix = empty
      72      if callable(fix):
      73          def fix_row(row):
      74              values = fix(row)
      75              return _fix_write_default(values, empty)
      76      elif isinstance(fix, str):
      77          def fix_row(row):
      78              return _fix_write_default(row, fix)
      79      else:
      80          raise NotImplementedError(fix)
      81      return fix_row
      82  
      83  
      84  def read_table(infile, header, *,
      85                 sep='\t',
      86                 fix=None,
      87                 _open=open,
      88                 _get_reader=csv.reader,
      89                 ):
      90      """Yield each row of the given ???-separated (e.g. tab) file."""
      91      if isinstance(infile, str):
      92          with _open(infile, newline='') as infile:
      93              yield from read_table(
      94                  infile,
      95                  header,
      96                  sep=sep,
      97                  fix=fix,
      98                  _open=_open,
      99                  _get_reader=_get_reader,
     100              )
     101              return
     102      lines = strutil._iter_significant_lines(infile)
     103  
     104      # Validate the header.
     105      if not isinstance(header, str):
     106          header = sep.join(header)
     107      try:
     108          actualheader = next(lines).strip()
     109      except StopIteration:
     110          actualheader = ''
     111      if actualheader != header:
     112          raise ValueError(f'bad header {actualheader!r}')
     113  
     114      fix_row = _normalize_fix_read(fix)
     115      for row in _get_reader(lines, delimiter=sep or '\t'):
     116          yield tuple(fix_row(row))
     117  
     118  
     119  def write_table(outfile, header, rows, *,
     120                  sep='\t',
     121                  fix=None,
     122                  backup=True,
     123                  _open=open,
     124                  _get_writer=csv.writer,
     125                  ):
     126      """Write each of the rows to the given ???-separated (e.g. tab) file."""
     127      if backup:
     128          fsutil.create_backup(outfile, backup)
     129      if isinstance(outfile, str):
     130          with _open(outfile, 'w', newline='') as outfile:
     131              return write_table(
     132                  outfile,
     133                  header,
     134                  rows,
     135                  sep=sep,
     136                  fix=fix,
     137                  backup=backup,
     138                  _open=_open,
     139                  _get_writer=_get_writer,
     140              )
     141  
     142      if isinstance(header, str):
     143          header = header.split(sep or '\t')
     144      fix_row = _normalize_fix_write(fix)
     145      writer = _get_writer(outfile, delimiter=sep or '\t')
     146      writer.writerow(header)
     147      for row in rows:
     148          writer.writerow(
     149              tuple(fix_row(row))
     150          )
     151  
     152  
     153  def parse_table(entries, sep, header=None, rawsep=None, *,
     154                  default=NOT_SET,
     155                  strict=True,
     156                  ):
     157      header, sep = _normalize_table_file_props(header, sep)
     158      if not sep:
     159          raise ValueError('missing "sep"')
     160  
     161      ncols = None
     162      if header:
     163          if strict:
     164              ncols = len(header.split(sep))
     165          cur_file = None
     166      for line, filename in strutil.parse_entries(entries, ignoresep=sep):
     167          _sep = sep
     168          if filename:
     169              if header and cur_file != filename:
     170                  cur_file = filename
     171                  # Skip the first line if it's the header.
     172                  if line.strip() == header:
     173                      continue
     174                  else:
     175                      # We expected the header.
     176                      raise NotImplementedError((header, line))
     177          elif rawsep and sep not in line:
     178              _sep = rawsep
     179  
     180          row = _parse_row(line, _sep, ncols, default)
     181          if strict and not ncols:
     182              ncols = len(row)
     183          yield row, filename
     184  
     185  
     186  def parse_row(line, sep, *, ncols=None, default=NOT_SET):
     187      if not sep:
     188          raise ValueError('missing "sep"')
     189      return _parse_row(line, sep, ncols, default)
     190  
     191  
     192  def _parse_row(line, sep, ncols, default):
     193      row = tuple(v.strip() for v in line.split(sep))
     194      if (ncols or 0) > 0:
     195          diff = ncols - len(row)
     196          if diff:
     197              if default is NOT_SET or diff < 0:
     198                  raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
     199              row += (default,) * diff
     200      return row
     201  
     202  
     203  def _normalize_table_file_props(header, sep):
     204      if not header:
     205          return None, sep
     206  
     207      if not isinstance(header, str):
     208          if not sep:
     209              raise NotImplementedError(header)
     210          header = sep.join(header)
     211      elif not sep:
     212          for sep in ('\t', ',', ' '):
     213              if sep in header:
     214                  break
     215          else:
     216              sep = None
     217      return header, sep
     218  
     219  
     220  ##################################
     221  # stdout tables
     222  
# Fallback column width used by _resolve_width() when nothing else provides one.
WIDTH = 20
     224  
     225  
     226  def resolve_columns(specs):
     227      if isinstance(specs, str):
     228          specs = specs.replace(',', ' ').strip().split()
     229      resolved = []
     230      for raw in specs:
     231          column = ColumnSpec.from_raw(raw)
     232          resolved.append(column)
     233      return resolved
     234  
     235  
     236  def build_table(specs, *, sep=' ', defaultwidth=None):
     237      columns = resolve_columns(specs)
     238      return _build_table(columns, sep=sep, defaultwidth=defaultwidth)
     239  
     240  
     241  class ESC[4;38;5;81mColumnSpec(ESC[4;38;5;149mnamedtuple('ColumnSpec', 'field label fmt')):
     242  
     243      REGEX = re.compile(textwrap.dedent(r'''
     244          ^
     245          (?:
     246              \[
     247              (
     248                  (?: [^\s\]] [^\]]* )?
     249                  [^\s\]]
     250              )  # <label>
     251              ]
     252          )?
     253          ( [-\w]+ )  # <field>
     254          (?:
     255              (?:
     256                  :
     257                  ( [<^>] )  # <align>
     258                  ( \d+ )?  # <width1>
     259              )
     260              |
     261              (?:
     262                  (?:
     263                      :
     264                      ( \d+ )  # <width2>
     265                  )?
     266                  (?:
     267                      :
     268                      ( .*? )  # <fmt>
     269                  )?
     270              )
     271          )?
     272          $
     273      '''), re.VERBOSE)
     274  
     275      @classmethod
     276      def from_raw(cls, raw):
     277          if not raw:
     278              raise ValueError('missing column spec')
     279          elif isinstance(raw, cls):
     280              return raw
     281  
     282          if isinstance(raw, str):
     283              *values, _ = cls._parse(raw)
     284          else:
     285              *values, _ = cls._normalize(raw)
     286          if values is None:
     287              raise ValueError(f'unsupported column spec {raw!r}')
     288          return cls(*values)
     289  
     290      @classmethod
     291      def parse(cls, specstr):
     292          parsed = cls._parse(specstr)
     293          if not parsed:
     294              return None
     295          *values, _ = parsed
     296          return cls(*values)
     297  
     298      @classmethod
     299      def _parse(cls, specstr):
     300          m = cls.REGEX.match(specstr)
     301          if not m:
     302              return None
     303          (label, field,
     304           align, width1,
     305           width2, fmt,
     306           ) = m.groups()
     307          if not label:
     308              label = field
     309          if fmt:
     310              assert not align and not width1, (specstr,)
     311              _parsed = _parse_fmt(fmt)
     312              if not _parsed:
     313                  raise NotImplementedError
     314              elif width2:
     315                  width, _ = _parsed
     316                  if width != int(width2):
     317                      raise NotImplementedError(specstr)
     318          elif width2:
     319              fmt = width2
     320              width = int(width2)
     321          else:
     322              assert not fmt, (fmt, specstr)
     323              if align:
     324                  width = int(width1) if width1 else len(label)
     325                  fmt = f'{align}{width}'
     326              else:
     327                  width = None
     328          return field, label, fmt, width
     329  
     330      @classmethod
     331      def _normalize(cls, spec):
     332          if len(spec) == 1:
     333              raw, = spec
     334              raise NotImplementedError
     335              return _resolve_column(raw)
     336  
     337          if len(spec) == 4:
     338              label, field, width, fmt = spec
     339              if width:
     340                  if not fmt:
     341                      fmt = str(width)
     342                  elif _parse_fmt(fmt)[0] != width:
     343                      raise ValueError(f'width mismatch in {spec}')
     344          elif len(raw) == 3:
     345              label, field, fmt = spec
     346              if not field:
     347                  label, field = None, label
     348              elif not isinstance(field, str) or not field.isidentifier():
     349                  # XXX This doesn't seem right...
     350                  fmt = f'{field}:{fmt}' if fmt else field
     351                  label, field = None, label
     352          elif len(raw) == 2:
     353              label = None
     354              field, fmt = raw
     355              if not field:
     356                  field, fmt = fmt, None
     357              elif not field.isidentifier() or fmt.isidentifier():
     358                  label, field = field, fmt
     359          else:
     360              raise NotImplementedError
     361  
     362          fmt = f':{fmt}' if fmt else ''
     363          if label:
     364              return cls._parse(f'[{label}]{field}{fmt}')
     365          else:
     366              return cls._parse(f'{field}{fmt}')
     367  
     368      @property
     369      def width(self):
     370          if not self.fmt:
     371              return None
     372          parsed = _parse_fmt(self.fmt)
     373          if not parsed:
     374              return None
     375          width, _ = parsed
     376          return width
     377  
     378      def resolve_width(self, default=None):
     379          return _resolve_width(self.width, self.fmt, self.label, default)
     380  
     381  
     382  def _parse_fmt(fmt):
     383      if fmt.startswith(tuple('<^>')):
     384          align = fmt[0]
     385          width = fmt[1:]
     386          if width.isdigit():
     387              return int(width), align
     388      elif fmt.isdigit():
     389          return int(fmt), '<'
     390      return None
     391  
     392  
     393  def _resolve_width(width, fmt, label, default):
     394      if width:
     395          if not isinstance(width, int):
     396              raise NotImplementedError
     397          return width
     398      elif fmt:
     399          parsed = _parse_fmt(fmt)
     400          if parsed:
     401              width, _ = parsed
     402              if width:
     403                  return width
     404  
     405      if not default:
     406          return WIDTH
     407      elif hasattr(default, 'get'):
     408          defaults = default
     409          default = defaults.get(None) or WIDTH
     410          return defaults.get(label) or default
     411      else:
     412          return default or WIDTH
     413  
     414  
     415  def _build_table(columns, *, sep=' ', defaultwidth=None):
     416      header = []
     417      div = []
     418      rowfmt = []
     419      for spec in columns:
     420          width = spec.resolve_width(defaultwidth)
     421          colfmt = spec.fmt
     422          colfmt = f':{spec.fmt}' if spec.fmt else f':{width}'
     423  
     424          header.append(f' {{:^{width}}} '.format(spec.label))
     425          div.append('-' * (width + 2))
     426          rowfmt.append(f' {{{spec.field}{colfmt}}} ')
     427      return (
     428          sep.join(header),
     429          sep.join(div),
     430          sep.join(rowfmt),
     431      )