1 #include "jbr.h"
2 #include <ejdb2/iowow/iwconv.h>
3 #include <ejdb2/iowow/iwth.h>
4 #include "ejdb2_internal.h"
5
6 #define FIO_INCLUDE_STR
7
8 #include <fio.h>
9 #include <fiobj.h>
10 #include <fiobj_data.h>
11 #include <http/http.h>
12 #include <ctype.h>
13
// Upper bound on the length of the request key that prefixes a websocket
// message; longer keys are rejected by the websocket message handler.
#define JBR_MAX_KEY_LEN 36
// Number of buffered bytes at which `_jbr_flush_chunk` writes an HTTP chunk
// (smaller buffers are only written when the response is being finished).
#define JBR_HTTP_CHUNK_SIZE 4096
// Extra detail text sent together with JBR_ERROR_WS_INVALID_MESSAGE when a
// websocket message ends before all expected parts were read.
#define JBR_WS_STR_PREMATURE_END "Premature end of message"
17
// Hash keys passed to fiobj_hash_get2() for header lookups.
// NOTE(review): these are not assigned in the visible part of the file —
// presumably computed once at startup; confirm.
static uint64_t k_header_x_access_token_hash;  // request header compared against the configured access token
static uint64_t k_header_x_hints_hash;         // request header searched for the "explain" query hint
static uint64_t k_header_content_length_hash;  // looked up in the outgoing (response) headers
static uint64_t k_header_content_type_hash;    // looked up in the outgoing (response) headers
22
// HTTP request methods recognized by `_jbr_fill_ctx`.
// Values start at 1 so that a zeroed JBRCTX.method means "unknown method".
typedef enum {
  JBR_GET = 1,
  JBR_PUT,
  JBR_PATCH,
  JBR_DELETE,
  JBR_POST,
  JBR_HEAD,
  JBR_OPTIONS,
} jbr_method_t;
32
// State of the network endpoint shared by all request handlers.
struct _JBR {
  volatile bool terminated;        // NOTE(review): not touched in the visible code — presumably a shutdown flag; confirm
  volatile iwrc rc;                // NOTE(review): not touched in the visible code — presumably the worker's result code; confirm
  pthread_t     worker_thread;     // NOTE(review): presumably the thread running the server when not blocking; confirm
  pthread_barrier_t start_barrier; // waited on in `_jbr_on_pre_start` when `http->blocking` is false
  const EJDB_HTTP  *http;          // endpoint options read by the handlers (cors, access_token, read_anon, blocking)
  EJDB db;                         // database handle every handler operates on
};
41
// Per-request context filled by `_jbr_fill_ctx` and passed to the HTTP handlers.
typedef struct _JBRCTX {
  JBR     jbr;            // endpoint state taken from `req->udata`
  http_s *req;            // request being served
  const char *collection; // collection name from the URL path; NULL when the path has no collection part.
                          // Points into the request path until `_jbr_on_http_request` replaces it
                          // with a NUL-terminated copy.
  IWXSTR *wbuf;           // response buffer, lazily allocated by `_jbr_query_visitor`
  int64_t id;             // document id parsed from the URL path; 0 when absent
  size_t  collection_len; // length of `collection` in bytes
  jbr_method_t method;    // parsed HTTP method
  bool read_anon;         // set when the request carries no access token and is served in anonymous read mode
  bool data_sent;         // set once the headers of a chunked response have been written
} JBRCTX;
53
// Sends an error response with HTTP status `code_` on request `r_`, using the
// text explanation of `rc_` as a `text/plain` body (empty body when no
// explanation is available). Codes >= 500 are additionally written to the log.
// Note: `code_` and `rc_` are evaluated more than once — pass plain variables or constants.
#define JBR_RC_REPORT(code_, r_, rc_)                                             \
  do {                                                                            \
    if ((code_) >= 500) iwlog_ecode_error3(rc_);                                  \
    const char *strerr = iwlog_ecode_explained(rc_);                              \
    _jbr_http_send(r_, code_, "text/plain", strerr, strerr ? strlen(strerr) : 0); \
  } while (0)
60
_jbr_http_set_content_length(http_s * r,uintptr_t length)61 IW_INLINE void _jbr_http_set_content_length(http_s *r, uintptr_t length) {
62 if (!fiobj_hash_get2(r->private_data.out_headers, k_header_content_length_hash)) {
63 fiobj_hash_set(r->private_data.out_headers, HTTP_HEADER_CONTENT_LENGTH,
64 fiobj_num_new(length));
65 }
66 }
67
_jbr_http_set_content_type(http_s * r,const char * ctype)68 IW_INLINE void _jbr_http_set_content_type(http_s *r, const char *ctype) {
69 if (!fiobj_hash_get2(r->private_data.out_headers, k_header_content_type_hash)) {
70 fiobj_hash_set(r->private_data.out_headers, HTTP_HEADER_CONTENT_TYPE,
71 fiobj_str_new(ctype, strlen(ctype)));
72 }
73 }
74
_jbr_http_set_header(http_s * r,char * name,size_t nlen,char * val,size_t vlen)75 IW_INLINE void _jbr_http_set_header(
76 http_s *r,
77 char *name, size_t nlen,
78 char *val, size_t vlen) {
79 http_set_header2(r, (fio_str_info_s) {
80 .data = name, .len = nlen
81 }, (fio_str_info_s) {
82 .data = val, .len = vlen
83 });
84 }
85
_jbr_http_send(http_s * r,int status,const char * ctype,const char * body,int bodylen)86 static iwrc _jbr_http_send(http_s *r, int status, const char *ctype, const char *body, int bodylen) {
87 if (!r || !r->private_data.out_headers) {
88 iwlog_ecode_error3(IW_ERROR_INVALID_ARGS);
89 return IW_ERROR_INVALID_ARGS;
90 }
91 r->status = status;
92 if (ctype) {
93 _jbr_http_set_content_type(r, ctype);
94 }
95 if (http_send_body(r, (char*) body, bodylen)) {
96 iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
97 return JBR_ERROR_SEND_RESPONSE;
98 }
99 return 0;
100 }
101
_jbr_http_error_send(http_s * r,int status)102 IW_INLINE iwrc _jbr_http_error_send(http_s *r, int status) {
103 return _jbr_http_send(r, status, 0, 0, 0);
104 }
105
_jbr_http_error_send2(http_s * r,int status,const char * ctype,const char * body,int bodylen)106 IW_INLINE iwrc _jbr_http_error_send2(http_s *r, int status, const char *ctype, const char *body, int bodylen) {
107 return _jbr_http_send(r, status, ctype, body, bodylen);
108 }
109
_jbr_flush_chunk(JBRCTX * rctx,bool finish)110 static iwrc _jbr_flush_chunk(JBRCTX *rctx, bool finish) {
111 http_s *req = rctx->req;
112 IWXSTR *wbuf = rctx->wbuf;
113 char nbuf[JBNUMBUF_SIZE + 2]; // + \r\n
114 assert(wbuf);
115 if (!rctx->data_sent) {
116 req->status = 200;
117 _jbr_http_set_content_type(req, "application/json");
118 _jbr_http_set_header(req, "transfer-encoding", 17, "chunked", 7);
119 if (http_write_headers(req) < 0) {
120 iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
121 return JBR_ERROR_SEND_RESPONSE;
122 }
123 rctx->data_sent = true;
124 }
125 if (!finish && (iwxstr_size(wbuf) < JBR_HTTP_CHUNK_SIZE)) {
126 return 0;
127 }
128 intptr_t uuid = http_uuid(req);
129 if (iwxstr_size(wbuf) > 0) {
130 int sz = snprintf(nbuf, JBNUMBUF_SIZE, "%zX\r\n", iwxstr_size(wbuf));
131 if (fio_write(uuid, nbuf, sz) < 0) {
132 iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
133 return JBR_ERROR_SEND_RESPONSE;
134 }
135 if (fio_write(uuid, iwxstr_ptr(wbuf), iwxstr_size(wbuf)) < 0) {
136 iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
137 return JBR_ERROR_SEND_RESPONSE;
138 }
139 if (fio_write(uuid, "\r\n", 2) < 0) {
140 iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
141 return JBR_ERROR_SEND_RESPONSE;
142 }
143 iwxstr_clear(wbuf);
144 }
145 if (finish) {
146 if (fio_write(uuid, "0\r\n\r\n", 5) < 0) {
147 iwlog_ecode_error3(JBR_ERROR_SEND_RESPONSE);
148 return JBR_ERROR_SEND_RESPONSE;
149 }
150 }
151 return 0;
152 }
153
_jbr_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)154 static iwrc _jbr_query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
155 JBRCTX *rctx = ux->opaque;
156 assert(rctx);
157 iwrc rc = 0;
158 IWXSTR *wbuf = rctx->wbuf;
159 if (!wbuf) {
160 wbuf = iwxstr_new2(512);
161 if (!wbuf) {
162 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
163 }
164 rctx->wbuf = wbuf;
165 }
166 if (ux->log) {
167 rc = iwxstr_cat(wbuf, iwxstr_ptr(ux->log), iwxstr_size(ux->log));
168 RCRET(rc);
169 rc = iwxstr_cat(wbuf, "--------------------", 20);
170 RCRET(rc);
171 iwxstr_destroy(ux->log);
172 ux->log = 0;
173 }
174 rc = iwxstr_printf(wbuf, "\r\n%lld\t", doc->id);
175 RCRET(rc);
176 if (doc->node) {
177 rc = jbn_as_json(doc->node, jbl_xstr_json_printer, wbuf, 0);
178 } else {
179 rc = jbl_as_json(doc->raw, jbl_xstr_json_printer, wbuf, 0);
180 }
181 RCRET(rc);
182 return _jbr_flush_chunk(rctx, false);
183 }
184
/**
 * Handles `POST /`: executes the query given in the request body.
 *
 * The collection name must be part of the query text. Matched documents are
 * streamed by `_jbr_query_visitor` as a chunked response. If no document was
 * streamed, a single response is sent: the explain log (when requested via
 * the hints header), the aggregate count, or an empty 200.
 *
 * Status codes: 400 for an empty body, a non-string hints header, a query
 * parse error or a missing collection; 403 when an anonymous client submits
 * a query with an apply (data modification) part; 500 for other failures
 * that occur before any data was sent.
 */
