# Copyright (C) 2001,2002 Python Software Foundation
# csv package unit tests
3
4 import copy
5 import sys
6 import unittest
7 from io import StringIO
8 from tempfile import TemporaryFile
9 import csv
10 import gc
11 import pickle
12 from test import support
13 from test.support import warnings_helper, import_helper, check_disallow_instantiation
14 from itertools import permutations
15 from textwrap import dedent
16 from collections import OrderedDict
17
18
class BadIterable:
    """Object whose ``__iter__`` always raises OSError."""

    def __iter__(self):
        raise OSError()
22
23
class Test_Csv(unittest.TestCase):
    """
    Test the underlying C csv parser in ways that are not appropriate
    from the high level interface. Further tests of this nature are done
    in TestDialectRegistry.
    """
    def _test_arg_valid(self, ctor, arg):
        # Shared constructor-argument checks for csv.reader and csv.writer.
        # ctor is the factory under test and arg is the first argument the
        # callers pass to it; every call below is expected to be rejected.
        self.assertRaises(TypeError, ctor)
        self.assertRaises(TypeError, ctor, None)
        self.assertRaises(TypeError, ctor, arg, bad_attr=0)
        self.assertRaises(TypeError, ctor, arg, delimiter=0)
        self.assertRaises(TypeError, ctor, arg, delimiter='XX')
        self.assertRaises(csv.Error, ctor, arg, 'foo')
        self.assertRaises(TypeError, ctor, arg, delimiter=None)
        self.assertRaises(TypeError, ctor, arg, delimiter=1)
        self.assertRaises(TypeError, ctor, arg, quotechar=1)
        self.assertRaises(TypeError, ctor, arg, lineterminator=None)
        self.assertRaises(TypeError, ctor, arg, lineterminator=1)
        self.assertRaises(TypeError, ctor, arg, quoting=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar='')
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_ALL, quotechar=None)
        self.assertRaises(TypeError, ctor, arg,
                          quoting=csv.QUOTE_NONE, quotechar='')

    def test_reader_arg_valid(self):
        self._test_arg_valid(csv.reader, [])
        # An error raised by the argument's __iter__ must reach the caller.
        self.assertRaises(OSError, csv.reader, BadIterable())

    def test_writer_arg_valid(self):
        self._test_arg_valid(csv.writer, StringIO())
        # An error raised while fetching the "write" attribute must reach
        # the caller.
        class BadWriter:
            @property
            def write(self):
                raise OSError
        self.assertRaises(OSError, csv.writer, BadWriter())

    def _test_default_attrs(self, ctor, *args):
        # Build an object with ctor(*args) and inspect its .dialect.
        obj = ctor(*args)
        # Check defaults
        self.assertEqual(obj.dialect.delimiter, ',')
        self.assertIs(obj.dialect.doublequote, True)
        self.assertEqual(obj.dialect.escapechar, None)
        self.assertEqual(obj.dialect.lineterminator, "\r\n")
        self.assertEqual(obj.dialect.quotechar, '"')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_MINIMAL)
        self.assertIs(obj.dialect.skipinitialspace, False)
        self.assertIs(obj.dialect.strict, False)
        # Try deleting or changing attributes (they are read-only)
        self.assertRaises(AttributeError, delattr, obj.dialect, 'delimiter')
        self.assertRaises(AttributeError, setattr, obj.dialect, 'delimiter', ':')
        self.assertRaises(AttributeError, delattr, obj.dialect, 'quoting')
        self.assertRaises(AttributeError, setattr, obj.dialect,
                          'quoting', None)

    def test_reader_attrs(self):
        self._test_default_attrs(csv.reader, [])

    def test_writer_attrs(self):
        self._test_default_attrs(csv.writer, StringIO())

    def _test_kw_attrs(self, ctor, *args):
        # Now try with alternate options
        kwargs = dict(delimiter=':', doublequote=False, escapechar='\\',
                      lineterminator='\r', quotechar='*',
                      quoting=csv.QUOTE_NONE, skipinitialspace=True,
                      strict=True)
        obj = ctor(*args, **kwargs)
        # Each keyword must be reflected on the resulting dialect.
        self.assertEqual(obj.dialect.delimiter, ':')
        self.assertIs(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '\\')
        self.assertEqual(obj.dialect.lineterminator, "\r")
        self.assertEqual(obj.dialect.quotechar, '*')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_NONE)
        self.assertIs(obj.dialect.skipinitialspace, True)
        self.assertIs(obj.dialect.strict, True)

    def test_reader_kw_attrs(self):
        self._test_kw_attrs(csv.reader, [])

    def test_writer_kw_attrs(self):
        self._test_kw_attrs(csv.writer, StringIO())

    def _test_dialect_attrs(self, ctor, *args):
        # Now try with dialect-derived options
        class dialect:
            delimiter='-'
            doublequote=False
            escapechar='^'
            lineterminator='$'
            quotechar='#'
            quoting=csv.QUOTE_ALL
            skipinitialspace=True
            strict=False
        # The dialect class is passed as the last positional argument.
        args = args + (dialect,)
        obj = ctor(*args)
        self.assertEqual(obj.dialect.delimiter, '-')
        self.assertIs(obj.dialect.doublequote, False)
        self.assertEqual(obj.dialect.escapechar, '^')
        self.assertEqual(obj.dialect.lineterminator, "$")
        self.assertEqual(obj.dialect.quotechar, '#')
        self.assertEqual(obj.dialect.quoting, csv.QUOTE_ALL)
        self.assertIs(obj.dialect.skipinitialspace, True)
        self.assertIs(obj.dialect.strict, False)

    def test_reader_dialect_attrs(self):
        self._test_dialect_attrs(csv.reader, [])

    def test_writer_dialect_attrs(self):
        self._test_dialect_attrs(csv.writer, StringIO())


    def _write_test(self, fields, expect, **kwargs):
        # Write fields as one row with csv.writer(**kwargs) and check that
        # the file holds expect followed by the dialect's lineterminator.
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj, **kwargs)
            writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(),
                             expect + writer.dialect.lineterminator)

    def _write_error_test(self, exc, fields, **kwargs):
        # Writing fields must raise exc and leave the file empty.
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj, **kwargs)
            with self.assertRaises(exc):
                writer.writerow(fields)
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), '')

    def test_write_arg_valid(self):
        self._write_error_test(csv.Error, None)
        self._write_test((), '')
        self._write_test([None], '""')
        self._write_error_test(csv.Error, [None], quoting=csv.QUOTE_NONE)
        # Check that exceptions are passed up the chain
        self._write_error_test(OSError, BadIterable())
        class BadList:
            def __len__(self):
                return 10
            def __getitem__(self, i):
                if i > 2:
                    raise OSError
        self._write_error_test(OSError, BadList())
        class BadItem:
            def __str__(self):
                raise OSError
        self._write_error_test(OSError, [BadItem()])

    def test_write_bigfield(self):
        # This exercises the buffer realloc functionality
        bigstring = 'X' * 50000
        self._write_test([bigstring, bigstring],
                         '%s,%s' % (bigstring, bigstring))

    def test_write_quoting(self):
        # The same row written under each quoting mode.
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"')
        self._write_error_test(csv.Error, ['a',1,'p,q'],
                               quoting=csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         quoting=csv.QUOTE_MINIMAL)
        self._write_test(['a',1,'p,q'], '"a",1,"p,q"',
                         quoting=csv.QUOTE_NONNUMERIC)
        self._write_test(['a',1,'p,q'], '"a","1","p,q"',
                         quoting=csv.QUOTE_ALL)
        self._write_test(['a\nb',1], '"a\nb","1"',
                         quoting=csv.QUOTE_ALL)

    def test_write_escape(self):
        # Interaction of escapechar with doublequote and the quoting modes.
        self._write_test(['a',1,'p,q'], 'a,1,"p,q"',
                         escapechar='\\')
        self._write_error_test(csv.Error, ['a',1,'p,"q"'],
                               escapechar=None, doublequote=False)
        self._write_test(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
                         escapechar='\\', doublequote=False)
        self._write_test(['"'], '""""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL,
                         doublequote=False)
        self._write_test(['"'], '\\"',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['a',1,'p,q'], 'a,1,p\\,q',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['\\', 'a'], '\\\\,a',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test(['\\', 'a'], '\\\\,a',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['\\', 'a'], '"\\\\","a"',
                         escapechar='\\', quoting=csv.QUOTE_ALL)
        self._write_test(['\\ ', 'a'], '\\\\ ,a',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['\\,', 'a'], '\\\\\\,,a',
                         escapechar='\\', quoting=csv.QUOTE_NONE)
        self._write_test([',\\', 'a'], '",\\\\",a',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)
        self._write_test(['C\\', '6', '7', 'X"'], 'C\\\\,6,7,"X"""',
                         escapechar='\\', quoting=csv.QUOTE_MINIMAL)

    def test_write_lineterminator(self):
        # Each terminator, including multi-character and NUL ones, must be
        # appended verbatim after every row.
        for lineterminator in '\r\n', '\n', '\r', '!@#', '\0':
            with self.subTest(lineterminator=lineterminator):
                with StringIO() as sio:
                    writer = csv.writer(sio, lineterminator=lineterminator)
                    writer.writerow(['a', 'b'])
                    writer.writerow([1, 2])
                    self.assertEqual(sio.getvalue(),
                                     f'a,b{lineterminator}'
                                     f'1,2{lineterminator}')

    def test_write_iterable(self):
        # Rows may be arbitrary iterators, not just sequences.
        self._write_test(iter(['a', 1, 'p,q']), 'a,1,"p,q"')
        self._write_test(iter(['a', 1, None]), 'a,1,')
        self._write_test(iter([]), '')
        self._write_test(iter([None]), '""')
        self._write_error_test(csv.Error, iter([None]), quoting=csv.QUOTE_NONE)
        self._write_test(iter([None, None]), ',')

    def test_writerows(self):
        # An error raised by the file's write() must surface from writerows.
        class BrokenFile:
            def write(self, buf):
                raise OSError
        writer = csv.writer(BrokenFile())
        self.assertRaises(OSError, writer.writerows, [['a']])

        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a', 'b'], ['c', 'd']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")

    def test_writerows_with_none(self):
        # None is written as an empty field; a row consisting of a single
        # None is written as "".
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([['a', None], [None, 'd']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,\r\n,d\r\n")

        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([[None], ['a']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), '""\r\na\r\n')

        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([['a'], [None]])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), 'a\r\n""\r\n')

    def test_writerows_errors(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            self.assertRaises(OSError, writer.writerows, BadIterable())

    @support.cpython_only
    @support.requires_legacy_unicode_capi
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_writerows_legacy_strings(self):
        # The string under test comes from _testcapi.unicode_legacy_string.
        import _testcapi
        c = _testcapi.unicode_legacy_string('a')
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            writer.writerows([[c]])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a\r\n")

    def _read_test(self, input, expect, **kwargs):
        # Parse the iterable input with csv.reader(**kwargs) and compare
        # the full list of rows with expect.
        reader = csv.reader(input, **kwargs)
        result = list(reader)
        self.assertEqual(result, expect)

    def test_read_oddinputs(self):
        self._read_test([], [])
        self._read_test([''], [[]])
        self.assertRaises(csv.Error, self._read_test,
                          ['"ab"c'], None, strict=1)
        self._read_test(['"ab"c'], [['abc']], doublequote=0)

        # bytes lines are rejected
        self.assertRaises(csv.Error, self._read_test,
                          [b'abc'], None)

    def test_read_eol(self):
        # A trailing line ending is accepted; one embedded in the middle of
        # an input line is an error.
        self._read_test(['a,b'], [['a','b']])
        self._read_test(['a,b\n'], [['a','b']])
        self._read_test(['a,b\r\n'], [['a','b']])
        self._read_test(['a,b\r'], [['a','b']])
        self.assertRaises(csv.Error, self._read_test, ['a,b\rc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\nc,d'], [])
        self.assertRaises(csv.Error, self._read_test, ['a,b\r\nc,d'], [])

    def test_read_eof(self):
        # Input ending inside a quoted field or right after an escape
        # character is tolerated by default and rejected with strict=True.
        self._read_test(['a,"'], [['a', '']])
        self._read_test(['"a'], [['a']])
        self._read_test(['^'], [['\n']], escapechar='^')
        self.assertRaises(csv.Error, self._read_test, ['a,"'], [], strict=True)
        self.assertRaises(csv.Error, self._read_test, ['"a'], [], strict=True)
        self.assertRaises(csv.Error, self._read_test,
                          ['^'], [], escapechar='^', strict=True)

    def test_read_nul(self):
        # NUL characters are kept as field data.
        self._read_test(['\0'], [['\0']])
        self._read_test(['a,\0b,c'], [['a', '\0b', 'c']])
        self._read_test(['a,b\0,c'], [['a', 'b\0', 'c']])
        self._read_test(['a,b\\\0,c'], [['a', 'b\0', 'c']], escapechar='\\')
        self._read_test(['a,"\0b",c'], [['a', '\0b', 'c']])

    def test_read_delimiter(self):
        self._read_test(['a,b,c'], [['a', 'b', 'c']])
        self._read_test(['a;b;c'], [['a', 'b', 'c']], delimiter=';')
        self._read_test(['a\0b\0c'], [['a', 'b', 'c']], delimiter='\0')

    def test_read_escape(self):
        self._read_test(['a,\\b,c'], [['a', 'b', 'c']], escapechar='\\')
        self._read_test(['a,b\\,c'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b\\,c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,\\c"'], [['a', 'b,c']], escapechar='\\')
        self._read_test(['a,"b,c\\""'], [['a', 'b,c"']], escapechar='\\')
        self._read_test(['a,"b,c"\\'], [['a', 'b,c\\']], escapechar='\\')
        self._read_test(['a,^b,c'], [['a', 'b', 'c']], escapechar='^')
        self._read_test(['a,\0b,c'], [['a', 'b', 'c']], escapechar='\0')
        # Without an escapechar the backslash is ordinary data.
        self._read_test(['a,\\b,c'], [['a', '\\b', 'c']], escapechar=None)
        self._read_test(['a,\\b,c'], [['a', '\\b', 'c']])

    def test_read_quoting(self):
        self._read_test(['1,",3,",5'], [['1', ',3,', '5']])
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quotechar=None, escapechar='\\')
        self._read_test(['1,",3,",5'], [['1', '"', '3', '"', '5']],
                        quoting=csv.QUOTE_NONE, escapechar='\\')
        # will this fail where locale uses comma for decimals?
        self._read_test([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
                        quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['"a\nb", 7'], [['a\nb', ' 7']])
        self.assertRaises(ValueError, self._read_test,
                          ['abc,3'], [[]],
                          quoting=csv.QUOTE_NONNUMERIC)
        self._read_test(['1,@,3,@,5'], [['1', ',3,', '5']], quotechar='@')
        self._read_test(['1,\0,3,\0,5'], [['1', ',3,', '5']], quotechar='\0')

    def test_read_skipinitialspace(self):
        # Only spaces after the delimiter are skipped; the tab is kept.
        self._read_test(['no space, space, spaces,\ttab'],
                        [['no space', 'space', 'spaces', '\ttab']],
                        skipinitialspace=True)

    def test_read_bigfield(self):
        # This exercises the buffer realloc functionality and field size
        # limits.
        limit = csv.field_size_limit()
        try:
            size = 50000
            bigstring = 'X' * size
            bigline = '%s,%s' % (bigstring, bigstring)
            self._read_test([bigline], [[bigstring, bigstring]])
            csv.field_size_limit(size)
            self._read_test([bigline], [[bigstring, bigstring]])
            self.assertEqual(csv.field_size_limit(), size)
            csv.field_size_limit(size-1)
            self.assertRaises(csv.Error, self._read_test, [bigline], [])
            self.assertRaises(TypeError, csv.field_size_limit, None)
            self.assertRaises(TypeError, csv.field_size_limit, 1, None)
        finally:
            # Restore the module-wide limit saved above.
            csv.field_size_limit(limit)

    def test_read_linenum(self):
        # line_num starts at 0, grows by one per consumed line and is
        # unchanged once the reader is exhausted.
        r = csv.reader(['line,1', 'line,2', 'line,3'])
        self.assertEqual(r.line_num, 0)
        next(r)
        self.assertEqual(r.line_num, 1)
        next(r)
        self.assertEqual(r.line_num, 2)
        next(r)
        self.assertEqual(r.line_num, 3)
        self.assertRaises(StopIteration, next, r)
        self.assertEqual(r.line_num, 3)

    def test_roundtrip_quoteed_newlines(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj)
            rows = [['a\nb','b'],['c','x\r\nd']]
            writer.writerows(rows)
            fileobj.seek(0)
            # Compare the complete result: a per-row loop would pass
            # silently if the reader yielded fewer rows than were written.
            self.assertEqual(list(csv.reader(fileobj)), rows)

    def test_roundtrip_escaped_unquoted_newlines(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj,quoting=csv.QUOTE_NONE,escapechar="\\")
            rows = [['a\nb','b'],['c','x\r\nd']]
            writer.writerows(rows)
            fileobj.seek(0)
            reader = csv.reader(fileobj,quoting=csv.QUOTE_NONE,escapechar="\\")
            # Compare the complete result: a per-row loop would pass
            # silently if the reader yielded fewer rows than were written.
            self.assertEqual(list(reader), rows)
418
class TestDialectRegistry(unittest.TestCase):
    """Checks for the csv dialect registry and for dialect selection."""

    def test_registry_badargs(self):
        # Each entry is (expected exception, function, args, kwargs); the
        # calls are made in the listed order.
        rejected = (
            (TypeError, csv.list_dialects, (None,), {}),
            (TypeError, csv.get_dialect, (), {}),
            (csv.Error, csv.get_dialect, (None,), {}),
            (csv.Error, csv.get_dialect, ("nonesuch",), {}),
            (TypeError, csv.unregister_dialect, (), {}),
            (csv.Error, csv.unregister_dialect, (None,), {}),
            (csv.Error, csv.unregister_dialect, ("nonesuch",), {}),
            (TypeError, csv.register_dialect, (None,), {}),
            (TypeError, csv.register_dialect, (None, None), {}),
            (TypeError, csv.register_dialect, ("nonesuch", 0, 0), {}),
            (TypeError, csv.register_dialect, ("nonesuch",),
             {"badargument": None}),
            (TypeError, csv.register_dialect, ("nonesuch",),
             {"quoting": None}),
            (TypeError, csv.register_dialect, ([],), {}),
        )
        for exc_type, func, args, kwargs in rejected:
            with self.assertRaises(exc_type):
                func(*args, **kwargs)

    def test_registry(self):
        # A registered dialect class can be fetched by name and shows up
        # in the listing.
        class myexceltsv(csv.excel):
            delimiter = "\t"

        name = "myexceltsv"
        expected_dialects = sorted(csv.list_dialects() + [name])
        csv.register_dialect(name, myexceltsv)
        self.addCleanup(csv.unregister_dialect, name)
        registered = csv.get_dialect(name)
        self.assertEqual(registered.delimiter, '\t')
        self.assertEqual(expected_dialects, sorted(csv.list_dialects()))

    def test_register_kwargs(self):
        # A dialect may be registered from keyword arguments alone.
        name = 'fedcba'
        csv.register_dialect(name, delimiter=';')
        self.addCleanup(csv.unregister_dialect, name)
        self.assertEqual(csv.get_dialect(name).delimiter, ';')
        parsed = list(csv.reader(['X;Y;Z'], name))
        self.assertEqual([['X', 'Y', 'Z']], parsed)

    def test_register_kwargs_override(self):
        class mydialect(csv.Dialect):
            delimiter = "\t"
            quotechar = '"'
            doublequote = True
            skipinitialspace = False
            lineterminator = '\r\n'
            quoting = csv.QUOTE_MINIMAL

        name = 'test_dialect'
        overrides = dict(delimiter=';',
                         quotechar="'",
                         doublequote=False,
                         skipinitialspace=True,
                         lineterminator='\n',
                         quoting=csv.QUOTE_ALL)
        csv.register_dialect(name, mydialect, **overrides)
        self.addCleanup(csv.unregister_dialect, name)

        # Ensure that kwargs do override attributes of a dialect class:
        dialect = csv.get_dialect(name)
        for attr, value in overrides.items():
            self.assertEqual(getattr(dialect, attr), value)

    def test_incomplete_dialect(self):
        # Instantiating a Dialect subclass that sets only the delimiter
        # must fail.
        class myexceltsv(csv.Dialect):
            delimiter = "\t"

        with self.assertRaises(csv.Error):
            myexceltsv()

    def test_space_dialect(self):
        class space(csv.excel):
            delimiter = " "
            quoting = csv.QUOTE_NONE
            escapechar = "\\"

        with TemporaryFile("w+", encoding="utf-8") as tmp:
            tmp.write("abc def\nc1ccccc1 benzene\n")
            tmp.seek(0)
            rows = csv.reader(tmp, dialect=space())
            first = next(rows)
            self.assertEqual(first, ["abc", "def"])
            second = next(rows)
            self.assertEqual(second, ["c1ccccc1", "benzene"])

    def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):
        # Helper: write the row [1, 2, 3] through a writer built with the
        # given extra arguments and compare the file content with expected.
        with TemporaryFile("w+", newline='', encoding="utf-8") as tmp:
            csv.writer(tmp, *writeargs, **kwwriteargs).writerow([1,2,3])
            tmp.seek(0)
            written = tmp.read()
            self.assertEqual(written, expected)

    def test_dialect_apply(self):
        class testA(csv.excel):
            delimiter = "\t"

        class testB(csv.excel):
            delimiter = ":"

        class testC(csv.excel):
            delimiter = "|"

        class testUni(csv.excel):
            delimiter = "\u039B"

        class unspecified():
            # A class to pass as dialect but with no dialect attributes.
            pass

        csv.register_dialect('testC', testC)
        self.addCleanup(csv.unregister_dialect, 'testC')

        check = self.compare_dialect_123
        check("1,2,3\r\n")
        check("1,2,3\r\n", dialect=None)
        check("1,2,3\r\n", dialect=unspecified)
        check("1\t2\t3\r\n", testA)
        check("1:2:3\r\n", dialect=testB())
        check("1|2|3\r\n", dialect='testC')
        check("1;2;3\r\n", dialect=testA, delimiter=';')
        check("1\u039B2\u039B3\r\n", dialect=testUni)

    def test_bad_dialect(self):
        # Unknown parameter
        with self.assertRaises(TypeError):
            csv.reader([], bad_attr=0)
        # Bad values
        for bad in (dict(delimiter=None), dict(quoting=-1), dict(quoting=100)):
            with self.assertRaises(TypeError):
                csv.reader([], **bad)

    def test_copy(self):
        # Registered dialect objects must refuse to be copied.
        for dialect in map(csv.get_dialect, csv.list_dialects()):
            with self.assertRaises(TypeError):
                copy.copy(dialect)

    def test_pickle(self):
        # Registered dialect objects must refuse pickling at every protocol.
        protocols = range(pickle.HIGHEST_PROTOCOL + 1)
        for name in csv.list_dialects():
            dialect = csv.get_dialect(name)
            for proto in protocols:
                with self.assertRaises(TypeError):
                    pickle.dumps(dialect, proto)
559
class TestCsvBase(unittest.TestCase):
    """Shared helpers for dialect tests.

    Both helpers read ``self.dialect``, which this class does not define.
    """

    def readerAssertEqual(self, input, expected_result):
        # Store input in a temporary file, parse it with self.dialect and
        # compare all parsed rows with expected_result.
        with TemporaryFile("w+", encoding="utf-8", newline='') as tmp:
            tmp.write(input)
            tmp.seek(0)
            parsed = list(csv.reader(tmp, dialect=self.dialect))
            self.assertEqual(parsed, expected_result)

    def writerAssertEqual(self, input, expected_result):
        # Write the rows in input with self.dialect to a temporary file and
        # compare the resulting text with expected_result.
        with TemporaryFile("w+", encoding="utf-8", newline='') as tmp:
            csv.writer(tmp, dialect=self.dialect).writerows(input)
            tmp.seek(0)
            written = tmp.read()
            self.assertEqual(written, expected_result)
575
class TestDialectExcel(TestCsvBase):
    """Reader and writer expectations for the 'excel' dialect."""
    dialect = 'excel'

    def test_single(self):
        rows = [['abc']]
        self.readerAssertEqual('abc', rows)

    def test_simple(self):
        rows = [['1','2','3','4','5']]
        self.readerAssertEqual('1,2,3,4,5', rows)

    def test_blankline(self):
        rows = []
        self.readerAssertEqual('', rows)

    def test_empty_fields(self):
        rows = [['', '']]
        self.readerAssertEqual(',', rows)

    def test_singlequoted(self):
        rows = [['']]
        self.readerAssertEqual('""', rows)

    def test_singlequoted_left_empty(self):
        rows = [['','']]
        self.readerAssertEqual('"",', rows)

    def test_singlequoted_right_empty(self):
        rows = [['','']]
        self.readerAssertEqual(',""', rows)

    def test_single_quoted_quote(self):
        rows = [['"']]
        self.readerAssertEqual('""""', rows)

    def test_quoted_quotes(self):
        rows = [['""']]
        self.readerAssertEqual('""""""', rows)

    def test_inline_quote(self):
        rows = [['a""b']]
        self.readerAssertEqual('a""b', rows)

    def test_inline_quotes(self):
        rows = [['a"b"c']]
        self.readerAssertEqual('a"b"c', rows)

    def test_quotes_and_more(self):
        # Excel would never write a field containing '"a"b', but when
        # reading one, it will return 'ab'.
        rows = [['ab']]
        self.readerAssertEqual('"a"b', rows)

    def test_lone_quote(self):
        rows = [['a"b']]
        self.readerAssertEqual('a"b', rows)

    def test_quote_and_quote(self):
        # Excel would never write a field containing '"a" "b"', but when
        # reading one, it will return 'a "b"'.
        rows = [['a "b"']]
        self.readerAssertEqual('"a" "b"', rows)

    def test_space_and_quote(self):
        rows = [[' "a"']]
        self.readerAssertEqual(' "a"', rows)

    def test_quoted(self):
        text = '1,2,3,"I think, therefore I am",5,6'
        rows = [['1', '2', '3', 'I think, therefore I am', '5', '6']]
        self.readerAssertEqual(text, rows)

    def test_quoted_quote(self):
        text = '1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"'
        rows = [['1', '2', '3',
                 '"I see," said the blind man',
                 'as he picked up his hammer and saw']]
        self.readerAssertEqual(text, rows)

    def test_quoted_nl(self):
        # Quoted fields may span physical lines.
        input = '''\
1,2,3,"""I see,""
said the blind man","as he picked up his
hammer and saw"
9,8,7,6'''
        first = ['1', '2', '3',
                 '"I see,"\nsaid the blind man',
                 'as he picked up his\nhammer and saw']
        second = ['9','8','7','6']
        self.readerAssertEqual(input, [first, second])

    def test_dubious_quote(self):
        rows = [['12', '12', '1"', '']]
        self.readerAssertEqual('12,12,1",', rows)

    def test_null(self):
        output = ''
        self.writerAssertEqual([], output)

    def test_single_writer(self):
        output = 'abc\r\n'
        self.writerAssertEqual([['abc']], output)

    def test_simple_writer(self):
        output = '1,2,abc,3,4\r\n'
        self.writerAssertEqual([[1, 2, 'abc', 3, 4]], output)

    def test_quotes(self):
        output = '1,2,"a""bc""",3,4\r\n'
        self.writerAssertEqual([[1, 2, 'a"bc"', 3, 4]], output)

    def test_quote_fieldsep(self):
        output = '"abc,def"\r\n'
        self.writerAssertEqual([['abc,def']], output)

    def test_newlines(self):
        output = '1,2,"a\nbc",3,4\r\n'
        self.writerAssertEqual([[1, 2, 'a\nbc', 3, 4]], output)
672
class EscapedExcel(csv.excel):
    """csv.excel variant with quoting disabled and a backslash escapechar."""
    escapechar = '\\'
    quoting = csv.QUOTE_NONE
676
class TestEscapedExcel(TestCsvBase):
    """Delimiter escaping with an EscapedExcel dialect instance."""
    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        text = 'abc\\,def\r\n'
        self.readerAssertEqual(text, [['abc,def']])
685
class TestDialectUnix(TestCsvBase):
    """Reader and writer expectations for the 'unix' dialect."""
    dialect = 'unix'

    def test_simple_writer(self):
        rows = [[1, 'abc def', 'abc']]
        self.writerAssertEqual(rows, '"1","abc def","abc"\n')

    def test_simple_reader(self):
        text = '"1","abc def","abc"\n'
        self.readerAssertEqual(text, [['1', 'abc def', 'abc']])
694
class QuotedEscapedExcel(csv.excel):
    """csv.excel variant with QUOTE_NONNUMERIC and a backslash escapechar."""
    escapechar = '\\'
    quoting = csv.QUOTE_NONNUMERIC
698
class TestQuotedEscapedExcel(TestCsvBase):
    """Delimiter handling with a QuotedEscapedExcel dialect instance."""
    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        text = '"abc\\,def"\r\n'
        self.readerAssertEqual(text, [['abc,def']])
707
class TestDictFields(unittest.TestCase):
    """Tests for csv.DictWriter and csv.DictReader."""
    ### "long" means the row is longer than the number of fieldnames
    ### "short" means there are fewer elements in the row than fieldnames
    def test_writeheader_return_value(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
            writeheader_return_value = writer.writeheader()
            # 10 is the length of the header line "f1,f2,f3\r\n".
            self.assertEqual(writeheader_return_value, 10)

    def test_write_simple_dict(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
            writer.writeheader()
            fileobj.seek(0)
            self.assertEqual(fileobj.readline(), "f1,f2,f3\r\n")
            # "f2" is absent from the dict and is written as an empty field.
            writer.writerow({"f1": 10, "f3": "abc"})
            fileobj.seek(0)
            fileobj.readline() # header
            self.assertEqual(fileobj.read(), "10,,abc\r\n")

    def test_write_multiple_dict_rows(self):
        fileobj = StringIO()
        writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
        writer.writeheader()
        self.assertEqual(fileobj.getvalue(), "f1,f2,f3\r\n")
        writer.writerows([{"f1": 1, "f2": "abc", "f3": "f"},
                          {"f1": 2, "f2": 5, "f3": "xyz"}])
        self.assertEqual(fileobj.getvalue(),
                         "f1,f2,f3\r\n1,abc,f\r\n2,5,xyz\r\n")

    def test_write_no_fields(self):
        # Constructing a DictWriter without fieldnames is a TypeError.
        fileobj = StringIO()
        self.assertRaises(TypeError, csv.DictWriter, fileobj)

    def test_write_fields_not_in_fieldnames(self):
        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.DictWriter(fileobj, fieldnames=["f1", "f2", "f3"])
            # Of special note is the non-string key (issue 19449)
            with self.assertRaises(ValueError) as cx:
                writer.writerow({"f4": 10, "f2": "spam", 1: "abc"})
            exception = str(cx.exception)
            # The message names the unknown keys but not the known "f2".
            self.assertIn("fieldnames", exception)
            self.assertIn("'f4'", exception)
            self.assertNotIn("'f2'", exception)
            self.assertIn("1", exception)

    def test_typo_in_extrasaction_raises_error(self):
        fileobj = StringIO()
        self.assertRaises(ValueError, csv.DictWriter, fileobj, ['f1', 'f2'],
                          extrasaction="raised")

    def test_write_field_not_in_field_names_raise(self):
        fileobj = StringIO()
        writer = csv.DictWriter(fileobj, ['f1', 'f2'], extrasaction="raise")
        dictrow = {'f0': 0, 'f1': 1, 'f2': 2, 'f3': 3}
        self.assertRaises(ValueError, csv.DictWriter.writerow, writer, dictrow)

    def test_write_field_not_in_field_names_ignore(self):
        fileobj = StringIO()
        writer = csv.DictWriter(fileobj, ['f1', 'f2'], extrasaction="ignore")
        dictrow = {'f0': 0, 'f1': 1, 'f2': 2, 'f3': 3}
        csv.DictWriter.writerow(writer, dictrow)
        # Only the declared fields reach the output.
        self.assertEqual(fileobj.getvalue(), "1,2\r\n")

    def test_read_dict_fields(self):
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2", "f3"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_dict_no_fieldnames(self):
        # Without explicit fieldnames the first line supplies them.
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj)
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])

    # Two test cases to make sure existing ways of implicitly setting
    # fieldnames continue to work.  Both arise from discussion in issue3436.
    def test_read_dict_fieldnames_from_file(self):
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
            fileobj.seek(0)
            # The header is consumed by a plain reader on the same file.
            reader = csv.DictReader(fileobj,
                                    fieldnames=next(csv.reader(fileobj)))
            self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_dict_fieldnames_chain(self):
        import itertools
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj)
            first = next(reader)
            for row in itertools.chain([first], reader):
                self.assertEqual(reader.fieldnames, ["f1", "f2", "f3"])
                self.assertEqual(row, {"f1": '1', "f2": '2', "f3": 'abc'})

    def test_read_long(self):
        # Surplus values are collected in a list under the None key.
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                             None: ["abc", "4", "5", "6"]})

    def test_read_long_with_rest(self):
        # With restkey given, surplus values are stored under that key.
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames=["f1", "f2"], restkey="_rest")
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                             "_rest": ["abc", "4", "5", "6"]})

    def test_read_long_with_rest_no_fieldnames(self):
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj, restkey="_rest")
            self.assertEqual(reader.fieldnames, ["f1", "f2"])
            self.assertEqual(next(reader), {"f1": '1', "f2": '2',
                                             "_rest": ["abc", "4", "5", "6"]})

    def test_read_short(self):
        # Missing trailing values are filled in with restval.
        with TemporaryFile("w+", encoding="utf-8") as fileobj:
            fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
            fileobj.seek(0)
            reader = csv.DictReader(fileobj,
                                    fieldnames="1 2 3 4 5 6".split(),
                                    restval="DEFAULT")
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                             "4": '4', "5": '5', "6": '6'})
            self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                             "4": 'DEFAULT', "5": 'DEFAULT',
                                             "6": 'DEFAULT'})

    def test_read_multi(self):
        sample = [
            '2147483648,43.0e12,17,abc,def\r\n',
            '147483648,43.0e2,17,abc,def\r\n',
            '47483648,43.0,170,abc,def\r\n'
            ]

        reader = csv.DictReader(sample,
                                fieldnames="i1 float i2 s1 s2".split())
        # Check every sample row, not only the first one, so a problem with
        # later rows cannot go unnoticed.  Values stay strings.
        self.assertEqual(list(reader), [
            {"i1": '2147483648',
             "float": '43.0e12',
             "i2": '17',
             "s1": 'abc',
             "s2": 'def'},
            {"i1": '147483648',
             "float": '43.0e2',
             "i2": '17',
             "s1": 'abc',
             "s2": 'def'},
            {"i1": '47483648',
             "float": '43.0',
             "i2": '170',
             "s1": 'abc',
             "s2": 'def'},
        ])

    def test_read_with_blanks(self):
        # The blank middle line produces no row.
        reader = csv.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
                                 "1,2,abc,4,5,6\r\n"],
                                fieldnames="1 2 3 4 5 6".split())
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                         "4": '4', "5": '5', "6": '6'})
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                         "4": '4', "5": '5', "6": '6'})

    def test_read_semi_sep(self):
        # Extra keyword arguments (here delimiter) reach the parser.
        reader = csv.DictReader(["1;2;abc;4;5;6\r\n"],
                                fieldnames="1 2 3 4 5 6".split(),
                                delimiter=';')
        self.assertEqual(next(reader), {"1": '1', "2": '2', "3": 'abc',
                                         "4": '4', "5": '5', "6": '6'})
