• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20    Basic I/O ping-pong benchmarks.
21 
22    The goal here is to establish lower bounds on how fast the stack could get by
23    measuring the cost of using various I/O strategies to do a basic
24    request-response loop.
25  */
26 
27 #include <errno.h>
28 #include <netinet/ip.h>
29 #include <poll.h>
30 #include <stdio.h>
31 #include <string.h>
32 #ifdef __linux__
33 #include <sys/epoll.h>
34 #endif
35 #include <sys/socket.h>
36 
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpc/support/time.h>
40 
41 #include "src/core/lib/gpr/useful.h"
42 #include "src/core/lib/gprpp/thd.h"
43 #include "src/core/lib/iomgr/error.h"
44 #include "src/core/lib/iomgr/socket_utils_posix.h"
45 #include "test/core/util/cmdline.h"
46 #include "test/core/util/histogram.h"
47 
48 typedef struct fd_pair {
49   int read_fd;
50   int write_fd;
51 } fd_pair;
52 
53 typedef struct thread_args {
54   fd_pair fds;
55   size_t msg_size;
56   int (*read_bytes)(struct thread_args* args, char* buf);
57   int (*write_bytes)(struct thread_args* args, char* buf);
58   int (*setup)(struct thread_args* args);
59   int epoll_fd;
60   const char* strategy_name;
61 } thread_args;
62 
63 /*
64    Read strategies
65 
66    There are a number of read strategies, each of which has a blocking and
67    non-blocking version.
68  */
69 
70 /* Basic call to read() */
read_bytes(int fd,char * buf,size_t read_size,int spin)71 static int read_bytes(int fd, char* buf, size_t read_size, int spin) {
72   size_t bytes_read = 0;
73   ssize_t err;
74   do {
75     err = read(fd, buf + bytes_read, read_size - bytes_read);
76     if (err < 0) {
77       if (errno == EINTR) {
78         continue;
79       } else {
80         if (errno == EAGAIN && spin == 1) {
81           continue;
82         }
83         gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
84         return -1;
85       }
86     } else {
87       bytes_read += static_cast<size_t>(err);
88     }
89   } while (bytes_read < read_size);
90   return 0;
91 }
92 
blocking_read_bytes(thread_args * args,char * buf)93 static int blocking_read_bytes(thread_args* args, char* buf) {
94   return read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
95 }
96 
spin_read_bytes(thread_args * args,char * buf)97 static int spin_read_bytes(thread_args* args, char* buf) {
98   return read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
99 }
100 
101 /* Call poll() to monitor a non-blocking fd */
poll_read_bytes(int fd,char * buf,size_t read_size,int spin)102 static int poll_read_bytes(int fd, char* buf, size_t read_size, int spin) {
103   struct pollfd pfd;
104   size_t bytes_read = 0;
105   int err;
106   ssize_t err2;
107 
108   pfd.fd = fd;
109   pfd.events = POLLIN;
110   do {
111     err = poll(&pfd, 1, spin ? 0 : -1);
112     if (err < 0) {
113       if (errno == EINTR) {
114         continue;
115       } else {
116         gpr_log(GPR_ERROR, "Poll failed: %s", strerror(errno));
117         return -1;
118       }
119     }
120     if (err == 0 && spin) continue;
121     GPR_ASSERT(err == 1);
122     GPR_ASSERT(pfd.revents == POLLIN);
123     do {
124       err2 = read(fd, buf + bytes_read, read_size - bytes_read);
125     } while (err2 < 0 && errno == EINTR);
126     if (err2 < 0 && errno != EAGAIN) {
127       gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
128       return -1;
129     }
130     bytes_read += static_cast<size_t>(err2);
131   } while (bytes_read < read_size);
132   return 0;
133 }
134 
poll_read_bytes_blocking(struct thread_args * args,char * buf)135 static int poll_read_bytes_blocking(struct thread_args* args, char* buf) {
136   return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
137 }
138 
poll_read_bytes_spin(struct thread_args * args,char * buf)139 static int poll_read_bytes_spin(struct thread_args* args, char* buf) {
140   return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
141 }
142 
143 #ifdef __linux__
144 /* Call epoll_wait() to monitor a non-blocking fd */
epoll_read_bytes(struct thread_args * args,char * buf,int spin)145 static int epoll_read_bytes(struct thread_args* args, char* buf, int spin) {
146   struct epoll_event ev;
147   size_t bytes_read = 0;
148   int err;
149   ssize_t err2;
150   size_t read_size = args->msg_size;
151 
152   do {
153     err = epoll_wait(args->epoll_fd, &ev, 1, spin ? 0 : -1);
154     if (err < 0) {
155       if (errno == EINTR) continue;
156       gpr_log(GPR_ERROR, "epoll_wait failed: %s", strerror(errno));
157       return -1;
158     }
159     if (err == 0 && spin) continue;
160     GPR_ASSERT(err == 1);
161     GPR_ASSERT(ev.events & EPOLLIN);
162     GPR_ASSERT(ev.data.fd == args->fds.read_fd);
163     do {
164       do {
165         err2 =
166             read(args->fds.read_fd, buf + bytes_read, read_size - bytes_read);
167       } while (err2 < 0 && errno == EINTR);
168       if (errno == EAGAIN) break;
169       bytes_read += static_cast<size_t>(err2);
170       /* TODO(klempner): This should really be doing an extra call after we are
171          done to ensure we see an EAGAIN */
172     } while (bytes_read < read_size);
173   } while (bytes_read < read_size);
174   GPR_ASSERT(bytes_read == read_size);
175   return 0;
176 }
177 
epoll_read_bytes_blocking(struct thread_args * args,char * buf)178 static int epoll_read_bytes_blocking(struct thread_args* args, char* buf) {
179   return epoll_read_bytes(args, buf, 0);
180 }
181 
epoll_read_bytes_spin(struct thread_args * args,char * buf)182 static int epoll_read_bytes_spin(struct thread_args* args, char* buf) {
183   return epoll_read_bytes(args, buf, 1);
184 }
185 #endif /* __linux__ */
186 
187 /* Write out bytes.
188    At this point we only have one strategy, since in the common case these
189    writes go directly out to the kernel.
190  */
blocking_write_bytes(struct thread_args * args,char * buf)191 static int blocking_write_bytes(struct thread_args* args, char* buf) {
192   size_t bytes_written = 0;
193   ssize_t err;
194   size_t write_size = args->msg_size;
195   do {
196     err = write(args->fds.write_fd, buf + bytes_written,
197                 write_size - bytes_written);
198     if (err < 0) {
199       if (errno == EINTR) {
200         continue;
201       } else {
202         gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
203         return -1;
204       }
205     } else {
206       bytes_written += static_cast<size_t>(err);
207     }
208   } while (bytes_written < write_size);
209   return 0;
210 }
211 
212 /*
213    Initialization code
214 
215    These are called at the beginning of the client and server thread, depending
216    on the scenario we're using.
217  */
set_socket_nonblocking(thread_args * args)218 static int set_socket_nonblocking(thread_args* args) {
219   if (!GRPC_LOG_IF_ERROR("Unable to set read socket nonblocking",
220                          grpc_set_socket_nonblocking(args->fds.read_fd, 1))) {
221     return -1;
222   }
223   if (!GRPC_LOG_IF_ERROR("Unable to set write socket nonblocking",
224                          grpc_set_socket_nonblocking(args->fds.write_fd, 1))) {
225     return -1;
226   }
227   return 0;
228 }
229 
do_nothing(thread_args * args)230 static int do_nothing(thread_args* args) { return 0; }
231 
232 #ifdef __linux__
233 /* Special case for epoll, where we need to create the fd ahead of time. */
epoll_setup(thread_args * args)234 static int epoll_setup(thread_args* args) {
235   int epoll_fd;
236   struct epoll_event ev;
237   set_socket_nonblocking(args);
238   epoll_fd = epoll_create(1);
239   if (epoll_fd < 0) {
240     gpr_log(GPR_ERROR, "epoll_create: %s", strerror(errno));
241     return -1;
242   }
243 
244   args->epoll_fd = epoll_fd;
245 
246   ev.events = EPOLLIN | EPOLLET;
247   ev.data.fd = args->fds.read_fd;
248   if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->fds.read_fd, &ev) < 0) {
249     gpr_log(GPR_ERROR, "epoll_ctl: %s", strerror(errno));
250   }
251   return 0;
252 }
253 #endif
254 
server_thread(thread_args * args)255 static void server_thread(thread_args* args) {
256   char* buf = static_cast<char*>(gpr_malloc(args->msg_size));
257   if (args->setup(args) < 0) {
258     gpr_log(GPR_ERROR, "Setup failed");
259   }
260   for (;;) {
261     if (args->read_bytes(args, buf) < 0) {
262       gpr_log(GPR_ERROR, "Server read failed");
263       gpr_free(buf);
264       return;
265     }
266     if (args->write_bytes(args, buf) < 0) {
267       gpr_log(GPR_ERROR, "Server write failed");
268       gpr_free(buf);
269       return;
270     }
271   }
272 }
273 
server_thread_wrap(void * arg)274 static void server_thread_wrap(void* arg) {
275   thread_args* args = static_cast<thread_args*>(arg);
276   server_thread(args);
277 }
278 
print_histogram(grpc_histogram * histogram)279 static void print_histogram(grpc_histogram* histogram) {
280   /* TODO(klempner): Print more detailed information, such as detailed histogram
281      buckets */
282   gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f",
283           grpc_histogram_percentile(histogram, 50),
284           grpc_histogram_percentile(histogram, 95),
285           grpc_histogram_percentile(histogram, 99),
286           grpc_histogram_percentile(histogram, 99.9));
287 }
288 
now(void)289 static double now(void) {
290   gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
291   return 1e9 * static_cast<double>(tv.tv_sec) + static_cast<double>(tv.tv_nsec);
292 }
293 
client_thread(thread_args * args)294 static void client_thread(thread_args* args) {
295   char* buf = static_cast<char*>(gpr_malloc(args->msg_size * sizeof(char)));
296   memset(buf, 0, args->msg_size * sizeof(char));
297   grpc_histogram* histogram = grpc_histogram_create(0.01, 60e9);
298   double start_time;
299   double end_time;
300   double interval;
301   const int kNumIters = 100000;
302   int i;
303 
304   if (args->setup(args) < 0) {
305     gpr_log(GPR_ERROR, "Setup failed");
306   }
307   for (i = 0; i < kNumIters; ++i) {
308     start_time = now();
309     if (args->write_bytes(args, buf) < 0) {
310       gpr_log(GPR_ERROR, "Client write failed");
311       goto error;
312     }
313     if (args->read_bytes(args, buf) < 0) {
314       gpr_log(GPR_ERROR, "Client read failed");
315       goto error;
316     }
317     end_time = now();
318     if (i > kNumIters / 2) {
319       interval = end_time - start_time;
320       grpc_histogram_add(histogram, interval);
321     }
322   }
323   print_histogram(histogram);
324 error:
325   gpr_free(buf);
326   grpc_histogram_destroy(histogram);
327 }
328 
329 /* This roughly matches tcp_server's create_listening_socket */
create_listening_socket(struct sockaddr * port,socklen_t len)330 static int create_listening_socket(struct sockaddr* port, socklen_t len) {
331   int fd = socket(port->sa_family, SOCK_STREAM, 0);
332   if (fd < 0) {
333     gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
334     goto error;
335   }
336 
337   if (!GRPC_LOG_IF_ERROR("Failed to set listening socket cloexec",
338                          grpc_set_socket_cloexec(fd, 1))) {
339     goto error;
340   }
341   if (!GRPC_LOG_IF_ERROR("Failed to set listening socket low latency",
342                          grpc_set_socket_low_latency(fd, 1))) {
343     goto error;
344   }
345   if (!GRPC_LOG_IF_ERROR("Failed to set listening socket reuse addr",
346                          grpc_set_socket_reuse_addr(fd, 1))) {
347     goto error;
348   }
349 
350   if (bind(fd, port, len) < 0) {
351     gpr_log(GPR_ERROR, "bind: %s", strerror(errno));
352     goto error;
353   }
354 
355   if (listen(fd, 1) < 0) {
356     gpr_log(GPR_ERROR, "listen: %s", strerror(errno));
357     goto error;
358   }
359 
360   if (getsockname(fd, port, &len) < 0) {
361     gpr_log(GPR_ERROR, "getsockname: %s", strerror(errno));
362     goto error;
363   }
364 
365   return fd;
366 
367 error:
368   if (fd >= 0) {
369     close(fd);
370   }
371   return -1;
372 }
373 
connect_client(struct sockaddr * addr,socklen_t len)374 static int connect_client(struct sockaddr* addr, socklen_t len) {
375   int fd = socket(addr->sa_family, SOCK_STREAM, 0);
376   int err;
377   if (fd < 0) {
378     gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
379     goto error;
380   }
381 
382   if (!GRPC_LOG_IF_ERROR("Failed to set connecting socket cloexec",
383                          grpc_set_socket_cloexec(fd, 1))) {
384     goto error;
385   }
386   if (!GRPC_LOG_IF_ERROR("Failed to set connecting socket low latency",
387                          grpc_set_socket_low_latency(fd, 1))) {
388     goto error;
389   }
390 
391   do {
392     err = connect(fd, addr, len);
393   } while (err < 0 && errno == EINTR);
394 
395   if (err < 0) {
396     gpr_log(GPR_ERROR, "connect error: %s", strerror(errno));
397     goto error;
398   }
399   return fd;
400 
401 error:
402   if (fd >= 0) {
403     close(fd);
404   }
405   return -1;
406 }
407 
accept_server(int listen_fd)408 static int accept_server(int listen_fd) {
409   int fd = accept(listen_fd, nullptr, nullptr);
410   if (fd < 0) {
411     gpr_log(GPR_ERROR, "Accept failed: %s", strerror(errno));
412     return -1;
413   }
414   return fd;
415 }
416 
create_sockets_tcp(fd_pair * client_fds,fd_pair * server_fds)417 static int create_sockets_tcp(fd_pair* client_fds, fd_pair* server_fds) {
418   int listen_fd = -1;
419   int client_fd = -1;
420   int server_fd = -1;
421 
422   struct sockaddr_in port;
423   struct sockaddr* sa_port = reinterpret_cast<struct sockaddr*>(&port);
424 
425   port.sin_family = AF_INET;
426   port.sin_port = 0;
427   port.sin_addr.s_addr = INADDR_ANY;
428 
429   listen_fd = create_listening_socket(sa_port, sizeof(port));
430   if (listen_fd == -1) {
431     gpr_log(GPR_ERROR, "Listen failed");
432     goto error;
433   }
434 
435   client_fd = connect_client(sa_port, sizeof(port));
436   if (client_fd == -1) {
437     gpr_log(GPR_ERROR, "Connect failed");
438     goto error;
439   }
440 
441   server_fd = accept_server(listen_fd);
442   if (server_fd == -1) {
443     gpr_log(GPR_ERROR, "Accept failed");
444     goto error;
445   }
446 
447   client_fds->read_fd = client_fd;
448   client_fds->write_fd = client_fd;
449   server_fds->read_fd = server_fd;
450   server_fds->write_fd = server_fd;
451   close(listen_fd);
452   return 0;
453 
454 error:
455   if (listen_fd != -1) {
456     close(listen_fd);
457   }
458   if (client_fd != -1) {
459     close(client_fd);
460   }
461   if (server_fd != -1) {
462     close(server_fd);
463   }
464   return -1;
465 }
466 
create_sockets_socketpair(fd_pair * client_fds,fd_pair * server_fds)467 static int create_sockets_socketpair(fd_pair* client_fds, fd_pair* server_fds) {
468   int fds[2];
469   if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
470     gpr_log(GPR_ERROR, "socketpair: %s", strerror(errno));
471     return -1;
472   }
473 
474   client_fds->read_fd = fds[0];
475   client_fds->write_fd = fds[0];
476   server_fds->read_fd = fds[1];
477   server_fds->write_fd = fds[1];
478   return 0;
479 }
480 
create_sockets_pipe(fd_pair * client_fds,fd_pair * server_fds)481 static int create_sockets_pipe(fd_pair* client_fds, fd_pair* server_fds) {
482   int cfds[2];
483   int sfds[2];
484   if (pipe(cfds) < 0) {
485     gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
486     return -1;
487   }
488 
489   if (pipe(sfds) < 0) {
490     gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
491     return -1;
492   }
493 
494   client_fds->read_fd = cfds[0];
495   client_fds->write_fd = cfds[1];
496   server_fds->read_fd = sfds[0];
497   server_fds->write_fd = sfds[1];
498   return 0;
499 }
500 
501 static const char* read_strategy_usage =
502     "Strategy for doing reads, which is one of:\n"
503     "  blocking: blocking read calls\n"
504     "  same_thread_poll: poll() call on same thread \n"
505 #ifdef __linux__
506     "  same_thread_epoll: epoll_wait() on same thread \n"
507 #endif
508     "  spin_read: spinning non-blocking read() calls \n"
509     "  spin_poll: spinning 0 timeout poll() calls \n"
510 #ifdef __linux__
511     "  spin_epoll: spinning 0 timeout epoll_wait() calls \n"
512 #endif
513     "";
514 
515 static const char* socket_type_usage =
516     "Type of socket used, one of:\n"
517     "  tcp: fds are endpoints of a TCP connection\n"
518     "  socketpair: fds come from socketpair()\n"
519     "  pipe: fds come from pipe()\n";
520 
print_usage(char * argv0)521 void print_usage(char* argv0) {
522   fprintf(stderr, "%s usage:\n\n", argv0);
523   fprintf(stderr, "%s read_strategy socket_type msg_size\n\n", argv0);
524   fprintf(stderr, "where read_strategy is one of:\n");
525   fprintf(stderr, "  blocking: blocking read calls\n");
526   fprintf(stderr, "  same_thread_poll: poll() call on same thread \n");
527 #ifdef __linux__
528   fprintf(stderr, "  same_thread_epoll: epoll_wait() on same thread \n");
529 #endif
530   fprintf(stderr, "  spin_read: spinning non-blocking read() calls \n");
531   fprintf(stderr, "  spin_poll: spinning 0 timeout poll() calls \n");
532 #ifdef __linux__
533   fprintf(stderr, "  spin_epoll: spinning 0 timeout epoll_wait() calls \n");
534 #endif
535   fprintf(stderr, "and socket_type is one of:\n");
536   fprintf(stderr, "  tcp: fds are endpoints of a TCP connection\n");
537   fprintf(stderr, "  socketpair: fds come from socketpair()\n");
538   fprintf(stderr, "  pipe: fds come from pipe()\n");
539   fflush(stderr);
540 }
541 
542 typedef struct test_strategy {
543   const char* name;
544   int (*read_strategy)(struct thread_args* args, char* buf);
545   int (*setup)(struct thread_args* args);
546 } test_strategy;
547 
548 static test_strategy test_strategies[] = {
549     {"blocking", blocking_read_bytes, do_nothing},
550     {"same_thread_poll", poll_read_bytes_blocking, set_socket_nonblocking},
551 #ifdef __linux__
552     {"same_thread_epoll", epoll_read_bytes_blocking, epoll_setup},
553     {"spin_epoll", epoll_read_bytes_spin, epoll_setup},
554 #endif /* __linux__ */
555     {"spin_read", spin_read_bytes, set_socket_nonblocking},
556     {"spin_poll", poll_read_bytes_spin, set_socket_nonblocking}};
557 
558 static const char* socket_types[] = {"tcp", "socketpair", "pipe"};
559 
create_socket(const char * socket_type,fd_pair * client_fds,fd_pair * server_fds)560 int create_socket(const char* socket_type, fd_pair* client_fds,
561                   fd_pair* server_fds) {
562   if (strcmp(socket_type, "tcp") == 0) {
563     create_sockets_tcp(client_fds, server_fds);
564   } else if (strcmp(socket_type, "socketpair") == 0) {
565     create_sockets_socketpair(client_fds, server_fds);
566   } else if (strcmp(socket_type, "pipe") == 0) {
567     create_sockets_pipe(client_fds, server_fds);
568   } else {
569     fprintf(stderr, "Invalid socket type %s\n", socket_type);
570     fflush(stderr);
571     return -1;
572   }
573   return 0;
574 }
575 
run_benchmark(const char * socket_type,thread_args * client_args,thread_args * server_args)576 static int run_benchmark(const char* socket_type, thread_args* client_args,
577                          thread_args* server_args) {
578   int rv = 0;
579 
580   rv = create_socket(socket_type, &client_args->fds, &server_args->fds);
581   if (rv < 0) {
582     return rv;
583   }
584 
585   gpr_log(GPR_INFO, "Starting test %s %s %zu", client_args->strategy_name,
586           socket_type, client_args->msg_size);
587 
588   grpc_core::Thread server("server_thread", server_thread_wrap, server_args);
589   server.Start();
590   client_thread(client_args);
591   server.Join();
592 
593   return 0;
594 }
595 
run_all_benchmarks(size_t msg_size)596 static int run_all_benchmarks(size_t msg_size) {
597   int error = 0;
598   size_t i;
599   for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
600     test_strategy* strategy = &test_strategies[i];
601     size_t j;
602     for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) {
603       thread_args* client_args =
604           static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
605       thread_args* server_args =
606           static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
607       const char* socket_type = socket_types[j];
608 
609       client_args->read_bytes = strategy->read_strategy;
610       client_args->write_bytes = blocking_write_bytes;
611       client_args->setup = strategy->setup;
612       client_args->msg_size = msg_size;
613       client_args->strategy_name = strategy->name;
614       server_args->read_bytes = strategy->read_strategy;
615       server_args->write_bytes = blocking_write_bytes;
616       server_args->setup = strategy->setup;
617       server_args->msg_size = msg_size;
618       server_args->strategy_name = strategy->name;
619       error = run_benchmark(socket_type, client_args, server_args);
620       if (error < 0) {
621         return error;
622       }
623     }
624   }
625   return error;
626 }
627 
main(int argc,char ** argv)628 int main(int argc, char** argv) {
629   thread_args* client_args =
630       static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
631   thread_args* server_args =
632       static_cast<thread_args*>(gpr_malloc(sizeof(thread_args)));
633   int msg_size = -1;
634   const char* read_strategy = nullptr;
635   const char* socket_type = nullptr;
636   size_t i;
637   const test_strategy* strategy = nullptr;
638   int error = 0;
639 
640   gpr_cmdline* cmdline =
641       gpr_cmdline_create("low_level_ping_pong network benchmarking tool");
642 
643   gpr_cmdline_add_int(cmdline, "msg_size", "Size of sent messages", &msg_size);
644   gpr_cmdline_add_string(cmdline, "read_strategy", read_strategy_usage,
645                          &read_strategy);
646   gpr_cmdline_add_string(cmdline, "socket_type", socket_type_usage,
647                          &socket_type);
648 
649   gpr_cmdline_parse(cmdline, argc, argv);
650 
651   if (msg_size == -1) {
652     msg_size = 50;
653   }
654 
655   if (read_strategy == nullptr) {
656     gpr_log(GPR_INFO, "No strategy specified, running all benchmarks");
657     return run_all_benchmarks(static_cast<size_t>(msg_size));
658   }
659 
660   if (socket_type == nullptr) {
661     socket_type = "tcp";
662   }
663   if (msg_size <= 0) {
664     fprintf(stderr, "msg_size must be > 0\n");
665     fflush(stderr);
666     print_usage(argv[0]);
667     return -1;
668   }
669 
670   for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
671     if (strcmp(test_strategies[i].name, read_strategy) == 0) {
672       strategy = &test_strategies[i];
673     }
674   }
675   if (strategy == nullptr) {
676     fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
677     fflush(stderr);
678     return -1;
679   }
680 
681   client_args->read_bytes = strategy->read_strategy;
682   client_args->write_bytes = blocking_write_bytes;
683   client_args->setup = strategy->setup;
684   client_args->msg_size = static_cast<size_t>(msg_size);
685   client_args->strategy_name = read_strategy;
686   server_args->read_bytes = strategy->read_strategy;
687   server_args->write_bytes = blocking_write_bytes;
688   server_args->setup = strategy->setup;
689   server_args->msg_size = static_cast<size_t>(msg_size);
690   server_args->strategy_name = read_strategy;
691 
692   error = run_benchmark(socket_type, client_args, server_args);
693 
694   gpr_cmdline_destroy(cmdline);
695   return error;
696 }
697