1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <gtest/gtest.h>
20
21 #include "src/core/lib/iomgr/port.h"
22 #include "test/core/test_util/test_config.h"
23
24 // This test won't work except with posix sockets enabled
25 #ifdef GRPC_POSIX_SOCKET_EV
26
27 #include <ctype.h>
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <grpc/grpc.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/sync.h>
33 #include <grpc/support/time.h>
34 #include <netinet/in.h>
35 #include <poll.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <sys/socket.h>
40 #include <sys/time.h>
41 #include <unistd.h>
42
43 #include "absl/log/log.h"
44 #include "absl/strings/str_format.h"
45 #include "src/core/lib/iomgr/ev_posix.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/socket_utils_posix.h"
48 #include "src/core/util/crash.h"
49 #include "src/core/util/strerror.h"
50
51 static gpr_mu* g_mu;
52 static grpc_pollset* g_pollset;
53
54 // buffer size used to send and receive data.
55 // 1024 is the minimal value to set TCP send and receive buffer.
56 #define BUF_SIZE 1024
57
58 // Create a test socket with the right properties for testing.
59 // port is the TCP port to listen or connect to.
60 // Return a socket FD and sockaddr_in.
create_test_socket(int port,int * socket_fd,struct sockaddr_in * sin)61 static void create_test_socket(int port, int* socket_fd,
62 struct sockaddr_in* sin) {
63 int fd;
64 int one = 1;
65 int buffer_size_bytes = BUF_SIZE;
66 int flags;
67
68 fd = socket(AF_INET, SOCK_STREAM, 0);
69 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
70 // Reset the size of socket send buffer to the minimal value to facilitate
71 // buffer filling up and triggering notify_on_write
72 ASSERT_EQ(grpc_set_socket_sndbuf(fd, buffer_size_bytes), absl::OkStatus());
73 ASSERT_EQ(grpc_set_socket_rcvbuf(fd, buffer_size_bytes), absl::OkStatus());
74 // Make fd non-blocking
75 flags = fcntl(fd, F_GETFL, 0);
76 ASSERT_EQ(fcntl(fd, F_SETFL, flags | O_NONBLOCK), 0);
77 *socket_fd = fd;
78
79 // Use local address for test
80 sin->sin_family = AF_INET;
81 sin->sin_addr.s_addr = htonl(0x7f000001);
82 ASSERT_GE(port, 0);
83 ASSERT_LT(port, 65536);
84 sin->sin_port = htons(static_cast<uint16_t>(port));
85 }
86
87 // Phony gRPC callback
no_op_cb(void *,int)88 void no_op_cb(void* /*arg*/, int /*success*/) {}
89
90 // =======An upload server to test notify_on_read===========
91 // The server simply reads and counts a stream of bytes.
92
93 // An upload server.
94 typedef struct {
95 grpc_fd* em_fd; // listening fd
96 ssize_t read_bytes_total; // total number of received bytes
97 int done; // set to 1 when a server finishes serving
98 grpc_closure listen_closure;
99 } server;
100
server_init(server * sv)101 static void server_init(server* sv) {
102 sv->read_bytes_total = 0;
103 sv->done = 0;
104 }
105
106 // An upload session.
107 // Created when a new upload request arrives in the server.
108 typedef struct {
109 server* sv; // not owned by a single session
110 grpc_fd* em_fd; // fd to read upload bytes
111 char read_buf[BUF_SIZE]; // buffer to store upload bytes
112 grpc_closure session_read_closure;
113 } session;
114
115 // Called when an upload session can be safely shutdown.
116 // Close session FD and start to shutdown listen FD.
session_shutdown_cb(void * arg,bool)117 static void session_shutdown_cb(void* arg, // session
118 bool /*success*/) {
119 session* se = static_cast<session*>(arg);
120 server* sv = se->sv;
121 grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
122 gpr_free(se);
123 // Start to shutdown listen fd.
124 grpc_fd_shutdown(sv->em_fd, GRPC_ERROR_CREATE("session_shutdown_cb"));
125 }
126
127 // Called when data become readable in a session.
session_read_cb(void * arg,grpc_error_handle error)128 static void session_read_cb(void* arg, // session
129 grpc_error_handle error) {
130 session* se = static_cast<session*>(arg);
131 int fd = grpc_fd_wrapped_fd(se->em_fd);
132
133 ssize_t read_once = 0;
134 ssize_t read_total = 0;
135
136 if (!error.ok()) {
137 session_shutdown_cb(arg, true);
138 return;
139 }
140
141 do {
142 read_once = read(fd, se->read_buf, BUF_SIZE);
143 if (read_once > 0) read_total += read_once;
144 } while (read_once > 0);
145 se->sv->read_bytes_total += read_total;
146
147 // read() returns 0 to indicate the TCP connection was closed by the client.
148 // read(fd, read_buf, 0) also returns 0 which should never be called as such.
149 // It is possible to read nothing due to spurious edge event or data has
150 // been drained, In such a case, read() returns -1 and set errno to EAGAIN.
151 if (read_once == 0) {
152 session_shutdown_cb(arg, true);
153 } else if (read_once == -1) {
154 if (errno == EAGAIN) {
155 // An edge triggered event is cached in the kernel until next poll.
156 // In the current single thread implementation, session_read_cb is called
157 // in the polling thread, such that polling only happens after this
158 // callback, and will catch read edge event if data is available again
159 // before notify_on_read.
160 // TODO(chenw): in multi-threaded version, callback and polling can be
161 // run in different threads. polling may catch a persist read edge event
162 // before notify_on_read is called.
163 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
164 } else {
165 LOG(FATAL) << "Unhandled read error " << grpc_core::StrError(errno);
166 }
167 }
168 }
169
170 // Called when the listen FD can be safely shutdown.
171 // Close listen FD and signal that server can be shutdown.
listen_shutdown_cb(void * arg,int)172 static void listen_shutdown_cb(void* arg /*server*/, int /*success*/) {
173 server* sv = static_cast<server*>(arg);
174
175 grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
176
177 gpr_mu_lock(g_mu);
178 sv->done = 1;
179 ASSERT_TRUE(
180 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
181 gpr_mu_unlock(g_mu);
182 }
183
184 // Called when a new TCP connection request arrives in the listening port.
listen_cb(void * arg,grpc_error_handle error)185 static void listen_cb(void* arg, //=sv_arg
186 grpc_error_handle error) {
187 server* sv = static_cast<server*>(arg);
188 int fd;
189 int flags;
190 session* se;
191 struct sockaddr_storage ss;
192 socklen_t slen = sizeof(ss);
193 grpc_fd* listen_em_fd = sv->em_fd;
194
195 if (!error.ok()) {
196 listen_shutdown_cb(arg, 1);
197 return;
198 }
199
200 fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
201 reinterpret_cast<struct sockaddr*>(&ss), &slen);
202 ASSERT_GE(fd, 0);
203 ASSERT_LT(fd, FD_SETSIZE);
204 flags = fcntl(fd, F_GETFL, 0);
205 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
206 se = static_cast<session*>(gpr_malloc(sizeof(*se)));
207 se->sv = sv;
208 se->em_fd = grpc_fd_create(fd, "listener", false);
209 grpc_pollset_add_fd(g_pollset, se->em_fd);
210 GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
211 grpc_schedule_on_exec_ctx);
212 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
213
214 grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
215 }
216
217 // Max number of connections pending to be accepted by listen().
218 #define MAX_NUM_FD 1024
219
220 // Start a test server, return the TCP listening port bound to listen_fd.
221 // listen_cb() is registered to be interested in reading from listen_fd.
222 // When connection request arrives, listen_cb() is called to accept the
223 // connection request.
server_start(server * sv)224 static int server_start(server* sv) {
225 int port = 0;
226 int fd;
227 struct sockaddr_in sin;
228 socklen_t addr_len;
229
230 create_test_socket(port, &fd, &sin);
231 addr_len = sizeof(sin);
232 EXPECT_EQ(bind(fd, (struct sockaddr*)&sin, addr_len), 0);
233 EXPECT_EQ(getsockname(fd, (struct sockaddr*)&sin, &addr_len), 0);
234 port = ntohs(sin.sin_port);
235 EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
236
237 sv->em_fd = grpc_fd_create(fd, "server", false);
238 grpc_pollset_add_fd(g_pollset, sv->em_fd);
239 // Register to be interested in reading from listen_fd.
240 GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
241 grpc_schedule_on_exec_ctx);
242 grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
243
244 return port;
245 }
246
247 // Wait and shutdown a sever.
server_wait_and_shutdown(server * sv)248 static void server_wait_and_shutdown(server* sv) {
249 gpr_mu_lock(g_mu);
250 while (!sv->done) {
251 grpc_core::ExecCtx exec_ctx;
252 grpc_pollset_worker* worker = nullptr;
253 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
254 "pollset_work", grpc_pollset_work(g_pollset, &worker,
255 grpc_core::Timestamp::InfFuture())));
256 gpr_mu_unlock(g_mu);
257
258 gpr_mu_lock(g_mu);
259 }
260 gpr_mu_unlock(g_mu);
261 }
262
263 // ===An upload client to test notify_on_write===
264
265 // Client write buffer size
266 #define CLIENT_WRITE_BUF_SIZE 10
267 // Total number of times that the client fills up the write buffer
268 #define CLIENT_TOTAL_WRITE_CNT 3
269
270 // An upload client.
271 typedef struct {
272 grpc_fd* em_fd;
273 char write_buf[CLIENT_WRITE_BUF_SIZE];
274 ssize_t write_bytes_total;
275 // Number of times that the client fills up the write buffer and calls
276 // notify_on_write to schedule another write.
277 int client_write_cnt;
278
279 int done; // set to 1 when a client finishes sending
280 grpc_closure write_closure;
281 } client;
282
client_init(client * cl)283 static void client_init(client* cl) {
284 memset(cl->write_buf, 0, sizeof(cl->write_buf));
285 cl->write_bytes_total = 0;
286 cl->client_write_cnt = 0;
287 cl->done = 0;
288 }
289
290 // Called when a client upload session is ready to shutdown.
client_session_shutdown_cb(void * arg,int)291 static void client_session_shutdown_cb(void* arg /*client*/, int /*success*/) {
292 client* cl = static_cast<client*>(arg);
293 grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
294 cl->done = 1;
295 ASSERT_TRUE(
296 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
297 }
298
299 // Write as much as possible, then register notify_on_write.
client_session_write(void * arg,grpc_error_handle error)300 static void client_session_write(void* arg, // client
301 grpc_error_handle error) {
302 client* cl = static_cast<client*>(arg);
303 int fd = grpc_fd_wrapped_fd(cl->em_fd);
304 ssize_t write_once = 0;
305
306 if (!error.ok()) {
307 gpr_mu_lock(g_mu);
308 client_session_shutdown_cb(arg, 1);
309 gpr_mu_unlock(g_mu);
310 return;
311 }
312
313 do {
314 write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
315 if (write_once > 0) cl->write_bytes_total += write_once;
316 } while (write_once > 0);
317
318 if (errno == EAGAIN) {
319 gpr_mu_lock(g_mu);
320 if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
321 GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
322 grpc_schedule_on_exec_ctx);
323 grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
324 cl->client_write_cnt++;
325 } else {
326 client_session_shutdown_cb(arg, 1);
327 }
328 gpr_mu_unlock(g_mu);
329 } else {
330 LOG(FATAL) << "unknown errno " << grpc_core::StrError(errno).c_str();
331 }
332 }
333
334 // Start a client to send a stream of bytes.
client_start(client * cl,int port)335 static void client_start(client* cl, int port) {
336 int fd;
337 struct sockaddr_in sin;
338 create_test_socket(port, &fd, &sin);
339 if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
340 -1) {
341 if (errno == EINPROGRESS) {
342 struct pollfd pfd;
343 pfd.fd = fd;
344 pfd.events = POLLOUT;
345 pfd.revents = 0;
346 if (poll(&pfd, 1, -1) == -1) {
347 LOG(FATAL) << "poll() failed during connect; errno=" << errno;
348 }
349 } else {
350 LOG(FATAL) << "Failed to connect to the server (errno=" << errno << ")";
351 }
352 }
353
354 cl->em_fd = grpc_fd_create(fd, "client", false);
355 grpc_pollset_add_fd(g_pollset, cl->em_fd);
356
357 client_session_write(cl, absl::OkStatus());
358 }
359
360 // Wait for the signal to shutdown a client.
client_wait_and_shutdown(client * cl)361 static void client_wait_and_shutdown(client* cl) {
362 gpr_mu_lock(g_mu);
363 while (!cl->done) {
364 grpc_pollset_worker* worker = nullptr;
365 grpc_core::ExecCtx exec_ctx;
366 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
367 "pollset_work", grpc_pollset_work(g_pollset, &worker,
368 grpc_core::Timestamp::InfFuture())));
369 gpr_mu_unlock(g_mu);
370
371 gpr_mu_lock(g_mu);
372 }
373 gpr_mu_unlock(g_mu);
374 }
375
376 // Test grpc_fd. Start an upload server and client, upload a stream of
377 // bytes from the client to the server, and verify that the total number of
378 // sent bytes is equal to the total number of received bytes.
test_grpc_fd(void)379 static void test_grpc_fd(void) {
380 server sv;
381 client cl;
382 int port;
383 grpc_core::ExecCtx exec_ctx;
384
385 server_init(&sv);
386 port = server_start(&sv);
387 client_init(&cl);
388 client_start(&cl, port);
389
390 client_wait_and_shutdown(&cl);
391 server_wait_and_shutdown(&sv);
392 ASSERT_EQ(sv.read_bytes_total, cl.write_bytes_total);
393 LOG(INFO) << "Total read bytes " << sv.read_bytes_total;
394 }
395
396 typedef struct fd_change_data {
397 grpc_iomgr_cb_func cb_that_ran;
398 } fd_change_data;
399
init_change_data(fd_change_data * fdc)400 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
401
destroy_change_data(fd_change_data *)402 void destroy_change_data(fd_change_data* /*fdc*/) {}
403
first_read_callback(void * arg,grpc_error_handle)404 static void first_read_callback(void* arg /* fd_change_data */,
405 grpc_error_handle /*error*/) {
406 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
407
408 gpr_mu_lock(g_mu);
409 fdc->cb_that_ran = first_read_callback;
410 ASSERT_TRUE(
411 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
412 gpr_mu_unlock(g_mu);
413 }
414
second_read_callback(void * arg,grpc_error_handle)415 static void second_read_callback(void* arg /* fd_change_data */,
416 grpc_error_handle /*error*/) {
417 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
418
419 gpr_mu_lock(g_mu);
420 fdc->cb_that_ran = second_read_callback;
421 ASSERT_TRUE(
422 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
423 gpr_mu_unlock(g_mu);
424 }
425
426 // Test that changing the callback we use for notify_on_read actually works.
427 // Note that we have two different but almost identical callbacks above -- the
428 // point is to have two different function pointers and two different data
429 // pointers and make sure that changing both really works.
test_grpc_fd_change(void)430 static void test_grpc_fd_change(void) {
431 grpc_fd* em_fd;
432 fd_change_data a, b;
433 int flags;
434 int sv[2];
435 char data;
436 ssize_t result;
437 grpc_closure first_closure;
438 grpc_closure second_closure;
439 grpc_core::ExecCtx exec_ctx;
440
441 GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
442 grpc_schedule_on_exec_ctx);
443 GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
444 grpc_schedule_on_exec_ctx);
445
446 init_change_data(&a);
447 init_change_data(&b);
448
449 ASSERT_EQ(socketpair(AF_UNIX, SOCK_STREAM, 0, sv), 0);
450 flags = fcntl(sv[0], F_GETFL, 0);
451 ASSERT_EQ(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK), 0);
452 flags = fcntl(sv[1], F_GETFL, 0);
453 ASSERT_EQ(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK), 0);
454
455 em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
456 grpc_pollset_add_fd(g_pollset, em_fd);
457
458 // Register the first callback, then make its FD readable
459 grpc_fd_notify_on_read(em_fd, &first_closure);
460 data = 0;
461 result = write(sv[1], &data, 1);
462 ASSERT_EQ(result, 1);
463
464 // And now wait for it to run.
465 gpr_mu_lock(g_mu);
466 while (a.cb_that_ran == nullptr) {
467 grpc_pollset_worker* worker = nullptr;
468 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
469 "pollset_work", grpc_pollset_work(g_pollset, &worker,
470 grpc_core::Timestamp::InfFuture())));
471 gpr_mu_unlock(g_mu);
472
473 gpr_mu_lock(g_mu);
474 }
475 ASSERT_EQ(a.cb_that_ran, first_read_callback);
476 gpr_mu_unlock(g_mu);
477
478 // And drain the socket so we can generate a new read edge
479 result = read(sv[0], &data, 1);
480 ASSERT_EQ(result, 1);
481
482 // Now register a second callback with distinct change data, and do the same
483 // thing again.
484 grpc_fd_notify_on_read(em_fd, &second_closure);
485 data = 0;
486 result = write(sv[1], &data, 1);
487 ASSERT_EQ(result, 1);
488
489 gpr_mu_lock(g_mu);
490 while (b.cb_that_ran == nullptr) {
491 grpc_pollset_worker* worker = nullptr;
492 ASSERT_TRUE(GRPC_LOG_IF_ERROR(
493 "pollset_work", grpc_pollset_work(g_pollset, &worker,
494 grpc_core::Timestamp::InfFuture())));
495 gpr_mu_unlock(g_mu);
496
497 gpr_mu_lock(g_mu);
498 }
499 // Except now we verify that second_read_callback ran instead
500 ASSERT_EQ(b.cb_that_ran, second_read_callback);
501 gpr_mu_unlock(g_mu);
502
503 grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
504
505 destroy_change_data(&a);
506 destroy_change_data(&b);
507 close(sv[1]);
508 }
509
destroy_pollset(void * p,grpc_error_handle)510 static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
511 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
512 }
513
TEST(FdPosixTest,MainTest)514 TEST(FdPosixTest, MainTest) {
515 grpc_closure destroyed;
516 grpc_init();
517 {
518 grpc_core::ExecCtx exec_ctx;
519 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
520 grpc_pollset_init(g_pollset, &g_mu);
521 test_grpc_fd();
522 test_grpc_fd_change();
523 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
524 grpc_schedule_on_exec_ctx);
525 grpc_pollset_shutdown(g_pollset, &destroyed);
526 grpc_core::ExecCtx::Get()->Flush();
527 gpr_free(g_pollset);
528 }
529 grpc_shutdown();
530 }
531
532 #endif // GRPC_POSIX_SOCKET_EV
533
main(int argc,char ** argv)534 int main(int argc, char** argv) {
535 grpc::testing::TestEnvironment env(&argc, argv);
536 ::testing::InitGoogleTest(&argc, argv);
537 return RUN_ALL_TESTS();
538 }
539