1 #include "ejdb2_internal.h"
2 #include <iowow/murmur3.h>
3
// Forward declaration: the definition is not part of this section of the file.
// Called by `_jb_exec_upsert_lw()` to store a freshly built document and
// receive its identifier through `id`.
static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id);

// Zero-initialized value stored as the payload of compound-key index records
// (see the `iwkv_put()` calls in `_jb_idx_record_add()`).
static const IWKV_val EMPTY_VAL = { 0 };
7
_jb_meta_nrecs_removedb(EJDB db,uint32_t dbid)8 IW_INLINE iwrc _jb_meta_nrecs_removedb(EJDB db, uint32_t dbid) {
9 dbid = IW_HTOIL(dbid);
10 IWKV_val key = {
11 .size = sizeof(dbid),
12 .data = &dbid
13 };
14 return iwkv_del(db->nrecdb, &key, 0);
15 }
16
_jb_meta_nrecs_update(EJDB db,uint32_t dbid,int64_t delta)17 IW_INLINE iwrc _jb_meta_nrecs_update(EJDB db, uint32_t dbid, int64_t delta) {
18 delta = IW_HTOILL(delta);
19 dbid = IW_HTOIL(dbid);
20 IWKV_val val = {
21 .size = sizeof(delta),
22 .data = &delta
23 };
24 IWKV_val key = {
25 .size = sizeof(dbid),
26 .data = &dbid
27 };
28 return iwkv_put(db->nrecdb, &key, &val, IWKV_VAL_INCREMENT);
29 }
30
/**
 * Reads the counter stored under `dbid` in `db->nrecdb`.
 *
 * The return code of `iwkv_get_copy()` is intentionally not inspected: when
 * the number of bytes reported back differs from the counter size, the local
 * buffer (initialized to zero) is returned without byte-order conversion.
 *
 * @param db   Database handle owning `nrecdb`.
 * @param dbid Identifier of the database whose counter is requested.
 * @return Counter value as a signed 64-bit integer.
 */
static int64_t _jb_meta_nrecs_get(EJDB db, uint32_t dbid) {
  uint64_t count = 0;
  size_t copied = 0;
  uint32_t kdata = IW_HTOIL(dbid);
  IWKV_val key = { .data = &kdata, .size = sizeof(kdata) };

  iwkv_get_copy(db->nrecdb, &key, &count, sizeof(count), &copied);
  if (copied != sizeof(count)) {
    return (int64_t) count;
  }
  count = IW_ITOHLL(count);
  return (int64_t) count;
}
45
/**
 * Frees an index descriptor together with its JSON pointer.
 * The index database handle (`idx->idb`) is not touched here.
 */
static void _jb_idx_release(JBIDX idx) {
  JBL_PTR path = idx->ptr;
  free(path);
  free(idx);
}
50
/**
 * Releases an in-memory collection descriptor: its meta document (if any),
 * every index descriptor in its list, its rwlock and finally the descriptor
 * itself.
 */
