• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "ejdb2_internal.h"
2 
jbi_consumer(struct _JBEXEC * ctx,IWKV_cursor cur,int64_t id,int64_t * step,bool * matched,iwrc err)3 iwrc jbi_consumer(struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id, int64_t *step, bool *matched, iwrc err) {
4   if (!id) { // EOF scan
5     return err;
6   }
7 
8   iwrc rc;
9   struct _JBL jbl;
10   size_t vsz = 0;
11   EJDB_EXEC *ux = ctx->ux;
12   IWPOOL *pool = ux->pool;
13 
14 start:
15   {
16     if (cur) {
17       rc = iwkv_cursor_copy_val(cur, ctx->jblbuf, ctx->jblbufsz, &vsz);
18     } else {
19       IWKV_val key = {
20         .data = &id,
21         .size = sizeof(id)
22       };
23       rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf, ctx->jblbufsz, &vsz);
24     }
25     if (rc == IWKV_ERROR_NOTFOUND) {
26       rc = 0;
27       if (ctx->midx.idx) {
28         iwlog_error("Orphaned index entry."
29                     "\n\tCollection db: %" PRIu32
30                     "\n\tIndex db: %" PRIu32
31                     "\n\tEntry id: %" PRId64, ctx->jbc->dbid, ctx->midx.idx->dbid, id);
32       } else {
33         iwlog_error("Orphaned index entry."
34                     "\n\tCollection db: %" PRIu32
35                     "\n\tEntry id: %" PRId64, ctx->jbc->dbid, id);
36       }
37       goto finish;
38     }
39     RCGO(rc, finish);
40     if (vsz > ctx->jblbufsz) {
41       size_t nsize = MAX(vsz, ctx->jblbufsz * 2);
42       void *nbuf = realloc(ctx->jblbuf, nsize);
43       if (!nbuf) {
44         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
45         goto finish;
46       }
47       ctx->jblbuf = nbuf;
48       ctx->jblbufsz = nsize;
49       goto start;
50     }
51   }
52 
53   rc = jbl_from_buf_keep_onstack(&jbl, ctx->jblbuf, vsz);
54   RCGO(rc, finish);
55 
56   rc = jql_matched(ux->q, &jbl, matched);
57   if (rc || !*matched || (ux->skip && (ux->skip-- > 0))) {
58     goto finish;
59   }
60   if (ctx->istep > 0) {
61     --ctx->istep;
62   } else if (ctx->istep < 0) {
63     ++ctx->istep;
64   }
65   if (!ctx->istep) {
66     JQL q = ux->q;
67     ctx->istep = 1;
68     struct JQP_AUX *aux = q->aux;
69     struct _EJDB_DOC doc = {
70       .id  = id,
71       .raw = &jbl
72     };
73     if (aux->apply || aux->apply_placeholder || aux->projection) {
74       JBL_NODE root;
75       if (!pool) {
76         pool = iwpool_create(jbl.bn.size * 2);
77         if (!pool) {
78           rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
79           goto finish;
80         }
81       }
82       rc = jbl_to_node(&jbl, &root, true, pool);
83       RCGO(rc, finish);
84       doc.node = root;
85       if (aux->qmode & JQP_QRY_APPLY_DEL) {
86         if (cur) {
87           rc = jb_cursor_del(ctx->jbc, cur, id, &jbl);
88         } else {
89           rc = jb_del(ctx->jbc, &jbl, id);
90         }
91       } else if (aux->apply || aux->apply_placeholder) {
92         struct _JBL sn = { 0 };
93         rc = jql_apply(q, root, pool);
94         RCGO(rc, finish);
95         rc = _jbl_from_node(&sn, root);
96         RCGO(rc, finish);
97         if (cur) {
98           rc = jb_cursor_set(ctx->jbc, cur, id, &sn);
99         } else {
100           rc = jb_put(ctx->jbc, &sn, id);
101         }
102         binn_free(&sn.bn);
103       }
104       RCGO(rc, finish);
105       if (aux->projection) {
106         rc = jql_project(q, root, pool, ctx);
107         RCGO(rc, finish);
108       }
109     } else if (aux->qmode & JQP_QRY_APPLY_DEL) {
110       if (cur) {
111         rc = jb_cursor_del(ctx->jbc, cur, id, &jbl);
112       } else {
113         rc = jb_del(ctx->jbc, &jbl, id);
114       }
115       RCGO(rc, finish);
116     }
117     if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
118       do {
119         ctx->istep = 1;
120         rc = ux->visitor(ux, &doc, &ctx->istep);
121         RCGO(rc, finish);
122       } while (ctx->istep == -1);
123     }
124     ++ux->cnt;
125     *step = ctx->istep > 0 ? 1 : ctx->istep < 0 ? -1 : 0;
126     if (--ux->limit < 1) {
127       *step = 0;
128     }
129   } else {
130     *step = ctx->istep > 0 ? 1 : ctx->istep < 0 ? -1 : 0; // -V547
131   }
132 
133 finish:
134   if (pool && (pool != ctx->ux->pool)) {
135     iwpool_destroy(pool);
136   }
137   return rc;
138 }
139