• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "ejdb2_internal.h"
2 
3 // ---------------------------------------------------------------------------
4 
5 static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id);
6 
7 static const IWKV_val EMPTY_VAL = { 0 };
8 
_jb_meta_nrecs_removedb(EJDB db,uint32_t dbid)9 IW_INLINE iwrc _jb_meta_nrecs_removedb(EJDB db, uint32_t dbid) {
10   dbid = IW_HTOIL(dbid);
11   IWKV_val key = {
12     .size = sizeof(dbid),
13     .data = &dbid
14   };
15   return iwkv_del(db->nrecdb, &key, 0);
16 }
17 
_jb_meta_nrecs_update(EJDB db,uint32_t dbid,int64_t delta)18 IW_INLINE iwrc _jb_meta_nrecs_update(EJDB db, uint32_t dbid, int64_t delta) {
19   delta = IW_HTOILL(delta);
20   dbid = IW_HTOIL(dbid);
21   IWKV_val val = {
22     .size = sizeof(delta),
23     .data = &delta
24   };
25   IWKV_val key = {
26     .size = sizeof(dbid),
27     .data = &dbid
28   };
29   return iwkv_put(db->nrecdb, &key, &val, IWKV_VAL_INCREMENT);
30 }
31 
_jb_meta_nrecs_get(EJDB db,uint32_t dbid)32 static int64_t _jb_meta_nrecs_get(EJDB db, uint32_t dbid) {
33   size_t vsz = 0;
34   uint64_t ret = 0;
35   dbid = IW_HTOIL(dbid);
36   IWKV_val key = {
37     .size = sizeof(dbid),
38     .data = &dbid
39   };
40   iwkv_get_copy(db->nrecdb, &key, &ret, sizeof(ret), &vsz);
41   if (vsz == sizeof(ret)) {
42     ret = IW_ITOHLL(ret);
43   }
44   return (int64_t) ret;
45 }
46 
_jb_idx_release(JBIDX idx)47 static void _jb_idx_release(JBIDX idx) {
48   if (idx->idb) {
49     iwkv_db_cache_release(idx->idb);
50   }
51   free(idx->ptr);
52   free(idx);
53 }
54 
_jb_coll_release(JBCOLL jbc)55 static void _jb_coll_release(JBCOLL jbc) {
56   if (jbc->cdb) {
57     iwkv_db_cache_release(jbc->cdb);
58   }
59   if (jbc->meta) {
60     jbl_destroy(&jbc->meta);
61   }
62   JBIDX nidx;
63   for (JBIDX idx = jbc->idx; idx; idx = nidx) {
64     nidx = idx->next;
65     _jb_idx_release(idx);
66   }
67   jbc->idx = 0;
68   pthread_rwlock_destroy(&jbc->rwl);
69   free(jbc);
70 }
71 
_jb_coll_load_index_lr(JBCOLL jbc,IWKV_val * mval)72 static iwrc _jb_coll_load_index_lr(JBCOLL jbc, IWKV_val *mval) {
73   binn *bn;
74   char *ptr;
75   struct _JBL imeta;
76   JBIDX idx = calloc(1, sizeof(*idx));
77   if (!idx) {
78     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
79   }
80 
81   iwrc rc = jbl_from_buf_keep_onstack(&imeta, mval->data, mval->size);
82   RCGO(rc, finish);
83   bn = &imeta.bn;
84 
85   if (  !binn_object_get_str(bn, "ptr", &ptr)
86      || !binn_object_get_uint8(bn, "mode", &idx->mode)
87      || !binn_object_get_uint8(bn, "idbf", &idx->idbf)
88      || !binn_object_get_uint32(bn, "dbid", &idx->dbid)) {
89     rc = EJDB_ERROR_INVALID_COLLECTION_INDEX_META;
90     goto finish;
91   }
92   rc = jbl_ptr_alloc(ptr, &idx->ptr);
93   RCGO(rc, finish);
94 
95   rc = iwkv_db(jbc->db->iwkv, idx->dbid, idx->idbf, &idx->idb);
96   RCGO(rc, finish);
97   idx->jbc = jbc;
98   idx->rnum = _jb_meta_nrecs_get(jbc->db, idx->dbid);
99   idx->next = jbc->idx;
100   jbc->idx = idx;
101 
102 finish:
103   if (rc) {
104     _jb_idx_release(idx);
105   }
106   return rc;
107 }
108 
_jb_coll_load_indexes_lr(JBCOLL jbc)109 static iwrc _jb_coll_load_indexes_lr(JBCOLL jbc) {
110   iwrc rc = 0;
111   IWKV_cursor cur;
112   IWKV_val kval;
113   char buf[sizeof(KEY_PREFIX_IDXMETA) + JBNUMBUF_SIZE];
114   // Full key format: i.<coldbid>.<idxdbid>
115   int sz = snprintf(buf, sizeof(buf), KEY_PREFIX_IDXMETA "%u.", jbc->dbid);
116   if (sz >= sizeof(buf)) {
117     return IW_ERROR_OVERFLOW;
118   }
119   kval.data = buf;
120   kval.size = sz;
121   rc = iwkv_cursor_open(jbc->db->metadb, &cur, IWKV_CURSOR_GE, &kval);
122   if (rc == IWKV_ERROR_NOTFOUND) {
123     rc = 0;
124     goto finish;
125   }
126   RCRET(rc);
127 
128   do {
129     IWKV_val key, val;
130     rc = iwkv_cursor_key(cur, &key);
131     RCGO(rc, finish);
132     if ((key.size > sz) && !strncmp(buf, key.data, sz)) {
133       iwkv_val_dispose(&key);
134       rc = iwkv_cursor_val(cur, &val);
135       RCGO(rc, finish);
136       rc = _jb_coll_load_index_lr(jbc, &val);
137       iwkv_val_dispose(&val);
138       RCBREAK(rc);
139     } else {
140       iwkv_val_dispose(&key);
141     }
142   } while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_PREV)));
143   if (rc == IWKV_ERROR_NOTFOUND) {
144     rc = 0;
145   }
146 
147 finish:
148   iwkv_cursor_close(&cur);
149   return rc;
150 }
151 
_jb_coll_load_meta_lr(JBCOLL jbc)152 static iwrc _jb_coll_load_meta_lr(JBCOLL jbc) {
153   JBL jbv;
154   IWKV_cursor cur;
155   JBL jbm = jbc->meta;
156   iwrc rc = jbl_at(jbm, "/name", &jbv);
157   RCRET(rc);
158   jbc->name = jbl_get_str(jbv);
159   jbl_destroy(&jbv);
160   if (!jbc->name) {
161     return EJDB_ERROR_INVALID_COLLECTION_META;
162   }
163   rc = jbl_at(jbm, "/id", &jbv);
164   RCRET(rc);
165   jbc->dbid = (uint32_t) jbl_get_i64(jbv);
166   jbl_destroy(&jbv);
167   if (!jbc->dbid) {
168     return EJDB_ERROR_INVALID_COLLECTION_META;
169   }
170   rc = iwkv_db(jbc->db->iwkv, jbc->dbid, IWDB_VNUM64_KEYS, &jbc->cdb);
171   RCRET(rc);
172 
173   jbc->rnum = _jb_meta_nrecs_get(jbc->db, jbc->dbid);
174 
175   rc = _jb_coll_load_indexes_lr(jbc);
176   RCRET(rc);
177 
178   rc = iwkv_cursor_open(jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
179   RCRET(rc);
180   rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
181   if (rc) {
182     if (rc == IWKV_ERROR_NOTFOUND) {
183       rc = 0;
184     }
185   } else {
186     size_t sz;
187     rc = iwkv_cursor_copy_key(cur, &jbc->id_seq, sizeof(jbc->id_seq), &sz, 0);
188     RCGO(rc, finish);
189   }
190 
191 finish:
192   iwkv_cursor_close(&cur);
193   return rc;
194 }
195 
_jb_coll_init(JBCOLL jbc,IWKV_val * meta)196 static iwrc _jb_coll_init(JBCOLL jbc, IWKV_val *meta) {
197   int rci;
198   iwrc rc = 0;
199 
200   pthread_rwlockattr_t attr;
201   pthread_rwlockattr_init(&attr);
202 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
203   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
204 #endif
205   pthread_rwlock_init(&jbc->rwl, &attr);
206   if (meta) {
207     rc = jbl_from_buf_keep(&jbc->meta, meta->data, meta->size, false);
208     RCRET(rc);
209   }
210   if (!jbc->meta) {
211     iwlog_error("Collection %s seems to be initialized", jbc->name);
212     return IW_ERROR_INVALID_STATE;
213   }
214   rc = _jb_coll_load_meta_lr(jbc);
215   RCRET(rc);
216 
217   khiter_t k = kh_put(JBCOLLM, jbc->db->mcolls, jbc->name, &rci);
218   if (rci != -1) {
219     kh_value(jbc->db->mcolls, k) = jbc;
220   } else {
221     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
222   }
223   return rc;
224 }
225 
_jb_idx_add_meta_lr(JBIDX idx,binn * list)226 static iwrc _jb_idx_add_meta_lr(JBIDX idx, binn *list) {
227   iwrc rc = 0;
228   IWXSTR *xstr = iwxstr_new();
229   if (!xstr) {
230     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
231   }
232   binn *meta = binn_object();
233   if (!meta) {
234     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
235     iwxstr_destroy(xstr);
236     return rc;
237   }
238   rc = jbl_ptr_serialize(idx->ptr, xstr);
239   RCGO(rc, finish);
240 
241   if (  !binn_object_set_str(meta, "ptr", iwxstr_ptr(xstr))
242      || !binn_object_set_uint32(meta, "mode", idx->mode)
243      || !binn_object_set_uint32(meta, "idbf", idx->idbf)
244      || !binn_object_set_uint32(meta, "dbid", idx->dbid)
245      || !binn_object_set_int64(meta, "rnum", idx->rnum)) {
246     rc = JBL_ERROR_CREATION;
247   }
248 
249   if (!binn_list_add_object(list, meta)) {
250     rc = JBL_ERROR_CREATION;
251   }
252 
253 finish:
254   iwxstr_destroy(xstr);
255   binn_free(meta);
256   return rc;
257 }
258 
_jb_coll_add_meta_lr(JBCOLL jbc,binn * list)259 static iwrc _jb_coll_add_meta_lr(JBCOLL jbc, binn *list) {
260   iwrc rc = 0;
261   binn *ilist = 0;
262   binn *meta = binn_object();
263   if (!meta) {
264     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
265     return rc;
266   }
267   if (  !binn_object_set_str(meta, "name", jbc->name)
268      || !binn_object_set_uint32(meta, "dbid", jbc->dbid)
269      || !binn_object_set_int64(meta, "rnum", jbc->rnum)) {
270     rc = JBL_ERROR_CREATION;
271     goto finish;
272   }
273   ilist = binn_list();
274   if (!ilist) {
275     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
276     goto finish;
277   }
278   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
279     rc = _jb_idx_add_meta_lr(idx, ilist);
280     RCGO(rc, finish);
281   }
282   if (!binn_object_set_list(meta, "indexes", ilist)) {
283     rc = JBL_ERROR_CREATION;
284     goto finish;
285   }
286   if (!binn_list_add_value(list, meta)) {
287     rc = JBL_ERROR_CREATION;
288     goto finish;
289   }
290 
291 finish:
292   binn_free(meta);
293   if (ilist) {
294     binn_free(ilist);
295   }
296   return rc;
297 }
298 
_jb_db_meta_load(EJDB db)299 static iwrc _jb_db_meta_load(EJDB db) {
300   iwrc rc = 0;
301   if (!db->metadb) {
302     rc = iwkv_db(db->iwkv, METADB_ID, 0, &db->metadb);
303     RCRET(rc);
304   }
305   if (!db->nrecdb) {
306     rc = iwkv_db(db->iwkv, NUMRECSDB_ID, IWDB_VNUM64_KEYS, &db->nrecdb);
307     RCRET(rc);
308   }
309 
310   IWKV_cursor cur;
311   rc = iwkv_cursor_open(db->metadb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
312   RCRET(rc);
313   while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT))) {
314     IWKV_val key, val;
315     rc = iwkv_cursor_get(cur, &key, &val);
316     RCGO(rc, finish);
317     if (!strncmp(key.data, KEY_PREFIX_COLLMETA, sizeof(KEY_PREFIX_COLLMETA) - 1)) {
318       JBCOLL jbc = calloc(1, sizeof(*jbc));
319       if (!jbc) {
320         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
321         iwkv_val_dispose(&val);
322         goto finish;
323       }
324       jbc->db = db;
325       rc = _jb_coll_init(jbc, &val);
326       if (rc) {
327         _jb_coll_release(jbc);
328         iwkv_val_dispose(&key);
329         goto finish;
330       }
331     } else {
332       iwkv_val_dispose(&val);
333     }
334     iwkv_val_dispose(&key);
335   }
336   if (rc == IWKV_ERROR_NOTFOUND) {
337     rc = 0;
338   }
339 
340 finish:
341   iwkv_cursor_close(&cur);
342   return rc;
343 }
344 
_jb_db_release(EJDB * dbp)345 static iwrc _jb_db_release(EJDB *dbp) {
346   iwrc rc = 0;
347   EJDB db = *dbp;
348   *dbp = 0;
349 #ifdef JB_HTTP
350   if (db->jbr) {
351     IWRC(jbr_shutdown(&db->jbr), rc);
352   }
353 #endif
354   if (db->mcolls) {
355     for (khiter_t k = kh_begin(db->mcolls); k != kh_end(db->mcolls); ++k) {
356       if (!kh_exist(db->mcolls, k)) {
357         continue;
358       }
359       JBCOLL jbc = kh_val(db->mcolls, k);
360       _jb_coll_release(jbc);
361     }
362     kh_destroy(JBCOLLM, db->mcolls);
363     db->mcolls = 0;
364   }
365   if (db->iwkv) {
366     IWRC(iwkv_close(&db->iwkv), rc);
367   }
368   pthread_rwlock_destroy(&db->rwl);
369 
370   EJDB_HTTP *http = &db->opts.http;
371   if (http->bind) {
372     free((void*) http->bind);
373   }
374   if (http->access_token) {
375     free((void*) http->access_token);
376   }
377   free(db);
378   return rc;
379 }
380 
_jb_coll_acquire_keeplock2(EJDB db,const char * coll,jb_coll_acquire_t acm,JBCOLL * jbcp)381 static iwrc _jb_coll_acquire_keeplock2(EJDB db, const char *coll, jb_coll_acquire_t acm, JBCOLL *jbcp) {
382   if (strlen(coll) > EJDB_COLLECTION_NAME_MAX_LEN) {
383     return EJDB_ERROR_INVALID_COLLECTION_NAME;
384   }
385   int rci;
386   iwrc rc = 0;
387   *jbcp = 0;
388   JBCOLL jbc = 0;
389   bool wl = acm & JB_COLL_ACQUIRE_WRITE;
390   API_RLOCK(db, rci);
391   khiter_t k = kh_get(JBCOLLM, db->mcolls, coll);
392   if (k != kh_end(db->mcolls)) {
393     jbc = kh_value(db->mcolls, k);
394     assert(jbc);
395     rci = wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl);
396     if (rci) {
397       rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
398       goto finish;
399     }
400     *jbcp = jbc;
401   } else {
402     pthread_rwlock_unlock(&db->rwl); // relock
403     if ((db->oflags & IWKV_RDONLY) || (acm & JB_COLL_ACQUIRE_EXISTING)) {
404       return IW_ERROR_NOT_EXISTS;
405     }
406     API_WLOCK(db, rci);
407     k = kh_get(JBCOLLM, db->mcolls, coll);
408     if (k != kh_end(db->mcolls)) {
409       jbc = kh_value(db->mcolls, k);
410       assert(jbc);
411       rci = pthread_rwlock_rdlock(&jbc->rwl);
412       if (rci) {
413         rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
414         goto finish;
415       }
416       *jbcp = jbc;
417     } else {
418       JBL meta = 0;
419       IWDB cdb = 0;
420       uint32_t dbid = 0;
421       char keybuf[JBNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];
422       IWKV_val key, val;
423 
424       rc = iwkv_new_db(db->iwkv, IWDB_VNUM64_KEYS, &dbid, &cdb);
425       RCGO(rc, create_finish);
426       jbc = calloc(1, sizeof(*jbc));
427       if (!jbc) {
428         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
429         goto create_finish;
430       }
431       rc = jbl_create_empty_object(&meta);
432       RCGO(rc, create_finish);
433       if (!binn_object_set_str(&meta->bn, "name", coll)) {
434         rc = JBL_ERROR_CREATION;
435         goto create_finish;
436       }
437       if (!binn_object_set_uint32(&meta->bn, "id", dbid)) {
438         rc = JBL_ERROR_CREATION;
439         goto create_finish;
440       }
441       rc = jbl_as_buf(meta, &val.data, &val.size);
442       RCGO(rc, create_finish);
443 
444       key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", dbid);
445       if (key.size >= sizeof(keybuf)) {
446         rc = IW_ERROR_OVERFLOW;
447         goto create_finish;
448       }
449       key.data = keybuf;
450       rc = iwkv_put(db->metadb, &key, &val, IWKV_SYNC);
451       RCGO(rc, create_finish);
452 
453       jbc->db = db;
454       jbc->meta = meta;
455       rc = _jb_coll_init(jbc, 0);
456       if (rc) {
457         iwkv_del(db->metadb, &key, IWKV_SYNC);
458         goto create_finish;
459       }
460 
461 create_finish:
462       if (rc) {
463         if (meta) {
464           jbl_destroy(&meta);
465         }
466         if (cdb) {
467           iwkv_db_destroy(&cdb);
468         }
469         if (jbc) {
470           jbc->meta = 0; // meta was cleared
471           _jb_coll_release(jbc);
472         }
473       } else {
474         rci = wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl); // -V522
475         if (rci) {
476           rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
477           goto finish;
478         }
479         *jbcp = jbc;
480       }
481     }
482   }
483 
484 finish:
485   if (rc) {
486     pthread_rwlock_unlock(&db->rwl);
487   }
488   return rc;
489 }
490 
_jb_coll_acquire_keeplock(EJDB db,const char * coll,bool wl,JBCOLL * jbcp)491 IW_INLINE iwrc _jb_coll_acquire_keeplock(EJDB db, const char *coll, bool wl, JBCOLL *jbcp) {
492   return _jb_coll_acquire_keeplock2(db, coll, wl ? JB_COLL_ACQUIRE_WRITE : 0, jbcp);
493 }
494 
_jb_idx_record_add(JBIDX idx,int64_t id,JBL jbl,JBL jblprev)495 static iwrc _jb_idx_record_add(JBIDX idx, int64_t id, JBL jbl, JBL jblprev) {
496   IWKV_val key;
497   uint8_t step;
498   char vnbuf[IW_VNUMBUFSZ];
499   char numbuf[JBNUMBUF_SIZE];
500 
501   bool jbv_found, jbvprev_found;
502   struct _JBL jbv = { 0 }, jbvprev = { 0 };
503   jbl_type_t jbv_type, jbvprev_type;
504 
505   iwrc rc = 0;
506   IWPOOL *pool = 0;
507   int64_t delta = 0; // delta of added/removed index records
508   bool compound = idx->idbf & IWDB_COMPOUND_KEYS;
509 
510   jbvprev_found = jblprev ? _jbl_at(jblprev, idx->ptr, &jbvprev) : false;
511   jbv_found = jbl ? _jbl_at(jbl, idx->ptr, &jbv) : false;
512 
513   jbv_type = jbl_type(&jbv);
514   jbvprev_type = jbl_type(&jbvprev);
515 
516   // Do not index NULLs, OBJECTs, ARRAYs (in `EJDB_IDX_UNIQUE` mode)
517   if (  ((jbvprev_type == JBV_OBJECT) || (jbvprev_type <= JBV_NULL))
518      || ((jbvprev_type == JBV_ARRAY) && !compound)) {
519     jbvprev_found = false;
520   }
521   if (  ((jbv_type == JBV_OBJECT) || (jbv_type <= JBV_NULL))
522      || ((jbv_type == JBV_ARRAY) && !compound)) {
523     jbv_found = false;
524   }
525 
526   if (  compound
527      && (jbv_type == jbvprev_type)
528      && (jbvprev_type == JBV_ARRAY)) {  // compare next/prev obj arrays
529     pool = iwpool_create(1024);
530     if (!pool) {
531       rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
532       goto finish;
533     }
534     JBL_NODE jbvprev_node, jbv_node;
535     rc = jbl_to_node(&jbv, &jbv_node, false, pool);
536     RCGO(rc, finish);
537     jbv.node = jbv_node;
538 
539     rc = jbl_to_node(&jbvprev, &jbvprev_node, false, pool);
540     RCGO(rc, finish);
541     jbvprev.node = jbvprev_node;
542 
543     if (_jbl_compare_nodes(jbv_node, jbvprev_node, &rc) == 0) {
544       goto finish; // Arrays are equal or error
545     }
546   } else if (_jbl_is_eq_atomic_values(&jbv, &jbvprev)) {
547     return 0;
548   }
549 
550   if (jbvprev_found) {               // Remove old index elements
551     if (jbvprev_type == JBV_ARRAY) { // TODO: array modification delta?
552       JBL_NODE n;
553       if (!pool) {
554         pool = iwpool_create(1024);
555         if (!pool) {
556           rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
557           RCGO(rc, finish);
558         }
559       }
560       rc = jbl_to_node(&jbvprev, &n, false, pool);
561       RCGO(rc, finish);
562       for (n = n->child; n; n = n->next) {
563         jbi_node_fill_ikey(idx, n, &key, numbuf);
564         if (key.size) {
565           key.compound = id;
566           rc = iwkv_del(idx->idb, &key, 0);
567           if (!rc) {
568             --delta;
569           } else if (rc == IWKV_ERROR_NOTFOUND) {
570             rc = 0;
571           }
572           RCGO(rc, finish);
573         }
574       }
575     } else {
576       jbi_jbl_fill_ikey(idx, &jbvprev, &key, numbuf);
577       if (key.size) {
578         key.compound = id;
579         rc = iwkv_del(idx->idb, &key, 0);
580         if (!rc) {
581           --delta;
582         } else if (rc == IWKV_ERROR_NOTFOUND) {
583           rc = 0;
584         }
585         RCGO(rc, finish);
586       }
587     }
588   }
589 
590   if (jbv_found) {               // Add index record
591     if (jbv_type == JBV_ARRAY) { // TODO: array modification delta?
592       JBL_NODE n;
593       if (!pool) {
594         pool = iwpool_create(1024);
595         if (!pool) {
596           rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
597           RCGO(rc, finish);
598         }
599       }
600       rc = jbl_to_node(&jbv, &n, false, pool);
601       RCGO(rc, finish);
602       for (n = n->child; n; n = n->next) {
603         jbi_node_fill_ikey(idx, n, &key, numbuf);
604         if (key.size) {
605           key.compound = id;
606           rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
607           if (!rc) {
608             ++delta;
609           } else if (rc == IWKV_ERROR_KEY_EXISTS) {
610             rc = 0;
611           } else {
612             goto finish;
613           }
614         }
615       }
616     } else {
617       jbi_jbl_fill_ikey(idx, &jbv, &key, numbuf);
618       if (key.size) {
619         if (compound) {
620           key.compound = id;
621           rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
622           if (!rc) {
623             ++delta;
624           } else if (rc == IWKV_ERROR_KEY_EXISTS) {
625             rc = 0;
626           }
627         } else {
628           IW_SETVNUMBUF64(step, vnbuf, id);
629           IWKV_val idval = {
630             .data = vnbuf,
631             .size = step
632           };
633           rc = iwkv_put(idx->idb, &key, &idval, IWKV_NO_OVERWRITE);
634           if (!rc) {
635             ++delta;
636           } else if (rc == IWKV_ERROR_KEY_EXISTS) {
637             rc = EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED;
638             goto finish;
639           }
640         }
641       }
642     }
643   }
644 
645 finish:
646   if (pool) {
647     iwpool_destroy(pool);
648   }
649   if (delta && !_jb_meta_nrecs_update(idx->jbc->db, idx->dbid, delta)) {
650     idx->rnum += delta;
651   }
652   return rc;
653 }
654 
_jb_idx_record_remove(JBIDX idx,int64_t id,JBL jbl)655 IW_INLINE iwrc _jb_idx_record_remove(JBIDX idx, int64_t id, JBL jbl) {
656   return _jb_idx_record_add(idx, id, 0, jbl);
657 }
658 
_jb_idx_fill(JBIDX idx)659 static iwrc _jb_idx_fill(JBIDX idx) {
660   IWKV_cursor cur;
661   IWKV_val key, val;
662   struct _JBL jbs;
663   int64_t llv;
664   JBL jbl = &jbs;
665 
666   iwrc rc = iwkv_cursor_open(idx->jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
667   while (!rc) {
668     rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
669     if (rc == IWKV_ERROR_NOTFOUND) {
670       rc = 0;
671       break;
672     }
673     rc = iwkv_cursor_get(cur, &key, &val);
674     RCBREAK(rc);
675     if (!binn_load(val.data, &jbs.bn)) {
676       rc = JBL_ERROR_CREATION;
677       break;
678     }
679     memcpy(&llv, key.data, sizeof(llv));
680     rc = _jb_idx_record_add(idx, llv, jbl, 0);
681     iwkv_kv_dispose(&key, &val);
682   }
683   IWRC(iwkv_cursor_close(&cur), rc);
684   return rc;
685 }
686 
687 // Used to avoid deadlocks within a `iwkv_put` context
_jb_put_handler_after(iwrc rc,struct _JBPHCTX * ctx)688 static iwrc _jb_put_handler_after(iwrc rc, struct _JBPHCTX *ctx) {
689   IWKV_val *oldval = &ctx->oldval;
690   if (rc) {
691     if (oldval->size) {
692       iwkv_val_dispose(oldval);
693     }
694     return rc;
695   }
696   JBL prev;
697   struct _JBL jblprev;
698   JBCOLL jbc = ctx->jbc;
699   if (oldval->size) {
700     rc = jbl_from_buf_keep_onstack(&jblprev, oldval->data, oldval->size);
701     RCRET(rc);
702     prev = &jblprev;
703   } else {
704     prev = 0;
705   }
706   JBIDX fail_idx = 0;
707   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
708     rc = _jb_idx_record_add(idx, ctx->id, ctx->jbl, prev);
709     if (rc) {
710       fail_idx = idx;
711       goto finish;
712     }
713   }
714   if (!prev) {
715     _jb_meta_nrecs_update(jbc->db, jbc->dbid, 1);
716     jbc->rnum += 1;
717   }
718 
719 finish:
720   if (oldval->size) {
721     iwkv_val_dispose(oldval);
722   }
723   if (rc && !oldval->size) {
724     // Cleanup on error inserting new record
725     IWKV_val key = { .data = &ctx->id, .size = sizeof(ctx->id) };
726     for (JBIDX idx = jbc->idx; idx && idx != fail_idx; idx = idx->next) {
727       IWRC(_jb_idx_record_remove(idx, ctx->id, ctx->jbl), rc);
728     }
729     IWRC(iwkv_del(jbc->cdb, &key, 0), rc);
730   }
731   return rc;
732 }
733 
_jb_put_handler(const IWKV_val * key,const IWKV_val * val,IWKV_val * oldval,void * op)734 static iwrc _jb_put_handler(const IWKV_val *key, const IWKV_val *val, IWKV_val *oldval, void *op) {
735   struct _JBPHCTX *ctx = op;
736   if (oldval && oldval->size) {
737     memcpy(&ctx->oldval, oldval, sizeof(*oldval));
738   }
739   return 0;
740 }
741 
_jb_exec_scan_init(JBEXEC * ctx)742 static iwrc _jb_exec_scan_init(JBEXEC *ctx) {
743   ctx->istep = 1;
744   ctx->jblbufsz = ctx->jbc->db->opts.document_buffer_sz;
745   ctx->jblbuf = malloc(ctx->jblbufsz);
746   if (!ctx->jblbuf) {
747     ctx->jblbufsz = 0;
748     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
749   }
750   struct JQP_AUX *aux = ctx->ux->q->aux;
751   if (aux->expr->flags & JQP_EXPR_NODE_FLAG_PK) { // Select by primary key
752     ctx->scanner = jbi_pk_scanner;
753     if (ctx->ux->log) {
754       iwxstr_cat2(ctx->ux->log, "[INDEX] PK");
755     }
756     return 0;
757   }
758   iwrc rc = jbi_selection(ctx);
759   RCRET(rc);
760   if (ctx->midx.idx) {
761     if (ctx->midx.idx->idbf & IWDB_COMPOUND_KEYS) {
762       ctx->scanner = jbi_dup_scanner;
763     } else {
764       ctx->scanner = jbi_uniq_scanner;
765     }
766   } else {
767     ctx->scanner = jbi_full_scanner;
768     if (ctx->ux->log) {
769       iwxstr_cat2(ctx->ux->log, "[INDEX] NO");
770     }
771   }
772   return 0;
773 }
774 
_jb_exec_scan_release(JBEXEC * ctx)775 static void _jb_exec_scan_release(JBEXEC *ctx) {
776   if (ctx->proj_joined_nodes_cache) {
777     // Destroy projected nodes key
778     iwstree_destroy(ctx->proj_joined_nodes_cache);
779   }
780   if (ctx->proj_joined_nodes_pool) {
781     iwpool_destroy(ctx->proj_joined_nodes_pool);
782   }
783   free(ctx->jblbuf);
784 }
785 
_jb_noop_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)786 static iwrc _jb_noop_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
787   return 0;
788 }
789 
_jb_put_impl(JBCOLL jbc,JBL jbl,int64_t id)790 IW_INLINE iwrc _jb_put_impl(JBCOLL jbc, JBL jbl, int64_t id) {
791   IWKV_val val, key = {
792     .data = &id,
793     .size = sizeof(id)
794   };
795   struct _JBPHCTX pctx = {
796     .id  = id,
797     .jbc = jbc,
798     .jbl = jbl
799   };
800   iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
801   RCRET(rc);
802   return _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx);
803 }
804 
jb_put(JBCOLL jbc,JBL jbl,int64_t id)805 iwrc jb_put(JBCOLL jbc, JBL jbl, int64_t id) {
806   return _jb_put_impl(jbc, jbl, id);
807 }
808 
jb_cursor_set(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)809 iwrc jb_cursor_set(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
810   IWKV_val val;
811   struct _JBPHCTX pctx = {
812     .id  = id,
813     .jbc = jbc,
814     .jbl = jbl
815   };
816   iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
817   RCRET(rc);
818   return _jb_put_handler_after(iwkv_cursor_seth(cur, &val, 0, _jb_put_handler, &pctx), &pctx);
819 }
820 
_jb_exec_upsert_lw(JBEXEC * ctx)821 static iwrc _jb_exec_upsert_lw(JBEXEC *ctx) {
822   JBL_NODE n;
823   int64_t id;
824   iwrc rc = 0;
825   JBL jbl = 0;
826   EJDB_EXEC *ux = ctx->ux;
827   JQL q = ux->q;
828   if (q->aux->apply_placeholder) {
829     JQVAL *pv = jql_find_placeholder(q, q->aux->apply_placeholder);
830     if (!pv || (pv->type != JQVAL_JBLNODE) || !pv->vnode) {
831       rc = JQL_ERROR_INVALID_PLACEHOLDER_VALUE_TYPE;
832       goto finish;
833     }
834     n = pv->vnode;
835   } else {
836     n = q->aux->apply;
837   }
838   if (!n) {
839     // Skip silently, nothing to do.
840     goto finish;
841   }
842   RCC(rc, finish, jbl_from_node(&jbl, n));
843   RCC(rc, finish, _jb_put_new_lw(ctx->jbc, jbl, &id));
844 
845   if (!(q->aux->qmode & JQP_QRY_AGGREGATE)) {
846     struct _EJDB_DOC doc = {
847       .id   = id,
848       .raw  = jbl,
849       .node = n
850     };
851     do {
852       ctx->istep = 1;
853       RCC(rc, finish, ux->visitor(ux, &doc, &ctx->istep));
854     } while (ctx->istep == -1);
855   }
856   ++ux->cnt;
857 
858 finish:
859   jbl_destroy(&jbl);
860   return rc;
861 }
862 
863 //----------------------- Public API
864 
ejdb_exec(EJDB_EXEC * ux)865 iwrc ejdb_exec(EJDB_EXEC *ux) {
866   if (!ux || !ux->db || !ux->q) {
867     return IW_ERROR_INVALID_ARGS;
868   }
869   int rci;
870   iwrc rc = 0;
871   if (!ux->visitor) {
872     ux->visitor = _jb_noop_visitor;
873     ux->q->aux->projection = 0; // Actually we don't need projection if exists
874   }
875   if (ux->log) {
876     // set terminating NULL to current pos of log
877     iwxstr_cat(ux->log, 0, 0);
878   }
879   JBEXEC ctx = {
880     .ux = ux
881   };
882   if (ux->limit < 1) {
883     rc = jql_get_limit(ux->q, &ux->limit);
884     RCRET(rc);
885     if (ux->limit < 1) {
886       ux->limit = INT64_MAX;
887     }
888   }
889   if (ux->skip < 1) {
890     rc = jql_get_skip(ux->q, &ux->skip);
891     RCRET(rc);
892   }
893   rc = _jb_coll_acquire_keeplock2(ux->db, ux->q->coll,
894                                   jql_has_apply(ux->q) ? JB_COLL_ACQUIRE_WRITE : JB_COLL_ACQUIRE_EXISTING,
895                                   &ctx.jbc);
896   if (rc == IW_ERROR_NOT_EXISTS) {
897     return 0;
898   } else {
899     RCRET(rc);
900   }
901 
902   rc = _jb_exec_scan_init(&ctx);
903   RCGO(rc, finish);
904   if (ctx.sorting) {
905     if (ux->log) {
906       iwxstr_cat2(ux->log, " [COLLECTOR] SORTER\n");
907     }
908     rc = ctx.scanner(&ctx, jbi_sorter_consumer);
909   } else {
910     if (ux->log) {
911       iwxstr_cat2(ux->log, " [COLLECTOR] PLAIN\n");
912     }
913     rc = ctx.scanner(&ctx, jbi_consumer);
914   }
915   RCGO(rc, finish);
916   if ((ux->cnt == 0) && jql_has_apply_upsert(ux->q)) {
917     // No records found trying to upsert new record
918     rc = _jb_exec_upsert_lw(&ctx);
919   }
920 
921 finish:
922   _jb_exec_scan_release(&ctx);
923   API_COLL_UNLOCK(ctx.jbc, rci, rc);
924   jql_reset(ux->q, true, false);
925   return rc;
926 }
927 
928 struct JB_LIST_VISITOR_CTX {
929   EJDB_DOC head;
930   EJDB_DOC tail;
931 };
932 
_jb_exec_list_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)933 static iwrc _jb_exec_list_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
934   struct JB_LIST_VISITOR_CTX *lvc = ctx->opaque;
935   IWPOOL *pool = ctx->pool;
936   struct _EJDB_DOC *ndoc = iwpool_alloc(sizeof(*ndoc) + sizeof(*doc->raw) + doc->raw->bn.size, pool);
937   if (!ndoc) {
938     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
939   }
940   ndoc->id = doc->id;
941   ndoc->raw = (void*) (((uint8_t*) ndoc) + sizeof(*ndoc));
942   ndoc->raw->node = 0;
943   ndoc->node = doc->node;
944   ndoc->next = 0;
945   ndoc->prev = 0;
946   memcpy(&ndoc->raw->bn, &doc->raw->bn, sizeof(ndoc->raw->bn));
947   ndoc->raw->bn.ptr = ((uint8_t*) ndoc) + sizeof(*ndoc) + sizeof(*doc->raw);
948   memcpy(ndoc->raw->bn.ptr, doc->raw->bn.ptr, doc->raw->bn.size);
949 
950   if (!lvc->head) {
951     lvc->head = ndoc;
952     lvc->tail = ndoc;
953   } else {
954     lvc->tail->next = ndoc;
955     ndoc->prev = lvc->tail;
956     lvc->tail = ndoc;
957   }
958   return 0;
959 }
960 
_jb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWXSTR * log,IWPOOL * pool)961 static iwrc _jb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWXSTR *log, IWPOOL *pool) {
962   if (!db || !q || !first || !pool) {
963     return IW_ERROR_INVALID_ARGS;
964   }
965   iwrc rc = 0;
966   struct JB_LIST_VISITOR_CTX lvc = { 0 };
967   struct _EJDB_EXEC ux = {
968     .db      = db,
969     .q       = q,
970     .visitor = _jb_exec_list_visitor,
971     .pool    = pool,
972     .limit   = limit,
973     .log     = log,
974     .opaque  = &lvc
975   };
976   rc = ejdb_exec(&ux);
977   if (rc) {
978     *first = 0;
979   } else {
980     *first = lvc.head;
981   }
982   return rc;
983 }
984 
_jb_count(EJDB db,JQL q,int64_t * count,int64_t limit,IWXSTR * log)985 static iwrc _jb_count(EJDB db, JQL q, int64_t *count, int64_t limit, IWXSTR *log) {
986   if (!db || !q || !count) {
987     return IW_ERROR_INVALID_ARGS;
988   }
989   struct _EJDB_EXEC ux = {
990     .db    = db,
991     .q     = q,
992     .limit = limit,
993     .log   = log
994   };
995   iwrc rc = ejdb_exec(&ux);
996   *count = ux.cnt;
997   return rc;
998 }
999 
ejdb_count(EJDB db,JQL q,int64_t * count,int64_t limit)1000 iwrc ejdb_count(EJDB db, JQL q, int64_t *count, int64_t limit) {
1001   return _jb_count(db, q, count, limit, 0);
1002 }
1003 
ejdb_count2(EJDB db,const char * coll,const char * q,int64_t * count,int64_t limit)1004 iwrc ejdb_count2(EJDB db, const char *coll, const char *q, int64_t *count, int64_t limit) {
1005   JQL jql;
1006   iwrc rc = jql_create(&jql, coll, q);
1007   RCRET(rc);
1008   rc = _jb_count(db, jql, count, limit, 0);
1009   jql_destroy(&jql);
1010   return rc;
1011 }
1012 
ejdb_update(EJDB db,JQL q)1013 iwrc ejdb_update(EJDB db, JQL q) {
1014   int64_t count;
1015   return ejdb_count(db, q, &count, 0);
1016 }
1017 
ejdb_update2(EJDB db,const char * coll,const char * q)1018 iwrc ejdb_update2(EJDB db, const char *coll, const char *q) {
1019   int64_t count;
1020   return ejdb_count2(db, coll, q, &count, 0);
1021 }
1022 
ejdb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWPOOL * pool)1023 iwrc ejdb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWPOOL *pool) {
1024   return _jb_list(db, q, first, limit, 0, pool);
1025 }
1026 
ejdb_list3(EJDB db,const char * coll,const char * query,int64_t limit,IWXSTR * log,EJDB_LIST * listp)1027 iwrc ejdb_list3(EJDB db, const char *coll, const char *query, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
1028   if (!listp) {
1029     return IW_ERROR_INVALID_ARGS;
1030   }
1031   iwrc rc = 0;
1032   *listp = 0;
1033   IWPOOL *pool = iwpool_create(1024);
1034   if (!pool) {
1035     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1036   }
1037   EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
1038   if (!list) {
1039     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1040     goto finish;
1041   }
1042   list->first = 0;
1043   list->db = db;
1044   list->pool = pool;
1045   rc = jql_create(&list->q, coll, query);
1046   RCGO(rc, finish);
1047   rc = _jb_list(db, list->q, &list->first, limit, log, list->pool);
1048 
1049 finish:
1050   if (rc) {
1051     iwpool_destroy(pool);
1052   } else {
1053     *listp = list;
1054   }
1055   return rc;
1056 }
1057 
ejdb_list4(EJDB db,JQL q,int64_t limit,IWXSTR * log,EJDB_LIST * listp)1058 iwrc ejdb_list4(EJDB db, JQL q, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
1059   if (!listp) {
1060     return IW_ERROR_INVALID_ARGS;
1061   }
1062   iwrc rc = 0;
1063   *listp = 0;
1064   IWPOOL *pool = iwpool_create(1024);
1065   if (!pool) {
1066     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1067   }
1068   EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
1069   if (!list) {
1070     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1071     goto finish;
1072   }
1073   list->q = 0;
1074   list->first = 0;
1075   list->db = db;
1076   list->pool = pool;
1077   rc = _jb_list(db, q, &list->first, limit, log, list->pool);
1078 
1079 finish:
1080   if (rc) {
1081     iwpool_destroy(pool);
1082   } else {
1083     *listp = list;
1084   }
1085   return rc;
1086 }
1087 
ejdb_list2(EJDB db,const char * coll,const char * query,int64_t limit,EJDB_LIST * listp)1088 iwrc ejdb_list2(EJDB db, const char *coll, const char *query, int64_t limit, EJDB_LIST *listp) {
1089   return ejdb_list3(db, coll, query, limit, 0, listp);
1090 }
1091 
ejdb_list_destroy(EJDB_LIST * listp)1092 void ejdb_list_destroy(EJDB_LIST *listp) {
1093   if (listp) {
1094     EJDB_LIST list = *listp;
1095     if (list) {
1096       if (list->q) {
1097         jql_destroy(&list->q);
1098       }
1099       if (list->pool) {
1100         iwpool_destroy(list->pool);
1101       }
1102     }
1103     *listp = 0;
1104   }
1105 }
1106 
ejdb_remove_index(EJDB db,const char * coll,const char * path,ejdb_idx_mode_t mode)1107 iwrc ejdb_remove_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
1108   if (!db || !coll || !path) {
1109     return IW_ERROR_INVALID_ARGS;
1110   }
1111   int rci;
1112   JBCOLL jbc;
1113   IWKV_val key;
1114   JBL_PTR ptr = 0;
1115   char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * JBNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1116 
1117   iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
1118   RCRET(rc);
1119 
1120   rc = jbl_ptr_alloc(path, &ptr);
1121   RCGO(rc, finish);
1122 
1123   for (JBIDX idx = jbc->idx, prev = 0; idx; idx = idx->next) {
1124     if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
1125       key.data = keybuf;
1126       key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1127       if (key.size >= sizeof(keybuf)) {
1128         rc = IW_ERROR_OVERFLOW;
1129         goto finish;
1130       }
1131       rc = iwkv_del(db->metadb, &key, 0);
1132       RCGO(rc, finish);
1133       _jb_meta_nrecs_removedb(db, idx->dbid);
1134       if (prev) {
1135         prev->next = idx->next;
1136       } else {
1137         jbc->idx = idx->next;
1138       }
1139       if (idx->idb) {
1140         iwkv_db_destroy(&idx->idb);
1141       }
1142       _jb_idx_release(idx);
1143       break;
1144     }
1145     prev = idx;
1146   }
1147 
1148 finish:
1149   free(ptr);
1150   API_COLL_UNLOCK(jbc, rci, rc);
1151   return rc;
1152 }
1153 
ejdb_ensure_index(EJDB db,const char * coll,const char * path,ejdb_idx_mode_t mode)1154 iwrc ejdb_ensure_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
1155   if (!db || !coll || !path) {
1156     return IW_ERROR_INVALID_ARGS;
1157   }
1158   int rci;
1159   JBCOLL jbc;
1160   IWKV_val key, val;
1161   char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * JBNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1162 
1163   JBIDX idx = 0;
1164   JBL_PTR ptr = 0;
1165   binn *imeta = 0;
1166 
1167   switch (mode & (EJDB_IDX_STR | EJDB_IDX_I64 | EJDB_IDX_F64)) {
1168     case EJDB_IDX_STR:
1169     case EJDB_IDX_I64:
1170     case EJDB_IDX_F64:
1171       break;
1172     default:
1173       return EJDB_ERROR_INVALID_INDEX_MODE;
1174   }
1175 
1176   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1177   RCRET(rc);
1178   rc = jbl_ptr_alloc(path, &ptr);
1179   RCGO(rc, finish);
1180 
1181   for (idx = jbc->idx; idx; idx = idx->next) {
1182     if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
1183       if (idx->mode != mode) {
1184         rc = EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE;
1185         idx = 0;
1186       }
1187       goto finish;
1188     }
1189   }
1190 
1191   idx = calloc(1, sizeof(*idx));
1192   if (!idx) {
1193     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1194     goto finish;
1195   }
1196   idx->mode = mode;
1197   idx->jbc = jbc;
1198   idx->ptr = ptr;
1199   ptr = 0;
1200   idx->idbf = 0;
1201   if (mode & EJDB_IDX_I64) {
1202     idx->idbf |= IWDB_VNUM64_KEYS;
1203   } else if (mode & EJDB_IDX_F64) {
1204     idx->idbf |= IWDB_REALNUM_KEYS;
1205   }
1206   if (!(mode & EJDB_IDX_UNIQUE)) {
1207     idx->idbf |= IWDB_COMPOUND_KEYS;
1208   }
1209   rc = iwkv_new_db(db->iwkv, idx->idbf, &idx->dbid, &idx->idb);
1210   RCGO(rc, finish);
1211 
1212   rc = _jb_idx_fill(idx);
1213   RCGO(rc, finish);
1214 
1215   // save index meta into metadb
1216   imeta = binn_object();
1217   if (!imeta) {
1218     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1219     goto finish;
1220   }
1221 
1222   if (  !binn_object_set_str(imeta, "ptr", path)
1223      || !binn_object_set_uint32(imeta, "mode", idx->mode)
1224      || !binn_object_set_uint32(imeta, "idbf", idx->idbf)
1225      || !binn_object_set_uint32(imeta, "dbid", idx->dbid)) {
1226     rc = JBL_ERROR_CREATION;
1227     goto finish;
1228   }
1229 
1230   key.data = keybuf;
1231   // Full key format: i.<coldbid>.<idxdbid>
1232   key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1233   if (key.size >= sizeof(keybuf)) {
1234     rc = IW_ERROR_OVERFLOW;
1235     goto finish;
1236   }
1237   val.data = binn_ptr(imeta);
1238   val.size = binn_size(imeta);
1239   rc = iwkv_put(db->metadb, &key, &val, 0);
1240   RCGO(rc, finish);
1241 
1242   idx->next = jbc->idx;
1243   jbc->idx = idx;
1244 
1245 finish:
1246   if (rc) {
1247     if (idx) {
1248       if (idx->idb) {
1249         iwkv_db_destroy(&idx->idb);
1250         idx->idb = 0;
1251       }
1252       _jb_idx_release(idx);
1253     }
1254   }
1255   free(ptr);
1256   binn_free(imeta);
1257   API_COLL_UNLOCK(jbc, rci, rc);
1258   return rc;
1259 }
1260 
_jb_patch(EJDB db,const char * coll,int64_t id,bool upsert,const char * patchjson,JBL_NODE patchjbn,JBL patchjbl)1261 static iwrc _jb_patch(
1262   EJDB db, const char *coll, int64_t id, bool upsert,
1263   const char *patchjson, JBL_NODE patchjbn, JBL patchjbl) {
1264 
1265   int rci;
1266   JBCOLL jbc;
1267   struct _JBL sjbl;
1268   JBL_NODE root, patch;
1269   JBL ujbl = 0;
1270   IWPOOL *pool = 0;
1271   IWKV_val val = { 0 };
1272   IWKV_val key = {
1273     .data = &id,
1274     .size = sizeof(id)
1275   };
1276 
1277   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1278   RCGO(rc, finish);
1279 
1280   rc = iwkv_get(jbc->cdb, &key, &val);
1281   if (upsert && (rc == IWKV_ERROR_NOTFOUND)) {
1282     if (patchjson) {
1283       rc = jbl_from_json(&ujbl, patchjson);
1284     } else if (patchjbl) {
1285       ujbl = patchjbl;
1286     } else if (patchjbn) {
1287       rc = jbl_from_node(&ujbl, patchjbn);
1288     } else {
1289       rc = IW_ERROR_INVALID_ARGS;
1290     }
1291     RCGO(rc, finish);
1292     if (jbl_type(ujbl) != JBV_OBJECT) {
1293       rc = EJDB_ERROR_PATCH_JSON_NOT_OBJECT;
1294       goto finish;
1295     }
1296     rc = _jb_put_impl(jbc, ujbl, id);
1297     if (!rc && (jbc->id_seq < id)) {
1298       jbc->id_seq = id;
1299     }
1300     goto finish;
1301   } else {
1302     RCGO(rc, finish);
1303   }
1304 
1305   rc = jbl_from_buf_keep_onstack(&sjbl, val.data, val.size);
1306   RCGO(rc, finish);
1307 
1308   pool = iwpool_create_empty();
1309   if (!pool) {
1310     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1311     goto finish;
1312   }
1313 
1314   rc = jbl_to_node(&sjbl, &root, false, pool);
1315   RCGO(rc, finish);
1316 
1317   if (patchjson) {
1318     rc = jbn_from_json(patchjson, &patch, pool);
1319   } else if (patchjbl) {
1320     rc = jbl_to_node(patchjbl, &patch, false, pool);
1321   } else if (patchjbn) {
1322     patch = patchjbn;
1323   } else {
1324     rc = IW_ERROR_INVALID_ARGS;
1325   }
1326   RCGO(rc, finish);
1327 
1328   rc = jbn_patch_auto(root, patch, pool);
1329   RCGO(rc, finish);
1330 
1331   if (root->type == JBV_OBJECT) {
1332     rc = jbl_create_empty_object(&ujbl);
1333     RCGO(rc, finish);
1334   } else if (root->type == JBV_ARRAY) {
1335     rc = jbl_create_empty_array(&ujbl);
1336     RCGO(rc, finish);
1337   } else {
1338     rc = JBL_ERROR_CREATION;
1339     goto finish;
1340   }
1341   rc = jbl_fill_from_node(ujbl, root);
1342   RCGO(rc, finish);
1343 
1344   rc = _jb_put_impl(jbc, ujbl, id);
1345 
1346 finish:
1347   API_COLL_UNLOCK(jbc, rci, rc);
1348   if (ujbl != patchjbl) {
1349     jbl_destroy(&ujbl);
1350   }
1351   if (val.data) {
1352     iwkv_val_dispose(&val);
1353   }
1354   iwpool_destroy(pool);
1355   return rc;
1356 }
1357 
_jb_wal_lock_interceptor(bool before,void * op)1358 static iwrc _jb_wal_lock_interceptor(bool before, void *op) {
1359   int rci;
1360   iwrc rc = 0;
1361   EJDB db = op;
1362   assert(db);
1363   if (before) {
1364     API_WLOCK2(db, rci);
1365   } else {
1366     API_UNLOCK(db, rci, rc);
1367   }
1368   return rc;
1369 }
1370 
ejdb_patch(EJDB db,const char * coll,const char * patchjson,int64_t id)1371 iwrc ejdb_patch(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1372   return _jb_patch(db, coll, id, false, patchjson, 0, 0);
1373 }
1374 
ejdb_patch_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1375 iwrc ejdb_patch_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1376   return _jb_patch(db, coll, id, false, 0, patch, 0);
1377 }
1378 
ejdb_patch_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1379 iwrc ejdb_patch_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1380   return _jb_patch(db, coll, id, false, 0, 0, patch);
1381 }
1382 
ejdb_merge_or_put(EJDB db,const char * coll,const char * patchjson,int64_t id)1383 iwrc ejdb_merge_or_put(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1384   return _jb_patch(db, coll, id, true, patchjson, 0, 0);
1385 }
1386 
ejdb_merge_or_put_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1387 iwrc ejdb_merge_or_put_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1388   return _jb_patch(db, coll, id, true, 0, patch, 0);
1389 }
1390 
ejdb_merge_or_put_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1391 iwrc ejdb_merge_or_put_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1392   return _jb_patch(db, coll, id, true, 0, 0, patch);
1393 }
1394 
ejdb_put(EJDB db,const char * coll,JBL jbl,int64_t id)1395 iwrc ejdb_put(EJDB db, const char *coll, JBL jbl, int64_t id) {
1396   if (!jbl) {
1397     return IW_ERROR_INVALID_ARGS;
1398   }
1399   int rci;
1400   JBCOLL jbc;
1401   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1402   RCRET(rc);
1403   rc = _jb_put_impl(jbc, jbl, id);
1404   if (!rc && (jbc->id_seq < id)) {
1405     jbc->id_seq = id;
1406   }
1407   API_COLL_UNLOCK(jbc, rci, rc);
1408   return rc;
1409 }
1410 
_jb_put_new_lw(JBCOLL jbc,JBL jbl,int64_t * id)1411 static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id) {
1412   iwrc rc = 0;
1413   int64_t oid = jbc->id_seq + 1;
1414   IWKV_val val, key = {
1415     .data = &oid,
1416     .size = sizeof(oid)
1417   };
1418   struct _JBPHCTX pctx = {
1419     .id  = oid,
1420     .jbc = jbc,
1421     .jbl = jbl
1422   };
1423 
1424   RCC(rc, finish, jbl_as_buf(jbl, &val.data, &val.size));
1425   RCC(rc, finish, _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx));
1426 
1427   jbc->id_seq = oid;
1428   if (id) {
1429     *id = oid;
1430   }
1431 
1432 finish:
1433   return rc;
1434 }
1435 
ejdb_put_new(EJDB db,const char * coll,JBL jbl,int64_t * id)1436 iwrc ejdb_put_new(EJDB db, const char *coll, JBL jbl, int64_t *id) {
1437   if (!jbl) {
1438     return IW_ERROR_INVALID_ARGS;
1439   }
1440   int rci;
1441   JBCOLL jbc;
1442   if (id) {
1443     *id = 0;
1444   }
1445   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1446   RCRET(rc);
1447 
1448   rc = _jb_put_new_lw(jbc, jbl, id);
1449 
1450   API_COLL_UNLOCK(jbc, rci, rc);
1451   return rc;
1452 }
1453 
ejdb_put_new_jbn(EJDB db,const char * coll,JBL_NODE jbn,int64_t * id)1454 iwrc ejdb_put_new_jbn(EJDB db, const char *coll, JBL_NODE jbn, int64_t *id) {
1455   JBL jbl = 0;
1456   iwrc rc = jbl_from_node(&jbl, jbn);
1457   RCRET(rc);
1458   rc = ejdb_put_new(db, coll, jbl, id);
1459   jbl_destroy(&jbl);
1460   return rc;
1461 }
1462 
jb_get(EJDB db,const char * coll,int64_t id,jb_coll_acquire_t acm,JBL * jblp)1463 iwrc jb_get(EJDB db, const char *coll, int64_t id, jb_coll_acquire_t acm, JBL *jblp) {
1464   if (!id || !jblp) {
1465     return IW_ERROR_INVALID_ARGS;
1466   }
1467   *jblp = 0;
1468   int rci;
1469   JBCOLL jbc;
1470   JBL jbl = 0;
1471   IWKV_val val = { 0 };
1472   IWKV_val key = { .data = &id, .size = sizeof(id) };
1473   iwrc rc = _jb_coll_acquire_keeplock2(db, coll, acm, &jbc);
1474   RCRET(rc);
1475 
1476   rc = iwkv_get(jbc->cdb, &key, &val);
1477   RCGO(rc, finish);
1478   rc = jbl_from_buf_keep(&jbl, val.data, val.size, false);
1479   RCGO(rc, finish);
1480   *jblp = jbl;
1481 
1482 finish:
1483   if (rc) {
1484     if (jbl) {
1485       jbl_destroy(&jbl);
1486     } else {
1487       iwkv_val_dispose(&val);
1488     }
1489   }
1490   API_COLL_UNLOCK(jbc, rci, rc);
1491   return rc;
1492 }
1493 
ejdb_get(EJDB db,const char * coll,int64_t id,JBL * jblp)1494 iwrc ejdb_get(EJDB db, const char *coll, int64_t id, JBL *jblp) {
1495   return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, jblp);
1496 }
1497 
ejdb_del(EJDB db,const char * coll,int64_t id)1498 iwrc ejdb_del(EJDB db, const char *coll, int64_t id) {
1499   int rci;
1500   JBCOLL jbc;
1501   struct _JBL jbl;
1502   IWKV_val val = { 0 };
1503   IWKV_val key = { .data = &id, .size = sizeof(id) };
1504   iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
1505   RCRET(rc);
1506 
1507   rc = iwkv_get(jbc->cdb, &key, &val);
1508   RCGO(rc, finish);
1509 
1510   rc = jbl_from_buf_keep_onstack(&jbl, val.data, val.size);
1511   RCGO(rc, finish);
1512 
1513   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1514     IWRC(_jb_idx_record_remove(idx, id, &jbl), rc);
1515   }
1516   rc = iwkv_del(jbc->cdb, &key, 0);
1517   RCGO(rc, finish);
1518   _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1519   jbc->rnum -= 1;
1520 
1521 finish:
1522   if (val.data) {
1523     iwkv_val_dispose(&val);
1524   }
1525   API_COLL_UNLOCK(jbc, rci, rc);
1526   return rc;
1527 }
1528 
jb_del(JBCOLL jbc,JBL jbl,int64_t id)1529 iwrc jb_del(JBCOLL jbc, JBL jbl, int64_t id) {
1530   iwrc rc = 0;
1531   IWKV_val key = { .data = &id, .size = sizeof(id) };
1532   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1533     IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1534   }
1535   rc = iwkv_del(jbc->cdb, &key, 0);
1536   RCRET(rc);
1537   _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1538   jbc->rnum -= 1;
1539   return rc;
1540 }
1541 
jb_cursor_del(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)1542 iwrc jb_cursor_del(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
1543   iwrc rc = 0;
1544   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1545     IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1546   }
1547   rc = iwkv_cursor_del(cur, 0);
1548   RCRET(rc);
1549   _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1550   jbc->rnum -= 1;
1551   return rc;
1552 }
1553 
ejdb_ensure_collection(EJDB db,const char * coll)1554 iwrc ejdb_ensure_collection(EJDB db, const char *coll) {
1555   int rci;
1556   JBCOLL jbc;
1557   iwrc rc = _jb_coll_acquire_keeplock(db, coll, false, &jbc);
1558   RCRET(rc);
1559   API_COLL_UNLOCK(jbc, rci, rc);
1560   return rc;
1561 }
1562 
ejdb_remove_collection(EJDB db,const char * coll)1563 iwrc ejdb_remove_collection(EJDB db, const char *coll) {
1564   int rci;
1565   iwrc rc = 0;
1566   if (db->oflags & IWKV_RDONLY) {
1567     return IW_ERROR_READONLY;
1568   }
1569   API_WLOCK(db, rci);
1570   JBCOLL jbc;
1571   IWKV_val key;
1572   char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * JBNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1573   khiter_t k = kh_get(JBCOLLM, db->mcolls, coll);
1574 
1575   if (k != kh_end(db->mcolls)) {
1576 
1577     jbc = kh_value(db->mcolls, k);
1578     key.data = keybuf;
1579     key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
1580     rc = iwkv_del(jbc->db->metadb, &key, IWKV_SYNC);
1581     RCGO(rc, finish);
1582 
1583     _jb_meta_nrecs_removedb(db, jbc->dbid);
1584 
1585     for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1586       key.data = keybuf;
1587       key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1588       rc = iwkv_del(jbc->db->metadb, &key, 0);
1589       RCGO(rc, finish);
1590       _jb_meta_nrecs_removedb(db, idx->dbid);
1591     }
1592     for (JBIDX idx = jbc->idx, nidx; idx; idx = nidx) {
1593       IWRC(iwkv_db_destroy(&idx->idb), rc);
1594       idx->idb = 0;
1595       nidx = idx->next;
1596       _jb_idx_release(idx);
1597     }
1598     jbc->idx = 0;
1599     IWRC(iwkv_db_destroy(&jbc->cdb), rc);
1600     kh_del(JBCOLLM, db->mcolls, k);
1601     _jb_coll_release(jbc);
1602   }
1603 
1604 finish:
1605   API_UNLOCK(db, rci, rc);
1606   return rc;
1607 }
1608 
jb_collection_join_resolver(int64_t id,const char * coll,JBL * out,JBEXEC * ctx)1609 iwrc jb_collection_join_resolver(int64_t id, const char *coll, JBL *out, JBEXEC *ctx) {
1610   assert(out && ctx && coll);
1611   EJDB db = ctx->jbc->db;
1612   return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, out);
1613 }
1614 
jb_proj_node_cache_cmp(const void * v1,const void * v2)1615 int jb_proj_node_cache_cmp(const void *v1, const void *v2) {
1616   const struct _JBDOCREF *r1 = v1;
1617   const struct _JBDOCREF *r2 = v2;
1618   int ret = r1->id > r2->id ? 1 : r1->id < r2->id ? -1 : 0;
1619   if (!ret) {
1620     return strcmp(r1->coll, r2->coll);
1621     ;
1622   }
1623   return ret;
1624 }
1625 
jb_proj_node_kvfree(void * key,void * val)1626 void jb_proj_node_kvfree(void *key, void *val) {
1627   free(key);
1628 }
1629 
ejdb_rename_collection(EJDB db,const char * coll,const char * new_coll)1630 iwrc ejdb_rename_collection(EJDB db, const char *coll, const char *new_coll) {
1631   if (!coll || !new_coll) {
1632     return IW_ERROR_INVALID_ARGS;
1633   }
1634   int rci;
1635   iwrc rc = 0;
1636   if (db->oflags & IWKV_RDONLY) {
1637     return IW_ERROR_READONLY;
1638   }
1639   IWKV_val key, val;
1640   JBL nmeta = 0, jbv = 0;
1641   char keybuf[JBNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];
1642 
1643   API_WLOCK(db, rci);
1644 
1645   khiter_t k = kh_get(JBCOLLM, db->mcolls, coll);
1646   if (k == kh_end(db->mcolls)) {
1647     rc = EJDB_ERROR_COLLECTION_NOT_FOUND;
1648     goto finish;
1649   }
1650   khiter_t k2 = kh_get(JBCOLLM, db->mcolls, new_coll);
1651   if (k2 != kh_end(db->mcolls)) {
1652     rc = EJDB_ERROR_TARGET_COLLECTION_EXISTS;
1653     goto finish;
1654   }
1655 
1656   JBCOLL jbc = kh_value(db->mcolls, k);
1657 
1658   rc = jbl_create_empty_object(&nmeta);
1659   RCGO(rc, finish);
1660 
1661   if (!binn_object_set_str(&nmeta->bn, "name", new_coll)) {
1662     rc = JBL_ERROR_CREATION;
1663     goto finish;
1664   }
1665   if (!binn_object_set_uint32(&nmeta->bn, "id", jbc->dbid)) {
1666     rc = JBL_ERROR_CREATION;
1667     goto finish;
1668   }
1669 
1670   rc = jbl_as_buf(nmeta, &val.data, &val.size);
1671   RCGO(rc, finish);
1672   key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
1673   if (key.size >= sizeof(keybuf)) {
1674     rc = IW_ERROR_OVERFLOW;
1675     goto finish;
1676   }
1677   key.data = keybuf;
1678 
1679   rc = jbl_at(nmeta, "/name", &jbv);
1680   RCGO(rc, finish);
1681 
1682   const char *new_name = jbl_get_str(jbv);
1683 
1684   rc = iwkv_put(db->metadb, &key, &val, IWKV_SYNC);
1685   RCGO(rc, finish);
1686 
1687   kh_del(JBCOLLM, db->mcolls, k);
1688   k2 = kh_put(JBCOLLM, db->mcolls, new_name, &rci);
1689   if (rci != -1) {
1690     kh_value(db->mcolls, k2) = jbc;
1691   } else {
1692     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1693     goto finish;
1694   }
1695 
1696   jbc->name = new_name;
1697   jbl_destroy(&jbc->meta);
1698   jbc->meta = nmeta;
1699 
1700 finish:
1701   if (jbv) {
1702     jbl_destroy(&jbv);
1703   }
1704   if (rc) {
1705     if (nmeta) {
1706       jbl_destroy(&nmeta);
1707     }
1708   }
1709   API_UNLOCK(db, rci, rc);
1710   return rc;
1711 }
1712 
ejdb_get_meta(EJDB db,JBL * jblp)1713 iwrc ejdb_get_meta(EJDB db, JBL *jblp) {
1714   int rci;
1715   *jblp = 0;
1716   JBL jbl;
1717   iwrc rc = jbl_create_empty_object(&jbl);
1718   RCRET(rc);
1719   binn *clist = 0;
1720   API_RLOCK(db, rci);
1721   if (!binn_object_set_str(&jbl->bn, "version", ejdb_version_full())) {
1722     rc = JBL_ERROR_CREATION;
1723     goto finish;
1724   }
1725   IWFS_FSM_STATE sfsm;
1726   rc = iwkv_state(db->iwkv, &sfsm);
1727   RCRET(rc);
1728   if (  !binn_object_set_str(&jbl->bn, "file", sfsm.exfile.file.opts.path)
1729      || !binn_object_set_int64(&jbl->bn, "size", sfsm.exfile.fsize)) {
1730     rc = JBL_ERROR_CREATION;
1731     goto finish;
1732   }
1733   clist = binn_list();
1734   if (!clist) {
1735     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1736     goto finish;
1737   }
1738   for (khiter_t k = kh_begin(db->mcolls); k != kh_end(db->mcolls); ++k) {
1739     if (!kh_exist(db->mcolls, k)) {
1740       continue;
1741     }
1742     JBCOLL jbc = kh_val(db->mcolls, k);
1743     rc = _jb_coll_add_meta_lr(jbc, clist);
1744     RCGO(rc, finish);
1745   }
1746   if (!binn_object_set_list(&jbl->bn, "collections", clist)) {
1747     rc = JBL_ERROR_CREATION;
1748     goto finish;
1749   }
1750   binn_free(clist);
1751   clist = 0;
1752 
1753 finish:
1754   API_UNLOCK(db, rci, rc);
1755   if (rc) {
1756     if (clist) {
1757       binn_free(clist);
1758     }
1759     jbl_destroy(&jbl);
1760   } else {
1761     *jblp = jbl;
1762   }
1763   return rc;
1764 }
1765 
ejdb_online_backup(EJDB db,uint64_t * ts,const char * target_file)1766 iwrc ejdb_online_backup(EJDB db, uint64_t *ts, const char *target_file) {
1767   ENSURE_OPEN(db);
1768   return iwkv_online_backup(db->iwkv, ts, target_file);
1769 }
1770 
ejdb_get_iwkv(EJDB db,IWKV * kvp)1771 iwrc ejdb_get_iwkv(EJDB db, IWKV *kvp) {
1772   if (!db || !kvp) {
1773     return IW_ERROR_INVALID_ARGS;
1774   }
1775   *kvp = db->iwkv;
1776   return 0;
1777 }
1778 
ejdb_open(const EJDB_OPTS * _opts,EJDB * ejdbp)1779 iwrc ejdb_open(const EJDB_OPTS *_opts, EJDB *ejdbp) {
1780   *ejdbp = 0;
1781   int rci;
1782   iwrc rc = ejdb_init();
1783   RCRET(rc);
1784   if (!_opts || !_opts->kv.path || !ejdbp) {
1785     return IW_ERROR_INVALID_ARGS;
1786   }
1787 
1788   EJDB db = calloc(1, sizeof(*db));
1789   if (!db) {
1790     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1791   }
1792 
1793   memcpy(&db->opts, _opts, sizeof(db->opts));
1794   if (!db->opts.sort_buffer_sz) {
1795     db->opts.sort_buffer_sz = 16 * 1024 * 1024; // 16Mb
1796   }
1797   if (db->opts.sort_buffer_sz < 1024 * 1024) { // Min 1Mb
1798     db->opts.sort_buffer_sz = 1024 * 1024;
1799   }
1800   if (!db->opts.document_buffer_sz) { // 64Kb
1801     db->opts.document_buffer_sz = 64 * 1024;
1802   }
1803   if (db->opts.document_buffer_sz < 16 * 1024) { // Min 16Kb
1804     db->opts.document_buffer_sz = 16 * 1024;
1805   }
1806   EJDB_HTTP *http = &db->opts.http;
1807   if (http->bind) {
1808     http->bind = strdup(http->bind);
1809   }
1810   if (http->access_token) {
1811     http->access_token = strdup(http->access_token);
1812     if (!http->access_token) {
1813       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1814     }
1815     http->access_token_len = strlen(http->access_token);
1816   }
1817 
1818   pthread_rwlockattr_t attr;
1819   pthread_rwlockattr_init(&attr);
1820 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
1821   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
1822 #endif
1823   rci = pthread_rwlock_init(&db->rwl, &attr);
1824   if (rci) {
1825     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1826     free(db);
1827     return rc;
1828   }
1829   db->mcolls = kh_init(JBCOLLM);
1830   if (!db->mcolls) {
1831     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1832     goto finish;
1833   }
1834 
1835   IWKV_OPTS kvopts;
1836   memcpy(&kvopts, &db->opts.kv, sizeof(db->opts.kv));
1837   kvopts.wal.enabled = !db->opts.no_wal;
1838   kvopts.wal.wal_lock_interceptor = _jb_wal_lock_interceptor;
1839   kvopts.wal.wal_lock_interceptor_opaque = db;
1840 
1841   rc = iwkv_open(&kvopts, &db->iwkv);
1842   RCGO(rc, finish);
1843 
1844   db->oflags = kvopts.oflags;
1845   rc = _jb_db_meta_load(db);
1846   RCGO(rc, finish);
1847 
1848   if (db->opts.http.enabled) {
1849     // Maximum WS/HTTP API body size. Default: 64Mb, Min: 512K
1850     if (!db->opts.http.max_body_size) {
1851       db->opts.http.max_body_size = 64 * 1024 * 1024;
1852     } else if (db->opts.http.max_body_size < 512 * 1024) {
1853       db->opts.http.max_body_size = 512 * 1024;
1854     }
1855   }
1856 
1857 #ifdef JB_HTTP
1858   if (db->opts.http.enabled && !db->opts.http.blocking) {
1859     rc = jbr_start(db, &db->opts, &db->jbr);
1860     RCGO(rc, finish);
1861   }
1862 #endif
1863 
1864 finish:
1865   if (rc) {
1866     _jb_db_release(&db);
1867   } else {
1868     db->open = true;
1869     *ejdbp = db;
1870 #ifdef JB_HTTP
1871     if (db->opts.http.enabled && db->opts.http.blocking) {
1872       rc = jbr_start(db, &db->opts, &db->jbr);
1873     }
1874 #endif
1875   }
1876   return rc;
1877 }
1878 
ejdb_close(EJDB * ejdbp)1879 iwrc ejdb_close(EJDB *ejdbp) {
1880   if (!ejdbp || !*ejdbp) {
1881     return IW_ERROR_INVALID_ARGS;
1882   }
1883   EJDB db = *ejdbp;
1884   if (!__sync_bool_compare_and_swap(&db->open, 1, 0)) {
1885     iwlog_error2("Database is closed already");
1886     return IW_ERROR_INVALID_STATE;
1887   }
1888   iwrc rc = _jb_db_release(ejdbp);
1889   return rc;
1890 }
1891 
ejdb_git_revision(void)1892 const char *ejdb_git_revision(void) {
1893   return EJDB2_GIT_REVISION;
1894 }
1895 
ejdb_version_full(void)1896 const char *ejdb_version_full(void) {
1897   return EJDB2_VERSION;
1898 }
1899 
ejdb_version_major(void)1900 unsigned int ejdb_version_major(void) {
1901   return EJDB2_VERSION_MAJOR;
1902 }
1903 
ejdb_version_minor(void)1904 unsigned int ejdb_version_minor(void) {
1905   return EJDB2_VERSION_MINOR;
1906 }
1907 
ejdb_version_patch(void)1908 unsigned int ejdb_version_patch(void) {
1909   return EJDB2_VERSION_PATCH;
1910 }
1911 
_ejdb_ecodefn(locale_t locale,uint32_t ecode)1912 static const char *_ejdb_ecodefn(locale_t locale, uint32_t ecode) {
1913   if (!((ecode > _EJDB_ERROR_START) && (ecode < _EJDB_ERROR_END))) {
1914     return 0;
1915   }
1916   switch (ecode) {
1917     case EJDB_ERROR_INVALID_COLLECTION_META:
1918       return "Invalid collection metadata (EJDB_ERROR_INVALID_COLLECTION_META)";
1919     case EJDB_ERROR_INVALID_COLLECTION_INDEX_META:
1920       return "Invalid collection index metadata (EJDB_ERROR_INVALID_COLLECTION_INDEX_META)";
1921     case EJDB_ERROR_INVALID_INDEX_MODE:
1922       return "Invalid index mode specified (EJDB_ERROR_INVALID_INDEX_MODE)";
1923     case EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE:
1924       return "Index exists but mismatched uniqueness constraint (EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE)";
1925     case EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED:
1926       return "Unique index constraint violated (EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED)";
1927     case EJDB_ERROR_INVALID_COLLECTION_NAME:
1928       return "Invalid collection name (EJDB_ERROR_INVALID_COLLECTION_NAME)";
1929     case EJDB_ERROR_COLLECTION_NOT_FOUND:
1930       return "Collection not found (EJDB_ERROR_COLLECTION_NOT_FOUND)";
1931     case EJDB_ERROR_TARGET_COLLECTION_EXISTS:
1932       return "Target collection exists (EJDB_ERROR_TARGET_COLLECTION_EXISTS)";
1933     case EJDB_ERROR_PATCH_JSON_NOT_OBJECT:
1934       return "Patch JSON must be an object (map) (EJDB_ERROR_PATCH_JSON_NOT_OBJECT)";
1935   }
1936   return 0;
1937 }
1938 
ejdb_init()1939 iwrc ejdb_init() {
1940   static volatile int jb_initialized = 0;
1941   if (!__sync_bool_compare_and_swap(&jb_initialized, 0, 1)) {
1942     return 0;  // initialized already
1943   }
1944   iwrc rc = iw_init();
1945   RCRET(rc);
1946   rc = jbl_init();
1947   RCRET(rc);
1948   rc = jql_init();
1949   RCRET(rc);
1950 #ifdef JB_HTTP
1951   rc = jbr_init();
1952   RCRET(rc);
1953 #endif
1954   return iwlog_register_ecodefn(_ejdb_ecodefn);
1955 }
1956