python (3.12.0)
1 """A simple SQLite CLI for the sqlite3 module.
2
3 Apart from using 'argparse' for the command-line interface,
4 this module implements the REPL as a thin wrapper around
5 the InteractiveConsole class from the 'code' stdlib module.
6 """
7 import sqlite3
8 import sys
9
10 from argparse import ArgumentParser
11 from code import InteractiveConsole
12 from textwrap import dedent
13
14
def execute(c, sql, suppress_errors=True):
    """Run 'sql' on 'c' and print every resulting row to stdout.

    Shared by the REPL and by one-shot execution from the command line.

    'c' may be a cursor or a connection; only its execute() method is used.
    'sql' is the SQL string to execute.
    'suppress_errors' decides what happens after an sqlite3.Error has been
    reported on stderr: when true the function simply returns, when false
    the process exits with status 1.
    """
    try:
        rows = c.execute(sql)
        for record in rows:
            print(record)
    except sqlite3.Error as exc:
        kind = type(exc).__name__
        # The sqlite_errorname attribute may be absent on the exception;
        # fall back to the short "<type>: <message>" form in that case.
        if hasattr(exc, "sqlite_errorname"):
            report = f"{kind} ({exc.sqlite_errorname}): {exc}"
        else:
            report = f"{kind}: {exc}"
        print(report, file=sys.stderr)
        if not suppress_errors:
            sys.exit(1)
35
36
37 class ESC[4;38;5;81mSqliteInteractiveConsole(ESC[4;38;5;149mInteractiveConsole):
38 """A simple SQLite REPL."""
39
40 def __init__(self, connection):
41 super().__init__()
42 self._con = connection
43 self._cur = connection.cursor()
44
45 def runsource(self, source, filename="<input>", symbol="single"):
46 """Override runsource, the core of the InteractiveConsole REPL.
47
48 Return True if more input is needed; buffering is done automatically.
49 Return False is input is a complete statement ready for execution.
50 """
51 match source:
52 case ".version":
53 print(f"{sqlite3.sqlite_version}")
54 case ".help":
55 print("Enter SQL code and press enter.")
56 case ".quit":
57 sys.exit(0)
58 case _:
59 if not sqlite3.complete_statement(source):
60 return True
61 execute(self._cur, source)
62 return False
63
64
def main(*args):
    """Command-line entry point: run one SQL string or start the REPL.

    The positional arguments are forwarded unchanged to
    ArgumentParser.parse_args(). The database named on the command line
    (':memory:' by default) is opened with isolation_level=None. When an
    SQL argument is given it is executed directly with errors not
    suppressed; otherwise the interactive console is started. The
    connection is closed in either case, and the function ends by calling
    sys.exit(0).
    """
    cli = ArgumentParser(
        prog="python -m sqlite3",
        description="Python sqlite3 CLI",
    )
    cli.add_argument(
        "filename",
        nargs="?",
        type=str,
        default=":memory:",
        help=(
            "SQLite database to open (defaults to ':memory:'). "
            "A new database is created if the file does not previously exist."
        ),
    )
    cli.add_argument(
        "sql",
        nargs="?",
        type=str,
        help=(
            "An SQL query to execute. "
            "Any returned rows are printed to stdout."
        ),
    )
    cli.add_argument(
        "-v",
        "--version",
        action="version",
        version=f"SQLite version {sqlite3.sqlite_version}",
        help="Print underlying SQLite library version",
    )
    options = cli.parse_args(*args)

    # Human-readable description of the database for the banner.
    in_memory = options.filename == ":memory:"
    target = (
        "a transient in-memory database" if in_memory
        else repr(options.filename)
    )

    # Pick the end-of-file key advertised in the banner; CTRL-Z is shown
    # only on win32 when idlelib.run has not been imported.
    windows_console = (
        sys.platform == "win32" and "idlelib.run" not in sys.modules
    )
    eof_key = "CTRL-Z" if windows_console else "CTRL-D"

    banner = "\n".join((
        f"sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}",
        f"Connected to {target}",
        "",
        "Each command will be run using execute() on the cursor.",
        f'Type ".help" for more information; type ".quit" or {eof_key} to quit.',
    ))
    sys.ps1 = "sqlite> "
    sys.ps2 = " ... "

    connection = sqlite3.connect(options.filename, isolation_level=None)
    try:
        if not options.sql:
            # No SQL provided; start the REPL.
            repl = SqliteInteractiveConsole(connection)
            repl.interact(banner, exitmsg="")
        else:
            # SQL given on the command line; run it and fail hard on errors.
            execute(connection, options.sql, suppress_errors=False)
    finally:
        connection.close()

    sys.exit(0)
124
125
# Script entry point: pass the command-line arguments (without the program
# name) to main() as a single list.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    main(cli_args)