1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/port.h"
20
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET
23
24 #include "src/core/lib/iomgr/ev_posix.h"
25
26 #include <ctype.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <netinet/in.h>
30 #include <poll.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <sys/socket.h>
35 #include <sys/time.h>
36 #include <unistd.h>
37
38 #include <grpc/grpc.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/sync.h>
42 #include <grpc/support/time.h>
43
44 #include "src/core/lib/iomgr/ev_posix.h"
45 #include "src/core/lib/iomgr/iomgr.h"
46 #include "src/core/lib/iomgr/socket_utils_posix.h"
47 #include "test/core/util/test_config.h"
48
49 static gpr_mu* g_mu;
50 static grpc_pollset* g_pollset;
51
/* Size in bytes of a session's read buffer, and the size requested for the
   kernel send and receive buffers of every test socket. It is kept small so
   that the socket buffers fill up quickly. */
#define BUF_SIZE 1024
55
56 /* Create a test socket with the right properties for testing.
57 port is the TCP port to listen or connect to.
58 Return a socket FD and sockaddr_in. */
create_test_socket(int port,int * socket_fd,struct sockaddr_in * sin)59 static void create_test_socket(int port, int* socket_fd,
60 struct sockaddr_in* sin) {
61 int fd;
62 int one = 1;
63 int buffer_size_bytes = BUF_SIZE;
64 int flags;
65
66 fd = socket(AF_INET, SOCK_STREAM, 0);
67 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
68 /* Reset the size of socket send buffer to the minimal value to facilitate
69 buffer filling up and triggering notify_on_write */
70 GPR_ASSERT(grpc_set_socket_sndbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
71 GPR_ASSERT(grpc_set_socket_rcvbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
72 /* Make fd non-blocking */
73 flags = fcntl(fd, F_GETFL, 0);
74 GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
75 *socket_fd = fd;
76
77 /* Use local address for test */
78 sin->sin_family = AF_INET;
79 sin->sin_addr.s_addr = htonl(0x7f000001);
80 GPR_ASSERT(port >= 0 && port < 65536);
81 sin->sin_port = htons(static_cast<uint16_t>(port));
82 }
83
/* Placeholder callback: accepts the usual (arg, success) pair and
   deliberately does nothing with either. */
void no_op_cb(void* arg, int success) {
  (void)arg;
  (void)success;
}
86
87 /* =======An upload server to test notify_on_read===========
88 The server simply reads and counts a stream of bytes. */
89
90 /* An upload server. */
91 typedef struct {
92 grpc_fd* em_fd; /* listening fd */
93 ssize_t read_bytes_total; /* total number of received bytes */
94 int done; /* set to 1 when a server finishes serving */
95 grpc_closure listen_closure;
96 } server;
97
server_init(server * sv)98 static void server_init(server* sv) {
99 sv->read_bytes_total = 0;
100 sv->done = 0;
101 }
102
103 /* An upload session.
104 Created when a new upload request arrives in the server. */
105 typedef struct {
106 server* sv; /* not owned by a single session */
107 grpc_fd* em_fd; /* fd to read upload bytes */
108 char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
109 grpc_closure session_read_closure;
110 } session;
111
112 /* Called when an upload session can be safely shutdown.
113 Close session FD and start to shutdown listen FD. */
session_shutdown_cb(void * arg,bool success)114 static void session_shutdown_cb(void* arg, /*session */
115 bool success) {
116 session* se = static_cast<session*>(arg);
117 server* sv = se->sv;
118 grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a");
119 gpr_free(se);
120 /* Start to shutdown listen fd. */
121 grpc_fd_shutdown(sv->em_fd,
122 GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb"));
123 }
124
125 /* Called when data become readable in a session. */
session_read_cb(void * arg,grpc_error * error)126 static void session_read_cb(void* arg, /*session */
127 grpc_error* error) {
128 session* se = static_cast<session*>(arg);
129 int fd = grpc_fd_wrapped_fd(se->em_fd);
130
131 ssize_t read_once = 0;
132 ssize_t read_total = 0;
133
134 if (error != GRPC_ERROR_NONE) {
135 session_shutdown_cb(arg, 1);
136 return;
137 }
138
139 do {
140 read_once = read(fd, se->read_buf, BUF_SIZE);
141 if (read_once > 0) read_total += read_once;
142 } while (read_once > 0);
143 se->sv->read_bytes_total += read_total;
144
145 /* read() returns 0 to indicate the TCP connection was closed by the client.
146 read(fd, read_buf, 0) also returns 0 which should never be called as such.
147 It is possible to read nothing due to spurious edge event or data has
148 been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
149 if (read_once == 0) {
150 session_shutdown_cb(arg, 1);
151 } else if (read_once == -1) {
152 if (errno == EAGAIN) {
153 /* An edge triggered event is cached in the kernel until next poll.
154 In the current single thread implementation, session_read_cb is called
155 in the polling thread, such that polling only happens after this
156 callback, and will catch read edge event if data is available again
157 before notify_on_read.
158 TODO(chenw): in multi-threaded version, callback and polling can be
159 run in different threads. polling may catch a persist read edge event
160 before notify_on_read is called. */
161 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
162 } else {
163 gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
164 abort();
165 }
166 }
167 }
168
169 /* Called when the listen FD can be safely shutdown.
170 Close listen FD and signal that server can be shutdown. */
listen_shutdown_cb(void * arg,int success)171 static void listen_shutdown_cb(void* arg /*server */, int success) {
172 server* sv = static_cast<server*>(arg);
173
174 grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b");
175
176 gpr_mu_lock(g_mu);
177 sv->done = 1;
178 GPR_ASSERT(
179 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
180 gpr_mu_unlock(g_mu);
181 }
182
183 /* Called when a new TCP connection request arrives in the listening port. */
listen_cb(void * arg,grpc_error * error)184 static void listen_cb(void* arg, /*=sv_arg*/
185 grpc_error* error) {
186 server* sv = static_cast<server*>(arg);
187 int fd;
188 int flags;
189 session* se;
190 struct sockaddr_storage ss;
191 socklen_t slen = sizeof(ss);
192 grpc_fd* listen_em_fd = sv->em_fd;
193
194 if (error != GRPC_ERROR_NONE) {
195 listen_shutdown_cb(arg, 1);
196 return;
197 }
198
199 fd = accept(grpc_fd_wrapped_fd(listen_em_fd),
200 reinterpret_cast<struct sockaddr*>(&ss), &slen);
201 GPR_ASSERT(fd >= 0);
202 GPR_ASSERT(fd < FD_SETSIZE);
203 flags = fcntl(fd, F_GETFL, 0);
204 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
205 se = static_cast<session*>(gpr_malloc(sizeof(*se)));
206 se->sv = sv;
207 se->em_fd = grpc_fd_create(fd, "listener", false);
208 grpc_pollset_add_fd(g_pollset, se->em_fd);
209 GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
210 grpc_schedule_on_exec_ctx);
211 grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
212
213 grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
214 }
215
/* Backlog passed to listen(): the maximum number of connections that may be
   pending acceptance. */
#define MAX_NUM_FD 1024
218
219 /* Start a test server, return the TCP listening port bound to listen_fd.
220 listen_cb() is registered to be interested in reading from listen_fd.
221 When connection request arrives, listen_cb() is called to accept the
222 connection request. */
server_start(server * sv)223 static int server_start(server* sv) {
224 int port = 0;
225 int fd;
226 struct sockaddr_in sin;
227 socklen_t addr_len;
228
229 create_test_socket(port, &fd, &sin);
230 addr_len = sizeof(sin);
231 GPR_ASSERT(bind(fd, (struct sockaddr*)&sin, addr_len) == 0);
232 GPR_ASSERT(getsockname(fd, (struct sockaddr*)&sin, &addr_len) == 0);
233 port = ntohs(sin.sin_port);
234 GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
235
236 sv->em_fd = grpc_fd_create(fd, "server", false);
237 grpc_pollset_add_fd(g_pollset, sv->em_fd);
238 /* Register to be interested in reading from listen_fd. */
239 GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
240 grpc_schedule_on_exec_ctx);
241 grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
242
243 return port;
244 }
245
246 /* Wait and shutdown a sever. */
server_wait_and_shutdown(server * sv)247 static void server_wait_and_shutdown(server* sv) {
248 gpr_mu_lock(g_mu);
249 while (!sv->done) {
250 grpc_core::ExecCtx exec_ctx;
251 grpc_pollset_worker* worker = nullptr;
252 GPR_ASSERT(GRPC_LOG_IF_ERROR(
253 "pollset_work",
254 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
255 gpr_mu_unlock(g_mu);
256
257 gpr_mu_lock(g_mu);
258 }
259 gpr_mu_unlock(g_mu);
260 }
261
262 /* ===An upload client to test notify_on_write=== */
263
/* Number of bytes the client passes to each write() call. */
#define CLIENT_WRITE_BUF_SIZE 10
/* Number of times the client re-registers for write notification after the
   socket stops accepting data; once reached, the client shuts down. */
#define CLIENT_TOTAL_WRITE_CNT 3
268
269 /* An upload client. */
270 typedef struct {
271 grpc_fd* em_fd;
272 char write_buf[CLIENT_WRITE_BUF_SIZE];
273 ssize_t write_bytes_total;
274 /* Number of times that the client fills up the write buffer and calls
275 notify_on_write to schedule another write. */
276 int client_write_cnt;
277
278 int done; /* set to 1 when a client finishes sending */
279 grpc_closure write_closure;
280 } client;
281
client_init(client * cl)282 static void client_init(client* cl) {
283 memset(cl->write_buf, 0, sizeof(cl->write_buf));
284 cl->write_bytes_total = 0;
285 cl->client_write_cnt = 0;
286 cl->done = 0;
287 }
288
289 /* Called when a client upload session is ready to shutdown. */
client_session_shutdown_cb(void * arg,int success)290 static void client_session_shutdown_cb(void* arg /*client */, int success) {
291 client* cl = static_cast<client*>(arg);
292 grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c");
293 cl->done = 1;
294 GPR_ASSERT(
295 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
296 }
297
298 /* Write as much as possible, then register notify_on_write. */
client_session_write(void * arg,grpc_error * error)299 static void client_session_write(void* arg, /*client */
300 grpc_error* error) {
301 client* cl = static_cast<client*>(arg);
302 int fd = grpc_fd_wrapped_fd(cl->em_fd);
303 ssize_t write_once = 0;
304
305 if (error != GRPC_ERROR_NONE) {
306 gpr_mu_lock(g_mu);
307 client_session_shutdown_cb(arg, 1);
308 gpr_mu_unlock(g_mu);
309 return;
310 }
311
312 do {
313 write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
314 if (write_once > 0) cl->write_bytes_total += write_once;
315 } while (write_once > 0);
316
317 if (errno == EAGAIN) {
318 gpr_mu_lock(g_mu);
319 if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
320 GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl,
321 grpc_schedule_on_exec_ctx);
322 grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
323 cl->client_write_cnt++;
324 } else {
325 client_session_shutdown_cb(arg, 1);
326 }
327 gpr_mu_unlock(g_mu);
328 } else {
329 gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
330 abort();
331 }
332 }
333
334 /* Start a client to send a stream of bytes. */
client_start(client * cl,int port)335 static void client_start(client* cl, int port) {
336 int fd;
337 struct sockaddr_in sin;
338 create_test_socket(port, &fd, &sin);
339 if (connect(fd, reinterpret_cast<struct sockaddr*>(&sin), sizeof(sin)) ==
340 -1) {
341 if (errno == EINPROGRESS) {
342 struct pollfd pfd;
343 pfd.fd = fd;
344 pfd.events = POLLOUT;
345 pfd.revents = 0;
346 if (poll(&pfd, 1, -1) == -1) {
347 gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
348 abort();
349 }
350 } else {
351 gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
352 abort();
353 }
354 }
355
356 cl->em_fd = grpc_fd_create(fd, "client", false);
357 grpc_pollset_add_fd(g_pollset, cl->em_fd);
358
359 client_session_write(cl, GRPC_ERROR_NONE);
360 }
361
362 /* Wait for the signal to shutdown a client. */
client_wait_and_shutdown(client * cl)363 static void client_wait_and_shutdown(client* cl) {
364 gpr_mu_lock(g_mu);
365 while (!cl->done) {
366 grpc_pollset_worker* worker = nullptr;
367 grpc_core::ExecCtx exec_ctx;
368 GPR_ASSERT(GRPC_LOG_IF_ERROR(
369 "pollset_work",
370 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
371 gpr_mu_unlock(g_mu);
372
373 gpr_mu_lock(g_mu);
374 }
375 gpr_mu_unlock(g_mu);
376 }
377
378 /* Test grpc_fd. Start an upload server and client, upload a stream of
379 bytes from the client to the server, and verify that the total number of
380 sent bytes is equal to the total number of received bytes. */
test_grpc_fd(void)381 static void test_grpc_fd(void) {
382 server sv;
383 client cl;
384 int port;
385 grpc_core::ExecCtx exec_ctx;
386
387 server_init(&sv);
388 port = server_start(&sv);
389 client_init(&cl);
390 client_start(&cl, port);
391
392 client_wait_and_shutdown(&cl);
393 server_wait_and_shutdown(&sv);
394 GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
395 gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
396 }
397
398 typedef struct fd_change_data {
399 grpc_iomgr_cb_func cb_that_ran;
400 } fd_change_data;
401
init_change_data(fd_change_data * fdc)402 void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = nullptr; }
403
destroy_change_data(fd_change_data * fdc)404 void destroy_change_data(fd_change_data* fdc) {}
405
first_read_callback(void * arg,grpc_error * error)406 static void first_read_callback(void* arg /* fd_change_data */,
407 grpc_error* error) {
408 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
409
410 gpr_mu_lock(g_mu);
411 fdc->cb_that_ran = first_read_callback;
412 GPR_ASSERT(
413 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
414 gpr_mu_unlock(g_mu);
415 }
416
second_read_callback(void * arg,grpc_error * error)417 static void second_read_callback(void* arg /* fd_change_data */,
418 grpc_error* error) {
419 fd_change_data* fdc = static_cast<fd_change_data*>(arg);
420
421 gpr_mu_lock(g_mu);
422 fdc->cb_that_ran = second_read_callback;
423 GPR_ASSERT(
424 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
425 gpr_mu_unlock(g_mu);
426 }
427
428 /* Test that changing the callback we use for notify_on_read actually works.
429 Note that we have two different but almost identical callbacks above -- the
430 point is to have two different function pointers and two different data
431 pointers and make sure that changing both really works. */
test_grpc_fd_change(void)432 static void test_grpc_fd_change(void) {
433 grpc_fd* em_fd;
434 fd_change_data a, b;
435 int flags;
436 int sv[2];
437 char data;
438 ssize_t result;
439 grpc_closure first_closure;
440 grpc_closure second_closure;
441 grpc_core::ExecCtx exec_ctx;
442
443 GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a,
444 grpc_schedule_on_exec_ctx);
445 GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b,
446 grpc_schedule_on_exec_ctx);
447
448 init_change_data(&a);
449 init_change_data(&b);
450
451 GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
452 flags = fcntl(sv[0], F_GETFL, 0);
453 GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
454 flags = fcntl(sv[1], F_GETFL, 0);
455 GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
456
457 em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
458 grpc_pollset_add_fd(g_pollset, em_fd);
459
460 /* Register the first callback, then make its FD readable */
461 grpc_fd_notify_on_read(em_fd, &first_closure);
462 data = 0;
463 result = write(sv[1], &data, 1);
464 GPR_ASSERT(result == 1);
465
466 /* And now wait for it to run. */
467 gpr_mu_lock(g_mu);
468 while (a.cb_that_ran == nullptr) {
469 grpc_pollset_worker* worker = nullptr;
470 GPR_ASSERT(GRPC_LOG_IF_ERROR(
471 "pollset_work",
472 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
473 gpr_mu_unlock(g_mu);
474
475 gpr_mu_lock(g_mu);
476 }
477 GPR_ASSERT(a.cb_that_ran == first_read_callback);
478 gpr_mu_unlock(g_mu);
479
480 /* And drain the socket so we can generate a new read edge */
481 result = read(sv[0], &data, 1);
482 GPR_ASSERT(result == 1);
483
484 /* Now register a second callback with distinct change data, and do the same
485 thing again. */
486 grpc_fd_notify_on_read(em_fd, &second_closure);
487 data = 0;
488 result = write(sv[1], &data, 1);
489 GPR_ASSERT(result == 1);
490
491 gpr_mu_lock(g_mu);
492 while (b.cb_that_ran == nullptr) {
493 grpc_pollset_worker* worker = nullptr;
494 GPR_ASSERT(GRPC_LOG_IF_ERROR(
495 "pollset_work",
496 grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
497 gpr_mu_unlock(g_mu);
498
499 gpr_mu_lock(g_mu);
500 }
501 /* Except now we verify that second_read_callback ran instead */
502 GPR_ASSERT(b.cb_that_ran == second_read_callback);
503 gpr_mu_unlock(g_mu);
504
505 grpc_fd_orphan(em_fd, nullptr, nullptr, "d");
506
507 destroy_change_data(&a);
508 destroy_change_data(&b);
509 close(sv[1]);
510 }
511
destroy_pollset(void * p,grpc_error * error)512 static void destroy_pollset(void* p, grpc_error* error) {
513 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
514 }
515
main(int argc,char ** argv)516 int main(int argc, char** argv) {
517 grpc_closure destroyed;
518 grpc_test_init(argc, argv);
519 grpc_init();
520 {
521 grpc_core::ExecCtx exec_ctx;
522 g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
523 grpc_pollset_init(g_pollset, &g_mu);
524 test_grpc_fd();
525 test_grpc_fd_change();
526 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
527 grpc_schedule_on_exec_ctx);
528 grpc_pollset_shutdown(g_pollset, &destroyed);
529 grpc_core::ExecCtx::Get()->Flush();
530 gpr_free(g_pollset);
531 }
532 grpc_shutdown();
533 return 0;
534 }
535
536 #else /* GRPC_POSIX_SOCKET */
537
/* Built without GRPC_POSIX_SOCKET: there is nothing to test, so exit with
   status 1. */
int main(int argc, char** argv) {
  (void)argc;
  (void)argv;
  return 1;
}
539
540 #endif /* GRPC_POSIX_SOCKET */
541