1#-*- coding: iso-8859-1 -*- 2# pysqlite2/test/regression.py: pysqlite regression tests 3# 4# Copyright (C) 2006-2010 Gerhard H�ring <gh@ghaering.de> 5# 6# This file is part of pysqlite. 7# 8# This software is provided 'as-is', without any express or implied 9# warranty. In no event will the authors be held liable for any damages 10# arising from the use of this software. 11# 12# Permission is granted to anyone to use this software for any purpose, 13# including commercial applications, and to alter it and redistribute it 14# freely, subject to the following restrictions: 15# 16# 1. The origin of this software must not be misrepresented; you must not 17# claim that you wrote the original software. If you use this software 18# in a product, an acknowledgment in the product documentation would be 19# appreciated but is not required. 20# 2. Altered source versions must be plainly marked as such, and must not be 21# misrepresented as being the original software. 22# 3. This notice may not be removed or altered from any source distribution. 23 24import datetime 25import unittest 26import sqlite3 as sqlite 27import weakref 28import functools 29from test import support 30 31class RegressionTests(unittest.TestCase): 32 def setUp(self): 33 self.con = sqlite.connect(":memory:") 34 35 def tearDown(self): 36 self.con.close() 37 38 def CheckPragmaUserVersion(self): 39 # This used to crash pysqlite because this pragma command returns NULL for the column name 40 cur = self.con.cursor() 41 cur.execute("pragma user_version") 42 43 def CheckPragmaSchemaVersion(self): 44 # This still crashed pysqlite <= 2.2.1 45 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 46 try: 47 cur = self.con.cursor() 48 cur.execute("pragma schema_version") 49 finally: 50 cur.close() 51 con.close() 52 53 def CheckStatementReset(self): 54 # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are 55 # reset before a rollback, but only those that are still in the 56 # statement cache. The others are not accessible from the connection object. 57 con = sqlite.connect(":memory:", cached_statements=5) 58 cursors = [con.cursor() for x in range(5)] 59 cursors[0].execute("create table test(x)") 60 for i in range(10): 61 cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)]) 62 63 for i in range(5): 64 cursors[i].execute(" " * i + "select x from test") 65 66 con.rollback() 67 68 def CheckColumnNameWithSpaces(self): 69 cur = self.con.cursor() 70 cur.execute('select 1 as "foo bar [datetime]"') 71 self.assertEqual(cur.description[0][0], "foo bar") 72 73 cur.execute('select 1 as "foo baz"') 74 self.assertEqual(cur.description[0][0], "foo baz") 75 76 def CheckStatementFinalizationOnCloseDb(self): 77 # pysqlite versions <= 2.3.3 only finalized statements in the statement 78 # cache when closing the database. statements that were still 79 # referenced in cursors weren't closed and could provoke " 80 # "OperationalError: Unable to close due to unfinalised statements". 81 con = sqlite.connect(":memory:") 82 cursors = [] 83 # default statement cache size is 100 84 for i in range(105): 85 cur = con.cursor() 86 cursors.append(cur) 87 cur.execute("select 1 x union select " + str(i)) 88 con.close() 89 90 @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer') 91 def CheckOnConflictRollback(self): 92 con = sqlite.connect(":memory:") 93 con.execute("create table foo(x, unique(x) on conflict rollback)") 94 con.execute("insert into foo(x) values (1)") 95 try: 96 con.execute("insert into foo(x) values (1)") 97 except sqlite.DatabaseError: 98 pass 99 con.execute("insert into foo(x) values (2)") 100 try: 101 con.commit() 102 except sqlite.OperationalError: 103 self.fail("pysqlite knew nothing about the implicit ROLLBACK") 104 105 def CheckWorkaroundForBuggySqliteTransferBindings(self): 106 """ 107 pysqlite would crash with older SQLite versions unless 108 a workaround is implemented. 109 """ 110 self.con.execute("create table foo(bar)") 111 self.con.execute("drop table foo") 112 self.con.execute("create table foo(bar)") 113 114 def CheckEmptyStatement(self): 115 """ 116 pysqlite used to segfault with SQLite versions 3.5.x. These return NULL 117 for "no-operation" statements 118 """ 119 self.con.execute("") 120 121 def CheckTypeMapUsage(self): 122 """ 123 pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling 124 a statement. This test exhibits the problem. 125 """ 126 SELECT = "select * from foo" 127 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) 128 con.execute("create table foo(bar timestamp)") 129 con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) 130 con.execute(SELECT) 131 con.execute("drop table foo") 132 con.execute("create table foo(bar integer)") 133 con.execute("insert into foo(bar) values (5)") 134 con.execute(SELECT) 135 136 def CheckErrorMsgDecodeError(self): 137 # When porting the module to Python 3.0, the error message about 138 # decoding errors disappeared. This verifies they're back again. 139 with self.assertRaises(sqlite.OperationalError) as cm: 140 self.con.execute("select 'xxx' || ? || 'yyy' colname", 141 (bytes(bytearray([250])),)).fetchone() 142 msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" 143 self.assertIn(msg, str(cm.exception)) 144 145 def CheckRegisterAdapter(self): 146 """ 147 See issue 3312. 148 """ 149 self.assertRaises(TypeError, sqlite.register_adapter, {}, None) 150 151 def CheckSetIsolationLevel(self): 152 # See issue 27881. 153 class CustomStr(str): 154 def upper(self): 155 return None 156 def __del__(self): 157 con.isolation_level = "" 158 159 con = sqlite.connect(":memory:") 160 con.isolation_level = None 161 for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE": 162 with self.subTest(level=level): 163 con.isolation_level = level 164 con.isolation_level = level.lower() 165 con.isolation_level = level.capitalize() 166 con.isolation_level = CustomStr(level) 167 168 # setting isolation_level failure should not alter previous state 169 con.isolation_level = None 170 con.isolation_level = "DEFERRED" 171 pairs = [ 172 (1, TypeError), (b'', TypeError), ("abc", ValueError), 173 ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError), 174 ] 175 for value, exc in pairs: 176 with self.subTest(level=value): 177 with self.assertRaises(exc): 178 con.isolation_level = value 179 self.assertEqual(con.isolation_level, "DEFERRED") 180 181 def CheckCursorConstructorCallCheck(self): 182 """ 183 Verifies that cursor methods check whether base class __init__ was 184 called. 185 """ 186 class Cursor(sqlite.Cursor): 187 def __init__(self, con): 188 pass 189 190 con = sqlite.connect(":memory:") 191 cur = Cursor(con) 192 with self.assertRaises(sqlite.ProgrammingError): 193 cur.execute("select 4+5").fetchall() 194 with self.assertRaisesRegex(sqlite.ProgrammingError, 195 r'^Base Cursor\.__init__ not called\.$'): 196 cur.close() 197 198 def CheckStrSubclass(self): 199 """ 200 The Python 3.0 port of the module didn't cope with values of subclasses of str. 201 """ 202 class MyStr(str): pass 203 self.con.execute("select ?", (MyStr("abc"),)) 204 205 def CheckConnectionConstructorCallCheck(self): 206 """ 207 Verifies that connection methods check whether base class __init__ was 208 called. 209 """ 210 class Connection(sqlite.Connection): 211 def __init__(self, name): 212 pass 213 214 con = Connection(":memory:") 215 with self.assertRaises(sqlite.ProgrammingError): 216 cur = con.cursor() 217 218 def CheckCursorRegistration(self): 219 """ 220 Verifies that subclassed cursor classes are correctly registered with 221 the connection object, too. (fetch-across-rollback problem) 222 """ 223 class Connection(sqlite.Connection): 224 def cursor(self): 225 return Cursor(self) 226 227 class Cursor(sqlite.Cursor): 228 def __init__(self, con): 229 sqlite.Cursor.__init__(self, con) 230 231 con = Connection(":memory:") 232 cur = con.cursor() 233 cur.execute("create table foo(x)") 234 cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)]) 235 cur.execute("select x from foo") 236 con.rollback() 237 with self.assertRaises(sqlite.InterfaceError): 238 cur.fetchall() 239 240 def CheckAutoCommit(self): 241 """ 242 Verifies that creating a connection in autocommit mode works. 243 2.5.3 introduced a regression so that these could no longer 244 be created. 245 """ 246 con = sqlite.connect(":memory:", isolation_level=None) 247 248 def CheckPragmaAutocommit(self): 249 """ 250 Verifies that running a PRAGMA statement that does an autocommit does 251 work. This did not work in 2.5.3/2.5.4. 252 """ 253 cur = self.con.cursor() 254 cur.execute("create table foo(bar)") 255 cur.execute("insert into foo(bar) values (5)") 256 257 cur.execute("pragma page_size") 258 row = cur.fetchone() 259 260 def CheckConnectionCall(self): 261 """ 262 Call a connection with a non-string SQL request: check error handling 263 of the statement constructor. 264 """ 265 self.assertRaises(sqlite.Warning, self.con, 1) 266 267 def CheckCollation(self): 268 def collation_cb(a, b): 269 return 1 270 self.assertRaises(sqlite.ProgrammingError, self.con.create_collation, 271 # Lone surrogate cannot be encoded to the default encoding (utf8) 272 "\uDC80", collation_cb) 273 274 def CheckRecursiveCursorUse(self): 275 """ 276 http://bugs.python.org/issue10811 277 278 Recursively using a cursor, such as when reusing it from a generator led to segfaults. 279 Now we catch recursive cursor usage and raise a ProgrammingError. 280 """ 281 con = sqlite.connect(":memory:") 282 283 cur = con.cursor() 284 cur.execute("create table a (bar)") 285 cur.execute("create table b (baz)") 286 287 def foo(): 288 cur.execute("insert into a (bar) values (?)", (1,)) 289 yield 1 290 291 with self.assertRaises(sqlite.ProgrammingError): 292 cur.executemany("insert into b (baz) values (?)", 293 ((i,) for i in foo())) 294 295 def CheckConvertTimestampMicrosecondPadding(self): 296 """ 297 http://bugs.python.org/issue14720 298 299 The microsecond parsing of convert_timestamp() should pad with zeros, 300 since the microsecond string "456" actually represents "456000". 301 """ 302 303 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 304 cur = con.cursor() 305 cur.execute("CREATE TABLE t (x TIMESTAMP)") 306 307 # Microseconds should be 456000 308 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')") 309 310 # Microseconds should be truncated to 123456 311 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')") 312 313 cur.execute("SELECT * FROM t") 314 values = [x[0] for x in cur.fetchall()] 315 316 self.assertEqual(values, [ 317 datetime.datetime(2012, 4, 4, 15, 6, 0, 456000), 318 datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), 319 ]) 320 321 def CheckInvalidIsolationLevelType(self): 322 # isolation level is a string, not an integer 323 self.assertRaises(TypeError, 324 sqlite.connect, ":memory:", isolation_level=123) 325 326 327 def CheckNullCharacter(self): 328 # Issue #21147 329 con = sqlite.connect(":memory:") 330 self.assertRaises(ValueError, con, "\0select 1") 331 self.assertRaises(ValueError, con, "select 1\0") 332 cur = con.cursor() 333 self.assertRaises(ValueError, cur.execute, " \0select 2") 334 self.assertRaises(ValueError, cur.execute, "select 2\0") 335 336 def CheckCommitCursorReset(self): 337 """ 338 Connection.commit() did reset cursors, which made sqlite3 339 to return rows multiple times when fetched from cursors 340 after commit. See issues 10513 and 23129 for details. 341 """ 342 con = sqlite.connect(":memory:") 343 con.executescript(""" 344 create table t(c); 345 create table t2(c); 346 insert into t values(0); 347 insert into t values(1); 348 insert into t values(2); 349 """) 350 351 self.assertEqual(con.isolation_level, "") 352 353 counter = 0 354 for i, row in enumerate(con.execute("select c from t")): 355 with self.subTest(i=i, row=row): 356 con.execute("insert into t2(c) values (?)", (i,)) 357 con.commit() 358 if counter == 0: 359 self.assertEqual(row[0], 0) 360 elif counter == 1: 361 self.assertEqual(row[0], 1) 362 elif counter == 2: 363 self.assertEqual(row[0], 2) 364 counter += 1 365 self.assertEqual(counter, 3, "should have returned exactly three rows") 366 367 def CheckBpo31770(self): 368 """ 369 The interpreter shouldn't crash in case Cursor.__init__() is called 370 more than once. 371 """ 372 def callback(*args): 373 pass 374 con = sqlite.connect(":memory:") 375 cur = sqlite.Cursor(con) 376 ref = weakref.ref(cur, callback) 377 cur.__init__(con) 378 del cur 379 # The interpreter shouldn't crash when ref is collected. 380 del ref 381 support.gc_collect() 382 383 def CheckDelIsolation_levelSegfault(self): 384 with self.assertRaises(AttributeError): 385 del self.con.isolation_level 386 387 def CheckBpo37347(self): 388 class Printer: 389 def log(self, *args): 390 return sqlite.SQLITE_OK 391 392 for method in [self.con.set_trace_callback, 393 functools.partial(self.con.set_progress_handler, n=1), 394 self.con.set_authorizer]: 395 printer_instance = Printer() 396 method(printer_instance.log) 397 method(printer_instance.log) 398 self.con.execute("select 1") # trigger seg fault 399 method(None) 400 401 402 403def suite(): 404 regression_suite = unittest.makeSuite(RegressionTests, "Check") 405 return unittest.TestSuite(( 406 regression_suite, 407 )) 408 409def test(): 410 runner = unittest.TextTestRunner() 411 runner.run(suite()) 412 413if __name__ == "__main__": 414 test() 415