# pysqlite2/test/dbapi.py: tests for DB-API compliance
#
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
22 23import subprocess 24import threading 25import unittest 26import sqlite3 as sqlite 27import sys 28 29from test.support import check_disallow_instantiation, SHORT_TIMEOUT 30from test.support.os_helper import TESTFN, unlink 31 32 33class ModuleTests(unittest.TestCase): 34 def test_api_level(self): 35 self.assertEqual(sqlite.apilevel, "2.0", 36 "apilevel is %s, should be 2.0" % sqlite.apilevel) 37 38 def test_thread_safety(self): 39 self.assertEqual(sqlite.threadsafety, 1, 40 "threadsafety is %d, should be 1" % sqlite.threadsafety) 41 42 def test_param_style(self): 43 self.assertEqual(sqlite.paramstyle, "qmark", 44 "paramstyle is '%s', should be 'qmark'" % 45 sqlite.paramstyle) 46 47 def test_warning(self): 48 self.assertTrue(issubclass(sqlite.Warning, Exception), 49 "Warning is not a subclass of Exception") 50 51 def test_error(self): 52 self.assertTrue(issubclass(sqlite.Error, Exception), 53 "Error is not a subclass of Exception") 54 55 def test_interface_error(self): 56 self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), 57 "InterfaceError is not a subclass of Error") 58 59 def test_database_error(self): 60 self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), 61 "DatabaseError is not a subclass of Error") 62 63 def test_data_error(self): 64 self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), 65 "DataError is not a subclass of DatabaseError") 66 67 def test_operational_error(self): 68 self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), 69 "OperationalError is not a subclass of DatabaseError") 70 71 def test_integrity_error(self): 72 self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), 73 "IntegrityError is not a subclass of DatabaseError") 74 75 def test_internal_error(self): 76 self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), 77 "InternalError is not a subclass of DatabaseError") 78 79 def test_programming_error(self): 80 
self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), 81 "ProgrammingError is not a subclass of DatabaseError") 82 83 def test_not_supported_error(self): 84 self.assertTrue(issubclass(sqlite.NotSupportedError, 85 sqlite.DatabaseError), 86 "NotSupportedError is not a subclass of DatabaseError") 87 88 # sqlite3_enable_shared_cache() is deprecated on macOS and calling it may raise 89 # OperationalError on some buildbots. 90 @unittest.skipIf(sys.platform == "darwin", "shared cache is deprecated on macOS") 91 def test_shared_cache_deprecated(self): 92 for enable in (True, False): 93 with self.assertWarns(DeprecationWarning) as cm: 94 sqlite.enable_shared_cache(enable) 95 self.assertIn("dbapi.py", cm.filename) 96 97 def test_disallow_instantiation(self): 98 cx = sqlite.connect(":memory:") 99 check_disallow_instantiation(self, type(cx("select 1"))) 100 101 102class ConnectionTests(unittest.TestCase): 103 104 def setUp(self): 105 self.cx = sqlite.connect(":memory:") 106 cu = self.cx.cursor() 107 cu.execute("create table test(id integer primary key, name text)") 108 cu.execute("insert into test(name) values (?)", ("foo",)) 109 110 def tearDown(self): 111 self.cx.close() 112 113 def test_commit(self): 114 self.cx.commit() 115 116 def test_commit_after_no_changes(self): 117 """ 118 A commit should also work when no changes were made to the database. 119 """ 120 self.cx.commit() 121 self.cx.commit() 122 123 def test_rollback(self): 124 self.cx.rollback() 125 126 def test_rollback_after_no_changes(self): 127 """ 128 A rollback should also work when no changes were made to the database. 
129 """ 130 self.cx.rollback() 131 self.cx.rollback() 132 133 def test_cursor(self): 134 cu = self.cx.cursor() 135 136 def test_failed_open(self): 137 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" 138 with self.assertRaises(sqlite.OperationalError): 139 con = sqlite.connect(YOU_CANNOT_OPEN_THIS) 140 141 def test_close(self): 142 self.cx.close() 143 144 def test_exceptions(self): 145 # Optional DB-API extension. 146 self.assertEqual(self.cx.Warning, sqlite.Warning) 147 self.assertEqual(self.cx.Error, sqlite.Error) 148 self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) 149 self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) 150 self.assertEqual(self.cx.DataError, sqlite.DataError) 151 self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) 152 self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) 153 self.assertEqual(self.cx.InternalError, sqlite.InternalError) 154 self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) 155 self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) 156 157 def test_in_transaction(self): 158 # Can't use db from setUp because we want to test initial state. 
159 cx = sqlite.connect(":memory:") 160 cu = cx.cursor() 161 self.assertEqual(cx.in_transaction, False) 162 cu.execute("create table transactiontest(id integer primary key, name text)") 163 self.assertEqual(cx.in_transaction, False) 164 cu.execute("insert into transactiontest(name) values (?)", ("foo",)) 165 self.assertEqual(cx.in_transaction, True) 166 cu.execute("select name from transactiontest where name=?", ["foo"]) 167 row = cu.fetchone() 168 self.assertEqual(cx.in_transaction, True) 169 cx.commit() 170 self.assertEqual(cx.in_transaction, False) 171 cu.execute("select name from transactiontest where name=?", ["foo"]) 172 row = cu.fetchone() 173 self.assertEqual(cx.in_transaction, False) 174 175 def test_in_transaction_ro(self): 176 with self.assertRaises(AttributeError): 177 self.cx.in_transaction = True 178 179 def test_open_with_path_like_object(self): 180 """ Checks that we can successfully connect to a database using an object that 181 is PathLike, i.e. has __fspath__(). """ 182 self.addCleanup(unlink, TESTFN) 183 class Path: 184 def __fspath__(self): 185 return TESTFN 186 path = Path() 187 with sqlite.connect(path) as cx: 188 cx.execute('create table test(id integer)') 189 190 def test_open_uri(self): 191 self.addCleanup(unlink, TESTFN) 192 with sqlite.connect(TESTFN) as cx: 193 cx.execute('create table test(id integer)') 194 with sqlite.connect('file:' + TESTFN, uri=True) as cx: 195 cx.execute('insert into test(id) values(0)') 196 with sqlite.connect('file:' + TESTFN + '?mode=ro', uri=True) as cx: 197 with self.assertRaises(sqlite.OperationalError): 198 cx.execute('insert into test(id) values(1)') 199 200 201class UninitialisedConnectionTests(unittest.TestCase): 202 def setUp(self): 203 self.cx = sqlite.Connection.__new__(sqlite.Connection) 204 205 def test_uninit_operations(self): 206 funcs = ( 207 lambda: self.cx.isolation_level, 208 lambda: self.cx.total_changes, 209 lambda: self.cx.in_transaction, 210 lambda: self.cx.iterdump(), 211 lambda: 
self.cx.cursor(), 212 lambda: self.cx.close(), 213 ) 214 for func in funcs: 215 with self.subTest(func=func): 216 self.assertRaisesRegex(sqlite.ProgrammingError, 217 "Base Connection.__init__ not called", 218 func) 219 220 221class CursorTests(unittest.TestCase): 222 def setUp(self): 223 self.cx = sqlite.connect(":memory:") 224 self.cu = self.cx.cursor() 225 self.cu.execute( 226 "create table test(id integer primary key, name text, " 227 "income number, unique_test text unique)" 228 ) 229 self.cu.execute("insert into test(name) values (?)", ("foo",)) 230 231 def tearDown(self): 232 self.cu.close() 233 self.cx.close() 234 235 def test_execute_no_args(self): 236 self.cu.execute("delete from test") 237 238 def test_execute_illegal_sql(self): 239 with self.assertRaises(sqlite.OperationalError): 240 self.cu.execute("select asdf") 241 242 def test_execute_too_much_sql(self): 243 with self.assertRaises(sqlite.Warning): 244 self.cu.execute("select 5+4; select 4+5") 245 246 def test_execute_too_much_sql2(self): 247 self.cu.execute("select 5+4; -- foo bar") 248 249 def test_execute_too_much_sql3(self): 250 self.cu.execute(""" 251 select 5+4; 252 253 /* 254 foo 255 */ 256 """) 257 258 def test_execute_wrong_sql_arg(self): 259 with self.assertRaises(TypeError): 260 self.cu.execute(42) 261 262 def test_execute_arg_int(self): 263 self.cu.execute("insert into test(id) values (?)", (42,)) 264 265 def test_execute_arg_float(self): 266 self.cu.execute("insert into test(income) values (?)", (2500.32,)) 267 268 def test_execute_arg_string(self): 269 self.cu.execute("insert into test(name) values (?)", ("Hugo",)) 270 271 def test_execute_arg_string_with_zero_byte(self): 272 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) 273 274 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) 275 row = self.cu.fetchone() 276 self.assertEqual(row[0], "Hu\x00go") 277 278 def test_execute_non_iterable(self): 279 with self.assertRaises(ValueError) as cm: 280 
self.cu.execute("insert into test(id) values (?)", 42) 281 self.assertEqual(str(cm.exception), 'parameters are of unsupported type') 282 283 def test_execute_wrong_no_of_args1(self): 284 # too many parameters 285 with self.assertRaises(sqlite.ProgrammingError): 286 self.cu.execute("insert into test(id) values (?)", (17, "Egon")) 287 288 def test_execute_wrong_no_of_args2(self): 289 # too little parameters 290 with self.assertRaises(sqlite.ProgrammingError): 291 self.cu.execute("insert into test(id) values (?)") 292 293 def test_execute_wrong_no_of_args3(self): 294 # no parameters, parameters are needed 295 with self.assertRaises(sqlite.ProgrammingError): 296 self.cu.execute("insert into test(id) values (?)") 297 298 def test_execute_param_list(self): 299 self.cu.execute("insert into test(name) values ('foo')") 300 self.cu.execute("select name from test where name=?", ["foo"]) 301 row = self.cu.fetchone() 302 self.assertEqual(row[0], "foo") 303 304 def test_execute_param_sequence(self): 305 class L: 306 def __len__(self): 307 return 1 308 def __getitem__(self, x): 309 assert x == 0 310 return "foo" 311 312 self.cu.execute("insert into test(name) values ('foo')") 313 self.cu.execute("select name from test where name=?", L()) 314 row = self.cu.fetchone() 315 self.assertEqual(row[0], "foo") 316 317 def test_execute_param_sequence_bad_len(self): 318 # Issue41662: Error in __len__() was overridden with ProgrammingError. 
319 class L: 320 def __len__(self): 321 1/0 322 def __getitem__(slf, x): 323 raise AssertionError 324 325 self.cu.execute("insert into test(name) values ('foo')") 326 with self.assertRaises(ZeroDivisionError): 327 self.cu.execute("select name from test where name=?", L()) 328 329 def test_execute_dict_mapping(self): 330 self.cu.execute("insert into test(name) values ('foo')") 331 self.cu.execute("select name from test where name=:name", {"name": "foo"}) 332 row = self.cu.fetchone() 333 self.assertEqual(row[0], "foo") 334 335 def test_execute_dict_mapping_mapping(self): 336 class D(dict): 337 def __missing__(self, key): 338 return "foo" 339 340 self.cu.execute("insert into test(name) values ('foo')") 341 self.cu.execute("select name from test where name=:name", D()) 342 row = self.cu.fetchone() 343 self.assertEqual(row[0], "foo") 344 345 def test_execute_dict_mapping_too_little_args(self): 346 self.cu.execute("insert into test(name) values ('foo')") 347 with self.assertRaises(sqlite.ProgrammingError): 348 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) 349 350 def test_execute_dict_mapping_no_args(self): 351 self.cu.execute("insert into test(name) values ('foo')") 352 with self.assertRaises(sqlite.ProgrammingError): 353 self.cu.execute("select name from test where name=:name") 354 355 def test_execute_dict_mapping_unnamed(self): 356 self.cu.execute("insert into test(name) values ('foo')") 357 with self.assertRaises(sqlite.ProgrammingError): 358 self.cu.execute("select name from test where name=?", {"name": "foo"}) 359 360 def test_close(self): 361 self.cu.close() 362 363 def test_rowcount_execute(self): 364 self.cu.execute("delete from test") 365 self.cu.execute("insert into test(name) values ('foo')") 366 self.cu.execute("insert into test(name) values ('foo')") 367 self.cu.execute("update test set name='bar'") 368 self.assertEqual(self.cu.rowcount, 2) 369 370 def test_rowcount_select(self): 371 """ 372 pysqlite does not know 
the rowcount of SELECT statements, because we 373 don't fetch all rows after executing the select statement. The rowcount 374 has thus to be -1. 375 """ 376 self.cu.execute("select 5 union select 6") 377 self.assertEqual(self.cu.rowcount, -1) 378 379 def test_rowcount_executemany(self): 380 self.cu.execute("delete from test") 381 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) 382 self.assertEqual(self.cu.rowcount, 3) 383 384 def test_total_changes(self): 385 self.cu.execute("insert into test(name) values ('foo')") 386 self.cu.execute("insert into test(name) values ('foo')") 387 self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') 388 389 # Checks for executemany: 390 # Sequences are required by the DB-API, iterators 391 # enhancements in pysqlite. 392 393 def test_execute_many_sequence(self): 394 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) 395 396 def test_execute_many_iterator(self): 397 class MyIter: 398 def __init__(self): 399 self.value = 5 400 401 def __iter__(self): 402 return self 403 404 def __next__(self): 405 if self.value == 10: 406 raise StopIteration 407 else: 408 self.value += 1 409 return (self.value,) 410 411 self.cu.executemany("insert into test(income) values (?)", MyIter()) 412 413 def test_execute_many_generator(self): 414 def mygen(): 415 for i in range(5): 416 yield (i,) 417 418 self.cu.executemany("insert into test(income) values (?)", mygen()) 419 420 def test_execute_many_wrong_sql_arg(self): 421 with self.assertRaises(TypeError): 422 self.cu.executemany(42, [(3,)]) 423 424 def test_execute_many_select(self): 425 with self.assertRaises(sqlite.ProgrammingError): 426 self.cu.executemany("select ?", [(3,)]) 427 428 def test_execute_many_not_iterable(self): 429 with self.assertRaises(TypeError): 430 self.cu.executemany("insert into test(income) values (?)", 42) 431 432 def test_fetch_iter(self): 433 # Optional DB-API extension. 
434 self.cu.execute("delete from test") 435 self.cu.execute("insert into test(id) values (?)", (5,)) 436 self.cu.execute("insert into test(id) values (?)", (6,)) 437 self.cu.execute("select id from test order by id") 438 lst = [] 439 for row in self.cu: 440 lst.append(row[0]) 441 self.assertEqual(lst[0], 5) 442 self.assertEqual(lst[1], 6) 443 444 def test_fetchone(self): 445 self.cu.execute("select name from test") 446 row = self.cu.fetchone() 447 self.assertEqual(row[0], "foo") 448 row = self.cu.fetchone() 449 self.assertEqual(row, None) 450 451 def test_fetchone_no_statement(self): 452 cur = self.cx.cursor() 453 row = cur.fetchone() 454 self.assertEqual(row, None) 455 456 def test_array_size(self): 457 # must default to 1 458 self.assertEqual(self.cu.arraysize, 1) 459 460 # now set to 2 461 self.cu.arraysize = 2 462 463 # now make the query return 3 rows 464 self.cu.execute("delete from test") 465 self.cu.execute("insert into test(name) values ('A')") 466 self.cu.execute("insert into test(name) values ('B')") 467 self.cu.execute("insert into test(name) values ('C')") 468 self.cu.execute("select name from test") 469 res = self.cu.fetchmany() 470 471 self.assertEqual(len(res), 2) 472 473 def test_fetchmany(self): 474 self.cu.execute("select name from test") 475 res = self.cu.fetchmany(100) 476 self.assertEqual(len(res), 1) 477 res = self.cu.fetchmany(100) 478 self.assertEqual(res, []) 479 480 def test_fetchmany_kw_arg(self): 481 """Checks if fetchmany works with keyword arguments""" 482 self.cu.execute("select name from test") 483 res = self.cu.fetchmany(size=100) 484 self.assertEqual(len(res), 1) 485 486 def test_fetchall(self): 487 self.cu.execute("select name from test") 488 res = self.cu.fetchall() 489 self.assertEqual(len(res), 1) 490 res = self.cu.fetchall() 491 self.assertEqual(res, []) 492 493 def test_setinputsizes(self): 494 self.cu.setinputsizes([3, 4, 5]) 495 496 def test_setoutputsize(self): 497 self.cu.setoutputsize(5, 0) 498 499 def 
test_setoutputsize_no_column(self): 500 self.cu.setoutputsize(42) 501 502 def test_cursor_connection(self): 503 # Optional DB-API extension. 504 self.assertEqual(self.cu.connection, self.cx) 505 506 def test_wrong_cursor_callable(self): 507 with self.assertRaises(TypeError): 508 def f(): pass 509 cur = self.cx.cursor(f) 510 511 def test_cursor_wrong_class(self): 512 class Foo: pass 513 foo = Foo() 514 with self.assertRaises(TypeError): 515 cur = sqlite.Cursor(foo) 516 517 def test_last_row_id_on_replace(self): 518 """ 519 INSERT OR REPLACE and REPLACE INTO should produce the same behavior. 520 """ 521 sql = '{} INTO test(id, unique_test) VALUES (?, ?)' 522 for statement in ('INSERT OR REPLACE', 'REPLACE'): 523 with self.subTest(statement=statement): 524 self.cu.execute(sql.format(statement), (1, 'foo')) 525 self.assertEqual(self.cu.lastrowid, 1) 526 527 def test_last_row_id_on_ignore(self): 528 self.cu.execute( 529 "insert or ignore into test(unique_test) values (?)", 530 ('test',)) 531 self.assertEqual(self.cu.lastrowid, 2) 532 self.cu.execute( 533 "insert or ignore into test(unique_test) values (?)", 534 ('test',)) 535 self.assertEqual(self.cu.lastrowid, 2) 536 537 def test_last_row_id_insert_o_r(self): 538 results = [] 539 for statement in ('FAIL', 'ABORT', 'ROLLBACK'): 540 sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' 541 with self.subTest(statement='INSERT OR {}'.format(statement)): 542 self.cu.execute(sql.format(statement), (statement,)) 543 results.append((statement, self.cu.lastrowid)) 544 with self.assertRaises(sqlite.IntegrityError): 545 self.cu.execute(sql.format(statement), (statement,)) 546 results.append((statement, self.cu.lastrowid)) 547 expected = [ 548 ('FAIL', 2), ('FAIL', 2), 549 ('ABORT', 3), ('ABORT', 3), 550 ('ROLLBACK', 4), ('ROLLBACK', 4), 551 ] 552 self.assertEqual(results, expected) 553 554 555class ThreadTests(unittest.TestCase): 556 def setUp(self): 557 self.con = sqlite.connect(":memory:") 558 self.cur = self.con.cursor() 559 
self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") 560 561 def tearDown(self): 562 self.cur.close() 563 self.con.close() 564 565 def test_con_cursor(self): 566 def run(con, errors): 567 try: 568 cur = con.cursor() 569 errors.append("did not raise ProgrammingError") 570 return 571 except sqlite.ProgrammingError: 572 return 573 except: 574 errors.append("raised wrong exception") 575 576 errors = [] 577 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 578 t.start() 579 t.join() 580 if len(errors) > 0: 581 self.fail("\n".join(errors)) 582 583 def test_con_commit(self): 584 def run(con, errors): 585 try: 586 con.commit() 587 errors.append("did not raise ProgrammingError") 588 return 589 except sqlite.ProgrammingError: 590 return 591 except: 592 errors.append("raised wrong exception") 593 594 errors = [] 595 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 596 t.start() 597 t.join() 598 if len(errors) > 0: 599 self.fail("\n".join(errors)) 600 601 def test_con_rollback(self): 602 def run(con, errors): 603 try: 604 con.rollback() 605 errors.append("did not raise ProgrammingError") 606 return 607 except sqlite.ProgrammingError: 608 return 609 except: 610 errors.append("raised wrong exception") 611 612 errors = [] 613 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 614 t.start() 615 t.join() 616 if len(errors) > 0: 617 self.fail("\n".join(errors)) 618 619 def test_con_close(self): 620 def run(con, errors): 621 try: 622 con.close() 623 errors.append("did not raise ProgrammingError") 624 return 625 except sqlite.ProgrammingError: 626 return 627 except: 628 errors.append("raised wrong exception") 629 630 errors = [] 631 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 632 t.start() 633 t.join() 634 if len(errors) > 0: 635 self.fail("\n".join(errors)) 636 637 def test_cur_implicit_begin(self): 638 def 
run(cur, errors): 639 try: 640 cur.execute("insert into test(name) values ('a')") 641 errors.append("did not raise ProgrammingError") 642 return 643 except sqlite.ProgrammingError: 644 return 645 except: 646 errors.append("raised wrong exception") 647 648 errors = [] 649 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 650 t.start() 651 t.join() 652 if len(errors) > 0: 653 self.fail("\n".join(errors)) 654 655 def test_cur_close(self): 656 def run(cur, errors): 657 try: 658 cur.close() 659 errors.append("did not raise ProgrammingError") 660 return 661 except sqlite.ProgrammingError: 662 return 663 except: 664 errors.append("raised wrong exception") 665 666 errors = [] 667 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 668 t.start() 669 t.join() 670 if len(errors) > 0: 671 self.fail("\n".join(errors)) 672 673 def test_cur_execute(self): 674 def run(cur, errors): 675 try: 676 cur.execute("select name from test") 677 errors.append("did not raise ProgrammingError") 678 return 679 except sqlite.ProgrammingError: 680 return 681 except: 682 errors.append("raised wrong exception") 683 684 errors = [] 685 self.cur.execute("insert into test(name) values ('a')") 686 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 687 t.start() 688 t.join() 689 if len(errors) > 0: 690 self.fail("\n".join(errors)) 691 692 def test_cur_iter_next(self): 693 def run(cur, errors): 694 try: 695 row = cur.fetchone() 696 errors.append("did not raise ProgrammingError") 697 return 698 except sqlite.ProgrammingError: 699 return 700 except: 701 errors.append("raised wrong exception") 702 703 errors = [] 704 self.cur.execute("insert into test(name) values ('a')") 705 self.cur.execute("select name from test") 706 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 707 t.start() 708 t.join() 709 if len(errors) > 0: 710 self.fail("\n".join(errors)) 711 712class ConstructorTests(unittest.TestCase): 
713 def test_date(self): 714 d = sqlite.Date(2004, 10, 28) 715 716 def test_time(self): 717 t = sqlite.Time(12, 39, 35) 718 719 def test_timestamp(self): 720 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) 721 722 def test_date_from_ticks(self): 723 d = sqlite.DateFromTicks(42) 724 725 def test_time_from_ticks(self): 726 t = sqlite.TimeFromTicks(42) 727 728 def test_timestamp_from_ticks(self): 729 ts = sqlite.TimestampFromTicks(42) 730 731 def test_binary(self): 732 b = sqlite.Binary(b"\0'") 733 734class ExtensionTests(unittest.TestCase): 735 def test_script_string_sql(self): 736 con = sqlite.connect(":memory:") 737 cur = con.cursor() 738 cur.executescript(""" 739 -- bla bla 740 /* a stupid comment */ 741 create table a(i); 742 insert into a(i) values (5); 743 """) 744 cur.execute("select i from a") 745 res = cur.fetchone()[0] 746 self.assertEqual(res, 5) 747 748 def test_script_syntax_error(self): 749 con = sqlite.connect(":memory:") 750 cur = con.cursor() 751 with self.assertRaises(sqlite.OperationalError): 752 cur.executescript("create table test(x); asdf; create table test2(x)") 753 754 def test_script_error_normal(self): 755 con = sqlite.connect(":memory:") 756 cur = con.cursor() 757 with self.assertRaises(sqlite.OperationalError): 758 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") 759 760 def test_cursor_executescript_as_bytes(self): 761 con = sqlite.connect(":memory:") 762 cur = con.cursor() 763 with self.assertRaises(ValueError) as cm: 764 cur.executescript(b"create table test(foo); insert into test(foo) values (5);") 765 self.assertEqual(str(cm.exception), 'script argument must be unicode.') 766 767 def test_connection_execute(self): 768 con = sqlite.connect(":memory:") 769 result = con.execute("select 5").fetchone()[0] 770 self.assertEqual(result, 5, "Basic test of Connection.execute") 771 772 def test_connection_executemany(self): 773 con = sqlite.connect(":memory:") 774 con.execute("create table test(foo)") 775 
con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) 776 result = con.execute("select foo from test order by foo").fetchall() 777 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") 778 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") 779 780 def test_connection_executescript(self): 781 con = sqlite.connect(":memory:") 782 con.executescript("create table test(foo); insert into test(foo) values (5);") 783 result = con.execute("select foo from test").fetchone()[0] 784 self.assertEqual(result, 5, "Basic test of Connection.executescript") 785 786class ClosedConTests(unittest.TestCase): 787 def test_closed_con_cursor(self): 788 con = sqlite.connect(":memory:") 789 con.close() 790 with self.assertRaises(sqlite.ProgrammingError): 791 cur = con.cursor() 792 793 def test_closed_con_commit(self): 794 con = sqlite.connect(":memory:") 795 con.close() 796 with self.assertRaises(sqlite.ProgrammingError): 797 con.commit() 798 799 def test_closed_con_rollback(self): 800 con = sqlite.connect(":memory:") 801 con.close() 802 with self.assertRaises(sqlite.ProgrammingError): 803 con.rollback() 804 805 def test_closed_cur_execute(self): 806 con = sqlite.connect(":memory:") 807 cur = con.cursor() 808 con.close() 809 with self.assertRaises(sqlite.ProgrammingError): 810 cur.execute("select 4") 811 812 def test_closed_create_function(self): 813 con = sqlite.connect(":memory:") 814 con.close() 815 def f(x): return 17 816 with self.assertRaises(sqlite.ProgrammingError): 817 con.create_function("foo", 1, f) 818 819 def test_closed_create_aggregate(self): 820 con = sqlite.connect(":memory:") 821 con.close() 822 class Agg: 823 def __init__(self): 824 pass 825 def step(self, x): 826 pass 827 def finalize(self): 828 return 17 829 with self.assertRaises(sqlite.ProgrammingError): 830 con.create_aggregate("foo", 1, Agg) 831 832 def test_closed_set_authorizer(self): 833 con = sqlite.connect(":memory:") 834 con.close() 835 def 
authorizer(*args): 836 return sqlite.DENY 837 with self.assertRaises(sqlite.ProgrammingError): 838 con.set_authorizer(authorizer) 839 840 def test_closed_set_progress_callback(self): 841 con = sqlite.connect(":memory:") 842 con.close() 843 def progress(): pass 844 with self.assertRaises(sqlite.ProgrammingError): 845 con.set_progress_handler(progress, 100) 846 847 def test_closed_call(self): 848 con = sqlite.connect(":memory:") 849 con.close() 850 with self.assertRaises(sqlite.ProgrammingError): 851 con() 852 853class ClosedCurTests(unittest.TestCase): 854 def test_closed(self): 855 con = sqlite.connect(":memory:") 856 cur = con.cursor() 857 cur.close() 858 859 for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): 860 if method_name in ("execute", "executescript"): 861 params = ("select 4 union select 5",) 862 elif method_name == "executemany": 863 params = ("insert into foo(bar) values (?)", [(3,), (4,)]) 864 else: 865 params = [] 866 867 with self.assertRaises(sqlite.ProgrammingError): 868 method = getattr(cur, method_name) 869 method(*params) 870 871 872class SqliteOnConflictTests(unittest.TestCase): 873 """ 874 Tests for SQLite's "insert on conflict" feature. 875 876 See https://www.sqlite.org/lang_conflict.html for details. 877 """ 878 879 def setUp(self): 880 self.cx = sqlite.connect(":memory:") 881 self.cu = self.cx.cursor() 882 self.cu.execute(""" 883 CREATE TABLE test( 884 id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE 885 ); 886 """) 887 888 def tearDown(self): 889 self.cu.close() 890 self.cx.close() 891 892 def test_on_conflict_rollback_with_explicit_transaction(self): 893 self.cx.isolation_level = None # autocommit mode 894 self.cu = self.cx.cursor() 895 # Start an explicit transaction. 
896 self.cu.execute("BEGIN") 897 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 898 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 899 with self.assertRaises(sqlite.IntegrityError): 900 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 901 # Use connection to commit. 902 self.cx.commit() 903 self.cu.execute("SELECT name, unique_name from test") 904 # Transaction should have rolled back and nothing should be in table. 905 self.assertEqual(self.cu.fetchall(), []) 906 907 def test_on_conflict_abort_raises_with_explicit_transactions(self): 908 # Abort cancels the current sql statement but doesn't change anything 909 # about the current transaction. 910 self.cx.isolation_level = None # autocommit mode 911 self.cu = self.cx.cursor() 912 # Start an explicit transaction. 913 self.cu.execute("BEGIN") 914 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 915 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 916 with self.assertRaises(sqlite.IntegrityError): 917 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 918 self.cx.commit() 919 self.cu.execute("SELECT name, unique_name FROM test") 920 # Expect the first two inserts to work, third to do nothing. 921 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 922 923 def test_on_conflict_rollback_without_transaction(self): 924 # Start of implicit transaction 925 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 926 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 927 with self.assertRaises(sqlite.IntegrityError): 928 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 929 self.cu.execute("SELECT name, unique_name FROM test") 930 # Implicit transaction is rolled back on error. 
931 self.assertEqual(self.cu.fetchall(), []) 932 933 def test_on_conflict_abort_raises_without_transactions(self): 934 # Abort cancels the current sql statement but doesn't change anything 935 # about the current transaction. 936 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 937 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 938 with self.assertRaises(sqlite.IntegrityError): 939 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 940 # Make sure all other values were inserted. 941 self.cu.execute("SELECT name, unique_name FROM test") 942 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 943 944 def test_on_conflict_fail(self): 945 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 946 with self.assertRaises(sqlite.IntegrityError): 947 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 948 self.assertEqual(self.cu.fetchall(), []) 949 950 def test_on_conflict_ignore(self): 951 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 952 # Nothing should happen. 953 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 954 self.cu.execute("SELECT unique_name FROM test") 955 self.assertEqual(self.cu.fetchall(), [('foo',)]) 956 957 def test_on_conflict_replace(self): 958 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") 959 # There shouldn't be an IntegrityError exception. 960 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") 961 self.cu.execute("SELECT name, unique_name FROM test") 962 self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')]) 963 964 965class MultiprocessTests(unittest.TestCase): 966 CONNECTION_TIMEOUT = SHORT_TIMEOUT / 1000. 
# Defaults to 30 ms 967 968 def tearDown(self): 969 unlink(TESTFN) 970 971 def test_ctx_mgr_rollback_if_commit_failed(self): 972 # bpo-27334: ctx manager does not rollback if commit fails 973 SCRIPT = f"""if 1: 974 import sqlite3 975 def wait(): 976 print("started") 977 assert "database is locked" in input() 978 979 cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT}) 980 cx.create_function("wait", 0, wait) 981 with cx: 982 cx.execute("create table t(t)") 983 try: 984 # execute two transactions; both will try to lock the db 985 cx.executescript(''' 986 -- start a transaction and wait for parent 987 begin transaction; 988 select * from t; 989 select wait(); 990 rollback; 991 992 -- start a new transaction; would fail if parent holds lock 993 begin transaction; 994 select * from t; 995 rollback; 996 ''') 997 finally: 998 cx.close() 999 """ 1000 1001 # spawn child process 1002 proc = subprocess.Popen( 1003 [sys.executable, "-c", SCRIPT], 1004 encoding="utf-8", 1005 bufsize=0, 1006 stdin=subprocess.PIPE, 1007 stdout=subprocess.PIPE, 1008 ) 1009 self.addCleanup(proc.communicate) 1010 1011 # wait for child process to start 1012 self.assertEqual("started", proc.stdout.readline().strip()) 1013 1014 cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT) 1015 try: # context manager should correctly release the db lock 1016 with cx: 1017 cx.execute("insert into t values('test')") 1018 except sqlite.OperationalError as exc: 1019 proc.stdin.write(str(exc)) 1020 else: 1021 proc.stdin.write("no error") 1022 finally: 1023 cx.close() 1024 1025 # terminate child process 1026 self.assertIsNone(proc.returncode) 1027 try: 1028 proc.communicate(input="end", timeout=SHORT_TIMEOUT) 1029 except subprocess.TimeoutExpired: 1030 proc.kill() 1031 proc.communicate() 1032 raise 1033 self.assertEqual(proc.returncode, 0) 1034 1035 1036def suite(): 1037 tests = [ 1038 ClosedConTests, 1039 ClosedCurTests, 1040 ConnectionTests, 1041 ConstructorTests, 1042 CursorTests, 1043 
ExtensionTests, 1044 ModuleTests, 1045 MultiprocessTests, 1046 SqliteOnConflictTests, 1047 ThreadTests, 1048 UninitialisedConnectionTests, 1049 ] 1050 return unittest.TestSuite( 1051 [unittest.TestLoader().loadTestsFromTestCase(t) for t in tests] 1052 ) 1053 1054def test(): 1055 runner = unittest.TextTestRunner() 1056 runner.run(suite()) 1057 1058if __name__ == "__main__": 1059 test() 1060