1 # Adapted from test_file.py by Daniel Stutzbach
2
3 import sys
4 import os
5 import io
6 import errno
7 import unittest
8 from array import array
9 from weakref import proxy
10 from functools import wraps
11
12 from test.support import (
13 cpython_only, swap_attr, gc_collect, is_emscripten, is_wasi
14 )
15 from test.support.os_helper import (
16 TESTFN, TESTFN_ASCII, TESTFN_UNICODE, make_bad_fd,
17 )
18 from test.support.warnings_helper import check_warnings
19 from collections import UserList
20
21 import _io # C implementation of io
22 import _pyio # Python implementation of io
23
24
25 class ESC[4;38;5;81mAutoFileTests:
26 # file tests for which a test file is automatically set up
27
28 def setUp(self):
29 self.f = self.FileIO(TESTFN, 'w')
30
31 def tearDown(self):
32 if self.f:
33 self.f.close()
34 os.remove(TESTFN)
35
36 def testWeakRefs(self):
37 # verify weak references
38 p = proxy(self.f)
39 p.write(bytes(range(10)))
40 self.assertEqual(self.f.tell(), p.tell())
41 self.f.close()
42 self.f = None
43 gc_collect() # For PyPy or other GCs.
44 self.assertRaises(ReferenceError, getattr, p, 'tell')
45
46 def testSeekTell(self):
47 self.f.write(bytes(range(20)))
48 self.assertEqual(self.f.tell(), 20)
49 self.f.seek(0)
50 self.assertEqual(self.f.tell(), 0)
51 self.f.seek(10)
52 self.assertEqual(self.f.tell(), 10)
53 self.f.seek(5, 1)
54 self.assertEqual(self.f.tell(), 15)
55 self.f.seek(-5, 1)
56 self.assertEqual(self.f.tell(), 10)
57 self.f.seek(-5, 2)
58 self.assertEqual(self.f.tell(), 15)
59
60 def testAttributes(self):
61 # verify expected attributes exist
62 f = self.f
63
64 self.assertEqual(f.mode, "wb")
65 self.assertEqual(f.closed, False)
66
67 # verify the attributes are readonly
68 for attr in 'mode', 'closed':
69 self.assertRaises((AttributeError, TypeError),
70 setattr, f, attr, 'oops')
71
72 @unittest.skipIf(is_wasi, "WASI does not expose st_blksize.")
73 def testBlksize(self):
74 # test private _blksize attribute
75 blksize = io.DEFAULT_BUFFER_SIZE
76 # try to get preferred blksize from stat.st_blksize, if available
77 if hasattr(os, 'fstat'):
78 fst = os.fstat(self.f.fileno())
79 blksize = getattr(fst, 'st_blksize', blksize)
80 self.assertEqual(self.f._blksize, blksize)
81
82 # verify readinto
83 def testReadintoByteArray(self):
84 self.f.write(bytes([1, 2, 0, 255]))
85 self.f.close()
86
87 ba = bytearray(b'abcdefgh')
88 with self.FileIO(TESTFN, 'r') as f:
89 n = f.readinto(ba)
90 self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
91 self.assertEqual(n, 4)
92
93 def _testReadintoMemoryview(self):
94 self.f.write(bytes([1, 2, 0, 255]))
95 self.f.close()
96
97 m = memoryview(bytearray(b'abcdefgh'))
98 with self.FileIO(TESTFN, 'r') as f:
99 n = f.readinto(m)
100 self.assertEqual(m, b'\x01\x02\x00\xffefgh')
101 self.assertEqual(n, 4)
102
103 m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
104 with self.FileIO(TESTFN, 'r') as f:
105 n = f.readinto(m)
106 self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
107 self.assertEqual(n, 4)
108
109 def _testReadintoArray(self):
110 self.f.write(bytes([1, 2, 0, 255]))
111 self.f.close()
112
113 a = array('B', b'abcdefgh')
114 with self.FileIO(TESTFN, 'r') as f:
115 n = f.readinto(a)
116 self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
117 self.assertEqual(n, 4)
118
119 a = array('b', b'abcdefgh')
120 with self.FileIO(TESTFN, 'r') as f:
121 n = f.readinto(a)
122 self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
123 self.assertEqual(n, 4)
124
125 a = array('I', b'abcdefgh')
126 with self.FileIO(TESTFN, 'r') as f:
127 n = f.readinto(a)
128 self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
129 self.assertEqual(n, 4)
130
131 def testWritelinesList(self):
132 l = [b'123', b'456']
133 self.f.writelines(l)
134 self.f.close()
135 self.f = self.FileIO(TESTFN, 'rb')
136 buf = self.f.read()
137 self.assertEqual(buf, b'123456')
138
139 def testWritelinesUserList(self):
140 l = UserList([b'123', b'456'])
141 self.f.writelines(l)
142 self.f.close()
143 self.f = self.FileIO(TESTFN, 'rb')
144 buf = self.f.read()
145 self.assertEqual(buf, b'123456')
146
147 def testWritelinesError(self):
148 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
149 self.assertRaises(TypeError, self.f.writelines, None)
150 self.assertRaises(TypeError, self.f.writelines, "abc")
151
152 def test_none_args(self):
153 self.f.write(b"hi\nbye\nabc")
154 self.f.close()
155 self.f = self.FileIO(TESTFN, 'r')
156 self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
157 self.f.seek(0)
158 self.assertEqual(self.f.readline(None), b"hi\n")
159 self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
160
161 def test_reject(self):
162 self.assertRaises(TypeError, self.f.write, "Hello!")
163
164 def testRepr(self):
165 self.assertEqual(repr(self.f),
166 "<%s.FileIO name=%r mode=%r closefd=True>" %
167 (self.modulename, self.f.name, self.f.mode))
168 del self.f.name
169 self.assertEqual(repr(self.f),
170 "<%s.FileIO fd=%r mode=%r closefd=True>" %
171 (self.modulename, self.f.fileno(), self.f.mode))
172 self.f.close()
173 self.assertEqual(repr(self.f),
174 "<%s.FileIO [closed]>" % (self.modulename,))
175
176 def testReprNoCloseFD(self):
177 fd = os.open(TESTFN, os.O_RDONLY)
178 try:
179 with self.FileIO(fd, 'r', closefd=False) as f:
180 self.assertEqual(repr(f),
181 "<%s.FileIO name=%r mode=%r closefd=False>" %
182 (self.modulename, f.name, f.mode))
183 finally:
184 os.close(fd)
185
186 def testRecursiveRepr(self):
187 # Issue #25455
188 with swap_attr(self.f, 'name', self.f):
189 with self.assertRaises(RuntimeError):
190 repr(self.f) # Should not crash
191
192 def testErrors(self):
193 f = self.f
194 self.assertFalse(f.isatty())
195 self.assertFalse(f.closed)
196 #self.assertEqual(f.name, TESTFN)
197 self.assertRaises(ValueError, f.read, 10) # Open for reading
198 f.close()
199 self.assertTrue(f.closed)
200 f = self.FileIO(TESTFN, 'r')
201 self.assertRaises(TypeError, f.readinto, "")
202 self.assertFalse(f.closed)
203 f.close()
204 self.assertTrue(f.closed)
205
206 def testMethods(self):
207 methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
208 'read', 'readall', 'readline', 'readlines',
209 'tell', 'truncate', 'flush']
210
211 self.f.close()
212 self.assertTrue(self.f.closed)
213
214 for methodname in methods:
215 method = getattr(self.f, methodname)
216 # should raise on closed file
217 self.assertRaises(ValueError, method)
218
219 self.assertRaises(TypeError, self.f.readinto)
220 self.assertRaises(ValueError, self.f.readinto, bytearray(1))
221 self.assertRaises(TypeError, self.f.seek)
222 self.assertRaises(ValueError, self.f.seek, 0)
223 self.assertRaises(TypeError, self.f.write)
224 self.assertRaises(ValueError, self.f.write, b'')
225 self.assertRaises(TypeError, self.f.writelines)
226 self.assertRaises(ValueError, self.f.writelines, b'')
227
228 def testOpendir(self):
229 # Issue 3703: opening a directory should fill the errno
230 # Windows always returns "[Errno 13]: Permission denied
231 # Unix uses fstat and returns "[Errno 21]: Is a directory"
232 try:
233 self.FileIO('.', 'r')
234 except OSError as e:
235 self.assertNotEqual(e.errno, 0)
236 self.assertEqual(e.filename, ".")
237 else:
238 self.fail("Should have raised OSError")
239
240 @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
241 def testOpenDirFD(self):
242 fd = os.open('.', os.O_RDONLY)
243 with self.assertRaises(OSError) as cm:
244 self.FileIO(fd, 'r')
245 os.close(fd)
246 self.assertEqual(cm.exception.errno, errno.EISDIR)
247
248 #A set of functions testing that we get expected behaviour if someone has
249 #manually closed the internal file descriptor. First, a decorator:
250 def ClosedFD(func):
251 @wraps(func)
252 def wrapper(self):
253 #forcibly close the fd before invoking the problem function
254 f = self.f
255 os.close(f.fileno())
256 try:
257 func(self, f)
258 finally:
259 try:
260 self.f.close()
261 except OSError:
262 pass
263 return wrapper
264
265 def ClosedFDRaises(func):
266 @wraps(func)
267 def wrapper(self):
268 #forcibly close the fd before invoking the problem function
269 f = self.f
270 os.close(f.fileno())
271 try:
272 func(self, f)
273 except OSError as e:
274 self.assertEqual(e.errno, errno.EBADF)
275 else:
276 self.fail("Should have raised OSError")
277 finally:
278 try:
279 self.f.close()
280 except OSError:
281 pass
282 return wrapper
283
284 @ClosedFDRaises
285 def testErrnoOnClose(self, f):
286 f.close()
287
288 @ClosedFDRaises
289 def testErrnoOnClosedWrite(self, f):
290 f.write(b'a')
291
292 @ClosedFDRaises
293 def testErrnoOnClosedSeek(self, f):
294 f.seek(0)
295
296 @ClosedFDRaises
297 def testErrnoOnClosedTell(self, f):
298 f.tell()
299
300 @ClosedFDRaises
301 def testErrnoOnClosedTruncate(self, f):
302 f.truncate(0)
303
304 @ClosedFD
305 def testErrnoOnClosedSeekable(self, f):
306 f.seekable()
307
308 @ClosedFD
309 def testErrnoOnClosedReadable(self, f):
310 f.readable()
311
312 @ClosedFD
313 def testErrnoOnClosedWritable(self, f):
314 f.writable()
315
316 @ClosedFD
317 def testErrnoOnClosedFileno(self, f):
318 f.fileno()
319
320 @ClosedFD
321 def testErrnoOnClosedIsatty(self, f):
322 self.assertEqual(f.isatty(), False)
323
324 def ReopenForRead(self):
325 try:
326 self.f.close()
327 except OSError:
328 pass
329 self.f = self.FileIO(TESTFN, 'r')
330 os.close(self.f.fileno())
331 return self.f
332
333 @ClosedFDRaises
334 def testErrnoOnClosedRead(self, f):
335 f = self.ReopenForRead()
336 f.read(1)
337
338 @ClosedFDRaises
339 def testErrnoOnClosedReadall(self, f):
340 f = self.ReopenForRead()
341 f.readall()
342
343 @ClosedFDRaises
344 def testErrnoOnClosedReadinto(self, f):
345 f = self.ReopenForRead()
346 a = array('b', b'x'*10)
347 f.readinto(a)
348
class CAutoFileTests(AutoFileTests, unittest.TestCase):
    """Run the AutoFileTests suite against the C FileIO from ``_io``."""

    # Implementation under test.
    FileIO = _io.FileIO
    # Module name interpolated into the repr strings the tests expect.
    modulename = '_io'