static void _jbr_on_query(JBRCTX *rctx) {
  http_s *req = rctx->req;
  fio_str_info_s data = fiobj_data_read(req->body, 0);
  if (data.len < 1) {
    _jbr_http_error_send(rctx->req, 400);
    return;
  }
  EJDB_EXEC ux = {
    .opaque  = rctx,
    .db      = rctx->jbr->db,
    .visitor = _jbr_query_visitor
  };

  // Collection name must be encoded in query
  iwrc rc = jql_create2(&ux.q, 0, data.data, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR);
  RCGO(rc, finish);
  if (rctx->read_anon && jql_has_apply(ux.q)) {
    // We have not permitted data modification request
    jql_destroy(&ux.q);
    _jbr_http_error_send(rctx->req, 403);
    return;
  }

  FIOBJ h = fiobj_hash_get2(req->headers, k_header_x_hints_hash);
  if (h) {
    if (!fiobj_type_is(h, FIOBJ_T_STRING)) {
      jql_destroy(&ux.q);
      _jbr_http_error_send(req, 400);
      return;
    }
    fio_str_info_s hv = fiobj_obj2cstr(h);
    if (strstr(hv.data, "explain")) {
      // Query execution log requested
      ux.log = iwxstr_new();
      if (!ux.log) {
        rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
        goto finish;
      }
    }
  }

  rc = ejdb_exec(&ux);

  if (!rc && rctx->wbuf) {
    // At least one document was produced: terminate the chunked response
    rc = iwxstr_cat(rctx->wbuf, "\r\n", 2);
    RCGO(rc, finish);
    rc = _jbr_flush_chunk(rctx, true);
  }

finish:
  if (rc) {
    iwrc rcs = rc;
    iwrc_strip_code(&rcs);
    switch (rcs) {
      case JQL_ERROR_QUERY_PARSE: {
        const char *err = jql_error(ux.q);
        _jbr_http_error_send2(rctx->req, 400, "text/plain", err, err ? (int) strlen(err) : 0);
        break;
      }
      case JQL_ERROR_NO_COLLECTION:
        JBR_RC_REPORT(400, req, rc);
        break;
      default:
        if (rctx->data_sent) {
          // We cannot report error over HTTP
          // because already sent some data to client
          iwlog_ecode_error3(rc);
          http_complete(req);
        } else {
          JBR_RC_REPORT(500, req, rc);
        }
        break;
    }
  } else if (rctx->data_sent) {
    http_complete(req);
  } else if (ux.log) {
    // No documents were streamed, so the log was not consumed by the visitor
    iwxstr_cat(ux.log, "--------------------", 20);
    if (jql_has_aggregate_count(ux.q)) {
      // PRId64 matches the int64_t counter on every platform
      iwxstr_printf(ux.log, "\n%" PRId64, ux.cnt);
    }
    _jbr_http_send(req, 200, "text/plain", iwxstr_ptr(ux.log), iwxstr_size(ux.log));
  } else {
    if (jql_has_aggregate_count(ux.q)) {
      char nbuf[JBNUMBUF_SIZE];
      snprintf(nbuf, sizeof(nbuf), "%" PRId64, ux.cnt);
      _jbr_http_send(req, 200, "text/plain", nbuf, (int) strlen(nbuf));
    } else {
      _jbr_http_send(req, 200, 0, 0, 0);
    }
  }
  // Release everything owned by this request
  if (ux.q) {
    jql_destroy(&ux.q);
  }
  if (ux.log) {
    iwxstr_destroy(ux.log);
  }
  if (rctx->wbuf) {
    iwxstr_destroy(rctx->wbuf);
    rctx->wbuf = 0;
  }
}
285
/**
 * Handles `PATCH /{collection}/{id}`: applies the JSON patch from the request
 * body to the addressed document.
 *
 * Status codes: 403 for anonymous clients, 400 for an empty body or a
 * malformed/inapplicable patch, 404 when the document does not exist,
 * 500 for any other failure, 200 on success.
 * Exactly one response is sent per request.
 */
static void _jbr_on_patch(JBRCTX *rctx) {
  if (rctx->read_anon) {
    _jbr_http_error_send(rctx->req, 403);
    return;
  }
  EJDB db = rctx->jbr->db;
  http_s *req = rctx->req;
  fio_str_info_s data = fiobj_data_read(req->body, 0);
  if (data.len < 1) {
    _jbr_http_error_send(rctx->req, 400);
    return;
  }
  iwrc rc = ejdb_patch(db, rctx->collection, data.data, rctx->id);
  iwrc_strip_code(&rc);
  switch (rc) {
    case IWKV_ERROR_NOTFOUND:
    case IW_ERROR_NOT_EXISTS:
      _jbr_http_error_send(req, 404);
      return;
    case JBL_ERROR_PARSE_JSON:
    case JBL_ERROR_PARSE_INVALID_CODEPOINT:
    case JBL_ERROR_PARSE_INVALID_UTF8:
    case JBL_ERROR_PARSE_UNQUOTED_STRING:
    case JBL_ERROR_PATCH_TARGET_INVALID:
    case JBL_ERROR_PATCH_NOVALUE:
    case JBL_ERROR_PATCH_INVALID_OP:
    case JBL_ERROR_PATCH_TEST_FAILED:
    case JBL_ERROR_PATCH_INVALID_ARRAY_INDEX:
    case JBL_ERROR_JSON_POINTER:
      JBR_RC_REPORT(400, req, rc);
      // The response is complete: falling through to the generic handling
      // below would send a second (500) response on the same request.
      return;
    default:
      break;
  }
  if (rc) {
    JBR_RC_REPORT(500, req, rc);
  } else {
    _jbr_http_send(req, 200, 0, 0, 0);
  }
}
326
/**
 * Handles `DELETE /{collection}/{id}`.
 *
 * Status codes: 403 for anonymous clients, 404 when the document does not
 * exist, 500 for any other failure, 200 on success.
 */
