1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "task.h"
23 #include "uv.h"
24
25 #include <math.h>
26 #include <stdio.h>
27
28
/* Number of client connections to establish before writing starts.
 * Set by tcp_pump() / pipe_pump(). */
static int TARGET_CONNECTIONS;
/* Size in bytes of the payload sent by every single write request. */
#define WRITE_BUFFER_SIZE 8192
/* Upper bound on connection attempts that may be outstanding (started but
 * not yet completed) at the same time; see maybe_connect_some(). */
#define MAX_SIMULTANEOUS_CONNECTS 100

/* Set to non-zero to print a throughput line on every stats timer tick. */
#define PRINT_STATS 0
#define STATS_INTERVAL 1000 /* msec */
/* Number of stats timer ticks after which the client reports and exits. */
#define STATS_COUNT 5


/* Forward declarations: these are used before their definitions below. */
static void do_write(uv_stream_t*);
static void maybe_connect_some(void);

static uv_req_t* req_alloc(void);
static void req_free(uv_req_t* uv_req);

static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
static void buf_free(const uv_buf_t* buf);

/* Event loop shared by all callbacks in this file. */
static uv_loop_t* loop;

/* Listening handles; `server` points at whichever one is in use. */
static uv_tcp_t tcpServer;
static uv_pipe_t pipeServer;
static uv_stream_t* server;
static struct sockaddr_in listen_addr;
static struct sockaddr_in connect_addr;

/* Loop time (as returned by uv_now()) at which measuring started. */
static int64_t start_time;

/* Client: number of connection attempts started so far; also the index of
 * the next unused entry in the write handle arrays. */
static int max_connect_socket = 0;
/* Server: total number of connections accepted (never decremented). */
static int max_read_sockets = 0;
/* Server: number of accepted connections that are currently open. */
static int read_sockets = 0;
/* Client: number of connections that completed successfully. */
static int write_sockets = 0;

/* Byte counters. nrecv / nsent are reset on every stats timer tick by
 * show_stats(); the *_total variants accumulate for the whole run. */
static int64_t nrecv = 0;
static int64_t nrecv_total = 0;
static int64_t nsent = 0;
static int64_t nsent_total = 0;

/* Remaining stats timer ticks before the client finishes. */
static int stats_left = 0;

/* Payload for every write. It is never filled in explicitly, so it holds
 * the zero bytes of static storage. */
static char write_buffer[WRITE_BUFFER_SIZE];

/* Capacity of the client handle arrays below; also used as the listen
 * backlog on the server side. Make this as large as you need. */
#define MAX_WRITE_HANDLES 1000

/* Transport under test (TCP or PIPE); selects which handle array is used. */
static stream_type type;

static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];

/* Repeating timer that drives show_stats() on the client. */
static uv_timer_t timer_handle;
80
81
/*
 * Convert a byte count transferred over `passed_ms` milliseconds into a
 * throughput figure in gigabits per second. "Giga" is binary here
 * (1024 * 1024 * 1024 bytes). The caller must pass a non-zero interval;
 * no check is made before dividing.
 */
static double gbit(int64_t bytes, int64_t passed_ms) {
  const double gibibytes = (double) bytes / (1024 * 1024 * 1024);
  const double seconds = (double) passed_ms / 1000;

  return (gibibytes * 8) / seconds;
}
86
87
/*
 * Stats timer callback (client side).
 *
 * Optionally prints the throughput of the interval that just ended, then
 * either resets the per-interval counters or, on the last tick, prints the
 * overall client throughput, requests closing of every write handle and
 * terminates the process with exit(0).
 *
 * handle: the timer that fired (unused).
 */
static void show_stats(uv_timer_t* handle) {
  int64_t elapsed_ms;
  uv_handle_t* writer;
  int idx;

#if PRINT_STATS
  fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",
          write_sockets,
          gbit(nsent, STATS_INTERVAL));
  fflush(stderr);