static void _jb_coll_release(JBCOLL jbc) {
  if (jbc->meta) {
    jbl_destroy(&jbc->meta);
  }
  JBIDX cur = jbc->idx;
  while (cur) {
    // Remember the successor first: releasing frees the current node.
    JBIDX following = cur->next;
    _jb_idx_release(cur);
    cur = following;
  }
  jbc->idx = 0;
  pthread_rwlock_destroy(&jbc->rwl);
  free(jbc);
}
64
_jb_coll_load_index_lr(JBCOLL jbc,IWKV_val * mval)65 static iwrc _jb_coll_load_index_lr(JBCOLL jbc, IWKV_val *mval) {
66 iwrc rc;
67 binn *bn;
68 char *ptr;
69 struct _JBL imeta;
70 JBIDX idx = calloc(1, sizeof(*idx));
71 if (!idx) {
72 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
73 }
74
75 RCC(rc, finish, jbl_from_buf_keep_onstack(&imeta, mval->data, mval->size));
76 bn = &imeta.bn;
77
78 if ( !binn_object_get_str(bn, "ptr", &ptr)
79 || !binn_object_get_uint8(bn, "mode", &idx->mode)
80 || !binn_object_get_uint8(bn, "idbf", &idx->idbf)
81 || !binn_object_get_uint32(bn, "dbid", &idx->dbid)) {
82 rc = EJDB_ERROR_INVALID_COLLECTION_INDEX_META;
83 goto finish;
84 }
85
86 RCC(rc, finish, jbl_ptr_alloc(ptr, &idx->ptr));
87 RCC(rc, finish, iwkv_db(jbc->db->iwkv, idx->dbid, idx->idbf, &idx->idb));
88
89 idx->jbc = jbc;
90 idx->rnum = _jb_meta_nrecs_get(jbc->db, idx->dbid);
91 idx->next = jbc->idx;
92 jbc->idx = idx;
93
94 finish:
95 if (rc) {
96 _jb_idx_release(idx);
97 }
98 return rc;
99 }
100
_jb_coll_load_indexes_lr(JBCOLL jbc)101 static iwrc _jb_coll_load_indexes_lr(JBCOLL jbc) {
102 iwrc rc = 0;
103 IWKV_cursor cur;
104 IWKV_val kval;
105 char buf[sizeof(KEY_PREFIX_IDXMETA) + IWNUMBUF_SIZE];
106 // Full key format: i.<coldbid>.<idxdbid>
107 int sz = snprintf(buf, sizeof(buf), KEY_PREFIX_IDXMETA "%u.", jbc->dbid);
108 if (sz >= sizeof(buf)) {
109 return IW_ERROR_OVERFLOW;
110 }
111 kval.data = buf;
112 kval.size = sz;
113 rc = iwkv_cursor_open(jbc->db->metadb, &cur, IWKV_CURSOR_GE, &kval);
114 if (rc == IWKV_ERROR_NOTFOUND) {
115 rc = 0;
116 goto finish;
117 }
118 RCRET(rc);
119
120 do {
121 IWKV_val key, val;
122 RCC(rc, finish, iwkv_cursor_key(cur, &key));
123 if ((key.size > sz) && !strncmp(buf, key.data, sz)) {
124 iwkv_val_dispose(&key);
125 RCC(rc, finish, iwkv_cursor_val(cur, &val));
126 rc = _jb_coll_load_index_lr(jbc, &val);
127 iwkv_val_dispose(&val);
128 RCBREAK(rc);
129 } else {
130 iwkv_val_dispose(&key);
131 }
132 } while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_PREV)));
133 if (rc == IWKV_ERROR_NOTFOUND) {
134 rc = 0;
135 }
136
137 finish:
138 iwkv_cursor_close(&cur);
139 return rc;
140 }
141
_jb_coll_load_meta_lr(JBCOLL jbc)142 static iwrc _jb_coll_load_meta_lr(JBCOLL jbc) {
143 JBL jbv;
144 IWKV_cursor cur;
145 JBL jbm = jbc->meta;
146 iwrc rc = jbl_at(jbm, "/name", &jbv);
147 RCRET(rc);
148 jbc->name = jbl_get_str(jbv);
149 jbl_destroy(&jbv);
150 if (!jbc->name) {
151 return EJDB_ERROR_INVALID_COLLECTION_META;
152 }
153 rc = jbl_at(jbm, "/id", &jbv);
154 RCRET(rc);
155 jbc->dbid = (uint32_t) jbl_get_i64(jbv);
156 jbl_destroy(&jbv);
157 if (!jbc->dbid) {
158 return EJDB_ERROR_INVALID_COLLECTION_META;
159 }
160 rc = iwkv_db(jbc->db->iwkv, jbc->dbid, IWDB_VNUM64_KEYS, &jbc->cdb);
161 RCRET(rc);
162
163 jbc->rnum = _jb_meta_nrecs_get(jbc->db, jbc->dbid);
164
165 rc = _jb_coll_load_indexes_lr(jbc);
166 RCRET(rc);
167
168 rc = iwkv_cursor_open(jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
169 RCRET(rc);
170 rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
171 if (rc) {
172 if (rc == IWKV_ERROR_NOTFOUND) {
173 rc = 0;
174 }
175 } else {
176 size_t sz;
177 RCC(rc, finish, iwkv_cursor_copy_key(cur, &jbc->id_seq, sizeof(jbc->id_seq), &sz, 0));
178 }
179
180 finish:
181 iwkv_cursor_close(&cur);
182 return rc;
183 }
184
_jb_coll_init(JBCOLL jbc,IWKV_val * meta)185 static iwrc _jb_coll_init(JBCOLL jbc, IWKV_val *meta) {
186 iwrc rc = 0;
187 pthread_rwlockattr_t attr;
188 pthread_rwlockattr_init(&attr);
189 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
190 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
191 #endif
192 pthread_rwlock_init(&jbc->rwl, &attr);
193 if (meta) {
194 rc = jbl_from_buf_keep(&jbc->meta, meta->data, meta->size, false);
195 RCRET(rc);
196 }
197 if (!jbc->meta) {
198 iwlog_error("Collection %s seems to be initialized", jbc->name);
199 return IW_ERROR_INVALID_STATE;
200 }
201 rc = _jb_coll_load_meta_lr(jbc);
202 RCRET(rc);
203
204 rc = iwhmap_put(jbc->db->mcolls, (void*) jbc->name, jbc);
205 RCRET(rc);
206
207 return rc;
208 }
209
_jb_idx_add_meta_lr(JBIDX idx,binn * list)210 static iwrc _jb_idx_add_meta_lr(JBIDX idx, binn *list) {
211 iwrc rc = 0;
212 IWXSTR *xstr = iwxstr_new();
213 if (!xstr) {
214 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
215 }
216 binn *meta = binn_object();
217 if (!meta) {
218 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
219 iwxstr_destroy(xstr);
220 return rc;
221 }
222 RCC(rc, finish, jbl_ptr_serialize(idx->ptr, xstr));
223
224 if ( !binn_object_set_str(meta, "ptr", iwxstr_ptr(xstr))
225 || !binn_object_set_uint32(meta, "mode", idx->mode)
226 || !binn_object_set_uint32(meta, "idbf", idx->idbf)
227 || !binn_object_set_uint32(meta, "dbid", idx->dbid)
228 || !binn_object_set_int64(meta, "rnum", idx->rnum)) {
229 rc = JBL_ERROR_CREATION;
230 }
231
232 if (!binn_list_add_object(list, meta)) {
233 rc = JBL_ERROR_CREATION;
234 }
235
236 finish:
237 iwxstr_destroy(xstr);
238 binn_free(meta);
239 return rc;
240 }
241
_jb_coll_add_meta_lr(JBCOLL jbc,binn * list)242 static iwrc _jb_coll_add_meta_lr(JBCOLL jbc, binn *list) {
243 iwrc rc = 0;
244 binn *ilist = 0;
245 binn *meta = binn_object();
246 if (!meta) {
247 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
248 return rc;
249 }
250 if ( !binn_object_set_str(meta, "name", jbc->name)
251 || !binn_object_set_uint32(meta, "dbid", jbc->dbid)
252 || !binn_object_set_int64(meta, "rnum", jbc->rnum)) {
253 rc = JBL_ERROR_CREATION;
254 goto finish;
255 }
256 ilist = binn_list();
257 if (!ilist) {
258 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
259 goto finish;
260 }
261 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
262 RCC(rc, finish, _jb_idx_add_meta_lr(idx, ilist));
263 }
264 if (!binn_object_set_list(meta, "indexes", ilist)) {
265 rc = JBL_ERROR_CREATION;
266 goto finish;
267 }
268 if (!binn_list_add_value(list, meta)) {
269 rc = JBL_ERROR_CREATION;
270 goto finish;
271 }
272
273 finish:
274 binn_free(meta);
275 if (ilist) {
276 binn_free(ilist);
277 }
278 return rc;
279 }
280
_jb_db_meta_load(EJDB db)281 static iwrc _jb_db_meta_load(EJDB db) {
282 iwrc rc = 0;
283 if (!db->metadb) {
284 rc = iwkv_db(db->iwkv, METADB_ID, 0, &db->metadb);
285 RCRET(rc);
286 }
287 if (!db->nrecdb) {
288 rc = iwkv_db(db->iwkv, NUMRECSDB_ID, IWDB_VNUM64_KEYS, &db->nrecdb);
289 RCRET(rc);
290 }
291
292 IWKV_cursor cur;
293 rc = iwkv_cursor_open(db->metadb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
294 RCRET(rc);
295 while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT))) {
296 IWKV_val key, val;
297 RCC(rc, finish, iwkv_cursor_get(cur, &key, &val));
298 if (!strncmp(key.data, KEY_PREFIX_COLLMETA, sizeof(KEY_PREFIX_COLLMETA) - 1)) {
299 JBCOLL jbc = calloc(1, sizeof(*jbc));
300 if (!jbc) {
301 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
302 iwkv_val_dispose(&val);
303 goto finish;
304 }
305 jbc->db = db;
306 rc = _jb_coll_init(jbc, &val);
307 if (rc) {
308 _jb_coll_release(jbc);
309 iwkv_val_dispose(&key);
310 goto finish;
311 }
312 } else {
313 iwkv_val_dispose(&val);
314 }
315 iwkv_val_dispose(&key);
316 }
317 if (rc == IWKV_ERROR_NOTFOUND) {
318 rc = 0;
319 }
320
321 finish:
322 iwkv_cursor_close(&cur);
323 return rc;
324 }
325
_jb_db_release(EJDB * dbp)326 static iwrc _jb_db_release(EJDB *dbp) {
327 iwrc rc = 0;
328 EJDB db = *dbp;
329 *dbp = 0;
330 #ifdef JB_HTTP
331 jbr_shutdown_wait(db->jbr);
332 #endif
333 if (db->mcolls) {
334 iwhmap_destroy(db->mcolls);
335 db->mcolls = 0;
336 }
337 if (db->iwkv) {
338 IWRC(iwkv_close(&db->iwkv), rc);
339 }
340 pthread_rwlock_destroy(&db->rwl);
341
342 EJDB_HTTP *http = &db->opts.http;
343 if (http->bind) {
344 free((void*) http->bind);
345 }
346 if (http->access_token) {
347 free((void*) http->access_token);
348 }
349 free(db);
350 return rc;
351 }
352
_jb_coll_acquire_keeplock2(EJDB db,const char * coll,jb_coll_acquire_t acm,JBCOLL * jbcp)353 static iwrc _jb_coll_acquire_keeplock2(EJDB db, const char *coll, jb_coll_acquire_t acm, JBCOLL *jbcp) {
354 if (!coll || *coll == '\0' || strlen(coll) > EJDB_COLLECTION_NAME_MAX_LEN) {
355 return EJDB_ERROR_INVALID_COLLECTION_NAME;
356 }
357 int rci;
358 iwrc rc = 0;
359 *jbcp = 0;
360 JBCOLL jbc = 0;
361 bool wl = acm & JB_COLL_ACQUIRE_WRITE;
362 API_RLOCK(db, rci);
363
364 jbc = iwhmap_get(db->mcolls, coll);
365 if (jbc) {
366 wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl);
367 *jbcp = jbc;
368 } else {
369 pthread_rwlock_unlock(&db->rwl); // relock
370 if ((db->oflags & IWKV_RDONLY) || (acm & JB_COLL_ACQUIRE_EXISTING)) {
371 return IW_ERROR_NOT_EXISTS;
372 }
373 API_WLOCK(db, rci);
374 jbc = iwhmap_get(db->mcolls, coll);
375 if (jbc) {
376 pthread_rwlock_rdlock(&jbc->rwl);
377 *jbcp = jbc;
378 } else {
379 JBL meta = 0;
380 IWDB cdb = 0;
381 uint32_t dbid = 0;
382 char keybuf[IWNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];
383 IWKV_val key, val;
384
385 RCC(rc, create_finish, iwkv_new_db(db->iwkv, IWDB_VNUM64_KEYS, &dbid, &cdb));
386 jbc = calloc(1, sizeof(*jbc));
387 if (!jbc) {
388 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
389 goto create_finish;
390 }
391 RCC(rc, create_finish, jbl_create_empty_object(&meta));
392 if (!binn_object_set_str(&meta->bn, "name", coll)) {
393 rc = JBL_ERROR_CREATION;
394 goto create_finish;
395 }
396 if (!binn_object_set_uint32(&meta->bn, "id", dbid)) {
397 rc = JBL_ERROR_CREATION;
398 goto create_finish;
399 }
400 RCC(rc, create_finish, jbl_as_buf(meta, &val.data, &val.size));
401
402 key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", dbid);
403 if (key.size >= sizeof(keybuf)) {
404 rc = IW_ERROR_OVERFLOW;
405 goto create_finish;
406 }
407 key.data = keybuf;
408 RCC(rc, create_finish, iwkv_put(db->metadb, &key, &val, IWKV_SYNC));
409
410 jbc->db = db;
411 jbc->meta = meta;
412 rc = _jb_coll_init(jbc, 0);
413 if (rc) {
414 iwkv_del(db->metadb, &key, IWKV_SYNC);
415 goto create_finish;
416 }
417
418 create_finish:
419 if (rc) {
420 if (meta) {
421 jbl_destroy(&meta);
422 }
423 if (cdb) {
424 iwkv_db_destroy(&cdb);
425 }
426 if (jbc) {
427 jbc->meta = 0; // meta was cleared
428 _jb_coll_release(jbc);
429 }
430 } else {
431 rci = wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl); // -V522
432 if (rci) {
433 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
434 goto finish;
435 }
436 *jbcp = jbc;
437 }
438 }
439 }
440
441 finish:
442 if (rc) {
443 pthread_rwlock_unlock(&db->rwl);
444 }
445 return rc;
446 }
447
_jb_coll_acquire_keeplock(EJDB db,const char * coll,bool wl,JBCOLL * jbcp)448 IW_INLINE iwrc _jb_coll_acquire_keeplock(EJDB db, const char *coll, bool wl, JBCOLL *jbcp) {
449 return _jb_coll_acquire_keeplock2(db, coll, wl ? JB_COLL_ACQUIRE_WRITE : 0, jbcp);
450 }
451
/**
 * Brings index `idx` in line with a document change.
 *
 * The value addressed by `idx->ptr` is looked up in the previous document
 * (`jblprev`) and in the new document (`jbl`); either may be NULL. Index
 * records derived from the previous value are deleted and records derived
 * from the new value are inserted. Calling with `jbl == 0` therefore only
 * removes records (see `_jb_idx_record_remove()`), and calling with
 * `jblprev == 0` only adds them.
 *
 * The net number of inserted/deleted index records is accumulated in `delta`
 * and, when non-zero, applied to the stored record counter and `idx->rnum`
 * before returning (also on error paths that go through `finish`).
 *
 * @param idx     Index to update.
 * @param id      Document identifier.
 * @param jbl     New document or NULL.
 * @param jblprev Previous document or NULL.
 * @return `0` on success, `EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED` when
 *         a non-compound (unique) index already holds the key, or another
 *         error code.
 */
