• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <string.h>
20 
21 #include <algorithm>
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <thread>
26 #include <vector>
27 
28 #include <gtest/gtest.h>
29 
30 #include "absl/types/optional.h"
31 
32 #include <grpc/byte_buffer.h>
33 #include <grpc/grpc.h>
34 #include <grpc/grpc_security.h>
35 #include <grpc/impl/channel_arg_names.h>
36 #include <grpc/impl/propagation_bits.h>
37 #include <grpc/slice.h>
38 #include <grpc/status.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/port_platform.h>
41 #include <grpc/support/time.h>
42 
43 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/config/config_vars.h"
46 #include "src/core/lib/gprpp/host_port.h"
47 #include "src/core/lib/gprpp/sync.h"
48 #include "test/core/util/port.h"
49 #include "test/core/util/test_config.h"
50 
51 namespace {
52 
53 class TransportTargetWindowEstimatesMocker
54     : public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker {
55  public:
TransportTargetWindowEstimatesMocker()56   explicit TransportTargetWindowEstimatesMocker() {}
57 
ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)58   double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
59       double current_target) override {
60     const double kTinyWindow = 512;
61     const double kSmallWindow = 8192;
62     // The goal is to bounce back and forth between 512 and 8192 initial window
63     // sizes, in order to get the following to happen at the server (in order):
64     //
65     // 1) Stall the server-side RPC's outgoing message on stream window flow
66     // control.
67     //
68     // 2) Send another settings frame with a change in initial window
69     // size setting, which will make the server-side call go writable.
70     if (current_target > kTinyWindow) {
71       return kTinyWindow;
72     } else {
73       return kSmallWindow;
74     }
75   }
76 };
77 
StartCall(grpc_call * call,grpc_completion_queue * cq)78 void StartCall(grpc_call* call, grpc_completion_queue* cq) {
79   grpc_op ops[1];
80   grpc_op* op;
81   memset(ops, 0, sizeof(ops));
82   op = ops;
83   op->op = GRPC_OP_SEND_INITIAL_METADATA;
84   op->data.send_initial_metadata.count = 0;
85   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
86   op->reserved = nullptr;
87   op++;
88   void* tag = call;
89   grpc_call_error error = grpc_call_start_batch(
90       call, ops, static_cast<size_t>(op - ops), tag, nullptr);
91   GPR_ASSERT(GRPC_CALL_OK == error);
92   grpc_event event = grpc_completion_queue_next(
93       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
94   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
95   GPR_ASSERT(event.success);
96   GPR_ASSERT(event.tag == tag);
97 }
98 
FinishCall(grpc_call * call,grpc_completion_queue * cq)99 void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
100   grpc_op ops[4];
101   grpc_op* op;
102   grpc_metadata_array initial_metadata_recv;
103   grpc_metadata_array_init(&initial_metadata_recv);
104   grpc_metadata_array trailing_metadata_recv;
105   grpc_metadata_array_init(&trailing_metadata_recv);
106   grpc_status_code status;
107   grpc_slice details;
108   grpc_byte_buffer* recv_payload = nullptr;
109   memset(ops, 0, sizeof(ops));
110   op = ops;
111   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
112   op->flags = 0;
113   op->reserved = nullptr;
114   op++;
115   op->op = GRPC_OP_RECV_INITIAL_METADATA;
116   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
117   op->flags = 0;
118   op->reserved = nullptr;
119   op++;
120   op->op = GRPC_OP_RECV_MESSAGE;
121   op->data.recv_message.recv_message = &recv_payload;
122   op->flags = 0;
123   op->reserved = nullptr;
124   op++;
125   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
126   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
127   op->data.recv_status_on_client.status = &status;
128   op->data.recv_status_on_client.status_details = &details;
129   op->flags = 0;
130   op->reserved = nullptr;
131   op++;
132   void* tag = call;
133   grpc_call_error error = grpc_call_start_batch(
134       call, ops, static_cast<size_t>(op - ops), tag, nullptr);
135   GPR_ASSERT(GRPC_CALL_OK == error);
136   grpc_event event = grpc_completion_queue_next(
137       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
138   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
139   GPR_ASSERT(event.success);
140   GPR_ASSERT(event.tag == tag);
141   grpc_metadata_array_destroy(&initial_metadata_recv);
142   grpc_metadata_array_destroy(&trailing_metadata_recv);
143   grpc_byte_buffer_destroy(recv_payload);
144   grpc_slice_unref(details);
145 }
146 
147 class TestServer {
148  public:
TestServer()149   explicit TestServer() {
150     cq_ = grpc_completion_queue_create_for_next(nullptr);
151     server_ = grpc_server_create(nullptr, nullptr);
152     address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
153     grpc_server_register_completion_queue(server_, cq_, nullptr);
154     grpc_server_credentials* server_creds =
155         grpc_insecure_server_credentials_create();
156     GPR_ASSERT(
157         grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
158     grpc_server_credentials_release(server_creds);
159     grpc_server_start(server_);
160     accept_thread_ = std::thread(std::bind(&TestServer::AcceptThread, this));
161   }
162 
ShutdownAndGetNumCallsHandled()163   int ShutdownAndGetNumCallsHandled() {
164     {
165       // prevent the server from requesting any more calls
166       grpc_core::MutexLock lock(&shutdown_mu_);
167       shutdown_ = true;
168     }
169     grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
170     accept_thread_.join();
171     grpc_server_destroy(server_);
172     grpc_completion_queue_shutdown(cq_);
173     while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
174                                       nullptr)
175                .type != GRPC_QUEUE_SHUTDOWN) {
176     }
177     grpc_completion_queue_destroy(cq_);
178     return num_calls_handled_;
179   }
180 
address() const181   std::string address() const { return address_; }
182 
183  private:
AcceptThread()184   void AcceptThread() {
185     std::vector<std::thread> rpc_threads;
186     bool got_shutdown_and_notify_tag = false;
187     while (!got_shutdown_and_notify_tag) {
188       void* request_call_tag = &rpc_threads;
189       grpc_call_details call_details;
190       grpc_call_details_init(&call_details);
191       grpc_call* call = nullptr;
192       grpc_completion_queue* call_cq = nullptr;
193       grpc_metadata_array request_metadata_recv;
194       grpc_metadata_array_init(&request_metadata_recv);
195       {
196         grpc_core::MutexLock lock(&shutdown_mu_);
197         if (!shutdown_) {
198           call_cq = grpc_completion_queue_create_for_next(nullptr);
199           grpc_call_error error = grpc_server_request_call(
200               server_, &call, &call_details, &request_metadata_recv, call_cq,
201               cq_, request_call_tag);
202           GPR_ASSERT(error == GRPC_CALL_OK);
203         }
204       }
205       grpc_event event = grpc_completion_queue_next(
206           cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
207       GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
208       grpc_call_details_destroy(&call_details);
209       grpc_metadata_array_destroy(&request_metadata_recv);
210       if (event.success) {
211         if (event.tag == request_call_tag) {
212           // HandleOneRpc takes ownership of its parameters
213           num_calls_handled_++;
214           rpc_threads.push_back(
215               std::thread(std::bind(&TestServer::HandleOneRpc, call, call_cq)));
216         } else if (event.tag == this /* shutdown_and_notify tag */) {
217           grpc_core::MutexLock lock(&shutdown_mu_);
218           GPR_ASSERT(shutdown_);
219           GPR_ASSERT(call_cq == nullptr);
220           got_shutdown_and_notify_tag = true;
221         } else {
222           GPR_ASSERT(0);
223         }
224       } else {
225         grpc_core::MutexLock lock(&shutdown_mu_);
226         GPR_ASSERT(shutdown_);
227         grpc_completion_queue_destroy(call_cq);
228       }
229     }
230     gpr_log(GPR_INFO, "test server shutdown, joining RPC threads...");
231     for (auto& t : rpc_threads) {
232       t.join();
233     }
234     gpr_log(GPR_INFO, "test server threads all finished!");
235   }
236 
HandleOneRpc(grpc_call * call,grpc_completion_queue * call_cq)237   static void HandleOneRpc(grpc_call* call, grpc_completion_queue* call_cq) {
238     // Send a large enough payload to get us stalled on outgoing flow control
239     std::string send_payload(4 * 1024 * 1024, 'a');
240     grpc_slice request_payload_slice =
241         grpc_slice_from_copied_string(send_payload.c_str());
242     grpc_byte_buffer* request_payload =
243         grpc_raw_byte_buffer_create(&request_payload_slice, 1);
244     void* tag = call_cq;
245     grpc_op ops[2];
246     grpc_op* op;
247     memset(ops, 0, sizeof(ops));
248     op = ops;
249     op->op = GRPC_OP_SEND_INITIAL_METADATA;
250     op->data.send_initial_metadata.count = 0;
251     op->reserved = nullptr;
252     op++;
253     op->op = GRPC_OP_SEND_MESSAGE;
254     op->data.send_message.send_message = request_payload;
255     op->reserved = nullptr;
256     op++;
257     grpc_call_error error = grpc_call_start_batch(
258         call, ops, static_cast<size_t>(op - ops), tag, nullptr);
259     GPR_ASSERT(GRPC_CALL_OK == error);
260     std::thread poller([call_cq]() {
261       // poll the connection so that we actively pick up bytes off the wire,
262       // including settings frames with window size increases
263       while (grpc_completion_queue_next(
264                  call_cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr)
265                  .type != GRPC_QUEUE_SHUTDOWN) {
266       }
267     });
268     grpc_call_cancel(call, nullptr);
269     grpc_call_unref(call);
270     grpc_completion_queue_shutdown(call_cq);
271     poller.join();
272     grpc_completion_queue_destroy(call_cq);
273     grpc_byte_buffer_destroy(request_payload);
274     grpc_slice_unref(request_payload_slice);
275   }
276 
277   grpc_server* server_;
278   grpc_completion_queue* cq_;
279   std::string address_;
280   std::thread accept_thread_;
281   int num_calls_handled_ = 0;
282   grpc_core::Mutex shutdown_mu_;
283   bool shutdown_ = false;
284 };
285 
286 // Perform a simple RPC where the server cancels the request with
287 // grpc_call_cancel_with_status
TEST(Pollers,TestDontCrashWhenTryingToReproIssueFixedBy23984)288 TEST(Pollers, TestDontCrashWhenTryingToReproIssueFixedBy23984) {
289   // 64 threads is arbitrary but chosen because, experimentally it's enough to
290   // repro the targetted crash crash (which is then fixed by
291   // https://github.com/grpc/grpc/pull/23984) at a very high rate.
292   const int kNumCalls = 64;
293   std::vector<std::thread> threads;
294   threads.reserve(kNumCalls);
295   std::unique_ptr<TestServer> test_server = std::make_unique<TestServer>();
296   const std::string server_address = test_server->address();
297   for (int i = 0; i < kNumCalls; i++) {
298     threads.push_back(std::thread([server_address]() {
299       std::vector<grpc_arg> args;
300       // this test is meant to create one connection to the server for each
301       // of these threads
302       args.push_back(grpc_channel_arg_integer_create(
303           const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true));
304       grpc_channel_args* channel_args =
305           grpc_channel_args_copy_and_add(nullptr, args.data(), args.size());
306       grpc_channel_credentials* creds = grpc_insecure_credentials_create();
307       grpc_channel* channel = grpc_channel_create(
308           std::string("ipv6:" + server_address).c_str(), creds, channel_args);
309       grpc_channel_credentials_release(creds);
310       grpc_channel_args_destroy(channel_args);
311       grpc_completion_queue* cq =
312           grpc_completion_queue_create_for_next(nullptr);
313       grpc_call* call = grpc_channel_create_call(
314           channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
315           grpc_slice_from_static_string("/foo"), nullptr,
316           gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
317       StartCall(call, cq);
318       // Explicitly avoid reading on this RPC for a period of time. The
319       // goal is to get the server side RPC to stall on it's outgoing stream
320       // flow control window, as the first step in trying to trigger a bug.
321       gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
322                                    gpr_time_from_seconds(1, GPR_TIMESPAN)));
323       // Note that this test doesn't really care what the status of the RPC was,
324       // because we're just trying to make sure that we don't crash.
325       FinishCall(call, cq);
326       grpc_call_unref(call);
327       grpc_channel_destroy(channel);
328       grpc_completion_queue_shutdown(cq);
329       while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
330                                         nullptr)
331                  .type != GRPC_QUEUE_SHUTDOWN) {
332       }
333       grpc_completion_queue_destroy(cq);
334     }));
335   }
336   for (auto& thread : threads) {
337     thread.join();
338   }
339   gpr_log(GPR_DEBUG, "All RPCs completed!");
340   int num_calls_seen_at_server = test_server->ShutdownAndGetNumCallsHandled();
341   if (num_calls_seen_at_server != kNumCalls) {
342     gpr_log(GPR_ERROR,
343             "Expected server to handle %d calls, but instead it only handled "
344             "%d. This suggests some or all RPCs didn't make it to the server, "
345             "which means "
346             "that this test likely isn't doing what it's meant to be doing.",
347             kNumCalls, num_calls_seen_at_server);
348     GPR_ASSERT(0);
349   }
350 }
351 
352 }  // namespace
353 
main(int argc,char ** argv)354 int main(int argc, char** argv) {
355   ::testing::InitGoogleTest(&argc, argv);
356   // Make sure that we will have an active poller on all client-side fd's that
357   // are capable of sending settings frames with window updates etc., even in
358   // the case that we don't have an active RPC operation on the fd.
359   grpc_core::ConfigVars::Overrides overrides;
360   overrides.client_channel_backup_poll_interval_ms = 1;
361   grpc_core::ConfigVars::SetOverrides(overrides);
362   grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker =
363       new TransportTargetWindowEstimatesMocker();
364   grpc::testing::TestEnvironment env(&argc, argv);
365   grpc_init();
366   auto result = RUN_ALL_TESTS();
367   grpc_shutdown();
368   return result;
369 }
370