• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/port.h"
20 
21 // This test won't work except with posix sockets enabled
22 #ifdef GRPC_POSIX_SOCKET_TCP
23 
24 #include "src/core/lib/iomgr/tcp_posix.h"
25 
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <string.h>
29 #include <sys/socket.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 
33 #include <grpc/grpc.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/time.h>
37 
38 #include "src/core/lib/gpr/useful.h"
39 #include "src/core/lib/iomgr/buffer_list.h"
40 #include "src/core/lib/iomgr/ev_posix.h"
41 #include "src/core/lib/iomgr/sockaddr_posix.h"
42 #include "src/core/lib/slice/slice_internal.h"
43 #include "test/core/iomgr/endpoint_tests.h"
44 #include "test/core/util/test_config.h"
45 
/* Mutex associated with g_pollset; filled in by grpc_pollset_init() in main()
   and locked by the tests around grpc_pollset_work()/grpc_pollset_kick(). */
static gpr_mu* g_mu;
/* Pollset shared by every test in this file; allocated and initialized in
   main(). */
static grpc_pollset* g_pollset;
48 
49 /*
50    General test notes:
51 
52    All tests which write data into a socket write i%256 into byte i, which is
53    verified by readers.
54 
55    In general there are a few interesting things to vary which may lead to
56    exercising different codepaths in an implementation:
57    1. Total amount of data written to the socket
58    2. Size of slice allocations
59    3. Amount of data we read from or write to the socket at once
60 
61    The tests here tend to parameterize these where applicable.
62 
63  */
64 
create_sockets(int sv[2])65 static void create_sockets(int sv[2]) {
66   int flags;
67   GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
68   flags = fcntl(sv[0], F_GETFL, 0);
69   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
70   flags = fcntl(sv[1], F_GETFL, 0);
71   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
72 }
73 
create_inet_sockets(int sv[2])74 static void create_inet_sockets(int sv[2]) {
75   /* Prepare listening socket */
76   struct sockaddr_in addr;
77   memset(&addr, 0, sizeof(struct sockaddr_in));
78   addr.sin_family = AF_INET;
79   int sock = socket(AF_INET, SOCK_STREAM, 0);
80   GPR_ASSERT(sock);
81   GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
82   listen(sock, 1);
83 
84   /* Prepare client socket and connect to server */
85   socklen_t len = sizeof(sockaddr_in);
86   GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
87 
88   int client = socket(AF_INET, SOCK_STREAM, 0);
89   GPR_ASSERT(client);
90   int ret;
91   do {
92     ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
93   } while (ret == -1 && errno == EINTR);
94 
95   /* Accept client connection */
96   len = sizeof(socklen_t);
97   int server;
98   do {
99     server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
100   } while (server == -1 && errno == EINTR);
101   GPR_ASSERT(server != -1);
102 
103   sv[0] = server;
104   sv[1] = client;
105   int flags = fcntl(sv[0], F_GETFL, 0);
106   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
107   flags = fcntl(sv[1], F_GETFL, 0);
108   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
109 }
110 
fill_socket(int fd)111 static ssize_t fill_socket(int fd) {
112   ssize_t write_bytes;
113   ssize_t total_bytes = 0;
114   int i;
115   unsigned char buf[256];
116   for (i = 0; i < 256; ++i) {
117     buf[i] = static_cast<uint8_t>(i);
118   }
119   do {
120     write_bytes = write(fd, buf, 256);
121     if (write_bytes > 0) {
122       total_bytes += write_bytes;
123     }
124   } while (write_bytes >= 0 || errno == EINTR);
125   GPR_ASSERT(errno == EAGAIN);
126   return total_bytes;
127 }
128 
fill_socket_partial(int fd,size_t bytes)129 static size_t fill_socket_partial(int fd, size_t bytes) {
130   ssize_t write_bytes;
131   size_t total_bytes = 0;
132   unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(bytes));
133   unsigned i;
134   for (i = 0; i < bytes; ++i) {
135     buf[i] = static_cast<uint8_t>(i % 256);
136   }
137 
138   do {
139     write_bytes = write(fd, buf, bytes - total_bytes);
140     if (write_bytes > 0) {
141       total_bytes += static_cast<size_t>(write_bytes);
142     }
143   } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
144 
145   gpr_free(buf);
146 
147   return total_bytes;
148 }
149 
/* State shared between a read test and its read_cb callback. */
struct read_socket_state {
  /* Endpoint that read_cb re-arms with grpc_endpoint_read(). */
  grpc_endpoint* ep;
  /* Running total of bytes verified so far; updated by read_cb. */
  size_t read_bytes;
  /* Number of bytes the test wrote and therefore expects to read back. */
  size_t target_read_bytes;
  /* Buffer passed to grpc_endpoint_read() to receive each batch of slices. */
  grpc_slice_buffer incoming;
  /* Closure wrapping read_cb with this struct as its argument. */
  grpc_closure read_cb;
};
157 
count_slices(grpc_slice * slices,size_t nslices,int * current_data)158 static size_t count_slices(grpc_slice* slices, size_t nslices,
159                            int* current_data) {
160   size_t num_bytes = 0;
161   unsigned i, j;
162   unsigned char* buf;
163   for (i = 0; i < nslices; ++i) {
164     buf = GRPC_SLICE_START_PTR(slices[i]);
165     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
166       GPR_ASSERT(buf[j] == *current_data);
167       *current_data = (*current_data + 1) % 256;
168     }
169     num_bytes += GRPC_SLICE_LENGTH(slices[i]);
170   }
171   return num_bytes;
172 }
173 
read_cb(void * user_data,grpc_error * error)174 static void read_cb(void* user_data, grpc_error* error) {
175   struct read_socket_state* state =
176       static_cast<struct read_socket_state*>(user_data);
177   size_t read_bytes;
178   int current_data;
179 
180   GPR_ASSERT(error == GRPC_ERROR_NONE);
181 
182   gpr_mu_lock(g_mu);
183   current_data = state->read_bytes % 256;
184   read_bytes = count_slices(state->incoming.slices, state->incoming.count,
185                             &current_data);
186   state->read_bytes += read_bytes;
187   gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
188           state->target_read_bytes);
189   if (state->read_bytes >= state->target_read_bytes) {
190     GPR_ASSERT(
191         GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
192     gpr_mu_unlock(g_mu);
193   } else {
194     gpr_mu_unlock(g_mu);
195     grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb,
196                        /*urgent=*/false);
197   }
198 }
199 
200 /* Write to a socket, then read from it using the grpc_tcp API. */
read_test(size_t num_bytes,size_t slice_size)201 static void read_test(size_t num_bytes, size_t slice_size) {
202   int sv[2];
203   grpc_endpoint* ep;
204   struct read_socket_state state;
205   size_t written_bytes;
206   grpc_millis deadline =
207       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
208   grpc_core::ExecCtx exec_ctx;
209 
210   gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
211           num_bytes, slice_size);
212 
213   create_sockets(sv);
214 
215   grpc_arg a[1];
216   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
217   a[0].type = GRPC_ARG_INTEGER,
218   a[0].value.integer = static_cast<int>(slice_size);
219   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
220   ep =
221       grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
222   grpc_endpoint_add_to_pollset(ep, g_pollset);
223 
224   written_bytes = fill_socket_partial(sv[0], num_bytes);
225   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
226 
227   state.ep = ep;
228   state.read_bytes = 0;
229   state.target_read_bytes = written_bytes;
230   grpc_slice_buffer_init(&state.incoming);
231   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
232 
233   grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);
234 
235   gpr_mu_lock(g_mu);
236   while (state.read_bytes < state.target_read_bytes) {
237     grpc_pollset_worker* worker = nullptr;
238     GPR_ASSERT(GRPC_LOG_IF_ERROR(
239         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
240     gpr_mu_unlock(g_mu);
241 
242     gpr_mu_lock(g_mu);
243   }
244   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
245   gpr_mu_unlock(g_mu);
246 
247   grpc_slice_buffer_destroy_internal(&state.incoming);
248   grpc_endpoint_destroy(ep);
249 }
250 
251 /* Write to a socket until it fills up, then read from it using the grpc_tcp
252    API. */
large_read_test(size_t slice_size)253 static void large_read_test(size_t slice_size) {
254   int sv[2];
255   grpc_endpoint* ep;
256   struct read_socket_state state;
257   ssize_t written_bytes;
258   grpc_millis deadline =
259       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
260   grpc_core::ExecCtx exec_ctx;
261 
262   gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);
263 
264   create_sockets(sv);
265 
266   grpc_arg a[1];
267   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
268   a[0].type = GRPC_ARG_INTEGER;
269   a[0].value.integer = static_cast<int>(slice_size);
270   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
271   ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
272                        "test");
273   grpc_endpoint_add_to_pollset(ep, g_pollset);
274 
275   written_bytes = fill_socket(sv[0]);
276   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
277 
278   state.ep = ep;
279   state.read_bytes = 0;
280   state.target_read_bytes = static_cast<size_t>(written_bytes);
281   grpc_slice_buffer_init(&state.incoming);
282   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
283 
284   grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);
285 
286   gpr_mu_lock(g_mu);
287   while (state.read_bytes < state.target_read_bytes) {
288     grpc_pollset_worker* worker = nullptr;
289     GPR_ASSERT(GRPC_LOG_IF_ERROR(
290         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
291     gpr_mu_unlock(g_mu);
292 
293     gpr_mu_lock(g_mu);
294   }
295   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
296   gpr_mu_unlock(g_mu);
297 
298   grpc_slice_buffer_destroy_internal(&state.incoming);
299   grpc_endpoint_destroy(ep);
300 }
301 
/* State shared between write_test and its write_done callback. */
struct write_socket_state {
  /* Endpoint under test. */
  grpc_endpoint* ep;
  /* Set to 1 by write_done once the endpoint write has completed. */
  int write_done;
};
306 
allocate_blocks(size_t num_bytes,size_t slice_size,size_t * num_blocks,uint8_t * current_data)307 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
308                                    size_t* num_blocks, uint8_t* current_data) {
309   size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
310   grpc_slice* slices =
311       static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
312   size_t num_bytes_left = num_bytes;
313   unsigned i, j;
314   unsigned char* buf;
315   *num_blocks = nslices;
316 
317   for (i = 0; i < nslices; ++i) {
318     slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
319                                                               : slice_size);
320     num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
321     buf = GRPC_SLICE_START_PTR(slices[i]);
322     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
323       buf[j] = *current_data;
324       (*current_data)++;
325     }
326   }
327   GPR_ASSERT(num_bytes_left == 0);
328   return slices;
329 }
330 
write_done(void * user_data,grpc_error * error)331 static void write_done(void* user_data /* write_socket_state */,
332                        grpc_error* error) {
333   GPR_ASSERT(error == GRPC_ERROR_NONE);
334   struct write_socket_state* state =
335       static_cast<struct write_socket_state*>(user_data);
336   gpr_mu_lock(g_mu);
337   state->write_done = 1;
338   GPR_ASSERT(
339       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
340   gpr_mu_unlock(g_mu);
341 }
342 
drain_socket_blocking(int fd,size_t num_bytes,size_t read_size)343 void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
344   unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(read_size));
345   ssize_t bytes_read;
346   size_t bytes_left = num_bytes;
347   int flags;
348   int current = 0;
349   int i;
350   grpc_core::ExecCtx exec_ctx;
351 
352   flags = fcntl(fd, F_GETFL, 0);
353   GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
354 
355   for (;;) {
356     grpc_pollset_worker* worker = nullptr;
357     gpr_mu_lock(g_mu);
358     GPR_ASSERT(GRPC_LOG_IF_ERROR(
359         "pollset_work",
360         grpc_pollset_work(g_pollset, &worker,
361                           grpc_timespec_to_millis_round_up(
362                               grpc_timeout_milliseconds_to_deadline(10)))));
363     gpr_mu_unlock(g_mu);
364 
365     do {
366       bytes_read =
367           read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
368     } while (bytes_read < 0 && errno == EINTR);
369     GPR_ASSERT(bytes_read >= 0);
370     for (i = 0; i < bytes_read; ++i) {
371       GPR_ASSERT(buf[i] == current);
372       current = (current + 1) % 256;
373     }
374     bytes_left -= static_cast<size_t>(bytes_read);
375     if (bytes_left == 0) break;
376   }
377   flags = fcntl(fd, F_GETFL, 0);
378   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
379 
380   gpr_free(buf);
381 }
382 
383 /* Verifier for timestamps callback for write_test */
timestamps_verifier(void * arg,grpc_core::Timestamps * ts,grpc_error * error)384 void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
385                          grpc_error* error) {
386   GPR_ASSERT(error == GRPC_ERROR_NONE);
387   GPR_ASSERT(arg != nullptr);
388   GPR_ASSERT(ts->sendmsg_time.time.clock_type == GPR_CLOCK_REALTIME);
389   GPR_ASSERT(ts->scheduled_time.time.clock_type == GPR_CLOCK_REALTIME);
390   GPR_ASSERT(ts->acked_time.time.clock_type == GPR_CLOCK_REALTIME);
391   gpr_atm* done_timestamps = (gpr_atm*)arg;
392   gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
393 }
394 
395 /* Write to a socket using the grpc_tcp API, then drain it directly.
396    Note that if the write does not complete immediately we need to drain the
397    socket in parallel with the read. If collect_timestamps is true, it will
398    try to get timestamps for the write. */
write_test(size_t num_bytes,size_t slice_size,bool collect_timestamps)399 static void write_test(size_t num_bytes, size_t slice_size,
400                        bool collect_timestamps) {
401   int sv[2];
402   grpc_endpoint* ep;
403   struct write_socket_state state;
404   size_t num_blocks;
405   grpc_slice* slices;
406   uint8_t current_data = 0;
407   grpc_slice_buffer outgoing;
408   grpc_closure write_done_closure;
409   grpc_millis deadline =
410       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
411   grpc_core::ExecCtx exec_ctx;
412 
413   if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
414     return;
415   }
416 
417   gpr_log(GPR_INFO,
418           "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
419           num_bytes, slice_size);
420 
421   if (collect_timestamps) {
422     create_inet_sockets(sv);
423   } else {
424     create_sockets(sv);
425   }
426 
427   grpc_arg a[1];
428   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
429   a[0].type = GRPC_ARG_INTEGER,
430   a[0].value.integer = static_cast<int>(slice_size);
431   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
432   ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
433                        &args, "test");
434   grpc_endpoint_add_to_pollset(ep, g_pollset);
435 
436   state.ep = ep;
437   state.write_done = 0;
438 
439   slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
440 
441   grpc_slice_buffer_init(&outgoing);
442   grpc_slice_buffer_addn(&outgoing, slices, num_blocks);
443   GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
444                     grpc_schedule_on_exec_ctx);
445 
446   gpr_atm done_timestamps;
447   gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
448   grpc_endpoint_write(ep, &outgoing, &write_done_closure,
449                       grpc_event_engine_can_track_errors() && collect_timestamps
450                           ? (void*)&done_timestamps
451                           : nullptr);
452   drain_socket_blocking(sv[0], num_bytes, num_bytes);
453   exec_ctx.Flush();
454   gpr_mu_lock(g_mu);
455   for (;;) {
456     grpc_pollset_worker* worker = nullptr;
457     if (state.write_done &&
458         (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
459          gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
460       break;
461     }
462     GPR_ASSERT(GRPC_LOG_IF_ERROR(
463         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
464     gpr_mu_unlock(g_mu);
465     exec_ctx.Flush();
466     gpr_mu_lock(g_mu);
467   }
468   gpr_mu_unlock(g_mu);
469 
470   grpc_slice_buffer_destroy_internal(&outgoing);
471   grpc_endpoint_destroy(ep);
472   gpr_free(slices);
473 }
474 
on_fd_released(void * arg,grpc_error *)475 void on_fd_released(void* arg, grpc_error* /*errors*/) {
476   int* done = static_cast<int*>(arg);
477   *done = 1;
478   GPR_ASSERT(
479       GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
480 }
481 
482 /* Do a read_test, then release fd and try to read/write again. Verify that
483    grpc_tcp_fd() is available before the fd is released. */
release_fd_test(size_t num_bytes,size_t slice_size)484 static void release_fd_test(size_t num_bytes, size_t slice_size) {
485   int sv[2];
486   grpc_endpoint* ep;
487   struct read_socket_state state;
488   size_t written_bytes;
489   int fd;
490   grpc_millis deadline =
491       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
492   grpc_core::ExecCtx exec_ctx;
493   grpc_closure fd_released_cb;
494   int fd_released_done = 0;
495   GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
496                     grpc_schedule_on_exec_ctx);
497 
498   gpr_log(GPR_INFO,
499           "Release fd read_test of size %" PRIuPTR ", slice size %" PRIuPTR,
500           num_bytes, slice_size);
501 
502   create_sockets(sv);
503 
504   grpc_arg a[1];
505   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
506   a[0].type = GRPC_ARG_INTEGER;
507   a[0].value.integer = static_cast<int>(slice_size);
508   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
509   ep =
510       grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
511   GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
512   grpc_endpoint_add_to_pollset(ep, g_pollset);
513 
514   written_bytes = fill_socket_partial(sv[0], num_bytes);
515   gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
516 
517   state.ep = ep;
518   state.read_bytes = 0;
519   state.target_read_bytes = written_bytes;
520   grpc_slice_buffer_init(&state.incoming);
521   GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
522 
523   grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);
524 
525   gpr_mu_lock(g_mu);
526   while (state.read_bytes < state.target_read_bytes) {
527     grpc_pollset_worker* worker = nullptr;
528     GPR_ASSERT(GRPC_LOG_IF_ERROR(
529         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
530     gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
531             state.read_bytes, state.target_read_bytes);
532     gpr_mu_unlock(g_mu);
533     grpc_core::ExecCtx::Get()->Flush();
534     gpr_mu_lock(g_mu);
535   }
536   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
537   gpr_mu_unlock(g_mu);
538 
539   grpc_slice_buffer_destroy_internal(&state.incoming);
540   grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
541   grpc_core::ExecCtx::Get()->Flush();
542   gpr_mu_lock(g_mu);
543   while (!fd_released_done) {
544     grpc_pollset_worker* worker = nullptr;
545     GPR_ASSERT(GRPC_LOG_IF_ERROR(
546         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
547     gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
548   }
549   gpr_mu_unlock(g_mu);
550   GPR_ASSERT(fd_released_done == 1);
551   GPR_ASSERT(fd == sv[1]);
552 
553   written_bytes = fill_socket_partial(sv[0], num_bytes);
554   drain_socket_blocking(fd, written_bytes, written_bytes);
555   written_bytes = fill_socket_partial(fd, num_bytes);
556   drain_socket_blocking(sv[0], written_bytes, written_bytes);
557   close(fd);
558 }
559 
run_tests(void)560 void run_tests(void) {
561   size_t i = 0;
562 
563   read_test(100, 8192);
564   read_test(10000, 8192);
565   read_test(10000, 137);
566   read_test(10000, 1);
567   large_read_test(8192);
568   large_read_test(1);
569 
570   write_test(100, 8192, false);
571   write_test(100, 1, false);
572   write_test(100000, 8192, false);
573   write_test(100000, 1, false);
574   write_test(100000, 137, false);
575 
576   write_test(100, 8192, true);
577   write_test(100, 1, true);
578   write_test(100000, 8192, true);
579   write_test(100000, 1, true);
580   write_test(100, 137, true);
581 
582   for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
583     write_test(40320, i, false);
584     write_test(40320, i, true);
585   }
586 
587   release_fd_test(100, 8192);
588 }
589 
/* Clean-up hook referenced by the endpoint test config; this fixture has
   nothing to tear down, so it is intentionally empty. */
static void clean_up(void) {
}
591 
create_fixture_tcp_socketpair(size_t slice_size)592 static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
593     size_t slice_size) {
594   int sv[2];
595   grpc_endpoint_test_fixture f;
596   grpc_core::ExecCtx exec_ctx;
597 
598   create_sockets(sv);
599   grpc_resource_quota* resource_quota =
600       grpc_resource_quota_create("tcp_posix_test_socketpair");
601   grpc_arg a[1];
602   a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
603   a[0].type = GRPC_ARG_INTEGER;
604   a[0].value.integer = static_cast<int>(slice_size);
605   grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
606   f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
607                                 &args, "test");
608   f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
609                                 &args, "test");
610   grpc_resource_quota_unref_internal(resource_quota);
611   grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
612   grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
613 
614   return f;
615 }
616 
/* Endpoint test configurations; main() passes configs[0] to
   grpc_endpoint_tests(). The single entry names the config and supplies the
   socketpair fixture factory and its clean-up hook. */
