(root)/
Python-3.11.7/
Lib/
test/
test_sqlite3/
test_dbapi.py
       1  # pysqlite2/test/dbapi.py: tests for DB-API compliance
       2  #
       3  # Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
       4  #
       5  # This file is part of pysqlite.
       6  #
       7  # This software is provided 'as-is', without any express or implied
       8  # warranty.  In no event will the authors be held liable for any damages
       9  # arising from the use of this software.
      10  #
      11  # Permission is granted to anyone to use this software for any purpose,
      12  # including commercial applications, and to alter it and redistribute it
      13  # freely, subject to the following restrictions:
      14  #
      15  # 1. The origin of this software must not be misrepresented; you must not
      16  #    claim that you wrote the original software. If you use this software
      17  #    in a product, an acknowledgment in the product documentation would be
      18  #    appreciated but is not required.
      19  # 2. Altered source versions must be plainly marked as such, and must not be
      20  #    misrepresented as being the original software.
      21  # 3. This notice may not be removed or altered from any source distribution.
      22  
      23  import contextlib
      24  import os
      25  import sqlite3 as sqlite
      26  import subprocess
      27  import sys
      28  import threading
      29  import unittest
      30  import urllib.parse
      31  
      32  from test.support import (
      33      SHORT_TIMEOUT, bigmemtest, check_disallow_instantiation, requires_subprocess,
      34      is_emscripten, is_wasi
      35  )
      36  from test.support import threading_helper
      37  from _testcapi import INT_MAX, ULLONG_MAX
      38  from os import SEEK_SET, SEEK_CUR, SEEK_END
      39  from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath
      40  
      41  
      42  # Helper for temporary memory databases
      43  def memory_database(*args, **kwargs):
      44      cx = sqlite.connect(":memory:", *args, **kwargs)
      45      return contextlib.closing(cx)
      46  
      47  
      48  # Temporarily limit a database connection parameter
      49  @contextlib.contextmanager
      50  def cx_limit(cx, category=sqlite.SQLITE_LIMIT_SQL_LENGTH, limit=128):
      51      try:
      52          _prev = cx.setlimit(category, limit)
      53          yield limit
      54      finally:
      55          cx.setlimit(category, _prev)
      56  
      57  
      58  class ESC[4;38;5;81mModuleTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      59      def test_api_level(self):
      60          self.assertEqual(sqlite.apilevel, "2.0",
      61                           "apilevel is %s, should be 2.0" % sqlite.apilevel)
      62  
      63      def test_thread_safety(self):
      64          self.assertIn(sqlite.threadsafety, {0, 1, 3},
      65                        "threadsafety is %d, should be 0, 1 or 3" %
      66                        sqlite.threadsafety)
      67  
      68      def test_param_style(self):
      69          self.assertEqual(sqlite.paramstyle, "qmark",
      70                           "paramstyle is '%s', should be 'qmark'" %
      71                           sqlite.paramstyle)
      72  
      73      def test_warning(self):
      74          self.assertTrue(issubclass(sqlite.Warning, Exception),
      75                       "Warning is not a subclass of Exception")
      76  
      77      def test_error(self):
      78          self.assertTrue(issubclass(sqlite.Error, Exception),
      79                          "Error is not a subclass of Exception")
      80  
      81      def test_interface_error(self):
      82          self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
      83                          "InterfaceError is not a subclass of Error")
      84  
      85      def test_database_error(self):
      86          self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
      87                          "DatabaseError is not a subclass of Error")
      88  
      89      def test_data_error(self):
      90          self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
      91                          "DataError is not a subclass of DatabaseError")
      92  
      93      def test_operational_error(self):
      94          self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
      95                          "OperationalError is not a subclass of DatabaseError")
      96  
      97      def test_integrity_error(self):
      98          self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
      99                          "IntegrityError is not a subclass of DatabaseError")
     100  
     101      def test_internal_error(self):
     102          self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
     103                          "InternalError is not a subclass of DatabaseError")
     104  
     105      def test_programming_error(self):
     106          self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
     107                          "ProgrammingError is not a subclass of DatabaseError")
     108  
     109      def test_not_supported_error(self):
     110          self.assertTrue(issubclass(sqlite.NotSupportedError,
     111                                     sqlite.DatabaseError),
     112                          "NotSupportedError is not a subclass of DatabaseError")
     113  
     114      def test_module_constants(self):
     115          consts = [
     116              "SQLITE_ABORT",
     117              "SQLITE_ALTER_TABLE",
     118              "SQLITE_ANALYZE",
     119              "SQLITE_ATTACH",
     120              "SQLITE_AUTH",
     121              "SQLITE_BUSY",
     122              "SQLITE_CANTOPEN",
     123              "SQLITE_CONSTRAINT",
     124              "SQLITE_CORRUPT",
     125              "SQLITE_CREATE_INDEX",
     126              "SQLITE_CREATE_TABLE",
     127              "SQLITE_CREATE_TEMP_INDEX",
     128              "SQLITE_CREATE_TEMP_TABLE",
     129              "SQLITE_CREATE_TEMP_TRIGGER",
     130              "SQLITE_CREATE_TEMP_VIEW",
     131              "SQLITE_CREATE_TRIGGER",
     132              "SQLITE_CREATE_VIEW",
     133              "SQLITE_CREATE_VTABLE",
     134              "SQLITE_DELETE",
     135              "SQLITE_DENY",
     136              "SQLITE_DETACH",
     137              "SQLITE_DONE",
     138              "SQLITE_DROP_INDEX",
     139              "SQLITE_DROP_TABLE",
     140              "SQLITE_DROP_TEMP_INDEX",
     141              "SQLITE_DROP_TEMP_TABLE",
     142              "SQLITE_DROP_TEMP_TRIGGER",
     143              "SQLITE_DROP_TEMP_VIEW",
     144              "SQLITE_DROP_TRIGGER",
     145              "SQLITE_DROP_VIEW",
     146              "SQLITE_DROP_VTABLE",
     147              "SQLITE_EMPTY",
     148              "SQLITE_ERROR",
     149              "SQLITE_FORMAT",
     150              "SQLITE_FULL",
     151              "SQLITE_FUNCTION",
     152              "SQLITE_IGNORE",
     153              "SQLITE_INSERT",
     154              "SQLITE_INTERNAL",
     155              "SQLITE_INTERRUPT",
     156              "SQLITE_IOERR",
     157              "SQLITE_LOCKED",
     158              "SQLITE_MISMATCH",
     159              "SQLITE_MISUSE",
     160              "SQLITE_NOLFS",
     161              "SQLITE_NOMEM",
     162              "SQLITE_NOTADB",
     163              "SQLITE_NOTFOUND",
     164              "SQLITE_OK",
     165              "SQLITE_PERM",
     166              "SQLITE_PRAGMA",
     167              "SQLITE_PROTOCOL",
     168              "SQLITE_RANGE",
     169              "SQLITE_READ",
     170              "SQLITE_READONLY",
     171              "SQLITE_REINDEX",
     172              "SQLITE_ROW",
     173              "SQLITE_SAVEPOINT",
     174              "SQLITE_SCHEMA",
     175              "SQLITE_SELECT",
     176              "SQLITE_TOOBIG",
     177              "SQLITE_TRANSACTION",
     178              "SQLITE_UPDATE",
     179              # Run-time limit categories
     180              "SQLITE_LIMIT_LENGTH",
     181              "SQLITE_LIMIT_SQL_LENGTH",
     182              "SQLITE_LIMIT_COLUMN",
     183              "SQLITE_LIMIT_EXPR_DEPTH",
     184              "SQLITE_LIMIT_COMPOUND_SELECT",
     185              "SQLITE_LIMIT_VDBE_OP",
     186              "SQLITE_LIMIT_FUNCTION_ARG",
     187              "SQLITE_LIMIT_ATTACHED",
     188              "SQLITE_LIMIT_LIKE_PATTERN_LENGTH",
     189              "SQLITE_LIMIT_VARIABLE_NUMBER",
     190              "SQLITE_LIMIT_TRIGGER_DEPTH",
     191          ]
     192          if sqlite.sqlite_version_info >= (3, 7, 17):
     193              consts += ["SQLITE_NOTICE", "SQLITE_WARNING"]
     194          if sqlite.sqlite_version_info >= (3, 8, 3):
     195              consts.append("SQLITE_RECURSIVE")
     196          if sqlite.sqlite_version_info >= (3, 8, 7):
     197              consts.append("SQLITE_LIMIT_WORKER_THREADS")
     198          consts += ["PARSE_DECLTYPES", "PARSE_COLNAMES"]
     199          # Extended result codes
     200          consts += [
     201              "SQLITE_ABORT_ROLLBACK",
     202              "SQLITE_BUSY_RECOVERY",
     203              "SQLITE_CANTOPEN_FULLPATH",
     204              "SQLITE_CANTOPEN_ISDIR",
     205              "SQLITE_CANTOPEN_NOTEMPDIR",
     206              "SQLITE_CORRUPT_VTAB",
     207              "SQLITE_IOERR_ACCESS",
     208              "SQLITE_IOERR_BLOCKED",
     209              "SQLITE_IOERR_CHECKRESERVEDLOCK",
     210              "SQLITE_IOERR_CLOSE",
     211              "SQLITE_IOERR_DELETE",
     212              "SQLITE_IOERR_DELETE_NOENT",
     213              "SQLITE_IOERR_DIR_CLOSE",
     214              "SQLITE_IOERR_DIR_FSYNC",
     215              "SQLITE_IOERR_FSTAT",
     216              "SQLITE_IOERR_FSYNC",
     217              "SQLITE_IOERR_LOCK",
     218              "SQLITE_IOERR_NOMEM",
     219              "SQLITE_IOERR_RDLOCK",
     220              "SQLITE_IOERR_READ",
     221              "SQLITE_IOERR_SEEK",
     222              "SQLITE_IOERR_SHMLOCK",
     223              "SQLITE_IOERR_SHMMAP",
     224              "SQLITE_IOERR_SHMOPEN",
     225              "SQLITE_IOERR_SHMSIZE",
     226              "SQLITE_IOERR_SHORT_READ",
     227              "SQLITE_IOERR_TRUNCATE",
     228              "SQLITE_IOERR_UNLOCK",
     229              "SQLITE_IOERR_WRITE",
     230              "SQLITE_LOCKED_SHAREDCACHE",
     231              "SQLITE_READONLY_CANTLOCK",
     232              "SQLITE_READONLY_RECOVERY",
     233          ]
     234          if sqlite.sqlite_version_info >= (3, 7, 16):
     235              consts += [
     236                  "SQLITE_CONSTRAINT_CHECK",
     237                  "SQLITE_CONSTRAINT_COMMITHOOK",
     238                  "SQLITE_CONSTRAINT_FOREIGNKEY",
     239                  "SQLITE_CONSTRAINT_FUNCTION",
     240                  "SQLITE_CONSTRAINT_NOTNULL",
     241                  "SQLITE_CONSTRAINT_PRIMARYKEY",
     242                  "SQLITE_CONSTRAINT_TRIGGER",
     243                  "SQLITE_CONSTRAINT_UNIQUE",
     244                  "SQLITE_CONSTRAINT_VTAB",
     245                  "SQLITE_READONLY_ROLLBACK",
     246              ]
     247          if sqlite.sqlite_version_info >= (3, 7, 17):
     248              consts += [
     249                  "SQLITE_IOERR_MMAP",
     250                  "SQLITE_NOTICE_RECOVER_ROLLBACK",
     251                  "SQLITE_NOTICE_RECOVER_WAL",
     252              ]
     253          if sqlite.sqlite_version_info >= (3, 8, 0):
     254              consts += [
     255                  "SQLITE_BUSY_SNAPSHOT",
     256                  "SQLITE_IOERR_GETTEMPPATH",
     257                  "SQLITE_WARNING_AUTOINDEX",
     258              ]
     259          if sqlite.sqlite_version_info >= (3, 8, 1):
     260              consts += ["SQLITE_CANTOPEN_CONVPATH", "SQLITE_IOERR_CONVPATH"]
     261          if sqlite.sqlite_version_info >= (3, 8, 2):
     262              consts.append("SQLITE_CONSTRAINT_ROWID")
     263          if sqlite.sqlite_version_info >= (3, 8, 3):
     264              consts.append("SQLITE_READONLY_DBMOVED")
     265          if sqlite.sqlite_version_info >= (3, 8, 7):
     266              consts.append("SQLITE_AUTH_USER")
     267          if sqlite.sqlite_version_info >= (3, 9, 0):
     268              consts.append("SQLITE_IOERR_VNODE")
     269          if sqlite.sqlite_version_info >= (3, 10, 0):
     270              consts.append("SQLITE_IOERR_AUTH")
     271          if sqlite.sqlite_version_info >= (3, 14, 1):
     272              consts.append("SQLITE_OK_LOAD_PERMANENTLY")
     273          if sqlite.sqlite_version_info >= (3, 21, 0):
     274              consts += [
     275                  "SQLITE_IOERR_BEGIN_ATOMIC",
     276                  "SQLITE_IOERR_COMMIT_ATOMIC",
     277                  "SQLITE_IOERR_ROLLBACK_ATOMIC",
     278              ]
     279          if sqlite.sqlite_version_info >= (3, 22, 0):
     280              consts += [
     281                  "SQLITE_ERROR_MISSING_COLLSEQ",
     282                  "SQLITE_ERROR_RETRY",
     283                  "SQLITE_READONLY_CANTINIT",
     284                  "SQLITE_READONLY_DIRECTORY",
     285              ]
     286          if sqlite.sqlite_version_info >= (3, 24, 0):
     287              consts += ["SQLITE_CORRUPT_SEQUENCE", "SQLITE_LOCKED_VTAB"]
     288          if sqlite.sqlite_version_info >= (3, 25, 0):
     289              consts += ["SQLITE_CANTOPEN_DIRTYWAL", "SQLITE_ERROR_SNAPSHOT"]
     290          if sqlite.sqlite_version_info >= (3, 31, 0):
     291              consts += [
     292                  "SQLITE_CANTOPEN_SYMLINK",
     293                  "SQLITE_CONSTRAINT_PINNED",
     294                  "SQLITE_OK_SYMLINK",
     295              ]
     296          if sqlite.sqlite_version_info >= (3, 32, 0):
     297              consts += [
     298                  "SQLITE_BUSY_TIMEOUT",
     299                  "SQLITE_CORRUPT_INDEX",
     300                  "SQLITE_IOERR_DATA",
     301              ]
     302          if sqlite.sqlite_version_info >= (3, 34, 0):
     303              consts.append("SQLITE_IOERR_CORRUPTFS")
     304          for const in consts:
     305              with self.subTest(const=const):
     306                  self.assertTrue(hasattr(sqlite, const))
     307  
     308      def test_error_code_on_exception(self):
     309          err_msg = "unable to open database file"
     310          if sys.platform.startswith("win"):
     311              err_code = sqlite.SQLITE_CANTOPEN_ISDIR
     312          else:
     313              err_code = sqlite.SQLITE_CANTOPEN
     314  
     315          with temp_dir() as db:
     316              with self.assertRaisesRegex(sqlite.Error, err_msg) as cm:
     317                  sqlite.connect(db)
     318              e = cm.exception
     319              self.assertEqual(e.sqlite_errorcode, err_code)
     320              self.assertTrue(e.sqlite_errorname.startswith("SQLITE_CANTOPEN"))
     321  
     322      @unittest.skipIf(sqlite.sqlite_version_info <= (3, 7, 16),
     323                       "Requires SQLite 3.7.16 or newer")
     324      def test_extended_error_code_on_exception(self):
     325          with memory_database() as con:
     326              with con:
     327                  con.execute("create table t(t integer check(t > 0))")
     328              errmsg = "constraint failed"
     329              with self.assertRaisesRegex(sqlite.IntegrityError, errmsg) as cm:
     330                  con.execute("insert into t values(-1)")
     331              exc = cm.exception
     332              self.assertEqual(exc.sqlite_errorcode,
     333                               sqlite.SQLITE_CONSTRAINT_CHECK)
     334              self.assertEqual(exc.sqlite_errorname, "SQLITE_CONSTRAINT_CHECK")
     335  
     336      # sqlite3_enable_shared_cache() is deprecated on macOS and calling it may raise
     337      # OperationalError on some buildbots.
     338      @unittest.skipIf(sys.platform == "darwin", "shared cache is deprecated on macOS")
     339      def test_shared_cache_deprecated(self):
     340          for enable in (True, False):
     341              with self.assertWarns(DeprecationWarning) as cm:
     342                  sqlite.enable_shared_cache(enable)
     343              self.assertIn("dbapi.py", cm.filename)
     344  
     345      def test_disallow_instantiation(self):
     346          cx = sqlite.connect(":memory:")
     347          check_disallow_instantiation(self, type(cx("select 1")))
     348          check_disallow_instantiation(self, sqlite.Blob)
     349  
     350      def test_complete_statement(self):
     351          self.assertFalse(sqlite.complete_statement("select t"))
     352          self.assertTrue(sqlite.complete_statement("create table t(t);"))
     353  
     354  
     355  class ESC[4;38;5;81mConnectionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     356  
     357      def setUp(self):
     358          self.cx = sqlite.connect(":memory:")
     359          cu = self.cx.cursor()
     360          cu.execute("create table test(id integer primary key, name text)")
     361          cu.execute("insert into test(name) values (?)", ("foo",))
     362  
     363      def tearDown(self):
     364          self.cx.close()
     365  
     366      def test_commit(self):
     367          self.cx.commit()
     368  
     369      def test_commit_after_no_changes(self):
     370          """
     371          A commit should also work when no changes were made to the database.
     372          """
     373          self.cx.commit()
     374          self.cx.commit()
     375  
     376      def test_rollback(self):
     377          self.cx.rollback()
     378  
     379      def test_rollback_after_no_changes(self):
     380          """
     381          A rollback should also work when no changes were made to the database.
     382          """
     383          self.cx.rollback()
     384          self.cx.rollback()
     385  
     386      def test_cursor(self):
     387          cu = self.cx.cursor()
     388  
     389      def test_failed_open(self):
     390          YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
     391          with self.assertRaises(sqlite.OperationalError):
     392              sqlite.connect(YOU_CANNOT_OPEN_THIS)
     393  
     394      def test_close(self):
     395          self.cx.close()
     396  
     397      def test_use_after_close(self):
     398          sql = "select 1"
     399          cu = self.cx.cursor()
     400          res = cu.execute(sql)
     401          self.cx.close()
     402          self.assertRaises(sqlite.ProgrammingError, res.fetchall)
     403          self.assertRaises(sqlite.ProgrammingError, cu.execute, sql)
     404          self.assertRaises(sqlite.ProgrammingError, cu.executemany, sql, [])
     405          self.assertRaises(sqlite.ProgrammingError, cu.executescript, sql)
     406          self.assertRaises(sqlite.ProgrammingError, self.cx.execute, sql)
     407          self.assertRaises(sqlite.ProgrammingError,
     408                            self.cx.executemany, sql, [])
     409          self.assertRaises(sqlite.ProgrammingError, self.cx.executescript, sql)
     410          self.assertRaises(sqlite.ProgrammingError,
     411                            self.cx.create_function, "t", 1, lambda x: x)
     412          self.assertRaises(sqlite.ProgrammingError, self.cx.cursor)
     413          with self.assertRaises(sqlite.ProgrammingError):
     414              with self.cx:
     415                  pass
     416  
     417      def test_exceptions(self):
     418          # Optional DB-API extension.
     419          self.assertEqual(self.cx.Warning, sqlite.Warning)
     420          self.assertEqual(self.cx.Error, sqlite.Error)
     421          self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
     422          self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
     423          self.assertEqual(self.cx.DataError, sqlite.DataError)
     424          self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
     425          self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
     426          self.assertEqual(self.cx.InternalError, sqlite.InternalError)
     427          self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
     428          self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
     429  
     430      def test_in_transaction(self):
     431          # Can't use db from setUp because we want to test initial state.
     432          cx = sqlite.connect(":memory:")
     433          cu = cx.cursor()
     434          self.assertEqual(cx.in_transaction, False)
     435          cu.execute("create table transactiontest(id integer primary key, name text)")
     436          self.assertEqual(cx.in_transaction, False)
     437          cu.execute("insert into transactiontest(name) values (?)", ("foo",))
     438          self.assertEqual(cx.in_transaction, True)
     439          cu.execute("select name from transactiontest where name=?", ["foo"])
     440          row = cu.fetchone()
     441          self.assertEqual(cx.in_transaction, True)
     442          cx.commit()
     443          self.assertEqual(cx.in_transaction, False)
     444          cu.execute("select name from transactiontest where name=?", ["foo"])
     445          row = cu.fetchone()
     446          self.assertEqual(cx.in_transaction, False)
     447  
     448      def test_in_transaction_ro(self):
     449          with self.assertRaises(AttributeError):
     450              self.cx.in_transaction = True
     451  
     452      def test_connection_exceptions(self):
     453          exceptions = [
     454              "DataError",
     455              "DatabaseError",
     456              "Error",
     457              "IntegrityError",
     458              "InterfaceError",
     459              "NotSupportedError",
     460              "OperationalError",
     461              "ProgrammingError",
     462              "Warning",
     463          ]
     464          for exc in exceptions:
     465              with self.subTest(exc=exc):
     466                  self.assertTrue(hasattr(self.cx, exc))
     467                  self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc))
     468  
     469      def test_interrupt_on_closed_db(self):
     470          cx = sqlite.connect(":memory:")
     471          cx.close()
     472          with self.assertRaises(sqlite.ProgrammingError):
     473              cx.interrupt()
     474  
     475      def test_interrupt(self):
     476          self.assertIsNone(self.cx.interrupt())
     477  
     478      def test_drop_unused_refs(self):
     479          for n in range(500):
     480              cu = self.cx.execute(f"select {n}")
     481              self.assertEqual(cu.fetchone()[0], n)
     482  
     483      def test_connection_limits(self):
     484          category = sqlite.SQLITE_LIMIT_SQL_LENGTH
     485          saved_limit = self.cx.getlimit(category)
     486          try:
     487              new_limit = 10
     488              prev_limit = self.cx.setlimit(category, new_limit)
     489              self.assertEqual(saved_limit, prev_limit)
     490              self.assertEqual(self.cx.getlimit(category), new_limit)
     491              msg = "query string is too large"
     492              self.assertRaisesRegex(sqlite.DataError, msg,
     493                                     self.cx.execute, "select 1 as '16'")
     494          finally:  # restore saved limit
     495              self.cx.setlimit(category, saved_limit)
     496  
     497      def test_connection_bad_limit_category(self):
     498          msg = "'category' is out of bounds"
     499          cat = 1111
     500          self.assertRaisesRegex(sqlite.ProgrammingError, msg,
     501                                 self.cx.getlimit, cat)
     502          self.assertRaisesRegex(sqlite.ProgrammingError, msg,
     503                                 self.cx.setlimit, cat, 0)
     504  
     505      def test_connection_init_bad_isolation_level(self):
     506          msg = (
     507              "isolation_level string must be '', 'DEFERRED', 'IMMEDIATE', or "
     508              "'EXCLUSIVE'"
     509          )
     510          levels = (
     511              "BOGUS",
     512              " ",
     513              "DEFERRE",
     514              "IMMEDIAT",
     515              "EXCLUSIV",
     516              "DEFERREDS",
     517              "IMMEDIATES",
     518              "EXCLUSIVES",
     519          )
     520          for level in levels:
     521              with self.subTest(level=level):
     522                  with self.assertRaisesRegex(ValueError, msg):
     523                      memory_database(isolation_level=level)
     524                  with memory_database() as cx:
     525                      with self.assertRaisesRegex(ValueError, msg):
     526                          cx.isolation_level = level
     527                      # Check that the default level is not changed
     528                      self.assertEqual(cx.isolation_level, "")
     529  
     530      def test_connection_init_good_isolation_levels(self):
     531          for level in ("", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", None):
     532              with self.subTest(level=level):
     533                  with memory_database(isolation_level=level) as cx:
     534                      self.assertEqual(cx.isolation_level, level)
     535                  with memory_database() as cx:
     536                      self.assertEqual(cx.isolation_level, "")
     537                      cx.isolation_level = level
     538                      self.assertEqual(cx.isolation_level, level)
     539  
     540      def test_connection_reinit(self):
     541          db = ":memory:"
     542          cx = sqlite.connect(db)
     543          cx.text_factory = bytes
     544          cx.row_factory = sqlite.Row
     545          cu = cx.cursor()
     546          cu.execute("create table foo (bar)")
     547          cu.executemany("insert into foo (bar) values (?)",
     548                         ((str(v),) for v in range(4)))
     549          cu.execute("select bar from foo")
     550  
     551          rows = [r for r in cu.fetchmany(2)]
     552          self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
     553          self.assertEqual([r[0] for r in rows], [b"0", b"1"])
     554  
     555          cx.__init__(db)
     556          cx.execute("create table foo (bar)")
     557          cx.executemany("insert into foo (bar) values (?)",
     558                         ((v,) for v in ("a", "b", "c", "d")))
     559  
     560          # This uses the old database, old row factory, but new text factory
     561          rows = [r for r in cu.fetchall()]
     562          self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
     563          self.assertEqual([r[0] for r in rows], ["2", "3"])
     564  
     565      def test_connection_bad_reinit(self):
     566          cx = sqlite.connect(":memory:")
     567          with cx:
     568              cx.execute("create table t(t)")
     569          with temp_dir() as db:
     570              self.assertRaisesRegex(sqlite.OperationalError,
     571                                     "unable to open database file",
     572                                     cx.__init__, db)
     573              self.assertRaisesRegex(sqlite.ProgrammingError,
     574                                     "Base Connection.__init__ not called",
     575                                     cx.executemany, "insert into t values(?)",
     576                                     ((v,) for v in range(3)))
     577  
     578  
     579  class ESC[4;38;5;81mUninitialisedConnectionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     580      def setUp(self):
     581          self.cx = sqlite.Connection.__new__(sqlite.Connection)
     582  
     583      def test_uninit_operations(self):
     584          funcs = (
     585              lambda: self.cx.isolation_level,
     586              lambda: self.cx.total_changes,
     587              lambda: self.cx.in_transaction,
     588              lambda: self.cx.iterdump(),
     589              lambda: self.cx.cursor(),
     590              lambda: self.cx.close(),
     591          )
     592          for func in funcs:
     593              with self.subTest(func=func):
     594                  self.assertRaisesRegex(sqlite.ProgrammingError,
     595                                         "Base Connection.__init__ not called",
     596                                         func)
     597  
     598  
     599  @unittest.skipUnless(hasattr(sqlite.Connection, "serialize"),
     600                       "Needs SQLite serialize API")
     601  class ESC[4;38;5;81mSerializeTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     602      def test_serialize_deserialize(self):
     603          with memory_database() as cx:
     604              with cx:
     605                  cx.execute("create table t(t)")
     606              data = cx.serialize()
     607  
     608              # Remove test table, verify that it was removed.
     609              with cx:
     610                  cx.execute("drop table t")
     611              regex = "no such table"
     612              with self.assertRaisesRegex(sqlite.OperationalError, regex):
     613                  cx.execute("select t from t")
     614  
     615              # Deserialize and verify that test table is restored.
     616              cx.deserialize(data)
     617              cx.execute("select t from t")
     618  
     619      def test_deserialize_wrong_args(self):
     620          dataset = (
     621              (BufferError, memoryview(b"blob")[::2]),
     622              (TypeError, []),
     623              (TypeError, 1),
     624              (TypeError, None),
     625          )
     626          for exc, arg in dataset:
     627              with self.subTest(exc=exc, arg=arg):
     628                  with memory_database() as cx:
     629                      self.assertRaises(exc, cx.deserialize, arg)
     630  
     631      def test_deserialize_corrupt_database(self):
     632          with memory_database() as cx:
     633              regex = "file is not a database"
     634              with self.assertRaisesRegex(sqlite.DatabaseError, regex):
     635                  cx.deserialize(b"\0\1\3")
     636                  # SQLite does not generate an error until you try to query the
     637                  # deserialized database.
     638                  cx.execute("create table fail(f)")
     639  
     640      @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
     641      @bigmemtest(size=2**63, memuse=3, dry_run=False)
     642      def test_deserialize_too_much_data_64bit(self):
     643          with memory_database() as cx:
     644              with self.assertRaisesRegex(OverflowError, "'data' is too large"):
     645                  cx.deserialize(b"b" * size)
     646  
     647  
     648  class ESC[4;38;5;81mOpenTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     649      _sql = "create table test(id integer)"
     650  
     651      def test_open_with_path_like_object(self):
     652          """ Checks that we can successfully connect to a database using an object that
     653              is PathLike, i.e. has __fspath__(). """
     654          path = FakePath(TESTFN)
     655          self.addCleanup(unlink, path)
     656          self.assertFalse(os.path.exists(path))
     657          with contextlib.closing(sqlite.connect(path)) as cx:
     658              self.assertTrue(os.path.exists(path))
     659              cx.execute(self._sql)
     660  
     661      @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
     662      @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
     663      @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
     664      @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
     665      def test_open_with_undecodable_path(self):
     666          path = TESTFN_UNDECODABLE
     667          self.addCleanup(unlink, path)
     668          self.assertFalse(os.path.exists(path))
     669          with contextlib.closing(sqlite.connect(path)) as cx:
     670              self.assertTrue(os.path.exists(path))
     671              cx.execute(self._sql)
     672  
     673      def test_open_uri(self):
     674          path = TESTFN
     675          self.addCleanup(unlink, path)
     676          uri = "file:" + urllib.parse.quote(os.fsencode(path))
     677          self.assertFalse(os.path.exists(path))
     678          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     679              self.assertTrue(os.path.exists(path))
     680              cx.execute(self._sql)
     681  
     682      def test_open_unquoted_uri(self):
     683          path = TESTFN
     684          self.addCleanup(unlink, path)
     685          uri = "file:" + path
     686          self.assertFalse(os.path.exists(path))
     687          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     688              self.assertTrue(os.path.exists(path))
     689              cx.execute(self._sql)
     690  
     691      def test_open_uri_readonly(self):
     692          path = TESTFN
     693          self.addCleanup(unlink, path)
     694          uri = "file:" + urllib.parse.quote(os.fsencode(path)) + "?mode=ro"
     695          self.assertFalse(os.path.exists(path))
     696          # Cannot create new DB
     697          with self.assertRaises(sqlite.OperationalError):
     698              sqlite.connect(uri, uri=True)
     699          self.assertFalse(os.path.exists(path))
     700          sqlite.connect(path).close()
     701          self.assertTrue(os.path.exists(path))
     702          # Cannot modify new DB
     703          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     704              with self.assertRaises(sqlite.OperationalError):
     705                  cx.execute(self._sql)
     706  
     707      @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
     708      @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
     709      @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
     710      @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
     711      def test_open_undecodable_uri(self):
     712          path = TESTFN_UNDECODABLE
     713          self.addCleanup(unlink, path)
     714          uri = "file:" + urllib.parse.quote(path)
     715          self.assertFalse(os.path.exists(path))
     716          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     717              self.assertTrue(os.path.exists(path))
     718              cx.execute(self._sql)
     719  
     720      def test_factory_database_arg(self):
     721          def factory(database, *args, **kwargs):
     722              nonlocal database_arg
     723              database_arg = database
     724              return sqlite.Connection(":memory:", *args, **kwargs)
     725  
     726          for database in (TESTFN, os.fsencode(TESTFN),
     727                           FakePath(TESTFN), FakePath(os.fsencode(TESTFN))):
     728              database_arg = None
     729              sqlite.connect(database, factory=factory).close()
     730              self.assertEqual(database_arg, database)
     731  
     732      def test_database_keyword(self):
     733          with contextlib.closing(sqlite.connect(database=":memory:")) as cx:
     734              self.assertEqual(type(cx), sqlite.Connection)
     735  
     736  
     737  class ESC[4;38;5;81mCursorTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     738      def setUp(self):
     739          self.cx = sqlite.connect(":memory:")
     740          self.cu = self.cx.cursor()
     741          self.cu.execute(
     742              "create table test(id integer primary key, name text, "
     743              "income number, unique_test text unique)"
     744          )
     745          self.cu.execute("insert into test(name) values (?)", ("foo",))
     746  
     747      def tearDown(self):
     748          self.cu.close()
     749          self.cx.close()
     750  
     751      def test_execute_no_args(self):
     752          self.cu.execute("delete from test")
     753  
     754      def test_execute_illegal_sql(self):
     755          with self.assertRaises(sqlite.OperationalError):
     756              self.cu.execute("select asdf")
     757  
     758      def test_execute_multiple_statements(self):
     759          msg = "You can only execute one statement at a time"
     760          dataset = (
     761              "select 1; select 2",
     762              "select 1; // c++ comments are not allowed",
     763              "select 1; *not a comment",
     764              "select 1; -*not a comment",
     765              "select 1; /* */ a",
     766              "select 1; /**/a",
     767              "select 1; -",
     768              "select 1; /",
     769              "select 1; -\n- select 2",
     770              """select 1;
     771                 -- comment
     772                 select 2
     773              """,
     774          )
     775          for query in dataset:
     776              with self.subTest(query=query):
     777                  with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
     778                      self.cu.execute(query)
     779  
     780      def test_execute_with_appended_comments(self):
     781          dataset = (
     782              "select 1; -- foo bar",
     783              "select 1; --",
     784              "select 1; /*",  # Unclosed comments ending in \0 are skipped.
     785              """
     786              select 5+4;
     787  
     788              /*
     789              foo
     790              */
     791              """,
     792          )
     793          for query in dataset:
     794              with self.subTest(query=query):
     795                  self.cu.execute(query)
     796  
     797      def test_execute_wrong_sql_arg(self):
     798          with self.assertRaises(TypeError):
     799              self.cu.execute(42)
     800  
     801      def test_execute_arg_int(self):
     802          self.cu.execute("insert into test(id) values (?)", (42,))
     803  
     804      def test_execute_arg_float(self):
     805          self.cu.execute("insert into test(income) values (?)", (2500.32,))
     806  
     807      def test_execute_arg_string(self):
     808          self.cu.execute("insert into test(name) values (?)", ("Hugo",))
     809  
     810      def test_execute_arg_string_with_zero_byte(self):
     811          self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
     812  
     813          self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
     814          row = self.cu.fetchone()
     815          self.assertEqual(row[0], "Hu\x00go")
     816  
     817      def test_execute_non_iterable(self):
     818          with self.assertRaises(sqlite.ProgrammingError) as cm:
     819              self.cu.execute("insert into test(id) values (?)", 42)
     820          self.assertEqual(str(cm.exception), 'parameters are of unsupported type')
     821  
     822      def test_execute_wrong_no_of_args1(self):
     823          # too many parameters
     824          with self.assertRaises(sqlite.ProgrammingError):
     825              self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
     826  
     827      def test_execute_wrong_no_of_args2(self):
     828          # too little parameters
     829          with self.assertRaises(sqlite.ProgrammingError):
     830              self.cu.execute("insert into test(id) values (?)")
     831  
     832      def test_execute_wrong_no_of_args3(self):
     833          # no parameters, parameters are needed
     834          with self.assertRaises(sqlite.ProgrammingError):
     835              self.cu.execute("insert into test(id) values (?)")
     836  
     837      def test_execute_param_list(self):
     838          self.cu.execute("insert into test(name) values ('foo')")
     839          self.cu.execute("select name from test where name=?", ["foo"])
     840          row = self.cu.fetchone()
     841          self.assertEqual(row[0], "foo")
     842  
     843      def test_execute_param_sequence(self):
     844          class ESC[4;38;5;81mL:
     845              def __len__(self):
     846                  return 1
     847              def __getitem__(self, x):
     848                  assert x == 0
     849                  return "foo"
     850  
     851          self.cu.execute("insert into test(name) values ('foo')")
     852          self.cu.execute("select name from test where name=?", L())
     853          row = self.cu.fetchone()
     854          self.assertEqual(row[0], "foo")
     855  
     856      def test_execute_param_sequence_bad_len(self):
     857          # Issue41662: Error in __len__() was overridden with ProgrammingError.
     858          class ESC[4;38;5;81mL:
     859              def __len__(self):
     860                  1/0
     861              def __getitem__(slf, x):
     862                  raise AssertionError
     863  
     864          self.cu.execute("insert into test(name) values ('foo')")
     865          with self.assertRaises(ZeroDivisionError):
     866              self.cu.execute("select name from test where name=?", L())
     867  
     868      def test_execute_too_many_params(self):
     869          category = sqlite.SQLITE_LIMIT_VARIABLE_NUMBER
     870          msg = "too many SQL variables"
     871          with cx_limit(self.cx, category=category, limit=1):
     872              self.cu.execute("select * from test where id=?", (1,))
     873              with self.assertRaisesRegex(sqlite.OperationalError, msg):
     874                  self.cu.execute("select * from test where id!=? and id!=?",
     875                                  (1, 2))
     876  
     877      def test_execute_dict_mapping(self):
     878          self.cu.execute("insert into test(name) values ('foo')")
     879          self.cu.execute("select name from test where name=:name", {"name": "foo"})
     880          row = self.cu.fetchone()
     881          self.assertEqual(row[0], "foo")
     882  
     883      def test_execute_dict_mapping_mapping(self):
     884          class ESC[4;38;5;81mD(ESC[4;38;5;149mdict):
     885              def __missing__(self, key):
     886                  return "foo"
     887  
     888          self.cu.execute("insert into test(name) values ('foo')")
     889          self.cu.execute("select name from test where name=:name", D())
     890          row = self.cu.fetchone()
     891          self.assertEqual(row[0], "foo")
     892  
     893      def test_execute_dict_mapping_too_little_args(self):
     894          self.cu.execute("insert into test(name) values ('foo')")
     895          with self.assertRaises(sqlite.ProgrammingError):
     896              self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
     897  
     898      def test_execute_dict_mapping_no_args(self):
     899          self.cu.execute("insert into test(name) values ('foo')")
     900          with self.assertRaises(sqlite.ProgrammingError):
     901              self.cu.execute("select name from test where name=:name")
     902  
     903      def test_execute_dict_mapping_unnamed(self):
     904          self.cu.execute("insert into test(name) values ('foo')")
     905          with self.assertRaises(sqlite.ProgrammingError):
     906              self.cu.execute("select name from test where name=?", {"name": "foo"})
     907  
     908      def test_close(self):
     909          self.cu.close()
     910  
     911      def test_rowcount_execute(self):
     912          self.cu.execute("delete from test")
     913          self.cu.execute("insert into test(name) values ('foo')")
     914          self.cu.execute("insert into test(name) values ('foo')")
     915          self.cu.execute("update test set name='bar'")
     916          self.assertEqual(self.cu.rowcount, 2)
     917  
     918      def test_rowcount_select(self):
     919          """
     920          pysqlite does not know the rowcount of SELECT statements, because we
     921          don't fetch all rows after executing the select statement. The rowcount
     922          has thus to be -1.
     923          """
     924          self.cu.execute("select 5 union select 6")
     925          self.assertEqual(self.cu.rowcount, -1)
     926  
     927      def test_rowcount_executemany(self):
     928          self.cu.execute("delete from test")
     929          self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
     930          self.assertEqual(self.cu.rowcount, 3)
     931  
     932      @unittest.skipIf(sqlite.sqlite_version_info < (3, 35, 0),
     933                       "Requires SQLite 3.35.0 or newer")
     934      def test_rowcount_update_returning(self):
     935          # gh-93421: rowcount is updated correctly for UPDATE...RETURNING queries
     936          self.cu.execute("update test set name='bar' where name='foo' returning 1")
     937          self.assertEqual(self.cu.fetchone()[0], 1)
     938          self.assertEqual(self.cu.rowcount, 1)
     939  
     940      def test_rowcount_prefixed_with_comment(self):
     941          # gh-79579: rowcount is updated even if query is prefixed with comments
     942          self.cu.execute("""
     943              -- foo
     944              insert into test(name) values ('foo'), ('foo')
     945          """)
     946          self.assertEqual(self.cu.rowcount, 2)
     947          self.cu.execute("""
     948              /* -- messy *r /* /* ** *- *--
     949              */
     950              /* one more */ insert into test(name) values ('messy')
     951          """)
     952          self.assertEqual(self.cu.rowcount, 1)
     953          self.cu.execute("/* bar */ update test set name='bar' where name='foo'")
     954          self.assertEqual(self.cu.rowcount, 3)
     955  
     956      def test_rowcount_vaccuum(self):
     957          data = ((1,), (2,), (3,))
     958          self.cu.executemany("insert into test(income) values(?)", data)
     959          self.assertEqual(self.cu.rowcount, 3)
     960          self.cx.commit()
     961          self.cu.execute("vacuum")
     962          self.assertEqual(self.cu.rowcount, -1)
     963  
     964      def test_total_changes(self):
     965          self.cu.execute("insert into test(name) values ('foo')")
     966          self.cu.execute("insert into test(name) values ('foo')")
     967          self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')
     968  
     969      # Checks for executemany:
     970      # Sequences are required by the DB-API, iterators
     971      # enhancements in pysqlite.
     972  
     973      def test_execute_many_sequence(self):
     974          self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
     975  
     976      def test_execute_many_iterator(self):
     977          class ESC[4;38;5;81mMyIter:
     978              def __init__(self):
     979                  self.value = 5
     980  
     981              def __iter__(self):
     982                  return self
     983  
     984              def __next__(self):
     985                  if self.value == 10:
     986                      raise StopIteration
     987                  else:
     988                      self.value += 1
     989                      return (self.value,)
     990  
     991          self.cu.executemany("insert into test(income) values (?)", MyIter())
     992  
     993      def test_execute_many_generator(self):
     994          def mygen():
     995              for i in range(5):
     996                  yield (i,)
     997  
     998          self.cu.executemany("insert into test(income) values (?)", mygen())
     999  
    1000      def test_execute_many_wrong_sql_arg(self):
    1001          with self.assertRaises(TypeError):
    1002              self.cu.executemany(42, [(3,)])
    1003  
    1004      def test_execute_many_select(self):
    1005          with self.assertRaises(sqlite.ProgrammingError):
    1006              self.cu.executemany("select ?", [(3,)])
    1007  
    1008      def test_execute_many_not_iterable(self):
    1009          with self.assertRaises(TypeError):
    1010              self.cu.executemany("insert into test(income) values (?)", 42)
    1011  
    1012      def test_fetch_iter(self):
    1013          # Optional DB-API extension.
    1014          self.cu.execute("delete from test")
    1015          self.cu.execute("insert into test(id) values (?)", (5,))
    1016          self.cu.execute("insert into test(id) values (?)", (6,))
    1017          self.cu.execute("select id from test order by id")
    1018          lst = []
    1019          for row in self.cu:
    1020              lst.append(row[0])
    1021          self.assertEqual(lst[0], 5)
    1022          self.assertEqual(lst[1], 6)
    1023  
    1024      def test_fetchone(self):
    1025          self.cu.execute("select name from test")
    1026          row = self.cu.fetchone()
    1027          self.assertEqual(row[0], "foo")
    1028          row = self.cu.fetchone()
    1029          self.assertEqual(row, None)
    1030  
    1031      def test_fetchone_no_statement(self):
    1032          cur = self.cx.cursor()
    1033          row = cur.fetchone()
    1034          self.assertEqual(row, None)
    1035  
    1036      def test_array_size(self):
    1037          # must default to 1
    1038          self.assertEqual(self.cu.arraysize, 1)
    1039  
    1040          # now set to 2
    1041          self.cu.arraysize = 2
    1042  
    1043          # now make the query return 3 rows
    1044          self.cu.execute("delete from test")
    1045          self.cu.execute("insert into test(name) values ('A')")
    1046          self.cu.execute("insert into test(name) values ('B')")
    1047          self.cu.execute("insert into test(name) values ('C')")
    1048          self.cu.execute("select name from test")
    1049          res = self.cu.fetchmany()
    1050  
    1051          self.assertEqual(len(res), 2)
    1052  
    1053      def test_fetchmany(self):
    1054          self.cu.execute("select name from test")
    1055          res = self.cu.fetchmany(100)
    1056          self.assertEqual(len(res), 1)
    1057          res = self.cu.fetchmany(100)
    1058          self.assertEqual(res, [])
    1059  
    1060      def test_fetchmany_kw_arg(self):
    1061          """Checks if fetchmany works with keyword arguments"""
    1062          self.cu.execute("select name from test")
    1063          res = self.cu.fetchmany(size=100)
    1064          self.assertEqual(len(res), 1)
    1065  
    1066      def test_fetchall(self):
    1067          self.cu.execute("select name from test")
    1068          res = self.cu.fetchall()
    1069          self.assertEqual(len(res), 1)
    1070          res = self.cu.fetchall()
    1071          self.assertEqual(res, [])
    1072  
    1073      def test_setinputsizes(self):
    1074          self.cu.setinputsizes([3, 4, 5])
    1075  
    1076      def test_setoutputsize(self):
    1077          self.cu.setoutputsize(5, 0)
    1078  
    1079      def test_setoutputsize_no_column(self):
    1080          self.cu.setoutputsize(42)
    1081  
    1082      def test_cursor_connection(self):
    1083          # Optional DB-API extension.
    1084          self.assertEqual(self.cu.connection, self.cx)
    1085  
    1086      def test_wrong_cursor_callable(self):
    1087          with self.assertRaises(TypeError):
    1088              def f(): pass
    1089              cur = self.cx.cursor(f)
    1090  
    1091      def test_cursor_wrong_class(self):
    1092          class ESC[4;38;5;81mFoo: pass
    1093          foo = Foo()
    1094          with self.assertRaises(TypeError):
    1095              cur = sqlite.Cursor(foo)
    1096  
    1097      def test_last_row_id_on_replace(self):
    1098          """
    1099          INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
    1100          """
    1101          sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
    1102          for statement in ('INSERT OR REPLACE', 'REPLACE'):
    1103              with self.subTest(statement=statement):
    1104                  self.cu.execute(sql.format(statement), (1, 'foo'))
    1105                  self.assertEqual(self.cu.lastrowid, 1)
    1106  
    1107      def test_last_row_id_on_ignore(self):
    1108          self.cu.execute(
    1109              "insert or ignore into test(unique_test) values (?)",
    1110              ('test',))
    1111          self.assertEqual(self.cu.lastrowid, 2)
    1112          self.cu.execute(
    1113              "insert or ignore into test(unique_test) values (?)",
    1114              ('test',))
    1115          self.assertEqual(self.cu.lastrowid, 2)
    1116  
    1117      def test_last_row_id_insert_o_r(self):
    1118          results = []
    1119          for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
    1120              sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
    1121              with self.subTest(statement='INSERT OR {}'.format(statement)):
    1122                  self.cu.execute(sql.format(statement), (statement,))
    1123                  results.append((statement, self.cu.lastrowid))
    1124                  with self.assertRaises(sqlite.IntegrityError):
    1125                      self.cu.execute(sql.format(statement), (statement,))
    1126                  results.append((statement, self.cu.lastrowid))
    1127          expected = [
    1128              ('FAIL', 2), ('FAIL', 2),
    1129              ('ABORT', 3), ('ABORT', 3),
    1130              ('ROLLBACK', 4), ('ROLLBACK', 4),
    1131          ]
    1132          self.assertEqual(results, expected)
    1133  
    1134      def test_column_count(self):
    1135          # Check that column count is updated correctly for cached statements
    1136          select = "select * from test"
    1137          res = self.cu.execute(select)
    1138          old_count = len(res.description)
    1139          # Add a new column and execute the cached select query again
    1140          self.cu.execute("alter table test add newcol")
    1141          res = self.cu.execute(select)
    1142          new_count = len(res.description)
    1143          self.assertEqual(new_count - old_count, 1)
    1144  
    1145      def test_same_query_in_multiple_cursors(self):
    1146          cursors = [self.cx.execute("select 1") for _ in range(3)]
    1147          for cu in cursors:
    1148              self.assertEqual(cu.fetchall(), [(1,)])
    1149  
    1150  
    1151  class ESC[4;38;5;81mBlobTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1152      def setUp(self):
    1153          self.cx = sqlite.connect(":memory:")
    1154          self.cx.execute("create table test(b blob)")
    1155          self.data = b"this blob data string is exactly fifty bytes long!"
    1156          self.cx.execute("insert into test(b) values (?)", (self.data,))
    1157          self.blob = self.cx.blobopen("test", "b", 1)
    1158  
    1159      def tearDown(self):
    1160          self.blob.close()
    1161          self.cx.close()
    1162  
    1163      def test_blob_is_a_blob(self):
    1164          self.assertIsInstance(self.blob, sqlite.Blob)
    1165  
    1166      def test_blob_seek_and_tell(self):
    1167          self.blob.seek(10)
    1168          self.assertEqual(self.blob.tell(), 10)
    1169  
    1170          self.blob.seek(10, SEEK_SET)
    1171          self.assertEqual(self.blob.tell(), 10)
    1172  
    1173          self.blob.seek(10, SEEK_CUR)
    1174          self.assertEqual(self.blob.tell(), 20)
    1175  
    1176          self.blob.seek(-10, SEEK_END)
    1177          self.assertEqual(self.blob.tell(), 40)
    1178  
    1179      def test_blob_seek_error(self):
    1180          msg_oor = "offset out of blob range"
    1181          msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END"
    1182          msg_of = "seek offset results in overflow"
    1183  
    1184          dataset = (
    1185              (ValueError, msg_oor, lambda: self.blob.seek(1000)),
    1186              (ValueError, msg_oor, lambda: self.blob.seek(-10)),
    1187              (ValueError, msg_orig, lambda: self.blob.seek(10, -1)),
    1188              (ValueError, msg_orig, lambda: self.blob.seek(10, 3)),
    1189          )
    1190          for exc, msg, fn in dataset:
    1191              with self.subTest(exc=exc, msg=msg, fn=fn):
    1192                  self.assertRaisesRegex(exc, msg, fn)
    1193  
    1194          # Force overflow errors
    1195          self.blob.seek(1, SEEK_SET)
    1196          with self.assertRaisesRegex(OverflowError, msg_of):
    1197              self.blob.seek(INT_MAX, SEEK_CUR)
    1198          with self.assertRaisesRegex(OverflowError, msg_of):
    1199              self.blob.seek(INT_MAX, SEEK_END)
    1200  
    1201      def test_blob_read(self):
    1202          buf = self.blob.read()
    1203          self.assertEqual(buf, self.data)
    1204  
    1205      def test_blob_read_oversized(self):
    1206          buf = self.blob.read(len(self.data) * 2)
    1207          self.assertEqual(buf, self.data)
    1208  
    1209      def test_blob_read_advance_offset(self):
    1210          n = 10
    1211          buf = self.blob.read(n)
    1212          self.assertEqual(buf, self.data[:n])
    1213          self.assertEqual(self.blob.tell(), n)
    1214  
    1215      def test_blob_read_at_offset(self):
    1216          self.blob.seek(10)
    1217          self.assertEqual(self.blob.read(10), self.data[10:20])
    1218  
    1219      def test_blob_read_error_row_changed(self):
    1220          self.cx.execute("update test set b='aaaa' where rowid=1")
    1221          with self.assertRaises(sqlite.OperationalError):
    1222              self.blob.read()
    1223  
    1224      def test_blob_write(self):
    1225          new_data = b"new data".ljust(50)
    1226          self.blob.write(new_data)
    1227          row = self.cx.execute("select b from test").fetchone()
    1228          self.assertEqual(row[0], new_data)
    1229  
    1230      def test_blob_write_at_offset(self):
    1231          new_data = b"c" * 25
    1232          self.blob.seek(25)
    1233          self.blob.write(new_data)
    1234          row = self.cx.execute("select b from test").fetchone()
    1235          self.assertEqual(row[0], self.data[:25] + new_data)
    1236  
    1237      def test_blob_write_advance_offset(self):
    1238          self.blob.write(b"d"*10)
    1239          self.assertEqual(self.blob.tell(), 10)
    1240  
    1241      def test_blob_write_error_length(self):
    1242          with self.assertRaisesRegex(ValueError, "data longer than blob"):
    1243              self.blob.write(b"a" * 1000)
    1244  
    1245          self.blob.seek(0, SEEK_SET)
    1246          n = len(self.blob)
    1247          self.blob.write(b"a" * (n-1))
    1248          self.blob.write(b"a")
    1249          with self.assertRaisesRegex(ValueError, "data longer than blob"):
    1250              self.blob.write(b"a")
    1251  
    1252      def test_blob_write_error_row_changed(self):
    1253          self.cx.execute("update test set b='aaaa' where rowid=1")
    1254          with self.assertRaises(sqlite.OperationalError):
    1255              self.blob.write(b"aaa")
    1256  
    1257      def test_blob_write_error_readonly(self):
    1258          ro_blob = self.cx.blobopen("test", "b", 1, readonly=True)
    1259          with self.assertRaisesRegex(sqlite.OperationalError, "readonly"):
    1260              ro_blob.write(b"aaa")
    1261          ro_blob.close()
    1262  
    1263      def test_blob_open_error(self):
    1264          dataset = (
    1265              (("test", "b", 1), {"name": "notexisting"}),
    1266              (("notexisting", "b", 1), {}),
    1267              (("test", "notexisting", 1), {}),
    1268              (("test", "b", 2), {}),
    1269          )
    1270          regex = "no such"
    1271          for args, kwds in dataset:
    1272              with self.subTest(args=args, kwds=kwds):
    1273                  with self.assertRaisesRegex(sqlite.OperationalError, regex):
    1274                      self.cx.blobopen(*args, **kwds)
    1275  
    1276      def test_blob_length(self):
    1277          self.assertEqual(len(self.blob), 50)
    1278  
    1279      def test_blob_get_item(self):
    1280          self.assertEqual(self.blob[5], ord("b"))
    1281          self.assertEqual(self.blob[6], ord("l"))
    1282          self.assertEqual(self.blob[7], ord("o"))
    1283          self.assertEqual(self.blob[8], ord("b"))
    1284          self.assertEqual(self.blob[-1], ord("!"))
    1285  
    1286      def test_blob_set_item(self):
    1287          self.blob[0] = ord("b")
    1288          expected = b"b" + self.data[1:]
    1289          actual = self.cx.execute("select b from test").fetchone()[0]
    1290          self.assertEqual(actual, expected)
    1291  
    1292      def test_blob_set_item_with_offset(self):
    1293          self.blob.seek(0, SEEK_END)
    1294          self.assertEqual(self.blob.read(), b"")  # verify that we're at EOB
    1295          self.blob[0] = ord("T")
    1296          self.blob[-1] = ord(".")
    1297          self.blob.seek(0, SEEK_SET)
    1298          expected = b"This blob data string is exactly fifty bytes long."
    1299          self.assertEqual(self.blob.read(), expected)
    1300  
    1301      def test_blob_set_slice_buffer_object(self):
    1302          from array import array
    1303          self.blob[0:5] = memoryview(b"12345")
    1304          self.assertEqual(self.blob[0:5], b"12345")
    1305  
    1306          self.blob[0:5] = bytearray(b"23456")
    1307          self.assertEqual(self.blob[0:5], b"23456")
    1308  
    1309          self.blob[0:5] = array("b", [1, 2, 3, 4, 5])
    1310          self.assertEqual(self.blob[0:5], b"\x01\x02\x03\x04\x05")
    1311  
    1312      def test_blob_set_item_negative_index(self):
    1313          self.blob[-1] = 255
    1314          self.assertEqual(self.blob[-1], 255)
    1315  
    1316      def test_blob_get_slice(self):
    1317          self.assertEqual(self.blob[5:14], b"blob data")
    1318  
    1319      def test_blob_get_empty_slice(self):
    1320          self.assertEqual(self.blob[5:5], b"")
    1321  
    1322      def test_blob_get_slice_negative_index(self):
    1323          self.assertEqual(self.blob[5:-5], self.data[5:-5])
    1324  
    1325      def test_blob_get_slice_with_skip(self):
    1326          self.assertEqual(self.blob[0:10:2], b"ti lb")
    1327  
    1328      def test_blob_set_slice(self):
    1329          self.blob[0:5] = b"12345"
    1330          expected = b"12345" + self.data[5:]
    1331          actual = self.cx.execute("select b from test").fetchone()[0]
    1332          self.assertEqual(actual, expected)
    1333  
    1334      def test_blob_set_empty_slice(self):
    1335          self.blob[0:0] = b""
    1336          self.assertEqual(self.blob[:], self.data)
    1337  
    1338      def test_blob_set_slice_with_skip(self):
    1339          self.blob[0:10:2] = b"12345"
    1340          actual = self.cx.execute("select b from test").fetchone()[0]
    1341          expected = b"1h2s3b4o5 " + self.data[10:]
    1342          self.assertEqual(actual, expected)
    1343  
    1344      def test_blob_mapping_invalid_index_type(self):
    1345          msg = "indices must be integers"
    1346          with self.assertRaisesRegex(TypeError, msg):
    1347              self.blob[5:5.5]
    1348          with self.assertRaisesRegex(TypeError, msg):
    1349              self.blob[1.5]
    1350          with self.assertRaisesRegex(TypeError, msg):
    1351              self.blob["a"] = b"b"
    1352  
    1353      def test_blob_get_item_error(self):
    1354          dataset = [len(self.blob), 105, -105]
    1355          for idx in dataset:
    1356              with self.subTest(idx=idx):
    1357                  with self.assertRaisesRegex(IndexError, "index out of range"):
    1358                      self.blob[idx]
    1359          with self.assertRaisesRegex(IndexError, "cannot fit 'int'"):
    1360              self.blob[ULLONG_MAX]
    1361  
    1362          # Provoke read error
    1363          self.cx.execute("update test set b='aaaa' where rowid=1")
    1364          with self.assertRaises(sqlite.OperationalError):
    1365              self.blob[0]
    1366  
    1367      def test_blob_set_item_error(self):
    1368          with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
    1369              self.blob[0] = b"multiple"
    1370          with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
    1371              self.blob[0] = b"1"
    1372          with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
    1373              self.blob[0] = bytearray(b"1")
    1374          with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
    1375              del self.blob[0]
    1376          with self.assertRaisesRegex(IndexError, "Blob index out of range"):
    1377              self.blob[1000] = 0
    1378          with self.assertRaisesRegex(ValueError, "must be in range"):
    1379              self.blob[0] = -1
    1380          with self.assertRaisesRegex(ValueError, "must be in range"):
    1381              self.blob[0] = 256
    1382          # Overflow errors are overridden with ValueError
    1383          with self.assertRaisesRegex(ValueError, "must be in range"):
    1384              self.blob[0] = 2**65
    1385  
    1386      def test_blob_set_slice_error(self):
    1387          with self.assertRaisesRegex(IndexError, "wrong size"):
    1388              self.blob[5:10] = b"a"
    1389          with self.assertRaisesRegex(IndexError, "wrong size"):
    1390              self.blob[5:10] = b"a" * 1000
    1391          with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
    1392              del self.blob[5:10]
    1393          with self.assertRaisesRegex(ValueError, "step cannot be zero"):
    1394              self.blob[5:10:0] = b"12345"
    1395          with self.assertRaises(BufferError):
    1396              self.blob[5:10] = memoryview(b"abcde")[::2]
    1397  
    1398      def test_blob_sequence_not_supported(self):
    1399          with self.assertRaisesRegex(TypeError, "unsupported operand"):
    1400              self.blob + self.blob
    1401          with self.assertRaisesRegex(TypeError, "unsupported operand"):
    1402              self.blob * 5
    1403          with self.assertRaisesRegex(TypeError, "is not iterable"):
    1404              b"a" in self.blob
    1405  
    1406      def test_blob_context_manager(self):
    1407          data = b"a" * 50
    1408          with self.cx.blobopen("test", "b", 1) as blob:
    1409              blob.write(data)
    1410          actual = self.cx.execute("select b from test").fetchone()[0]
    1411          self.assertEqual(actual, data)
    1412  
    1413          # Check that __exit__ closed the blob
    1414          with self.assertRaisesRegex(sqlite.ProgrammingError, "closed blob"):
    1415              blob.read()
    1416  
    1417      def test_blob_context_manager_reraise_exceptions(self):
    1418          class ESC[4;38;5;81mDummyException(ESC[4;38;5;149mException):
    1419              pass
    1420          with self.assertRaisesRegex(DummyException, "reraised"):
    1421              with self.cx.blobopen("test", "b", 1) as blob:
    1422                  raise DummyException("reraised")
    1423  
    1424  
    1425      def test_blob_closed(self):
    1426          with memory_database() as cx:
    1427              cx.execute("create table test(b blob)")
    1428              cx.execute("insert into test values (zeroblob(100))")
    1429              blob = cx.blobopen("test", "b", 1)
    1430              blob.close()
    1431  
    1432              msg = "Cannot operate on a closed blob"
    1433              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1434                  blob.read()
    1435              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1436                  blob.write(b"")
    1437              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1438                  blob.seek(0)
    1439              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1440                  blob.tell()
    1441              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1442                  blob.__enter__()
    1443              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1444                  blob.__exit__(None, None, None)
    1445              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1446                  len(blob)
    1447              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1448                  blob[0]
    1449              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1450                  blob[0:1]
    1451              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1452                  blob[0] = b""
    1453  
    1454      def test_blob_closed_db_read(self):
    1455          with memory_database() as cx:
    1456              cx.execute("create table test(b blob)")
    1457              cx.execute("insert into test(b) values (zeroblob(100))")
    1458              blob = cx.blobopen("test", "b", 1)
    1459              cx.close()
    1460              self.assertRaisesRegex(sqlite.ProgrammingError,
    1461                                     "Cannot operate on a closed database",
    1462                                     blob.read)
    1463  
    1464      def test_blob_32bit_rowid(self):
    1465          # gh-100370: we should not get an OverflowError for 32-bit rowids
    1466          with memory_database() as cx:
    1467              rowid = 2**32
    1468              cx.execute("create table t(t blob)")
    1469              cx.execute("insert into t(rowid, t) values (?, zeroblob(1))", (rowid,))
    1470              cx.blobopen('t', 't', rowid)
    1471  
    1472  
    1473  @threading_helper.requires_working_threading()
    1474  class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1475      def setUp(self):
    1476          self.con = sqlite.connect(":memory:")
    1477          self.cur = self.con.cursor()
    1478          self.cur.execute("create table test(name text, b blob)")
    1479          self.cur.execute("insert into test values('blob', zeroblob(1))")
    1480  
    1481      def tearDown(self):
    1482          self.cur.close()
    1483          self.con.close()
    1484  
    1485      @threading_helper.reap_threads
    1486      def _run_test(self, fn, *args, **kwds):
    1487          def run(err):
    1488              try:
    1489                  fn(*args, **kwds)
    1490                  err.append("did not raise ProgrammingError")
    1491              except sqlite.ProgrammingError:
    1492                  pass
    1493              except:
    1494                  err.append("raised wrong exception")
    1495  
    1496          err = []
    1497          t = threading.Thread(target=run, kwargs={"err": err})
    1498          t.start()
    1499          t.join()
    1500          if err:
    1501              self.fail("\n".join(err))
    1502  
    1503      def test_check_connection_thread(self):
    1504          fns = [
    1505              lambda: self.con.cursor(),
    1506              lambda: self.con.commit(),
    1507              lambda: self.con.rollback(),
    1508              lambda: self.con.close(),
    1509              lambda: self.con.set_trace_callback(None),
    1510              lambda: self.con.set_authorizer(None),
    1511              lambda: self.con.create_collation("foo", None),
    1512              lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1),
    1513              lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH),
    1514              lambda: self.con.blobopen("test", "b", 1),
    1515          ]
    1516          if hasattr(sqlite.Connection, "serialize"):
    1517              fns.append(lambda: self.con.serialize())
    1518              fns.append(lambda: self.con.deserialize(b""))
    1519          if sqlite.sqlite_version_info >= (3, 25, 0):
    1520              fns.append(lambda: self.con.create_window_function("foo", 0, None))
    1521  
    1522          for fn in fns:
    1523              with self.subTest(fn=fn):
    1524                  self._run_test(fn)
    1525  
    1526      def test_check_cursor_thread(self):
    1527          fns = [
    1528              lambda: self.cur.execute("insert into test(name) values('a')"),
    1529              lambda: self.cur.close(),
    1530              lambda: self.cur.execute("select name from test"),
    1531              lambda: self.cur.fetchone(),
    1532          ]
    1533          for fn in fns:
    1534              with self.subTest(fn=fn):
    1535                  self._run_test(fn)
    1536  
    1537  
    1538      @threading_helper.reap_threads
    1539      def test_dont_check_same_thread(self):
    1540          def run(con, err):
    1541              try:
    1542                  con.execute("select 1")
    1543              except sqlite.Error:
    1544                  err.append("multi-threading not allowed")
    1545  
    1546          con = sqlite.connect(":memory:", check_same_thread=False)
    1547          err = []
    1548          t = threading.Thread(target=run, kwargs={"con": con, "err": err})
    1549          t.start()
    1550          t.join()
    1551          self.assertEqual(len(err), 0, "\n".join(err))
    1552  
    1553  
    1554  class ESC[4;38;5;81mConstructorTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1555      def test_date(self):
    1556          d = sqlite.Date(2004, 10, 28)
    1557  
    1558      def test_time(self):
    1559          t = sqlite.Time(12, 39, 35)
    1560  
    1561      def test_timestamp(self):
    1562          ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
    1563  
    1564      def test_date_from_ticks(self):
    1565          d = sqlite.DateFromTicks(42)
    1566  
    1567      def test_time_from_ticks(self):
    1568          t = sqlite.TimeFromTicks(42)
    1569  
    1570      def test_timestamp_from_ticks(self):
    1571          ts = sqlite.TimestampFromTicks(42)
    1572  
    1573      def test_binary(self):
    1574          b = sqlite.Binary(b"\0'")
    1575  
    1576  class ESC[4;38;5;81mExtensionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1577      def test_script_string_sql(self):
    1578          con = sqlite.connect(":memory:")
    1579          cur = con.cursor()
    1580          cur.executescript("""
    1581              -- bla bla
    1582              /* a stupid comment */
    1583              create table a(i);
    1584              insert into a(i) values (5);
    1585              """)
    1586          cur.execute("select i from a")
    1587          res = cur.fetchone()[0]
    1588          self.assertEqual(res, 5)
    1589  
    1590      def test_script_syntax_error(self):
    1591          con = sqlite.connect(":memory:")
    1592          cur = con.cursor()
    1593          with self.assertRaises(sqlite.OperationalError):
    1594              cur.executescript("create table test(x); asdf; create table test2(x)")
    1595  
    1596      def test_script_error_normal(self):
    1597          con = sqlite.connect(":memory:")
    1598          cur = con.cursor()
    1599          with self.assertRaises(sqlite.OperationalError):
    1600              cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
    1601  
    1602      def test_cursor_executescript_as_bytes(self):
    1603          con = sqlite.connect(":memory:")
    1604          cur = con.cursor()
    1605          with self.assertRaises(TypeError):
    1606              cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
    1607  
    1608      def test_cursor_executescript_with_null_characters(self):
    1609          con = sqlite.connect(":memory:")
    1610          cur = con.cursor()
    1611          with self.assertRaises(ValueError):
    1612              cur.executescript("""
    1613                  create table a(i);\0
    1614                  insert into a(i) values (5);
    1615                  """)
    1616  
    1617      def test_cursor_executescript_with_surrogates(self):
    1618          con = sqlite.connect(":memory:")
    1619          cur = con.cursor()
    1620          with self.assertRaises(UnicodeEncodeError):
    1621              cur.executescript("""
    1622                  create table a(s);
    1623                  insert into a(s) values ('\ud8ff');
    1624                  """)
    1625  
    1626      def test_cursor_executescript_too_large_script(self):
    1627          msg = "query string is too large"
    1628          with memory_database() as cx, cx_limit(cx) as lim:
    1629              cx.executescript("select 'almost too large'".ljust(lim))
    1630              with self.assertRaisesRegex(sqlite.DataError, msg):
    1631                  cx.executescript("select 'too large'".ljust(lim+1))
    1632  
    1633      def test_cursor_executescript_tx_control(self):
    1634          con = sqlite.connect(":memory:")
    1635          con.execute("begin")
    1636          self.assertTrue(con.in_transaction)
    1637          con.executescript("select 1")
    1638          self.assertFalse(con.in_transaction)
    1639  
    1640      def test_connection_execute(self):
    1641          con = sqlite.connect(":memory:")
    1642          result = con.execute("select 5").fetchone()[0]
    1643          self.assertEqual(result, 5, "Basic test of Connection.execute")
    1644  
    1645      def test_connection_executemany(self):
    1646          con = sqlite.connect(":memory:")
    1647          con.execute("create table test(foo)")
    1648          con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
    1649          result = con.execute("select foo from test order by foo").fetchall()
    1650          self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
    1651          self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
    1652  
    1653      def test_connection_executescript(self):
    1654          con = sqlite.connect(":memory:")
    1655          con.executescript("create table test(foo); insert into test(foo) values (5);")
    1656          result = con.execute("select foo from test").fetchone()[0]
    1657          self.assertEqual(result, 5, "Basic test of Connection.executescript")
    1658  
    1659  class ESC[4;38;5;81mClosedConTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1660      def test_closed_con_cursor(self):
    1661          con = sqlite.connect(":memory:")
    1662          con.close()
    1663          with self.assertRaises(sqlite.ProgrammingError):
    1664              cur = con.cursor()
    1665  
    1666      def test_closed_con_commit(self):
    1667          con = sqlite.connect(":memory:")
    1668          con.close()
    1669          with self.assertRaises(sqlite.ProgrammingError):
    1670              con.commit()
    1671  
    1672      def test_closed_con_rollback(self):
    1673          con = sqlite.connect(":memory:")
    1674          con.close()
    1675          with self.assertRaises(sqlite.ProgrammingError):
    1676              con.rollback()
    1677  
    1678      def test_closed_cur_execute(self):
    1679          con = sqlite.connect(":memory:")
    1680          cur = con.cursor()
    1681          con.close()
    1682          with self.assertRaises(sqlite.ProgrammingError):
    1683              cur.execute("select 4")
    1684  
    1685      def test_closed_create_function(self):
    1686          con = sqlite.connect(":memory:")
    1687          con.close()
    1688          def f(x): return 17
    1689          with self.assertRaises(sqlite.ProgrammingError):
    1690              con.create_function("foo", 1, f)
    1691  
    1692      def test_closed_create_aggregate(self):
    1693          con = sqlite.connect(":memory:")
    1694          con.close()
    1695          class ESC[4;38;5;81mAgg:
    1696              def __init__(self):
    1697                  pass
    1698              def step(self, x):
    1699                  pass
    1700              def finalize(self):
    1701                  return 17
    1702          with self.assertRaises(sqlite.ProgrammingError):
    1703              con.create_aggregate("foo", 1, Agg)
    1704  
    1705      def test_closed_set_authorizer(self):
    1706          con = sqlite.connect(":memory:")
    1707          con.close()
    1708          def authorizer(*args):
    1709              return sqlite.DENY
    1710          with self.assertRaises(sqlite.ProgrammingError):
    1711              con.set_authorizer(authorizer)
    1712  
    1713      def test_closed_set_progress_callback(self):
    1714          con = sqlite.connect(":memory:")
    1715          con.close()
    1716          def progress(): pass
    1717          with self.assertRaises(sqlite.ProgrammingError):
    1718              con.set_progress_handler(progress, 100)
    1719  
    1720      def test_closed_call(self):
    1721          con = sqlite.connect(":memory:")
    1722          con.close()
    1723          with self.assertRaises(sqlite.ProgrammingError):
    1724              con()
    1725  
    1726  class ESC[4;38;5;81mClosedCurTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1727      def test_closed(self):
    1728          con = sqlite.connect(":memory:")
    1729          cur = con.cursor()
    1730          cur.close()
    1731  
    1732          for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
    1733              if method_name in ("execute", "executescript"):
    1734                  params = ("select 4 union select 5",)
    1735              elif method_name == "executemany":
    1736                  params = ("insert into foo(bar) values (?)", [(3,), (4,)])
    1737              else:
    1738                  params = []
    1739  
    1740              with self.assertRaises(sqlite.ProgrammingError):
    1741                  method = getattr(cur, method_name)
    1742                  method(*params)
    1743  
    1744  
    1745  class ESC[4;38;5;81mSqliteOnConflictTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1746      """
    1747      Tests for SQLite's "insert on conflict" feature.
    1748  
    1749      See https://www.sqlite.org/lang_conflict.html for details.
    1750      """
    1751  
    1752      def setUp(self):
    1753          self.cx = sqlite.connect(":memory:")
    1754          self.cu = self.cx.cursor()
    1755          self.cu.execute("""
    1756            CREATE TABLE test(
    1757              id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
    1758            );
    1759          """)
    1760  
    1761      def tearDown(self):
    1762          self.cu.close()
    1763          self.cx.close()
    1764  
    1765      def test_on_conflict_rollback_with_explicit_transaction(self):
    1766          self.cx.isolation_level = None  # autocommit mode
    1767          self.cu = self.cx.cursor()
    1768          # Start an explicit transaction.
    1769          self.cu.execute("BEGIN")
    1770          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1771          self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1772          with self.assertRaises(sqlite.IntegrityError):
    1773              self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1774          # Use connection to commit.
    1775          self.cx.commit()
    1776          self.cu.execute("SELECT name, unique_name from test")
    1777          # Transaction should have rolled back and nothing should be in table.
    1778          self.assertEqual(self.cu.fetchall(), [])
    1779  
    1780      def test_on_conflict_abort_raises_with_explicit_transactions(self):
    1781          # Abort cancels the current sql statement but doesn't change anything
    1782          # about the current transaction.
    1783          self.cx.isolation_level = None  # autocommit mode
    1784          self.cu = self.cx.cursor()
    1785          # Start an explicit transaction.
    1786          self.cu.execute("BEGIN")
    1787          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1788          self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1789          with self.assertRaises(sqlite.IntegrityError):
    1790              self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1791          self.cx.commit()
    1792          self.cu.execute("SELECT name, unique_name FROM test")
    1793          # Expect the first two inserts to work, third to do nothing.
    1794          self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
    1795  
    1796      def test_on_conflict_rollback_without_transaction(self):
    1797          # Start of implicit transaction
    1798          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1799          self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1800          with self.assertRaises(sqlite.IntegrityError):
    1801              self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1802          self.cu.execute("SELECT name, unique_name FROM test")
    1803          # Implicit transaction is rolled back on error.
    1804          self.assertEqual(self.cu.fetchall(), [])
    1805  
    1806      def test_on_conflict_abort_raises_without_transactions(self):
    1807          # Abort cancels the current sql statement but doesn't change anything
    1808          # about the current transaction.
    1809          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1810          self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1811          with self.assertRaises(sqlite.IntegrityError):
    1812              self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1813          # Make sure all other values were inserted.
    1814          self.cu.execute("SELECT name, unique_name FROM test")
    1815          self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
    1816  
    1817      def test_on_conflict_fail(self):
    1818          self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
    1819          with self.assertRaises(sqlite.IntegrityError):
    1820              self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
    1821          self.assertEqual(self.cu.fetchall(), [])
    1822  
    1823      def test_on_conflict_ignore(self):
    1824          self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
    1825          # Nothing should happen.
    1826          self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
    1827          self.cu.execute("SELECT unique_name FROM test")
    1828          self.assertEqual(self.cu.fetchall(), [('foo',)])
    1829  
    1830      def test_on_conflict_replace(self):
    1831          self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
    1832          # There shouldn't be an IntegrityError exception.
    1833          self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
    1834          self.cu.execute("SELECT name, unique_name FROM test")
    1835          self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
    1836  
    1837  
    1838  @requires_subprocess()
    1839  class ESC[4;38;5;81mMultiprocessTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1840      CONNECTION_TIMEOUT = 0  # Disable the busy timeout.
    1841  
    1842      def tearDown(self):
    1843          unlink(TESTFN)
    1844  
    1845      def test_ctx_mgr_rollback_if_commit_failed(self):
    1846          # bpo-27334: ctx manager does not rollback if commit fails
    1847          SCRIPT = f"""if 1:
    1848              import sqlite3
    1849              def wait():
    1850                  print("started")
    1851                  assert "database is locked" in input()
    1852  
    1853              cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT})
    1854              cx.create_function("wait", 0, wait)
    1855              with cx:
    1856                  cx.execute("create table t(t)")
    1857              try:
    1858                  # execute two transactions; both will try to lock the db
    1859                  cx.executescript('''
    1860                      -- start a transaction and wait for parent
    1861                      begin transaction;
    1862                      select * from t;
    1863                      select wait();
    1864                      rollback;
    1865  
    1866                      -- start a new transaction; would fail if parent holds lock
    1867                      begin transaction;
    1868                      select * from t;
    1869                      rollback;
    1870                  ''')
    1871              finally:
    1872                  cx.close()
    1873          """
    1874  
    1875          # spawn child process
    1876          proc = subprocess.Popen(
    1877              [sys.executable, "-c", SCRIPT],
    1878              encoding="utf-8",
    1879              bufsize=0,
    1880              stdin=subprocess.PIPE,
    1881              stdout=subprocess.PIPE,
    1882          )
    1883          self.addCleanup(proc.communicate)
    1884  
    1885          # wait for child process to start
    1886          self.assertEqual("started", proc.stdout.readline().strip())
    1887  
    1888          cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT)
    1889          try:  # context manager should correctly release the db lock
    1890              with cx:
    1891                  cx.execute("insert into t values('test')")
    1892          except sqlite.OperationalError as exc:
    1893              proc.stdin.write(str(exc))
    1894          else:
    1895              proc.stdin.write("no error")
    1896          finally:
    1897              cx.close()
    1898  
    1899          # terminate child process
    1900          self.assertIsNone(proc.returncode)
    1901          try:
    1902              proc.communicate(input="end", timeout=SHORT_TIMEOUT)
    1903          except subprocess.TimeoutExpired:
    1904              proc.kill()
    1905              proc.communicate()
    1906              raise
    1907          self.assertEqual(proc.returncode, 0)
    1908  
    1909  
    1910  if __name__ == "__main__":
    1911      unittest.main()