1 #include "ejdb2_internal.h"
2
3 // ---------------------------------------------------------------------------
4
// Forward declaration: `_jb_exec_upsert_lw` calls this function before any
// definition of it is visible in this part of the file.
static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id);

// Zero-initialized value passed to `iwkv_put` when index records are written
// with compound keys (see `_jb_idx_record_add`).
static const IWKV_val EMPTY_VAL = { 0 };
8
_jb_meta_nrecs_removedb(EJDB db,uint32_t dbid)9 IW_INLINE iwrc _jb_meta_nrecs_removedb(EJDB db, uint32_t dbid) {
10 dbid = IW_HTOIL(dbid);
11 IWKV_val key = {
12 .size = sizeof(dbid),
13 .data = &dbid
14 };
15 return iwkv_del(db->nrecdb, &key, 0);
16 }
17
_jb_meta_nrecs_update(EJDB db,uint32_t dbid,int64_t delta)18 IW_INLINE iwrc _jb_meta_nrecs_update(EJDB db, uint32_t dbid, int64_t delta) {
19 delta = IW_HTOILL(delta);
20 dbid = IW_HTOIL(dbid);
21 IWKV_val val = {
22 .size = sizeof(delta),
23 .data = &delta
24 };
25 IWKV_val key = {
26 .size = sizeof(dbid),
27 .data = &dbid
28 };
29 return iwkv_put(db->nrecdb, &key, &val, IWKV_VAL_INCREMENT);
30 }
31
// Reads the record counter stored for database `dbid` in `db->nrecdb`.
// The result of `iwkv_get_copy` is not checked: when no full 8-byte value
// was copied the buffer content (initially 0) is returned unconverted.
static int64_t _jb_meta_nrecs_get(EJDB db, uint32_t dbid) {
  uint32_t kid = IW_HTOIL(dbid);
  IWKV_val key = { .data = &kid, .size = sizeof(kid) };
  uint64_t raw = 0;
  size_t copied = 0;

  iwkv_get_copy(db->nrecdb, &key, &raw, sizeof(raw), &copied);
  if (copied != sizeof(raw)) {
    return (int64_t) raw;
  }
  raw = IW_ITOHLL(raw);
  return (int64_t) raw;
}
46
// Frees an index descriptor: hands its database handle (if any) to
// `iwkv_db_cache_release`, then frees the pointer spec and the descriptor.
static void _jb_idx_release(JBIDX idx) {
  if (idx->idb != 0) {
    iwkv_db_cache_release(idx->idb);
  }
  free(idx->ptr);
  free(idx);
}
54
// Frees a collection descriptor together with everything it owns:
// its database handle, its metadata document, all index descriptors,
// its rwlock and finally the descriptor itself.
static void _jb_coll_release(JBCOLL jbc) {
  if (jbc->cdb) {
    iwkv_db_cache_release(jbc->cdb);
  }
  if (jbc->meta) {
    jbl_destroy(&jbc->meta);
  }
  // Detach the index list first, then release its nodes one by one.
  JBIDX cur = jbc->idx;
  jbc->idx = 0;
  while (cur) {
    JBIDX following = cur->next; // read before `cur` is freed
    _jb_idx_release(cur);
    cur = following;
  }
  pthread_rwlock_destroy(&jbc->rwl);
  free(jbc);
}
71
_jb_coll_load_index_lr(JBCOLL jbc,IWKV_val * mval)72 static iwrc _jb_coll_load_index_lr(JBCOLL jbc, IWKV_val *mval) {
73 binn *bn;
74 char *ptr;
75 struct _JBL imeta;
76 JBIDX idx = calloc(1, sizeof(*idx));
77 if (!idx) {
78 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
79 }
80
81 iwrc rc = jbl_from_buf_keep_onstack(&imeta, mval->data, mval->size);
82 RCGO(rc, finish);
83 bn = &imeta.bn;
84
85 if ( !binn_object_get_str(bn, "ptr", &ptr)
86 || !binn_object_get_uint8(bn, "mode", &idx->mode)
87 || !binn_object_get_uint8(bn, "idbf", &idx->idbf)
88 || !binn_object_get_uint32(bn, "dbid", &idx->dbid)) {
89 rc = EJDB_ERROR_INVALID_COLLECTION_INDEX_META;
90 goto finish;
91 }
92 rc = jbl_ptr_alloc(ptr, &idx->ptr);
93 RCGO(rc, finish);
94
95 rc = iwkv_db(jbc->db->iwkv, idx->dbid, idx->idbf, &idx->idb);
96 RCGO(rc, finish);
97 idx->jbc = jbc;
98 idx->rnum = _jb_meta_nrecs_get(jbc->db, idx->dbid);
99 idx->next = jbc->idx;
100 jbc->idx = idx;
101
102 finish:
103 if (rc) {
104 _jb_idx_release(idx);
105 }
106 return rc;
107 }
108
// Loads every index descriptor of collection `jbc` from `jbc->db->metadb`.
//
// Index metadata keys have the form `<KEY_PREFIX_IDXMETA><coldbid>.<idxdbid>`.
// The cursor is opened with IWKV_CURSOR_GE on the `<prefix><coldbid>.` key and
// then advanced with IWKV_CURSOR_PREV; every visited key that is longer than
// the prefix and starts with it is loaded through `_jb_coll_load_index_lr`.
//
// Returns 0 when iteration ends with IWKV_ERROR_NOTFOUND, IW_ERROR_OVERFLOW if
// the key prefix does not fit into the local buffer, or the first other error.
static iwrc _jb_coll_load_indexes_lr(JBCOLL jbc) {
  iwrc rc = 0;
  IWKV_cursor cur;
  IWKV_val kval;
  char buf[sizeof(KEY_PREFIX_IDXMETA) + JBNUMBUF_SIZE];
  // Full key format: i.<coldbid>.<idxdbid>
  int sz = snprintf(buf, sizeof(buf), KEY_PREFIX_IDXMETA "%u.", jbc->dbid);
  if (sz >= sizeof(buf)) {
    return IW_ERROR_OVERFLOW;
  }
  kval.data = buf;
  kval.size = sz;
  rc = iwkv_cursor_open(jbc->db->metadb, &cur, IWKV_CURSOR_GE, &kval);
  if (rc == IWKV_ERROR_NOTFOUND) {
    // No key at or after the prefix: nothing to load.
    // NOTE(review): `cur` is closed below although the open failed; this
    // presumably relies on iwkv resetting the handle on failure -- confirm.
    rc = 0;
    goto finish;
  }
  RCRET(rc);

  do {
    IWKV_val key, val;
    rc = iwkv_cursor_key(cur, &key);
    RCGO(rc, finish);
    if ((key.size > sz) && !strncmp(buf, key.data, sz)) {
      // Key belongs to this collection: the key copy is no longer needed.
      iwkv_val_dispose(&key);
      rc = iwkv_cursor_val(cur, &val);
      RCGO(rc, finish);
      rc = _jb_coll_load_index_lr(jbc, &val);
      iwkv_val_dispose(&val);
      RCBREAK(rc);
    } else {
      iwkv_val_dispose(&key);
    }
  } while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_PREV)));
  if (rc == IWKV_ERROR_NOTFOUND) {
    // Running off the end of the cursor is the normal loop exit.
    rc = 0;
  }

