/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "test/core/iomgr/endpoint_tests.h"

#include <stdbool.h>
#include <sys/types.h>

#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/test_config.h"

/*
   General test notes:

   All tests which write data into an endpoint write i%256 into byte i, which
   is verified by readers.

   In general there are a few interesting things to vary which may lead to
   exercising different codepaths in an implementation:
   1. Total amount of data written to the endpoint
   2. Size of slice allocations
   3. Amount of data we read from or write to the endpoint at once

   The tests here tend to parameterize these where applicable.

*/
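
/* Because the test state below carries current_write_data across writes, the
   i%256 pattern is continuous over the whole stream rather than restarting
   per write: e.g. a 300-byte transfer carries bytes 0,1,...,255,0,...,43 and
   the next write continues from 44. */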

static gpr_mu* g_mu;
static grpc_pollset* g_pollset;

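/* Walks the slices, asserting that each byte matches the expected i%256
   pattern (tracked in *current_data), and returns the total number of bytes
   seen. */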
size_t count_slices(grpc_slice* slices, size_t nslices, int* current_data) {
  size_t num_bytes = 0;
  size_t i;
  size_t j;
  unsigned char* buf;
  for (i = 0; i < nslices; ++i) {
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      GPR_ASSERT(buf[j] == *current_data);
      *current_data = (*current_data + 1) % 256;
    }
    num_bytes += GRPC_SLICE_LENGTH(slices[i]);
  }
  return num_bytes;
}

static grpc_endpoint_test_fixture begin_test(grpc_endpoint_test_config config,
                                             const char* test_name,
                                             size_t slice_size) {
  gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
  return config.create_fixture(slice_size);
}

static void end_test(grpc_endpoint_test_config config) { config.clean_up(); }

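/* Allocates *num_blocks slices totalling num_bytes (each at most slice_size
   bytes) and fills them with the running i%256 pattern, continuing from
   *current_data. The caller owns the returned array. */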
static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
                                   size_t* num_blocks, uint8_t* current_data) {
  size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1 : 0);
  grpc_slice* slices =
      static_cast<grpc_slice*>(gpr_malloc(sizeof(grpc_slice) * nslices));
  size_t num_bytes_left = num_bytes;
  size_t i;
  size_t j;
  unsigned char* buf;
  *num_blocks = nslices;

  for (i = 0; i < nslices; ++i) {
    slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
                                                              : slice_size);
    num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
    buf = GRPC_SLICE_START_PTR(slices[i]);
    for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
      buf[j] = *current_data;
      (*current_data)++;
    }
  }
  GPR_ASSERT(num_bytes_left == 0);
  return slices;
}

struct read_and_write_test_state {
  grpc_endpoint* read_ep;
  grpc_endpoint* write_ep;
  size_t target_bytes;
  size_t bytes_read;
  size_t current_write_size;
  size_t bytes_written;
  int current_read_data;
  uint8_t current_write_data;
  int read_done;
  int write_done;
  grpc_slice_buffer incoming;
  grpc_slice_buffer outgoing;
  grpc_closure done_read;
  grpc_closure done_write;
};

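/* Read closure: tallies and verifies the bytes received so far, then either
   signals completion (or failure) to the main thread via read_done and a
   pollset kick, or issues the next read. */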
static void read_and_write_test_read_handler(void* data, grpc_error* error) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);

  state->bytes_read += count_slices(
      state->incoming.slices, state->incoming.count, &state->current_read_data);
  if (state->bytes_read == state->target_bytes || error != GRPC_ERROR_NONE) {
    gpr_log(GPR_INFO, "Read handler done");
    gpr_mu_lock(g_mu);
    state->read_done = 1 + (error == GRPC_ERROR_NONE);
    GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
    gpr_mu_unlock(g_mu);
  } else if (error == GRPC_ERROR_NONE) {
    grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read);
  }
}

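/* Write closure: accounts for the write that just completed, then either
   allocates the next batch of patterned slices and writes it, or (once the
   target has been reached or an error occurred) signals completion via
   write_done and a pollset kick. */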
static void read_and_write_test_write_handler(void* data, grpc_error* error) {
  struct read_and_write_test_state* state =
      static_cast<struct read_and_write_test_state*>(data);
  grpc_slice* slices = nullptr;
  size_t nslices;

  if (error == GRPC_ERROR_NONE) {
    state->bytes_written += state->current_write_size;
    if (state->target_bytes - state->bytes_written <
        state->current_write_size) {
      state->current_write_size = state->target_bytes - state->bytes_written;
    }
    if (state->current_write_size != 0) {
      slices = allocate_blocks(state->current_write_size, 8192, &nslices,
                               &state->current_write_data);
      grpc_slice_buffer_reset_and_unref(&state->outgoing);
      grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
      grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
                          nullptr);
      gpr_free(slices);
      return;
    }
  }

  gpr_log(GPR_INFO, "Write handler done");
  gpr_mu_lock(g_mu);
  state->write_done = 1 + (error == GRPC_ERROR_NONE);
  GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
  gpr_mu_unlock(g_mu);
}

/* Do both reading and writing using the grpc_endpoint API.

   This also includes a test of the shutdown behavior.
 */
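/* Parameters: num_bytes is the total number of bytes to transfer; write_size
   is the number of bytes queued per grpc_endpoint_write call; slice_size is
   the slice size requested from the fixture; shutdown, when true, shuts both
   endpoints down right after the transfer starts. */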
static void read_and_write_test(grpc_endpoint_test_config config,
                                size_t num_bytes, size_t write_size,
                                size_t slice_size, bool shutdown) {
  struct read_and_write_test_state state;
  grpc_endpoint_test_fixture f =
      begin_test(config, "read_and_write_test", slice_size);
  grpc_core::ExecCtx exec_ctx;
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
  gpr_log(GPR_DEBUG,
          "num_bytes=%" PRIuPTR " write_size=%" PRIuPTR " slice_size=%" PRIuPTR
          " shutdown=%d",
          num_bytes, write_size, slice_size, shutdown);

  if (shutdown) {
    gpr_log(GPR_INFO, "Start read and write shutdown test");
  } else {
    gpr_log(GPR_INFO,
            "Start read and write test with %" PRIuPTR
            " bytes, slice size %" PRIuPTR,
            num_bytes, slice_size);
  }

  state.read_ep = f.client_ep;
  state.write_ep = f.server_ep;
  state.target_bytes = num_bytes;
  state.bytes_read = 0;
  state.current_write_size = write_size;
  state.bytes_written = 0;
  state.read_done = 0;
  state.write_done = 0;
  state.current_read_data = 0;
  state.current_write_data = 0;
  GRPC_CLOSURE_INIT(&state.done_read, read_and_write_test_read_handler, &state,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&state.done_write, read_and_write_test_write_handler,
                    &state, grpc_schedule_on_exec_ctx);
  grpc_slice_buffer_init(&state.outgoing);
  grpc_slice_buffer_init(&state.incoming);

  /* Get started by pretending an initial write completed */
  /* NOTE: Sets up initial conditions so we can have the same write handler
     for the first iteration as for later iterations. It does the right thing
     even when bytes_written is unsigned. */
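  /* Concretely: bytes_written (0) wraps to SIZE_MAX - write_size + 1 here,
     and the handler's first "bytes_written += current_write_size" brings it
     back to exactly 0, as if a write of write_size bytes had just finished. */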
  state.bytes_written -= state.current_write_size;
  read_and_write_test_write_handler(&state, GRPC_ERROR_NONE);
  grpc_core::ExecCtx::Get()->Flush();

  grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read);

  if (shutdown) {
    gpr_log(GPR_DEBUG, "shutdown read");
    grpc_endpoint_shutdown(
        state.read_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
    gpr_log(GPR_DEBUG, "shutdown write");
    grpc_endpoint_shutdown(
        state.write_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
  }
  grpc_core::ExecCtx::Get()->Flush();

  gpr_mu_lock(g_mu);
  while (!state.read_done || !state.write_done) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(grpc_core::ExecCtx::Get()->Now() < deadline);
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
  }
  gpr_mu_unlock(g_mu);
  grpc_core::ExecCtx::Get()->Flush();

  end_test(config);
  grpc_slice_buffer_destroy_internal(&state.outgoing);
  grpc_slice_buffer_destroy_internal(&state.incoming);
  grpc_endpoint_destroy(state.read_ep);
  grpc_endpoint_destroy(state.write_ep);
}

static void inc_on_failure(void* arg, grpc_error* error) {
  gpr_mu_lock(g_mu);
  *static_cast<int*>(arg) += (error != GRPC_ERROR_NONE);
  GPR_ASSERT(GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
  gpr_mu_unlock(g_mu);
}

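/* Polls (for up to 10 seconds) until *fail_count reaches want_fail_count,
   then asserts that it did. */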
static void wait_for_fail_count(int* fail_count, int want_fail_count) {
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(g_mu);
  grpc_millis deadline =
      grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
  while (grpc_core::ExecCtx::Get()->Now() < deadline &&
         *fail_count < want_fail_count) {
    grpc_pollset_worker* worker = nullptr;
    GPR_ASSERT(GRPC_LOG_IF_ERROR(
        "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
    gpr_mu_unlock(g_mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(g_mu);
  }
  GPR_ASSERT(*fail_count == want_fail_count);
  gpr_mu_unlock(g_mu);
}

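/* Verifies that reads and writes issued after a shutdown fail (incrementing
   fail_count once each), and that shutting the endpoint down a second time
   produces no additional failures. */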
static void multiple_shutdown_test(grpc_endpoint_test_config config) {
  grpc_endpoint_test_fixture f =
      begin_test(config, "multiple_shutdown_test", 128);
  int fail_count = 0;

  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);

  grpc_core::ExecCtx exec_ctx;
  grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
  grpc_endpoint_read(f.client_ep, &slice_buffer,
                     GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                         grpc_schedule_on_exec_ctx));
  wait_for_fail_count(&fail_count, 0);
  grpc_endpoint_shutdown(f.client_ep,
                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
  wait_for_fail_count(&fail_count, 1);
  grpc_endpoint_read(f.client_ep, &slice_buffer,
                     GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                         grpc_schedule_on_exec_ctx));
  wait_for_fail_count(&fail_count, 2);
  grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
  grpc_endpoint_write(f.client_ep, &slice_buffer,
                      GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
                                          grpc_schedule_on_exec_ctx),
                      nullptr);
  wait_for_fail_count(&fail_count, 3);
  grpc_endpoint_shutdown(f.client_ep,
                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
  wait_for_fail_count(&fail_count, 3);

  grpc_slice_buffer_destroy_internal(&slice_buffer);

  grpc_endpoint_destroy(f.client_ep);
  grpc_endpoint_destroy(f.server_ep);
}

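/* Entry point: runs the endpoint test suite against the supplied fixture
   config -- the multiple-shutdown test plus read/write tests over a range of
   transfer, write, and slice sizes, including one shutdown run. */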
void grpc_endpoint_tests(grpc_endpoint_test_config config,
                         grpc_pollset* pollset, gpr_mu* mu) {
  size_t i;
  g_pollset = pollset;
  g_mu = mu;
  multiple_shutdown_test(config);
  read_and_write_test(config, 10000000, 100000, 8192, false);
  read_and_write_test(config, 1000000, 100000, 1, false);
  read_and_write_test(config, 100000000, 100000, 1, true);
  for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
    read_and_write_test(config, 40320, i, i, false);
  }
  g_pollset = nullptr;
  g_mu = nullptr;
}