1 // -V::512
2
3 #include "iwkv_internal.h"
4 #include "iwconv.h"
5 #include <stdalign.h>
6
// Forward declarations of the `_dbcache_*` helpers. Their definitions are not
// part of this section of the file; they are declared here because code below
// (e.g. `_db_release_lw()`) calls them before they are defined.
static iwrc _dbcache_fill_lw(IWLCTX *lx);
static iwrc _dbcache_get(IWLCTX *lx);
static iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk);
static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk);
static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk);
static void _dbcache_destroy_lw(IWDB db);

// Alias: `_wnw_db_wl(db)` expands to `_api_db_wlock(db)`.
#define _wnw_db_wl(db_) _api_db_wlock(db_)
15
16 //-------------------------- GLOBALS
17
#ifdef IW_TESTS
// Defined only in test builds (IW_TESTS); initial value is -1.
// NOTE(review): presumably lets tests force the level of the next block — confirm at the use site.
volatile int8_t iwkv_next_level = -1;
#endif
// NOTE(review): not referenced in this section of the file; purpose to be confirmed.
atomic_uint_fast64_t g_trigger;

// True when `rc_` lies strictly between `_IWKV_ERROR_END` and `_IWKV_RC_END`.
#define IWKV_IS_INTERNAL_RC(rc_) ((rc_) > _IWKV_ERROR_END && (rc_) < _IWKV_RC_END)
24
25 //-------------------------- UTILS
26
_to_effective_key(struct _IWDB * db,const IWKV_val * key,IWKV_val * okey,uint8_t nbuf[static IW_VNUMBUFSZ])27 IW_SOFT_INLINE iwrc _to_effective_key(struct _IWDB *db, const IWKV_val *key, IWKV_val *okey,
28 uint8_t nbuf[static IW_VNUMBUFSZ]) {
29 static_assert(IW_VNUMBUFSZ >= sizeof(uint64_t), "IW_VNUMBUFSZ >= sizeof(uint64_t)");
30 iwdb_flags_t dbflg = db->dbflg;
31 // Keys compound will be processed at lower levels at `addkv` routines
32 okey->compound = key->compound;
33 if (dbflg & IWDB_VNUM64_KEYS) {
34 unsigned len;
35 if (key->size == 8) {
36 uint64_t llv;
37 memcpy(&llv, key->data, sizeof(llv));
38 IW_SETVNUMBUF64(len, nbuf, llv);
39 if (!len) return IW_ERROR_OVERFLOW;
40 okey->size = len;
41 okey->data = nbuf;
42 } else if (key->size == 4) {
43 uint32_t lv;
44 memcpy(&lv, key->data, sizeof(lv));
45 IW_SETVNUMBUF(len, nbuf, lv);
46 if (!len) return IW_ERROR_OVERFLOW;
47 okey->size = len;
48 okey->data = nbuf;
49 } else {
50 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
51 }
52 } else {
53 okey->data = key->data;
54 okey->size = key->size;
55 }
56 return 0;
57 }
58
59 // NOTE: at least `2*IW_VNUMBUFSZ` must be allocated for key->data
_unpack_effective_key(struct _IWDB * db,IWKV_val * key,bool no_move_key_data)60 static iwrc _unpack_effective_key(struct _IWDB *db, IWKV_val *key, bool no_move_key_data) {
61 iwdb_flags_t dbflg = db->dbflg;
62 uint8_t *data = key->data;
63 if (dbflg & IWDB_COMPOUND_KEYS) {
64 int step;
65 IW_READVNUMBUF64(key->data, key->compound, step);
66 if (step >= key->size) {
67 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
68 }
69 data += step;
70 key->size -= step;
71 if (!no_move_key_data && !(dbflg & IWDB_VNUM64_KEYS)) {
72 memmove(key->data, data, key->size);
73 }
74 } else {
75 key->compound = 0;
76 }
77 if (dbflg & IWDB_VNUM64_KEYS) {
78 int64_t llv;
79 char nbuf[IW_VNUMBUFSZ];
80 if (key->size > IW_VNUMBUFSZ) {
81 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
82 }
83 memcpy(nbuf, data, key->size);
84 IW_READVNUMBUF64_2(nbuf, llv);
85 memcpy(key->data, &llv, sizeof(llv));
86 key->size = sizeof(llv);
87 }
88 return 0;
89 }
90
/**
 * Compares the stored key bytes `v1` (`v1len` bytes) with the lookup `key`
 * according to the key mode selected in `dbflg`.
 *
 * Sign convention, as visible in the numeric branches: the result is negative
 * when the number decoded from `v1` is greater than the one decoded from
 * `key`, positive when it is smaller and zero when both are equal.
 *
 * In compound mode the variable length number at the start of `v1` is decoded
 * and skipped before the comparison. For numeric (`IWDB_VNUM64_KEYS`) and real
 * number (`IWDB_REALNUM_KEYS`) keys a tie is then broken by the compound
 * values; for plain keys only the `IW_CMP2` result is returned here and the
 * compound/length tie-break is left to `_cmp_keys()`.
 */
static int _cmp_keys_prefix(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
  int ret;
  if (dbflg & IWDB_COMPOUND_KEYS) {
    // Compound keys mode
    const char *u1 = v1;
    const char *u2 = key->data;
    int step, v2len = (int) key->size;
    int64_t c1, c2 = key->compound;
    // c1: compound value stored in front of the key bytes, step: its encoded size
    IW_READVNUMBUF64(v1, c1, step);
    v1len -= step;
    u1 += step;
    if (v1len < 1) {
      // Inconsistent data?
      return v2len - v1len;
    }
    if (dbflg & IWDB_VNUM64_KEYS) {
      // Encoded numbers of different length are ordered by their length difference
      if (v2len != v1len || v2len > IW_VNUMBUFSZ || v1len > IW_VNUMBUFSZ) {
        return v2len - v1len;
      }
      int64_t n1, n2;
      char vbuf[IW_VNUMBUFSZ];
      // Both numbers are decoded through the same scratch buffer
      memcpy(vbuf, u1, v1len);
      IW_READVNUMBUF64_2(vbuf, n1);
      memcpy(vbuf, u2, v2len);
      IW_READVNUMBUF64_2(vbuf, n2);
      ret = n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
      if (ret == 0) {
        // Equal numbers: order by the compound part
        ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
      }
    } else if (dbflg & IWDB_REALNUM_KEYS) {
      ret = iwafcmp(u2, v2len, u1, v1len);
      if (ret == 0) {
        // Equal real numbers: order by the compound part
        ret = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
      }
    } else {
      // NOTE(review): IW_CMP2 presumably compares only the common prefix of both buffers — confirm in its definition
      IW_CMP2(ret, u2, v2len, u1, v1len);
    }
    return ret;
  } else {
    int v2len = (int) key->size;
    const void *v2 = key->data;
    if (dbflg & IWDB_VNUM64_KEYS) {
      // Encoded numbers of different length are ordered by their length difference
      if (v2len != v1len || v2len > IW_VNUMBUFSZ || v1len > IW_VNUMBUFSZ) {
        return v2len - v1len;
      }
      int64_t n1, n2;
      char vbuf[IW_VNUMBUFSZ];
      memcpy(vbuf, v1, v1len);
      IW_READVNUMBUF64_2(vbuf, n1);
      memcpy(vbuf, v2, v2len);
      IW_READVNUMBUF64_2(vbuf, n2);
      return n1 > n2 ? -1 : n1 < n2 ? 1 : 0;
    } else if (dbflg & IWDB_REALNUM_KEYS) {
      return iwafcmp(v2, v2len, v1, v1len);
    } else {
      IW_CMP2(ret, v2, v2len, v1, v1len);
      return ret;
    }
  }
}
151
_cmp_keys(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)152 IW_INLINE int _cmp_keys(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
153 int rv = _cmp_keys_prefix(dbflg, v1, v1len, key);
154 if (rv == 0 && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
155 if (dbflg & IWDB_COMPOUND_KEYS) {
156 int step;
157 int64_t c1, c2 = key->compound;
158 IW_READVNUMBUF64(v1, c1, step);
159 v1len -= step;
160 if ((int) key->size == v1len) {
161 return c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
162 }
163 }
164 return (int) key->size - v1len;
165 } else {
166 return rv;
167 }
168 }
169
_kv_val_dispose(IWKV_val * v)170 IW_INLINE void _kv_val_dispose(IWKV_val *v) {
171 if (v) {
172 free(v->data);
173 v->size = 0;
174 v->data = 0;
175 }
176 }
177
_kv_dispose(IWKV_val * key,IWKV_val * val)178 IW_INLINE void _kv_dispose(IWKV_val *key, IWKV_val *val) {
179 _kv_val_dispose(key);
180 _kv_val_dispose(val);
181 }
182
/**
 * Public entry point: frees the data buffer of `v` and resets `v` to an empty
 * value. A NULL `v` is ignored.
 */
void iwkv_val_dispose(IWKV_val *v) {
  if (v) {
    free(v->data);
    v->size = 0;
    v->data = 0;
  }
}
186
/**
 * Public entry point: disposes both members of a key/value pair.
 * Either argument may be NULL.
 */
void iwkv_kv_dispose(IWKV_val *key, IWKV_val *val) {
  _kv_val_dispose(key);
  _kv_val_dispose(val);
}
190
_num2lebuf(uint8_t buf[static8],void * numdata,size_t sz)191 IW_INLINE void _num2lebuf(uint8_t buf[static 8], void *numdata, size_t sz) {
192 assert(sz == 4 || sz == 8);
193 if (sz > 4) {
194 uint64_t llv;
195 memcpy(&llv, numdata, sizeof(llv));
196 llv = IW_HTOILL(llv);
197 memcpy(buf, &llv, sizeof(llv));
198 } else {
199 uint32_t lv;
200 memcpy(&lv, numdata, sizeof(lv));
201 lv = IW_HTOIL(lv);
202 memcpy(buf, &lv, sizeof(lv));
203 }
204 }
205
206 //-------------------------- IWKV/IWDB WORKERS
207
_iwkv_worker_inc_nolk(IWKV iwkv)208 static WUR iwrc _iwkv_worker_inc_nolk(IWKV iwkv) {
209 if (!iwkv || !iwkv->open) {
210 return IW_ERROR_INVALID_STATE;
211 }
212 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
213 if (rci) {
214 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
215 }
216 if (!iwkv->open) { // -V547
217 pthread_mutex_unlock(&iwkv->wk_mtx);
218 return IW_ERROR_INVALID_STATE;
219 }
220 while (iwkv->wk_pending_exclusive) {
221 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
222 }
223 ++iwkv->wk_count;
224 pthread_cond_broadcast(&iwkv->wk_cond);
225 pthread_mutex_unlock(&iwkv->wk_mtx);
226 return 0;
227 }
228
_db_worker_inc_nolk(IWDB db)229 static WUR iwrc _db_worker_inc_nolk(IWDB db) {
230 if (!db || !db->iwkv || !db->iwkv->open || !db->open) {
231 return IW_ERROR_INVALID_STATE;
232 }
233 IWKV iwkv = db->iwkv;
234 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
235 if (rci) {
236 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
237 }
238 if (!iwkv->open || !db->open) { // -V560
239 pthread_mutex_unlock(&iwkv->wk_mtx);
240 return IW_ERROR_INVALID_STATE;
241 }
242 while (db->wk_pending_exclusive) {
243 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
244 }
245 ++iwkv->wk_count;
246 ++db->wk_count;
247 pthread_cond_broadcast(&iwkv->wk_cond);
248 pthread_mutex_unlock(&iwkv->wk_mtx);
249 return 0;
250 }
251
_iwkv_worker_dec_nolk(IWKV iwkv)252 static iwrc _iwkv_worker_dec_nolk(IWKV iwkv) {
253 if (!iwkv) {
254 return IW_ERROR_INVALID_STATE;
255 }
256 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
257 if (rci) {
258 // Last chanсe to be consistent
259 --iwkv->wk_count;
260 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
261 }
262 --iwkv->wk_count;
263 pthread_cond_broadcast(&iwkv->wk_cond);
264 pthread_mutex_unlock(&iwkv->wk_mtx);
265 return 0;
266 }
267
_db_worker_dec_nolk(IWDB db)268 static iwrc _db_worker_dec_nolk(IWDB db) {
269 if (!db || !db->iwkv) { // do not use ENSURE_OPEN_DB here
270 return IW_ERROR_INVALID_STATE;
271 }
272 IWKV iwkv = db->iwkv;
273 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
274 if (rci) {
275 // Last chanсe to be consistent
276 --iwkv->wk_count;
277 --db->wk_count;
278 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
279 }
280 --iwkv->wk_count;
281 --db->wk_count;
282 pthread_cond_broadcast(&iwkv->wk_cond);
283 pthread_mutex_unlock(&iwkv->wk_mtx);
284 return 0;
285 }
286
_wnw_iwkw_wl(IWKV iwkv)287 static WUR iwrc _wnw_iwkw_wl(IWKV iwkv) {
288 int rci = pthread_rwlock_wrlock(&iwkv->rwl);
289 if (rci) {
290 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
291 }
292 return 0;
293 }
294
_wnw(IWKV iwkv,iwrc (* after)(IWKV iwkv))295 static WUR iwrc _wnw(IWKV iwkv, iwrc(*after)(IWKV iwkv)) {
296 iwrc rc = 0;
297 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
298 if (rci) {
299 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
300 }
301 iwkv->wk_pending_exclusive = true;
302 while (iwkv->wk_count > 0) {
303 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
304 }
305 if (after) {
306 rc = after(iwkv);
307 }
308 iwkv->wk_pending_exclusive = false;
309 pthread_cond_broadcast(&iwkv->wk_cond);
310 rci = pthread_mutex_unlock(&iwkv->wk_mtx);
311 if (rci) {
312 IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
313 }
314 return rc;
315 }
316
_wnw_db(IWDB db,iwrc (* after)(IWDB db))317 static WUR iwrc _wnw_db(IWDB db, iwrc(*after)(IWDB db)) {
318 iwrc rc = 0;
319 IWKV iwkv = db->iwkv;
320 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
321 if (rci) {
322 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
323 }
324 db->wk_pending_exclusive = true;
325 while (db->wk_count > 0) {
326 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
327 }
328 if (after) {
329 rc = after(db);
330 }
331 db->wk_pending_exclusive = false;
332 pthread_cond_broadcast(&iwkv->wk_cond);
333 rci = pthread_mutex_unlock(&iwkv->wk_mtx);
334 if (rci) {
335 IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
336 }
337 return rc;
338 }
339
340 //-------------------------- DB
341
_db_at(IWKV iwkv,IWDB * dbp,off_t addr,uint8_t * mm)342 static WUR iwrc _db_at(IWKV iwkv, IWDB *dbp, off_t addr, uint8_t *mm) {
343 iwrc rc = 0;
344 uint8_t *rp, bv;
345 uint32_t lv;
346 int rci;
347 IWDB db = calloc(1, sizeof(struct _IWDB));
348 *dbp = 0;
349 if (!db) {
350 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
351 }
352 pthread_rwlockattr_t attr;
353 pthread_rwlockattr_init(&attr);
354 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
355 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
356 #endif
357 rci = pthread_rwlock_init(&db->rwl, &attr);
358 if (rci) {
359 free(db);
360 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
361 }
362 rci = pthread_spin_init(&db->cursors_slk, 0);
363 if (rci) {
364 pthread_rwlock_destroy(&db->rwl);
365 free(db);
366 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
367 }
368 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
369 db->flags = SBLK_DB;
370 db->addr = addr;
371 db->db = db;
372 db->iwkv = iwkv;
373 rp = mm + addr;
374 IW_READLV(rp, lv, lv);
375 if (lv != IWDB_MAGIC) {
376 rc = IWKV_ERROR_CORRUPTED;
377 iwlog_ecode_error3(rc);
378 goto finish;
379 }
380 IW_READBV(rp, bv, db->dbflg);
381 IW_READLV(rp, lv, db->id);
382 IW_READLV(rp, lv, db->next_db_addr);
383 db->next_db_addr = BLK2ADDR(db->next_db_addr); // blknum -> addr
384 rp = mm + addr + DOFF_C0_U4;
385 for (int i = 0; i < SLEVELS; ++i) {
386 IW_READLV(rp, lv, db->lcnt[i]);
387 }
388 if (iwkv->fmt_version >= 1) {
389 IW_READLV(rp, lv, db->meta_blk);
390 IW_READLV(rp, lv, db->meta_blkn);
391 }
392 db->open = true;
393 *dbp = db;
394
395 finish:
396 if (rc) {
397 pthread_rwlock_destroy(&db->rwl);
398 free(db);
399 }
400 return rc;
401 }
402
_db_save(IWDB db,bool newdb,uint8_t * mm)403 static WUR iwrc _db_save(IWDB db, bool newdb, uint8_t *mm) {
404 iwrc rc = 0;
405 uint32_t lv;
406 uint8_t *wp = mm + db->addr, bv;
407 uint8_t *sp = wp;
408 IWDLSNR *dlsnr = db->iwkv->dlsnr;
409 db->next_db_addr = db->next ? db->next->addr : 0;
410 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
411 IW_WRITELV(wp, lv, IWDB_MAGIC);
412 IW_WRITEBV(wp, bv, db->dbflg);
413 IW_WRITELV(wp, lv, db->id);
414 IW_WRITELV(wp, lv, ADDR2BLK(db->next_db_addr));
415 if (dlsnr) {
416 rc = dlsnr->onwrite(dlsnr, db->addr, sp, wp - sp, 0);
417 RCRET(rc);
418 }
419 if (db->iwkv->fmt_version >= 1) {
420 if (newdb) {
421 memset(wp, 0, 4 + SLEVELS * 4 * 2); // p0 + n[24] + c[24]
422 sp = wp;
423 wp += 4 + SLEVELS * 4 * 2; // set to zero
424 } else {
425 wp += 4 + SLEVELS * 4 * 2; // skip
426 sp = wp;
427 }
428 IW_WRITELV(wp, lv, db->meta_blk);
429 IW_WRITELV(wp, lv, db->meta_blkn);
430 if (dlsnr) {
431 rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
432 }
433 }
434 return rc;
435 }
436
_db_load_chain(IWKV iwkv,off_t addr,uint8_t * mm)437 static WUR iwrc _db_load_chain(IWKV iwkv, off_t addr, uint8_t *mm) {
438 iwrc rc;
439 int rci;
440 IWDB db = 0, ndb;
441 if (!addr) return 0;
442 do {
443 rc = _db_at(iwkv, &ndb, addr, mm);
444 RCRET(rc);
445 if (db) {
446 db->next = ndb;
447 ndb->prev = db;
448 } else {
449 iwkv->first_db = ndb;
450 }
451 db = ndb;
452 addr = db->next_db_addr;
453 iwkv->last_db = db;
454 khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
455 if (rci != -1) {
456 kh_value(iwkv->dbs, k) = db;
457 } else {
458 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
459 }
460 } while (db->next_db_addr);
461 return rc;
462 }
463
/**
 * Releases the in-memory resources of the database `*dbp`: its cache, both
 * locks and the object itself. `*dbp` is set to NULL afterwards.
 *
 * @param dbp Must be non-NULL and must point to a non-NULL database.
 */
static void _db_release_lw(IWDB *dbp) {
  assert(dbp && *dbp);
  IWDB victim = *dbp;
  _dbcache_destroy_lw(victim);
  pthread_rwlock_destroy(&victim->rwl);
  pthread_spin_destroy(&victim->cursors_slk);
  free(victim);
  *dbp = 0;
}
473
// Arguments of `_db_dispose_chain()`.
typedef struct DISPOSE_DB_CTX {
  IWKV iwkv; // Storage whose `fsm` is used for the deallocations
  IWDB db;   // Database object released at the end of `_db_dispose_chain()`
  blkn_t sbn; // First `SBLK` block in DB
} DISPOSE_DB_CTX;
479
/**
 * Walks the chain of `SBLK` blocks starting at `dctx->sbn` and deallocates
 * the blocks together with their `KVBLK` blocks, then releases the in-memory
 * database object `dctx->db`.
 *
 * For format versions above 1 `SBLK` blocks are not deallocated one by one:
 * the page containing each block is computed from its `bpos` byte and a page
 * is deallocated once, when the walk moves on to a different page (and once
 * more after the loop for the last page).
 *
 * The database object is released even when the walk stops on an error.
 *
 * @return 0 on success or the first error that stopped the walk.
 */
