(root)/
Python-3.11.7/
Lib/
test/
test_sqlite3/
test_transactions.py
       1  # pysqlite2/test/transactions.py: tests transactions
       2  #
       3  # Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
       4  #
       5  # This file is part of pysqlite.
       6  #
       7  # This software is provided 'as-is', without any express or implied
       8  # warranty.  In no event will the authors be held liable for any damages
       9  # arising from the use of this software.
      10  #
      11  # Permission is granted to anyone to use this software for any purpose,
      12  # including commercial applications, and to alter it and redistribute it
      13  # freely, subject to the following restrictions:
      14  #
      15  # 1. The origin of this software must not be misrepresented; you must not
      16  #    claim that you wrote the original software. If you use this software
      17  #    in a product, an acknowledgment in the product documentation would be
      18  #    appreciated but is not required.
      19  # 2. Altered source versions must be plainly marked as such, and must not be
      20  #    misrepresented as being the original software.
      21  # 3. This notice may not be removed or altered from any source distribution.
      22  
      23  import os, unittest
      24  import sqlite3 as sqlite
      25  
      26  from test.support.os_helper import TESTFN, unlink
      27  
      28  from test.test_sqlite3.test_dbapi import memory_database
      29  
      30  
      31  class ESC[4;38;5;81mTransactionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      32      def setUp(self):
      33          # We can disable the busy handlers, since we control
      34          # the order of SQLite C API operations.
      35          self.con1 = sqlite.connect(TESTFN, timeout=0)
      36          self.cur1 = self.con1.cursor()
      37  
      38          self.con2 = sqlite.connect(TESTFN, timeout=0)
      39          self.cur2 = self.con2.cursor()
      40  
      41      def tearDown(self):
      42          try:
      43              self.cur1.close()
      44              self.con1.close()
      45  
      46              self.cur2.close()
      47              self.con2.close()
      48  
      49          finally:
      50              unlink(TESTFN)
      51  
      52      def test_dml_does_not_auto_commit_before(self):
      53          self.cur1.execute("create table test(i)")
      54          self.cur1.execute("insert into test(i) values (5)")
      55          self.cur1.execute("create table test2(j)")
      56          self.cur2.execute("select i from test")
      57          res = self.cur2.fetchall()
      58          self.assertEqual(len(res), 0)
      59  
      60      def test_insert_starts_transaction(self):
      61          self.cur1.execute("create table test(i)")
      62          self.cur1.execute("insert into test(i) values (5)")
      63          self.cur2.execute("select i from test")
      64          res = self.cur2.fetchall()
      65          self.assertEqual(len(res), 0)
      66  
      67      def test_update_starts_transaction(self):
      68          self.cur1.execute("create table test(i)")
      69          self.cur1.execute("insert into test(i) values (5)")
      70          self.con1.commit()
      71          self.cur1.execute("update test set i=6")
      72          self.cur2.execute("select i from test")
      73          res = self.cur2.fetchone()[0]
      74          self.assertEqual(res, 5)
      75  
      76      def test_delete_starts_transaction(self):
      77          self.cur1.execute("create table test(i)")
      78          self.cur1.execute("insert into test(i) values (5)")
      79          self.con1.commit()
      80          self.cur1.execute("delete from test")
      81          self.cur2.execute("select i from test")
      82          res = self.cur2.fetchall()
      83          self.assertEqual(len(res), 1)
      84  
      85      def test_replace_starts_transaction(self):
      86          self.cur1.execute("create table test(i)")
      87          self.cur1.execute("insert into test(i) values (5)")
      88          self.con1.commit()
      89          self.cur1.execute("replace into test(i) values (6)")
      90          self.cur2.execute("select i from test")
      91          res = self.cur2.fetchall()
      92          self.assertEqual(len(res), 1)
      93          self.assertEqual(res[0][0], 5)
      94  
      95      def test_toggle_auto_commit(self):
      96          self.cur1.execute("create table test(i)")
      97          self.cur1.execute("insert into test(i) values (5)")
      98          self.con1.isolation_level = None
      99          self.assertEqual(self.con1.isolation_level, None)
     100          self.cur2.execute("select i from test")
     101          res = self.cur2.fetchall()
     102          self.assertEqual(len(res), 1)
     103  
     104          self.con1.isolation_level = "DEFERRED"
     105          self.assertEqual(self.con1.isolation_level , "DEFERRED")
     106          self.cur1.execute("insert into test(i) values (5)")
     107          self.cur2.execute("select i from test")
     108          res = self.cur2.fetchall()
     109          self.assertEqual(len(res), 1)
     110  
     111      def test_raise_timeout(self):
     112          self.cur1.execute("create table test(i)")
     113          self.cur1.execute("insert into test(i) values (5)")
     114          with self.assertRaises(sqlite.OperationalError):
     115              self.cur2.execute("insert into test(i) values (5)")
     116  
     117      def test_locking(self):
     118          # This tests the improved concurrency with pysqlite 2.3.4. You needed
     119          # to roll back con2 before you could commit con1.
     120          self.cur1.execute("create table test(i)")
     121          self.cur1.execute("insert into test(i) values (5)")
     122          with self.assertRaises(sqlite.OperationalError):
     123              self.cur2.execute("insert into test(i) values (5)")
     124          # NO self.con2.rollback() HERE!!!
     125          self.con1.commit()
     126  
     127      def test_rollback_cursor_consistency(self):
     128          """Check that cursors behave correctly after rollback."""
     129          con = sqlite.connect(":memory:")
     130          cur = con.cursor()
     131          cur.execute("create table test(x)")
     132          cur.execute("insert into test(x) values (5)")
     133          cur.execute("select 1 union select 2 union select 3")
     134  
     135          con.rollback()
     136          self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)])
     137  
     138      def test_multiple_cursors_and_iternext(self):
     139          # gh-94028: statements are cleared and reset in cursor iternext.
     140  
     141          # Provoke the gh-94028 by using a cursor cache.
     142          CURSORS = {}
     143          def sql(cx, sql, *args):
     144              cu = cx.cursor()
     145              cu.execute(sql, args)
     146              CURSORS[id(sql)] = cu
     147              return cu
     148  
     149          self.con1.execute("create table t(t)")
     150          sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3")
     151          self.con1.commit()
     152  
     153          # On second connection, verify rows are visible, then delete them.
     154          count = sql(self.con2, "select count(*) from t").fetchone()[0]
     155          self.assertEqual(count, 3)
     156          changes = sql(self.con2, "delete from t").rowcount
     157          self.assertEqual(changes, 3)
     158          self.con2.commit()
     159  
     160          # Back in original connection, create 2 new users.
     161          sql(self.con1, "insert into t values (?)", "u4")
     162          sql(self.con1, "insert into t values (?)", "u5")
     163  
     164          # The second connection cannot see uncommitted changes.
     165          count = sql(self.con2, "select count(*) from t").fetchone()[0]
     166          self.assertEqual(count, 0)
     167  
     168          # First connection can see its own changes.
     169          count = sql(self.con1, "select count(*) from t").fetchone()[0]
     170          self.assertEqual(count, 2)
     171  
     172          # The second connection can now see the changes.
     173          self.con1.commit()
     174          count = sql(self.con2, "select count(*) from t").fetchone()[0]
     175          self.assertEqual(count, 2)
     176  
     177  
     178  class ESC[4;38;5;81mRollbackTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     179      """bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues"""
     180  
     181      def setUp(self):
     182          self.con = sqlite.connect(":memory:")
     183          self.cur1 = self.con.cursor()
     184          self.cur2 = self.con.cursor()
     185          with self.con:
     186              self.con.execute("create table t(c)");
     187              self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)])
     188          self.cur1.execute("begin transaction")
     189          select = "select c from t"
     190          self.cur1.execute(select)
     191          self.con.rollback()
     192          self.res = self.cur2.execute(select)  # Reusing stmt from cache
     193  
     194      def tearDown(self):
     195          self.con.close()
     196  
     197      def _check_rows(self):
     198          for i, row in enumerate(self.res):
     199              self.assertEqual(row[0], i)
     200  
     201      def test_no_duplicate_rows_after_rollback_del_cursor(self):
     202          del self.cur1
     203          self._check_rows()
     204  
     205      def test_no_duplicate_rows_after_rollback_close_cursor(self):
     206          self.cur1.close()
     207          self._check_rows()
     208  
     209      def test_no_duplicate_rows_after_rollback_new_query(self):
     210          self.cur1.execute("select c from t where c = 1")
     211          self._check_rows()
     212  
     213  
     214  
     215  class ESC[4;38;5;81mSpecialCommandTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     216      def setUp(self):
     217          self.con = sqlite.connect(":memory:")
     218          self.cur = self.con.cursor()
     219  
     220      def test_drop_table(self):
     221          self.cur.execute("create table test(i)")
     222          self.cur.execute("insert into test(i) values (5)")
     223          self.cur.execute("drop table test")
     224  
     225      def test_pragma(self):
     226          self.cur.execute("create table test(i)")
     227          self.cur.execute("insert into test(i) values (5)")
     228          self.cur.execute("pragma count_changes=1")
     229  
     230      def tearDown(self):
     231          self.cur.close()
     232          self.con.close()
     233  
     234  
     235  class ESC[4;38;5;81mTransactionalDDL(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     236      def setUp(self):
     237          self.con = sqlite.connect(":memory:")
     238  
     239      def test_ddl_does_not_autostart_transaction(self):
     240          # For backwards compatibility reasons, DDL statements should not
     241          # implicitly start a transaction.
     242          self.con.execute("create table test(i)")
     243          self.con.rollback()
     244          result = self.con.execute("select * from test").fetchall()
     245          self.assertEqual(result, [])
     246  
     247      def test_immediate_transactional_ddl(self):
     248          # You can achieve transactional DDL by issuing a BEGIN
     249          # statement manually.
     250          self.con.execute("begin immediate")
     251          self.con.execute("create table test(i)")
     252          self.con.rollback()
     253          with self.assertRaises(sqlite.OperationalError):
     254              self.con.execute("select * from test")
     255  
     256      def test_transactional_ddl(self):
     257          # You can achieve transactional DDL by issuing a BEGIN
     258          # statement manually.
     259          self.con.execute("begin")
     260          self.con.execute("create table test(i)")
     261          self.con.rollback()
     262          with self.assertRaises(sqlite.OperationalError):
     263              self.con.execute("select * from test")
     264  
     265      def tearDown(self):
     266          self.con.close()
     267  
     268  
     269  class ESC[4;38;5;81mIsolationLevelFromInit(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     270      CREATE = "create table t(t)"
     271      INSERT = "insert into t values(1)"
     272  
     273      def setUp(self):
     274          self.traced = []
     275  
     276      def _run_test(self, cx):
     277          cx.execute(self.CREATE)
     278          cx.set_trace_callback(lambda stmt: self.traced.append(stmt))
     279          with cx:
     280              cx.execute(self.INSERT)
     281  
     282      def test_isolation_level_default(self):
     283          with memory_database() as cx:
     284              self._run_test(cx)
     285              self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])
     286  
     287      def test_isolation_level_begin(self):
     288          with memory_database(isolation_level="") as cx:
     289              self._run_test(cx)
     290              self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])
     291  
     292      def test_isolation_level_deferred(self):
     293          with memory_database(isolation_level="DEFERRED") as cx:
     294              self._run_test(cx)
     295              self.assertEqual(self.traced, ["BEGIN DEFERRED", self.INSERT, "COMMIT"])
     296  
     297      def test_isolation_level_immediate(self):
     298          with memory_database(isolation_level="IMMEDIATE") as cx:
     299              self._run_test(cx)
     300              self.assertEqual(self.traced,
     301                               ["BEGIN IMMEDIATE", self.INSERT, "COMMIT"])
     302  
     303      def test_isolation_level_exclusive(self):
     304          with memory_database(isolation_level="EXCLUSIVE") as cx:
     305              self._run_test(cx)
     306              self.assertEqual(self.traced,
     307                               ["BEGIN EXCLUSIVE", self.INSERT, "COMMIT"])
     308  
     309      def test_isolation_level_none(self):
     310          with memory_database(isolation_level=None) as cx:
     311              self._run_test(cx)
     312              self.assertEqual(self.traced, [self.INSERT])
     313  
     314  
     315  class ESC[4;38;5;81mIsolationLevelPostInit(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     316      QUERY = "insert into t values(1)"
     317  
     318      def setUp(self):
     319          self.cx = sqlite.connect(":memory:")
     320          self.cx.execute("create table t(t)")
     321          self.traced = []
     322          self.cx.set_trace_callback(lambda stmt: self.traced.append(stmt))
     323  
     324      def tearDown(self):
     325          self.cx.close()
     326  
     327      def test_isolation_level_default(self):
     328          with self.cx:
     329              self.cx.execute(self.QUERY)
     330          self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])
     331  
     332      def test_isolation_level_begin(self):
     333          self.cx.isolation_level = ""
     334          with self.cx:
     335              self.cx.execute(self.QUERY)
     336          self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])
     337  
     338      def test_isolation_level_deferrred(self):
     339          self.cx.isolation_level = "DEFERRED"
     340          with self.cx:
     341              self.cx.execute(self.QUERY)
     342          self.assertEqual(self.traced, ["BEGIN DEFERRED", self.QUERY, "COMMIT"])
     343  
     344      def test_isolation_level_immediate(self):
     345          self.cx.isolation_level = "IMMEDIATE"
     346          with self.cx:
     347              self.cx.execute(self.QUERY)
     348          self.assertEqual(self.traced,
     349                           ["BEGIN IMMEDIATE", self.QUERY, "COMMIT"])
     350  
     351      def test_isolation_level_exclusive(self):
     352          self.cx.isolation_level = "EXCLUSIVE"
     353          with self.cx:
     354              self.cx.execute(self.QUERY)
     355          self.assertEqual(self.traced,
     356                           ["BEGIN EXCLUSIVE", self.QUERY, "COMMIT"])
     357  
     358      def test_isolation_level_none(self):
     359          self.cx.isolation_level = None
     360          with self.cx:
     361              self.cx.execute(self.QUERY)
     362          self.assertEqual(self.traced, [self.QUERY])
     363  
     364  
     365  if __name__ == "__main__":
     366      unittest.main()