python (3.12.0)
1 # Copyright (C) 2001,2002 Python Software Foundation
2 # csv package unit tests
3
4 import copy
5 import sys
6 import unittest
7 from io import StringIO
8 from tempfile import TemporaryFile
9 import csv
10 import gc
11 import pickle
12 from test import support
13 from test.support import warnings_helper, import_helper, check_disallow_instantiation
14 from itertools import permutations
15 from textwrap import dedent
16 from collections import OrderedDict
17
18
class BadIterable:
    """Object whose ``__iter__`` always fails with ``OSError``."""

    def __iter__(self):
        # Fails before any item is produced, so callers can check that the
        # error reaches them unchanged.
        raise OSError
22
23
class Test_Csv(unittest.TestCase):
    """
    Test the underlying C csv parser in ways that are not appropriate
    from the high level interface. Further tests of this nature are done
    in TestDialectRegistry.
    """

    def _test_arg_valid(self, ctor, arg):
        """Check that *ctor* rejects missing, mistyped or inconsistent arguments.

        *ctor* is the reader or writer factory under test and *arg* is a
        valid first positional argument for it.
        """
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr=0)
        self.assertRaises(TypeError, ctor, arg, delimiter=0)
        self.assertRaises(TypeError, ctor, arg, delimiter='XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_NONE, quotechar='')

    def test_reader_arg_valid(self):
        """csv.reader validates its arguments and propagates iteration errors."""
        self._test_arg_valid(csv.reader, [])
        self.assertRaises(OSError, csv.reader, BadIterable())

    def test_writer_arg_valid(self):
        """csv.writer validates its arguments and propagates attribute errors."""
        self._test_arg_valid(csv.writer, StringIO())

        class BadWriter:
            @property
            def write(self):
                raise OSError

        self.assertRaises(OSError, csv.writer, BadWriter())

    def _test_default_attrs(self, ctor, *args):
        """Check the default dialect attributes of ``ctor(*args)`` and that
        they cannot be deleted or reassigned."""
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertIs(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertIs(obj.dialect.skipinitialspace, False)
        self.assertIs(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(AttributeError, delattr, obj.dialect, 'delimiter')
        self.assertRaises(AttributeError, setattr, obj.dialect, 'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)

    def test_reader_attrs(self):
        self._test_default_attrs(csv.reader, [])

    def test_writer_attrs(self):
        self._test_default_attrs(csv.writer, StringIO())

    def _test_kw_attrs(self, ctor, *args):
        """Check that keyword options show up on the resulting dialect."""
        # Now try with alternate options
        kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
                      lineterminator='\r', quotechar='*',
                      quoting=csv.QUOTE_NONE, skipinitialspace=True,
                      strict=True)
        obj = ctor(*args, **kwargs)
        self.assertEqual(obj.dialect.delimiter, ':')
        self.assertIs(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '\\')
        self.assertEqual(obj.dialect.lineterminator, "\r")
        self.assertEqual(obj.dialect.quotechar, '*')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
        self.assertIs(obj.dialect.skipinitialspace, True)
        self.assertIs(obj.dialect.strict, True)

    def test_reader_kw_attrs(self):
        self._test_kw_attrs(csv.reader, [])

    def test_writer_kw_attrs(self):
        self._test_kw_attrs(csv.writer, StringIO())

    def _test_dialect_attrs(self, ctor, *args):
        """Check that options taken from a plain dialect class are applied."""
        # Now try with dialect-derived options
        class dialect:
            delimiter = '-'
            doublequote = False
            escapechar = '^'
            lineterminator = '$'
            quotechar = '#'
            quoting = csv.QUOTE_ALL
            skipinitialspace = True
            strict = False

        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertIs(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertIs(obj.dialect.skipinitialspace, True)
        self.assertIs(obj.dialect.strict, False)

    def test_reader_dialect_attrs(self):
        self._test_dialect_attrs(csv.reader, [])

    def test_writer_dialect_attrs(self):
        self._test_dialect_attrs(csv.writer, StringIO())

    def _write_test(self, fields, expect, **kwargs):
        """Write *fields* as one row and compare the file content with
        *expect* followed by the writer's line terminator."""
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj, **kwargs)
            writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(),
                             expect + writer.dialect.lineterminator)

    def _write_error_test(self, exc, fields, **kwargs):
        """Check that writing *fields* raises *exc* and leaves the file empty."""
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj, **kwargs)
            with self.assertRaises(exc):
                writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), '')

    def test_write_arg_valid(self):
        self._write_error_test(csv.Error, None)
        self._write_test((), '')
        self._write_test([None], '""')
        self._write_error_test(csv.Error, [None], quoting=csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        self._write_error_test(OSError, BadIterable())

        class BadList:
            def __len__(self):
                return 10

            def __getitem__(self, i):
                if i > 2:
                    raise OSError

        self._write_error_test(OSError, BadList())

        class BadItem:
            def __str__(self):
                raise OSError

        self._write_error_test(OSError, [BadItem()])

    def test_write_bigfield(self):
        # This exercises the buffer realloc functionality
        bigstring = 'X' * 50000
        self._write_test([bigstring, bigstring],
                         '%s,%s' % (bigstring, bigstring))

    def test_write_quoting(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"')
        self._write_error_test(csv.Error, ['a', 1, 'p,q'],
                               quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         quoting=csv.QUOTE_MINIMAL)
        self._write_test(['a', 1, 'p,q'], '"a",1,"p,q"',
                         quoting=csv.QUOTE_NONNUMERIC)
        self._write_test(['a', 1, 'p,q'], '"a","1","p,q"',
                         quoting=csv.QUOTE_ALL)
        self._write_test(['a\nb', 1], '"a\nb","1"',
                         quoting=csv.QUOTE_ALL)
        self._write_test(['a', '', None, 1], '"a","",,1',
                         quoting=csv.QUOTE_STRINGS)
        self._write_test(['a', '', None, 1], '"a","",,"1"',
                         quoting=csv.QUOTE_NOTNULL)

    def test_write_escape(self):
        self._write_test(['a', 1, 'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self._write_error_test(csv.Error, ['a', 1, 'p,"q"'],
                               escapechar=None, doublequote=False)
        self._write_test(['a', 1, 'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a', 1, 'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['\\', 'a'], '\\\\,a',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['\\', 'a'], '\\\\,a',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['\\', 'a'], '"\\\\","a"',
                         escapechar='\\', quoting=csv.QUOTE_ALL)
        self._write_test(['\\ ', 'a'], '\\\\ ,a',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['\\,', 'a'], '\\\\\\,,a',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test([',\\', 'a'], '",\\\\",a',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['C\\', '6', '7', 'X"'], 'C\\\\,6,7,"X"""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)

    def test_write_lineterminator(self):
        for lineterminator in '\r\n', '\n', '\r', '!@#', '\0':
            with self.subTest(lineterminator=lineterminator):
                with StringIO() as sio:
                    writer = csv.writer(sio, lineterminator=lineterminator)
                    writer.writerow(['a', 'b'])
                    writer.writerow([1, 2])
                    self.assertEqual(sio.getvalue(),
                                     f'a,b{lineterminator}'
                                     f'1,2{lineterminator}')

    def test_write_iterable(self):
        self._write_test(iter(['a', 1, 'p,q']), 'a,1,"p,q"')
        self._write_test(iter(['a', 1, None]), 'a,1,')
        self._write_test(iter([]), '')
        self._write_test(iter([None]), '""')
        self._write_error_test(csv.Error, iter([None]), quoting=csv.QUOTE_NONE)
        self._write_test(iter([None, None]), ',')

    def test_writerows(self):
        class BrokenFile:
            def write(self, buf):
                raise OSError

        writer = csv.writer(BrokenFile())
        self.assertRaises(OSError, writer.writerows, [['a']])

        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a', 'b'], ['c', 'd']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")

    def test_writerows_with_none(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([['a', None], [None, 'd']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,\r\n,d\r\n")

        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([[None], ['a']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), '""\r\na\r\n')

        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([['a'], [None]])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), 'a\r\n""\r\n')

    def test_writerows_errors(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            self.assertRaises(OSError, writer.writerows, BadIterable())

    @support.cpython_only
    @support.requires_legacy_unicode_capi()
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_writerows_legacy_strings(self):
        import _testcapi
        c = _testcapi.unicode_legacy_string('a')
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([[c]])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a\r\n")

    def _read_test(self, input, expect, **kwargs):
        """Parse the iterable *input* with ``csv.reader(**kwargs)`` and
        compare the list of rows with *expect*."""
        reader = csv.reader(input, **kwargs)
        result = list(reader)
        self.assertEqual(result, expect)

    def test_read_oddinputs(self):
        self._read_test([], [])
        self._read_test([''], [[]])
        self.assertRaises(csv.Error, self._read_test,
                          ['"ab"c'], None, strict=1)
        self._read_test(['"ab"c'], [['abc']], doublequote=0)

        self.assertRaises(csv.Error, self._read_test,
                          [b'abc'], None)

    def test_read_eol(self):
        self._read_test(['a,b'], [['a', 'b']])
        self._read_test(['a,b\n'], [['a', 'b']])
        self._read_test(['a,b\r\n'], [['a', 'b']])
        self._read_test(['a,b\r'], [['a', 'b']])
        self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])

    def test_read_eof(self):
        self._read_test(['a,"'], [['a', '']])
        self._read_test(['"a'], [['a']])
        self._read_test(['^'], [['\n']], escapechar='^')
        self.assertRaises(csv.Error, self._read_test, ['a,"'], [], strict=True)
        self.assertRaises(csv.Error, self._read_test, ['"a'], [], strict=True)
        self.assertRaises(csv.Error, self._read_test,
                          ['^'], [], escapechar='^', strict=True)

    def test_read_nul(self):
        self._read_test(['\0'], [['\0']])
        self._read_test(['a,\0b,c'], [['a', '\0b', 'c']])
        self._read_test(['a,b\0,c'], [['a', 'b\0', 'c']])
        self._read_test(['a,b\\\0,c'], [['a', 'b\0', 'c']], escapechar='\\')
        self._read_test(['a,"\0b",c'], [['a', '\0b', 'c']])

    def test_read_delimiter(self):
        self._read_test(['a,b,c'], [['a', 'b', 'c']])
        self._read_test(['a;b;c'], [['a', 'b', 'c']], delimiter=';')
        self._read_test(['a\0b\0c'], [['a', 'b', 'c']], delimiter='\0')

    def test_read_escape(self):
        self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
        self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
        self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')
        self._read_test(['a,^b,c'], [['a', 'b', 'c']], escapechar='^')
        self._read_test(['a,\0b,c'], [['a', 'b', 'c']], escapechar='\0')
        self._read_test(['a,\\b,c'], [['a', '\\b', 'c']], escapechar=None)
        self._read_test(['a,\\b,c'], [['a', '\\b', 'c']])

    def test_read_quoting(self):
        self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quotechar=None, escapechar='\\')
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quoting=csv.QUOTE_NONE, escapechar='\\')
        # will this fail where locale uses comma for decimals?
        self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                        quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
        self.assertRaises(ValueError, self._read_test,
                          ['abc,3'], [[]],
                          quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['1,@,3,@,5'], [['1', ',3,', '5']], quotechar='@')
        self._read_test(['1,\0,3,\0,5'], [['1', ',3,', '5']], quotechar='\0')

    def test_read_skipinitialspace(self):
        self._read_test(['no space, space, spaces,\ttab'],
                        [['no space', 'space', 'spaces', '\ttab']],
                        skipinitialspace=True)

    def test_read_bigfield(self):
        # This exercises the buffer realloc functionality and field size
        # limits.
        limit = csv.field_size_limit()
        try:
            size = 50000
            bigstring = 'X' * size
            bigline = '%s,%s' % (bigstring, bigstring)
            self._read_test([bigline], [[bigstring, bigstring]])
            csv.field_size_limit(size)
            self._read_test([bigline], [[bigstring, bigstring]])
            self.assertEqual(csv.field_size_limit(), size)
            csv.field_size_limit(size - 1)
            self.assertRaises(csv.Error, self._read_test, [bigline], [])
            self.assertRaises(TypeError, csv.field_size_limit, None)
            self.assertRaises(TypeError, csv.field_size_limit, 1, None)
        finally:
            # The limit is process-wide; always restore the saved value.
            csv.field_size_limit(limit)

    def test_read_linenum(self):
        r = csv.reader(['line,1', 'line,2', 'line,3'])
        self.assertEqual(r.line_num, 0)
        next(r)
        self.assertEqual(r.line_num, 1)
        next(r)
        self.assertEqual(r.line_num, 2)
        next(r)
        self.assertEqual(r.line_num, 3)
        self.assertRaises(StopIteration, next, r)
        self.assertEqual(r.line_num, 3)

    def test_roundtrip_quoteed_newlines(self):
        """Rows with embedded newlines survive a default write/read cycle."""
        rows = [['a\nb', 'b'], ['c', 'x\r\nd']]
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows(rows)
            fileobj.seek(0)
            # Compare the complete list: a row-by-row loop would pass
            # silently if the reader produced fewer rows than were written.
            self.assertEqual(list(csv.reader(fileobj)), rows)

    def test_roundtrip_escaped_unquoted_newlines(self):
        """Rows with embedded newlines survive an escaped, unquoted cycle."""
        rows = [['a\nb', 'b'], ['c', 'x\r\nd']]
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj, quoting=csv.QUOTE_NONE,
                                escapechar="\\")
            writer.writerows(rows)
            fileobj.seek(0)
            reader = csv.reader(fileobj, quoting=csv.QUOTE_NONE,
                                escapechar="\\")
            # Whole-list comparison also catches missing rows.
            self.assertEqual(list(reader), rows)
422
class TestDialectRegistry(unittest.TestCase):
    """Tests for registering, looking up and applying csv dialects."""

    def test_registry_badargs(self):
        """Registry functions reject missing, mistyped and unknown arguments."""
        self.assertRaises(TypeError, csv.list_dialects, None)
        self.assertRaises(TypeError, csv.get_dialect)
        self.assertRaises(csv.Error, csv.get_dialect, None)
        self.assertRaises(csv.Error, csv.get_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.unregister_dialect)
        self.assertRaises(csv.Error, csv.unregister_dialect, None)
        self.assertRaises(csv.Error, csv.unregister_dialect, "nonesuch")
        self.assertRaises(TypeError, csv.register_dialect, None)
        self.assertRaises(TypeError, csv.register_dialect, None, None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch", 0, 0)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          badargument=None)
        self.assertRaises(TypeError, csv.register_dialect, "nonesuch",
                          quoting=None)
        self.assertRaises(TypeError, csv.register_dialect, [])

    def test_registry(self):
        """A registered dialect class is listed and can be looked up by name."""
        class myexceltsv(csv.excel):
            delimiter = "\t"

        name = "myexceltsv"
        expected_dialects = csv.list_dialects() + [name]
        expected_dialects.sort()
        csv.register_dialect(name, myexceltsv)
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, '\t')
        got_dialects = sorted(csv.list_dialects())
        self.assertEqual(expected_dialects, got_dialects)

    def test_register_kwargs(self):
        """A dialect can be registered from keyword options alone."""
        name = 'fedcba'
        csv.register_dialect(name, delimiter=';')
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, ';')
        self.assertEqual([['X', 'Y', 'Z']], list(csv.reader(['X;Y;Z'], name)))

    def test_register_kwargs_override(self):
        class mydialect(csv.Dialect):
            delimiter = "\t"
            quotechar = '"'
            doublequote = True
            skipinitialspace = False
            lineterminator = '\r\n'
            quoting = csv.QUOTE_MINIMAL

        name = 'test_dialect'
        csv.register_dialect(name, mydialect,
                             delimiter=';',
                             quotechar="'",
                             doublequote=False,
                             skipinitialspace=True,
                             lineterminator='\n',
                             quoting=csv.QUOTE_ALL)
        self.addCleanup(csv.unregister_dialect, name)

        # Ensure that kwargs do override attributes of a dialect class:
        dialect = csv.get_dialect(name)
        self.assertEqual(dialect.delimiter, ';')
        self.assertEqual(dialect.quotechar, "'")
        self.assertEqual(dialect.doublequote, False)
        self.assertEqual(dialect.skipinitialspace, True)
        self.assertEqual(dialect.lineterminator, '\n')
        self.assertEqual(dialect.quoting, csv.QUOTE_ALL)

    def test_incomplete_dialect(self):
        """Instantiating a Dialect subclass that sets only a delimiter fails."""
        class myexceltsv(csv.Dialect):
            delimiter = "\t"

        self.assertRaises(csv.Error, myexceltsv)

    def test_space_dialect(self):
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("abc def\nc1ccccc1 benzene\n")
            fileobj.seek(0)
            reader = csv.reader(fileobj, dialect=space())
            self.assertEqual(next(reader), ["abc", "def"])
            self.assertEqual(next(reader), ["c1ccccc1", "benzene"])

    def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):
        """Write the row ``[1, 2, 3]`` with a writer built from the given
        arguments and compare the file content with *expected*."""
        with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
            writer = csv.writer(fileobj, *writeargs, **kwwriteargs)
            writer.writerow([1, 2, 3])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)

    def test_dialect_apply(self):
        """A dialect may be given as a class, an instance, a registered name
        or None, and keyword options override it."""
        class testA(csv.excel):
            delimiter = "\t"

        class testB(csv.excel):
            delimiter = ":"

        class testC(csv.excel):
            delimiter = "|"

        class testUni(csv.excel):
            delimiter = "\u039B"

        class unspecified:
            # A class to pass as dialect but with no dialect attributes.
            pass

        csv.register_dialect('testC', testC)
        # Same cleanup idiom as the other registry tests; registered
        # immediately so the global registry is restored on any failure.
        self.addCleanup(csv.unregister_dialect, 'testC')

        self.compare_dialect_123("1,2,3\r\n")
        self.compare_dialect_123("1,2,3\r\n", dialect=None)
        self.compare_dialect_123("1,2,3\r\n", dialect=unspecified)
        self.compare_dialect_123("1\t2\t3\r\n", testA)
        self.compare_dialect_123("1:2:3\r\n", dialect=testB())
        self.compare_dialect_123("1|2|3\r\n", dialect='testC')
        self.compare_dialect_123("1;2;3\r\n", dialect=testA,
                                 delimiter=';')
        self.compare_dialect_123("1\u039B2\u039B3\r\n",
                                 dialect=testUni)

    def test_bad_dialect(self):
        # Unknown parameter
        self.assertRaises(TypeError, csv.reader, [], bad_attr=0)
        # Bad values
        self.assertRaises(TypeError, csv.reader, [], delimiter=None)
        self.assertRaises(TypeError, csv.reader, [], quoting=-1)
        self.assertRaises(TypeError, csv.reader, [], quoting=100)

    def test_copy(self):
        """Dialect objects returned by get_dialect() cannot be copied."""
        for name in csv.list_dialects():
            dialect = csv.get_dialect(name)
            self.assertRaises(TypeError, copy.copy, dialect)

    def test_pickle(self):
        """Dialect objects returned by get_dialect() cannot be pickled."""
        for name in csv.list_dialects():
            dialect = csv.get_dialect(name)
            for proto in range(pickle.HIGHEST_PROTOCOL + 1):
                self.assertRaises(TypeError, pickle.dumps, dialect, proto)
