1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <limits.h>
20 #include <stdint.h>
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <atomic>
25 #include <memory>
26 #include <string>
27 #include <thread>
28 #include <tuple>
29
30 #include "absl/base/thread_annotations.h"
31 #include "absl/status/status.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/time/clock.h"
35 #include "absl/time/time.h"
36 #include "gtest/gtest.h"
37
38 #include <grpc/grpc.h>
39 #include <grpc/impl/channel_arg_names.h>
40 #include <grpc/slice.h>
41 #include <grpc/slice_buffer.h>
42 #include <grpc/status.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/port_platform.h>
45
46 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
47 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
48 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/channelz.h"
51 #include "src/core/lib/gpr/useful.h"
52 #include "src/core/lib/gprpp/crash.h"
53 #include "src/core/lib/gprpp/notification.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/iomgr/closure.h"
56 #include "src/core/lib/iomgr/endpoint.h"
57 #include "src/core/lib/iomgr/endpoint_pair.h"
58 #include "src/core/lib/iomgr/error.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/slice/slice.h"
61 #include "src/core/lib/slice/slice_internal.h"
62 #include "src/core/lib/surface/completion_queue.h"
63 #include "src/core/lib/surface/server.h"
64 #include "test/core/end2end/cq_verifier.h"
65 #include "test/core/util/test_config.h"
66
67 namespace grpc_core {
68 namespace {
69
Tag(intptr_t t)70 void* Tag(intptr_t t) { return reinterpret_cast<void*>(t); }
71
72 class GracefulShutdownTest : public ::testing::Test {
73 protected:
GracefulShutdownTest()74 GracefulShutdownTest() { SetupAndStart(); }
75
~GracefulShutdownTest()76 ~GracefulShutdownTest() override { ShutdownAndDestroy(); }
77
78 // Sets up the client and server
SetupAndStart()79 void SetupAndStart() {
80 ExecCtx exec_ctx;
81 cq_ = grpc_completion_queue_create_for_next(nullptr);
82 cqv_ = std::make_unique<CqVerifier>(cq_);
83 grpc_arg server_args[] = {
84 grpc_channel_arg_integer_create(
85 const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
86 grpc_channel_arg_integer_create(
87 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), INT_MAX)};
88 grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
89 server_args};
90 // Create server
91 server_ = grpc_server_create(&server_channel_args, nullptr);
92 auto* core_server = Server::FromC(server_);
93 grpc_server_register_completion_queue(server_, cq_, nullptr);
94 grpc_server_start(server_);
95 fds_ = grpc_iomgr_create_endpoint_pair("fixture", nullptr);
96 auto* transport = grpc_create_chttp2_transport(core_server->channel_args(),
97 fds_.server, false);
98 grpc_endpoint_add_to_pollset(fds_.server, grpc_cq_pollset(cq_));
99 GPR_ASSERT(core_server->SetupTransport(transport, nullptr,
100 core_server->channel_args(),
101 nullptr) == absl::OkStatus());
102 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
103 // Start polling on the client
104 Notification client_poller_thread_started_notification;
105 client_poll_thread_ = std::make_unique<std::thread>(
106 [this, &client_poller_thread_started_notification]() {
107 grpc_completion_queue* client_cq =
108 grpc_completion_queue_create_for_next(nullptr);
109 {
110 ExecCtx exec_ctx;
111 grpc_endpoint_add_to_pollset(fds_.client,
112 grpc_cq_pollset(client_cq));
113 grpc_endpoint_add_to_pollset(fds_.server,
114 grpc_cq_pollset(client_cq));
115 }
116 client_poller_thread_started_notification.Notify();
117 while (!shutdown_) {
118 GPR_ASSERT(grpc_completion_queue_next(
119 client_cq, grpc_timeout_milliseconds_to_deadline(10),
120 nullptr)
121 .type == GRPC_QUEUE_TIMEOUT);
122 }
123 grpc_completion_queue_destroy(client_cq);
124 });
125 client_poller_thread_started_notification.WaitForNotification();
126 // Write connection prefix and settings frame
127 constexpr char kPrefix[] =
128 "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x00\x04\x00\x00\x00\x00\x00";
129 Write(absl::string_view(kPrefix, sizeof(kPrefix) - 1));
130 // Start reading on the client
131 grpc_slice_buffer_init(&read_buffer_);
132 GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
133 grpc_endpoint_read(fds_.client, &read_buffer_, &on_read_done_, false,
134 /*min_progress_size=*/1);
135 }
136
137 // Shuts down and destroys the client and server.
ShutdownAndDestroy()138 void ShutdownAndDestroy() {
139 shutdown_ = true;
140 ExecCtx exec_ctx;
141 grpc_endpoint_shutdown(fds_.client, GRPC_ERROR_CREATE("Client shutdown"));
142 ExecCtx::Get()->Flush();
143 client_poll_thread_->join();
144 GPR_ASSERT(read_end_notification_.WaitForNotificationWithTimeout(
145 absl::Seconds(5)));
146 grpc_endpoint_destroy(fds_.client);
147 ExecCtx::Get()->Flush();
148 // Shutdown and destroy server
149 grpc_server_shutdown_and_notify(server_, cq_, Tag(1000));
150 cqv_->Expect(Tag(1000), true);
151 cqv_->Verify();
152 grpc_server_destroy(server_);
153 cqv_.reset();
154 grpc_completion_queue_destroy(cq_);
155 }
156
OnReadDone(void * arg,grpc_error_handle error)157 static void OnReadDone(void* arg, grpc_error_handle error) {
158 GracefulShutdownTest* self = static_cast<GracefulShutdownTest*>(arg);
159 if (error.ok()) {
160 {
161 MutexLock lock(&self->mu_);
162 for (size_t i = 0; i < self->read_buffer_.count; ++i) {
163 absl::StrAppend(&self->read_bytes_,
164 StringViewFromSlice(self->read_buffer_.slices[i]));
165 }
166 self->read_cv_.SignalAll();
167 }
168 grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
169 grpc_endpoint_read(self->fds_.client, &self->read_buffer_,
170 &self->on_read_done_, false, /*min_progress_size=*/1);
171 } else {
172 grpc_slice_buffer_destroy(&self->read_buffer_);
173 self->read_end_notification_.Notify();
174 }
175 }
176
177 // Waits for \a bytes to show up in read_bytes_
WaitForReadBytes(absl::string_view bytes)178 void WaitForReadBytes(absl::string_view bytes) {
179 auto start_time = absl::Now();
180 MutexLock lock(&mu_);
181 while (true) {
182 auto where = read_bytes_.find(std::string(bytes));
183 if (where != std::string::npos) {
184 read_bytes_ = read_bytes_.substr(where + bytes.size());
185 break;
186 }
187 ASSERT_LT(absl::Now() - start_time, absl::Seconds(60));
188 read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
189 }
190 }
191
WaitForNBytes(size_t bytes)192 std::string WaitForNBytes(size_t bytes) {
193 auto start_time = absl::Now();
194 MutexLock lock(&mu_);
195 while (read_bytes_.size() < bytes) {
196 EXPECT_LT(absl::Now() - start_time, absl::Seconds(60));
197 read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
198 }
199 std::string result = read_bytes_.substr(0, bytes);
200 read_bytes_ = read_bytes_.substr(bytes);
201 return result;
202 }
203
WaitForClose()204 void WaitForClose() {
205 ASSERT_TRUE(read_end_notification_.WaitForNotificationWithTimeout(
206 absl::Minutes(1)));
207 }
208
WaitForGoaway(uint32_t last_stream_id,uint32_t error_code=0,grpc_slice slice=grpc_empty_slice ())209 void WaitForGoaway(uint32_t last_stream_id, uint32_t error_code = 0,
210 grpc_slice slice = grpc_empty_slice()) {
211 grpc_slice_buffer buffer;
212 grpc_slice_buffer_init(&buffer);
213 grpc_chttp2_goaway_append(last_stream_id, error_code, slice, &buffer);
214 std::string expected_bytes;
215 for (size_t i = 0; i < buffer.count; ++i) {
216 absl::StrAppend(&expected_bytes, StringViewFromSlice(buffer.slices[i]));
217 }
218 grpc_slice_buffer_destroy(&buffer);
219 WaitForReadBytes(expected_bytes);
220 }
221
WaitForPing()222 uint64_t WaitForPing() {
223 grpc_slice ping_slice = grpc_chttp2_ping_create(0, 0);
224 auto whole_ping = StringViewFromSlice(ping_slice);
225 GPR_ASSERT(whole_ping.size() == 9 + 8);
226 WaitForReadBytes(whole_ping.substr(0, 9));
227 std::string ping = WaitForNBytes(8);
228 return (static_cast<uint64_t>(static_cast<uint8_t>(ping[0])) << 56) |
229 (static_cast<uint64_t>(static_cast<uint8_t>(ping[1])) << 48) |
230 (static_cast<uint64_t>(static_cast<uint8_t>(ping[2])) << 40) |
231 (static_cast<uint64_t>(static_cast<uint8_t>(ping[3])) << 32) |
232 (static_cast<uint64_t>(static_cast<uint8_t>(ping[4])) << 24) |
233 (static_cast<uint64_t>(static_cast<uint8_t>(ping[5])) << 16) |
234 (static_cast<uint64_t>(static_cast<uint8_t>(ping[6])) << 8) |
235 (static_cast<uint64_t>(static_cast<uint8_t>(ping[7])));
236 }
237
SendPingAck(uint64_t opaque_data)238 void SendPingAck(uint64_t opaque_data) {
239 grpc_slice ping_slice = grpc_chttp2_ping_create(1, opaque_data);
240 Write(StringViewFromSlice(ping_slice));
241 }
242
243 // This is a blocking call. It waits for the write callback to be invoked
244 // before returning. (In other words, do not call this from a thread that
245 // should not be blocked, for example, a polling thread.)
Write(absl::string_view bytes)246 void Write(absl::string_view bytes) {
247 ExecCtx exec_ctx;
248 grpc_slice slice =
249 StaticSlice::FromStaticBuffer(bytes.data(), bytes.size()).TakeCSlice();
250 grpc_slice_buffer buffer;
251 grpc_slice_buffer_init(&buffer);
252 grpc_slice_buffer_add(&buffer, slice);
253 WriteBuffer(&buffer);
254 grpc_slice_buffer_destroy(&buffer);
255 }
256
WriteBuffer(grpc_slice_buffer * buffer)257 void WriteBuffer(grpc_slice_buffer* buffer) {
258 Notification on_write_done_notification_;
259 GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
260 &on_write_done_notification_, nullptr);
261 grpc_endpoint_write(fds_.client, buffer, &on_write_done_, nullptr,
262 /*max_frame_size=*/INT_MAX);
263 ExecCtx::Get()->Flush();
264 GPR_ASSERT(on_write_done_notification_.WaitForNotificationWithTimeout(
265 absl::Seconds(5)));
266 }
267
OnWriteDone(void * arg,grpc_error_handle error)268 static void OnWriteDone(void* arg, grpc_error_handle error) {
269 if (!error.ok()) {
270 Crash(absl::StrCat("Write failed: ", error.ToString()));
271 }
272 Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
273 on_write_done_notification_->Notify();
274 }
275
276 grpc_endpoint_pair fds_;
277 grpc_server* server_ = nullptr;
278 grpc_completion_queue* cq_ = nullptr;
279 std::unique_ptr<CqVerifier> cqv_;
280 std::unique_ptr<std::thread> client_poll_thread_;
281 std::atomic<bool> shutdown_{false};
282 grpc_closure on_read_done_;
283 Mutex mu_;
284 CondVar read_cv_;
285 Notification read_end_notification_;
286 grpc_slice_buffer read_buffer_;
287 std::string read_bytes_ ABSL_GUARDED_BY(mu_);
288 grpc_closure on_write_done_;
289 };
290
TEST_F(GracefulShutdownTest,GracefulGoaway)291 TEST_F(GracefulShutdownTest, GracefulGoaway) {
292 // Initiate shutdown on the server
293 grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
294 // Wait for first goaway
295 WaitForGoaway((1u << 31) - 1);
296 // Wait for the ping
297 uint64_t ping_id = WaitForPing();
298 // Reply to the ping
299 SendPingAck(ping_id);
300 // Wait for final goaway
301 WaitForGoaway(0);
302 // The shutdown should successfully complete.
303 cqv_->Expect(Tag(1), true);
304 cqv_->Verify();
305 }
306
TEST_F(GracefulShutdownTest,RequestStartedBeforeFinalGoaway)307 TEST_F(GracefulShutdownTest, RequestStartedBeforeFinalGoaway) {
308 grpc_call_error error;
309 grpc_call* s;
310 grpc_call_details call_details;
311 grpc_metadata_array request_metadata_recv;
312 grpc_call_details_init(&call_details);
313 grpc_metadata_array_init(&request_metadata_recv);
314 error = grpc_server_request_call(server_, &s, &call_details,
315 &request_metadata_recv, cq_, cq_, Tag(100));
316 GPR_ASSERT(GRPC_CALL_OK == error);
317 // Initiate shutdown on the server
318 grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
319 // Wait for first goaway
320 WaitForGoaway((1u << 31) - 1);
321 // Wait for the ping
322 uint64_t ping_id = WaitForPing();
323 // Start a request
324 constexpr char kRequestFrame[] =
325 "\x00\x00\xbe\x01\x05\x00\x00\x00\x01"
326 "\x10\x05:path\x08/foo/bar"
327 "\x10\x07:scheme\x04http"
328 "\x10\x07:method\x04POST"
329 "\x10\x0a:authority\x09localhost"
330 "\x10\x0c"
331 "content-type\x10"
332 "application/grpc"
333 "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"
334 "\x10\x02te\x08trailers"
335 "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)";
336 Write(absl::string_view(kRequestFrame, sizeof(kRequestFrame) - 1));
337 // Reply to the ping
338 SendPingAck(ping_id);
339 // Wait for final goaway with last stream ID 1 to show that the HTTP2
340 // transport accepted the stream.
341 WaitForGoaway(1);
342 // TODO(yashykt): The surface layer automatically cancels calls received after
343 // shutdown has been called. Once that is fixed, this should be a success.
344 cqv_->Expect(Tag(100), false);
345 // The shutdown should successfully complete.
346 cqv_->Expect(Tag(1), true);
347 cqv_->Verify();
348 grpc_metadata_array_destroy(&request_metadata_recv);
349 grpc_call_details_destroy(&call_details);
350 }
351
TEST_F(GracefulShutdownTest,RequestStartedAfterFinalGoawayIsIgnored)352 TEST_F(GracefulShutdownTest, RequestStartedAfterFinalGoawayIsIgnored) {
353 // Start a request before shutdown to make sure that the connection stays
354 // alive.
355 grpc_call_error error;
356 grpc_call* s;
357 grpc_call_details call_details;
358 grpc_metadata_array request_metadata_recv;
359 grpc_call_details_init(&call_details);
360 grpc_metadata_array_init(&request_metadata_recv);
361 error = grpc_server_request_call(server_, &s, &call_details,
362 &request_metadata_recv, cq_, cq_, Tag(100));
363 GPR_ASSERT(GRPC_CALL_OK == error);
364 // Send the request from the client.
365 constexpr char kRequestFrame[] =
366 "\x00\x00\xbe\x01\x05\x00\x00\x00\x01"
367 "\x10\x05:path\x08/foo/bar"
368 "\x10\x07:scheme\x04http"
369 "\x10\x07:method\x04POST"
370 "\x10\x0a:authority\x09localhost"
371 "\x10\x0c"
372 "content-type\x10"
373 "application/grpc"
374 "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"
375 "\x10\x02te\x08trailers"
376 "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)";
377 Write(absl::string_view(kRequestFrame, sizeof(kRequestFrame) - 1));
378 cqv_->Expect(Tag(100), true);
379 cqv_->Verify();
380
381 // Initiate shutdown on the server
382 grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
383 // Wait for first goaway
384 WaitForGoaway((1u << 31) - 1);
385 // Wait for the ping
386 uint64_t ping_id = WaitForPing();
387 // Reply to the ping
388 SendPingAck(ping_id);
389 // Wait for final goaway
390 WaitForGoaway(1);
391
392 // Send another request from the client which should be ignored.
393 constexpr char kNewRequestFrame[] =
394 "\x00\x00\xbe\x01\x05\x00\x00\x00\x03"
395 "\x10\x05:path\x08/foo/bar"
396 "\x10\x07:scheme\x04http"
397 "\x10\x07:method\x04POST"
398 "\x10\x0a:authority\x09localhost"
399 "\x10\x0c"
400 "content-type\x10"
401 "application/grpc"
402 "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"
403 "\x10\x02te\x08trailers"
404 "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)";
405 Write(absl::string_view(kNewRequestFrame, sizeof(kNewRequestFrame) - 1));
406
407 // Finish the accepted request.
408 grpc_op ops[3];
409 grpc_op* op;
410 memset(ops, 0, sizeof(ops));
411 op = ops;
412 op->op = GRPC_OP_SEND_INITIAL_METADATA;
413 op->data.send_initial_metadata.count = 0;
414 op->flags = 0;
415 op->reserved = nullptr;
416 op++;
417 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
418 op->data.send_status_from_server.trailing_metadata_count = 0;
419 op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
420 grpc_slice status_details = grpc_slice_from_static_string("xyz");
421 op->data.send_status_from_server.status_details = &status_details;
422 op->flags = 0;
423 op->reserved = nullptr;
424 op++;
425 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
426 int was_cancelled = 2;
427 op->data.recv_close_on_server.cancelled = &was_cancelled;
428 op->flags = 0;
429 op->reserved = nullptr;
430 op++;
431 error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), Tag(101),
432 nullptr);
433 GPR_ASSERT(GRPC_CALL_OK == error);
434 cqv_->Expect(Tag(101), true);
435 // The shutdown should successfully complete.
436 cqv_->Expect(Tag(1), true);
437 cqv_->Verify();
438 grpc_call_unref(s);
439 grpc_metadata_array_destroy(&request_metadata_recv);
440 grpc_call_details_destroy(&call_details);
441 }
442
443 // Make sure that the graceful goaway eventually makes progress even if a client
444 // does not respond to the ping.
TEST_F(GracefulShutdownTest,UnresponsiveClient)445 TEST_F(GracefulShutdownTest, UnresponsiveClient) {
446 absl::Time initial_time = absl::Now();
447 // Initiate shutdown on the server
448 grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
449 // Wait for first goaway
450 WaitForGoaway((1u << 31) - 1);
451 // Wait for the ping
452 std::ignore = WaitForPing();
453 // Wait for final goaway without sending a ping ACK.
454 WaitForClose();
455 EXPECT_GE(absl::Now() - initial_time,
456 absl::Seconds(20) -
457 absl::Seconds(
458 1) /* clock skew between threads due to time caching */);
459 // The shutdown should successfully complete.
460 cqv_->Expect(Tag(1), true);
461 cqv_->Verify();
462 }
463
464 // Test that servers send a GOAWAY with the last stream ID even when the
465 // transport is disconnected without letting Graceful GOAWAY complete
466 // successfully.
TEST_F(GracefulShutdownTest,GoawayReceivedOnServerDisconnect)467 TEST_F(GracefulShutdownTest, GoawayReceivedOnServerDisconnect) {
468 // Initiate shutdown on the server and immediately disconnect.
469 grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
470 grpc_server_cancel_all_calls(server_);
471 // Wait for final goaway.
472 WaitForGoaway(/*last_stream_id=*/0, /*error_code=*/2,
473 grpc_slice_from_static_string("Cancelling all calls"));
474 // The shutdown should successfully complete.
475 cqv_->Expect(Tag(1), true);
476 cqv_->Verify();
477 }
478
479 } // namespace
480 } // namespace grpc_core
481
main(int argc,char ** argv)482 int main(int argc, char** argv) {
483 ::testing::InitGoogleTest(&argc, argv);
484 grpc::testing::TestEnvironment env(&argc, argv);
485 grpc_init();
486 int result = RUN_ALL_TESTS();
487 grpc_shutdown();
488 return result;
489 }
490