1 #include "ejdb2_internal.h"
2 #include "sort_r.h"
3
_jbi_scan_sorter_release(struct _JBEXEC * ctx)4 static void _jbi_scan_sorter_release(struct _JBEXEC *ctx) {
5 struct _JBSSC *ssc = &ctx->ssc;
6 free(ssc->refs);
7 if (ssc->sof_active) {
8 ssc->sof.close(&ssc->sof);
9 } else {
10 free(ssc->docs);
11 }
12 memset(ssc, 0, sizeof(*ssc));
13 }
14
_jbi_scan_sorter_cmp(const void * o1,const void * o2,void * op)15 static int _jbi_scan_sorter_cmp(const void *o1, const void *o2, void *op) {
16 int rv = 0;
17 iwrc rc;
18 uint32_t r1, r2;
19 struct _JBL d1, d2;
20 struct _JBEXEC *ctx = op;
21 struct _JBSSC *ssc = &ctx->ssc;
22 struct JQP_AUX *aux = ctx->ux->q->aux;
23 uint8_t *p1, *p2;
24 assert(aux->orderby_num > 0);
25
26 memcpy(&r1, o1, sizeof(r1));
27 memcpy(&r2, o2, sizeof(r2));
28
29 p1 = ssc->docs + r1 + sizeof(uint64_t) /*id*/;
30 p2 = ssc->docs + r2 + sizeof(uint64_t) /*id*/;
31
32 RCC(rc, finish, jbl_from_buf_keep_onstack2(&d1, p1));
33 RCC(rc, finish, jbl_from_buf_keep_onstack2(&d2, p2));
34
35 for (int i = 0; i < aux->orderby_num; ++i) {
36 struct _JBL v1 = { 0 };
37 struct _JBL v2 = { 0 };
38 JBL_PTR ptr = aux->orderby_ptrs[i];
39 int desc = (ptr->op & 1) ? -1 : 1; // If `-1` do desc sorting
40 _jbl_at(&d1, ptr, &v1);
41 _jbl_at(&d2, ptr, &v2);
42 rv = _jbl_cmp_atomic_values(&v1, &v2) * desc;
43 if (rv) {
44 break;
45 }
46 }
47
48 finish:
49 if (rc) {
50 ssc->rc = rc;
51 longjmp(ssc->fatal_jmp, 1);
52 }
53 return rv;
54 }
55
_jbi_scan_sorter_apply(IWPOOL * pool,struct _JBEXEC * ctx,JQL q,struct _EJDB_DOC * doc)56 static iwrc _jbi_scan_sorter_apply(IWPOOL *pool, struct _JBEXEC *ctx, JQL q, struct _EJDB_DOC *doc) {
57 JBL_NODE root;
58 JBL jbl = doc->raw;
59 struct JQP_AUX *aux = q->aux;
60 iwrc rc = jbl_to_node(jbl, &root, true, pool);
61 RCRET(rc);
62 doc->node = root;
63 if (aux->qmode & JQP_QRY_APPLY_DEL) {
64 rc = jb_del(ctx->jbc, jbl, doc->id);
65 RCRET(rc);
66 } else if (aux->apply || aux->apply_placeholder) {
67 struct _JBL sn = { 0 };
68 rc = jql_apply(q, root, pool);
69 RCRET(rc);
70 rc = _jbl_from_node(&sn, root);
71 RCRET(rc);
72 rc = jb_put(ctx->jbc, &sn, doc->id);
73 binn_free(&sn.bn);
74 RCRET(rc);
75 }
76 if (aux->projection) {
77 rc = jql_project(q, root, ctx->ux->pool, ctx);
78 }
79 return rc;
80 }
81
_jbi_scan_sorter_do(struct _JBEXEC * ctx)82 static iwrc _jbi_scan_sorter_do(struct _JBEXEC *ctx) {
83 iwrc rc = 0;
84 int64_t step = 1, id;
85 struct _JBL jbl;
86 EJDB_EXEC *ux = ctx->ux;
87 struct _JBSSC *ssc = &ctx->ssc;
88 uint32_t rnum = ssc->refs_num;
89 struct JQP_AUX *aux = ux->q->aux;
90 IWPOOL *pool = ux->pool;
91
92 if (rnum) {
93 if (setjmp(ssc->fatal_jmp)) { // Init error jump
94 rc = ssc->rc;
95 goto finish;
96 }
97 if (!ssc->docs) {
98 size_t sp;
99 RCC(rc, finish, ssc->sof.probe_mmap(&ssc->sof, 0, &ssc->docs, &sp));
100 }
101
102 sort_r(ssc->refs, rnum, sizeof(ssc->refs[0]), _jbi_scan_sorter_cmp, ctx);
103 }
104
105 for (int64_t i = ux->skip; step && i < rnum && i >= 0; ) {
106 uint8_t *rp = ssc->docs + ssc->refs[i];
107 memcpy(&id, rp, sizeof(id));
108 rp += sizeof(id);
109 RCC(rc, finish, jbl_from_buf_keep_onstack2(&jbl, rp));
110
111 struct _EJDB_DOC doc = {
112 .id = id,
113 .raw = &jbl
114 };
115
116 if (aux->apply || aux->projection) {
117 if (!pool) {
118 pool = iwpool_create(jbl.bn.size * 2);
119 if (!pool) {
120 rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
121 goto finish;
122 }
123 }
124 RCC(rc, finish, _jbi_scan_sorter_apply(pool, ctx, ux->q, &doc));
125 } else if (aux->qmode & JQP_QRY_APPLY_DEL) {
126 RCC(rc, finish, jb_del(ctx->jbc, &jbl, id));
127 }
128
129 if (!(aux->qmode & JQP_QRY_AGGREGATE)) {
130 do {
131 step = 1;
132 RCC(rc, finish, ux->visitor(ux, &doc, &step));
133 } while (step == -1);
134 }
135
136 ++ux->cnt;
137 i += step;
138 if (pool != ux->pool) {
139 iwpool_destroy(pool);
140 pool = 0;
141 }
142 if (--ux->limit < 1) {
143 break;
144 }
145 }
146
147 finish:
148 if (pool != ux->pool) {
149 iwpool_destroy(pool);
150 }
151 _jbi_scan_sorter_release(ctx);
152 return rc;
153 }
154
_jbi_scan_sorter_init(struct _JBSSC * ssc,off_t initial_size)155 static iwrc _jbi_scan_sorter_init(struct _JBSSC *ssc, off_t initial_size) {
156 IWFS_EXT_OPTS opts = {
157 .initial_size = initial_size,
158 .rspolicy = iw_exfile_szpolicy_fibo,
159 .file = {
160 .path = "jb-",
161 .omode = IWFS_OTMP | IWFS_OUNLINK
162 }
163 };
164 iwrc rc = iwfs_exfile_open(&ssc->sof, &opts);
165 RCRET(rc);
166 rc = ssc->sof.add_mmap(&ssc->sof, 0, SIZE_T_MAX, 0);
167 if (rc) {
168 ssc->sof.close(&ssc->sof);
169 }
170 return rc;
171 }
172
jbi_sorter_consumer(struct _JBEXEC * ctx,IWKV_cursor cur,int64_t id,int64_t * step,bool * matched,iwrc err)173 iwrc jbi_sorter_consumer(
174 struct _JBEXEC *ctx, IWKV_cursor cur, int64_t id,
175 int64_t *step, bool *matched, iwrc err
176 ) {
177 if (!id) {
178 // End of scan
179 if (err) {
180 // In the case of error do not perform sorting just release resources
181 _jbi_scan_sorter_release(ctx);
182 return err;
183 } else {
184 return _jbi_scan_sorter_do(ctx);
185 }
186 }
187
188 iwrc rc;
189 size_t vsz = 0;
190 struct _JBL jbl;
191 struct _JBSSC *ssc = &ctx->ssc;
192 EJDB db = ctx->jbc->db;
193 IWFS_EXT *sof = &ssc->sof;
194
195 start:
196 {
197 if (cur) {
198 rc = iwkv_cursor_copy_val(cur, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
199 } else {
200 IWKV_val key = {
201 .data = &id,
202 .size = sizeof(id)
203 };
204 rc = iwkv_get_copy(ctx->jbc->cdb, &key, ctx->jblbuf + sizeof(id), ctx->jblbufsz - sizeof(id), &vsz);
205 }
206 if (rc == IWKV_ERROR_NOTFOUND) {
207 rc = 0;
208 } else {
209 RCRET(rc);
210 }
211 if (vsz + sizeof(id) > ctx->jblbufsz) {
212 size_t nsize = MAX(vsz + sizeof(id), ctx->jblbufsz * 2);
213 void *nbuf = realloc(ctx->jblbuf, nsize);
214 if (!nbuf) {
215 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
216 }
217 ctx->jblbuf = nbuf;
218 ctx->jblbufsz = nsize;
219 goto start;
220 }
221 }
222
223 rc = jbl_from_buf_keep_onstack(&jbl, ctx->jblbuf + sizeof(id), vsz);
224 RCRET(rc);
225
226 rc = jql_matched(ctx->ux->q, &jbl, matched);
227 if (!*matched) {
228 return 0;
229 }
230
231 if (!ssc->refs) {
232 ssc->refs_asz = 64 * 1024; // 64K
233 ssc->refs = malloc(db->opts.document_buffer_sz);
234 if (!ssc->refs) {
235 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
236 }
237 ssc->docs_asz = 128 * 1024; // 128K
238 ssc->docs = malloc(ssc->docs_asz);
239 if (!ssc->docs) {
240 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
241 }
242 } else if (ssc->refs_asz <= (ssc->refs_num + 1) * sizeof(ssc->refs[0])) {
243 ssc->refs_asz *= 2;
244 uint32_t *nrefs = realloc(ssc->refs, ssc->refs_asz);
245 if (!nrefs) {
246 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
247 }
248 ssc->refs = nrefs;
249 }
250
251 vsz += sizeof(id);
252 memcpy(ctx->jblbuf, &id, sizeof(id));
253
254 start2:
255 {
256 if (ssc->docs) {
257 uint32_t rsize = ssc->docs_npos + vsz;
258 if (rsize > ssc->docs_asz) {
259 ssc->docs_asz = MIN(rsize * 2, db->opts.sort_buffer_sz);
260 if (rsize > ssc->docs_asz) {
261 size_t sz;
262 rc = _jbi_scan_sorter_init(ssc, (ssc->docs_npos + vsz) * 2);
263 RCRET(rc);
264 rc = sof->write(sof, 0, ssc->docs, ssc->docs_npos, &sz);
265 RCRET(rc);
266 free(ssc->docs);
267 ssc->docs = 0;
268 ssc->sof_active = true;
269 goto start2;
270 } else {
271 void *nbuf = realloc(ssc->docs, ssc->docs_asz);
272 if (!nbuf) {
273 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
274 }
275 ssc->docs = nbuf;
276 }
277 }
278 memcpy(ssc->docs + ssc->docs_npos, ctx->jblbuf, vsz);
279 } else {
280 size_t sz;
281 rc = sof->write(sof, ssc->docs_npos, ctx->jblbuf, vsz, &sz);
282 RCRET(rc);
283 }
284 ssc->refs[ssc->refs_num++] = ssc->docs_npos;
285 ssc->docs_npos += vsz;
286 }
287
288 return rc;
289 }
290