• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // -V::512
2 
3 #include "iwkv_internal.h"
4 #include "iwconv.h"
5 #include <stdalign.h>
6 
7 #define _wnw_db_wl(db_) _api_db_wlock(db_)
8 
9 #ifdef IW_TESTS
10 volatile int8_t iwkv_next_level = -1;
11 #endif
12 atomic_uint_fast64_t g_trigger;
13 
14 #define IWKV_IS_INTERNAL_RC(rc_) ((rc_) > _IWKV_ERROR_END && (rc_) < _IWKV_RC_END)
15 
_to_effective_key(struct _IWDB * db,const IWKV_val * key,IWKV_val * okey,uint8_t nbuf[static IW_VNUMBUFSZ])16 IW_SOFT_INLINE iwrc _to_effective_key(
17   struct _IWDB *db, const IWKV_val *key, IWKV_val *okey,
18   uint8_t nbuf[static IW_VNUMBUFSZ]
19   ) {
20   static_assert(IW_VNUMBUFSZ >= sizeof(uint64_t), "IW_VNUMBUFSZ >= sizeof(uint64_t)");
21   iwdb_flags_t dbflg = db->dbflg;
22   // Keys compound will be processed at lower levels at `addkv` routines
23   okey->compound = key->compound;
24   if (dbflg & IWDB_VNUM64_KEYS) {
25     unsigned len;
26     if (key->size == 8) {
27       uint64_t llv;
28       memcpy(&llv, key->data, sizeof(llv));
29       IW_SETVNUMBUF64(len, nbuf, llv);
30       if (!len) {
31         return IW_ERROR_OVERFLOW;
32       }
33       okey->size = len;
34       okey->data = nbuf;
35     } else if (key->size == 4) {
36       uint32_t lv;
37       memcpy(&lv, key->data, sizeof(lv));
38       IW_SETVNUMBUF(len, nbuf, lv);
39       if (!len) {
40         return IW_ERROR_OVERFLOW;
41       }
42       okey->size = len;
43       okey->data = nbuf;
44     } else {
45       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
46     }
47   } else {
48     okey->data = key->data;
49     okey->size = key->size;
50   }
51   return 0;
52 }
53 
54 // NOTE: at least `2*IW_VNUMBUFSZ` must be allocated for key->data
_unpack_effective_key(struct _IWDB * db,IWKV_val * key,bool no_move_key_data)55 static iwrc _unpack_effective_key(struct _IWDB *db, IWKV_val *key, bool no_move_key_data) {
56   iwdb_flags_t dbflg = db->dbflg;
57   uint8_t *data = key->data;
58   if (dbflg & IWDB_COMPOUND_KEYS) {
59     int step;
60     IW_READVNUMBUF64(key->data, key->compound, step);
61     if (step >= key->size) {
62       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
63     }
64     data += step;
65     key->size -= step;
66     if (!no_move_key_data && !(dbflg & IWDB_VNUM64_KEYS)) {
67       memmove(key->data, data, key->size);
68     }
69   } else {
70     key->compound = 0;
71   }
72   if (dbflg & IWDB_VNUM64_KEYS) {
73     int64_t llv;
74     char nbuf[IW_VNUMBUFSZ];
75     if (key->size > IW_VNUMBUFSZ) {
76       return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
77     }
78     memcpy(nbuf, data, key->size);
79     IW_READVNUMBUF64_2(nbuf, llv);
80     memcpy(key->data, &llv, sizeof(llv));
81     key->size = sizeof(llv);
82   }
83   return 0;
84 }
85 
_cmp_keys_prefix(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)86 static int _cmp_keys_prefix(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
87   int ret;
88   if (dbflg & IWDB_COMPOUND_KEYS) {
89     // Compound keys mode
90     const char *u1 = v1;
91     const char *u2 = key->data;
92     int step, v2len = (int) key->size;
93     int64_t c1, c2 = key->compound;
94     IW_READVNUMBUF64(v1, c1, step);
95     v1len -= step;
96     u1 += step;
97     if (v1len < 1) {
98       // Inconsistent data?
99       return v2len - v1len;
100     }
101     if (dbflg & IWDB_VNUM64_KEYS) {
102       if ((v2len != v1len) || (v2len > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
103         return v2len - v1len;
104       }
105       int64_t n1, n2;
106       char vbuf[IW_VNUMBUFSZ];
107       memcpy(vbuf, u1, v1len);
108       IW_READVNUMBUF64_2(vbuf, n1);
109       memcpy(vbuf, u2, v2len);
110       IW_READVNUMBUF64_2(vbuf, n2);
111       ret = n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
112       if (ret == 0) {
113         ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
114       }
115     } else if (dbflg & IWDB_REALNUM_KEYS) {
116       ret = iwafcmp(u2, v2len, u1, v1len);
117       if (ret == 0) {
118         ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
119       }
120     } else {
121       IW_CMP2(ret, u2, v2len, u1, v1len);
122     }
123     return ret;
124   } else {
125     int v2len = (int) key->size;
126     const void *v2 = key->data;
127     if (dbflg & IWDB_VNUM64_KEYS) {
128       if ((v2len != v1len) || (v2len > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
129         return v2len - v1len;
130       }
131       int64_t n1, n2;
132       char vbuf[IW_VNUMBUFSZ];
133       memcpy(vbuf, v1, v1len);
134       IW_READVNUMBUF64_2(vbuf, n1);
135       memcpy(vbuf, v2, v2len);
136       IW_READVNUMBUF64_2(vbuf, n2);
137       return n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
138     } else if (dbflg & IWDB_REALNUM_KEYS) {
139       return iwafcmp(v2, v2len, v1, v1len);
140     } else {
141       IW_CMP2(ret, v2, v2len, v1, v1len);
142       return ret;
143     }
144   }
145 }
146 
_cmp_keys(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)147 IW_INLINE int _cmp_keys(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
148   int rv = _cmp_keys_prefix(dbflg, v1, v1len, key);
149   if ((rv == 0) && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
150     if (dbflg & IWDB_COMPOUND_KEYS) {
151       int step;
152       int64_t c1, c2 = key->compound;
153       IW_READVNUMBUF64(v1, c1, step);
154       v1len -= step;
155       if ((int) key->size == v1len) {
156         return c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
157       }
158     }
159     return (int) key->size - v1len;
160   } else {
161     return rv;
162   }
163 }
164 
_kv_val_dispose(IWKV_val * v)165 IW_INLINE void _kv_val_dispose(IWKV_val *v) {
166   if (v) {
167     free(v->data);
168     v->size = 0;
169     v->data = 0;
170   }
171 }
172 
_kv_dispose(IWKV_val * key,IWKV_val * val)173 IW_INLINE void _kv_dispose(IWKV_val *key, IWKV_val *val) {
174   _kv_val_dispose(key);
175   _kv_val_dispose(val);
176 }
177 
iwkv_val_dispose(IWKV_val * v)178 void iwkv_val_dispose(IWKV_val *v) {
179   _kv_val_dispose(v);
180 }
181 
iwkv_kv_dispose(IWKV_val * key,IWKV_val * val)182 void iwkv_kv_dispose(IWKV_val *key, IWKV_val *val) {
183   _kv_dispose(key, val);
184 }
185 
_num2lebuf(uint8_t buf[static8],void * numdata,size_t sz)186 IW_INLINE void _num2lebuf(uint8_t buf[static 8], void *numdata, size_t sz) {
187   assert(sz == 4 || sz == 8);
188   if (sz > 4) {
189     uint64_t llv;
190     memcpy(&llv, numdata, sizeof(llv));
191     llv = IW_HTOILL(llv);
192     memcpy(buf, &llv, sizeof(llv));
193   } else {
194     uint32_t lv;
195     memcpy(&lv, numdata, sizeof(lv));
196     lv = IW_HTOIL(lv);
197     memcpy(buf, &lv, sizeof(lv));
198   }
199 }
200 
201 //-------------------------- IWKV/IWDB WORKERS
202 
_iwkv_worker_inc_nolk(IWKV iwkv)203 static WUR iwrc _iwkv_worker_inc_nolk(IWKV iwkv) {
204   if (!iwkv || !iwkv->open) {
205     return IW_ERROR_INVALID_STATE;
206   }
207   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
208   if (rci) {
209     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
210   }
211   if (!iwkv->open) { // -V547
212     pthread_mutex_unlock(&iwkv->wk_mtx);
213     return IW_ERROR_INVALID_STATE;
214   }
215   while (iwkv->wk_pending_exclusive) {
216     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
217   }
218   ++iwkv->wk_count;
219   pthread_cond_broadcast(&iwkv->wk_cond);
220   pthread_mutex_unlock(&iwkv->wk_mtx);
221   return 0;
222 }
223 
_db_worker_inc_nolk(IWDB db)224 static WUR iwrc _db_worker_inc_nolk(IWDB db) {
225   if (!db || !db->iwkv || !db->iwkv->open || !db->open) {
226     return IW_ERROR_INVALID_STATE;
227   }
228   IWKV iwkv = db->iwkv;
229   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
230   if (rci) {
231     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
232   }
233   if (!iwkv->open || !db->open) { // -V560
234     pthread_mutex_unlock(&iwkv->wk_mtx);
235     return IW_ERROR_INVALID_STATE;
236   }
237   while (db->wk_pending_exclusive) {
238     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
239   }
240   ++iwkv->wk_count;
241   ++db->wk_count;
242   pthread_cond_broadcast(&iwkv->wk_cond);
243   pthread_mutex_unlock(&iwkv->wk_mtx);
244   return 0;
245 }
246 
_iwkv_worker_dec_nolk(IWKV iwkv)247 static iwrc _iwkv_worker_dec_nolk(IWKV iwkv) {
248   if (!iwkv) {
249     return IW_ERROR_INVALID_STATE;
250   }
251   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
252   if (rci) {
253     // Last chanсe to be consistent
254     --iwkv->wk_count;
255     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
256   }
257   --iwkv->wk_count;
258   pthread_cond_broadcast(&iwkv->wk_cond);
259   pthread_mutex_unlock(&iwkv->wk_mtx);
260   return 0;
261 }
262 
_db_worker_dec_nolk(IWDB db)263 static iwrc _db_worker_dec_nolk(IWDB db) {
264   if (!db || !db->iwkv) { // do not use ENSURE_OPEN_DB here
265     return IW_ERROR_INVALID_STATE;
266   }
267   IWKV iwkv = db->iwkv;
268   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
269   if (rci) {
270     // Last chanсe to be consistent
271     --iwkv->wk_count;
272     --db->wk_count;
273     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
274   }
275   --iwkv->wk_count;
276   --db->wk_count;
277   pthread_cond_broadcast(&iwkv->wk_cond);
278   pthread_mutex_unlock(&iwkv->wk_mtx);
279   return 0;
280 }
281 
_wnw_iwkw_wl(IWKV iwkv)282 static WUR iwrc _wnw_iwkw_wl(IWKV iwkv) {
283   int rci = pthread_rwlock_wrlock(&iwkv->rwl);
284   if (rci) {
285     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
286   }
287   return 0;
288 }
289 
_wnw(IWKV iwkv,iwrc (* after)(IWKV iwkv))290 static WUR iwrc _wnw(IWKV iwkv, iwrc (*after)(IWKV iwkv)) {
291   iwrc rc = 0;
292   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
293   if (rci) {
294     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
295   }
296   iwkv->wk_pending_exclusive = true;
297   while (iwkv->wk_count > 0) {
298     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
299   }
300   if (after) {
301     rc = after(iwkv);
302   }
303   iwkv->wk_pending_exclusive = false;
304   pthread_cond_broadcast(&iwkv->wk_cond);
305   rci = pthread_mutex_unlock(&iwkv->wk_mtx);
306   if (rci) {
307     IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
308   }
309   return rc;
310 }
311 
_wnw_db(IWDB db,iwrc (* after)(IWDB db))312 static WUR iwrc _wnw_db(IWDB db, iwrc (*after)(IWDB db)) {
313   iwrc rc = 0;
314   IWKV iwkv = db->iwkv;
315   int rci = pthread_mutex_lock(&iwkv->wk_mtx);
316   if (rci) {
317     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
318   }
319   db->wk_pending_exclusive = true;
320   while (db->wk_count > 0) {
321     pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
322   }
323   if (after) {
324     rc = after(db);
325   }
326   db->wk_pending_exclusive = false;
327   pthread_cond_broadcast(&iwkv->wk_cond);
328   rci = pthread_mutex_unlock(&iwkv->wk_mtx);
329   if (rci) {
330     IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
331   }
332   return rc;
333 }
334 
335 //--------------------------  DB
336 
_db_at(IWKV iwkv,IWDB * dbp,off_t addr,uint8_t * mm)337 static WUR iwrc _db_at(IWKV iwkv, IWDB *dbp, off_t addr, uint8_t *mm) {
338   iwrc rc = 0;
339   uint8_t *rp, bv;
340   uint32_t lv;
341   int rci;
342   IWDB db = calloc(1, sizeof(struct _IWDB));
343   *dbp = 0;
344   if (!db) {
345     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
346   }
347   pthread_rwlockattr_t attr;
348   pthread_rwlockattr_init(&attr);
349 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
350   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
351 #endif
352   rci = pthread_rwlock_init(&db->rwl, &attr);
353   if (rci) {
354     free(db);
355     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
356   }
357   rci = pthread_spin_init(&db->cursors_slk, 0);
358   if (rci) {
359     pthread_rwlock_destroy(&db->rwl);
360     free(db);
361     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
362   }
363   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
364   db->flags = SBLK_DB;
365   db->addr = addr;
366   db->db = db;
367   db->iwkv = iwkv;
368   rp = mm + addr;
369   IW_READLV(rp, lv, lv);
370   if (lv != IWDB_MAGIC) {
371     rc = IWKV_ERROR_CORRUPTED;
372     iwlog_ecode_error3(rc);
373     goto finish;
374   }
375   IW_READBV(rp, bv, db->dbflg);
376   IW_READLV(rp, lv, db->id);
377   IW_READLV(rp, lv, db->next_db_addr);
378   db->next_db_addr = BLK2ADDR(db->next_db_addr); // blknum -> addr
379   rp = mm + addr + DOFF_C0_U4;
380   for (int i = 0; i < SLEVELS; ++i) {
381     IW_READLV(rp, lv, db->lcnt[i]);
382   }
383   if (iwkv->fmt_version >= 1) {
384     IW_READLV(rp, lv, db->meta_blk);
385     IW_READLV(rp, lv, db->meta_blkn);
386   }
387   db->open = true;
388   *dbp = db;
389 
390 finish:
391   if (rc) {
392     pthread_rwlock_destroy(&db->rwl);
393     free(db);
394   }
395   return rc;
396 }
397 
_db_save(IWDB db,bool newdb,uint8_t * mm)398 static WUR iwrc _db_save(IWDB db, bool newdb, uint8_t *mm) {
399   iwrc rc = 0;
400   uint32_t lv;
401   uint8_t *wp = mm + db->addr, bv;
402   uint8_t *sp = wp;
403   IWDLSNR *dlsnr = db->iwkv->dlsnr;
404   db->next_db_addr = db->next ? db->next->addr : 0;
405   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
406   IW_WRITELV(wp, lv, IWDB_MAGIC);
407   IW_WRITEBV(wp, bv, db->dbflg);
408   IW_WRITELV(wp, lv, db->id);
409   IW_WRITELV(wp, lv, ADDR2BLK(db->next_db_addr));
410   if (dlsnr) {
411     rc = dlsnr->onwrite(dlsnr, db->addr, sp, wp - sp, 0);
412     RCRET(rc);
413   }
414   if (db->iwkv->fmt_version >= 1) {
415     if (newdb) {
416       memset(wp, 0, 4 + SLEVELS * 4 * 2); // p0 + n[24] + c[24]
417       sp = wp;
418       wp += 4 + SLEVELS * 4 * 2; // set to zero
419     } else {
420       wp += 4 + SLEVELS * 4 * 2; // skip
421       sp = wp;
422     }
423     IW_WRITELV(wp, lv, db->meta_blk);
424     IW_WRITELV(wp, lv, db->meta_blkn);
425     if (dlsnr) {
426       rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
427     }
428   }
429   return rc;
430 }
431 
_db_load_chain(IWKV iwkv,off_t addr,uint8_t * mm)432 static WUR iwrc _db_load_chain(IWKV iwkv, off_t addr, uint8_t *mm) {
433   iwrc rc;
434   IWDB db = 0, ndb;
435   if (!addr) {
436     return 0;
437   }
438   do {
439     rc = _db_at(iwkv, &ndb, addr, mm);
440     RCRET(rc);
441 
442     if (db) {
443       db->next = ndb;
444       ndb->prev = db;
445     } else {
446       iwkv->first_db = ndb;
447     }
448     db = ndb;
449     addr = db->next_db_addr;
450 
451     rc = iwhmap_put_u32(iwkv->dbs, db->id, db);
452     RCRET(rc);
453 
454     iwkv->last_db = db;
455   } while (db->next_db_addr);
456 
457   return 0;
458 }
459 
_db_release_lw(IWDB * dbp)460 static void _db_release_lw(IWDB *dbp) {
461   assert(dbp && *dbp);
462   IWDB db = *dbp;
463   pthread_rwlock_destroy(&db->rwl);
464   pthread_spin_destroy(&db->cursors_slk);
465   free(db);
466   *dbp = 0;
467 }
468 
469 typedef struct DISPOSE_DB_CTX {
470   IWKV   iwkv;
471   IWDB   db;
472   blkn_t sbn; // First `SBLK` block in DB
473 } DISPOSE_DB_CTX;
474 
_db_dispose_chain(DISPOSE_DB_CTX * dctx)475 static iwrc _db_dispose_chain(DISPOSE_DB_CTX *dctx) {
476   iwrc rc = 0;
477   uint8_t *mm, kvszpow;
478   IWFS_FSM *fsm = &dctx->iwkv->fsm;
479   blkn_t sbn = dctx->sbn, kvblkn;
480   off_t page = 0;
481 
482   while (sbn) {
483     off_t sba = BLK2ADDR(sbn);
484     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
485     RCBREAK(rc);
486     memcpy(&kvblkn, mm + sba + SOFF_KBLK_U4, 4);
487     kvblkn = IW_ITOHL(kvblkn);
488     memcpy(&sbn, mm + sba + SOFF_N0_U4, 4);
489     sbn = IW_ITOHL(sbn);
490     if (kvblkn) {
491       memcpy(&kvszpow, mm + BLK2ADDR(kvblkn) + KBLK_SZPOW_OFF, 1);
492     }
493     if (dctx->iwkv->fmt_version > 1) {
494       uint8_t bpos;
495       memcpy(&bpos, mm + sba + SOFF_BPOS_U1_V2, 1);
496       rc = fsm->release_mmap(fsm);
497       RCBREAK(rc);
498       if ((bpos > 0) && (bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
499         off_t npage = sba - (bpos - 1) * SBLK_SZ;
500         if (npage != page) {
501           if (page) {
502             if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
503               rc = fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2);
504             }
505             RCBREAK(rc);
506           }
507           page = npage;
508         }
509       }
510     } else {
511       rc = fsm->release_mmap(fsm);
512       RCBREAK(rc);
513       // Deallocate `SBLK`
514       rc = fsm->deallocate(fsm, sba, SBLK_SZ);
515       RCBREAK(rc);
516     }
517     // Deallocate `KVBLK`
518     if (kvblkn) {
519       rc = fsm->deallocate(fsm, BLK2ADDR(kvblkn), 1ULL << kvszpow);
520       RCBREAK(rc);
521     }
522   }
523   if (page) {
524     if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
525       IWRC(fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2), rc);
526     }
527   }
528   _db_release_lw(&dctx->db);
529   return rc;
530 }
531 
_db_destroy_lw(IWDB * dbp)532 static WUR iwrc _db_destroy_lw(IWDB *dbp) {
533   iwrc rc;
534   uint8_t *mm;
535   IWDB db = *dbp;
536   IWKV iwkv = db->iwkv;
537   IWDB prev = db->prev;
538   IWDB next = db->next;
539   IWFS_FSM *fsm = &iwkv->fsm;
540   uint32_t first_sblkn;
541 
542   if (!iwhmap_get_u32(iwkv->dbs, db->id)) {
543     iwlog_ecode_error3(IW_ERROR_INVALID_STATE);
544     return IW_ERROR_INVALID_STATE;
545   }
546   iwhmap_remove_u32(iwkv->dbs, db->id);
547 
548   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
549   RCRET(rc);
550   if (prev) {
551     prev->next = next;
552     rc = _db_save(prev, false, mm);
553     if (rc) {
554       fsm->release_mmap(fsm);
555       return rc;
556     }
557   }
558   if (next) {
559     next->prev = prev;
560     rc = _db_save(next, false, mm);
561     if (rc) {
562       fsm->release_mmap(fsm);
563       return rc;
564     }
565   }
566   // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
567   memcpy(&first_sblkn, mm + db->addr + DOFF_N0_U4, 4);
568   first_sblkn = IW_ITOHL(first_sblkn);
569   fsm->release_mmap(fsm);
570 
571   if (iwkv->first_db && (iwkv->first_db->addr == db->addr)) {
572     uint64_t llv;
573     db->iwkv->first_db = next;
574     llv = next ? (uint64_t) next->addr : 0;
575     llv = IW_HTOILL(llv);
576     rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
577   }
578   if (iwkv->last_db && (iwkv->last_db->addr == db->addr)) {
579     iwkv->last_db = prev;
580   }
581   // Cleanup DB
582   off_t db_addr = db->addr;
583   blkn_t meta_blk = db->meta_blk;
584   blkn_t meta_blkn = db->meta_blkn;
585   db->open = false;
586 
587   DISPOSE_DB_CTX dctx = {
588     .sbn  = first_sblkn,
589     .iwkv = iwkv,
590     .db   = db
591   };
592   IWRC(_db_dispose_chain(&dctx), rc);
593   if (meta_blk && meta_blkn) {
594     IWRC(fsm->deallocate(fsm, BLK2ADDR(meta_blk), BLK2ADDR(meta_blkn)), rc);
595   }
596   IWRC(fsm->deallocate(fsm, db_addr, DB_SZ), rc);
597   return rc;
598 }
599 
_db_create_lw(IWKV iwkv,dbid_t dbid,iwdb_flags_t dbflg,IWDB * odb)600 static WUR iwrc _db_create_lw(IWKV iwkv, dbid_t dbid, iwdb_flags_t dbflg, IWDB *odb) {
601   iwrc rc;
602   int rci;
603   uint8_t *mm = 0;
604   off_t baddr = 0, blen;
605   IWFS_FSM *fsm = &iwkv->fsm;
606   *odb = 0;
607   IWDB db = calloc(1, sizeof(struct _IWDB));
608   if (!db) {
609     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
610   }
611   pthread_rwlockattr_t attr;
612   pthread_rwlockattr_init(&attr);
613 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
614   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
615 #endif
616   rci = pthread_rwlock_init(&db->rwl, &attr);
617   if (rci) {
618     free(db);
619     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
620   }
621   rci = pthread_spin_init(&db->cursors_slk, 0);
622   if (rci) {
623     pthread_rwlock_destroy(&db->rwl);
624     free(db);
625     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
626   }
627   rc = fsm->allocate(fsm, DB_SZ, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
628   if (rc) {
629     _db_release_lw(&db);
630     return rc;
631   }
632   db->iwkv = iwkv;
633   db->dbflg = dbflg;
634   db->addr = baddr;
635   db->id = dbid;
636   db->prev = iwkv->last_db;
637   if (!iwkv->first_db) {
638     uint64_t llv;
639     iwkv->first_db = db;
640     llv = (uint64_t) db->addr;
641     llv = IW_HTOILL(llv);
642     rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
643   } else if (iwkv->last_db) {
644     iwkv->last_db->next = db;
645   }
646 
647   RCC(rc, finish, iwhmap_put_u32(iwkv->dbs, db->id, db));
648   iwkv->last_db = db;
649 
650   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
651   RCGO(rc, finish);
652   rc = _db_save(db, true, mm);
653   RCGO(rc, finish);
654   if (db->prev) {
655     rc = _db_save(db->prev, false, mm);
656     RCGO(rc, finish);
657   }
658   db->open = true;
659   *odb = db;
660 
661 finish:
662   if (mm) {
663     fsm->release_mmap(fsm);
664   }
665   if (rc) {
666     fsm->deallocate(fsm, baddr, blen);
667     _db_release_lw(&db);
668   }
669   return rc;
670 }
671 
672 //--------------------------  KVBLK
673 
_kvblk_create(IWLCTX * lx,off_t baddr,uint8_t kvbpow,KVBLK ** oblk)674 IW_INLINE void _kvblk_create(IWLCTX *lx, off_t baddr, uint8_t kvbpow, KVBLK **oblk) {
675   KVBLK *kblk = &lx->kaa[lx->kaan];
676   kblk->db = lx->db;
677   kblk->addr = baddr;
678   kblk->maxoff = 0;
679   kblk->idxsz = 2 * IW_VNUMSIZE(0) * KVBLK_IDXNUM;
680   kblk->zidx = 0;
681   kblk->szpow = kvbpow;
682   kblk->flags = KVBLK_DURTY;
683   memset(kblk->pidx, 0, sizeof(kblk->pidx));
684   *oblk = kblk;
685   AAPOS_INC(lx->kaan);
686 }
687 
_kvblk_key_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)688 IW_INLINE WUR iwrc _kvblk_key_peek(
689   const KVBLK *kb,
690   uint8_t idx, const uint8_t *mm, uint8_t **obuf,
691   uint32_t *olen
692   ) {
693   if (kb->pidx[idx].len) {
694     uint32_t klen, step;
695     const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
696     IW_READVNUMBUF(rp, klen, step);
697     if (!klen) {
698       *obuf = 0;
699       *olen = 0;
700       iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
701       return IWKV_ERROR_CORRUPTED;
702     }
703     rp += step;
704     *obuf = (uint8_t*) rp;
705     *olen = klen;
706   } else {
707     *obuf = 0;
708     *olen = 0;
709   }
710   return 0;
711 }
712 
_kvblk_value_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)713 IW_INLINE void _kvblk_value_peek(const KVBLK *kb, uint8_t idx, const uint8_t *mm, uint8_t **obuf, uint32_t *olen) {
714   assert(idx < KVBLK_IDXNUM);
715   if (kb->pidx[idx].len) {
716     uint32_t klen, step;
717     const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
718     IW_READVNUMBUF(rp, klen, step);
719     rp += step;
720     rp += klen;
721     *obuf = (uint8_t*) rp;
722     *olen = kb->pidx[idx].len - klen - step;
723   } else {
724     *obuf = 0;
725     *olen = 0;
726   }
727 }
728 
_kvblk_key_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key)729 static WUR iwrc _kvblk_key_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key) {
730   assert(mm && idx < KVBLK_IDXNUM);
731   int32_t klen;
732   int step;
733   KVP *kvp = &kb->pidx[idx];
734   key->compound = 0;
735   if (!kvp->len) {
736     key->data = 0;
737     key->size = 0;
738     return 0;
739   }
740   // [klen:vn,key,value]
741   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
742   IW_READVNUMBUF(rp, klen, step);
743   rp += step;
744   if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
745     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
746     return IWKV_ERROR_CORRUPTED;
747   }
748   key->size = (size_t) klen;
749   if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
750     // Needed to provide enough buffer in _unpack_effective_key()
751     key->data = malloc(MAX(key->size, sizeof(int64_t)));
752   } else {
753     key->data = malloc(key->size);
754   }
755   if (!key->data) {
756     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
757   }
758   memcpy(key->data, rp, key->size);
759   return 0;
760 }
761 
_kvblk_value_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * val)762 static WUR iwrc _kvblk_value_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *val) {
763   assert(mm && idx < KVBLK_IDXNUM);
764   int32_t klen;
765   int step;
766   KVP *kvp = &kb->pidx[idx];
767   val->compound = 0;
768   if (!kvp->len) {
769     val->data = 0;
770     val->size = 0;
771     return 0;
772   }
773   // [klen:vn,key,value]
774   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
775   IW_READVNUMBUF(rp, klen, step);
776   rp += step;
777   if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
778     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
779     return IWKV_ERROR_CORRUPTED;
780   }
781   rp += klen;
782   if (kvp->len > klen + step) {
783     val->size = kvp->len - klen - step;
784     val->data = malloc(val->size);
785     if (!val->data) {
786       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
787       val->size = 0;
788       return rc;
789     }
790     memcpy(val->data, rp, val->size);
791   } else {
792     val->data = 0;
793     val->size = 0;
794   }
795   return 0;
796 }
797 
_kvblk_kv_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key,IWKV_val * val)798 static WUR iwrc _kvblk_kv_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key, IWKV_val *val) {
799   assert(mm && idx < KVBLK_IDXNUM);
800   int32_t klen;
801   int step;
802   KVP *kvp = &kb->pidx[idx];
803   key->compound = 0;
804   val->compound = 0;
805   if (!kvp->len) {
806     key->data = 0;
807     key->size = 0;
808     val->data = 0;
809     val->size = 0;
810     return 0;
811   }
812   // [klen:vn,key,value]
813   uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
814   IW_READVNUMBUF(rp, klen, step);
815   rp += step;
816   if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
817     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
818     return IWKV_ERROR_CORRUPTED;
819   }
820   key->size = (size_t) klen;
821   if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
822     // Needed to provide enough buffer in _unpack_effective_key()
823     key->data = malloc(MAX(key->size, sizeof(int64_t)));
824   } else {
825     key->data = malloc(key->size);
826   }
827   if (!key->data) {
828     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
829   }
830   memcpy(key->data, rp, key->size);
831   rp += klen;
832   if (kvp->len > klen + step) {
833     val->size = kvp->len - klen - step;
834     val->data = malloc(val->size);
835     if (!val->data) {
836       iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
837       free(key->data);
838       key->data = 0;
839       key->size = 0;
840       val->size = 0;
841       return rc;
842     }
843     memcpy(val->data, rp, val->size);
844   } else {
845     val->data = 0;
846     val->size = 0;
847   }
848   return 0;
849 }
850 
_kvblk_at_mm(IWLCTX * lx,off_t addr,uint8_t * mm,KVBLK * kbp,KVBLK ** blkp)851 static WUR iwrc _kvblk_at_mm(IWLCTX *lx, off_t addr, uint8_t *mm, KVBLK *kbp, KVBLK **blkp) {
852   uint8_t *rp;
853   uint16_t sv;
854   int step;
855   iwrc rc = 0;
856   KVBLK *kb = kbp ? kbp : &lx->kaa[lx->kaan];
857   kb->db = lx->db;
858   kb->addr = addr;
859   kb->maxoff = 0;
860   kb->idxsz = 0;
861   kb->zidx = -1;
862   kb->szpow = 0;
863   kb->flags = KVBLK_DEFAULT;
864   memset(kb->pidx, 0, sizeof(kb->pidx));
865 
866   *blkp = 0;
867   rp = mm + addr;
868   memcpy(&kb->szpow, rp, 1);
869   rp += 1;
870   IW_READSV(rp, sv, kb->idxsz);
871   if (IW_UNLIKELY(kb->idxsz > KVBLK_MAX_IDX_SZ)) {
872     rc = IWKV_ERROR_CORRUPTED;
873     iwlog_ecode_error3(rc);
874     goto finish;
875   }
876   for (uint8_t i = 0; i < KVBLK_IDXNUM; ++i) {
877     IW_READVNUMBUF64(rp, kb->pidx[i].off, step);
878     rp += step;
879     IW_READVNUMBUF(rp, kb->pidx[i].len, step);
880     rp += step;
881     if (kb->pidx[i].len) {
882       if (IW_UNLIKELY(!kb->pidx[i].off)) {
883         rc = IWKV_ERROR_CORRUPTED;
884         iwlog_ecode_error3(rc);
885         goto finish;
886       }
887       if (kb->pidx[i].off > kb->maxoff) {
888         kb->maxoff = kb->pidx[i].off;
889       }
890     } else if (kb->zidx < 0) {
891       kb->zidx = i;
892     }
893     kb->pidx[i].ridx = i;
894   }
895   *blkp = kb;
896   assert(rp - (mm + addr) <= (1ULL << kb->szpow));
897   if (!kbp) {
898     AAPOS_INC(lx->kaan);
899   }
900 
901 finish:
902   return rc;
903 }
904 
_kvblk_compacted_offset(KVBLK * kb)905 IW_INLINE off_t _kvblk_compacted_offset(KVBLK *kb) {
906   off_t coff = 0;
907   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
908     coff += kb->pidx[i].len;
909   }
910   return coff;
911 }
912 
_kvblk_compacted_dsize(KVBLK * kb)913 IW_INLINE off_t _kvblk_compacted_dsize(KVBLK *kb) {
914   off_t coff = KVBLK_HDRSZ;
915   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
916     coff += kb->pidx[i].len;
917     coff += IW_VNUMSIZE32(kb->pidx[i].len);
918     coff += IW_VNUMSIZE(kb->pidx[i].off);
919   }
920   return coff;
921 }
922 
_kvblk_sync_mm(KVBLK * kb,uint8_t * mm)923 static WUR iwrc _kvblk_sync_mm(KVBLK *kb, uint8_t *mm) {
924   iwrc rc = 0;
925   if (!(kb->flags & KVBLK_DURTY)) {
926     return rc;
927   }
928   uint16_t sp;
929   uint8_t *szp;
930   uint8_t *wp = mm + kb->addr;
931   uint8_t *sptr = wp;
932   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
933   memcpy(wp, &kb->szpow, 1);
934   wp += 1;
935   szp = wp;
936   wp += sizeof(uint16_t);
937   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
938     KVP *kvp = &kb->pidx[i];
939     IW_SETVNUMBUF64(sp, wp, kvp->off);
940     wp += sp;
941     IW_SETVNUMBUF(sp, wp, kvp->len);
942     wp += sp;
943   }
944   sp = wp - szp - sizeof(uint16_t);
945   kb->idxsz = sp;
946   assert(kb->idxsz <= KVBLK_MAX_IDX_SZ);
947   sp = IW_HTOIS(sp);
948   memcpy(szp, &sp, sizeof(uint16_t));
949   assert(wp - (mm + kb->addr) <= (1ULL << kb->szpow));
950   if (dlsnr) {
951     rc = dlsnr->onwrite(dlsnr, kb->addr, sptr, wp - sptr, 0);
952   }
953   kb->flags &= ~KVBLK_DURTY;
954   return rc;
955 }
956 
957 #define _kvblk_sort_kv_lt(v1, v2, o) \
958   (((v1).off > 0 ? (v1).off : -1UL) < ((v2).off > 0 ? (v2).off : -1UL))
959 
960 // -V:KSORT_INIT:522, 756, 769
KSORT_INIT(kvblk,KVP,_kvblk_sort_kv_lt)961 KSORT_INIT(kvblk, KVP, _kvblk_sort_kv_lt)
962 
963 static WUR iwrc _kvblk_compact_mm(KVBLK *kb, uint8_t *mm) {
964   uint8_t i;
965   off_t coff = _kvblk_compacted_offset(kb);
966   if (coff == kb->maxoff) { // compacted
967     return 0;
968   }
969   KVP tidx[KVBLK_IDXNUM];
970   KVP tidx_tmp[KVBLK_IDXNUM];
971   iwrc rc = 0;
972   uint16_t idxsiz = 0;
973   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
974   off_t blkend = kb->addr + (1ULL << kb->szpow);
975   uint8_t *wp = mm + blkend;
976   memcpy(tidx, kb->pidx, sizeof(tidx));
977   ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
978 
979   coff = 0;
980   for (i = 0; i < KVBLK_IDXNUM && tidx[i].off; ++i) {
981 #ifndef NDEBUG
982     if (i > 0) {
983       assert(tidx[i - 1].off < tidx[i].off);
984     }
985 #endif
986     KVP *kvp = &kb->pidx[tidx[i].ridx];
987     off_t noff = coff + kvp->len;
988     if (kvp->off > noff) {
989       assert(noff <= (1ULL << kb->szpow) && kvp->len <= noff);
990       if (dlsnr) {
991         rc = dlsnr->onwrite(dlsnr, blkend - noff, wp - kvp->off, kvp->len, 0);
992       }
993       memmove(wp - noff, wp - kvp->off, kvp->len);
994       kvp->off = noff;
995     }
996     coff += kvp->len;
997     idxsiz += IW_VNUMSIZE(kvp->off);
998     idxsiz += IW_VNUMSIZE32(kvp->len);
999   }
1000   idxsiz += (KVBLK_IDXNUM - i) * 2;
1001   for (i = 0; i < KVBLK_IDXNUM; ++i) {
1002     if (!kb->pidx[i].len) {
1003       kb->zidx = i;
1004       break;
1005     }
1006   }
1007   assert(idxsiz <= kb->idxsz);
1008   kb->idxsz = idxsiz;
1009   kb->maxoff = coff;
1010   if (i == KVBLK_IDXNUM) {
1011     kb->zidx = -1;
1012   }
1013   kb->flags |= KVBLK_DURTY;
1014   assert(_kvblk_compacted_offset(kb) == kb->maxoff);
1015   return rc;
1016 }
1017 
_kvblk_maxkvoff(KVBLK * kb)1018 IW_INLINE off_t _kvblk_maxkvoff(KVBLK *kb) {
1019   off_t off = 0;
1020   for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1021     if (kb->pidx[i].off > off) {
1022       off = kb->pidx[i].off;
1023     }
1024   }
1025   return off;
1026 }
1027 
_kvblk_rmkv(KVBLK * kb,uint8_t idx,kvblk_rmkv_opts_t opts)1028 static WUR iwrc _kvblk_rmkv(KVBLK *kb, uint8_t idx, kvblk_rmkv_opts_t opts) {
1029   iwrc rc = 0;
1030   uint8_t *mm = 0;
1031   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1032   IWFS_FSM *fsm = &kb->db->iwkv->fsm;
1033   if (kb->pidx[idx].off >= kb->maxoff) {
1034     kb->maxoff = 0;
1035     for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1036       if ((i != idx) && (kb->pidx[i].off > kb->maxoff)) {
1037         kb->maxoff = kb->pidx[i].off;
1038       }
1039     }
1040   }
1041   kb->pidx[idx].len = 0;
1042   kb->pidx[idx].off = 0;
1043   kb->flags |= KVBLK_DURTY;
1044   if ((kb->zidx < 0) || (idx < kb->zidx)) {
1045     kb->zidx = idx;
1046   }
1047   if (!(RMKV_NO_RESIZE & opts) && (kb->szpow > KVBLK_INISZPOW)) {
1048     off_t nlen = 1ULL << kb->szpow;
1049     off_t dsz = _kvblk_compacted_dsize(kb);
1050     if (nlen >= 2 * dsz) {
1051       uint8_t npow = kb->szpow - 1;
1052       while (npow > KVBLK_INISZPOW && (1ULL << (npow - 1)) >= dsz) {
1053         --npow;
1054       }
1055       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1056       RCGO(rc, finish);
1057 
1058       rc = _kvblk_compact_mm(kb, mm);
1059       RCGO(rc, finish);
1060 
1061       off_t maxoff = _kvblk_maxkvoff(kb);
1062       if (dlsnr) {
1063         rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << npow) - maxoff, mm + kb->addr + nlen - maxoff, maxoff, 0);
1064         RCGO(rc, finish);
1065       }
1066       memmove(mm + kb->addr + (1ULL << npow) - maxoff,
1067               mm + kb->addr + nlen - maxoff,
1068               (size_t) maxoff);
1069 
1070       fsm->release_mmap(fsm);
1071       mm = 0;
1072       rc = fsm->reallocate(fsm, (1ULL << npow), &kb->addr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1073       RCGO(rc, finish);
1074       kb->szpow = npow;
1075       assert(nlen == (1ULL << kb->szpow));
1076       opts |= RMKV_SYNC;
1077     }
1078   }
1079   if (RMKV_SYNC & opts) {
1080     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1081     RCGO(rc, finish);
1082     IWRC(_kvblk_sync_mm(kb, mm), rc);
1083   }
1084 
1085 finish:
1086   if (mm) {
1087     fsm->release_mmap(fsm);
1088   }
1089   return rc;
1090 }
1091 
_kvblk_addkv(KVBLK * kb,const IWKV_val * key,const IWKV_val * val,uint8_t * oidx,bool raw_key)1092 static WUR iwrc _kvblk_addkv(
1093   KVBLK          *kb,
1094   const IWKV_val *key,
1095   const IWKV_val *val,
1096   uint8_t        *oidx,
1097   bool            raw_key
1098   ) {
1099   *oidx = 0;
1100 
1101   iwrc rc = 0;
1102   off_t msz;    // max available free space
1103   off_t rsz;    // required size to add new key/value pair
1104   off_t noff;   // offset of new kvpair from end of block
1105   uint8_t *mm, *wp, *sptr;
1106   size_t i, sp;
1107   KVP *kvp;
1108   IWDB db = kb->db;
1109   bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1110   IWFS_FSM *fsm = &db->iwkv->fsm;
1111   bool compacted = false;
1112   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1113   IWKV_val *uval = (IWKV_val*) val;
1114 
1115   size_t ksize = key->size;
1116   if (compound) {
1117     ksize += IW_VNUMSIZE(key->compound);
1118   }
1119   off_t psz = IW_VNUMSIZE(ksize) + ksize;
1120 
1121   if (kb->zidx < 0) {
1122     return _IWKV_RC_KVBLOCK_FULL;
1123   }
1124   psz += uval->size;
1125   if (psz > IWKV_MAX_KVSZ) {
1126     return IWKV_ERROR_MAXKVSZ;
1127   }
1128 
1129 start:
1130   // [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] // KVBLK
1131   msz = (1ULL << kb->szpow) - (KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1132   assert(msz >= 0);
1133   noff = kb->maxoff + psz;
1134   rsz = psz + IW_VNUMSIZE(noff) + IW_VNUMSIZE(psz);
1135 
1136   if (msz < rsz) { // not enough space
1137     if (!compacted) {
1138       compacted = true;
1139       if (_kvblk_compacted_offset(kb) != kb->maxoff) {
1140         rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1141         RCGO(rc, finish);
1142         rc = _kvblk_compact_mm(kb, mm);
1143         RCGO(rc, finish);
1144         fsm->release_mmap(fsm);
1145         goto start;
1146       }
1147     }
1148     // resize the whole block
1149     off_t nlen = 1ULL << kb->szpow;
1150     off_t nsz = rsz - msz + nlen;
1151     off_t naddr = kb->addr;
1152     off_t olen = nlen;
1153 
1154     uint8_t npow = kb->szpow;
1155     while ((1ULL << ++npow) < nsz);
1156 
1157     rc = fsm->allocate(fsm, (1ULL << npow), &naddr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1158     RCGO(rc, finish);
1159     assert(nlen == (1ULL << npow));
1160     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1161     RCGO(rc, finish);
1162     if (dlsnr) {
1163       rc = dlsnr->onwrite(dlsnr, naddr, mm + kb->addr, KVBLK_HDRSZ, 0);
1164       RCGO(rc, finish);
1165       memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1166       rc = dlsnr->onwrite(dlsnr, naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, kb->maxoff, 0);
1167       RCGO(rc, finish);
1168       memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1169     } else {
1170       memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1171       memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1172     }
1173     fsm->release_mmap(fsm);
1174     rc = fsm->deallocate(fsm, kb->addr, olen);
1175     RCGO(rc, finish);
1176 
1177     kb->addr = naddr;
1178     kb->szpow = npow;
1179   }
1180   *oidx = (uint8_t) kb->zidx;
1181   kvp = &kb->pidx[kb->zidx];
1182   kvp->len = (uint32_t) psz;
1183   kvp->off = noff;
1184   kvp->ridx = (uint8_t) kb->zidx;
1185   kb->maxoff = noff;
1186   kb->flags |= KVBLK_DURTY;
1187   for (i = 0; i < KVBLK_IDXNUM; ++i) {
1188     if (!kb->pidx[i].len && (i != kb->zidx)) {
1189       kb->zidx = i;
1190       break;
1191     }
1192   }
1193   if (i >= KVBLK_IDXNUM) {
1194     kb->zidx = -1;
1195   }
1196   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1197   RCGO(rc, finish);
1198   assert((1ULL << kb->szpow) >= KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1199   assert(kvp->off < (1ULL << kb->szpow) && kvp->len <= kvp->off);
1200   wp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
1201   sptr = wp;
1202   // [klen:vn,key,value]
1203   IW_SETVNUMBUF(sp, wp, ksize);
1204   wp += sp;
1205   if (compound) {
1206     IW_SETVNUMBUF64(sp, wp, key->compound);
1207     wp += sp;
1208   }
1209   memcpy(wp, key->data, key->size);
1210   wp += key->size;
1211   if (uval->size) {
1212     memcpy(wp, uval->data, uval->size);
1213     wp += uval->size;
1214   }
1215 #ifndef NDEBUG
1216   assert(wp - sptr == kvp->len);
1217 #endif
1218   if (dlsnr) {
1219     rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << kb->szpow) - kvp->off, sptr, wp - sptr, 0);
1220   }
1221   fsm->release_mmap(fsm);
1222 
1223 finish:
1224   return rc;
1225 }
1226 
_kvblk_updatev(KVBLK * kb,uint8_t * idxp,const IWKV_val * key,const IWKV_val * val)1227 static WUR iwrc _kvblk_updatev(
1228   KVBLK          *kb,
1229   uint8_t        *idxp,
1230   const IWKV_val *key,                              /* Nullable */
1231   const IWKV_val *val
1232   ) {
1233   assert(*idxp < KVBLK_IDXNUM);
1234   int32_t i;
1235   uint32_t len, nlen, sz;
1236   uint8_t pidx = *idxp, *mm = 0, *wp, *sp;
1237   IWDB db = kb->db;
1238   IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1239   IWKV_val *uval = (IWKV_val*) val;
1240   IWKV_val *ukey = (IWKV_val*) key;
1241   IWKV_val skey; // stack allocated key/val
1242   KVP *kvp = &kb->pidx[pidx];
1243   size_t kbsz = 1ULL << kb->szpow;                            // kvblk size
1244   off_t freesz = kbsz - KVBLK_HDRSZ - kb->idxsz - kb->maxoff; // free space available
1245   IWFS_FSM *fsm = &db->iwkv->fsm;
1246 
1247   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1248   RCRET(rc);
1249   assert(freesz >= 0);
1250 
1251   wp = mm + kb->addr + kbsz - kvp->off;
1252   sp = wp;
1253   IW_READVNUMBUF(wp, len, sz);
1254   wp += sz;
1255   if (ukey && (len != ukey->size)) {
1256     rc = IWKV_ERROR_CORRUPTED;
1257     iwlog_ecode_error3(rc);
1258     goto finish;
1259   }
1260   wp += len;
1261   off_t rsize = sz + len + uval->size; // required size
1262   if (rsize <= kvp->len) {
1263     memcpy(wp, uval->data, uval->size);
1264     if (dlsnr) {
1265       rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1266       RCGO(rc, finish);
1267     }
1268     wp += uval->size;
1269     if ((wp - sp) != kvp->len) {
1270       kvp->len = wp - sp;
1271       kb->flags |= KVBLK_DURTY;
1272     }
1273   } else {
1274     KVP tidx[KVBLK_IDXNUM];
1275     KVP tidx_tmp[KVBLK_IDXNUM];
1276     off_t koff = kb->pidx[pidx].off;
1277     memcpy(tidx, kb->pidx, KVBLK_IDXNUM * sizeof(kb->pidx[0]));
1278     ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
1279     kb->flags |= KVBLK_DURTY;
1280     if (!ukey) { // we need a key
1281       ukey = &skey;
1282       rc = _kvblk_key_get(kb, mm, pidx, ukey);
1283       RCGO(rc, finish);
1284     }
1285     for (i = 0; i < KVBLK_IDXNUM; ++i) {
1286       if (tidx[i].off == koff) {
1287         if (koff - ((i > 0) ? tidx[i - 1].off : 0) >= rsize) {
1288           nlen = wp + uval->size - sp;
1289           if (!((nlen > kvp->len) && (freesz - IW_VNUMSIZE32(nlen) + IW_VNUMSIZE32(kvp->len) < 0))) { // enough space?
1290             memcpy(wp, uval->data, uval->size);
1291             if (dlsnr) {
1292               rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
1293               RCGO(rc, finish);
1294             }
1295             wp += uval->size;
1296             kvp->len = nlen;
1297             break;
1298             ;
1299           }
1300         }
1301         mm = 0;
1302         fsm->release_mmap(fsm);
1303         rc = _kvblk_rmkv(kb, pidx, RMKV_NO_RESIZE);
1304         RCGO(rc, finish);
1305         rc = _kvblk_addkv(kb, ukey, uval, idxp, false);
1306         break;
1307       }
1308     }
1309   }
1310 
1311 finish:
1312   if (ukey != key) {
1313     _kv_val_dispose(ukey);
1314   }
1315   if (mm) {
1316     IWRC(fsm->release_mmap(fsm), rc);
1317   }
1318   return rc;
1319 }
1320 
1321 //--------------------------  SBLK
1322 
_sblk_release(IWLCTX * lx,SBLK ** sblkp)1323 IW_INLINE void _sblk_release(IWLCTX *lx, SBLK **sblkp) {
1324   assert(sblkp && *sblkp);
1325   SBLK *sblk = *sblkp;
1326   sblk->flags &= ~SBLK_DURTY;       // clear dirty flag
1327   sblk->kvblk = 0;
1328   *sblkp = 0;
1329 }
1330 
_sblk_loadkvblk_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1331 IW_INLINE WUR iwrc _sblk_loadkvblk_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1332   if (!sblk->kvblk && sblk->kvblkn) {
1333     return _kvblk_at_mm(lx, BLK2ADDR(sblk->kvblkn), mm, 0, &sblk->kvblk);
1334   } else {
1335     return 0;
1336   }
1337 }
1338 
_sblk_is_only_one_on_page_v2(IWLCTX * lx,uint8_t * mm,SBLK * sblk,off_t * page_addr)1339 static bool _sblk_is_only_one_on_page_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *page_addr) {
1340   *page_addr = 0;
1341   if ((sblk->bpos > 0) && (sblk->bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
1342     off_t addr = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
1343     *page_addr = addr;
1344     for (int i = 0; i < SBLK_PAGE_SBLK_NUM_V2; ++i) {
1345       if (i != sblk->bpos - 1) {
1346         uint8_t bv;
1347         memcpy(&bv, mm + addr + i * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1348         if (bv) {
1349           return false;
1350         }
1351       }
1352     }
1353   } else {
1354     return false; // be safe
1355   }
1356   return true;
1357 }
1358 
_sblk_destroy(IWLCTX * lx,SBLK ** sblkp)1359 IW_INLINE WUR iwrc _sblk_destroy(IWLCTX *lx, SBLK **sblkp) {
1360   assert(sblkp && *sblkp && (*sblkp)->addr);
1361   iwrc rc = 0;
1362   SBLK *sblk = *sblkp;
1363   lx->destroy_addr = sblk->addr;
1364 
1365   if (!(sblk->flags & SBLK_DB)) {
1366     uint8_t kvb_szpow, *mm;
1367     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1368     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1369     off_t kvb_addr = BLK2ADDR(sblk->kvblkn);
1370     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1371     RCRET(rc);
1372 
1373     if (!sblk->kvblk) {
1374       // Read KVBLK size as power of two
1375       memcpy(&kvb_szpow, mm + kvb_addr + KBLK_SZPOW_OFF, 1);
1376     } else {
1377       kvb_szpow = sblk->kvblk->szpow;
1378     }
1379     if (lx->db->lcnt[sblk->lvl]) {
1380       lx->db->lcnt[sblk->lvl]--;
1381       lx->db->flags |= SBLK_DURTY;
1382     }
1383     if (lx->db->iwkv->fmt_version > 1) {
1384       off_t paddr;
1385       if (_sblk_is_only_one_on_page_v2(lx, mm, sblk, &paddr)) {
1386         fsm->release_mmap(fsm);
1387         // Deallocate whole page
1388         rc = fsm->deallocate(fsm, paddr, SBLK_PAGE_SZ_V2);
1389       } else {
1390         memset(mm + sblk->addr + SOFF_BPOS_U1_V2, 0, 1);
1391         fsm->release_mmap(fsm);
1392         if (dlsnr) {
1393           dlsnr->onset(dlsnr, sblk->addr + SOFF_BPOS_U1_V2, 0, 1, 0);
1394         }
1395       }
1396     } else {
1397       fsm->release_mmap(fsm);
1398       rc = fsm->deallocate(fsm, sblk->addr, SBLK_SZ);
1399     }
1400     IWRC(fsm->deallocate(fsm, kvb_addr, 1ULL << kvb_szpow), rc);
1401   }
1402   _sblk_release(lx, sblkp);
1403   return rc;
1404 }
1405 
_sblk_genlevel(IWDB db)1406 IW_INLINE uint8_t _sblk_genlevel(IWDB db) {
1407   uint8_t lvl;
1408 #ifdef IW_TESTS
1409   if (iwkv_next_level >= 0) {
1410     lvl = (uint8_t) iwkv_next_level;
1411     iwkv_next_level = -1;
1412     assert(lvl < SLEVELS);
1413     return lvl;
1414   }
1415 #endif
1416   uint32_t r = iwu_rand_u32();
1417   for (lvl = 0; lvl < SLEVELS && !(r & 1); ++lvl) r >>= 1;
1418   uint8_t ret = IW_UNLIKELY(lvl >= SLEVELS) ? SLEVELS - 1 : lvl;
1419   while (ret > 0 && db->lcnt[ret - 1] == 0) {
1420     --ret;
1421   }
1422   return ret;
1423 }
1424 
_sblk_create_v1(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,off_t baddr,uint8_t bpos,SBLK ** oblk)1425 static WUR iwrc _sblk_create_v1(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, off_t baddr, uint8_t bpos, SBLK **oblk) {
1426   iwrc rc;
1427   SBLK *sblk;
1428   KVBLK *kvblk;
1429   off_t blen;
1430   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1431   if (kvbpow < KVBLK_INISZPOW) {
1432     kvbpow = KVBLK_INISZPOW;
1433   }
1434   *oblk = 0;
1435   if (!bpos) {
1436     rc = fsm->allocate(fsm, SBLK_SZ + (1ULL << kvbpow), &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1437     RCRET(rc);
1438     assert(blen - SBLK_SZ == (1ULL << kvbpow));
1439     _kvblk_create(lx, baddr + SBLK_SZ, kvbpow, &kvblk);
1440   } else {
1441     // Allocate kvblk as separate chunk
1442     off_t kblkaddr = 0;
1443     rc = fsm->allocate(fsm, (1ULL << kvbpow), &kblkaddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1444     assert(blen == (1ULL << kvbpow));
1445     _kvblk_create(lx, kblkaddr, kvbpow, &kvblk);
1446   }
1447   sblk = &lx->saa[lx->saan];
1448   sblk->db = lx->db;
1449   sblk->db->lcnt[nlevel]++;
1450   sblk->db->flags |= SBLK_DURTY;
1451   sblk->addr = baddr;
1452   sblk->flags = SBLK_DURTY;
1453   sblk->lvl = nlevel;
1454   sblk->p0 = 0;
1455   memset(sblk->n, 0, sizeof(sblk->n));
1456   sblk->kvblk = kvblk;
1457   sblk->kvblkn = ADDR2BLK(kvblk->addr);
1458   sblk->lkl = 0;
1459   sblk->pnum = 0;
1460   sblk->bpos = bpos;
1461   memset(sblk->pi, 0, sizeof(sblk->pi));
1462   *oblk = sblk;
1463   AAPOS_INC(lx->saan);
1464   return 0;
1465 }
1466 
_sblk_find_free_page_slot_v2(IWLCTX * lx,uint8_t * mm,SBLK * sblk,off_t * obaddr,uint8_t * oslot)1467 static void _sblk_find_free_page_slot_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *obaddr, uint8_t *oslot) {
1468   if ((sblk->bpos < 1) || (sblk->bpos > SBLK_PAGE_SBLK_NUM_V2)) {
1469     *obaddr = 0;
1470     *oslot = 0;
1471     return;
1472   }
1473   off_t paddr = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
1474   for (int i = sblk->bpos + 1; i <= SBLK_PAGE_SBLK_NUM_V2; ++i) {
1475     uint8_t slot;
1476     memcpy(&slot, mm + paddr + (i - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1477     if (!slot) {
1478       *obaddr = paddr + (i - 1) * SBLK_SZ;
1479       *oslot = i;
1480       return;
1481     }
1482   }
1483   for (int i = sblk->bpos - 1; i > 0; --i) {
1484     uint8_t slot;
1485     memcpy(&slot, mm + paddr + (i - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
1486     if (!slot) {
1487       *obaddr = paddr + (i - 1) * SBLK_SZ;
1488       *oslot = i;
1489       return;
1490     }
1491   }
1492   *obaddr = 0;
1493   *oslot = 0;
1494 }
1495 
1496 /// Create
_sblk_create_v2(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1497 static WUR iwrc _sblk_create_v2(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1498   off_t baddr = 0;
1499   uint8_t bpos = 0, *mm;
1500   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1501   SBLK *_lower = lower;
1502   SBLK *_upper = upper;
1503 
1504   for (int i = SLEVELS - 1; i >= 0; --i) {
1505     if (lx->pupper[i] && (lx->pupper[i]->lvl >= nlevel)) {
1506       _upper = lx->pupper[i];
1507     }
1508     if (lx->plower[i] && (lx->plower[i]->lvl >= nlevel)) {
1509       _lower = lx->plower[i];
1510     }
1511   }
1512 
1513   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1514   RCRET(rc);
1515   _sblk_find_free_page_slot_v2(lx, mm, _lower, &baddr, &bpos);
1516   if (!baddr && _upper && (_upper->addr != _lower->addr)) {
1517     _sblk_find_free_page_slot_v2(lx, mm, _upper, &baddr, &bpos);
1518   }
1519   if (!baddr) {
1520     if (_lower->addr != lower->addr) {
1521       _sblk_find_free_page_slot_v2(lx, mm, lower, &baddr, &bpos);
1522     }
1523     if (!baddr && upper && _upper && (_upper->addr != upper->addr)) {
1524       _sblk_find_free_page_slot_v2(lx, mm, upper, &baddr, &bpos);
1525     }
1526   }
1527   fsm->release_mmap(fsm);
1528 
1529   if (!baddr) {
1530     // No free slots - allocate new SBLK page
1531     off_t blen;
1532     bpos = 1;
1533     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1534     rc = fsm->allocate(fsm, SBLK_PAGE_SZ_V2, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1535     RCRET(rc);
1536     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1537     RCRET(rc);
1538     // Fill page to zero
1539     memset(mm + baddr, 0, blen);
1540     if (dlsnr) {
1541       rc = dlsnr->onset(dlsnr, baddr, 0, blen, 0);
1542     }
1543     fsm->release_mmap(fsm);
1544     RCRET(rc);
1545   }
1546   return _sblk_create_v1(lx, nlevel, kvbpow, baddr, bpos, oblk);
1547 }
1548 
_sblk_create(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1549 IW_INLINE WUR iwrc _sblk_create(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1550   if (lx->db->iwkv->fmt_version > 1) {
1551     return _sblk_create_v2(lx, nlevel, kvbpow, lower, upper, oblk);
1552   } else {
1553     return _sblk_create_v1(lx, nlevel, kvbpow, lower->addr, 0, oblk);
1554   }
1555 }
1556 
_sblk_at2(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK * sblk)1557 static WUR iwrc _sblk_at2(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK *sblk) {
1558   iwrc rc;
1559   uint8_t *mm;
1560   uint32_t lv;
1561   sblk_flags_t flags = lx->sbflags | flgs;
1562   IWDB db = lx->db;
1563   IWFS_FSM *fsm = &db->iwkv->fsm;
1564   sblk->kvblk = 0;
1565   sblk->bpos = 0;
1566   sblk->db = db;
1567 
1568   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1569   RCRET(rc);
1570 
1571   if (IW_UNLIKELY(addr == db->addr)) {
1572     uint8_t *rp = mm + addr + DOFF_N0_U4;
1573     // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1574     sblk->addr = addr;
1575     sblk->flags = SBLK_DB | flags;
1576     sblk->lvl = 0;
1577     sblk->p0 = 0;
1578     sblk->kvblkn = 0;
1579     sblk->lkl = 0;
1580     sblk->pnum = KVBLK_IDXNUM;
1581     memset(sblk->pi, 0, sizeof(sblk->pi));
1582     for (int i = 0; i < SLEVELS; ++i) {
1583       IW_READLV(rp, lv, sblk->n[i]);
1584       if (sblk->n[i]) {
1585         ++sblk->lvl;
1586       } else {
1587         break;
1588       }
1589     }
1590     if (sblk->lvl) {
1591       --sblk->lvl;
1592     }
1593   } else if (addr) {
1594     uint8_t uflags;
1595     uint8_t *rp = mm + addr;
1596     sblk->addr = addr;
1597     // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
1598     memcpy(&uflags, rp++, 1);
1599     sblk->flags = uflags;
1600     if (sblk->flags & ~SBLK_PERSISTENT_FLAGS) {
1601       rc = IWKV_ERROR_CORRUPTED;
1602       iwlog_ecode_error3(rc);
1603       goto finish;
1604     }
1605     sblk->flags |= flags;
1606     memcpy(&sblk->lvl, rp++, 1);
1607     if (sblk->lvl >= SLEVELS) {
1608       rc = IWKV_ERROR_CORRUPTED;
1609       iwlog_ecode_error3(rc);
1610       goto finish;
1611     }
1612     memcpy(&sblk->lkl, rp++, 1);
1613     if (sblk->lkl > db->iwkv->pklen) {
1614       rc = IWKV_ERROR_CORRUPTED;
1615       iwlog_ecode_error3(rc);
1616       goto finish;
1617     }
1618     memcpy(&sblk->pnum, rp++, 1);
1619     if (sblk->pnum < 0) {
1620       rc = IWKV_ERROR_CORRUPTED;
1621       iwlog_ecode_error3(rc);
1622       goto finish;
1623     }
1624     memcpy(&sblk->p0, rp, 4);
1625     sblk->p0 = IW_ITOHL(sblk->p0);
1626     rp += 4;
1627     memcpy(&sblk->kvblkn, rp, 4);
1628     sblk->kvblkn = IW_ITOHL(sblk->kvblkn);
1629     rp += 4;
1630     memcpy(sblk->pi, rp, KVBLK_IDXNUM);
1631     rp += KVBLK_IDXNUM;
1632 
1633 #ifdef IW_BIGENDIAN
1634     for (int i = 0; i <= sblk->lvl; ++i) {
1635       memcpy(&sblk->n[i], rp, 4);
1636       sblk->n[i] = IW_ITOHL(sblk->n[i]);
1637       rp += 4;
1638     }
1639 #else
1640     memcpy(sblk->n, rp, 4 * (sblk->lvl + 1));
1641     rp += 4 * (sblk->lvl + 1);
1642 #endif
1643     if (db->iwkv->fmt_version > 1) {
1644       rp = mm + addr + SOFF_BPOS_U1_V2;
1645       memcpy(&sblk->bpos, rp++, 1);
1646     } else {
1647       rp = mm + addr + SOFF_LK_V1;
1648     }
1649     // Lower key
1650     memcpy(sblk->lk, rp, (size_t) sblk->lkl);
1651   } else { // Database tail
1652     uint8_t *rp = mm + db->addr + DOFF_P0_U4;
1653     sblk->addr = 0;
1654     sblk->flags = SBLK_DB | flags;
1655     sblk->lvl = 0;
1656     sblk->kvblkn = 0;
1657     sblk->lkl = 0;
1658     sblk->pnum = KVBLK_IDXNUM;
1659     memset(sblk->pi, 0, sizeof(sblk->pi));
1660     IW_READLV(rp, lv, sblk->p0);
1661     if (!sblk->p0) {
1662       sblk->p0 = ADDR2BLK(db->addr);
1663     }
1664   }
1665 
1666 finish:
1667   fsm->release_mmap(fsm);
1668   return rc;
1669 }
1670 
_sblk_at(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK ** sblkp)1671 IW_INLINE WUR iwrc _sblk_at(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK **sblkp) {
1672   *sblkp = 0;
1673   SBLK *sblk = &lx->saa[lx->saan];
1674   iwrc rc = _sblk_at2(lx, addr, flgs, sblk);
1675   AAPOS_INC(lx->saan);
1676   *sblkp = sblk;
1677   return rc;
1678 }
1679 
_sblk_sync_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1680 static WUR iwrc _sblk_sync_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1681   iwrc rc = 0;
1682   if (sblk->flags & SBLK_DURTY) {
1683     uint32_t lv;
1684     IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1685     sblk->flags &= ~SBLK_DURTY;
1686     if (IW_UNLIKELY(sblk->flags & SBLK_DB)) {
1687       uint8_t *sp;
1688       uint8_t *wp = mm + sblk->db->addr;
1689       if (sblk->addr) {
1690         assert(sblk->addr == sblk->db->addr);
1691         wp += DOFF_N0_U4;
1692         sp = wp;
1693         // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1694         for (int i = 0; i < SLEVELS; ++i) {
1695           IW_WRITELV(wp, lv, sblk->n[i]);
1696         }
1697         assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1698         for (int i = 0; i < SLEVELS; ++i) {
1699           IW_WRITELV(wp, lv, lx->db->lcnt[i]);
1700         }
1701       } else { // Database tail
1702         wp += DOFF_P0_U4;
1703         sp = wp;
1704         IW_WRITELV(wp, lv, sblk->p0);
1705         assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
1706       }
1707       if (dlsnr) {
1708         rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
1709       }
1710       return rc;
1711     } else {
1712       uint8_t *wp = mm + sblk->addr;
1713       sblk_flags_t flags = (sblk->flags & SBLK_PERSISTENT_FLAGS);
1714       uint8_t uflags = flags;
1715       assert(sblk->lkl <= lx->db->iwkv->pklen);
1716       // [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256
1717       wp += SOFF_FLAGS_U1;
1718       memcpy(wp++, &uflags, 1);
1719       memcpy(wp++, &sblk->lvl, 1);
1720       memcpy(wp++, &sblk->lkl, 1);
1721       memcpy(wp++, &sblk->pnum, 1);
1722       IW_WRITELV(wp, lv, sblk->p0);
1723       IW_WRITELV(wp, lv, sblk->kvblkn);
1724       memcpy(wp, sblk->pi, KVBLK_IDXNUM);
1725       wp = mm + sblk->addr + SOFF_N0_U4;
1726 
1727 #ifdef IW_BIGENDIAN
1728       for (int i = 0; i <= sblk->lvl; ++i) {
1729         IW_WRITELV(wp, lv, sblk->n[i]);
1730       }
1731 #else
1732       memcpy(wp, sblk->n, 4 * (sblk->lvl + 1));
1733       wp += 4 * (sblk->lvl + 1);
1734 #endif
1735 
1736       if (lx->db->iwkv->fmt_version > 1) {
1737         wp = mm + sblk->addr + SOFF_BPOS_U1_V2;
1738         memcpy(wp++, &sblk->bpos, 1);
1739       } else {
1740         wp = mm + sblk->addr + SOFF_LK_V1;
1741       }
1742       memcpy(wp, sblk->lk, (size_t) sblk->lkl);
1743       if (dlsnr) {
1744         rc = dlsnr->onwrite(dlsnr, sblk->addr, mm + sblk->addr, SOFF_END, 0);
1745         RCRET(rc);
1746       }
1747     }
1748   }
1749   if (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY)) {
1750     IWRC(_kvblk_sync_mm(sblk->kvblk, mm), rc);
1751   }
1752   return rc;
1753 }
1754 
_sblk_sync(IWLCTX * lx,SBLK * sblk)1755 IW_INLINE WUR iwrc _sblk_sync(IWLCTX *lx, SBLK *sblk) {
1756   if ((sblk->flags & SBLK_DURTY) || (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY))) {
1757     uint8_t *mm;
1758     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1759     iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1760     RCRET(rc);
1761     rc = _sblk_sync_mm(lx, sblk, mm);
1762     fsm->release_mmap(fsm);
1763     return rc;
1764   }
1765   return 0;
1766 }
1767 
_sblk_sync_and_release_mm(IWLCTX * lx,SBLK ** sblkp,uint8_t * mm)1768 IW_INLINE WUR iwrc _sblk_sync_and_release_mm(IWLCTX *lx, SBLK **sblkp, uint8_t *mm) {
1769   SBLK *sblk = *sblkp;
1770   if (lx->destroy_addr && (lx->destroy_addr == sblk->addr)) {
1771     return 0;
1772   }
1773   iwrc rc = 0;
1774   if (mm) {
1775     rc = _sblk_sync_mm(lx, *sblkp, mm);
1776   }
1777   _sblk_release(lx, sblkp);
1778   return rc;
1779 }
1780 
_sblk_find_pi_mm(SBLK * sblk,IWLCTX * lx,const uint8_t * mm,bool * found,uint8_t * idxp)1781 static WUR iwrc _sblk_find_pi_mm(SBLK *sblk, IWLCTX *lx, const uint8_t *mm, bool *found, uint8_t *idxp) {
1782   *found = false;
1783   if (sblk->flags & SBLK_DB) {
1784     *idxp = KVBLK_IDXNUM;
1785     return 0;
1786   }
1787   uint8_t *k;
1788   uint32_t kl;
1789   int idx = 0, lb = 0, ub = sblk->pnum - 1;
1790   iwdb_flags_t dbflg = lx->db->dbflg;
1791 
1792   if (sblk->pnum < 1) {
1793     *idxp = 0;
1794     return 0;
1795   }
1796   while (1) {
1797     idx = (ub + lb) / 2;
1798     iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1799     RCRET(rc);
1800     int cr = _cmp_keys(dbflg, k, kl, lx->key);
1801     if (!cr) {
1802       *found = true;
1803       break;
1804     } else if (cr < 0) {
1805       lb = idx + 1;
1806       if (lb > ub) {
1807         idx = lb;
1808         break;
1809       }
1810     } else {
1811       ub = idx - 1;
1812       if (lb > ub) {
1813         break;
1814       }
1815     }
1816   }
1817   *idxp = idx;
1818   return 0;
1819 }
1820 
_sblk_insert_pi_mm(SBLK * sblk,uint8_t nidx,IWLCTX * lx,const uint8_t * mm,uint8_t * idxp)1821 static WUR iwrc _sblk_insert_pi_mm(
1822   SBLK *sblk, uint8_t nidx, IWLCTX *lx,
1823   const uint8_t *mm, uint8_t *idxp
1824   ) {
1825   assert(sblk->kvblk);
1826 
1827   uint8_t *k;
1828   uint32_t kl;
1829   int idx = 0, lb = 0, ub = sblk->pnum - 1, nels = sblk->pnum; // NOLINT
1830 
1831   if (nels < 1) {
1832     sblk->pi[0] = nidx;
1833     ++sblk->pnum;
1834     *idxp = 0;
1835     return 0;
1836   }
1837   iwdb_flags_t dbflg = sblk->db->dbflg;
1838   while (1) {
1839     idx = (ub + lb) / 2;
1840     iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1841     RCRET(rc);
1842     int cr = _cmp_keys(dbflg, k, kl, lx->key);
1843     if (!cr) {
1844       break;
1845     } else if (cr < 0) {
1846       lb = idx + 1;
1847       if (lb > ub) {
1848         idx = lb;
1849         ++sblk->pnum;
1850         break;
1851       }
1852     } else {
1853       ub = idx - 1;
1854       if (lb > ub) {
1855         ++sblk->pnum;
1856         break;
1857       }
1858     }
1859   }
1860   if (nels - idx > 0) {
1861     memmove(sblk->pi + idx + 1, sblk->pi + idx, nels - idx);
1862   }
1863   sblk->pi[idx] = nidx;
1864   *idxp = idx;
1865   return 0;
1866 }
1867 
_sblk_addkv2(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val,bool raw_key)1868 static WUR iwrc _sblk_addkv2(
1869   SBLK           *sblk,
1870   int8_t          idx,
1871   const IWKV_val *key,
1872   const IWKV_val *val,
1873   bool            raw_key
1874   ) {
1875   assert(sblk && key && key->size && key->data && val && idx >= 0 && sblk->kvblk);
1876 
1877   uint8_t kvidx;
1878   IWDB db = sblk->db;
1879   KVBLK *kvblk = sblk->kvblk;
1880   if (sblk->pnum >= KVBLK_IDXNUM) {
1881     return _IWKV_RC_KVBLOCK_FULL;
1882   }
1883 
1884   iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, raw_key);
1885   RCRET(rc);
1886   if (sblk->pnum - idx > 0) {
1887     memmove(sblk->pi + idx + 1, sblk->pi + idx, sblk->pnum - idx);
1888   }
1889   sblk->pi[idx] = kvidx;
1890   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1891     sblk->kvblkn = ADDR2BLK(kvblk->addr);
1892   }
1893   ++sblk->pnum;
1894   sblk->flags |= SBLK_DURTY;
1895   if (idx == 0) { // the lowest key inserted
1896     size_t ksize = key->size;
1897     bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1898     if (compound) {
1899       ksize += IW_VNUMSIZE(key->compound);
1900     }
1901     sblk->lkl = MIN(db->iwkv->pklen, ksize);
1902     uint8_t *wp = sblk->lk;
1903     if (compound) {
1904       int len;
1905       IW_SETVNUMBUF64(len, wp, key->compound);
1906       wp += len;
1907     }
1908     memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1909     if (ksize <= db->iwkv->pklen) {
1910       sblk->flags |= SBLK_FULL_LKEY;
1911     } else {
1912       sblk->flags &= ~SBLK_FULL_LKEY;
1913     }
1914   }
1915   if (!raw_key) {
1916     // Update active cursors inside this block
1917     pthread_spin_lock(&db->cursors_slk);
1918     for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1919       if (cur->cn && (cur->cn->addr == sblk->addr)) {
1920         if (cur->cn != sblk) {
1921           memcpy(cur->cn, sblk, sizeof(*cur->cn));
1922           cur->cn->kvblk = 0;
1923           cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1924         }
1925         if (cur->cnpos >= idx) {
1926           cur->cnpos++;
1927         }
1928       }
1929     }
1930     pthread_spin_unlock(&db->cursors_slk);
1931   }
1932   return 0;
1933 }
1934 
_sblk_addkv(SBLK * sblk,IWLCTX * lx)1935 static WUR iwrc _sblk_addkv(SBLK *sblk, IWLCTX *lx) {
1936   const IWKV_val *key = lx->key;
1937   const IWKV_val *val = lx->val;
1938   assert(key && key->size && key->data && val && sblk->kvblk);
1939   if (!sblk) {
1940     iwlog_error2("sblk != 0");
1941     return IW_ERROR_ASSERTION;
1942   }
1943   uint8_t *mm, idx, kvidx;
1944   IWDB db = sblk->db;
1945   KVBLK *kvblk = sblk->kvblk;
1946   IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
1947   if (sblk->pnum >= KVBLK_IDXNUM) {
1948     return _IWKV_RC_KVBLOCK_FULL;
1949   }
1950   iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, false);
1951   RCRET(rc);
1952   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1953   RCRET(rc);
1954   rc = _sblk_insert_pi_mm(sblk, kvidx, lx, mm, &idx);
1955   RCRET(rc);
1956   fsm->release_mmap(fsm);
1957   if (idx == 0) { // the lowest key inserted
1958     size_t ksize = key->size;
1959     bool compound = (db->dbflg & IWDB_COMPOUND_KEYS);
1960     if (compound) {
1961       ksize += IW_VNUMSIZE(key->compound);
1962     }
1963     sblk->lkl = MIN(db->iwkv->pklen, ksize);
1964     uint8_t *wp = sblk->lk;
1965     if (compound) {
1966       int len;
1967       IW_SETVNUMBUF64(len, wp, key->compound);
1968       wp += len;
1969     }
1970     memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1971     if (ksize <= db->iwkv->pklen) {
1972       sblk->flags |= SBLK_FULL_LKEY;
1973     } else {
1974       sblk->flags &= ~SBLK_FULL_LKEY;
1975     }
1976   }
1977   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1978     sblk->kvblkn = ADDR2BLK(kvblk->addr);
1979   }
1980   sblk->flags |= SBLK_DURTY;
1981 
1982   // Update active cursors inside this block
1983   pthread_spin_lock(&db->cursors_slk);
1984   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1985     if (cur->cn && (cur->cn->addr == sblk->addr)) {
1986       if (cur->cn != sblk) {
1987         memcpy(cur->cn, sblk, sizeof(*cur->cn));
1988         cur->cn->kvblk = 0;
1989         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1990       }
1991       if (cur->cnpos >= idx) {
1992         cur->cnpos++;
1993       }
1994     }
1995   }
1996   pthread_spin_unlock(&db->cursors_slk);
1997 
1998   return 0;
1999 }
2000 
_sblk_updatekv(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val)2001 static WUR iwrc _sblk_updatekv(
2002   SBLK *sblk, int8_t idx,
2003   const IWKV_val *key, const IWKV_val *val
2004   ) {
2005   assert(sblk && sblk->kvblk && idx >= 0 && idx < sblk->pnum);
2006   IWDB db = sblk->db;
2007   KVBLK *kvblk = sblk->kvblk;
2008   uint8_t kvidx = sblk->pi[idx];
2009   iwrc intrc = 0;
2010   iwrc rc = _kvblk_updatev(kvblk, &kvidx, key, val);
2011   if (IWKV_IS_INTERNAL_RC(rc)) {
2012     intrc = rc;
2013     rc = 0;
2014   }
2015   RCRET(rc);
2016   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2017     sblk->kvblkn = ADDR2BLK(kvblk->addr);
2018   }
2019   sblk->pi[idx] = kvidx;
2020   sblk->flags |= SBLK_DURTY;
2021   // Update active cursors inside this block
2022   pthread_spin_lock(&db->cursors_slk);
2023   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2024     if (cur->cn && (cur->cn != sblk) && (cur->cn->addr == sblk->addr)) {
2025       memcpy(cur->cn, sblk, sizeof(*cur->cn));
2026       cur->cn->kvblk = 0;
2027       cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2028     }
2029   }
2030   pthread_spin_unlock(&db->cursors_slk);
2031   return intrc;
2032 }
2033 
_sblk_rmkv(SBLK * sblk,uint8_t idx)2034 static WUR iwrc _sblk_rmkv(SBLK *sblk, uint8_t idx) {
2035   assert(sblk && sblk->kvblk);
2036   IWDB db = sblk->db;
2037   KVBLK *kvblk = sblk->kvblk;
2038   IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
2039   assert(kvblk && idx < sblk->pnum && sblk->pi[idx] < KVBLK_IDXNUM);
2040 
2041   iwrc rc = _kvblk_rmkv(kvblk, sblk->pi[idx], 0);
2042   RCRET(rc);
2043 
2044   if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2045     sblk->kvblkn = ADDR2BLK(kvblk->addr);
2046   }
2047   --sblk->pnum;
2048   sblk->flags |= SBLK_DURTY;
2049 
2050   if ((idx < sblk->pnum) && (sblk->pnum > 0)) {
2051     memmove(sblk->pi + idx, sblk->pi + idx + 1, sblk->pnum - idx);
2052   }
2053 
2054   if (idx == 0) { // Lowest key removed
2055     // Replace the lowest key with the next one or reset
2056     if (sblk->pnum > 0) {
2057       uint8_t *mm, *kbuf;
2058       uint32_t klen;
2059       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2060       RCRET(rc);
2061       rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &kbuf, &klen);
2062       if (rc) {
2063         fsm->release_mmap(fsm);
2064         return rc;
2065       }
2066       sblk->lkl = MIN(db->iwkv->pklen, klen);
2067       memcpy(sblk->lk, kbuf, sblk->lkl);
2068       fsm->release_mmap(fsm);
2069       if (klen <= db->iwkv->pklen) {
2070         sblk->flags |= SBLK_FULL_LKEY;
2071       } else {
2072         sblk->flags &= ~SBLK_FULL_LKEY;
2073       }
2074     } else {
2075       sblk->lkl = 0;
2076     }
2077   }
2078 
2079   // Update active cursors
2080   pthread_spin_lock(&db->cursors_slk);
2081   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2082     if (cur->cn && (cur->cn->addr == sblk->addr)) {
2083       cur->skip_next = 0;
2084       if (cur->cn != sblk) {
2085         memcpy(cur->cn, sblk, sizeof(*cur->cn));
2086         cur->cn->kvblk = 0;
2087         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2088       }
2089       if (cur->cnpos == idx) {
2090         if (idx && (idx == sblk->pnum)) {
2091           cur->cnpos--;
2092           cur->skip_next = -1;
2093         } else {
2094           cur->skip_next = 1;
2095         }
2096       } else if (cur->cnpos > idx) {
2097         cur->cnpos--;
2098       }
2099     }
2100   }
2101   pthread_spin_unlock(&db->cursors_slk);
2102   return 0;
2103 }
2104 
2105 //--------------------------  IWLCTX
2106 
_lx_sblk_cmp_key(IWLCTX * lx,SBLK * sblk,int * resp)2107 WUR iwrc _lx_sblk_cmp_key(IWLCTX *lx, SBLK *sblk, int *resp) {
2108   int res = 0;
2109   iwrc rc = 0;
2110   iwdb_flags_t dbflg = sblk->db->dbflg;
2111   const IWKV_val *key = lx->key;
2112   uint8_t lkl = sblk->lkl;
2113   size_t ksize = key->size;
2114 
2115   if (IW_UNLIKELY((sblk->pnum < 1) || (sblk->flags & SBLK_DB))) {
2116     *resp = 0;
2117     iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
2118     return IWKV_ERROR_CORRUPTED;
2119   }
2120   if (dbflg & IWDB_COMPOUND_KEYS) {
2121     ksize += IW_VNUMSIZE(key->compound);
2122   }
2123   if (  (sblk->flags & SBLK_FULL_LKEY)
2124      || (ksize < lkl)
2125      || (dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2126     res = _cmp_keys(dbflg, sblk->lk, lkl, key);
2127   } else {
2128     res = _cmp_keys_prefix(dbflg, sblk->lk, lkl, key);
2129     if (res == 0) {
2130       uint32_t kl;
2131       uint8_t *mm, *k;
2132       IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2133       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2134       if (rc) {
2135         *resp = 0;
2136         return rc;
2137       }
2138       if (!sblk->kvblk) {
2139         rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2140         if (rc) {
2141           *resp = 0;
2142           fsm->release_mmap(fsm);
2143           return rc;
2144         }
2145       }
2146       rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[0], mm, &k, &kl);
2147       RCRET(rc);
2148       res = _cmp_keys(dbflg, k, kl, key);
2149       fsm->release_mmap(fsm);
2150     }
2151   }
2152   *resp = res;
2153   return rc;
2154 }
2155 
_lx_roll_forward(IWLCTX * lx,uint8_t lvl)2156 static WUR iwrc _lx_roll_forward(IWLCTX *lx, uint8_t lvl) {
2157   iwrc rc = 0;
2158   int cret;
2159   SBLK *sblk;
2160   blkn_t blkn;
2161   assert(lx->lower);
2162 
2163   while ((blkn = lx->lower->n[lvl])) {
2164     off_t blkaddr = BLK2ADDR(blkn);
2165     if ((lx->nlvl > -1) && (lvl < lx->nlvl)) {
2166       uint8_t ulvl = lvl + 1;
2167       if (lx->pupper[ulvl] && (lx->pupper[ulvl]->addr == blkaddr)) {
2168         sblk = lx->pupper[ulvl];
2169       } else if (lx->plower[ulvl] && (lx->plower[ulvl]->addr == blkaddr)) {
2170         sblk = lx->plower[ulvl];
2171       } else {
2172         rc = _sblk_at(lx, blkaddr, 0, &sblk);
2173       }
2174     } else {
2175       rc = _sblk_at(lx, blkaddr, 0, &sblk);
2176     }
2177     RCRET(rc);
2178 #ifndef NDEBUG
2179     ++lx->num_cmps;
2180 #endif
2181     rc = _lx_sblk_cmp_key(lx, sblk, &cret);
2182     RCRET(rc);
2183     if ((cret > 0) || (lx->upper_addr == sblk->addr)) { // upper > key
2184       lx->upper = sblk;
2185       break;
2186     } else {
2187       lx->lower = sblk;
2188     }
2189   }
2190   return 0;
2191 }
2192 
_lx_find_bounds(IWLCTX * lx)2193 static WUR iwrc _lx_find_bounds(IWLCTX *lx) {
2194   iwrc rc = 0;
2195   int lvl;
2196   blkn_t blkn;
2197   SBLK *dblk = &lx->dblk;
2198   if (!dblk->addr) {
2199     SBLK *s;
2200     rc = _sblk_at(lx, lx->db->addr, 0, &s);
2201     RCRET(rc);
2202     memcpy(dblk, s, sizeof(*dblk));
2203   }
2204   if (!lx->lower) {
2205     lx->lower = &lx->dblk;
2206   }
2207   if (lx->nlvl > dblk->lvl) {
2208     // New level in DB
2209     dblk->lvl = (uint8_t) lx->nlvl;
2210     dblk->flags |= SBLK_DURTY;
2211   }
2212   lvl = lx->lower->lvl;
2213   while (lvl > -1) {
2214     rc = _lx_roll_forward(lx, (uint8_t) lvl);
2215     RCRET(rc);
2216     if (lx->upper) {
2217       blkn = ADDR2BLK(lx->upper->addr);
2218     } else {
2219       blkn = 0;
2220     }
2221     do {
2222       if (lx->nlvl >= lvl) {
2223         lx->plower[lvl] = lx->lower;
2224         lx->pupper[lvl] = lx->upper;
2225       }
2226     } while (lvl-- && lx->lower->n[lvl] == blkn);
2227   }
2228   return 0;
2229 }
2230 
_lx_release_mm(IWLCTX * lx,uint8_t * mm)2231 static iwrc _lx_release_mm(IWLCTX *lx, uint8_t *mm) {
2232   iwrc rc = 0;
2233   if (lx->nlvl > -1) {
2234     SBLK *lsb = 0, *usb = 0;
2235     if (lx->nb) {
2236       rc = _sblk_sync_mm(lx, lx->nb, mm);
2237       RCGO(rc, finish);
2238     }
2239     if (lx->pupper[0] == lx->upper) {
2240       lx->upper = 0;
2241     }
2242     if (lx->plower[0] == lx->lower) {
2243       lx->lower = 0;
2244     }
2245     for (int i = 0; i <= lx->nlvl; ++i) {
2246       if (lx->pupper[i]) {
2247         if (lx->pupper[i] != usb) {
2248           usb = lx->pupper[i];
2249           rc = _sblk_sync_and_release_mm(lx, &lx->pupper[i], mm);
2250           RCGO(rc, finish);
2251         }
2252         lx->pupper[i] = 0;
2253       }
2254       if (lx->plower[i]) {
2255         if (lx->plower[i] != lsb) {
2256           lsb = lx->plower[i];
2257           rc = _sblk_sync_and_release_mm(lx, &lx->plower[i], mm);
2258           RCGO(rc, finish);
2259         }
2260         lx->plower[i] = 0;
2261       }
2262     }
2263   }
2264   if (lx->upper) {
2265     rc = _sblk_sync_and_release_mm(lx, &lx->upper, mm);
2266     RCGO(rc, finish);
2267   }
2268   if (lx->lower) {
2269     rc = _sblk_sync_and_release_mm(lx, &lx->lower, mm);
2270     RCGO(rc, finish);
2271   }
2272   if (lx->dblk.flags & SBLK_DURTY) {
2273     rc = _sblk_sync_mm(lx, &lx->dblk, mm);
2274     RCGO(rc, finish);
2275   }
2276   if (lx->nb) {
2277     _sblk_release(lx, &lx->nb);
2278     RCGO(rc, finish);
2279   }
2280 
2281 finish:
2282   lx->destroy_addr = 0;
2283   return rc;
2284 }
2285 
_lx_release(IWLCTX * lx)2286 iwrc _lx_release(IWLCTX *lx) {
2287   uint8_t *mm;
2288   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2289   iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2290   RCRET(rc);
2291   rc = _lx_release_mm(lx, mm);
2292   IWRC(fsm->release_mmap(fsm), rc);
2293   return rc;
2294 }
2295 
_lx_split_addkv(IWLCTX * lx,int idx,SBLK * sblk)2296 static iwrc _lx_split_addkv(IWLCTX *lx, int idx, SBLK *sblk) {
2297   iwrc rc;
2298   SBLK *nb;
2299   blkn_t nblk;
2300   IWDB db = sblk->db;
2301   bool uside = (idx == sblk->pnum);
2302   register const int8_t pivot = (KVBLK_IDXNUM / 2) + 1; // 32
2303 
2304   if (uside) { // Upper side
2305     rc = _sblk_create(lx, (uint8_t) lx->nlvl, 0, sblk, lx->upper, &nb);
2306     RCRET(rc);
2307     rc = _sblk_addkv(nb, lx);
2308     RCGO(rc, finish);
2309   } else { // New key is somewhere in a middle of sblk->kvblk
2310     assert(sblk->kvblk);
2311     // We are in the middle
2312     // Do the partial split
2313     // Move kv pairs into new `nb`
2314     // Compute space required for the new sblk which stores kv pairs after pivot `idx`
2315     size_t sz = 0;
2316     for (int8_t i = pivot; i < sblk->pnum; ++i) {
2317       sz += sblk->kvblk->pidx[sblk->pi[i]].len;
2318     }
2319     if (idx > pivot) {
2320       sz += IW_VNUMSIZE(lx->key->size) + lx->key->size + lx->val->size;
2321     }
2322     sz += KVBLK_MAX_NKV_SZ;
2323     uint8_t kvbpow = (uint8_t) iwlog2_64(sz);
2324     while ((1ULL << kvbpow) < sz) kvbpow++;
2325 
2326     rc = _sblk_create(lx, (uint8_t) lx->nlvl, kvbpow, sblk, lx->upper, &nb);
2327     RCRET(rc);
2328 
2329     IWKV_val key, val;
2330     IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2331     for (int8_t i = pivot, end = sblk->pnum; i < end; ++i) {
2332       uint8_t *mm;
2333       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2334       RCBREAK(rc);
2335 
2336       rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[i], &key, &val);
2337       assert(key.size);
2338       fsm->release_mmap(fsm);
2339       RCBREAK(rc);
2340 
2341       rc = _sblk_addkv2(nb, i - pivot, &key, &val, true);
2342       _kv_dispose(&key, &val);
2343 
2344       RCBREAK(rc);
2345       sblk->kvblk->pidx[sblk->pi[i]].len = 0;
2346       sblk->kvblk->pidx[sblk->pi[i]].off = 0;
2347       --sblk->pnum;
2348     }
2349     sblk->kvblk->flags |= KVBLK_DURTY;
2350     sblk->kvblk->zidx = sblk->pi[pivot];
2351     sblk->kvblk->maxoff = 0;
2352     for (int i = 0; i < KVBLK_IDXNUM; ++i) {
2353       if (sblk->kvblk->pidx[i].off > sblk->kvblk->maxoff) {
2354         sblk->kvblk->maxoff = sblk->kvblk->pidx[i].off;
2355       }
2356     }
2357   }
2358 
2359   // Fix levels:
2360   //  [ lb -> sblk -> ub ]
2361   //  [ lb -> sblk -> nb -> ub ]
2362   nblk = ADDR2BLK(nb->addr);
2363   lx->pupper[0]->p0 = nblk;
2364   lx->pupper[0]->flags |= SBLK_DURTY;
2365   nb->p0 = ADDR2BLK(lx->plower[0]->addr);
2366   for (int i = 0; i <= nb->lvl; ++i) {
2367     lx->plower[i]->n[i] = nblk;
2368     lx->plower[i]->flags |= SBLK_DURTY;
2369     nb->n[i] = ADDR2BLK(lx->pupper[i]->addr);
2370   }
2371 
2372   pthread_spin_lock(&db->cursors_slk);
2373   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2374     if (cur->cn && (cur->cn->addr == sblk->addr)) {
2375       if (cur->cnpos >= pivot) {
2376         memcpy(cur->cn, nb, sizeof(*cur->cn));
2377         cur->cn->kvblk = 0;
2378         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2379         cur->cnpos -= pivot;
2380       }
2381     }
2382   }
2383   pthread_spin_unlock(&db->cursors_slk);
2384 
2385   if (!uside) {
2386     if (idx > pivot) {
2387       rc = _sblk_addkv(nb, lx);
2388     } else {
2389       rc = _sblk_addkv(sblk, lx);
2390     }
2391     RCGO(rc, finish);
2392   }
2393 
2394 finish:
2395   if (rc) {
2396     lx->nb = 0;
2397     IWRC(_sblk_destroy(lx, &nb), rc);
2398   } else {
2399     lx->nb = nb;
2400   }
2401   return rc;
2402 }
2403 
_lx_init_chute(IWLCTX * lx)2404 IW_INLINE iwrc _lx_init_chute(IWLCTX *lx) {
2405   assert(lx->nlvl >= 0);
2406   iwrc rc = 0;
2407   if (!lx->pupper[lx->nlvl]) { // fix zero upper by dbtail
2408     SBLK *dbtail;
2409     rc = _sblk_at(lx, 0, 0, &dbtail);
2410     RCRET(rc);
2411     for (int8_t i = lx->nlvl; i >= 0 && !lx->pupper[i]; --i) {
2412       lx->pupper[i] = dbtail;
2413     }
2414   }
2415   return 0;
2416 }
2417 
_lx_addkv(IWLCTX * lx)2418 static WUR iwrc _lx_addkv(IWLCTX *lx) {
2419   iwrc rc;
2420   bool found, uadd;
2421   uint8_t *mm = 0, idx;
2422   SBLK *sblk = lx->lower;
2423   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2424   if (lx->nlvl > -1) {
2425     rc = _lx_init_chute(lx);
2426     RCRET(rc);
2427   }
2428   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2429   RCRET(rc);
2430   rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2431   if (rc) {
2432     fsm->release_mmap(fsm);
2433     return rc;
2434   }
2435   rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2436   RCRET(rc);
2437   if (found && (lx->opflags & IWKV_NO_OVERWRITE)) {
2438     fsm->release_mmap(fsm);
2439     return IWKV_ERROR_KEY_EXISTS;
2440   }
2441   uadd = (  !found
2442          && sblk->pnum > KVBLK_IDXNUM - 1 && idx > KVBLK_IDXNUM - 1
2443          && lx->upper && lx->upper->pnum < KVBLK_IDXNUM);
2444   if (uadd) {
2445     rc = _sblk_loadkvblk_mm(lx, lx->upper, mm);
2446     if (rc) {
2447       fsm->release_mmap(fsm);
2448       return rc;
2449     }
2450   }
2451   if (found) {
2452     IWKV_val sval, *val = lx->val;
2453     if (lx->opflags & IWKV_VAL_INCREMENT) {
2454       int64_t ival;
2455       uint8_t *rp;
2456       uint32_t len;
2457       if (val->size == 4) {
2458         int32_t lv;
2459         memcpy(&lv, val->data, val->size);
2460         lv = IW_ITOHL(lv);
2461         ival = lv;
2462       } else if (val->size == 8) {
2463         memcpy(&ival, val->data, val->size);
2464         ival = IW_ITOHLL(ival);
2465       } else {
2466         rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2467         fsm->release_mmap(fsm);
2468         return rc;
2469       }
2470       _kvblk_value_peek(sblk->kvblk, sblk->pi[idx], mm, &rp, &len);
2471       sval.data = rp;
2472       sval.size = len;
2473       if (sval.size == 4) {
2474         uint32_t lv;
2475         memcpy(&lv, sval.data, 4);
2476         lv = IW_ITOHL(lv);
2477         lv += ival;
2478         _num2lebuf(lx->incbuf, &lv, 4);
2479       } else if (sval.size == 8) {
2480         uint64_t llv;
2481         memcpy(&llv, sval.data, 8);
2482         llv = IW_ITOHLL(llv);
2483         llv += ival;
2484         _num2lebuf(lx->incbuf, &llv, 8);
2485       } else {
2486         rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2487         fsm->release_mmap(fsm);
2488         return rc;
2489       }
2490       sval.data = lx->incbuf;
2491       val = &sval;
2492     }
2493     if (lx->ph) {
2494       IWKV_val oldval;
2495       rc = _kvblk_value_get(sblk->kvblk, mm, sblk->pi[idx], &oldval);
2496       fsm->release_mmap(fsm);
2497       if (!rc) {
2498         // note: oldval should be disposed by ph
2499         rc = lx->ph(lx->key, lx->val, &oldval, lx->phop);
2500       }
2501       RCRET(rc);
2502     } else {
2503       fsm->release_mmap(fsm);
2504     }
2505     return _sblk_updatekv(sblk, idx, lx->key, val);
2506   } else {
2507     fsm->release_mmap(fsm);
2508     if (sblk->pnum > KVBLK_IDXNUM - 1) {
2509       if (uadd) {
2510         if (lx->ph) {
2511           rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2512           RCRET(rc);
2513         }
2514         return _sblk_addkv(lx->upper, lx);
2515       }
2516       if (lx->nlvl < 0) {
2517         return _IWKV_RC_REQUIRE_NLEVEL;
2518       }
2519       if (lx->ph) {
2520         rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2521         RCRET(rc);
2522       }
2523       return _lx_split_addkv(lx, idx, sblk);
2524     } else {
2525       if (lx->ph) {
2526         rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2527         RCRET(rc);
2528       }
2529       return _sblk_addkv2(sblk, idx, lx->key, lx->val, false);
2530     }
2531   }
2532 }
2533 
_lx_put_lw(IWLCTX * lx)2534 IW_INLINE WUR iwrc _lx_put_lw(IWLCTX *lx) {
2535   iwrc rc;
2536 start:
2537   rc = _lx_find_bounds(lx);
2538   if (rc) {
2539     _lx_release_mm(lx, 0);
2540     return rc;
2541   }
2542   rc = _lx_addkv(lx);
2543   if (rc == _IWKV_RC_REQUIRE_NLEVEL) {
2544     SBLK *lower = lx->lower;
2545     lx->lower = 0;
2546     _lx_release_mm(lx, 0);
2547     lx->nlvl = _sblk_genlevel(lx->db);
2548     if (lower->lvl >= lx->nlvl) {
2549       lx->lower = lower;
2550     }
2551     goto start;
2552   }
2553   if (rc == _IWKV_RC_KVBLOCK_FULL) {
2554     rc = IWKV_ERROR_CORRUPTED;
2555     iwlog_ecode_error3(rc);
2556   }
2557   IWRC(_lx_release(lx), rc);
2558   return rc;
2559 }
2560 
_lx_get_lr(IWLCTX * lx)2561 IW_INLINE WUR iwrc _lx_get_lr(IWLCTX *lx) {
2562   iwrc rc = _lx_find_bounds(lx);
2563   RCRET(rc);
2564   bool found;
2565   uint8_t *mm, idx;
2566   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2567   lx->val->size = 0;
2568   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2569   RCRET(rc);
2570   rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2571   RCGO(rc, finish);
2572   rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2573   RCGO(rc, finish);
2574   if (found) {
2575     rc = _kvblk_value_get(lx->lower->kvblk, mm, lx->lower->pi[idx], lx->val);
2576   } else {
2577     rc = IWKV_ERROR_NOTFOUND;
2578   }
2579 
2580 finish:
2581   IWRC(fsm->release_mmap(fsm), rc);
2582   _lx_release_mm(lx, 0);
2583   return rc;
2584 }
2585 
_lx_del_sblk_lw(IWLCTX * lx,SBLK * sblk,uint8_t idx)2586 static WUR iwrc _lx_del_sblk_lw(IWLCTX *lx, SBLK *sblk, uint8_t idx) {
2587   assert(sblk->pnum == 1 && sblk->kvblk);
2588 
2589   iwrc rc;
2590   IWDB db = lx->db;
2591   KVBLK *kvblk = sblk->kvblk;
2592   blkn_t sblk_blkn = ADDR2BLK(sblk->addr);
2593 
2594   _lx_release_mm(lx, 0);
2595   lx->nlvl = sblk->lvl;
2596   lx->upper_addr = sblk->addr;
2597 
2598   rc = _lx_find_bounds(lx);
2599   RCRET(rc);
2600   assert(lx->upper->pnum == 1 && lx->upper->addr == lx->upper_addr);
2601 
2602   lx->upper->kvblk = kvblk;
2603   rc = _sblk_rmkv(lx->upper, idx);
2604   RCGO(rc, finish);
2605 
2606   for (int i = 0; i <= lx->nlvl; ++i) {
2607     lx->plower[i]->n[i] = lx->upper->n[i];
2608     lx->plower[i]->flags |= SBLK_DURTY;
2609     if (lx->plower[i]->flags & SBLK_DB) {
2610       if (!lx->plower[i]->n[i]) {
2611         --lx->plower[i]->lvl;
2612       }
2613     }
2614     if (lx->pupper[i] == lx->upper) {
2615       // Do not touch `lx->upper` in next `_lx_release_mm()` call
2616       lx->pupper[i] = 0;
2617     }
2618   }
2619 
2620   SBLK rb;  // Block to remove
2621   memcpy(&rb, lx->upper, sizeof(rb));
2622 
2623   SBLK *nb, // Block after lx->upper
2624        *rbp = &rb;
2625 
2626   assert(!lx->nb);
2627   rc = _sblk_at(lx, BLK2ADDR(rb.n[0]), 0, &nb);
2628   RCGO(rc, finish);
2629   lx->nb = nb;
2630   lx->nb->p0 = rb.p0;
2631   lx->nb->flags |= SBLK_DURTY;
2632 
2633   // Update cursors within sblk removed
2634   pthread_spin_lock(&db->cursors_slk);
2635   for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2636     if (cur->cn) {
2637       if (cur->cn->addr == sblk->addr) {
2638         if (nb->flags & SBLK_DB) {
2639           if (!(lx->plower[0]->flags & SBLK_DB)) {
2640             memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2641             cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2642             cur->cn->kvblk = 0;
2643             cur->skip_next = -1;
2644             cur->cnpos = lx->plower[0]->pnum;
2645             if (cur->cnpos) {
2646               cur->cnpos--;
2647             }
2648           } else {
2649             cur->cn = 0;
2650             cur->cnpos = 0;
2651             cur->skip_next = 0;
2652           }
2653         } else {
2654           memcpy(cur->cn, nb, sizeof(*nb));
2655           cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2656           cur->cn->kvblk = 0;
2657           cur->cnpos = 0;
2658           cur->skip_next = 1;
2659         }
2660       } else if (cur->cn->n[0] == sblk_blkn) {
2661         memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
2662         cur->cn->kvblk = 0;
2663         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2664       } else if (cur->cn->p0 == sblk_blkn) {
2665         memcpy(cur->cn, nb, sizeof(*nb));
2666         cur->cn->kvblk = 0;
2667         cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2668       }
2669     }
2670   }
2671   pthread_spin_unlock(&db->cursors_slk);
2672 
2673   rc = _sblk_destroy(lx, &rbp);
2674 
2675 finish:
2676   return rc;
2677 }
2678 
_lx_del_lw(IWLCTX * lx)2679 static WUR iwrc _lx_del_lw(IWLCTX *lx) {
2680   iwrc rc;
2681   bool found;
2682   uint8_t *mm = 0, idx;
2683   IWDB db = lx->db;
2684   IWFS_FSM *fsm = &db->iwkv->fsm;
2685   SBLK *sblk;
2686 
2687   rc = _lx_find_bounds(lx);
2688   RCRET(rc);
2689 
2690   sblk = lx->lower;
2691   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2692   RCGO(rc, finish);
2693   rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2694   RCGO(rc, finish);
2695   rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2696   RCGO(rc, finish);
2697   if (!found) {
2698     rc = IWKV_ERROR_NOTFOUND;
2699     goto finish;
2700   }
2701   fsm->release_mmap(fsm);
2702   mm = 0;
2703 
2704   if (sblk->pnum == 1) { // last kv in block
2705     rc = _lx_del_sblk_lw(lx, sblk, idx);
2706   } else {
2707     rc = _sblk_rmkv(sblk, idx);
2708   }
2709 
2710 finish:
2711   if (mm) {
2712     fsm->release_mmap(fsm);
2713   }
2714   if (rc) {
2715     _lx_release_mm(lx, 0);
2716   } else {
2717     rc = _lx_release(lx);
2718   }
2719   return rc;
2720 }
2721 
2722 //--------------------------  CURSOR
2723 
_cursor_get_ge_idx(IWLCTX * lx,IWKV_cursor_op op,uint8_t * oidx)2724 IW_INLINE WUR iwrc _cursor_get_ge_idx(IWLCTX *lx, IWKV_cursor_op op, uint8_t *oidx) {
2725   iwrc rc = _lx_find_bounds(lx);
2726   RCRET(rc);
2727   bool found;
2728   uint8_t *mm, idx;
2729   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2730   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2731   RCRET(rc);
2732   rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2733   RCGO(rc, finish);
2734   rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2735   RCGO(rc, finish);
2736   if (found) {
2737     *oidx = idx;
2738   } else {
2739     if ((op == IWKV_CURSOR_EQ) || (lx->lower->flags & SBLK_DB) || (lx->lower->pnum < 1)) {
2740       rc = IWKV_ERROR_NOTFOUND;
2741     } else {
2742       *oidx = idx ? idx - 1 : idx;
2743     }
2744   }
2745 
2746 finish:
2747   IWRC(fsm->release_mmap(fsm), rc);
2748   return rc;
2749 }
2750 
_cursor_to_lr(IWKV_cursor cur,IWKV_cursor_op op)2751 static WUR iwrc _cursor_to_lr(IWKV_cursor cur, IWKV_cursor_op op) {
2752   iwrc rc = 0;
2753   IWDB db = cur->lx.db;
2754   IWLCTX *lx = &cur->lx;
2755   blkn_t dblk = ADDR2BLK(db->addr);
2756   if (op < IWKV_CURSOR_NEXT) { // IWKV_CURSOR_BEFORE_FIRST | IWKV_CURSOR_AFTER_LAST
2757     if (cur->cn) {
2758       _sblk_release(lx, &cur->cn);
2759     }
2760     if (op == IWKV_CURSOR_BEFORE_FIRST) {
2761       cur->dbaddr = db->addr;
2762       cur->cnpos = KVBLK_IDXNUM - 1;
2763     } else {
2764       cur->dbaddr = -1; // Negative as sign of dbtail
2765       cur->cnpos = 0;
2766     }
2767     return 0;
2768   }
2769 
2770 start:
2771   if (op < IWKV_CURSOR_EQ) { // IWKV_CURSOR_NEXT | IWKV_CURSOR_PREV
2772     blkn_t n = 0;
2773     if (!cur->cn) {
2774       if (cur->dbaddr) {
2775         rc = _sblk_at(lx, (cur->dbaddr < 0 ? 0 : cur->dbaddr), 0, &cur->cn);
2776         cur->dbaddr = 0;
2777         RCGO(rc, finish);
2778       } else {
2779         rc = IWKV_ERROR_NOTFOUND;
2780         goto finish;
2781       }
2782     }
2783     if (op == IWKV_CURSOR_NEXT) {
2784       if (cur->skip_next > 0) {
2785         goto finish;
2786       }
2787       if (cur->cnpos + 1 >= cur->cn->pnum) {
2788         n = cur->cn->n[0];
2789         if (!n) {
2790           rc = IWKV_ERROR_NOTFOUND;
2791           goto finish;
2792         }
2793         _sblk_release(lx, &cur->cn);
2794         rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
2795         RCGO(rc, finish);
2796         cur->cnpos = 0;
2797         if (IW_UNLIKELY(!cur->cn->pnum)) {
2798           goto start;
2799         }
2800       } else {
2801         if (cur->cn->flags & SBLK_DB) {
2802           rc = IWKV_ERROR_NOTFOUND;
2803           goto finish;
2804         }
2805         ++cur->cnpos;
2806       }
2807     } else { // IWKV_CURSOR_PREV
2808       if (cur->skip_next < 0) {
2809         goto finish;
2810       }
2811       if (cur->cnpos == 0) {
2812         n = cur->cn->p0;
2813         if (!n || (n == dblk)) {
2814           rc = IWKV_ERROR_NOTFOUND;
2815           goto finish;
2816         }
2817         _sblk_release(lx, &cur->cn);
2818         RCGO(rc, finish);
2819         rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
2820         RCGO(rc, finish);
2821         if (IW_LIKELY(cur->cn->pnum)) {
2822           cur->cnpos = cur->cn->pnum - 1;
2823         } else {
2824           goto start;
2825         }
2826       } else {
2827         if (cur->cn->flags & SBLK_DB) {
2828           rc = IWKV_ERROR_NOTFOUND;
2829           goto finish;
2830         }
2831         --cur->cnpos;
2832       }
2833     }
2834   } else { // IWKV_CURSOR_EQ | IWKV_CURSOR_GE
2835     if (!lx->key) {
2836       rc = IW_ERROR_INVALID_STATE;
2837       goto finish;
2838     }
2839     rc = _cursor_get_ge_idx(lx, op, &cur->cnpos);
2840     if (lx->upper) {
2841       _sblk_release(lx, &lx->upper);
2842     }
2843     if (!rc) {
2844       cur->cn = lx->lower;
2845       lx->lower = 0;
2846     }
2847   }
2848 
2849 finish:
2850   cur->skip_next = 0;
2851   if (rc && (rc != IWKV_ERROR_NOTFOUND)) {
2852     if (cur->cn) {
2853       _sblk_release(lx, &cur->cn);
2854     }
2855   }
2856   return rc;
2857 }
2858 
2859 //--------------------------  PUBLIC API
2860 
_kv_ecodefn(locale_t locale,uint32_t ecode)2861 static const char* _kv_ecodefn(locale_t locale, uint32_t ecode) {
2862   if (!((ecode > _IWKV_ERROR_START) && (ecode < _IWKV_ERROR_END))) {
2863     return 0;
2864   }
2865   switch (ecode) {
2866     case IWKV_ERROR_NOTFOUND:
2867       return "Key not found. (IWKV_ERROR_NOTFOUND)";
2868     case IWKV_ERROR_KEY_EXISTS:
2869       return "Key exists. (IWKV_ERROR_KEY_EXISTS)";
2870     case IWKV_ERROR_MAXKVSZ:
2871       return "Size of Key+value must be not greater than 0xfffffff bytes (IWKV_ERROR_MAXKVSZ)";
2872     case IWKV_ERROR_CORRUPTED:
2873       return "Database file invalid or corrupted (IWKV_ERROR_CORRUPTED)";
2874     case IWKV_ERROR_DUP_VALUE_SIZE:
2875       return "Value size is not compatible for insertion into sorted values array (IWKV_ERROR_DUP_VALUE_SIZE)";
2876     case IWKV_ERROR_KEY_NUM_VALUE_SIZE:
2877       return "Given key is not compatible to store as number (IWKV_ERROR_KEY_NUM_VALUE_SIZE)";
2878     case IWKV_ERROR_INCOMPATIBLE_DB_MODE:
2879       return "Incompatible database open mode (IWKV_ERROR_INCOMPATIBLE_DB_MODE)";
2880     case IWKV_ERROR_INCOMPATIBLE_DB_FORMAT:
2881       return "Incompatible database format version, please migrate database data (IWKV_ERROR_INCOMPATIBLE_DB_FORMAT)";
2882     case IWKV_ERROR_CORRUPTED_WAL_FILE:
2883       return "Corrupted WAL file (IWKV_ERROR_CORRUPTED_WAL_FILE)";
2884     case IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED:
2885       return "Stored value cannot be incremented/descremented (IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED)";
2886     case IWKV_ERROR_WAL_MODE_REQUIRED:
2887       return "Operation requires WAL enabled database. (IWKV_ERROR_WAL_MODE_REQUIRED)";
2888     case IWKV_ERROR_BACKUP_IN_PROGRESS:
2889       return "Backup operation in progress. (IWKV_ERROR_BACKUP_IN_PROGRESS)";
2890     default:
2891       break;
2892   }
2893   return 0;
2894 }
2895 
iwkv_init(void)2896 iwrc iwkv_init(void) {
2897   static int _kv_initialized = 0;
2898   if (!__sync_bool_compare_and_swap(&_kv_initialized, 0, 1)) {
2899     return 0;
2900   }
2901   return iwlog_register_ecodefn(_kv_ecodefn);
2902 }
2903 
_szpolicy(off_t nsize,off_t csize,struct IWFS_EXT * f,void ** _ctx)2904 static off_t _szpolicy(off_t nsize, off_t csize, struct IWFS_EXT *f, void **_ctx) {
2905   off_t res;
2906   size_t aunit = iwp_alloc_unit();
2907   if (csize < 0x4000000) { // Doubled alloc up to 64M
2908     res = csize ? csize : aunit;
2909     while (res < nsize) {
2910       res <<= 1;
2911     }
2912   } else {
2913     res = nsize + 10 * 1024 * 1024; // + 10M extra space
2914   }
2915   res = IW_ROUNDUP(res, aunit);
2916   return res;
2917 }
2918 
iwkv_state(IWKV iwkv,IWFS_FSM_STATE * out)2919 iwrc iwkv_state(IWKV iwkv, IWFS_FSM_STATE *out) {
2920   if (!iwkv || !out) {
2921     return IW_ERROR_INVALID_ARGS;
2922   }
2923   int rci;
2924   API_RLOCK(iwkv, rci);
2925   IWFS_FSM fsm = iwkv->fsm;
2926   iwrc rc = fsm.state(&fsm, out);
2927   API_UNLOCK(iwkv, rci, rc);
2928   return rc;
2929 }
2930 
iwkv_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)2931 iwrc iwkv_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
2932   return iwal_online_backup(iwkv, ts, target_file);
2933 }
2934 
_iwkv_check_online_backup(const char * path,iwp_lockmode extra_lock_flags,bool * out_has_online_bkp)2935 static iwrc _iwkv_check_online_backup(const char *path, iwp_lockmode extra_lock_flags, bool *out_has_online_bkp) {
2936   size_t sp;
2937   uint32_t lv;
2938   off_t fsz, pos;
2939   uint64_t waloff; // WAL offset
2940   char buf[16384];
2941 
2942   *out_has_online_bkp = false;
2943   const size_t aunit = iwp_alloc_unit();
2944   char *wpath = 0;
2945 
2946   IWFS_FILE f = { 0 }, w = { 0 };
2947   IWFS_FILE_STATE fs, fw;
2948   iwrc rc = iwfs_file_open(&f, &(IWFS_FILE_OPTS) {
2949     .path = path,
2950     .omode = IWFS_OREAD | IWFS_OWRITE,
2951     .lock_mode = IWP_WLOCK | extra_lock_flags
2952   });
2953   if (rc == IW_ERROR_NOT_EXISTS) {
2954     return 0;
2955   }
2956   RCRET(rc);
2957 
2958   rc = f.state(&f, &fs);
2959   RCGO(rc, finish);
2960 
2961   rc = iwp_lseek(fs.fh, 0, IWP_SEEK_END, &fsz);
2962   RCGO(rc, finish);
2963   if (fsz < iwp_alloc_unit()) {
2964     goto finish;
2965   }
2966 
2967   rc = iwp_pread(fs.fh, 0, &lv, sizeof(lv), &sp);
2968   RCGO(rc, finish);
2969   lv = IW_ITOHL(lv);
2970   if ((sp != sizeof(lv)) || (lv != IWFSM_MAGICK)) {
2971     goto finish;
2972   }
2973 
2974   rc = iwp_pread(fs.fh, IWFSM_CUSTOM_HDR_DATA_OFFSET, &lv, sizeof(lv), &sp);
2975   RCGO(rc, finish);
2976   lv = IW_ITOHL(lv);
2977   if ((sp != sizeof(lv)) || (lv != IWKV_MAGIC)) {
2978     goto finish;
2979   }
2980 
2981   rc = iwp_lseek(fs.fh, (off_t) -1 * sizeof(lv), IWP_SEEK_END, 0);
2982   RCGO(rc, finish);
2983 
2984   rc = iwp_read(fs.fh, &lv, sizeof(lv), &sp);
2985   RCGO(rc, finish);
2986   lv = IW_ITOHL(lv);
2987   if ((sp != sizeof(lv)) || (lv != IWKV_BACKUP_MAGIC)) {
2988     goto finish;
2989   }
2990 
2991   // Get WAL data offset
2992   rc = iwp_lseek(fs.fh, (off_t) -1 * (sizeof(waloff) + sizeof(lv)), IWP_SEEK_END, &pos);
2993   RCGO(rc, finish);
2994 
2995   rc = iwp_read(fs.fh, &waloff, sizeof(waloff), &sp);
2996   RCGO(rc, finish);
2997 
2998   waloff = IW_ITOHLL(waloff);
2999   if (((waloff != pos) && (waloff > pos - sizeof(WBSEP))) || (waloff & (aunit - 1))) {
3000     goto finish;
3001   }
3002 
3003   // Read the first WAL instruction: WBSEP
3004   if (waloff != pos) { // Not an empty WAL?
3005     WBSEP wbsep = { 0 };
3006     rc = iwp_pread(fs.fh, waloff, &wbsep, sizeof(wbsep), &sp);
3007     RCGO(rc, finish);
3008     if (wbsep.id != WOP_SEP) {
3009       goto finish;
3010     }
3011   }
3012 
3013   // Now we have an online backup image, unpack WAL file
3014 
3015   sp = strlen(path);
3016   wpath = malloc(sp + 4 /*-wal*/ + 1 /*\0*/);
3017   if (!wpath) {
3018     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3019     goto finish;
3020   }
3021   memcpy(wpath, path, sp);
3022   memcpy(wpath + sp, "-wal", 4);
3023   wpath[sp + 4] = '\0';
3024 
3025   iwlog_warn("Unpacking WAL from online backup into: %s", wpath);
3026   *out_has_online_bkp = true;
3027 
3028   // WAL file
3029   rc = iwfs_file_open(&w, &(IWFS_FILE_OPTS) {
3030     .path = wpath,
3031     .omode = IWFS_OREAD | IWFS_OWRITE | IWFS_OTRUNC
3032   });
3033   RCGO(rc, finish);
3034 
3035   rc = w.state(&w, &fw);
3036   RCGO(rc, finish);
3037 
3038   // WAL content copy
3039   rc = iwp_lseek(fs.fh, waloff, IWP_SEEK_SET, 0);
3040   RCGO(rc, finish);
3041   fsz = fsz - waloff - sizeof(lv) /* magic */ - sizeof(waloff) /* wal offset */;
3042   if (fsz > 0) {
3043     sp = 0;
3044     do {
3045       rc = iwp_read(fs.fh, buf, sizeof(buf), &sp);
3046       RCGO(rc, finish);
3047       if (sp > fsz) {
3048         sp = fsz;
3049       }
3050       fsz -= sp;
3051       rc = iwp_write(fw.fh, buf, sp);
3052       RCGO(rc, finish);
3053     } while (fsz > 0 && sp > 0);
3054   }
3055   rc = iwp_fsync(fw.fh);
3056   RCGO(rc, finish);
3057 
3058   rc = iwp_ftruncate(fs.fh, waloff);
3059   RCGO(rc, finish);
3060 
3061   rc = iwp_fsync(fs.fh);
3062   RCGO(rc, finish);
3063 
3064 finish:
3065   if (f.impl) {
3066     IWRC(f.close(&f), rc);
3067   }
3068   if (w.impl) {
3069     IWRC(w.close(&w), rc);
3070   }
3071   free(wpath);
3072   return rc;
3073 }
3074 
iwkv_open(const IWKV_OPTS * opts,IWKV * iwkvp)3075 iwrc iwkv_open(const IWKV_OPTS *opts, IWKV *iwkvp) {
3076   if (!opts || !iwkvp || !opts->path) {
3077     return IW_ERROR_INVALID_ARGS;
3078   }
3079   *iwkvp = 0;
3080   int rci;
3081   iwrc rc = 0;
3082   uint32_t lv;
3083   uint64_t llv;
3084   uint8_t *rp, *mm;
3085   bool has_online_bkp = false;
3086 
3087   rc = iw_init();
3088   RCRET(rc);
3089 
3090   if (opts->random_seed) {
3091     iwu_rand_seed(opts->random_seed);
3092   }
3093   iwkv_openflags oflags = opts->oflags;
3094   iwfs_omode omode = IWFS_OREAD;
3095   if (oflags & IWKV_TRUNC) {
3096     oflags &= ~IWKV_RDONLY;
3097     omode |= IWFS_OTRUNC;
3098   }
3099   if (!(oflags & IWKV_RDONLY)) {
3100     omode |= IWFS_OWRITE;
3101     omode |= IWFS_OCREATE;
3102   }
3103   if ((omode & IWFS_OWRITE) && !(omode & IWFS_OTRUNC)) {
3104     iwp_lockmode extra_lock_flags = 0;
3105     if (opts->file_lock_fail_fast) {
3106       extra_lock_flags |= IWP_NBLOCK;
3107     }
3108     rc = _iwkv_check_online_backup(opts->path, extra_lock_flags, &has_online_bkp);
3109     RCRET(rc);
3110   }
3111 
3112   *iwkvp = calloc(1, sizeof(struct _IWKV));
3113   if (!*iwkvp) {
3114     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
3115   }
3116   IWKV iwkv = *iwkvp;
3117   iwkv->fmt_version = opts->fmt_version > 0 ? opts->fmt_version : IWKV_FORMAT;
3118   if (iwkv->fmt_version > IWKV_FORMAT) {
3119     rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3120     iwlog_ecode_error3(rc);
3121     return rc;
3122   }
3123   // Adjust lower key len accourding to database format version
3124   if (iwkv->fmt_version < 2) {
3125     iwkv->pklen = PREFIX_KEY_LEN_V1;
3126   } else {
3127     iwkv->pklen = PREFIX_KEY_LEN_V2;
3128   }
3129 
3130   pthread_rwlockattr_t attr;
3131   pthread_rwlockattr_init(&attr);
3132 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
3133   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3134 #endif
3135   rci = pthread_rwlock_init(&iwkv->rwl, &attr);
3136   if (rci) {
3137     free(*iwkvp);
3138     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3139   }
3140   rci = pthread_mutex_init(&iwkv->wk_mtx, 0);
3141   if (rci) {
3142     pthread_rwlock_destroy(&iwkv->rwl);
3143     free(*iwkvp);
3144     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3145   }
3146   rci = pthread_cond_init(&iwkv->wk_cond, 0);
3147   if (rci) {
3148     pthread_rwlock_destroy(&iwkv->rwl);
3149     pthread_mutex_destroy(&iwkv->wk_mtx);
3150     free(*iwkvp);
3151     return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3152   }
3153 
3154   iwkv->oflags = oflags;
3155   IWFS_FSM_STATE fsmstate;
3156   IWFS_FSM_OPTS fsmopts = {
3157     .exfile        = {
3158       .file        = {
3159         .path      = opts->path,
3160         .omode     = omode,
3161         .lock_mode = (oflags & IWKV_RDONLY) ? IWP_RLOCK : IWP_WLOCK
3162       },
3163       .rspolicy    = _szpolicy,
3164       .maxoff      = IWKV_MAX_DBSZ,
3165       .use_locks   = true
3166     },
3167     .bpow          = IWKV_FSM_BPOW, // 64 bytes block size
3168     .hdrlen        = KVHDRSZ,       // Size of custom file header
3169     .oflags        = ((oflags & IWKV_RDONLY) ? IWFSM_NOLOCKS : 0),
3170     .mmap_all      = true,
3171     .mmap_opts     = IWFS_MMAP_RANDOM
3172   };
3173 #ifndef NDEBUG
3174   fsmopts.oflags |= IWFSM_STRICT;
3175 #endif
3176   if (oflags & IWKV_NO_TRIM_ON_CLOSE) {
3177     fsmopts.oflags |= IWFSM_NO_TRIM_ON_CLOSE;
3178   }
3179   if (opts->file_lock_fail_fast) {
3180     fsmopts.exfile.file.lock_mode |= IWP_NBLOCK;
3181   }
3182   // Init WAL
3183   RCC(rc, finish, iwal_create(iwkv, opts, &fsmopts, has_online_bkp));
3184 
3185   // Now open database file
3186   RCC(rc, finish, iwfs_fsmfile_open(&iwkv->fsm, &fsmopts));
3187   RCB(finish, iwkv->dbs = iwhmap_create_u32(0));
3188 
3189   IWFS_FSM *fsm = &iwkv->fsm;
3190   RCC(rc, finish, fsm->state(fsm, &fsmstate));
3191 
3192   // Database header: [magic:u4, first_addr:u8, db_format_version:u4]
3193   if (fsmstate.exfile.file.ostatus & IWFS_OPEN_NEW) {
3194     uint8_t hdr[KVHDRSZ] = { 0 };
3195     uint8_t *wp = hdr;
3196     IW_WRITELV(wp, lv, IWKV_MAGIC);
3197     wp += sizeof(llv); // skip first db addr
3198     IW_WRITELV(wp, lv, iwkv->fmt_version);
3199     RCC(rc, finish, fsm->writehdr(fsm, 0, hdr, sizeof(hdr)));
3200     RCC(rc, finish, fsm->sync(fsm, 0));
3201   } else {
3202     off_t dbaddr; // first database address
3203     uint8_t hdr[KVHDRSZ];
3204     RCC(rc, finish, fsm->readhdr(fsm, 0, hdr, KVHDRSZ));
3205     rp = hdr; // -V507
3206     IW_READLV(rp, lv, lv);
3207     IW_READLLV(rp, llv, dbaddr);
3208     if ((lv != IWKV_MAGIC) || (dbaddr < 0)) {
3209       rc = IWKV_ERROR_CORRUPTED;
3210       iwlog_ecode_error3(rc);
3211       goto finish;
3212     }
3213     IW_READLV(rp, lv, iwkv->fmt_version);
3214     if ((iwkv->fmt_version > IWKV_FORMAT)) {
3215       rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3216       iwlog_ecode_error3(rc);
3217       goto finish;
3218     }
3219     if (iwkv->fmt_version < 2) {
3220       iwkv->pklen = PREFIX_KEY_LEN_V1;
3221     } else {
3222       iwkv->pklen = PREFIX_KEY_LEN_V2;
3223     }
3224     RCC(rc, finish, fsm->acquire_mmap(fsm, 0, &mm, 0));
3225     RCC(rc, finish, _db_load_chain(iwkv, dbaddr, mm));
3226     fsm->release_mmap(fsm);
3227   }
3228   (*iwkvp)->open = true;
3229 
3230 finish:
3231   if (rc) {
3232     (*iwkvp)->open = true; // will be closed in iwkv_close
3233     IWRC(iwkv_close(iwkvp), rc);
3234   }
3235   return rc;
3236 }
3237 
iwkv_exclusive_lock(IWKV iwkv)3238 iwrc iwkv_exclusive_lock(IWKV iwkv) {
3239   return _wnw(iwkv, _wnw_iwkw_wl);
3240 }
3241 
iwkv_exclusive_unlock(IWKV iwkv)3242 iwrc iwkv_exclusive_unlock(IWKV iwkv) {
3243   int rci;
3244   iwrc rc = 0;
3245   API_UNLOCK(iwkv, rci, rc);
3246   return rc;
3247 }
3248 
iwkv_close(IWKV * iwkvp)3249 iwrc iwkv_close(IWKV *iwkvp) {
3250   ENSURE_OPEN((*iwkvp));
3251   IWKV iwkv = *iwkvp;
3252   iwkv->open = false;
3253   iwal_shutdown(iwkv);
3254   iwrc rc = iwkv_exclusive_lock(iwkv);
3255   RCRET(rc);
3256   IWDB db = iwkv->first_db;
3257   while (db) {
3258     IWDB ndb = db->next;
3259     _db_release_lw(&db);
3260     db = ndb;
3261   }
3262   IWRC(iwkv->fsm.close(&iwkv->fsm), rc);
3263   // Below the memory cleanup only
3264   if (iwkv->dbs) {
3265     iwhmap_destroy(iwkv->dbs);
3266     iwkv->dbs = 0;
3267   }
3268 
3269   iwkv_exclusive_unlock(iwkv);
3270   pthread_rwlock_destroy(&iwkv->rwl);
3271   pthread_mutex_destroy(&iwkv->wk_mtx);
3272   pthread_cond_destroy(&iwkv->wk_cond);
3273   free(iwkv);
3274   *iwkvp = 0;
3275   return rc;
3276 }
3277 
_iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3278 static iwrc _iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3279   ENSURE_OPEN(iwkv);
3280   if (iwkv->oflags & IWKV_RDONLY) {
3281     return IW_ERROR_READONLY;
3282   }
3283   iwrc rc;
3284   if (iwkv->dlsnr) {
3285     rc = iwal_poke_savepoint(iwkv);
3286   } else {
3287     IWFS_FSM *fsm = &iwkv->fsm;
3288     pthread_rwlock_wrlock(&iwkv->rwl);
3289     iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3290     rc = fsm->sync(fsm, flags);
3291     pthread_rwlock_unlock(&iwkv->rwl);
3292   }
3293   return rc;
3294 }
3295 
iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3296 iwrc iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3297   ENSURE_OPEN(iwkv);
3298   if (iwkv->oflags & IWKV_RDONLY) {
3299     return IW_ERROR_READONLY;
3300   }
3301   iwrc rc;
3302   if (iwkv->dlsnr) {
3303     rc = iwkv_exclusive_lock(iwkv);
3304     RCRET(rc);
3305     rc = iwal_savepoint_exl(iwkv, true);
3306     iwkv_exclusive_unlock(iwkv);
3307   } else {
3308     IWFS_FSM *fsm = &iwkv->fsm;
3309     pthread_rwlock_wrlock(&iwkv->rwl);
3310     iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3311     rc = fsm->sync(fsm, flags);
3312     pthread_rwlock_unlock(&iwkv->rwl);
3313   }
3314   return rc;
3315 }
3316 
iwkv_db(IWKV iwkv,uint32_t dbid,iwdb_flags_t dbflg,IWDB * dbp)3317 iwrc iwkv_db(IWKV iwkv, uint32_t dbid, iwdb_flags_t dbflg, IWDB *dbp) {
3318   int rci;
3319   iwrc rc = 0;
3320   IWDB db = 0;
3321   *dbp = 0;
3322 
3323   API_RLOCK(iwkv, rci);
3324   db = iwhmap_get_u32(iwkv->dbs, dbid);
3325   API_UNLOCK(iwkv, rci, rc);
3326   RCRET(rc);
3327 
3328   if (db) {
3329     if (db->dbflg != dbflg) {
3330       return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3331     }
3332     *dbp = db;
3333     return 0;
3334   }
3335   if (iwkv->oflags & IWKV_RDONLY) {
3336     return IW_ERROR_READONLY;
3337   }
3338   rc = iwkv_exclusive_lock(iwkv);
3339   RCRET(rc);
3340 
3341   db = iwhmap_get_u32(iwkv->dbs, dbid);
3342   if (db) {
3343     if (db->dbflg != dbflg) {
3344       return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3345     }
3346     *dbp = db;
3347   } else {
3348     rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3349   }
3350   if (!rc) {
3351     rc = iwal_savepoint_exl(iwkv, true);
3352   }
3353   iwkv_exclusive_unlock(iwkv);
3354   return rc;
3355 }
3356 
iwkv_new_db(IWKV iwkv,iwdb_flags_t dbflg,uint32_t * dbidp,IWDB * dbp)3357 iwrc iwkv_new_db(IWKV iwkv, iwdb_flags_t dbflg, uint32_t *dbidp, IWDB *dbp) {
3358   *dbp = 0;
3359   *dbidp = 0;
3360   if (iwkv->oflags & IWKV_RDONLY) {
3361     return IW_ERROR_READONLY;
3362   }
3363   uint32_t dbid = 0;
3364   iwrc rc = iwkv_exclusive_lock(iwkv);
3365   RCRET(rc);
3366 
3367   IWHMAP_ITER iter;
3368   iwhmap_iter_init(iwkv->dbs, &iter);
3369 
3370   while (iwhmap_iter_next(&iter)) {
3371     uint32_t id = (uint32_t) (uintptr_t) iter.key;
3372     if (id > dbid) {
3373       dbid = id;
3374     }
3375   }
3376 
3377   dbid++;
3378   rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3379   if (!rc) {
3380     *dbidp = dbid;
3381     rc = iwal_savepoint_exl(iwkv, true);
3382   }
3383   iwkv_exclusive_unlock(iwkv);
3384   return rc;
3385 }
3386 
iwkv_db_destroy(IWDB * dbp)3387 iwrc iwkv_db_destroy(IWDB *dbp) {
3388   if (!dbp || !*dbp) {
3389     return IW_ERROR_INVALID_ARGS;
3390   }
3391   IWDB db = *dbp;
3392   IWKV iwkv = db->iwkv;
3393   *dbp = 0;
3394   if (iwkv->oflags & IWKV_RDONLY) {
3395     return IW_ERROR_READONLY;
3396   }
3397   iwrc rc = iwkv_exclusive_lock(iwkv);
3398   RCRET(rc);
3399   rc = _db_destroy_lw(&db);
3400   iwkv_exclusive_unlock(iwkv);
3401   return rc;
3402 }
3403 
iwkv_puth(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3404 iwrc iwkv_puth(
3405   IWDB db, const IWKV_val *key, const IWKV_val *val,
3406   iwkv_opflags opflags, IWKV_PUT_HANDLER ph, void *phop
3407   ) {
3408   if (!db || !db->iwkv || !key || !key->size || !val) {
3409     return IW_ERROR_INVALID_ARGS;
3410   }
3411   IWKV iwkv = db->iwkv;
3412   if (iwkv->oflags & IWKV_RDONLY) {
3413     return IW_ERROR_READONLY;
3414   }
3415   if (opflags & IWKV_VAL_INCREMENT) {
3416     // No overwrite for increment
3417     opflags &= ~IWKV_NO_OVERWRITE;
3418   }
3419 
3420   int rci;
3421   IWKV_val ekey;
3422   uint8_t nbuf[IW_VNUMBUFSZ];
3423   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3424   RCRET(rc);
3425 
3426   IWLCTX lx = {
3427     .db      = db,
3428     .key     = &ekey,
3429     .val     = (IWKV_val*) val,
3430     .nlvl    = -1,
3431     .op      = IWLCTX_PUT,
3432     .opflags = opflags,
3433     .ph      = ph,
3434     .phop    = phop
3435   };
3436   API_DB_WLOCK(db, rci);
3437   rc = _lx_put_lw(&lx);
3438   API_DB_UNLOCK(db, rci, rc);
3439   if (!rc) {
3440     if (lx.opflags & IWKV_SYNC) {
3441       rc = _iwkv_sync(iwkv, 0);
3442     } else {
3443       rc = iwal_poke_checkpoint(iwkv, false);
3444     }
3445   }
3446   return rc;
3447 }
3448 
iwkv_put(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags)3449 iwrc iwkv_put(IWDB db, const IWKV_val *key, const IWKV_val *val, iwkv_opflags opflags) {
3450   return iwkv_puth(db, key, val, opflags, 0, 0);
3451 }
3452 
iwkv_get(IWDB db,const IWKV_val * key,IWKV_val * oval)3453 iwrc iwkv_get(IWDB db, const IWKV_val *key, IWKV_val *oval) {
3454   if (!db || !db->iwkv || !key || !oval) {
3455     return IW_ERROR_INVALID_ARGS;
3456   }
3457 
3458   int rci;
3459   IWKV_val ekey;
3460   uint8_t nbuf[IW_VNUMBUFSZ];
3461   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3462   RCRET(rc);
3463 
3464   IWLCTX lx = {
3465     .db   = db,
3466     .key  = &ekey,
3467     .val  = oval,
3468     .nlvl = -1
3469   };
3470   oval->size = 0;
3471   API_DB_RLOCK(db, rci);
3472   rc = _lx_get_lr(&lx);
3473   API_DB_UNLOCK(db, rci, rc);
3474   return rc;
3475 }
3476 
iwkv_get_copy(IWDB db,const IWKV_val * key,void * vbuf,size_t vbufsz,size_t * vsz)3477 iwrc iwkv_get_copy(IWDB db, const IWKV_val *key, void *vbuf, size_t vbufsz, size_t *vsz) {
3478   if (!db || !db->iwkv || !key || !vbuf) {
3479     return IW_ERROR_INVALID_ARGS;
3480   }
3481   *vsz = 0;
3482 
3483   int rci;
3484   bool found;
3485   IWKV_val ekey;
3486   uint32_t ovalsz;
3487   uint8_t *mm = 0, *oval, idx;
3488   IWFS_FSM *fsm = &db->iwkv->fsm;
3489   uint8_t nbuf[IW_VNUMBUFSZ];
3490   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3491   RCRET(rc);
3492 
3493   IWLCTX lx = {
3494     .db   = db,
3495     .key  = &ekey,
3496     .nlvl = -1
3497   };
3498   API_DB_RLOCK(db, rci);
3499   rc = _lx_find_bounds(&lx);
3500   RCGO(rc, finish);
3501   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3502   RCGO(rc, finish);
3503   rc = _sblk_loadkvblk_mm(&lx, lx.lower, mm);
3504   RCGO(rc, finish);
3505   rc = _sblk_find_pi_mm(lx.lower, &lx, mm, &found, &idx);
3506   RCGO(rc, finish);
3507   if (found) {
3508     _kvblk_value_peek(lx.lower->kvblk, lx.lower->pi[idx], mm, &oval, &ovalsz);
3509     *vsz = ovalsz;
3510     memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3511   } else {
3512     rc = IWKV_ERROR_NOTFOUND;
3513   }
3514 
3515 finish:
3516   if (mm) {
3517     IWRC(fsm->release_mmap(fsm), rc);
3518   }
3519   _lx_release_mm(&lx, 0);
3520   API_DB_UNLOCK(db, rci, rc);
3521   return rc;
3522 }
3523 
iwkv_db_set_meta(IWDB db,void * buf,size_t sz)3524 iwrc iwkv_db_set_meta(IWDB db, void *buf, size_t sz) {
3525   if (!db || !db->iwkv || !buf) {
3526     return IW_ERROR_INVALID_ARGS;
3527   }
3528   if (!sz) {
3529     return 0;
3530   }
3531 
3532   int rci;
3533   iwrc rc = 0;
3534   bool resized = false;
3535   uint8_t *mm = 0, *wp, *sp;
3536   IWFS_FSM *fsm = &db->iwkv->fsm;
3537   size_t asz = IW_ROUNDUP(sz, 1U << IWKV_FSM_BPOW);
3538 
3539   API_DB_WLOCK(db, rci);
3540   if ((asz > db->meta_blkn) || (asz * 2 <= db->meta_blkn)) {
3541     off_t oaddr = 0;
3542     off_t olen = 0;
3543     if (db->meta_blk) {
3544       rc = fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn));
3545       RCGO(rc, finish);
3546     }
3547     rc = fsm->allocate(fsm, asz, &oaddr, &olen, IWKV_FSM_ALLOC_FLAGS);
3548     RCGO(rc, finish);
3549     db->meta_blk = ADDR2BLK(oaddr);
3550     db->meta_blkn = ADDR2BLK(olen);
3551     resized = true;
3552   }
3553   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3554   RCGO(rc, finish);
3555   wp = mm + BLK2ADDR(db->meta_blk);
3556   memcpy(wp, buf, sz);
3557   if (db->iwkv->dlsnr) {
3558     rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, wp - mm, wp, sz, 0);
3559     RCGO(rc, finish);
3560   }
3561   if (resized) {
3562     uint32_t lv;
3563     wp = mm + db->addr + DOFF_METABLK_U4;
3564     sp = wp;
3565     IW_WRITELV(wp, lv, db->meta_blk);
3566     IW_WRITELV(wp, lv, db->meta_blkn);
3567     if (db->iwkv->dlsnr) {
3568       rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, sp - mm, sp, wp - sp, 0);
3569       RCGO(rc, finish);
3570     }
3571   }
3572   fsm->release_mmap(fsm);
3573   mm = 0;
3574 
3575 finish:
3576   if (mm) {
3577     fsm->release_mmap(fsm);
3578   }
3579   API_DB_UNLOCK(db, rci, rc);
3580   return rc;
3581 }
3582 
iwkv_db_get_meta(IWDB db,void * buf,size_t sz,size_t * rsz)3583 iwrc iwkv_db_get_meta(IWDB db, void *buf, size_t sz, size_t *rsz) {
3584   if (!db || !db->iwkv || !buf) {
3585     return IW_ERROR_INVALID_ARGS;
3586   }
3587   *rsz = 0;
3588   if (!sz || !db->meta_blkn) {
3589     return 0;
3590   }
3591   int rci;
3592   iwrc rc = 0;
3593   uint8_t *mm = 0;
3594   IWFS_FSM *fsm = &db->iwkv->fsm;
3595   size_t rmax = BLK2ADDR(db->meta_blkn);
3596   if (sz > rmax) {
3597     sz = rmax;
3598   }
3599   API_DB_RLOCK(db, rci);
3600   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3601   RCGO(rc, finish);
3602   memcpy(buf, mm + BLK2ADDR(db->meta_blk), sz);
3603   *rsz = sz;
3604 
3605 finish:
3606   if (mm) {
3607     fsm->release_mmap(fsm);
3608   }
3609   API_DB_UNLOCK(db, rci, rc);
3610   return rc;
3611 }
3612 
iwkv_del(IWDB db,const IWKV_val * key,iwkv_opflags opflags)3613 iwrc iwkv_del(IWDB db, const IWKV_val *key, iwkv_opflags opflags) {
3614   if (!db || !db->iwkv || !key) {
3615     return IW_ERROR_INVALID_ARGS;
3616   }
3617   int rci;
3618   IWKV_val ekey;
3619   IWKV iwkv = db->iwkv;
3620 
3621   uint8_t nbuf[IW_VNUMBUFSZ];
3622   iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3623   RCRET(rc);
3624   IWLCTX lx = {
3625     .db      = db,
3626     .key     = &ekey,
3627     .nlvl    = -1,
3628     .op      = IWLCTX_DEL,
3629     .opflags = opflags
3630   };
3631   API_DB_WLOCK(db, rci);
3632   rc = _lx_del_lw(&lx);
3633   API_DB_UNLOCK(db, rci, rc);
3634   if (!rc) {
3635     if (lx.opflags & IWKV_SYNC) {
3636       rc = _iwkv_sync(iwkv, 0);
3637     } else {
3638       rc = iwal_poke_checkpoint(iwkv, false);
3639     }
3640   }
3641   return rc;
3642 }
3643 
_cursor_close_lw(IWKV_cursor cur)3644 IW_INLINE iwrc _cursor_close_lw(IWKV_cursor cur) {
3645   iwrc rc = 0;
3646   cur->closed = true;
3647   IWDB db = cur->lx.db;
3648   pthread_spin_lock(&db->cursors_slk);
3649   for (IWKV_cursor c = db->cursors, pc = 0; c; pc = c, c = c->next) {
3650     if (c == cur) {
3651       if (pc) {
3652         pc->next = c->next;
3653       } else {
3654         db->cursors = c->next;
3655       }
3656       break;
3657     }
3658   }
3659   pthread_spin_unlock(&db->cursors_slk);
3660   return rc;
3661 }
3662 
iwkv_cursor_open(IWDB db,IWKV_cursor * curptr,IWKV_cursor_op op,const IWKV_val * key)3663 iwrc iwkv_cursor_open(
3664   IWDB            db,
3665   IWKV_cursor    *curptr,
3666   IWKV_cursor_op  op,
3667   const IWKV_val *key
3668   ) {
3669   if (  !db || !db->iwkv || !curptr
3670      || (key && (op < IWKV_CURSOR_EQ)) || (op < IWKV_CURSOR_BEFORE_FIRST)) {
3671     return IW_ERROR_INVALID_ARGS;
3672   }
3673   iwrc rc;
3674   int rci;
3675   rc = _db_worker_inc_nolk(db);
3676   RCRET(rc);
3677   rc = _api_db_rlock(db);
3678   if (rc) {
3679     _db_worker_dec_nolk(db);
3680     return rc;
3681   }
3682   IWKV_cursor cur = 0;
3683   *curptr = calloc(1, sizeof(**curptr));
3684   if (!(*curptr)) {
3685     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3686     goto finish;
3687   }
3688   cur = *curptr;
3689   IWLCTX *lx = &cur->lx;
3690   lx->db = db;
3691   lx->nlvl = -1;
3692   if (key) {
3693     rc = _to_effective_key(db, key, &lx->ekey, lx->nbuf);
3694     RCGO(rc, finish);
3695     lx->key = &lx->ekey;
3696   }
3697   rc = _cursor_to_lr(cur, op);
3698 
3699 finish:
3700   if (cur) {
3701     if (rc) {
3702       *curptr = 0;
3703       IWRC(_cursor_close_lw(cur), rc);
3704       free(cur);
3705     } else {
3706       pthread_spin_lock(&db->cursors_slk);
3707       cur->next = db->cursors;
3708       db->cursors = cur;
3709       pthread_spin_unlock(&db->cursors_slk);
3710     }
3711   }
3712   API_DB_UNLOCK(db, rci, rc);
3713   if (rc) {
3714     _db_worker_dec_nolk(db);
3715   }
3716   return rc;
3717 }
3718 
iwkv_cursor_close(IWKV_cursor * curp)3719 iwrc iwkv_cursor_close(IWKV_cursor *curp) {
3720   iwrc rc = 0;
3721   int rci;
3722   if (!curp || !*curp) {
3723     return 0;
3724   }
3725   IWKV_cursor cur = *curp;
3726   *curp = 0;
3727   IWKV iwkv = cur->lx.db->iwkv;
3728   if (cur->closed) {
3729     free(cur);
3730     return 0;
3731   }
3732   if (!cur->lx.db) {
3733     return IW_ERROR_INVALID_ARGS;
3734   }
3735   API_DB_WLOCK(cur->lx.db, rci);
3736   rc = _cursor_close_lw(cur);
3737   API_DB_UNLOCK(cur->lx.db, rci, rc);
3738   IWRC(_db_worker_dec_nolk(cur->lx.db), rc);
3739   free(cur);
3740   if (!rc) {
3741     rc = iwal_poke_checkpoint(iwkv, false);
3742   }
3743   return rc;
3744 }
3745 
iwkv_cursor_to(IWKV_cursor cur,IWKV_cursor_op op)3746 iwrc iwkv_cursor_to(IWKV_cursor cur, IWKV_cursor_op op) {
3747   int rci;
3748   if (!cur) {
3749     return IW_ERROR_INVALID_ARGS;
3750   }
3751   if (!cur->lx.db) {
3752     return IW_ERROR_INVALID_ARGS;
3753   }
3754   API_DB_RLOCK(cur->lx.db, rci);
3755   iwrc rc = _cursor_to_lr(cur, op);
3756   API_DB_UNLOCK(cur->lx.db, rci, rc);
3757   return rc;
3758 }
3759 
iwkv_cursor_to_key(IWKV_cursor cur,IWKV_cursor_op op,const IWKV_val * key)3760 iwrc iwkv_cursor_to_key(IWKV_cursor cur, IWKV_cursor_op op, const IWKV_val *key) {
3761   int rci;
3762   if (!cur || ((op != IWKV_CURSOR_EQ) && (op != IWKV_CURSOR_GE))) {
3763     return IW_ERROR_INVALID_ARGS;
3764   }
3765   IWLCTX *lx = &cur->lx;
3766   if (!lx->db) {
3767     return IW_ERROR_INVALID_STATE;
3768   }
3769   iwrc rc = _to_effective_key(lx->db, key, &lx->ekey, lx->nbuf);
3770   RCRET(rc);
3771 
3772   API_DB_RLOCK(lx->db, rci);
3773   lx->key = &lx->ekey;
3774   rc = _cursor_to_lr(cur, op);
3775   API_DB_UNLOCK(lx->db, rci, rc);
3776   return rc;
3777 }
3778 
iwkv_cursor_get(IWKV_cursor cur,IWKV_val * okey,IWKV_val * oval)3779 iwrc iwkv_cursor_get(
3780   IWKV_cursor cur,
3781   IWKV_val   *okey,                    /* Nullable */
3782   IWKV_val   *oval
3783   ) {                                  /* Nullable */
3784   int rci;
3785   iwrc rc = 0;
3786   if (!cur || !cur->lx.db) {
3787     return IW_ERROR_INVALID_ARGS;
3788   }
3789   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3790     return IWKV_ERROR_NOTFOUND;
3791   }
3792   IWLCTX *lx = &cur->lx;
3793   API_DB_RLOCK(lx->db, rci);
3794   uint8_t *mm = 0;
3795   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3796   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3797   RCGO(rc, finish);
3798   if (!cur->cn->kvblk) {
3799     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3800     RCGO(rc, finish);
3801   }
3802   uint8_t idx = cur->cn->pi[cur->cnpos];
3803   if (okey && oval) {
3804     rc = _kvblk_kv_get(cur->cn->kvblk, mm, idx, okey, oval);
3805   } else if (oval) {
3806     rc = _kvblk_value_get(cur->cn->kvblk, mm, idx, oval);
3807   } else if (okey) {
3808     rc = _kvblk_key_get(cur->cn->kvblk, mm, idx, okey);
3809   } else {
3810     rc = IW_ERROR_INVALID_ARGS;
3811   }
3812   if (!rc && okey) {
3813     _unpack_effective_key(lx->db, okey, false);
3814   }
3815 finish:
3816   if (mm) {
3817     fsm->release_mmap(fsm);
3818   }
3819   API_DB_UNLOCK(lx->db, rci, rc);
3820   return rc;
3821 }
3822 
iwkv_cursor_copy_val(IWKV_cursor cur,void * vbuf,size_t vbufsz,size_t * vsz)3823 iwrc iwkv_cursor_copy_val(IWKV_cursor cur, void *vbuf, size_t vbufsz, size_t *vsz) {
3824   int rci;
3825   iwrc rc = 0;
3826   if (!cur || !vbuf || !cur->lx.db) {
3827     return IW_ERROR_INVALID_ARGS;
3828   }
3829   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3830     return IWKV_ERROR_NOTFOUND;
3831   }
3832 
3833   *vsz = 0;
3834   IWLCTX *lx = &cur->lx;
3835   API_DB_RLOCK(lx->db, rci);
3836   uint8_t *mm = 0, *oval;
3837   uint32_t ovalsz;
3838   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3839   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3840   RCGO(rc, finish);
3841   if (!cur->cn->kvblk) {
3842     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3843     RCGO(rc, finish);
3844   }
3845   uint8_t idx = cur->cn->pi[cur->cnpos];
3846   _kvblk_value_peek(cur->cn->kvblk, idx, mm, &oval, &ovalsz);
3847   *vsz = ovalsz;
3848   memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3849 
3850 finish:
3851   if (mm) {
3852     fsm->release_mmap(fsm);
3853   }
3854   API_DB_UNLOCK(lx->db, rci, rc);
3855   return rc;
3856 }
3857 
iwkv_cursor_is_matched_key(IWKV_cursor cur,const IWKV_val * key,bool * ores,int64_t * ocompound)3858 iwrc iwkv_cursor_is_matched_key(IWKV_cursor cur, const IWKV_val *key, bool *ores, int64_t *ocompound) {
3859   int rci;
3860   iwrc rc = 0;
3861   if (!cur || !ores || !key || !cur->lx.db) {
3862     return IW_ERROR_INVALID_ARGS;
3863   }
3864   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3865     return IWKV_ERROR_NOTFOUND;
3866   }
3867 
3868   *ores = 0;
3869   if (ocompound) {
3870     *ocompound = 0;
3871   }
3872 
3873   IWLCTX *lx = &cur->lx;
3874   API_DB_RLOCK(lx->db, rci);
3875   uint8_t *mm = 0, *okey;
3876   uint32_t okeysz;
3877   iwdb_flags_t dbflg = lx->db->dbflg;
3878   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3879   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3880   RCGO(rc, finish);
3881   if (!cur->cn->kvblk) {
3882     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3883     RCGO(rc, finish);
3884   }
3885 
3886   uint8_t idx = cur->cn->pi[cur->cnpos];
3887   rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
3888   RCGO(rc, finish);
3889 
3890   if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
3891     char nbuf[2 * IW_VNUMBUFSZ];
3892     IWKV_val rkey = { .data = nbuf, .size = okeysz };
3893     memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
3894     rc = _unpack_effective_key(lx->db, &rkey, true);
3895     RCGO(rc, finish);
3896     if (ocompound) {
3897       *ocompound = rkey.compound;
3898     }
3899     if (rkey.size != key->size) {
3900       *ores = false;
3901       goto finish;
3902     }
3903     if (dbflg & IWDB_VNUM64_KEYS) {
3904       *ores = !memcmp(rkey.data, key->data, key->size);
3905     } else {
3906       *ores = !memcmp(okey + (okeysz - rkey.size), key->data, key->size);
3907     }
3908   } else {
3909     *ores = (okeysz == key->size) && !memcmp(okey, key->data, key->size);
3910   }
3911 
3912 finish:
3913   if (mm) {
3914     fsm->release_mmap(fsm);
3915   }
3916   API_DB_UNLOCK(cur->lx.db, rci, rc);
3917   return rc;
3918 }
3919 
iwkv_cursor_copy_key(IWKV_cursor cur,void * kbuf,size_t kbufsz,size_t * ksz,int64_t * compound)3920 iwrc iwkv_cursor_copy_key(IWKV_cursor cur, void *kbuf, size_t kbufsz, size_t *ksz, int64_t *compound) {
3921   int rci;
3922   iwrc rc = 0;
3923   if (!cur || !cur->lx.db) {
3924     return IW_ERROR_INVALID_ARGS;
3925   }
3926   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3927     return IWKV_ERROR_NOTFOUND;
3928   }
3929 
3930   *ksz = 0;
3931   IWLCTX *lx = &cur->lx;
3932   API_DB_RLOCK(lx->db, rci);
3933   uint8_t *mm = 0, *okey;
3934   uint32_t okeysz;
3935   iwdb_flags_t dbflg = lx->db->dbflg;
3936   IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3937   rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3938   RCGO(rc, finish);
3939   if (!cur->cn->kvblk) {
3940     rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3941     RCGO(rc, finish);
3942   }
3943 
3944   uint8_t idx = cur->cn->pi[cur->cnpos];
3945   rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
3946   RCGO(rc, finish);
3947 
3948   if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
3949     char nbuf[2 * IW_VNUMBUFSZ];
3950     IWKV_val rkey = { .data = nbuf, .size = okeysz };
3951     memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
3952     rc = _unpack_effective_key(lx->db, &rkey, true);
3953     RCGO(rc, finish);
3954     if (compound) {
3955       *compound = rkey.compound;
3956     }
3957     *ksz = rkey.size;
3958     if (dbflg & IWDB_VNUM64_KEYS) {
3959       memcpy(kbuf, rkey.data, MIN(kbufsz, rkey.size));
3960     } else {
3961       memcpy(kbuf, okey + (okeysz - rkey.size), MIN(kbufsz, rkey.size));
3962     }
3963   } else {
3964     *ksz = okeysz;
3965     if (compound) {
3966       *compound = 0;
3967     }
3968     memcpy(kbuf, okey, MIN(kbufsz, okeysz));
3969   }
3970 
3971 finish:
3972   if (mm) {
3973     fsm->release_mmap(fsm);
3974   }
3975   API_DB_UNLOCK(cur->lx.db, rci, rc);
3976   return rc;
3977 }
3978 
iwkv_cursor_seth(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3979 IW_EXPORT iwrc iwkv_cursor_seth(
3980   IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags,
3981   IWKV_PUT_HANDLER ph, void *phop
3982   ) {
3983   int rci;
3984   iwrc rc = 0, irc = 0;
3985   if (!cur || !cur->lx.db) {
3986     return IW_ERROR_INVALID_ARGS;
3987   }
3988   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3989     return IWKV_ERROR_NOTFOUND;
3990   }
3991 
3992   IWLCTX *lx = &cur->lx;
3993   IWDB db = lx->db;
3994   IWKV iwkv = db->iwkv;
3995   SBLK *sblk = cur->cn;
3996 
3997   API_DB_WLOCK(db, rci);
3998   if (ph) {
3999     uint8_t *mm;
4000     IWKV_val key, oldval;
4001     IWFS_FSM *fsm = &db->iwkv->fsm;
4002     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4003     RCGO(rc, finish);
4004     rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key, &oldval);
4005     fsm->release_mmap(fsm);
4006     if (!rc) {
4007       // note: oldval should be disposed by ph
4008       rc = ph(&key, val, &oldval, phop);
4009       _kv_val_dispose(&key);
4010     }
4011     RCGO(rc, finish);
4012   }
4013 
4014   rc = _sblk_updatekv(sblk, cur->cnpos, 0, val);
4015   if (IWKV_IS_INTERNAL_RC(rc)) {
4016     irc = rc;
4017     rc = 0;
4018   }
4019   RCGO(rc, finish);
4020 
4021   rc = _sblk_sync(lx, sblk);
4022   RCGO(rc, finish);
4023 
4024   // Update active cursors inside this block
4025   pthread_spin_lock(&db->cursors_slk);
4026   for (IWKV_cursor c = db->cursors; c; c = c->next) {
4027     if (c->cn && (c->cn->addr == sblk->addr)) {
4028       if (c->cn != sblk) {
4029         memcpy(c->cn, sblk, sizeof(*c->cn));
4030         c->cn->kvblk = 0;
4031         c->cn->flags &= SBLK_PERSISTENT_FLAGS;
4032       }
4033     }
4034   }
4035   pthread_spin_unlock(&db->cursors_slk);
4036 
4037 finish:
4038   API_DB_UNLOCK(db, rci, rc);
4039   if (!rc) {
4040     if (opflags & IWKV_SYNC) {
4041       rc = _iwkv_sync(iwkv, 0);
4042     } else {
4043       rc = iwal_poke_checkpoint(iwkv, false);
4044     }
4045   }
4046   return rc ? rc : irc;
4047 }
4048 
iwkv_cursor_set(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags)4049 iwrc iwkv_cursor_set(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags) {
4050   return iwkv_cursor_seth(cur, val, opflags, 0, 0);
4051 }
4052 
iwkv_cursor_val(IWKV_cursor cur,IWKV_val * oval)4053 iwrc iwkv_cursor_val(IWKV_cursor cur, IWKV_val *oval) {
4054   return iwkv_cursor_get(cur, 0, oval);
4055 }
4056 
iwkv_cursor_key(IWKV_cursor cur,IWKV_val * okey)4057 iwrc iwkv_cursor_key(IWKV_cursor cur, IWKV_val *okey) {
4058   return iwkv_cursor_get(cur, okey, 0);
4059 }
4060 
iwkv_cursor_del(IWKV_cursor cur,iwkv_opflags opflags)4061 iwrc iwkv_cursor_del(IWKV_cursor cur, iwkv_opflags opflags) {
4062   int rci;
4063   iwrc rc = 0;
4064   if (!cur || !cur->lx.db) {
4065     return IW_ERROR_INVALID_ARGS;
4066   }
4067   if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4068     return IWKV_ERROR_NOTFOUND;
4069   }
4070 
4071   uint8_t *mm;
4072   SBLK *sblk = cur->cn;
4073   IWLCTX *lx = &cur->lx;
4074   IWDB db = lx->db;
4075   IWKV iwkv = db->iwkv;
4076   IWFS_FSM *fsm = &iwkv->fsm;
4077 
4078   API_DB_WLOCK(db, rci);
4079   if (sblk->pnum == 1) { // sblk will be removed
4080     IWKV_val key = { 0 };
4081     // Key a key
4082     rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4083     RCGO(rc, finish2);
4084     if (!sblk->kvblk) {
4085       rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4086       fsm->release_mmap(fsm);
4087       RCGO(rc, finish2);
4088     }
4089     rc = _kvblk_key_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key);
4090     fsm->release_mmap(fsm);
4091     RCGO(rc, finish2);
4092 
4093     lx->key = &key;
4094     rc = _lx_del_sblk_lw(lx, sblk, cur->cnpos);
4095     lx->key = 0;
4096 
4097 finish2:
4098     if (rc) {
4099       _lx_release_mm(lx, 0);
4100     } else {
4101       rc = _lx_release(lx);
4102     }
4103     if (key.data) {
4104       _kv_val_dispose(&key);
4105     }
4106   } else { // Simple case
4107     if (!sblk->kvblk) {
4108       rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4109       RCGO(rc, finish);
4110       rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4111       fsm->release_mmap(fsm);
4112       RCGO(rc, finish);
4113     }
4114     rc = _sblk_rmkv(sblk, cur->cnpos);
4115     RCGO(rc, finish);
4116     rc = _sblk_sync(lx, sblk);
4117   }
4118 
4119 finish:
4120   API_DB_UNLOCK(db, rci, rc);
4121   if (!rc) {
4122     if (opflags & IWKV_SYNC) {
4123       rc = _iwkv_sync(iwkv, 0);
4124     } else {
4125       rc = iwal_poke_checkpoint(iwkv, false);
4126     }
4127   }
4128   return rc;
4129 }
4130 
4131 #include "./dbg/iwkvdbg.c"
4132