# -*- coding: utf-8 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import datetime
import gc
import unittest
import sqlite3 as sqlite
import weakref

try:
    from test import support
except ImportError:
    # Some Python installations ship without the ``test`` package.  It is
    # only needed for gc_collect(), so fall back to the plain gc module.
    support = None


def _gc_collect():
    """
    Force a garbage collection, preferring test.support.gc_collect() when
    the ``test`` package could be imported and gc.collect() otherwise.
    """
    if support is not None:
        support.gc_collect()
    else:
        gc.collect()


class RegressionTests(unittest.TestCase):
    """
    Regression tests for the sqlite3 module.

    Test methods use the "Check" prefix (see suite()).  Every test gets a
    fresh in-memory connection as ``self.con``; tests that need special
    connection parameters open their own connection.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            # Use the PARSE_COLNAMES connection created above, not self.con,
            # otherwise the column-name parsing path is never exercised.
            cur = con.cursor()
            try:
                cur.execute("pragma schema_version")
            finally:
                cur.close()
        finally:
            con.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # The leading spaces make each cursor's SELECT a distinct SQL string.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        # The bracketed "[datetime]" suffix must not be part of the reported
        # column name, while spaces inside the name must be preserved.
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. Statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        con.close()

    def CheckOnConflictRollback(self):
        # A failed insert into a table declared "on conflict rollback" rolls
        # the transaction back implicitly; a later commit() must still work.
        if sqlite.sqlite_version_info < (3, 2, 2):
            self.skipTest("requires SQLite 3.2.2 or newer")
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            # Duplicate value: violates the unique constraint.
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckUnicodeConnect(self):
        """
        With pysqlite 2.4.0 you needed to use a string or an APSW connection
        object for opening database connections.

        Formerly, both bytestrings and unicode strings used to work.

        Let's make sure unicode strings work in the future.
        """
        con = sqlite.connect(u":memory:")
        con.close()

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        con.execute(SELECT)
        # Same SQL text, but the column's declared type changes from
        # timestamp to integer once the table has been recreated.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        """
        See issue 3312.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(UnicodeEncodeError, setattr, con,
                          "isolation_level", u"\xe9")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skip sqlite.Cursor.__init__.
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaises(sqlite.ProgrammingError) as cm:
            cur.close()
        self.assertEqual(str(cm.exception), "Base Cursor.__init__ not called.")

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skip sqlite.Connection.__init__.
                pass

        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        con.close()

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fetch itself is being tested; the value is irrelevant.
        cur.fetchone()

    def CheckSetDict(self):
        """
        See http://bugs.python.org/issue7478

        It was possible to successfully register callbacks that could not be
        hashed. Return codes of PyDict_SetItem were not checked properly.
        """
        class NotHashable:
            def __call__(self, *args, **kw):
                pass
            def __hash__(self):
                raise TypeError()
        var = NotHashable()
        self.assertRaises(TypeError, self.con.create_function, var)
        self.assertRaises(TypeError, self.con.create_aggregate, var)
        self.assertRaises(TypeError, self.con.set_authorizer, var)
        self.assertRaises(TypeError, self.con.set_progress_handler, var)

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Re-enters the same cursor while executemany() is consuming
            # this generator.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            # Commit while the SELECT cursor is still being iterated.
            con.execute("insert into t2(c) values (?)", (i,))
            con.commit()
            # Rows 0, 1, 2 must be seen in order, each exactly once.
            self.assertEqual(row[0], i)
            counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        _gc_collect()


def suite():
    """
    Return a TestSuite containing every RegressionTests method whose name
    starts with "Check".
    """
    # TestLoader with a custom prefix replaces the deprecated
    # unittest.makeSuite(RegressionTests, "Check").
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    regression_suite = loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((regression_suite,))

def test():
    """
    Run the regression suite with a TextTestRunner.
    """
    runner = unittest.TextTestRunner()
    runner.run(suite())

if __name__ == "__main__":
    test()