1 # Author: Paul Kippes <kippesp@gmail.com>
2
3 import unittest
4 import sqlite3 as sqlite
5 from .test_dbapi import memory_database
6
7
8 class ESC[4;38;5;81mDumpTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
9 def setUp(self):
10 self.cx = sqlite.connect(":memory:")
11 self.cu = self.cx.cursor()
12
13 def tearDown(self):
14 self.cx.close()
15
16 def test_table_dump(self):
17 expected_sqls = [
18 """CREATE TABLE "index"("index" blob);"""
19 ,
20 """INSERT INTO "index" VALUES(X'01');"""
21 ,
22 """CREATE TABLE "quoted""table"("quoted""field" text);"""
23 ,
24 """INSERT INTO "quoted""table" VALUES('quoted''value');"""
25 ,
26 "CREATE TABLE t1(id integer primary key, s1 text, " \
27 "t1_i1 integer not null, i2 integer, unique (s1), " \
28 "constraint t1_idx1 unique (i2));"
29 ,
30 "INSERT INTO \"t1\" VALUES(1,'foo',10,20);"
31 ,
32 "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);"
33 ,
34 "CREATE TABLE t2(id integer, t2_i1 integer, " \
35 "t2_i2 integer, primary key (id)," \
36 "foreign key(t2_i1) references t1(t1_i1));"
37 ,
38 "CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \
39 "begin " \
40 "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \
41 "end;"
42 ,
43 "CREATE VIEW v1 as select * from t1 left join t2 " \
44 "using (id);"
45 ]
46 [self.cu.execute(s) for s in expected_sqls]
47 i = self.cx.iterdump()
48 actual_sqls = [s for s in i]
49 expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \
50 ['COMMIT;']
51 [self.assertEqual(expected_sqls[i], actual_sqls[i])
52 for i in range(len(expected_sqls))]
53
54 def test_dump_autoincrement(self):
55 expected = [
56 'CREATE TABLE "t1" (id integer primary key autoincrement);',
57 'INSERT INTO "t1" VALUES(NULL);',
58 'CREATE TABLE "t2" (id integer primary key autoincrement);',
59 ]
60 self.cu.executescript("".join(expected))
61
62 # the NULL value should now be automatically be set to 1
63 expected[1] = expected[1].replace("NULL", "1")
64 expected.insert(0, "BEGIN TRANSACTION;")
65 expected.extend([
66 'DELETE FROM "sqlite_sequence";',
67 'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);',
68 'COMMIT;',
69 ])
70
71 actual = [stmt for stmt in self.cx.iterdump()]
72 self.assertEqual(expected, actual)
73
74 def test_dump_autoincrement_create_new_db(self):
75 self.cu.execute("BEGIN TRANSACTION")
76 self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)")
77 self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)")
78 self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9)))
79 self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4)))
80 self.cx.commit()
81
82 with memory_database() as cx2:
83 query = "".join(self.cx.iterdump())
84 cx2.executescript(query)
85 cu2 = cx2.cursor()
86
87 dataset = (
88 ("t1", 9),
89 ("t2", 4),
90 )
91 for table, seq in dataset:
92 with self.subTest(table=table, seq=seq):
93 res = cu2.execute("""
94 SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ?
95 """, (table,))
96 rows = res.fetchall()
97 self.assertEqual(rows[0][0], seq)
98
99 def test_unorderable_row(self):
100 # iterdump() should be able to cope with unorderable row types (issue #15545)
101 class ESC[4;38;5;81mUnorderableRow:
102 def __init__(self, cursor, row):
103 self.row = row
104 def __getitem__(self, index):
105 return self.row[index]
106 self.cx.row_factory = UnorderableRow
107 CREATE_ALPHA = """CREATE TABLE "alpha" ("one");"""
108 CREATE_BETA = """CREATE TABLE "beta" ("two");"""
109 expected = [
110 "BEGIN TRANSACTION;",
111 CREATE_ALPHA,
112 CREATE_BETA,
113 "COMMIT;"
114 ]
115 self.cu.execute(CREATE_BETA)
116 self.cu.execute(CREATE_ALPHA)
117 got = list(self.cx.iterdump())
118 self.assertEqual(expected, got)
119
120 def test_dump_virtual_tables(self):
121 # gh-64662
122 expected = [
123 "BEGIN TRANSACTION;",
124 "PRAGMA writable_schema=ON;",
125 ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
126 "VALUES('table','test','test',0,'CREATE VIRTUAL TABLE test USING fts4(example)');"),
127 "CREATE TABLE 'test_content'(docid INTEGER PRIMARY KEY, 'c0example');",
128 "CREATE TABLE 'test_docsize'(docid INTEGER PRIMARY KEY, size BLOB);",
129 ("CREATE TABLE 'test_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,"
130 "leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));"),
131 "CREATE TABLE 'test_segments'(blockid INTEGER PRIMARY KEY, block BLOB);",
132 "CREATE TABLE 'test_stat'(id INTEGER PRIMARY KEY, value BLOB);",
133 "PRAGMA writable_schema=OFF;",
134 "COMMIT;"
135 ]
136 self.cu.execute("CREATE VIRTUAL TABLE test USING fts4(example)")
137 actual = list(self.cx.iterdump())
138 self.assertEqual(expected, actual)
139
140
141 if __name__ == "__main__":
142 unittest.main()