static grpc_endpoint_test_config configs[] = {
    {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
};
620 
destroy_pollset(void * p,grpc_error *)621 static void destroy_pollset(void* p, grpc_error* /*error*/) {
622   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
623 }
624 
main(int argc,char ** argv)625 int main(int argc, char** argv) {
626   grpc_closure destroyed;
627   grpc::testing::TestEnvironment env(argc, argv);
628   grpc_init();
629   grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
630   {
631     grpc_core::ExecCtx exec_ctx;
632     g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
633     grpc_pollset_init(g_pollset, &g_mu);
634     grpc_endpoint_tests(configs[0], g_pollset, g_mu);
635     run_tests();
636     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
637                       grpc_schedule_on_exec_ctx);
638     grpc_pollset_shutdown(g_pollset, &destroyed);
639 
640     grpc_core::ExecCtx::Get()->Flush();
641   }
642   grpc_shutdown();
643   gpr_free(g_pollset);
644 
645   return 0;
646 }
647 
648 #else /* GRPC_POSIX_SOCKET_TCP */
649 
/* Fallback entry point used when GRPC_POSIX_SOCKET_TCP is not defined: no
   tests are run and the process exits with status 1. */
int main(int argc, char** argv) {
  return 1;
}
651 
652 #endif /* GRPC_POSIX_SOCKET_TCP */
653