1 #include "ejdb2_internal.h"
2
/**
 * Deletes document `id` from the collection attached to `ctx`.
 *
 * The cursor-based call is used when `cur` is set, the id-based call otherwise.
 */
static iwrc jbi_consumer_del_doc(struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id, struct _JBL *jbl) {
  return cur ? jb_cursor_del(ctx->jbc, cur, id, jbl) : jb_del(ctx->jbc, jbl, id);
}

/**
 * Consumes one candidate record produced by a scan.
 *
 * Steps performed, in order:
 *  1. Copy the record value into `ctx->jblbuf` (from `cur` when given, by `id`
 *     otherwise), enlarging the buffer and retrying when the reported value
 *     size exceeds the buffer capacity.
 *  2. Evaluate the query against the record; the outcome is stored in `*matched`.
 *  3. For a matched record: consume the `skip` counter, advance the internal
 *     step counter `ctx->istep`, and once it reaches zero run
 *     apply / delete / projection, call the visitor (unless the query is in
 *     aggregate mode), and update `cnt` / `limit`.
 *
 * @param ctx     Execution context.
 * @param cur     Optional cursor positioned on the record; may be zero.
 * @param id      Record id; zero means the scan has ended.
 * @param step    Out: direction for the scanner (1, -1 or 0). Written only when
 *                a matched, non-skipped record has been processed.
 * @param matched Out: result of the query match.
 * @param err     Error code handed back unchanged on the end-of-scan call.
 * @return Zero on success (a missing record is logged and also yields zero),
 *         otherwise the first error code met.
 */
iwrc jbi_consumer(struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id, int64_t *step, bool *matched, iwrc err) {
  if (!id) { // EOF scan
    return err;
  }

  iwrc rc;
  struct _JBL raw;
  size_t len = 0;
  EJDB_EXEC *ux = ctx->ux;
  IWPOOL *pool = ux->pool;

  // Fetch the record value; loop again only after the buffer had to grow.
  for ( ; ; ) {
    if (cur) {
      rc = iwkv_cursor_copy_val(cur, ctx->jblbuf, ctx->jblbufsz, &len);
    } else {
      IWKV_val key = { .data = &id, .size = sizeof(id) };
      rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf, ctx->jblbufsz, &len);
    }
    if (rc == IWKV_ERROR_NOTFOUND) {
      // A missing record is reported in the log but is not treated as a failure.
      rc = 0;
      if (ctx->midx.idx) {
        iwlog_error("Orphaned index entry."
                    "\n\tCollection db: %" PRIu32
                    "\n\tIndex db: %" PRIu32
                    "\n\tEntry id: %" PRId64, ctx->jbc->dbid, ctx->midx.idx->dbid, id);
      } else {
        iwlog_error("Orphaned index entry."
                    "\n\tCollection db: %" PRIu32
                    "\n\tEntry id: %" PRId64, ctx->jbc->dbid, id);
      }
      goto finish;
    }
    RCGO(rc, finish);

    if (len <= ctx->jblbufsz) {
      break; // The whole value fits into the buffer.
    }

    // Buffer too small: at least double it, or jump straight to the needed size.
    size_t cap = MAX(len, ctx->jblbufsz * 2);
    void *grown = realloc(ctx->jblbuf, cap);
    if (!grown) {
      rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
      goto finish;
    }
    ctx->jblbuf = grown;
    ctx->jblbufsz = cap;
  }

  RCC(rc, finish, jbl_from_buf_keep_onstack(&raw, ctx->jblbuf, len));

  rc = jql_matched(ux->q, &raw, matched);
  RCGO(rc, finish);
  if (!*matched) {
    goto finish;
  }
  if (ux->skip && (ux->skip-- > 0)) {
    goto finish; // Record consumed by the skip counter.
  }

  // Move the pending step counter one unit towards zero.
  if (ctx->istep > 0) {
    --ctx->istep;
  } else if (ctx->istep < 0) {
    ++ctx->istep;
  }

  if (ctx->istep) {
    // Still stepping over records: only report the direction.
    *step = ctx->istep > 0 ? 1 : -1;
  } else {
    JQL query = ux->q;
    struct JQP_AUX *aux = query->aux;
    struct _EJDB_DOC doc = { .id = id, .raw = &raw };
    const bool has_apply = aux->apply || aux->apply_placeholder;
    const bool is_del = (aux->qmode & JQP_QRY_APPLY_DEL) != 0;

    ctx->istep = 1;

    if (has_apply || aux->projection) {
      JBL_NODE tree;
      if (!pool) {
        // No pool supplied by the caller: use a temporary one, released at `finish`.
        pool = iwpool_create(raw.bn.size * 2);
        if (!pool) {
          rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
          goto finish;
        }
      }
      RCC(rc, finish, jbl_to_node(&raw, &tree, true, pool));
      doc.node = tree;

      if (is_del) {
        rc = jbi_consumer_del_doc(ctx, cur, id, &raw);
      } else if (has_apply) {
        struct _JBL patched = { 0 };
        RCC(rc, finish, jql_apply(query, tree, pool));
        RCC(rc, finish, _jbl_from_node(&patched, tree));
        rc = cur ? jb_cursor_set(ctx->jbc, cur, id, &patched)
                 : jb_put(ctx->jbc, &patched, id);
        binn_free(&patched.bn);
      }
      RCGO(rc, finish);

      if (aux->projection) {
        RCC(rc, finish, jql_project(query, tree, pool, ctx));
      }
    } else if (is_del) {
      rc = jbi_consumer_del_doc(ctx, cur, id, &raw);
      RCGO(rc, finish);
    }

    if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
      // The visitor is called again for as long as it leaves the step at -1.
      for ( ; ; ) {
        ctx->istep = 1;
        RCC(rc, finish, ux->visitor(ux, &doc, &ctx->istep));
        if (ctx->istep != -1) {
          break;
        }
      }
    }

    ++ux->cnt;
    --ux->limit;
    if (ux->limit < 1) {
      *step = 0; // Limit exhausted: tell the scanner to stop.
    } else if (ctx->istep > 0) {
      *step = 1;
    } else if (ctx->istep < 0) {
      *step = -1;
    } else {
      *step = 0;
    }
  }

finish:
  // Release the pool only if it was created locally above.
  if (pool && (pool != ux->pool)) {
    iwpool_destroy(pool);
  }
  return rc;
}
134