• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# pysqlite2/test/dbapi.py: tests for DB-API compliance
2#
3# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
4#
5# This file is part of pysqlite.
6#
7# This software is provided 'as-is', without any express or implied
8# warranty.  In no event will the authors be held liable for any damages
9# arising from the use of this software.
10#
11# Permission is granted to anyone to use this software for any purpose,
12# including commercial applications, and to alter it and redistribute it
13# freely, subject to the following restrictions:
14#
15# 1. The origin of this software must not be misrepresented; you must not
16#    claim that you wrote the original software. If you use this software
17#    in a product, an acknowledgment in the product documentation would be
18#    appreciated but is not required.
19# 2. Altered source versions must be plainly marked as such, and must not be
20#    misrepresented as being the original software.
21# 3. This notice may not be removed or altered from any source distribution.
22
23import subprocess
24import threading
25import unittest
26import sqlite3 as sqlite
27import sys
28
29from test.support import check_disallow_instantiation, SHORT_TIMEOUT
30from test.support.os_helper import TESTFN, unlink
31
32
33class ModuleTests(unittest.TestCase):
34    def test_api_level(self):
35        self.assertEqual(sqlite.apilevel, "2.0",
36                         "apilevel is %s, should be 2.0" % sqlite.apilevel)
37
38    def test_thread_safety(self):
39        self.assertEqual(sqlite.threadsafety, 1,
40                         "threadsafety is %d, should be 1" % sqlite.threadsafety)
41
42    def test_param_style(self):
43        self.assertEqual(sqlite.paramstyle, "qmark",
44                         "paramstyle is '%s', should be 'qmark'" %
45                         sqlite.paramstyle)
46
47    def test_warning(self):
48        self.assertTrue(issubclass(sqlite.Warning, Exception),
49                     "Warning is not a subclass of Exception")
50
51    def test_error(self):
52        self.assertTrue(issubclass(sqlite.Error, Exception),
53                        "Error is not a subclass of Exception")
54
55    def test_interface_error(self):
56        self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
57                        "InterfaceError is not a subclass of Error")
58
59    def test_database_error(self):
60        self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
61                        "DatabaseError is not a subclass of Error")
62
63    def test_data_error(self):
64        self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
65                        "DataError is not a subclass of DatabaseError")
66
67    def test_operational_error(self):
68        self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
69                        "OperationalError is not a subclass of DatabaseError")
70
71    def test_integrity_error(self):
72        self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
73                        "IntegrityError is not a subclass of DatabaseError")
74
75    def test_internal_error(self):
76        self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
77                        "InternalError is not a subclass of DatabaseError")
78
79    def test_programming_error(self):
80        self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
81                        "ProgrammingError is not a subclass of DatabaseError")
82
83    def test_not_supported_error(self):
84        self.assertTrue(issubclass(sqlite.NotSupportedError,
85                                   sqlite.DatabaseError),
86                        "NotSupportedError is not a subclass of DatabaseError")
87
88    # sqlite3_enable_shared_cache() is deprecated on macOS and calling it may raise
89    # OperationalError on some buildbots.
90    @unittest.skipIf(sys.platform == "darwin", "shared cache is deprecated on macOS")
91    def test_shared_cache_deprecated(self):
92        for enable in (True, False):
93            with self.assertWarns(DeprecationWarning) as cm:
94                sqlite.enable_shared_cache(enable)
95            self.assertIn("dbapi.py", cm.filename)
96
97    def test_disallow_instantiation(self):
98        cx = sqlite.connect(":memory:")
99        check_disallow_instantiation(self, type(cx("select 1")))
100
101
102class ConnectionTests(unittest.TestCase):
103
104    def setUp(self):
105        self.cx = sqlite.connect(":memory:")
106        cu = self.cx.cursor()
107        cu.execute("create table test(id integer primary key, name text)")
108        cu.execute("insert into test(name) values (?)", ("foo",))
109
110    def tearDown(self):
111        self.cx.close()
112
113    def test_commit(self):
114        self.cx.commit()
115
116    def test_commit_after_no_changes(self):
117        """
118        A commit should also work when no changes were made to the database.
119        """
120        self.cx.commit()
121        self.cx.commit()
122
123    def test_rollback(self):
124        self.cx.rollback()
125
126    def test_rollback_after_no_changes(self):
127        """
128        A rollback should also work when no changes were made to the database.
129        """
130        self.cx.rollback()
131        self.cx.rollback()
132
133    def test_cursor(self):
134        cu = self.cx.cursor()
135
136    def test_failed_open(self):
137        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
138        with self.assertRaises(sqlite.OperationalError):
139            con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
140
141    def test_close(self):
142        self.cx.close()
143
144    def test_exceptions(self):
145        # Optional DB-API extension.
146        self.assertEqual(self.cx.Warning, sqlite.Warning)
147        self.assertEqual(self.cx.Error, sqlite.Error)
148        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
149        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
150        self.assertEqual(self.cx.DataError, sqlite.DataError)
151        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
152        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
153        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
154        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
155        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
156
157    def test_in_transaction(self):
158        # Can't use db from setUp because we want to test initial state.
159        cx = sqlite.connect(":memory:")
160        cu = cx.cursor()
161        self.assertEqual(cx.in_transaction, False)
162        cu.execute("create table transactiontest(id integer primary key, name text)")
163        self.assertEqual(cx.in_transaction, False)
164        cu.execute("insert into transactiontest(name) values (?)", ("foo",))
165        self.assertEqual(cx.in_transaction, True)
166        cu.execute("select name from transactiontest where name=?", ["foo"])
167        row = cu.fetchone()
168        self.assertEqual(cx.in_transaction, True)
169        cx.commit()
170        self.assertEqual(cx.in_transaction, False)
171        cu.execute("select name from transactiontest where name=?", ["foo"])
172        row = cu.fetchone()
173        self.assertEqual(cx.in_transaction, False)
174
175    def test_in_transaction_ro(self):
176        with self.assertRaises(AttributeError):
177            self.cx.in_transaction = True
178
179    def test_open_with_path_like_object(self):
180        """ Checks that we can successfully connect to a database using an object that
181            is PathLike, i.e. has __fspath__(). """
182        self.addCleanup(unlink, TESTFN)
183        class Path:
184            def __fspath__(self):
185                return TESTFN
186        path = Path()
187        with sqlite.connect(path) as cx:
188            cx.execute('create table test(id integer)')
189
190    def test_open_uri(self):
191        self.addCleanup(unlink, TESTFN)
192        with sqlite.connect(TESTFN) as cx:
193            cx.execute('create table test(id integer)')
194        with sqlite.connect('file:' + TESTFN, uri=True) as cx:
195            cx.execute('insert into test(id) values(0)')
196        with sqlite.connect('file:' + TESTFN + '?mode=ro', uri=True) as cx:
197            with self.assertRaises(sqlite.OperationalError):
198                cx.execute('insert into test(id) values(1)')
199
200
class UninitialisedConnectionTests(unittest.TestCase):
    """Using a Connection whose __init__ never ran raises ProgrammingError."""

    def setUp(self):
        # Allocate the object without running Connection.__init__.
        self.cx = sqlite.Connection.__new__(sqlite.Connection)

    def test_uninit_operations(self):
        expected_message = "Base Connection.__init__ not called"
        operations = (
            lambda: self.cx.isolation_level,
            lambda: self.cx.total_changes,
            lambda: self.cx.in_transaction,
            lambda: self.cx.iterdump(),
            lambda: self.cx.cursor(),
            lambda: self.cx.close(),
        )
        for func in operations:
            with self.subTest(func=func):
                with self.assertRaisesRegex(sqlite.ProgrammingError,
                                            expected_message):
                    func()
