1 # pysqlite2/test/regression.py: pysqlite regression tests
2 #
3 # Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
4 #
5 # This file is part of pysqlite.
6 #
7 # This software is provided 'as-is', without any express or implied
8 # warranty. In no event will the authors be held liable for any damages
9 # arising from the use of this software.
10 #
11 # Permission is granted to anyone to use this software for any purpose,
12 # including commercial applications, and to alter it and redistribute it
13 # freely, subject to the following restrictions:
14 #
15 # 1. The origin of this software must not be misrepresented; you must not
16 # claim that you wrote the original software. If you use this software
17 # in a product, an acknowledgment in the product documentation would be
18 # appreciated but is not required.
19 # 2. Altered source versions must be plainly marked as such, and must not be
20 # misrepresented as being the original software.
21 # 3. This notice may not be removed or altered from any source distribution.
22
23 import datetime
24 import unittest
25 import sqlite3 as sqlite
26 import weakref
27 import functools
28
29 from test import support
30 from unittest.mock import patch
31 from test.test_sqlite3.test_dbapi import memory_database, cx_limit
32
33
34 class ESC[4;38;5;81mRegressionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
35 def setUp(self):
36 self.con = sqlite.connect(":memory:")
37
38 def tearDown(self):
39 self.con.close()
40
41 def test_pragma_user_version(self):
42 # This used to crash pysqlite because this pragma command returns NULL for the column name
43 cur = self.con.cursor()
44 cur.execute("pragma user_version")
45
46 def test_pragma_schema_version(self):
47 # This still crashed pysqlite <= 2.2.1
48 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
49 try:
50 cur = self.con.cursor()
51 cur.execute("pragma schema_version")
52 finally:
53 cur.close()
54 con.close()
55
56 def test_statement_reset(self):
57 # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
58 # reset before a rollback, but only those that are still in the
59 # statement cache. The others are not accessible from the connection object.
60 con = sqlite.connect(":memory:", cached_statements=5)
61 cursors = [con.cursor() for x in range(5)]
62 cursors[0].execute("create table test(x)")
63 for i in range(10):
64 cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
65
66 for i in range(5):
67 cursors[i].execute(" " * i + "select x from test")
68
69 con.rollback()
70
71 def test_column_name_with_spaces(self):
72 cur = self.con.cursor()
73 cur.execute('select 1 as "foo bar [datetime]"')
74 self.assertEqual(cur.description[0][0], "foo bar [datetime]")
75
76 cur.execute('select 1 as "foo baz"')
77 self.assertEqual(cur.description[0][0], "foo baz")
78
79 def test_statement_finalization_on_close_db(self):
80 # pysqlite versions <= 2.3.3 only finalized statements in the statement
81 # cache when closing the database. statements that were still
82 # referenced in cursors weren't closed and could provoke "
83 # "OperationalError: Unable to close due to unfinalised statements".
84 con = sqlite.connect(":memory:")
85 cursors = []
86 # default statement cache size is 100
87 for i in range(105):
88 cur = con.cursor()
89 cursors.append(cur)
90 cur.execute("select 1 x union select " + str(i))
91 con.close()
92
93 def test_on_conflict_rollback(self):
94 con = sqlite.connect(":memory:")
95 con.execute("create table foo(x, unique(x) on conflict rollback)")
96 con.execute("insert into foo(x) values (1)")
97 try:
98 con.execute("insert into foo(x) values (1)")
99 except sqlite.DatabaseError:
100 pass
101 con.execute("insert into foo(x) values (2)")
102 try:
103 con.commit()
104 except sqlite.OperationalError:
105 self.fail("pysqlite knew nothing about the implicit ROLLBACK")
106
107 def test_workaround_for_buggy_sqlite_transfer_bindings(self):
108 """
109 pysqlite would crash with older SQLite versions unless
110 a workaround is implemented.
111 """
112 self.con.execute("create table foo(bar)")
113 self.con.execute("drop table foo")
114 self.con.execute("create table foo(bar)")
115
116 def test_empty_statement(self):
117 """
118 pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
119 for "no-operation" statements
120 """
121 self.con.execute("")
122
123 def test_type_map_usage(self):
124 """
125 pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
126 a statement. This test exhibits the problem.
127 """
128 SELECT = "select * from foo"
129 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
130 cur = con.cursor()
131 cur.execute("create table foo(bar timestamp)")
132 cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
133 cur.execute(SELECT)
134 cur.execute("drop table foo")
135 cur.execute("create table foo(bar integer)")
136 cur.execute("insert into foo(bar) values (5)")
137 cur.execute(SELECT)
138
139 def test_bind_mutating_list(self):
140 # Issue41662: Crash when mutate a list of parameters during iteration.
141 class ESC[4;38;5;81mX:
142 def __conform__(self, protocol):
143 parameters.clear()
144 return "..."
145 parameters = [X(), 0]
146 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
147 con.execute("create table foo(bar X, baz integer)")
148 # Should not crash
149 with self.assertRaises(IndexError):
150 con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
151
152 def test_error_msg_decode_error(self):
153 # When porting the module to Python 3.0, the error message about
154 # decoding errors disappeared. This verifies they're back again.
155 with self.assertRaises(sqlite.OperationalError) as cm:
156 self.con.execute("select 'xxx' || ? || 'yyy' colname",
157 (bytes(bytearray([250])),)).fetchone()
158 msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
159 self.assertIn(msg, str(cm.exception))
160
161 def test_register_adapter(self):
162 """
163 See issue 3312.
164 """
165 self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
166
167 def test_set_isolation_level(self):
168 # See issue 27881.
169 class ESC[4;38;5;81mCustomStr(ESC[4;38;5;149mstr):
170 def upper(self):
171 return None
172 def __del__(self):
173 con.isolation_level = ""
174
175 con = sqlite.connect(":memory:")
176 con.isolation_level = None
177 for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
178 with self.subTest(level=level):
179 con.isolation_level = level
180 con.isolation_level = level.lower()
181 con.isolation_level = level.capitalize()
182 con.isolation_level = CustomStr(level)
183
184 # setting isolation_level failure should not alter previous state
185 con.isolation_level = None
186 con.isolation_level = "DEFERRED"
187 pairs = [
188 (1, TypeError), (b'', TypeError), ("abc", ValueError),
189 ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
190 ]
191 for value, exc in pairs:
192 with self.subTest(level=value):
193 with self.assertRaises(exc):
194 con.isolation_level = value
195 self.assertEqual(con.isolation_level, "DEFERRED")
196
197 def test_cursor_constructor_call_check(self):
198 """
199 Verifies that cursor methods check whether base class __init__ was
200 called.
201 """
202 class ESC[4;38;5;81mCursor(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mCursor):
203 def __init__(self, con):
204 pass
205
206 con = sqlite.connect(":memory:")
207 cur = Cursor(con)
208 with self.assertRaises(sqlite.ProgrammingError):
209 cur.execute("select 4+5").fetchall()
210 with self.assertRaisesRegex(sqlite.ProgrammingError,
211 r'^Base Cursor\.__init__ not called\.$'):
212 cur.close()
213
214 def test_str_subclass(self):
215 """
216 The Python 3.0 port of the module didn't cope with values of subclasses of str.
217 """
218 class ESC[4;38;5;81mMyStr(ESC[4;38;5;149mstr): pass
219 self.con.execute("select ?", (MyStr("abc"),))
220
221 def test_connection_constructor_call_check(self):
222 """
223 Verifies that connection methods check whether base class __init__ was
224 called.
225 """
226 class ESC[4;38;5;81mConnection(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mConnection):
227 def __init__(self, name):
228 pass
229
230 con = Connection(":memory:")
231 with self.assertRaises(sqlite.ProgrammingError):
232 cur = con.cursor()
233
234 def test_auto_commit(self):
235 """
236 Verifies that creating a connection in autocommit mode works.
237 2.5.3 introduced a regression so that these could no longer
238 be created.
239 """
240 con = sqlite.connect(":memory:", isolation_level=None)
241
242 def test_pragma_autocommit(self):
243 """
244 Verifies that running a PRAGMA statement that does an autocommit does
245 work. This did not work in 2.5.3/2.5.4.
246 """
247 cur = self.con.cursor()
248 cur.execute("create table foo(bar)")
249 cur.execute("insert into foo(bar) values (5)")
250
251 cur.execute("pragma page_size")
252 row = cur.fetchone()
253
254 def test_connection_call(self):
255 """
256 Call a connection with a non-string SQL request: check error handling
257 of the statement constructor.
258 """
259 self.assertRaises(TypeError, self.con, b"select 1")
260
261 def test_collation(self):
262 def collation_cb(a, b):
263 return 1
264 self.assertRaises(UnicodeEncodeError, self.con.create_collation,
265 # Lone surrogate cannot be encoded to the default encoding (utf8)
266 "\uDC80", collation_cb)
267
268 def test_recursive_cursor_use(self):
269 """
270 http://bugs.python.org/issue10811
271
272 Recursively using a cursor, such as when reusing it from a generator led to segfaults.
273 Now we catch recursive cursor usage and raise a ProgrammingError.
274 """
275 con = sqlite.connect(":memory:")
276
277 cur = con.cursor()
278 cur.execute("create table a (bar)")
279 cur.execute("create table b (baz)")
280
281 def foo():
282 cur.execute("insert into a (bar) values (?)", (1,))
283 yield 1
284
285 with self.assertRaises(sqlite.ProgrammingError):
286 cur.executemany("insert into b (baz) values (?)",
287 ((i,) for i in foo()))
288
289 def test_convert_timestamp_microsecond_padding(self):
290 """
291 http://bugs.python.org/issue14720
292
293 The microsecond parsing of convert_timestamp() should pad with zeros,
294 since the microsecond string "456" actually represents "456000".
295 """
296
297 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
298 cur = con.cursor()
299 cur.execute("CREATE TABLE t (x TIMESTAMP)")
300
301 # Microseconds should be 456000
302 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
303
304 # Microseconds should be truncated to 123456
305 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
306
307 cur.execute("SELECT * FROM t")
308 values = [x[0] for x in cur.fetchall()]
309
310 self.assertEqual(values, [
311 datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
312 datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
313 ])
314
315 def test_invalid_isolation_level_type(self):
316 # isolation level is a string, not an integer
317 self.assertRaises(TypeError,
318 sqlite.connect, ":memory:", isolation_level=123)
319
320
321 def test_null_character(self):
322 # Issue #21147
323 cur = self.con.cursor()
324 queries = ["\0select 1", "select 1\0"]
325 for query in queries:
326 with self.subTest(query=query):
327 self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
328 self.con.execute, query)
329 with self.subTest(query=query):
330 self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
331 cur.execute, query)
332
333 def test_surrogates(self):
334 con = sqlite.connect(":memory:")
335 self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
336 self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
337 cur = con.cursor()
338 self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
339 self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")
340
341 def test_large_sql(self):
342 msg = "query string is too large"
343 with memory_database() as cx, cx_limit(cx) as lim:
344 cu = cx.cursor()
345
346 cx("select 1".ljust(lim))
347 # use a different SQL statement; don't reuse from the LRU cache
348 cu.execute("select 2".ljust(lim))
349
350 sql = "select 3".ljust(lim+1)
351 self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
352 self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)
353
354 def test_commit_cursor_reset(self):
355 """
356 Connection.commit() did reset cursors, which made sqlite3
357 to return rows multiple times when fetched from cursors
358 after commit. See issues 10513 and 23129 for details.
359 """
360 con = sqlite.connect(":memory:")
361 con.executescript("""
362 create table t(c);
363 create table t2(c);
364 insert into t values(0);
365 insert into t values(1);
366 insert into t values(2);
367 """)
368
369 self.assertEqual(con.isolation_level, "")
370
371 counter = 0
372 for i, row in enumerate(con.execute("select c from t")):
373 with self.subTest(i=i, row=row):
374 con.execute("insert into t2(c) values (?)", (i,))
375 con.commit()
376 if counter == 0:
377 self.assertEqual(row[0], 0)
378 elif counter == 1:
379 self.assertEqual(row[0], 1)
380 elif counter == 2:
381 self.assertEqual(row[0], 2)
382 counter += 1
383 self.assertEqual(counter, 3, "should have returned exactly three rows")
384
385 def test_bpo31770(self):
386 """
387 The interpreter shouldn't crash in case Cursor.__init__() is called
388 more than once.
389 """
390 def callback(*args):
391 pass
392 con = sqlite.connect(":memory:")
393 cur = sqlite.Cursor(con)
394 ref = weakref.ref(cur, callback)
395 cur.__init__(con)
396 del cur
397 # The interpreter shouldn't crash when ref is collected.
398 del ref
399 support.gc_collect()
400
401 def test_del_isolation_level_segfault(self):
402 with self.assertRaises(AttributeError):
403 del self.con.isolation_level
404
405 def test_bpo37347(self):
406 class ESC[4;38;5;81mPrinter:
407 def log(self, *args):
408 return sqlite.SQLITE_OK
409
410 for method in [self.con.set_trace_callback,
411 functools.partial(self.con.set_progress_handler, n=1),
412 self.con.set_authorizer]:
413 printer_instance = Printer()
414 method(printer_instance.log)
415 method(printer_instance.log)
416 self.con.execute("select 1") # trigger seg fault
417 method(None)
418
419 def test_return_empty_bytestring(self):
420 cur = self.con.execute("select X''")
421 val = cur.fetchone()[0]
422 self.assertEqual(val, b'')
423
424 def test_table_lock_cursor_replace_stmt(self):
425 with memory_database() as con:
426 cur = con.cursor()
427 cur.execute("create table t(t)")
428 cur.executemany("insert into t values(?)",
429 ((v,) for v in range(5)))
430 con.commit()
431 cur.execute("select t from t")
432 cur.execute("drop table t")
433 con.commit()
434
435 def test_table_lock_cursor_dealloc(self):
436 with memory_database() as con:
437 con.execute("create table t(t)")
438 con.executemany("insert into t values(?)",
439 ((v,) for v in range(5)))
440 con.commit()
441 cur = con.execute("select t from t")
442 del cur
443 con.execute("drop table t")
444 con.commit()
445
446 def test_table_lock_cursor_non_readonly_select(self):
447 with memory_database() as con:
448 con.execute("create table t(t)")
449 con.executemany("insert into t values(?)",
450 ((v,) for v in range(5)))
451 con.commit()
452 def dup(v):
453 con.execute("insert into t values(?)", (v,))
454 return
455 con.create_function("dup", 1, dup)
456 cur = con.execute("select dup(t) from t")
457 del cur
458 con.execute("drop table t")
459 con.commit()
460
461 def test_executescript_step_through_select(self):
462 with memory_database() as con:
463 values = [(v,) for v in range(5)]
464 with con:
465 con.execute("create table t(t)")
466 con.executemany("insert into t values(?)", values)
467 steps = []
468 con.create_function("step", 1, lambda x: steps.append((x,)))
469 con.executescript("select step(t) from t")
470 self.assertEqual(steps, values)
471
472 def test_custom_cursor_object_crash_gh_99886(self):
473 # This test segfaults on GH-99886
474 class ESC[4;38;5;81mMyCursor(ESC[4;38;5;149msqliteESC[4;38;5;149m.ESC[4;38;5;149mCursor):
475 def __init__(self, *args, **kwargs):
476 super().__init__(*args, **kwargs)
477 # this can go before or after the super call; doesn't matter
478 self.some_attr = None
479
480 with memory_database() as con:
481 cur = con.cursor(MyCursor)
482 cur.close()
483 del cur
484
485 class ESC[4;38;5;81mRecursiveUseOfCursors(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
486 # GH-80254: sqlite3 should not segfault for recursive use of cursors.
487 msg = "Recursive use of cursors not allowed"
488
489 def setUp(self):
490 self.con = sqlite.connect(":memory:",
491 detect_types=sqlite.PARSE_COLNAMES)
492 self.cur = self.con.cursor()
493 self.cur.execute("create table test(x foo)")
494 self.cur.executemany("insert into test(x) values (?)",
495 [("foo",), ("bar",)])
496
497 def tearDown(self):
498 self.cur.close()
499 self.con.close()
500
501 def test_recursive_cursor_init(self):
502 conv = lambda x: self.cur.__init__(self.con)
503 with patch.dict(sqlite.converters, {"INIT": conv}):
504 self.cur.execute(f'select x as "x [INIT]", x from test')
505 self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
506 self.cur.fetchall)
507
508 def test_recursive_cursor_close(self):
509 conv = lambda x: self.cur.close()
510 with patch.dict(sqlite.converters, {"CLOSE": conv}):
511 self.cur.execute(f'select x as "x [CLOSE]", x from test')
512 self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
513 self.cur.fetchall)
514
515 def test_recursive_cursor_iter(self):
516 conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
517 with patch.dict(sqlite.converters, {"ITER": conv}):
518 self.cur.execute(f'select x as "x [ITER]", x from test')
519 self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
520 self.cur.fetchall)
521
522
523 if __name__ == "__main__":
524 unittest.main()