(root)/
Python-3.12.0/
Lib/
test/
test_sqlite3/
test_dbapi.py
       1  # pysqlite2/test/dbapi.py: tests for DB-API compliance
       2  #
       3  # Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
       4  #
       5  # This file is part of pysqlite.
       6  #
       7  # This software is provided 'as-is', without any express or implied
       8  # warranty.  In no event will the authors be held liable for any damages
       9  # arising from the use of this software.
      10  #
      11  # Permission is granted to anyone to use this software for any purpose,
      12  # including commercial applications, and to alter it and redistribute it
      13  # freely, subject to the following restrictions:
      14  #
      15  # 1. The origin of this software must not be misrepresented; you must not
      16  #    claim that you wrote the original software. If you use this software
      17  #    in a product, an acknowledgment in the product documentation would be
      18  #    appreciated but is not required.
      19  # 2. Altered source versions must be plainly marked as such, and must not be
      20  #    misrepresented as being the original software.
      21  # 3. This notice may not be removed or altered from any source distribution.
      22  
      23  import contextlib
      24  import os
      25  import sqlite3 as sqlite
      26  import subprocess
      27  import sys
      28  import threading
      29  import unittest
      30  import urllib.parse
      31  
      32  from test.support import (
      33      SHORT_TIMEOUT, check_disallow_instantiation, requires_subprocess,
      34      is_emscripten, is_wasi
      35  )
      36  from test.support import threading_helper
      37  from _testcapi import INT_MAX, ULLONG_MAX
      38  from os import SEEK_SET, SEEK_CUR, SEEK_END
      39  from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath
      40  
      41  
      42  # Helper for temporary memory databases
      43  def memory_database(*args, **kwargs):
      44      cx = sqlite.connect(":memory:", *args, **kwargs)
      45      return contextlib.closing(cx)
      46  
      47  
      48  # Temporarily limit a database connection parameter
      49  @contextlib.contextmanager
      50  def cx_limit(cx, category=sqlite.SQLITE_LIMIT_SQL_LENGTH, limit=128):
      51      try:
      52          _prev = cx.setlimit(category, limit)
      53          yield limit
      54      finally:
      55          cx.setlimit(category, _prev)
      56  
      57  
      58  class ESC[4;38;5;81mModuleTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      59      def test_api_level(self):
      60          self.assertEqual(sqlite.apilevel, "2.0",
      61                           "apilevel is %s, should be 2.0" % sqlite.apilevel)
      62  
      63      def test_deprecated_version(self):
      64          msg = "deprecated and will be removed in Python 3.14"
      65          for attr in "version", "version_info":
      66              with self.subTest(attr=attr):
      67                  with self.assertWarnsRegex(DeprecationWarning, msg) as cm:
      68                      getattr(sqlite, attr)
      69                  self.assertEqual(cm.filename,  __file__)
      70                  with self.assertWarnsRegex(DeprecationWarning, msg) as cm:
      71                      getattr(sqlite.dbapi2, attr)
      72                  self.assertEqual(cm.filename,  __file__)
      73  
      74      def test_thread_safety(self):
      75          self.assertIn(sqlite.threadsafety, {0, 1, 3},
      76                        "threadsafety is %d, should be 0, 1 or 3" %
      77                        sqlite.threadsafety)
      78  
      79      def test_param_style(self):
      80          self.assertEqual(sqlite.paramstyle, "qmark",
      81                           "paramstyle is '%s', should be 'qmark'" %
      82                           sqlite.paramstyle)
      83  
      84      def test_warning(self):
      85          self.assertTrue(issubclass(sqlite.Warning, Exception),
      86                       "Warning is not a subclass of Exception")
      87  
      88      def test_error(self):
      89          self.assertTrue(issubclass(sqlite.Error, Exception),
      90                          "Error is not a subclass of Exception")
      91  
      92      def test_interface_error(self):
      93          self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
      94                          "InterfaceError is not a subclass of Error")
      95  
      96      def test_database_error(self):
      97          self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
      98                          "DatabaseError is not a subclass of Error")
      99  
     100      def test_data_error(self):
     101          self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
     102                          "DataError is not a subclass of DatabaseError")
     103  
     104      def test_operational_error(self):
     105          self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
     106                          "OperationalError is not a subclass of DatabaseError")
     107  
     108      def test_integrity_error(self):
     109          self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
     110                          "IntegrityError is not a subclass of DatabaseError")
     111  
     112      def test_internal_error(self):
     113          self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
     114                          "InternalError is not a subclass of DatabaseError")
     115  
     116      def test_programming_error(self):
     117          self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
     118                          "ProgrammingError is not a subclass of DatabaseError")
     119  
     120      def test_not_supported_error(self):
     121          self.assertTrue(issubclass(sqlite.NotSupportedError,
     122                                     sqlite.DatabaseError),
     123                          "NotSupportedError is not a subclass of DatabaseError")
     124  
     125      def test_module_constants(self):
     126          consts = [
     127              "SQLITE_ABORT",
     128              "SQLITE_ALTER_TABLE",
     129              "SQLITE_ANALYZE",
     130              "SQLITE_ATTACH",
     131              "SQLITE_AUTH",
     132              "SQLITE_BUSY",
     133              "SQLITE_CANTOPEN",
     134              "SQLITE_CONSTRAINT",
     135              "SQLITE_CORRUPT",
     136              "SQLITE_CREATE_INDEX",
     137              "SQLITE_CREATE_TABLE",
     138              "SQLITE_CREATE_TEMP_INDEX",
     139              "SQLITE_CREATE_TEMP_TABLE",
     140              "SQLITE_CREATE_TEMP_TRIGGER",
     141              "SQLITE_CREATE_TEMP_VIEW",
     142              "SQLITE_CREATE_TRIGGER",
     143              "SQLITE_CREATE_VIEW",
     144              "SQLITE_CREATE_VTABLE",
     145              "SQLITE_DELETE",
     146              "SQLITE_DENY",
     147              "SQLITE_DETACH",
     148              "SQLITE_DONE",
     149              "SQLITE_DROP_INDEX",
     150              "SQLITE_DROP_TABLE",
     151              "SQLITE_DROP_TEMP_INDEX",
     152              "SQLITE_DROP_TEMP_TABLE",
     153              "SQLITE_DROP_TEMP_TRIGGER",
     154              "SQLITE_DROP_TEMP_VIEW",
     155              "SQLITE_DROP_TRIGGER",
     156              "SQLITE_DROP_VIEW",
     157              "SQLITE_DROP_VTABLE",
     158              "SQLITE_EMPTY",
     159              "SQLITE_ERROR",
     160              "SQLITE_FORMAT",
     161              "SQLITE_FULL",
     162              "SQLITE_FUNCTION",
     163              "SQLITE_IGNORE",
     164              "SQLITE_INSERT",
     165              "SQLITE_INTERNAL",
     166              "SQLITE_INTERRUPT",
     167              "SQLITE_IOERR",
     168              "SQLITE_LOCKED",
     169              "SQLITE_MISMATCH",
     170              "SQLITE_MISUSE",
     171              "SQLITE_NOLFS",
     172              "SQLITE_NOMEM",
     173              "SQLITE_NOTADB",
     174              "SQLITE_NOTFOUND",
     175              "SQLITE_OK",
     176              "SQLITE_PERM",
     177              "SQLITE_PRAGMA",
     178              "SQLITE_PROTOCOL",
     179              "SQLITE_RANGE",
     180              "SQLITE_READ",
     181              "SQLITE_READONLY",
     182              "SQLITE_REINDEX",
     183              "SQLITE_ROW",
     184              "SQLITE_SAVEPOINT",
     185              "SQLITE_SCHEMA",
     186              "SQLITE_SELECT",
     187              "SQLITE_TOOBIG",
     188              "SQLITE_TRANSACTION",
     189              "SQLITE_UPDATE",
     190              # Run-time limit categories
     191              "SQLITE_LIMIT_LENGTH",
     192              "SQLITE_LIMIT_SQL_LENGTH",
     193              "SQLITE_LIMIT_COLUMN",
     194              "SQLITE_LIMIT_EXPR_DEPTH",
     195              "SQLITE_LIMIT_COMPOUND_SELECT",
     196              "SQLITE_LIMIT_VDBE_OP",
     197              "SQLITE_LIMIT_FUNCTION_ARG",
     198              "SQLITE_LIMIT_ATTACHED",
     199              "SQLITE_LIMIT_LIKE_PATTERN_LENGTH",
     200              "SQLITE_LIMIT_VARIABLE_NUMBER",
     201              "SQLITE_LIMIT_TRIGGER_DEPTH",
     202          ]
     203          if sqlite.sqlite_version_info >= (3, 7, 17):
     204              consts += ["SQLITE_NOTICE", "SQLITE_WARNING"]
     205          if sqlite.sqlite_version_info >= (3, 8, 3):
     206              consts.append("SQLITE_RECURSIVE")
     207          if sqlite.sqlite_version_info >= (3, 8, 7):
     208              consts.append("SQLITE_LIMIT_WORKER_THREADS")
     209          consts += ["PARSE_DECLTYPES", "PARSE_COLNAMES"]
     210          # Extended result codes
     211          consts += [
     212              "SQLITE_ABORT_ROLLBACK",
     213              "SQLITE_BUSY_RECOVERY",
     214              "SQLITE_CANTOPEN_FULLPATH",
     215              "SQLITE_CANTOPEN_ISDIR",
     216              "SQLITE_CANTOPEN_NOTEMPDIR",
     217              "SQLITE_CORRUPT_VTAB",
     218              "SQLITE_IOERR_ACCESS",
     219              "SQLITE_IOERR_BLOCKED",
     220              "SQLITE_IOERR_CHECKRESERVEDLOCK",
     221              "SQLITE_IOERR_CLOSE",
     222              "SQLITE_IOERR_DELETE",
     223              "SQLITE_IOERR_DELETE_NOENT",
     224              "SQLITE_IOERR_DIR_CLOSE",
     225              "SQLITE_IOERR_DIR_FSYNC",
     226              "SQLITE_IOERR_FSTAT",
     227              "SQLITE_IOERR_FSYNC",
     228              "SQLITE_IOERR_LOCK",
     229              "SQLITE_IOERR_NOMEM",
     230              "SQLITE_IOERR_RDLOCK",
     231              "SQLITE_IOERR_READ",
     232              "SQLITE_IOERR_SEEK",
     233              "SQLITE_IOERR_SHMLOCK",
     234              "SQLITE_IOERR_SHMMAP",
     235              "SQLITE_IOERR_SHMOPEN",
     236              "SQLITE_IOERR_SHMSIZE",
     237              "SQLITE_IOERR_SHORT_READ",
     238              "SQLITE_IOERR_TRUNCATE",
     239              "SQLITE_IOERR_UNLOCK",
     240              "SQLITE_IOERR_WRITE",
     241              "SQLITE_LOCKED_SHAREDCACHE",
     242              "SQLITE_READONLY_CANTLOCK",
     243              "SQLITE_READONLY_RECOVERY",
     244          ]
     245          if sqlite.sqlite_version_info >= (3, 7, 16):
     246              consts += [
     247                  "SQLITE_CONSTRAINT_CHECK",
     248                  "SQLITE_CONSTRAINT_COMMITHOOK",
     249                  "SQLITE_CONSTRAINT_FOREIGNKEY",
     250                  "SQLITE_CONSTRAINT_FUNCTION",
     251                  "SQLITE_CONSTRAINT_NOTNULL",
     252                  "SQLITE_CONSTRAINT_PRIMARYKEY",
     253                  "SQLITE_CONSTRAINT_TRIGGER",
     254                  "SQLITE_CONSTRAINT_UNIQUE",
     255                  "SQLITE_CONSTRAINT_VTAB",
     256                  "SQLITE_READONLY_ROLLBACK",
     257              ]
     258          if sqlite.sqlite_version_info >= (3, 7, 17):
     259              consts += [
     260                  "SQLITE_IOERR_MMAP",
     261                  "SQLITE_NOTICE_RECOVER_ROLLBACK",
     262                  "SQLITE_NOTICE_RECOVER_WAL",
     263              ]
     264          if sqlite.sqlite_version_info >= (3, 8, 0):
     265              consts += [
     266                  "SQLITE_BUSY_SNAPSHOT",
     267                  "SQLITE_IOERR_GETTEMPPATH",
     268                  "SQLITE_WARNING_AUTOINDEX",
     269              ]
     270          if sqlite.sqlite_version_info >= (3, 8, 1):
     271              consts += ["SQLITE_CANTOPEN_CONVPATH", "SQLITE_IOERR_CONVPATH"]
     272          if sqlite.sqlite_version_info >= (3, 8, 2):
     273              consts.append("SQLITE_CONSTRAINT_ROWID")
     274          if sqlite.sqlite_version_info >= (3, 8, 3):
     275              consts.append("SQLITE_READONLY_DBMOVED")
     276          if sqlite.sqlite_version_info >= (3, 8, 7):
     277              consts.append("SQLITE_AUTH_USER")
     278          if sqlite.sqlite_version_info >= (3, 9, 0):
     279              consts.append("SQLITE_IOERR_VNODE")
     280          if sqlite.sqlite_version_info >= (3, 10, 0):
     281              consts.append("SQLITE_IOERR_AUTH")
     282          if sqlite.sqlite_version_info >= (3, 14, 1):
     283              consts.append("SQLITE_OK_LOAD_PERMANENTLY")
     284          if sqlite.sqlite_version_info >= (3, 21, 0):
     285              consts += [
     286                  "SQLITE_IOERR_BEGIN_ATOMIC",
     287                  "SQLITE_IOERR_COMMIT_ATOMIC",
     288                  "SQLITE_IOERR_ROLLBACK_ATOMIC",
     289              ]
     290          if sqlite.sqlite_version_info >= (3, 22, 0):
     291              consts += [
     292                  "SQLITE_ERROR_MISSING_COLLSEQ",
     293                  "SQLITE_ERROR_RETRY",
     294                  "SQLITE_READONLY_CANTINIT",
     295                  "SQLITE_READONLY_DIRECTORY",
     296              ]
     297          if sqlite.sqlite_version_info >= (3, 24, 0):
     298              consts += ["SQLITE_CORRUPT_SEQUENCE", "SQLITE_LOCKED_VTAB"]
     299          if sqlite.sqlite_version_info >= (3, 25, 0):
     300              consts += ["SQLITE_CANTOPEN_DIRTYWAL", "SQLITE_ERROR_SNAPSHOT"]
     301          if sqlite.sqlite_version_info >= (3, 31, 0):
     302              consts += [
     303                  "SQLITE_CANTOPEN_SYMLINK",
     304                  "SQLITE_CONSTRAINT_PINNED",
     305                  "SQLITE_OK_SYMLINK",
     306              ]
     307          if sqlite.sqlite_version_info >= (3, 32, 0):
     308              consts += [
     309                  "SQLITE_BUSY_TIMEOUT",
     310                  "SQLITE_CORRUPT_INDEX",
     311                  "SQLITE_IOERR_DATA",
     312              ]
     313          if sqlite.sqlite_version_info >= (3, 34, 0):
     314              consts.append("SQLITE_IOERR_CORRUPTFS")
     315          for const in consts:
     316              with self.subTest(const=const):
     317                  self.assertTrue(hasattr(sqlite, const))
     318  
     319      def test_error_code_on_exception(self):
     320          err_msg = "unable to open database file"
     321          if sys.platform.startswith("win"):
     322              err_code = sqlite.SQLITE_CANTOPEN_ISDIR
     323          else:
     324              err_code = sqlite.SQLITE_CANTOPEN
     325  
     326          with temp_dir() as db:
     327              with self.assertRaisesRegex(sqlite.Error, err_msg) as cm:
     328                  sqlite.connect(db)
     329              e = cm.exception
     330              self.assertEqual(e.sqlite_errorcode, err_code)
     331              self.assertTrue(e.sqlite_errorname.startswith("SQLITE_CANTOPEN"))
     332  
     333      @unittest.skipIf(sqlite.sqlite_version_info <= (3, 7, 16),
     334                       "Requires SQLite 3.7.16 or newer")
     335      def test_extended_error_code_on_exception(self):
     336          with memory_database() as con:
     337              with con:
     338                  con.execute("create table t(t integer check(t > 0))")
     339              errmsg = "constraint failed"
     340              with self.assertRaisesRegex(sqlite.IntegrityError, errmsg) as cm:
     341                  con.execute("insert into t values(-1)")
     342              exc = cm.exception
     343              self.assertEqual(exc.sqlite_errorcode,
     344                               sqlite.SQLITE_CONSTRAINT_CHECK)
     345              self.assertEqual(exc.sqlite_errorname, "SQLITE_CONSTRAINT_CHECK")
     346  
     347      def test_disallow_instantiation(self):
     348          cx = sqlite.connect(":memory:")
     349          check_disallow_instantiation(self, type(cx("select 1")))
     350          check_disallow_instantiation(self, sqlite.Blob)
     351  
     352      def test_complete_statement(self):
     353          self.assertFalse(sqlite.complete_statement("select t"))
     354          self.assertTrue(sqlite.complete_statement("create table t(t);"))
     355  
     356  
     357  class ESC[4;38;5;81mConnectionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     358  
     359      def setUp(self):
     360          self.cx = sqlite.connect(":memory:")
     361          cu = self.cx.cursor()
     362          cu.execute("create table test(id integer primary key, name text)")
     363          cu.execute("insert into test(name) values (?)", ("foo",))
     364  
     365      def tearDown(self):
     366          self.cx.close()
     367  
     368      def test_commit(self):
     369          self.cx.commit()
     370  
     371      def test_commit_after_no_changes(self):
     372          """
     373          A commit should also work when no changes were made to the database.
     374          """
     375          self.cx.commit()
     376          self.cx.commit()
     377  
     378      def test_rollback(self):
     379          self.cx.rollback()
     380  
     381      def test_rollback_after_no_changes(self):
     382          """
     383          A rollback should also work when no changes were made to the database.
     384          """
     385          self.cx.rollback()
     386          self.cx.rollback()
     387  
     388      def test_cursor(self):
     389          cu = self.cx.cursor()
     390  
     391      def test_failed_open(self):
     392          YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
     393          with self.assertRaises(sqlite.OperationalError):
     394              sqlite.connect(YOU_CANNOT_OPEN_THIS)
     395  
     396      def test_close(self):
     397          self.cx.close()
     398  
     399      def test_use_after_close(self):
     400          sql = "select 1"
     401          cu = self.cx.cursor()
     402          res = cu.execute(sql)
     403          self.cx.close()
     404          self.assertRaises(sqlite.ProgrammingError, res.fetchall)
     405          self.assertRaises(sqlite.ProgrammingError, cu.execute, sql)
     406          self.assertRaises(sqlite.ProgrammingError, cu.executemany, sql, [])
     407          self.assertRaises(sqlite.ProgrammingError, cu.executescript, sql)
     408          self.assertRaises(sqlite.ProgrammingError, self.cx.execute, sql)
     409          self.assertRaises(sqlite.ProgrammingError,
     410                            self.cx.executemany, sql, [])
     411          self.assertRaises(sqlite.ProgrammingError, self.cx.executescript, sql)
     412          self.assertRaises(sqlite.ProgrammingError,
     413                            self.cx.create_function, "t", 1, lambda x: x)
     414          self.assertRaises(sqlite.ProgrammingError, self.cx.cursor)
     415          with self.assertRaises(sqlite.ProgrammingError):
     416              with self.cx:
     417                  pass
     418  
     419      def test_exceptions(self):
     420          # Optional DB-API extension.
     421          self.assertEqual(self.cx.Warning, sqlite.Warning)
     422          self.assertEqual(self.cx.Error, sqlite.Error)
     423          self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
     424          self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
     425          self.assertEqual(self.cx.DataError, sqlite.DataError)
     426          self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
     427          self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
     428          self.assertEqual(self.cx.InternalError, sqlite.InternalError)
     429          self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
     430          self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
     431  
     432      def test_in_transaction(self):
     433          # Can't use db from setUp because we want to test initial state.
     434          cx = sqlite.connect(":memory:")
     435          cu = cx.cursor()
     436          self.assertEqual(cx.in_transaction, False)
     437          cu.execute("create table transactiontest(id integer primary key, name text)")
     438          self.assertEqual(cx.in_transaction, False)
     439          cu.execute("insert into transactiontest(name) values (?)", ("foo",))
     440          self.assertEqual(cx.in_transaction, True)
     441          cu.execute("select name from transactiontest where name=?", ["foo"])
     442          row = cu.fetchone()
     443          self.assertEqual(cx.in_transaction, True)
     444          cx.commit()
     445          self.assertEqual(cx.in_transaction, False)
     446          cu.execute("select name from transactiontest where name=?", ["foo"])
     447          row = cu.fetchone()
     448          self.assertEqual(cx.in_transaction, False)
     449  
     450      def test_in_transaction_ro(self):
     451          with self.assertRaises(AttributeError):
     452              self.cx.in_transaction = True
     453  
     454      def test_connection_exceptions(self):
     455          exceptions = [
     456              "DataError",
     457              "DatabaseError",
     458              "Error",
     459              "IntegrityError",
     460              "InterfaceError",
     461              "NotSupportedError",
     462              "OperationalError",
     463              "ProgrammingError",
     464              "Warning",
     465          ]
     466          for exc in exceptions:
     467              with self.subTest(exc=exc):
     468                  self.assertTrue(hasattr(self.cx, exc))
     469                  self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc))
     470  
     471      def test_interrupt_on_closed_db(self):
     472          cx = sqlite.connect(":memory:")
     473          cx.close()
     474          with self.assertRaises(sqlite.ProgrammingError):
     475              cx.interrupt()
     476  
     477      def test_interrupt(self):
     478          self.assertIsNone(self.cx.interrupt())
     479  
     480      def test_drop_unused_refs(self):
     481          for n in range(500):
     482              cu = self.cx.execute(f"select {n}")
     483              self.assertEqual(cu.fetchone()[0], n)
     484  
     485      def test_connection_limits(self):
     486          category = sqlite.SQLITE_LIMIT_SQL_LENGTH
     487          saved_limit = self.cx.getlimit(category)
     488          try:
     489              new_limit = 10
     490              prev_limit = self.cx.setlimit(category, new_limit)
     491              self.assertEqual(saved_limit, prev_limit)
     492              self.assertEqual(self.cx.getlimit(category), new_limit)
     493              msg = "query string is too large"
     494              self.assertRaisesRegex(sqlite.DataError, msg,
     495                                     self.cx.execute, "select 1 as '16'")
     496          finally:  # restore saved limit
     497              self.cx.setlimit(category, saved_limit)
     498  
     499      def test_connection_bad_limit_category(self):
     500          msg = "'category' is out of bounds"
     501          cat = 1111
     502          self.assertRaisesRegex(sqlite.ProgrammingError, msg,
     503                                 self.cx.getlimit, cat)
     504          self.assertRaisesRegex(sqlite.ProgrammingError, msg,
     505                                 self.cx.setlimit, cat, 0)
     506  
     507      def test_connection_init_bad_isolation_level(self):
     508          msg = (
     509              "isolation_level string must be '', 'DEFERRED', 'IMMEDIATE', or "
     510              "'EXCLUSIVE'"
     511          )
     512          levels = (
     513              "BOGUS",
     514              " ",
     515              "DEFERRE",
     516              "IMMEDIAT",
     517              "EXCLUSIV",
     518              "DEFERREDS",
     519              "IMMEDIATES",
     520              "EXCLUSIVES",
     521          )
     522          for level in levels:
     523              with self.subTest(level=level):
     524                  with self.assertRaisesRegex(ValueError, msg):
     525                      memory_database(isolation_level=level)
     526                  with memory_database() as cx:
     527                      with self.assertRaisesRegex(ValueError, msg):
     528                          cx.isolation_level = level
     529                      # Check that the default level is not changed
     530                      self.assertEqual(cx.isolation_level, "")
     531  
     532      def test_connection_init_good_isolation_levels(self):
     533          for level in ("", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", None):
     534              with self.subTest(level=level):
     535                  with memory_database(isolation_level=level) as cx:
     536                      self.assertEqual(cx.isolation_level, level)
     537                  with memory_database() as cx:
     538                      self.assertEqual(cx.isolation_level, "")
     539                      cx.isolation_level = level
     540                      self.assertEqual(cx.isolation_level, level)
     541  
     542      def test_connection_reinit(self):
     543          db = ":memory:"
     544          cx = sqlite.connect(db)
     545          cx.text_factory = bytes
     546          cx.row_factory = sqlite.Row
     547          cu = cx.cursor()
     548          cu.execute("create table foo (bar)")
     549          cu.executemany("insert into foo (bar) values (?)",
     550                         ((str(v),) for v in range(4)))
     551          cu.execute("select bar from foo")
     552  
     553          rows = [r for r in cu.fetchmany(2)]
     554          self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
     555          self.assertEqual([r[0] for r in rows], [b"0", b"1"])
     556  
     557          cx.__init__(db)
     558          cx.execute("create table foo (bar)")
     559          cx.executemany("insert into foo (bar) values (?)",
     560                         ((v,) for v in ("a", "b", "c", "d")))
     561  
     562          # This uses the old database, old row factory, but new text factory
     563          rows = [r for r in cu.fetchall()]
     564          self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
     565          self.assertEqual([r[0] for r in rows], ["2", "3"])
     566  
     567      def test_connection_bad_reinit(self):
     568          cx = sqlite.connect(":memory:")
     569          with cx:
     570              cx.execute("create table t(t)")
     571          with temp_dir() as db:
     572              self.assertRaisesRegex(sqlite.OperationalError,
     573                                     "unable to open database file",
     574                                     cx.__init__, db)
     575              self.assertRaisesRegex(sqlite.ProgrammingError,
     576                                     "Base Connection.__init__ not called",
     577                                     cx.executemany, "insert into t values(?)",
     578                                     ((v,) for v in range(3)))
     579  
     580      def test_connection_config(self):
     581          op = sqlite.SQLITE_DBCONFIG_ENABLE_FKEY
     582          with memory_database() as cx:
     583              with self.assertRaisesRegex(ValueError, "unknown"):
     584                  cx.getconfig(-1)
     585  
     586              # Toggle and verify.
     587              old = cx.getconfig(op)
     588              new = not old
     589              cx.setconfig(op, new)
     590              self.assertEqual(cx.getconfig(op), new)
     591  
     592              cx.setconfig(op)  # defaults to True
     593              self.assertTrue(cx.getconfig(op))
     594  
     595              # Check that foreign key support was actually enabled.
     596              with cx:
     597                  cx.executescript("""
     598                      create table t(t integer primary key);
     599                      create table u(u, foreign key(u) references t(t));
     600                  """)
     601              with self.assertRaisesRegex(sqlite.IntegrityError, "constraint"):
     602                  cx.execute("insert into u values(0)")
     603  
     604  
     605  class ESC[4;38;5;81mUninitialisedConnectionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     606      def setUp(self):
     607          self.cx = sqlite.Connection.__new__(sqlite.Connection)
     608  
     609      def test_uninit_operations(self):
     610          funcs = (
     611              lambda: self.cx.isolation_level,
     612              lambda: self.cx.total_changes,
     613              lambda: self.cx.in_transaction,
     614              lambda: self.cx.iterdump(),
     615              lambda: self.cx.cursor(),
     616              lambda: self.cx.close(),
     617          )
     618          for func in funcs:
     619              with self.subTest(func=func):
     620                  self.assertRaisesRegex(sqlite.ProgrammingError,
     621                                         "Base Connection.__init__ not called",
     622                                         func)
     623  
     624  
     625  @unittest.skipUnless(hasattr(sqlite.Connection, "serialize"),
     626                       "Needs SQLite serialize API")
     627  class ESC[4;38;5;81mSerializeTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     628      def test_serialize_deserialize(self):
     629          with memory_database() as cx:
     630              with cx:
     631                  cx.execute("create table t(t)")
     632              data = cx.serialize()
     633  
     634              # Remove test table, verify that it was removed.
     635              with cx:
     636                  cx.execute("drop table t")
     637              regex = "no such table"
     638              with self.assertRaisesRegex(sqlite.OperationalError, regex):
     639                  cx.execute("select t from t")
     640  
     641              # Deserialize and verify that test table is restored.
     642              cx.deserialize(data)
     643              cx.execute("select t from t")
     644  
     645      def test_deserialize_wrong_args(self):
     646          dataset = (
     647              (BufferError, memoryview(b"blob")[::2]),
     648              (TypeError, []),
     649              (TypeError, 1),
     650              (TypeError, None),
     651          )
     652          for exc, arg in dataset:
     653              with self.subTest(exc=exc, arg=arg):
     654                  with memory_database() as cx:
     655                      self.assertRaises(exc, cx.deserialize, arg)
     656  
     657      def test_deserialize_corrupt_database(self):
     658          with memory_database() as cx:
     659              regex = "file is not a database"
     660              with self.assertRaisesRegex(sqlite.DatabaseError, regex):
     661                  cx.deserialize(b"\0\1\3")
     662                  # SQLite does not generate an error until you try to query the
     663                  # deserialized database.
     664                  cx.execute("create table fail(f)")
     665  
     666  
     667  class ESC[4;38;5;81mOpenTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     668      _sql = "create table test(id integer)"
     669  
     670      def test_open_with_path_like_object(self):
     671          """ Checks that we can successfully connect to a database using an object that
     672              is PathLike, i.e. has __fspath__(). """
     673          path = FakePath(TESTFN)
     674          self.addCleanup(unlink, path)
     675          self.assertFalse(os.path.exists(path))
     676          with contextlib.closing(sqlite.connect(path)) as cx:
     677              self.assertTrue(os.path.exists(path))
     678              cx.execute(self._sql)
     679  
     680      @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
     681      @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
     682      @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
     683      @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
     684      def test_open_with_undecodable_path(self):
     685          path = TESTFN_UNDECODABLE
     686          self.addCleanup(unlink, path)
     687          self.assertFalse(os.path.exists(path))
     688          with contextlib.closing(sqlite.connect(path)) as cx:
     689              self.assertTrue(os.path.exists(path))
     690              cx.execute(self._sql)
     691  
     692      def test_open_uri(self):
     693          path = TESTFN
     694          self.addCleanup(unlink, path)
     695          uri = "file:" + urllib.parse.quote(os.fsencode(path))
     696          self.assertFalse(os.path.exists(path))
     697          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     698              self.assertTrue(os.path.exists(path))
     699              cx.execute(self._sql)
     700  
     701      def test_open_unquoted_uri(self):
     702          path = TESTFN
     703          self.addCleanup(unlink, path)
     704          uri = "file:" + path
     705          self.assertFalse(os.path.exists(path))
     706          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     707              self.assertTrue(os.path.exists(path))
     708              cx.execute(self._sql)
     709  
     710      def test_open_uri_readonly(self):
     711          path = TESTFN
     712          self.addCleanup(unlink, path)
     713          uri = "file:" + urllib.parse.quote(os.fsencode(path)) + "?mode=ro"
     714          self.assertFalse(os.path.exists(path))
     715          # Cannot create new DB
     716          with self.assertRaises(sqlite.OperationalError):
     717              sqlite.connect(uri, uri=True)
     718          self.assertFalse(os.path.exists(path))
     719          sqlite.connect(path).close()
     720          self.assertTrue(os.path.exists(path))
     721          # Cannot modify new DB
     722          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     723              with self.assertRaises(sqlite.OperationalError):
     724                  cx.execute(self._sql)
     725  
     726      @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
     727      @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
     728      @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
     729      @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
     730      def test_open_undecodable_uri(self):
     731          path = TESTFN_UNDECODABLE
     732          self.addCleanup(unlink, path)
     733          uri = "file:" + urllib.parse.quote(path)
     734          self.assertFalse(os.path.exists(path))
     735          with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
     736              self.assertTrue(os.path.exists(path))
     737              cx.execute(self._sql)
     738  
     739      def test_factory_database_arg(self):
     740          def factory(database, *args, **kwargs):
     741              nonlocal database_arg
     742              database_arg = database
     743              return sqlite.Connection(":memory:", *args, **kwargs)
     744  
     745          for database in (TESTFN, os.fsencode(TESTFN),
     746                           FakePath(TESTFN), FakePath(os.fsencode(TESTFN))):
     747              database_arg = None
     748              sqlite.connect(database, factory=factory).close()
     749              self.assertEqual(database_arg, database)
     750  
     751      def test_database_keyword(self):
     752          with contextlib.closing(sqlite.connect(database=":memory:")) as cx:
     753              self.assertEqual(type(cx), sqlite.Connection)
     754  
     755  
     756  class ESC[4;38;5;81mCursorTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     757      def setUp(self):
     758          self.cx = sqlite.connect(":memory:")
     759          self.cu = self.cx.cursor()
     760          self.cu.execute(
     761              "create table test(id integer primary key, name text, "
     762              "income number, unique_test text unique)"
     763          )
     764          self.cu.execute("insert into test(name) values (?)", ("foo",))
     765  
     766      def tearDown(self):
     767          self.cu.close()
     768          self.cx.close()
     769  
     770      def test_execute_no_args(self):
     771          self.cu.execute("delete from test")
     772  
     773      def test_execute_illegal_sql(self):
     774          with self.assertRaises(sqlite.OperationalError):
     775              self.cu.execute("select asdf")
     776  
     777      def test_execute_multiple_statements(self):
     778          msg = "You can only execute one statement at a time"
     779          dataset = (
     780              "select 1; select 2",
     781              "select 1; // c++ comments are not allowed",
     782              "select 1; *not a comment",
     783              "select 1; -*not a comment",
     784              "select 1; /* */ a",
     785              "select 1; /**/a",
     786              "select 1; -",
     787              "select 1; /",
     788              "select 1; -\n- select 2",
     789              """select 1;
     790                 -- comment
     791                 select 2
     792              """,
     793          )
     794          for query in dataset:
     795              with self.subTest(query=query):
     796                  with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
     797                      self.cu.execute(query)
     798  
     799      def test_execute_with_appended_comments(self):
     800          dataset = (
     801              "select 1; -- foo bar",
     802              "select 1; --",
     803              "select 1; /*",  # Unclosed comments ending in \0 are skipped.
     804              """
     805              select 5+4;
     806  
     807              /*
     808              foo
     809              */
     810              """,
     811          )
     812          for query in dataset:
     813              with self.subTest(query=query):
     814                  self.cu.execute(query)
     815  
     816      def test_execute_wrong_sql_arg(self):
     817          with self.assertRaises(TypeError):
     818              self.cu.execute(42)
     819  
     820      def test_execute_arg_int(self):
     821          self.cu.execute("insert into test(id) values (?)", (42,))
     822  
     823      def test_execute_arg_float(self):
     824          self.cu.execute("insert into test(income) values (?)", (2500.32,))
     825  
     826      def test_execute_arg_string(self):
     827          self.cu.execute("insert into test(name) values (?)", ("Hugo",))
     828  
     829      def test_execute_arg_string_with_zero_byte(self):
     830          self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
     831  
     832          self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
     833          row = self.cu.fetchone()
     834          self.assertEqual(row[0], "Hu\x00go")
     835  
     836      def test_execute_non_iterable(self):
     837          with self.assertRaises(sqlite.ProgrammingError) as cm:
     838              self.cu.execute("insert into test(id) values (?)", 42)
     839          self.assertEqual(str(cm.exception), 'parameters are of unsupported type')
     840  
     841      def test_execute_wrong_no_of_args1(self):
     842          # too many parameters
     843          with self.assertRaises(sqlite.ProgrammingError):
     844              self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
     845  
     846      def test_execute_wrong_no_of_args2(self):
     847          # too little parameters
     848          with self.assertRaises(sqlite.ProgrammingError):
     849              self.cu.execute("insert into test(id) values (?)")
     850  
     851      def test_execute_wrong_no_of_args3(self):
     852          # no parameters, parameters are needed
     853          with self.assertRaises(sqlite.ProgrammingError):
     854              self.cu.execute("insert into test(id) values (?)")
     855  
     856      def test_execute_param_list(self):
     857          self.cu.execute("insert into test(name) values ('foo')")
     858          self.cu.execute("select name from test where name=?", ["foo"])
     859          row = self.cu.fetchone()
     860          self.assertEqual(row[0], "foo")
     861  
     862      def test_execute_param_sequence(self):
     863          class ESC[4;38;5;81mL:
     864              def __len__(self):
     865                  return 1
     866              def __getitem__(self, x):
     867                  assert x == 0
     868                  return "foo"
     869  
     870          self.cu.execute("insert into test(name) values ('foo')")
     871          self.cu.execute("select name from test where name=?", L())
     872          row = self.cu.fetchone()
     873          self.assertEqual(row[0], "foo")
     874  
     875      def test_execute_param_sequence_bad_len(self):
     876          # Issue41662: Error in __len__() was overridden with ProgrammingError.
     877          class ESC[4;38;5;81mL:
     878              def __len__(self):
     879                  1/0
     880              def __getitem__(slf, x):
     881                  raise AssertionError
     882  
     883          self.cu.execute("insert into test(name) values ('foo')")
     884          with self.assertRaises(ZeroDivisionError):
     885              self.cu.execute("select name from test where name=?", L())
     886  
     887      def test_execute_named_param_and_sequence(self):
     888          dataset = (
     889              ("select :a", (1,)),
     890              ("select :a, ?, ?", (1, 2, 3)),
     891              ("select ?, :b, ?", (1, 2, 3)),
     892              ("select ?, ?, :c", (1, 2, 3)),
     893              ("select :a, :b, ?", (1, 2, 3)),
     894          )
     895          msg = "Binding.*is a named parameter"
     896          for query, params in dataset:
     897              with self.subTest(query=query, params=params):
     898                  with self.assertWarnsRegex(DeprecationWarning, msg) as cm:
     899                      self.cu.execute(query, params)
     900                  self.assertEqual(cm.filename,  __file__)
     901  
     902      def test_execute_too_many_params(self):
     903          category = sqlite.SQLITE_LIMIT_VARIABLE_NUMBER
     904          msg = "too many SQL variables"
     905          with cx_limit(self.cx, category=category, limit=1):
     906              self.cu.execute("select * from test where id=?", (1,))
     907              with self.assertRaisesRegex(sqlite.OperationalError, msg):
     908                  self.cu.execute("select * from test where id!=? and id!=?",
     909                                  (1, 2))
     910  
     911      def test_execute_dict_mapping(self):
     912          self.cu.execute("insert into test(name) values ('foo')")
     913          self.cu.execute("select name from test where name=:name", {"name": "foo"})
     914          row = self.cu.fetchone()
     915          self.assertEqual(row[0], "foo")
     916  
     917      def test_execute_dict_mapping_mapping(self):
     918          class ESC[4;38;5;81mD(ESC[4;38;5;149mdict):
     919              def __missing__(self, key):
     920                  return "foo"
     921  
     922          self.cu.execute("insert into test(name) values ('foo')")
     923          self.cu.execute("select name from test where name=:name", D())
     924          row = self.cu.fetchone()
     925          self.assertEqual(row[0], "foo")
     926  
     927      def test_execute_dict_mapping_too_little_args(self):
     928          self.cu.execute("insert into test(name) values ('foo')")
     929          with self.assertRaises(sqlite.ProgrammingError):
     930              self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
     931  
     932      def test_execute_dict_mapping_no_args(self):
     933          self.cu.execute("insert into test(name) values ('foo')")
     934          with self.assertRaises(sqlite.ProgrammingError):
     935              self.cu.execute("select name from test where name=:name")
     936  
     937      def test_execute_dict_mapping_unnamed(self):
     938          self.cu.execute("insert into test(name) values ('foo')")
     939          with self.assertRaises(sqlite.ProgrammingError):
     940              self.cu.execute("select name from test where name=?", {"name": "foo"})
     941  
     942      def test_close(self):
     943          self.cu.close()
     944  
     945      def test_rowcount_execute(self):
     946          self.cu.execute("delete from test")
     947          self.cu.execute("insert into test(name) values ('foo')")
     948          self.cu.execute("insert into test(name) values ('foo')")
     949          self.cu.execute("update test set name='bar'")
     950          self.assertEqual(self.cu.rowcount, 2)
     951  
     952      def test_rowcount_select(self):
     953          """
     954          pysqlite does not know the rowcount of SELECT statements, because we
     955          don't fetch all rows after executing the select statement. The rowcount
     956          has thus to be -1.
     957          """
     958          self.cu.execute("select 5 union select 6")
     959          self.assertEqual(self.cu.rowcount, -1)
     960  
     961      def test_rowcount_executemany(self):
     962          self.cu.execute("delete from test")
     963          self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
     964          self.assertEqual(self.cu.rowcount, 3)
     965  
     966      @unittest.skipIf(sqlite.sqlite_version_info < (3, 35, 0),
     967                       "Requires SQLite 3.35.0 or newer")
     968      def test_rowcount_update_returning(self):
     969          # gh-93421: rowcount is updated correctly for UPDATE...RETURNING queries
     970          self.cu.execute("update test set name='bar' where name='foo' returning 1")
     971          self.assertEqual(self.cu.fetchone()[0], 1)
     972          self.assertEqual(self.cu.rowcount, 1)
     973  
     974      def test_rowcount_prefixed_with_comment(self):
     975          # gh-79579: rowcount is updated even if query is prefixed with comments
     976          self.cu.execute("""
     977              -- foo
     978              insert into test(name) values ('foo'), ('foo')
     979          """)
     980          self.assertEqual(self.cu.rowcount, 2)
     981          self.cu.execute("""
     982              /* -- messy *r /* /* ** *- *--
     983              */
     984              /* one more */ insert into test(name) values ('messy')
     985          """)
     986          self.assertEqual(self.cu.rowcount, 1)
     987          self.cu.execute("/* bar */ update test set name='bar' where name='foo'")
     988          self.assertEqual(self.cu.rowcount, 3)
     989  
     990      def test_rowcount_vaccuum(self):
     991          data = ((1,), (2,), (3,))
     992          self.cu.executemany("insert into test(income) values(?)", data)
     993          self.assertEqual(self.cu.rowcount, 3)
     994          self.cx.commit()
     995          self.cu.execute("vacuum")
     996          self.assertEqual(self.cu.rowcount, -1)
     997  
     998      def test_total_changes(self):
     999          self.cu.execute("insert into test(name) values ('foo')")
    1000          self.cu.execute("insert into test(name) values ('foo')")
    1001          self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')
    1002  
    1003      # Checks for executemany:
    1004      # Sequences are required by the DB-API, iterators
    1005      # enhancements in pysqlite.
    1006  
    1007      def test_execute_many_sequence(self):
    1008          self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
    1009  
    1010      def test_execute_many_iterator(self):
    1011          class ESC[4;38;5;81mMyIter:
    1012              def __init__(self):
    1013                  self.value = 5
    1014  
    1015              def __iter__(self):
    1016                  return self
    1017  
    1018              def __next__(self):
    1019                  if self.value == 10:
    1020                      raise StopIteration
    1021                  else:
    1022                      self.value += 1
    1023                      return (self.value,)
    1024  
    1025          self.cu.executemany("insert into test(income) values (?)", MyIter())
    1026  
    1027      def test_execute_many_generator(self):
    1028          def mygen():
    1029              for i in range(5):
    1030                  yield (i,)
    1031  
    1032          self.cu.executemany("insert into test(income) values (?)", mygen())
    1033  
    1034      def test_execute_many_wrong_sql_arg(self):
    1035          with self.assertRaises(TypeError):
    1036              self.cu.executemany(42, [(3,)])
    1037  
    1038      def test_execute_many_select(self):
    1039          with self.assertRaises(sqlite.ProgrammingError):
    1040              self.cu.executemany("select ?", [(3,)])
    1041  
    1042      def test_execute_many_not_iterable(self):
    1043          with self.assertRaises(TypeError):
    1044              self.cu.executemany("insert into test(income) values (?)", 42)
    1045  
    1046      def test_fetch_iter(self):
    1047          # Optional DB-API extension.
    1048          self.cu.execute("delete from test")
    1049          self.cu.execute("insert into test(id) values (?)", (5,))
    1050          self.cu.execute("insert into test(id) values (?)", (6,))
    1051          self.cu.execute("select id from test order by id")
    1052          lst = []
    1053          for row in self.cu:
    1054              lst.append(row[0])
    1055          self.assertEqual(lst[0], 5)
    1056          self.assertEqual(lst[1], 6)
    1057  
    1058      def test_fetchone(self):
    1059          self.cu.execute("select name from test")
    1060          row = self.cu.fetchone()
    1061          self.assertEqual(row[0], "foo")
    1062          row = self.cu.fetchone()
    1063          self.assertEqual(row, None)
    1064  
    1065      def test_fetchone_no_statement(self):
    1066          cur = self.cx.cursor()
    1067          row = cur.fetchone()
    1068          self.assertEqual(row, None)
    1069  
    1070      def test_array_size(self):
    1071          # must default to 1
    1072          self.assertEqual(self.cu.arraysize, 1)
    1073  
    1074          # now set to 2
    1075          self.cu.arraysize = 2
    1076  
    1077          # now make the query return 3 rows
    1078          self.cu.execute("delete from test")
    1079          self.cu.execute("insert into test(name) values ('A')")
    1080          self.cu.execute("insert into test(name) values ('B')")
    1081          self.cu.execute("insert into test(name) values ('C')")
    1082          self.cu.execute("select name from test")
    1083          res = self.cu.fetchmany()
    1084  
    1085          self.assertEqual(len(res), 2)
    1086  
    1087      def test_fetchmany(self):
    1088          self.cu.execute("select name from test")
    1089          res = self.cu.fetchmany(100)
    1090          self.assertEqual(len(res), 1)
    1091          res = self.cu.fetchmany(100)
    1092          self.assertEqual(res, [])
    1093  
    1094      def test_fetchmany_kw_arg(self):
    1095          """Checks if fetchmany works with keyword arguments"""
    1096          self.cu.execute("select name from test")
    1097          res = self.cu.fetchmany(size=100)
    1098          self.assertEqual(len(res), 1)
    1099  
    1100      def test_fetchall(self):
    1101          self.cu.execute("select name from test")
    1102          res = self.cu.fetchall()
    1103          self.assertEqual(len(res), 1)
    1104          res = self.cu.fetchall()
    1105          self.assertEqual(res, [])
    1106  
    1107      def test_setinputsizes(self):
    1108          self.cu.setinputsizes([3, 4, 5])
    1109  
    1110      def test_setoutputsize(self):
    1111          self.cu.setoutputsize(5, 0)
    1112  
    1113      def test_setoutputsize_no_column(self):
    1114          self.cu.setoutputsize(42)
    1115  
    1116      def test_cursor_connection(self):
    1117          # Optional DB-API extension.
    1118          self.assertEqual(self.cu.connection, self.cx)
    1119  
    1120      def test_wrong_cursor_callable(self):
    1121          with self.assertRaises(TypeError):
    1122              def f(): pass
    1123              cur = self.cx.cursor(f)
    1124  
    1125      def test_cursor_wrong_class(self):
    1126          class ESC[4;38;5;81mFoo: pass
    1127          foo = Foo()
    1128          with self.assertRaises(TypeError):
    1129              cur = sqlite.Cursor(foo)
    1130  
    1131      def test_last_row_id_on_replace(self):
    1132          """
    1133          INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
    1134          """
    1135          sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
    1136          for statement in ('INSERT OR REPLACE', 'REPLACE'):
    1137              with self.subTest(statement=statement):
    1138                  self.cu.execute(sql.format(statement), (1, 'foo'))
    1139                  self.assertEqual(self.cu.lastrowid, 1)
    1140  
    1141      def test_last_row_id_on_ignore(self):
    1142          self.cu.execute(
    1143              "insert or ignore into test(unique_test) values (?)",
    1144              ('test',))
    1145          self.assertEqual(self.cu.lastrowid, 2)
    1146          self.cu.execute(
    1147              "insert or ignore into test(unique_test) values (?)",
    1148              ('test',))
    1149          self.assertEqual(self.cu.lastrowid, 2)
    1150  
    1151      def test_last_row_id_insert_o_r(self):
    1152          results = []
    1153          for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
    1154              sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
    1155              with self.subTest(statement='INSERT OR {}'.format(statement)):
    1156                  self.cu.execute(sql.format(statement), (statement,))
    1157                  results.append((statement, self.cu.lastrowid))
    1158                  with self.assertRaises(sqlite.IntegrityError):
    1159                      self.cu.execute(sql.format(statement), (statement,))
    1160                  results.append((statement, self.cu.lastrowid))
    1161          expected = [
    1162              ('FAIL', 2), ('FAIL', 2),
    1163              ('ABORT', 3), ('ABORT', 3),
    1164              ('ROLLBACK', 4), ('ROLLBACK', 4),
    1165          ]
    1166          self.assertEqual(results, expected)
    1167  
    1168      def test_column_count(self):
    1169          # Check that column count is updated correctly for cached statements
    1170          select = "select * from test"
    1171          res = self.cu.execute(select)
    1172          old_count = len(res.description)
    1173          # Add a new column and execute the cached select query again
    1174          self.cu.execute("alter table test add newcol")
    1175          res = self.cu.execute(select)
    1176          new_count = len(res.description)
    1177          self.assertEqual(new_count - old_count, 1)
    1178  
    1179      def test_same_query_in_multiple_cursors(self):
    1180          cursors = [self.cx.execute("select 1") for _ in range(3)]
    1181          for cu in cursors:
    1182              self.assertEqual(cu.fetchall(), [(1,)])
    1183  
    1184  
    1185  class ESC[4;38;5;81mBlobTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1186      def setUp(self):
    1187          self.cx = sqlite.connect(":memory:")
    1188          self.cx.execute("create table test(b blob)")
    1189          self.data = b"this blob data string is exactly fifty bytes long!"
    1190          self.cx.execute("insert into test(b) values (?)", (self.data,))
    1191          self.blob = self.cx.blobopen("test", "b", 1)
    1192  
    1193      def tearDown(self):
    1194          self.blob.close()
    1195          self.cx.close()
    1196  
    1197      def test_blob_is_a_blob(self):
    1198          self.assertIsInstance(self.blob, sqlite.Blob)
    1199  
    1200      def test_blob_seek_and_tell(self):
    1201          self.blob.seek(10)
    1202          self.assertEqual(self.blob.tell(), 10)
    1203  
    1204          self.blob.seek(10, SEEK_SET)
    1205          self.assertEqual(self.blob.tell(), 10)
    1206  
    1207          self.blob.seek(10, SEEK_CUR)
    1208          self.assertEqual(self.blob.tell(), 20)
    1209  
    1210          self.blob.seek(-10, SEEK_END)
    1211          self.assertEqual(self.blob.tell(), 40)
    1212  
    1213      def test_blob_seek_error(self):
    1214          msg_oor = "offset out of blob range"
    1215          msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END"
    1216          msg_of = "seek offset results in overflow"
    1217  
    1218          dataset = (
    1219              (ValueError, msg_oor, lambda: self.blob.seek(1000)),
    1220              (ValueError, msg_oor, lambda: self.blob.seek(-10)),
    1221              (ValueError, msg_orig, lambda: self.blob.seek(10, -1)),
    1222              (ValueError, msg_orig, lambda: self.blob.seek(10, 3)),
    1223          )
    1224          for exc, msg, fn in dataset:
    1225              with self.subTest(exc=exc, msg=msg, fn=fn):
    1226                  self.assertRaisesRegex(exc, msg, fn)
    1227  
    1228          # Force overflow errors
    1229          self.blob.seek(1, SEEK_SET)
    1230          with self.assertRaisesRegex(OverflowError, msg_of):
    1231              self.blob.seek(INT_MAX, SEEK_CUR)
    1232          with self.assertRaisesRegex(OverflowError, msg_of):
    1233              self.blob.seek(INT_MAX, SEEK_END)
    1234  
    1235      def test_blob_read(self):
    1236          buf = self.blob.read()
    1237          self.assertEqual(buf, self.data)
    1238  
    1239      def test_blob_read_oversized(self):
    1240          buf = self.blob.read(len(self.data) * 2)
    1241          self.assertEqual(buf, self.data)
    1242  
    1243      def test_blob_read_advance_offset(self):
    1244          n = 10
    1245          buf = self.blob.read(n)
    1246          self.assertEqual(buf, self.data[:n])
    1247          self.assertEqual(self.blob.tell(), n)
    1248  
    1249      def test_blob_read_at_offset(self):
    1250          self.blob.seek(10)
    1251          self.assertEqual(self.blob.read(10), self.data[10:20])
    1252  
    1253      def test_blob_read_error_row_changed(self):
    1254          self.cx.execute("update test set b='aaaa' where rowid=1")
    1255          with self.assertRaises(sqlite.OperationalError):
    1256              self.blob.read()
    1257  
    1258      def test_blob_write(self):
    1259          new_data = b"new data".ljust(50)
    1260          self.blob.write(new_data)
    1261          row = self.cx.execute("select b from test").fetchone()
    1262          self.assertEqual(row[0], new_data)
    1263  
    1264      def test_blob_write_at_offset(self):
    1265          new_data = b"c" * 25
    1266          self.blob.seek(25)
    1267          self.blob.write(new_data)
    1268          row = self.cx.execute("select b from test").fetchone()
    1269          self.assertEqual(row[0], self.data[:25] + new_data)
    1270  
    1271      def test_blob_write_advance_offset(self):
    1272          self.blob.write(b"d"*10)
    1273          self.assertEqual(self.blob.tell(), 10)
    1274  
    1275      def test_blob_write_error_length(self):
    1276          with self.assertRaisesRegex(ValueError, "data longer than blob"):
    1277              self.blob.write(b"a" * 1000)
    1278  
    1279          self.blob.seek(0, SEEK_SET)
    1280          n = len(self.blob)
    1281          self.blob.write(b"a" * (n-1))
    1282          self.blob.write(b"a")
    1283          with self.assertRaisesRegex(ValueError, "data longer than blob"):
    1284              self.blob.write(b"a")
    1285  
    1286      def test_blob_write_error_row_changed(self):
    1287          self.cx.execute("update test set b='aaaa' where rowid=1")
    1288          with self.assertRaises(sqlite.OperationalError):
    1289              self.blob.write(b"aaa")
    1290  
    1291      def test_blob_write_error_readonly(self):
    1292          ro_blob = self.cx.blobopen("test", "b", 1, readonly=True)
    1293          with self.assertRaisesRegex(sqlite.OperationalError, "readonly"):
    1294              ro_blob.write(b"aaa")
    1295          ro_blob.close()
    1296  
    1297      def test_blob_open_error(self):
    1298          dataset = (
    1299              (("test", "b", 1), {"name": "notexisting"}),
    1300              (("notexisting", "b", 1), {}),
    1301              (("test", "notexisting", 1), {}),
    1302              (("test", "b", 2), {}),
    1303          )
    1304          regex = "no such"
    1305          for args, kwds in dataset:
    1306              with self.subTest(args=args, kwds=kwds):
    1307                  with self.assertRaisesRegex(sqlite.OperationalError, regex):
    1308                      self.cx.blobopen(*args, **kwds)
    1309  
    1310      def test_blob_length(self):
    1311          self.assertEqual(len(self.blob), 50)
    1312  
    1313      def test_blob_get_item(self):
    1314          self.assertEqual(self.blob[5], ord("b"))
    1315          self.assertEqual(self.blob[6], ord("l"))
    1316          self.assertEqual(self.blob[7], ord("o"))
    1317          self.assertEqual(self.blob[8], ord("b"))
    1318          self.assertEqual(self.blob[-1], ord("!"))
    1319  
    1320      def test_blob_set_item(self):
    1321          self.blob[0] = ord("b")
    1322          expected = b"b" + self.data[1:]
    1323          actual = self.cx.execute("select b from test").fetchone()[0]
    1324          self.assertEqual(actual, expected)
    1325  
    1326      def test_blob_set_item_with_offset(self):
    1327          self.blob.seek(0, SEEK_END)
    1328          self.assertEqual(self.blob.read(), b"")  # verify that we're at EOB
    1329          self.blob[0] = ord("T")
    1330          self.blob[-1] = ord(".")
    1331          self.blob.seek(0, SEEK_SET)
    1332          expected = b"This blob data string is exactly fifty bytes long."
    1333          self.assertEqual(self.blob.read(), expected)
    1334  
    1335      def test_blob_set_slice_buffer_object(self):
    1336          from array import array
    1337          self.blob[0:5] = memoryview(b"12345")
    1338          self.assertEqual(self.blob[0:5], b"12345")
    1339  
    1340          self.blob[0:5] = bytearray(b"23456")
    1341          self.assertEqual(self.blob[0:5], b"23456")
    1342  
    1343          self.blob[0:5] = array("b", [1, 2, 3, 4, 5])
    1344          self.assertEqual(self.blob[0:5], b"\x01\x02\x03\x04\x05")
    1345  
    1346      def test_blob_set_item_negative_index(self):
    1347          self.blob[-1] = 255
    1348          self.assertEqual(self.blob[-1], 255)
    1349  
    1350      def test_blob_get_slice(self):
    1351          self.assertEqual(self.blob[5:14], b"blob data")
    1352  
    1353      def test_blob_get_empty_slice(self):
    1354          self.assertEqual(self.blob[5:5], b"")
    1355  
    1356      def test_blob_get_slice_negative_index(self):
    1357          self.assertEqual(self.blob[5:-5], self.data[5:-5])
    1358  
    1359      def test_blob_get_slice_with_skip(self):
    1360          self.assertEqual(self.blob[0:10:2], b"ti lb")
    1361  
    1362      def test_blob_set_slice(self):
    1363          self.blob[0:5] = b"12345"
    1364          expected = b"12345" + self.data[5:]
    1365          actual = self.cx.execute("select b from test").fetchone()[0]
    1366          self.assertEqual(actual, expected)
    1367  
    1368      def test_blob_set_empty_slice(self):
    1369          self.blob[0:0] = b""
    1370          self.assertEqual(self.blob[:], self.data)
    1371  
    1372      def test_blob_set_slice_with_skip(self):
    1373          self.blob[0:10:2] = b"12345"
    1374          actual = self.cx.execute("select b from test").fetchone()[0]
    1375          expected = b"1h2s3b4o5 " + self.data[10:]
    1376          self.assertEqual(actual, expected)
    1377  
    1378      def test_blob_mapping_invalid_index_type(self):
    1379          msg = "indices must be integers"
    1380          with self.assertRaisesRegex(TypeError, msg):
    1381              self.blob[5:5.5]
    1382          with self.assertRaisesRegex(TypeError, msg):
    1383              self.blob[1.5]
    1384          with self.assertRaisesRegex(TypeError, msg):
    1385              self.blob["a"] = b"b"
    1386  
    1387      def test_blob_get_item_error(self):
    1388          dataset = [len(self.blob), 105, -105]
    1389          for idx in dataset:
    1390              with self.subTest(idx=idx):
    1391                  with self.assertRaisesRegex(IndexError, "index out of range"):
    1392                      self.blob[idx]
    1393          with self.assertRaisesRegex(IndexError, "cannot fit 'int'"):
    1394              self.blob[ULLONG_MAX]
    1395  
    1396          # Provoke read error
    1397          self.cx.execute("update test set b='aaaa' where rowid=1")
    1398          with self.assertRaises(sqlite.OperationalError):
    1399              self.blob[0]
    1400  
    1401      def test_blob_set_item_error(self):
    1402          with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
    1403              self.blob[0] = b"multiple"
    1404          with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
    1405              self.blob[0] = b"1"
    1406          with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
    1407              self.blob[0] = bytearray(b"1")
    1408          with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
    1409              del self.blob[0]
    1410          with self.assertRaisesRegex(IndexError, "Blob index out of range"):
    1411              self.blob[1000] = 0
    1412          with self.assertRaisesRegex(ValueError, "must be in range"):
    1413              self.blob[0] = -1
    1414          with self.assertRaisesRegex(ValueError, "must be in range"):
    1415              self.blob[0] = 256
    1416          # Overflow errors are overridden with ValueError
    1417          with self.assertRaisesRegex(ValueError, "must be in range"):
    1418              self.blob[0] = 2**65
    1419  
    1420      def test_blob_set_slice_error(self):
    1421          with self.assertRaisesRegex(IndexError, "wrong size"):
    1422              self.blob[5:10] = b"a"
    1423          with self.assertRaisesRegex(IndexError, "wrong size"):
    1424              self.blob[5:10] = b"a" * 1000
    1425          with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
    1426              del self.blob[5:10]
    1427          with self.assertRaisesRegex(ValueError, "step cannot be zero"):
    1428              self.blob[5:10:0] = b"12345"
    1429          with self.assertRaises(BufferError):
    1430              self.blob[5:10] = memoryview(b"abcde")[::2]
    1431  
    1432      def test_blob_sequence_not_supported(self):
    1433          with self.assertRaisesRegex(TypeError, "unsupported operand"):
    1434              self.blob + self.blob
    1435          with self.assertRaisesRegex(TypeError, "unsupported operand"):
    1436              self.blob * 5
    1437          with self.assertRaisesRegex(TypeError, "is not iterable"):
    1438              b"a" in self.blob
    1439  
    1440      def test_blob_context_manager(self):
    1441          data = b"a" * 50
    1442          with self.cx.blobopen("test", "b", 1) as blob:
    1443              blob.write(data)
    1444          actual = self.cx.execute("select b from test").fetchone()[0]
    1445          self.assertEqual(actual, data)
    1446  
    1447          # Check that __exit__ closed the blob
    1448          with self.assertRaisesRegex(sqlite.ProgrammingError, "closed blob"):
    1449              blob.read()
    1450  
    1451      def test_blob_context_manager_reraise_exceptions(self):
    1452          class ESC[4;38;5;81mDummyException(ESC[4;38;5;149mException):
    1453              pass
    1454          with self.assertRaisesRegex(DummyException, "reraised"):
    1455              with self.cx.blobopen("test", "b", 1) as blob:
    1456                  raise DummyException("reraised")
    1457  
    1458  
    1459      def test_blob_closed(self):
    1460          with memory_database() as cx:
    1461              cx.execute("create table test(b blob)")
    1462              cx.execute("insert into test values (zeroblob(100))")
    1463              blob = cx.blobopen("test", "b", 1)
    1464              blob.close()
    1465  
    1466              msg = "Cannot operate on a closed blob"
    1467              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1468                  blob.read()
    1469              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1470                  blob.write(b"")
    1471              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1472                  blob.seek(0)
    1473              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1474                  blob.tell()
    1475              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1476                  blob.__enter__()
    1477              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1478                  blob.__exit__(None, None, None)
    1479              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1480                  len(blob)
    1481              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1482                  blob[0]
    1483              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1484                  blob[0:1]
    1485              with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
    1486                  blob[0] = b""
    1487  
    1488      def test_blob_closed_db_read(self):
    1489          with memory_database() as cx:
    1490              cx.execute("create table test(b blob)")
    1491              cx.execute("insert into test(b) values (zeroblob(100))")
    1492              blob = cx.blobopen("test", "b", 1)
    1493              cx.close()
    1494              self.assertRaisesRegex(sqlite.ProgrammingError,
    1495                                     "Cannot operate on a closed database",
    1496                                     blob.read)
    1497  
    1498      def test_blob_32bit_rowid(self):
    1499          # gh-100370: we should not get an OverflowError for 32-bit rowids
    1500          with memory_database() as cx:
    1501              rowid = 2**32
    1502              cx.execute("create table t(t blob)")
    1503              cx.execute("insert into t(rowid, t) values (?, zeroblob(1))", (rowid,))
    1504              cx.blobopen('t', 't', rowid)
    1505  
    1506  
    1507  @threading_helper.requires_working_threading()
    1508  class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1509      def setUp(self):
    1510          self.con = sqlite.connect(":memory:")
    1511          self.cur = self.con.cursor()
    1512          self.cur.execute("create table test(name text, b blob)")
    1513          self.cur.execute("insert into test values('blob', zeroblob(1))")
    1514  
    1515      def tearDown(self):
    1516          self.cur.close()
    1517          self.con.close()
    1518  
    1519      @threading_helper.reap_threads
    1520      def _run_test(self, fn, *args, **kwds):
    1521          def run(err):
    1522              try:
    1523                  fn(*args, **kwds)
    1524                  err.append("did not raise ProgrammingError")
    1525              except sqlite.ProgrammingError:
    1526                  pass
    1527              except:
    1528                  err.append("raised wrong exception")
    1529  
    1530          err = []
    1531          t = threading.Thread(target=run, kwargs={"err": err})
    1532          t.start()
    1533          t.join()
    1534          if err:
    1535              self.fail("\n".join(err))
    1536  
    1537      def test_check_connection_thread(self):
    1538          fns = [
    1539              lambda: self.con.cursor(),
    1540              lambda: self.con.commit(),
    1541              lambda: self.con.rollback(),
    1542              lambda: self.con.close(),
    1543              lambda: self.con.set_trace_callback(None),
    1544              lambda: self.con.set_authorizer(None),
    1545              lambda: self.con.create_collation("foo", None),
    1546              lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1),
    1547              lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH),
    1548              lambda: self.con.blobopen("test", "b", 1),
    1549          ]
    1550          if hasattr(sqlite.Connection, "serialize"):
    1551              fns.append(lambda: self.con.serialize())
    1552              fns.append(lambda: self.con.deserialize(b""))
    1553          if sqlite.sqlite_version_info >= (3, 25, 0):
    1554              fns.append(lambda: self.con.create_window_function("foo", 0, None))
    1555  
    1556          for fn in fns:
    1557              with self.subTest(fn=fn):
    1558                  self._run_test(fn)
    1559  
    1560      def test_check_cursor_thread(self):
    1561          fns = [
    1562              lambda: self.cur.execute("insert into test(name) values('a')"),
    1563              lambda: self.cur.close(),
    1564              lambda: self.cur.execute("select name from test"),
    1565              lambda: self.cur.fetchone(),
    1566          ]
    1567          for fn in fns:
    1568              with self.subTest(fn=fn):
    1569                  self._run_test(fn)
    1570  
    1571  
    1572      @threading_helper.reap_threads
    1573      def test_dont_check_same_thread(self):
    1574          def run(con, err):
    1575              try:
    1576                  con.execute("select 1")
    1577              except sqlite.Error:
    1578                  err.append("multi-threading not allowed")
    1579  
    1580          con = sqlite.connect(":memory:", check_same_thread=False)
    1581          err = []
    1582          t = threading.Thread(target=run, kwargs={"con": con, "err": err})
    1583          t.start()
    1584          t.join()
    1585          self.assertEqual(len(err), 0, "\n".join(err))
    1586  
    1587  
    1588  class ESC[4;38;5;81mConstructorTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1589      def test_date(self):
    1590          d = sqlite.Date(2004, 10, 28)
    1591  
    1592      def test_time(self):
    1593          t = sqlite.Time(12, 39, 35)
    1594  
    1595      def test_timestamp(self):
    1596          ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
    1597  
    1598      def test_date_from_ticks(self):
    1599          d = sqlite.DateFromTicks(42)
    1600  
    1601      def test_time_from_ticks(self):
    1602          t = sqlite.TimeFromTicks(42)
    1603  
    1604      def test_timestamp_from_ticks(self):
    1605          ts = sqlite.TimestampFromTicks(42)
    1606  
    1607      def test_binary(self):
    1608          b = sqlite.Binary(b"\0'")
    1609  
    1610  class ESC[4;38;5;81mExtensionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1611      def test_script_string_sql(self):
    1612          con = sqlite.connect(":memory:")
    1613          cur = con.cursor()
    1614          cur.executescript("""
    1615              -- bla bla
    1616              /* a stupid comment */
    1617              create table a(i);
    1618              insert into a(i) values (5);
    1619              """)
    1620          cur.execute("select i from a")
    1621          res = cur.fetchone()[0]
    1622          self.assertEqual(res, 5)
    1623  
    1624      def test_script_syntax_error(self):
    1625          con = sqlite.connect(":memory:")
    1626          cur = con.cursor()
    1627          with self.assertRaises(sqlite.OperationalError):
    1628              cur.executescript("create table test(x); asdf; create table test2(x)")
    1629  
    1630      def test_script_error_normal(self):
    1631          con = sqlite.connect(":memory:")
    1632          cur = con.cursor()
    1633          with self.assertRaises(sqlite.OperationalError):
    1634              cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
    1635  
    1636      def test_cursor_executescript_as_bytes(self):
    1637          con = sqlite.connect(":memory:")
    1638          cur = con.cursor()
    1639          with self.assertRaises(TypeError):
    1640              cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
    1641  
    1642      def test_cursor_executescript_with_null_characters(self):
    1643          con = sqlite.connect(":memory:")
    1644          cur = con.cursor()
    1645          with self.assertRaises(ValueError):
    1646              cur.executescript("""
    1647                  create table a(i);\0
    1648                  insert into a(i) values (5);
    1649                  """)
    1650  
    1651      def test_cursor_executescript_with_surrogates(self):
    1652          con = sqlite.connect(":memory:")
    1653          cur = con.cursor()
    1654          with self.assertRaises(UnicodeEncodeError):
    1655              cur.executescript("""
    1656                  create table a(s);
    1657                  insert into a(s) values ('\ud8ff');
    1658                  """)
    1659  
    1660      def test_cursor_executescript_too_large_script(self):
    1661          msg = "query string is too large"
    1662          with memory_database() as cx, cx_limit(cx) as lim:
    1663              cx.executescript("select 'almost too large'".ljust(lim))
    1664              with self.assertRaisesRegex(sqlite.DataError, msg):
    1665                  cx.executescript("select 'too large'".ljust(lim+1))
    1666  
    1667      def test_cursor_executescript_tx_control(self):
    1668          con = sqlite.connect(":memory:")
    1669          con.execute("begin")
    1670          self.assertTrue(con.in_transaction)
    1671          con.executescript("select 1")
    1672          self.assertFalse(con.in_transaction)
    1673  
    1674      def test_connection_execute(self):
    1675          con = sqlite.connect(":memory:")
    1676          result = con.execute("select 5").fetchone()[0]
    1677          self.assertEqual(result, 5, "Basic test of Connection.execute")
    1678  
    1679      def test_connection_executemany(self):
    1680          con = sqlite.connect(":memory:")
    1681          con.execute("create table test(foo)")
    1682          con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
    1683          result = con.execute("select foo from test order by foo").fetchall()
    1684          self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
    1685          self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
    1686  
    1687      def test_connection_executescript(self):
    1688          con = sqlite.connect(":memory:")
    1689          con.executescript("create table test(foo); insert into test(foo) values (5);")
    1690          result = con.execute("select foo from test").fetchone()[0]
    1691          self.assertEqual(result, 5, "Basic test of Connection.executescript")
    1692  
    1693  class ESC[4;38;5;81mClosedConTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1694      def test_closed_con_cursor(self):
    1695          con = sqlite.connect(":memory:")
    1696          con.close()
    1697          with self.assertRaises(sqlite.ProgrammingError):
    1698              cur = con.cursor()
    1699  
    1700      def test_closed_con_commit(self):
    1701          con = sqlite.connect(":memory:")
    1702          con.close()
    1703          with self.assertRaises(sqlite.ProgrammingError):
    1704              con.commit()
    1705  
    1706      def test_closed_con_rollback(self):
    1707          con = sqlite.connect(":memory:")
    1708          con.close()
    1709          with self.assertRaises(sqlite.ProgrammingError):
    1710              con.rollback()
    1711  
    1712      def test_closed_cur_execute(self):
    1713          con = sqlite.connect(":memory:")
    1714          cur = con.cursor()
    1715          con.close()
    1716          with self.assertRaises(sqlite.ProgrammingError):
    1717              cur.execute("select 4")
    1718  
    1719      def test_closed_create_function(self):
    1720          con = sqlite.connect(":memory:")
    1721          con.close()
    1722          def f(x): return 17
    1723          with self.assertRaises(sqlite.ProgrammingError):
    1724              con.create_function("foo", 1, f)
    1725  
    1726      def test_closed_create_aggregate(self):
    1727          con = sqlite.connect(":memory:")
    1728          con.close()
    1729          class ESC[4;38;5;81mAgg:
    1730              def __init__(self):
    1731                  pass
    1732              def step(self, x):
    1733                  pass
    1734              def finalize(self):
    1735                  return 17
    1736          with self.assertRaises(sqlite.ProgrammingError):
    1737              con.create_aggregate("foo", 1, Agg)
    1738  
    1739      def test_closed_set_authorizer(self):
    1740          con = sqlite.connect(":memory:")
    1741          con.close()
    1742          def authorizer(*args):
    1743              return sqlite.DENY
    1744          with self.assertRaises(sqlite.ProgrammingError):
    1745              con.set_authorizer(authorizer)
    1746  
    1747      def test_closed_set_progress_callback(self):
    1748          con = sqlite.connect(":memory:")
    1749          con.close()
    1750          def progress(): pass
    1751          with self.assertRaises(sqlite.ProgrammingError):
    1752              con.set_progress_handler(progress, 100)
    1753  
    1754      def test_closed_call(self):
    1755          con = sqlite.connect(":memory:")
    1756          con.close()
    1757          with self.assertRaises(sqlite.ProgrammingError):
    1758              con()
    1759  
    1760  class ESC[4;38;5;81mClosedCurTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1761      def test_closed(self):
    1762          con = sqlite.connect(":memory:")
    1763          cur = con.cursor()
    1764          cur.close()
    1765  
    1766          for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
    1767              if method_name in ("execute", "executescript"):
    1768                  params = ("select 4 union select 5",)
    1769              elif method_name == "executemany":
    1770                  params = ("insert into foo(bar) values (?)", [(3,), (4,)])
    1771              else:
    1772                  params = []
    1773  
    1774              with self.assertRaises(sqlite.ProgrammingError):
    1775                  method = getattr(cur, method_name)
    1776                  method(*params)
    1777  
    1778  
    1779  class ESC[4;38;5;81mSqliteOnConflictTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1780      """
    1781      Tests for SQLite's "insert on conflict" feature.
    1782  
    1783      See https://www.sqlite.org/lang_conflict.html for details.
    1784      """
    1785  
    1786      def setUp(self):
    1787          self.cx = sqlite.connect(":memory:")
    1788          self.cu = self.cx.cursor()
    1789          self.cu.execute("""
    1790            CREATE TABLE test(
    1791              id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
    1792            );
    1793          """)
    1794  
    1795      def tearDown(self):
    1796          self.cu.close()
    1797          self.cx.close()
    1798  
    1799      def test_on_conflict_rollback_with_explicit_transaction(self):
    1800          self.cx.isolation_level = None  # autocommit mode
    1801          self.cu = self.cx.cursor()
    1802          # Start an explicit transaction.
    1803          self.cu.execute("BEGIN")
    1804          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1805          self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1806          with self.assertRaises(sqlite.IntegrityError):
    1807              self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1808          # Use connection to commit.
    1809          self.cx.commit()
    1810          self.cu.execute("SELECT name, unique_name from test")
    1811          # Transaction should have rolled back and nothing should be in table.
    1812          self.assertEqual(self.cu.fetchall(), [])
    1813  
    1814      def test_on_conflict_abort_raises_with_explicit_transactions(self):
    1815          # Abort cancels the current sql statement but doesn't change anything
    1816          # about the current transaction.
    1817          self.cx.isolation_level = None  # autocommit mode
    1818          self.cu = self.cx.cursor()
    1819          # Start an explicit transaction.
    1820          self.cu.execute("BEGIN")
    1821          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1822          self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1823          with self.assertRaises(sqlite.IntegrityError):
    1824              self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1825          self.cx.commit()
    1826          self.cu.execute("SELECT name, unique_name FROM test")
    1827          # Expect the first two inserts to work, third to do nothing.
    1828          self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
    1829  
    1830      def test_on_conflict_rollback_without_transaction(self):
    1831          # Start of implicit transaction
    1832          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1833          self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1834          with self.assertRaises(sqlite.IntegrityError):
    1835              self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
    1836          self.cu.execute("SELECT name, unique_name FROM test")
    1837          # Implicit transaction is rolled back on error.
    1838          self.assertEqual(self.cu.fetchall(), [])
    1839  
    1840      def test_on_conflict_abort_raises_without_transactions(self):
    1841          # Abort cancels the current sql statement but doesn't change anything
    1842          # about the current transaction.
    1843          self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
    1844          self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1845          with self.assertRaises(sqlite.IntegrityError):
    1846              self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
    1847          # Make sure all other values were inserted.
    1848          self.cu.execute("SELECT name, unique_name FROM test")
    1849          self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])
    1850  
    1851      def test_on_conflict_fail(self):
    1852          self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
    1853          with self.assertRaises(sqlite.IntegrityError):
    1854              self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
    1855          self.assertEqual(self.cu.fetchall(), [])
    1856  
    1857      def test_on_conflict_ignore(self):
    1858          self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
    1859          # Nothing should happen.
    1860          self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
    1861          self.cu.execute("SELECT unique_name FROM test")
    1862          self.assertEqual(self.cu.fetchall(), [('foo',)])
    1863  
    1864      def test_on_conflict_replace(self):
    1865          self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
    1866          # There shouldn't be an IntegrityError exception.
    1867          self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
    1868          self.cu.execute("SELECT name, unique_name FROM test")
    1869          self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
    1870  
    1871  
    1872  @requires_subprocess()
    1873  class ESC[4;38;5;81mMultiprocessTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    1874      CONNECTION_TIMEOUT = 0  # Disable the busy timeout.
    1875  
    1876      def tearDown(self):
    1877          unlink(TESTFN)
    1878  
    1879      def test_ctx_mgr_rollback_if_commit_failed(self):
    1880          # bpo-27334: ctx manager does not rollback if commit fails
    1881          SCRIPT = f"""if 1:
    1882              import sqlite3
    1883              def wait():
    1884                  print("started")
    1885                  assert "database is locked" in input()
    1886  
    1887              cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT})
    1888              cx.create_function("wait", 0, wait)
    1889              with cx:
    1890                  cx.execute("create table t(t)")
    1891              try:
    1892                  # execute two transactions; both will try to lock the db
    1893                  cx.executescript('''
    1894                      -- start a transaction and wait for parent
    1895                      begin transaction;
    1896                      select * from t;
    1897                      select wait();
    1898                      rollback;
    1899  
    1900                      -- start a new transaction; would fail if parent holds lock
    1901                      begin transaction;
    1902                      select * from t;
    1903                      rollback;
    1904                  ''')
    1905              finally:
    1906                  cx.close()
    1907          """
    1908  
    1909          # spawn child process
    1910          proc = subprocess.Popen(
    1911              [sys.executable, "-c", SCRIPT],
    1912              encoding="utf-8",
    1913              bufsize=0,
    1914              stdin=subprocess.PIPE,
    1915              stdout=subprocess.PIPE,
    1916          )
    1917          self.addCleanup(proc.communicate)
    1918  
    1919          # wait for child process to start
    1920          self.assertEqual("started", proc.stdout.readline().strip())
    1921  
    1922          cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT)
    1923          try:  # context manager should correctly release the db lock
    1924              with cx:
    1925                  cx.execute("insert into t values('test')")
    1926          except sqlite.OperationalError as exc:
    1927              proc.stdin.write(str(exc))
    1928          else:
    1929              proc.stdin.write("no error")
    1930          finally:
    1931              cx.close()
    1932  
    1933          # terminate child process
    1934          self.assertIsNone(proc.returncode)
    1935          try:
    1936              proc.communicate(input="end", timeout=SHORT_TIMEOUT)
    1937          except subprocess.TimeoutExpired:
    1938              proc.kill()
    1939              proc.communicate()
    1940              raise
    1941          self.assertEqual(proc.returncode, 0)
    1942  
    1943  
    1944  if __name__ == "__main__":
    1945      unittest.main()