// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/byte_buffer.h>
#include <grpc/credentials.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <inttypes.h>
#include <string.h>

#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/util/host_port.h"
#include "src/core/util/sync.h"
#include "test/core/test_util/port.h"
#include "test/core/test_util/test_config.h"

namespace {

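// A minimal in-process test server: binds an insecure HTTP/2 listener on a
// loopback port and serves exactly one RPC per HandleRpc() call.
//
// Sketch of intended usage (mirroring the test below):
//   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
//   TestServer server(cq, /*channel_args=*/nullptr);
//   // ... connect a client channel to server.address() ...
//   server.HandleRpc();  // accept and complete one call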
class TestServer {
 public:
  explicit TestServer(grpc_completion_queue* cq,
                      grpc_channel_args* channel_args)
      : cq_(cq) {
    server_ = grpc_server_create(channel_args, nullptr);
    address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
    grpc_server_register_completion_queue(server_, cq_, nullptr);
    grpc_server_credentials* server_creds =
        grpc_insecure_server_credentials_create();
    CHECK(grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
    grpc_server_credentials_release(server_creds);
    grpc_server_start(server_);
  }

  ~TestServer() {
    grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
    grpc_event event = grpc_completion_queue_next(
        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    CHECK(event.type == GRPC_OP_COMPLETE);
    CHECK(event.success);
    CHECK(event.tag == this);
    grpc_server_destroy(server_);
  }

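  // Accepts a single incoming call and completes it: receives the client's
  // close, sends initial metadata, a 1-byte message, and an OK status, then
  // waits for the whole batch to complete.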
  void HandleRpc() {
    grpc_call_details call_details;
    grpc_call_details_init(&call_details);
    grpc_metadata_array request_metadata_recv;
    grpc_metadata_array_init(&request_metadata_recv);
    grpc_slice status_details = grpc_slice_from_static_string("xyz");
    int was_cancelled;
    // request a call
    void* tag = this;
    grpc_call* call;
    grpc_call_error error = grpc_server_request_call(
        server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag);
    CHECK_EQ(error, GRPC_CALL_OK);
    grpc_event event = grpc_completion_queue_next(
        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    CHECK(event.type == GRPC_OP_COMPLETE);
    grpc_call_details_destroy(&call_details);
    grpc_metadata_array_destroy(&request_metadata_recv);
    CHECK(event.success);
    CHECK(event.tag == tag);
    // Send a response with a 1-byte payload. The 1-byte length is important
    // because it's enough to get the client to *queue* a flow control update,
    // but not large enough to get the client to initiate a write on that
    // update.
    grpc_slice response_payload_slice = grpc_slice_from_static_string("a");
    grpc_byte_buffer* response_payload =
        grpc_raw_byte_buffer_create(&response_payload_slice, 1);
    grpc_op ops[4];
    grpc_op* op;
    memset(ops, 0, sizeof(ops));
    op = ops;
    op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
    op->data.recv_close_on_server.cancelled = &was_cancelled;
    op++;
    op->op = GRPC_OP_SEND_INITIAL_METADATA;
    op++;
    op->op = GRPC_OP_SEND_MESSAGE;
    op->data.send_message.send_message = response_payload;
    op++;
    op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
    op->data.send_status_from_server.status = GRPC_STATUS_OK;
    op->data.send_status_from_server.status_details = &status_details;
    op++;
    error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops), tag,
                                  nullptr);
    CHECK_EQ(error, GRPC_CALL_OK);
    event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
                                       nullptr);
    CHECK(event.type == GRPC_OP_COMPLETE);
    CHECK(event.success);
    CHECK(event.tag == tag);
    grpc_byte_buffer_destroy(response_payload);
    grpc_call_unref(call);
  }

  std::string address() const { return address_; }

 private:
  grpc_server* server_;
  grpc_completion_queue* cq_;
  std::string address_;
};

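// Starts a client call by sending initial metadata and then immediately
// half-closing (SEND_CLOSE_FROM_CLIENT), so the stream is write-closed on the
// client before any response is read.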
void StartCallAndCloseWrites(grpc_call* call, grpc_completion_queue* cq) {
  grpc_op ops[2];
  grpc_op* op;
  memset(ops, 0, sizeof(ops));
  op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op++;
  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  op++;
  void* tag = call;
  grpc_call_error error = grpc_call_start_batch(
      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
  CHECK_EQ(error, GRPC_CALL_OK);
  grpc_event event = grpc_completion_queue_next(
      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
  CHECK(event.type == GRPC_OP_COMPLETE);
  CHECK(event.success);
  CHECK(event.tag == tag);
}

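// Runs the client-side receive ops (initial metadata, message, status) in one
// batch and verifies the call finished with GRPC_STATUS_OK.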
void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
  grpc_metadata_array initial_metadata_recv;
  grpc_metadata_array_init(&initial_metadata_recv);
  grpc_metadata_array trailing_metadata_recv;
  grpc_metadata_array_init(&trailing_metadata_recv);
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  grpc_slice details;
  grpc_byte_buffer* recv_payload = nullptr;
  void* tag = call;
  // Note: we're only doing read ops here. The goal is to finish the call
  // with a queued stream flow control update, due to receipt of a small
  // message. We won't do anything to explicitly initiate writes on the
  // transport, which could accidentally flush out that queued update.
  grpc_op ops[3];
  grpc_op* op;
  memset(ops, 0, sizeof(ops));
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
  op++;
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_payload;
  op++;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
  op->data.recv_status_on_client.status = &status;
  op->data.recv_status_on_client.status_details = &details;
  op++;
  grpc_call_error error = grpc_call_start_batch(
      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
  CHECK_EQ(error, GRPC_CALL_OK);
  grpc_event event = grpc_completion_queue_next(
      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
  CHECK(event.type == GRPC_OP_COMPLETE);
  CHECK(event.success);
  CHECK(event.tag == tag);
  EXPECT_EQ(status, GRPC_STATUS_OK);
  grpc_byte_buffer_destroy(recv_payload);
  grpc_metadata_array_destroy(&initial_metadata_recv);
  grpc_metadata_array_destroy(&trailing_metadata_recv);
  grpc_slice_unref(details);
}

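// Counts chttp2 transport objects as they are created and destroyed, via the
// test-only hooks registered in main(). A nonzero num_live() after the test
// tears everything down indicates a leaked transport (e.g. one pinned by a
// leaked stream).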
class TransportCounter {
 public:
  void InitCallback() {
    grpc_core::MutexLock lock(&mu_);
    ++num_created_;
    ++num_live_;
    LOG(INFO) << "TransportCounter num_created_=" << num_created_
              << " num_live_=" << num_live_ << " InitCallback";
  }

  void DestructCallback() {
    grpc_core::MutexLock lock(&mu_);
    --num_live_;
    LOG(INFO) << "TransportCounter num_created_=" << num_created_
              << " num_live_=" << num_live_ << " DestructCallback";
  }

  int64_t num_live() {
    grpc_core::MutexLock lock(&mu_);
    return num_live_;
  }

  size_t num_created() {
    grpc_core::MutexLock lock(&mu_);
    return num_created_;
  }

 private:
  grpc_core::Mutex mu_;
  int64_t num_live_ ABSL_GUARDED_BY(mu_) = 0;
  size_t num_created_ ABSL_GUARDED_BY(mu_) = 0;
};

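// Global counter instance, wired into the chttp2 transport hooks below and
// installed in main().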
TransportCounter* g_transport_counter;

void CounterInitCallback() { g_transport_counter->InitCallback(); }

void CounterDestructCallback() { g_transport_counter->DestructCallback(); }

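// Polls the completion queue until every transport counted by
// g_transport_counter has been destroyed, crashing the test if any are still
// live after a generous 120-second deadline.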
void EnsureConnectionsArentLeaked(grpc_completion_queue* cq) {
  LOG(INFO) << "The channel has been destroyed, wait for it to shut down and "
               "close...";
  // Do a quick initial poll to try to exit the test early if things have
  // already cleaned up.
  CHECK(grpc_completion_queue_next(
            cq,
            gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                         gpr_time_from_millis(1, GPR_TIMESPAN)),
            nullptr)
            .type == GRPC_QUEUE_TIMEOUT);
  if (g_transport_counter->num_created() < 2) {
    LOG(ERROR) << "g_transport_counter->num_created() == "
               << g_transport_counter->num_created()
               << ". This means that g_transport_counter isn't working and "
                  "this test is broken. At least a couple of transport objects "
                  "should have been created.";
    CHECK(0);
  }
  gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(120);
  for (;;) {
    // Note: the main goal of this test is to try to repro a chttp2 stream
    // leak, which also holds on to transport objects.
    int64_t live_transports = g_transport_counter->num_live();
    if (live_transports == 0) return;
    if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) {
      LOG(INFO) << "g_transport_counter->num_live() never returned 0. "
                   "It's likely this test has triggered a connection leak.";
      CHECK(0);
    }
    LOG(INFO) << "g_transport_counter->num_live() returned " << live_transports
              << ", keep waiting until it reaches 0";
    CHECK(grpc_completion_queue_next(
              cq,
              gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                           gpr_time_from_seconds(1, GPR_TIMESPAN)),
              nullptr)
              .type == GRPC_QUEUE_TIMEOUT);
  }
}

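// Repro scenario: the client half-closes before reading, the server replies
// with a tiny message (so the client only *queues* a stream flow control
// update), and the stream then becomes fully closed without any write ever
// flushing that update. The stream leak this test guards against left such a
// stream, and the transport it pins, alive in that state.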
TEST(
    Chttp2,
    TestStreamDoesntLeakWhenItsWriteClosedAndThenReadClosedBeforeStartOfReadingMessageAndStatus) {
  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
  {
    // Prevent pings from client to server and server to client, since they can
    // cause chttp2 to initiate writes and thus dodge the bug we're trying to
    // repro.
    auto channel_args =
        grpc_core::ChannelArgs().Set(GRPC_ARG_HTTP2_BDP_PROBE, 0);
    TestServer server(cq,
                      const_cast<grpc_channel_args*>(channel_args.ToC().get()));
    grpc_channel_credentials* creds = grpc_insecure_credentials_create();
    grpc_channel* channel =
        grpc_channel_create(absl::StrCat("ipv6:", server.address()).c_str(),
                            creds, channel_args.ToC().get());
    grpc_channel_credentials_release(creds);
    grpc_call* call =
        grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
                                 grpc_slice_from_static_string("/foo"), nullptr,
                                 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    // Start the call. It's important for our repro to close writes before
    // reading the response, so that the client transport marks the stream
    // both read and write closed as soon as it reads a status off the wire.
    StartCallAndCloseWrites(call, cq);
    // Send a small message from server to client. The message needs to be
    // small enough that the client will queue a stream flow control update
    // without flushing it out to the wire.
    server.HandleRpc();
    // Do some polling to let the client pick up the message and status off
    // the wire, *before* it begins the RECV_MESSAGE and RECV_STATUS ops.
    // The timeout here just needs to be long enough that the client has
    // most likely read everything the server sent it by the time it's done.
    CHECK(grpc_completion_queue_next(
              cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
              .type == GRPC_QUEUE_TIMEOUT);
    // Perform the receive message and status ops. Note that the incoming bytes
    // should already be in the client's buffers by the time we start these
    // ops. Thus, the client should *not* need to urgently send a flow control
    // update to the server to ensure progress, and it can simply queue the
    // flow control update instead.
    FinishCall(call, cq);
    grpc_call_unref(call);
    grpc_channel_destroy(channel);
    // There should be nothing preventing stream and transport objects from
    // shutting down and being destroyed at this point, so check that this
    // happens. The timeout is somewhat arbitrary, and is set long enough that
    // it's extremely unlikely to be hit due to CPU starvation.
    EnsureConnectionsArentLeaked(cq);
  }
  grpc_completion_queue_shutdown(cq);
  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                    nullptr)
             .type != GRPC_QUEUE_SHUTDOWN) {
  }
  grpc_completion_queue_destroy(cq);
}

}  // namespace

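// Installs the test-only transport counting hooks before running the tests,
// so that every transport created during the test is counted.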
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  grpc::testing::TestEnvironment env(&argc, argv);
  grpc_init();
  g_transport_counter = new TransportCounter();
  grpc_core::TestOnlySetGlobalHttp2TransportInitCallback(CounterInitCallback);
  grpc_core::TestOnlySetGlobalHttp2TransportDestructCallback(
      CounterDestructCallback);
  auto result = RUN_ALL_TESTS();
  grpc_shutdown();
  return result;
}