• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/dbapi.py: tests for DB-API compliance
3#
4# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import threading
25import unittest
26import sqlite3 as sqlite
27
28from test.support import TESTFN, unlink
29
30
class ModuleTests(unittest.TestCase):
    """Module-level DB-API checks: global constants and the exception tree."""

    def _check_subclass(self, child, parent, msg):
        # Shared assertion used by all exception-hierarchy checks below.
        self.assertTrue(issubclass(child, parent), msg)

    def CheckAPILevel(self):
        """The module-level ``apilevel`` must be the string "2.0"."""
        msg = "apilevel is %s, should be 2.0" % sqlite.apilevel
        self.assertEqual(sqlite.apilevel, "2.0", msg)

    def CheckThreadSafety(self):
        """The module-level ``threadsafety`` must be 1."""
        msg = "threadsafety is %d, should be 1" % sqlite.threadsafety
        self.assertEqual(sqlite.threadsafety, 1, msg)

    def CheckParamStyle(self):
        """The module-level ``paramstyle`` must be "qmark"."""
        msg = "paramstyle is '%s', should be 'qmark'" % sqlite.paramstyle
        self.assertEqual(sqlite.paramstyle, "qmark", msg)

    def CheckWarning(self):
        """Warning derives from Exception."""
        self._check_subclass(sqlite.Warning, Exception,
                             "Warning is not a subclass of Exception")

    def CheckError(self):
        """Error derives from Exception."""
        self._check_subclass(sqlite.Error, Exception,
                             "Error is not a subclass of Exception")

    def CheckInterfaceError(self):
        """InterfaceError derives from Error."""
        self._check_subclass(sqlite.InterfaceError, sqlite.Error,
                             "InterfaceError is not a subclass of Error")

    def CheckDatabaseError(self):
        """DatabaseError derives from Error."""
        self._check_subclass(sqlite.DatabaseError, sqlite.Error,
                             "DatabaseError is not a subclass of Error")

    def CheckDataError(self):
        """DataError derives from DatabaseError."""
        self._check_subclass(sqlite.DataError, sqlite.DatabaseError,
                             "DataError is not a subclass of DatabaseError")

    def CheckOperationalError(self):
        """OperationalError derives from DatabaseError."""
        self._check_subclass(
            sqlite.OperationalError, sqlite.DatabaseError,
            "OperationalError is not a subclass of DatabaseError")

    def CheckIntegrityError(self):
        """IntegrityError derives from DatabaseError."""
        self._check_subclass(
            sqlite.IntegrityError, sqlite.DatabaseError,
            "IntegrityError is not a subclass of DatabaseError")

    def CheckInternalError(self):
        """InternalError derives from DatabaseError."""
        self._check_subclass(
            sqlite.InternalError, sqlite.DatabaseError,
            "InternalError is not a subclass of DatabaseError")

    def CheckProgrammingError(self):
        """ProgrammingError derives from DatabaseError."""
        self._check_subclass(
            sqlite.ProgrammingError, sqlite.DatabaseError,
            "ProgrammingError is not a subclass of DatabaseError")

    def CheckNotSupportedError(self):
        """NotSupportedError derives from DatabaseError."""
        self._check_subclass(
            sqlite.NotSupportedError, sqlite.DatabaseError,
            "NotSupportedError is not a subclass of DatabaseError")
