• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #pragma once
2 #ifndef IWKV_INTERNAL_H
3 #define IWKV_INTERNAL_H
4 
5 #include "iwkv.h"
6 #include "iwlog.h"
7 #include "iwarr.h"
8 #include "iwutils.h"
9 #include "iwfsmfile.h"
10 #include "iwdlsnr.h"
11 #include "iwal.h"
12 #include "khash.h"
13 #include "ksort.h"
14 #include <pthread.h>
15 #include <stdatomic.h>
16 #include <unistd.h>
17 #include "iwcfg.h"
18 
19 #if defined(__APPLE__) || defined(__ANDROID__)
20 #include "pthread_spin_lock_shim.h"
21 #endif
22 
/*
 * File identification and global limits.
 * The two "magic" words are ASCII tags: 0x69776b76 spells "iwkv" and
 * 0x69776462 spells "iwdb".
 */

/** IWKV magic number ("iwkv"). */
#define IWKV_MAGIC 0x69776b76U

/** IWKV backup magic number. */
#define IWKV_BACKUP_MAGIC 0xBACBAC69U

/** IWKV file format version. */
#define IWKV_FORMAT 2

/** IWDB magic number ("iwdb"). */
#define IWDB_MAGIC 0x69776462U

#if defined(IW_32)
/** Max database file size on 32 bit systems: 2Gb. */
#define IWKV_MAX_DBSZ 0x7fffffff
#else
/** Max database file size: ~512Gb. */
#define IWKV_MAX_DBSZ 0x7fffffff80ULL
#endif
42 
/** Size of a KV fsm block as a power of 2 (1 << 7 = 128 bytes). */
#define IWKV_FSM_BPOW 7U

/** Combination of IWFSM_* allocation flags. */
#define IWKV_FSM_ALLOC_FLAGS \
  (IWFSM_ALLOC_NO_OVERALLOCATE | IWFSM_SOLID_ALLOCATED_SPACE | IWFSM_ALLOC_NO_STATS)

/** Length of KV fsm header in bytes. */
#define KVHDRSZ 255U

/*
 * SBLK on-disk layout, 256 bytes in both data format versions:
 *  v1: [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],lk:u116]
 *  v2: [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]
 * The v2 `bpos` byte is why the v2 prefix key is one byte shorter.
 */

/** Maximum length of the prefix key to compare, data format v1. */
#define PREFIX_KEY_LEN_V1 116U

/** Maximum length of the prefix key to compare, data format v2. */
#define PREFIX_KEY_LEN_V2 115U

/** Number of skip list levels. */
#define SLEVELS 24U

/**
 * Size of the allocation areas:
 * levels + (new block created) + (db block may be updated).
 */
#define AANUM (2U * SLEVELS + 2)

/** Lower key length in SBLK. */
#define SBLK_LKLEN PREFIX_KEY_LEN_V2

/** Size of database start block in bytes (two fsm blocks). */
#define DB_SZ (2U * (1U << IWKV_FSM_BPOW))

/** Size of `SBLK` in bytes (two fsm blocks). */
#define SBLK_SZ (2U * (1U << IWKV_FSM_BPOW))

/** Number of SBLK blocks in one page. */
#define SBLK_PAGE_SBLK_NUM_V2 16U

/**
 * Size of page with adjacent SBLK blocks: 4096 bytes.
 * Data format version: v2.
 */
#define SBLK_PAGE_SZ_V2 (SBLK_PAGE_SBLK_NUM_V2 * SBLK_SZ)
78 
/** Number of `KV` blocks in KVBLK. */
#define KVBLK_IDXNUM 32U

/** Initial `KVBLK` size as a power of 2. */
#define KVBLK_INISZPOW 9U

/** KVBLK header size: blen:u1,idxsz:u2. */
#define KVBLK_HDRSZ 3U

/** Max kvp offset bytes. */
#define KVP_MAX_OFF_VLEN 8U

/** Max kvp len 0xfffffffULL bytes. */
#define KVP_MAX_LEN_VLEN 5U

/** Upper bound of the pairs index size: one (offset, length) entry per slot. */
#define KVBLK_MAX_IDX_SZ ((KVP_MAX_OFF_VLEN + KVP_MAX_LEN_VLEN) * KVBLK_IDXNUM)

/**
 * Max non KV size: header plus the largest possible pairs index,
 * [blen:u1,idxsz:u2,[ps:vn,pl:vn] x KVBLK_IDXNUM].
 */
