• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "jbr.h"
2 #include "ejdb2_internal.h"
3 
4 #include <iwnet/iwn_ws_server.h>
5 #include <iwnet/iwn_pairs.h>
6 #include <iowow/iwconv.h>
7 
8 #include <pthread.h>
9 #include <ctype.h>
10 
// Maximum length of a WebSocket message key; sizes `mctx.key` (plus the terminating NUL).
#define JBR_MAX_KEY_LEN          36
// Extra text attached to JBR_ERROR_WS_INVALID_MESSAGE replies for truncated messages.
#define JBR_WS_STR_PREMATURE_END "Premature end of message"
13 
14 typedef enum {
15   _JBR_ERROR_START = (IW_ERROR_START + 15000UL + 3000),
16   JBR_ERROR_WS_INVALID_MESSAGE, /**< Invalid message recieved (JBR_ERROR_WS_INVALID_MESSAGE) */
17   JBR_ERROR_WS_ACCESS_DENIED,   /**< Access denied (JBR_ERROR_WS_ACCESS_DENIED) */
18   _JBR_ERROR_END,
19 } jbr_ecode_t;
20 
21 struct jbr {
22   struct iwn_poller *poller;
23   pthread_t poller_thread;
24   const EJDB_HTTP   *http;
25   struct iwn_wf_ctx *ctx;
26   EJDB db;
27 };
28 
29 struct rctx {
30   struct iwn_wf_req  *req;
31   struct iwn_ws_sess *ws;
32   struct jbr     *jbr;
33   struct iwn_vals vals;
34   pthread_mutex_t mtx;
35   pthread_cond_t  cond;
36   EJDB_EXEC       ux;
37   int64_t   id;
38   pthread_t request_thread;
39   bool      read_anon;
40   bool      visitor_started;
41   bool      visitor_finished;
42   char      cname[EJDB_COLLECTION_NAME_MAX_LEN + 1];
43 };
44 
45 struct mctx {
46   struct rctx *ctx;
47   IWXSTR      *wbuf;
48   int64_t      id;
49   char cname[EJDB_COLLECTION_NAME_MAX_LEN + 1];
50   char key[JBR_MAX_KEY_LEN + 1];
51 };
52 
/**
 * Sends the explanation of error code `rc_` as a `text/plain` response with HTTP status `ret_`.
 *
 * Statuses >= 500 are additionally logged with iwlog_ecode_error3().
 * On exit `ret_` is overwritten with `1` if the response was written and `-1` otherwise,
 * so `ret_` must be a modifiable lvalue. Arguments are evaluated more than once.
 */
#define JBR_RC_REPORT(ret_, r_, rc_)                                                   \
  do {                                                                                  \
    if ((ret_) >= 500) {                                                                \
      iwlog_ecode_error3(rc_);                                                          \
    }                                                                                   \
    const char *errmsg_ = iwlog_ecode_explained(rc_);                                   \
    ret_ = iwn_http_response_write(r_, ret_, "text/plain", errmsg_, -1) ? 1 : -1;      \
  } while (0)
63 
/**
 * Asks the endpoint poller of `db` to shut down.
 * Does nothing when the database has no endpoint attached (`db->jbr` is null).
 */
