• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "task.h"
23 #include "uv.h"
24 
25 /* Update this is you're going to run > 1000 concurrent requests. */
26 #define MAX_CONNS 1000
27 
28 #undef NANOSEC
29 #define NANOSEC ((uint64_t) 1e9)
30 
31 #undef DEBUG
32 #define DEBUG 0
33 
34 struct conn_rec_s;
35 
36 typedef void (*setup_fn)(int num, void* arg);
37 typedef void (*make_connect_fn)(struct conn_rec_s* conn);
38 typedef int (*connect_fn)(int num, make_connect_fn make_connect, void* arg);
39 
40 /* Base class for tcp_conn_rec and pipe_conn_rec.
41  * The ordering of fields matters!
42  */
43 typedef struct conn_rec_s {
44   int i;
45   uv_connect_t conn_req;
46   uv_write_t write_req;
47   make_connect_fn make_connect;
48   uv_stream_t stream;
49 } conn_rec;
50 
51 typedef struct {
52   int i;
53   uv_connect_t conn_req;
54   uv_write_t write_req;
55   make_connect_fn make_connect;
56   uv_tcp_t stream;
57 } tcp_conn_rec;
58 
59 typedef struct {
60   int i;
61   uv_connect_t conn_req;
62   uv_write_t write_req;
63   make_connect_fn make_connect;
64   uv_pipe_t stream;
65 } pipe_conn_rec;
66 
67 static char buffer[] = "QS";
68 
69 static uv_loop_t* loop;
70 
71 static tcp_conn_rec tcp_conns[MAX_CONNS];
72 static pipe_conn_rec pipe_conns[MAX_CONNS];
73 
74 static uint64_t start; /* in ms  */
75 static int closed_streams;
76 static int conns_failed;
77 
78 static void alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
79 static void connect_cb(uv_connect_t* conn_req, int status);
80 static void read_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
81 static void close_cb(uv_handle_t* handle);
82 
83 
alloc_cb(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)84 static void alloc_cb(uv_handle_t* handle,
85                      size_t suggested_size,
86                      uv_buf_t* buf) {
87   static char slab[65536];
88   buf->base = slab;
89   buf->len = sizeof(slab);
90 }
91 
92 
after_write(uv_write_t * req,int status)93 static void after_write(uv_write_t* req, int status) {
94   if (status != 0) {
95     fprintf(stderr, "write error %s\n", uv_err_name(status));
96     uv_close((uv_handle_t*)req->handle, close_cb);
97     conns_failed++;
98     return;
99   }
100 }
101 
102 
connect_cb(uv_connect_t * req,int status)103 static void connect_cb(uv_connect_t* req, int status) {
104   conn_rec* conn;
105   uv_buf_t buf;
106   int r;
107 
108   if (status != 0) {
109 #if DEBUG
110     fprintf(stderr, "connect error %s\n", uv_err_name(status));
111 #endif
112     uv_close((uv_handle_t*)req->handle, close_cb);
113     conns_failed++;
114     return;
115   }
116 
117   ASSERT_NOT_NULL(req);
118   ASSERT(status == 0);
119 
120   conn = (conn_rec*)req->data;
121   ASSERT_NOT_NULL(conn);
122 
123 #if DEBUG
124   printf("connect_cb %d\n", conn->i);
125 #endif
126 
127   r = uv_read_start(&conn->stream, alloc_cb, read_cb);
128   ASSERT(r == 0);
129 
130   buf.base = buffer;
131   buf.len = sizeof(buffer) - 1;
132 
133   r = uv_write(&conn->write_req, &conn->stream, &buf, 1, after_write);
134   ASSERT(r == 0);
135 }
136 
137 
read_cb(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)138 static void read_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
139 
140   ASSERT_NOT_NULL(stream);
141 
142 #if DEBUG
143   printf("read_cb %d\n", p->i);
144 #endif
145 
146   uv_close((uv_handle_t*)stream, close_cb);
147 
148   if (nread < 0) {
149     if (nread == UV_EOF) {
150       ;
151     } else if (nread == UV_ECONNRESET) {
152       conns_failed++;
153     } else {
154       fprintf(stderr, "read error %s\n", uv_err_name(nread));
155       ASSERT(0);
156     }
157   }
158 }
159 
160 
close_cb(uv_handle_t * handle)161 static void close_cb(uv_handle_t* handle) {
162   conn_rec* p = (conn_rec*)handle->data;
163 
164   ASSERT_NOT_NULL(handle);
165   closed_streams++;
166 
167 #if DEBUG
168   printf("close_cb %d\n", p->i);
169 #endif
170 
171   if (uv_now(loop) - start < 10000) {
172     p->make_connect(p);
173   }
174 }
175 
176 
tcp_do_setup(int num,void * arg)177 static void tcp_do_setup(int num, void* arg) {
178   int i;
179 
180   for (i = 0; i < num; i++) {
181     tcp_conns[i].i = i;
182   }
183 }
184 
185 
pipe_do_setup(int num,void * arg)186 static void pipe_do_setup(int num, void* arg) {
187   int i;
188 
189   for (i = 0; i < num; i++) {
190     pipe_conns[i].i = i;
191   }
192 }
193 
194 
tcp_make_connect(conn_rec * p)195 static void tcp_make_connect(conn_rec* p) {
196   struct sockaddr_in addr;
197   tcp_conn_rec* tp;
198   int r;
199 
200   tp = (tcp_conn_rec*) p;
201 
202   r = uv_tcp_init(loop, (uv_tcp_t*)&p->stream);
203   ASSERT(r == 0);
204 
205   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
206 
207   r = uv_tcp_connect(&tp->conn_req,
208                      (uv_tcp_t*) &p->stream,
209                      (const struct sockaddr*) &addr,
210                      connect_cb);
211   if (r) {
212     fprintf(stderr, "uv_tcp_connect error %s\n", uv_err_name(r));
213     ASSERT(0);
214   }
215 
216 #if DEBUG
217   printf("make connect %d\n", p->i);
218 #endif
219 
220   p->conn_req.data = p;
221   p->write_req.data = p;
222   p->stream.data = p;
223 }
224 
225 
pipe_make_connect(conn_rec * p)226 static void pipe_make_connect(conn_rec* p) {
227   int r;
228 
229   r = uv_pipe_init(loop, (uv_pipe_t*)&p->stream, 0);
230   ASSERT(r == 0);
231 
232   uv_pipe_connect(&((pipe_conn_rec*) p)->conn_req,
233                   (uv_pipe_t*) &p->stream,
234                   TEST_PIPENAME,
235                   connect_cb);
236 
237 #if DEBUG
238   printf("make connect %d\n", p->i);
239 #endif
240 
241   p->conn_req.data = p;
242   p->write_req.data = p;
243   p->stream.data = p;
244 }
245 
246 
tcp_do_connect(int num,make_connect_fn make_connect,void * arg)247 static int tcp_do_connect(int num, make_connect_fn make_connect, void* arg) {
248   int i;
249 
250   for (i = 0; i < num; i++) {
251     tcp_make_connect((conn_rec*)&tcp_conns[i]);
252     tcp_conns[i].make_connect = make_connect;
253   }
254 
255   return 0;
256 }
257 
258 
pipe_do_connect(int num,make_connect_fn make_connect,void * arg)259 static int pipe_do_connect(int num, make_connect_fn make_connect, void* arg) {
260   int i;
261 
262   for (i = 0; i < num; i++) {
263     pipe_make_connect((conn_rec*)&pipe_conns[i]);
264     pipe_conns[i].make_connect = make_connect;
265   }
266 
267   return 0;
268 }
269 
270 
pound_it(int concurrency,const char * type,setup_fn do_setup,connect_fn do_connect,make_connect_fn make_connect,void * arg)271 static int pound_it(int concurrency,
272                     const char* type,
273                     setup_fn do_setup,
274                     connect_fn do_connect,
275                     make_connect_fn make_connect,
276                     void* arg) {
277   double secs;
278   int r;
279   uint64_t start_time; /* in ns */
280   uint64_t end_time;
281 
282   loop = uv_default_loop();
283 
284   uv_update_time(loop);
285   start = uv_now(loop);
286 
287   /* Run benchmark for at least five seconds. */
288   start_time = uv_hrtime();
289 
290   do_setup(concurrency, arg);
291 
292   r = do_connect(concurrency, make_connect, arg);
293   ASSERT(!r);
294 
295   uv_run(loop, UV_RUN_DEFAULT);
296 
297   end_time = uv_hrtime();
298 
299   /* Number of fractional seconds it took to run the benchmark. */
300   secs = (double)(end_time - start_time) / NANOSEC;
301 
302   fprintf(stderr, "%s-conn-pound-%d: %.0f accepts/s (%d failed)\n",
303           type,
304           concurrency,
305           closed_streams / secs,
306           conns_failed);
307   fflush(stderr);
308 
309   MAKE_VALGRIND_HAPPY();
310   return 0;
311 }
312 
313 
BENCHMARK_IMPL(tcp4_pound_100)314 BENCHMARK_IMPL(tcp4_pound_100) {
315   return pound_it(100,
316                   "tcp",
317                   tcp_do_setup,
318                   tcp_do_connect,
319                   tcp_make_connect,
320                   NULL);
321 }
322 
323 
BENCHMARK_IMPL(tcp4_pound_1000)324 BENCHMARK_IMPL(tcp4_pound_1000) {
325   return pound_it(1000,
326                   "tcp",
327                   tcp_do_setup,
328                   tcp_do_connect,
329                   tcp_make_connect,
330                   NULL);
331 }
332 
333 
BENCHMARK_IMPL(pipe_pound_100)334 BENCHMARK_IMPL(pipe_pound_100) {
335   return pound_it(100,
336                   "pipe",
337                   pipe_do_setup,
338                   pipe_do_connect,
339                   pipe_make_connect,
340                   NULL);
341 }
342 
343 
BENCHMARK_IMPL(pipe_pound_1000)344 BENCHMARK_IMPL(pipe_pound_1000) {
345   return pound_it(1000,
346                   "pipe",
347                   pipe_do_setup,
348                   pipe_do_connect,
349                   pipe_make_connect,
350                   NULL);
351 }
352