1 # pysqlite2/test/transactions.py: tests transactions
2 #
3 # Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
4 #
5 # This file is part of pysqlite.
6 #
7 # This software is provided 'as-is', without any express or implied
8 # warranty. In no event will the authors be held liable for any damages
9 # arising from the use of this software.
10 #
11 # Permission is granted to anyone to use this software for any purpose,
12 # including commercial applications, and to alter it and redistribute it
13 # freely, subject to the following restrictions:
14 #
15 # 1. The origin of this software must not be misrepresented; you must not
16 # claim that you wrote the original software. If you use this software
17 # in a product, an acknowledgment in the product documentation would be
18 # appreciated but is not required.
19 # 2. Altered source versions must be plainly marked as such, and must not be
20 # misrepresented as being the original software.
21 # 3. This notice may not be removed or altered from any source distribution.
22
23 import os, unittest
24 import sqlite3 as sqlite
25
26 from test.support.os_helper import TESTFN, unlink
27
28 from test.test_sqlite3.test_dbapi import memory_database
29
30
31 class ESC[4;38;5;81mTransactionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
32 def setUp(self):
33 # We can disable the busy handlers, since we control
34 # the order of SQLite C API operations.
35 self.con1 = sqlite.connect(TESTFN, timeout=0)
36 self.cur1 = self.con1.cursor()
37
38 self.con2 = sqlite.connect(TESTFN, timeout=0)
39 self.cur2 = self.con2.cursor()
40
41 def tearDown(self):
42 try:
43 self.cur1.close()
44 self.con1.close()
45
46 self.cur2.close()
47 self.con2.close()
48
49 finally:
50 unlink(TESTFN)
51
52 def test_dml_does_not_auto_commit_before(self):
53 self.cur1.execute("create table test(i)")
54 self.cur1.execute("insert into test(i) values (5)")
55 self.cur1.execute("create table test2(j)")
56 self.cur2.execute("select i from test")
57 res = self.cur2.fetchall()
58 self.assertEqual(len(res), 0)
59
60 def test_insert_starts_transaction(self):
61 self.cur1.execute("create table test(i)")
62 self.cur1.execute("insert into test(i) values (5)")
63 self.cur2.execute("select i from test")
64 res = self.cur2.fetchall()
65 self.assertEqual(len(res), 0)
66
67 def test_update_starts_transaction(self):
68 self.cur1.execute("create table test(i)")
69 self.cur1.execute("insert into test(i) values (5)")
70 self.con1.commit()
71 self.cur1.execute("update test set i=6")
72 self.cur2.execute("select i from test")
73 res = self.cur2.fetchone()[0]
74 self.assertEqual(res, 5)
75
76 def test_delete_starts_transaction(self):
77 self.cur1.execute("create table test(i)")
78 self.cur1.execute("insert into test(i) values (5)")
79 self.con1.commit()
80 self.cur1.execute("delete from test")
81 self.cur2.execute("select i from test")
82 res = self.cur2.fetchall()
83 self.assertEqual(len(res), 1)
84
85 def test_replace_starts_transaction(self):
86 self.cur1.execute("create table test(i)")
87 self.cur1.execute("insert into test(i) values (5)")
88 self.con1.commit()
89 self.cur1.execute("replace into test(i) values (6)")
90 self.cur2.execute("select i from test")
91 res = self.cur2.fetchall()
92 self.assertEqual(len(res), 1)
93 self.assertEqual(res[0][0], 5)
94
95 def test_toggle_auto_commit(self):
96 self.cur1.execute("create table test(i)")
97 self.cur1.execute("insert into test(i) values (5)")
98 self.con1.isolation_level = None
99 self.assertEqual(self.con1.isolation_level, None)
100 self.cur2.execute("select i from test")
101 res = self.cur2.fetchall()
102 self.assertEqual(len(res), 1)
103
104 self.con1.isolation_level = "DEFERRED"
105 self.assertEqual(self.con1.isolation_level , "DEFERRED")
106 self.cur1.execute("insert into test(i) values (5)")
107 self.cur2.execute("select i from test")
108 res = self.cur2.fetchall()
109 self.assertEqual(len(res), 1)
110
111 def test_raise_timeout(self):
112 self.cur1.execute("create table test(i)")
113 self.cur1.execute("insert into test(i) values (5)")
114 with self.assertRaises(sqlite.OperationalError):
115 self.cur2.execute("insert into test(i) values (5)")
116
117 def test_locking(self):
118 # This tests the improved concurrency with pysqlite 2.3.4. You needed
119 # to roll back con2 before you could commit con1.
120 self.cur1.execute("create table test(i)")
121 self.cur1.execute("insert into test(i) values (5)")
122 with self.assertRaises(sqlite.OperationalError):
123 self.cur2.execute("insert into test(i) values (5)")
124 # NO self.con2.rollback() HERE!!!
125 self.con1.commit()
126
127 def test_rollback_cursor_consistency(self):
128 """Check that cursors behave correctly after rollback."""
129 con = sqlite.connect(":memory:")
130 cur = con.cursor()
131 cur.execute("create table test(x)")
132 cur.execute("insert into test(x) values (5)")
133 cur.execute("select 1 union select 2 union select 3")
134
135 con.rollback()
136 self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)])
137
138 def test_multiple_cursors_and_iternext(self):
139 # gh-94028: statements are cleared and reset in cursor iternext.
140
141 # Provoke the gh-94028 by using a cursor cache.
142 CURSORS = {}
143 def sql(cx, sql, *args):
144 cu = cx.cursor()
145 cu.execute(sql, args)
146 CURSORS[id(sql)] = cu
147 return cu
148
149 self.con1.execute("create table t(t)")
150 sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3")
151 self.con1.commit()
152
153 # On second connection, verify rows are visible, then delete them.
154 count = sql(self.con2, "select count(*) from t").fetchone()[0]
155 self.assertEqual(count, 3)
156 changes = sql(self.con2, "delete from t").rowcount
157 self.assertEqual(changes, 3)
158 self.con2.commit()
159
160 # Back in original connection, create 2 new users.
161 sql(self.con1, "insert into t values (?)", "u4")
162 sql(self.con1, "insert into t values (?)", "u5")
163
164 # The second connection cannot see uncommitted changes.
165 count = sql(self.con2, "select count(*) from t").fetchone()[0]
166 self.assertEqual(count, 0)
167
168 # First connection can see its own changes.
169 count = sql(self.con1, "select count(*) from t").fetchone()[0]
170 self.assertEqual(count, 2)
171
172 # The second connection can now see the changes.
173 self.con1.commit()
174 count = sql(self.con2, "select count(*) from t").fetchone()[0]
175 self.assertEqual(count, 2)
176
177
178 class ESC[4;38;5;81mRollbackTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
179 """bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues"""
180
181 def setUp(self):
182 self.con = sqlite.connect(":memory:")
183 self.cur1 = self.con.cursor()
184 self.cur2 = self.con.cursor()
185 with self.con:
186 self.con.execute("create table t(c)");
187 self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)])
188 self.cur1.execute("begin transaction")
189 select = "select c from t"
190 self.cur1.execute(select)
191 self.con.rollback()
192 self.res = self.cur2.execute(select) # Reusing stmt from cache
193
194 def tearDown(self):
195 self.con.close()
196
197 def _check_rows(self):
198 for i, row in enumerate(self.res):
199 self.assertEqual(row[0], i)
200
201 def test_no_duplicate_rows_after_rollback_del_cursor(self):
202 del self.cur1
203 self._check_rows()
204
205 def test_no_duplicate_rows_after_rollback_close_cursor(self):
206 self.cur1.close()
207 self._check_rows()
208
209 def test_no_duplicate_rows_after_rollback_new_query(self):
210 self.cur1.execute("select c from t where c = 1")
211 self._check_rows()
212
213
214
class SpecialCommandTests(unittest.TestCase):
    """Execute DROP TABLE and PRAGMA statements right after an insert.

    Each test creates a table and inserts one row without committing, then
    runs one further statement; the test passes if nothing raises.
    """

    # Statements every test runs before its statement under test.
    _PREPARE = (
        "create table test(i)",
        "insert into test(i) values (5)",
    )

    def setUp(self):
        connection = sqlite.connect(":memory:")
        self.con = connection
        self.cur = connection.cursor()

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _run_after_insert(self, statement):
        """Execute the preparation statements followed by *statement*."""
        for sql in (*self._PREPARE, statement):
            self.cur.execute(sql)

    def test_drop_table(self):
        self._run_after_insert("drop table test")

    def test_pragma(self):
        self._run_after_insert("pragma count_changes=1")
