1
2 """
3 csv.py - read/write/investigate CSV files
4 """
5
6 import re
7 import types
8 from _csv import Error, __version__, writer, reader, register_dialect, \
9 unregister_dialect, get_dialect, list_dialects, \
10 field_size_limit, \
11 QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \
12 QUOTE_STRINGS, QUOTE_NOTNULL, \
13 __doc__
14 from _csv import Dialect as _Dialect
15
16 from io import StringIO
17
# Public API: constants and functions re-exported from the C accelerator
# (_csv) plus the pure-Python classes defined in this module.
__all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",
           "QUOTE_STRINGS", "QUOTE_NOTNULL",
           "Error", "Dialect", "__doc__", "excel", "excel_tab",
           "field_size_limit", "reader", "writer",
           "register_dialect", "get_dialect", "list_dialects", "Sniffer",
           "unregister_dialect", "__version__", "DictReader", "DictWriter",
           "unix_dialect"]
25
class Dialect:
    """Base class describing a CSV dialect.

    Not usable directly: subclass it (as csv.excel does) and supply the
    formatting attributes: delimiter, quotechar, escapechar, doublequote,
    skipinitialspace, lineterminator, quoting.
    """
    _name = ""
    _valid = False
    # Placeholders; concrete subclasses override these with real values.
    delimiter = None
    quotechar = None
    escapechar = None
    doublequote = None
    skipinitialspace = None
    lineterminator = None
    quoting = None

    def __init__(self):
        # Only subclasses count as valid dialects; the base class itself
        # carries nothing but placeholders.
        if self.__class__ != Dialect:
            self._valid = True
        self._validate()

    def _validate(self):
        # Hand the attributes to the C implementation for vetting, and
        # surface any complaint as a csv.Error for API compatibility.
        try:
            _Dialect(self)
        except TypeError as err:
            raise Error(str(err))
56
class excel(Dialect):
    """The dialect conventionally produced by Microsoft Excel:
    comma-separated, double-quote quoting, CRLF line endings."""
    delimiter = ','
    quotechar = '"'
    doublequote = True          # embedded quotes are doubled, not escaped
    skipinitialspace = False
    lineterminator = '\r\n'
    quoting = QUOTE_MINIMAL     # quote only when the field requires it
register_dialect("excel", excel)
66
class excel_tab(excel):
    """Excel's conventions for TAB-delimited files: identical to the
    ``excel`` dialect except that fields are separated by tabs."""
    delimiter = '\t'
register_dialect("excel-tab", excel_tab)
71
class unix_dialect(Dialect):
    """The dialect conventionally produced on Unix systems: comma-separated,
    every field quoted, bare LF line endings."""
    delimiter = ','
    quotechar = '"'
    doublequote = True          # embedded quotes are doubled, not escaped
    skipinitialspace = False
    lineterminator = '\n'
    quoting = QUOTE_ALL         # quote unconditionally