static void _jbr_on_delete(JBRCTX *rctx) {
  http_s *req = rctx->req;
  if (rctx->read_anon) {
    _jbr_http_error_send(req, 403);
    return;
  }
  iwrc rc = ejdb_del(rctx->jbr->db, rctx->collection, rctx->id);
  if (!rc) {
    _jbr_http_send(req, 200, 0, 0, 0);
    return;
  }
  if ((rc == IWKV_ERROR_NOTFOUND) || (rc == IW_ERROR_NOT_EXISTS)) {
    _jbr_http_error_send(req, 404);
  } else {
    JBR_RC_REPORT(500, req, rc);
  }
}
344
/**
 * Handles `PUT /{collection}/{id}`: stores the JSON document from the request
 * body under the given id.
 *
 * Status codes: 403 for anonymous clients, 400 for an empty body or JSON that
 * cannot be parsed, 500 when the store operation fails, 200 on success.
 */
static void _jbr_on_put(JBRCTX *rctx) {
  http_s *req = rctx->req;
  if (rctx->read_anon) {
    _jbr_http_error_send(req, 403);
    return;
  }
  fio_str_info_s body = fiobj_data_read(req->body, 0);
  if (body.len < 1) {
    _jbr_http_error_send(req, 400);
    return;
  }
  JBL doc;
  iwrc rc = jbl_from_json(&doc, body.data);
  if (rc) {
    JBR_RC_REPORT(400, req, rc);
    return;
  }
  rc = ejdb_put(rctx->jbr->db, rctx->collection, doc, rctx->id);
  if (rc) {
    JBR_RC_REPORT(500, req, rc);
  } else {
    _jbr_http_send(req, 200, 0, 0, 0);
  }
  jbl_destroy(&doc);
}
373
/**
 * Handles `POST /{collection}`: stores the JSON document from the request
 * body as a new document and answers with its generated id as `text/plain`.
 *
 * Status codes: 403 for anonymous clients, 400 for an empty body or JSON that
 * cannot be parsed, 500 when the store operation fails, 200 on success.
 */
static void _jbr_on_post(JBRCTX *rctx) {
  http_s *req = rctx->req;
  if (rctx->read_anon) {
    _jbr_http_error_send(req, 403);
    return;
  }
  fio_str_info_s body = fiobj_data_read(req->body, 0);
  if (body.len < 1) {
    _jbr_http_error_send(req, 400);
    return;
  }
  JBL doc;
  iwrc rc = jbl_from_json(&doc, body.data);
  if (rc) {
    JBR_RC_REPORT(400, req, rc);
    return;
  }
  int64_t new_id;
  rc = ejdb_put_new(rctx->jbr->db, rctx->collection, doc, &new_id);
  if (rc) {
    JBR_RC_REPORT(500, req, rc);
  } else {
    char idbuf[JBNUMBUF_SIZE];
    int idlen = iwitoa(new_id, idbuf, sizeof(idbuf));
    _jbr_http_send(req, 200, "text/plain", idbuf, idlen);
  }
  jbl_destroy(&doc);
}
406
/**
 * Handles `GET` and `HEAD` on `/{collection}/{id}`.
 *
 * GET answers with the pretty-printed JSON document. HEAD only computes the
 * size of that JSON text, reports it in the content-length header and sends
 * no body.
 *
 * Status codes: 404 when the document does not exist, 500 for any other
 * failure, 200 on success.
 */
static void _jbr_on_get(JBRCTX *rctx) {
  JBL jbl;
  IWXSTR *xstr = 0;
  int nbytes = 0;
  EJDB db = rctx->jbr->db;
  http_s *req = rctx->req;

  iwrc rc = ejdb_get(db, rctx->collection, rctx->id, &jbl);
  if ((rc == IWKV_ERROR_NOTFOUND) || (rc == IW_ERROR_NOT_EXISTS)) {
    _jbr_http_error_send(req, 404);
    return;
  } else if (rc) {
    JBR_RC_REPORT(500, req, rc);
    return;
  }
  // The parsed method lives in the request context; `req->method` is the
  // raw method string object and must not be compared with JBR_HEAD.
  if (rctx->method == JBR_HEAD) {
    rc = jbl_as_json(jbl, jbl_count_json_printer, &nbytes, JBL_PRINT_PRETTY);
    RCGO(rc, finish);
    // No body is sent for HEAD, so the length has to be announced explicitly
    _jbr_http_set_content_length(req, (uintptr_t) nbytes);
    _jbr_http_send(req, 200, "application/json", 0, 0);
  } else {
    xstr = iwxstr_new2(jbl->bn.size * 2);
    if (!xstr) {
      rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
      goto finish;
    }
    rc = jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY);
    RCGO(rc, finish);
    _jbr_http_send(req, 200, "application/json", iwxstr_ptr(xstr), (int) iwxstr_size(xstr));
  }

finish:
  if (rc) {
    JBR_RC_REPORT(500, req, rc);
  }
  jbl_destroy(&jbl);
  if (xstr) {
    iwxstr_destroy(xstr);
  }
}
446
/**
 * Handles `OPTIONS /`: answers with the database metadata as pretty-printed
 * JSON and advertises the allowed HTTP methods.
 *
 * The method list depends on `http->read_anon`; CORS headers are added
 * when `http->cors` is set. Any failure is reported with status 500.
 */
static void _jbr_on_options(JBRCTX *rctx) {
  http_s *req = rctx->req;
  const EJDB_HTTP *http = rctx->jbr->http;
  JBL meta;
  IWXSTR *json = 0;
  char *methods;
  size_t methods_len;

  iwrc rc = ejdb_get_meta(rctx->jbr->db, &meta);
  if (rc) {
    JBR_RC_REPORT(500, req, rc);
    return;
  }
  json = iwxstr_new2(meta->bn.size * 2);
  if (!json) {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    goto finish;
  }
  rc = jbl_as_json(meta, jbl_xstr_json_printer, json, JBL_PRINT_PRETTY);
  RCGO(rc, finish);

  // The same method list is used for `Allow` and for the CORS header
  if (http->read_anon) {
    methods = "GET, HEAD, POST, OPTIONS";
    methods_len = 24;
  } else {
    methods = "GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS";
    methods_len = 44;
  }

  _jbr_http_set_header(req, "Allow", 5, methods, methods_len);
  if (http->cors) {
    _jbr_http_set_header(req, "Access-Control-Allow-Origin", 27, "*", 1);
    _jbr_http_set_header(req, "Access-Control-Allow-Headers", 28,
                         "X-Requested-With, Content-Type, Accept, Origin, Authorization", 61);
    _jbr_http_set_header(req, "Access-Control-Allow-Methods", 28, methods, methods_len);
  }

  _jbr_http_send(req, 200, "application/json", iwxstr_ptr(json), iwxstr_size(json));

finish:
  if (rc) {
    JBR_RC_REPORT(500, req, rc);
  }
  jbl_destroy(&meta);
  if (json) {
    iwxstr_destroy(json);
  }
}
495
/**
 * Parses the HTTP method and URL path of `req` into the request context `r`.
 *
 * Accepted path forms:
 *  - empty path or `/`          : no collection (any known method);
 *  - `/{collection}`            : POST only;
 *  - `/{collection}/`           : collection without document id;
 *  - `/{collection}/{id}`       : positive decimal id, not allowed for POST.
 * OPTIONS is accepted only without a collection.
 *
 * `r->collection` points into the request path and is NOT NUL-terminated;
 * use it together with `r->collection_len`.
 *
 * @return true when the request is well-formed, false otherwise
 *         (unknown method, bad path/id, or too long collection name).
 */