233
234
class TransactionalDDL(unittest.TestCase):
    """Check how CREATE TABLE interacts with rollback(), with and without BEGIN."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def _create_table_then_rollback(self, *begin):
        """Run the optional *begin* statement, create the table, roll back."""
        for statement in (*begin, "create table test(i)"):
            self.con.execute(statement)
        self.con.rollback()

    def _assert_table_missing(self):
        """Expect selecting from the table to raise OperationalError."""
        self.assertRaises(sqlite.OperationalError,
                          self.con.execute, "select * from test")

    def test_ddl_does_not_autostart_transaction(self):
        # For backwards compatibility reasons, DDL statements should not
        # implicitly start a transaction.
        self._create_table_then_rollback()
        rows = self.con.execute("select * from test").fetchall()
        self.assertEqual(rows, [])

    def test_immediate_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self._create_table_then_rollback("begin immediate")
        self._assert_table_missing()

    def test_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self._create_table_then_rollback("begin")
        self._assert_table_missing()
267
268
class IsolationLevelFromInit(unittest.TestCase):
    """Trace the statements issued when isolation_level is given at connect time.

    Each test opens a database through memory_database(), runs one INSERT
    inside the connection's context manager and compares the statements
    reported to the trace callback with the expected sequence.
    """

    CREATE = "create table t(t)"
    INSERT = "insert into t values(1)"

    def setUp(self):
        self.traced = []

    def _run_test(self, cx):
        # The table is created before tracing starts, so CREATE is not
        # part of the recorded statements.
        cx.execute(self.CREATE)
        cx.set_trace_callback(self.traced.append)
        with cx:
            cx.execute(self.INSERT)

    def _check_trace(self, expected, **connect_kwargs):
        """Run the scenario on a new connection and compare the trace."""
        with memory_database(**connect_kwargs) as cx:
            self._run_test(cx)
            self.assertEqual(self.traced, expected)

    def _in_transaction(self, begin):
        """Return the expected trace for an INSERT wrapped by *begin*/COMMIT."""
        return [begin, self.INSERT, "COMMIT"]

    def test_isolation_level_default(self):
        self._check_trace(self._in_transaction("BEGIN "))

    def test_isolation_level_begin(self):
        self._check_trace(self._in_transaction("BEGIN "), isolation_level="")

    def test_isolation_level_deferred(self):
        self._check_trace(self._in_transaction("BEGIN DEFERRED"),
                          isolation_level="DEFERRED")

    def test_isolation_level_immediate(self):
        self._check_trace(self._in_transaction("BEGIN IMMEDIATE"),
                          isolation_level="IMMEDIATE")

    def test_isolation_level_exclusive(self):
        self._check_trace(self._in_transaction("BEGIN EXCLUSIVE"),
                          isolation_level="EXCLUSIVE")

    def test_isolation_level_none(self):
        self._check_trace([self.INSERT], isolation_level=None)
313
314
class IsolationLevelPostInit(unittest.TestCase):
    """Trace the statements issued when isolation_level is set after connecting.

    setUp() creates the table before installing the trace callback; each test
    then optionally assigns ``isolation_level``, runs one INSERT inside the
    connection's context manager and checks the recorded statements.
    """

    QUERY = "insert into t values(1)"

    def setUp(self):
        self.traced = []
        self.cx = sqlite.connect(":memory:")
        self.cx.execute("create table t(t)")
        self.cx.set_trace_callback(self.traced.append)

    def tearDown(self):
        self.cx.close()

    def _insert_and_check(self, expected):
        """Run QUERY inside ``with self.cx`` and compare the trace."""
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, expected)

    def _in_transaction(self, begin):
        """Return the expected trace for QUERY wrapped by *begin*/COMMIT."""
        return [begin, self.QUERY, "COMMIT"]

    def test_isolation_level_default(self):
        self._insert_and_check(self._in_transaction("BEGIN "))

    def test_isolation_level_begin(self):
        self.cx.isolation_level = ""
        self._insert_and_check(self._in_transaction("BEGIN "))

    def test_isolation_level_deferrred(self):
        self.cx.isolation_level = "DEFERRED"
        self._insert_and_check(self._in_transaction("BEGIN DEFERRED"))

    def test_isolation_level_immediate(self):
        self.cx.isolation_level = "IMMEDIATE"
        self._insert_and_check(self._in_transaction("BEGIN IMMEDIATE"))

    def test_isolation_level_exclusive(self):
        self.cx.isolation_level = "EXCLUSIVE"
        self._insert_and_check(self._in_transaction("BEGIN EXCLUSIVE"))

    def test_isolation_level_none(self):
        self.cx.isolation_level = None
        self._insert_and_check([self.QUERY])
363
364
# Run every test case defined in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()