• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <limits.h>
20 #include <stdint.h>
21 #include <stdlib.h>
22 #include <string.h>
23 
24 #include <atomic>
25 #include <memory>
26 #include <string>
27 #include <thread>
28 #include <tuple>
29 
30 #include "absl/base/thread_annotations.h"
31 #include "absl/status/status.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/time/clock.h"
35 #include "absl/time/time.h"
36 #include "gtest/gtest.h"
37 
38 #include <grpc/grpc.h>
39 #include <grpc/impl/channel_arg_names.h>
40 #include <grpc/slice.h>
41 #include <grpc/slice_buffer.h>
42 #include <grpc/status.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/port_platform.h>
45 
46 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
47 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
48 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/channelz.h"
51 #include "src/core/lib/gpr/useful.h"
52 #include "src/core/lib/gprpp/crash.h"
53 #include "src/core/lib/gprpp/notification.h"
54 #include "src/core/lib/gprpp/sync.h"
55 #include "src/core/lib/iomgr/closure.h"
56 #include "src/core/lib/iomgr/endpoint.h"
57 #include "src/core/lib/iomgr/endpoint_pair.h"
58 #include "src/core/lib/iomgr/error.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/slice/slice.h"
61 #include "src/core/lib/slice/slice_internal.h"
62 #include "src/core/lib/surface/completion_queue.h"
63 #include "src/core/lib/surface/server.h"
64 #include "test/core/end2end/cq_verifier.h"
65 #include "test/core/util/test_config.h"
66 
67 namespace grpc_core {
68 namespace {
69 
Tag(intptr_t t)70 void* Tag(intptr_t t) { return reinterpret_cast<void*>(t); }
71 
72 class GracefulShutdownTest : public ::testing::Test {
73  protected:
GracefulShutdownTest()74   GracefulShutdownTest() { SetupAndStart(); }
75 
~GracefulShutdownTest()76   ~GracefulShutdownTest() override { ShutdownAndDestroy(); }
77 
78   // Sets up the client and server
SetupAndStart()79   void SetupAndStart() {
80     ExecCtx exec_ctx;
81     cq_ = grpc_completion_queue_create_for_next(nullptr);
82     cqv_ = std::make_unique<CqVerifier>(cq_);
83     grpc_arg server_args[] = {
84         grpc_channel_arg_integer_create(
85             const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
86         grpc_channel_arg_integer_create(
87             const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), INT_MAX)};
88     grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
89                                              server_args};
90     // Create server
91     server_ = grpc_server_create(&server_channel_args, nullptr);
92     auto* core_server = Server::FromC(server_);
93     grpc_server_register_completion_queue(server_, cq_, nullptr);
94     grpc_server_start(server_);
95     fds_ = grpc_iomgr_create_endpoint_pair("fixture", nullptr);
96     auto* transport = grpc_create_chttp2_transport(core_server->channel_args(),
97                                                    fds_.server, false);
98     grpc_endpoint_add_to_pollset(fds_.server, grpc_cq_pollset(cq_));
99     GPR_ASSERT(core_server->SetupTransport(transport, nullptr,
100                                            core_server->channel_args(),
101                                            nullptr) == absl::OkStatus());
102     grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
103     // Start polling on the client
104     Notification client_poller_thread_started_notification;
105     client_poll_thread_ = std::make_unique<std::thread>(
106         [this, &client_poller_thread_started_notification]() {
107           grpc_completion_queue* client_cq =
108               grpc_completion_queue_create_for_next(nullptr);
109           {
110             ExecCtx exec_ctx;
111             grpc_endpoint_add_to_pollset(fds_.client,
112                                          grpc_cq_pollset(client_cq));
113             grpc_endpoint_add_to_pollset(fds_.server,
114                                          grpc_cq_pollset(client_cq));
115           }
116           client_poller_thread_started_notification.Notify();
117           while (!shutdown_) {
118             GPR_ASSERT(grpc_completion_queue_next(
119                            client_cq, grpc_timeout_milliseconds_to_deadline(10),
120                            nullptr)
121                            .type == GRPC_QUEUE_TIMEOUT);
122           }
123           grpc_completion_queue_destroy(client_cq);
124         });
125     client_poller_thread_started_notification.WaitForNotification();
126     // Write connection prefix and settings frame
127     constexpr char kPrefix[] =
128         "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x00\x04\x00\x00\x00\x00\x00";
129     Write(absl::string_view(kPrefix, sizeof(kPrefix) - 1));
130     // Start reading on the client
131     grpc_slice_buffer_init(&read_buffer_);
132     GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
133     grpc_endpoint_read(fds_.client, &read_buffer_, &on_read_done_, false,
134                        /*min_progress_size=*/1);
135   }
136 
137   // Shuts down and destroys the client and server.
ShutdownAndDestroy()138   void ShutdownAndDestroy() {
139     shutdown_ = true;
140     ExecCtx exec_ctx;
141     grpc_endpoint_shutdown(fds_.client, GRPC_ERROR_CREATE("Client shutdown"));
142     ExecCtx::Get()->Flush();
143     client_poll_thread_->join();
144     GPR_ASSERT(read_end_notification_.WaitForNotificationWithTimeout(
145         absl::Seconds(5)));
146     grpc_endpoint_destroy(fds_.client);
147     ExecCtx::Get()->Flush();
148     // Shutdown and destroy server
149     grpc_server_shutdown_and_notify(server_, cq_, Tag(1000));
150     cqv_->Expect(Tag(1000), true);
151     cqv_->Verify();
152     grpc_server_destroy(server_);
153     cqv_.reset();
154     grpc_completion_queue_destroy(cq_);
155   }
156 
OnReadDone(void * arg,grpc_error_handle error)157   static void OnReadDone(void* arg, grpc_error_handle error) {
158     GracefulShutdownTest* self = static_cast<GracefulShutdownTest*>(arg);
159     if (error.ok()) {
160       {
161         MutexLock lock(&self->mu_);
162         for (size_t i = 0; i < self->read_buffer_.count; ++i) {
163           absl::StrAppend(&self->read_bytes_,
164                           StringViewFromSlice(self->read_buffer_.slices[i]));
165         }
166         self->read_cv_.SignalAll();
167       }
168       grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
169       grpc_endpoint_read(self->fds_.client, &self->read_buffer_,
170                          &self->on_read_done_, false, /*min_progress_size=*/1);
171     } else {
172       grpc_slice_buffer_destroy(&self->read_buffer_);
173       self->read_end_notification_.Notify();
174     }
175   }
176 
177   // Waits for \a bytes to show up in read_bytes_
WaitForReadBytes(absl::string_view bytes)178   void WaitForReadBytes(absl::string_view bytes) {
179     auto start_time = absl::Now();
180     MutexLock lock(&mu_);
181     while (true) {
182       auto where = read_bytes_.find(std::string(bytes));
183       if (where != std::string::npos) {
184         read_bytes_ = read_bytes_.substr(where + bytes.size());
185         break;
186       }
187       ASSERT_LT(absl::Now() - start_time, absl::Seconds(60));
188       read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
189     }
190   }
191 
WaitForNBytes(size_t bytes)192   std::string WaitForNBytes(size_t bytes) {
193     auto start_time = absl::Now();
194     MutexLock lock(&mu_);
195     while (read_bytes_.size() < bytes) {
196       EXPECT_LT(absl::Now() - start_time, absl::Seconds(60));
197       read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
198     }
199     std::string result = read_bytes_.substr(0, bytes);
200     read_bytes_ = read_bytes_.substr(bytes);
201     return result;
202   }
203 
WaitForClose()204   void WaitForClose() {
205     ASSERT_TRUE(read_end_notification_.WaitForNotificationWithTimeout(
206         absl::Minutes(1)));
207   }
208 
WaitForGoaway(uint32_t last_stream_id,uint32_t error_code=0,grpc_slice slice=grpc_empty_slice ())209   void WaitForGoaway(uint32_t last_stream_id, uint32_t error_code = 0,
210                      grpc_slice slice = grpc_empty_slice()) {
211     grpc_slice_buffer buffer;
212     grpc_slice_buffer_init(&buffer);
213     grpc_chttp2_goaway_append(last_stream_id, error_code, slice, &buffer);
214     std::string expected_bytes;
215     for (size_t i = 0; i < buffer.count; ++i) {
216       absl::StrAppend(&expected_bytes, StringViewFromSlice(buffer.slices[i]));
217     }
218     grpc_slice_buffer_destroy(&buffer);
219     WaitForReadBytes(expected_bytes);
220   }
221 
WaitForPing()222   uint64_t WaitForPing() {
223     grpc_slice ping_slice = grpc_chttp2_ping_create(0, 0);
224     auto whole_ping = StringViewFromSlice(ping_slice);
225     GPR_ASSERT(whole_ping.size() == 9 + 8);
226     WaitForReadBytes(whole_ping.substr(0, 9));
227     std::string ping = WaitForNBytes(8);
228     return (static_cast<uint64_t>(static_cast<uint8_t>(ping[0])) << 56) |
229            (static_cast<uint64_t>(static_cast<uint8_t>(ping[1])) << 48) |
230            (static_cast<uint64_t>(static_cast<uint8_t>(ping[2])) << 40) |
231            (static_cast<uint64_t>(static_cast<uint8_t>(ping[3])) << 32) |
232            (static_cast<uint64_t>(static_cast<uint8_t>(ping[4])) << 24) |
233            (static_cast<uint64_t>(static_cast<uint8_t>(ping[5])) << 16) |
234            (static_cast<uint64_t>(static_cast<uint8_t>(ping[6])) << 8) |
235            (static_cast<uint64_t>(static_cast<uint8_t>(ping[7])));
236   }
237 
SendPingAck(uint64_t opaque_data)238   void SendPingAck(uint64_t opaque_data) {
239     grpc_slice ping_slice = grpc_chttp2_ping_create(1, opaque_data);
240     Write(StringViewFromSlice(ping_slice));
241   }
242 
243   // This is a blocking call. It waits for the write callback to be invoked
244   // before returning. (In other words, do not call this from a thread that
245   // should not be blocked, for example, a polling thread.)
Write(absl::string_view bytes)246   void Write(absl::string_view bytes) {
247     ExecCtx exec_ctx;
248     grpc_slice slice =
249         StaticSlice::FromStaticBuffer(bytes.data(), bytes.size()).TakeCSlice();
250     grpc_slice_buffer buffer;
251     grpc_slice_buffer_init(&buffer);
252     grpc_slice_buffer_add(&buffer, slice);
253     WriteBuffer(&buffer);
254     grpc_slice_buffer_destroy(&buffer);
255   }
256 
WriteBuffer(grpc_slice_buffer * buffer)257   void WriteBuffer(grpc_slice_buffer* buffer) {
258     Notification on_write_done_notification_;
259     GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
260                       &on_write_done_notification_, nullptr);
261     grpc_endpoint_write(fds_.client, buffer, &on_write_done_, nullptr,
262                         /*max_frame_size=*/INT_MAX);
263     ExecCtx::Get()->Flush();
264     GPR_ASSERT(on_write_done_notification_.WaitForNotificationWithTimeout(
265         absl::Seconds(5)));
266   }
267 
OnWriteDone(void * arg,grpc_error_handle error)268   static void OnWriteDone(void* arg, grpc_error_handle error) {
269     if (!error.ok()) {
270       Crash(absl::StrCat("Write failed: ", error.ToString()));
271     }
272     Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
273     on_write_done_notification_->Notify();
274   }
275 
276   grpc_endpoint_pair fds_;
277   grpc_server* server_ = nullptr;
278   grpc_completion_queue* cq_ = nullptr;
279   std::unique_ptr<CqVerifier> cqv_;
280   std::unique_ptr<std::thread> client_poll_thread_;
281   std::atomic<bool> shutdown_{false};
282   grpc_closure on_read_done_;
283   Mutex mu_;
284   CondVar read_cv_;
285   Notification read_end_notification_;
286   grpc_slice_buffer read_buffer_;
287   std::string read_bytes_ ABSL_GUARDED_BY(mu_);
288   grpc_closure on_write_done_;
289 };
290 
TEST_F(GracefulShutdownTest,GracefulGoaway)291 TEST_F(GracefulShutdownTest, GracefulGoaway) {
292   // Initiate shutdown on the server
293   grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
294   // Wait for first goaway
295   WaitForGoaway((1u << 31) - 1);
296   // Wait for the ping
297   uint64_t ping_id = WaitForPing();
298   // Reply to the ping
299   SendPingAck(ping_id);
300   // Wait for final goaway
301   WaitForGoaway(0);
302   // The shutdown should successfully complete.
303   cqv_->Expect(Tag(1), true);
304   cqv_->Verify();
305 }
306 
TEST_F(GracefulShutdownTest,RequestStartedBeforeFinalGoaway)307 TEST_F(GracefulShutdownTest, RequestStartedBeforeFinalGoaway) {
308   grpc_call_error error;
309   grpc_call* s;
310   grpc_call_details call_details;
311   grpc_metadata_array request_metadata_recv;
312   grpc_call_details_init(&call_details);
313   grpc_metadata_array_init(&request_metadata_recv);
314   error = grpc_server_request_call(server_, &s, &call_details,
315                                    &request_metadata_recv, cq_, cq_, Tag(100));
316   GPR_ASSERT(GRPC_CALL_OK == error);
317   // Initiate shutdown on the server
318   grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
319   // Wait for first goaway
320   WaitForGoaway((1u << 31) - 1);
321   // Wait for the ping
322   uint64_t ping_id = WaitForPing();
323   // Start a request
324   constexpr char kRequestFrame[] =
325       "\x00\x00\xbe\x01\x05\x00\x00\x00\x01"
326       "\x10\x05:path\x08/foo/bar"
327       "\x10\x07:scheme\x04http"
328       "\x10\x07:method\x04POST"
329       "\x10\x0a:authority\x09localhost"
330       "\x10\x0c"
331       "content-type\x10"
332       "application/grpc"
333       "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"
334       "\x10\x02te\x08trailers"
335       "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)";
336   Write(absl::string_view(kRequestFrame, sizeof(kRequestFrame) - 1));
337   // Reply to the ping
338   SendPingAck(ping_id);
339   // Wait for final goaway with last stream ID 1 to show that the HTTP2
340   // transport accepted the stream.
341   WaitForGoaway(1);
342   // TODO(yashykt): The surface layer automatically cancels calls received after
343   // shutdown has been called. Once that is fixed, this should be a success.
344   cqv_->Expect(Tag(100), false);
345   // The shutdown should successfully complete.
346   cqv_->Expect(Tag(1), true);
347   cqv_->Verify();
348   grpc_metadata_array_destroy(&request_metadata_recv);
349   grpc_call_details_destroy(&call_details);
350 }
351 
TEST_F(GracefulShutdownTest,RequestStartedAfterFinalGoawayIsIgnored)352 TEST_F(GracefulShutdownTest, RequestStartedAfterFinalGoawayIsIgnored) {
353   // Start a request before shutdown to make sure that the connection stays
354   // alive.
355   grpc_call_error error;
356   grpc_call* s;
357   grpc_call_details call_details;
358   grpc_metadata_array request_metadata_recv;
359   grpc_call_details_init(&call_details);
360   grpc_metadata_array_init(&request_metadata_recv);
361   error = grpc_server_request_call(server_, &s, &call_details,
362                                    &request_metadata_recv, cq_, cq_, Tag(100));
363   GPR_ASSERT(GRPC_CALL_OK == error);
364   // Send the request from the client.
365   constexpr char kRequestFrame[] =
366       "\x00\x00\xbe\x01\x05\x00\x00\x00\x01"
367       "\x10\x05:path\x08/foo/bar"
368       "\x10\x07:scheme\x04http"
369       "\x10\x07:method\x04POST"
370       "\x10\x0a:authority\x09localhost"
371       "\x10\x0c"
372       "content-type\x10"
373       "application/grpc"
374       "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"
375       "\x10\x02te\x08trailers"
376       "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)";
377   Write(absl::string_view(kRequestFrame, sizeof(kRequestFrame) - 1));
378   cqv_->Expect(Tag(100), true);
379   cqv_->Verify();
380 
381   // Initiate shutdown on the server
382   grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
383   // Wait for first goaway
384   WaitForGoaway((1u << 31) - 1);
385   // Wait for the ping
386   uint64_t ping_id = WaitForPing();
387   // Reply to the ping
388   SendPingAck(ping_id);
389   // Wait for final goaway
390   WaitForGoaway(1);
391 
392   // Send another request from the client which should be ignored.
393   constexpr char kNewRequestFrame[] =
394       "\x00\x00\xbe\x01\x05\x00\x00\x00\x03"
395       "\x10\x05:path\x08/foo/bar"
396       "\x10\x07:scheme\x04http"
397       "\x10\x07:method\x04POST"
398       "\x10\x0a:authority\x09localhost"
399       "\x10\x0c"
400       "content-type\x10"
401       "application/grpc"
402       "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"
403       "\x10\x02te\x08trailers"
404       "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)";
405   Write(absl::string_view(kNewRequestFrame, sizeof(kNewRequestFrame) - 1));
406 
407   // Finish the accepted request.
408   grpc_op ops[3];
409   grpc_op* op;
410   memset(ops, 0, sizeof(ops));
411   op = ops;
412   op->op = GRPC_OP_SEND_INITIAL_METADATA;
413   op->data.send_initial_metadata.count = 0;
414   op->flags = 0;
415   op->reserved = nullptr;
416   op++;
417   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
418   op->data.send_status_from_server.trailing_metadata_count = 0;
419   op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
420   grpc_slice status_details = grpc_slice_from_static_string("xyz");
421   op->data.send_status_from_server.status_details = &status_details;
422   op->flags = 0;
423   op->reserved = nullptr;
424   op++;
425   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
426   int was_cancelled = 2;
427   op->data.recv_close_on_server.cancelled = &was_cancelled;
428   op->flags = 0;
429   op->reserved = nullptr;
430   op++;
431   error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), Tag(101),
432                                 nullptr);
433   GPR_ASSERT(GRPC_CALL_OK == error);
434   cqv_->Expect(Tag(101), true);
435   // The shutdown should successfully complete.
436   cqv_->Expect(Tag(1), true);
437   cqv_->Verify();
438   grpc_call_unref(s);
439   grpc_metadata_array_destroy(&request_metadata_recv);
440   grpc_call_details_destroy(&call_details);
441 }
442 
443 // Make sure that the graceful goaway eventually makes progress even if a client
444 // does not respond to the ping.
TEST_F(GracefulShutdownTest,UnresponsiveClient)445 TEST_F(GracefulShutdownTest, UnresponsiveClient) {
446   absl::Time initial_time = absl::Now();
447   // Initiate shutdown on the server
448   grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
449   // Wait for first goaway
450   WaitForGoaway((1u << 31) - 1);
451   // Wait for the ping
452   std::ignore = WaitForPing();
453   // Wait for final goaway without sending a ping ACK.
454   WaitForClose();
455   EXPECT_GE(absl::Now() - initial_time,
456             absl::Seconds(20) -
457                 absl::Seconds(
458                     1) /* clock skew between threads due to time caching */);
459   // The shutdown should successfully complete.
460   cqv_->Expect(Tag(1), true);
461   cqv_->Verify();
462 }
463 
464 // Test that servers send a GOAWAY with the last stream ID even when the
465 // transport is disconnected without letting Graceful GOAWAY complete
466 // successfully.
TEST_F(GracefulShutdownTest,GoawayReceivedOnServerDisconnect)467 TEST_F(GracefulShutdownTest, GoawayReceivedOnServerDisconnect) {
468   // Initiate shutdown on the server and immediately disconnect.
469   grpc_server_shutdown_and_notify(server_, cq_, Tag(1));
470   grpc_server_cancel_all_calls(server_);
471   // Wait for final goaway.
472   WaitForGoaway(/*last_stream_id=*/0, /*error_code=*/2,
473                 grpc_slice_from_static_string("Cancelling all calls"));
474   // The shutdown should successfully complete.
475   cqv_->Expect(Tag(1), true);
476   cqv_->Verify();
477 }
478 
479 }  // namespace
480 }  // namespace grpc_core
481 
main(int argc,char ** argv)482 int main(int argc, char** argv) {
483   ::testing::InitGoogleTest(&argc, argv);
484   grpc::testing::TestEnvironment env(&argc, argv);
485   grpc_init();
486   int result = RUN_ALL_TESTS();
487   grpc_shutdown();
488   return result;
489 }
490