static bool _jbr_fill_ctx(http_s *req, JBRCTX *r) {
  static const struct {
    const char  *name;
    size_t       len;
    jbr_method_t method;
  } known_methods[] = {
    { "GET",     3, JBR_GET     },
    { "PUT",     3, JBR_PUT     },
    { "POST",    4, JBR_POST    },
    { "HEAD",    4, JBR_HEAD    },
    { "PATCH",   5, JBR_PATCH   },
    { "DELETE",  6, JBR_DELETE  },
    { "OPTIONS", 7, JBR_OPTIONS },
  };

  JBR jbr = req->udata;
  memset(r, 0, sizeof(*r));
  r->req = req;
  r->jbr = jbr;

  fio_str_info_s method = fiobj_obj2cstr(req->method);
  for (size_t i = 0; i < sizeof(known_methods) / sizeof(known_methods[0]); ++i) {
    if (  (method.len == known_methods[i].len)
       && !strncmp(known_methods[i].name, method.data, method.len)) {
      r->method = known_methods[i].method;
      break;
    }
  }
  if (!r->method) {
    // Unknown method
    return false;
  }

  fio_str_info_s path = fiobj_obj2cstr(req->path);
  if (!req->path || (path.len < 2)) {
    return true; // no collection in path
  }
  if (r->method == JBR_OPTIONS) {
    return false;
  }

  char *slash = strchr(path.data + 1, '/');
  if (!slash) {
    // `/{collection}`: only methods which do not need a document id
    switch (r->method) {
      case JBR_GET:
      case JBR_HEAD:
      case JBR_PUT:
      case JBR_DELETE:
      case JBR_PATCH:
        return false;
      default:
        break;
    }
    r->collection = path.data + 1;
    r->collection_len = path.len - 1;
  } else {
    r->collection = path.data + 1;
    r->collection_len = slash - r->collection;
    int nlen = path.len - (slash - path.data) - 1; // length of the id part
    if (nlen >= 1) {
      char nbuf[JBNUMBUF_SIZE];
      char *eptr;
      if (nlen > JBNUMBUF_SIZE - 1) {
        return false;
      }
      memcpy(nbuf, r->collection + r->collection_len + 1, nlen);
      nbuf[nlen] = '\0';
      r->id = strtoll(nbuf, &eptr, 10);
      if ((*eptr != '\0') || (r->id < 1) || (r->method == JBR_POST)) {
        return false;
      }
    }
  }
  return (r->collection_len <= EJDB_COLLECTION_NAME_MAX_LEN);
}
581
/**
 * Entry point for every HTTP request: parses the request, enforces the access
 * token policy and dispatches to the method handler.
 *
 * Access policy (only when an access token is configured):
 *  - token header present: must be a single string equal to the configured
 *    token, otherwise 400 (repeated header) or 403 (mismatch);
 *  - token header absent: 401, unless anonymous reads are enabled and the
 *    request is a GET, a HEAD, or a query (POST without collection); such a
 *    request is served with `read_anon` set.
 */
static void _jbr_on_http_request(http_s *req) {
  JBR jbr = req->udata;
  assert(jbr);
  const EJDB_HTTP *http = jbr->http;
  JBRCTX rctx;
  char cname[EJDB_COLLECTION_NAME_MAX_LEN + 1];

  if (http->cors) {
    _jbr_http_set_header(req, "Access-Control-Allow-Origin", 27, "*", 1);
  }

  if (!_jbr_fill_ctx(req, &rctx)) {
    http_send_error(req, 400); // Bad request
    return;
  }

  if (http->access_token) {
    FIOBJ token = fiobj_hash_get2(req->headers, k_header_x_access_token_hash);
    if (token) {
      if (!fiobj_type_is(token, FIOBJ_T_STRING)) { // header specified more than once
        http_send_error(req, 400);
        return;
      }
      fio_str_info_s tv = fiobj_obj2cstr(token);
      bool token_ok = (tv.len == http->access_token_len)
                      && (memcmp(tv.data, http->access_token, http->access_token_len) == 0); // -V526
      if (!token_ok) {
        http_send_error(req, 403);
        return;
      }
    } else {
      bool anon_allowed = http->read_anon
                          && (  (rctx.method == JBR_GET)
                             || (rctx.method == JBR_HEAD)
                             || ((rctx.method == JBR_POST) && !rctx.collection));
      if (!anon_allowed) {
        http_send_error(req, 401);
        return;
      }
      rctx.read_anon = true;
    }
  }

  if (!rctx.collection) {
    // Requests addressed to the database as a whole
    switch (rctx.method) {
      case JBR_POST:
        _jbr_on_query(&rctx);
        break;
      case JBR_OPTIONS:
        _jbr_on_options(&rctx);
        break;
      default:
        http_send_error(req, 400);
        break;
    }
    return;
  }

  // convert to `\0` terminated c-string
  memcpy(cname, rctx.collection, rctx.collection_len);
  cname[rctx.collection_len] = '\0';
  rctx.collection = cname;

  switch (rctx.method) {
    case JBR_GET:
    case JBR_HEAD:
      _jbr_on_get(&rctx);
      break;
    case JBR_POST:
      _jbr_on_post(&rctx);
      break;
    case JBR_PUT:
      _jbr_on_put(&rctx);
      break;
    case JBR_PATCH:
      _jbr_on_patch(&rctx);
      break;
    case JBR_DELETE:
      _jbr_on_delete(&rctx);
      break;
    default:
      http_send_error(req, 400);
      break;
  }
}
655
// Intentionally empty: nothing is done with `settings`.
// NOTE(review): presumably registered as the HTTP service's finish callback —
// the registration is not visible in this part of the file; confirm.
static void _jbr_on_http_finish(struct http_settings_s *settings) {
}
658
_jbr_on_pre_start(void * op)659 static void _jbr_on_pre_start(void *op) {
660 JBR jbr = op;
661 if (!jbr->http->blocking) {
662 pthread_barrier_wait(&jbr->start_barrier);
663 }
664 }
665
666 //------------------ WS ---------------------
667
668
// Size of the buffer filled by `_jbr_fill_prefix_buf` with `<key>\t<number>`:
// a key of up to JBR_MAX_KEY_LEN bytes, a number of up to JBNUMBUF_SIZE bytes
// and two extra bytes (presumably the tab and the terminator — confirm).
#define _WS_KEYPREFIX_BUFSZ (JBNUMBUF_SIZE + JBR_MAX_KEY_LEN + 2)
670
// Websocket commands recognized by `_jbr_ws_on_message`.
// The command word of the message is given in the comments.
typedef enum {
  JBWS_NONE,        // no known command word matched
  JBWS_SET,         // "set"
  JBWS_GET,         // "get"
  JBWS_ADD,         // "add"
  JBWS_DEL,         // "del"
  JBWS_PATCH,       // "patch"
  JBWS_QUERY,       // "query"
  JBWS_EXPLAIN,     // "explain"
  JBWS_INFO,        // "info"
  JBWS_IDX,         // "idx"
  JBWS_NIDX,        // "rmi"
  JBWS_REMOVE_COLL, // "rmc"
} jbwsop_t;
685
// Per-connection websocket context, stored as the websocket's udata
// and released with free() in `_jbr_ws_on_close`.
typedef struct _JBWCTX {
  bool  read_anon; // when set, commands which modify data are answered with JBR_ERROR_WS_ACCESS_DENIED
  EJDB  db;        // database handle the commands operate on
  ws_s *ws;        // websocket handle; refreshed on open and on every message
} JBWCTX;
691
_jbr_ws_write_text(ws_s * ws,const char * data,int len)692 IW_INLINE bool _jbr_ws_write_text(ws_s *ws, const char *data, int len) {
693 if (fio_is_closed(websocket_uuid(ws)) || (websocket_write(ws, (fio_str_info_s) {
694 .data = (char*) data, .len = len
695 }, 1) < 0)) {
696 iwlog_warn2("Websocket channel closed");
697 return false;
698 }
699 return true;
700 }
701
_jbr_fill_prefix_buf(const char * key,int64_t id,char buf[static _WS_KEYPREFIX_BUFSZ])702 IW_INLINE int _jbr_fill_prefix_buf(const char *key, int64_t id, char buf[static _WS_KEYPREFIX_BUFSZ]) {
703 int len = (int) strlen(key);
704 char *wp = buf;
705 memcpy(wp, key, len); // NOLINT(bugprone-not-null-terminated-result)
706 wp += len;
707 *wp++ = '\t';
708 wp += iwitoa(id, wp, _WS_KEYPREFIX_BUFSZ - (wp - buf));
709 return (int) (wp - buf);
710 }
711
/** Websocket open callback: remembers the websocket handle in the connection context, if any. */
static void _jbr_ws_on_open(ws_s *ws) {
  JBWCTX *wctx = websocket_udata_get(ws);
  if (!wctx) {
    return;
  }
  wctx->ws = ws;
}
718
/** Websocket close callback: releases the connection context passed as `udata`. */
static void _jbr_ws_on_close(intptr_t uuid, void *udata) {
  (void) uuid; // not needed
  free(udata);
}
723
_jbr_ws_send_error(JBWCTX * wctx,const char * key,const char * error,const char * extra)724 static void _jbr_ws_send_error(JBWCTX *wctx, const char *key, const char *error, const char *extra) {
725 assert(wctx && key && error);
726 IWXSTR *xstr = iwxstr_new();
727 if (!xstr) {
728 iwlog_ecode_error3(iwrc_set_errno(IW_ERROR_ALLOC, errno));
729 return;
730 }
731 iwrc rc;
732 if (extra) {
733 rc = iwxstr_printf(xstr, "%s ERROR: %s %s", key, error, extra);
734 } else {
735 rc = iwxstr_printf(xstr, "%s ERROR: %s", key, error);
736 }
737 if (rc) {
738 iwlog_ecode_error3(rc);
739 } else {
740 _jbr_ws_write_text(wctx->ws, iwxstr_ptr(xstr), iwxstr_size(xstr));
741 }
742 iwxstr_destroy(xstr);
743 }
744
/**
 * Sends the text explanation of error code `rc` to the websocket client.
 * Nothing is sent when no explanation is available for `rc`.
 */