#define KVBLK_MAX_NKV_SZ (KVBLK_HDRSZ + KVBLK_MAX_IDX_SZ)
98 
/** Converts a byte address into a block number (address >> IWKV_FSM_BPOW). */
#define ADDR2BLK(addr_) ((blkn_t) ((uint64_t) (addr_) >> IWKV_FSM_BPOW))

/** Converts a block number into a 64-bit byte address (block << IWKV_FSM_BPOW). */
#define BLK2ADDR(blk_) ((uint64_t) (blk_) << IWKV_FSM_BPOW)
102 
/* Forward declarations; both structures are defined later in this header. */
struct _IWDB;
struct _IWKV;

/** Block number: a byte address shifted right by IWKV_FSM_BPOW (see ADDR2BLK). */
typedef uint32_t blkn_t;

/** Database identifier. */
typedef uint32_t dbid_t;
108 
/**
 * Key/Value pair stored in `KVBLK`.
 */
typedef struct KV {
  /** Key size */
  size_t keysz;
  /** Value size */
  size_t valsz;
  /** Key data */
  uint8_t *key;
  /** Value data */
  uint8_t *val;
} KV;
116 
/**
 * Key/Value (KV) index entry: offset and length.
 */
typedef struct KVP {
  /** KV block offset relative to `end` of KVBLK */
  off_t off;
  /** Length of kv pair block */
  uint32_t len;
  /** Position of the actually persisted slot in `KVBLK` */
  uint8_t ridx;
} KVP;
123 
/** `KVBLK` state flags. */
typedef uint8_t kvblk_flags_t;
/** No flags set. */
#define KVBLK_DEFAULT ((kvblk_flags_t) 0x00U)
/** KVBLK data is dirty and should be flushed to mm */
#define KVBLK_DURTY ((kvblk_flags_t) 0x01U)

/** Option bits of type `kvblk_rmkv_opts_t`; their exact effect is defined by the users of these flags. */
typedef uint8_t kvblk_rmkv_opts_t;
#define RMKV_SYNC ((kvblk_rmkv_opts_t) 0x01U)
#define RMKV_NO_RESIZE ((kvblk_rmkv_opts_t) 0x02U)

/** `SBLK` flags. */
typedef uint8_t sblk_flags_t;
/** The lowest `SBLK` key is fully contained in `SBLK`. Persistent flag. */
#define SBLK_FULL_LKEY ((sblk_flags_t) 0x01U)
/** This block is the start database block. */
#define SBLK_DB ((sblk_flags_t) 0x08U)
/** Block data changed, block marked as dirty and needs to be persisted */
#define SBLK_DURTY ((sblk_flags_t) 0x10U)
/** Put this `SBLK` into dbcache */
#define SBLK_CACHE_PUT ((sblk_flags_t) 0x20U)
/** dbcache update request for this `SBLK` (named by analogy with SBLK_CACHE_PUT) */
#define SBLK_CACHE_UPDATE ((sblk_flags_t) 0x40U)
/** dbcache remove request for this `SBLK` (named by analogy with SBLK_CACHE_PUT) */
#define SBLK_CACHE_REMOVE ((sblk_flags_t) 0x80U)
144 
/** Kind of operation carried by a lookup context (stored in `IWLCTX::op`). */
typedef uint8_t iwlctx_op_t;
/** Put key value operation */
#define IWLCTX_PUT          ((iwlctx_op_t) 0x01U)
/**
 * Delete key operation.
 *
 * NOTE(review): this constant previously had the same value (0x01U) as
 * IWLCTX_PUT, which made the two operations indistinguishable at run time.
 * It now has its own value; any code that compares `IWLCTX::op` should be
 * audited, since a delete no longer matches IWLCTX_PUT.
 */
#define IWLCTX_DEL          ((iwlctx_op_t) 0x02U)
150 
151 /* KVBLK: [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] */
152 typedef struct KVBLK {
153   IWDB db;
154   off_t addr;                 /**< Block address */
155   off_t maxoff;               /**< Max pair offset */
156   uint16_t idxsz;             /**< Size of KV pairs index in bytes */
157   int8_t zidx;                /**< Index of first empty pair slot (zero index), or -1 */
158   uint8_t szpow;              /**< Block size as power of 2 */
159   kvblk_flags_t flags;        /**< Flags */
160   KVP pidx[KVBLK_IDXNUM];     /**< KV pairs index */
161 } KVBLK;
162 
/** `SBLK` flags kept in the persistent flag set. */
#define SBLK_PERSISTENT_FLAGS (SBLK_FULL_LKEY)

