1 /*
2 * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27 #ifdef HAVE_CONFIG_H
28 #include "config.h"
29 #endif
30
31 #ifdef WIN32
32 #define WIN32_LEAN_AND_MEAN
33 #include <winsock2.h>
34 #include <windows.h>
35 #undef WIN32_LEAN_AND_MEAN
36 #endif
37
38 #include <sys/types.h>
39 #ifndef WIN32
40 #include <sys/socket.h>
41 #endif
42 #ifdef HAVE_SYS_TIME_H
43 #include <sys/time.h>
44 #endif
45 #include <sys/queue.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #ifndef WIN32
49 #include <unistd.h>
50 #endif
51 #include <errno.h>
52 #include <signal.h>
53 #include <string.h>
54 #include <assert.h>
55
56 #include "event.h"
57 #include "evrpc.h"
58 #include "evrpc-internal.h"
59 #include "evhttp.h"
60 #include "evutil.h"
61 #include "log.h"
62
63 struct evrpc_base *
evrpc_init(struct evhttp * http_server)64 evrpc_init(struct evhttp *http_server)
65 {
66 struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
67 if (base == NULL)
68 return (NULL);
69
70 /* we rely on the tagging sub system */
71 evtag_init();
72
73 TAILQ_INIT(&base->registered_rpcs);
74 TAILQ_INIT(&base->input_hooks);
75 TAILQ_INIT(&base->output_hooks);
76 base->http_server = http_server;
77
78 return (base);
79 }
80
81 void
evrpc_free(struct evrpc_base * base)82 evrpc_free(struct evrpc_base *base)
83 {
84 struct evrpc *rpc;
85 struct evrpc_hook *hook;
86
87 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
88 assert(evrpc_unregister_rpc(base, rpc->uri));
89 }
90 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
91 assert(evrpc_remove_hook(base, EVRPC_INPUT, hook));
92 }
93 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
94 assert(evrpc_remove_hook(base, EVRPC_OUTPUT, hook));
95 }
96 free(base);
97 }
98
99 void *
evrpc_add_hook(void * vbase,enum EVRPC_HOOK_TYPE hook_type,int (* cb)(struct evhttp_request *,struct evbuffer *,void *),void * cb_arg)100 evrpc_add_hook(void *vbase,
101 enum EVRPC_HOOK_TYPE hook_type,
102 int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
103 void *cb_arg)
104 {
105 struct _evrpc_hooks *base = vbase;
106 struct evrpc_hook_list *head = NULL;
107 struct evrpc_hook *hook = NULL;
108 switch (hook_type) {
109 case EVRPC_INPUT:
110 head = &base->in_hooks;
111 break;
112 case EVRPC_OUTPUT:
113 head = &base->out_hooks;
114 break;
115 default:
116 assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
117 }
118
119 hook = calloc(1, sizeof(struct evrpc_hook));
120 assert(hook != NULL);
121
122 hook->process = cb;
123 hook->process_arg = cb_arg;
124 TAILQ_INSERT_TAIL(head, hook, next);
125
126 return (hook);
127 }
128
129 static int
evrpc_remove_hook_internal(struct evrpc_hook_list * head,void * handle)130 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
131 {
132 struct evrpc_hook *hook = NULL;
133 TAILQ_FOREACH(hook, head, next) {
134 if (hook == handle) {
135 TAILQ_REMOVE(head, hook, next);
136 free(hook);
137 return (1);
138 }
139 }
140
141 return (0);
142 }
143
144 /*
145 * remove the hook specified by the handle
146 */
147
148 int
evrpc_remove_hook(void * vbase,enum EVRPC_HOOK_TYPE hook_type,void * handle)149 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
150 {
151 struct _evrpc_hooks *base = vbase;
152 struct evrpc_hook_list *head = NULL;
153 switch (hook_type) {
154 case EVRPC_INPUT:
155 head = &base->in_hooks;
156 break;
157 case EVRPC_OUTPUT:
158 head = &base->out_hooks;
159 break;
160 default:
161 assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
162 }
163
164 return (evrpc_remove_hook_internal(head, handle));
165 }
166
167 static int
evrpc_process_hooks(struct evrpc_hook_list * head,struct evhttp_request * req,struct evbuffer * evbuf)168 evrpc_process_hooks(struct evrpc_hook_list *head,
169 struct evhttp_request *req, struct evbuffer *evbuf)
170 {
171 struct evrpc_hook *hook;
172 TAILQ_FOREACH(hook, head, next) {
173 if (hook->process(req, evbuf, hook->process_arg) == -1)
174 return (-1);
175 }
176
177 return (0);
178 }
179
/*
 * Forward declarations for functions that are defined further down in this
 * file but referenced before their definition.
 */
