• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "task.h"
23 #include "uv.h"
24 
25 #define IPC_PIPE_NAME TEST_PIPENAME
26 #define NUM_CONNECTS  (250 * 1000)
27 
28 union stream_handle {
29   uv_pipe_t pipe;
30   uv_tcp_t tcp;
31 };
32 
33 /* Use as (uv_stream_t *) &handle_storage -- it's kind of clunky but it
34  * avoids aliasing warnings.
35  */
36 typedef unsigned char handle_storage_t[sizeof(union stream_handle)];
37 
38 /* Used for passing around the listen handle, not part of the benchmark proper.
39  * We have an overabundance of server types here. It works like this:
40  *
41  *  1. The main thread starts an IPC pipe server.
42  *  2. The worker threads connect to the IPC server and obtain a listen handle.
43  *  3. The worker threads start accepting requests on the listen handle.
44  *  4. The main thread starts connecting repeatedly.
45  *
46  * Step #4 should perhaps be farmed out over several threads.
47  */
48 struct ipc_server_ctx {
49   handle_storage_t server_handle;
50   unsigned int num_connects;
51   uv_pipe_t ipc_pipe;
52 };
53 
54 struct ipc_peer_ctx {
55   handle_storage_t peer_handle;
56   uv_write_t write_req;
57 };
58 
59 struct ipc_client_ctx {
60   uv_connect_t connect_req;
61   uv_stream_t* server_handle;
62   uv_pipe_t ipc_pipe;
63   char scratch[16];
64 };
65 
66 /* Used in the actual benchmark. */
67 struct server_ctx {
68   handle_storage_t server_handle;
69   unsigned int num_connects;
70   uv_async_t async_handle;
71   uv_thread_t thread_id;
72   uv_sem_t semaphore;
73 };
74 
75 struct client_ctx {
76   handle_storage_t client_handle;
77   unsigned int num_connects;
78   uv_connect_t connect_req;
79   uv_idle_t idle_handle;
80 };
81 
82 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
83 static void ipc_write_cb(uv_write_t* req, int status);
84 static void ipc_close_cb(uv_handle_t* handle);
85 static void ipc_connect_cb(uv_connect_t* req, int status);
86 static void ipc_read_cb(uv_stream_t* handle,
87                         ssize_t nread,
88                         const uv_buf_t* buf);
89 static void ipc_alloc_cb(uv_handle_t* handle,
90                          size_t suggested_size,
91                          uv_buf_t* buf);
92 
93 static void sv_async_cb(uv_async_t* handle);
94 static void sv_connection_cb(uv_stream_t* server_handle, int status);
95 static void sv_read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf);
96 static void sv_alloc_cb(uv_handle_t* handle,
97                         size_t suggested_size,
98                         uv_buf_t* buf);
99 
100 static void cl_connect_cb(uv_connect_t* req, int status);
101 static void cl_idle_cb(uv_idle_t* handle);
102 static void cl_close_cb(uv_handle_t* handle);
103 
104 static struct sockaddr_in listen_addr;
105 
106 
ipc_connection_cb(uv_stream_t * ipc_pipe,int status)107 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) {
108   struct ipc_server_ctx* sc;
109   struct ipc_peer_ctx* pc;
110   uv_loop_t* loop;
111   uv_buf_t buf;
112 
113   loop = ipc_pipe->loop;
114   buf = uv_buf_init("PING", 4);
115   sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe);
116   pc = calloc(1, sizeof(*pc));
117   ASSERT_NOT_NULL(pc);
118 
119   if (ipc_pipe->type == UV_TCP)
120     ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle));
121   else if (ipc_pipe->type == UV_NAMED_PIPE)
122     ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1));
123   else
124     ASSERT(0);
125 
126   ASSERT(0 == uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle));
127   ASSERT(0 == uv_write2(&pc->write_req,
128                         (uv_stream_t*) &pc->peer_handle,
129                         &buf,
130                         1,
131                         (uv_stream_t*) &sc->server_handle,
132                         ipc_write_cb));
133 
134   if (--sc->num_connects == 0)
135     uv_close((uv_handle_t*) ipc_pipe, NULL);
136 }
137 
138 
ipc_write_cb(uv_write_t * req,int status)139 static void ipc_write_cb(uv_write_t* req, int status) {
140   struct ipc_peer_ctx* ctx;
141   ctx = container_of(req, struct ipc_peer_ctx, write_req);
142   uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb);
143 }
144 
145 
ipc_close_cb(uv_handle_t * handle)146 static void ipc_close_cb(uv_handle_t* handle) {
147   struct ipc_peer_ctx* ctx;
148   ctx = container_of(handle, struct ipc_peer_ctx, peer_handle);
149   free(ctx);
150 }
151 
152 
ipc_connect_cb(uv_connect_t * req,int status)153 static void ipc_connect_cb(uv_connect_t* req, int status) {
154   struct ipc_client_ctx* ctx;
155   ctx = container_of(req, struct ipc_client_ctx, connect_req);
156   ASSERT(0 == status);
157   ASSERT(0 == uv_read_start((uv_stream_t*) &ctx->ipc_pipe,
158                             ipc_alloc_cb,
159                             ipc_read_cb));
160 }
161 
162 
ipc_alloc_cb(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)163 static void ipc_alloc_cb(uv_handle_t* handle,
164                          size_t suggested_size,
165                          uv_buf_t* buf) {
166   struct ipc_client_ctx* ctx;
167   ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe);
168   buf->base = ctx->scratch;
169   buf->len = sizeof(ctx->scratch);
170 }
171 
172 
ipc_read_cb(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)173 static void ipc_read_cb(uv_stream_t* handle,
174                         ssize_t nread,
175                         const uv_buf_t* buf) {
176   struct ipc_client_ctx* ctx;
177   uv_loop_t* loop;
178   uv_handle_type type;
179   uv_pipe_t* ipc_pipe;
180 
181   ipc_pipe = (uv_pipe_t*) handle;
182   ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
183   loop = ipc_pipe->loop;
184 
185   ASSERT(1 == uv_pipe_pending_count(ipc_pipe));
186   type = uv_pipe_pending_type(ipc_pipe);
187   if (type == UV_TCP)
188     ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
189   else if (type == UV_NAMED_PIPE)
190     ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0));
191   else
192     ASSERT(0);
193 
194   ASSERT(0 == uv_accept(handle, ctx->server_handle));
195   uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
196 }
197 
198 
199 /* Set up an IPC pipe server that hands out listen sockets to the worker
200  * threads. It's kind of cumbersome for such a simple operation, maybe we
201  * should revive uv_import() and uv_export().
202  */
send_listen_handles(uv_handle_type type,unsigned int num_servers,struct server_ctx * servers)203 static void send_listen_handles(uv_handle_type type,
204                                 unsigned int num_servers,
205                                 struct server_ctx* servers) {
206   struct ipc_server_ctx ctx;
207   uv_loop_t* loop;
208   unsigned int i;
209 
210   loop = uv_default_loop();
211   ctx.num_connects = num_servers;
212 
213   if (type == UV_TCP) {
214     ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle));
215     ASSERT(0 == uv_tcp_bind((uv_tcp_t*) &ctx.server_handle,
216                             (const struct sockaddr*) &listen_addr,
217                             0));
218   }
219   else
220     ASSERT(0);
221   /* We need to initialize this pipe with ipc=0 - this is not a uv_pipe we'll
222    * be sending handles over, it's just for listening for new connections.
223    * If we accept a connection then the connected pipe must be initialized
224    * with ipc=1.
225    */
226   ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 0));
227   ASSERT(0 == uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME));
228   ASSERT(0 == uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb));
229 
230   for (i = 0; i < num_servers; i++)
231     uv_sem_post(&servers[i].semaphore);
232 
233   ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
234   uv_close((uv_handle_t*) &ctx.server_handle, NULL);
235   ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
236 
237   for (i = 0; i < num_servers; i++)
238     uv_sem_wait(&servers[i].semaphore);
239 }
240 
241 
get_listen_handle(uv_loop_t * loop,uv_stream_t * server_handle)242 static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) {
243   struct ipc_client_ctx ctx;
244 
245   ctx.server_handle = server_handle;
246   ctx.server_handle->data = "server handle";
247 
248   ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
249   uv_pipe_connect(&ctx.connect_req,
250                   &ctx.ipc_pipe,
251                   IPC_PIPE_NAME,
252                   ipc_connect_cb);
253   ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
254 }
255 
256 
server_cb(void * arg)257 static void server_cb(void *arg) {
258   struct server_ctx *ctx;
259   uv_loop_t loop;
260 
261   ctx = arg;
262   ASSERT(0 == uv_loop_init(&loop));
263 
264   ASSERT(0 == uv_async_init(&loop, &ctx->async_handle, sv_async_cb));
265   uv_unref((uv_handle_t*) &ctx->async_handle);
266 
267   /* Wait until the main thread is ready. */
268   uv_sem_wait(&ctx->semaphore);
269   get_listen_handle(&loop, (uv_stream_t*) &ctx->server_handle);
270   uv_sem_post(&ctx->semaphore);
271 
272   /* Now start the actual benchmark. */
273   ASSERT(0 == uv_listen((uv_stream_t*) &ctx->server_handle,
274                         128,
275                         sv_connection_cb));
276   ASSERT(0 == uv_run(&loop, UV_RUN_DEFAULT));
277 
278   uv_loop_close(&loop);
279 }
280 
281 
sv_async_cb(uv_async_t * handle)282 static void sv_async_cb(uv_async_t* handle) {
283   struct server_ctx* ctx;
284   ctx = container_of(handle, struct server_ctx, async_handle);
285   uv_close((uv_handle_t*) &ctx->server_handle, NULL);
286   uv_close((uv_handle_t*) &ctx->async_handle, NULL);
287 }
288 
289 
sv_connection_cb(uv_stream_t * server_handle,int status)290 static void sv_connection_cb(uv_stream_t* server_handle, int status) {
291   handle_storage_t* storage;
292   struct server_ctx* ctx;
293 
294   ctx = container_of(server_handle, struct server_ctx, server_handle);
295   ASSERT(status == 0);
296 
297   storage = malloc(sizeof(*storage));
298   ASSERT_NOT_NULL(storage);
299 
300   if (server_handle->type == UV_TCP)
301     ASSERT(0 == uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage));
302   else if (server_handle->type == UV_NAMED_PIPE)
303     ASSERT(0 == uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0));
304   else
305     ASSERT(0);
306 
307   ASSERT(0 == uv_accept(server_handle, (uv_stream_t*) storage));
308   ASSERT(0 == uv_read_start((uv_stream_t*) storage, sv_alloc_cb, sv_read_cb));
309   ctx->num_connects++;
310 }
311 
312 
sv_alloc_cb(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)313 static void sv_alloc_cb(uv_handle_t* handle,
314                         size_t suggested_size,
315                         uv_buf_t* buf) {
316   static char slab[32];
317   buf->base = slab;
318   buf->len = sizeof(slab);
319 }
320 
321 
sv_read_cb(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)322 static void sv_read_cb(uv_stream_t* handle,
323                        ssize_t nread,
324                        const uv_buf_t* buf) {
325   ASSERT(nread == UV_EOF);
326   uv_close((uv_handle_t*) handle, (uv_close_cb) free);
327 }
328 
329 
cl_connect_cb(uv_connect_t * req,int status)330 static void cl_connect_cb(uv_connect_t* req, int status) {
331   struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req);
332   uv_idle_start(&ctx->idle_handle, cl_idle_cb);
333   ASSERT(0 == status);
334 }
335 
336 
cl_idle_cb(uv_idle_t * handle)337 static void cl_idle_cb(uv_idle_t* handle) {
338   struct client_ctx* ctx = container_of(handle, struct client_ctx, idle_handle);
339   uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb);
340   uv_idle_stop(&ctx->idle_handle);
341 }
342 
343 
cl_close_cb(uv_handle_t * handle)344 static void cl_close_cb(uv_handle_t* handle) {
345   struct client_ctx* ctx;
346 
347   ctx = container_of(handle, struct client_ctx, client_handle);
348 
349   if (--ctx->num_connects == 0) {
350     uv_close((uv_handle_t*) &ctx->idle_handle, NULL);
351     return;
352   }
353 
354   ASSERT(0 == uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle));
355   ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
356                              (uv_tcp_t*) &ctx->client_handle,
357                              (const struct sockaddr*) &listen_addr,
358                              cl_connect_cb));
359 }
360 
361 
test_tcp(unsigned int num_servers,unsigned int num_clients)362 static int test_tcp(unsigned int num_servers, unsigned int num_clients) {
363   struct server_ctx* servers;
364   struct client_ctx* clients;
365   uv_loop_t* loop;
366   uv_tcp_t* handle;
367   unsigned int i;
368   double time;
369 
370   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &listen_addr));
371   loop = uv_default_loop();
372 
373   servers = calloc(num_servers, sizeof(servers[0]));
374   clients = calloc(num_clients, sizeof(clients[0]));
375   ASSERT_NOT_NULL(servers);
376   ASSERT_NOT_NULL(clients);
377 
378   /* We're making the assumption here that from the perspective of the
379    * OS scheduler, threads are functionally equivalent to and interchangeable
380    * with full-blown processes.
381    */
382   for (i = 0; i < num_servers; i++) {
383     struct server_ctx* ctx = servers + i;
384     ASSERT(0 == uv_sem_init(&ctx->semaphore, 0));
385     ASSERT(0 == uv_thread_create(&ctx->thread_id, server_cb, ctx));
386   }
387 
388   send_listen_handles(UV_TCP, num_servers, servers);
389 
390   for (i = 0; i < num_clients; i++) {
391     struct client_ctx* ctx = clients + i;
392     ctx->num_connects = NUM_CONNECTS / num_clients;
393     handle = (uv_tcp_t*) &ctx->client_handle;
394     handle->data = "client handle";
395     ASSERT(0 == uv_tcp_init(loop, handle));
396     ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
397                                handle,
398                                (const struct sockaddr*) &listen_addr,
399                                cl_connect_cb));
400     ASSERT(0 == uv_idle_init(loop, &ctx->idle_handle));
401   }
402 
403   {
404     uint64_t t = uv_hrtime();
405     ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
406     t = uv_hrtime() - t;
407     time = t / 1e9;
408   }
409 
410   for (i = 0; i < num_servers; i++) {
411     struct server_ctx* ctx = servers + i;
412     uv_async_send(&ctx->async_handle);
413     ASSERT(0 == uv_thread_join(&ctx->thread_id));
414     uv_sem_destroy(&ctx->semaphore);
415   }
416 
417   printf("accept%u: %.0f accepts/sec (%u total)\n",
418          num_servers,
419          NUM_CONNECTS / time,
420          NUM_CONNECTS);
421 
422   for (i = 0; i < num_servers; i++) {
423     struct server_ctx* ctx = servers + i;
424     printf("  thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n",
425            i,
426            ctx->num_connects / time,
427            ctx->num_connects,
428            ctx->num_connects * 100.0 / NUM_CONNECTS);
429   }
430 
431   free(clients);
432   free(servers);
433 
434   MAKE_VALGRIND_HAPPY();
435   return 0;
436 }
437 
438 
BENCHMARK_IMPL(tcp_multi_accept2)439 BENCHMARK_IMPL(tcp_multi_accept2) {
440   return test_tcp(2, 40);
441 }
442 
443 
BENCHMARK_IMPL(tcp_multi_accept4)444 BENCHMARK_IMPL(tcp_multi_accept4) {
445   return test_tcp(4, 40);
446 }
447 
448 
BENCHMARK_IMPL(tcp_multi_accept8)449 BENCHMARK_IMPL(tcp_multi_accept8) {
450   return test_tcp(8, 40);
451 }
452