#endif

  stats_left--;
  if (stats_left != 0) {
    /* More intervals to go: start the next one with fresh counters. */
    nrecv = 0;
    nsent = 0;
    return;
  }

  /* Last tick: report the average over the whole run. */
  uv_update_time(loop);
  elapsed_ms = uv_now(loop) - start_time;

  fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",
          type == TCP ? "tcp" : "pipe",
          write_sockets,
          gbit(nsent_total, elapsed_ms));
  fflush(stderr);

  for (idx = 0; idx < write_sockets; idx++) {
    if (type == TCP)
      writer = (uv_handle_t*) &tcp_write_handles[idx];
    else
      writer = (uv_handle_t*) &pipe_write_handles[idx];

    uv_close(writer, NULL);
  }

  /* The process ends here; the loop is not run again after uv_close(). */
  exit(0);
}
125
126
read_show_stats(void)127 static void read_show_stats(void) {
128 int64_t diff;
129
130 uv_update_time(loop);
131 diff = uv_now(loop) - start_time;
132
133 fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",
134 type == TCP ? "tcp" : "pipe",
135 max_read_sockets,
136 gbit(nrecv_total, diff));
137 fflush(stderr);
138 }
139
140
141
/*
 * Close callback for an accepted (server-side) stream.
 *
 * Releases the heap-allocated handle and decrements the open-connection
 * count. Once the last connection is gone, and more than 1000 ms of loop
 * time have elapsed since start_time, the server prints its stats and
 * closes the listening handle.
 */
static void read_sockets_close_cb(uv_handle_t* handle) {
  free(handle);
  read_sockets--;

  /* Other connections are still open: nothing to report yet. */
  if (read_sockets != 0)
    return;

  /* Still within the first second: keep the server alive. */
  if (!(uv_now(loop) - start_time > 1000))
    return;

  read_show_stats();
  uv_close((uv_handle_t*)server, NULL);
}
154
155
start_stats_collection(void)156 static void start_stats_collection(void) {
157 int r;
158
159 /* Show-stats timer */
160 stats_left = STATS_COUNT;
161 r = uv_timer_init(loop, &timer_handle);
162 ASSERT(r == 0);
163 r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
164 ASSERT(r == 0);
165
166 uv_update_time(loop);
167 start_time = uv_now(loop);
168 }
169
170
/*
 * Read callback for accepted (server-side) streams.
 *
 * stream: the stream the data arrived on.
 * bytes:  number of bytes read, or a negative value on EOF / error.
 * buf:    buffer handed out earlier by buf_alloc(); ownership is with this
 *         callback. It may be a null buffer (base == NULL), e.g. when
 *         buf_alloc() could not allocate.
 *
 * The first invocation starts the clock. Received bytes are added to the
 * interval and total counters; on EOF / error the stream is closed.
 */
static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
  /* NOTE(review): nrecv_total only grows when data arrives, so this branch
   * (and its ASSERT) is entered again if the very first callback delivered
   * no data - confirm that cannot happen in practice. */
  if (nrecv_total == 0) {
    ASSERT(start_time == 0);
    uv_update_time(loop);
    start_time = uv_now(loop);
  }

  /* Always hand the buffer back to the pool, including on EOF / error,
   * where it was previously leaked. A null buffer has no header in front
   * of it and must not be passed to buf_free(). */
  if (buf->base != NULL)
    buf_free(buf);

  if (bytes < 0) {
    uv_close((uv_handle_t*)stream, read_sockets_close_cb);
    return;
  }

  nrecv += bytes;
  nrecv_total += bytes;
}
188
189
/*
 * Write completion callback (client side).
 *
 * req:    the finished write request; returned to the request freelist.
 * status: must be 0, any write error aborts the benchmark via ASSERT.
 *
 * Accounts for one full write_buffer worth of bytes and immediately
 * queues the next write on the same stream.
 */
static void write_cb(uv_write_t* req, int status) {
  uv_stream_t* stream;

  ASSERT(status == 0);

  /* Read the stream out of the request BEFORE giving the request back to
   * the allocator; the request must not be touched after req_free(). */
  stream = (uv_stream_t*) req->handle;
  req_free((uv_req_t*) req);

  nsent += sizeof write_buffer;
  nsent_total += sizeof write_buffer;

  do_write(stream);
}
200
201
/*
 * Queue one write of the whole write_buffer on `stream`.
 *
 * The request comes from req_alloc() and is released in write_cb(), which
 * also queues the next write, so each stream always has one write pending.
 * Allocation failure and uv_write() failure abort the benchmark via ASSERT.
 */
static void do_write(uv_stream_t* stream) {
  uv_write_t* req;
  uv_buf_t buf;
  int r;

  /* Fields are assigned by name rather than with an initializer list. */
  buf.base = write_buffer;
  buf.len = sizeof write_buffer;

  req = (uv_write_t*) req_alloc();
  /* req_alloc() hands back malloc()'s result unchecked. */
  ASSERT(req != NULL);

  r = uv_write(req, stream, &buf, 1, write_cb);
  ASSERT(r == 0);
}
214
215
/*
 * Connect completion callback (client side).
 *
 * req:    the finished connect request; returned to the request freelist.
 * status: 0 on success. On failure the error string is printed and the
 *         ASSERT that follows fails.
 *
 * Counts the new connection, starts further connection attempts if any
 * are still needed and, once TARGET_CONNECTIONS are established, starts
 * the stats timer and kicks off the first write on every connection.
 */
