1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "test/core/iomgr/endpoint_tests.h"
20
21 #include <stdbool.h>
22 #include <sys/types.h>
23
24 #include <grpc/slice.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/core/lib/slice/slice_internal.h"
31 #include "test/core/util/test_config.h"
32
33 /*
34 General test notes:
35
36 All tests which write data into an endpoint write i%256 into byte i, which
37 is verified by readers.
38
39 In general there are a few interesting things to vary which may lead to
40 exercising different codepaths in an implementation:
41 1. Total amount of data written to the endpoint
42 2. Size of slice allocations
43 3. Amount of data we read from or write to the endpoint at once
44
45 The tests here tend to parameterize these where applicable.
46
47 */
48
49 static gpr_mu* g_mu;
50 static grpc_pollset* g_pollset;
51
count_slices(grpc_slice * slices,size_t nslices,int * current_data)52 size_t count_slices(grpc_slice* slices, size_t nslices, int* current_data) {
53 size_t num_bytes = 0;
54 size_t i;
55 size_t j;
56 unsigned char* buf;
57 for (i = 0; i < nslices; ++i) {
58 buf = GRPC_SLICE_START_PTR(slices[i]);
59 for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
60 GPR_ASSERT(buf[j] == *current_data);
61 *current_data = (*current_data + 1) % 256;
62 }
63 num_bytes += GRPC_SLICE_LENGTH(slices[i]);
64 }
65 return num_bytes;
66 }
67
begin_test(grpc_endpoint_test_config config,const char * test_name,size_t slice_size)68 static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config,
69 const char* test_name,
70 size_t slice_size) {
71 gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
72 return config.create_fixture(slice_size);
73 }
74
end_test(grpc_endpoint_test_config config)75 static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }
76
allocate_blocks(size_t num_bytes,size_t slice_size,size_t * num_blocks,uint8_t * current_data)77 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
78 size_t* num_blocks, uint8_t* current_data) {
79 size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
80 grpc_slice* slices =
81 static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
82 size_t num_bytes_left = num_bytes;
83 size_t i;
84 size_t j;
85 unsigned char* buf;
86 *num_blocks = nslices;
87
88 for (i = 0; i < nslices; ++i) {
89 slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
90 : slice_size);
91 num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
92 buf = GRPC_SLICE_START_PTR(slices[i]);
93 for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
94 buf[j] = *current_data;
95 (*current_data)++;
96 }
97 }
98 GPR_ASSERT(num_bytes_left == 0);
99 return slices;
100 }
101
102 struct read_and_write_test_state {
103 grpc_endpoint* read_ep;
104 grpc_endpoint* write_ep;
105 size_t target_bytes;
106 size_t bytes_read;
107 size_t current_write_size;
108 size_t bytes_written;
109 int current_read_data;
110 uint8_t current_write_data;
111 int read_done;
112 int write_done;
113 grpc_slice_buffer incoming;
114 grpc_slice_buffer outgoing;
115 grpc_closure done_read;
116 grpc_closure done_write;
117 };
118
read_and_write_test_read_handler(void * data,grpc_error * error)119 static void read_and_write_test_read_handler(void* data, grpc_error* error) {
120 struct read_and_write_test_state* state =
121 static_cast<struct read_and_write_test_state*>(data);
122
123 state->bytes_read += count_slices(
124 state->incoming.slices, state->incoming.count, &state->current_read_data);
125 if (state->bytes_read == state->target_bytes || error != GRPC_ERROR_NONE) {
126 gpr_log(GPR_INFO, "Read handler done");
127 gpr_mu_lock(g_mu);
128 state->read_done = 1 + (error == GRPC_ERROR_NONE);
129 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
130 gpr_mu_unlock(g_mu);
131 } else if (error == GRPC_ERROR_NONE) {
132 grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read);
133 }
134 }
135
read_and_write_test_write_handler(void * data,grpc_error * error)136 static void read_and_write_test_write_handler(void* data, grpc_error* error) {
137 struct read_and_write_test_state* state =
138 static_cast<struct read_and_write_test_state*>(data);
139 grpc_slice* slices = nullptr;
140 size_t nslices;
141
142 if (error == GRPC_ERROR_NONE) {
143 state->bytes_written += state->current_write_size;
144 if (state->target_bytes - state->bytes_written <
145 state->current_write_size) {
146 state->current_write_size = state->target_bytes - state->bytes_written;
147 }
148 if (state->current_write_size != 0) {
149 slices = allocate_blocks(state->current_write_size, 8192, &nslices,
150 &state->current_write_data);
151 grpc_slice_buffer_reset_and_unref(&state->outgoing);
152 grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
153 grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
154 nullptr);
155 gpr_free(slices);
156 return;
157 }
158 }
159
160 gpr_log(GPR_INFO, "Write handler done");
161 gpr_mu_lock(g_mu);
162 state->write_done = 1 + (error == GRPC_ERROR_NONE);
163 GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
164 gpr_mu_unlock(g_mu);
165 }
166
167 /* Do both reading and writing using the grpc_endpoint API.
168
169 This also includes a test of the shutdown behavior.
170 */
read_and_write_test(grpc_endpoint_test_config config,size_t num_bytes,size_t write_size,size_t slice_size,bool shutdown)171 static void read_and_write_test(grpc_endpoint_test_config config,
172 size_t num_bytes, size_t write_size,
173 size_t slice_size, bool shutdown) {
174 struct read_and_write_test_state state;
175 grpc_endpoint_test_fixture f =
176 begin_test(config, "read_and_write_test", slice_size);
177 grpc_core::ExecCtx exec_ctx;
178 grpc_millis deadline =
179 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
180 gpr_log(GPR_DEBUG,
181 "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR
182 " shutdown=%d",
183 num_bytes, write_size, slice_size, shutdown);
184
185 if (shutdown) {
186 gpr_log(GPR_INFO, "Start read and write shutdown test");
187 } else {
188 gpr_log(GPR_INFO,
189 "Start read and write test with %" PRIuPTR
190 " bytes, slice size %" PRIuPTR,
191 num_bytes, slice_size);
192 }
193
194 state.read_ep = f.client_ep;
195 state.write_ep = f.server_ep;
196 state.target_bytes = num_bytes;
197 state.bytes_read = 0;
198 state.current_write_size = write_size;
199 state.bytes_written = 0;
200 state.read_done = 0;
201 state.write_done = 0;
202 state.current_read_data = 0;
203 state.current_write_data = 0;
204 GRPC_CLOSURE_INIT(&state.done_read, read_and_write_test_read_handler, &state,
205 grpc_schedule_on_exec_ctx);
206 GRPC_CLOSURE_INIT(&state.done_write, read_and_write_test_write_handler,
207 &state, grpc_schedule_on_exec_ctx);
208 grpc_slice_buffer_init(&state.outgoing);
209 grpc_slice_buffer_init(&state.incoming);
210
211 /* Get started by pretending an initial write completed */
212 /* NOTE: Sets up initial conditions so we can have the same write handler
213 for the first iteration as for later iterations. It does the right thing
214 even when bytes_written is unsigned. */
215 state.bytes_written -= state.current_write_size;
216 read_and_write_test_write_handler(&state, GRPC_ERROR_NONE);
217 grpc_core::ExecCtx::Get()->Flush();
218
219 grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read);
220
221 if (shutdown) {
222 gpr_log(GPR_DEBUG, "shutdown read");
223 grpc_endpoint_shutdown(
224 state.read_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
225 gpr_log(GPR_DEBUG, "shutdown write");
226 grpc_endpoint_shutdown(
227 state.write_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
228 }
229 grpc_core::ExecCtx::Get()->Flush();
230
231 gpr_mu_lock(g_mu);
232 while (!state.read_done || !state.write_done) {
233 grpc_pollset_worker* worker = nullptr;
234 GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline);
235 GPR_ASSERT(GRPC_LOG_IF_ERROR(
236 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
237 }
238 gpr_mu_unlock(g_mu);
239 grpc_core::ExecCtx::Get()->Flush();
240
241 end_test(config);
242 grpc_slice_buffer_destroy_internal(&state.outgoing);
243 grpc_slice_buffer_destroy_internal(&state.incoming);
244 grpc_endpoint_destroy(state.read_ep);
245 grpc_endpoint_destroy(state.write_ep);
246 }
247
inc_on_failure(void * arg,grpc_error * error)248 static void inc_on_failure(void* arg, grpc_error* error) {
249 gpr_mu_lock(g_mu);
250 *static_cast<int*>(arg) += (error != GRPC_ERROR_NONE);
251 GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
252 gpr_mu_unlock(g_mu);
253 }
254
wait_for_fail_count(int * fail_count,int want_fail_count)255 static void wait_for_fail_count(int* fail_count, int want_fail_count) {
256 grpc_core::ExecCtx::Get()->Flush();
257 gpr_mu_lock(g_mu);
258 grpc_millis deadline =
259 grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
260 while (grpc_core::ExecCtx::Get()->Now() < deadline &&
261 *fail_count < want_fail_count) {
262 grpc_pollset_worker* worker = nullptr;
263 GPR_ASSERT(GRPC_LOG_IF_ERROR(
264 "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
265 gpr_mu_unlock(g_mu);
266 grpc_core::ExecCtx::Get()->Flush();
267 gpr_mu_lock(g_mu);
268 }
269 GPR_ASSERT(*fail_count == want_fail_count);
270 gpr_mu_unlock(g_mu);
271 }
272
multiple_shutdown_test(grpc_endpoint_test_config config)273 static void multiple_shutdown_test(grpc_endpoint_test_config config) {
274 grpc_endpoint_test_fixture f =
275 begin_test(config, "multiple_shutdown_test", 128);
276 int fail_count = 0;
277
278 grpc_slice_buffer slice_buffer;
279 grpc_slice_buffer_init(&slice_buffer);
280
281 grpc_core::ExecCtx exec_ctx;
282 grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
283 grpc_endpoint_read(f.client_ep, &slice_buffer,
284 GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
285 grpc_schedule_on_exec_ctx));
286 wait_for_fail_count(&fail_count, 0);
287 grpc_endpoint_shutdown(f.client_ep,
288 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
289 wait_for_fail_count(&fail_count, 1);
290 grpc_endpoint_read(f.client_ep, &slice_buffer,
291 GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
292 grpc_schedule_on_exec_ctx));
293 wait_for_fail_count(&fail_count, 2);
294 grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
295 grpc_endpoint_write(f.client_ep, &slice_buffer,
296 GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
297 grpc_schedule_on_exec_ctx),
298 nullptr);
299 wait_for_fail_count(&fail_count, 3);
300 grpc_endpoint_shutdown(f.client_ep,
301 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
302 wait_for_fail_count(&fail_count, 3);
303
304 grpc_slice_buffer_destroy_internal(&slice_buffer);
305
306 grpc_endpoint_destroy(f.client_ep);
307 grpc_endpoint_destroy(f.server_ep);
308 }
309
grpc_endpoint_tests(grpc_endpoint_test_config config,grpc_pollset * pollset,gpr_mu * mu)310 void grpc_endpoint_tests(grpc_endpoint_test_config config,
311 grpc_pollset* pollset, gpr_mu* mu) {
312 size_t i;
313 g_pollset = pollset;
314 g_mu = mu;
315 multiple_shutdown_test(config);
316 read_and_write_test(config, 10000000, 100000, 8192, false);
317 read_and_write_test(config, 1000000, 100000, 1, false);
318 read_and_write_test(config, 100000000, 100000, 1, true);
319 for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
320 read_and_write_test(config, 40320, i, i, false);
321 }
322 g_pollset = nullptr;
323 g_mu = nullptr;
324 }
325