1 // -V::512
2
3 #include "iwkv_internal.h"
4 #include "iwconv.h"
5 #include <stdalign.h>
6
// Alias: `_wnw_db_wl(db)` expands to `_api_db_wlock(db)`.
#define _wnw_db_wl(db_) _api_db_wlock(db_)

#ifdef IW_TESTS
// Test-only global, compiled in only when IW_TESTS is defined; starts at -1.
// NOTE(review): presumably overrides the level picked for the next block — confirm at the use site.
volatile int8_t iwkv_next_level = -1;
#endif
// Global atomic 64-bit value; it is not referenced in this part of the file.
atomic_uint_fast64_t g_trigger;

// True when `rc_` lies strictly between `_IWKV_ERROR_END` and `_IWKV_RC_END`.
#define IWKV_IS_INTERNAL_RC(rc_) ((rc_) > _IWKV_ERROR_END && (rc_) < _IWKV_RC_END)
15
_to_effective_key(struct _IWDB * db,const IWKV_val * key,IWKV_val * okey,uint8_t nbuf[static IW_VNUMBUFSZ])16 IW_SOFT_INLINE iwrc _to_effective_key(
17 struct _IWDB *db, const IWKV_val *key, IWKV_val *okey,
18 uint8_t nbuf[static IW_VNUMBUFSZ]
19 ) {
20 static_assert(IW_VNUMBUFSZ >= sizeof(uint64_t), "IW_VNUMBUFSZ >= sizeof(uint64_t)");
21 iwdb_flags_t dbflg = db->dbflg;
22 // Keys compound will be processed at lower levels at `addkv` routines
23 okey->compound = key->compound;
24 if (dbflg & IWDB_VNUM64_KEYS) {
25 unsigned len;
26 if (key->size == 8) {
27 uint64_t llv;
28 memcpy(&llv, key->data, sizeof(llv));
29 IW_SETVNUMBUF64(len, nbuf, llv);
30 if (!len) {
31 return IW_ERROR_OVERFLOW;
32 }
33 okey->size = len;
34 okey->data = nbuf;
35 } else if (key->size == 4) {
36 uint32_t lv;
37 memcpy(&lv, key->data, sizeof(lv));
38 IW_SETVNUMBUF(len, nbuf, lv);
39 if (!len) {
40 return IW_ERROR_OVERFLOW;
41 }
42 okey->size = len;
43 okey->data = nbuf;
44 } else {
45 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
46 }
47 } else {
48 okey->data = key->data;
49 okey->size = key->size;
50 }
51 return 0;
52 }
53
54 // NOTE: at least `2*IW_VNUMBUFSZ` must be allocated for key->data
_unpack_effective_key(struct _IWDB * db,IWKV_val * key,bool no_move_key_data)55 static iwrc _unpack_effective_key(struct _IWDB *db, IWKV_val *key, bool no_move_key_data) {
56 iwdb_flags_t dbflg = db->dbflg;
57 uint8_t *data = key->data;
58 if (dbflg & IWDB_COMPOUND_KEYS) {
59 int step;
60 IW_READVNUMBUF64(key->data, key->compound, step);
61 if (step >= key->size) {
62 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
63 }
64 data += step;
65 key->size -= step;
66 if (!no_move_key_data && !(dbflg & IWDB_VNUM64_KEYS)) {
67 memmove(key->data, data, key->size);
68 }
69 } else {
70 key->compound = 0;
71 }
72 if (dbflg & IWDB_VNUM64_KEYS) {
73 int64_t llv;
74 char nbuf[IW_VNUMBUFSZ];
75 if (key->size > IW_VNUMBUFSZ) {
76 return IWKV_ERROR_KEY_NUM_VALUE_SIZE;
77 }
78 memcpy(nbuf, data, key->size);
79 IW_READVNUMBUF64_2(nbuf, llv);
80 memcpy(key->data, &llv, sizeof(llv));
81 key->size = sizeof(llv);
82 }
83 return 0;
84 }
85
// Compares the stored key bytes `v1`/`v1len` with the lookup key `key`
// according to the database mode flags `dbflg`.
//
// - IWDB_VNUM64_KEYS: both sides are decoded as variable length numbers;
//   when lengths differ or exceed IW_VNUMBUFSZ the length difference is returned.
// - IWDB_REALNUM_KEYS: delegated to `iwafcmp()`.
// - otherwise: byte-wise comparison over the common length (IW_CMP2).
// - IWDB_COMPOUND_KEYS: the leading variable length number of `v1` is skipped
//   before comparing; for numeric/real keys it is used as a tie-breaker
//   against `key->compound`.
static int _cmp_keys_prefix(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
  int res;
  const int klen = (int) key->size;

  if (!(dbflg & IWDB_COMPOUND_KEYS)) {
    const void *kdata = key->data;
    if (dbflg & IWDB_VNUM64_KEYS) {
      if ((klen != v1len) || (klen > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
        return klen - v1len;
      }
      int64_t lhs, rhs;
      char tmp[IW_VNUMBUFSZ];
      memcpy(tmp, v1, v1len);
      IW_READVNUMBUF64_2(tmp, lhs);
      memcpy(tmp, kdata, klen);
      IW_READVNUMBUF64_2(tmp, rhs);
      if (lhs > rhs) {
        return -1;
      }
      return lhs < rhs ? 1 : 0;
    }
    if (dbflg & IWDB_REALNUM_KEYS) {
      return iwafcmp(kdata, klen, v1, v1len);
    }
    IW_CMP2(res, kdata, klen, v1, v1len);
    return res;
  }

  // Compound keys mode
  const char *stored = v1;
  const char *probe = key->data;
  int skip;
  int64_t stored_cmp;
  const int64_t probe_cmp = key->compound;
  IW_READVNUMBUF64(v1, stored_cmp, skip);
  v1len -= skip;
  stored += skip;
  if (v1len < 1) {
    // Inconsistent data?
    return klen - v1len;
  }
  if (dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS)) {
    if (dbflg & IWDB_VNUM64_KEYS) {
      if ((klen != v1len) || (klen > IW_VNUMBUFSZ) || (v1len > IW_VNUMBUFSZ)) {
        return klen - v1len;
      }
      int64_t lhs, rhs;
      char tmp[IW_VNUMBUFSZ];
      memcpy(tmp, stored, v1len);
      IW_READVNUMBUF64_2(tmp, lhs);
      memcpy(tmp, probe, klen);
      IW_READVNUMBUF64_2(tmp, rhs);
      res = lhs > rhs ? -1 : lhs < rhs ? 1 : 0;
    } else {
      res = iwafcmp(probe, klen, stored, v1len);
    }
    if (res == 0) {
      // Equal primary parts: order by the compound component
      res = stored_cmp > probe_cmp ? -1 : stored_cmp < probe_cmp ? 1 : 0;
    }
  } else {
    IW_CMP2(res, probe, klen, stored, v1len);
  }
  return res;
}
146
_cmp_keys(iwdb_flags_t dbflg,const void * v1,int v1len,const IWKV_val * key)147 IW_INLINE int _cmp_keys(iwdb_flags_t dbflg, const void *v1, int v1len, const IWKV_val *key) {
148 int rv = _cmp_keys_prefix(dbflg, v1, v1len, key);
149 if ((rv == 0) && !(dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
150 if (dbflg & IWDB_COMPOUND_KEYS) {
151 int step;
152 int64_t c1, c2 = key->compound;
153 IW_READVNUMBUF64(v1, c1, step);
154 v1len -= step;
155 if ((int) key->size == v1len) {
156 return c1 > c2 ? -1 : c1 < c2 ? 1 : 0;
157 }
158 }
159 return (int) key->size - v1len;
160 } else {
161 return rv;
162 }
163 }
164
_kv_val_dispose(IWKV_val * v)165 IW_INLINE void _kv_val_dispose(IWKV_val *v) {
166 if (v) {
167 free(v->data);
168 v->size = 0;
169 v->data = 0;
170 }
171 }
172
_kv_dispose(IWKV_val * key,IWKV_val * val)173 IW_INLINE void _kv_dispose(IWKV_val *key, IWKV_val *val) {
174 _kv_val_dispose(key);
175 _kv_val_dispose(val);
176 }
177
// Public entry point: releases the data buffer of `v` and zeroes its
// `data`/`size` fields. Thin wrapper over `_kv_val_dispose()`.
void iwkv_val_dispose(IWKV_val *v) {
  _kv_val_dispose(v);
  return;
}
181
// Public entry point: releases the data buffers of both `key` and `val`.
// Thin wrapper over `_kv_dispose()`.
void iwkv_kv_dispose(IWKV_val *key, IWKV_val *val) {
  _kv_dispose(key, val);
  return;
}
185
_num2lebuf(uint8_t buf[static8],void * numdata,size_t sz)186 IW_INLINE void _num2lebuf(uint8_t buf[static 8], void *numdata, size_t sz) {
187 assert(sz == 4 || sz == 8);
188 if (sz > 4) {
189 uint64_t llv;
190 memcpy(&llv, numdata, sizeof(llv));
191 llv = IW_HTOILL(llv);
192 memcpy(buf, &llv, sizeof(llv));
193 } else {
194 uint32_t lv;
195 memcpy(&lv, numdata, sizeof(lv));
196 lv = IW_HTOIL(lv);
197 memcpy(buf, &lv, sizeof(lv));
198 }
199 }
200
201 //-------------------------- IWKV/IWDB WORKERS
202
_iwkv_worker_inc_nolk(IWKV iwkv)203 static WUR iwrc _iwkv_worker_inc_nolk(IWKV iwkv) {
204 if (!iwkv || !iwkv->open) {
205 return IW_ERROR_INVALID_STATE;
206 }
207 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
208 if (rci) {
209 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
210 }
211 if (!iwkv->open) { // -V547
212 pthread_mutex_unlock(&iwkv->wk_mtx);
213 return IW_ERROR_INVALID_STATE;
214 }
215 while (iwkv->wk_pending_exclusive) {
216 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
217 }
218 ++iwkv->wk_count;
219 pthread_cond_broadcast(&iwkv->wk_cond);
220 pthread_mutex_unlock(&iwkv->wk_mtx);
221 return 0;
222 }
223
_db_worker_inc_nolk(IWDB db)224 static WUR iwrc _db_worker_inc_nolk(IWDB db) {
225 if (!db || !db->iwkv || !db->iwkv->open || !db->open) {
226 return IW_ERROR_INVALID_STATE;
227 }
228 IWKV iwkv = db->iwkv;
229 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
230 if (rci) {
231 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
232 }
233 if (!iwkv->open || !db->open) { // -V560
234 pthread_mutex_unlock(&iwkv->wk_mtx);
235 return IW_ERROR_INVALID_STATE;
236 }
237 while (db->wk_pending_exclusive) {
238 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
239 }
240 ++iwkv->wk_count;
241 ++db->wk_count;
242 pthread_cond_broadcast(&iwkv->wk_cond);
243 pthread_mutex_unlock(&iwkv->wk_mtx);
244 return 0;
245 }
246
_iwkv_worker_dec_nolk(IWKV iwkv)247 static iwrc _iwkv_worker_dec_nolk(IWKV iwkv) {
248 if (!iwkv) {
249 return IW_ERROR_INVALID_STATE;
250 }
251 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
252 if (rci) {
253 // Last chanсe to be consistent
254 --iwkv->wk_count;
255 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
256 }
257 --iwkv->wk_count;
258 pthread_cond_broadcast(&iwkv->wk_cond);
259 pthread_mutex_unlock(&iwkv->wk_mtx);
260 return 0;
261 }
262
_db_worker_dec_nolk(IWDB db)263 static iwrc _db_worker_dec_nolk(IWDB db) {
264 if (!db || !db->iwkv) { // do not use ENSURE_OPEN_DB here
265 return IW_ERROR_INVALID_STATE;
266 }
267 IWKV iwkv = db->iwkv;
268 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
269 if (rci) {
270 // Last chanсe to be consistent
271 --iwkv->wk_count;
272 --db->wk_count;
273 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
274 }
275 --iwkv->wk_count;
276 --db->wk_count;
277 pthread_cond_broadcast(&iwkv->wk_cond);
278 pthread_mutex_unlock(&iwkv->wk_mtx);
279 return 0;
280 }
281
_wnw_iwkw_wl(IWKV iwkv)282 static WUR iwrc _wnw_iwkw_wl(IWKV iwkv) {
283 int rci = pthread_rwlock_wrlock(&iwkv->rwl);
284 if (rci) {
285 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
286 }
287 return 0;
288 }
289
_wnw(IWKV iwkv,iwrc (* after)(IWKV iwkv))290 static WUR iwrc _wnw(IWKV iwkv, iwrc (*after)(IWKV iwkv)) {
291 iwrc rc = 0;
292 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
293 if (rci) {
294 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
295 }
296 iwkv->wk_pending_exclusive = true;
297 while (iwkv->wk_count > 0) {
298 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
299 }
300 if (after) {
301 rc = after(iwkv);
302 }
303 iwkv->wk_pending_exclusive = false;
304 pthread_cond_broadcast(&iwkv->wk_cond);
305 rci = pthread_mutex_unlock(&iwkv->wk_mtx);
306 if (rci) {
307 IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
308 }
309 return rc;
310 }
311
_wnw_db(IWDB db,iwrc (* after)(IWDB db))312 static WUR iwrc _wnw_db(IWDB db, iwrc (*after)(IWDB db)) {
313 iwrc rc = 0;
314 IWKV iwkv = db->iwkv;
315 int rci = pthread_mutex_lock(&iwkv->wk_mtx);
316 if (rci) {
317 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
318 }
319 db->wk_pending_exclusive = true;
320 while (db->wk_count > 0) {
321 pthread_cond_wait(&iwkv->wk_cond, &iwkv->wk_mtx);
322 }
323 if (after) {
324 rc = after(db);
325 }
326 db->wk_pending_exclusive = false;
327 pthread_cond_broadcast(&iwkv->wk_cond);
328 rci = pthread_mutex_unlock(&iwkv->wk_mtx);
329 if (rci) {
330 IWRC(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), rc);
331 }
332 return rc;
333 }
334
335 //-------------------------- DB
336
_db_at(IWKV iwkv,IWDB * dbp,off_t addr,uint8_t * mm)337 static WUR iwrc _db_at(IWKV iwkv, IWDB *dbp, off_t addr, uint8_t *mm) {
338 iwrc rc = 0;
339 uint8_t *rp, bv;
340 uint32_t lv;
341 int rci;
342 IWDB db = calloc(1, sizeof(struct _IWDB));
343 *dbp = 0;
344 if (!db) {
345 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
346 }
347 pthread_rwlockattr_t attr;
348 pthread_rwlockattr_init(&attr);
349 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
350 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
351 #endif
352 rci = pthread_rwlock_init(&db->rwl, &attr);
353 if (rci) {
354 free(db);
355 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
356 }
357 rci = pthread_spin_init(&db->cursors_slk, 0);
358 if (rci) {
359 pthread_rwlock_destroy(&db->rwl);
360 free(db);
361 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
362 }
363 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
364 db->flags = SBLK_DB;
365 db->addr = addr;
366 db->db = db;
367 db->iwkv = iwkv;
368 rp = mm + addr;
369 IW_READLV(rp, lv, lv);
370 if (lv != IWDB_MAGIC) {
371 rc = IWKV_ERROR_CORRUPTED;
372 iwlog_ecode_error3(rc);
373 goto finish;
374 }
375 IW_READBV(rp, bv, db->dbflg);
376 IW_READLV(rp, lv, db->id);
377 IW_READLV(rp, lv, db->next_db_addr);
378 db->next_db_addr = BLK2ADDR(db->next_db_addr); // blknum -> addr
379 rp = mm + addr + DOFF_C0_U4;
380 for (int i = 0; i < SLEVELS; ++i) {
381 IW_READLV(rp, lv, db->lcnt[i]);
382 }
383 if (iwkv->fmt_version >= 1) {
384 IW_READLV(rp, lv, db->meta_blk);
385 IW_READLV(rp, lv, db->meta_blkn);
386 }
387 db->open = true;
388 *dbp = db;
389
390 finish:
391 if (rc) {
392 pthread_rwlock_destroy(&db->rwl);
393 free(db);
394 }
395 return rc;
396 }
397
_db_save(IWDB db,bool newdb,uint8_t * mm)398 static WUR iwrc _db_save(IWDB db, bool newdb, uint8_t *mm) {
399 iwrc rc = 0;
400 uint32_t lv;
401 uint8_t *wp = mm + db->addr, bv;
402 uint8_t *sp = wp;
403 IWDLSNR *dlsnr = db->iwkv->dlsnr;
404 db->next_db_addr = db->next ? db->next->addr : 0;
405 // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
406 IW_WRITELV(wp, lv, IWDB_MAGIC);
407 IW_WRITEBV(wp, bv, db->dbflg);
408 IW_WRITELV(wp, lv, db->id);
409 IW_WRITELV(wp, lv, ADDR2BLK(db->next_db_addr));
410 if (dlsnr) {
411 rc = dlsnr->onwrite(dlsnr, db->addr, sp, wp - sp, 0);
412 RCRET(rc);
413 }
414 if (db->iwkv->fmt_version >= 1) {
415 if (newdb) {
416 memset(wp, 0, 4 + SLEVELS * 4 * 2); // p0 + n[24] + c[24]
417 sp = wp;
418 wp += 4 + SLEVELS * 4 * 2; // set to zero
419 } else {
420 wp += 4 + SLEVELS * 4 * 2; // skip
421 sp = wp;
422 }
423 IW_WRITELV(wp, lv, db->meta_blk);
424 IW_WRITELV(wp, lv, db->meta_blkn);
425 if (dlsnr) {
426 rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
427 }
428 }
429 return rc;
430 }
431
_db_load_chain(IWKV iwkv,off_t addr,uint8_t * mm)432 static WUR iwrc _db_load_chain(IWKV iwkv, off_t addr, uint8_t *mm) {
433 iwrc rc;
434 IWDB db = 0, ndb;
435 if (!addr) {
436 return 0;
437 }
438 do {
439 rc = _db_at(iwkv, &ndb, addr, mm);
440 RCRET(rc);
441
442 if (db) {
443 db->next = ndb;
444 ndb->prev = db;
445 } else {
446 iwkv->first_db = ndb;
447 }
448 db = ndb;
449 addr = db->next_db_addr;
450
451 rc = iwhmap_put_u32(iwkv->dbs, db->id, db);
452 RCRET(rc);
453
454 iwkv->last_db = db;
455 } while (db->next_db_addr);
456
457 return 0;
458 }
459
// Destroys the locks of the in-memory handle `*dbp`, frees the handle and
// sets `*dbp` to NULL. Storage blocks on disk are not touched.
static void _db_release_lw(IWDB *dbp) {
  assert(dbp && *dbp);
  IWDB victim = *dbp;
  *dbp = 0;
  pthread_rwlock_destroy(&victim->rwl);
  pthread_spin_destroy(&victim->cursors_slk);
  free(victim);
}
468
// Arguments for `_db_dispose_chain()`.
typedef struct DISPOSE_DB_CTX {
  IWKV   iwkv; // Store whose free-space manager receives the deallocations
  IWDB   db;   // In-memory database handle; released (and set to NULL) by `_db_dispose_chain()`
  blkn_t sbn;  // First `SBLK` block in DB
} DISPOSE_DB_CTX;
474
// Walks the chain of `SBLK` blocks starting at `dctx->sbn` and hands their
// storage, together with the storage of the `KVBLK` referenced by each of
// them, back to the free-space manager. Afterwards the in-memory handle
// `dctx->db` is released; this happens even when the walk stopped on an error.
//
// For `fmt_version > 1` blocks are not freed one by one: the start address of
// the page containing a block is derived from its `bpos` value and a page is
// deallocated once the walk moves on to a different page (and once more after
// the loop for the last page seen).
//
// Returns the first error that interrupted the walk, or 0.
static iwrc _db_dispose_chain(DISPOSE_DB_CTX *dctx) {
  iwrc rc = 0;
  uint8_t *mm, kvszpow;
  IWFS_FSM *fsm = &dctx->iwkv->fsm;
  blkn_t sbn = dctx->sbn, kvblkn;
  off_t page = 0; // Start address of the page currently tracked (fmt_version > 1 only)

  while (sbn) {
    off_t sba = BLK2ADDR(sbn);
    rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
    RCBREAK(rc);
    // Read everything needed from the block while the mapping is held:
    // the KVBLK block number and the number of the block visited next.
    memcpy(&kvblkn, mm + sba + SOFF_KBLK_U4, 4);
    kvblkn = IW_ITOHL(kvblkn);
    memcpy(&sbn, mm + sba + SOFF_N0_U4, 4);
    sbn = IW_ITOHL(sbn);
    if (kvblkn) {
      // Size power of the KVBLK; only read (and later used) when a KVBLK is attached
      memcpy(&kvszpow, mm + BLK2ADDR(kvblkn) + KBLK_SZPOW_OFF, 1);
    }
    if (dctx->iwkv->fmt_version > 1) {
      uint8_t bpos;
      memcpy(&bpos, mm + sba + SOFF_BPOS_U1_V2, 1);
      rc = fsm->release_mmap(fsm);
      RCBREAK(rc);
      if ((bpos > 0) && (bpos <= SBLK_PAGE_SBLK_NUM_V2)) {
        // `bpos` is 1-based: step back to the first block of the page
        off_t npage = sba - (bpos - 1) * SBLK_SZ;
        if (npage != page) {
          if (page) {
            // Deallocate the previous page only when check_allocation_status() returns 0
            if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
              rc = fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2);
            }
            RCBREAK(rc);
          }
          page = npage;
        }
      }
    } else {
      rc = fsm->release_mmap(fsm);
      RCBREAK(rc);
      // Deallocate `SBLK`
      rc = fsm->deallocate(fsm, sba, SBLK_SZ);
      RCBREAK(rc);
    }
    // Deallocate `KVBLK`
    if (kvblkn) {
      rc = fsm->deallocate(fsm, BLK2ADDR(kvblkn), 1ULL << kvszpow);
      RCBREAK(rc);
    }
  }
  // Last tracked page, same condition as inside the loop; keeps an earlier error in `rc`
  if (page) {
    if (!fsm->check_allocation_status(fsm, page, SBLK_PAGE_SZ_V2, true)) {
      IWRC(fsm->deallocate(fsm, page, SBLK_PAGE_SZ_V2), rc);
    }
  }
  _db_release_lw(&dctx->db);
  return rc;
}
531
// Removes database `*dbp` from the store and frees all of its storage.
//
// Steps, in order:
//  1. the database is removed from the `iwkv->dbs` map (IW_ERROR_INVALID_STATE
//     if it is not registered there);
//  2. its neighbours in the database list are linked to each other and their
//     headers are saved;
//  3. the number of the first `SBLK` is read from the database header;
//  4. `first_db` / `last_db` are updated; when the first database changes, the
//     new first address (or 0) is written into the file header;
//  5. the block chain, the meta blocks and the header block are deallocated.
//
// The in-memory handle is freed by `_db_dispose_chain()`; `*dbp` itself is not
// modified by this function. Errors from step 2 return early; later errors
// are accumulated and the first one is returned.
static WUR iwrc _db_destroy_lw(IWDB *dbp) {
  iwrc rc;
  uint8_t *mm;
  IWDB db = *dbp;
  IWKV iwkv = db->iwkv;
  IWDB prev = db->prev;
  IWDB next = db->next;
  IWFS_FSM *fsm = &iwkv->fsm;
  uint32_t first_sblkn;

  if (!iwhmap_get_u32(iwkv->dbs, db->id)) {
    iwlog_ecode_error3(IW_ERROR_INVALID_STATE);
    return IW_ERROR_INVALID_STATE;
  }
  iwhmap_remove_u32(iwkv->dbs, db->id);

  rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
  RCRET(rc);
  // Unlink from the list; `_db_save()` persists the new next-database reference of `prev`
  if (prev) {
    prev->next = next;
    rc = _db_save(prev, false, mm);
    if (rc) {
      fsm->release_mmap(fsm);
      return rc;
    }
  }
  if (next) {
    next->prev = prev;
    rc = _db_save(next, false, mm);
    if (rc) {
      fsm->release_mmap(fsm);
      return rc;
    }
  }
  // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
  // Remember where the block chain starts before the mapping is released
  memcpy(&first_sblkn, mm + db->addr + DOFF_N0_U4, 4);
  first_sblkn = IW_ITOHL(first_sblkn);
  fsm->release_mmap(fsm);

  if (iwkv->first_db && (iwkv->first_db->addr == db->addr)) {
    uint64_t llv;
    db->iwkv->first_db = next;
    llv = next ? (uint64_t) next->addr : 0;
    llv = IW_HTOILL(llv);
    rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
  }
  if (iwkv->last_db && (iwkv->last_db->addr == db->addr)) {
    iwkv->last_db = prev;
  }
  // Cleanup DB
  // Copy what is needed below: `db` is freed inside `_db_dispose_chain()`
  off_t db_addr = db->addr;
  blkn_t meta_blk = db->meta_blk;
  blkn_t meta_blkn = db->meta_blkn;
  db->open = false;

  DISPOSE_DB_CTX dctx = {
    .sbn  = first_sblkn,
    .iwkv = iwkv,
    .db   = db
  };
  IWRC(_db_dispose_chain(&dctx), rc);
  if (meta_blk && meta_blkn) {
    IWRC(fsm->deallocate(fsm, BLK2ADDR(meta_blk), BLK2ADDR(meta_blkn)), rc);
  }
  IWRC(fsm->deallocate(fsm, db_addr, DB_SZ), rc);
  return rc;
}
599
_db_create_lw(IWKV iwkv,dbid_t dbid,iwdb_flags_t dbflg,IWDB * odb)600 static WUR iwrc _db_create_lw(IWKV iwkv, dbid_t dbid, iwdb_flags_t dbflg, IWDB *odb) {
601 iwrc rc;
602 int rci;
603 uint8_t *mm = 0;
604 off_t baddr = 0, blen;
605 IWFS_FSM *fsm = &iwkv->fsm;
606 *odb = 0;
607 IWDB db = calloc(1, sizeof(struct _IWDB));
608 if (!db) {
609 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
610 }
611 pthread_rwlockattr_t attr;
612 pthread_rwlockattr_init(&attr);
613 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
614 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
615 #endif
616 rci = pthread_rwlock_init(&db->rwl, &attr);
617 if (rci) {
618 free(db);
619 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
620 }
621 rci = pthread_spin_init(&db->cursors_slk, 0);
622 if (rci) {
623 pthread_rwlock_destroy(&db->rwl);
624 free(db);
625 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
626 }
627 rc = fsm->allocate(fsm, DB_SZ, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
628 if (rc) {
629 _db_release_lw(&db);
630 return rc;
631 }
632 db->iwkv = iwkv;
633 db->dbflg = dbflg;
634 db->addr = baddr;
635 db->id = dbid;
636 db->prev = iwkv->last_db;
637 if (!iwkv->first_db) {
638 uint64_t llv;
639 iwkv->first_db = db;
640 llv = (uint64_t) db->addr;
641 llv = IW_HTOILL(llv);
642 rc = fsm->writehdr(fsm, sizeof(uint32_t) /*skip magic*/, &llv, sizeof(llv));
643 } else if (iwkv->last_db) {
644 iwkv->last_db->next = db;
645 }
646
647 RCC(rc, finish, iwhmap_put_u32(iwkv->dbs, db->id, db));
648 iwkv->last_db = db;
649
650 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
651 RCGO(rc, finish);
652 rc = _db_save(db, true, mm);
653 RCGO(rc, finish);
654 if (db->prev) {
655 rc = _db_save(db->prev, false, mm);
656 RCGO(rc, finish);
657 }
658 db->open = true;
659 *odb = db;
660
661 finish:
662 if (mm) {
663 fsm->release_mmap(fsm);
664 }
665 if (rc) {
666 fsm->deallocate(fsm, baddr, blen);
667 _db_release_lw(&db);
668 }
669 return rc;
670 }
671
672 //-------------------------- KVBLK
673
_kvblk_create(IWLCTX * lx,off_t baddr,uint8_t kvbpow,KVBLK ** oblk)674 IW_INLINE void _kvblk_create(IWLCTX *lx, off_t baddr, uint8_t kvbpow, KVBLK **oblk) {
675 KVBLK *kblk = &lx->kaa[lx->kaan];
676 kblk->db = lx->db;
677 kblk->addr = baddr;
678 kblk->maxoff = 0;
679 kblk->idxsz = 2 * IW_VNUMSIZE(0) * KVBLK_IDXNUM;
680 kblk->zidx = 0;
681 kblk->szpow = kvbpow;
682 kblk->flags = KVBLK_DURTY;
683 memset(kblk->pidx, 0, sizeof(kblk->pidx));
684 *oblk = kblk;
685 AAPOS_INC(lx->kaan);
686 }
687
_kvblk_key_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)688 IW_INLINE WUR iwrc _kvblk_key_peek(
689 const KVBLK *kb,
690 uint8_t idx, const uint8_t *mm, uint8_t **obuf,
691 uint32_t *olen
692 ) {
693 if (kb->pidx[idx].len) {
694 uint32_t klen, step;
695 const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
696 IW_READVNUMBUF(rp, klen, step);
697 if (!klen) {
698 *obuf = 0;
699 *olen = 0;
700 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
701 return IWKV_ERROR_CORRUPTED;
702 }
703 rp += step;
704 *obuf = (uint8_t*) rp;
705 *olen = klen;
706 } else {
707 *obuf = 0;
708 *olen = 0;
709 }
710 return 0;
711 }
712
_kvblk_value_peek(const KVBLK * kb,uint8_t idx,const uint8_t * mm,uint8_t ** obuf,uint32_t * olen)713 IW_INLINE void _kvblk_value_peek(const KVBLK *kb, uint8_t idx, const uint8_t *mm, uint8_t **obuf, uint32_t *olen) {
714 assert(idx < KVBLK_IDXNUM);
715 if (kb->pidx[idx].len) {
716 uint32_t klen, step;
717 const uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kb->pidx[idx].off;
718 IW_READVNUMBUF(rp, klen, step);
719 rp += step;
720 rp += klen;
721 *obuf = (uint8_t*) rp;
722 *olen = kb->pidx[idx].len - klen - step;
723 } else {
724 *obuf = 0;
725 *olen = 0;
726 }
727 }
728
_kvblk_key_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key)729 static WUR iwrc _kvblk_key_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key) {
730 assert(mm && idx < KVBLK_IDXNUM);
731 int32_t klen;
732 int step;
733 KVP *kvp = &kb->pidx[idx];
734 key->compound = 0;
735 if (!kvp->len) {
736 key->data = 0;
737 key->size = 0;
738 return 0;
739 }
740 // [klen:vn,key,value]
741 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
742 IW_READVNUMBUF(rp, klen, step);
743 rp += step;
744 if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
745 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
746 return IWKV_ERROR_CORRUPTED;
747 }
748 key->size = (size_t) klen;
749 if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
750 // Needed to provide enough buffer in _unpack_effective_key()
751 key->data = malloc(MAX(key->size, sizeof(int64_t)));
752 } else {
753 key->data = malloc(key->size);
754 }
755 if (!key->data) {
756 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
757 }
758 memcpy(key->data, rp, key->size);
759 return 0;
760 }
761
_kvblk_value_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * val)762 static WUR iwrc _kvblk_value_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *val) {
763 assert(mm && idx < KVBLK_IDXNUM);
764 int32_t klen;
765 int step;
766 KVP *kvp = &kb->pidx[idx];
767 val->compound = 0;
768 if (!kvp->len) {
769 val->data = 0;
770 val->size = 0;
771 return 0;
772 }
773 // [klen:vn,key,value]
774 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
775 IW_READVNUMBUF(rp, klen, step);
776 rp += step;
777 if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
778 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
779 return IWKV_ERROR_CORRUPTED;
780 }
781 rp += klen;
782 if (kvp->len > klen + step) {
783 val->size = kvp->len - klen - step;
784 val->data = malloc(val->size);
785 if (!val->data) {
786 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
787 val->size = 0;
788 return rc;
789 }
790 memcpy(val->data, rp, val->size);
791 } else {
792 val->data = 0;
793 val->size = 0;
794 }
795 return 0;
796 }
797
_kvblk_kv_get(KVBLK * kb,uint8_t * mm,uint8_t idx,IWKV_val * key,IWKV_val * val)798 static WUR iwrc _kvblk_kv_get(KVBLK *kb, uint8_t *mm, uint8_t idx, IWKV_val *key, IWKV_val *val) {
799 assert(mm && idx < KVBLK_IDXNUM);
800 int32_t klen;
801 int step;
802 KVP *kvp = &kb->pidx[idx];
803 key->compound = 0;
804 val->compound = 0;
805 if (!kvp->len) {
806 key->data = 0;
807 key->size = 0;
808 val->data = 0;
809 val->size = 0;
810 return 0;
811 }
812 // [klen:vn,key,value]
813 uint8_t *rp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
814 IW_READVNUMBUF(rp, klen, step);
815 rp += step;
816 if ((klen < 1) || (klen > kvp->len) || (klen > kvp->off)) {
817 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
818 return IWKV_ERROR_CORRUPTED;
819 }
820 key->size = (size_t) klen;
821 if (kb->db->dbflg & IWDB_VNUM64_KEYS) {
822 // Needed to provide enough buffer in _unpack_effective_key()
823 key->data = malloc(MAX(key->size, sizeof(int64_t)));
824 } else {
825 key->data = malloc(key->size);
826 }
827 if (!key->data) {
828 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
829 }
830 memcpy(key->data, rp, key->size);
831 rp += klen;
832 if (kvp->len > klen + step) {
833 val->size = kvp->len - klen - step;
834 val->data = malloc(val->size);
835 if (!val->data) {
836 iwrc rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
837 free(key->data);
838 key->data = 0;
839 key->size = 0;
840 val->size = 0;
841 return rc;
842 }
843 memcpy(val->data, rp, val->size);
844 } else {
845 val->data = 0;
846 val->size = 0;
847 }
848 return 0;
849 }
850
_kvblk_at_mm(IWLCTX * lx,off_t addr,uint8_t * mm,KVBLK * kbp,KVBLK ** blkp)851 static WUR iwrc _kvblk_at_mm(IWLCTX *lx, off_t addr, uint8_t *mm, KVBLK *kbp, KVBLK **blkp) {
852 uint8_t *rp;
853 uint16_t sv;
854 int step;
855 iwrc rc = 0;
856 KVBLK *kb = kbp ? kbp : &lx->kaa[lx->kaan];
857 kb->db = lx->db;
858 kb->addr = addr;
859 kb->maxoff = 0;
860 kb->idxsz = 0;
861 kb->zidx = -1;
862 kb->szpow = 0;
863 kb->flags = KVBLK_DEFAULT;
864 memset(kb->pidx, 0, sizeof(kb->pidx));
865
866 *blkp = 0;
867 rp = mm + addr;
868 memcpy(&kb->szpow, rp, 1);
869 rp += 1;
870 IW_READSV(rp, sv, kb->idxsz);
871 if (IW_UNLIKELY(kb->idxsz > KVBLK_MAX_IDX_SZ)) {
872 rc = IWKV_ERROR_CORRUPTED;
873 iwlog_ecode_error3(rc);
874 goto finish;
875 }
876 for (uint8_t i = 0; i < KVBLK_IDXNUM; ++i) {
877 IW_READVNUMBUF64(rp, kb->pidx[i].off, step);
878 rp += step;
879 IW_READVNUMBUF(rp, kb->pidx[i].len, step);
880 rp += step;
881 if (kb->pidx[i].len) {
882 if (IW_UNLIKELY(!kb->pidx[i].off)) {
883 rc = IWKV_ERROR_CORRUPTED;
884 iwlog_ecode_error3(rc);
885 goto finish;
886 }
887 if (kb->pidx[i].off > kb->maxoff) {
888 kb->maxoff = kb->pidx[i].off;
889 }
890 } else if (kb->zidx < 0) {
891 kb->zidx = i;
892 }
893 kb->pidx[i].ridx = i;
894 }
895 *blkp = kb;
896 assert(rp - (mm + addr) <= (1ULL << kb->szpow));
897 if (!kbp) {
898 AAPOS_INC(lx->kaan);
899 }
900
901 finish:
902 return rc;
903 }
904
_kvblk_compacted_offset(KVBLK * kb)905 IW_INLINE off_t _kvblk_compacted_offset(KVBLK *kb) {
906 off_t coff = 0;
907 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
908 coff += kb->pidx[i].len;
909 }
910 return coff;
911 }
912
_kvblk_compacted_dsize(KVBLK * kb)913 IW_INLINE off_t _kvblk_compacted_dsize(KVBLK *kb) {
914 off_t coff = KVBLK_HDRSZ;
915 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
916 coff += kb->pidx[i].len;
917 coff += IW_VNUMSIZE32(kb->pidx[i].len);
918 coff += IW_VNUMSIZE(kb->pidx[i].off);
919 }
920 return coff;
921 }
922
_kvblk_sync_mm(KVBLK * kb,uint8_t * mm)923 static WUR iwrc _kvblk_sync_mm(KVBLK *kb, uint8_t *mm) {
924 iwrc rc = 0;
925 if (!(kb->flags & KVBLK_DURTY)) {
926 return rc;
927 }
928 uint16_t sp;
929 uint8_t *szp;
930 uint8_t *wp = mm + kb->addr;
931 uint8_t *sptr = wp;
932 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
933 memcpy(wp, &kb->szpow, 1);
934 wp += 1;
935 szp = wp;
936 wp += sizeof(uint16_t);
937 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
938 KVP *kvp = &kb->pidx[i];
939 IW_SETVNUMBUF64(sp, wp, kvp->off);
940 wp += sp;
941 IW_SETVNUMBUF(sp, wp, kvp->len);
942 wp += sp;
943 }
944 sp = wp - szp - sizeof(uint16_t);
945 kb->idxsz = sp;
946 assert(kb->idxsz <= KVBLK_MAX_IDX_SZ);
947 sp = IW_HTOIS(sp);
948 memcpy(szp, &sp, sizeof(uint16_t));
949 assert(wp - (mm + kb->addr) <= (1ULL << kb->szpow));
950 if (dlsnr) {
951 rc = dlsnr->onwrite(dlsnr, kb->addr, sptr, wp - sptr, 0);
952 }
953 kb->flags &= ~KVBLK_DURTY;
954 return rc;
955 }
956
// "Less than" predicate for sorting `KVP` index entries by their `off` field
// in ascending order. Entries whose `off` is not positive are compared as
// `-1UL`, i.e. they sort after every entry with a real offset.
// The third argument `o` is not used.
#define _kvblk_sort_kv_lt(v1, v2, o) \
  (((v1).off > 0 ? (v1).off : -1UL) < ((v2).off > 0 ? (v2).off : -1UL))

