# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import datetime
import unittest
import sqlite3 as sqlite
import weakref
import functools

from test import support
from unittest.mock import patch

from .util import memory_database, cx_limit
from .util import MemoryDatabaseMixin


# NOTE: explanatory text on tests that had no docstring is deliberately written
# as comments rather than docstrings, because unittest uses the first docstring
# line as the test description in verbose output.
class RegressionTests(MemoryDatabaseMixin, unittest.TestCase):
    # Regression tests for historical pysqlite / sqlite3 bugs.
    #
    # Tests use either ``self.con`` (presumably provided by
    # MemoryDatabaseMixin -- confirm in .util) or a dedicated connection
    # obtained from ``memory_database(...)`` when specific connection
    # options are required.

    def test_pragma_user_version(self):
        # This used to crash pysqlite because this pragma command returns NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def test_pragma_schema_version(self):
        # This still crashed pysqlite <= 2.2.1
        with memory_database(detect_types=sqlite.PARSE_COLNAMES) as con:
            # Use the PARSE_COLNAMES connection created above; running the
            # pragma on self.con would leave that connection untested.
            cur = con.cursor()
            cur.execute("pragma schema_version")

    def test_statement_reset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        with memory_database(cached_statements=5) as con:
            cursors = [con.cursor() for x in range(5)]
            cursors[0].execute("create table test(x)")
            for i in range(10):
                cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])

            # Leading spaces make each SQL string distinct, so every cursor
            # holds a different statement.
            for i in range(5):
                cursors[i].execute(" " * i + "select x from test")

            con.rollback()

    def test_column_name_with_spaces(self):
        # Column aliases containing spaces (and a bracketed suffix) must be
        # reported verbatim in cursor.description.
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar [datetime]")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def test_statement_finalization_on_close_db(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = self.con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        # NOTE(review): nothing closes the connection here; presumably the
        # mixin's teardown does so while the cursors are still alive -- confirm.

    def test_on_conflict_rollback(self):
        con = self.con
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        # The duplicate insert violates the unique constraint, which is
        # declared ON CONFLICT ROLLBACK.
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def test_workaround_for_buggy_sqlite_transfer_bindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def test_empty_statement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def test_type_map_usage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
            cur = con.cursor()
            cur.execute("create table foo(bar timestamp)")
            with self.assertWarnsRegex(DeprecationWarning, "adapter"):
                cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
            cur.execute(SELECT)
            # Recreate the table with a different declared column type and
            # run the very same SQL text again.
            cur.execute("drop table foo")
            cur.execute("create table foo(bar integer)")
            cur.execute("insert into foo(bar) values (5)")
            cur.execute(SELECT)

    def test_bind_mutating_list(self):
        # Issue41662: Crash when mutate a list of parameters during iteration.
        class X:
            def __conform__(self, protocol):
                # Empties the parameter list while it is being bound.
                parameters.clear()
                return "..."
        parameters = [X(), 0]
        with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
            con.execute("create table foo(bar X, baz integer)")
            # Should not crash
            with self.assertRaises(IndexError):
                con.execute("insert into foo(bar, baz) values (?, ?)", parameters)

    def test_error_msg_decode_error(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def test_register_adapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def test_set_isolation_level(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None

            def __del__(self):
                # Re-enters the isolation_level setter when the string is
                # destroyed; ``con`` is the closure variable bound below.
                con.isolation_level = ""

        con = self.con
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def test_cursor_constructor_call_check(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skip sqlite.Cursor.__init__.
                pass

        cur = Cursor(self.con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def test_str_subclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str): pass
        self.con.execute("select ?", (MyStr("abc"),))

    def test_connection_constructor_call_check(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skip sqlite.Connection.__init__.
                pass

        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def test_auto_commit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        with memory_database(isolation_level=None) as con:
            self.assertIsNone(con.isolation_level)
            self.assertFalse(con.in_transaction)

    def test_pragma_autocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only checks that fetching the pragma result does not raise.
        cur.fetchone()

    def test_connection_call(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(TypeError, self.con, b"select 1")

    def test_collation(self):
        def collation_cb(a, b):
            return 1
        self.assertRaises(UnicodeEncodeError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def test_recursive_cursor_use(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        cur = self.con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Re-uses ``cur`` while executemany() below is consuming this
            # generator.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def test_convert_timestamp_microsecond_padding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
            cur = con.cursor()
            cur.execute("CREATE TABLE t (x TIMESTAMP)")

            # Microseconds should be 456000
            cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

            # Microseconds should be truncated to 123456
            cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

            cur.execute("SELECT * FROM t")
            with self.assertWarnsRegex(DeprecationWarning, "converter"):
                values = [x[0] for x in cur.fetchall()]

            self.assertEqual(values, [
                datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
                datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
            ])

    def test_invalid_isolation_level_type(self):
        # isolation level is a string, not an integer
        regex = "isolation_level must be str or None"
        with self.assertRaisesRegex(TypeError, regex):
            # Entering via ``with`` (rather than a bare __enter__() call)
            # ensures the context manager is exited again should the
            # connection unexpectedly be created.
            with memory_database(isolation_level=123):
                pass

    def test_null_character(self):
        # Issue #21147
        cur = self.con.cursor()
        queries = ["\0select 1", "select 1\0"]
        for query in queries:
            # Checked once through the connection and once through a cursor.
            with self.subTest(query=query):
                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
                                       self.con.execute, query)
            with self.subTest(query=query):
                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
                                       cur.execute, query)

    def test_surrogates(self):
        # SQL text containing a lone surrogate must be rejected with
        # UnicodeEncodeError, both when calling the connection and when
        # executing through a cursor.
        con = self.con
        self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
        self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
        cur = con.cursor()
        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")

    def test_large_sql(self):
        msg = "query string is too large"
        # NOTE(review): cx_limit presumably lowers the connection's SQL length
        # limit and yields that limit -- confirm in .util.
        with memory_database() as cx, cx_limit(cx) as lim:
            cu = cx.cursor()

            # Statements padded to exactly ``lim`` characters are accepted.
            cx("select 1".ljust(lim))
            # use a different SQL statement; don't reuse from the LRU cache
            cu.execute("select 2".ljust(lim))

            # One character more than ``lim`` must be rejected.
            sql = "select 3".ljust(lim+1)
            self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
            self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)

    def test_commit_cursor_reset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = self.con
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                # Commit while the select cursor is still being iterated.
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                if counter == 0:
                    self.assertEqual(row[0], 0)
                elif counter == 1:
                    self.assertEqual(row[0], 1)
                elif counter == 2:
                    self.assertEqual(row[0], 2)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def test_bpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        cur = sqlite.Cursor(self.con)
        ref = weakref.ref(cur, callback)
        cur.__init__(self.con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()

    def test_del_isolation_level_segfault(self):
        # Deleting the attribute must raise AttributeError.
        with self.assertRaises(AttributeError):
            del self.con.isolation_level

    def test_bpo37347(self):
        class Printer:
            def log(self, *args):
                return sqlite.SQLITE_OK

        for method in [self.con.set_trace_callback,
                       functools.partial(self.con.set_progress_handler, n=1),
                       self.con.set_authorizer]:
            printer_instance = Printer()
            # Register the same bound method twice before running a query.
            method(printer_instance.log)
            method(printer_instance.log)
            self.con.execute("select 1")  # trigger seg fault
            method(None)

    def test_return_empty_bytestring(self):
        # An empty blob literal must come back as b''.
        cur = self.con.execute("select X''")
        val = cur.fetchone()[0]
        self.assertEqual(val, b'')

    def test_table_lock_cursor_replace_stmt(self):
        # Run on the dedicated connection; it must not be rebound to
        # self.con, which would leave the fresh connection unused.
        with memory_database() as con:
            cur = con.cursor()
            cur.execute("create table t(t)")
            cur.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            # The select is left unfetched and then replaced by the drop
            # statement on the same cursor.
            cur.execute("select t from t")
            cur.execute("drop table t")
            con.commit()

    def test_table_lock_cursor_dealloc(self):
        with memory_database() as con:
            con.execute("create table t(t)")
            con.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            # Destroy the cursor holding the unfetched select before
            # dropping the table.
            cur = con.execute("select t from t")
            del cur
            support.gc_collect()
            con.execute("drop table t")
            con.commit()

    def test_table_lock_cursor_non_readonly_select(self):
        with memory_database() as con:
            con.execute("create table t(t)")
            con.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()

            def dup(v):
                # SQL function that itself writes to the table, making the
                # select below a non-readonly statement.
                con.execute("insert into t values(?)", (v,))
                return
            con.create_function("dup", 1, dup)
            cur = con.execute("select dup(t) from t")
            del cur
            support.gc_collect()
            con.execute("drop table t")
            con.commit()

    def test_executescript_step_through_select(self):
        with memory_database() as con:
            values = [(v,) for v in range(5)]
            with con:
                con.execute("create table t(t)")
                con.executemany("insert into t values(?)", values)
            steps = []
            # Records every row value the SQL function is invoked with.
            con.create_function("step", 1, lambda x: steps.append((x,)))
            con.executescript("select step(t) from t")
            self.assertEqual(steps, values)


class RecursiveUseOfCursors(unittest.TestCase):
    # GH-80254: sqlite3 should not segfault for recursive use of cursors.
    msg = "Recursive use of cursors not allowed"

    def setUp(self):
        # PARSE_COLNAMES lets each test select a converter through a
        # bracketed suffix in the column alias (e.g. "x [INIT]").
        self.con = sqlite.connect(":memory:",
                                  detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")
        self.cur.executemany("insert into test(x) values (?)",
                             [("foo",), ("bar",)])

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def test_recursive_cursor_init(self):
        # Converter re-initialises the cursor that is currently fetching.
        conv = lambda x: self.cur.__init__(self.con)
        with patch.dict(sqlite.converters, {"INIT": conv}):
            self.cur.execute('select x as "x [INIT]", x from test')
            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
                                   self.cur.fetchall)

    def test_recursive_cursor_close(self):
        # Converter closes the cursor that is currently fetching.
        conv = lambda x: self.cur.close()
        with patch.dict(sqlite.converters, {"CLOSE": conv}):
            self.cur.execute('select x as "x [CLOSE]", x from test')
            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
                                   self.cur.fetchall)

    def test_recursive_cursor_iter(self):
        # The mutable default ``l`` is intentional: it is shared state that
        # makes the first converter call a no-op and every later call fetch
        # from the cursor that is currently fetching.
        conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
        with patch.dict(sqlite.converters, {"ITER": conv}):
            self.cur.execute('select x as "x [ITER]", x from test')
            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
                                   self.cur.fetchall)


if __name__ == "__main__":
    unittest.main()