static void connect_cb(uv_connect_t* req, int status) {
  uv_stream_t* writer;
  int idx;

  if (status != 0) {
    fprintf(stderr, "%s", uv_strerror(status));
    fflush(stderr);
  }
  ASSERT(status == 0);

  req_free((uv_req_t*) req);
  write_sockets++;

  maybe_connect_some();

  /* Keep waiting until every connection has been established. */
  if (write_sockets != TARGET_CONNECTIONS)
    return;

  start_stats_collection();

  /* Yay! start writing */
  for (idx = 0; idx < write_sockets; idx++) {
    if (type == TCP)
      writer = (uv_stream_t*) &tcp_write_handles[idx];
    else
      writer = (uv_stream_t*) &pipe_write_handles[idx];

    do_write(writer);
  }
}
242
243
maybe_connect_some(void)244 static void maybe_connect_some(void) {
245 uv_connect_t* req;
246 uv_tcp_t* tcp;
247 uv_pipe_t* pipe;
248 int r;
249
250 while (max_connect_socket < TARGET_CONNECTIONS &&
251 max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
252 if (type == TCP) {
253 tcp = &tcp_write_handles[max_connect_socket++];
254
255 r = uv_tcp_init(loop, tcp);
256 ASSERT(r == 0);
257
258 req = (uv_connect_t*) req_alloc();
259 r = uv_tcp_connect(req,
260 tcp,
261 (const struct sockaddr*) &connect_addr,
262 connect_cb);
263 ASSERT(r == 0);
264 } else {
265 pipe = &pipe_write_handles[max_connect_socket++];
266
267 r = uv_pipe_init(loop, pipe, 0);
268 ASSERT(r == 0);
269
270 req = (uv_connect_t*) req_alloc();
271 uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
272 }
273 }
274 }
275
276
/*
 * Listen callback (server side): accept one incoming connection.
 *
 * s:      the listening stream; must be the global `server`.
 * status: must be 0.
 *
 * Allocates a handle matching `type`, accepts the connection on it and
 * starts reading. The handle is freed in read_sockets_close_cb(). Any
 * failure, including running out of memory, aborts via ASSERT.
 */
static void connection_cb(uv_stream_t* s, int status) {
  uv_stream_t* stream;
  int r;

  ASSERT(server == s);
  ASSERT(status == 0);

  if (type == TCP) {
    stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
    /* Do not hand a NULL handle to uv_tcp_init(). */
    ASSERT(stream != NULL);
    r = uv_tcp_init(loop, (uv_tcp_t*)stream);
    ASSERT(r == 0);
  } else {
    stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
    /* Do not hand a NULL handle to uv_pipe_init(). */
    ASSERT(stream != NULL);
    r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);
    ASSERT(r == 0);
  }

  r = uv_accept(s, stream);
  ASSERT(r == 0);

  r = uv_read_start(stream, buf_alloc, read_cb);
  ASSERT(r == 0);

  read_sockets++;
  max_read_sockets++;
}
303
304
305 /*
306 * Request allocator
307 */
308
/*
 * Freelist node for request objects. req_alloc() and req_free() cast
 * between uv_req_t* and req_list_t*, which relies on uv_req being the
 * first member. The union lets one node hold any request type.
 */
typedef struct req_list_s {
  union uv_any_req uv_req;
  /* Next node on the freelist; only meaningful while the node is free. */
  struct req_list_s* next;
} req_list_t;


/* Head of the singly linked list of released requests (LIFO). */
static req_list_t* req_freelist = NULL;
316
317
req_alloc(void)318 static uv_req_t* req_alloc(void) {
319 req_list_t* req;
320
321 req = req_freelist;
322 if (req != NULL) {
323 req_freelist = req->next;
324 return (uv_req_t*) req;
325 }
326
327 req = (req_list_t*) malloc(sizeof *req);
328 return (uv_req_t*) req;
329 }
330
331
/*
 * Release a request obtained from req_alloc() by pushing it onto the
 * front of the freelist. The memory is kept for reuse, not given back to
 * the heap.
 */
static void req_free(uv_req_t* uv_req) {
  req_list_t* node;

  node = (req_list_t*) uv_req;

  /* Push front. */
  node->next = req_freelist;
  req_freelist = node;
}
338
339
340 /*
341 * Buffer allocator
342 */
343
/*
 * Header placed directly in front of every read buffer's data area.
 * buf_alloc() points uv_buf_t.base just past this header, and buf_free()
 * steps back from base to find the header again.
 */