// Instantiates the sort routines for `KVP` with the predicate above
// (`ks_mergesort_kvblk()` is the one called by `_kvblk_compact_mm()`).
// -V:KSORT_INIT:522, 756, 769
KSORT_INIT(kvblk, KVP, _kvblk_sort_kv_lt)
962
963 static WUR iwrc _kvblk_compact_mm(KVBLK *kb, uint8_t *mm) {
964 uint8_t i;
965 off_t coff = _kvblk_compacted_offset(kb);
966 if (coff == kb->maxoff) { // compacted
967 return 0;
968 }
969 KVP tidx[KVBLK_IDXNUM];
970 KVP tidx_tmp[KVBLK_IDXNUM];
971 iwrc rc = 0;
972 uint16_t idxsiz = 0;
973 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
974 off_t blkend = kb->addr + (1ULL << kb->szpow);
975 uint8_t *wp = mm + blkend;
976 memcpy(tidx, kb->pidx, sizeof(tidx));
977 ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
978
979 coff = 0;
980 for (i = 0; i < KVBLK_IDXNUM && tidx[i].off; ++i) {
981 #ifndef NDEBUG
982 if (i > 0) {
983 assert(tidx[i - 1].off < tidx[i].off);
984 }
985 #endif
986 KVP *kvp = &kb->pidx[tidx[i].ridx];
987 off_t noff = coff + kvp->len;
988 if (kvp->off > noff) {
989 assert(noff <= (1ULL << kb->szpow) && kvp->len <= noff);
990 if (dlsnr) {
991 rc = dlsnr->onwrite(dlsnr, blkend - noff, wp - kvp->off, kvp->len, 0);
992 }
993 memmove(wp - noff, wp - kvp->off, kvp->len);
994 kvp->off = noff;
995 }
996 coff += kvp->len;
997 idxsiz += IW_VNUMSIZE(kvp->off);
998 idxsiz += IW_VNUMSIZE32(kvp->len);
999 }
1000 idxsiz += (KVBLK_IDXNUM - i) * 2;
1001 for (i = 0; i < KVBLK_IDXNUM; ++i) {
1002 if (!kb->pidx[i].len) {
1003 kb->zidx = i;
1004 break;
1005 }
1006 }
1007 assert(idxsiz <= kb->idxsz);
1008 kb->idxsz = idxsiz;
1009 kb->maxoff = coff;
1010 if (i == KVBLK_IDXNUM) {
1011 kb->zidx = -1;
1012 }
1013 kb->flags |= KVBLK_DURTY;
1014 assert(_kvblk_compacted_offset(kb) == kb->maxoff);
1015 return rc;
1016 }
1017
_kvblk_maxkvoff(KVBLK * kb)1018 IW_INLINE off_t _kvblk_maxkvoff(KVBLK *kb) {
1019 off_t off = 0;
1020 for (int i = 0; i < KVBLK_IDXNUM; ++i) {
1021 if (kb->pidx[i].off > off) {
1022 off = kb->pidx[i].off;
1023 }
1024 }
1025 return off;
1026 }
1027
// Removes the key/value pair in slot `idx` from block `kb`.
//
// The slot is cleared in the in-memory index and the block is marked dirty;
// `maxoff` and `zidx` are kept up to date. Unless RMKV_NO_RESIZE is set, a
// block larger than the initial size is shrunk when it is at least twice as
// large as its compacted data: pairs are compacted, moved to the end of the
// smaller block and the block is reallocated (which may change `kb->addr`).
// The index is written to the file when RMKV_SYNC is set or the block was
// shrunk.
static WUR iwrc _kvblk_rmkv(KVBLK *kb, uint8_t idx, kvblk_rmkv_opts_t opts) {
  iwrc rc = 0;
  uint8_t *mm = 0;
  IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
  IWFS_FSM *fsm = &kb->db->iwkv->fsm;
  // The removed pair was the farthest one: recompute maxoff from the other slots
  if (kb->pidx[idx].off >= kb->maxoff) {
    kb->maxoff = 0;
    for (int i = 0; i < KVBLK_IDXNUM; ++i) {
      if ((i != idx) && (kb->pidx[i].off > kb->maxoff)) {
        kb->maxoff = kb->pidx[i].off;
      }
    }
  }
  kb->pidx[idx].len = 0;
  kb->pidx[idx].off = 0;
  kb->flags |= KVBLK_DURTY;
  // Keep zidx pointing to the lowest free slot
  if ((kb->zidx < 0) || (idx < kb->zidx)) {
    kb->zidx = idx;
  }
  if (!(RMKV_NO_RESIZE & opts) && (kb->szpow > KVBLK_INISZPOW)) {
    off_t nlen = 1ULL << kb->szpow;
    off_t dsz = _kvblk_compacted_dsize(kb);
    if (nlen >= 2 * dsz) {
      // Smallest power of two (not below the initial size) that still holds the data
      uint8_t npow = kb->szpow - 1;
      while (npow > KVBLK_INISZPOW && (1ULL << (npow - 1)) >= dsz) {
        --npow;
      }
      rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
      RCGO(rc, finish);

      rc = _kvblk_compact_mm(kb, mm);
      RCGO(rc, finish);

      // Pairs live at the block end: move them to where the end of the smaller block will be
      off_t maxoff = _kvblk_maxkvoff(kb);
      if (dlsnr) {
        rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << npow) - maxoff, mm + kb->addr + nlen - maxoff, maxoff, 0);
        RCGO(rc, finish);
      }
      memmove(mm + kb->addr + (1ULL << npow) - maxoff,
              mm + kb->addr + nlen - maxoff,
              (size_t) maxoff);

      // The mapping is released before reallocate() is called
      fsm->release_mmap(fsm);
      mm = 0;
      rc = fsm->reallocate(fsm, (1ULL << npow), &kb->addr, &nlen, IWKV_FSM_ALLOC_FLAGS);
      RCGO(rc, finish);
      kb->szpow = npow;
      assert(nlen == (1ULL << kb->szpow));
      // szpow changed, so the header must be written out
      opts |= RMKV_SYNC;
    }
  }
  if (RMKV_SYNC & opts) {
    rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
    RCGO(rc, finish);
    IWRC(_kvblk_sync_mm(kb, mm), rc);
  }