219
220
class CursorTests(unittest.TestCase):
    """Cursor tests: execute/executemany, parameter binding, fetching, lastrowid."""

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute(
            "create table test(id integer primary key, name text, "
            "income number, unique_test text unique)"
        )
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def test_execute_no_args(self):
        self.cu.execute("delete from test")

    def test_execute_illegal_sql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def test_execute_too_much_sql(self):
        with self.assertRaises(sqlite.Warning):
            self.cu.execute("select 5+4; select 4+5")

    def test_execute_too_much_sql2(self):
        self.cu.execute("select 5+4; -- foo bar")

    def test_execute_too_much_sql3(self):
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def test_execute_wrong_sql_arg(self):
        with self.assertRaises(TypeError):
            self.cu.execute(42)

    def test_execute_arg_int(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def test_execute_arg_float(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def test_execute_arg_string(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def test_execute_arg_string_with_zero_byte(self):
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def test_execute_non_iterable(self):
        with self.assertRaises(ValueError) as cm:
            self.cu.execute("insert into test(id) values (?)", 42)
        self.assertEqual(str(cm.exception), 'parameters are of unsupported type')

    def test_execute_wrong_no_of_args1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def test_execute_wrong_no_of_args2(self):
        # too few parameters: two placeholders, one value supplied
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def test_execute_wrong_no_of_args3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def test_execute_param_list(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_param_sequence(self):
        # A minimal sequence: only __len__ and __getitem__ are provided.
        class L:
            def __len__(self):
                return 1
            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_param_sequence_bad_len(self):
        # Issue41662: Error in __len__() was overridden with ProgrammingError.
        class L:
            def __len__(self):
                1/0
            def __getitem__(self, x):
                raise AssertionError

        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(ZeroDivisionError):
            self.cu.execute("select name from test where name=?", L())

    def test_execute_dict_mapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_dict_mapping_mapping(self):
        # A dict subclass whose __missing__ supplies the value.
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_dict_mapping_too_little_args(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def test_execute_dict_mapping_no_args(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def test_execute_dict_mapping_unnamed(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def test_close(self):
        self.cu.close()

    def test_rowcount_execute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def test_rowcount_select(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def test_rowcount_executemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def test_total_changes(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')

    # Checks for executemany:
    # Sequences are required by the DB-API; iterators are
    # enhancements in pysqlite.

    def test_execute_many_sequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def test_execute_many_iterator(self):
        # Hand-written iterator yielding the 1-tuples (6,) through (10,).
        class MyIter:
            def __init__(self):
                self.value = 5

            def __iter__(self):
                return self

            def __next__(self):
                if self.value == 10:
                    raise StopIteration
                self.value += 1
                return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def test_execute_many_generator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def test_execute_many_wrong_sql_arg(self):
        with self.assertRaises(TypeError):
            self.cu.executemany(42, [(3,)])

    def test_execute_many_select(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def test_execute_many_not_iterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def test_fetch_iter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        # Iterating the cursor must yield exactly the two rows, in order.
        self.assertEqual([row[0] for row in self.cu], [5, 6])

    def test_fetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        row = self.cu.fetchone()
        self.assertIsNone(row)

    def test_fetchone_no_statement(self):
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertIsNone(row)

    def test_array_size(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        self.assertEqual(len(res), 2)

    def test_fetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def test_fetchmany_kw_arg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def test_fetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def test_setinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def test_setoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def test_setoutputsize_no_column(self):
        self.cu.setoutputsize(42)

    def test_cursor_connection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def test_wrong_cursor_callable(self):
        # A factory that does not return a cursor must be rejected.
        def f(): pass
        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def test_cursor_wrong_class(self):
        class Foo: pass
        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)

    def test_last_row_id_on_replace(self):
        """
        INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
        """
        sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
        for statement in ('INSERT OR REPLACE', 'REPLACE'):
            with self.subTest(statement=statement):
                self.cu.execute(sql.format(statement), (1, 'foo'))
                self.assertEqual(self.cu.lastrowid, 1)

    def test_last_row_id_on_ignore(self):
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)
        # The second, ignored insert must leave lastrowid unchanged.
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)

    def test_last_row_id_insert_o_r(self):
        results = []
        sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
        for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
            with self.subTest(statement='INSERT OR {}'.format(statement)):
                self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
                # Repeating the insert violates the unique constraint.
                with self.assertRaises(sqlite.IntegrityError):
                    self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
        expected = [
            ('FAIL', 2), ('FAIL', 2),
            ('ABORT', 3), ('ABORT', 3),
            ('ROLLBACK', 4), ('ROLLBACK', 4),
        ]
        self.assertEqual(results, expected)
553
554
class ThreadTests(unittest.TestCase):
    """Using a connection or cursor from a thread other than the one that
    created it must raise ProgrammingError."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _assert_rejected_in_other_thread(self, func, *args):
        # Run func(*args) in a fresh thread and fail the test unless the call
        # raises sqlite.ProgrammingError there. Problems are collected in a
        # list because an exception raised inside the worker thread would not
        # propagate to the test itself.
        errors = []

        def run():
            try:
                func(*args)
            except sqlite.ProgrammingError:
                pass
            except Exception as exc:
                errors.append(f"raised wrong exception: {exc!r}")
            else:
                errors.append("did not raise ProgrammingError")

        t = threading.Thread(target=run)
        t.start()
        t.join()
        if errors:
            self.fail("\n".join(errors))

    def test_con_cursor(self):
        self._assert_rejected_in_other_thread(self.con.cursor)

    def test_con_commit(self):
        self._assert_rejected_in_other_thread(self.con.commit)

    def test_con_rollback(self):
        self._assert_rejected_in_other_thread(self.con.rollback)

    def test_con_close(self):
        self._assert_rejected_in_other_thread(self.con.close)

    def test_cur_implicit_begin(self):
        self._assert_rejected_in_other_thread(
            self.cur.execute, "insert into test(name) values ('a')")

    def test_cur_close(self):
        self._assert_rejected_in_other_thread(self.cur.close)

    def test_cur_execute(self):
        self.cur.execute("insert into test(name) values ('a')")
        self._assert_rejected_in_other_thread(
            self.cur.execute, "select name from test")

    def test_cur_iter_next(self):
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._assert_rejected_in_other_thread(self.cur.fetchone)
711
class ConstructorTests(unittest.TestCase):
    """Smoke tests: each type constructor can be called; results are not inspected."""

    def test_date(self):
        sqlite.Date(2004, 10, 28)

    def test_time(self):
        sqlite.Time(12, 39, 35)

    def test_timestamp(self):
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def test_date_from_ticks(self):
        sqlite.DateFromTicks(42)

    def test_time_from_ticks(self):
        sqlite.TimeFromTicks(42)

    def test_timestamp_from_ticks(self):
        sqlite.TimestampFromTicks(42)

    def test_binary(self):
        sqlite.Binary(b"\0'")
733
class ExtensionTests(unittest.TestCase):
    """Tests for executescript() and the execute*() shortcuts on Connection."""

    def setUp(self):
        # A fresh in-memory database per test; closed again in tearDown().
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def test_script_string_sql(self):
        cur = self.con.cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def test_script_syntax_error(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def test_script_error_normal(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def test_cursor_executescript_as_bytes(self):
        cur = self.con.cursor()
        with self.assertRaises(ValueError) as cm:
            cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
        self.assertEqual(str(cm.exception), 'script argument must be unicode.')

    def test_connection_execute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def test_connection_executemany(self):
        con = self.con
        con.execute("create table test(foo)")
        con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def test_connection_executescript(self):
        con = self.con
        con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
785
class ClosedConTests(unittest.TestCase):
    """Every operation on a closed connection raises ProgrammingError."""

    def _closed_connection(self):
        # Open an in-memory database and close it again straight away.
        connection = sqlite.connect(":memory:")
        connection.close()
        return connection

    def test_closed_con_cursor(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.cursor()

    def test_closed_con_commit(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.commit()

    def test_closed_con_rollback(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.rollback()

    def test_closed_cur_execute(self):
        # The cursor has to be created while the connection is still open.
        connection = sqlite.connect(":memory:")
        cursor = connection.cursor()
        connection.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cursor.execute("select 4")

    def test_closed_create_function(self):
        closed = self._closed_connection()

        def constant(x):
            return 17

        with self.assertRaises(sqlite.ProgrammingError):
            closed.create_function("foo", 1, constant)

    def test_closed_create_aggregate(self):
        closed = self._closed_connection()

        class Aggregate:
            def __init__(self):
                pass

            def step(self, x):
                pass

            def finalize(self):
                return 17

        with self.assertRaises(sqlite.ProgrammingError):
            closed.create_aggregate("foo", 1, Aggregate)

    def test_closed_set_authorizer(self):
        closed = self._closed_connection()

        def deny_all(*args):
            return sqlite.DENY

        with self.assertRaises(sqlite.ProgrammingError):
            closed.set_authorizer(deny_all)

    def test_closed_set_progress_callback(self):
        closed = self._closed_connection()

        def noop():
            pass

        with self.assertRaises(sqlite.ProgrammingError):
            closed.set_progress_handler(noop, 100)

    def test_closed_call(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed()
852
class ClosedCurTests(unittest.TestCase):
    """Operations attempted after Cursor.close() must raise ProgrammingError."""

    def test_closed(self):
        con = sqlite.connect(":memory:")
        # Close the connection when the test ends so it is not leaked.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.close()

        # Arguments to pass to each cursor method that is exercised.
        calls = {
            "execute": ("select 4 union select 5",),
            "executemany": ("insert into foo(bar) values (?)", [(3,), (4,)]),
            "executescript": ("select 4 union select 5",),
            "fetchall": (),
            "fetchmany": (),
            "fetchone": (),
        }

        for method_name, params in calls.items():
            # One sub-test per method: a failure names the offending method
            # and the remaining methods are still checked.
            with self.subTest(method=method_name):
                with self.assertRaises(sqlite.ProgrammingError):
                    method = getattr(cur, method_name)
                    method(*params)
870
871
class SqliteOnConflictTests(unittest.TestCase):
    """
    Tests for SQLite's "insert on conflict" feature.

    Each test runs against a fresh in-memory table whose ``unique_name``
    column has a UNIQUE constraint, and provokes a conflict by inserting the
    same ``unique_name`` twice.

    See https://www.sqlite.org/lang_conflict.html for details.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("""
          CREATE TABLE test(
            id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
          );
        """)

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def test_on_conflict_rollback_with_explicit_transaction(self):
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        # Use connection to commit.
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name from test")
        # Transaction should have rolled back and nothing should be in table.
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_abort_raises_with_explicit_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name FROM test")
        # Expect the first two inserts to work, third to do nothing.
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_rollback_without_transaction(self):
        # Start of implicit transaction
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        # Implicit transaction is rolled back on error.
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_abort_raises_without_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        # Make sure all other values were inserted.
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_fail(self):
        self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        # The failed statement leaves no rows pending on the cursor.
        self.assertEqual(self.cu.fetchall(), [])
        # Query the table itself: the row inserted before the conflict must
        # still be there, and the conflicting insert must not have added one.
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def test_on_conflict_ignore(self):
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        # Nothing should happen.
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def test_on_conflict_replace(self):
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
        # There shouldn't be an IntegrityError exception.
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
963
964
class MultiprocessTests(unittest.TestCase):
    """Tests in which a child process shares the TESTFN database file."""

    # Timeout handed to sqlite3.connect() by both the parent (this test) and
    # the child script below.
    CONNECTION_TIMEOUT = SHORT_TIMEOUT / 1000.  # Defaults to 30 ms

    def tearDown(self):
        # Remove the on-disk database file the test created.
        unlink(TESTFN)

    def test_ctx_mgr_rollback_if_commit_failed(self):
        # bpo-27334: ctx manager does not rollback if commit fails
        #
        # Outline of the handshake:
        #   1. The child creates table "t", opens a transaction and calls the
        #      SQL function wait(), which prints "started" and then blocks
        #      reading a line from its stdin.
        #   2. The parent waits for "started", then tries an INSERT inside
        #      "with cx:" and forwards the outcome to the child's stdin.
        #   3. wait() asserts that the text it read contains
        #      "database is locked"; the child then rolls back and starts a
        #      second transaction.
        #   4. The parent finally requires the child to exit with status 0.
        SCRIPT = f"""if 1:
            import sqlite3
            def wait():
                print("started")
                assert "database is locked" in input()

            cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT})
            cx.create_function("wait", 0, wait)
            with cx:
                cx.execute("create table t(t)")
            try:
                # execute two transactions; both will try to lock the db
                cx.executescript('''
                    -- start a transaction and wait for parent
                    begin transaction;
                    select * from t;
                    select wait();
                    rollback;

                    -- start a new transaction; would fail if parent holds lock
                    begin transaction;
                    select * from t;
                    rollback;
                ''')
            finally:
                cx.close()
        """

        # spawn child process
        proc = subprocess.Popen(
            [sys.executable, "-c", SCRIPT],
            encoding="utf-8",
            bufsize=0,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )
        # Registered right away so communicate() is also called on the child
        # if any assertion below fails.
        self.addCleanup(proc.communicate)

        # wait for child process to start
        self.assertEqual("started", proc.stdout.readline().strip())

        cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT)
        try:  # context manager should correctly release the db lock
            with cx:
                cx.execute("insert into t values('test')")
        except sqlite.OperationalError as exc:
            # Forward the error text; the child's wait() looks for
            # "database is locked" in it.
            proc.stdin.write(str(exc))
        else:
            # No OperationalError was raised; this text does not contain the
            # phrase the child asserts on.
            proc.stdin.write("no error")
        finally:
            cx.close()

        # terminate child process
        # returncode is still None here: nothing has polled or waited on the
        # child yet.
        self.assertIsNone(proc.returncode)
        try:
            # Sends the remaining input and waits for the child to exit.
            proc.communicate(input="end", timeout=SHORT_TIMEOUT)
        except subprocess.TimeoutExpired:
            # Child did not finish in time: kill it, reap it, and let the
            # timeout propagate as the test error.
            proc.kill()
            proc.communicate()
            raise
        self.assertEqual(proc.returncode, 0)
1034
1035
def suite():
    """Return a TestSuite holding the tests of every test case class here.

    Each class is loaded with its own TestLoader and contributes one nested
    sub-suite, in the order listed below.
    """
    case_classes = (
        ClosedConTests,
        ClosedCurTests,
        ConnectionTests,
        ConstructorTests,
        CursorTests,
        ExtensionTests,
        ModuleTests,
        MultiprocessTests,
        SqliteOnConflictTests,
        ThreadTests,
        UninitialisedConnectionTests,
    )
    sub_suites = []
    for case_class in case_classes:
        loader = unittest.TestLoader()
        sub_suites.append(loader.loadTestsFromTestCase(case_class))
    return unittest.TestSuite(sub_suites)
1053
def test():
    """Run every test returned by suite() with a text runner.

    Returns:
        The unittest result object produced by the run, so callers can
        inspect failures and errors.
    """
    runner = unittest.TextTestRunner()
    return runner.run(suite())

if __name__ == "__main__":
    # Exit non-zero when any test failed or errored, so that running this
    # file as a script reports the outcome through its exit status.
    sys.exit(not test().wasSuccessful())
1060