1# Author: Paul Kippes <kippesp@gmail.com> 2 3import unittest 4 5from .util import memory_database 6from .util import MemoryDatabaseMixin 7 8 9class DumpTests(MemoryDatabaseMixin, unittest.TestCase): 10 11 def test_table_dump(self): 12 expected_sqls = [ 13 "PRAGMA foreign_keys=OFF;", 14 """CREATE TABLE "index"("index" blob);""" 15 , 16 """INSERT INTO "index" VALUES(X'01');""" 17 , 18 """CREATE TABLE "quoted""table"("quoted""field" text);""" 19 , 20 """INSERT INTO "quoted""table" VALUES('quoted''value');""" 21 , 22 "CREATE TABLE t1(id integer primary key, s1 text, " \ 23 "t1_i1 integer not null, i2 integer, unique (s1), " \ 24 "constraint t1_idx1 unique (i2), " \ 25 "constraint t1_i1_idx1 unique (t1_i1));" 26 , 27 "INSERT INTO \"t1\" VALUES(1,'foo',10,20);" 28 , 29 "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);" 30 , 31 "CREATE TABLE t2(id integer, t2_i1 integer, " \ 32 "t2_i2 integer, primary key (id)," \ 33 "foreign key(t2_i1) references t1(t1_i1));" 34 , 35 # Foreign key violation. 36 "INSERT INTO \"t2\" VALUES(1,2,3);" 37 , 38 "CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \ 39 "begin " \ 40 "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \ 41 "end;" 42 , 43 "CREATE VIEW v1 as select * from t1 left join t2 " \ 44 "using (id);" 45 ] 46 [self.cu.execute(s) for s in expected_sqls] 47 i = self.cx.iterdump() 48 actual_sqls = [s for s in i] 49 expected_sqls = [ 50 "PRAGMA foreign_keys=OFF;", 51 "BEGIN TRANSACTION;", 52 *expected_sqls[1:], 53 "COMMIT;", 54 ] 55 [self.assertEqual(expected_sqls[i], actual_sqls[i]) 56 for i in range(len(expected_sqls))] 57 58 def test_table_dump_filter(self): 59 all_table_sqls = [ 60 """CREATE TABLE "some_table_2" ("id_1" INTEGER);""", 61 """INSERT INTO "some_table_2" VALUES(3);""", 62 """INSERT INTO "some_table_2" VALUES(4);""", 63 """CREATE TABLE "test_table_1" ("id_2" INTEGER);""", 64 """INSERT INTO "test_table_1" VALUES(1);""", 65 """INSERT INTO "test_table_1" VALUES(2);""", 66 ] 67 all_views_sqls = [ 68 """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""", 69 """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""", 70 ] 71 # Create database structure. 72 for sql in [*all_table_sqls, *all_views_sqls]: 73 self.cu.execute(sql) 74 # %_table_% matches all tables. 75 dump_sqls = list(self.cx.iterdump(filter="%_table_%")) 76 self.assertEqual( 77 dump_sqls, 78 ["BEGIN TRANSACTION;", *all_table_sqls, "COMMIT;"], 79 ) 80 # view_% matches all views. 81 dump_sqls = list(self.cx.iterdump(filter="view_%")) 82 self.assertEqual( 83 dump_sqls, 84 ["BEGIN TRANSACTION;", *all_views_sqls, "COMMIT;"], 85 ) 86 # %_1 matches tables and views with the _1 suffix. 87 dump_sqls = list(self.cx.iterdump(filter="%_1")) 88 self.assertEqual( 89 dump_sqls, 90 [ 91 "BEGIN TRANSACTION;", 92 """CREATE TABLE "test_table_1" ("id_2" INTEGER);""", 93 """INSERT INTO "test_table_1" VALUES(1);""", 94 """INSERT INTO "test_table_1" VALUES(2);""", 95 """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""", 96 "COMMIT;" 97 ], 98 ) 99 # some_% matches some_table_2. 100 dump_sqls = list(self.cx.iterdump(filter="some_%")) 101 self.assertEqual( 102 dump_sqls, 103 [ 104 "BEGIN TRANSACTION;", 105 """CREATE TABLE "some_table_2" ("id_1" INTEGER);""", 106 """INSERT INTO "some_table_2" VALUES(3);""", 107 """INSERT INTO "some_table_2" VALUES(4);""", 108 "COMMIT;" 109 ], 110 ) 111 # Only single object. 112 dump_sqls = list(self.cx.iterdump(filter="view_2")) 113 self.assertEqual( 114 dump_sqls, 115 [ 116 "BEGIN TRANSACTION;", 117 """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""", 118 "COMMIT;" 119 ], 120 ) 121 # % matches all objects. 122 dump_sqls = list(self.cx.iterdump(filter="%")) 123 self.assertEqual( 124 dump_sqls, 125 ["BEGIN TRANSACTION;", *all_table_sqls, *all_views_sqls, "COMMIT;"], 126 ) 127 128 def test_dump_autoincrement(self): 129 expected = [ 130 'CREATE TABLE "t1" (id integer primary key autoincrement);', 131 'INSERT INTO "t1" VALUES(NULL);', 132 'CREATE TABLE "t2" (id integer primary key autoincrement);', 133 ] 134 self.cu.executescript("".join(expected)) 135 136 # the NULL value should now be automatically be set to 1 137 expected[1] = expected[1].replace("NULL", "1") 138 expected.insert(0, "BEGIN TRANSACTION;") 139 expected.extend([ 140 'DELETE FROM "sqlite_sequence";', 141 'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);', 142 'COMMIT;', 143 ]) 144 145 actual = [stmt for stmt in self.cx.iterdump()] 146 self.assertEqual(expected, actual) 147 148 def test_dump_autoincrement_create_new_db(self): 149 self.cu.execute("BEGIN TRANSACTION") 150 self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)") 151 self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)") 152 self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9))) 153 self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4))) 154 self.cx.commit() 155 156 with memory_database() as cx2: 157 query = "".join(self.cx.iterdump()) 158 cx2.executescript(query) 159 cu2 = cx2.cursor() 160 161 dataset = ( 162 ("t1", 9), 163 ("t2", 4), 164 ) 165 for table, seq in dataset: 166 with self.subTest(table=table, seq=seq): 167 res = cu2.execute(""" 168 SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ? 169 """, (table,)) 170 rows = res.fetchall() 171 self.assertEqual(rows[0][0], seq) 172 173 def test_unorderable_row(self): 174 # iterdump() should be able to cope with unorderable row types (issue #15545) 175 class UnorderableRow: 176 def __init__(self, cursor, row): 177 self.row = row 178 def __getitem__(self, index): 179 return self.row[index] 180 self.cx.row_factory = UnorderableRow 181 CREATE_ALPHA = """CREATE TABLE "alpha" ("one");""" 182 CREATE_BETA = """CREATE TABLE "beta" ("two");""" 183 expected = [ 184 "BEGIN TRANSACTION;", 185 CREATE_ALPHA, 186 CREATE_BETA, 187 "COMMIT;" 188 ] 189 self.cu.execute(CREATE_BETA) 190 self.cu.execute(CREATE_ALPHA) 191 got = list(self.cx.iterdump()) 192 self.assertEqual(expected, got) 193 194 def test_dump_custom_row_factory(self): 195 # gh-118221: iterdump should be able to cope with custom row factories. 196 def dict_factory(cu, row): 197 fields = [col[0] for col in cu.description] 198 return dict(zip(fields, row)) 199 200 self.cx.row_factory = dict_factory 201 CREATE_TABLE = "CREATE TABLE test(t);" 202 expected = ["BEGIN TRANSACTION;", CREATE_TABLE, "COMMIT;"] 203 204 self.cu.execute(CREATE_TABLE) 205 actual = list(self.cx.iterdump()) 206 self.assertEqual(expected, actual) 207 self.assertEqual(self.cx.row_factory, dict_factory) 208 209 def test_dump_virtual_tables(self): 210 # gh-64662 211 expected = [ 212 "BEGIN TRANSACTION;", 213 "PRAGMA writable_schema=ON;", 214 ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)" 215 "VALUES('table','test','test',0,'CREATE VIRTUAL TABLE test USING fts4(example)');"), 216 "CREATE TABLE 'test_content'(docid INTEGER PRIMARY KEY, 'c0example');", 217 "CREATE TABLE 'test_docsize'(docid INTEGER PRIMARY KEY, size BLOB);", 218 ("CREATE TABLE 'test_segdir'(level INTEGER,idx INTEGER,start_block INTEGER," 219 "leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));"), 220 "CREATE TABLE 'test_segments'(blockid INTEGER PRIMARY KEY, block BLOB);", 221 "CREATE TABLE 'test_stat'(id INTEGER PRIMARY KEY, value BLOB);", 222 "PRAGMA writable_schema=OFF;", 223 "COMMIT;" 224 ] 225 self.cu.execute("CREATE VIRTUAL TABLE test USING fts4(example)") 226 actual = list(self.cx.iterdump()) 227 self.assertEqual(expected, actual) 228 229 230if __name__ == "__main__": 231 unittest.main() 232