1#-*- coding: iso-8859-1 -*- 2# pysqlite2/test/regression.py: pysqlite regression tests 3# 4# Copyright (C) 2006-2010 Gerhard H�ring <gh@ghaering.de> 5# 6# This file is part of pysqlite. 7# 8# This software is provided 'as-is', without any express or implied 9# warranty. In no event will the authors be held liable for any damages 10# arising from the use of this software. 11# 12# Permission is granted to anyone to use this software for any purpose, 13# including commercial applications, and to alter it and redistribute it 14# freely, subject to the following restrictions: 15# 16# 1. The origin of this software must not be misrepresented; you must not 17# claim that you wrote the original software. If you use this software 18# in a product, an acknowledgment in the product documentation would be 19# appreciated but is not required. 20# 2. Altered source versions must be plainly marked as such, and must not be 21# misrepresented as being the original software. 22# 3. This notice may not be removed or altered from any source distribution. 23 24import datetime 25import unittest 26import sqlite3 as sqlite 27import weakref 28import functools 29from test import support 30 31class RegressionTests(unittest.TestCase): 32 def setUp(self): 33 self.con = sqlite.connect(":memory:") 34 35 def tearDown(self): 36 self.con.close() 37 38 def CheckPragmaUserVersion(self): 39 # This used to crash pysqlite because this pragma command returns NULL for the column name 40 cur = self.con.cursor() 41 cur.execute("pragma user_version") 42 43 def CheckPragmaSchemaVersion(self): 44 # This still crashed pysqlite <= 2.2.1 45 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 46 try: 47 cur = self.con.cursor() 48 cur.execute("pragma schema_version") 49 finally: 50 cur.close() 51 con.close() 52 53 def CheckStatementReset(self): 54 # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are 55 # reset before a rollback, but only those that are still in the 56 # statement cache. The others are not accessible from the connection object. 57 con = sqlite.connect(":memory:", cached_statements=5) 58 cursors = [con.cursor() for x in range(5)] 59 cursors[0].execute("create table test(x)") 60 for i in range(10): 61 cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)]) 62 63 for i in range(5): 64 cursors[i].execute(" " * i + "select x from test") 65 66 con.rollback() 67 68 def CheckColumnNameWithSpaces(self): 69 cur = self.con.cursor() 70 cur.execute('select 1 as "foo bar [datetime]"') 71 self.assertEqual(cur.description[0][0], "foo bar [datetime]") 72 73 cur.execute('select 1 as "foo baz"') 74 self.assertEqual(cur.description[0][0], "foo baz") 75 76 def CheckStatementFinalizationOnCloseDb(self): 77 # pysqlite versions <= 2.3.3 only finalized statements in the statement 78 # cache when closing the database. statements that were still 79 # referenced in cursors weren't closed and could provoke " 80 # "OperationalError: Unable to close due to unfinalised statements". 81 con = sqlite.connect(":memory:") 82 cursors = [] 83 # default statement cache size is 100 84 for i in range(105): 85 cur = con.cursor() 86 cursors.append(cur) 87 cur.execute("select 1 x union select " + str(i)) 88 con.close() 89 90 @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer') 91 def CheckOnConflictRollback(self): 92 con = sqlite.connect(":memory:") 93 con.execute("create table foo(x, unique(x) on conflict rollback)") 94 con.execute("insert into foo(x) values (1)") 95 try: 96 con.execute("insert into foo(x) values (1)") 97 except sqlite.DatabaseError: 98 pass 99 con.execute("insert into foo(x) values (2)") 100 try: 101 con.commit() 102 except sqlite.OperationalError: 103 self.fail("pysqlite knew nothing about the implicit ROLLBACK") 104 105 def CheckWorkaroundForBuggySqliteTransferBindings(self): 106 """ 107 pysqlite would crash with older SQLite versions unless 108 a workaround is implemented. 109 """ 110 self.con.execute("create table foo(bar)") 111 self.con.execute("drop table foo") 112 self.con.execute("create table foo(bar)") 113 114 def CheckEmptyStatement(self): 115 """ 116 pysqlite used to segfault with SQLite versions 3.5.x. These return NULL 117 for "no-operation" statements 118 """ 119 self.con.execute("") 120 121 def CheckTypeMapUsage(self): 122 """ 123 pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling 124 a statement. This test exhibits the problem. 125 """ 126 SELECT = "select * from foo" 127 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) 128 con.execute("create table foo(bar timestamp)") 129 con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) 130 con.execute(SELECT) 131 con.execute("drop table foo") 132 con.execute("create table foo(bar integer)") 133 con.execute("insert into foo(bar) values (5)") 134 con.execute(SELECT) 135 136 def CheckBindMutatingList(self): 137 # Issue41662: Crash when mutate a list of parameters during iteration. 138 class X: 139 def __conform__(self, protocol): 140 parameters.clear() 141 return "..." 142 parameters = [X(), 0] 143 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) 144 con.execute("create table foo(bar X, baz integer)") 145 # Should not crash 146 with self.assertRaises(IndexError): 147 con.execute("insert into foo(bar, baz) values (?, ?)", parameters) 148 149 def CheckErrorMsgDecodeError(self): 150 # When porting the module to Python 3.0, the error message about 151 # decoding errors disappeared. This verifies they're back again. 152 with self.assertRaises(sqlite.OperationalError) as cm: 153 self.con.execute("select 'xxx' || ? || 'yyy' colname", 154 (bytes(bytearray([250])),)).fetchone() 155 msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" 156 self.assertIn(msg, str(cm.exception)) 157 158 def CheckRegisterAdapter(self): 159 """ 160 See issue 3312. 161 """ 162 self.assertRaises(TypeError, sqlite.register_adapter, {}, None) 163 164 def CheckSetIsolationLevel(self): 165 # See issue 27881. 166 class CustomStr(str): 167 def upper(self): 168 return None 169 def __del__(self): 170 con.isolation_level = "" 171 172 con = sqlite.connect(":memory:") 173 con.isolation_level = None 174 for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE": 175 with self.subTest(level=level): 176 con.isolation_level = level 177 con.isolation_level = level.lower() 178 con.isolation_level = level.capitalize() 179 con.isolation_level = CustomStr(level) 180 181 # setting isolation_level failure should not alter previous state 182 con.isolation_level = None 183 con.isolation_level = "DEFERRED" 184 pairs = [ 185 (1, TypeError), (b'', TypeError), ("abc", ValueError), 186 ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError), 187 ] 188 for value, exc in pairs: 189 with self.subTest(level=value): 190 with self.assertRaises(exc): 191 con.isolation_level = value 192 self.assertEqual(con.isolation_level, "DEFERRED") 193 194 def CheckCursorConstructorCallCheck(self): 195 """ 196 Verifies that cursor methods check whether base class __init__ was 197 called. 198 """ 199 class Cursor(sqlite.Cursor): 200 def __init__(self, con): 201 pass 202 203 con = sqlite.connect(":memory:") 204 cur = Cursor(con) 205 with self.assertRaises(sqlite.ProgrammingError): 206 cur.execute("select 4+5").fetchall() 207 with self.assertRaisesRegex(sqlite.ProgrammingError, 208 r'^Base Cursor\.__init__ not called\.$'): 209 cur.close() 210 211 def CheckStrSubclass(self): 212 """ 213 The Python 3.0 port of the module didn't cope with values of subclasses of str. 214 """ 215 class MyStr(str): pass 216 self.con.execute("select ?", (MyStr("abc"),)) 217 218 def CheckConnectionConstructorCallCheck(self): 219 """ 220 Verifies that connection methods check whether base class __init__ was 221 called. 222 """ 223 class Connection(sqlite.Connection): 224 def __init__(self, name): 225 pass 226 227 con = Connection(":memory:") 228 with self.assertRaises(sqlite.ProgrammingError): 229 cur = con.cursor() 230 231 def CheckCursorRegistration(self): 232 """ 233 Verifies that subclassed cursor classes are correctly registered with 234 the connection object, too. (fetch-across-rollback problem) 235 """ 236 class Connection(sqlite.Connection): 237 def cursor(self): 238 return Cursor(self) 239 240 class Cursor(sqlite.Cursor): 241 def __init__(self, con): 242 sqlite.Cursor.__init__(self, con) 243 244 con = Connection(":memory:") 245 cur = con.cursor() 246 cur.execute("create table foo(x)") 247 cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)]) 248 cur.execute("select x from foo") 249 con.rollback() 250 with self.assertRaises(sqlite.InterfaceError): 251 cur.fetchall() 252 253 def CheckAutoCommit(self): 254 """ 255 Verifies that creating a connection in autocommit mode works. 256 2.5.3 introduced a regression so that these could no longer 257 be created. 258 """ 259 con = sqlite.connect(":memory:", isolation_level=None) 260 261 def CheckPragmaAutocommit(self): 262 """ 263 Verifies that running a PRAGMA statement that does an autocommit does 264 work. This did not work in 2.5.3/2.5.4. 265 """ 266 cur = self.con.cursor() 267 cur.execute("create table foo(bar)") 268 cur.execute("insert into foo(bar) values (5)") 269 270 cur.execute("pragma page_size") 271 row = cur.fetchone() 272 273 def CheckConnectionCall(self): 274 """ 275 Call a connection with a non-string SQL request: check error handling 276 of the statement constructor. 277 """ 278 self.assertRaises(TypeError, self.con, 1) 279 280 def CheckCollation(self): 281 def collation_cb(a, b): 282 return 1 283 self.assertRaises(sqlite.ProgrammingError, self.con.create_collation, 284 # Lone surrogate cannot be encoded to the default encoding (utf8) 285 "\uDC80", collation_cb) 286 287 def CheckRecursiveCursorUse(self): 288 """ 289 http://bugs.python.org/issue10811 290 291 Recursively using a cursor, such as when reusing it from a generator led to segfaults. 292 Now we catch recursive cursor usage and raise a ProgrammingError. 293 """ 294 con = sqlite.connect(":memory:") 295 296 cur = con.cursor() 297 cur.execute("create table a (bar)") 298 cur.execute("create table b (baz)") 299 300 def foo(): 301 cur.execute("insert into a (bar) values (?)", (1,)) 302 yield 1 303 304 with self.assertRaises(sqlite.ProgrammingError): 305 cur.executemany("insert into b (baz) values (?)", 306 ((i,) for i in foo())) 307 308 def CheckConvertTimestampMicrosecondPadding(self): 309 """ 310 http://bugs.python.org/issue14720 311 312 The microsecond parsing of convert_timestamp() should pad with zeros, 313 since the microsecond string "456" actually represents "456000". 314 """ 315 316 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 317 cur = con.cursor() 318 cur.execute("CREATE TABLE t (x TIMESTAMP)") 319 320 # Microseconds should be 456000 321 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')") 322 323 # Microseconds should be truncated to 123456 324 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')") 325 326 cur.execute("SELECT * FROM t") 327 values = [x[0] for x in cur.fetchall()] 328 329 self.assertEqual(values, [ 330 datetime.datetime(2012, 4, 4, 15, 6, 0, 456000), 331 datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), 332 ]) 333 334 def CheckInvalidIsolationLevelType(self): 335 # isolation level is a string, not an integer 336 self.assertRaises(TypeError, 337 sqlite.connect, ":memory:", isolation_level=123) 338 339 340 def CheckNullCharacter(self): 341 # Issue #21147 342 con = sqlite.connect(":memory:") 343 self.assertRaises(ValueError, con, "\0select 1") 344 self.assertRaises(ValueError, con, "select 1\0") 345 cur = con.cursor() 346 self.assertRaises(ValueError, cur.execute, " \0select 2") 347 self.assertRaises(ValueError, cur.execute, "select 2\0") 348 349 def CheckCommitCursorReset(self): 350 """ 351 Connection.commit() did reset cursors, which made sqlite3 352 to return rows multiple times when fetched from cursors 353 after commit. See issues 10513 and 23129 for details. 354 """ 355 con = sqlite.connect(":memory:") 356 con.executescript(""" 357 create table t(c); 358 create table t2(c); 359 insert into t values(0); 360 insert into t values(1); 361 insert into t values(2); 362 """) 363 364 self.assertEqual(con.isolation_level, "") 365 366 counter = 0 367 for i, row in enumerate(con.execute("select c from t")): 368 with self.subTest(i=i, row=row): 369 con.execute("insert into t2(c) values (?)", (i,)) 370 con.commit() 371 if counter == 0: 372 self.assertEqual(row[0], 0) 373 elif counter == 1: 374 self.assertEqual(row[0], 1) 375 elif counter == 2: 376 self.assertEqual(row[0], 2) 377 counter += 1 378 self.assertEqual(counter, 3, "should have returned exactly three rows") 379 380 def CheckBpo31770(self): 381 """ 382 The interpreter shouldn't crash in case Cursor.__init__() is called 383 more than once. 384 """ 385 def callback(*args): 386 pass 387 con = sqlite.connect(":memory:") 388 cur = sqlite.Cursor(con) 389 ref = weakref.ref(cur, callback) 390 cur.__init__(con) 391 del cur 392 # The interpreter shouldn't crash when ref is collected. 393 del ref 394 support.gc_collect() 395 396 def CheckDelIsolation_levelSegfault(self): 397 with self.assertRaises(AttributeError): 398 del self.con.isolation_level 399 400 def CheckBpo37347(self): 401 class Printer: 402 def log(self, *args): 403 return sqlite.SQLITE_OK 404 405 for method in [self.con.set_trace_callback, 406 functools.partial(self.con.set_progress_handler, n=1), 407 self.con.set_authorizer]: 408 printer_instance = Printer() 409 method(printer_instance.log) 410 method(printer_instance.log) 411 self.con.execute("select 1") # trigger seg fault 412 method(None) 413 414 415 416def suite(): 417 regression_suite = unittest.makeSuite(RegressionTests, "Check") 418 return unittest.TestSuite(( 419 regression_suite, 420 )) 421 422def test(): 423 runner = unittest.TextTestRunner() 424 runner.run(suite()) 425 426if __name__ == "__main__": 427 test() 428