1 #include "ejdb2_internal.h"
2
/**
 * Reduces an iteration step counter to its sign: 1, -1 or 0.
 */
static int64_t jbi_consumer_step_sign(int64_t istep) {
  if (istep > 0) {
    return 1;
  }
  if (istep < 0) {
    return -1;
  }
  return 0;
}

/**
 * Deletes the document `id` from the collection of `ctx`: through the
 * cursor when one is supplied, otherwise directly by id.
 */
static iwrc jbi_consumer_remove(struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id, struct _JBL *doc) {
  if (cur) {
    return jb_cursor_del(ctx->jbc, cur, id, doc);
  }
  return jb_del(ctx->jbc, doc, id);
}

/**
 * Consumes one candidate document of a query scan.
 *
 * The raw document is copied into `ctx->jblbuf` (the buffer is enlarged with
 * realloc() and the read retried when the value does not fit), matched
 * against the query `ctx->ux->q` and, when it matches and is neither skipped
 * nor stepped over, optionally deleted / patched / projected and handed to
 * `ctx->ux->visitor`.
 *
 * @param ctx     Executor context. Its `jblbuf`, `jblbufsz` and `istep`
 *                fields may be updated by this call.
 * @param cur     Optional cursor. When non-NULL the value is read from the
 *                cursor and modifications go through it; otherwise the
 *                record is addressed by `id` in `ctx->jbc`.
 * @param id      Document id. Zero marks the end of the scan.
 * @param step    Receives -1, 0 or 1 according to the sign of `ctx->istep`;
 *                forced to 0 once the decremented `ux->limit` drops below 1.
 *                Not written on the error, not-found, no-match and skip paths.
 * @param matched Passed to jql_matched(); the value found there afterwards
 *                decides whether the document is processed further.
 * @param err     Returned unchanged when `id` is zero.
 * @return `err` at end of scan; otherwise zero or the first non-zero code
 *         produced by one of the invoked operations. A missing record
 *         (IWKV_ERROR_NOTFOUND) is logged and reported as zero.
 */
iwrc jbi_consumer(struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id, int64_t *step, bool *matched, iwrc err) {
  if (id == 0) { // EOF scan
    return err;
  }

  EJDB_EXEC *ux = ctx->ux;
  IWPOOL *pool = ux->pool;
  struct _JBL jbl;
  size_t vsz = 0;
  iwrc rc;

  // Fetch the raw document; loop again only after the buffer has been grown.
  for ( ; ; ) {
    if (cur) {
      rc = iwkv_cursor_copy_val(cur, ctx->jblbuf, ctx->jblbufsz, &vsz);
    } else {
      IWKV_val key = {
        .data = &id,
        .size = sizeof(id)
      };
      rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf, ctx->jblbufsz, &vsz);
    }

    if (rc == IWKV_ERROR_NOTFOUND) {
      // A missing record is not an error for the scan: log it and move on.
      rc = 0;
      if (ctx->midx.idx) {
        iwlog_error("Orphaned index entry."
                    "\n\tCollection db: %" PRIu32
                    "\n\tIndex db: %" PRIu32
                    "\n\tEntry id: %" PRId64, ctx->jbc->dbid, ctx->midx.idx->dbid, id);
      } else {
        iwlog_error("Orphaned index entry."
                    "\n\tCollection db: %" PRIu32
                    "\n\tEntry id: %" PRId64, ctx->jbc->dbid, id);
      }
      goto finish;
    }
    RCGO(rc, finish);

    if (vsz <= ctx->jblbufsz) {
      break; // the value fits into the current buffer
    }

    // Reported size exceeds the buffer: at least double it, then re-read.
    size_t grown = MAX(vsz, ctx->jblbufsz * 2);
    void *mem = realloc(ctx->jblbuf, grown);
    if (!mem) {
      rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
      goto finish;
    }
    ctx->jblbuf = mem;
    ctx->jblbufsz = grown;
  }

  rc = jbl_from_buf_keep_onstack(&jbl, ctx->jblbuf, vsz);
  RCGO(rc, finish);

  rc = jql_matched(ux->q, &jbl, matched);
  RCGO(rc, finish);
  if (!*matched) {
    goto finish;
  }
  if (ux->skip && (ux->skip-- > 0)) {
    // Matching document consumed by the remaining skip counter.
    goto finish;
  }

  // Move the pending step counter one unit towards zero.
  if (ctx->istep != 0) {
    ctx->istep += (ctx->istep > 0) ? -1 : 1;
  }

  if (ctx->istep) {
    // Still stepping over documents: only report the direction.
    *step = jbi_consumer_step_sign(ctx->istep);
  } else {
    JQL q = ux->q;
    ctx->istep = 1;
    struct JQP_AUX *aux = q->aux;
    struct _EJDB_DOC doc = {
      .id = id,
      .raw = &jbl
    };

    if (aux->apply || aux->apply_placeholder || aux->projection) {
      // A node representation is required; it is allocated from `pool`.
      JBL_NODE root;
      if (!pool) {
        // No pool supplied by the executor: use a temporary one, released at `finish`.
        pool = iwpool_create(jbl.bn.size * 2);
        if (!pool) {
          rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
          goto finish;
        }
      }
      rc = jbl_to_node(&jbl, &root, true, pool);
      RCGO(rc, finish);
      doc.node = root;

      if (aux->qmode & JQP_QRY_APPLY_DEL) {
        rc = jbi_consumer_remove(ctx, cur, id, &jbl);
      } else if (aux->apply || aux->apply_placeholder) {
        // Apply the query patch to the node and store the result under the same id.
        struct _JBL sn = { 0 };
        rc = jql_apply(q, root, pool);
        RCGO(rc, finish);
        rc = _jbl_from_node(&sn, root);
        RCGO(rc, finish);
        if (cur) {
          rc = jb_cursor_set(ctx->jbc, cur, id, &sn);
        } else {
          rc = jb_put(ctx->jbc, &sn, id);
        }
        binn_free(&sn.bn);
      }
      RCGO(rc, finish);

      if (aux->projection) {
        rc = jql_project(q, root, pool, ctx);
        RCGO(rc, finish);
      }
    } else if (aux->qmode & JQP_QRY_APPLY_DEL) {
      rc = jbi_consumer_remove(ctx, cur, id, &jbl);
      RCGO(rc, finish);
    }

    if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
      // Call the visitor again for as long as it leaves the step at -1.
      do {
        ctx->istep = 1;
        rc = ux->visitor(ux, &doc, &ctx->istep);
        RCGO(rc, finish);
      } while (ctx->istep == -1);
    }

    ux->cnt++;
    *step = jbi_consumer_step_sign(ctx->istep);
    --ux->limit;
    if (ux->limit < 1) {
      *step = 0; // limit exhausted
    }
  }

finish:
  // Release the pool only when it was created locally by this call.
  if (pool && (pool != ctx->ux->pool)) {
    iwpool_destroy(pool);
  }
  return rc;
}
139