• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <string.h>
20 
21 #include <algorithm>
22 #include <map>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/time/time.h"
31 #include "gtest/gtest.h"
32 
33 #include <grpc/byte_buffer.h>
34 #include <grpc/grpc.h>
35 #include <grpc/grpc_security.h>
36 #include <grpc/impl/channel_arg_names.h>
37 #include <grpc/impl/propagation_bits.h>
38 #include <grpc/slice.h>
39 #include <grpc/status.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/port_platform.h>
42 #include <grpc/support/time.h>
43 
44 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
45 #include "src/core/lib/address_utils/parse_address.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/gpr/useful.h"
48 #include "src/core/lib/gprpp/host_port.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/sync.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/iomgr/exec_ctx.h"
53 #include "src/core/lib/iomgr/resolved_address.h"
54 #include "src/core/lib/surface/channel.h"
55 #include "src/core/lib/uri/uri_parser.h"
56 #include "src/core/resolver/endpoint_addresses.h"
57 #include "src/core/resolver/fake/fake_resolver.h"
58 #include "src/core/resolver/resolver.h"
59 #include "test/core/end2end/cq_verifier.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/resolve_localhost_ip46.h"
62 #include "test/core/util/test_config.h"
63 
64 namespace {
65 
66 class TransportCounter {
67  public:
CounterInitCallback()68   static void CounterInitCallback() {
69     grpc_core::MutexLock lock(&mu());
70     ++count_;
71   }
72 
CounterDestructCallback()73   static void CounterDestructCallback() {
74     grpc_core::MutexLock lock(&mu());
75     if (--count_ == 0) {
76       cv().SignalAll();
77     }
78   }
79 
WaitForTransportsToBeDestroyed()80   static void WaitForTransportsToBeDestroyed() {
81     grpc_core::MutexLock lock(&mu());
82     while (count_ != 0) {
83       ASSERT_FALSE(cv().WaitWithTimeout(&mu(), absl::Seconds(10)));
84     }
85   }
86 
count()87   static int count() {
88     grpc_core::MutexLock lock(&mu());
89     return count_;
90   }
91 
mu()92   static grpc_core::Mutex& mu() {
93     static grpc_core::Mutex* mu = new grpc_core::Mutex();
94     return *mu;
95   }
96 
cv()97   static grpc_core::CondVar& cv() {
98     static grpc_core::CondVar* cv = new grpc_core::CondVar();
99     return *cv;
100   }
101 
102  private:
103   static int count_;
104 };
105 
106 int TransportCounter::count_ = 0;
107 
108 // Perform a simple RPC where the server cancels the request with
109 // grpc_call_cancel_with_status
PerformCall(grpc_channel * channel,grpc_server * server,grpc_completion_queue * cq)110 grpc_status_code PerformCall(grpc_channel* channel, grpc_server* server,
111                              grpc_completion_queue* cq) {
112   grpc_call* c;
113   grpc_call* s;
114   grpc_core::CqVerifier cqv(cq);
115   grpc_op ops[6];
116   grpc_op* op;
117   grpc_metadata_array initial_metadata_recv;
118   grpc_metadata_array trailing_metadata_recv;
119   grpc_metadata_array request_metadata_recv;
120   grpc_call_details call_details;
121   grpc_status_code status;
122   grpc_call_error error;
123   grpc_slice details;
124   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
125   // Start a call
126   c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
127                                grpc_slice_from_static_string("/foo"), nullptr,
128                                deadline, nullptr);
129   GPR_ASSERT(c);
130   grpc_metadata_array_init(&initial_metadata_recv);
131   grpc_metadata_array_init(&trailing_metadata_recv);
132   grpc_metadata_array_init(&request_metadata_recv);
133   grpc_call_details_init(&call_details);
134   memset(ops, 0, sizeof(ops));
135   op = ops;
136   op->op = GRPC_OP_SEND_INITIAL_METADATA;
137   op->data.send_initial_metadata.count = 0;
138   op->flags = 0;
139   op->reserved = nullptr;
140   op++;
141   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
142   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
143   op->data.recv_status_on_client.status = &status;
144   op->data.recv_status_on_client.status_details = &details;
145   op->flags = 0;
146   op->reserved = nullptr;
147   op++;
148   op->op = GRPC_OP_RECV_INITIAL_METADATA;
149   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
150   op->flags = 0;
151   op->reserved = nullptr;
152   op++;
153   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops),
154                                 grpc_core::CqVerifier::tag(1), nullptr);
155   GPR_ASSERT(GRPC_CALL_OK == error);
156   // Request a call on the server
157   error = grpc_server_request_call(server, &s, &call_details,
158                                    &request_metadata_recv, cq, cq,
159                                    grpc_core::CqVerifier::tag(101));
160   GPR_ASSERT(GRPC_CALL_OK == error);
161   cqv.Expect(grpc_core::CqVerifier::tag(101), true);
162   cqv.Verify();
163   grpc_call_cancel_with_status(s, GRPC_STATUS_PERMISSION_DENIED, "test status",
164                                nullptr);
165   cqv.Expect(grpc_core::CqVerifier::tag(1), true);
166   cqv.Verify();
167   // cleanup
168   grpc_slice_unref(details);
169   grpc_metadata_array_destroy(&initial_metadata_recv);
170   grpc_metadata_array_destroy(&trailing_metadata_recv);
171   grpc_metadata_array_destroy(&request_metadata_recv);
172   grpc_call_details_destroy(&call_details);
173   grpc_call_unref(c);
174   grpc_call_unref(s);
175   return status;
176 }
177 
178 // Test that sending a lot of RPCs that are cancelled by the server doesn't
179 // result in too many pings due to the pings sent by BDP.
TEST(TooManyPings,TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings)180 TEST(TooManyPings, TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings) {
181   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
182   // create the server
183   grpc_server* server = grpc_server_create(nullptr, nullptr);
184   std::string server_address = grpc_core::JoinHostPort(
185       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
186   grpc_server_register_completion_queue(server, cq, nullptr);
187   grpc_server_credentials* server_creds =
188       grpc_insecure_server_credentials_create();
189   GPR_ASSERT(
190       grpc_server_add_http2_port(server, server_address.c_str(), server_creds));
191   grpc_server_credentials_release(server_creds);
192   grpc_server_start(server);
193   // create the channel (bdp pings are enabled by default)
194   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
195   grpc_channel* channel = grpc_channel_create(server_address.c_str(), creds,
196                                               nullptr /* channel args */);
197   grpc_channel_credentials_release(creds);
198   std::map<grpc_status_code, int> statuses_and_counts;
199   const int kNumTotalRpcs = 100;
200   // perform an RPC
201   gpr_log(GPR_INFO,
202           "Performing %d total RPCs and expecting them all to receive status "
203           "PERMISSION_DENIED (%d)",
204           kNumTotalRpcs, GRPC_STATUS_PERMISSION_DENIED);
205   for (int i = 0; i < kNumTotalRpcs; i++) {
206     grpc_status_code status = PerformCall(channel, server, cq);
207     statuses_and_counts[status] += 1;
208   }
209   int num_not_cancelled = 0;
210   for (auto itr = statuses_and_counts.begin(); itr != statuses_and_counts.end();
211        itr++) {
212     if (itr->first != GRPC_STATUS_PERMISSION_DENIED) {
213       num_not_cancelled += itr->second;
214     }
215     gpr_log(GPR_INFO, "%d / %d RPCs received status code: %d", itr->second,
216             kNumTotalRpcs, itr->first);
217   }
218   if (num_not_cancelled > 0) {
219     gpr_log(GPR_ERROR,
220             "Expected all RPCs to receive status PERMISSION_DENIED (%d) but %d "
221             "received other status codes",
222             GRPC_STATUS_PERMISSION_DENIED, num_not_cancelled);
223     FAIL();
224   }
225   // shutdown and destroy the client and server
226   grpc_channel_destroy(channel);
227   grpc_server_shutdown_and_notify(server, cq, nullptr);
228   grpc_completion_queue_shutdown(cq);
229   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
230                                     nullptr)
231              .type != GRPC_QUEUE_SHUTDOWN) {
232   }
233   grpc_server_destroy(server);
234   grpc_completion_queue_destroy(cq);
235 }
236 
237 // Perform a simple RPC where the client makes a request, and both the client
238 // and server continue reading so that gRPC can send and receive keepalive
239 // pings.
PerformWaitingCall(grpc_channel * channel,grpc_server * server,grpc_completion_queue * cq)240 grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
241                                     grpc_completion_queue* cq) {
242   grpc_call* c;
243   grpc_call* s;
244   grpc_core::CqVerifier cqv(cq);
245   grpc_op ops[6];
246   grpc_op* op;
247   grpc_metadata_array initial_metadata_recv;
248   grpc_metadata_array trailing_metadata_recv;
249   grpc_metadata_array request_metadata_recv;
250   grpc_call_details call_details;
251   grpc_status_code status;
252   grpc_call_error error;
253   grpc_slice details;
254   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30);
255   // Start a call
256   c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
257                                grpc_slice_from_static_string("/foo"), nullptr,
258                                deadline, nullptr);
259   GPR_ASSERT(c);
260   grpc_metadata_array_init(&initial_metadata_recv);
261   grpc_metadata_array_init(&trailing_metadata_recv);
262   grpc_metadata_array_init(&request_metadata_recv);
263   grpc_call_details_init(&call_details);
264   memset(ops, 0, sizeof(ops));
265   op = ops;
266   op->op = GRPC_OP_SEND_INITIAL_METADATA;
267   op->data.send_initial_metadata.count = 0;
268   op->flags = 0;
269   op->reserved = nullptr;
270   op++;
271   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
272   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
273   op->data.recv_status_on_client.status = &status;
274   op->data.recv_status_on_client.status_details = &details;
275   op->flags = 0;
276   op->reserved = nullptr;
277   op++;
278   op->op = GRPC_OP_RECV_INITIAL_METADATA;
279   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
280   op->flags = 0;
281   op->reserved = nullptr;
282   op++;
283   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops),
284                                 grpc_core::CqVerifier::tag(1), nullptr);
285   GPR_ASSERT(GRPC_CALL_OK == error);
286   // Request a call on the server
287   error = grpc_server_request_call(server, &s, &call_details,
288                                    &request_metadata_recv, cq, cq,
289                                    grpc_core::CqVerifier::tag(101));
290   GPR_ASSERT(GRPC_CALL_OK == error);
291   cqv.Expect(grpc_core::CqVerifier::tag(101), true);
292   cqv.Verify();
293   // Since the server is configured to allow only a single ping strike, it would
294   // take 3 pings to trigger the GOAWAY frame with "too_many_pings" from the
295   // server. (The second ping from the client would be the first bad ping sent
296   // too quickly leading to a ping strike and the third ping would lead to the
297   // GOAWAY.) If the client settings match with the server's settings, there
298   // won't be a bad ping, and the call will end due to the deadline expiring
299   // instead.
300   cqv.Expect(grpc_core::CqVerifier::tag(1), true);
301   // The call will end after this
302   cqv.Verify(grpc_core::Duration::Seconds(60));
303   // cleanup
304   grpc_slice_unref(details);
305   grpc_metadata_array_destroy(&initial_metadata_recv);
306   grpc_metadata_array_destroy(&trailing_metadata_recv);
307   grpc_metadata_array_destroy(&request_metadata_recv);
308   grpc_call_details_destroy(&call_details);
309   grpc_call_unref(c);
310   grpc_call_unref(s);
311   return status;
312 }
313 
314 // Shuts down and destroys the server.
ServerShutdownAndDestroy(grpc_server * server,grpc_completion_queue * cq)315 void ServerShutdownAndDestroy(grpc_server* server, grpc_completion_queue* cq) {
316   // Shutdown and destroy server
317   grpc_server_shutdown_and_notify(server, cq, reinterpret_cast<void*>(1000));
318   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
319                                     nullptr)
320              .tag != reinterpret_cast<void*>(1000)) {
321   }
322   grpc_server_destroy(server);
323 }
324 
VerifyChannelReady(grpc_channel * channel,grpc_completion_queue * cq)325 void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) {
326   grpc_connectivity_state state =
327       grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
328   while (state != GRPC_CHANNEL_READY) {
329     grpc_channel_watch_connectivity_state(
330         channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr);
331     grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5),
332                                nullptr);
333     state = grpc_channel_check_connectivity_state(channel, 0);
334   }
335 }
336 
VerifyChannelDisconnected(grpc_channel * channel,grpc_completion_queue * cq)337 void VerifyChannelDisconnected(grpc_channel* channel,
338                                grpc_completion_queue* cq) {
339   // Verify channel gets disconnected. Use a ping to make sure that clients
340   // tries sending/receiving bytes if the channel is connected.
341   grpc_channel_ping(channel, cq, reinterpret_cast<void*>(2000), nullptr);
342   grpc_event ev = grpc_completion_queue_next(
343       cq, grpc_timeout_seconds_to_deadline(5), nullptr);
344   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
345   GPR_ASSERT(ev.tag == reinterpret_cast<void*>(2000));
346   GPR_ASSERT(ev.success == 0);
347   // We are intentionally not checking the connectivity state since it is
348   // propagated in an asynchronous manner which means that we might see an older
349   // state. We would eventually get the correct state, but since we have already
350   // verified that the ping has failed, checking the connectivity state is not
351   // necessary.
352 }
353 
354 class KeepaliveThrottlingTest : public ::testing::Test {
355  protected:
356   // Starts the server and makes sure that the channel is able to get connected.
ServerStart(const char * addr,grpc_completion_queue * cq)357   grpc_server* ServerStart(const char* addr, grpc_completion_queue* cq) {
358     // Set up server channel args to expect pings at an interval of 5 seconds
359     // and use a single ping strike
360     grpc_arg server_args[] = {
361         grpc_channel_arg_integer_create(
362             const_cast<char*>(
363                 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
364             5 * 1000),
365         grpc_channel_arg_integer_create(
366             const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
367     grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
368                                              server_args};
369     // Create server
370     grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
371     grpc_server_register_completion_queue(server, cq, nullptr);
372     grpc_server_credentials* server_creds =
373         grpc_insecure_server_credentials_create();
374     GPR_ASSERT(grpc_server_add_http2_port(server, addr, server_creds));
375     grpc_server_credentials_release(server_creds);
376     grpc_server_start(server);
377     return server;
378   }
379 };
380 
TEST_F(KeepaliveThrottlingTest,KeepaliveThrottlingMultipleChannels)381 TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleChannels) {
382   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
383   std::string server_address = grpc_core::JoinHostPort(
384       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
385   grpc_server* server = ServerStart(server_address.c_str(), cq);
386   // create two channel with a keepalive ping interval of 1 second.
387   grpc_arg client_args[] = {
388       grpc_channel_arg_integer_create(
389           const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
390       grpc_channel_arg_integer_create(
391           const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
392       grpc_channel_arg_integer_create(
393           const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0)};
394   grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
395                                            client_args};
396   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
397   grpc_channel* channel =
398       grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
399   grpc_channel* channel_dup =
400       grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
401   grpc_channel_credentials_release(creds);
402   int expected_keepalive_time_sec = 1;
403   // We need 3 GOAWAY frames to throttle the keepalive time from 1 second to 8
404   // seconds (> 5sec).
405   for (int i = 0; i < 3; i++) {
406     gpr_log(GPR_INFO, "Expected keepalive time : %d",
407             expected_keepalive_time_sec);
408     EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE);
409     expected_keepalive_time_sec *= 2;
410   }
411   gpr_log(
412       GPR_INFO,
413       "Client keepalive time %d should now be in sync with the server settings",
414       expected_keepalive_time_sec);
415   EXPECT_EQ(PerformWaitingCall(channel, server, cq),
416             GRPC_STATUS_DEADLINE_EXCEEDED);
417   // Since the subchannel is shared, the second channel should also have
418   // keepalive settings in sync with the server.
419   gpr_log(GPR_INFO, "Now testing second channel sharing the same subchannel");
420   EXPECT_EQ(PerformWaitingCall(channel_dup, server, cq),
421             GRPC_STATUS_DEADLINE_EXCEEDED);
422   // shutdown and destroy the client and server
423   grpc_channel_destroy(channel);
424   grpc_channel_destroy(channel_dup);
425   ServerShutdownAndDestroy(server, cq);
426   grpc_completion_queue_shutdown(cq);
427   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
428                                     nullptr)
429              .type != GRPC_QUEUE_SHUTDOWN) {
430   }
431   grpc_completion_queue_destroy(cq);
432 }
433 
BuildResolverResult(const std::vector<std::string> & addresses)434 grpc_core::Resolver::Result BuildResolverResult(
435     const std::vector<std::string>& addresses) {
436   grpc_core::Resolver::Result result;
437   result.addresses = grpc_core::EndpointAddressesList();
438   for (const auto& address_str : addresses) {
439     absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
440     if (!uri.ok()) {
441       gpr_log(GPR_ERROR, "Failed to parse uri. Error: %s",
442               uri.status().ToString().c_str());
443       GPR_ASSERT(uri.ok());
444     }
445     grpc_resolved_address address;
446     GPR_ASSERT(grpc_parse_uri(*uri, &address));
447     result.addresses->emplace_back(address, grpc_core::ChannelArgs());
448   }
449   return result;
450 }
451 
452 // Tests that when new subchannels are created due to a change in resolved
453 // addresses, the new subchannels use the updated keepalive time.
TEST_F(KeepaliveThrottlingTest,NewSubchannelsUseUpdatedKeepaliveTime)454 TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
455   grpc_core::ExecCtx exec_ctx;
456   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
457   std::string server_address1 = grpc_core::JoinHostPort(
458       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
459   std::string server_address2 = grpc_core::JoinHostPort(
460       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
461   grpc_server* server1 = ServerStart(server_address1.c_str(), cq);
462   grpc_server* server2 = ServerStart(server_address2.c_str(), cq);
463   // create a single channel with multiple subchannels with a keepalive ping
464   // interval of 1 second. To get finer control on subchannel connection times,
465   // we are using pick_first instead of round_robin and using the fake resolver
466   // response generator to switch between the two.
467   auto response_generator =
468       grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
469   auto client_channel_args =
470       grpc_core::ChannelArgs()
471           .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
472           .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
473           .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
474           .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
475           .SetObject(response_generator)
476           .ToC();
477   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
478   grpc_channel* channel =
479       grpc_channel_create("fake:///", creds, client_channel_args.get());
480   grpc_channel_credentials_release(creds);
481   // For a single subchannel 3 GOAWAYs would be sufficient to increase the
482   // keepalive time from 1 second to beyond 5 seconds. Even though we are
483   // alternating between two subchannels, 3 GOAWAYs should still be enough since
484   // the channel should start all new transports with the new keepalive value
485   // (even those from a different subchannel).
486   int expected_keepalive_time_sec = 1;
487   for (int i = 0; i < 3; i++) {
488     gpr_log(GPR_INFO, "Expected keepalive time : %d",
489             expected_keepalive_time_sec);
490     response_generator->SetResponseSynchronously(
491         BuildResolverResult({absl::StrCat(
492             "ipv4:", i % 2 == 0 ? server_address1 : server_address2)}));
493     // ExecCtx::Flush() might not be enough to make sure that the resolver
494     // result has been propagated, so sleep for a bit.
495     grpc_core::ExecCtx::Get()->Flush();
496     gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
497     EXPECT_EQ(PerformWaitingCall(channel, i % 2 == 0 ? server1 : server2, cq),
498               GRPC_STATUS_UNAVAILABLE);
499     expected_keepalive_time_sec *= 2;
500   }
501   gpr_log(
502       GPR_INFO,
503       "Client keepalive time %d should now be in sync with the server settings",
504       expected_keepalive_time_sec);
505   response_generator->SetResponseSynchronously(
506       BuildResolverResult({absl::StrCat("ipv4:", server_address2)}));
507   grpc_core::ExecCtx::Get()->Flush();
508   gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
509   EXPECT_EQ(PerformWaitingCall(channel, server2, cq),
510             GRPC_STATUS_DEADLINE_EXCEEDED);
511   // shutdown and destroy the client and server
512   grpc_channel_destroy(channel);
513   ServerShutdownAndDestroy(server1, cq);
514   ServerShutdownAndDestroy(server2, cq);
515   grpc_completion_queue_shutdown(cq);
516   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
517                                     nullptr)
518              .type != GRPC_QUEUE_SHUTDOWN) {
519   }
520   grpc_completion_queue_destroy(cq);
521 }
522 
523 // Tests that when a channel has multiple subchannels and receives a GOAWAY with
524 // "too_many_pings" on one of them, all subchannels start any new transports
525 // with an updated keepalive time.
TEST_F(KeepaliveThrottlingTest,ExistingSubchannelsUseNewKeepaliveTimeWhenReconnecting)526 TEST_F(KeepaliveThrottlingTest,
527        ExistingSubchannelsUseNewKeepaliveTimeWhenReconnecting) {
528   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
529   std::string server_address1 = grpc_core::JoinHostPort(
530       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
531   std::string server_address2 = grpc_core::JoinHostPort(
532       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
533   // create a single channel with round robin load balancing policy.
534   auto response_generator =
535       grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
536   auto client_channel_args =
537       grpc_core::ChannelArgs()
538           .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
539           .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
540           .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
541           .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
542           .SetObject(response_generator)
543           .ToC();
544   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
545   grpc_channel* channel =
546       grpc_channel_create("fake:///", creds, client_channel_args.get());
547   grpc_channel_credentials_release(creds);
548   response_generator->SetResponseSynchronously(
549       BuildResolverResult({absl::StrCat("ipv4:", server_address1),
550                            absl::StrCat("ipv4:", server_address2)}));
551   // For a single subchannel 3 GOAWAYs would be sufficient to increase the
552   // keepalive time from 1 second to beyond 5 seconds. Even though we are
553   // alternating between two subchannels, 3 GOAWAYs should still be enough since
554   // the channel should start all new transports with the new keepalive value
555   // (even those from a different subchannel).
556   int expected_keepalive_time_sec = 1;
557   for (int i = 0; i < 3; i++) {
558     gpr_log(GPR_ERROR, "Expected keepalive time : %d",
559             expected_keepalive_time_sec);
560     grpc_server* server = ServerStart(
561         i % 2 == 0 ? server_address1.c_str() : server_address2.c_str(), cq);
562     VerifyChannelReady(channel, cq);
563     EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE);
564     ServerShutdownAndDestroy(server, cq);
565     VerifyChannelDisconnected(channel, cq);
566     expected_keepalive_time_sec *= 2;
567   }
568   gpr_log(
569       GPR_INFO,
570       "Client keepalive time %d should now be in sync with the server settings",
571       expected_keepalive_time_sec);
572   grpc_server* server = ServerStart(server_address1.c_str(), cq);
573   VerifyChannelReady(channel, cq);
574   EXPECT_EQ(PerformWaitingCall(channel, server, cq),
575             GRPC_STATUS_DEADLINE_EXCEEDED);
576   ServerShutdownAndDestroy(server, cq);
577   // shutdown and destroy the client
578   grpc_channel_destroy(channel);
579   grpc_completion_queue_shutdown(cq);
580   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
581                                     nullptr)
582              .type != GRPC_QUEUE_SHUTDOWN) {
583   }
584   grpc_completion_queue_destroy(cq);
585 }
586 
587 // Perform a simple RPC where the client makes a request expecting a response
588 // with payload.
PerformCallWithResponsePayload(grpc_channel * channel,grpc_server * server,grpc_completion_queue * cq)589 void PerformCallWithResponsePayload(grpc_channel* channel, grpc_server* server,
590                                     grpc_completion_queue* cq) {
591   grpc_slice response_payload_slice = grpc_slice_from_static_string("hello");
592 
593   grpc_call* c;
594   grpc_call* s;
595   grpc_byte_buffer* response_payload =
596       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
597   grpc_core::CqVerifier cqv(cq);
598   grpc_op ops[6];
599   grpc_op* op;
600   grpc_metadata_array initial_metadata_recv;
601   grpc_metadata_array trailing_metadata_recv;
602   grpc_metadata_array request_metadata_recv;
603   grpc_byte_buffer* response_payload_recv = nullptr;
604   grpc_call_details call_details;
605   grpc_status_code status;
606   grpc_call_error error;
607   grpc_slice details;
608   int was_cancelled = 2;
609 
610   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(60);
611   c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
612                                grpc_slice_from_static_string("/foo"), nullptr,
613                                deadline, nullptr);
614   GPR_ASSERT(c);
615 
616   grpc_metadata_array_init(&initial_metadata_recv);
617   grpc_metadata_array_init(&trailing_metadata_recv);
618   grpc_metadata_array_init(&request_metadata_recv);
619   grpc_call_details_init(&call_details);
620 
621   memset(ops, 0, sizeof(ops));
622   op = ops;
623   op->op = GRPC_OP_SEND_INITIAL_METADATA;
624   op->data.send_initial_metadata.count = 0;
625   op->flags = 0;
626   op->reserved = nullptr;
627   op++;
628   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
629   op->flags = 0;
630   op->reserved = nullptr;
631   op++;
632   op->op = GRPC_OP_RECV_INITIAL_METADATA;
633   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
634   op->flags = 0;
635   op->reserved = nullptr;
636   op++;
637   op->op = GRPC_OP_RECV_MESSAGE;
638   op->data.recv_message.recv_message = &response_payload_recv;
639   op->flags = 0;
640   op->reserved = nullptr;
641   op++;
642   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
643   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
644   op->data.recv_status_on_client.status = &status;
645   op->data.recv_status_on_client.status_details = &details;
646   op->flags = 0;
647   op->reserved = nullptr;
648   op++;
649   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops),
650                                 grpc_core::CqVerifier::tag(1), nullptr);
651   GPR_ASSERT(GRPC_CALL_OK == error);
652 
653   error = grpc_server_request_call(server, &s, &call_details,
654                                    &request_metadata_recv, cq, cq,
655                                    grpc_core::CqVerifier::tag(101));
656   GPR_ASSERT(GRPC_CALL_OK == error);
657   cqv.Expect(grpc_core::CqVerifier::tag(101), true);
658   cqv.Verify();
659 
660   memset(ops, 0, sizeof(ops));
661   op = ops;
662   op->op = GRPC_OP_SEND_INITIAL_METADATA;
663   op->data.send_initial_metadata.count = 0;
664   op->flags = 0;
665   op->reserved = nullptr;
666   op++;
667   error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops),
668                                 grpc_core::CqVerifier::tag(102), nullptr);
669   GPR_ASSERT(GRPC_CALL_OK == error);
670 
671   cqv.Expect(grpc_core::CqVerifier::tag(102), true);
672   cqv.Verify();
673 
674   memset(ops, 0, sizeof(ops));
675   op = ops;
676   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
677   op->data.recv_close_on_server.cancelled = &was_cancelled;
678   op->flags = 0;
679   op->reserved = nullptr;
680   op++;
681   op->op = GRPC_OP_SEND_MESSAGE;
682   op->data.send_message.send_message = response_payload;
683   op->flags = 0;
684   op->reserved = nullptr;
685   op++;
686   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
687   op->data.send_status_from_server.trailing_metadata_count = 0;
688   op->data.send_status_from_server.status = GRPC_STATUS_OK;
689   grpc_slice status_details = grpc_slice_from_static_string("xyz");
690   op->data.send_status_from_server.status_details = &status_details;
691   op->flags = 0;
692   op->reserved = nullptr;
693   op++;
694   error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops),
695                                 grpc_core::CqVerifier::tag(103), nullptr);
696   GPR_ASSERT(GRPC_CALL_OK == error);
697 
698   cqv.Expect(grpc_core::CqVerifier::tag(103), true);
699   cqv.Expect(grpc_core::CqVerifier::tag(1), true);
700   cqv.Verify();
701 
702   GPR_ASSERT(status == GRPC_STATUS_OK);
703   GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
704   GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
705   GPR_ASSERT(was_cancelled == 0);
706   GPR_ASSERT(
707       byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
708 
709   grpc_slice_unref(details);
710   grpc_metadata_array_destroy(&initial_metadata_recv);
711   grpc_metadata_array_destroy(&trailing_metadata_recv);
712   grpc_metadata_array_destroy(&request_metadata_recv);
713   grpc_call_details_destroy(&call_details);
714 
715   grpc_call_unref(c);
716   grpc_call_unref(s);
717 
718   grpc_byte_buffer_destroy(response_payload);
719   grpc_byte_buffer_destroy(response_payload_recv);
720 }
721 
TEST(TooManyPings,BdpPingNotSentWithoutReceiveSideActivity)722 TEST(TooManyPings, BdpPingNotSentWithoutReceiveSideActivity) {
723   TransportCounter::WaitForTransportsToBeDestroyed();
724   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
725   // create the server
726   std::string server_address = grpc_core::JoinHostPort(
727       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
728   grpc_arg server_args[] = {
729       grpc_channel_arg_integer_create(
730           const_cast<char*>(
731               GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
732           60 * 1000),
733       grpc_channel_arg_integer_create(
734           const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
735   grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
736                                            server_args};
737   grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
738   grpc_server_register_completion_queue(server, cq, nullptr);
739   grpc_server_credentials* server_creds =
740       grpc_insecure_server_credentials_create();
741   GPR_ASSERT(
742       grpc_server_add_http2_port(server, server_address.c_str(), server_creds));
743   grpc_server_credentials_release(server_creds);
744   grpc_server_start(server);
745   // create the channel (bdp pings are enabled by default)
746   grpc_arg client_args[] = {
747       grpc_channel_arg_integer_create(
748           const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
749       grpc_channel_arg_integer_create(
750           const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1)};
751   grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
752                                            client_args};
753   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
754   grpc_channel* channel =
755       grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
756   grpc_channel_credentials_release(creds);
757   VerifyChannelReady(channel, cq);
758   EXPECT_EQ(TransportCounter::count(), 2 /* one each for server and client */);
759   grpc_core::CqVerifier cqv(cq);
760   // Channel should be able to send two pings without disconnect if there was no
761   // BDP sent.
762   grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(1), nullptr);
763   cqv.Expect(grpc_core::CqVerifier::tag(1), true);
764   cqv.Verify(grpc_core::Duration::Seconds(5));
765   // Second ping
766   grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(2), nullptr);
767   cqv.Expect(grpc_core::CqVerifier::tag(2), true);
768   cqv.Verify(grpc_core::Duration::Seconds(5));
769   ASSERT_EQ(grpc_channel_check_connectivity_state(channel, 0),
770             GRPC_CHANNEL_READY);
771   PerformCallWithResponsePayload(channel, server, cq);
772   // Wait a bit to make sure that the BDP ping goes out.
773   cqv.VerifyEmpty(grpc_core::Duration::Seconds(1));
774   // The call with a response payload should have triggered a BDP ping.
775   // Send two more pings to verify. The second ping should cause a disconnect.
776   // If BDP was not sent, the second ping would not cause a disconnect.
777   grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(3), nullptr);
778   cqv.Expect(grpc_core::CqVerifier::tag(3), true);
779   cqv.Verify(grpc_core::Duration::Seconds(5));
780   // Second ping
781   grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(4), nullptr);
782   cqv.Expect(grpc_core::CqVerifier::tag(4), true);
783   cqv.Verify(grpc_core::Duration::Seconds(5));
784   // Make sure that the transports have been destroyed
785   VerifyChannelDisconnected(channel, cq);
786   TransportCounter::WaitForTransportsToBeDestroyed();
787   // shutdown and destroy the client and server
788   ServerShutdownAndDestroy(server, cq);
789   grpc_channel_destroy(channel);
790   grpc_completion_queue_shutdown(cq);
791   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
792                                     nullptr)
793              .type != GRPC_QUEUE_SHUTDOWN) {
794   }
795   grpc_completion_queue_destroy(cq);
796 }
797 
TEST(TooManyPings,TransportsGetCleanedUpOnDisconnect)798 TEST(TooManyPings, TransportsGetCleanedUpOnDisconnect) {
799   TransportCounter::WaitForTransportsToBeDestroyed();
800   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
801   // create the client and server
802   std::string server_address = grpc_core::JoinHostPort(
803       grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
804   grpc_arg server_args[] = {
805       grpc_channel_arg_integer_create(
806           const_cast<char*>(
807               GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
808           60 * 1000),
809       grpc_channel_arg_integer_create(
810           const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
811   grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
812                                            server_args};
813   grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
814   grpc_server_register_completion_queue(server, cq, nullptr);
815   grpc_server_credentials* server_creds =
816       grpc_insecure_server_credentials_create();
817   GPR_ASSERT(
818       grpc_server_add_http2_port(server, server_address.c_str(), server_creds));
819   grpc_server_credentials_release(server_creds);
820   grpc_server_start(server);
821   grpc_arg client_args[] = {
822       grpc_channel_arg_integer_create(
823           const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
824       grpc_channel_arg_integer_create(
825           const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1)};
826   grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
827                                            client_args};
828   grpc_channel_credentials* creds = grpc_insecure_credentials_create();
829   grpc_channel* channel =
830       grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
831   grpc_channel_credentials_release(creds);
832   VerifyChannelReady(channel, cq);
833   EXPECT_EQ(TransportCounter::count(), 2 /* one each for server and client */);
834   grpc_core::CqVerifier cqv(cq);
835   // First ping
836   grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(1), nullptr);
837   cqv.Expect(grpc_core::CqVerifier::tag(1), true);
838   cqv.Verify(grpc_core::Duration::Seconds(5));
839   // Second ping
840   grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(2), nullptr);
841   cqv.Expect(grpc_core::CqVerifier::tag(2), true);
842   cqv.Verify(grpc_core::Duration::Seconds(5));
843   // Third ping caused disconnect
844   grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(2), nullptr);
845   cqv.Expect(grpc_core::CqVerifier::tag(2), true);
846   cqv.Verify(grpc_core::Duration::Seconds(5));
847   // Make sure that the transports have been destroyed
848   VerifyChannelDisconnected(channel, cq);
849   TransportCounter::WaitForTransportsToBeDestroyed();
850   // shutdown and destroy the client and server
851   ServerShutdownAndDestroy(server, cq);
852   grpc_channel_destroy(channel);
853   grpc_completion_queue_shutdown(cq);
854   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
855                                     nullptr)
856              .type != GRPC_QUEUE_SHUTDOWN) {
857   }
858   grpc_completion_queue_destroy(cq);
859 }
860 
861 }  // namespace
862 
main(int argc,char ** argv)863 int main(int argc, char** argv) {
864   ::testing::InitGoogleTest(&argc, argv);
865   grpc::testing::TestEnvironment env(&argc, argv);
866   grpc_core::TestOnlySetGlobalHttp2TransportInitCallback(
867       TransportCounter::CounterInitCallback);
868   grpc_core::TestOnlySetGlobalHttp2TransportDestructCallback(
869       TransportCounter::CounterDestructCallback);
870   grpc_init();
871   auto result = RUN_ALL_TESTS();
872   grpc_shutdown();
873   return result;
874 }
875