static iwrc _jb_idx_record_add(JBIDX idx, int64_t id, JBL jbl, JBL jblprev) {
  IWKV_val key;
  uint8_t step;
  char vnbuf[IW_VNUMBUFSZ];
  char numbuf[IWNUMBUF_SIZE];

  bool jbv_found, jbvprev_found;
  struct _JBL jbv = { 0 }, jbvprev = { 0 };
  jbl_type_t jbv_type, jbvprev_type;

  iwrc rc = 0;
  IWPOOL *pool = 0;
  int64_t delta = 0; // delta of added/removed index records
  bool compound = idx->idbf & IWDB_COMPOUND_KEYS;

  // Resolve the indexed value in both document versions.
  jbvprev_found = jblprev ? _jbl_at(jblprev, idx->ptr, &jbvprev) : false;
  jbv_found = jbl ? _jbl_at(jbl, idx->ptr, &jbv) : false;

  jbv_type = jbl_type(&jbv);
  jbvprev_type = jbl_type(&jbvprev);

  // Do not index NULLs, OBJECTs, ARRAYs (in `EJDB_IDX_UNIQUE` mode)
  if (  ((jbvprev_type == JBV_OBJECT) || (jbvprev_type <= JBV_NULL))
     || ((jbvprev_type == JBV_ARRAY) && !compound)) {
    jbvprev_found = false;
  }
  if (  ((jbv_type == JBV_OBJECT) || (jbv_type <= JBV_NULL))
     || ((jbv_type == JBV_ARRAY) && !compound)) {
    jbv_found = false;
  }

  // Short-circuit when the indexed value did not change.
  if (  compound
     && (jbv_type == jbvprev_type)
     && (jbvprev_type == JBV_ARRAY)) { // compare next/prev obj arrays
    pool = iwpool_create(1024);
    if (!pool) {
      rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
      goto finish;
    }
    JBL_NODE jbvprev_node, jbv_node;
    RCC(rc, finish, jbl_to_node(&jbv, &jbv_node, false, pool));
    jbv.node = jbv_node;

    RCC(rc, finish, jbl_to_node(&jbvprev, &jbvprev_node, false, pool));
    jbvprev.node = jbvprev_node;

    if (_jbl_compare_nodes(jbv_node, jbvprev_node, &rc) == 0) {
      goto finish; // Arrays are equal or error
    }
  } else if (_jbl_is_eq_atomic_values(&jbv, &jbvprev)) {
    // No pool has been created on this path, so a direct return leaks nothing.
    return 0;
  }

  if (jbvprev_found) {               // Remove old index elements
    if (jbvprev_type == JBV_ARRAY) { // TODO: array modification delta?
      JBL_NODE n;
      if (!pool) {
        pool = iwpool_create(1024);
        if (!pool) {
          RCC(rc, finish, iwrc_set_errno(IW_ERROR_ALLOC, errno));
        }
      }
      RCC(rc, finish, jbl_to_node(&jbvprev, &n, false, pool));
      // One index record per array element.
      for (n = n->child; n; n = n->next) {
        jbi_node_fill_ikey(idx, n, &key, numbuf);
        if (key.size) {
          key.compound = id;
          rc = iwkv_del(idx->idb, &key, 0);
          if (!rc) {
            --delta;
          } else if (rc == IWKV_ERROR_NOTFOUND) {
            // A missing record is not an error when removing.
            rc = 0;
          }
          RCGO(rc, finish);
        }
      }
    } else {
      jbi_jbl_fill_ikey(idx, &jbvprev, &key, numbuf);
      if (key.size) {
        key.compound = id;
        rc = iwkv_del(idx->idb, &key, 0);
        if (!rc) {
          --delta;
        } else if (rc == IWKV_ERROR_NOTFOUND) {
          rc = 0;
        }
        RCGO(rc, finish);
      }
    }
  }

  if (jbv_found) {                // Add index record
    if (jbv_type == JBV_ARRAY) {  // TODO: array modification delta?
      JBL_NODE n;
      if (!pool) {
        pool = iwpool_create(1024);
        if (!pool) {
          RCC(rc, finish, iwrc_set_errno(IW_ERROR_ALLOC, errno));
        }
      }
      RCC(rc, finish, jbl_to_node(&jbv, &n, false, pool));
      for (n = n->child; n; n = n->next) {
        jbi_node_fill_ikey(idx, n, &key, numbuf);
        if (key.size) {
          key.compound = id;
          rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
          if (!rc) {
            ++delta;
          } else if (rc == IWKV_ERROR_KEY_EXISTS) {
            // Record already present: nothing to count.
            rc = 0;
          } else {
            goto finish;
          }
        }
      }
    } else {
      jbi_jbl_fill_ikey(idx, &jbv, &key, numbuf);
      if (key.size) {
        if (compound) {
          // Compound key: the document id is part of the key, the value is empty.
          key.compound = id;
          rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
          if (!rc) {
            ++delta;
          } else if (rc == IWKV_ERROR_KEY_EXISTS) {
            rc = 0;
          }
        } else {
          // Non-compound key: the document id is stored as the record value.
          IW_SETVNUMBUF64(step, vnbuf, id);
          IWKV_val idval = {
            .data = vnbuf,
            .size = step
          };
          rc = iwkv_put(idx->idb, &key, &idval, IWKV_NO_OVERWRITE);
          if (!rc) {
            ++delta;
          } else if (rc == IWKV_ERROR_KEY_EXISTS) {
            rc = EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED;
            goto finish;
          }
        }
      }
    }
  }

finish:
  if (pool) {
    iwpool_destroy(pool);
  }
  // Keep the in-memory counter in step with the stored one: it is only
  // adjusted when the stored counter update succeeded.
  if (delta && !_jb_meta_nrecs_update(idx->jbc->db, idx->dbid, delta)) {
    idx->rnum += delta;
  }
  return rc;
}
605
_jb_idx_record_remove(JBIDX idx,int64_t id,JBL jbl)606 IW_INLINE iwrc _jb_idx_record_remove(JBIDX idx, int64_t id, JBL jbl) {
607 return _jb_idx_record_add(idx, id, 0, jbl);
608 }
609
_jb_idx_fill(JBIDX idx)610 static iwrc _jb_idx_fill(JBIDX idx) {
611 IWKV_cursor cur;
612 IWKV_val key, val;
613 struct _JBL jbs;
614 int64_t llv;
615 JBL jbl = &jbs;
616
617 iwrc rc = iwkv_cursor_open(idx->jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
618 while (!rc) {
619 rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
620 if (rc == IWKV_ERROR_NOTFOUND) {
621 rc = 0;
622 break;
623 }
624 rc = iwkv_cursor_get(cur, &key, &val);
625 RCBREAK(rc);
626 if (!binn_load(val.data, &jbs.bn)) {
627 rc = JBL_ERROR_CREATION;
628 break;
629 }
630 memcpy(&llv, key.data, sizeof(llv));
631 rc = _jb_idx_record_add(idx, llv, jbl, 0);
632 iwkv_kv_dispose(&key, &val);
633 }
634 IWRC(iwkv_cursor_close(&cur), rc);
635 return rc;
636 }
637
638 // Used to avoid deadlocks within a `iwkv_put` context
_jb_put_handler_after(iwrc rc,struct _JBPHCTX * ctx)639 static iwrc _jb_put_handler_after(iwrc rc, struct _JBPHCTX *ctx) {
640 IWKV_val *oldval = &ctx->oldval;
641 if (rc) {
642 if (oldval->size) {
643 iwkv_val_dispose(oldval);
644 }
645 return rc;
646 }
647 JBL prev;
648 struct _JBL jblprev;
649 JBCOLL jbc = ctx->jbc;
650 if (oldval->size) {
651 rc = jbl_from_buf_keep_onstack(&jblprev, oldval->data, oldval->size);
652 RCRET(rc);
653 prev = &jblprev;
654 } else {
655 prev = 0;
656 }
657 JBIDX fail_idx = 0;
658 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
659 rc = _jb_idx_record_add(idx, ctx->id, ctx->jbl, prev);
660 if (rc) {
661 fail_idx = idx;
662 goto finish;
663 }
664 }
665 if (!prev) {
666 _jb_meta_nrecs_update(jbc->db, jbc->dbid, 1);
667 jbc->rnum += 1;
668 }
669
670 finish:
671 if (oldval->size) {
672 iwkv_val_dispose(oldval);
673 }
674 if (rc && !oldval->size) {
675 // Cleanup on error inserting new record
676 IWKV_val key = { .data = &ctx->id, .size = sizeof(ctx->id) };
677 for (JBIDX idx = jbc->idx; idx && idx != fail_idx; idx = idx->next) {
678 IWRC(_jb_idx_record_remove(idx, ctx->id, ctx->jbl), rc);
679 }
680 IWRC(iwkv_del(jbc->cdb, &key, 0), rc);
681 }
682 return rc;
683 }
684
_jb_put_handler(const IWKV_val * key,const IWKV_val * val,IWKV_val * oldval,void * op)685 static iwrc _jb_put_handler(const IWKV_val *key, const IWKV_val *val, IWKV_val *oldval, void *op) {
686 struct _JBPHCTX *ctx = op;
687 if (oldval && oldval->size) {
688 memcpy(&ctx->oldval, oldval, sizeof(*oldval));
689 }
690 return 0;
691 }
692
_jb_exec_scan_init(JBEXEC * ctx)693 static iwrc _jb_exec_scan_init(JBEXEC *ctx) {
694 ctx->istep = 1;
695 ctx->jblbufsz = ctx->jbc->db->opts.document_buffer_sz;
696 ctx->jblbuf = malloc(ctx->jblbufsz);
697 if (!ctx->jblbuf) {
698 ctx->jblbufsz = 0;
699 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
700 }
701 struct JQP_AUX *aux = ctx->ux->q->aux;
702 if (aux->expr->flags & JQP_EXPR_NODE_FLAG_PK) { // Select by primary key
703 ctx->scanner = jbi_pk_scanner;
704 if (ctx->ux->log) {
705 iwxstr_cat2(ctx->ux->log, "[INDEX] PK");
706 }
707 return 0;
708 }
709 iwrc rc = jbi_selection(ctx);
710 RCRET(rc);
711 if (ctx->midx.idx) {
712 if (ctx->midx.idx->idbf & IWDB_COMPOUND_KEYS) {
713 ctx->scanner = jbi_dup_scanner;
714 } else {
715 ctx->scanner = jbi_uniq_scanner;
716 }
717 } else {
718 ctx->scanner = jbi_full_scanner;
719 if (ctx->ux->log) {
720 iwxstr_cat2(ctx->ux->log, "[INDEX] NO");
721 }
722 }
723 return 0;
724 }
725
/**
 * Releases resources held by a query execution context: the joined-nodes
 * cache map, the joined-nodes pool and the document buffer.
 */