563
class TestCsvBase(unittest.TestCase):
    """Shared helpers for dialect tests; both read ``self.dialect``, which
    this class does not define itself."""

    def readerAssertEqual(self, input, expected_result):
        """Store *input* in a temporary file, parse it with ``self.dialect``
        and compare the parsed rows with *expected_result*."""
        with TemporaryFile("w+", encoding="utf-8", newline='') as stream:
            stream.write(input)
            stream.seek(0)
            parsed_rows = list(csv.reader(stream, dialect=self.dialect))
            self.assertEqual(parsed_rows, expected_result)

    def writerAssertEqual(self, input, expected_result):
        """Write the rows in *input* with ``self.dialect`` and compare the
        resulting file text with *expected_result*."""
        with TemporaryFile("w+", encoding="utf-8", newline='') as stream:
            csv.writer(stream, dialect=self.dialect).writerows(input)
            stream.seek(0)
            written_text = stream.read()
            self.assertEqual(written_text, expected_result)
579
class TestDialectExcel(TestCsvBase):
    """Reader and writer checks run with the registered 'excel' dialect."""

    dialect = 'excel'

    def test_single(self):
        """One bare field gives one single-field row."""
        self.readerAssertEqual('abc', [['abc']])

    def test_simple(self):
        """Comma-separated digits are split into string fields."""
        expected_row = ['1', '2', '3', '4', '5']
        self.readerAssertEqual('1,2,3,4,5', [expected_row])

    def test_blankline(self):
        """Empty input gives no rows at all."""
        self.readerAssertEqual('', [])

    def test_empty_fields(self):
        """A lone delimiter gives two empty fields."""
        self.readerAssertEqual(',', [['', '']])

    def test_singlequoted(self):
        """An empty quoted field gives one empty string."""
        self.readerAssertEqual('""', [['']])

    def test_singlequoted_left_empty(self):
        self.readerAssertEqual('"",', [['', '']])

    def test_singlequoted_right_empty(self):
        self.readerAssertEqual(',""', [['', '']])

    def test_single_quoted_quote(self):
        """A doubled quote inside quotes gives one quote character."""
        self.readerAssertEqual('""""', [['"']])

    def test_quoted_quotes(self):
        self.readerAssertEqual('""""""', [['""']])

    def test_inline_quote(self):
        """Quotes in the middle of an unquoted field are kept as they are."""
        self.readerAssertEqual('a""b', [['a""b']])

    def test_inline_quotes(self):
        self.readerAssertEqual('a"b"c', [['a"b"c']])

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        self.readerAssertEqual('"a"b', [['ab']])

    def test_lone_quote(self):
        self.readerAssertEqual('a"b', [['a"b']])

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        self.readerAssertEqual('"a" "b"', [['a "b"']])

    def test_space_and_quote(self):
        self.readerAssertEqual(' "a"', [[' "a"']])

    def test_quoted(self):
        """A quoted field may contain the delimiter."""
        text = '1,2,3,"I think, therefore I am",5,6'
        expected_row = ['1', '2', '3', 'I think, therefore I am', '5', '6']
        self.readerAssertEqual(text, [expected_row])

    def test_quoted_quote(self):
        """Doubled quotes inside a quoted field collapse to single quotes."""
        text = ('1,2,3,"""I see,"" said the blind man",'
                '"as he picked up his hammer and saw"')
        expected_row = ['1', '2', '3',
                        '"I see," said the blind man',
                        'as he picked up his hammer and saw']
        self.readerAssertEqual(text, [expected_row])

    def test_quoted_nl(self):
        """Newlines inside quoted fields stay part of the field."""
        text = ('1,2,3,"""I see,""\n'
                'said the blind man","as he picked up his\n'
                'hammer and saw"\n'
                '9,8,7,6')
        first_row = ['1', '2', '3',
                     '"I see,"\nsaid the blind man',
                     'as he picked up his\nhammer and saw']
        second_row = ['9', '8', '7', '6']
        self.readerAssertEqual(text, [first_row, second_row])

    def test_dubious_quote(self):
        self.readerAssertEqual('12,12,1",', [['12', '12', '1"', '']])

    def test_null(self):
        """Writing no rows produces an empty file."""
        self.writerAssertEqual([], '')

    def test_single_writer(self):
        self.writerAssertEqual([['abc']], 'abc\r\n')

    def test_simple_writer(self):
        """Numbers and plain strings are written without quotes."""
        row = [1, 2, 'abc', 3, 4]
        self.writerAssertEqual([row], '1,2,abc,3,4\r\n')

    def test_quotes(self):
        """A field containing quotes is quoted and its quotes are doubled."""
        row = [1, 2, 'a"bc"', 3, 4]
        self.writerAssertEqual([row], '1,2,"a""bc""",3,4\r\n')

    def test_quote_fieldsep(self):
        """A field containing the delimiter is quoted."""
        self.writerAssertEqual([['abc,def']], '"abc,def"\r\n')

    def test_newlines(self):
        """A field containing a newline is quoted."""
        row = [1, 2, 'a\nbc', 3, 4]
        self.writerAssertEqual([row], '1,2,"a\nbc",3,4\r\n')
