python (3.12.0)
1 # pysqlite2/test/dbapi.py: tests for DB-API compliance
2 #
3 # Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
4 #
5 # This file is part of pysqlite.
6 #
7 # This software is provided 'as-is', without any express or implied
8 # warranty. In no event will the authors be held liable for any damages
9 # arising from the use of this software.
10 #
11 # Permission is granted to anyone to use this software for any purpose,
12 # including commercial applications, and to alter it and redistribute it
13 # freely, subject to the following restrictions:
14 #
15 # 1. The origin of this software must not be misrepresented; you must not
16 # claim that you wrote the original software. If you use this software
17 # in a product, an acknowledgment in the product documentation would be
18 # appreciated but is not required.
19 # 2. Altered source versions must be plainly marked as such, and must not be
20 # misrepresented as being the original software.
21 # 3. This notice may not be removed or altered from any source distribution.
22
23 import contextlib
24 import os
25 import sqlite3 as sqlite
26 import subprocess
27 import sys
28 import threading
29 import unittest
30 import urllib.parse
31
32 from test.support import (
33 SHORT_TIMEOUT, check_disallow_instantiation, requires_subprocess,
34 is_emscripten, is_wasi
35 )
36 from test.support import threading_helper
37 from _testcapi import INT_MAX, ULLONG_MAX
38 from os import SEEK_SET, SEEK_CUR, SEEK_END
39 from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath
40
41
42 # Helper for temporary memory databases
def memory_database(*args, **kwargs):
    """Return an in-memory SQLite connection wrapped in ``contextlib.closing``.

    Extra positional and keyword arguments are forwarded to
    ``sqlite.connect`` after the ":memory:" database name.  The connection
    is opened right away, so connect-time errors are raised by this call
    itself rather than when the ``with`` block is entered.
    """
    return contextlib.closing(sqlite.connect(":memory:", *args, **kwargs))