finish:
  if (mm) {
    fsm->release_mmap(fsm);
  }
  return rc;
}
1091
_kvblk_addkv(KVBLK * kb,const IWKV_val * key,const IWKV_val * val,uint8_t * oidx,bool raw_key)1092 static WUR iwrc _kvblk_addkv(
1093 KVBLK *kb,
1094 const IWKV_val *key,
1095 const IWKV_val *val,
1096 uint8_t *oidx,
1097 bool raw_key
1098 ) {
1099 *oidx = 0;
1100
1101 iwrc rc = 0;
1102 off_t msz; // max available free space
1103 off_t rsz; // required size to add new key/value pair
1104 off_t noff; // offset of new kvpair from end of block
1105 uint8_t *mm, *wp, *sptr;
1106 size_t i, sp;
1107 KVP *kvp;
1108 IWDB db = kb->db;
1109 bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1110 IWFS_FSM *fsm = &db->iwkv->fsm;
1111 bool compacted = false;
1112 IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
1113 IWKV_val *uval = (IWKV_val*) val;
1114
1115 size_t ksize = key->size;
1116 if (compound) {
1117 ksize += IW_VNUMSIZE(key->compound);
1118 }
1119 off_t psz = IW_VNUMSIZE(ksize) + ksize;
1120
1121 if (kb->zidx < 0) {
1122 return _IWKV_RC_KVBLOCK_FULL;
1123 }
1124 psz += uval->size;
1125 if (psz > IWKV_MAX_KVSZ) {
1126 return IWKV_ERROR_MAXKVSZ;
1127 }
1128
1129 start:
1130 // [szpow:u1,idxsz:u2,[ps0:vn,pl0:vn,..., ps32,pl32]____[[KV],...]] // KVBLK
1131 msz = (1ULL << kb->szpow) - (KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1132 assert(msz >= 0);
1133 noff = kb->maxoff + psz;
1134 rsz = psz + IW_VNUMSIZE(noff) + IW_VNUMSIZE(psz);
1135
1136 if (msz < rsz) { // not enough space
1137 if (!compacted) {
1138 compacted = true;
1139 if (_kvblk_compacted_offset(kb) != kb->maxoff) {
1140 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1141 RCGO(rc, finish);
1142 rc = _kvblk_compact_mm(kb, mm);
1143 RCGO(rc, finish);
1144 fsm->release_mmap(fsm);
1145 goto start;
1146 }
1147 }
1148 // resize the whole block
1149 off_t nlen = 1ULL << kb->szpow;
1150 off_t nsz = rsz - msz + nlen;
1151 off_t naddr = kb->addr;
1152 off_t olen = nlen;
1153
1154 uint8_t npow = kb->szpow;
1155 while ((1ULL << ++npow) < nsz);
1156
1157 rc = fsm->allocate(fsm, (1ULL << npow), &naddr, &nlen, IWKV_FSM_ALLOC_FLAGS);
1158 RCGO(rc, finish);
1159 assert(nlen == (1ULL << npow));
1160 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1161 RCGO(rc, finish);
1162 if (dlsnr) {
1163 rc = dlsnr->onwrite(dlsnr, naddr, mm + kb->addr, KVBLK_HDRSZ, 0);
1164 RCGO(rc, finish);
1165 memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1166 rc = dlsnr->onwrite(dlsnr, naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, kb->maxoff, 0);
1167 RCGO(rc, finish);
1168 memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1169 } else {
1170 memcpy(mm + naddr, mm + kb->addr, KVBLK_HDRSZ);
1171 memcpy(mm + naddr + nlen - kb->maxoff, mm + kb->addr + olen - kb->maxoff, (size_t) kb->maxoff);
1172 }
1173 fsm->release_mmap(fsm);
1174 rc = fsm->deallocate(fsm, kb->addr, olen);
1175 RCGO(rc, finish);
1176
1177 kb->addr = naddr;
1178 kb->szpow = npow;
1179 }
1180 *oidx = (uint8_t) kb->zidx;
1181 kvp = &kb->pidx[kb->zidx];
1182 kvp->len = (uint32_t) psz;
1183 kvp->off = noff;
1184 kvp->ridx = (uint8_t) kb->zidx;
1185 kb->maxoff = noff;
1186 kb->flags |= KVBLK_DURTY;
1187 for (i = 0; i < KVBLK_IDXNUM; ++i) {
1188 if (!kb->pidx[i].len && (i != kb->zidx)) {
1189 kb->zidx = i;
1190 break;
1191 }
1192 }
1193 if (i >= KVBLK_IDXNUM) {
1194 kb->zidx = -1;
1195 }
1196 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1197 RCGO(rc, finish);
1198 assert((1ULL << kb->szpow) >= KVBLK_HDRSZ + kb->idxsz + kb->maxoff);
1199 assert(kvp->off < (1ULL << kb->szpow) && kvp->len <= kvp->off);
1200 wp = mm + kb->addr + (1ULL << kb->szpow) - kvp->off;
1201 sptr = wp;
1202 // [klen:vn,key,value]
1203 IW_SETVNUMBUF(sp, wp, ksize);
1204 wp += sp;
1205 if (compound) {
1206 IW_SETVNUMBUF64(sp, wp, key->compound);
1207 wp += sp;
1208 }
1209 memcpy(wp, key->data, key->size);
1210 wp += key->size;
1211 if (uval->size) {
1212 memcpy(wp, uval->data, uval->size);
1213 wp += uval->size;
1214 }
1215 #ifndef NDEBUG
1216 assert(wp - sptr == kvp->len);
1217 #endif
1218 if (dlsnr) {
1219 rc = dlsnr->onwrite(dlsnr, kb->addr + (1ULL << kb->szpow) - kvp->off, sptr, wp - sptr, 0);
1220 }
1221 fsm->release_mmap(fsm);
1222
1223 finish:
1224 return rc;
1225 }
1226
/**
 * Replaces the value of the pair referenced by index slot `*idxp` of `kb`.
 *
 * Strategies tried in order:
 *  1. The new record fits into the length recorded for the slot:
 *     the value is overwritten in place.
 *  2. The record fits into the gap up to the preceding entry of the sorted
 *     index copy and the index growth fits into the block free space:
 *     the value is written in place and the slot length is enlarged.
 *  3. Otherwise the pair is removed (`_kvblk_rmkv` with `RMKV_NO_RESIZE`) and
 *     added again with `_kvblk_addkv`, which stores the new slot into `*idxp`.
 *
 * @param kb   Block holding the pair.
 * @param idxp In: index slot of the pair. Out: may be changed by strategy 3.
 * @param key  Optional key. When given, its size must match the stored key
 *             length, otherwise `IWKV_ERROR_CORRUPTED` is returned. When NULL
 *             and a key is required it is loaded with `_kvblk_key_get`.
 * @param val  New value.
 */
static WUR iwrc _kvblk_updatev(
  KVBLK          *kb,
  uint8_t        *idxp,
  const IWKV_val *key,                        /* Nullable */
  const IWKV_val *val
  ) {
  assert(*idxp < KVBLK_IDXNUM);
  int32_t i;
  uint32_t len, nlen, sz;
  uint8_t pidx = *idxp, *mm = 0, *wp, *sp;
  IWDB db = kb->db;
  IWDLSNR *dlsnr = kb->db->iwkv->dlsnr;
  IWKV_val *uval = (IWKV_val*) val;
  IWKV_val *ukey = (IWKV_val*) key;
  IWKV_val skey; // stack allocated key, used only when the caller passed no key
  KVP *kvp = &kb->pidx[pidx];
  size_t kbsz = 1ULL << kb->szpow;                                       // kvblk size
  off_t freesz = kbsz - KVBLK_HDRSZ - kb->idxsz - kb->maxoff;            // free space available
  IWFS_FSM *fsm = &db->iwkv->fsm;

  iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
  RCRET(rc);
  assert(freesz >= 0);

  // Pair location: offsets are counted from the end of the block
  wp = mm + kb->addr + kbsz - kvp->off;
  sp = wp;
  // len: stored key length, sz: bytes occupied by the length vnumber itself
  IW_READVNUMBUF(wp, len, sz);
  wp += sz;
  if (ukey && (len != ukey->size)) {
    rc = IWKV_ERROR_CORRUPTED;
    iwlog_ecode_error3(rc);
    goto finish;
  }
  wp += len; // wp now points to the value area
  off_t rsize = sz + len + uval->size; // required size
  if (rsize <= kvp->len) {
    // Strategy 1: new record is not longer than the current one
    memcpy(wp, uval->data, uval->size);
    if (dlsnr) {
      rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
      RCGO(rc, finish);
    }
    wp += uval->size;
    if ((wp - sp) != kvp->len) {
      kvp->len = wp - sp;
      kb->flags |= KVBLK_DURTY;
    }
  } else {
    KVP tidx[KVBLK_IDXNUM];
    KVP tidx_tmp[KVBLK_IDXNUM];
    off_t koff = kb->pidx[pidx].off;
    memcpy(tidx, kb->pidx, KVBLK_IDXNUM * sizeof(kb->pidx[0]));
    // NOTE(review): presumably sorts the index copy by pair offset - confirm
    // against the ks_mergesort_kvblk definition.
    ks_mergesort_kvblk(KVBLK_IDXNUM, tidx, tidx_tmp, 0);
    kb->flags |= KVBLK_DURTY;
    if (!ukey) { // we need a key
      ukey = &skey;
      rc = _kvblk_key_get(kb, mm, pidx, ukey);
      RCGO(rc, finish);
    }
    for (i = 0; i < KVBLK_IDXNUM; ++i) {
      if (tidx[i].off == koff) {
        // Strategy 2: space between this pair and the preceding sorted entry
        if (koff - ((i > 0) ? tidx[i - 1].off : 0) >= rsize) {
          nlen = wp + uval->size - sp;
          // The index entry may need more vnumber bytes for the bigger length
          if (!((nlen > kvp->len) && (freesz - IW_VNUMSIZE32(nlen) + IW_VNUMSIZE32(kvp->len) < 0))) { // enough space?
            memcpy(wp, uval->data, uval->size);
            if (dlsnr) {
              rc = dlsnr->onwrite(dlsnr, wp - mm, uval->data, uval->size, 0);
              RCGO(rc, finish);
            }
            wp += uval->size;
            kvp->len = nlen;
            break;
            ;
          }
        }
        // Strategy 3: the mmap is released first and `mm` zeroed so `finish`
        // will not release it a second time.
        mm = 0;
        fsm->release_mmap(fsm);
        rc = _kvblk_rmkv(kb, pidx, RMKV_NO_RESIZE);
        RCGO(rc, finish);
        rc = _kvblk_addkv(kb, ukey, uval, idxp, false);
        break;
      }
    }
  }

finish:
  // Dispose the key only if it is not the one supplied by the caller
  if (ukey != key) {
    _kv_val_dispose(ukey);
  }
  if (mm) {
    IWRC(fsm->release_mmap(fsm), rc);
  }
  return rc;
}
1320
1321 //-------------------------- SBLK
1322
_sblk_release(IWLCTX * lx,SBLK ** sblkp)1323 IW_INLINE void _sblk_release(IWLCTX *lx, SBLK **sblkp) {
1324 assert(sblkp && *sblkp);
1325 SBLK *sblk = *sblkp;
1326 sblk->flags &= ~SBLK_DURTY; // clear dirty flag
1327 sblk->kvblk = 0;
1328 *sblkp = 0;
1329 }
1330
_sblk_loadkvblk_mm(IWLCTX * lx,SBLK * sblk,uint8_t * mm)1331 IW_INLINE WUR iwrc _sblk_loadkvblk_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
1332 if (!sblk->kvblk && sblk->kvblkn) {
1333 return _kvblk_at_mm(lx, BLK2ADDR(sblk->kvblkn), mm, 0, &sblk->kvblk);
1334 } else {
1335 return 0;
1336 }
1337 }
1338
/**
 * Checks whether `sblk` is the only occupied slot of its SBLK page.
 *
 * `*page_addr` receives the page start address, or 0 when `sblk->bpos` is
 * outside of `[1, SBLK_PAGE_SBLK_NUM_V2]` (in which case false is returned).
 * A slot counts as occupied when its byte at `SOFF_BPOS_U1_V2` is non-zero.
 */