676
class EscapedExcel(csv.excel):
    """``csv.excel`` with quoting turned off and a backslash escape character."""

    escapechar = '\\'
    quoting = csv.QUOTE_NONE
680
class TestEscapedExcel(TestCsvBase):
    """Delimiter escaping checks run with an ``EscapedExcel`` instance."""

    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        """The writer escapes a delimiter found inside a field."""
        rows = [['abc,def']]
        self.writerAssertEqual(rows, 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        """The reader turns an escaped delimiter back into a plain one."""
        text = 'abc\\,def\r\n'
        self.readerAssertEqual(text, [['abc,def']])
689
class TestDialectUnix(TestCsvBase):
    """Reader and writer checks run with the registered 'unix' dialect."""

    dialect = 'unix'

    def test_simple_writer(self):
        """Every field, numbers included, is written quoted and ends with LF."""
        row = [1, 'abc def', 'abc']
        self.writerAssertEqual([row], '"1","abc def","abc"\n')

    def test_simple_reader(self):
        """Fully quoted input is read back as plain strings."""
        text = '"1","abc def","abc"\n'
        self.readerAssertEqual(text, [['1', 'abc def', 'abc']])
698
class QuotedEscapedExcel(csv.excel):
    """``csv.excel`` with QUOTE_NONNUMERIC quoting and a backslash escape character."""

    escapechar = '\\'
    quoting = csv.QUOTE_NONNUMERIC
702
class TestQuotedEscapedExcel(TestCsvBase):
    """Delimiter handling checks run with a ``QuotedEscapedExcel`` instance."""

    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        """A field containing the delimiter is quoted rather than escaped."""
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        """An escaped delimiter inside quotes is read as a plain delimiter."""
        text = '"abc\\,def"\r\n'
        self.readerAssertEqual(text, [['abc,def']])
711
712 class ESC[4;38;5;81mTestDictFields(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
713 ### "long" means the row is longer than the number of fieldnames
714 ### "short" means there are fewer elements in the row than fieldnames
715 def test_writeheader_return_value(self):
716 with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
717 writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
718 writeheader_return_value = writer.writeheader()
719 self.assertEqual(writeheader_return_value, 10)
720
721 def test_write_simple_dict(self):
722 with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
723 writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
724 writer.writeheader()
725 fileobj.seek(0)
726 self.assertEqual(fileobj.readline(), "f1,f2,f3\r\n")
727 writer.writerow({"f1": 10, "f3": "abc"})
728 fileobj.seek(0)
729 fileobj.readline() # header
730 self.assertEqual(fileobj.read(), "10,,abc\r\n")
731
732 def test_write_multiple_dict_rows(self):
733 fileobj = StringIO()
734 writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
735 writer.writeheader()
736 self.assertEqual(fileobj.getvalue(), "f1,f2,f3\r\n")
737 writer.writerows([{"f1": 1, "f2": "abc", "f3": "f"},
738 {"f1": 2, "f2": 5, "f3": "xyz"}])
739 self.assertEqual(fileobj.getvalue(),
740 "f1,f2,f3\r\n1,abc,f\r\n2,5,xyz\r\n")
741
742 def test_write_no_fields(self):
743 fileobj = StringIO()
744 self.assertRaises(TypeError, csv.DictWriter, fileobj)
745
746 def test_write_fields_not_in_fieldnames(self):
747 with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
748 writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
749 # Of special note is the non-string key (issue 19449)
750 with self.assertRaises(ValueError) as cx:
751 writer.writerow({"f4": 10, "f2": "spam", 1: "abc"})
752 exception = str(cx.exception)
753 self.assertIn("fieldnames", exception)
754 self.assertIn("'f4'", exception)
755 self.assertNotIn("'f2'", exception)
756 self.assertIn("1", exception)
757
758 def test_typo_in_extrasaction_raises_error(self):
759 fileobj = StringIO()
760 self.assertRaises(ValueError, csv.DictWriter, fileobj, ['f1', 'f2'],
761 extrasaction="raised")
762
763 def test_write_field_not_in_field_names_raise(self):
764 fileobj = StringIO()
765 writer = csv.DictWriter(fileobj, ['f1', 'f2'], extrasaction="raise")
766 dictrow = {'f0': 0, 'f1': 1, 'f2': 2, 'f3': 3}
767 self.assertRaises(ValueError, csv.DictWriter.writerow, writer, dictrow)
768
769 # see bpo-44512 (differently cased 'raise' should not result in 'ignore')
770 writer = csv.DictWriter(fileobj, ['f1', 'f2'], extrasaction="RAISE")
771 self.assertRaises(ValueError, csv.DictWriter.writerow, writer, dictrow)
772
773 def test_write_field_not_in_field_names_ignore(self):
774 fileobj = StringIO()
775 writer = csv.DictWriter(fileobj, ['f1', 'f2'], extrasaction="ignore")
776 dictrow = {'f0': 0, 'f1': 1, 'f2': 2, 'f3': 3}
777 csv.DictWriter.writerow(writer, dictrow)
778 self.assertEqual(fileobj.getvalue(), "1,2\r\n")
779
780 # bpo-44512
781 writer = csv.DictWriter(fileobj, ['f1', 'f2'], extrasaction="IGNORE")
782 csv.DictWriter.writerow(writer, dictrow)
783
784 def test_dict_reader_fieldnames_accepts_iter(self):
785 fieldnames = ["a", "b", "c"]
786 f = StringIO()
787 reader = csv.DictReader(f, iter(fieldnames))
788 self.assertEqual(reader.fieldnames, fieldnames)
789
790 def test_dict_reader_fieldnames_accepts_list(self):
791 fieldnames = ["a", "b", "c"]
792 f = StringIO()
793 reader = csv.DictReader(f, fieldnames)
794 self.assertEqual(reader.fieldnames, fieldnames)
795
796 def test_dict_writer_fieldnames_rejects_iter(self):
797 fieldnames = ["a", "b", "c"]
798 f = StringIO()
799 writer = csv.DictWriter(f, iter(fieldnames))
800 self.assertEqual(writer.fieldnames, fieldnames)
801
802 def test_dict_writer_fieldnames_accepts_list(self):
803 fieldnames = ["a", "b", "c"]
804 f = StringIO()
805 writer = csv.DictWriter(f, fieldnames)
806 self.assertEqual(writer.fieldnames, fieldnames)
807
808 def test_dict_reader_fieldnames_is_optional(self):
809 f = StringIO()
810 reader = csv.DictReader(f, fieldnames=None)
811
812 def test_read_dict_fields(self):
813 with TemporaryFile("w+", encoding="utf-8") as fileobj:
814 fileobj.write("1,2,abc\r\n")
815 fileobj.seek(0)
816 reader = csv.DictReader(fileobj,
817 fieldnames=["f1", "f2", "f3"])
818 self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
819
820 def test_read_dict_no_fieldnames(self):
821 with TemporaryFile("w+", encoding="utf-8") as fileobj:
822 fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
823 fileobj.seek(0)
824 reader = csv.DictReader(fileobj)
825 self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
826 self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
827
828 # Two test cases to make sure existing ways of implicitly setting
829 # fieldnames continue to work. Both arise from discussion in issue3436.
830 def test_read_dict_fieldnames_from_file(self):
831 with TemporaryFile("w+", encoding="utf-8") as fileobj:
832 fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
833 fileobj.seek(0)
834 reader = csv.DictReader(fileobj,
835 fieldnames=next(csv.reader(fileobj)))
836 self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
837 self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
838
839 def test_read_dict_fieldnames_chain(self):
840 import itertools
841 with TemporaryFile("w+", encoding="utf-8") as fileobj:
842 fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
843 fileobj.seek(0)
844 reader = csv.DictReader(fileobj)
845 first = next(reader)
846 for row in itertools.chain([first], reader):
847 self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
848 self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})
849
850 def test_read_long(self):
851 with TemporaryFile("w+", encoding="utf-8") as fileobj:
852 fileobj.write("1,2,abc,4,5,6\r\n")
853 fileobj.seek(0)
854 reader = csv.DictReader(fileobj,
855 fieldnames=["f1", "f2"])
856 self.assertEqual(next(reader), {"f1": '1', "f2": '2',
857 None: ["abc", "4", "5", "6"]})
858
859 def test_read_long_with_rest(self):
860 with TemporaryFile("w+", encoding="utf-8") as fileobj:
861 fileobj.write("1,2,abc,4,5,6\r\n")
862 fileobj.seek(0)
863 reader = csv.DictReader(fileobj,
864 fieldnames=["f1", "f2"], restkey="_rest")
865 self.assertEqual(next(reader), {"f1": '1', "f2": '2',
866 "_rest": ["abc", "4", "5", "6"]})
867
868 def test_read_long_with_rest_no_fieldnames(self):
869 with TemporaryFile("w+", encoding="utf-8") as fileobj:
870 fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
871 fileobj.seek(0)
872 reader = csv.DictReader(fileobj, restkey="_rest")
873 self.assertEqual(reader.fieldnames, ["f1", "f2"])
874 self.assertEqual(next(reader), {"f1": '1', "f2": '2',
875 "_rest": ["abc", "4", "5", "6"]})
876
877 def test_read_short(self):
878 with TemporaryFile("w+", encoding="utf-8") as fileobj:
879 fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
880 fileobj.seek(0)
881 reader = csv.DictReader(fileobj,
882 fieldnames="1 2 3 4 5 6".split(),
883 restval="DEFAULT")
884 self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
885 "4": '4', "5": '5', "6": '6'})
886 self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
887 "4": 'DEFAULT', "5": 'DEFAULT',
888 "6": 'DEFAULT'})
889
890 def test_read_multi(self):
891 sample = [
892 '2147483648,43.0e12,17,abc,def\r\n',
893 '147483648,43.0e2,17,abc,def\r\n',
894 '47483648,43.0,170,abc,def\r\n'
895 ]
896
897 reader = csv.DictReader(sample,
898 fieldnames="i1 float i2 s1 s2".split())
899 self.assertEqual(next(reader), {"i1": '2147483648',
900 "float": '43.0e12',
901 "i2": '17',
902 "s1": 'abc',
903 "s2": 'def'})
904
905 def test_read_with_blanks(self):
906 reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
907 "1,2,abc,4,5,6\r\n"],
908 fieldnames="1 2 3 4 5 6".split())
909 self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
910 "4": '4', "5": '5', "6": '6'})
911 self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
912 "4": '4', "5": '5', "6": '6'})
913
914 def test_read_semi_sep(self):
915 reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
916 fieldnames="1 2 3 4 5 6".split(),
917 delimiter=';')
918 self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
919 "4": '4', "5": '5', "6": '6'})
920
class TestArrayWrites(unittest.TestCase):
    # csv.writer must accept array.array instances as rows.

    def _check_writerow(self, row, expected):
        # Write *row* with the "excel" dialect to a temporary file and
        # compare the complete file contents with *expected*.
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(row)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)

    def test_int_write(self):
        import array
        contents = [(20-i) for i in range(20)]
        a = array.array('i', contents)
        self._check_writerow(a, ",".join([str(i) for i in a])+"\r\n")

    def test_double_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('d', contents)
        self._check_writerow(a, ",".join([str(i) for i in a])+"\r\n")

    def test_float_write(self):
        import array
        contents = [(20-i)*0.1 for i in range(20)]
        a = array.array('f', contents)
        # The expectation is built from the array's own (single
        # precision) items, not from ``contents``.
        self._check_writerow(a, ",".join([str(i) for i in a])+"\r\n")

    def test_char_write(self):
        import array, string
        a = array.array('u', string.ascii_letters)
        self._check_writerow(a, ",".join(a)+"\r\n")
