• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import sqlite3
3from pathlib import Path
4from contextlib import suppress, closing
5from collections.abc import MutableMapping
6
7BUILD_TABLE = """
8  CREATE TABLE IF NOT EXISTS Dict (
9    key BLOB UNIQUE NOT NULL,
10    value BLOB NOT NULL
11  )
12"""
13GET_SIZE = "SELECT COUNT (key) FROM Dict"
14LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)"
15STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))"
16DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)"
17ITER_KEYS = "SELECT key FROM Dict"
18
19
20class error(OSError):
21    pass
22
23
24_ERR_CLOSED = "DBM object has already been closed"
25_ERR_REINIT = "DBM object does not support reinitialization"
26
27
28def _normalize_uri(path):
29    path = Path(path)
30    uri = path.absolute().as_uri()
31    while "//" in uri:
32        uri = uri.replace("//", "/")
33    return uri
34
35
36class _Database(MutableMapping):
37
38    def __init__(self, path, /, *, flag, mode):
39        if hasattr(self, "_cx"):
40            raise error(_ERR_REINIT)
41
42        path = os.fsdecode(path)
43        match flag:
44            case "r":
45                flag = "ro"
46            case "w":
47                flag = "rw"
48            case "c":
49                flag = "rwc"
50                Path(path).touch(mode=mode, exist_ok=True)
51            case "n":
52                flag = "rwc"
53                Path(path).unlink(missing_ok=True)
54                Path(path).touch(mode=mode)
55            case _:
56                raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', "
57                                 f"not {flag!r}")
58
59        # We use the URI format when opening the database.
60        uri = _normalize_uri(path)
61        uri = f"{uri}?mode={flag}"
62
63        try:
64            self._cx = sqlite3.connect(uri, autocommit=True, uri=True)
65        except sqlite3.Error as exc:
66            raise error(str(exc))
67
68        # This is an optimization only; it's ok if it fails.
69        with suppress(sqlite3.OperationalError):
70            self._cx.execute("PRAGMA journal_mode = wal")
71
72        if flag == "rwc":
73            self._execute(BUILD_TABLE)
74
75    def _execute(self, *args, **kwargs):
76        if not self._cx:
77            raise error(_ERR_CLOSED)
78        try:
79            return closing(self._cx.execute(*args, **kwargs))
80        except sqlite3.Error as exc:
81            raise error(str(exc))
82
83    def __len__(self):
84        with self._execute(GET_SIZE) as cu:
85            row = cu.fetchone()
86        return row[0]
87
88    def __getitem__(self, key):
89        with self._execute(LOOKUP_KEY, (key,)) as cu:
90            row = cu.fetchone()
91        if not row:
92            raise KeyError(key)
93        return row[0]
94
95    def __setitem__(self, key, value):
96        self._execute(STORE_KV, (key, value))
97
98    def __delitem__(self, key):
99        with self._execute(DELETE_KEY, (key,)) as cu:
100            if not cu.rowcount:
101                raise KeyError(key)
102
103    def __iter__(self):
104        try:
105            with self._execute(ITER_KEYS) as cu:
106                for row in cu:
107                    yield row[0]
108        except sqlite3.Error as exc:
109            raise error(str(exc))
110
111    def close(self):
112        if self._cx:
113            self._cx.close()
114            self._cx = None
115
116    def keys(self):
117        return list(super().keys())
118
119    def __enter__(self):
120        return self
121
122    def __exit__(self, *args):
123        self.close()
124
125
126def open(filename, /, flag="r", mode=0o666):
127    """Open a dbm.sqlite3 database and return the dbm object.
128
129    The 'filename' parameter is the name of the database file.
130
131    The optional 'flag' parameter can be one of ...:
132        'r' (default): open an existing database for read only access
133        'w': open an existing database for read/write access
134        'c': create a database if it does not exist; open for read/write access
135        'n': always create a new, empty database; open for read/write access
136
137    The optional 'mode' parameter is the Unix file access mode of the database;
138    only used when creating a new database. Default: 0o666.
139    """
140    return _Database(filename, flag=flag, mode=mode)
141