46
47
48 # Temporarily limit a database connection parameter
49 @contextlib.contextmanager
50 def cx_limit(cx, category=sqlite.SQLITE_LIMIT_SQL_LENGTH, limit=128):
51 try:
52 _prev = cx.setlimit(category, limit)
53 yield limit
54 finally:
55 cx.setlimit(category, _prev)
56
57
class ModuleTests(unittest.TestCase):
    """Module-level checks: DB-API globals, exception hierarchy, constants."""

    def test_api_level(self):
        self.assertEqual(sqlite.apilevel, "2.0",
                         "apilevel is %s, should be 2.0" % sqlite.apilevel)

    def test_deprecated_version(self):
        # Both the package and its dbapi2 submodule must warn, and the
        # warning must be attributed to this file (the caller).
        msg = "deprecated and will be removed in Python 3.14"
        for attr in "version", "version_info":
            with self.subTest(attr=attr):
                with self.assertWarnsRegex(DeprecationWarning, msg) as cm:
                    getattr(sqlite, attr)
                self.assertEqual(cm.filename, __file__)
                with self.assertWarnsRegex(DeprecationWarning, msg) as cm:
                    getattr(sqlite.dbapi2, attr)
                self.assertEqual(cm.filename, __file__)

    def test_thread_safety(self):
        self.assertIn(sqlite.threadsafety, {0, 1, 3},
                      "threadsafety is %d, should be 0, 1 or 3" %
                      sqlite.threadsafety)

    def test_param_style(self):
        self.assertEqual(sqlite.paramstyle, "qmark",
                         "paramstyle is '%s', should be 'qmark'" %
                         sqlite.paramstyle)

    def test_warning(self):
        self.assertTrue(issubclass(sqlite.Warning, Exception),
                        "Warning is not a subclass of Exception")

    def test_error(self):
        self.assertTrue(issubclass(sqlite.Error, Exception),
                        "Error is not a subclass of Exception")

    def test_interface_error(self):
        self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
                        "InterfaceError is not a subclass of Error")

    def test_database_error(self):
        self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
                        "DatabaseError is not a subclass of Error")

    def test_data_error(self):
        self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
                        "DataError is not a subclass of DatabaseError")

    def test_operational_error(self):
        self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
                        "OperationalError is not a subclass of DatabaseError")

    def test_integrity_error(self):
        self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
                        "IntegrityError is not a subclass of DatabaseError")

    def test_internal_error(self):
        self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
                        "InternalError is not a subclass of DatabaseError")

    def test_programming_error(self):
        self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
                        "ProgrammingError is not a subclass of DatabaseError")

    def test_not_supported_error(self):
        self.assertTrue(issubclass(sqlite.NotSupportedError,
                                   sqlite.DatabaseError),
                        "NotSupportedError is not a subclass of DatabaseError")

    def test_module_constants(self):
        """Every expected constant is exposed as a module attribute.

        The list is extended according to ``sqlite.sqlite_version_info``,
        because some constants are only expected for newer SQLite versions.
        """
        consts = [
            "SQLITE_ABORT",
            "SQLITE_ALTER_TABLE",
            "SQLITE_ANALYZE",
            "SQLITE_ATTACH",
            "SQLITE_AUTH",
            "SQLITE_BUSY",
            "SQLITE_CANTOPEN",
            "SQLITE_CONSTRAINT",
            "SQLITE_CORRUPT",
            "SQLITE_CREATE_INDEX",
            "SQLITE_CREATE_TABLE",
            "SQLITE_CREATE_TEMP_INDEX",
            "SQLITE_CREATE_TEMP_TABLE",
            "SQLITE_CREATE_TEMP_TRIGGER",
            "SQLITE_CREATE_TEMP_VIEW",
            "SQLITE_CREATE_TRIGGER",
            "SQLITE_CREATE_VIEW",
            "SQLITE_CREATE_VTABLE",
            "SQLITE_DELETE",
            "SQLITE_DENY",
            "SQLITE_DETACH",
            "SQLITE_DONE",
            "SQLITE_DROP_INDEX",
            "SQLITE_DROP_TABLE",
            "SQLITE_DROP_TEMP_INDEX",
            "SQLITE_DROP_TEMP_TABLE",
            "SQLITE_DROP_TEMP_TRIGGER",
            "SQLITE_DROP_TEMP_VIEW",
            "SQLITE_DROP_TRIGGER",
            "SQLITE_DROP_VIEW",
            "SQLITE_DROP_VTABLE",
            "SQLITE_EMPTY",
            "SQLITE_ERROR",
            "SQLITE_FORMAT",
            "SQLITE_FULL",
            "SQLITE_FUNCTION",
            "SQLITE_IGNORE",
            "SQLITE_INSERT",
            "SQLITE_INTERNAL",
            "SQLITE_INTERRUPT",
            "SQLITE_IOERR",
            "SQLITE_LOCKED",
            "SQLITE_MISMATCH",
            "SQLITE_MISUSE",
            "SQLITE_NOLFS",
            "SQLITE_NOMEM",
            "SQLITE_NOTADB",
            "SQLITE_NOTFOUND",
            "SQLITE_OK",
            "SQLITE_PERM",
            "SQLITE_PRAGMA",
            "SQLITE_PROTOCOL",
            "SQLITE_RANGE",
            "SQLITE_READ",
            "SQLITE_READONLY",
            "SQLITE_REINDEX",
            "SQLITE_ROW",
            "SQLITE_SAVEPOINT",
            "SQLITE_SCHEMA",
            "SQLITE_SELECT",
            "SQLITE_TOOBIG",
            "SQLITE_TRANSACTION",
            "SQLITE_UPDATE",
            # Run-time limit categories
            "SQLITE_LIMIT_LENGTH",
            "SQLITE_LIMIT_SQL_LENGTH",
            "SQLITE_LIMIT_COLUMN",
            "SQLITE_LIMIT_EXPR_DEPTH",
            "SQLITE_LIMIT_COMPOUND_SELECT",
            "SQLITE_LIMIT_VDBE_OP",
            "SQLITE_LIMIT_FUNCTION_ARG",
            "SQLITE_LIMIT_ATTACHED",
            "SQLITE_LIMIT_LIKE_PATTERN_LENGTH",
            "SQLITE_LIMIT_VARIABLE_NUMBER",
            "SQLITE_LIMIT_TRIGGER_DEPTH",
        ]
        if sqlite.sqlite_version_info >= (3, 7, 17):
            consts += ["SQLITE_NOTICE", "SQLITE_WARNING"]
        if sqlite.sqlite_version_info >= (3, 8, 3):
            consts.append("SQLITE_RECURSIVE")
        if sqlite.sqlite_version_info >= (3, 8, 7):
            consts.append("SQLITE_LIMIT_WORKER_THREADS")
        consts += ["PARSE_DECLTYPES", "PARSE_COLNAMES"]
        # Extended result codes
        consts += [
            "SQLITE_ABORT_ROLLBACK",
            "SQLITE_BUSY_RECOVERY",
            "SQLITE_CANTOPEN_FULLPATH",
            "SQLITE_CANTOPEN_ISDIR",
            "SQLITE_CANTOPEN_NOTEMPDIR",
            "SQLITE_CORRUPT_VTAB",
            "SQLITE_IOERR_ACCESS",
            "SQLITE_IOERR_BLOCKED",
            "SQLITE_IOERR_CHECKRESERVEDLOCK",
            "SQLITE_IOERR_CLOSE",
            "SQLITE_IOERR_DELETE",
            "SQLITE_IOERR_DELETE_NOENT",
            "SQLITE_IOERR_DIR_CLOSE",
            "SQLITE_IOERR_DIR_FSYNC",
            "SQLITE_IOERR_FSTAT",
            "SQLITE_IOERR_FSYNC",
            "SQLITE_IOERR_LOCK",
            "SQLITE_IOERR_NOMEM",
            "SQLITE_IOERR_RDLOCK",
            "SQLITE_IOERR_READ",
            "SQLITE_IOERR_SEEK",
            "SQLITE_IOERR_SHMLOCK",
            "SQLITE_IOERR_SHMMAP",
            "SQLITE_IOERR_SHMOPEN",
            "SQLITE_IOERR_SHMSIZE",
            "SQLITE_IOERR_SHORT_READ",
            "SQLITE_IOERR_TRUNCATE",
            "SQLITE_IOERR_UNLOCK",
            "SQLITE_IOERR_WRITE",
            "SQLITE_LOCKED_SHAREDCACHE",
            "SQLITE_READONLY_CANTLOCK",
            "SQLITE_READONLY_RECOVERY",
        ]
        if sqlite.sqlite_version_info >= (3, 7, 16):
            consts += [
                "SQLITE_CONSTRAINT_CHECK",
                "SQLITE_CONSTRAINT_COMMITHOOK",
                "SQLITE_CONSTRAINT_FOREIGNKEY",
                "SQLITE_CONSTRAINT_FUNCTION",
                "SQLITE_CONSTRAINT_NOTNULL",
                "SQLITE_CONSTRAINT_PRIMARYKEY",
                "SQLITE_CONSTRAINT_TRIGGER",
                "SQLITE_CONSTRAINT_UNIQUE",
                "SQLITE_CONSTRAINT_VTAB",
                "SQLITE_READONLY_ROLLBACK",
            ]
        if sqlite.sqlite_version_info >= (3, 7, 17):
            consts += [
                "SQLITE_IOERR_MMAP",
                "SQLITE_NOTICE_RECOVER_ROLLBACK",
                "SQLITE_NOTICE_RECOVER_WAL",
            ]
        if sqlite.sqlite_version_info >= (3, 8, 0):
            consts += [
                "SQLITE_BUSY_SNAPSHOT",
                "SQLITE_IOERR_GETTEMPPATH",
                "SQLITE_WARNING_AUTOINDEX",
            ]
        if sqlite.sqlite_version_info >= (3, 8, 1):
            consts += ["SQLITE_CANTOPEN_CONVPATH", "SQLITE_IOERR_CONVPATH"]
        if sqlite.sqlite_version_info >= (3, 8, 2):
            consts.append("SQLITE_CONSTRAINT_ROWID")
        if sqlite.sqlite_version_info >= (3, 8, 3):
            consts.append("SQLITE_READONLY_DBMOVED")
        if sqlite.sqlite_version_info >= (3, 8, 7):
            consts.append("SQLITE_AUTH_USER")
        if sqlite.sqlite_version_info >= (3, 9, 0):
            consts.append("SQLITE_IOERR_VNODE")
        if sqlite.sqlite_version_info >= (3, 10, 0):
            consts.append("SQLITE_IOERR_AUTH")
        if sqlite.sqlite_version_info >= (3, 14, 1):
            consts.append("SQLITE_OK_LOAD_PERMANENTLY")
        if sqlite.sqlite_version_info >= (3, 21, 0):
            consts += [
                "SQLITE_IOERR_BEGIN_ATOMIC",
                "SQLITE_IOERR_COMMIT_ATOMIC",
                "SQLITE_IOERR_ROLLBACK_ATOMIC",
            ]
        if sqlite.sqlite_version_info >= (3, 22, 0):
            consts += [
                "SQLITE_ERROR_MISSING_COLLSEQ",
                "SQLITE_ERROR_RETRY",
                "SQLITE_READONLY_CANTINIT",
                "SQLITE_READONLY_DIRECTORY",
            ]
        if sqlite.sqlite_version_info >= (3, 24, 0):
            consts += ["SQLITE_CORRUPT_SEQUENCE", "SQLITE_LOCKED_VTAB"]
        if sqlite.sqlite_version_info >= (3, 25, 0):
            consts += ["SQLITE_CANTOPEN_DIRTYWAL", "SQLITE_ERROR_SNAPSHOT"]
        if sqlite.sqlite_version_info >= (3, 31, 0):
            consts += [
                "SQLITE_CANTOPEN_SYMLINK",
                "SQLITE_CONSTRAINT_PINNED",
                "SQLITE_OK_SYMLINK",
            ]
        if sqlite.sqlite_version_info >= (3, 32, 0):
            consts += [
                "SQLITE_BUSY_TIMEOUT",
                "SQLITE_CORRUPT_INDEX",
                "SQLITE_IOERR_DATA",
            ]
        if sqlite.sqlite_version_info >= (3, 34, 0):
            consts.append("SQLITE_IOERR_CORRUPTFS")
        for const in consts:
            with self.subTest(const=const):
                self.assertTrue(hasattr(sqlite, const))

    def test_error_code_on_exception(self):
        """Opening a directory as a database reports a CANTOPEN error code."""
        err_msg = "unable to open database file"
        # Windows is expected to report the extended ISDIR variant.
        if sys.platform.startswith("win"):
            err_code = sqlite.SQLITE_CANTOPEN_ISDIR
        else:
            err_code = sqlite.SQLITE_CANTOPEN

        with temp_dir() as db:
            with self.assertRaisesRegex(sqlite.Error, err_msg) as cm:
                sqlite.connect(db)
            e = cm.exception
            self.assertEqual(e.sqlite_errorcode, err_code)
            self.assertTrue(e.sqlite_errorname.startswith("SQLITE_CANTOPEN"))

    # "<" (not "<="): 3.7.16 itself is supported, as the skip message says and
    # as test_module_constants expects for SQLITE_CONSTRAINT_CHECK.
    @unittest.skipIf(sqlite.sqlite_version_info < (3, 7, 16),
                     "Requires SQLite 3.7.16 or newer")
    def test_extended_error_code_on_exception(self):
        """A CHECK violation carries the extended SQLITE_CONSTRAINT_CHECK code."""
        with memory_database() as con:
            with con:
                con.execute("create table t(t integer check(t > 0))")
            errmsg = "constraint failed"
            with self.assertRaisesRegex(sqlite.IntegrityError, errmsg) as cm:
                con.execute("insert into t values(-1)")
            exc = cm.exception
            self.assertEqual(exc.sqlite_errorcode,
                             sqlite.SQLITE_CONSTRAINT_CHECK)
            self.assertEqual(exc.sqlite_errorname, "SQLITE_CONSTRAINT_CHECK")

    def test_disallow_instantiation(self):
        # Use the closing helper so the connection is not leaked.
        with memory_database() as cx:
            # Calling the connection returns a cursor; its type must not be
            # directly instantiable.
            check_disallow_instantiation(self, type(cx("select 1")))
            check_disallow_instantiation(self, sqlite.Blob)

    def test_complete_statement(self):
        self.assertFalse(sqlite.complete_statement("select t"))
        self.assertTrue(sqlite.complete_statement("create table t(t);"))