static void _jb_exec_scan_release(JBEXEC *ctx) {
  if (ctx->proj_joined_nodes_cache) {
    // Drop the projected nodes cache and forget the handle.
    iwhmap_destroy(ctx->proj_joined_nodes_cache);
    ctx->proj_joined_nodes_cache = 0;
  }
  if (ctx->proj_joined_nodes_pool) {
    iwpool_destroy(ctx->proj_joined_nodes_pool);
  }
  free(ctx->jblbuf);
}
737
_jb_noop_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)738 static iwrc _jb_noop_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
739 return 0;
740 }
741
_jb_put_impl(JBCOLL jbc,JBL jbl,int64_t id)742 IW_INLINE iwrc _jb_put_impl(JBCOLL jbc, JBL jbl, int64_t id) {
743 IWKV_val val, key = {
744 .data = &id,
745 .size = sizeof(id)
746 };
747 struct _JBPHCTX pctx = {
748 .id = id,
749 .jbc = jbc,
750 .jbl = jbl
751 };
752 iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
753 RCRET(rc);
754 return _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx);
755 }
756
jb_put(JBCOLL jbc,JBL jbl,int64_t id)757 iwrc jb_put(JBCOLL jbc, JBL jbl, int64_t id) {
758 return _jb_put_impl(jbc, jbl, id);
759 }
760
jb_cursor_set(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)761 iwrc jb_cursor_set(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
762 IWKV_val val;
763 struct _JBPHCTX pctx = {
764 .id = id,
765 .jbc = jbc,
766 .jbl = jbl
767 };
768 iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
769 RCRET(rc);
770 return _jb_put_handler_after(iwkv_cursor_seth(cur, &val, 0, _jb_put_handler, &pctx), &pctx);
771 }
772
_jb_exec_upsert_lw(JBEXEC * ctx)773 static iwrc _jb_exec_upsert_lw(JBEXEC *ctx) {
774 JBL_NODE n;
775 int64_t id;
776 iwrc rc = 0;
777 JBL jbl = 0;
778 EJDB_EXEC *ux = ctx->ux;
779 JQL q = ux->q;
780 if (q->aux->apply_placeholder) {
781 JQVAL *pv = jql_find_placeholder(q, q->aux->apply_placeholder);
782 if (!pv || (pv->type != JQVAL_JBLNODE) || !pv->vnode) {
783 rc = JQL_ERROR_INVALID_PLACEHOLDER_VALUE_TYPE;
784 goto finish;
785 }
786 n = pv->vnode;
787 } else {
788 n = q->aux->apply;
789 }
790 if (!n) {
791 // Skip silently, nothing to do.
792 goto finish;
793 }
794 RCC(rc, finish, jbl_from_node(&jbl, n));
795 RCC(rc, finish, _jb_put_new_lw(ctx->jbc, jbl, &id));
796
797 if (!(q->aux->qmode & JQP_QRY_AGGREGATE)) {
798 struct _EJDB_DOC doc = {
799 .id = id,
800 .raw = jbl,
801 .node = n
802 };
803 do {
804 ctx->istep = 1;
805 RCC(rc, finish, ux->visitor(ux, &doc, &ctx->istep));
806 } while (ctx->istep == -1);
807 }
808 ++ux->cnt;
809
810 finish:
811 jbl_destroy(&jbl);
812 return rc;
813 }
814
815 //----------------------- Public API
816
ejdb_exec(EJDB_EXEC * ux)817 iwrc ejdb_exec(EJDB_EXEC *ux) {
818 if (!ux || !ux->db || !ux->q) {
819 return IW_ERROR_INVALID_ARGS;
820 }
821 int rci;
822 iwrc rc = 0;
823 if (!ux->visitor) {
824 ux->visitor = _jb_noop_visitor;
825 ux->q->aux->projection = 0; // Actually we don't need projection if exists
826 }
827 if (ux->log) {
828 // set terminating NULL to current pos of log
829 iwxstr_cat(ux->log, 0, 0);
830 }
831 JBEXEC ctx = {
832 .ux = ux
833 };
834 if (ux->limit < 1) {
835 rc = jql_get_limit(ux->q, &ux->limit);
836 RCRET(rc);
837 if (ux->limit < 1) {
838 ux->limit = INT64_MAX;
839 }
840 }
841 if (ux->skip < 1) {
842 rc = jql_get_skip(ux->q, &ux->skip);
843 RCRET(rc);
844 }
845 rc = _jb_coll_acquire_keeplock2(ux->db, ux->q->coll,
846 jql_has_apply(ux->q) ? JB_COLL_ACQUIRE_WRITE : JB_COLL_ACQUIRE_EXISTING,
847 &ctx.jbc);
848 if (rc == IW_ERROR_NOT_EXISTS) {
849 return 0;
850 } else {
851 RCRET(rc);
852 }
853
854 RCC(rc, finish, _jb_exec_scan_init(&ctx));
855 if (ctx.sorting) {
856 if (ux->log) {
857 iwxstr_cat2(ux->log, " [COLLECTOR] SORTER\n");
858 }
859 rc = ctx.scanner(&ctx, jbi_sorter_consumer);
860 } else {
861 if (ux->log) {
862 iwxstr_cat2(ux->log, " [COLLECTOR] PLAIN\n");
863 }
864 rc = ctx.scanner(&ctx, jbi_consumer);
865 }
866 RCGO(rc, finish);
867 if ((ux->cnt == 0) && jql_has_apply_upsert(ux->q)) {
868 // No records found trying to upsert new record
869 rc = _jb_exec_upsert_lw(&ctx);
870 }
871
872 finish:
873 _jb_exec_scan_release(&ctx);
874 API_COLL_UNLOCK(ctx.jbc, rci, rc);
875 jql_reset(ux->q, true, false);
876 return rc;
877 }
878
// Accumulator used by `_jb_exec_list_visitor()`: a doubly linked list of
// copied documents built while a query runs.
struct JB_LIST_VISITOR_CTX {
  EJDB_DOC head; // First collected document, or 0 while the list is empty.
  EJDB_DOC tail; // Last collected document; new documents are appended after it.
};
883
_jb_exec_list_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)884 static iwrc _jb_exec_list_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
885 struct JB_LIST_VISITOR_CTX *lvc = ctx->opaque;
886 IWPOOL *pool = ctx->pool;
887 struct _EJDB_DOC *ndoc = iwpool_alloc(sizeof(*ndoc) + sizeof(*doc->raw) + doc->raw->bn.size, pool);
888 if (!ndoc) {
889 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
890 }
891 ndoc->id = doc->id;
892 ndoc->raw = (void*) (((uint8_t*) ndoc) + sizeof(*ndoc));
893 ndoc->raw->node = 0;
894 ndoc->node = doc->node;
895 ndoc->next = 0;
896 ndoc->prev = 0;
897 memcpy(&ndoc->raw->bn, &doc->raw->bn, sizeof(ndoc->raw->bn));
898 ndoc->raw->bn.ptr = ((uint8_t*) ndoc) + sizeof(*ndoc) + sizeof(*doc->raw);
899 memcpy(ndoc->raw->bn.ptr, doc->raw->bn.ptr, doc->raw->bn.size);
900
901 if (!lvc->head) {
902 lvc->head = ndoc;
903 lvc->tail = ndoc;
904 } else {
905 lvc->tail->next = ndoc;
906 ndoc->prev = lvc->tail;
907 lvc->tail = ndoc;
908 }
909 return 0;
910 }
911
_jb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWXSTR * log,IWPOOL * pool)912 static iwrc _jb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWXSTR *log, IWPOOL *pool) {
913 if (!db || !q || !first || !pool) {
914 return IW_ERROR_INVALID_ARGS;
915 }
916 iwrc rc = 0;
917 struct JB_LIST_VISITOR_CTX lvc = { 0 };
918 struct _EJDB_EXEC ux = {
919 .db = db,
920 .q = q,
921 .visitor = _jb_exec_list_visitor,
922 .pool = pool,
923 .limit = limit,
924 .log = log,
925 .opaque = &lvc
926 };
927 rc = ejdb_exec(&ux);
928 if (rc) {
929 *first = 0;
930 } else {
931 *first = lvc.head;
932 }
933 return rc;
934 }
935
_jb_count(EJDB db,JQL q,int64_t * count,int64_t limit,IWXSTR * log)936 static iwrc _jb_count(EJDB db, JQL q, int64_t *count, int64_t limit, IWXSTR *log) {
937 if (!db || !q || !count) {
938 return IW_ERROR_INVALID_ARGS;
939 }
940 struct _EJDB_EXEC ux = {
941 .db = db,
942 .q = q,
943 .limit = limit,
944 .log = log
945 };
946 iwrc rc = ejdb_exec(&ux);
947 *count = ux.cnt;
948 return rc;
949 }
950
ejdb_count(EJDB db,JQL q,int64_t * count,int64_t limit)951 iwrc ejdb_count(EJDB db, JQL q, int64_t *count, int64_t limit) {
952 return _jb_count(db, q, count, limit, 0);
953 }
954
ejdb_count2(EJDB db,const char * coll,const char * q,int64_t * count,int64_t limit)955 iwrc ejdb_count2(EJDB db, const char *coll, const char *q, int64_t *count, int64_t limit) {
956 JQL jql;
957 iwrc rc = jql_create(&jql, coll, q);
958 RCRET(rc);
959 rc = _jb_count(db, jql, count, limit, 0);
960 jql_destroy(&jql);
961 return rc;
962 }
963
ejdb_update(EJDB db,JQL q)964 iwrc ejdb_update(EJDB db, JQL q) {
965 int64_t count;
966 return ejdb_count(db, q, &count, 0);
967 }
968
ejdb_update2(EJDB db,const char * coll,const char * q)969 iwrc ejdb_update2(EJDB db, const char *coll, const char *q) {
970 int64_t count;
971 return ejdb_count2(db, coll, q, &count, 0);
972 }
973
ejdb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWPOOL * pool)974 iwrc ejdb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWPOOL *pool) {
975 return _jb_list(db, q, first, limit, 0, pool);
976 }
977
ejdb_list3(EJDB db,const char * coll,const char * query,int64_t limit,IWXSTR * log,EJDB_LIST * listp)978 iwrc ejdb_list3(EJDB db, const char *coll, const char *query, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
979 if (!listp) {
980 return IW_ERROR_INVALID_ARGS;
981 }
982 iwrc rc = 0;
983 *listp = 0;
984 IWPOOL *pool = iwpool_create(1024);
985 if (!pool) {
986 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
987 }
988 EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
989 if (!list) {
990 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
991 goto finish;
992 }
993 list->first = 0;
994 list->db = db;
995 list->pool = pool;
996 RCC(rc, finish, jql_create(&list->q, coll, query));
997 rc = _jb_list(db, list->q, &list->first, limit, log, list->pool);
998
999 finish:
1000 if (rc) {
1001 iwpool_destroy(pool);
1002 } else {
1003 *listp = list;
1004 }
1005 return rc;
1006 }
1007
ejdb_list4(EJDB db,JQL q,int64_t limit,IWXSTR * log,EJDB_LIST * listp)1008 iwrc ejdb_list4(EJDB db, JQL q, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
1009 if (!listp) {
1010 return IW_ERROR_INVALID_ARGS;
1011 }
1012 iwrc rc = 0;
1013 *listp = 0;
1014 IWPOOL *pool = iwpool_create(1024);
1015 if (!pool) {
1016 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1017 }
1018 EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
1019 if (!list) {
1020 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1021 goto finish;
1022 }
1023 list->q = 0;
1024 list->first = 0;
1025 list->db = db;
1026 list->pool = pool;
1027 rc = _jb_list(db, q, &list->first, limit, log, list->pool);
1028
1029 finish:
1030 if (rc) {
1031 iwpool_destroy(pool);
1032 } else {
1033 *listp = list;
1034 }
1035 return rc;
1036 }
1037
ejdb_list2(EJDB db,const char * coll,const char * query,int64_t limit,EJDB_LIST * listp)1038 iwrc ejdb_list2(EJDB db, const char *coll, const char *query, int64_t limit, EJDB_LIST *listp) {
1039 return ejdb_list3(db, coll, query, limit, 0, listp);
1040 }
1041
/**
 * Destroys a list returned by `ejdb_list2()`/`ejdb_list3()`/`ejdb_list4()`
 * and resets the caller's handle to `0`.
 *
 * The attached query (if any) is destroyed before the pool, which is
 * destroyed last. NULL `listp` and NULL `*listp` are accepted.
 */
