/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "test/core/iomgr/endpoint_tests.h"

#include <stdbool.h>
#include <sys/types.h>

#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/test_config.h"
/*
   General test notes:

   All tests which write data into an endpoint write i%256 into byte i, which
   is verified by readers.

   In general there are a few interesting things to vary which may lead to
   exercising different codepaths in an implementation:
   1. Total amount of data written to the endpoint
   2. Size of slice allocations
   3. Amount of data we read from or write to the endpoint at once

   The tests here tend to parameterize these where applicable.

*/

static gpr_mu* g_mu;
static grpc_pollset* g_pollset;

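/* Verifies that the bytes in |slices| continue the i%256 pattern starting at
   *current_data, advances *current_data past the verified bytes, and returns
   the number of bytes checked. */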
size_t count_slices(grpc_slice* slices, size_t nslices, int* current_data) {
  size_t num_bytes = 0;
  size_t i;
  size_t j;
  unsigned char* buf;
  for (i = 0; i < nslices; ++i) {
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      GPR_ASSERT(buf[j] == *current_data);
      *current_data = (*current_data + 1) % 256;
    }
    num_bytes += GRPC_SLICE_LENGTH(slices[i]);
  }
  return num_bytes;
}

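/* Logs the test/config pair being run and creates the endpoint fixture for
   this configuration. */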
static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config,
                                             const char* test_name,
                                             size_t slice_size) {
  gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
  return config.create_fixture(slice_size);
}

static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }

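/* Allocates enough slices of at most |slice_size| bytes to hold |num_bytes|,
   filling them with the i%256 pattern continued from *current_data. The
   caller owns the returned array and must gpr_free it. */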
static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
                                   size_t* num_blocks, uint8_t* current_data) {
  size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
  grpc_slice* slices =
      static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
  size_t num_bytes_left = num_bytes;
  size_t i;
  size_t j;
  unsigned char* buf;
  *num_blocks = nslices;

  for (i = 0; i < nslices; ++i) {
    slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
                                                              : slice_size);
    num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      buf[j] = *current_data;
      (*current_data)++;
    }
  }
  GPR_ASSERT(num_bytes_left == 0);
  return slices;
}

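/* Shared state for the read/write test: one endpoint is read from while the
   other is written to until target_bytes have been transferred (or an error
   occurs), with read_done/write_done signalling completion under g_mu. */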
struct read_and_write_test_state {
  grpc_endpoint* read_ep;
  grpc_endpoint* write_ep;
  size_t target_bytes;
  size_t bytes_read;
  size_t current_write_size;
  size_t bytes_written;
  int current_read_data;
  uint8_t current_write_data;
  int read_done;
  int write_done;
  grpc_slice_buffer incoming;
  grpc_slice_buffer outgoing;
  grpc_closure done_read;
  grpc_closure done_write;
  grpc_closure read_scheduler;
  grpc_closure write_scheduler;
};

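/* Closure that issues the next endpoint read; scheduled on the ExecCtx by the
   read handler to keep the read loop off the call stack. */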
static void read_scheduler(void* data, grpc_error* /* error */) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
                     /*urgent=*/false);
}

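/* Verifies the data received so far and either finishes the read side of the
   test, or schedules another read until target_bytes have been read. */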
static void read_and_write_test_read_handler(void* data, grpc_error* error) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);

  state->bytes_read +=
      count_slices(state->incoming.slices, state->incoming.count,
                   &state->current_read_data);
  if (state->bytes_read == state->target_bytes || error != GRPC_ERROR_NONE) {
    gpr_log(GPR_INFO, "Read handler done");
    gpr_mu_lock(g_mu);
    state->read_done = 1 + (error == GRPC_ERROR_NONE);
    GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
    gpr_mu_unlock(g_mu);
  } else if (error == GRPC_ERROR_NONE) {
    /* We perform many reads one after another. If grpc_endpoint_read and the
     * read_handler are both run inline, we might end up growing the stack
     * beyond the limit. Schedule the read on ExecCtx to avoid this. */
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
                            GRPC_ERROR_NONE);
  }
}

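/* Closure that issues the next endpoint write; scheduled on the ExecCtx by
   the write handler to keep the write loop off the call stack. */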
static void write_scheduler(void* data, grpc_error* /* error */) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
                      nullptr);
}

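/* Tracks how much has been written and either queues the next batch of
   slices for writing, or finishes the write side of the test once
   target_bytes have been written or an error has occurred. */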
static void read_and_write_test_write_handler(void* data, grpc_error* error) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  grpc_slice* slices = nullptr;
  size_t nslices;

  if (error == GRPC_ERROR_NONE) {
    state->bytes_written += state->current_write_size;
    if (state->target_bytes - state->bytes_written <
        state->current_write_size) {
      state->current_write_size = state->target_bytes - state->bytes_written;
    }
    if (state->current_write_size != 0) {
      slices = allocate_blocks(state->current_write_size, 8192, &nslices,
                               &state->current_write_data);
      grpc_slice_buffer_reset_and_unref(&state->outgoing);
      grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
      /* We perform many writes one after another. If grpc_endpoint_write and
       * the write_handler are both run inline, we might end up growing the
       * stack beyond the limit. Schedule the write on ExecCtx to avoid this.
       */
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
                              GRPC_ERROR_NONE);
      gpr_free(slices);
      return;
    }
  }

  gpr_log(GPR_INFO, "Write handler done");
  gpr_mu_lock(g_mu);
  state->write_done = 1 + (error == GRPC_ERROR_NONE);
  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
  gpr_mu_unlock(g_mu);
}

/* Do both reading and writing using the grpc_endpoint API.

   This also includes a test of the shutdown behavior.
 */
static void read_and_write_test(grpc_endpoint_test_config config,
                                size_t num_bytes, size_t write_size,
                                size_t slice_size, bool shutdown) {
  struct read_and_write_test_state state;
  grpc_endpoint_test_fixture f =
      begin_test(config, "read_and_write_test", slice_size);
  grpc_core::ExecCtx exec_ctx;
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
  gpr_log(GPR_DEBUG,
          "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR
          " shutdown=%d",
          num_bytes, write_size, slice_size, shutdown);

  if (shutdown) {
    gpr_log(GPR_INFO, "Start read and write shutdown test");
  } else {
    gpr_log(GPR_INFO,
            "Start read and write test with %" PRIuPTR
            " bytes, slice size %" PRIuPTR,
            num_bytes, slice_size);
  }

  state.read_ep = f.client_ep;
  state.write_ep = f.server_ep;
  state.target_bytes = num_bytes;
  state.bytes_read = 0;
  state.current_write_size = write_size;
  state.bytes_written = 0;
  state.read_done = 0;
  state.write_done = 0;
  state.current_read_data = 0;
  state.current_write_data = 0;
  GRPC_CLOSURE_INIT(&state.read_scheduler, read_scheduler, &state,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&state.done_read, read_and_write_test_read_handler, &state,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&state.write_scheduler, write_scheduler, &state,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&state.done_write, read_and_write_test_write_handler,
                    &state, grpc_schedule_on_exec_ctx);
  grpc_slice_buffer_init(&state.outgoing);
  grpc_slice_buffer_init(&state.incoming);

  /* Get started by pretending an initial write completed */
  /* NOTE: Sets up initial conditions so we can have the same write handler
     for the first iteration as for later iterations. It does the right thing
     even when bytes_written is unsigned. */
  state.bytes_written -= state.current_write_size;
  read_and_write_test_write_handler(&state, GRPC_ERROR_NONE);
  grpc_core::ExecCtx::Get()->Flush();

  grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
                     /*urgent=*/false);
  if (shutdown) {
    gpr_log(GPR_DEBUG, "shutdown read");
    grpc_endpoint_shutdown(
        state.read_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
    gpr_log(GPR_DEBUG, "shutdown write");
    grpc_endpoint_shutdown(
        state.write_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
  }
  grpc_core::ExecCtx::Get()->Flush();

  gpr_mu_lock(g_mu);
  while (!state.read_done || !state.write_done) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline);
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
  }
  gpr_mu_unlock(g_mu);
  grpc_core::ExecCtx::Get()->Flush();

  end_test(config);
  grpc_slice_buffer_destroy_internal(&state.outgoing);
  grpc_slice_buffer_destroy_internal(&state.incoming);
  grpc_endpoint_destroy(state.read_ep);
  grpc_endpoint_destroy(state.write_ep);
}

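/* Counts failed callbacks: bumps *arg when invoked with an error and kicks
   the pollset so waiters notice. */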
static void inc_on_failure(void* arg, grpc_error* error) {
  gpr_mu_lock(g_mu);
  *static_cast<int*>(arg) += (error != GRPC_ERROR_NONE);
  GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
  gpr_mu_unlock(g_mu);
}

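/* Polls until *fail_count reaches want_fail_count or a 10 second deadline
   expires, then asserts that the expected count was observed. */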
static void wait_for_fail_count(int* fail_count, int want_fail_count) {
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(g_mu);
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
  while (grpc_core::ExecCtx::Get()->Now() < deadline &&
         *fail_count < want_fail_count) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_mu_unlock(g_mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(g_mu);
  }
  GPR_ASSERT(*fail_count == want_fail_count);
  gpr_mu_unlock(g_mu);
}

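/* Verifies that reads and writes issued after shutdown fail, and that
   shutting an endpoint down more than once is safe. */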
static void multiple_shutdown_test(grpc_endpoint_test_config config) {
  grpc_endpoint_test_fixture f =
      begin_test(config, "multiple_shutdown_test", 128);
  int fail_count = 0;

  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);

  grpc_core::ExecCtx exec_ctx;
  grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
  grpc_endpoint_read(f.client_ep, &slice_buffer,
                     GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                         grpc_schedule_on_exec_ctx),
                     /*urgent=*/false);
  wait_for_fail_count(&fail_count, 0);
  grpc_endpoint_shutdown(f.client_ep,
                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
  wait_for_fail_count(&fail_count, 1);
  grpc_endpoint_read(f.client_ep, &slice_buffer,
                     GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                         grpc_schedule_on_exec_ctx),
                     /*urgent=*/false);
  wait_for_fail_count(&fail_count, 2);
  grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
  grpc_endpoint_write(f.client_ep, &slice_buffer,
                      GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                          grpc_schedule_on_exec_ctx),
                      nullptr);
  wait_for_fail_count(&fail_count, 3);
  grpc_endpoint_shutdown(f.client_ep,
                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
  wait_for_fail_count(&fail_count, 3);

  grpc_slice_buffer_destroy_internal(&slice_buffer);

  grpc_endpoint_destroy(f.client_ep);
  grpc_endpoint_destroy(f.server_ep);
}

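/* Entry point: runs the endpoint test suite against the given configuration,
   varying total bytes, write size and slice size as described in the notes
   above. */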
void grpc_endpoint_tests(grpc_endpoint_test_config config,
                         grpc_pollset* pollset, gpr_mu* mu) {
  size_t i;
  g_pollset = pollset;
  g_mu = mu;
  multiple_shutdown_test(config);
  read_and_write_test(config, 10000000, 100000, 8192, false);
  read_and_write_test(config, 1000000, 100000, 1, false);
  read_and_write_test(config, 100000000, 100000, 1, true);
  for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
    read_and_write_test(config, 40320, i, i, false);
  }
  g_pollset = nullptr;
  g_mu = nullptr;
}