1#-*- coding: ISO-8859-1 -*- 2# pysqlite2/test/dbapi.py: tests for DB-API compliance 3# 4# Copyright (C) 2004-2010 Gerhard H�ring <gh@ghaering.de> 5# 6# This file is part of pysqlite. 7# 8# This software is provided 'as-is', without any express or implied 9# warranty. In no event will the authors be held liable for any damages 10# arising from the use of this software. 11# 12# Permission is granted to anyone to use this software for any purpose, 13# including commercial applications, and to alter it and redistribute it 14# freely, subject to the following restrictions: 15# 16# 1. The origin of this software must not be misrepresented; you must not 17# claim that you wrote the original software. If you use this software 18# in a product, an acknowledgment in the product documentation would be 19# appreciated but is not required. 20# 2. Altered source versions must be plainly marked as such, and must not be 21# misrepresented as being the original software. 22# 3. This notice may not be removed or altered from any source distribution. 23 24import unittest 25import sys 26import sqlite3 as sqlite 27from test import test_support 28try: 29 import threading 30except ImportError: 31 threading = None 32 33class ModuleTests(unittest.TestCase): 34 def CheckAPILevel(self): 35 self.assertEqual(sqlite.apilevel, "2.0", 36 "apilevel is %s, should be 2.0" % sqlite.apilevel) 37 38 def CheckThreadSafety(self): 39 self.assertEqual(sqlite.threadsafety, 1, 40 "threadsafety is %d, should be 1" % sqlite.threadsafety) 41 42 def CheckParamStyle(self): 43 self.assertEqual(sqlite.paramstyle, "qmark", 44 "paramstyle is '%s', should be 'qmark'" % 45 sqlite.paramstyle) 46 47 def CheckWarning(self): 48 self.assertTrue(issubclass(sqlite.Warning, StandardError), 49 "Warning is not a subclass of StandardError") 50 51 def CheckError(self): 52 self.assertTrue(issubclass(sqlite.Error, StandardError), 53 "Error is not a subclass of StandardError") 54 55 def CheckInterfaceError(self): 56 self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), 57 "InterfaceError is not a subclass of Error") 58 59 def CheckDatabaseError(self): 60 self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), 61 "DatabaseError is not a subclass of Error") 62 63 def CheckDataError(self): 64 self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), 65 "DataError is not a subclass of DatabaseError") 66 67 def CheckOperationalError(self): 68 self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), 69 "OperationalError is not a subclass of DatabaseError") 70 71 def CheckIntegrityError(self): 72 self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), 73 "IntegrityError is not a subclass of DatabaseError") 74 75 def CheckInternalError(self): 76 self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), 77 "InternalError is not a subclass of DatabaseError") 78 79 def CheckProgrammingError(self): 80 self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), 81 "ProgrammingError is not a subclass of DatabaseError") 82 83 def CheckNotSupportedError(self): 84 self.assertTrue(issubclass(sqlite.NotSupportedError, 85 sqlite.DatabaseError), 86 "NotSupportedError is not a subclass of DatabaseError") 87 88class ConnectionTests(unittest.TestCase): 89 def setUp(self): 90 self.cx = sqlite.connect(":memory:") 91 cu = self.cx.cursor() 92 cu.execute("create table test(id integer primary key, name text)") 93 cu.execute("insert into test(name) values (?)", ("foo",)) 94 95 def tearDown(self): 96 self.cx.close() 97 98 def CheckCommit(self): 99 self.cx.commit() 100 101 def CheckCommitAfterNoChanges(self): 102 """ 103 A commit should also work when no changes were made to the database. 104 """ 105 self.cx.commit() 106 self.cx.commit() 107 108 def CheckRollback(self): 109 self.cx.rollback() 110 111 def CheckRollbackAfterNoChanges(self): 112 """ 113 A rollback should also work when no changes were made to the database. 114 """ 115 self.cx.rollback() 116 self.cx.rollback() 117 118 def CheckCursor(self): 119 cu = self.cx.cursor() 120 121 def CheckFailedOpen(self): 122 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" 123 try: 124 con = sqlite.connect(YOU_CANNOT_OPEN_THIS) 125 except sqlite.OperationalError: 126 return 127 self.fail("should have raised an OperationalError") 128 129 def CheckClose(self): 130 self.cx.close() 131 132 def CheckExceptions(self): 133 # Optional DB-API extension. 134 self.assertEqual(self.cx.Warning, sqlite.Warning) 135 self.assertEqual(self.cx.Error, sqlite.Error) 136 self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) 137 self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) 138 self.assertEqual(self.cx.DataError, sqlite.DataError) 139 self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) 140 self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) 141 self.assertEqual(self.cx.InternalError, sqlite.InternalError) 142 self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) 143 self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) 144 145class CursorTests(unittest.TestCase): 146 def setUp(self): 147 self.cx = sqlite.connect(":memory:") 148 self.cu = self.cx.cursor() 149 self.cu.execute("create table test(id integer primary key, name text, income number)") 150 self.cu.execute("insert into test(name) values (?)", ("foo",)) 151 152 def tearDown(self): 153 self.cu.close() 154 self.cx.close() 155 156 def CheckExecuteNoArgs(self): 157 self.cu.execute("delete from test") 158 159 def CheckExecuteIllegalSql(self): 160 try: 161 self.cu.execute("select asdf") 162 self.fail("should have raised an OperationalError") 163 except sqlite.OperationalError: 164 return 165 except: 166 self.fail("raised wrong exception") 167 168 def CheckExecuteTooMuchSql(self): 169 try: 170 self.cu.execute("select 5+4; select 4+5") 171 self.fail("should have raised a Warning") 172 except sqlite.Warning: 173 return 174 except: 175 self.fail("raised wrong exception") 176 177 def CheckExecuteTooMuchSql2(self): 178 self.cu.execute("select 5+4; -- foo bar") 179 180 def CheckExecuteTooMuchSql3(self): 181 self.cu.execute(""" 182 select 5+4; 183 184 /* 185 foo 186 */ 187 """) 188 189 def CheckExecuteWrongSqlArg(self): 190 try: 191 self.cu.execute(42) 192 self.fail("should have raised a ValueError") 193 except ValueError: 194 return 195 except: 196 self.fail("raised wrong exception.") 197 198 def CheckExecuteArgInt(self): 199 self.cu.execute("insert into test(id) values (?)", (42,)) 200 201 def CheckExecuteArgFloat(self): 202 self.cu.execute("insert into test(income) values (?)", (2500.32,)) 203 204 def CheckExecuteArgString(self): 205 self.cu.execute("insert into test(name) values (?)", ("Hugo",)) 206 207 def CheckExecuteArgStringWithZeroByte(self): 208 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) 209 210 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) 211 row = self.cu.fetchone() 212 self.assertEqual(row[0], "Hu\x00go") 213 214 def CheckExecuteWrongNoOfArgs1(self): 215 # too many parameters 216 try: 217 self.cu.execute("insert into test(id) values (?)", (17, "Egon")) 218 self.fail("should have raised ProgrammingError") 219 except sqlite.ProgrammingError: 220 pass 221 222 def CheckExecuteWrongNoOfArgs2(self): 223 # too little parameters 224 try: 225 self.cu.execute("insert into test(id) values (?)") 226 self.fail("should have raised ProgrammingError") 227 except sqlite.ProgrammingError: 228 pass 229 230 def CheckExecuteWrongNoOfArgs3(self): 231 # no parameters, parameters are needed 232 try: 233 self.cu.execute("insert into test(id) values (?)") 234 self.fail("should have raised ProgrammingError") 235 except sqlite.ProgrammingError: 236 pass 237 238 def CheckExecuteParamList(self): 239 self.cu.execute("insert into test(name) values ('foo')") 240 self.cu.execute("select name from test where name=?", ["foo"]) 241 row = self.cu.fetchone() 242 self.assertEqual(row[0], "foo") 243 244 def CheckExecuteParamSequence(self): 245 class L(object): 246 def __len__(self): 247 return 1 248 def __getitem__(self, x): 249 assert x == 0 250 return "foo" 251 252 self.cu.execute("insert into test(name) values ('foo')") 253 self.cu.execute("select name from test where name=?", L()) 254 row = self.cu.fetchone() 255 self.assertEqual(row[0], "foo") 256 257 def CheckExecuteDictMapping(self): 258 self.cu.execute("insert into test(name) values ('foo')") 259 self.cu.execute("select name from test where name=:name", {"name": "foo"}) 260 row = self.cu.fetchone() 261 self.assertEqual(row[0], "foo") 262 263 def CheckExecuteDictMapping_Mapping(self): 264 # Test only works with Python 2.5 or later 265 if sys.version_info < (2, 5, 0): 266 return 267 268 class D(dict): 269 def __missing__(self, key): 270 return "foo" 271 272 self.cu.execute("insert into test(name) values ('foo')") 273 self.cu.execute("select name from test where name=:name", D()) 274 row = self.cu.fetchone() 275 self.assertEqual(row[0], "foo") 276 277 def CheckExecuteDictMappingTooLittleArgs(self): 278 self.cu.execute("insert into test(name) values ('foo')") 279 try: 280 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) 281 self.fail("should have raised ProgrammingError") 282 except sqlite.ProgrammingError: 283 pass 284 285 def CheckExecuteDictMappingNoArgs(self): 286 self.cu.execute("insert into test(name) values ('foo')") 287 try: 288 self.cu.execute("select name from test where name=:name") 289 self.fail("should have raised ProgrammingError") 290 except sqlite.ProgrammingError: 291 pass 292 293 def CheckExecuteDictMappingUnnamed(self): 294 self.cu.execute("insert into test(name) values ('foo')") 295 try: 296 self.cu.execute("select name from test where name=?", {"name": "foo"}) 297 self.fail("should have raised ProgrammingError") 298 except sqlite.ProgrammingError: 299 pass 300 301 def CheckClose(self): 302 self.cu.close() 303 304 def CheckRowcountExecute(self): 305 self.cu.execute("delete from test") 306 self.cu.execute("insert into test(name) values ('foo')") 307 self.cu.execute("insert into test(name) values ('foo')") 308 self.cu.execute("update test set name='bar'") 309 self.assertEqual(self.cu.rowcount, 2) 310 311 def CheckRowcountSelect(self): 312 """ 313 pysqlite does not know the rowcount of SELECT statements, because we 314 don't fetch all rows after executing the select statement. The rowcount 315 has thus to be -1. 316 """ 317 self.cu.execute("select 5 union select 6") 318 self.assertEqual(self.cu.rowcount, -1) 319 320 def CheckRowcountExecutemany(self): 321 self.cu.execute("delete from test") 322 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) 323 self.assertEqual(self.cu.rowcount, 3) 324 325 def CheckTotalChanges(self): 326 self.cu.execute("insert into test(name) values ('foo')") 327 self.cu.execute("insert into test(name) values ('foo')") 328 if self.cx.total_changes < 2: 329 self.fail("total changes reported wrong value") 330 331 # Checks for executemany: 332 # Sequences are required by the DB-API, iterators 333 # enhancements in pysqlite. 334 335 def CheckExecuteManySequence(self): 336 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) 337 338 def CheckExecuteManyIterator(self): 339 class MyIter: 340 def __init__(self): 341 self.value = 5 342 343 def next(self): 344 if self.value == 10: 345 raise StopIteration 346 else: 347 self.value += 1 348 return (self.value,) 349 350 self.cu.executemany("insert into test(income) values (?)", MyIter()) 351 352 def CheckExecuteManyGenerator(self): 353 def mygen(): 354 for i in range(5): 355 yield (i,) 356 357 self.cu.executemany("insert into test(income) values (?)", mygen()) 358 359 def CheckExecuteManyWrongSqlArg(self): 360 try: 361 self.cu.executemany(42, [(3,)]) 362 self.fail("should have raised a ValueError") 363 except ValueError: 364 return 365 except: 366 self.fail("raised wrong exception.") 367 368 def CheckExecuteManySelect(self): 369 try: 370 self.cu.executemany("select ?", [(3,)]) 371 self.fail("should have raised a ProgrammingError") 372 except sqlite.ProgrammingError: 373 return 374 except: 375 self.fail("raised wrong exception.") 376 377 def CheckExecuteManyNotIterable(self): 378 try: 379 self.cu.executemany("insert into test(income) values (?)", 42) 380 self.fail("should have raised a TypeError") 381 except TypeError: 382 return 383 except Exception, e: 384 print "raised", e.__class__ 385 self.fail("raised wrong exception.") 386 387 def CheckFetchIter(self): 388 # Optional DB-API extension. 389 self.cu.execute("delete from test") 390 self.cu.execute("insert into test(id) values (?)", (5,)) 391 self.cu.execute("insert into test(id) values (?)", (6,)) 392 self.cu.execute("select id from test order by id") 393 lst = [] 394 for row in self.cu: 395 lst.append(row[0]) 396 self.assertEqual(lst[0], 5) 397 self.assertEqual(lst[1], 6) 398 399 def CheckFetchone(self): 400 self.cu.execute("select name from test") 401 row = self.cu.fetchone() 402 self.assertEqual(row[0], "foo") 403 row = self.cu.fetchone() 404 self.assertEqual(row, None) 405 406 def CheckFetchoneNoStatement(self): 407 cur = self.cx.cursor() 408 row = cur.fetchone() 409 self.assertEqual(row, None) 410 411 def CheckArraySize(self): 412 # must default ot 1 413 self.assertEqual(self.cu.arraysize, 1) 414 415 # now set to 2 416 self.cu.arraysize = 2 417 418 # now make the query return 3 rows 419 self.cu.execute("delete from test") 420 self.cu.execute("insert into test(name) values ('A')") 421 self.cu.execute("insert into test(name) values ('B')") 422 self.cu.execute("insert into test(name) values ('C')") 423 self.cu.execute("select name from test") 424 res = self.cu.fetchmany() 425 426 self.assertEqual(len(res), 2) 427 428 def CheckFetchmany(self): 429 self.cu.execute("select name from test") 430 res = self.cu.fetchmany(100) 431 self.assertEqual(len(res), 1) 432 res = self.cu.fetchmany(100) 433 self.assertEqual(res, []) 434 435 def CheckFetchmanyKwArg(self): 436 """Checks if fetchmany works with keyword arguments""" 437 self.cu.execute("select name from test") 438 res = self.cu.fetchmany(size=100) 439 self.assertEqual(len(res), 1) 440 441 def CheckFetchall(self): 442 self.cu.execute("select name from test") 443 res = self.cu.fetchall() 444 self.assertEqual(len(res), 1) 445 res = self.cu.fetchall() 446 self.assertEqual(res, []) 447 448 def CheckSetinputsizes(self): 449 self.cu.setinputsizes([3, 4, 5]) 450 451 def CheckSetoutputsize(self): 452 self.cu.setoutputsize(5, 0) 453 454 def CheckSetoutputsizeNoColumn(self): 455 self.cu.setoutputsize(42) 456 457 def CheckCursorConnection(self): 458 # Optional DB-API extension. 459 self.assertEqual(self.cu.connection, self.cx) 460 461 def CheckWrongCursorCallable(self): 462 try: 463 def f(): pass 464 cur = self.cx.cursor(f) 465 self.fail("should have raised a TypeError") 466 except TypeError: 467 return 468 self.fail("should have raised a ValueError") 469 470 def CheckCursorWrongClass(self): 471 class Foo: pass 472 foo = Foo() 473 try: 474 cur = sqlite.Cursor(foo) 475 self.fail("should have raised a ValueError") 476 except TypeError: 477 pass 478 479@unittest.skipUnless(threading, 'This test requires threading.') 480class ThreadTests(unittest.TestCase): 481 def setUp(self): 482 self.con = sqlite.connect(":memory:") 483 self.cur = self.con.cursor() 484 self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") 485 486 def tearDown(self): 487 self.cur.close() 488 self.con.close() 489 490 def CheckConCursor(self): 491 def run(con, errors): 492 try: 493 cur = con.cursor() 494 errors.append("did not raise ProgrammingError") 495 return 496 except sqlite.ProgrammingError: 497 return 498 except: 499 errors.append("raised wrong exception") 500 501 errors = [] 502 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 503 t.start() 504 t.join() 505 if len(errors) > 0: 506 self.fail("\n".join(errors)) 507 508 def CheckConCommit(self): 509 def run(con, errors): 510 try: 511 con.commit() 512 errors.append("did not raise ProgrammingError") 513 return 514 except sqlite.ProgrammingError: 515 return 516 except: 517 errors.append("raised wrong exception") 518 519 errors = [] 520 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 521 t.start() 522 t.join() 523 if len(errors) > 0: 524 self.fail("\n".join(errors)) 525 526 def CheckConRollback(self): 527 def run(con, errors): 528 try: 529 con.rollback() 530 errors.append("did not raise ProgrammingError") 531 return 532 except sqlite.ProgrammingError: 533 return 534 except: 535 errors.append("raised wrong exception") 536 537 errors = [] 538 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 539 t.start() 540 t.join() 541 if len(errors) > 0: 542 self.fail("\n".join(errors)) 543 544 def CheckConClose(self): 545 def run(con, errors): 546 try: 547 con.close() 548 errors.append("did not raise ProgrammingError") 549 return 550 except sqlite.ProgrammingError: 551 return 552 except: 553 errors.append("raised wrong exception") 554 555 errors = [] 556 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 557 t.start() 558 t.join() 559 if len(errors) > 0: 560 self.fail("\n".join(errors)) 561 562 def CheckCurImplicitBegin(self): 563 def run(cur, errors): 564 try: 565 cur.execute("insert into test(name) values ('a')") 566 errors.append("did not raise ProgrammingError") 567 return 568 except sqlite.ProgrammingError: 569 return 570 except: 571 errors.append("raised wrong exception") 572 573 errors = [] 574 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 575 t.start() 576 t.join() 577 if len(errors) > 0: 578 self.fail("\n".join(errors)) 579 580 def CheckCurClose(self): 581 def run(cur, errors): 582 try: 583 cur.close() 584 errors.append("did not raise ProgrammingError") 585 return 586 except sqlite.ProgrammingError: 587 return 588 except: 589 errors.append("raised wrong exception") 590 591 errors = [] 592 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 593 t.start() 594 t.join() 595 if len(errors) > 0: 596 self.fail("\n".join(errors)) 597 598 def CheckCurExecute(self): 599 def run(cur, errors): 600 try: 601 cur.execute("select name from test") 602 errors.append("did not raise ProgrammingError") 603 return 604 except sqlite.ProgrammingError: 605 return 606 except: 607 errors.append("raised wrong exception") 608 609 errors = [] 610 self.cur.execute("insert into test(name) values ('a')") 611 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 612 t.start() 613 t.join() 614 if len(errors) > 0: 615 self.fail("\n".join(errors)) 616 617 def CheckCurIterNext(self): 618 def run(cur, errors): 619 try: 620 row = cur.fetchone() 621 errors.append("did not raise ProgrammingError") 622 return 623 except sqlite.ProgrammingError: 624 return 625 except: 626 errors.append("raised wrong exception") 627 628 errors = [] 629 self.cur.execute("insert into test(name) values ('a')") 630 self.cur.execute("select name from test") 631 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 632 t.start() 633 t.join() 634 if len(errors) > 0: 635 self.fail("\n".join(errors)) 636 637class ConstructorTests(unittest.TestCase): 638 def CheckDate(self): 639 d = sqlite.Date(2004, 10, 28) 640 641 def CheckTime(self): 642 t = sqlite.Time(12, 39, 35) 643 644 def CheckTimestamp(self): 645 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) 646 647 def CheckDateFromTicks(self): 648 d = sqlite.DateFromTicks(42) 649 650 def CheckTimeFromTicks(self): 651 t = sqlite.TimeFromTicks(42) 652 653 def CheckTimestampFromTicks(self): 654 ts = sqlite.TimestampFromTicks(42) 655 656 def CheckBinary(self): 657 with test_support.check_py3k_warnings(): 658 b = sqlite.Binary(chr(0) + "'") 659 660class ExtensionTests(unittest.TestCase): 661 def CheckScriptStringSql(self): 662 con = sqlite.connect(":memory:") 663 cur = con.cursor() 664 cur.executescript(""" 665 -- bla bla 666 /* a stupid comment */ 667 create table a(i); 668 insert into a(i) values (5); 669 """) 670 cur.execute("select i from a") 671 res = cur.fetchone()[0] 672 self.assertEqual(res, 5) 673 674 def CheckScriptStringUnicode(self): 675 con = sqlite.connect(":memory:") 676 cur = con.cursor() 677 cur.executescript(u""" 678 create table a(i); 679 insert into a(i) values (5); 680 select i from a; 681 delete from a; 682 insert into a(i) values (6); 683 """) 684 cur.execute("select i from a") 685 res = cur.fetchone()[0] 686 self.assertEqual(res, 6) 687 688 def CheckScriptSyntaxError(self): 689 con = sqlite.connect(":memory:") 690 cur = con.cursor() 691 raised = False 692 try: 693 cur.executescript("create table test(x); asdf; create table test2(x)") 694 except sqlite.OperationalError: 695 raised = True 696 self.assertEqual(raised, True, "should have raised an exception") 697 698 def CheckScriptErrorNormal(self): 699 con = sqlite.connect(":memory:") 700 cur = con.cursor() 701 raised = False 702 try: 703 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") 704 except sqlite.OperationalError: 705 raised = True 706 self.assertEqual(raised, True, "should have raised an exception") 707 708 def CheckConnectionExecute(self): 709 con = sqlite.connect(":memory:") 710 result = con.execute("select 5").fetchone()[0] 711 self.assertEqual(result, 5, "Basic test of Connection.execute") 712 713 def CheckConnectionExecutemany(self): 714 con = sqlite.connect(":memory:") 715 con.execute("create table test(foo)") 716 con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) 717 result = con.execute("select foo from test order by foo").fetchall() 718 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") 719 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") 720 721 def CheckConnectionExecutescript(self): 722 con = sqlite.connect(":memory:") 723 con.executescript("create table test(foo); insert into test(foo) values (5);") 724 result = con.execute("select foo from test").fetchone()[0] 725 self.assertEqual(result, 5, "Basic test of Connection.executescript") 726 727class ClosedConTests(unittest.TestCase): 728 def setUp(self): 729 pass 730 731 def tearDown(self): 732 pass 733 734 def CheckClosedConCursor(self): 735 con = sqlite.connect(":memory:") 736 con.close() 737 try: 738 cur = con.cursor() 739 self.fail("Should have raised a ProgrammingError") 740 except sqlite.ProgrammingError: 741 pass 742 except: 743 self.fail("Should have raised a ProgrammingError") 744 745 def CheckClosedConCommit(self): 746 con = sqlite.connect(":memory:") 747 con.close() 748 try: 749 con.commit() 750 self.fail("Should have raised a ProgrammingError") 751 except sqlite.ProgrammingError: 752 pass 753 except: 754 self.fail("Should have raised a ProgrammingError") 755 756 def CheckClosedConRollback(self): 757 con = sqlite.connect(":memory:") 758 con.close() 759 try: 760 con.rollback() 761 self.fail("Should have raised a ProgrammingError") 762 except sqlite.ProgrammingError: 763 pass 764 except: 765 self.fail("Should have raised a ProgrammingError") 766 767 def CheckClosedCurExecute(self): 768 con = sqlite.connect(":memory:") 769 cur = con.cursor() 770 con.close() 771 try: 772 cur.execute("select 4") 773 self.fail("Should have raised a ProgrammingError") 774 except sqlite.ProgrammingError: 775 pass 776 except: 777 self.fail("Should have raised a ProgrammingError") 778 779 def CheckClosedCreateFunction(self): 780 con = sqlite.connect(":memory:") 781 con.close() 782 def f(x): return 17 783 try: 784 con.create_function("foo", 1, f) 785 self.fail("Should have raised a ProgrammingError") 786 except sqlite.ProgrammingError: 787 pass 788 except: 789 self.fail("Should have raised a ProgrammingError") 790 791 def CheckClosedCreateAggregate(self): 792 con = sqlite.connect(":memory:") 793 con.close() 794 class Agg: 795 def __init__(self): 796 pass 797 def step(self, x): 798 pass 799 def finalize(self): 800 return 17 801 try: 802 con.create_aggregate("foo", 1, Agg) 803 self.fail("Should have raised a ProgrammingError") 804 except sqlite.ProgrammingError: 805 pass 806 except: 807 self.fail("Should have raised a ProgrammingError") 808 809 def CheckClosedSetAuthorizer(self): 810 con = sqlite.connect(":memory:") 811 con.close() 812 def authorizer(*args): 813 return sqlite.DENY 814 try: 815 con.set_authorizer(authorizer) 816 self.fail("Should have raised a ProgrammingError") 817 except sqlite.ProgrammingError: 818 pass 819 except: 820 self.fail("Should have raised a ProgrammingError") 821 822 def CheckClosedSetProgressCallback(self): 823 con = sqlite.connect(":memory:") 824 con.close() 825 def progress(): pass 826 try: 827 con.set_progress_handler(progress, 100) 828 self.fail("Should have raised a ProgrammingError") 829 except sqlite.ProgrammingError: 830 pass 831 except: 832 self.fail("Should have raised a ProgrammingError") 833 834 def CheckClosedCall(self): 835 con = sqlite.connect(":memory:") 836 con.close() 837 try: 838 con() 839 self.fail("Should have raised a ProgrammingError") 840 except sqlite.ProgrammingError: 841 pass 842 except: 843 self.fail("Should have raised a ProgrammingError") 844 845class ClosedCurTests(unittest.TestCase): 846 def setUp(self): 847 pass 848 849 def tearDown(self): 850 pass 851 852 def CheckClosed(self): 853 con = sqlite.connect(":memory:") 854 cur = con.cursor() 855 cur.close() 856 857 for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): 858 if method_name in ("execute", "executescript"): 859 params = ("select 4 union select 5",) 860 elif method_name == "executemany": 861 params = ("insert into foo(bar) values (?)", [(3,), (4,)]) 862 else: 863 params = [] 864 865 try: 866 method = getattr(cur, method_name) 867 868 method(*params) 869 self.fail("Should have raised a ProgrammingError: method " + method_name) 870 except sqlite.ProgrammingError: 871 pass 872 except: 873 self.fail("Should have raised a ProgrammingError: " + method_name) 874 875def suite(): 876 module_suite = unittest.makeSuite(ModuleTests, "Check") 877 connection_suite = unittest.makeSuite(ConnectionTests, "Check") 878 cursor_suite = unittest.makeSuite(CursorTests, "Check") 879 thread_suite = unittest.makeSuite(ThreadTests, "Check") 880 constructor_suite = unittest.makeSuite(ConstructorTests, "Check") 881 ext_suite = unittest.makeSuite(ExtensionTests, "Check") 882 closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") 883 closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") 884 return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite)) 885 886def test(): 887 runner = unittest.TextTestRunner() 888 runner.run(suite()) 889 890if __name__ == "__main__": 891 test() 892