/** All dbcache-related `SBLK` flags. */
#define SBLK_CACHE_FLAGS (SBLK_CACHE_UPDATE | SBLK_CACHE_PUT | SBLK_CACHE_REMOVE)

/** Number of top levels to cache (~ (1<<DBCACHE_LEVELS) cached elements). */
#define DBCACHE_LEVELS 10U

/** Minimal cached level. */
#define DBCACHE_MIN_LEVEL 5U

/** Single allocation step - number of DBCNODEs. */
#define DBCACHE_ALLOC_STEP 32U
174 
175 /** Cached SBLK node */
176 typedef struct DBCNODE {
177   blkn_t sblkn;               /**< SBLK block number or used to store key size (to keep DBCNODE compact) */
178   blkn_t kblkn;               /**< KVBLK block number */
179   uint8_t lkl;                /**< Lower key length */
180   uint8_t fullkey;            /**< SBLK is full key */
181   uint8_t k0idx;              /**< KVBLK Zero KVP index */
182   uint8_t pad;                /**< 1 byte pad */
183   uint8_t lk[1];              /**< Lower key buffer */
184 } DBCNODE;
185 
186 #define DBCNODE_VNUM_SZ 24
187 #define DBCNODE_STR_SZ 128
188 
189 static_assert(DBCNODE_VNUM_SZ >= offsetof(DBCNODE, lk) + IW_VNUMBUFSZ,
190               "DBCNODE_VNUM_SZ >= offsetof(DBCNODE, lk) + IW_VNUMBUFSZ");
191 static_assert(DBCNODE_STR_SZ >= offsetof(DBCNODE, lk) + SBLK_LKLEN,
192               "DBCNODE_STR_SZ >= offsetof(DBCNODE, lk) + SBLK_LKLEN");
193 
194 /** Tallest SBLK nodes cache */
195 typedef struct DBCACHE {
196   size_t asize;                 /**< Size of allocated cache buffer */
197   size_t num;                   /**< Actual number of nodes */
198   size_t nsize;                 /**< Cached node size */
199   uint8_t lvl;                  /**< Lowes cached level */
200   bool open;                    /**< Is cache open */
201   DBCNODE *nodes;               /**< Sorted nodes array */
202 } DBCACHE;
203 
204 struct _IWKV_cursor;
205 
206 /* Database: [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4]:209 */
207 struct _IWDB {
208   // SBH
209   IWDB db;                        /**< Database ref */
210   off_t addr;                     /**< Database block address */
211   sblk_flags_t flags;             /**< Flags */
212   // !SBH
213   IWKV iwkv;
214   DBCACHE cache;                  /**< SBLK nodes cache */
215   pthread_rwlock_t rwl;           /**< Database API RW lock */
216   pthread_spinlock_t cursors_slk; /**< Cursors set guard lock */
217   off_t next_db_addr;             /**< Next IWDB addr */
218   struct _IWKV_cursor *cursors;   /**< Active (currently in-use) database cursors */
219   struct _IWDB *next;             /**< Next IWDB meta */
220   struct _IWDB *prev;             /**< Prev IWDB meta */
221   dbid_t id;                      /**< Database ID */
222   volatile int32_t wk_count;      /**< Number of active database workers */
223   blkn_t meta_blk;                /**< Database meta block number */
224   blkn_t meta_blkn;               /**< Database meta length (number of blocks) */
225   iwdb_flags_t dbflg;             /**< Database specific flags */
226   atomic_bool open;               /**< True if DB is in OPEN state */
227   volatile bool wk_pending_exclusive; /**< If true someone wants to acquire exclusive lock on IWDB */
228   uint32_t lcnt[SLEVELS];         /**< SBLK count per level */
229 
230 };
231 
232 /* Skiplist block: [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256 // SBLK */
233 typedef struct SBLK {
234   // SBH
235   IWDB db;                    /**< Database ref */
236   off_t addr;                 /**< Block address */
237   sblk_flags_t flags;         /**< Flags */
238   uint8_t lvl;                /**< Skip list node level */
239   uint8_t bpos;               /**< Position of SBLK in a page block starting with 1 (zero means SBLK deleted) */
240   blkn_t p0;                  /**< Prev node, if IWDB it is the last node */
241   blkn_t n[SLEVELS];          /**< Next nodes */
242   // !SBH
243   KVBLK *kvblk;               /**< Associated KVBLK */
244   blkn_t kvblkn;              /**< Associated KVBLK block number */
245   int8_t pnum;                /**< Number of active kv indexes in `SBLK::pi` */
246   uint8_t lkl;                /**< Lower key length within a buffer */
247   uint8_t pi[KVBLK_IDXNUM];   /**< Sorted KV slots, value is an index of kv slot in `KVBLK` */
248   uint8_t lk[PREFIX_KEY_LEN_V1];   /**< Lower key buffer */
249 } SBLK;
250 
251 // -V:KHASH_MAP_INIT_INT:522
252 KHASH_MAP_INIT_INT(DBS, IWDB)
253 
254 /** IWKV instance */
255 struct _IWKV {
256   IWFS_FSM fsm;               /**< FSM pool */
257   pthread_rwlock_t rwl;       /**< API RW lock */
258   iwrc fatalrc;               /**< Fatal error occuried, no farther operations can be performed */
259   IWDB first_db;              /**< First database in chain */
260   IWDB last_db;               /**< Last database in chain */
261   IWDLSNR *dlsnr;             /**< WAL data events listener */
262   khash_t(DBS) *dbs;          /**< Database id -> IWDB mapping */
263   iwkv_openflags oflags;      /**< Open flags */
264   pthread_cond_t wk_cond;     /**< Workers cond variable */
265   pthread_mutex_t wk_mtx;     /**< Workers cond mutext */
266   int32_t fmt_version;        /**< Database format version */
267   int32_t pklen;              /**< Prefix key length in use */
268   volatile int32_t wk_count;  /**< Number of active workers */
269   volatile bool wk_pending_exclusive; /**< If true someone wants to acquire exclusive lock on IWKV */
270   atomic_bool open;           /**< True if kvstore is in OPEN state */
271 };
272 
273 /** Database lookup context */
274 typedef struct IWLCTX {
275   IWDB db;
276   const IWKV_val *key;        /**< Search key */
277   IWKV_val *val;              /**< Update value */
278   SBLK *lower;                /**< Next to upper bound block */
279   SBLK *upper;                /**< Upper bound block */
280   SBLK *nb;                   /**< New block */
281   off_t destroy_addr;         /**< Block to destroy address */
282   off_t upper_addr;           /**< Upper block address used in `_lx_del_lr()` */
283 #ifndef NDEBUG
284   uint32_t num_cmps;
285 #endif
286   iwkv_opflags opflags;       /**< Operation flags */
287   sblk_flags_t sbflags;       /**< `SBLK` flags applied to all new/looked blocks in this context */
288   iwlctx_op_t op;             /**< Context operation */
289   uint8_t saan;               /**< Position of next free `SBLK` element in the `saa` area */
290   uint8_t kaan;               /**< Position of next free `KVBLK` element in the `kaa` area */
291   int8_t nlvl;                /**< Level of new inserted/deleted `SBLK` node. -1 if no new node inserted/deleted */
292   int8_t cache_reload;        /**< If true dbcache should be refreshed after operation */
293   IWKV_PUT_HANDLER ph;        /**< Optional put handler */
294   void *phop;                 /**< Put handler opaque data */
295   SBLK *plower[SLEVELS];      /**< Pinned lower nodes per level */
296   SBLK *pupper[SLEVELS];      /**< Pinned upper nodes per level */
297   IWKV_val ekey;
298   SBLK dblk;                  /**< First database block */
299   SBLK saa[AANUM];            /**< `SBLK` allocation area */
300   KVBLK kaa[AANUM];           /**< `KVBLK` allocation area */
301   uint8_t nbuf[IW_VNUMBUFSZ];
302   uint8_t incbuf[8];          /**< Buffer used to store incremented/decremented values `IWKV_VAL_INCREMENT` opflag */
303 } IWLCTX;
304 
305 /** Cursor context */
306 struct _IWKV_cursor {
307   uint8_t cnpos;              /**< Position in the current `SBLK` node */
308   bool closed;                /**< Cursor closed */
309   int8_t skip_next;           /**< When to skip next IWKV_CURSOR_NEXT|IWKV_CURSOR_PREV cursor move
310                                    due to the side effect of `iwkv_cursor_del()` call.
311                                    If `skip_next > 0` `IWKV_CURSOR_NEXT` will be skipped
312                                    If `skip_next < 0` `IWKV_CURSOR_PREV` will be skipped */
313   SBLK *cn;                   /**< Current `SBLK` node */
314   struct _IWKV_cursor *next;  /**< Next cursor in active db cursors chain */
315   off_t dbaddr;               /**< Database address used as `cn` */
316   IWLCTX lx;                  /**< Lookup context */
317 };
318 
/**
 * Returns from the *calling function* unless the store is usable:
 *  - IW_ERROR_INVALID_STATE if `iwkv_` is NULL or not open;
 *  - the stored `fatalrc` if a fatal error was recorded.
 *
 * Wrapped in do { } while (0) so the macro is a single statement. The
 * previous two-`if` expansion ran its second check unconditionally when
 * written as `if (cond) ENSURE_OPEN(x);`.
 */
