• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# pysqlite2/test/regression.py: pysqlite regression tests
2#
3# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
4#
5# This file is part of pysqlite.
6#
7# This software is provided 'as-is', without any express or implied
8# warranty.  In no event will the authors be held liable for any damages
9# arising from the use of this software.
10#
11# Permission is granted to anyone to use this software for any purpose,
12# including commercial applications, and to alter it and redistribute it
13# freely, subject to the following restrictions:
14#
15# 1. The origin of this software must not be misrepresented; you must not
16#    claim that you wrote the original software. If you use this software
17#    in a product, an acknowledgment in the product documentation would be
18#    appreciated but is not required.
19# 2. Altered source versions must be plainly marked as such, and must not be
20#    misrepresented as being the original software.
21# 3. This notice may not be removed or altered from any source distribution.
22
23import datetime
24import unittest
25import sqlite3 as sqlite
26import weakref
27import functools
28
29from test import support
30from unittest.mock import patch
31
32from .util import memory_database, cx_limit
33from .util import MemoryDatabaseMixin
34
35
36class RegressionTests(MemoryDatabaseMixin, unittest.TestCase):
37
38    def test_pragma_user_version(self):
39        # This used to crash pysqlite because this pragma command returns NULL for the column name
40        cur = self.con.cursor()
41        cur.execute("pragma user_version")
42
43    def test_pragma_schema_version(self):
44        # This still crashed pysqlite <= 2.2.1
45        with memory_database(detect_types=sqlite.PARSE_COLNAMES) as con:
46            cur = self.con.cursor()
47            cur.execute("pragma schema_version")
48
49    def test_statement_reset(self):
50        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
51        # reset before a rollback, but only those that are still in the
52        # statement cache. The others are not accessible from the connection object.
53        with memory_database(cached_statements=5) as con:
54            cursors = [con.cursor() for x in range(5)]
55            cursors[0].execute("create table test(x)")
56            for i in range(10):
57                cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
58
59            for i in range(5):
60                cursors[i].execute(" " * i + "select x from test")
61
62            con.rollback()
63
64    def test_column_name_with_spaces(self):
65        cur = self.con.cursor()
66        cur.execute('select 1 as "foo bar [datetime]"')
67        self.assertEqual(cur.description[0][0], "foo bar [datetime]")
68
69        cur.execute('select 1 as "foo baz"')
70        self.assertEqual(cur.description[0][0], "foo baz")
71
72    def test_statement_finalization_on_close_db(self):
73        # pysqlite versions <= 2.3.3 only finalized statements in the statement
74        # cache when closing the database. statements that were still
75        # referenced in cursors weren't closed and could provoke "
76        # "OperationalError: Unable to close due to unfinalised statements".
77        cursors = []
78        # default statement cache size is 100
79        for i in range(105):
80            cur = self.con.cursor()
81            cursors.append(cur)
82            cur.execute("select 1 x union select " + str(i))
83
84    def test_on_conflict_rollback(self):
85        con = self.con
86        con.execute("create table foo(x, unique(x) on conflict rollback)")
87        con.execute("insert into foo(x) values (1)")
88        try:
89            con.execute("insert into foo(x) values (1)")
90        except sqlite.DatabaseError:
91            pass
92        con.execute("insert into foo(x) values (2)")
93        try:
94            con.commit()
95        except sqlite.OperationalError:
96            self.fail("pysqlite knew nothing about the implicit ROLLBACK")
97
98    def test_workaround_for_buggy_sqlite_transfer_bindings(self):
99        """
100        pysqlite would crash with older SQLite versions unless
101        a workaround is implemented.
102        """
103        self.con.execute("create table foo(bar)")
104        self.con.execute("drop table foo")
105        self.con.execute("create table foo(bar)")
106
107    def test_empty_statement(self):
108        """
109        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
110        for "no-operation" statements
111        """
112        self.con.execute("")
113
114    def test_type_map_usage(self):
115        """
116        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
117        a statement. This test exhibits the problem.
118        """
119        SELECT = "select * from foo"
120        with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
121            cur = con.cursor()
122            cur.execute("create table foo(bar timestamp)")
123            with self.assertWarnsRegex(DeprecationWarning, "adapter"):
124                cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
125            cur.execute(SELECT)
126            cur.execute("drop table foo")
127            cur.execute("create table foo(bar integer)")
128            cur.execute("insert into foo(bar) values (5)")
129            cur.execute(SELECT)
130
131    def test_bind_mutating_list(self):
132        # Issue41662: Crash when mutate a list of parameters during iteration.
133        class X:
134            def __conform__(self, protocol):
135                parameters.clear()
136                return "..."
137        parameters = [X(), 0]
138        with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
139            con.execute("create table foo(bar X, baz integer)")
140            # Should not crash
141            with self.assertRaises(IndexError):
142                con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
143
144    def test_error_msg_decode_error(self):
145        # When porting the module to Python 3.0, the error message about
146        # decoding errors disappeared. This verifies they're back again.
147        with self.assertRaises(sqlite.OperationalError) as cm:
148            self.con.execute("select 'xxx' || ? || 'yyy' colname",
149                             (bytes(bytearray([250])),)).fetchone()
150        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
151        self.assertIn(msg, str(cm.exception))
152
153    def test_register_adapter(self):
154        """
155        See issue 3312.
156        """
157        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
158
159    def test_set_isolation_level(self):
160        # See issue 27881.
161        class CustomStr(str):
162            def upper(self):
163                return None
164            def __del__(self):
165                con.isolation_level = ""
166
167        con = self.con
168        con.isolation_level = None
169        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
170            with self.subTest(level=level):
171                con.isolation_level = level
172                con.isolation_level = level.lower()
173                con.isolation_level = level.capitalize()
174                con.isolation_level = CustomStr(level)
175
176        # setting isolation_level failure should not alter previous state
177        con.isolation_level = None
178        con.isolation_level = "DEFERRED"
179        pairs = [
180            (1, TypeError), (b'', TypeError), ("abc", ValueError),
181            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
182        ]
183        for value, exc in pairs:
184            with self.subTest(level=value):
185                with self.assertRaises(exc):
186                    con.isolation_level = value
187                self.assertEqual(con.isolation_level, "DEFERRED")
188
189    def test_cursor_constructor_call_check(self):
190        """
191        Verifies that cursor methods check whether base class __init__ was
192        called.
193        """
194        class Cursor(sqlite.Cursor):
195            def __init__(self, con):
196                pass
197
198        cur = Cursor(self.con)
199        with self.assertRaises(sqlite.ProgrammingError):
200            cur.execute("select 4+5").fetchall()
201        with self.assertRaisesRegex(sqlite.ProgrammingError,
202                                    r'^Base Cursor\.__init__ not called\.$'):
203            cur.close()
204
205    def test_str_subclass(self):
206        """
207        The Python 3.0 port of the module didn't cope with values of subclasses of str.
208        """
209        class MyStr(str): pass
210        self.con.execute("select ?", (MyStr("abc"),))
211
212    def test_connection_constructor_call_check(self):
213        """
214        Verifies that connection methods check whether base class __init__ was
215        called.
216        """
217        class Connection(sqlite.Connection):
218            def __init__(self, name):
219                pass
220
221        con = Connection(":memory:")
222        with self.assertRaises(sqlite.ProgrammingError):
223            cur = con.cursor()
224
225    def test_auto_commit(self):
226        """
227        Verifies that creating a connection in autocommit mode works.
228        2.5.3 introduced a regression so that these could no longer
229        be created.
230        """
231        with memory_database(isolation_level=None) as con:
232            self.assertIsNone(con.isolation_level)
233            self.assertFalse(con.in_transaction)
234
235    def test_pragma_autocommit(self):
236        """
237        Verifies that running a PRAGMA statement that does an autocommit does
238        work. This did not work in 2.5.3/2.5.4.
239        """
240        cur = self.con.cursor()
241        cur.execute("create table foo(bar)")
242        cur.execute("insert into foo(bar) values (5)")
243
244        cur.execute("pragma page_size")
245        row = cur.fetchone()
246
247    def test_connection_call(self):
248        """
249        Call a connection with a non-string SQL request: check error handling
250        of the statement constructor.
251        """
252        self.assertRaises(TypeError, self.con, b"select 1")
253
254    def test_collation(self):
255        def collation_cb(a, b):
256            return 1
257        self.assertRaises(UnicodeEncodeError, self.con.create_collation,
258            # Lone surrogate cannot be encoded to the default encoding (utf8)
259            "\uDC80", collation_cb)
260
261    def test_recursive_cursor_use(self):
262        """
263        http://bugs.python.org/issue10811
264
265        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
266        Now we catch recursive cursor usage and raise a ProgrammingError.
267        """
268        cur = self.con.cursor()
269        cur.execute("create table a (bar)")
270        cur.execute("create table b (baz)")
271
272        def foo():
273            cur.execute("insert into a (bar) values (?)", (1,))
274            yield 1
275
276        with self.assertRaises(sqlite.ProgrammingError):
277            cur.executemany("insert into b (baz) values (?)",
278                            ((i,) for i in foo()))
279
280    def test_convert_timestamp_microsecond_padding(self):
281        """
282        http://bugs.python.org/issue14720
283
284        The microsecond parsing of convert_timestamp() should pad with zeros,
285        since the microsecond string "456" actually represents "456000".
286        """
287
288        with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
289            cur = con.cursor()
290            cur.execute("CREATE TABLE t (x TIMESTAMP)")
291
292            # Microseconds should be 456000
293            cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
294
295            # Microseconds should be truncated to 123456
296            cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
297
298            cur.execute("SELECT * FROM t")
299            with self.assertWarnsRegex(DeprecationWarning, "converter"):
300                values = [x[0] for x in cur.fetchall()]
301
302            self.assertEqual(values, [
303                datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
304                datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
305            ])
306
307    def test_invalid_isolation_level_type(self):
308        # isolation level is a string, not an integer
309        regex = "isolation_level must be str or None"
310        with self.assertRaisesRegex(TypeError, regex):
311            memory_database(isolation_level=123).__enter__()
312
313
314    def test_null_character(self):
315        # Issue #21147
316        cur = self.con.cursor()
317        queries = ["\0select 1", "select 1\0"]
318        for query in queries:
319            with self.subTest(query=query):
320                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
321                                       self.con.execute, query)
322            with self.subTest(query=query):
323                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
324                                       cur.execute, query)
325
326    def test_surrogates(self):
327        con = self.con
328        self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
329        self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
330        cur = con.cursor()
331        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
332        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")
333
334    def test_large_sql(self):
335        msg = "query string is too large"
336        with memory_database() as cx, cx_limit(cx) as lim:
337            cu = cx.cursor()
338
339            cx("select 1".ljust(lim))
340            # use a different SQL statement; don't reuse from the LRU cache
341            cu.execute("select 2".ljust(lim))
342
343            sql = "select 3".ljust(lim+1)
344            self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
345            self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)
346
347    def test_commit_cursor_reset(self):
348        """
349        Connection.commit() did reset cursors, which made sqlite3
350        to return rows multiple times when fetched from cursors
351        after commit. See issues 10513 and 23129 for details.
352        """
353        con = self.con
354        con.executescript("""
355        create table t(c);
356        create table t2(c);
357        insert into t values(0);
358        insert into t values(1);
359        insert into t values(2);
360        """)
361
362        self.assertEqual(con.isolation_level, "")
363
364        counter = 0
365        for i, row in enumerate(con.execute("select c from t")):
366            with self.subTest(i=i, row=row):
367                con.execute("insert into t2(c) values (?)", (i,))
368                con.commit()
369                if counter == 0:
370                    self.assertEqual(row[0], 0)
371                elif counter == 1:
372                    self.assertEqual(row[0], 1)
373                elif counter == 2:
374                    self.assertEqual(row[0], 2)
375                counter += 1
376        self.assertEqual(counter, 3, "should have returned exactly three rows")
377
378    def test_bpo31770(self):
379        """
380        The interpreter shouldn't crash in case Cursor.__init__() is called
381        more than once.
382        """
383        def callback(*args):
384            pass
385        cur = sqlite.Cursor(self.con)
386        ref = weakref.ref(cur, callback)
387        cur.__init__(self.con)
388        del cur
389        # The interpreter shouldn't crash when ref is collected.
390        del ref
391        support.gc_collect()
392
393    def test_del_isolation_level_segfault(self):
394        with self.assertRaises(AttributeError):
395            del self.con.isolation_level
396
397    def test_bpo37347(self):
398        class Printer:
399            def log(self, *args):
400                return sqlite.SQLITE_OK
401
402        for method in [self.con.set_trace_callback,
403                       functools.partial(self.con.set_progress_handler, n=1),
404                       self.con.set_authorizer]:
405            printer_instance = Printer()
406            method(printer_instance.log)
407            method(printer_instance.log)
408            self.con.execute("select 1")  # trigger seg fault
409            method(None)
410
411    def test_return_empty_bytestring(self):
412        cur = self.con.execute("select X''")
413        val = cur.fetchone()[0]
414        self.assertEqual(val, b'')
415
416    def test_table_lock_cursor_replace_stmt(self):
417        with memory_database() as con:
418            con = self.con
419            cur = con.cursor()
420            cur.execute("create table t(t)")
421            cur.executemany("insert into t values(?)",
422                            ((v,) for v in range(5)))
423            con.commit()
424            cur.execute("select t from t")
425            cur.execute("drop table t")
426            con.commit()
427
428    def test_table_lock_cursor_dealloc(self):
429        with memory_database() as con:
430            con.execute("create table t(t)")
431            con.executemany("insert into t values(?)",
432                            ((v,) for v in range(5)))
433            con.commit()
434            cur = con.execute("select t from t")
435            del cur
436            support.gc_collect()
437            con.execute("drop table t")
438            con.commit()
439
440    def test_table_lock_cursor_non_readonly_select(self):
441        with memory_database() as con:
442            con.execute("create table t(t)")
443            con.executemany("insert into t values(?)",
444                            ((v,) for v in range(5)))
445            con.commit()
446            def dup(v):
447                con.execute("insert into t values(?)", (v,))
448                return
449            con.create_function("dup", 1, dup)
450            cur = con.execute("select dup(t) from t")
451            del cur
452            support.gc_collect()
453            con.execute("drop table t")
454            con.commit()
455
456    def test_executescript_step_through_select(self):
457        with memory_database() as con:
458            values = [(v,) for v in range(5)]
459            with con:
460                con.execute("create table t(t)")
461                con.executemany("insert into t values(?)", values)
462            steps = []
463            con.create_function("step", 1, lambda x: steps.append((x,)))
464            con.executescript("select step(t) from t")
465            self.assertEqual(steps, values)
466
467
468class RecursiveUseOfCursors(unittest.TestCase):
469    # GH-80254: sqlite3 should not segfault for recursive use of cursors.
470    msg = "Recursive use of cursors not allowed"
471
472    def setUp(self):
473        self.con = sqlite.connect(":memory:",
474                                  detect_types=sqlite.PARSE_COLNAMES)
475        self.cur = self.con.cursor()
476        self.cur.execute("create table test(x foo)")
477        self.cur.executemany("insert into test(x) values (?)",
478                             [("foo",), ("bar",)])
479
480    def tearDown(self):
481        self.cur.close()
482        self.con.close()
483
484    def test_recursive_cursor_init(self):
485        conv = lambda x: self.cur.__init__(self.con)
486        with patch.dict(sqlite.converters, {"INIT": conv}):
487            self.cur.execute('select x as "x [INIT]", x from test')
488            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
489                                   self.cur.fetchall)
490
491    def test_recursive_cursor_close(self):
492        conv = lambda x: self.cur.close()
493        with patch.dict(sqlite.converters, {"CLOSE": conv}):
494            self.cur.execute('select x as "x [CLOSE]", x from test')
495            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
496                                   self.cur.fetchall)
497
498    def test_recursive_cursor_iter(self):
499        conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
500        with patch.dict(sqlite.converters, {"ITER": conv}):
501            self.cur.execute('select x as "x [ITER]", x from test')
502            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
503                                   self.cur.fetchall)
504
505
506if __name__ == "__main__":
507    unittest.main()
508