880
class TestArrayWrites(unittest.TestCase):
    """csv.writer accepts array.array objects as rows."""

    def _check_array_write(self, typecode, contents):
        """Write ``array.array(typecode, contents)`` as one row and verify it.

        The expected text is the str() of every array element joined by
        commas and terminated by "\\r\\n" (the "excel" dialect is used).
        """
        import array
        a = array.array(typecode, contents)

        with TemporaryFile("w+", encoding="utf-8", newline='') as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join([str(i) for i in a])+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)

    def test_int_write(self):
        """Signed int array ('i')."""
        self._check_array_write('i', [(20-i) for i in range(20)])

    def test_double_write(self):
        """Double-precision float array ('d')."""
        self._check_array_write('d', [(20-i)*0.1 for i in range(20)])

    def test_float_write(self):
        """Single-precision float array ('f')."""
        self._check_array_write('f', [(20-i)*0.1 for i in range(20)])

    def test_char_write(self):
        """Unicode character array."""
        import array
        import string
        # Prefer the 'w' typecode where the array module offers it and
        # fall back to the older 'u' typecode otherwise.
        typecode = 'w' if 'w' in array.typecodes else 'u'
        self._check_array_write(typecode, string.ascii_letters)
926
927 class ESC[4;38;5;81mTestDialectValidity(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
928 def test_quoting(self):
929 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
930 delimiter = ";"
931 escapechar = '\\'
932 doublequote = False
933 skipinitialspace = True
934 lineterminator = '\r\n'
935 quoting = csv.QUOTE_NONE
936 d = mydialect()
937 self.assertEqual(d.quoting, csv.QUOTE_NONE)
938
939 mydialect.quoting = None
940 self.assertRaises(csv.Error, mydialect)
941
942 mydialect.doublequote = True
943 mydialect.quoting = csv.QUOTE_ALL
944 mydialect.quotechar = '"'
945 d = mydialect()
946 self.assertEqual(d.quoting, csv.QUOTE_ALL)
947 self.assertEqual(d.quotechar, '"')
948 self.assertTrue(d.doublequote)
949
950 mydialect.quotechar = ""
951 with self.assertRaises(csv.Error) as cm:
952 mydialect()
953 self.assertEqual(str(cm.exception),
954 '"quotechar" must be a 1-character string')
955
956 mydialect.quotechar = "''"
957 with self.assertRaises(csv.Error) as cm:
958 mydialect()
959 self.assertEqual(str(cm.exception),
960 '"quotechar" must be a 1-character string')
961
962 mydialect.quotechar = 4
963 with self.assertRaises(csv.Error) as cm:
964 mydialect()
965 self.assertEqual(str(cm.exception),
966 '"quotechar" must be string or None, not int')
967
968 def test_delimiter(self):
969 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
970 delimiter = ";"
971 escapechar = '\\'
972 doublequote = False
973 skipinitialspace = True
974 lineterminator = '\r\n'
975 quoting = csv.QUOTE_NONE
976 d = mydialect()
977 self.assertEqual(d.delimiter, ";")
978
979 mydialect.delimiter = ":::"
980 with self.assertRaises(csv.Error) as cm:
981 mydialect()
982 self.assertEqual(str(cm.exception),
983 '"delimiter" must be a 1-character string')
984
985 mydialect.delimiter = ""
986 with self.assertRaises(csv.Error) as cm:
987 mydialect()
988 self.assertEqual(str(cm.exception),
989 '"delimiter" must be a 1-character string')
990
991 mydialect.delimiter = b","
992 with self.assertRaises(csv.Error) as cm:
993 mydialect()
994 self.assertEqual(str(cm.exception),
995 '"delimiter" must be string, not bytes')
996
997 mydialect.delimiter = 4
998 with self.assertRaises(csv.Error) as cm:
999 mydialect()
1000 self.assertEqual(str(cm.exception),
1001 '"delimiter" must be string, not int')
1002
1003 mydialect.delimiter = None
1004 with self.assertRaises(csv.Error) as cm:
1005 mydialect()
1006 self.assertEqual(str(cm.exception),
1007 '"delimiter" must be string, not NoneType')
1008
1009 def test_escapechar(self):
1010 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
1011 delimiter = ";"
1012 escapechar = '\\'
1013 doublequote = False
1014 skipinitialspace = True
1015 lineterminator = '\r\n'
1016 quoting = csv.QUOTE_NONE
1017 d = mydialect()
1018 self.assertEqual(d.escapechar, "\\")
1019
1020 mydialect.escapechar = ""
1021 with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'):
1022 mydialect()
1023
1024 mydialect.escapechar = "**"
1025 with self.assertRaisesRegex(csv.Error, '"escapechar" must be a 1-character string'):
1026 mydialect()
1027
1028 mydialect.escapechar = b"*"
1029 with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not bytes'):
1030 mydialect()
1031
1032 mydialect.escapechar = 4
1033 with self.assertRaisesRegex(csv.Error, '"escapechar" must be string or None, not int'):
1034 mydialect()
1035
1036 def test_lineterminator(self):
1037 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
1038 delimiter = ";"
1039 escapechar = '\\'
1040 doublequote = False
1041 skipinitialspace = True
1042 lineterminator = '\r\n'
1043 quoting = csv.QUOTE_NONE
1044 d = mydialect()
1045 self.assertEqual(d.lineterminator, '\r\n')
1046
1047 mydialect.lineterminator = ":::"
1048 d = mydialect()
1049 self.assertEqual(d.lineterminator, ":::")
1050
1051 mydialect.lineterminator = 4
1052 with self.assertRaises(csv.Error) as cm:
1053 mydialect()
1054 self.assertEqual(str(cm.exception),
1055 '"lineterminator" must be a string')
1056
1057 def test_invalid_chars(self):
1058 def create_invalid(field_name, value):
1059 class ESC[4;38;5;81mmydialect(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mDialect):
1060 pass
1061 setattr(mydialect, field_name, value)
1062 d = mydialect()
1063
1064 for field_name in ("delimiter", "escapechar", "quotechar"):
1065 with self.subTest(field_name=field_name):
1066 self.assertRaises(csv.Error, create_invalid, field_name, "")
1067 self.assertRaises(csv.Error, create_invalid, field_name, "abc")
1068 self.assertRaises(csv.Error, create_invalid, field_name, b'x')
1069 self.assertRaises(csv.Error, create_invalid, field_name, 5)
1070
1071
1072 class ESC[4;38;5;81mTestSniffer(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1073 sample1 = """\
1074 Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
1075 Shark City, Glendale Heights, IL, 12/28/02, Prezence
1076 Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
1077 Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
1078 """
1079 sample2 = """\
1080 'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
1081 'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
1082 'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
1083 'Stonecutters ''Seafood'' and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
1084 """
1085 header1 = '''\
1086 "venue","city","state","date","performers"
1087 '''
1088 sample3 = '''\
1089 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
1090 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
1091 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
1092 '''
1093
1094 sample4 = '''\
1095 2147483648;43.0e12;17;abc;def
1096 147483648;43.0e2;17;abc;def
1097 47483648;43.0;170;abc;def
1098 '''
1099
1100 sample5 = "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
1101 sample6 = "a|b|c\r\nd|e|f\r\n"
1102 sample7 = "'a'|'b'|'c'\r\n'd'|e|f\r\n"
1103
1104 # Issue 18155: Use a delimiter that is a special char to regex:
1105
1106 header2 = '''\
1107 "venue"+"city"+"state"+"date"+"performers"
1108 '''
1109 sample8 = """\
1110 Harry's+ Arlington Heights+ IL+ 2/1/03+ Kimi Hayes
1111 Shark City+ Glendale Heights+ IL+ 12/28/02+ Prezence
1112 Tommy's Place+ Blue Island+ IL+ 12/28/02+ Blue Sunday/White Crow
1113 Stonecutters Seafood and Chop House+ Lemont+ IL+ 12/19/02+ Week Back
1114 """
1115 sample9 = """\
1116 'Harry''s'+ Arlington Heights'+ 'IL'+ '2/1/03'+ 'Kimi Hayes'
1117 'Shark City'+ Glendale Heights'+' IL'+ '12/28/02'+ 'Prezence'
1118 'Tommy''s Place'+ Blue Island'+ 'IL'+ '12/28/02'+ 'Blue Sunday/White Crow'
1119 'Stonecutters ''Seafood'' and Chop House'+ 'Lemont'+ 'IL'+ '12/19/02'+ 'Week Back'
1120 """
1121
1122 sample10 = dedent("""
1123 abc,def
1124 ghijkl,mno
1125 ghi,jkl
1126 """)
1127
1128 sample11 = dedent("""
1129 abc,def
1130 ghijkl,mnop
1131 ghi,jkl
1132 """)
1133
1134 sample12 = dedent(""""time","forces"
1135 1,1.5
1136 0.5,5+0j
1137 0,0
1138 1+1j,6
1139 """)
1140
1141 sample13 = dedent(""""time","forces"
1142 0,0
1143 1,2
1144 a,b
1145 """)
1146
1147 sample14 = """\
1148 abc\0def
1149 ghijkl\0mno
1150 ghi\0jkl
1151 """
1152
1153 def test_issue43625(self):
1154 sniffer = csv.Sniffer()
1155 self.assertTrue(sniffer.has_header(self.sample12))
1156 self.assertFalse(sniffer.has_header(self.sample13))
1157
1158 def test_has_header_strings(self):
1159 "More to document existing (unexpected?) behavior than anything else."
1160 sniffer = csv.Sniffer()
1161 self.assertFalse(sniffer.has_header(self.sample10))
1162 self.assertFalse(sniffer.has_header(self.sample11))
1163
1164 def test_has_header(self):
1165 sniffer = csv.Sniffer()
1166 self.assertIs(sniffer.has_header(self.sample1), False)
1167 self.assertIs(sniffer.has_header(self.header1 + self.sample1), True)
1168
1169 def test_has_header_regex_special_delimiter(self):
1170 sniffer = csv.Sniffer()
1171 self.assertIs(sniffer.has_header(self.sample8), False)
1172 self.assertIs(sniffer.has_header(self.header2 + self.sample8), True)
1173
1174 def test_guess_quote_and_delimiter(self):
1175 sniffer = csv.Sniffer()
1176 for header in (";'123;4';", "'123;4';", ";'123;4'", "'123;4'"):
1177 with self.subTest(header):
1178 dialect = sniffer.sniff(header, ",;")
1179 self.assertEqual(dialect.delimiter, ';')
1180 self.assertEqual(dialect.quotechar, "'")
1181 self.assertIs(dialect.doublequote, False)
1182 self.assertIs(dialect.skipinitialspace, False)
1183
1184 def test_sniff(self):
1185 sniffer = csv.Sniffer()
1186 dialect = sniffer.sniff(self.sample1)
1187 self.assertEqual(dialect.delimiter, ",")
1188 self.assertEqual(dialect.quotechar, '"')
1189 self.assertIs(dialect.skipinitialspace, True)
1190
1191 dialect = sniffer.sniff(self.sample2)
1192 self.assertEqual(dialect.delimiter, ":")
1193 self.assertEqual(dialect.quotechar, "'")
1194 self.assertIs(dialect.skipinitialspace, False)
1195
1196 def test_delimiters(self):
1197 sniffer = csv.Sniffer()
1198 dialect = sniffer.sniff(self.sample3)
1199 # given that all three lines in sample3 are equal,
1200 # I think that any character could have been 'guessed' as the
1201 # delimiter, depending on dictionary order
1202 self.assertIn(dialect.delimiter, self.sample3)
1203 dialect = sniffer.sniff(self.sample3, delimiters="?,")
1204 self.assertEqual(dialect.delimiter, "?")
1205 dialect = sniffer.sniff(self.sample3, delimiters="/,")
1206 self.assertEqual(dialect.delimiter, "/")
1207 dialect = sniffer.sniff(self.sample4)
1208 self.assertEqual(dialect.delimiter, ";")
1209 dialect = sniffer.sniff(self.sample5)
1210 self.assertEqual(dialect.delimiter, "\t")
1211 dialect = sniffer.sniff(self.sample6)
1212 self.assertEqual(dialect.delimiter, "|")
1213 dialect = sniffer.sniff(self.sample7)
1214 self.assertEqual(dialect.delimiter, "|")
1215 self.assertEqual(dialect.quotechar, "'")
1216 dialect = sniffer.sniff(self.sample8)
1217 self.assertEqual(dialect.delimiter, '+')
1218 dialect = sniffer.sniff(self.sample9)
1219 self.assertEqual(dialect.delimiter, '+')
1220 self.assertEqual(dialect.quotechar, "'")
1221 dialect = sniffer.sniff(self.sample14)
1222 self.assertEqual(dialect.delimiter, '\0')
1223
1224 def test_doublequote(self):
1225 sniffer = csv.Sniffer()
1226 dialect = sniffer.sniff(self.header1)
1227 self.assertFalse(dialect.doublequote)
1228 dialect = sniffer.sniff(self.header2)
1229 self.assertFalse(dialect.doublequote)
1230 dialect = sniffer.sniff(self.sample2)
1231 self.assertTrue(dialect.doublequote)
1232 dialect = sniffer.sniff(self.sample8)
1233 self.assertFalse(dialect.doublequote)
1234 dialect = sniffer.sniff(self.sample9)
1235 self.assertTrue(dialect.doublequote)
1236
class NUL:
    """File-like sink whose write() and writelines() discard their input."""

    def write(self, *args):
        """Accept any arguments and do nothing."""
        pass

    # writelines shares the same no-op implementation.
    writelines = write
1241
@unittest.skipUnless(hasattr(sys, "gettotalrefcount"),
                     'requires sys.gettotalrefcount()')
class TestLeaks(unittest.TestCase):
    """Reference-count checks for csv readers and writers.

    The whole class is skipped unless sys.gettotalrefcount() exists. Each
    test repeats an operation 20 times, sampling the total reference
    count before every repetition, and asserts that the last difference
    between two consecutive samples stays below a threshold.
    """

    def test_create_read(self):
        """Creating readers must not leak references."""
        delta = 0
        lastrc = sys.gettotalrefcount()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            csv.reader(["a,b,c\r\n"])
            csv.reader(["a,b,c\r\n"])
            csv.reader(["a,b,c\r\n"])
            delta = rc-lastrc
            lastrc = rc
        # if csv.reader() leaks, last delta should be 3 or more
        self.assertLess(delta, 3)

    def test_create_write(self):
        """Creating writers must not leak references."""
        delta = 0
        lastrc = sys.gettotalrefcount()
        s = NUL()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            csv.writer(s)
            csv.writer(s)
            csv.writer(s)
            delta = rc-lastrc
            lastrc = rc
        # if csv.writer() leaks, last delta should be 3 or more
        self.assertLess(delta, 3)

    def test_read(self):
        """Iterating over a reader must not leak references."""
        delta = 0
        rows = ["a,b,c\r\n"]*5
        lastrc = sys.gettotalrefcount()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            rdr = csv.reader(rows)
            for row in rdr:
                pass
            delta = rc-lastrc
            lastrc = rc
        # if reader leaks during read, delta should be 5 or more
        self.assertLess(delta, 5)

    def test_write(self):
        """Writing rows must not leak references."""
        delta = 0
        rows = [[1,2,3]]*5
        s = NUL()
        lastrc = sys.gettotalrefcount()
        for i in range(20):
            gc.collect()
            self.assertEqual(gc.garbage, [])
            rc = sys.gettotalrefcount()
            writer = csv.writer(s)
            for row in rows:
                writer.writerow(row)
            delta = rc-lastrc
            lastrc = rc
        # if writer leaks during write, last delta should be 5 or more
        self.assertLess(delta, 5)
1308
class TestUnicode(unittest.TestCase):
    """Non-ASCII text round-trips through csv.reader and csv.writer."""

    # Sample fields; several contain non-ASCII letters.
    names = ["Martin von Löwis",
             "Marc André Lemburg",
             "Guido van Rossum",
             "François Pinard"]

    def test_unicode_read(self):
        """A row written as UTF-8 text is read back as the same strings."""
        with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
            fileobj.write(",".join(self.names) + "\r\n")
            fileobj.seek(0)
            reader = csv.reader(fileobj)
            self.assertEqual(list(reader), [self.names])

    def test_unicode_write(self):
        """A row written by csv.writer matches the hand-joined text."""
        with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
            writer = csv.writer(fileobj)
            writer.writerow(self.names)
            expected = ",".join(self.names)+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
1331
class KeyOrderingTest(unittest.TestCase):
    """Field order handling in DictWriter and DictReader."""

    def test_ordering_for_the_dict_reader_and_writer(self):
        """A header written by DictWriter is read back in the same order.

        Every permutation of five keys is written as a header row and read
        back through DictReader.fieldnames.
        """
        resultset = set()
        for keys in permutations("abcde"):
            with TemporaryFile('w+', newline='', encoding="utf-8") as fileobject:
                dw = csv.DictWriter(fileobject, keys)
                dw.writeheader()
                fileobject.seek(0)
                dr = csv.DictReader(fileobject)
                kt = tuple(dr.fieldnames)
                self.assertEqual(keys, kt)
                resultset.add(kt)
        # Final sanity check: were all permutations unique?
        self.assertEqual(len(resultset), 120, "Key ordering: some key permutations not collected (expected 120)")

    def test_ordered_dict_reader(self):
        """Rows with too many or too few values, with and without restkey."""
        data = dedent('''\
            FirstName,LastName
            Eric,Idle
            Graham,Chapman,Over1,Over2

            Under1
            John,Cleese
        ''').splitlines()

        def expected_rows(first, last, rest):
            # The four data rows, keyed by the given first/last/rest keys.
            return [
                OrderedDict([(first, 'Eric'), (last, 'Idle')]),
                OrderedDict([(first, 'Graham'), (last, 'Chapman'),
                             (rest, ['Over1', 'Over2'])]),
                OrderedDict([(first, 'Under1'), (last, None)]),
                OrderedDict([(first, 'John'), (last, 'Cleese')]),
            ]

        self.assertEqual(list(csv.DictReader(data)),
                         expected_rows('FirstName', 'LastName', None))

        self.assertEqual(list(csv.DictReader(data, restkey='OtherInfo')),
                         expected_rows('FirstName', 'LastName', 'OtherInfo'))

        del data[0]            # Remove the header row
        self.assertEqual(list(csv.DictReader(data, fieldnames=['fname', 'lname'])),
                         expected_rows('fname', 'lname', None))
1382
1383
1384 class ESC[4;38;5;81mMiscTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1385 def test__all__(self):
1386 extra = {'__doc__', '__version__'}
1387 support.check__all__(self, csv, ('csv', '_csv'), extra=extra)
1388
1389 def test_subclassable(self):
1390 # issue 44089
1391 class ESC[4;38;5;81mFoo(ESC[4;38;5;149mcsvESC[4;38;5;149m.ESC[4;38;5;149mError): ...
1392
1393 @support.cpython_only
1394 def test_disallow_instantiation(self):
1395 _csv = import_helper.import_module("_csv")
1396 for tp in _csv.Reader, _csv.Writer:
1397 with self.subTest(tp=tp):
1398 check_disallow_instantiation(self, tp)
1399
# Run the tests through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()