static bool _sblk_is_only_one_on_page_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *page_addr) {
  *page_addr = 0;
  if ((sblk->bpos < 1) || (sblk->bpos > SBLK_PAGE_SBLK_NUM_V2)) {
    return false; // be safe
  }
  const int own_slot = sblk->bpos - 1;
  const off_t page = sblk->addr - own_slot * SBLK_SZ;
  *page_addr = page;
  for (int slot = 0; slot < SBLK_PAGE_SBLK_NUM_V2; ++slot) {
    if (slot == own_slot) {
      continue;
    }
    if (*(mm + page + slot * SBLK_SZ + SOFF_BPOS_U1_V2)) {
      return false; // some other slot is in use
    }
  }
  return true;
}
1358
_sblk_destroy(IWLCTX * lx,SBLK ** sblkp)1359 IW_INLINE WUR iwrc _sblk_destroy(IWLCTX *lx, SBLK **sblkp) {
1360 assert(sblkp && *sblkp && (*sblkp)->addr);
1361 iwrc rc = 0;
1362 SBLK *sblk = *sblkp;
1363 lx->destroy_addr = sblk->addr;
1364
1365 if (!(sblk->flags & SBLK_DB)) {
1366 uint8_t kvb_szpow, *mm;
1367 IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1368 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1369 off_t kvb_addr = BLK2ADDR(sblk->kvblkn);
1370 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1371 RCRET(rc);
1372
1373 if (!sblk->kvblk) {
1374 // Read KVBLK size as power of two
1375 memcpy(&kvb_szpow, mm + kvb_addr + KBLK_SZPOW_OFF, 1);
1376 } else {
1377 kvb_szpow = sblk->kvblk->szpow;
1378 }
1379 if (lx->db->lcnt[sblk->lvl]) {
1380 lx->db->lcnt[sblk->lvl]--;
1381 lx->db->flags |= SBLK_DURTY;
1382 }
1383 if (lx->db->iwkv->fmt_version > 1) {
1384 off_t paddr;
1385 if (_sblk_is_only_one_on_page_v2(lx, mm, sblk, &paddr)) {
1386 fsm->release_mmap(fsm);
1387 // Deallocate whole page
1388 rc = fsm->deallocate(fsm, paddr, SBLK_PAGE_SZ_V2);
1389 } else {
1390 memset(mm + sblk->addr + SOFF_BPOS_U1_V2, 0, 1);
1391 fsm->release_mmap(fsm);
1392 if (dlsnr) {
1393 dlsnr->onset(dlsnr, sblk->addr + SOFF_BPOS_U1_V2, 0, 1, 0);
1394 }
1395 }
1396 } else {
1397 fsm->release_mmap(fsm);
1398 rc = fsm->deallocate(fsm, sblk->addr, SBLK_SZ);
1399 }
1400 IWRC(fsm->deallocate(fsm, kvb_addr, 1ULL << kvb_szpow), rc);
1401 }
1402 _sblk_release(lx, sblkp);
1403 return rc;
1404 }
1405
_sblk_genlevel(IWDB db)1406 IW_INLINE uint8_t _sblk_genlevel(IWDB db) {
1407 uint8_t lvl;
1408 #ifdef IW_TESTS
1409 if (iwkv_next_level >= 0) {
1410 lvl = (uint8_t) iwkv_next_level;
1411 iwkv_next_level = -1;
1412 assert(lvl < SLEVELS);
1413 return lvl;
1414 }
1415 #endif
1416 uint32_t r = iwu_rand_u32();
1417 for (lvl = 0; lvl < SLEVELS && !(r & 1); ++lvl) r >>= 1;
1418 uint8_t ret = IW_UNLIKELY(lvl >= SLEVELS) ? SLEVELS - 1 : lvl;
1419 while (ret > 0 && db->lcnt[ret - 1] == 0) {
1420 --ret;
1421 }
1422 return ret;
1423 }
1424
_sblk_create_v1(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,off_t baddr,uint8_t bpos,SBLK ** oblk)1425 static WUR iwrc _sblk_create_v1(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, off_t baddr, uint8_t bpos, SBLK **oblk) {
1426 iwrc rc;
1427 SBLK *sblk;
1428 KVBLK *kvblk;
1429 off_t blen;
1430 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1431 if (kvbpow < KVBLK_INISZPOW) {
1432 kvbpow = KVBLK_INISZPOW;
1433 }
1434 *oblk = 0;
1435 if (!bpos) {
1436 rc = fsm->allocate(fsm, SBLK_SZ + (1ULL << kvbpow), &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1437 RCRET(rc);
1438 assert(blen - SBLK_SZ == (1ULL << kvbpow));
1439 _kvblk_create(lx, baddr + SBLK_SZ, kvbpow, &kvblk);
1440 } else {
1441 // Allocate kvblk as separate chunk
1442 off_t kblkaddr = 0;
1443 rc = fsm->allocate(fsm, (1ULL << kvbpow), &kblkaddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1444 assert(blen == (1ULL << kvbpow));
1445 _kvblk_create(lx, kblkaddr, kvbpow, &kvblk);
1446 }
1447 sblk = &lx->saa[lx->saan];
1448 sblk->db = lx->db;
1449 sblk->db->lcnt[nlevel]++;
1450 sblk->db->flags |= SBLK_DURTY;
1451 sblk->addr = baddr;
1452 sblk->flags = SBLK_DURTY;
1453 sblk->lvl = nlevel;
1454 sblk->p0 = 0;
1455 memset(sblk->n, 0, sizeof(sblk->n));
1456 sblk->kvblk = kvblk;
1457 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1458 sblk->lkl = 0;
1459 sblk->pnum = 0;
1460 sblk->bpos = bpos;
1461 memset(sblk->pi, 0, sizeof(sblk->pi));
1462 *oblk = sblk;
1463 AAPOS_INC(lx->saan);
1464 return 0;
1465 }
1466
/**
 * Searches the SBLK page of `sblk` for a free slot.
 *
 * Slots after `sblk` are examined first (ascending), then the slots before it
 * (descending). A slot is free when its byte at `SOFF_BPOS_U1_V2` is zero.
 * On success `*obaddr` is the slot address and `*oslot` its 1-based position.
 * Both are zero when nothing is found or `sblk->bpos` is out of range.
 */
static void _sblk_find_free_page_slot_v2(IWLCTX *lx, uint8_t *mm, SBLK *sblk, off_t *obaddr, uint8_t *oslot) {
  *obaddr = 0;
  *oslot = 0;
  if ((sblk->bpos < 1) || (sblk->bpos > SBLK_PAGE_SBLK_NUM_V2)) {
    return;
  }
  const off_t page = sblk->addr - (sblk->bpos - 1) * SBLK_SZ;

  // Forward part of the page
  int pos = sblk->bpos + 1;
  while (pos <= SBLK_PAGE_SBLK_NUM_V2) {
    const off_t cand = page + (pos - 1) * SBLK_SZ;
    if (!*(mm + cand + SOFF_BPOS_U1_V2)) {
      *obaddr = cand;
      *oslot = pos;
      return;
    }
    ++pos;
  }
  // Backward part of the page
  pos = sblk->bpos - 1;
  while (pos > 0) {
    const off_t cand = page + (pos - 1) * SBLK_SZ;
    if (!*(mm + cand + SOFF_BPOS_U1_V2)) {
      *obaddr = cand;
      *oslot = pos;
      return;
    }
    --pos;
  }
}
1495
1496 /// Create
_sblk_create_v2(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1497 static WUR iwrc _sblk_create_v2(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1498 off_t baddr = 0;
1499 uint8_t bpos = 0, *mm;
1500 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1501 SBLK *_lower = lower;
1502 SBLK *_upper = upper;
1503
1504 for (int i = SLEVELS - 1; i >= 0; --i) {
1505 if (lx->pupper[i] && (lx->pupper[i]->lvl >= nlevel)) {
1506 _upper = lx->pupper[i];
1507 }
1508 if (lx->plower[i] && (lx->plower[i]->lvl >= nlevel)) {
1509 _lower = lx->plower[i];
1510 }
1511 }
1512
1513 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1514 RCRET(rc);
1515 _sblk_find_free_page_slot_v2(lx, mm, _lower, &baddr, &bpos);
1516 if (!baddr && _upper && (_upper->addr != _lower->addr)) {
1517 _sblk_find_free_page_slot_v2(lx, mm, _upper, &baddr, &bpos);
1518 }
1519 if (!baddr) {
1520 if (_lower->addr != lower->addr) {
1521 _sblk_find_free_page_slot_v2(lx, mm, lower, &baddr, &bpos);
1522 }
1523 if (!baddr && upper && _upper && (_upper->addr != upper->addr)) {
1524 _sblk_find_free_page_slot_v2(lx, mm, upper, &baddr, &bpos);
1525 }
1526 }
1527 fsm->release_mmap(fsm);
1528
1529 if (!baddr) {
1530 // No free slots - allocate new SBLK page
1531 off_t blen;
1532 bpos = 1;
1533 IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
1534 rc = fsm->allocate(fsm, SBLK_PAGE_SZ_V2, &baddr, &blen, IWKV_FSM_ALLOC_FLAGS);
1535 RCRET(rc);
1536 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1537 RCRET(rc);
1538 // Fill page to zero
1539 memset(mm + baddr, 0, blen);
1540 if (dlsnr) {
1541 rc = dlsnr->onset(dlsnr, baddr, 0, blen, 0);
1542 }
1543 fsm->release_mmap(fsm);
1544 RCRET(rc);
1545 }
1546 return _sblk_create_v1(lx, nlevel, kvbpow, baddr, bpos, oblk);
1547 }
1548
_sblk_create(IWLCTX * lx,uint8_t nlevel,uint8_t kvbpow,SBLK * lower,SBLK * upper,SBLK ** oblk)1549 IW_INLINE WUR iwrc _sblk_create(IWLCTX *lx, uint8_t nlevel, uint8_t kvbpow, SBLK *lower, SBLK *upper, SBLK **oblk) {
1550 if (lx->db->iwkv->fmt_version > 1) {
1551 return _sblk_create_v2(lx, nlevel, kvbpow, lower, upper, oblk);
1552 } else {
1553 return _sblk_create_v1(lx, nlevel, kvbpow, lower->addr, 0, oblk);
1554 }
1555 }
1556
/**
 * Loads the skip-list node located at `addr` into the caller provided `sblk`.
 *
 * Three kinds of nodes are distinguished:
 *  - `addr == db->addr`: the database header block, flagged `SBLK_DB`;
 *    its next pointers are read starting at `DOFF_N0_U4`;
 *  - any other non-zero `addr`: a regular SBLK decoded from its stored record;
 *  - `addr == 0`: the database tail pseudo-node, flagged `SBLK_DB`, only `p0`
 *    is read (from `DOFF_P0_U4` of the database header).
 *
 * @param lx   Lookup context, `lx->sbflags` are merged into the block flags.
 * @param addr Node address, see above.
 * @param flgs Extra flags merged into the block flags.
 * @param sblk Destination block.
 * @return `IWKV_ERROR_CORRUPTED` if the flags, level, lower key length or
 *         pairs number of a regular SBLK fail validation.
 */
static WUR iwrc _sblk_at2(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK *sblk) {
  iwrc rc;
  uint8_t *mm;
  uint32_t lv;
  sblk_flags_t flags = lx->sbflags | flgs;
  IWDB db = lx->db;
  IWFS_FSM *fsm = &db->iwkv->fsm;
  sblk->kvblk = 0;
  sblk->bpos = 0;
  sblk->db = db;

  rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
  RCRET(rc);

  if (IW_UNLIKELY(addr == db->addr)) {
    uint8_t *rp = mm + addr + DOFF_N0_U4;
    // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
    sblk->addr = addr;
    sblk->flags = SBLK_DB | flags;
    sblk->lvl = 0;
    sblk->p0 = 0;
    sblk->kvblkn = 0;
    sblk->lkl = 0;
    sblk->pnum = KVBLK_IDXNUM;
    memset(sblk->pi, 0, sizeof(sblk->pi));
    // Count the leading non-zero next pointers.
    // NOTE(review): IW_READLV presumably reads one u4 and advances `rp` - confirm.
    for (int i = 0; i < SLEVELS; ++i) {
      IW_READLV(rp, lv, sblk->n[i]);
      if (sblk->n[i]) {
        ++sblk->lvl;
      } else {
        break;
      }
    }
    // Convert the count into the index of the highest used level
    if (sblk->lvl) {
      --sblk->lvl;
    }
  } else if (addr) {
    uint8_t uflags;
    uint8_t *rp = mm + addr;
    sblk->addr = addr;
    // [flags:u1,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,pi:u1[32],n:u4[24],bpos:u1,lk:u115]:u256
    memcpy(&uflags, rp++, 1);
    sblk->flags = uflags;
    // Only persistent flags may be stored
    if (sblk->flags & ~SBLK_PERSISTENT_FLAGS) {
      rc = IWKV_ERROR_CORRUPTED;
      iwlog_ecode_error3(rc);
      goto finish;
    }
    sblk->flags |= flags;
    memcpy(&sblk->lvl, rp++, 1);
    if (sblk->lvl >= SLEVELS) {
      rc = IWKV_ERROR_CORRUPTED;
      iwlog_ecode_error3(rc);
      goto finish;
    }
    memcpy(&sblk->lkl, rp++, 1);
    if (sblk->lkl > db->iwkv->pklen) {
      rc = IWKV_ERROR_CORRUPTED;
      iwlog_ecode_error3(rc);
      goto finish;
    }
    memcpy(&sblk->pnum, rp++, 1);
    if (sblk->pnum < 0) {
      rc = IWKV_ERROR_CORRUPTED;
      iwlog_ecode_error3(rc);
      goto finish;
    }
    memcpy(&sblk->p0, rp, 4);
    sblk->p0 = IW_ITOHL(sblk->p0);
    rp += 4;
    memcpy(&sblk->kvblkn, rp, 4);
    sblk->kvblkn = IW_ITOHL(sblk->kvblkn);
    rp += 4;
    memcpy(sblk->pi, rp, KVBLK_IDXNUM);
    rp += KVBLK_IDXNUM;

    // Only next pointers of levels [0, lvl] are loaded
#ifdef IW_BIGENDIAN
    for (int i = 0; i <= sblk->lvl; ++i) {
      memcpy(&sblk->n[i], rp, 4);
      sblk->n[i] = IW_ITOHL(sblk->n[i]);
      rp += 4;
    }
#else
    memcpy(sblk->n, rp, 4 * (sblk->lvl + 1));
    rp += 4 * (sblk->lvl + 1);
#endif
    // The lower key follows `bpos` in newer formats and sits at SOFF_LK_V1 otherwise
    if (db->iwkv->fmt_version > 1) {
      rp = mm + addr + SOFF_BPOS_U1_V2;
      memcpy(&sblk->bpos, rp++, 1);
    } else {
      rp = mm + addr + SOFF_LK_V1;
    }
    // Lower key
    memcpy(sblk->lk, rp, (size_t) sblk->lkl);
  } else { // Database tail
    uint8_t *rp = mm + db->addr + DOFF_P0_U4;
    sblk->addr = 0;
    sblk->flags = SBLK_DB | flags;
    sblk->lvl = 0;
    sblk->kvblkn = 0;
    sblk->lkl = 0;
    sblk->pnum = KVBLK_IDXNUM;
    memset(sblk->pi, 0, sizeof(sblk->pi));
    IW_READLV(rp, lv, sblk->p0);
    // No stored p0: fall back to the database block itself
    if (!sblk->p0) {
      sblk->p0 = ADDR2BLK(db->addr);
    }
  }

finish:
  fsm->release_mmap(fsm);
  return rc;
}
1670
_sblk_at(IWLCTX * lx,off_t addr,sblk_flags_t flgs,SBLK ** sblkp)1671 IW_INLINE WUR iwrc _sblk_at(IWLCTX *lx, off_t addr, sblk_flags_t flgs, SBLK **sblkp) {
1672 *sblkp = 0;
1673 SBLK *sblk = &lx->saa[lx->saan];
1674 iwrc rc = _sblk_at2(lx, addr, flgs, sblk);
1675 AAPOS_INC(lx->saan);
1676 *sblkp = sblk;
1677 return rc;
1678 }
1679
/**
 * Writes a dirty `sblk` (and its dirty KVBLK, if attached) into the mapped
 * region `mm`.
 *
 * The `SBLK_DURTY` flag is cleared before the data is written.
 *  - `SBLK_DB` block with an address: next pointers and level counters
 *    (`lx->db->lcnt`) are stored into the database header;
 *  - `SBLK_DB` block without an address (database tail): only `p0` is stored;
 *  - regular block: the whole SBLK record is serialized.
 * In both `SBLK_DB` cases the function returns right after the write,
 * the KVBLK sync at the end is performed for regular blocks only.
 * If a WAL listener is set every written range is reported via `onwrite`.
 */
static WUR iwrc _sblk_sync_mm(IWLCTX *lx, SBLK *sblk, uint8_t *mm) {
  iwrc rc = 0;
  if (sblk->flags & SBLK_DURTY) {
    uint32_t lv;
    IWDLSNR *dlsnr = lx->db->iwkv->dlsnr;
    sblk->flags &= ~SBLK_DURTY;
    if (IW_UNLIKELY(sblk->flags & SBLK_DB)) {
      uint8_t *sp; // start of the written range
      uint8_t *wp = mm + sblk->db->addr;
      if (sblk->addr) {
        assert(sblk->addr == sblk->db->addr);
        wp += DOFF_N0_U4;
        sp = wp;
        // [magic:u4,dbflg:u1,dbid:u4,next_db_blk:u4,p0:u4,n[24]:u4,c[24]:u4,meta_blk:u4,meta_blkn:u4]:217
        // NOTE(review): IW_WRITELV presumably stores one u4 and advances `wp` - confirm.
        for (int i = 0; i < SLEVELS; ++i) {
          IW_WRITELV(wp, lv, sblk->n[i]);
        }
        assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
        for (int i = 0; i < SLEVELS; ++i) {
          IW_WRITELV(wp, lv, lx->db->lcnt[i]);
        }
      } else { // Database tail
        wp += DOFF_P0_U4;
        sp = wp;
        IW_WRITELV(wp, lv, sblk->p0);
        assert(wp - (mm + sblk->db->addr) <= SBLK_SZ);
      }
      if (dlsnr) {
        rc = dlsnr->onwrite(dlsnr, sp - mm, sp, wp - sp, 0);
      }
      return rc;
    } else {
      uint8_t *wp = mm + sblk->addr;
      // Only persistent flags are stored
      sblk_flags_t flags = (sblk->flags & SBLK_PERSISTENT_FLAGS);
      uint8_t uflags = flags;
      assert(sblk->lkl <= lx->db->iwkv->pklen);
      // [u1:flags,lvl:u1,lkl:u1,pnum:u1,p0:u4,kblk:u4,[pi0:u1,... pi32],n0-n23:u4,lk:u116]:u256
      wp += SOFF_FLAGS_U1;
      memcpy(wp++, &uflags, 1);
      memcpy(wp++, &sblk->lvl, 1);
      memcpy(wp++, &sblk->lkl, 1);
      memcpy(wp++, &sblk->pnum, 1);
      IW_WRITELV(wp, lv, sblk->p0);
      IW_WRITELV(wp, lv, sblk->kvblkn);
      memcpy(wp, sblk->pi, KVBLK_IDXNUM);
      wp = mm + sblk->addr + SOFF_N0_U4;

      // Only next pointers of levels [0, lvl] are stored
#ifdef IW_BIGENDIAN
      for (int i = 0; i <= sblk->lvl; ++i) {
        IW_WRITELV(wp, lv, sblk->n[i]);
      }
#else
      memcpy(wp, sblk->n, 4 * (sblk->lvl + 1));
      wp += 4 * (sblk->lvl + 1);
#endif

      // The lower key follows `bpos` in newer formats and sits at SOFF_LK_V1 otherwise
      if (lx->db->iwkv->fmt_version > 1) {
        wp = mm + sblk->addr + SOFF_BPOS_U1_V2;
        memcpy(wp++, &sblk->bpos, 1);
      } else {
        wp = mm + sblk->addr + SOFF_LK_V1;
      }
      memcpy(wp, sblk->lk, (size_t) sblk->lkl);
      if (dlsnr) {
        rc = dlsnr->onwrite(dlsnr, sblk->addr, mm + sblk->addr, SOFF_END, 0);
        RCRET(rc);
      }
    }
  }
  if (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY)) {
    IWRC(_kvblk_sync_mm(sblk->kvblk, mm), rc);
  }
  return rc;
}
1754
_sblk_sync(IWLCTX * lx,SBLK * sblk)1755 IW_INLINE WUR iwrc _sblk_sync(IWLCTX *lx, SBLK *sblk) {
1756 if ((sblk->flags & SBLK_DURTY) || (sblk->kvblk && (sblk->kvblk->flags & KVBLK_DURTY))) {
1757 uint8_t *mm;
1758 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
1759 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1760 RCRET(rc);
1761 rc = _sblk_sync_mm(lx, sblk, mm);
1762 fsm->release_mmap(fsm);
1763 return rc;
1764 }
1765 return 0;
1766 }
1767
_sblk_sync_and_release_mm(IWLCTX * lx,SBLK ** sblkp,uint8_t * mm)1768 IW_INLINE WUR iwrc _sblk_sync_and_release_mm(IWLCTX *lx, SBLK **sblkp, uint8_t *mm) {
1769 SBLK *sblk = *sblkp;
1770 if (lx->destroy_addr && (lx->destroy_addr == sblk->addr)) {
1771 return 0;
1772 }
1773 iwrc rc = 0;
1774 if (mm) {
1775 rc = _sblk_sync_mm(lx, *sblkp, mm);
1776 }
1777 _sblk_release(lx, sblkp);
1778 return rc;
1779 }
1780
_sblk_find_pi_mm(SBLK * sblk,IWLCTX * lx,const uint8_t * mm,bool * found,uint8_t * idxp)1781 static WUR iwrc _sblk_find_pi_mm(SBLK *sblk, IWLCTX *lx, const uint8_t *mm, bool *found, uint8_t *idxp) {
1782 *found = false;
1783 if (sblk->flags & SBLK_DB) {
1784 *idxp = KVBLK_IDXNUM;
1785 return 0;
1786 }
1787 uint8_t *k;
1788 uint32_t kl;
1789 int idx = 0, lb = 0, ub = sblk->pnum - 1;
1790 iwdb_flags_t dbflg = lx->db->dbflg;
1791
1792 if (sblk->pnum < 1) {
1793 *idxp = 0;
1794 return 0;
1795 }
1796 while (1) {
1797 idx = (ub + lb) / 2;
1798 iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1799 RCRET(rc);
1800 int cr = _cmp_keys(dbflg, k, kl, lx->key);
1801 if (!cr) {
1802 *found = true;
1803 break;
1804 } else if (cr < 0) {
1805 lb = idx + 1;
1806 if (lb > ub) {
1807 idx = lb;
1808 break;
1809 }
1810 } else {
1811 ub = idx - 1;
1812 if (lb > ub) {
1813 break;
1814 }
1815 }
1816 }
1817 *idxp = idx;
1818 return 0;
1819 }
1820
_sblk_insert_pi_mm(SBLK * sblk,uint8_t nidx,IWLCTX * lx,const uint8_t * mm,uint8_t * idxp)1821 static WUR iwrc _sblk_insert_pi_mm(
1822 SBLK *sblk, uint8_t nidx, IWLCTX *lx,
1823 const uint8_t *mm, uint8_t *idxp
1824 ) {
1825 assert(sblk->kvblk);
1826
1827 uint8_t *k;
1828 uint32_t kl;
1829 int idx = 0, lb = 0, ub = sblk->pnum - 1, nels = sblk->pnum; // NOLINT
1830
1831 if (nels < 1) {
1832 sblk->pi[0] = nidx;
1833 ++sblk->pnum;
1834 *idxp = 0;
1835 return 0;
1836 }
1837 iwdb_flags_t dbflg = sblk->db->dbflg;
1838 while (1) {
1839 idx = (ub + lb) / 2;
1840 iwrc rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &k, &kl);
1841 RCRET(rc);
1842 int cr = _cmp_keys(dbflg, k, kl, lx->key);
1843 if (!cr) {
1844 break;
1845 } else if (cr < 0) {
1846 lb = idx + 1;
1847 if (lb > ub) {
1848 idx = lb;
1849 ++sblk->pnum;
1850 break;
1851 }
1852 } else {
1853 ub = idx - 1;
1854 if (lb > ub) {
1855 ++sblk->pnum;
1856 break;
1857 }
1858 }
1859 }
1860 if (nels - idx > 0) {
1861 memmove(sblk->pi + idx + 1, sblk->pi + idx, nels - idx);
1862 }
1863 sblk->pi[idx] = nidx;
1864 *idxp = idx;
1865 return 0;
1866 }
1867
_sblk_addkv2(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val,bool raw_key)1868 static WUR iwrc _sblk_addkv2(
1869 SBLK *sblk,
1870 int8_t idx,
1871 const IWKV_val *key,
1872 const IWKV_val *val,
1873 bool raw_key
1874 ) {
1875 assert(sblk && key && key->size && key->data && val && idx >= 0 && sblk->kvblk);
1876
1877 uint8_t kvidx;
1878 IWDB db = sblk->db;
1879 KVBLK *kvblk = sblk->kvblk;
1880 if (sblk->pnum >= KVBLK_IDXNUM) {
1881 return _IWKV_RC_KVBLOCK_FULL;
1882 }
1883
1884 iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, raw_key);
1885 RCRET(rc);
1886 if (sblk->pnum - idx > 0) {
1887 memmove(sblk->pi + idx + 1, sblk->pi + idx, sblk->pnum - idx);
1888 }
1889 sblk->pi[idx] = kvidx;
1890 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1891 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1892 }
1893 ++sblk->pnum;
1894 sblk->flags |= SBLK_DURTY;
1895 if (idx == 0) { // the lowest key inserted
1896 size_t ksize = key->size;
1897 bool compound = !raw_key && (db->dbflg & IWDB_COMPOUND_KEYS);
1898 if (compound) {
1899 ksize += IW_VNUMSIZE(key->compound);
1900 }
1901 sblk->lkl = MIN(db->iwkv->pklen, ksize);
1902 uint8_t *wp = sblk->lk;
1903 if (compound) {
1904 int len;
1905 IW_SETVNUMBUF64(len, wp, key->compound);
1906 wp += len;
1907 }
1908 memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1909 if (ksize <= db->iwkv->pklen) {
1910 sblk->flags |= SBLK_FULL_LKEY;
1911 } else {
1912 sblk->flags &= ~SBLK_FULL_LKEY;
1913 }
1914 }
1915 if (!raw_key) {
1916 // Update active cursors inside this block
1917 pthread_spin_lock(&db->cursors_slk);
1918 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1919 if (cur->cn && (cur->cn->addr == sblk->addr)) {
1920 if (cur->cn != sblk) {
1921 memcpy(cur->cn, sblk, sizeof(*cur->cn));
1922 cur->cn->kvblk = 0;
1923 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1924 }
1925 if (cur->cnpos >= idx) {
1926 cur->cnpos++;
1927 }
1928 }
1929 }
1930 pthread_spin_unlock(&db->cursors_slk);
1931 }
1932 return 0;
1933 }
1934
_sblk_addkv(SBLK * sblk,IWLCTX * lx)1935 static WUR iwrc _sblk_addkv(SBLK *sblk, IWLCTX *lx) {
1936 const IWKV_val *key = lx->key;
1937 const IWKV_val *val = lx->val;
1938 assert(key && key->size && key->data && val && sblk->kvblk);
1939 if (!sblk) {
1940 iwlog_error2("sblk != 0");
1941 return IW_ERROR_ASSERTION;
1942 }
1943 uint8_t *mm, idx, kvidx;
1944 IWDB db = sblk->db;
1945 KVBLK *kvblk = sblk->kvblk;
1946 IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
1947 if (sblk->pnum >= KVBLK_IDXNUM) {
1948 return _IWKV_RC_KVBLOCK_FULL;
1949 }
1950 iwrc rc = _kvblk_addkv(kvblk, key, val, &kvidx, false);
1951 RCRET(rc);
1952 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
1953 RCRET(rc);
1954 rc = _sblk_insert_pi_mm(sblk, kvidx, lx, mm, &idx);
1955 RCRET(rc);
1956 fsm->release_mmap(fsm);
1957 if (idx == 0) { // the lowest key inserted
1958 size_t ksize = key->size;
1959 bool compound = (db->dbflg & IWDB_COMPOUND_KEYS);
1960 if (compound) {
1961 ksize += IW_VNUMSIZE(key->compound);
1962 }
1963 sblk->lkl = MIN(db->iwkv->pklen, ksize);
1964 uint8_t *wp = sblk->lk;
1965 if (compound) {
1966 int len;
1967 IW_SETVNUMBUF64(len, wp, key->compound);
1968 wp += len;
1969 }
1970 memcpy(wp, key->data, sblk->lkl - (ksize - key->size));
1971 if (ksize <= db->iwkv->pklen) {
1972 sblk->flags |= SBLK_FULL_LKEY;
1973 } else {
1974 sblk->flags &= ~SBLK_FULL_LKEY;
1975 }
1976 }
1977 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
1978 sblk->kvblkn = ADDR2BLK(kvblk->addr);
1979 }
1980 sblk->flags |= SBLK_DURTY;
1981
1982 // Update active cursors inside this block
1983 pthread_spin_lock(&db->cursors_slk);
1984 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
1985 if (cur->cn && (cur->cn->addr == sblk->addr)) {
1986 if (cur->cn != sblk) {
1987 memcpy(cur->cn, sblk, sizeof(*cur->cn));
1988 cur->cn->kvblk = 0;
1989 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
1990 }
1991 if (cur->cnpos >= idx) {
1992 cur->cnpos++;
1993 }
1994 }
1995 }
1996 pthread_spin_unlock(&db->cursors_slk);
1997
1998 return 0;
1999 }
2000
_sblk_updatekv(SBLK * sblk,int8_t idx,const IWKV_val * key,const IWKV_val * val)2001 static WUR iwrc _sblk_updatekv(
2002 SBLK *sblk, int8_t idx,
2003 const IWKV_val *key, const IWKV_val *val
2004 ) {
2005 assert(sblk && sblk->kvblk && idx >= 0 && idx < sblk->pnum);
2006 IWDB db = sblk->db;
2007 KVBLK *kvblk = sblk->kvblk;
2008 uint8_t kvidx = sblk->pi[idx];
2009 iwrc intrc = 0;
2010 iwrc rc = _kvblk_updatev(kvblk, &kvidx, key, val);
2011 if (IWKV_IS_INTERNAL_RC(rc)) {
2012 intrc = rc;
2013 rc = 0;
2014 }
2015 RCRET(rc);
2016 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2017 sblk->kvblkn = ADDR2BLK(kvblk->addr);
2018 }
2019 sblk->pi[idx] = kvidx;
2020 sblk->flags |= SBLK_DURTY;
2021 // Update active cursors inside this block
2022 pthread_spin_lock(&db->cursors_slk);
2023 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2024 if (cur->cn && (cur->cn != sblk) && (cur->cn->addr == sblk->addr)) {
2025 memcpy(cur->cn, sblk, sizeof(*cur->cn));
2026 cur->cn->kvblk = 0;
2027 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2028 }
2029 }
2030 pthread_spin_unlock(&db->cursors_slk);
2031 return intrc;
2032 }
2033
_sblk_rmkv(SBLK * sblk,uint8_t idx)2034 static WUR iwrc _sblk_rmkv(SBLK *sblk, uint8_t idx) {
2035 assert(sblk && sblk->kvblk);
2036 IWDB db = sblk->db;
2037 KVBLK *kvblk = sblk->kvblk;
2038 IWFS_FSM *fsm = &sblk->db->iwkv->fsm;
2039 assert(kvblk && idx < sblk->pnum && sblk->pi[idx] < KVBLK_IDXNUM);
2040
2041 iwrc rc = _kvblk_rmkv(kvblk, sblk->pi[idx], 0);
2042 RCRET(rc);
2043
2044 if (sblk->kvblkn != ADDR2BLK(kvblk->addr)) {
2045 sblk->kvblkn = ADDR2BLK(kvblk->addr);
2046 }
2047 --sblk->pnum;
2048 sblk->flags |= SBLK_DURTY;
2049
2050 if ((idx < sblk->pnum) && (sblk->pnum > 0)) {
2051 memmove(sblk->pi + idx, sblk->pi + idx + 1, sblk->pnum - idx);
2052 }
2053
2054 if (idx == 0) { // Lowest key removed
2055 // Replace the lowest key with the next one or reset
2056 if (sblk->pnum > 0) {
2057 uint8_t *mm, *kbuf;
2058 uint32_t klen;
2059 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2060 RCRET(rc);
2061 rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[idx], mm, &kbuf, &klen);
2062 if (rc) {
2063 fsm->release_mmap(fsm);
2064 return rc;
2065 }
2066 sblk->lkl = MIN(db->iwkv->pklen, klen);
2067 memcpy(sblk->lk, kbuf, sblk->lkl);
2068 fsm->release_mmap(fsm);
2069 if (klen <= db->iwkv->pklen) {
2070 sblk->flags |= SBLK_FULL_LKEY;
2071 } else {
2072 sblk->flags &= ~SBLK_FULL_LKEY;
2073 }
2074 } else {
2075 sblk->lkl = 0;
2076 }
2077 }
2078
2079 // Update active cursors
2080 pthread_spin_lock(&db->cursors_slk);
2081 for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
2082 if (cur->cn && (cur->cn->addr == sblk->addr)) {
2083 cur->skip_next = 0;
2084 if (cur->cn != sblk) {
2085 memcpy(cur->cn, sblk, sizeof(*cur->cn));
2086 cur->cn->kvblk = 0;
2087 cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
2088 }
2089 if (cur->cnpos == idx) {
2090 if (idx && (idx == sblk->pnum)) {
2091 cur->cnpos--;
2092 cur->skip_next = -1;
2093 } else {
2094 cur->skip_next = 1;
2095 }
2096 } else if (cur->cnpos > idx) {
2097 cur->cnpos--;
2098 }
2099 }
2100 }
2101 pthread_spin_unlock(&db->cursors_slk);
2102 return 0;
2103 }
2104
2105 //-------------------------- IWLCTX
2106
_lx_sblk_cmp_key(IWLCTX * lx,SBLK * sblk,int * resp)2107 WUR iwrc _lx_sblk_cmp_key(IWLCTX *lx, SBLK *sblk, int *resp) {
2108 int res = 0;
2109 iwrc rc = 0;
2110 iwdb_flags_t dbflg = sblk->db->dbflg;
2111 const IWKV_val *key = lx->key;
2112 uint8_t lkl = sblk->lkl;
2113 size_t ksize = key->size;
2114
2115 if (IW_UNLIKELY((sblk->pnum < 1) || (sblk->flags & SBLK_DB))) {
2116 *resp = 0;
2117 iwlog_ecode_error3(IWKV_ERROR_CORRUPTED);
2118 return IWKV_ERROR_CORRUPTED;
2119 }
2120 if (dbflg & IWDB_COMPOUND_KEYS) {
2121 ksize += IW_VNUMSIZE(key->compound);
2122 }
2123 if ( (sblk->flags & SBLK_FULL_LKEY)
2124 || (ksize < lkl)
2125 || (dbflg & (IWDB_VNUM64_KEYS | IWDB_REALNUM_KEYS))) {
2126 res = _cmp_keys(dbflg, sblk->lk, lkl, key);
2127 } else {
2128 res = _cmp_keys_prefix(dbflg, sblk->lk, lkl, key);
2129 if (res == 0) {
2130 uint32_t kl;
2131 uint8_t *mm, *k;
2132 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2133 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2134 if (rc) {
2135 *resp = 0;
2136 return rc;
2137 }
2138 if (!sblk->kvblk) {
2139 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2140 if (rc) {
2141 *resp = 0;
2142 fsm->release_mmap(fsm);
2143 return rc;
2144 }
2145 }
2146 rc = _kvblk_key_peek(sblk->kvblk, sblk->pi[0], mm, &k, &kl);
2147 RCRET(rc);
2148 res = _cmp_keys(dbflg, k, kl, key);
2149 fsm->release_mmap(fsm);
2150 }
2151 }
2152 *resp = res;
2153 return rc;
2154 }
2155
_lx_roll_forward(IWLCTX * lx,uint8_t lvl)2156 static WUR iwrc _lx_roll_forward(IWLCTX *lx, uint8_t lvl) {
2157 iwrc rc = 0;
2158 int cret;
2159 SBLK *sblk;
2160 blkn_t blkn;
2161 assert(lx->lower);
2162
2163 while ((blkn = lx->lower->n[lvl])) {
2164 off_t blkaddr = BLK2ADDR(blkn);
2165 if ((lx->nlvl > -1) && (lvl < lx->nlvl)) {
2166 uint8_t ulvl = lvl + 1;
2167 if (lx->pupper[ulvl] && (lx->pupper[ulvl]->addr == blkaddr)) {
2168 sblk = lx->pupper[ulvl];
2169 } else if (lx->plower[ulvl] && (lx->plower[ulvl]->addr == blkaddr)) {
2170 sblk = lx->plower[ulvl];
2171 } else {
2172 rc = _sblk_at(lx, blkaddr, 0, &sblk);
2173 }
2174 } else {
2175 rc = _sblk_at(lx, blkaddr, 0, &sblk);
2176 }
2177 RCRET(rc);
2178 #ifndef NDEBUG
2179 ++lx->num_cmps;
2180 #endif
2181 rc = _lx_sblk_cmp_key(lx, sblk, &cret);
2182 RCRET(rc);
2183 if ((cret > 0) || (lx->upper_addr == sblk->addr)) { // upper > key
2184 lx->upper = sblk;
2185 break;
2186 } else {
2187 lx->lower = sblk;
2188 }
2189 }
2190 return 0;
2191 }
2192
/**
 * Finds the pair of blocks (`lx->lower`, `lx->upper`) surrounding `lx->key`.
 *
 * The search starts at `lx->lower` (the database block when it is not set)
 * and descends from the level of that block down to level zero, calling
 * `_lx_roll_forward()` on each level. For every level not greater than
 * `lx->nlvl` the bounds found are saved into `lx->plower[]` / `lx->pupper[]`.
 *
 * @param lx Lookup context.
 * @return Zero on success or an error from `_sblk_at()` / `_lx_roll_forward()`.
 */
