• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# pysqlite2/test/regression.py: pysqlite regression tests
2#
3# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
4#
5# This file is part of pysqlite.
6#
7# This software is provided 'as-is', without any express or implied
8# warranty.  In no event will the authors be held liable for any damages
9# arising from the use of this software.
10#
11# Permission is granted to anyone to use this software for any purpose,
12# including commercial applications, and to alter it and redistribute it
13# freely, subject to the following restrictions:
14#
15# 1. The origin of this software must not be misrepresented; you must not
16#    claim that you wrote the original software. If you use this software
17#    in a product, an acknowledgment in the product documentation would be
18#    appreciated but is not required.
19# 2. Altered source versions must be plainly marked as such, and must not be
20#    misrepresented as being the original software.
21# 3. This notice may not be removed or altered from any source distribution.
22
23import datetime
24import unittest
25import sqlite3 as sqlite
26import weakref
27import functools
28from test import support
29
30class RegressionTests(unittest.TestCase):
31    def setUp(self):
32        self.con = sqlite.connect(":memory:")
33
34    def tearDown(self):
35        self.con.close()
36
37    def test_pragma_user_version(self):
38        # This used to crash pysqlite because this pragma command returns NULL for the column name
39        cur = self.con.cursor()
40        cur.execute("pragma user_version")
41
42    def test_pragma_schema_version(self):
43        # This still crashed pysqlite <= 2.2.1
44        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
45        try:
46            cur = self.con.cursor()
47            cur.execute("pragma schema_version")
48        finally:
49            cur.close()
50            con.close()
51
52    def test_statement_reset(self):
53        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
54        # reset before a rollback, but only those that are still in the
55        # statement cache. The others are not accessible from the connection object.
56        con = sqlite.connect(":memory:", cached_statements=5)
57        cursors = [con.cursor() for x in range(5)]
58        cursors[0].execute("create table test(x)")
59        for i in range(10):
60            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
61
62        for i in range(5):
63            cursors[i].execute(" " * i + "select x from test")
64
65        con.rollback()
66
67    def test_column_name_with_spaces(self):
68        cur = self.con.cursor()
69        cur.execute('select 1 as "foo bar [datetime]"')
70        self.assertEqual(cur.description[0][0], "foo bar [datetime]")
71
72        cur.execute('select 1 as "foo baz"')
73        self.assertEqual(cur.description[0][0], "foo baz")
74
75    def test_statement_finalization_on_close_db(self):
76        # pysqlite versions <= 2.3.3 only finalized statements in the statement
77        # cache when closing the database. statements that were still
78        # referenced in cursors weren't closed and could provoke "
79        # "OperationalError: Unable to close due to unfinalised statements".
80        con = sqlite.connect(":memory:")
81        cursors = []
82        # default statement cache size is 100
83        for i in range(105):
84            cur = con.cursor()
85            cursors.append(cur)
86            cur.execute("select 1 x union select " + str(i))
87        con.close()
88
89    def test_on_conflict_rollback(self):
90        con = sqlite.connect(":memory:")
91        con.execute("create table foo(x, unique(x) on conflict rollback)")
92        con.execute("insert into foo(x) values (1)")
93        try:
94            con.execute("insert into foo(x) values (1)")
95        except sqlite.DatabaseError:
96            pass
97        con.execute("insert into foo(x) values (2)")
98        try:
99            con.commit()
100        except sqlite.OperationalError:
101            self.fail("pysqlite knew nothing about the implicit ROLLBACK")
102
103    def test_workaround_for_buggy_sqlite_transfer_bindings(self):
104        """
105        pysqlite would crash with older SQLite versions unless
106        a workaround is implemented.
107        """
108        self.con.execute("create table foo(bar)")
109        self.con.execute("drop table foo")
110        self.con.execute("create table foo(bar)")
111
112    def test_empty_statement(self):
113        """
114        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
115        for "no-operation" statements
116        """
117        self.con.execute("")
118
119    def test_type_map_usage(self):
120        """
121        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
122        a statement. This test exhibits the problem.
123        """
124        SELECT = "select * from foo"
125        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
126        con.execute("create table foo(bar timestamp)")
127        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
128        con.execute(SELECT).close()
129        con.execute("drop table foo")
130        con.execute("create table foo(bar integer)")
131        con.execute("insert into foo(bar) values (5)")
132        con.execute(SELECT).close()
133
134    def test_bind_mutating_list(self):
135        # Issue41662: Crash when mutate a list of parameters during iteration.
136        class X:
137            def __conform__(self, protocol):
138                parameters.clear()
139                return "..."
140        parameters = [X(), 0]
141        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
142        con.execute("create table foo(bar X, baz integer)")
143        # Should not crash
144        with self.assertRaises(IndexError):
145            con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
146
147    def test_error_msg_decode_error(self):
148        # When porting the module to Python 3.0, the error message about
149        # decoding errors disappeared. This verifies they're back again.
150        with self.assertRaises(sqlite.OperationalError) as cm:
151            self.con.execute("select 'xxx' || ? || 'yyy' colname",
152                             (bytes(bytearray([250])),)).fetchone()
153        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
154        self.assertIn(msg, str(cm.exception))
155
156    def test_register_adapter(self):
157        """
158        See issue 3312.
159        """
160        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
161
162    def test_set_isolation_level(self):
163        # See issue 27881.
164        class CustomStr(str):
165            def upper(self):
166                return None
167            def __del__(self):
168                con.isolation_level = ""
169
170        con = sqlite.connect(":memory:")
171        con.isolation_level = None
172        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
173            with self.subTest(level=level):
174                con.isolation_level = level
175                con.isolation_level = level.lower()
176                con.isolation_level = level.capitalize()
177                con.isolation_level = CustomStr(level)
178
179        # setting isolation_level failure should not alter previous state
180        con.isolation_level = None
181        con.isolation_level = "DEFERRED"
182        pairs = [
183            (1, TypeError), (b'', TypeError), ("abc", ValueError),
184            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
185        ]
186        for value, exc in pairs:
187            with self.subTest(level=value):
188                with self.assertRaises(exc):
189                    con.isolation_level = value
190                self.assertEqual(con.isolation_level, "DEFERRED")
191
192    def test_cursor_constructor_call_check(self):
193        """
194        Verifies that cursor methods check whether base class __init__ was
195        called.
196        """
197        class Cursor(sqlite.Cursor):
198            def __init__(self, con):
199                pass
200
201        con = sqlite.connect(":memory:")
202        cur = Cursor(con)
203        with self.assertRaises(sqlite.ProgrammingError):
204            cur.execute("select 4+5").fetchall()
205        with self.assertRaisesRegex(sqlite.ProgrammingError,
206                                    r'^Base Cursor\.__init__ not called\.$'):
207            cur.close()
208
209    def test_str_subclass(self):
210        """
211        The Python 3.0 port of the module didn't cope with values of subclasses of str.
212        """
213        class MyStr(str): pass
214        self.con.execute("select ?", (MyStr("abc"),))
215
216    def test_connection_constructor_call_check(self):
217        """
218        Verifies that connection methods check whether base class __init__ was
219        called.
220        """
221        class Connection(sqlite.Connection):
222            def __init__(self, name):
223                pass
224
225        con = Connection(":memory:")
226        with self.assertRaises(sqlite.ProgrammingError):
227            cur = con.cursor()
228
229    def test_cursor_registration(self):
230        """
231        Verifies that subclassed cursor classes are correctly registered with
232        the connection object, too.  (fetch-across-rollback problem)
233        """
234        class Connection(sqlite.Connection):
235            def cursor(self):
236                return Cursor(self)
237
238        class Cursor(sqlite.Cursor):
239            def __init__(self, con):
240                sqlite.Cursor.__init__(self, con)
241
242        con = Connection(":memory:")
243        cur = con.cursor()
244        cur.execute("create table foo(x)")
245        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
246        cur.execute("select x from foo")
247        con.rollback()
248        with self.assertRaises(sqlite.InterfaceError):
249            cur.fetchall()
250
251    def test_auto_commit(self):
252        """
253        Verifies that creating a connection in autocommit mode works.
254        2.5.3 introduced a regression so that these could no longer
255        be created.
256        """
257        con = sqlite.connect(":memory:", isolation_level=None)
258
259    def test_pragma_autocommit(self):
260        """
261        Verifies that running a PRAGMA statement that does an autocommit does
262        work. This did not work in 2.5.3/2.5.4.
263        """
264        cur = self.con.cursor()
265        cur.execute("create table foo(bar)")
266        cur.execute("insert into foo(bar) values (5)")
267
268        cur.execute("pragma page_size")
269        row = cur.fetchone()
270
271    def test_connection_call(self):
272        """
273        Call a connection with a non-string SQL request: check error handling
274        of the statement constructor.
275        """
276        self.assertRaises(TypeError, self.con, 1)
277
278    def test_collation(self):
279        def collation_cb(a, b):
280            return 1
281        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
282            # Lone surrogate cannot be encoded to the default encoding (utf8)
283            "\uDC80", collation_cb)
284
285    def test_recursive_cursor_use(self):
286        """
287        http://bugs.python.org/issue10811
288
289        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
290        Now we catch recursive cursor usage and raise a ProgrammingError.
291        """
292        con = sqlite.connect(":memory:")
293
294        cur = con.cursor()
295        cur.execute("create table a (bar)")
296        cur.execute("create table b (baz)")
297
298        def foo():
299            cur.execute("insert into a (bar) values (?)", (1,))
300            yield 1
301
302        with self.assertRaises(sqlite.ProgrammingError):
303            cur.executemany("insert into b (baz) values (?)",
304                            ((i,) for i in foo()))
305
306    def test_convert_timestamp_microsecond_padding(self):
307        """
308        http://bugs.python.org/issue14720
309
310        The microsecond parsing of convert_timestamp() should pad with zeros,
311        since the microsecond string "456" actually represents "456000".
312        """
313
314        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
315        cur = con.cursor()
316        cur.execute("CREATE TABLE t (x TIMESTAMP)")
317
318        # Microseconds should be 456000
319        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
320
321        # Microseconds should be truncated to 123456
322        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
323
324        cur.execute("SELECT * FROM t")
325        values = [x[0] for x in cur.fetchall()]
326
327        self.assertEqual(values, [
328            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
329            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
330        ])
331
332    def test_invalid_isolation_level_type(self):
333        # isolation level is a string, not an integer
334        self.assertRaises(TypeError,
335                          sqlite.connect, ":memory:", isolation_level=123)
336
337
338    def test_null_character(self):
339        # Issue #21147
340        con = sqlite.connect(":memory:")
341        self.assertRaises(ValueError, con, "\0select 1")
342        self.assertRaises(ValueError, con, "select 1\0")
343        cur = con.cursor()
344        self.assertRaises(ValueError, cur.execute, " \0select 2")
345        self.assertRaises(ValueError, cur.execute, "select 2\0")
346
347    def test_commit_cursor_reset(self):
348        """
349        Connection.commit() did reset cursors, which made sqlite3
350        to return rows multiple times when fetched from cursors
351        after commit. See issues 10513 and 23129 for details.
352        """
353        con = sqlite.connect(":memory:")
354        con.executescript("""
355        create table t(c);
356        create table t2(c);
357        insert into t values(0);
358        insert into t values(1);
359        insert into t values(2);
360        """)
361
362        self.assertEqual(con.isolation_level, "")
363
364        counter = 0
365        for i, row in enumerate(con.execute("select c from t")):
366            with self.subTest(i=i, row=row):
367                con.execute("insert into t2(c) values (?)", (i,))
368                con.commit()
369                if counter == 0:
370                    self.assertEqual(row[0], 0)
371                elif counter == 1:
372                    self.assertEqual(row[0], 1)
373                elif counter == 2:
374                    self.assertEqual(row[0], 2)
375                counter += 1
376        self.assertEqual(counter, 3, "should have returned exactly three rows")
377
378    def test_bpo31770(self):
379        """
380        The interpreter shouldn't crash in case Cursor.__init__() is called
381        more than once.
382        """
383        def callback(*args):
384            pass
385        con = sqlite.connect(":memory:")
386        cur = sqlite.Cursor(con)
387        ref = weakref.ref(cur, callback)
388        cur.__init__(con)
389        del cur
390        # The interpreter shouldn't crash when ref is collected.
391        del ref
392        support.gc_collect()
393
394    def test_del_isolation_level_segfault(self):
395        with self.assertRaises(AttributeError):
396            del self.con.isolation_level
397
398    def test_bpo37347(self):
399        class Printer:
400            def log(self, *args):
401                return sqlite.SQLITE_OK
402
403        for method in [self.con.set_trace_callback,
404                       functools.partial(self.con.set_progress_handler, n=1),
405                       self.con.set_authorizer]:
406            printer_instance = Printer()
407            method(printer_instance.log)
408            method(printer_instance.log)
409            self.con.execute("select 1")  # trigger seg fault
410            method(None)
411
412    def test_return_empty_bytestring(self):
413        cur = self.con.execute("select X''")
414        val = cur.fetchone()[0]
415        self.assertEqual(val, b'')
416
417
418def suite():
419    tests = [
420        RegressionTests
421    ]
422    return unittest.TestSuite(
423        [unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
424    )
425
426def test():
427    runner = unittest.TextTestRunner()
428    runner.run(suite())
429
430if __name__ == "__main__":
431    test()
432