1#-*- coding: ISO-8859-1 -*- 2# pysqlite2/test/transactions.py: tests transactions 3# 4# Copyright (C) 2005-2007 Gerhard H�ring <gh@ghaering.de> 5# 6# This file is part of pysqlite. 7# 8# This software is provided 'as-is', without any express or implied 9# warranty. In no event will the authors be held liable for any damages 10# arising from the use of this software. 11# 12# Permission is granted to anyone to use this software for any purpose, 13# including commercial applications, and to alter it and redistribute it 14# freely, subject to the following restrictions: 15# 16# 1. The origin of this software must not be misrepresented; you must not 17# claim that you wrote the original software. If you use this software 18# in a product, an acknowledgment in the product documentation would be 19# appreciated but is not required. 20# 2. Altered source versions must be plainly marked as such, and must not be 21# misrepresented as being the original software. 22# 3. This notice may not be removed or altered from any source distribution. 23 24import sys 25import os, unittest 26import sqlite3 as sqlite 27 28def get_db_path(): 29 return "sqlite_testdb" 30 31class TransactionTests(unittest.TestCase): 32 def setUp(self): 33 try: 34 os.remove(get_db_path()) 35 except OSError: 36 pass 37 38 self.con1 = sqlite.connect(get_db_path(), timeout=0.1) 39 self.cur1 = self.con1.cursor() 40 41 self.con2 = sqlite.connect(get_db_path(), timeout=0.1) 42 self.cur2 = self.con2.cursor() 43 44 def tearDown(self): 45 self.cur1.close() 46 self.con1.close() 47 48 self.cur2.close() 49 self.con2.close() 50 51 try: 52 os.unlink(get_db_path()) 53 except OSError: 54 pass 55 56 def CheckDMLdoesAutoCommitBefore(self): 57 self.cur1.execute("create table test(i)") 58 self.cur1.execute("insert into test(i) values (5)") 59 self.cur1.execute("create table test2(j)") 60 self.cur2.execute("select i from test") 61 res = self.cur2.fetchall() 62 self.assertEqual(len(res), 1) 63 64 def CheckInsertStartsTransaction(self): 65 self.cur1.execute("create table test(i)") 66 self.cur1.execute("insert into test(i) values (5)") 67 self.cur2.execute("select i from test") 68 res = self.cur2.fetchall() 69 self.assertEqual(len(res), 0) 70 71 def CheckUpdateStartsTransaction(self): 72 self.cur1.execute("create table test(i)") 73 self.cur1.execute("insert into test(i) values (5)") 74 self.con1.commit() 75 self.cur1.execute("update test set i=6") 76 self.cur2.execute("select i from test") 77 res = self.cur2.fetchone()[0] 78 self.assertEqual(res, 5) 79 80 def CheckDeleteStartsTransaction(self): 81 self.cur1.execute("create table test(i)") 82 self.cur1.execute("insert into test(i) values (5)") 83 self.con1.commit() 84 self.cur1.execute("delete from test") 85 self.cur2.execute("select i from test") 86 res = self.cur2.fetchall() 87 self.assertEqual(len(res), 1) 88 89 def CheckReplaceStartsTransaction(self): 90 self.cur1.execute("create table test(i)") 91 self.cur1.execute("insert into test(i) values (5)") 92 self.con1.commit() 93 self.cur1.execute("replace into test(i) values (6)") 94 self.cur2.execute("select i from test") 95 res = self.cur2.fetchall() 96 self.assertEqual(len(res), 1) 97 self.assertEqual(res[0][0], 5) 98 99 def CheckToggleAutoCommit(self): 100 self.cur1.execute("create table test(i)") 101 self.cur1.execute("insert into test(i) values (5)") 102 self.con1.isolation_level = None 103 self.assertEqual(self.con1.isolation_level, None) 104 self.cur2.execute("select i from test") 105 res = self.cur2.fetchall() 106 self.assertEqual(len(res), 1) 107 108 self.con1.isolation_level = "DEFERRED" 109 self.assertEqual(self.con1.isolation_level , "DEFERRED") 110 self.cur1.execute("insert into test(i) values (5)") 111 self.cur2.execute("select i from test") 112 res = self.cur2.fetchall() 113 self.assertEqual(len(res), 1) 114 115 def CheckRaiseTimeout(self): 116 if sqlite.sqlite_version_info < (3, 2, 2): 117 # This will fail (hang) on earlier versions of sqlite. 118 # Determine exact version it was fixed. 3.2.1 hangs. 119 return 120 self.cur1.execute("create table test(i)") 121 self.cur1.execute("insert into test(i) values (5)") 122 try: 123 self.cur2.execute("insert into test(i) values (5)") 124 self.fail("should have raised an OperationalError") 125 except sqlite.OperationalError: 126 pass 127 except: 128 self.fail("should have raised an OperationalError") 129 130 def CheckLocking(self): 131 """ 132 This tests the improved concurrency with pysqlite 2.3.4. You needed 133 to roll back con2 before you could commit con1. 134 """ 135 if sqlite.sqlite_version_info < (3, 2, 2): 136 # This will fail (hang) on earlier versions of sqlite. 137 # Determine exact version it was fixed. 3.2.1 hangs. 138 return 139 self.cur1.execute("create table test(i)") 140 self.cur1.execute("insert into test(i) values (5)") 141 try: 142 self.cur2.execute("insert into test(i) values (5)") 143 self.fail("should have raised an OperationalError") 144 except sqlite.OperationalError: 145 pass 146 except: 147 self.fail("should have raised an OperationalError") 148 # NO self.con2.rollback() HERE!!! 149 self.con1.commit() 150 151 def CheckRollbackCursorConsistency(self): 152 """ 153 Checks if cursors on the connection are set into a "reset" state 154 when a rollback is done on the connection. 155 """ 156 con = sqlite.connect(":memory:") 157 cur = con.cursor() 158 cur.execute("create table test(x)") 159 cur.execute("insert into test(x) values (5)") 160 cur.execute("select 1 union select 2 union select 3") 161 162 con.rollback() 163 try: 164 cur.fetchall() 165 self.fail("InterfaceError should have been raised") 166 except sqlite.InterfaceError, e: 167 pass 168 except: 169 self.fail("InterfaceError should have been raised") 170 171class SpecialCommandTests(unittest.TestCase): 172 def setUp(self): 173 self.con = sqlite.connect(":memory:") 174 self.cur = self.con.cursor() 175 176 def CheckVacuum(self): 177 self.cur.execute("create table test(i)") 178 self.cur.execute("insert into test(i) values (5)") 179 self.cur.execute("vacuum") 180 181 def CheckDropTable(self): 182 self.cur.execute("create table test(i)") 183 self.cur.execute("insert into test(i) values (5)") 184 self.cur.execute("drop table test") 185 186 def CheckPragma(self): 187 self.cur.execute("create table test(i)") 188 self.cur.execute("insert into test(i) values (5)") 189 self.cur.execute("pragma count_changes=1") 190 191 def tearDown(self): 192 self.cur.close() 193 self.con.close() 194 195def suite(): 196 default_suite = unittest.makeSuite(TransactionTests, "Check") 197 special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") 198 return unittest.TestSuite((default_suite, special_command_suite)) 199 200def test(): 201 runner = unittest.TextTestRunner() 202 runner.run(suite()) 203 204if __name__ == "__main__": 205 test() 206