1 #include "jbr.h"
2 #include "ejdb2_internal.h"
3
4 #include <iwnet/iwn_ws_server.h>
5 #include <iwnet/iwn_pairs.h>
6 #include <iowow/iwconv.h>
7
8 #include <pthread.h>
9 #include <ctype.h>
10
/* Text appended to the error reply when a websocket message ends too early. */
#define JBR_WS_STR_PREMATURE_END "Premature end of message"
/* Maximum length (terminator excluded) of the key that prefixes a websocket message. */
#define JBR_MAX_KEY_LEN          36
13
14 typedef enum {
15 _JBR_ERROR_START = (IW_ERROR_START + 15000UL + 3000),
16 JBR_ERROR_WS_INVALID_MESSAGE, /**< Invalid message recieved (JBR_ERROR_WS_INVALID_MESSAGE) */
17 JBR_ERROR_WS_ACCESS_DENIED, /**< Access denied (JBR_ERROR_WS_ACCESS_DENIED) */
18 _JBR_ERROR_END,
19 } jbr_ecode_t;
20
21 struct jbr {
22 struct iwn_poller *poller;
23 pthread_t poller_thread;
24 const EJDB_HTTP *http;
25 struct iwn_wf_ctx *ctx;
26 EJDB db;
27 };
28
29 struct rctx {
30 struct iwn_wf_req *req;
31 struct iwn_ws_sess *ws;
32 struct jbr *jbr;
33 struct iwn_vals vals;
34 pthread_mutex_t mtx;
35 pthread_cond_t cond;
36 EJDB_EXEC ux;
37 int64_t id;
38 pthread_t request_thread;
39 bool read_anon;
40 bool visitor_started;
41 bool visitor_finished;
42 char cname[EJDB_COLLECTION_NAME_MAX_LEN + 1];
43 };
44
45 struct mctx {
46 struct rctx *ctx;
47 IWXSTR *wbuf;
48 int64_t id;
49 char cname[EJDB_COLLECTION_NAME_MAX_LEN + 1];
50 char key[JBR_MAX_KEY_LEN + 1];
51 };
52
/*
 * Sends the textual explanation of `rc_` as a text/plain response with HTTP
 * status `ret_` on request `r_`. Statuses >= 500 are logged first.
 * On exit `ret_` is overwritten: 1 if the response was written, -1 otherwise.
 */
#define JBR_RC_REPORT(ret_, r_, rc_)                                                        \
  do {                                                                                      \
    if ((ret_) >= 500) {                                                                    \
      iwlog_ecode_error3(rc_);                                                              \
    }                                                                                       \
    const char *jbr_rc_text_ = iwlog_ecode_explained(rc_);                                  \
    ret_ = iwn_http_response_write(r_, ret_, "text/plain", jbr_rc_text_, -1) ? 1 : -1;      \
  } while (0)
63
/**
 * Asks the endpoint poller of `db` to shut down.
 * Does nothing when no endpoint is attached to the database.
 */
