1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /*
20 Basic I/O ping-pong benchmarks.
21
22 The goal here is to establish lower bounds on how fast the stack could get by
23 measuring the cost of using various I/O strategies to do a basic
24 request-response loop.
25 */
26
27 #include <errno.h>
28 #include <netinet/ip.h>
29 #include <poll.h>
30 #include <stdio.h>
31 #include <string.h>
32 #ifdef __linux__
33 #include <sys/epoll.h>
34 #endif
35 #include <sys/socket.h>
36
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/time.h>
40
41 #include "src/core/lib/gpr/useful.h"
42 #include "src/core/lib/gprpp/thd.h"
43 #include "src/core/lib/iomgr/error.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "test/core/util/cmdline.h"
46 #include "test/core/util/histogram.h"
47
48 typedef struct fd_pair {
49 int read_fd;
50 int write_fd;
51 } fd_pair;
52
53 typedef struct thread_args {
54 fd_pair fds;
55 size_t msg_size;
56 int (*read_bytes)(struct thread_args* args, char* buf);
57 int (*write_bytes)(struct thread_args* args, char* buf);
58 int (*setup)(struct thread_args* args);
59 int epoll_fd;
60 const char* strategy_name;
61 } thread_args;
62
63 /*
64 Read strategies
65
66 There are a number of read strategies, each of which has a blocking and
67 non-blocking version.
68 */
69
70 /* Basic call to read() */
read_bytes(int fd,char * buf,size_t read_size,int spin)71 static int read_bytes(int fd, char* buf, size_t read_size, int spin) {
72 size_t bytes_read = 0;
73 ssize_t err;
74 do {
75 err = read(fd, buf + bytes_read, read_size - bytes_read);
76 if (err < 0) {
77 if (errno == EINTR) {
78 continue;
79 } else {
80 if (errno == EAGAIN && spin == 1) {
81 continue;
82 }
83 gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
84 return -1;
85 }
86 } else {
87 bytes_read += static_cast<size_t>(err);
88 }
89 } while (bytes_read < read_size);
90 return 0;
91 }
92
blocking_read_bytes(thread_args * args,char * buf)93 static int blocking_read_bytes(thread_args* args, char* buf) {
94 return read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
95 }
96
spin_read_bytes(thread_args * args,char * buf)97 static int spin_read_bytes(thread_args* args, char* buf) {
98 return read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
99 }
100
101 /* Call poll() to monitor a non-blocking fd */
poll_read_bytes(int fd,char * buf,size_t read_size,int spin)102 static int poll_read_bytes(int fd, char* buf, size_t read_size, int spin) {
103 struct pollfd pfd;
104 size_t bytes_read = 0;
105 int err;
106 ssize_t err2;
107
108 pfd.fd = fd;
109 pfd.events = POLLIN;
110 do {
111 err = poll(&pfd, 1, spin ? 0 : -1);
112 if (err < 0) {
113 if (errno == EINTR) {
114 continue;
115 } else {
116 gpr_log(GPR_ERROR, "Poll failed: %s", strerror(errno));
117 return -1;
118 }
119 }
120 if (err == 0 && spin) continue;
121 GPR_ASSERT(err == 1);
122 GPR_ASSERT(pfd.revents == POLLIN);
123 do {
124 err2 = read(fd, buf + bytes_read, read_size - bytes_read);
125 } while (err2 < 0 && errno == EINTR);
126 if (err2 < 0 && errno != EAGAIN) {
127 gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
128 return -1;
129 }
130 bytes_read += static_cast<size_t>(err2);
131 } while (bytes_read < read_size);
132 return 0;
133 }
134
poll_read_bytes_blocking(struct thread_args * args,char * buf)135 static int poll_read_bytes_blocking(struct thread_args* args, char* buf) {
136 return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
137 }
138
poll_read_bytes_spin(struct thread_args * args,char * buf)139 static int poll_read_bytes_spin(struct thread_args* args, char* buf) {
140 return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
141 }
142
143 #ifdef __linux__
144 /* Call epoll_wait() to monitor a non-blocking fd */
epoll_read_bytes(struct thread_args * args,char * buf,int spin)145 static int epoll_read_bytes(struct thread_args* args, char* buf, int spin) {
146 struct epoll_event ev;
147 size_t bytes_read = 0;
148 int err;
149 ssize_t err2;
150 size_t read_size = args->msg_size;
151
152 do {
153 err = epoll_wait(args->epoll_fd, &ev, 1, spin ? 0 : -1);
154 if (err < 0) {
155 if (errno == EINTR) continue;
156 gpr_log(GPR_ERROR, "epoll_wait failed: %s", strerror(errno));
157 return -1;
158 }
159 if (err == 0 && spin) continue;
160 GPR_ASSERT(err == 1);
161 GPR_ASSERT(ev.events & EPOLLIN);
162 GPR_ASSERT(ev.data.fd == args->fds.read_fd);
163 do {
164 do {
165 err2 =
166 read(args->fds.read_fd, buf + bytes_read, read_size - bytes_read);
167 } while (err2 < 0 && errno == EINTR);
168 if (errno == EAGAIN) break;
169 bytes_read += static_cast<size_t>(err2);
170 /* TODO(klempner): This should really be doing an extra call after we are
171 done to ensure we see an EAGAIN */
172 } while (bytes_read < read_size);
173 } while (bytes_read < read_size);
174 GPR_ASSERT(bytes_read == read_size);
175 return 0;
176 }
177
epoll_read_bytes_blocking(struct thread_args * args,char * buf)178 static int epoll_read_bytes_blocking(struct thread_args* args, char* buf) {
179 return epoll_read_bytes(args, buf, 0);
180 }
181
epoll_read_bytes_spin(struct thread_args * args,char * buf)182 static int epoll_read_bytes_spin(struct thread_args* args, char* buf) {
183 return epoll_read_bytes(args, buf, 1);
184 }
185 #endif /* __linux__ */
186
187 /* Write out bytes.
188 At this point we only have one strategy, since in the common case these
189 writes go directly out to the kernel.
190 */
blocking_write_bytes(struct thread_args * args,char * buf)191 static int blocking_write_bytes(struct thread_args* args, char* buf) {
192 size_t bytes_written = 0;
193 ssize_t err;
194 size_t write_size = args->msg_size;
195 do {
196 err = write(args->fds.write_fd, buf + bytes_written,
197 write_size - bytes_written);
198 if (err < 0) {
199 if (errno == EINTR) {
200 continue;
201 } else {
202 gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
203 return -1;
204 }
205 } else {
206 bytes_written += static_cast<size_t>(err);
207 }
208 } while (bytes_written < write_size);
209 return 0;
210 }
211
212 /*
213 Initialization code
214
215 These are called at the beginning of the client and server thread, depending
216 on the scenario we're using.
217 */
set_socket_nonblocking(thread_args * args)218 static int set_socket_nonblocking(thread_args* args) {
219 if (!GRPC_LOG_IF_ERROR("Unable to set read socket nonblocking",
220 grpc_set_socket_nonblocking(args->fds.read_fd, 1))) {
221 return -1;
222 }
223 if (!GRPC_LOG_IF_ERROR("Unable to set write socket nonblocking",
224 grpc_set_socket_nonblocking(args->fds.write_fd, 1))) {
225 return -1;
226 }
227 return 0;
228 }
229
do_nothing(thread_args * args)230 static int do_nothing(thread_args* args) { return 0; }
231
232 #ifdef __linux__
233 /* Special case for epoll, where we need to create the fd ahead of time. */
epoll_setup(thread_args * args)234 static int epoll_setup(thread_args* args) {
235 int epoll_fd;
236 struct epoll_event ev;
237 set_socket_nonblocking(args);
238 epoll_fd = epoll_create(1);
239 if (epoll_fd < 0) {
240 gpr_log(GPR_ERROR, "epoll_create: %s", strerror(errno));
241 return -1;
242 }
243
244 args->epoll_fd = epoll_fd;
245
246 ev.events = EPOLLIN | EPOLLET;
247 ev.data.fd = args->fds.read_fd;
248 if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->fds.read_fd, &ev) < 0) {
249 gpr_log(GPR_ERROR, "epoll_ctl: %s", strerror(errno));
250 }
251 return 0;
252 }
253 #endif
254
server_thread(thread_args * args)255 static void server_thread(thread_args* args) {
256 char* buf = static_cast<char*>(gpr_malloc(args->msg_size));
257 if (args->setup(args) < 0) {
258 gpr_log(GPR_ERROR, "Setup failed");
259 }
260 for (;;) {
261 if (args->read_bytes(args, buf) < 0) {
262 gpr_log(GPR_ERROR, "Server read failed");
263 gpr_free(buf);
264 return;
265 }
266 if (args->write_bytes(args, buf) < 0) {
267 gpr_log(GPR_ERROR, "Server write failed");
268 gpr_free(buf);
269 return;
270 }
271 }
272 }
273
server_thread_wrap(void * arg)274 static void server_thread_wrap(void* arg) {
275 thread_args* args = static_cast<thread_args*>(arg);
276 server_thread(args);
277 }
278
print_histogram(grpc_histogram * histogram)279 static void print_histogram(grpc_histogram* histogram) {
280 /* TODO(klempner): Print more detailed information, such as detailed histogram
281 buckets */
282 gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f",
283 grpc_histogram_percentile(histogram, 50),
284 grpc_histogram_percentile(histogram, 95),
285 grpc_histogram_percentile(histogram, 99),
286 grpc_histogram_percentile(histogram, 99.9));
287 }
288
now(void)289 static double now(void) {
290 gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
291 return 1e9 * static_cast<double>(tv.tv_sec) + static_cast<double>(tv.tv_nsec);
292 }
293
client_thread(thread_args * args)294 static void client_thread(thread_args* args) {
295 char* buf = static_cast<char*>(gpr_malloc(args->msg_size * sizeof(char)));
296 memset(buf, 0, args->msg_size * sizeof(char));
297 grpc_histogram* histogram = grpc_histogram_create(0.01, 60e9);
298 double start_time;
299 double end_time;
300 double interval;
301 const int kNumIters = 100000;
302 int i;
303
304 if (args->setup(args) < 0) {
305 gpr_log(GPR_ERROR, "Setup failed");
306 }
307 for (i = 0; i < kNumIters; ++i) {
308 start_time = now();
309 if (args->write_bytes(args, buf) < 0) {
310 gpr_log(GPR_ERROR, "Client write failed");
311 goto error;
312 }
313 if (args->read_bytes(args, buf) < 0) {
314 gpr_log(GPR_ERROR, "Client read failed");
315 goto error;
316 }
317 end_time = now();
318 if (i > kNumIters / 2) {
319 interval = end_time - start_time;
320 grpc_histogram_add(histogram, interval);
321 }
322 }
323 print_histogram(histogram);
324 error:
325 gpr_free(buf);
326 grpc_histogram_destroy(histogram);
327 }
328
329 /* This roughly matches tcp_server's create_listening_socket */
create_listening_socket(struct sockaddr * port,socklen_t len)330 static int create_listening_socket(struct sockaddr* port, socklen_t len) {
331 int fd = socket(port->sa_family, SOCK_STREAM, 0);
332 if (fd < 0) {
333 gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
334 goto error;
335 }
336
337 if (!GRPC_LOG_IF_ERROR("Failed to set listening socket cloexec",
338 grpc_set_socket_cloexec(fd, 1))) {
339 goto error;
340 }
341 if (!GRPC_LOG_IF_ERROR("Failed to set listening socket low latency",
342 grpc_set_socket_low_latency(fd, 1))) {
343 goto error;
344 }
345 if (!GRPC_LOG_IF_ERROR("Failed to set listening socket reuse addr",
346 grpc_set_socket_reuse_addr(fd, 1))) {
347 goto error;
348 }
349
350 if (bind(fd, port, len) < 0) {
351 gpr_log(GPR_ERROR, "bind: %s", strerror(errno));
352 goto error;
353 }
354
355 if (listen(fd, 1) < 0) {
356 gpr_log(GPR_ERROR, "listen: %s", strerror(errno));
357 goto error;
358 }
359
360 if (getsockname(fd, port, &len) < 0) {
361 gpr_log(GPR_ERROR, "getsockname: %s", strerror(errno));
362 goto error;
363 }
364
365 return fd;
366
367 error:
368 if (fd >= 0) {
369 close(fd);
370 }
371 return -1;
372 }
373
connect_client(struct sockaddr * addr,socklen_t len)374 static int connect_client(struct sockaddr* addr, socklen_t len) {
375 int fd = socket(addr->sa_family, SOCK_STREAM, 0);
376 int err;
377 if (fd < 0) {
378 gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
379 goto error;
380 }
381
382 if (!GRPC_LOG_IF_ERROR("Failed to set connecting socket cloexec",
383 grpc_set_socket_cloexec(fd, 1))) {
384 goto error;
385 }
386 if (!GRPC_LOG_IF_ERROR("Failed to set connecting socket low latency",
387 grpc_set_socket_low_latency(fd, 1))) {
388 goto error;
389 }
390
391 do {
392 err = connect(fd, addr, len);
393 } while (err < 0 && errno == EINTR);
394
395 if (err < 0) {
396 gpr_log(GPR_ERROR, "connect error: %s", strerror(errno));
397 goto error;
398 }
399 return fd;
400
401 error:
402 if (fd >= 0) {
403 close(fd);
404 }
405 return -1;
406 }
407
accept_server(int listen_fd)408 static int accept_server(int listen_fd) {
409 int fd = accept(listen_fd, nullptr, nullptr);
410 if (fd < 0) {
411 gpr_log(GPR_ERROR, "Accept failed: %s", strerror(errno));
412 return -1;
413 }
414 return fd;
415 }
416
create_sockets_tcp(fd_pair * client_fds,fd_pair * server_fds)417 static int create_sockets_tcp(fd_pair* client_fds, fd_pair* server_fds) {
418 int listen_fd = -1;
419 int client_fd = -1;
420 int server_fd = -1;
421
422 struct sockaddr_in port;
423 struct sockaddr* sa_port = reinterpret_cast<struct sockaddr*>(&port);
424
425 port.sin_family = AF_INET;
426 port.sin_port = 0;
427 port.sin_addr.s_addr = INADDR_ANY;
428
429 listen_fd = create_listening_socket(sa_port, sizeof(port));
430 if (listen_fd == -1) {
431 gpr_log(GPR_ERROR, "Listen failed");
432 goto error;
433 }
434
435 client_fd = connect_client(sa_port, sizeof(port));
436 if (client_fd == -1) {
437 gpr_log(GPR_ERROR, "Connect failed");
438 goto error;
439 }
440
441 server_fd = accept_server(listen_fd);
442 if (server_fd == -1) {
443 gpr_log(GPR_ERROR, "Accept failed");
444 goto error;
445 }
446
447 client_fds->read_fd = client_fd;
448 client_fds->write_fd = client_fd;
449 server_fds->read_fd = server_fd;
450 server_fds->write_fd = server_fd;
451 close(listen_fd);
452 return 0;
453
454 error:
455 if (listen_fd != -1) {
456 close(listen_fd);
457 }
458 if (client_fd != -1) {
459 close(client_fd);
460 }
461 if (server_fd != -1) {
462 close(server_fd);
463 }
464 return -1;
465 }
466
create_sockets_socketpair(fd_pair * client_fds,fd_pair * server_fds)467 static int create_sockets_socketpair(fd_pair* client_fds, fd_pair* server_fds) {
468 int fds[2];
469 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
470 gpr_log(GPR_ERROR, "socketpair: %s", strerror(errno));
471 return -1;
472 }
473
474 client_fds->read_fd = fds[0];
475 client_fds->write_fd = fds[0];
476 server_fds->read_fd = fds[1];
477 server_fds->write_fd = fds[1];
478 return 0;
479 }
480
create_sockets_pipe(fd_pair * client_fds,fd_pair * server_fds)481 static int create_sockets_pipe(fd_pair* client_fds, fd_pair* server_fds) {
482 int cfds[2];
483 int sfds[2];
484 if (pipe(cfds) < 0) {
485 gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
486 return -1;
487 }
488
489 if (pipe(sfds) < 0) {
490 gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
491 return -1;
492 }
493
494 client_fds->read_fd = cfds[0];
495 client_fds->write_fd = cfds[1];
496 server_fds->read_fd = sfds[0];
497 server_fds->write_fd = sfds[1];
498 return 0;
499 }
500
501 static const char* read_strategy_usage =
502 "Strategy for doing reads, which is one of:\n"
503 " blocking: blocking read calls\n"
504 " same_thread_poll: poll() call on same thread \n"
505 #ifdef __linux__
506 " same_thread_epoll: epoll_wait() on same thread \n"
507 #endif
508 " spin_read: spinning non-blocking read() calls \n"
509 " spin_poll: spinning 0 timeout poll() calls \n"
510 #ifdef __linux__
511 " spin_epoll: spinning 0 timeout epoll_wait() calls \n"
512 #endif
513 "";
514
515 static const char* socket_type_usage =
516 "Type of socket used, one of:\n"
517 " tcp: fds are endpoints of a TCP connection\n"
518 " socketpair: fds come from socketpair()\n"
519 " pipe: fds come from pipe()\n";
520
print_usage(char * argv0)521 void print_usage(char* argv0) {
522 fprintf(stderr, "%s usage:\n\n", argv0);
523 fprintf(stderr, "%s read_strategy socket_type msg_size\n\n", argv0);
524 fprintf(stderr, "where read_strategy is one of:\n");
525 fprintf(stderr, " blocking: blocking read calls\n");
526 fprintf(stderr, " same_thread_poll: poll() call on same thread \n");
527 #ifdef __linux__
528 fprintf(stderr, " same_thread_epoll: epoll_wait() on same thread \n");
529 #endif
530 fprintf(stderr, " spin_read: spinning non-blocking read() calls \n");
531 fprintf(stderr, " spin_poll: spinning 0 timeout poll() calls \n");
532 #ifdef __linux__
533 fprintf(stderr, " spin_epoll: spinning 0 timeout epoll_wait() calls \n");
534 #endif
535 fprintf(stderr, "and socket_type is one of:\n");
536 fprintf(stderr, " tcp: fds are endpoints of a TCP connection\n");
537 fprintf(stderr, " socketpair: fds come from socketpair()\n");
538 fprintf(stderr, " pipe: fds come from pipe()\n");
539 fflush(stderr);
540 }
541
542 typedef struct test_strategy {
543 const char* name;
544 int (*read_strategy)(struct thread_args* args, char* buf);
545 int (*setup)(struct thread_args* args);
546 } test_strategy;
547
548 static test_strategy test_strategies[] = {
549 {"blocking", blocking_read_bytes, do_nothing},
550 {"same_thread_poll", poll_read_bytes_blocking, set_socket_nonblocking},
551 #ifdef __linux__
552 {"same_thread_epoll", epoll_read_bytes_blocking, epoll_setup},
553 {"spin_epoll", epoll_read_bytes_spin, epoll_setup},
554 #endif /* __linux__ */
555 {"spin_read", spin_read_bytes, set_socket_nonblocking},
556 {"spin_poll", poll_read_bytes_spin, set_socket_nonblocking}};
557
558 static const char* socket_types[] = {"tcp", "socketpair", "pipe"};
559
create_socket(const char * socket_type,fd_pair * client_fds,fd_pair * server_fds)560 int create_socket(const char* socket_type, fd_pair* client_fds,
561 fd_pair* server_fds) {
562 if (strcmp(socket_type, "tcp") == 0) {
563 create_sockets_tcp(client_fds, server_fds);
564 } else if (strcmp(socket_type, "socketpair") == 0) {
565 create_sockets_socketpair(client_fds, server_fds);
566 } else if (strcmp(socket_type, "pipe") == 0) {
567 create_sockets_pipe(client_fds, server_fds);
568 } else {
569 fprintf(stderr, "Invalid socket type %s\n", socket_type);
570 fflush(stderr);
571 return -1;
572 }
573 return 0;
574 }
575
run_benchmark(const char * socket_type,thread_args * client_args,thread_args * server_args)576 static int run_benchmark(const char* socket_type, thread_args* client_args,
577 thread_args* server_args) {
578 int rv = 0;
579
580 rv = create_socket(socket_type, &client_args->fds, &server_args->fds);
581 if (rv < 0) {
582 return rv;
583 }
584
585 gpr_log(GPR_INFO, "Starting test %s %s %zu", client_args->strategy_name,
586 socket_type, client_args->msg_size);
587
588 grpc_core::Thread server("server_thread", server_thread_wrap, server_args);
589 server.Start();
590 client_thread(client_args);
591 server.Join();
592
593 return 0;
594 }
595
run_all_benchmarks(size_t msg_size)596 static int run_all_benchmarks(size_t msg_size) {
597 int error = 0;
598 size_t i;
599 for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
600 test_strategy* strategy = &test_strategies[i];
601 size_t j;
602 for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) {
603 thread_args* client_args =
604 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
605 thread_args* server_args =
606 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
607 const char* socket_type = socket_types[j];
608
609 client_args->read_bytes = strategy->read_strategy;
610 client_args->write_bytes = blocking_write_bytes;
611 client_args->setup = strategy->setup;
612 client_args->msg_size = msg_size;
613 client_args->strategy_name = strategy->name;
614 server_args->read_bytes = strategy->read_strategy;
615 server_args->write_bytes = blocking_write_bytes;
616 server_args->setup = strategy->setup;
617 server_args->msg_size = msg_size;
618 server_args->strategy_name = strategy->name;
619 error = run_benchmark(socket_type, client_args, server_args);
620 if (error < 0) {
621 return error;
622 }
623 }
624 }
625 return error;
626 }
627
main(int argc,char ** argv)628 int main(int argc, char** argv) {
629 thread_args* client_args =
630 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
631 thread_args* server_args =
632 static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
633 int msg_size = -1;
634 const char* read_strategy = nullptr;
635 const char* socket_type = nullptr;
636 size_t i;
637 const test_strategy* strategy = nullptr;
638 int error = 0;
639
640 gpr_cmdline* cmdline =
641 gpr_cmdline_create("low_level_ping_pong network benchmarking tool");
642
643 gpr_cmdline_add_int(cmdline, "msg_size", "Size of sent messages", &msg_size);
644 gpr_cmdline_add_string(cmdline, "read_strategy", read_strategy_usage,
645 &read_strategy);
646 gpr_cmdline_add_string(cmdline, "socket_type", socket_type_usage,
647 &socket_type);
648
649 gpr_cmdline_parse(cmdline, argc, argv);
650
651 if (msg_size == -1) {
652 msg_size = 50;
653 }
654
655 if (read_strategy == nullptr) {
656 gpr_log(GPR_INFO, "No strategy specified, running all benchmarks");
657 return run_all_benchmarks(static_cast<size_t>(msg_size));
658 }
659
660 if (socket_type == nullptr) {
661 socket_type = "tcp";
662 }
663 if (msg_size <= 0) {
664 fprintf(stderr, "msg_size must be > 0\n");
665 fflush(stderr);
666 print_usage(argv[0]);
667 return -1;
668 }
669
670 for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
671 if (strcmp(test_strategies[i].name, read_strategy) == 0) {
672 strategy = &test_strategies[i];
673 }
674 }
675 if (strategy == nullptr) {
676 fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
677 fflush(stderr);
678 return -1;
679 }
680
681 client_args->read_bytes = strategy->read_strategy;
682 client_args->write_bytes = blocking_write_bytes;
683 client_args->setup = strategy->setup;
684 client_args->msg_size = static_cast<size_t>(msg_size);
685 client_args->strategy_name = read_strategy;
686 server_args->read_bytes = strategy->read_strategy;
687 server_args->write_bytes = blocking_write_bytes;
688 server_args->setup = strategy->setup;
689 server_args->msg_size = static_cast<size_t>(msg_size);
690 server_args->strategy_name = read_strategy;
691
692 error = run_benchmark(socket_type, client_args, server_args);
693
694 gpr_cmdline_destroy(cmdline);
695 return error;
696 }
697