void ejdb_list_destroy(EJDB_LIST *listp) {
  if (!listp) {
    return;
  }
  EJDB_LIST list = *listp;
  if (list) {
    if (list->q) {
      jql_destroy(&list->q);
    }
    if (list->pool) {
      iwpool_destroy(list->pool);
    }
  }
  *listp = 0;
}
1056
ejdb_remove_index(EJDB db,const char * coll,const char * path,ejdb_idx_mode_t mode)1057 iwrc ejdb_remove_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
1058 if (!db || !coll || !path) {
1059 return IW_ERROR_INVALID_ARGS;
1060 }
1061 int rci;
1062 JBCOLL jbc;
1063 IWKV_val key;
1064 JBL_PTR ptr = 0;
1065 char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * IWNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1066
1067 iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
1068 RCRET(rc);
1069
1070 RCC(rc, finish, jbl_ptr_alloc(path, &ptr));
1071
1072 for (JBIDX idx = jbc->idx, prev = 0; idx; idx = idx->next) {
1073 if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
1074 key.data = keybuf;
1075 key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1076 if (key.size >= sizeof(keybuf)) {
1077 rc = IW_ERROR_OVERFLOW;
1078 goto finish;
1079 }
1080 RCC(rc, finish, iwkv_del(db->metadb, &key, 0));
1081 _jb_meta_nrecs_removedb(db, idx->dbid);
1082 if (prev) {
1083 prev->next = idx->next;
1084 } else {
1085 jbc->idx = idx->next;
1086 }
1087 if (idx->idb) {
1088 iwkv_db_destroy(&idx->idb);
1089 }
1090 _jb_idx_release(idx);
1091 break;
1092 }
1093 prev = idx;
1094 }
1095
1096 finish:
1097 free(ptr);
1098 API_COLL_UNLOCK(jbc, rci, rc);
1099 return rc;
1100 }
1101
ejdb_ensure_index(EJDB db,const char * coll,const char * path,ejdb_idx_mode_t mode)1102 iwrc ejdb_ensure_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
1103 if (!db || !coll || !path) {
1104 return IW_ERROR_INVALID_ARGS;
1105 }
1106 int rci;
1107 JBCOLL jbc;
1108 IWKV_val key, val;
1109 char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * IWNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1110
1111 JBIDX idx = 0;
1112 JBL_PTR ptr = 0;
1113 binn *imeta = 0;
1114
1115 switch (mode & (EJDB_IDX_STR | EJDB_IDX_I64 | EJDB_IDX_F64)) {
1116 case EJDB_IDX_STR:
1117 case EJDB_IDX_I64:
1118 case EJDB_IDX_F64:
1119 break;
1120 default:
1121 return EJDB_ERROR_INVALID_INDEX_MODE;
1122 }
1123
1124 iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1125 RCRET(rc);
1126
1127 RCC(rc, finish, jbl_ptr_alloc(path, &ptr));
1128
1129 for (idx = jbc->idx; idx; idx = idx->next) {
1130 if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
1131 if (idx->mode != mode) {
1132 rc = EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE;
1133 idx = 0;
1134 }
1135 goto finish;
1136 }
1137 }
1138
1139 idx = calloc(1, sizeof(*idx));
1140 if (!idx) {
1141 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1142 goto finish;
1143 }
1144 idx->mode = mode;
1145 idx->jbc = jbc;
1146 idx->ptr = ptr;
1147 ptr = 0;
1148 idx->idbf = 0;
1149 if (mode & EJDB_IDX_I64) {
1150 idx->idbf |= IWDB_VNUM64_KEYS;
1151 } else if (mode & EJDB_IDX_F64) {
1152 idx->idbf |= IWDB_REALNUM_KEYS;
1153 }
1154 if (!(mode & EJDB_IDX_UNIQUE)) {
1155 idx->idbf |= IWDB_COMPOUND_KEYS;
1156 }
1157
1158 RCC(rc, finish, iwkv_new_db(db->iwkv, idx->idbf, &idx->dbid, &idx->idb));
1159 RCC(rc, finish, _jb_idx_fill(idx));
1160
1161 // save index meta into metadb
1162 imeta = binn_object();
1163 if (!imeta) {
1164 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1165 goto finish;
1166 }
1167
1168 if ( !binn_object_set_str(imeta, "ptr", path)
1169 || !binn_object_set_uint32(imeta, "mode", idx->mode)
1170 || !binn_object_set_uint32(imeta, "idbf", idx->idbf)
1171 || !binn_object_set_uint32(imeta, "dbid", idx->dbid)) {
1172 rc = JBL_ERROR_CREATION;
1173 goto finish;
1174 }
1175
1176 key.data = keybuf;
1177 // Full key format: i.<coldbid>.<idxdbid>
1178 key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1179 if (key.size >= sizeof(keybuf)) {
1180 rc = IW_ERROR_OVERFLOW;
1181 goto finish;
1182 }
1183 val.data = binn_ptr(imeta);
1184 val.size = binn_size(imeta);
1185 RCC(rc, finish, iwkv_put(db->metadb, &key, &val, 0));
1186
1187 idx->next = jbc->idx;
1188 jbc->idx = idx;
1189
1190 finish:
1191 if (rc) {
1192 if (idx) {
1193 if (idx->idb) {
1194 iwkv_db_destroy(&idx->idb);
1195 idx->idb = 0;
1196 }
1197 _jb_idx_release(idx);
1198 }
1199 }
1200 free(ptr);
1201 binn_free(imeta);
1202 API_COLL_UNLOCK(jbc, rci, rc);
1203 return rc;
1204 }
1205
_jb_patch(EJDB db,const char * coll,int64_t id,bool upsert,const char * patchjson,JBL_NODE patchjbn,JBL patchjbl)1206 static iwrc _jb_patch(
1207 EJDB db, const char *coll, int64_t id, bool upsert,
1208 const char *patchjson, JBL_NODE patchjbn, JBL patchjbl
1209 ) {
1210 int rci;
1211 JBCOLL jbc;
1212 struct _JBL sjbl;
1213 JBL_NODE root, patch;
1214 JBL ujbl = 0;
1215 IWPOOL *pool = 0;
1216 IWKV_val val = { 0 };
1217 IWKV_val key = {
1218 .data = &id,
1219 .size = sizeof(id)
1220 };
1221
1222 iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1223 RCRET(rc);
1224
1225 rc = iwkv_get(jbc->cdb, &key, &val);
1226 if (upsert && (rc == IWKV_ERROR_NOTFOUND)) {
1227 if (patchjson) {
1228 rc = jbl_from_json(&ujbl, patchjson);
1229 } else if (patchjbl) {
1230 ujbl = patchjbl;
1231 } else if (patchjbn) {
1232 rc = jbl_from_node(&ujbl, patchjbn);
1233 } else {
1234 rc = IW_ERROR_INVALID_ARGS;
1235 }
1236 RCGO(rc, finish);
1237 if (jbl_type(ujbl) != JBV_OBJECT) {
1238 rc = EJDB_ERROR_PATCH_JSON_NOT_OBJECT;
1239 goto finish;
1240 }
1241 rc = _jb_put_impl(jbc, ujbl, id);
1242 if (!rc && (jbc->id_seq < id)) {
1243 jbc->id_seq = id;
1244 }
1245 goto finish;
1246 } else {
1247 RCGO(rc, finish);
1248 }
1249
1250 RCC(rc, finish, jbl_from_buf_keep_onstack(&sjbl, val.data, val.size));
1251
1252 pool = iwpool_create_empty();
1253 if (!pool) {
1254 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1255 goto finish;
1256 }
1257
1258 RCC(rc, finish, jbl_to_node(&sjbl, &root, false, pool));
1259
1260 if (patchjson) {
1261 rc = jbn_from_json(patchjson, &patch, pool);
1262 } else if (patchjbl) {
1263 rc = jbl_to_node(patchjbl, &patch, false, pool);
1264 } else if (patchjbn) {
1265 patch = patchjbn;
1266 } else {
1267 rc = IW_ERROR_INVALID_ARGS;
1268 }
1269 RCGO(rc, finish);
1270
1271 RCC(rc, finish, jbn_patch_auto(root, patch, pool));
1272
1273 if (root->type == JBV_OBJECT) {
1274 RCC(rc, finish, jbl_create_empty_object(&ujbl));
1275 } else if (root->type == JBV_ARRAY) {
1276 RCC(rc, finish, jbl_create_empty_array(&ujbl));
1277 } else {
1278 rc = JBL_ERROR_CREATION;
1279 goto finish;
1280 }
1281
1282 RCC(rc, finish, jbl_fill_from_node(ujbl, root));
1283 rc = _jb_put_impl(jbc, ujbl, id);
1284
1285 finish:
1286 API_COLL_UNLOCK(jbc, rci, rc);
1287 if (ujbl != patchjbl) {
1288 jbl_destroy(&ujbl);
1289 }
1290 if (val.data) {
1291 iwkv_val_dispose(&val);
1292 }
1293 iwpool_destroy(pool);
1294 return rc;
1295 }
1296
_jb_wal_lock_interceptor(bool before,void * op)1297 static iwrc _jb_wal_lock_interceptor(bool before, void *op) {
1298 int rci;
1299 iwrc rc = 0;
1300 EJDB db = op;
1301 assert(db);
1302 if (before) {
1303 API_WLOCK2(db, rci);
1304 } else {
1305 API_UNLOCK(db, rci, rc);
1306 }
1307 return rc;
1308 }
1309
ejdb_patch(EJDB db,const char * coll,const char * patchjson,int64_t id)1310 iwrc ejdb_patch(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1311 return _jb_patch(db, coll, id, false, patchjson, 0, 0);
1312 }
1313
ejdb_patch_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1314 iwrc ejdb_patch_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1315 return _jb_patch(db, coll, id, false, 0, patch, 0);
1316 }
1317
ejdb_patch_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1318 iwrc ejdb_patch_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1319 return _jb_patch(db, coll, id, false, 0, 0, patch);
1320 }
1321
ejdb_merge_or_put(EJDB db,const char * coll,const char * patchjson,int64_t id)1322 iwrc ejdb_merge_or_put(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1323 return _jb_patch(db, coll, id, true, patchjson, 0, 0);
1324 }
1325
ejdb_merge_or_put_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1326 iwrc ejdb_merge_or_put_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1327 return _jb_patch(db, coll, id, true, 0, patch, 0);
1328 }
1329
ejdb_merge_or_put_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1330 iwrc ejdb_merge_or_put_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1331 return _jb_patch(db, coll, id, true, 0, 0, patch);
1332 }
1333
ejdb_put(EJDB db,const char * coll,JBL jbl,int64_t id)1334 iwrc ejdb_put(EJDB db, const char *coll, JBL jbl, int64_t id) {
1335 if (!jbl) {
1336 return IW_ERROR_INVALID_ARGS;
1337 }
1338 int rci;
1339 JBCOLL jbc;
1340 iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1341 RCRET(rc);
1342 rc = _jb_put_impl(jbc, jbl, id);
1343 if (!rc && (jbc->id_seq < id)) {
1344 jbc->id_seq = id;
1345 }
1346 API_COLL_UNLOCK(jbc, rci, rc);
1347 return rc;
1348 }
1349
ejdb_put_jbn(EJDB db,const char * coll,JBL_NODE jbn,int64_t id)1350 iwrc ejdb_put_jbn(EJDB db, const char *coll, JBL_NODE jbn, int64_t id) {
1351 JBL jbl = 0;
1352 iwrc rc = jbl_from_node(&jbl, jbn);
1353 RCRET(rc);
1354 rc = ejdb_put(db, coll, jbl, id);
1355 jbl_destroy(&jbl);
1356 return rc;
1357 }
1358
_jb_put_new_lw(JBCOLL jbc,JBL jbl,int64_t * id)1359 static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id) {
1360 iwrc rc = 0;
1361 int64_t oid = jbc->id_seq + 1;
1362 IWKV_val val, key = {
1363 .data = &oid,
1364 .size = sizeof(oid)
1365 };
1366 struct _JBPHCTX pctx = {
1367 .id = oid,
1368 .jbc = jbc,
1369 .jbl = jbl
1370 };
1371
1372 RCC(rc, finish, jbl_as_buf(jbl, &val.data, &val.size));
1373 RCC(rc, finish, _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx));
1374
1375 jbc->id_seq = oid;
1376 if (id) {
1377 *id = oid;
1378 }
1379
1380 finish:
1381 return rc;
1382 }
1383
ejdb_put_new(EJDB db,const char * coll,JBL jbl,int64_t * id)1384 iwrc ejdb_put_new(EJDB db, const char *coll, JBL jbl, int64_t *id) {
1385 if (!jbl) {
1386 return IW_ERROR_INVALID_ARGS;
1387 }
1388 int rci;
1389 JBCOLL jbc;
1390 if (id) {
1391 *id = 0;
1392 }
1393 iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1394 RCRET(rc);
1395
1396 rc = _jb_put_new_lw(jbc, jbl, id);
1397
1398 API_COLL_UNLOCK(jbc, rci, rc);
1399 return rc;
1400 }
1401
ejdb_put_new_jbn(EJDB db,const char * coll,JBL_NODE jbn,int64_t * id)1402 iwrc ejdb_put_new_jbn(EJDB db, const char *coll, JBL_NODE jbn, int64_t *id) {
1403 JBL jbl = 0;
1404 iwrc rc = jbl_from_node(&jbl, jbn);
1405 RCRET(rc);
1406 rc = ejdb_put_new(db, coll, jbl, id);
1407 jbl_destroy(&jbl);
1408 return rc;
1409 }
1410
jb_get(EJDB db,const char * coll,int64_t id,jb_coll_acquire_t acm,JBL * jblp)1411 iwrc jb_get(EJDB db, const char *coll, int64_t id, jb_coll_acquire_t acm, JBL *jblp) {
1412 if (!id || !jblp) {
1413 return IW_ERROR_INVALID_ARGS;
1414 }
1415 *jblp = 0;
1416
1417 int rci;
1418 JBCOLL jbc;
1419 JBL jbl = 0;
1420 IWKV_val val = { 0 };
1421 IWKV_val key = { .data = &id, .size = sizeof(id) };
1422
1423 iwrc rc = _jb_coll_acquire_keeplock2(db, coll, acm, &jbc);
1424 RCRET(rc);
1425
1426 RCC(rc, finish, iwkv_get(jbc->cdb, &key, &val));
1427 RCC(rc, finish, jbl_from_buf_keep(&jbl, val.data, val.size, false));
1428
1429 *jblp = jbl;
1430
1431 finish:
1432 if (rc) {
1433 if (jbl) {
1434 jbl_destroy(&jbl);
1435 } else {
1436 iwkv_val_dispose(&val);
1437 }
1438 }
1439 API_COLL_UNLOCK(jbc, rci, rc);
1440 return rc;
1441 }
1442
ejdb_get(EJDB db,const char * coll,int64_t id,JBL * jblp)1443 iwrc ejdb_get(EJDB db, const char *coll, int64_t id, JBL *jblp) {
1444 return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, jblp);
1445 }
1446
ejdb_del(EJDB db,const char * coll,int64_t id)1447 iwrc ejdb_del(EJDB db, const char *coll, int64_t id) {
1448 int rci;
1449 JBCOLL jbc;
1450 struct _JBL jbl;
1451 IWKV_val val = { 0 };
1452 IWKV_val key = { .data = &id, .size = sizeof(id) };
1453
1454 iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
1455 RCRET(rc);
1456
1457 RCC(rc, finish, iwkv_get(jbc->cdb, &key, &val));
1458 RCC(rc, finish, jbl_from_buf_keep_onstack(&jbl, val.data, val.size));
1459
1460 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1461 IWRC(_jb_idx_record_remove(idx, id, &jbl), rc);
1462 }
1463
1464 RCC(rc, finish, iwkv_del(jbc->cdb, &key, 0));
1465 _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1466 jbc->rnum -= 1;
1467
1468 finish:
1469 if (val.data) {
1470 iwkv_val_dispose(&val);
1471 }
1472 API_COLL_UNLOCK(jbc, rci, rc);
1473 return rc;
1474 }
1475
jb_del(JBCOLL jbc,JBL jbl,int64_t id)1476 iwrc jb_del(JBCOLL jbc, JBL jbl, int64_t id) {
1477 iwrc rc = 0;
1478 IWKV_val key = { .data = &id, .size = sizeof(id) };
1479 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1480 IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1481 }
1482 rc = iwkv_del(jbc->cdb, &key, 0);
1483 RCRET(rc);
1484 _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1485 jbc->rnum -= 1;
1486 return rc;
1487 }
1488
jb_cursor_del(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)1489 iwrc jb_cursor_del(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
1490 iwrc rc = 0;
1491 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1492 IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1493 }
1494 rc = iwkv_cursor_del(cur, 0);
1495 RCRET(rc);
1496 _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1497 jbc->rnum -= 1;
1498 return rc;
1499 }
1500
ejdb_ensure_collection(EJDB db,const char * coll)1501 iwrc ejdb_ensure_collection(EJDB db, const char *coll) {
1502 int rci;
1503 JBCOLL jbc;
1504 iwrc rc = _jb_coll_acquire_keeplock(db, coll, false, &jbc);
1505 RCRET(rc);
1506 API_COLL_UNLOCK(jbc, rci, rc);
1507 return rc;
1508 }
1509
ejdb_remove_collection(EJDB db,const char * coll)1510 iwrc ejdb_remove_collection(EJDB db, const char *coll) {
1511 int rci;
1512 iwrc rc = 0;
1513 if (db->oflags & IWKV_RDONLY) {
1514 return IW_ERROR_READONLY;
1515 }
1516 API_WLOCK(db, rci);
1517 JBCOLL jbc;
1518 IWKV_val key;
1519 char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * IWNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1520
1521 jbc = iwhmap_get(db->mcolls, coll);
1522 if (jbc) {
1523 key.data = keybuf;
1524 key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
1525
1526 RCC(rc, finish, iwkv_del(jbc->db->metadb, &key, IWKV_SYNC));
1527
1528 _jb_meta_nrecs_removedb(db, jbc->dbid);
1529
1530 for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1531 key.data = keybuf;
1532 key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1533 RCC(rc, finish, iwkv_del(jbc->db->metadb, &key, 0));
1534 _jb_meta_nrecs_removedb(db, idx->dbid);
1535 }
1536 for (JBIDX idx = jbc->idx, nidx; idx; idx = nidx) {
1537 IWRC(iwkv_db_destroy(&idx->idb), rc);
1538 idx->idb = 0;
1539 nidx = idx->next;
1540 _jb_idx_release(idx);
1541 }
1542 jbc->idx = 0;
1543 IWRC(iwkv_db_destroy(&jbc->cdb), rc);
1544 iwhmap_remove(db->mcolls, coll);
1545 }
1546
1547 finish:
1548 API_UNLOCK(db, rci, rc);
1549 return rc;
1550 }
1551
jb_collection_join_resolver(int64_t id,const char * coll,JBL * out,JBEXEC * ctx)1552 iwrc jb_collection_join_resolver(int64_t id, const char *coll, JBL *out, JBEXEC *ctx) {
1553 assert(out && ctx && coll);
1554 EJDB db = ctx->jbc->db;
1555 return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, out);
1556 }
1557
jb_proj_node_cache_cmp(const void * v1,const void * v2)1558 int jb_proj_node_cache_cmp(const void *v1, const void *v2) {
1559 const struct _JBDOCREF *r1 = v1;
1560 const struct _JBDOCREF *r2 = v2;
1561 int ret = r1->id > r2->id ? 1 : r1->id < r2->id ? -1 : 0;
1562 if (!ret) {
1563 return strcmp(r1->coll, r2->coll);
1564 }
1565 return ret;
1566 }
1567
/**
 * Key/value release callback: frees the heap allocated `key`.
 * `val` is left untouched by this callback.
 */
