/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/lib/iomgr/port.h"

// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_TCP

#include "src/core/lib/iomgr/tcp_posix.h"

#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/iomgr/endpoint_tests.h"
#include "test/core/util/test_config.h"

static gpr_mu* g_mu;
static grpc_pollset* g_pollset;

/*
   General test notes:

   All tests which write data into a socket write i%256 into byte i, which is
   verified by readers.

   In general there are a few interesting things to vary which may lead to
   exercising different codepaths in an implementation:
   1. Total amount of data written to the socket
   2. Size of slice allocations
   3. Amount of data we read from or write to the socket at once

   The tests here tend to parameterize these where applicable.

 */

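/* Create a non-blocking AF_UNIX socketpair; sv[0] and sv[1] are the two
   connected ends. Most tests below run over this pair. */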
static void create_sockets(int sv[2]) {
  int flags;
  GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
  flags = fcntl(sv[0], F_GETFL, 0);
  GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
  flags = fcntl(sv[1], F_GETFL, 0);
  GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}

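/* Create a connected pair of non-blocking AF_INET sockets on the local host:
   sv[0] is the accepted server-side socket, sv[1] the connected client
   socket. Used by tests that need a real TCP connection (e.g. when collecting
   write timestamps). */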
static void create_inet_sockets(int sv[2]) {
  /* Prepare listening socket */
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(struct sockaddr_in));
  addr.sin_family = AF_INET;
  int sock = socket(AF_INET, SOCK_STREAM, 0);
  GPR_ASSERT(sock >= 0);
  GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
  listen(sock, 1);

  /* Prepare client socket and connect to server */
  socklen_t len = sizeof(sockaddr_in);
  GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);

  int client = socket(AF_INET, SOCK_STREAM, 0);
  GPR_ASSERT(client >= 0);
  int ret;
  do {
    ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
  } while (ret == -1 && errno == EINTR);

  /* Accept client connection */
  len = sizeof(socklen_t);
  int server;
  do {
    server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len);
  } while (server == -1 && errno == EINTR);
  GPR_ASSERT(server != -1);

  sv[0] = server;
  sv[1] = client;
  int flags = fcntl(sv[0], F_GETFL, 0);
  GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
  flags = fcntl(sv[1], F_GETFL, 0);
  GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}

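/* Write the repeating 0..255 byte pattern into fd until the socket buffer is
   full (write() fails with EAGAIN); returns the total number of bytes
   written. */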
static ssize_t fill_socket(int fd) {
  ssize_t write_bytes;
  ssize_t total_bytes = 0;
  int i;
  unsigned char buf[256];
  for (i = 0; i < 256; ++i) {
    buf[i] = static_cast<uint8_t>(i);
  }
  do {
    write_bytes = write(fd, buf, 256);
    if (write_bytes > 0) {
      total_bytes += write_bytes;
    }
  } while (write_bytes >= 0 || errno == EINTR);
  GPR_ASSERT(errno == EAGAIN);
  return total_bytes;
}

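/* Write up to `bytes` bytes of the i%256 pattern into fd, stopping early if
   the socket stops accepting data; returns the number of bytes actually
   written. */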
static size_t fill_socket_partial(int fd, size_t bytes) {
  ssize_t write_bytes;
  size_t total_bytes = 0;
  unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(bytes));
  unsigned i;
  for (i = 0; i < bytes; ++i) {
    buf[i] = static_cast<uint8_t>(i % 256);
  }

  do {
    write_bytes = write(fd, buf, bytes - total_bytes);
    if (write_bytes > 0) {
      total_bytes += static_cast<size_t>(write_bytes);
    }
  } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);

  gpr_free(buf);

  return total_bytes;
}

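/* State shared between the read tests and their read callback. */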
struct read_socket_state {
  grpc_endpoint* ep;
  size_t read_bytes;
  size_t target_read_bytes;
  grpc_slice_buffer incoming;
  grpc_closure read_cb;
};

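/* Verify that the slices contain the expected i%256 pattern, starting at
   *current_data, and return the number of bytes checked. *current_data is
   advanced so verification can continue across calls. */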
static size_t count_slices(grpc_slice* slices, size_t nslices,
                           int* current_data) {
  size_t num_bytes = 0;
  unsigned i, j;
  unsigned char* buf;
  for (i = 0; i < nslices; ++i) {
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      GPR_ASSERT(buf[j] == *current_data);
      *current_data = (*current_data + 1) % 256;
    }
    num_bytes += GRPC_SLICE_LENGTH(slices[i]);
  }
  return num_bytes;
}

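/* Read completion callback: verify the received data, then either kick the
   pollset (once target_read_bytes have arrived) or issue another read. */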
static void read_cb(void* user_data, grpc_error* error) {
  struct read_socket_state* state =
      static_cast<struct read_socket_state*>(user_data);
  size_t read_bytes;
  int current_data;

  GPR_ASSERT(error == GRPC_ERROR_NONE);

  gpr_mu_lock(g_mu);
  current_data = state->read_bytes % 256;
  read_bytes = count_slices(state->incoming.slices, state->incoming.count,
                            &current_data);
  state->read_bytes += read_bytes;
  gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
          state->target_read_bytes);
  if (state->read_bytes >= state->target_read_bytes) {
    GPR_ASSERT(
        GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
    gpr_mu_unlock(g_mu);
  } else {
    gpr_mu_unlock(g_mu);
    grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb,
                       /*urgent=*/false);
  }
}

/* Write to a socket, then read from it using the grpc_tcp API. */
static void read_test(size_t num_bytes, size_t slice_size) {
  int sv[2];
  grpc_endpoint* ep;
  struct read_socket_state state;
  size_t written_bytes;
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
  grpc_core::ExecCtx exec_ctx;

  gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
          num_bytes, slice_size);

  create_sockets(sv);

  grpc_arg a[1];
  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
  a[0].type = GRPC_ARG_INTEGER;
  a[0].value.integer = static_cast<int>(slice_size);
  grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
  ep =
      grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
  grpc_endpoint_add_to_pollset(ep, g_pollset);

  written_bytes = fill_socket_partial(sv[0], num_bytes);
  gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);

  state.ep = ep;
  state.read_bytes = 0;
  state.target_read_bytes = written_bytes;
  grpc_slice_buffer_init(&state.incoming);
  GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);

  grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);

  gpr_mu_lock(g_mu);
  while (state.read_bytes < state.target_read_bytes) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_mu_unlock(g_mu);

    gpr_mu_lock(g_mu);
  }
  GPR_ASSERT(state.read_bytes == state.target_read_bytes);
  gpr_mu_unlock(g_mu);

  grpc_slice_buffer_destroy_internal(&state.incoming);
  grpc_endpoint_destroy(ep);
}

/* Write to a socket until it fills up, then read from it using the grpc_tcp
   API. */
static void large_read_test(size_t slice_size) {
  int sv[2];
  grpc_endpoint* ep;
  struct read_socket_state state;
  ssize_t written_bytes;
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
  grpc_core::ExecCtx exec_ctx;

  gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);

  create_sockets(sv);

  grpc_arg a[1];
  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
  a[0].type = GRPC_ARG_INTEGER;
  a[0].value.integer = static_cast<int>(slice_size);
  grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
  ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
                       "test");
  grpc_endpoint_add_to_pollset(ep, g_pollset);

  written_bytes = fill_socket(sv[0]);
  gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);

  state.ep = ep;
  state.read_bytes = 0;
  state.target_read_bytes = static_cast<size_t>(written_bytes);
  grpc_slice_buffer_init(&state.incoming);
  GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);

  grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);

  gpr_mu_lock(g_mu);
  while (state.read_bytes < state.target_read_bytes) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_mu_unlock(g_mu);

    gpr_mu_lock(g_mu);
  }
  GPR_ASSERT(state.read_bytes == state.target_read_bytes);
  gpr_mu_unlock(g_mu);

  grpc_slice_buffer_destroy_internal(&state.incoming);
  grpc_endpoint_destroy(ep);
}

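/* State shared between the write tests and their write-done callback. */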
struct write_socket_state {
  grpc_endpoint* ep;
  int write_done;
};

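/* Allocate *num_blocks slices of at most slice_size bytes each, covering
   num_bytes in total and filled with the running i%256 pattern starting at
   *current_data. The caller owns the returned array. */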
static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
                                   size_t* num_blocks, uint8_t* current_data) {
  size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
  grpc_slice* slices =
      static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
  size_t num_bytes_left = num_bytes;
  unsigned i, j;
  unsigned char* buf;
  *num_blocks = nslices;

  for (i = 0; i < nslices; ++i) {
    slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
                                                              : slice_size);
    num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      buf[j] = *current_data;
      (*current_data)++;
    }
  }
  GPR_ASSERT(num_bytes_left == 0);
  return slices;
}

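/* Completion callback for grpc_endpoint_write in write_test: mark the write
   as done and kick the pollset so the waiting loop can observe it. */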
static void write_done(void* user_data /* write_socket_state */,
                       grpc_error* error) {
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  struct write_socket_state* state =
      static_cast<struct write_socket_state*>(user_data);
  gpr_mu_lock(g_mu);
  state->write_done = 1;
  GPR_ASSERT(
      GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
  gpr_mu_unlock(g_mu);
}

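/* Switch fd to blocking mode and read num_bytes from it in chunks of at most
   read_size bytes, verifying the i%256 pattern and polling the pollset
   between reads so the endpoint under test can make progress. Restores
   non-blocking mode before returning. */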
void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
  unsigned char* buf = static_cast<unsigned char*>(gpr_malloc(read_size));
  ssize_t bytes_read;
  size_t bytes_left = num_bytes;
  int flags;
  int current = 0;
  int i;
  grpc_core::ExecCtx exec_ctx;

  flags = fcntl(fd, F_GETFL, 0);
  GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);

  for (;;) {
    grpc_pollset_worker* worker = nullptr;
    gpr_mu_lock(g_mu);
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work",
        grpc_pollset_work(g_pollset, &worker,
                          grpc_timespec_to_millis_round_up(
                              grpc_timeout_milliseconds_to_deadline(10)))));
    gpr_mu_unlock(g_mu);

    do {
      bytes_read =
          read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
    } while (bytes_read < 0 && errno == EINTR);
    GPR_ASSERT(bytes_read >= 0);
    for (i = 0; i < bytes_read; ++i) {
      GPR_ASSERT(buf[i] == current);
      current = (current + 1) % 256;
    }
    bytes_left -= static_cast<size_t>(bytes_read);
    if (bytes_left == 0) break;
  }
  flags = fcntl(fd, F_GETFL, 0);
  GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);

  gpr_free(buf);
}

/* Verifier for timestamps callback for write_test */
void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
                         grpc_error* error) {
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  GPR_ASSERT(arg != nullptr);
  GPR_ASSERT(ts->sendmsg_time.time.clock_type == GPR_CLOCK_REALTIME);
  GPR_ASSERT(ts->scheduled_time.time.clock_type == GPR_CLOCK_REALTIME);
  GPR_ASSERT(ts->acked_time.time.clock_type == GPR_CLOCK_REALTIME);
  gpr_atm* done_timestamps = (gpr_atm*)arg;
  gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
}

/* Write to a socket using the grpc_tcp API, then drain it directly.
   Note that if the write does not complete immediately we need to drain the
   socket in parallel with the read. If collect_timestamps is true, it will
   try to get timestamps for the write. */
static void write_test(size_t num_bytes, size_t slice_size,
                       bool collect_timestamps) {
  int sv[2];
  grpc_endpoint* ep;
  struct write_socket_state state;
  size_t num_blocks;
  grpc_slice* slices;
  uint8_t current_data = 0;
  grpc_slice_buffer outgoing;
  grpc_closure write_done_closure;
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
  grpc_core::ExecCtx exec_ctx;

  if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
    return;
  }

  gpr_log(GPR_INFO,
          "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
          num_bytes, slice_size);

  if (collect_timestamps) {
    create_inet_sockets(sv);
  } else {
    create_sockets(sv);
  }

  grpc_arg a[1];
  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
  a[0].type = GRPC_ARG_INTEGER;
  a[0].value.integer = static_cast<int>(slice_size);
  grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
  ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
                       &args, "test");
  grpc_endpoint_add_to_pollset(ep, g_pollset);

  state.ep = ep;
  state.write_done = 0;

  slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);

  grpc_slice_buffer_init(&outgoing);
  grpc_slice_buffer_addn(&outgoing, slices, num_blocks);
  GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
                    grpc_schedule_on_exec_ctx);

  gpr_atm done_timestamps;
  gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
  grpc_endpoint_write(ep, &outgoing, &write_done_closure,
                      grpc_event_engine_can_track_errors() && collect_timestamps
                          ? (void*)&done_timestamps
                          : nullptr);
  drain_socket_blocking(sv[0], num_bytes, num_bytes);
  exec_ctx.Flush();
  gpr_mu_lock(g_mu);
  for (;;) {
    grpc_pollset_worker* worker = nullptr;
    if (state.write_done &&
        (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
         gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
      break;
    }
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_mu_unlock(g_mu);
    exec_ctx.Flush();
    gpr_mu_lock(g_mu);
  }
  gpr_mu_unlock(g_mu);

  grpc_slice_buffer_destroy_internal(&outgoing);
  grpc_endpoint_destroy(ep);
  gpr_free(slices);
}

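/* Callback invoked once grpc_tcp_destroy_and_release_fd has handed the fd
   back: record completion and kick the pollset. */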
void on_fd_released(void* arg, grpc_error* /*errors*/) {
  int* done = static_cast<int*>(arg);
  *done = 1;
  GPR_ASSERT(
      GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
}

/* Do a read_test, then release fd and try to read/write again. Verify that
   grpc_tcp_fd() is available before the fd is released. */
static void release_fd_test(size_t num_bytes, size_t slice_size) {
  int sv[2];
  grpc_endpoint* ep;
  struct read_socket_state state;
  size_t written_bytes;
  int fd;
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
  grpc_core::ExecCtx exec_ctx;
  grpc_closure fd_released_cb;
  int fd_released_done = 0;
  GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
                    grpc_schedule_on_exec_ctx);

  gpr_log(GPR_INFO,
          "Release fd read_test of size %" PRIuPTR ", slice size %" PRIuPTR,
          num_bytes, slice_size);

  create_sockets(sv);

  grpc_arg a[1];
  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
  a[0].type = GRPC_ARG_INTEGER;
  a[0].value.integer = static_cast<int>(slice_size);
  grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
  ep =
      grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
  GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
  grpc_endpoint_add_to_pollset(ep, g_pollset);

  written_bytes = fill_socket_partial(sv[0], num_bytes);
  gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);

  state.ep = ep;
  state.read_bytes = 0;
  state.target_read_bytes = written_bytes;
  grpc_slice_buffer_init(&state.incoming);
  GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);

  grpc_endpoint_read(ep, &state.incoming, &state.read_cb, /*urgent=*/false);

  gpr_mu_lock(g_mu);
  while (state.read_bytes < state.target_read_bytes) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
            state.read_bytes, state.target_read_bytes);
    gpr_mu_unlock(g_mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(g_mu);
  }
  GPR_ASSERT(state.read_bytes == state.target_read_bytes);
  gpr_mu_unlock(g_mu);

  grpc_slice_buffer_destroy_internal(&state.incoming);
  grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(g_mu);
  while (!fd_released_done) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
  }
  gpr_mu_unlock(g_mu);
  GPR_ASSERT(fd_released_done == 1);
  GPR_ASSERT(fd == sv[1]);

  written_bytes = fill_socket_partial(sv[0], num_bytes);
  drain_socket_blocking(fd, written_bytes, written_bytes);
  written_bytes = fill_socket_partial(fd, num_bytes);
  drain_socket_blocking(sv[0], written_bytes, written_bytes);
  close(fd);
}

void run_tests(void) {
  size_t i = 0;

  read_test(100, 8192);
  read_test(10000, 8192);
  read_test(10000, 137);
  read_test(10000, 1);
  large_read_test(8192);
  large_read_test(1);

  write_test(100, 8192, false);
  write_test(100, 1, false);
  write_test(100000, 8192, false);
  write_test(100000, 1, false);
  write_test(100000, 137, false);

  write_test(100, 8192, true);
  write_test(100, 1, true);
  write_test(100000, 8192, true);
  write_test(100000, 1, true);
  write_test(100, 137, true);

  for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
    write_test(40320, i, false);
    write_test(40320, i, true);
  }

  release_fd_test(100, 8192);
}

static void clean_up(void) {}

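/* Endpoint-test fixture: a pair of grpc_tcp endpoints layered over an AF_UNIX
   socketpair, used by the generic grpc_endpoint_tests suite below. */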
static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
    size_t slice_size) {
  int sv[2];
  grpc_endpoint_test_fixture f;
  grpc_core::ExecCtx exec_ctx;

  create_sockets(sv);
  grpc_resource_quota* resource_quota =
      grpc_resource_quota_create("tcp_posix_test_socketpair");
  grpc_arg a[1];
  a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
  a[0].type = GRPC_ARG_INTEGER;
  a[0].value.integer = static_cast<int>(slice_size);
  grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
  f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
                                &args, "test");
  f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
                                &args, "test");
  grpc_resource_quota_unref_internal(resource_quota);
  grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
  grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);

  return f;
}

static grpc_endpoint_test_config configs[] = {
    {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
};

static void destroy_pollset(void* p, grpc_error* /*error*/) {
  grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
}

int main(int argc, char** argv) {
  grpc_closure destroyed;
  grpc::testing::TestEnvironment env(argc, argv);
  grpc_init();
  grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
  {
    grpc_core::ExecCtx exec_ctx;
    g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
    grpc_pollset_init(g_pollset, &g_mu);
    grpc_endpoint_tests(configs[0], g_pollset, g_mu);
    run_tests();
    GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
                      grpc_schedule_on_exec_ctx);
    grpc_pollset_shutdown(g_pollset, &destroyed);

    grpc_core::ExecCtx::Get()->Flush();
  }
  grpc_shutdown();
  gpr_free(g_pollset);

  return 0;
}

#else /* GRPC_POSIX_SOCKET_TCP */

int main(int argc, char** argv) { return 1; }

#endif /* GRPC_POSIX_SOCKET_TCP */