355
356
class ConnectionTests(unittest.TestCase):
    """Connection-object behaviour: transactions, limits, config, reinit."""

    def setUp(self):
        # Fresh in-memory database with one table and one (uncommitted) row.
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def test_commit(self):
        self.cx.commit()

    def test_commit_after_no_changes(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def test_rollback(self):
        self.cx.rollback()

    def test_rollback_after_no_changes(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def test_cursor(self):
        cu = self.cx.cursor()
        # Actually check the result instead of discarding it.
        self.assertIsInstance(cu, sqlite.Cursor)

    def test_failed_open(self):
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def test_close(self):
        self.cx.close()

    def test_use_after_close(self):
        """Every operation on a closed connection raises ProgrammingError."""
        sql = "select 1"
        cu = self.cx.cursor()
        res = cu.execute(sql)
        self.cx.close()
        self.assertRaises(sqlite.ProgrammingError, res.fetchall)
        self.assertRaises(sqlite.ProgrammingError, cu.execute, sql)
        self.assertRaises(sqlite.ProgrammingError, cu.executemany, sql, [])
        self.assertRaises(sqlite.ProgrammingError, cu.executescript, sql)
        self.assertRaises(sqlite.ProgrammingError, self.cx.execute, sql)
        self.assertRaises(sqlite.ProgrammingError,
                          self.cx.executemany, sql, [])
        self.assertRaises(sqlite.ProgrammingError, self.cx.executescript, sql)
        self.assertRaises(sqlite.ProgrammingError,
                          self.cx.create_function, "t", 1, lambda x: x)
        self.assertRaises(sqlite.ProgrammingError, self.cx.cursor)
        with self.assertRaises(sqlite.ProgrammingError):
            with self.cx:
                pass

    def test_exceptions(self):
        # Optional DB-API extension.
        self.assertEqual(self.cx.Warning, sqlite.Warning)
        self.assertEqual(self.cx.Error, sqlite.Error)
        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
        self.assertEqual(self.cx.DataError, sqlite.DataError)
        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)

    def test_in_transaction(self):
        # Can't use db from setUp because we want to test initial state.
        # The closing helper makes sure this extra connection is released.
        with memory_database() as cx:
            cu = cx.cursor()
            self.assertEqual(cx.in_transaction, False)
            cu.execute("create table transactiontest(id integer primary key, name text)")
            self.assertEqual(cx.in_transaction, False)
            cu.execute("insert into transactiontest(name) values (?)", ("foo",))
            self.assertEqual(cx.in_transaction, True)
            cu.execute("select name from transactiontest where name=?", ["foo"])
            cu.fetchone()
            self.assertEqual(cx.in_transaction, True)
            cx.commit()
            self.assertEqual(cx.in_transaction, False)
            cu.execute("select name from transactiontest where name=?", ["foo"])
            cu.fetchone()
            self.assertEqual(cx.in_transaction, False)

    def test_in_transaction_ro(self):
        with self.assertRaises(AttributeError):
            self.cx.in_transaction = True

    def test_connection_exceptions(self):
        """Exception classes are exposed as attributes of the connection."""
        exceptions = [
            "DataError",
            "DatabaseError",
            "Error",
            "IntegrityError",
            "InterfaceError",
            "InternalError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "Warning",
        ]
        for exc in exceptions:
            with self.subTest(exc=exc):
                self.assertTrue(hasattr(self.cx, exc))
                self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc))

    def test_interrupt_on_closed_db(self):
        cx = sqlite.connect(":memory:")
        cx.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cx.interrupt()

    def test_interrupt(self):
        self.assertIsNone(self.cx.interrupt())

    def test_drop_unused_refs(self):
        for n in range(500):
            cu = self.cx.execute(f"select {n}")
            self.assertEqual(cu.fetchone()[0], n)

    def test_connection_limits(self):
        """setlimit() returns the previous value and the new limit is enforced."""
        category = sqlite.SQLITE_LIMIT_SQL_LENGTH
        saved_limit = self.cx.getlimit(category)
        try:
            new_limit = 10
            prev_limit = self.cx.setlimit(category, new_limit)
            self.assertEqual(saved_limit, prev_limit)
            self.assertEqual(self.cx.getlimit(category), new_limit)
            msg = "query string is too large"
            self.assertRaisesRegex(sqlite.DataError, msg,
                                   self.cx.execute, "select 1 as '16'")
        finally:  # restore saved limit
            self.cx.setlimit(category, saved_limit)

    def test_connection_bad_limit_category(self):
        msg = "'category' is out of bounds"
        cat = 1111
        self.assertRaisesRegex(sqlite.ProgrammingError, msg,
                               self.cx.getlimit, cat)
        self.assertRaisesRegex(sqlite.ProgrammingError, msg,
                               self.cx.setlimit, cat, 0)

    def test_connection_init_bad_isolation_level(self):
        msg = (
            "isolation_level string must be '', 'DEFERRED', 'IMMEDIATE', or "
            "'EXCLUSIVE'"
        )
        levels = (
            "BOGUS",
            " ",
            "DEFERRE",
            "IMMEDIAT",
            "EXCLUSIV",
            "DEFERREDS",
            "IMMEDIATES",
            "EXCLUSIVES",
        )
        for level in levels:
            with self.subTest(level=level):
                # The helper connects eagerly, so the error is raised by the
                # call itself and no connection object is created.
                with self.assertRaisesRegex(ValueError, msg):
                    memory_database(isolation_level=level)
                with memory_database() as cx:
                    with self.assertRaisesRegex(ValueError, msg):
                        cx.isolation_level = level
                    # Check that the default level is not changed
                    self.assertEqual(cx.isolation_level, "")

    def test_connection_init_good_isolation_levels(self):
        for level in ("", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", None):
            with self.subTest(level=level):
                with memory_database(isolation_level=level) as cx:
                    self.assertEqual(cx.isolation_level, level)
                with memory_database() as cx:
                    self.assertEqual(cx.isolation_level, "")
                    cx.isolation_level = level
                    self.assertEqual(cx.isolation_level, level)

    def test_connection_reinit(self):
        """Re-running __init__ on a live connection keeps old cursors usable."""
        db = ":memory:"
        # The closing helper releases the connection at the end of the test.
        with memory_database() as cx:
            cx.text_factory = bytes
            cx.row_factory = sqlite.Row
            cu = cx.cursor()
            cu.execute("create table foo (bar)")
            cu.executemany("insert into foo (bar) values (?)",
                           ((str(v),) for v in range(4)))
            cu.execute("select bar from foo")

            rows = [r for r in cu.fetchmany(2)]
            self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
            self.assertEqual([r[0] for r in rows], [b"0", b"1"])

            cx.__init__(db)
            cx.execute("create table foo (bar)")
            cx.executemany("insert into foo (bar) values (?)",
                           ((v,) for v in ("a", "b", "c", "d")))

            # This uses the old database, old row factory, but new text factory
            rows = [r for r in cu.fetchall()]
            self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
            self.assertEqual([r[0] for r in rows], ["2", "3"])

    def test_connection_bad_reinit(self):
        # NOTE(review): cx is deliberately not closed here; after the failed
        # re-init, operations on it are expected to raise ProgrammingError
        # (see the second assertion) -- confirm whether close() behaves the
        # same before adding cleanup.
        cx = sqlite.connect(":memory:")
        with cx:
            cx.execute("create table t(t)")
        with temp_dir() as db:
            self.assertRaisesRegex(sqlite.OperationalError,
                                   "unable to open database file",
                                   cx.__init__, db)
            self.assertRaisesRegex(sqlite.ProgrammingError,
                                   "Base Connection.__init__ not called",
                                   cx.executemany, "insert into t values(?)",
                                   ((v,) for v in range(3)))

    def test_connection_config(self):
        """getconfig()/setconfig() toggle foreign-key enforcement."""
        op = sqlite.SQLITE_DBCONFIG_ENABLE_FKEY
        with memory_database() as cx:
            with self.assertRaisesRegex(ValueError, "unknown"):
                cx.getconfig(-1)

            # Toggle and verify.
            old = cx.getconfig(op)
            new = not old
            cx.setconfig(op, new)
            self.assertEqual(cx.getconfig(op), new)

            cx.setconfig(op)  # defaults to True
            self.assertTrue(cx.getconfig(op))

            # Check that foreign key support was actually enabled.
            with cx:
                cx.executescript("""
                    create table t(t integer primary key);
                    create table u(u, foreign key(u) references t(t));
                """)
            with self.assertRaisesRegex(sqlite.IntegrityError, "constraint"):
                cx.execute("insert into u values(0)")
603
604
class UninitialisedConnectionTests(unittest.TestCase):
    """Operations on a Connection whose ``__init__`` was never run."""

    def setUp(self):
        # __new__ allocates the object without running __init__.
        self.cx = sqlite.Connection.__new__(sqlite.Connection)

    def test_uninit_operations(self):
        expected = "Base Connection.__init__ not called"
        operations = (
            lambda: self.cx.isolation_level,
            lambda: self.cx.total_changes,
            lambda: self.cx.in_transaction,
            lambda: self.cx.iterdump(),
            lambda: self.cx.cursor(),
            lambda: self.cx.close(),
        )
        for func in operations:
            with self.subTest(func=func):
                with self.assertRaisesRegex(sqlite.ProgrammingError, expected):
                    func()
623
624
@unittest.skipUnless(hasattr(sqlite.Connection, "serialize"),
                     "Needs SQLite serialize API")
class SerializeTests(unittest.TestCase):
    """Round-trip and error handling of Connection.serialize/deserialize."""

    def test_serialize_deserialize(self):
        with memory_database() as cx:
            with cx:
                cx.execute("create table t(t)")
            snapshot = cx.serialize()

            # Remove test table, verify that it was removed.
            with cx:
                cx.execute("drop table t")
            with self.assertRaisesRegex(sqlite.OperationalError,
                                        "no such table"):
                cx.execute("select t from t")

            # Deserialize and verify that test table is restored.
            cx.deserialize(snapshot)
            cx.execute("select t from t")

    def test_deserialize_wrong_args(self):
        cases = (
            (BufferError, memoryview(b"blob")[::2]),
            (TypeError, []),
            (TypeError, 1),
            (TypeError, None),
        )
        for exc, arg in cases:
            with self.subTest(exc=exc, arg=arg), memory_database() as cx:
                with self.assertRaises(exc):
                    cx.deserialize(arg)

    def test_deserialize_corrupt_database(self):
        with memory_database() as cx:
            with self.assertRaisesRegex(sqlite.DatabaseError,
                                        "file is not a database"):
                cx.deserialize(b"\0\1\3")
                # SQLite does not generate an error until you try to query the
                # deserialized database.
                cx.execute("create table fail(f)")
664 cx.execute("create table fail(f)")
665
666
class OpenTests(unittest.TestCase):
    """Opening on-disk databases through paths, path-like objects and URIs."""

    _sql = "create table test(id integer)"

    def _assert_connect_creates(self, database, path, **connect_kwargs):
        # Shared body of the "open creates the file" tests: *path* must not
        # exist beforehand, must exist once connected, and the connection
        # must accept a DDL statement.
        self.assertFalse(os.path.exists(path))
        with contextlib.closing(sqlite.connect(database, **connect_kwargs)) as cx:
            self.assertTrue(os.path.exists(path))
            cx.execute(self._sql)

    def test_open_with_path_like_object(self):
        """ Checks that we can successfully connect to a database using an object that
            is PathLike, i.e. has __fspath__(). """
        target = FakePath(TESTFN)
        self.addCleanup(unlink, target)
        self._assert_connect_creates(target, target)

    @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
    @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
    @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
    @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
    def test_open_with_undecodable_path(self):
        target = TESTFN_UNDECODABLE
        self.addCleanup(unlink, target)
        self._assert_connect_creates(target, target)

    def test_open_uri(self):
        target = TESTFN
        self.addCleanup(unlink, target)
        quoted = urllib.parse.quote(os.fsencode(target))
        self._assert_connect_creates("file:" + quoted, target, uri=True)

    def test_open_unquoted_uri(self):
        target = TESTFN
        self.addCleanup(unlink, target)
        self._assert_connect_creates("file:" + target, target, uri=True)

    def test_open_uri_readonly(self):
        target = TESTFN
        self.addCleanup(unlink, target)
        ro_uri = "file:" + urllib.parse.quote(os.fsencode(target)) + "?mode=ro"
        self.assertFalse(os.path.exists(target))
        # Cannot create new DB
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(ro_uri, uri=True)
        self.assertFalse(os.path.exists(target))
        # Create the file through a normal read-write connection.
        sqlite.connect(target).close()
        self.assertTrue(os.path.exists(target))
        # Cannot modify new DB
        with contextlib.closing(sqlite.connect(ro_uri, uri=True)) as cx:
            with self.assertRaises(sqlite.OperationalError):
                cx.execute(self._sql)

    @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
    @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
    @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
    @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
    def test_open_undecodable_uri(self):
        target = TESTFN_UNDECODABLE
        self.addCleanup(unlink, target)
        self._assert_connect_creates("file:" + urllib.parse.quote(target),
                                     target, uri=True)

    def test_factory_database_arg(self):
        # The factory must receive the database argument exactly as given.
        seen = None

        def factory(database, *args, **kwargs):
            nonlocal seen
            seen = database
            return sqlite.Connection(":memory:", *args, **kwargs)

        candidates = (
            TESTFN,
            os.fsencode(TESTFN),
            FakePath(TESTFN),
            FakePath(os.fsencode(TESTFN)),
        )
        for database in candidates:
            seen = None
            sqlite.connect(database, factory=factory).close()
            self.assertEqual(seen, database)

    def test_database_keyword(self):
        with contextlib.closing(sqlite.connect(database=":memory:")) as cx:
            self.assertEqual(type(cx), sqlite.Connection)
754
755
756 class ESC[4;38;5;81mCursorTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
def setUp(self):
    """Create an in-memory database with a ``test`` table holding one row."""
    self.cx = sqlite.connect(":memory:")
    self.cu = self.cx.cursor()
    schema = (
        "create table test(id integer primary key, name text, "
        "income number, unique_test text unique)"
    )
    self.cu.execute(schema)
    self.cu.execute("insert into test(name) values (?)", ("foo",))
765
def tearDown(self):
    """Close the cursor first, then the connection."""
    for resource in (self.cu, self.cx):
        resource.close()
769
770 def test_execute_no_args(self):
771 self.cu.execute("delete from test")
772
773 def test_execute_illegal_sql(self):
774 with self.assertRaises(sqlite.OperationalError):
775 self.cu.execute("select asdf")
776
777 def test_execute_multiple_statements(self):
778 msg = "You can only execute one statement at a time"
779 dataset = (
780 "select 1; select 2",
781 "select 1; // c++ comments are not allowed",
782 "select 1; *not a comment",
783 "select 1; -*not a comment",
784 "select 1; /* */ a",
785 "select 1; /**/a",
786 "select 1; -",
787 "select 1; /",
788 "select 1; -\n- select 2",
789 """select 1;
790 -- comment
791 select 2
792 """,
793 )
794 for query in dataset:
795 with self.subTest(query=query):
796 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
797 self.cu.execute(query)
798
799 def test_execute_with_appended_comments(self):
800 dataset = (
801 "select 1; -- foo bar",
802 "select 1; --",
803 "select 1; /*", # Unclosed comments ending in \0 are skipped.
804 """
805 select 5+4;
806
807 /*
808 foo
809 */
810 """,
811 )
812 for query in dataset:
813 with self.subTest(query=query):
814 self.cu.execute(query)
815
816 def test_execute_wrong_sql_arg(self):
817 with self.assertRaises(TypeError):
818 self.cu.execute(42)
819
820 def test_execute_arg_int(self):
821 self.cu.execute("insert into test(id) values (?)", (42,))
822
823 def test_execute_arg_float(self):
824 self.cu.execute("insert into test(income) values (?)", (2500.32,))
825
826 def test_execute_arg_string(self):
827 self.cu.execute("insert into test(name) values (?)", ("Hugo",))
828
829 def test_execute_arg_string_with_zero_byte(self):
830 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
831
832 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
833 row = self.cu.fetchone()
834 self.assertEqual(row[0], "Hu\x00go")
835
836 def test_execute_non_iterable(self):
837 with self.assertRaises(sqlite.ProgrammingError) as cm:
838 self.cu.execute("insert into test(id) values (?)", 42)
839 self.assertEqual(str(cm.exception), 'parameters are of unsupported type')
840
841 def test_execute_wrong_no_of_args1(self):
842 # too many parameters
843 with self.assertRaises(sqlite.ProgrammingError):
844 self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
845
846 def test_execute_wrong_no_of_args2(self):
847 # too little parameters
848 with self.assertRaises(sqlite.ProgrammingError):
849 self.cu.execute("insert into test(id) values (?)")
850
851 def test_execute_wrong_no_of_args3(self):
852 # no parameters, parameters are needed
853 with self.assertRaises(sqlite.ProgrammingError):
854 self.cu.execute("insert into test(id) values (?)")
855
856 def test_execute_param_list(self):
857 self.cu.execute("insert into test(name) values ('foo')")
858 self.cu.execute("select name from test where name=?", ["foo"])
859 row = self.cu.fetchone()
860 self.assertEqual(row[0], "foo")
861
862 def test_execute_param_sequence(self):
863 class ESC[4;38;5;81mL:
864 def __len__(self):
865 return 1
866 def __getitem__(self, x):
867 assert x == 0
868 return "foo"
869
870 self.cu.execute("insert into test(name) values ('foo')")
871 self.cu.execute("select name from test where name=?", L())
872 row = self.cu.fetchone()
873 self.assertEqual(row[0], "foo")
874
875 def test_execute_param_sequence_bad_len(self):
876 # Issue41662: Error in __len__() was overridden with ProgrammingError.
877 class ESC[4;38;5;81mL:
878 def __len__(self):
879 1/0
880 def __getitem__(slf, x):
881 raise AssertionError
882
883 self.cu.execute("insert into test(name) values ('foo')")
884 with self.assertRaises(ZeroDivisionError):
885 self.cu.execute("select name from test where name=?", L())
886
887 def test_execute_named_param_and_sequence(self):
888 dataset = (
889 ("select :a", (1,)),
890 ("select :a, ?, ?", (1, 2, 3)),
891 ("select ?, :b, ?", (1, 2, 3)),
892 ("select ?, ?, :c", (1, 2, 3)),
893 ("select :a, :b, ?", (1, 2, 3)),
894 )
895 msg = "Binding.*is a named parameter"
896 for query, params in dataset:
897 with self.subTest(query=query, params=params):
898 with self.assertWarnsRegex(DeprecationWarning, msg) as cm:
899 self.cu.execute(query, params)
900 self.assertEqual(cm.filename, __file__)
901
def test_execute_too_many_params(self):
    """Exceeding SQLITE_LIMIT_VARIABLE_NUMBER raises OperationalError."""
    expected = "too many SQL variables"
    with cx_limit(self.cx, category=sqlite.SQLITE_LIMIT_VARIABLE_NUMBER,
                  limit=1):
        # One variable is within the limit...
        self.cu.execute("select * from test where id=?", (1,))
        # ...two are not.
        self.assertRaisesRegex(sqlite.OperationalError, expected,
                               self.cu.execute,
                               "select * from test where id!=? and id!=?",
                               (1, 2))
910
911 def test_execute_dict_mapping(self):
912 self.cu.execute("insert into test(name) values ('foo')")
913 self.cu.execute("select name from test where name=:name", {"name": "foo"})
914 row = self.cu.fetchone()
915 self.assertEqual(row[0], "foo")
916
917 def test_execute_dict_mapping_mapping(self):
918 class ESC[4;38;5;81mD(ESC[4;38;5;149mdict):
919 def __missing__(self, key):
920 return "foo"
921
922 self.cu.execute("insert into test(name) values ('foo')")
923 self.cu.execute("select name from test where name=:name", D())
924 row = self.cu.fetchone()
925 self.assertEqual(row[0], "foo")
926
927 def test_execute_dict_mapping_too_little_args(self):
928 self.cu.execute("insert into test(name) values ('foo')")
929 with self.assertRaises(sqlite.ProgrammingError):
930 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
931
932 def test_execute_dict_mapping_no_args(self):
933 self.cu.execute("insert into test(name) values ('foo')")
934 with self.assertRaises(sqlite.ProgrammingError):
935 self.cu.execute("select name from test where name=:name")
936
937 def test_execute_dict_mapping_unnamed(self):
938 self.cu.execute("insert into test(name) values ('foo')")
939 with self.assertRaises(sqlite.ProgrammingError):
940 self.cu.execute("select name from test where name=?", {"name": "foo"})
941
942 def test_close(self):
943 self.cu.close()
944
945 def test_rowcount_execute(self):
946 self.cu.execute("delete from test")
947 self.cu.execute("insert into test(name) values ('foo')")
948 self.cu.execute("insert into test(name) values ('foo')")
949 self.cu.execute("update test set name='bar'")
950 self.assertEqual(self.cu.rowcount, 2)
951
952 def test_rowcount_select(self):
953 """
954 pysqlite does not know the rowcount of SELECT statements, because we
955 don't fetch all rows after executing the select statement. The rowcount
956 has thus to be -1.
957 """
958 self.cu.execute("select 5 union select 6")
959 self.assertEqual(self.cu.rowcount, -1)
960
961 def test_rowcount_executemany(self):
962 self.cu.execute("delete from test")
963 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
964 self.assertEqual(self.cu.rowcount, 3)
965
966 @unittest.skipIf(sqlite.sqlite_version_info < (3, 35, 0),
967 "Requires SQLite 3.35.0 or newer")
968 def test_rowcount_update_returning(self):
969 # gh-93421: rowcount is updated correctly for UPDATE...RETURNING queries
970 self.cu.execute("update test set name='bar' where name='foo' returning 1")
971 self.assertEqual(self.cu.fetchone()[0], 1)
972 self.assertEqual(self.cu.rowcount, 1)
973
974 def test_rowcount_prefixed_with_comment(self):
975 # gh-79579: rowcount is updated even if query is prefixed with comments
976 self.cu.execute("""
977 -- foo
978 insert into test(name) values ('foo'), ('foo')
979 """)
980 self.assertEqual(self.cu.rowcount, 2)
981 self.cu.execute("""
982 /* -- messy *r /* /* ** *- *--
983 */
984 /* one more */ insert into test(name) values ('messy')
985 """)
986 self.assertEqual(self.cu.rowcount, 1)
987 self.cu.execute("/* bar */ update test set name='bar' where name='foo'")
988 self.assertEqual(self.cu.rowcount, 3)
989
990 def test_rowcount_vaccuum(self):
991 data = ((1,), (2,), (3,))
992 self.cu.executemany("insert into test(income) values(?)", data)
993 self.assertEqual(self.cu.rowcount, 3)
994 self.cx.commit()
995 self.cu.execute("vacuum")
996 self.assertEqual(self.cu.rowcount, -1)
997
998 def test_total_changes(self):
999 self.cu.execute("insert into test(name) values ('foo')")
1000 self.cu.execute("insert into test(name) values ('foo')")
1001 self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')
1002
# Checks for executemany:
# Sequences are required by the DB-API; iterators are an
# enhancement provided by pysqlite.
1006
1007 def test_execute_many_sequence(self):
1008 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
1009
1010 def test_execute_many_iterator(self):
1011 class ESC[4;38;5;81mMyIter:
1012 def __init__(self):
1013 self.value = 5
1014
1015 def __iter__(self):
1016 return self
1017
1018 def __next__(self):
1019 if self.value == 10:
1020 raise StopIteration
1021 else:
1022 self.value += 1
1023 return (self.value,)
1024
1025 self.cu.executemany("insert into test(income) values (?)", MyIter())
1026
1027 def test_execute_many_generator(self):
1028 def mygen():
1029 for i in range(5):
1030 yield (i,)
1031
1032 self.cu.executemany("insert into test(income) values (?)", mygen())
1033
1034 def test_execute_many_wrong_sql_arg(self):
1035 with self.assertRaises(TypeError):
1036 self.cu.executemany(42, [(3,)])
1037
1038 def test_execute_many_select(self):
1039 with self.assertRaises(sqlite.ProgrammingError):
1040 self.cu.executemany("select ?", [(3,)])
1041
1042 def test_execute_many_not_iterable(self):
1043 with self.assertRaises(TypeError):
1044 self.cu.executemany("insert into test(income) values (?)", 42)
1045
1046 def test_fetch_iter(self):
1047 # Optional DB-API extension.
1048 self.cu.execute("delete from test")
1049 self.cu.execute("insert into test(id) values (?)", (5,))
1050 self.cu.execute("insert into test(id) values (?)", (6,))
1051 self.cu.execute("select id from test order by id")
1052 lst = []
1053 for row in self.cu:
1054 lst.append(row[0])
1055 self.assertEqual(lst[0], 5)
1056 self.assertEqual(lst[1], 6)
1057
1058 def test_fetchone(self):
1059 self.cu.execute("select name from test")
1060 row = self.cu.fetchone()
1061 self.assertEqual(row[0], "foo")
1062 row = self.cu.fetchone()
1063 self.assertEqual(row, None)
1064
1065 def test_fetchone_no_statement(self):
1066 cur = self.cx.cursor()
1067 row = cur.fetchone()
1068 self.assertEqual(row, None)
1069
1070 def test_array_size(self):
1071 # must default to 1
1072 self.assertEqual(self.cu.arraysize, 1)
1073
1074 # now set to 2
1075 self.cu.arraysize = 2
1076
1077 # now make the query return 3 rows
1078 self.cu.execute("delete from test")
1079 self.cu.execute("insert into test(name) values ('A')")
1080 self.cu.execute("insert into test(name) values ('B')")
1081 self.cu.execute("insert into test(name) values ('C')")
1082 self.cu.execute("select name from test")
1083 res = self.cu.fetchmany()
1084
1085 self.assertEqual(len(res), 2)
1086
1087 def test_fetchmany(self):
1088 self.cu.execute("select name from test")
1089 res = self.cu.fetchmany(100)
1090 self.assertEqual(len(res), 1)
1091 res = self.cu.fetchmany(100)
1092 self.assertEqual(res, [])
1093
1094 def test_fetchmany_kw_arg(self):
1095 """Checks if fetchmany works with keyword arguments"""
1096 self.cu.execute("select name from test")
1097 res = self.cu.fetchmany(size=100)
1098 self.assertEqual(len(res), 1)
1099
1100 def test_fetchall(self):
1101 self.cu.execute("select name from test")
1102 res = self.cu.fetchall()
1103 self.assertEqual(len(res), 1)
1104 res = self.cu.fetchall()
1105 self.assertEqual(res, [])
1106
1107 def test_setinputsizes(self):
1108 self.cu.setinputsizes([3, 4, 5])
1109
1110 def test_setoutputsize(self):
1111 self.cu.setoutputsize(5, 0)
1112
1113 def test_setoutputsize_no_column(self):
1114 self.cu.setoutputsize(42)
1115
1116 def test_cursor_connection(self):
1117 # Optional DB-API extension.
1118 self.assertEqual(self.cu.connection, self.cx)
1119
1120 def test_wrong_cursor_callable(self):
1121 with self.assertRaises(TypeError):
1122 def f(): pass
1123 cur = self.cx.cursor(f)
1124
1125 def test_cursor_wrong_class(self):
1126 class ESC[4;38;5;81mFoo: pass
1127 foo = Foo()
1128 with self.assertRaises(TypeError):
1129 cur = sqlite.Cursor(foo)
1130
1131 def test_last_row_id_on_replace(self):
1132 """
1133 INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
1134 """
1135 sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
1136 for statement in ('INSERT OR REPLACE', 'REPLACE'):
1137 with self.subTest(statement=statement):
1138 self.cu.execute(sql.format(statement), (1, 'foo'))
1139 self.assertEqual(self.cu.lastrowid, 1)
1140
1141 def test_last_row_id_on_ignore(self):
1142 self.cu.execute(
1143 "insert or ignore into test(unique_test) values (?)",
1144 ('test',))
1145 self.assertEqual(self.cu.lastrowid, 2)
1146 self.cu.execute(
1147 "insert or ignore into test(unique_test) values (?)",
1148 ('test',))
1149 self.assertEqual(self.cu.lastrowid, 2)
1150
1151 def test_last_row_id_insert_o_r(self):
1152 results = []
1153 for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
1154 sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
1155 with self.subTest(statement='INSERT OR {}'.format(statement)):
1156 self.cu.execute(sql.format(statement), (statement,))
1157 results.append((statement, self.cu.lastrowid))
1158 with self.assertRaises(sqlite.IntegrityError):
1159 self.cu.execute(sql.format(statement), (statement,))
1160 results.append((statement, self.cu.lastrowid))
1161 expected = [
1162 ('FAIL', 2), ('FAIL', 2),
1163 ('ABORT', 3), ('ABORT', 3),
1164 ('ROLLBACK', 4), ('ROLLBACK', 4),
1165 ]
1166 self.assertEqual(results, expected)
1167
1168 def test_column_count(self):
1169 # Check that column count is updated correctly for cached statements
1170 select = "select * from test"
1171 res = self.cu.execute(select)
1172 old_count = len(res.description)
1173 # Add a new column and execute the cached select query again
1174 self.cu.execute("alter table test add newcol")
1175 res = self.cu.execute(select)
1176 new_count = len(res.description)
1177 self.assertEqual(new_count - old_count, 1)
1178
1179 def test_same_query_in_multiple_cursors(self):
1180 cursors = [self.cx.execute("select 1") for _ in range(3)]
1181 for cu in cursors:
1182 self.assertEqual(cu.fetchall(), [(1,)])
1183
1184
class BlobTests(unittest.TestCase):
    # Tests for sqlite3.Blob handles obtained from Connection.blobopen().
    # Every test starts from a one-row table whose blob holds the 50 bytes
    # in self.data, plus an open handle on that blob in self.blob.

    def setUp(self):
        self.data = b"this blob data string is exactly fifty bytes long!"
        self.cx = sqlite.connect(":memory:")
        self.cx.execute("create table test(b blob)")
        self.cx.execute("insert into test(b) values (?)", (self.data,))
        self.blob = self.cx.blobopen("test", "b", 1)

    def tearDown(self):
        # Close the handle first, then the connection it was opened on.
        self.blob.close()
        self.cx.close()

    def _stored(self):
        # Current database content of the blob column, read through SQL.
        return self.cx.execute("select b from test").fetchone()[0]

    def test_blob_is_a_blob(self):
        self.assertIsInstance(self.blob, sqlite.Blob)

    def test_blob_seek_and_tell(self):
        # Each step is (seek arguments, position expected afterwards); the
        # steps build on each other.
        steps = (
            ((10,), 10),
            ((10, SEEK_SET), 10),
            ((10, SEEK_CUR), 20),
            ((-10, SEEK_END), 40),
        )
        for args, position in steps:
            self.blob.seek(*args)
            self.assertEqual(self.blob.tell(), position)

    def test_blob_seek_error(self):
        msg_oor = "offset out of blob range"
        msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END"
        msg_of = "seek offset results in overflow"

        dataset = (
            (ValueError, msg_oor, lambda: self.blob.seek(1000)),
            (ValueError, msg_oor, lambda: self.blob.seek(-10)),
            (ValueError, msg_orig, lambda: self.blob.seek(10, -1)),
            (ValueError, msg_orig, lambda: self.blob.seek(10, 3)),
        )
        for exc, msg, fn in dataset:
            with self.subTest(exc=exc, msg=msg, fn=fn):
                with self.assertRaisesRegex(exc, msg):
                    fn()

        # Force overflow errors
        self.blob.seek(1, SEEK_SET)
        for origin in (SEEK_CUR, SEEK_END):
            with self.assertRaisesRegex(OverflowError, msg_of):
                self.blob.seek(INT_MAX, origin)

    def test_blob_read(self):
        self.assertEqual(self.blob.read(), self.data)

    def test_blob_read_oversized(self):
        # Requesting more than the blob holds returns the whole blob.
        oversized = len(self.data) * 2
        self.assertEqual(self.blob.read(oversized), self.data)

    def test_blob_read_advance_offset(self):
        count = 10
        chunk = self.blob.read(count)
        self.assertEqual(chunk, self.data[:count])
        self.assertEqual(self.blob.tell(), count)

    def test_blob_read_at_offset(self):
        self.blob.seek(10)
        chunk = self.blob.read(10)
        self.assertEqual(chunk, self.data[10:20])

    def test_blob_read_error_row_changed(self):
        # Updating the row behind the handle makes reads fail.
        self.cx.execute("update test set b='aaaa' where rowid=1")
        with self.assertRaises(sqlite.OperationalError):
            self.blob.read()

    def test_blob_write(self):
        replacement = b"new data".ljust(50)
        self.blob.write(replacement)
        self.assertEqual(self._stored(), replacement)

    def test_blob_write_at_offset(self):
        tail = b"c" * 25
        self.blob.seek(25)
        self.blob.write(tail)
        self.assertEqual(self._stored(), self.data[:25] + tail)

    def test_blob_write_advance_offset(self):
        self.blob.write(b"d"*10)
        self.assertEqual(self.blob.tell(), 10)

    def test_blob_write_error_length(self):
        too_long = "data longer than blob"
        with self.assertRaisesRegex(ValueError, too_long):
            self.blob.write(b"a" * 1000)

        # Fill the blob exactly to its end; one more byte must not fit.
        self.blob.seek(0, SEEK_SET)
        size = len(self.blob)
        self.blob.write(b"a" * (size-1))
        self.blob.write(b"a")
        with self.assertRaisesRegex(ValueError, too_long):
            self.blob.write(b"a")

    def test_blob_write_error_row_changed(self):
        # Updating the row behind the handle makes writes fail.
        self.cx.execute("update test set b='aaaa' where rowid=1")
        with self.assertRaises(sqlite.OperationalError):
            self.blob.write(b"aaa")

    def test_blob_write_error_readonly(self):
        readonly_blob = self.cx.blobopen("test", "b", 1, readonly=True)
        with self.assertRaisesRegex(sqlite.OperationalError, "readonly"):
            readonly_blob.write(b"aaa")
        readonly_blob.close()

    def test_blob_open_error(self):
        # Unknown database name, table, column and row, in that order.
        dataset = (
            (("test", "b", 1), {"name": "notexisting"}),
            (("notexisting", "b", 1), {}),
            (("test", "notexisting", 1), {}),
            (("test", "b", 2), {}),
        )
        pattern = "no such"
        for args, kwds in dataset:
            with self.subTest(args=args, kwds=kwds):
                with self.assertRaisesRegex(sqlite.OperationalError, pattern):
                    self.cx.blobopen(*args, **kwds)

    def test_blob_length(self):
        self.assertEqual(len(self.blob), 50)

    def test_blob_get_item(self):
        # Indexing yields the byte value as an int.
        expectations = ((5, "b"), (6, "l"), (7, "o"), (8, "b"), (-1, "!"))
        for index, char in expectations:
            self.assertEqual(self.blob[index], ord(char))

    def test_blob_set_item(self):
        self.blob[0] = ord("b")
        self.assertEqual(self._stored(), b"b" + self.data[1:])

    def test_blob_set_item_with_offset(self):
        self.blob.seek(0, SEEK_END)
        self.assertEqual(self.blob.read(), b"")  # verify that we're at EOB
        # Item assignment addresses absolute positions even though the
        # read/write offset sits at the end of the blob.
        self.blob[0] = ord("T")
        self.blob[-1] = ord(".")
        self.blob.seek(0, SEEK_SET)
        expected = b"This blob data string is exactly fifty bytes long."
        self.assertEqual(self.blob.read(), expected)

    def test_blob_set_slice_buffer_object(self):
        from array import array
        # (buffer object to assign, bytes expected to be read back)
        cases = (
            (memoryview(b"12345"), b"12345"),
            (bytearray(b"23456"), b"23456"),
            (array("b", [1, 2, 3, 4, 5]), b"\x01\x02\x03\x04\x05"),
        )
        for buffer, expected in cases:
            self.blob[0:5] = buffer
            self.assertEqual(self.blob[0:5], expected)

    def test_blob_set_item_negative_index(self):
        self.blob[-1] = 255
        self.assertEqual(self.blob[-1], 255)

    def test_blob_get_slice(self):
        self.assertEqual(self.blob[5:14], b"blob data")

    def test_blob_get_empty_slice(self):
        self.assertEqual(self.blob[5:5], b"")

    def test_blob_get_slice_negative_index(self):
        self.assertEqual(self.blob[5:-5], self.data[5:-5])

    def test_blob_get_slice_with_skip(self):
        self.assertEqual(self.blob[0:10:2], b"ti lb")

    def test_blob_set_slice(self):
        self.blob[0:5] = b"12345"
        self.assertEqual(self._stored(), b"12345" + self.data[5:])

    def test_blob_set_empty_slice(self):
        self.blob[0:0] = b""
        self.assertEqual(self.blob[:], self.data)

    def test_blob_set_slice_with_skip(self):
        self.blob[0:10:2] = b"12345"
        actual = self._stored()
        expected = b"1h2s3b4o5 " + self.data[10:]
        self.assertEqual(actual, expected)

    def test_blob_mapping_invalid_index_type(self):
        pattern = "indices must be integers"
        with self.assertRaisesRegex(TypeError, pattern):
            self.blob[5:5.5]
        with self.assertRaisesRegex(TypeError, pattern):
            self.blob[1.5]
        with self.assertRaisesRegex(TypeError, pattern):
            self.blob["a"] = b"b"

    def test_blob_get_item_error(self):
        for idx in (len(self.blob), 105, -105):
            with self.subTest(idx=idx):
                with self.assertRaisesRegex(IndexError, "index out of range"):
                    self.blob[idx]
        with self.assertRaisesRegex(IndexError, "cannot fit 'int'"):
            self.blob[ULLONG_MAX]

        # Provoke read error
        self.cx.execute("update test set b='aaaa' where rowid=1")
        with self.assertRaises(sqlite.OperationalError):
            self.blob[0]

    def test_blob_set_item_error(self):
        # Only ints are accepted for single-item assignment.
        for not_an_int in (b"multiple", b"1", bytearray(b"1")):
            with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
                self.blob[0] = not_an_int
        with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
            del self.blob[0]
        with self.assertRaisesRegex(IndexError, "Blob index out of range"):
            self.blob[1000] = 0
        # Overflow errors are overridden with ValueError
        for out_of_range in (-1, 256, 2**65):
            with self.assertRaisesRegex(ValueError, "must be in range"):
                self.blob[0] = out_of_range

    def test_blob_set_slice_error(self):
        # The replacement must have exactly the length of the slice.
        for wrong_size in (b"a", b"a" * 1000):
            with self.assertRaisesRegex(IndexError, "wrong size"):
                self.blob[5:10] = wrong_size
        with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
            del self.blob[5:10]
        with self.assertRaisesRegex(ValueError, "step cannot be zero"):
            self.blob[5:10:0] = b"12345"
        with self.assertRaises(BufferError):
            self.blob[5:10] = memoryview(b"abcde")[::2]

    def test_blob_sequence_not_supported(self):
        unsupported = "unsupported operand"
        with self.assertRaisesRegex(TypeError, unsupported):
            self.blob + self.blob
        with self.assertRaisesRegex(TypeError, unsupported):
            self.blob * 5
        with self.assertRaisesRegex(TypeError, "is not iterable"):
            b"a" in self.blob

    def test_blob_context_manager(self):
        payload = b"a" * 50
        with self.cx.blobopen("test", "b", 1) as blob:
            blob.write(payload)
        self.assertEqual(self._stored(), payload)

        # Check that __exit__ closed the blob
        with self.assertRaisesRegex(sqlite.ProgrammingError, "closed blob"):
            blob.read()

    def test_blob_context_manager_reraise_exceptions(self):
        class DummyException(Exception):
            pass

        # An exception raised inside the with-block must propagate.
        with self.assertRaisesRegex(DummyException, "reraised"):
            with self.cx.blobopen("test", "b", 1) as blob:
                raise DummyException("reraised")

    def test_blob_closed(self):
        with memory_database() as cx:
            cx.execute("create table test(b blob)")
            cx.execute("insert into test values (zeroblob(100))")
            blob = cx.blobopen("test", "b", 1)
            blob.close()

            msg = "Cannot operate on a closed blob"

            def rejected():
                # Every operation on the closed handle must fail this way.
                return self.assertRaisesRegex(sqlite.ProgrammingError, msg)

            with rejected():
                blob.read()
            with rejected():
                blob.write(b"")
            with rejected():
                blob.seek(0)
            with rejected():
                blob.tell()
            with rejected():
                blob.__enter__()
            with rejected():
                blob.__exit__(None, None, None)
            with rejected():
                len(blob)
            with rejected():
                blob[0]
            with rejected():
                blob[0:1]
            with rejected():
                blob[0] = b""

    def test_blob_closed_db_read(self):
        with memory_database() as cx:
            cx.execute("create table test(b blob)")
            cx.execute("insert into test(b) values (zeroblob(100))")
            blob = cx.blobopen("test", "b", 1)
            cx.close()
            # The handle is unusable once its connection has been closed.
            with self.assertRaisesRegex(sqlite.ProgrammingError,
                                        "Cannot operate on a closed database"):
                blob.read()

    def test_blob_32bit_rowid(self):
        # gh-100370: we should not get an OverflowError for 32-bit rowids
        with memory_database() as cx:
            rowid = 2**32
            cx.execute("create table t(t blob)")
            cx.execute("insert into t(rowid, t) values (?, zeroblob(1))",
                       (rowid,))
            cx.blobopen('t', 't', rowid)
1505
1506
1507 @threading_helper.requires_working_threading()
1508 class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1509 def setUp(self):
1510 self.con = sqlite.connect(":memory:")
1511 self.cur = self.con.cursor()
1512 self.cur.execute("create table test(name text, b blob)")
1513 self.cur.execute("insert into test values('blob', zeroblob(1))")
1514
1515 def tearDown(self):
1516 self.cur.close()
1517 self.con.close()
1518
1519 @threading_helper.reap_threads
1520 def _run_test(self, fn, *args, **kwds):
1521 def run(err):
1522 try:
1523 fn(*args, **kwds)
1524 err.append("did not raise ProgrammingError")
1525 except sqlite.ProgrammingError:
1526 pass
1527 except:
1528 err.append("raised wrong exception")
1529
1530 err = []
1531 t = threading.Thread(target=run, kwargs={"err": err})
1532 t.start()
1533 t.join()
1534 if err:
1535 self.fail("\n".join(err))
1536
1537 def test_check_connection_thread(self):
1538 fns = [
1539 lambda: self.con.cursor(),
1540 lambda: self.con.commit(),
1541 lambda: self.con.rollback(),
1542 lambda: self.con.close(),
1543 lambda: self.con.set_trace_callback(None),
1544 lambda: self.con.set_authorizer(None),
1545 lambda: self.con.create_collation("foo", None),
1546 lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1),
1547 lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH),
1548 lambda: self.con.blobopen("test", "b", 1),
1549 ]
1550 if hasattr(sqlite.Connection, "serialize"):
1551 fns.append(lambda: self.con.serialize())
1552 fns.append(lambda: self.con.deserialize(b""))
1553 if sqlite.sqlite_version_info >= (3, 25, 0):
1554 fns.append(lambda: self.con.create_window_function("foo", 0, None))
1555
1556 for fn in fns:
1557 with self.subTest(fn=fn):
1558 self._run_test(fn)
1559
1560 def test_check_cursor_thread(self):
1561 fns = [
1562 lambda: self.cur.execute("insert into test(name) values('a')"),
1563 lambda: self.cur.close(),
1564 lambda: self.cur.execute("select name from test"),
1565 lambda: self.cur.fetchone(),
1566 ]
1567 for fn in fns:
1568 with self.subTest(fn=fn):
1569 self._run_test(fn)
1570
1571
1572 @threading_helper.reap_threads
1573 def test_dont_check_same_thread(self):
1574 def run(con, err):
1575 try:
1576 con.execute("select 1")
1577 except sqlite.Error:
1578 err.append("multi-threading not allowed")
1579
1580 con = sqlite.connect(":memory:", check_same_thread=False)
1581 err = []
1582 t = threading.Thread(target=run, kwargs={"con": con, "err": err})
1583 t.start()
1584 t.join()
1585 self.assertEqual(len(err), 0, "\n".join(err))
1586
1587
class ConstructorTests(unittest.TestCase):
    # Smoke tests for the DB-API type constructors: each one only has to be
    # callable with representative arguments; the result is not inspected.

    def test_date(self):
        sqlite.Date(2004, 10, 28)

    def test_time(self):
        sqlite.Time(12, 39, 35)

    def test_timestamp(self):
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def test_date_from_ticks(self):
        sqlite.DateFromTicks(42)

    def test_time_from_ticks(self):
        sqlite.TimeFromTicks(42)

    def test_timestamp_from_ticks(self):
        sqlite.TimestampFromTicks(42)

    def test_binary(self):
        sqlite.Binary(b"\0'")
1609
class ExtensionTests(unittest.TestCase):
    # Tests for sqlite3 additions to the DB-API: executescript() and the
    # execute(), executemany() and executescript() shortcuts on Connection.
    #
    # Every connection is opened through memory_database() so that it is
    # closed when the test ends, including when an assertion fails.

    def test_script_string_sql(self):
        # A script containing comments and several statements runs fully.
        with memory_database() as con:
            cur = con.cursor()
            cur.executescript("""
                -- bla bla
                /* a stupid comment */
                create table a(i);
                insert into a(i) values (5);
                """)
            cur.execute("select i from a")
            res = cur.fetchone()[0]
            self.assertEqual(res, 5)

    def test_script_syntax_error(self):
        # The second statement of the script is not valid SQL.
        with memory_database() as con:
            cur = con.cursor()
            with self.assertRaises(sqlite.OperationalError):
                cur.executescript("create table test(x); asdf; create table test2(x)")

    def test_script_error_normal(self):
        # The script is syntactically valid but selects from a missing table.
        with memory_database() as con:
            cur = con.cursor()
            with self.assertRaises(sqlite.OperationalError):
                cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def test_cursor_executescript_as_bytes(self):
        # executescript() only accepts str; bytes raise TypeError.
        with memory_database() as con:
            cur = con.cursor()
            with self.assertRaises(TypeError):
                cur.executescript(b"create table test(foo); insert into test(foo) values (5);")

    def test_cursor_executescript_with_null_characters(self):
        # An embedded NUL character in the script raises ValueError.
        with memory_database() as con:
            cur = con.cursor()
            with self.assertRaises(ValueError):
                cur.executescript("""
                    create table a(i);\0
                    insert into a(i) values (5);
                    """)

    def test_cursor_executescript_with_surrogates(self):
        # A lone surrogate in the script raises UnicodeEncodeError.
        with memory_database() as con:
            cur = con.cursor()
            with self.assertRaises(UnicodeEncodeError):
                cur.executescript("""
                    create table a(s);
                    insert into a(s) values ('\ud8ff');
                    """)

    def test_cursor_executescript_too_large_script(self):
        # A script padded to exactly the SQL length limit is accepted; one
        # character more raises DataError.
        msg = "query string is too large"
        with memory_database() as cx, cx_limit(cx) as lim:
            cx.executescript("select 'almost too large'".ljust(lim))
            with self.assertRaisesRegex(sqlite.DataError, msg):
                cx.executescript("select 'too large'".ljust(lim+1))

    def test_cursor_executescript_tx_control(self):
        # executescript() leaves no transaction open, even if one was
        # started explicitly beforehand.
        with memory_database() as con:
            con.execute("begin")
            self.assertTrue(con.in_transaction)
            con.executescript("select 1")
            self.assertFalse(con.in_transaction)

    def test_connection_execute(self):
        with memory_database() as con:
            result = con.execute("select 5").fetchone()[0]
            self.assertEqual(result, 5, "Basic test of Connection.execute")

    def test_connection_executemany(self):
        with memory_database() as con:
            con.execute("create table test(foo)")
            con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
            result = con.execute("select foo from test order by foo").fetchall()
            self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
            self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def test_connection_executescript(self):
        with memory_database() as con:
            con.executescript("create table test(foo); insert into test(foo) values (5);")
            result = con.execute("select foo from test").fetchone()[0]
            self.assertEqual(result, 5, "Basic test of Connection.executescript")
1692
class ClosedConTests(unittest.TestCase):
    # Every operation on a closed connection must raise ProgrammingError.

    @staticmethod
    def _closed_connection():
        # Return an in-memory connection that has already been closed.
        con = sqlite.connect(":memory:")
        con.close()
        return con

    def test_closed_con_cursor(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def test_closed_con_commit(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.commit()

    def test_closed_con_rollback(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.rollback()

    def test_closed_cur_execute(self):
        # The cursor has to exist before its connection is closed.
        con = sqlite.connect(":memory:")
        cur = con.cursor()
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4")

    def test_closed_create_function(self):
        con = self._closed_connection()

        def f(x):
            return 17

        with self.assertRaises(sqlite.ProgrammingError):
            con.create_function("foo", 1, f)

    def test_closed_create_aggregate(self):
        con = self._closed_connection()

        class Agg:
            def __init__(self):
                pass

            def step(self, x):
                pass

            def finalize(self):
                return 17

        with self.assertRaises(sqlite.ProgrammingError):
            con.create_aggregate("foo", 1, Agg)

    def test_closed_set_authorizer(self):
        con = self._closed_connection()

        def authorizer(*args):
            return sqlite.DENY

        with self.assertRaises(sqlite.ProgrammingError):
            con.set_authorizer(authorizer)

    def test_closed_set_progress_callback(self):
        con = self._closed_connection()

        def progress():
            pass

        with self.assertRaises(sqlite.ProgrammingError):
            con.set_progress_handler(progress, 100)

    def test_closed_call(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con()
1759
class ClosedCurTests(unittest.TestCase):
    # Every execute*/fetch* method of a closed cursor must raise
    # ProgrammingError.

    def test_closed(self):
        con = sqlite.connect(":memory:")
        # Close the connection when the test ends, even on failure; it was
        # previously left open.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.close()

        # Method name -> positional arguments to call it with.
        script = ("select 4 union select 5",)
        calls = {
            "execute": script,
            "executemany": ("insert into foo(bar) values (?)", [(3,), (4,)]),
            "executescript": script,
            "fetchall": (),
            "fetchmany": (),
            "fetchone": (),
        }
        for method_name, params in calls.items():
            # One subtest per method so a failure does not hide the others.
            with self.subTest(method_name=method_name):
                method = getattr(cur, method_name)
                with self.assertRaises(sqlite.ProgrammingError):
                    method(*params)
1777
1778
class SqliteOnConflictTests(unittest.TestCase):
    """
    Tests for SQLite's "insert on conflict" feature.

    See https://www.sqlite.org/lang_conflict.html for details.
    """

    def setUp(self):
        # A fresh in-memory table with a UNIQUE column to provoke conflicts.
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("""
          CREATE TABLE test(
            id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
          );
        """)

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def test_on_conflict_rollback_with_explicit_transaction(self):
        conflicting = "INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')"
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute(conflicting)
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute(conflicting)
        # Use connection to commit.
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name from test")
        # Transaction should have rolled back and nothing should be in table.
        rows = self.cu.fetchall()
        self.assertEqual(rows, [])

    def test_on_conflict_abort_raises_with_explicit_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        conflicting = "INSERT OR ABORT INTO test(unique_name) VALUES ('foo')"
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute(conflicting)
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute(conflicting)
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name FROM test")
        # Expect the first two inserts to work, third to do nothing.
        rows = self.cu.fetchall()
        self.assertEqual(rows, [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_rollback_without_transaction(self):
        conflicting = "INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')"
        # Start of implicit transaction
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute(conflicting)
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute(conflicting)
        self.cu.execute("SELECT name, unique_name FROM test")
        # Implicit transaction is rolled back on error.
        rows = self.cu.fetchall()
        self.assertEqual(rows, [])

    def test_on_conflict_abort_raises_without_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        conflicting = "INSERT OR ABORT INTO test(unique_name) VALUES ('foo')"
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute(conflicting)
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute(conflicting)
        # Make sure all other values were inserted.
        self.cu.execute("SELECT name, unique_name FROM test")
        rows = self.cu.fetchall()
        self.assertEqual(rows, [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_fail(self):
        conflicting = "INSERT OR FAIL INTO test(unique_name) VALUES ('foo')"
        self.cu.execute(conflicting)
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute(conflicting)
        # No SELECT has been issued, so there is nothing to fetch.
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_ignore(self):
        ignoring = "INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')"
        self.cu.execute(ignoring)
        # Nothing should happen.
        self.cu.execute(ignoring)
        self.cu.execute("SELECT unique_name FROM test")
        rows = self.cu.fetchall()
        self.assertEqual(rows, [('foo',)])

    def test_on_conflict_replace(self):
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
        # There shouldn't be an IntegrityError exception.
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        rows = self.cu.fetchall()
        self.assertEqual(rows, [('Very different data!', 'foo')])
1870
1871
1872 @requires_subprocess()
1873 class ESC[4;38;5;81mMultiprocessTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1874 CONNECTION_TIMEOUT = 0 # Disable the busy timeout.
1875
1876 def tearDown(self):
1877 unlink(TESTFN)
1878
1879 def test_ctx_mgr_rollback_if_commit_failed(self):
1880 # bpo-27334: ctx manager does not rollback if commit fails
1881 SCRIPT = f"""if 1:
1882 import sqlite3
1883 def wait():
1884 print("started")
1885 assert "database is locked" in input()
1886
1887 cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT})
1888 cx.create_function("wait", 0, wait)
1889 with cx:
1890 cx.execute("create table t(t)")
1891 try:
1892 # execute two transactions; both will try to lock the db
1893 cx.executescript('''
1894 -- start a transaction and wait for parent
1895 begin transaction;
1896 select * from t;
1897 select wait();
1898 rollback;
1899
1900 -- start a new transaction; would fail if parent holds lock
1901 begin transaction;
1902 select * from t;
1903 rollback;
1904 ''')
1905 finally:
1906 cx.close()
1907 """
1908
1909 # spawn child process
1910 proc = subprocess.Popen(
1911 [sys.executable, "-c", SCRIPT],
1912 encoding="utf-8",
1913 bufsize=0,
1914 stdin=subprocess.PIPE,
1915 stdout=subprocess.PIPE,
1916 )
1917 self.addCleanup(proc.communicate)
1918
1919 # wait for child process to start
1920 self.assertEqual("started", proc.stdout.readline().strip())
1921
1922 cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT)
1923 try: # context manager should correctly release the db lock
1924 with cx:
1925 cx.execute("insert into t values('test')")
1926 except sqlite.OperationalError as exc:
1927 proc.stdin.write(str(exc))
1928 else:
1929 proc.stdin.write("no error")
1930 finally:
1931 cx.close()
1932
1933 # terminate child process
1934 self.assertIsNone(proc.returncode)
1935 try:
1936 proc.communicate(input="end", timeout=SHORT_TIMEOUT)
1937 except subprocess.TimeoutExpired:
1938 proc.kill()
1939 proc.communicate()
1940 raise
1941 self.assertEqual(proc.returncode, 0)
1942
1943
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()