static iwrc _db_dispose_chain(DISPOSE_DB_CTX *dctx) {
  iwrc rc = 0;
  uint8_t *mm, kvszpow;
  IWFS_FSM *fsm = &dctx->iwkv->fsm;
  blkn_t sbn = dctx->sbn, kvblkn;
  off_t page = 0; // Page currently being collected, 0 if none (format version > 1 only)

  while (sbn) {
    off_t sba = BLK2ADDR(sbn);
    rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
    RCBREAK(rc);
    // Read everything needed from the block before the mapping is released
    memcpy(&kvblkn, mm + sba + SOFF_KBLK_U4, 4);
    kvblkn = IW_ITOHL(kvblkn);
    // Next block of the chain
    memcpy(&sbn, mm + sba + SOFF_N0_U4, 4);
    sbn = IW_ITOHL(sbn);
    if (kvblkn) {
      // Size power of the attached `KVBLK`; only read, and only used below, when there is one
      memcpy(&kvszpow, mm + BLK2ADDR(kvblkn) + KBLK_SZPOW_OFF, 1);
    }
    if (dctx->iwkv->fmt_version > 1) {
      uint8_t bpos;
      memcpy(&bpos, mm + sba + SOFF_BPOS_U1_V2, 1);
      rc = fsm->release_mmap(fsm);
      RCBREAK(rc);
      if (bpos > 0 && bpos <= SBLK_PAGE_SBLK_NUM_V2) {
        // Start of the page: `bpos` is the 1-based position of the block inside it
        off_t npage = sba - (bpos - 1) * SBLK_SZ;
        if (npage != page) {
          if (page) {
            // Previous page is complete: deallocate it if the status check returns 0
            if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
              rc = fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2);
            }
            RCBREAK(rc);
          }
          page = npage;
        }
      }
    } else {
      rc = fsm->release_mmap(fsm);
      RCBREAK(rc);
      // Deallocate `SBLK`
      rc = fsm->deallocate(fsm, sba, SBLK_SZ);
      RCBREAK(rc);
    }
    // Deallocate `KVBLK`
    if (kvblkn) {
      rc = fsm->deallocate(fsm, BLK2ADDR(kvblkn), 1ULL << kvszpow);
      RCBREAK(rc);
    }
  }
  if (page) {
    // Last collected page
    if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
      IWRC(fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2), rc);
    }
  }
  // Frees the database object and sets `dctx->db` to NULL
  _db_release_lw(&dctx->db);
  return rc;
}
536
_db_destroy_lw(IWDB * dbp)537 static WUR iwrc _db_destroy_lw(IWDB *dbp) {
538 iwrc rc;
539 uint8_t *mm;
540 IWDB db = *dbp;
541 IWKV iwkv = db->iwkv;
542 IWDB prev = db->prev;
543 IWDB next = db->next;
544 IWFS_FSM *fsm = &iwkv->fsm;
545 uint32_t first_sblkn;
546
547 khiter_t k = kh_get(DBS, iwkv->dbs, db->id);
548 if (k == kh_end(iwkv->dbs)) {
549 iwlog_ecode_error3(IW_ERROR_INVALID_STATE);
550 return IW_ERROR_INVALID_STATE;
551 }
552 kh_del(DBS, iwkv->dbs, k);
553
554 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
555 RCRET(rc);
556 if (prev) {
557 prev->next = next;
558 rc = _db_save(prev, false, mm);
559 if (rc) {
560 fsm->release_mmap(fsm);
561 return rc;
562 }
563 }
564 if (next) {
565 next->prev = prev;
566 rc = _db_save(next, false, mm);
567 if (rc) {
568 fsm->release_mmap(fsm);
569 return rc;
570 }
571 }
572 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
573 memcpy(&first_sblkn, mm + db->addr + DOFF_N0_U4, 4);
574 first_sblkn = IW_ITOHL(first_sblkn);
575 fsm->release_mmap(fsm);
576
577 if (iwkv->first_db && iwkv->first_db->addr == db->addr) {
578 uint64_t llv;
579 db->iwkv->first_db = next;
580 llv = next ? (uint64_t) next->addr : 0;
581 llv = IW_HTOILL(llv);
582 rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
583 }
584 if (iwkv->last_db && iwkv->last_db->addr == db->addr) {
585 iwkv->last_db = prev;
586 }
587 // Cleanup DB
588 off_t db_addr = db->addr;
589 blkn_t meta_blk = db->meta_blk;
590 blkn_t meta_blkn = db->meta_blkn;
591 db->open = false;
592
593 DISPOSE_DB_CTX dctx = {
594 .sbn = first_sblkn,
595 .iwkv = iwkv,
596 .db = db
597 };
598 IWRC(_db_dispose_chain(&dctx), rc);
599 if (meta_blk && meta_blkn) {
600 IWRC(fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn)), rc);
601 }
602 IWRC(fsm->deallocate(fsm, db_addr, DB_SZ), rc);
603 return rc;
604 }
605
_db_create_lw(IWKV iwkv,dbid_t dbid,iwdb_flags_t dbflg,IWDB * odb)606 static WUR iwrc _db_create_lw(IWKV iwkv, dbid_t dbid, iwdb_flags_t dbflg, IWDB *odb) {
607 iwrc rc;
608 int rci;
609 uint8_t *mm = 0;
610 off_t baddr = 0, blen;
611 IWFS_FSM *fsm = &iwkv->fsm;
612 *odb = 0;
613 IWDB db = calloc(1, sizeof(struct _IWDB));
614 if (!db) {
615 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
616 }
617 pthread_rwlockattr_t attr;
618 pthread_rwlockattr_init(&attr);
619 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
620 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
621 #endif
622 rci = pthread_rwlock_init(&db->rwl, &attr);
623 if (rci) {
624 free(db);
625 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
626 }
627 rci = pthread_spin_init(&db->cursors_slk, 0);
628 if (rci) {
629 pthread_rwlock_destroy(&db->rwl);
630 free(db);
631 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
632 }
633 rc = fsm->allocate(fsm, DB_SZ, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
634 if (rc) {
635 _db_release_lw(&db);
636 return rc;
637 }
638 db->iwkv = iwkv;
639 db->dbflg = dbflg;
640 db->addr = baddr;
641 db->id = dbid;
642 db->prev = iwkv->last_db;
643 if (!iwkv->first_db) {
644 uint64_t llv;
645 iwkv->first_db = db;
646 llv = (uint64_t) db->addr;
647 llv = IW_HTOILL(llv);
648 rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
649 } else if (iwkv->last_db) {
650 iwkv->last_db->next = db;
651 }
652 iwkv->last_db = db;
653 khiter_t k = kh_put(DBS, iwkv->dbs, db->id, &rci);
654 if (rci != -1) {
655 kh_value(iwkv->dbs, k) = db;
656 } else {
657 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
658 goto finish;
659 }
660 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
661 RCGO(rc, finish);
662 rc = _db_save(db, true, mm);
663 RCGO(rc, finish);
664 if (db->prev) {
665 rc = _db_save(db->prev, false, mm);
666 RCGO(rc, finish);
667 }
668 db->open = true;
669 *odb = db;
670
671 finish:
672 if (mm) {
673 fsm->release_mmap(fsm);
674 }
675 if (rc) {
676 fsm->deallocate(fsm, baddr, blen);
677 _db_release_lw(&db);
678 }
679 return rc;
680 }
681
682 //-------------------------- KVBLK
683
_kvblk_create(IWLCTX * lx,off_t baddr,uint8_t kvbpow,KVBLK ** oblk)684 IW_INLINE void _kvblk_create(IWLCTX *lx, off_t baddr, uint8_t kvbpow, KVBLK **oblk) {
685 KVBLK *kblk = &lx->kaa[lx->kaan];
686 kblk->db = lx->db;
687 kblk->addr = baddr;
688 kblk->maxoff = 0;
689 kblk->idxsz = 2 * IW_VNUMSIZE(0) * KVBLK_IDXNUM;
690 kblk->zidx = 0;
691 kblk->szpow = kvbpow;
692 kblk->flags = KVBLK_DURTY;
693 memset(kblk->pidx, 0, sizeof(kblk->pidx));
694 *oblk = kblk;
695 AAPOS_INC(lx->kaan);
696 }
697
_kvblk_key_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)698 IW_INLINE WUR iwrc _kvblk_key_peek(const KVBLK *kb,
699 uint8_t idx, const uint8_t *mm, uint8_t **obuf,
700 uint32_t *olen) {
701 if (kb->pidx[idx].len) {
702 uint32_t klen, step;
703 const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
704 IW_READVNUMBUF(rp, klen, step);
705 if (!klen) {
706 *obuf = 0;
707 *olen = 0;
708 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
709 return IWKV_ERROR_CORRUPTED;
710 }
711 rp += step;
712 *obuf = (uint8_t *) rp;
713 *olen = klen;
714 } else {
715 *obuf = 0;
716 *olen = 0;
717 }
718 return 0;
719 }
720
_kvblk_value_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)721 IW_INLINE void _kvblk_value_peek(const KVBLK *kb, uint8_t idx, const uint8_t *mm, uint8_t **obuf, uint32_t *olen) {
722 assert(idx < KVBLK_IDXNUM);
723 if (kb->pidx[idx].len) {
724 uint32_t klen, step;
725 const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
726 IW_READVNUMBUF(rp, klen, step);
727 rp += step;
728 rp += klen;
729 *obuf = (uint8_t *) rp;
730 *olen = kb->pidx[idx].len - klen - step;
731 } else {
732 *obuf = 0;
733 *olen = 0;
734 }
735 }
736
_kvblk_key_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key)737 static WUR iwrc _kvblk_key_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key) {
738 assert(mm && idx < KVBLK_IDXNUM);
739 int32_t klen;
740 int step;
741 KVP *kvp = &kb->pidx[idx];
742 key->compound = 0;
743 if (!kvp->len) {
744 key->data = 0;
745 key->size = 0;
746 return 0;
747 }
748 // [klen:vn,key,value]
749 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
750 IW_READVNUMBUF(rp, klen, step);
751 rp += step;
752 if (klen < 1 || klen > kvp->len || klen > kvp->off) {
753 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
754 return IWKV_ERROR_CORRUPTED;
755 }
756 key->size = (size_t) klen;
757 if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
758 // Needed to provide enough buffer in _unpack_effective_key()
759 key->data = malloc(MAX(key->size, sizeof(int64_t)));
760 } else {
761 key->data = malloc(key->size);
762 }
763 if (!key->data) {
764 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
765 }
766 memcpy(key->data, rp, key->size);
767 return 0;
768 }
769
_kvblk_value_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * val)770 static WUR iwrc _kvblk_value_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *val) {
771 assert(mm && idx < KVBLK_IDXNUM);
772 int32_t klen;
773 int step;
774 KVP *kvp = &kb->pidx[idx];
775 val->compound = 0;
776 if (!kvp->len) {
777 val->data = 0;
778 val->size = 0;
779 return 0;
780 }
781 // [klen:vn,key,value]
782 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
783 IW_READVNUMBUF(rp, klen, step);
784 rp += step;
785 if (klen < 1 || klen > kvp->len || klen > kvp->off) {
786 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
787 return IWKV_ERROR_CORRUPTED;
788 }
789 rp += klen;
790 if (kvp->len > klen + step) {
791 val->size = kvp->len - klen - step;
792 val->data = malloc(val->size);
793 if (!val->data) {
794 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
795 val->size = 0;
796 return rc;
797 }
798 memcpy(val->data, rp, val->size);
799 } else {
800 val->data = 0;
801 val->size = 0;
802 }
803 return 0;
804 }
805
_kvblk_kv_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key,IWKV_val * val)806 static WUR iwrc _kvblk_kv_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key, IWKV_val *val) {
807 assert(mm && idx < KVBLK_IDXNUM);
808 int32_t klen;
809 int step;
810 KVP *kvp = &kb->pidx[idx];
811 key->compound = 0;
812 val->compound = 0;
813 if (!kvp->len) {
814 key->data = 0;
815 key->size = 0;
816 val->data = 0;
817 val->size = 0;
818 return 0;
819 }
820 // [klen:vn,key,value]
821 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
822 IW_READVNUMBUF(rp, klen, step);
823 rp += step;
824 if (klen < 1 || klen > kvp->len || klen > kvp->off) {
825 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
826 return IWKV_ERROR_CORRUPTED;
827 }
828 key->size = (size_t) klen;
829 if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
830 // Needed to provide enough buffer in _unpack_effective_key()
831 key->data = malloc(MAX(key->size, sizeof(int64_t)));
832 } else {
833 key->data = malloc(key->size);
834 }
835 if (!key->data) {
836 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
837 }
838 memcpy(key->data, rp, key->size);
839 rp += klen;
840 if (kvp->len > klen + step) {
841 val->size = kvp->len - klen - step;
842 val->data = malloc(val->size);
843 if (!val->data) {
844 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
845 free(key->data);
846 key->data = 0;
847 key->size = 0;
848 val->size = 0;
849 return rc;
850 }
851 memcpy(val->data, rp, val->size);
852 } else {
853 val->data = 0;
854 val->size = 0;
855 }
856 return 0;
857 }
858
/**
 * Loads the header and the key/value index of the `KVBLK` located at `addr`
 * in the mapped file `mm`.
 *
 * @param kbp  Block object to fill; when NULL the next free slot of the
 *             lookup context `lx` is used and the slot position is advanced
 *             on success.
 * @param blkp Receives the loaded block; set to NULL on failure.
 * @return 0 on success, `IWKV_ERROR_CORRUPTED` if the stored index size
 *         exceeds `KVBLK_MAX_IDX_SZ` or a non-empty slot has a zero offset.
 */
static WUR iwrc _kvblk_at_mm(IWLCTX *lx, off_t addr, uint8_t *mm, KVBLK *kbp, KVBLK **blkp) {
  uint8_t *rp;
  uint16_t sv;
  int step;
  iwrc rc = 0;
  KVBLK *kb = kbp ? kbp : &lx->kaa[lx->kaan];
  kb->db = lx->db;
  kb->addr = addr;
  kb->maxoff = 0;
  kb->idxsz = 0;
  kb->zidx = -1; // No free slot found yet
  kb->szpow = 0;
  kb->flags = KVBLK_DEFAULT;
  memset(kb->pidx, 0, sizeof(kb->pidx));

  *blkp = 0;
  rp = mm + addr;
  // Block header: size power (1 byte) followed by the index size
  memcpy(&kb->szpow, rp, 1);
  rp += 1;
  IW_READSV(rp, sv, kb->idxsz);
  if (IW_UNLIKELY(kb->idxsz > KVBLK_MAX_IDX_SZ)) {
    rc = IWKV_ERROR_CORRUPTED;
    iwlog_ecode_error3(rc);
    goto finish;
  }
  // Index: an (off, len) pair of variable length numbers per slot
  for (uint8_t i = 0; i < KVBLK_IDXNUM; ++i) {
    IW_READVNUMBUF64(rp, kb->pidx[i].off, step);
    rp += step;
    IW_READVNUMBUF(rp, kb->pidx[i].len, step);
    rp += step;
    if (kb->pidx[i].len) {
      if (IW_UNLIKELY(!kb->pidx[i].off)) {
        rc = IWKV_ERROR_CORRUPTED;
        iwlog_ecode_error3(rc);
        goto finish;
      }
      // Track the largest pair offset in the block
      if (kb->pidx[i].off > kb->maxoff) {
        kb->maxoff = kb->pidx[i].off;
      }
    } else if (kb->zidx < 0) {
      // First empty slot
      kb->zidx = i;
    }
    // Remember the slot position; used by the sort in `_kvblk_compact_mm()`
    kb->pidx[i].ridx = i;
  }
  *blkp = kb;
  assert(rp - (mm + addr) <= (1ULL << kb->szpow));
  if (!kbp) {
    // The context slot is consumed only on success
    AAPOS_INC(lx->kaan);
  }

finish:
  return rc;
}
912
_kvblk_compacted_offset(KVBLK * kb)913 IW_INLINE off_t _kvblk_compacted_offset(KVBLK *kb) {
914 off_t coff = 0;
915 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
916 coff += kb->pidx[i].len;
917 }
918 return coff;
919 }
920
_kvblk_compacted_dsize(KVBLK * kb)921 IW_INLINE off_t _kvblk_compacted_dsize(KVBLK *kb) {
922 off_t coff = KVBLK_HDRSZ;
923 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
924 coff += kb->pidx[i].len;
925 coff += IW_VNUMSIZE32(kb->pidx[i].len);
926 coff += IW_VNUMSIZE(kb->pidx[i].off);
927 }
928 return coff;
929 }
930
/**
 * Writes the header and the key/value index of `kb` into the mapped file `mm`
 * if the block is marked dirty (`KVBLK_DURTY`), then clears the dirty flag.
 * The written range is reported to the data listener, if one is installed.
 *
 * @return 0 on success (or when nothing had to be written), otherwise the
 *         error reported by the data listener.
 */
static WUR iwrc _kvblk_sync_mm(KVBLK *kb, uint8_t *mm) {
  iwrc rc = 0;
  if (!(kb->flags & KVBLK_DURTY)) {
    return rc;
  }
  uint16_t sp;
  uint8_t *szp;
  uint8_t *wp = mm + kb->addr;
  uint8_t *sptr = wp; // Start of the written range
  IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
  memcpy(wp, &kb->szpow, 1);
  wp += 1;
  // The index size is known only after the index is written: remember its place
  szp = wp;
  wp += sizeof(uint16_t);
  for (int i = 0; i < KVBLK_IDXNUM; ++i) {
    KVP *kvp = &kb->pidx[i];
    IW_SETVNUMBUF64(sp, wp, kvp->off);
    wp += sp;
    IW_SETVNUMBUF(sp, wp, kvp->len);
    wp += sp;
  }
  // Size of the index just written, excluding the size field itself
  sp = wp - szp - sizeof(uint16_t);
  kb->idxsz = sp;
  assert(kb->idxsz <= KVBLK_MAX_IDX_SZ);
  sp = IW_HTOIS(sp);
  memcpy(szp, &sp, sizeof(uint16_t));
  assert(wp - (mm + kb->addr) <= (1ULL << kb->szpow));
  if (dlsnr) {
    rc = dlsnr->onwrite(dlsnr, kb->addr, sptr, wp - sptr, 0);
  }
  // The flag is cleared even if the listener reported an error
  kb->flags &= ~KVBLK_DURTY;
  return rc;
}
964
// "Less than" predicate for sorting `KVP` entries by offset in ascending order.
// An entry whose offset is not greater than 0 is compared as `-1UL`, so such
// entries are ordered after all entries with a real offset.
#define _kvblk_sort_kv_lt(v1, v2) \
  (((v1).off > 0 ? (v1).off : -1UL) < ((v2).off > 0 ? (v2).off : -1UL))

// -V:KSORT_INIT:522, 756, 769
// Instantiates the `kvblk` sort routines for `KVP`, including the
// `ks_mergesort_kvblk()` function used by `_kvblk_compact_mm()`.
KSORT_INIT(kvblk, KVP, _kvblk_sort_kv_lt)
970
/**
 * Compacts the data area of `kb` inside the mapped file `mm`: pairs are moved
 * towards the end of the block, in ascending order of their current offsets,
 * so that no gaps remain between them. Offsets, the index size, `maxoff` and
 * the first free slot of `kb` are updated and the block is marked dirty.
 *
 * Nothing is done when the block is already compacted.
 *
 * @return 0, or the result of the last data listener notification.
 */
static WUR iwrc _kvblk_compact_mm(KVBLK *kb, uint8_t *mm) {
  uint8_t i;
  off_t coff = _kvblk_compacted_offset(kb);
  if (coff == kb->maxoff) { // compacted
    return 0;
  }
  KVP tidx[KVBLK_IDXNUM];     // Copy of the index sorted by offset
  KVP tidx_tmp[KVBLK_IDXNUM]; // Scratch space of the merge sort
  iwrc rc = 0;
  uint16_t idxsiz = 0;
  IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
  off_t blkend = kb->addr + (1ULL << kb->szpow);
  uint8_t *wp = mm + blkend; // End of the block; pair offsets are counted back from here
  memcpy(tidx, kb->pidx, sizeof(tidx));
  ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp);

  coff = 0;
  // Entries with a zero offset are sorted last, so the loop stops at the first of them
  for (i = 0; i < KVBLK_IDXNUM && tidx[i].off; ++i) {
#ifndef NDEBUG
    if (i > 0) {
      assert(tidx[i - 1].off < tidx[i].off);
    }
#endif
    // `ridx` maps the sorted entry back to its slot in the block index
    KVP *kvp = &kb->pidx[tidx[i].ridx];
    off_t noff = coff + kvp->len; // Offset of the pair once the gap in front of it is closed
    if (kvp->off > noff) {
      assert(noff <= (1ULL << kb->szpow) && kvp->len <= noff);
      if (dlsnr) {
        rc = dlsnr->onwrite(dlsnr, blkend - noff, wp - kvp->off, kvp->len, 0);
      }
      memmove(wp - noff, wp - kvp->off, kvp->len);
      kvp->off = noff;
    }
    coff += kvp->len;
    idxsiz += IW_VNUMSIZE(kvp->off);
    idxsiz += IW_VNUMSIZE32(kvp->len);
  }
  // Every remaining slot is counted as two index bytes
  idxsiz += (KVBLK_IDXNUM - i) * 2;
  // Find the first empty slot
  for (i = 0; i < KVBLK_IDXNUM; ++i) {
    if (!kb->pidx[i].len) {
      kb->zidx = i;
      break;
    }
  }
  assert(idxsiz <= kb->idxsz);
  kb->idxsz = idxsiz;
  kb->maxoff = coff;
  if (i == KVBLK_IDXNUM) {
    // No empty slot left
    kb->zidx = -1;
  }
  kb->flags |= KVBLK_DURTY;
  assert(_kvblk_compacted_offset(kb) == kb->maxoff);
  return rc;
}
1025
_kvblk_maxkvoff(KVBLK * kb)1026 IW_INLINE off_t _kvblk_maxkvoff(KVBLK *kb) {
1027 off_t off = 0;
1028 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1029 if (kb->pidx[i].off > off) {
1030 off = kb->pidx[i].off;
1031 }
1032 }
1033 return off;
1034 }
1035
/**
 * Removes the key/value pair stored in slot `idx` of `kb`.
 *
 * The slot is cleared in the in-memory index and the block is marked dirty.
 * Unless `RMKV_NO_RESIZE` is set, a block larger than the initial size whose
 * compacted content fits into half of it is compacted, its data is moved to
 * the end of the smaller size and the block is reallocated to that size; in
 * that case the index is also synchronised with the file. With `RMKV_SYNC`
 * the index is always synchronised.
 *
 * @return 0 on success or the first error reported by the file layer.
 */
static WUR iwrc _kvblk_rmkv(KVBLK *kb, uint8_t idx, kvblk_rmkv_opts_t opts) {
  iwrc rc = 0;
  uint8_t *mm = 0;
  IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
  IWFS_FSM *fsm = &kb->db->iwkv->fsm;
  if (kb->pidx[idx].off >= kb->maxoff) {
    // The removed pair defined `maxoff`: recompute it from the remaining slots
    kb->maxoff = 0;
    for (int i = 0; i < KVBLK_IDXNUM; ++i) {
      if (i != idx && kb->pidx[i].off > kb->maxoff) {
        kb->maxoff = kb->pidx[i].off;
      }
    }
  }
  kb->pidx[idx].len = 0;
  kb->pidx[idx].off = 0;
  kb->flags |= KVBLK_DURTY;
  // Keep `zidx` pointing to the lowest free slot
  if (kb->zidx < 0 || idx < kb->zidx) {
    kb->zidx = idx;
  }
  if (!(RMKV_NO_RESIZE & opts) && kb->szpow > KVBLK_INISZPOW) {
    off_t nlen = 1ULL << kb->szpow;
    off_t dsz = _kvblk_compacted_dsize(kb);
    if (nlen >= 2 * dsz) {
      // Pick the smallest size power that still holds the compacted content
      uint8_t npow = kb->szpow - 1;
      while (npow > KVBLK_INISZPOW && (1ULL << (npow - 1)) >= dsz) {
        --npow;
      }
      rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
      RCGO(rc, finish);

      rc = _kvblk_compact_mm(kb, mm);
      RCGO(rc, finish);

      // Move the data area so that it ends at the end of the smaller block
      off_t maxoff = _kvblk_maxkvoff(kb);
      if (dlsnr) {
        rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << npow) - maxoff, mm + kb->addr + nlen - maxoff, maxoff, 0);
        RCGO(rc, finish);
      }
      memmove(mm + kb->addr + (1ULL << npow) - maxoff,
              mm + kb->addr + nlen - maxoff,
              (size_t) maxoff);

      // The mapping is released before the block is reallocated
      fsm->release_mmap(fsm);
      mm = 0;
      rc = fsm->reallocate(fsm, (1ULL << npow), &kb->addr, &nlen, IWKV_FSM_ALLOC_FLAGS);
      RCGO(rc, finish);
      kb->szpow = npow;
      assert(nlen == (1ULL << kb->szpow));
      // The block header changed: force the index to be written below
      opts |= RMKV_SYNC;
    }
  }
  if (RMKV_SYNC & opts) {
    rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
    RCGO(rc, finish);
    IWRC(_kvblk_sync_mm(kb, mm), rc);
  }

