• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <gtest/gtest.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 #include "test/core/test_util/test_config.h"
23 
24 // This test won't work except with posix sockets enabled
25 #ifdef GRPC_POSIX_SOCKET_EV
26 
27 #include <ctype.h>
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/sync.h>
33 #include <grpc/support/time.h>
34 #include <netinet/in.h>
35 #include <poll.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <sys/socket.h>
40 #include <sys/time.h>
41 #include <unistd.h>
42 
43 #include "absl/log/log.h"
44 #include "absl/strings/str_format.h"
45 #include "src/core/lib/iomgr/ev_posix.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/socket_utils_posix.h"
48 #include "src/core/util/crash.h"
49 #include "src/core/util/strerror.h"
50 
51 static gpr_mu* g_mu;
52 static grpc_pollset* g_pollset;
53 
54 // buffer size used to send and receive data.
55 // 1024 is the minimal value to set TCP send and receive buffer.
56 #define BUF_SIZE 1024
57 
58 // Create a test socket with the right properties for testing.
59 // port is the TCP port to listen or connect to.
60 // Return a socket FD and sockaddr_in.
create_test_socket(int port,int * socket_fd,struct sockaddr_in * sin)61 static void create_test_socket(int port, int* socket_fd,
62                                struct sockaddr_in* sin) {
63   int fd;
64   int one = 1;
65   int buffer_size_bytes = BUF_SIZE;
66   int flags;
67 
68   fd = socket(AF_INET, SOCK_STREAM, 0);
69   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
70   // Reset the size of socket send buffer to the minimal value to facilitate
71   // buffer filling up and triggering notify_on_write
72   ASSERT_EQ(grpc_set_socket_sndbuf(fd, buffer_size_bytes), absl::OkStatus());
73   ASSERT_EQ(grpc_set_socket_rcvbuf(fd, buffer_size_bytes), absl::OkStatus());
74   // Make fd non-blocking
75   flags = fcntl(fd, F_GETFL, 0);
76   ASSERT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
77   *socket_fd = fd;
78 
79   // Use local address for test
80   sin->sin_family = AF_INET;
81   sin->sin_addr.s_addr = htonl(0x7f000001);
82   ASSERT_GE(port, 0);
83   ASSERT_LT(port, 65536);
84   sin->sin_port = htons(static_cast<uint16_t>(port));
85 }
86 
// Placeholder callback: accepts the usual (arg, success) pair and
// deliberately does nothing with either.
void no_op_cb(void* /*arg*/, int /*success*/) {
  return;
}
89 
90 // =======An upload server to test notify_on_read===========
91 // The server simply reads and counts a stream of bytes.
92 
93 // An upload server.
94 typedef struct {
95   grpc_fd* em_fd;            // listening fd
96   ssize_t read_bytes_total;  // total number of received bytes
97   int done;                  // set to 1 when a server finishes serving
98   grpc_closure listen_closure;
99 } server;
100 
server_init(server * sv)101 static void server_init(server* sv) {
102   sv->read_bytes_total = 0;
103   sv->done = 0;
104 }
105 
106 // An upload session.
107 // Created when a new upload request arrives in the server.
108 typedef struct {
109   server* sv;               // not owned by a single session
110   grpc_fd* em_fd;           // fd to read upload bytes
111   char read_buf[BUF_SIZE];  // buffer to store upload bytes
112   grpc_closure session_read_closure;
113 } session;
114 
115 // Called when an upload session can be safely shutdown.
116 // Close session FD and start to shutdown listen FD.
session_shutdown_cb(void * arg,bool)117 static void session_shutdown_cb(void* arg,  // session
118                                 bool /*success*/) {
119   session* se = static_cast<session*>(arg);
120   server* sv = se->sv;
121   grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
122   gpr_free(se);
123   // Start to shutdown listen fd.
124   grpc_fd_shutdown(sv->em_fd, GRPC_ERROR_CREATE("session_shutdown_cb"));
125 }
126 
127 // Called when data become readable in a session.
session_read_cb(void * arg,grpc_error_handle error)128 static void session_read_cb(void* arg,  // session
129                             grpc_error_handle error) {
130   session* se = static_cast<session*>(arg);
131   int fd = grpc_fd_wrapped_fd(se->em_fd);
132 
133   ssize_t read_once = 0;
134   ssize_t read_total = 0;
135 
136   if (!error.ok()) {
137     session_shutdown_cb(arg, true);
138     return;
139   }
140 
141   do {
142     read_once = read(fd, se->read_buf, BUF_SIZE);
143     if (read_once > 0) read_total += read_once;
144   } while (read_once > 0);
145   se->sv->read_bytes_total += read_total;
146 
147   // read() returns 0 to indicate the TCP connection was closed by the client.
148   // read(fd, read_buf, 0) also returns 0 which should never be called as such.
149   // It is possible to read nothing due to spurious edge event or data has
150   // been drained, In such a case, read() returns -1 and set errno to EAGAIN.
151   if (read_once == 0) {
152     session_shutdown_cb(arg, true);
153   } else if (read_once == -1) {
154     if (errno == EAGAIN) {
155       // An edge triggered event is cached in the kernel until next poll.
156       // In the current single thread implementation, session_read_cb is called
157       // in the polling thread, such that polling only happens after this
158       // callback, and will catch read edge event if data is available again
159       // before notify_on_read.
160       // TODO(chenw): in multi-threaded version, callback and polling can be
161       // run in different threads. polling may catch a persist read edge event
162       // before notify_on_read is called.
163       grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
164     } else {
165       LOG(FATAL) << "Unhandled read error " << grpc_core::StrError(errno);
166     }
167   }
168 }
169 
170 // Called when the listen FD can be safely shutdown.
171 // Close listen FD and signal that server can be shutdown.
listen_shutdown_cb(void * arg,int)172 static void listen_shutdown_cb(void* arg /*server*/, int /*success*/) {
173   server* sv = static_cast<server*>(arg);
174 
175   grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
176 
177   gpr_mu_lock(g_mu);
178   sv->done = 1;
179   ASSERT_TRUE(
180       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
181   gpr_mu_unlock(g_mu);
182 }
183 
184 // Called when a new TCP connection request arrives in the listening port.
listen_cb(void * arg,grpc_error_handle error)185 static void listen_cb(void* arg,  //=sv_arg
186                       grpc_error_handle error) {
187   server* sv = static_cast<server*>(arg);
188   int fd;
189   int flags;
190   session* se;
191   struct sockaddr_storage ss;
192   socklen_t slen = sizeof(ss);
193   grpc_fd* listen_em_fd = sv->em_fd;
194 
195   if (!error.ok()) {
196     listen_shutdown_cb(arg, 1);
197     return;
198   }
199 
200   fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
201               reinterpret_cast<struct sockaddr*>(&ss), &slen);
202   ASSERT_GE(fd, 0);
203   ASSERT_LT(fd, FD_SETSIZE);
204   flags = fcntl(fd, F_GETFL, 0);
205   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
206   se = static_cast<session*>(gpr_malloc(sizeof(*se)));
207   se->sv = sv;
208   se->em_fd = grpc_fd_create(fd, "listener", false);
209   grpc_pollset_add_fd(g_pollset, se->em_fd);
210   GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
211                     grpc_schedule_on_exec_ctx);
212   grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
213 
214   grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
215 }
216 
217 // Max number of connections pending to be accepted by listen().
218 #define MAX_NUM_FD 1024
219 
220 // Start a test server, return the TCP listening port bound to listen_fd.
221 // listen_cb() is registered to be interested in reading from listen_fd.
222 // When connection request arrives, listen_cb() is called to accept the
223 // connection request.
server_start(server * sv)224 static int server_start(server* sv) {
225   int port = 0;
226   int fd;
227   struct sockaddr_in sin;
228   socklen_t addr_len;
229 
230   create_test_socket(port, &fd, &sin);
231   addr_len = sizeof(sin);
232   EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
233   EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
234   port = ntohs(sin.sin_port);
235   EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
236 
237   sv->em_fd = grpc_fd_create(fd, "server", false);
238   grpc_pollset_add_fd(g_pollset, sv->em_fd);
239   // Register to be interested in reading from listen_fd.
240   GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
241                     grpc_schedule_on_exec_ctx);
242   grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
243 
244   return port;
245 }
246 
247 // Wait and shutdown a sever.
server_wait_and_shutdown(server * sv)248 static void server_wait_and_shutdown(server* sv) {
249   gpr_mu_lock(g_mu);
250   while (!sv->done) {
251     grpc_core::ExecCtx exec_ctx;
252     grpc_pollset_worker* worker = nullptr;
253     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
254         "pollset_work", grpc_pollset_work(g_pollset, &worker,
255                                           grpc_core::Timestamp::InfFuture())));
256     gpr_mu_unlock(g_mu);
257 
258     gpr_mu_lock(g_mu);
259   }
260   gpr_mu_unlock(g_mu);
261 }
262 
263 // ===An upload client to test notify_on_write===
264 
265 // Client write buffer size
266 #define CLIENT_WRITE_BUF_SIZE 10
267 // Total number of times that the client fills up the write buffer
268 #define CLIENT_TOTAL_WRITE_CNT 3
269 
270 // An upload client.
271 typedef struct {
272   grpc_fd* em_fd;
273   char write_buf[CLIENT_WRITE_BUF_SIZE];
274   ssize_t write_bytes_total;
275   // Number of times that the client fills up the write buffer and calls
276   // notify_on_write to schedule another write.
277   int client_write_cnt;
278 
279   int done;  // set to 1 when a client finishes sending
280   grpc_closure write_closure;
281 } client;
282 
client_init(client * cl)283 static void client_init(client* cl) {
284   memset(cl->write_buf, 0, sizeof(cl->write_buf));
285   cl->write_bytes_total = 0;
286   cl->client_write_cnt = 0;
287   cl->done = 0;
288 }
289 
290 // Called when a client upload session is ready to shutdown.
client_session_shutdown_cb(void * arg,int)291 static void client_session_shutdown_cb(void* arg /*client*/, int /*success*/) {
292   client* cl = static_cast<client*>(arg);
293   grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
294   cl->done = 1;
295   ASSERT_TRUE(
296       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
297 }
298 
299 // Write as much as possible, then register notify_on_write.
client_session_write(void * arg,grpc_error_handle error)300 static void client_session_write(void* arg,  // client
301                                  grpc_error_handle error) {
302   client* cl = static_cast<client*>(arg);
303   int fd = grpc_fd_wrapped_fd(cl->em_fd);
304   ssize_t write_once = 0;
305 
306   if (!error.ok()) {
307     gpr_mu_lock(g_mu);
308     client_session_shutdown_cb(arg, 1);
309     gpr_mu_unlock(g_mu);
310     return;
311   }
312 
313   do {
314     write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
315     if (write_once > 0) cl->write_bytes_total += write_once;
316   } while (write_once > 0);
317 
318   if (errno == EAGAIN) {
319     gpr_mu_lock(g_mu);
320     if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
321       GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
322                         grpc_schedule_on_exec_ctx);
323       grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
324       cl->client_write_cnt++;
325     } else {
326       client_session_shutdown_cb(arg, 1);
327     }
328     gpr_mu_unlock(g_mu);
329   } else {
330     LOG(FATAL) << "unknown errno " << grpc_core::StrError(errno).c_str();
331   }
332 }
333 
334 // Start a client to send a stream of bytes.
client_start(client * cl,int port)335 static void client_start(client* cl, int port) {
336   int fd;
337   struct sockaddr_in sin;
338   create_test_socket(port, &fd, &sin);
339   if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
340       -1) {
341     if (errno == EINPROGRESS) {
342       struct pollfd pfd;
343       pfd.fd = fd;
344       pfd.events = POLLOUT;
345       pfd.revents = 0;
346       if (poll(&pfd, 1, -1) == -1) {
347         LOG(FATAL) << "poll() failed during connect; errno=" << errno;
348       }
349     } else {
350       LOG(FATAL) << "Failed to connect to the server (errno=" << errno << ")";
351     }
352   }
353 
354   cl->em_fd = grpc_fd_create(fd, "client", false);
355   grpc_pollset_add_fd(g_pollset, cl->em_fd);
356 
357   client_session_write(cl, absl::OkStatus());
358 }
359 
360 // Wait for the signal to shutdown a client.
client_wait_and_shutdown(client * cl)361 static void client_wait_and_shutdown(client* cl) {
362   gpr_mu_lock(g_mu);
363   while (!cl->done) {
364     grpc_pollset_worker* worker = nullptr;
365     grpc_core::ExecCtx exec_ctx;
366     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
367         "pollset_work", grpc_pollset_work(g_pollset, &worker,
368                                           grpc_core::Timestamp::InfFuture())));
369     gpr_mu_unlock(g_mu);
370 
371     gpr_mu_lock(g_mu);
372   }
373   gpr_mu_unlock(g_mu);
374 }
375 
376 // Test grpc_fd. Start an upload server and client, upload a stream of
377 // bytes from the client to the server, and verify that the total number of
378 // sent bytes is equal to the total number of received bytes.
test_grpc_fd(void)379 static void test_grpc_fd(void) {
380   server sv;
381   client cl;
382   int port;
383   grpc_core::ExecCtx exec_ctx;
384 
385   server_init(&sv);
386   port = server_start(&sv);
387   client_init(&cl);
388   client_start(&cl, port);
389 
390   client_wait_and_shutdown(&cl);
391   server_wait_and_shutdown(&sv);
392   ASSERT_EQ(sv.read_bytes_total, cl.write_bytes_total);
393   LOG(INFO) << "Total read bytes " << sv.read_bytes_total;
394 }
395 
396 typedef struct fd_change_data {
397   grpc_iomgr_cb_func cb_that_ran;
398 } fd_change_data;
399 
init_change_data(fd_change_data * fdc)400 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
401 
destroy_change_data(fd_change_data *)402 void destroy_change_data(fd_change_data* /*fdc*/) {}
403 
first_read_callback(void * arg,grpc_error_handle)404 static void first_read_callback(void* arg /* fd_change_data */,
405                                 grpc_error_handle /*error*/) {
406   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
407 
408   gpr_mu_lock(g_mu);
409   fdc->cb_that_ran = first_read_callback;
410   ASSERT_TRUE(
411       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
412   gpr_mu_unlock(g_mu);
413 }
414 
second_read_callback(void * arg,grpc_error_handle)415 static void second_read_callback(void* arg /* fd_change_data */,
416                                  grpc_error_handle /*error*/) {
417   fd_change_data* fdc = static_cast<fd_change_data*>(arg);
418 
419   gpr_mu_lock(g_mu);
420   fdc->cb_that_ran = second_read_callback;
421   ASSERT_TRUE(
422       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
423   gpr_mu_unlock(g_mu);
424 }
425 
426 // Test that changing the callback we use for notify_on_read actually works.
427 // Note that we have two different but almost identical callbacks above -- the
428 // point is to have two different function pointers and two different data
429 // pointers and make sure that changing both really works.
test_grpc_fd_change(void)430 static void test_grpc_fd_change(void) {
431   grpc_fd* em_fd;
432   fd_change_data a, b;
433   int flags;
434   int sv[2];
435   char data;
436   ssize_t result;
437   grpc_closure first_closure;
438   grpc_closure second_closure;
439   grpc_core::ExecCtx exec_ctx;
440 
441   GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
442                     grpc_schedule_on_exec_ctx);
443   GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
444                     grpc_schedule_on_exec_ctx);
445 
446   init_change_data(&a);
447   init_change_data(&b);
448 
449   ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
450   flags = fcntl(sv[0], F_GETFL, 0);
451   ASSERT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
452   flags = fcntl(sv[1], F_GETFL, 0);
453   ASSERT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);
454 
455   em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
456   grpc_pollset_add_fd(g_pollset, em_fd);
457 
458   // Register the first callback, then make its FD readable
459   grpc_fd_notify_on_read(em_fd, &first_closure);
460   data = 0;
461   result = write(sv[1], &data, 1);
462   ASSERT_EQ(result, 1);
463 
464   // And now wait for it to run.
465   gpr_mu_lock(g_mu);
466   while (a.cb_that_ran == nullptr) {
467     grpc_pollset_worker* worker = nullptr;
468     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
469         "pollset_work", grpc_pollset_work(g_pollset, &worker,
470                                           grpc_core::Timestamp::InfFuture())));
471     gpr_mu_unlock(g_mu);
472 
473     gpr_mu_lock(g_mu);
474   }
475   ASSERT_EQ(a.cb_that_ran, first_read_callback);
476   gpr_mu_unlock(g_mu);
477 
478   // And drain the socket so we can generate a new read edge
479   result = read(sv[0], &data, 1);
480   ASSERT_EQ(result, 1);
481 
482   // Now register a second callback with distinct change data, and do the same
483   // thing again.
484   grpc_fd_notify_on_read(em_fd, &second_closure);
485   data = 0;
486   result = write(sv[1], &data, 1);
487   ASSERT_EQ(result, 1);
488 
489   gpr_mu_lock(g_mu);
490   while (b.cb_that_ran == nullptr) {
491     grpc_pollset_worker* worker = nullptr;
492     ASSERT_TRUE(GRPC_LOG_IF_ERROR(
493         "pollset_work", grpc_pollset_work(g_pollset, &worker,
494                                           grpc_core::Timestamp::InfFuture())));
495     gpr_mu_unlock(g_mu);
496 
497     gpr_mu_lock(g_mu);
498   }
499   // Except now we verify that second_read_callback ran instead
500   ASSERT_EQ(b.cb_that_ran, second_read_callback);
501   gpr_mu_unlock(g_mu);
502 
503   grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
504 
505   destroy_change_data(&a);
506   destroy_change_data(&b);
507   close(sv[1]);
508 }
509 
destroy_pollset(void * p,grpc_error_handle)510 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
511   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
512 }
513 
// Test driver: between grpc_init() and grpc_shutdown(), allocates and
// initializes the global pollset, runs both fd tests against it, then shuts
// the pollset down, flushes the ExecCtx, and frees the pollset memory.
TEST(FdPosixTest, MainTest) {
  grpc_closure destroyed;
  grpc_init();
  {
    grpc_core::ExecCtx exec_ctx;

    const size_t pollset_bytes = grpc_pollset_size();
    g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(pollset_bytes));
    grpc_pollset_init(g_pollset, &g_mu);

    test_grpc_fd();
    test_grpc_fd_change();

    GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
                      grpc_schedule_on_exec_ctx);
    grpc_pollset_shutdown(g_pollset, &destroyed);
    // Flush before the pollset memory is released below.
    grpc_core::ExecCtx::Get()->Flush();
    gpr_free(g_pollset);
  }
  grpc_shutdown();
}
531 
532 #endif  // GRPC_POSIX_SOCKET_EV
533 
main(int argc,char ** argv)534 int main(int argc, char** argv) {
535   grpc::testing::TestEnvironment env(&argc, argv);
536   ::testing::InitGoogleTest(&argc, argv);
537   return RUN_ALL_TESTS();
538 }
539