static void _jbr_ws_send_rc(JBWCTX *wctx, const char *key, iwrc rc, const char *extra) {
  const char *explained = iwlog_ecode_explained(rc);
  if (!explained) {
    return;
  }
  _jbr_ws_send_error(wctx, key, explained, extra);
}
751
/**
 * Websocket `add` command: stores `json` as a new document in collection
 * `coll` and answers with `<key>\t<new id>`.
 * Anonymous clients are denied; errors are reported to the client.
 */
static void _jbr_ws_add_document(JBWCTX *wctx, const char *key, const char *coll, const char *json) {
  if (wctx->read_anon) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  JBL doc;
  iwrc rc = jbl_from_json(&doc, json);
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
    return;
  }
  int64_t new_id;
  rc = ejdb_put_new(wctx->db, coll, doc, &new_id);
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
  } else {
    char reply[_WS_KEYPREFIX_BUFSZ];
    _jbr_fill_prefix_buf(key, new_id, reply);
    _jbr_ws_write_text(wctx->ws, reply, (int) strlen(reply));
  }
  jbl_destroy(&doc);
}
776
/**
 * Websocket `set` command: stores `json` under `id` in collection `coll`
 * and answers with `<key>\t<id>`.
 * Anonymous clients are denied; errors are reported to the client.
 */
static void _jbr_ws_set_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id, const char *json) {
  if (wctx->read_anon) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  JBL doc;
  iwrc rc = jbl_from_json(&doc, json);
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
    return;
  }
  rc = ejdb_put(wctx->db, coll, doc, id);
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
  } else {
    char reply[_WS_KEYPREFIX_BUFSZ];
    int reply_len = _jbr_fill_prefix_buf(key, id, reply);
    _jbr_ws_write_text(wctx->ws, reply, reply_len);
  }
  jbl_destroy(&doc);
}
800
/**
 * Websocket `get` command: answers with `<key>\t<id>\t<pretty-printed JSON>`
 * for document `id` of collection `coll`.
 *
 * Any failure is reported to the client as an error message. The loaded
 * document and the output buffer are released on every path.
 */
static void _jbr_ws_get_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id) {
  JBL jbl;
  IWXSTR *xstr = 0;
  char pbuf[_WS_KEYPREFIX_BUFSZ];

  iwrc rc = ejdb_get(wctx->db, coll, id, &jbl);
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
    return;
  }
  xstr = iwxstr_new2(jbl->bn.size * 2);
  if (!xstr) {
    // Error code first, errno second. The document must still be released,
    // so leave through the common cleanup path.
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    iwlog_ecode_error3(rc);
    goto finish;
  }
  _jbr_fill_prefix_buf(key, id, pbuf);
  rc = iwxstr_printf(xstr, "%s\t", pbuf);
  RCGO(rc, finish);

  rc = jbl_as_json(jbl, jbl_xstr_json_printer, xstr, JBL_PRINT_PRETTY);
  RCGO(rc, finish);

  _jbr_ws_write_text(wctx->ws, iwxstr_ptr(xstr), iwxstr_size(xstr));

finish:
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
  }
  if (xstr) {
    iwxstr_destroy(xstr);
  }
  jbl_destroy(&jbl);
}
833
/**
 * Websocket `del` command: removes document `id` from collection `coll`
 * and answers with `<key>\t<id>`.
 *
 * Like every other command which modifies data, it is denied for anonymous
 * (read-only) clients. Errors are reported to the client.
 */
static void _jbr_ws_del_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id) {
  if (wctx->read_anon) {
    // Anonymous clients have read-only access
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  iwrc rc = ejdb_del(wctx->db, coll, id);
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
    return;
  }
  char pbuf[_WS_KEYPREFIX_BUFSZ];
  int len = _jbr_fill_prefix_buf(key, id, pbuf);
  _jbr_ws_write_text(wctx->ws, pbuf, len);
}
844
/**
 * Websocket `patch` command: applies the patch `json` to document `id` of
 * collection `coll` and answers with `<key>\t<id>`.
 * Anonymous clients are denied; errors are reported to the client.
 */
static void _jbr_ws_patch_document(JBWCTX *wctx, const char *key, const char *coll, int64_t id, const char *json) {
  if (wctx->read_anon) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  iwrc rc = ejdb_patch(wctx->db, coll, json, id);
  if (!rc) {
    char reply[_WS_KEYPREFIX_BUFSZ];
    int reply_len = _jbr_fill_prefix_buf(key, id, reply);
    _jbr_ws_write_text(wctx->ws, reply, reply_len);
    return;
  }
  _jbr_ws_send_rc(wctx, key, rc, 0);
}
859
/**
 * Websocket `idx` command: ensures an index with `mode` on `path` exists in
 * collection `coll` and answers with `<key>` on success.
 * Anonymous clients are denied; errors are reported to the client.
 */
static void _jbr_ws_set_index(JBWCTX *wctx, const char *key, const char *coll, int64_t mode, const char *path) {
  if (wctx->read_anon) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  iwrc rc = ejdb_ensure_index(wctx->db, coll, path, mode);
  if (!rc) {
    _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
    return;
  }
  _jbr_ws_send_rc(wctx, key, rc, 0);
}
872
/**
 * Websocket `rmi` command: removes the index with `mode` on `path` from
 * collection `coll` and answers with `<key>` on success.
 * Anonymous clients are denied; errors are reported to the client.
 */