finish:
  if (mm) {
    fsm->release_mmap(fsm);
  }
  return rc;
}
1099
_kvblk_addkv(KVBLK * kb,const IWKV_val * key,const IWKV_val * val,uint8_t * oidx,bool raw_key)1100 static WUR iwrc _kvblk_addkv(KVBLK *kb,
1101 const IWKV_val *key,
1102 const IWKV_val *val,
1103 uint8_t *oidx,
1104 bool raw_key) {
1105 *oidx = 0;
1106
1107 iwrc rc = 0;
1108 off_t msz; // max available free space
1109 off_t rsz; // required size to add new key/value pair
1110 off_t noff; // offset of new kvpair from end of block
1111 uint8_t *mm, *wp, *sptr;
1112 size_t i, sp;
1113 KVP *kvp;
1114 IWDB db = kb->db;
1115 bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1116 IWFS_FSM *fsm = &db->iwkv->fsm;
1117 bool compacted = false;
1118 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1119 IWKV_val *uval = (IWKV_val *) val;
1120
1121 size_t ksize = key->size;
1122 if (compound) {
1123 ksize += IW_VNUMSIZE(key->compound);
1124 }
1125 off_t psz = IW_VNUMSIZE(ksize) + ksize;
1126
1127 if (kb->zidx < 0) {
1128 return _IWKV_RC_KVBLOCK_FULL;
1129 }
1130 psz += uval->size;
1131 if (psz > IWKV_MAX_KVSZ) {
1132 return IWKV_ERROR_MAXKVSZ;
1133 }
1134
1135 start:
1136 // [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] // KVBLK
1137 msz = (1ULL << kb->szpow) - (KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1138 assert(msz >= 0);
1139 noff = kb->maxoff + psz;
1140 rsz = psz + IW_VNUMSIZE(noff) + IW_VNUMSIZE(psz);
1141
1142 if (msz < rsz) { // not enough space
1143 if (!compacted) {
1144 compacted = true;
1145 if (_kvblk_compacted_offset(kb) != kb->maxoff) {
1146 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1147 RCGO(rc, finish);
1148 rc = _kvblk_compact_mm(kb, mm);
1149 RCGO(rc, finish);
1150 fsm->release_mmap(fsm);
1151 goto start;
1152 }
1153 }
1154 // resize the whole block
1155 off_t nlen = 1ULL << kb->szpow;
1156 off_t nsz = rsz - msz + nlen;
1157 off_t naddr = kb->addr;
1158 off_t olen = nlen;
1159
1160 uint8_t npow = kb->szpow;
1161 while ((1ULL << ++npow) < nsz);
1162
1163 rc = fsm->allocate(fsm, (1ULL << npow), &naddr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1164 RCGO(rc, finish);
1165 assert(nlen == (1ULL << npow));
1166 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1167 RCGO(rc, finish);
1168 if (dlsnr) {
1169 rc = dlsnr->onwrite(dlsnr, naddr, mm + kb->addr, KVBLK_HDRSZ, 0);
1170 RCGO(rc, finish);
1171 memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1172 rc = dlsnr->onwrite(dlsnr, naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, kb->maxoff, 0);
1173 RCGO(rc, finish);
1174 memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1175 } else {
1176 memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1177 memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1178 }
1179 fsm->release_mmap(fsm);
1180 rc = fsm->deallocate(fsm, kb->addr, olen);
1181 RCGO(rc, finish);
1182
1183 kb->addr = naddr;
1184 kb->szpow = npow;
1185 }
1186 *oidx = (uint8_t) kb->zidx;
1187 kvp = &kb->pidx[kb->zidx];
1188 kvp->len = (uint32_t) psz;
1189 kvp->off = noff;
1190 kvp->ridx = (uint8_t) kb->zidx;
1191 kb->maxoff = noff;
1192 kb->flags |= KVBLK_DURTY;
1193 for (i = 0; i < KVBLK_IDXNUM; ++i) {
1194 if (!kb->pidx[i].len && i != kb->zidx) {
1195 kb->zidx = i;
1196 break;
1197 }
1198 }
1199 if (i >= KVBLK_IDXNUM) {
1200 kb->zidx = -1;
1201 }
1202 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1203 RCGO(rc, finish);
1204 assert((1ULL << kb->szpow) >= KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1205 assert(kvp->off < (1ULL << kb->szpow) && kvp->len <= kvp->off);
1206 wp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
1207 sptr = wp;
1208 // [klen:vn,key,value]
1209 IW_SETVNUMBUF(sp, wp, ksize);
1210 wp += sp;
1211 if (compound) {
1212 IW_SETVNUMBUF64(sp, wp, key->compound);
1213 wp += sp;
1214 }
1215 memcpy(wp, key->data, key->size);
1216 wp += key->size;
1217 memcpy(wp, uval->data, uval->size);
1218 wp += uval->size;
1219 #ifndef NDEBUG
1220 assert(wp - sptr == kvp->len);
1221 #endif
1222 if (dlsnr) {
1223 rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << kb->szpow) - kvp->off, sptr, wp - sptr, 0);
1224 }
1225 fsm->release_mmap(fsm);
1226
1227 finish:
1228 return rc;
1229 }
1230
/**
 * Replaces the value of the pair stored in slot `*idxp` of `kb`.
 *
 * If the new record is not longer than the current one it is overwritten in place.
 * Otherwise a sorted copy of the index is consulted: when the gap between this pair
 * and its predecessor in the sorted index is large enough (and the index can hold
 * the longer length) the value is still written in place; if not, the pair is
 * removed and re-added, which may change `*idxp`.
 *
 * @param kb   Key/value block to modify.
 * @param idxp In/out: slot index of the pair; updated when the pair is re-added.
 * @param key  Optional key. When given, its size must match the stored key length,
 *             otherwise `IWKV_ERROR_CORRUPTED` is returned. When null and a re-add
 *             is required the key is read from the block.
 * @param val  New value.
 * @return `0` on success or an error code from a callee.
 */
static WUR iwrc _kvblk_updatev(KVBLK *kb,
                               uint8_t *idxp,
                               const IWKV_val *key,                      /* Nullable */
                               const IWKV_val *val) {
  assert(*idxp < KVBLK_IDXNUM);
  int32_t i;
  uint32_t len, nlen, sz;
  uint8_t pidx = *idxp, *mm = 0, *wp, *sp;
  IWDB db = kb->db;
  IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
  IWKV_val *uval = (IWKV_val *) val;
  IWKV_val *ukey = (IWKV_val *) key;
  IWKV_val skey; // stack allocated key/val
  KVP *kvp = &kb->pidx[pidx];
  size_t kbsz = 1ULL << kb->szpow;                            // kvblk size
  off_t freesz = kbsz - KVBLK_HDRSZ - kb->idxsz - kb->maxoff; // free space available
  IWFS_FSM *fsm = &db->iwkv->fsm;

  iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
  RCRET(rc);
  assert(freesz >= 0);

  // Pair data is addressed from the end of the block
  wp = mm + kb->addr + kbsz - kvp->off;
  sp = wp;
  // Read the stored key length (varint of `sz` bytes) and skip over the key
  IW_READVNUMBUF(wp, len, sz);
  wp += sz;
  if (ukey && len != ukey->size) {
    rc = IWKV_ERROR_CORRUPTED;
    iwlog_ecode_error3(rc);
    goto finish;
  }
  wp += len;
  off_t rsize = sz + len + uval->size; // required size
  if (rsize <= kvp->len) {
    // New record fits into the existing one: overwrite the value in place
    memcpy(wp, uval->data, uval->size);
    if (dlsnr) {
      rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
      RCGO(rc, finish);
    }
    wp += uval->size;
    if ((wp - sp) != kvp->len) {
      kvp->len = wp - sp;
      kb->flags |= KVBLK_DURTY;
    }
  } else {
    KVP tidx[KVBLK_IDXNUM];
    KVP tidx_tmp[KVBLK_IDXNUM];
    off_t koff = kb->pidx[pidx].off;
    // Work on a sorted copy of the index (presumably ordered by offset - confirm in ks_mergesort_kvblk)
    memcpy(tidx, kb->pidx, KVBLK_IDXNUM * sizeof(kb->pidx[0]));
    ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp);
    kb->flags |= KVBLK_DURTY;
    if (!ukey) { // we need a key
      ukey = &skey;
      // NOTE(review): if _kvblk_key_get fails before filling `skey`, `finish` disposes it anyway - confirm this is safe
      rc = _kvblk_key_get(kb, mm, pidx, ukey);
      RCGO(rc, finish);
    }
    for (i = 0; i < KVBLK_IDXNUM; ++i) {
      if (tidx[i].off == koff) {
        // Gap between this pair and the preceding entry of the sorted index
        if (koff - (i > 0 ? tidx[i - 1].off : 0) >= rsize) {
          nlen = wp + uval->size - sp;
          if (!(nlen > kvp->len && freesz - IW_VNUMSIZE32(nlen) + IW_VNUMSIZE32(kvp->len) < 0)) { // enough space?
            memcpy(wp, uval->data, uval->size);
            if (dlsnr) {
              rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
              RCGO(rc, finish);
            }
            wp += uval->size;
            kvp->len = nlen;
            break;;
          }
        }
        // No room in place: drop the mmap (callees acquire their own) and re-insert the pair
        mm = 0;
        fsm->release_mmap(fsm);
        rc = _kvblk_rmkv(kb, pidx, RMKV_NO_RESIZE);
        RCGO(rc, finish);
        rc = _kvblk_addkv(kb, ukey, uval, idxp, false);
        break;
      }
    }
  }

finish:
  // Dispose the key only when it was loaded locally into `skey`
  if (ukey != key) {
    _kv_val_dispose(ukey);
  }
  if (mm) {
    IWRC(fsm->release_mmap(fsm), rc);
  }
  return rc;
}
1321
1322 //-------------------------- SBLK
1323
_sblk_release(IWLCTX * lx,SBLK ** sblkp)1324 IW_INLINE void _sblk_release(IWLCTX *lx, SBLK **sblkp) {
1325 assert(sblkp && *sblkp);
1326 SBLK *sblk = *sblkp;
1327 sblk->flags &= ~SBLK_CACHE_FLAGS; // clear cache flags
1328 sblk->flags &= ~SBLK_DURTY; // clear dirty flag
1329 sblk->kvblk = 0;
1330 *sblkp = 0;
1331 }
1332
_sblk_loadkvblk_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1333 IW_INLINE WUR iwrc _sblk_loadkvblk_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1334 if (!sblk->kvblk && sblk->kvblkn) {
1335 return _kvblk_at_mm(lx, BLK2ADDR(sblk->kvblkn), mm, 0, &sblk->kvblk);
1336 } else {
1337 return 0;
1338 }
1339 }
1340
/**
 * Tells whether `sblk` is the only occupied slot of its SBLK page (v2 format).
 *
 * @param mm        Memory-mapped file base.
 * @param sblk      Block whose page is inspected; `bpos` is its 1-based slot.
 * @param page_addr Receives the page start address, or 0 when `bpos` is out of range.
 * @return true when every other slot of the page has a zero `bpos` byte.
 */
