• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "ejdb2_internal.h"
2 
jbi_consumer(struct _JBEXEC * ctx,IWKV_cursor cur,int64_t id,int64_t * step,bool * matched,iwrc err)3 iwrc jbi_consumer(struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id, int64_t *step, bool *matched, iwrc err) {
4   if (!id) { // EOF scan
5     return err;
6   }
7 
8   iwrc rc;
9   struct _JBL jbl;
10   size_t vsz = 0;
11   EJDB_EXEC *ux = ctx->ux;
12   IWPOOL *pool = ux->pool;
13 
14 start:
15   {
16     if (cur) {
17       rc = iwkv_cursor_copy_val(cur, ctx->jblbuf, ctx->jblbufsz, &vsz);
18     } else {
19       IWKV_val key = {
20         .data = &id,
21         .size = sizeof(id)
22       };
23       rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf, ctx->jblbufsz, &vsz);
24     }
25     if (rc == IWKV_ERROR_NOTFOUND) {
26       rc = 0;
27       if (ctx->midx.idx) {
28         iwlog_error("Orphaned index entry."
29                     "\n\tCollection db: %" PRIu32
30                     "\n\tIndex db: %" PRIu32
31                     "\n\tEntry id: %" PRId64, ctx->jbc->dbid, ctx->midx.idx->dbid, id);
32       } else {
33         iwlog_error("Orphaned index entry."
34                     "\n\tCollection db: %" PRIu32
35                     "\n\tEntry id: %" PRId64, ctx->jbc->dbid, id);
36       }
37       goto finish;
38     }
39     RCGO(rc, finish);
40 
41     if (vsz > ctx->jblbufsz) {
42       size_t nsize = MAX(vsz, ctx->jblbufsz * 2);
43       void *nbuf = realloc(ctx->jblbuf, nsize);
44       if (!nbuf) {
45         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
46         goto finish;
47       }
48       ctx->jblbuf = nbuf;
49       ctx->jblbufsz = nsize;
50       goto start;
51     }
52   }
53 
54   RCC(rc, finish, jbl_from_buf_keep_onstack(&jbl, ctx->jblbuf, vsz));
55 
56   rc = jql_matched(ux->q, &jbl, matched);
57   if (rc || !*matched || (ux->skip && (ux->skip-- > 0))) {
58     goto finish;
59   }
60   if (ctx->istep > 0) {
61     --ctx->istep;
62   } else if (ctx->istep < 0) {
63     ++ctx->istep;
64   }
65   if (!ctx->istep) {
66     JQL q = ux->q;
67     ctx->istep = 1;
68     struct JQP_AUX *aux = q->aux;
69     struct _EJDB_DOC doc = {
70       .id  = id,
71       .raw = &jbl
72     };
73     if (aux->apply || aux->apply_placeholder || aux->projection) {
74       JBL_NODE root;
75       if (!pool) {
76         pool = iwpool_create(jbl.bn.size * 2);
77         if (!pool) {
78           rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
79           goto finish;
80         }
81       }
82       RCC(rc, finish, jbl_to_node(&jbl, &root, true, pool));
83       doc.node = root;
84       if (aux->qmode & JQP_QRY_APPLY_DEL) {
85         if (cur) {
86           rc = jb_cursor_del(ctx->jbc, cur, id, &jbl);
87         } else {
88           rc = jb_del(ctx->jbc, &jbl, id);
89         }
90       } else if (aux->apply || aux->apply_placeholder) {
91         struct _JBL sn = { 0 };
92         RCC(rc, finish, jql_apply(q, root, pool));
93         RCC(rc, finish, _jbl_from_node(&sn, root));
94         if (cur) {
95           rc = jb_cursor_set(ctx->jbc, cur, id, &sn);
96         } else {
97           rc = jb_put(ctx->jbc, &sn, id);
98         }
99         binn_free(&sn.bn);
100       }
101       RCGO(rc, finish);
102       if (aux->projection) {
103         RCC(rc, finish, jql_project(q, root, pool, ctx));
104       }
105     } else if (aux->qmode & JQP_QRY_APPLY_DEL) {
106       if (cur) {
107         rc = jb_cursor_del(ctx->jbc, cur, id, &jbl);
108       } else {
109         rc = jb_del(ctx->jbc, &jbl, id);
110       }
111       RCGO(rc, finish);
112     }
113     if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
114       do {
115         ctx->istep = 1;
116         RCC(rc, finish, ux->visitor(ux, &doc, &ctx->istep));
117       } while (ctx->istep == -1);
118     }
119     ++ux->cnt;
120     *step = ctx->istep > 0 ? 1 : ctx->istep < 0 ? -1 : 0;
121     if (--ux->limit < 1) {
122       *step = 0;
123     }
124   } else {
125     *step = ctx->istep > 0 ? 1 : ctx->istep < 0 ? -1 : 0; // -V547
126   }
127 
128 finish:
129   if (pool && (pool != ctx->ux->pool)) {
130     iwpool_destroy(pool);
131   }
132   return rc;
133 }
134