typedef struct buf_list_s {
  /* Descriptor (base, len) handed to libuv for this buffer. */
  uv_buf_t uv_buf_t;
  /* Next header on the freelist; only meaningful while the buffer is free. */
  struct buf_list_s* next;
} buf_list_t;


/* Head of the singly linked list of released read buffers (LIFO). */
static buf_list_t* buf_freelist = NULL;
351
352
/*
 * Allocation callback passed to uv_read_start().
 *
 * handle: the handle that needs a buffer (unused).
 * size:   suggested buffer size; only honoured when a new buffer has to
 *         be allocated.
 * buf:    receives the buffer descriptor.
 *
 * A buffer from the freelist is reused when available and keeps the
 * length it was created with, regardless of `size`. Otherwise the header
 * and data area are allocated in one block. If that allocation fails,
 * `buf` is set to a null buffer (base == NULL, len == 0) instead of
 * dereferencing NULL; libuv documents this as the way for an allocation
 * callback to signal failure.
 */
static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
  buf_list_t* ab;

  ab = buf_freelist;
  if (ab != NULL) {
    buf_freelist = ab->next;
  } else {
    ab = malloc(size + sizeof(*ab));
    if (ab == NULL) {
      buf->base = NULL;
      buf->len = 0;
      return;
    }

    ab->uv_buf_t.len = size;
    /* The data area starts right behind the header. */
    ab->uv_buf_t.base = (char*) (ab + 1);
  }

  *buf = ab->uv_buf_t;
}
367
368
buf_free(const uv_buf_t * buf)369 static void buf_free(const uv_buf_t* buf) {
370 buf_list_t* ab = (buf_list_t*) buf->base - 1;
371 ab->next = buf_freelist;
372 buf_freelist = ab;
373 }
374
375
HELPER_IMPL(tcp_pump_server)376 HELPER_IMPL(tcp_pump_server) {
377 int r;
378
379 type = TCP;
380 loop = uv_default_loop();
381
382 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
383
384 /* Server */
385 server = (uv_stream_t*)&tcpServer;
386 r = uv_tcp_init(loop, &tcpServer);
387 ASSERT(r == 0);
388 r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
389 ASSERT(r == 0);
390 r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
391 ASSERT(r == 0);
392
393 notify_parent_process();
394 uv_run(loop, UV_RUN_DEFAULT);
395
396 return 0;
397 }
398
399
HELPER_IMPL(pipe_pump_server)400 HELPER_IMPL(pipe_pump_server) {
401 int r;
402 type = PIPE;
403
404 loop = uv_default_loop();
405
406 /* Server */
407 server = (uv_stream_t*)&pipeServer;
408 r = uv_pipe_init(loop, &pipeServer, 0);
409 ASSERT(r == 0);
410 r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
411 ASSERT(r == 0);
412 r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
413 ASSERT(r == 0);
414
415 notify_parent_process();
416 uv_run(loop, UV_RUN_DEFAULT);
417
418 MAKE_VALGRIND_HAPPY();
419 return 0;
420 }
421
422
tcp_pump(int n)423 static void tcp_pump(int n) {
424 ASSERT(n <= MAX_WRITE_HANDLES);
425 TARGET_CONNECTIONS = n;
426 type = TCP;
427
428 loop = uv_default_loop();
429
430 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
431
432 /* Start making connections */
433 maybe_connect_some();
434
435 uv_run(loop, UV_RUN_DEFAULT);
436
437 MAKE_VALGRIND_HAPPY();
438 }
439
440
pipe_pump(int n)441 static void pipe_pump(int n) {
442 ASSERT(n <= MAX_WRITE_HANDLES);
443 TARGET_CONNECTIONS = n;
444 type = PIPE;
445
446 loop = uv_default_loop();
447
448 /* Start making connections */
449 maybe_connect_some();
450
451 uv_run(loop, UV_RUN_DEFAULT);
452
453 MAKE_VALGRIND_HAPPY();
454 }
455
456
/* Benchmark entry point: TCP pump client using 100 connections. */
BENCHMARK_IMPL(tcp_pump100_client) {
  tcp_pump(100);
  return 0;
}
461
462
/* Benchmark entry point: TCP pump client using a single connection. */
BENCHMARK_IMPL(tcp_pump1_client) {
  tcp_pump(1);
  return 0;
}
467
468
/* Benchmark entry point: named-pipe pump client using 100 connections. */
BENCHMARK_IMPL(pipe_pump100_client) {
  pipe_pump(100);
  return 0;
}
473
474
/* Benchmark entry point: named-pipe pump client using a single connection. */
BENCHMARK_IMPL(pipe_pump1_client) {
  pipe_pump(1);
  return 0;
}
479