static void _jbr_ws_del_index(JBWCTX *wctx, const char *key, const char *coll, int64_t mode, const char *path) {
  if (wctx->read_anon) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  iwrc rc = ejdb_remove_index(wctx->db, coll, path, mode);
  if (!rc) {
    _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
    return;
  }
  _jbr_ws_send_rc(wctx, key, rc, 0);
}
885
// State shared between `_jbr_ws_query` and its visitor `_jbr_ws_query_visitor`.
typedef struct JBWQCTX {
  JBWCTX     *wctx; // connection the results are written to
  IWXSTR     *wbuf; // message buffer; lazily allocated by the visitor, released by `_jbr_ws_query`
  const char *key;  // request key prefixed to every result message
} JBWQCTX;
891
_jbr_ws_query_visitor(EJDB_EXEC * ux,EJDB_DOC doc,int64_t * step)892 static iwrc _jbr_ws_query_visitor(EJDB_EXEC *ux, EJDB_DOC doc, int64_t *step) {
893 iwrc rc = 0;
894 JBWQCTX *qctx = ux->opaque;
895 assert(qctx);
896 IWXSTR *wbuf = qctx->wbuf;
897 if (!wbuf) {
898 wbuf = iwxstr_new2(512);
899 if (!wbuf) {
900 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
901 }
902 qctx->wbuf = wbuf;
903 } else {
904 iwxstr_clear(wbuf);
905 }
906 if (ux->log) {
907 rc = iwxstr_printf(wbuf, "%s\texplain\t%s", qctx->key, iwxstr_ptr(ux->log));
908 iwxstr_destroy(ux->log);
909 ux->log = 0;
910 RCRET(rc);
911 _jbr_ws_write_text(qctx->wctx->ws, iwxstr_ptr(wbuf), iwxstr_size(wbuf));
912 iwxstr_clear(wbuf);
913 }
914
915 rc = iwxstr_printf(wbuf, "%s\t%lld\t", qctx->key, doc->id);
916 RCRET(rc);
917
918 if (doc->node) {
919 rc = jbn_as_json(doc->node, jbl_xstr_json_printer, wbuf, 0);
920 } else {
921 rc = jbl_as_json(doc->raw, jbl_xstr_json_printer, wbuf, 0);
922 }
923 RCRET(rc);
924 if (!_jbr_ws_write_text(qctx->wctx->ws, iwxstr_ptr(wbuf), iwxstr_size(wbuf))) {
925 *step = 0;
926 }
927 return 0;
928 }
929
/**
 * Websocket `query`/`explain` command: executes `query` on collection `coll`.
 *
 * Matched documents are sent by `_jbr_ws_query_visitor`. On success the
 * response is finished with `<key>\t<count>` (aggregate count queries only)
 * followed by a message consisting of `<key>` alone. On failure only an
 * error message is sent.
 *
 * @param wctx     Connection context.
 * @param key      Request key prefixed to every response message.
 * @param coll     Collection name; may be NULL when it is part of `query`.
 * @param query    Query text.
 * @param explain  When true, the query execution log is sent as well.
 */
static void _jbr_ws_query(JBWCTX *wctx, const char *key, const char *coll, const char *query, bool explain) {
  JBWQCTX qctx = {
    .wctx = wctx,
    .key  = key
  };
  EJDB_EXEC ux = {
    .db      = wctx->db,
    .opaque  = &qctx,
    .visitor = _jbr_ws_query_visitor,
  };

  iwrc rc = jql_create2(&ux.q, coll, query, JQL_SILENT_ON_PARSE_ERROR | JQL_KEEP_QUERY_ON_PARSE_ERROR);
  RCGO(rc, finish);

  if (wctx->read_anon && jql_has_apply(ux.q)) {
    // Reported through the common error path, so no success
    // trailer is sent for a query which was never executed.
    rc = JBR_ERROR_WS_ACCESS_DENIED;
    goto finish;
  }

  if (explain) {
    ux.log = iwxstr_new();
    if (!ux.log) {
      // Must be reported as a failure, not as an empty successful result
      rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
      iwlog_ecode_error3(rc);
      goto finish;
    }
  }

  rc = ejdb_exec(&ux);

  if (!rc && ux.log) {
    // No documents matched, so the log was not consumed by the visitor
    IWXSTR *wbuf = iwxstr_new();
    if (!wbuf) {
      rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
      goto finish;
    }
    if (!iwxstr_printf(wbuf, "%s\texplain\t%s", qctx.key, iwxstr_ptr(ux.log))) {
      _jbr_ws_write_text(wctx->ws, iwxstr_ptr(wbuf), iwxstr_size(wbuf));
    }
    iwxstr_destroy(wbuf);
  }

finish:
  if (rc) {
    iwrc rcs = rc;
    iwrc_strip_code(&rcs);
    switch (rcs) {
      case JQL_ERROR_QUERY_PARSE: {
        const char *err = jql_error(ux.q);
        if (err) {
          _jbr_ws_send_error(wctx, key, err, 0);
        } else {
          // No detailed message available, fall back to the generic explanation
          _jbr_ws_send_rc(wctx, key, rc, 0);
        }
        break;
      }
      default:
        _jbr_ws_send_rc(wctx, key, rc, 0);
        break;
    }
  } else {
    if (jql_has_aggregate_count(ux.q)) {
      char pbuf[_WS_KEYPREFIX_BUFSZ];
      _jbr_fill_prefix_buf(key, ux.cnt, pbuf);
      _jbr_ws_write_text(wctx->ws, pbuf, (int) strlen(pbuf));
    }
    // End of response marker
    _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
  }
  if (ux.q) {
    jql_destroy(&ux.q);
  }
  if (ux.log) {
    iwxstr_destroy(ux.log);
  }
  if (qctx.wbuf) {
    iwxstr_destroy(qctx.wbuf);
  }
}
1003
/**
 * Websocket `info` command: answers with `<key>\t<database metadata as
 * pretty-printed JSON>`.
 * Anonymous clients are denied; errors are reported to the client.
 */
static void _jbr_ws_info(JBWCTX *wctx, const char *key) {
  if (wctx->read_anon) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  JBL meta;
  iwrc rc = ejdb_get_meta(wctx->db, &meta);
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
    return;
  }
  IWXSTR *out = iwxstr_new2(meta->bn.size * 2);
  if (out) {
    rc = iwxstr_printf(out, "%s\t", key);
    if (!rc) {
      rc = jbl_as_json(meta, jbl_xstr_json_printer, out, JBL_PRINT_PRETTY);
    }
    if (!rc) {
      _jbr_ws_write_text(wctx->ws, iwxstr_ptr(out), iwxstr_size(out));
    }
  } else {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
  }
  if (rc) {
    _jbr_ws_send_rc(wctx, key, rc, 0);
  }
  jbl_destroy(&meta);
  if (out) {
    iwxstr_destroy(out);
  }
}
1036
/**
 * Websocket `rmc` command: removes collection `coll` and answers with
 * `<key>` on success.
 * Anonymous clients are denied; errors are reported to the client.
 */
static void _jbr_ws_remove_coll(JBWCTX *wctx, const char *key, const char *coll) {
  if (wctx->read_anon) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_ACCESS_DENIED, 0);
    return;
  }
  iwrc rc = ejdb_remove_collection(wctx->db, coll);
  if (!rc) {
    _jbr_ws_write_text(wctx->ws, key, (int) strlen(key));
    return;
  }
  _jbr_ws_send_rc(wctx, key, rc, 0);
}
1049
/** Count the leading whitespace bytes of `s[0..len)`. */
static int _jbr_ws_span_space(const char *s, int len) {
  int n = 0;
  // <ctype.h> classifiers are only defined for values representable as
  // unsigned char (or EOF), hence the cast.
  while (n < len && isspace((unsigned char) s[n])) {
    ++n;
  }
  return n;
}

/** Count the leading non-whitespace bytes of `s[0..len)`. */
static int _jbr_ws_span_token(const char *s, int len) {
  int n = 0;
  while (n < len && !isspace((unsigned char) s[n])) {
    ++n;
  }
  return n;
}

/**
 * Websocket text message handler: parses `<key> <command> [<collection> [<args>]]`
 * and dispatches to the matching `_jbr_ws_*` operation.
 *
 * - Binary frames close the connection; empty/blank messages are ignored.
 * - A lone `?` replies with the list of supported commands.
 * - `<key>` (at most JBR_MAX_KEY_LEN bytes) is echoed to the operation handlers
 *   and used for error reporting; an over-long key is only logged.
 * - If the second token is not a known command, everything after the key
 *   is executed as a query without an explicit collection.
 *
 * @param ws      Websocket connection; its udata must be a `JBWCTX`.
 * @param msg     Received payload. The buffer is modified in place: a NUL byte
 *                is written right after the trimmed payload before it is handed
 *                to handlers expecting a C string.
 *                NOTE(review): relies on `msg.data[msg.len]` being writable - confirm
 *                against the websocket library contract.
 * @param is_text Non-zero for text frames.
 */
