1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "test/core/end2end/end2end_tests.h"
20
21 #include <stdio.h>
22 #include <string.h>
23
24 #include <grpc/byte_buffer.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/time.h>
28 #include "test/core/end2end/cq_verifier.h"
29
tag(intptr_t t)30 static void* tag(intptr_t t) { return (void*)t; }
31
begin_test(grpc_end2end_test_config config,const char * test_name,grpc_channel_args * client_args,grpc_channel_args * server_args)32 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
33 const char* test_name,
34 grpc_channel_args* client_args,
35 grpc_channel_args* server_args) {
36 grpc_end2end_test_fixture f;
37 gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
38 f = config.create_fixture(client_args, server_args);
39 config.init_server(&f, server_args);
40 config.init_client(&f, client_args);
41 return f;
42 }
43
n_seconds_from_now(int n)44 static gpr_timespec n_seconds_from_now(int n) {
45 return grpc_timeout_seconds_to_deadline(n);
46 }
47
five_seconds_from_now(void)48 static gpr_timespec five_seconds_from_now(void) {
49 return n_seconds_from_now(5);
50 }
51
drain_cq(grpc_completion_queue * cq)52 static void drain_cq(grpc_completion_queue* cq) {
53 grpc_event ev;
54 do {
55 ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
56 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
57 }
58
shutdown_server(grpc_end2end_test_fixture * f)59 static void shutdown_server(grpc_end2end_test_fixture* f) {
60 if (!f->server) return;
61 grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
62 GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
63 grpc_timeout_seconds_to_deadline(5),
64 nullptr)
65 .type == GRPC_OP_COMPLETE);
66 grpc_server_destroy(f->server);
67 f->server = nullptr;
68 }
69
shutdown_client(grpc_end2end_test_fixture * f)70 static void shutdown_client(grpc_end2end_test_fixture* f) {
71 if (!f->client) return;
72 grpc_channel_destroy(f->client);
73 f->client = nullptr;
74 }
75
end_test(grpc_end2end_test_fixture * f)76 static void end_test(grpc_end2end_test_fixture* f) {
77 shutdown_server(f);
78 shutdown_client(f);
79
80 grpc_completion_queue_shutdown(f->cq);
81 drain_cq(f->cq);
82 grpc_completion_queue_destroy(f->cq);
83 grpc_completion_queue_destroy(f->shutdown_cq);
84 }
85
simple_request_body(grpc_end2end_test_config config,grpc_end2end_test_fixture f)86 static void simple_request_body(grpc_end2end_test_config config,
87 grpc_end2end_test_fixture f) {
88 grpc_call* c;
89 grpc_call* s;
90 cq_verifier* cqv = cq_verifier_create(f.cq);
91 grpc_op ops[6];
92 grpc_op* op;
93 grpc_metadata_array initial_metadata_recv;
94 grpc_metadata_array trailing_metadata_recv;
95 grpc_metadata_array request_metadata_recv;
96 grpc_call_details call_details;
97 grpc_status_code status;
98 grpc_call_error error;
99 grpc_slice details;
100 int was_cancelled = 2;
101
102 gpr_timespec deadline = five_seconds_from_now();
103 c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
104 grpc_slice_from_static_string("/foo"), nullptr,
105 deadline, nullptr);
106 GPR_ASSERT(c);
107
108 grpc_metadata_array_init(&initial_metadata_recv);
109 grpc_metadata_array_init(&trailing_metadata_recv);
110 grpc_metadata_array_init(&request_metadata_recv);
111 grpc_call_details_init(&call_details);
112
113 memset(ops, 0, sizeof(ops));
114 op = ops;
115 op->op = GRPC_OP_SEND_INITIAL_METADATA;
116 op->data.send_initial_metadata.count = 0;
117 op->flags = 0;
118 op->reserved = nullptr;
119 op++;
120 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
121 op->flags = 0;
122 op->reserved = nullptr;
123 op++;
124 op->op = GRPC_OP_RECV_INITIAL_METADATA;
125 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
126 op->flags = 0;
127 op->reserved = nullptr;
128 op++;
129 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
130 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
131 op->data.recv_status_on_client.status = &status;
132 op->data.recv_status_on_client.status_details = &details;
133 op->flags = 0;
134 op->reserved = nullptr;
135 op++;
136 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
137 nullptr);
138 GPR_ASSERT(GRPC_CALL_OK == error);
139
140 error =
141 grpc_server_request_call(f.server, &s, &call_details,
142 &request_metadata_recv, f.cq, f.cq, tag(101));
143 GPR_ASSERT(GRPC_CALL_OK == error);
144 CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
145 cq_verify(cqv);
146
147 memset(ops, 0, sizeof(ops));
148 op = ops;
149 op->op = GRPC_OP_SEND_INITIAL_METADATA;
150 op->data.send_initial_metadata.count = 0;
151 op->flags = 0;
152 op->reserved = nullptr;
153 op++;
154 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
155 op->data.send_status_from_server.trailing_metadata_count = 0;
156 op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
157 grpc_slice status_details = grpc_slice_from_static_string("xyz");
158 op->data.send_status_from_server.status_details = &status_details;
159 op->flags = 0;
160 op->reserved = nullptr;
161 op++;
162 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
163 op->data.recv_close_on_server.cancelled = &was_cancelled;
164 op->flags = 0;
165 op->reserved = nullptr;
166 op++;
167 error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
168 nullptr);
169 GPR_ASSERT(GRPC_CALL_OK == error);
170
171 CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
172 CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
173 cq_verify(cqv);
174
175 GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
176 GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
177 GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
178 GPR_ASSERT(was_cancelled == 1);
179
180 grpc_slice_unref(details);
181 grpc_metadata_array_destroy(&initial_metadata_recv);
182 grpc_metadata_array_destroy(&trailing_metadata_recv);
183 grpc_metadata_array_destroy(&request_metadata_recv);
184 grpc_call_details_destroy(&call_details);
185
186 grpc_call_unref(c);
187 grpc_call_unref(s);
188
189 cq_verifier_destroy(cqv);
190 }
191
test_max_concurrent_streams(grpc_end2end_test_config config)192 static void test_max_concurrent_streams(grpc_end2end_test_config config) {
193 grpc_end2end_test_fixture f;
194 grpc_arg server_arg;
195 grpc_channel_args server_args;
196 grpc_call* c1;
197 grpc_call* c2;
198 grpc_call* s1;
199 grpc_call* s2;
200 int live_call;
201 gpr_timespec deadline;
202 cq_verifier* cqv;
203 grpc_event ev;
204 grpc_call_details call_details;
205 grpc_metadata_array request_metadata_recv;
206 grpc_metadata_array initial_metadata_recv1;
207 grpc_metadata_array trailing_metadata_recv1;
208 grpc_metadata_array initial_metadata_recv2;
209 grpc_metadata_array trailing_metadata_recv2;
210 grpc_status_code status1;
211 grpc_call_error error;
212 grpc_slice details1;
213 grpc_status_code status2;
214 grpc_slice details2;
215 grpc_op ops[6];
216 grpc_op* op;
217 int was_cancelled;
218 int got_client_start;
219 int got_server_start;
220
221 server_arg.key = const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS);
222 server_arg.type = GRPC_ARG_INTEGER;
223 server_arg.value.integer = 1;
224
225 server_args.num_args = 1;
226 server_args.args = &server_arg;
227
228 f = begin_test(config, "test_max_concurrent_streams", nullptr, &server_args);
229 cqv = cq_verifier_create(f.cq);
230
231 grpc_metadata_array_init(&request_metadata_recv);
232 grpc_metadata_array_init(&initial_metadata_recv1);
233 grpc_metadata_array_init(&trailing_metadata_recv1);
234 grpc_metadata_array_init(&initial_metadata_recv2);
235 grpc_metadata_array_init(&trailing_metadata_recv2);
236 grpc_call_details_init(&call_details);
237
238 /* perform a ping-pong to ensure that settings have had a chance to round
239 trip */
240 simple_request_body(config, f);
241 /* perform another one to make sure that the one stream case still works */
242 simple_request_body(config, f);
243
244 /* start two requests - ensuring that the second is not accepted until
245 the first completes */
246 deadline = n_seconds_from_now(1000);
247 c1 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
248 f.cq, grpc_slice_from_static_string("/alpha"),
249 nullptr, deadline, nullptr);
250 GPR_ASSERT(c1);
251 c2 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
252 f.cq, grpc_slice_from_static_string("/beta"),
253 nullptr, deadline, nullptr);
254 GPR_ASSERT(c2);
255
256 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
257 f.server, &s1, &call_details,
258 &request_metadata_recv, f.cq, f.cq, tag(101)));
259
260 memset(ops, 0, sizeof(ops));
261 op = ops;
262 op->op = GRPC_OP_SEND_INITIAL_METADATA;
263 op->data.send_initial_metadata.count = 0;
264 op->flags = 0;
265 op->reserved = nullptr;
266 op++;
267 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
268 op->flags = 0;
269 op->reserved = nullptr;
270 op++;
271 error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
272 tag(301), nullptr);
273 GPR_ASSERT(GRPC_CALL_OK == error);
274
275 memset(ops, 0, sizeof(ops));
276 op = ops;
277 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
278 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
279 op->data.recv_status_on_client.status = &status1;
280 op->data.recv_status_on_client.status_details = &details1;
281 op->flags = 0;
282 op->reserved = nullptr;
283 op++;
284 op->op = GRPC_OP_RECV_INITIAL_METADATA;
285 op->data.recv_initial_metadata.recv_initial_metadata =
286 &initial_metadata_recv1;
287 op->flags = 0;
288 op->reserved = nullptr;
289 op++;
290 error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
291 tag(302), nullptr);
292 GPR_ASSERT(GRPC_CALL_OK == error);
293
294 memset(ops, 0, sizeof(ops));
295 op = ops;
296 op->op = GRPC_OP_SEND_INITIAL_METADATA;
297 op->data.send_initial_metadata.count = 0;
298 op->flags = 0;
299 op->reserved = nullptr;
300 op++;
301 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
302 op->flags = 0;
303 op->reserved = nullptr;
304 op++;
305 error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
306 tag(401), nullptr);
307 GPR_ASSERT(GRPC_CALL_OK == error);
308
309 memset(ops, 0, sizeof(ops));
310 op = ops;
311 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
312 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
313 op->data.recv_status_on_client.status = &status2;
314 op->data.recv_status_on_client.status_details = &details2;
315 op->flags = 0;
316 op->reserved = nullptr;
317 op++;
318 op->op = GRPC_OP_RECV_INITIAL_METADATA;
319 op->data.recv_initial_metadata.recv_initial_metadata =
320 &initial_metadata_recv1;
321 op->flags = 0;
322 op->reserved = nullptr;
323 op++;
324 error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
325 tag(402), nullptr);
326 GPR_ASSERT(GRPC_CALL_OK == error);
327
328 got_client_start = 0;
329 got_server_start = 0;
330 live_call = -1;
331 while (!got_client_start || !got_server_start) {
332 ev = grpc_completion_queue_next(f.cq, grpc_timeout_seconds_to_deadline(3),
333 nullptr);
334 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
335 GPR_ASSERT(ev.success);
336 if (ev.tag == tag(101)) {
337 GPR_ASSERT(!got_server_start);
338 got_server_start = 1;
339 } else {
340 GPR_ASSERT(!got_client_start);
341 GPR_ASSERT(ev.tag == tag(301) || ev.tag == tag(401));
342 /* The /alpha or /beta calls started above could be invoked (but NOT
343 * both);
344 * check this here */
345 /* We'll get tag 303 or 403, we want 300, 400 */
346 live_call = (static_cast<int>((intptr_t)ev.tag)) - 1;
347 got_client_start = 1;
348 }
349 }
350 GPR_ASSERT(live_call == 300 || live_call == 400);
351
352 memset(ops, 0, sizeof(ops));
353 op = ops;
354 op->op = GRPC_OP_SEND_INITIAL_METADATA;
355 op->data.send_initial_metadata.count = 0;
356 op->flags = 0;
357 op->reserved = nullptr;
358 op++;
359 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
360 op->data.recv_close_on_server.cancelled = &was_cancelled;
361 op->flags = 0;
362 op->reserved = nullptr;
363 op++;
364 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
365 op->data.send_status_from_server.trailing_metadata_count = 0;
366 op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
367 grpc_slice status_details = grpc_slice_from_static_string("xyz");
368 op->data.send_status_from_server.status_details = &status_details;
369 op->flags = 0;
370 op->reserved = nullptr;
371 op++;
372 error = grpc_call_start_batch(s1, ops, static_cast<size_t>(op - ops),
373 tag(102), nullptr);
374 GPR_ASSERT(GRPC_CALL_OK == error);
375
376 CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
377 CQ_EXPECT_COMPLETION(cqv, tag(live_call + 2), 1);
378 /* first request is finished, we should be able to start the second */
379 live_call = (live_call == 300) ? 400 : 300;
380 CQ_EXPECT_COMPLETION(cqv, tag(live_call + 1), 1);
381 cq_verify(cqv);
382
383 grpc_call_details_destroy(&call_details);
384
385 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
386 f.server, &s2, &call_details,
387 &request_metadata_recv, f.cq, f.cq, tag(201)));
388 CQ_EXPECT_COMPLETION(cqv, tag(201), 1);
389 cq_verify(cqv);
390
391 memset(ops, 0, sizeof(ops));
392 op = ops;
393 op->op = GRPC_OP_SEND_INITIAL_METADATA;
394 op->data.send_initial_metadata.count = 0;
395 op->flags = 0;
396 op->reserved = nullptr;
397 op++;
398 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
399 op->data.recv_close_on_server.cancelled = &was_cancelled;
400 op->flags = 0;
401 op->reserved = nullptr;
402 op++;
403 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
404 op->data.send_status_from_server.trailing_metadata_count = 0;
405 op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
406 op->data.send_status_from_server.status_details = &status_details;
407 op->flags = 0;
408 op->reserved = nullptr;
409 op++;
410 error = grpc_call_start_batch(s2, ops, static_cast<size_t>(op - ops),
411 tag(202), nullptr);
412 GPR_ASSERT(GRPC_CALL_OK == error);
413
414 CQ_EXPECT_COMPLETION(cqv, tag(live_call + 2), 1);
415 CQ_EXPECT_COMPLETION(cqv, tag(202), 1);
416 cq_verify(cqv);
417
418 cq_verifier_destroy(cqv);
419
420 grpc_call_unref(c1);
421 grpc_call_unref(s1);
422 grpc_call_unref(c2);
423 grpc_call_unref(s2);
424
425 grpc_slice_unref(details1);
426 grpc_slice_unref(details2);
427 grpc_metadata_array_destroy(&initial_metadata_recv1);
428 grpc_metadata_array_destroy(&trailing_metadata_recv1);
429 grpc_metadata_array_destroy(&initial_metadata_recv2);
430 grpc_metadata_array_destroy(&trailing_metadata_recv2);
431 grpc_metadata_array_destroy(&request_metadata_recv);
432 grpc_call_details_destroy(&call_details);
433
434 end_test(&f);
435 config.tear_down_data(&f);
436 }
437
test_max_concurrent_streams_with_timeout_on_first(grpc_end2end_test_config config)438 static void test_max_concurrent_streams_with_timeout_on_first(
439 grpc_end2end_test_config config) {
440 grpc_end2end_test_fixture f;
441 grpc_arg server_arg;
442 grpc_channel_args server_args;
443 grpc_call* c1;
444 grpc_call* c2;
445 grpc_call* s1;
446 grpc_call* s2;
447 cq_verifier* cqv;
448 grpc_call_details call_details;
449 grpc_metadata_array request_metadata_recv;
450 grpc_metadata_array initial_metadata_recv1;
451 grpc_metadata_array trailing_metadata_recv1;
452 grpc_metadata_array initial_metadata_recv2;
453 grpc_metadata_array trailing_metadata_recv2;
454 grpc_status_code status1;
455 grpc_call_error error;
456 grpc_slice details1 = grpc_empty_slice();
457 grpc_status_code status2;
458 grpc_slice details2 = grpc_empty_slice();
459 grpc_op ops[6];
460 grpc_op* op;
461 int was_cancelled;
462
463 server_arg.key = const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS);
464 server_arg.type = GRPC_ARG_INTEGER;
465 server_arg.value.integer = 1;
466
467 server_args.num_args = 1;
468 server_args.args = &server_arg;
469
470 f = begin_test(config, "test_max_concurrent_streams_with_timeout_on_first",
471 nullptr, &server_args);
472 cqv = cq_verifier_create(f.cq);
473
474 grpc_metadata_array_init(&request_metadata_recv);
475 grpc_metadata_array_init(&initial_metadata_recv1);
476 grpc_metadata_array_init(&trailing_metadata_recv1);
477 grpc_metadata_array_init(&initial_metadata_recv2);
478 grpc_metadata_array_init(&trailing_metadata_recv2);
479 grpc_call_details_init(&call_details);
480
481 /* perform a ping-pong to ensure that settings have had a chance to round
482 trip */
483 simple_request_body(config, f);
484 /* perform another one to make sure that the one stream case still works */
485 simple_request_body(config, f);
486
487 /* start two requests - ensuring that the second is not accepted until
488 the first completes */
489 c1 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
490 f.cq, grpc_slice_from_static_string("/alpha"),
491 nullptr, n_seconds_from_now(3), nullptr);
492 GPR_ASSERT(c1);
493 c2 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
494 f.cq, grpc_slice_from_static_string("/beta"),
495 nullptr, n_seconds_from_now(1000), nullptr);
496 GPR_ASSERT(c2);
497
498 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
499 f.server, &s1, &call_details,
500 &request_metadata_recv, f.cq, f.cq, tag(101)));
501
502 memset(ops, 0, sizeof(ops));
503 op = ops;
504 op->op = GRPC_OP_SEND_INITIAL_METADATA;
505 op->data.send_initial_metadata.count = 0;
506 op->flags = 0;
507 op->reserved = nullptr;
508 op++;
509 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
510 op->flags = 0;
511 op->reserved = nullptr;
512 op++;
513 error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
514 tag(301), nullptr);
515 GPR_ASSERT(GRPC_CALL_OK == error);
516
517 memset(ops, 0, sizeof(ops));
518 op = ops;
519 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
520 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
521 op->data.recv_status_on_client.status = &status1;
522 op->data.recv_status_on_client.status_details = &details1;
523 op->flags = 0;
524 op->reserved = nullptr;
525 op++;
526 op->op = GRPC_OP_RECV_INITIAL_METADATA;
527 op->data.recv_initial_metadata.recv_initial_metadata =
528 &initial_metadata_recv1;
529 op->flags = 0;
530 op->reserved = nullptr;
531 op++;
532 error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
533 tag(302), nullptr);
534 GPR_ASSERT(GRPC_CALL_OK == error);
535
536 CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
537 CQ_EXPECT_COMPLETION(cqv, tag(301), 1);
538 cq_verify(cqv);
539
540 memset(ops, 0, sizeof(ops));
541 op = ops;
542 op->op = GRPC_OP_SEND_INITIAL_METADATA;
543 op->data.send_initial_metadata.count = 0;
544 op->flags = 0;
545 op->reserved = nullptr;
546 op++;
547 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
548 op->flags = 0;
549 op->reserved = nullptr;
550 op++;
551 error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
552 tag(401), nullptr);
553 GPR_ASSERT(GRPC_CALL_OK == error);
554
555 memset(ops, 0, sizeof(ops));
556 op = ops;
557 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
558 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
559 op->data.recv_status_on_client.status = &status2;
560 op->data.recv_status_on_client.status_details = &details2;
561 op->flags = 0;
562 op->reserved = nullptr;
563 op++;
564 op->op = GRPC_OP_RECV_INITIAL_METADATA;
565 op->data.recv_initial_metadata.recv_initial_metadata =
566 &initial_metadata_recv2;
567 op->flags = 0;
568 op->reserved = nullptr;
569 op++;
570 error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
571 tag(402), nullptr);
572 GPR_ASSERT(GRPC_CALL_OK == error);
573
574 grpc_call_details_destroy(&call_details);
575 grpc_call_details_init(&call_details);
576 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
577 f.server, &s2, &call_details,
578 &request_metadata_recv, f.cq, f.cq, tag(201)));
579
580 CQ_EXPECT_COMPLETION(cqv, tag(302), 1);
581 /* first request is finished, we should be able to start the second */
582 CQ_EXPECT_COMPLETION(cqv, tag(401), 1);
583 CQ_EXPECT_COMPLETION(cqv, tag(201), 1);
584 cq_verify(cqv);
585
586 memset(ops, 0, sizeof(ops));
587 op = ops;
588 op->op = GRPC_OP_SEND_INITIAL_METADATA;
589 op->data.send_initial_metadata.count = 0;
590 op->flags = 0;
591 op->reserved = nullptr;
592 op++;
593 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
594 op->data.recv_close_on_server.cancelled = &was_cancelled;
595 op->flags = 0;
596 op->reserved = nullptr;
597 op++;
598 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
599 op->data.send_status_from_server.trailing_metadata_count = 0;
600 op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
601 grpc_slice status_details = grpc_slice_from_static_string("xyz");
602 op->data.send_status_from_server.status_details = &status_details;
603 op->flags = 0;
604 op->reserved = nullptr;
605 op++;
606 error = grpc_call_start_batch(s2, ops, static_cast<size_t>(op - ops),
607 tag(202), nullptr);
608 GPR_ASSERT(GRPC_CALL_OK == error);
609
610 CQ_EXPECT_COMPLETION(cqv, tag(402), 1);
611 CQ_EXPECT_COMPLETION(cqv, tag(202), 1);
612 cq_verify(cqv);
613
614 cq_verifier_destroy(cqv);
615
616 grpc_call_unref(c1);
617 grpc_call_unref(s1);
618 grpc_call_unref(c2);
619 grpc_call_unref(s2);
620
621 grpc_slice_unref(details1);
622 grpc_slice_unref(details2);
623 grpc_metadata_array_destroy(&initial_metadata_recv1);
624 grpc_metadata_array_destroy(&trailing_metadata_recv1);
625 grpc_metadata_array_destroy(&initial_metadata_recv2);
626 grpc_metadata_array_destroy(&trailing_metadata_recv2);
627 grpc_metadata_array_destroy(&request_metadata_recv);
628 grpc_call_details_destroy(&call_details);
629
630 end_test(&f);
631 config.tear_down_data(&f);
632 }
633
test_max_concurrent_streams_with_timeout_on_second(grpc_end2end_test_config config)634 static void test_max_concurrent_streams_with_timeout_on_second(
635 grpc_end2end_test_config config) {
636 grpc_end2end_test_fixture f;
637 grpc_arg server_arg;
638 grpc_channel_args server_args;
639 grpc_call* c1;
640 grpc_call* c2;
641 grpc_call* s1;
642 cq_verifier* cqv;
643 grpc_call_details call_details;
644 grpc_metadata_array request_metadata_recv;
645 grpc_metadata_array initial_metadata_recv1;
646 grpc_metadata_array trailing_metadata_recv1;
647 grpc_metadata_array initial_metadata_recv2;
648 grpc_metadata_array trailing_metadata_recv2;
649 grpc_status_code status1;
650 grpc_call_error error;
651 grpc_slice details1 = grpc_empty_slice();
652 grpc_status_code status2;
653 grpc_slice details2 = grpc_empty_slice();
654 grpc_op ops[6];
655 grpc_op* op;
656 int was_cancelled;
657
658 server_arg.key = const_cast<char*>(GRPC_ARG_MAX_CONCURRENT_STREAMS);
659 server_arg.type = GRPC_ARG_INTEGER;
660 server_arg.value.integer = 1;
661
662 server_args.num_args = 1;
663 server_args.args = &server_arg;
664
665 f = begin_test(config, "test_max_concurrent_streams_with_timeout_on_second",
666 nullptr, &server_args);
667 cqv = cq_verifier_create(f.cq);
668
669 grpc_metadata_array_init(&request_metadata_recv);
670 grpc_metadata_array_init(&initial_metadata_recv1);
671 grpc_metadata_array_init(&trailing_metadata_recv1);
672 grpc_metadata_array_init(&initial_metadata_recv2);
673 grpc_metadata_array_init(&trailing_metadata_recv2);
674 grpc_call_details_init(&call_details);
675
676 /* perform a ping-pong to ensure that settings have had a chance to round
677 trip */
678 simple_request_body(config, f);
679 /* perform another one to make sure that the one stream case still works */
680 simple_request_body(config, f);
681
682 /* start two requests - ensuring that the second is not accepted until
683 the first completes , and the second request will timeout in the
684 concurrent_list */
685 c1 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
686 f.cq, grpc_slice_from_static_string("/alpha"),
687 nullptr, n_seconds_from_now(1000), nullptr);
688 GPR_ASSERT(c1);
689 c2 = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS,
690 f.cq, grpc_slice_from_static_string("/beta"),
691 nullptr, n_seconds_from_now(3), nullptr);
692 GPR_ASSERT(c2);
693
694 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
695 f.server, &s1, &call_details,
696 &request_metadata_recv, f.cq, f.cq, tag(101)));
697
698 memset(ops, 0, sizeof(ops));
699 op = ops;
700 op->op = GRPC_OP_SEND_INITIAL_METADATA;
701 op->data.send_initial_metadata.count = 0;
702 op->flags = 0;
703 op->reserved = nullptr;
704 op++;
705 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
706 op->flags = 0;
707 op->reserved = nullptr;
708 op++;
709 error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
710 tag(301), nullptr);
711 GPR_ASSERT(GRPC_CALL_OK == error);
712
713 memset(ops, 0, sizeof(ops));
714 op = ops;
715 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
716 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv1;
717 op->data.recv_status_on_client.status = &status1;
718 op->data.recv_status_on_client.status_details = &details1;
719 op->flags = 0;
720 op->reserved = nullptr;
721 op++;
722 op->op = GRPC_OP_RECV_INITIAL_METADATA;
723 op->data.recv_initial_metadata.recv_initial_metadata =
724 &initial_metadata_recv1;
725 op->flags = 0;
726 op->reserved = nullptr;
727 op++;
728 error = grpc_call_start_batch(c1, ops, static_cast<size_t>(op - ops),
729 tag(302), nullptr);
730 GPR_ASSERT(GRPC_CALL_OK == error);
731
732 CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
733 CQ_EXPECT_COMPLETION(cqv, tag(301), 1);
734 cq_verify(cqv);
735
736 memset(ops, 0, sizeof(ops));
737 op = ops;
738 op->op = GRPC_OP_SEND_INITIAL_METADATA;
739 op->data.send_initial_metadata.count = 0;
740 op->flags = 0;
741 op->reserved = nullptr;
742 op++;
743 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
744 op->flags = 0;
745 op->reserved = nullptr;
746 op++;
747 error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
748 tag(401), nullptr);
749 GPR_ASSERT(GRPC_CALL_OK == error);
750
751 memset(ops, 0, sizeof(ops));
752 op = ops;
753 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
754 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2;
755 op->data.recv_status_on_client.status = &status2;
756 op->data.recv_status_on_client.status_details = &details2;
757 op->flags = 0;
758 op->reserved = nullptr;
759 op++;
760 op->op = GRPC_OP_RECV_INITIAL_METADATA;
761 op->data.recv_initial_metadata.recv_initial_metadata =
762 &initial_metadata_recv2;
763 op->flags = 0;
764 op->reserved = nullptr;
765 op++;
766 error = grpc_call_start_batch(c2, ops, static_cast<size_t>(op - ops),
767 tag(402), nullptr);
768 GPR_ASSERT(GRPC_CALL_OK == error);
769
770 /* the second request is time out*/
771 CQ_EXPECT_COMPLETION(cqv, tag(401), 0);
772 CQ_EXPECT_COMPLETION(cqv, tag(402), 1);
773 cq_verify(cqv);
774
775 /* second request is finished because of time out, so destroy the second call
776 */
777 grpc_call_unref(c2);
778
779 /* now reply the first call */
780 memset(ops, 0, sizeof(ops));
781 op = ops;
782 op->op = GRPC_OP_SEND_INITIAL_METADATA;
783 op->data.send_initial_metadata.count = 0;
784 op->flags = 0;
785 op->reserved = nullptr;
786 op++;
787 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
788 op->data.recv_close_on_server.cancelled = &was_cancelled;
789 op->flags = 0;
790 op->reserved = nullptr;
791 op++;
792 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
793 op->data.send_status_from_server.trailing_metadata_count = 0;
794 op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
795 grpc_slice status_details = grpc_slice_from_static_string("xyz");
796 op->data.send_status_from_server.status_details = &status_details;
797 op->flags = 0;
798 op->reserved = nullptr;
799 op++;
800 error = grpc_call_start_batch(s1, ops, static_cast<size_t>(op - ops),
801 tag(102), nullptr);
802 GPR_ASSERT(GRPC_CALL_OK == error);
803
804 CQ_EXPECT_COMPLETION(cqv, tag(302), 1);
805 CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
806 cq_verify(cqv);
807
808 cq_verifier_destroy(cqv);
809
810 grpc_call_unref(c1);
811 grpc_call_unref(s1);
812
813 grpc_slice_unref(details1);
814 grpc_slice_unref(details2);
815 grpc_metadata_array_destroy(&initial_metadata_recv1);
816 grpc_metadata_array_destroy(&trailing_metadata_recv1);
817 grpc_metadata_array_destroy(&initial_metadata_recv2);
818 grpc_metadata_array_destroy(&trailing_metadata_recv2);
819 grpc_metadata_array_destroy(&request_metadata_recv);
820 grpc_call_details_destroy(&call_details);
821
822 end_test(&f);
823 config.tear_down_data(&f);
824 }
825
max_concurrent_streams(grpc_end2end_test_config config)826 void max_concurrent_streams(grpc_end2end_test_config config) {
827 test_max_concurrent_streams_with_timeout_on_first(config);
828 test_max_concurrent_streams_with_timeout_on_second(config);
829 test_max_concurrent_streams(config);
830 }
831
max_concurrent_streams_pre_init(void)832 void max_concurrent_streams_pre_init(void) {}
833