• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "ejdb2_internal.h"
2 #include "sort_r.h"
3 
_jbi_scan_sorter_release(struct _JBEXEC * ctx)4 static void _jbi_scan_sorter_release(struct _JBEXEC *ctx) {
5   struct _JBSSC *ssc = &ctx->ssc;
6   free(ssc->refs);
7   if (ssc->sof_active) {
8     ssc->sof.close(&ssc->sof);
9   } else {
10     free(ssc->docs);
11   }
12   memset(ssc, 0, sizeof(*ssc));
13 }
14 
_jbi_scan_sorter_cmp(const void * o1,const void * o2,void * op)15 static int _jbi_scan_sorter_cmp(const void *o1, const void *o2, void *op) {
16   int rv = 0;
17   iwrc rc;
18   uint32_t r1, r2;
19   struct _JBL d1, d2;
20   struct _JBEXEC *ctx = op;
21   struct _JBSSC *ssc = &ctx->ssc;
22   struct JQP_AUX *aux = ctx->ux->q->aux;
23   uint8_t *p1, *p2;
24   assert(aux->orderby_num > 0);
25 
26   memcpy(&r1, o1, sizeof(r1));
27   memcpy(&r2, o2, sizeof(r2));
28 
29   p1 = ssc->docs + r1 + sizeof(uint64_t) /*id*/;
30   p2 = ssc->docs + r2 + sizeof(uint64_t) /*id*/;
31 
32   RCC(rc, finish, jbl_from_buf_keep_onstack2(&d1, p1));
33   RCC(rc, finish, jbl_from_buf_keep_onstack2(&d2, p2));
34 
35   for (int i = 0; i < aux->orderby_num; ++i) {
36     struct _JBL v1 = { 0 };
37     struct _JBL v2 = { 0 };
38     JBL_PTR ptr = aux->orderby_ptrs[i];
39     int desc = (ptr->op & 1) ? -1 : 1; // If `-1` do desc sorting
40     _jbl_at(&d1, ptr, &v1);
41     _jbl_at(&d2, ptr, &v2);
42     rv = _jbl_cmp_atomic_values(&v1, &v2) * desc;
43     if (rv) {
44       break;
45     }
46   }
47 
48 finish:
49   if (rc) {
50     ssc->rc = rc;
51     longjmp(ssc->fatal_jmp, 1);
52   }
53   return rv;
54 }
55 
_jbi_scan_sorter_apply(IWPOOL * pool,struct _JBEXEC * ctx,JQL q,struct _EJDB_DOC * doc)56 static iwrc _jbi_scan_sorter_apply(IWPOOL *pool, struct _JBEXEC *ctx, JQL q, struct _EJDB_DOC *doc) {
57   JBL_NODE root;
58   JBL jbl = doc->raw;
59   struct JQP_AUX *aux = q->aux;
60   iwrc rc = jbl_to_node(jbl, &root, true, pool);
61   RCRET(rc);
62   doc->node = root;
63   if (aux->qmode & JQP_QRY_APPLY_DEL) {
64     rc = jb_del(ctx->jbc, jbl, doc->id);
65     RCRET(rc);
66   } else if (aux->apply || aux->apply_placeholder) {
67     struct _JBL sn = { 0 };
68     rc = jql_apply(q, root, pool);
69     RCRET(rc);
70     rc = _jbl_from_node(&sn, root);
71     RCRET(rc);
72     rc = jb_put(ctx->jbc, &sn, doc->id);
73     binn_free(&sn.bn);
74     RCRET(rc);
75   }
76   if (aux->projection) {
77     rc = jql_project(q, root, ctx->ux->pool, ctx);
78   }
79   return rc;
80 }
81 
_jbi_scan_sorter_do(struct _JBEXEC * ctx)82 static iwrc _jbi_scan_sorter_do(struct _JBEXEC *ctx) {
83   iwrc rc = 0;
84   int64_t step = 1, id;
85   struct _JBL jbl;
86   EJDB_EXEC *ux = ctx->ux;
87   struct _JBSSC *ssc = &ctx->ssc;
88   uint32_t rnum = ssc->refs_num;
89   struct JQP_AUX *aux = ux->q->aux;
90   IWPOOL *pool = ux->pool;
91 
92   if (rnum) {
93     if (setjmp(ssc->fatal_jmp)) { // Init error jump
94       rc = ssc->rc;
95       goto finish;
96     }
97     if (!ssc->docs) {
98       size_t sp;
99       RCC(rc, finish, ssc->sof.probe_mmap(&ssc->sof, 0, &ssc->docs, &sp));
100     }
101 
102     sort_r(ssc->refs, rnum, sizeof(ssc->refs[0]), _jbi_scan_sorter_cmp, ctx);
103   }
104 
105   for (int64_t i = ux->skip; step && i < rnum && i >= 0; ) {
106     uint8_t *rp = ssc->docs + ssc->refs[i];
107     memcpy(&id, rp, sizeof(id));
108     rp += sizeof(id);
109     RCC(rc, finish, jbl_from_buf_keep_onstack2(&jbl, rp));
110 
111     struct _EJDB_DOC doc = {
112       .id  = id,
113       .raw = &jbl
114     };
115 
116     if (aux->apply || aux->projection) {
117       if (!pool) {
118         pool = iwpool_create(jbl.bn.size * 2);
119         if (!pool) {
120           rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
121           goto finish;
122         }
123       }
124       RCC(rc, finish, _jbi_scan_sorter_apply(pool, ctx, ux->q, &doc));
125     } else if (aux->qmode & JQP_QRY_APPLY_DEL) {
126       RCC(rc, finish, jb_del(ctx->jbc, &jbl, id));
127     }
128 
129     if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
130       do {
131         step = 1;
132         RCC(rc, finish, ux->visitor(ux, &doc, &step));
133       } while (step == -1);
134     }
135 
136     ++ux->cnt;
137     i += step;
138     if (pool != ux->pool) {
139       iwpool_destroy(pool);
140       pool = 0;
141     }
142     if (--ux->limit < 1) {
143       break;
144     }
145   }
146 
147 finish:
148   if (pool != ux->pool) {
149     iwpool_destroy(pool);
150   }
151   _jbi_scan_sorter_release(ctx);
152   return rc;
153 }
154 
_jbi_scan_sorter_init(struct _JBSSC * ssc,off_t initial_size)155 static iwrc _jbi_scan_sorter_init(struct _JBSSC *ssc, off_t initial_size) {
156   IWFS_EXT_OPTS opts = {
157     .initial_size = initial_size,
158     .rspolicy     = iw_exfile_szpolicy_fibo,
159     .file         = {
160       .path       = "jb-",
161       .omode      = IWFS_OTMP | IWFS_OUNLINK
162     }
163   };
164   iwrc rc = iwfs_exfile_open(&ssc->sof, &opts);
165   RCRET(rc);
166   rc = ssc->sof.add_mmap(&ssc->sof, 0, SIZE_T_MAX, 0);
167   if (rc) {
168     ssc->sof.close(&ssc->sof);
169   }
170   return rc;
171 }
172 
jbi_sorter_consumer(struct _JBEXEC * ctx,IWKV_cursor cur,int64_t id,int64_t * step,bool * matched,iwrc err)173 iwrc jbi_sorter_consumer(
174   struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id,
175   int64_t *step, bool *matched, iwrc err
176   ) {
177   if (!id) {
178     // End of scan
179     if (err) {
180       // In the case of error do not perform sorting just release resources
181       _jbi_scan_sorter_release(ctx);
182       return err;
183     } else {
184       return _jbi_scan_sorter_do(ctx);
185     }
186   }
187 
188   iwrc rc;
189   size_t vsz = 0;
190   struct _JBL jbl;
191   struct _JBSSC *ssc = &ctx->ssc;
192   EJDB db = ctx->jbc->db;
193   IWFS_EXT *sof = &ssc->sof;
194 
195 start:
196   {
197     if (cur) {
198       rc = iwkv_cursor_copy_val(cur, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
199     } else {
200       IWKV_val key = {
201         .data = &id,
202         .size = sizeof(id)
203       };
204       rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
205     }
206     if (rc == IWKV_ERROR_NOTFOUND) {
207       rc = 0;
208     } else {
209       RCRET(rc);
210     }
211     if (vsz + sizeof(id) > ctx->jblbufsz) {
212       size_t nsize = MAX(vsz + sizeof(id), ctx->jblbufsz * 2);
213       void *nbuf = realloc(ctx->jblbuf, nsize);
214       if (!nbuf) {
215         return iwrc_set_errno(IW_ERROR_ALLOC, errno);
216       }
217       ctx->jblbuf = nbuf;
218       ctx->jblbufsz = nsize;
219       goto start;
220     }
221   }
222 
223   rc = jbl_from_buf_keep_onstack(&jbl, ctx->jblbuf + sizeof(id), vsz);
224   RCRET(rc);
225 
226   rc = jql_matched(ctx->ux->q, &jbl, matched);
227   if (!*matched) {
228     return 0;
229   }
230 
231   if (!ssc->refs) {
232     ssc->refs_asz = 64 * 1024; // 64K
233     ssc->refs = malloc(db->opts.document_buffer_sz);
234     if (!ssc->refs) {
235       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
236     }
237     ssc->docs_asz = 128 * 1024; // 128K
238     ssc->docs = malloc(ssc->docs_asz);
239     if (!ssc->docs) {
240       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
241     }
242   } else if (ssc->refs_asz <= (ssc->refs_num + 1) * sizeof(ssc->refs[0])) {
243     ssc->refs_asz *= 2;
244     uint32_t *nrefs = realloc(ssc->refs, ssc->refs_asz);
245     if (!nrefs) {
246       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
247     }
248     ssc->refs = nrefs;
249   }
250 
251   vsz += sizeof(id);
252   memcpy(ctx->jblbuf, &id, sizeof(id));
253 
254 start2:
255   {
256     if (ssc->docs) {
257       uint32_t rsize = ssc->docs_npos + vsz;
258       if (rsize > ssc->docs_asz) {
259         ssc->docs_asz = MIN(rsize * 2, db->opts.sort_buffer_sz);
260         if (rsize > ssc->docs_asz) {
261           size_t sz;
262           rc = _jbi_scan_sorter_init(ssc, (ssc->docs_npos + vsz) * 2);
263           RCRET(rc);
264           rc = sof->write(sof, 0, ssc->docs, ssc->docs_npos, &sz);
265           RCRET(rc);
266           free(ssc->docs);
267           ssc->docs = 0;
268           ssc->sof_active = true;
269           goto start2;
270         } else {
271           void *nbuf = realloc(ssc->docs, ssc->docs_asz);
272           if (!nbuf) {
273             return iwrc_set_errno(IW_ERROR_ALLOC, errno);
274           }
275           ssc->docs = nbuf;
276         }
277       }
278       memcpy(ssc->docs + ssc->docs_npos, ctx->jblbuf, vsz);
279     } else {
280       size_t sz;
281       rc = sof->write(sof, ssc->docs_npos, ctx->jblbuf, vsz, &sz);
282       RCRET(rc);
283     }
284     ssc->refs[ssc->refs_num++] = ssc->docs_npos;
285     ssc->docs_npos += vsz;
286   }
287 
288   return rc;
289 }
290