(root)/
Python-3.11.7/
Lib/
test/
test_fcntl.py
       1  """Test program for the fcntl C module.
       2  """
       3  import platform
       4  import os
       5  import struct
       6  import sys
       7  import unittest
       8  from multiprocessing import Process
       9  from test.support import verbose, cpython_only
      10  from test.support.import_helper import import_module
      11  from test.support.os_helper import TESTFN, unlink
      12  
      13  
      14  # Skip test if no fcntl module.
      15  fcntl = import_module('fcntl')
      16  
      17  
      18  
      19  def get_lockdata():
      20      try:
      21          os.O_LARGEFILE
      22      except AttributeError:
      23          start_len = "ll"
      24      else:
      25          start_len = "qq"
      26  
      27      if (sys.platform.startswith(('netbsd', 'freebsd', 'openbsd'))
      28          or sys.platform == 'darwin'):
      29          if struct.calcsize('l') == 8:
      30              off_t = 'l'
      31              pid_t = 'i'
      32          else:
      33              off_t = 'lxxxx'
      34              pid_t = 'l'
      35          lockdata = struct.pack(off_t + off_t + pid_t + 'hh', 0, 0, 0,
      36                                 fcntl.F_WRLCK, 0)
      37      elif sys.platform.startswith('gnukfreebsd'):
      38          lockdata = struct.pack('qqihhi', 0, 0, 0, fcntl.F_WRLCK, 0, 0)
      39      elif sys.platform in ['hp-uxB', 'unixware7']:
      40          lockdata = struct.pack('hhlllii', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
      41      else:
      42          lockdata = struct.pack('hh'+start_len+'hh', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
      43      if lockdata:
      44          if verbose:
      45              print('struct.pack: ', repr(lockdata))
      46      return lockdata
      47  
      48  lockdata = get_lockdata()
      49  
      50  class ESC[4;38;5;81mBadFile:
      51      def __init__(self, fn):
      52          self.fn = fn
      53      def fileno(self):
      54          return self.fn
      55  
      56  def try_lockf_on_other_process_fail(fname, cmd):
      57      f = open(fname, 'wb+')
      58      try:
      59          fcntl.lockf(f, cmd)
      60      except BlockingIOError:
      61          pass
      62      finally:
      63          f.close()
      64  
      65  def try_lockf_on_other_process(fname, cmd):
      66      f = open(fname, 'wb+')
      67      fcntl.lockf(f, cmd)
      68      fcntl.lockf(f, fcntl.LOCK_UN)
      69      f.close()
      70  
      71  class ESC[4;38;5;81mTestFcntl(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      72  
      73      def setUp(self):
      74          self.f = None
      75  
      76      def tearDown(self):
      77          if self.f and not self.f.closed:
      78              self.f.close()
      79          unlink(TESTFN)
      80  
      81      def test_fcntl_fileno(self):
      82          # the example from the library docs
      83          self.f = open(TESTFN, 'wb')
      84          rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
      85          if verbose:
      86              print('Status from fcntl with O_NONBLOCK: ', rv)
      87          rv = fcntl.fcntl(self.f.fileno(), fcntl.F_SETLKW, lockdata)
      88          if verbose:
      89              print('String from fcntl with F_SETLKW: ', repr(rv))
      90          self.f.close()
      91  
      92      def test_fcntl_file_descriptor(self):
      93          # again, but pass the file rather than numeric descriptor
      94          self.f = open(TESTFN, 'wb')
      95          rv = fcntl.fcntl(self.f, fcntl.F_SETFL, os.O_NONBLOCK)
      96          if verbose:
      97              print('Status from fcntl with O_NONBLOCK: ', rv)
      98          rv = fcntl.fcntl(self.f, fcntl.F_SETLKW, lockdata)
      99          if verbose:
     100              print('String from fcntl with F_SETLKW: ', repr(rv))
     101          self.f.close()
     102  
     103      def test_fcntl_bad_file(self):
     104          with self.assertRaises(ValueError):
     105              fcntl.fcntl(-1, fcntl.F_SETFL, os.O_NONBLOCK)
     106          with self.assertRaises(ValueError):
     107              fcntl.fcntl(BadFile(-1), fcntl.F_SETFL, os.O_NONBLOCK)
     108          with self.assertRaises(TypeError):
     109              fcntl.fcntl('spam', fcntl.F_SETFL, os.O_NONBLOCK)
     110          with self.assertRaises(TypeError):
     111              fcntl.fcntl(BadFile('spam'), fcntl.F_SETFL, os.O_NONBLOCK)
     112  
     113      @cpython_only
     114      def test_fcntl_bad_file_overflow(self):
     115          from _testcapi import INT_MAX, INT_MIN
     116          # Issue 15989
     117          with self.assertRaises(OverflowError):
     118              fcntl.fcntl(INT_MAX + 1, fcntl.F_SETFL, os.O_NONBLOCK)
     119          with self.assertRaises(OverflowError):
     120              fcntl.fcntl(BadFile(INT_MAX + 1), fcntl.F_SETFL, os.O_NONBLOCK)
     121          with self.assertRaises(OverflowError):
     122              fcntl.fcntl(INT_MIN - 1, fcntl.F_SETFL, os.O_NONBLOCK)
     123          with self.assertRaises(OverflowError):
     124              fcntl.fcntl(BadFile(INT_MIN - 1), fcntl.F_SETFL, os.O_NONBLOCK)
     125  
     126      @unittest.skipIf(
     127          platform.machine().startswith('arm') and platform.system() == 'Linux',
     128          "ARM Linux returns EINVAL for F_NOTIFY DN_MULTISHOT")
     129      def test_fcntl_64_bit(self):
     130          # Issue #1309352: fcntl shouldn't fail when the third arg fits in a
     131          # C 'long' but not in a C 'int'.
     132          try:
     133              cmd = fcntl.F_NOTIFY
     134              # This flag is larger than 2**31 in 64-bit builds
     135              flags = fcntl.DN_MULTISHOT
     136          except AttributeError:
     137              self.skipTest("F_NOTIFY or DN_MULTISHOT unavailable")
     138          fd = os.open(os.path.dirname(os.path.abspath(TESTFN)), os.O_RDONLY)
     139          try:
     140              fcntl.fcntl(fd, cmd, flags)
     141          finally:
     142              os.close(fd)
     143  
     144      def test_flock(self):
     145          # Solaris needs readable file for shared lock
     146          self.f = open(TESTFN, 'wb+')
     147          fileno = self.f.fileno()
     148          fcntl.flock(fileno, fcntl.LOCK_SH)
     149          fcntl.flock(fileno, fcntl.LOCK_UN)
     150          fcntl.flock(self.f, fcntl.LOCK_SH | fcntl.LOCK_NB)
     151          fcntl.flock(self.f, fcntl.LOCK_UN)
     152          fcntl.flock(fileno, fcntl.LOCK_EX)
     153          fcntl.flock(fileno, fcntl.LOCK_UN)
     154  
     155          self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
     156          self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
     157  
     158      @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
     159      def test_lockf_exclusive(self):
     160          self.f = open(TESTFN, 'wb+')
     161          cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
     162          fcntl.lockf(self.f, cmd)
     163          p = Process(target=try_lockf_on_other_process_fail, args=(TESTFN, cmd))
     164          p.start()
     165          p.join()
     166          fcntl.lockf(self.f, fcntl.LOCK_UN)
     167          self.assertEqual(p.exitcode, 0)
     168  
     169      @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError")
     170      def test_lockf_share(self):
     171          self.f = open(TESTFN, 'wb+')
     172          cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
     173          fcntl.lockf(self.f, cmd)
     174          p = Process(target=try_lockf_on_other_process, args=(TESTFN, cmd))
     175          p.start()
     176          p.join()
     177          fcntl.lockf(self.f, fcntl.LOCK_UN)
     178          self.assertEqual(p.exitcode, 0)
     179  
     180      @cpython_only
     181      def test_flock_overflow(self):
     182          import _testcapi
     183          self.assertRaises(OverflowError, fcntl.flock, _testcapi.INT_MAX+1,
     184                            fcntl.LOCK_SH)
     185  
     186      @unittest.skipIf(sys.platform != 'darwin', "F_GETPATH is only available on macos")
     187      def test_fcntl_f_getpath(self):
     188          self.f = open(TESTFN, 'wb')
     189          expected = os.path.abspath(TESTFN).encode('utf-8')
     190          res = fcntl.fcntl(self.f.fileno(), fcntl.F_GETPATH, bytes(len(expected)))
     191          self.assertEqual(expected, res)
     192  
     193      @unittest.skipUnless(
     194          hasattr(fcntl, "F_SETPIPE_SZ") and hasattr(fcntl, "F_GETPIPE_SZ"),
     195          "F_SETPIPE_SZ and F_GETPIPE_SZ are not available on all platforms.")
     196      def test_fcntl_f_pipesize(self):
     197          test_pipe_r, test_pipe_w = os.pipe()
     198          try:
     199              # Get the default pipesize with F_GETPIPE_SZ
     200              pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ)
     201              pipesize = pipesize_default // 2  # A new value to detect change.
     202              if pipesize < 512:  # the POSIX minimum
     203                  raise unittest.SkipTest(
     204                      'default pipesize too small to perform test.')
     205              fcntl.fcntl(test_pipe_w, fcntl.F_SETPIPE_SZ, pipesize)
     206              self.assertEqual(fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ),
     207                               pipesize)
     208          finally:
     209              os.close(test_pipe_r)
     210              os.close(test_pipe_w)
     211  
     212  
     213  if __name__ == '__main__':
     214      unittest.main()