static bool _sblk_is_only_one_on_page_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *page_addr) {
  *page_addr = 0;
  if (!(sblk->bpos > 0 && sblk->bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
    return false; // be safe
  }
  off_t page = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
  *page_addr = page;
  for (int slot = 0; slot < SBLK_PAGE_SBLK_NUM_V2; ++slot) {
    if (slot == sblk->bpos - 1) {
      continue; // own slot
    }
    uint8_t occupied;
    memcpy(&occupied, mm + page + slot * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
    if (occupied) {
      return false;
    }
  }
  return true;
}
1360
_sblk_destroy(IWLCTX * lx,SBLK ** sblkp)1361 IW_INLINE WUR iwrc _sblk_destroy(IWLCTX *lx, SBLK **sblkp) {
1362 assert(sblkp && *sblkp && (*sblkp)->addr);
1363 iwrc rc = 0;
1364 SBLK *sblk = *sblkp;
1365 lx->destroy_addr = sblk->addr;
1366
1367 if (!(sblk->flags & SBLK_DB)) {
1368 uint8_t kvb_szpow, *mm;
1369 IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1370 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1371 off_t kvb_addr = BLK2ADDR(sblk->kvblkn);
1372 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1373 RCRET(rc);
1374
1375 if (!sblk->kvblk) {
1376 // Read KVBLK size as power of two
1377 memcpy(&kvb_szpow, mm + kvb_addr + KBLK_SZPOW_OFF, 1);
1378 } else {
1379 kvb_szpow = sblk->kvblk->szpow;
1380 }
1381 if (lx->db->lcnt[sblk->lvl]) {
1382 lx->db->lcnt[sblk->lvl]--;
1383 lx->db->flags |= SBLK_DURTY;
1384 }
1385 _dbcache_remove_lw(lx, sblk);
1386 if (lx->db->iwkv->fmt_version > 1) {
1387 off_t paddr;
1388 if (_sblk_is_only_one_on_page_v2(lx, mm, sblk, &paddr)) {
1389 fsm->release_mmap(fsm);
1390 // Deallocate whole page
1391 rc = fsm->deallocate(fsm, paddr, SBLK_PAGE_SZ_V2);
1392 } else {
1393 memset(mm + sblk->addr + SOFF_BPOS_U1_V2, 0, 1);
1394 fsm->release_mmap(fsm);
1395 if (dlsnr) {
1396 dlsnr->onset(dlsnr, sblk->addr + SOFF_BPOS_U1_V2, 0, 1, 0);
1397 }
1398 }
1399 } else {
1400 fsm->release_mmap(fsm);
1401 rc = fsm->deallocate(fsm, sblk->addr, SBLK_SZ);
1402 }
1403 IWRC(fsm->deallocate(fsm, kvb_addr, 1ULL << kvb_szpow), rc);
1404 }
1405 _sblk_release(lx, sblkp);
1406 return rc;
1407 }
1408
_sblk_genlevel(IWDB db)1409 IW_INLINE uint8_t _sblk_genlevel(IWDB db) {
1410 uint8_t lvl;
1411 #ifdef IW_TESTS
1412 if (iwkv_next_level >= 0) {
1413 lvl = (uint8_t) iwkv_next_level;
1414 iwkv_next_level = -1;
1415 assert(lvl < SLEVELS);
1416 return lvl;
1417 }
1418 #endif
1419 uint32_t r = iwu_rand_u32();
1420 for (lvl = 0; lvl < SLEVELS && !(r & 1); ++lvl) r >>= 1;
1421 uint8_t ret = IW_UNLIKELY(lvl >= SLEVELS) ? SLEVELS - 1 : lvl;
1422 while (ret > 0 && db->lcnt[ret - 1] == 0) {
1423 --ret;
1424 }
1425 return ret;
1426 }
1427
_sblk_create_v1(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,off_t baddr,uint8_t bpos,SBLK ** oblk)1428 static WUR iwrc _sblk_create_v1(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, off_t baddr, uint8_t bpos, SBLK **oblk) {
1429 iwrc rc;
1430 SBLK *sblk;
1431 KVBLK *kvblk;
1432 off_t blen;
1433 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1434 if (kvbpow < KVBLK_INISZPOW) {
1435 kvbpow = KVBLK_INISZPOW;
1436 }
1437 *oblk = 0;
1438 if (!bpos) {
1439 rc = fsm->allocate(fsm, SBLK_SZ + (1ULL << kvbpow), &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1440 RCRET(rc);
1441 assert(blen - SBLK_SZ == (1ULL << kvbpow));
1442 _kvblk_create(lx, baddr + SBLK_SZ, kvbpow, &kvblk);
1443 } else {
1444 // Allocate kvblk as separate chunk
1445 off_t kblkaddr = 0;
1446 rc = fsm->allocate(fsm, (1ULL << kvbpow), &kblkaddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1447 assert(blen == (1ULL << kvbpow));
1448 _kvblk_create(lx, kblkaddr, kvbpow, &kvblk);
1449 }
1450 sblk = &lx->saa[lx->saan];
1451 sblk->db = lx->db;
1452 sblk->db->lcnt[nlevel]++;
1453 sblk->db->flags |= SBLK_DURTY;
1454 sblk->addr = baddr;
1455 sblk->flags = (SBLK_DURTY | SBLK_CACHE_PUT);
1456 sblk->lvl = nlevel;
1457 sblk->p0 = 0;
1458 memset(sblk->n, 0, sizeof(sblk->n));
1459 sblk->kvblk = kvblk;
1460 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1461 sblk->lkl = 0;
1462 sblk->pnum = 0;
1463 sblk->bpos = bpos;
1464 memset(sblk->pi, 0, sizeof(sblk->pi));
1465 *oblk = sblk;
1466 AAPOS_INC(lx->saan);
1467 return 0;
1468 }
1469
/**
 * Searches the SBLK page that holds `sblk` for an unoccupied slot (v2 format).
 *
 * Slots after `sblk` are examined first, in ascending order, then the slots before
 * it, in descending order. A slot is free when its `bpos` byte is zero.
 *
 * @param mm     Memory-mapped file base.
 * @param sblk   Reference block; its `bpos` must be within `1..SBLK_PAGE_SBLK_NUM_V2`.
 * @param obaddr Receives the address of the free slot, or 0 when none was found.
 * @param oslot  Receives the 1-based slot number, or 0 when none was found.
 */
static void _sblk_find_free_page_slot_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *obaddr, uint8_t *oslot) {
  off_t free_addr = 0;
  uint8_t free_slot = 0;

  if (sblk->bpos >= 1 && sblk->bpos <= SBLK_PAGE_SBLK_NUM_V2) {
    const off_t page = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;
    uint8_t used;
    int pos = sblk->bpos + 1;

    // Forward scan: slots following the reference block
    while (pos <= SBLK_PAGE_SBLK_NUM_V2) {
      memcpy(&used, mm + page + (pos - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
      if (!used) {
        break;
      }
      ++pos;
    }
    if (pos > SBLK_PAGE_SBLK_NUM_V2) {
      // Backward scan: slots preceding the reference block; ends with pos == 0 if all are taken
      for (pos = sblk->bpos - 1; pos > 0; --pos) {
        memcpy(&used, mm + page + (pos - 1) * SBLK_SZ + SOFF_BPOS_U1_V2, 1);
        if (!used) {
          break;
        }
      }
    }
    if (pos > 0) {
      free_addr = page + (pos - 1) * SBLK_SZ;
      free_slot = pos;
    }
  }
  *obaddr = free_addr;
  *oslot = free_slot;
}
1498
1499 /// Create
_sblk_create_v2(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1500 static WUR iwrc _sblk_create_v2(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1501 off_t baddr = 0;
1502 uint8_t bpos = 0, *mm;
1503 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1504 SBLK *_lower = lower;
1505 SBLK *_upper = upper;
1506
1507 for (int i = SLEVELS - 1; i >= 0; --i) {
1508 if (lx->pupper[i] && lx->pupper[i]->lvl >= nlevel) {
1509 _upper = lx->pupper[i];
1510 }
1511 if (lx->plower[i] && lx->plower[i]->lvl >= nlevel) {
1512 _lower = lx->plower[i];
1513 }
1514 }
1515
1516 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1517 RCRET(rc);
1518 _sblk_find_free_page_slot_v2(lx, mm, _lower, &baddr, &bpos);
1519 if (!baddr && _upper && _upper->addr != _lower->addr) {
1520 _sblk_find_free_page_slot_v2(lx, mm, _upper, &baddr, &bpos);
1521 }
1522 if (!baddr) {
1523 if (_lower->addr != lower->addr) {
1524 _sblk_find_free_page_slot_v2(lx, mm, lower, &baddr, &bpos);
1525 }
1526 if (!baddr && upper && _upper && _upper->addr != upper->addr) {
1527 _sblk_find_free_page_slot_v2(lx, mm, upper, &baddr, &bpos);
1528 }
1529 }
1530 fsm->release_mmap(fsm);
1531
1532 if (!baddr) {
1533 // No free slots - allocate new SBLK page
1534 off_t blen;
1535 bpos = 1;
1536 IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1537 rc = fsm->allocate(fsm, SBLK_PAGE_SZ_V2, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1538 RCRET(rc);
1539 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1540 RCRET(rc);
1541 // Fill page to zero
1542 memset(mm + baddr, 0, blen);
1543 if (dlsnr) {
1544 rc = dlsnr->onset(dlsnr, baddr, 0, blen, 0);
1545 }
1546 fsm->release_mmap(fsm);
1547 RCRET(rc);
1548 }
1549 return _sblk_create_v1(lx, nlevel, kvbpow, baddr, bpos, oblk);
1550 }
1551
_sblk_create(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1552 IW_INLINE WUR iwrc _sblk_create(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1553 if (lx->db->iwkv->fmt_version > 1) {
1554 return _sblk_create_v2(lx, nlevel, kvbpow, lower, upper, oblk);
1555 } else {
1556 return _sblk_create_v1(lx, nlevel, kvbpow, lower->addr, 0, oblk);
1557 }
1558 }
1559
_sblk_at2(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK * sblk)1560 static WUR iwrc _sblk_at2(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK *sblk) {
1561 iwrc rc;
1562 uint8_t *mm;
1563 uint32_t lv;
1564 sblk_flags_t flags = lx->sbflags | flgs;
1565 IWDB db = lx->db;
1566 IWFS_FSM *fsm = &db->iwkv->fsm;
1567 sblk->kvblk = 0;
1568 sblk->bpos = 0;
1569 sblk->db = db;
1570
1571 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1572 RCRET(rc);
1573
1574 if (IW_UNLIKELY(addr == db->addr)) {
1575 uint8_t *rp = mm + addr + DOFF_N0_U4;
1576 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
1577 sblk->addr = addr;
1578 sblk->flags = SBLK_DB | flags;
1579 sblk->lvl = 0;
1580 sblk->p0 = 0;
1581 sblk->kvblkn = 0;
1582 sblk->lkl = 0;
1583 sblk->pnum = KVBLK_IDXNUM;
1584 memset(sblk->pi, 0, sizeof(sblk->pi));
1585 for (int i = 0; i < SLEVELS; ++i) {
1586 IW_READLV(rp, lv, sblk->n[i]);
1587 if (sblk->n[i]) {
1588 ++sblk->lvl;
1589 } else {
1590 break;
1591 }
1592 }
1593 if (sblk->lvl) --sblk->lvl;
1594 } else if (addr) {
1595 uint8_t uflags;
1596 uint8_t *rp = mm + addr;
1597 sblk->addr = addr;
1598 // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
1599 memcpy(&uflags, rp++, 1);
1600 sblk->flags = uflags;
1601 if (sblk->flags & ~SBLK_PERSISTENT_FLAGS) {
1602 rc = IWKV_ERROR_CORRUPTED;
1603 iwlog_ecode_error3(rc);
1604 goto finish;
1605 }
1606 sblk->flags |= flags;
1607 memcpy(&sblk->lvl, rp++, 1);
1608 if (sblk->lvl >= SLEVELS) {
1609 rc = IWKV_ERROR_CORRUPTED;
1610 iwlog_ecode_error3(rc);
1611 goto finish;
1612 }
1613 memcpy(&sblk->lkl, rp++, 1);
1614 if (sblk->lkl > db->iwkv->pklen) {
1615 rc = IWKV_ERROR_CORRUPTED;
1616 iwlog_ecode_error3(rc);
1617 goto finish;
1618 }
1619 memcpy(&sblk->pnum, rp++, 1);
1620 if (sblk->pnum < 0) {
1621 rc = IWKV_ERROR_CORRUPTED;
1622 iwlog_ecode_error3(rc);
1623 goto finish;
1624 }
1625 memcpy(&sblk->p0, rp, 4);
1626 sblk->p0 = IW_ITOHL(sblk->p0);
1627 rp += 4;
1628 memcpy(&sblk->kvblkn, rp, 4);
1629 sblk->kvblkn = IW_ITOHL(sblk->kvblkn);
1630 rp += 4;
1631 memcpy(sblk->pi, rp, KVBLK_IDXNUM);
1632 rp += KVBLK_IDXNUM;
1633 for (int i = 0; i <= sblk->lvl; ++i) {
1634 memcpy(&sblk->n[i], rp, 4);
1635 sblk->n[i] = IW_ITOHL(sblk->n[i]);
1636 rp += 4;
1637 }
1638 if (db->iwkv->fmt_version > 1) {
1639 rp = mm + addr + SOFF_BPOS_U1_V2;
1640 memcpy(&sblk->bpos, rp++, 1);
1641 } else {
1642 rp = mm + addr + SOFF_LK_V1;
1643 }
1644 // Lower key
1645 memcpy(sblk->lk, rp, (size_t) sblk->lkl);
1646
1647 } else { // Database tail
1648 uint8_t *rp = mm + db->addr + DOFF_P0_U4;
1649 sblk->addr = 0;
1650 sblk->flags = SBLK_DB | flags;
1651 sblk->lvl = 0;
1652 sblk->kvblkn = 0;
1653 sblk->lkl = 0;
1654 sblk->pnum = KVBLK_IDXNUM;
1655 memset(sblk->pi, 0, sizeof(sblk->pi));
1656 IW_READLV(rp, lv, sblk->p0);
1657 if (!sblk->p0) {
1658 sblk->p0 = ADDR2BLK(db->addr);
1659 }
1660 }
1661
1662 finish:
1663 fsm->release_mmap(fsm);
1664 return rc;
1665 }
1666
_sblk_at(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK ** sblkp)1667 IW_INLINE WUR iwrc _sblk_at(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK **sblkp) {
1668 *sblkp = 0;
1669 SBLK *sblk = &lx->saa[lx->saan];
1670 iwrc rc = _sblk_at2(lx, addr, flgs, sblk);
1671 AAPOS_INC(lx->saan);
1672 *sblkp = sblk;
1673 return rc;
1674 }
1675
/**
 * Writes a dirty SBLK (and its dirty KVBLK) into the memory-mapped file.
 *
 * Database pseudo blocks (`SBLK_DB`) write either the `n[]` pointers followed by the
 * per-level counters (list head) or only `p0` (list tail) into the database header
 * and return immediately. Regular blocks are serialized in full. Afterwards a dirty
 * attached KVBLK is synced and the db cache is updated when `SBLK_CACHE_UPDATE` is set.
 *
 * @param lx   Lookup context.
 * @param sblk Block to persist; its `SBLK_DURTY` flag is cleared.
 * @param mm   Memory-mapped file base.
 * @return `0` on success or the first error from the data listener / KVBLK sync.
 */
static WUR iwrc _sblk_sync_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
  iwrc rc = 0;
  if (sblk->flags & SBLK_DURTY) {
    uint32_t lv;
    IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
    sblk->flags &= ~SBLK_DURTY;
    if (IW_UNLIKELY(sblk->flags & SBLK_DB)) {
      uint8_t *sp;
      uint8_t *wp = mm + sblk->db->addr;
      if (sblk->addr) {
        assert(sblk->addr == sblk->db->addr);
        wp += DOFF_N0_U4;
        sp = wp;
        // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
        for (int i = 0; i < SLEVELS; ++i) {
          IW_WRITELV(wp, lv, sblk->n[i]);
        }
        assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
        // Per-level block counters follow the next pointers
        for (int i = 0; i < SLEVELS; ++i) {
          IW_WRITELV(wp, lv, lx->db->lcnt[i]);
        }
      } else { // Database tail
        wp += DOFF_P0_U4;
        sp = wp;
        IW_WRITELV(wp, lv, sblk->p0);
        assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
      }
      // Report exactly the byte range written above
      if (dlsnr) {
        rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
      }
      return rc;
    } else {
      uint8_t *wp = mm + sblk->addr;
      // Only persistent flags are stored on disk
      sblk_flags_t flags = (sblk->flags & SBLK_PERSISTENT_FLAGS);
      uint8_t uflags = flags;
      assert(sblk->lkl <= lx->db->iwkv->pklen);
      // [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256
      wp += SOFF_FLAGS_U1;
      memcpy(wp++, &uflags, 1);
      memcpy(wp++, &sblk->lvl, 1);
      memcpy(wp++, &sblk->lkl, 1);
      memcpy(wp++, &sblk->pnum, 1);
      IW_WRITELV(wp, lv, sblk->p0);
      IW_WRITELV(wp, lv, sblk->kvblkn);
      memcpy(wp, sblk->pi, KVBLK_IDXNUM);
      wp = mm + sblk->addr + SOFF_N0_U4;
      // Only the pointers up to the block level are written
      for (int i = 0; i <= sblk->lvl; ++i) {
        IW_WRITELV(wp, lv, sblk->n[i]);
      }
      if (lx->db->iwkv->fmt_version > 1) {
        wp = mm + sblk->addr + SOFF_BPOS_U1_V2;
        memcpy(wp++, &sblk->bpos, 1);
      } else {
        wp = mm + sblk->addr + SOFF_LK_V1;
      }
      memcpy(wp, sblk->lk, (size_t) sblk->lkl);
      if (dlsnr) {
        rc = dlsnr->onwrite(dlsnr, sblk->addr, mm + sblk->addr, SOFF_END, 0);
        RCRET(rc);
      }
    }
  }
  if (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY)) {
    IWRC(_kvblk_sync_mm(sblk->kvblk, mm), rc);
  }
  if (sblk->flags & SBLK_CACHE_UPDATE) {
    _dbcache_update_lw(lx, sblk);
  }
  return rc;
}
1746
_sblk_sync(IWLCTX * lx,SBLK * sblk)1747 IW_INLINE WUR iwrc _sblk_sync(IWLCTX *lx, SBLK *sblk) {
1748 if ((sblk->flags & SBLK_DURTY) || (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY))) {
1749 uint8_t *mm;
1750 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1751 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1752 RCRET(rc);
1753 rc = _sblk_sync_mm(lx, sblk, mm);
1754 fsm->release_mmap(fsm);
1755 return rc;
1756 }
1757 return 0;
1758 }
1759
_sblk_sync_and_release_mm(IWLCTX * lx,SBLK ** sblkp,uint8_t * mm)1760 IW_INLINE WUR iwrc _sblk_sync_and_release_mm(IWLCTX *lx, SBLK **sblkp, uint8_t *mm) {
1761 SBLK *sblk = *sblkp;
1762 if (lx->destroy_addr && lx->destroy_addr == sblk->addr) {
1763 return 0;
1764 }
1765 iwrc rc = 0;
1766 if (mm) {
1767 rc = _sblk_sync_mm(lx, *sblkp, mm);
1768 }
1769 _sblk_release(lx, sblkp);
1770 return rc;
1771 }
1772
_sblk_find_pi_mm(SBLK * sblk,IWLCTX * lx,const uint8_t * mm,bool * found,uint8_t * idxp)1773 static WUR iwrc _sblk_find_pi_mm(SBLK *sblk, IWLCTX *lx, const uint8_t *mm, bool *found, uint8_t *idxp) {
1774 *found = false;
1775 if (sblk->flags & SBLK_DB) {
1776 *idxp = KVBLK_IDXNUM;
1777 return 0;
1778 }
1779 uint8_t *k;
1780 uint32_t kl;
1781 int idx = 0, lb = 0, ub = sblk->pnum - 1;
1782 iwdb_flags_t dbflg = lx->db->dbflg;
1783
1784 if (sblk->pnum < 1) {
1785 *idxp = 0;
1786 return 0;
1787 }
1788 while (1) {
1789 idx = (ub + lb) / 2;
1790 iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1791 RCRET(rc);
1792 int cr = _cmp_keys(dbflg, k, kl, lx->key);
1793 if (!cr) {
1794 *found = true;
1795 break;
1796 } else if (cr < 0) {
1797 lb = idx + 1;
1798 if (lb > ub) {
1799 idx = lb;
1800 break;
1801 }
1802 } else {
1803 ub = idx - 1;
1804 if (lb > ub) {
1805 break;
1806 }
1807 }
1808 }
1809 *idxp = idx;
1810 return 0;
1811 }
1812
/**
 * Inserts KVBLK slot number `nidx` into the ordered `sblk->pi` index at the position
 * determined by a binary search for `lx->key`.
 *
 * @param sblk Block whose index is updated; its KVBLK must be attached.
 * @param nidx KVBLK slot number of the pair being inserted.
 * @param lx   Lookup context providing the key.
 * @param mm   Memory-mapped file base used to peek stored keys.
 * @param idxp Receives the position in `sblk->pi` where `nidx` was stored.
 * @return `0` on success or the error returned by `_kvblk_key_peek`.
 */
static WUR iwrc _sblk_insert_pi_mm(SBLK *sblk, uint8_t nidx, IWLCTX *lx,
                                   const uint8_t *mm, uint8_t *idxp) {
  assert(sblk->kvblk);

  uint8_t *k;
  uint32_t kl;
  int idx = 0, lb = 0, ub = sblk->pnum - 1, nels = sblk->pnum; // NOLINT

  // Empty index: the new pair becomes the first entry
  if (nels < 1) {
    sblk->pi[0] = nidx;
    ++sblk->pnum;
    *idxp = 0;
    return 0;
  }
  iwdb_flags_t dbflg = sblk->db->dbflg;
  while (1) {
    idx = (ub + lb) / 2;
    iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
    RCRET(rc);
    int cr = _cmp_keys(dbflg, k, kl, lx->key);
    if (!cr) {
      // NOTE(review): an equal key leaves pnum unchanged while the entries are still
      // shifted below - presumably callers never insert an existing key; confirm.
      break;
    } else if (cr < 0) {
      lb = idx + 1;
      if (lb > ub) {
        idx = lb;
        ++sblk->pnum;
        break;
      }
    } else {
      ub = idx - 1;
      if (lb > ub) {
        ++sblk->pnum;
        break;
      }
    }
  }
  // Make room at idx by shifting the tail (counted with the pre-insert size) one position up
  if (nels - idx > 0) {
    memmove(sblk->pi + idx + 1, sblk->pi + idx, nels - idx);
  }
  sblk->pi[idx] = nidx;
  *idxp = idx;
  return 0;
}
1857
_sblk_addkv2(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val,bool raw_key)1858 static WUR iwrc _sblk_addkv2(SBLK *sblk,
1859 int8_t idx,
1860 const IWKV_val *key,
1861 const IWKV_val *val,
1862 bool raw_key) {
1863 assert(sblk && key && key->size && key->data && val && idx >= 0 && sblk->kvblk);
1864
1865 uint8_t kvidx;
1866 IWDB db = sblk->db;
1867 KVBLK *kvblk = sblk->kvblk;
1868 if (sblk->pnum >= KVBLK_IDXNUM) {
1869 return _IWKV_RC_KVBLOCK_FULL;
1870 }
1871
1872 iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, raw_key);
1873 RCRET(rc);
1874 if (sblk->pnum - idx > 0) {
1875 memmove(sblk->pi + idx + 1, sblk->pi + idx, sblk->pnum - idx);
1876 }
1877 sblk->pi[idx] = kvidx;
1878 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1879 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1880 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1881 sblk->flags |= SBLK_CACHE_UPDATE;
1882 }
1883 }
1884 ++sblk->pnum;
1885 sblk->flags |= SBLK_DURTY;
1886 if (idx == 0) { // the lowest key inserted
1887 size_t ksize = key->size;
1888 bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1889 if (compound) {
1890 ksize += IW_VNUMSIZE(key->compound);
1891 }
1892 sblk->lkl = MIN(db->iwkv->pklen, ksize);
1893 uint8_t *wp = sblk->lk;
1894 if (compound) {
1895 int len;
1896 IW_SETVNUMBUF64(len, wp, key->compound);
1897 wp += len;
1898 }
1899 memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1900 if (ksize <= db->iwkv->pklen) {
1901 sblk->flags |= SBLK_FULL_LKEY;
1902 } else {
1903 sblk->flags &= ~SBLK_FULL_LKEY;
1904 }
1905 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1906 sblk->flags |= SBLK_CACHE_UPDATE;
1907 }
1908 }
1909 if (!raw_key) {
1910 // Update active cursors inside this block
1911 pthread_spin_lock(&db->cursors_slk);
1912 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1913 if (cur->cn && cur->cn->addr == sblk->addr) {
1914 if (cur->cn != sblk) {
1915 memcpy(cur->cn, sblk, sizeof(*cur->cn));
1916 cur->cn->kvblk = 0;
1917 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1918 }
1919 if (cur->cnpos >= idx) {
1920 cur->cnpos++;
1921 }
1922 }
1923 }
1924 pthread_spin_unlock(&db->cursors_slk);
1925 }
1926 return 0;
1927 }
1928
_sblk_addkv(SBLK * sblk,IWLCTX * lx)1929 static WUR iwrc _sblk_addkv(SBLK *sblk, IWLCTX *lx) {
1930 const IWKV_val *key = lx->key;
1931 const IWKV_val *val = lx->val;
1932 assert(key && key->size && key->data && val && sblk->kvblk);
1933 if (!sblk) {
1934 iwlog_error2("sblk != 0");
1935 return IW_ERROR_ASSERTION;
1936 }
1937 uint8_t *mm, idx, kvidx;
1938 IWDB db = sblk->db;
1939 KVBLK *kvblk = sblk->kvblk;
1940 IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
1941 if (sblk->pnum >= KVBLK_IDXNUM) {
1942 return _IWKV_RC_KVBLOCK_FULL;
1943 }
1944 iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, false);
1945 RCRET(rc);
1946 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1947 RCRET(rc);
1948 rc = _sblk_insert_pi_mm(sblk, kvidx, lx, mm, &idx);
1949 RCRET(rc);
1950 fsm->release_mmap(fsm);
1951 if (idx == 0) { // the lowest key inserted
1952 size_t ksize = key->size;
1953 bool compound = (db->dbflg & IWDB_COMPOUND_KEYS);
1954 if (compound) {
1955 ksize += IW_VNUMSIZE(key->compound);
1956 }
1957 sblk->lkl = MIN(db->iwkv->pklen, ksize);
1958 uint8_t *wp = sblk->lk;
1959 if (compound) {
1960 int len;
1961 IW_SETVNUMBUF64(len, wp, key->compound);
1962 wp += len;
1963 }
1964 memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1965 if (ksize <= db->iwkv->pklen) {
1966 sblk->flags |= SBLK_FULL_LKEY;
1967 } else {
1968 sblk->flags &= ~SBLK_FULL_LKEY;
1969 }
1970 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1971 sblk->flags |= SBLK_CACHE_UPDATE;
1972 }
1973 }
1974 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1975 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1976 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
1977 sblk->flags |= SBLK_CACHE_UPDATE;
1978 }
1979 }
1980 sblk->flags |= SBLK_DURTY;
1981
1982 // Update active cursors inside this block
1983 pthread_spin_lock(&db->cursors_slk);
1984 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1985 if (cur->cn && cur->cn->addr == sblk->addr) {
1986 if (cur->cn != sblk) {
1987 memcpy(cur->cn, sblk, sizeof(*cur->cn));
1988 cur->cn->kvblk = 0;
1989 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1990 }
1991 if (cur->cnpos >= idx) {
1992 cur->cnpos++;
1993 }
1994 }
1995 }
1996 pthread_spin_unlock(&db->cursors_slk);
1997
1998 return 0;
1999 }
2000
_sblk_updatekv(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val)2001 static WUR iwrc _sblk_updatekv(SBLK *sblk, int8_t idx,
2002 const IWKV_val *key, const IWKV_val *val) {
2003 assert(sblk && sblk->kvblk && idx >= 0 && idx < sblk->pnum);
2004 IWDB db = sblk->db;
2005 KVBLK *kvblk = sblk->kvblk;
2006 uint8_t kvidx = sblk->pi[idx];
2007 iwrc intrc = 0;
2008 iwrc rc = _kvblk_updatev(kvblk, &kvidx, key, val);
2009 if (IWKV_IS_INTERNAL_RC(rc)) {
2010 intrc = rc;
2011 rc = 0;
2012 }
2013 RCRET(rc);
2014 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2015 sblk->kvblkn = ADDR2BLK(kvblk->addr);
2016 if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
2017 sblk->flags |= SBLK_CACHE_UPDATE;
2018 }
2019 }
2020 sblk->pi[idx] = kvidx;
2021 sblk->flags |= SBLK_DURTY;
2022 // Update active cursors inside this block
2023 pthread_spin_lock(&db->cursors_slk);
2024 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2025 if (cur->cn && cur->cn != sblk && cur->cn->addr == sblk->addr) {
2026 memcpy(cur->cn, sblk, sizeof(*cur->cn));
2027 cur->cn->kvblk = 0;
2028 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2029 }
2030 }
2031 pthread_spin_unlock(&db->cursors_slk);
2032 return intrc;
2033 }
2034
/**
 * Removes the pair at position `idx` of `sblk`.
 *
 * The pair is removed from the attached KVBLK and from the ordered `sblk->pi` index.
 * When the lowest key was removed the lower-key prefix is rebuilt from the new first
 * pair, or reset (and `SBLK_CACHE_REMOVE` set) when the block became empty. Cursors
 * positioned on this block are refreshed and their positions adjusted.
 *
 * @param sblk Block with attached KVBLK.
 * @param idx  Position within `sblk->pi` of the pair to remove.
 * @return `0` on success or the first error from `_kvblk_rmkv`, the mmap or `_kvblk_key_peek`.
 */
static WUR iwrc _sblk_rmkv(SBLK *sblk, uint8_t idx) {
  assert(sblk && sblk->kvblk);
  IWDB db = sblk->db;
  KVBLK *kvblk = sblk->kvblk;
  IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
  assert(kvblk && idx < sblk->pnum && sblk->pi[idx] < KVBLK_IDXNUM);

  iwrc rc = _kvblk_rmkv(kvblk, sblk->pi[idx], 0);
  RCRET(rc);

  // The KVBLK may have been moved by a shrink
  if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
    sblk->kvblkn = ADDR2BLK(kvblk->addr);
    if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
      sblk->flags |= SBLK_CACHE_UPDATE;
    }
  }
  --sblk->pnum;
  sblk->flags |= SBLK_DURTY;

  // Close the gap left in the index (pnum is already decremented)
  if (idx < sblk->pnum && sblk->pnum > 0) {
    memmove(sblk->pi + idx, sblk->pi + idx + 1, sblk->pnum - idx);
  }

  if (idx == 0) { // Lowest key removed
    // Replace the lowest key with the next one or reset
    if (sblk->pnum > 0) {
      uint8_t *mm, *kbuf;
      uint32_t klen;
      rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
      RCRET(rc);
      rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &kbuf, &klen);
      if (rc) {
        fsm->release_mmap(fsm);
        return rc;
      }
      sblk->lkl = MIN(db->iwkv->pklen, klen);
      memcpy(sblk->lk, kbuf, sblk->lkl);
      fsm->release_mmap(fsm);
      if (klen <= db->iwkv->pklen) {
        sblk->flags |= SBLK_FULL_LKEY;
      } else {
        sblk->flags &= ~SBLK_FULL_LKEY;
      }
      if (!(sblk->flags & SBLK_CACHE_FLAGS)) {
        sblk->flags |= SBLK_CACHE_UPDATE;
      }
    } else {
      sblk->lkl = 0;
      sblk->flags |= SBLK_CACHE_REMOVE;
    }
  }

  // Update active cursors
  pthread_spin_lock(&db->cursors_slk);
  for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
    if (cur->cn && cur->cn->addr == sblk->addr) {
      cur->skip_next = 0;
      if (cur->cn != sblk) {
        memcpy(cur->cn, sblk, sizeof(*cur->cn));
        cur->cn->kvblk = 0;
        cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
      }
      if (cur->cnpos == idx) {
        // Cursor stood on the removed pair
        if (idx && idx == sblk->pnum) {
          // Removed pair was the last one: step back and set skip_next to -1
          cur->cnpos--;
          cur->skip_next = -1;
        } else {
          cur->skip_next = 1;
        }
      } else if (cur->cnpos > idx) {
        cur->cnpos--;
      }
    }
  }
  pthread_spin_unlock(&db->cursors_slk);
  return 0;
}
2112
2113 //-------------------------- IWLCTX
2114
_lx_sblk_cmp_key(IWLCTX * lx,SBLK * sblk,int * resp)2115 WUR iwrc _lx_sblk_cmp_key(IWLCTX *lx, SBLK *sblk, int *resp) {
2116 int res = 0;
2117 iwrc rc = 0;
2118 iwdb_flags_t dbflg = sblk->db->dbflg;
2119 const IWKV_val *key = lx->key;
2120 uint8_t lkl = sblk->lkl;
2121 size_t ksize = key->size;
2122
2123 if (IW_UNLIKELY(sblk->pnum < 1 || (sblk->flags & SBLK_DB))) {
2124 *resp = 0;
2125 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
2126 return IWKV_ERROR_CORRUPTED;
2127 }
2128 if (dbflg & IWDB_COMPOUND_KEYS) {
2129 ksize += IW_VNUMSIZE(key->compound);
2130 }
2131 if ((sblk->flags & SBLK_FULL_LKEY)
2132 || ksize < lkl
2133 || (dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2134 res = _cmp_keys(dbflg, sblk->lk, lkl, key);
2135 } else {
2136 res = _cmp_keys_prefix(dbflg, sblk->lk, lkl, key);
2137 if (res == 0) {
2138 uint32_t kl;
2139 uint8_t *mm, *k;
2140 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2141 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2142 if (rc) {
2143 *resp = 0;
2144 return rc;
2145 }
2146 if (!sblk->kvblk) {
2147 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2148 if (rc) {
2149 *resp = 0;
2150 fsm->release_mmap(fsm);
2151 return rc;
2152 }
2153 }
2154 rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[0], mm, &k, &kl);
2155 RCRET(rc);
2156 res = _cmp_keys(dbflg, k, kl, key);
2157 fsm->release_mmap(fsm);
2158 }
2159 }
2160 *resp = res;
2161 return rc;
2162 }
2163
_lx_roll_forward(IWLCTX * lx,uint8_t lvl)2164 static WUR iwrc _lx_roll_forward(IWLCTX *lx, uint8_t lvl) {
2165 iwrc rc = 0;
2166 int cret;
2167 SBLK *sblk;
2168 blkn_t blkn;
2169 assert(lx->lower);
2170
2171 while ((blkn = lx->lower->n[lvl])) {
2172 off_t blkaddr = BLK2ADDR(blkn);
2173 if (lx->nlvl > -1 && lvl < lx->nlvl) {
2174 uint8_t ulvl = lvl + 1;
2175 if (lx->pupper[ulvl] && lx->pupper[ulvl]->addr == blkaddr) {
2176 sblk = lx->pupper[ulvl];
2177 } else if (lx->plower[ulvl] && lx->plower[ulvl]->addr == blkaddr) {
2178 sblk = lx->plower[ulvl];
2179 } else {
2180 rc = _sblk_at(lx, blkaddr, 0, &sblk);
2181 }
2182 } else {
2183 rc = _sblk_at(lx, blkaddr, 0, &sblk);
2184 }
2185 RCRET(rc);
2186 #ifndef NDEBUG
2187 ++lx->num_cmps;
2188 #endif
2189 rc = _lx_sblk_cmp_key(lx, sblk, &cret);
2190 RCRET(rc);
2191 if (cret > 0 || lx->upper_addr == sblk->addr) { // upper > key
2192 lx->upper = sblk;
2193 break;
2194 } else {
2195 lx->lower = sblk;
2196 }
2197 }
2198 return 0;
2199 }
2200
/**
 * Locates the pair of neighbouring blocks (`lx->lower`, `lx->upper`) that
 * bracket `lx->key`, descending level by level down to level zero.
 *
 * When `lx->nlvl` is set (greater than -1) the lower/upper blocks found at
 * every level not above `lx->nlvl` are also stored in `lx->plower[]` and
 * `lx->pupper[]`.
 *
 * @return Zero on success or the first error reported by a callee.
 */
static WUR iwrc _lx_find_bounds(IWLCTX *lx) {
  iwrc rc = 0;
  int lvl;
  blkn_t blkn;
  SBLK *dblk = &lx->dblk;
  if (!dblk->addr) {
    // Context has no copy of the database head block yet: fetch and copy it
    SBLK *s;
    rc = _sblk_at(lx, lx->db->addr, 0, &s);
    RCRET(rc);
    memcpy(dblk, s, sizeof(*dblk));
  }
  if (!lx->lower) {
    // No starting block given by the caller: let the cache choose one
    rc = _dbcache_get(lx);
    RCRET(rc);
  }
  if (lx->nlvl > dblk->lvl) {
    // New level in DB
    dblk->lvl = (uint8_t) lx->nlvl;
    dblk->flags |= SBLK_DURTY;
  }
  lvl = lx->lower->lvl;
  while (lvl > -1) {
    rc = _lx_roll_forward(lx, (uint8_t) lvl);
    RCRET(rc);
    // Block number of the current upper bound, zero if there is none
    if (lx->upper) {
      blkn = ADDR2BLK(lx->upper->addr);
    } else {
      blkn = 0;
    }
    // Record the bounds for this level, then keep descending without another
    // roll forward for as long as the lower block links to the very same upper
    // block on the next level down. `lvl--` also ends the loop after level 0.
    do {
      if (lx->nlvl >= lvl) {
        lx->plower[lvl] = lx->lower;
        lx->pupper[lvl] = lx->upper;
      }
    } while (lvl-- && lx->lower->n[lvl] == blkn);
  }
  return 0;
}
2239
/**
 * Syncs and releases every block held by the lookup context.
 *
 * @param lx  Lookup context to clean up.
 * @param mm  Memory mapped file pointer forwarded to the sync routines.
 *            Several callers in this file pass zero here.
 *            NOTE(review): how the sync helpers treat a zero `mm` is not
 *            visible in this function - confirm against their definitions.
 * @return Zero on success or the first error met. `lx->destroy_addr` is reset
 *         in every case.
 */