static void _jbr_ws_on_message(ws_s *ws, fio_str_info_s msg, uint8_t is_text) {
  if (!is_text) { // Do not serve binary requests
    websocket_close(ws);
    return;
  }
  if (!msg.data || (msg.len < 1)) { // Ignore empty messages, but keep connection
    return;
  }
  JBWCTX *wctx = websocket_udata_get(ws);
  assert(wctx);
  wctx->ws = ws;
  jbwsop_t wsop = JBWS_NONE;

  char keybuf[JBR_MAX_KEY_LEN + 1];
  char cnamebuf[EJDB_COLLECTION_NAME_MAX_LEN + 1];

  char *data = msg.data, *key = 0;
  int len = msg.len, pos;

  // Trim right
  for (pos = len; pos > 0 && isspace((unsigned char) data[pos - 1]); --pos) ;
  len = pos;
  // Trim left
  pos = _jbr_ws_span_space(data, len);
  len -= pos;
  data += pos;
  if (len < 1) {
    return;
  }
  if ((len == 1) && (data[0] == '?')) {
    const char *help
      = "\n<key> info"
        "\n<key> get <collection> <id>"
        "\n<key> set <collection> <id> <document json>"
        "\n<key> add <collection> <document json>"
        "\n<key> del <collection> <id>"
        "\n<key> patch <collection> <id> <patch json>"
        "\n<key> idx <collection> <mode> <path>"
        "\n<key> rmi <collection> <mode> <path>"
        "\n<key> rmc <collection>"
        "\n<key> query <collection> <query>"
        "\n<key> explain <collection> <query>"
        "\n<key> <query>"
        "\n";
    _jbr_ws_write_text(ws, help, (int) strlen(help));
    return;
  }

  // Fetch key, after we can do good errors reporting
  pos = _jbr_ws_span_token(data, len);
  if (pos > JBR_MAX_KEY_LEN) {
    iwlog_warn("The key length: %d exceeded limit: %d", pos, JBR_MAX_KEY_LEN);
    return;
  }
  memcpy(keybuf, data, pos);
  keybuf[pos] = '\0';
  key = keybuf;
  if (pos >= len) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
    return;
  }

  // Space
  pos += _jbr_ws_span_space(data + pos, len - pos);
  len -= pos;
  data += pos;
  if (len < 1) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
    return;
  }

  // Fetch command. `data[0]` is not a whitespace here, so `pos >= 1`.
  pos = _jbr_ws_span_token(data, len);

  // Only the first `pos` bytes are compared, so a shortened command
  // (e.g. `g`) selects the first entry it is a prefix of: order matters.
  static const struct {
    const char *name;
    jbwsop_t    op;
  } commands[] = {
    { "get",     JBWS_GET         },
    { "add",     JBWS_ADD         },
    { "set",     JBWS_SET         },
    { "query",   JBWS_QUERY       },
    { "del",     JBWS_DEL         },
    { "patch",   JBWS_PATCH       },
    { "explain", JBWS_EXPLAIN     },
    { "info",    JBWS_INFO        },
    { "idx",     JBWS_IDX         },
    { "rmi",     JBWS_NIDX        },
    { "rmc",     JBWS_REMOVE_COLL },
  };
  for (size_t i = 0; i < sizeof(commands) / sizeof(commands[0]); ++i) {
    if (!strncmp(commands[i].name, data, pos)) {
      wsop = commands[i].op;
      break;
    }
  }

  if (wsop <= JBWS_NONE) {
    // Not a command: the rest of the message (starting at this token) is a query
    data[len] = '\0';
    _jbr_ws_query(wctx, key, 0, data, false);
    return;
  }
  if (wsop == JBWS_INFO) {
    _jbr_ws_info(wctx, key);
    return;
  }

  // Skip the command and the spaces following it
  pos += _jbr_ws_span_space(data + pos, len - pos);
  len -= pos;
  data += pos;

  // Collection name
  char *coll = data;
  pos = _jbr_ws_span_token(data, len);
  len -= pos;
  data += pos;

  // Every command needs a collection name; all but `rmc` need more input after it.
  if ((pos < 1) || ((len < 1) && (wsop != JBWS_REMOVE_COLL))) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
    return;
  }
  // Must be checked for every command (including `rmc`) before copying
  // into the fixed size `cnamebuf`.
  if (pos > EJDB_COLLECTION_NAME_MAX_LEN) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE,
                    "Collection name exceeds maximum length allowed: "
                    "EJDB_COLLECTION_NAME_MAX_LEN");
    return;
  }
  memcpy(cnamebuf, coll, pos);
  cnamebuf[pos] = '\0';
  coll = cnamebuf;

  if (wsop == JBWS_REMOVE_COLL) {
    _jbr_ws_remove_coll(wctx, key, coll);
    return;
  }

  pos = _jbr_ws_span_space(data, len);
  len -= pos;
  data += pos;
  if (len < 1) {
    _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, JBR_WS_STR_PREMATURE_END);
    return;
  }

  switch (wsop) {
    case JBWS_ADD:
      data[len] = '\0';
      _jbr_ws_add_document(wctx, key, coll, data);
      break;
    case JBWS_QUERY:
    case JBWS_EXPLAIN:
      data[len] = '\0';
      _jbr_ws_query(wctx, key, coll, data, (wsop == JBWS_EXPLAIN));
      break;
    default: {
      // Remaining commands start with a positive decimal number:
      // a document id, or the index mode for `idx`/`rmi`.
      char nbuf[JBNUMBUF_SIZE];
      for (pos = 0; pos < len && pos < JBNUMBUF_SIZE - 1 && isdigit((unsigned char) data[pos]); ++pos) {
        nbuf[pos] = data[pos];
      }
      nbuf[pos] = '\0';
      pos += _jbr_ws_span_space(data + pos, len - pos);
      len -= pos;
      data += pos;

      int64_t id = iwatoi(nbuf);
      if (id < 1) {
        _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, "Invalid document id specified");
        return;
      }
      switch (wsop) {
        case JBWS_GET:
          _jbr_ws_get_document(wctx, key, coll, id);
          break;
        case JBWS_SET:
          data[len] = '\0';
          _jbr_ws_set_document(wctx, key, coll, id, data);
          break;
        case JBWS_DEL:
          _jbr_ws_del_document(wctx, key, coll, id);
          break;
        case JBWS_PATCH:
          data[len] = '\0';
          _jbr_ws_patch_document(wctx, key, coll, id, data);
          break;
        case JBWS_IDX:
          data[len] = '\0';
          _jbr_ws_set_index(wctx, key, coll, id, data);
          break;
        case JBWS_NIDX:
          data[len] = '\0';
          _jbr_ws_del_index(wctx, key, coll, id, data);
          break;
        default:
          _jbr_ws_send_rc(wctx, key, JBR_ERROR_WS_INVALID_MESSAGE, 0);
          return;
      }
    }
  }
}
1251
/**
 * HTTP upgrade handler: turns a request on `/` into a websocket connection.
 *
 * Replies with:
 *  - 400 if the path is not `/`, the protocol check fails, or the
 *        access token header is present but is not a single string value;
 *  - 401 if an access token is configured, none was sent and anonymous
 *        read access is disabled;
 *  - 403 if the supplied token does not match the configured one;
 *  - 500 if the per-connection context cannot be allocated or the upgrade fails.
 *
 * When a token is configured but the client sent none and `read_anon` is
 * enabled, the connection is accepted with `wctx->read_anon` set.
 *
 * @param req                 Request being upgraded; `req->udata` must hold the `JBR` handle.
 * @param requested_protocol  Protocol name from the upgrade request.
 * @param len                 Length of `requested_protocol`.
 */
