1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "test/core/end2end/end2end_tests.h"
20
21 #include <stdio.h>
22 #include <string.h>
23
24 #include <grpc/byte_buffer.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28
29 #include "test/core/end2end/cq_verifier.h"
30
tag(intptr_t t)31 static void* tag(intptr_t t) { return (void*)t; }
32
begin_test(grpc_end2end_test_config config,const char * test_name,grpc_channel_args * client_args,grpc_channel_args * server_args)33 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
34 const char* test_name,
35 grpc_channel_args* client_args,
36 grpc_channel_args* server_args) {
37 grpc_end2end_test_fixture f;
38 gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
39 f = config.create_fixture(client_args, server_args);
40 config.init_server(&f, server_args);
41 config.init_client(&f, client_args);
42 return f;
43 }
44
n_seconds_from_now(int n)45 static gpr_timespec n_seconds_from_now(int n) {
46 return grpc_timeout_seconds_to_deadline(n);
47 }
48
five_seconds_from_now(void)49 static gpr_timespec five_seconds_from_now(void) {
50 return n_seconds_from_now(5);
51 }
52
drain_cq(grpc_completion_queue * cq)53 static void drain_cq(grpc_completion_queue* cq) {
54 grpc_event ev;
55 do {
56 ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
57 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
58 }
59
shutdown_server(grpc_end2end_test_fixture * f)60 static void shutdown_server(grpc_end2end_test_fixture* f) {
61 if (!f->server) return;
62 grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
63 GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
64 grpc_timeout_seconds_to_deadline(5),
65 nullptr)
66 .type == GRPC_OP_COMPLETE);
67 grpc_server_destroy(f->server);
68 f->server = nullptr;
69 }
70
shutdown_client(grpc_end2end_test_fixture * f)71 static void shutdown_client(grpc_end2end_test_fixture* f) {
72 if (!f->client) return;
73 grpc_channel_destroy(f->client);
74 f->client = nullptr;
75 }
76
end_test(grpc_end2end_test_fixture * f)77 static void end_test(grpc_end2end_test_fixture* f) {
78 shutdown_server(f);
79 shutdown_client(f);
80
81 grpc_completion_queue_shutdown(f->cq);
82 drain_cq(f->cq);
83 grpc_completion_queue_destroy(f->cq);
84 grpc_completion_queue_destroy(f->shutdown_cq);
85 }
86
87 /* Creates and returns a grpc_slice containing random alphanumeric characters.
88 */
generate_random_slice()89 static grpc_slice generate_random_slice() {
90 size_t i;
91 static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890";
92 char* output;
93 const size_t output_size = 1024 * 1024;
94 output = static_cast<char*>(gpr_malloc(output_size));
95 for (i = 0; i < output_size - 1; ++i) {
96 output[i] = chars[rand() % static_cast<int>(sizeof(chars) - 1)];
97 }
98 output[output_size - 1] = '\0';
99 grpc_slice out = grpc_slice_from_copied_string(output);
100 gpr_free(output);
101 return out;
102 }
103
resource_quota_server(grpc_end2end_test_config config)104 void resource_quota_server(grpc_end2end_test_config config) {
105 if (config.feature_mask &
106 FEATURE_MASK_DOES_NOT_SUPPORT_RESOURCE_QUOTA_SERVER) {
107 return;
108 }
109 grpc_resource_quota* resource_quota =
110 grpc_resource_quota_create("test_server");
111 grpc_resource_quota_resize(resource_quota, 5 * 1024 * 1024);
112
113 #define NUM_CALLS 100
114 #define CLIENT_BASE_TAG 0x1000
115 #define SERVER_START_BASE_TAG 0x2000
116 #define SERVER_RECV_BASE_TAG 0x3000
117 #define SERVER_END_BASE_TAG 0x4000
118
119 grpc_arg arg;
120 arg.key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
121 arg.type = GRPC_ARG_POINTER;
122 arg.value.pointer.p = resource_quota;
123 arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();
124 grpc_channel_args args = {1, &arg};
125
126 grpc_end2end_test_fixture f =
127 begin_test(config, "resource_quota_server", nullptr, &args);
128
129 /* Create large request and response bodies. These are big enough to require
130 * multiple round trips to deliver to the peer, and their exact contents of
131 * will be verified on completion. */
132 grpc_slice request_payload_slice = generate_random_slice();
133
134 grpc_call** client_calls =
135 static_cast<grpc_call**>(malloc(sizeof(grpc_call*) * NUM_CALLS));
136 grpc_call** server_calls =
137 static_cast<grpc_call**>(malloc(sizeof(grpc_call*) * NUM_CALLS));
138 grpc_metadata_array* initial_metadata_recv =
139 static_cast<grpc_metadata_array*>(
140 malloc(sizeof(grpc_metadata_array) * NUM_CALLS));
141 grpc_metadata_array* trailing_metadata_recv =
142 static_cast<grpc_metadata_array*>(
143 malloc(sizeof(grpc_metadata_array) * NUM_CALLS));
144 grpc_metadata_array* request_metadata_recv =
145 static_cast<grpc_metadata_array*>(
146 malloc(sizeof(grpc_metadata_array) * NUM_CALLS));
147 grpc_call_details* call_details = static_cast<grpc_call_details*>(
148 malloc(sizeof(grpc_call_details) * NUM_CALLS));
149 grpc_status_code* status = static_cast<grpc_status_code*>(
150 malloc(sizeof(grpc_status_code) * NUM_CALLS));
151 grpc_slice* details =
152 static_cast<grpc_slice*>(malloc(sizeof(grpc_slice) * NUM_CALLS));
153 grpc_byte_buffer** request_payload = static_cast<grpc_byte_buffer**>(
154 malloc(sizeof(grpc_byte_buffer*) * NUM_CALLS));
155 grpc_byte_buffer** request_payload_recv = static_cast<grpc_byte_buffer**>(
156 malloc(sizeof(grpc_byte_buffer*) * NUM_CALLS));
157 int* was_cancelled = static_cast<int*>(malloc(sizeof(int) * NUM_CALLS));
158 grpc_call_error error;
159 int pending_client_calls = 0;
160 int pending_server_start_calls = 0;
161 int pending_server_recv_calls = 0;
162 int pending_server_end_calls = 0;
163 int cancelled_calls_on_client = 0;
164 int cancelled_calls_on_server = 0;
165 int deadline_exceeded = 0;
166 int unavailable = 0;
167
168 grpc_op ops[6];
169 grpc_op* op;
170
171 for (int i = 0; i < NUM_CALLS; i++) {
172 grpc_metadata_array_init(&initial_metadata_recv[i]);
173 grpc_metadata_array_init(&trailing_metadata_recv[i]);
174 grpc_metadata_array_init(&request_metadata_recv[i]);
175 grpc_call_details_init(&call_details[i]);
176 request_payload[i] = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
177 request_payload_recv[i] = nullptr;
178 was_cancelled[i] = 0;
179 }
180
181 for (int i = 0; i < NUM_CALLS; i++) {
182 error = grpc_server_request_call(
183 f.server, &server_calls[i], &call_details[i], &request_metadata_recv[i],
184 f.cq, f.cq, tag(SERVER_START_BASE_TAG + i));
185 GPR_ASSERT(GRPC_CALL_OK == error);
186
187 pending_server_start_calls++;
188 }
189
190 for (int i = 0; i < NUM_CALLS; i++) {
191 client_calls[i] =
192 grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
193 f.cq, grpc_slice_from_static_string("/foo"),
194 nullptr, n_seconds_from_now(60), nullptr);
195
196 memset(ops, 0, sizeof(ops));
197 op = ops;
198 op->op = GRPC_OP_SEND_INITIAL_METADATA;
199 op->data.send_initial_metadata.count = 0;
200 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
201 op->reserved = nullptr;
202 op++;
203 op->op = GRPC_OP_SEND_MESSAGE;
204 op->data.send_message.send_message = request_payload[i];
205 op->flags = 0;
206 op->reserved = nullptr;
207 op++;
208 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
209 op->flags = 0;
210 op->reserved = nullptr;
211 op++;
212 op->op = GRPC_OP_RECV_INITIAL_METADATA;
213 op->data.recv_initial_metadata.recv_initial_metadata =
214 &initial_metadata_recv[i];
215 op->flags = 0;
216 op->reserved = nullptr;
217 op++;
218 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
219 op->data.recv_status_on_client.trailing_metadata =
220 &trailing_metadata_recv[i];
221 op->data.recv_status_on_client.status = &status[i];
222 op->data.recv_status_on_client.status_details = &details[i];
223 op->flags = 0;
224 op->reserved = nullptr;
225 op++;
226 error = grpc_call_start_batch(client_calls[i], ops,
227 static_cast<size_t>(op - ops),
228 tag(CLIENT_BASE_TAG + i), nullptr);
229 GPR_ASSERT(GRPC_CALL_OK == error);
230
231 pending_client_calls++;
232 }
233
234 while (pending_client_calls + pending_server_recv_calls +
235 pending_server_end_calls >
236 0) {
237 grpc_event ev =
238 grpc_completion_queue_next(f.cq, n_seconds_from_now(60), nullptr);
239 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
240
241 int ev_tag = static_cast<int>((intptr_t)ev.tag);
242 if (ev_tag < CLIENT_BASE_TAG) {
243 abort(); /* illegal tag */
244 } else if (ev_tag < SERVER_START_BASE_TAG) {
245 /* client call finished */
246 int call_id = ev_tag - CLIENT_BASE_TAG;
247 GPR_ASSERT(call_id >= 0);
248 GPR_ASSERT(call_id < NUM_CALLS);
249 switch (status[call_id]) {
250 case GRPC_STATUS_RESOURCE_EXHAUSTED:
251 cancelled_calls_on_client++;
252 break;
253 case GRPC_STATUS_DEADLINE_EXCEEDED:
254 deadline_exceeded++;
255 break;
256 case GRPC_STATUS_UNAVAILABLE:
257 unavailable++;
258 break;
259 case GRPC_STATUS_OK:
260 break;
261 default:
262 gpr_log(GPR_ERROR, "Unexpected status code: %d", status[call_id]);
263 abort();
264 }
265 GPR_ASSERT(pending_client_calls > 0);
266
267 grpc_metadata_array_destroy(&initial_metadata_recv[call_id]);
268 grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]);
269 grpc_call_unref(client_calls[call_id]);
270 grpc_slice_unref(details[call_id]);
271 grpc_byte_buffer_destroy(request_payload[call_id]);
272
273 pending_client_calls--;
274 } else if (ev_tag < SERVER_RECV_BASE_TAG) {
275 /* new incoming call to the server */
276 int call_id = ev_tag - SERVER_START_BASE_TAG;
277 GPR_ASSERT(call_id >= 0);
278 GPR_ASSERT(call_id < NUM_CALLS);
279
280 memset(ops, 0, sizeof(ops));
281 op = ops;
282 op->op = GRPC_OP_SEND_INITIAL_METADATA;
283 op->data.send_initial_metadata.count = 0;
284 op->flags = 0;
285 op->reserved = nullptr;
286 op++;
287 op->op = GRPC_OP_RECV_MESSAGE;
288 op->data.recv_message.recv_message = &request_payload_recv[call_id];
289 op->flags = 0;
290 op->reserved = nullptr;
291 op++;
292 error = grpc_call_start_batch(
293 server_calls[call_id], ops, static_cast<size_t>(op - ops),
294 tag(SERVER_RECV_BASE_TAG + call_id), nullptr);
295 GPR_ASSERT(GRPC_CALL_OK == error);
296
297 GPR_ASSERT(pending_server_start_calls > 0);
298 pending_server_start_calls--;
299 pending_server_recv_calls++;
300
301 grpc_call_details_destroy(&call_details[call_id]);
302 grpc_metadata_array_destroy(&request_metadata_recv[call_id]);
303 } else if (ev_tag < SERVER_END_BASE_TAG) {
304 /* finished read on the server */
305 int call_id = ev_tag - SERVER_RECV_BASE_TAG;
306 GPR_ASSERT(call_id >= 0);
307 GPR_ASSERT(call_id < NUM_CALLS);
308
309 if (ev.success) {
310 if (request_payload_recv[call_id] != nullptr) {
311 grpc_byte_buffer_destroy(request_payload_recv[call_id]);
312 request_payload_recv[call_id] = nullptr;
313 }
314 } else {
315 GPR_ASSERT(request_payload_recv[call_id] == nullptr);
316 }
317
318 memset(ops, 0, sizeof(ops));
319 op = ops;
320 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
321 op->data.recv_close_on_server.cancelled = &was_cancelled[call_id];
322 op->flags = 0;
323 op->reserved = nullptr;
324 op++;
325 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
326 op->data.send_status_from_server.trailing_metadata_count = 0;
327 op->data.send_status_from_server.status = GRPC_STATUS_OK;
328 grpc_slice status_details = grpc_slice_from_static_string("xyz");
329 op->data.send_status_from_server.status_details = &status_details;
330 op->flags = 0;
331 op->reserved = nullptr;
332 op++;
333 error = grpc_call_start_batch(
334 server_calls[call_id], ops, static_cast<size_t>(op - ops),
335 tag(SERVER_END_BASE_TAG + call_id), nullptr);
336 GPR_ASSERT(GRPC_CALL_OK == error);
337
338 GPR_ASSERT(pending_server_recv_calls > 0);
339 pending_server_recv_calls--;
340 pending_server_end_calls++;
341 } else {
342 int call_id = ev_tag - SERVER_END_BASE_TAG;
343 GPR_ASSERT(call_id >= 0);
344 GPR_ASSERT(call_id < NUM_CALLS);
345
346 if (was_cancelled[call_id]) {
347 cancelled_calls_on_server++;
348 }
349 GPR_ASSERT(pending_server_end_calls > 0);
350 pending_server_end_calls--;
351
352 grpc_call_unref(server_calls[call_id]);
353 }
354 }
355
356 gpr_log(GPR_INFO,
357 "Done. %d total calls: %d cancelled at server, %d cancelled at "
358 "client, %d timed out, %d unavailable.",
359 NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client,
360 deadline_exceeded, unavailable);
361
362 grpc_slice_unref(request_payload_slice);
363 grpc_resource_quota_unref(resource_quota);
364
365 end_test(&f);
366 config.tear_down_data(&f);
367
368 free(client_calls);
369 free(server_calls);
370 free(initial_metadata_recv);
371 free(trailing_metadata_recv);
372 free(request_metadata_recv);
373 free(call_details);
374 free(status);
375 free(details);
376 free(request_payload);
377 free(request_payload_recv);
378 free(was_cancelled);
379 }
380
resource_quota_server_pre_init(void)381 void resource_quota_server_pre_init(void) {}
382