void jbr_shutdown_request(EJDB db) {
  if (!db->jbr) {
    return;
  }
  iwn_poller_shutdown_request(db->jbr->poller);
}
69
jbr_shutdown_wait(struct jbr * jbr)70 void jbr_shutdown_wait(struct jbr *jbr) {
71 if (jbr) {
72 pthread_t t = jbr->poller_thread;
73 iwn_poller_shutdown_request(jbr->poller);
74 if (t && t != pthread_self()) {
75 jbr->poller_thread = 0;
76 pthread_join(t, 0);
77 }
78 free(jbr);
79 }
80 }
81
_poller_worker(void * op)82 static void* _poller_worker(void *op) {
83 JBR jbr = op;
84 iwn_poller_poll(jbr->poller);
85 iwn_poller_destroy(&jbr->poller);
86 return 0;
87 }
88
_rctx_dispose(struct rctx * ctx)89 static void _rctx_dispose(struct rctx *ctx) {
90 if (ctx) {
91 for (struct iwn_val *v = ctx->vals.first; v; ) {
92 struct iwn_val *n = v->next;
93 free(v->buf);
94 free(v);
95 v = n;
96 }
97 pthread_mutex_destroy(&ctx->mtx);
98 pthread_cond_destroy(&ctx->cond);
99 free(ctx);
100 }
101 }
102
_on_http_request_dispose(struct iwn_http_req * req)103 static void _on_http_request_dispose(struct iwn_http_req *req) {
104 struct rctx *ctx = req->user_data;
105 _rctx_dispose(ctx);
106 }
107
_on_get(struct rctx * ctx)108 static int _on_get(struct rctx *ctx) {
109 JBL jbl = 0;
110 IWXSTR *xstr = 0;
111 int nbytes = 0, ret = 500;
112
113 iwrc rc = ejdb_get(ctx->jbr->db, ctx->cname, ctx->id, &jbl);
114 if (rc) {
115 if ((rc == IWKV_ERROR_NOTFOUND) || (rc == IW_ERROR_NOT_EXISTS)) {
116 return 404;
117 }
118 goto finish;
119 }
120
121 if (ctx->req->flags & IWN_WF_HEAD) {
122 RCC(rc, finish, jbl_as_json(jbl, jbl_count_json_printer, &nbytes, JBL_PRINT_PRETTY));
123 RCC(rc, finish, iwn_http_response_header_i64_set(ctx->req->http, "content-length", nbytes));
124 } else {
125 RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
126 RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
127 nbytes = iwxstr_size(xstr);
128 }
129
130 ret = iwn_http_response_write(ctx->req->http, 200, "application/json",
131 xstr ? iwxstr_ptr(xstr) : 0, xstr ? nbytes : 0) ? 1 : -1;
132
133 finish:
134 if (rc) {
135 JBR_RC_REPORT(ret, ctx->req->http, rc);
136 }
137 jbl_destroy(&jbl);
138 iwxstr_destroy(xstr);
139 return ret;
140 }
141
_on_post(struct rctx * ctx)142 static int _on_post(struct rctx *ctx) {
143 if (ctx->read_anon) {
144 return 403;
145 }
146 if (ctx->req->body_len < 1) {
147 return 400;
148 }
149
150 JBL jbl;
151 int64_t id;
152 int ret = 500;
153 iwrc rc = jbl_from_json(&jbl, ctx->req->body);
154 if (rc) {
155 ret = 400;
156 goto finish;
157 }
158 RCC(rc, finish, ejdb_put_new(ctx->jbr->db, ctx->cname, jbl, &id));
159 ret = iwn_http_response_printf(ctx->req->http, 200, "text/plain", "%" PRId64, id) ? 1 : -1;
160
161 finish:
162 if (rc) {
163 JBR_RC_REPORT(ret, ctx->req->http, rc);
164 }
165 jbl_destroy(&jbl);
166 return ret;
167 }
168
_on_put(struct rctx * ctx)169 static int _on_put(struct rctx *ctx) {
170 if (ctx->read_anon) {
171 return 403;
172 }
173 if (ctx->req->body_len < 1) {
174 return 400;
175 }
176
177 JBL jbl;
178 int ret = 500;
179 iwrc rc = jbl_from_json(&jbl, ctx->req->body);
180 if (rc) {
181 ret = 400;
182 goto finish;
183 }
184 RCC(rc, finish, ejdb_put(ctx->jbr->db, ctx->cname, jbl, ctx->id));
185 ret = 200;
186
187 finish:
188 if (rc) {
189 JBR_RC_REPORT(ret, ctx->req->http, rc);
190 }
191 jbl_destroy(&jbl);
192 return ret;
193 }
194
_on_patch(struct rctx * ctx)195 static int _on_patch(struct rctx *ctx) {
196 if (ctx->read_anon) {
197 return 403;
198 }
199 if (ctx->req->body_len < 1) {
200 return 400;
201 }
202 int ret = 200;
203 iwrc rc = ejdb_patch(ctx->jbr->db, ctx->cname, ctx->req->body, ctx->id);
204 if (rc) {
205 iwrc_strip_code(&rc);
206 switch (rc) {
207 case IWKV_ERROR_NOTFOUND:
208 case IW_ERROR_NOT_EXISTS:
209 rc = 0;
210 ret = 404;
211 break;
212 case JBL_ERROR_PARSE_JSON:
213 case JBL_ERROR_PARSE_INVALID_CODEPOINT:
214 case JBL_ERROR_PARSE_INVALID_UTF8:
215 case JBL_ERROR_PARSE_UNQUOTED_STRING:
216 case JBL_ERROR_PATCH_TARGET_INVALID:
217 case JBL_ERROR_PATCH_NOVALUE:
218 case JBL_ERROR_PATCH_INVALID_OP:
219 case JBL_ERROR_PATCH_TEST_FAILED:
220 case JBL_ERROR_PATCH_INVALID_ARRAY_INDEX:
221 case JBL_ERROR_JSON_POINTER:
222 ret = 400;
223 break;
224 default:
225 ret = 500;
226 break;
227 }
228 }
229 if (rc) {
230 JBR_RC_REPORT(ret, ctx->req->http, rc);
231 }
232 return ret;
233 }
234
_on_delete(struct rctx * ctx)235 static int _on_delete(struct rctx *ctx) {
236 if (ctx->read_anon) {
237 return 403;
238 }
239 iwrc rc = ejdb_del(ctx->jbr->db, ctx->cname, ctx->id);
240 if (rc) {
241 if (rc == IWKV_ERROR_NOTFOUND || rc == IW_ERROR_NOT_EXISTS) {
242 return 404;
243 } else {
244 int ret = 500;
245 JBR_RC_REPORT(ret, ctx->req->http, rc);
246 return ret;
247 }
248 }
249 return 200;
250 }
251
_on_options(struct rctx * ctx)252 static int _on_options(struct rctx *ctx) {
253 iwrc rc;
254 JBL jbl = 0;
255 IWXSTR *xstr = 0;
256 int ret = 500;
257
258 RCC(rc, finish, ejdb_get_meta(ctx->jbr->db, &jbl));
259 RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
260 RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
261
262 if (ctx->jbr->http->read_anon) {
263 RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Allow", "GET, HEAD, POST, OPTIONS", 24));
264 } else {
265 RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Allow",
266 "GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS", 44));
267 }
268
269 if (ctx->jbr->http->cors) {
270 RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Origin", "*", 1));
271 RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Headers",
272 "X-Requested-With, Content-Type, Accept, Origin, Authorization", 61));
273
274 if (ctx->jbr->http->read_anon) {
275 RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Methods",
276 "GET, HEAD, POST, OPTIONS", 24));
277 } else {
278 RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Methods",
279 "GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS", 44));
280 }
281 }
282
283 ret = iwn_http_response_write(ctx->req->http, 200, "application/json", iwxstr_ptr(xstr), iwxstr_size(xstr)) ? 1 : -1;
284
285 finish:
286 if (rc) {
287 JBR_RC_REPORT(ret, ctx->req->http, rc);
288 }
289 jbl_destroy(&jbl);
290 iwxstr_destroy(xstr);
291 return ret;
292 }
293
_query_chunk_write_next(struct iwn_http_req * req,bool * again)294 static bool _query_chunk_write_next(struct iwn_http_req *req, bool *again) {
295 iwrc rc = 0;
296 struct iwn_val *val;
297 struct rctx *ctx = req->user_data;
298
299 if (ctx->request_thread == pthread_self()) {
300 return iwn_poller_arm_events(req->poller_adapter->poller, req->poller_adapter->fd, IWN_POLLOUT) == 0;
301 }
302
303 pthread_mutex_lock(&ctx->mtx);
304 start:
305 val = ctx->vals.first;
306 if (val) {
307 ctx->vals.first = val->next;
308 if (ctx->vals.last == val) {
309 ctx->vals.last = 0;
310 }
311 } else if (!ctx->visitor_finished) {
312 pthread_cond_wait(&ctx->cond, &ctx->mtx);
313 goto start;
314 }
315 pthread_mutex_unlock(&ctx->mtx);
316
317 if (val) {
318 rc = iwn_http_response_chunk_write(req, val->buf, val->len, _query_chunk_write_next, again);
319 free(val->buf);
320 free(val);
321 } else {
322 rc = iwn_http_response_chunk_end(req);
323 }
324
325 return rc == 0;
326 }
327
_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)328 static iwrc _query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
329 iwrc rc = 0;
330 struct rctx *ctx = ux->opaque;
331 IWXSTR *xstr = iwxstr_new();
332 RCA(xstr, finish);
333
334 if (ux->log) {
335 RCC(rc, finish, iwxstr_cat(xstr, iwxstr_ptr(ux->log), iwxstr_size(ux->log)));
336 RCC(rc, finish, iwxstr_cat(xstr, "--------------------", 20));
337 iwxstr_destroy(ux->log);
338 ux->log = 0;
339 }
340 RCC(rc, finish, iwxstr_printf(xstr, "\r\n%" PRId64 "\t", doc->id));
341 if (doc->node) {
342 RCC(rc, finish, jbn_as_json(doc->node, jbl_xstr_json_printer, xstr, 0));
343 } else {
344 RCC(rc, finish, jbl_as_json(doc->raw, jbl_xstr_json_printer, xstr, 0));
345 }
346
347 if (ctx->visitor_started) {
348 size_t sz = iwxstr_size(xstr);
349 char *buf = iwxstr_destroy_keep_ptr(xstr);
350 pthread_mutex_lock(&ctx->mtx);
351 iwn_val_add_new(&ctx->vals, buf, sz);
352 pthread_cond_broadcast(&ctx->cond);
353 pthread_mutex_unlock(&ctx->mtx);
354 } else {
355 ctx->visitor_started = true;
356 RCC(rc, finish, iwn_http_response_chunk_write(
357 ctx->req->http, iwxstr_ptr(xstr), iwxstr_size(xstr), _query_chunk_write_next, 0));
358 iwxstr_destroy(xstr);
359 }
360
361 xstr = 0;
362
363 finish:
364 iwxstr_destroy(xstr);
365 return rc;
366 }
367
_on_query(struct rctx * ctx)368 static int _on_query(struct rctx *ctx) {
369 if (ctx->req->body_len < 1) {
370 return 400;
371 }
372 iwrc rc = 0;
373 int ret = 500;
374
375 ctx->ux.opaque = ctx;
376 ctx->ux.db = ctx->jbr->db;
377 ctx->ux.visitor = _query_visitor;
378
379 RCC(rc, finish,
380 jql_create2(&ctx->ux.q, 0, ctx->req->body, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR));
381
382 if (ctx->read_anon && jql_has_apply(ctx->ux.q)) {
383 jql_destroy(&ctx->ux.q);
384 return 403;
385 }
386
387 struct iwn_val val = iwn_http_request_header_get(ctx->req->http, "x-hints", IW_LLEN("x-hints"));
388 if (val.len) {
389 char buf[val.len + 1];
390 memcpy(buf, val.buf, val.len);
391 buf[val.len] = '\0';
392 if (strstr(buf, "explain")) {
393 RCA(ctx->ux.log = iwxstr_new(), finish);
394 }
395 }
396
397 rc = ejdb_exec(&ctx->ux);
398
399 pthread_mutex_lock(&ctx->mtx);
400 ctx->visitor_finished = true;
401 pthread_cond_broadcast(&ctx->cond);
402 pthread_mutex_unlock(&ctx->mtx);
403 RCGO(rc, finish);
404
405 if (!ctx->visitor_started) {
406 if (ctx->ux.log) {
407 iwxstr_cat(ctx->ux.log, "--------------------", 20);
408 if (jql_has_aggregate_count(ctx->ux.q)) {
409 iwxstr_printf(ctx->ux.log, "\n%" PRId64, ctx->ux.cnt);
410 }
411 ret = iwn_http_response_write(ctx->req->http, 200, "text/plain",
412 iwxstr_ptr(ctx->ux.log), iwxstr_size(ctx->ux.log))
413 ? 1 : -1;
414 } else if (jql_has_aggregate_count(ctx->ux.q)) {
415 ret = iwn_http_response_printf(ctx->req->http, 200, "text/plain", "%" PRId64, ctx->ux.cnt)
416 ? 1 : -1;
417 } else {
418 ret = 200;
419 }
420 } else {
421 ret = 1;
422 }
423
424 finish:
425 if (rc) {
426 iwrc rcs = rc;
427 iwrc_strip_code(&rcs);
428 switch (rcs) {
429 case JQL_ERROR_QUERY_PARSE: {
430 const char *err = jql_error(ctx->ux.q);
431 ret = iwn_http_response_write(ctx->req->http, 400, "text/plain", err, -1) ? 1 : -1;
432 break;
433 }
434 case JQL_ERROR_NO_COLLECTION:
435 ret = 400;
436 JBR_RC_REPORT(ret, ctx->req->http, rc);
437 break;
438 default:
439 JBR_RC_REPORT(ret, ctx->req->http, rc);
440 break;
441 }
442 }
443 jql_destroy(&ctx->ux.q);
444 iwxstr_destroy(ctx->ux.log);
445 return ret;
446 }
447
_on_http_request(struct iwn_wf_req * req,void * op)448 static int _on_http_request(struct iwn_wf_req *req, void *op) {
449 struct jbr *jbr = op;
450 struct rctx *ctx = calloc(1, sizeof(*ctx));
451 if (!ctx) {
452 return 500;
453 }
454 pthread_mutex_init(&ctx->mtx, 0);
455 pthread_cond_init(&ctx->cond, 0);
456
457 ctx->req = req;
458 ctx->jbr = jbr;
459 ctx->request_thread = pthread_self();
460
461 uint32_t method = req->flags & IWN_WF_METHODS_ALL;
462 req->http->user_data = ctx;
463 req->http->on_request_dispose = _on_http_request_dispose;
464
465 if ((req->flags & IWN_WF_OPTIONS) && req->path_unmatched[0] != '\0') {
466 return 400;
467 }
468
469 {
470 // Parse {collection name}/{id}
471 const char *cname = req->path_unmatched;
472 size_t len = strlen(cname);
473 char *c = strchr(req->path_unmatched, '/');
474 if (!c) {
475 if ( len > EJDB_COLLECTION_NAME_MAX_LEN
476 || (method & (IWN_WF_GET | IWN_WF_HEAD | IWN_WF_PUT | IWN_WF_DELETE | IWN_WF_PATCH))) {
477 return 400;
478 }
479 } else {
480 len = c - cname;
481 if ( len > EJDB_COLLECTION_NAME_MAX_LEN
482 || method == IWN_WF_POST) {
483 return 400;
484 }
485 char *ep;
486 ctx->id = strtoll(c + 1, &ep, 10);
487 if (*ep != '\0' || ctx->id < 1) {
488 return 400;
489 }
490 }
491 memcpy(ctx->cname, cname, len);
492 ctx->cname[len] = '\0';
493 }
494
495 if (jbr->http->access_token) {
496 struct iwn_val val = iwn_http_request_header_get(req->http, "x-access-token", IW_LLEN("x-access-token"));
497 if (!val.len) {
498 if (jbr->http->read_anon) {
499 if ( (method & (IWN_WF_GET | IWN_WF_HEAD))
500 || (method == IWN_WF_POST && ctx->cname[0] == '\0')) {
501 ctx->read_anon = true;
502 goto process;
503 }
504 }
505 return 401;
506 } else {
507 if ( val.len != jbr->http->access_token_len
508 || strncmp(val.buf, jbr->http->access_token, val.len) != 0) {
509 return 403;
510 }
511 }
512 }
513
514 process:
515 if (jbr->http->cors && iwn_http_response_header_set(req->http, "access-control-allow-origin", "*", 1)) {
516 return 500;
517 }
518 if (ctx->cname[0] != '\0') {
519 switch (method) {
520 case IWN_WF_GET:
521 case IWN_WF_HEAD:
522 return _on_get(ctx);
523 case IWN_WF_POST:
524 return _on_post(ctx);
525 case IWN_WF_PUT:
526 return _on_put(ctx);
527 case IWN_WF_PATCH:
528 return _on_patch(ctx);
529 case IWN_WF_DELETE:
530 return _on_delete(ctx);
531 default:
532 return 400;
533 }
534 } else if (method == IWN_WF_POST) {
535 return _on_query(ctx);
536 } else if (method == IWN_WF_OPTIONS) {
537 return _on_options(ctx);
538 } else {
539 return 400;
540 }
541 }
542
/** Websocket operations, see the command parsing in `_on_ws_msg_impl()`. */
typedef enum {
  JBWS_NONE,        /* No command recognized, the message is run as a query. */
  JBWS_SET,         /* set */
  JBWS_GET,         /* get */
  JBWS_ADD,         /* add */
  JBWS_DEL,         /* del */
  JBWS_PATCH,       /* patch */
  JBWS_QUERY,       /* query */
  JBWS_EXPLAIN,     /* explain */
  JBWS_INFO,        /* info */
  JBWS_IDX,         /* idx */
  JBWS_NIDX,        /* rmi */
  JBWS_REMOVE_COLL, /* rmc */
} jbws_e;
557
_on_ws_session_http(struct iwn_wf_req * req,struct iwn_ws_handler_spec * spec)558 static int _on_ws_session_http(struct iwn_wf_req *req, struct iwn_ws_handler_spec *spec) {
559 struct jbr *jbr = spec->user_data;
560 struct rctx *ctx = calloc(1, sizeof(*ctx));
561 if (!ctx) {
562 return 500;
563 }
564 pthread_mutex_init(&ctx->mtx, 0);
565 pthread_cond_init(&ctx->cond, 0);
566 ctx->req = req;
567 ctx->jbr = jbr;
568 req->http->user_data = ctx;
569
570 if (jbr->http->access_token) {
571 struct iwn_val val = iwn_http_request_header_get(req->http, "x-access-token", IW_LLEN("x-access-token"));
572 if (val.len) {
573 if (val.len != jbr->http->access_token_len || strncmp(val.buf, jbr->http->access_token, val.len) != 0) {
574 return 403;
575 }
576 } else {
577 if (jbr->http->read_anon) {
578 ctx->read_anon = true;
579 } else {
580 return 401;
581 }
582 }
583 }
584
585 return 0;
586 }
587
_on_ws_session_dispose(struct iwn_ws_sess * ws)588 static void _on_ws_session_dispose(struct iwn_ws_sess *ws) {
589 _on_http_request_dispose(ws->req->http);
590 }
591
_on_ws_session_init(struct iwn_ws_sess * ws)592 static bool _on_ws_session_init(struct iwn_ws_sess *ws) {
593 struct rctx *ctx = ws->req->http->user_data;
594 ctx->ws = ws;
595 return true;
596 }
597
/*
 * Sends `<key> ERROR: <error>[ <extra>]` to the client.
 * Returns false without sending anything when `key` is null, otherwise the
 * result of the write.
 */