#define ENSURE_OPEN(iwkv_)                                                   \
  do {                                                                       \
    if (!(iwkv_) || !((iwkv_)->open)) return IW_ERROR_INVALID_STATE;         \
    if ((iwkv_)->fatalrc) return (iwkv_)->fatalrc;                           \
  } while (0)

/**
 * Returns IW_ERROR_INVALID_STATE from the *calling function* if `db_` is
 * NULL, has no store attached, or either the database or its store is not
 * open.
 *
 * Wrapped in do { } while (0) so that a following `else` cannot bind to the
 * macro's internal `if`.
 */
#define ENSURE_OPEN_DB(db_)                                                  \
  do {                                                                       \
    if (!(db_) || !(db_)->iwkv || !(db_)->open || !((db_)->iwkv->open))      \
      return IW_ERROR_INVALID_STATE;                                         \
  } while (0)
325 
326 #define API_RLOCK(iwkv_, rci_) \
327   ENSURE_OPEN(iwkv_);  \
328   (rci_) = pthread_rwlock_rdlock(&(iwkv_)->rwl); \
329   if (rci_) return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_)
330 
_api_rlock(IWKV iwkv)331 IW_INLINE iwrc _api_rlock(IWKV iwkv)  {
332   int rci;
333   API_RLOCK(iwkv, rci);
334   return 0;
335 }
336 
337 #define API_WLOCK(iwkv_, rci_) \
338   ENSURE_OPEN(iwkv_);  \
339   (rci_) = pthread_rwlock_wrlock(&(iwkv_)->rwl); \
340   if (rci_) return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_)
341 
_api_wlock(IWKV iwkv)342 IW_INLINE iwrc _api_wlock(IWKV iwkv)  {
343   int rci;
344   API_WLOCK(iwkv, rci);
345   return 0;
346 }
347 
/**
 * Releases the store-wide API lock.
 *
 * The pthread result is stored in `rci_`; when it is non-zero the error is
 * passed to IWRC() together with `rc_` (IWRC is defined elsewhere and
 * presumably folds the error into `rc_`).
 *
 * Wrapped in do { } while (0) and with `rci_` parenthesized, so the macro
 * is a single statement and accepts any lvalue expression.
 */
