1 """Test script for the dumbdbm module
2 Original by Roger E. Masse
3 """
4
5 import contextlib
6 import io
7 import operator
8 import os
9 import stat
10 import unittest
11 import dbm.dumb as dumbdbm
12 from test import support
13 from test.support import os_helper
14 from functools import partial
15
# Base name of the database used by the tests below; the files they touch are
# _fname + '.dir', _fname + '.dat' and _fname + '.bak'.
_fname = os_helper.TESTFN
17
18
def _delete_files():
    """Best-effort removal of the database files derived from ``_fname``.

    Tries to unlink ``_fname`` with each of the ``.dir``, ``.dat`` and
    ``.bak`` suffixes.  Any ``OSError`` raised by ``os.unlink`` is ignored,
    so the remaining suffixes are still attempted.
    """
    for suffix in (".dir", ".dat", ".bak"):
        with contextlib.suppress(OSError):
            os.unlink(_fname + suffix)
25
class DumbDBMTestCase(unittest.TestCase):
    """Tests for the ``dbm.dumb`` module (imported here as ``dumbdbm``).

    Unless a test says otherwise it operates on the database named by the
    module-level ``_fname``.  ``setUp`` and ``tearDown`` both call
    ``_delete_files()`` so the ``.dir``/``.dat``/``.bak`` files are removed
    before and after every test.
    """

    # Reference content written by init_db() and verified by read_helper() and
    # keys_helper().  It contains an empty value and a UTF-8 encoded
    # non-ASCII key.
    _dict = {
        b'0': b'',
        b'a': b'Python:',
        b'b': b'Programming',
        b'c': b'the',
        b'd': b'way',
        b'f': b'Guido',
        b'g': b'intended',
        '\u00fc'.encode('utf-8'): b'!',
    }

    def test_dumbdbm_creation(self):
        """A database opened with 'c' starts empty and stores ``_dict``."""
        with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @os_helper.skip_unless_working_chmod
    def test_dumbdbm_creation_mode(self):
        """The mode passed to open() is applied to the .dat and .dir files.

        With a umask of 0o002 and a requested mode of 0o637 the expected
        permission bits are 0o635 on POSIX.
        """
        # Set the umask before entering the try block so that the finally
        # clause never runs with old_umask unbound.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        for ext in ('.dat', '.dir'):
            st = os.stat(_fname + ext)
            self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        """Calling close() a second time must not raise."""
        f = dumbdbm.open(_fname)
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
        f.close()
        f.close()

    def test_dumbdbm_modification(self):
        """Values can be overwritten in 'w' mode; setdefault() stores defaults."""
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
            # Keep the reference dict in step with the database so that
            # read_helper() compares against the updated value.
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)
            # setdefault() works as in the dict interface
            self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
            self.assertEqual(f[b'xxx'], b'foo')

    def test_dumbdbm_read(self):
        """A database opened with 'r' can be read but rejects writes."""
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.read_helper(f)
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                del f[b'a']
            # get() works as in the dict interface
            self.assertEqual(f.get(b'a'), self._dict[b'a'])
            self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
            self.assertIsNone(f.get(b'xxx'))
            with self.assertRaises(KeyError):
                f[b'xxx']

    def test_dumbdbm_keys(self):
        """keys() of a freshly initialised database matches ``_dict``."""
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        """A key is reported by ``in`` right after it has been written."""
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        """The last of two writes to one key is the value read after reopening."""
        # test for bug #482460
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        """A str key looks up the entry stored under its UTF-8 encoding."""
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        """str keys and values written are readable back as bytes."""
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        """The .dir file is still readable after its line endings are swapped."""
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        # Close the reopened database explicitly instead of leaving it open
        # until the object happens to be collected.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')

    def read_helper(self, f):
        """Assert that *f* has exactly the keys and values of ``self._dict``."""
        self.keys_helper(f)
        for key in self._dict:
            self.assertEqual(self._dict[key], f[key])

    def init_db(self):
        """Create a new database at ``_fname`` ('n' flag) holding ``self._dict``."""
        with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
            for k in self._dict:
                f[k] = self._dict[k]

    def keys_helper(self, f):
        """Assert the sorted keys of *f* equal those of ``self._dict``.

        Returns the sorted list of keys read from *f*.
        """
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        """Random sets and deletes stay consistent with an in-memory mirror."""
        import random
        d = {}  # mirror the database
        for _round in range(5):
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                for _step in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            # Reopen and compare the whole content; the mirror uses str keys
            # while the database returns bytes keys.
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        """The database works as a context manager and is closed on exit."""
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        # db was closed when the with block above exited.
        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        """Operations on a closed database raise ``dumbdbm.error``."""
        f = dumbdbm.open(_fname, 'c')
        f.close()

        # Item-style operations, each called with the key 'test'.
        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        # Whole-database operations, each called with the database itself.
        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        """Opening an existing database with 'n' yields an empty database."""
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

    def test_eval(self):
        """A .dir line containing a Python call is rejected, not executed."""
        with open(_fname + '.dir', 'w', encoding="utf-8") as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname):
                    pass
            # Nothing may have been printed while the index was read.
            self.assertEqual(stdout.getvalue(), '')

    def test_missing_data(self):
        """'r' and 'w' fail on a missing database and create no files."""
        for value in ('r', 'w'):
            with self.subTest(flag=value):
                _delete_files()
                with self.assertRaises(FileNotFoundError):
                    dumbdbm.open(_fname, value)
                self.assertFalse(os.path.exists(_fname + '.dir'))
                self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_missing_index(self):
        """'r' and 'w' fail when the .dir file is gone and do not recreate it."""
        with dumbdbm.open(_fname, 'n'):
            pass
        os.unlink(_fname + '.dir')
        for value in ('r', 'w'):
            with self.subTest(flag=value):
                with self.assertRaises(FileNotFoundError):
                    dumbdbm.open(_fname, value)
                self.assertFalse(os.path.exists(_fname + '.dir'))
                self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_invalid_flag(self):
        """Flags other than 'r', 'w', 'c' and 'n' raise ``ValueError``."""
        for flag in ('x', 'rf', None):
            with self.subTest(flag=flag):
                with self.assertRaisesRegex(ValueError,
                                            "Flag must be one of "
                                            "'r', 'w', 'c', or 'n'"):
                    dumbdbm.open(_fname, flag)

    @os_helper.skip_unless_working_chmod
    def test_readonly_files(self):
        """A database whose files and directory are read-only opens with 'r'."""
        with os_helper.temp_dir() as tmpdir:
            fname = os.path.join(tmpdir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(tmpdir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    @unittest.skipUnless(os_helper.TESTFN_NONASCII,
                         'requires OS support of non-ASCII encodings')
    def test_nonascii_filename(self):
        """A database can be created and read under a non-ASCII file name."""
        filename = os_helper.TESTFN_NONASCII
        # This test does not use _fname, so tearDown() would not remove
        # its files; register their removal explicitly.
        for suffix in ['.dir', '.dat', '.bak']:
            self.addCleanup(os_helper.unlink, filename + suffix)
        with dumbdbm.open(filename, 'c') as db:
            db[b'key'] = b'value'
        self.assertTrue(os.path.exists(filename + '.dat'))
        self.assertTrue(os.path.exists(filename + '.dir'))
        with dumbdbm.open(filename, 'r') as db:
            self.assertEqual(list(db.keys()), [b'key'])
            self.assertIn(b'key', db)
            self.assertEqual(db[b'key'], b'value')

    def test_open_with_pathlib_path(self):
        """open() accepts an ``os_helper.FakePath`` wrapping a str path."""
        dumbdbm.open(os_helper.FakePath(_fname), "c").close()

    def test_open_with_bytes_path(self):
        """open() accepts a bytes path."""
        dumbdbm.open(os.fsencode(_fname), "c").close()

    def test_open_with_pathlib_bytes_path(self):
        """open() accepts an ``os_helper.FakePath`` wrapping a bytes path."""
        dumbdbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close()

    def tearDown(self):
        """Remove the database files left behind by the test."""
        _delete_files()

    def setUp(self):
        """Remove stale database files and take a private copy of ``_dict``."""
        _delete_files()
        # test_dumbdbm_modification() updates self._dict in place.  Working
        # on a per-instance copy keeps that change out of the class-level
        # dict that every other test reads.
        self._dict = dict(self._dict)
313
314
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()