void jbr_shutdown_request(EJDB db) {
  if (!db->jbr) {
    return;
  }
  iwn_poller_shutdown_request(db->jbr->poller);
}
69 
jbr_shutdown_wait(struct jbr * jbr)70 void jbr_shutdown_wait(struct jbr *jbr) {
71   if (jbr) {
72     pthread_t t = jbr->poller_thread;
73     iwn_poller_shutdown_request(jbr->poller);
74     if (t && t != pthread_self()) {
75       jbr->poller_thread = 0;
76       pthread_join(t, 0);
77     }
78     free(jbr);
79   }
80 }
81 
_poller_worker(void * op)82 static void* _poller_worker(void *op) {
83   JBR jbr = op;
84   iwn_poller_poll(jbr->poller);
85   iwn_poller_destroy(&jbr->poller);
86   return 0;
87 }
88 
_rctx_dispose(struct rctx * ctx)89 static void _rctx_dispose(struct rctx *ctx) {
90   if (ctx) {
91     for (struct iwn_val *v = ctx->vals.first; v; ) {
92       struct iwn_val *n = v->next;
93       free(v->buf);
94       free(v);
95       v = n;
96     }
97     pthread_mutex_destroy(&ctx->mtx);
98     pthread_cond_destroy(&ctx->cond);
99     free(ctx);
100   }
101 }
102 
_on_http_request_dispose(struct iwn_http_req * req)103 static void _on_http_request_dispose(struct iwn_http_req *req) {
104   struct rctx *ctx = req->user_data;
105   _rctx_dispose(ctx);
106 }
107 
_on_get(struct rctx * ctx)108 static int _on_get(struct rctx *ctx) {
109   JBL jbl = 0;
110   IWXSTR *xstr = 0;
111   int nbytes = 0, ret = 500;
112 
113   iwrc rc = ejdb_get(ctx->jbr->db, ctx->cname, ctx->id, &jbl);
114   if (rc) {
115     if ((rc == IWKV_ERROR_NOTFOUND) || (rc == IW_ERROR_NOT_EXISTS)) {
116       return 404;
117     }
118     goto finish;
119   }
120 
121   if (ctx->req->flags & IWN_WF_HEAD) {
122     RCC(rc, finish, jbl_as_json(jbl, jbl_count_json_printer, &nbytes, JBL_PRINT_PRETTY));
123     RCC(rc, finish, iwn_http_response_header_i64_set(ctx->req->http, "content-length", nbytes));
124   } else {
125     RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
126     RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
127     nbytes = iwxstr_size(xstr);
128   }
129 
130   ret = iwn_http_response_write(ctx->req->http, 200, "application/json",
131                                 xstr ? iwxstr_ptr(xstr) : 0, xstr ? nbytes : 0) ? 1 : -1;
132 
133 finish:
134   if (rc) {
135     JBR_RC_REPORT(ret, ctx->req->http, rc);
136   }
137   jbl_destroy(&jbl);
138   iwxstr_destroy(xstr);
139   return ret;
140 }
141 
_on_post(struct rctx * ctx)142 static int _on_post(struct rctx *ctx) {
143   if (ctx->read_anon) {
144     return 403;
145   }
146   if (ctx->req->body_len < 1) {
147     return 400;
148   }
149 
150   JBL jbl;
151   int64_t id;
152   int ret = 500;
153   iwrc rc = jbl_from_json(&jbl, ctx->req->body);
154   if (rc) {
155     ret = 400;
156     goto finish;
157   }
158   RCC(rc, finish, ejdb_put_new(ctx->jbr->db, ctx->cname, jbl, &id));
159   ret = iwn_http_response_printf(ctx->req->http, 200, "text/plain", "%" PRId64, id) ? 1 : -1;
160 
161 finish:
162   if (rc) {
163     JBR_RC_REPORT(ret, ctx->req->http, rc);
164   }
165   jbl_destroy(&jbl);
166   return ret;
167 }
168 
_on_put(struct rctx * ctx)169 static int _on_put(struct rctx *ctx) {
170   if (ctx->read_anon) {
171     return 403;
172   }
173   if (ctx->req->body_len < 1) {
174     return 400;
175   }
176 
177   JBL jbl;
178   int ret = 500;
179   iwrc rc = jbl_from_json(&jbl, ctx->req->body);
180   if (rc) {
181     ret = 400;
182     goto finish;
183   }
184   RCC(rc, finish, ejdb_put(ctx->jbr->db, ctx->cname, jbl, ctx->id));
185   ret = 200;
186 
187 finish:
188   if (rc) {
189     JBR_RC_REPORT(ret, ctx->req->http, rc);
190   }
191   jbl_destroy(&jbl);
192   return ret;
193 }
194 
_on_patch(struct rctx * ctx)195 static int _on_patch(struct rctx *ctx) {
196   if (ctx->read_anon) {
197     return 403;
198   }
199   if (ctx->req->body_len < 1) {
200     return 400;
201   }
202   int ret = 200;
203   iwrc rc = ejdb_patch(ctx->jbr->db, ctx->cname, ctx->req->body, ctx->id);
204   if (rc) {
205     iwrc_strip_code(&rc);
206     switch (rc) {
207       case IWKV_ERROR_NOTFOUND:
208       case IW_ERROR_NOT_EXISTS:
209         rc = 0;
210         ret = 404;
211         break;
212       case JBL_ERROR_PARSE_JSON:
213       case JBL_ERROR_PARSE_INVALID_CODEPOINT:
214       case JBL_ERROR_PARSE_INVALID_UTF8:
215       case JBL_ERROR_PARSE_UNQUOTED_STRING:
216       case JBL_ERROR_PATCH_TARGET_INVALID:
217       case JBL_ERROR_PATCH_NOVALUE:
218       case JBL_ERROR_PATCH_INVALID_OP:
219       case JBL_ERROR_PATCH_TEST_FAILED:
220       case JBL_ERROR_PATCH_INVALID_ARRAY_INDEX:
221       case JBL_ERROR_JSON_POINTER:
222         ret = 400;
223         break;
224       default:
225         ret = 500;
226         break;
227     }
228   }
229   if (rc) {
230     JBR_RC_REPORT(ret, ctx->req->http, rc);
231   }
232   return ret;
233 }
234 
_on_delete(struct rctx * ctx)235 static int _on_delete(struct rctx *ctx) {
236   if (ctx->read_anon) {
237     return 403;
238   }
239   iwrc rc = ejdb_del(ctx->jbr->db, ctx->cname, ctx->id);
240   if (rc) {
241     if (rc == IWKV_ERROR_NOTFOUND || rc == IW_ERROR_NOT_EXISTS) {
242       return 404;
243     } else {
244       int ret = 500;
245       JBR_RC_REPORT(ret, ctx->req->http, rc);
246       return ret;
247     }
248   }
249   return 200;
250 }
251 
_on_options(struct rctx * ctx)252 static int _on_options(struct rctx *ctx) {
253   iwrc rc;
254   JBL jbl = 0;
255   IWXSTR *xstr = 0;
256   int ret = 500;
257 
258   RCC(rc, finish, ejdb_get_meta(ctx->jbr->db, &jbl));
259   RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
260   RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
261 
262   if (ctx->jbr->http->read_anon) {
263     RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Allow", "GET, HEAD, POST, OPTIONS", 24));
264   } else {
265     RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Allow",
266                                                  "GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS", 44));
267   }
268 
269   if (ctx->jbr->http->cors) {
270     RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Origin", "*", 1));
271     RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Headers",
272                                                  "X-Requested-With, Content-Type, Accept, Origin, Authorization", 61));
273 
274     if (ctx->jbr->http->read_anon) {
275       RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Methods",
276                                                    "GET, HEAD, POST, OPTIONS", 24));
277     } else {
278       RCC(rc, finish, iwn_http_response_header_add(ctx->req->http, "Access-Control-Allow-Methods",
279                                                    "GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS", 44));
280     }
281   }
282 
283   ret = iwn_http_response_write(ctx->req->http, 200, "application/json", iwxstr_ptr(xstr), iwxstr_size(xstr)) ? 1 : -1;
284 
285 finish:
286   if (rc) {
287     JBR_RC_REPORT(ret, ctx->req->http, rc);
288   }
289   jbl_destroy(&jbl);
290   iwxstr_destroy(xstr);
291   return ret;
292 }
293 
_query_chunk_write_next(struct iwn_http_req * req,bool * again)294 static bool _query_chunk_write_next(struct iwn_http_req *req, bool *again) {
295   iwrc rc = 0;
296   struct iwn_val *val;
297   struct rctx *ctx = req->user_data;
298 
299   if (ctx->request_thread == pthread_self()) {
300     return iwn_poller_arm_events(req->poller_adapter->poller, req->poller_adapter->fd, IWN_POLLOUT) == 0;
301   }
302 
303   pthread_mutex_lock(&ctx->mtx);
304 start:
305   val = ctx->vals.first;
306   if (val) {
307     ctx->vals.first = val->next;
308     if (ctx->vals.last == val) {
309       ctx->vals.last = 0;
310     }
311   } else if (!ctx->visitor_finished) {
312     pthread_cond_wait(&ctx->cond, &ctx->mtx);
313     goto start;
314   }
315   pthread_mutex_unlock(&ctx->mtx);
316 
317   if (val) {
318     rc = iwn_http_response_chunk_write(req, val->buf, val->len, _query_chunk_write_next, again);
319     free(val->buf);
320     free(val);
321   } else {
322     rc = iwn_http_response_chunk_end(req);
323   }
324 
325   return rc == 0;
326 }
327 
_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)328 static iwrc _query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
329   iwrc rc = 0;
330   struct rctx *ctx = ux->opaque;
331   IWXSTR *xstr = iwxstr_new();
332   RCA(xstr, finish);
333 
334   if (ux->log) {
335     RCC(rc, finish, iwxstr_cat(xstr, iwxstr_ptr(ux->log), iwxstr_size(ux->log)));
336     RCC(rc, finish, iwxstr_cat(xstr, "--------------------", 20));
337     iwxstr_destroy(ux->log);
338     ux->log = 0;
339   }
340   RCC(rc, finish, iwxstr_printf(xstr, "\r\n%" PRId64 "\t", doc->id));
341   if (doc->node) {
342     RCC(rc, finish, jbn_as_json(doc->node, jbl_xstr_json_printer, xstr, 0));
343   } else {
344     RCC(rc, finish, jbl_as_json(doc->raw, jbl_xstr_json_printer, xstr, 0));
345   }
346 
347   if (ctx->visitor_started) {
348     size_t sz = iwxstr_size(xstr);
349     char *buf = iwxstr_destroy_keep_ptr(xstr);
350     pthread_mutex_lock(&ctx->mtx);
351     iwn_val_add_new(&ctx->vals, buf, sz);
352     pthread_cond_broadcast(&ctx->cond);
353     pthread_mutex_unlock(&ctx->mtx);
354   } else {
355     ctx->visitor_started = true;
356     RCC(rc, finish, iwn_http_response_chunk_write(
357           ctx->req->http, iwxstr_ptr(xstr), iwxstr_size(xstr), _query_chunk_write_next, 0));
358     iwxstr_destroy(xstr);
359   }
360 
361   xstr = 0;
362 
363 finish:
364   iwxstr_destroy(xstr);
365   return rc;
366 }
367 
_on_query(struct rctx * ctx)368 static int _on_query(struct rctx *ctx) {
369   if (ctx->req->body_len < 1) {
370     return 400;
371   }
372   iwrc rc = 0;
373   int ret = 500;
374 
375   ctx->ux.opaque = ctx;
376   ctx->ux.db = ctx->jbr->db;
377   ctx->ux.visitor = _query_visitor;
378 
379   RCC(rc, finish,
380       jql_create2(&ctx->ux.q, 0, ctx->req->body, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR));
381 
382   if (ctx->read_anon && jql_has_apply(ctx->ux.q)) {
383     jql_destroy(&ctx->ux.q);
384     return 403;
385   }
386 
387   struct iwn_val val = iwn_http_request_header_get(ctx->req->http, "x-hints", IW_LLEN("x-hints"));
388   if (val.len) {
389     char buf[val.len + 1];
390     memcpy(buf, val.buf, val.len);
391     buf[val.len] = '\0';
392     if (strstr(buf, "explain")) {
393       RCA(ctx->ux.log = iwxstr_new(), finish);
394     }
395   }
396 
397   rc = ejdb_exec(&ctx->ux);
398 
399   pthread_mutex_lock(&ctx->mtx);
400   ctx->visitor_finished = true;
401   pthread_cond_broadcast(&ctx->cond);
402   pthread_mutex_unlock(&ctx->mtx);
403   RCGO(rc, finish);
404 
405   if (!ctx->visitor_started) {
406     if (ctx->ux.log) {
407       iwxstr_cat(ctx->ux.log, "--------------------", 20);
408       if (jql_has_aggregate_count(ctx->ux.q)) {
409         iwxstr_printf(ctx->ux.log, "\n%" PRId64, ctx->ux.cnt);
410       }
411       ret = iwn_http_response_write(ctx->req->http, 200, "text/plain",
412                                     iwxstr_ptr(ctx->ux.log), iwxstr_size(ctx->ux.log))
413             ? 1 : -1;
414     } else if (jql_has_aggregate_count(ctx->ux.q)) {
415       ret = iwn_http_response_printf(ctx->req->http, 200, "text/plain", "%" PRId64, ctx->ux.cnt)
416             ? 1 : -1;
417     } else {
418       ret = 200;
419     }
420   } else {
421     ret = 1;
422   }
423 
424 finish:
425   if (rc) {
426     iwrc rcs = rc;
427     iwrc_strip_code(&rcs);
428     switch (rcs) {
429       case JQL_ERROR_QUERY_PARSE: {
430         const char *err = jql_error(ctx->ux.q);
431         ret = iwn_http_response_write(ctx->req->http, 400, "text/plain", err, -1) ? 1 : -1;
432         break;
433       }
434       case JQL_ERROR_NO_COLLECTION:
435         ret = 400;
436         JBR_RC_REPORT(ret, ctx->req->http, rc);
437         break;
438       default:
439         JBR_RC_REPORT(ret, ctx->req->http, rc);
440         break;
441     }
442   }
443   jql_destroy(&ctx->ux.q);
444   iwxstr_destroy(ctx->ux.log);
445   return ret;
446 }
447 
_on_http_request(struct iwn_wf_req * req,void * op)448 static int _on_http_request(struct iwn_wf_req *req, void *op) {
449   struct jbr *jbr = op;
450   struct rctx *ctx = calloc(1, sizeof(*ctx));
451   if (!ctx) {
452     return 500;
453   }
454   pthread_mutex_init(&ctx->mtx, 0);
455   pthread_cond_init(&ctx->cond, 0);
456 
457   ctx->req = req;
458   ctx->jbr = jbr;
459   ctx->request_thread = pthread_self();
460 
461   uint32_t method = req->flags & IWN_WF_METHODS_ALL;
462   req->http->user_data = ctx;
463   req->http->on_request_dispose = _on_http_request_dispose;
464 
465   if ((req->flags & IWN_WF_OPTIONS) && req->path_unmatched[0] != '\0') {
466     return 400;
467   }
468 
469   {
470     // Parse {collection name}/{id}
471     const char *cname = req->path_unmatched;
472     size_t len = strlen(cname);
473     char *c = strchr(req->path_unmatched, '/');
474     if (!c) {
475       if (  len > EJDB_COLLECTION_NAME_MAX_LEN
476          || (method & (IWN_WF_GET | IWN_WF_HEAD | IWN_WF_PUT | IWN_WF_DELETE | IWN_WF_PATCH))) {
477         return 400;
478       }
479     } else {
480       len = c - cname;
481       if (  len > EJDB_COLLECTION_NAME_MAX_LEN
482          || method == IWN_WF_POST) {
483         return 400;
484       }
485       char *ep;
486       ctx->id = strtoll(c + 1, &ep, 10);
487       if (*ep != '\0' || ctx->id < 1) {
488         return 400;
489       }
490     }
491     memcpy(ctx->cname, cname, len);
492     ctx->cname[len] = '\0';
493   }
494 
495   if (jbr->http->access_token) {
496     struct iwn_val val = iwn_http_request_header_get(req->http, "x-access-token", IW_LLEN("x-access-token"));
497     if (!val.len) {
498       if (jbr->http->read_anon) {
499         if (  (method & (IWN_WF_GET | IWN_WF_HEAD))
500            || (method == IWN_WF_POST && ctx->cname[0] == '\0')) {
501           ctx->read_anon = true;
502           goto process;
503         }
504       }
505       return 401;
506     } else {
507       if (  val.len != jbr->http->access_token_len
508          || strncmp(val.buf, jbr->http->access_token, val.len) != 0) {
509         return 403;
510       }
511     }
512   }
513 
514 process:
515   if (jbr->http->cors && iwn_http_response_header_set(req->http, "access-control-allow-origin", "*", 1)) {
516     return 500;
517   }
518   if (ctx->cname[0] != '\0') {
519     switch (method) {
520       case IWN_WF_GET:
521       case IWN_WF_HEAD:
522         return _on_get(ctx);
523       case IWN_WF_POST:
524         return _on_post(ctx);
525       case IWN_WF_PUT:
526         return _on_put(ctx);
527       case IWN_WF_PATCH:
528         return _on_patch(ctx);
529       case IWN_WF_DELETE:
530         return _on_delete(ctx);
531       default:
532         return 400;
533     }
534   } else if (method == IWN_WF_POST) {
535     return _on_query(ctx);
536   } else if (method == IWN_WF_OPTIONS) {
537     return _on_options(ctx);
538   } else {
539     return 400;
540   }
541 }
542 
/** WebSocket commands; the textual command is given next to each value. */
typedef enum {
  JBWS_NONE,        /**< No / unknown command */
  JBWS_SET,         /**< `set` */
  JBWS_GET,         /**< `get` */
  JBWS_ADD,         /**< `add` */
  JBWS_DEL,         /**< `del` */
  JBWS_PATCH,       /**< `patch` */
  JBWS_QUERY,       /**< `query` */
  JBWS_EXPLAIN,     /**< `explain` */
  JBWS_INFO,        /**< `info` */
  JBWS_IDX,         /**< `idx` */
  JBWS_NIDX,        /**< `rmi` */
  JBWS_REMOVE_COLL, /**< `rmc` */
} jbws_e;
557 
_on_ws_session_http(struct iwn_wf_req * req,struct iwn_ws_handler_spec * spec)558 static int _on_ws_session_http(struct iwn_wf_req *req, struct iwn_ws_handler_spec *spec) {
559   struct jbr *jbr = spec->user_data;
560   struct rctx *ctx = calloc(1, sizeof(*ctx));
561   if (!ctx) {
562     return 500;
563   }
564   pthread_mutex_init(&ctx->mtx, 0);
565   pthread_cond_init(&ctx->cond, 0);
566   ctx->req = req;
567   ctx->jbr = jbr;
568   req->http->user_data = ctx;
569 
570   if (jbr->http->access_token) {
571     struct iwn_val val = iwn_http_request_header_get(req->http, "x-access-token", IW_LLEN("x-access-token"));
572     if (val.len) {
573       if (val.len != jbr->http->access_token_len || strncmp(val.buf, jbr->http->access_token, val.len) != 0) {
574         return 403;
575       }
576     } else {
577       if (jbr->http->read_anon) {
578         ctx->read_anon = true;
579       } else {
580         return 401;
581       }
582     }
583   }
584 
585   return 0;
586 }
587 
_on_ws_session_dispose(struct iwn_ws_sess * ws)588 static void _on_ws_session_dispose(struct iwn_ws_sess *ws) {
589   _on_http_request_dispose(ws->req->http);
590 }
591 
_on_ws_session_init(struct iwn_ws_sess * ws)592 static bool _on_ws_session_init(struct iwn_ws_sess *ws) {
593   struct rctx *ctx = ws->req->http->user_data;
594   ctx->ws = ws;
595   return true;
596 }
597 
/**
 * Sends `<key> ERROR: <error>[ <extra>]` to the client.
 *
 * @param key   Message key; when null nothing is sent and false is returned.
 * @param extra Optional detail appended after the error text, may be null.
 * @return Result of the write.
 */
