1 from collections import namedtuple
2 import csv
3 import re
4 import textwrap
5
6 from . import NOT_SET, strutil, fsutil
7
8
# Placeholder that fix_row() substitutes for values matching an "empty" marker.
EMPTY = '-'
# Placeholder that fix_row() substitutes for values matching an "unknown" marker.
UNKNOWN = '???'
11
12
def parse_markers(markers, default=None):
    """Normalize a marker spec into a collection of markers.

    * NOT_SET -> *default*
    * any other false value -> None
    * a non-str object -> returned unchanged
    * a str made of one repeated character (e.g. "-" or "---")
      -> a one-item list holding that whole string
    * any other str -> a list of its individual characters
    """
    if markers is NOT_SET:
        return default
    elif not markers:
        return None
    elif type(markers) is not str:
        # Not a plain string, so hand it back untouched.
        return markers

    first = markers[0]
    if markers.count(first) == len(markers):
        # Every character is the same: treat the string as one marker.
        return [markers]
    return list(markers)
23
24
def fix_row(row, **markers):
    """Return *row* as a tuple with its placeholder values normalized.

    False values become None.  Values found in the "empty" markers
    (keyword "empty", default ('-',)) are replaced with EMPTY and, failing
    that, values found in the "unknown" markers (keyword "unknown",
    default ('???',)) are replaced with UNKNOWN.  Both keywords are run
    through parse_markers(); any other keyword is ignored.

    Raises NotImplementedError if *row* is a str.
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))
    check_empty = bool(empty)
    check_unknown = bool(unknown)

    def normalize(value):
        if not value:
            value = None
        # "empty" takes precedence over "unknown".
        if check_empty and value in empty:
            return EMPTY
        if check_unknown and value in unknown:
            return UNKNOWN
        return value

    return tuple(normalize(value) for value in row)
40
41
42 def _fix_read_default(row):
43 for value in row:
44 yield value.strip()
45
46
47 def _fix_write_default(row, empty=''):
48 for value in row:
49 yield empty if value is None else str(value)
50
51
52 def _normalize_fix_read(fix):
53 if fix is None:
54 fix = ''
55 if callable(fix):
56 def fix_row(row):
57 values = fix(row)
58 return _fix_read_default(values)
59 elif isinstance(fix, str):
60 def fix_row(row):
61 values = _fix_read_default(row)
62 return (None if v == fix else v
63 for v in values)
64 else:
65 raise NotImplementedError(fix)
66 return fix_row
67
68
69 def _normalize_fix_write(fix, empty=''):
70 if fix is None:
71 fix = empty
72 if callable(fix):
73 def fix_row(row):
74 values = fix(row)
75 return _fix_write_default(values, empty)
76 elif isinstance(fix, str):
77 def fix_row(row):
78 return _fix_write_default(row, fix)
79 else:
80 raise NotImplementedError(fix)
81 return fix_row
82
83
def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row of the given delimiter-separated (e.g. tab) file.

    *infile* is either a filename (opened here with newline='') or an
    iterable of lines.  The lines are first filtered through
    strutil._iter_significant_lines().

    *header* is the expected first line, either as a str or as a sequence
    of column names (joined with *sep*).  A false *sep* means tab.

    *fix* controls per-row cleanup (see _normalize_fix_read()): with None
    or a str, every value is stripped and values equal to the string
    ('' for None) are yielded as None; with a callable, it is applied to
    each raw row and the resulting values are stripped.

    Each row is yielded as a tuple.  ValueError is raised if the first
    significant line does not match *header*.
    """
    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return

    # Resolve the separator once so that the expected header and the
    # reader always agree on it (a false value means "tab").
    sep = sep or '\t'
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        # No significant lines at all.
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep):
        yield tuple(fix_row(row))
117
118
def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write the header and rows to a delimiter-separated (e.g. tab) file.

    *outfile* is either a filename (opened here in 'w' mode with
    newline='') or a writable file object.

    *header* is either a str (split on *sep*) or a sequence of column
    names.  A false *sep* means tab.

    *fix* controls per-row rendering (see _normalize_fix_write()): with
    None or a str, each value is converted with str() and None values are
    written as that string ('' for None); with a callable, it is applied
    to each row first.

    If *backup* is true then fsutil.create_backup(outfile, backup) is
    called exactly once, before the file is opened for writing.
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            # The backup was already requested above.  Do not ask for it
            # again with the open file: by now the file has been opened
            # in 'w' mode, so a second backup would be redundant at best
            # and could capture the emptied file.
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    delimiter = sep or '\t'
    if isinstance(header, str):
        header = header.split(delimiter)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=delimiter)
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            tuple(fix_row(row))
        )