static WUR iwrc _lx_find_bounds(IWLCTX *lx) {
  iwrc rc = 0;
  int lvl;
  blkn_t blkn;
  SBLK *dblk = &lx->dblk;
  if (!dblk->addr) {
    // Database block is not loaded into this context yet: copy it into `lx->dblk`
    SBLK *s;
    rc = _sblk_at(lx, lx->db->addr, 0, &s);
    RCRET(rc);
    memcpy(dblk, s, sizeof(*dblk));
  }
  if (!lx->lower) {
    lx->lower = &lx->dblk;
  }
  if (lx->nlvl > dblk->lvl) {
    // New level in DB
    dblk->lvl = (uint8_t) lx->nlvl;
    dblk->flags |= SBLK_DURTY;
  }
  lvl = lx->lower->lvl;
  while (lvl > -1) {
    rc = _lx_roll_forward(lx, (uint8_t) lvl);
    RCRET(rc);
    // Block number of the upper bound found on this level, zero if none
    if (lx->upper) {
      blkn = ADDR2BLK(lx->upper->addr);
    } else {
      blkn = 0;
    }
    // Save bounds for the current level, then step down. While the next
    // pointer of `lx->lower` on the level below still refers to the same upper
    // block the bounds are unchanged, so they are saved without another roll
    // forward. `lvl--` ends the descent after level zero has been handled.
    do {
      if (lx->nlvl >= lvl) {
        lx->plower[lvl] = lx->lower;
        lx->pupper[lvl] = lx->upper;
      }
    } while (lvl-- && lx->lower->n[lvl] == blkn);
  }
  return 0;
}
2230
/**
 * Syncs and releases all blocks referenced by the lookup context.
 *
 * Order of processing: the new block `lx->nb` is synced first, then the chute
 * (`lx->pupper[]` / `lx->plower[]`, only when `lx->nlvl > -1`), then
 * `lx->upper` and `lx->lower`, then the database block copy if it is marked
 * `SBLK_DURTY`, and finally `lx->nb` is released.
 *
 * @param lx Lookup context.
 * @param mm Mapped memory pointer forwarded to the sync routines. Several
 *           callers in this file pass zero here.
 * @return Zero on success or the first error reported by a sync routine.
 *         `lx->destroy_addr` is reset in either case.
 */
static iwrc _lx_release_mm(IWLCTX *lx, uint8_t *mm) {
  iwrc rc = 0;
  if (lx->nlvl > -1) {
    SBLK *lsb = 0, *usb = 0;
    if (lx->nb) {
      rc = _sblk_sync_mm(lx, lx->nb, mm);
      RCGO(rc, finish);
    }
    // `upper`/`lower` which are also level-zero chute entries will be
    // released by the loop below, avoid releasing them twice
    if (lx->pupper[0] == lx->upper) {
      lx->upper = 0;
    }
    if (lx->plower[0] == lx->lower) {
      lx->lower = 0;
    }
    for (int i = 0; i <= lx->nlvl; ++i) {
      if (lx->pupper[i]) {
        // The same block may occupy consecutive levels: release it only once
        if (lx->pupper[i] != usb) {
          usb = lx->pupper[i];
          rc = _sblk_sync_and_release_mm(lx, &lx->pupper[i], mm);
          RCGO(rc, finish);
        }
        lx->pupper[i] = 0;
      }
      if (lx->plower[i]) {
        if (lx->plower[i] != lsb) {
          lsb = lx->plower[i];
          rc = _sblk_sync_and_release_mm(lx, &lx->plower[i], mm);
          RCGO(rc, finish);
        }
        lx->plower[i] = 0;
      }
    }
  }
  if (lx->upper) {
    rc = _sblk_sync_and_release_mm(lx, &lx->upper, mm);
    RCGO(rc, finish);
  }
  if (lx->lower) {
    rc = _sblk_sync_and_release_mm(lx, &lx->lower, mm);
    RCGO(rc, finish);
  }
  if (lx->dblk.flags & SBLK_DURTY) {
    rc = _sblk_sync_mm(lx, &lx->dblk, mm);
    RCGO(rc, finish);
  }
  if (lx->nb) {
    _sblk_release(lx, &lx->nb);
    // NOTE(review): `rc` is not assigned by the call above, it is always zero
    // at this point so this check never jumps
    RCGO(rc, finish);
  }

finish:
  lx->destroy_addr = 0;
  return rc;
}
2285
_lx_release(IWLCTX * lx)2286 iwrc _lx_release(IWLCTX *lx) {
2287 uint8_t *mm;
2288 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2289 iwrc rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2290 RCRET(rc);
2291 rc = _lx_release_mm(lx, mm);
2292 IWRC(fsm->release_mmap(fsm), rc);
2293 return rc;
2294 }
2295
/**
 * Adds the pair `lx->key` / `lx->val` to a full block by creating a new
 * block `nb` right after `sblk`.
 *
 * Two cases are handled:
 *  - `idx == sblk->pnum`: the new pair goes into the freshly created `nb`
 *    and nothing is moved.
 *  - otherwise: pairs starting from the `pivot` position are moved from
 *    `sblk` into `nb`, then the new pair is added either to `nb`
 *    (`idx > pivot`) or to `sblk`.
 *
 * After that `nb` is linked into the list on levels `0..nb->lvl` using the
 * chute (`lx->plower[]` / `lx->pupper[]`) and cursors positioned on moved
 * pairs are switched to `nb`.
 *
 * @param lx   Lookup context; `lx->nlvl` is used as the level of the new block.
 * @param idx  Insert position of the new pair within `sblk`.
 * @param sblk Full block to split.
 * @return Zero on success. On failure reaching `finish` the new block is destroyed.
 */
static iwrc _lx_split_addkv(IWLCTX *lx, int idx, SBLK *sblk) {
  iwrc rc;
  SBLK *nb;
  blkn_t nblk;
  IWDB db = sblk->db;
  bool uside = (idx == sblk->pnum);
  register const int8_t pivot = (KVBLK_IDXNUM / 2) + 1; // 32

  if (uside) { // Upper side
    rc = _sblk_create(lx, (uint8_t) lx->nlvl, 0, sblk, lx->upper, &nb);
    RCRET(rc);
    rc = _sblk_addkv(nb, lx);
    RCGO(rc, finish);
  } else { // New key is somewhere in a middle of sblk->kvblk
    assert(sblk->kvblk);
    // We are in the middle
    // Do the partial split
    // Move kv pairs into new `nb`
    // Compute space required for the new sblk which stores kv pairs after pivot `idx`
    size_t sz = 0;
    for (int8_t i = pivot; i < sblk->pnum; ++i) {
      sz += sblk->kvblk->pidx[sblk->pi[i]].len;
    }
    if (idx > pivot) {
      // The new pair will be stored in `nb` as well
      sz += IW_VNUMSIZE(lx->key->size) + lx->key->size + lx->val->size;
    }
    sz += KVBLK_MAX_NKV_SZ;
    // Round the required size up to a power of two exponent
    uint8_t kvbpow = (uint8_t) iwlog2_64(sz);
    while ((1ULL << kvbpow) < sz) kvbpow++;

    rc = _sblk_create(lx, (uint8_t) lx->nlvl, kvbpow, sblk, lx->upper, &nb);
    RCRET(rc);

    IWKV_val key, val;
    IWFS_FSM *fsm = &lx->db->iwkv->fsm;
    // `end` is captured up front since `sblk->pnum` is decremented in the loop body
    for (int8_t i = pivot, end = sblk->pnum; i < end; ++i) {
      uint8_t *mm;
      rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
      RCBREAK(rc);

      rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[i], &key, &val);
      assert(key.size);
      fsm->release_mmap(fsm);
      RCBREAK(rc);

      rc = _sblk_addkv2(nb, i - pivot, &key, &val, true);
      _kv_dispose(&key, &val);

      RCBREAK(rc);
      // Pair has been copied: drop it from the source block
      sblk->kvblk->pidx[sblk->pi[i]].len = 0;
      sblk->kvblk->pidx[sblk->pi[i]].off = 0;
      --sblk->pnum;
    }
    // NOTE(review): when the loop above breaks with an error the function
    // still relinks the list below, and `rc` may be overwritten by the final
    // `_sblk_addkv()` call - confirm this is intended
    sblk->kvblk->flags |= KVBLK_DURTY;
    sblk->kvblk->zidx = sblk->pi[pivot];
    // Recompute the maximal offset among the remaining pairs
    sblk->kvblk->maxoff = 0;
    for (int i = 0; i < KVBLK_IDXNUM; ++i) {
      if (sblk->kvblk->pidx[i].off > sblk->kvblk->maxoff) {
        sblk->kvblk->maxoff = sblk->kvblk->pidx[i].off;
      }
    }
  }

  // Fix levels:
  //  [ lb -> sblk -> ub ]
  //  [ lb -> sblk -> nb -> ub ]
  nblk = ADDR2BLK(nb->addr);
  lx->pupper[0]->p0 = nblk;
  lx->pupper[0]->flags |= SBLK_DURTY;
  nb->p0 = ADDR2BLK(lx->plower[0]->addr);
  for (int i = 0; i <= nb->lvl; ++i) {
    lx->plower[i]->n[i] = nblk;
    lx->plower[i]->flags |= SBLK_DURTY;
    nb->n[i] = ADDR2BLK(lx->pupper[i]->addr);
  }

  // Cursors standing on pairs at positions >= pivot of `sblk` are moved to `nb`
  pthread_spin_lock(&db->cursors_slk);
  for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
    if (cur->cn && (cur->cn->addr == sblk->addr)) {
      if (cur->cnpos >= pivot) {
        memcpy(cur->cn, nb, sizeof(*cur->cn));
        cur->cn->kvblk = 0;
        cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
        cur->cnpos -= pivot;
      }
    }
  }
  pthread_spin_unlock(&db->cursors_slk);

  if (!uside) {
    // Finally store the new pair in the block which now covers position `idx`
    if (idx > pivot) {
      rc = _sblk_addkv(nb, lx);
    } else {
      rc = _sblk_addkv(sblk, lx);
    }
    RCGO(rc, finish);
  }

finish:
  if (rc) {
    lx->nb = 0;
    IWRC(_sblk_destroy(lx, &nb), rc);
  } else {
    lx->nb = nb;
  }
  return rc;
}
2403
_lx_init_chute(IWLCTX * lx)2404 IW_INLINE iwrc _lx_init_chute(IWLCTX *lx) {
2405 assert(lx->nlvl >= 0);
2406 iwrc rc = 0;
2407 if (!lx->pupper[lx->nlvl]) { // fix zero upper by dbtail
2408 SBLK *dbtail;
2409 rc = _sblk_at(lx, 0, 0, &dbtail);
2410 RCRET(rc);
2411 for (int8_t i = lx->nlvl; i >= 0 && !lx->pupper[i]; --i) {
2412 lx->pupper[i] = dbtail;
2413 }
2414 }
2415 return 0;
2416 }
2417
_lx_addkv(IWLCTX * lx)2418 static WUR iwrc _lx_addkv(IWLCTX *lx) {
2419 iwrc rc;
2420 bool found, uadd;
2421 uint8_t *mm = 0, idx;
2422 SBLK *sblk = lx->lower;
2423 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2424 if (lx->nlvl > -1) {
2425 rc = _lx_init_chute(lx);
2426 RCRET(rc);
2427 }
2428 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2429 RCRET(rc);
2430 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2431 if (rc) {
2432 fsm->release_mmap(fsm);
2433 return rc;
2434 }
2435 rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2436 RCRET(rc);
2437 if (found && (lx->opflags & IWKV_NO_OVERWRITE)) {
2438 fsm->release_mmap(fsm);
2439 return IWKV_ERROR_KEY_EXISTS;
2440 }
2441 uadd = ( !found
2442 && sblk->pnum > KVBLK_IDXNUM - 1 && idx > KVBLK_IDXNUM - 1
2443 && lx->upper && lx->upper->pnum < KVBLK_IDXNUM);
2444 if (uadd) {
2445 rc = _sblk_loadkvblk_mm(lx, lx->upper, mm);
2446 if (rc) {
2447 fsm->release_mmap(fsm);
2448 return rc;
2449 }
2450 }
2451 if (found) {
2452 IWKV_val sval, *val = lx->val;
2453 if (lx->opflags & IWKV_VAL_INCREMENT) {
2454 int64_t ival;
2455 uint8_t *rp;
2456 uint32_t len;
2457 if (val->size == 4) {
2458 int32_t lv;
2459 memcpy(&lv, val->data, val->size);
2460 lv = IW_ITOHL(lv);
2461 ival = lv;
2462 } else if (val->size == 8) {
2463 memcpy(&ival, val->data, val->size);
2464 ival = IW_ITOHLL(ival);
2465 } else {
2466 rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2467 fsm->release_mmap(fsm);
2468 return rc;
2469 }
2470 _kvblk_value_peek(sblk->kvblk, sblk->pi[idx], mm, &rp, &len);
2471 sval.data = rp;
2472 sval.size = len;
2473 if (sval.size == 4) {
2474 uint32_t lv;
2475 memcpy(&lv, sval.data, 4);
2476 lv = IW_ITOHL(lv);
2477 lv += ival;
2478 _num2lebuf(lx->incbuf, &lv, 4);
2479 } else if (sval.size == 8) {
2480 uint64_t llv;
2481 memcpy(&llv, sval.data, 8);
2482 llv = IW_ITOHLL(llv);
2483 llv += ival;
2484 _num2lebuf(lx->incbuf, &llv, 8);
2485 } else {
2486 rc = IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED;
2487 fsm->release_mmap(fsm);
2488 return rc;
2489 }
2490 sval.data = lx->incbuf;
2491 val = &sval;
2492 }
2493 if (lx->ph) {
2494 IWKV_val oldval;
2495 rc = _kvblk_value_get(sblk->kvblk, mm, sblk->pi[idx], &oldval);
2496 fsm->release_mmap(fsm);
2497 if (!rc) {
2498 // note: oldval should be disposed by ph
2499 rc = lx->ph(lx->key, lx->val, &oldval, lx->phop);
2500 }
2501 RCRET(rc);
2502 } else {
2503 fsm->release_mmap(fsm);
2504 }
2505 return _sblk_updatekv(sblk, idx, lx->key, val);
2506 } else {
2507 fsm->release_mmap(fsm);
2508 if (sblk->pnum > KVBLK_IDXNUM - 1) {
2509 if (uadd) {
2510 if (lx->ph) {
2511 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2512 RCRET(rc);
2513 }
2514 return _sblk_addkv(lx->upper, lx);
2515 }
2516 if (lx->nlvl < 0) {
2517 return _IWKV_RC_REQUIRE_NLEVEL;
2518 }
2519 if (lx->ph) {
2520 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2521 RCRET(rc);
2522 }
2523 return _lx_split_addkv(lx, idx, sblk);
2524 } else {
2525 if (lx->ph) {
2526 rc = lx->ph(lx->key, lx->val, 0, lx->phop);
2527 RCRET(rc);
2528 }
2529 return _sblk_addkv2(sblk, idx, lx->key, lx->val, false);
2530 }
2531 }
2532 }
2533
_lx_put_lw(IWLCTX * lx)2534 IW_INLINE WUR iwrc _lx_put_lw(IWLCTX *lx) {
2535 iwrc rc;
2536 start:
2537 rc = _lx_find_bounds(lx);
2538 if (rc) {
2539 _lx_release_mm(lx, 0);
2540 return rc;
2541 }
2542 rc = _lx_addkv(lx);
2543 if (rc == _IWKV_RC_REQUIRE_NLEVEL) {
2544 SBLK *lower = lx->lower;
2545 lx->lower = 0;
2546 _lx_release_mm(lx, 0);
2547 lx->nlvl = _sblk_genlevel(lx->db);
2548 if (lower->lvl >= lx->nlvl) {
2549 lx->lower = lower;
2550 }
2551 goto start;
2552 }
2553 if (rc == _IWKV_RC_KVBLOCK_FULL) {
2554 rc = IWKV_ERROR_CORRUPTED;
2555 iwlog_ecode_error3(rc);
2556 }
2557 IWRC(_lx_release(lx), rc);
2558 return rc;
2559 }
2560
_lx_get_lr(IWLCTX * lx)2561 IW_INLINE WUR iwrc _lx_get_lr(IWLCTX *lx) {
2562 iwrc rc = _lx_find_bounds(lx);
2563 RCRET(rc);
2564 bool found;
2565 uint8_t *mm, idx;
2566 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2567 lx->val->size = 0;
2568 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2569 RCRET(rc);
2570 rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2571 RCGO(rc, finish);
2572 rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2573 RCGO(rc, finish);
2574 if (found) {
2575 rc = _kvblk_value_get(lx->lower->kvblk, mm, lx->lower->pi[idx], lx->val);
2576 } else {
2577 rc = IWKV_ERROR_NOTFOUND;
2578 }
2579
2580 finish:
2581 IWRC(fsm->release_mmap(fsm), rc);
2582 _lx_release_mm(lx, 0);
2583 return rc;
2584 }
2585
/**
 * Removes the last pair of `sblk` and then the block itself.
 *
 * The lookup context is released and bounds are searched again with
 * `lx->upper_addr` set to the block address, so the block is found as
 * `lx->upper` and its predecessors on every level end up in `lx->plower[]`.
 * The block is then unlinked on levels `0..lx->nlvl`, the back reference of
 * the following block is fixed, cursors referring to the removed block or to
 * its neighbours are adjusted, and the block is destroyed.
 *
 * @param lx   Lookup context.
 * @param sblk Block holding exactly one pair with its kvblk loaded.
 * @param idx  Position of the pair to remove.
 * @return Zero on success, error code otherwise.
 */