static iwrc _lx_release_mm(IWLCTX *lx, uint8_t *mm) {
  iwrc rc = 0;
  if (lx->nlvl > -1) {
    // `lsb` / `usb` remember the last block handled so that a block referenced
    // on several consecutive levels is synced and released only once
    SBLK *lsb = 0, *usb = 0;
    if (lx->nb) {
      rc = _sblk_sync_mm(lx, lx->nb, mm);
      RCGO(rc, finish);
    }
    // Blocks aliased by the level zero slots are handled by the loop below,
    // drop the duplicate references to avoid a second release further down
    if (lx->pupper[0] == lx->upper) {
      lx->upper = 0;
    }
    if (lx->plower[0] == lx->lower) {
      lx->lower = 0;
    }
    for (int i = 0; i <= lx->nlvl; ++i) {
      if (lx->pupper[i]) {
        if (lx->pupper[i] != usb) {
          usb = lx->pupper[i];
          rc = _sblk_sync_and_release_mm(lx, &lx->pupper[i], mm);
          RCGO(rc, finish);
        }
        lx->pupper[i] = 0;
      }
      if (lx->plower[i]) {
        if (lx->plower[i] != lsb) {
          lsb = lx->plower[i];
          rc = _sblk_sync_and_release_mm(lx, &lx->plower[i], mm);
          RCGO(rc, finish);
        }
        lx->plower[i] = 0;
      }
    }
  }
  if (lx->upper) {
    rc = _sblk_sync_and_release_mm(lx, &lx->upper, mm);
    RCGO(rc, finish);
  }
  if (lx->lower) {
    rc = _sblk_sync_and_release_mm(lx, &lx->lower, mm);
    RCGO(rc, finish);
  }
  if (lx->dblk.flags & SBLK_DURTY) {
    // The database head block is embedded in the context: sync only
    rc = _sblk_sync_mm(lx, &lx->dblk, mm);
    RCGO(rc, finish);
  }
  if (lx->nb) {
    if (lx->nb->flags & SBLK_CACHE_PUT) {
      rc = _dbcache_put_lw(lx, lx->nb);
    }
    // The block is released even if the cache insertion failed
    _sblk_release(lx, &lx->nb);
    RCGO(rc, finish);
  }
  if (lx->cache_reload) {
    rc = _dbcache_fill_lw(lx);
  }

finish:
  lx->destroy_addr = 0;
  return rc;
}
2300
_lx_release(IWLCTX * lx)2301 iwrc _lx_release(IWLCTX *lx) {
2302 uint8_t *mm;
2303 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2304 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2305 RCRET(rc);
2306 rc = _lx_release_mm(lx, mm);
2307 IWRC(fsm->release_mmap(fsm), rc);
2308 return rc;
2309 }
2310
_lx_split_addkv(IWLCTX * lx,int idx,SBLK * sblk)2311 static iwrc _lx_split_addkv(IWLCTX *lx, int idx, SBLK *sblk) {
2312 iwrc rc;
2313 SBLK *nb;
2314 blkn_t nblk;
2315 IWDB db = sblk->db;
2316 bool uside = (idx == sblk->pnum);
2317 register const int8_t pivot = (KVBLK_IDXNUM / 2) + 1; // 32
2318
2319 if (uside) { // Upper side
2320 rc = _sblk_create(lx, (uint8_t) lx->nlvl, 0, sblk, lx->upper, &nb);
2321 RCRET(rc);
2322 rc = _sblk_addkv(nb, lx);
2323 RCGO(rc, finish);
2324
2325 } else { // New key is somewhere in a middle of sblk->kvblk
2326 assert(sblk->kvblk);
2327 // We are in the middle
2328 // Do the partial split
2329 // Move kv pairs into new `nb`
2330 // Compute space required for the new sblk which stores kv pairs after pivot `idx`
2331 size_t sz = 0;
2332 for (int8_t i = pivot; i < sblk->pnum; ++i) {
2333 sz += sblk->kvblk->pidx[sblk->pi[i]].len;
2334 }
2335 if (idx > pivot) {
2336 sz += IW_VNUMSIZE(lx->key->size) + lx->key->size + lx->val->size;
2337 }
2338 sz += KVBLK_MAX_NKV_SZ;
2339 uint8_t kvbpow = (uint8_t) iwlog2_64(sz);
2340 while ((1ULL << kvbpow) < sz) kvbpow++;
2341
2342 rc = _sblk_create(lx, (uint8_t) lx->nlvl, kvbpow, sblk, lx->upper, &nb);
2343 RCRET(rc);
2344
2345 IWKV_val key, val;
2346 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2347 for (int8_t i = pivot, end = sblk->pnum; i < end; ++i) {
2348 uint8_t *mm;
2349 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2350 RCBREAK(rc);
2351
2352 rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[i], &key, &val);
2353 assert(key.size);
2354 fsm->release_mmap(fsm);
2355 RCBREAK(rc);
2356
2357 rc = _sblk_addkv2(nb, i - pivot, &key, &val, true);
2358 _kv_dispose(&key, &val);
2359
2360 RCBREAK(rc);
2361 sblk->kvblk->pidx[sblk->pi[i]].len = 0;
2362 sblk->kvblk->pidx[sblk->pi[i]].off = 0;
2363 --sblk->pnum;
2364 }
2365 sblk->kvblk->flags |= KVBLK_DURTY;
2366 sblk->kvblk->zidx = sblk->pi[pivot];
2367 sblk->kvblk->maxoff = 0;
2368 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
2369 if (sblk->kvblk->pidx[i].off > sblk->kvblk->maxoff) {
2370 sblk->kvblk->maxoff = sblk->kvblk->pidx[i].off;
2371 }
2372 }
2373 }
2374
2375 // Fix levels:
2376 // [ lb -> sblk -> ub ]
2377 // [ lb -> sblk -> nb -> ub ]
2378 nblk = ADDR2BLK(nb->addr);
2379 lx->pupper[0]->p0 = nblk;
2380 lx->pupper[0]->flags |= SBLK_DURTY;
2381 nb->p0 = ADDR2BLK(lx->plower[0]->addr);
2382 for (int i = 0; i <= nb->lvl; ++i) {
2383 lx->plower[i]->n[i] = nblk;
2384 lx->plower[i]->flags |= SBLK_DURTY;
2385 nb->n[i] = ADDR2BLK(lx->pupper[i]->addr);
2386 }
2387
2388 pthread_spin_lock(&db->cursors_slk);
2389 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2390 if (cur->cn && cur->cn->addr == sblk->addr) {
2391 if (cur->cnpos >= pivot) {
2392 memcpy(cur->cn, nb, sizeof(*cur->cn));
2393 cur->cn->kvblk = 0;
2394 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2395 cur->cnpos -= pivot;
2396 }
2397 }
2398 }
2399 pthread_spin_unlock(&db->cursors_slk);
2400
2401 if (!uside) {
2402 if (idx > pivot) {
2403 rc = _sblk_addkv(nb, lx);
2404 } else {
2405 rc = _sblk_addkv(sblk, lx);
2406 }
2407 RCGO(rc, finish);
2408 }
2409
2410 finish:
2411 if (rc) {
2412 lx->nb = 0;
2413 IWRC(_sblk_destroy(lx, &nb), rc);
2414 } else {
2415 lx->nb = nb;
2416 }
2417 return rc;
2418 }
2419
_lx_init_chute(IWLCTX * lx)2420 IW_INLINE iwrc _lx_init_chute(IWLCTX *lx) {
2421 assert(lx->nlvl >= 0);
2422 iwrc rc = 0;
2423 if (!lx->pupper[lx->nlvl]) { // fix zero upper by dbtail
2424 SBLK *dbtail;
2425 rc = _sblk_at(lx, 0, 0, &dbtail);
2426 RCRET(rc);
2427 for (int8_t i = lx->nlvl; i >= 0 && !lx->pupper[i]; --i) {
2428 lx->pupper[i] = dbtail;
2429 }
2430 }
2431 return 0;
2432 }
2433
_lx_addkv(IWLCTX * lx)2434 static WUR iwrc _lx_addkv(IWLCTX *lx) {
2435 iwrc rc;
2436 bool found, uadd;
2437 uint8_t *mm = 0, idx;
2438 SBLK *sblk = lx->lower;
2439 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2440 if (lx->nlvl > -1) {
2441 rc = _lx_init_chute(lx);
2442 RCRET(rc);
2443 }
2444 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2445 RCRET(rc);
2446 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2447 if (rc) {
2448 fsm->release_mmap(fsm);
2449 return rc;
2450 }
2451 rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2452 RCRET(rc);
2453 if (found && (lx->opflags & IWKV_NO_OVERWRITE)) {
2454 fsm->release_mmap(fsm);
2455 return IWKV_ERROR_KEY_EXISTS;
2456 }
2457 uadd = (!found
2458 && sblk->pnum > KVBLK_IDXNUM - 1 && idx > KVBLK_IDXNUM - 1
2459 && lx->upper && lx->upper->pnum < KVBLK_IDXNUM);
2460 if (uadd) {
2461 rc = _sblk_loadkvblk_mm(lx, lx->upper, mm);
2462 if (rc) {
2463 fsm->release_mmap(fsm);
2464 return rc;
2465 }
2466 }
2467 if (found) {
2468 IWKV_val sval, *val = lx->val;
2469 if (lx->opflags & IWKV_VAL_INCREMENT) {
2470 int64_t ival;
2471 uint8_t *rp;
2472 uint32_t len;
2473 if (val->size == 4) {
2474 int32_t lv;
2475 memcpy(&lv, val->data, val->size);
2476 lv = IW_ITOHL(lv);
2477 ival = lv;
2478 } else if (val->size == 8) {
2479 memcpy(&ival, val->data, val->size);
2480 ival = IW_ITOHLL(ival);
2481 } else {
2482 rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2483 fsm->release_mmap(fsm);
2484 return rc;
2485 }
2486 _kvblk_value_peek(sblk->kvblk, sblk->pi[idx], mm, &rp, &len);
2487 sval.data = rp;
2488 sval.size = len;
2489 if (sval.size == 4) {
2490 uint32_t lv;
2491 memcpy(&lv, sval.data, 4);
2492 lv = IW_ITOHL(lv);
2493 lv += ival;
2494 _num2lebuf(lx->incbuf, &lv, 4);
2495 } else if (sval.size == 8) {
2496 uint64_t llv;
2497 memcpy(&llv, sval.data, 8);
2498 llv = IW_ITOHLL(llv);
2499 llv += ival;
2500 _num2lebuf(lx->incbuf, &llv, 8);
2501 } else {
2502 rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2503 fsm->release_mmap(fsm);
2504 return rc;
2505 }
2506 sval.data = lx->incbuf;
2507 val = &sval;
2508 }
2509 if (lx->ph) {
2510 IWKV_val oldval;
2511 rc = _kvblk_value_get(sblk->kvblk, mm, sblk->pi[idx], &oldval);
2512 fsm->release_mmap(fsm);
2513 if (!rc) {
2514 // note: oldval should be disposed by ph
2515 rc = lx->ph(lx->key, lx->val, &oldval, lx->phop);
2516 }
2517 RCRET(rc);
2518 } else {
2519 fsm->release_mmap(fsm);
2520 }
2521 return _sblk_updatekv(sblk, idx, lx->key, val);
2522 } else {
2523 fsm->release_mmap(fsm);
2524 if (sblk->pnum > KVBLK_IDXNUM - 1) {
2525 if (uadd) {
2526 if (lx->ph) {
2527 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2528 RCRET(rc);
2529 }
2530 return _sblk_addkv(lx->upper, lx);
2531 }
2532 if (lx->nlvl < 0) {
2533 return _IWKV_RC_REQUIRE_NLEVEL;
2534 }
2535 if (lx->ph) {
2536 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2537 RCRET(rc);
2538 }
2539 return _lx_split_addkv(lx, idx, sblk);
2540 } else {
2541 if (lx->ph) {
2542 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2543 RCRET(rc);
2544 }
2545 return _sblk_addkv2(sblk, idx, lx->key, lx->val, false);
2546 }
2547 }
2548 }
2549
_lx_put_lw(IWLCTX * lx)2550 IW_INLINE WUR iwrc _lx_put_lw(IWLCTX *lx) {
2551 iwrc rc;
2552 start:
2553 rc = _lx_find_bounds(lx);
2554 if (rc) {
2555 _lx_release_mm(lx, 0);
2556 return rc;
2557 }
2558 rc = _lx_addkv(lx);
2559 if (rc == _IWKV_RC_REQUIRE_NLEVEL) {
2560 SBLK *lower = lx->lower;
2561 lx->lower = 0;
2562 _lx_release_mm(lx, 0);
2563 lx->nlvl = _sblk_genlevel(lx->db);
2564 if (lower->lvl >= lx->nlvl) {
2565 lx->lower = lower;
2566 }
2567 goto start;
2568 }
2569 if (rc == _IWKV_RC_KVBLOCK_FULL) {
2570 rc = IWKV_ERROR_CORRUPTED;
2571 iwlog_ecode_error3(rc);
2572 }
2573 IWRC(_lx_release(lx), rc);
2574 return rc;
2575 }
2576
_lx_get_lr(IWLCTX * lx)2577 IW_INLINE WUR iwrc _lx_get_lr(IWLCTX *lx) {
2578 iwrc rc = _lx_find_bounds(lx);
2579 RCRET(rc);
2580 bool found;
2581 uint8_t *mm, idx;
2582 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2583 lx->val->size = 0;
2584 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2585 RCRET(rc);
2586 rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2587 RCGO(rc, finish);
2588 rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2589 RCGO(rc, finish);
2590 if (found) {
2591 rc = _kvblk_value_get(lx->lower->kvblk, mm, lx->lower->pi[idx], lx->val);
2592 } else {
2593 rc = IWKV_ERROR_NOTFOUND;
2594 }
2595
2596 finish:
2597 IWRC(fsm->release_mmap(fsm), rc);
2598 _lx_release_mm(lx, 0);
2599 return rc;
2600 }
2601
/**
 * Removes the last remaining pair of `sblk` and then the block itself.
 *
 * The context is released and the bounds are searched again with
 * `lx->upper_addr` set to the address of `sblk`, so that the block is found
 * as `lx->upper` and its predecessors on every level end up in
 * `lx->plower[]`. The block is then unlinked on all of its levels, the block
 * following it gets a new back link, open cursors referring to the removed
 * block or to its neighbours are refreshed and finally the block is destroyed.
 *
 * @param lx    Lookup context.
 * @param sblk  Block holding exactly one pair, with its kvblk loaded.
 * @param idx   Position of the pair to remove.
 * @return Zero on success or an error code.
 */
static WUR iwrc _lx_del_sblk_lw(IWLCTX *lx, SBLK *sblk, uint8_t idx) {
  assert(sblk->pnum == 1 && sblk->kvblk);

  iwrc rc;
  IWDB db = lx->db;
  KVBLK *kvblk = sblk->kvblk;
  blkn_t sblk_blkn = ADDR2BLK(sblk->addr);

  // Restart the search: this time the block itself must become the upper bound
  _lx_release_mm(lx, 0);
  lx->nlvl = sblk->lvl;
  lx->upper_addr = sblk->addr;

  rc = _lx_find_bounds(lx);
  RCRET(rc);
  assert(lx->upper->pnum == 1 && lx->upper->addr == lx->upper_addr);

  // Hand the kvblk saved before the release over to the block found again
  lx->upper->kvblk = kvblk;
  rc = _sblk_rmkv(lx->upper, idx);
  RCGO(rc, finish);

  // Unlink the block: predecessors on every level skip over it
  for (int i = 0; i <= lx->nlvl; ++i) {
    lx->plower[i]->n[i] = lx->upper->n[i];
    lx->plower[i]->flags |= SBLK_DURTY;
    if (lx->plower[i]->flags & SBLK_DB) {
      // The head block lost its only successor on this level: lower its level
      if (!lx->plower[i]->n[i]) {
        --lx->plower[i]->lvl;
      }
    }
    if (lx->pupper[i] == lx->upper) {
      // Do not touch `lx->upper` in next `_lx_release_mm()` call
      lx->pupper[i] = 0;
    }
  }

  SBLK rb; // Block to remove
  memcpy(&rb, lx->upper, sizeof(rb));

  SBLK *nb,  // Block after lx->upper
       *rbp = &rb;

  assert(!lx->nb);
  rc = _sblk_at(lx, BLK2ADDR(rb.n[0]), 0, &nb);
  RCGO(rc, finish);
  // The successor now points back to the predecessor of the removed block
  lx->nb = nb;
  lx->nb->p0 = rb.p0;
  lx->nb->flags |= SBLK_DURTY;

  // Update cursors within sblk removed
  pthread_spin_lock(&db->cursors_slk);
  for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
    if (cur->cn) {
      if (cur->cn->addr == sblk->addr) {
        // Cursor stands on the removed block
        if (nb->flags & SBLK_DB) {
          // Successor carries the SBLK_DB flag: fall back to the predecessor
          // unless it carries SBLK_DB as well, in which case the cursor is reset
          if (!(lx->plower[0]->flags & SBLK_DB)) {
            memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
            cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
            cur->cn->kvblk = 0;
            cur->skip_next = -1;
            cur->cnpos = lx->plower[0]->pnum;
            if (cur->cnpos) cur->cnpos--;
          } else {
            cur->cn = 0;
            cur->cnpos = 0;
            cur->skip_next = 0;
          }
        } else {
          // Move the cursor to the first pair of the successor
          memcpy(cur->cn, nb, sizeof(*nb));
          cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
          cur->cn->kvblk = 0;
          cur->cnpos = 0;
          cur->skip_next = 1;
        }
      } else if (cur->cn->n[0] == sblk_blkn) {
        // Cursor holds a stale copy of the predecessor: refresh it
        memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
        cur->cn->kvblk = 0;
        cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
      } else if (cur->cn->p0 == sblk_blkn) {
        // Cursor holds a stale copy of the successor: refresh it
        memcpy(cur->cn, nb, sizeof(*nb));
        cur->cn->kvblk = 0;
        cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
      }
    }
  }
  pthread_spin_unlock(&db->cursors_slk);

  rc = _sblk_destroy(lx, &rbp);

finish:
  return rc;
}
2692
_lx_del_lw(IWLCTX * lx)2693 static WUR iwrc _lx_del_lw(IWLCTX *lx) {
2694 iwrc rc;
2695 bool found;
2696 uint8_t *mm = 0, idx;
2697 IWDB db = lx->db;
2698 IWFS_FSM *fsm = &db->iwkv->fsm;
2699 SBLK *sblk;
2700
2701 rc = _lx_find_bounds(lx);
2702 RCRET(rc);
2703
2704 sblk = lx->lower;
2705 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2706 RCGO(rc, finish);
2707 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2708 RCGO(rc, finish);
2709 rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2710 RCGO(rc, finish);
2711 if (!found) {
2712 rc = IWKV_ERROR_NOTFOUND;
2713 goto finish;
2714 }
2715 fsm->release_mmap(fsm);
2716 mm = 0;
2717
2718 if (sblk->pnum == 1) { // last kv in block
2719 rc = _lx_del_sblk_lw(lx, sblk, idx);
2720 } else {
2721 rc = _sblk_rmkv(sblk, idx);
2722 }
2723
2724 finish:
2725 if (mm) {
2726 fsm->release_mmap(fsm);
2727 }
2728 if (rc) {
2729 _lx_release_mm(lx, 0);
2730 } else {
2731 rc = _lx_release(lx);
2732 }
2733 return rc;
2734 }
2735
2736 //-------------------------- CACHE
2737
/**
 * Frees the node array of the database cache and zeroes the cache structure.
 */
static void _dbcache_destroy_lw(IWDB db) {
  DBCACHE *cache = &db->cache;
  free(cache->nodes);
  memset(cache, 0, sizeof(*cache));
}
2742
_dbcache_lvl(uint8_t lvl)2743 IW_INLINE uint8_t _dbcache_lvl(uint8_t lvl) {
2744 uint8_t clvl = (lvl >= DBCACHE_LEVELS) ? (lvl - DBCACHE_LEVELS + 1) : DBCACHE_MIN_LEVEL;
2745 if (clvl < DBCACHE_MIN_LEVEL) {
2746 clvl = DBCACHE_MIN_LEVEL;
2747 }
2748 return clvl;
2749 }
2750
/**
 * Comparator of two cache nodes, used with `iwarr_sorted_find2()`.
 *
 * A node with a zero `lkl` and `fullkey` set is a lookup probe built by
 * `_dbcache_get()`: its key length is carried in `sblkn`.
 *
 * The key bytes embedded in the nodes are compared as prefixes first. On a
 * tie, and only for non numeric keys, the decision is refined:
 *  - if any of the nodes does not embed its full key, the full keys are read
 *    from the key/value blocks through the memory mapped file and compared;
 *  - else, for compound keys, the lengths without the compound prefix are
 *    compared and, if equal, the compound parts;
 *  - else the key lengths are compared.
 *
 * @param v1   First node (`DBCNODE`).
 * @param v2   Second node (`DBCNODE`).
 * @param op   Lookup context (`IWLCTX`).
 * @param res  Receives the comparison result, also on failure.
 * @return Zero on success or an error code.
 */