void jb_proj_node_kvfree(void *key, void *val) {
  (void) val;
  free(key);
}
1571
jb_proj_node_hash(const void * key)1572 uint32_t jb_proj_node_hash(const void *key) {
1573 const struct _JBDOCREF *ref = key;
1574 return murmur3(key, sizeof(*ref));
1575 }
1576
ejdb_rename_collection(EJDB db,const char * coll,const char * new_coll)1577 iwrc ejdb_rename_collection(EJDB db, const char *coll, const char *new_coll) {
1578 if (!coll || !new_coll) {
1579 return IW_ERROR_INVALID_ARGS;
1580 }
1581 int rci;
1582 iwrc rc = 0;
1583 if (db->oflags & IWKV_RDONLY) {
1584 return IW_ERROR_READONLY;
1585 }
1586 IWKV_val key, val;
1587 JBL nmeta = 0, jbv = 0;
1588 char keybuf[IWNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];
1589
1590 API_WLOCK(db, rci);
1591
1592 JBCOLL jbc = iwhmap_get(db->mcolls, coll);
1593 if (!jbc) {
1594 rc = EJDB_ERROR_COLLECTION_NOT_FOUND;
1595 goto finish;
1596 }
1597
1598 if (iwhmap_get(db->mcolls, new_coll)) {
1599 rc = EJDB_ERROR_TARGET_COLLECTION_EXISTS;
1600 goto finish;
1601 }
1602
1603 RCC(rc, finish, jbl_create_empty_object(&nmeta));
1604 if (!binn_object_set_str(&nmeta->bn, "name", new_coll)) {
1605 rc = JBL_ERROR_CREATION;
1606 goto finish;
1607 }
1608
1609 if (!binn_object_set_uint32(&nmeta->bn, "id", jbc->dbid)) {
1610 rc = JBL_ERROR_CREATION;
1611 goto finish;
1612 }
1613
1614 RCC(rc, finish, jbl_as_buf(nmeta, &val.data, &val.size));
1615 key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
1616 if (key.size >= sizeof(keybuf)) {
1617 rc = IW_ERROR_OVERFLOW;
1618 goto finish;
1619 }
1620 key.data = keybuf;
1621
1622 RCC(rc, finish, jbl_at(nmeta, "/name", &jbv));
1623 const char *new_name = jbl_get_str(jbv);
1624 RCC(rc, finish, iwkv_put(db->metadb, &key, &val, IWKV_SYNC));
1625 RCC(rc, finish, iwhmap_rename(db->mcolls, coll, (void*) new_name));
1626
1627 jbc->name = new_name;
1628 jbl_destroy(&jbc->meta);
1629 jbc->meta = nmeta;
1630
1631 finish:
1632 if (jbv) {
1633 jbl_destroy(&jbv);
1634 }
1635 if (rc) {
1636 if (nmeta) {
1637 jbl_destroy(&nmeta);
1638 }
1639 }
1640 API_UNLOCK(db, rci, rc);
1641 return rc;
1642 }
1643
ejdb_get_meta(EJDB db,JBL * jblp)1644 iwrc ejdb_get_meta(EJDB db, JBL *jblp) {
1645 int rci;
1646 *jblp = 0;
1647 JBL jbl;
1648 iwrc rc = jbl_create_empty_object(&jbl);
1649 RCRET(rc);
1650 binn *clist = 0;
1651 API_RLOCK(db, rci);
1652 if (!binn_object_set_str(&jbl->bn, "version", ejdb_version_full())) {
1653 rc = JBL_ERROR_CREATION;
1654 goto finish;
1655 }
1656 IWFS_FSM_STATE sfsm;
1657 rc = iwkv_state(db->iwkv, &sfsm);
1658 RCRET(rc);
1659 if ( !binn_object_set_str(&jbl->bn, "file", sfsm.exfile.file.opts.path)
1660 || !binn_object_set_int64(&jbl->bn, "size", sfsm.exfile.fsize)) {
1661 rc = JBL_ERROR_CREATION;
1662 goto finish;
1663 }
1664 clist = binn_list();
1665 if (!clist) {
1666 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1667 goto finish;
1668 }
1669
1670 IWHMAP_ITER iter;
1671 iwhmap_iter_init(db->mcolls, &iter);
1672 while (iwhmap_iter_next(&iter)) {
1673 JBCOLL jbc = (void*) iter.val;
1674 RCC(rc, finish, _jb_coll_add_meta_lr(jbc, clist));
1675 }
1676
1677 if (!binn_object_set_list(&jbl->bn, "collections", clist)) {
1678 rc = JBL_ERROR_CREATION;
1679 goto finish;
1680 }
1681 binn_free(clist);
1682 clist = 0;
1683
1684 finish:
1685 API_UNLOCK(db, rci, rc);
1686 if (rc) {
1687 if (clist) {
1688 binn_free(clist);
1689 }
1690 jbl_destroy(&jbl);
1691 } else {
1692 *jblp = jbl;
1693 }
1694 return rc;
1695 }
1696
ejdb_online_backup(EJDB db,uint64_t * ts,const char * target_file)1697 iwrc ejdb_online_backup(EJDB db, uint64_t *ts, const char *target_file) {
1698 ENSURE_OPEN(db);
1699 return iwkv_online_backup(db->iwkv, ts, target_file);
1700 }
1701
ejdb_get_iwkv(EJDB db,IWKV * kvp)1702 iwrc ejdb_get_iwkv(EJDB db, IWKV *kvp) {
1703 if (!db || !kvp) {
1704 return IW_ERROR_INVALID_ARGS;
1705 }
1706 *kvp = db->iwkv;
1707 return 0;
1708 }
1709
_mcolls_map_entry_free(void * key,void * val)1710 static void _mcolls_map_entry_free(void *key, void *val) {
1711 if (val) {
1712 _jb_coll_release(val);
1713 }
1714 }
1715
ejdb_open(const EJDB_OPTS * _opts,EJDB * ejdbp)1716 iwrc ejdb_open(const EJDB_OPTS *_opts, EJDB *ejdbp) {
1717 *ejdbp = 0;
1718 int rci;
1719 iwrc rc = ejdb_init();
1720 RCRET(rc);
1721 if (!_opts || !_opts->kv.path || !ejdbp) {
1722 return IW_ERROR_INVALID_ARGS;
1723 }
1724
1725 EJDB db = calloc(1, sizeof(*db));
1726 if (!db) {
1727 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1728 }
1729
1730 memcpy(&db->opts, _opts, sizeof(db->opts));
1731 if (!db->opts.sort_buffer_sz) {
1732 db->opts.sort_buffer_sz = 16 * 1024 * 1024; // 16Mb
1733 }
1734 if (db->opts.sort_buffer_sz < 1024 * 1024) { // Min 1Mb
1735 db->opts.sort_buffer_sz = 1024 * 1024;
1736 }
1737 if (!db->opts.document_buffer_sz) { // 64Kb
1738 db->opts.document_buffer_sz = 64 * 1024;
1739 }
1740 if (db->opts.document_buffer_sz < 16 * 1024) { // Min 16Kb
1741 db->opts.document_buffer_sz = 16 * 1024;
1742 }
1743 EJDB_HTTP *http = &db->opts.http;
1744 if (http->bind) {
1745 http->bind = strdup(http->bind);
1746 }
1747 if (http->access_token) {
1748 http->access_token = strdup(http->access_token);
1749 if (!http->access_token) {
1750 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1751 }
1752 http->access_token_len = strlen(http->access_token);
1753 }
1754
1755 pthread_rwlockattr_t attr;
1756 pthread_rwlockattr_init(&attr);
1757 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
1758 pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
1759 #endif
1760 rci = pthread_rwlock_init(&db->rwl, &attr);
1761 if (rci) {
1762 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1763 free(db);
1764 return rc;
1765 }
1766 RCB(finish, db->mcolls = iwhmap_create_str(_mcolls_map_entry_free));
1767
1768 IWKV_OPTS kvopts;
1769 memcpy(&kvopts, &db->opts.kv, sizeof(db->opts.kv));
1770 kvopts.wal.enabled = !db->opts.no_wal;
1771 kvopts.wal.wal_lock_interceptor = _jb_wal_lock_interceptor;
1772 kvopts.wal.wal_lock_interceptor_opaque = db;
1773
1774 RCC(rc, finish, iwkv_open(&kvopts, &db->iwkv));
1775
1776 db->oflags = kvopts.oflags;
1777 RCC(rc, finish, _jb_db_meta_load(db));
1778
1779 if (db->opts.http.enabled) {
1780 // Maximum WS/HTTP API body size. Default: 64Mb, Min: 512K
1781 if (!db->opts.http.max_body_size) {
1782 db->opts.http.max_body_size = 64 * 1024 * 1024;
1783 } else if (db->opts.http.max_body_size < 512 * 1024) {
1784 db->opts.http.max_body_size = 512 * 1024;
1785 }
1786 }
1787
1788 #ifdef JB_HTTP
1789 if (db->opts.http.enabled && !db->opts.http.blocking) {
1790 RCC(rc, finish, jbr_start(db, &db->opts, &db->jbr));
1791 }
1792 #endif
1793
1794 finish:
1795 if (rc) {
1796 _jb_db_release(&db);
1797 } else {
1798 db->open = true;
1799 *ejdbp = db;
1800 #ifdef JB_HTTP
1801 if (db->opts.http.enabled && db->opts.http.blocking) {
1802 rc = jbr_start(db, &db->opts, &db->jbr);
1803 }
1804 #endif
1805 }
1806 return rc;
1807 }
1808
ejdb_close(EJDB * ejdbp)1809 iwrc ejdb_close(EJDB *ejdbp) {
1810 if (!ejdbp || !*ejdbp) {
1811 return IW_ERROR_INVALID_ARGS;
1812 }
1813 EJDB db = *ejdbp;
1814 if (!__sync_bool_compare_and_swap(&db->open, 1, 0)) {
1815 iwlog_error2("Database is closed already");
1816 return IW_ERROR_INVALID_STATE;
1817 }
1818 iwrc rc = _jb_db_release(ejdbp);
1819 return rc;
1820 }
1821
ejdb_git_revision(void)1822 const char* ejdb_git_revision(void) {
1823 return EJDB2_GIT_REVISION;
1824 }
1825
ejdb_version_full(void)1826 const char* ejdb_version_full(void) {
1827 return EJDB2_VERSION;
1828 }
1829
ejdb_version_major(void)1830 unsigned int ejdb_version_major(void) {
1831 return EJDB2_VERSION_MAJOR;
1832 }
1833
ejdb_version_minor(void)1834 unsigned int ejdb_version_minor(void) {
1835 return EJDB2_VERSION_MINOR;
1836 }
1837
ejdb_version_patch(void)1838 unsigned int ejdb_version_patch(void) {
1839 return EJDB2_VERSION_PATCH;
1840 }
1841
/**
 * Error code to message translator registered by ejdb_init().
 *
 * @param locale Not used.
 * @param ecode  Error code to describe.
 * @return Static message for codes strictly between _EJDB_ERROR_START and
 *         _EJDB_ERROR_END that are known here, 0 for anything else.
 */