finish:
  iwkv_cursor_close(&cur);
  return rc;
}
151
_jb_coll_load_meta_lr(JBCOLL jbc)152 static iwrc _jb_coll_load_meta_lr(JBCOLL jbc) {
153 JBL jbv;
154 IWKV_cursor cur;
155 JBL jbm = jbc->meta;
156 iwrc rc = jbl_at(jbm, "/name", &jbv);
157 RCRET(rc);
158 jbc->name = jbl_get_str(jbv);
159 jbl_destroy(&jbv);
160 if (!jbc->name) {
161 return EJDB_ERROR_INVALID_COLLECTION_META;
162 }
163 rc = jbl_at(jbm, "/id", &jbv);
164 RCRET(rc);
165 jbc->dbid = (uint32_t) jbl_get_i64(jbv);
166 jbl_destroy(&jbv);
167 if (!jbc->dbid) {
168 return EJDB_ERROR_INVALID_COLLECTION_META;
169 }
170 rc = iwkv_db(jbc->db->iwkv, jbc->dbid, IWDB_VNUM64_KEYS, &jbc->cdb);
171 RCRET(rc);
172
173 jbc->rnum = _jb_meta_nrecs_get(jbc->db, jbc->dbid);
174
175 rc = _jb_coll_load_indexes_lr(jbc);
176 RCRET(rc);
177
178 rc = iwkv_cursor_open(jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
179 RCRET(rc);
180 rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
181 if (rc) {
182 if (rc == IWKV_ERROR_NOTFOUND) {
183 rc = 0;
184 }
185 } else {
186 size_t sz;
187 rc = iwkv_cursor_copy_key(cur, &jbc->id_seq, sizeof(jbc->id_seq), &sz, 0);
188 RCGO(rc, finish);
189 }
190
191 finish:
192 iwkv_cursor_close(&cur);
193 return rc;
194 }
195
_jb_coll_init(JBCOLL jbc,IWKV_val * meta)196 static iwrc _jb_coll_init(JBCOLL jbc, IWKV_val *meta) {
197 int rci;
198 iwrc rc = 0;
199
200 pthread_rwlockattr_t attr;
201 pthread_rwlockattr_init(&attr);
202 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
203 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
204 #endif
205 pthread_rwlock_init(&jbc->rwl, &attr);
206 if (meta) {
207 rc = jbl_from_buf_keep(&jbc->meta, meta->data, meta->size, false);
208 RCRET(rc);
209 }
210 if (!jbc->meta) {
211 iwlog_error("Collection %s seems to be initialized", jbc->name);
212 return IW_ERROR_INVALID_STATE;
213 }
214 rc = _jb_coll_load_meta_lr(jbc);
215 RCRET(rc);
216
217 khiter_t k = kh_put(JBCOLLM, jbc->db->mcolls, jbc->name, &rci);
218 if (rci != -1) {
219 kh_value(jbc->db->mcolls, k) = jbc;
220 } else {
221 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
222 }
223 return rc;
224 }
225
_jb_idx_add_meta_lr(JBIDX idx,binn * list)226 static iwrc _jb_idx_add_meta_lr(JBIDX idx, binn *list) {
227 iwrc rc = 0;
228 IWXSTR *xstr = iwxstr_new();
229 if (!xstr) {
230 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
231 }
232 binn *meta = binn_object();
233 if (!meta) {
234 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
235 iwxstr_destroy(xstr);
236 return rc;
237 }
238 rc = jbl_ptr_serialize(idx->ptr, xstr);
239 RCGO(rc, finish);
240
241 if ( !binn_object_set_str(meta, "ptr", iwxstr_ptr(xstr))
242 || !binn_object_set_uint32(meta, "mode", idx->mode)
243 || !binn_object_set_uint32(meta, "idbf", idx->idbf)
244 || !binn_object_set_uint32(meta, "dbid", idx->dbid)
245 || !binn_object_set_int64(meta, "rnum", idx->rnum)) {
246 rc = JBL_ERROR_CREATION;
247 }
248
249 if (!binn_list_add_object(list, meta)) {
250 rc = JBL_ERROR_CREATION;
251 }
252
253 finish:
254 iwxstr_destroy(xstr);
255 binn_free(meta);
256 return rc;
257 }
258
_jb_coll_add_meta_lr(JBCOLL jbc,binn * list)259 static iwrc _jb_coll_add_meta_lr(JBCOLL jbc, binn *list) {
260 iwrc rc = 0;
261 binn *ilist = 0;
262 binn *meta = binn_object();
263 if (!meta) {
264 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
265 return rc;
266 }
267 if ( !binn_object_set_str(meta, "name", jbc->name)
268 || !binn_object_set_uint32(meta, "dbid", jbc->dbid)
269 || !binn_object_set_int64(meta, "rnum", jbc->rnum)) {
270 rc = JBL_ERROR_CREATION;
271 goto finish;
272 }
273 ilist = binn_list();
274 if (!ilist) {
275 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
276 goto finish;
277 }
278 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
279 rc = _jb_idx_add_meta_lr(idx, ilist);
280 RCGO(rc, finish);
281 }
282 if (!binn_object_set_list(meta, "indexes", ilist)) {
283 rc = JBL_ERROR_CREATION;
284 goto finish;
285 }
286 if (!binn_list_add_value(list, meta)) {
287 rc = JBL_ERROR_CREATION;
288 goto finish;
289 }
290
291 finish:
292 binn_free(meta);
293 if (ilist) {
294 binn_free(ilist);
295 }
296 return rc;
297 }
298
_jb_db_meta_load(EJDB db)299 static iwrc _jb_db_meta_load(EJDB db) {
300 iwrc rc = 0;
301 if (!db->metadb) {
302 rc = iwkv_db(db->iwkv, METADB_ID, 0, &db->metadb);
303 RCRET(rc);
304 }
305 if (!db->nrecdb) {
306 rc = iwkv_db(db->iwkv, NUMRECSDB_ID, IWDB_VNUM64_KEYS, &db->nrecdb);
307 RCRET(rc);
308 }
309
310 IWKV_cursor cur;
311 rc = iwkv_cursor_open(db->metadb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
312 RCRET(rc);
313 while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT))) {
314 IWKV_val key, val;
315 rc = iwkv_cursor_get(cur, &key, &val);
316 RCGO(rc, finish);
317 if (!strncmp(key.data, KEY_PREFIX_COLLMETA, sizeof(KEY_PREFIX_COLLMETA) - 1)) {
318 JBCOLL jbc = calloc(1, sizeof(*jbc));
319 if (!jbc) {
320 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
321 iwkv_val_dispose(&val);
322 goto finish;
323 }
324 jbc->db = db;
325 rc = _jb_coll_init(jbc, &val);
326 if (rc) {
327 _jb_coll_release(jbc);
328 iwkv_val_dispose(&key);
329 goto finish;
330 }
331 } else {
332 iwkv_val_dispose(&val);
333 }
334 iwkv_val_dispose(&key);
335 }
336 if (rc == IWKV_ERROR_NOTFOUND) {
337 rc = 0;
338 }
339
340 finish:
341 iwkv_cursor_close(&cur);
342 return rc;
343 }
344
_jb_db_release(EJDB * dbp)345 static iwrc _jb_db_release(EJDB *dbp) {
346 iwrc rc = 0;
347 EJDB db = *dbp;
348 *dbp = 0;
349 #ifdef JB_HTTP
350 if (db->jbr) {
351 IWRC(jbr_shutdown(&db->jbr), rc);
352 }
353 #endif
354 if (db->mcolls) {
355 for (khiter_t k = kh_begin(db->mcolls); k != kh_end(db->mcolls); ++k) {
356 if (!kh_exist(db->mcolls, k)) {
357 continue;
358 }
359 JBCOLL jbc = kh_val(db->mcolls, k);
360 _jb_coll_release(jbc);
361 }
362 kh_destroy(JBCOLLM, db->mcolls);
363 db->mcolls = 0;
364 }
365 if (db->iwkv) {
366 IWRC(iwkv_close(&db->iwkv), rc);
367 }
368 pthread_rwlock_destroy(&db->rwl);
369
370 EJDB_HTTP *http = &db->opts.http;
371 if (http->bind) {
372 free((void*) http->bind);
373 }
374 if (http->access_token) {
375 free((void*) http->access_token);
376 }
377 free(db);
378 return rc;
379 }
380
_jb_coll_acquire_keeplock2(EJDB db,const char * coll,jb_coll_acquire_t acm,JBCOLL * jbcp)381 static iwrc _jb_coll_acquire_keeplock2(EJDB db, const char *coll, jb_coll_acquire_t acm, JBCOLL *jbcp) {
382 if (strlen(coll) > EJDB_COLLECTION_NAME_MAX_LEN) {
383 return EJDB_ERROR_INVALID_COLLECTION_NAME;
384 }
385 int rci;
386 iwrc rc = 0;
387 *jbcp = 0;
388 JBCOLL jbc = 0;
389 bool wl = acm & JB_COLL_ACQUIRE_WRITE;
390 API_RLOCK(db, rci);
391 khiter_t k = kh_get(JBCOLLM, db->mcolls, coll);
392 if (k != kh_end(db->mcolls)) {
393 jbc = kh_value(db->mcolls, k);
394 assert(jbc);
395 rci = wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl);
396 if (rci) {
397 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
398 goto finish;
399 }
400 *jbcp = jbc;
401 } else {
402 pthread_rwlock_unlock(&db->rwl); // relock
403 if ((db->oflags & IWKV_RDONLY) || (acm & JB_COLL_ACQUIRE_EXISTING)) {
404 return IW_ERROR_NOT_EXISTS;
405 }
406 API_WLOCK(db, rci);
407 k = kh_get(JBCOLLM, db->mcolls, coll);
408 if (k != kh_end(db->mcolls)) {
409 jbc = kh_value(db->mcolls, k);
410 assert(jbc);
411 rci = pthread_rwlock_rdlock(&jbc->rwl);
412 if (rci) {
413 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
414 goto finish;
415 }
416 *jbcp = jbc;
417 } else {
418 JBL meta = 0;
419 IWDB cdb = 0;
420 uint32_t dbid = 0;
421 char keybuf[JBNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];
422 IWKV_val key, val;
423
424 rc = iwkv_new_db(db->iwkv, IWDB_VNUM64_KEYS, &dbid, &cdb);
425 RCGO(rc, create_finish);
426 jbc = calloc(1, sizeof(*jbc));
427 if (!jbc) {
428 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
429 goto create_finish;
430 }
431 rc = jbl_create_empty_object(&meta);
432 RCGO(rc, create_finish);
433 if (!binn_object_set_str(&meta->bn, "name", coll)) {
434 rc = JBL_ERROR_CREATION;
435 goto create_finish;
436 }
437 if (!binn_object_set_uint32(&meta->bn, "id", dbid)) {
438 rc = JBL_ERROR_CREATION;
439 goto create_finish;
440 }
441 rc = jbl_as_buf(meta, &val.data, &val.size);
442 RCGO(rc, create_finish);
443
444 key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", dbid);
445 if (key.size >= sizeof(keybuf)) {
446 rc = IW_ERROR_OVERFLOW;
447 goto create_finish;
448 }
449 key.data = keybuf;
450 rc = iwkv_put(db->metadb, &key, &val, IWKV_SYNC);
451 RCGO(rc, create_finish);
452
453 jbc->db = db;
454 jbc->meta = meta;
455 rc = _jb_coll_init(jbc, 0);
456 if (rc) {
457 iwkv_del(db->metadb, &key, IWKV_SYNC);
458 goto create_finish;
459 }
460
461 create_finish:
462 if (rc) {
463 if (meta) {
464 jbl_destroy(&meta);
465 }
466 if (cdb) {
467 iwkv_db_destroy(&cdb);
468 }
469 if (jbc) {
470 jbc->meta = 0; // meta was cleared
471 _jb_coll_release(jbc);
472 }
473 } else {
474 rci = wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl); // -V522
475 if (rci) {
476 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
477 goto finish;
478 }
479 *jbcp = jbc;
480 }
481 }
482 }
483
484 finish:
485 if (rc) {
486 pthread_rwlock_unlock(&db->rwl);
487 }
488 return rc;
489 }
490
_jb_coll_acquire_keeplock(EJDB db,const char * coll,bool wl,JBCOLL * jbcp)491 IW_INLINE iwrc _jb_coll_acquire_keeplock(EJDB db, const char *coll, bool wl, JBCOLL *jbcp) {
492 return _jb_coll_acquire_keeplock2(db, coll, wl ? JB_COLL_ACQUIRE_WRITE : 0, jbcp);
493 }
494
// Synchronizes index `idx` with a document change for document `id`.
//
// `jbl`     - new document version, or 0 when the document is removed.
// `jblprev` - previous document version, or 0 when the document is new.
//
// The values found at `idx->ptr` in both versions are compared; index records
// of the previous value are deleted and records for the new value are added.
// Array values are indexed element by element, and only by indexes using
// compound keys. The net number of added/removed index records is applied to
// the persistent record counter and, if that update succeeds, to `idx->rnum`.
//
// Returns EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED when a non-compound
// (unique) index already holds the new key, or the first other error.
static iwrc _jb_idx_record_add(JBIDX idx, int64_t id, JBL jbl, JBL jblprev) {
  IWKV_val key;
  uint8_t step;
  char vnbuf[IW_VNUMBUFSZ];
  char numbuf[JBNUMBUF_SIZE];

  bool jbv_found, jbvprev_found;
  struct _JBL jbv = { 0 }, jbvprev = { 0 };
  jbl_type_t jbv_type, jbvprev_type;

  iwrc rc = 0;
  IWPOOL *pool = 0;
  int64_t delta = 0; // delta of added/removed index records
  bool compound = idx->idbf & IWDB_COMPOUND_KEYS;

  // Resolve the indexed value in the previous and in the new document.
  jbvprev_found = jblprev ? _jbl_at(jblprev, idx->ptr, &jbvprev) : false;
  jbv_found = jbl ? _jbl_at(jbl, idx->ptr, &jbv) : false;

  jbv_type = jbl_type(&jbv);
  jbvprev_type = jbl_type(&jbvprev);

  // Do not index NULLs, OBJECTs, ARRAYs (in `EJDB_IDX_UNIQUE` mode)
  if (  ((jbvprev_type == JBV_OBJECT) || (jbvprev_type <= JBV_NULL))
     || ((jbvprev_type == JBV_ARRAY) && !compound)) {
    jbvprev_found = false;
  }
  if (  ((jbv_type == JBV_OBJECT) || (jbv_type <= JBV_NULL))
     || ((jbv_type == JBV_ARRAY) && !compound)) {
    jbv_found = false;
  }

  // Nothing to do when the indexed value did not change.
  if (  compound
     && (jbv_type == jbvprev_type)
     && (jbvprev_type == JBV_ARRAY)) { // compare next/prev obj arrays
    pool = iwpool_create(1024);
    if (!pool) {
      rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
      goto finish;
    }
    JBL_NODE jbvprev_node, jbv_node;
    rc = jbl_to_node(&jbv, &jbv_node, false, pool);
    RCGO(rc, finish);
    jbv.node = jbv_node;

    rc = jbl_to_node(&jbvprev, &jbvprev_node, false, pool);
    RCGO(rc, finish);
    jbvprev.node = jbvprev_node;

    if (_jbl_compare_nodes(jbv_node, jbvprev_node, &rc) == 0) {
      goto finish; // Arrays are equal or error
    }
  } else if (_jbl_is_eq_atomic_values(&jbv, &jbvprev)) {
    // No pool has been created on this path, so a plain return is safe.
    return 0;
  }

  if (jbvprev_found) {               // Remove old index elements
    if (jbvprev_type == JBV_ARRAY) { // TODO: array modification delta?
      JBL_NODE n;
      if (!pool) {
        pool = iwpool_create(1024);
        if (!pool) {
          rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
          RCGO(rc, finish);
        }
      }
      rc = jbl_to_node(&jbvprev, &n, false, pool);
      RCGO(rc, finish);
      for (n = n->child; n; n = n->next) {
        jbi_node_fill_ikey(idx, n, &key, numbuf);
        if (key.size) {
          key.compound = id;
          rc = iwkv_del(idx->idb, &key, 0);
          if (!rc) {
            --delta;
          } else if (rc == IWKV_ERROR_NOTFOUND) {
            // A missing index record is not an error.
            rc = 0;
          }
          RCGO(rc, finish);
        }
      }
    } else {
      jbi_jbl_fill_ikey(idx, &jbvprev, &key, numbuf);
      if (key.size) {
        key.compound = id;
        rc = iwkv_del(idx->idb, &key, 0);
        if (!rc) {
          --delta;
        } else if (rc == IWKV_ERROR_NOTFOUND) {
          // A missing index record is not an error.
          rc = 0;
        }
        RCGO(rc, finish);
      }
    }
  }

  if (jbv_found) {                // Add index record
    if (jbv_type == JBV_ARRAY) {  // TODO: array modification delta?
      JBL_NODE n;
      if (!pool) {
        pool = iwpool_create(1024);
        if (!pool) {
          rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
          RCGO(rc, finish);
        }
      }
      rc = jbl_to_node(&jbv, &n, false, pool);
      RCGO(rc, finish);
      for (n = n->child; n; n = n->next) {
        jbi_node_fill_ikey(idx, n, &key, numbuf);
        if (key.size) {
          key.compound = id;
          rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
          if (!rc) {
            ++delta;
          } else if (rc == IWKV_ERROR_KEY_EXISTS) {
            // Same (value, id) pair is already indexed.
            rc = 0;
          } else {
            goto finish;
          }
        }
      }
    } else {
      jbi_jbl_fill_ikey(idx, &jbv, &key, numbuf);
      if (key.size) {
        if (compound) {
          // Compound key: the document id is part of the key, the value is empty.
          key.compound = id;
          rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
          if (!rc) {
            ++delta;
          } else if (rc == IWKV_ERROR_KEY_EXISTS) {
            rc = 0;
          }
        } else {
          // Non-compound key: the document id is stored as the record value.
          IW_SETVNUMBUF64(step, vnbuf, id);
          IWKV_val idval = {
            .data = vnbuf,
            .size = step
          };
          rc = iwkv_put(idx->idb, &key, &idval, IWKV_NO_OVERWRITE);
          if (!rc) {
            ++delta;
          } else if (rc == IWKV_ERROR_KEY_EXISTS) {
            rc = EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED;
            goto finish;
          }
        }
      }
    }
  }

finish:
  if (pool) {
    iwpool_destroy(pool);
  }
  // Apply the delta even on error so counters reflect what was actually done.
  if (delta && !_jb_meta_nrecs_update(idx->jbc->db, idx->dbid, delta)) {
    idx->rnum += delta;
  }
  return rc;
}
654
_jb_idx_record_remove(JBIDX idx,int64_t id,JBL jbl)655 IW_INLINE iwrc _jb_idx_record_remove(JBIDX idx, int64_t id, JBL jbl) {
656 return _jb_idx_record_add(idx, id, 0, jbl);
657 }
658
_jb_idx_fill(JBIDX idx)659 static iwrc _jb_idx_fill(JBIDX idx) {
660 IWKV_cursor cur;
661 IWKV_val key, val;
662 struct _JBL jbs;
663 int64_t llv;
664 JBL jbl = &jbs;
665
666 iwrc rc = iwkv_cursor_open(idx->jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
667 while (!rc) {
668 rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
669 if (rc == IWKV_ERROR_NOTFOUND) {
670 rc = 0;
671 break;
672 }
673 rc = iwkv_cursor_get(cur, &key, &val);
674 RCBREAK(rc);
675 if (!binn_load(val.data, &jbs.bn)) {
676 rc = JBL_ERROR_CREATION;
677 break;
678 }
679 memcpy(&llv, key.data, sizeof(llv));
680 rc = _jb_idx_record_add(idx, llv, jbl, 0);
681 iwkv_kv_dispose(&key, &val);
682 }
683 IWRC(iwkv_cursor_close(&cur), rc);
684 return rc;
685 }
686
687 // Used to avoid deadlocks within a `iwkv_put` context
_jb_put_handler_after(iwrc rc,struct _JBPHCTX * ctx)688 static iwrc _jb_put_handler_after(iwrc rc, struct _JBPHCTX *ctx) {
689 IWKV_val *oldval = &ctx->oldval;
690 if (rc) {
691 if (oldval->size) {
692 iwkv_val_dispose(oldval);
693 }
694 return rc;
695 }
696 JBL prev;
697 struct _JBL jblprev;
698 JBCOLL jbc = ctx->jbc;
699 if (oldval->size) {
700 rc = jbl_from_buf_keep_onstack(&jblprev, oldval->data, oldval->size);
701 RCRET(rc);
702 prev = &jblprev;
703 } else {
704 prev = 0;
705 }
706 JBIDX fail_idx = 0;
707 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
708 rc = _jb_idx_record_add(idx, ctx->id, ctx->jbl, prev);
709 if (rc) {
710 fail_idx = idx;
711 goto finish;
712 }
713 }
714 if (!prev) {
715 _jb_meta_nrecs_update(jbc->db, jbc->dbid, 1);
716 jbc->rnum += 1;
717 }
718
719 finish:
720 if (oldval->size) {
721 iwkv_val_dispose(oldval);
722 }
723 if (rc && !oldval->size) {
724 // Cleanup on error inserting new record
725 IWKV_val key = { .data = &ctx->id, .size = sizeof(ctx->id) };
726 for (JBIDX idx = jbc->idx; idx && idx != fail_idx; idx = idx->next) {
727 IWRC(_jb_idx_record_remove(idx, ctx->id, ctx->jbl), rc);
728 }
729 IWRC(iwkv_del(jbc->cdb, &key, 0), rc);
730 }
731 return rc;
732 }
733
_jb_put_handler(const IWKV_val * key,const IWKV_val * val,IWKV_val * oldval,void * op)734 static iwrc _jb_put_handler(const IWKV_val *key, const IWKV_val *val, IWKV_val *oldval, void *op) {
735 struct _JBPHCTX *ctx = op;
736 if (oldval && oldval->size) {
737 memcpy(&ctx->oldval, oldval, sizeof(*oldval));
738 }
739 return 0;
740 }
741
_jb_exec_scan_init(JBEXEC * ctx)742 static iwrc _jb_exec_scan_init(JBEXEC *ctx) {
743 ctx->istep = 1;
744 ctx->jblbufsz = ctx->jbc->db->opts.document_buffer_sz;
745 ctx->jblbuf = malloc(ctx->jblbufsz);
746 if (!ctx->jblbuf) {
747 ctx->jblbufsz = 0;
748 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
749 }
750 struct JQP_AUX *aux = ctx->ux->q->aux;
751 if (aux->expr->flags & JQP_EXPR_NODE_FLAG_PK) { // Select by primary key
752 ctx->scanner = jbi_pk_scanner;
753 if (ctx->ux->log) {
754 iwxstr_cat2(ctx->ux->log, "[INDEX] PK");
755 }
756 return 0;
757 }
758 iwrc rc = jbi_selection(ctx);
759 RCRET(rc);
760 if (ctx->midx.idx) {
761 if (ctx->midx.idx->idbf & IWDB_COMPOUND_KEYS) {
762 ctx->scanner = jbi_dup_scanner;
763 } else {
764 ctx->scanner = jbi_uniq_scanner;
765 }
766 } else {
767 ctx->scanner = jbi_full_scanner;
768 if (ctx->ux->log) {
769 iwxstr_cat2(ctx->ux->log, "[INDEX] NO");
770 }
771 }
772 return 0;
773 }
774
// Releases the per-execution scan resources of `ctx`: the cache and pool of
// joined projection nodes (when present) and the document buffer.
static void _jb_exec_scan_release(JBEXEC *ctx) {
  if (ctx->proj_joined_nodes_cache != 0) {
    // Destroy projected nodes key
    iwstree_destroy(ctx->proj_joined_nodes_cache);
  }
  if (ctx->proj_joined_nodes_pool != 0) {
    iwpool_destroy(ctx->proj_joined_nodes_pool);
  }
  free(ctx->jblbuf);
}
785
_jb_noop_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)786 static iwrc _jb_noop_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
787 return 0;
788 }
789
_jb_put_impl(JBCOLL jbc,JBL jbl,int64_t id)790 IW_INLINE iwrc _jb_put_impl(JBCOLL jbc, JBL jbl, int64_t id) {
791 IWKV_val val, key = {
792 .data = &id,
793 .size = sizeof(id)
794 };
795 struct _JBPHCTX pctx = {
796 .id = id,
797 .jbc = jbc,
798 .jbl = jbl
799 };
800 iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
801 RCRET(rc);
802 return _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx);
803 }
804
jb_put(JBCOLL jbc,JBL jbl,int64_t id)805 iwrc jb_put(JBCOLL jbc, JBL jbl, int64_t id) {
806 return _jb_put_impl(jbc, jbl, id);
807 }
808
jb_cursor_set(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)809 iwrc jb_cursor_set(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
810 IWKV_val val;
811 struct _JBPHCTX pctx = {
812 .id = id,
813 .jbc = jbc,
814 .jbl = jbl
815 };
816 iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
817 RCRET(rc);
818 return _jb_put_handler_after(iwkv_cursor_seth(cur, &val, 0, _jb_put_handler, &pctx), &pctx);
819 }
820
_jb_exec_upsert_lw(JBEXEC * ctx)821 static iwrc _jb_exec_upsert_lw(JBEXEC *ctx) {
822 JBL_NODE n;
823 int64_t id;
824 iwrc rc = 0;
825 JBL jbl = 0;
826 EJDB_EXEC *ux = ctx->ux;
827 JQL q = ux->q;
828 if (q->aux->apply_placeholder) {
829 JQVAL *pv = jql_find_placeholder(q, q->aux->apply_placeholder);
830 if (!pv || (pv->type != JQVAL_JBLNODE) || !pv->vnode) {
831 rc = JQL_ERROR_INVALID_PLACEHOLDER_VALUE_TYPE;
832 goto finish;
833 }
834 n = pv->vnode;
835 } else {
836 n = q->aux->apply;
837 }
838 if (!n) {
839 // Skip silently, nothing to do.
840 goto finish;
841 }
842 RCC(rc, finish, jbl_from_node(&jbl, n));
843 RCC(rc, finish, _jb_put_new_lw(ctx->jbc, jbl, &id));
844
845 if (!(q->aux->qmode & JQP_QRY_AGGREGATE)) {
846 struct _EJDB_DOC doc = {
847 .id = id,
848 .raw = jbl,
849 .node = n
850 };
851 do {
852 ctx->istep = 1;
853 RCC(rc, finish, ux->visitor(ux, &doc, &ctx->istep));
854 } while (ctx->istep == -1);
855 }
856 ++ux->cnt;
857
858 finish:
859 jbl_destroy(&jbl);
860 return rc;
861 }
862
863 //----------------------- Public API
864
// Executes the query described by `ux`.
//
// - Returns IW_ERROR_INVALID_ARGS when `ux`, `ux->db` or `ux->q` is missing.
// - Without a visitor a no-op visitor is installed and the projection is dropped.
// - `ux->limit` / `ux->skip` values below 1 are replaced by the values stored
//   in the query; a limit that is still below 1 means "no limit".
// - The collection is locked for writing when the query has an apply part;
//   otherwise it must already exist. A missing collection yields 0.
// - When the scan visited no records and the query is an upsert, the apply
//   document is inserted as a new record.
//
// Once the collection has been acquired, every path releases the scan state,
// unlocks via API_COLL_UNLOCK and resets the query with `jql_reset`.
iwrc ejdb_exec(EJDB_EXEC *ux) {
  if (!ux || !ux->db || !ux->q) {
    return IW_ERROR_INVALID_ARGS;
  }
  int rci;
  iwrc rc = 0;
  if (!ux->visitor) {
    ux->visitor = _jb_noop_visitor;
    ux->q->aux->projection = 0; // Actually we don't need projection if exists
  }
  if (ux->log) {
    // set terminating NULL to current pos of log
    iwxstr_cat(ux->log, 0, 0);
  }
  JBEXEC ctx = {
    .ux = ux
  };
  if (ux->limit < 1) {
    rc = jql_get_limit(ux->q, &ux->limit);
    RCRET(rc);
    if (ux->limit < 1) {
      ux->limit = INT64_MAX;
    }
  }
  if (ux->skip < 1) {
    rc = jql_get_skip(ux->q, &ux->skip);
    RCRET(rc);
  }
  rc = _jb_coll_acquire_keeplock2(ux->db, ux->q->coll,
                                  jql_has_apply(ux->q) ? JB_COLL_ACQUIRE_WRITE : JB_COLL_ACQUIRE_EXISTING,
                                  &ctx.jbc);
  if (rc == IW_ERROR_NOT_EXISTS) {
    // Querying a collection that does not exist is not an error.
    return 0;
  } else {
    RCRET(rc);
  }

  rc = _jb_exec_scan_init(&ctx);
  RCGO(rc, finish);
  // Run the selected scanner with the consumer matching the sorting mode.
  if (ctx.sorting) {
    if (ux->log) {
      iwxstr_cat2(ux->log, " [COLLECTOR] SORTER\n");
    }
    rc = ctx.scanner(&ctx, jbi_sorter_consumer);
  } else {
    if (ux->log) {
      iwxstr_cat2(ux->log, " [COLLECTOR] PLAIN\n");
    }
    rc = ctx.scanner(&ctx, jbi_consumer);
  }
  RCGO(rc, finish);
  if ((ux->cnt == 0) && jql_has_apply_upsert(ux->q)) {
    // No records found trying to upsert new record
    rc = _jb_exec_upsert_lw(&ctx);
  }

finish:
  _jb_exec_scan_release(&ctx);
  API_COLL_UNLOCK(ctx.jbc, rci, rc);
  jql_reset(ux->q, true, false);
  return rc;
}
927
// State of `_jb_exec_list_visitor`: the doubly linked list of documents
// collected so far.
struct JB_LIST_VISITOR_CTX {
  EJDB_DOC head; // first collected document, 0 while the list is empty
  EJDB_DOC tail; // last collected document; new documents are linked after it
};
932
_jb_exec_list_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)933 static iwrc _jb_exec_list_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
934 struct JB_LIST_VISITOR_CTX *lvc = ctx->opaque;
935 IWPOOL *pool = ctx->pool;
936 struct _EJDB_DOC *ndoc = iwpool_alloc(sizeof(*ndoc) + sizeof(*doc->raw) + doc->raw->bn.size, pool);
937 if (!ndoc) {
938 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
939 }
940 ndoc->id = doc->id;
941 ndoc->raw = (void*) (((uint8_t*) ndoc) + sizeof(*ndoc));
942 ndoc->raw->node = 0;
943 ndoc->node = doc->node;
944 ndoc->next = 0;
945 ndoc->prev = 0;
946 memcpy(&ndoc->raw->bn, &doc->raw->bn, sizeof(ndoc->raw->bn));
947 ndoc->raw->bn.ptr = ((uint8_t*) ndoc) + sizeof(*ndoc) + sizeof(*doc->raw);
948 memcpy(ndoc->raw->bn.ptr, doc->raw->bn.ptr, doc->raw->bn.size);
949
950 if (!lvc->head) {
951 lvc->head = ndoc;
952 lvc->tail = ndoc;
953 } else {
954 lvc->tail->next = ndoc;
955 ndoc->prev = lvc->tail;
956 lvc->tail = ndoc;
957 }
958 return 0;
959 }
960
_jb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWXSTR * log,IWPOOL * pool)961 static iwrc _jb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWXSTR *log, IWPOOL *pool) {
962 if (!db || !q || !first || !pool) {
963 return IW_ERROR_INVALID_ARGS;
964 }
965 iwrc rc = 0;
966 struct JB_LIST_VISITOR_CTX lvc = { 0 };
967 struct _EJDB_EXEC ux = {
968 .db = db,
969 .q = q,
970 .visitor = _jb_exec_list_visitor,
971 .pool = pool,
972 .limit = limit,
973 .log = log,
974 .opaque = &lvc
975 };
976 rc = ejdb_exec(&ux);
977 if (rc) {
978 *first = 0;
979 } else {
980 *first = lvc.head;
981 }
982 return rc;
983 }
984
_jb_count(EJDB db,JQL q,int64_t * count,int64_t limit,IWXSTR * log)985 static iwrc _jb_count(EJDB db, JQL q, int64_t *count, int64_t limit, IWXSTR *log) {
986 if (!db || !q || !count) {
987 return IW_ERROR_INVALID_ARGS;
988 }
989 struct _EJDB_EXEC ux = {
990 .db = db,
991 .q = q,
992 .limit = limit,
993 .log = log
994 };
995 iwrc rc = ejdb_exec(&ux);
996 *count = ux.cnt;
997 return rc;
998 }
999
ejdb_count(EJDB db,JQL q,int64_t * count,int64_t limit)1000 iwrc ejdb_count(EJDB db, JQL q, int64_t *count, int64_t limit) {
1001 return _jb_count(db, q, count, limit, 0);
1002 }
1003
ejdb_count2(EJDB db,const char * coll,const char * q,int64_t * count,int64_t limit)1004 iwrc ejdb_count2(EJDB db, const char *coll, const char *q, int64_t *count, int64_t limit) {
1005 JQL jql;
1006 iwrc rc = jql_create(&jql, coll, q);
1007 RCRET(rc);
1008 rc = _jb_count(db, jql, count, limit, 0);
1009 jql_destroy(&jql);
1010 return rc;
1011 }
1012
ejdb_update(EJDB db,JQL q)1013 iwrc ejdb_update(EJDB db, JQL q) {
1014 int64_t count;
1015 return ejdb_count(db, q, &count, 0);
1016 }
1017
ejdb_update2(EJDB db,const char * coll,const char * q)1018 iwrc ejdb_update2(EJDB db, const char *coll, const char *q) {
1019 int64_t count;
1020 return ejdb_count2(db, coll, q, &count, 0);
1021 }
1022
ejdb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWPOOL * pool)1023 iwrc ejdb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWPOOL *pool) {
1024 return _jb_list(db, q, first, limit, 0, pool);
1025 }
1026
ejdb_list3(EJDB db,const char * coll,const char * query,int64_t limit,IWXSTR * log,EJDB_LIST * listp)1027 iwrc ejdb_list3(EJDB db, const char *coll, const char *query, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
1028 if (!listp) {
1029 return IW_ERROR_INVALID_ARGS;
1030 }
1031 iwrc rc = 0;
1032 *listp = 0;
1033 IWPOOL *pool = iwpool_create(1024);
1034 if (!pool) {
1035 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1036 }
1037 EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
1038 if (!list) {
1039 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1040 goto finish;
1041 }
1042 list->first = 0;
1043 list->db = db;
1044 list->pool = pool;
1045 rc = jql_create(&list->q, coll, query);
1046 RCGO(rc, finish);
1047 rc = _jb_list(db, list->q, &list->first, limit, log, list->pool);
1048
1049 finish:
1050 if (rc) {
1051 iwpool_destroy(pool);
1052 } else {
1053 *listp = list;
1054 }
1055 return rc;
1056 }
1057
ejdb_list4(EJDB db,JQL q,int64_t limit,IWXSTR * log,EJDB_LIST * listp)1058 iwrc ejdb_list4(EJDB db, JQL q, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
1059 if (!listp) {
1060 return IW_ERROR_INVALID_ARGS;
1061 }
1062 iwrc rc = 0;
1063 *listp = 0;
1064 IWPOOL *pool = iwpool_create(1024);
1065 if (!pool) {
1066 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1067 }
1068 EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
1069 if (!list) {
1070 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1071 goto finish;
1072 }
1073 list->q = 0;
1074 list->first = 0;
1075 list->db = db;
1076 list->pool = pool;
1077 rc = _jb_list(db, q, &list->first, limit, log, list->pool);
1078
1079 finish:
1080 if (rc) {
1081 iwpool_destroy(pool);
1082 } else {
1083 *listp = list;
1084 }
1085 return rc;
1086 }
1087
ejdb_list2(EJDB db,const char * coll,const char * query,int64_t limit,EJDB_LIST * listp)1088 iwrc ejdb_list2(EJDB db, const char *coll, const char *query, int64_t limit, EJDB_LIST *listp) {
1089 return ejdb_list3(db, coll, query, limit, 0, listp);
1090 }
1091
// Destroys a list created by the `ejdb_list*` functions and clears the
// caller's pointer. The compiled query (if the list has one) is destroyed
// first, then the pool. Accepts a 0 `listp` and a 0 `*listp`.
void ejdb_list_destroy(EJDB_LIST *listp) {
  if (!listp) {
    return;
  }
  EJDB_LIST list = *listp;
  if (list) {
    if (list->q) {
      jql_destroy(&list->q);
    }
    if (list->pool) {
      iwpool_destroy(list->pool);
    }
  }
  *listp = 0;
}
1106
// Removes the index on `path` with the given `mode` from the existing
// collection `coll`.
//
// An index matches when its pointer spec equals `path` and its mode equals
// `mode`, ignoring the EJDB_IDX_UNIQUE bit on both sides. For the first match
// the metadata record and the record counter are deleted, the descriptor is
// unlinked from the collection, its database is destroyed and the descriptor
// is freed. When no index matches, nothing is changed and 0 is returned.
//
// Returns IW_ERROR_INVALID_ARGS for missing arguments, the error of acquiring
// the collection (which must exist), or the first error while removing.
iwrc ejdb_remove_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
  if (!db || !coll || !path) {
    return IW_ERROR_INVALID_ARGS;
  }
  int rci;
  JBCOLL jbc;
  IWKV_val key;
  JBL_PTR ptr = 0;
  char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * JBNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>

  iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
  RCRET(rc);

  rc = jbl_ptr_alloc(path, &ptr);
  RCGO(rc, finish);

  // `prev` trails `idx` so that the matching node can be unlinked.
  for (JBIDX idx = jbc->idx, prev = 0; idx; idx = idx->next) {
    if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
      key.data = keybuf;
      key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
      if (key.size >= sizeof(keybuf)) {
        rc = IW_ERROR_OVERFLOW;
        goto finish;
      }
      rc = iwkv_del(db->metadb, &key, 0);
      RCGO(rc, finish);
      // The result of dropping the record counter is not checked.
      _jb_meta_nrecs_removedb(db, idx->dbid);
      if (prev) {
        prev->next = idx->next;
      } else {
        jbc->idx = idx->next;
      }
      if (idx->idb) {
        iwkv_db_destroy(&idx->idb);
      }
      _jb_idx_release(idx);
      // `idx` is freed: leave the loop before its `next` field is read again.
      break;
    }
    prev = idx;
  }