static WUR iwrc _lx_del_sblk_lw(IWLCTX *lx, SBLK *sblk, uint8_t idx) {
  assert(sblk->pnum == 1 && sblk->kvblk);

  iwrc rc;
  IWDB db = lx->db;
  // Saved before the release below: `sblk` is held by the context
  KVBLK *kvblk = sblk->kvblk;
  blkn_t sblk_blkn = ADDR2BLK(sblk->addr);

  // NOTE(review): result of the release is ignored here
  _lx_release_mm(lx, 0);
  lx->nlvl = sblk->lvl;
  lx->upper_addr = sblk->addr;

  rc = _lx_find_bounds(lx);
  RCRET(rc);
  assert(lx->upper->pnum == 1 && lx->upper->addr == lx->upper_addr);

  lx->upper->kvblk = kvblk;
  rc = _sblk_rmkv(lx->upper, idx);
  RCGO(rc, finish);

  // Unlink the block on every level it takes part in
  for (int i = 0; i <= lx->nlvl; ++i) {
    lx->plower[i]->n[i] = lx->upper->n[i];
    lx->plower[i]->flags |= SBLK_DURTY;
    if (lx->plower[i]->flags & SBLK_DB) {
      // Level became empty: lower the level of the database block
      if (!lx->plower[i]->n[i]) {
        --lx->plower[i]->lvl;
      }
    }
    if (lx->pupper[i] == lx->upper) {
      // Do not touch `lx->upper` in next `_lx_release_mm()` call
      lx->pupper[i] = 0;
    }
  }

  SBLK rb; // Block to remove
  memcpy(&rb, lx->upper, sizeof(rb));

  SBLK *nb,  // Block after lx->upper
       *rbp = &rb;

  assert(!lx->nb);
  rc = _sblk_at(lx, BLK2ADDR(rb.n[0]), 0, &nb);
  RCGO(rc, finish);
  // Fix the back reference of the following block
  lx->nb = nb;
  lx->nb->p0 = rb.p0;
  lx->nb->flags |= SBLK_DURTY;

  // Update cursors within sblk removed
  pthread_spin_lock(&db->cursors_slk);
  for (IWKV_cursor cur = db->cursors; cur; cur = cur->next) {
    if (cur->cn) {
      if (cur->cn->addr == sblk->addr) {
        // Cursor stands on the removed block
        if (nb->flags & SBLK_DB) {
          if (!(lx->plower[0]->flags & SBLK_DB)) {
            // Move the cursor to the last pair of the preceding block
            memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
            cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
            cur->cn->kvblk = 0;
            cur->skip_next = -1;
            cur->cnpos = lx->plower[0]->pnum;
            if (cur->cnpos) {
              cur->cnpos--;
            }
          } else {
            // Neither neighbour is a regular block: reset the cursor
            cur->cn = 0;
            cur->cnpos = 0;
            cur->skip_next = 0;
          }
        } else {
          // Move the cursor to the first pair of the following block
          memcpy(cur->cn, nb, sizeof(*nb));
          cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
          cur->cn->kvblk = 0;
          cur->cnpos = 0;
          cur->skip_next = 1;
        }
      } else if (cur->cn->n[0] == sblk_blkn) {
        // Cursor stands on the preceding block: refresh its copy
        memcpy(cur->cn, lx->plower[0], sizeof(*cur->cn));
        cur->cn->kvblk = 0;
        cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
      } else if (cur->cn->p0 == sblk_blkn) {
        // Cursor stands on the following block: refresh its copy
        memcpy(cur->cn, nb, sizeof(*nb));
        cur->cn->kvblk = 0;
        cur->cn->flags &= SBLK_PERSISTENT_FLAGS;
      }
    }
  }
  pthread_spin_unlock(&db->cursors_slk);

  rc = _sblk_destroy(lx, &rbp);

finish:
  return rc;
}
2678
_lx_del_lw(IWLCTX * lx)2679 static WUR iwrc _lx_del_lw(IWLCTX *lx) {
2680 iwrc rc;
2681 bool found;
2682 uint8_t *mm = 0, idx;
2683 IWDB db = lx->db;
2684 IWFS_FSM *fsm = &db->iwkv->fsm;
2685 SBLK *sblk;
2686
2687 rc = _lx_find_bounds(lx);
2688 RCRET(rc);
2689
2690 sblk = lx->lower;
2691 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2692 RCGO(rc, finish);
2693 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
2694 RCGO(rc, finish);
2695 rc = _sblk_find_pi_mm(sblk, lx, mm, &found, &idx);
2696 RCGO(rc, finish);
2697 if (!found) {
2698 rc = IWKV_ERROR_NOTFOUND;
2699 goto finish;
2700 }
2701 fsm->release_mmap(fsm);
2702 mm = 0;
2703
2704 if (sblk->pnum == 1) { // last kv in block
2705 rc = _lx_del_sblk_lw(lx, sblk, idx);
2706 } else {
2707 rc = _sblk_rmkv(sblk, idx);
2708 }
2709
2710 finish:
2711 if (mm) {
2712 fsm->release_mmap(fsm);
2713 }
2714 if (rc) {
2715 _lx_release_mm(lx, 0);
2716 } else {
2717 rc = _lx_release(lx);
2718 }
2719 return rc;
2720 }
2721
2722 //-------------------------- CURSOR
2723
_cursor_get_ge_idx(IWLCTX * lx,IWKV_cursor_op op,uint8_t * oidx)2724 IW_INLINE WUR iwrc _cursor_get_ge_idx(IWLCTX *lx, IWKV_cursor_op op, uint8_t *oidx) {
2725 iwrc rc = _lx_find_bounds(lx);
2726 RCRET(rc);
2727 bool found;
2728 uint8_t *mm, idx;
2729 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
2730 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
2731 RCRET(rc);
2732 rc = _sblk_loadkvblk_mm(lx, lx->lower, mm);
2733 RCGO(rc, finish);
2734 rc = _sblk_find_pi_mm(lx->lower, lx, mm, &found, &idx);
2735 RCGO(rc, finish);
2736 if (found) {
2737 *oidx = idx;
2738 } else {
2739 if ((op == IWKV_CURSOR_EQ) || (lx->lower->flags & SBLK_DB) || (lx->lower->pnum < 1)) {
2740 rc = IWKV_ERROR_NOTFOUND;
2741 } else {
2742 *oidx = idx ? idx - 1 : idx;
2743 }
2744 }
2745
2746 finish:
2747 IWRC(fsm->release_mmap(fsm), rc);
2748 return rc;
2749 }
2750
/**
 * Moves the cursor according to `op`.
 *
 *  - `IWKV_CURSOR_BEFORE_FIRST` / `IWKV_CURSOR_AFTER_LAST`: the current block
 *    is released and only the start marker (`cur->dbaddr`) is set.
 *  - `IWKV_CURSOR_NEXT` / `IWKV_CURSOR_PREV`: steps one pair forward/backward,
 *    switching to the neighbour block when the current one is exhausted.
 *    Blocks without pairs are skipped.
 *  - `IWKV_CURSOR_EQ` / `IWKV_CURSOR_GE`: positions the cursor using the key
 *    of the lookup context.
 *
 * @param cur Cursor to move.
 * @param op  Operation to perform.
 * @return Zero on success, `IWKV_ERROR_NOTFOUND` when there is no pair to move
 *         to, other error code on failure. On errors other than
 *         `IWKV_ERROR_NOTFOUND` the current block of the cursor is released.
 */
static WUR iwrc _cursor_to_lr(IWKV_cursor cur, IWKV_cursor_op op) {
  iwrc rc = 0;
  IWDB db = cur->lx.db;
  IWLCTX *lx = &cur->lx;
  blkn_t dblk = ADDR2BLK(db->addr);
  if (op < IWKV_CURSOR_NEXT) { // IWKV_CURSOR_BEFORE_FIRST | IWKV_CURSOR_AFTER_LAST
    if (cur->cn) {
      _sblk_release(lx, &cur->cn);
    }
    if (op == IWKV_CURSOR_BEFORE_FIRST) {
      cur->dbaddr = db->addr;
      // Makes the next IWKV_CURSOR_NEXT step leave the database block
      cur->cnpos = KVBLK_IDXNUM - 1;
    } else {
      cur->dbaddr = -1; // Negative as sign of dbtail
      cur->cnpos = 0;
    }
    return 0;
  }

start:
  if (op < IWKV_CURSOR_EQ) { // IWKV_CURSOR_NEXT | IWKV_CURSOR_PREV
    blkn_t n = 0;
    if (!cur->cn) {
      if (cur->dbaddr) {
        // First step after BEFORE_FIRST/AFTER_LAST: load the starting block
        rc = _sblk_at(lx, (cur->dbaddr < 0 ? 0 : cur->dbaddr), 0, &cur->cn);
        cur->dbaddr = 0;
        RCGO(rc, finish);
      } else {
        rc = IWKV_ERROR_NOTFOUND;
        goto finish;
      }
    }
    if (op == IWKV_CURSOR_NEXT) {
      if (cur->skip_next > 0) {
        // Cursor position was already advanced, consume the pending skip
        goto finish;
      }
      if (cur->cnpos + 1 >= cur->cn->pnum) {
        // Current block is exhausted, go to the next one
        n = cur->cn->n[0];
        if (!n) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        _sblk_release(lx, &cur->cn);
        rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
        RCGO(rc, finish);
        cur->cnpos = 0;
        if (IW_UNLIKELY(!cur->cn->pnum)) {
          // Empty block, keep moving
          goto start;
        }
      } else {
        if (cur->cn->flags & SBLK_DB) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        ++cur->cnpos;
      }
    } else { // IWKV_CURSOR_PREV
      if (cur->skip_next < 0) {
        // Cursor position was already moved back, consume the pending skip
        goto finish;
      }
      if (cur->cnpos == 0) {
        // Current block is exhausted, go to the previous one
        n = cur->cn->p0;
        if (!n || (n == dblk)) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        _sblk_release(lx, &cur->cn);
        // NOTE(review): `rc` is still zero here, this check never jumps
        RCGO(rc, finish);
        rc = _sblk_at(lx, BLK2ADDR(n), 0, &cur->cn);
        RCGO(rc, finish);
        if (IW_LIKELY(cur->cn->pnum)) {
          cur->cnpos = cur->cn->pnum - 1;
        } else {
          // Empty block, keep moving
          goto start;
        }
      } else {
        if (cur->cn->flags & SBLK_DB) {
          rc = IWKV_ERROR_NOTFOUND;
          goto finish;
        }
        --cur->cnpos;
      }
    }
  } else { // IWKV_CURSOR_EQ | IWKV_CURSOR_GE
    if (!lx->key) {
      rc = IW_ERROR_INVALID_STATE;
      goto finish;
    }
    rc = _cursor_get_ge_idx(lx, op, &cur->cnpos);
    if (lx->upper) {
      _sblk_release(lx, &lx->upper);
    }
    if (!rc) {
      // The cursor takes over the lower block from the lookup context
      cur->cn = lx->lower;
      lx->lower = 0;
    }
  }

finish:
  cur->skip_next = 0;
  if (rc && (rc != IWKV_ERROR_NOTFOUND)) {
    if (cur->cn) {
      _sblk_release(lx, &cur->cn);
    }
  }
  return rc;
}
2858
2859 //-------------------------- PUBLIC API
2860
/**
 * Maps an IWKV error code to its human readable description.
 *
 * @param locale Not used.
 * @param ecode  Error code to describe.
 * @return Static message or zero when the code is outside of the IWKV error
 *         range or has no message assigned.
 */
static const char* _kv_ecodefn(locale_t locale, uint32_t ecode) {
  const char *msg = 0;

  if ((ecode <= _IWKV_ERROR_START) || (ecode >= _IWKV_ERROR_END)) {
    return 0;
  }
  switch (ecode) {
    case IWKV_ERROR_NOTFOUND:
      msg = "Key not found. (IWKV_ERROR_NOTFOUND)";
      break;
    case IWKV_ERROR_KEY_EXISTS:
      msg = "Key exists. (IWKV_ERROR_KEY_EXISTS)";
      break;
    case IWKV_ERROR_MAXKVSZ:
      msg = "Size of Key+value must be not greater than 0xfffffff bytes (IWKV_ERROR_MAXKVSZ)";
      break;
    case IWKV_ERROR_CORRUPTED:
      msg = "Database file invalid or corrupted (IWKV_ERROR_CORRUPTED)";
      break;
    case IWKV_ERROR_DUP_VALUE_SIZE:
      msg = "Value size is not compatible for insertion into sorted values array (IWKV_ERROR_DUP_VALUE_SIZE)";
      break;
    case IWKV_ERROR_KEY_NUM_VALUE_SIZE:
      msg = "Given key is not compatible to store as number (IWKV_ERROR_KEY_NUM_VALUE_SIZE)";
      break;
    case IWKV_ERROR_INCOMPATIBLE_DB_MODE:
      msg = "Incompatible database open mode (IWKV_ERROR_INCOMPATIBLE_DB_MODE)";
      break;
    case IWKV_ERROR_INCOMPATIBLE_DB_FORMAT:
      msg = "Incompatible database format version, please migrate database data (IWKV_ERROR_INCOMPATIBLE_DB_FORMAT)";
      break;
    case IWKV_ERROR_CORRUPTED_WAL_FILE:
      msg = "Corrupted WAL file (IWKV_ERROR_CORRUPTED_WAL_FILE)";
      break;
    case IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED:
      msg = "Stored value cannot be incremented/descremented (IWKV_ERROR_VALUE_CANNOT_BE_INCREMENTED)";
      break;
    case IWKV_ERROR_WAL_MODE_REQUIRED:
      msg = "Operation requires WAL enabled database. (IWKV_ERROR_WAL_MODE_REQUIRED)";
      break;
    case IWKV_ERROR_BACKUP_IN_PROGRESS:
      msg = "Backup operation in progress. (IWKV_ERROR_BACKUP_IN_PROGRESS)";
      break;
    default:
      break;
  }
  return msg;
}
2895
iwkv_init(void)2896 iwrc iwkv_init(void) {
2897 static int _kv_initialized = 0;
2898 if (!__sync_bool_compare_and_swap(&_kv_initialized, 0, 1)) {
2899 return 0;
2900 }
2901 return iwlog_register_ecodefn(_kv_ecodefn);
2902 }
2903
/**
 * File resize policy used by the storage file.
 *
 * While the current size is below 64M the size is doubled until it covers the
 * requested one, starting from one allocation unit for an empty file.
 * Bigger files get the requested size plus 10M. The result is rounded up to
 * the allocation unit.
 *
 * @param nsize Requested file size.
 * @param csize Current file size.
 * @param f     Not used.
 * @param _ctx  Not used.
 * @return New file size.
 */
static off_t _szpolicy(off_t nsize, off_t csize, struct IWFS_EXT *f, void **_ctx) {
  const size_t aunit = iwp_alloc_unit();
  off_t res;

  if (csize >= 0x4000000) {
    res = nsize + 10 * 1024 * 1024; // + 10M extra space
  } else { // Doubled alloc up to 64M
    for (res = csize ? csize : aunit; res < nsize; res <<= 1) {
    }
  }
  return IW_ROUNDUP(res, aunit);
}
2918
/**
 * Reports the state of the underlying storage file.
 *
 * @param iwkv Storage handle.
 * @param out  Receives the state.
 * @return `IW_ERROR_INVALID_ARGS` if any argument is zero, otherwise the
 *         result of the state query combined with the unlock status.
 */
iwrc iwkv_state(IWKV iwkv, IWFS_FSM_STATE *out) {
  if (!iwkv || !out) {
    return IW_ERROR_INVALID_ARGS;
  }
  int rci;
  API_RLOCK(iwkv, rci);
  // The query is made on a by-value copy of the fsm handle
  IWFS_FSM fsm = iwkv->fsm;
  iwrc rc = fsm.state(&fsm, out);
  API_UNLOCK(iwkv, rci, rc);
  return rc;
}
2930
/**
 * Public entry point of the online backup; all arguments are forwarded
 * unchanged to `iwal_online_backup()` and its result is returned.
 */
iwrc iwkv_online_backup(IWKV iwkv, uint64_t *ts, const char *target_file) {
  return iwal_online_backup(iwkv, ts, target_file);
}
2934
/**
 * Checks whether the file at `path` is an online backup image and, if so,
 * splits it into a database file and a WAL file.
 *
 * A backup image is recognized by the following, checked in this order:
 *  - file size of at least one allocation unit,
 *  - `IWFSM_MAGICK` at offset zero and `IWKV_MAGIC` at
 *    `IWFSM_CUSTOM_HDR_DATA_OFFSET`,
 *  - `IWKV_BACKUP_MAGIC` in the last 4 bytes of the file,
 *  - an 8 byte WAL offset right before it which is aligned to the allocation
 *    unit and, for a non empty WAL, points to a `WOP_SEP` record.
 *
 * For an image the bytes from the WAL offset up to the trailer are copied
 * into `<path>-wal` (truncated first) and the original file is truncated
 * at the WAL offset.
 *
 * @param path               Database file path.
 * @param extra_lock_flags   Extra flags added to the write lock taken on the file.
 * @param out_has_online_bkp Set to true once the file is recognized as
 *                           a backup image, false otherwise.
 * @return Zero on success, also when the file does not exist or is not
 *         a backup image. Error code otherwise.
 */
