(root)/
Python-3.11.7/
Lib/
test/
test_dbm_dumb.py
       1  """Test script for the dumbdbm module
       2     Original by Roger E. Masse
       3  """
       4  
       5  import contextlib
       6  import io
       7  import operator
       8  import os
       9  import stat
      10  import unittest
      11  import dbm.dumb as dumbdbm
      12  from test import support
      13  from test.support import os_helper
      14  from functools import partial
      15  
      16  _fname = os_helper.TESTFN
      17  
      18  
      19  def _delete_files():
      20      for ext in [".dir", ".dat", ".bak"]:
      21          try:
      22              os.unlink(_fname + ext)
      23          except OSError:
      24              pass
      25  
      26  class ESC[4;38;5;81mDumbDBMTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      27      _dict = {b'0': b'',
      28               b'a': b'Python:',
      29               b'b': b'Programming',
      30               b'c': b'the',
      31               b'd': b'way',
      32               b'f': b'Guido',
      33               b'g': b'intended',
      34               '\u00fc'.encode('utf-8') : b'!',
      35               }
      36  
      37      def test_dumbdbm_creation(self):
      38          with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
      39              self.assertEqual(list(f.keys()), [])
      40              for key in self._dict:
      41                  f[key] = self._dict[key]
      42              self.read_helper(f)
      43  
      44      @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
      45      @os_helper.skip_unless_working_chmod
      46      def test_dumbdbm_creation_mode(self):
      47          try:
      48              old_umask = os.umask(0o002)
      49              f = dumbdbm.open(_fname, 'c', 0o637)
      50              f.close()
      51          finally:
      52              os.umask(old_umask)
      53  
      54          expected_mode = 0o635
      55          if os.name != 'posix':
      56              # Windows only supports setting the read-only attribute.
      57              # This shouldn't fail, but doesn't work like Unix either.
      58              expected_mode = 0o666
      59  
      60          import stat
      61          st = os.stat(_fname + '.dat')
      62          self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
      63          st = os.stat(_fname + '.dir')
      64          self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
      65  
      66      def test_close_twice(self):
      67          f = dumbdbm.open(_fname)
      68          f[b'a'] = b'b'
      69          self.assertEqual(f[b'a'], b'b')
      70          f.close()
      71          f.close()
      72  
      73      def test_dumbdbm_modification(self):
      74          self.init_db()
      75          with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
      76              self._dict[b'g'] = f[b'g'] = b"indented"
      77              self.read_helper(f)
      78              # setdefault() works as in the dict interface
      79              self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
      80              self.assertEqual(f[b'xxx'], b'foo')
      81  
      82      def test_dumbdbm_read(self):
      83          self.init_db()
      84          with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
      85              self.read_helper(f)
      86              with self.assertRaisesRegex(dumbdbm.error,
      87                                      'The database is opened for reading only'):
      88                  f[b'g'] = b'x'
      89              with self.assertRaisesRegex(dumbdbm.error,
      90                                      'The database is opened for reading only'):
      91                  del f[b'a']
      92              # get() works as in the dict interface
      93              self.assertEqual(f.get(b'a'), self._dict[b'a'])
      94              self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
      95              self.assertIsNone(f.get(b'xxx'))
      96              with self.assertRaises(KeyError):
      97                  f[b'xxx']
      98  
      99      def test_dumbdbm_keys(self):
     100          self.init_db()
     101          with contextlib.closing(dumbdbm.open(_fname)) as f:
     102              keys = self.keys_helper(f)
     103  
     104      def test_write_contains(self):
     105          with contextlib.closing(dumbdbm.open(_fname)) as f:
     106              f[b'1'] = b'hello'
     107              self.assertIn(b'1', f)
     108  
     109      def test_write_write_read(self):
     110          # test for bug #482460
     111          with contextlib.closing(dumbdbm.open(_fname)) as f:
     112              f[b'1'] = b'hello'
     113              f[b'1'] = b'hello2'
     114          with contextlib.closing(dumbdbm.open(_fname)) as f:
     115              self.assertEqual(f[b'1'], b'hello2')
     116  
     117      def test_str_read(self):
     118          self.init_db()
     119          with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
     120              self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])
     121  
     122      def test_str_write_contains(self):
     123          self.init_db()
     124          with contextlib.closing(dumbdbm.open(_fname)) as f:
     125              f['\u00fc'] = b'!'
     126              f['1'] = 'a'
     127          with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
     128              self.assertIn('\u00fc', f)
     129              self.assertEqual(f['\u00fc'.encode('utf-8')],
     130                               self._dict['\u00fc'.encode('utf-8')])
     131              self.assertEqual(f[b'1'], b'a')
     132  
     133      def test_line_endings(self):
     134          # test for bug #1172763: dumbdbm would die if the line endings
     135          # weren't what was expected.
     136          with contextlib.closing(dumbdbm.open(_fname)) as f:
     137              f[b'1'] = b'hello'
     138              f[b'2'] = b'hello2'
     139  
     140          # Mangle the file by changing the line separator to Windows or Unix
     141          with io.open(_fname + '.dir', 'rb') as file:
     142              data = file.read()
     143          if os.linesep == '\n':
     144              data = data.replace(b'\n', b'\r\n')
     145          else:
     146              data = data.replace(b'\r\n', b'\n')
     147          with io.open(_fname + '.dir', 'wb') as file:
     148              file.write(data)
     149  
     150          f = dumbdbm.open(_fname)
     151          self.assertEqual(f[b'1'], b'hello')
     152          self.assertEqual(f[b'2'], b'hello2')
     153  
     154  
     155      def read_helper(self, f):
     156          keys = self.keys_helper(f)
     157          for key in self._dict:
     158              self.assertEqual(self._dict[key], f[key])
     159  
     160      def init_db(self):
     161          with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
     162              for k in self._dict:
     163                  f[k] = self._dict[k]
     164  
     165      def keys_helper(self, f):
     166          keys = sorted(f.keys())
     167          dkeys = sorted(self._dict.keys())
     168          self.assertEqual(keys, dkeys)
     169          return keys
     170  
     171      # Perform randomized operations.  This doesn't make assumptions about
     172      # what *might* fail.
     173      def test_random(self):
     174          import random
     175          d = {}  # mirror the database
     176          for dummy in range(5):
     177              with contextlib.closing(dumbdbm.open(_fname)) as f:
     178                  for dummy in range(100):
     179                      k = random.choice('abcdefghijklm')
     180                      if random.random() < 0.2:
     181                          if k in d:
     182                              del d[k]
     183                              del f[k]
     184                      else:
     185                          v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
     186                          d[k] = v
     187                          f[k] = v
     188                          self.assertEqual(f[k], v)
     189  
     190              with contextlib.closing(dumbdbm.open(_fname)) as f:
     191                  expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
     192                  got = sorted(f.items())
     193                  self.assertEqual(expected, got)
     194  
     195      def test_context_manager(self):
     196          with dumbdbm.open(_fname, 'c') as db:
     197              db["dumbdbm context manager"] = "context manager"
     198  
     199          with dumbdbm.open(_fname, 'r') as db:
     200              self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])
     201  
     202          with self.assertRaises(dumbdbm.error):
     203              db.keys()
     204  
     205      def test_check_closed(self):
     206          f = dumbdbm.open(_fname, 'c')
     207          f.close()
     208  
     209          for meth in (partial(operator.delitem, f),
     210                       partial(operator.setitem, f, 'b'),
     211                       partial(operator.getitem, f),
     212                       partial(operator.contains, f)):
     213              with self.assertRaises(dumbdbm.error) as cm:
     214                  meth('test')
     215              self.assertEqual(str(cm.exception),
     216                               "DBM object has already been closed")
     217  
     218          for meth in (operator.methodcaller('keys'),
     219                       operator.methodcaller('iterkeys'),
     220                       operator.methodcaller('items'),
     221                       len):
     222              with self.assertRaises(dumbdbm.error) as cm:
     223                  meth(f)
     224              self.assertEqual(str(cm.exception),
     225                               "DBM object has already been closed")
     226  
     227      def test_create_new(self):
     228          with dumbdbm.open(_fname, 'n') as f:
     229              for k in self._dict:
     230                  f[k] = self._dict[k]
     231  
     232          with dumbdbm.open(_fname, 'n') as f:
     233              self.assertEqual(f.keys(), [])
     234  
     235      def test_eval(self):
     236          with open(_fname + '.dir', 'w', encoding="utf-8") as stream:
     237              stream.write("str(print('Hacked!')), 0\n")
     238          with support.captured_stdout() as stdout:
     239              with self.assertRaises(ValueError):
     240                  with dumbdbm.open(_fname) as f:
     241                      pass
     242              self.assertEqual(stdout.getvalue(), '')
     243  
     244      def test_missing_data(self):
     245          for value in ('r', 'w'):
     246              _delete_files()
     247              with self.assertRaises(FileNotFoundError):
     248                  dumbdbm.open(_fname, value)
     249              self.assertFalse(os.path.exists(_fname + '.dir'))
     250              self.assertFalse(os.path.exists(_fname + '.bak'))
     251  
     252      def test_missing_index(self):
     253          with dumbdbm.open(_fname, 'n') as f:
     254              pass
     255          os.unlink(_fname + '.dir')
     256          for value in ('r', 'w'):
     257              with self.assertRaises(FileNotFoundError):
     258                  dumbdbm.open(_fname, value)
     259              self.assertFalse(os.path.exists(_fname + '.dir'))
     260              self.assertFalse(os.path.exists(_fname + '.bak'))
     261  
     262      def test_invalid_flag(self):
     263          for flag in ('x', 'rf', None):
     264              with self.assertRaisesRegex(ValueError,
     265                                          "Flag must be one of "
     266                                          "'r', 'w', 'c', or 'n'"):
     267                  dumbdbm.open(_fname, flag)
     268  
     269      @os_helper.skip_unless_working_chmod
     270      def test_readonly_files(self):
     271          with os_helper.temp_dir() as dir:
     272              fname = os.path.join(dir, 'db')
     273              with dumbdbm.open(fname, 'n') as f:
     274                  self.assertEqual(list(f.keys()), [])
     275                  for key in self._dict:
     276                      f[key] = self._dict[key]
     277              os.chmod(fname + ".dir", stat.S_IRUSR)
     278              os.chmod(fname + ".dat", stat.S_IRUSR)
     279              os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
     280              with dumbdbm.open(fname, 'r') as f:
     281                  self.assertEqual(sorted(f.keys()), sorted(self._dict))
     282                  f.close()  # don't write
     283  
     284      @unittest.skipUnless(os_helper.TESTFN_NONASCII,
     285                           'requires OS support of non-ASCII encodings')
     286      def test_nonascii_filename(self):
     287          filename = os_helper.TESTFN_NONASCII
     288          for suffix in ['.dir', '.dat', '.bak']:
     289              self.addCleanup(os_helper.unlink, filename + suffix)
     290          with dumbdbm.open(filename, 'c') as db:
     291              db[b'key'] = b'value'
     292          self.assertTrue(os.path.exists(filename + '.dat'))
     293          self.assertTrue(os.path.exists(filename + '.dir'))
     294          with dumbdbm.open(filename, 'r') as db:
     295              self.assertEqual(list(db.keys()), [b'key'])
     296              self.assertTrue(b'key' in db)
     297              self.assertEqual(db[b'key'], b'value')
     298  
     299      def test_open_with_pathlib_path(self):
     300          dumbdbm.open(os_helper.FakePath(_fname), "c").close()
     301  
     302      def test_open_with_bytes_path(self):
     303          dumbdbm.open(os.fsencode(_fname), "c").close()
     304  
     305      def test_open_with_pathlib_bytes_path(self):
     306          dumbdbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close()
     307  
     308      def tearDown(self):
     309          _delete_files()
     310  
     311      def setUp(self):
     312          _delete_files()
     313  
     314  
     315  if __name__ == "__main__":
     316      unittest.main()