finish:
  free(ptr);
  API_COLL_UNLOCK(jbc, rci, rc);
  return rc;
}
1153
// Makes sure collection `coll` has an index on JSON pointer `path` with the
// given `mode`.
//
// `mode` must select exactly one of EJDB_IDX_STR, EJDB_IDX_I64 or EJDB_IDX_F64,
// otherwise EJDB_ERROR_INVALID_INDEX_MODE is returned. If an index with the
// same path and value type already exists, nothing is created: the call
// finishes without error when the modes are identical, and with
// EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE when only the EJDB_IDX_UNIQUE
// bit differs. Otherwise a new index database is created, filled by
// `_jb_idx_fill()`, its metadata is stored in `db->metadb` and the index is
// put at the head of the collection's index list.
//
// Returns IW_ERROR_INVALID_ARGS when `db`, `coll` or `path` is null.
iwrc ejdb_ensure_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
  if (!db || !coll || !path) {
    return IW_ERROR_INVALID_ARGS;
  }
  int rci;
  JBCOLL jbc;
  IWKV_val key, val;
  char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * JBNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>

  JBIDX idx = 0;
  JBL_PTR ptr = 0;
  binn *imeta = 0;

  // Exactly one value type bit must be set; any combination or none is rejected.
  switch (mode & (EJDB_IDX_STR | EJDB_IDX_I64 | EJDB_IDX_F64)) {
    case EJDB_IDX_STR:
    case EJDB_IDX_I64:
    case EJDB_IDX_F64:
      break;
    default:
      return EJDB_ERROR_INVALID_INDEX_MODE;
  }

  // NOTE(review): the `true` argument presumably requests a write lock - confirm
  // against _jb_coll_acquire_keeplock().
  iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
  RCRET(rc);
  rc = jbl_ptr_alloc(path, &ptr);
  RCGO(rc, finish);

  // Look for an existing index on the same path and value type.
  for (idx = jbc->idx; idx; idx = idx->next) {
    if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
      if (idx->mode != mode) {
        rc = EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE;
        // The found index is still owned by the collection:
        // reset `idx` so the error path below does not release it.
        idx = 0;
      }
      goto finish;
    }
  }

  // No matching index (idx is null here): build a new one.
  idx = calloc(1, sizeof(*idx));
  if (!idx) {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    goto finish;
  }
  idx->mode = mode;
  idx->jbc = jbc;
  // Ownership of the parsed pointer moves to the index;
  // `ptr` is zeroed so free(ptr) at `finish` becomes a no-op.
  idx->ptr = ptr;
  ptr = 0;
  // Translate the index mode into flags of the backing index database.
  idx->idbf = 0;
  if (mode & EJDB_IDX_I64) {
    idx->idbf |= IWDB_VNUM64_KEYS;
  } else if (mode & EJDB_IDX_F64) {
    idx->idbf |= IWDB_REALNUM_KEYS;
  }
  if (!(mode & EJDB_IDX_UNIQUE)) {
    idx->idbf |= IWDB_COMPOUND_KEYS;
  }
  rc = iwkv_new_db(db->iwkv, idx->idbf, &idx->dbid, &idx->idb);
  RCGO(rc, finish);

  rc = _jb_idx_fill(idx);
  RCGO(rc, finish);

  // save index meta into metadb
  imeta = binn_object();
  if (!imeta) {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    goto finish;
  }

  if ( !binn_object_set_str(imeta, "ptr", path)
    || !binn_object_set_uint32(imeta, "mode", idx->mode)
    || !binn_object_set_uint32(imeta, "idbf", idx->idbf)
    || !binn_object_set_uint32(imeta, "dbid", idx->dbid)) {
    rc = JBL_ERROR_CREATION;
    goto finish;
  }

  key.data = keybuf;
  // Full key format: i.<coldbid>.<idxdbid>
  key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
  if (key.size >= sizeof(keybuf)) {
    rc = IW_ERROR_OVERFLOW;
    goto finish;
  }
  val.data = binn_ptr(imeta);
  val.size = binn_size(imeta);
  rc = iwkv_put(db->metadb, &key, &val, 0);
  RCGO(rc, finish);

  // Publish the new index at the head of the collection's index list.
  idx->next = jbc->idx;
  jbc->idx = idx;

