1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/byte_buffer.h>
20 #include <grpc/credentials.h>
21 #include <grpc/grpc.h>
22 #include <grpc/grpc_security.h>
23 #include <grpc/impl/channel_arg_names.h>
24 #include <grpc/impl/propagation_bits.h>
25 #include <grpc/slice.h>
26 #include <grpc/status.h>
27 #include <grpc/support/port_platform.h>
28 #include <grpc/support/time.h>
29 #include <gtest/gtest.h>
30 #include <string.h>
31
32 #include <algorithm>
33 #include <functional>
34 #include <memory>
35 #include <string>
36 #include <thread>
37 #include <vector>
38
39 #include "absl/log/check.h"
40 #include "absl/log/log.h"
41 #include "absl/types/optional.h"
42 #include "src/core/config/config_vars.h"
43 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/util/host_port.h"
46 #include "src/core/util/sync.h"
47 #include "test/core/test_util/port.h"
48 #include "test/core/test_util/test_config.h"
49
50 namespace {
51
52 class TransportTargetWindowEstimatesMocker
53 : public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker {
54 public:
TransportTargetWindowEstimatesMocker()55 explicit TransportTargetWindowEstimatesMocker() {}
56
ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)57 double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
58 double current_target) override {
59 const double kTinyWindow = 512;
60 const double kSmallWindow = 8192;
61 // The goal is to bounce back and forth between 512 and 8192 initial window
62 // sizes, in order to get the following to happen at the server (in order):
63 //
64 // 1) Stall the server-side RPC's outgoing message on stream window flow
65 // control.
66 //
67 // 2) Send another settings frame with a change in initial window
68 // size setting, which will make the server-side call go writable.
69 if (current_target > kTinyWindow) {
70 return kTinyWindow;
71 } else {
72 return kSmallWindow;
73 }
74 }
75 };
76
StartCall(grpc_call * call,grpc_completion_queue * cq)77 void StartCall(grpc_call* call, grpc_completion_queue* cq) {
78 grpc_op ops[1];
79 grpc_op* op;
80 memset(ops, 0, sizeof(ops));
81 op = ops;
82 op->op = GRPC_OP_SEND_INITIAL_METADATA;
83 op->data.send_initial_metadata.count = 0;
84 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
85 op->reserved = nullptr;
86 op++;
87 void* tag = call;
88 grpc_call_error error = grpc_call_start_batch(
89 call, ops, static_cast<size_t>(op - ops), tag, nullptr);
90 CHECK_EQ(error, GRPC_CALL_OK);
91 grpc_event event = grpc_completion_queue_next(
92 cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
93 CHECK(event.type == GRPC_OP_COMPLETE);
94 CHECK(event.success);
95 CHECK(event.tag == tag);
96 }
97
FinishCall(grpc_call * call,grpc_completion_queue * cq)98 void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
99 grpc_op ops[4];
100 grpc_op* op;
101 grpc_metadata_array initial_metadata_recv;
102 grpc_metadata_array_init(&initial_metadata_recv);
103 grpc_metadata_array trailing_metadata_recv;
104 grpc_metadata_array_init(&trailing_metadata_recv);
105 grpc_status_code status;
106 grpc_slice details;
107 grpc_byte_buffer* recv_payload = nullptr;
108 memset(ops, 0, sizeof(ops));
109 op = ops;
110 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
111 op->flags = 0;
112 op->reserved = nullptr;
113 op++;
114 op->op = GRPC_OP_RECV_INITIAL_METADATA;
115 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
116 op->flags = 0;
117 op->reserved = nullptr;
118 op++;
119 op->op = GRPC_OP_RECV_MESSAGE;
120 op->data.recv_message.recv_message = &recv_payload;
121 op->flags = 0;
122 op->reserved = nullptr;
123 op++;
124 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
125 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
126 op->data.recv_status_on_client.status = &status;
127 op->data.recv_status_on_client.status_details = &details;
128 op->flags = 0;
129 op->reserved = nullptr;
130 op++;
131 void* tag = call;
132 grpc_call_error error = grpc_call_start_batch(
133 call, ops, static_cast<size_t>(op - ops), tag, nullptr);
134 CHECK_EQ(error, GRPC_CALL_OK);
135 grpc_event event = grpc_completion_queue_next(
136 cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
137 CHECK(event.type == GRPC_OP_COMPLETE);
138 CHECK(event.success);
139 CHECK(event.tag == tag);
140 grpc_metadata_array_destroy(&initial_metadata_recv);
141 grpc_metadata_array_destroy(&trailing_metadata_recv);
142 grpc_byte_buffer_destroy(recv_payload);
143 grpc_slice_unref(details);
144 }
145
146 class TestServer {
147 public:
TestServer()148 explicit TestServer() {
149 cq_ = grpc_completion_queue_create_for_next(nullptr);
150 server_ = grpc_server_create(nullptr, nullptr);
151 address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
152 grpc_server_register_completion_queue(server_, cq_, nullptr);
153 grpc_server_credentials* server_creds =
154 grpc_insecure_server_credentials_create();
155 CHECK(grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
156 grpc_server_credentials_release(server_creds);
157 grpc_server_start(server_);
158 accept_thread_ = std::thread(std::bind(&TestServer::AcceptThread, this));
159 }
160
ShutdownAndGetNumCallsHandled()161 int ShutdownAndGetNumCallsHandled() {
162 {
163 // prevent the server from requesting any more calls
164 grpc_core::MutexLock lock(&shutdown_mu_);
165 shutdown_ = true;
166 }
167 grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
168 accept_thread_.join();
169 grpc_server_destroy(server_);
170 grpc_completion_queue_shutdown(cq_);
171 while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
172 nullptr)
173 .type != GRPC_QUEUE_SHUTDOWN) {
174 }
175 grpc_completion_queue_destroy(cq_);
176 return num_calls_handled_;
177 }
178
address() const179 std::string address() const { return address_; }
180
181 private:
AcceptThread()182 void AcceptThread() {
183 std::vector<std::thread> rpc_threads;
184 bool got_shutdown_and_notify_tag = false;
185 while (!got_shutdown_and_notify_tag) {
186 void* request_call_tag = &rpc_threads;
187 grpc_call_details call_details;
188 grpc_call_details_init(&call_details);
189 grpc_call* call = nullptr;
190 grpc_completion_queue* call_cq = nullptr;
191 grpc_metadata_array request_metadata_recv;
192 grpc_metadata_array_init(&request_metadata_recv);
193 {
194 grpc_core::MutexLock lock(&shutdown_mu_);
195 if (!shutdown_) {
196 call_cq = grpc_completion_queue_create_for_next(nullptr);
197 grpc_call_error error = grpc_server_request_call(
198 server_, &call, &call_details, &request_metadata_recv, call_cq,
199 cq_, request_call_tag);
200 CHECK_EQ(error, GRPC_CALL_OK);
201 }
202 }
203 grpc_event event = grpc_completion_queue_next(
204 cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
205 CHECK(event.type == GRPC_OP_COMPLETE);
206 grpc_call_details_destroy(&call_details);
207 grpc_metadata_array_destroy(&request_metadata_recv);
208 if (event.success) {
209 if (event.tag == request_call_tag) {
210 // HandleOneRpc takes ownership of its parameters
211 num_calls_handled_++;
212 rpc_threads.push_back(
213 std::thread(std::bind(&TestServer::HandleOneRpc, call, call_cq)));
214 } else if (event.tag == this /* shutdown_and_notify tag */) {
215 grpc_core::MutexLock lock(&shutdown_mu_);
216 CHECK(shutdown_);
217 CHECK_EQ(call_cq, nullptr);
218 got_shutdown_and_notify_tag = true;
219 } else {
220 CHECK(0);
221 }
222 } else {
223 grpc_core::MutexLock lock(&shutdown_mu_);
224 CHECK(shutdown_);
225 grpc_completion_queue_destroy(call_cq);
226 }
227 }
228 LOG(INFO) << "test server shutdown, joining RPC threads...";
229 for (auto& t : rpc_threads) {
230 t.join();
231 }
232 LOG(INFO) << "test server threads all finished!";
233 }
234
HandleOneRpc(grpc_call * call,grpc_completion_queue * call_cq)235 static void HandleOneRpc(grpc_call* call, grpc_completion_queue* call_cq) {
236 // Send a large enough payload to get us stalled on outgoing flow control
237 std::string send_payload(4 * 1024 * 1024, 'a');
238 grpc_slice request_payload_slice =
239 grpc_slice_from_copied_string(send_payload.c_str());
240 grpc_byte_buffer* request_payload =
241 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
242 void* tag = call_cq;
243 grpc_op ops[2];
244 grpc_op* op;
245 memset(ops, 0, sizeof(ops));
246 op = ops;
247 op->op = GRPC_OP_SEND_INITIAL_METADATA;
248 op->data.send_initial_metadata.count = 0;
249 op->reserved = nullptr;
250 op++;
251 op->op = GRPC_OP_SEND_MESSAGE;
252 op->data.send_message.send_message = request_payload;
253 op->reserved = nullptr;
254 op++;
255 grpc_call_error error = grpc_call_start_batch(
256 call, ops, static_cast<size_t>(op - ops), tag, nullptr);
257 CHECK_EQ(error, GRPC_CALL_OK);
258 std::thread poller([call_cq]() {
259 // poll the connection so that we actively pick up bytes off the wire,
260 // including settings frames with window size increases
261 while (grpc_completion_queue_next(
262 call_cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr)
263 .type != GRPC_QUEUE_SHUTDOWN) {
264 }
265 });
266 grpc_call_cancel(call, nullptr);
267 grpc_call_unref(call);
268 grpc_completion_queue_shutdown(call_cq);
269 poller.join();
270 grpc_completion_queue_destroy(call_cq);
271 grpc_byte_buffer_destroy(request_payload);
272 grpc_slice_unref(request_payload_slice);
273 }
274
275 grpc_server* server_;
276 grpc_completion_queue* cq_;
277 std::string address_;
278 std::thread accept_thread_;
279 int num_calls_handled_ = 0;
280 grpc_core::Mutex shutdown_mu_;
281 bool shutdown_ = false;
282 };
283
// Run many concurrent RPCs in which the server starts sending a large message
// and then immediately cancels the call with grpc_call_cancel.
TEST(Pollers, TestDontCrashWhenTryingToReproIssueFixedBy23984) {
  // 64 threads is arbitrary, but was chosen because, experimentally, it is
  // enough to repro the targeted crash (which is then fixed by
  // https://github.com/grpc/grpc/pull/23984) at a very high rate.
  constexpr int kNumCalls = 64;
  std::vector<std::thread> threads;
  threads.reserve(kNumCalls);
  auto test_server = std::make_unique<TestServer>();
  const std::string server_address = test_server->address();
  for (int call_index = 0; call_index < kNumCalls; ++call_index) {
    threads.emplace_back([server_address]() {
      // this test is meant to create one connection to the server for each
      // of these threads
      grpc_arg use_local_subchannel_pool = grpc_channel_arg_integer_create(
          const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true);
      grpc_channel_args* channel_args = grpc_channel_args_copy_and_add(
          nullptr, &use_local_subchannel_pool, 1);
      grpc_channel_credentials* creds = grpc_insecure_credentials_create();
      const std::string target = "ipv6:" + server_address;
      grpc_channel* channel =
          grpc_channel_create(target.c_str(), creds, channel_args);
      grpc_channel_credentials_release(creds);
      grpc_channel_args_destroy(channel_args);
      grpc_completion_queue* cq =
          grpc_completion_queue_create_for_next(nullptr);
      grpc_call* call = grpc_channel_create_call(
          channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
          grpc_slice_from_static_string("/foo"), nullptr,
          gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
      StartCall(call, cq);
      // Explicitly avoid reading on this RPC for a period of time. The goal
      // is to get the server-side RPC to stall on its outgoing stream flow
      // control window, as the first step in trying to trigger a bug.
      const gpr_timespec wake_up_time =
          gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                       gpr_time_from_seconds(1, GPR_TIMESPAN));
      gpr_sleep_until(wake_up_time);
      // Note that this test doesn't really care what the status of the RPC
      // was, because we're just trying to make sure that we don't crash.
      FinishCall(call, cq);
      grpc_call_unref(call);
      grpc_channel_destroy(channel);
      grpc_completion_queue_shutdown(cq);
      // Drain the queue until it reports that it has shut down.
      grpc_event drained;
      do {
        drained = grpc_completion_queue_next(
            cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
      } while (drained.type != GRPC_QUEUE_SHUTDOWN);
      grpc_completion_queue_destroy(cq);
    });
  }
  for (std::thread& client_thread : threads) {
    client_thread.join();
  }
  VLOG(2) << "All RPCs completed!";
  // Every client thread should have produced exactly one call at the server.
  const int num_calls_seen_at_server =
      test_server->ShutdownAndGetNumCallsHandled();
  if (num_calls_seen_at_server != kNumCalls) {
    LOG(ERROR) << "Expected server to handle " << kNumCalls
               << " calls, but instead it only handled "
               << num_calls_seen_at_server
               << ". This suggests some or all RPCs didn't make it to the "
                  "server, which means that this test likely isn't doing what "
                  "it's meant to be doing.";
    CHECK(0);
  }
}
349
350 } // namespace
351
main(int argc,char ** argv)352 int main(int argc, char** argv) {
353 ::testing::InitGoogleTest(&argc, argv);
354 // Make sure that we will have an active poller on all client-side fd's that
355 // are capable of sending settings frames with window updates etc., even in
356 // the case that we don't have an active RPC operation on the fd.
357 grpc_core::ConfigVars::Overrides overrides;
358 overrides.client_channel_backup_poll_interval_ms = 1;
359 grpc_core::ConfigVars::SetOverrides(overrides);
360 grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker =
361 new TransportTargetWindowEstimatesMocker();
362 grpc::testing::TestEnvironment env(&argc, argv);
363 grpc_init();
364 auto result = RUN_ALL_TESTS();
365 grpc_shutdown();
366 return result;
367 }
368