151
152
def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each line found in the given entries.

    The lines come from strutil.parse_entries(entries, ignoresep=sep).
    *header* and *sep* are first resolved by _normalize_table_file_props();
    ValueError is raised if no separator results.

    For lines that come with a filename, the first line of each new file
    must be the header (when one was given) and is skipped; any other
    first line raises NotImplementedError.  Lines without a filename are
    split on *rawsep* instead of *sep* when *rawsep* is given and *sep*
    does not occur in the line.

    With *strict*, every row must have the same number of columns as the
    header (or as the first row, if there is no header); see _parse_row()
    for how *default* is used to pad short rows.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    ncols = len(header.split(sep)) if (header and strict) else None
    cur_file = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        if filename:
            rowsep = sep
            if header and cur_file != filename:
                cur_file = filename
                if line.strip() != header:
                    # We expected the header.
                    raise NotImplementedError((header, line))
                # The first line of the file is the header; skip it.
                continue
        elif rawsep and sep not in line:
            rowsep = rawsep
        else:
            rowsep = sep

        row = _parse_row(line, rowsep, ncols, default)
        if strict and not ncols:
            # Without a header, the first row fixes the column count.
            ncols = len(row)
        yield row, filename
184
185
def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Split *line* on *sep* into a tuple of stripped values.

    This is the public wrapper around _parse_row(), which also handles
    *ncols* and *default*.  Raises ValueError if *sep* is false.
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')
190
191
192 def _parse_row(line, sep, ncols, default):
193 row = tuple(v.strip() for v in line.split(sep))
194 if (ncols or 0) > 0:
195 diff = ncols - len(row)
196 if diff:
197 if default is NOT_SET or diff < 0:
198 raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
199 row += (default,) * diff
200 return row
201
202
203 def _normalize_table_file_props(header, sep):
204 if not header:
205 return None, sep
206
207 if not isinstance(header, str):
208 if not sep:
209 raise NotImplementedError(header)
210 header = sep.join(header)
211 elif not sep:
212 for sep in ('\t', ',', ' '):
213 if sep in header:
214 break
215 else:
216 sep = None
217 return header, sep
218
219
220 ##################################
221 # stdout tables
222
# Fallback column width used by _resolve_width() when neither the column
# spec nor the caller-provided default supplies one.
WIDTH = 20
224
225
def resolve_columns(specs):
    """Return a list with a ColumnSpec for each of the given raw specs.

    *specs* is either an iterable of raw specs (anything accepted by
    ColumnSpec.from_raw()) or a single string of specs separated by
    commas and/or whitespace.
    """
    if isinstance(specs, str):
        specs = specs.replace(',', ' ').strip().split()
    return [ColumnSpec.from_raw(raw) for raw in specs]
234
235
def build_table(specs, *, sep=' ', defaultwidth=None):
    """Return the (header, divider, row format) strings for a text table.

    The specs are resolved with resolve_columns() and the resulting
    columns are handed to _build_table() along with *sep* and
    *defaultwidth*.
    """
    return _build_table(
        resolve_columns(specs),
        sep=sep,
        defaultwidth=defaultwidth,
    )
