• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "ejdb2_internal.h"
2 #include "sort_r.h"
3 
_jbi_scan_sorter_release(struct _JBEXEC * ctx)4 static void _jbi_scan_sorter_release(struct _JBEXEC *ctx) {
5   struct _JBSSC *ssc = &ctx->ssc;
6   free(ssc->refs);
7   if (ssc->sof_active) {
8     ssc->sof.close(&ssc->sof);
9   } else {
10     free(ssc->docs);
11   }
12   memset(ssc, 0, sizeof(*ssc));
13 }
14 
_jbi_scan_sorter_cmp(const void * o1,const void * o2,void * op)15 static int _jbi_scan_sorter_cmp(const void *o1, const void *o2, void *op) {
16   int rv = 0;
17   uint32_t r1, r2;
18   struct _JBL d1, d2;
19   struct _JBEXEC *ctx = op;
20   struct _JBSSC *ssc = &ctx->ssc;
21   struct JQP_AUX *aux = ctx->ux->q->aux;
22   uint8_t *p1, *p2;
23   assert(aux->orderby_num > 0);
24 
25   memcpy(&r1, o1, sizeof(r1));
26   memcpy(&r2, o2, sizeof(r2));
27 
28   p1 = ssc->docs + r1 + sizeof(uint64_t) /*id*/;
29   p2 = ssc->docs + r2 + sizeof(uint64_t) /*id*/;
30 
31   iwrc rc = jbl_from_buf_keep_onstack2(&d1, p1);
32   RCGO(rc, finish);
33   rc = jbl_from_buf_keep_onstack2(&d2, p2);
34   RCGO(rc, finish);
35 
36   for (int i = 0; i < aux->orderby_num; ++i) {
37     struct _JBL v1 = { 0 };
38     struct _JBL v2 = { 0 };
39     JBL_PTR ptr = aux->orderby_ptrs[i];
40     int desc = (ptr->op & 1) ? -1 : 1; // If `-1` do desc sorting
41     _jbl_at(&d1, ptr, &v1);
42     _jbl_at(&d2, ptr, &v2);
43     rv = _jbl_cmp_atomic_values(&v1, &v2) * desc;
44     if (rv) {
45       break;
46     }
47   }
48 
49 finish:
50   if (rc) {
51     ssc->rc = rc;
52     longjmp(ssc->fatal_jmp, 1);
53   }
54   return rv;
55 }
56 
_jbi_scan_sorter_apply(IWPOOL * pool,struct _JBEXEC * ctx,JQL q,struct _EJDB_DOC * doc)57 static iwrc _jbi_scan_sorter_apply(IWPOOL *pool, struct _JBEXEC *ctx, JQL q, struct _EJDB_DOC *doc) {
58   JBL_NODE root;
59   JBL jbl = doc->raw;
60   struct JQP_AUX *aux = q->aux;
61   iwrc rc = jbl_to_node(jbl, &root, true, pool);
62   RCRET(rc);
63   doc->node = root;
64   if (aux->qmode & JQP_QRY_APPLY_DEL) {
65     rc = jb_del(ctx->jbc, jbl, doc->id);
66     RCRET(rc);
67   } else if (aux->apply || aux->apply_placeholder) {
68     struct _JBL sn = { 0 };
69     rc = jql_apply(q, root, pool);
70     RCRET(rc);
71     rc = _jbl_from_node(&sn, root);
72     RCRET(rc);
73     rc = jb_put(ctx->jbc, &sn, doc->id);
74     binn_free(&sn.bn);
75     RCRET(rc);
76   }
77   if (aux->projection) {
78     rc = jql_project(q, root, ctx->ux->pool, ctx);
79   }
80   return rc;
81 }
82 
_jbi_scan_sorter_do(struct _JBEXEC * ctx)83 static iwrc _jbi_scan_sorter_do(struct _JBEXEC *ctx) {
84   iwrc rc = 0;
85   int64_t step = 1, id;
86   struct _JBL jbl;
87   EJDB_EXEC *ux = ctx->ux;
88   struct _JBSSC *ssc = &ctx->ssc;
89   uint32_t rnum = ssc->refs_num;
90   struct JQP_AUX *aux = ux->q->aux;
91   IWPOOL *pool = ux->pool;
92 
93   if (rnum) {
94     if (setjmp(ssc->fatal_jmp)) { // Init error jump
95       rc = ssc->rc;
96       goto finish;
97     }
98     if (!ssc->docs) {
99       size_t sp;
100       rc = ssc->sof.probe_mmap(&ssc->sof, 0, &ssc->docs, &sp);
101       RCGO(rc, finish);
102     }
103 
104     sort_r(ssc->refs, rnum, sizeof(ssc->refs[0]), _jbi_scan_sorter_cmp, ctx);
105   }
106 
107   for (int64_t i = ux->skip; step && i < rnum && i >= 0; ) {
108     uint8_t *rp = ssc->docs + ssc->refs[i];
109     memcpy(&id, rp, sizeof(id));
110     rp += sizeof(id);
111     rc = jbl_from_buf_keep_onstack2(&jbl, rp);
112     RCGO(rc, finish);
113     struct _EJDB_DOC doc = {
114       .id  = id,
115       .raw = &jbl
116     };
117     if (aux->apply || aux->projection) {
118       if (!pool) {
119         pool = iwpool_create(jbl.bn.size * 2);
120         if (!pool) {
121           rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
122           goto finish;
123         }
124       }
125       rc = _jbi_scan_sorter_apply(pool, ctx, ux->q, &doc);
126       RCGO(rc, finish);
127     } else if (aux->qmode & JQP_QRY_APPLY_DEL) {
128       rc = jb_del(ctx->jbc, &jbl, id);
129       RCGO(rc, finish);
130     }
131     if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
132       do {
133         step = 1;
134         rc = ux->visitor(ux, &doc, &step);
135         RCGO(rc, finish);
136       } while (step == -1);
137     }
138     ++ux->cnt;
139     i += step;
140     if (pool != ux->pool) {
141       iwpool_destroy(pool);
142       pool = 0;
143     }
144     if (--ux->limit < 1) {
145       break;
146     }
147   }
148 
149 finish:
150   if (pool != ux->pool) {
151     iwpool_destroy(pool);
152   }
153   _jbi_scan_sorter_release(ctx);
154   return rc;
155 }
156 
_jbi_scan_sorter_init(struct _JBSSC * ssc,off_t initial_size)157 static iwrc _jbi_scan_sorter_init(struct _JBSSC *ssc, off_t initial_size) {
158   IWFS_EXT_OPTS opts = {
159     .initial_size = initial_size,
160     .rspolicy     = iw_exfile_szpolicy_fibo,
161     .file         = {
162       .path       = "jb-",
163       .omode      = IWFS_OTMP | IWFS_OUNLINK
164     }
165   };
166   iwrc rc = iwfs_exfile_open(&ssc->sof, &opts);
167   RCRET(rc);
168   rc = ssc->sof.add_mmap(&ssc->sof, 0, SIZE_T_MAX, 0);
169   if (rc) {
170     ssc->sof.close(&ssc->sof);
171   }
172   return rc;
173 }
174 
jbi_sorter_consumer(struct _JBEXEC * ctx,IWKV_cursor cur,int64_t id,int64_t * step,bool * matched,iwrc err)175 iwrc jbi_sorter_consumer(
176   struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id,
177   int64_t *step, bool *matched, iwrc err) {
178   if (!id) {
179     // End of scan
180     if (err) {
181       // In the case of error do not perform sorting just release resources
182       _jbi_scan_sorter_release(ctx);
183       return err;
184     } else {
185       return _jbi_scan_sorter_do(ctx);
186     }
187   }
188 
189   iwrc rc;
190   size_t vsz = 0;
191   struct _JBL jbl;
192   struct _JBSSC *ssc = &ctx->ssc;
193   EJDB db = ctx->jbc->db;
194   IWFS_EXT *sof = &ssc->sof;
195 
196 start:
197   {
198     if (cur) {
199       rc = iwkv_cursor_copy_val(cur, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
200     } else {
201       IWKV_val key = {
202         .data = &id,
203         .size = sizeof(id)
204       };
205       rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
206     }
207     if (rc == IWKV_ERROR_NOTFOUND) {
208       rc = 0;
209     } else {
210       RCRET(rc);
211     }
212     if (vsz + sizeof(id) > ctx->jblbufsz) {
213       size_t nsize = MAX(vsz + sizeof(id), ctx->jblbufsz * 2);
214       void *nbuf = realloc(ctx->jblbuf, nsize);
215       if (!nbuf) {
216         return iwrc_set_errno(IW_ERROR_ALLOC, errno);
217       }
218       ctx->jblbuf = nbuf;
219       ctx->jblbufsz = nsize;
220       goto start;
221     }
222   }
223 
224   rc = jbl_from_buf_keep_onstack(&jbl, ctx->jblbuf + sizeof(id), vsz);
225   RCRET(rc);
226 
227   rc = jql_matched(ctx->ux->q, &jbl, matched);
228   if (!*matched) {
229     return 0;
230   }
231 
232   if (!ssc->refs) {
233     ssc->refs_asz = 64 * 1024; // 64K
234     ssc->refs = malloc(db->opts.document_buffer_sz);
235     if (!ssc->refs) {
236       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
237     }
238     ssc->docs_asz = 128 * 1024; // 128K
239     ssc->docs = malloc(ssc->docs_asz);
240     if (!ssc->docs) {
241       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
242     }
243   } else if (ssc->refs_asz <= (ssc->refs_num + 1) * sizeof(ssc->refs[0])) {
244     ssc->refs_asz *= 2;
245     uint32_t *nrefs = realloc(ssc->refs, ssc->refs_asz);
246     if (!nrefs) {
247       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
248     }
249     ssc->refs = nrefs;
250   }
251 
252   vsz += sizeof(id);
253   memcpy(ctx->jblbuf, &id, sizeof(id));
254 
255 start2:
256   {
257     if (ssc->docs) {
258       uint32_t rsize = ssc->docs_npos + vsz;
259       if (rsize > ssc->docs_asz) {
260         ssc->docs_asz = MIN(rsize * 2, db->opts.sort_buffer_sz);
261         if (rsize > ssc->docs_asz) {
262           size_t sz;
263           rc = _jbi_scan_sorter_init(ssc, (ssc->docs_npos + vsz) * 2);
264           RCRET(rc);
265           rc = sof->write(sof, 0, ssc->docs, ssc->docs_npos, &sz);
266           RCRET(rc);
267           free(ssc->docs);
268           ssc->docs = 0;
269           ssc->sof_active = true;
270           goto start2;
271         } else {
272           void *nbuf = realloc(ssc->docs, ssc->docs_asz);
273           if (!nbuf) {
274             return iwrc_set_errno(IW_ERROR_ALLOC, errno);
275           }
276           ssc->docs = nbuf;
277         }
278       }
279       memcpy(ssc->docs + ssc->docs_npos, ctx->jblbuf, vsz);
280     } else {
281       size_t sz;
282       rc = sof->write(sof, ssc->docs_npos, ctx->jblbuf, vsz, &sz);
283       RCRET(rc);
284     }
285     ssc->refs[ssc->refs_num++] = ssc->docs_npos;
286     ssc->docs_npos += vsz;
287   }
288 
289   return rc;
290 }
291