• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "jbr.h"
2 #include <ejdb2/iowow/iwconv.h>
3 #include <ejdb2/iowow/iwth.h>
4 #include "ejdb2_internal.h"
5 
6 #define FIO_INCLUDE_STR
7 
8 #include <fio.h>
9 #include <fiobj.h>
10 #include <fiobj_data.h>
11 #include <http/http.h>
12 #include <ctype.h>
13 
14 #define JBR_MAX_KEY_LEN          36
15 #define JBR_HTTP_CHUNK_SIZE      4096
16 #define JBR_WS_STR_PREMATURE_END "Premature end of message"
17 
18 static uint64_t k_header_x_access_token_hash;
19 static uint64_t k_header_x_hints_hash;
20 static uint64_t k_header_content_length_hash;
21 static uint64_t k_header_content_type_hash;
22 
23 typedef enum {
24   JBR_GET = 1,
25   JBR_PUT,
26   JBR_PATCH,
27   JBR_DELETE,
28   JBR_POST,
29   JBR_HEAD,
30   JBR_OPTIONS,
31 } jbr_method_t;
32 
33 struct _JBR {
34   volatile bool terminated;
35   volatile iwrc rc;
36   pthread_t     worker_thread;
37   pthread_barrier_t start_barrier;
38   const EJDB_HTTP  *http;
39   EJDB db;
40 };
41 
42 typedef struct _JBRCTX {
43   JBR     jbr;
44   http_s *req;
45   const char  *collection;
46   IWXSTR      *wbuf;
47   int64_t      id;
48   size_t       collection_len;
49   jbr_method_t method;
50   bool read_anon;
51   bool data_sent;
52 } JBRCTX;
53 
54 #define JBR_RC_REPORT(code_, r_, rc_)                                              \
55   do {                                                                             \
56     if ((code_) >= 500) iwlog_ecode_error3(rc_);                                   \
57     const char *strerr = iwlog_ecode_explained(rc_);                               \
58     _jbr_http_send(r_, code_, "text/plain", strerr, strerr ? strlen(strerr) : 0);   \
59   } while (0)
60 
_jbr_http_set_content_length(http_s * r,uintptr_t length)61 IW_INLINE void _jbr_http_set_content_length(http_s *r, uintptr_t length) {
62   if (!fiobj_hash_get2(r->private_data.out_headers, k_header_content_length_hash)) {
63     fiobj_hash_set(r->private_data.out_headers, HTTP_HEADER_CONTENT_LENGTH,
64                    fiobj_num_new(length));
65   }
66 }
67 
_jbr_http_set_content_type(http_s * r,const char * ctype)68 IW_INLINE void _jbr_http_set_content_type(http_s *r, const char *ctype) {
69   if (!fiobj_hash_get2(r->private_data.out_headers, k_header_content_type_hash)) {
70     fiobj_hash_set(r->private_data.out_headers, HTTP_HEADER_CONTENT_TYPE,
71                    fiobj_str_new(ctype, strlen(ctype)));
72   }
73 }
74 
_jbr_http_set_header(http_s * r,char * name,size_t nlen,char * val,size_t vlen)75 IW_INLINE void _jbr_http_set_header(
76   http_s *r,
77   char *name, size_t nlen,
78   char *val, size_t vlen) {
79   http_set_header2(r, (fio_str_info_s) {
80     .data = name, .len = nlen
81   }, (fio_str_info_s) {
82     .data = val, .len = vlen
83   });
84 }
85 
_jbr_http_send(http_s * r,int status,const char * ctype,const char * body,int bodylen)86 static iwrc _jbr_http_send(http_s *r, int status, const char *ctype, const char *body, int bodylen) {
87   if (!r || !r->private_data.out_headers) {
88     iwlog_ecode_error3(IW_ERROR_INVALID_ARGS);
89     return IW_ERROR_INVALID_ARGS;
90   }
91   r->status = status;
92   if (ctype) {
93     _jbr_http_set_content_type(r, ctype);
94   }
95   if (http_send_body(r, (char*) body, bodylen)) {
96     iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
97     return JBR_ERROR_SEND_RESPONSE;
98   }
99   return 0;
100 }
101 
_jbr_http_error_send(http_s * r,int status)102 IW_INLINE iwrc _jbr_http_error_send(http_s *r, int status) {
103   return _jbr_http_send(r, status, 0, 0, 0);
104 }
105 
_jbr_http_error_send2(http_s * r,int status,const char * ctype,const char * body,int bodylen)106 IW_INLINE iwrc _jbr_http_error_send2(http_s *r, int status, const char *ctype, const char *body, int bodylen) {
107   return _jbr_http_send(r, status, ctype, body, bodylen);
108 }
109 
_jbr_flush_chunk(JBRCTX * rctx,bool finish)110 static iwrc _jbr_flush_chunk(JBRCTX *rctx, bool finish) {
111   http_s *req = rctx->req;
112   IWXSTR *wbuf = rctx->wbuf;
113   char nbuf[JBNUMBUF_SIZE + 2]; // + \r\n
114   assert(wbuf);
115   if (!rctx->data_sent) {
116     req->status = 200;
117     _jbr_http_set_content_type(req, "application/json");
118     _jbr_http_set_header(req, "transfer-encoding", 17, "chunked", 7);
119     if (http_write_headers(req) < 0) {
120       iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
121       return JBR_ERROR_SEND_RESPONSE;
122     }
123     rctx->data_sent = true;
124   }
125   if (!finish && (iwxstr_size(wbuf) < JBR_HTTP_CHUNK_SIZE)) {
126     return 0;
127   }
128   intptr_t uuid = http_uuid(req);
129   if (iwxstr_size(wbuf) > 0) {
130     int sz = snprintf(nbuf, JBNUMBUF_SIZE, "%zX\r\n", iwxstr_size(wbuf));
131     if (fio_write(uuid, nbuf, sz) < 0) {
132       iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
133       return JBR_ERROR_SEND_RESPONSE;
134     }
135     if (fio_write(uuid, iwxstr_ptr(wbuf), iwxstr_size(wbuf)) < 0) {
136       iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
137       return JBR_ERROR_SEND_RESPONSE;
138     }
139     if (fio_write(uuid, "\r\n", 2) < 0) {
140       iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
141       return JBR_ERROR_SEND_RESPONSE;
142     }
143     iwxstr_clear(wbuf);
144   }
145   if (finish) {
146     if (fio_write(uuid, "0\r\n\r\n", 5) < 0) {
147       iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
148       return JBR_ERROR_SEND_RESPONSE;
149     }
150   }
151   return 0;
152 }
153 
_jbr_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)154 static iwrc _jbr_query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
155   JBRCTX *rctx = ux->opaque;
156   assert(rctx);
157   iwrc rc = 0;
158   IWXSTR *wbuf = rctx->wbuf;
159   if (!wbuf) {
160     wbuf = iwxstr_new2(512);
161     if (!wbuf) {
162       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
163     }
164     rctx->wbuf = wbuf;
165   }
166   if (ux->log) {
167     rc = iwxstr_cat(wbuf, iwxstr_ptr(ux->log), iwxstr_size(ux->log));
168     RCRET(rc);
169     rc = iwxstr_cat(wbuf, "--------------------", 20);
170     RCRET(rc);
171     iwxstr_destroy(ux->log);
172     ux->log = 0;
173   }
174   rc = iwxstr_printf(wbuf, "\r\n%lld\t", doc->id);
175   RCRET(rc);
176   if (doc->node) {
177     rc = jbn_as_json(doc->node, jbl_xstr_json_printer, wbuf, 0);
178   } else {
179     rc = jbl_as_json(doc->raw, jbl_xstr_json_printer, wbuf, 0);
180   }
181   RCRET(rc);
182   return _jbr_flush_chunk(rctx, false);
183 }
184 
_jbr_on_query(JBRCTX * rctx)185 static void _jbr_on_query(JBRCTX *rctx) {
186   http_s *req = rctx->req;
187   fio_str_info_s data = fiobj_data_read(req->body, 0);
188   if (data.len < 1) {
189     _jbr_http_error_send(rctx->req, 400);
190     return;
191   }
192   EJDB_EXEC ux = {
193     .opaque  = rctx,
194     .db      = rctx->jbr->db,
195     .visitor = _jbr_query_visitor
196   };
197 
198   // Collection name must be encoded in query
199   iwrc rc = jql_create2(&ux.q, 0, data.data, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR);
200   RCGO(rc, finish);
201   if (rctx->read_anon && jql_has_apply(ux.q)) {
202     // We have not permitted data modification request
203     jql_destroy(&ux.q);
204     _jbr_http_error_send(rctx->req, 403);
205     return;
206   }
207 
208   FIOBJ h = fiobj_hash_get2(req->headers, k_header_x_hints_hash);
209   if (h) {
210     if (!fiobj_type_is(h, FIOBJ_T_STRING)) {
211       jql_destroy(&ux.q);
212       _jbr_http_error_send(req, 400);
213       return;
214     }
215     fio_str_info_s hv = fiobj_obj2cstr(h);
216     if (strstr(hv.data, "explain")) {
217       ux.log = iwxstr_new();
218       if (!ux.log) {
219         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
220         goto finish;
221       }
222     }
223   }
224 
225   rc = ejdb_exec(&ux);
226 
227   if (!rc && rctx->wbuf) {
228     rc = iwxstr_cat(rctx->wbuf, "\r\n", 2);
229     RCGO(rc, finish);
230     rc = _jbr_flush_chunk(rctx, true);
231   }
232 
233 finish:
234   if (rc) {
235     iwrc rcs = rc;
236     iwrc_strip_code(&rcs);
237     switch (rcs) {
238       case JQL_ERROR_QUERY_PARSE: {
239         const char *err = jql_error(ux.q);
240         _jbr_http_error_send2(rctx->req, 400, "text/plain", err, err ? (int) strlen(err) : 0);
241         break;
242       }
243       case JQL_ERROR_NO_COLLECTION:
244         JBR_RC_REPORT(400, req, rc);
245         break;
246       default:
247         if (rctx->data_sent) {
248           // We cannot report error over HTTP
249           // because already sent some data to client
250           iwlog_ecode_error3(rc);
251           http_complete(req);
252         } else {
253           JBR_RC_REPORT(500, req, rc);
254         }
255         break;
256     }
257   } else if (rctx->data_sent) {
258     http_complete(req);
259   } else if (ux.log) {
260     iwxstr_cat(ux.log, "--------------------", 20);
261     if (jql_has_aggregate_count(ux.q)) {
262       iwxstr_printf(ux.log, "\n%lld", ux.cnt);
263     }
264     _jbr_http_send(req, 200, "text/plain", iwxstr_ptr(ux.log), iwxstr_size(ux.log));
265   } else {
266     if (jql_has_aggregate_count(ux.q)) {
267       char nbuf[JBNUMBUF_SIZE];
268       snprintf(nbuf, sizeof(nbuf), "%" PRId64, ux.cnt);
269       _jbr_http_send(req, 200, "text/plain", nbuf, (int) strlen(nbuf));
270     } else {
271       _jbr_http_send(req, 200, 0, 0, 0);
272     }
273   }
274   if (ux.q) {
275     jql_destroy(&ux.q);
276   }
277   if (ux.log) {
278     iwxstr_destroy(ux.log);
279   }
280   if (rctx->wbuf) {
281     iwxstr_destroy(rctx->wbuf);
282     rctx->wbuf = 0;
283   }
284 }
285 
_jbr_on_patch(JBRCTX * rctx)286 static void _jbr_on_patch(JBRCTX *rctx) {
287   if (rctx->read_anon) {
288     _jbr_http_error_send(rctx->req, 403);
289     return;
290   }
291   EJDB db = rctx->jbr->db;
292   http_s *req = rctx->req;
293   fio_str_info_s data = fiobj_data_read(req->body, 0);
294   if (data.len < 1) {
295     _jbr_http_error_send(rctx->req, 400);
296     return;
297   }
298   iwrc rc = ejdb_patch(db, rctx->collection, data.data, rctx->id);
299   iwrc_strip_code(&rc);
300   switch (rc) {
301     case IWKV_ERROR_NOTFOUND:
302     case IW_ERROR_NOT_EXISTS:
303       _jbr_http_error_send(req, 404);
304       return;
305     case JBL_ERROR_PARSE_JSON:
306     case JBL_ERROR_PARSE_INVALID_CODEPOINT:
307     case JBL_ERROR_PARSE_INVALID_UTF8:
308     case JBL_ERROR_PARSE_UNQUOTED_STRING:
309     case JBL_ERROR_PATCH_TARGET_INVALID:
310     case JBL_ERROR_PATCH_NOVALUE:
311     case JBL_ERROR_PATCH_INVALID_OP:
312     case JBL_ERROR_PATCH_TEST_FAILED:
313     case JBL_ERROR_PATCH_INVALID_ARRAY_INDEX:
314     case JBL_ERROR_JSON_POINTER:
315       JBR_RC_REPORT(400, req, rc);
316       break;
317     default:
318       break;
319   }
320   if (rc) {
321     JBR_RC_REPORT(500, req, rc);
322   } else {
323     _jbr_http_send(req, 200, 0, 0, 0);
324   }
325 }
326 
_jbr_on_delete(JBRCTX * rctx)327 static void _jbr_on_delete(JBRCTX *rctx) {
328   if (rctx->read_anon) {
329     _jbr_http_error_send(rctx->req, 403);
330     return;
331   }
332   EJDB db = rctx->jbr->db;
333   http_s *req = rctx->req;
334   iwrc rc = ejdb_del(db, rctx->collection, rctx->id);
335   if ((rc == IWKV_ERROR_NOTFOUND) || (rc == IW_ERROR_NOT_EXISTS)) {
336     _jbr_http_error_send(req, 404);
337     return;
338   } else if (rc) {
339     JBR_RC_REPORT(500, req, rc);
340     return;
341   }
342   _jbr_http_send(req, 200, 0, 0, 0);
343 }
344 
_jbr_on_put(JBRCTX * rctx)345 static void _jbr_on_put(JBRCTX *rctx) {
346   if (rctx->read_anon) {
347     _jbr_http_error_send(rctx->req, 403);
348     return;
349   }
350   JBL jbl;
351   EJDB db = rctx->jbr->db;
352   http_s *req = rctx->req;
353   fio_str_info_s data = fiobj_data_read(req->body, 0);
354   if (data.len < 1) {
355     _jbr_http_error_send(rctx->req, 400);
356     return;
357   }
358   iwrc rc = jbl_from_json(&jbl, data.data);
359   if (rc) {
360     JBR_RC_REPORT(400, req, rc);
361     return;
362   }
363   rc = ejdb_put(db, rctx->collection, jbl, rctx->id);
364   if (rc) {
365     JBR_RC_REPORT(500, req, rc);
366     goto finish;
367   }
368   _jbr_http_send(req, 200, 0, 0, 0);
369 
370 finish:
371   jbl_destroy(&jbl);
372 }
373 
_jbr_on_post(JBRCTX * rctx)374 static void _jbr_on_post(JBRCTX *rctx) {
375   if (rctx->read_anon) {
376     _jbr_http_error_send(rctx->req, 403);
377     return;
378   }
379   JBL jbl;
380   int64_t id;
381   EJDB db = rctx->jbr->db;
382   http_s *req = rctx->req;
383   fio_str_info_s data = fiobj_data_read(req->body, 0);
384   if (data.len < 1) {
385     _jbr_http_error_send(rctx->req, 400);
386     return;
387   }
388   iwrc rc = jbl_from_json(&jbl, data.data);
389   if (rc) {
390     JBR_RC_REPORT(400, req, rc);
391     return;
392   }
393   rc = ejdb_put_new(db, rctx->collection, jbl, &id);
394   if (rc) {
395     JBR_RC_REPORT(500, req, rc);
396     goto finish;
397   }
398 
399   char nbuf[JBNUMBUF_SIZE];
400   int len = iwitoa(id, nbuf, sizeof(nbuf));
401   _jbr_http_send(req, 200, "text/plain", nbuf, len);
402 
403 finish:
404   jbl_destroy(&jbl);
405 }
406 
_jbr_on_get(JBRCTX * rctx)407 static void _jbr_on_get(JBRCTX *rctx) {
408   JBL jbl;
409   IWXSTR *xstr = 0;
410   int nbytes = 0;
411   EJDB db = rctx->jbr->db;
412   http_s *req = rctx->req;
413 
414   iwrc rc = ejdb_get(db, rctx->collection, rctx->id, &jbl);
415   if ((rc == IWKV_ERROR_NOTFOUND) || (rc == IW_ERROR_NOT_EXISTS)) {
416     _jbr_http_error_send(req, 404);
417     return;
418   } else if (rc) {
419     JBR_RC_REPORT(500, req, rc);
420     return;
421   }
422   if (req->method == JBR_HEAD) {
423     rc = jbl_as_json(jbl, jbl_count_json_printer, &nbytes, JBL_PRINT_PRETTY);
424   } else {
425     xstr = iwxstr_new2(jbl->bn.size * 2);
426     if (!xstr) {
427       rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
428       goto finish;
429     }
430     rc = jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY);
431   }
432   RCGO(rc, finish);
433 
434   _jbr_http_send(req, 200, "application/json",
435                  xstr ? iwxstr_ptr(xstr) : 0,
436                  xstr ? (int) iwxstr_size(xstr) : nbytes);
437 finish:
438   if (rc) {
439     JBR_RC_REPORT(500, req, rc);
440   }
441   jbl_destroy(&jbl);
442   if (xstr) {
443     iwxstr_destroy(xstr);
444   }
445 }
446 
_jbr_on_options(JBRCTX * rctx)447 static void _jbr_on_options(JBRCTX *rctx) {
448   JBL jbl;
449   EJDB db = rctx->jbr->db;
450   http_s *req = rctx->req;
451   const EJDB_HTTP *http = rctx->jbr->http;
452 
453   iwrc rc = ejdb_get_meta(db, &jbl);
454   if (rc) {
455     JBR_RC_REPORT(500, req, rc);
456     return;
457   }
458   IWXSTR *xstr = iwxstr_new2(jbl->bn.size * 2);
459   if (!xstr) {
460     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
461     goto finish;
462   }
463   rc = jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY);
464   RCGO(rc, finish);
465 
466   if (http->read_anon) {
467     _jbr_http_set_header(req, "Allow", 5, "GET, HEAD, POST, OPTIONS", 24);
468   } else {
469     _jbr_http_set_header(req, "Allow", 5, "GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS", 44);
470   }
471 
472   if (http->cors) {
473     _jbr_http_set_header(req, "Access-Control-Allow-Origin", 27, "*", 1);
474     _jbr_http_set_header(req, "Access-Control-Allow-Headers", 28, "X-Requested-With, Content-Type, Accept, Origin, Authorization", 61);
475 
476     if (http->read_anon) {
477       _jbr_http_set_header(req, "Access-Control-Allow-Methods", 28, "GET, HEAD, POST, OPTIONS", 24);
478     } else {
479       _jbr_http_set_header(req, "Access-Control-Allow-Methods", 28, "GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS", 44);
480     }
481   }
482 
483   _jbr_http_send(req, 200, "application/json",
484                  iwxstr_ptr(xstr),
485                  iwxstr_size(xstr));
486 finish:
487   if (rc) {
488     JBR_RC_REPORT(500, req, rc);
489   }
490   jbl_destroy(&jbl);
491   if (xstr) {
492     iwxstr_destroy(xstr);
493   }
494 }
495 
_jbr_fill_ctx(http_s * req,JBRCTX * r)496 static bool _jbr_fill_ctx(http_s *req, JBRCTX *r) {
497   JBR jbr = req->udata;
498   memset(r, 0, sizeof(*r));
499   r->req = req;
500   r->jbr = jbr;
501   fio_str_info_s method = fiobj_obj2cstr(req->method);
502   switch (method.len) {
503     case 3:
504       if (!strncmp("GET", method.data, method.len)) {
505         r->method = JBR_GET;
506       } else if (!strncmp("PUT", method.data, method.len)) {
507         r->method = JBR_PUT;
508       }
509       break;
510     case 4:
511       if (!strncmp("POST", method.data, method.len)) {
512         r->method = JBR_POST;
513       } else if (!strncmp("HEAD", method.data, method.len)) {
514         r->method = JBR_HEAD;
515       }
516       break;
517     case 5:
518       if (!strncmp("PATCH", method.data, method.len)) {
519         r->method = JBR_PATCH;
520       }
521       break;
522     case 6:
523       if (!strncmp("DELETE", method.data, method.len)) {
524         r->method = JBR_DELETE;
525       }
526       break;
527     case 7:
528       if (!strncmp("OPTIONS", method.data, method.len)) {
529         r->method = JBR_OPTIONS;
530       }
531       break;
532   }
533   if (!r->method) {
534     // Unknown method
535     return false;
536   }
537   fio_str_info_s path = fiobj_obj2cstr(req->path);
538   if (!req->path || (path.len < 2)) {
539     return true;
540   } else if (r->method == JBR_OPTIONS) {
541     return false;
542   }
543 
544   char *c = strchr(path.data + 1, '/');
545   if (!c) {
546     switch (r->method) {
547       case JBR_GET:
548       case JBR_HEAD:
549       case JBR_PUT:
550       case JBR_DELETE:
551       case JBR_PATCH:
552         return false;
553       default:
554         break;
555     }
556     r->collection = path.data + 1;
557     r->collection_len = path.len - 1;
558   } else {
559     char *eptr;
560     char nbuf[JBNUMBUF_SIZE];
561     r->collection = path.data + 1;
562     r->collection_len = c - r->collection;
563     int nlen = path.len - (c - path.data) - 1;
564     if (nlen < 1) {
565       goto finish;
566     }
567     if (nlen > JBNUMBUF_SIZE - 1) {
568       return false;
569     }
570     memcpy(nbuf, r->collection + r->collection_len + 1, nlen);
571     nbuf[nlen] = '\0';
572     r->id = strtoll(nbuf, &eptr, 10);
573     if ((*eptr != '\0') || (r->id < 1) || (r->method == JBR_POST)) {
574       return false;
575     }
576   }
577 
578 finish:
579   return (r->collection_len <= EJDB_COLLECTION_NAME_MAX_LEN);
580 }
581 
_jbr_on_http_request(http_s * req)582 static void _jbr_on_http_request(http_s *req) {
583   JBRCTX rctx;
584   JBR jbr = req->udata;
585   assert(jbr);
586   const EJDB_HTTP *http = jbr->http;
587   char cname[EJDB_COLLECTION_NAME_MAX_LEN + 1];
588 
589   if (http->cors) {
590     _jbr_http_set_header(req, "Access-Control-Allow-Origin", 27, "*", 1);
591   }
592 
593   if (!_jbr_fill_ctx(req, &rctx)) {
594     http_send_error(req, 400); // Bad request
595     return;
596   }
597   if (http->access_token) {
598     FIOBJ h = fiobj_hash_get2(req->headers, k_header_x_access_token_hash);
599     if (!h) {
600       if (http->read_anon) {
601         if ((rctx.method == JBR_GET) || (rctx.method == JBR_HEAD) || ((rctx.method == JBR_POST) && !rctx.collection)) {
602           rctx.read_anon = true;
603           goto process;
604         }
605       }
606       http_send_error(req, 401);
607       return;
608     }
609     if (!fiobj_type_is(h, FIOBJ_T_STRING)) { // header specified more than once
610       http_send_error(req, 400);
611       return;
612     }
613     fio_str_info_s hv = fiobj_obj2cstr(h);
614     if ((hv.len != http->access_token_len) || (memcmp(hv.data, http->access_token, http->access_token_len) != 0)) { // -V526
615       http_send_error(req, 403);
616       return;
617     }
618   }
619 
620 process:
621   if (rctx.collection) {
622     // convert to `\0` terminated c-string
623     memcpy(cname, rctx.collection, rctx.collection_len);
624     cname[rctx.collection_len] = '\0';
625     rctx.collection = cname;
626     switch (rctx.method) {
627       case JBR_GET:
628       case JBR_HEAD:
629         _jbr_on_get(&rctx);
630         break;
631       case JBR_POST:
632         _jbr_on_post(&rctx);
633         break;
634       case JBR_PUT:
635         _jbr_on_put(&rctx);
636         break;
637       case JBR_PATCH:
638         _jbr_on_patch(&rctx);
639         break;
640       case JBR_DELETE:
641         _jbr_on_delete(&rctx);
642         break;
643       default:
644         http_send_error(req, 400);
645         break;
646     }
647   } else if (rctx.method == JBR_POST) {
648     _jbr_on_query(&rctx);
649   } else if (rctx.method == JBR_OPTIONS) {
650     _jbr_on_options(&rctx);
651   } else {
652     http_send_error(req, 400);
653   }
654 }
655 
_jbr_on_http_finish(struct http_settings_s * settings)656 static void _jbr_on_http_finish(struct http_settings_s *settings) {
657 }
658 
_jbr_on_pre_start(void * op)659 static void _jbr_on_pre_start(void *op) {
660   JBR jbr = op;
661   if (!jbr->http->blocking) {
662     pthread_barrier_wait(&jbr->start_barrier);
663   }
664 }
665 
666 //------------------ WS ---------------------
667 
668 
669 #define _WS_KEYPREFIX_BUFSZ (JBNUMBUF_SIZE + JBR_MAX_KEY_LEN + 2)
670 
671 typedef enum {
672   JBWS_NONE,
673   JBWS_SET,
674   JBWS_GET,
675   JBWS_ADD,
676   JBWS_DEL,
677   JBWS_PATCH,
678   JBWS_QUERY,
679   JBWS_EXPLAIN,
680   JBWS_INFO,
681   JBWS_IDX,
682   JBWS_NIDX,
683   JBWS_REMOVE_COLL,
684 } jbwsop_t;
685 
686 typedef struct _JBWCTX {
687   bool  read_anon;
688   EJDB  db;
689   ws_s *ws;
690 } JBWCTX;
691 
_jbr_ws_write_text(ws_s * ws,const char * data,int len)692 IW_INLINE bool _jbr_ws_write_text(ws_s *ws, const char *data, int len) {
693   if (fio_is_closed(websocket_uuid(ws)) || (websocket_write(ws, (fio_str_info_s) {
694     .data = (char*) data, .len = len
695   }, 1) < 0)) {
696     iwlog_warn2("Websocket channel closed");
697     return false;
698   }
699   return true;
700 }
701 
_jbr_fill_prefix_buf(const char * key,int64_t id,char buf[static _WS_KEYPREFIX_BUFSZ])702 IW_INLINE int _jbr_fill_prefix_buf(const char *key, int64_t id, char buf[static _WS_KEYPREFIX_BUFSZ]) {
703   int len = (int) strlen(key);
704   char *wp = buf;
705   memcpy(wp, key, len); // NOLINT(bugprone-not-null-terminated-result)
706   wp += len;
707   *wp++ = '\t';
708   wp += iwitoa(id, wp, _WS_KEYPREFIX_BUFSZ - (wp - buf));
709   return (int) (wp - buf);
710 }
711 
_jbr_ws_on_open(ws_s * ws)712 static void _jbr_ws_on_open(ws_s *ws) {
713   JBWCTX *wctx = websocket_udata_get(ws);
714   if (wctx) {
715     wctx->ws = ws;
716   }
717 }
718 
_jbr_ws_on_close(intptr_t uuid,void * udata)719 static void _jbr_ws_on_close(intptr_t uuid, void *udata) {
720   JBWCTX *wctx = udata;
721   free(wctx);
722 }
723 
_jbr_ws_send_error(JBWCTX * wctx,const char * key,const char * error,const char * extra)724 static void _jbr_ws_send_error(JBWCTX *wctx, const char *key, const char *error, const char *extra) {
725   assert(wctx && key && error);
726   IWXSTR *xstr = iwxstr_new();
727   if (!xstr) {
728     iwlog_ecode_error3(iwrc_set_errno(IW_ERROR_ALLOC, errno));
729     return;
730   }
731   iwrc rc;
732   if (extra) {
733     rc = iwxstr_printf(xstr, "%s ERROR: %s %s", key, error, extra);
734   } else {
735     rc = iwxstr_printf(xstr, "%s ERROR: %s", key, error);
736   }
737   if (rc) {
738     iwlog_ecode_error3(rc);
739   } else {
740     _jbr_ws_write_text(wctx->ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
741   }
742   iwxstr_destroy(xstr);
743 }
744 
_jbr_ws_send_rc(JBWCTX * wctx,const char * key,iwrc rc,const char * extra)745 static void _jbr_ws_send_rc(JBWCTX *wctx, const char *key, iwrc rc, const char *extra) {
746   const char *error = iwlog_ecode_explained(rc);
747   if (error) {
748     _jbr_ws_send_error(wctx, key, error, extra);
749   }
750 }
751 
_jbr_ws_add_document(JBWCTX * wctx,const char * key,const char * coll,const char * json)752 static void _jbr_ws_add_document(JBWCTX *wctx, const char *key, const char *coll, const char *json) {
753   if (wctx->read_anon) {
754     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
755     return;
756   }
757   JBL jbl;
758   int64_t id;
759   iwrc rc = jbl_from_json(&jbl, json);
760   if (rc) {
761     _jbr_ws_send_rc(wctx, key, rc, 0);
762     return;
763   }
764   rc = ejdb_put_new(wctx->db, coll, jbl, &id);
765   if (rc) {
766     _jbr_ws_send_rc(wctx, key, rc, 0);
767     goto finish;
768   }
769   char pbuf[_WS_KEYPREFIX_BUFSZ];
770   _jbr_fill_prefix_buf(key, id, pbuf);
771   _jbr_ws_write_text(wctx->ws, pbuf, (int) strlen(pbuf));
772 
773 finish:
774   jbl_destroy(&jbl);
775 }
776 
_jbr_ws_set_document(JBWCTX * wctx,const char * key,const char * coll,int64_t id,const char * json)777 static void _jbr_ws_set_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id, const char *json) {
778   if (wctx->read_anon) {
779     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
780     return;
781   }
782   JBL jbl;
783   iwrc rc = jbl_from_json(&jbl, json);
784   if (rc) {
785     _jbr_ws_send_rc(wctx, key, rc, 0);
786     return;
787   }
788   rc = ejdb_put(wctx->db, coll, jbl, id);
789   if (rc) {
790     _jbr_ws_send_rc(wctx, key, rc, 0);
791     goto finish;
792   }
793   char pbuf[_WS_KEYPREFIX_BUFSZ];
794   int len = _jbr_fill_prefix_buf(key, id, pbuf);
795   _jbr_ws_write_text(wctx->ws, pbuf, len);
796 
797 finish:
798   jbl_destroy(&jbl);
799 }
800 
_jbr_ws_get_document(JBWCTX * wctx,const char * key,const char * coll,int64_t id)801 static void _jbr_ws_get_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id) {
802   JBL jbl;
803   iwrc rc = ejdb_get(wctx->db, coll, id, &jbl);
804   if (rc) {
805     _jbr_ws_send_rc(wctx, key, rc, 0);
806     return;
807   }
808   IWXSTR *xstr = iwxstr_new2(jbl->bn.size * 2);
809   if (!xstr) {
810     rc = iwrc_set_errno(rc, IW_ERROR_ALLOC);
811     iwlog_ecode_error3(rc);
812     _jbr_ws_send_rc(wctx, key, rc, 0);
813     return;
814   }
815   char pbuf[_WS_KEYPREFIX_BUFSZ];
816   _jbr_fill_prefix_buf(key, id, pbuf);
817   rc = iwxstr_printf(xstr, "%s\t", pbuf);
818   if (rc) {
819     _jbr_ws_send_rc(wctx, key, rc, 0);
820     goto finish;
821   }
822   rc = jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY);
823   if (rc) {
824     _jbr_ws_send_rc(wctx, key, rc, 0);
825     goto finish;
826   }
827   _jbr_ws_write_text(wctx->ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
828 
829 finish:
830   iwxstr_destroy(xstr);
831   jbl_destroy(&jbl);
832 }
833 
_jbr_ws_del_document(JBWCTX * wctx,const char * key,const char * coll,int64_t id)834 static void _jbr_ws_del_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id) {
835   iwrc rc = ejdb_del(wctx->db, coll, id);
836   if (rc) {
837     _jbr_ws_send_rc(wctx, key, rc, 0);
838     return;
839   }
840   char pbuf[_WS_KEYPREFIX_BUFSZ];
841   int len = _jbr_fill_prefix_buf(key, id, pbuf);
842   _jbr_ws_write_text(wctx->ws, pbuf, len);
843 }
844 
_jbr_ws_patch_document(JBWCTX * wctx,const char * key,const char * coll,int64_t id,const char * json)845 static void _jbr_ws_patch_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id, const char *json) {
846   if (wctx->read_anon) {
847     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
848     return;
849   }
850   iwrc rc = ejdb_patch(wctx->db, coll, json, id);
851   if (rc) {
852     _jbr_ws_send_rc(wctx, key, rc, 0);
853     return;
854   }
855   char pbuf[_WS_KEYPREFIX_BUFSZ];
856   int len = _jbr_fill_prefix_buf(key, id, pbuf);
857   _jbr_ws_write_text(wctx->ws, pbuf, len);
858 }
859 
_jbr_ws_set_index(JBWCTX * wctx,const char * key,const char * coll,int64_t mode,const char * path)860 static void _jbr_ws_set_index(JBWCTX *wctx, const char *key, const char *coll, int64_t mode, const char *path) {
861   if (wctx->read_anon) {
862     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
863     return;
864   }
865   iwrc rc = ejdb_ensure_index(wctx->db, coll, path, mode);
866   if (rc) {
867     _jbr_ws_send_rc(wctx, key, rc, 0);
868   } else {
869     _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
870   }
871 }
872 
_jbr_ws_del_index(JBWCTX * wctx,const char * key,const char * coll,int64_t mode,const char * path)873 static void _jbr_ws_del_index(JBWCTX *wctx, const char *key, const char *coll, int64_t mode, const char *path) {
874   if (wctx->read_anon) {
875     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
876     return;
877   }
878   iwrc rc = ejdb_remove_index(wctx->db, coll, path, mode);
879   if (rc) {
880     _jbr_ws_send_rc(wctx, key, rc, 0);
881   } else {
882     _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
883   }
884 }
885 
886 typedef struct JBWQCTX {
887   JBWCTX     *wctx;
888   IWXSTR     *wbuf;
889   const char *key;
890 } JBWQCTX;
891 
_jbr_ws_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)892 static iwrc _jbr_ws_query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
893   iwrc rc = 0;
894   JBWQCTX *qctx = ux->opaque;
895   assert(qctx);
896   IWXSTR *wbuf = qctx->wbuf;
897   if (!wbuf) {
898     wbuf = iwxstr_new2(512);
899     if (!wbuf) {
900       return iwrc_set_errno(IW_ERROR_ALLOC, errno);
901     }
902     qctx->wbuf = wbuf;
903   } else {
904     iwxstr_clear(wbuf);
905   }
906   if (ux->log) {
907     rc = iwxstr_printf(wbuf, "%s\texplain\t%s", qctx->key, iwxstr_ptr(ux->log));
908     iwxstr_destroy(ux->log);
909     ux->log = 0;
910     RCRET(rc);
911     _jbr_ws_write_text(qctx->wctx->ws, iwxstr_ptr(wbuf), iwxstr_size(wbuf));
912     iwxstr_clear(wbuf);
913   }
914 
915   rc = iwxstr_printf(wbuf, "%s\t%lld\t", qctx->key, doc->id);
916   RCRET(rc);
917 
918   if (doc->node) {
919     rc = jbn_as_json(doc->node, jbl_xstr_json_printer, wbuf, 0);
920   } else {
921     rc = jbl_as_json(doc->raw, jbl_xstr_json_printer, wbuf, 0);
922   }
923   RCRET(rc);
924   if (!_jbr_ws_write_text(qctx->wctx->ws, iwxstr_ptr(wbuf), iwxstr_size(wbuf))) {
925     *step = 0;
926   }
927   return 0;
928 }
929 
_jbr_ws_query(JBWCTX * wctx,const char * key,const char * coll,const char * query,bool explain)930 static void _jbr_ws_query(JBWCTX *wctx, const char *key, const char *coll, const char *query, bool explain) {
931   JBWQCTX qctx = {
932     .wctx = wctx,
933     .key  = key
934   };
935   EJDB_EXEC ux = {
936     .db      = wctx->db,
937     .opaque  = &qctx,
938     .visitor = _jbr_ws_query_visitor,
939   };
940 
941   iwrc rc = jql_create2(&ux.q, coll, query, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR);
942   RCGO(rc, finish);
943 
944   if (wctx->read_anon && jql_has_apply(ux.q)) {
945     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
946     goto finish;
947   }
948 
949   if (explain) {
950     ux.log = iwxstr_new();
951     if (!ux.log) {
952       iwlog_ecode_error3(iwrc_set_errno(IW_ERROR_ALLOC, errno));
953       goto finish;
954     }
955   }
956 
957   rc = ejdb_exec(&ux);
958 
959   if (!rc) {
960     if (ux.log) {
961       IWXSTR *wbuf = iwxstr_new();
962       if (!wbuf) {
963         rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
964         goto finish;
965       }
966       if (!iwxstr_printf(wbuf, "%s\texplain\t%s", qctx.key, iwxstr_ptr(ux.log))) {
967         _jbr_ws_write_text(wctx->ws, iwxstr_ptr(wbuf), iwxstr_size(wbuf));
968       }
969       iwxstr_destroy(wbuf);
970     }
971   }
972 
973 finish:
974   if (rc) {
975     iwrc rcs = rc;
976     iwrc_strip_code(&rcs);
977     switch (rcs) {
978       case JQL_ERROR_QUERY_PARSE:
979         _jbr_ws_send_error(wctx, key, jql_error(ux.q), 0);
980         break;
981       default:
982         _jbr_ws_send_rc(wctx, key, rc, 0);
983         break;
984     }
985   } else {
986     if (jql_has_aggregate_count(ux.q)) {
987       char pbuf[_WS_KEYPREFIX_BUFSZ];
988       _jbr_fill_prefix_buf(key, ux.cnt, pbuf);
989       _jbr_ws_write_text(wctx->ws, pbuf, (int) strlen(pbuf));
990     }
991     _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
992   }
993   if (ux.q) {
994     jql_destroy(&ux.q);
995   }
996   if (ux.log) {
997     iwxstr_destroy(ux.log);
998   }
999   if (qctx.wbuf) {
1000     iwxstr_destroy(qctx.wbuf);
1001   }
1002 }
1003 
_jbr_ws_info(JBWCTX * wctx,const char * key)1004 static void _jbr_ws_info(JBWCTX *wctx, const char *key) {
1005   if (wctx->read_anon) {
1006     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
1007     return;
1008   }
1009   JBL jbl;
1010   iwrc rc = ejdb_get_meta(wctx->db, &jbl);
1011   if (rc) {
1012     _jbr_ws_send_rc(wctx, key, rc, 0);
1013     return;
1014   }
1015   IWXSTR *xstr = iwxstr_new2(jbl->bn.size * 2);
1016   if (!xstr) {
1017     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
1018     RCGO(rc, finish);
1019   }
1020   rc = iwxstr_printf(xstr, "%s\t", key);
1021   RCGO(rc, finish);
1022 
1023   rc = jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY);
1024   RCGO(rc, finish);
1025   _jbr_ws_write_text(wctx->ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
1026 
1027 finish:
1028   if (rc) {
1029     _jbr_ws_send_rc(wctx, key, rc, 0);
1030   }
1031   jbl_destroy(&jbl);
1032   if (xstr) {
1033     iwxstr_destroy(xstr);
1034   }
1035 }
1036 
_jbr_ws_remove_coll(JBWCTX * wctx,const char * key,const char * coll)1037 static void _jbr_ws_remove_coll(JBWCTX *wctx, const char *key, const char *coll) {
1038   if (wctx->read_anon) {
1039     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
1040     return;
1041   }
1042   iwrc rc = ejdb_remove_collection(wctx->db, coll);
1043   if (rc) {
1044     _jbr_ws_send_rc(wctx, key, rc, 0);
1045   } else {
1046     _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
1047   }
1048 }
1049 
_jbr_ws_on_message(ws_s * ws,fio_str_info_s msg,uint8_t is_text)1050 static void _jbr_ws_on_message(ws_s *ws, fio_str_info_s msg, uint8_t is_text) {
1051   if (!is_text) { // Do not serve binary requests
1052     websocket_close(ws);
1053     return;
1054   }
1055   if (!msg.data || (msg.len < 1)) { // Ignore empty messages, but keep connection
1056     return;
1057   }
1058   JBWCTX *wctx = websocket_udata_get(ws);
1059   assert(wctx);
1060   wctx->ws = ws;
1061   jbwsop_t wsop = JBWS_NONE;
1062 
1063   char keybuf[JBR_MAX_KEY_LEN + 1];
1064   char cnamebuf[EJDB_COLLECTION_NAME_MAX_LEN + 1];
1065 
1066   char *data = msg.data, *key = 0;
1067   int len = msg.len, pos;
1068 
1069   // Trim right
1070   for (pos = len; pos > 0 && isspace(data[pos - 1]); --pos) ;
1071   len = pos;
1072   // Trim left
1073   for (pos = 0; pos < len && isspace(data[pos]); ++pos) ;
1074   len -= pos;
1075   data += pos;
1076   if (len < 1) {
1077     return;
1078   }
1079   if ((len == 1) && (data[0] == '?')) {
1080     const char *help
1081       = "\n<key> info"
1082         "\n<key> get     <collection> <id>"
1083         "\n<key> set     <collection> <id> <document json>"
1084         "\n<key> add     <collection> <document json>"
1085         "\n<key> del     <collection> <id>"
1086         "\n<key> patch   <collection> <id> <patch json>"
1087         "\n<key> idx     <collection> <mode> <path>"
1088         "\n<key> rmi     <collection> <mode> <path>"
1089         "\n<key> rmc     <collection>"
1090         "\n<key> query   <collection> <query>"
1091         "\n<key> explain <collection> <query>"
1092         "\n<key> <query>"
1093         "\n";
1094     _jbr_ws_write_text(ws, help, (int) strlen(help));
1095     return;
1096   }
1097 
1098   // Fetch key, after we can do good errors reporting
1099   for (pos = 0; pos < len && !isspace(data[pos]); ++pos) ;
1100   if (pos > JBR_MAX_KEY_LEN) {
1101     iwlog_warn("The key length: %d exceeded limit: %d", pos, JBR_MAX_KEY_LEN);
1102     return;
1103   }
1104   memcpy(keybuf, data, pos);
1105   keybuf[pos] = '\0';
1106   key = keybuf;
1107   if (pos >= len) {
1108     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
1109     return;
1110   }
1111 
1112   // Space
1113   for ( ; pos < len && isspace(data[pos]); ++pos) ;
1114   len -= pos;
1115   data += pos;
1116   if (len < 1) {
1117     _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
1118     return;
1119   }
1120 
1121   // Fetch command
1122   for (pos = 0; pos < len && !isspace(data[pos]); ++pos) ;
1123 
1124   if (pos <= len) {
1125     if (!strncmp("get", data, pos)) {
1126       wsop = JBWS_GET;
1127     } else if (!strncmp("add", data, pos)) {
1128       wsop = JBWS_ADD;
1129     } else if (!strncmp("set", data, pos)) {
1130       wsop = JBWS_SET;
1131     } else if (!strncmp("query", data, pos)) {
1132       wsop = JBWS_QUERY;
1133     } else if (!strncmp("del", data, pos)) {
1134       wsop = JBWS_DEL;
1135     } else if (!strncmp("patch", data, pos)) {
1136       wsop = JBWS_PATCH;
1137     } else if (!strncmp("explain", data, pos)) {
1138       wsop = JBWS_EXPLAIN;
1139     } else if (!strncmp("info", data, pos)) {
1140       wsop = JBWS_INFO;
1141     } else if (!strncmp("idx", data, pos)) {
1142       wsop = JBWS_IDX;
1143     } else if (!strncmp("rmi", data, pos)) {
1144       wsop = JBWS_NIDX;
1145     } else if (!strncmp("rmc", data, pos)) {
1146       wsop = JBWS_REMOVE_COLL;
1147     }
1148   }
1149 
1150   if (wsop > JBWS_NONE) {
1151     if (wsop == JBWS_INFO) {
1152       _jbr_ws_info(wctx, key);
1153       return;
1154     }
1155     for ( ; pos < len && isspace(data[pos]); ++pos) ;
1156     len -= pos;
1157     data += pos;
1158 
1159     char *coll = data;
1160     for (pos = 0; pos < len && !isspace(data[pos]); ++pos) ;
1161     len -= pos;
1162     data += pos;
1163 
1164     if ((pos < 1) || (len < 1)) {
1165       if (wsop != JBWS_REMOVE_COLL) {
1166         _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
1167         return;
1168       }
1169     } else if (pos > EJDB_COLLECTION_NAME_MAX_LEN) {
1170       _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE,
1171                       "Collection name exceeds maximum length allowed: "
1172                       "EJDB_COLLECTION_NAME_MAX_LEN");
1173       return;
1174     }
1175     memcpy(cnamebuf, coll, pos);
1176     cnamebuf[pos] = '\0';
1177     coll = cnamebuf;
1178 
1179     if (wsop == JBWS_REMOVE_COLL) {
1180       _jbr_ws_remove_coll(wctx, key, coll);
1181       return;
1182     }
1183 
1184     for (pos = 0; pos < len && isspace(data[pos]); ++pos) ;
1185     len -= pos;
1186     data += pos;
1187     if (len < 1) {
1188       _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
1189       return;
1190     }
1191 
1192     switch (wsop) {
1193       case JBWS_ADD:
1194         data[len] = '\0';
1195         _jbr_ws_add_document(wctx, key, coll, data);
1196         break;
1197       case JBWS_QUERY:
1198       case JBWS_EXPLAIN:
1199         data[len] = '\0';
1200         _jbr_ws_query(wctx, key, coll, data, (wsop == JBWS_EXPLAIN));
1201         break;
1202       default: {
1203         char nbuf[JBNUMBUF_SIZE];
1204         for (pos = 0; pos < len && pos < JBNUMBUF_SIZE - 1 && isdigit(data[pos]); ++pos) {
1205           nbuf[pos] = data[pos];
1206         }
1207         nbuf[pos] = '\0';
1208         for ( ; pos < len && isspace(data[pos]); ++pos) ;
1209         len -= pos;
1210         data += pos;
1211 
1212         int64_t id = iwatoi(nbuf);
1213         if (id < 1) {
1214           _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, "Invalid document id specified");
1215           return;
1216         }
1217         switch (wsop) {
1218           case JBWS_GET:
1219             _jbr_ws_get_document(wctx, key, coll, id);
1220             break;
1221           case JBWS_SET:
1222             data[len] = '\0';
1223             _jbr_ws_set_document(wctx, key, coll, id, data);
1224             break;
1225           case JBWS_DEL:
1226             _jbr_ws_del_document(wctx, key, coll, id);
1227             break;
1228           case JBWS_PATCH:
1229             data[len] = '\0';
1230             _jbr_ws_patch_document(wctx, key, coll, id, data);
1231             break;
1232           case JBWS_IDX:
1233             data[len] = '\0';
1234             _jbr_ws_set_index(wctx, key, coll, id, data);
1235             break;
1236           case JBWS_NIDX:
1237             data[len] = '\0';
1238             _jbr_ws_del_index(wctx, key, coll, id, data);
1239             break;
1240           default:
1241             _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, 0);
1242             return;
1243         }
1244       }
1245     }
1246   } else {
1247     data[len] = '\0';
1248     _jbr_ws_query(wctx, key, 0, data, false);
1249   }
1250 }
1251 
_jbr_on_http_upgrade(http_s * req,char * requested_protocol,size_t len)1252 static void _jbr_on_http_upgrade(http_s *req, char *requested_protocol, size_t len) {
1253   JBR jbr = req->udata;
1254   assert(jbr);
1255   const EJDB_HTTP *http = jbr->http;
1256   fio_str_info_s path = fiobj_obj2cstr(req->path);
1257 
1258   if (  ((path.len != 1) || (path.data[0] != '/'))
1259      || ((len != 9) || (requested_protocol[1] != 'e'))) {
1260     http_send_error(req, 400);
1261     return;
1262   }
1263   JBWCTX *wctx = calloc(1, sizeof(*wctx));
1264   if (!wctx) {
1265     http_send_error(req, 500);
1266     return;
1267   }
1268   wctx->db = jbr->db;
1269 
1270   if (http->access_token) {
1271     FIOBJ h = fiobj_hash_get2(req->headers, k_header_x_access_token_hash);
1272     if (!h) {
1273       if (http->read_anon) {
1274         wctx->read_anon = true;
1275       } else {
1276         free(wctx);
1277         http_send_error(req, 401);
1278         return;
1279       }
1280     }
1281     if (!fiobj_type_is(h, FIOBJ_T_STRING)) { // header specified more than once
1282       free(wctx);
1283       http_send_error(req, 400);
1284       return;
1285     }
1286     fio_str_info_s hv = fiobj_obj2cstr(h);
1287     if ((hv.len != http->access_token_len) || (memcmp(hv.data, http->access_token, http->access_token_len) != 0)) { // -V526
1288       free(wctx);
1289       http_send_error(req, 403);
1290       return;
1291     }
1292   }
1293   if (http_upgrade2ws(req,
1294                       .on_message = _jbr_ws_on_message,
1295                       .on_open = _jbr_ws_on_open,
1296                       .on_close = _jbr_ws_on_close,
1297                       .udata = wctx) < 0) {
1298     free(wctx);
1299     JBR_RC_REPORT(500, req, JBR_ERROR_WS_UPGRADE);
1300   }
1301 }
1302 
1303 //---------------- Main ---------------------
1304 
_jbr_start_thread(void * op)1305 static void *_jbr_start_thread(void *op) {
1306   JBR jbr = op;
1307   char nbuf[JBNUMBUF_SIZE];
1308   const EJDB_HTTP *http = jbr->http;
1309   const char *bind = http->bind ? http->bind : "localhost";
1310   if (http->port < 1) {
1311     jbr->rc = JBR_ERROR_PORT_INVALID;
1312     if (!jbr->http->blocking) {
1313       pthread_barrier_wait(&jbr->start_barrier);
1314     }
1315     return 0;
1316   }
1317   iwitoa(http->port, nbuf, sizeof(nbuf));
1318   iwlog_info("HTTP/WS endpoint at %s:%s", bind, nbuf);
1319   websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_TEXT, 1);
1320   if (http_listen(nbuf, bind,
1321                   .udata = jbr,
1322                   .on_request = _jbr_on_http_request,
1323                   .on_upgrade = _jbr_on_http_upgrade,
1324                   .on_finish = _jbr_on_http_finish,
1325                   .max_body_size = http->max_body_size,
1326                   .ws_max_msg_size = http->max_body_size) == -1) {
1327     jbr->rc = iwrc_set_errno(JBR_ERROR_HTTP_LISTEN, errno);
1328   }
1329   if (jbr->rc) {
1330     if (!jbr->http->blocking) {
1331       pthread_barrier_wait(&jbr->start_barrier);
1332     }
1333     return 0;
1334   }
1335   fio_state_callback_add(FIO_CALL_PRE_START, _jbr_on_pre_start, jbr);
1336   fio_start(.threads = -2, .workers = 1, .is_no_signal_handlers = !jbr->http->blocking); // Will block current thread
1337                                                                                          // here
1338   return 0;
1339 }
1340 
_jbr_release(JBR * pjbr)1341 static void _jbr_release(JBR *pjbr) {
1342   JBR jbr = *pjbr;
1343   free(jbr);
1344   *pjbr = 0;
1345 }
1346 
jbr_start(EJDB db,const EJDB_OPTS * opts,JBR * pjbr)1347 iwrc jbr_start(EJDB db, const EJDB_OPTS *opts, JBR *pjbr) {
1348   iwrc rc;
1349   *pjbr = 0;
1350   if (!opts->http.enabled) {
1351     return 0;
1352   }
1353   JBR jbr = calloc(1, sizeof(*jbr));
1354   if (!jbr) {
1355     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1356   }
1357   jbr->db = db;
1358   jbr->terminated = true;
1359   jbr->http = &opts->http;
1360 
1361   if (!jbr->http->blocking) {
1362     int rci = pthread_barrier_init(&jbr->start_barrier, 0, 2);
1363     if (rci) {
1364       free(jbr);
1365       return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1366     }
1367     rci = pthread_create(&jbr->worker_thread, 0, _jbr_start_thread, jbr);
1368     if (rci) {
1369       pthread_barrier_destroy(&jbr->start_barrier);
1370       free(jbr);
1371       return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1372     }
1373     pthread_barrier_wait(&jbr->start_barrier);
1374     pthread_barrier_destroy(&jbr->start_barrier);
1375     jbr->terminated = false;
1376     rc = jbr->rc;
1377     if (rc) {
1378       jbr_shutdown(pjbr);
1379       return rc;
1380     }
1381     *pjbr = jbr;
1382   } else {
1383     *pjbr = jbr;
1384     jbr->terminated = false;
1385     _jbr_start_thread(jbr); // Will block here
1386     rc = jbr->rc;
1387     jbr->terminated = true;
1388     IWRC(jbr_shutdown(pjbr), rc);
1389   }
1390   return rc;
1391 }
1392 
jbr_shutdown(JBR * pjbr)1393 iwrc jbr_shutdown(JBR *pjbr) {
1394   if (!*pjbr) {
1395     return 0;
1396   }
1397   JBR jbr = *pjbr;
1398   if (__sync_bool_compare_and_swap(&jbr->terminated, 0, 1)) {
1399     fio_state_callback_remove(FIO_CALL_PRE_START, _jbr_on_pre_start, jbr);
1400     fio_stop();
1401     if (!jbr->http->blocking) {
1402       pthread_join(jbr->worker_thread, 0);
1403     }
1404   }
1405   _jbr_release(pjbr);
1406   *pjbr = 0;
1407   return 0;
1408 }
1409 
_jbr_ecodefn(locale_t locale,uint32_t ecode)1410 static const char *_jbr_ecodefn(locale_t locale, uint32_t ecode) {
1411   if (!((ecode > _JBR_ERROR_START) && (ecode < _JBR_ERROR_END))) {
1412     return 0;
1413   }
1414   switch (ecode) {
1415     case JBR_ERROR_HTTP_LISTEN:
1416       return "Failed to start HTTP network listener (JBR_ERROR_HTTP_LISTEN)";
1417     case JBR_ERROR_PORT_INVALID:
1418       return "Invalid port specified (JBR_ERROR_PORT_INVALID)";
1419     case JBR_ERROR_SEND_RESPONSE:
1420       return "Error sending response (JBR_ERROR_SEND_RESPONSE)";
1421     case JBR_ERROR_WS_UPGRADE:
1422       return "Failed upgrading to websocket connection (JBR_ERROR_WS_UPGRADE)";
1423     case JBR_ERROR_WS_INVALID_MESSAGE:
1424       return "Invalid message recieved (JBR_ERROR_WS_INVALID_MESSAGE)";
1425     case JBR_ERROR_WS_ACCESS_DENIED:
1426       return "Access denied (JBR_ERROR_WS_ACCESS_DENIED)";
1427   }
1428   return 0;
1429 }
1430 
jbr_init()1431 iwrc jbr_init() {
1432   static int _jbr_initialized = 0;
1433   if (!__sync_bool_compare_and_swap(&_jbr_initialized, 0, 1)) {
1434     return 0;
1435   }
1436   k_header_x_access_token_hash = fiobj_hash_string("x-access-token", 14);
1437   k_header_x_hints_hash = fiobj_hash_string("x-hints", 7);
1438   k_header_content_length_hash = fiobj_hash_string("content-length", 14);
1439   k_header_content_type_hash = fiobj_hash_string("content-type", 12);
1440   return iwlog_register_ecodefn(_jbr_ecodefn);
1441 }
1442