/* tries to start the first request queued on the pool */
static void evrpc_pool_schedule(struct evrpc_pool *pool);
/* HTTP callback installed for every registered RPC */
static void evrpc_request_cb(struct evhttp_request *, void *);
/* serializes the reply of a finished RPC and sends it over HTTP */
void evrpc_request_done(struct evrpc_req_generic*);
183
/*
 * evrpc_register_rpc() registers a new RPC with the HTTP server.  The evrpc
 * object is expected to have been filled in via the EVRPC_REGISTER_OBJECT
 * macro, which in turn calls that function.
 */
189
190 static char *
evrpc_construct_uri(const char * uri)191 evrpc_construct_uri(const char *uri)
192 {
193 char *constructed_uri;
194 int constructed_uri_len;
195
196 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
197 if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
198 event_err(1, "%s: failed to register rpc at %s",
199 __func__, uri);
200 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
201 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
202 constructed_uri[constructed_uri_len - 1] = '\0';
203
204 return (constructed_uri);
205 }
206
207 int
evrpc_register_rpc(struct evrpc_base * base,struct evrpc * rpc,void (* cb)(struct evrpc_req_generic *,void *),void * cb_arg)208 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
209 void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
210 {
211 char *constructed_uri = evrpc_construct_uri(rpc->uri);
212
213 rpc->base = base;
214 rpc->cb = cb;
215 rpc->cb_arg = cb_arg;
216
217 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
218
219 evhttp_set_cb(base->http_server,
220 constructed_uri,
221 evrpc_request_cb,
222 rpc);
223
224 free(constructed_uri);
225
226 return (0);
227 }
228
229 int
evrpc_unregister_rpc(struct evrpc_base * base,const char * name)230 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
231 {
232 char *registered_uri = NULL;
233 struct evrpc *rpc;
234
235 /* find the right rpc; linear search might be slow */
236 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
237 if (strcmp(rpc->uri, name) == 0)
238 break;
239 }
240 if (rpc == NULL) {
241 /* We did not find an RPC with this name */
242 return (-1);
243 }
244 TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
245
246 free((char *)rpc->uri);
247 free(rpc);
248
249 registered_uri = evrpc_construct_uri(name);
250
251 /* remove the http server callback */
252 assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
253
254 free(registered_uri);
255 return (0);
256 }
257
258 static void
evrpc_request_cb(struct evhttp_request * req,void * arg)259 evrpc_request_cb(struct evhttp_request *req, void *arg)
260 {
261 struct evrpc *rpc = arg;
262 struct evrpc_req_generic *rpc_state = NULL;
263
264 /* let's verify the outside parameters */
265 if (req->type != EVHTTP_REQ_POST ||
266 EVBUFFER_LENGTH(req->input_buffer) <= 0)
267 goto error;
268
269 /*
270 * we might want to allow hooks to suspend the processing,
271 * but at the moment, we assume that they just act as simple
272 * filters.
273 */
274 if (evrpc_process_hooks(&rpc->base->input_hooks,
275 req, req->input_buffer) == -1)
276 goto error;
277
278 rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
279 if (rpc_state == NULL)
280 goto error;
281
282 /* let's check that we can parse the request */
283 rpc_state->request = rpc->request_new();
284 if (rpc_state->request == NULL)
285 goto error;
286
287 rpc_state->rpc = rpc;
288
289 if (rpc->request_unmarshal(
290 rpc_state->request, req->input_buffer) == -1) {
291 /* we failed to parse the request; that's a bummer */
292 goto error;
293 }
294
295 /* at this point, we have a well formed request, prepare the reply */
296
297 rpc_state->reply = rpc->reply_new();
298 if (rpc_state->reply == NULL)
299 goto error;
300
301 rpc_state->http_req = req;
302 rpc_state->done = evrpc_request_done;
303
304 /* give the rpc to the user; they can deal with it */
305 rpc->cb(rpc_state, rpc->cb_arg);
306
307 return;
308
309 error:
310 evrpc_reqstate_free(rpc_state);
311 evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
312 return;
313 }
314
315 void
evrpc_reqstate_free(struct evrpc_req_generic * rpc_state)316 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
317 {
318 /* clean up all memory */
319 if (rpc_state != NULL) {
320 struct evrpc *rpc = rpc_state->rpc;
321
322 if (rpc_state->request != NULL)
323 rpc->request_free(rpc_state->request);
324 if (rpc_state->reply != NULL)
325 rpc->reply_free(rpc_state->reply);
326 free(rpc_state);
327 }
328 }
329
330 void
evrpc_request_done(struct evrpc_req_generic * rpc_state)331 evrpc_request_done(struct evrpc_req_generic* rpc_state)
332 {
333 struct evhttp_request *req = rpc_state->http_req;
334 struct evrpc *rpc = rpc_state->rpc;
335 struct evbuffer* data = NULL;
336
337 if (rpc->reply_complete(rpc_state->reply) == -1) {
338 /* the reply was not completely filled in. error out */
339 goto error;
340 }
341
342 if ((data = evbuffer_new()) == NULL) {
343 /* out of memory */
344 goto error;
345 }
346
347 /* serialize the reply */
348 rpc->reply_marshal(data, rpc_state->reply);
349
350 /* do hook based tweaks to the request */
351 if (evrpc_process_hooks(&rpc->base->output_hooks,
352 req, data) == -1)
353 goto error;
354
355 /* on success, we are going to transmit marshaled binary data */
356 if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
357 evhttp_add_header(req->output_headers,
358 "Content-Type", "application/octet-stream");
359 }
360
361 evhttp_send_reply(req, HTTP_OK, "OK", data);
362
363 evbuffer_free(data);
364
365 evrpc_reqstate_free(rpc_state);
366
367 return;
368
369 error:
370 if (data != NULL)
371 evbuffer_free(data);
372 evrpc_reqstate_free(rpc_state);
373 evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
374 return;
375 }
376
377 /* Client implementation of RPC site */
378
/*
 * Forward declaration: starts the request described by ctx over the given
 * connection; defined further down in this file.
 */