static void _jbr_on_http_upgrade(http_s *req, char *requested_protocol, size_t len) {
  JBR jbr = req->udata;
  assert(jbr);
  const EJDB_HTTP *http = jbr->http;
  fio_str_info_s path = fiobj_obj2cstr(req->path);

  // Cheap protocol test: 9 chars with 'e' at index 1 (presumably `websocket`)
  if (  (path.len != 1) || (path.data[0] != '/')
     || (len != 9) || (requested_protocol[1] != 'e')) {
    http_send_error(req, 400);
    return;
  }
  JBWCTX *wctx = calloc(1, sizeof(*wctx));
  if (!wctx) {
    http_send_error(req, 500);
    return;
  }
  wctx->db = jbr->db;

  if (http->access_token) {
    FIOBJ h = fiobj_hash_get2(req->headers, k_header_x_access_token_hash);
    if (!h) {
      if (!http->read_anon) {
        free(wctx);
        http_send_error(req, 401);
        return;
      }
      // No token at all: accept as anonymous reader. The token checks below
      // must be skipped, otherwise the missing header is rejected as malformed.
      wctx->read_anon = true;
    } else {
      if (!fiobj_type_is(h, FIOBJ_T_STRING)) { // header specified more than once
        free(wctx);
        http_send_error(req, 400);
        return;
      }
      fio_str_info_s hv = fiobj_obj2cstr(h);
      if ((hv.len != http->access_token_len) || (memcmp(hv.data, http->access_token, http->access_token_len) != 0)) { // -V526
        free(wctx);
        http_send_error(req, 403);
        return;
      }
    }
  }
  // NOTE(review): confirm that a failed upgrade does not also invoke
  // `.on_close` with `wctx`, which would conflict with the free() below.
  if (http_upgrade2ws(req,
                      .on_message = _jbr_ws_on_message,
                      .on_open = _jbr_ws_on_open,
                      .on_close = _jbr_ws_on_close,
                      .udata = wctx) < 0) {
    free(wctx);
    JBR_RC_REPORT(500, req, JBR_ERROR_WS_UPGRADE);
  }
}
1302
1303 //---------------- Main ---------------------
1304
_jbr_start_thread(void * op)1305 static void *_jbr_start_thread(void *op) {
1306 JBR jbr = op;
1307 char nbuf[JBNUMBUF_SIZE];
1308 const EJDB_HTTP *http = jbr->http;
1309 const char *bind = http->bind ? http->bind : "localhost";
1310 if (http->port < 1) {
1311 jbr->rc = JBR_ERROR_PORT_INVALID;
1312 if (!jbr->http->blocking) {
1313 pthread_barrier_wait(&jbr->start_barrier);
1314 }
1315 return 0;
1316 }
1317 iwitoa(http->port, nbuf, sizeof(nbuf));
1318 iwlog_info("HTTP/WS endpoint at %s:%s", bind, nbuf);
1319 websocket_optimize4broadcasts(WEBSOCKET_OPTIMIZE_PUBSUB_TEXT, 1);
1320 if (http_listen(nbuf, bind,
1321 .udata = jbr,
1322 .on_request = _jbr_on_http_request,
1323 .on_upgrade = _jbr_on_http_upgrade,
1324 .on_finish = _jbr_on_http_finish,
1325 .max_body_size = http->max_body_size,
1326 .ws_max_msg_size = http->max_body_size) == -1) {
1327 jbr->rc = iwrc_set_errno(JBR_ERROR_HTTP_LISTEN, errno);
1328 }
1329 if (jbr->rc) {
1330 if (!jbr->http->blocking) {
1331 pthread_barrier_wait(&jbr->start_barrier);
1332 }
1333 return 0;
1334 }
1335 fio_state_callback_add(FIO_CALL_PRE_START, _jbr_on_pre_start, jbr);
1336 fio_start(.threads = -2, .workers = 1, .is_no_signal_handlers = !jbr->http->blocking); // Will block current thread
1337 // here
1338 return 0;
1339 }
1340
/**
 * Frees the endpoint handle stored in `*pjbr` and resets the slot to zero.
 */
static void _jbr_release(JBR *pjbr) {
  free(*pjbr);
  *pjbr = 0;
}
1346
jbr_start(EJDB db,const EJDB_OPTS * opts,JBR * pjbr)1347 iwrc jbr_start(EJDB db, const EJDB_OPTS *opts, JBR *pjbr) {
1348 iwrc rc;
1349 *pjbr = 0;
1350 if (!opts->http.enabled) {
1351 return 0;
1352 }
1353 JBR jbr = calloc(1, sizeof(*jbr));
1354 if (!jbr) {
1355 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
1356 }
1357 jbr->db = db;
1358 jbr->terminated = true;
1359 jbr->http = &opts->http;
1360
1361 if (!jbr->http->blocking) {
1362 int rci = pthread_barrier_init(&jbr->start_barrier, 0, 2);
1363 if (rci) {
1364 free(jbr);
1365 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1366 }
1367 rci = pthread_create(&jbr->worker_thread, 0, _jbr_start_thread, jbr);
1368 if (rci) {
1369 pthread_barrier_destroy(&jbr->start_barrier);
1370 free(jbr);
1371 return iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
1372 }
1373 pthread_barrier_wait(&jbr->start_barrier);
1374 pthread_barrier_destroy(&jbr->start_barrier);
1375 jbr->terminated = false;
1376 rc = jbr->rc;
1377 if (rc) {
1378 jbr_shutdown(pjbr);
1379 return rc;
1380 }
1381 *pjbr = jbr;
1382 } else {
1383 *pjbr = jbr;
1384 jbr->terminated = false;
1385 _jbr_start_thread(jbr); // Will block here
1386 rc = jbr->rc;
1387 jbr->terminated = true;
1388 IWRC(jbr_shutdown(pjbr), rc);
1389 }
1390 return rc;
1391 }
1392
jbr_shutdown(JBR * pjbr)1393 iwrc jbr_shutdown(JBR *pjbr) {
1394 if (!*pjbr) {
1395 return 0;
1396 }
1397 JBR jbr = *pjbr;
1398 if (__sync_bool_compare_and_swap(&jbr->terminated, 0, 1)) {
1399 fio_state_callback_remove(FIO_CALL_PRE_START, _jbr_on_pre_start, jbr);
1400 fio_stop();
1401 if (!jbr->http->blocking) {
1402 pthread_join(jbr->worker_thread, 0);
1403 }
1404 }
1405 _jbr_release(pjbr);
1406 *pjbr = 0;
1407 return 0;
1408 }
1409
/**
 * Error code explanation callback for the JBR error range.
 *
 * @param locale  Unused.
 * @param ecode   Error code to explain.
 * @return Static message for a known JBR error code, or 0 if `ecode` lies
 *         outside of (_JBR_ERROR_START, _JBR_ERROR_END) or has no message.
 */
static const char *_jbr_ecodefn(locale_t locale, uint32_t ecode) {
  if ((ecode <= _JBR_ERROR_START) || (ecode >= _JBR_ERROR_END)) {
    return 0;
  }
  const char *text = 0;
  switch (ecode) {
    case JBR_ERROR_HTTP_LISTEN:
      text = "Failed to start HTTP network listener (JBR_ERROR_HTTP_LISTEN)";
      break;
    case JBR_ERROR_PORT_INVALID:
      text = "Invalid port specified (JBR_ERROR_PORT_INVALID)";
      break;
    case JBR_ERROR_SEND_RESPONSE:
      text = "Error sending response (JBR_ERROR_SEND_RESPONSE)";
      break;
    case JBR_ERROR_WS_UPGRADE:
      text = "Failed upgrading to websocket connection (JBR_ERROR_WS_UPGRADE)";
      break;
    case JBR_ERROR_WS_INVALID_MESSAGE:
      text = "Invalid message recieved (JBR_ERROR_WS_INVALID_MESSAGE)";
      break;
    case JBR_ERROR_WS_ACCESS_DENIED:
      text = "Access denied (JBR_ERROR_WS_ACCESS_DENIED)";
      break;
    default:
      break;
  }
  return text;
}
1430
jbr_init()1431 iwrc jbr_init() {
1432 static int _jbr_initialized = 0;
1433 if (!__sync_bool_compare_and_swap(&_jbr_initialized, 0, 1)) {
1434 return 0;
1435 }
1436 k_header_x_access_token_hash = fiobj_hash_string("x-access-token", 14);
1437 k_header_x_hints_hash = fiobj_hash_string("x-hints", 7);
1438 k_header_content_length_hash = fiobj_hash_string("content-length", 14);
1439 k_header_content_type_hash = fiobj_hash_string("content-type", 12);
1440 return iwlog_register_ecodefn(_jbr_ecodefn);
1441 }
1442