python (3.12.0)
1 # pysqlite2/test/transactions.py: tests transactions
2 #
3 # Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
4 #
5 # This file is part of pysqlite.
6 #
7 # This software is provided 'as-is', without any express or implied
8 # warranty. In no event will the authors be held liable for any damages
9 # arising from the use of this software.
10 #
11 # Permission is granted to anyone to use this software for any purpose,
12 # including commercial applications, and to alter it and redistribute it
13 # freely, subject to the following restrictions:
14 #
15 # 1. The origin of this software must not be misrepresented; you must not
16 # claim that you wrote the original software. If you use this software
17 # in a product, an acknowledgment in the product documentation would be
18 # appreciated but is not required.
19 # 2. Altered source versions must be plainly marked as such, and must not be
20 # misrepresented as being the original software.
21 # 3. This notice may not be removed or altered from any source distribution.
22
23 import unittest
24 import sqlite3 as sqlite
25 from contextlib import contextmanager
26
27 from test.support.os_helper import TESTFN, unlink
28 from test.support.script_helper import assert_python_ok
29
30 from test.test_sqlite3.test_dbapi import memory_database
31
32
class TransactionTests(unittest.TestCase):
    """Transaction visibility and locking across two connections.

    Each test gets two independent connections (``con1`` and ``con2``) to
    the same database file, TESTFN, each with one cursor, so that the
    effects of one connection can be observed from the other.
    """

    def setUp(self):
        # Release everything through addCleanup() instead of tearDown():
        # unittest does not call tearDown() when setUp() raises, which would
        # leak an already opened connection and leave TESTFN behind.
        # Cleanups run last-in first-out, so the file is removed only after
        # both connections have been closed.
        self.addCleanup(unlink, TESTFN)

        # We can disable the busy handlers, since we control
        # the order of SQLite C API operations.
        self.con1 = sqlite.connect(TESTFN, timeout=0)
        self.addCleanup(self.con1.close)
        self.cur1 = self.con1.cursor()
        self.addCleanup(self.cur1.close)

        self.con2 = sqlite.connect(TESTFN, timeout=0)
        self.addCleanup(self.con2.close)
        self.cur2 = self.con2.cursor()
        self.addCleanup(self.cur2.close)

    def test_dml_does_not_auto_commit_before(self):
        # The second "create table" must not commit the pending INSERT:
        # the other connection still sees no rows.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur1.execute("create table test2(j)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 0)

    def test_insert_starts_transaction(self):
        # The uncommitted INSERT is invisible to the other connection.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 0)

    def test_update_starts_transaction(self):
        # The other connection still reads the committed value 5, not the
        # uncommitted update to 6.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("update test set i=6")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchone()[0]
        self.assertEqual(res, 5)

    def test_delete_starts_transaction(self):
        # The committed row stays visible to the other connection while the
        # DELETE is uncommitted.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("delete from test")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def test_replace_starts_transaction(self):
        # Only the committed row (5) is visible; the uncommitted REPLACE
        # row (6) is not.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("replace into test(i) values (6)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)
        self.assertEqual(res[0][0], 5)

    def test_toggle_auto_commit(self):
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        # After switching isolation_level to None, the row inserted above
        # is visible to the other connection.
        self.con1.isolation_level = None
        self.assertIsNone(self.con1.isolation_level)
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

        # Back under "DEFERRED", a new INSERT stays uncommitted: the other
        # connection still sees a single row.
        self.con1.isolation_level = "DEFERRED"
        self.assertEqual(self.con1.isolation_level, "DEFERRED")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def test_raise_timeout(self):
        # con1 holds an uncommitted INSERT; with timeout=0 the second
        # connection's INSERT raises OperationalError.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        with self.assertRaises(sqlite.OperationalError):
            self.cur2.execute("insert into test(i) values (5)")

    def test_locking(self):
        # This tests the improved concurrency with pysqlite 2.3.4. Before
        # that, you needed to roll back con2 before you could commit con1.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        with self.assertRaises(sqlite.OperationalError):
            self.cur2.execute("insert into test(i) values (5)")
        # NO self.con2.rollback() HERE!!!
        self.con1.commit()

    def test_rollback_cursor_consistency(self):
        """Check that cursors behave correctly after rollback."""
        con = sqlite.connect(":memory:")
        # Close this private connection even if an assertion below fails.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table test(x)")
        cur.execute("insert into test(x) values (5)")
        cur.execute("select 1 union select 2 union select 3")

        con.rollback()
        self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)])

    def test_multiple_cursors_and_iternext(self):
        # gh-94028: statements are cleared and reset in cursor iternext.

        # Provoke the gh-94028 by using a cursor cache.
        CURSORS = {}

        def sql(cx, query, *args):
            # Run the query on a fresh cursor and store that cursor in the
            # cache, keyed by the identity of the SQL string object.
            cu = cx.cursor()
            cu.execute(query, args)
            CURSORS[id(query)] = cu
            return cu

        self.con1.execute("create table t(t)")
        sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3")
        self.con1.commit()

        # On second connection, verify rows are visible, then delete them.
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 3)
        changes = sql(self.con2, "delete from t").rowcount
        self.assertEqual(changes, 3)
        self.con2.commit()

        # Back in original connection, create 2 new users.
        sql(self.con1, "insert into t values (?)", "u4")
        sql(self.con1, "insert into t values (?)", "u5")

        # The second connection cannot see uncommitted changes.
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 0)

        # First connection can see its own changes.
        count = sql(self.con1, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 2)

        # The second connection can now see the changes.
        self.con1.commit()
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 2)
178
179
180 class ESC[4;38;5;81mRollbackTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
181 """bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues"""
182
183 def setUp(self):
184 self.con = sqlite.connect(":memory:")
185 self.cur1 = self.con.cursor()
186 self.cur2 = self.con.cursor()
187 with self.con:
188 self.con.execute("create table t(c)");
189 self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)])
190 self.cur1.execute("begin transaction")
191 select = "select c from t"
192 self.cur1.execute(select)
193 self.con.rollback()
194 self.res = self.cur2.execute(select) # Reusing stmt from cache
195
196 def tearDown(self):
197 self.con.close()
198
199 def _check_rows(self):
200 for i, row in enumerate(self.res):
201 self.assertEqual(row[0], i)
202
203 def test_no_duplicate_rows_after_rollback_del_cursor(self):
204 del self.cur1
205 self._check_rows()
206
207 def test_no_duplicate_rows_after_rollback_close_cursor(self):
208 self.cur1.close()
209 self._check_rows()
210
211 def test_no_duplicate_rows_after_rollback_new_query(self):
212 self.cur1.execute("select c from t where c = 1")
213 self._check_rows()
214
215
216
class SpecialCommandTests(unittest.TestCase):
    """Run DROP TABLE and PRAGMA right after an INSERT on the same cursor."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _create_and_insert(self):
        # Shared preamble: create table "test" and insert one row.
        for statement in ("create table test(i)",
                          "insert into test(i) values (5)"):
            self.cur.execute(statement)

    def test_drop_table(self):
        self._create_and_insert()
        self.cur.execute("drop table test")

    def test_pragma(self):
        self._create_and_insert()
        self.cur.execute("pragma count_changes=1")
235
236
class TransactionalDDL(unittest.TestCase):
    """Check whether a rollback undoes a CREATE TABLE."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def _check_create_is_rolled_back(self, begin_stmt):
        # After an explicit BEGIN the rollback discards the new table, so
        # selecting from it must raise OperationalError.
        cx = self.con
        cx.execute(begin_stmt)
        cx.execute("create table test(i)")
        cx.rollback()
        with self.assertRaises(sqlite.OperationalError):
            cx.execute("select * from test")

    def test_ddl_does_not_autostart_transaction(self):
        # For backwards compatibility reasons, DDL statements should not
        # implicitly start a transaction.
        cx = self.con
        cx.execute("create table test(i)")
        cx.rollback()
        rows = cx.execute("select * from test").fetchall()
        self.assertEqual(rows, [])

    def test_immediate_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self._check_create_is_rolled_back("begin immediate")

    def test_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self._check_create_is_rolled_back("begin")