85
class ConnectionTests(unittest.TestCase):
    """Tests for Connection objects: transactions, opening and closing.

    ``setUp`` provides ``self.cx``, an in-memory database holding a single
    ``test`` table with one row; ``tearDown`` closes it.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def _connect(self, *args, **kwargs):
        """Open an extra connection that is closed at test cleanup.

        Using a connection as a context manager only commits or rolls back;
        it never closes the connection.  Registering ``close`` as a cleanup
        guarantees that the handle is released.  Cleanups run in LIFO order,
        so a connection opened after ``addCleanup(unlink, ...)`` is closed
        before the database file is removed.
        """
        cx = sqlite.connect(*args, **kwargs)
        self.addCleanup(cx.close)
        return cx

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def CheckCursor(self):
        """Creating a cursor on an open connection must not raise."""
        self.cx.cursor()

    def CheckFailedOpen(self):
        """Opening a database in a non-existent directory must fail."""
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def CheckClose(self):
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension.
        self.assertEqual(self.cx.Warning, sqlite.Warning)
        self.assertEqual(self.cx.Error, sqlite.Error)
        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
        self.assertEqual(self.cx.DataError, sqlite.DataError)
        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)

    def CheckInTransaction(self):
        """``in_transaction`` must track the implicit transaction state."""
        # Can't use db from setUp because we want to test initial state.
        cx = self._connect(":memory:")
        cu = cx.cursor()
        self.assertEqual(cx.in_transaction, False)
        cu.execute("create table transactiontest(id integer primary key, name text)")
        self.assertEqual(cx.in_transaction, False)
        cu.execute("insert into transactiontest(name) values (?)", ("foo",))
        self.assertEqual(cx.in_transaction, True)
        cu.execute("select name from transactiontest where name=?", ["foo"])
        cu.fetchone()
        self.assertEqual(cx.in_transaction, True)
        cx.commit()
        self.assertEqual(cx.in_transaction, False)
        cu.execute("select name from transactiontest where name=?", ["foo"])
        cu.fetchone()
        self.assertEqual(cx.in_transaction, False)

    def CheckInTransactionRO(self):
        """``in_transaction`` is a read-only attribute."""
        with self.assertRaises(AttributeError):
            self.cx.in_transaction = True

    def CheckOpenWithPathLikeObject(self):
        """ Checks that we can successfully connect to a database using an object that
            is PathLike, i.e. has __fspath__(). """
        self.addCleanup(unlink, TESTFN)
        class Path:
            def __fspath__(self):
                return TESTFN
        path = Path()
        with self._connect(path) as cx:
            cx.execute('create table test(id integer)')

    def CheckOpenUri(self):
        """URI filenames work, including the read-only ``mode=ro`` option."""
        if sqlite.sqlite_version_info < (3, 7, 7):
            with self.assertRaises(sqlite.NotSupportedError):
                sqlite.connect(':memory:', uri=True)
            return
        self.addCleanup(unlink, TESTFN)
        with self._connect(TESTFN) as cx:
            cx.execute('create table test(id integer)')
        with self._connect('file:' + TESTFN, uri=True) as cx:
            cx.execute('insert into test(id) values(0)')
        with self._connect('file:' + TESTFN + '?mode=ro', uri=True) as cx:
            with self.assertRaises(sqlite.OperationalError):
                cx.execute('insert into test(id) values(1)')

    @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1),
                     'needs sqlite versions older than 3.3.1')
    def CheckSameThreadErrorOnOldVersion(self):
        with self.assertRaises(sqlite.NotSupportedError) as cm:
            sqlite.connect(':memory:', check_same_thread=False)
        self.assertEqual(str(cm.exception), 'shared connections not available')
194
class CursorTests(unittest.TestCase):
    """Tests for Cursor objects: execute*, fetch*, rowcount and lastrowid.

    ``setUp`` provides ``self.cx`` (in-memory connection) and ``self.cu``
    (cursor) with a ``test`` table that already contains one row whose
    ``name`` is "foo" (and therefore has id 1).
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute(
            "create table test(id integer primary key, name text, "
            "income number, unique_test text unique)"
        )
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    def CheckExecuteIllegalSql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def CheckExecuteTooMuchSql(self):
        with self.assertRaises(sqlite.Warning):
            self.cu.execute("select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        # A trailing line comment after the statement is not a second statement.
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        # A trailing block comment after the statement is not a second statement.
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def CheckExecuteWrongSqlArg(self):
        with self.assertRaises(TypeError):
            self.cu.execute(42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteArgStringWithZeroByte(self):
        """A string containing an embedded NUL must round-trip unchanged."""
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def CheckExecuteNonIterable(self):
        with self.assertRaises(ValueError) as cm:
            self.cu.execute("insert into test(id) values (?)", 42)
        self.assertEqual(str(cm.exception), 'parameters are of unsupported type')

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters: two placeholders, only one value supplied.
        # (Previously this was an exact duplicate of CheckExecuteWrongNoOfArgs3
        # and never exercised the "some, but not enough" case.)
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        """Any object with __len__ and __getitem__ is accepted as parameters."""
        class L:
            def __len__(self):
                return 1
            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequenceBadLen(self):
        # Issue41662: Error in __len__() was overridden with ProgrammingError.
        class L:
            def __len__(self):
                1/0
            def __getitem__(self, x):
                raise AssertionError

        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(ZeroDivisionError):
            self.cu.execute("select name from test where name=?", L())

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        """A dict subclass relying on __missing__ can supply named parameters."""
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        # setUp already inserted one row, so the total must exceed 2.
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')

    # Checks for executemany:
    # Sequences are required by the DB-API; iterators and generators
    # are enhancements in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        """executemany() accepts a hand-written iterator of parameter tuples."""
        class MyIter:
            def __init__(self):
                self.value = 5

            def __iter__(self):
                # Required by the iterator protocol; without it the object
                # cannot be used with iter() or a for loop.
                return self

            def __next__(self):
                if self.value == 10:
                    raise StopIteration
                self.value += 1
                return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        with self.assertRaises(TypeError):
            self.cu.executemany(42, [(3,)])

    def CheckExecuteManySelect(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        lst = [row[0] for row in self.cu]
        self.assertEqual(lst[0], 5)
        self.assertEqual(lst[1], 6)

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        row = self.cu.fetchone()
        self.assertEqual(row, None)

    def CheckFetchoneNoStatement(self):
        """fetchone() on a cursor that never executed anything returns None."""
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertEqual(row, None)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        """A cursor factory that does not return a cursor must be rejected."""
        def f(): pass

        # Only the call under test lives inside the assertRaises block.
        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def CheckCursorWrongClass(self):
        """Cursor() must reject an argument that is not a Connection."""
        class Foo: pass
        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)

    def CheckLastRowIDOnReplace(self):
        """
        INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
        """
        sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
        for statement in ('INSERT OR REPLACE', 'REPLACE'):
            with self.subTest(statement=statement):
                self.cu.execute(sql.format(statement), (1, 'foo'))
                self.assertEqual(self.cu.lastrowid, 1)

    def CheckLastRowIDOnIgnore(self):
        """An ignored insert must leave lastrowid unchanged."""
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)

    def CheckLastRowIDInsertOR(self):
        """A failed INSERT OR FAIL/ABORT/ROLLBACK must not change lastrowid."""
        results = []
        for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
            sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
            with self.subTest(statement='INSERT OR {}'.format(statement)):
                self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
                # Second insert of the same unique value must be rejected.
                with self.assertRaises(sqlite.IntegrityError):
                    self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
        expected = [
            ('FAIL', 2), ('FAIL', 2),
            ('ABORT', 3), ('ABORT', 3),
            ('ROLLBACK', 4), ('ROLLBACK', 4),
        ]
        self.assertEqual(results, expected)
524
525
class ThreadTests(unittest.TestCase):
    """Check that connections and cursors reject use from a foreign thread.

    ``setUp`` creates ``self.con`` and ``self.cur`` on the test's own thread.
    Every check then invokes one of their methods from a second thread and
    expects ``sqlite.ProgrammingError``.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _assert_raises_in_thread(self, func, *args):
        """Call ``func(*args)`` in a new thread; fail unless it raises
        ``sqlite.ProgrammingError``.

        Assertions cannot be made inside the worker thread, so problems are
        collected in a list and reported from the calling thread after join().
        """
        errors = []

        def run():
            try:
                func(*args)
            except sqlite.ProgrammingError:
                return
            except Exception as exc:
                # Was a bare ``except:``, which also hid what actually went
                # wrong; report the offending exception instead.
                errors.append("raised wrong exception: %r" % (exc,))
            else:
                errors.append("did not raise ProgrammingError")

        t = threading.Thread(target=run)
        t.start()
        t.join()
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._assert_raises_in_thread(self.con.cursor)

    def CheckConCommit(self):
        self._assert_raises_in_thread(self.con.commit)

    def CheckConRollback(self):
        self._assert_raises_in_thread(self.con.rollback)

    def CheckConClose(self):
        self._assert_raises_in_thread(self.con.close)

    def CheckCurImplicitBegin(self):
        self._assert_raises_in_thread(
            self.cur.execute, "insert into test(name) values ('a')")

    def CheckCurClose(self):
        self._assert_raises_in_thread(self.cur.close)

    def CheckCurExecute(self):
        # Populate the table from the owning thread first.
        self.cur.execute("insert into test(name) values ('a')")
        self._assert_raises_in_thread(self.cur.execute, "select name from test")

    def CheckCurIterNext(self):
        # Leave a pending result set on the cursor before switching threads.
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._assert_raises_in_thread(self.cur.fetchone)
682
class ConstructorTests(unittest.TestCase):
    """Smoke tests: each DB-API type constructor must be callable.

    The results are deliberately discarded; a check passes as long as the
    constructor does not raise.
    """

    def CheckDate(self):
        sqlite.Date(2004, 10, 28)

    def CheckTime(self):
        sqlite.Time(12, 39, 35)

    def CheckTimestamp(self):
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def CheckDateFromTicks(self):
        sqlite.DateFromTicks(42)

    def CheckTimeFromTicks(self):
        sqlite.TimeFromTicks(42)

    def CheckTimestampFromTicks(self):
        sqlite.TimestampFromTicks(42)

    def CheckBinary(self):
        sqlite.Binary(b"\0'")
704
class ExtensionTests(unittest.TestCase):
    """Tests for non-standard extensions: executescript() and the
    Connection.execute*() shortcut methods."""

    def _memory_db(self):
        """Return a fresh in-memory connection that is closed at cleanup.

        Each test previously opened its own connection and never closed it,
        leaking one database handle per test.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        return con

    def CheckScriptStringSql(self):
        """executescript() runs several statements and skips SQL comments."""
        con = self._memory_db()
        cur = con.cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptSyntaxError(self):
        con = self._memory_db()
        cur = con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def CheckScriptErrorNormal(self):
        con = self._memory_db()
        cur = con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def CheckCursorExecutescriptAsBytes(self):
        con = self._memory_db()
        cur = con.cursor()
        with self.assertRaises(ValueError) as cm:
            cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
        self.assertEqual(str(cm.exception), 'script argument must be unicode.')

    def CheckConnectionExecute(self):
        con = self._memory_db()
        result = con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        con = self._memory_db()
        con.execute("create table test(foo)")
        con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        con = self._memory_db()
        con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
756
class ClosedConTests(unittest.TestCase):
    """Every operation on a closed connection must raise ProgrammingError."""

    @staticmethod
    def _closed_connection():
        # Open an in-memory database and close it again right away.
        con = sqlite.connect(":memory:")
        con.close()
        return con

    def CheckClosedConCursor(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.cursor()

    def CheckClosedConCommit(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.commit()

    def CheckClosedConRollback(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.rollback()

    def CheckClosedCurExecute(self):
        # The cursor has to exist before its connection is closed.
        db = sqlite.connect(":memory:")
        cursor = db.cursor()
        db.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cursor.execute("select 4")

    def CheckClosedCreateFunction(self):
        closed = self._closed_connection()

        def f(x):
            return 17

        with self.assertRaises(sqlite.ProgrammingError):
            closed.create_function("foo", 1, f)

    def CheckClosedCreateAggregate(self):
        closed = self._closed_connection()

        class Agg:
            def __init__(self):
                pass

            def step(self, x):
                pass

            def finalize(self):
                return 17

        with self.assertRaises(sqlite.ProgrammingError):
            closed.create_aggregate("foo", 1, Agg)

    def CheckClosedSetAuthorizer(self):
        closed = self._closed_connection()

        def authorizer(*args):
            return sqlite.DENY

        with self.assertRaises(sqlite.ProgrammingError):
            closed.set_authorizer(authorizer)

    def CheckClosedSetProgressCallback(self):
        closed = self._closed_connection()

        def progress():
            pass

        with self.assertRaises(sqlite.ProgrammingError):
            closed.set_progress_handler(progress, 100)

    def CheckClosedCall(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed()
823
class ClosedCurTests(unittest.TestCase):
    def CheckClosed(self):
        # Every execute*/fetch* method of a closed cursor must raise
        # ProgrammingError. Each entry pairs a method name with the
        # positional arguments it is called with.
        statement_args = ("select 4 union select 5",)
        calls = (
            ("execute", statement_args),
            ("executemany", ("insert into foo(bar) values (?)", [(3,), (4,)])),
            ("executescript", statement_args),
            ("fetchall", ()),
            ("fetchmany", ()),
            ("fetchone", ()),
        )

        connection = sqlite.connect(":memory:")
        closed_cursor = connection.cursor()
        closed_cursor.close()

        for method_name, arguments in calls:
            with self.assertRaises(sqlite.ProgrammingError):
                getattr(closed_cursor, method_name)(*arguments)
841
842
class SqliteOnConflictTests(unittest.TestCase):
    """
    Tests for SQLite's "insert on conflict" feature.

    Each test runs against a fresh in-memory database holding a single table
    ``test`` whose ``unique_name`` column has a UNIQUE constraint; inserting
    the value 'foo' twice triggers the conflict clause under test.

    See https://www.sqlite.org/lang_conflict.html for details.
    """

    def setUp(self):
        # Fresh connection, cursor and table for every test.
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("""
          CREATE TABLE test(
            id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
          );
        """)

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckOnConflictRollbackWithExplicitTransaction(self):
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        # Use connection to commit.
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name from test")
        # Transaction should have rolled back and nothing should be in table.
        self.assertEqual(self.cu.fetchall(), [])

    def CheckOnConflictAbortRaisesWithExplicitTransactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name FROM test")
        # Expect the first two inserts to work, third to do nothing.
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def CheckOnConflictRollbackWithoutTransaction(self):
        # Start of implicit transaction
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        # Implicit transaction is rolled back on error.
        self.assertEqual(self.cu.fetchall(), [])

    def CheckOnConflictAbortRaisesWithoutTransactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        # Make sure all other values were inserted.
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def CheckOnConflictFail(self):
        self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        # The failed statement leaves nothing to fetch on the cursor.
        self.assertEqual(self.cu.fetchall(), [])
        # That check alone never looks at the table, so query it explicitly:
        # FAIL must not back out the row written by the first INSERT.
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def CheckOnConflictIgnore(self):
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        # Nothing should happen.
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def CheckOnConflictReplace(self):
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
        # There shouldn't be an IntegrityError exception.
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
934
935
def suite():
    """Build the suite containing every DB-API test case of this module.

    The test methods in this file are named ``Check...`` instead of using
    unittest's default ``test`` prefix, so they are collected with a loader
    whose ``testMethodPrefix`` is set accordingly. ``unittest.makeSuite`` is
    not used because it is deprecated since Python 3.11 and removed in 3.13;
    the loader below collects the same tests in the same order.

    Returns:
        A ``unittest.TestSuite`` holding one sub-suite per test case class,
        in the order the classes are listed below.
    """
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    test_cases = (
        ModuleTests,
        ConnectionTests,
        CursorTests,
        ThreadTests,
        ConstructorTests,
        ExtensionTests,
        ClosedConTests,
        ClosedCurTests,
        SqliteOnConflictTests,
    )
    return unittest.TestSuite(
        [loader.loadTestsFromTestCase(case) for case in test_cases]
    )
951
def test():
    """Run the complete suite of this module with a plain-text runner."""
    unittest.TextTestRunner().run(suite())
955
# Executing this file as a script runs the whole suite.
if __name__ == "__main__":
    test()
958