1 # pysqlite2/test/dbapi.py: tests for DB-API compliance
2 #
3 # Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
4 #
5 # This file is part of pysqlite.
6 #
7 # This software is provided 'as-is', without any express or implied
8 # warranty. In no event will the authors be held liable for any damages
9 # arising from the use of this software.
10 #
11 # Permission is granted to anyone to use this software for any purpose,
12 # including commercial applications, and to alter it and redistribute it
13 # freely, subject to the following restrictions:
14 #
15 # 1. The origin of this software must not be misrepresented; you must not
16 # claim that you wrote the original software. If you use this software
17 # in a product, an acknowledgment in the product documentation would be
18 # appreciated but is not required.
19 # 2. Altered source versions must be plainly marked as such, and must not be
20 # misrepresented as being the original software.
21 # 3. This notice may not be removed or altered from any source distribution.
22
23 import contextlib
24 import os
25 import sqlite3 as sqlite
26 import subprocess
27 import sys
28 import threading
29 import unittest
30 import urllib.parse
31
32 from test.support import (
33 SHORT_TIMEOUT, bigmemtest, check_disallow_instantiation, requires_subprocess,
34 is_emscripten, is_wasi
35 )
36 from test.support import threading_helper
37 from _testcapi import INT_MAX, ULLONG_MAX
38 from os import SEEK_SET, SEEK_CUR, SEEK_END
39 from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath
40
41
42 # Helper for temporary memory databases
def memory_database(*args, **kwargs):
    """Return an in-memory connection wrapped in ``contextlib.closing``.

    All positional and keyword arguments are forwarded to
    ``sqlite.connect()`` after the ``":memory:"`` database name.  Using the
    result in a ``with`` statement binds the connection and closes it when
    the block is left.
    """
    return contextlib.closing(sqlite.connect(":memory:", *args, **kwargs))
46
47
48 # Temporarily limit a database connection parameter
49 @contextlib.contextmanager
50 def cx_limit(cx, category=sqlite.SQLITE_LIMIT_SQL_LENGTH, limit=128):
51 try:
52 _prev = cx.setlimit(category, limit)
53 yield limit
54 finally:
55 cx.setlimit(category, _prev)
56
57
class ModuleTests(unittest.TestCase):
    """Module-level DB-API 2.0 checks: globals, exceptions and constants."""

    def test_api_level(self):
        self.assertEqual(sqlite.apilevel, "2.0",
                         f"apilevel is {sqlite.apilevel}, should be 2.0")

    def test_thread_safety(self):
        self.assertIn(sqlite.threadsafety, {0, 1, 3},
                      f"threadsafety is {sqlite.threadsafety}, "
                      "should be 0, 1 or 3")

    def test_param_style(self):
        self.assertEqual(sqlite.paramstyle, "qmark",
                         f"paramstyle is '{sqlite.paramstyle}', "
                         "should be 'qmark'")

    def test_warning(self):
        self.assertTrue(issubclass(sqlite.Warning, Exception),
                        "Warning is not a subclass of Exception")

    def test_error(self):
        self.assertTrue(issubclass(sqlite.Error, Exception),
                        "Error is not a subclass of Exception")

    def test_interface_error(self):
        self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
                        "InterfaceError is not a subclass of Error")

    def test_database_error(self):
        self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
                        "DatabaseError is not a subclass of Error")

    def test_data_error(self):
        self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
                        "DataError is not a subclass of DatabaseError")

    def test_operational_error(self):
        self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
                        "OperationalError is not a subclass of DatabaseError")

    def test_integrity_error(self):
        self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
                        "IntegrityError is not a subclass of DatabaseError")

    def test_internal_error(self):
        self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
                        "InternalError is not a subclass of DatabaseError")

    def test_programming_error(self):
        self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
                        "ProgrammingError is not a subclass of DatabaseError")

    def test_not_supported_error(self):
        self.assertTrue(issubclass(sqlite.NotSupportedError,
                                   sqlite.DatabaseError),
                        "NotSupportedError is not a subclass of DatabaseError")

    def test_module_constants(self):
        """Every expected constant is exposed as a module attribute."""
        # Constants that are expected regardless of the SQLite version.
        consts = [
            "SQLITE_ABORT",
            "SQLITE_ALTER_TABLE",
            "SQLITE_ANALYZE",
            "SQLITE_ATTACH",
            "SQLITE_AUTH",
            "SQLITE_BUSY",
            "SQLITE_CANTOPEN",
            "SQLITE_CONSTRAINT",
            "SQLITE_CORRUPT",
            "SQLITE_CREATE_INDEX",
            "SQLITE_CREATE_TABLE",
            "SQLITE_CREATE_TEMP_INDEX",
            "SQLITE_CREATE_TEMP_TABLE",
            "SQLITE_CREATE_TEMP_TRIGGER",
            "SQLITE_CREATE_TEMP_VIEW",
            "SQLITE_CREATE_TRIGGER",
            "SQLITE_CREATE_VIEW",
            "SQLITE_CREATE_VTABLE",
            "SQLITE_DELETE",
            "SQLITE_DENY",
            "SQLITE_DETACH",
            "SQLITE_DONE",
            "SQLITE_DROP_INDEX",
            "SQLITE_DROP_TABLE",
            "SQLITE_DROP_TEMP_INDEX",
            "SQLITE_DROP_TEMP_TABLE",
            "SQLITE_DROP_TEMP_TRIGGER",
            "SQLITE_DROP_TEMP_VIEW",
            "SQLITE_DROP_TRIGGER",
            "SQLITE_DROP_VIEW",
            "SQLITE_DROP_VTABLE",
            "SQLITE_EMPTY",
            "SQLITE_ERROR",
            "SQLITE_FORMAT",
            "SQLITE_FULL",
            "SQLITE_FUNCTION",
            "SQLITE_IGNORE",
            "SQLITE_INSERT",
            "SQLITE_INTERNAL",
            "SQLITE_INTERRUPT",
            "SQLITE_IOERR",
            "SQLITE_LOCKED",
            "SQLITE_MISMATCH",
            "SQLITE_MISUSE",
            "SQLITE_NOLFS",
            "SQLITE_NOMEM",
            "SQLITE_NOTADB",
            "SQLITE_NOTFOUND",
            "SQLITE_OK",
            "SQLITE_PERM",
            "SQLITE_PRAGMA",
            "SQLITE_PROTOCOL",
            "SQLITE_RANGE",
            "SQLITE_READ",
            "SQLITE_READONLY",
            "SQLITE_REINDEX",
            "SQLITE_ROW",
            "SQLITE_SAVEPOINT",
            "SQLITE_SCHEMA",
            "SQLITE_SELECT",
            "SQLITE_TOOBIG",
            "SQLITE_TRANSACTION",
            "SQLITE_UPDATE",
            # Run-time limit categories
            "SQLITE_LIMIT_LENGTH",
            "SQLITE_LIMIT_SQL_LENGTH",
            "SQLITE_LIMIT_COLUMN",
            "SQLITE_LIMIT_EXPR_DEPTH",
            "SQLITE_LIMIT_COMPOUND_SELECT",
            "SQLITE_LIMIT_VDBE_OP",
            "SQLITE_LIMIT_FUNCTION_ARG",
            "SQLITE_LIMIT_ATTACHED",
            "SQLITE_LIMIT_LIKE_PATTERN_LENGTH",
            "SQLITE_LIMIT_VARIABLE_NUMBER",
            "SQLITE_LIMIT_TRIGGER_DEPTH",
            # Column type detection flags
            "PARSE_DECLTYPES",
            "PARSE_COLNAMES",
            # Extended result codes
            "SQLITE_ABORT_ROLLBACK",
            "SQLITE_BUSY_RECOVERY",
            "SQLITE_CANTOPEN_FULLPATH",
            "SQLITE_CANTOPEN_ISDIR",
            "SQLITE_CANTOPEN_NOTEMPDIR",
            "SQLITE_CORRUPT_VTAB",
            "SQLITE_IOERR_ACCESS",
            "SQLITE_IOERR_BLOCKED",
            "SQLITE_IOERR_CHECKRESERVEDLOCK",
            "SQLITE_IOERR_CLOSE",
            "SQLITE_IOERR_DELETE",
            "SQLITE_IOERR_DELETE_NOENT",
            "SQLITE_IOERR_DIR_CLOSE",
            "SQLITE_IOERR_DIR_FSYNC",
            "SQLITE_IOERR_FSTAT",
            "SQLITE_IOERR_FSYNC",
            "SQLITE_IOERR_LOCK",
            "SQLITE_IOERR_NOMEM",
            "SQLITE_IOERR_RDLOCK",
            "SQLITE_IOERR_READ",
            "SQLITE_IOERR_SEEK",
            "SQLITE_IOERR_SHMLOCK",
            "SQLITE_IOERR_SHMMAP",
            "SQLITE_IOERR_SHMOPEN",
            "SQLITE_IOERR_SHMSIZE",
            "SQLITE_IOERR_SHORT_READ",
            "SQLITE_IOERR_TRUNCATE",
            "SQLITE_IOERR_UNLOCK",
            "SQLITE_IOERR_WRITE",
            "SQLITE_LOCKED_SHAREDCACHE",
            "SQLITE_READONLY_CANTLOCK",
            "SQLITE_READONLY_RECOVERY",
        ]
        # Constants that are only expected when the SQLite library is at
        # least the given version.
        versioned = (
            ((3, 7, 16), [
                "SQLITE_CONSTRAINT_CHECK",
                "SQLITE_CONSTRAINT_COMMITHOOK",
                "SQLITE_CONSTRAINT_FOREIGNKEY",
                "SQLITE_CONSTRAINT_FUNCTION",
                "SQLITE_CONSTRAINT_NOTNULL",
                "SQLITE_CONSTRAINT_PRIMARYKEY",
                "SQLITE_CONSTRAINT_TRIGGER",
                "SQLITE_CONSTRAINT_UNIQUE",
                "SQLITE_CONSTRAINT_VTAB",
                "SQLITE_READONLY_ROLLBACK",
            ]),
            ((3, 7, 17), [
                "SQLITE_NOTICE",
                "SQLITE_WARNING",
                "SQLITE_IOERR_MMAP",
                "SQLITE_NOTICE_RECOVER_ROLLBACK",
                "SQLITE_NOTICE_RECOVER_WAL",
            ]),
            ((3, 8, 0), [
                "SQLITE_BUSY_SNAPSHOT",
                "SQLITE_IOERR_GETTEMPPATH",
                "SQLITE_WARNING_AUTOINDEX",
            ]),
            ((3, 8, 1), ["SQLITE_CANTOPEN_CONVPATH", "SQLITE_IOERR_CONVPATH"]),
            ((3, 8, 2), ["SQLITE_CONSTRAINT_ROWID"]),
            ((3, 8, 3), ["SQLITE_RECURSIVE", "SQLITE_READONLY_DBMOVED"]),
            ((3, 8, 7), ["SQLITE_LIMIT_WORKER_THREADS", "SQLITE_AUTH_USER"]),
            ((3, 9, 0), ["SQLITE_IOERR_VNODE"]),
            ((3, 10, 0), ["SQLITE_IOERR_AUTH"]),
            ((3, 14, 1), ["SQLITE_OK_LOAD_PERMANENTLY"]),
            ((3, 21, 0), [
                "SQLITE_IOERR_BEGIN_ATOMIC",
                "SQLITE_IOERR_COMMIT_ATOMIC",
                "SQLITE_IOERR_ROLLBACK_ATOMIC",
            ]),
            ((3, 22, 0), [
                "SQLITE_ERROR_MISSING_COLLSEQ",
                "SQLITE_ERROR_RETRY",
                "SQLITE_READONLY_CANTINIT",
                "SQLITE_READONLY_DIRECTORY",
            ]),
            ((3, 24, 0), ["SQLITE_CORRUPT_SEQUENCE", "SQLITE_LOCKED_VTAB"]),
            ((3, 25, 0), ["SQLITE_CANTOPEN_DIRTYWAL", "SQLITE_ERROR_SNAPSHOT"]),
            ((3, 31, 0), [
                "SQLITE_CANTOPEN_SYMLINK",
                "SQLITE_CONSTRAINT_PINNED",
                "SQLITE_OK_SYMLINK",
            ]),
            ((3, 32, 0), [
                "SQLITE_BUSY_TIMEOUT",
                "SQLITE_CORRUPT_INDEX",
                "SQLITE_IOERR_DATA",
            ]),
            ((3, 34, 0), ["SQLITE_IOERR_CORRUPTFS"]),
        )
        for min_version, names in versioned:
            if sqlite.sqlite_version_info >= min_version:
                consts += names
        for const in consts:
            with self.subTest(const=const):
                self.assertTrue(hasattr(sqlite, const))

    def test_error_code_on_exception(self):
        """Opening a directory as a database reports a CANTOPEN error code."""
        err_msg = "unable to open database file"
        if sys.platform.startswith("win"):
            err_code = sqlite.SQLITE_CANTOPEN_ISDIR
        else:
            err_code = sqlite.SQLITE_CANTOPEN

        with temp_dir() as db:
            with self.assertRaisesRegex(sqlite.Error, err_msg) as cm:
                sqlite.connect(db)
            e = cm.exception
            self.assertEqual(e.sqlite_errorcode, err_code)
            self.assertTrue(e.sqlite_errorname.startswith("SQLITE_CANTOPEN"))

    # Use a strict "<" so that 3.7.16 itself is not skipped: this matches
    # both the skip message and the ">= (3, 7, 16)" gate used for
    # SQLITE_CONSTRAINT_CHECK in test_module_constants.
    @unittest.skipIf(sqlite.sqlite_version_info < (3, 7, 16),
                     "Requires SQLite 3.7.16 or newer")
    def test_extended_error_code_on_exception(self):
        """A failed CHECK constraint reports the extended error code."""
        with memory_database() as con:
            with con:
                con.execute("create table t(t integer check(t > 0))")
            errmsg = "constraint failed"
            with self.assertRaisesRegex(sqlite.IntegrityError, errmsg) as cm:
                con.execute("insert into t values(-1)")
            exc = cm.exception
            self.assertEqual(exc.sqlite_errorcode,
                             sqlite.SQLITE_CONSTRAINT_CHECK)
            self.assertEqual(exc.sqlite_errorname, "SQLITE_CONSTRAINT_CHECK")

    # sqlite3_enable_shared_cache() is deprecated on macOS and calling it may raise
    # OperationalError on some buildbots.
    @unittest.skipIf(sys.platform == "darwin", "shared cache is deprecated on macOS")
    def test_shared_cache_deprecated(self):
        for enable in (True, False):
            with self.assertWarns(DeprecationWarning) as cm:
                sqlite.enable_shared_cache(enable)
            self.assertIn("dbapi.py", cm.filename)

    def test_disallow_instantiation(self):
        # Use memory_database() so the connection is closed when the test
        # finishes instead of being left for the garbage collector.
        with memory_database() as cx:
            check_disallow_instantiation(self, type(cx("select 1")))
            check_disallow_instantiation(self, sqlite.Blob)

    def test_complete_statement(self):
        self.assertFalse(sqlite.complete_statement("select t"))
        self.assertTrue(sqlite.complete_statement("create table t(t);"))