966
967 class ESC[4;38;5;81mTestDialectValidity(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
968 def test_quoting(self):
969 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
970 delimiter = ";"
971 escapechar = '\\'
972 doublequote = False
973 skipinitialspace = True
974 lineterminator = '\r\n'
975 quoting = csv.QUOTE_NONE
976 d = mydialect()
977 self.assertEqual(d.quoting, csv.QUOTE_NONE)
978
979 mydialect.quoting = None
980 self.assertRaises(csv.Error, mydialect)
981
982 mydialect.doublequote = True
983 mydialect.quoting = csv.QUOTE_ALL
984 mydialect.quotechar = '"'
985 d = mydialect()
986 self.assertEqual(d.quoting, csv.QUOTE_ALL)
987 self.assertEqual(d.quotechar, '"')
988 self.assertTrue(d.doublequote)
989
990 mydialect.quotechar = ""
991 with self.assertRaises(csv.Error) as cm:
992 mydialect()
993 self.assertEqual(str(cm.exception),
994 '"quotechar" must be a 1-character string')
995
996 mydialect.quotechar = "''"
997 with self.assertRaises(csv.Error) as cm:
998 mydialect()
999 self.assertEqual(str(cm.exception),
1000 '"quotechar" must be a 1-character string')
1001
1002 mydialect.quotechar = 4
1003 with self.assertRaises(csv.Error) as cm:
1004 mydialect()
1005 self.assertEqual(str(cm.exception),
1006 '"quotechar" must be string or None, not int')
1007
1008 def test_delimiter(self):
1009 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
1010 delimiter = ";"
1011 escapechar = '\\'
1012 doublequote = False
1013 skipinitialspace = True
1014 lineterminator = '\r\n'
1015 quoting = csv.QUOTE_NONE
1016 d = mydialect()
1017 self.assertEqual(d.delimiter, ";")
1018
1019 mydialect.delimiter = ":::"
1020 with self.assertRaises(csv.Error) as cm:
1021 mydialect()
1022 self.assertEqual(str(cm.exception),
1023 '"delimiter" must be a 1-character string')
1024
1025 mydialect.delimiter = ""
1026 with self.assertRaises(csv.Error) as cm:
1027 mydialect()
1028 self.assertEqual(str(cm.exception),
1029 '"delimiter" must be a 1-character string')
1030
1031 mydialect.delimiter = b","
1032 with self.assertRaises(csv.Error) as cm:
1033 mydialect()
1034 self.assertEqual(str(cm.exception),
1035 '"delimiter" must be string, not bytes')
1036
1037 mydialect.delimiter = 4
1038 with self.assertRaises(csv.Error) as cm:
1039 mydialect()
1040 self.assertEqual(str(cm.exception),
1041 '"delimiter" must be string, not int')
1042
1043 mydialect.delimiter = None
1044 with self.assertRaises(csv.Error) as cm:
1045 mydialect()
1046 self.assertEqual(str(cm.exception),
1047 '"delimiter" must be string, not NoneType')
1048
1049 def test_escapechar(self):
1050 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
1051 delimiter = ";"
1052 escapechar = '\\'
1053 doublequote = False
1054 skipinitialspace = True
1055 lineterminator = '\r\n'
1056 quoting = csv.QUOTE_NONE
1057 d = mydialect()
1058 self.assertEqual(d.escapechar, "\\")
1059
1060 mydialect.escapechar = ""
1061 with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'):
1062 mydialect()
1063
1064 mydialect.escapechar = "**"
1065 with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'):
1066 mydialect()
1067
1068 mydialect.escapechar = b"*"
1069 with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not bytes'):
1070 mydialect()
1071
1072 mydialect.escapechar = 4
1073 with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not int'):
1074 mydialect()
1075
1076 def test_lineterminator(self):
1077 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
1078 delimiter = ";"
1079 escapechar = '\\'
1080 doublequote = False
1081 skipinitialspace = True
1082 lineterminator = '\r\n'
1083 quoting = csv.QUOTE_NONE
1084 d = mydialect()
1085 self.assertEqual(d.lineterminator, '\r\n')
1086
1087 mydialect.lineterminator = ":::"
1088 d = mydialect()
1089 self.assertEqual(d.lineterminator, ":::")
1090
1091 mydialect.lineterminator = 4
1092 with self.assertRaises(csv.Error) as cm:
1093 mydialect()
1094 self.assertEqual(str(cm.exception),
1095 '"lineterminator" must be a string')
1096
1097 def test_invalid_chars(self):
1098 def create_invalid(field_name, value):
1099 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
1100 pass
1101 setattr(mydialect, field_name, value)
1102 d = mydialect()
1103
1104 for field_name in ("delimiter", "escapechar", "quotechar"):
1105 with self.subTest(field_name=field_name):
1106 self.assertRaises(csv.Error, create_invalid, field_name, "")
1107 self.assertRaises(csv.Error, create_invalid, field_name, "abc")
1108 self.assertRaises(csv.Error, create_invalid, field_name, b'x')
1109 self.assertRaises(csv.Error, create_invalid, field_name, 5)
1110
1111
1112 class ESC[4;38;5;81mTestSniffer(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1113 sample1 = """\
1114 Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
1115 Shark City, Glendale Heights, IL, 12/28/02, Prezence
1116 Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
1117 Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
1118 """
1119 sample2 = """\
1120 'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
1121 'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
1122 'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
1123 'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
1124 """
1125 header1 = '''\
1126 "venue","city","state","date","performers"
1127 '''
1128 sample3 = '''\
1129 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
1130 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
1131 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
1132 '''
1133
1134 sample4 = '''\
1135 2147483648;43.0e12;17;abc;def
1136 147483648;43.0e2;17;abc;def
1137 47483648;43.0;170;abc;def
1138 '''
1139
1140 sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
1141 sample6 = "a|b|c\r\nd|e|f\r\n"
1142 sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"
1143
1144 # Issue 18155: Use a delimiter that is a special char to regex:
1145
1146 header2 = '''\
1147 "venue"+"city"+"state"+"date"+"performers"
1148 '''
1149 sample8 = """\
1150 Harry's+ Arlington Heights+ IL+ 2/1/03+ Kimi Hayes
1151 Shark City+ Glendale Heights+ IL+ 12/28/02+ Prezence
1152 Tommy's Place+ Blue Island+ IL+ 12/28/02+ Blue Sunday/White Crow
1153 Stonecutters Seafood and Chop House+ Lemont+ IL+ 12/19/02+ Week Back
1154 """
1155 sample9 = """\
1156 'Harry''s'+ Arlington Heights'+ 'IL'+ '2/1/03'+ 'Kimi Hayes'
1157 'Shark City'+ Glendale Heights'+' IL'+ '12/28/02'+ 'Prezence'
1158 'Tommy''s Place'+ Blue Island'+ 'IL'+ '12/28/02'+ 'Blue Sunday/White Crow'
1159 'Stonecutters ''Seafood'' and Chop House'+ 'Lemont'+ 'IL'+ '12/19/02'+ 'Week Back'
1160 """
1161
1162 sample10 = dedent("""
1163 abc,def
1164 ghijkl,mno
1165 ghi,jkl
1166 """)
1167
1168 sample11 = dedent("""
1169 abc,def
1170 ghijkl,mnop
1171 ghi,jkl
1172 """)
1173
1174 sample12 = dedent(""""time","forces"
1175 1,1.5
1176 0.5,5+0j
1177 0,0
1178 1+1j,6
1179 """)
1180
1181 sample13 = dedent(""""time","forces"
1182 0,0
1183 1,2
1184 a,b
1185 """)
1186
1187 sample14 = """\
1188 abc\0def
1189 ghijkl\0mno
1190 ghi\0jkl
1191 """
1192
1193 def test_issue43625(self):
1194 sniffer = csv.Sniffer()
1195 self.assertTrue(sniffer.has_header(self.sample12))
1196 self.assertFalse(sniffer.has_header(self.sample13))
1197
1198 def test_has_header_strings(self):
1199 "More to document existing (unexpected?) behavior than anything else."
1200 sniffer = csv.Sniffer()
1201 self.assertFalse(sniffer.has_header(self.sample10))
1202 self.assertFalse(sniffer.has_header(self.sample11))
1203
1204 def test_has_header(self):
1205 sniffer = csv.Sniffer()
1206 self.assertIs(sniffer.has_header(self.sample1), False)
1207 self.assertIs(sniffer.has_header(self.header1 + self.sample1), True)
1208
1209 def test_has_header_regex_special_delimiter(self):
1210 sniffer = csv.Sniffer()
1211 self.assertIs(sniffer.has_header(self.sample8), False)
1212 self.assertIs(sniffer.has_header(self.header2 + self.sample8), True)
1213
1214 def test_guess_quote_and_delimiter(self):
1215 sniffer = csv.Sniffer()
1216 for header in (";'123;4';", "'123;4';", ";'123;4'", "'123;4'"):
1217 with self.subTest(header):
1218 dialect = sniffer.sniff(header, ",;")
1219 self.assertEqual(dialect.delimiter, ';')
1220 self.assertEqual(dialect.quotechar, "'")
1221 self.assertIs(dialect.doublequote, False)
1222 self.assertIs(dialect.skipinitialspace, False)
1223
1224 def test_sniff(self):
1225 sniffer = csv.Sniffer()
1226 dialect = sniffer.sniff(self.sample1)
1227 self.assertEqual(dialect.delimiter, ",")
1228 self.assertEqual(dialect.quotechar, '"')
1229 self.assertIs(dialect.skipinitialspace, True)
1230
1231 dialect = sniffer.sniff(self.sample2)
1232 self.assertEqual(dialect.delimiter, ":")
1233 self.assertEqual(dialect.quotechar, "'")
1234 self.assertIs(dialect.skipinitialspace, False)
1235
1236 def test_delimiters(self):
1237 sniffer = csv.Sniffer()
1238 dialect = sniffer.sniff(self.sample3)
1239 # given that all three lines in sample3 are equal,
1240 # I think that any character could have been 'guessed' as the
1241 # delimiter, depending on dictionary order
1242 self.assertIn(dialect.delimiter, self.sample3)
1243 dialect = sniffer.sniff(self.sample3, delimiters="?,")
1244 self.assertEqual(dialect.delimiter, "?")
1245 dialect = sniffer.sniff(self.sample3, delimiters="/,")
1246 self.assertEqual(dialect.delimiter, "/")
1247 dialect = sniffer.sniff(self.sample4)
1248 self.assertEqual(dialect.delimiter, ";")
1249 dialect = sniffer.sniff(self.sample5)
1250 self.assertEqual(dialect.delimiter, "\t")
1251 dialect = sniffer.sniff(self.sample6)
1252 self.assertEqual(dialect.delimiter, "|")
1253 dialect = sniffer.sniff(self.sample7)
1254 self.assertEqual(dialect.delimiter, "|")
1255 self.assertEqual(dialect.quotechar, "'")
1256 dialect = sniffer.sniff(self.sample8)
1257 self.assertEqual(dialect.delimiter, '+')
1258 dialect = sniffer.sniff(self.sample9)
1259 self.assertEqual(dialect.delimiter, '+')
1260 self.assertEqual(dialect.quotechar, "'")
1261 dialect = sniffer.sniff(self.sample14)
1262 self.assertEqual(dialect.delimiter, '\0')
1263
1264 def test_doublequote(self):
1265 sniffer = csv.Sniffer()
1266 dialect = sniffer.sniff(self.header1)
1267 self.assertFalse(dialect.doublequote)
1268 dialect = sniffer.sniff(self.header2)
1269 self.assertFalse(dialect.doublequote)
1270 dialect = sniffer.sniff(self.sample2)
1271 self.assertTrue(dialect.doublequote)
1272 dialect = sniffer.sniff(self.sample8)
1273 self.assertFalse(dialect.doublequote)
1274 dialect = sniffer.sniff(self.sample9)
1275 self.assertTrue(dialect.doublequote)
1276
class NUL:
    # Minimal write-only sink: accepts any arguments and discards them.
    def write(self, *args):
        pass
    writelines = write
1281
@unittest.skipUnless(hasattr(sys, "gettotalrefcount"),
                     'requires sys.gettotalrefcount()')
class TestLeaks(unittest.TestCase):
    # Every test repeats an operation 20 times, sampling the interpreter's
    # total reference count at the start of each iteration (after a
    # gc.collect()).  ``delta`` is the difference between the two most
    # recent samples; only its final value is checked.

    def test_create_read(self):
        delta = 0
        lastrc = sys.gettotalrefcount()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            csv.reader(["a,b,c\r\n"])
            csv.reader(["a,b,c\r\n"])
            csv.reader(["a,b,c\r\n"])
            delta = rc-lastrc
            lastrc = rc
        # if csv.reader() leaks, last delta should be 3 or more
        self.assertLess(delta, 3)

    def test_create_write(self):
        delta = 0
        lastrc = sys.gettotalrefcount()
        s = NUL()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            csv.writer(s)
            csv.writer(s)
            csv.writer(s)
            delta = rc-lastrc
            lastrc = rc
        # if csv.writer() leaks, last delta should be 3 or more
        self.assertLess(delta, 3)

    def test_read(self):
        delta = 0
        rows = ["a,b,c\r\n"]*5
        lastrc = sys.gettotalrefcount()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            rdr = csv.reader(rows)
            for row in rdr:
                pass
            delta = rc-lastrc
            lastrc = rc
        # if reader leaks during read, delta should be 5 or more
        self.assertLess(delta, 5)

    def test_write(self):
        delta = 0
        rows = [[1,2,3]]*5
        s = NUL()
        lastrc = sys.gettotalrefcount()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            writer = csv.writer(s)
            for row in rows:
                writer.writerow(row)
            delta = rc-lastrc
            lastrc = rc
        # if writer leaks during write, last delta should be 5 or more
        self.assertLess(delta, 5)
1348
class TestUnicode(unittest.TestCase):
    # Round-trips non-ASCII text through a UTF-8 encoded temporary file.

    names = ["Martin von Löwis",
             "Marc André Lemburg",
             "Guido van Rossum",
             "François Pinard"]

    def test_unicode_read(self):
        with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
            fileobj.write(",".join(self.names) + "\r\n")
            fileobj.seek(0)
            reader = csv.reader(fileobj)
            self.assertEqual(list(reader), [self.names])


    def test_unicode_write(self):
        with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
            writer = csv.writer(fileobj)
            writer.writerow(self.names)
            expected = ",".join(self.names)+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
1371
class KeyOrderingTest(unittest.TestCase):

    def test_ordering_for_the_dict_reader_and_writer(self):
        # For every ordering of five keys, the header written by
        # DictWriter must be read back by DictReader in the same order.
        resultset = set()
        for keys in permutations("abcde"):
            with TemporaryFile('w+', newline='', encoding="utf-8") as fileobject:
                dw = csv.DictWriter(fileobject, keys)
                dw.writeheader()
                fileobject.seek(0)
                dr = csv.DictReader(fileobject)
                kt = tuple(dr.fieldnames)
                self.assertEqual(keys, kt)
                resultset.add(kt)
        # Final sanity check: were all permutations unique?
        self.assertEqual(len(resultset), 120, "Key ordering: some key permutations not collected (expected 120)")

    def test_ordered_dict_reader(self):
        # The sample contains a header, a normal row, an over-long row,
        # a blank line, a short row and another normal row.
        data = dedent('''\
            FirstName,LastName
            Eric,Idle
            Graham,Chapman,Over1,Over2

            Under1
            John,Cleese
        ''').splitlines()

        self.assertEqual(list(csv.DictReader(data)),
            [OrderedDict([('FirstName', 'Eric'), ('LastName', 'Idle')]),
             OrderedDict([('FirstName', 'Graham'), ('LastName', 'Chapman'),
                          (None, ['Over1', 'Over2'])]),
             OrderedDict([('FirstName', 'Under1'), ('LastName', None)]),
             OrderedDict([('FirstName', 'John'), ('LastName', 'Cleese')]),
            ])

        self.assertEqual(list(csv.DictReader(data, restkey='OtherInfo')),
            [OrderedDict([('FirstName', 'Eric'), ('LastName', 'Idle')]),
             OrderedDict([('FirstName', 'Graham'), ('LastName', 'Chapman'),
                          ('OtherInfo', ['Over1', 'Over2'])]),
             OrderedDict([('FirstName', 'Under1'), ('LastName', None)]),
             OrderedDict([('FirstName', 'John'), ('LastName', 'Cleese')]),
            ])

        del data[0]            # Remove the header row
        self.assertEqual(list(csv.DictReader(data, fieldnames=['fname', 'lname'])),
            [OrderedDict([('fname', 'Eric'), ('lname', 'Idle')]),
             OrderedDict([('fname', 'Graham'), ('lname', 'Chapman'),
                          (None, ['Over1', 'Over2'])]),
             OrderedDict([('fname', 'Under1'), ('lname', None)]),
             OrderedDict([('fname', 'John'), ('lname', 'Cleese')]),
            ])
1422
1423
1424 class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1425 def test__all__(self):
1426 extra = {'__doc__', '__version__'}
1427 support.check__all__(self, csv, ('csv', '_csv'), extra=extra)
1428
1429 def test_subclassable(self):
1430 # issue 44089
1431 class ESC[4;38;5;81mFoo(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mError): ...
1432
1433 @support.cpython_only
1434 def test_disallow_instantiation(self):
1435 _csv = import_helper.import_module("_csv")
1436 for tp in _csv.Reader, _csv.Writer:
1437 with self.subTest(tp=tp):
1438 check_disallow_instantiation(self, tp)
1439
# Run the whole module's tests when executed as a script.
if __name__ == '__main__':
    unittest.main()