static iwrc _iwkv_check_online_backup(const char *path, iwp_lockmode extra_lock_flags, bool *out_has_online_bkp) {
  size_t sp;
  uint32_t lv;
  off_t fsz, pos;
  uint64_t waloff; // WAL offset
  char buf[16384];

  *out_has_online_bkp = false;
  const size_t aunit = iwp_alloc_unit();
  char *wpath = 0;

  IWFS_FILE f = { 0 }, w = { 0 };
  IWFS_FILE_STATE fs, fw;
  iwrc rc = iwfs_file_open(&f, &(IWFS_FILE_OPTS) {
    .path = path,
    .omode = IWFS_OREAD | IWFS_OWRITE,
    .lock_mode = IWP_WLOCK | extra_lock_flags
  });
  if (rc == IW_ERROR_NOT_EXISTS) {
    return 0;
  }
  RCRET(rc);

  rc = f.state(&f, &fs);
  RCGO(rc, finish);

  rc = iwp_lseek(fs.fh, 0, IWP_SEEK_END, &fsz);
  RCGO(rc, finish);
  if (fsz < iwp_alloc_unit()) {
    goto finish;
  }

  // File header magic
  rc = iwp_pread(fs.fh, 0, &lv, sizeof(lv), &sp);
  RCGO(rc, finish);
  lv = IW_ITOHL(lv);
  if ((sp != sizeof(lv)) || (lv != IWFSM_MAGICK)) {
    goto finish;
  }

  // IWKV header magic
  rc = iwp_pread(fs.fh, IWFSM_CUSTOM_HDR_DATA_OFFSET, &lv, sizeof(lv), &sp);
  RCGO(rc, finish);
  lv = IW_ITOHL(lv);
  if ((sp != sizeof(lv)) || (lv != IWKV_MAGIC)) {
    goto finish;
  }

  // Backup magic stored in the last bytes of the file
  rc = iwp_lseek(fs.fh, (off_t) -1 * sizeof(lv), IWP_SEEK_END, 0);
  RCGO(rc, finish);

  rc = iwp_read(fs.fh, &lv, sizeof(lv), &sp);
  RCGO(rc, finish);
  lv = IW_ITOHL(lv);
  if ((sp != sizeof(lv)) || (lv != IWKV_BACKUP_MAGIC)) {
    goto finish;
  }

  // Get WAL data offset
  rc = iwp_lseek(fs.fh, (off_t) -1 * (sizeof(waloff) + sizeof(lv)), IWP_SEEK_END, &pos);
  RCGO(rc, finish);

  rc = iwp_read(fs.fh, &waloff, sizeof(waloff), &sp);
  RCGO(rc, finish);

  waloff = IW_ITOHLL(waloff);
  // `waloff == pos` denotes an empty WAL, otherwise at least one WBSEP record must fit.
  // NOTE(review): operands of different signedness are compared here
  if (((waloff != pos) && (waloff > pos - sizeof(WBSEP))) || (waloff & (aunit - 1))) {
    goto finish;
  }

  // Read the first WAL instruction: WBSEP
  if (waloff != pos) { // Not an empty WAL?
    WBSEP wbsep = { 0 };
    // NOTE(review): number of bytes actually read (`sp`) is not checked here
    rc = iwp_pread(fs.fh, waloff, &wbsep, sizeof(wbsep), &sp);
    RCGO(rc, finish);
    if (wbsep.id != WOP_SEP) {
      goto finish;
    }
  }

  // Now we have an online backup image, unpack WAL file

  sp = strlen(path);
  wpath = malloc(sp + 4 /*-wal*/ + 1 /*\0*/);
  if (!wpath) {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    goto finish;
  }
  memcpy(wpath, path, sp);
  memcpy(wpath + sp, "-wal", 4);
  wpath[sp + 4] = '\0';

  iwlog_warn("Unpacking WAL from online backup into: %s", wpath);
  *out_has_online_bkp = true;

  // WAL file
  rc = iwfs_file_open(&w, &(IWFS_FILE_OPTS) {
    .path = wpath,
    .omode = IWFS_OREAD | IWFS_OWRITE | IWFS_OTRUNC
  });
  RCGO(rc, finish);

  rc = w.state(&w, &fw);
  RCGO(rc, finish);

  // WAL content copy
  rc = iwp_lseek(fs.fh, waloff, IWP_SEEK_SET, 0);
  RCGO(rc, finish);
  // Number of WAL bytes: everything after `waloff` except the trailer
  fsz = fsz - waloff - sizeof(lv) /* magic */ - sizeof(waloff) /* wal offset */;
  if (fsz > 0) {
    sp = 0;
    do {
      rc = iwp_read(fs.fh, buf, sizeof(buf), &sp);
      RCGO(rc, finish);
      // Do not copy the trailer read together with the last chunk
      if (sp > fsz) {
        sp = fsz;
      }
      fsz -= sp;
      rc = iwp_write(fw.fh, buf, sp);
      RCGO(rc, finish);
    } while (fsz > 0 && sp > 0);
  }
  rc = iwp_fsync(fw.fh);
  RCGO(rc, finish);

  // Cut the WAL data and the trailer off the database file
  rc = iwp_ftruncate(fs.fh, waloff);
  RCGO(rc, finish);

  rc = iwp_fsync(fs.fh);
  RCGO(rc, finish);

finish:
  if (f.impl) {
    IWRC(f.close(&f), rc);
  }
  if (w.impl) {
    IWRC(w.close(&w), rc);
  }
  free(wpath);
  return rc;
}
3074
iwkv_open(const IWKV_OPTS * opts,IWKV * iwkvp)3075 iwrc iwkv_open(const IWKV_OPTS *opts, IWKV *iwkvp) {
3076 if (!opts || !iwkvp || !opts->path) {
3077 return IW_ERROR_INVALID_ARGS;
3078 }
3079 *iwkvp = 0;
3080 int rci;
3081 iwrc rc = 0;
3082 uint32_t lv;
3083 uint64_t llv;
3084 uint8_t *rp, *mm;
3085 bool has_online_bkp = false;
3086
3087 rc = iw_init();
3088 RCRET(rc);
3089
3090 if (opts->random_seed) {
3091 iwu_rand_seed(opts->random_seed);
3092 }
3093 iwkv_openflags oflags = opts->oflags;
3094 iwfs_omode omode = IWFS_OREAD;
3095 if (oflags & IWKV_TRUNC) {
3096 oflags &= ~IWKV_RDONLY;
3097 omode |= IWFS_OTRUNC;
3098 }
3099 if (!(oflags & IWKV_RDONLY)) {
3100 omode |= IWFS_OWRITE;
3101 omode |= IWFS_OCREATE;
3102 }
3103 if ((omode & IWFS_OWRITE) && !(omode & IWFS_OTRUNC)) {
3104 iwp_lockmode extra_lock_flags = 0;
3105 if (opts->file_lock_fail_fast) {
3106 extra_lock_flags |= IWP_NBLOCK;
3107 }
3108 rc = _iwkv_check_online_backup(opts->path, extra_lock_flags, &has_online_bkp);
3109 RCRET(rc);
3110 }
3111
3112 *iwkvp = calloc(1, sizeof(struct _IWKV));
3113 if (!*iwkvp) {
3114 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
3115 }
3116 IWKV iwkv = *iwkvp;
3117 iwkv->fmt_version = opts->fmt_version > 0 ? opts->fmt_version : IWKV_FORMAT;
3118 if (iwkv->fmt_version > IWKV_FORMAT) {
3119 rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3120 iwlog_ecode_error3(rc);
3121 return rc;
3122 }
3123 // Adjust lower key len accourding to database format version
3124 if (iwkv->fmt_version < 2) {
3125 iwkv->pklen = PREFIX_KEY_LEN_V1;
3126 } else {
3127 iwkv->pklen = PREFIX_KEY_LEN_V2;
3128 }
3129
3130 pthread_rwlockattr_t attr;
3131 pthread_rwlockattr_init(&attr);
3132 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
3133 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
3134 #endif
3135 rci = pthread_rwlock_init(&iwkv->rwl, &attr);
3136 if (rci) {
3137 free(*iwkvp);
3138 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3139 }
3140 rci = pthread_mutex_init(&iwkv->wk_mtx, 0);
3141 if (rci) {
3142 pthread_rwlock_destroy(&iwkv->rwl);
3143 free(*iwkvp);
3144 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3145 }
3146 rci = pthread_cond_init(&iwkv->wk_cond, 0);
3147 if (rci) {
3148 pthread_rwlock_destroy(&iwkv->rwl);
3149 pthread_mutex_destroy(&iwkv->wk_mtx);
3150 free(*iwkvp);
3151 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
3152 }
3153
3154 iwkv->oflags = oflags;
3155 IWFS_FSM_STATE fsmstate;
3156 IWFS_FSM_OPTS fsmopts = {
3157 .exfile = {
3158 .file = {
3159 .path = opts->path,
3160 .omode = omode,
3161 .lock_mode = (oflags & IWKV_RDONLY) ? IWP_RLOCK : IWP_WLOCK
3162 },
3163 .rspolicy = _szpolicy,
3164 .maxoff = IWKV_MAX_DBSZ,
3165 .use_locks = true
3166 },
3167 .bpow = IWKV_FSM_BPOW, // 64 bytes block size
3168 .hdrlen = KVHDRSZ, // Size of custom file header
3169 .oflags = ((oflags & IWKV_RDONLY) ? IWFSM_NOLOCKS : 0),
3170 .mmap_all = true,
3171 .mmap_opts = IWFS_MMAP_RANDOM
3172 };
3173 #ifndef NDEBUG
3174 fsmopts.oflags |= IWFSM_STRICT;
3175 #endif
3176 if (oflags & IWKV_NO_TRIM_ON_CLOSE) {
3177 fsmopts.oflags |= IWFSM_NO_TRIM_ON_CLOSE;
3178 }
3179 if (opts->file_lock_fail_fast) {
3180 fsmopts.exfile.file.lock_mode |= IWP_NBLOCK;
3181 }
3182 // Init WAL
3183 RCC(rc, finish, iwal_create(iwkv, opts, &fsmopts, has_online_bkp));
3184
3185 // Now open database file
3186 RCC(rc, finish, iwfs_fsmfile_open(&iwkv->fsm, &fsmopts));
3187 RCB(finish, iwkv->dbs = iwhmap_create_u32(0));
3188
3189 IWFS_FSM *fsm = &iwkv->fsm;
3190 RCC(rc, finish, fsm->state(fsm, &fsmstate));
3191
3192 // Database header: [magic:u4, first_addr:u8, db_format_version:u4]
3193 if (fsmstate.exfile.file.ostatus & IWFS_OPEN_NEW) {
3194 uint8_t hdr[KVHDRSZ] = { 0 };
3195 uint8_t *wp = hdr;
3196 IW_WRITELV(wp, lv, IWKV_MAGIC);
3197 wp += sizeof(llv); // skip first db addr
3198 IW_WRITELV(wp, lv, iwkv->fmt_version);
3199 RCC(rc, finish, fsm->writehdr(fsm, 0, hdr, sizeof(hdr)));
3200 RCC(rc, finish, fsm->sync(fsm, 0));
3201 } else {
3202 off_t dbaddr; // first database address
3203 uint8_t hdr[KVHDRSZ];
3204 RCC(rc, finish, fsm->readhdr(fsm, 0, hdr, KVHDRSZ));
3205 rp = hdr; // -V507
3206 IW_READLV(rp, lv, lv);
3207 IW_READLLV(rp, llv, dbaddr);
3208 if ((lv != IWKV_MAGIC) || (dbaddr < 0)) {
3209 rc = IWKV_ERROR_CORRUPTED;
3210 iwlog_ecode_error3(rc);
3211 goto finish;
3212 }
3213 IW_READLV(rp, lv, iwkv->fmt_version);
3214 if ((iwkv->fmt_version > IWKV_FORMAT)) {
3215 rc = IWKV_ERROR_INCOMPATIBLE_DB_FORMAT;
3216 iwlog_ecode_error3(rc);
3217 goto finish;
3218 }
3219 if (iwkv->fmt_version < 2) {
3220 iwkv->pklen = PREFIX_KEY_LEN_V1;
3221 } else {
3222 iwkv->pklen = PREFIX_KEY_LEN_V2;
3223 }
3224 RCC(rc, finish, fsm->acquire_mmap(fsm, 0, &mm, 0));
3225 RCC(rc, finish, _db_load_chain(iwkv, dbaddr, mm));
3226 fsm->release_mmap(fsm);
3227 }
3228 (*iwkvp)->open = true;
3229
3230 finish:
3231 if (rc) {
3232 (*iwkvp)->open = true; // will be closed in iwkv_close
3233 IWRC(iwkv_close(iwkvp), rc);
3234 }
3235 return rc;
3236 }
3237
/**
 * Takes the exclusive lock on the storage by running the `_wnw_iwkw_wl`
 * lock routine through `_wnw()`.
 *
 * @param iwkv Storage handle.
 * @return Result of `_wnw()`.
 */
iwrc iwkv_exclusive_lock(IWKV iwkv) {
  return _wnw(iwkv, _wnw_iwkw_wl);
}
3241
/**
 * Counterpart of `iwkv_exclusive_lock()`: releases the storage lock by means
 * of the `API_UNLOCK` macro.
 *
 * @param iwkv Storage handle.
 * @return Status produced by `API_UNLOCK`.
 */
iwrc iwkv_exclusive_unlock(IWKV iwkv) {
  int rci;
  iwrc rc = 0;
  API_UNLOCK(iwkv, rci, rc);
  return rc;
}
3248
/**
 * Closes the storage and frees all resources bound to the handle.
 *
 * WAL is shut down first, then under the exclusive lock every database is
 * released and the storage file is closed. Finally the locks are destroyed,
 * the handle memory is freed and `*iwkvp` is set to zero.
 *
 * @param iwkvp Pointer to the storage handle.
 * @return Zero on success, error code otherwise.
 */