static int evrpc_schedule_request(struct evhttp_connection *connection,
    struct evrpc_request_wrapper *ctx);
381
382 struct evrpc_pool *
evrpc_pool_new(struct event_base * base)383 evrpc_pool_new(struct event_base *base)
384 {
385 struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
386 if (pool == NULL)
387 return (NULL);
388
389 TAILQ_INIT(&pool->connections);
390 TAILQ_INIT(&pool->requests);
391
392 TAILQ_INIT(&pool->input_hooks);
393 TAILQ_INIT(&pool->output_hooks);
394
395 pool->base = base;
396 pool->timeout = -1;
397
398 return (pool);
399 }
400
401 static void
evrpc_request_wrapper_free(struct evrpc_request_wrapper * request)402 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
403 {
404 free(request->name);
405 free(request);
406 }
407
408 void
evrpc_pool_free(struct evrpc_pool * pool)409 evrpc_pool_free(struct evrpc_pool *pool)
410 {
411 struct evhttp_connection *connection;
412 struct evrpc_request_wrapper *request;
413 struct evrpc_hook *hook;
414
415 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
416 TAILQ_REMOVE(&pool->requests, request, next);
417 /* if this gets more complicated we need our own function */
418 evrpc_request_wrapper_free(request);
419 }
420
421 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
422 TAILQ_REMOVE(&pool->connections, connection, next);
423 evhttp_connection_free(connection);
424 }
425
426 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
427 assert(evrpc_remove_hook(pool, EVRPC_INPUT, hook));
428 }
429
430 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
431 assert(evrpc_remove_hook(pool, EVRPC_OUTPUT, hook));
432 }
433
434 free(pool);
435 }
436
437 /*
438 * Add a connection to the RPC pool. A request scheduled on the pool
439 * may use any available connection.
440 */
441
442 void
evrpc_pool_add_connection(struct evrpc_pool * pool,struct evhttp_connection * connection)443 evrpc_pool_add_connection(struct evrpc_pool *pool,
444 struct evhttp_connection *connection) {
445 assert(connection->http_server == NULL);
446 TAILQ_INSERT_TAIL(&pool->connections, connection, next);
447
448 /*
449 * associate an event base with this connection
450 */
451 if (pool->base != NULL)
452 evhttp_connection_set_base(connection, pool->base);
453
454 /*
455 * unless a timeout was specifically set for a connection,
456 * the connection inherits the timeout from the pool.
457 */
458 if (connection->timeout == -1)
459 connection->timeout = pool->timeout;
460
461 /*
462 * if we have any requests pending, schedule them with the new
463 * connections.
464 */
465
466 if (TAILQ_FIRST(&pool->requests) != NULL) {
467 struct evrpc_request_wrapper *request =
468 TAILQ_FIRST(&pool->requests);
469 TAILQ_REMOVE(&pool->requests, request, next);
470 evrpc_schedule_request(connection, request);
471 }
472 }
473
474 void
evrpc_pool_set_timeout(struct evrpc_pool * pool,int timeout_in_secs)475 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
476 {
477 struct evhttp_connection *evcon;
478 TAILQ_FOREACH(evcon, &pool->connections, next) {
479 evcon->timeout = timeout_in_secs;
480 }
481 pool->timeout = timeout_in_secs;
482 }
483
484
/*
 * Forward declarations for callbacks that are defined further down in this
 * file.
 */