269
270
class IsolationLevelFromInit(unittest.TestCase):
    """Check the statements traced for an isolation_level given at connect time."""

    CREATE = "create table t(t)"
    INSERT = "insert into t values(1)"

    def setUp(self):
        self.traced = []

    def _run_test(self, cx):
        # The table is created before the trace callback is installed, so
        # only the statements of the managed block are recorded.
        cx.execute(self.CREATE)
        cx.set_trace_callback(self.traced.append)
        with cx:
            cx.execute(self.INSERT)

    def _expect_trace(self, begin, **connect_args):
        # Run the scenario on a connection opened with *connect_args* and
        # compare the trace.  begin=None means that only the INSERT itself
        # is expected, without a surrounding BEGIN/COMMIT pair.
        with memory_database(**connect_args) as cx:
            self._run_test(cx)
        if begin is None:
            wanted = [self.INSERT]
        else:
            wanted = [begin, self.INSERT, "COMMIT"]
        self.assertEqual(self.traced, wanted)

    def test_isolation_level_default(self):
        self._expect_trace("BEGIN ")

    def test_isolation_level_begin(self):
        self._expect_trace("BEGIN ", isolation_level="")

    def test_isolation_level_deferred(self):
        self._expect_trace("BEGIN DEFERRED", isolation_level="DEFERRED")

    def test_isolation_level_immediate(self):
        self._expect_trace("BEGIN IMMEDIATE", isolation_level="IMMEDIATE")

    def test_isolation_level_exclusive(self):
        self._expect_trace("BEGIN EXCLUSIVE", isolation_level="EXCLUSIVE")

    def test_isolation_level_none(self):
        self._expect_trace(None, isolation_level=None)
