1"""A simple SQLite CLI for the sqlite3 module. 2 3Apart from using 'argparse' for the command-line interface, 4this module implements the REPL as a thin wrapper around 5the InteractiveConsole class from the 'code' stdlib module. 6""" 7import sqlite3 8import sys 9 10from argparse import ArgumentParser 11from code import InteractiveConsole 12from textwrap import dedent 13 14 15def execute(c, sql, suppress_errors=True): 16 """Helper that wraps execution of SQL code. 17 18 This is used both by the REPL and by direct execution from the CLI. 19 20 'c' may be a cursor or a connection. 21 'sql' is the SQL string to execute. 22 """ 23 24 try: 25 for row in c.execute(sql): 26 print(row) 27 except sqlite3.Error as e: 28 tp = type(e).__name__ 29 try: 30 print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr) 31 except AttributeError: 32 print(f"{tp}: {e}", file=sys.stderr) 33 if not suppress_errors: 34 sys.exit(1) 35 36 37class SqliteInteractiveConsole(InteractiveConsole): 38 """A simple SQLite REPL.""" 39 40 def __init__(self, connection): 41 super().__init__() 42 self._con = connection 43 self._cur = connection.cursor() 44 45 def runsource(self, source, filename="<input>", symbol="single"): 46 """Override runsource, the core of the InteractiveConsole REPL. 47 48 Return True if more input is needed; buffering is done automatically. 49 Return False is input is a complete statement ready for execution. 50 """ 51 match source: 52 case ".version": 53 print(f"{sqlite3.sqlite_version}") 54 case ".help": 55 print("Enter SQL code and press enter.") 56 case ".quit": 57 sys.exit(0) 58 case _: 59 if not sqlite3.complete_statement(source): 60 return True 61 execute(self._cur, source) 62 return False 63 64 65def main(*args): 66 parser = ArgumentParser( 67 description="Python sqlite3 CLI", 68 prog="python -m sqlite3", 69 ) 70 parser.add_argument( 71 "filename", type=str, default=":memory:", nargs="?", 72 help=( 73 "SQLite database to open (defaults to ':memory:'). " 74 "A new database is created if the file does not previously exist." 75 ), 76 ) 77 parser.add_argument( 78 "sql", type=str, nargs="?", 79 help=( 80 "An SQL query to execute. " 81 "Any returned rows are printed to stdout." 82 ), 83 ) 84 parser.add_argument( 85 "-v", "--version", action="version", 86 version=f"SQLite version {sqlite3.sqlite_version}", 87 help="Print underlying SQLite library version", 88 ) 89 args = parser.parse_args(*args) 90 91 if args.filename == ":memory:": 92 db_name = "a transient in-memory database" 93 else: 94 db_name = repr(args.filename) 95 96 # Prepare REPL banner and prompts. 97 if sys.platform == "win32" and "idlelib.run" not in sys.modules: 98 eofkey = "CTRL-Z" 99 else: 100 eofkey = "CTRL-D" 101 banner = dedent(f""" 102 sqlite3 shell, running on SQLite version {sqlite3.sqlite_version} 103 Connected to {db_name} 104 105 Each command will be run using execute() on the cursor. 106 Type ".help" for more information; type ".quit" or {eofkey} to quit. 107 """).strip() 108 sys.ps1 = "sqlite> " 109 sys.ps2 = " ... " 110 111 con = sqlite3.connect(args.filename, isolation_level=None) 112 try: 113 if args.sql: 114 # SQL statement provided on the command-line; execute it directly. 115 execute(con, args.sql, suppress_errors=False) 116 else: 117 # No SQL provided; start the REPL. 118 console = SqliteInteractiveConsole(con) 119 try: 120 import readline 121 except ImportError: 122 pass 123 console.interact(banner, exitmsg="") 124 finally: 125 con.close() 126 127 sys.exit(0) 128 129 130if __name__ == "__main__": 131 main(sys.argv[1:]) 132