352
class PyAutoFileTests(AutoFileTests, unittest.TestCase):
    """Run the AutoFileTests suite against the Python FileIO from ``_pyio``."""

    # Implementation under test.
    FileIO = _pyio.FileIO
    # Module name interpolated into the repr strings the tests expect.
    modulename = '_pyio'
356
357
358 class ESC[4;38;5;81mOtherFileTests:
359
360 def testAbles(self):
361 try:
362 f = self.FileIO(TESTFN, "w")
363 self.assertEqual(f.readable(), False)
364 self.assertEqual(f.writable(), True)
365 self.assertEqual(f.seekable(), True)
366 f.close()
367
368 f = self.FileIO(TESTFN, "r")
369 self.assertEqual(f.readable(), True)
370 self.assertEqual(f.writable(), False)
371 self.assertEqual(f.seekable(), True)
372 f.close()
373
374 f = self.FileIO(TESTFN, "a+")
375 self.assertEqual(f.readable(), True)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.assertEqual(f.isatty(), False)
379 f.close()
380
381 if sys.platform != "win32" and not is_emscripten:
382 try:
383 f = self.FileIO("/dev/tty", "a")
384 except OSError:
385 # When run in a cron job there just aren't any
386 # ttys, so skip the test. This also handles other
387 # OS'es that don't support /dev/tty.
388 pass
389 else:
390 self.assertEqual(f.readable(), False)
391 self.assertEqual(f.writable(), True)
392 if sys.platform != "darwin" and \
393 'bsd' not in sys.platform and \
394 not sys.platform.startswith(('sunos', 'aix')):
395 # Somehow /dev/tty appears seekable on some BSDs
396 self.assertEqual(f.seekable(), False)
397 self.assertEqual(f.isatty(), True)
398 f.close()
399 finally:
400 os.unlink(TESTFN)
401
402 def testInvalidModeStrings(self):
403 # check invalid mode strings
404 for mode in ("", "aU", "wU+", "rw", "rt"):
405 try:
406 f = self.FileIO(TESTFN, mode)
407 except ValueError:
408 pass
409 else:
410 f.close()
411 self.fail('%r is an invalid file mode' % mode)
412
413 def testModeStrings(self):
414 # test that the mode attribute is correct for various mode strings
415 # given as init args
416 try:
417 for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
418 ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
419 ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
420 ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
421 # read modes are last so that TESTFN will exist first
422 with self.FileIO(TESTFN, modes[0]) as f:
423 self.assertEqual(f.mode, modes[1])
424 finally:
425 if os.path.exists(TESTFN):
426 os.unlink(TESTFN)
427
428 def testUnicodeOpen(self):
429 # verify repr works for unicode too
430 f = self.FileIO(str(TESTFN), "w")
431 f.close()
432 os.unlink(TESTFN)
433
434 def testBytesOpen(self):
435 # Opening a bytes filename
436 fn = TESTFN_ASCII.encode("ascii")
437 f = self.FileIO(fn, "w")
438 try:
439 f.write(b"abc")
440 f.close()
441 with open(TESTFN_ASCII, "rb") as f:
442 self.assertEqual(f.read(), b"abc")
443 finally:
444 os.unlink(TESTFN_ASCII)
445
446 @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8',
447 "test only works for utf-8 filesystems")
448 def testUtf8BytesOpen(self):
449 # Opening a UTF-8 bytes filename
450 try:
451 fn = TESTFN_UNICODE.encode("utf-8")
452 except UnicodeEncodeError:
453 self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE)
454 f = self.FileIO(fn, "w")
455 try:
456 f.write(b"abc")
457 f.close()
458 with open(TESTFN_UNICODE, "rb") as f:
459 self.assertEqual(f.read(), b"abc")
460 finally:
461 os.unlink(TESTFN_UNICODE)
462
463 def testConstructorHandlesNULChars(self):
464 fn_with_NUL = 'foo\0bar'
465 self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
466 self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
467
468 def testInvalidFd(self):
469 self.assertRaises(ValueError, self.FileIO, -10)
470 self.assertRaises(OSError, self.FileIO, make_bad_fd())
471 if sys.platform == 'win32':
472 import msvcrt
473 self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
474
475 def testBadModeArgument(self):
476 # verify that we get a sensible error message for bad mode argument
477 bad_mode = "qwerty"
478 try:
479 f = self.FileIO(TESTFN, bad_mode)
480 except ValueError as msg:
481 if msg.args[0] != 0:
482 s = str(msg)
483 if TESTFN in s or bad_mode not in s:
484 self.fail("bad error message for invalid mode: %s" % s)
485 # if msg.args[0] == 0, we're probably on Windows where there may be
486 # no obvious way to discover why open() failed.
487 else:
488 f.close()
489 self.fail("no error for invalid mode: %s" % bad_mode)
490
491 def testTruncate(self):
492 f = self.FileIO(TESTFN, 'w')
493 f.write(bytes(bytearray(range(10))))
494 self.assertEqual(f.tell(), 10)
495 f.truncate(5)
496 self.assertEqual(f.tell(), 10)
497 self.assertEqual(f.seek(0, io.SEEK_END), 5)
498 f.truncate(15)
499 self.assertEqual(f.tell(), 5)
500 self.assertEqual(f.seek(0, io.SEEK_END), 15)
501 f.close()
502
503 def testTruncateOnWindows(self):
504 def bug801631():
505 # SF bug <https://bugs.python.org/issue801631>
506 # "file.truncate fault on windows"
507 f = self.FileIO(TESTFN, 'w')
508 f.write(bytes(range(11)))
509 f.close()
510
511 f = self.FileIO(TESTFN,'r+')
512 data = f.read(5)
513 if data != bytes(range(5)):
514 self.fail("Read on file opened for update failed %r" % data)
515 if f.tell() != 5:
516 self.fail("File pos after read wrong %d" % f.tell())
517
518 f.truncate()
519 if f.tell() != 5:
520 self.fail("File pos after ftruncate wrong %d" % f.tell())
521
522 f.close()
523 size = os.path.getsize(TESTFN)
524 if size != 5:
525 self.fail("File size after ftruncate wrong %d" % size)
526
527 try:
528 bug801631()
529 finally:
530 os.unlink(TESTFN)
531
532 def testAppend(self):
533 try:
534 f = open(TESTFN, 'wb')
535 f.write(b'spam')
536 f.close()
537 f = open(TESTFN, 'ab')
538 f.write(b'eggs')
539 f.close()
540 f = open(TESTFN, 'rb')
541 d = f.read()
542 f.close()
543 self.assertEqual(d, b'spameggs')
544 finally:
545 try:
546 os.unlink(TESTFN)
547 except:
548 pass
549
550 def testInvalidInit(self):
551 self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
552
553 def testWarnings(self):
554 with check_warnings(quiet=True) as w:
555 self.assertEqual(w.warnings, [])
556 self.assertRaises(TypeError, self.FileIO, [])
557 self.assertEqual(w.warnings, [])
558 self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
559 self.assertEqual(w.warnings, [])
560
561 def testUnclosedFDOnException(self):
562 class ESC[4;38;5;81mMyException(ESC[4;38;5;149mException): pass
563 class ESC[4;38;5;81mMyFileIO(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mFileIO):
564 def __setattr__(self, name, value):
565 if name == "name":
566 raise MyException("blocked setting name")
567 return super(MyFileIO, self).__setattr__(name, value)
568 fd = os.open(__file__, os.O_RDONLY)
569 self.assertRaises(MyException, MyFileIO, fd)
570 os.close(fd) # should not raise OSError(EBADF)
571
572
class COtherFileTests(OtherFileTests, unittest.TestCase):
    """Run the OtherFileTests suite against the C FileIO from ``_io``."""

    # Implementation under test.
    FileIO = _io.FileIO
    modulename = '_io'

    @cpython_only
    def testInvalidFd_overflow(self):
        # Issue 15989
        # _testcapi is an optional test-only extension module; going through
        # import_helper skips the test when it is missing instead of
        # reporting an ImportError as a test error.
        from test.support import import_helper
        _testcapi = import_helper.import_module('_testcapi')
        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
        self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)

    def test_open_code(self):
        # Check that the default behaviour of open_code matches
        # open("rb")
        with self.FileIO(__file__, "rb") as f:
            expected = f.read()
        with _io.open_code(__file__) as f:
            actual = f.read()
        self.assertEqual(expected, actual)
592
593
class PyOtherFileTests(OtherFileTests, unittest.TestCase):
    """Run the OtherFileTests suite against the Python FileIO from ``_pyio``."""

    # Implementation under test.
    FileIO = _pyio.FileIO
    modulename = '_pyio'

    def test_open_code(self):
        # The bytes returned through _pyio's open_code fallback must match
        # a plain binary read of the same file, and the fallback must emit
        # at least one warning.
        with self.FileIO(__file__, "rb") as raw:
            expected = raw.read()
        with check_warnings(quiet=True) as recorder:
            # Always test _open_code_with_warning
            with _pyio._open_code_with_warning(__file__) as code_file:
                actual = code_file.read()
            self.assertEqual(expected, actual)
            self.assertNotEqual(recorder.warnings, [])
609
610
611 def tearDownModule():
612 # Historically, these tests have been sloppy about removing TESTFN.
613 # So get rid of it no matter what.
614 if os.path.exists(TESTFN):
615 os.unlink(TESTFN)
616
617
# Run the whole module through unittest when executed as a script.
if __name__ == "__main__":
    unittest.main()