315
316
class IsolationLevelPostInit(unittest.TestCase):
    """Check the statements traced after isolation_level is set on a connection."""

    QUERY = "insert into t values(1)"

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cx.execute("create table t(t)")
        # Tracing starts only after the table exists, so the CREATE is not
        # part of the recorded statements.
        self.traced = []
        self.cx.set_trace_callback(self.traced.append)

    def tearDown(self):
        self.cx.close()

    def _expect_trace(self, wanted):
        # Run QUERY inside the connection's context manager, then compare
        # everything that was traced with *wanted*.
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, wanted)

    def test_isolation_level_default(self):
        self._expect_trace(["BEGIN ", self.QUERY, "COMMIT"])

    def test_isolation_level_begin(self):
        self.cx.isolation_level = ""
        self._expect_trace(["BEGIN ", self.QUERY, "COMMIT"])

    # The method name is misspelt ("deferrred"); it is kept as is so that
    # selecting the test by name keeps working.
    def test_isolation_level_deferrred(self):
        self.cx.isolation_level = "DEFERRED"
        self._expect_trace(["BEGIN DEFERRED", self.QUERY, "COMMIT"])

    def test_isolation_level_immediate(self):
        self.cx.isolation_level = "IMMEDIATE"
        self._expect_trace(["BEGIN IMMEDIATE", self.QUERY, "COMMIT"])

    def test_isolation_level_exclusive(self):
        self.cx.isolation_level = "EXCLUSIVE"
        self._expect_trace(["BEGIN EXCLUSIVE", self.QUERY, "COMMIT"])

    def test_isolation_level_none(self):
        self.cx.isolation_level = None
        self._expect_trace([self.QUERY])
