• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/byte_buffer.h>
20 #include <grpc/credentials.h>
21 #include <grpc/grpc.h>
22 #include <grpc/grpc_security.h>
23 #include <grpc/impl/channel_arg_names.h>
24 #include <grpc/impl/propagation_bits.h>
25 #include <grpc/slice.h>
26 #include <grpc/status.h>
27 #include <grpc/support/port_platform.h>
28 #include <grpc/support/time.h>
29 #include <gtest/gtest.h>
30 #include <string.h>
31 
32 #include <algorithm>
33 #include <functional>
34 #include <memory>
35 #include <string>
36 #include <thread>
37 #include <vector>
38 
39 #include "absl/log/check.h"
40 #include "absl/log/log.h"
41 #include "absl/types/optional.h"
42 #include "src/core/config/config_vars.h"
43 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/util/host_port.h"
46 #include "src/core/util/sync.h"
47 #include "test/core/test_util/port.h"
48 #include "test/core/test_util/test_config.h"
49 
50 namespace {
51 
52 class TransportTargetWindowEstimatesMocker
53     : public grpc_core::chttp2::TestOnlyTransportTargetWindowEstimatesMocker {
54  public:
TransportTargetWindowEstimatesMocker()55   explicit TransportTargetWindowEstimatesMocker() {}
56 
ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)57   double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
58       double current_target) override {
59     const double kTinyWindow = 512;
60     const double kSmallWindow = 8192;
61     // The goal is to bounce back and forth between 512 and 8192 initial window
62     // sizes, in order to get the following to happen at the server (in order):
63     //
64     // 1) Stall the server-side RPC's outgoing message on stream window flow
65     // control.
66     //
67     // 2) Send another settings frame with a change in initial window
68     // size setting, which will make the server-side call go writable.
69     if (current_target > kTinyWindow) {
70       return kTinyWindow;
71     } else {
72       return kSmallWindow;
73     }
74   }
75 };
76 
StartCall(grpc_call * call,grpc_completion_queue * cq)77 void StartCall(grpc_call* call, grpc_completion_queue* cq) {
78   grpc_op ops[1];
79   grpc_op* op;
80   memset(ops, 0, sizeof(ops));
81   op = ops;
82   op->op = GRPC_OP_SEND_INITIAL_METADATA;
83   op->data.send_initial_metadata.count = 0;
84   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
85   op->reserved = nullptr;
86   op++;
87   void* tag = call;
88   grpc_call_error error = grpc_call_start_batch(
89       call, ops, static_cast<size_t>(op - ops), tag, nullptr);
90   CHECK_EQ(error, GRPC_CALL_OK);
91   grpc_event event = grpc_completion_queue_next(
92       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
93   CHECK(event.type == GRPC_OP_COMPLETE);
94   CHECK(event.success);
95   CHECK(event.tag == tag);
96 }
97 
FinishCall(grpc_call * call,grpc_completion_queue * cq)98 void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
99   grpc_op ops[4];
100   grpc_op* op;
101   grpc_metadata_array initial_metadata_recv;
102   grpc_metadata_array_init(&initial_metadata_recv);
103   grpc_metadata_array trailing_metadata_recv;
104   grpc_metadata_array_init(&trailing_metadata_recv);
105   grpc_status_code status;
106   grpc_slice details;
107   grpc_byte_buffer* recv_payload = nullptr;
108   memset(ops, 0, sizeof(ops));
109   op = ops;
110   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
111   op->flags = 0;
112   op->reserved = nullptr;
113   op++;
114   op->op = GRPC_OP_RECV_INITIAL_METADATA;
115   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
116   op->flags = 0;
117   op->reserved = nullptr;
118   op++;
119   op->op = GRPC_OP_RECV_MESSAGE;
120   op->data.recv_message.recv_message = &recv_payload;
121   op->flags = 0;
122   op->reserved = nullptr;
123   op++;
124   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
125   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
126   op->data.recv_status_on_client.status = &status;
127   op->data.recv_status_on_client.status_details = &details;
128   op->flags = 0;
129   op->reserved = nullptr;
130   op++;
131   void* tag = call;
132   grpc_call_error error = grpc_call_start_batch(
133       call, ops, static_cast<size_t>(op - ops), tag, nullptr);
134   CHECK_EQ(error, GRPC_CALL_OK);
135   grpc_event event = grpc_completion_queue_next(
136       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
137   CHECK(event.type == GRPC_OP_COMPLETE);
138   CHECK(event.success);
139   CHECK(event.tag == tag);
140   grpc_metadata_array_destroy(&initial_metadata_recv);
141   grpc_metadata_array_destroy(&trailing_metadata_recv);
142   grpc_byte_buffer_destroy(recv_payload);
143   grpc_slice_unref(details);
144 }
145 
146 class TestServer {
147  public:
TestServer()148   explicit TestServer() {
149     cq_ = grpc_completion_queue_create_for_next(nullptr);
150     server_ = grpc_server_create(nullptr, nullptr);
151     address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
152     grpc_server_register_completion_queue(server_, cq_, nullptr);
153     grpc_server_credentials* server_creds =
154         grpc_insecure_server_credentials_create();
155     CHECK(grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
156     grpc_server_credentials_release(server_creds);
157     grpc_server_start(server_);
158     accept_thread_ = std::thread(std::bind(&TestServer::AcceptThread, this));
159   }
160 
ShutdownAndGetNumCallsHandled()161   int ShutdownAndGetNumCallsHandled() {
162     {
163       // prevent the server from requesting any more calls
164       grpc_core::MutexLock lock(&shutdown_mu_);
165       shutdown_ = true;
166     }
167     grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
168     accept_thread_.join();
169     grpc_server_destroy(server_);
170     grpc_completion_queue_shutdown(cq_);
171     while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
172                                       nullptr)
173                .type != GRPC_QUEUE_SHUTDOWN) {
174     }
175     grpc_completion_queue_destroy(cq_);
176     return num_calls_handled_;
177   }
178 
address() const179   std::string address() const { return address_; }
180 
181  private:
AcceptThread()182   void AcceptThread() {
183     std::vector<std::thread> rpc_threads;
184     bool got_shutdown_and_notify_tag = false;
185     while (!got_shutdown_and_notify_tag) {
186       void* request_call_tag = &rpc_threads;
187       grpc_call_details call_details;
188       grpc_call_details_init(&call_details);
189       grpc_call* call = nullptr;
190       grpc_completion_queue* call_cq = nullptr;
191       grpc_metadata_array request_metadata_recv;
192       grpc_metadata_array_init(&request_metadata_recv);
193       {
194         grpc_core::MutexLock lock(&shutdown_mu_);
195         if (!shutdown_) {
196           call_cq = grpc_completion_queue_create_for_next(nullptr);
197           grpc_call_error error = grpc_server_request_call(
198               server_, &call, &call_details, &request_metadata_recv, call_cq,
199               cq_, request_call_tag);
200           CHECK_EQ(error, GRPC_CALL_OK);
201         }
202       }
203       grpc_event event = grpc_completion_queue_next(
204           cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
205       CHECK(event.type == GRPC_OP_COMPLETE);
206       grpc_call_details_destroy(&call_details);
207       grpc_metadata_array_destroy(&request_metadata_recv);
208       if (event.success) {
209         if (event.tag == request_call_tag) {
210           // HandleOneRpc takes ownership of its parameters
211           num_calls_handled_++;
212           rpc_threads.push_back(
213               std::thread(std::bind(&TestServer::HandleOneRpc, call, call_cq)));
214         } else if (event.tag == this /* shutdown_and_notify tag */) {
215           grpc_core::MutexLock lock(&shutdown_mu_);
216           CHECK(shutdown_);
217           CHECK_EQ(call_cq, nullptr);
218           got_shutdown_and_notify_tag = true;
219         } else {
220           CHECK(0);
221         }
222       } else {
223         grpc_core::MutexLock lock(&shutdown_mu_);
224         CHECK(shutdown_);
225         grpc_completion_queue_destroy(call_cq);
226       }
227     }
228     LOG(INFO) << "test server shutdown, joining RPC threads...";
229     for (auto& t : rpc_threads) {
230       t.join();
231     }
232     LOG(INFO) << "test server threads all finished!";
233   }
234 
HandleOneRpc(grpc_call * call,grpc_completion_queue * call_cq)235   static void HandleOneRpc(grpc_call* call, grpc_completion_queue* call_cq) {
236     // Send a large enough payload to get us stalled on outgoing flow control
237     std::string send_payload(4 * 1024 * 1024, 'a');
238     grpc_slice request_payload_slice =
239         grpc_slice_from_copied_string(send_payload.c_str());
240     grpc_byte_buffer* request_payload =
241         grpc_raw_byte_buffer_create(&request_payload_slice, 1);
242     void* tag = call_cq;
243     grpc_op ops[2];
244     grpc_op* op;
245     memset(ops, 0, sizeof(ops));
246     op = ops;
247     op->op = GRPC_OP_SEND_INITIAL_METADATA;
248     op->data.send_initial_metadata.count = 0;
249     op->reserved = nullptr;
250     op++;
251     op->op = GRPC_OP_SEND_MESSAGE;
252     op->data.send_message.send_message = request_payload;
253     op->reserved = nullptr;
254     op++;
255     grpc_call_error error = grpc_call_start_batch(
256         call, ops, static_cast<size_t>(op - ops), tag, nullptr);
257     CHECK_EQ(error, GRPC_CALL_OK);
258     std::thread poller([call_cq]() {
259       // poll the connection so that we actively pick up bytes off the wire,
260       // including settings frames with window size increases
261       while (grpc_completion_queue_next(
262                  call_cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr)
263                  .type != GRPC_QUEUE_SHUTDOWN) {
264       }
265     });
266     grpc_call_cancel(call, nullptr);
267     grpc_call_unref(call);
268     grpc_completion_queue_shutdown(call_cq);
269     poller.join();
270     grpc_completion_queue_destroy(call_cq);
271     grpc_byte_buffer_destroy(request_payload);
272     grpc_slice_unref(request_payload_slice);
273   }
274 
275   grpc_server* server_;
276   grpc_completion_queue* cq_;
277   std::string address_;
278   std::thread accept_thread_;
279   int num_calls_handled_ = 0;
280   grpc_core::Mutex shutdown_mu_;
281   bool shutdown_ = false;
282 };
283 
284 // Perform a simple RPC where the server cancels the request with
285 // grpc_call_cancel_with_status
TEST(Pollers,TestDontCrashWhenTryingToReproIssueFixedBy23984)286 TEST(Pollers, TestDontCrashWhenTryingToReproIssueFixedBy23984) {
287   // 64 threads is arbitrary but chosen because, experimentally it's enough to
288   // repro the targeted crash crash (which is then fixed by
289   // https://github.com/grpc/grpc/pull/23984) at a very high rate.
290   const int kNumCalls = 64;
291   std::vector<std::thread> threads;
292   threads.reserve(kNumCalls);
293   std::unique_ptr<TestServer> test_server = std::make_unique<TestServer>();
294   const std::string server_address = test_server->address();
295   for (int i = 0; i < kNumCalls; i++) {
296     threads.push_back(std::thread([server_address]() {
297       std::vector<grpc_arg> args;
298       // this test is meant to create one connection to the server for each
299       // of these threads
300       args.push_back(grpc_channel_arg_integer_create(
301           const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), true));
302       grpc_channel_args* channel_args =
303           grpc_channel_args_copy_and_add(nullptr, args.data(), args.size());
304       grpc_channel_credentials* creds = grpc_insecure_credentials_create();
305       grpc_channel* channel = grpc_channel_create(
306           std::string("ipv6:" + server_address).c_str(), creds, channel_args);
307       grpc_channel_credentials_release(creds);
308       grpc_channel_args_destroy(channel_args);
309       grpc_completion_queue* cq =
310           grpc_completion_queue_create_for_next(nullptr);
311       grpc_call* call = grpc_channel_create_call(
312           channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
313           grpc_slice_from_static_string("/foo"), nullptr,
314           gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
315       StartCall(call, cq);
316       // Explicitly avoid reading on this RPC for a period of time. The
317       // goal is to get the server side RPC to stall on it's outgoing stream
318       // flow control window, as the first step in trying to trigger a bug.
319       gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
320                                    gpr_time_from_seconds(1, GPR_TIMESPAN)));
321       // Note that this test doesn't really care what the status of the RPC was,
322       // because we're just trying to make sure that we don't crash.
323       FinishCall(call, cq);
324       grpc_call_unref(call);
325       grpc_channel_destroy(channel);
326       grpc_completion_queue_shutdown(cq);
327       while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
328                                         nullptr)
329                  .type != GRPC_QUEUE_SHUTDOWN) {
330       }
331       grpc_completion_queue_destroy(cq);
332     }));
333   }
334   for (auto& thread : threads) {
335     thread.join();
336   }
337   VLOG(2) << "All RPCs completed!";
338   int num_calls_seen_at_server = test_server->ShutdownAndGetNumCallsHandled();
339   if (num_calls_seen_at_server != kNumCalls) {
340     LOG(ERROR) << "Expected server to handle " << kNumCalls
341                << " calls, but instead it only handled "
342                << num_calls_seen_at_server
343                << ". This suggests some or all RPCs didn't make it to the "
344                   "server, which means that this test likely isn't doing what "
345                   "it's meant to be doing.";
346     CHECK(0);
347   }
348 }
349 
350 }  // namespace
351 
main(int argc,char ** argv)352 int main(int argc, char** argv) {
353   ::testing::InitGoogleTest(&argc, argv);
354   // Make sure that we will have an active poller on all client-side fd's that
355   // are capable of sending settings frames with window updates etc., even in
356   // the case that we don't have an active RPC operation on the fd.
357   grpc_core::ConfigVars::Overrides overrides;
358   overrides.client_channel_backup_poll_interval_ms = 1;
359   grpc_core::ConfigVars::SetOverrides(overrides);
360   grpc_core::chttp2::g_test_only_transport_target_window_estimates_mocker =
361       new TransportTargetWindowEstimatesMocker();
362   grpc::testing::TestEnvironment env(&argc, argv);
363   grpc_init();
364   auto result = RUN_ALL_TESTS();
365   grpc_shutdown();
366   return result;
367 }
368