# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

"""Regression tests for the sqlite3 (pysqlite) module."""

import datetime
import unittest
import sqlite3 as sqlite
import weakref
import functools
from test import support


class RegressionTests(unittest.TestCase):
    """Regression tests, each guarding against a previously reported bug.

    ``setUp`` opens a shared in-memory connection as ``self.con`` and
    ``tearDown`` closes it.  Tests that need special connection options
    open their own connection and register its ``close`` with
    ``addCleanup`` so it is released even when the test fails.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def test_pragma_user_version(self):
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name.
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def test_pragma_schema_version(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.addCleanup(con.close)
        # The cursor must come from the PARSE_COLNAMES connection created
        # above, not from self.con; otherwise that connection is never
        # exercised.  Cleanups run in LIFO order, so the cursor is closed
        # before its connection.
        cur = con.cursor()
        self.addCleanup(cur.close)
        cur.execute("pragma schema_version")

    def test_statement_reset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements
        # are reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # Each cursor gets a textually distinct statement (a different
        # number of leading spaces).
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def test_column_name_with_spaces(self):
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar [datetime]")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def test_statement_finalization_on_close_db(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the
        # statement cache when closing the database. Statements that were
        # still referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        # Closing while the cursors are still referenced is the behaviour
        # under test, so it is done inline rather than via addCleanup.
        con.close()

    def test_on_conflict_rollback(self):
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        # The duplicate insert must hit the unique constraint; asserting the
        # error keeps the test from passing vacuously when no conflict (and
        # therefore no implicit ROLLBACK) occurs.
        with self.assertRaises(sqlite.DatabaseError):
            con.execute("insert into foo(x) values (1)")
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def test_workaround_for_buggy_sqlite_transfer_bindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def test_empty_statement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def test_type_map_usage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)",
                    (datetime.datetime.now(),))
        con.execute(SELECT).close()
        # Same SQL text, but the column's declared type changes underneath.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT).close()

    def test_bind_mutating_list(self):
        # Issue41662: Crash when mutate a list of parameters during iteration.
        class X:
            def __conform__(self, protocol):
                # Empties the parameter list while it is being bound.
                parameters.clear()
                return "..."
        parameters = [X(), 0]
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar X, baz integer)")
        # Should not crash
        with self.assertRaises(IndexError):
            con.execute("insert into foo(bar, baz) values (?, ?)", parameters)

    def test_error_msg_decode_error(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def test_register_adapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def test_set_isolation_level(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None

            def __del__(self):
                # Re-enters the isolation_level setter from a destructor.
                con.isolation_level = ""

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def test_cursor_constructor_call_check(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def test_str_subclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str):
            pass
        self.con.execute("select ?", (MyStr("abc"),))

    def test_connection_constructor_call_check(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                pass

        # No close() is registered here: the base __init__ was deliberately
        # never called on this object.
        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def test_cursor_registration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def test_auto_commit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        con.close()

    def test_pragma_autocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        cur.fetchone()

    def test_connection_call(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(TypeError, self.con, 1)

    def test_collation(self):
        def collation_cb(a, b):
            return 1
        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def test_recursive_cursor_use(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Uses the same cursor that is consuming this generator.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def test_convert_timestamp_microsecond_padding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def test_invalid_isolation_level_type(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def test_null_character(self):
        # Issue #21147
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def test_commit_cursor_reset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                # The first three rows must come back as 0, 1, 2.  Any
                # surplus row is caught by the final count check below.
                if counter < 3:
                    self.assertEqual(row[0], counter)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def test_bpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()

    def test_del_isolation_level_segfault(self):
        with self.assertRaises(AttributeError):
            del self.con.isolation_level

    def test_bpo37347(self):
        class Printer:
            def log(self, *args):
                return sqlite.SQLITE_OK

        for method in [self.con.set_trace_callback,
                       functools.partial(self.con.set_progress_handler, n=1),
                       self.con.set_authorizer]:
            printer_instance = Printer()
            # Register the same bound method twice, then run a statement.
            method(printer_instance.log)
            method(printer_instance.log)
            self.con.execute("select 1")  # trigger seg fault
            method(None)

    def test_return_empty_bytestring(self):
        cur = self.con.execute("select X''")
        val = cur.fetchone()[0]
        self.assertEqual(val, b'')


def suite():
    """Return a ``unittest.TestSuite`` with every test case of this module."""
    tests = [
        RegressionTests
    ]
    return unittest.TestSuite(
        [unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
    )


def test():
    """Run the module's suite with a ``unittest.TextTestRunner``."""
    runner = unittest.TextTestRunner()
    runner.run(suite())


if __name__ == "__main__":
    test()