239
240
class ColumnSpec(namedtuple('ColumnSpec', 'field label fmt')):
    """The description of one column of a text table.

    field - the name of the row value shown in the column
    label - the text shown in the column's header
    fmt   - the format spec for the column's values (e.g. "<20"), or None

    A spec may be written as a string in one of these forms, each
    optionally prefixed with "[<label>]":

        <field>
        <field>:<align>          (align is one of "<", "^", ">")
        <field>:<align><width>
        <field>:<width>
        <field>:<width>:<fmt>
        <field>:<fmt>
    """

    # The groups are, in order: label, field, align, width1, width2, fmt.
    REGEX = re.compile(textwrap.dedent(r'''
        ^
        (?:
            \[
            (
                (?: [^\s\]] [^\]]* )?
                [^\s\]]
            )  # <label>
            ]
         )?
        ( [-\w]+ )  # <field>
        (?:
            (?:
                :
                ( [<^>] )  # <align>
                ( \d+ )?  # <width1>
             )
            |
            (?:
                (?:
                    :
                    ( \d+ )  # <width2>
                 )?
                (?:
                    :
                    ( .*? )  # <fmt>
                 )?
             )
         )?
        $
        '''), re.VERBOSE)

    @classmethod
    def from_raw(cls, raw):
        """Return the ColumnSpec for the given raw spec.

        *raw* may be a ColumnSpec (returned as-is), a spec string, or a
        sequence of spec parts (see _normalize()).

        Raises ValueError if the spec is missing or is not recognized.
        """
        if not raw:
            raise ValueError('missing column spec')
        elif isinstance(raw, cls):
            return raw

        if isinstance(raw, str):
            parsed = cls._parse(raw)
        else:
            parsed = cls._normalize(raw)
        # Check before unpacking: an unrecognized spec yields None.
        if parsed is None:
            raise ValueError(f'unsupported column spec {raw!r}')
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def parse(cls, specstr):
        """Return the ColumnSpec for the spec string (None if no match)."""
        parsed = cls._parse(specstr)
        if not parsed:
            return None
        *values, _ = parsed
        return cls(*values)

    @classmethod
    def _parse(cls, specstr):
        """Return (field, label, fmt, width) for the given spec string.

        None is returned if the string does not match REGEX.  The label
        defaults to the field name.  NotImplementedError is raised for
        an explicit format that _parse_fmt() does not understand or
        whose width disagrees with the explicit width.
        """
        m = cls.REGEX.match(specstr)
        if not m:
            return None
        (label, field,
         align, width1,
         width2, fmt,
         ) = m.groups()
        if not label:
            label = field
        if fmt:
            # The regex only captures <fmt> in the non-<align> branch.
            assert not align and not width1, (specstr,)
            _parsed = _parse_fmt(fmt)
            if not _parsed:
                raise NotImplementedError
            # Always take the width from the format so that it is set
            # whether or not an explicit width was also given.
            width, _ = _parsed
            if width2 and width != int(width2):
                raise NotImplementedError(specstr)
        elif width2:
            fmt = width2
            width = int(width2)
        else:
            assert not fmt, (fmt, specstr)
            if align:
                # Without an explicit width the label's length is used.
                width = int(width1) if width1 else len(label)
                fmt = f'{align}{width}'
            else:
                width = None
        return field, label, fmt, width

    @classmethod
    def _normalize(cls, spec):
        """Return (field, label, fmt, width) for a sequence of spec parts.

        Supported shapes:

        * (label, field, width, fmt)
        * (label, field, fmt), or (field, <width>, fmt) when the second
          item is not an identifier
        * (field, fmt), or (label, field) when the first item is not an
          identifier or the second one is

        The parts are rebuilt into a spec string and handed to _parse(),
        so None is returned if the result is not a valid spec string.
        Other lengths raise NotImplementedError.
        """
        if len(spec) == 1:
            # Single-item specs are not supported.
            raise NotImplementedError(spec)

        if len(spec) == 4:
            label, field, width, fmt = spec
            if width:
                if not fmt:
                    fmt = str(width)
                else:
                    # A format that cannot be parsed is left for
                    # _parse() to reject.
                    parsed = _parse_fmt(fmt)
                    if parsed and parsed[0] != width:
                        raise ValueError(f'width mismatch in {spec}')
        elif len(spec) == 3:
            label, field, fmt = spec
            if not field:
                label, field = None, label
            elif not isinstance(field, str) or not field.isidentifier():
                # XXX This doesn't seem right...
                fmt = f'{field}:{fmt}' if fmt else field
                label, field = None, label
        elif len(spec) == 2:
            label = None
            field, fmt = spec
            if not field:
                field, fmt = fmt, None
            elif fmt and (not field.isidentifier()
                          or (isinstance(fmt, str) and fmt.isidentifier())):
                # The pair is really (label, field), so there is no
                # format left over.
                label, field, fmt = field, fmt, None
        else:
            raise NotImplementedError

        fmt = f':{fmt}' if fmt else ''
        if label:
            return cls._parse(f'[{label}]{field}{fmt}')
        else:
            return cls._parse(f'{field}{fmt}')

    @property
    def width(self):
        """The width found in the format spec (None if there is none)."""
        if not self.fmt:
            return None
        parsed = _parse_fmt(self.fmt)
        if not parsed:
            return None
        width, _ = parsed
        return width

    def resolve_width(self, default=None):
        """Return the width to use for the column (see _resolve_width())."""
        return _resolve_width(self.width, self.fmt, self.label, default)
380
381
382 def _parse_fmt(fmt):
383 if fmt.startswith(tuple('<^>')):
384 align = fmt[0]
385 width = fmt[1:]
386 if width.isdigit():
387 return int(width), align
388 elif fmt.isdigit():
389 return int(fmt), '<'
390 return None
391
392
393 def _resolve_width(width, fmt, label, default):
394 if width:
395 if not isinstance(width, int):
396 raise NotImplementedError
397 return width
398 elif fmt:
399 parsed = _parse_fmt(fmt)
400 if parsed:
401 width, _ = parsed
402 if width:
403 return width
404
405 if not default:
406 return WIDTH
407 elif hasattr(default, 'get'):
408 defaults = default
409 default = defaults.get(None) or WIDTH
410 return defaults.get(label) or default
411 else:
412 return default or WIDTH
413
414
415 def _build_table(columns, *, sep=' ', defaultwidth=None):
416 header = []
417 div = []
418 rowfmt = []
419 for spec in columns:
420 width = spec.resolve_width(defaultwidth)
421 colfmt = spec.fmt
422 colfmt = f':{spec.fmt}' if spec.fmt else f':{width}'
423
424 header.append(f' {{:^{width}}} '.format(spec.label))
425 div.append('-' * (width + 2))
426 rowfmt.append(f' {{{spec.field}{colfmt}}} ')
427 return (
428 sep.join(header),
429 sep.join(div),
430 sep.join(rowfmt),
431 )