• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Author: Paul Kippes <kippesp@gmail.com>
2
3import unittest
4
5from .util import memory_database
6from .util import MemoryDatabaseMixin
7
8
class DumpTests(MemoryDatabaseMixin, unittest.TestCase):
    """Tests for the SQL text produced by Connection.iterdump().

    The tests operate on ``self.cx`` (a connection) and ``self.cu`` (a
    cursor).  NOTE(review): both are presumably provided by
    MemoryDatabaseMixin for a fresh in-memory database -- confirm in .util.
    """

    def test_table_dump(self):
        # Build a database from a list of statements and check that the
        # dump reproduces them, wrapped in a transaction.  The schema
        # covers quoted identifiers and values, table constraints, a
        # trigger, a view, and a row that violates a foreign key.
        setup_sqls = [
            "PRAGMA foreign_keys=OFF;",
            """CREATE TABLE "index"("index" blob);""",
            """INSERT INTO "index" VALUES(X'01');""",
            """CREATE TABLE "quoted""table"("quoted""field" text);""",
            """INSERT INTO "quoted""table" VALUES('quoted''value');""",
            ("CREATE TABLE t1(id integer primary key, s1 text, "
             "t1_i1 integer not null, i2 integer, unique (s1), "
             "constraint t1_idx1 unique (i2), "
             "constraint t1_i1_idx1 unique (t1_i1));"),
            "INSERT INTO \"t1\" VALUES(1,'foo',10,20);",
            "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);",
            ("CREATE TABLE t2(id integer, t2_i1 integer, "
             "t2_i2 integer, primary key (id),"
             "foreign key(t2_i1) references t1(t1_i1));"),
            # Foreign key violation: no t1 row has t1_i1 == 2.
            "INSERT INTO \"t2\" VALUES(1,2,3);",
            ("CREATE TRIGGER trigger_1 update of t1_i1 on t1 "
             "begin "
             "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; "
             "end;"),
            ("CREATE VIEW v1 as select * from t1 left join t2 "
             "using (id);"),
        ]
        for sql in setup_sqls:
            self.cu.execute(sql)

        actual_sqls = list(self.cx.iterdump())

        # The dump is expected to keep the leading PRAGMA, then wrap the
        # remaining setup statements in BEGIN TRANSACTION / COMMIT.
        expected_sqls = [
            "PRAGMA foreign_keys=OFF;",
            "BEGIN TRANSACTION;",
            *setup_sqls[1:],
            "COMMIT;",
        ]
        # Compare the complete lists: a dump that is too short fails as
        # an assertion (not an IndexError), and unexpected extra
        # statements are reported as well.
        self.assertEqual(expected_sqls, actual_sqls)

    def test_table_dump_filter(self):
        # Check that the ``filter`` argument of iterdump() restricts the
        # dump to the tables and views whose names match the pattern.
        all_table_sqls = [
            """CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
            """INSERT INTO "some_table_2" VALUES(3);""",
            """INSERT INTO "some_table_2" VALUES(4);""",
            """CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
            """INSERT INTO "test_table_1" VALUES(1);""",
            """INSERT INTO "test_table_1" VALUES(2);""",
        ]
        all_views_sqls = [
            """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
            """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
        ]
        # Create database structure.
        for sql in [*all_table_sqls, *all_views_sqls]:
            self.cu.execute(sql)
        # %_table_% matches all tables.
        dump_sqls = list(self.cx.iterdump(filter="%_table_%"))
        self.assertEqual(
            dump_sqls,
            ["BEGIN TRANSACTION;", *all_table_sqls, "COMMIT;"],
        )
        # view_% matches all views.
        dump_sqls = list(self.cx.iterdump(filter="view_%"))
        self.assertEqual(
            dump_sqls,
            ["BEGIN TRANSACTION;", *all_views_sqls, "COMMIT;"],
        )
        # %_1 matches tables and views with the _1 suffix.
        dump_sqls = list(self.cx.iterdump(filter="%_1"))
        self.assertEqual(
            dump_sqls,
            [
                "BEGIN TRANSACTION;",
                """CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
                """INSERT INTO "test_table_1" VALUES(1);""",
                """INSERT INTO "test_table_1" VALUES(2);""",
                """CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
                "COMMIT;"
            ],
        )
        # some_% matches some_table_2.
        dump_sqls = list(self.cx.iterdump(filter="some_%"))
        self.assertEqual(
            dump_sqls,
            [
                "BEGIN TRANSACTION;",
                """CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
                """INSERT INTO "some_table_2" VALUES(3);""",
                """INSERT INTO "some_table_2" VALUES(4);""",
                "COMMIT;"
            ],
        )
        # Only single object.
        dump_sqls = list(self.cx.iterdump(filter="view_2"))
        self.assertEqual(
            dump_sqls,
            [
                "BEGIN TRANSACTION;",
                """CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
                "COMMIT;"
            ],
        )
        # % matches all objects.
        dump_sqls = list(self.cx.iterdump(filter="%"))
        self.assertEqual(
            dump_sqls,
            ["BEGIN TRANSACTION;", *all_table_sqls, *all_views_sqls, "COMMIT;"],
        )

    def test_dump_autoincrement(self):
        # Check how autoincrement tables are dumped: the generated id is
        # written out explicitly and the "sqlite_sequence" bookkeeping is
        # re-created at the end of the dump.
        expected = [
            'CREATE TABLE "t1" (id integer primary key autoincrement);',
            'INSERT INTO "t1" VALUES(NULL);',
            'CREATE TABLE "t2" (id integer primary key autoincrement);',
        ]
        self.cu.executescript("".join(expected))

        # The NULL value should now automatically be set to 1.
        expected[1] = expected[1].replace("NULL", "1")
        expected.insert(0, "BEGIN TRANSACTION;")
        # Only t1 received a row, so only t1 is expected to get a
        # sequence entry; t2 stays empty.
        expected.extend([
            'DELETE FROM "sqlite_sequence";',
            'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);',
            'COMMIT;',
        ])

        actual = list(self.cx.iterdump())
        self.assertEqual(expected, actual)

    def test_dump_autoincrement_create_new_db(self):
        # Replay a dump into a second database and check that the
        # autoincrement counters recorded in "sqlite_sequence" match the
        # number of rows inserted into each table of the source database.
        self.cu.execute("BEGIN TRANSACTION")
        self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)")
        self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)")
        self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9)))
        self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4)))
        self.cx.commit()

        with memory_database() as cx2:
            query = "".join(self.cx.iterdump())
            cx2.executescript(query)
            cu2 = cx2.cursor()

            # (table name, expected "seq" value) pairs: 9 rows went into
            # t1 and 4 rows into t2 above.
            dataset = (
                ("t1", 9),
                ("t2", 4),
            )
            for table, seq in dataset:
                with self.subTest(table=table, seq=seq):
                    res = cu2.execute("""
                        SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ?
                    """, (table,))
                    rows = res.fetchall()
                    self.assertEqual(rows[0][0], seq)

    def test_unorderable_row(self):
        # iterdump() should be able to cope with unorderable row types (issue #15545)
        class UnorderableRow:
            # Row wrapper that supports indexing but defines no ordering
            # methods.
            def __init__(self, cursor, row):
                self.row = row
            def __getitem__(self, index):
                return self.row[index]
        self.cx.row_factory = UnorderableRow
        CREATE_ALPHA = """CREATE TABLE "alpha" ("one");"""
        CREATE_BETA = """CREATE TABLE "beta" ("two");"""
        # "beta" is created first below, but the dump is expected to list
        # "alpha" before "beta".
        expected = [
            "BEGIN TRANSACTION;",
            CREATE_ALPHA,
            CREATE_BETA,
            "COMMIT;"
            ]
        self.cu.execute(CREATE_BETA)
        self.cu.execute(CREATE_ALPHA)
        got = list(self.cx.iterdump())
        self.assertEqual(expected, got)

    def test_dump_custom_row_factory(self):
        # gh-118221: iterdump should be able to cope with custom row factories.
        def dict_factory(cu, row):
            # Map each column name from the cursor description to the
            # corresponding value of the row.
            fields = [col[0] for col in cu.description]
            return dict(zip(fields, row))

        self.cx.row_factory = dict_factory
        CREATE_TABLE = "CREATE TABLE test(t);"
        expected = ["BEGIN TRANSACTION;", CREATE_TABLE, "COMMIT;"]

        self.cu.execute(CREATE_TABLE)
        actual = list(self.cx.iterdump())
        self.assertEqual(expected, actual)
        # Dumping must leave the connection's row factory untouched.
        self.assertEqual(self.cx.row_factory, dict_factory)

    def test_dump_virtual_tables(self):
        # gh-64662
        # A virtual table is expected to be dumped as a direct insert
        # into sqlite_master, bracketed by PRAGMA writable_schema=ON/OFF,
        # together with the CREATE statements of its backing tables.
        # NOTE(review): this needs an SQLite build that provides FTS4;
        # presumably the CREATE VIRTUAL TABLE below errors out otherwise
        # -- consider guarding the test with a skip.
        expected = [
            "BEGIN TRANSACTION;",
            "PRAGMA writable_schema=ON;",
            ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
             "VALUES('table','test','test',0,'CREATE VIRTUAL TABLE test USING fts4(example)');"),
            "CREATE TABLE 'test_content'(docid INTEGER PRIMARY KEY, 'c0example');",
            "CREATE TABLE 'test_docsize'(docid INTEGER PRIMARY KEY, size BLOB);",
            ("CREATE TABLE 'test_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,"
             "leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));"),
            "CREATE TABLE 'test_segments'(blockid INTEGER PRIMARY KEY, block BLOB);",
            "CREATE TABLE 'test_stat'(id INTEGER PRIMARY KEY, value BLOB);",
            "PRAGMA writable_schema=OFF;",
            "COMMIT;"
        ]
        self.cu.execute("CREATE VIRTUAL TABLE test USING fts4(example)")
        actual = list(self.cx.iterdump())
        self.assertEqual(expected, actual)
228
229
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
232