// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/byte_buffer.h>
#include <grpc/credentials.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <inttypes.h>
#include <string.h>

#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/util/host_port.h"
#include "src/core/util/sync.h"
#include "test/core/test_util/port.h"
#include "test/core/test_util/test_config.h"

namespace {
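// A bare-bones HTTP/2 server driven directly on a completion queue. The test
// below uses it to accept a single RPC and send back a tiny response.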
class TestServer {
 public:
  explicit TestServer(grpc_completion_queue* cq,
                      grpc_channel_args* channel_args)
      : cq_(cq) {
    server_ = grpc_server_create(channel_args, nullptr);
    address_ = grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
    grpc_server_register_completion_queue(server_, cq_, nullptr);
    grpc_server_credentials* server_creds =
        grpc_insecure_server_credentials_create();
    CHECK(grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
    grpc_server_credentials_release(server_creds);
    grpc_server_start(server_);
  }

  ~TestServer() {
    grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
    grpc_event event = grpc_completion_queue_next(
        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    CHECK(event.type == GRPC_OP_COMPLETE);
    CHECK(event.success);
    CHECK(event.tag == this);
    grpc_server_destroy(server_);
  }
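  // Accepts one RPC and responds with initial metadata, a 1-byte message, and
  // OK status, waiting on the completion queue for each batch to complete.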
  void HandleRpc() {
    grpc_call_details call_details;
    grpc_call_details_init(&call_details);
    grpc_metadata_array request_metadata_recv;
    grpc_metadata_array_init(&request_metadata_recv);
    grpc_slice status_details = grpc_slice_from_static_string("xyz");
    int was_cancelled;
    // request a call
    void* tag = this;
    grpc_call* call;
    grpc_call_error error = grpc_server_request_call(
        server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag);
    CHECK_EQ(error, GRPC_CALL_OK);
    grpc_event event = grpc_completion_queue_next(
        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    CHECK(event.type == GRPC_OP_COMPLETE);
    grpc_call_details_destroy(&call_details);
    grpc_metadata_array_destroy(&request_metadata_recv);
    CHECK(event.success);
    CHECK(event.tag == tag);
    // Send a response with a 1-byte payload. The 1-byte length is important
    // because it's enough to get the client to *queue* a flow control update,
    // but not long enough to get the client to initiate a write on that
    // update.
    grpc_slice response_payload_slice = grpc_slice_from_static_string("a");
    grpc_byte_buffer* response_payload =
        grpc_raw_byte_buffer_create(&response_payload_slice, 1);
    grpc_op ops[4];
    grpc_op* op;
    memset(ops, 0, sizeof(ops));
    op = ops;
    op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
    op->data.recv_close_on_server.cancelled = &was_cancelled;
    op++;
    op->op = GRPC_OP_SEND_INITIAL_METADATA;
    op++;
    op->op = GRPC_OP_SEND_MESSAGE;
    op->data.send_message.send_message = response_payload;
    op++;
    op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
    op->data.send_status_from_server.status = GRPC_STATUS_OK;
    op->data.send_status_from_server.status_details = &status_details;
    op++;
    error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops), tag,
                                  nullptr);
    CHECK_EQ(error, GRPC_CALL_OK);
    event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
                                       nullptr);
    CHECK(event.type == GRPC_OP_COMPLETE);
    CHECK(event.success);
    CHECK(event.tag == tag);
    grpc_byte_buffer_destroy(response_payload);
    grpc_call_unref(call);
  }

  std::string address() const { return address_; }

 private:
  grpc_server* server_;
  grpc_completion_queue* cq_;
  std::string address_;
};
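// Starts the client call by sending initial metadata and half-closing the
// write side, then waits for that batch to complete.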
void StartCallAndCloseWrites(grpc_call* call, grpc_completion_queue* cq) {
  grpc_op ops[2];
  grpc_op* op;
  memset(ops, 0, sizeof(ops));
  op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op++;
  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  op++;
  void* tag = call;
  grpc_call_error error = grpc_call_start_batch(
      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
  CHECK_EQ(error, GRPC_CALL_OK);
  grpc_event event = grpc_completion_queue_next(
      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
  CHECK(event.type == GRPC_OP_COMPLETE);
  CHECK(event.success);
  CHECK(event.tag == tag);
}
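// Receives initial metadata, the response message, and status on the client
// call, and verifies the call finished with OK status.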
void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
  grpc_metadata_array initial_metadata_recv;
  grpc_metadata_array_init(&initial_metadata_recv);
  grpc_metadata_array trailing_metadata_recv;
  grpc_metadata_array_init(&trailing_metadata_recv);
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  grpc_slice details;
  grpc_byte_buffer* recv_payload = nullptr;
  void* tag = call;
  // Note: we're only doing read ops here. The goal here is to finish the call
  // with a queued stream flow control update, due to receipt of a small
  // message. We won't do anything to explicitly initiate writes on the
  // transport, which could accidentally flush out that queued update.
  grpc_op ops[3];
  grpc_op* op;
  memset(ops, 0, sizeof(ops));
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
  op++;
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_payload;
  op++;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
  op->data.recv_status_on_client.status = &status;
  op->data.recv_status_on_client.status_details = &details;
  op++;
  grpc_call_error error = grpc_call_start_batch(
      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
  CHECK_EQ(error, GRPC_CALL_OK);
  grpc_event event = grpc_completion_queue_next(
      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
  CHECK(event.type == GRPC_OP_COMPLETE);
  CHECK(event.success);
  CHECK(event.tag == tag);
  EXPECT_EQ(status, GRPC_STATUS_OK);
  grpc_byte_buffer_destroy(recv_payload);
  grpc_metadata_array_destroy(&initial_metadata_recv);
  grpc_metadata_array_destroy(&trailing_metadata_recv);
  grpc_slice_unref(details);
}
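// Counts chttp2 transport objects created and still alive, so the test can
// verify that destroying the channel and server releases every transport.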
class TransportCounter {
 public:
  void InitCallback() {
    grpc_core::MutexLock lock(&mu_);
    ++num_created_;
    ++num_live_;
    LOG(INFO) << "TransportCounter num_created_=" << num_created_
              << " num_live_=" << num_live_ << " InitCallback";
  }

  void DestructCallback() {
    grpc_core::MutexLock lock(&mu_);
    --num_live_;
    LOG(INFO) << "TransportCounter num_created_=" << num_created_
              << " num_live_=" << num_live_ << " DestructCallback";
  }

  int64_t num_live() {
    grpc_core::MutexLock lock(&mu_);
    return num_live_;
  }

  size_t num_created() {
    grpc_core::MutexLock lock(&mu_);
    return num_created_;
  }

 private:
  grpc_core::Mutex mu_;
  int64_t num_live_ ABSL_GUARDED_BY(mu_) = 0;
  size_t num_created_ ABSL_GUARDED_BY(mu_) = 0;
};

TransportCounter* g_transport_counter;

void CounterInitCallback() { g_transport_counter->InitCallback(); }

void CounterDestructCallback() { g_transport_counter->DestructCallback(); }
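// Polls the completion queue until the number of live transports drops to
// zero, failing the test if that doesn't happen before the overall deadline.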
void EnsureConnectionsArentLeaked(grpc_completion_queue* cq) {
  LOG(INFO) << "The channel has been destroyed, wait for it to shut down and "
               "close...";
  // Do a quick initial poll to try to exit the test early if things have
  // already been cleaned up.
  CHECK(grpc_completion_queue_next(
            cq,
            gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                         gpr_time_from_millis(1, GPR_TIMESPAN)),
            nullptr)
            .type == GRPC_QUEUE_TIMEOUT);
  if (g_transport_counter->num_created() < 2) {
    LOG(ERROR) << "g_transport_counter->num_created() == "
               << g_transport_counter->num_created()
               << ". This means that g_transport_counter isn't working and "
                  "this test is broken. At least a couple of transport "
                  "objects should have been created.";
    CHECK(0);
  }
  gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(120);
  for (;;) {
    // Note: the main goal of this test is to try to repro a chttp2 stream
    // leak, which would also hold on to transport objects.
    int64_t live_transports = g_transport_counter->num_live();
    if (live_transports == 0) return;
    if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) {
      LOG(INFO) << "g_transport_counter->num_live() never returned 0. "
                   "It's likely this test has triggered a connection leak.";
      CHECK(0);
    }
    LOG(INFO) << "g_transport_counter->num_live() returned " << live_transports
              << ", keep waiting until it reaches 0";
    CHECK(grpc_completion_queue_next(
              cq,
              gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                           gpr_time_from_seconds(1, GPR_TIMESPAN)),
              nullptr)
              .type == GRPC_QUEUE_TIMEOUT);
  }
}
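// Repro scenario: the client half-closes before reading, the server replies
// with a tiny message, and the client finishes the call with a stream flow
// control update still queued. The stream and transport must nonetheless be
// released once the channel and server are destroyed.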
TEST(
    Chttp2,
    TestStreamDoesntLeakWhenItsWriteClosedAndThenReadClosedBeforeStartOfReadingMessageAndStatus) {
  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
  {
    // Prevent pings from client to server and server to client, since they can
    // cause chttp2 to initiate writes and thus dodge the bug we're trying to
    // repro.
    auto channel_args =
        grpc_core::ChannelArgs().Set(GRPC_ARG_HTTP2_BDP_PROBE, 0);
    TestServer server(cq,
                      const_cast<grpc_channel_args*>(channel_args.ToC().get()));
    grpc_channel_credentials* creds = grpc_insecure_credentials_create();
    grpc_channel* channel =
        grpc_channel_create(absl::StrCat("ipv6:", server.address()).c_str(),
                            creds, channel_args.ToC().get());
    grpc_channel_credentials_release(creds);
    grpc_call* call =
        grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
                                 grpc_slice_from_static_string("/foo"), nullptr,
                                 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    // Start the call. It's important for our repro to close writes before
    // reading the response, so that the client transport marks the stream
    // both read and write closed as soon as it reads a status off the wire.
    StartCallAndCloseWrites(call, cq);
    // Send a small message from server to client. The message needs to be
    // small enough that the client will queue a stream flow control update
    // without flushing it out to the wire.
    server.HandleRpc();
    // Do some polling to let the client pick up the message and status off
    // the wire *before* it begins the RECV_MESSAGE and RECV_STATUS ops.
    // The timeout just needs to be long enough that the client has most
    // likely read everything the server sent by the time it expires.
    CHECK(grpc_completion_queue_next(
              cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
              .type == GRPC_QUEUE_TIMEOUT);
    // Perform the receive message and status ops. Note that the incoming bytes
    // should already be in the client's buffers by the time we start these
    // ops. Thus, the client should *not* need to urgently send a flow control
    // update to the server to ensure progress, and it can simply queue the
    // flow control update instead.
    FinishCall(call, cq);
    grpc_call_unref(call);
    grpc_channel_destroy(channel);
    // There should be nothing to prevent stream and transport objects from
    // being shut down and destroyed at this point, so check that this happens.
    // The timeout is somewhat arbitrary, and is set long enough that it's
    // extremely unlikely to be hit due to CPU starvation.
    EnsureConnectionsArentLeaked(cq);
  }
  grpc_completion_queue_shutdown(cq);
  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                    nullptr)
             .type != GRPC_QUEUE_SHUTDOWN) {
  }
  grpc_completion_queue_destroy(cq);
}
}  // namespace

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  grpc::testing::TestEnvironment env(&argc, argv);
  grpc_init();
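  // Install the test-only transport counters before any transports are
  // created, so EnsureConnectionsArentLeaked() sees every chttp2 transport.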
  g_transport_counter = new TransportCounter();
  grpc_core::TestOnlySetGlobalHttp2TransportInitCallback(CounterInitCallback);
  grpc_core::TestOnlySetGlobalHttp2TransportDestructCallback(
      CounterDestructCallback);
  auto result = RUN_ALL_TESTS();
  grpc_shutdown();
  return result;
}