iwrc iwkv_close(IWKV *iwkvp) {
  // NOTE(review): `iwkvp` itself is dereferenced unconditionally here
  ENSURE_OPEN((*iwkvp));
  IWKV iwkv = *iwkvp;
  iwkv->open = false;
  iwal_shutdown(iwkv);
  iwrc rc = iwkv_exclusive_lock(iwkv);
  // NOTE(review): on a lock failure the handle stays allocated and marked as not open
  RCRET(rc);
  IWDB db = iwkv->first_db;
  while (db) {
    // Next pointer is taken before the database is released
    IWDB ndb = db->next;
    _db_release_lw(&db);
    db = ndb;
  }
  IWRC(iwkv->fsm.close(&iwkv->fsm), rc);
  // Below the memory cleanup only
  if (iwkv->dbs) {
    iwhmap_destroy(iwkv->dbs);
    iwkv->dbs = 0;
  }

  iwkv_exclusive_unlock(iwkv);
  pthread_rwlock_destroy(&iwkv->rwl);
  pthread_mutex_destroy(&iwkv->wk_mtx);
  pthread_cond_destroy(&iwkv->wk_cond);
  free(iwkv);
  *iwkvp = 0;
  return rc;
}
3277
_iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3278 static iwrc _iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3279 ENSURE_OPEN(iwkv);
3280 if (iwkv->oflags & IWKV_RDONLY) {
3281 return IW_ERROR_READONLY;
3282 }
3283 iwrc rc;
3284 if (iwkv->dlsnr) {
3285 rc = iwal_poke_savepoint(iwkv);
3286 } else {
3287 IWFS_FSM *fsm = &iwkv->fsm;
3288 pthread_rwlock_wrlock(&iwkv->rwl);
3289 iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3290 rc = fsm->sync(fsm, flags);
3291 pthread_rwlock_unlock(&iwkv->rwl);
3292 }
3293 return rc;
3294 }
3295
iwkv_sync(IWKV iwkv,iwfs_sync_flags _flags)3296 iwrc iwkv_sync(IWKV iwkv, iwfs_sync_flags _flags) {
3297 ENSURE_OPEN(iwkv);
3298 if (iwkv->oflags & IWKV_RDONLY) {
3299 return IW_ERROR_READONLY;
3300 }
3301 iwrc rc;
3302 if (iwkv->dlsnr) {
3303 rc = iwkv_exclusive_lock(iwkv);
3304 RCRET(rc);
3305 rc = iwal_savepoint_exl(iwkv, true);
3306 iwkv_exclusive_unlock(iwkv);
3307 } else {
3308 IWFS_FSM *fsm = &iwkv->fsm;
3309 pthread_rwlock_wrlock(&iwkv->rwl);
3310 iwfs_sync_flags flags = IWFS_FDATASYNC | _flags;
3311 rc = fsm->sync(fsm, flags);
3312 pthread_rwlock_unlock(&iwkv->rwl);
3313 }
3314 return rc;
3315 }
3316
iwkv_db(IWKV iwkv,uint32_t dbid,iwdb_flags_t dbflg,IWDB * dbp)3317 iwrc iwkv_db(IWKV iwkv, uint32_t dbid, iwdb_flags_t dbflg, IWDB *dbp) {
3318 int rci;
3319 iwrc rc = 0;
3320 IWDB db = 0;
3321 *dbp = 0;
3322
3323 API_RLOCK(iwkv, rci);
3324 db = iwhmap_get_u32(iwkv->dbs, dbid);
3325 API_UNLOCK(iwkv, rci, rc);
3326 RCRET(rc);
3327
3328 if (db) {
3329 if (db->dbflg != dbflg) {
3330 return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3331 }
3332 *dbp = db;
3333 return 0;
3334 }
3335 if (iwkv->oflags & IWKV_RDONLY) {
3336 return IW_ERROR_READONLY;
3337 }
3338 rc = iwkv_exclusive_lock(iwkv);
3339 RCRET(rc);
3340
3341 db = iwhmap_get_u32(iwkv->dbs, dbid);
3342 if (db) {
3343 if (db->dbflg != dbflg) {
3344 return IWKV_ERROR_INCOMPATIBLE_DB_MODE;
3345 }
3346 *dbp = db;
3347 } else {
3348 rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3349 }
3350 if (!rc) {
3351 rc = iwal_savepoint_exl(iwkv, true);
3352 }
3353 iwkv_exclusive_unlock(iwkv);
3354 return rc;
3355 }
3356
iwkv_new_db(IWKV iwkv,iwdb_flags_t dbflg,uint32_t * dbidp,IWDB * dbp)3357 iwrc iwkv_new_db(IWKV iwkv, iwdb_flags_t dbflg, uint32_t *dbidp, IWDB *dbp) {
3358 *dbp = 0;
3359 *dbidp = 0;
3360 if (iwkv->oflags & IWKV_RDONLY) {
3361 return IW_ERROR_READONLY;
3362 }
3363 uint32_t dbid = 0;
3364 iwrc rc = iwkv_exclusive_lock(iwkv);
3365 RCRET(rc);
3366
3367 IWHMAP_ITER iter;
3368 iwhmap_iter_init(iwkv->dbs, &iter);
3369
3370 while (iwhmap_iter_next(&iter)) {
3371 uint32_t id = (uint32_t) (uintptr_t) iter.key;
3372 if (id > dbid) {
3373 dbid = id;
3374 }
3375 }
3376
3377 dbid++;
3378 rc = _db_create_lw(iwkv, dbid, dbflg, dbp);
3379 if (!rc) {
3380 *dbidp = dbid;
3381 rc = iwal_savepoint_exl(iwkv, true);
3382 }
3383 iwkv_exclusive_unlock(iwkv);
3384 return rc;
3385 }
3386
iwkv_db_destroy(IWDB * dbp)3387 iwrc iwkv_db_destroy(IWDB *dbp) {
3388 if (!dbp || !*dbp) {
3389 return IW_ERROR_INVALID_ARGS;
3390 }
3391 IWDB db = *dbp;
3392 IWKV iwkv = db->iwkv;
3393 *dbp = 0;
3394 if (iwkv->oflags & IWKV_RDONLY) {
3395 return IW_ERROR_READONLY;
3396 }
3397 iwrc rc = iwkv_exclusive_lock(iwkv);
3398 RCRET(rc);
3399 rc = _db_destroy_lw(&db);
3400 iwkv_exclusive_unlock(iwkv);
3401 return rc;
3402 }
3403
iwkv_puth(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3404 iwrc iwkv_puth(
3405 IWDB db, const IWKV_val *key, const IWKV_val *val,
3406 iwkv_opflags opflags, IWKV_PUT_HANDLER ph, void *phop
3407 ) {
3408 if (!db || !db->iwkv || !key || !key->size || !val) {
3409 return IW_ERROR_INVALID_ARGS;
3410 }
3411 IWKV iwkv = db->iwkv;
3412 if (iwkv->oflags & IWKV_RDONLY) {
3413 return IW_ERROR_READONLY;
3414 }
3415 if (opflags & IWKV_VAL_INCREMENT) {
3416 // No overwrite for increment
3417 opflags &= ~IWKV_NO_OVERWRITE;
3418 }
3419
3420 int rci;
3421 IWKV_val ekey;
3422 uint8_t nbuf[IW_VNUMBUFSZ];
3423 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3424 RCRET(rc);
3425
3426 IWLCTX lx = {
3427 .db = db,
3428 .key = &ekey,
3429 .val = (IWKV_val*) val,
3430 .nlvl = -1,
3431 .op = IWLCTX_PUT,
3432 .opflags = opflags,
3433 .ph = ph,
3434 .phop = phop
3435 };
3436 API_DB_WLOCK(db, rci);
3437 rc = _lx_put_lw(&lx);
3438 API_DB_UNLOCK(db, rci, rc);
3439 if (!rc) {
3440 if (lx.opflags & IWKV_SYNC) {
3441 rc = _iwkv_sync(iwkv, 0);
3442 } else {
3443 rc = iwal_poke_checkpoint(iwkv, false);
3444 }
3445 }
3446 return rc;
3447 }
3448
iwkv_put(IWDB db,const IWKV_val * key,const IWKV_val * val,iwkv_opflags opflags)3449 iwrc iwkv_put(IWDB db, const IWKV_val *key, const IWKV_val *val, iwkv_opflags opflags) {
3450 return iwkv_puth(db, key, val, opflags, 0, 0);
3451 }
3452
iwkv_get(IWDB db,const IWKV_val * key,IWKV_val * oval)3453 iwrc iwkv_get(IWDB db, const IWKV_val *key, IWKV_val *oval) {
3454 if (!db || !db->iwkv || !key || !oval) {
3455 return IW_ERROR_INVALID_ARGS;
3456 }
3457
3458 int rci;
3459 IWKV_val ekey;
3460 uint8_t nbuf[IW_VNUMBUFSZ];
3461 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3462 RCRET(rc);
3463
3464 IWLCTX lx = {
3465 .db = db,
3466 .key = &ekey,
3467 .val = oval,
3468 .nlvl = -1
3469 };
3470 oval->size = 0;
3471 API_DB_RLOCK(db, rci);
3472 rc = _lx_get_lr(&lx);
3473 API_DB_UNLOCK(db, rci, rc);
3474 return rc;
3475 }
3476
iwkv_get_copy(IWDB db,const IWKV_val * key,void * vbuf,size_t vbufsz,size_t * vsz)3477 iwrc iwkv_get_copy(IWDB db, const IWKV_val *key, void *vbuf, size_t vbufsz, size_t *vsz) {
3478 if (!db || !db->iwkv || !key || !vbuf) {
3479 return IW_ERROR_INVALID_ARGS;
3480 }
3481 *vsz = 0;
3482
3483 int rci;
3484 bool found;
3485 IWKV_val ekey;
3486 uint32_t ovalsz;
3487 uint8_t *mm = 0, *oval, idx;
3488 IWFS_FSM *fsm = &db->iwkv->fsm;
3489 uint8_t nbuf[IW_VNUMBUFSZ];
3490 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3491 RCRET(rc);
3492
3493 IWLCTX lx = {
3494 .db = db,
3495 .key = &ekey,
3496 .nlvl = -1
3497 };
3498 API_DB_RLOCK(db, rci);
3499 rc = _lx_find_bounds(&lx);
3500 RCGO(rc, finish);
3501 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3502 RCGO(rc, finish);
3503 rc = _sblk_loadkvblk_mm(&lx, lx.lower, mm);
3504 RCGO(rc, finish);
3505 rc = _sblk_find_pi_mm(lx.lower, &lx, mm, &found, &idx);
3506 RCGO(rc, finish);
3507 if (found) {
3508 _kvblk_value_peek(lx.lower->kvblk, lx.lower->pi[idx], mm, &oval, &ovalsz);
3509 *vsz = ovalsz;
3510 memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3511 } else {
3512 rc = IWKV_ERROR_NOTFOUND;
3513 }
3514
3515 finish:
3516 if (mm) {
3517 IWRC(fsm->release_mmap(fsm), rc);
3518 }
3519 _lx_release_mm(&lx, 0);
3520 API_DB_UNLOCK(db, rci, rc);
3521 return rc;
3522 }
3523
iwkv_db_set_meta(IWDB db,void * buf,size_t sz)3524 iwrc iwkv_db_set_meta(IWDB db, void *buf, size_t sz) {
3525 if (!db || !db->iwkv || !buf) {
3526 return IW_ERROR_INVALID_ARGS;
3527 }
3528 if (!sz) {
3529 return 0;
3530 }
3531
3532 int rci;
3533 iwrc rc = 0;
3534 bool resized = false;
3535 uint8_t *mm = 0, *wp, *sp;
3536 IWFS_FSM *fsm = &db->iwkv->fsm;
3537 size_t asz = IW_ROUNDUP(sz, 1U << IWKV_FSM_BPOW);
3538
3539 API_DB_WLOCK(db, rci);
3540 if ((asz > db->meta_blkn) || (asz * 2 <= db->meta_blkn)) {
3541 off_t oaddr = 0;
3542 off_t olen = 0;
3543 if (db->meta_blk) {
3544 rc = fsm->deallocate(fsm, BLK2ADDR(db->meta_blk), BLK2ADDR(db->meta_blkn));
3545 RCGO(rc, finish);
3546 }
3547 rc = fsm->allocate(fsm, asz, &oaddr, &olen, IWKV_FSM_ALLOC_FLAGS);
3548 RCGO(rc, finish);
3549 db->meta_blk = ADDR2BLK(oaddr);
3550 db->meta_blkn = ADDR2BLK(olen);
3551 resized = true;
3552 }
3553 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3554 RCGO(rc, finish);
3555 wp = mm + BLK2ADDR(db->meta_blk);
3556 memcpy(wp, buf, sz);
3557 if (db->iwkv->dlsnr) {
3558 rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, wp - mm, wp, sz, 0);
3559 RCGO(rc, finish);
3560 }
3561 if (resized) {
3562 uint32_t lv;
3563 wp = mm + db->addr + DOFF_METABLK_U4;
3564 sp = wp;
3565 IW_WRITELV(wp, lv, db->meta_blk);
3566 IW_WRITELV(wp, lv, db->meta_blkn);
3567 if (db->iwkv->dlsnr) {
3568 rc = db->iwkv->dlsnr->onwrite(db->iwkv->dlsnr, sp - mm, sp, wp - sp, 0);
3569 RCGO(rc, finish);
3570 }
3571 }
3572 fsm->release_mmap(fsm);
3573 mm = 0;
3574
3575 finish:
3576 if (mm) {
3577 fsm->release_mmap(fsm);
3578 }
3579 API_DB_UNLOCK(db, rci, rc);
3580 return rc;
3581 }
3582
iwkv_db_get_meta(IWDB db,void * buf,size_t sz,size_t * rsz)3583 iwrc iwkv_db_get_meta(IWDB db, void *buf, size_t sz, size_t *rsz) {
3584 if (!db || !db->iwkv || !buf) {
3585 return IW_ERROR_INVALID_ARGS;
3586 }
3587 *rsz = 0;
3588 if (!sz || !db->meta_blkn) {
3589 return 0;
3590 }
3591 int rci;
3592 iwrc rc = 0;
3593 uint8_t *mm = 0;
3594 IWFS_FSM *fsm = &db->iwkv->fsm;
3595 size_t rmax = BLK2ADDR(db->meta_blkn);
3596 if (sz > rmax) {
3597 sz = rmax;
3598 }
3599 API_DB_RLOCK(db, rci);
3600 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3601 RCGO(rc, finish);
3602 memcpy(buf, mm + BLK2ADDR(db->meta_blk), sz);
3603 *rsz = sz;
3604
3605 finish:
3606 if (mm) {
3607 fsm->release_mmap(fsm);
3608 }
3609 API_DB_UNLOCK(db, rci, rc);
3610 return rc;
3611 }
3612
iwkv_del(IWDB db,const IWKV_val * key,iwkv_opflags opflags)3613 iwrc iwkv_del(IWDB db, const IWKV_val *key, iwkv_opflags opflags) {
3614 if (!db || !db->iwkv || !key) {
3615 return IW_ERROR_INVALID_ARGS;
3616 }
3617 int rci;
3618 IWKV_val ekey;
3619 IWKV iwkv = db->iwkv;
3620
3621 uint8_t nbuf[IW_VNUMBUFSZ];
3622 iwrc rc = _to_effective_key(db, key, &ekey, nbuf);
3623 RCRET(rc);
3624 IWLCTX lx = {
3625 .db = db,
3626 .key = &ekey,
3627 .nlvl = -1,
3628 .op = IWLCTX_DEL,
3629 .opflags = opflags
3630 };
3631 API_DB_WLOCK(db, rci);
3632 rc = _lx_del_lw(&lx);
3633 API_DB_UNLOCK(db, rci, rc);
3634 if (!rc) {
3635 if (lx.opflags & IWKV_SYNC) {
3636 rc = _iwkv_sync(iwkv, 0);
3637 } else {
3638 rc = iwal_poke_checkpoint(iwkv, false);
3639 }
3640 }
3641 return rc;
3642 }
3643
_cursor_close_lw(IWKV_cursor cur)3644 IW_INLINE iwrc _cursor_close_lw(IWKV_cursor cur) {
3645 iwrc rc = 0;
3646 cur->closed = true;
3647 IWDB db = cur->lx.db;
3648 pthread_spin_lock(&db->cursors_slk);
3649 for (IWKV_cursor c = db->cursors, pc = 0; c; pc = c, c = c->next) {
3650 if (c == cur) {
3651 if (pc) {
3652 pc->next = c->next;
3653 } else {
3654 db->cursors = c->next;
3655 }
3656 break;
3657 }
3658 }
3659 pthread_spin_unlock(&db->cursors_slk);
3660 return rc;
3661 }
3662
iwkv_cursor_open(IWDB db,IWKV_cursor * curptr,IWKV_cursor_op op,const IWKV_val * key)3663 iwrc iwkv_cursor_open(
3664 IWDB db,
3665 IWKV_cursor *curptr,
3666 IWKV_cursor_op op,
3667 const IWKV_val *key
3668 ) {
3669 if ( !db || !db->iwkv || !curptr
3670 || (key && (op < IWKV_CURSOR_EQ)) || (op < IWKV_CURSOR_BEFORE_FIRST)) {
3671 return IW_ERROR_INVALID_ARGS;
3672 }
3673 iwrc rc;
3674 int rci;
3675 rc = _db_worker_inc_nolk(db);
3676 RCRET(rc);
3677 rc = _api_db_rlock(db);
3678 if (rc) {
3679 _db_worker_dec_nolk(db);
3680 return rc;
3681 }
3682 IWKV_cursor cur = 0;
3683 *curptr = calloc(1, sizeof(**curptr));
3684 if (!(*curptr)) {
3685 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
3686 goto finish;
3687 }
3688 cur = *curptr;
3689 IWLCTX *lx = &cur->lx;
3690 lx->db = db;
3691 lx->nlvl = -1;
3692 if (key) {
3693 rc = _to_effective_key(db, key, &lx->ekey, lx->nbuf);
3694 RCGO(rc, finish);
3695 lx->key = &lx->ekey;
3696 }
3697 rc = _cursor_to_lr(cur, op);
3698
3699 finish:
3700 if (cur) {
3701 if (rc) {
3702 *curptr = 0;
3703 IWRC(_cursor_close_lw(cur), rc);
3704 free(cur);
3705 } else {
3706 pthread_spin_lock(&db->cursors_slk);
3707 cur->next = db->cursors;
3708 db->cursors = cur;
3709 pthread_spin_unlock(&db->cursors_slk);
3710 }
3711 }
3712 API_DB_UNLOCK(db, rci, rc);
3713 if (rc) {
3714 _db_worker_dec_nolk(db);
3715 }
3716 return rc;
3717 }
3718
iwkv_cursor_close(IWKV_cursor * curp)3719 iwrc iwkv_cursor_close(IWKV_cursor *curp) {
3720 iwrc rc = 0;
3721 int rci;
3722 if (!curp || !*curp) {
3723 return 0;
3724 }
3725 IWKV_cursor cur = *curp;
3726 *curp = 0;
3727 IWKV iwkv = cur->lx.db->iwkv;
3728 if (cur->closed) {
3729 free(cur);
3730 return 0;
3731 }
3732 if (!cur->lx.db) {
3733 return IW_ERROR_INVALID_ARGS;
3734 }
3735 API_DB_WLOCK(cur->lx.db, rci);
3736 rc = _cursor_close_lw(cur);
3737 API_DB_UNLOCK(cur->lx.db, rci, rc);
3738 IWRC(_db_worker_dec_nolk(cur->lx.db), rc);
3739 free(cur);
3740 if (!rc) {
3741 rc = iwal_poke_checkpoint(iwkv, false);
3742 }
3743 return rc;
3744 }
3745
iwkv_cursor_to(IWKV_cursor cur,IWKV_cursor_op op)3746 iwrc iwkv_cursor_to(IWKV_cursor cur, IWKV_cursor_op op) {
3747 int rci;
3748 if (!cur) {
3749 return IW_ERROR_INVALID_ARGS;
3750 }
3751 if (!cur->lx.db) {
3752 return IW_ERROR_INVALID_ARGS;
3753 }
3754 API_DB_RLOCK(cur->lx.db, rci);
3755 iwrc rc = _cursor_to_lr(cur, op);
3756 API_DB_UNLOCK(cur->lx.db, rci, rc);
3757 return rc;
3758 }
3759
iwkv_cursor_to_key(IWKV_cursor cur,IWKV_cursor_op op,const IWKV_val * key)3760 iwrc iwkv_cursor_to_key(IWKV_cursor cur, IWKV_cursor_op op, const IWKV_val *key) {
3761 int rci;
3762 if (!cur || ((op != IWKV_CURSOR_EQ) && (op != IWKV_CURSOR_GE))) {
3763 return IW_ERROR_INVALID_ARGS;
3764 }
3765 IWLCTX *lx = &cur->lx;
3766 if (!lx->db) {
3767 return IW_ERROR_INVALID_STATE;
3768 }
3769 iwrc rc = _to_effective_key(lx->db, key, &lx->ekey, lx->nbuf);
3770 RCRET(rc);
3771
3772 API_DB_RLOCK(lx->db, rci);
3773 lx->key = &lx->ekey;
3774 rc = _cursor_to_lr(cur, op);
3775 API_DB_UNLOCK(lx->db, rci, rc);
3776 return rc;
3777 }
3778
iwkv_cursor_get(IWKV_cursor cur,IWKV_val * okey,IWKV_val * oval)3779 iwrc iwkv_cursor_get(
3780 IWKV_cursor cur,
3781 IWKV_val *okey, /* Nullable */
3782 IWKV_val *oval
3783 ) { /* Nullable */
3784 int rci;
3785 iwrc rc = 0;
3786 if (!cur || !cur->lx.db) {
3787 return IW_ERROR_INVALID_ARGS;
3788 }
3789 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3790 return IWKV_ERROR_NOTFOUND;
3791 }
3792 IWLCTX *lx = &cur->lx;
3793 API_DB_RLOCK(lx->db, rci);
3794 uint8_t *mm = 0;
3795 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3796 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3797 RCGO(rc, finish);
3798 if (!cur->cn->kvblk) {
3799 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3800 RCGO(rc, finish);
3801 }
3802 uint8_t idx = cur->cn->pi[cur->cnpos];
3803 if (okey && oval) {
3804 rc = _kvblk_kv_get(cur->cn->kvblk, mm, idx, okey, oval);
3805 } else if (oval) {
3806 rc = _kvblk_value_get(cur->cn->kvblk, mm, idx, oval);
3807 } else if (okey) {
3808 rc = _kvblk_key_get(cur->cn->kvblk, mm, idx, okey);
3809 } else {
3810 rc = IW_ERROR_INVALID_ARGS;
3811 }
3812 if (!rc && okey) {
3813 _unpack_effective_key(lx->db, okey, false);
3814 }
3815 finish:
3816 if (mm) {
3817 fsm->release_mmap(fsm);
3818 }
3819 API_DB_UNLOCK(lx->db, rci, rc);
3820 return rc;
3821 }
3822
iwkv_cursor_copy_val(IWKV_cursor cur,void * vbuf,size_t vbufsz,size_t * vsz)3823 iwrc iwkv_cursor_copy_val(IWKV_cursor cur, void *vbuf, size_t vbufsz, size_t *vsz) {
3824 int rci;
3825 iwrc rc = 0;
3826 if (!cur || !vbuf || !cur->lx.db) {
3827 return IW_ERROR_INVALID_ARGS;
3828 }
3829 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3830 return IWKV_ERROR_NOTFOUND;
3831 }
3832
3833 *vsz = 0;
3834 IWLCTX *lx = &cur->lx;
3835 API_DB_RLOCK(lx->db, rci);
3836 uint8_t *mm = 0, *oval;
3837 uint32_t ovalsz;
3838 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3839 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3840 RCGO(rc, finish);
3841 if (!cur->cn->kvblk) {
3842 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3843 RCGO(rc, finish);
3844 }
3845 uint8_t idx = cur->cn->pi[cur->cnpos];
3846 _kvblk_value_peek(cur->cn->kvblk, idx, mm, &oval, &ovalsz);
3847 *vsz = ovalsz;
3848 memcpy(vbuf, oval, MIN(vbufsz, ovalsz));
3849
3850 finish:
3851 if (mm) {
3852 fsm->release_mmap(fsm);
3853 }
3854 API_DB_UNLOCK(lx->db, rci, rc);
3855 return rc;
3856 }
3857
iwkv_cursor_is_matched_key(IWKV_cursor cur,const IWKV_val * key,bool * ores,int64_t * ocompound)3858 iwrc iwkv_cursor_is_matched_key(IWKV_cursor cur, const IWKV_val *key, bool *ores, int64_t *ocompound) {
3859 int rci;
3860 iwrc rc = 0;
3861 if (!cur || !ores || !key || !cur->lx.db) {
3862 return IW_ERROR_INVALID_ARGS;
3863 }
3864 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3865 return IWKV_ERROR_NOTFOUND;
3866 }
3867
3868 *ores = 0;
3869 if (ocompound) {
3870 *ocompound = 0;
3871 }
3872
3873 IWLCTX *lx = &cur->lx;
3874 API_DB_RLOCK(lx->db, rci);
3875 uint8_t *mm = 0, *okey;
3876 uint32_t okeysz;
3877 iwdb_flags_t dbflg = lx->db->dbflg;
3878 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3879 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3880 RCGO(rc, finish);
3881 if (!cur->cn->kvblk) {
3882 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3883 RCGO(rc, finish);
3884 }
3885
3886 uint8_t idx = cur->cn->pi[cur->cnpos];
3887 rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
3888 RCGO(rc, finish);
3889
3890 if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
3891 char nbuf[2 * IW_VNUMBUFSZ];
3892 IWKV_val rkey = { .data = nbuf, .size = okeysz };
3893 memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
3894 rc = _unpack_effective_key(lx->db, &rkey, true);
3895 RCGO(rc, finish);
3896 if (ocompound) {
3897 *ocompound = rkey.compound;
3898 }
3899 if (rkey.size != key->size) {
3900 *ores = false;
3901 goto finish;
3902 }
3903 if (dbflg & IWDB_VNUM64_KEYS) {
3904 *ores = !memcmp(rkey.data, key->data, key->size);
3905 } else {
3906 *ores = !memcmp(okey + (okeysz - rkey.size), key->data, key->size);
3907 }
3908 } else {
3909 *ores = (okeysz == key->size) && !memcmp(okey, key->data, key->size);
3910 }
3911
3912 finish:
3913 if (mm) {
3914 fsm->release_mmap(fsm);
3915 }
3916 API_DB_UNLOCK(cur->lx.db, rci, rc);
3917 return rc;
3918 }
3919
iwkv_cursor_copy_key(IWKV_cursor cur,void * kbuf,size_t kbufsz,size_t * ksz,int64_t * compound)3920 iwrc iwkv_cursor_copy_key(IWKV_cursor cur, void *kbuf, size_t kbufsz, size_t *ksz, int64_t *compound) {
3921 int rci;
3922 iwrc rc = 0;
3923 if (!cur || !cur->lx.db) {
3924 return IW_ERROR_INVALID_ARGS;
3925 }
3926 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3927 return IWKV_ERROR_NOTFOUND;
3928 }
3929
3930 *ksz = 0;
3931 IWLCTX *lx = &cur->lx;
3932 API_DB_RLOCK(lx->db, rci);
3933 uint8_t *mm = 0, *okey;
3934 uint32_t okeysz;
3935 iwdb_flags_t dbflg = lx->db->dbflg;
3936 IWFS_FSM *fsm = &lx->db->iwkv->fsm;
3937 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
3938 RCGO(rc, finish);
3939 if (!cur->cn->kvblk) {
3940 rc = _sblk_loadkvblk_mm(lx, cur->cn, mm);
3941 RCGO(rc, finish);
3942 }
3943
3944 uint8_t idx = cur->cn->pi[cur->cnpos];
3945 rc = _kvblk_key_peek(cur->cn->kvblk, idx, mm, &okey, &okeysz);
3946 RCGO(rc, finish);
3947
3948 if (dbflg & (IWDB_COMPOUND_KEYS | IWDB_VNUM64_KEYS)) {
3949 char nbuf[2 * IW_VNUMBUFSZ];
3950 IWKV_val rkey = { .data = nbuf, .size = okeysz };
3951 memcpy(rkey.data, okey, MIN(rkey.size, sizeof(nbuf)));
3952 rc = _unpack_effective_key(lx->db, &rkey, true);
3953 RCGO(rc, finish);
3954 if (compound) {
3955 *compound = rkey.compound;
3956 }
3957 *ksz = rkey.size;
3958 if (dbflg & IWDB_VNUM64_KEYS) {
3959 memcpy(kbuf, rkey.data, MIN(kbufsz, rkey.size));
3960 } else {
3961 memcpy(kbuf, okey + (okeysz - rkey.size), MIN(kbufsz, rkey.size));
3962 }
3963 } else {
3964 *ksz = okeysz;
3965 if (compound) {
3966 *compound = 0;
3967 }
3968 memcpy(kbuf, okey, MIN(kbufsz, okeysz));
3969 }
3970
3971 finish:
3972 if (mm) {
3973 fsm->release_mmap(fsm);
3974 }
3975 API_DB_UNLOCK(cur->lx.db, rci, rc);
3976 return rc;
3977 }
3978
iwkv_cursor_seth(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags,IWKV_PUT_HANDLER ph,void * phop)3979 IW_EXPORT iwrc iwkv_cursor_seth(
3980 IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags,
3981 IWKV_PUT_HANDLER ph, void *phop
3982 ) {
3983 int rci;
3984 iwrc rc = 0, irc = 0;
3985 if (!cur || !cur->lx.db) {
3986 return IW_ERROR_INVALID_ARGS;
3987 }
3988 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
3989 return IWKV_ERROR_NOTFOUND;
3990 }
3991
3992 IWLCTX *lx = &cur->lx;
3993 IWDB db = lx->db;
3994 IWKV iwkv = db->iwkv;
3995 SBLK *sblk = cur->cn;
3996
3997 API_DB_WLOCK(db, rci);
3998 if (ph) {
3999 uint8_t *mm;
4000 IWKV_val key, oldval;
4001 IWFS_FSM *fsm = &db->iwkv->fsm;
4002 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4003 RCGO(rc, finish);
4004 rc = _kvblk_kv_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key, &oldval);
4005 fsm->release_mmap(fsm);
4006 if (!rc) {
4007 // note: oldval should be disposed by ph
4008 rc = ph(&key, val, &oldval, phop);
4009 _kv_val_dispose(&key);
4010 }
4011 RCGO(rc, finish);
4012 }
4013
4014 rc = _sblk_updatekv(sblk, cur->cnpos, 0, val);
4015 if (IWKV_IS_INTERNAL_RC(rc)) {
4016 irc = rc;
4017 rc = 0;
4018 }
4019 RCGO(rc, finish);
4020
4021 rc = _sblk_sync(lx, sblk);
4022 RCGO(rc, finish);
4023
4024 // Update active cursors inside this block
4025 pthread_spin_lock(&db->cursors_slk);
4026 for (IWKV_cursor c = db->cursors; c; c = c->next) {
4027 if (c->cn && (c->cn->addr == sblk->addr)) {
4028 if (c->cn != sblk) {
4029 memcpy(c->cn, sblk, sizeof(*c->cn));
4030 c->cn->kvblk = 0;
4031 c->cn->flags &= SBLK_PERSISTENT_FLAGS;
4032 }
4033 }
4034 }
4035 pthread_spin_unlock(&db->cursors_slk);
4036
4037 finish:
4038 API_DB_UNLOCK(db, rci, rc);
4039 if (!rc) {
4040 if (opflags & IWKV_SYNC) {
4041 rc = _iwkv_sync(iwkv, 0);
4042 } else {
4043 rc = iwal_poke_checkpoint(iwkv, false);
4044 }
4045 }
4046 return rc ? rc : irc;
4047 }
4048
iwkv_cursor_set(IWKV_cursor cur,IWKV_val * val,iwkv_opflags opflags)4049 iwrc iwkv_cursor_set(IWKV_cursor cur, IWKV_val *val, iwkv_opflags opflags) {
4050 return iwkv_cursor_seth(cur, val, opflags, 0, 0);
4051 }
4052
iwkv_cursor_val(IWKV_cursor cur,IWKV_val * oval)4053 iwrc iwkv_cursor_val(IWKV_cursor cur, IWKV_val *oval) {
4054 return iwkv_cursor_get(cur, 0, oval);
4055 }
4056
iwkv_cursor_key(IWKV_cursor cur,IWKV_val * okey)4057 iwrc iwkv_cursor_key(IWKV_cursor cur, IWKV_val *okey) {
4058 return iwkv_cursor_get(cur, okey, 0);
4059 }
4060
iwkv_cursor_del(IWKV_cursor cur,iwkv_opflags opflags)4061 iwrc iwkv_cursor_del(IWKV_cursor cur, iwkv_opflags opflags) {
4062 int rci;
4063 iwrc rc = 0;
4064 if (!cur || !cur->lx.db) {
4065 return IW_ERROR_INVALID_ARGS;
4066 }
4067 if (!cur->cn || (cur->cn->flags & SBLK_DB) || (cur->cnpos >= cur->cn->pnum)) {
4068 return IWKV_ERROR_NOTFOUND;
4069 }
4070
4071 uint8_t *mm;
4072 SBLK *sblk = cur->cn;
4073 IWLCTX *lx = &cur->lx;
4074 IWDB db = lx->db;
4075 IWKV iwkv = db->iwkv;
4076 IWFS_FSM *fsm = &iwkv->fsm;
4077
4078 API_DB_WLOCK(db, rci);
4079 if (sblk->pnum == 1) { // sblk will be removed
4080 IWKV_val key = { 0 };
4081 // Key a key
4082 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4083 RCGO(rc, finish2);
4084 if (!sblk->kvblk) {
4085 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4086 fsm->release_mmap(fsm);
4087 RCGO(rc, finish2);
4088 }
4089 rc = _kvblk_key_get(sblk->kvblk, mm, sblk->pi[cur->cnpos], &key);
4090 fsm->release_mmap(fsm);
4091 RCGO(rc, finish2);
4092
4093 lx->key = &key;
4094 rc = _lx_del_sblk_lw(lx, sblk, cur->cnpos);
4095 lx->key = 0;
4096
4097 finish2:
4098 if (rc) {
4099 _lx_release_mm(lx, 0);
4100 } else {
4101 rc = _lx_release(lx);
4102 }
4103 if (key.data) {
4104 _kv_val_dispose(&key);
4105 }
4106 } else { // Simple case
4107 if (!sblk->kvblk) {
4108 rc = fsm->acquire_mmap(fsm, 0, &mm, 0);
4109 RCGO(rc, finish);
4110 rc = _sblk_loadkvblk_mm(lx, sblk, mm);
4111 fsm->release_mmap(fsm);
4112 RCGO(rc, finish);
4113 }
4114 rc = _sblk_rmkv(sblk, cur->cnpos);
4115 RCGO(rc, finish);
4116 rc = _sblk_sync(lx, sblk);
4117 }
4118
4119 finish:
4120 API_DB_UNLOCK(db, rci, rc);
4121 if (!rc) {
4122 if (opflags & IWKV_SYNC) {
4123 rc = _iwkv_sync(iwkv, 0);
4124 } else {
4125 rc = iwal_poke_checkpoint(iwkv, false);
4126 }
4127 }
4128 return rc;
4129 }
4130
4131 #include "./dbg/iwkvdbg.c"
4132