static WUR iwrc _dbcache_cmp_nodes(const void *v1, const void *v2, void *op, int *res) {
  iwrc rc = 0;
  uint8_t *mm = 0;
  IWLCTX *lx = op;
  IWDB db = lx->db;
  IWFS_FSM *fsm = &db->iwkv->fsm;
  iwdb_flags_t dbflg = db->dbflg;
  int rv = 0, step;

  const DBCNODE *cn1 = v1, *cn2 = v2;
  uint8_t *k1 = (uint8_t *) cn1->lk, *k2 = (uint8_t *) cn2->lk;
  uint32_t kl1 = cn1->lkl, kl2 = cn2->lkl;
  KVBLK *kb;

  // Lookup probes keep the key length in `sblkn`
  if (!kl1 && cn1->fullkey) {
    kl1 = cn1->sblkn;
  }
  if (!kl2 && cn2->fullkey) {
    kl2 = cn2->sblkn;
  }

  // The second key is handed to the comparison routines as an IWKV_val
  IWKV_val key2 = {
    .size = kl2,
    .data = k2
  };

  if (dbflg & IWDB_COMPOUND_KEYS) {
    // Split the leading compound number off the second key
    IW_READVNUMBUF64(k2, key2.compound, step);
    key2.size -= step;
    key2.data = (char *) key2.data + step;
  }

  rv = _cmp_keys_prefix(dbflg, k1, kl1, &key2);

  if (rv == 0 && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {

    if (!cn1->fullkey || !cn2->fullkey) {
      // At least one embedded key is truncated: compare the complete keys
      rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
      RCRET(rc);
      if (!cn1->fullkey) {
        rc = _kvblk_at_mm(lx, BLK2ADDR(cn1->kblkn), mm, 0, &kb);
        RCGO(rc, finish);
        rc = _kvblk_key_peek(kb, cn1->k0idx, mm, &k1, &kl1);
        RCGO(rc, finish);
      }
      if (!cn2->fullkey) {
        rc = _kvblk_at_mm(lx, BLK2ADDR(cn2->kblkn), mm, 0, &kb);
        RCGO(rc, finish);
        rc = _kvblk_key_peek(kb, cn2->k0idx, mm, &k2, &kl2);
        RCGO(rc, finish);
        key2.size = kl2;
        key2.data = k2;
        if (dbflg & IWDB_COMPOUND_KEYS) {
          IW_READVNUMBUF64(k2, key2.compound, step);
          key2.size -= step;
          key2.data = (char *) key2.data + step;
        }
      }

      rv = _cmp_keys(dbflg, k1, kl1, &key2);

    } else if (dbflg & IWDB_COMPOUND_KEYS) {

      // Both keys are complete: order by length without the compound prefix,
      // then by the compound parts
      int64_t c1, c2 = key2.compound;
      IW_READVNUMBUF64(k1, c1, step);
      kl1 -= step;
      if (key2.size == kl1) {
        rv = c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
      } else {
        rv = (int) key2.size - (int) kl1;
      }

    } else {
      rv = (int) kl2 - (int) kl1;
    }
  }

finish:
  *res = rv;
  if (mm) {
    fsm->release_mmap(fsm);
  }
  return rc;
}
2835
_dbcache_fill_lw(IWLCTX * lx)2836 static WUR iwrc _dbcache_fill_lw(IWLCTX *lx) {
2837 iwrc rc = 0;
2838 IWDB db = lx->db;
2839 lx->cache_reload = 0;
2840 if (!lx->dblk.addr) {
2841 SBLK *s;
2842 rc = _sblk_at(lx, lx->db->addr, 0, &s);
2843 RCRET(rc);
2844 memcpy(&lx->dblk, s, sizeof(lx->dblk));
2845 }
2846 SBLK *sdb = &lx->dblk;
2847 SBLK *sblk = sdb;
2848 DBCACHE *c = &db->cache;
2849 assert(lx->db->addr == sdb->addr);
2850 c->num = 0;
2851 if (c->nodes) {
2852 free(c->nodes);
2853 c->nodes = 0;
2854 }
2855 if (sdb->lvl < DBCACHE_MIN_LEVEL) {
2856 c->open = true;
2857 return 0;
2858 }
2859 c->lvl = _dbcache_lvl(sdb->lvl);
2860 c->nsize = (lx->db->dbflg & IWDB_VNUM64_KEYS) ? DBCNODE_VNUM_SZ : DBCNODE_STR_SZ;
2861 c->asize = c->nsize * ((1U << DBCACHE_LEVELS) + DBCACHE_ALLOC_STEP);
2862
2863 size_t nsize = c->nsize;
2864 c->nodes = malloc(c->asize);
2865 if (!c->nodes) {
2866 c->open = false;
2867 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2868 }
2869 blkn_t n;
2870 uint8_t *wp;
2871 size_t num = 0;
2872 while ((n = sblk->n[c->lvl])) {
2873 rc = _sblk_at(lx, BLK2ADDR(n), 0, &sblk);
2874 RCRET(rc);
2875 if (offsetof(DBCNODE, lk) + sblk->lkl > nsize) {
2876 free(c->nodes);
2877 c->nodes = 0;
2878 rc = IWKV_ERROR_CORRUPTED;
2879 iwlog_ecode_error3(rc);
2880 return rc;
2881 }
2882 DBCNODE cn = {
2883 .lkl = sblk->lkl,
2884 .fullkey = (sblk->flags & SBLK_FULL_LKEY),
2885 .k0idx = sblk->pi[0],
2886 .sblkn = ADDR2BLK(sblk->addr),
2887 .kblkn = sblk->kvblkn
2888 };
2889 if (c->asize < nsize * (num + 1)) {
2890 c->asize += (nsize * DBCACHE_ALLOC_STEP);
2891 wp = (uint8_t *) c->nodes;
2892 DBCNODE *nn = realloc(c->nodes, c->asize);
2893 if (!nn) {
2894 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
2895 free(wp);
2896 return rc;
2897 }
2898 c->nodes = nn;
2899 }
2900 wp = (uint8_t *) c->nodes + nsize * num;
2901 memcpy(wp, &cn, offsetof(DBCNODE, lk));
2902 wp += offsetof(DBCNODE, lk);
2903 memcpy(wp, sblk->lk, sblk->lkl);
2904 ++num;
2905 }
2906 c->num = num;
2907 c->open = true;
2908 return 0;
2909 }
2910
_dbcache_get(IWLCTX * lx)2911 static WUR iwrc _dbcache_get(IWLCTX *lx) {
2912 iwrc rc = 0;
2913 off_t idx;
2914 bool found;
2915 DBCNODE *n;
2916 alignas(DBCNODE) uint8_t dbcbuf[255];
2917 IWDB db = lx->db;
2918 DBCACHE *cache = &db->cache;
2919 const IWKV_val *key = lx->key;
2920 if (lx->nlvl > -1 || cache->num < 1) {
2921 lx->lower = &lx->dblk;
2922 return 0;
2923 }
2924 assert(cache->nodes);
2925 size_t lxksiz = key->size;
2926 if (db->dbflg & IWDB_COMPOUND_KEYS) {
2927 lxksiz += IW_VNUMSIZE(key->compound);
2928 }
2929
2930 if (sizeof(DBCNODE) + lxksiz <= sizeof(dbcbuf)) {
2931 n = (DBCNODE *) dbcbuf;
2932 } else {
2933 n = malloc(sizeof(DBCNODE) + lxksiz);
2934 if (!n) {
2935 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
2936 }
2937 }
2938 n->sblkn = (uint32_t) lxksiz; // `sblkn` used to store key size (to keep DBCNODE compact)
2939 n->kblkn = 0;
2940 n->fullkey = 1;
2941 n->lkl = 0;
2942 n->k0idx = 0;
2943
2944 uint8_t *wp = (uint8_t *) n + offsetof(DBCNODE, lk);
2945 if (db->dbflg & IWDB_COMPOUND_KEYS) {
2946 size_t step;
2947 char vbuf[IW_VNUMBUFSZ];
2948 IW_SETVNUMBUF(step, vbuf, key->compound);
2949 memcpy(wp, vbuf, step);
2950 wp += step;
2951 }
2952 memcpy(wp, key->data, key->size);
2953
2954 idx = iwarr_sorted_find2(cache->nodes, cache->num, cache->nsize, n, lx, &found, _dbcache_cmp_nodes);
2955 if (idx > 0) {
2956 DBCNODE *fn = (DBCNODE *)((uint8_t *) cache->nodes + (idx - 1) * cache->nsize);
2957 assert(fn && idx - 1 < cache->num);
2958 rc = _sblk_at(lx, BLK2ADDR(fn->sblkn), 0, &lx->lower);
2959 } else {
2960 lx->lower = &lx->dblk;
2961 }
2962 if ((uint8_t *) n != dbcbuf) {
2963 free(n);
2964 }
2965 return rc;
2966 }
2967
_dbcache_put_lw(IWLCTX * lx,SBLK * sblk)2968 static WUR iwrc _dbcache_put_lw(IWLCTX *lx, SBLK *sblk) {
2969 off_t idx;
2970 bool found;
2971 IWDB db = lx->db;
2972 alignas(DBCNODE) uint8_t dbcbuf[255];
2973 DBCNODE *n = (DBCNODE *) dbcbuf;
2974 DBCACHE *cache = &db->cache;
2975 size_t nsize = cache->nsize;
2976
2977 sblk->flags &= ~SBLK_CACHE_PUT;
2978 assert(sizeof(*cache) + sblk->lkl <= sizeof(dbcbuf));
2979 if (sblk->pnum < 1 || sblk->lvl < cache->lvl) {
2980 return 0;
2981 }
2982 if (sblk->lvl >= cache->lvl + DBCACHE_LEVELS || !cache->nodes) { // need to reload full cache
2983 lx->cache_reload = 1;
2984 return 0;
2985 }
2986 if (!sblk->kvblk) {
2987 assert(sblk->kvblk);
2988 return IW_ERROR_INVALID_STATE;
2989 }
2990 n->lkl = sblk->lkl;
2991 n->fullkey = (sblk->flags & SBLK_FULL_LKEY);
2992 n->k0idx = sblk->pi[0];
2993 n->sblkn = ADDR2BLK(sblk->addr);
2994 n->kblkn = sblk->kvblkn;
2995 memcpy((uint8_t *) n + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
2996
2997 idx = iwarr_sorted_find2(cache->nodes, cache->num, nsize, n, lx, &found, _dbcache_cmp_nodes);
2998 assert(!found);
2999
3000 if (cache->asize <= cache->num * nsize) {
3001 size_t nsz = cache->asize + (nsize * DBCACHE_ALLOC_STEP);
3002 DBCNODE *nodes = realloc(cache->nodes, nsz);
3003 if (!nodes) {
3004 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3005 free(cache->nodes);
3006 cache->nodes = 0;
3007 return rc;
3008 }
3009 cache->asize = nsz;
3010 cache->nodes = nodes;
3011 }
3012
3013 uint8_t *cptr = (uint8_t *) cache->nodes;
3014 if (cache->num != idx) {
3015 memmove(cptr + (idx + 1) * nsize, cptr + idx * nsize, (cache->num - idx) * nsize);
3016 }
3017 memcpy(cptr + idx * nsize, n, nsize);
3018 ++cache->num;
3019 return 0;
3020 }
3021
/**
 * Removes the cache node which refers to `sblk`, if there is one.
 *
 * `SBLK_CACHE_REMOVE` is cleared from the block flags. Blocks below the cache
 * level are not cached, so nothing is done for them. When the cache level is
 * above `DBCACHE_MIN_LEVEL` and the database level dropped below the level of
 * the block, a full cache reload is requested instead.
 */
static void _dbcache_remove_lw(IWLCTX *lx, SBLK *sblk) {
  DBCACHE *cache = &lx->db->cache;

  sblk->flags &= ~SBLK_CACHE_REMOVE;
  if (sblk->lvl < cache->lvl || cache->num < 1) {
    return;
  }
  if (cache->lvl > DBCACHE_MIN_LEVEL && lx->dblk.lvl < sblk->lvl) {
    // Database level reduced so we need to shift cache down
    lx->cache_reload = 1;
    return;
  }

  const blkn_t target = ADDR2BLK(sblk->addr);
  const size_t total = cache->num;
  const size_t stride = cache->nsize;
  uint8_t *base = (uint8_t *) cache->nodes;

  size_t i = 0;
  while (i < total) {
    DBCNODE *node = (DBCNODE *) (base + i * stride);
    if (node->sblkn == target) {
      // Close the gap unless the last node is being removed
      if (i < total - 1) {
        memmove(base + i * stride, base + (i + 1) * stride, (total - i - 1) * stride);
      }
      --cache->num;
      return;
    }
    ++i;
  }
}
3049
/**
 * Refreshes the cache node which refers to `sblk` (kvblk number, cached key
 * bytes and length, full key flag, index of the first pair), if there is one.
 *
 * `SBLK_CACHE_UPDATE` is cleared from the block flags. Blocks below the cache
 * level are not cached, so nothing is done for them.
 */
static void _dbcache_update_lw(IWLCTX *lx, SBLK *sblk) {
  DBCACHE *cache = &lx->db->cache;

  assert(sblk->pnum > 0);
  sblk->flags &= ~SBLK_CACHE_UPDATE;
  if (sblk->lvl < cache->lvl || cache->num < 1) {
    return;
  }

  const blkn_t target = ADDR2BLK(sblk->addr);
  const size_t total = cache->num;
  const size_t stride = cache->nsize;
  uint8_t *base = (uint8_t *) cache->nodes;

  size_t i = 0;
  while (i < total) {
    DBCNODE *node = (DBCNODE *) (base + i * stride);
    if (node->sblkn == target) {
      node->kblkn = sblk->kvblkn;
      node->lkl = sblk->lkl;
      node->fullkey = (sblk->flags & SBLK_FULL_LKEY);
      node->k0idx = sblk->pi[0];
      memcpy((uint8_t *) node + offsetof(DBCNODE, lk), sblk->lk, sblk->lkl);
      return;
    }
    ++i;
  }
}
3074
3075 //-------------------------- CURSOR
3076
_cursor_get_ge_idx(IWLCTX * lx,IWKV_cursor_op op,uint8_t * oidx)3077 IW_INLINE WUR iwrc _cursor_get_ge_idx(IWLCTX *lx, IWKV_cursor_op op, uint8_t *oidx) {
3078 iwrc rc = _lx_find_bounds(lx);
3079 RCRET(rc);
3080 bool found;
3081 uint8_t *mm, idx;
3082 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3083 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3084 RCRET(rc);
3085 rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
3086 RCGO(rc, finish);
3087 rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
3088 RCGO(rc, finish);
3089 if (found) {
3090 *oidx = idx;
3091 } else {
3092 if (op == IWKV_CURSOR_EQ || (lx->lower->flags & SBLK_DB) || lx->lower->pnum < 1) {
3093 rc = IWKV_ERROR_NOTFOUND;
3094 } else {
3095 *oidx = idx ? idx - 1 : idx;
3096 }
3097 }
3098
3099 finish:
3100 IWRC(fsm->release_mmap(fsm), rc);
3101 return rc;
3102 }
3103
/**
 * Moves the cursor according to `op`.
 *
 *  - `IWKV_CURSOR_BEFORE_FIRST` / `IWKV_CURSOR_AFTER_LAST`: the current block
 *    is released and only the starting point of the next move is remembered
 *    in `cur->dbaddr`.
 *  - `IWKV_CURSOR_NEXT` / `IWKV_CURSOR_PREV`: steps to the adjacent pair,
 *    crossing to the neighbouring block when needed and skipping empty blocks.
 *    A pending `cur->skip_next` of the matching sign makes the step a no-op.
 *  - `IWKV_CURSOR_EQ` / `IWKV_CURSOR_GE`: positions the cursor using the key
 *    stored in the cursor's lookup context.
 *
 * `cur->skip_next` is reset on every exit through `finish`. On errors other
 * than `IWKV_ERROR_NOTFOUND` the current block of the cursor is released.
 *
 * @return Zero on success, `IWKV_ERROR_NOTFOUND` when there is no pair to
 *         move to, or another error code.
 */
static WUR iwrc _cursor_to_lr(IWKV_cursor cur, IWKV_cursor_op op) {
  iwrc rc = 0;
  IWDB db = cur->lx.db;
  IWLCTX *lx = &cur->lx;
  blkn_t dblk = ADDR2BLK(db->addr);
  if (op < IWKV_CURSOR_NEXT) { // IWKV_CURSOR_BEFORE_FIRST | IWKV_CURSOR_AFTER_LAST
    if (cur->cn) {
      _sblk_release(lx, &cur->cn);
    }
    if (op == IWKV_CURSOR_BEFORE_FIRST) {
      // Start from the database head block; with this position the first
      // NEXT step takes the "move to the following block" branch
      cur->dbaddr = db->addr;
      cur->cnpos = KVBLK_IDXNUM - 1;
    } else {
      cur->dbaddr = -1; // Negative as sign of dbtail
      cur->cnpos = 0;
    }
    return 0;
  }

start:
  if (op < IWKV_CURSOR_EQ) { // IWKV_CURSOR_NEXT | IWKV_CURSOR_PREV
    blkn_t n = 0;
    if (!cur->cn) {
      if (cur->dbaddr) {
        // First move after BEFORE_FIRST / AFTER_LAST: load the remembered
        // block, a negative address stands for the block at address zero
        rc = _sblk_at(lx, (cur->dbaddr < 0 ? 0 : cur->dbaddr), 0, &cur->cn);
        cur->dbaddr = 0;
        RCGO(rc, finish);
      } else {
        rc = IWKV_ERROR_NOTFOUND;
        goto finish;
      }
    }
    if (op == IWKV_CURSOR_NEXT) {
      if (cur->skip_next > 0) {
        // A pending skip consumes this step without moving
        goto finish;
      }
      if (cur->cnpos + 1 >= cur->cn->pnum) {
        // Past the last pair of the block: go to the following block
        n = cur->cn->n[0];
        if (!n) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        _sblk_release(lx, &cur->cn);
        rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
        RCGO(rc, finish);
        cur->cnpos = 0;
        if (IW_UNLIKELY(!cur->cn->pnum)) {
          // Empty block: keep moving
          goto start;
        }
      } else {
        if (cur->cn->flags & SBLK_DB) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        ++cur->cnpos;
      }
    } else { // IWKV_CURSOR_PREV
      if (cur->skip_next < 0) {
        // A pending skip consumes this step without moving
        goto finish;
      }
      if (cur->cnpos == 0) {
        // Before the first pair of the block: go to the preceding block,
        // the database head block is not a valid cursor position
        n = cur->cn->p0;
        if (!n || n == dblk) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        _sblk_release(lx, &cur->cn);
        // `rc` has not been modified since the last check, this is a no-op
        RCGO(rc, finish);
        rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
        RCGO(rc, finish);
        if (IW_LIKELY(cur->cn->pnum)) {
          cur->cnpos = cur->cn->pnum - 1;
        } else {
          // Empty block: keep moving
          goto start;
        }
      } else {
        if (cur->cn->flags & SBLK_DB) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        --cur->cnpos;
      }
    }
  } else { // IWKV_CURSOR_EQ | IWKV_CURSOR_GE
    if (!lx->key) {
      rc = IW_ERROR_INVALID_STATE;
      goto finish;
    }
    rc = _cursor_get_ge_idx(lx, op, &cur->cnpos);
    if (lx->upper) {
      _sblk_release(lx, &lx->upper);
    }
    if (!rc) {
      // The cursor takes over the block found by the lookup context
      cur->cn = lx->lower;
      lx->lower = 0;
    }
  }

finish:
  cur->skip_next = 0;
  if (rc && rc != IWKV_ERROR_NOTFOUND) {
    if (cur->cn) _sblk_release(lx, &cur->cn);
  }
  return rc;
}
3209
3210 //-------------------------- PUBLIC API
3211
/**
 * Translates an error code of this module into a human readable message.
 *
 * Fixed message typos: "ackup operation in progress." and "descremented".
 *
 * @param locale  Not used.
 * @param ecode   Error code.
 * @return Static message, or zero if `ecode` is outside of the
 *         (`_IWKV_ERROR_START`, `_IWKV_ERROR_END`) range or has no message.
 */
static const char *_kv_ecodefn(locale_t locale, uint32_t ecode) {
  if (!(ecode > _IWKV_ERROR_START && ecode < _IWKV_ERROR_END)) {
    return 0;
  }
  switch (ecode) {
    case IWKV_ERROR_NOTFOUND:
      return "Key not found. (IWKV_ERROR_NOTFOUND)";
    case IWKV_ERROR_KEY_EXISTS:
      return "Key exists. (IWKV_ERROR_KEY_EXISTS)";
    case IWKV_ERROR_MAXKVSZ:
      return "Size of Key+value must be not greater than 0xfffffff bytes (IWKV_ERROR_MAXKVSZ)";
    case IWKV_ERROR_CORRUPTED:
      return "Database file invalid or corrupted (IWKV_ERROR_CORRUPTED)";
    case IWKV_ERROR_DUP_VALUE_SIZE:
      return "Value size is not compatible for insertion into sorted values array (IWKV_ERROR_DUP_VALUE_SIZE)";
    case IWKV_ERROR_KEY_NUM_VALUE_SIZE:
      return "Given key is not compatible to store as number (IWKV_ERROR_KEY_NUM_VALUE_SIZE)";
    case IWKV_ERROR_INCOMPATIBLE_DB_MODE:
      return "Incompatible database open mode (IWKV_ERROR_INCOMPATIBLE_DB_MODE)";
    case IWKV_ERROR_INCOMPATIBLE_DB_FORMAT:
      return "Incompatible database format version, please migrate database data (IWKV_ERROR_INCOMPATIBLE_DB_FORMAT)";
    case IWKV_ERROR_CORRUPTED_WAL_FILE:
      return "Corrupted WAL file (IWKV_ERROR_CORRUPTED_WAL_FILE)";
    case IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED:
      return "Stored value cannot be incremented/decremented (IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED)";
    case IWKV_ERROR_WAL_MODE_REQUIRED:
      return "Operation requires WAL enabled database. (IWKV_ERROR_WAL_MODE_REQUIRED)";
    case IWKV_ERROR_BACKUP_IN_PROGRESS:
      return "Backup operation in progress. (IWKV_ERROR_BACKUP_IN_PROGRESS)";
    default:
      break;
  }
  return 0;
}
3246
iwkv_init(void)3247 iwrc iwkv_init(void) {
3248 static int _kv_initialized = 0;
3249 if (!__sync_bool_compare_and_swap(&_kv_initialized, 0, 1)) {
3250 return 0;
3251 }
3252 return iwlog_register_ecodefn(_kv_ecodefn);
3253 }
3254
/**
 * File resize policy.
 *
 * While the current size is below 64M the size is doubled (starting from the
 * current size, or from one allocation unit for an empty file) until it
 * reaches `nsize`; above that 10M of extra space is added to `nsize`.
 * The result is rounded up to the allocation unit.
 *
 * @param nsize  Requested file size.
 * @param csize  Current file size.
 * @param f      Not used.
 * @param _ctx   Not used.
 * @return New file size.
 */
static off_t _szpolicy(off_t nsize, off_t csize, struct IWFS_EXT *f, void **_ctx) {
  const size_t aunit = iwp_alloc_unit();
  off_t res;

  if (!(csize < 0x4000000)) {
    res = nsize + 10 * 1024 * 1024; // + 10M extra space
  } else {
    // Doubled alloc up to 64M
    for (res = csize ? csize : aunit; res < nsize; res <<= 1) {
    }
  }
  res = IW_ROUNDUP(res, aunit);
  return res;
}
3269
/**
 * Retrieves the state of the underlying file storage.
 *
 * @param iwkv  Storage handle.
 * @param out   Receives the state.
 * @return `IW_ERROR_INVALID_ARGS` if any of the arguments is zero, otherwise
 *         the result of the `state` call of the storage's `fsm` member.
 */
iwrc iwkv_state(IWKV iwkv, IWFS_FSM_STATE *out) {
  if (!iwkv || !out) {
    return IW_ERROR_INVALID_ARGS;
  }
  int rci;
  // NOTE(review): API_RLOCK / API_UNLOCK are project macros defined elsewhere;
  // presumably they take and drop a shared lock on the storage and may return
  // early on failure - confirm against their definitions.
  API_RLOCK(iwkv, rci);
  // `state` is invoked on a local copy of the fsm structure
  IWFS_FSM fsm = iwkv->fsm;
  iwrc rc = fsm.state(&fsm, out);
  API_UNLOCK(iwkv, rci, rc);
  return rc;
}
3281
iwkv_online_backup(IWKV iwkv,uint64_t * ts,const char * target_file)3282 iwrc iwkv_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
3283 return iwal_online_backup(iwkv, ts, target_file);
3284 }
3285
_iwkv_check_online_backup(const char * path,iwp_lockmode extra_lock_flags,bool * out_has_online_bkp)3286 static iwrc _iwkv_check_online_backup(const char *path, iwp_lockmode extra_lock_flags, bool *out_has_online_bkp) {
3287 size_t sp;
3288 uint32_t lv;
3289 off_t fsz, pos;
3290 uint64_t waloff; // WAL offset
3291 char buf[16384];
3292
3293 *out_has_online_bkp = false;
3294 const size_t aunit = iwp_alloc_unit();
3295 char *wpath = 0;
3296
3297 IWFS_FILE f = {0}, w = {0};
3298 IWFS_FILE_STATE fs, fw;
3299 iwrc rc = iwfs_file_open(&f, &(IWFS_FILE_OPTS) {
3300 .path = path,
3301 .omode = IWFS_OREAD | IWFS_OWRITE,
3302 .lock_mode = IWP_WLOCK | extra_lock_flags
3303 });
3304 if (rc == IW_ERROR_NOT_EXISTS) {
3305 return 0;
3306 }
3307 RCRET(rc);
3308
3309 rc = f.state(&f, &fs);
3310 RCGO(rc, finish);
3311
3312 rc = iwp_lseek(fs.fh, 0, IWP_SEEK_END, &fsz);
3313 RCGO(rc, finish);
3314 if (fsz < iwp_alloc_unit()) {
3315 goto finish;
3316 }
3317
3318 rc = iwp_pread(fs.fh, 0, &lv, sizeof(lv), &sp);
3319 RCGO(rc, finish);
3320 lv = IW_ITOHL(lv);
3321 if (sp != sizeof(lv) || lv != IWFSM_MAGICK) {
3322 goto finish;
3323 }
3324
3325 rc = iwp_pread(fs.fh, IWFSM_CUSTOM_HDR_DATA_OFFSET, &lv, sizeof(lv), &sp);
3326 RCGO(rc, finish);
3327 lv = IW_ITOHL(lv);
3328 if (sp != sizeof(lv) || lv != IWKV_MAGIC) {
3329 goto finish;
3330 }
3331
3332 rc = iwp_lseek(fs.fh, (off_t) -1 * sizeof(lv), IWP_SEEK_END, 0);
3333 RCGO(rc, finish);
3334
3335 rc = iwp_read(fs.fh, &lv, sizeof(lv), &sp);
3336 RCGO(rc, finish);
3337 lv = IW_ITOHL(lv);
3338 if (sp != sizeof(lv) || lv != IWKV_BACKUP_MAGIC) {
3339 goto finish;
3340 }
3341
3342 // Get WAL data offset
3343 rc = iwp_lseek(fs.fh, (off_t) -1 * (sizeof(waloff) + sizeof(lv)), IWP_SEEK_END, &pos);
3344 RCGO(rc, finish);
3345
3346 rc = iwp_read(fs.fh, &waloff, sizeof(waloff), &sp);
3347 RCGO(rc, finish);
3348
3349 waloff = IW_ITOHLL(waloff);
3350 if ((waloff != pos && waloff > pos - sizeof(WBSEP)) || (waloff & (aunit - 1))) {
3351 goto finish;
3352 }
3353
3354 // Read the first WAL instruction: WBSEP
3355 if (waloff != pos) { // Not an empty WAL?
3356 WBSEP wbsep = {0};
3357 rc = iwp_pread(fs.fh, waloff, &wbsep, sizeof(wbsep), &sp);
3358 RCGO(rc, finish);
3359 if (wbsep.id != WOP_SEP) {
3360 goto finish;
3361 }
3362 }
3363
3364 // Now we have an online backup image, unpack WAL file
3365
3366 sp = strlen(path);
3367 wpath = malloc(sp + 4 /*-wal*/ + 1 /*\0*/);
3368 if (!wpath) {
3369 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3370 goto finish;
3371 }
3372 memcpy(wpath, path, sp);
3373 memcpy(wpath + sp, "-wal", 4);
3374 wpath[sp + 4] = '\0';
3375
3376 iwlog_warn("Unpacking WAL from online backup into: %s", wpath);
3377 *out_has_online_bkp = true;
3378
3379 // WAL file
3380 rc = iwfs_file_open(&w, &(IWFS_FILE_OPTS) {
3381 .path = wpath,
3382 .omode = IWFS_OREAD | IWFS_OWRITE | IWFS_OTRUNC
3383 });
3384 RCGO(rc, finish);
3385
3386 rc = w.state(&w, &fw);
3387 RCGO(rc, finish);
3388
3389 // WAL content copy
3390 rc = iwp_lseek(fs.fh, waloff, IWP_SEEK_SET, 0);
3391 RCGO(rc, finish);
3392 fsz = fsz - waloff - sizeof(lv) /* magic */ - sizeof(waloff) /* wal offset */;
3393 if (fsz > 0) {
3394 sp = 0;
3395 do {
3396 rc = iwp_read(fs.fh, buf, sizeof(buf), &sp);
3397 RCGO(rc, finish);
3398 if (sp > fsz) {
3399 sp = fsz;
3400 }
3401 fsz -= sp;
3402 rc = iwp_write(fw.fh, buf, sp);
3403 RCGO(rc, finish);
3404 } while (fsz > 0 && sp > 0);
3405 }
3406 rc = iwp_fsync(fw.fh);
3407 RCGO(rc, finish);
3408
3409 rc = iwp_ftruncate(fs.fh, waloff);
3410 RCGO(rc, finish);
3411
3412 rc = iwp_fsync(fs.fh);
3413 RCGO(rc, finish);
3414
3415 finish:
3416 if (f.impl) {
3417 IWRC(f.close(&f), rc);
3418 }
3419 if (w.impl) {
3420 IWRC(w.close(&w), rc);
3421 }
3422 free(wpath);
3423 return rc;
3424 }
3425
iwkv_open(const IWKV_OPTS * opts,IWKV * iwkvp)3426 iwrc iwkv_open(const IWKV_OPTS *opts, IWKV *iwkvp) {
3427 if (!opts || !iwkvp || !opts->path) {
3428 return IW_ERROR_INVALID_ARGS;
3429 }
3430 *iwkvp = 0;
3431 int rci;
3432 iwrc rc = 0;
3433 uint32_t lv;
3434 uint64_t llv;
3435 uint8_t *rp, *mm;
3436 bool has_online_bkp = false;
3437
3438 rc = iw_init();
3439 RCRET(rc);
3440
3441 if (opts->random_seed) {
3442 iwu_rand_seed(opts->random_seed);
3443 }
3444 iwkv_openflags oflags = opts->oflags;
3445 iwfs_omode omode = IWFS_OREAD;
3446 if (oflags & IWKV_TRUNC) {
3447 oflags &= ~IWKV_RDONLY;
3448 omode |= IWFS_OTRUNC;
3449 }
3450 if (!(oflags & IWKV_RDONLY)) {
3451 omode |= IWFS_OWRITE;
3452 omode |= IWFS_OCREATE;
3453 }
3454 if ((omode & IWFS_OWRITE) && !(omode & IWFS_OTRUNC)) {
3455 iwp_lockmode extra_lock_flags = 0;
3456 if (opts->file_lock_fail_fast) {
3457 extra_lock_flags |= IWP_NBLOCK;
3458 }
3459 rc = _iwkv_check_online_backup(opts->path, extra_lock_flags, &has_online_bkp);
3460 RCRET(rc);
3461 }
3462
3463 *iwkvp = calloc(1, sizeof(struct _IWKV));
3464 if (!*iwkvp) {
3465 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
3466 }
3467 IWKV iwkv = *iwkvp;
3468 iwkv->fmt_version = opts->fmt_version > 0 ? opts->fmt_version : IWKV_FORMAT;
3469 if (iwkv->fmt_version > IWKV_FORMAT) {
3470 rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3471 iwlog_ecode_error3(rc);
3472 return rc;
3473 }
3474 // Adjust lower key len accourding to database format version
3475 if (iwkv->fmt_version < 2) {
3476 iwkv->pklen = PREFIX_KEY_LEN_V1;
3477 } else {
3478 iwkv->pklen = PREFIX_KEY_LEN_V2;
3479 }
3480
3481 pthread_rwlockattr_t attr;
3482 pthread_rwlockattr_init(&attr);
3483 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
3484 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3485 #endif
3486 rci = pthread_rwlock_init(&iwkv->rwl, &attr);
3487 if (rci) {
3488 free(*iwkvp);
3489 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3490 }
3491 rci = pthread_mutex_init(&iwkv->wk_mtx, 0);
3492 if (rci) {
3493 pthread_rwlock_destroy(&iwkv->rwl);
3494 free(*iwkvp);
3495 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3496 }
3497 rci = pthread_cond_init(&iwkv->wk_cond, 0);
3498 if (rci) {
3499 pthread_rwlock_destroy(&iwkv->rwl);
3500 pthread_mutex_destroy(&iwkv->wk_mtx);
3501 free(*iwkvp);
3502 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3503 }
3504
3505 iwkv->oflags = oflags;
3506 IWFS_FSM_STATE fsmstate;
3507 IWFS_FSM_OPTS fsmopts = {
3508 .exfile = {
3509 .file = {
3510 .path = opts->path,
3511 .omode = omode,
3512 .lock_mode = (oflags & IWKV_RDONLY) ? IWP_RLOCK : IWP_WLOCK
3513 },
3514 .rspolicy = _szpolicy,
3515 .maxoff = IWKV_MAX_DBSZ,
3516 .use_locks = true
3517 },
3518 .bpow = IWKV_FSM_BPOW, // 64 bytes block size
3519 .hdrlen = KVHDRSZ, // Size of custom file header
3520 .oflags = ((oflags & IWKV_RDONLY) ? IWFSM_NOLOCKS : 0),
3521 .mmap_all = true
3522 };
3523 #ifndef NDEBUG
3524 fsmopts.oflags |= IWFSM_STRICT;
3525 #endif
3526 if (oflags & IWKV_NO_TRIM_ON_CLOSE) {
3527 fsmopts.oflags |= IWFSM_NO_TRIM_ON_CLOSE;
3528 }
3529 if (opts->file_lock_fail_fast) {
3530 fsmopts.exfile.file.lock_mode |= IWP_NBLOCK;
3531 }
3532 // Init WAL
3533 rc = iwal_create(iwkv, opts, &fsmopts, has_online_bkp);
3534 RCGO(rc, finish);
3535
3536 // Now open database file
3537 rc = iwfs_fsmfile_open(&iwkv->fsm, &fsmopts);
3538 RCGO(rc, finish);
3539
3540 IWFS_FSM *fsm = &iwkv->fsm;
3541 iwkv->dbs = kh_init(DBS);
3542 rc = fsm->state(fsm, &fsmstate);
3543 RCGO(rc, finish);
3544
3545 // Database header: [magic:u4, first_addr:u8, db_format_version:u4]
3546 if (fsmstate.exfile.file.ostatus & IWFS_OPEN_NEW) {
3547 uint8_t hdr[KVHDRSZ] = {0};
3548 uint8_t *wp = hdr;
3549 IW_WRITELV(wp, lv, IWKV_MAGIC);
3550 wp += sizeof(llv); // skip first db addr
3551 IW_WRITELV(wp, lv, iwkv->fmt_version);
3552 rc = fsm->writehdr(fsm, 0, hdr, sizeof(hdr));
3553 RCGO(rc, finish);
3554 rc = fsm->sync(fsm, 0);
3555 RCGO(rc, finish);
3556 } else {
3557 off_t dbaddr; // first database address
3558 uint8_t hdr[KVHDRSZ];
3559 rc = fsm->readhdr(fsm, 0, hdr, KVHDRSZ);
3560 RCGO(rc, finish);
3561 rp = hdr; // -V507
3562 IW_READLV(rp, lv, lv);
3563 IW_READLLV(rp, llv, dbaddr);
3564 if (lv != IWKV_MAGIC || dbaddr < 0) {
3565 rc = IWKV_ERROR_CORRUPTED;
3566 iwlog_ecode_error3(rc);
3567 goto finish;
3568 }
3569 IW_READLV(rp, lv, iwkv->fmt_version);
3570 if ((iwkv->fmt_version > IWKV_FORMAT)) {
3571 rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3572 iwlog_ecode_error3(rc);
3573 goto finish;
3574 }
3575 if (iwkv->fmt_version < 2) {
3576 iwkv->pklen = PREFIX_KEY_LEN_V1;
3577 } else {
3578 iwkv->pklen = PREFIX_KEY_LEN_V2;
3579 }
3580 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3581 RCGO(rc, finish);
3582 rc = _db_load_chain(iwkv, dbaddr, mm);
3583 fsm->release_mmap(fsm);
3584 }
3585 (*iwkvp)->open = true;
3586
3587 finish:
3588 if (rc) {
3589 (*iwkvp)->open = true; // will be closed in iwkv_close
3590 IWRC(iwkv_close(iwkvp), rc);
3591 }
3592 return rc;
3593 }
3594
iwkv_exclusive_lock(IWKV iwkv)3595 iwrc iwkv_exclusive_lock(IWKV iwkv) {
3596 return _wnw(iwkv, _wnw_iwkw_wl);
3597 }
3598
iwkv_exclusive_unlock(IWKV iwkv)3599 iwrc iwkv_exclusive_unlock(IWKV iwkv) {
3600 int rci;
3601 iwrc rc = 0;
3602 API_UNLOCK(iwkv, rci, rc);
3603 return rc;
3604 }
3605
iwkv_close(IWKV * iwkvp)3606 iwrc iwkv_close(IWKV *iwkvp) {
3607 ENSURE_OPEN((*iwkvp));
3608 IWKV iwkv = *iwkvp;
3609 iwkv->open = false;
3610 iwal_shutdown(iwkv);
3611 iwrc rc = iwkv_exclusive_lock(iwkv);
3612 RCRET(rc);
3613 IWDB db = iwkv->first_db;
3614 while (db) {
3615 IWDB ndb = db->next;
3616 _db_release_lw(&db);
3617 db = ndb;
3618 }
3619 IWRC(iwkv->fsm.close(&iwkv->fsm), rc);
3620 // Below the memory cleanup only
3621 if (iwkv->dbs) {
3622 kh_destroy(DBS, iwkv->dbs);
3623 iwkv->dbs = 0;
3624 }
3625 iwkv_exclusive_unlock(iwkv);
3626 pthread_rwlock_destroy(&iwkv->rwl);
3627 pthread_mutex_destroy(&iwkv->wk_mtx);
3628 pthread_cond_destroy(&iwkv->wk_cond);
3629 free(iwkv);
3630 *iwkvp = 0;
3631 return rc;
3632 }
3633
_iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3634 static iwrc _iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3635 ENSURE_OPEN(iwkv);
3636 if (iwkv->oflags & IWKV_RDONLY) {
3637 return IW_ERROR_READONLY;
3638 }
3639 iwrc rc;
3640 if (iwkv->dlsnr) {
3641 rc = iwal_poke_savepoint(iwkv);
3642 } else {
3643 IWFS_FSM *fsm = &iwkv->fsm;
3644 pthread_rwlock_wrlock(&iwkv->rwl);
3645 iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3646 rc = fsm->sync(fsm, flags);
3647 pthread_rwlock_unlock(&iwkv->rwl);
3648 }
3649 return rc;
3650 }
3651
iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3652 iwrc iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3653 ENSURE_OPEN(iwkv);
3654 if (iwkv->oflags & IWKV_RDONLY) {
3655 return IW_ERROR_READONLY;
3656 }
3657 iwrc rc;
3658 if (iwkv->dlsnr) {
3659 rc = iwkv_exclusive_lock(iwkv);
3660 RCRET(rc);
3661 rc = iwal_savepoint_exl(iwkv, true);
3662 iwkv_exclusive_unlock(iwkv);
3663 } else {
3664 IWFS_FSM *fsm = &iwkv->fsm;
3665 pthread_rwlock_wrlock(&iwkv->rwl);
3666 iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3667 rc = fsm->sync(fsm, flags);
3668 pthread_rwlock_unlock(&iwkv->rwl);
3669 }
3670 return rc;
3671 }
3672
iwkv_db(IWKV iwkv,uint32_t dbid,iwdb_flags_t dbflg,IWDB * dbp)3673 iwrc iwkv_db(IWKV iwkv, uint32_t dbid, iwdb_flags_t dbflg, IWDB *dbp) {
3674 int rci;
3675 iwrc rc = 0;
3676 IWDB db = 0;
3677 *dbp = 0;
3678 API_RLOCK(iwkv, rci);
3679 khiter_t ki = kh_get(DBS, iwkv->dbs, dbid);
3680 if (ki != kh_end(iwkv->dbs)) {
3681 db = kh_value(iwkv->dbs, ki);
3682 }
3683 API_UNLOCK(iwkv, rci, rc);
3684 RCRET(rc);
3685 if (db) {
3686 if (db->dbflg != dbflg) {
3687 return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3688 }
3689 *dbp = db;
3690 return 0;
3691 }
3692 if (iwkv->oflags & IWKV_RDONLY) {
3693 return IW_ERROR_READONLY;
3694 }
3695 rc = iwkv_exclusive_lock(iwkv);
3696 RCRET(rc);
3697 ki = kh_get(DBS, iwkv->dbs, dbid);
3698 if (ki != kh_end(iwkv->dbs)) {
3699 db = kh_value(iwkv->dbs, ki);
3700 }
3701 if (db) {
3702 if (db->dbflg != dbflg) {
3703 return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3704 }
3705 *dbp = db;
3706 } else {
3707 rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3708 }
3709 if (!rc) {
3710 rc = iwal_savepoint_exl(iwkv, true);
3711 }
3712 iwkv_exclusive_unlock(iwkv);
3713 return rc;
3714 }
3715
iwkv_new_db(IWKV iwkv,iwdb_flags_t dbflg,uint32_t * dbidp,IWDB * dbp)3716 iwrc iwkv_new_db(IWKV iwkv, iwdb_flags_t dbflg, uint32_t *dbidp, IWDB *dbp) {
3717 *dbp = 0;
3718 *dbidp = 0;
3719 if (iwkv->oflags & IWKV_RDONLY) {
3720 return IW_ERROR_READONLY;
3721 }
3722 uint32_t dbid = 0;
3723 iwrc rc = iwkv_exclusive_lock(iwkv);
3724 RCRET(rc);
3725 for (khiter_t k = kh_begin(iwkv->dbs); k != kh_end(iwkv->dbs); ++k) {
3726 if (!kh_exist(iwkv->dbs, k)) continue;
3727 uint32_t id = kh_key(iwkv->dbs, k);
3728 if (id > dbid) dbid = id;
3729 }
3730 dbid++;
3731 rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3732 if (!rc) {
3733 *dbidp = dbid;
3734 rc = iwal_savepoint_exl(iwkv, true);
3735 }
3736 iwkv_exclusive_unlock(iwkv);
3737 return rc;
3738 }
3739
iwkv_db_cache_release(IWDB db)3740 iwrc iwkv_db_cache_release(IWDB db) {
3741 if (!db || !db->iwkv) {
3742 return IW_ERROR_INVALID_ARGS;
3743 }
3744 int rci;
3745 iwrc rc = 0;
3746 API_DB_WLOCK(db, rci);
3747 _dbcache_destroy_lw(db);
3748 API_DB_UNLOCK(db, rci, rc);
3749 return rc;
3750 }
3751
iwkv_db_destroy(IWDB * dbp)3752 iwrc iwkv_db_destroy(IWDB *dbp) {
3753 if (!dbp || !*dbp) {
3754 return IW_ERROR_INVALID_ARGS;
3755 }
3756 IWDB db = *dbp;
3757 IWKV iwkv = db->iwkv;
3758 *dbp = 0;
3759 if (iwkv->oflags & IWKV_RDONLY) {
3760 return IW_ERROR_READONLY;
3761 }
3762 iwrc rc = iwkv_exclusive_lock(iwkv);
3763 RCRET(rc);
3764 rc = _db_destroy_lw(&db);
3765 iwkv_exclusive_unlock(iwkv);
3766 return rc;
3767 }
3768
iwkv_puth(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3769 iwrc iwkv_puth(IWDB db, const IWKV_val *key, const IWKV_val *val,
3770 iwkv_opflags opflags, IWKV_PUT_HANDLER ph, void *phop) {
3771 if (!db || !db->iwkv || !key || !key->size || !val) {
3772 return IW_ERROR_INVALID_ARGS;
3773 }
3774 IWKV iwkv = db->iwkv;
3775 if (iwkv->oflags & IWKV_RDONLY) {
3776 return IW_ERROR_READONLY;
3777 }
3778 if (opflags & IWKV_VAL_INCREMENT) {
3779 // No overwrite for increment
3780 opflags &= ~IWKV_NO_OVERWRITE;
3781 }
3782
3783 int rci;
3784 IWKV_val ekey;
3785 uint8_t nbuf[IW_VNUMBUFSZ];
3786 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3787 RCRET(rc);
3788
3789 IWLCTX lx = {
3790 .db = db,
3791 .key = &ekey,
3792 .val = (IWKV_val *) val,
3793 .nlvl = -1,
3794 .op = IWLCTX_PUT,
3795 .opflags = opflags,
3796 .ph = ph,
3797 .phop = phop
3798 };
3799 API_DB_WLOCK(db, rci);
3800 if (!db->cache.open) {
3801 rc = _dbcache_fill_lw(&lx);
3802 RCGO(rc, finish);
3803 }
3804 rc = _lx_put_lw(&lx);
3805
3806 finish:
3807 API_DB_UNLOCK(db, rci, rc);
3808 if (!rc) {
3809 if (lx.opflags & IWKV_SYNC) {
3810 rc = _iwkv_sync(iwkv, 0);
3811 } else {
3812 rc = iwal_poke_checkpoint(iwkv, false);
3813 }
3814 }
3815 return rc;
3816 }
3817
iwkv_put(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags)3818 iwrc iwkv_put(IWDB db, const IWKV_val *key, const IWKV_val *val, iwkv_opflags opflags) {
3819 return iwkv_puth(db, key, val, opflags, 0, 0);
3820 }
3821
iwkv_get(IWDB db,const IWKV_val * key,IWKV_val * oval)3822 iwrc iwkv_get(IWDB db, const IWKV_val *key, IWKV_val *oval) {
3823 if (!db || !db->iwkv || !key || !oval) {
3824 return IW_ERROR_INVALID_ARGS;
3825 }
3826
3827 int rci;
3828 IWKV_val ekey;
3829 uint8_t nbuf[IW_VNUMBUFSZ];
3830 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3831 RCRET(rc);
3832
3833 IWLCTX lx = {
3834 .db = db,
3835 .key = &ekey,
3836 .val = oval,
3837 .nlvl = -1
3838 };
3839 oval->size = 0;
3840 if (IW_LIKELY(db->cache.open)) {
3841 API_DB_RLOCK(db, rci);
3842 } else {
3843 API_DB_WLOCK(db, rci);
3844 if (!db->cache.open) { // -V547
3845 rc = _dbcache_fill_lw(&lx);
3846 RCGO(rc, finish);
3847 }
3848 }
3849 rc = _lx_get_lr(&lx);
3850
3851 finish:
3852 API_DB_UNLOCK(db, rci, rc);
3853 return rc;
3854 }
3855
iwkv_get_copy(IWDB db,const IWKV_val * key,void * vbuf,size_t vbufsz,size_t * vsz)3856 iwrc iwkv_get_copy(IWDB db, const IWKV_val *key, void *vbuf, size_t vbufsz, size_t *vsz) {
3857 if (!db || !db->iwkv || !key || !vbuf) {
3858 return IW_ERROR_INVALID_ARGS;
3859 }
3860 *vsz = 0;
3861
3862 int rci;
3863 bool found;
3864 IWKV_val ekey;
3865 uint32_t ovalsz;
3866 uint8_t *mm = 0, *oval, idx;
3867 IWFS_FSM *fsm = &db->iwkv->fsm;
3868 uint8_t nbuf[IW_VNUMBUFSZ];
3869 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3870 RCRET(rc);
3871
3872 IWLCTX lx = {
3873 .db = db,
3874 .key = &ekey,
3875 .nlvl = -1
3876 };
3877 if (IW_LIKELY(db->cache.open)) {
3878 API_DB_RLOCK(db, rci);
3879 } else {
3880 API_DB_WLOCK(db, rci);
3881 if (!db->cache.open) { // -V547
3882 rc = _dbcache_fill_lw(&lx);
3883 RCGO(rc, finish);
3884 }
3885 }
3886 rc = _lx_find_bounds(&lx);
3887 RCGO(rc, finish);
3888 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3889 RCGO(rc, finish);
3890 rc = _sblk_loadkvblk_mm(&lx, lx.lower, mm);
3891 RCGO(rc, finish);
3892 rc = _sblk_find_pi_mm(lx.lower, &lx, mm, &found, &idx);
3893 RCGO(rc, finish);
3894 if (found) {
3895 _kvblk_value_peek(lx.lower->kvblk, lx.lower->pi[idx], mm, &oval, &ovalsz);
3896 *vsz = ovalsz;
3897 memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3898 } else {
3899 rc = IWKV_ERROR_NOTFOUND;
3900 }
3901
3902 finish:
3903 if (mm) {
3904 IWRC(fsm->release_mmap(fsm), rc);
3905 }
3906 _lx_release_mm(&lx, 0);
3907 API_DB_UNLOCK(db, rci, rc);
3908 return rc;
3909 }
3910
iwkv_db_set_meta(IWDB db,void * buf,size_t sz)3911 iwrc iwkv_db_set_meta(IWDB db, void *buf, size_t sz) {
3912 if (!db || !db->iwkv || !buf) {
3913 return IW_ERROR_INVALID_ARGS;
3914 }
3915 if (!sz) {
3916 return 0;
3917 }
3918
3919 int rci;
3920 iwrc rc = 0;
3921 bool resized = false;
3922 uint8_t *mm = 0, *wp, *sp;
3923 IWFS_FSM *fsm = &db->iwkv->fsm;
3924 size_t asz = IW_ROUNDUP(sz, 1U << IWKV_FSM_BPOW);
3925
3926 API_DB_WLOCK(db, rci);
3927 if (asz > db->meta_blkn || asz * 2 <= db->meta_blkn) {
3928 off_t oaddr = 0;
3929 off_t olen = 0;
3930 if (db->meta_blk) {
3931 rc = fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn));
3932 RCGO(rc, finish);
3933 }
3934 rc = fsm->allocate(fsm, asz, &oaddr, &olen, IWKV_FSM_ALLOC_FLAGS);
3935 RCGO(rc, finish);
3936 db->meta_blk = ADDR2BLK(oaddr);
3937 db->meta_blkn = ADDR2BLK(olen);
3938 resized = true;
3939 }
3940 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3941 RCGO(rc, finish);
3942 wp = mm + BLK2ADDR(db->meta_blk);
3943 memcpy(wp, buf, sz);
3944 if (db->iwkv->dlsnr) {
3945 rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, wp - mm, wp, sz, 0);
3946 RCGO(rc, finish);
3947 }
3948 if (resized) {
3949 uint32_t lv;
3950 wp = mm + db->addr + DOFF_METABLK_U4;
3951 sp = wp;
3952 IW_WRITELV(wp, lv, db->meta_blk);
3953 IW_WRITELV(wp, lv, db->meta_blkn);
3954 if (db->iwkv->dlsnr) {
3955 rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, sp - mm, sp, wp - sp, 0);
3956 RCGO(rc, finish);
3957 }
3958 }
3959 fsm->release_mmap(fsm);
3960 mm = 0;
3961
3962 finish:
3963 if (mm) {
3964 fsm->release_mmap(fsm);
3965 }
3966 API_DB_UNLOCK(db, rci, rc);
3967 return rc;
3968 }
3969
iwkv_db_get_meta(IWDB db,void * buf,size_t sz,size_t * rsz)3970 iwrc iwkv_db_get_meta(IWDB db, void *buf, size_t sz, size_t *rsz) {
3971 if (!db || !db->iwkv || !buf) {
3972 return IW_ERROR_INVALID_ARGS;
3973 }
3974 *rsz = 0;
3975 if (!sz || !db->meta_blkn) {
3976 return 0;
3977 }
3978 int rci;
3979 iwrc rc = 0;
3980 uint8_t *mm = 0;
3981 IWFS_FSM *fsm = &db->iwkv->fsm;
3982 size_t rmax = BLK2ADDR(db->meta_blkn);
3983 if (sz > rmax) {
3984 sz = rmax;
3985 }
3986 API_DB_RLOCK(db, rci);
3987 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3988 RCGO(rc, finish);
3989 memcpy(buf, mm + BLK2ADDR(db->meta_blk), sz);
3990 *rsz = sz;
3991
3992 finish:
3993 if (mm) {
3994 fsm->release_mmap(fsm);
3995 }
3996 API_DB_UNLOCK(db, rci, rc);
3997 return rc;
3998 }
3999
iwkv_del(IWDB db,const IWKV_val * key,iwkv_opflags opflags)4000 iwrc iwkv_del(IWDB db, const IWKV_val *key, iwkv_opflags opflags) {
4001 if (!db || !db->iwkv || !key) {
4002 return IW_ERROR_INVALID_ARGS;
4003 }
4004 int rci;
4005 IWKV_val ekey;
4006 IWKV iwkv = db->iwkv;
4007
4008 uint8_t nbuf[IW_VNUMBUFSZ];
4009 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
4010 RCRET(rc);
4011 IWLCTX lx = {
4012 .db = db,
4013 .key = &ekey,
4014 .nlvl = -1,
4015 .op = IWLCTX_DEL,
4016 .opflags = opflags
4017 };
4018 API_DB_WLOCK(db, rci);
4019 if (!db->cache.open) {
4020 rc = _dbcache_fill_lw(&lx);
4021 RCGO(rc, finish);
4022 }
4023 rc = _lx_del_lw(&lx);
4024
4025 finish:
4026 API_DB_UNLOCK(db, rci, rc);
4027 if (!rc) {
4028 if (lx.opflags & IWKV_SYNC) {
4029 rc = _iwkv_sync(iwkv, 0);
4030 } else {
4031 rc = iwal_poke_checkpoint(iwkv, false);
4032 }
4033 }
4034 return rc;
4035 }
4036
_cursor_close_lw(IWKV_cursor cur)4037 IW_INLINE iwrc _cursor_close_lw(IWKV_cursor cur) {
4038 iwrc rc = 0;
4039 cur->closed = true;
4040 IWDB db = cur->lx.db;
4041 pthread_spin_lock(&db->cursors_slk);
4042 for (IWKV_cursor c = db->cursors, pc = 0; c; pc = c, c = c->next) {
4043 if (c == cur) {
4044 if (pc) {
4045 pc->next = c->next;
4046 } else {
4047 db->cursors = c->next;
4048 }
4049 break;
4050 }
4051 }
4052 pthread_spin_unlock(&db->cursors_slk);
4053 return rc;
4054 }
4055
iwkv_cursor_open(IWDB db,IWKV_cursor * curptr,IWKV_cursor_op op,const IWKV_val * key)4056 iwrc iwkv_cursor_open(IWDB db,
4057 IWKV_cursor *curptr,
4058 IWKV_cursor_op op,
4059 const IWKV_val *key) {
4060 if (!db || !db->iwkv || !curptr ||
4061 (key && op < IWKV_CURSOR_EQ) || op < IWKV_CURSOR_BEFORE_FIRST) {
4062 return IW_ERROR_INVALID_ARGS;
4063 }
4064 iwrc rc;
4065 int rci;
4066 rc = _db_worker_inc_nolk(db);
4067 RCRET(rc);
4068 if (IW_LIKELY(db->cache.open)) {
4069 rc = _api_db_rlock(db);
4070 } else {
4071 rc = _api_db_wlock(db);
4072 }
4073 if (rc) {
4074 _db_worker_dec_nolk(db);
4075 return rc;
4076 }
4077 IWKV_cursor cur = 0;
4078 *curptr = calloc(1, sizeof(**curptr));
4079 if (!(*curptr)) {
4080 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
4081 goto finish;
4082 }
4083 cur = *curptr;
4084 IWLCTX *lx = &cur->lx;
4085 if (key) {
4086 rc = _to_effective_key(db, key, &lx->ekey, lx->nbuf);
4087 RCGO(rc, finish);
4088 lx->key = &lx->ekey;
4089 }
4090 lx->db = db;
4091 lx->nlvl = -1;
4092 if (!db->cache.open) {
4093 rc = _dbcache_fill_lw(lx);
4094 RCGO(rc, finish);
4095 }
4096 rc = _cursor_to_lr(cur, op);
4097
4098 finish:
4099 if (cur) {
4100 if (rc) {
4101 *curptr = 0;
4102 IWRC(_cursor_close_lw(cur), rc);
4103 free(cur);
4104 } else {
4105 pthread_spin_lock(&db->cursors_slk);
4106 cur->next = db->cursors;
4107 db->cursors = cur;
4108 pthread_spin_unlock(&db->cursors_slk);
4109 }
4110 }
4111 API_DB_UNLOCK(db, rci, rc);
4112 if (rc) {
4113 _db_worker_dec_nolk(db);
4114 }
4115 return rc;
4116 }
4117
iwkv_cursor_close(IWKV_cursor * curp)4118 iwrc iwkv_cursor_close(IWKV_cursor *curp) {
4119 iwrc rc = 0;
4120 int rci;
4121 if (!curp || !*curp) {
4122 return 0;
4123 }
4124 IWKV_cursor cur = *curp;
4125 *curp = 0;
4126 IWKV iwkv = cur->lx.db->iwkv;
4127 if (cur->closed) {
4128 free(cur);
4129 return 0;
4130 }
4131 if (!cur->lx.db) {
4132 return IW_ERROR_INVALID_ARGS;
4133 }
4134 API_DB_WLOCK(cur->lx.db, rci);
4135 rc = _cursor_close_lw(cur);
4136 API_DB_UNLOCK(cur->lx.db, rci, rc);
4137 IWRC(_db_worker_dec_nolk(cur->lx.db), rc);
4138 free(cur);
4139 if (!rc) {
4140 rc = iwal_poke_checkpoint(iwkv, false);
4141 }
4142 return rc;
4143 }
4144
iwkv_cursor_to(IWKV_cursor cur,IWKV_cursor_op op)4145 iwrc iwkv_cursor_to(IWKV_cursor cur, IWKV_cursor_op op) {
4146 int rci;
4147 if (!cur) {
4148 return IW_ERROR_INVALID_ARGS;
4149 }
4150 if (!cur->lx.db) {
4151 return IW_ERROR_INVALID_ARGS;
4152 }
4153 API_DB_RLOCK(cur->lx.db, rci);
4154 iwrc rc = _cursor_to_lr(cur, op);
4155 API_DB_UNLOCK(cur->lx.db, rci, rc);
4156 return rc;
4157 }
4158
iwkv_cursor_to_key(IWKV_cursor cur,IWKV_cursor_op op,const IWKV_val * key)4159 iwrc iwkv_cursor_to_key(IWKV_cursor cur, IWKV_cursor_op op, const IWKV_val *key) {
4160 int rci;
4161 if (!cur || (op != IWKV_CURSOR_EQ && op != IWKV_CURSOR_GE)) {
4162 return IW_ERROR_INVALID_ARGS;
4163 }
4164 IWLCTX *lx = &cur->lx;
4165 if (!lx->db) {
4166 return IW_ERROR_INVALID_STATE;
4167 }
4168 iwrc rc = _to_effective_key(lx->db, key, &lx->ekey, lx->nbuf);
4169 RCRET(rc);
4170
4171 API_DB_RLOCK(lx->db, rci);
4172 lx->key = &lx->ekey;
4173 rc = _cursor_to_lr(cur, op);
4174 API_DB_UNLOCK(lx->db, rci, rc);
4175 return rc;
4176 }
4177
iwkv_cursor_get(IWKV_cursor cur,IWKV_val * okey,IWKV_val * oval)4178 iwrc iwkv_cursor_get(IWKV_cursor cur,
4179 IWKV_val *okey, /* Nullable */
4180 IWKV_val *oval) { /* Nullable */
4181 int rci;
4182 iwrc rc = 0;
4183 if (!cur || !cur->lx.db) {
4184 return IW_ERROR_INVALID_ARGS;
4185 }
4186 if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4187 return IWKV_ERROR_NOTFOUND;
4188 }
4189 IWLCTX *lx = &cur->lx;
4190 API_DB_RLOCK(lx->db, rci);
4191 uint8_t *mm = 0;
4192 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4193 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4194 RCGO(rc, finish);
4195 if (!cur->cn->kvblk) {
4196 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4197 RCGO(rc, finish);
4198 }
4199 uint8_t idx = cur->cn->pi[cur->cnpos];
4200 if (okey && oval) {
4201 rc = _kvblk_kv_get(cur->cn->kvblk, mm, idx, okey, oval);
4202 } else if (oval) {
4203 rc = _kvblk_value_get(cur->cn->kvblk, mm, idx, oval);
4204 } else if (okey) {
4205 rc = _kvblk_key_get(cur->cn->kvblk, mm, idx, okey);
4206 } else {
4207 rc = IW_ERROR_INVALID_ARGS;
4208 }
4209 if (!rc && okey) {
4210 _unpack_effective_key(lx->db, okey, false);
4211 }
4212 finish:
4213 if (mm) {
4214 fsm->release_mmap(fsm);
4215 }
4216 API_DB_UNLOCK(lx->db, rci, rc);
4217 return rc;
4218 }
4219
iwkv_cursor_copy_val(IWKV_cursor cur,void * vbuf,size_t vbufsz,size_t * vsz)4220 iwrc iwkv_cursor_copy_val(IWKV_cursor cur, void *vbuf, size_t vbufsz, size_t *vsz) {
4221 int rci;
4222 iwrc rc = 0;
4223 if (!cur || !vbuf || !cur->lx.db) {
4224 return IW_ERROR_INVALID_ARGS;
4225 }
4226 if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4227 return IWKV_ERROR_NOTFOUND;
4228 }
4229
4230 *vsz = 0;
4231 IWLCTX *lx = &cur->lx;
4232 API_DB_RLOCK(lx->db, rci);
4233 uint8_t *mm = 0, *oval;
4234 uint32_t ovalsz;
4235 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4236 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4237 RCGO(rc, finish);
4238 if (!cur->cn->kvblk) {
4239 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4240 RCGO(rc, finish);
4241 }
4242 uint8_t idx = cur->cn->pi[cur->cnpos];
4243 _kvblk_value_peek(cur->cn->kvblk, idx, mm, &oval, &ovalsz);
4244 *vsz = ovalsz;
4245 memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
4246
4247 finish:
4248 if (mm) {
4249 fsm->release_mmap(fsm);
4250 }
4251 API_DB_UNLOCK(lx->db, rci, rc);
4252 return rc;
4253 }
4254
iwkv_cursor_is_matched_key(IWKV_cursor cur,const IWKV_val * key,bool * ores,int64_t * ocompound)4255 iwrc iwkv_cursor_is_matched_key(IWKV_cursor cur, const IWKV_val *key, bool *ores, int64_t *ocompound) {
4256 int rci;
4257 iwrc rc = 0;
4258 if (!cur || !ores || !key || !cur->lx.db) {
4259 return IW_ERROR_INVALID_ARGS;
4260 }
4261 if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4262 return IWKV_ERROR_NOTFOUND;
4263 }
4264
4265 *ores = 0;
4266 if (ocompound) *ocompound = 0;
4267
4268 IWLCTX *lx = &cur->lx;
4269 API_DB_RLOCK(lx->db, rci);
4270 uint8_t *mm = 0, *okey;
4271 uint32_t okeysz;
4272 iwdb_flags_t dbflg = lx->db->dbflg;
4273 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4274 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4275 RCGO(rc, finish);
4276 if (!cur->cn->kvblk) {
4277 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4278 RCGO(rc, finish);
4279 }
4280
4281 uint8_t idx = cur->cn->pi[cur->cnpos];
4282 rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4283 RCGO(rc, finish);
4284
4285 if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4286 char nbuf[2 * IW_VNUMBUFSZ];
4287 IWKV_val rkey = {.data = nbuf, .size = okeysz};
4288 memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4289 rc = _unpack_effective_key(lx->db, &rkey, true);
4290 RCGO(rc, finish);
4291 if (ocompound) {
4292 *ocompound = rkey.compound;
4293 }
4294 if (rkey.size != key->size) {
4295 *ores = false;
4296 goto finish;
4297 }
4298 if (dbflg & IWDB_VNUM64_KEYS) {
4299 *ores = !memcmp(rkey.data, key->data, key->size);
4300 } else {
4301 *ores = !memcmp(okey + (okeysz - rkey.size), key->data, key->size);
4302 }
4303 } else {
4304 *ores = (okeysz == key->size) && !memcmp(okey, key->data, key->size);
4305 }
4306
4307 finish:
4308 if (mm) {
4309 fsm->release_mmap(fsm);
4310 }
4311 API_DB_UNLOCK(cur->lx.db, rci, rc);
4312 return rc;
4313 }
4314
iwkv_cursor_copy_key(IWKV_cursor cur,void * kbuf,size_t kbufsz,size_t * ksz,int64_t * compound)4315 iwrc iwkv_cursor_copy_key(IWKV_cursor cur, void *kbuf, size_t kbufsz, size_t *ksz, int64_t *compound) {
4316 int rci;
4317 iwrc rc = 0;
4318 if (!cur || !cur->lx.db) {
4319 return IW_ERROR_INVALID_ARGS;
4320 }
4321 if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4322 return IWKV_ERROR_NOTFOUND;
4323 }
4324
4325 *ksz = 0;
4326 IWLCTX *lx = &cur->lx;
4327 API_DB_RLOCK(lx->db, rci);
4328 uint8_t *mm = 0, *okey;
4329 uint32_t okeysz;
4330 iwdb_flags_t dbflg = lx->db->dbflg;
4331 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
4332 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4333 RCGO(rc, finish);
4334 if (!cur->cn->kvblk) {
4335 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
4336 RCGO(rc, finish);
4337 }
4338
4339 uint8_t idx = cur->cn->pi[cur->cnpos];
4340 rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
4341 RCGO(rc, finish);
4342
4343 if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
4344 char nbuf[2 * IW_VNUMBUFSZ];
4345 IWKV_val rkey = {.data = nbuf, .size = okeysz};
4346 memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
4347 rc = _unpack_effective_key(lx->db, &rkey, true);
4348 RCGO(rc, finish);
4349 if (compound) {
4350 *compound = rkey.compound;
4351 }
4352 *ksz = rkey.size;
4353 if (dbflg & IWDB_VNUM64_KEYS) {
4354 memcpy(kbuf, rkey.data, MIN(kbufsz, rkey.size));
4355 } else {
4356 memcpy(kbuf, okey + (okeysz - rkey.size), MIN(kbufsz, rkey.size));
4357 }
4358 } else {
4359 *ksz = okeysz;
4360 if (compound) *compound = 0;
4361 memcpy(kbuf, okey, MIN(kbufsz, okeysz));
4362 }
4363
4364 finish:
4365 if (mm) {
4366 fsm->release_mmap(fsm);
4367 }
4368 API_DB_UNLOCK(cur->lx.db, rci, rc);
4369 return rc;
4370 }
4371
iwkv_cursor_seth(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)4372 IW_EXPORT iwrc iwkv_cursor_seth(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags,
4373 IWKV_PUT_HANDLER ph, void *phop) {
4374 int rci;
4375 iwrc rc = 0, irc = 0;
4376 if (!cur || !cur->lx.db) {
4377 return IW_ERROR_INVALID_ARGS;
4378 }
4379 if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4380 return IWKV_ERROR_NOTFOUND;
4381 }
4382
4383 IWLCTX *lx = &cur->lx;
4384 IWDB db = lx->db;
4385 IWKV iwkv = db->iwkv;
4386 SBLK *sblk = cur->cn;
4387
4388 API_DB_WLOCK(db, rci);
4389 if (ph) {
4390 uint8_t *mm;
4391 IWKV_val key, oldval;
4392 IWFS_FSM *fsm = &db->iwkv->fsm;
4393 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4394 RCGO(rc, finish);
4395 rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key, &oldval);
4396 fsm->release_mmap(fsm);
4397 if (!rc) {
4398 // note: oldval should be disposed by ph
4399 rc = ph(&key, val, &oldval, phop);
4400 _kv_val_dispose(&key);
4401 }
4402 RCGO(rc, finish);
4403 }
4404
4405 rc = _sblk_updatekv(sblk, cur->cnpos, 0, val);
4406 if (IWKV_IS_INTERNAL_RC(rc)) {
4407 irc = rc;
4408 rc = 0;
4409 }
4410 RCGO(rc, finish);
4411
4412 rc = _sblk_sync(lx, sblk);
4413 RCGO(rc, finish);
4414
4415 // Update active cursors inside this block
4416 pthread_spin_lock(&db->cursors_slk);
4417 for (IWKV_cursor c = db->cursors; c; c = c->next) {
4418 if (c->cn && c->cn->addr == sblk->addr) {
4419 if (c->cn != sblk) {
4420 memcpy(c->cn, sblk, sizeof(*c->cn));
4421 c->cn->kvblk = 0;
4422 c->cn->flags &= SBLK_PERSISTENT_FLAGS;
4423 }
4424 }
4425 }
4426 pthread_spin_unlock(&db->cursors_slk);
4427
4428 finish:
4429 API_DB_UNLOCK(db, rci, rc);
4430 if (!rc) {
4431 if (opflags & IWKV_SYNC) {
4432 rc = _iwkv_sync(iwkv, 0);
4433 } else {
4434 rc = iwal_poke_checkpoint(iwkv, false);
4435 }
4436 }
4437 return rc ? rc : irc;
4438 }
4439
iwkv_cursor_set(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags)4440 iwrc iwkv_cursor_set(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags) {
4441 return iwkv_cursor_seth(cur, val, opflags, 0, 0);
4442 }
4443
iwkv_cursor_val(IWKV_cursor cur,IWKV_val * oval)4444 iwrc iwkv_cursor_val(IWKV_cursor cur, IWKV_val *oval) {
4445 return iwkv_cursor_get(cur, 0, oval);
4446 }
4447
iwkv_cursor_key(IWKV_cursor cur,IWKV_val * okey)4448 iwrc iwkv_cursor_key(IWKV_cursor cur, IWKV_val *okey) {
4449 return iwkv_cursor_get(cur, okey, 0);
4450 }
4451
iwkv_cursor_del(IWKV_cursor cur,iwkv_opflags opflags)4452 iwrc iwkv_cursor_del(IWKV_cursor cur, iwkv_opflags opflags) {
4453 int rci;
4454 iwrc rc = 0;
4455 if (!cur || !cur->lx.db) {
4456 return IW_ERROR_INVALID_ARGS;
4457 }
4458 if (!cur->cn || (cur->cn->flags & SBLK_DB) || cur->cnpos >= cur->cn->pnum) {
4459 return IWKV_ERROR_NOTFOUND;
4460 }
4461
4462 uint8_t *mm;
4463 SBLK *sblk = cur->cn;
4464 IWLCTX *lx = &cur->lx;
4465 IWDB db = lx->db;
4466 IWKV iwkv = db->iwkv;
4467 IWFS_FSM *fsm = &iwkv->fsm;
4468
4469 API_DB_WLOCK(db, rci);
4470 if (!db->cache.open) {
4471 rc = _dbcache_fill_lw(lx);
4472 RCGO(rc, finish);
4473 }
4474 if (sblk->pnum == 1) { // sblk will be removed
4475 IWKV_val key = {0};
4476 // Key a key
4477 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4478 RCGO(rc, finish2);
4479 if (!sblk->kvblk) {
4480 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4481 fsm->release_mmap(fsm);
4482 RCGO(rc, finish2);
4483 }
4484 rc = _kvblk_key_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key);
4485 fsm->release_mmap(fsm);
4486 RCGO(rc, finish2);
4487
4488 lx->key = &key;
4489 rc = _lx_del_sblk_lw(lx, sblk, cur->cnpos);
4490 lx->key = 0;
4491
4492 finish2:
4493 if (rc) {
4494 _lx_release_mm(lx, 0);
4495 } else {
4496 rc = _lx_release(lx);
4497 }
4498 if (key.data) {
4499 _kv_val_dispose(&key);
4500 }
4501 } else { // Simple case
4502 if (!sblk->kvblk) {
4503 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4504 RCGO(rc, finish);
4505 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4506 fsm->release_mmap(fsm);
4507 RCGO(rc, finish);
4508 }
4509 rc = _sblk_rmkv(sblk, cur->cnpos);
4510 RCGO(rc, finish);
4511 rc = _sblk_sync(lx, sblk);
4512 }
4513
4514 finish:
4515 API_DB_UNLOCK(db, rci, rc);
4516 if (!rc) {
4517 if (opflags & IWKV_SYNC) {
4518 rc = _iwkv_sync(iwkv, 0);
4519 } else {
4520 rc = iwal_poke_checkpoint(iwkv, false);
4521 }
4522 }
4523 return rc;
4524 }
4525
4526 #include "./dbg/iwkvdbg.c"
4527