#define API_UNLOCK(iwkv_, rci_, rc_)                                       \
  do {                                                                     \
    (rci_) = pthread_rwlock_unlock(&(iwkv_)->rwl);                         \
    if (rci_) IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_), rc_);   \
  } while (0)
351 
352 #define API_DB_RLOCK(db_, rci_)                               \
353   do {                                                        \
354     API_RLOCK((db_)->iwkv, rci_);                             \
355     (rci_) = pthread_rwlock_rdlock(&(db_)->rwl);                \
356     if (rci_) {                                               \
357       pthread_rwlock_unlock(&(db_)->iwkv->rwl);               \
358       return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_);  \
359     }                                                         \
360   } while(0)
361 
_api_db_rlock(IWDB db)362 IW_INLINE iwrc _api_db_rlock(IWDB db)  {
363   int rci;
364   API_DB_RLOCK(db, rci);
365   return 0;
366 }
367 
368 #define API_DB_WLOCK(db_, rci_)                               \
369   do {                                                        \
370     API_RLOCK((db_)->iwkv, rci_);                             \
371     (rci_) = pthread_rwlock_wrlock(&(db_)->rwl);                \
372     if (rci_) {                                               \
373       pthread_rwlock_unlock(&(db_)->iwkv->rwl);               \
374       return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_);  \
375     }                                                         \
376   } while(0)
377 
_api_db_wlock(IWDB db)378 IW_INLINE iwrc _api_db_wlock(IWDB db)  {
379   int rci;
380   API_DB_WLOCK(db, rci);
381   return 0;
382 }
383 
/**
 * Unlocks a database: releases the database lock first, then the
 * store-wide lock (API_UNLOCK). A failure of either step is handed to
 * IWRC() together with `rc_`; the store lock is released even when
 * releasing the database lock failed.
 */