365
366
class AutocommitAttribute(unittest.TestCase):
    """Test PEP 249-compliant autocommit behaviour."""
    legacy = sqlite.LEGACY_TRANSACTION_CONTROL

    @contextmanager
    def check_stmt_trace(self, cx, expected, reset=True):
        """Assert that the managed block makes *cx* trace exactly *expected*.

        cx -- connection on which a statement trace callback is installed.
        expected -- list of the statements that must be traced, in order.
        reset -- if true, the trace callback is removed again on exit; the
                 test whose block closes *cx* passes False.
        """
        traced = []
        cx.set_trace_callback(traced.append)
        try:
            yield
        finally:
            # Detach the callback whether or not the block raised.
            if reset:
                cx.set_trace_callback(None)
        # Only reached when the block completed, so a trace mismatch can
        # never replace the exception that actually broke the block.
        self.assertEqual(traced, expected)

    def test_autocommit_default(self):
        with memory_database() as cx:
            self.assertEqual(cx.autocommit,
                             sqlite.LEGACY_TRANSACTION_CONTROL)

    def test_autocommit_setget(self):
        dataset = (
            True,
            False,
            sqlite.LEGACY_TRANSACTION_CONTROL,
        )
        for mode in dataset:
            with self.subTest(mode=mode):
                # The mode can be given at connect time ...
                with memory_database(autocommit=mode) as cx:
                    self.assertEqual(cx.autocommit, mode)
                # ... or assigned on an existing connection.
                with memory_database() as cx:
                    cx.autocommit = mode
                    self.assertEqual(cx.autocommit, mode)

    def test_autocommit_setget_invalid(self):
        msg = "autocommit must be True, False, or.*LEGACY"
        for mode in "a", 12, (), None:
            with self.subTest(mode=mode):
                with self.assertRaisesRegex(ValueError, msg):
                    sqlite.connect(":memory:", autocommit=mode)

    def test_autocommit_disabled(self):
        # With autocommit=False both commit() and rollback() are followed
        # by a new BEGIN.
        expected = [
            "SELECT 1",
            "COMMIT",
            "BEGIN",
            "ROLLBACK",
            "BEGIN",
        ]
        with memory_database(autocommit=False) as cx:
            self.assertTrue(cx.in_transaction)
            with self.check_stmt_trace(cx, expected):
                cx.execute("SELECT 1")
                cx.commit()
                cx.rollback()

    def test_autocommit_disabled_implicit_rollback(self):
        expected = ["ROLLBACK"]
        with memory_database(autocommit=False) as cx:
            self.assertTrue(cx.in_transaction)
            # reset=False: the connection is closed inside the block.
            with self.check_stmt_trace(cx, expected, reset=False):
                cx.close()

    def test_autocommit_enabled(self):
        expected = ["CREATE TABLE t(t)", "INSERT INTO t VALUES(1)"]
        with memory_database(autocommit=True) as cx:
            self.assertFalse(cx.in_transaction)
            with self.check_stmt_trace(cx, expected):
                cx.execute("CREATE TABLE t(t)")
                cx.execute("INSERT INTO t VALUES(1)")
                self.assertFalse(cx.in_transaction)

    def test_autocommit_enabled_txn_ctl(self):
        for op in "commit", "rollback":
            with self.subTest(op=op):
                with memory_database(autocommit=True) as cx:
                    meth = getattr(cx, op)
                    self.assertFalse(cx.in_transaction)
                    with self.check_stmt_trace(cx, []):
                        meth()  # expect this to pass silently
                    self.assertFalse(cx.in_transaction)

    def test_autocommit_disabled_then_enabled(self):
        expected = ["COMMIT"]
        with memory_database(autocommit=False) as cx:
            self.assertTrue(cx.in_transaction)
            with self.check_stmt_trace(cx, expected):
                cx.autocommit = True  # should commit
            self.assertFalse(cx.in_transaction)

    def test_autocommit_enabled_then_disabled(self):
        expected = ["BEGIN"]
        with memory_database(autocommit=True) as cx:
            self.assertFalse(cx.in_transaction)
            with self.check_stmt_trace(cx, expected):
                cx.autocommit = False  # should begin
            self.assertTrue(cx.in_transaction)

    def test_autocommit_explicit_then_disabled(self):
        expected = ["BEGIN DEFERRED"]
        with memory_database(autocommit=True) as cx:
            self.assertFalse(cx.in_transaction)
            with self.check_stmt_trace(cx, expected):
                cx.execute("BEGIN DEFERRED")
                cx.autocommit = False  # should now be a no-op
            self.assertTrue(cx.in_transaction)

    def test_autocommit_enabled_ctx_mgr(self):
        with memory_database(autocommit=True) as cx:
            # The context manager is a no-op if autocommit=True
            with self.check_stmt_trace(cx, []):
                with cx:
                    self.assertFalse(cx.in_transaction)
                self.assertFalse(cx.in_transaction)

    def test_autocommit_disabled_ctx_mgr(self):
        expected = ["COMMIT", "BEGIN"]
        with memory_database(autocommit=False) as cx:
            with self.check_stmt_trace(cx, expected):
                with cx:
                    self.assertTrue(cx.in_transaction)
                self.assertTrue(cx.in_transaction)

    def test_autocommit_compat_ctx_mgr(self):
        expected = ["BEGIN ", "INSERT INTO T VALUES(1)", "COMMIT"]
        with memory_database(autocommit=self.legacy) as cx:
            cx.execute("create table t(t)")
            with self.check_stmt_trace(cx, expected):
                with cx:
                    self.assertFalse(cx.in_transaction)
                    cx.execute("INSERT INTO T VALUES(1)")
                    self.assertTrue(cx.in_transaction)
                self.assertFalse(cx.in_transaction)

    def test_autocommit_enabled_executescript(self):
        expected = ["BEGIN", "SELECT 1"]
        with memory_database(autocommit=True) as cx:
            with self.check_stmt_trace(cx, expected):
                self.assertFalse(cx.in_transaction)
                cx.execute("BEGIN")
                cx.executescript("SELECT 1")
                self.assertTrue(cx.in_transaction)

    def test_autocommit_disabled_executescript(self):
        expected = ["SELECT 1"]
        with memory_database(autocommit=False) as cx:
            with self.check_stmt_trace(cx, expected):
                self.assertTrue(cx.in_transaction)
                cx.executescript("SELECT 1")
                self.assertTrue(cx.in_transaction)

    def test_autocommit_compat_executescript(self):
        # In legacy mode executescript() is preceded by a COMMIT of the
        # open transaction.
        expected = ["BEGIN", "COMMIT", "SELECT 1"]
        with memory_database(autocommit=self.legacy) as cx:
            with self.check_stmt_trace(cx, expected):
                self.assertFalse(cx.in_transaction)
                cx.execute("BEGIN")
                cx.executescript("SELECT 1")
                self.assertFalse(cx.in_transaction)

    def test_autocommit_disabled_implicit_shutdown(self):
        # The implicit ROLLBACK should not call back into Python during
        # interpreter tear-down.
        code = """if 1:
            import sqlite3
            cx = sqlite3.connect(":memory:", autocommit=False)
            cx.set_trace_callback(print)
        """
        assert_python_ok("-c", code, PYTHONIOENCODING="utf-8")
536
537
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()