# pysqlite2/test/transactions.py: tests transactions
#
# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
      22  
      23  import unittest
      24  import sqlite3 as sqlite
      25  from contextlib import contextmanager
      26  
      27  from test.support.os_helper import TESTFN, unlink
      28  from test.support.script_helper import assert_python_ok
      29  
      30  from test.test_sqlite3.test_dbapi import memory_database
      31  
      32  
      33  class ESC[4;38;5;81mTransactionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      34      def setUp(self):
      35          # We can disable the busy handlers, since we control
      36          # the order of SQLite C API operations.
      37          self.con1 = sqlite.connect(TESTFN, timeout=0)
      38          self.cur1 = self.con1.cursor()
      39  
      40          self.con2 = sqlite.connect(TESTFN, timeout=0)
      41          self.cur2 = self.con2.cursor()
      42  
      43      def tearDown(self):
      44          try:
      45              self.cur1.close()
      46              self.con1.close()
      47  
      48              self.cur2.close()
      49              self.con2.close()
      50  
      51          finally:
      52              unlink(TESTFN)
      53  
      54      def test_dml_does_not_auto_commit_before(self):
      55          self.cur1.execute("create table test(i)")
      56          self.cur1.execute("insert into test(i) values (5)")
      57          self.cur1.execute("create table test2(j)")
      58          self.cur2.execute("select i from test")
      59          res = self.cur2.fetchall()
      60          self.assertEqual(len(res), 0)
      61  
      62      def test_insert_starts_transaction(self):
      63          self.cur1.execute("create table test(i)")
      64          self.cur1.execute("insert into test(i) values (5)")
      65          self.cur2.execute("select i from test")
      66          res = self.cur2.fetchall()
      67          self.assertEqual(len(res), 0)
      68  
      69      def test_update_starts_transaction(self):
      70          self.cur1.execute("create table test(i)")
      71          self.cur1.execute("insert into test(i) values (5)")
      72          self.con1.commit()
      73          self.cur1.execute("update test set i=6")
      74          self.cur2.execute("select i from test")
      75          res = self.cur2.fetchone()[0]
      76          self.assertEqual(res, 5)
      77  
      78      def test_delete_starts_transaction(self):
      79          self.cur1.execute("create table test(i)")
      80          self.cur1.execute("insert into test(i) values (5)")
      81          self.con1.commit()
      82          self.cur1.execute("delete from test")
      83          self.cur2.execute("select i from test")
      84          res = self.cur2.fetchall()
      85          self.assertEqual(len(res), 1)
      86  
      87      def test_replace_starts_transaction(self):
      88          self.cur1.execute("create table test(i)")
      89          self.cur1.execute("insert into test(i) values (5)")
      90          self.con1.commit()
      91          self.cur1.execute("replace into test(i) values (6)")
      92          self.cur2.execute("select i from test")
      93          res = self.cur2.fetchall()
      94          self.assertEqual(len(res), 1)
      95          self.assertEqual(res[0][0], 5)
      96  
      97      def test_toggle_auto_commit(self):
      98          self.cur1.execute("create table test(i)")
      99          self.cur1.execute("insert into test(i) values (5)")
     100          self.con1.isolation_level = None
     101          self.assertEqual(self.con1.isolation_level, None)
     102          self.cur2.execute("select i from test")
     103          res = self.cur2.fetchall()
     104          self.assertEqual(len(res), 1)
     105  
     106          self.con1.isolation_level = "DEFERRED"
     107          self.assertEqual(self.con1.isolation_level , "DEFERRED")
     108          self.cur1.execute("insert into test(i) values (5)")
     109          self.cur2.execute("select i from test")
     110          res = self.cur2.fetchall()
     111          self.assertEqual(len(res), 1)
     112  
     113      def test_raise_timeout(self):
     114          self.cur1.execute("create table test(i)")
     115          self.cur1.execute("insert into test(i) values (5)")
     116          with self.assertRaises(sqlite.OperationalError):
     117              self.cur2.execute("insert into test(i) values (5)")
     118  
     119      def test_locking(self):
     120          # This tests the improved concurrency with pysqlite 2.3.4. You needed
     121          # to roll back con2 before you could commit con1.
     122          self.cur1.execute("create table test(i)")
     123          self.cur1.execute("insert into test(i) values (5)")
     124          with self.assertRaises(sqlite.OperationalError):
     125              self.cur2.execute("insert into test(i) values (5)")
     126          # NO self.con2.rollback() HERE!!!
     127          self.con1.commit()
     128  
     129      def test_rollback_cursor_consistency(self):
     130          """Check that cursors behave correctly after rollback."""
     131          con = sqlite.connect(":memory:")
     132          cur = con.cursor()
     133          cur.execute("create table test(x)")
     134          cur.execute("insert into test(x) values (5)")
     135          cur.execute("select 1 union select 2 union select 3")
     136  
     137          con.rollback()
     138          self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)])
     139  
     140      def test_multiple_cursors_and_iternext(self):
     141          # gh-94028: statements are cleared and reset in cursor iternext.
     142  
     143          # Provoke the gh-94028 by using a cursor cache.
     144          CURSORS = {}
     145          def sql(cx, sql, *args):
     146              cu = cx.cursor()
     147              cu.execute(sql, args)
     148              CURSORS[id(sql)] = cu
     149              return cu
     150  
     151          self.con1.execute("create table t(t)")
     152          sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3")
     153          self.con1.commit()
     154  
     155          # On second connection, verify rows are visible, then delete them.
     156          count = sql(self.con2, "select count(*) from t").fetchone()[0]
     157          self.assertEqual(count, 3)
     158          changes = sql(self.con2, "delete from t").rowcount
     159          self.assertEqual(changes, 3)
     160          self.con2.commit()
     161  
     162          # Back in original connection, create 2 new users.
     163          sql(self.con1, "insert into t values (?)", "u4")
     164          sql(self.con1, "insert into t values (?)", "u5")
     165  
     166          # The second connection cannot see uncommitted changes.
     167          count = sql(self.con2, "select count(*) from t").fetchone()[0]
     168          self.assertEqual(count, 0)
     169  
     170          # First connection can see its own changes.
     171          count = sql(self.con1, "select count(*) from t").fetchone()[0]
     172          self.assertEqual(count, 2)
     173  
     174          # The second connection can now see the changes.
     175          self.con1.commit()
     176          count = sql(self.con2, "select count(*) from t").fetchone()[0]
     177          self.assertEqual(count, 2)
     178  
     179  
     180  class ESC[4;38;5;81mRollbackTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     181      """bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues"""
     182  
     183      def setUp(self):
     184          self.con = sqlite.connect(":memory:")
     185          self.cur1 = self.con.cursor()
     186          self.cur2 = self.con.cursor()
     187          with self.con:
     188              self.con.execute("create table t(c)");
     189              self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)])
     190          self.cur1.execute("begin transaction")
     191          select = "select c from t"
     192          self.cur1.execute(select)
     193          self.con.rollback()
     194          self.res = self.cur2.execute(select)  # Reusing stmt from cache
     195  
     196      def tearDown(self):
     197          self.con.close()
     198  
     199      def _check_rows(self):
     200          for i, row in enumerate(self.res):
     201              self.assertEqual(row[0], i)
     202  
     203      def test_no_duplicate_rows_after_rollback_del_cursor(self):
     204          del self.cur1
     205          self._check_rows()
     206  
     207      def test_no_duplicate_rows_after_rollback_close_cursor(self):
     208          self.cur1.close()
     209          self._check_rows()
     210  
     211      def test_no_duplicate_rows_after_rollback_new_query(self):
     212          self.cur1.execute("select c from t where c = 1")
     213          self._check_rows()
     214  
     215  
     216  
     217  class ESC[4;38;5;81mSpecialCommandTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     218      def setUp(self):
     219          self.con = sqlite.connect(":memory:")
     220          self.cur = self.con.cursor()
     221  
     222      def test_drop_table(self):
     223          self.cur.execute("create table test(i)")
     224          self.cur.execute("insert into test(i) values (5)")
     225          self.cur.execute("drop table test")
     226  
     227      def test_pragma(self):
     228          self.cur.execute("create table test(i)")
     229          self.cur.execute("insert into test(i) values (5)")
     230          self.cur.execute("pragma count_changes=1")
     231  
     232      def tearDown(self):
     233          self.cur.close()
     234          self.con.close()
     235  
     236  
     237  class ESC[4;38;5;81mTransactionalDDL(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     238      def setUp(self):
     239          self.con = sqlite.connect(":memory:")
     240  
     241      def test_ddl_does_not_autostart_transaction(self):
     242          # For backwards compatibility reasons, DDL statements should not
     243          # implicitly start a transaction.
     244          self.con.execute("create table test(i)")
     245          self.con.rollback()
     246          result = self.con.execute("select * from test").fetchall()
     247          self.assertEqual(result, [])
     248  
     249      def test_immediate_transactional_ddl(self):
     250          # You can achieve transactional DDL by issuing a BEGIN
     251          # statement manually.
     252          self.con.execute("begin immediate")
     253          self.con.execute("create table test(i)")
     254          self.con.rollback()
     255          with self.assertRaises(sqlite.OperationalError):
     256              self.con.execute("select * from test")
     257  
     258      def test_transactional_ddl(self):
     259          # You can achieve transactional DDL by issuing a BEGIN
     260          # statement manually.
     261          self.con.execute("begin")
     262          self.con.execute("create table test(i)")
     263          self.con.rollback()
     264          with self.assertRaises(sqlite.OperationalError):
     265              self.con.execute("select * from test")
     266  
     267      def tearDown(self):
     268          self.con.close()
     269  
     270  
     271  class ESC[4;38;5;81mIsolationLevelFromInit(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     272      CREATE = "create table t(t)"
     273      INSERT = "insert into t values(1)"
     274  
     275      def setUp(self):
     276          self.traced = []
     277  
     278      def _run_test(self, cx):
     279          cx.execute(self.CREATE)
     280          cx.set_trace_callback(lambda stmt: self.traced.append(stmt))
     281          with cx:
     282              cx.execute(self.INSERT)
     283  
     284      def test_isolation_level_default(self):
     285          with memory_database() as cx:
     286              self._run_test(cx)
     287              self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])
     288  
     289      def test_isolation_level_begin(self):
     290          with memory_database(isolation_level="") as cx:
     291              self._run_test(cx)
     292              self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])
     293  
     294      def test_isolation_level_deferred(self):
     295          with memory_database(isolation_level="DEFERRED") as cx:
     296              self._run_test(cx)
     297              self.assertEqual(self.traced, ["BEGIN DEFERRED", self.INSERT, "COMMIT"])
     298  
     299      def test_isolation_level_immediate(self):
     300          with memory_database(isolation_level="IMMEDIATE") as cx:
     301              self._run_test(cx)
     302              self.assertEqual(self.traced,
     303                               ["BEGIN IMMEDIATE", self.INSERT, "COMMIT"])
     304  
     305      def test_isolation_level_exclusive(self):
     306          with memory_database(isolation_level="EXCLUSIVE") as cx:
     307              self._run_test(cx)
     308              self.assertEqual(self.traced,
     309                               ["BEGIN EXCLUSIVE", self.INSERT, "COMMIT"])
     310  
     311      def test_isolation_level_none(self):
     312          with memory_database(isolation_level=None) as cx:
     313              self._run_test(cx)
     314              self.assertEqual(self.traced, [self.INSERT])
     315  
     316  
     317  class ESC[4;38;5;81mIsolationLevelPostInit(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     318      QUERY = "insert into t values(1)"
     319  
     320      def setUp(self):
     321          self.cx = sqlite.connect(":memory:")
     322          self.cx.execute("create table t(t)")
     323          self.traced = []
     324          self.cx.set_trace_callback(lambda stmt: self.traced.append(stmt))
     325  
     326      def tearDown(self):
     327          self.cx.close()
     328  
     329      def test_isolation_level_default(self):
     330          with self.cx:
     331              self.cx.execute(self.QUERY)
     332          self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])
     333  
     334      def test_isolation_level_begin(self):
     335          self.cx.isolation_level = ""
     336          with self.cx:
     337              self.cx.execute(self.QUERY)
     338          self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])
     339  
     340      def test_isolation_level_deferrred(self):
     341          self.cx.isolation_level = "DEFERRED"
     342          with self.cx:
     343              self.cx.execute(self.QUERY)
     344          self.assertEqual(self.traced, ["BEGIN DEFERRED", self.QUERY, "COMMIT"])
     345  
     346      def test_isolation_level_immediate(self):
     347          self.cx.isolation_level = "IMMEDIATE"
     348          with self.cx:
     349              self.cx.execute(self.QUERY)
     350          self.assertEqual(self.traced,
     351                           ["BEGIN IMMEDIATE", self.QUERY, "COMMIT"])
     352  
     353      def test_isolation_level_exclusive(self):
     354          self.cx.isolation_level = "EXCLUSIVE"
     355          with self.cx:
     356              self.cx.execute(self.QUERY)
     357          self.assertEqual(self.traced,
     358                           ["BEGIN EXCLUSIVE", self.QUERY, "COMMIT"])
     359  
     360      def test_isolation_level_none(self):
     361          self.cx.isolation_level = None
     362          with self.cx:
     363              self.cx.execute(self.QUERY)
     364          self.assertEqual(self.traced, [self.QUERY])
     365  
     366  
     367  class ESC[4;38;5;81mAutocommitAttribute(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     368      """Test PEP 249-compliant autocommit behaviour."""
     369      legacy = sqlite.LEGACY_TRANSACTION_CONTROL
     370  
     371      @contextmanager
     372      def check_stmt_trace(self, cx, expected, reset=True):
     373          try:
     374              traced = []
     375              cx.set_trace_callback(lambda stmt: traced.append(stmt))
     376              yield
     377          finally:
     378              self.assertEqual(traced, expected)
     379              if reset:
     380                  cx.set_trace_callback(None)
     381  
     382      def test_autocommit_default(self):
     383          with memory_database() as cx:
     384              self.assertEqual(cx.autocommit,
     385                               sqlite.LEGACY_TRANSACTION_CONTROL)
     386  
     387      def test_autocommit_setget(self):
     388          dataset = (
     389              True,
     390              False,
     391              sqlite.LEGACY_TRANSACTION_CONTROL,
     392          )
     393          for mode in dataset:
     394              with self.subTest(mode=mode):
     395                  with memory_database(autocommit=mode) as cx:
     396                      self.assertEqual(cx.autocommit, mode)
     397                  with memory_database() as cx:
     398                      cx.autocommit = mode
     399                      self.assertEqual(cx.autocommit, mode)
     400  
     401      def test_autocommit_setget_invalid(self):
     402          msg = "autocommit must be True, False, or.*LEGACY"
     403          for mode in "a", 12, (), None:
     404              with self.subTest(mode=mode):
     405                  with self.assertRaisesRegex(ValueError, msg):
     406                      sqlite.connect(":memory:", autocommit=mode)
     407  
     408      def test_autocommit_disabled(self):
     409          expected = [
     410              "SELECT 1",
     411              "COMMIT",
     412              "BEGIN",
     413              "ROLLBACK",
     414              "BEGIN",
     415          ]
     416          with memory_database(autocommit=False) as cx:
     417              self.assertTrue(cx.in_transaction)
     418              with self.check_stmt_trace(cx, expected):
     419                  cx.execute("SELECT 1")
     420                  cx.commit()
     421                  cx.rollback()
     422  
     423      def test_autocommit_disabled_implicit_rollback(self):
     424          expected = ["ROLLBACK"]
     425          with memory_database(autocommit=False) as cx:
     426              self.assertTrue(cx.in_transaction)
     427              with self.check_stmt_trace(cx, expected, reset=False):
     428                  cx.close()
     429  
     430      def test_autocommit_enabled(self):
     431          expected = ["CREATE TABLE t(t)", "INSERT INTO t VALUES(1)"]
     432          with memory_database(autocommit=True) as cx:
     433              self.assertFalse(cx.in_transaction)
     434              with self.check_stmt_trace(cx, expected):
     435                  cx.execute("CREATE TABLE t(t)")
     436                  cx.execute("INSERT INTO t VALUES(1)")
     437                  self.assertFalse(cx.in_transaction)
     438  
     439      def test_autocommit_enabled_txn_ctl(self):
     440          for op in "commit", "rollback":
     441              with self.subTest(op=op):
     442                  with memory_database(autocommit=True) as cx:
     443                      meth = getattr(cx, op)
     444                      self.assertFalse(cx.in_transaction)
     445                      with self.check_stmt_trace(cx, []):
     446                          meth()  # expect this to pass silently
     447                          self.assertFalse(cx.in_transaction)
     448  
     449      def test_autocommit_disabled_then_enabled(self):
     450          expected = ["COMMIT"]
     451          with memory_database(autocommit=False) as cx:
     452              self.assertTrue(cx.in_transaction)
     453              with self.check_stmt_trace(cx, expected):
     454                  cx.autocommit = True  # should commit
     455                  self.assertFalse(cx.in_transaction)
     456  
     457      def test_autocommit_enabled_then_disabled(self):
     458          expected = ["BEGIN"]
     459          with memory_database(autocommit=True) as cx:
     460              self.assertFalse(cx.in_transaction)
     461              with self.check_stmt_trace(cx, expected):
     462                  cx.autocommit = False  # should begin
     463                  self.assertTrue(cx.in_transaction)
     464  
     465      def test_autocommit_explicit_then_disabled(self):
     466          expected = ["BEGIN DEFERRED"]
     467          with memory_database(autocommit=True) as cx:
     468              self.assertFalse(cx.in_transaction)
     469              with self.check_stmt_trace(cx, expected):
     470                  cx.execute("BEGIN DEFERRED")
     471                  cx.autocommit = False  # should now be a no-op
     472                  self.assertTrue(cx.in_transaction)
     473  
     474      def test_autocommit_enabled_ctx_mgr(self):
     475          with memory_database(autocommit=True) as cx:
     476              # The context manager is a no-op if autocommit=True
     477              with self.check_stmt_trace(cx, []):
     478                  with cx:
     479                      self.assertFalse(cx.in_transaction)
     480                  self.assertFalse(cx.in_transaction)
     481  
     482      def test_autocommit_disabled_ctx_mgr(self):
     483          expected = ["COMMIT", "BEGIN"]
     484          with memory_database(autocommit=False) as cx:
     485              with self.check_stmt_trace(cx, expected):
     486                  with cx:
     487                      self.assertTrue(cx.in_transaction)
     488                  self.assertTrue(cx.in_transaction)
     489  
     490      def test_autocommit_compat_ctx_mgr(self):
     491          expected = ["BEGIN ", "INSERT INTO T VALUES(1)", "COMMIT"]
     492          with memory_database(autocommit=self.legacy) as cx:
     493              cx.execute("create table t(t)")
     494              with self.check_stmt_trace(cx, expected):
     495                  with cx:
     496                      self.assertFalse(cx.in_transaction)
     497                      cx.execute("INSERT INTO T VALUES(1)")
     498                      self.assertTrue(cx.in_transaction)
     499                  self.assertFalse(cx.in_transaction)
     500  
     501      def test_autocommit_enabled_executescript(self):
     502          expected = ["BEGIN", "SELECT 1"]
     503          with memory_database(autocommit=True) as cx:
     504              with self.check_stmt_trace(cx, expected):
     505                  self.assertFalse(cx.in_transaction)
     506                  cx.execute("BEGIN")
     507                  cx.executescript("SELECT 1")
     508                  self.assertTrue(cx.in_transaction)
     509  
     510      def test_autocommit_disabled_executescript(self):
     511          expected = ["SELECT 1"]
     512          with memory_database(autocommit=False) as cx:
     513              with self.check_stmt_trace(cx, expected):
     514                  self.assertTrue(cx.in_transaction)
     515                  cx.executescript("SELECT 1")
     516                  self.assertTrue(cx.in_transaction)
     517  
     518      def test_autocommit_compat_executescript(self):
     519          expected = ["BEGIN", "COMMIT", "SELECT 1"]
     520          with memory_database(autocommit=self.legacy) as cx:
     521              with self.check_stmt_trace(cx, expected):
     522                  self.assertFalse(cx.in_transaction)
     523                  cx.execute("BEGIN")
     524                  cx.executescript("SELECT 1")
     525                  self.assertFalse(cx.in_transaction)
     526  
     527      def test_autocommit_disabled_implicit_shutdown(self):
     528          # The implicit ROLLBACK should not call back into Python during
     529          # interpreter tear-down.
     530          code = """if 1:
     531              import sqlite3
     532              cx = sqlite3.connect(":memory:", autocommit=False)
     533              cx.set_trace_callback(print)
     534          """
     535          assert_python_ok("-c", code, PYTHONIOENCODING="utf-8")
     536  
     537  
     538  if __name__ == "__main__":
     539      unittest.main()