static bool _ws_error_send(struct iwn_ws_sess *ws, const char *key, const char *error, const char *extra) {
  if (!key) {
    return false;
  }
  return extra
         ? iwn_ws_server_printf(ws, "%s ERROR: %s %s", key, error, extra)
         : iwn_ws_server_printf(ws, "%s ERROR: %s", key, error);
}
608 
/**
 * Sends the explanation of `rc` as an error reply for `key`.
 * Falls back to the text "unknown" when no explanation is available.
 */
static bool _ws_rc_send(struct iwn_ws_sess *ws, const char *key, iwrc rc, const char *extra) {
  const char *text = iwlog_ecode_explained(rc);
  if (!text) {
    text = "unknown";
  }
  return _ws_error_send(ws, key, text, extra);
}
613 
_ws_info(struct iwn_ws_sess * ws,struct mctx * mctx)614 static bool _ws_info(struct iwn_ws_sess *ws, struct mctx *mctx) {
615   iwrc rc;
616   JBL jbl = 0;
617   IWXSTR *xstr = 0;
618   bool ret = true;
619   struct rctx *ctx = mctx->ctx;
620   if (ctx->read_anon) {
621     rc = JBR_ERROR_WS_ACCESS_DENIED;
622     goto finish;
623   }
624   RCC(rc, finish, ejdb_get_meta(ctx->jbr->db, &jbl));
625   RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
626   RCC(rc, finish, iwxstr_printf(xstr, "%s\t", mctx->key));
627   RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
628   ret = iwn_ws_server_write(ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
629 
630 finish:
631   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
632     ret = false;
633   }
634   jbl_destroy(&jbl);
635   iwxstr_destroy(xstr);
636   return ret;
637 }
638 
_ws_coll_remove(struct iwn_ws_sess * ws,struct mctx * mctx)639 static bool _ws_coll_remove(struct iwn_ws_sess *ws, struct mctx *mctx) {
640   iwrc rc;
641   bool ret = true;
642   struct rctx *ctx = mctx->ctx;
643   if (ctx->read_anon) {
644     rc = JBR_ERROR_WS_ACCESS_DENIED;
645     goto finish;
646   }
647   RCC(rc, finish, ejdb_remove_collection(ctx->jbr->db, mctx->cname));
648   ret = iwn_ws_server_write(ws, mctx->key, -1);
649 
650 finish:
651   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
652     ret = false;
653   }
654   return ret;
655 }
656 
_ws_document_add(struct iwn_ws_sess * ws,struct mctx * mctx,const char * json)657 static bool _ws_document_add(struct iwn_ws_sess *ws, struct mctx *mctx, const char *json) {
658   iwrc rc;
659   JBL jbl = 0;
660   int64_t id;
661   bool ret = true;
662   struct rctx *ctx = ws->req->http->user_data;
663   if (ctx->read_anon) {
664     rc = JBR_ERROR_WS_ACCESS_DENIED;
665     goto finish;
666   }
667   RCC(rc, finish, jbl_from_json(&jbl, json));
668   RCC(rc, finish, ejdb_put_new(ctx->jbr->db, mctx->cname, jbl, &id));
669   ret = iwn_ws_server_printf(ws, "%s\t%" PRId64, mctx->key, id);
670 
671 finish:
672   jbl_destroy(&jbl);
673   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
674     ret = false;
675   }
676   return ret;
677 }
678 
_ws_document_get(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id)679 static bool _ws_document_get(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id) {
680   iwrc rc;
681   JBL jbl = 0;
682   IWXSTR *xstr = 0;
683   bool ret = true;
684   struct rctx *ctx = ws->req->http->user_data;
685 
686   RCC(rc, finish, ejdb_get(ctx->jbr->db, mctx->cname, id, &jbl));
687   RCA(xstr = iwxstr_new2(jbl->bn.size * 2), finish);
688   RCC(rc, finish, iwxstr_printf(xstr, "%s\t%" PRId64 "\t", mctx->key, id));
689   RCC(rc, finish, jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY));
690   ret = iwn_ws_server_write(ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
691 
692 finish:
693   iwxstr_destroy(xstr);
694   jbl_destroy(&jbl);
695   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
696     ret = false;
697   }
698   return ret;
699 }
700 
_ws_document_set(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id,const char * json)701 static bool _ws_document_set(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id, const char *json) {
702   iwrc rc;
703   JBL jbl = 0;
704   bool ret = true;
705   struct rctx *ctx = ws->req->http->user_data;
706   if (ctx->read_anon) {
707     rc = JBR_ERROR_WS_ACCESS_DENIED;
708     goto finish;
709   }
710   RCC(rc, finish, jbl_from_json(&jbl, json));
711   RCC(rc, finish, ejdb_put(ctx->jbr->db, mctx->cname, jbl, id));
712   ret = iwn_ws_server_printf(ctx->ws, "%s\t%" PRId64, mctx->key, id);
713 
714 finish:
715   jbl_destroy(&jbl);
716   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
717     ret = false;
718   }
719   return ret;
720 }
721 
_ws_document_del(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id)722 static bool _ws_document_del(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id) {
723   iwrc rc;
724   bool ret = true;
725   struct rctx *ctx = ws->req->http->user_data;
726   if (ctx->read_anon) {
727     rc = JBR_ERROR_WS_ACCESS_DENIED;
728     goto finish;
729   }
730   RCC(rc, finish, ejdb_del(ctx->jbr->db, mctx->cname, id));
731   ret = iwn_ws_server_printf(ctx->ws, "%s\t%" PRId64, mctx->key, id);
732 
733 finish:
734   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
735     ret = false;
736   }
737   return ret;
738 }
739 
_ws_document_patch(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t id,const char * json)740 static bool _ws_document_patch(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t id, const char *json) {
741   iwrc rc;
742   bool ret = true;
743   struct rctx *ctx = ws->req->http->user_data;
744   if (ctx->read_anon) {
745     rc = JBR_ERROR_WS_ACCESS_DENIED;
746     goto finish;
747   }
748   RCC(rc, finish, ejdb_patch(ctx->jbr->db, mctx->cname, json, id));
749   ret = iwn_ws_server_printf(ctx->ws, "%s\t%" PRId64, mctx->key, id);
750 
751 finish:
752   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
753     ret = false;
754   }
755   return ret;
756 }
757 
_ws_index_set(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t mode,const char * path)758 static bool _ws_index_set(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t mode, const char *path) {
759   iwrc rc;
760   bool ret = true;
761   struct rctx *ctx = ws->req->http->user_data;
762   if (ctx->read_anon) {
763     rc = JBR_ERROR_WS_ACCESS_DENIED;
764     goto finish;
765   }
766   RCC(rc, finish, ejdb_ensure_index(ctx->jbr->db, mctx->cname, path, mode));
767   ret = iwn_ws_server_write(ws, mctx->key, -1);
768 
769 finish:
770   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
771     ret = false;
772   }
773   return ret;
774 }
775 
_ws_index_del(struct iwn_ws_sess * ws,struct mctx * mctx,int64_t mode,const char * path)776 static bool _ws_index_del(struct iwn_ws_sess *ws, struct mctx *mctx, int64_t mode, const char *path) {
777   iwrc rc;
778   bool ret = true;
779   struct rctx *ctx = ws->req->http->user_data;
780   if (ctx->read_anon) {
781     rc = JBR_ERROR_WS_ACCESS_DENIED;
782     goto finish;
783   }
784   RCC(rc, finish, ejdb_remove_index(ctx->jbr->db, mctx->cname, path, mode));
785   ret = iwn_ws_server_write(ws, mctx->key, -1);
786 
787 finish:
788   if (rc && !_ws_rc_send(ws, mctx->key, rc, 0)) {
789     ret = false;
790   }
791   return ret;
792 }
793 
_ws_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)794 static iwrc _ws_query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
795   iwrc rc = 0;
796   struct mctx *mctx = ux->opaque;
797   if (!mctx->wbuf) {
798     RCA(mctx->wbuf = iwxstr_new2(512), finish);
799   } else {
800     iwxstr_clear(mctx->wbuf);
801   }
802   if (ux->log) {
803     iwn_ws_server_printf(mctx->ctx->ws, "%s\texplain\t%s", mctx->key, iwxstr_ptr(ux->log));
804     iwxstr_destroy(ux->log);
805     ux->log = 0;
806   }
807   RCC(rc, finish, iwxstr_printf(mctx->wbuf, "%s\t%" PRId64 "\t", mctx->key, doc->id));
808   if (doc->node) {
809     RCC(rc, finish, jbn_as_json(doc->node, jbl_xstr_json_printer, mctx->wbuf, 0));
810   } else {
811     RCC(rc, finish, jbl_as_json(doc->raw, jbl_xstr_json_printer, mctx->wbuf, 0));
812   }
813   if (!iwn_ws_server_write(mctx->ctx->ws, iwxstr_ptr(mctx->wbuf), iwxstr_size(mctx->wbuf))) {
814     *step = 0;
815   }
816 
817 finish:
818   return rc;
819 }
820 
_ws_query(struct iwn_ws_sess * ws,struct mctx * mctx,const char * query,bool explain)821 static bool _ws_query(struct iwn_ws_sess *ws, struct mctx *mctx, const char *query, bool explain) {
822   iwrc rc;
823   bool ret = false;
824   struct rctx *ctx = mctx->ctx;
825 
826   EJDB_EXEC ux = {
827     .db      = ctx->jbr->db,
828     .opaque  = mctx,
829     .visitor = _ws_query_visitor,
830   };
831 
832   RCC(rc, finish,
833       jql_create2(&ux.q, mctx->cname, query, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR));
834   if (ctx->read_anon && jql_has_apply(ux.q)) {
835     rc = JBR_ERROR_WS_ACCESS_DENIED;
836     goto finish;
837   }
838   if (explain) {
839     RCA(ux.log = iwxstr_new(), finish);
840   }
841   RCC(rc, finish, ejdb_exec(&ux));
842   if (ux.log) {
843     ret = iwn_ws_server_printf(mctx->ctx->ws, "%s\texplain\t%s", mctx->key, iwxstr_ptr(ux.log));
844   } else {
845     ret = true;
846   }
847 
848 finish:
849   if (rc) {
850     iwrc rcs = rc;
851     iwrc_strip_code(&rcs);
852     switch (rcs) {
853       case JQL_ERROR_QUERY_PARSE:
854         ret = _ws_error_send(ws, mctx->key, jql_error(ux.q), 0);
855         break;
856       default:
857         ret = _ws_rc_send(ws, mctx->key, rc, 0);
858         break;
859     }
860   } else {
861     if (jql_has_aggregate_count(ux.q)) {
862       ret = iwn_ws_server_printf(ws, "%s\t%" PRId64, mctx->key, ux.cnt);
863     } else {
864       ret = iwn_ws_server_write(ws, mctx->key, -1);
865     }
866   }
867   jql_destroy(&ux.q);
868   iwxstr_destroy(ux.log);
869   return ret;
870 }
871 
_on_ws_msg_impl(struct iwn_ws_sess * ws,struct mctx * mctx,const char * msg_,size_t len)872 static bool _on_ws_msg_impl(struct iwn_ws_sess *ws, struct mctx *mctx, const char *msg_, size_t len) {
873   if (len < 1) {
874     return true;
875   }
876 
877   int pos;
878   jbws_e wsop = JBWS_NONE;
879   const char *key = 0;
880   char *msg = (char*) msg_; // Discard const
881 
882   // Trim left/right
883   for ( ; len > 0 && isspace(msg[len - 1]); --len);
884   for (pos = 0; pos < len && isspace(msg[pos]); ++pos);
885   len -= pos;
886   msg += pos;
887   if (len < 1) {
888     return true;
889   }
890 
891   if (len == 1 && msg[0] == '?') {
892     static const char help[]
893       = "<key> info"
894         "\n<key> get     <collection> <id>"
895         "\n<key> set     <collection> <id> <document json>"
896         "\n<key> add     <collection> <document json>"
897         "\n<key> del     <collection> <id>"
898         "\n<key> patch   <collection> <id> <patch json>"
899         "\n<key> idx     <collection> <mode> <path>"
900         "\n<key> rmi     <collection> <mode> <path>"
901         "\n<key> rmc     <collection>"
902         "\n<key> query   <collection> <query>"
903         "\n<key> explain <collection> <query>"
904         "\n<key> <query>";
905     return iwn_ws_server_write(ws, help, sizeof(help) - 1);
906   }
907 
908   // Fetch key, after we can do good errors reporting
909   for (pos = 0; pos < len && !isspace(msg[pos]); ++pos);
910   if (pos > JBR_MAX_KEY_LEN) {
911     return false;
912   }
913   memcpy(mctx->key, msg, pos);
914   mctx->key[pos] = '\0';
915   if (pos >= len) {
916     return _ws_rc_send(ws, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
917   }
918 
919   for ( ; pos < len && isspace(msg[pos]); ++pos);
920   len -= pos;
921   msg += pos;
922   if (len < 1) {
923     return _ws_rc_send(ws, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
924   }
925 
926   // Fetch command
927   for (pos = 0; pos < len && !isspace(msg[pos]); ++pos);
928 
929   if (pos <= len) {
930     if (!strncmp("get", msg, pos)) {
931       wsop = JBWS_GET;
932     } else if (!strncmp("add", msg, pos)) {
933       wsop = JBWS_ADD;
934     } else if (!strncmp("set", msg, pos)) {
935       wsop = JBWS_SET;
936     } else if (!strncmp("query", msg, pos)) {
937       wsop = JBWS_QUERY;
938     } else if (!strncmp("del", msg, pos)) {
939       wsop = JBWS_DEL;
940     } else if (!strncmp("patch", msg, pos)) {
941       wsop = JBWS_PATCH;
942     } else if (!strncmp("explain", msg, pos)) {
943       wsop = JBWS_EXPLAIN;
944     } else if (!strncmp("info", msg, pos)) {
945       wsop = JBWS_INFO;
946     } else if (!strncmp("idx", msg, pos)) {
947       wsop = JBWS_IDX;
948     } else if (!strncmp("rmi", msg, pos)) {
949       wsop = JBWS_NIDX;
950     } else if (!strncmp("rmc", msg, pos)) {
951       wsop = JBWS_REMOVE_COLL;
952     }
953   }
954 
955   if (wsop > JBWS_NONE) {
956     if (wsop == JBWS_INFO) {
957       return _ws_info(ws, mctx);
958     }
959     for ( ; pos < len && isspace(msg[pos]); ++pos);
960     len -= pos;
961     msg += pos;
962 
963     const char *rp = msg;
964     for (pos = 0; pos < len && !isspace(msg[pos]); ++pos);
965     len -= pos;
966     msg += pos;
967 
968     if (pos < 1 || len < 1) {
969       if (wsop != JBWS_REMOVE_COLL) {
970         return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
971       }
972     } else if (pos > EJDB_COLLECTION_NAME_MAX_LEN) {
973       return _ws_rc_send(ws, key, JBR_ERROR_WS_INVALID_MESSAGE,
974                          "Collection name exceeds maximum length allowed: "
975                          "EJDB_COLLECTION_NAME_MAX_LEN");
976     }
977     memcpy(mctx->cname, rp, pos);
978     mctx->cname[pos] = '\0';
979 
980     if (wsop == JBWS_REMOVE_COLL) {
981       return _ws_coll_remove(ws, mctx);
982     }
983 
984     for (pos = 0; pos < len && isspace(msg[pos]); ++pos);
985     len -= pos;
986     msg += pos;
987     if (len < 1) {
988       return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
989     }
990 
991     switch (wsop) {
992       case JBWS_ADD:
993         msg[len] = '\0';
994         return _ws_document_add(ws, mctx, msg);
995       case JBWS_QUERY:
996       case JBWS_EXPLAIN:
997         msg[len] = '\0';
998         return _ws_query(ws, mctx, msg, (wsop == JBWS_EXPLAIN));
999       default: {
1000         char nbuf[IWNUMBUF_SIZE];
1001         for (pos = 0; pos < len && pos < IWNUMBUF_SIZE - 1 && isdigit(msg[pos]); ++pos) {
1002           nbuf[pos] = msg[pos];
1003         }
1004         nbuf[pos] = '\0';
1005         for ( ; pos < len && isspace(msg[pos]); ++pos);
1006         len -= pos;
1007         msg += pos;
1008 
1009         int64_t id = iwatoi(nbuf);
1010         if (id < 1) {
1011           return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, "Invalid document id specified");
1012         }
1013         msg[len] = '\0';
1014         switch (wsop) {
1015           case JBWS_GET:
1016             return _ws_document_get(ws, mctx, id);
1017           case JBWS_SET:
1018             return _ws_document_set(ws, mctx, id, msg);
1019           case JBWS_DEL:
1020             return _ws_document_del(ws, mctx, id);
1021           case JBWS_PATCH:
1022             return _ws_document_patch(ws, mctx, id, msg);
1023           case JBWS_IDX:
1024             return _ws_index_set(ws, mctx, id, msg);
1025           case JBWS_NIDX:
1026             return _ws_index_del(ws, mctx, id, msg);
1027           default:
1028             return _ws_rc_send(ws, mctx->key, JBR_ERROR_WS_INVALID_MESSAGE, 0);
1029         }
1030       }
1031     }
1032   } else {
1033     msg[len] = '\0';
1034     return _ws_query(ws, mctx, msg, false);
1035   }
1036 }
1037 
_on_ws_msg(struct iwn_ws_sess * ws,const char * msg,size_t len,uint8_t frame)1038 static bool _on_ws_msg(struct iwn_ws_sess *ws, const char *msg, size_t len, uint8_t frame) {
1039   struct mctx mctx = {
1040     .ctx = ws->req->http->user_data,
1041   };
1042   bool ret = _on_ws_msg_impl(ws, &mctx, msg, len);
1043   iwxstr_destroy(mctx.wbuf);
1044   return ret;
1045 }
1046 
_configure(struct jbr * jbr)1047 static iwrc _configure(struct jbr *jbr) {
1048   iwrc rc;
1049 
1050   RCC(rc, finish, iwn_wf_create(&(struct iwn_wf_route) {
1051     .handler = 0
1052   }, &jbr->ctx));
1053 
1054   RCC(rc, finish, iwn_wf_route(iwn_ws_server_route_attach(&(struct iwn_wf_route) {
1055     .ctx = jbr->ctx,
1056     .pattern = "/",
1057     .flags = IWN_WF_GET,
1058   }, &(struct iwn_ws_handler_spec) {
1059     .handler = _on_ws_msg,
1060     .on_http_init = _on_ws_session_http,
1061     .on_session_init = _on_ws_session_init,
1062     .on_session_dispose = _on_ws_session_dispose,
1063     .user_data = jbr
1064   }), 0));
1065 
1066   RCC(rc, finish, iwn_wf_route(&(struct iwn_wf_route) {
1067     .ctx = jbr->ctx,
1068     .pattern = "/",
1069     .flags = IWN_WF_MATCH_PREFIX | IWN_WF_METHODS_ALL,
1070     .handler = _on_http_request,
1071     .user_data = jbr
1072   }, 0));
1073 
1074 finish:
1075   return rc;
1076 }
1077 
_start(struct jbr * jbr)1078 static iwrc _start(struct jbr *jbr) {
1079   const EJDB_HTTP *http = jbr->http;
1080   struct iwn_wf_server_spec spec = {
1081     .poller = jbr->poller,
1082     .listen = http->bind ? http->bind : "localhost",
1083     .port   = http->port > 0 ? http->port : 9292,
1084   };
1085   if (http->ssl_private_key) {
1086     spec.ssl.private_key = http->ssl_private_key;
1087     spec.ssl.private_key_len = -1;
1088   }
1089   if (http->ssl_certs) {
1090     spec.ssl.certs = http->ssl_certs;
1091     spec.ssl.certs_len = -1;
1092   }
1093 
1094   return iwn_wf_server(&spec, jbr->ctx);
1095 }
1096 
jbr_start(EJDB db,const EJDB_OPTS * opts,struct jbr ** jbrp)1097 iwrc jbr_start(EJDB db, const EJDB_OPTS *opts, struct jbr **jbrp) {
1098   iwrc rc = 0;
1099   *jbrp = 0;
1100   if (!opts->http.enabled) {
1101     return 0;
1102   }
1103   JBR jbr = calloc(1, sizeof(*jbr));
1104   if (!jbr) {
1105     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1106   }
1107   jbr->db = db;
1108   jbr->http = &opts->http;
1109   *jbrp = jbr;
1110 
1111   uint16_t cores = iwp_num_cpu_cores();
1112   cores = MAX(2, cores == 0 ? 1 : cores - 1);
1113 
1114   RCC(rc, finish, _configure(jbr));
1115   RCC(rc, finish, iwn_poller_create(cores, cores / 2, &jbr->poller));
1116   RCC(rc, finish, _start(jbr));
1117 
1118   if (!jbr->http->blocking) {
1119     pthread_create(&jbr->poller_thread, 0, _poller_worker, jbr);
1120   } else {
1121     iwn_poller_poll(jbr->poller);
1122     iwn_poller_destroy(&jbr->poller);
1123     *jbrp = 0;
1124     free(jbr);
1125   }
1126 
1127 finish:
1128   if (rc) {
1129     *jbrp = 0;
1130     iwn_wf_destroy(jbr->ctx);
1131     iwn_poller_destroy(&jbr->poller);
1132     free(jbr);
1133   }
1134   return rc;
1135 }
1136 
/**
 * Error code explanation callback for the `JBR_ERROR_*` codes.
 *
 * @param locale Not used.
 * @param ecode  Error code to explain.
 * @return Static description of `ecode`, or 0 when the code lies outside the
 *         (`_JBR_ERROR_START`, `_JBR_ERROR_END`) range or has no description.
 */
static const char* _jbr_ecodefn(locale_t locale, uint32_t ecode) {
  // Codes outside of the open interval owned by this module are not ours.
  if (ecode <= _JBR_ERROR_START || ecode >= _JBR_ERROR_END) {
    return 0;
  }
  if (ecode == JBR_ERROR_WS_INVALID_MESSAGE) {
    return "Invalid message recieved (JBR_ERROR_WS_INVALID_MESSAGE)";
  }
  if (ecode == JBR_ERROR_WS_ACCESS_DENIED) {
    return "Access denied (JBR_ERROR_WS_ACCESS_DENIED)";
  }
  return 0;
}
1149 
jbr_init(void)1150 iwrc jbr_init(void) {
1151   static int _jbr_initialized = 0;
1152   if (!__sync_bool_compare_and_swap(&_jbr_initialized, 0, 1)) {
1153     return 0;
1154   }
1155   return iwlog_register_ecodefn(_jbr_ecodefn);
1156 }
1157