register_dialect("unix", unix_dialect)
81
82
class DictReader:
    """Iterate over a CSV file, yielding one dict per row.

    Keys come from *fieldnames* if given, otherwise from the first row of
    the file.  Values beyond the known fields are collected in a list under
    *restkey*; fields missing from a short row are filled with *restval*.
    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None,
                 dialect="excel", *args, **kwds):
        if fieldnames is not None and iter(fieldnames) is fieldnames:
            # A bare iterator was passed in; snapshot it so the names can
            # be reused for every row.
            fieldnames = list(fieldnames)
        self._fieldnames = fieldnames   # keys used for each row dict
        self.restkey = restkey          # key under which long-row extras land
        self.restval = restval          # filler value for short rows
        self.reader = reader(f, dialect, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        # Lazily pull the header row off the underlying reader the first
        # time the names are needed.
        if self._fieldnames is None:
            try:
                self._fieldnames = next(self.reader)
            except StopIteration:
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def __next__(self):
        if self.line_num == 0:
            # Accessed purely for its side effect of consuming the header.
            self.fieldnames
        row = next(self.reader)
        self.line_num = self.reader.line_num

        # Unlike the plain reader, skip blank rows entirely: a blank row
        # would otherwise become a dict of nothing but restval.
        while row == []:
            row = next(self.reader)

        names = self.fieldnames
        result = dict(zip(names, row))
        if len(names) < len(row):
            # Row is longer than the header: stash the overflow under restkey.
            result[self.restkey] = row[len(names):]
        else:
            # Row may be shorter than the header: pad the missing keys.
            for absent in names[len(row):]:
                result[absent] = self.restval
        return result

    __class_getitem__ = classmethod(types.GenericAlias)
135
136
class DictWriter:
    """Write dictionaries out as CSV rows.

    *fieldnames* fixes the column order.  Keys absent from a row dict are
    written as *restval*; unexpected keys either raise ValueError or are
    silently dropped, depending on *extrasaction* ("raise" or "ignore").
    """

    def __init__(self, f, fieldnames, restval="", extrasaction="raise",
                 dialect="excel", *args, **kwds):
        if fieldnames is not None and iter(fieldnames) is fieldnames:
            # Snapshot a bare iterator so the names survive repeated use.
            fieldnames = list(fieldnames)
        self.fieldnames = fieldnames    # column order for every row
        self.restval = restval          # filler for keys missing from a row
        extrasaction = extrasaction.lower()
        if extrasaction not in ("raise", "ignore"):
            raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'"
                             % extrasaction)
        self.extrasaction = extrasaction
        self.writer = writer(f, dialect, *args, **kwds)

    def writeheader(self):
        """Emit a header row mapping each field name to itself."""
        return self.writerow(dict(zip(self.fieldnames, self.fieldnames)))

    def _dict_to_list(self, rowdict):
        # Order the dict's values by self.fieldnames (lazily, as a
        # generator), substituting restval for any missing key.
        if self.extrasaction == "raise":
            wrong_fields = rowdict.keys() - self.fieldnames
            if wrong_fields:
                raise ValueError("dict contains fields not in fieldnames: "
                                 + ", ".join([repr(x) for x in wrong_fields]))
        return (rowdict.get(key, self.restval) for key in self.fieldnames)

    def writerow(self, rowdict):
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        return self.writer.writerows(map(self._dict_to_list, rowdicts))

    __class_getitem__ = classmethod(types.GenericAlias)
170
171
class Sniffer:
    '''
    "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
    Returns a Dialect object.
    '''
    def __init__(self):
        # in case there is more than one possible delimiter
        self.preferred = [',', '\t', ';', ' ', ':']


    def sniff(self, sample, delimiters=None):
        """
        Returns a dialect (or None) corresponding to the sample
        """

        # First try the strong signal: quoted fields reveal both the quote
        # character and the delimiter adjacent to it.  Fall back to the
        # character-frequency heuristic when the sample has no quoting.
        quotechar, doublequote, delimiter, skipinitialspace = \
                   self._guess_quote_and_delimiter(sample, delimiters)
        if not delimiter:
            delimiter, skipinitialspace = self._guess_delimiter(sample,
                                                                delimiters)

        if not delimiter:
            raise Error("Could not determine delimiter")

        # Build a throwaway Dialect subclass carrying the sniffed values.
        class dialect(Dialect):
            _name = "sniffed"
            lineterminator = '\r\n'
            quoting = QUOTE_MINIMAL
            # escapechar = ''

        dialect.doublequote = doublequote
        dialect.delimiter = delimiter
        # _csv.reader won't accept a quotechar of ''
        dialect.quotechar = quotechar or '"'
        dialect.skipinitialspace = skipinitialspace

        return dialect


    def _guess_quote_and_delimiter(self, data, delimiters):
        """
        Looks for text enclosed between two identical quotes
        (the probable quotechar) which are preceded and followed
        by the same character (the probable delimiter).
        For example:
                         ,'some text',
        The quote with the most wins, same with the delimiter.
        If there is no quotechar the delimiter can't be determined
        this way.

        Returns a (quotechar, doublequote, delimiter, skipinitialspace)
        tuple; delimiter is None when nothing quoted was found.
        """

        # Patterns are tried from most to least specific: quoted field with
        # a delimiter on both sides, then delimiter only after, only before,
        # and finally a quoted field alone on its line.
        matches = []
        for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   #  ".*?",
                      r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            #  ".*?" (no delim, no space)
            regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
            matches = regexp.findall(data)
            if matches:
                break

        if not matches:
            # (quotechar, doublequote, delimiter, skipinitialspace)
            return ('', False, None, 0)
        # Tally how often each candidate quote/delimiter appears.  findall
        # returns tuples of group values; groupindex maps a group name to
        # its 1-based position, hence the "- 1" when indexing the tuple.
        quotes = {}
        delims = {}
        spaces = 0
        groupindex = regexp.groupindex
        for m in matches:
            n = groupindex['quote'] - 1
            key = m[n]
            if key:
                quotes[key] = quotes.get(key, 0) + 1
            try:
                # The last pattern has no 'delim'/'space' groups.
                n = groupindex['delim'] - 1
                key = m[n]
            except KeyError:
                continue
            if key and (delimiters is None or key in delimiters):
                delims[key] = delims.get(key, 0) + 1
            try:
                n = groupindex['space'] - 1
            except KeyError:
                continue
            if m[n]:
                spaces += 1

        # The most frequent candidate wins.
        quotechar = max(quotes, key=quotes.get)

        if delims:
            delim = max(delims, key=delims.get)
            # If every delimiter occurrence was followed by a space, assume
            # skipinitialspace formatting.
            skipinitialspace = delims[delim] == spaces
            if delim == '\n': # most likely a file with a single column
                delim = ''
        else:
            # there is *no* delimiter, it's a single column of quoted data
            delim = ''
            skipinitialspace = 0

        # if we see an extra quote between delimiters, we've got a
        # double quoted format
        dq_regexp = re.compile(
                               r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
                               {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE)


        if dq_regexp.search(data):
            doublequote = True
        else:
            doublequote = False

        return (quotechar, doublequote, delim, skipinitialspace)


    def _guess_delimiter(self, data, delimiters):
        """
        The delimiter /should/ occur the same number of times on
        each row. However, due to malformed data, it may not. We don't want
        an all or nothing approach, so we allow for small variations in this
        number.
          1) build a table of the frequency of each character on every line.
          2) build a table of frequencies of this frequency (meta-frequency?),
             e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
             7 times in 2 rows'
          3) use the mode of the meta-frequency to determine the /expected/
             frequency for that character
          4) find out how often the character actually meets that goal
          5) the character that best meets its goal is the delimiter
        For performance reasons, the data is evaluated in chunks, so it can
        try and evaluate the smallest portion of the data possible, evaluating
        additional chunks as necessary.

        Returns a (delimiter, skipinitialspace) tuple; delimiter is '' when
        no candidate survives.
        """

        # Drop empty lines; each remaining entry is one row of the sample.
        data = list(filter(None, data.split('\n')))

        # NOTE(review): this local deliberately shadows the builtin ascii();
        # only printable-range 7-bit characters are considered as delimiters.
        ascii = [chr(c) for c in range(127)] # 7-bit ASCII

        # build frequency tables
        chunkLength = min(10, len(data))
        iteration = 0
        charFrequency = {}   # char -> {per-line count -> number of lines}
        modes = {}           # char -> (expected count, adjusted vote weight)
        delims = {}          # surviving delimiter candidates
        start, end = 0, chunkLength
        while start < len(data):
            iteration += 1
            for line in data[start:end]:
                for char in ascii:
                    metaFrequency = charFrequency.get(char, {})
                    # must count even if frequency is 0
                    freq = line.count(char)
                    # value is the mode
                    metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
                    charFrequency[char] = metaFrequency

            for char in charFrequency.keys():
                items = list(charFrequency[char].items())
                # Characters that never appear carry no information.
                if len(items) == 1 and items[0][0] == 0:
                    continue
                # get the mode of the frequencies
                if len(items) > 1:
                    modes[char] = max(items, key=lambda x: x[1])
                    # adjust the mode - subtract the sum of all
                    # other frequencies
                    items.remove(modes[char])
                    modes[char] = (modes[char][0], modes[char][1]
                                   - sum(item[1] for item in items))
                else:
                    modes[char] = items[0]

            # build a list of possible delimiters
            modeList = modes.items()
            total = float(min(chunkLength * iteration, len(data)))
            # (rows of consistent data) / (number of rows) = 100%
            consistency = 1.0
            # minimum consistency threshold
            threshold = 0.9
            # Relax the required consistency 1% at a time until some
            # character qualifies or we hit the threshold.
            while len(delims) == 0 and consistency >= threshold:
                for k, v in modeList:
                    if v[0] > 0 and v[1] > 0:
                        if ((v[1]/total) >= consistency and
                            (delimiters is None or k in delimiters)):
                            delims[k] = v
                consistency -= 0.01

            if len(delims) == 1:
                delim = list(delims.keys())[0]
                skipinitialspace = (data[0].count(delim) ==
                                    data[0].count("%c " % delim))
                return (delim, skipinitialspace)

            # analyze another chunkLength lines
            start = end
            end += chunkLength

        if not delims:
            return ('', 0)

        # if there's more than one, fall back to a 'preferred' list
        if len(delims) > 1:
            for d in self.preferred:
                if d in delims.keys():
                    skipinitialspace = (data[0].count(d) ==
                                        data[0].count("%c " % d))
                    return (d, skipinitialspace)

        # nothing else indicates a preference, pick the character that
        # dominates(?)
        items = [(v,k) for (k,v) in delims.items()]
        items.sort()
        delim = items[-1][1]

        skipinitialspace = (data[0].count(delim) ==
                            data[0].count("%c " % delim))
        return (delim, skipinitialspace)


    def has_header(self, sample):
        # Creates a dictionary of types of data in each column. If any
        # column is of a single type (say, integers), *except* for the first
        # row, then the first row is presumed to be labels. If the type
        # can't be determined, it is assumed to be a string in which case
        # the length of the string is the determining factor: if all of the
        # rows except for the first are the same length, it's a header.
        # Finally, a 'vote' is taken at the end for each column, adding or
        # subtracting from the likelihood of the first row being a header.

        rdr = reader(StringIO(sample), self.sniff(sample))

        header = next(rdr) # assume first row is header

        columns = len(header)
        columnTypes = {}
        for i in range(columns): columnTypes[i] = None

        checked = 0
        for row in rdr:
            # arbitrary number of rows to check, to keep it sane
            if checked > 20:
                break
            checked += 1

            if len(row) != columns:
                continue # skip rows that have irregular number of columns

            for col in list(columnTypes.keys()):
                # A columnType of `complex` means "numeric"; an int value
                # means "string of this length".
                thisType = complex
                try:
                    thisType(row[col])
                except (ValueError, OverflowError):
                    # fallback to length of string
                    thisType = len(row[col])

                if thisType != columnTypes[col]:
                    if columnTypes[col] is None: # add new column type
                        columnTypes[col] = thisType
                    else:
                        # type is inconsistent, remove column from
                        # consideration
                        del columnTypes[col]

        # finally, compare results against first row and "vote"
        # on whether it's a header
        hasHeader = 0
        for col, colType in columnTypes.items():
            if isinstance(colType, int): # it's a length
                if len(header[col]) != colType:
                    hasHeader += 1
                else:
                    hasHeader -= 1
            else: # attempt typecast
                try:
                    colType(header[col])
                except (ValueError, TypeError):
                    hasHeader += 1
                else:
                    hasHeader -= 1

        return hasHeader > 0