# (root)/Python-3.12.0/Lib/test/test_fileio.py
       1  # Adapted from test_file.py by Daniel Stutzbach
       2  
       3  import sys
       4  import os
       5  import io
       6  import errno
       7  import unittest
       8  from array import array
       9  from weakref import proxy
      10  from functools import wraps
      11  
      12  from test.support import (
      13      cpython_only, swap_attr, gc_collect, is_emscripten, is_wasi
      14  )
      15  from test.support.os_helper import (
      16      TESTFN, TESTFN_ASCII, TESTFN_UNICODE, make_bad_fd,
      17      )
      18  from test.support.warnings_helper import check_warnings
      19  from collections import UserList
      20  
      21  import _io  # C implementation of io
      22  import _pyio # Python implementation of io
      23  
      24  
      25  class ESC[4;38;5;81mAutoFileTests:
      26      # file tests for which a test file is automatically set up
      27  
      28      def setUp(self):
      29          self.f = self.FileIO(TESTFN, 'w')
      30  
      31      def tearDown(self):
      32          if self.f:
      33              self.f.close()
      34          os.remove(TESTFN)
      35  
      36      def testWeakRefs(self):
      37          # verify weak references
      38          p = proxy(self.f)
      39          p.write(bytes(range(10)))
      40          self.assertEqual(self.f.tell(), p.tell())
      41          self.f.close()
      42          self.f = None
      43          gc_collect()  # For PyPy or other GCs.
      44          self.assertRaises(ReferenceError, getattr, p, 'tell')
      45  
      46      def testSeekTell(self):
      47          self.f.write(bytes(range(20)))
      48          self.assertEqual(self.f.tell(), 20)
      49          self.f.seek(0)
      50          self.assertEqual(self.f.tell(), 0)
      51          self.f.seek(10)
      52          self.assertEqual(self.f.tell(), 10)
      53          self.f.seek(5, 1)
      54          self.assertEqual(self.f.tell(), 15)
      55          self.f.seek(-5, 1)
      56          self.assertEqual(self.f.tell(), 10)
      57          self.f.seek(-5, 2)
      58          self.assertEqual(self.f.tell(), 15)
      59  
      60      def testAttributes(self):
      61          # verify expected attributes exist
      62          f = self.f
      63  
      64          self.assertEqual(f.mode, "wb")
      65          self.assertEqual(f.closed, False)
      66  
      67          # verify the attributes are readonly
      68          for attr in 'mode', 'closed':
      69              self.assertRaises((AttributeError, TypeError),
      70                                setattr, f, attr, 'oops')
      71  
      72      @unittest.skipIf(is_wasi, "WASI does not expose st_blksize.")
      73      def testBlksize(self):
      74          # test private _blksize attribute
      75          blksize = io.DEFAULT_BUFFER_SIZE
      76          # try to get preferred blksize from stat.st_blksize, if available
      77          if hasattr(os, 'fstat'):
      78              fst = os.fstat(self.f.fileno())
      79              blksize = getattr(fst, 'st_blksize', blksize)
      80          self.assertEqual(self.f._blksize, blksize)
      81  
      82      # verify readinto
      83      def testReadintoByteArray(self):
      84          self.f.write(bytes([1, 2, 0, 255]))
      85          self.f.close()
      86  
      87          ba = bytearray(b'abcdefgh')
      88          with self.FileIO(TESTFN, 'r') as f:
      89              n = f.readinto(ba)
      90          self.assertEqual(ba, b'\x01\x02\x00\xffefgh')
      91          self.assertEqual(n, 4)
      92  
      93      def _testReadintoMemoryview(self):
      94          self.f.write(bytes([1, 2, 0, 255]))
      95          self.f.close()
      96  
      97          m = memoryview(bytearray(b'abcdefgh'))
      98          with self.FileIO(TESTFN, 'r') as f:
      99              n = f.readinto(m)
     100          self.assertEqual(m, b'\x01\x02\x00\xffefgh')
     101          self.assertEqual(n, 4)
     102  
     103          m = memoryview(bytearray(b'abcdefgh')).cast('H', shape=[2, 2])
     104          with self.FileIO(TESTFN, 'r') as f:
     105              n = f.readinto(m)
     106          self.assertEqual(bytes(m), b'\x01\x02\x00\xffefgh')
     107          self.assertEqual(n, 4)
     108  
     109      def _testReadintoArray(self):
     110          self.f.write(bytes([1, 2, 0, 255]))
     111          self.f.close()
     112  
     113          a = array('B', b'abcdefgh')
     114          with self.FileIO(TESTFN, 'r') as f:
     115              n = f.readinto(a)
     116          self.assertEqual(a, array('B', [1, 2, 0, 255, 101, 102, 103, 104]))
     117          self.assertEqual(n, 4)
     118  
     119          a = array('b', b'abcdefgh')
     120          with self.FileIO(TESTFN, 'r') as f:
     121              n = f.readinto(a)
     122          self.assertEqual(a, array('b', [1, 2, 0, -1, 101, 102, 103, 104]))
     123          self.assertEqual(n, 4)
     124  
     125          a = array('I', b'abcdefgh')
     126          with self.FileIO(TESTFN, 'r') as f:
     127              n = f.readinto(a)
     128          self.assertEqual(a, array('I', b'\x01\x02\x00\xffefgh'))
     129          self.assertEqual(n, 4)
     130  
     131      def testWritelinesList(self):
     132          l = [b'123', b'456']
     133          self.f.writelines(l)
     134          self.f.close()
     135          self.f = self.FileIO(TESTFN, 'rb')
     136          buf = self.f.read()
     137          self.assertEqual(buf, b'123456')
     138  
     139      def testWritelinesUserList(self):
     140          l = UserList([b'123', b'456'])
     141          self.f.writelines(l)
     142          self.f.close()
     143          self.f = self.FileIO(TESTFN, 'rb')
     144          buf = self.f.read()
     145          self.assertEqual(buf, b'123456')
     146  
     147      def testWritelinesError(self):
     148          self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
     149          self.assertRaises(TypeError, self.f.writelines, None)
     150          self.assertRaises(TypeError, self.f.writelines, "abc")
     151  
     152      def test_none_args(self):
     153          self.f.write(b"hi\nbye\nabc")
     154          self.f.close()
     155          self.f = self.FileIO(TESTFN, 'r')
     156          self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
     157          self.f.seek(0)
     158          self.assertEqual(self.f.readline(None), b"hi\n")
     159          self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
     160  
     161      def test_reject(self):
     162          self.assertRaises(TypeError, self.f.write, "Hello!")
     163  
     164      def testRepr(self):
     165          self.assertEqual(repr(self.f),
     166                           "<%s.FileIO name=%r mode=%r closefd=True>" %
     167                           (self.modulename, self.f.name, self.f.mode))
     168          del self.f.name
     169          self.assertEqual(repr(self.f),
     170                           "<%s.FileIO fd=%r mode=%r closefd=True>" %
     171                           (self.modulename, self.f.fileno(), self.f.mode))
     172          self.f.close()
     173          self.assertEqual(repr(self.f),
     174                           "<%s.FileIO [closed]>" % (self.modulename,))
     175  
     176      def testReprNoCloseFD(self):
     177          fd = os.open(TESTFN, os.O_RDONLY)
     178          try:
     179              with self.FileIO(fd, 'r', closefd=False) as f:
     180                  self.assertEqual(repr(f),
     181                                   "<%s.FileIO name=%r mode=%r closefd=False>" %
     182                                   (self.modulename, f.name, f.mode))
     183          finally:
     184              os.close(fd)
     185  
     186      def testRecursiveRepr(self):
     187          # Issue #25455
     188          with swap_attr(self.f, 'name', self.f):
     189              with self.assertRaises(RuntimeError):
     190                  repr(self.f)  # Should not crash
     191  
     192      def testErrors(self):
     193          f = self.f
     194          self.assertFalse(f.isatty())
     195          self.assertFalse(f.closed)
     196          #self.assertEqual(f.name, TESTFN)
     197          self.assertRaises(ValueError, f.read, 10) # Open for reading
     198          f.close()
     199          self.assertTrue(f.closed)
     200          f = self.FileIO(TESTFN, 'r')
     201          self.assertRaises(TypeError, f.readinto, "")
     202          self.assertFalse(f.closed)
     203          f.close()
     204          self.assertTrue(f.closed)
     205  
     206      def testMethods(self):
     207          methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
     208                     'read', 'readall', 'readline', 'readlines',
     209                     'tell', 'truncate', 'flush']
     210  
     211          self.f.close()
     212          self.assertTrue(self.f.closed)
     213  
     214          for methodname in methods:
     215              method = getattr(self.f, methodname)
     216              # should raise on closed file
     217              self.assertRaises(ValueError, method)
     218  
     219          self.assertRaises(TypeError, self.f.readinto)
     220          self.assertRaises(ValueError, self.f.readinto, bytearray(1))
     221          self.assertRaises(TypeError, self.f.seek)
     222          self.assertRaises(ValueError, self.f.seek, 0)
     223          self.assertRaises(TypeError, self.f.write)
     224          self.assertRaises(ValueError, self.f.write, b'')
     225          self.assertRaises(TypeError, self.f.writelines)
     226          self.assertRaises(ValueError, self.f.writelines, b'')
     227  
     228      def testOpendir(self):
     229          # Issue 3703: opening a directory should fill the errno
     230          # Windows always returns "[Errno 13]: Permission denied
     231          # Unix uses fstat and returns "[Errno 21]: Is a directory"
     232          try:
     233              self.FileIO('.', 'r')
     234          except OSError as e:
     235              self.assertNotEqual(e.errno, 0)
     236              self.assertEqual(e.filename, ".")
     237          else:
     238              self.fail("Should have raised OSError")
     239  
     240      @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
     241      def testOpenDirFD(self):
     242          fd = os.open('.', os.O_RDONLY)
     243          with self.assertRaises(OSError) as cm:
     244              self.FileIO(fd, 'r')
     245          os.close(fd)
     246          self.assertEqual(cm.exception.errno, errno.EISDIR)
     247  
     248      #A set of functions testing that we get expected behaviour if someone has
     249      #manually closed the internal file descriptor.  First, a decorator:
     250      def ClosedFD(func):
     251          @wraps(func)
     252          def wrapper(self):
     253              #forcibly close the fd before invoking the problem function
     254              f = self.f
     255              os.close(f.fileno())
     256              try:
     257                  func(self, f)
     258              finally:
     259                  try:
     260                      self.f.close()
     261                  except OSError:
     262                      pass
     263          return wrapper
     264  
     265      def ClosedFDRaises(func):
     266          @wraps(func)
     267          def wrapper(self):
     268              #forcibly close the fd before invoking the problem function
     269              f = self.f
     270              os.close(f.fileno())
     271              try:
     272                  func(self, f)
     273              except OSError as e:
     274                  self.assertEqual(e.errno, errno.EBADF)
     275              else:
     276                  self.fail("Should have raised OSError")
     277              finally:
     278                  try:
     279                      self.f.close()
     280                  except OSError:
     281                      pass
     282          return wrapper
     283  
     284      @ClosedFDRaises
     285      def testErrnoOnClose(self, f):
     286          f.close()
     287  
     288      @ClosedFDRaises
     289      def testErrnoOnClosedWrite(self, f):
     290          f.write(b'a')
     291  
     292      @ClosedFDRaises
     293      def testErrnoOnClosedSeek(self, f):
     294          f.seek(0)
     295  
     296      @ClosedFDRaises
     297      def testErrnoOnClosedTell(self, f):
     298          f.tell()
     299  
     300      @ClosedFDRaises
     301      def testErrnoOnClosedTruncate(self, f):
     302          f.truncate(0)
     303  
     304      @ClosedFD
     305      def testErrnoOnClosedSeekable(self, f):
     306          f.seekable()
     307  
     308      @ClosedFD
     309      def testErrnoOnClosedReadable(self, f):
     310          f.readable()
     311  
     312      @ClosedFD
     313      def testErrnoOnClosedWritable(self, f):
     314          f.writable()
     315  
     316      @ClosedFD
     317      def testErrnoOnClosedFileno(self, f):
     318          f.fileno()
     319  
     320      @ClosedFD
     321      def testErrnoOnClosedIsatty(self, f):
     322          self.assertEqual(f.isatty(), False)
     323  
     324      def ReopenForRead(self):
     325          try:
     326              self.f.close()
     327          except OSError:
     328              pass
     329          self.f = self.FileIO(TESTFN, 'r')
     330          os.close(self.f.fileno())
     331          return self.f
     332  
     333      @ClosedFDRaises
     334      def testErrnoOnClosedRead(self, f):
     335          f = self.ReopenForRead()
     336          f.read(1)
     337  
     338      @ClosedFDRaises
     339      def testErrnoOnClosedReadall(self, f):
     340          f = self.ReopenForRead()
     341          f.readall()
     342  
     343      @ClosedFDRaises
     344      def testErrnoOnClosedReadinto(self, f):
     345          f = self.ReopenForRead()
     346          a = array('b', b'x'*10)
     347          f.readinto(a)
     348  
     349  class ESC[4;38;5;81mCAutoFileTests(ESC[4;38;5;149mAutoFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     350      FileIO = _io.FileIO
     351      modulename = '_io'
     352  
     353  class ESC[4;38;5;81mPyAutoFileTests(ESC[4;38;5;149mAutoFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     354      FileIO = _pyio.FileIO
     355      modulename = '_pyio'
     356  
     357  
     358  class ESC[4;38;5;81mOtherFileTests:
     359  
     360      def testAbles(self):
     361          try:
     362              f = self.FileIO(TESTFN, "w")
     363              self.assertEqual(f.readable(), False)
     364              self.assertEqual(f.writable(), True)
     365              self.assertEqual(f.seekable(), True)
     366              f.close()
     367  
     368              f = self.FileIO(TESTFN, "r")
     369              self.assertEqual(f.readable(), True)
     370              self.assertEqual(f.writable(), False)
     371              self.assertEqual(f.seekable(), True)
     372              f.close()
     373  
     374              f = self.FileIO(TESTFN, "a+")
     375              self.assertEqual(f.readable(), True)
     376              self.assertEqual(f.writable(), True)
     377              self.assertEqual(f.seekable(), True)
     378              self.assertEqual(f.isatty(), False)
     379              f.close()
     380  
     381              if sys.platform != "win32" and not is_emscripten:
     382                  try:
     383                      f = self.FileIO("/dev/tty", "a")
     384                  except OSError:
     385                      # When run in a cron job there just aren't any
     386                      # ttys, so skip the test.  This also handles other
     387                      # OS'es that don't support /dev/tty.
     388                      pass
     389                  else:
     390                      self.assertEqual(f.readable(), False)
     391                      self.assertEqual(f.writable(), True)
     392                      if sys.platform != "darwin" and \
     393                         'bsd' not in sys.platform and \
     394                         not sys.platform.startswith(('sunos', 'aix')):
     395                          # Somehow /dev/tty appears seekable on some BSDs
     396                          self.assertEqual(f.seekable(), False)
     397                      self.assertEqual(f.isatty(), True)
     398                      f.close()
     399          finally:
     400              os.unlink(TESTFN)
     401  
     402      def testInvalidModeStrings(self):
     403          # check invalid mode strings
     404          for mode in ("", "aU", "wU+", "rw", "rt"):
     405              try:
     406                  f = self.FileIO(TESTFN, mode)
     407              except ValueError:
     408                  pass
     409              else:
     410                  f.close()
     411                  self.fail('%r is an invalid file mode' % mode)
     412  
     413      def testModeStrings(self):
     414          # test that the mode attribute is correct for various mode strings
     415          # given as init args
     416          try:
     417              for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
     418                            ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
     419                            ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
     420                            ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
     421                  # read modes are last so that TESTFN will exist first
     422                  with self.FileIO(TESTFN, modes[0]) as f:
     423                      self.assertEqual(f.mode, modes[1])
     424          finally:
     425              if os.path.exists(TESTFN):
     426                  os.unlink(TESTFN)
     427  
     428      def testUnicodeOpen(self):
     429          # verify repr works for unicode too
     430          f = self.FileIO(str(TESTFN), "w")
     431          f.close()
     432          os.unlink(TESTFN)
     433  
     434      def testBytesOpen(self):
     435          # Opening a bytes filename
     436          fn = TESTFN_ASCII.encode("ascii")
     437          f = self.FileIO(fn, "w")
     438          try:
     439              f.write(b"abc")
     440              f.close()
     441              with open(TESTFN_ASCII, "rb") as f:
     442                  self.assertEqual(f.read(), b"abc")
     443          finally:
     444              os.unlink(TESTFN_ASCII)
     445  
     446      @unittest.skipIf(sys.getfilesystemencoding() != 'utf-8',
     447                       "test only works for utf-8 filesystems")
     448      def testUtf8BytesOpen(self):
     449          # Opening a UTF-8 bytes filename
     450          try:
     451              fn = TESTFN_UNICODE.encode("utf-8")
     452          except UnicodeEncodeError:
     453              self.skipTest('could not encode %r to utf-8' % TESTFN_UNICODE)
     454          f = self.FileIO(fn, "w")
     455          try:
     456              f.write(b"abc")
     457              f.close()
     458              with open(TESTFN_UNICODE, "rb") as f:
     459                  self.assertEqual(f.read(), b"abc")
     460          finally:
     461              os.unlink(TESTFN_UNICODE)
     462  
     463      def testConstructorHandlesNULChars(self):
     464          fn_with_NUL = 'foo\0bar'
     465          self.assertRaises(ValueError, self.FileIO, fn_with_NUL, 'w')
     466          self.assertRaises(ValueError, self.FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
     467  
     468      def testInvalidFd(self):
     469          self.assertRaises(ValueError, self.FileIO, -10)
     470          self.assertRaises(OSError, self.FileIO, make_bad_fd())
     471          if sys.platform == 'win32':
     472              import msvcrt
     473              self.assertRaises(OSError, msvcrt.get_osfhandle, make_bad_fd())
     474  
     475      def testBadModeArgument(self):
     476          # verify that we get a sensible error message for bad mode argument
     477          bad_mode = "qwerty"
     478          try:
     479              f = self.FileIO(TESTFN, bad_mode)
     480          except ValueError as msg:
     481              if msg.args[0] != 0:
     482                  s = str(msg)
     483                  if TESTFN in s or bad_mode not in s:
     484                      self.fail("bad error message for invalid mode: %s" % s)
     485              # if msg.args[0] == 0, we're probably on Windows where there may be
     486              # no obvious way to discover why open() failed.
     487          else:
     488              f.close()
     489              self.fail("no error for invalid mode: %s" % bad_mode)
     490  
     491      def testTruncate(self):
     492          f = self.FileIO(TESTFN, 'w')
     493          f.write(bytes(bytearray(range(10))))
     494          self.assertEqual(f.tell(), 10)
     495          f.truncate(5)
     496          self.assertEqual(f.tell(), 10)
     497          self.assertEqual(f.seek(0, io.SEEK_END), 5)
     498          f.truncate(15)
     499          self.assertEqual(f.tell(), 5)
     500          self.assertEqual(f.seek(0, io.SEEK_END), 15)
     501          f.close()
     502  
     503      def testTruncateOnWindows(self):
     504          def bug801631():
     505              # SF bug <https://bugs.python.org/issue801631>
     506              # "file.truncate fault on windows"
     507              f = self.FileIO(TESTFN, 'w')
     508              f.write(bytes(range(11)))
     509              f.close()
     510  
     511              f = self.FileIO(TESTFN,'r+')
     512              data = f.read(5)
     513              if data != bytes(range(5)):
     514                  self.fail("Read on file opened for update failed %r" % data)
     515              if f.tell() != 5:
     516                  self.fail("File pos after read wrong %d" % f.tell())
     517  
     518              f.truncate()
     519              if f.tell() != 5:
     520                  self.fail("File pos after ftruncate wrong %d" % f.tell())
     521  
     522              f.close()
     523              size = os.path.getsize(TESTFN)
     524              if size != 5:
     525                  self.fail("File size after ftruncate wrong %d" % size)
     526  
     527          try:
     528              bug801631()
     529          finally:
     530              os.unlink(TESTFN)
     531  
     532      def testAppend(self):
     533          try:
     534              f = open(TESTFN, 'wb')
     535              f.write(b'spam')
     536              f.close()
     537              f = open(TESTFN, 'ab')
     538              f.write(b'eggs')
     539              f.close()
     540              f = open(TESTFN, 'rb')
     541              d = f.read()
     542              f.close()
     543              self.assertEqual(d, b'spameggs')
     544          finally:
     545              try:
     546                  os.unlink(TESTFN)
     547              except:
     548                  pass
     549  
     550      def testInvalidInit(self):
     551          self.assertRaises(TypeError, self.FileIO, "1", 0, 0)
     552  
     553      def testWarnings(self):
     554          with check_warnings(quiet=True) as w:
     555              self.assertEqual(w.warnings, [])
     556              self.assertRaises(TypeError, self.FileIO, [])
     557              self.assertEqual(w.warnings, [])
     558              self.assertRaises(ValueError, self.FileIO, "/some/invalid/name", "rt")
     559              self.assertEqual(w.warnings, [])
     560  
     561      def testUnclosedFDOnException(self):
     562          class ESC[4;38;5;81mMyException(ESC[4;38;5;149mException): pass
     563          class ESC[4;38;5;81mMyFileIO(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149mFileIO):
     564              def __setattr__(self, name, value):
     565                  if name == "name":
     566                      raise MyException("blocked setting name")
     567                  return super(MyFileIO, self).__setattr__(name, value)
     568          fd = os.open(__file__, os.O_RDONLY)
     569          self.assertRaises(MyException, MyFileIO, fd)
     570          os.close(fd)  # should not raise OSError(EBADF)
     571  
     572  
     573  class ESC[4;38;5;81mCOtherFileTests(ESC[4;38;5;149mOtherFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     574      FileIO = _io.FileIO
     575      modulename = '_io'
     576  
     577      @cpython_only
     578      def testInvalidFd_overflow(self):
     579          # Issue 15989
     580          import _testcapi
     581          self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MAX + 1)
     582          self.assertRaises(TypeError, self.FileIO, _testcapi.INT_MIN - 1)
     583  
     584      def test_open_code(self):
     585          # Check that the default behaviour of open_code matches
     586          # open("rb")
     587          with self.FileIO(__file__, "rb") as f:
     588              expected = f.read()
     589          with _io.open_code(__file__) as f:
     590              actual = f.read()
     591          self.assertEqual(expected, actual)
     592  
     593  
     594  class ESC[4;38;5;81mPyOtherFileTests(ESC[4;38;5;149mOtherFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     595      FileIO = _pyio.FileIO
     596      modulename = '_pyio'
     597  
     598      def test_open_code(self):
     599          # Check that the default behaviour of open_code matches
     600          # open("rb")
     601          with self.FileIO(__file__, "rb") as f:
     602              expected = f.read()
     603          with check_warnings(quiet=True) as w:
     604              # Always test _open_code_with_warning
     605              with _pyio._open_code_with_warning(__file__) as f:
     606                  actual = f.read()
     607              self.assertEqual(expected, actual)
     608              self.assertNotEqual(w.warnings, [])
     609  
     610  
     611  def tearDownModule():
     612      # Historically, these tests have been sloppy about removing TESTFN.
     613      # So get rid of it no matter what.
     614      if os.path.exists(TESTFN):
     615          os.unlink(TESTFN)
     616  
     617  
     618  if __name__ == '__main__':
     619      unittest.main()