1  # pysqlite2/test/regression.py: pysqlite regression tests
       2  #
       3  # Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
       4  #
       5  # This file is part of pysqlite.
       6  #
       7  # This software is provided 'as-is', without any express or implied
       8  # warranty.  In no event will the authors be held liable for any damages
       9  # arising from the use of this software.
      10  #
      11  # Permission is granted to anyone to use this software for any purpose,
      12  # including commercial applications, and to alter it and redistribute it
      13  # freely, subject to the following restrictions:
      14  #
      15  # 1. The origin of this software must not be misrepresented; you must not
      16  #    claim that you wrote the original software. If you use this software
      17  #    in a product, an acknowledgment in the product documentation would be
      18  #    appreciated but is not required.
      19  # 2. Altered source versions must be plainly marked as such, and must not be
      20  #    misrepresented as being the original software.
      21  # 3. This notice may not be removed or altered from any source distribution.
      22  
      23  import datetime
      24  import unittest
      25  import sqlite3 as sqlite
      26  import weakref
      27  import functools
      28  
      29  from test import support
      30  from unittest.mock import patch
      31  from test.test_sqlite3.test_dbapi import memory_database, cx_limit
      32  
      33  
      34  class ESC[4;38;5;81mRegressionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      35      def setUp(self):
      36          self.con = sqlite.connect(":memory:")
      37  
      38      def tearDown(self):
      39          self.con.close()
      40  
      41      def test_pragma_user_version(self):
      42          # This used to crash pysqlite because this pragma command returns NULL for the column name
      43          cur = self.con.cursor()
      44          cur.execute("pragma user_version")
      45  
      46      def test_pragma_schema_version(self):
      47          # This still crashed pysqlite <= 2.2.1
      48          con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
      49          try:
      50              cur = self.con.cursor()
      51              cur.execute("pragma schema_version")
      52          finally:
      53              cur.close()
      54              con.close()
      55  
      56      def test_statement_reset(self):
      57          # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
      58          # reset before a rollback, but only those that are still in the
      59          # statement cache. The others are not accessible from the connection object.
      60          con = sqlite.connect(":memory:", cached_statements=5)
      61          cursors = [con.cursor() for x in range(5)]
      62          cursors[0].execute("create table test(x)")
      63          for i in range(10):
      64              cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
      65  
      66          for i in range(5):
      67              cursors[i].execute(" " * i + "select x from test")
      68  
      69          con.rollback()
      70  
      71      def test_column_name_with_spaces(self):
      72          cur = self.con.cursor()
      73          cur.execute('select 1 as "foo bar [datetime]"')
      74          self.assertEqual(cur.description[0][0], "foo bar [datetime]")
      75  
      76          cur.execute('select 1 as "foo baz"')
      77          self.assertEqual(cur.description[0][0], "foo baz")
      78  
      79      def test_statement_finalization_on_close_db(self):
      80          # pysqlite versions <= 2.3.3 only finalized statements in the statement
      81          # cache when closing the database. statements that were still
      82          # referenced in cursors weren't closed and could provoke "
      83          # "OperationalError: Unable to close due to unfinalised statements".
      84          con = sqlite.connect(":memory:")
      85          cursors = []
      86          # default statement cache size is 100
      87          for i in range(105):
      88              cur = con.cursor()
      89              cursors.append(cur)
      90              cur.execute("select 1 x union select " + str(i))
      91          con.close()
      92  
      93      def test_on_conflict_rollback(self):
      94          con = sqlite.connect(":memory:")
      95          con.execute("create table foo(x, unique(x) on conflict rollback)")
      96          con.execute("insert into foo(x) values (1)")
      97          try:
      98              con.execute("insert into foo(x) values (1)")
      99          except sqlite.DatabaseError:
     100              pass
     101          con.execute("insert into foo(x) values (2)")
     102          try:
     103              con.commit()
     104          except sqlite.OperationalError:
     105              self.fail("pysqlite knew nothing about the implicit ROLLBACK")
     106  
     107      def test_workaround_for_buggy_sqlite_transfer_bindings(self):
     108          """
     109          pysqlite would crash with older SQLite versions unless
     110          a workaround is implemented.
     111          """
     112          self.con.execute("create table foo(bar)")
     113          self.con.execute("drop table foo")
     114          self.con.execute("create table foo(bar)")
     115  
     116      def test_empty_statement(self):
     117          """
     118          pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
     119          for "no-operation" statements
     120          """
     121          self.con.execute("")
     122  
     123      def test_type_map_usage(self):
     124          """
     125          pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
     126          a statement. This test exhibits the problem.
     127          """
     128          SELECT = "select * from foo"
     129          con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
     130          cur = con.cursor()
     131          cur.execute("create table foo(bar timestamp)")
     132          with self.assertWarnsRegex(DeprecationWarning, "adapter"):
     133              cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
     134          cur.execute(SELECT)
     135          cur.execute("drop table foo")
     136          cur.execute("create table foo(bar integer)")
     137          cur.execute("insert into foo(bar) values (5)")
     138          cur.execute(SELECT)
     139  
     140      def test_bind_mutating_list(self):
     141          # Issue41662: Crash when mutate a list of parameters during iteration.
     142          class ESC[4;38;5;81mX:
     143              def __conform__(self, protocol):
     144                  parameters.clear()
     145                  return "..."
     146          parameters = [X(), 0]
     147          con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
     148          con.execute("create table foo(bar X, baz integer)")
     149          # Should not crash
     150          with self.assertRaises(IndexError):
     151              con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
     152  
     153      def test_error_msg_decode_error(self):
     154          # When porting the module to Python 3.0, the error message about
     155          # decoding errors disappeared. This verifies they're back again.
     156          with self.assertRaises(sqlite.OperationalError) as cm:
     157              self.con.execute("select 'xxx' || ? || 'yyy' colname",
     158                               (bytes(bytearray([250])),)).fetchone()
     159          msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
     160          self.assertIn(msg, str(cm.exception))
     161  
     162      def test_register_adapter(self):
     163          """
     164          See issue 3312.
     165          """
     166          self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
     167  
     168      def test_set_isolation_level(self):
     169          # See issue 27881.
     170          class ESC[4;38;5;81mCustomStr(ESC[4;38;5;149mstr):
     171              def upper(self):
     172                  return None
     173              def __del__(self):
     174                  con.isolation_level = ""
     175  
     176          con = sqlite.connect(":memory:")
     177          con.isolation_level = None
     178          for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
     179              with self.subTest(level=level):
     180                  con.isolation_level = level
     181                  con.isolation_level = level.lower()
     182                  con.isolation_level = level.capitalize()
     183                  con.isolation_level = CustomStr(level)
     184  
     185          # setting isolation_level failure should not alter previous state
     186          con.isolation_level = None
     187          con.isolation_level = "DEFERRED"
     188          pairs = [
     189              (1, TypeError), (b'', TypeError), ("abc", ValueError),
     190              ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
     191          ]
     192          for value, exc in pairs:
     193              with self.subTest(level=value):
     194                  with self.assertRaises(exc):
     195                      con.isolation_level = value
     196                  self.assertEqual(con.isolation_level, "DEFERRED")
     197  
     198      def test_cursor_constructor_call_check(self):
     199          """
     200          Verifies that cursor methods check whether base class __init__ was
     201          called.
     202          """
     203          class ESC[4;38;5;81mCursor(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mCursor):
     204              def __init__(self, con):
     205                  pass
     206  
     207          con = sqlite.connect(":memory:")
     208          cur = Cursor(con)
     209          with self.assertRaises(sqlite.ProgrammingError):
     210              cur.execute("select 4+5").fetchall()
     211          with self.assertRaisesRegex(sqlite.ProgrammingError,
     212                                      r'^Base Cursor\.__init__ not called\.$'):
     213              cur.close()
     214  
     215      def test_str_subclass(self):
     216          """
     217          The Python 3.0 port of the module didn't cope with values of subclasses of str.
     218          """
     219          class ESC[4;38;5;81mMyStr(ESC[4;38;5;149mstr): pass
     220          self.con.execute("select ?", (MyStr("abc"),))
     221  
     222      def test_connection_constructor_call_check(self):
     223          """
     224          Verifies that connection methods check whether base class __init__ was
     225          called.
     226          """
     227          class ESC[4;38;5;81mConnection(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mConnection):
     228              def __init__(self, name):
     229                  pass
     230  
     231          con = Connection(":memory:")
     232          with self.assertRaises(sqlite.ProgrammingError):
     233              cur = con.cursor()
     234  
     235      def test_auto_commit(self):
     236          """
     237          Verifies that creating a connection in autocommit mode works.
     238          2.5.3 introduced a regression so that these could no longer
     239          be created.
     240          """
     241          con = sqlite.connect(":memory:", isolation_level=None)
     242  
     243      def test_pragma_autocommit(self):
     244          """
     245          Verifies that running a PRAGMA statement that does an autocommit does
     246          work. This did not work in 2.5.3/2.5.4.
     247          """
     248          cur = self.con.cursor()
     249          cur.execute("create table foo(bar)")
     250          cur.execute("insert into foo(bar) values (5)")
     251  
     252          cur.execute("pragma page_size")
     253          row = cur.fetchone()
     254  
     255      def test_connection_call(self):
     256          """
     257          Call a connection with a non-string SQL request: check error handling
     258          of the statement constructor.
     259          """
     260          self.assertRaises(TypeError, self.con, b"select 1")
     261  
     262      def test_collation(self):
     263          def collation_cb(a, b):
     264              return 1
     265          self.assertRaises(UnicodeEncodeError, self.con.create_collation,
     266              # Lone surrogate cannot be encoded to the default encoding (utf8)
     267              "\uDC80", collation_cb)
     268  
     269      def test_recursive_cursor_use(self):
     270          """
     271          http://bugs.python.org/issue10811
     272  
     273          Recursively using a cursor, such as when reusing it from a generator led to segfaults.
     274          Now we catch recursive cursor usage and raise a ProgrammingError.
     275          """
     276          con = sqlite.connect(":memory:")
     277  
     278          cur = con.cursor()
     279          cur.execute("create table a (bar)")
     280          cur.execute("create table b (baz)")
     281  
     282          def foo():
     283              cur.execute("insert into a (bar) values (?)", (1,))
     284              yield 1
     285  
     286          with self.assertRaises(sqlite.ProgrammingError):
     287              cur.executemany("insert into b (baz) values (?)",
     288                              ((i,) for i in foo()))
     289  
     290      def test_convert_timestamp_microsecond_padding(self):
     291          """
     292          http://bugs.python.org/issue14720
     293  
     294          The microsecond parsing of convert_timestamp() should pad with zeros,
     295          since the microsecond string "456" actually represents "456000".
     296          """
     297  
     298          con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
     299          cur = con.cursor()
     300          cur.execute("CREATE TABLE t (x TIMESTAMP)")
     301  
     302          # Microseconds should be 456000
     303          cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
     304  
     305          # Microseconds should be truncated to 123456
     306          cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
     307  
     308          cur.execute("SELECT * FROM t")
     309          with self.assertWarnsRegex(DeprecationWarning, "converter"):
     310              values = [x[0] for x in cur.fetchall()]
     311  
     312          self.assertEqual(values, [
     313              datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
     314              datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
     315          ])
     316  
     317      def test_invalid_isolation_level_type(self):
     318          # isolation level is a string, not an integer
     319          self.assertRaises(TypeError,
     320                            sqlite.connect, ":memory:", isolation_level=123)
     321  
     322  
     323      def test_null_character(self):
     324          # Issue #21147
     325          cur = self.con.cursor()
     326          queries = ["\0select 1", "select 1\0"]
     327          for query in queries:
     328              with self.subTest(query=query):
     329                  self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
     330                                         self.con.execute, query)
     331              with self.subTest(query=query):
     332                  self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
     333                                         cur.execute, query)
     334  
     335      def test_surrogates(self):
     336          con = sqlite.connect(":memory:")
     337          self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
     338          self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
     339          cur = con.cursor()
     340          self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
     341          self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")
     342  
     343      def test_large_sql(self):
     344          msg = "query string is too large"
     345          with memory_database() as cx, cx_limit(cx) as lim:
     346              cu = cx.cursor()
     347  
     348              cx("select 1".ljust(lim))
     349              # use a different SQL statement; don't reuse from the LRU cache
     350              cu.execute("select 2".ljust(lim))
     351  
     352              sql = "select 3".ljust(lim+1)
     353              self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
     354              self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)
     355  
     356      def test_commit_cursor_reset(self):
     357          """
     358          Connection.commit() did reset cursors, which made sqlite3
     359          to return rows multiple times when fetched from cursors
     360          after commit. See issues 10513 and 23129 for details.
     361          """
     362          con = sqlite.connect(":memory:")
     363          con.executescript("""
     364          create table t(c);
     365          create table t2(c);
     366          insert into t values(0);
     367          insert into t values(1);
     368          insert into t values(2);
     369          """)
     370  
     371          self.assertEqual(con.isolation_level, "")
     372  
     373          counter = 0
     374          for i, row in enumerate(con.execute("select c from t")):
     375              with self.subTest(i=i, row=row):
     376                  con.execute("insert into t2(c) values (?)", (i,))
     377                  con.commit()
     378                  if counter == 0:
     379                      self.assertEqual(row[0], 0)
     380                  elif counter == 1:
     381                      self.assertEqual(row[0], 1)
     382                  elif counter == 2:
     383                      self.assertEqual(row[0], 2)
     384                  counter += 1
     385          self.assertEqual(counter, 3, "should have returned exactly three rows")
     386  
     387      def test_bpo31770(self):
     388          """
     389          The interpreter shouldn't crash in case Cursor.__init__() is called
     390          more than once.
     391          """
     392          def callback(*args):
     393              pass
     394          con = sqlite.connect(":memory:")
     395          cur = sqlite.Cursor(con)
     396          ref = weakref.ref(cur, callback)
     397          cur.__init__(con)
     398          del cur
     399          # The interpreter shouldn't crash when ref is collected.
     400          del ref
     401          support.gc_collect()
     402  
     403      def test_del_isolation_level_segfault(self):
     404          with self.assertRaises(AttributeError):
     405              del self.con.isolation_level
     406  
     407      def test_bpo37347(self):
     408          class ESC[4;38;5;81mPrinter:
     409              def log(self, *args):
     410                  return sqlite.SQLITE_OK
     411  
     412          for method in [self.con.set_trace_callback,
     413                         functools.partial(self.con.set_progress_handler, n=1),
     414                         self.con.set_authorizer]:
     415              printer_instance = Printer()
     416              method(printer_instance.log)
     417              method(printer_instance.log)
     418              self.con.execute("select 1")  # trigger seg fault
     419              method(None)
     420  
     421      def test_return_empty_bytestring(self):
     422          cur = self.con.execute("select X''")
     423          val = cur.fetchone()[0]
     424          self.assertEqual(val, b'')
     425  
     426      def test_table_lock_cursor_replace_stmt(self):
     427          with memory_database() as con:
     428              cur = con.cursor()
     429              cur.execute("create table t(t)")
     430              cur.executemany("insert into t values(?)",
     431                              ((v,) for v in range(5)))
     432              con.commit()
     433              cur.execute("select t from t")
     434              cur.execute("drop table t")
     435              con.commit()
     436  
     437      def test_table_lock_cursor_dealloc(self):
     438          with memory_database() as con:
     439              con.execute("create table t(t)")
     440              con.executemany("insert into t values(?)",
     441                              ((v,) for v in range(5)))
     442              con.commit()
     443              cur = con.execute("select t from t")
     444              del cur
     445              con.execute("drop table t")
     446              con.commit()
     447  
     448      def test_table_lock_cursor_non_readonly_select(self):
     449          with memory_database() as con:
     450              con.execute("create table t(t)")
     451              con.executemany("insert into t values(?)",
     452                              ((v,) for v in range(5)))
     453              con.commit()
     454              def dup(v):
     455                  con.execute("insert into t values(?)", (v,))
     456                  return
     457              con.create_function("dup", 1, dup)
     458              cur = con.execute("select dup(t) from t")
     459              del cur
     460              con.execute("drop table t")
     461              con.commit()
     462  
     463      def test_executescript_step_through_select(self):
     464          with memory_database() as con:
     465              values = [(v,) for v in range(5)]
     466              with con:
     467                  con.execute("create table t(t)")
     468                  con.executemany("insert into t values(?)", values)
     469              steps = []
     470              con.create_function("step", 1, lambda x: steps.append((x,)))
     471              con.executescript("select step(t) from t")
     472              self.assertEqual(steps, values)
     473  
     474  
     475  class ESC[4;38;5;81mRecursiveUseOfCursors(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     476      # GH-80254: sqlite3 should not segfault for recursive use of cursors.
     477      msg = "Recursive use of cursors not allowed"
     478  
     479      def setUp(self):
     480          self.con = sqlite.connect(":memory:",
     481                                    detect_types=sqlite.PARSE_COLNAMES)
     482          self.cur = self.con.cursor()
     483          self.cur.execute("create table test(x foo)")
     484          self.cur.executemany("insert into test(x) values (?)",
     485                               [("foo",), ("bar",)])
     486  
     487      def tearDown(self):
     488          self.cur.close()
     489          self.con.close()
     490  
     491      def test_recursive_cursor_init(self):
     492          conv = lambda x: self.cur.__init__(self.con)
     493          with patch.dict(sqlite.converters, {"INIT": conv}):
     494              self.cur.execute('select x as "x [INIT]", x from test')
     495              self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
     496                                     self.cur.fetchall)
     497  
     498      def test_recursive_cursor_close(self):
     499          conv = lambda x: self.cur.close()
     500          with patch.dict(sqlite.converters, {"CLOSE": conv}):
     501              self.cur.execute('select x as "x [CLOSE]", x from test')
     502              self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
     503                                     self.cur.fetchall)
     504  
     505      def test_recursive_cursor_iter(self):
     506          conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
     507          with patch.dict(sqlite.converters, {"ITER": conv}):
     508              self.cur.execute('select x as "x [ITER]", x from test')
     509              self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
     510                                     self.cur.fetchall)
     511  
     512  
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()