// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <inttypes.h>
#include <string.h>

#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"

#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>

#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/sync.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"

namespace {

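// A minimal synchronous test server: binds an insecure HTTP/2 port on a
// dynamically chosen [::1] address and serves exactly one RPC per
// HandleRpc() call.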
class TestServer {
 public:
  explicit TestServer(grpc_completion_queue* cq,
                      grpc_channel_args* channel_args)
      : cq_(cq) {
    server_ = grpc_server_create(channel_args, nullptr);
    address_ =
        grpc_core::JoinHostPort("[::1]", grpc_pick_unused_port_or_die());
    grpc_server_register_completion_queue(server_, cq_, nullptr);
    grpc_server_credentials* server_creds =
        grpc_insecure_server_credentials_create();
    GPR_ASSERT(
        grpc_server_add_http2_port(server_, address_.c_str(), server_creds));
    grpc_server_credentials_release(server_creds);
    grpc_server_start(server_);
  }

  ~TestServer() {
    grpc_server_shutdown_and_notify(server_, cq_, this /* tag */);
    grpc_event event = grpc_completion_queue_next(
        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
    GPR_ASSERT(event.success);
    GPR_ASSERT(event.tag == this);
    grpc_server_destroy(server_);
  }

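  // Accepts one incoming call, then sends initial metadata, a 1-byte
  // message, and an OK status back to the client in a single batch.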
  void HandleRpc() {
    grpc_call_details call_details;
    grpc_call_details_init(&call_details);
    grpc_metadata_array request_metadata_recv;
    grpc_metadata_array_init(&request_metadata_recv);
    grpc_slice status_details = grpc_slice_from_static_string("xyz");
    int was_cancelled;
    // request a call
    void* tag = this;
    grpc_call* call;
    grpc_call_error error = grpc_server_request_call(
        server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag);
    GPR_ASSERT(error == GRPC_CALL_OK);
    grpc_event event = grpc_completion_queue_next(
        cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
    grpc_call_details_destroy(&call_details);
    grpc_metadata_array_destroy(&request_metadata_recv);
    GPR_ASSERT(event.success);
    GPR_ASSERT(event.tag == tag);
    // Send a response with a 1-byte payload. The 1-byte length is important
    // because it's enough to get the client to *queue* a flow control update,
    // but not large enough to get the client to initiate a write on that
    // update.
    grpc_slice response_payload_slice = grpc_slice_from_static_string("a");
    grpc_byte_buffer* response_payload =
        grpc_raw_byte_buffer_create(&response_payload_slice, 1);
    grpc_op ops[4];
    grpc_op* op;
    memset(ops, 0, sizeof(ops));
    op = ops;
    op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
    op->data.recv_close_on_server.cancelled = &was_cancelled;
    op++;
    op->op = GRPC_OP_SEND_INITIAL_METADATA;
    op++;
    op->op = GRPC_OP_SEND_MESSAGE;
    op->data.send_message.send_message = response_payload;
    op++;
    op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
    op->data.send_status_from_server.status = GRPC_STATUS_OK;
    op->data.send_status_from_server.status_details = &status_details;
    op++;
    error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
                                  tag, nullptr);
    GPR_ASSERT(error == GRPC_CALL_OK);
    event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
                                       nullptr);
    GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
    GPR_ASSERT(event.success);
    GPR_ASSERT(event.tag == tag);
    grpc_byte_buffer_destroy(response_payload);
    grpc_call_unref(call);
  }

  std::string address() const { return address_; }

 private:
  grpc_server* server_;
  grpc_completion_queue* cq_;
  std::string address_;
};

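// Starts the client call with a batch that sends initial metadata and then
// immediately half-closes the client's sending side of the stream.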
void StartCallAndCloseWrites(grpc_call* call, grpc_completion_queue* cq) {
  grpc_op ops[2];
  grpc_op* op;
  memset(ops, 0, sizeof(ops));
  op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op++;
  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  op++;
  void* tag = call;
  grpc_call_error error = grpc_call_start_batch(
      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
  GPR_ASSERT(GRPC_CALL_OK == error);
  grpc_event event = grpc_completion_queue_next(
      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
  GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
  GPR_ASSERT(event.success);
  GPR_ASSERT(event.tag == tag);
}

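// Runs the receive side of the client call: initial metadata, the server's
// message, and final status, verifying the call finishes with GRPC_STATUS_OK.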
void FinishCall(grpc_call* call, grpc_completion_queue* cq) {
  grpc_metadata_array initial_metadata_recv;
  grpc_metadata_array_init(&initial_metadata_recv);
  grpc_metadata_array trailing_metadata_recv;
  grpc_metadata_array_init(&trailing_metadata_recv);
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  grpc_slice details;
  grpc_byte_buffer* recv_payload = nullptr;
  void* tag = call;
  // Note: we're only doing read ops here. The goal is to finish the call with
  // a queued stream flow control update, due to receipt of a small message.
  // We won't do anything to explicitly initiate writes on the transport,
  // which could accidentally flush out that queued update.
  grpc_op ops[3];
  grpc_op* op;
  memset(ops, 0, sizeof(ops));
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv;
  op++;
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_payload;
  op++;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
  op->data.recv_status_on_client.status = &status;
  op->data.recv_status_on_client.status_details = &details;
  op++;
  grpc_call_error error = grpc_call_start_batch(
      call, ops, static_cast<size_t>(op - ops), tag, nullptr);
  GPR_ASSERT(GRPC_CALL_OK == error);
  grpc_event event = grpc_completion_queue_next(
      cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
  GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
  GPR_ASSERT(event.success);
  GPR_ASSERT(event.tag == tag);
  EXPECT_EQ(status, GRPC_STATUS_OK);
  grpc_byte_buffer_destroy(recv_payload);
  grpc_metadata_array_destroy(&initial_metadata_recv);
  grpc_metadata_array_destroy(&trailing_metadata_recv);
  grpc_slice_unref(details);
}

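// Tracks the number of chttp2 transports created and still alive, via the
// test-only init/destruct callbacks registered in main(). The test uses it
// to detect leaked transports (and thus leaked streams) after shutdown.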
class TransportCounter {
 public:
  void InitCallback() {
    grpc_core::MutexLock lock(&mu_);
    ++num_created_;
    ++num_live_;
    gpr_log(GPR_INFO,
            "TransportCounter num_created_=%zu num_live_=%" PRId64
            " InitCallback",
            num_created_, num_live_);
  }

  void DestructCallback() {
    grpc_core::MutexLock lock(&mu_);
    --num_live_;
    gpr_log(GPR_INFO,
            "TransportCounter num_created_=%zu num_live_=%" PRId64
            " DestructCallback",
            num_created_, num_live_);
  }

  int64_t num_live() {
    grpc_core::MutexLock lock(&mu_);
    return num_live_;
  }

  size_t num_created() {
    grpc_core::MutexLock lock(&mu_);
    return num_created_;
  }

 private:
  grpc_core::Mutex mu_;
  int64_t num_live_ ABSL_GUARDED_BY(mu_) = 0;
  size_t num_created_ ABSL_GUARDED_BY(mu_) = 0;
};

TransportCounter* g_transport_counter;

void CounterInitCallback() { g_transport_counter->InitCallback(); }

void CounterDestructCallback() { g_transport_counter->DestructCallback(); }

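// Polls until every transport created during the test has been destroyed.
// Fails fast if the counter clearly isn't hooked up, and fails after a long
// deadline if any transport (and therefore any stream) is still alive.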
void EnsureConnectionsArentLeaked(grpc_completion_queue* cq) {
  gpr_log(
      GPR_INFO,
      "The channel has been destroyed; wait for it to shut down and close...");
  // Do a quick initial poll to try to exit the test early if things have
  // already been cleaned up.
  GPR_ASSERT(grpc_completion_queue_next(
                 cq,
                 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                              gpr_time_from_millis(1, GPR_TIMESPAN)),
                 nullptr)
                 .type == GRPC_QUEUE_TIMEOUT);
  if (g_transport_counter->num_created() < 2) {
    gpr_log(GPR_ERROR,
            "g_transport_counter->num_created() == %zu. This means that "
            "g_transport_counter isn't working and this test is broken. At "
            "least a couple of transport objects should have been created.",
            g_transport_counter->num_created());
    GPR_ASSERT(0);
  }
  gpr_timespec overall_deadline = grpc_timeout_seconds_to_deadline(120);
  for (;;) {
    // Note: the main goal of this test is to try to repro a chttp2 stream
    // leak, which also holds on to transport objects.
    int64_t live_transports = g_transport_counter->num_live();
    if (live_transports == 0) return;
    if (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) > 0) {
      gpr_log(GPR_INFO,
              "g_transport_counter->num_live() never returned 0. "
              "It's likely this test has triggered a connection leak.");
      GPR_ASSERT(0);
    }
    gpr_log(GPR_INFO,
            "g_transport_counter->num_live() returned %" PRId64
            ", keep waiting until it reaches 0",
            live_transports);
    GPR_ASSERT(grpc_completion_queue_next(
                   cq,
                   gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                                gpr_time_from_seconds(1, GPR_TIMESPAN)),
                   nullptr)
                   .type == GRPC_QUEUE_TIMEOUT);
  }
}

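// Repro sequence: the client starts a call and closes writes, the server
// replies with a 1-byte message and OK status, and only then does the client
// run its receive ops. The stream thus becomes both read- and write-closed
// while a stream flow control update is still queued, which must not cause a
// stream (or transport) leak.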
TEST(
    Chttp2,
    TestStreamDoesntLeakWhenItsWriteClosedAndThenReadClosedBeforeStartOfReadingMessageAndStatus) {
  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
  {
    // Prevent pings from client to server and server to client, since they
    // can cause chttp2 to initiate writes and thus dodge the bug we're trying
    // to repro.
    auto channel_args =
        grpc_core::ChannelArgs().Set(GRPC_ARG_HTTP2_BDP_PROBE, 0);
    TestServer server(
        cq, const_cast<grpc_channel_args*>(channel_args.ToC().get()));
    grpc_channel_credentials* creds = grpc_insecure_credentials_create();
    grpc_channel* channel =
        grpc_channel_create(absl::StrCat("ipv6:", server.address()).c_str(),
                            creds, channel_args.ToC().get());
    grpc_channel_credentials_release(creds);
    grpc_call* call =
        grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
                                 grpc_slice_from_static_string("/foo"), nullptr,
                                 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
    // Start the call. It's important for our repro to close writes before
    // reading the response, so that the client transport marks the stream
    // both read and write closed as soon as it reads a status off the wire.
    StartCallAndCloseWrites(call, cq);
    // Send a small message from server to client. The message needs to be
    // small enough that the client queues a stream flow control update
    // without flushing it out to the wire.
    server.HandleRpc();
    // Do some polling to let the client pick up the message and status off
    // the wire, *before* it begins the RECV_MESSAGE and RECV_STATUS ops.
    // The timeout here just needs to be long enough that the client has most
    // likely read everything the server sent it by the time it's done.
    GPR_ASSERT(grpc_completion_queue_next(
                   cq, grpc_timeout_milliseconds_to_deadline(20), nullptr)
                   .type == GRPC_QUEUE_TIMEOUT);
    // Perform the receive message and status ops. Note that the incoming
    // bytes should already be in the client's buffers by the time we start
    // these ops. Thus, the client should *not* need to urgently send a flow
    // control update to the server to ensure progress; it can simply queue
    // the flow control update instead.
    FinishCall(call, cq);
    grpc_call_unref(call);
    grpc_channel_destroy(channel);
    // There should be nothing to prevent stream and transport objects from
    // shutdown and destruction at this point. So check that this happens.
    // The timeout is somewhat arbitrary, and is set long enough so that it's
    // extremely unlikely to be hit due to CPU starvation.
    EnsureConnectionsArentLeaked(cq);
  }
  grpc_completion_queue_shutdown(cq);
  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                    nullptr)
             .type != GRPC_QUEUE_SHUTDOWN) {
  }
  grpc_completion_queue_destroy(cq);
}

}  // namespace

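// Registers the transport counter's test-only callbacks so that every chttp2
// transport init/destruct during the test is counted, then runs the tests.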
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  grpc::testing::TestEnvironment env(&argc, argv);
  grpc_init();
  g_transport_counter = new TransportCounter();
  grpc_core::TestOnlySetGlobalHttp2TransportInitCallback(CounterInitCallback);
  grpc_core::TestOnlySetGlobalHttp2TransportDestructCallback(
      CounterDestructCallback);
  auto result = RUN_ALL_TESTS();
  grpc_shutdown();
  return result;
}