(root)/
Python-3.11.7/
Lib/
test/
test_sqlite3/
test_regression.py
       1  # pysqlite2/test/regression.py: pysqlite regression tests
       2  #
       3  # Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
       4  #
       5  # This file is part of pysqlite.
       6  #
       7  # This software is provided 'as-is', without any express or implied
       8  # warranty.  In no event will the authors be held liable for any damages
       9  # arising from the use of this software.
      10  #
      11  # Permission is granted to anyone to use this software for any purpose,
      12  # including commercial applications, and to alter it and redistribute it
      13  # freely, subject to the following restrictions:
      14  #
      15  # 1. The origin of this software must not be misrepresented; you must not
      16  #    claim that you wrote the original software. If you use this software
      17  #    in a product, an acknowledgment in the product documentation would be
      18  #    appreciated but is not required.
      19  # 2. Altered source versions must be plainly marked as such, and must not be
      20  #    misrepresented as being the original software.
      21  # 3. This notice may not be removed or altered from any source distribution.
      22  
      23  import datetime
      24  import unittest
      25  import sqlite3 as sqlite
      26  import weakref
      27  import functools
      28  
      29  from test import support
      30  from unittest.mock import patch
      31  from test.test_sqlite3.test_dbapi import memory_database, cx_limit
      32  
      33  
      34  class ESC[4;38;5;81mRegressionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      35      def setUp(self):
      36          self.con = sqlite.connect(":memory:")
      37  
      38      def tearDown(self):
      39          self.con.close()
      40  
      41      def test_pragma_user_version(self):
      42          # This used to crash pysqlite because this pragma command returns NULL for the column name
      43          cur = self.con.cursor()
      44          cur.execute("pragma user_version")
      45  
      46      def test_pragma_schema_version(self):
      47          # This still crashed pysqlite <= 2.2.1
      48          con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
      49          try:
      50              cur = self.con.cursor()
      51              cur.execute("pragma schema_version")
      52          finally:
      53              cur.close()
      54              con.close()
      55  
      56      def test_statement_reset(self):
      57          # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
      58          # reset before a rollback, but only those that are still in the
      59          # statement cache. The others are not accessible from the connection object.
      60          con = sqlite.connect(":memory:", cached_statements=5)
      61          cursors = [con.cursor() for x in range(5)]
      62          cursors[0].execute("create table test(x)")
      63          for i in range(10):
      64              cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
      65  
      66          for i in range(5):
      67              cursors[i].execute(" " * i + "select x from test")
      68  
      69          con.rollback()
      70  
      71      def test_column_name_with_spaces(self):
      72          cur = self.con.cursor()
      73          cur.execute('select 1 as "foo bar [datetime]"')
      74          self.assertEqual(cur.description[0][0], "foo bar [datetime]")
      75  
      76          cur.execute('select 1 as "foo baz"')
      77          self.assertEqual(cur.description[0][0], "foo baz")
      78  
      79      def test_statement_finalization_on_close_db(self):
      80          # pysqlite versions <= 2.3.3 only finalized statements in the statement
      81          # cache when closing the database. statements that were still
      82          # referenced in cursors weren't closed and could provoke "
      83          # "OperationalError: Unable to close due to unfinalised statements".
      84          con = sqlite.connect(":memory:")
      85          cursors = []
      86          # default statement cache size is 100
      87          for i in range(105):
      88              cur = con.cursor()
      89              cursors.append(cur)
      90              cur.execute("select 1 x union select " + str(i))
      91          con.close()
      92  
      93      def test_on_conflict_rollback(self):
      94          con = sqlite.connect(":memory:")
      95          con.execute("create table foo(x, unique(x) on conflict rollback)")
      96          con.execute("insert into foo(x) values (1)")
      97          try:
      98              con.execute("insert into foo(x) values (1)")
      99          except sqlite.DatabaseError:
     100              pass
     101          con.execute("insert into foo(x) values (2)")
     102          try:
     103              con.commit()
     104          except sqlite.OperationalError:
     105              self.fail("pysqlite knew nothing about the implicit ROLLBACK")
     106  
     107      def test_workaround_for_buggy_sqlite_transfer_bindings(self):
     108          """
     109          pysqlite would crash with older SQLite versions unless
     110          a workaround is implemented.
     111          """
     112          self.con.execute("create table foo(bar)")
     113          self.con.execute("drop table foo")
     114          self.con.execute("create table foo(bar)")
     115  
     116      def test_empty_statement(self):
     117          """
     118          pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
     119          for "no-operation" statements
     120          """
     121          self.con.execute("")
     122  
     123      def test_type_map_usage(self):
     124          """
     125          pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
     126          a statement. This test exhibits the problem.
     127          """
     128          SELECT = "select * from foo"
     129          con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
     130          cur = con.cursor()
     131          cur.execute("create table foo(bar timestamp)")
     132          cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
     133          cur.execute(SELECT)
     134          cur.execute("drop table foo")
     135          cur.execute("create table foo(bar integer)")
     136          cur.execute("insert into foo(bar) values (5)")
     137          cur.execute(SELECT)
     138  
     139      def test_bind_mutating_list(self):
     140          # Issue41662: Crash when mutate a list of parameters during iteration.
     141          class ESC[4;38;5;81mX:
     142              def __conform__(self, protocol):
     143                  parameters.clear()
     144                  return "..."
     145          parameters = [X(), 0]
     146          con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
     147          con.execute("create table foo(bar X, baz integer)")
     148          # Should not crash
     149          with self.assertRaises(IndexError):
     150              con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
     151  
     152      def test_error_msg_decode_error(self):
     153          # When porting the module to Python 3.0, the error message about
     154          # decoding errors disappeared. This verifies they're back again.
     155          with self.assertRaises(sqlite.OperationalError) as cm:
     156              self.con.execute("select 'xxx' || ? || 'yyy' colname",
     157                               (bytes(bytearray([250])),)).fetchone()
     158          msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
     159          self.assertIn(msg, str(cm.exception))
     160  
     161      def test_register_adapter(self):
     162          """
     163          See issue 3312.
     164          """
     165          self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
     166  
     167      def test_set_isolation_level(self):
     168          # See issue 27881.
     169          class ESC[4;38;5;81mCustomStr(ESC[4;38;5;149mstr):
     170              def upper(self):
     171                  return None
     172              def __del__(self):
     173                  con.isolation_level = ""
     174  
     175          con = sqlite.connect(":memory:")
     176          con.isolation_level = None
     177          for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
     178              with self.subTest(level=level):
     179                  con.isolation_level = level
     180                  con.isolation_level = level.lower()
     181                  con.isolation_level = level.capitalize()
     182                  con.isolation_level = CustomStr(level)
     183  
     184          # setting isolation_level failure should not alter previous state
     185          con.isolation_level = None
     186          con.isolation_level = "DEFERRED"
     187          pairs = [
     188              (1, TypeError), (b'', TypeError), ("abc", ValueError),
     189              ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
     190          ]
     191          for value, exc in pairs:
     192              with self.subTest(level=value):
     193                  with self.assertRaises(exc):
     194                      con.isolation_level = value
     195                  self.assertEqual(con.isolation_level, "DEFERRED")
     196  
     197      def test_cursor_constructor_call_check(self):
     198          """
     199          Verifies that cursor methods check whether base class __init__ was
     200          called.
     201          """
     202          class ESC[4;38;5;81mCursor(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mCursor):
     203              def __init__(self, con):
     204                  pass
     205  
     206          con = sqlite.connect(":memory:")
     207          cur = Cursor(con)
     208          with self.assertRaises(sqlite.ProgrammingError):
     209              cur.execute("select 4+5").fetchall()
     210          with self.assertRaisesRegex(sqlite.ProgrammingError,
     211                                      r'^Base Cursor\.__init__ not called\.$'):
     212              cur.close()
     213  
     214      def test_str_subclass(self):
     215          """
     216          The Python 3.0 port of the module didn't cope with values of subclasses of str.
     217          """
     218          class ESC[4;38;5;81mMyStr(ESC[4;38;5;149mstr): pass
     219          self.con.execute("select ?", (MyStr("abc"),))
     220  
     221      def test_connection_constructor_call_check(self):
     222          """
     223          Verifies that connection methods check whether base class __init__ was
     224          called.
     225          """
     226          class ESC[4;38;5;81mConnection(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mConnection):
     227              def __init__(self, name):
     228                  pass
     229  
     230          con = Connection(":memory:")
     231          with self.assertRaises(sqlite.ProgrammingError):
     232              cur = con.cursor()
     233  
     234      def test_auto_commit(self):
     235          """
     236          Verifies that creating a connection in autocommit mode works.
     237          2.5.3 introduced a regression so that these could no longer
     238          be created.
     239          """
     240          con = sqlite.connect(":memory:", isolation_level=None)
     241  
     242      def test_pragma_autocommit(self):
     243          """
     244          Verifies that running a PRAGMA statement that does an autocommit does
     245          work. This did not work in 2.5.3/2.5.4.
     246          """
     247          cur = self.con.cursor()
     248          cur.execute("create table foo(bar)")
     249          cur.execute("insert into foo(bar) values (5)")
     250  
     251          cur.execute("pragma page_size")
     252          row = cur.fetchone()
     253  
     254      def test_connection_call(self):
     255          """
     256          Call a connection with a non-string SQL request: check error handling
     257          of the statement constructor.
     258          """
     259          self.assertRaises(TypeError, self.con, b"select 1")
     260  
     261      def test_collation(self):
     262          def collation_cb(a, b):
     263              return 1
     264          self.assertRaises(UnicodeEncodeError, self.con.create_collation,
     265              # Lone surrogate cannot be encoded to the default encoding (utf8)
     266              "\uDC80", collation_cb)
     267  
     268      def test_recursive_cursor_use(self):
     269          """
     270          http://bugs.python.org/issue10811
     271  
     272          Recursively using a cursor, such as when reusing it from a generator led to segfaults.
     273          Now we catch recursive cursor usage and raise a ProgrammingError.
     274          """
     275          con = sqlite.connect(":memory:")
     276  
     277          cur = con.cursor()
     278          cur.execute("create table a (bar)")
     279          cur.execute("create table b (baz)")
     280  
     281          def foo():
     282              cur.execute("insert into a (bar) values (?)", (1,))
     283              yield 1
     284  
     285          with self.assertRaises(sqlite.ProgrammingError):
     286              cur.executemany("insert into b (baz) values (?)",
     287                              ((i,) for i in foo()))
     288  
     289      def test_convert_timestamp_microsecond_padding(self):
     290          """
     291          http://bugs.python.org/issue14720
     292  
     293          The microsecond parsing of convert_timestamp() should pad with zeros,
     294          since the microsecond string "456" actually represents "456000".
     295          """
     296  
     297          con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
     298          cur = con.cursor()
     299          cur.execute("CREATE TABLE t (x TIMESTAMP)")
     300  
     301          # Microseconds should be 456000
     302          cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
     303  
     304          # Microseconds should be truncated to 123456
     305          cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
     306  
     307          cur.execute("SELECT * FROM t")
     308          values = [x[0] for x in cur.fetchall()]
     309  
     310          self.assertEqual(values, [
     311              datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
     312              datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
     313          ])
     314  
     315      def test_invalid_isolation_level_type(self):
     316          # isolation level is a string, not an integer
     317          self.assertRaises(TypeError,
     318                            sqlite.connect, ":memory:", isolation_level=123)
     319  
     320  
     321      def test_null_character(self):
     322          # Issue #21147
     323          cur = self.con.cursor()
     324          queries = ["\0select 1", "select 1\0"]
     325          for query in queries:
     326              with self.subTest(query=query):
     327                  self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
     328                                         self.con.execute, query)
     329              with self.subTest(query=query):
     330                  self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
     331                                         cur.execute, query)
     332  
     333      def test_surrogates(self):
     334          con = sqlite.connect(":memory:")
     335          self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
     336          self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
     337          cur = con.cursor()
     338          self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
     339          self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")
     340  
     341      def test_large_sql(self):
     342          msg = "query string is too large"
     343          with memory_database() as cx, cx_limit(cx) as lim:
     344              cu = cx.cursor()
     345  
     346              cx("select 1".ljust(lim))
     347              # use a different SQL statement; don't reuse from the LRU cache
     348              cu.execute("select 2".ljust(lim))
     349  
     350              sql = "select 3".ljust(lim+1)
     351              self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
     352              self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)
     353  
     354      def test_commit_cursor_reset(self):
     355          """
     356          Connection.commit() did reset cursors, which made sqlite3
     357          to return rows multiple times when fetched from cursors
     358          after commit. See issues 10513 and 23129 for details.
     359          """
     360          con = sqlite.connect(":memory:")
     361          con.executescript("""
     362          create table t(c);
     363          create table t2(c);
     364          insert into t values(0);
     365          insert into t values(1);
     366          insert into t values(2);
     367          """)
     368  
     369          self.assertEqual(con.isolation_level, "")
     370  
     371          counter = 0
     372          for i, row in enumerate(con.execute("select c from t")):
     373              with self.subTest(i=i, row=row):
     374                  con.execute("insert into t2(c) values (?)", (i,))
     375                  con.commit()
     376                  if counter == 0:
     377                      self.assertEqual(row[0], 0)
     378                  elif counter == 1:
     379                      self.assertEqual(row[0], 1)
     380                  elif counter == 2:
     381                      self.assertEqual(row[0], 2)
     382                  counter += 1
     383          self.assertEqual(counter, 3, "should have returned exactly three rows")
     384  
     385      def test_bpo31770(self):
     386          """
     387          The interpreter shouldn't crash in case Cursor.__init__() is called
     388          more than once.
     389          """
     390          def callback(*args):
     391              pass
     392          con = sqlite.connect(":memory:")
     393          cur = sqlite.Cursor(con)
     394          ref = weakref.ref(cur, callback)
     395          cur.__init__(con)
     396          del cur
     397          # The interpreter shouldn't crash when ref is collected.
     398          del ref
     399          support.gc_collect()
     400  
     401      def test_del_isolation_level_segfault(self):
     402          with self.assertRaises(AttributeError):
     403              del self.con.isolation_level
     404  
     405      def test_bpo37347(self):
     406          class ESC[4;38;5;81mPrinter:
     407              def log(self, *args):
     408                  return sqlite.SQLITE_OK
     409  
     410          for method in [self.con.set_trace_callback,
     411                         functools.partial(self.con.set_progress_handler, n=1),
     412                         self.con.set_authorizer]:
     413              printer_instance = Printer()
     414              method(printer_instance.log)
     415              method(printer_instance.log)
     416              self.con.execute("select 1")  # trigger seg fault
     417              method(None)
     418  
     419      def test_return_empty_bytestring(self):
     420          cur = self.con.execute("select X''")
     421          val = cur.fetchone()[0]
     422          self.assertEqual(val, b'')
     423  
     424      def test_table_lock_cursor_replace_stmt(self):
     425          with memory_database() as con:
     426              cur = con.cursor()
     427              cur.execute("create table t(t)")
     428              cur.executemany("insert into t values(?)",
     429                              ((v,) for v in range(5)))
     430              con.commit()
     431              cur.execute("select t from t")
     432              cur.execute("drop table t")
     433              con.commit()
     434  
     435      def test_table_lock_cursor_dealloc(self):
     436          with memory_database() as con:
     437              con.execute("create table t(t)")
     438              con.executemany("insert into t values(?)",
     439                              ((v,) for v in range(5)))
     440              con.commit()
     441              cur = con.execute("select t from t")
     442              del cur
     443              con.execute("drop table t")
     444              con.commit()
     445  
     446      def test_table_lock_cursor_non_readonly_select(self):
     447          with memory_database() as con:
     448              con.execute("create table t(t)")
     449              con.executemany("insert into t values(?)",
     450                              ((v,) for v in range(5)))
     451              con.commit()
     452              def dup(v):
     453                  con.execute("insert into t values(?)", (v,))
     454                  return
     455              con.create_function("dup", 1, dup)
     456              cur = con.execute("select dup(t) from t")
     457              del cur
     458              con.execute("drop table t")
     459              con.commit()
     460  
     461      def test_executescript_step_through_select(self):
     462          with memory_database() as con:
     463              values = [(v,) for v in range(5)]
     464              with con:
     465                  con.execute("create table t(t)")
     466                  con.executemany("insert into t values(?)", values)
     467              steps = []
     468              con.create_function("step", 1, lambda x: steps.append((x,)))
     469              con.executescript("select step(t) from t")
     470              self.assertEqual(steps, values)
     471  
     472      def test_custom_cursor_object_crash_gh_99886(self):
     473          # This test segfaults on GH-99886
     474          class ESC[4;38;5;81mMyCursor(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mCursor):
     475              def __init__(self, *args, **kwargs):
     476                  super().__init__(*args, **kwargs)
     477                  # this can go before or after the super call; doesn't matter
     478                  self.some_attr = None
     479  
     480          with memory_database() as con:
     481              cur = con.cursor(MyCursor)
     482              cur.close()
     483              del cur
     484  
     485  class ESC[4;38;5;81mRecursiveUseOfCursors(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     486      # GH-80254: sqlite3 should not segfault for recursive use of cursors.
     487      msg = "Recursive use of cursors not allowed"
     488  
     489      def setUp(self):
     490          self.con = sqlite.connect(":memory:",
     491                                    detect_types=sqlite.PARSE_COLNAMES)
     492          self.cur = self.con.cursor()
     493          self.cur.execute("create table test(x foo)")
     494          self.cur.executemany("insert into test(x) values (?)",
     495                               [("foo",), ("bar",)])
     496  
     497      def tearDown(self):
     498          self.cur.close()
     499          self.con.close()
     500  
     501      def test_recursive_cursor_init(self):
     502          conv = lambda x: self.cur.__init__(self.con)
     503          with patch.dict(sqlite.converters, {"INIT": conv}):
     504              self.cur.execute(f'select x as "x [INIT]", x from test')
     505              self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
     506                                     self.cur.fetchall)
     507  
     508      def test_recursive_cursor_close(self):
     509          conv = lambda x: self.cur.close()
     510          with patch.dict(sqlite.converters, {"CLOSE": conv}):
     511              self.cur.execute(f'select x as "x [CLOSE]", x from test')
     512              self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
     513                                     self.cur.fetchall)
     514  
     515      def test_recursive_cursor_iter(self):
     516          conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
     517          with patch.dict(sqlite.converters, {"ITER": conv}):
     518              self.cur.execute(f'select x as "x [ITER]", x from test')
     519              self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
     520                                     self.cur.fetchall)
     521  
     522  
# Run every test case defined above when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()