• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "ejdb2_internal.h"
2 #include <iowow/murmur3.h>
3 
4 static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id);
5 
6 static const IWKV_val EMPTY_VAL = { 0 };
7 
_jb_meta_nrecs_removedb(EJDB db,uint32_t dbid)8 IW_INLINE iwrc _jb_meta_nrecs_removedb(EJDB db, uint32_t dbid) {
9   dbid = IW_HTOIL(dbid);
10   IWKV_val key = {
11     .size = sizeof(dbid),
12     .data = &dbid
13   };
14   return iwkv_del(db->nrecdb, &key, 0);
15 }
16 
_jb_meta_nrecs_update(EJDB db,uint32_t dbid,int64_t delta)17 IW_INLINE iwrc _jb_meta_nrecs_update(EJDB db, uint32_t dbid, int64_t delta) {
18   delta = IW_HTOILL(delta);
19   dbid = IW_HTOIL(dbid);
20   IWKV_val val = {
21     .size = sizeof(delta),
22     .data = &delta
23   };
24   IWKV_val key = {
25     .size = sizeof(dbid),
26     .data = &dbid
27   };
28   return iwkv_put(db->nrecdb, &key, &val, IWKV_VAL_INCREMENT);
29 }
30 
_jb_meta_nrecs_get(EJDB db,uint32_t dbid)31 static int64_t _jb_meta_nrecs_get(EJDB db, uint32_t dbid) {
32   size_t vsz = 0;
33   uint64_t ret = 0;
34   dbid = IW_HTOIL(dbid);
35   IWKV_val key = {
36     .size = sizeof(dbid),
37     .data = &dbid
38   };
39   iwkv_get_copy(db->nrecdb, &key, &ret, sizeof(ret), &vsz);
40   if (vsz == sizeof(ret)) {
41     ret = IW_ITOHLL(ret);
42   }
43   return (int64_t) ret;
44 }
45 
_jb_idx_release(JBIDX idx)46 static void _jb_idx_release(JBIDX idx) {
47   free(idx->ptr);
48   free(idx);
49 }
50 
_jb_coll_release(JBCOLL jbc)51 static void _jb_coll_release(JBCOLL jbc) {
52   if (jbc->meta) {
53     jbl_destroy(&jbc->meta);
54   }
55   JBIDX nidx;
56   for (JBIDX idx = jbc->idx; idx; idx = nidx) {
57     nidx = idx->next;
58     _jb_idx_release(idx);
59   }
60   jbc->idx = 0;
61   pthread_rwlock_destroy(&jbc->rwl);
62   free(jbc);
63 }
64 
_jb_coll_load_index_lr(JBCOLL jbc,IWKV_val * mval)65 static iwrc _jb_coll_load_index_lr(JBCOLL jbc, IWKV_val *mval) {
66   iwrc rc;
67   binn *bn;
68   char *ptr;
69   struct _JBL imeta;
70   JBIDX idx = calloc(1, sizeof(*idx));
71   if (!idx) {
72     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
73   }
74 
75   RCC(rc, finish, jbl_from_buf_keep_onstack(&imeta, mval->data, mval->size));
76   bn = &imeta.bn;
77 
78   if (  !binn_object_get_str(bn, "ptr", &ptr)
79      || !binn_object_get_uint8(bn, "mode", &idx->mode)
80      || !binn_object_get_uint8(bn, "idbf", &idx->idbf)
81      || !binn_object_get_uint32(bn, "dbid", &idx->dbid)) {
82     rc = EJDB_ERROR_INVALID_COLLECTION_INDEX_META;
83     goto finish;
84   }
85 
86   RCC(rc, finish, jbl_ptr_alloc(ptr, &idx->ptr));
87   RCC(rc, finish, iwkv_db(jbc->db->iwkv, idx->dbid, idx->idbf, &idx->idb));
88 
89   idx->jbc = jbc;
90   idx->rnum = _jb_meta_nrecs_get(jbc->db, idx->dbid);
91   idx->next = jbc->idx;
92   jbc->idx = idx;
93 
94 finish:
95   if (rc) {
96     _jb_idx_release(idx);
97   }
98   return rc;
99 }
100 
_jb_coll_load_indexes_lr(JBCOLL jbc)101 static iwrc _jb_coll_load_indexes_lr(JBCOLL jbc) {
102   iwrc rc = 0;
103   IWKV_cursor cur;
104   IWKV_val kval;
105   char buf[sizeof(KEY_PREFIX_IDXMETA) + IWNUMBUF_SIZE];
106   // Full key format: i.<coldbid>.<idxdbid>
107   int sz = snprintf(buf, sizeof(buf), KEY_PREFIX_IDXMETA "%u.", jbc->dbid);
108   if (sz >= sizeof(buf)) {
109     return IW_ERROR_OVERFLOW;
110   }
111   kval.data = buf;
112   kval.size = sz;
113   rc = iwkv_cursor_open(jbc->db->metadb, &cur, IWKV_CURSOR_GE, &kval);
114   if (rc == IWKV_ERROR_NOTFOUND) {
115     rc = 0;
116     goto finish;
117   }
118   RCRET(rc);
119 
120   do {
121     IWKV_val key, val;
122     RCC(rc, finish, iwkv_cursor_key(cur, &key));
123     if ((key.size > sz) && !strncmp(buf, key.data, sz)) {
124       iwkv_val_dispose(&key);
125       RCC(rc, finish, iwkv_cursor_val(cur, &val));
126       rc = _jb_coll_load_index_lr(jbc, &val);
127       iwkv_val_dispose(&val);
128       RCBREAK(rc);
129     } else {
130       iwkv_val_dispose(&key);
131     }
132   } while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_PREV)));
133   if (rc == IWKV_ERROR_NOTFOUND) {
134     rc = 0;
135   }
136 
137 finish:
138   iwkv_cursor_close(&cur);
139   return rc;
140 }
141 
_jb_coll_load_meta_lr(JBCOLL jbc)142 static iwrc _jb_coll_load_meta_lr(JBCOLL jbc) {
143   JBL jbv;
144   IWKV_cursor cur;
145   JBL jbm = jbc->meta;
146   iwrc rc = jbl_at(jbm, "/name", &jbv);
147   RCRET(rc);
148   jbc->name = jbl_get_str(jbv);
149   jbl_destroy(&jbv);
150   if (!jbc->name) {
151     return EJDB_ERROR_INVALID_COLLECTION_META;
152   }
153   rc = jbl_at(jbm, "/id", &jbv);
154   RCRET(rc);
155   jbc->dbid = (uint32_t) jbl_get_i64(jbv);
156   jbl_destroy(&jbv);
157   if (!jbc->dbid) {
158     return EJDB_ERROR_INVALID_COLLECTION_META;
159   }
160   rc = iwkv_db(jbc->db->iwkv, jbc->dbid, IWDB_VNUM64_KEYS, &jbc->cdb);
161   RCRET(rc);
162 
163   jbc->rnum = _jb_meta_nrecs_get(jbc->db, jbc->dbid);
164 
165   rc = _jb_coll_load_indexes_lr(jbc);
166   RCRET(rc);
167 
168   rc = iwkv_cursor_open(jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
169   RCRET(rc);
170   rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
171   if (rc) {
172     if (rc == IWKV_ERROR_NOTFOUND) {
173       rc = 0;
174     }
175   } else {
176     size_t sz;
177     RCC(rc, finish, iwkv_cursor_copy_key(cur, &jbc->id_seq, sizeof(jbc->id_seq), &sz, 0));
178   }
179 
180 finish:
181   iwkv_cursor_close(&cur);
182   return rc;
183 }
184 
_jb_coll_init(JBCOLL jbc,IWKV_val * meta)185 static iwrc _jb_coll_init(JBCOLL jbc, IWKV_val *meta) {
186   iwrc rc = 0;
187   pthread_rwlockattr_t attr;
188   pthread_rwlockattr_init(&attr);
189 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
190   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
191 #endif
192   pthread_rwlock_init(&jbc->rwl, &attr);
193   if (meta) {
194     rc = jbl_from_buf_keep(&jbc->meta, meta->data, meta->size, false);
195     RCRET(rc);
196   }
197   if (!jbc->meta) {
198     iwlog_error("Collection %s seems to be initialized", jbc->name);
199     return IW_ERROR_INVALID_STATE;
200   }
201   rc = _jb_coll_load_meta_lr(jbc);
202   RCRET(rc);
203 
204   rc = iwhmap_put(jbc->db->mcolls, (void*) jbc->name, jbc);
205   RCRET(rc);
206 
207   return rc;
208 }
209 
_jb_idx_add_meta_lr(JBIDX idx,binn * list)210 static iwrc _jb_idx_add_meta_lr(JBIDX idx, binn *list) {
211   iwrc rc = 0;
212   IWXSTR *xstr = iwxstr_new();
213   if (!xstr) {
214     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
215   }
216   binn *meta = binn_object();
217   if (!meta) {
218     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
219     iwxstr_destroy(xstr);
220     return rc;
221   }
222   RCC(rc, finish, jbl_ptr_serialize(idx->ptr, xstr));
223 
224   if (  !binn_object_set_str(meta, "ptr", iwxstr_ptr(xstr))
225      || !binn_object_set_uint32(meta, "mode", idx->mode)
226      || !binn_object_set_uint32(meta, "idbf", idx->idbf)
227      || !binn_object_set_uint32(meta, "dbid", idx->dbid)
228      || !binn_object_set_int64(meta, "rnum", idx->rnum)) {
229     rc = JBL_ERROR_CREATION;
230   }
231 
232   if (!binn_list_add_object(list, meta)) {
233     rc = JBL_ERROR_CREATION;
234   }
235 
236 finish:
237   iwxstr_destroy(xstr);
238   binn_free(meta);
239   return rc;
240 }
241 
_jb_coll_add_meta_lr(JBCOLL jbc,binn * list)242 static iwrc _jb_coll_add_meta_lr(JBCOLL jbc, binn *list) {
243   iwrc rc = 0;
244   binn *ilist = 0;
245   binn *meta = binn_object();
246   if (!meta) {
247     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
248     return rc;
249   }
250   if (  !binn_object_set_str(meta, "name", jbc->name)
251      || !binn_object_set_uint32(meta, "dbid", jbc->dbid)
252      || !binn_object_set_int64(meta, "rnum", jbc->rnum)) {
253     rc = JBL_ERROR_CREATION;
254     goto finish;
255   }
256   ilist = binn_list();
257   if (!ilist) {
258     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
259     goto finish;
260   }
261   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
262     RCC(rc, finish, _jb_idx_add_meta_lr(idx, ilist));
263   }
264   if (!binn_object_set_list(meta, "indexes", ilist)) {
265     rc = JBL_ERROR_CREATION;
266     goto finish;
267   }
268   if (!binn_list_add_value(list, meta)) {
269     rc = JBL_ERROR_CREATION;
270     goto finish;
271   }
272 
273 finish:
274   binn_free(meta);
275   if (ilist) {
276     binn_free(ilist);
277   }
278   return rc;
279 }
280 
_jb_db_meta_load(EJDB db)281 static iwrc _jb_db_meta_load(EJDB db) {
282   iwrc rc = 0;
283   if (!db->metadb) {
284     rc = iwkv_db(db->iwkv, METADB_ID, 0, &db->metadb);
285     RCRET(rc);
286   }
287   if (!db->nrecdb) {
288     rc = iwkv_db(db->iwkv, NUMRECSDB_ID, IWDB_VNUM64_KEYS, &db->nrecdb);
289     RCRET(rc);
290   }
291 
292   IWKV_cursor cur;
293   rc = iwkv_cursor_open(db->metadb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
294   RCRET(rc);
295   while (!(rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT))) {
296     IWKV_val key, val;
297     RCC(rc, finish, iwkv_cursor_get(cur, &key, &val));
298     if (!strncmp(key.data, KEY_PREFIX_COLLMETA, sizeof(KEY_PREFIX_COLLMETA) - 1)) {
299       JBCOLL jbc = calloc(1, sizeof(*jbc));
300       if (!jbc) {
301         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
302         iwkv_val_dispose(&val);
303         goto finish;
304       }
305       jbc->db = db;
306       rc = _jb_coll_init(jbc, &val);
307       if (rc) {
308         _jb_coll_release(jbc);
309         iwkv_val_dispose(&key);
310         goto finish;
311       }
312     } else {
313       iwkv_val_dispose(&val);
314     }
315     iwkv_val_dispose(&key);
316   }
317   if (rc == IWKV_ERROR_NOTFOUND) {
318     rc = 0;
319   }
320 
321 finish:
322   iwkv_cursor_close(&cur);
323   return rc;
324 }
325 
_jb_db_release(EJDB * dbp)326 static iwrc _jb_db_release(EJDB *dbp) {
327   iwrc rc = 0;
328   EJDB db = *dbp;
329   *dbp = 0;
330 #ifdef JB_HTTP
331   jbr_shutdown_wait(db->jbr);
332 #endif
333   if (db->mcolls) {
334     iwhmap_destroy(db->mcolls);
335     db->mcolls = 0;
336   }
337   if (db->iwkv) {
338     IWRC(iwkv_close(&db->iwkv), rc);
339   }
340   pthread_rwlock_destroy(&db->rwl);
341 
342   EJDB_HTTP *http = &db->opts.http;
343   if (http->bind) {
344     free((void*) http->bind);
345   }
346   if (http->access_token) {
347     free((void*) http->access_token);
348   }
349   free(db);
350   return rc;
351 }
352 
_jb_coll_acquire_keeplock2(EJDB db,const char * coll,jb_coll_acquire_t acm,JBCOLL * jbcp)353 static iwrc _jb_coll_acquire_keeplock2(EJDB db, const char *coll, jb_coll_acquire_t acm, JBCOLL *jbcp) {
354   if (!coll || *coll == '\0' || strlen(coll) > EJDB_COLLECTION_NAME_MAX_LEN) {
355     return EJDB_ERROR_INVALID_COLLECTION_NAME;
356   }
357   int rci;
358   iwrc rc = 0;
359   *jbcp = 0;
360   JBCOLL jbc = 0;
361   bool wl = acm & JB_COLL_ACQUIRE_WRITE;
362   API_RLOCK(db, rci);
363 
364   jbc = iwhmap_get(db->mcolls, coll);
365   if (jbc) {
366     wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl);
367     *jbcp = jbc;
368   } else {
369     pthread_rwlock_unlock(&db->rwl); // relock
370     if ((db->oflags & IWKV_RDONLY) || (acm & JB_COLL_ACQUIRE_EXISTING)) {
371       return IW_ERROR_NOT_EXISTS;
372     }
373     API_WLOCK(db, rci);
374     jbc = iwhmap_get(db->mcolls, coll);
375     if (jbc) {
376       pthread_rwlock_rdlock(&jbc->rwl);
377       *jbcp = jbc;
378     } else {
379       JBL meta = 0;
380       IWDB cdb = 0;
381       uint32_t dbid = 0;
382       char keybuf[IWNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];
383       IWKV_val key, val;
384 
385       RCC(rc, create_finish, iwkv_new_db(db->iwkv, IWDB_VNUM64_KEYS, &dbid, &cdb));
386       jbc = calloc(1, sizeof(*jbc));
387       if (!jbc) {
388         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
389         goto create_finish;
390       }
391       RCC(rc, create_finish, jbl_create_empty_object(&meta));
392       if (!binn_object_set_str(&meta->bn, "name", coll)) {
393         rc = JBL_ERROR_CREATION;
394         goto create_finish;
395       }
396       if (!binn_object_set_uint32(&meta->bn, "id", dbid)) {
397         rc = JBL_ERROR_CREATION;
398         goto create_finish;
399       }
400       RCC(rc, create_finish, jbl_as_buf(meta, &val.data, &val.size));
401 
402       key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", dbid);
403       if (key.size >= sizeof(keybuf)) {
404         rc = IW_ERROR_OVERFLOW;
405         goto create_finish;
406       }
407       key.data = keybuf;
408       RCC(rc, create_finish, iwkv_put(db->metadb, &key, &val, IWKV_SYNC));
409 
410       jbc->db = db;
411       jbc->meta = meta;
412       rc = _jb_coll_init(jbc, 0);
413       if (rc) {
414         iwkv_del(db->metadb, &key, IWKV_SYNC);
415         goto create_finish;
416       }
417 
418 create_finish:
419       if (rc) {
420         if (meta) {
421           jbl_destroy(&meta);
422         }
423         if (cdb) {
424           iwkv_db_destroy(&cdb);
425         }
426         if (jbc) {
427           jbc->meta = 0; // meta was cleared
428           _jb_coll_release(jbc);
429         }
430       } else {
431         rci = wl ? pthread_rwlock_wrlock(&jbc->rwl) : pthread_rwlock_rdlock(&jbc->rwl); // -V522
432         if (rci) {
433           rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
434           goto finish;
435         }
436         *jbcp = jbc;
437       }
438     }
439   }
440 
441 finish:
442   if (rc) {
443     pthread_rwlock_unlock(&db->rwl);
444   }
445   return rc;
446 }
447 
_jb_coll_acquire_keeplock(EJDB db,const char * coll,bool wl,JBCOLL * jbcp)448 IW_INLINE iwrc _jb_coll_acquire_keeplock(EJDB db, const char *coll, bool wl, JBCOLL *jbcp) {
449   return _jb_coll_acquire_keeplock2(db, coll, wl ? JB_COLL_ACQUIRE_WRITE : 0, jbcp);
450 }
451 
_jb_idx_record_add(JBIDX idx,int64_t id,JBL jbl,JBL jblprev)452 static iwrc _jb_idx_record_add(JBIDX idx, int64_t id, JBL jbl, JBL jblprev) {
453   IWKV_val key;
454   uint8_t step;
455   char vnbuf[IW_VNUMBUFSZ];
456   char numbuf[IWNUMBUF_SIZE];
457 
458   bool jbv_found, jbvprev_found;
459   struct _JBL jbv = { 0 }, jbvprev = { 0 };
460   jbl_type_t jbv_type, jbvprev_type;
461 
462   iwrc rc = 0;
463   IWPOOL *pool = 0;
464   int64_t delta = 0; // delta of added/removed index records
465   bool compound = idx->idbf & IWDB_COMPOUND_KEYS;
466 
467   jbvprev_found = jblprev ? _jbl_at(jblprev, idx->ptr, &jbvprev) : false;
468   jbv_found = jbl ? _jbl_at(jbl, idx->ptr, &jbv) : false;
469 
470   jbv_type = jbl_type(&jbv);
471   jbvprev_type = jbl_type(&jbvprev);
472 
473   // Do not index NULLs, OBJECTs, ARRAYs (in `EJDB_IDX_UNIQUE` mode)
474   if (  ((jbvprev_type == JBV_OBJECT) || (jbvprev_type <= JBV_NULL))
475      || ((jbvprev_type == JBV_ARRAY) && !compound)) {
476     jbvprev_found = false;
477   }
478   if (  ((jbv_type == JBV_OBJECT) || (jbv_type <= JBV_NULL))
479      || ((jbv_type == JBV_ARRAY) && !compound)) {
480     jbv_found = false;
481   }
482 
483   if (  compound
484      && (jbv_type == jbvprev_type)
485      && (jbvprev_type == JBV_ARRAY)) {  // compare next/prev obj arrays
486     pool = iwpool_create(1024);
487     if (!pool) {
488       rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
489       goto finish;
490     }
491     JBL_NODE jbvprev_node, jbv_node;
492     RCC(rc, finish, jbl_to_node(&jbv, &jbv_node, false, pool));
493     jbv.node = jbv_node;
494 
495     RCC(rc, finish, jbl_to_node(&jbvprev, &jbvprev_node, false, pool));
496     jbvprev.node = jbvprev_node;
497 
498     if (_jbl_compare_nodes(jbv_node, jbvprev_node, &rc) == 0) {
499       goto finish; // Arrays are equal or error
500     }
501   } else if (_jbl_is_eq_atomic_values(&jbv, &jbvprev)) {
502     return 0;
503   }
504 
505   if (jbvprev_found) {               // Remove old index elements
506     if (jbvprev_type == JBV_ARRAY) { // TODO: array modification delta?
507       JBL_NODE n;
508       if (!pool) {
509         pool = iwpool_create(1024);
510         if (!pool) {
511           RCC(rc, finish, iwrc_set_errno(IW_ERROR_ALLOC, errno));
512         }
513       }
514       RCC(rc, finish, jbl_to_node(&jbvprev, &n, false, pool));
515       for (n = n->child; n; n = n->next) {
516         jbi_node_fill_ikey(idx, n, &key, numbuf);
517         if (key.size) {
518           key.compound = id;
519           rc = iwkv_del(idx->idb, &key, 0);
520           if (!rc) {
521             --delta;
522           } else if (rc == IWKV_ERROR_NOTFOUND) {
523             rc = 0;
524           }
525           RCGO(rc, finish);
526         }
527       }
528     } else {
529       jbi_jbl_fill_ikey(idx, &jbvprev, &key, numbuf);
530       if (key.size) {
531         key.compound = id;
532         rc = iwkv_del(idx->idb, &key, 0);
533         if (!rc) {
534           --delta;
535         } else if (rc == IWKV_ERROR_NOTFOUND) {
536           rc = 0;
537         }
538         RCGO(rc, finish);
539       }
540     }
541   }
542 
543   if (jbv_found) {               // Add index record
544     if (jbv_type == JBV_ARRAY) { // TODO: array modification delta?
545       JBL_NODE n;
546       if (!pool) {
547         pool = iwpool_create(1024);
548         if (!pool) {
549           RCC(rc, finish, iwrc_set_errno(IW_ERROR_ALLOC, errno));
550         }
551       }
552       RCC(rc, finish, jbl_to_node(&jbv, &n, false, pool));
553       for (n = n->child; n; n = n->next) {
554         jbi_node_fill_ikey(idx, n, &key, numbuf);
555         if (key.size) {
556           key.compound = id;
557           rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
558           if (!rc) {
559             ++delta;
560           } else if (rc == IWKV_ERROR_KEY_EXISTS) {
561             rc = 0;
562           } else {
563             goto finish;
564           }
565         }
566       }
567     } else {
568       jbi_jbl_fill_ikey(idx, &jbv, &key, numbuf);
569       if (key.size) {
570         if (compound) {
571           key.compound = id;
572           rc = iwkv_put(idx->idb, &key, &EMPTY_VAL, IWKV_NO_OVERWRITE);
573           if (!rc) {
574             ++delta;
575           } else if (rc == IWKV_ERROR_KEY_EXISTS) {
576             rc = 0;
577           }
578         } else {
579           IW_SETVNUMBUF64(step, vnbuf, id);
580           IWKV_val idval = {
581             .data = vnbuf,
582             .size = step
583           };
584           rc = iwkv_put(idx->idb, &key, &idval, IWKV_NO_OVERWRITE);
585           if (!rc) {
586             ++delta;
587           } else if (rc == IWKV_ERROR_KEY_EXISTS) {
588             rc = EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED;
589             goto finish;
590           }
591         }
592       }
593     }
594   }
595 
596 finish:
597   if (pool) {
598     iwpool_destroy(pool);
599   }
600   if (delta && !_jb_meta_nrecs_update(idx->jbc->db, idx->dbid, delta)) {
601     idx->rnum += delta;
602   }
603   return rc;
604 }
605 
_jb_idx_record_remove(JBIDX idx,int64_t id,JBL jbl)606 IW_INLINE iwrc _jb_idx_record_remove(JBIDX idx, int64_t id, JBL jbl) {
607   return _jb_idx_record_add(idx, id, 0, jbl);
608 }
609 
_jb_idx_fill(JBIDX idx)610 static iwrc _jb_idx_fill(JBIDX idx) {
611   IWKV_cursor cur;
612   IWKV_val key, val;
613   struct _JBL jbs;
614   int64_t llv;
615   JBL jbl = &jbs;
616 
617   iwrc rc = iwkv_cursor_open(idx->jbc->cdb, &cur, IWKV_CURSOR_BEFORE_FIRST, 0);
618   while (!rc) {
619     rc = iwkv_cursor_to(cur, IWKV_CURSOR_NEXT);
620     if (rc == IWKV_ERROR_NOTFOUND) {
621       rc = 0;
622       break;
623     }
624     rc = iwkv_cursor_get(cur, &key, &val);
625     RCBREAK(rc);
626     if (!binn_load(val.data, &jbs.bn)) {
627       rc = JBL_ERROR_CREATION;
628       break;
629     }
630     memcpy(&llv, key.data, sizeof(llv));
631     rc = _jb_idx_record_add(idx, llv, jbl, 0);
632     iwkv_kv_dispose(&key, &val);
633   }
634   IWRC(iwkv_cursor_close(&cur), rc);
635   return rc;
636 }
637 
638 // Used to avoid deadlocks within a `iwkv_put` context
_jb_put_handler_after(iwrc rc,struct _JBPHCTX * ctx)639 static iwrc _jb_put_handler_after(iwrc rc, struct _JBPHCTX *ctx) {
640   IWKV_val *oldval = &ctx->oldval;
641   if (rc) {
642     if (oldval->size) {
643       iwkv_val_dispose(oldval);
644     }
645     return rc;
646   }
647   JBL prev;
648   struct _JBL jblprev;
649   JBCOLL jbc = ctx->jbc;
650   if (oldval->size) {
651     rc = jbl_from_buf_keep_onstack(&jblprev, oldval->data, oldval->size);
652     RCRET(rc);
653     prev = &jblprev;
654   } else {
655     prev = 0;
656   }
657   JBIDX fail_idx = 0;
658   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
659     rc = _jb_idx_record_add(idx, ctx->id, ctx->jbl, prev);
660     if (rc) {
661       fail_idx = idx;
662       goto finish;
663     }
664   }
665   if (!prev) {
666     _jb_meta_nrecs_update(jbc->db, jbc->dbid, 1);
667     jbc->rnum += 1;
668   }
669 
670 finish:
671   if (oldval->size) {
672     iwkv_val_dispose(oldval);
673   }
674   if (rc && !oldval->size) {
675     // Cleanup on error inserting new record
676     IWKV_val key = { .data = &ctx->id, .size = sizeof(ctx->id) };
677     for (JBIDX idx = jbc->idx; idx && idx != fail_idx; idx = idx->next) {
678       IWRC(_jb_idx_record_remove(idx, ctx->id, ctx->jbl), rc);
679     }
680     IWRC(iwkv_del(jbc->cdb, &key, 0), rc);
681   }
682   return rc;
683 }
684 
_jb_put_handler(const IWKV_val * key,const IWKV_val * val,IWKV_val * oldval,void * op)685 static iwrc _jb_put_handler(const IWKV_val *key, const IWKV_val *val, IWKV_val *oldval, void *op) {
686   struct _JBPHCTX *ctx = op;
687   if (oldval && oldval->size) {
688     memcpy(&ctx->oldval, oldval, sizeof(*oldval));
689   }
690   return 0;
691 }
692 
_jb_exec_scan_init(JBEXEC * ctx)693 static iwrc _jb_exec_scan_init(JBEXEC *ctx) {
694   ctx->istep = 1;
695   ctx->jblbufsz = ctx->jbc->db->opts.document_buffer_sz;
696   ctx->jblbuf = malloc(ctx->jblbufsz);
697   if (!ctx->jblbuf) {
698     ctx->jblbufsz = 0;
699     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
700   }
701   struct JQP_AUX *aux = ctx->ux->q->aux;
702   if (aux->expr->flags & JQP_EXPR_NODE_FLAG_PK) { // Select by primary key
703     ctx->scanner = jbi_pk_scanner;
704     if (ctx->ux->log) {
705       iwxstr_cat2(ctx->ux->log, "[INDEX] PK");
706     }
707     return 0;
708   }
709   iwrc rc = jbi_selection(ctx);
710   RCRET(rc);
711   if (ctx->midx.idx) {
712     if (ctx->midx.idx->idbf & IWDB_COMPOUND_KEYS) {
713       ctx->scanner = jbi_dup_scanner;
714     } else {
715       ctx->scanner = jbi_uniq_scanner;
716     }
717   } else {
718     ctx->scanner = jbi_full_scanner;
719     if (ctx->ux->log) {
720       iwxstr_cat2(ctx->ux->log, "[INDEX] NO");
721     }
722   }
723   return 0;
724 }
725 
_jb_exec_scan_release(JBEXEC * ctx)726 static void _jb_exec_scan_release(JBEXEC *ctx) {
727   if (ctx->proj_joined_nodes_cache) {
728     // Destroy projected nodes key
729     iwhmap_destroy(ctx->proj_joined_nodes_cache);
730     ctx->proj_joined_nodes_cache = 0;
731   }
732   if (ctx->proj_joined_nodes_pool) {
733     iwpool_destroy(ctx->proj_joined_nodes_pool);
734   }
735   free(ctx->jblbuf);
736 }
737 
_jb_noop_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)738 static iwrc _jb_noop_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
739   return 0;
740 }
741 
_jb_put_impl(JBCOLL jbc,JBL jbl,int64_t id)742 IW_INLINE iwrc _jb_put_impl(JBCOLL jbc, JBL jbl, int64_t id) {
743   IWKV_val val, key = {
744     .data = &id,
745     .size = sizeof(id)
746   };
747   struct _JBPHCTX pctx = {
748     .id  = id,
749     .jbc = jbc,
750     .jbl = jbl
751   };
752   iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
753   RCRET(rc);
754   return _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx);
755 }
756 
jb_put(JBCOLL jbc,JBL jbl,int64_t id)757 iwrc jb_put(JBCOLL jbc, JBL jbl, int64_t id) {
758   return _jb_put_impl(jbc, jbl, id);
759 }
760 
jb_cursor_set(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)761 iwrc jb_cursor_set(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
762   IWKV_val val;
763   struct _JBPHCTX pctx = {
764     .id  = id,
765     .jbc = jbc,
766     .jbl = jbl
767   };
768   iwrc rc = jbl_as_buf(jbl, &val.data, &val.size);
769   RCRET(rc);
770   return _jb_put_handler_after(iwkv_cursor_seth(cur, &val, 0, _jb_put_handler, &pctx), &pctx);
771 }
772 
_jb_exec_upsert_lw(JBEXEC * ctx)773 static iwrc _jb_exec_upsert_lw(JBEXEC *ctx) {
774   JBL_NODE n;
775   int64_t id;
776   iwrc rc = 0;
777   JBL jbl = 0;
778   EJDB_EXEC *ux = ctx->ux;
779   JQL q = ux->q;
780   if (q->aux->apply_placeholder) {
781     JQVAL *pv = jql_find_placeholder(q, q->aux->apply_placeholder);
782     if (!pv || (pv->type != JQVAL_JBLNODE) || !pv->vnode) {
783       rc = JQL_ERROR_INVALID_PLACEHOLDER_VALUE_TYPE;
784       goto finish;
785     }
786     n = pv->vnode;
787   } else {
788     n = q->aux->apply;
789   }
790   if (!n) {
791     // Skip silently, nothing to do.
792     goto finish;
793   }
794   RCC(rc, finish, jbl_from_node(&jbl, n));
795   RCC(rc, finish, _jb_put_new_lw(ctx->jbc, jbl, &id));
796 
797   if (!(q->aux->qmode & JQP_QRY_AGGREGATE)) {
798     struct _EJDB_DOC doc = {
799       .id   = id,
800       .raw  = jbl,
801       .node = n
802     };
803     do {
804       ctx->istep = 1;
805       RCC(rc, finish, ux->visitor(ux, &doc, &ctx->istep));
806     } while (ctx->istep == -1);
807   }
808   ++ux->cnt;
809 
810 finish:
811   jbl_destroy(&jbl);
812   return rc;
813 }
814 
815 //----------------------- Public API
816 
ejdb_exec(EJDB_EXEC * ux)817 iwrc ejdb_exec(EJDB_EXEC *ux) {
818   if (!ux || !ux->db || !ux->q) {
819     return IW_ERROR_INVALID_ARGS;
820   }
821   int rci;
822   iwrc rc = 0;
823   if (!ux->visitor) {
824     ux->visitor = _jb_noop_visitor;
825     ux->q->aux->projection = 0; // Actually we don't need projection if exists
826   }
827   if (ux->log) {
828     // set terminating NULL to current pos of log
829     iwxstr_cat(ux->log, 0, 0);
830   }
831   JBEXEC ctx = {
832     .ux = ux
833   };
834   if (ux->limit < 1) {
835     rc = jql_get_limit(ux->q, &ux->limit);
836     RCRET(rc);
837     if (ux->limit < 1) {
838       ux->limit = INT64_MAX;
839     }
840   }
841   if (ux->skip < 1) {
842     rc = jql_get_skip(ux->q, &ux->skip);
843     RCRET(rc);
844   }
845   rc = _jb_coll_acquire_keeplock2(ux->db, ux->q->coll,
846                                   jql_has_apply(ux->q) ? JB_COLL_ACQUIRE_WRITE : JB_COLL_ACQUIRE_EXISTING,
847                                   &ctx.jbc);
848   if (rc == IW_ERROR_NOT_EXISTS) {
849     return 0;
850   } else {
851     RCRET(rc);
852   }
853 
854   RCC(rc, finish, _jb_exec_scan_init(&ctx));
855   if (ctx.sorting) {
856     if (ux->log) {
857       iwxstr_cat2(ux->log, " [COLLECTOR] SORTER\n");
858     }
859     rc = ctx.scanner(&ctx, jbi_sorter_consumer);
860   } else {
861     if (ux->log) {
862       iwxstr_cat2(ux->log, " [COLLECTOR] PLAIN\n");
863     }
864     rc = ctx.scanner(&ctx, jbi_consumer);
865   }
866   RCGO(rc, finish);
867   if ((ux->cnt == 0) && jql_has_apply_upsert(ux->q)) {
868     // No records found trying to upsert new record
869     rc = _jb_exec_upsert_lw(&ctx);
870   }
871 
872 finish:
873   _jb_exec_scan_release(&ctx);
874   API_COLL_UNLOCK(ctx.jbc, rci, rc);
875   jql_reset(ux->q, true, false);
876   return rc;
877 }
878 
879 struct JB_LIST_VISITOR_CTX {
880   EJDB_DOC head;
881   EJDB_DOC tail;
882 };
883 
_jb_exec_list_visitor(struct _EJDB_EXEC * ctx,EJDB_DOC doc,int64_t * step)884 static iwrc _jb_exec_list_visitor(struct _EJDB_EXEC *ctx, EJDB_DOC doc, int64_t *step) {
885   struct JB_LIST_VISITOR_CTX *lvc = ctx->opaque;
886   IWPOOL *pool = ctx->pool;
887   struct _EJDB_DOC *ndoc = iwpool_alloc(sizeof(*ndoc) + sizeof(*doc->raw) + doc->raw->bn.size, pool);
888   if (!ndoc) {
889     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
890   }
891   ndoc->id = doc->id;
892   ndoc->raw = (void*) (((uint8_t*) ndoc) + sizeof(*ndoc));
893   ndoc->raw->node = 0;
894   ndoc->node = doc->node;
895   ndoc->next = 0;
896   ndoc->prev = 0;
897   memcpy(&ndoc->raw->bn, &doc->raw->bn, sizeof(ndoc->raw->bn));
898   ndoc->raw->bn.ptr = ((uint8_t*) ndoc) + sizeof(*ndoc) + sizeof(*doc->raw);
899   memcpy(ndoc->raw->bn.ptr, doc->raw->bn.ptr, doc->raw->bn.size);
900 
901   if (!lvc->head) {
902     lvc->head = ndoc;
903     lvc->tail = ndoc;
904   } else {
905     lvc->tail->next = ndoc;
906     ndoc->prev = lvc->tail;
907     lvc->tail = ndoc;
908   }
909   return 0;
910 }
911 
_jb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWXSTR * log,IWPOOL * pool)912 static iwrc _jb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWXSTR *log, IWPOOL *pool) {
913   if (!db || !q || !first || !pool) {
914     return IW_ERROR_INVALID_ARGS;
915   }
916   iwrc rc = 0;
917   struct JB_LIST_VISITOR_CTX lvc = { 0 };
918   struct _EJDB_EXEC ux = {
919     .db      = db,
920     .q       = q,
921     .visitor = _jb_exec_list_visitor,
922     .pool    = pool,
923     .limit   = limit,
924     .log     = log,
925     .opaque  = &lvc
926   };
927   rc = ejdb_exec(&ux);
928   if (rc) {
929     *first = 0;
930   } else {
931     *first = lvc.head;
932   }
933   return rc;
934 }
935 
_jb_count(EJDB db,JQL q,int64_t * count,int64_t limit,IWXSTR * log)936 static iwrc _jb_count(EJDB db, JQL q, int64_t *count, int64_t limit, IWXSTR *log) {
937   if (!db || !q || !count) {
938     return IW_ERROR_INVALID_ARGS;
939   }
940   struct _EJDB_EXEC ux = {
941     .db    = db,
942     .q     = q,
943     .limit = limit,
944     .log   = log
945   };
946   iwrc rc = ejdb_exec(&ux);
947   *count = ux.cnt;
948   return rc;
949 }
950 
ejdb_count(EJDB db,JQL q,int64_t * count,int64_t limit)951 iwrc ejdb_count(EJDB db, JQL q, int64_t *count, int64_t limit) {
952   return _jb_count(db, q, count, limit, 0);
953 }
954 
ejdb_count2(EJDB db,const char * coll,const char * q,int64_t * count,int64_t limit)955 iwrc ejdb_count2(EJDB db, const char *coll, const char *q, int64_t *count, int64_t limit) {
956   JQL jql;
957   iwrc rc = jql_create(&jql, coll, q);
958   RCRET(rc);
959   rc = _jb_count(db, jql, count, limit, 0);
960   jql_destroy(&jql);
961   return rc;
962 }
963 
ejdb_update(EJDB db,JQL q)964 iwrc ejdb_update(EJDB db, JQL q) {
965   int64_t count;
966   return ejdb_count(db, q, &count, 0);
967 }
968 
ejdb_update2(EJDB db,const char * coll,const char * q)969 iwrc ejdb_update2(EJDB db, const char *coll, const char *q) {
970   int64_t count;
971   return ejdb_count2(db, coll, q, &count, 0);
972 }
973 
ejdb_list(EJDB db,JQL q,EJDB_DOC * first,int64_t limit,IWPOOL * pool)974 iwrc ejdb_list(EJDB db, JQL q, EJDB_DOC *first, int64_t limit, IWPOOL *pool) {
975   return _jb_list(db, q, first, limit, 0, pool);
976 }
977 
ejdb_list3(EJDB db,const char * coll,const char * query,int64_t limit,IWXSTR * log,EJDB_LIST * listp)978 iwrc ejdb_list3(EJDB db, const char *coll, const char *query, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
979   if (!listp) {
980     return IW_ERROR_INVALID_ARGS;
981   }
982   iwrc rc = 0;
983   *listp = 0;
984   IWPOOL *pool = iwpool_create(1024);
985   if (!pool) {
986     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
987   }
988   EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
989   if (!list) {
990     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
991     goto finish;
992   }
993   list->first = 0;
994   list->db = db;
995   list->pool = pool;
996   RCC(rc, finish, jql_create(&list->q, coll, query));
997   rc = _jb_list(db, list->q, &list->first, limit, log, list->pool);
998 
999 finish:
1000   if (rc) {
1001     iwpool_destroy(pool);
1002   } else {
1003     *listp = list;
1004   }
1005   return rc;
1006 }
1007 
ejdb_list4(EJDB db,JQL q,int64_t limit,IWXSTR * log,EJDB_LIST * listp)1008 iwrc ejdb_list4(EJDB db, JQL q, int64_t limit, IWXSTR *log, EJDB_LIST *listp) {
1009   if (!listp) {
1010     return IW_ERROR_INVALID_ARGS;
1011   }
1012   iwrc rc = 0;
1013   *listp = 0;
1014   IWPOOL *pool = iwpool_create(1024);
1015   if (!pool) {
1016     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1017   }
1018   EJDB_LIST list = iwpool_alloc(sizeof(*list), pool);
1019   if (!list) {
1020     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1021     goto finish;
1022   }
1023   list->q = 0;
1024   list->first = 0;
1025   list->db = db;
1026   list->pool = pool;
1027   rc = _jb_list(db, q, &list->first, limit, log, list->pool);
1028 
1029 finish:
1030   if (rc) {
1031     iwpool_destroy(pool);
1032   } else {
1033     *listp = list;
1034   }
1035   return rc;
1036 }
1037 
ejdb_list2(EJDB db,const char * coll,const char * query,int64_t limit,EJDB_LIST * listp)1038 iwrc ejdb_list2(EJDB db, const char *coll, const char *query, int64_t limit, EJDB_LIST *listp) {
1039   return ejdb_list3(db, coll, query, limit, 0, listp);
1040 }
1041 
ejdb_list_destroy(EJDB_LIST * listp)1042 void ejdb_list_destroy(EJDB_LIST *listp) {
1043   if (listp) {
1044     EJDB_LIST list = *listp;
1045     if (list) {
1046       if (list->q) {
1047         jql_destroy(&list->q);
1048       }
1049       if (list->pool) {
1050         iwpool_destroy(list->pool);
1051       }
1052     }
1053     *listp = 0;
1054   }
1055 }
1056 
ejdb_remove_index(EJDB db,const char * coll,const char * path,ejdb_idx_mode_t mode)1057 iwrc ejdb_remove_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
1058   if (!db || !coll || !path) {
1059     return IW_ERROR_INVALID_ARGS;
1060   }
1061   int rci;
1062   JBCOLL jbc;
1063   IWKV_val key;
1064   JBL_PTR ptr = 0;
1065   char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * IWNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1066 
1067   iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
1068   RCRET(rc);
1069 
1070   RCC(rc, finish, jbl_ptr_alloc(path, &ptr));
1071 
1072   for (JBIDX idx = jbc->idx, prev = 0; idx; idx = idx->next) {
1073     if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
1074       key.data = keybuf;
1075       key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1076       if (key.size >= sizeof(keybuf)) {
1077         rc = IW_ERROR_OVERFLOW;
1078         goto finish;
1079       }
1080       RCC(rc, finish, iwkv_del(db->metadb, &key, 0));
1081       _jb_meta_nrecs_removedb(db, idx->dbid);
1082       if (prev) {
1083         prev->next = idx->next;
1084       } else {
1085         jbc->idx = idx->next;
1086       }
1087       if (idx->idb) {
1088         iwkv_db_destroy(&idx->idb);
1089       }
1090       _jb_idx_release(idx);
1091       break;
1092     }
1093     prev = idx;
1094   }
1095 
1096 finish:
1097   free(ptr);
1098   API_COLL_UNLOCK(jbc, rci, rc);
1099   return rc;
1100 }
1101 
ejdb_ensure_index(EJDB db,const char * coll,const char * path,ejdb_idx_mode_t mode)1102 iwrc ejdb_ensure_index(EJDB db, const char *coll, const char *path, ejdb_idx_mode_t mode) {
1103   if (!db || !coll || !path) {
1104     return IW_ERROR_INVALID_ARGS;
1105   }
1106   int rci;
1107   JBCOLL jbc;
1108   IWKV_val key, val;
1109   char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * IWNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1110 
1111   JBIDX idx = 0;
1112   JBL_PTR ptr = 0;
1113   binn *imeta = 0;
1114 
1115   switch (mode & (EJDB_IDX_STR | EJDB_IDX_I64 | EJDB_IDX_F64)) {
1116     case EJDB_IDX_STR:
1117     case EJDB_IDX_I64:
1118     case EJDB_IDX_F64:
1119       break;
1120     default:
1121       return EJDB_ERROR_INVALID_INDEX_MODE;
1122   }
1123 
1124   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1125   RCRET(rc);
1126 
1127   RCC(rc, finish, jbl_ptr_alloc(path, &ptr));
1128 
1129   for (idx = jbc->idx; idx; idx = idx->next) {
1130     if (((idx->mode & ~EJDB_IDX_UNIQUE) == (mode & ~EJDB_IDX_UNIQUE)) && !jbl_ptr_cmp(idx->ptr, ptr)) {
1131       if (idx->mode != mode) {
1132         rc = EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE;
1133         idx = 0;
1134       }
1135       goto finish;
1136     }
1137   }
1138 
1139   idx = calloc(1, sizeof(*idx));
1140   if (!idx) {
1141     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1142     goto finish;
1143   }
1144   idx->mode = mode;
1145   idx->jbc = jbc;
1146   idx->ptr = ptr;
1147   ptr = 0;
1148   idx->idbf = 0;
1149   if (mode & EJDB_IDX_I64) {
1150     idx->idbf |= IWDB_VNUM64_KEYS;
1151   } else if (mode & EJDB_IDX_F64) {
1152     idx->idbf |= IWDB_REALNUM_KEYS;
1153   }
1154   if (!(mode & EJDB_IDX_UNIQUE)) {
1155     idx->idbf |= IWDB_COMPOUND_KEYS;
1156   }
1157 
1158   RCC(rc, finish, iwkv_new_db(db->iwkv, idx->idbf, &idx->dbid, &idx->idb));
1159   RCC(rc, finish, _jb_idx_fill(idx));
1160 
1161   // save index meta into metadb
1162   imeta = binn_object();
1163   if (!imeta) {
1164     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1165     goto finish;
1166   }
1167 
1168   if (  !binn_object_set_str(imeta, "ptr", path)
1169      || !binn_object_set_uint32(imeta, "mode", idx->mode)
1170      || !binn_object_set_uint32(imeta, "idbf", idx->idbf)
1171      || !binn_object_set_uint32(imeta, "dbid", idx->dbid)) {
1172     rc = JBL_ERROR_CREATION;
1173     goto finish;
1174   }
1175 
1176   key.data = keybuf;
1177   // Full key format: i.<coldbid>.<idxdbid>
1178   key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1179   if (key.size >= sizeof(keybuf)) {
1180     rc = IW_ERROR_OVERFLOW;
1181     goto finish;
1182   }
1183   val.data = binn_ptr(imeta);
1184   val.size = binn_size(imeta);
1185   RCC(rc, finish, iwkv_put(db->metadb, &key, &val, 0));
1186 
1187   idx->next = jbc->idx;
1188   jbc->idx = idx;
1189 
1190 finish:
1191   if (rc) {
1192     if (idx) {
1193       if (idx->idb) {
1194         iwkv_db_destroy(&idx->idb);
1195         idx->idb = 0;
1196       }
1197       _jb_idx_release(idx);
1198     }
1199   }
1200   free(ptr);
1201   binn_free(imeta);
1202   API_COLL_UNLOCK(jbc, rci, rc);
1203   return rc;
1204 }
1205 
_jb_patch(EJDB db,const char * coll,int64_t id,bool upsert,const char * patchjson,JBL_NODE patchjbn,JBL patchjbl)1206 static iwrc _jb_patch(
1207   EJDB db, const char *coll, int64_t id, bool upsert,
1208   const char *patchjson, JBL_NODE patchjbn, JBL patchjbl
1209   ) {
1210   int rci;
1211   JBCOLL jbc;
1212   struct _JBL sjbl;
1213   JBL_NODE root, patch;
1214   JBL ujbl = 0;
1215   IWPOOL *pool = 0;
1216   IWKV_val val = { 0 };
1217   IWKV_val key = {
1218     .data = &id,
1219     .size = sizeof(id)
1220   };
1221 
1222   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1223   RCRET(rc);
1224 
1225   rc = iwkv_get(jbc->cdb, &key, &val);
1226   if (upsert && (rc == IWKV_ERROR_NOTFOUND)) {
1227     if (patchjson) {
1228       rc = jbl_from_json(&ujbl, patchjson);
1229     } else if (patchjbl) {
1230       ujbl = patchjbl;
1231     } else if (patchjbn) {
1232       rc = jbl_from_node(&ujbl, patchjbn);
1233     } else {
1234       rc = IW_ERROR_INVALID_ARGS;
1235     }
1236     RCGO(rc, finish);
1237     if (jbl_type(ujbl) != JBV_OBJECT) {
1238       rc = EJDB_ERROR_PATCH_JSON_NOT_OBJECT;
1239       goto finish;
1240     }
1241     rc = _jb_put_impl(jbc, ujbl, id);
1242     if (!rc && (jbc->id_seq < id)) {
1243       jbc->id_seq = id;
1244     }
1245     goto finish;
1246   } else {
1247     RCGO(rc, finish);
1248   }
1249 
1250   RCC(rc, finish, jbl_from_buf_keep_onstack(&sjbl, val.data, val.size));
1251 
1252   pool = iwpool_create_empty();
1253   if (!pool) {
1254     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1255     goto finish;
1256   }
1257 
1258   RCC(rc, finish, jbl_to_node(&sjbl, &root, false, pool));
1259 
1260   if (patchjson) {
1261     rc = jbn_from_json(patchjson, &patch, pool);
1262   } else if (patchjbl) {
1263     rc = jbl_to_node(patchjbl, &patch, false, pool);
1264   } else if (patchjbn) {
1265     patch = patchjbn;
1266   } else {
1267     rc = IW_ERROR_INVALID_ARGS;
1268   }
1269   RCGO(rc, finish);
1270 
1271   RCC(rc, finish, jbn_patch_auto(root, patch, pool));
1272 
1273   if (root->type == JBV_OBJECT) {
1274     RCC(rc, finish, jbl_create_empty_object(&ujbl));
1275   } else if (root->type == JBV_ARRAY) {
1276     RCC(rc, finish, jbl_create_empty_array(&ujbl));
1277   } else {
1278     rc = JBL_ERROR_CREATION;
1279     goto finish;
1280   }
1281 
1282   RCC(rc, finish, jbl_fill_from_node(ujbl, root));
1283   rc = _jb_put_impl(jbc, ujbl, id);
1284 
1285 finish:
1286   API_COLL_UNLOCK(jbc, rci, rc);
1287   if (ujbl != patchjbl) {
1288     jbl_destroy(&ujbl);
1289   }
1290   if (val.data) {
1291     iwkv_val_dispose(&val);
1292   }
1293   iwpool_destroy(pool);
1294   return rc;
1295 }
1296 
_jb_wal_lock_interceptor(bool before,void * op)1297 static iwrc _jb_wal_lock_interceptor(bool before, void *op) {
1298   int rci;
1299   iwrc rc = 0;
1300   EJDB db = op;
1301   assert(db);
1302   if (before) {
1303     API_WLOCK2(db, rci);
1304   } else {
1305     API_UNLOCK(db, rci, rc);
1306   }
1307   return rc;
1308 }
1309 
ejdb_patch(EJDB db,const char * coll,const char * patchjson,int64_t id)1310 iwrc ejdb_patch(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1311   return _jb_patch(db, coll, id, false, patchjson, 0, 0);
1312 }
1313 
ejdb_patch_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1314 iwrc ejdb_patch_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1315   return _jb_patch(db, coll, id, false, 0, patch, 0);
1316 }
1317 
ejdb_patch_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1318 iwrc ejdb_patch_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1319   return _jb_patch(db, coll, id, false, 0, 0, patch);
1320 }
1321 
ejdb_merge_or_put(EJDB db,const char * coll,const char * patchjson,int64_t id)1322 iwrc ejdb_merge_or_put(EJDB db, const char *coll, const char *patchjson, int64_t id) {
1323   return _jb_patch(db, coll, id, true, patchjson, 0, 0);
1324 }
1325 
ejdb_merge_or_put_jbn(EJDB db,const char * coll,JBL_NODE patch,int64_t id)1326 iwrc ejdb_merge_or_put_jbn(EJDB db, const char *coll, JBL_NODE patch, int64_t id) {
1327   return _jb_patch(db, coll, id, true, 0, patch, 0);
1328 }
1329 
ejdb_merge_or_put_jbl(EJDB db,const char * coll,JBL patch,int64_t id)1330 iwrc ejdb_merge_or_put_jbl(EJDB db, const char *coll, JBL patch, int64_t id) {
1331   return _jb_patch(db, coll, id, true, 0, 0, patch);
1332 }
1333 
ejdb_put(EJDB db,const char * coll,JBL jbl,int64_t id)1334 iwrc ejdb_put(EJDB db, const char *coll, JBL jbl, int64_t id) {
1335   if (!jbl) {
1336     return IW_ERROR_INVALID_ARGS;
1337   }
1338   int rci;
1339   JBCOLL jbc;
1340   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1341   RCRET(rc);
1342   rc = _jb_put_impl(jbc, jbl, id);
1343   if (!rc && (jbc->id_seq < id)) {
1344     jbc->id_seq = id;
1345   }
1346   API_COLL_UNLOCK(jbc, rci, rc);
1347   return rc;
1348 }
1349 
ejdb_put_jbn(EJDB db,const char * coll,JBL_NODE jbn,int64_t id)1350 iwrc ejdb_put_jbn(EJDB db, const char *coll, JBL_NODE jbn, int64_t id) {
1351   JBL jbl = 0;
1352   iwrc rc = jbl_from_node(&jbl, jbn);
1353   RCRET(rc);
1354   rc = ejdb_put(db, coll, jbl, id);
1355   jbl_destroy(&jbl);
1356   return rc;
1357 }
1358 
_jb_put_new_lw(JBCOLL jbc,JBL jbl,int64_t * id)1359 static iwrc _jb_put_new_lw(JBCOLL jbc, JBL jbl, int64_t *id) {
1360   iwrc rc = 0;
1361   int64_t oid = jbc->id_seq + 1;
1362   IWKV_val val, key = {
1363     .data = &oid,
1364     .size = sizeof(oid)
1365   };
1366   struct _JBPHCTX pctx = {
1367     .id  = oid,
1368     .jbc = jbc,
1369     .jbl = jbl
1370   };
1371 
1372   RCC(rc, finish, jbl_as_buf(jbl, &val.data, &val.size));
1373   RCC(rc, finish, _jb_put_handler_after(iwkv_puth(jbc->cdb, &key, &val, 0, _jb_put_handler, &pctx), &pctx));
1374 
1375   jbc->id_seq = oid;
1376   if (id) {
1377     *id = oid;
1378   }
1379 
1380 finish:
1381   return rc;
1382 }
1383 
ejdb_put_new(EJDB db,const char * coll,JBL jbl,int64_t * id)1384 iwrc ejdb_put_new(EJDB db, const char *coll, JBL jbl, int64_t *id) {
1385   if (!jbl) {
1386     return IW_ERROR_INVALID_ARGS;
1387   }
1388   int rci;
1389   JBCOLL jbc;
1390   if (id) {
1391     *id = 0;
1392   }
1393   iwrc rc = _jb_coll_acquire_keeplock(db, coll, true, &jbc);
1394   RCRET(rc);
1395 
1396   rc = _jb_put_new_lw(jbc, jbl, id);
1397 
1398   API_COLL_UNLOCK(jbc, rci, rc);
1399   return rc;
1400 }
1401 
ejdb_put_new_jbn(EJDB db,const char * coll,JBL_NODE jbn,int64_t * id)1402 iwrc ejdb_put_new_jbn(EJDB db, const char *coll, JBL_NODE jbn, int64_t *id) {
1403   JBL jbl = 0;
1404   iwrc rc = jbl_from_node(&jbl, jbn);
1405   RCRET(rc);
1406   rc = ejdb_put_new(db, coll, jbl, id);
1407   jbl_destroy(&jbl);
1408   return rc;
1409 }
1410 
jb_get(EJDB db,const char * coll,int64_t id,jb_coll_acquire_t acm,JBL * jblp)1411 iwrc jb_get(EJDB db, const char *coll, int64_t id, jb_coll_acquire_t acm, JBL *jblp) {
1412   if (!id || !jblp) {
1413     return IW_ERROR_INVALID_ARGS;
1414   }
1415   *jblp = 0;
1416 
1417   int rci;
1418   JBCOLL jbc;
1419   JBL jbl = 0;
1420   IWKV_val val = { 0 };
1421   IWKV_val key = { .data = &id, .size = sizeof(id) };
1422 
1423   iwrc rc = _jb_coll_acquire_keeplock2(db, coll, acm, &jbc);
1424   RCRET(rc);
1425 
1426   RCC(rc, finish, iwkv_get(jbc->cdb, &key, &val));
1427   RCC(rc, finish, jbl_from_buf_keep(&jbl, val.data, val.size, false));
1428 
1429   *jblp = jbl;
1430 
1431 finish:
1432   if (rc) {
1433     if (jbl) {
1434       jbl_destroy(&jbl);
1435     } else {
1436       iwkv_val_dispose(&val);
1437     }
1438   }
1439   API_COLL_UNLOCK(jbc, rci, rc);
1440   return rc;
1441 }
1442 
ejdb_get(EJDB db,const char * coll,int64_t id,JBL * jblp)1443 iwrc ejdb_get(EJDB db, const char *coll, int64_t id, JBL *jblp) {
1444   return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, jblp);
1445 }
1446 
ejdb_del(EJDB db,const char * coll,int64_t id)1447 iwrc ejdb_del(EJDB db, const char *coll, int64_t id) {
1448   int rci;
1449   JBCOLL jbc;
1450   struct _JBL jbl;
1451   IWKV_val val = { 0 };
1452   IWKV_val key = { .data = &id, .size = sizeof(id) };
1453 
1454   iwrc rc = _jb_coll_acquire_keeplock2(db, coll, JB_COLL_ACQUIRE_WRITE | JB_COLL_ACQUIRE_EXISTING, &jbc);
1455   RCRET(rc);
1456 
1457   RCC(rc, finish, iwkv_get(jbc->cdb, &key, &val));
1458   RCC(rc, finish, jbl_from_buf_keep_onstack(&jbl, val.data, val.size));
1459 
1460   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1461     IWRC(_jb_idx_record_remove(idx, id, &jbl), rc);
1462   }
1463 
1464   RCC(rc, finish, iwkv_del(jbc->cdb, &key, 0));
1465   _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1466   jbc->rnum -= 1;
1467 
1468 finish:
1469   if (val.data) {
1470     iwkv_val_dispose(&val);
1471   }
1472   API_COLL_UNLOCK(jbc, rci, rc);
1473   return rc;
1474 }
1475 
jb_del(JBCOLL jbc,JBL jbl,int64_t id)1476 iwrc jb_del(JBCOLL jbc, JBL jbl, int64_t id) {
1477   iwrc rc = 0;
1478   IWKV_val key = { .data = &id, .size = sizeof(id) };
1479   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1480     IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1481   }
1482   rc = iwkv_del(jbc->cdb, &key, 0);
1483   RCRET(rc);
1484   _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1485   jbc->rnum -= 1;
1486   return rc;
1487 }
1488 
jb_cursor_del(JBCOLL jbc,IWKV_cursor cur,int64_t id,JBL jbl)1489 iwrc jb_cursor_del(JBCOLL jbc, IWKV_cursor cur, int64_t id, JBL jbl) {
1490   iwrc rc = 0;
1491   for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1492     IWRC(_jb_idx_record_remove(idx, id, jbl), rc);
1493   }
1494   rc = iwkv_cursor_del(cur, 0);
1495   RCRET(rc);
1496   _jb_meta_nrecs_update(jbc->db, jbc->dbid, -1);
1497   jbc->rnum -= 1;
1498   return rc;
1499 }
1500 
ejdb_ensure_collection(EJDB db,const char * coll)1501 iwrc ejdb_ensure_collection(EJDB db, const char *coll) {
1502   int rci;
1503   JBCOLL jbc;
1504   iwrc rc = _jb_coll_acquire_keeplock(db, coll, false, &jbc);
1505   RCRET(rc);
1506   API_COLL_UNLOCK(jbc, rci, rc);
1507   return rc;
1508 }
1509 
ejdb_remove_collection(EJDB db,const char * coll)1510 iwrc ejdb_remove_collection(EJDB db, const char *coll) {
1511   int rci;
1512   iwrc rc = 0;
1513   if (db->oflags & IWKV_RDONLY) {
1514     return IW_ERROR_READONLY;
1515   }
1516   API_WLOCK(db, rci);
1517   JBCOLL jbc;
1518   IWKV_val key;
1519   char keybuf[sizeof(KEY_PREFIX_IDXMETA) + 1 + 2 * IWNUMBUF_SIZE]; // Full key format: i.<coldbid>.<idxdbid>
1520 
1521   jbc = iwhmap_get(db->mcolls, coll);
1522   if (jbc) {
1523     key.data = keybuf;
1524     key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
1525 
1526     RCC(rc, finish, iwkv_del(jbc->db->metadb, &key, IWKV_SYNC));
1527 
1528     _jb_meta_nrecs_removedb(db, jbc->dbid);
1529 
1530     for (JBIDX idx = jbc->idx; idx; idx = idx->next) {
1531       key.data = keybuf;
1532       key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_IDXMETA "%u" "." "%u", jbc->dbid, idx->dbid);
1533       RCC(rc, finish, iwkv_del(jbc->db->metadb, &key, 0));
1534       _jb_meta_nrecs_removedb(db, idx->dbid);
1535     }
1536     for (JBIDX idx = jbc->idx, nidx; idx; idx = nidx) {
1537       IWRC(iwkv_db_destroy(&idx->idb), rc);
1538       idx->idb = 0;
1539       nidx = idx->next;
1540       _jb_idx_release(idx);
1541     }
1542     jbc->idx = 0;
1543     IWRC(iwkv_db_destroy(&jbc->cdb), rc);
1544     iwhmap_remove(db->mcolls, coll);
1545   }
1546 
1547 finish:
1548   API_UNLOCK(db, rci, rc);
1549   return rc;
1550 }
1551 
jb_collection_join_resolver(int64_t id,const char * coll,JBL * out,JBEXEC * ctx)1552 iwrc jb_collection_join_resolver(int64_t id, const char *coll, JBL *out, JBEXEC *ctx) {
1553   assert(out && ctx && coll);
1554   EJDB db = ctx->jbc->db;
1555   return jb_get(db, coll, id, JB_COLL_ACQUIRE_EXISTING, out);
1556 }
1557 
jb_proj_node_cache_cmp(const void * v1,const void * v2)1558 int jb_proj_node_cache_cmp(const void *v1, const void *v2) {
1559   const struct _JBDOCREF *r1 = v1;
1560   const struct _JBDOCREF *r2 = v2;
1561   int ret = r1->id > r2->id ? 1 : r1->id < r2->id ? -1 : 0;
1562   if (!ret) {
1563     return strcmp(r1->coll, r2->coll);
1564   }
1565   return ret;
1566 }
1567 
jb_proj_node_kvfree(void * key,void * val)1568 void jb_proj_node_kvfree(void *key, void *val) {
1569   free(key);
1570 }
1571 
jb_proj_node_hash(const void * key)1572 uint32_t jb_proj_node_hash(const void *key) {
1573   const struct _JBDOCREF *ref = key;
1574   return murmur3(key, sizeof(*ref));
1575 }
1576 
ejdb_rename_collection(EJDB db,const char * coll,const char * new_coll)1577 iwrc ejdb_rename_collection(EJDB db, const char *coll, const char *new_coll) {
1578   if (!coll || !new_coll) {
1579     return IW_ERROR_INVALID_ARGS;
1580   }
1581   int rci;
1582   iwrc rc = 0;
1583   if (db->oflags & IWKV_RDONLY) {
1584     return IW_ERROR_READONLY;
1585   }
1586   IWKV_val key, val;
1587   JBL nmeta = 0, jbv = 0;
1588   char keybuf[IWNUMBUF_SIZE + sizeof(KEY_PREFIX_COLLMETA)];
1589 
1590   API_WLOCK(db, rci);
1591 
1592   JBCOLL jbc = iwhmap_get(db->mcolls, coll);
1593   if (!jbc) {
1594     rc = EJDB_ERROR_COLLECTION_NOT_FOUND;
1595     goto finish;
1596   }
1597 
1598   if (iwhmap_get(db->mcolls, new_coll)) {
1599     rc = EJDB_ERROR_TARGET_COLLECTION_EXISTS;
1600     goto finish;
1601   }
1602 
1603   RCC(rc, finish, jbl_create_empty_object(&nmeta));
1604   if (!binn_object_set_str(&nmeta->bn, "name", new_coll)) {
1605     rc = JBL_ERROR_CREATION;
1606     goto finish;
1607   }
1608 
1609   if (!binn_object_set_uint32(&nmeta->bn, "id", jbc->dbid)) {
1610     rc = JBL_ERROR_CREATION;
1611     goto finish;
1612   }
1613 
1614   RCC(rc, finish, jbl_as_buf(nmeta, &val.data, &val.size));
1615   key.size = snprintf(keybuf, sizeof(keybuf), KEY_PREFIX_COLLMETA "%u", jbc->dbid);
1616   if (key.size >= sizeof(keybuf)) {
1617     rc = IW_ERROR_OVERFLOW;
1618     goto finish;
1619   }
1620   key.data = keybuf;
1621 
1622   RCC(rc, finish, jbl_at(nmeta, "/name", &jbv));
1623   const char *new_name = jbl_get_str(jbv);
1624   RCC(rc, finish, iwkv_put(db->metadb, &key, &val, IWKV_SYNC));
1625   RCC(rc, finish, iwhmap_rename(db->mcolls, coll, (void*) new_name));
1626 
1627   jbc->name = new_name;
1628   jbl_destroy(&jbc->meta);
1629   jbc->meta = nmeta;
1630 
1631 finish:
1632   if (jbv) {
1633     jbl_destroy(&jbv);
1634   }
1635   if (rc) {
1636     if (nmeta) {
1637       jbl_destroy(&nmeta);
1638     }
1639   }
1640   API_UNLOCK(db, rci, rc);
1641   return rc;
1642 }
1643 
ejdb_get_meta(EJDB db,JBL * jblp)1644 iwrc ejdb_get_meta(EJDB db, JBL *jblp) {
1645   int rci;
1646   *jblp = 0;
1647   JBL jbl;
1648   iwrc rc = jbl_create_empty_object(&jbl);
1649   RCRET(rc);
1650   binn *clist = 0;
1651   API_RLOCK(db, rci);
1652   if (!binn_object_set_str(&jbl->bn, "version", ejdb_version_full())) {
1653     rc = JBL_ERROR_CREATION;
1654     goto finish;
1655   }
1656   IWFS_FSM_STATE sfsm;
1657   rc = iwkv_state(db->iwkv, &sfsm);
1658   RCRET(rc);
1659   if (  !binn_object_set_str(&jbl->bn, "file", sfsm.exfile.file.opts.path)
1660      || !binn_object_set_int64(&jbl->bn, "size", sfsm.exfile.fsize)) {
1661     rc = JBL_ERROR_CREATION;
1662     goto finish;
1663   }
1664   clist = binn_list();
1665   if (!clist) {
1666     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1667     goto finish;
1668   }
1669 
1670   IWHMAP_ITER iter;
1671   iwhmap_iter_init(db->mcolls, &iter);
1672   while (iwhmap_iter_next(&iter)) {
1673     JBCOLL jbc = (void*) iter.val;
1674     RCC(rc, finish, _jb_coll_add_meta_lr(jbc, clist));
1675   }
1676 
1677   if (!binn_object_set_list(&jbl->bn, "collections", clist)) {
1678     rc = JBL_ERROR_CREATION;
1679     goto finish;
1680   }
1681   binn_free(clist);
1682   clist = 0;
1683 
1684 finish:
1685   API_UNLOCK(db, rci, rc);
1686   if (rc) {
1687     if (clist) {
1688       binn_free(clist);
1689     }
1690     jbl_destroy(&jbl);
1691   } else {
1692     *jblp = jbl;
1693   }
1694   return rc;
1695 }
1696 
ejdb_online_backup(EJDB db,uint64_t * ts,const char * target_file)1697 iwrc ejdb_online_backup(EJDB db, uint64_t *ts, const char *target_file) {
1698   ENSURE_OPEN(db);
1699   return iwkv_online_backup(db->iwkv, ts, target_file);
1700 }
1701 
ejdb_get_iwkv(EJDB db,IWKV * kvp)1702 iwrc ejdb_get_iwkv(EJDB db, IWKV *kvp) {
1703   if (!db || !kvp) {
1704     return IW_ERROR_INVALID_ARGS;
1705   }
1706   *kvp = db->iwkv;
1707   return 0;
1708 }
1709 
_mcolls_map_entry_free(void * key,void * val)1710 static void _mcolls_map_entry_free(void *key, void *val) {
1711   if (val) {
1712     _jb_coll_release(val);
1713   }
1714 }
1715 
ejdb_open(const EJDB_OPTS * _opts,EJDB * ejdbp)1716 iwrc ejdb_open(const EJDB_OPTS *_opts, EJDB *ejdbp) {
1717   *ejdbp = 0;
1718   int rci;
1719   iwrc rc = ejdb_init();
1720   RCRET(rc);
1721   if (!_opts || !_opts->kv.path || !ejdbp) {
1722     return IW_ERROR_INVALID_ARGS;
1723   }
1724 
1725   EJDB db = calloc(1, sizeof(*db));
1726   if (!db) {
1727     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1728   }
1729 
1730   memcpy(&db->opts, _opts, sizeof(db->opts));
1731   if (!db->opts.sort_buffer_sz) {
1732     db->opts.sort_buffer_sz = 16 * 1024 * 1024; // 16Mb
1733   }
1734   if (db->opts.sort_buffer_sz < 1024 * 1024) { // Min 1Mb
1735     db->opts.sort_buffer_sz = 1024 * 1024;
1736   }
1737   if (!db->opts.document_buffer_sz) { // 64Kb
1738     db->opts.document_buffer_sz = 64 * 1024;
1739   }
1740   if (db->opts.document_buffer_sz < 16 * 1024) { // Min 16Kb
1741     db->opts.document_buffer_sz = 16 * 1024;
1742   }
1743   EJDB_HTTP *http = &db->opts.http;
1744   if (http->bind) {
1745     http->bind = strdup(http->bind);
1746   }
1747   if (http->access_token) {
1748     http->access_token = strdup(http->access_token);
1749     if (!http->access_token) {
1750       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1751     }
1752     http->access_token_len = strlen(http->access_token);
1753   }
1754 
1755   pthread_rwlockattr_t attr;
1756   pthread_rwlockattr_init(&attr);
1757 #if defined __linux__ && (defined __USE_UNIX98 || defined __USE_XOPEN2K)
1758   pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
1759 #endif
1760   rci = pthread_rwlock_init(&db->rwl, &attr);
1761   if (rci) {
1762     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1763     free(db);
1764     return rc;
1765   }
1766   RCB(finish, db->mcolls = iwhmap_create_str(_mcolls_map_entry_free));
1767 
1768   IWKV_OPTS kvopts;
1769   memcpy(&kvopts, &db->opts.kv, sizeof(db->opts.kv));
1770   kvopts.wal.enabled = !db->opts.no_wal;
1771   kvopts.wal.wal_lock_interceptor = _jb_wal_lock_interceptor;
1772   kvopts.wal.wal_lock_interceptor_opaque = db;
1773 
1774   RCC(rc, finish, iwkv_open(&kvopts, &db->iwkv));
1775 
1776   db->oflags = kvopts.oflags;
1777   RCC(rc, finish, _jb_db_meta_load(db));
1778 
1779   if (db->opts.http.enabled) {
1780     // Maximum WS/HTTP API body size. Default: 64Mb, Min: 512K
1781     if (!db->opts.http.max_body_size) {
1782       db->opts.http.max_body_size = 64 * 1024 * 1024;
1783     } else if (db->opts.http.max_body_size < 512 * 1024) {
1784       db->opts.http.max_body_size = 512 * 1024;
1785     }
1786   }
1787 
1788 #ifdef JB_HTTP
1789   if (db->opts.http.enabled && !db->opts.http.blocking) {
1790     RCC(rc, finish, jbr_start(db, &db->opts, &db->jbr));
1791   }
1792 #endif
1793 
1794 finish:
1795   if (rc) {
1796     _jb_db_release(&db);
1797   } else {
1798     db->open = true;
1799     *ejdbp = db;
1800 #ifdef JB_HTTP
1801     if (db->opts.http.enabled && db->opts.http.blocking) {
1802       rc = jbr_start(db, &db->opts, &db->jbr);
1803     }
1804 #endif
1805   }
1806   return rc;
1807 }
1808 
ejdb_close(EJDB * ejdbp)1809 iwrc ejdb_close(EJDB *ejdbp) {
1810   if (!ejdbp || !*ejdbp) {
1811     return IW_ERROR_INVALID_ARGS;
1812   }
1813   EJDB db = *ejdbp;
1814   if (!__sync_bool_compare_and_swap(&db->open, 1, 0)) {
1815     iwlog_error2("Database is closed already");
1816     return IW_ERROR_INVALID_STATE;
1817   }
1818   iwrc rc = _jb_db_release(ejdbp);
1819   return rc;
1820 }
1821 
ejdb_git_revision(void)1822 const char* ejdb_git_revision(void) {
1823   return EJDB2_GIT_REVISION;
1824 }
1825 
ejdb_version_full(void)1826 const char* ejdb_version_full(void) {
1827   return EJDB2_VERSION;
1828 }
1829 
ejdb_version_major(void)1830 unsigned int ejdb_version_major(void) {
1831   return EJDB2_VERSION_MAJOR;
1832 }
1833 
ejdb_version_minor(void)1834 unsigned int ejdb_version_minor(void) {
1835   return EJDB2_VERSION_MINOR;
1836 }
1837 
ejdb_version_patch(void)1838 unsigned int ejdb_version_patch(void) {
1839   return EJDB2_VERSION_PATCH;
1840 }
1841 
_ejdb_ecodefn(locale_t locale,uint32_t ecode)1842 static const char* _ejdb_ecodefn(locale_t locale, uint32_t ecode) {
1843   if (!((ecode > _EJDB_ERROR_START) && (ecode < _EJDB_ERROR_END))) {
1844     return 0;
1845   }
1846   switch (ecode) {
1847     case EJDB_ERROR_INVALID_COLLECTION_META:
1848       return "Invalid collection metadata (EJDB_ERROR_INVALID_COLLECTION_META)";
1849     case EJDB_ERROR_INVALID_COLLECTION_INDEX_META:
1850       return "Invalid collection index metadata (EJDB_ERROR_INVALID_COLLECTION_INDEX_META)";
1851     case EJDB_ERROR_INVALID_INDEX_MODE:
1852       return "Invalid index mode specified (EJDB_ERROR_INVALID_INDEX_MODE)";
1853     case EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE:
1854       return "Index exists but mismatched uniqueness constraint (EJDB_ERROR_MISMATCHED_INDEX_UNIQUENESS_MODE)";
1855     case EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED:
1856       return "Unique index constraint violated (EJDB_ERROR_UNIQUE_INDEX_CONSTRAINT_VIOLATED)";
1857     case EJDB_ERROR_INVALID_COLLECTION_NAME:
1858       return "Invalid collection name (EJDB_ERROR_INVALID_COLLECTION_NAME)";
1859     case EJDB_ERROR_COLLECTION_NOT_FOUND:
1860       return "Collection not found (EJDB_ERROR_COLLECTION_NOT_FOUND)";
1861     case EJDB_ERROR_TARGET_COLLECTION_EXISTS:
1862       return "Target collection exists (EJDB_ERROR_TARGET_COLLECTION_EXISTS)";
1863     case EJDB_ERROR_PATCH_JSON_NOT_OBJECT:
1864       return "Patch JSON must be an object (map) (EJDB_ERROR_PATCH_JSON_NOT_OBJECT)";
1865   }
1866   return 0;
1867 }
1868 
ejdb_init()1869 iwrc ejdb_init() {
1870   static volatile int jb_initialized = 0;
1871   if (!__sync_bool_compare_and_swap(&jb_initialized, 0, 1)) {
1872     return 0;  // initialized already
1873   }
1874   iwrc rc = iw_init();
1875   RCRET(rc);
1876   rc = jbl_init();
1877   RCRET(rc);
1878   rc = jql_init();
1879   RCRET(rc);
1880 #ifdef JB_HTTP
1881   rc = jbr_init();
1882   RCRET(rc);
1883 #endif
1884   return iwlog_register_ecodefn(_ejdb_ecodefn);
1885 }
1886