1import os 2import sqlite3 3from pathlib import Path 4from contextlib import suppress, closing 5from collections.abc import MutableMapping 6 7BUILD_TABLE = """ 8 CREATE TABLE IF NOT EXISTS Dict ( 9 key BLOB UNIQUE NOT NULL, 10 value BLOB NOT NULL 11 ) 12""" 13GET_SIZE = "SELECT COUNT (key) FROM Dict" 14LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)" 15STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))" 16DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)" 17ITER_KEYS = "SELECT key FROM Dict" 18 19 20class error(OSError): 21 pass 22 23 24_ERR_CLOSED = "DBM object has already been closed" 25_ERR_REINIT = "DBM object does not support reinitialization" 26 27 28def _normalize_uri(path): 29 path = Path(path) 30 uri = path.absolute().as_uri() 31 while "//" in uri: 32 uri = uri.replace("//", "/") 33 return uri 34 35 36class _Database(MutableMapping): 37 38 def __init__(self, path, /, *, flag, mode): 39 if hasattr(self, "_cx"): 40 raise error(_ERR_REINIT) 41 42 path = os.fsdecode(path) 43 match flag: 44 case "r": 45 flag = "ro" 46 case "w": 47 flag = "rw" 48 case "c": 49 flag = "rwc" 50 Path(path).touch(mode=mode, exist_ok=True) 51 case "n": 52 flag = "rwc" 53 Path(path).unlink(missing_ok=True) 54 Path(path).touch(mode=mode) 55 case _: 56 raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', " 57 f"not {flag!r}") 58 59 # We use the URI format when opening the database. 60 uri = _normalize_uri(path) 61 uri = f"{uri}?mode={flag}" 62 63 try: 64 self._cx = sqlite3.connect(uri, autocommit=True, uri=True) 65 except sqlite3.Error as exc: 66 raise error(str(exc)) 67 68 # This is an optimization only; it's ok if it fails. 69 with suppress(sqlite3.OperationalError): 70 self._cx.execute("PRAGMA journal_mode = wal") 71 72 if flag == "rwc": 73 self._execute(BUILD_TABLE) 74 75 def _execute(self, *args, **kwargs): 76 if not self._cx: 77 raise error(_ERR_CLOSED) 78 try: 79 return closing(self._cx.execute(*args, **kwargs)) 80 except sqlite3.Error as exc: 81 raise error(str(exc)) 82 83 def __len__(self): 84 with self._execute(GET_SIZE) as cu: 85 row = cu.fetchone() 86 return row[0] 87 88 def __getitem__(self, key): 89 with self._execute(LOOKUP_KEY, (key,)) as cu: 90 row = cu.fetchone() 91 if not row: 92 raise KeyError(key) 93 return row[0] 94 95 def __setitem__(self, key, value): 96 self._execute(STORE_KV, (key, value)) 97 98 def __delitem__(self, key): 99 with self._execute(DELETE_KEY, (key,)) as cu: 100 if not cu.rowcount: 101 raise KeyError(key) 102 103 def __iter__(self): 104 try: 105 with self._execute(ITER_KEYS) as cu: 106 for row in cu: 107 yield row[0] 108 except sqlite3.Error as exc: 109 raise error(str(exc)) 110 111 def close(self): 112 if self._cx: 113 self._cx.close() 114 self._cx = None 115 116 def keys(self): 117 return list(super().keys()) 118 119 def __enter__(self): 120 return self 121 122 def __exit__(self, *args): 123 self.close() 124 125 126def open(filename, /, flag="r", mode=0o666): 127 """Open a dbm.sqlite3 database and return the dbm object. 128 129 The 'filename' parameter is the name of the database file. 130 131 The optional 'flag' parameter can be one of ...: 132 'r' (default): open an existing database for read only access 133 'w': open an existing database for read/write access 134 'c': create a database if it does not exist; open for read/write access 135 'n': always create a new, empty database; open for read/write access 136 137 The optional 'mode' parameter is the Unix file access mode of the database; 138 only used when creating a new database. Default: 0o666. 139 """ 140 return _Database(filename, flag=flag, mode=mode) 141