• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // -V::512
2 
3 #include "iwkv_internal.h"
4 #include "iwconv.h"
5 #include <stdalign.h>
6 
7 static iwrc _dbcache_fill_lw(IWLCTX *lx);
8 static iwrc _dbcache_get(IWLCTX *lx);
9 static iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk);
10 static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk);
11 static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk);
12 static void _dbcache_destroy_lw(IWDB db);
13 
14 #define _wnw_db_wl(db_) _api_db_wlock(db_)
15 
16 //-------------------------- GLOBALS
17 
18 #ifdef IW_TESTS
19 volatile int8_t iwkv_next_level = -1;
20 #endif
21 atomic_uint_fast64_t g_trigger;
22 
23 #define IWKV_IS_INTERNAL_RC(rc_) ((rc_) > _IWKV_ERROR_END && (rc_) < _IWKV_RC_END)
24 
25 //-------------------------- UTILS
26 
_to_effective_key(struct _IWDB * db,const IWKV_val * key,IWKV_val * okey,uint8_t nbuf[static IW_VNUMBUFSZ])27 IW_SOFT_INLINE iwrc _to_effective_key(struct _IWDB *db, const IWKV_val *key, IWKV_val *okey,
28                                       uint8_t nbuf[static IW_VNUMBUFSZ]) {
29   static_assert(IW_VNUMBUFSZ >= sizeof(uint64_t), "IW_VNUMBUFSZ >= sizeof(uint64_t)");
30   iwdb_flags_t dbflg = db->dbflg;
31   // Keys compound will be processed at lower levels at `addkv` routines
32   okey->compound = key->compound;
33   if (dbflg & IWDB_VNUM64_KEYS) {
34     unsigned len;
35     if (key->size == 8) {
36       uint64_t llv;
37       memcpy(&llv, key->data, sizeof(llv));
38       IW_SETVNUMBUF64(len, nbuf, llv);
39       if (!len) return IW_ERROR_OVERFLOW;
40       okey->size = len;
41       okey->data = nbuf;
42     } else if (key->size == 4) {
43       uint32_t lv;
44       memcpy(&lv, key->data, sizeof(lv));
45       IW_SETVNUMBUF(len, nbuf, lv);
46       if (!len) return IW_ERROR_OVERFLOW;
47       okey->size = len;
48       okey->data = nbuf;
49     } else {
50       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
51     }
52   } else {
53     okey->data = key->data;
54     okey->size = key->size;
55   }
56   return 0;
57 }
58 
59 // NOTE: at least `2*IW_VNUMBUFSZ` must be allocated for key->data
_unpack_effective_key(struct _IWDB * db,IWKV_val * key,bool no_move_key_data)60 static iwrc _unpack_effective_key(struct _IWDB *db, IWKV_val *key, bool no_move_key_data) {
61   iwdb_flags_t dbflg = db->dbflg;
62   uint8_t *data = key->data;
63   if (dbflg & IWDB_COMPOUND_KEYS) {
64     int step;
65     IW_READVNUMBUF64(key->data, key->compound, step);
66     if (step >= key->size) {
67       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
68     }
69     data += step;
70     key->size -= step;
71     if (!no_move_key_data && !(dbflg & IWDB_VNUM64_KEYS)) {
72       memmove(key->data, data, key->size);
73     }
74   } else {
75     key->compound = 0;
76   }
77   if (dbflg & IWDB_VNUM64_KEYS) {
78     int64_t llv;
79     char nbuf[IW_VNUMBUFSZ];
80     if (key->size > IW_VNUMBUFSZ) {
81       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
82     }
83     memcpy(nbuf, data, key->size);
84     IW_READVNUMBUF64_2(nbuf, llv);
85     memcpy(key->data, &llv, sizeof(llv));
86     key->size = sizeof(llv);
87   }
88   return 0;
89 }
90 
_cmp_keys_prefix(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)91 static int _cmp_keys_prefix(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
92   int ret;
93   if (dbflg & IWDB_COMPOUND_KEYS) {
94     // Compound keys mode
95     const char *u1 = v1;
96     const char *u2 = key->data;
97     int step, v2len = (int) key->size;
98     int64_t c1, c2 = key->compound;
99     IW_READVNUMBUF64(v1, c1, step);
100     v1len -= step;
101     u1 += step;
102     if (v1len < 1) {
103       // Inconsistent data?
104       return v2len - v1len;
105     }
106     if (dbflg & IWDB_VNUM64_KEYS) {
107       if (v2len != v1len || v2len > IW_VNUMBUFSZ || v1len > IW_VNUMBUFSZ) {
108         return v2len - v1len;
109       }
110       int64_t n1, n2;
111       char vbuf[IW_VNUMBUFSZ];
112       memcpy(vbuf, u1, v1len);
113       IW_READVNUMBUF64_2(vbuf, n1);
114       memcpy(vbuf, u2, v2len);
115       IW_READVNUMBUF64_2(vbuf, n2);
116       ret = n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
117       if (ret == 0) {
118         ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
119       }
120     } else if (dbflg & IWDB_REALNUM_KEYS) {
121       ret = iwafcmp(u2, v2len, u1, v1len);
122       if (ret == 0) {
123         ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
124       }
125     } else {
126       IW_CMP2(ret, u2, v2len, u1, v1len);
127     }
128     return ret;
129   } else {
130     int v2len = (int) key->size;
131     const void *v2 = key->data;
132     if (dbflg & IWDB_VNUM64_KEYS) {
133       if (v2len != v1len || v2len > IW_VNUMBUFSZ || v1len > IW_VNUMBUFSZ) {
134         return v2len - v1len;
135       }
136       int64_t n1, n2;
137       char vbuf[IW_VNUMBUFSZ];
138       memcpy(vbuf, v1, v1len);
139       IW_READVNUMBUF64_2(vbuf, n1);
140       memcpy(vbuf, v2, v2len);
141       IW_READVNUMBUF64_2(vbuf, n2);
142       return n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
143     } else if (dbflg & IWDB_REALNUM_KEYS) {
144       return iwafcmp(v2, v2len, v1, v1len);
145     } else {
146       IW_CMP2(ret, v2, v2len, v1, v1len);
147       return ret;
148     }
149   }
150 }
151 
_cmp_keys(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)152 IW_INLINE int _cmp_keys(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
153   int rv = _cmp_keys_prefix(dbflg, v1, v1len, key);
154   if (rv == 0 && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
155     if (dbflg & IWDB_COMPOUND_KEYS) {
156       int step;
157       int64_t c1, c2 = key->compound;
158       IW_READVNUMBUF64(v1, c1, step);
159       v1len -= step;
160       if ((int) key->size == v1len) {
161         return c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
162       }
163     }
164     return (int) key->size - v1len;
165   } else {
166     return rv;
167   }
168 }
169 
_kv_val_dispose(IWKV_val * v)170 IW_INLINE void _kv_val_dispose(IWKV_val *v) {
171   if (v) {
172     free(v->data);
173     v->size = 0;
174     v->data = 0;
175   }
176 }
177 
_kv_dispose(IWKV_val * key,IWKV_val * val)178 IW_INLINE void _kv_dispose(IWKV_val *key, IWKV_val *val) {
179   _kv_val_dispose(key);
180   _kv_val_dispose(val);
181 }
182 
iwkv_val_dispose(IWKV_val * v)183 void iwkv_val_dispose(IWKV_val *v) {
184   _kv_val_dispose(v);
185 }
186 
iwkv_kv_dispose(IWKV_val * key,IWKV_val * val)187 void iwkv_kv_dispose(IWKV_val *key, IWKV_val *val) {
188   _kv_dispose(key, val);
189 }
190 
_num2lebuf(uint8_t buf[static8],void * numdata,size_t sz)191 IW_INLINE void _num2lebuf(uint8_t buf[static 8], void *numdata, size_t sz) {
192   assert(sz == 4 || sz == 8);
193   if (sz > 4) {
194     uint64_t llv;
195     memcpy(&llv, numdata, sizeof(llv));
196     llv = IW_HTOILL(llv);
197     memcpy(buf, &llv, sizeof(llv));
198   } else {
199     uint32_t lv;
200     memcpy(&lv, numdata, sizeof(lv));
201     lv = IW_HTOIL(lv);
202     memcpy(buf, &lv, sizeof(lv));
203   }
204 }
205 
206 //-------------------------- IWKV/IWDB WORKERS
207 
_iwkv_worker_inc_nolk(IWKV iwkv)208 static WUR iwrc _iwkv_worker_inc_nolk(IWKV iwkv) {
209   if (!iwkv || !iwkv->open) {
210     return IW_ERROR_INVALID_STATE;
211   }
212   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
213   if (rci) {
214     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
215   }
216   if (!iwkv->open) { // -V547
217     pthread_mutex_unlock(&iwkv->wk_mtx);
218     return IW_ERROR_INVALID_STATE;
219   }
220   while (iwkv->wk_pending_exclusive) {
221     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
222   }
223   ++iwkv->wk_count;
224   pthread_cond_broadcast(&iwkv->wk_cond);
225   pthread_mutex_unlock(&iwkv->wk_mtx);
226   return 0;
227 }
228 
_db_worker_inc_nolk(IWDB db)229 static WUR iwrc _db_worker_inc_nolk(IWDB db) {
230   if (!db || !db->iwkv || !db->iwkv->open || !db->open) {
231     return IW_ERROR_INVALID_STATE;
232   }
233   IWKV iwkv = db->iwkv;
234   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
235   if (rci) {
236     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
237   }
238   if (!iwkv->open || !db->open) { // -V560
239     pthread_mutex_unlock(&iwkv->wk_mtx);
240     return IW_ERROR_INVALID_STATE;
241   }
242   while (db->wk_pending_exclusive) {
243     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
244   }
245   ++iwkv->wk_count;
246   ++db->wk_count;
247   pthread_cond_broadcast(&iwkv->wk_cond);
248   pthread_mutex_unlock(&iwkv->wk_mtx);
249   return 0;
250 }
251 
_iwkv_worker_dec_nolk(IWKV iwkv)252 static iwrc _iwkv_worker_dec_nolk(IWKV iwkv) {
253   if (!iwkv) {
254     return IW_ERROR_INVALID_STATE;
255   }
256   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
257   if (rci) {
258     // Last chanсe to be consistent
259     --iwkv->wk_count;
260     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
261   }
262   --iwkv->wk_count;
263   pthread_cond_broadcast(&iwkv->wk_cond);
264   pthread_mutex_unlock(&iwkv->wk_mtx);
265   return 0;
266 }
267 
_db_worker_dec_nolk(IWDB db)268 static iwrc _db_worker_dec_nolk(IWDB db) {
269   if (!db || !db->iwkv) { // do not use ENSURE_OPEN_DB here
270     return IW_ERROR_INVALID_STATE;
271   }
272   IWKV iwkv = db->iwkv;
273   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
274   if (rci) {
275     // Last chanсe to be consistent
276     --iwkv->wk_count;
277     --db->wk_count;
278     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
279   }
280   --iwkv->wk_count;
281   --db->wk_count;
282   pthread_cond_broadcast(&iwkv->wk_cond);
283   pthread_mutex_unlock(&iwkv->wk_mtx);
284   return 0;
285 }
286 
_wnw_iwkw_wl(IWKV iwkv)287 static WUR iwrc _wnw_iwkw_wl(IWKV iwkv) {
288   int rci = pthread_rwlock_wrlock(&iwkv->rwl);
289   if (rci) {
290     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
291   }
292   return 0;
293 }
294 
_wnw(IWKV iwkv,iwrc (* after)(IWKV iwkv))295 static WUR iwrc _wnw(IWKV iwkv, iwrc(*after)(IWKV iwkv)) {
296   iwrc rc = 0;
297   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
298   if (rci) {
299     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
300   }
301   iwkv->wk_pending_exclusive = true;
302   while (iwkv->wk_count > 0) {
303     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
304   }
305   if (after) {
306     rc = after(iwkv);
307   }
308   iwkv->wk_pending_exclusive = false;
309   pthread_cond_broadcast(&iwkv->wk_cond);
310   rci = pthread_mutex_unlock(&iwkv->wk_mtx);
311   if (rci) {
312     IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
313   }
314   return rc;
315 }
316 
_wnw_db(IWDB db,iwrc (* after)(IWDB db))317 static WUR iwrc _wnw_db(IWDB db, iwrc(*after)(IWDB db)) {
318   iwrc rc = 0;
319   IWKV iwkv = db->iwkv;
320   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
321   if (rci) {
322     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
323   }
324   db->wk_pending_exclusive = true;
325   while (db->wk_count > 0) {
326     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
327   }
328   if (after) {
329     rc = after(db);
330   }
331   db->wk_pending_exclusive = false;
332   pthread_cond_broadcast(&iwkv->wk_cond);
333   rci = pthread_mutex_unlock(&iwkv->wk_mtx);
334   if (rci) {
335     IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
336   }
337   return rc;
338 }
339 
340 //--------------------------  DB
341 
_db_at(IWKV iwkv,IWDB * dbp,off_t addr,uint8_t * mm)342 static WUR iwrc _db_at(IWKV iwkv, IWDB *dbp, off_t addr, uint8_t *mm) {
343   iwrc rc = 0;
344   uint8_t *rp, bv;
345   uint32_t lv;
346   int rci;
347   IWDB db = calloc(1, sizeof(struct _IWDB));
348   *dbp = 0;
349   if (!db) {
350     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
351   }
352   pthread_rwlockattr_t attr;
353   pthread_rwlockattr_init(&attr);
354 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
355   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
356 #endif
357   rci = pthread_rwlock_init(&db->rwl, &attr);
358   if (rci) {
359     free(db);
360     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
361   }
362   rci = pthread_spin_init(&db->cursors_slk, 0);
363   if (rci) {
364     pthread_rwlock_destroy(&db->rwl);
365     free(db);
366     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
367   }
368   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
369   db->flags = SBLK_DB;
370   db->addr = addr;
371   db->db = db;
372   db->iwkv = iwkv;
373   rp = mm + addr;
374   IW_READLV(rp, lv, lv);
375   if (lv != IWDB_MAGIC) {
376     rc = IWKV_ERROR_CORRUPTED;
377     iwlog_ecode_error3(rc);
378     goto finish;
379   }
380   IW_READBV(rp, bv, db->dbflg);
381   IW_READLV(rp, lv, db->id);
382   IW_READLV(rp, lv, db->next_db_addr);
383   db->next_db_addr = BLK2ADDR(db->next_db_addr); // blknum -> addr
384   rp = mm + addr + DOFF_C0_U4;
385   for (int i = 0; i < SLEVELS; ++i) {
386     IW_READLV(rp, lv, db->lcnt[i]);
387   }
388   if (iwkv->fmt_version >= 1) {
389     IW_READLV(rp, lv, db->meta_blk);
390     IW_READLV(rp, lv, db->meta_blkn);
391   }
392   db->open = true;
393   *dbp = db;
394 
395 finish:
396   if (rc) {
397     pthread_rwlock_destroy(&db->rwl);
398     free(db);
399   }
400   return rc;
401 }
402 
_db_save(IWDB db,bool newdb,uint8_t * mm)403 static WUR iwrc _db_save(IWDB db, bool newdb, uint8_t *mm) {
404   iwrc rc = 0;
405   uint32_t lv;
406   uint8_t *wp = mm + db->addr, bv;
407   uint8_t *sp = wp;
408   IWDLSNR *dlsnr = db->iwkv->dlsnr;
409   db->next_db_addr = db->next ? db->next->addr : 0;
410   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
411   IW_WRITELV(wp, lv, IWDB_MAGIC);
412   IW_WRITEBV(wp, bv, db->dbflg);
413   IW_WRITELV(wp, lv, db->id);
414   IW_WRITELV(wp, lv, ADDR2BLK(db->next_db_addr));
415   if (dlsnr) {
416     rc = dlsnr->onwrite(dlsnr, db->addr, sp, wp - sp, 0);
417     RCRET(rc);
418   }
419   if (db->iwkv->fmt_version >= 1) {
420     if (newdb) {
421       memset(wp, 0, 4 + SLEVELS * 4 * 2); // p0 + n[24] + c[24]
422       sp = wp;
423       wp += 4 + SLEVELS * 4 * 2; // set to zero
424     } else {
425       wp += 4 + SLEVELS * 4 * 2; // skip
426       sp = wp;
427     }
428     IW_WRITELV(wp, lv, db->meta_blk);
429     IW_WRITELV(wp, lv, db->meta_blkn);
430     if (dlsnr) {
431       rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
432     }
433   }
434   return rc;
435 }
436 
_db_load_chain(IWKV iwkv,off_t addr,uint8_t * mm)437 static WUR iwrc _db_load_chain(IWKV iwkv, off_t addr, uint8_t *mm) {
438   iwrc rc;
439   int rci;
440   IWDB db = 0, ndb;
441   if (!addr) return 0;
442   do {
443     rc = _db_at(iwkv, &ndb, addr, mm);
444     RCRET(rc);
445     if (db) {
446       db->next = ndb;
447       ndb->prev = db;
448     } else {
449       iwkv->first_db = ndb;
450     }
451     db = ndb;
452     addr = db->next_db_addr;
453     iwkv->last_db = db;
454     khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
455     if (rci != -1) {
456       kh_value(iwkv->dbs, k) = db;
457     } else {
458       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
459     }
460   } while (db->next_db_addr);
461   return rc;
462 }
463 
_db_release_lw(IWDB * dbp)464 static void _db_release_lw(IWDB *dbp) {
465   assert(dbp && *dbp);
466   IWDB db = *dbp;
467   _dbcache_destroy_lw(db);
468   pthread_rwlock_destroy(&db->rwl);
469   pthread_spin_destroy(&db->cursors_slk);
470   free(db);
471   *dbp = 0;
472 }
473 
474 typedef struct DISPOSE_DB_CTX {
475   IWKV iwkv;
476   IWDB db;
477   blkn_t sbn; // First `SBLK` block in DB
478 } DISPOSE_DB_CTX;
479 
_db_dispose_chain(DISPOSE_DB_CTX * dctx)480 static iwrc _db_dispose_chain(DISPOSE_DB_CTX *dctx) {
481   iwrc rc = 0;
482   uint8_t *mm, kvszpow;
483   IWFS_FSM *fsm = &dctx->iwkv->fsm;
484   blkn_t sbn = dctx->sbn, kvblkn;
485   off_t page = 0;
486 
487   while (sbn) {
488     off_t sba = BLK2ADDR(sbn);
489     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
490     RCBREAK(rc);
491     memcpy(&kvblkn, mm + sba + SOFF_KBLK_U4, 4);
492     kvblkn = IW_ITOHL(kvblkn);
493     memcpy(&sbn, mm + sba + SOFF_N0_U4, 4);
494     sbn = IW_ITOHL(sbn);
495     if (kvblkn) {
496       memcpy(&kvszpow, mm + BLK2ADDR(kvblkn) + KBLK_SZPOW_OFF, 1);
497     }
498     if (dctx->iwkv->fmt_version > 1) {
499       uint8_t bpos;
500       memcpy(&bpos, mm + sba + SOFF_BPOS_U1_V2, 1);
501       rc = fsm->release_mmap(fsm);
502       RCBREAK(rc);
503       if (bpos > 0 && bpos <= SBLK_PAGE_SBLK_NUM_V2) {
504         off_t npage = sba - (bpos - 1) * SBLK_SZ;
505         if (npage != page) {
506           if (page) {
507             if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
508               rc = fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2);
509             }
510             RCBREAK(rc);
511           }
512           page = npage;
513         }
514       }
515     } else {
516       rc = fsm->release_mmap(fsm);
517       RCBREAK(rc);
518       // Deallocate `SBLK`
519       rc = fsm->deallocate(fsm, sba, SBLK_SZ);
520       RCBREAK(rc);
521     }
522     // Deallocate `KVBLK`
523     if (kvblkn) {
524       rc = fsm->deallocate(fsm, BLK2ADDR(kvblkn), 1ULL << kvszpow);
525       RCBREAK(rc);
526     }
527   }
528   if (page) {
529     if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
530       IWRC(fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2), rc);
531     }
532   }
533   _db_release_lw(&dctx->db);
534   return rc;
535 }
536 
_db_destroy_lw(IWDB * dbp)537 static WUR iwrc _db_destroy_lw(IWDB *dbp) {
538   iwrc rc;
539   uint8_t *mm;
540   IWDB db = *dbp;
541   IWKV iwkv = db->iwkv;
542   IWDB prev = db->prev;
543   IWDB next = db->next;
544   IWFS_FSM *fsm = &iwkv->fsm;
545   uint32_t first_sblkn;
546 
547   khiter_t k = kh_get(DBS, iwkv->dbs, db->id);
548   if (k == kh_end(iwkv->dbs)) {
549     iwlog_ecode_error3(IW_ERROR_INVALID_STATE);
550     return IW_ERROR_INVALID_STATE;
551   }
552   kh_del(DBS, iwkv->dbs, k);
553 
554   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
555   RCRET(rc);
556   if (prev) {
557     prev->next = next;
558     rc = _db_save(prev, false, mm);
559     if (rc) {
560       fsm->release_mmap(fsm);
561       return rc;
562     }
563   }
564   if (next) {
565     next->prev = prev;
566     rc = _db_save(next, false, mm);
567     if (rc) {
568       fsm->release_mmap(fsm);
569       return rc;
570     }
571   }
572   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
573   memcpy(&first_sblkn, mm + db->addr + DOFF_N0_U4, 4);
574   first_sblkn = IW_ITOHL(first_sblkn);
575   fsm->release_mmap(fsm);
576 
577   if (iwkv->first_db && iwkv->first_db->addr == db->addr) {
578     uint64_t llv;
579     db->iwkv->first_db = next;
580     llv = next ? (uint64_t) next->addr : 0;
581     llv = IW_HTOILL(llv);
582     rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
583   }
584   if (iwkv->last_db && iwkv->last_db->addr == db->addr) {
585     iwkv->last_db = prev;
586   }
587   // Cleanup DB
588   off_t db_addr = db->addr;
589   blkn_t meta_blk = db->meta_blk;
590   blkn_t meta_blkn = db->meta_blkn;
591   db->open = false;
592 
593   DISPOSE_DB_CTX dctx = {
594     .sbn = first_sblkn,
595     .iwkv = iwkv,
596     .db = db
597   };
598   IWRC(_db_dispose_chain(&dctx), rc);
599   if (meta_blk && meta_blkn) {
600     IWRC(fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn)), rc);
601   }
602   IWRC(fsm->deallocate(fsm, db_addr, DB_SZ), rc);
603   return rc;
604 }
605 
_db_create_lw(IWKV iwkv,dbid_t dbid,iwdb_flags_t dbflg,IWDB * odb)606 static WUR iwrc _db_create_lw(IWKV iwkv, dbid_t dbid, iwdb_flags_t dbflg, IWDB *odb) {
607   iwrc rc;
608   int rci;
609   uint8_t *mm = 0;
610   off_t baddr = 0, blen;
611   IWFS_FSM *fsm = &iwkv->fsm;
612   *odb = 0;
613   IWDB db = calloc(1, sizeof(struct _IWDB));
614   if (!db) {
615     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
616   }
617   pthread_rwlockattr_t attr;
618   pthread_rwlockattr_init(&attr);
619 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
620   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
621 #endif
622   rci = pthread_rwlock_init(&db->rwl, &attr);
623   if (rci) {
624     free(db);
625     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
626   }
627   rci = pthread_spin_init(&db->cursors_slk, 0);
628   if (rci) {
629     pthread_rwlock_destroy(&db->rwl);
630     free(db);
631     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
632   }
633   rc = fsm->allocate(fsm, DB_SZ, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
634   if (rc) {
635     _db_release_lw(&db);
636     return rc;
637   }
638   db->iwkv = iwkv;
639   db->dbflg = dbflg;
640   db->addr = baddr;
641   db->id = dbid;
642   db->prev = iwkv->last_db;
643   if (!iwkv->first_db) {
644     uint64_t llv;
645     iwkv->first_db = db;
646     llv = (uint64_t) db->addr;
647     llv = IW_HTOILL(llv);
648     rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
649   } else if (iwkv->last_db) {
650     iwkv->last_db->next = db;
651   }
652   iwkv->last_db = db;
653   khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
654   if (rci != -1) {
655     kh_value(iwkv->dbs, k) = db;
656   } else {
657     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
658     goto finish;
659   }
660   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
661   RCGO(rc, finish);
662   rc = _db_save(db, true, mm);
663   RCGO(rc, finish);
664   if (db->prev) {
665     rc = _db_save(db->prev, false, mm);
666     RCGO(rc, finish);
667   }
668   db->open = true;
669   *odb = db;
670 
671 finish:
672   if (mm) {
673     fsm->release_mmap(fsm);
674   }
675   if (rc) {
676     fsm->deallocate(fsm, baddr, blen);
677     _db_release_lw(&db);
678   }
679   return rc;
680 }
681 
682 //--------------------------  KVBLK
683 
_kvblk_create(IWLCTX * lx,off_t baddr,uint8_t kvbpow,KVBLK ** oblk)684 IW_INLINE void _kvblk_create(IWLCTX *lx, off_t baddr, uint8_t kvbpow, KVBLK **oblk) {
685   KVBLK *kblk = &lx->kaa[lx->kaan];
686   kblk->db = lx->db;
687   kblk->addr = baddr;
688   kblk->maxoff = 0;
689   kblk->idxsz = 2 * IW_VNUMSIZE(0) * KVBLK_IDXNUM;
690   kblk->zidx = 0;
691   kblk->szpow = kvbpow;
692   kblk->flags = KVBLK_DURTY;
693   memset(kblk->pidx, 0, sizeof(kblk->pidx));
694   *oblk = kblk;
695   AAPOS_INC(lx->kaan);
696 }
697 
_kvblk_key_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)698 IW_INLINE WUR iwrc _kvblk_key_peek(const KVBLK *kb,
699                                    uint8_t idx, const uint8_t *mm, uint8_t **obuf,
700                                    uint32_t *olen) {
701   if (kb->pidx[idx].len) {
702     uint32_t klen, step;
703     const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
704     IW_READVNUMBUF(rp, klen, step);
705     if (!klen) {
706       *obuf = 0;
707       *olen = 0;
708       iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
709       return IWKV_ERROR_CORRUPTED;
710     }
711     rp += step;
712     *obuf = (uint8_t *) rp;
713     *olen = klen;
714   } else {
715     *obuf = 0;
716     *olen = 0;
717   }
718   return 0;
719 }
720 
_kvblk_value_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)721 IW_INLINE void _kvblk_value_peek(const KVBLK *kb, uint8_t idx, const uint8_t *mm, uint8_t **obuf, uint32_t *olen) {
722   assert(idx < KVBLK_IDXNUM);
723   if (kb->pidx[idx].len) {
724     uint32_t klen, step;
725     const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
726     IW_READVNUMBUF(rp, klen, step);
727     rp += step;
728     rp += klen;
729     *obuf = (uint8_t *) rp;
730     *olen = kb->pidx[idx].len - klen - step;
731   } else {
732     *obuf = 0;
733     *olen = 0;
734   }
735 }
736 
_kvblk_key_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key)737 static WUR iwrc _kvblk_key_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key) {
738   assert(mm && idx < KVBLK_IDXNUM);
739   int32_t klen;
740   int step;
741   KVP *kvp = &kb->pidx[idx];
742   key->compound = 0;
743   if (!kvp->len) {
744     key->data = 0;
745     key->size = 0;
746     return 0;
747   }
748   // [klen:vn,key,value]
749   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
750   IW_READVNUMBUF(rp, klen, step);
751   rp += step;
752   if (klen < 1 || klen > kvp->len || klen > kvp->off) {
753     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
754     return IWKV_ERROR_CORRUPTED;
755   }
756   key->size = (size_t) klen;
757   if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
758     // Needed to provide enough buffer in _unpack_effective_key()
759     key->data = malloc(MAX(key->size, sizeof(int64_t)));
760   } else {
761     key->data = malloc(key->size);
762   }
763   if (!key->data) {
764     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
765   }
766   memcpy(key->data, rp, key->size);
767   return 0;
768 }
769 
_kvblk_value_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * val)770 static WUR iwrc _kvblk_value_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *val) {
771   assert(mm && idx < KVBLK_IDXNUM);
772   int32_t klen;
773   int step;
774   KVP *kvp = &kb->pidx[idx];
775   val->compound = 0;
776   if (!kvp->len) {
777     val->data = 0;
778     val->size = 0;
779     return 0;
780   }
781   // [klen:vn,key,value]
782   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
783   IW_READVNUMBUF(rp, klen, step);
784   rp += step;
785   if (klen < 1 || klen > kvp->len || klen > kvp->off) {
786     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
787     return IWKV_ERROR_CORRUPTED;
788   }
789   rp += klen;
790   if (kvp->len > klen + step) {
791     val->size = kvp->len - klen - step;
792     val->data = malloc(val->size);
793     if (!val->data) {
794       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
795       val->size = 0;
796       return rc;
797     }
798     memcpy(val->data, rp, val->size);
799   } else {
800     val->data = 0;
801     val->size = 0;
802   }
803   return 0;
804 }
805 
_kvblk_kv_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key,IWKV_val * val)806 static WUR iwrc _kvblk_kv_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key, IWKV_val *val) {
807   assert(mm && idx < KVBLK_IDXNUM);
808   int32_t klen;
809   int step;
810   KVP *kvp = &kb->pidx[idx];
811   key->compound = 0;
812   val->compound = 0;
813   if (!kvp->len) {
814     key->data = 0;
815     key->size = 0;
816     val->data = 0;
817     val->size = 0;
818     return 0;
819   }
820   // [klen:vn,key,value]
821   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
822   IW_READVNUMBUF(rp, klen, step);
823   rp += step;
824   if (klen < 1 || klen > kvp->len || klen > kvp->off) {
825     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
826     return IWKV_ERROR_CORRUPTED;
827   }
828   key->size = (size_t) klen;
829   if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
830     // Needed to provide enough buffer in _unpack_effective_key()
831     key->data = malloc(MAX(key->size, sizeof(int64_t)));
832   } else {
833     key->data = malloc(key->size);
834   }
835   if (!key->data) {
836     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
837   }
838   memcpy(key->data, rp, key->size);
839   rp += klen;
840   if (kvp->len > klen + step) {
841     val->size = kvp->len - klen - step;
842     val->data = malloc(val->size);
843     if (!val->data) {
844       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
845       free(key->data);
846       key->data = 0;
847       key->size = 0;
848       val->size = 0;
849       return rc;
850     }
851     memcpy(val->data, rp, val->size);
852   } else {
853     val->data = 0;
854     val->size = 0;
855   }
856   return 0;
857 }
858 
_kvblk_at_mm(IWLCTX * lx,off_t addr,uint8_t * mm,KVBLK * kbp,KVBLK ** blkp)859 static WUR iwrc _kvblk_at_mm(IWLCTX *lx, off_t addr, uint8_t *mm, KVBLK *kbp, KVBLK **blkp) {
860   uint8_t *rp;
861   uint16_t sv;
862   int step;
863   iwrc rc = 0;
864   KVBLK *kb = kbp ? kbp : &lx->kaa[lx->kaan];
865   kb->db = lx->db;
866   kb->addr = addr;
867   kb->maxoff = 0;
868   kb->idxsz = 0;
869   kb->zidx = -1;
870   kb->szpow = 0;
871   kb->flags = KVBLK_DEFAULT;
872   memset(kb->pidx, 0, sizeof(kb->pidx));
873 
874   *blkp = 0;
875   rp = mm + addr;
876   memcpy(&kb->szpow, rp, 1);
877   rp += 1;
878   IW_READSV(rp, sv, kb->idxsz);
879   if (IW_UNLIKELY(kb->idxsz > KVBLK_MAX_IDX_SZ)) {
880     rc = IWKV_ERROR_CORRUPTED;
881     iwlog_ecode_error3(rc);
882     goto finish;
883   }
884   for (uint8_t i = 0; i < KVBLK_IDXNUM; ++i) {
885     IW_READVNUMBUF64(rp, kb->pidx[i].off, step);
886     rp += step;
887     IW_READVNUMBUF(rp, kb->pidx[i].len, step);
888     rp += step;
889     if (kb->pidx[i].len) {
890       if (IW_UNLIKELY(!kb->pidx[i].off)) {
891         rc = IWKV_ERROR_CORRUPTED;
892         iwlog_ecode_error3(rc);
893         goto finish;
894       }
895       if (kb->pidx[i].off > kb->maxoff) {
896         kb->maxoff = kb->pidx[i].off;
897       }
898     } else if (kb->zidx < 0) {
899       kb->zidx = i;
900     }
901     kb->pidx[i].ridx = i;
902   }
903   *blkp = kb;
904   assert(rp - (mm + addr) <= (1ULL << kb->szpow));
905   if (!kbp) {
906     AAPOS_INC(lx->kaan);
907   }
908 
909 finish:
910   return rc;
911 }
912 
_kvblk_compacted_offset(KVBLK * kb)913 IW_INLINE off_t _kvblk_compacted_offset(KVBLK *kb) {
914   off_t coff = 0;
915   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
916     coff += kb->pidx[i].len;
917   }
918   return coff;
919 }
920 
_kvblk_compacted_dsize(KVBLK * kb)921 IW_INLINE off_t _kvblk_compacted_dsize(KVBLK *kb) {
922   off_t coff = KVBLK_HDRSZ;
923   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
924     coff += kb->pidx[i].len;
925     coff += IW_VNUMSIZE32(kb->pidx[i].len);
926     coff += IW_VNUMSIZE(kb->pidx[i].off);
927   }
928   return coff;
929 }
930 
_kvblk_sync_mm(KVBLK * kb,uint8_t * mm)931 static WUR iwrc _kvblk_sync_mm(KVBLK *kb, uint8_t *mm) {
932   iwrc rc = 0;
933   if (!(kb->flags & KVBLK_DURTY)) {
934     return rc;
935   }
936   uint16_t sp;
937   uint8_t *szp;
938   uint8_t *wp = mm + kb->addr;
939   uint8_t *sptr = wp;
940   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
941   memcpy(wp, &kb->szpow, 1);
942   wp += 1;
943   szp = wp;
944   wp += sizeof(uint16_t);
945   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
946     KVP *kvp = &kb->pidx[i];
947     IW_SETVNUMBUF64(sp, wp, kvp->off);
948     wp += sp;
949     IW_SETVNUMBUF(sp, wp, kvp->len);
950     wp += sp;
951   }
952   sp = wp - szp - sizeof(uint16_t);
953   kb->idxsz = sp;
954   assert(kb->idxsz <= KVBLK_MAX_IDX_SZ);
955   sp = IW_HTOIS(sp);
956   memcpy(szp, &sp, sizeof(uint16_t));
957   assert(wp - (mm + kb->addr) <= (1ULL << kb->szpow));
958   if (dlsnr) {
959     rc = dlsnr->onwrite(dlsnr, kb->addr, sptr, wp - sptr, 0);
960   }
961   kb->flags &= ~KVBLK_DURTY;
962   return rc;
963 }
964 
965 #define _kvblk_sort_kv_lt(v1, v2) \
966   (((v1).off > 0 ? (v1).off : -1UL) < ((v2).off > 0 ? (v2).off : -1UL))
967 
968 // -V:KSORT_INIT:522, 756, 769
KSORT_INIT(kvblk,KVP,_kvblk_sort_kv_lt)969 KSORT_INIT(kvblk, KVP, _kvblk_sort_kv_lt)
970 
971 static WUR iwrc _kvblk_compact_mm(KVBLK *kb, uint8_t *mm) {
972   uint8_t i;
973   off_t coff = _kvblk_compacted_offset(kb);
974   if (coff == kb->maxoff) { // compacted
975     return 0;
976   }
977   KVP tidx[KVBLK_IDXNUM];
978   KVP tidx_tmp[KVBLK_IDXNUM];
979   iwrc rc = 0;
980   uint16_t idxsiz = 0;
981   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
982   off_t blkend = kb->addr + (1ULL << kb->szpow);
983   uint8_t *wp = mm + blkend;
984   memcpy(tidx, kb->pidx, sizeof(tidx));
985   ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp);
986 
987   coff = 0;
988   for (i = 0; i < KVBLK_IDXNUM && tidx[i].off; ++i) {
989 #ifndef NDEBUG
990     if (i > 0) {
991       assert(tidx[i - 1].off < tidx[i].off);
992     }
993 #endif
994     KVP *kvp = &kb->pidx[tidx[i].ridx];
995     off_t noff = coff + kvp->len;
996     if (kvp->off > noff) {
997       assert(noff <= (1ULL << kb->szpow) && kvp->len <= noff);
998       if (dlsnr) {
999         rc = dlsnr->onwrite(dlsnr, blkend - noff, wp - kvp->off, kvp->len, 0);
1000       }
1001       memmove(wp - noff, wp - kvp->off, kvp->len);
1002       kvp->off = noff;
1003     }
1004     coff += kvp->len;
1005     idxsiz += IW_VNUMSIZE(kvp->off);
1006     idxsiz += IW_VNUMSIZE32(kvp->len);
1007   }
1008   idxsiz += (KVBLK_IDXNUM - i) * 2;
1009   for (i = 0; i < KVBLK_IDXNUM; ++i) {
1010     if (!kb->pidx[i].len) {
1011       kb->zidx = i;
1012       break;
1013     }
1014   }
1015   assert(idxsiz <= kb->idxsz);
1016   kb->idxsz = idxsiz;
1017   kb->maxoff = coff;
1018   if (i == KVBLK_IDXNUM) {
1019     kb->zidx = -1;
1020   }
1021   kb->flags |= KVBLK_DURTY;
1022   assert(_kvblk_compacted_offset(kb) == kb->maxoff);
1023   return rc;
1024 }
1025 
_kvblk_maxkvoff(KVBLK * kb)1026 IW_INLINE off_t _kvblk_maxkvoff(KVBLK *kb) {
1027   off_t off = 0;
1028   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1029     if (kb->pidx[i].off > off) {
1030       off = kb->pidx[i].off;
1031     }
1032   }
1033   return off;
1034 }
1035 
_kvblk_rmkv(KVBLK * kb,uint8_t idx,kvblk_rmkv_opts_t opts)1036 static WUR iwrc _kvblk_rmkv(KVBLK *kb, uint8_t idx, kvblk_rmkv_opts_t opts) {
1037   iwrc rc = 0;
1038   uint8_t *mm = 0;
1039   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1040   IWFS_FSM *fsm = &kb->db->iwkv->fsm;
1041   if (kb->pidx[idx].off >= kb->maxoff) {
1042     kb->maxoff = 0;
1043     for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1044       if (i != idx && kb->pidx[i].off > kb->maxoff) {
1045         kb->maxoff = kb->pidx[i].off;
1046       }
1047     }
1048   }
1049   kb->pidx[idx].len = 0;
1050   kb->pidx[idx].off = 0;
1051   kb->flags |= KVBLK_DURTY;
1052   if (kb->zidx < 0 || idx < kb->zidx) {
1053     kb->zidx = idx;
1054   }
1055   if (!(RMKV_NO_RESIZE & opts) && kb->szpow > KVBLK_INISZPOW) {
1056     off_t nlen = 1ULL << kb->szpow;
1057     off_t dsz = _kvblk_compacted_dsize(kb);
1058     if (nlen >= 2 * dsz) {
1059       uint8_t npow = kb->szpow - 1;
1060       while (npow > KVBLK_INISZPOW && (1ULL << (npow - 1)) >= dsz) {
1061         --npow;
1062       }
1063       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1064       RCGO(rc, finish);
1065 
1066       rc = _kvblk_compact_mm(kb, mm);
1067       RCGO(rc, finish);
1068 
1069       off_t maxoff = _kvblk_maxkvoff(kb);
1070       if (dlsnr) {
1071         rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << npow) - maxoff, mm + kb->addr + nlen - maxoff, maxoff, 0);
1072         RCGO(rc, finish);
1073       }
1074       memmove(mm + kb->addr + (1ULL << npow) - maxoff,
1075               mm + kb->addr + nlen - maxoff,
1076               (size_t) maxoff);
1077 
1078       fsm->release_mmap(fsm);
1079       mm = 0;
1080       rc = fsm->reallocate(fsm, (1ULL << npow), &kb->addr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1081       RCGO(rc, finish);
1082       kb->szpow = npow;
1083       assert(nlen == (1ULL << kb->szpow));
1084       opts |= RMKV_SYNC;
1085     }
1086   }
1087   if (RMKV_SYNC & opts) {
1088     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1089     RCGO(rc, finish);
1090     IWRC(_kvblk_sync_mm(kb, mm), rc);
1091   }
1092 
1093 finish:
1094   if (mm) {
1095     fsm->release_mmap(fsm);
1096   }
1097   return rc;
1098 }
1099 
_kvblk_addkv(KVBLK * kb,const IWKV_val * key,const IWKV_val * val,uint8_t * oidx,bool raw_key)1100 static WUR iwrc _kvblk_addkv(KVBLK *kb,
1101                              const IWKV_val *key,
1102                              const IWKV_val *val,
1103                              uint8_t *oidx,
1104                              bool raw_key) {
1105   *oidx = 0;
1106 
1107   iwrc rc = 0;
1108   off_t msz;    // max available free space
1109   off_t rsz;    // required size to add new key/value pair
1110   off_t noff;   // offset of new kvpair from end of block
1111   uint8_t *mm, *wp, *sptr;
1112   size_t i, sp;
1113   KVP *kvp;
1114   IWDB db = kb->db;
1115   bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1116   IWFS_FSM *fsm = &db->iwkv->fsm;
1117   bool compacted = false;
1118   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1119   IWKV_val *uval = (IWKV_val *) val;
1120 
1121   size_t ksize = key->size;
1122   if (compound) {
1123     ksize += IW_VNUMSIZE(key->compound);
1124   }
1125   off_t psz = IW_VNUMSIZE(ksize) + ksize;
1126 
1127   if (kb->zidx < 0) {
1128     return _IWKV_RC_KVBLOCK_FULL;
1129   }
1130   psz += uval->size;
1131   if (psz > IWKV_MAX_KVSZ) {
1132     return IWKV_ERROR_MAXKVSZ;
1133   }
1134 
1135 start:
1136   // [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] // KVBLK
1137   msz = (1ULL << kb->szpow) - (KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1138   assert(msz >= 0);
1139   noff = kb->maxoff + psz;
1140   rsz = psz + IW_VNUMSIZE(noff) + IW_VNUMSIZE(psz);
1141 
1142   if (msz < rsz) { // not enough space
1143     if (!compacted) {
1144       compacted = true;
1145       if (_kvblk_compacted_offset(kb) != kb->maxoff) {
1146         rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1147         RCGO(rc, finish);
1148         rc = _kvblk_compact_mm(kb, mm);
1149         RCGO(rc, finish);
1150         fsm->release_mmap(fsm);
1151         goto start;
1152       }
1153     }
1154     // resize the whole block
1155     off_t nlen = 1ULL << kb->szpow;
1156     off_t nsz = rsz - msz + nlen;
1157     off_t naddr = kb->addr;
1158     off_t olen = nlen;
1159 
1160     uint8_t npow = kb->szpow;
1161     while ((1ULL << ++npow) < nsz);
1162 
1163     rc = fsm->allocate(fsm, (1ULL << npow), &naddr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1164     RCGO(rc, finish);
1165     assert(nlen == (1ULL << npow));
1166     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1167     RCGO(rc, finish);
1168     if (dlsnr) {
1169       rc = dlsnr->onwrite(dlsnr, naddr, mm + kb->addr, KVBLK_HDRSZ, 0);
1170       RCGO(rc, finish);
1171       memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1172       rc = dlsnr->onwrite(dlsnr, naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, kb->maxoff, 0);
1173       RCGO(rc, finish);
1174       memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1175     } else {
1176       memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1177       memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1178     }
1179     fsm->release_mmap(fsm);
1180     rc = fsm->deallocate(fsm, kb->addr, olen);
1181     RCGO(rc, finish);
1182 
1183     kb->addr = naddr;
1184     kb->szpow = npow;
1185   }
1186   *oidx = (uint8_t) kb->zidx;
1187   kvp = &kb->pidx[kb->zidx];
1188   kvp->len = (uint32_t) psz;
1189   kvp->off = noff;
1190   kvp->ridx = (uint8_t) kb->zidx;
1191   kb->maxoff = noff;
1192   kb->flags |= KVBLK_DURTY;
1193   for (i = 0; i < KVBLK_IDXNUM; ++i) {
1194     if (!kb->pidx[i].len && i != kb->zidx) {
1195       kb->zidx = i;
1196       break;
1197     }
1198   }
1199   if (i >= KVBLK_IDXNUM) {
1200     kb->zidx = -1;
1201   }
1202   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1203   RCGO(rc, finish);
1204   assert((1ULL << kb->szpow) >= KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1205   assert(kvp->off < (1ULL << kb->szpow) && kvp->len <= kvp->off);
1206   wp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
1207   sptr = wp;
1208   // [klen:vn,key,value]
1209   IW_SETVNUMBUF(sp, wp, ksize);
1210   wp += sp;
1211   if (compound) {
1212     IW_SETVNUMBUF64(sp, wp, key->compound);
1213     wp += sp;
1214   }
1215   memcpy(wp, key->data, key->size);
1216   wp += key->size;
1217   memcpy(wp, uval->data, uval->size);
1218   wp += uval->size;
1219 #ifndef NDEBUG
1220   assert(wp - sptr == kvp->len);
1221 #endif
1222   if (dlsnr) {
1223     rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << kb->szpow) - kvp->off, sptr, wp - sptr, 0);
1224   }
1225   fsm->release_mmap(fsm);
1226 
1227 finish:
1228   return rc;
1229 }
1230 
_kvblk_updatev(KVBLK * kb,uint8_t * idxp,const IWKV_val * key,const IWKV_val * val)1231 static WUR iwrc _kvblk_updatev(KVBLK *kb,
1232                                uint8_t *idxp,
1233                                const IWKV_val *key, /* Nullable */
1234                                const IWKV_val *val) {
1235   assert(*idxp < KVBLK_IDXNUM);
1236   int32_t i;
1237   uint32_t len, nlen, sz;
1238   uint8_t pidx = *idxp, *mm = 0, *wp, *sp;
1239   IWDB db = kb->db;
1240   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1241   IWKV_val *uval = (IWKV_val *) val;
1242   IWKV_val *ukey = (IWKV_val *) key;
1243   IWKV_val skey; // stack allocated key/val
1244   KVP *kvp = &kb->pidx[pidx];
1245   size_t kbsz = 1ULL << kb->szpow; // kvblk size
1246   off_t freesz = kbsz - KVBLK_HDRSZ - kb->idxsz - kb->maxoff; // free space available
1247   IWFS_FSM *fsm = &db->iwkv->fsm;
1248 
1249   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1250   RCRET(rc);
1251   assert(freesz >= 0);
1252 
1253   wp = mm + kb->addr + kbsz - kvp->off;
1254   sp = wp;
1255   IW_READVNUMBUF(wp, len, sz);
1256   wp += sz;
1257   if (ukey && len != ukey->size) {
1258     rc = IWKV_ERROR_CORRUPTED;
1259     iwlog_ecode_error3(rc);
1260     goto finish;
1261   }
1262   wp += len;
1263   off_t rsize = sz + len + uval->size; // required size
1264   if (rsize <= kvp->len) {
1265     memcpy(wp, uval->data, uval->size);
1266     if (dlsnr) {
1267       rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1268       RCGO(rc, finish);
1269     }
1270     wp += uval->size;
1271     if ((wp - sp) != kvp->len) {
1272       kvp->len = wp - sp;
1273       kb->flags |= KVBLK_DURTY;
1274     }
1275   } else {
1276     KVP tidx[KVBLK_IDXNUM];
1277     KVP tidx_tmp[KVBLK_IDXNUM];
1278     off_t koff = kb->pidx[pidx].off;
1279     memcpy(tidx, kb->pidx, KVBLK_IDXNUM * sizeof(kb->pidx[0]));
1280     ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp);
1281     kb->flags |= KVBLK_DURTY;
1282     if (!ukey) { // we need a key
1283       ukey = &skey;
1284       rc = _kvblk_key_get(kb, mm, pidx, ukey);
1285       RCGO(rc, finish);
1286     }
1287     for (i = 0; i < KVBLK_IDXNUM; ++i) {
1288       if (tidx[i].off == koff) {
1289         if (koff - (i > 0 ? tidx[i - 1].off : 0) >= rsize) {
1290           nlen = wp + uval->size - sp;
1291           if (!(nlen > kvp->len && freesz - IW_VNUMSIZE32(nlen) + IW_VNUMSIZE32(kvp->len) < 0)) { // enough space?
1292             memcpy(wp, uval->data, uval->size);
1293             if (dlsnr) {
1294               rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1295               RCGO(rc, finish);
1296             }
1297             wp += uval->size;
1298             kvp->len = nlen;
1299             break;;
1300           }
1301         }
1302         mm = 0;
1303         fsm->release_mmap(fsm);
1304         rc = _kvblk_rmkv(kb, pidx, RMKV_NO_RESIZE);
1305         RCGO(rc, finish);
1306         rc = _kvblk_addkv(kb, ukey, uval, idxp, false);
1307         break;
1308       }
1309     }
1310   }
1311 
1312 finish:
1313   if (ukey != key) {
1314     _kv_val_dispose(ukey);
1315   }
1316   if (mm) {
1317     IWRC(fsm->release_mmap(fsm), rc);
1318   }
1319   return rc;
1320 }
1321 
1322 //--------------------------  SBLK
1323 
_sblk_release(IWLCTX * lx,SBLK ** sblkp)1324 IW_INLINE void _sblk_release(IWLCTX *lx, SBLK **sblkp) {
1325   assert(sblkp && *sblkp);
1326   SBLK *sblk = *sblkp;
1327   sblk->flags &= ~SBLK_CACHE_FLAGS; // clear cache flags
1328   sblk->flags &= ~SBLK_DURTY; // clear dirty flag
1329   sblk->kvblk = 0;
1330   *sblkp = 0;
1331 }
1332 
_sblk_loadkvblk_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1333 IW_INLINE WUR iwrc _sblk_loadkvblk_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1334   if (!sblk->kvblk && sblk->kvblkn) {
1335     return _kvblk_at_mm(lx, BLK2ADDR(sblk->kvblkn), mm, 0, &sblk->kvblk);
1336   } else {
1337     return 0;
1338   }
1339 }
1340 
_sblk_is_only_one_on_page_v2(IWLCTX * lx,uint8_t * mm,SBLK * sblk,off_t * page_addr)1341 static bool _sblk_is_only_one_on_page_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *page_addr) {
1342   *page_addr = 0;
1343   if (sblk->bpos > 0 && sblk->bpos <= SBLK_PAGE_SBLK_NUM_V2) {
1344     off_t addr = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
1345     *page_addr = addr;
1346     for (int i = 0; i < SBLK_PAGE_SBLK_NUM_V2; ++i) {
1347       if (i != sblk->bpos - 1) {
1348         uint8_t bv;
1349         memcpy(&bv, mm + addr + i * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1350         if (bv) {
1351           return false;
1352         }
1353       }
1354     }
1355   } else {
1356     return false; // be safe
1357   }
1358   return true;
1359 }
1360 
_sblk_destroy(IWLCTX * lx,SBLK ** sblkp)1361 IW_INLINE WUR iwrc _sblk_destroy(IWLCTX *lx, SBLK **sblkp) {
1362   assert(sblkp && *sblkp && (*sblkp)->addr);
1363   iwrc rc = 0;
1364   SBLK *sblk = *sblkp;
1365   lx->destroy_addr = sblk->addr;
1366 
1367   if (!(sblk->flags & SBLK_DB)) {
1368     uint8_t kvb_szpow, *mm;
1369     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1370     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1371     off_t kvb_addr = BLK2ADDR(sblk->kvblkn);
1372     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1373     RCRET(rc);
1374 
1375     if (!sblk->kvblk) {
1376       // Read KVBLK size as power of two
1377       memcpy(&kvb_szpow, mm + kvb_addr + KBLK_SZPOW_OFF, 1);
1378     } else {
1379       kvb_szpow = sblk->kvblk->szpow;
1380     }
1381     if (lx->db->lcnt[sblk->lvl]) {
1382       lx->db->lcnt[sblk->lvl]--;
1383       lx->db->flags |= SBLK_DURTY;
1384     }
1385     _dbcache_remove_lw(lx, sblk);
1386     if (lx->db->iwkv->fmt_version > 1) {
1387       off_t paddr;
1388       if (_sblk_is_only_one_on_page_v2(lx, mm, sblk, &paddr)) {
1389         fsm->release_mmap(fsm);
1390         // Deallocate whole page
1391         rc = fsm->deallocate(fsm, paddr, SBLK_PAGE_SZ_V2);
1392       } else {
1393         memset(mm + sblk->addr + SOFF_BPOS_U1_V2, 0, 1);
1394         fsm->release_mmap(fsm);
1395         if (dlsnr) {
1396           dlsnr->onset(dlsnr, sblk->addr + SOFF_BPOS_U1_V2, 0, 1, 0);
1397         }
1398       }
1399     } else {
1400       fsm->release_mmap(fsm);
1401       rc = fsm->deallocate(fsm, sblk->addr, SBLK_SZ);
1402     }
1403     IWRC(fsm->deallocate(fsm, kvb_addr, 1ULL << kvb_szpow), rc);
1404   }
1405   _sblk_release(lx, sblkp);
1406   return rc;
1407 }
1408 
_sblk_genlevel(IWDB db)1409 IW_INLINE uint8_t _sblk_genlevel(IWDB db) {
1410   uint8_t lvl;
1411 #ifdef IW_TESTS
1412   if (iwkv_next_level >= 0) {
1413     lvl = (uint8_t) iwkv_next_level;
1414     iwkv_next_level = -1;
1415     assert(lvl < SLEVELS);
1416     return lvl;
1417   }
1418 #endif
1419   uint32_t r = iwu_rand_u32();
1420   for (lvl = 0; lvl < SLEVELS && !(r & 1); ++lvl) r >>= 1;
1421   uint8_t ret = IW_UNLIKELY(lvl >= SLEVELS) ? SLEVELS - 1 : lvl;
1422   while (ret > 0 && db->lcnt[ret - 1] == 0) {
1423     --ret;
1424   }
1425   return ret;
1426 }
1427 
_sblk_create_v1(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,off_t baddr,uint8_t bpos,SBLK ** oblk)1428 static WUR iwrc _sblk_create_v1(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, off_t baddr, uint8_t bpos, SBLK **oblk) {
1429   iwrc rc;
1430   SBLK *sblk;
1431   KVBLK *kvblk;
1432   off_t blen;
1433   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1434   if (kvbpow < KVBLK_INISZPOW) {
1435     kvbpow = KVBLK_INISZPOW;
1436   }
1437   *oblk = 0;
1438   if (!bpos) {
1439     rc = fsm->allocate(fsm, SBLK_SZ + (1ULL << kvbpow), &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1440     RCRET(rc);
1441     assert(blen - SBLK_SZ == (1ULL << kvbpow));
1442     _kvblk_create(lx, baddr + SBLK_SZ, kvbpow, &kvblk);
1443   } else {
1444     // Allocate kvblk as separate chunk
1445     off_t kblkaddr = 0;
1446     rc = fsm->allocate(fsm, (1ULL << kvbpow), &kblkaddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1447     assert(blen == (1ULL << kvbpow));
1448     _kvblk_create(lx, kblkaddr, kvbpow, &kvblk);
1449   }
1450   sblk = &lx->saa[lx->saan];
1451   sblk->db = lx->db;
1452   sblk->db->lcnt[nlevel]++;
1453   sblk->db->flags |= SBLK_DURTY;
1454   sblk->addr = baddr;
1455   sblk->flags = (SBLK_DURTY | SBLK_CACHE_PUT);
1456   sblk->lvl = nlevel;
1457   sblk->p0 = 0;
1458   memset(sblk->n, 0, sizeof(sblk->n));
1459   sblk->kvblk = kvblk;
1460   sblk->kvblkn = ADDR2BLK(kvblk->addr);
1461   sblk->lkl = 0;
1462   sblk->pnum = 0;
1463   sblk->bpos = bpos;
1464   memset(sblk->pi, 0, sizeof(sblk->pi));
1465   *oblk = sblk;
1466   AAPOS_INC(lx->saan);
1467   return 0;
1468 }
1469 
_sblk_find_free_page_slot_v2(IWLCTX * lx,uint8_t * mm,SBLK * sblk,off_t * obaddr,uint8_t * oslot)1470 static void _sblk_find_free_page_slot_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *obaddr, uint8_t *oslot) {
1471   if (sblk->bpos < 1 || sblk->bpos > SBLK_PAGE_SBLK_NUM_V2) {
1472     *obaddr = 0;
1473     *oslot = 0;
1474     return;
1475   }
1476   off_t paddr = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
1477   for (int i = sblk->bpos + 1; i <= SBLK_PAGE_SBLK_NUM_V2; ++i) {
1478     uint8_t slot;
1479     memcpy(&slot, mm + paddr + (i - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1480     if (!slot) {
1481       *obaddr = paddr + (i - 1) * SBLK_SZ;
1482       *oslot = i;
1483       return;
1484     }
1485   }
1486   for (int i = sblk->bpos - 1; i > 0; --i) {
1487     uint8_t slot;
1488     memcpy(&slot, mm + paddr + (i - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1489     if (!slot) {
1490       *obaddr = paddr + (i - 1) * SBLK_SZ;
1491       *oslot = i;
1492       return;
1493     }
1494   }
1495   *obaddr = 0;
1496   *oslot = 0;
1497 }
1498 
1499 /// Create
_sblk_create_v2(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1500 static WUR iwrc _sblk_create_v2(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1501   off_t baddr = 0;
1502   uint8_t bpos = 0, *mm;
1503   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1504   SBLK *_lower = lower;
1505   SBLK *_upper = upper;
1506 
1507   for (int i = SLEVELS - 1; i >= 0; --i) {
1508     if (lx->pupper[i] && lx->pupper[i]->lvl >= nlevel) {
1509       _upper = lx->pupper[i];
1510     }
1511     if (lx->plower[i] && lx->plower[i]->lvl >= nlevel) {
1512       _lower = lx->plower[i];
1513     }
1514   }
1515 
1516   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1517   RCRET(rc);
1518   _sblk_find_free_page_slot_v2(lx, mm, _lower, &baddr, &bpos);
1519   if (!baddr && _upper && _upper->addr != _lower->addr) {
1520     _sblk_find_free_page_slot_v2(lx, mm, _upper, &baddr, &bpos);
1521   }
1522   if (!baddr) {
1523     if (_lower->addr != lower->addr) {
1524       _sblk_find_free_page_slot_v2(lx, mm, lower, &baddr, &bpos);
1525     }
1526     if (!baddr && upper && _upper && _upper->addr != upper->addr) {
1527       _sblk_find_free_page_slot_v2(lx, mm, upper, &baddr, &bpos);
1528     }
1529   }
1530   fsm->release_mmap(fsm);
1531 
1532   if (!baddr) {
1533     // No free slots - allocate new SBLK page
1534     off_t blen;
1535     bpos = 1;
1536     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1537     rc = fsm->allocate(fsm, SBLK_PAGE_SZ_V2, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1538     RCRET(rc);
1539     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1540     RCRET(rc);
1541     // Fill page to zero
1542     memset(mm + baddr, 0, blen);
1543     if (dlsnr) {
1544       rc = dlsnr->onset(dlsnr, baddr, 0, blen, 0);
1545     }
1546     fsm->release_mmap(fsm);
1547     RCRET(rc);
1548   }
1549   return _sblk_create_v1(lx, nlevel, kvbpow, baddr, bpos, oblk);
1550 }
1551 
_sblk_create(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1552 IW_INLINE WUR iwrc _sblk_create(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1553   if (lx->db->iwkv->fmt_version > 1) {
1554     return _sblk_create_v2(lx, nlevel, kvbpow, lower, upper, oblk);
1555   } else {
1556     return _sblk_create_v1(lx, nlevel, kvbpow, lower->addr, 0, oblk);
1557   }
1558 }
1559 
_sblk_at2(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK * sblk)1560 static WUR iwrc _sblk_at2(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK *sblk) {
1561   iwrc rc;
1562   uint8_t *mm;
1563   uint32_t lv;
1564   sblk_flags_t flags = lx->sbflags | flgs;
1565   IWDB db = lx->db;
1566   IWFS_FSM *fsm = &db->iwkv->fsm;
1567   sblk->kvblk = 0;
1568   sblk->bpos = 0;
1569   sblk->db = db;
1570 
1571   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1572   RCRET(rc);
1573 
1574   if (IW_UNLIKELY(addr == db->addr)) {
1575     uint8_t *rp = mm + addr + DOFF_N0_U4;
1576     // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1577     sblk->addr = addr;
1578     sblk->flags = SBLK_DB | flags;
1579     sblk->lvl = 0;
1580     sblk->p0 = 0;
1581     sblk->kvblkn = 0;
1582     sblk->lkl = 0;
1583     sblk->pnum = KVBLK_IDXNUM;
1584     memset(sblk->pi, 0, sizeof(sblk->pi));
1585     for (int i = 0; i < SLEVELS; ++i) {
1586       IW_READLV(rp, lv, sblk->n[i]);
1587       if (sblk->n[i]) {
1588         ++sblk->lvl;
1589       } else {
1590         break;
1591       }
1592     }
1593     if (sblk->lvl) --sblk->lvl;
1594   } else if (addr) {
1595     uint8_t uflags;
1596     uint8_t *rp = mm + addr;
1597     sblk->addr = addr;
1598     // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
1599     memcpy(&uflags, rp++, 1);
1600     sblk->flags = uflags;
1601     if (sblk->flags & ~SBLK_PERSISTENT_FLAGS) {
1602       rc = IWKV_ERROR_CORRUPTED;
1603       iwlog_ecode_error3(rc);
1604       goto finish;
1605     }
1606     sblk->flags |= flags;
1607     memcpy(&sblk->lvl, rp++, 1);
1608     if (sblk->lvl >= SLEVELS) {
1609       rc = IWKV_ERROR_CORRUPTED;
1610       iwlog_ecode_error3(rc);
1611       goto finish;
1612     }
1613     memcpy(&sblk->lkl, rp++, 1);
1614     if (sblk->lkl > db->iwkv->pklen) {
1615       rc = IWKV_ERROR_CORRUPTED;
1616       iwlog_ecode_error3(rc);
1617       goto finish;
1618     }
1619     memcpy(&sblk->pnum, rp++, 1);
1620     if (sblk->pnum < 0) {
1621       rc = IWKV_ERROR_CORRUPTED;
1622       iwlog_ecode_error3(rc);
1623       goto finish;
1624     }
1625     memcpy(&sblk->p0, rp, 4);
1626     sblk->p0 = IW_ITOHL(sblk->p0);
1627     rp += 4;
1628     memcpy(&sblk->kvblkn, rp, 4);
1629     sblk->kvblkn = IW_ITOHL(sblk->kvblkn);
1630     rp += 4;
1631     memcpy(sblk->pi, rp, KVBLK_IDXNUM);
1632     rp += KVBLK_IDXNUM;
1633     for (int i = 0; i <= sblk->lvl; ++i) {
1634       memcpy(&sblk->n[i], rp, 4);
1635       sblk->n[i] = IW_ITOHL(sblk->n[i]);
1636       rp += 4;
1637     }
1638     if (db->iwkv->fmt_version > 1) {
1639       rp = mm + addr + SOFF_BPOS_U1_V2;
1640       memcpy(&sblk->bpos, rp++, 1);
1641     } else {
1642       rp = mm + addr + SOFF_LK_V1;
1643     }
1644     // Lower key
1645     memcpy(sblk->lk, rp, (size_t) sblk->lkl);
1646 
1647   } else { // Database tail
1648     uint8_t *rp = mm + db->addr + DOFF_P0_U4;
1649     sblk->addr = 0;
1650     sblk->flags = SBLK_DB | flags;
1651     sblk->lvl = 0;
1652     sblk->kvblkn = 0;
1653     sblk->lkl = 0;
1654     sblk->pnum = KVBLK_IDXNUM;
1655     memset(sblk->pi, 0, sizeof(sblk->pi));
1656     IW_READLV(rp, lv, sblk->p0);
1657     if (!sblk->p0) {
1658       sblk->p0 = ADDR2BLK(db->addr);
1659     }
1660   }
1661 
1662 finish:
1663   fsm->release_mmap(fsm);
1664   return rc;
1665 }
1666 
_sblk_at(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK ** sblkp)1667 IW_INLINE WUR iwrc _sblk_at(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK **sblkp) {
1668   *sblkp = 0;
1669   SBLK *sblk = &lx->saa[lx->saan];
1670   iwrc rc = _sblk_at2(lx, addr, flgs, sblk);
1671   AAPOS_INC(lx->saan);
1672   *sblkp = sblk;
1673   return rc;
1674 }
1675 
_sblk_sync_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1676 static WUR iwrc _sblk_sync_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1677   iwrc rc = 0;
1678   if (sblk->flags & SBLK_DURTY) {
1679     uint32_t lv;
1680     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1681     sblk->flags &= ~SBLK_DURTY;
1682     if (IW_UNLIKELY(sblk->flags & SBLK_DB)) {
1683       uint8_t *sp;
1684       uint8_t *wp = mm + sblk->db->addr;
1685       if (sblk->addr) {
1686         assert(sblk->addr == sblk->db->addr);
1687         wp += DOFF_N0_U4;
1688         sp = wp;
1689         // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1690         for (int i = 0; i < SLEVELS; ++i) {
1691           IW_WRITELV(wp, lv, sblk->n[i]);
1692         }
1693         assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1694         for (int i = 0; i < SLEVELS; ++i) {
1695           IW_WRITELV(wp, lv, lx->db->lcnt[i]);
1696         }
1697       } else { // Database tail
1698         wp += DOFF_P0_U4;
1699         sp = wp;
1700         IW_WRITELV(wp, lv, sblk->p0);
1701         assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1702       }
1703       if (dlsnr) {
1704         rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
1705       }
1706       return rc;
1707     } else {
1708       uint8_t *wp = mm + sblk->addr;
1709       sblk_flags_t flags = (sblk->flags & SBLK_PERSISTENT_FLAGS);
1710       uint8_t uflags = flags;
1711       assert(sblk->lkl <= lx->db->iwkv->pklen);
1712       // [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256
1713       wp += SOFF_FLAGS_U1;
1714       memcpy(wp++, &uflags, 1);
1715       memcpy(wp++, &sblk->lvl, 1);
1716       memcpy(wp++, &sblk->lkl, 1);
1717       memcpy(wp++, &sblk->pnum, 1);
1718       IW_WRITELV(wp, lv, sblk->p0);
1719       IW_WRITELV(wp, lv, sblk->kvblkn);
1720       memcpy(wp, sblk->pi, KVBLK_IDXNUM);
1721       wp = mm + sblk->addr + SOFF_N0_U4;
1722       for (int i = 0; i <= sblk->lvl; ++i) {
1723         IW_WRITELV(wp, lv, sblk->n[i]);
1724       }
1725       if (lx->db->iwkv->fmt_version > 1) {
1726         wp = mm + sblk->addr + SOFF_BPOS_U1_V2;
1727         memcpy(wp++, &sblk->bpos, 1);
1728       } else {
1729         wp = mm + sblk->addr + SOFF_LK_V1;
1730       }
1731       memcpy(wp, sblk->lk, (size_t) sblk->lkl);
1732       if (dlsnr) {
1733         rc = dlsnr->onwrite(dlsnr, sblk->addr, mm + sblk->addr, SOFF_END, 0);
1734         RCRET(rc);
1735       }
1736     }
1737   }
1738   if (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY)) {
1739     IWRC(_kvblk_sync_mm(sblk->kvblk, mm), rc);
1740   }
1741   if (sblk->flags & SBLK_CACHE_UPDATE) {
1742     _dbcache_update_lw(lx, sblk);
1743   }
1744   return rc;
1745 }
1746 
_sblk_sync(IWLCTX * lx,SBLK * sblk)1747 IW_INLINE WUR iwrc _sblk_sync(IWLCTX *lx, SBLK *sblk) {
1748   if ((sblk->flags & SBLK_DURTY) || (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY))) {
1749     uint8_t *mm;
1750     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1751     iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1752     RCRET(rc);
1753     rc = _sblk_sync_mm(lx, sblk, mm);
1754     fsm->release_mmap(fsm);
1755     return rc;
1756   }
1757   return 0;
1758 }
1759 
_sblk_sync_and_release_mm(IWLCTX * lx,SBLK ** sblkp,uint8_t * mm)1760 IW_INLINE WUR iwrc _sblk_sync_and_release_mm(IWLCTX *lx, SBLK **sblkp, uint8_t *mm) {
1761   SBLK *sblk = *sblkp;
1762   if (lx->destroy_addr && lx->destroy_addr == sblk->addr) {
1763     return 0;
1764   }
1765   iwrc rc = 0;
1766   if (mm) {
1767     rc = _sblk_sync_mm(lx, *sblkp, mm);
1768   }
1769   _sblk_release(lx, sblkp);
1770   return rc;
1771 }
1772 
_sblk_find_pi_mm(SBLK * sblk,IWLCTX * lx,const uint8_t * mm,bool * found,uint8_t * idxp)1773 static WUR iwrc _sblk_find_pi_mm(SBLK *sblk, IWLCTX *lx, const uint8_t *mm, bool *found, uint8_t *idxp) {
1774   *found = false;
1775   if (sblk->flags & SBLK_DB) {
1776     *idxp = KVBLK_IDXNUM;
1777     return 0;
1778   }
1779   uint8_t *k;
1780   uint32_t kl;
1781   int idx = 0, lb = 0, ub = sblk->pnum - 1;
1782   iwdb_flags_t dbflg = lx->db->dbflg;
1783 
1784   if (sblk->pnum < 1) {
1785     *idxp = 0;
1786     return 0;
1787   }
1788   while (1) {
1789     idx = (ub + lb) / 2;
1790     iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1791     RCRET(rc);
1792     int cr = _cmp_keys(dbflg, k, kl, lx->key);
1793     if (!cr) {
1794       *found = true;
1795       break;
1796     } else if (cr < 0) {
1797       lb = idx + 1;
1798       if (lb > ub) {
1799         idx = lb;
1800         break;
1801       }
1802     } else {
1803       ub = idx - 1;
1804       if (lb > ub) {
1805         break;
1806       }
1807     }
1808   }
1809   *idxp = idx;
1810   return 0;
1811 }
1812 
_sblk_insert_pi_mm(SBLK * sblk,uint8_t nidx,IWLCTX * lx,const uint8_t * mm,uint8_t * idxp)1813 static WUR iwrc _sblk_insert_pi_mm(SBLK *sblk, uint8_t nidx, IWLCTX *lx,
1814                                    const uint8_t *mm, uint8_t *idxp) {
1815   assert(sblk->kvblk);
1816 
1817   uint8_t *k;
1818   uint32_t kl;
1819   int idx = 0, lb = 0, ub = sblk->pnum - 1, nels = sblk->pnum; // NOLINT
1820 
1821   if (nels < 1) {
1822     sblk->pi[0] = nidx;
1823     ++sblk->pnum;
1824     *idxp = 0;
1825     return 0;
1826   }
1827   iwdb_flags_t dbflg = sblk->db->dbflg;
1828   while (1) {
1829     idx = (ub + lb) / 2;
1830     iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1831     RCRET(rc);
1832     int cr = _cmp_keys(dbflg, k, kl, lx->key);
1833     if (!cr) {
1834       break;
1835     } else if (cr < 0) {
1836       lb = idx + 1;
1837       if (lb > ub) {
1838         idx = lb;
1839         ++sblk->pnum;
1840         break;
1841       }
1842     } else {
1843       ub = idx - 1;
1844       if (lb > ub) {
1845         ++sblk->pnum;
1846         break;
1847       }
1848     }
1849   }
1850   if (nels - idx > 0) {
1851     memmove(sblk->pi + idx + 1, sblk->pi + idx, nels - idx);
1852   }
1853   sblk->pi[idx] = nidx;
1854   *idxp = idx;
1855   return 0;
1856 }
1857 
_sblk_addkv2(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val,bool raw_key)1858 static WUR iwrc _sblk_addkv2(SBLK *sblk,
1859                              int8_t idx,
1860                              const IWKV_val *key,
1861                              const IWKV_val *val,
1862                              bool raw_key) {
1863   assert(sblk && key && key->size && key->data && val && idx >= 0 && sblk->kvblk);
1864 
1865   uint8_t kvidx;
1866   IWDB db = sblk->db;
1867   KVBLK *kvblk = sblk->kvblk;
1868   if (sblk->pnum >= KVBLK_IDXNUM) {
1869     return _IWKV_RC_KVBLOCK_FULL;
1870   }
1871 
1872   iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, raw_key);
1873   RCRET(rc);
1874   if (sblk->pnum - idx > 0) {
1875     memmove(sblk->pi + idx + 1, sblk->pi + idx, sblk->pnum - idx);
1876   }
1877   sblk->pi[idx] = kvidx;
1878   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1879     sblk->kvblkn = ADDR2BLK(kvblk->addr);
1880     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1881       sblk->flags |= SBLK_CACHE_UPDATE;
1882     }
1883   }
1884   ++sblk->pnum;
1885   sblk->flags |= SBLK_DURTY;
1886   if (idx == 0) { // the lowest key inserted
1887     size_t ksize = key->size;
1888     bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1889     if (compound) {
1890       ksize += IW_VNUMSIZE(key->compound);
1891     }
1892     sblk->lkl = MIN(db->iwkv->pklen, ksize);
1893     uint8_t *wp = sblk->lk;
1894     if (compound) {
1895       int len;
1896       IW_SETVNUMBUF64(len, wp, key->compound);
1897       wp += len;
1898     }
1899     memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1900     if (ksize <= db->iwkv->pklen) {
1901       sblk->flags |= SBLK_FULL_LKEY;
1902     } else {
1903       sblk->flags &= ~SBLK_FULL_LKEY;
1904     }
1905     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1906       sblk->flags |= SBLK_CACHE_UPDATE;
1907     }
1908   }
1909   if (!raw_key) {
1910     // Update active cursors inside this block
1911     pthread_spin_lock(&db->cursors_slk);
1912     for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1913       if (cur->cn && cur->cn->addr == sblk->addr) {
1914         if (cur->cn != sblk) {
1915           memcpy(cur->cn, sblk, sizeof(*cur->cn));
1916           cur->cn->kvblk = 0;
1917           cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1918         }
1919         if (cur->cnpos >= idx) {
1920           cur->cnpos++;
1921         }
1922       }
1923     }
1924     pthread_spin_unlock(&db->cursors_slk);
1925   }
1926   return 0;
1927 }
1928 
_sblk_addkv(SBLK * sblk,IWLCTX * lx)1929 static WUR iwrc _sblk_addkv(SBLK *sblk, IWLCTX *lx) {
1930   const IWKV_val *key = lx->key;
1931   const IWKV_val *val = lx->val;
1932   assert(key && key->size && key->data && val && sblk->kvblk);
1933   if (!sblk) {
1934     iwlog_error2("sblk != 0");
1935     return IW_ERROR_ASSERTION;
1936   }
1937   uint8_t *mm, idx, kvidx;
1938   IWDB db = sblk->db;
1939   KVBLK *kvblk = sblk->kvblk;
1940   IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
1941   if (sblk->pnum >= KVBLK_IDXNUM) {
1942     return _IWKV_RC_KVBLOCK_FULL;
1943   }
1944   iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, false);
1945   RCRET(rc);
1946   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1947   RCRET(rc);
1948   rc = _sblk_insert_pi_mm(sblk, kvidx, lx, mm, &idx);
1949   RCRET(rc);
1950   fsm->release_mmap(fsm);
1951   if (idx == 0) { // the lowest key inserted
1952     size_t ksize = key->size;
1953     bool compound = (db->dbflg & IWDB_COMPOUND_KEYS);
1954     if (compound) {
1955       ksize += IW_VNUMSIZE(key->compound);
1956     }
1957     sblk->lkl = MIN(db->iwkv->pklen, ksize);
1958     uint8_t *wp = sblk->lk;
1959     if (compound) {
1960       int len;
1961       IW_SETVNUMBUF64(len, wp, key->compound);
1962       wp += len;
1963     }
1964     memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1965     if (ksize <= db->iwkv->pklen) {
1966       sblk->flags |= SBLK_FULL_LKEY;
1967     } else {
1968       sblk->flags &= ~SBLK_FULL_LKEY;
1969     }
1970     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1971       sblk->flags |= SBLK_CACHE_UPDATE;
1972     }
1973   }
1974   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1975     sblk->kvblkn = ADDR2BLK(kvblk->addr);
1976     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1977       sblk->flags |= SBLK_CACHE_UPDATE;
1978     }
1979   }
1980   sblk->flags |= SBLK_DURTY;
1981 
1982   // Update active cursors inside this block
1983   pthread_spin_lock(&db->cursors_slk);
1984   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1985     if (cur->cn && cur->cn->addr == sblk->addr) {
1986       if (cur->cn != sblk) {
1987         memcpy(cur->cn, sblk, sizeof(*cur->cn));
1988         cur->cn->kvblk = 0;
1989         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1990       }
1991       if (cur->cnpos >= idx) {
1992         cur->cnpos++;
1993       }
1994     }
1995   }
1996   pthread_spin_unlock(&db->cursors_slk);
1997 
1998   return 0;
1999 }
2000 
_sblk_updatekv(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val)2001 static WUR iwrc _sblk_updatekv(SBLK *sblk, int8_t idx,
2002                                const IWKV_val *key, const IWKV_val *val) {
2003   assert(sblk && sblk->kvblk && idx >= 0 && idx < sblk->pnum);
2004   IWDB db = sblk->db;
2005   KVBLK *kvblk = sblk->kvblk;
2006   uint8_t kvidx = sblk->pi[idx];
2007   iwrc intrc = 0;
2008   iwrc rc = _kvblk_updatev(kvblk, &kvidx, key, val);
2009   if (IWKV_IS_INTERNAL_RC(rc)) {
2010     intrc = rc;
2011     rc = 0;
2012   }
2013   RCRET(rc);
2014   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2015     sblk->kvblkn = ADDR2BLK(kvblk->addr);
2016     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2017       sblk->flags |= SBLK_CACHE_UPDATE;
2018     }
2019   }
2020   sblk->pi[idx] = kvidx;
2021   sblk->flags |= SBLK_DURTY;
2022   // Update active cursors inside this block
2023   pthread_spin_lock(&db->cursors_slk);
2024   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2025     if (cur->cn && cur->cn != sblk && cur->cn->addr == sblk->addr) {
2026       memcpy(cur->cn, sblk, sizeof(*cur->cn));
2027       cur->cn->kvblk = 0;
2028       cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2029     }
2030   }
2031   pthread_spin_unlock(&db->cursors_slk);
2032   return intrc;
2033 }
2034 
_sblk_rmkv(SBLK * sblk,uint8_t idx)2035 static WUR iwrc _sblk_rmkv(SBLK *sblk, uint8_t idx) {
2036   assert(sblk && sblk->kvblk);
2037   IWDB db = sblk->db;
2038   KVBLK *kvblk = sblk->kvblk;
2039   IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
2040   assert(kvblk && idx < sblk->pnum && sblk->pi[idx] < KVBLK_IDXNUM);
2041 
2042   iwrc rc = _kvblk_rmkv(kvblk, sblk->pi[idx], 0);
2043   RCRET(rc);
2044 
2045   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2046     sblk->kvblkn = ADDR2BLK(kvblk->addr);
2047     if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2048       sblk->flags |= SBLK_CACHE_UPDATE;
2049     }
2050   }
2051   --sblk->pnum;
2052   sblk->flags |= SBLK_DURTY;
2053 
2054   if (idx < sblk->pnum && sblk->pnum > 0) {
2055     memmove(sblk->pi + idx, sblk->pi + idx + 1, sblk->pnum - idx);
2056   }
2057 
2058   if (idx == 0) { // Lowest key removed
2059     // Replace the lowest key with the next one or reset
2060     if (sblk->pnum > 0) {
2061       uint8_t *mm, *kbuf;
2062       uint32_t klen;
2063       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2064       RCRET(rc);
2065       rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &kbuf, &klen);
2066       if (rc) {
2067         fsm->release_mmap(fsm);
2068         return rc;
2069       }
2070       sblk->lkl = MIN(db->iwkv->pklen, klen);
2071       memcpy(sblk->lk, kbuf, sblk->lkl);
2072       fsm->release_mmap(fsm);
2073       if (klen <= db->iwkv->pklen) {
2074         sblk->flags |= SBLK_FULL_LKEY;
2075       } else {
2076         sblk->flags &= ~SBLK_FULL_LKEY;
2077       }
2078       if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2079         sblk->flags |= SBLK_CACHE_UPDATE;
2080       }
2081     } else {
2082       sblk->lkl = 0;
2083       sblk->flags |= SBLK_CACHE_REMOVE;
2084     }
2085   }
2086 
2087   // Update active cursors
2088   pthread_spin_lock(&db->cursors_slk);
2089   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2090     if (cur->cn && cur->cn->addr == sblk->addr) {
2091       cur->skip_next = 0;
2092       if (cur->cn != sblk) {
2093         memcpy(cur->cn, sblk, sizeof(*cur->cn));
2094         cur->cn->kvblk = 0;
2095         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2096       }
2097       if (cur->cnpos == idx) {
2098         if (idx && idx == sblk->pnum) {
2099           cur->cnpos--;
2100           cur->skip_next = -1;
2101         } else {
2102           cur->skip_next = 1;
2103         }
2104       } else if (cur->cnpos > idx) {
2105         cur->cnpos--;
2106       }
2107     }
2108   }
2109   pthread_spin_unlock(&db->cursors_slk);
2110   return 0;
2111 }
2112 
2113 //--------------------------  IWLCTX
2114 
_lx_sblk_cmp_key(IWLCTX * lx,SBLK * sblk,int * resp)2115 WUR iwrc _lx_sblk_cmp_key(IWLCTX *lx, SBLK *sblk, int *resp) {
2116   int res = 0;
2117   iwrc rc = 0;
2118   iwdb_flags_t dbflg = sblk->db->dbflg;
2119   const IWKV_val *key = lx->key;
2120   uint8_t lkl = sblk->lkl;
2121   size_t ksize = key->size;
2122 
2123   if (IW_UNLIKELY(sblk->pnum < 1 || (sblk->flags & SBLK_DB))) {
2124     *resp = 0;
2125     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
2126     return IWKV_ERROR_CORRUPTED;
2127   }
2128   if (dbflg & IWDB_COMPOUND_KEYS) {
2129     ksize += IW_VNUMSIZE(key->compound);
2130   }
2131   if ((sblk->flags & SBLK_FULL_LKEY)
2132       || ksize < lkl
2133       || (dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2134     res = _cmp_keys(dbflg, sblk->lk, lkl, key);
2135   } else {
2136     res = _cmp_keys_prefix(dbflg, sblk->lk, lkl, key);
2137     if (res == 0) {
2138       uint32_t kl;
2139       uint8_t *mm, *k;
2140       IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2141       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2142       if (rc) {
2143         *resp = 0;
2144         return rc;
2145       }
2146       if (!sblk->kvblk) {
2147         rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2148         if (rc) {
2149           *resp = 0;
2150           fsm->release_mmap(fsm);
2151           return rc;
2152         }
2153       }
2154       rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[0], mm, &k, &kl);
2155       RCRET(rc);
2156       res = _cmp_keys(dbflg, k, kl, key);
2157       fsm->release_mmap(fsm);
2158     }
2159   }
2160   *resp = res;
2161   return rc;
2162 }
2163 
_lx_roll_forward(IWLCTX * lx,uint8_t lvl)2164 static WUR iwrc _lx_roll_forward(IWLCTX *lx, uint8_t lvl) {
2165   iwrc rc = 0;
2166   int cret;
2167   SBLK *sblk;
2168   blkn_t blkn;
2169   assert(lx->lower);
2170 
2171   while ((blkn = lx->lower->n[lvl])) {
2172     off_t blkaddr = BLK2ADDR(blkn);
2173     if (lx->nlvl > -1 && lvl < lx->nlvl) {
2174       uint8_t ulvl = lvl + 1;
2175       if (lx->pupper[ulvl] && lx->pupper[ulvl]->addr == blkaddr) {
2176         sblk = lx->pupper[ulvl];
2177       } else if (lx->plower[ulvl] && lx->plower[ulvl]->addr == blkaddr) {
2178         sblk = lx->plower[ulvl];
2179       } else {
2180         rc = _sblk_at(lx, blkaddr, 0, &sblk);
2181       }
2182     } else {
2183       rc = _sblk_at(lx, blkaddr, 0, &sblk);
2184     }
2185     RCRET(rc);
2186 #ifndef NDEBUG
2187     ++lx->num_cmps;
2188 #endif
2189     rc = _lx_sblk_cmp_key(lx, sblk, &cret);
2190     RCRET(rc);
2191     if (cret > 0 || lx->upper_addr == sblk->addr) { // upper > key
2192       lx->upper = sblk;
2193       break;
2194     } else {
2195       lx->lower = sblk;
2196     }
2197   }
2198   return 0;
2199 }
2200 
_lx_find_bounds(IWLCTX * lx)2201 static WUR iwrc _lx_find_bounds(IWLCTX *lx) {
2202   iwrc rc = 0;
2203   int lvl;
2204   blkn_t blkn;
2205   SBLK *dblk = &lx->dblk;
2206   if (!dblk->addr) {
2207     SBLK *s;
2208     rc = _sblk_at(lx, lx->db->addr, 0, &s);
2209     RCRET(rc);
2210     memcpy(dblk, s, sizeof(*dblk));
2211   }
2212   if (!lx->lower) {
2213     rc = _dbcache_get(lx);
2214     RCRET(rc);
2215   }
2216   if (lx->nlvl > dblk->lvl) {
2217     // New level in DB
2218     dblk->lvl = (uint8_t) lx->nlvl;
2219     dblk->flags |= SBLK_DURTY;
2220   }
2221   lvl = lx->lower->lvl;
2222   while (lvl > -1) {
2223     rc = _lx_roll_forward(lx, (uint8_t) lvl);
2224     RCRET(rc);
2225     if (lx->upper) {
2226       blkn = ADDR2BLK(lx->upper->addr);
2227     } else {
2228       blkn = 0;
2229     }
2230     do {
2231       if (lx->nlvl >= lvl) {
2232         lx->plower[lvl] = lx->lower;
2233         lx->pupper[lvl] = lx->upper;
2234       }
2235     } while (lvl-- && lx->lower->n[lvl] == blkn);
2236   }
2237   return 0;
2238 }
2239 
_lx_release_mm(IWLCTX * lx,uint8_t * mm)2240 static iwrc _lx_release_mm(IWLCTX *lx, uint8_t *mm) {
2241   iwrc rc = 0;
2242   if (lx->nlvl > -1) {
2243     SBLK *lsb = 0, *usb = 0;
2244     if (lx->nb) {
2245       rc = _sblk_sync_mm(lx, lx->nb, mm);
2246       RCGO(rc, finish);
2247     }
2248     if (lx->pupper[0] == lx->upper) {
2249       lx->upper = 0;
2250     }
2251     if (lx->plower[0] == lx->lower) {
2252       lx->lower = 0;
2253     }
2254     for (int i = 0; i <= lx->nlvl; ++i) {
2255       if (lx->pupper[i]) {
2256         if (lx->pupper[i] != usb) {
2257           usb = lx->pupper[i];
2258           rc = _sblk_sync_and_release_mm(lx, &lx->pupper[i], mm);
2259           RCGO(rc, finish);
2260         }
2261         lx->pupper[i] = 0;
2262       }
2263       if (lx->plower[i]) {
2264         if (lx->plower[i] != lsb) {
2265           lsb = lx->plower[i];
2266           rc = _sblk_sync_and_release_mm(lx, &lx->plower[i], mm);
2267           RCGO(rc, finish);
2268         }
2269         lx->plower[i] = 0;
2270       }
2271     }
2272   }
2273   if (lx->upper) {
2274     rc = _sblk_sync_and_release_mm(lx, &lx->upper, mm);
2275     RCGO(rc, finish);
2276   }
2277   if (lx->lower) {
2278     rc = _sblk_sync_and_release_mm(lx, &lx->lower, mm);
2279     RCGO(rc, finish);
2280   }
2281   if (lx->dblk.flags & SBLK_DURTY) {
2282     rc = _sblk_sync_mm(lx, &lx->dblk, mm);
2283     RCGO(rc, finish);
2284   }
2285   if (lx->nb) {
2286     if (lx->nb->flags & SBLK_CACHE_PUT) {
2287       rc = _dbcache_put_lw(lx, lx->nb);
2288     }
2289     _sblk_release(lx, &lx->nb);
2290     RCGO(rc, finish);
2291   }
2292   if (lx->cache_reload) {
2293     rc = _dbcache_fill_lw(lx);
2294   }
2295 
2296 finish:
2297   lx->destroy_addr = 0;
2298   return rc;
2299 }
2300 
_lx_release(IWLCTX * lx)2301 iwrc _lx_release(IWLCTX *lx) {
2302   uint8_t *mm;
2303   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2304   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2305   RCRET(rc);
2306   rc = _lx_release_mm(lx, mm);
2307   IWRC(fsm->release_mmap(fsm), rc);
2308   return rc;
2309 }
2310 
_lx_split_addkv(IWLCTX * lx,int idx,SBLK * sblk)2311 static iwrc _lx_split_addkv(IWLCTX *lx, int idx, SBLK *sblk) {
2312   iwrc rc;
2313   SBLK *nb;
2314   blkn_t nblk;
2315   IWDB db = sblk->db;
2316   bool uside = (idx == sblk->pnum);
2317   register const int8_t pivot = (KVBLK_IDXNUM / 2) + 1; // 32
2318 
2319   if (uside) { // Upper side
2320     rc = _sblk_create(lx, (uint8_t) lx->nlvl, 0, sblk, lx->upper, &nb);
2321     RCRET(rc);
2322     rc = _sblk_addkv(nb, lx);
2323     RCGO(rc, finish);
2324 
2325   } else { // New key is somewhere in a middle of sblk->kvblk
2326     assert(sblk->kvblk);
2327     // We are in the middle
2328     // Do the partial split
2329     // Move kv pairs into new `nb`
2330     // Compute space required for the new sblk which stores kv pairs after pivot `idx`
2331     size_t sz = 0;
2332     for (int8_t i = pivot; i < sblk->pnum; ++i) {
2333       sz += sblk->kvblk->pidx[sblk->pi[i]].len;
2334     }
2335     if (idx > pivot) {
2336       sz += IW_VNUMSIZE(lx->key->size) + lx->key->size + lx->val->size;
2337     }
2338     sz += KVBLK_MAX_NKV_SZ;
2339     uint8_t kvbpow = (uint8_t) iwlog2_64(sz);
2340     while ((1ULL << kvbpow) < sz) kvbpow++;
2341 
2342     rc = _sblk_create(lx, (uint8_t) lx->nlvl, kvbpow, sblk, lx->upper, &nb);
2343     RCRET(rc);
2344 
2345     IWKV_val key, val;
2346     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2347     for (int8_t i = pivot, end = sblk->pnum; i < end; ++i) {
2348       uint8_t *mm;
2349       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2350       RCBREAK(rc);
2351 
2352       rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[i], &key, &val);
2353       assert(key.size);
2354       fsm->release_mmap(fsm);
2355       RCBREAK(rc);
2356 
2357       rc = _sblk_addkv2(nb, i - pivot, &key, &val, true);
2358       _kv_dispose(&key, &val);
2359 
2360       RCBREAK(rc);
2361       sblk->kvblk->pidx[sblk->pi[i]].len = 0;
2362       sblk->kvblk->pidx[sblk->pi[i]].off = 0;
2363       --sblk->pnum;
2364     }
2365     sblk->kvblk->flags |= KVBLK_DURTY;
2366     sblk->kvblk->zidx = sblk->pi[pivot];
2367     sblk->kvblk->maxoff = 0;
2368     for (int i = 0; i < KVBLK_IDXNUM; ++i) {
2369       if (sblk->kvblk->pidx[i].off > sblk->kvblk->maxoff) {
2370         sblk->kvblk->maxoff = sblk->kvblk->pidx[i].off;
2371       }
2372     }
2373   }
2374 
2375   // Fix levels:
2376   //  [ lb -> sblk -> ub ]
2377   //  [ lb -> sblk -> nb -> ub ]
2378   nblk = ADDR2BLK(nb->addr);
2379   lx->pupper[0]->p0 = nblk;
2380   lx->pupper[0]->flags |= SBLK_DURTY;
2381   nb->p0 = ADDR2BLK(lx->plower[0]->addr);
2382   for (int i = 0; i <= nb->lvl; ++i) {
2383     lx->plower[i]->n[i] = nblk;
2384     lx->plower[i]->flags |= SBLK_DURTY;
2385     nb->n[i] = ADDR2BLK(lx->pupper[i]->addr);
2386   }
2387 
2388   pthread_spin_lock(&db->cursors_slk);
2389   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2390     if (cur->cn && cur->cn->addr == sblk->addr) {
2391       if (cur->cnpos >= pivot) {
2392         memcpy(cur->cn, nb, sizeof(*cur->cn));
2393         cur->cn->kvblk = 0;
2394         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2395         cur->cnpos -= pivot;
2396       }
2397     }
2398   }
2399   pthread_spin_unlock(&db->cursors_slk);
2400 
2401   if (!uside) {
2402     if (idx > pivot) {
2403       rc = _sblk_addkv(nb, lx);
2404     } else {
2405       rc = _sblk_addkv(sblk, lx);
2406     }
2407     RCGO(rc, finish);
2408   }
2409 
2410 finish:
2411   if (rc) {
2412     lx->nb = 0;
2413     IWRC(_sblk_destroy(lx, &nb), rc);
2414   } else {
2415     lx->nb = nb;
2416   }
2417   return rc;
2418 }
2419 
_lx_init_chute(IWLCTX * lx)2420 IW_INLINE iwrc _lx_init_chute(IWLCTX *lx) {
2421   assert(lx->nlvl >= 0);
2422   iwrc rc = 0;
2423   if (!lx->pupper[lx->nlvl]) { // fix zero upper by dbtail
2424     SBLK *dbtail;
2425     rc = _sblk_at(lx, 0, 0, &dbtail);
2426     RCRET(rc);
2427     for (int8_t i = lx->nlvl; i >= 0 && !lx->pupper[i]; --i) {
2428       lx->pupper[i] = dbtail;
2429     }
2430   }
2431   return 0;
2432 }
2433 
_lx_addkv(IWLCTX * lx)2434 static WUR iwrc _lx_addkv(IWLCTX *lx) {
2435   iwrc rc;
2436   bool found, uadd;
2437   uint8_t *mm = 0, idx;
2438   SBLK *sblk = lx->lower;
2439   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2440   if (lx->nlvl > -1) {
2441     rc = _lx_init_chute(lx);
2442     RCRET(rc);
2443   }
2444   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2445   RCRET(rc);
2446   rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2447   if (rc) {
2448     fsm->release_mmap(fsm);
2449     return rc;
2450   }
2451   rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2452   RCRET(rc);
2453   if (found && (lx->opflags & IWKV_NO_OVERWRITE)) {
2454     fsm->release_mmap(fsm);
2455     return IWKV_ERROR_KEY_EXISTS;
2456   }
2457   uadd = (!found
2458           && sblk->pnum > KVBLK_IDXNUM - 1 && idx > KVBLK_IDXNUM - 1
2459           && lx->upper && lx->upper->pnum < KVBLK_IDXNUM);
2460   if (uadd) {
2461     rc = _sblk_loadkvblk_mm(lx, lx->upper, mm);
2462     if (rc) {
2463       fsm->release_mmap(fsm);
2464       return rc;
2465     }
2466   }
2467   if (found) {
2468     IWKV_val sval, *val = lx->val;
2469     if (lx->opflags & IWKV_VAL_INCREMENT) {
2470       int64_t ival;
2471       uint8_t *rp;
2472       uint32_t len;
2473       if (val->size == 4) {
2474         int32_t lv;
2475         memcpy(&lv, val->data, val->size);
2476         lv = IW_ITOHL(lv);
2477         ival = lv;
2478       } else if (val->size == 8) {
2479         memcpy(&ival, val->data, val->size);
2480         ival = IW_ITOHLL(ival);
2481       } else {
2482         rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2483         fsm->release_mmap(fsm);
2484         return rc;
2485       }
2486       _kvblk_value_peek(sblk->kvblk, sblk->pi[idx], mm, &rp, &len);
2487       sval.data = rp;
2488       sval.size = len;
2489       if (sval.size == 4) {
2490         uint32_t lv;
2491         memcpy(&lv, sval.data, 4);
2492         lv = IW_ITOHL(lv);
2493         lv += ival;
2494         _num2lebuf(lx->incbuf, &lv, 4);
2495       } else if (sval.size == 8) {
2496         uint64_t llv;
2497         memcpy(&llv, sval.data, 8);
2498         llv = IW_ITOHLL(llv);
2499         llv += ival;
2500         _num2lebuf(lx->incbuf, &llv, 8);
2501       } else {
2502         rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2503         fsm->release_mmap(fsm);
2504         return rc;
2505       }
2506       sval.data = lx->incbuf;
2507       val = &sval;
2508     }
2509     if (lx->ph) {
2510       IWKV_val oldval;
2511       rc = _kvblk_value_get(sblk->kvblk, mm, sblk->pi[idx], &oldval);
2512       fsm->release_mmap(fsm);
2513       if (!rc) {
2514         // note: oldval should be disposed by ph
2515         rc = lx->ph(lx->key, lx->val, &oldval, lx->phop);
2516       }
2517       RCRET(rc);
2518     } else {
2519       fsm->release_mmap(fsm);
2520     }
2521     return _sblk_updatekv(sblk, idx, lx->key, val);
2522   } else {
2523     fsm->release_mmap(fsm);
2524     if (sblk->pnum > KVBLK_IDXNUM - 1) {
2525       if (uadd) {
2526         if (lx->ph) {
2527           rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2528           RCRET(rc);
2529         }
2530         return _sblk_addkv(lx->upper, lx);
2531       }
2532       if (lx->nlvl < 0) {
2533         return _IWKV_RC_REQUIRE_NLEVEL;
2534       }
2535       if (lx->ph) {
2536         rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2537         RCRET(rc);
2538       }
2539       return _lx_split_addkv(lx, idx, sblk);
2540     } else {
2541       if (lx->ph) {
2542         rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2543         RCRET(rc);
2544       }
2545       return _sblk_addkv2(sblk, idx, lx->key, lx->val, false);
2546     }
2547   }
2548 }
2549 
_lx_put_lw(IWLCTX * lx)2550 IW_INLINE WUR iwrc _lx_put_lw(IWLCTX *lx) {
2551   iwrc rc;
2552 start:
2553   rc = _lx_find_bounds(lx);
2554   if (rc) {
2555     _lx_release_mm(lx, 0);
2556     return rc;
2557   }
2558   rc = _lx_addkv(lx);
2559   if (rc == _IWKV_RC_REQUIRE_NLEVEL) {
2560     SBLK *lower = lx->lower;
2561     lx->lower = 0;
2562     _lx_release_mm(lx, 0);
2563     lx->nlvl = _sblk_genlevel(lx->db);
2564     if (lower->lvl >= lx->nlvl) {
2565       lx->lower = lower;
2566     }
2567     goto start;
2568   }
2569   if (rc == _IWKV_RC_KVBLOCK_FULL) {
2570     rc = IWKV_ERROR_CORRUPTED;
2571     iwlog_ecode_error3(rc);
2572   }
2573   IWRC(_lx_release(lx), rc);
2574   return rc;
2575 }
2576 
_lx_get_lr(IWLCTX * lx)2577 IW_INLINE WUR iwrc _lx_get_lr(IWLCTX *lx) {
2578   iwrc rc = _lx_find_bounds(lx);
2579   RCRET(rc);
2580   bool found;
2581   uint8_t *mm, idx;
2582   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2583   lx->val->size = 0;
2584   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2585   RCRET(rc);
2586   rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2587   RCGO(rc, finish);
2588   rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2589   RCGO(rc, finish);
2590   if (found) {
2591     rc = _kvblk_value_get(lx->lower->kvblk, mm, lx->lower->pi[idx], lx->val);
2592   } else {
2593     rc = IWKV_ERROR_NOTFOUND;
2594   }
2595 
2596 finish:
2597   IWRC(fsm->release_mmap(fsm), rc);
2598   _lx_release_mm(lx, 0);
2599   return rc;
2600 }
2601 
_lx_del_sblk_lw(IWLCTX * lx,SBLK * sblk,uint8_t idx)2602 static WUR iwrc _lx_del_sblk_lw(IWLCTX *lx, SBLK *sblk, uint8_t idx) {
2603   assert(sblk->pnum == 1 && sblk->kvblk);
2604 
2605   iwrc rc;
2606   IWDB db = lx->db;
2607   KVBLK *kvblk = sblk->kvblk;
2608   blkn_t sblk_blkn = ADDR2BLK(sblk->addr);
2609 
2610   _lx_release_mm(lx, 0);
2611   lx->nlvl = sblk->lvl;
2612   lx->upper_addr = sblk->addr;
2613 
2614   rc = _lx_find_bounds(lx);
2615   RCRET(rc);
2616   assert(lx->upper->pnum == 1 && lx->upper->addr == lx->upper_addr);
2617 
2618   lx->upper->kvblk = kvblk;
2619   rc = _sblk_rmkv(lx->upper, idx);
2620   RCGO(rc, finish);
2621 
2622   for (int i = 0; i <= lx->nlvl; ++i) {
2623     lx->plower[i]->n[i] = lx->upper->n[i];
2624     lx->plower[i]->flags |= SBLK_DURTY;
2625     if (lx->plower[i]->flags & SBLK_DB) {
2626       if (!lx->plower[i]->n[i]) {
2627         --lx->plower[i]->lvl;
2628       }
2629     }
2630     if (lx->pupper[i] == lx->upper) {
2631       // Do not touch `lx->upper` in next `_lx_release_mm()` call
2632       lx->pupper[i] = 0;
2633     }
2634   }
2635 
2636   SBLK rb;  // Block to remove
2637   memcpy(&rb, lx->upper, sizeof(rb));
2638 
2639   SBLK *nb, // Block after lx->upper
2640        *rbp = &rb;
2641 
2642   assert(!lx->nb);
2643   rc = _sblk_at(lx, BLK2ADDR(rb.n[0]), 0, &nb);
2644   RCGO(rc, finish);
2645   lx->nb = nb;
2646   lx->nb->p0 = rb.p0;
2647   lx->nb->flags |= SBLK_DURTY;
2648 
2649   // Update cursors within sblk removed
2650   pthread_spin_lock(&db->cursors_slk);
2651   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2652     if (cur->cn) {
2653       if (cur->cn->addr == sblk->addr) {
2654         if (nb->flags & SBLK_DB) {
2655           if (!(lx->plower[0]->flags & SBLK_DB)) {
2656             memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2657             cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2658             cur->cn->kvblk = 0;
2659             cur->skip_next = -1;
2660             cur->cnpos = lx->plower[0]->pnum;
2661             if (cur->cnpos) cur->cnpos--;
2662           } else {
2663             cur->cn = 0;
2664             cur->cnpos = 0;
2665             cur->skip_next = 0;
2666           }
2667         } else {
2668           memcpy(cur->cn, nb, sizeof(*nb));
2669           cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2670           cur->cn->kvblk = 0;
2671           cur->cnpos = 0;
2672           cur->skip_next = 1;
2673         }
2674       } else if (cur->cn->n[0] == sblk_blkn) {
2675         memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2676         cur->cn->kvblk = 0;
2677         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2678       } else if (cur->cn->p0 == sblk_blkn) {
2679         memcpy(cur->cn, nb, sizeof(*nb));
2680         cur->cn->kvblk = 0;
2681         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2682       }
2683     }
2684   }
2685   pthread_spin_unlock(&db->cursors_slk);
2686 
2687   rc = _sblk_destroy(lx, &rbp);
2688 
2689 finish:
2690   return rc;
2691 }
2692 
_lx_del_lw(IWLCTX * lx)2693 static WUR iwrc _lx_del_lw(IWLCTX *lx) {
2694   iwrc rc;
2695   bool found;
2696   uint8_t *mm = 0, idx;
2697   IWDB db = lx->db;
2698   IWFS_FSM *fsm = &db->iwkv->fsm;
2699   SBLK *sblk;
2700 
2701   rc = _lx_find_bounds(lx);
2702   RCRET(rc);
2703 
2704   sblk = lx->lower;
2705   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2706   RCGO(rc, finish);
2707   rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2708   RCGO(rc, finish);
2709   rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2710   RCGO(rc, finish);
2711   if (!found) {
2712     rc = IWKV_ERROR_NOTFOUND;
2713     goto finish;
2714   }
2715   fsm->release_mmap(fsm);
2716   mm = 0;
2717 
2718   if (sblk->pnum == 1) { // last kv in block
2719     rc = _lx_del_sblk_lw(lx, sblk, idx);
2720   } else {
2721     rc = _sblk_rmkv(sblk, idx);
2722   }
2723 
2724 finish:
2725   if (mm) {
2726     fsm->release_mmap(fsm);
2727   }
2728   if (rc) {
2729     _lx_release_mm(lx, 0);
2730   } else {
2731     rc = _lx_release(lx);
2732   }
2733   return rc;
2734 }
2735 
2736 //-------------------------- CACHE
2737 
_dbcache_destroy_lw(IWDB db)2738 static void _dbcache_destroy_lw(IWDB db) {
2739   free(db->cache.nodes);
2740   memset(&db->cache, 0, sizeof(db->cache));
2741 }
2742 
_dbcache_lvl(uint8_t lvl)2743 IW_INLINE uint8_t _dbcache_lvl(uint8_t lvl) {
2744   uint8_t clvl = (lvl >= DBCACHE_LEVELS) ? (lvl - DBCACHE_LEVELS + 1) : DBCACHE_MIN_LEVEL;
2745   if (clvl < DBCACHE_MIN_LEVEL) {
2746     clvl = DBCACHE_MIN_LEVEL;
2747   }
2748   return clvl;
2749 }
2750 
_dbcache_cmp_nodes(const void * v1,const void * v2,void * op,int * res)2751 static WUR iwrc _dbcache_cmp_nodes(const void *v1, const void *v2, void *op, int *res) {
2752   iwrc rc = 0;
2753   uint8_t *mm = 0;
2754   IWLCTX *lx = op;
2755   IWDB db = lx->db;
2756   IWFS_FSM *fsm = &db->iwkv->fsm;
2757   iwdb_flags_t dbflg = db->dbflg;
2758   int rv = 0, step;
2759 
2760   const DBCNODE *cn1 = v1, *cn2 = v2;
2761   uint8_t *k1 = (uint8_t *) cn1->lk, *k2 = (uint8_t *) cn2->lk;
2762   uint32_t kl1 = cn1->lkl, kl2 = cn2->lkl;
2763   KVBLK *kb;
2764 
2765   if (!kl1 && cn1->fullkey) {
2766     kl1 = cn1->sblkn;
2767   }
2768   if (!kl2 && cn2->fullkey) {
2769     kl2 = cn2->sblkn;
2770   }
2771 
2772   IWKV_val key2 = {
2773     .size = kl2,
2774     .data = k2
2775   };
2776 
2777   if (dbflg & IWDB_COMPOUND_KEYS) {
2778     IW_READVNUMBUF64(k2, key2.compound, step);
2779     key2.size -= step;
2780     key2.data = (char *) key2.data + step;
2781   }
2782 
2783   rv = _cmp_keys_prefix(dbflg, k1, kl1, &key2);
2784 
2785   if (rv == 0 && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2786 
2787     if (!cn1->fullkey || !cn2->fullkey) {
2788       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2789       RCRET(rc);
2790       if (!cn1->fullkey) {
2791         rc = _kvblk_at_mm(lx, BLK2ADDR(cn1->kblkn), mm, 0, &kb);
2792         RCGO(rc, finish);
2793         rc = _kvblk_key_peek(kb, cn1->k0idx, mm, &k1, &kl1);
2794         RCGO(rc, finish);
2795       }
2796       if (!cn2->fullkey) {
2797         rc = _kvblk_at_mm(lx, BLK2ADDR(cn2->kblkn), mm, 0, &kb);
2798         RCGO(rc, finish);
2799         rc = _kvblk_key_peek(kb, cn2->k0idx, mm, &k2, &kl2);
2800         RCGO(rc, finish);
2801         key2.size = kl2;
2802         key2.data = k2;
2803         if (dbflg & IWDB_COMPOUND_KEYS) {
2804           IW_READVNUMBUF64(k2, key2.compound, step);
2805           key2.size -= step;
2806           key2.data = (char *) key2.data + step;
2807         }
2808       }
2809 
2810       rv = _cmp_keys(dbflg, k1, kl1, &key2);
2811 
2812     } else if (dbflg & IWDB_COMPOUND_KEYS) {
2813 
2814       int64_t c1, c2 = key2.compound;
2815       IW_READVNUMBUF64(k1, c1, step);
2816       kl1 -= step;
2817       if (key2.size == kl1) {
2818         rv = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
2819       } else {
2820         rv = (int) key2.size - (int) kl1;
2821       }
2822 
2823     } else  {
2824       rv = (int) kl2 - (int) kl1;
2825     }
2826   }
2827 
2828 finish:
2829   *res = rv;
2830   if (mm) {
2831     fsm->release_mmap(fsm);
2832   }
2833   return rc;
2834 }
2835 
_dbcache_fill_lw(IWLCTX * lx)2836 static WUR iwrc _dbcache_fill_lw(IWLCTX *lx) {
2837   iwrc rc = 0;
2838   IWDB db = lx->db;
2839   lx->cache_reload = 0;
2840   if (!lx->dblk.addr) {
2841     SBLK *s;
2842     rc = _sblk_at(lx, lx->db->addr, 0, &s);
2843     RCRET(rc);
2844     memcpy(&lx->dblk, s, sizeof(lx->dblk));
2845   }
2846   SBLK *sdb = &lx->dblk;
2847   SBLK *sblk = sdb;
2848   DBCACHE *c = &db->cache;
2849   assert(lx->db->addr == sdb->addr);
2850   c->num = 0;
2851   if (c->nodes) {
2852     free(c->nodes);
2853     c->nodes = 0;
2854   }
2855   if (sdb->lvl < DBCACHE_MIN_LEVEL) {
2856     c->open = true;
2857     return 0;
2858   }
2859   c->lvl = _dbcache_lvl(sdb->lvl);
2860   c->nsize = (lx->db->dbflg & IWDB_VNUM64_KEYS) ? DBCNODE_VNUM_SZ : DBCNODE_STR_SZ;
2861   c->asize = c->nsize * ((1U << DBCACHE_LEVELS) + DBCACHE_ALLOC_STEP);
2862 
2863   size_t nsize = c->nsize;
2864   c->nodes = malloc(c->asize);
2865   if (!c->nodes) {
2866     c->open = false;
2867     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2868   }
2869   blkn_t n;
2870   uint8_t *wp;
2871   size_t num = 0;
2872   while ((n = sblk->n[c->lvl])) {
2873     rc = _sblk_at(lx, BLK2ADDR(n), 0, &sblk);
2874     RCRET(rc);
2875     if (offsetof(DBCNODE, lk) + sblk->lkl > nsize) {
2876       free(c->nodes);
2877       c->nodes = 0;
2878       rc = IWKV_ERROR_CORRUPTED;
2879       iwlog_ecode_error3(rc);
2880       return rc;
2881     }
2882     DBCNODE cn = {
2883       .lkl = sblk->lkl,
2884       .fullkey = (sblk->flags & SBLK_FULL_LKEY),
2885       .k0idx = sblk->pi[0],
2886       .sblkn = ADDR2BLK(sblk->addr),
2887       .kblkn = sblk->kvblkn
2888     };
2889     if (c->asize < nsize * (num + 1)) {
2890       c->asize += (nsize * DBCACHE_ALLOC_STEP);
2891       wp = (uint8_t *) c->nodes;
2892       DBCNODE *nn = realloc(c->nodes, c->asize);
2893       if (!nn) {
2894         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
2895         free(wp);
2896         return rc;
2897       }
2898       c->nodes = nn;
2899     }
2900     wp = (uint8_t *) c->nodes + nsize * num;
2901     memcpy(wp, &cn, offsetof(DBCNODE, lk));
2902     wp += offsetof(DBCNODE, lk);
2903     memcpy(wp, sblk->lk, sblk->lkl);
2904     ++num;
2905   }
2906   c->num = num;
2907   c->open = true;
2908   return 0;
2909 }
2910 
_dbcache_get(IWLCTX * lx)2911 static WUR iwrc _dbcache_get(IWLCTX *lx) {
2912   iwrc rc = 0;
2913   off_t idx;
2914   bool found;
2915   DBCNODE *n;
2916   alignas(DBCNODE) uint8_t dbcbuf[255];
2917   IWDB db = lx->db;
2918   DBCACHE *cache = &db->cache;
2919   const IWKV_val *key = lx->key;
2920   if (lx->nlvl > -1 || cache->num < 1) {
2921     lx->lower = &lx->dblk;
2922     return 0;
2923   }
2924   assert(cache->nodes);
2925   size_t lxksiz = key->size;
2926   if (db->dbflg & IWDB_COMPOUND_KEYS) {
2927     lxksiz += IW_VNUMSIZE(key->compound);
2928   }
2929 
2930   if (sizeof(DBCNODE) + lxksiz <= sizeof(dbcbuf)) {
2931     n = (DBCNODE *) dbcbuf;
2932   } else {
2933     n = malloc(sizeof(DBCNODE) + lxksiz);
2934     if (!n) {
2935       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2936     }
2937   }
2938   n->sblkn = (uint32_t) lxksiz; // `sblkn` used to store key size (to keep DBCNODE compact)
2939   n->kblkn = 0;
2940   n->fullkey = 1;
2941   n->lkl = 0;
2942   n->k0idx = 0;
2943 
2944   uint8_t *wp = (uint8_t *) n + offsetof(DBCNODE, lk);
2945   if (db->dbflg & IWDB_COMPOUND_KEYS) {
2946     size_t step;
2947     char vbuf[IW_VNUMBUFSZ];
2948     IW_SETVNUMBUF(step, vbuf, key->compound);
2949     memcpy(wp, vbuf, step);
2950     wp += step;
2951   }
2952   memcpy(wp, key->data, key->size);
2953 
2954   idx = iwarr_sorted_find2(cache->nodes, cache->num, cache->nsize, n, lx, &found, _dbcache_cmp_nodes);
2955   if (idx > 0) {
2956     DBCNODE *fn = (DBCNODE *)((uint8_t *) cache->nodes + (idx - 1) * cache->nsize);
2957     assert(fn && idx - 1 < cache->num);
2958     rc = _sblk_at(lx, BLK2ADDR(fn->sblkn), 0, &lx->lower);
2959   } else {
2960     lx->lower = &lx->dblk;
2961   }
2962   if ((uint8_t *) n != dbcbuf) {
2963     free(n);
2964   }
2965   return rc;
2966 }
2967 
_dbcache_put_lw(IWLCTX * lx,SBLK * sblk)2968 static WUR iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk) {
2969   off_t idx;
2970   bool found;
2971   IWDB db = lx->db;
2972   alignas(DBCNODE) uint8_t dbcbuf[255];
2973   DBCNODE *n = (DBCNODE *) dbcbuf;
2974   DBCACHE *cache = &db->cache;
2975   size_t nsize = cache->nsize;
2976 
2977   sblk->flags &= ~SBLK_CACHE_PUT;
2978   assert(sizeof(*cache) + sblk->lkl <= sizeof(dbcbuf));
2979   if (sblk->pnum < 1 || sblk->lvl < cache->lvl) {
2980     return 0;
2981   }
2982   if (sblk->lvl >= cache->lvl + DBCACHE_LEVELS || !cache->nodes) { // need to reload full cache
2983     lx->cache_reload = 1;
2984     return 0;
2985   }
2986   if (!sblk->kvblk) {
2987     assert(sblk->kvblk);
2988     return IW_ERROR_INVALID_STATE;
2989   }
2990   n->lkl = sblk->lkl;
2991   n->fullkey = (sblk->flags & SBLK_FULL_LKEY);
2992   n->k0idx = sblk->pi[0];
2993   n->sblkn = ADDR2BLK(sblk->addr);
2994   n->kblkn = sblk->kvblkn;
2995   memcpy((uint8_t *) n + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
2996 
2997   idx = iwarr_sorted_find2(cache->nodes, cache->num, nsize, n, lx, &found, _dbcache_cmp_nodes);
2998   assert(!found);
2999 
3000   if (cache->asize <= cache->num * nsize) {
3001     size_t nsz = cache->asize + (nsize * DBCACHE_ALLOC_STEP);
3002     DBCNODE *nodes = realloc(cache->nodes, nsz);
3003     if (!nodes) {
3004       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3005       free(cache->nodes);
3006       cache->nodes = 0;
3007       return rc;
3008     }
3009     cache->asize = nsz;
3010     cache->nodes = nodes;
3011   }
3012 
3013   uint8_t *cptr = (uint8_t *) cache->nodes;
3014   if (cache->num != idx) {
3015     memmove(cptr + (idx + 1) * nsize, cptr + idx * nsize, (cache->num - idx) * nsize);
3016   }
3017   memcpy(cptr + idx * nsize, n, nsize);
3018   ++cache->num;
3019   return 0;
3020 }
3021 
_dbcache_remove_lw(IWLCTX * lx,SBLK * sblk)3022 static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk) {
3023   IWDB db = lx->db;
3024   DBCACHE *cache = &db->cache;
3025   sblk->flags &= ~SBLK_CACHE_REMOVE;
3026   if (sblk->lvl < cache->lvl || cache->num < 1) {
3027     return;
3028   }
3029   if (cache->lvl > DBCACHE_MIN_LEVEL && lx->dblk.lvl < sblk->lvl) {
3030     // Database level reduced so we need to shift cache down
3031     lx->cache_reload = 1;
3032     return;
3033   }
3034   blkn_t sblkn = ADDR2BLK(sblk->addr);
3035   size_t num = cache->num;
3036   size_t nsize = cache->nsize;
3037   uint8_t *rp = (uint8_t *) cache->nodes;
3038   for (size_t i = 0; i < num; ++i) {
3039     DBCNODE *n = (DBCNODE *)(rp + i * nsize);
3040     if (sblkn == n->sblkn) {
3041       if (i < num - 1) {
3042         memmove(rp + i * nsize, rp + (i + 1) * nsize, (num - i - 1) * nsize);
3043       }
3044       --cache->num;
3045       break;
3046     }
3047   }
3048 }
3049 
_dbcache_update_lw(IWLCTX * lx,SBLK * sblk)3050 static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk) {
3051   IWDB db = lx->db;
3052   DBCACHE *cache = &db->cache;
3053   assert(sblk->pnum > 0);
3054   sblk->flags &= ~SBLK_CACHE_UPDATE;
3055   if (sblk->lvl < cache->lvl || cache->num < 1) {
3056     return;
3057   }
3058   blkn_t sblkn = ADDR2BLK(sblk->addr);
3059   size_t num = cache->num;
3060   size_t nsize = cache->nsize;
3061   uint8_t *rp = (uint8_t *) cache->nodes;
3062   for (size_t i = 0; i < num; ++i) {
3063     DBCNODE *n = (DBCNODE *)(rp + i * nsize);
3064     if (sblkn == n->sblkn) {
3065       n->kblkn = sblk->kvblkn;
3066       n->lkl = sblk->lkl;
3067       n->fullkey = (sblk->flags & SBLK_FULL_LKEY);
3068       n->k0idx = sblk->pi[0];
3069       memcpy((uint8_t *) n + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
3070       break;
3071     }
3072   }
3073 }
3074 
3075 //--------------------------  CURSOR
3076 
_cursor_get_ge_idx(IWLCTX * lx,IWKV_cursor_op op,uint8_t * oidx)3077 IW_INLINE WUR iwrc _cursor_get_ge_idx(IWLCTX *lx, IWKV_cursor_op op, uint8_t *oidx) {
3078   iwrc rc = _lx_find_bounds(lx);
3079   RCRET(rc);
3080   bool found;
3081   uint8_t *mm, idx;
3082   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3083   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3084   RCRET(rc);
3085   rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
3086   RCGO(rc, finish);
3087   rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
3088   RCGO(rc, finish);
3089   if (found) {
3090     *oidx = idx;
3091   } else {
3092     if (op == IWKV_CURSOR_EQ || (lx->lower->flags & SBLK_DB) || lx->lower->pnum < 1) {
3093       rc = IWKV_ERROR_NOTFOUND;
3094     } else {
3095       *oidx = idx ? idx - 1 : idx;
3096     }
3097   }
3098 
3099 finish:
3100   IWRC(fsm->release_mmap(fsm), rc);
3101   return rc;
3102 }
3103 
_cursor_to_lr(IWKV_cursor cur,IWKV_cursor_op op)3104 static WUR iwrc _cursor_to_lr(IWKV_cursor cur, IWKV_cursor_op op) {
3105   iwrc rc = 0;
3106   IWDB db = cur->lx.db;
3107   IWLCTX *lx = &cur->lx;
3108   blkn_t dblk = ADDR2BLK(db->addr);
3109   if (op < IWKV_CURSOR_NEXT) { // IWKV_CURSOR_BEFORE_FIRST | IWKV_CURSOR_AFTER_LAST
3110     if (cur->cn) {
3111       _sblk_release(lx, &cur->cn);
3112     }
3113     if (op == IWKV_CURSOR_BEFORE_FIRST) {
3114       cur->dbaddr = db->addr;
3115       cur->cnpos = KVBLK_IDXNUM - 1;
3116     } else {
3117       cur->dbaddr = -1; // Negative as sign of dbtail
3118       cur->cnpos = 0;
3119     }
3120     return 0;
3121   }
3122 
3123 start:
3124   if (op < IWKV_CURSOR_EQ) { // IWKV_CURSOR_NEXT | IWKV_CURSOR_PREV
3125     blkn_t n = 0;
3126     if (!cur->cn) {
3127       if (cur->dbaddr) {
3128         rc = _sblk_at(lx, (cur->dbaddr < 0 ? 0 : cur->dbaddr), 0, &cur->cn);
3129         cur->dbaddr = 0;
3130         RCGO(rc, finish);
3131       } else {
3132         rc = IWKV_ERROR_NOTFOUND;
3133         goto finish;
3134       }
3135     }
3136     if (op == IWKV_CURSOR_NEXT) {
3137       if (cur->skip_next > 0) {
3138         goto finish;
3139       }
3140       if (cur->cnpos + 1 >= cur->cn->pnum) {
3141         n = cur->cn->n[0];
3142         if (!n) {
3143           rc = IWKV_ERROR_NOTFOUND;
3144           goto finish;
3145         }
3146         _sblk_release(lx, &cur->cn);
3147         rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
3148         RCGO(rc, finish);
3149         cur->cnpos = 0;
3150         if (IW_UNLIKELY(!cur->cn->pnum)) {
3151           goto start;
3152         }
3153       } else {
3154         if (cur->cn->flags & SBLK_DB) {
3155           rc = IWKV_ERROR_NOTFOUND;
3156           goto finish;
3157         }
3158         ++cur->cnpos;
3159       }
3160     } else { // IWKV_CURSOR_PREV
3161       if (cur->skip_next < 0) {
3162         goto finish;
3163       }
3164       if (cur->cnpos == 0) {
3165         n = cur->cn->p0;
3166         if (!n || n == dblk) {
3167           rc = IWKV_ERROR_NOTFOUND;
3168           goto finish;
3169         }
3170         _sblk_release(lx, &cur->cn);
3171         RCGO(rc, finish);
3172         rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
3173         RCGO(rc, finish);
3174         if (IW_LIKELY(cur->cn->pnum)) {
3175           cur->cnpos = cur->cn->pnum - 1;
3176         } else {
3177           goto start;
3178         }
3179       } else {
3180         if (cur->cn->flags & SBLK_DB) {
3181           rc = IWKV_ERROR_NOTFOUND;
3182           goto finish;
3183         }
3184         --cur->cnpos;
3185       }
3186     }
3187   } else { // IWKV_CURSOR_EQ | IWKV_CURSOR_GE
3188     if (!lx->key) {
3189       rc = IW_ERROR_INVALID_STATE;
3190       goto finish;
3191     }
3192     rc = _cursor_get_ge_idx(lx, op, &cur->cnpos);
3193     if (lx->upper) {
3194       _sblk_release(lx, &lx->upper);
3195     }
3196     if (!rc) {
3197       cur->cn = lx->lower;
3198       lx->lower = 0;
3199     }
3200   }
3201 
3202 finish:
3203   cur->skip_next = 0;
3204   if (rc && rc != IWKV_ERROR_NOTFOUND) {
3205     if (cur->cn) _sblk_release(lx, &cur->cn);
3206   }
3207   return rc;
3208 }
3209 
3210 //--------------------------  PUBLIC API
3211 
_kv_ecodefn(locale_t locale,uint32_t ecode)3212 static const char *_kv_ecodefn(locale_t locale, uint32_t ecode) {
3213   if (!(ecode > _IWKV_ERROR_START && ecode < _IWKV_ERROR_END)) {
3214     return 0;
3215   }
3216   switch (ecode) {
3217     case IWKV_ERROR_NOTFOUND:
3218       return "Key not found. (IWKV_ERROR_NOTFOUND)";
3219     case IWKV_ERROR_KEY_EXISTS:
3220       return "Key exists. (IWKV_ERROR_KEY_EXISTS)";
3221     case IWKV_ERROR_MAXKVSZ:
3222       return "Size of Key+value must be not greater than 0xfffffff bytes (IWKV_ERROR_MAXKVSZ)";
3223     case IWKV_ERROR_CORRUPTED:
3224       return "Database file invalid or corrupted (IWKV_ERROR_CORRUPTED)";
3225     case IWKV_ERROR_DUP_VALUE_SIZE:
3226       return "Value size is not compatible for insertion into sorted values array (IWKV_ERROR_DUP_VALUE_SIZE)";
3227     case IWKV_ERROR_KEY_NUM_VALUE_SIZE:
3228       return "Given key is not compatible to store as number (IWKV_ERROR_KEY_NUM_VALUE_SIZE)";
3229     case IWKV_ERROR_INCOMPATIBLE_DB_MODE:
3230       return "Incompatible database open mode (IWKV_ERROR_INCOMPATIBLE_DB_MODE)";
3231     case IWKV_ERROR_INCOMPATIBLE_DB_FORMAT:
3232       return "Incompatible database format version, please migrate database data (IWKV_ERROR_INCOMPATIBLE_DB_FORMAT)";
3233     case IWKV_ERROR_CORRUPTED_WAL_FILE:
3234       return "Corrupted WAL file (IWKV_ERROR_CORRUPTED_WAL_FILE)";
3235     case IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED:
3236       return "Stored value cannot be incremented/descremented (IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED)";
3237     case IWKV_ERROR_WAL_MODE_REQUIRED:
3238       return "Operation requires WAL enabled database. (IWKV_ERROR_WAL_MODE_REQUIRED)";
3239     case IWKV_ERROR_BACKUP_IN_PROGRESS:
3240       return "ackup operation in progress. (IWKV_ERROR_BACKUP_IN_PROGRESS)";
3241     default:
3242       break;
3243   }
3244   return 0;
3245 }
3246 
iwkv_init(void)3247 iwrc iwkv_init(void) {
3248   static int _kv_initialized = 0;
3249   if (!__sync_bool_compare_and_swap(&_kv_initialized, 0, 1)) {
3250     return 0;
3251   }
3252   return iwlog_register_ecodefn(_kv_ecodefn);
3253 }
3254 
_szpolicy(off_t nsize,off_t csize,struct IWFS_EXT * f,void ** _ctx)3255 static off_t _szpolicy(off_t nsize, off_t csize, struct IWFS_EXT *f, void **_ctx) {
3256   off_t res;
3257   size_t aunit = iwp_alloc_unit();
3258   if (csize < 0x4000000) { // Doubled alloc up to 64M
3259     res = csize ? csize : aunit;
3260     while (res < nsize) {
3261       res <<= 1;
3262     }
3263   } else {
3264     res = nsize + 10 * 1024 * 1024; // + 10M extra space
3265   }
3266   res = IW_ROUNDUP(res, aunit);
3267   return res;
3268 }
3269 
iwkv_state(IWKV iwkv,IWFS_FSM_STATE * out)3270 iwrc iwkv_state(IWKV iwkv, IWFS_FSM_STATE *out) {
3271   if (!iwkv || !out) {
3272     return IW_ERROR_INVALID_ARGS;
3273   }
3274   int rci;
3275   API_RLOCK(iwkv, rci);
3276   IWFS_FSM fsm = iwkv->fsm;
3277   iwrc rc = fsm.state(&fsm, out);
3278   API_UNLOCK(iwkv, rci, rc);
3279   return rc;
3280 }
3281 
iwkv_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)3282 iwrc iwkv_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
3283   return iwal_online_backup(iwkv, ts, target_file);
3284 }
3285 
_iwkv_check_online_backup(const char * path,iwp_lockmode extra_lock_flags,bool * out_has_online_bkp)3286 static iwrc _iwkv_check_online_backup(const char *path, iwp_lockmode extra_lock_flags, bool *out_has_online_bkp) {
3287   size_t sp;
3288   uint32_t lv;
3289   off_t fsz, pos;
3290   uint64_t waloff; // WAL offset
3291   char buf[16384];
3292 
3293   *out_has_online_bkp = false;
3294   const size_t aunit = iwp_alloc_unit();
3295   char *wpath = 0;
3296 
3297   IWFS_FILE f = {0}, w = {0};
3298   IWFS_FILE_STATE fs, fw;
3299   iwrc rc = iwfs_file_open(&f, &(IWFS_FILE_OPTS) {
3300     .path = path,
3301     .omode = IWFS_OREAD | IWFS_OWRITE,
3302     .lock_mode = IWP_WLOCK | extra_lock_flags
3303   });
3304   if (rc == IW_ERROR_NOT_EXISTS) {
3305     return 0;
3306   }
3307   RCRET(rc);
3308 
3309   rc = f.state(&f, &fs);
3310   RCGO(rc, finish);
3311 
3312   rc = iwp_lseek(fs.fh, 0, IWP_SEEK_END, &fsz);
3313   RCGO(rc, finish);
3314   if (fsz < iwp_alloc_unit()) {
3315     goto finish;
3316   }
3317 
3318   rc = iwp_pread(fs.fh, 0, &lv, sizeof(lv), &sp);
3319   RCGO(rc, finish);
3320   lv = IW_ITOHL(lv);
3321   if (sp != sizeof(lv) || lv != IWFSM_MAGICK) {
3322     goto finish;
3323   }
3324 
3325   rc = iwp_pread(fs.fh, IWFSM_CUSTOM_HDR_DATA_OFFSET, &lv, sizeof(lv), &sp);
3326   RCGO(rc, finish);
3327   lv = IW_ITOHL(lv);
3328   if (sp != sizeof(lv) || lv != IWKV_MAGIC) {
3329     goto finish;
3330   }
3331 
3332   rc = iwp_lseek(fs.fh, (off_t) -1 * sizeof(lv), IWP_SEEK_END, 0);
3333   RCGO(rc, finish);
3334 
3335   rc = iwp_read(fs.fh, &lv, sizeof(lv), &sp);
3336   RCGO(rc, finish);
3337   lv = IW_ITOHL(lv);
3338   if (sp != sizeof(lv) || lv != IWKV_BACKUP_MAGIC) {
3339     goto finish;
3340   }
3341 
3342   // Get WAL data offset
3343   rc = iwp_lseek(fs.fh, (off_t) -1 * (sizeof(waloff) + sizeof(lv)), IWP_SEEK_END, &pos);
3344   RCGO(rc, finish);
3345 
3346   rc = iwp_read(fs.fh, &waloff, sizeof(waloff), &sp);
3347   RCGO(rc, finish);
3348 
3349   waloff = IW_ITOHLL(waloff);
3350   if ((waloff != pos && waloff > pos - sizeof(WBSEP)) || (waloff & (aunit - 1))) {
3351     goto finish;
3352   }
3353 
3354   // Read the first WAL instruction: WBSEP
3355   if (waloff != pos) { // Not an empty WAL?
3356     WBSEP wbsep = {0};
3357     rc = iwp_pread(fs.fh, waloff, &wbsep, sizeof(wbsep), &sp);
3358     RCGO(rc, finish);
3359     if (wbsep.id != WOP_SEP) {
3360       goto finish;
3361     }
3362   }
3363 
3364   // Now we have an online backup image, unpack WAL file
3365 
3366   sp = strlen(path);
3367   wpath = malloc(sp + 4 /*-wal*/ + 1 /*\0*/);
3368   if (!wpath) {
3369     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3370     goto finish;
3371   }
3372   memcpy(wpath, path, sp);
3373   memcpy(wpath + sp, "-wal", 4);
3374   wpath[sp + 4] = '\0';
3375 
3376   iwlog_warn("Unpacking WAL from online backup into: %s", wpath);
3377   *out_has_online_bkp = true;
3378 
3379   // WAL file
3380   rc = iwfs_file_open(&w, &(IWFS_FILE_OPTS) {
3381     .path = wpath,
3382     .omode = IWFS_OREAD | IWFS_OWRITE | IWFS_OTRUNC
3383   });
3384   RCGO(rc, finish);
3385 
3386   rc = w.state(&w, &fw);
3387   RCGO(rc, finish);
3388 
3389   // WAL content copy
3390   rc = iwp_lseek(fs.fh, waloff, IWP_SEEK_SET, 0);
3391   RCGO(rc, finish);
3392   fsz = fsz - waloff - sizeof(lv) /* magic */ - sizeof(waloff) /* wal offset */;
3393   if (fsz > 0) {
3394     sp = 0;
3395     do {
3396       rc = iwp_read(fs.fh, buf, sizeof(buf), &sp);
3397       RCGO(rc, finish);
3398       if (sp > fsz) {
3399         sp = fsz;
3400       }
3401       fsz -= sp;
3402       rc = iwp_write(fw.fh, buf, sp);
3403       RCGO(rc, finish);
3404     } while (fsz > 0 && sp > 0);
3405   }
3406   rc = iwp_fsync(fw.fh);
3407   RCGO(rc, finish);
3408 
3409   rc = iwp_ftruncate(fs.fh, waloff);
3410   RCGO(rc, finish);
3411 
3412   rc = iwp_fsync(fs.fh);
3413   RCGO(rc, finish);
3414 
3415 finish:
3416   if (f.impl) {
3417     IWRC(f.close(&f), rc);
3418   }
3419   if (w.impl) {
3420     IWRC(w.close(&w), rc);
3421   }
3422   free(wpath);
3423   return rc;
3424 }
3425 
iwkv_open(const IWKV_OPTS * opts,IWKV * iwkvp)3426 iwrc iwkv_open(const IWKV_OPTS *opts, IWKV *iwkvp) {
3427   if (!opts || !iwkvp || !opts->path) {
3428     return IW_ERROR_INVALID_ARGS;
3429   }
3430   *iwkvp = 0;
3431   int rci;
3432   iwrc rc = 0;
3433   uint32_t lv;
3434   uint64_t llv;
3435   uint8_t *rp, *mm;
3436   bool has_online_bkp = false;
3437 
3438   rc = iw_init();
3439   RCRET(rc);
3440 
3441   if (opts->random_seed) {
3442     iwu_rand_seed(opts->random_seed);
3443   }
3444   iwkv_openflags oflags = opts->oflags;
3445   iwfs_omode omode = IWFS_OREAD;
3446   if (oflags & IWKV_TRUNC) {
3447     oflags &= ~IWKV_RDONLY;
3448     omode |= IWFS_OTRUNC;
3449   }
3450   if (!(oflags & IWKV_RDONLY)) {
3451     omode |= IWFS_OWRITE;
3452     omode |= IWFS_OCREATE;
3453   }
3454   if ((omode & IWFS_OWRITE) && !(omode & IWFS_OTRUNC)) {
3455     iwp_lockmode extra_lock_flags = 0;
3456     if (opts->file_lock_fail_fast) {
3457       extra_lock_flags |= IWP_NBLOCK;
3458     }
3459     rc = _iwkv_check_online_backup(opts->path, extra_lock_flags, &has_online_bkp);
3460     RCRET(rc);
3461   }
3462 
3463   *iwkvp = calloc(1, sizeof(struct _IWKV));
3464   if (!*iwkvp) {
3465     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
3466   }
3467   IWKV iwkv = *iwkvp;
3468   iwkv->fmt_version = opts->fmt_version > 0 ? opts->fmt_version : IWKV_FORMAT;
3469   if (iwkv->fmt_version > IWKV_FORMAT) {
3470     rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3471     iwlog_ecode_error3(rc);
3472     return rc;
3473   }
3474   // Adjust lower key len accourding to database format version
3475   if (iwkv->fmt_version < 2) {
3476     iwkv->pklen = PREFIX_KEY_LEN_V1;
3477   } else {
3478     iwkv->pklen = PREFIX_KEY_LEN_V2;
3479   }
3480 
3481   pthread_rwlockattr_t attr;
3482   pthread_rwlockattr_init(&attr);
3483 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
3484   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3485 #endif
3486   rci = pthread_rwlock_init(&iwkv->rwl, &attr);
3487   if (rci) {
3488     free(*iwkvp);
3489     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3490   }
3491   rci = pthread_mutex_init(&iwkv->wk_mtx, 0);
3492   if (rci) {
3493     pthread_rwlock_destroy(&iwkv->rwl);
3494     free(*iwkvp);
3495     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3496   }
3497   rci = pthread_cond_init(&iwkv->wk_cond, 0);
3498   if (rci) {
3499     pthread_rwlock_destroy(&iwkv->rwl);
3500     pthread_mutex_destroy(&iwkv->wk_mtx);
3501     free(*iwkvp);
3502     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3503   }
3504 
3505   iwkv->oflags = oflags;
3506   IWFS_FSM_STATE fsmstate;
3507   IWFS_FSM_OPTS fsmopts = {
3508     .exfile = {
3509       .file = {
3510         .path       = opts->path,
3511         .omode      = omode,
3512         .lock_mode  = (oflags & IWKV_RDONLY) ? IWP_RLOCK : IWP_WLOCK
3513       },
3514       .rspolicy     = _szpolicy,
3515       .maxoff       = IWKV_MAX_DBSZ,
3516       .use_locks    = true
3517     },
3518     .bpow = IWKV_FSM_BPOW,      // 64 bytes block size
3519     .hdrlen = KVHDRSZ,          // Size of custom file header
3520     .oflags = ((oflags & IWKV_RDONLY) ? IWFSM_NOLOCKS : 0),
3521     .mmap_all = true
3522   };
3523 #ifndef NDEBUG
3524   fsmopts.oflags |= IWFSM_STRICT;
3525 #endif
3526   if (oflags & IWKV_NO_TRIM_ON_CLOSE) {
3527     fsmopts.oflags |= IWFSM_NO_TRIM_ON_CLOSE;
3528   }
3529   if (opts->file_lock_fail_fast) {
3530     fsmopts.exfile.file.lock_mode |= IWP_NBLOCK;
3531   }
3532   // Init WAL
3533   rc = iwal_create(iwkv, opts, &fsmopts, has_online_bkp);
3534   RCGO(rc, finish);
3535 
3536   // Now open database file
3537   rc = iwfs_fsmfile_open(&iwkv->fsm, &fsmopts);
3538   RCGO(rc, finish);
3539 
3540   IWFS_FSM *fsm = &iwkv->fsm;
3541   iwkv->dbs = kh_init(DBS);
3542   rc = fsm->state(fsm, &fsmstate);
3543   RCGO(rc, finish);
3544 
3545   // Database header: [magic:u4, first_addr:u8, db_format_version:u4]
3546   if (fsmstate.exfile.file.ostatus & IWFS_OPEN_NEW) {
3547     uint8_t hdr[KVHDRSZ] = {0};
3548     uint8_t *wp = hdr;
3549     IW_WRITELV(wp, lv, IWKV_MAGIC);
3550     wp += sizeof(llv); // skip first db addr
3551     IW_WRITELV(wp, lv, iwkv->fmt_version);
3552     rc = fsm->writehdr(fsm, 0, hdr, sizeof(hdr));
3553     RCGO(rc, finish);
3554     rc = fsm->sync(fsm, 0);
3555     RCGO(rc, finish);
3556   } else {
3557     off_t dbaddr; // first database address
3558     uint8_t hdr[KVHDRSZ];
3559     rc = fsm->readhdr(fsm, 0, hdr, KVHDRSZ);
3560     RCGO(rc, finish);
3561     rp = hdr; // -V507
3562     IW_READLV(rp, lv, lv);
3563     IW_READLLV(rp, llv, dbaddr);
3564     if (lv != IWKV_MAGIC || dbaddr < 0) {
3565       rc = IWKV_ERROR_CORRUPTED;
3566       iwlog_ecode_error3(rc);
3567       goto finish;
3568     }
3569     IW_READLV(rp, lv, iwkv->fmt_version);
3570     if ((iwkv->fmt_version > IWKV_FORMAT)) {
3571       rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3572       iwlog_ecode_error3(rc);
3573       goto finish;
3574     }
3575     if (iwkv->fmt_version < 2) {
3576       iwkv->pklen = PREFIX_KEY_LEN_V1;
3577     } else {
3578       iwkv->pklen = PREFIX_KEY_LEN_V2;
3579     }
3580     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3581     RCGO(rc, finish);
3582     rc = _db_load_chain(iwkv, dbaddr, mm);
3583     fsm->release_mmap(fsm);
3584   }
3585   (*iwkvp)->open = true;
3586 
3587 finish:
3588   if (rc) {
3589     (*iwkvp)->open = true; // will be closed in iwkv_close
3590     IWRC(iwkv_close(iwkvp), rc);
3591   }
3592   return rc;
3593 }
3594 
iwkv_exclusive_lock(IWKV iwkv)3595 iwrc iwkv_exclusive_lock(IWKV iwkv) {
3596   return _wnw(iwkv, _wnw_iwkw_wl);
3597 }
3598 
iwkv_exclusive_unlock(IWKV iwkv)3599 iwrc iwkv_exclusive_unlock(IWKV iwkv) {
3600   int rci;
3601   iwrc rc = 0;
3602   API_UNLOCK(iwkv, rci, rc);
3603   return rc;
3604 }
3605 
iwkv_close(IWKV * iwkvp)3606 iwrc iwkv_close(IWKV *iwkvp) {
3607   ENSURE_OPEN((*iwkvp));
3608   IWKV iwkv = *iwkvp;
3609   iwkv->open = false;
3610   iwal_shutdown(iwkv);
3611   iwrc rc = iwkv_exclusive_lock(iwkv);
3612   RCRET(rc);
3613   IWDB db = iwkv->first_db;
3614   while (db) {
3615     IWDB ndb = db->next;
3616     _db_release_lw(&db);
3617     db = ndb;
3618   }
3619   IWRC(iwkv->fsm.close(&iwkv->fsm), rc);
3620   // Below the memory cleanup only
3621   if (iwkv->dbs) {
3622     kh_destroy(DBS, iwkv->dbs);
3623     iwkv->dbs = 0;
3624   }
3625   iwkv_exclusive_unlock(iwkv);
3626   pthread_rwlock_destroy(&iwkv->rwl);
3627   pthread_mutex_destroy(&iwkv->wk_mtx);
3628   pthread_cond_destroy(&iwkv->wk_cond);
3629   free(iwkv);
3630   *iwkvp = 0;
3631   return rc;
3632 }
3633 
_iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3634 static iwrc _iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3635   ENSURE_OPEN(iwkv);
3636   if (iwkv->oflags & IWKV_RDONLY) {
3637     return IW_ERROR_READONLY;
3638   }
3639   iwrc rc;
3640   if (iwkv->dlsnr) {
3641     rc = iwal_poke_savepoint(iwkv);
3642   } else {
3643     IWFS_FSM *fsm = &iwkv->fsm;
3644     pthread_rwlock_wrlock(&iwkv->rwl);
3645     iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3646     rc = fsm->sync(fsm, flags);
3647     pthread_rwlock_unlock(&iwkv->rwl);
3648   }
3649   return rc;
3650 }
3651 
iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3652 iwrc iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3653   ENSURE_OPEN(iwkv);
3654   if (iwkv->oflags & IWKV_RDONLY) {
3655     return IW_ERROR_READONLY;
3656   }
3657   iwrc rc;
3658   if (iwkv->dlsnr) {
3659     rc = iwkv_exclusive_lock(iwkv);
3660     RCRET(rc);
3661     rc = iwal_savepoint_exl(iwkv, true);
3662     iwkv_exclusive_unlock(iwkv);
3663   } else {
3664     IWFS_FSM *fsm = &iwkv->fsm;
3665     pthread_rwlock_wrlock(&iwkv->rwl);
3666     iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3667     rc = fsm->sync(fsm, flags);
3668     pthread_rwlock_unlock(&iwkv->rwl);
3669   }
3670   return rc;
3671 }
3672 
iwkv_db(IWKV iwkv,uint32_t dbid,iwdb_flags_t dbflg,IWDB * dbp)3673 iwrc iwkv_db(IWKV iwkv, uint32_t dbid, iwdb_flags_t dbflg, IWDB *dbp) {
3674   int rci;
3675   iwrc rc = 0;
3676   IWDB db = 0;
3677   *dbp = 0;
3678   API_RLOCK(iwkv, rci);
3679   khiter_t ki = kh_get(DBS, iwkv->dbs, dbid);
3680   if (ki != kh_end(iwkv->dbs)) {
3681     db = kh_value(iwkv->dbs, ki);
3682   }
3683   API_UNLOCK(iwkv, rci, rc);
3684   RCRET(rc);
3685   if (db) {
3686     if (db->dbflg != dbflg) {
3687       return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3688     }
3689     *dbp = db;
3690     return 0;
3691   }
3692   if (iwkv->oflags & IWKV_RDONLY) {
3693     return IW_ERROR_READONLY;
3694   }
3695   rc = iwkv_exclusive_lock(iwkv);
3696   RCRET(rc);
3697   ki = kh_get(DBS, iwkv->dbs, dbid);
3698   if (ki != kh_end(iwkv->dbs)) {
3699     db = kh_value(iwkv->dbs, ki);
3700   }
3701   if (db) {
3702     if (db->dbflg != dbflg) {
3703       return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3704     }
3705     *dbp = db;
3706   } else {
3707     rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3708   }
3709   if (!rc) {
3710     rc = iwal_savepoint_exl(iwkv, true);
3711   }
3712   iwkv_exclusive_unlock(iwkv);
3713   return rc;
3714 }
3715 
iwkv_new_db(IWKV iwkv,iwdb_flags_t dbflg,uint32_t * dbidp,IWDB * dbp)3716 iwrc iwkv_new_db(IWKV iwkv, iwdb_flags_t dbflg, uint32_t *dbidp, IWDB *dbp) {
3717   *dbp = 0;
3718   *dbidp = 0;
3719   if (iwkv->oflags & IWKV_RDONLY) {
3720     return IW_ERROR_READONLY;
3721   }
3722   uint32_t dbid = 0;
3723   iwrc rc = iwkv_exclusive_lock(iwkv);
3724   RCRET(rc);
3725   for (khiter_t k = kh_begin(iwkv->dbs); k != kh_end(iwkv->dbs); ++k) {
3726     if (!kh_exist(iwkv->dbs, k)) continue;
3727     uint32_t id = kh_key(iwkv->dbs, k);
3728     if (id > dbid) dbid = id;
3729   }
3730   dbid++;
3731   rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3732   if (!rc) {
3733     *dbidp = dbid;
3734     rc = iwal_savepoint_exl(iwkv, true);
3735   }
3736   iwkv_exclusive_unlock(iwkv);
3737   return rc;
3738 }
3739 
iwkv_db_cache_release(IWDB db)3740 iwrc iwkv_db_cache_release(IWDB db) {
3741   if (!db || !db->iwkv) {
3742     return IW_ERROR_INVALID_ARGS;
3743   }
3744   int rci;
3745   iwrc rc = 0;
3746   API_DB_WLOCK(db, rci);
3747   _dbcache_destroy_lw(db);
3748   API_DB_UNLOCK(db, rci, rc);
3749   return rc;
3750 }
3751 
iwkv_db_destroy(IWDB * dbp)3752 iwrc iwkv_db_destroy(IWDB *dbp) {
3753   if (!dbp || !*dbp) {
3754     return IW_ERROR_INVALID_ARGS;
3755   }
3756   IWDB db = *dbp;
3757   IWKV iwkv = db->iwkv;
3758   *dbp = 0;
3759   if (iwkv->oflags & IWKV_RDONLY) {
3760     return IW_ERROR_READONLY;
3761   }
3762   iwrc rc = iwkv_exclusive_lock(iwkv);
3763   RCRET(rc);
3764   rc = _db_destroy_lw(&db);
3765   iwkv_exclusive_unlock(iwkv);
3766   return rc;
3767 }
3768 
iwkv_puth(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3769 iwrc iwkv_puth(IWDB db, const IWKV_val *key, const IWKV_val *val,
3770                iwkv_opflags opflags, IWKV_PUT_HANDLER ph, void *phop) {
3771   if (!db || !db->iwkv || !key || !key->size || !val) {
3772     return IW_ERROR_INVALID_ARGS;
3773   }
3774   IWKV iwkv = db->iwkv;
3775   if (iwkv->oflags & IWKV_RDONLY) {
3776     return IW_ERROR_READONLY;
3777   }
3778   if (opflags & IWKV_VAL_INCREMENT) {
3779     // No overwrite for increment
3780     opflags &= ~IWKV_NO_OVERWRITE;
3781   }
3782 
3783   int rci;
3784   IWKV_val ekey;
3785   uint8_t nbuf[IW_VNUMBUFSZ];
3786   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3787   RCRET(rc);
3788 
3789   IWLCTX lx = {
3790     .db = db,
3791     .key = &ekey,
3792     .val = (IWKV_val *) val,
3793     .nlvl = -1,
3794     .op = IWLCTX_PUT,
3795     .opflags = opflags,
3796     .ph = ph,
3797     .phop = phop
3798   };
3799   API_DB_WLOCK(db, rci);
3800   if (!db->cache.open) {
3801     rc = _dbcache_fill_lw(&lx);
3802     RCGO(rc, finish);
3803   }
3804   rc = _lx_put_lw(&lx);
3805 
3806 finish:
3807   API_DB_UNLOCK(db, rci, rc);
3808   if (!rc) {
3809     if (lx.opflags & IWKV_SYNC) {
3810       rc = _iwkv_sync(iwkv, 0);
3811     } else {
3812       rc = iwal_poke_checkpoint(iwkv, false);
3813     }
3814   }
3815   return rc;
3816 }
3817 
iwkv_put(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags)3818 iwrc iwkv_put(IWDB db, const IWKV_val *key, const IWKV_val *val, iwkv_opflags opflags) {
3819   return iwkv_puth(db, key, val, opflags, 0, 0);
3820 }
3821 
iwkv_get(IWDB db,const IWKV_val * key,IWKV_val * oval)3822 iwrc iwkv_get(IWDB db, const IWKV_val *key, IWKV_val *oval) {
3823   if (!db || !db->iwkv || !key || !oval) {
3824     return IW_ERROR_INVALID_ARGS;
3825   }
3826 
3827   int rci;
3828   IWKV_val ekey;
3829   uint8_t nbuf[IW_VNUMBUFSZ];
3830   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3831   RCRET(rc);
3832 
3833   IWLCTX lx = {
3834     .db = db,
3835     .key = &ekey,
3836     .val = oval,
3837     .nlvl = -1
3838   };
3839   oval->size = 0;
3840   if (IW_LIKELY(db->cache.open)) {
3841     API_DB_RLOCK(db, rci);
3842   } else {
3843     API_DB_WLOCK(db, rci);
3844     if (!db->cache.open) { // -V547
3845       rc = _dbcache_fill_lw(&lx);
3846       RCGO(rc, finish);
3847     }
3848   }
3849   rc = _lx_get_lr(&lx);
3850 
3851 finish:
3852   API_DB_UNLOCK(db, rci, rc);
3853   return rc;
3854 }
3855 
iwkv_get_copy(IWDB db,const IWKV_val * key,void * vbuf,size_t vbufsz,size_t * vsz)3856 iwrc iwkv_get_copy(IWDB db, const IWKV_val *key, void *vbuf, size_t vbufsz, size_t *vsz) {
3857   if (!db || !db->iwkv || !key || !vbuf) {
3858     return IW_ERROR_INVALID_ARGS;
3859   }
3860   *vsz = 0;
3861 
3862   int rci;
3863   bool found;
3864   IWKV_val ekey;
3865   uint32_t ovalsz;
3866   uint8_t *mm = 0, *oval, idx;
3867   IWFS_FSM *fsm = &db->iwkv->fsm;
3868   uint8_t nbuf[IW_VNUMBUFSZ];
3869   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3870   RCRET(rc);
3871 
3872   IWLCTX lx = {
3873     .db = db,
3874     .key = &ekey,
3875     .nlvl = -1
3876   };
3877   if (IW_LIKELY(db->cache.open)) {
3878     API_DB_RLOCK(db, rci);
3879   } else {
3880     API_DB_WLOCK(db, rci);
3881     if (!db->cache.open) { // -V547
3882       rc = _dbcache_fill_lw(&lx);
3883       RCGO(rc, finish);
3884     }
3885   }
3886   rc = _lx_find_bounds(&lx);
3887   RCGO(rc, finish);
3888   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3889   RCGO(rc, finish);
3890   rc = _sblk_loadkvblk_mm(&lx, lx.lower, mm);
3891   RCGO(rc, finish);
3892   rc = _sblk_find_pi_mm(lx.lower, &lx, mm, &found, &idx);
3893   RCGO(rc, finish);
3894   if (found) {
3895     _kvblk_value_peek(lx.lower->kvblk, lx.lower->pi[idx], mm, &oval, &ovalsz);
3896     *vsz = ovalsz;
3897     memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3898   } else {
3899     rc = IWKV_ERROR_NOTFOUND;
3900   }
3901 
3902 finish:
3903   if (mm) {
3904     IWRC(fsm->release_mmap(fsm), rc);
3905   }
3906   _lx_release_mm(&lx, 0);
3907   API_DB_UNLOCK(db, rci, rc);
3908   return rc;
3909 }
3910 
iwkv_db_set_meta(IWDB db,void * buf,size_t sz)3911 iwrc iwkv_db_set_meta(IWDB db, void *buf, size_t sz) {
3912   if (!db || !db->iwkv || !buf) {
3913     return IW_ERROR_INVALID_ARGS;
3914   }
3915   if (!sz) {
3916     return 0;
3917   }
3918 
3919   int rci;
3920   iwrc rc = 0;
3921   bool resized = false;
3922   uint8_t *mm = 0, *wp, *sp;
3923   IWFS_FSM *fsm = &db->iwkv->fsm;
3924   size_t asz = IW_ROUNDUP(sz, 1U << IWKV_FSM_BPOW);
3925 
3926   API_DB_WLOCK(db, rci);
3927   if (asz > db->meta_blkn || asz * 2 <= db->meta_blkn) {
3928     off_t oaddr = 0;
3929     off_t olen = 0;
3930     if (db->meta_blk) {
3931       rc = fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn));
3932       RCGO(rc, finish);
3933     }
3934     rc = fsm->allocate(fsm, asz, &oaddr, &olen, IWKV_FSM_ALLOC_FLAGS);
3935     RCGO(rc, finish);
3936     db->meta_blk = ADDR2BLK(oaddr);
3937     db->meta_blkn = ADDR2BLK(olen);
3938     resized = true;
3939   }
3940   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3941   RCGO(rc, finish);
3942   wp = mm + BLK2ADDR(db->meta_blk);
3943   memcpy(wp, buf, sz);
3944   if (db->iwkv->dlsnr) {
3945     rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, wp - mm, wp, sz, 0);
3946     RCGO(rc, finish);
3947   }
3948   if (resized) {
3949     uint32_t lv;
3950     wp = mm + db->addr + DOFF_METABLK_U4;
3951     sp = wp;
3952     IW_WRITELV(wp, lv, db->meta_blk);
3953     IW_WRITELV(wp, lv, db->meta_blkn);
3954     if (db->iwkv->dlsnr) {
3955       rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, sp - mm, sp, wp - sp, 0);
3956       RCGO(rc, finish);
3957     }
3958   }
3959   fsm->release_mmap(fsm);
3960   mm = 0;
3961 
3962 finish:
3963   if (mm) {
3964     fsm->release_mmap(fsm);
3965   }
3966   API_DB_UNLOCK(db, rci, rc);
3967   return rc;
3968 }
3969 
iwkv_db_get_meta(IWDB db,void * buf,size_t sz,size_t * rsz)3970 iwrc iwkv_db_get_meta(IWDB db, void *buf, size_t sz, size_t *rsz) {
3971   if (!db || !db->iwkv || !buf) {
3972     return IW_ERROR_INVALID_ARGS;
3973   }
3974   *rsz = 0;
3975   if (!sz || !db->meta_blkn) {
3976     return 0;
3977   }
3978   int rci;
3979   iwrc rc = 0;
3980   uint8_t *mm = 0;
3981   IWFS_FSM *fsm = &db->iwkv->fsm;
3982   size_t rmax = BLK2ADDR(db->meta_blkn);
3983   if (sz > rmax) {
3984     sz = rmax;
3985   }
3986   API_DB_RLOCK(db, rci);
3987   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3988   RCGO(rc, finish);
3989   memcpy(buf, mm + BLK2ADDR(db->meta_blk), sz);
3990   *rsz = sz;
3991 
3992 finish:
3993   if (mm) {
3994     fsm->release_mmap(fsm);
3995   }
3996   API_DB_UNLOCK(db, rci, rc);
3997   return rc;
3998 }
3999 
iwkv_del(IWDB db,const IWKV_val * key,iwkv_opflags opflags)4000 iwrc iwkv_del(IWDB db, const IWKV_val *key, iwkv_opflags opflags) {
4001   if (!db || !db->iwkv || !key) {
4002     return IW_ERROR_INVALID_ARGS;
4003   }
4004   int rci;
4005   IWKV_val ekey;
4006   IWKV iwkv = db->iwkv;
4007 
4008   uint8_t nbuf[IW_VNUMBUFSZ];
4009   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
4010   RCRET(rc);
4011   IWLCTX lx = {
4012     .db = db,
4013     .key = &ekey,
4014     .nlvl = -1,
4015     .op = IWLCTX_DEL,
4016     .opflags = opflags
4017   };
4018   API_DB_WLOCK(db, rci);
4019   if (!db->cache.open) {
4020     rc = _dbcache_fill_lw(&lx);
4021     RCGO(rc, finish);
4022   }
4023   rc = _lx_del_lw(&lx);
4024 
4025 finish:
4026   API_DB_UNLOCK(db, rci, rc);
4027   if (!rc) {
4028     if (lx.opflags & IWKV_SYNC) {
4029       rc = _iwkv_sync(iwkv, 0);
4030     } else {
4031       rc = iwal_poke_checkpoint(iwkv, false);
4032     }
4033   }
4034   return rc;
4035 }
4036 
_cursor_close_lw(IWKV_cursor cur)4037 IW_INLINE iwrc _cursor_close_lw(IWKV_cursor cur) {
4038   iwrc rc = 0;
4039   cur->closed = true;
4040   IWDB db = cur->lx.db;
4041   pthread_spin_lock(&db->cursors_slk);
4042   for (IWKV_cursor c = db->cursors, pc = 0; c; pc = c, c = c->next) {
4043     if (c == cur) {
4044       if (pc) {
4045         pc->next = c->next;
4046       } else {
4047         db->cursors = c->next;
4048       }
4049       break;
4050     }
4051   }
4052   pthread_spin_unlock(&db->cursors_slk);
4053   return rc;
4054 }
4055 
iwkv_cursor_open(IWDB db,IWKV_cursor * curptr,IWKV_cursor_op op,const IWKV_val * key)4056 iwrc iwkv_cursor_open(IWDB db,
4057                       IWKV_cursor *curptr,
4058                       IWKV_cursor_op op,
4059                       const IWKV_val *key) {
4060   if (!db || !db->iwkv || !curptr ||
4061       (key && op < IWKV_CURSOR_EQ) || op < IWKV_CURSOR_BEFORE_FIRST) {
4062     return IW_ERROR_INVALID_ARGS;
4063   }
4064   iwrc rc;
4065   int rci;
4066   rc = _db_worker_inc_nolk(db);
4067   RCRET(rc);
4068   if (IW_LIKELY(db->cache.open)) {
4069     rc = _api_db_rlock(db);
4070   } else {
4071     rc = _api_db_wlock(db);
4072   }
4073   if (rc) {
4074     _db_worker_dec_nolk(db);
4075     return rc;
4076   }
4077   IWKV_cursor cur = 0;
4078   *curptr = calloc(1, sizeof(**curptr));
4079   if (!(*curptr)) {
4080     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
4081     goto finish;
4082   }
4083   cur = *curptr;
4084   IWLCTX *lx = &cur->lx;
4085   if (key) {
4086     rc = _to_effective_key(db, key, &lx->ekey, lx->nbuf);
4087     RCGO(rc, finish);
4088     lx->key = &lx->ekey;
4089   }
4090   lx->db = db;
4091   lx->nlvl = -1;
4092   if (!db->cache.open) {
4093     rc = _dbcache_fill_lw(lx);
4094     RCGO(rc, finish);
4095   }
4096   rc = _cursor_to_lr(cur, op);
4097 
4098 finish:
4099   if (cur) {
4100     if (rc) {
4101       *curptr = 0;
4102       IWRC(_cursor_close_lw(cur), rc);
4103       free(cur);
4104     } else {
4105       pthread_spin_lock(&db->cursors_slk);
4106       cur->next = db->cursors;
4107       db->cursors = cur;
4108       pthread_spin_unlock(&db->cursors_slk);
4109     }
4110   }
4111   API_DB_UNLOCK(db, rci, rc);
4112   if (rc) {
4113     _db_worker_dec_nolk(db);
4114   }
4115   return rc;
4116 }
4117 
iwkv_cursor_close(IWKV_cursor * curp)4118 iwrc iwkv_cursor_close(IWKV_cursor *curp) {
4119   iwrc rc = 0;
4120   int rci;
4121   if (!curp || !*curp) {
4122     return 0;
4123   }
4124   IWKV_cursor cur = *curp;
4125   *curp = 0;
4126   IWKV iwkv = cur->lx.db->iwkv;
4127   if (cur->closed) {
4128     free(cur);
4129     return 0;
4130   }
4131   if (!cur->lx.db) {
4132     return IW_ERROR_INVALID_ARGS;
4133   }
4134   API_DB_WLOCK(cur->lx.db, rci);
4135   rc = _cursor_close_lw(cur);
4136   API_DB_UNLOCK(cur->lx.db, rci, rc);
4137   IWRC(_db_worker_dec_nolk(cur->lx.db), rc);
4138   free(cur);
4139   if (!rc) {
4140     rc = iwal_poke_checkpoint(iwkv, false);
4141   }
4142   return rc;
4143 }
4144 
iwkv_cursor_to(IWKV_cursor cur,IWKV_cursor_op op)4145 iwrc iwkv_cursor_to(IWKV_cursor cur, IWKV_cursor_op op) {
4146   int rci;
4147   if (!cur) {
4148     return IW_ERROR_INVALID_ARGS;
4149   }
4150   if (!cur->lx.db) {
4151     return IW_ERROR_INVALID_ARGS;
4152   }
4153   API_DB_RLOCK(cur->lx.db, rci);
4154   iwrc rc = _cursor_to_lr(cur, op);
4155   API_DB_UNLOCK(cur->lx.db, rci, rc);
4156   return rc;
4157 }
4158 
iwkv_cursor_to_key(IWKV_cursor cur,IWKV_cursor_op op,const IWKV_val * key)4159 iwrc iwkv_cursor_to_key(IWKV_cursor cur, IWKV_cursor_op op, const IWKV_val *key) {
4160   int rci;
4161   if (!cur || (op != IWKV_CURSOR_EQ && op != IWKV_CURSOR_GE)) {
4162     return IW_ERROR_INVALID_ARGS;
4163   }
4164   IWLCTX *lx = &cur->lx;
4165   if (!lx->db) {
4166     return IW_ERROR_INVALID_STATE;
4167   }
4168   iwrc rc = _to_effective_key(lx->db, key, &lx->ekey, lx->nbuf);
4169   RCRET(rc);
4170 
4171   API_DB_RLOCK(lx->db, rci);
4172   lx->key = &lx->ekey;
4173   rc = _cursor_to_lr(cur, op);
4174   API_DB_UNLOCK(lx->db, rci, rc);
4175   return rc;
4176 }
4177 
iwkv_cursor_get(IWKV_cursor cur,IWKV_val * okey,IWKV_val * oval)4178 iwrc iwkv_cursor_get(IWKV_cursor cur,
4179                      IWKV_val *okey,   /* Nullable */
4180                      IWKV_val *oval) { /* Nullable */
4181   int rci;
4182   iwrc rc = 0;
4183   if (!cur || !cur->lx.db) {
4184     return IW_ERROR_INVALID_ARGS;
4185   }
4186   if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4187     return IWKV_ERROR_NOTFOUND;
4188   }
4189   IWLCTX *lx = &cur->lx;
4190   API_DB_RLOCK(lx->db, rci);
4191   uint8_t *mm = 0;
4192   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4193   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4194   RCGO(rc, finish);
4195   if (!cur->cn->kvblk) {
4196     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4197     RCGO(rc, finish);
4198   }
4199   uint8_t idx = cur->cn->pi[cur->cnpos];
4200   if (okey && oval) {
4201     rc = _kvblk_kv_get(cur->cn->kvblk, mm, idx, okey, oval);
4202   } else if (oval) {
4203     rc = _kvblk_value_get(cur->cn->kvblk, mm, idx, oval);
4204   } else if (okey) {
4205     rc = _kvblk_key_get(cur->cn->kvblk, mm, idx, okey);
4206   } else {
4207     rc = IW_ERROR_INVALID_ARGS;
4208   }
4209   if (!rc && okey) {
4210     _unpack_effective_key(lx->db, okey, false);
4211   }
4212 finish:
4213   if (mm) {
4214     fsm->release_mmap(fsm);
4215   }
4216   API_DB_UNLOCK(lx->db, rci, rc);
4217   return rc;
4218 }
4219 
iwkv_cursor_copy_val(IWKV_cursor cur,void * vbuf,size_t vbufsz,size_t * vsz)4220 iwrc iwkv_cursor_copy_val(IWKV_cursor cur, void *vbuf, size_t vbufsz, size_t *vsz) {
4221   int rci;
4222   iwrc rc = 0;
4223   if (!cur || !vbuf || !cur->lx.db) {
4224     return IW_ERROR_INVALID_ARGS;
4225   }
4226   if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4227     return IWKV_ERROR_NOTFOUND;
4228   }
4229 
4230   *vsz = 0;
4231   IWLCTX *lx = &cur->lx;
4232   API_DB_RLOCK(lx->db, rci);
4233   uint8_t *mm = 0, *oval;
4234   uint32_t ovalsz;
4235   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4236   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4237   RCGO(rc, finish);
4238   if (!cur->cn->kvblk) {
4239     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4240     RCGO(rc, finish);
4241   }
4242   uint8_t idx = cur->cn->pi[cur->cnpos];
4243   _kvblk_value_peek(cur->cn->kvblk, idx, mm, &oval, &ovalsz);
4244   *vsz = ovalsz;
4245   memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
4246 
4247 finish:
4248   if (mm) {
4249     fsm->release_mmap(fsm);
4250   }
4251   API_DB_UNLOCK(lx->db, rci, rc);
4252   return rc;
4253 }
4254 
iwkv_cursor_is_matched_key(IWKV_cursor cur,const IWKV_val * key,bool * ores,int64_t * ocompound)4255 iwrc iwkv_cursor_is_matched_key(IWKV_cursor cur, const IWKV_val *key, bool *ores, int64_t *ocompound) {
4256   int rci;
4257   iwrc rc = 0;
4258   if (!cur || !ores || !key || !cur->lx.db) {
4259     return IW_ERROR_INVALID_ARGS;
4260   }
4261   if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4262     return IWKV_ERROR_NOTFOUND;
4263   }
4264 
4265   *ores = 0;
4266   if (ocompound) *ocompound = 0;
4267 
4268   IWLCTX *lx = &cur->lx;
4269   API_DB_RLOCK(lx->db, rci);
4270   uint8_t *mm = 0, *okey;
4271   uint32_t okeysz;
4272   iwdb_flags_t dbflg = lx->db->dbflg;
4273   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4274   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4275   RCGO(rc, finish);
4276   if (!cur->cn->kvblk) {
4277     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4278     RCGO(rc, finish);
4279   }
4280 
4281   uint8_t idx = cur->cn->pi[cur->cnpos];
4282   rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4283   RCGO(rc, finish);
4284 
4285   if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4286     char nbuf[2 * IW_VNUMBUFSZ];
4287     IWKV_val rkey = {.data = nbuf, .size = okeysz};
4288     memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4289     rc = _unpack_effective_key(lx->db, &rkey, true);
4290     RCGO(rc, finish);
4291     if (ocompound) {
4292       *ocompound = rkey.compound;
4293     }
4294     if (rkey.size != key->size) {
4295       *ores = false;
4296       goto finish;
4297     }
4298     if (dbflg & IWDB_VNUM64_KEYS) {
4299       *ores = !memcmp(rkey.data, key->data, key->size);
4300     } else {
4301       *ores = !memcmp(okey + (okeysz - rkey.size), key->data, key->size);
4302     }
4303   } else {
4304     *ores = (okeysz == key->size) && !memcmp(okey, key->data, key->size);
4305   }
4306 
4307 finish:
4308   if (mm) {
4309     fsm->release_mmap(fsm);
4310   }
4311   API_DB_UNLOCK(cur->lx.db, rci, rc);
4312   return rc;
4313 }
4314 
iwkv_cursor_copy_key(IWKV_cursor cur,void * kbuf,size_t kbufsz,size_t * ksz,int64_t * compound)4315 iwrc iwkv_cursor_copy_key(IWKV_cursor cur, void *kbuf, size_t kbufsz, size_t *ksz, int64_t *compound) {
4316   int rci;
4317   iwrc rc = 0;
4318   if (!cur || !cur->lx.db) {
4319     return IW_ERROR_INVALID_ARGS;
4320   }
4321   if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4322     return IWKV_ERROR_NOTFOUND;
4323   }
4324 
4325   *ksz = 0;
4326   IWLCTX *lx = &cur->lx;
4327   API_DB_RLOCK(lx->db, rci);
4328   uint8_t *mm = 0, *okey;
4329   uint32_t okeysz;
4330   iwdb_flags_t dbflg = lx->db->dbflg;
4331   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4332   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4333   RCGO(rc, finish);
4334   if (!cur->cn->kvblk) {
4335     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4336     RCGO(rc, finish);
4337   }
4338 
4339   uint8_t idx = cur->cn->pi[cur->cnpos];
4340   rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4341   RCGO(rc, finish);
4342 
4343   if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4344     char nbuf[2 * IW_VNUMBUFSZ];
4345     IWKV_val rkey = {.data = nbuf, .size = okeysz};
4346     memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4347     rc = _unpack_effective_key(lx->db, &rkey, true);
4348     RCGO(rc, finish);
4349     if (compound) {
4350       *compound = rkey.compound;
4351     }
4352     *ksz = rkey.size;
4353     if (dbflg & IWDB_VNUM64_KEYS) {
4354       memcpy(kbuf, rkey.data, MIN(kbufsz, rkey.size));
4355     } else {
4356       memcpy(kbuf, okey + (okeysz - rkey.size), MIN(kbufsz, rkey.size));
4357     }
4358   } else {
4359     *ksz = okeysz;
4360     if (compound) *compound = 0;
4361     memcpy(kbuf, okey, MIN(kbufsz, okeysz));
4362   }
4363 
4364 finish:
4365   if (mm) {
4366     fsm->release_mmap(fsm);
4367   }
4368   API_DB_UNLOCK(cur->lx.db, rci, rc);
4369   return rc;
4370 }
4371 
iwkv_cursor_seth(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)4372 IW_EXPORT iwrc iwkv_cursor_seth(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags,
4373                                 IWKV_PUT_HANDLER ph, void *phop) {
4374   int rci;
4375   iwrc rc = 0, irc = 0;
4376   if (!cur || !cur->lx.db) {
4377     return IW_ERROR_INVALID_ARGS;
4378   }
4379   if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4380     return IWKV_ERROR_NOTFOUND;
4381   }
4382 
4383   IWLCTX *lx = &cur->lx;
4384   IWDB db = lx->db;
4385   IWKV iwkv = db->iwkv;
4386   SBLK *sblk = cur->cn;
4387 
4388   API_DB_WLOCK(db, rci);
4389   if (ph) {
4390     uint8_t *mm;
4391     IWKV_val key, oldval;
4392     IWFS_FSM *fsm = &db->iwkv->fsm;
4393     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4394     RCGO(rc, finish);
4395     rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key, &oldval);
4396     fsm->release_mmap(fsm);
4397     if (!rc) {
4398       // note: oldval should be disposed by ph
4399       rc = ph(&key, val, &oldval, phop);
4400       _kv_val_dispose(&key);
4401     }
4402     RCGO(rc, finish);
4403   }
4404 
4405   rc = _sblk_updatekv(sblk, cur->cnpos, 0, val);
4406   if (IWKV_IS_INTERNAL_RC(rc)) {
4407     irc = rc;
4408     rc = 0;
4409   }
4410   RCGO(rc, finish);
4411 
4412   rc = _sblk_sync(lx, sblk);
4413   RCGO(rc, finish);
4414 
4415   // Update active cursors inside this block
4416   pthread_spin_lock(&db->cursors_slk);
4417   for (IWKV_cursor c = db->cursors; c; c = c->next) {
4418     if (c->cn && c->cn->addr == sblk->addr) {
4419       if (c->cn != sblk) {
4420         memcpy(c->cn, sblk, sizeof(*c->cn));
4421         c->cn->kvblk = 0;
4422         c->cn->flags &= SBLK_PERSISTENT_FLAGS;
4423       }
4424     }
4425   }
4426   pthread_spin_unlock(&db->cursors_slk);
4427 
4428 finish:
4429   API_DB_UNLOCK(db, rci, rc);
4430   if (!rc) {
4431     if (opflags & IWKV_SYNC) {
4432       rc = _iwkv_sync(iwkv, 0);
4433     } else {
4434       rc = iwal_poke_checkpoint(iwkv, false);
4435     }
4436   }
4437   return rc ? rc : irc;
4438 }
4439 
iwkv_cursor_set(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags)4440 iwrc iwkv_cursor_set(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags) {
4441   return iwkv_cursor_seth(cur, val, opflags, 0, 0);
4442 }
4443 
iwkv_cursor_val(IWKV_cursor cur,IWKV_val * oval)4444 iwrc iwkv_cursor_val(IWKV_cursor cur, IWKV_val *oval) {
4445   return iwkv_cursor_get(cur, 0, oval);
4446 }
4447 
iwkv_cursor_key(IWKV_cursor cur,IWKV_val * okey)4448 iwrc iwkv_cursor_key(IWKV_cursor cur, IWKV_val *okey) {
4449   return iwkv_cursor_get(cur, okey, 0);
4450 }
4451 
iwkv_cursor_del(IWKV_cursor cur,iwkv_opflags opflags)4452 iwrc iwkv_cursor_del(IWKV_cursor cur, iwkv_opflags opflags) {
4453   int rci;
4454   iwrc rc = 0;
4455   if (!cur || !cur->lx.db) {
4456     return IW_ERROR_INVALID_ARGS;
4457   }
4458   if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4459     return IWKV_ERROR_NOTFOUND;
4460   }
4461 
4462   uint8_t *mm;
4463   SBLK *sblk = cur->cn;
4464   IWLCTX *lx = &cur->lx;
4465   IWDB db = lx->db;
4466   IWKV iwkv = db->iwkv;
4467   IWFS_FSM *fsm = &iwkv->fsm;
4468 
4469   API_DB_WLOCK(db, rci);
4470   if (!db->cache.open) {
4471     rc = _dbcache_fill_lw(lx);
4472     RCGO(rc, finish);
4473   }
4474   if (sblk->pnum == 1) { // sblk will be removed
4475     IWKV_val key = {0};
4476     // Key a key
4477     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4478     RCGO(rc, finish2);
4479     if (!sblk->kvblk) {
4480       rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4481       fsm->release_mmap(fsm);
4482       RCGO(rc, finish2);
4483     }
4484     rc = _kvblk_key_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key);
4485     fsm->release_mmap(fsm);
4486     RCGO(rc, finish2);
4487 
4488     lx->key = &key;
4489     rc = _lx_del_sblk_lw(lx, sblk, cur->cnpos);
4490     lx->key = 0;
4491 
4492 finish2:
4493     if (rc) {
4494       _lx_release_mm(lx, 0);
4495     } else {
4496       rc = _lx_release(lx);
4497     }
4498     if (key.data) {
4499       _kv_val_dispose(&key);
4500     }
4501   } else { // Simple case
4502     if (!sblk->kvblk) {
4503       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4504       RCGO(rc, finish);
4505       rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4506       fsm->release_mmap(fsm);
4507       RCGO(rc, finish);
4508     }
4509     rc = _sblk_rmkv(sblk, cur->cnpos);
4510     RCGO(rc, finish);
4511     rc = _sblk_sync(lx, sblk);
4512   }
4513 
4514 finish:
4515   API_DB_UNLOCK(db, rci, rc);
4516   if (!rc) {
4517     if (opflags & IWKV_SYNC) {
4518       rc = _iwkv_sync(iwkv, 0);
4519     } else {
4520       rc = iwal_poke_checkpoint(iwkv, false);
4521     }
4522   }
4523   return rc;
4524 }
4525 
4526 #include "./dbg/iwkvdbg.c"
4527