static bool _ws_error_send(struct iwn_ws_sess *ws, const char *key, const char *error, const char *extra) {
  if (!key) {
    return false;
  }
  if (!extra) {
    return iwn_ws_server_printf(ws, "%s ERROR: %s", key, error);
  }
  return iwn_ws_server_printf(ws, "%s ERROR: %s %s", key, error, extra);
}
608
/*
 * Sends the explanation of error code `rc` to the client through
 * `_ws_error_send()`; "unknown" is used when the code has no explanation.
 */
static bool _ws_rc_send(struct iwn_ws_sess *ws, const char *key, iwrc rc, const char *extra) {
  const char *text = iwlog_ecode_explained(rc);
  if (!text) {
    text = "unknown";
  }
  return _ws_error_send(ws, key, text, extra);
}
613
_ws_info(struct iwn_ws_sess * ws,struct mctx * mctx)614 static bool _ws_info(struct iwn_ws_sess *ws, struct mctx *mctx) {
615 iwrc rc;
616 JBL jbl = 0;
617 IWXSTR *xstr = 0;
618 bool ret = true;
619 struct rctx *ctx = mctx->ctx;
620 if (ctx->read_anon) {
621 rc = JBR_ERROR_WS_ACCESS_DENIED;
622 goto finish;
623 }
624 RCC(rc, finish, ejdb_get_meta(ctx->jbr->db, &jbl));
625 RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
626 RCC(rc, finish, iwxstr_printf(xstr, "%s\t", mctx->key));
627 RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
628 ret = iwn_ws_server_write(ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
629
630 finish:
631 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
632 ret = false;
633 }
634 jbl_destroy(&jbl);
635 iwxstr_destroy(xstr);
636 return ret;
637 }
638
_ws_coll_remove(struct iwn_ws_sess * ws,struct mctx * mctx)639 static bool _ws_coll_remove(struct iwn_ws_sess *ws, struct mctx *mctx) {
640 iwrc rc;
641 bool ret = true;
642 struct rctx *ctx = mctx->ctx;
643 if (ctx->read_anon) {
644 rc = JBR_ERROR_WS_ACCESS_DENIED;
645 goto finish;
646 }
647 RCC(rc, finish, ejdb_remove_collection(ctx->jbr->db, mctx->cname));
648 ret = iwn_ws_server_write(ws, mctx->key, -1);
649
650 finish:
651 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
652 ret = false;
653 }
654 return ret;
655 }
656
_ws_document_add(struct iwn_ws_sess * ws,struct mctx * mctx,const char * json)657 static bool _ws_document_add(struct iwn_ws_sess *ws, struct mctx *mctx, const char *json) {
658 iwrc rc;
659 JBL jbl = 0;
660 int64_t id;
661 bool ret = true;
662 struct rctx *ctx = ws->req->http->user_data;
663 if (ctx->read_anon) {
664 rc = JBR_ERROR_WS_ACCESS_DENIED;
665 goto finish;
666 }
667 RCC(rc, finish, jbl_from_json(&jbl, json));
668 RCC(rc, finish, ejdb_put_new(ctx->jbr->db, mctx->cname, jbl, &id));
669 ret = iwn_ws_server_printf(ws, "%s\t%" PRId64, mctx->key, id);
670
671 finish:
672 jbl_destroy(&jbl);
673 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
674 ret = false;
675 }
676 return ret;
677 }
678
_ws_document_get(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id)679 static bool _ws_document_get(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id) {
680 iwrc rc;
681 JBL jbl = 0;
682 IWXSTR *xstr = 0;
683 bool ret = true;
684 struct rctx *ctx = ws->req->http->user_data;
685
686 RCC(rc, finish, ejdb_get(ctx->jbr->db, mctx->cname, id, &jbl));
687 RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
688 RCC(rc, finish, iwxstr_printf(xstr, "%s\t%" PRId64 "\t", mctx->key, id));
689 RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
690 ret = iwn_ws_server_write(ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
691
692 finish:
693 iwxstr_destroy(xstr);
694 jbl_destroy(&jbl);
695 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
696 ret = false;
697 }
698 return ret;
699 }
700
_ws_document_set(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id,const char * json)701 static bool _ws_document_set(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id, const char *json) {
702 iwrc rc;
703 JBL jbl = 0;
704 bool ret = true;
705 struct rctx *ctx = ws->req->http->user_data;
706 if (ctx->read_anon) {
707 rc = JBR_ERROR_WS_ACCESS_DENIED;
708 goto finish;
709 }
710 RCC(rc, finish, jbl_from_json(&jbl, json));
711 RCC(rc, finish, ejdb_put(ctx->jbr->db, mctx->cname, jbl, id));
712 ret = iwn_ws_server_printf(ctx->ws, "%s\t%" PRId64, mctx->key, id);
713
714 finish:
715 jbl_destroy(&jbl);
716 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
717 ret = false;
718 }
719 return ret;
720 }
721
_ws_document_del(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id)722 static bool _ws_document_del(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id) {
723 iwrc rc;
724 bool ret = true;
725 struct rctx *ctx = ws->req->http->user_data;
726 if (ctx->read_anon) {
727 rc = JBR_ERROR_WS_ACCESS_DENIED;
728 goto finish;
729 }
730 RCC(rc, finish, ejdb_del(ctx->jbr->db, mctx->cname, id));
731 ret = iwn_ws_server_printf(ctx->ws, "%s\t%" PRId64, mctx->key, id);
732
733 finish:
734 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
735 ret = false;
736 }
737 return ret;
738 }
739
_ws_document_patch(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id,const char * json)740 static bool _ws_document_patch(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id, const char *json) {
741 iwrc rc;
742 bool ret = true;
743 struct rctx *ctx = ws->req->http->user_data;
744 if (ctx->read_anon) {
745 rc = JBR_ERROR_WS_ACCESS_DENIED;
746 goto finish;
747 }
748 RCC(rc, finish, ejdb_patch(ctx->jbr->db, mctx->cname, json, id));
749 ret = iwn_ws_server_printf(ctx->ws, "%s\t%" PRId64, mctx->key, id);
750
751 finish:
752 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
753 ret = false;
754 }
755 return ret;
756 }
757
_ws_index_set(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t mode,const char * path)758 static bool _ws_index_set(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t mode, const char *path) {
759 iwrc rc;
760 bool ret = true;
761 struct rctx *ctx = ws->req->http->user_data;
762 if (ctx->read_anon) {
763 rc = JBR_ERROR_WS_ACCESS_DENIED;
764 goto finish;
765 }
766 RCC(rc, finish, ejdb_ensure_index(ctx->jbr->db, mctx->cname, path, mode));
767 ret = iwn_ws_server_write(ws, mctx->key, -1);
768
769 finish:
770 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
771 ret = false;
772 }
773 return ret;
774 }
775
_ws_index_del(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t mode,const char * path)776 static bool _ws_index_del(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t mode, const char *path) {
777 iwrc rc;
778 bool ret = true;
779 struct rctx *ctx = ws->req->http->user_data;
780 if (ctx->read_anon) {
781 rc = JBR_ERROR_WS_ACCESS_DENIED;
782 goto finish;
783 }
784 RCC(rc, finish, ejdb_remove_index(ctx->jbr->db, mctx->cname, path, mode));
785 ret = iwn_ws_server_write(ws, mctx->key, -1);
786
787 finish:
788 if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
789 ret = false;
790 }
791 return ret;
792 }
793
_ws_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)794 static iwrc _ws_query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
795 iwrc rc = 0;
796 struct mctx *mctx = ux->opaque;
797 if (!mctx->wbuf) {
798 RCA(mctx->wbuf = iwxstr_new2(512), finish);
799 } else {
800 iwxstr_clear(mctx->wbuf);
801 }
802 if (ux->log) {
803 iwn_ws_server_printf(mctx->ctx->ws, "%s\texplain\t%s", mctx->key, iwxstr_ptr(ux->log));
804 iwxstr_destroy(ux->log);
805 ux->log = 0;
806 }
807 RCC(rc, finish, iwxstr_printf(mctx->wbuf, "%s\t%" PRId64 "\t", mctx->key, doc->id));
808 if (doc->node) {
809 RCC(rc, finish, jbn_as_json(doc->node, jbl_xstr_json_printer, mctx->wbuf, 0));
810 } else {
811 RCC(rc, finish, jbl_as_json(doc->raw, jbl_xstr_json_printer, mctx->wbuf, 0));
812 }
813 if (!iwn_ws_server_write(mctx->ctx->ws, iwxstr_ptr(mctx->wbuf), iwxstr_size(mctx->wbuf))) {
814 *step = 0;
815 }
816
817 finish:
818 return rc;
819 }
820
_ws_query(struct iwn_ws_sess * ws,struct mctx * mctx,const char * query,bool explain)821 static bool _ws_query(struct iwn_ws_sess *ws, struct mctx *mctx, const char *query, bool explain) {
822 iwrc rc;
823 bool ret = false;
824 struct rctx *ctx = mctx->ctx;
825
826 EJDB_EXEC ux = {
827 .db = ctx->jbr->db,
828 .opaque = mctx,
829 .visitor = _ws_query_visitor,
830 };
831
832 RCC(rc, finish,
833 jql_create2(&ux.q, mctx->cname, query, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR));
834 if (ctx->read_anon && jql_has_apply(ux.q)) {
835 rc = JBR_ERROR_WS_ACCESS_DENIED;
836 goto finish;
837 }
838 if (explain) {
839 RCA(ux.log = iwxstr_new(), finish);
840 }
841 RCC(rc, finish, ejdb_exec(&ux));
842 if (ux.log) {
843 ret = iwn_ws_server_printf(mctx->ctx->ws, "%s\texplain\t%s", mctx->key, iwxstr_ptr(ux.log));
844 } else {
845 ret = true;
846 }
847
848 finish:
849 if (rc) {
850 iwrc rcs = rc;
851 iwrc_strip_code(&rcs);
852 switch (rcs) {
853 case JQL_ERROR_QUERY_PARSE:
854 ret = _ws_error_send(ws, mctx->key, jql_error(ux.q), 0);
855 break;
856 default:
857 ret = _ws_rc_send(ws, mctx->key, rc, 0);
858 break;
859 }
860 } else {
861 if (jql_has_aggregate_count(ux.q)) {
862 ret = iwn_ws_server_printf(ws, "%s\t%" PRId64, mctx->key, ux.cnt);
863 } else {
864 ret = iwn_ws_server_write(ws, mctx->key, -1);
865 }
866 }
867 jql_destroy(&ux.q);
868 iwxstr_destroy(ux.log);
869 return ret;
870 }
871
_on_ws_msg_impl(struct iwn_ws_sess * ws,struct mctx * mctx,const char * msg_,size_t len)872 static bool _on_ws_msg_impl(struct iwn_ws_sess *ws, struct mctx *mctx, const char *msg_, size_t len) {
873 if (len < 1) {
874 return true;
875 }
876
877 int pos;
878 jbws_e wsop = JBWS_NONE;
879 const char *key = 0;
880 char *msg = (char*) msg_; // Discard const
881
882 // Trim left/right
883 for ( ; len > 0 && isspace(msg[len - 1]); --len);
884 for (pos = 0; pos < len && isspace(msg[pos]); ++pos);
885 len -= pos;
886 msg += pos;
887 if (len < 1) {
888 return true;
889 }
890
891 if (len == 1 && msg[0] == '?') {
892 static const char help[]
893 = "<key> info"
894 "\n<key> get <collection> <id>"
895 "\n<key> set <collection> <id> <document json>"
896 "\n<key> add <collection> <document json>"
897 "\n<key> del <collection> <id>"
898 "\n<key> patch <collection> <id> <patch json>"
899 "\n<key> idx <collection> <mode> <path>"
900 "\n<key> rmi <collection> <mode> <path>"
901 "\n<key> rmc <collection>"
902 "\n<key> query <collection> <query>"
903 "\n<key> explain <collection> <query>"
904 "\n<key> <query>";
905 return iwn_ws_server_write(ws, help, sizeof(help) - 1);
906 }
907
908 // Fetch key, after we can do good errors reporting
909 for (pos = 0; pos < len && !isspace(msg[pos]); ++pos);
910 if (pos > JBR_MAX_KEY_LEN) {
911 return false;
912 }
913 memcpy(mctx->key, msg, pos);
914 mctx->key[pos] = '\0';
915 if (pos >= len) {
916 return _ws_rc_send(ws, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
917 }
918
919 for ( ; pos < len && isspace(msg[pos]); ++pos);
920 len -= pos;
921 msg += pos;
922 if (len < 1) {
923 return _ws_rc_send(ws, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
924 }
925
926 // Fetch command
927 for (pos = 0; pos < len && !isspace(msg[pos]); ++pos);
928
929 if (pos <= len) {
930 if (!strncmp("get", msg, pos)) {
931 wsop = JBWS_GET;
932 } else if (!strncmp("add", msg, pos)) {
933 wsop = JBWS_ADD;
934 } else if (!strncmp("set", msg, pos)) {
935 wsop = JBWS_SET;
936 } else if (!strncmp("query", msg, pos)) {
937 wsop = JBWS_QUERY;
938 } else if (!strncmp("del", msg, pos)) {
939 wsop = JBWS_DEL;
940 } else if (!strncmp("patch", msg, pos)) {
941 wsop = JBWS_PATCH;
942 } else if (!strncmp("explain", msg, pos)) {
943 wsop = JBWS_EXPLAIN;
944 } else if (!strncmp("info", msg, pos)) {
945 wsop = JBWS_INFO;
946 } else if (!strncmp("idx", msg, pos)) {
947 wsop = JBWS_IDX;
948 } else if (!strncmp("rmi", msg, pos)) {
949 wsop = JBWS_NIDX;
950 } else if (!strncmp("rmc", msg, pos)) {
951 wsop = JBWS_REMOVE_COLL;
952 }
953 }
954
955 if (wsop > JBWS_NONE) {
956 if (wsop == JBWS_INFO) {
957 return _ws_info(ws, mctx);
958 }
959 for ( ; pos < len && isspace(msg[pos]); ++pos);
960 len -= pos;
961 msg += pos;
962
963 const char *rp = msg;
964 for (pos = 0; pos < len && !isspace(msg[pos]); ++pos);
965 len -= pos;
966 msg += pos;
967
968 if (pos < 1 || len < 1) {
969 if (wsop != JBWS_REMOVE_COLL) {
970 return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
971 }
972 } else if (pos > EJDB_COLLECTION_NAME_MAX_LEN) {
973 return _ws_rc_send(ws, key, JBR_ERROR_WS_INVALID_MESSAGE,
974 "Collection name exceeds maximum length allowed: "
975 "EJDB_COLLECTION_NAME_MAX_LEN");
976 }
977 memcpy(mctx->cname, rp, pos);
978 mctx->cname[pos] = '\0';
979
980 if (wsop == JBWS_REMOVE_COLL) {
981 return _ws_coll_remove(ws, mctx);
982 }
983
984 for (pos = 0; pos < len && isspace(msg[pos]); ++pos);
985 len -= pos;
986 msg += pos;
987 if (len < 1) {
988 return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
989 }
990
991 switch (wsop) {
992 case JBWS_ADD:
993 msg[len] = '\0';
994 return _ws_document_add(ws, mctx, msg);
995 case JBWS_QUERY:
996 case JBWS_EXPLAIN:
997 msg[len] = '\0';
998 return _ws_query(ws, mctx, msg, (wsop == JBWS_EXPLAIN));
999 default: {
1000 char nbuf[IWNUMBUF_SIZE];
1001 for (pos = 0; pos < len && pos < IWNUMBUF_SIZE - 1 && isdigit(msg[pos]); ++pos) {
1002 nbuf[pos] = msg[pos];
1003 }
1004 nbuf[pos] = '\0';
1005 for ( ; pos < len && isspace(msg[pos]); ++pos);
1006 len -= pos;
1007 msg += pos;
1008
1009 int64_t id = iwatoi(nbuf);
1010 if (id < 1) {
1011 return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, "Invalid document id specified");
1012 }
1013 msg[len] = '\0';
1014 switch (wsop) {
1015 case JBWS_GET:
1016 return _ws_document_get(ws, mctx, id);
1017 case JBWS_SET:
1018 return _ws_document_set(ws, mctx, id, msg);
1019 case JBWS_DEL:
1020 return _ws_document_del(ws, mctx, id);
1021 case JBWS_PATCH:
1022 return _ws_document_patch(ws, mctx, id, msg);
1023 case JBWS_IDX:
1024 return _ws_index_set(ws, mctx, id, msg);
1025 case JBWS_NIDX:
1026 return _ws_index_del(ws, mctx, id, msg);
1027 default:
1028 return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, 0);
1029 }
1030 }
1031 }
1032 } else {
1033 msg[len] = '\0';
1034 return _ws_query(ws, mctx, msg, false);
1035 }
1036 }
1037
_on_ws_msg(struct iwn_ws_sess * ws,const char * msg,size_t len,uint8_t frame)1038 static bool _on_ws_msg(struct iwn_ws_sess *ws, const char *msg, size_t len, uint8_t frame) {
1039 struct mctx mctx = {
1040 .ctx = ws->req->http->user_data,
1041 };
1042 bool ret = _on_ws_msg_impl(ws, &mctx, msg, len);
1043 iwxstr_destroy(mctx.wbuf);
1044 return ret;
1045 }
1046
_configure(struct jbr * jbr)1047 static iwrc _configure(struct jbr *jbr) {
1048 iwrc rc;
1049
1050 RCC(rc, finish, iwn_wf_create(&(struct iwn_wf_route) {
1051 .handler = 0
1052 }, &jbr->ctx));
1053
1054 RCC(rc, finish, iwn_wf_route(iwn_ws_server_route_attach(&(struct iwn_wf_route) {
1055 .ctx = jbr->ctx,
1056 .pattern = "/",
1057 .flags = IWN_WF_GET,
1058 }, &(struct iwn_ws_handler_spec) {
1059 .handler = _on_ws_msg,
1060 .on_http_init = _on_ws_session_http,
1061 .on_session_init = _on_ws_session_init,
1062 .on_session_dispose = _on_ws_session_dispose,
1063 .user_data = jbr
1064 }), 0));
1065
1066 RCC(rc, finish, iwn_wf_route(&(struct iwn_wf_route) {
1067 .ctx = jbr->ctx,
1068 .pattern = "/",
1069 .flags = IWN_WF_MATCH_PREFIX | IWN_WF_METHODS_ALL,
1070 .handler = _on_http_request,
1071 .user_data = jbr
1072 }, 0));
1073
1074 finish:
1075 return rc;
1076 }
1077
_start(struct jbr * jbr)1078 static iwrc _start(struct jbr *jbr) {
1079 const EJDB_HTTP *http = jbr->http;
1080 struct iwn_wf_server_spec spec = {
1081 .poller = jbr->poller,
1082 .listen = http->bind ? http->bind : "localhost",
1083 .port = http->port > 0 ? http->port : 9292,
1084 };
1085 if (http->ssl_private_key) {
1086 spec.ssl.private_key = http->ssl_private_key;
1087 spec.ssl.private_key_len = -1;
1088 }
1089 if (http->ssl_certs) {
1090 spec.ssl.certs = http->ssl_certs;
1091 spec.ssl.certs_len = -1;
1092 }
1093
1094 return iwn_wf_server(&spec, jbr->ctx);
1095 }
1096
jbr_start(EJDB db,const EJDB_OPTS * opts,struct jbr ** jbrp)1097 iwrc jbr_start(EJDB db, const EJDB_OPTS *opts, struct jbr **jbrp) {
1098 iwrc rc = 0;
1099 *jbrp = 0;
1100 if (!opts->http.enabled) {
1101 return 0;
1102 }
1103 JBR jbr = calloc(1, sizeof(*jbr));
1104 if (!jbr) {
1105 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1106 }
1107 jbr->db = db;
1108 jbr->http = &opts->http;
1109 *jbrp = jbr;
1110
1111 uint16_t cores = iwp_num_cpu_cores();
1112 cores = MAX(2, cores == 0 ? 1 : cores - 1);
1113
1114 RCC(rc, finish, _configure(jbr));
1115 RCC(rc, finish, iwn_poller_create(cores, cores / 2, &jbr->poller));
1116 RCC(rc, finish, _start(jbr));
1117
1118 if (!jbr->http->blocking) {
1119 pthread_create(&jbr->poller_thread, 0, _poller_worker, jbr);
1120 } else {
1121 iwn_poller_poll(jbr->poller);
1122 iwn_poller_destroy(&jbr->poller);
1123 *jbrp = 0;
1124 free(jbr);
1125 }
1126
1127 finish:
1128 if (rc) {
1129 *jbrp = 0;
1130 iwn_wf_destroy(jbr->ctx);
1131 iwn_poller_destroy(&jbr->poller);
1132 free(jbr);
1133 }
1134 return rc;
1135 }
1136
/*
 * Error code explanation callback registered by `jbr_init()`.
 * Returns the description of a code owned by this module, 0 for any other.
 */
static const char* _jbr_ecodefn(locale_t locale, uint32_t ecode) {
  if (ecode <= _JBR_ERROR_START || ecode >= _JBR_ERROR_END) {
    return 0;
  }
  if (ecode == JBR_ERROR_WS_INVALID_MESSAGE) {
    return "Invalid message recieved (JBR_ERROR_WS_INVALID_MESSAGE)";
  }
  if (ecode == JBR_ERROR_WS_ACCESS_DENIED) {
    return "Access denied (JBR_ERROR_WS_ACCESS_DENIED)";
  }
  return 0;
}
1149
jbr_init(void)1150 iwrc jbr_init(void) {
1151 static int _jbr_initialized = 0;
1152 if (!__sync_bool_compare_and_swap(&_jbr_initialized, 0, 1)) {
1153 return 0;
1154 }
1155 return iwlog_register_ecodefn(_jbr_ecodefn);
1156 }
1157