finish:
  if (rc) {
    // On failure `idx` is non-null only for an index created by this call
    // that was not linked into the collection: destroy its database and free it.
    if (idx) {
      if (idx->idb) {
        iwkv_db_destroy(&idx->idb);
        idx->idb = 0;
      }
      _jb_idx_release(idx);
    }
  }
  free(ptr);
  binn_free(imeta);
  API_COLL_UNLOCK(jbc, rci, rc);
  return rc;
}
1260
_jb_patch(EJDB db,const char * coll,int64_t id,bool upsert,const char * patchjson,JBL_NODE patchjbn,JBL patchjbl)1261 static iwrc _jb_patch(
1262 EJDB db, const char *coll, int64_t id, bool upsert,
1263 const char *patchjson, JBL_NODE patchjbn, JBL patchjbl) {
1264
1265 int rci;
1266 JBCOLL jbc;
1267 struct _JBL sjbl;
1268 JBL_NODE root, patch;
1269 JBL ujbl = 0;
1270 IWPOOL *pool = 0;
1271 IWKV_val val = { 0 };
1272 IWKV_val key = {
1273 .data = &id,
1274 .size = sizeof(id)
1275 };
1276
1277 iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1278 RCGO(rc, finish);
1279
1280 rc = iwkv_get(jbc->cdb, &key, &val);
1281 if (upsert && (rc == IWKV_ERROR_NOTFOUND)) {
1282 if (patchjson) {
1283 rc = jbl_from_json(&ujbl, patchjson);
1284 } else if (patchjbl) {
1285 ujbl = patchjbl;
1286 } else if (patchjbn) {
1287 rc = jbl_from_node(&ujbl, patchjbn);
1288 } else {
1289 rc = IW_ERROR_INVALID_ARGS;
1290 }
1291 RCGO(rc, finish);
1292 if (jbl_type(ujbl) != JBV_OBJECT) {
1293 rc = EJDB_ERROR_PATCH_JSON_NOT_OBJECT;
1294 goto finish;
1295 }
1296 rc = _jb_put_impl(jbc, ujbl, id);
1297 if (!rc && (jbc->id_seq < id)) {
1298 jbc->id_seq = id;
1299 }
1300 goto finish;
1301 } else {
1302 RCGO(rc, finish);
1303 }
1304
1305 rc = jbl_from_buf_keep_onstack(&sjbl, val.data, val.size);
1306 RCGO(rc, finish);
1307
1308 pool = iwpool_create_empty();
1309 if (!pool) {
1310 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1311 goto finish;
1312 }
1313
1314 rc = jbl_to_node(&sjbl, &root, false, pool);
1315 RCGO(rc, finish);
1316
1317 if (patchjson) {
1318 rc = jbn_from_json(patchjson, &patch, pool);
1319 } else if (patchjbl) {
1320 rc = jbl_to_node(patchjbl, &patch, false, pool);
1321 } else if (patchjbn) {
1322 patch = patchjbn;
1323 } else {
1324 rc = IW_ERROR_INVALID_ARGS;
1325 }
1326 RCGO(rc, finish);
1327
1328 rc = jbn_patch_auto(root, patch, pool);
1329 RCGO(rc, finish);
1330
1331 if (root->type == JBV_OBJECT) {
1332 rc = jbl_create_empty_object(&ujbl);
1333 RCGO(rc, finish);
1334 } else if (root->type == JBV_ARRAY) {
1335 rc = jbl_create_empty_array(&ujbl);
1336 RCGO(rc, finish);
1337 } else {
1338 rc = JBL_ERROR_CREATION;
1339 goto finish;
1340 }
1341 rc = jbl_fill_from_node(ujbl, root);
1342 RCGO(rc, finish);
1343
1344 rc = _jb_put_impl(jbc, ujbl, id);
1345
1346 finish:
1347 API_COLL_UNLOCK(jbc, rci, rc);
1348 if (ujbl != patchjbl) {
1349 jbl_destroy(&ujbl);
1350 }
1351 if (val.data) {
1352 iwkv_val_dispose(&val);
1353 }
1354 iwpool_destroy(pool);
1355 return rc;
1356 }
1357
_jb_wal_lock_interceptor(bool before,void * op)1358 static iwrc _jb_wal_lock_interceptor(bool before, void *op) {
1359 int rci;
1360 iwrc rc = 0;
1361 EJDB db = op;
1362 assert(db);
1363 if (before) {
1364 API_WLOCK2(db, rci);
1365 } else {
1366 API_UNLOCK(db, rci, rc);
1367 }
1368 return rc;
1369 }
1370
ejdb_patch(EJDB db,const char * coll,const char * patchjson,int64_t id)1371 iwrc ejdb_patch(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1372 return _jb_patch(db, coll, id, false, patchjson, 0, 0);
1373 }
1374
ejdb_patch_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1375 iwrc ejdb_patch_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1376 return _jb_patch(db, coll, id, false, 0, patch, 0);
1377 }
1378
ejdb_patch_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1379 iwrc ejdb_patch_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1380 return _jb_patch(db, coll, id, false, 0, 0, patch);
1381 }
1382
ejdb_merge_or_put(EJDB db,const char * coll,const char * patchjson,int64_t id)1383 iwrc ejdb_merge_or_put(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1384 return _jb_patch(db, coll, id, true, patchjson, 0, 0);
1385 }
1386
ejdb_merge_or_put_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1387 iwrc ejdb_merge_or_put_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1388 return _jb_patch(db, coll, id, true, 0, patch, 0);
1389 }
1390
ejdb_merge_or_put_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1391 iwrc ejdb_merge_or_put_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1392 return _jb_patch(db, coll, id, true, 0, 0, patch);
1393 }
1394
ejdb_put(EJDB db,const char * coll,JBL jbl,int64_t id)1395 iwrc ejdb_put(EJDB db, const char *coll, JBL jbl, int64_t id) {
1396 if (!jbl) {
1397 return IW_ERROR_INVALID_ARGS;
1398 }
1399 int rci;
1400 JBCOLL jbc;
1401 iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1402 RCRET(rc);
1403 rc = _jb_put_impl(jbc, jbl, id);
1404 if (!rc && (jbc->id_seq < id)) {
1405 jbc->id_seq = id;
1406 }
1407 API_COLL_UNLOCK(jbc, rci, rc);
1408 return rc;
1409 }
1410
_jb_put_new_lw(JBCOLL jbc,JBL jbl,int64_t * id)1411 static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id) {
1412 iwrc rc = 0;
1413 int64_t oid = jbc->id_seq + 1;
1414 IWKV_val val, key = {
1415 .data = &oid,
1416 .size = sizeof(oid)
1417 };
1418 struct _JBPHCTX pctx = {
1419 .id = oid,
1420 .jbc = jbc,
1421 .jbl = jbl
1422 };
1423
1424 RCC(rc, finish, jbl_as_buf(jbl, &val.data, &val.size));
1425 RCC(rc, finish, _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx));
1426
1427 jbc->id_seq = oid;
1428 if (id) {
1429 *id = oid;
1430 }
1431
1432 finish:
1433 return rc;
1434 }
1435
ejdb_put_new(EJDB db,const char * coll,JBL jbl,int64_t * id)1436 iwrc ejdb_put_new(EJDB db, const char *coll, JBL jbl, int64_t *id) {
1437 if (!jbl) {
1438 return IW_ERROR_INVALID_ARGS;
1439 }
1440 int rci;
1441 JBCOLL jbc;
1442 if (id) {
1443 *id = 0;
1444 }
1445 iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1446 RCRET(rc);
1447
1448 rc = _jb_put_new_lw(jbc, jbl, id);
1449
1450 API_COLL_UNLOCK(jbc, rci, rc);
1451 return rc;
1452 }
1453
ejdb_put_new_jbn(EJDB db,const char * coll,JBL_NODE jbn,int64_t * id)1454 iwrc ejdb_put_new_jbn(EJDB db, const char *coll, JBL_NODE jbn, int64_t *id) {
1455 JBL jbl = 0;
1456 iwrc rc = jbl_from_node(&jbl, jbn);
1457 RCRET(rc);
1458 rc = ejdb_put_new(db, coll, jbl, id);
1459 jbl_destroy(&jbl);
1460 return rc;
1461 }
1462
jb_get(EJDB db,const char * coll,int64_t id,jb_coll_acquire_t acm,JBL * jblp)1463 iwrc jb_get(EJDB db, const char *coll, int64_t id, jb_coll_acquire_t acm, JBL *jblp) {
1464 if (!id || !jblp) {
1465 return IW_ERROR_INVALID_ARGS;
1466 }
1467 *jblp = 0;
1468 int rci;
1469 JBCOLL jbc;
1470 JBL jbl = 0;
1471 IWKV_val val = { 0 };
1472 IWKV_val key = { .data = &id, .size = sizeof(id) };
1473 iwrc rc = _jb_coll_acquire_keeplock2(db, coll, acm, &jbc);
1474 RCRET(rc);
1475
1476 rc = iwkv_get(jbc->cdb, &key, &val);
1477 RCGO(rc, finish);
1478 rc = jbl_from_buf_keep(&jbl, val.data, val.size, false);
1479 RCGO(rc, finish);
1480 *jblp = jbl;
1481
1482 finish:
1483 if (rc) {
1484 if (jbl) {
1485 jbl_destroy(&jbl);
1486 } else {
1487 iwkv_val_dispose(&val);
1488 }
1489 }
1490 API_COLL_UNLOCK(jbc, rci, rc);
1491 return rc;
1492 }
1493
ejdb_get(EJDB db,const char * coll,int64_t id,JBL * jblp)1494 iwrc ejdb_get(EJDB db, const char *coll, int64_t id, JBL *jblp) {
1495 return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, jblp);
1496 }
1497
ejdb_del(EJDB db,const char * coll,int64_t id)1498 iwrc ejdb_del(EJDB db, const char *coll, int64_t id) {
1499 int rci;
1500 JBCOLL jbc;
1501 struct _JBL jbl;
1502 IWKV_val val = { 0 };
1503 IWKV_val key = { .data = &id, .size = sizeof(id) };
1504 iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
1505 RCRET(rc);
1506
1507 rc = iwkv_get(jbc->cdb, &key, &val);
1508 RCGO(rc, finish);
1509
1510 rc = jbl_from_buf_keep_onstack(&jbl, val.data, val.size);
1511 RCGO(rc, finish);
1512
1513 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1514 IWRC(_jb_idx_record_remove(idx, id, &jbl), rc);
1515 }
1516 rc = iwkv_del(jbc->cdb, &key, 0);
1517 RCGO(rc, finish);
1518 _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1519 jbc->rnum -= 1;
1520
1521 finish:
1522 if (val.data) {
1523 iwkv_val_dispose(&val);
1524 }
1525 API_COLL_UNLOCK(jbc, rci, rc);
1526 return rc;
1527 }
1528
jb_del(JBCOLL jbc,JBL jbl,int64_t id)1529 iwrc jb_del(JBCOLL jbc, JBL jbl, int64_t id) {
1530 iwrc rc = 0;
1531 IWKV_val key = { .data = &id, .size = sizeof(id) };
1532 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1533 IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1534 }
1535 rc = iwkv_del(jbc->cdb, &key, 0);
1536 RCRET(rc);
1537 _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1538 jbc->rnum -= 1;
1539 return rc;
1540 }
1541
jb_cursor_del(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)1542 iwrc jb_cursor_del(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
1543 iwrc rc = 0;
1544 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1545 IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1546 }
1547 rc = iwkv_cursor_del(cur, 0);
1548 RCRET(rc);
1549 _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1550 jbc->rnum -= 1;
1551 return rc;
1552 }
1553
ejdb_ensure_collection(EJDB db,const char * coll)1554 iwrc ejdb_ensure_collection(EJDB db, const char *coll) {
1555 int rci;
1556 JBCOLL jbc;
1557 iwrc rc = _jb_coll_acquire_keeplock(db, coll, false, &jbc);
1558 RCRET(rc);
1559 API_COLL_UNLOCK(jbc, rci, rc);
1560 return rc;
1561 }
1562
// Removes collection `coll` together with all of its indexes.
//
// Returns IW_ERROR_READONLY when the database was opened with IWKV_RDONLY.
// When the collection is not registered in `db->mcolls` nothing is removed.
// Otherwise, under the database write lock: the collection metadata and every
// index metadata record are deleted from the meta database, the matching
// record counters are dropped, all index databases and the collection
// database are destroyed, and the collection is removed from `db->mcolls`
// and released.
//
// NOTE(review): `db` is dereferenced without a null check.
iwrc ejdb_remove_collection(EJDB db, const char *coll) {
  int rci;
  iwrc rc = 0;
  if (db->oflags & IWKV_RDONLY) {
    return IW_ERROR_READONLY;
  }
  API_WLOCK(db, rci);
  JBCOLL jbc;
  IWKV_val key;
  char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * JBNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
  khiter_t k = kh_get(JBCOLLM, db->mcolls, coll);

  if (k != kh_end(db->mcolls)) {

    jbc = kh_value(db->mcolls, k);
    // Delete the collection metadata record first (synced write).
    // Unlike in ejdb_ensure_index(), the snprintf results in this function
    // are not checked against the buffer size.
    key.data = keybuf;
    key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
    rc = iwkv_del(jbc->db->metadb, &key, IWKV_SYNC);
    RCGO(rc, finish);

    _jb_meta_nrecs_removedb(db, jbc->dbid);

    // First pass: delete metadata and record counters of every index.
    // A failure here leaves the in-memory collection and its indexes in place.
    for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
      key.data = keybuf;
      key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
      rc = iwkv_del(jbc->db->metadb, &key, 0);
      RCGO(rc, finish);
      _jb_meta_nrecs_removedb(db, idx->dbid);
    }
    // Second pass: destroy index databases and free the index structures.
    // `nidx` is read before the index is released.
    for (JBIDX idx = jbc->idx, nidx; idx; idx = nidx) {
      IWRC(iwkv_db_destroy(&idx->idb), rc);
      idx->idb = 0;
      nidx = idx->next;
      _jb_idx_release(idx);
    }
    // The list was freed above; clear the head before the collection is
    // released so the indexes are not released a second time.
    jbc->idx = 0;
    IWRC(iwkv_db_destroy(&jbc->cdb), rc);
    kh_del(JBCOLLM, db->mcolls, k);
    _jb_coll_release(jbc);
  }

