1#-*- coding: iso-8859-1 -*- 2# pysqlite2/test/dbapi.py: tests for DB-API compliance 3# 4# Copyright (C) 2004-2010 Gerhard H�ring <gh@ghaering.de> 5# 6# This file is part of pysqlite. 7# 8# This software is provided 'as-is', without any express or implied 9# warranty. In no event will the authors be held liable for any damages 10# arising from the use of this software. 11# 12# Permission is granted to anyone to use this software for any purpose, 13# including commercial applications, and to alter it and redistribute it 14# freely, subject to the following restrictions: 15# 16# 1. The origin of this software must not be misrepresented; you must not 17# claim that you wrote the original software. If you use this software 18# in a product, an acknowledgment in the product documentation would be 19# appreciated but is not required. 20# 2. Altered source versions must be plainly marked as such, and must not be 21# misrepresented as being the original software. 22# 3. This notice may not be removed or altered from any source distribution. 23 24import threading 25import unittest 26import sqlite3 as sqlite 27 28from test.support import TESTFN, unlink 29 30 31class ModuleTests(unittest.TestCase): 32 def CheckAPILevel(self): 33 self.assertEqual(sqlite.apilevel, "2.0", 34 "apilevel is %s, should be 2.0" % sqlite.apilevel) 35 36 def CheckThreadSafety(self): 37 self.assertEqual(sqlite.threadsafety, 1, 38 "threadsafety is %d, should be 1" % sqlite.threadsafety) 39 40 def CheckParamStyle(self): 41 self.assertEqual(sqlite.paramstyle, "qmark", 42 "paramstyle is '%s', should be 'qmark'" % 43 sqlite.paramstyle) 44 45 def CheckWarning(self): 46 self.assertTrue(issubclass(sqlite.Warning, Exception), 47 "Warning is not a subclass of Exception") 48 49 def CheckError(self): 50 self.assertTrue(issubclass(sqlite.Error, Exception), 51 "Error is not a subclass of Exception") 52 53 def CheckInterfaceError(self): 54 self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), 55 "InterfaceError is not a subclass of Error") 56 57 def CheckDatabaseError(self): 58 self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), 59 "DatabaseError is not a subclass of Error") 60 61 def CheckDataError(self): 62 self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), 63 "DataError is not a subclass of DatabaseError") 64 65 def CheckOperationalError(self): 66 self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), 67 "OperationalError is not a subclass of DatabaseError") 68 69 def CheckIntegrityError(self): 70 self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), 71 "IntegrityError is not a subclass of DatabaseError") 72 73 def CheckInternalError(self): 74 self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), 75 "InternalError is not a subclass of DatabaseError") 76 77 def CheckProgrammingError(self): 78 self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), 79 "ProgrammingError is not a subclass of DatabaseError") 80 81 def CheckNotSupportedError(self): 82 self.assertTrue(issubclass(sqlite.NotSupportedError, 83 sqlite.DatabaseError), 84 "NotSupportedError is not a subclass of DatabaseError") 85 86class ConnectionTests(unittest.TestCase): 87 88 def setUp(self): 89 self.cx = sqlite.connect(":memory:") 90 cu = self.cx.cursor() 91 cu.execute("create table test(id integer primary key, name text)") 92 cu.execute("insert into test(name) values (?)", ("foo",)) 93 94 def tearDown(self): 95 self.cx.close() 96 97 def CheckCommit(self): 98 self.cx.commit() 99 100 def CheckCommitAfterNoChanges(self): 101 """ 102 A commit should also work when no changes were made to the database. 103 """ 104 self.cx.commit() 105 self.cx.commit() 106 107 def CheckRollback(self): 108 self.cx.rollback() 109 110 def CheckRollbackAfterNoChanges(self): 111 """ 112 A rollback should also work when no changes were made to the database. 113 """ 114 self.cx.rollback() 115 self.cx.rollback() 116 117 def CheckCursor(self): 118 cu = self.cx.cursor() 119 120 def CheckFailedOpen(self): 121 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" 122 with self.assertRaises(sqlite.OperationalError): 123 con = sqlite.connect(YOU_CANNOT_OPEN_THIS) 124 125 def CheckClose(self): 126 self.cx.close() 127 128 def CheckExceptions(self): 129 # Optional DB-API extension. 130 self.assertEqual(self.cx.Warning, sqlite.Warning) 131 self.assertEqual(self.cx.Error, sqlite.Error) 132 self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) 133 self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) 134 self.assertEqual(self.cx.DataError, sqlite.DataError) 135 self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) 136 self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) 137 self.assertEqual(self.cx.InternalError, sqlite.InternalError) 138 self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) 139 self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) 140 141 def CheckInTransaction(self): 142 # Can't use db from setUp because we want to test initial state. 143 cx = sqlite.connect(":memory:") 144 cu = cx.cursor() 145 self.assertEqual(cx.in_transaction, False) 146 cu.execute("create table transactiontest(id integer primary key, name text)") 147 self.assertEqual(cx.in_transaction, False) 148 cu.execute("insert into transactiontest(name) values (?)", ("foo",)) 149 self.assertEqual(cx.in_transaction, True) 150 cu.execute("select name from transactiontest where name=?", ["foo"]) 151 row = cu.fetchone() 152 self.assertEqual(cx.in_transaction, True) 153 cx.commit() 154 self.assertEqual(cx.in_transaction, False) 155 cu.execute("select name from transactiontest where name=?", ["foo"]) 156 row = cu.fetchone() 157 self.assertEqual(cx.in_transaction, False) 158 159 def CheckInTransactionRO(self): 160 with self.assertRaises(AttributeError): 161 self.cx.in_transaction = True 162 163 def CheckOpenWithPathLikeObject(self): 164 """ Checks that we can successfully connect to a database using an object that 165 is PathLike, i.e. has __fspath__(). """ 166 self.addCleanup(unlink, TESTFN) 167 class Path: 168 def __fspath__(self): 169 return TESTFN 170 path = Path() 171 with sqlite.connect(path) as cx: 172 cx.execute('create table test(id integer)') 173 174 def CheckOpenUri(self): 175 if sqlite.sqlite_version_info < (3, 7, 7): 176 with self.assertRaises(sqlite.NotSupportedError): 177 sqlite.connect(':memory:', uri=True) 178 return 179 self.addCleanup(unlink, TESTFN) 180 with sqlite.connect(TESTFN) as cx: 181 cx.execute('create table test(id integer)') 182 with sqlite.connect('file:' + TESTFN, uri=True) as cx: 183 cx.execute('insert into test(id) values(0)') 184 with sqlite.connect('file:' + TESTFN + '?mode=ro', uri=True) as cx: 185 with self.assertRaises(sqlite.OperationalError): 186 cx.execute('insert into test(id) values(1)') 187 188 @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1), 189 'needs sqlite versions older than 3.3.1') 190 def CheckSameThreadErrorOnOldVersion(self): 191 with self.assertRaises(sqlite.NotSupportedError) as cm: 192 sqlite.connect(':memory:', check_same_thread=False) 193 self.assertEqual(str(cm.exception), 'shared connections not available') 194 195class CursorTests(unittest.TestCase): 196 def setUp(self): 197 self.cx = sqlite.connect(":memory:") 198 self.cu = self.cx.cursor() 199 self.cu.execute( 200 "create table test(id integer primary key, name text, " 201 "income number, unique_test text unique)" 202 ) 203 self.cu.execute("insert into test(name) values (?)", ("foo",)) 204 205 def tearDown(self): 206 self.cu.close() 207 self.cx.close() 208 209 def CheckExecuteNoArgs(self): 210 self.cu.execute("delete from test") 211 212 def CheckExecuteIllegalSql(self): 213 with self.assertRaises(sqlite.OperationalError): 214 self.cu.execute("select asdf") 215 216 def CheckExecuteTooMuchSql(self): 217 with self.assertRaises(sqlite.Warning): 218 self.cu.execute("select 5+4; select 4+5") 219 220 def CheckExecuteTooMuchSql2(self): 221 self.cu.execute("select 5+4; -- foo bar") 222 223 def CheckExecuteTooMuchSql3(self): 224 self.cu.execute(""" 225 select 5+4; 226 227 /* 228 foo 229 */ 230 """) 231 232 def CheckExecuteWrongSqlArg(self): 233 with self.assertRaises(TypeError): 234 self.cu.execute(42) 235 236 def CheckExecuteArgInt(self): 237 self.cu.execute("insert into test(id) values (?)", (42,)) 238 239 def CheckExecuteArgFloat(self): 240 self.cu.execute("insert into test(income) values (?)", (2500.32,)) 241 242 def CheckExecuteArgString(self): 243 self.cu.execute("insert into test(name) values (?)", ("Hugo",)) 244 245 def CheckExecuteArgStringWithZeroByte(self): 246 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) 247 248 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) 249 row = self.cu.fetchone() 250 self.assertEqual(row[0], "Hu\x00go") 251 252 def CheckExecuteNonIterable(self): 253 with self.assertRaises(ValueError) as cm: 254 self.cu.execute("insert into test(id) values (?)", 42) 255 self.assertEqual(str(cm.exception), 'parameters are of unsupported type') 256 257 def CheckExecuteWrongNoOfArgs1(self): 258 # too many parameters 259 with self.assertRaises(sqlite.ProgrammingError): 260 self.cu.execute("insert into test(id) values (?)", (17, "Egon")) 261 262 def CheckExecuteWrongNoOfArgs2(self): 263 # too little parameters 264 with self.assertRaises(sqlite.ProgrammingError): 265 self.cu.execute("insert into test(id) values (?)") 266 267 def CheckExecuteWrongNoOfArgs3(self): 268 # no parameters, parameters are needed 269 with self.assertRaises(sqlite.ProgrammingError): 270 self.cu.execute("insert into test(id) values (?)") 271 272 def CheckExecuteParamList(self): 273 self.cu.execute("insert into test(name) values ('foo')") 274 self.cu.execute("select name from test where name=?", ["foo"]) 275 row = self.cu.fetchone() 276 self.assertEqual(row[0], "foo") 277 278 def CheckExecuteParamSequence(self): 279 class L: 280 def __len__(self): 281 return 1 282 def __getitem__(self, x): 283 assert x == 0 284 return "foo" 285 286 self.cu.execute("insert into test(name) values ('foo')") 287 self.cu.execute("select name from test where name=?", L()) 288 row = self.cu.fetchone() 289 self.assertEqual(row[0], "foo") 290 291 def CheckExecuteParamSequenceBadLen(self): 292 # Issue41662: Error in __len__() was overridden with ProgrammingError. 293 class L: 294 def __len__(self): 295 1/0 296 def __getitem__(slf, x): 297 raise AssertionError 298 299 self.cu.execute("insert into test(name) values ('foo')") 300 with self.assertRaises(ZeroDivisionError): 301 self.cu.execute("select name from test where name=?", L()) 302 303 def CheckExecuteDictMapping(self): 304 self.cu.execute("insert into test(name) values ('foo')") 305 self.cu.execute("select name from test where name=:name", {"name": "foo"}) 306 row = self.cu.fetchone() 307 self.assertEqual(row[0], "foo") 308 309 def CheckExecuteDictMapping_Mapping(self): 310 class D(dict): 311 def __missing__(self, key): 312 return "foo" 313 314 self.cu.execute("insert into test(name) values ('foo')") 315 self.cu.execute("select name from test where name=:name", D()) 316 row = self.cu.fetchone() 317 self.assertEqual(row[0], "foo") 318 319 def CheckExecuteDictMappingTooLittleArgs(self): 320 self.cu.execute("insert into test(name) values ('foo')") 321 with self.assertRaises(sqlite.ProgrammingError): 322 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) 323 324 def CheckExecuteDictMappingNoArgs(self): 325 self.cu.execute("insert into test(name) values ('foo')") 326 with self.assertRaises(sqlite.ProgrammingError): 327 self.cu.execute("select name from test where name=:name") 328 329 def CheckExecuteDictMappingUnnamed(self): 330 self.cu.execute("insert into test(name) values ('foo')") 331 with self.assertRaises(sqlite.ProgrammingError): 332 self.cu.execute("select name from test where name=?", {"name": "foo"}) 333 334 def CheckClose(self): 335 self.cu.close() 336 337 def CheckRowcountExecute(self): 338 self.cu.execute("delete from test") 339 self.cu.execute("insert into test(name) values ('foo')") 340 self.cu.execute("insert into test(name) values ('foo')") 341 self.cu.execute("update test set name='bar'") 342 self.assertEqual(self.cu.rowcount, 2) 343 344 def CheckRowcountSelect(self): 345 """ 346 pysqlite does not know the rowcount of SELECT statements, because we 347 don't fetch all rows after executing the select statement. The rowcount 348 has thus to be -1. 349 """ 350 self.cu.execute("select 5 union select 6") 351 self.assertEqual(self.cu.rowcount, -1) 352 353 def CheckRowcountExecutemany(self): 354 self.cu.execute("delete from test") 355 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) 356 self.assertEqual(self.cu.rowcount, 3) 357 358 def CheckTotalChanges(self): 359 self.cu.execute("insert into test(name) values ('foo')") 360 self.cu.execute("insert into test(name) values ('foo')") 361 self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') 362 363 # Checks for executemany: 364 # Sequences are required by the DB-API, iterators 365 # enhancements in pysqlite. 366 367 def CheckExecuteManySequence(self): 368 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) 369 370 def CheckExecuteManyIterator(self): 371 class MyIter: 372 def __init__(self): 373 self.value = 5 374 375 def __next__(self): 376 if self.value == 10: 377 raise StopIteration 378 else: 379 self.value += 1 380 return (self.value,) 381 382 self.cu.executemany("insert into test(income) values (?)", MyIter()) 383 384 def CheckExecuteManyGenerator(self): 385 def mygen(): 386 for i in range(5): 387 yield (i,) 388 389 self.cu.executemany("insert into test(income) values (?)", mygen()) 390 391 def CheckExecuteManyWrongSqlArg(self): 392 with self.assertRaises(TypeError): 393 self.cu.executemany(42, [(3,)]) 394 395 def CheckExecuteManySelect(self): 396 with self.assertRaises(sqlite.ProgrammingError): 397 self.cu.executemany("select ?", [(3,)]) 398 399 def CheckExecuteManyNotIterable(self): 400 with self.assertRaises(TypeError): 401 self.cu.executemany("insert into test(income) values (?)", 42) 402 403 def CheckFetchIter(self): 404 # Optional DB-API extension. 405 self.cu.execute("delete from test") 406 self.cu.execute("insert into test(id) values (?)", (5,)) 407 self.cu.execute("insert into test(id) values (?)", (6,)) 408 self.cu.execute("select id from test order by id") 409 lst = [] 410 for row in self.cu: 411 lst.append(row[0]) 412 self.assertEqual(lst[0], 5) 413 self.assertEqual(lst[1], 6) 414 415 def CheckFetchone(self): 416 self.cu.execute("select name from test") 417 row = self.cu.fetchone() 418 self.assertEqual(row[0], "foo") 419 row = self.cu.fetchone() 420 self.assertEqual(row, None) 421 422 def CheckFetchoneNoStatement(self): 423 cur = self.cx.cursor() 424 row = cur.fetchone() 425 self.assertEqual(row, None) 426 427 def CheckArraySize(self): 428 # must default ot 1 429 self.assertEqual(self.cu.arraysize, 1) 430 431 # now set to 2 432 self.cu.arraysize = 2 433 434 # now make the query return 3 rows 435 self.cu.execute("delete from test") 436 self.cu.execute("insert into test(name) values ('A')") 437 self.cu.execute("insert into test(name) values ('B')") 438 self.cu.execute("insert into test(name) values ('C')") 439 self.cu.execute("select name from test") 440 res = self.cu.fetchmany() 441 442 self.assertEqual(len(res), 2) 443 444 def CheckFetchmany(self): 445 self.cu.execute("select name from test") 446 res = self.cu.fetchmany(100) 447 self.assertEqual(len(res), 1) 448 res = self.cu.fetchmany(100) 449 self.assertEqual(res, []) 450 451 def CheckFetchmanyKwArg(self): 452 """Checks if fetchmany works with keyword arguments""" 453 self.cu.execute("select name from test") 454 res = self.cu.fetchmany(size=100) 455 self.assertEqual(len(res), 1) 456 457 def CheckFetchall(self): 458 self.cu.execute("select name from test") 459 res = self.cu.fetchall() 460 self.assertEqual(len(res), 1) 461 res = self.cu.fetchall() 462 self.assertEqual(res, []) 463 464 def CheckSetinputsizes(self): 465 self.cu.setinputsizes([3, 4, 5]) 466 467 def CheckSetoutputsize(self): 468 self.cu.setoutputsize(5, 0) 469 470 def CheckSetoutputsizeNoColumn(self): 471 self.cu.setoutputsize(42) 472 473 def CheckCursorConnection(self): 474 # Optional DB-API extension. 475 self.assertEqual(self.cu.connection, self.cx) 476 477 def CheckWrongCursorCallable(self): 478 with self.assertRaises(TypeError): 479 def f(): pass 480 cur = self.cx.cursor(f) 481 482 def CheckCursorWrongClass(self): 483 class Foo: pass 484 foo = Foo() 485 with self.assertRaises(TypeError): 486 cur = sqlite.Cursor(foo) 487 488 def CheckLastRowIDOnReplace(self): 489 """ 490 INSERT OR REPLACE and REPLACE INTO should produce the same behavior. 491 """ 492 sql = '{} INTO test(id, unique_test) VALUES (?, ?)' 493 for statement in ('INSERT OR REPLACE', 'REPLACE'): 494 with self.subTest(statement=statement): 495 self.cu.execute(sql.format(statement), (1, 'foo')) 496 self.assertEqual(self.cu.lastrowid, 1) 497 498 def CheckLastRowIDOnIgnore(self): 499 self.cu.execute( 500 "insert or ignore into test(unique_test) values (?)", 501 ('test',)) 502 self.assertEqual(self.cu.lastrowid, 2) 503 self.cu.execute( 504 "insert or ignore into test(unique_test) values (?)", 505 ('test',)) 506 self.assertEqual(self.cu.lastrowid, 2) 507 508 def CheckLastRowIDInsertOR(self): 509 results = [] 510 for statement in ('FAIL', 'ABORT', 'ROLLBACK'): 511 sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' 512 with self.subTest(statement='INSERT OR {}'.format(statement)): 513 self.cu.execute(sql.format(statement), (statement,)) 514 results.append((statement, self.cu.lastrowid)) 515 with self.assertRaises(sqlite.IntegrityError): 516 self.cu.execute(sql.format(statement), (statement,)) 517 results.append((statement, self.cu.lastrowid)) 518 expected = [ 519 ('FAIL', 2), ('FAIL', 2), 520 ('ABORT', 3), ('ABORT', 3), 521 ('ROLLBACK', 4), ('ROLLBACK', 4), 522 ] 523 self.assertEqual(results, expected) 524 525 526class ThreadTests(unittest.TestCase): 527 def setUp(self): 528 self.con = sqlite.connect(":memory:") 529 self.cur = self.con.cursor() 530 self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") 531 532 def tearDown(self): 533 self.cur.close() 534 self.con.close() 535 536 def CheckConCursor(self): 537 def run(con, errors): 538 try: 539 cur = con.cursor() 540 errors.append("did not raise ProgrammingError") 541 return 542 except sqlite.ProgrammingError: 543 return 544 except: 545 errors.append("raised wrong exception") 546 547 errors = [] 548 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 549 t.start() 550 t.join() 551 if len(errors) > 0: 552 self.fail("\n".join(errors)) 553 554 def CheckConCommit(self): 555 def run(con, errors): 556 try: 557 con.commit() 558 errors.append("did not raise ProgrammingError") 559 return 560 except sqlite.ProgrammingError: 561 return 562 except: 563 errors.append("raised wrong exception") 564 565 errors = [] 566 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 567 t.start() 568 t.join() 569 if len(errors) > 0: 570 self.fail("\n".join(errors)) 571 572 def CheckConRollback(self): 573 def run(con, errors): 574 try: 575 con.rollback() 576 errors.append("did not raise ProgrammingError") 577 return 578 except sqlite.ProgrammingError: 579 return 580 except: 581 errors.append("raised wrong exception") 582 583 errors = [] 584 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 585 t.start() 586 t.join() 587 if len(errors) > 0: 588 self.fail("\n".join(errors)) 589 590 def CheckConClose(self): 591 def run(con, errors): 592 try: 593 con.close() 594 errors.append("did not raise ProgrammingError") 595 return 596 except sqlite.ProgrammingError: 597 return 598 except: 599 errors.append("raised wrong exception") 600 601 errors = [] 602 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 603 t.start() 604 t.join() 605 if len(errors) > 0: 606 self.fail("\n".join(errors)) 607 608 def CheckCurImplicitBegin(self): 609 def run(cur, errors): 610 try: 611 cur.execute("insert into test(name) values ('a')") 612 errors.append("did not raise ProgrammingError") 613 return 614 except sqlite.ProgrammingError: 615 return 616 except: 617 errors.append("raised wrong exception") 618 619 errors = [] 620 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 621 t.start() 622 t.join() 623 if len(errors) > 0: 624 self.fail("\n".join(errors)) 625 626 def CheckCurClose(self): 627 def run(cur, errors): 628 try: 629 cur.close() 630 errors.append("did not raise ProgrammingError") 631 return 632 except sqlite.ProgrammingError: 633 return 634 except: 635 errors.append("raised wrong exception") 636 637 errors = [] 638 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 639 t.start() 640 t.join() 641 if len(errors) > 0: 642 self.fail("\n".join(errors)) 643 644 def CheckCurExecute(self): 645 def run(cur, errors): 646 try: 647 cur.execute("select name from test") 648 errors.append("did not raise ProgrammingError") 649 return 650 except sqlite.ProgrammingError: 651 return 652 except: 653 errors.append("raised wrong exception") 654 655 errors = [] 656 self.cur.execute("insert into test(name) values ('a')") 657 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 658 t.start() 659 t.join() 660 if len(errors) > 0: 661 self.fail("\n".join(errors)) 662 663 def CheckCurIterNext(self): 664 def run(cur, errors): 665 try: 666 row = cur.fetchone() 667 errors.append("did not raise ProgrammingError") 668 return 669 except sqlite.ProgrammingError: 670 return 671 except: 672 errors.append("raised wrong exception") 673 674 errors = [] 675 self.cur.execute("insert into test(name) values ('a')") 676 self.cur.execute("select name from test") 677 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 678 t.start() 679 t.join() 680 if len(errors) > 0: 681 self.fail("\n".join(errors)) 682 683class ConstructorTests(unittest.TestCase): 684 def CheckDate(self): 685 d = sqlite.Date(2004, 10, 28) 686 687 def CheckTime(self): 688 t = sqlite.Time(12, 39, 35) 689 690 def CheckTimestamp(self): 691 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) 692 693 def CheckDateFromTicks(self): 694 d = sqlite.DateFromTicks(42) 695 696 def CheckTimeFromTicks(self): 697 t = sqlite.TimeFromTicks(42) 698 699 def CheckTimestampFromTicks(self): 700 ts = sqlite.TimestampFromTicks(42) 701 702 def CheckBinary(self): 703 b = sqlite.Binary(b"\0'") 704 705class ExtensionTests(unittest.TestCase): 706 def CheckScriptStringSql(self): 707 con = sqlite.connect(":memory:") 708 cur = con.cursor() 709 cur.executescript(""" 710 -- bla bla 711 /* a stupid comment */ 712 create table a(i); 713 insert into a(i) values (5); 714 """) 715 cur.execute("select i from a") 716 res = cur.fetchone()[0] 717 self.assertEqual(res, 5) 718 719 def CheckScriptSyntaxError(self): 720 con = sqlite.connect(":memory:") 721 cur = con.cursor() 722 with self.assertRaises(sqlite.OperationalError): 723 cur.executescript("create table test(x); asdf; create table test2(x)") 724 725 def CheckScriptErrorNormal(self): 726 con = sqlite.connect(":memory:") 727 cur = con.cursor() 728 with self.assertRaises(sqlite.OperationalError): 729 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") 730 731 def CheckCursorExecutescriptAsBytes(self): 732 con = sqlite.connect(":memory:") 733 cur = con.cursor() 734 with self.assertRaises(ValueError) as cm: 735 cur.executescript(b"create table test(foo); insert into test(foo) values (5);") 736 self.assertEqual(str(cm.exception), 'script argument must be unicode.') 737 738 def CheckConnectionExecute(self): 739 con = sqlite.connect(":memory:") 740 result = con.execute("select 5").fetchone()[0] 741 self.assertEqual(result, 5, "Basic test of Connection.execute") 742 743 def CheckConnectionExecutemany(self): 744 con = sqlite.connect(":memory:") 745 con.execute("create table test(foo)") 746 con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) 747 result = con.execute("select foo from test order by foo").fetchall() 748 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") 749 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") 750 751 def CheckConnectionExecutescript(self): 752 con = sqlite.connect(":memory:") 753 con.executescript("create table test(foo); insert into test(foo) values (5);") 754 result = con.execute("select foo from test").fetchone()[0] 755 self.assertEqual(result, 5, "Basic test of Connection.executescript") 756 757class ClosedConTests(unittest.TestCase): 758 def CheckClosedConCursor(self): 759 con = sqlite.connect(":memory:") 760 con.close() 761 with self.assertRaises(sqlite.ProgrammingError): 762 cur = con.cursor() 763 764 def CheckClosedConCommit(self): 765 con = sqlite.connect(":memory:") 766 con.close() 767 with self.assertRaises(sqlite.ProgrammingError): 768 con.commit() 769 770 def CheckClosedConRollback(self): 771 con = sqlite.connect(":memory:") 772 con.close() 773 with self.assertRaises(sqlite.ProgrammingError): 774 con.rollback() 775 776 def CheckClosedCurExecute(self): 777 con = sqlite.connect(":memory:") 778 cur = con.cursor() 779 con.close() 780 with self.assertRaises(sqlite.ProgrammingError): 781 cur.execute("select 4") 782 783 def CheckClosedCreateFunction(self): 784 con = sqlite.connect(":memory:") 785 con.close() 786 def f(x): return 17 787 with self.assertRaises(sqlite.ProgrammingError): 788 con.create_function("foo", 1, f) 789 790 def CheckClosedCreateAggregate(self): 791 con = sqlite.connect(":memory:") 792 con.close() 793 class Agg: 794 def __init__(self): 795 pass 796 def step(self, x): 797 pass 798 def finalize(self): 799 return 17 800 with self.assertRaises(sqlite.ProgrammingError): 801 con.create_aggregate("foo", 1, Agg) 802 803 def CheckClosedSetAuthorizer(self): 804 con = sqlite.connect(":memory:") 805 con.close() 806 def authorizer(*args): 807 return sqlite.DENY 808 with self.assertRaises(sqlite.ProgrammingError): 809 con.set_authorizer(authorizer) 810 811 def CheckClosedSetProgressCallback(self): 812 con = sqlite.connect(":memory:") 813 con.close() 814 def progress(): pass 815 with self.assertRaises(sqlite.ProgrammingError): 816 con.set_progress_handler(progress, 100) 817 818 def CheckClosedCall(self): 819 con = sqlite.connect(":memory:") 820 con.close() 821 with self.assertRaises(sqlite.ProgrammingError): 822 con() 823 824class ClosedCurTests(unittest.TestCase): 825 def CheckClosed(self): 826 con = sqlite.connect(":memory:") 827 cur = con.cursor() 828 cur.close() 829 830 for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): 831 if method_name in ("execute", "executescript"): 832 params = ("select 4 union select 5",) 833 elif method_name == "executemany": 834 params = ("insert into foo(bar) values (?)", [(3,), (4,)]) 835 else: 836 params = [] 837 838 with self.assertRaises(sqlite.ProgrammingError): 839 method = getattr(cur, method_name) 840 method(*params) 841 842 843class SqliteOnConflictTests(unittest.TestCase): 844 """ 845 Tests for SQLite's "insert on conflict" feature. 846 847 See https://www.sqlite.org/lang_conflict.html for details. 848 """ 849 850 def setUp(self): 851 self.cx = sqlite.connect(":memory:") 852 self.cu = self.cx.cursor() 853 self.cu.execute(""" 854 CREATE TABLE test( 855 id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE 856 ); 857 """) 858 859 def tearDown(self): 860 self.cu.close() 861 self.cx.close() 862 863 def CheckOnConflictRollbackWithExplicitTransaction(self): 864 self.cx.isolation_level = None # autocommit mode 865 self.cu = self.cx.cursor() 866 # Start an explicit transaction. 867 self.cu.execute("BEGIN") 868 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 869 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 870 with self.assertRaises(sqlite.IntegrityError): 871 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 872 # Use connection to commit. 873 self.cx.commit() 874 self.cu.execute("SELECT name, unique_name from test") 875 # Transaction should have rolled back and nothing should be in table. 876 self.assertEqual(self.cu.fetchall(), []) 877 878 def CheckOnConflictAbortRaisesWithExplicitTransactions(self): 879 # Abort cancels the current sql statement but doesn't change anything 880 # about the current transaction. 881 self.cx.isolation_level = None # autocommit mode 882 self.cu = self.cx.cursor() 883 # Start an explicit transaction. 884 self.cu.execute("BEGIN") 885 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 886 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 887 with self.assertRaises(sqlite.IntegrityError): 888 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 889 self.cx.commit() 890 self.cu.execute("SELECT name, unique_name FROM test") 891 # Expect the first two inserts to work, third to do nothing. 892 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 893 894 def CheckOnConflictRollbackWithoutTransaction(self): 895 # Start of implicit transaction 896 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 897 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 898 with self.assertRaises(sqlite.IntegrityError): 899 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 900 self.cu.execute("SELECT name, unique_name FROM test") 901 # Implicit transaction is rolled back on error. 902 self.assertEqual(self.cu.fetchall(), []) 903 904 def CheckOnConflictAbortRaisesWithoutTransactions(self): 905 # Abort cancels the current sql statement but doesn't change anything 906 # about the current transaction. 907 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 908 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 909 with self.assertRaises(sqlite.IntegrityError): 910 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 911 # Make sure all other values were inserted. 912 self.cu.execute("SELECT name, unique_name FROM test") 913 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 914 915 def CheckOnConflictFail(self): 916 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 917 with self.assertRaises(sqlite.IntegrityError): 918 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 919 self.assertEqual(self.cu.fetchall(), []) 920 921 def CheckOnConflictIgnore(self): 922 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 923 # Nothing should happen. 924 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 925 self.cu.execute("SELECT unique_name FROM test") 926 self.assertEqual(self.cu.fetchall(), [('foo',)]) 927 928 def CheckOnConflictReplace(self): 929 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") 930 # There shouldn't be an IntegrityError exception. 931 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") 932 self.cu.execute("SELECT name, unique_name FROM test") 933 self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')]) 934 935 936def suite(): 937 module_suite = unittest.makeSuite(ModuleTests, "Check") 938 connection_suite = unittest.makeSuite(ConnectionTests, "Check") 939 cursor_suite = unittest.makeSuite(CursorTests, "Check") 940 thread_suite = unittest.makeSuite(ThreadTests, "Check") 941 constructor_suite = unittest.makeSuite(ConstructorTests, "Check") 942 ext_suite = unittest.makeSuite(ExtensionTests, "Check") 943 closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") 944 closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") 945 on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") 946 return unittest.TestSuite(( 947 module_suite, connection_suite, cursor_suite, thread_suite, 948 constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, 949 on_conflict_suite, 950 )) 951 952def test(): 953 runner = unittest.TextTestRunner() 954 runner.run(suite()) 955 956if __name__ == "__main__": 957 test() 958