• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/regression.py: pysqlite regression tests
3#
4# Copyright (C) 2006-2010 Gerhard H�ring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import datetime
25import unittest
26import sqlite3 as sqlite
27import weakref
28import functools
29from test import support
30
31class RegressionTests(unittest.TestCase):
32    def setUp(self):
33        self.con = sqlite.connect(":memory:")
34
35    def tearDown(self):
36        self.con.close()
37
38    def CheckPragmaUserVersion(self):
39        # This used to crash pysqlite because this pragma command returns NULL for the column name
40        cur = self.con.cursor()
41        cur.execute("pragma user_version")
42
43    def CheckPragmaSchemaVersion(self):
44        # This still crashed pysqlite <= 2.2.1
45        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
46        try:
47            cur = self.con.cursor()
48            cur.execute("pragma schema_version")
49        finally:
50            cur.close()
51            con.close()
52
53    def CheckStatementReset(self):
54        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
55        # reset before a rollback, but only those that are still in the
56        # statement cache. The others are not accessible from the connection object.
57        con = sqlite.connect(":memory:", cached_statements=5)
58        cursors = [con.cursor() for x in range(5)]
59        cursors[0].execute("create table test(x)")
60        for i in range(10):
61            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
62
63        for i in range(5):
64            cursors[i].execute(" " * i + "select x from test")
65
66        con.rollback()
67
68    def CheckColumnNameWithSpaces(self):
69        cur = self.con.cursor()
70        cur.execute('select 1 as "foo bar [datetime]"')
71        self.assertEqual(cur.description[0][0], "foo bar")
72
73        cur.execute('select 1 as "foo baz"')
74        self.assertEqual(cur.description[0][0], "foo baz")
75
76    def CheckStatementFinalizationOnCloseDb(self):
77        # pysqlite versions <= 2.3.3 only finalized statements in the statement
78        # cache when closing the database. statements that were still
79        # referenced in cursors weren't closed and could provoke "
80        # "OperationalError: Unable to close due to unfinalised statements".
81        con = sqlite.connect(":memory:")
82        cursors = []
83        # default statement cache size is 100
84        for i in range(105):
85            cur = con.cursor()
86            cursors.append(cur)
87            cur.execute("select 1 x union select " + str(i))
88        con.close()
89
90    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
91    def CheckOnConflictRollback(self):
92        con = sqlite.connect(":memory:")
93        con.execute("create table foo(x, unique(x) on conflict rollback)")
94        con.execute("insert into foo(x) values (1)")
95        try:
96            con.execute("insert into foo(x) values (1)")
97        except sqlite.DatabaseError:
98            pass
99        con.execute("insert into foo(x) values (2)")
100        try:
101            con.commit()
102        except sqlite.OperationalError:
103            self.fail("pysqlite knew nothing about the implicit ROLLBACK")
104
105    def CheckWorkaroundForBuggySqliteTransferBindings(self):
106        """
107        pysqlite would crash with older SQLite versions unless
108        a workaround is implemented.
109        """
110        self.con.execute("create table foo(bar)")
111        self.con.execute("drop table foo")
112        self.con.execute("create table foo(bar)")
113
114    def CheckEmptyStatement(self):
115        """
116        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
117        for "no-operation" statements
118        """
119        self.con.execute("")
120
121    def CheckTypeMapUsage(self):
122        """
123        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
124        a statement. This test exhibits the problem.
125        """
126        SELECT = "select * from foo"
127        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
128        con.execute("create table foo(bar timestamp)")
129        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
130        con.execute(SELECT)
131        con.execute("drop table foo")
132        con.execute("create table foo(bar integer)")
133        con.execute("insert into foo(bar) values (5)")
134        con.execute(SELECT)
135
136    def CheckErrorMsgDecodeError(self):
137        # When porting the module to Python 3.0, the error message about
138        # decoding errors disappeared. This verifies they're back again.
139        with self.assertRaises(sqlite.OperationalError) as cm:
140            self.con.execute("select 'xxx' || ? || 'yyy' colname",
141                             (bytes(bytearray([250])),)).fetchone()
142        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
143        self.assertIn(msg, str(cm.exception))
144
145    def CheckRegisterAdapter(self):
146        """
147        See issue 3312.
148        """
149        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
150
151    def CheckSetIsolationLevel(self):
152        # See issue 27881.
153        class CustomStr(str):
154            def upper(self):
155                return None
156            def __del__(self):
157                con.isolation_level = ""
158
159        con = sqlite.connect(":memory:")
160        con.isolation_level = None
161        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
162            with self.subTest(level=level):
163                con.isolation_level = level
164                con.isolation_level = level.lower()
165                con.isolation_level = level.capitalize()
166                con.isolation_level = CustomStr(level)
167
168        # setting isolation_level failure should not alter previous state
169        con.isolation_level = None
170        con.isolation_level = "DEFERRED"
171        pairs = [
172            (1, TypeError), (b'', TypeError), ("abc", ValueError),
173            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
174        ]
175        for value, exc in pairs:
176            with self.subTest(level=value):
177                with self.assertRaises(exc):
178                    con.isolation_level = value
179                self.assertEqual(con.isolation_level, "DEFERRED")
180
181    def CheckCursorConstructorCallCheck(self):
182        """
183        Verifies that cursor methods check whether base class __init__ was
184        called.
185        """
186        class Cursor(sqlite.Cursor):
187            def __init__(self, con):
188                pass
189
190        con = sqlite.connect(":memory:")
191        cur = Cursor(con)
192        with self.assertRaises(sqlite.ProgrammingError):
193            cur.execute("select 4+5").fetchall()
194        with self.assertRaisesRegex(sqlite.ProgrammingError,
195                                    r'^Base Cursor\.__init__ not called\.$'):
196            cur.close()
197
198    def CheckStrSubclass(self):
199        """
200        The Python 3.0 port of the module didn't cope with values of subclasses of str.
201        """
202        class MyStr(str): pass
203        self.con.execute("select ?", (MyStr("abc"),))
204
205    def CheckConnectionConstructorCallCheck(self):
206        """
207        Verifies that connection methods check whether base class __init__ was
208        called.
209        """
210        class Connection(sqlite.Connection):
211            def __init__(self, name):
212                pass
213
214        con = Connection(":memory:")
215        with self.assertRaises(sqlite.ProgrammingError):
216            cur = con.cursor()
217
218    def CheckCursorRegistration(self):
219        """
220        Verifies that subclassed cursor classes are correctly registered with
221        the connection object, too.  (fetch-across-rollback problem)
222        """
223        class Connection(sqlite.Connection):
224            def cursor(self):
225                return Cursor(self)
226
227        class Cursor(sqlite.Cursor):
228            def __init__(self, con):
229                sqlite.Cursor.__init__(self, con)
230
231        con = Connection(":memory:")
232        cur = con.cursor()
233        cur.execute("create table foo(x)")
234        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
235        cur.execute("select x from foo")
236        con.rollback()
237        with self.assertRaises(sqlite.InterfaceError):
238            cur.fetchall()
239
240    def CheckAutoCommit(self):
241        """
242        Verifies that creating a connection in autocommit mode works.
243        2.5.3 introduced a regression so that these could no longer
244        be created.
245        """
246        con = sqlite.connect(":memory:", isolation_level=None)
247
248    def CheckPragmaAutocommit(self):
249        """
250        Verifies that running a PRAGMA statement that does an autocommit does
251        work. This did not work in 2.5.3/2.5.4.
252        """
253        cur = self.con.cursor()
254        cur.execute("create table foo(bar)")
255        cur.execute("insert into foo(bar) values (5)")
256
257        cur.execute("pragma page_size")
258        row = cur.fetchone()
259
260    def CheckConnectionCall(self):
261        """
262        Call a connection with a non-string SQL request: check error handling
263        of the statement constructor.
264        """
265        self.assertRaises(sqlite.Warning, self.con, 1)
266
267    def CheckCollation(self):
268        def collation_cb(a, b):
269            return 1
270        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
271            # Lone surrogate cannot be encoded to the default encoding (utf8)
272            "\uDC80", collation_cb)
273
274    def CheckRecursiveCursorUse(self):
275        """
276        http://bugs.python.org/issue10811
277
278        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
279        Now we catch recursive cursor usage and raise a ProgrammingError.
280        """
281        con = sqlite.connect(":memory:")
282
283        cur = con.cursor()
284        cur.execute("create table a (bar)")
285        cur.execute("create table b (baz)")
286
287        def foo():
288            cur.execute("insert into a (bar) values (?)", (1,))
289            yield 1
290
291        with self.assertRaises(sqlite.ProgrammingError):
292            cur.executemany("insert into b (baz) values (?)",
293                            ((i,) for i in foo()))
294
295    def CheckConvertTimestampMicrosecondPadding(self):
296        """
297        http://bugs.python.org/issue14720
298
299        The microsecond parsing of convert_timestamp() should pad with zeros,
300        since the microsecond string "456" actually represents "456000".
301        """
302
303        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
304        cur = con.cursor()
305        cur.execute("CREATE TABLE t (x TIMESTAMP)")
306
307        # Microseconds should be 456000
308        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
309
310        # Microseconds should be truncated to 123456
311        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
312
313        cur.execute("SELECT * FROM t")
314        values = [x[0] for x in cur.fetchall()]
315
316        self.assertEqual(values, [
317            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
318            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
319        ])
320
321    def CheckInvalidIsolationLevelType(self):
322        # isolation level is a string, not an integer
323        self.assertRaises(TypeError,
324                          sqlite.connect, ":memory:", isolation_level=123)
325
326
327    def CheckNullCharacter(self):
328        # Issue #21147
329        con = sqlite.connect(":memory:")
330        self.assertRaises(ValueError, con, "\0select 1")
331        self.assertRaises(ValueError, con, "select 1\0")
332        cur = con.cursor()
333        self.assertRaises(ValueError, cur.execute, " \0select 2")
334        self.assertRaises(ValueError, cur.execute, "select 2\0")
335
336    def CheckCommitCursorReset(self):
337        """
338        Connection.commit() did reset cursors, which made sqlite3
339        to return rows multiple times when fetched from cursors
340        after commit. See issues 10513 and 23129 for details.
341        """
342        con = sqlite.connect(":memory:")
343        con.executescript("""
344        create table t(c);
345        create table t2(c);
346        insert into t values(0);
347        insert into t values(1);
348        insert into t values(2);
349        """)
350
351        self.assertEqual(con.isolation_level, "")
352
353        counter = 0
354        for i, row in enumerate(con.execute("select c from t")):
355            with self.subTest(i=i, row=row):
356                con.execute("insert into t2(c) values (?)", (i,))
357                con.commit()
358                if counter == 0:
359                    self.assertEqual(row[0], 0)
360                elif counter == 1:
361                    self.assertEqual(row[0], 1)
362                elif counter == 2:
363                    self.assertEqual(row[0], 2)
364                counter += 1
365        self.assertEqual(counter, 3, "should have returned exactly three rows")
366
367    def CheckBpo31770(self):
368        """
369        The interpreter shouldn't crash in case Cursor.__init__() is called
370        more than once.
371        """
372        def callback(*args):
373            pass
374        con = sqlite.connect(":memory:")
375        cur = sqlite.Cursor(con)
376        ref = weakref.ref(cur, callback)
377        cur.__init__(con)
378        del cur
379        # The interpreter shouldn't crash when ref is collected.
380        del ref
381        support.gc_collect()
382
383    def CheckDelIsolation_levelSegfault(self):
384        with self.assertRaises(AttributeError):
385            del self.con.isolation_level
386
387    def CheckBpo37347(self):
388        class Printer:
389            def log(self, *args):
390                return sqlite.SQLITE_OK
391
392        for method in [self.con.set_trace_callback,
393                       functools.partial(self.con.set_progress_handler, n=1),
394                       self.con.set_authorizer]:
395            printer_instance = Printer()
396            method(printer_instance.log)
397            method(printer_instance.log)
398            self.con.execute("select 1")  # trigger seg fault
399            method(None)
400
401
402
403def suite():
404    regression_suite = unittest.makeSuite(RegressionTests, "Check")
405    return unittest.TestSuite((
406        regression_suite,
407    ))
408
409def test():
410    runner = unittest.TextTestRunner()
411    runner.run(suite())
412
413if __name__ == "__main__":
414    test()
415