finish:
  API_UNLOCK(db, rci, rc);
  return rc;
}
1608
jb_collection_join_resolver(int64_t id,const char * coll,JBL * out,JBEXEC * ctx)1609 iwrc jb_collection_join_resolver(int64_t id, const char *coll, JBL *out, JBEXEC *ctx) {
1610 assert(out && ctx && coll);
1611 EJDB db = ctx->jbc->db;
1612 return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, out);
1613 }
1614
jb_proj_node_cache_cmp(const void * v1,const void * v2)1615 int jb_proj_node_cache_cmp(const void *v1, const void *v2) {
1616 const struct _JBDOCREF *r1 = v1;
1617 const struct _JBDOCREF *r2 = v2;
1618 int ret = r1->id > r2->id ? 1 : r1->id < r2->id ? -1 : 0;
1619 if (!ret) {
1620 return strcmp(r1->coll, r2->coll);
1621 ;
1622 }
1623 return ret;
1624 }
1625
// Key/value release callback: frees `key` only; `val` is left untouched.
void jb_proj_node_kvfree(void *key, void *val) {
  (void) val;
  free(key);
}
1629
// Renames collection `coll` to `new_coll`.
//
// Returns IW_ERROR_INVALID_ARGS when either name is null, IW_ERROR_READONLY
// when the database was opened with IWKV_RDONLY,
// EJDB_ERROR_COLLECTION_NOT_FOUND when `coll` is not registered and
// EJDB_ERROR_TARGET_COLLECTION_EXISTS when `new_coll` already is.
//
// On success a new metadata object {"name": new_coll, "id": <dbid>} is stored
// under the collection's metadata key, the collection is re-registered in
// `db->mcolls` under the new name and `jbc->meta` is replaced.
//
// NOTE(review): `db` is dereferenced without a null check.
iwrc ejdb_rename_collection(EJDB db, const char *coll, const char *new_coll) {
  if (!coll || !new_coll) {
    return IW_ERROR_INVALID_ARGS;
  }
  int rci;
  iwrc rc = 0;
  if (db->oflags & IWKV_RDONLY) {
    return IW_ERROR_READONLY;
  }
  IWKV_val key, val;
  JBL nmeta = 0, jbv = 0;
  char keybuf[JBNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];

  API_WLOCK(db, rci);

  khiter_t k = kh_get(JBCOLLM, db->mcolls, coll);
  if (k == kh_end(db->mcolls)) {
    rc = EJDB_ERROR_COLLECTION_NOT_FOUND;
    goto finish;
  }
  khiter_t k2 = kh_get(JBCOLLM, db->mcolls, new_coll);
  if (k2 != kh_end(db->mcolls)) {
    rc = EJDB_ERROR_TARGET_COLLECTION_EXISTS;
    goto finish;
  }

  JBCOLL jbc = kh_value(db->mcolls, k);

  // Build the replacement metadata object.
  rc = jbl_create_empty_object(&nmeta);
  RCGO(rc, finish);

  if (!binn_object_set_str(&nmeta->bn, "name", new_coll)) {
    rc = JBL_ERROR_CREATION;
    goto finish;
  }
  if (!binn_object_set_uint32(&nmeta->bn, "id", jbc->dbid)) {
    rc = JBL_ERROR_CREATION;
    goto finish;
  }

  rc = jbl_as_buf(nmeta, &val.data, &val.size);
  RCGO(rc, finish);
  key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
  if (key.size >= sizeof(keybuf)) {
    rc = IW_ERROR_OVERFLOW;
    goto finish;
  }
  key.data = keybuf;

  // Take the name string out of the new metadata object rather than using the
  // caller's `new_coll` pointer.
  // NOTE(review): presumably `new_name` points into storage owned by `nmeta`
  // and therefore stays valid while `jbc->meta` is alive - confirm.
  rc = jbl_at(nmeta, "/name", &jbv);
  RCGO(rc, finish);

  const char *new_name = jbl_get_str(jbv);

  rc = iwkv_put(db->metadb, &key, &val, IWKV_SYNC);
  RCGO(rc, finish);

  // Re-register the collection under its new name.
  // If kh_put fails, the old entry has already been deleted from the map and
  // the new metadata has already been written.
  kh_del(JBCOLLM, db->mcolls, k);
  k2 = kh_put(JBCOLLM, db->mcolls, new_name, &rci);
  if (rci != -1) {
    kh_value(db->mcolls, k2) = jbc;
  } else {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    goto finish;
  }

  // The collection takes ownership of the new metadata object.
  jbc->name = new_name;
  jbl_destroy(&jbc->meta);
  jbc->meta = nmeta;

finish:
  if (jbv) {
    jbl_destroy(&jbv);
  }
  // `nmeta` is destroyed only on failure; on success it is owned by `jbc`.
  if (rc) {
    if (nmeta) {
      jbl_destroy(&nmeta);
    }
  }
  API_UNLOCK(db, rci, rc);
  return rc;
}
1712
ejdb_get_meta(EJDB db,JBL * jblp)1713 iwrc ejdb_get_meta(EJDB db, JBL *jblp) {
1714 int rci;
1715 *jblp = 0;
1716 JBL jbl;
1717 iwrc rc = jbl_create_empty_object(&jbl);
1718 RCRET(rc);
1719 binn *clist = 0;
1720 API_RLOCK(db, rci);
1721 if (!binn_object_set_str(&jbl->bn, "version", ejdb_version_full())) {
1722 rc = JBL_ERROR_CREATION;
1723 goto finish;
1724 }
1725 IWFS_FSM_STATE sfsm;
1726 rc = iwkv_state(db->iwkv, &sfsm);
1727 RCRET(rc);
1728 if ( !binn_object_set_str(&jbl->bn, "file", sfsm.exfile.file.opts.path)
1729 || !binn_object_set_int64(&jbl->bn, "size", sfsm.exfile.fsize)) {
1730 rc = JBL_ERROR_CREATION;
1731 goto finish;
1732 }
1733 clist = binn_list();
1734 if (!clist) {
1735 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1736 goto finish;
1737 }
1738 for (khiter_t k = kh_begin(db->mcolls); k != kh_end(db->mcolls); ++k) {
1739 if (!kh_exist(db->mcolls, k)) {
1740 continue;
1741 }
1742 JBCOLL jbc = kh_val(db->mcolls, k);
1743 rc = _jb_coll_add_meta_lr(jbc, clist);
1744 RCGO(rc, finish);
1745 }
1746 if (!binn_object_set_list(&jbl->bn, "collections", clist)) {
1747 rc = JBL_ERROR_CREATION;
1748 goto finish;
1749 }
1750 binn_free(clist);
1751 clist = 0;
1752
1753 finish:
1754 API_UNLOCK(db, rci, rc);
1755 if (rc) {
1756 if (clist) {
1757 binn_free(clist);
1758 }
1759 jbl_destroy(&jbl);
1760 } else {
1761 *jblp = jbl;
1762 }
1763 return rc;
1764 }
1765
ejdb_online_backup(EJDB db,uint64_t * ts,const char * target_file)1766 iwrc ejdb_online_backup(EJDB db, uint64_t *ts, const char *target_file) {
1767 ENSURE_OPEN(db);
1768 return iwkv_online_backup(db->iwkv, ts, target_file);
1769 }
1770
ejdb_get_iwkv(EJDB db,IWKV * kvp)1771 iwrc ejdb_get_iwkv(EJDB db, IWKV *kvp) {
1772 if (!db || !kvp) {
1773 return IW_ERROR_INVALID_ARGS;
1774 }
1775 *kvp = db->iwkv;
1776 return 0;
1777 }
1778
ejdb_open(const EJDB_OPTS * _opts,EJDB * ejdbp)1779 iwrc ejdb_open(const EJDB_OPTS *_opts, EJDB *ejdbp) {
1780 *ejdbp = 0;
1781 int rci;
1782 iwrc rc = ejdb_init();
1783 RCRET(rc);
1784 if (!_opts || !_opts->kv.path || !ejdbp) {
1785 return IW_ERROR_INVALID_ARGS;
1786 }
1787
1788 EJDB db = calloc(1, sizeof(*db));
1789 if (!db) {
1790 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1791 }
1792
1793 memcpy(&db->opts, _opts, sizeof(db->opts));
1794 if (!db->opts.sort_buffer_sz) {
1795 db->opts.sort_buffer_sz = 16 * 1024 * 1024; // 16Mb
1796 }
1797 if (db->opts.sort_buffer_sz < 1024 * 1024) { // Min 1Mb
1798 db->opts.sort_buffer_sz = 1024 * 1024;
1799 }
1800 if (!db->opts.document_buffer_sz) { // 64Kb
1801 db->opts.document_buffer_sz = 64 * 1024;
1802 }
1803 if (db->opts.document_buffer_sz < 16 * 1024) { // Min 16Kb
1804 db->opts.document_buffer_sz = 16 * 1024;
1805 }
1806 EJDB_HTTP *http = &db->opts.http;
1807 if (http->bind) {
1808 http->bind = strdup(http->bind);
1809 }
1810 if (http->access_token) {
1811 http->access_token = strdup(http->access_token);
1812 if (!http->access_token) {
1813 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1814 }
1815 http->access_token_len = strlen(http->access_token);
1816 }
1817
1818 pthread_rwlockattr_t attr;
1819 pthread_rwlockattr_init(&attr);
1820 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
1821 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
1822 #endif
1823 rci = pthread_rwlock_init(&db->rwl, &attr);
1824 if (rci) {
1825 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1826 free(db);
1827 return rc;
1828 }
1829 db->mcolls = kh_init(JBCOLLM);
1830 if (!db->mcolls) {
1831 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1832 goto finish;
1833 }
1834
1835 IWKV_OPTS kvopts;
1836 memcpy(&kvopts, &db->opts.kv, sizeof(db->opts.kv));
1837 kvopts.wal.enabled = !db->opts.no_wal;
1838 kvopts.wal.wal_lock_interceptor = _jb_wal_lock_interceptor;
1839 kvopts.wal.wal_lock_interceptor_opaque = db;
1840
1841 rc = iwkv_open(&kvopts, &db->iwkv);
1842 RCGO(rc, finish);
1843
1844 db->oflags = kvopts.oflags;
1845 rc = _jb_db_meta_load(db);
1846 RCGO(rc, finish);
1847
1848 if (db->opts.http.enabled) {
1849 // Maximum WS/HTTP API body size. Default: 64Mb, Min: 512K
1850 if (!db->opts.http.max_body_size) {
1851 db->opts.http.max_body_size = 64 * 1024 * 1024;
1852 } else if (db->opts.http.max_body_size < 512 * 1024) {
1853 db->opts.http.max_body_size = 512 * 1024;
1854 }
1855 }
1856
1857 #ifdef JB_HTTP
1858 if (db->opts.http.enabled && !db->opts.http.blocking) {
1859 rc = jbr_start(db, &db->opts, &db->jbr);
1860 RCGO(rc, finish);
1861 }
1862 #endif
1863
1864 finish:
1865 if (rc) {
1866 _jb_db_release(&db);
1867 } else {
1868 db->open = true;
1869 *ejdbp = db;
1870 #ifdef JB_HTTP
1871 if (db->opts.http.enabled && db->opts.http.blocking) {
1872 rc = jbr_start(db, &db->opts, &db->jbr);
1873 }
1874 #endif
1875 }
1876 return rc;
1877 }
1878
ejdb_close(EJDB * ejdbp)1879 iwrc ejdb_close(EJDB *ejdbp) {
1880 if (!ejdbp || !*ejdbp) {
1881 return IW_ERROR_INVALID_ARGS;
1882 }
1883 EJDB db = *ejdbp;
1884 if (!__sync_bool_compare_and_swap(&db->open, 1, 0)) {
1885 iwlog_error2("Database is closed already");
1886 return IW_ERROR_INVALID_STATE;
1887 }
1888 iwrc rc = _jb_db_release(ejdbp);
1889 return rc;
1890 }
1891
ejdb_git_revision(void)1892 const char *ejdb_git_revision(void) {
1893 return EJDB2_GIT_REVISION;
1894 }
1895
ejdb_version_full(void)1896 const char *ejdb_version_full(void) {
1897 return EJDB2_VERSION;
1898 }
1899
ejdb_version_major(void)1900 unsigned int ejdb_version_major(void) {
1901 return EJDB2_VERSION_MAJOR;
1902 }
1903
ejdb_version_minor(void)1904 unsigned int ejdb_version_minor(void) {
1905 return EJDB2_VERSION_MINOR;
1906 }
1907
ejdb_version_patch(void)1908 unsigned int ejdb_version_patch(void) {
1909 return EJDB2_VERSION_PATCH;
1910 }
1911
// Error code to message translator registered by ejdb_init().
// Returns a static description for EJDB error codes lying strictly between
// _EJDB_ERROR_START and _EJDB_ERROR_END, and 0 for any other code.
// `locale` is not used.
static const char *_ejdb_ecodefn(locale_t locale, uint32_t ecode) {
  if ((ecode <= _EJDB_ERROR_START) || (ecode >= _EJDB_ERROR_END)) {
    return 0;
  }
  const char *msg = 0;
  switch (ecode) {
    case EJDB_ERROR_INVALID_COLLECTION_META:
      msg = "Invalid collection metadata (EJDB_ERROR_INVALID_COLLECTION_META)";
      break;
    case EJDB_ERROR_INVALID_COLLECTION_INDEX_META:
      msg = "Invalid collection index metadata (EJDB_ERROR_INVALID_COLLECTION_INDEX_META)";
      break;
    case EJDB_ERROR_INVALID_INDEX_MODE:
      msg = "Invalid index mode specified (EJDB_ERROR_INVALID_INDEX_MODE)";
      break;
    case EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE:
      msg = "Index exists but mismatched uniqueness constraint (EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE)";
      break;
    case EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED:
      msg = "Unique index constraint violated (EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED)";
      break;
    case EJDB_ERROR_INVALID_COLLECTION_NAME:
      msg = "Invalid collection name (EJDB_ERROR_INVALID_COLLECTION_NAME)";
      break;
    case EJDB_ERROR_COLLECTION_NOT_FOUND:
      msg = "Collection not found (EJDB_ERROR_COLLECTION_NOT_FOUND)";
      break;
    case EJDB_ERROR_TARGET_COLLECTION_EXISTS:
      msg = "Target collection exists (EJDB_ERROR_TARGET_COLLECTION_EXISTS)";
      break;
    case EJDB_ERROR_PATCH_JSON_NOT_OBJECT:
      msg = "Patch JSON must be an object (map) (EJDB_ERROR_PATCH_JSON_NOT_OBJECT)";
      break;
    default:
      break;
  }
  return msg;
}
1938
ejdb_init()1939 iwrc ejdb_init() {
1940 static volatile int jb_initialized = 0;
1941 if (!__sync_bool_compare_and_swap(&jb_initialized, 0, 1)) {
1942 return 0; // initialized already
1943 }
1944 iwrc rc = iw_init();
1945 RCRET(rc);
1946 rc = jbl_init();
1947 RCRET(rc);
1948 rc = jql_init();
1949 RCRET(rc);
1950 #ifdef JB_HTTP
1951 rc = jbr_init();
1952 RCRET(rc);
1953 #endif
1954 return iwlog_register_ecodefn(_ejdb_ecodefn);
1955 }
1956