/* request callback handed to evhttp_request_new() for outgoing RPCs */
static void evrpc_reply_done(struct evhttp_request *, void *);
/* timer callback installed on each outgoing RPC's ev_timeout event */
static void evrpc_request_timeout(int, short, void *);
487
488 /*
489 * Finds a connection object associated with the pool that is currently
490 * idle and can be used to make a request.
491 */
492 static struct evhttp_connection *
evrpc_pool_find_connection(struct evrpc_pool * pool)493 evrpc_pool_find_connection(struct evrpc_pool *pool)
494 {
495 struct evhttp_connection *connection;
496 TAILQ_FOREACH(connection, &pool->connections, next) {
497 if (TAILQ_FIRST(&connection->requests) == NULL)
498 return (connection);
499 }
500
501 return (NULL);
502 }
503
504 /*
505 * We assume that the ctx is no longer queued on the pool.
506 */
507 static int
evrpc_schedule_request(struct evhttp_connection * connection,struct evrpc_request_wrapper * ctx)508 evrpc_schedule_request(struct evhttp_connection *connection,
509 struct evrpc_request_wrapper *ctx)
510 {
511 struct evhttp_request *req = NULL;
512 struct evrpc_pool *pool = ctx->pool;
513 struct evrpc_status status;
514 char *uri = NULL;
515 int res = 0;
516
517 if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
518 goto error;
519
520 /* serialize the request data into the output buffer */
521 ctx->request_marshal(req->output_buffer, ctx->request);
522
523 uri = evrpc_construct_uri(ctx->name);
524 if (uri == NULL)
525 goto error;
526
527 /* we need to know the connection that we might have to abort */
528 ctx->evcon = connection;
529
530 /* apply hooks to the outgoing request */
531 if (evrpc_process_hooks(&pool->output_hooks,
532 req, req->output_buffer) == -1)
533 goto error;
534
535 if (pool->timeout > 0) {
536 /*
537 * a timeout after which the whole rpc is going to be aborted.
538 */
539 struct timeval tv;
540 evutil_timerclear(&tv);
541 tv.tv_sec = pool->timeout;
542 evtimer_add(&ctx->ev_timeout, &tv);
543 }
544
545 /* start the request over the connection */
546 res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
547 free(uri);
548
549 if (res == -1)
550 goto error;
551
552 return (0);
553
554 error:
555 memset(&status, 0, sizeof(status));
556 status.error = EVRPC_STATUS_ERR_UNSTARTED;
557 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
558 evrpc_request_wrapper_free(ctx);
559 return (-1);
560 }
561
562 int
evrpc_make_request(struct evrpc_request_wrapper * ctx)563 evrpc_make_request(struct evrpc_request_wrapper *ctx)
564 {
565 struct evrpc_pool *pool = ctx->pool;
566
567 /* initialize the event structure for this rpc */
568 evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
569 if (pool->base != NULL)
570 event_base_set(pool->base, &ctx->ev_timeout);
571
572 /* we better have some available connections on the pool */
573 assert(TAILQ_FIRST(&pool->connections) != NULL);
574
575 /*
576 * if no connection is available, we queue the request on the pool,
577 * the next time a connection is empty, the rpc will be send on that.
578 */
579 TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
580
581 evrpc_pool_schedule(pool);
582
583 return (0);
584 }
585
586 static void
evrpc_reply_done(struct evhttp_request * req,void * arg)587 evrpc_reply_done(struct evhttp_request *req, void *arg)
588 {
589 struct evrpc_request_wrapper *ctx = arg;
590 struct evrpc_pool *pool = ctx->pool;
591 struct evrpc_status status;
592 int res = -1;
593
594 /* cancel any timeout we might have scheduled */
595 event_del(&ctx->ev_timeout);
596
597 memset(&status, 0, sizeof(status));
598 status.http_req = req;
599
600 /* we need to get the reply now */
601 if (req != NULL) {
602 /* apply hooks to the incoming request */
603 if (evrpc_process_hooks(&pool->input_hooks,
604 req, req->input_buffer) == -1) {
605 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
606 res = -1;
607 } else {
608 res = ctx->reply_unmarshal(ctx->reply,
609 req->input_buffer);
610 if (res == -1) {
611 status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
612 }
613 }
614 } else {
615 status.error = EVRPC_STATUS_ERR_TIMEOUT;
616 }
617
618 if (res == -1) {
619 /* clear everything that we might have written previously */
620 ctx->reply_clear(ctx->reply);
621 }
622
623 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
624
625 evrpc_request_wrapper_free(ctx);
626
627 /* the http layer owns the request structure */
628
629 /* see if we can schedule another request */
630 evrpc_pool_schedule(pool);
631 }
632
633 static void
evrpc_pool_schedule(struct evrpc_pool * pool)634 evrpc_pool_schedule(struct evrpc_pool *pool)
635 {
636 struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
637 struct evhttp_connection *evcon;
638
639 /* if no requests are pending, we have no work */
640 if (ctx == NULL)
641 return;
642
643 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
644 TAILQ_REMOVE(&pool->requests, ctx, next);
645 evrpc_schedule_request(evcon, ctx);
646 }
647 }
648
649 static void
evrpc_request_timeout(int fd,short what,void * arg)650 evrpc_request_timeout(int fd, short what, void *arg)
651 {
652 struct evrpc_request_wrapper *ctx = arg;
653 struct evhttp_connection *evcon = ctx->evcon;
654 assert(evcon != NULL);
655
656 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
657 }
658