• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "test/core/end2end/end2end_tests.h"
20 
21 #include <stdio.h>
22 #include <string.h>
23 
24 #include <string>
25 
26 #include "absl/strings/str_cat.h"
27 
28 #include <grpc/byte_buffer.h>
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/time.h>
33 
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gpr/useful.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/transport/static_metadata.h"
39 
40 #include "test/core/end2end/cq_verifier.h"
41 #include "test/core/end2end/tests/cancel_test_helpers.h"
42 
tag(intptr_t t)43 static void* tag(intptr_t t) { return (void*)t; }
44 
begin_test(grpc_end2end_test_config config,const char * test_name,grpc_channel_args * client_args,grpc_channel_args * server_args)45 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
46                                             const char* test_name,
47                                             grpc_channel_args* client_args,
48                                             grpc_channel_args* server_args) {
49   grpc_end2end_test_fixture f;
50   gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
51   f = config.create_fixture(client_args, server_args);
52   config.init_server(&f, server_args);
53   config.init_client(&f, client_args);
54   return f;
55 }
56 
n_seconds_from_now(int n)57 static gpr_timespec n_seconds_from_now(int n) {
58   return grpc_timeout_seconds_to_deadline(n);
59 }
60 
five_seconds_from_now(void)61 static gpr_timespec five_seconds_from_now(void) {
62   return n_seconds_from_now(5);
63 }
64 
drain_cq(grpc_completion_queue * cq)65 static void drain_cq(grpc_completion_queue* cq) {
66   grpc_event ev;
67   do {
68     ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
69   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
70 }
71 
shutdown_server(grpc_end2end_test_fixture * f)72 static void shutdown_server(grpc_end2end_test_fixture* f) {
73   if (!f->server) return;
74   grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
75   GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
76                                          grpc_timeout_seconds_to_deadline(5),
77                                          nullptr)
78                  .type == GRPC_OP_COMPLETE);
79   grpc_server_destroy(f->server);
80   f->server = nullptr;
81 }
82 
shutdown_client(grpc_end2end_test_fixture * f)83 static void shutdown_client(grpc_end2end_test_fixture* f) {
84   if (!f->client) return;
85   grpc_channel_destroy(f->client);
86   f->client = nullptr;
87 }
88 
end_test(grpc_end2end_test_fixture * f)89 static void end_test(grpc_end2end_test_fixture* f) {
90   shutdown_server(f);
91   shutdown_client(f);
92 
93   grpc_completion_queue_shutdown(f->cq);
94   drain_cq(f->cq);
95   grpc_completion_queue_destroy(f->cq);
96   grpc_completion_queue_destroy(f->shutdown_cq);
97 }
98 
99 // Tests retry cancellation.
test_retry_cancellation(grpc_end2end_test_config config,cancellation_mode mode)100 static void test_retry_cancellation(grpc_end2end_test_config config,
101                                     cancellation_mode mode) {
102   grpc_call* c;
103   grpc_call* s;
104   grpc_op ops[6];
105   grpc_op* op;
106   grpc_metadata_array initial_metadata_recv;
107   grpc_metadata_array trailing_metadata_recv;
108   grpc_metadata_array request_metadata_recv;
109   grpc_call_details call_details;
110   grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
111   grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
112   grpc_byte_buffer* request_payload =
113       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
114   grpc_byte_buffer* response_payload =
115       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
116   grpc_byte_buffer* request_payload_recv = nullptr;
117   grpc_byte_buffer* response_payload_recv = nullptr;
118   grpc_status_code status;
119   grpc_call_error error;
120   grpc_slice details;
121   int was_cancelled = 2;
122   char* peer;
123 
124   grpc_arg arg;
125   arg.type = GRPC_ARG_STRING;
126   arg.key = const_cast<char*>(GRPC_ARG_SERVICE_CONFIG);
127   arg.value.string = const_cast<char*>(
128       "{\n"
129       "  \"methodConfig\": [ {\n"
130       "    \"name\": [\n"
131       "      { \"service\": \"service\", \"method\": \"method\" }\n"
132       "    ],\n"
133       "    \"retryPolicy\": {\n"
134       "      \"maxAttempts\": 3,\n"
135       "      \"initialBackoff\": \"1s\",\n"
136       "      \"maxBackoff\": \"120s\",\n"
137       "      \"backoffMultiplier\": 1.6,\n"
138       "      \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
139       "    },\n"
140       "    \"timeout\": \"5s\"\n"
141       "  } ]\n"
142       "}");
143   grpc_channel_args client_args = {1, &arg};
144   std::string name = absl::StrCat("retry_cancellation/%s", mode.name);
145   grpc_end2end_test_fixture f =
146       begin_test(config, name.c_str(), &client_args, nullptr);
147 
148   cq_verifier* cqv = cq_verifier_create(f.cq);
149 
150   gpr_timespec deadline = five_seconds_from_now();
151   c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
152                                grpc_slice_from_static_string("/service/method"),
153                                nullptr, deadline, nullptr);
154   GPR_ASSERT(c);
155 
156   peer = grpc_call_get_peer(c);
157   GPR_ASSERT(peer != nullptr);
158   gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
159   gpr_free(peer);
160 
161   grpc_metadata_array_init(&initial_metadata_recv);
162   grpc_metadata_array_init(&trailing_metadata_recv);
163   grpc_metadata_array_init(&request_metadata_recv);
164   grpc_call_details_init(&call_details);
165   grpc_slice status_details = grpc_slice_from_static_string("xyz");
166 
167   // Client starts a batch with all 6 ops.
168   memset(ops, 0, sizeof(ops));
169   op = ops;
170   op->op = GRPC_OP_SEND_INITIAL_METADATA;
171   op->data.send_initial_metadata.count = 0;
172   op++;
173   op->op = GRPC_OP_SEND_MESSAGE;
174   op->data.send_message.send_message = request_payload;
175   op++;
176   op->op = GRPC_OP_RECV_MESSAGE;
177   op->data.recv_message.recv_message = &response_payload_recv;
178   op++;
179   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
180   op++;
181   op->op = GRPC_OP_RECV_INITIAL_METADATA;
182   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
183   op++;
184   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
185   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
186   op->data.recv_status_on_client.status = &status;
187   op->data.recv_status_on_client.status_details = &details;
188   op++;
189   error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), nullptr);
190   GPR_ASSERT(GRPC_CALL_OK == error);
191 
192   // Server gets a call and fails with retryable status.
193   error =
194       grpc_server_request_call(f.server, &s, &call_details,
195                                &request_metadata_recv, f.cq, f.cq, tag(101));
196   GPR_ASSERT(GRPC_CALL_OK == error);
197   CQ_EXPECT_COMPLETION(cqv, tag(101), true);
198   cq_verify(cqv);
199 
200   peer = grpc_call_get_peer(s);
201   GPR_ASSERT(peer != nullptr);
202   gpr_log(GPR_DEBUG, "server_peer=%s", peer);
203   gpr_free(peer);
204   peer = grpc_call_get_peer(c);
205   GPR_ASSERT(peer != nullptr);
206   gpr_log(GPR_DEBUG, "client_peer=%s", peer);
207   gpr_free(peer);
208 
209   memset(ops, 0, sizeof(ops));
210   op = ops;
211   op->op = GRPC_OP_SEND_INITIAL_METADATA;
212   op->data.send_initial_metadata.count = 0;
213   op++;
214   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
215   op->data.send_status_from_server.trailing_metadata_count = 0;
216   op->data.send_status_from_server.status = GRPC_STATUS_ABORTED;
217   op->data.send_status_from_server.status_details = &status_details;
218   op++;
219   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
220   op->data.recv_close_on_server.cancelled = &was_cancelled;
221   op++;
222   error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), nullptr);
223   GPR_ASSERT(GRPC_CALL_OK == error);
224 
225   CQ_EXPECT_COMPLETION(cqv, tag(102), true);
226   cq_verify(cqv);
227 
228   grpc_call_unref(s);
229   grpc_metadata_array_destroy(&request_metadata_recv);
230   grpc_metadata_array_init(&request_metadata_recv);
231   grpc_call_details_destroy(&call_details);
232   grpc_call_details_init(&call_details);
233 
234   // Server gets a second call (the retry).
235   error =
236       grpc_server_request_call(f.server, &s, &call_details,
237                                &request_metadata_recv, f.cq, f.cq, tag(201));
238   GPR_ASSERT(GRPC_CALL_OK == error);
239   CQ_EXPECT_COMPLETION(cqv, tag(201), true);
240   cq_verify(cqv);
241 
242   // Initiate cancellation.
243   GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c, nullptr));
244 
245   CQ_EXPECT_COMPLETION(cqv, tag(1), true);
246   cq_verify(cqv);
247 
248   GPR_ASSERT(status == mode.expect_status);
249   GPR_ASSERT(was_cancelled == 0);
250 
251   grpc_slice_unref(details);
252   grpc_metadata_array_destroy(&initial_metadata_recv);
253   grpc_metadata_array_destroy(&trailing_metadata_recv);
254   grpc_metadata_array_destroy(&request_metadata_recv);
255   grpc_call_details_destroy(&call_details);
256   grpc_byte_buffer_destroy(request_payload);
257   grpc_byte_buffer_destroy(response_payload);
258   grpc_byte_buffer_destroy(request_payload_recv);
259   grpc_byte_buffer_destroy(response_payload_recv);
260 
261   grpc_call_unref(c);
262   grpc_call_unref(s);
263 
264   cq_verifier_destroy(cqv);
265 
266   end_test(&f);
267   config.tear_down_data(&f);
268 }
269 
retry_cancellation(grpc_end2end_test_config config)270 void retry_cancellation(grpc_end2end_test_config config) {
271   GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
272   for (size_t i = 0; i < GPR_ARRAY_SIZE(cancellation_modes); ++i) {
273     test_retry_cancellation(config, cancellation_modes[i]);
274   }
275 }
276 
retry_cancellation_pre_init(void)277 void retry_cancellation_pre_init(void) {}
278