static const char* _ejdb_ecodefn(locale_t locale, uint32_t ecode) {
  if ((ecode <= _EJDB_ERROR_START) || (ecode >= _EJDB_ERROR_END)) {
    return 0;
  }
  const char *msg;
  switch (ecode) {
    case EJDB_ERROR_INVALID_COLLECTION_META:
      msg = "Invalid collection metadata (EJDB_ERROR_INVALID_COLLECTION_META)";
      break;
    case EJDB_ERROR_INVALID_COLLECTION_INDEX_META:
      msg = "Invalid collection index metadata (EJDB_ERROR_INVALID_COLLECTION_INDEX_META)";
      break;
    case EJDB_ERROR_INVALID_INDEX_MODE:
      msg = "Invalid index mode specified (EJDB_ERROR_INVALID_INDEX_MODE)";
      break;
    case EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE:
      msg = "Index exists but mismatched uniqueness constraint (EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE)";
      break;
    case EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED:
      msg = "Unique index constraint violated (EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED)";
      break;
    case EJDB_ERROR_INVALID_COLLECTION_NAME:
      msg = "Invalid collection name (EJDB_ERROR_INVALID_COLLECTION_NAME)";
      break;
    case EJDB_ERROR_COLLECTION_NOT_FOUND:
      msg = "Collection not found (EJDB_ERROR_COLLECTION_NOT_FOUND)";
      break;
    case EJDB_ERROR_TARGET_COLLECTION_EXISTS:
      msg = "Target collection exists (EJDB_ERROR_TARGET_COLLECTION_EXISTS)";
      break;
    case EJDB_ERROR_PATCH_JSON_NOT_OBJECT:
      msg = "Patch JSON must be an object (map) (EJDB_ERROR_PATCH_JSON_NOT_OBJECT)";
      break;
    default:
      msg = 0;
      break;
  }
  return msg;
}
1868
ejdb_init()1869 iwrc ejdb_init() {
1870 static volatile int jb_initialized = 0;
1871 if (!__sync_bool_compare_and_swap(&jb_initialized, 0, 1)) {
1872 return 0; // initialized already
1873 }
1874 iwrc rc = iw_init();
1875 RCRET(rc);
1876 rc = jbl_init();
1877 RCRET(rc);
1878 rc = jql_init();
1879 RCRET(rc);
1880 #ifdef JB_HTTP
1881 rc = jbr_init();
1882 RCRET(rc);
1883 #endif
1884 return iwlog_register_ecodefn(_ejdb_ecodefn);
1885 }
1886