• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/core/iomgr/endpoint_tests.h"
20 
21 #include <stdbool.h>
22 #include <sys/types.h>
23 
24 #include <grpc/slice.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28 
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/core/lib/slice/slice_internal.h"
31 #include "test/core/util/test_config.h"
32 
33 /*
34    General test notes:
35 
36    All tests which write data into an endpoint write i%256 into byte i, which
37    is verified by readers.
38 
39    In general there are a few interesting things to vary which may lead to
40    exercising different codepaths in an implementation:
41    1. Total amount of data written to the endpoint
42    2. Size of slice allocations
43    3. Amount of data we read from or write to the endpoint at once
44 
45    The tests here tend to parameterize these where applicable.
46 
47 */
48 
49 static gpr_mu* g_mu;
50 static grpc_pollset* g_pollset;
51 
count_slices(grpc_slice * slices,size_t nslices,int * current_data)52 size_t count_slices(grpc_slice* slices, size_t nslices, int* current_data) {
53   size_t num_bytes = 0;
54   size_t i;
55   size_t j;
56   unsigned char* buf;
57   for (i = 0; i < nslices; ++i) {
58     buf = GRPC_SLICE_START_PTR(slices[i]);
59     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
60       GPR_ASSERT(buf[j] == *current_data);
61       *current_data = (*current_data + 1) % 256;
62     }
63     num_bytes += GRPC_SLICE_LENGTH(slices[i]);
64   }
65   return num_bytes;
66 }
67 
begin_test(grpc_endpoint_test_config config,const char * test_name,size_t slice_size)68 static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config,
69                                              const char* test_name,
70                                              size_t slice_size) {
71   gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
72   return config.create_fixture(slice_size);
73 }
74 
end_test(grpc_endpoint_test_config config)75 static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }
76 
allocate_blocks(size_t num_bytes,size_t slice_size,size_t * num_blocks,uint8_t * current_data)77 static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
78                                    size_t* num_blocks, uint8_t* current_data) {
79   size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
80   grpc_slice* slices =
81       static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
82   size_t num_bytes_left = num_bytes;
83   size_t i;
84   size_t j;
85   unsigned char* buf;
86   *num_blocks = nslices;
87 
88   for (i = 0; i < nslices; ++i) {
89     slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
90                                                               : slice_size);
91     num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
92     buf = GRPC_SLICE_START_PTR(slices[i]);
93     for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
94       buf[j] = *current_data;
95       (*current_data)++;
96     }
97   }
98   GPR_ASSERT(num_bytes_left == 0);
99   return slices;
100 }
101 
102 struct read_and_write_test_state {
103   grpc_endpoint* read_ep;
104   grpc_endpoint* write_ep;
105   size_t target_bytes;
106   size_t bytes_read;
107   size_t current_write_size;
108   size_t bytes_written;
109   int current_read_data;
110   uint8_t current_write_data;
111   int read_done;
112   int write_done;
113   grpc_slice_buffer incoming;
114   grpc_slice_buffer outgoing;
115   grpc_closure done_read;
116   grpc_closure done_write;
117   grpc_closure read_scheduler;
118   grpc_closure write_scheduler;
119 };
120 
read_scheduler(void * data,grpc_error *)121 static void read_scheduler(void* data, grpc_error* /* error */) {
122   struct read_and_write_test_state* state =
123       static_cast<struct read_and_write_test_state*>(data);
124   grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
125                      /*urgent=*/false);
126 }
127 
read_and_write_test_read_handler(void * data,grpc_error * error)128 static void read_and_write_test_read_handler(void* data, grpc_error* error) {
129   struct read_and_write_test_state* state =
130       static_cast<struct read_and_write_test_state*>(data);
131 
132   state->bytes_read += count_slices(
133       state->incoming.slices, state->incoming.count, &state->current_read_data);
134   if (state->bytes_read == state->target_bytes || error != GRPC_ERROR_NONE) {
135     gpr_log(GPR_INFO, "Read handler done");
136     gpr_mu_lock(g_mu);
137     state->read_done = 1 + (error == GRPC_ERROR_NONE);
138     GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
139     gpr_mu_unlock(g_mu);
140   } else if (error == GRPC_ERROR_NONE) {
141     /* We perform many reads one after another. If grpc_endpoint_read and the
142      * read_handler are both run inline, we might end up growing the stack
143      * beyond the limit. Schedule the read on ExecCtx to avoid this. */
144     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
145                             GRPC_ERROR_NONE);
146   }
147 }
148 
write_scheduler(void * data,grpc_error *)149 static void write_scheduler(void* data, grpc_error* /* error */) {
150   struct read_and_write_test_state* state =
151       static_cast<struct read_and_write_test_state*>(data);
152   grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
153                       nullptr);
154 }
155 
read_and_write_test_write_handler(void * data,grpc_error * error)156 static void read_and_write_test_write_handler(void* data, grpc_error* error) {
157   struct read_and_write_test_state* state =
158       static_cast<struct read_and_write_test_state*>(data);
159   grpc_slice* slices = nullptr;
160   size_t nslices;
161 
162   if (error == GRPC_ERROR_NONE) {
163     state->bytes_written += state->current_write_size;
164     if (state->target_bytes - state->bytes_written <
165         state->current_write_size) {
166       state->current_write_size = state->target_bytes - state->bytes_written;
167     }
168     if (state->current_write_size != 0) {
169       slices = allocate_blocks(state->current_write_size, 8192, &nslices,
170                                &state->current_write_data);
171       grpc_slice_buffer_reset_and_unref(&state->outgoing);
172       grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
173       /* We perform many writes one after another. If grpc_endpoint_write and
174        * the write_handler are both run inline, we might end up growing the
175        * stack beyond the limit. Schedule the write on ExecCtx to avoid this. */
176       grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
177                               GRPC_ERROR_NONE);
178       gpr_free(slices);
179       return;
180     }
181   }
182 
183   gpr_log(GPR_INFO, "Write handler done");
184   gpr_mu_lock(g_mu);
185   state->write_done = 1 + (error == GRPC_ERROR_NONE);
186   GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
187   gpr_mu_unlock(g_mu);
188 }
189 
190 /* Do both reading and writing using the grpc_endpoint API.
191 
192    This also includes a test of the shutdown behavior.
193  */
read_and_write_test(grpc_endpoint_test_config config,size_t num_bytes,size_t write_size,size_t slice_size,bool shutdown)194 static void read_and_write_test(grpc_endpoint_test_config config,
195                                 size_t num_bytes, size_t write_size,
196                                 size_t slice_size, bool shutdown) {
197   struct read_and_write_test_state state;
198   grpc_endpoint_test_fixture f =
199       begin_test(config, "read_and_write_test", slice_size);
200   grpc_core::ExecCtx exec_ctx;
201   grpc_millis deadline =
202       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
203   gpr_log(GPR_DEBUG,
204           "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR
205           " shutdown=%d",
206           num_bytes, write_size, slice_size, shutdown);
207 
208   if (shutdown) {
209     gpr_log(GPR_INFO, "Start read and write shutdown test");
210   } else {
211     gpr_log(GPR_INFO,
212             "Start read and write test with %" PRIuPTR
213             " bytes, slice size %" PRIuPTR,
214             num_bytes, slice_size);
215   }
216 
217   state.read_ep = f.client_ep;
218   state.write_ep = f.server_ep;
219   state.target_bytes = num_bytes;
220   state.bytes_read = 0;
221   state.current_write_size = write_size;
222   state.bytes_written = 0;
223   state.read_done = 0;
224   state.write_done = 0;
225   state.current_read_data = 0;
226   state.current_write_data = 0;
227   GRPC_CLOSURE_INIT(&state.read_scheduler, read_scheduler, &state,
228                     grpc_schedule_on_exec_ctx);
229   GRPC_CLOSURE_INIT(&state.done_read, read_and_write_test_read_handler, &state,
230                     grpc_schedule_on_exec_ctx);
231   GRPC_CLOSURE_INIT(&state.write_scheduler, write_scheduler, &state,
232                     grpc_schedule_on_exec_ctx);
233   GRPC_CLOSURE_INIT(&state.done_write, read_and_write_test_write_handler,
234                     &state, grpc_schedule_on_exec_ctx);
235   grpc_slice_buffer_init(&state.outgoing);
236   grpc_slice_buffer_init(&state.incoming);
237 
238   /* Get started by pretending an initial write completed */
239   /* NOTE: Sets up initial conditions so we can have the same write handler
240      for the first iteration as for later iterations. It does the right thing
241      even when bytes_written is unsigned. */
242   state.bytes_written -= state.current_write_size;
243   read_and_write_test_write_handler(&state, GRPC_ERROR_NONE);
244   grpc_core::ExecCtx::Get()->Flush();
245 
246   grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
247                      /*urgent=*/false);
248   if (shutdown) {
249     gpr_log(GPR_DEBUG, "shutdown read");
250     grpc_endpoint_shutdown(
251         state.read_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
252     gpr_log(GPR_DEBUG, "shutdown write");
253     grpc_endpoint_shutdown(
254         state.write_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
255   }
256   grpc_core::ExecCtx::Get()->Flush();
257 
258   gpr_mu_lock(g_mu);
259   while (!state.read_done || !state.write_done) {
260     grpc_pollset_worker* worker = nullptr;
261     GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline);
262     GPR_ASSERT(GRPC_LOG_IF_ERROR(
263         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
264   }
265   gpr_mu_unlock(g_mu);
266   grpc_core::ExecCtx::Get()->Flush();
267 
268   end_test(config);
269   grpc_slice_buffer_destroy_internal(&state.outgoing);
270   grpc_slice_buffer_destroy_internal(&state.incoming);
271   grpc_endpoint_destroy(state.read_ep);
272   grpc_endpoint_destroy(state.write_ep);
273 }
274 
inc_on_failure(void * arg,grpc_error * error)275 static void inc_on_failure(void* arg, grpc_error* error) {
276   gpr_mu_lock(g_mu);
277   *static_cast<int*>(arg) += (error != GRPC_ERROR_NONE);
278   GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
279   gpr_mu_unlock(g_mu);
280 }
281 
wait_for_fail_count(int * fail_count,int want_fail_count)282 static void wait_for_fail_count(int* fail_count, int want_fail_count) {
283   grpc_core::ExecCtx::Get()->Flush();
284   gpr_mu_lock(g_mu);
285   grpc_millis deadline =
286       grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
287   while (grpc_core::ExecCtx::Get()->Now() < deadline &&
288          *fail_count < want_fail_count) {
289     grpc_pollset_worker* worker = nullptr;
290     GPR_ASSERT(GRPC_LOG_IF_ERROR(
291         "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
292     gpr_mu_unlock(g_mu);
293     grpc_core::ExecCtx::Get()->Flush();
294     gpr_mu_lock(g_mu);
295   }
296   GPR_ASSERT(*fail_count == want_fail_count);
297   gpr_mu_unlock(g_mu);
298 }
299 
multiple_shutdown_test(grpc_endpoint_test_config config)300 static void multiple_shutdown_test(grpc_endpoint_test_config config) {
301   grpc_endpoint_test_fixture f =
302       begin_test(config, "multiple_shutdown_test", 128);
303   int fail_count = 0;
304 
305   grpc_slice_buffer slice_buffer;
306   grpc_slice_buffer_init(&slice_buffer);
307 
308   grpc_core::ExecCtx exec_ctx;
309   grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
310   grpc_endpoint_read(f.client_ep, &slice_buffer,
311                      GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
312                                          grpc_schedule_on_exec_ctx),
313                      /*urgent=*/false);
314   wait_for_fail_count(&fail_count, 0);
315   grpc_endpoint_shutdown(f.client_ep,
316                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
317   wait_for_fail_count(&fail_count, 1);
318   grpc_endpoint_read(f.client_ep, &slice_buffer,
319                      GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
320                                          grpc_schedule_on_exec_ctx),
321                      /*urgent=*/false);
322   wait_for_fail_count(&fail_count, 2);
323   grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
324   grpc_endpoint_write(f.client_ep, &slice_buffer,
325                       GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
326                                           grpc_schedule_on_exec_ctx),
327                       nullptr);
328   wait_for_fail_count(&fail_count, 3);
329   grpc_endpoint_shutdown(f.client_ep,
330                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
331   wait_for_fail_count(&fail_count, 3);
332 
333   grpc_slice_buffer_destroy_internal(&slice_buffer);
334 
335   grpc_endpoint_destroy(f.client_ep);
336   grpc_endpoint_destroy(f.server_ep);
337 }
338 
grpc_endpoint_tests(grpc_endpoint_test_config config,grpc_pollset * pollset,gpr_mu * mu)339 void grpc_endpoint_tests(grpc_endpoint_test_config config,
340                          grpc_pollset* pollset, gpr_mu* mu) {
341   size_t i;
342   g_pollset = pollset;
343   g_mu = mu;
344   multiple_shutdown_test(config);
345   read_and_write_test(config, 10000000, 100000, 8192, false);
346   read_and_write_test(config, 1000000, 100000, 1, false);
347   read_and_write_test(config, 100000000, 100000, 1, true);
348   for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
349     read_and_write_test(config, 40320, i, i, false);
350   }
351   g_pollset = nullptr;
352   g_mu = nullptr;
353 }
354