// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <inttypes.h>
#include <string.h>

#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"

#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>

#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/sync.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"

namespace {

44 class TestServer {
45  public:
TestServer(grpc_completion_queue * cq,grpc_channel_args * channel_args)46   explicit TestServer(grpc_completion_queue* cq,
47                       grpc_channel_args* channel_args)
48       : cq_(cq) {
49     server_ = grpc_server_create(channel_args, nullptr);
50     address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
51     grpc_server_register_completion_queue(server_, cq_, nullptr);
52     grpc_server_credentials* server_creds =
53         grpc_insecure_server_credentials_create();
54     GPR_ASSERT(
55         grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
56     grpc_server_credentials_release(server_creds);
57     grpc_server_start(server_);
58   }
59 
~TestServer()60   ~TestServer() {
61     grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
62     grpc_event event = grpc_completion_queue_next(
63         cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
64     GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
65     GPR_ASSERT(event.success);
66     GPR_ASSERT(event.tag == this);
67     grpc_server_destroy(server_);
68   }
69 
HandleRpc()70   void HandleRpc() {
71     grpc_call_details call_details;
72     grpc_call_details_init(&call_details);
73     grpc_metadata_array request_metadata_recv;
74     grpc_metadata_array_init(&request_metadata_recv);
75     grpc_slice status_details = grpc_slice_from_static_string("xyz");
76     int was_cancelled;
77     // request a call
78     void* tag = this;
79     grpc_call* call;
80     grpc_call_error error = grpc_server_request_call(
81         server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag);
82     GPR_ASSERT(error == GRPC_CALL_OK);
83     grpc_event event = grpc_completion_queue_next(
84         cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
85     GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
86     grpc_call_details_destroy(&call_details);
87     grpc_metadata_array_destroy(&request_metadata_recv);
88     GPR_ASSERT(event.success);
89     GPR_ASSERT(event.tag == tag);
90     // Send a response with a 1-byte payload. The 1-byte length is important
91     // because it's enough to get the client to *queue* a flow control update,
92     // but not long enough to get the client to initiate a write on that update.
93     grpc_slice response_payload_slice = grpc_slice_from_static_string("a");
94     grpc_byte_buffer* response_payload =
95         grpc_raw_byte_buffer_create(&response_payload_slice, 1);
96     grpc_op ops[4];
97     grpc_op* op;
98     memset(ops, 0, sizeof(ops));
99     op = ops;
100     op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
101     op->data.recv_close_on_server.cancelled = &was_cancelled;
102     op++;
103     op->op = GRPC_OP_SEND_INITIAL_METADATA;
104     op++;
105     op->op = GRPC_OP_SEND_MESSAGE;
106     op->data.send_message.send_message = response_payload;
107     op++;
108     op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
109     op->data.send_status_from_server.status = GRPC_STATUS_OK;
110     op->data.send_status_from_server.status_details = &status_details;
111     op++;
112     error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops), tag,
113                                   nullptr);
114     GPR_ASSERT(error == GRPC_CALL_OK);
115     event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
116                                        nullptr);
117     GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
118     GPR_ASSERT(event.success);
119     GPR_ASSERT(event.tag == tag);
120     grpc_byte_buffer_destroy(response_payload);
121     grpc_call_unref(call);
122   }
123 
address() const124   std::string address() const { return address_; }
125 
126  private:
127   grpc_server* server_;
128   grpc_completion_queue* cq_;
129   std::string address_;
130 };
131 
StartCallAndCloseWrites(grpc_call * call,grpc_completion_queue * cq)132 void StartCallAndCloseWrites(grpc_call* call, grpc_completion_queue* cq) {
133   grpc_op ops[2];
134   grpc_op* op;
135   memset(ops, 0, sizeof(ops));
136   op = ops;
137   op->op = GRPC_OP_SEND_INITIAL_METADATA;
138   op++;
139   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
140   op++;
141   void* tag = call;
142   grpc_call_error error = grpc_call_start_batch(
143       call, ops, static_cast<size_t>(op - ops), tag, nullptr);
144   GPR_ASSERT(GRPC_CALL_OK == error);
145   grpc_event event = grpc_completion_queue_next(
146       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
147   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
148   GPR_ASSERT(event.success);
149   GPR_ASSERT(event.tag == tag);
150 }
151 
FinishCall(grpc_call * call,grpc_completion_queue * cq)152 void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
153   grpc_metadata_array initial_metadata_recv;
154   grpc_metadata_array_init(&initial_metadata_recv);
155   grpc_metadata_array trailing_metadata_recv;
156   grpc_metadata_array_init(&trailing_metadata_recv);
157   grpc_status_code status = GRPC_STATUS_UNKNOWN;
158   grpc_slice details;
159   grpc_byte_buffer* recv_payload = nullptr;
160   void* tag = call;
161   // Note: we're only doing read ops here.  The goal here is to finish the call
162   // with a queued stream flow control update, due to receipt of a small
163   // message. We won't do anything to explicitly initiate writes on the
164   // transport, which could accidentally flush out that queued update.
165   grpc_op ops[3];
166   grpc_op* op;
167   memset(ops, 0, sizeof(ops));
168   op = ops;
169   op->op = GRPC_OP_RECV_INITIAL_METADATA;
170   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
171   op++;
172   op->op = GRPC_OP_RECV_MESSAGE;
173   op->data.recv_message.recv_message = &recv_payload;
174   op++;
175   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
176   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
177   op->data.recv_status_on_client.status = &status;
178   op->data.recv_status_on_client.status_details = &details;
179   op++;
180   grpc_call_error error = grpc_call_start_batch(
181       call, ops, static_cast<size_t>(op - ops), tag, nullptr);
182   GPR_ASSERT(GRPC_CALL_OK == error);
183   grpc_event event = grpc_completion_queue_next(
184       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
185   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
186   GPR_ASSERT(event.success);
187   GPR_ASSERT(event.tag == tag);
188   EXPECT_EQ(status, GRPC_STATUS_OK);
189   grpc_byte_buffer_destroy(recv_payload);
190   grpc_metadata_array_destroy(&initial_metadata_recv);
191   grpc_metadata_array_destroy(&trailing_metadata_recv);
192   grpc_slice_unref(details);
193 }
194 
195 class TransportCounter {
196  public:
InitCallback()197   void InitCallback() {
198     grpc_core::MutexLock lock(&mu_);
199     ++num_created_;
200     ++num_live_;
201     gpr_log(GPR_INFO,
202             "TransportCounter num_created_=%ld num_live_=%" PRId64
203             " InitCallback",
204             num_created_, num_live_);
205   }
206 
DestructCallback()207   void DestructCallback() {
208     grpc_core::MutexLock lock(&mu_);
209     --num_live_;
210     gpr_log(GPR_INFO,
211             "TransportCounter num_created_=%ld num_live_=%" PRId64
212             " DestructCallback",
213             num_created_, num_live_);
214   }
215 
num_live()216   int64_t num_live() {
217     grpc_core::MutexLock lock(&mu_);
218     return num_live_;
219   }
220 
num_created()221   size_t num_created() {
222     grpc_core::MutexLock lock(&mu_);
223     return num_created_;
224   }
225 
226  private:
227   grpc_core::Mutex mu_;
228   int64_t num_live_ ABSL_GUARDED_BY(mu_) = 0;
229   size_t num_created_ ABSL_GUARDED_BY(mu_) = 0;
230 };
231 
232 TransportCounter* g_transport_counter;
233 
CounterInitCallback()234 void CounterInitCallback() { g_transport_counter->InitCallback(); }
235 
CounterDestructCallback()236 void CounterDestructCallback() { g_transport_counter->DestructCallback(); }
237 
EnsureConnectionsArentLeaked(grpc_completion_queue * cq)238 void EnsureConnectionsArentLeaked(grpc_completion_queue* cq) {
239   gpr_log(
240       GPR_INFO,
241       "The channel has been destroyed, wait for it to shut down and close...");
242   // Do a quick initial poll to try to exit the test early if things have
243   // already cleaned up.
244   GPR_ASSERT(grpc_completion_queue_next(
245                  cq,
246                  gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
247                               gpr_time_from_millis(1, GPR_TIMESPAN)),
248                  nullptr)
249                  .type == GRPC_QUEUE_TIMEOUT);
250   if (g_transport_counter->num_created() < 2) {
251     gpr_log(GPR_ERROR,
252             "g_transport_counter->num_created() == %ld. This means that "
253             "g_transport_counter isn't working and this test is broken. At "
254             "least a couple of transport objects should have been created.",
255             g_transport_counter->num_created());
256     GPR_ASSERT(0);
257   }
258   gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(120);
259   for (;;) {
260     // Note: the main goal of this test is to try to repro a chttp2 stream leak,
261     // which also holds on to transports objects.
262     int64_t live_transports = g_transport_counter->num_live();
263     if (live_transports == 0) return;
264     if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) {
265       gpr_log(GPR_INFO,
266               "g_transport_counter->num_live() never returned 0. "
267               "It's likely this test has triggered a connection leak.");
268       GPR_ASSERT(0);
269     }
270     gpr_log(GPR_INFO,
271             "g_transport_counter->num_live() returned %" PRId64
272             ", keep waiting "
273             "until it reaches 0",
274             live_transports);
275     GPR_ASSERT(grpc_completion_queue_next(
276                    cq,
277                    gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
278                                 gpr_time_from_seconds(1, GPR_TIMESPAN)),
279                    nullptr)
280                    .type == GRPC_QUEUE_TIMEOUT);
281   }
282 }
283 
TEST(Chttp2,TestStreamDoesntLeakWhenItsWriteClosedAndThenReadClosedBeforeStartOfReadingMessageAndStatus)284 TEST(
285     Chttp2,
286     TestStreamDoesntLeakWhenItsWriteClosedAndThenReadClosedBeforeStartOfReadingMessageAndStatus) {
287   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
288   {
289     // Prevent pings from client to server and server to client, since they can
290     // cause chttp2 to initiate writes and thus dodge the bug we're trying to
291     // repro.
292     auto channel_args =
293         grpc_core::ChannelArgs().Set(GRPC_ARG_HTTP2_BDP_PROBE, 0);
294     TestServer server(cq,
295                       const_cast<grpc_channel_args*>(channel_args.ToC().get()));
296     grpc_channel_credentials* creds = grpc_insecure_credentials_create();
297     grpc_channel* channel =
298         grpc_channel_create(absl::StrCat("ipv6:", server.address()).c_str(),
299                             creds, channel_args.ToC().get());
300     grpc_channel_credentials_release(creds);
301     grpc_call* call =
302         grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
303                                  grpc_slice_from_static_string("/foo"), nullptr,
304                                  gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
305     // Start the call. It's important for our repro to close writes before
306     // reading the response, so that the client transport marks the stream
307     // both read and write closed as soon as it reads a status off the wire.
308     StartCallAndCloseWrites(call, cq);
309     // Send a small message from server to client. The message needs to be small
310     // enough such that the client will queue a stream flow control update,
311     // without flushing it out to the wire.
312     server.HandleRpc();
313     // Do some polling to let the client to pick up the message and status off
314     // the wire, *before* it begins the RECV_MESSAGE and RECV_STATUS ops.
315     // The timeout here just needs to be long enough that the client has
316     // most likely reads everything the server sent it by the time it's done.
317     GPR_ASSERT(grpc_completion_queue_next(
318                    cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
319                    .type == GRPC_QUEUE_TIMEOUT);
320     // Perform the receive message and status. Note that the incoming bytes
321     // should already be in the client's buffers by the time we start these ops.
322     // Thus, the client should *not* need to urgently send a flow control update
323     // to the server, to ensure progress, and it can simply queue the flow
324     // control update instead.
325     FinishCall(call, cq);
326     grpc_call_unref(call);
327     grpc_channel_destroy(channel);
328     // There should be nothing to prevent stream and transport objects from
329     // shutdown and destruction at this point. So check that this happens.
330     // The timeout is somewhat arbitrary, and is set long enough so that it's
331     // extremely unlikely to be hit due to CPU starvation.
332     EnsureConnectionsArentLeaked(cq);
333   }
334   grpc_completion_queue_shutdown(cq);
335   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
336                                     nullptr)
337              .type != GRPC_QUEUE_SHUTDOWN) {
338   }
339   grpc_completion_queue_destroy(cq);
340 }
341 
}  // namespace

main(int argc,char ** argv)344 int main(int argc, char** argv) {
345   ::testing::InitGoogleTest(&argc, argv);
346   grpc::testing::TestEnvironment env(&argc, argv);
347   grpc_init();
348   g_transport_counter = new TransportCounter();
349   grpc_core::TestOnlySetGlobalHttp2TransportInitCallback(CounterInitCallback);
350   grpc_core::TestOnlySetGlobalHttp2TransportDestructCallback(
351       CounterDestructCallback);
352   auto result = RUN_ALL_TESTS();
353   grpc_shutdown();
354   return result;
355 }
356