1 #include "ejdb2_internal.h"
2 #include "sort_r.h"
3
_jbi_scan_sorter_release(struct _JBEXEC * ctx)4 static void _jbi_scan_sorter_release(struct _JBEXEC *ctx) {
5 struct _JBSSC *ssc = &ctx->ssc;
6 free(ssc->refs);
7 if (ssc->sof_active) {
8 ssc->sof.close(&ssc->sof);
9 } else {
10 free(ssc->docs);
11 }
12 memset(ssc, 0, sizeof(*ssc));
13 }
14
_jbi_scan_sorter_cmp(const void * o1,const void * o2,void * op)15 static int _jbi_scan_sorter_cmp(const void *o1, const void *o2, void *op) {
16 int rv = 0;
17 uint32_t r1, r2;
18 struct _JBL d1, d2;
19 struct _JBEXEC *ctx = op;
20 struct _JBSSC *ssc = &ctx->ssc;
21 struct JQP_AUX *aux = ctx->ux->q->aux;
22 uint8_t *p1, *p2;
23 assert(aux->orderby_num > 0);
24
25 memcpy(&r1, o1, sizeof(r1));
26 memcpy(&r2, o2, sizeof(r2));
27
28 p1 = ssc->docs + r1 + sizeof(uint64_t) /*id*/;
29 p2 = ssc->docs + r2 + sizeof(uint64_t) /*id*/;
30
31 iwrc rc = jbl_from_buf_keep_onstack2(&d1, p1);
32 RCGO(rc, finish);
33 rc = jbl_from_buf_keep_onstack2(&d2, p2);
34 RCGO(rc, finish);
35
36 for (int i = 0; i < aux->orderby_num; ++i) {
37 struct _JBL v1 = { 0 };
38 struct _JBL v2 = { 0 };
39 JBL_PTR ptr = aux->orderby_ptrs[i];
40 int desc = (ptr->op & 1) ? -1 : 1; // If `-1` do desc sorting
41 _jbl_at(&d1, ptr, &v1);
42 _jbl_at(&d2, ptr, &v2);
43 rv = _jbl_cmp_atomic_values(&v1, &v2) * desc;
44 if (rv) {
45 break;
46 }
47 }
48
49 finish:
50 if (rc) {
51 ssc->rc = rc;
52 longjmp(ssc->fatal_jmp, 1);
53 }
54 return rv;
55 }
56
_jbi_scan_sorter_apply(IWPOOL * pool,struct _JBEXEC * ctx,JQL q,struct _EJDB_DOC * doc)57 static iwrc _jbi_scan_sorter_apply(IWPOOL *pool, struct _JBEXEC *ctx, JQL q, struct _EJDB_DOC *doc) {
58 JBL_NODE root;
59 JBL jbl = doc->raw;
60 struct JQP_AUX *aux = q->aux;
61 iwrc rc = jbl_to_node(jbl, &root, true, pool);
62 RCRET(rc);
63 doc->node = root;
64 if (aux->qmode & JQP_QRY_APPLY_DEL) {
65 rc = jb_del(ctx->jbc, jbl, doc->id);
66 RCRET(rc);
67 } else if (aux->apply || aux->apply_placeholder) {
68 struct _JBL sn = { 0 };
69 rc = jql_apply(q, root, pool);
70 RCRET(rc);
71 rc = _jbl_from_node(&sn, root);
72 RCRET(rc);
73 rc = jb_put(ctx->jbc, &sn, doc->id);
74 binn_free(&sn.bn);
75 RCRET(rc);
76 }
77 if (aux->projection) {
78 rc = jql_project(q, root, ctx->ux->pool, ctx);
79 }
80 return rc;
81 }
82
_jbi_scan_sorter_do(struct _JBEXEC * ctx)83 static iwrc _jbi_scan_sorter_do(struct _JBEXEC *ctx) {
84 iwrc rc = 0;
85 int64_t step = 1, id;
86 struct _JBL jbl;
87 EJDB_EXEC *ux = ctx->ux;
88 struct _JBSSC *ssc = &ctx->ssc;
89 uint32_t rnum = ssc->refs_num;
90 struct JQP_AUX *aux = ux->q->aux;
91 IWPOOL *pool = ux->pool;
92
93 if (rnum) {
94 if (setjmp(ssc->fatal_jmp)) { // Init error jump
95 rc = ssc->rc;
96 goto finish;
97 }
98 if (!ssc->docs) {
99 size_t sp;
100 rc = ssc->sof.probe_mmap(&ssc->sof, 0, &ssc->docs, &sp);
101 RCGO(rc, finish);
102 }
103
104 sort_r(ssc->refs, rnum, sizeof(ssc->refs[0]), _jbi_scan_sorter_cmp, ctx);
105 }
106
107 for (int64_t i = ux->skip; step && i < rnum && i >= 0; ) {
108 uint8_t *rp = ssc->docs + ssc->refs[i];
109 memcpy(&id, rp, sizeof(id));
110 rp += sizeof(id);
111 rc = jbl_from_buf_keep_onstack2(&jbl, rp);
112 RCGO(rc, finish);
113 struct _EJDB_DOC doc = {
114 .id = id,
115 .raw = &jbl
116 };
117 if (aux->apply || aux->projection) {
118 if (!pool) {
119 pool = iwpool_create(jbl.bn.size * 2);
120 if (!pool) {
121 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
122 goto finish;
123 }
124 }
125 rc = _jbi_scan_sorter_apply(pool, ctx, ux->q, &doc);
126 RCGO(rc, finish);
127 } else if (aux->qmode & JQP_QRY_APPLY_DEL) {
128 rc = jb_del(ctx->jbc, &jbl, id);
129 RCGO(rc, finish);
130 }
131 if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
132 do {
133 step = 1;
134 rc = ux->visitor(ux, &doc, &step);
135 RCGO(rc, finish);
136 } while (step == -1);
137 }
138 ++ux->cnt;
139 i += step;
140 if (pool != ux->pool) {
141 iwpool_destroy(pool);
142 pool = 0;
143 }
144 if (--ux->limit < 1) {
145 break;
146 }
147 }
148
149 finish:
150 if (pool != ux->pool) {
151 iwpool_destroy(pool);
152 }
153 _jbi_scan_sorter_release(ctx);
154 return rc;
155 }
156
_jbi_scan_sorter_init(struct _JBSSC * ssc,off_t initial_size)157 static iwrc _jbi_scan_sorter_init(struct _JBSSC *ssc, off_t initial_size) {
158 IWFS_EXT_OPTS opts = {
159 .initial_size = initial_size,
160 .rspolicy = iw_exfile_szpolicy_fibo,
161 .file = {
162 .path = "jb-",
163 .omode = IWFS_OTMP | IWFS_OUNLINK
164 }
165 };
166 iwrc rc = iwfs_exfile_open(&ssc->sof, &opts);
167 RCRET(rc);
168 rc = ssc->sof.add_mmap(&ssc->sof, 0, SIZE_T_MAX, 0);
169 if (rc) {
170 ssc->sof.close(&ssc->sof);
171 }
172 return rc;
173 }
174
jbi_sorter_consumer(struct _JBEXEC * ctx,IWKV_cursor cur,int64_t id,int64_t * step,bool * matched,iwrc err)175 iwrc jbi_sorter_consumer(
176 struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id,
177 int64_t *step, bool *matched, iwrc err) {
178 if (!id) {
179 // End of scan
180 if (err) {
181 // In the case of error do not perform sorting just release resources
182 _jbi_scan_sorter_release(ctx);
183 return err;
184 } else {
185 return _jbi_scan_sorter_do(ctx);
186 }
187 }
188
189 iwrc rc;
190 size_t vsz = 0;
191 struct _JBL jbl;
192 struct _JBSSC *ssc = &ctx->ssc;
193 EJDB db = ctx->jbc->db;
194 IWFS_EXT *sof = &ssc->sof;
195
196 start:
197 {
198 if (cur) {
199 rc = iwkv_cursor_copy_val(cur, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
200 } else {
201 IWKV_val key = {
202 .data = &id,
203 .size = sizeof(id)
204 };
205 rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
206 }
207 if (rc == IWKV_ERROR_NOTFOUND) {
208 rc = 0;
209 } else {
210 RCRET(rc);
211 }
212 if (vsz + sizeof(id) > ctx->jblbufsz) {
213 size_t nsize = MAX(vsz + sizeof(id), ctx->jblbufsz * 2);
214 void *nbuf = realloc(ctx->jblbuf, nsize);
215 if (!nbuf) {
216 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
217 }
218 ctx->jblbuf = nbuf;
219 ctx->jblbufsz = nsize;
220 goto start;
221 }
222 }
223
224 rc = jbl_from_buf_keep_onstack(&jbl, ctx->jblbuf + sizeof(id), vsz);
225 RCRET(rc);
226
227 rc = jql_matched(ctx->ux->q, &jbl, matched);
228 if (!*matched) {
229 return 0;
230 }
231
232 if (!ssc->refs) {
233 ssc->refs_asz = 64 * 1024; // 64K
234 ssc->refs = malloc(db->opts.document_buffer_sz);
235 if (!ssc->refs) {
236 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
237 }
238 ssc->docs_asz = 128 * 1024; // 128K
239 ssc->docs = malloc(ssc->docs_asz);
240 if (!ssc->docs) {
241 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
242 }
243 } else if (ssc->refs_asz <= (ssc->refs_num + 1) * sizeof(ssc->refs[0])) {
244 ssc->refs_asz *= 2;
245 uint32_t *nrefs = realloc(ssc->refs, ssc->refs_asz);
246 if (!nrefs) {
247 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
248 }
249 ssc->refs = nrefs;
250 }
251
252 vsz += sizeof(id);
253 memcpy(ctx->jblbuf, &id, sizeof(id));
254
255 start2:
256 {
257 if (ssc->docs) {
258 uint32_t rsize = ssc->docs_npos + vsz;
259 if (rsize > ssc->docs_asz) {
260 ssc->docs_asz = MIN(rsize * 2, db->opts.sort_buffer_sz);
261 if (rsize > ssc->docs_asz) {
262 size_t sz;
263 rc = _jbi_scan_sorter_init(ssc, (ssc->docs_npos + vsz) * 2);
264 RCRET(rc);
265 rc = sof->write(sof, 0, ssc->docs, ssc->docs_npos, &sz);
266 RCRET(rc);
267 free(ssc->docs);
268 ssc->docs = 0;
269 ssc->sof_active = true;
270 goto start2;
271 } else {
272 void *nbuf = realloc(ssc->docs, ssc->docs_asz);
273 if (!nbuf) {
274 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
275 }
276 ssc->docs = nbuf;
277 }
278 }
279 memcpy(ssc->docs + ssc->docs_npos, ctx->jblbuf, vsz);
280 } else {
281 size_t sz;
282 rc = sof->write(sof, ssc->docs_npos, ctx->jblbuf, vsz, &sz);
283 RCRET(rc);
284 }
285 ssc->refs[ssc->refs_num++] = ssc->docs_npos;
286 ssc->docs_npos += vsz;
287 }
288
289 return rc;
290 }
291