#define API_DB_UNLOCK(db_, rci_, rc_) \
  do { \
    (rci_) = pthread_rwlock_unlock(&(db_)->rwl); \
    if ((rci_) != 0) { \
      IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci_), rc_); \
    } \
    API_UNLOCK((db_)->iwkv, rci_, rc_); \
  } while (0)
390 
/**
 * Advances an allocation-area position by one, wrapping back to 0 after
 * the last valid index (AANUM - 1).
 */
#define AAPOS_INC(aan_) \
  do { \
    (aan_) = ((aan_) < AANUM - 1) ? (aan_) + 1 : 0; \
  } while (0)
399 
400 
401 // SBLK
402 // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
403 
404 #define SOFF_FLAGS_U1     0
405 #define SOFF_LVL_U1       (SOFF_FLAGS_U1 + 1)
406 #define SOFF_LKL_U1       (SOFF_LVL_U1 + 1)
407 #define SOFF_PNUM_U1      (SOFF_LKL_U1 + 1)
408 #define SOFF_P0_U4        (SOFF_PNUM_U1 + 1)
409 #define SOFF_KBLK_U4      (SOFF_P0_U4 + 4)
410 #define SOFF_PI0_U1       (SOFF_KBLK_U4 + 4)
411 #define SOFF_N0_U4        (SOFF_PI0_U1 + 1 * KVBLK_IDXNUM)
412 #define SOFF_BPOS_U1_V2   (SOFF_N0_U4 + 4 * SLEVELS)
413 #define SOFF_LK_V2        (SOFF_BPOS_U1_V2 + 1)
414 #define SOFF_LK_V1        (SOFF_N0_U4 + 4 * SLEVELS)
415 #define SOFF_END          (SOFF_LK_V2 + SBLK_LKLEN)
416 static_assert(SOFF_END == 256, "SOFF_END == 256");
417 static_assert(SBLK_SZ >= SOFF_END, "SBLK_SZ >= SOFF_END");
418 
419 // DB
420 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
421 #define DOFF_MAGIC_U4     0
422 #define DOFF_DBFLG_U1     (DOFF_MAGIC_U4 + 4)
423 #define DOFF_DBID_U4      (DOFF_DBFLG_U1 + 1)
424 #define DOFF_NEXTDB_U4    (DOFF_DBID_U4 + 4)
425 #define DOFF_P0_U4        (DOFF_NEXTDB_U4 + 4)
426 #define DOFF_N0_U4        (DOFF_P0_U4 + 4)
427 #define DOFF_C0_U4        (DOFF_N0_U4 + 4 * SLEVELS)
428 #define DOFF_METABLK_U4   (DOFF_C0_U4 + 4 * SLEVELS)
429 #define DOFF_METABLKN_U4  (DOFF_METABLK_U4 + 4)
430 #define DOFF_END          (DOFF_METABLKN_U4 + 4)
431 static_assert(DOFF_END == 217, "DOFF_END == 217");
432 static_assert(DB_SZ >= DOFF_END, "DB_SZ >= DOFF_END");
433 
/*
 * KVBLK
 * [szpow:u1,idxsz:u2,[ps1:vn,pl1:vn,...,ps32,pl32]____[[_KV],...]]
 */
/** Byte offset of the `szpow` field, the first byte of a KVBLK. */
#define KBLK_SZPOW_OFF 0
437 
438 
439 iwrc iwkv_exclusive_lock(IWKV iwkv);
440 iwrc iwkv_exclusive_unlock(IWKV iwkv);
441 void iwkvd_trigger_xor(uint64_t val);
442 void iwkvd_kvblk(FILE *f, KVBLK *kb, int maxvlen);
443 iwrc iwkvd_sblk(FILE *f, IWLCTX *lx, SBLK *sb, int flags);
444 void iwkvd_db(FILE *f, IWDB db, int flags, int plvl);
445 
446 // IWKVD Trigger commands
447 #ifdef IW_TESTS
448 #define IWKVD_WAL_NO_CHECKPOINT_ON_CLOSE 1UL
449 #endif
450 
451 #endif
452