(root)/
Python-3.12.0/
Lib/
test/
test_largefile.py
       1  """Test largefile support on system where this makes sense.
       2  """
       3  
       4  import os
       5  import stat
       6  import sys
       7  import unittest
       8  import socket
       9  import shutil
      10  import threading
      11  from test.support import requires, bigmemtest, requires_resource
      12  from test.support import SHORT_TIMEOUT
      13  from test.support import socket_helper
      14  from test.support.os_helper import TESTFN, unlink
      15  import io  # C implementation of io
      16  import _pyio as pyio # Python implementation of io
      17  
      18  # size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes)
      19  size = 2_500_000_000
      20  TESTFN2 = TESTFN + '2'
      21  
      22  
      23  class ESC[4;38;5;81mLargeFileTest:
      24  
      25      def setUp(self):
      26          if os.path.exists(TESTFN):
      27              mode = 'r+b'
      28          else:
      29              mode = 'w+b'
      30  
      31          with self.open(TESTFN, mode) as f:
      32              current_size = os.fstat(f.fileno())[stat.ST_SIZE]
      33              if current_size == size+1:
      34                  return
      35  
      36              if current_size == 0:
      37                  f.write(b'z')
      38  
      39              f.seek(0)
      40              f.seek(size)
      41              f.write(b'a')
      42              f.flush()
      43              self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1)
      44  
      45      @classmethod
      46      def tearDownClass(cls):
      47          with cls.open(TESTFN, 'wb'):
      48              pass
      49          if not os.stat(TESTFN)[stat.ST_SIZE] == 0:
      50              raise cls.failureException('File was not truncated by opening '
      51                                         'with mode "wb"')
      52          unlink(TESTFN2)
      53  
      54  
      55  class ESC[4;38;5;81mTestFileMethods(ESC[4;38;5;149mLargeFileTest):
      56      """Test that each file function works as expected for large
      57      (i.e. > 2 GiB) files.
      58      """
      59  
      60      # _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes,
      61      # so memuse=2 is needed
      62      @bigmemtest(size=size, memuse=2, dry_run=False)
      63      def test_large_read(self, _size):
      64          # bpo-24658: Test that a read greater than 2GB does not fail.
      65          with self.open(TESTFN, "rb") as f:
      66              self.assertEqual(len(f.read()), size + 1)
      67              self.assertEqual(f.tell(), size + 1)
      68  
      69      def test_osstat(self):
      70          self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1)
      71  
      72      def test_seek_read(self):
      73          with self.open(TESTFN, 'rb') as f:
      74              self.assertEqual(f.tell(), 0)
      75              self.assertEqual(f.read(1), b'z')
      76              self.assertEqual(f.tell(), 1)
      77              f.seek(0)
      78              self.assertEqual(f.tell(), 0)
      79              f.seek(0, 0)
      80              self.assertEqual(f.tell(), 0)
      81              f.seek(42)
      82              self.assertEqual(f.tell(), 42)
      83              f.seek(42, 0)
      84              self.assertEqual(f.tell(), 42)
      85              f.seek(42, 1)
      86              self.assertEqual(f.tell(), 84)
      87              f.seek(0, 1)
      88              self.assertEqual(f.tell(), 84)
      89              f.seek(0, 2)  # seek from the end
      90              self.assertEqual(f.tell(), size + 1 + 0)
      91              f.seek(-10, 2)
      92              self.assertEqual(f.tell(), size + 1 - 10)
      93              f.seek(-size-1, 2)
      94              self.assertEqual(f.tell(), 0)
      95              f.seek(size)
      96              self.assertEqual(f.tell(), size)
      97              # the 'a' that was written at the end of file above
      98              self.assertEqual(f.read(1), b'a')
      99              f.seek(-size-1, 1)
     100              self.assertEqual(f.read(1), b'z')
     101              self.assertEqual(f.tell(), 1)
     102  
     103      def test_lseek(self):
     104          with self.open(TESTFN, 'rb') as f:
     105              self.assertEqual(os.lseek(f.fileno(), 0, 0), 0)
     106              self.assertEqual(os.lseek(f.fileno(), 42, 0), 42)
     107              self.assertEqual(os.lseek(f.fileno(), 42, 1), 84)
     108              self.assertEqual(os.lseek(f.fileno(), 0, 1), 84)
     109              self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0)
     110              self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10)
     111              self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0)
     112              self.assertEqual(os.lseek(f.fileno(), size, 0), size)
     113              # the 'a' that was written at the end of file above
     114              self.assertEqual(f.read(1), b'a')
     115  
     116      def test_truncate(self):
     117          with self.open(TESTFN, 'r+b') as f:
     118              if not hasattr(f, 'truncate'):
     119                  raise unittest.SkipTest("open().truncate() not available "
     120                                          "on this system")
     121              f.seek(0, 2)
     122              # else we've lost track of the true size
     123              self.assertEqual(f.tell(), size+1)
     124              # Cut it back via seek + truncate with no argument.
     125              newsize = size - 10
     126              f.seek(newsize)
     127              f.truncate()
     128              self.assertEqual(f.tell(), newsize)  # else pointer moved
     129              f.seek(0, 2)
     130              self.assertEqual(f.tell(), newsize)  # else wasn't truncated
     131              # Ensure that truncate(smaller than true size) shrinks
     132              # the file.
     133              newsize -= 1
     134              f.seek(42)
     135              f.truncate(newsize)
     136              self.assertEqual(f.tell(), 42)
     137              f.seek(0, 2)
     138              self.assertEqual(f.tell(), newsize)
     139              # XXX truncate(larger than true size) is ill-defined
     140              # across platform; cut it waaaaay back
     141              f.seek(0)
     142              f.truncate(1)
     143              self.assertEqual(f.tell(), 0)       # else pointer moved
     144              f.seek(0)
     145              self.assertEqual(len(f.read()), 1)  # else wasn't truncated
     146  
     147      def test_seekable(self):
     148          # Issue #5016; seekable() can return False when the current position
     149          # is negative when truncated to an int.
     150          for pos in (2**31-1, 2**31, 2**31+1):
     151              with self.open(TESTFN, 'rb') as f:
     152                  f.seek(pos)
     153                  self.assertTrue(f.seekable())
     154  
     155  
     156  def skip_no_disk_space(path, required):
     157      def decorator(fun):
     158          def wrapper(*args, **kwargs):
     159              if not hasattr(shutil, "disk_usage"):
     160                  raise unittest.SkipTest("requires shutil.disk_usage")
     161              if shutil.disk_usage(os.path.realpath(path)).free < required:
     162                  hsize = int(required / 1024 / 1024)
     163                  raise unittest.SkipTest(
     164                      f"required {hsize} MiB of free disk space")
     165              return fun(*args, **kwargs)
     166          return wrapper
     167      return decorator
     168  
     169  
     170  class ESC[4;38;5;81mTestCopyfile(ESC[4;38;5;149mLargeFileTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     171      open = staticmethod(io.open)
     172  
     173      # Exact required disk space would be (size * 2), but let's give it a
     174      # bit more tolerance.
     175      @skip_no_disk_space(TESTFN, size * 2.5)
     176      @requires_resource('cpu')
     177      def test_it(self):
     178          # Internally shutil.copyfile() can use "fast copy" methods like
     179          # os.sendfile().
     180          size = os.path.getsize(TESTFN)
     181          shutil.copyfile(TESTFN, TESTFN2)
     182          self.assertEqual(os.path.getsize(TESTFN2), size)
     183          with open(TESTFN2, 'rb') as f:
     184              self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
     185              f.seek(size - 5)
     186              self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
     187  
     188  
     189  @unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported')
     190  class ESC[4;38;5;81mTestSocketSendfile(ESC[4;38;5;149mLargeFileTest, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     191      open = staticmethod(io.open)
     192      timeout = SHORT_TIMEOUT
     193  
     194      def setUp(self):
     195          super().setUp()
     196          self.thread = None
     197  
     198      def tearDown(self):
     199          super().tearDown()
     200          if self.thread is not None:
     201              self.thread.join(self.timeout)
     202              self.thread = None
     203  
     204      def tcp_server(self, sock):
     205          def run(sock):
     206              with sock:
     207                  conn, _ = sock.accept()
     208                  conn.settimeout(self.timeout)
     209                  with conn, open(TESTFN2, 'wb') as f:
     210                      event.wait(self.timeout)
     211                      while True:
     212                          chunk = conn.recv(65536)
     213                          if not chunk:
     214                              return
     215                          f.write(chunk)
     216  
     217          event = threading.Event()
     218          sock.settimeout(self.timeout)
     219          self.thread = threading.Thread(target=run, args=(sock, ))
     220          self.thread.start()
     221          event.set()
     222  
     223      # Exact required disk space would be (size * 2), but let's give it a
     224      # bit more tolerance.
     225      @skip_no_disk_space(TESTFN, size * 2.5)
     226      @requires_resource('cpu')
     227      def test_it(self):
     228          port = socket_helper.find_unused_port()
     229          with socket.create_server(("", port)) as sock:
     230              self.tcp_server(sock)
     231              with socket.create_connection(("127.0.0.1", port)) as client:
     232                  with open(TESTFN, 'rb') as f:
     233                      client.sendfile(f)
     234          self.tearDown()
     235  
     236          size = os.path.getsize(TESTFN)
     237          self.assertEqual(os.path.getsize(TESTFN2), size)
     238          with open(TESTFN2, 'rb') as f:
     239              self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
     240              f.seek(size - 5)
     241              self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
     242  
     243  
     244  def setUpModule():
     245      try:
     246          import signal
     247          # The default handler for SIGXFSZ is to abort the process.
     248          # By ignoring it, system calls exceeding the file size resource
     249          # limit will raise OSError instead of crashing the interpreter.
     250          signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
     251      except (ImportError, AttributeError):
     252          pass
     253  
     254      # On Windows and Mac OSX this test consumes large resources; It
     255      # takes a long time to build the >2 GiB file and takes >2 GiB of disk
     256      # space therefore the resource must be enabled to run this test.
     257      # If not, nothing after this line stanza will be executed.
     258      if sys.platform[:3] == 'win' or sys.platform == 'darwin':
     259          requires('largefile',
     260                   'test requires %s bytes and a long time to run' % str(size))
     261      else:
     262          # Only run if the current filesystem supports large files.
     263          # (Skip this test on Windows, since we now always support
     264          # large files.)
     265          f = open(TESTFN, 'wb', buffering=0)
     266          try:
     267              # 2**31 == 2147483648
     268              f.seek(2147483649)
     269              # Seeking is not enough of a test: you must write and flush, too!
     270              f.write(b'x')
     271              f.flush()
     272          except (OSError, OverflowError):
     273              raise unittest.SkipTest("filesystem does not have "
     274                                      "largefile support")
     275          finally:
     276              f.close()
     277              unlink(TESTFN)
     278  
     279  
     280  class ESC[4;38;5;81mCLargeFileTest(ESC[4;38;5;149mTestFileMethods, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     281      open = staticmethod(io.open)
     282  
     283  
     284  class ESC[4;38;5;81mPyLargeFileTest(ESC[4;38;5;149mTestFileMethods, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     285      open = staticmethod(pyio.open)
     286  
     287  
     288  def tearDownModule():
     289      unlink(TESTFN)
     290      unlink(TESTFN2)
     291  
     292  
     293  if __name__ == '__main__':
     294      unittest.main()