1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "test/core/end2end/end2end_tests.h"
20
21 #include <stdio.h>
22 #include <string.h>
23
24 #include <string>
25
26 #include "absl/strings/str_cat.h"
27
28 #include <grpc/byte_buffer.h>
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/time.h>
33
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gpr/useful.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/transport/static_metadata.h"
39
40 #include "test/core/end2end/cq_verifier.h"
41 #include "test/core/end2end/tests/cancel_test_helpers.h"
42
// Turns an integer id into the opaque pointer value used as a completion
// queue tag throughout this file.
static void* tag(intptr_t t) {
  void* opaque = (void*)t;
  return opaque;
}
44
begin_test(grpc_end2end_test_config config,const char * test_name,grpc_channel_args * client_args,grpc_channel_args * server_args)45 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
46 const char* test_name,
47 grpc_channel_args* client_args,
48 grpc_channel_args* server_args) {
49 grpc_end2end_test_fixture f;
50 gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
51 f = config.create_fixture(client_args, server_args);
52 config.init_server(&f, server_args);
53 config.init_client(&f, client_args);
54 return f;
55 }
56
n_seconds_from_now(int n)57 static gpr_timespec n_seconds_from_now(int n) {
58 return grpc_timeout_seconds_to_deadline(n);
59 }
60
five_seconds_from_now(void)61 static gpr_timespec five_seconds_from_now(void) {
62 return n_seconds_from_now(5);
63 }
64
drain_cq(grpc_completion_queue * cq)65 static void drain_cq(grpc_completion_queue* cq) {
66 grpc_event ev;
67 do {
68 ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
69 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
70 }
71
shutdown_server(grpc_end2end_test_fixture * f)72 static void shutdown_server(grpc_end2end_test_fixture* f) {
73 if (!f->server) return;
74 grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
75 GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
76 grpc_timeout_seconds_to_deadline(5),
77 nullptr)
78 .type == GRPC_OP_COMPLETE);
79 grpc_server_destroy(f->server);
80 f->server = nullptr;
81 }
82
shutdown_client(grpc_end2end_test_fixture * f)83 static void shutdown_client(grpc_end2end_test_fixture* f) {
84 if (!f->client) return;
85 grpc_channel_destroy(f->client);
86 f->client = nullptr;
87 }
88
end_test(grpc_end2end_test_fixture * f)89 static void end_test(grpc_end2end_test_fixture* f) {
90 shutdown_server(f);
91 shutdown_client(f);
92
93 grpc_completion_queue_shutdown(f->cq);
94 drain_cq(f->cq);
95 grpc_completion_queue_destroy(f->cq);
96 grpc_completion_queue_destroy(f->shutdown_cq);
97 }
98
99 // Tests retry cancellation.
test_retry_cancellation(grpc_end2end_test_config config,cancellation_mode mode)100 static void test_retry_cancellation(grpc_end2end_test_config config,
101 cancellation_mode mode) {
102 grpc_call* c;
103 grpc_call* s;
104 grpc_op ops[6];
105 grpc_op* op;
106 grpc_metadata_array initial_metadata_recv;
107 grpc_metadata_array trailing_metadata_recv;
108 grpc_metadata_array request_metadata_recv;
109 grpc_call_details call_details;
110 grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
111 grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
112 grpc_byte_buffer* request_payload =
113 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
114 grpc_byte_buffer* response_payload =
115 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
116 grpc_byte_buffer* request_payload_recv = nullptr;
117 grpc_byte_buffer* response_payload_recv = nullptr;
118 grpc_status_code status;
119 grpc_call_error error;
120 grpc_slice details;
121 int was_cancelled = 2;
122 char* peer;
123
124 grpc_arg arg;
125 arg.type = GRPC_ARG_STRING;
126 arg.key = const_cast<char*>(GRPC_ARG_SERVICE_CONFIG);
127 arg.value.string = const_cast<char*>(
128 "{\n"
129 " \"methodConfig\": [ {\n"
130 " \"name\": [\n"
131 " { \"service\": \"service\", \"method\": \"method\" }\n"
132 " ],\n"
133 " \"retryPolicy\": {\n"
134 " \"maxAttempts\": 3,\n"
135 " \"initialBackoff\": \"1s\",\n"
136 " \"maxBackoff\": \"120s\",\n"
137 " \"backoffMultiplier\": 1.6,\n"
138 " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
139 " },\n"
140 " \"timeout\": \"5s\"\n"
141 " } ]\n"
142 "}");
143 grpc_channel_args client_args = {1, &arg};
144 std::string name = absl::StrCat("retry_cancellation/%s", mode.name);
145 grpc_end2end_test_fixture f =
146 begin_test(config, name.c_str(), &client_args, nullptr);
147
148 cq_verifier* cqv = cq_verifier_create(f.cq);
149
150 gpr_timespec deadline = five_seconds_from_now();
151 c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
152 grpc_slice_from_static_string("/service/method"),
153 nullptr, deadline, nullptr);
154 GPR_ASSERT(c);
155
156 peer = grpc_call_get_peer(c);
157 GPR_ASSERT(peer != nullptr);
158 gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
159 gpr_free(peer);
160
161 grpc_metadata_array_init(&initial_metadata_recv);
162 grpc_metadata_array_init(&trailing_metadata_recv);
163 grpc_metadata_array_init(&request_metadata_recv);
164 grpc_call_details_init(&call_details);
165 grpc_slice status_details = grpc_slice_from_static_string("xyz");
166
167 // Client starts a batch with all 6 ops.
168 memset(ops, 0, sizeof(ops));
169 op = ops;
170 op->op = GRPC_OP_SEND_INITIAL_METADATA;
171 op->data.send_initial_metadata.count = 0;
172 op++;
173 op->op = GRPC_OP_SEND_MESSAGE;
174 op->data.send_message.send_message = request_payload;
175 op++;
176 op->op = GRPC_OP_RECV_MESSAGE;
177 op->data.recv_message.recv_message = &response_payload_recv;
178 op++;
179 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
180 op++;
181 op->op = GRPC_OP_RECV_INITIAL_METADATA;
182 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
183 op++;
184 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
185 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
186 op->data.recv_status_on_client.status = &status;
187 op->data.recv_status_on_client.status_details = &details;
188 op++;
189 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), nullptr);
190 GPR_ASSERT(GRPC_CALL_OK == error);
191
192 // Server gets a call and fails with retryable status.
193 error =
194 grpc_server_request_call(f.server, &s, &call_details,
195 &request_metadata_recv, f.cq, f.cq, tag(101));
196 GPR_ASSERT(GRPC_CALL_OK == error);
197 CQ_EXPECT_COMPLETION(cqv, tag(101), true);
198 cq_verify(cqv);
199
200 peer = grpc_call_get_peer(s);
201 GPR_ASSERT(peer != nullptr);
202 gpr_log(GPR_DEBUG, "server_peer=%s", peer);
203 gpr_free(peer);
204 peer = grpc_call_get_peer(c);
205 GPR_ASSERT(peer != nullptr);
206 gpr_log(GPR_DEBUG, "client_peer=%s", peer);
207 gpr_free(peer);
208
209 memset(ops, 0, sizeof(ops));
210 op = ops;
211 op->op = GRPC_OP_SEND_INITIAL_METADATA;
212 op->data.send_initial_metadata.count = 0;
213 op++;
214 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
215 op->data.send_status_from_server.trailing_metadata_count = 0;
216 op->data.send_status_from_server.status = GRPC_STATUS_ABORTED;
217 op->data.send_status_from_server.status_details = &status_details;
218 op++;
219 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
220 op->data.recv_close_on_server.cancelled = &was_cancelled;
221 op++;
222 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), nullptr);
223 GPR_ASSERT(GRPC_CALL_OK == error);
224
225 CQ_EXPECT_COMPLETION(cqv, tag(102), true);
226 cq_verify(cqv);
227
228 grpc_call_unref(s);
229 grpc_metadata_array_destroy(&request_metadata_recv);
230 grpc_metadata_array_init(&request_metadata_recv);
231 grpc_call_details_destroy(&call_details);
232 grpc_call_details_init(&call_details);
233
234 // Server gets a second call (the retry).
235 error =
236 grpc_server_request_call(f.server, &s, &call_details,
237 &request_metadata_recv, f.cq, f.cq, tag(201));
238 GPR_ASSERT(GRPC_CALL_OK == error);
239 CQ_EXPECT_COMPLETION(cqv, tag(201), true);
240 cq_verify(cqv);
241
242 // Initiate cancellation.
243 GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c, nullptr));
244
245 CQ_EXPECT_COMPLETION(cqv, tag(1), true);
246 cq_verify(cqv);
247
248 GPR_ASSERT(status == mode.expect_status);
249 GPR_ASSERT(was_cancelled == 0);
250
251 grpc_slice_unref(details);
252 grpc_metadata_array_destroy(&initial_metadata_recv);
253 grpc_metadata_array_destroy(&trailing_metadata_recv);
254 grpc_metadata_array_destroy(&request_metadata_recv);
255 grpc_call_details_destroy(&call_details);
256 grpc_byte_buffer_destroy(request_payload);
257 grpc_byte_buffer_destroy(response_payload);
258 grpc_byte_buffer_destroy(request_payload_recv);
259 grpc_byte_buffer_destroy(response_payload_recv);
260
261 grpc_call_unref(c);
262 grpc_call_unref(s);
263
264 cq_verifier_destroy(cqv);
265
266 end_test(&f);
267 config.tear_down_data(&f);
268 }
269
retry_cancellation(grpc_end2end_test_config config)270 void retry_cancellation(grpc_end2end_test_config config) {
271 GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
272 for (size_t i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); ++i) {
273 test_retry_cancellation(config, cancellation_modes[i]);
274 }
275 }
276
// Pre-initialization hook for this test; intentionally does nothing.
void retry_cancellation_pre_init(void) {
}
278