353
354
class ConnectionTests(unittest.TestCase):
    """Tests for Connection objects.

    Each test gets ``self.cx``, an in-memory connection holding a ``test``
    table with one row; it is closed again in tearDown().
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def test_commit(self):
        self.cx.commit()

    def test_commit_after_no_changes(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def test_rollback(self):
        self.cx.rollback()

    def test_rollback_after_no_changes(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def test_cursor(self):
        self.assertIsInstance(self.cx.cursor(), sqlite.Cursor)

    def test_failed_open(self):
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def test_close(self):
        self.cx.close()

    def test_use_after_close(self):
        """Every operation on a closed connection raises ProgrammingError."""
        sql = "select 1"
        cu = self.cx.cursor()
        res = cu.execute(sql)
        self.cx.close()
        self.assertRaises(sqlite.ProgrammingError, res.fetchall)
        self.assertRaises(sqlite.ProgrammingError, cu.execute, sql)
        self.assertRaises(sqlite.ProgrammingError, cu.executemany, sql, [])
        self.assertRaises(sqlite.ProgrammingError, cu.executescript, sql)
        self.assertRaises(sqlite.ProgrammingError, self.cx.execute, sql)
        self.assertRaises(sqlite.ProgrammingError,
                          self.cx.executemany, sql, [])
        self.assertRaises(sqlite.ProgrammingError, self.cx.executescript, sql)
        self.assertRaises(sqlite.ProgrammingError,
                          self.cx.create_function, "t", 1, lambda x: x)
        self.assertRaises(sqlite.ProgrammingError, self.cx.cursor)
        with self.assertRaises(sqlite.ProgrammingError):
            with self.cx:
                pass

    def test_exceptions(self):
        # Optional DB-API extension.
        self.assertEqual(self.cx.Warning, sqlite.Warning)
        self.assertEqual(self.cx.Error, sqlite.Error)
        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
        self.assertEqual(self.cx.DataError, sqlite.DataError)
        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)

    def test_in_transaction(self):
        # Can't use db from setUp because we want to test initial state.
        # memory_database() makes sure this extra connection is closed.
        with memory_database() as cx:
            cu = cx.cursor()
            self.assertEqual(cx.in_transaction, False)
            cu.execute("create table transactiontest(id integer primary key, name text)")
            self.assertEqual(cx.in_transaction, False)
            cu.execute("insert into transactiontest(name) values (?)", ("foo",))
            self.assertEqual(cx.in_transaction, True)
            cu.execute("select name from transactiontest where name=?", ["foo"])
            cu.fetchone()
            self.assertEqual(cx.in_transaction, True)
            cx.commit()
            self.assertEqual(cx.in_transaction, False)
            cu.execute("select name from transactiontest where name=?", ["foo"])
            cu.fetchone()
            self.assertEqual(cx.in_transaction, False)

    def test_in_transaction_ro(self):
        with self.assertRaises(AttributeError):
            self.cx.in_transaction = True

    def test_connection_exceptions(self):
        """The DB-API exceptions are the same objects on connection and module."""
        exceptions = [
            "DataError",
            "DatabaseError",
            "Error",
            "IntegrityError",
            "InterfaceError",
            "InternalError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "Warning",
        ]
        for exc in exceptions:
            with self.subTest(exc=exc):
                self.assertTrue(hasattr(self.cx, exc))
                self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc))

    def test_interrupt_on_closed_db(self):
        cx = sqlite.connect(":memory:")
        cx.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cx.interrupt()

    def test_interrupt(self):
        self.assertIsNone(self.cx.interrupt())

    def test_drop_unused_refs(self):
        for n in range(500):
            cu = self.cx.execute(f"select {n}")
            self.assertEqual(cu.fetchone()[0], n)

    def test_connection_limits(self):
        """setlimit() returns the previous value and the new limit is enforced."""
        category = sqlite.SQLITE_LIMIT_SQL_LENGTH
        saved_limit = self.cx.getlimit(category)
        try:
            new_limit = 10
            prev_limit = self.cx.setlimit(category, new_limit)
            self.assertEqual(saved_limit, prev_limit)
            self.assertEqual(self.cx.getlimit(category), new_limit)
            msg = "query string is too large"
            self.assertRaisesRegex(sqlite.DataError, msg,
                                   self.cx.execute, "select 1 as '16'")
        finally:  # restore saved limit
            self.cx.setlimit(category, saved_limit)

    def test_connection_bad_limit_category(self):
        msg = "'category' is out of bounds"
        cat = 1111
        self.assertRaisesRegex(sqlite.ProgrammingError, msg,
                               self.cx.getlimit, cat)
        self.assertRaisesRegex(sqlite.ProgrammingError, msg,
                               self.cx.setlimit, cat, 0)

    def test_connection_init_bad_isolation_level(self):
        msg = (
            "isolation_level string must be '', 'DEFERRED', 'IMMEDIATE', or "
            "'EXCLUSIVE'"
        )
        levels = (
            "BOGUS",
            " ",
            "DEFERRE",
            "IMMEDIAT",
            "EXCLUSIV",
            "DEFERREDS",
            "IMMEDIATES",
            "EXCLUSIVES",
        )
        for level in levels:
            with self.subTest(level=level):
                with self.assertRaisesRegex(ValueError, msg):
                    memory_database(isolation_level=level)
                with memory_database() as cx:
                    with self.assertRaisesRegex(ValueError, msg):
                        cx.isolation_level = level
                    # Check that the default level is not changed
                    self.assertEqual(cx.isolation_level, "")

    def test_connection_init_good_isolation_levels(self):
        for level in ("", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", None):
            with self.subTest(level=level):
                with memory_database(isolation_level=level) as cx:
                    self.assertEqual(cx.isolation_level, level)
                with memory_database() as cx:
                    self.assertEqual(cx.isolation_level, "")
                    cx.isolation_level = level
                    self.assertEqual(cx.isolation_level, level)

    def test_connection_reinit(self):
        """Calling __init__() again on an open connection re-initialises it."""
        db = ":memory:"
        # contextlib.closing() guarantees the (re-initialised) connection is
        # closed at the end of the test.
        with contextlib.closing(sqlite.connect(db)) as cx:
            cx.text_factory = bytes
            cx.row_factory = sqlite.Row
            cu = cx.cursor()
            cu.execute("create table foo (bar)")
            cu.executemany("insert into foo (bar) values (?)",
                           ((str(v),) for v in range(4)))
            cu.execute("select bar from foo")

            rows = list(cu.fetchmany(2))
            self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
            self.assertEqual([r[0] for r in rows], [b"0", b"1"])

            cx.__init__(db)
            cx.execute("create table foo (bar)")
            cx.executemany("insert into foo (bar) values (?)",
                           ((v,) for v in ("a", "b", "c", "d")))

            # This uses the old database, old row factory, but new text factory
            rows = list(cu.fetchall())
            self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
            self.assertEqual([r[0] for r in rows], ["2", "3"])

    def test_connection_bad_reinit(self):
        """A failed re-initialisation leaves the connection unusable."""
        cx = sqlite.connect(":memory:")
        with cx:
            cx.execute("create table t(t)")
        with temp_dir() as db:
            self.assertRaisesRegex(sqlite.OperationalError,
                                   "unable to open database file",
                                   cx.__init__, db)
            self.assertRaisesRegex(sqlite.ProgrammingError,
                                   "Base Connection.__init__ not called",
                                   cx.executemany, "insert into t values(?)",
                                   ((v,) for v in range(3)))
577
578
class UninitialisedConnectionTests(unittest.TestCase):
    """Operations on a Connection whose __init__() never ran must fail."""

    def setUp(self):
        # Allocate the object without running Connection.__init__().
        self.cx = sqlite.Connection.__new__(sqlite.Connection)

    def test_uninit_operations(self):
        expected = "Base Connection.__init__ not called"
        funcs = (
            lambda: self.cx.isolation_level,
            lambda: self.cx.total_changes,
            lambda: self.cx.in_transaction,
            lambda: self.cx.iterdump(),
            lambda: self.cx.cursor(),
            lambda: self.cx.close(),
        )
        for func in funcs:
            with self.subTest(func=func):
                with self.assertRaisesRegex(sqlite.ProgrammingError, expected):
                    func()
597
598
@unittest.skipUnless(hasattr(sqlite.Connection, "serialize"),
                     "Needs SQLite serialize API")
class SerializeTests(unittest.TestCase):
    """Tests for Connection.serialize() and Connection.deserialize()."""

    def test_serialize_deserialize(self):
        with memory_database() as cx:
            with cx:
                cx.execute("create table t(t)")
            data = cx.serialize()

            # Remove test table, verify that it was removed.
            with cx:
                cx.execute("drop table t")
            regex = "no such table"
            with self.assertRaisesRegex(sqlite.OperationalError, regex):
                cx.execute("select t from t")

            # Deserialize and verify that test table is restored.
            cx.deserialize(data)
            cx.execute("select t from t")

    def test_deserialize_wrong_args(self):
        dataset = (
            (BufferError, memoryview(b"blob")[::2]),
            (TypeError, []),
            (TypeError, 1),
            (TypeError, None),
        )
        for exc, arg in dataset:
            with self.subTest(exc=exc, arg=arg):
                with memory_database() as cx:
                    self.assertRaises(exc, cx.deserialize, arg)

    def test_deserialize_corrupt_database(self):
        with memory_database() as cx:
            regex = "file is not a database"
            with self.assertRaisesRegex(sqlite.DatabaseError, regex):
                cx.deserialize(b"\0\1\3")
                # SQLite does not generate an error until you try to query the
                # deserialized database.
                cx.execute("create table fail(f)")

    # The body needs the *size* chosen by @bigmemtest, so the method must
    # declare it as a parameter; without it the body's reference to ``size``
    # is an undefined name.
    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=2**63, memuse=3, dry_run=False)
    def test_deserialize_too_much_data_64bit(self, size):
        with memory_database() as cx:
            with self.assertRaisesRegex(OverflowError, "'data' is too large"):
                cx.deserialize(b"b" * size)
646
647
class OpenTests(unittest.TestCase):
    """Tests for the ways a database can be named when connecting."""

    _sql = "create table test(id integer)"

    def _open_and_create(self, database, path, **kwargs):
        # Shared steps: *path* must not exist yet, connecting to *database*
        # must create it, and the new database must accept a CREATE TABLE.
        self.assertFalse(os.path.exists(path))
        with contextlib.closing(sqlite.connect(database, **kwargs)) as cx:
            self.assertTrue(os.path.exists(path))
            cx.execute(self._sql)

    def test_open_with_path_like_object(self):
        """ Checks that we can successfully connect to a database using an object that
            is PathLike, i.e. has __fspath__(). """
        path = FakePath(TESTFN)
        self.addCleanup(unlink, path)
        self._open_and_create(path, path)

    @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
    @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
    @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
    @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
    def test_open_with_undecodable_path(self):
        path = TESTFN_UNDECODABLE
        self.addCleanup(unlink, path)
        self._open_and_create(path, path)

    def test_open_uri(self):
        path = TESTFN
        self.addCleanup(unlink, path)
        uri = "file:" + urllib.parse.quote(os.fsencode(path))
        self._open_and_create(uri, path, uri=True)

    def test_open_unquoted_uri(self):
        path = TESTFN
        self.addCleanup(unlink, path)
        uri = "file:" + path
        self._open_and_create(uri, path, uri=True)

    def test_open_uri_readonly(self):
        path = TESTFN
        self.addCleanup(unlink, path)
        uri = "file:" + urllib.parse.quote(os.fsencode(path)) + "?mode=ro"
        self.assertFalse(os.path.exists(path))
        # Cannot create new DB
        self.assertRaises(sqlite.OperationalError,
                          sqlite.connect, uri, uri=True)
        self.assertFalse(os.path.exists(path))
        sqlite.connect(path).close()
        self.assertTrue(os.path.exists(path))
        # Cannot modify new DB
        with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
            self.assertRaises(sqlite.OperationalError, cx.execute, self._sql)

    @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
    @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
    @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
    @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
    def test_open_undecodable_uri(self):
        path = TESTFN_UNDECODABLE
        self.addCleanup(unlink, path)
        uri = "file:" + urllib.parse.quote(path)
        self._open_and_create(uri, path, uri=True)

    def test_factory_database_arg(self):
        """The factory receives the database argument exactly as passed."""
        def factory(database, *args, **kwargs):
            nonlocal database_arg
            database_arg = database
            return sqlite.Connection(":memory:", *args, **kwargs)

        candidates = (
            TESTFN,
            os.fsencode(TESTFN),
            FakePath(TESTFN),
            FakePath(os.fsencode(TESTFN)),
        )
        for database in candidates:
            database_arg = None
            sqlite.connect(database, factory=factory).close()
            self.assertEqual(database_arg, database)

    def test_database_keyword(self):
        with contextlib.closing(sqlite.connect(database=":memory:")) as cx:
            self.assertEqual(type(cx), sqlite.Connection)
735
736
737 class ESC[4;38;5;81mCursorTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
def setUp(self):
    """Open an in-memory database holding a ``test`` table with one row."""
    self.cx = sqlite.connect(":memory:")
    self.cu = self.cx.cursor()
    schema = (
        "create table test(id integer primary key, name text, "
        "income number, unique_test text unique)"
    )
    self.cu.execute(schema)
    self.cu.execute("insert into test(name) values (?)", ("foo",))
746
def tearDown(self):
    """Close the cursor first, then the connection it belongs to."""
    for resource in (self.cu, self.cx):
        resource.close()
750
751 def test_execute_no_args(self):
752 self.cu.execute("delete from test")
753
754 def test_execute_illegal_sql(self):
755 with self.assertRaises(sqlite.OperationalError):
756 self.cu.execute("select asdf")
757
758 def test_execute_multiple_statements(self):
759 msg = "You can only execute one statement at a time"
760 dataset = (
761 "select 1; select 2",
762 "select 1; // c++ comments are not allowed",
763 "select 1; *not a comment",
764 "select 1; -*not a comment",
765 "select 1; /* */ a",
766 "select 1; /**/a",
767 "select 1; -",
768 "select 1; /",
769 "select 1; -\n- select 2",
770 """select 1;
771 -- comment
772 select 2
773 """,
774 )
775 for query in dataset:
776 with self.subTest(query=query):
777 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
778 self.cu.execute(query)
779
780 def test_execute_with_appended_comments(self):
781 dataset = (
782 "select 1; -- foo bar",
783 "select 1; --",
784 "select 1; /*", # Unclosed comments ending in \0 are skipped.
785 """
786 select 5+4;
787
788 /*
789 foo
790 */
791 """,
792 )
793 for query in dataset:
794 with self.subTest(query=query):
795 self.cu.execute(query)
796
797 def test_execute_wrong_sql_arg(self):
798 with self.assertRaises(TypeError):
799 self.cu.execute(42)
800
801 def test_execute_arg_int(self):
802 self.cu.execute("insert into test(id) values (?)", (42,))
803
804 def test_execute_arg_float(self):
805 self.cu.execute("insert into test(income) values (?)", (2500.32,))
806
807 def test_execute_arg_string(self):
808 self.cu.execute("insert into test(name) values (?)", ("Hugo",))
809
810 def test_execute_arg_string_with_zero_byte(self):
811 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
812
813 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
814 row = self.cu.fetchone()
815 self.assertEqual(row[0], "Hu\x00go")
816
817 def test_execute_non_iterable(self):
818 with self.assertRaises(sqlite.ProgrammingError) as cm:
819 self.cu.execute("insert into test(id) values (?)", 42)
820 self.assertEqual(str(cm.exception), 'parameters are of unsupported type')
821
822 def test_execute_wrong_no_of_args1(self):
823 # too many parameters
824 with self.assertRaises(sqlite.ProgrammingError):
825 self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
826
827 def test_execute_wrong_no_of_args2(self):
828 # too little parameters
829 with self.assertRaises(sqlite.ProgrammingError):
830 self.cu.execute("insert into test(id) values (?)")
831
832 def test_execute_wrong_no_of_args3(self):
833 # no parameters, parameters are needed
834 with self.assertRaises(sqlite.ProgrammingError):
835 self.cu.execute("insert into test(id) values (?)")
836
837 def test_execute_param_list(self):
838 self.cu.execute("insert into test(name) values ('foo')")
839 self.cu.execute("select name from test where name=?", ["foo"])
840 row = self.cu.fetchone()
841 self.assertEqual(row[0], "foo")
842
843 def test_execute_param_sequence(self):
844 class ESC[4;38;5;81mL:
845 def __len__(self):
846 return 1
847 def __getitem__(self, x):
848 assert x == 0
849 return "foo"
850
851 self.cu.execute("insert into test(name) values ('foo')")
852 self.cu.execute("select name from test where name=?", L())
853 row = self.cu.fetchone()
854 self.assertEqual(row[0], "foo")
855
856 def test_execute_param_sequence_bad_len(self):
857 # Issue41662: Error in __len__() was overridden with ProgrammingError.
858 class ESC[4;38;5;81mL:
859 def __len__(self):
860 1/0
861 def __getitem__(slf, x):
862 raise AssertionError
863
864 self.cu.execute("insert into test(name) values ('foo')")
865 with self.assertRaises(ZeroDivisionError):
866 self.cu.execute("select name from test where name=?", L())
867
def test_execute_too_many_params(self):
    """Binding more variables than the connection limit allows fails."""
    expected = "too many SQL variables"
    one_variable = "select * from test where id=?"
    two_variables = "select * from test where id!=? and id!=?"
    with cx_limit(self.cx, category=sqlite.SQLITE_LIMIT_VARIABLE_NUMBER,
                  limit=1):
        # One variable is within the limit ...
        self.cu.execute(one_variable, (1,))
        # ... two are not.
        self.assertRaisesRegex(sqlite.OperationalError, expected,
                               self.cu.execute, two_variables, (1, 2))
876
877 def test_execute_dict_mapping(self):
878 self.cu.execute("insert into test(name) values ('foo')")
879 self.cu.execute("select name from test where name=:name", {"name": "foo"})
880 row = self.cu.fetchone()
881 self.assertEqual(row[0], "foo")
882
883 def test_execute_dict_mapping_mapping(self):
884 class ESC[4;38;5;81mD(ESC[4;38;5;149mdict):
885 def __missing__(self, key):
886 return "foo"
887
888 self.cu.execute("insert into test(name) values ('foo')")
889 self.cu.execute("select name from test where name=:name", D())
890 row = self.cu.fetchone()
891 self.assertEqual(row[0], "foo")
892
893 def test_execute_dict_mapping_too_little_args(self):
894 self.cu.execute("insert into test(name) values ('foo')")
895 with self.assertRaises(sqlite.ProgrammingError):
896 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
897
898 def test_execute_dict_mapping_no_args(self):
899 self.cu.execute("insert into test(name) values ('foo')")
900 with self.assertRaises(sqlite.ProgrammingError):
901 self.cu.execute("select name from test where name=:name")
902
903 def test_execute_dict_mapping_unnamed(self):
904 self.cu.execute("insert into test(name) values ('foo')")
905 with self.assertRaises(sqlite.ProgrammingError):
906 self.cu.execute("select name from test where name=?", {"name": "foo"})
907
908 def test_close(self):
909 self.cu.close()
910
911 def test_rowcount_execute(self):
912 self.cu.execute("delete from test")
913 self.cu.execute("insert into test(name) values ('foo')")
914 self.cu.execute("insert into test(name) values ('foo')")
915 self.cu.execute("update test set name='bar'")
916 self.assertEqual(self.cu.rowcount, 2)
917
918 def test_rowcount_select(self):
919 """
920 pysqlite does not know the rowcount of SELECT statements, because we
921 don't fetch all rows after executing the select statement. The rowcount
922 has thus to be -1.
923 """
924 self.cu.execute("select 5 union select 6")
925 self.assertEqual(self.cu.rowcount, -1)
926
927 def test_rowcount_executemany(self):
928 self.cu.execute("delete from test")
929 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
930 self.assertEqual(self.cu.rowcount, 3)
931
932 @unittest.skipIf(sqlite.sqlite_version_info < (3, 35, 0),
933 "Requires SQLite 3.35.0 or newer")
934 def test_rowcount_update_returning(self):
935 # gh-93421: rowcount is updated correctly for UPDATE...RETURNING queries
936 self.cu.execute("update test set name='bar' where name='foo' returning 1")
937 self.assertEqual(self.cu.fetchone()[0], 1)
938 self.assertEqual(self.cu.rowcount, 1)
939
940 def test_rowcount_prefixed_with_comment(self):
941 # gh-79579: rowcount is updated even if query is prefixed with comments
942 self.cu.execute("""
943 -- foo
944 insert into test(name) values ('foo'), ('foo')
945 """)
946 self.assertEqual(self.cu.rowcount, 2)
947 self.cu.execute("""
948 /* -- messy *r /* /* ** *- *--
949 */
950 /* one more */ insert into test(name) values ('messy')
951 """)
952 self.assertEqual(self.cu.rowcount, 1)
953 self.cu.execute("/* bar */ update test set name='bar' where name='foo'")
954 self.assertEqual(self.cu.rowcount, 3)
955
956 def test_rowcount_vaccuum(self):
957 data = ((1,), (2,), (3,))
958 self.cu.executemany("insert into test(income) values(?)", data)
959 self.assertEqual(self.cu.rowcount, 3)
960 self.cx.commit()
961 self.cu.execute("vacuum")
962 self.assertEqual(self.cu.rowcount, -1)
963
964 def test_total_changes(self):
965 self.cu.execute("insert into test(name) values ('foo')")
966 self.cu.execute("insert into test(name) values ('foo')")
967 self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')
968
# Checks for executemany:
# Sequences are required by the DB-API; iterators are
# an enhancement in pysqlite.
972
973 def test_execute_many_sequence(self):
974 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
975
976 def test_execute_many_iterator(self):
977 class ESC[4;38;5;81mMyIter:
978 def __init__(self):
979 self.value = 5
980
981 def __iter__(self):
982 return self
983
984 def __next__(self):
985 if self.value == 10:
986 raise StopIteration
987 else:
988 self.value += 1
989 return (self.value,)
990
991 self.cu.executemany("insert into test(income) values (?)", MyIter())
992
993 def test_execute_many_generator(self):
994 def mygen():
995 for i in range(5):
996 yield (i,)
997
998 self.cu.executemany("insert into test(income) values (?)", mygen())
999
1000 def test_execute_many_wrong_sql_arg(self):
1001 with self.assertRaises(TypeError):
1002 self.cu.executemany(42, [(3,)])
1003
1004 def test_execute_many_select(self):
1005 with self.assertRaises(sqlite.ProgrammingError):
1006 self.cu.executemany("select ?", [(3,)])
1007
1008 def test_execute_many_not_iterable(self):
1009 with self.assertRaises(TypeError):
1010 self.cu.executemany("insert into test(income) values (?)", 42)
1011
1012 def test_fetch_iter(self):
1013 # Optional DB-API extension.
1014 self.cu.execute("delete from test")
1015 self.cu.execute("insert into test(id) values (?)", (5,))
1016 self.cu.execute("insert into test(id) values (?)", (6,))
1017 self.cu.execute("select id from test order by id")
1018 lst = []
1019 for row in self.cu:
1020 lst.append(row[0])
1021 self.assertEqual(lst[0], 5)
1022 self.assertEqual(lst[1], 6)
1023
1024 def test_fetchone(self):
1025 self.cu.execute("select name from test")
1026 row = self.cu.fetchone()
1027 self.assertEqual(row[0], "foo")
1028 row = self.cu.fetchone()
1029 self.assertEqual(row, None)
1030
1031 def test_fetchone_no_statement(self):
1032 cur = self.cx.cursor()
1033 row = cur.fetchone()
1034 self.assertEqual(row, None)
1035
1036 def test_array_size(self):
1037 # must default to 1
1038 self.assertEqual(self.cu.arraysize, 1)
1039
1040 # now set to 2
1041 self.cu.arraysize = 2
1042
1043 # now make the query return 3 rows
1044 self.cu.execute("delete from test")
1045 self.cu.execute("insert into test(name) values ('A')")
1046 self.cu.execute("insert into test(name) values ('B')")
1047 self.cu.execute("insert into test(name) values ('C')")
1048 self.cu.execute("select name from test")
1049 res = self.cu.fetchmany()
1050
1051 self.assertEqual(len(res), 2)
1052
1053 def test_fetchmany(self):
1054 self.cu.execute("select name from test")
1055 res = self.cu.fetchmany(100)
1056 self.assertEqual(len(res), 1)
1057 res = self.cu.fetchmany(100)
1058 self.assertEqual(res, [])
1059
1060 def test_fetchmany_kw_arg(self):
1061 """Checks if fetchmany works with keyword arguments"""
1062 self.cu.execute("select name from test")
1063 res = self.cu.fetchmany(size=100)
1064 self.assertEqual(len(res), 1)
1065
1066 def test_fetchall(self):
1067 self.cu.execute("select name from test")
1068 res = self.cu.fetchall()
1069 self.assertEqual(len(res), 1)
1070 res = self.cu.fetchall()
1071 self.assertEqual(res, [])
1072
1073 def test_setinputsizes(self):
1074 self.cu.setinputsizes([3, 4, 5])
1075
1076 def test_setoutputsize(self):
1077 self.cu.setoutputsize(5, 0)
1078
1079 def test_setoutputsize_no_column(self):
1080 self.cu.setoutputsize(42)
1081
1082 def test_cursor_connection(self):
1083 # Optional DB-API extension.
1084 self.assertEqual(self.cu.connection, self.cx)
1085
1086 def test_wrong_cursor_callable(self):
1087 with self.assertRaises(TypeError):
1088 def f(): pass
1089 cur = self.cx.cursor(f)
1090
1091 def test_cursor_wrong_class(self):
1092 class ESC[4;38;5;81mFoo: pass
1093 foo = Foo()
1094 with self.assertRaises(TypeError):
1095 cur = sqlite.Cursor(foo)
1096
1097 def test_last_row_id_on_replace(self):
1098 """
1099 INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
1100 """
1101 sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
1102 for statement in ('INSERT OR REPLACE', 'REPLACE'):
1103 with self.subTest(statement=statement):
1104 self.cu.execute(sql.format(statement), (1, 'foo'))
1105 self.assertEqual(self.cu.lastrowid, 1)
1106
1107 def test_last_row_id_on_ignore(self):
1108 self.cu.execute(
1109 "insert or ignore into test(unique_test) values (?)",
1110 ('test',))
1111 self.assertEqual(self.cu.lastrowid, 2)
1112 self.cu.execute(
1113 "insert or ignore into test(unique_test) values (?)",
1114 ('test',))
1115 self.assertEqual(self.cu.lastrowid, 2)
1116
1117 def test_last_row_id_insert_o_r(self):
1118 results = []
1119 for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
1120 sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
1121 with self.subTest(statement='INSERT OR {}'.format(statement)):
1122 self.cu.execute(sql.format(statement), (statement,))
1123 results.append((statement, self.cu.lastrowid))
1124 with self.assertRaises(sqlite.IntegrityError):
1125 self.cu.execute(sql.format(statement), (statement,))
1126 results.append((statement, self.cu.lastrowid))
1127 expected = [
1128 ('FAIL', 2), ('FAIL', 2),
1129 ('ABORT', 3), ('ABORT', 3),
1130 ('ROLLBACK', 4), ('ROLLBACK', 4),
1131 ]
1132 self.assertEqual(results, expected)
1133
1134 def test_column_count(self):
1135 # Check that column count is updated correctly for cached statements
1136 select = "select * from test"
1137 res = self.cu.execute(select)
1138 old_count = len(res.description)
1139 # Add a new column and execute the cached select query again
1140 self.cu.execute("alter table test add newcol")
1141 res = self.cu.execute(select)
1142 new_count = len(res.description)
1143 self.assertEqual(new_count - old_count, 1)
1144
1145 def test_same_query_in_multiple_cursors(self):
1146 cursors = [self.cx.execute("select 1") for _ in range(3)]
1147 for cu in cursors:
1148 self.assertEqual(cu.fetchall(), [(1,)])
1149
1150
1151 class ESC[4;38;5;81mBlobTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1152 def setUp(self):
1153 self.cx = sqlite.connect(":memory:")
1154 self.cx.execute("create table test(b blob)")
1155 self.data = b"this blob data string is exactly fifty bytes long!"
1156 self.cx.execute("insert into test(b) values (?)", (self.data,))
1157 self.blob = self.cx.blobopen("test", "b", 1)
1158
1159 def tearDown(self):
1160 self.blob.close()
1161 self.cx.close()
1162
1163 def test_blob_is_a_blob(self):
1164 self.assertIsInstance(self.blob, sqlite.Blob)
1165
1166 def test_blob_seek_and_tell(self):
1167 self.blob.seek(10)
1168 self.assertEqual(self.blob.tell(), 10)
1169
1170 self.blob.seek(10, SEEK_SET)
1171 self.assertEqual(self.blob.tell(), 10)
1172
1173 self.blob.seek(10, SEEK_CUR)
1174 self.assertEqual(self.blob.tell(), 20)
1175
1176 self.blob.seek(-10, SEEK_END)
1177 self.assertEqual(self.blob.tell(), 40)
1178
1179 def test_blob_seek_error(self):
1180 msg_oor = "offset out of blob range"
1181 msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END"
1182 msg_of = "seek offset results in overflow"
1183
1184 dataset = (
1185 (ValueError, msg_oor, lambda: self.blob.seek(1000)),
1186 (ValueError, msg_oor, lambda: self.blob.seek(-10)),
1187 (ValueError, msg_orig, lambda: self.blob.seek(10, -1)),
1188 (ValueError, msg_orig, lambda: self.blob.seek(10, 3)),
1189 )
1190 for exc, msg, fn in dataset:
1191 with self.subTest(exc=exc, msg=msg, fn=fn):
1192 self.assertRaisesRegex(exc, msg, fn)
1193
1194 # Force overflow errors
1195 self.blob.seek(1, SEEK_SET)
1196 with self.assertRaisesRegex(OverflowError, msg_of):
1197 self.blob.seek(INT_MAX, SEEK_CUR)
1198 with self.assertRaisesRegex(OverflowError, msg_of):
1199 self.blob.seek(INT_MAX, SEEK_END)
1200
1201 def test_blob_read(self):
1202 buf = self.blob.read()
1203 self.assertEqual(buf, self.data)
1204
1205 def test_blob_read_oversized(self):
1206 buf = self.blob.read(len(self.data) * 2)
1207 self.assertEqual(buf, self.data)
1208
1209 def test_blob_read_advance_offset(self):
1210 n = 10
1211 buf = self.blob.read(n)
1212 self.assertEqual(buf, self.data[:n])
1213 self.assertEqual(self.blob.tell(), n)
1214
1215 def test_blob_read_at_offset(self):
1216 self.blob.seek(10)
1217 self.assertEqual(self.blob.read(10), self.data[10:20])
1218
1219 def test_blob_read_error_row_changed(self):
1220 self.cx.execute("update test set b='aaaa' where rowid=1")
1221 with self.assertRaises(sqlite.OperationalError):
1222 self.blob.read()
1223
1224 def test_blob_write(self):
1225 new_data = b"new data".ljust(50)
1226 self.blob.write(new_data)
1227 row = self.cx.execute("select b from test").fetchone()
1228 self.assertEqual(row[0], new_data)
1229
1230 def test_blob_write_at_offset(self):
1231 new_data = b"c" * 25
1232 self.blob.seek(25)
1233 self.blob.write(new_data)
1234 row = self.cx.execute("select b from test").fetchone()
1235 self.assertEqual(row[0], self.data[:25] + new_data)
1236
1237 def test_blob_write_advance_offset(self):
1238 self.blob.write(b"d"*10)
1239 self.assertEqual(self.blob.tell(), 10)
1240
1241 def test_blob_write_error_length(self):
1242 with self.assertRaisesRegex(ValueError, "data longer than blob"):
1243 self.blob.write(b"a" * 1000)
1244
1245 self.blob.seek(0, SEEK_SET)
1246 n = len(self.blob)
1247 self.blob.write(b"a" * (n-1))
1248 self.blob.write(b"a")
1249 with self.assertRaisesRegex(ValueError, "data longer than blob"):
1250 self.blob.write(b"a")
1251
1252 def test_blob_write_error_row_changed(self):
1253 self.cx.execute("update test set b='aaaa' where rowid=1")
1254 with self.assertRaises(sqlite.OperationalError):
1255 self.blob.write(b"aaa")
1256
1257 def test_blob_write_error_readonly(self):
1258 ro_blob = self.cx.blobopen("test", "b", 1, readonly=True)
1259 with self.assertRaisesRegex(sqlite.OperationalError, "readonly"):
1260 ro_blob.write(b"aaa")
1261 ro_blob.close()
1262
1263 def test_blob_open_error(self):
1264 dataset = (
1265 (("test", "b", 1), {"name": "notexisting"}),
1266 (("notexisting", "b", 1), {}),
1267 (("test", "notexisting", 1), {}),
1268 (("test", "b", 2), {}),
1269 )
1270 regex = "no such"
1271 for args, kwds in dataset:
1272 with self.subTest(args=args, kwds=kwds):
1273 with self.assertRaisesRegex(sqlite.OperationalError, regex):
1274 self.cx.blobopen(*args, **kwds)
1275
1276 def test_blob_length(self):
1277 self.assertEqual(len(self.blob), 50)
1278
1279 def test_blob_get_item(self):
1280 self.assertEqual(self.blob[5], ord("b"))
1281 self.assertEqual(self.blob[6], ord("l"))
1282 self.assertEqual(self.blob[7], ord("o"))
1283 self.assertEqual(self.blob[8], ord("b"))
1284 self.assertEqual(self.blob[-1], ord("!"))
1285
1286 def test_blob_set_item(self):
1287 self.blob[0] = ord("b")
1288 expected = b"b" + self.data[1:]
1289 actual = self.cx.execute("select b from test").fetchone()[0]
1290 self.assertEqual(actual, expected)
1291
1292 def test_blob_set_item_with_offset(self):
1293 self.blob.seek(0, SEEK_END)
1294 self.assertEqual(self.blob.read(), b"") # verify that we're at EOB
1295 self.blob[0] = ord("T")
1296 self.blob[-1] = ord(".")
1297 self.blob.seek(0, SEEK_SET)
1298 expected = b"This blob data string is exactly fifty bytes long."
1299 self.assertEqual(self.blob.read(), expected)
1300
1301 def test_blob_set_slice_buffer_object(self):
1302 from array import array
1303 self.blob[0:5] = memoryview(b"12345")
1304 self.assertEqual(self.blob[0:5], b"12345")
1305
1306 self.blob[0:5] = bytearray(b"23456")
1307 self.assertEqual(self.blob[0:5], b"23456")
1308
1309 self.blob[0:5] = array("b", [1, 2, 3, 4, 5])
1310 self.assertEqual(self.blob[0:5], b"\x01\x02\x03\x04\x05")
1311
1312 def test_blob_set_item_negative_index(self):
1313 self.blob[-1] = 255
1314 self.assertEqual(self.blob[-1], 255)
1315
1316 def test_blob_get_slice(self):
1317 self.assertEqual(self.blob[5:14], b"blob data")
1318
1319 def test_blob_get_empty_slice(self):
1320 self.assertEqual(self.blob[5:5], b"")
1321
1322 def test_blob_get_slice_negative_index(self):
1323 self.assertEqual(self.blob[5:-5], self.data[5:-5])
1324
1325 def test_blob_get_slice_with_skip(self):
1326 self.assertEqual(self.blob[0:10:2], b"ti lb")
1327
1328 def test_blob_set_slice(self):
1329 self.blob[0:5] = b"12345"
1330 expected = b"12345" + self.data[5:]
1331 actual = self.cx.execute("select b from test").fetchone()[0]
1332 self.assertEqual(actual, expected)
1333
1334 def test_blob_set_empty_slice(self):
1335 self.blob[0:0] = b""
1336 self.assertEqual(self.blob[:], self.data)
1337
1338 def test_blob_set_slice_with_skip(self):
1339 self.blob[0:10:2] = b"12345"
1340 actual = self.cx.execute("select b from test").fetchone()[0]
1341 expected = b"1h2s3b4o5 " + self.data[10:]
1342 self.assertEqual(actual, expected)
1343
1344 def test_blob_mapping_invalid_index_type(self):
1345 msg = "indices must be integers"
1346 with self.assertRaisesRegex(TypeError, msg):
1347 self.blob[5:5.5]
1348 with self.assertRaisesRegex(TypeError, msg):
1349 self.blob[1.5]
1350 with self.assertRaisesRegex(TypeError, msg):
1351 self.blob["a"] = b"b"
1352
1353 def test_blob_get_item_error(self):
1354 dataset = [len(self.blob), 105, -105]
1355 for idx in dataset:
1356 with self.subTest(idx=idx):
1357 with self.assertRaisesRegex(IndexError, "index out of range"):
1358 self.blob[idx]
1359 with self.assertRaisesRegex(IndexError, "cannot fit 'int'"):
1360 self.blob[ULLONG_MAX]
1361
1362 # Provoke read error
1363 self.cx.execute("update test set b='aaaa' where rowid=1")
1364 with self.assertRaises(sqlite.OperationalError):
1365 self.blob[0]
1366
1367 def test_blob_set_item_error(self):
1368 with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
1369 self.blob[0] = b"multiple"
1370 with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
1371 self.blob[0] = b"1"
1372 with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
1373 self.blob[0] = bytearray(b"1")
1374 with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
1375 del self.blob[0]
1376 with self.assertRaisesRegex(IndexError, "Blob index out of range"):
1377 self.blob[1000] = 0
1378 with self.assertRaisesRegex(ValueError, "must be in range"):
1379 self.blob[0] = -1
1380 with self.assertRaisesRegex(ValueError, "must be in range"):
1381 self.blob[0] = 256
1382 # Overflow errors are overridden with ValueError
1383 with self.assertRaisesRegex(ValueError, "must be in range"):
1384 self.blob[0] = 2**65
1385
1386 def test_blob_set_slice_error(self):
1387 with self.assertRaisesRegex(IndexError, "wrong size"):
1388 self.blob[5:10] = b"a"
1389 with self.assertRaisesRegex(IndexError, "wrong size"):
1390 self.blob[5:10] = b"a" * 1000
1391 with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
1392 del self.blob[5:10]
1393 with self.assertRaisesRegex(ValueError, "step cannot be zero"):
1394 self.blob[5:10:0] = b"12345"
1395 with self.assertRaises(BufferError):
1396 self.blob[5:10] = memoryview(b"abcde")[::2]
1397
1398 def test_blob_sequence_not_supported(self):
1399 with self.assertRaisesRegex(TypeError, "unsupported operand"):
1400 self.blob + self.blob
1401 with self.assertRaisesRegex(TypeError, "unsupported operand"):
1402 self.blob * 5
1403 with self.assertRaisesRegex(TypeError, "is not iterable"):
1404 b"a" in self.blob
1405
1406 def test_blob_context_manager(self):
1407 data = b"a" * 50
1408 with self.cx.blobopen("test", "b", 1) as blob:
1409 blob.write(data)
1410 actual = self.cx.execute("select b from test").fetchone()[0]
1411 self.assertEqual(actual, data)
1412
1413 # Check that __exit__ closed the blob
1414 with self.assertRaisesRegex(sqlite.ProgrammingError, "closed blob"):
1415 blob.read()
1416
1417 def test_blob_context_manager_reraise_exceptions(self):
1418 class ESC[4;38;5;81mDummyException(ESC[4;38;5;149mException):
1419 pass
1420 with self.assertRaisesRegex(DummyException, "reraised"):
1421 with self.cx.blobopen("test", "b", 1) as blob:
1422 raise DummyException("reraised")
1423
1424
1425 def test_blob_closed(self):
1426 with memory_database() as cx:
1427 cx.execute("create table test(b blob)")
1428 cx.execute("insert into test values (zeroblob(100))")
1429 blob = cx.blobopen("test", "b", 1)
1430 blob.close()
1431
1432 msg = "Cannot operate on a closed blob"
1433 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1434 blob.read()
1435 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1436 blob.write(b"")
1437 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1438 blob.seek(0)
1439 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1440 blob.tell()
1441 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1442 blob.__enter__()
1443 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1444 blob.__exit__(None, None, None)
1445 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1446 len(blob)
1447 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1448 blob[0]
1449 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1450 blob[0:1]
1451 with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
1452 blob[0] = b""
1453
1454 def test_blob_closed_db_read(self):
1455 with memory_database() as cx:
1456 cx.execute("create table test(b blob)")
1457 cx.execute("insert into test(b) values (zeroblob(100))")
1458 blob = cx.blobopen("test", "b", 1)
1459 cx.close()
1460 self.assertRaisesRegex(sqlite.ProgrammingError,
1461 "Cannot operate on a closed database",
1462 blob.read)
1463
1464 def test_blob_32bit_rowid(self):
1465 # gh-100370: we should not get an OverflowError for 32-bit rowids
1466 with memory_database() as cx:
1467 rowid = 2**32
1468 cx.execute("create table t(t blob)")
1469 cx.execute("insert into t(rowid, t) values (?, zeroblob(1))", (rowid,))
1470 cx.blobopen('t', 't', rowid)
1471
1472
1473 @threading_helper.requires_working_threading()
1474 class ESC[4;38;5;81mThreadTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1475 def setUp(self):
1476 self.con = sqlite.connect(":memory:")
1477 self.cur = self.con.cursor()
1478 self.cur.execute("create table test(name text, b blob)")
1479 self.cur.execute("insert into test values('blob', zeroblob(1))")
1480
1481 def tearDown(self):
1482 self.cur.close()
1483 self.con.close()
1484
1485 @threading_helper.reap_threads
1486 def _run_test(self, fn, *args, **kwds):
1487 def run(err):
1488 try:
1489 fn(*args, **kwds)
1490 err.append("did not raise ProgrammingError")
1491 except sqlite.ProgrammingError:
1492 pass
1493 except:
1494 err.append("raised wrong exception")
1495
1496 err = []
1497 t = threading.Thread(target=run, kwargs={"err": err})
1498 t.start()
1499 t.join()
1500 if err:
1501 self.fail("\n".join(err))
1502
1503 def test_check_connection_thread(self):
1504 fns = [
1505 lambda: self.con.cursor(),
1506 lambda: self.con.commit(),
1507 lambda: self.con.rollback(),
1508 lambda: self.con.close(),
1509 lambda: self.con.set_trace_callback(None),
1510 lambda: self.con.set_authorizer(None),
1511 lambda: self.con.create_collation("foo", None),
1512 lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1),
1513 lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH),
1514 lambda: self.con.blobopen("test", "b", 1),
1515 ]
1516 if hasattr(sqlite.Connection, "serialize"):
1517 fns.append(lambda: self.con.serialize())
1518 fns.append(lambda: self.con.deserialize(b""))
1519 if sqlite.sqlite_version_info >= (3, 25, 0):
1520 fns.append(lambda: self.con.create_window_function("foo", 0, None))
1521
1522 for fn in fns:
1523 with self.subTest(fn=fn):
1524 self._run_test(fn)
1525
1526 def test_check_cursor_thread(self):
1527 fns = [
1528 lambda: self.cur.execute("insert into test(name) values('a')"),
1529 lambda: self.cur.close(),
1530 lambda: self.cur.execute("select name from test"),
1531 lambda: self.cur.fetchone(),
1532 ]
1533 for fn in fns:
1534 with self.subTest(fn=fn):
1535 self._run_test(fn)
1536
1537
1538 @threading_helper.reap_threads
1539 def test_dont_check_same_thread(self):
1540 def run(con, err):
1541 try:
1542 con.execute("select 1")
1543 except sqlite.Error:
1544 err.append("multi-threading not allowed")
1545
1546 con = sqlite.connect(":memory:", check_same_thread=False)
1547 err = []
1548 t = threading.Thread(target=run, kwargs={"con": con, "err": err})
1549 t.start()
1550 t.join()
1551 self.assertEqual(len(err), 0, "\n".join(err))
1552
1553
class ConstructorTests(unittest.TestCase):
    # DB-API type constructors exported by the module. Each test checks the
    # constructed value rather than only that the call does not raise.

    def test_date(self):
        d = sqlite.Date(2004, 10, 28)
        self.assertEqual((d.year, d.month, d.day), (2004, 10, 28))

    def test_time(self):
        t = sqlite.Time(12, 39, 35)
        self.assertEqual((t.hour, t.minute, t.second), (12, 39, 35))

    def test_timestamp(self):
        ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
        self.assertEqual(
            (ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second),
            (2004, 10, 28, 12, 39, 35),
        )

    # The *FromTicks results depend on the local timezone, so only the
    # returned type is checked.
    def test_date_from_ticks(self):
        self.assertIsInstance(sqlite.DateFromTicks(42), sqlite.Date)

    def test_time_from_ticks(self):
        self.assertIsInstance(sqlite.TimeFromTicks(42), sqlite.Time)

    def test_timestamp_from_ticks(self):
        self.assertIsInstance(sqlite.TimestampFromTicks(42), sqlite.Timestamp)

    def test_binary(self):
        b = sqlite.Binary(b"\0'")
        self.assertEqual(bytes(b), b"\0'")
1575
1576 class ESC[4;38;5;81mExtensionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1577 def test_script_string_sql(self):
1578 con = sqlite.connect(":memory:")
1579 cur = con.cursor()
1580 cur.executescript("""
1581 -- bla bla
1582 /* a stupid comment */
1583 create table a(i);
1584 insert into a(i) values (5);
1585 """)
1586 cur.execute("select i from a")
1587 res = cur.fetchone()[0]
1588 self.assertEqual(res, 5)
1589
1590 def test_script_syntax_error(self):
1591 con = sqlite.connect(":memory:")
1592 cur = con.cursor()
1593 with self.assertRaises(sqlite.OperationalError):
1594 cur.executescript("create table test(x); asdf; create table test2(x)")
1595
1596 def test_script_error_normal(self):
1597 con = sqlite.connect(":memory:")
1598 cur = con.cursor()
1599 with self.assertRaises(sqlite.OperationalError):
1600 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
1601
1602 def test_cursor_executescript_as_bytes(self):
1603 con = sqlite.connect(":memory:")
1604 cur = con.cursor()
1605 with self.assertRaises(TypeError):
1606 cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
1607
1608 def test_cursor_executescript_with_null_characters(self):
1609 con = sqlite.connect(":memory:")
1610 cur = con.cursor()
1611 with self.assertRaises(ValueError):
1612 cur.executescript("""
1613 create table a(i);\0
1614 insert into a(i) values (5);
1615 """)
1616
1617 def test_cursor_executescript_with_surrogates(self):
1618 con = sqlite.connect(":memory:")
1619 cur = con.cursor()
1620 with self.assertRaises(UnicodeEncodeError):
1621 cur.executescript("""
1622 create table a(s);
1623 insert into a(s) values ('\ud8ff');
1624 """)
1625
1626 def test_cursor_executescript_too_large_script(self):
1627 msg = "query string is too large"
1628 with memory_database() as cx, cx_limit(cx) as lim:
1629 cx.executescript("select 'almost too large'".ljust(lim))
1630 with self.assertRaisesRegex(sqlite.DataError, msg):
1631 cx.executescript("select 'too large'".ljust(lim+1))
1632
1633 def test_cursor_executescript_tx_control(self):
1634 con = sqlite.connect(":memory:")
1635 con.execute("begin")
1636 self.assertTrue(con.in_transaction)
1637 con.executescript("select 1")
1638 self.assertFalse(con.in_transaction)
1639
1640 def test_connection_execute(self):
1641 con = sqlite.connect(":memory:")
1642 result = con.execute("select 5").fetchone()[0]
1643 self.assertEqual(result, 5, "Basic test of Connection.execute")
1644
1645 def test_connection_executemany(self):
1646 con = sqlite.connect(":memory:")
1647 con.execute("create table test(foo)")
1648 con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
1649 result = con.execute("select foo from test order by foo").fetchall()
1650 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
1651 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
1652
1653 def test_connection_executescript(self):
1654 con = sqlite.connect(":memory:")
1655 con.executescript("create table test(foo); insert into test(foo) values (5);")
1656 result = con.execute("select foo from test").fetchone()[0]
1657 self.assertEqual(result, 5, "Basic test of Connection.executescript")
1658
class ClosedConTests(unittest.TestCase):
    # Every operation attempted on a closed connection is expected to raise
    # sqlite.ProgrammingError.

    def _closed_connection(self):
        # Open an in-memory database and close it straight away.
        con = sqlite.connect(":memory:")
        con.close()
        return con

    def test_closed_con_cursor(self):
        con = self._closed_connection()
        self.assertRaises(sqlite.ProgrammingError, con.cursor)

    def test_closed_con_commit(self):
        con = self._closed_connection()
        self.assertRaises(sqlite.ProgrammingError, con.commit)

    def test_closed_con_rollback(self):
        con = self._closed_connection()
        self.assertRaises(sqlite.ProgrammingError, con.rollback)

    def test_closed_cur_execute(self):
        # The cursor is created while the connection is still open.
        con = sqlite.connect(":memory:")
        cur = con.cursor()
        con.close()
        self.assertRaises(sqlite.ProgrammingError, cur.execute, "select 4")

    def test_closed_create_function(self):
        con = self._closed_connection()

        def f(x):
            return 17

        with self.assertRaises(sqlite.ProgrammingError):
            con.create_function("foo", 1, f)

    def test_closed_create_aggregate(self):
        con = self._closed_connection()

        class Agg:
            def __init__(self):
                pass

            def step(self, x):
                pass

            def finalize(self):
                return 17

        with self.assertRaises(sqlite.ProgrammingError):
            con.create_aggregate("foo", 1, Agg)

    def test_closed_set_authorizer(self):
        con = self._closed_connection()

        def authorizer(*args):
            return sqlite.DENY

        with self.assertRaises(sqlite.ProgrammingError):
            con.set_authorizer(authorizer)

    def test_closed_set_progress_callback(self):
        con = self._closed_connection()

        def progress():
            pass

        with self.assertRaises(sqlite.ProgrammingError):
            con.set_progress_handler(progress, 100)

    def test_closed_call(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con()
1725
class ClosedCurTests(unittest.TestCase):
    def test_closed(self):
        # Each execute*/fetch* method of a closed cursor is expected to
        # raise sqlite.ProgrammingError.
        con = sqlite.connect(":memory:")
        # Close the connection even if an assertion below fails; it used to
        # be left open.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.close()

        # Method name -> positional arguments to call it with.
        calls = {
            "execute": ("select 4 union select 5",),
            "executemany": ("insert into foo(bar) values (?)", [(3,), (4,)]),
            "executescript": ("select 4 union select 5",),
            "fetchall": (),
            "fetchmany": (),
            "fetchone": (),
        }
        for method_name, params in calls.items():
            # One sub-test per method so a failure names the method and
            # does not hide the remaining ones.
            with self.subTest(method=method_name):
                # Look the method up outside assertRaises so that only the
                # call itself may satisfy the expected exception.
                method = getattr(cur, method_name)
                with self.assertRaises(sqlite.ProgrammingError):
                    method(*params)
1743
1744
class SqliteOnConflictTests(unittest.TestCase):
    """
    Tests for SQLite's "insert on conflict" feature.

    See https://www.sqlite.org/lang_conflict.html for details.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("""
          CREATE TABLE test(
            id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
          );
        """)

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def _execute_all(self, *statements):
        # Run the statements in order on the current cursor.
        for sql in statements:
            self.cu.execute(sql)

    def _expect_integrity_error(self, sql):
        # The statement must violate the UNIQUE constraint.
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute(sql)

    def test_on_conflict_rollback_with_explicit_transaction(self):
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self._execute_all(
            "BEGIN",
            "INSERT INTO test(name) VALUES ('abort_test')",
            "INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')",
        )
        self._expect_integrity_error(
            "INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        # Use connection to commit.
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name from test")
        # Transaction should have rolled back and nothing should be in table.
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_abort_raises_with_explicit_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self._execute_all(
            "BEGIN",
            "INSERT INTO test(name) VALUES ('abort_test')",
            "INSERT OR ABORT INTO test(unique_name) VALUES ('foo')",
        )
        self._expect_integrity_error(
            "INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name FROM test")
        # Expect the first two inserts to work, third to do nothing.
        rows = self.cu.fetchall()
        self.assertEqual(rows, [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_rollback_without_transaction(self):
        # Start of implicit transaction
        self._execute_all(
            "INSERT INTO test(name) VALUES ('abort_test')",
            "INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')",
        )
        self._expect_integrity_error(
            "INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        # Implicit transaction is rolled back on error.
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_abort_raises_without_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self._execute_all(
            "INSERT INTO test(name) VALUES ('abort_test')",
            "INSERT OR ABORT INTO test(unique_name) VALUES ('foo')",
        )
        self._expect_integrity_error(
            "INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        # Make sure all other values were inserted.
        self.cu.execute("SELECT name, unique_name FROM test")
        rows = self.cu.fetchall()
        self.assertEqual(rows, [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_fail(self):
        insert = "INSERT OR FAIL INTO test(unique_name) VALUES ('foo')"
        self.cu.execute(insert)
        self._expect_integrity_error(insert)
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_ignore(self):
        insert = "INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')"
        self.cu.execute(insert)
        # Nothing should happen.
        self.cu.execute(insert)
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def test_on_conflict_replace(self):
        self._execute_all(
            "INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')",
            # There shouldn't be an IntegrityError exception.
            "INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')",
            "SELECT name, unique_name FROM test",
        )
        self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
1836
1837
1838 @requires_subprocess()
1839 class ESC[4;38;5;81mMultiprocessTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1840 CONNECTION_TIMEOUT = 0 # Disable the busy timeout.
1841
1842 def tearDown(self):
1843 unlink(TESTFN)
1844
1845 def test_ctx_mgr_rollback_if_commit_failed(self):
1846 # bpo-27334: ctx manager does not rollback if commit fails
1847 SCRIPT = f"""if 1:
1848 import sqlite3
1849 def wait():
1850 print("started")
1851 assert "database is locked" in input()
1852
1853 cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT})
1854 cx.create_function("wait", 0, wait)
1855 with cx:
1856 cx.execute("create table t(t)")
1857 try:
1858 # execute two transactions; both will try to lock the db
1859 cx.executescript('''
1860 -- start a transaction and wait for parent
1861 begin transaction;
1862 select * from t;
1863 select wait();
1864 rollback;
1865
1866 -- start a new transaction; would fail if parent holds lock
1867 begin transaction;
1868 select * from t;
1869 rollback;
1870 ''')
1871 finally:
1872 cx.close()
1873 """
1874
1875 # spawn child process
1876 proc = subprocess.Popen(
1877 [sys.executable, "-c", SCRIPT],
1878 encoding="utf-8",
1879 bufsize=0,
1880 stdin=subprocess.PIPE,
1881 stdout=subprocess.PIPE,
1882 )
1883 self.addCleanup(proc.communicate)
1884
1885 # wait for child process to start
1886 self.assertEqual("started", proc.stdout.readline().strip())
1887
1888 cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT)
1889 try: # context manager should correctly release the db lock
1890 with cx:
1891 cx.execute("insert into t values('test')")
1892 except sqlite.OperationalError as exc:
1893 proc.stdin.write(str(exc))
1894 else:
1895 proc.stdin.write("no error")
1896 finally:
1897 cx.close()
1898
1899 # terminate child process
1900 self.assertIsNone(proc.returncode)
1901 try:
1902 proc.communicate(input="end", timeout=SHORT_TIMEOUT)
1903 except subprocess.TimeoutExpired:
1904 proc.kill()
1905 proc.communicate()
1906 raise
1907 self.assertEqual(proc.returncode, 0)
1908
1909
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()