1 //
2 //
3 // Copyright 2020 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <string.h>
20
21 #include <algorithm>
22 #include <map>
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/str_cat.h"
30 #include "absl/time/time.h"
31 #include "gtest/gtest.h"
32
33 #include <grpc/byte_buffer.h>
34 #include <grpc/grpc.h>
35 #include <grpc/grpc_security.h>
36 #include <grpc/impl/channel_arg_names.h>
37 #include <grpc/impl/propagation_bits.h>
38 #include <grpc/slice.h>
39 #include <grpc/status.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/port_platform.h>
42 #include <grpc/support/time.h>
43
44 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
45 #include "src/core/lib/address_utils/parse_address.h"
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/gpr/useful.h"
48 #include "src/core/lib/gprpp/host_port.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/sync.h"
51 #include "src/core/lib/gprpp/time.h"
52 #include "src/core/lib/iomgr/exec_ctx.h"
53 #include "src/core/lib/iomgr/resolved_address.h"
54 #include "src/core/lib/surface/channel.h"
55 #include "src/core/lib/uri/uri_parser.h"
56 #include "src/core/resolver/endpoint_addresses.h"
57 #include "src/core/resolver/fake/fake_resolver.h"
58 #include "src/core/resolver/resolver.h"
59 #include "test/core/end2end/cq_verifier.h"
60 #include "test/core/util/port.h"
61 #include "test/core/util/resolve_localhost_ip46.h"
62 #include "test/core/util/test_config.h"
63
64 namespace {
65
66 class TransportCounter {
67 public:
CounterInitCallback()68 static void CounterInitCallback() {
69 grpc_core::MutexLock lock(&mu());
70 ++count_;
71 }
72
CounterDestructCallback()73 static void CounterDestructCallback() {
74 grpc_core::MutexLock lock(&mu());
75 if (--count_ == 0) {
76 cv().SignalAll();
77 }
78 }
79
WaitForTransportsToBeDestroyed()80 static void WaitForTransportsToBeDestroyed() {
81 grpc_core::MutexLock lock(&mu());
82 while (count_ != 0) {
83 ASSERT_FALSE(cv().WaitWithTimeout(&mu(), absl::Seconds(10)));
84 }
85 }
86
count()87 static int count() {
88 grpc_core::MutexLock lock(&mu());
89 return count_;
90 }
91
mu()92 static grpc_core::Mutex& mu() {
93 static grpc_core::Mutex* mu = new grpc_core::Mutex();
94 return *mu;
95 }
96
cv()97 static grpc_core::CondVar& cv() {
98 static grpc_core::CondVar* cv = new grpc_core::CondVar();
99 return *cv;
100 }
101
102 private:
103 static int count_;
104 };
105
106 int TransportCounter::count_ = 0;
107
108 // Perform a simple RPC where the server cancels the request with
109 // grpc_call_cancel_with_status
PerformCall(grpc_channel * channel,grpc_server * server,grpc_completion_queue * cq)110 grpc_status_code PerformCall(grpc_channel* channel, grpc_server* server,
111 grpc_completion_queue* cq) {
112 grpc_call* c;
113 grpc_call* s;
114 grpc_core::CqVerifier cqv(cq);
115 grpc_op ops[6];
116 grpc_op* op;
117 grpc_metadata_array initial_metadata_recv;
118 grpc_metadata_array trailing_metadata_recv;
119 grpc_metadata_array request_metadata_recv;
120 grpc_call_details call_details;
121 grpc_status_code status;
122 grpc_call_error error;
123 grpc_slice details;
124 gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5);
125 // Start a call
126 c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
127 grpc_slice_from_static_string("/foo"), nullptr,
128 deadline, nullptr);
129 GPR_ASSERT(c);
130 grpc_metadata_array_init(&initial_metadata_recv);
131 grpc_metadata_array_init(&trailing_metadata_recv);
132 grpc_metadata_array_init(&request_metadata_recv);
133 grpc_call_details_init(&call_details);
134 memset(ops, 0, sizeof(ops));
135 op = ops;
136 op->op = GRPC_OP_SEND_INITIAL_METADATA;
137 op->data.send_initial_metadata.count = 0;
138 op->flags = 0;
139 op->reserved = nullptr;
140 op++;
141 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
142 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
143 op->data.recv_status_on_client.status = &status;
144 op->data.recv_status_on_client.status_details = &details;
145 op->flags = 0;
146 op->reserved = nullptr;
147 op++;
148 op->op = GRPC_OP_RECV_INITIAL_METADATA;
149 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
150 op->flags = 0;
151 op->reserved = nullptr;
152 op++;
153 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops),
154 grpc_core::CqVerifier::tag(1), nullptr);
155 GPR_ASSERT(GRPC_CALL_OK == error);
156 // Request a call on the server
157 error = grpc_server_request_call(server, &s, &call_details,
158 &request_metadata_recv, cq, cq,
159 grpc_core::CqVerifier::tag(101));
160 GPR_ASSERT(GRPC_CALL_OK == error);
161 cqv.Expect(grpc_core::CqVerifier::tag(101), true);
162 cqv.Verify();
163 grpc_call_cancel_with_status(s, GRPC_STATUS_PERMISSION_DENIED, "test status",
164 nullptr);
165 cqv.Expect(grpc_core::CqVerifier::tag(1), true);
166 cqv.Verify();
167 // cleanup
168 grpc_slice_unref(details);
169 grpc_metadata_array_destroy(&initial_metadata_recv);
170 grpc_metadata_array_destroy(&trailing_metadata_recv);
171 grpc_metadata_array_destroy(&request_metadata_recv);
172 grpc_call_details_destroy(&call_details);
173 grpc_call_unref(c);
174 grpc_call_unref(s);
175 return status;
176 }
177
178 // Test that sending a lot of RPCs that are cancelled by the server doesn't
179 // result in too many pings due to the pings sent by BDP.
TEST(TooManyPings,TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings)180 TEST(TooManyPings, TestLotsOfServerCancelledRpcsDoesntGiveTooManyPings) {
181 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
182 // create the server
183 grpc_server* server = grpc_server_create(nullptr, nullptr);
184 std::string server_address = grpc_core::JoinHostPort(
185 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
186 grpc_server_register_completion_queue(server, cq, nullptr);
187 grpc_server_credentials* server_creds =
188 grpc_insecure_server_credentials_create();
189 GPR_ASSERT(
190 grpc_server_add_http2_port(server, server_address.c_str(), server_creds));
191 grpc_server_credentials_release(server_creds);
192 grpc_server_start(server);
193 // create the channel (bdp pings are enabled by default)
194 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
195 grpc_channel* channel = grpc_channel_create(server_address.c_str(), creds,
196 nullptr /* channel args */);
197 grpc_channel_credentials_release(creds);
198 std::map<grpc_status_code, int> statuses_and_counts;
199 const int kNumTotalRpcs = 100;
200 // perform an RPC
201 gpr_log(GPR_INFO,
202 "Performing %d total RPCs and expecting them all to receive status "
203 "PERMISSION_DENIED (%d)",
204 kNumTotalRpcs, GRPC_STATUS_PERMISSION_DENIED);
205 for (int i = 0; i < kNumTotalRpcs; i++) {
206 grpc_status_code status = PerformCall(channel, server, cq);
207 statuses_and_counts[status] += 1;
208 }
209 int num_not_cancelled = 0;
210 for (auto itr = statuses_and_counts.begin(); itr != statuses_and_counts.end();
211 itr++) {
212 if (itr->first != GRPC_STATUS_PERMISSION_DENIED) {
213 num_not_cancelled += itr->second;
214 }
215 gpr_log(GPR_INFO, "%d / %d RPCs received status code: %d", itr->second,
216 kNumTotalRpcs, itr->first);
217 }
218 if (num_not_cancelled > 0) {
219 gpr_log(GPR_ERROR,
220 "Expected all RPCs to receive status PERMISSION_DENIED (%d) but %d "
221 "received other status codes",
222 GRPC_STATUS_PERMISSION_DENIED, num_not_cancelled);
223 FAIL();
224 }
225 // shutdown and destroy the client and server
226 grpc_channel_destroy(channel);
227 grpc_server_shutdown_and_notify(server, cq, nullptr);
228 grpc_completion_queue_shutdown(cq);
229 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
230 nullptr)
231 .type != GRPC_QUEUE_SHUTDOWN) {
232 }
233 grpc_server_destroy(server);
234 grpc_completion_queue_destroy(cq);
235 }
236
237 // Perform a simple RPC where the client makes a request, and both the client
238 // and server continue reading so that gRPC can send and receive keepalive
239 // pings.
PerformWaitingCall(grpc_channel * channel,grpc_server * server,grpc_completion_queue * cq)240 grpc_status_code PerformWaitingCall(grpc_channel* channel, grpc_server* server,
241 grpc_completion_queue* cq) {
242 grpc_call* c;
243 grpc_call* s;
244 grpc_core::CqVerifier cqv(cq);
245 grpc_op ops[6];
246 grpc_op* op;
247 grpc_metadata_array initial_metadata_recv;
248 grpc_metadata_array trailing_metadata_recv;
249 grpc_metadata_array request_metadata_recv;
250 grpc_call_details call_details;
251 grpc_status_code status;
252 grpc_call_error error;
253 grpc_slice details;
254 gpr_timespec deadline = grpc_timeout_seconds_to_deadline(30);
255 // Start a call
256 c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
257 grpc_slice_from_static_string("/foo"), nullptr,
258 deadline, nullptr);
259 GPR_ASSERT(c);
260 grpc_metadata_array_init(&initial_metadata_recv);
261 grpc_metadata_array_init(&trailing_metadata_recv);
262 grpc_metadata_array_init(&request_metadata_recv);
263 grpc_call_details_init(&call_details);
264 memset(ops, 0, sizeof(ops));
265 op = ops;
266 op->op = GRPC_OP_SEND_INITIAL_METADATA;
267 op->data.send_initial_metadata.count = 0;
268 op->flags = 0;
269 op->reserved = nullptr;
270 op++;
271 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
272 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
273 op->data.recv_status_on_client.status = &status;
274 op->data.recv_status_on_client.status_details = &details;
275 op->flags = 0;
276 op->reserved = nullptr;
277 op++;
278 op->op = GRPC_OP_RECV_INITIAL_METADATA;
279 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
280 op->flags = 0;
281 op->reserved = nullptr;
282 op++;
283 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops),
284 grpc_core::CqVerifier::tag(1), nullptr);
285 GPR_ASSERT(GRPC_CALL_OK == error);
286 // Request a call on the server
287 error = grpc_server_request_call(server, &s, &call_details,
288 &request_metadata_recv, cq, cq,
289 grpc_core::CqVerifier::tag(101));
290 GPR_ASSERT(GRPC_CALL_OK == error);
291 cqv.Expect(grpc_core::CqVerifier::tag(101), true);
292 cqv.Verify();
293 // Since the server is configured to allow only a single ping strike, it would
294 // take 3 pings to trigger the GOAWAY frame with "too_many_pings" from the
295 // server. (The second ping from the client would be the first bad ping sent
296 // too quickly leading to a ping strike and the third ping would lead to the
297 // GOAWAY.) If the client settings match with the server's settings, there
298 // won't be a bad ping, and the call will end due to the deadline expiring
299 // instead.
300 cqv.Expect(grpc_core::CqVerifier::tag(1), true);
301 // The call will end after this
302 cqv.Verify(grpc_core::Duration::Seconds(60));
303 // cleanup
304 grpc_slice_unref(details);
305 grpc_metadata_array_destroy(&initial_metadata_recv);
306 grpc_metadata_array_destroy(&trailing_metadata_recv);
307 grpc_metadata_array_destroy(&request_metadata_recv);
308 grpc_call_details_destroy(&call_details);
309 grpc_call_unref(c);
310 grpc_call_unref(s);
311 return status;
312 }
313
314 // Shuts down and destroys the server.
ServerShutdownAndDestroy(grpc_server * server,grpc_completion_queue * cq)315 void ServerShutdownAndDestroy(grpc_server* server, grpc_completion_queue* cq) {
316 // Shutdown and destroy server
317 grpc_server_shutdown_and_notify(server, cq, reinterpret_cast<void*>(1000));
318 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
319 nullptr)
320 .tag != reinterpret_cast<void*>(1000)) {
321 }
322 grpc_server_destroy(server);
323 }
324
VerifyChannelReady(grpc_channel * channel,grpc_completion_queue * cq)325 void VerifyChannelReady(grpc_channel* channel, grpc_completion_queue* cq) {
326 grpc_connectivity_state state =
327 grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
328 while (state != GRPC_CHANNEL_READY) {
329 grpc_channel_watch_connectivity_state(
330 channel, state, grpc_timeout_seconds_to_deadline(5), cq, nullptr);
331 grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5),
332 nullptr);
333 state = grpc_channel_check_connectivity_state(channel, 0);
334 }
335 }
336
VerifyChannelDisconnected(grpc_channel * channel,grpc_completion_queue * cq)337 void VerifyChannelDisconnected(grpc_channel* channel,
338 grpc_completion_queue* cq) {
339 // Verify channel gets disconnected. Use a ping to make sure that clients
340 // tries sending/receiving bytes if the channel is connected.
341 grpc_channel_ping(channel, cq, reinterpret_cast<void*>(2000), nullptr);
342 grpc_event ev = grpc_completion_queue_next(
343 cq, grpc_timeout_seconds_to_deadline(5), nullptr);
344 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
345 GPR_ASSERT(ev.tag == reinterpret_cast<void*>(2000));
346 GPR_ASSERT(ev.success == 0);
347 // We are intentionally not checking the connectivity state since it is
348 // propagated in an asynchronous manner which means that we might see an older
349 // state. We would eventually get the correct state, but since we have already
350 // verified that the ping has failed, checking the connectivity state is not
351 // necessary.
352 }
353
354 class KeepaliveThrottlingTest : public ::testing::Test {
355 protected:
356 // Starts the server and makes sure that the channel is able to get connected.
ServerStart(const char * addr,grpc_completion_queue * cq)357 grpc_server* ServerStart(const char* addr, grpc_completion_queue* cq) {
358 // Set up server channel args to expect pings at an interval of 5 seconds
359 // and use a single ping strike
360 grpc_arg server_args[] = {
361 grpc_channel_arg_integer_create(
362 const_cast<char*>(
363 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
364 5 * 1000),
365 grpc_channel_arg_integer_create(
366 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
367 grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
368 server_args};
369 // Create server
370 grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
371 grpc_server_register_completion_queue(server, cq, nullptr);
372 grpc_server_credentials* server_creds =
373 grpc_insecure_server_credentials_create();
374 GPR_ASSERT(grpc_server_add_http2_port(server, addr, server_creds));
375 grpc_server_credentials_release(server_creds);
376 grpc_server_start(server);
377 return server;
378 }
379 };
380
TEST_F(KeepaliveThrottlingTest,KeepaliveThrottlingMultipleChannels)381 TEST_F(KeepaliveThrottlingTest, KeepaliveThrottlingMultipleChannels) {
382 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
383 std::string server_address = grpc_core::JoinHostPort(
384 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
385 grpc_server* server = ServerStart(server_address.c_str(), cq);
386 // create two channel with a keepalive ping interval of 1 second.
387 grpc_arg client_args[] = {
388 grpc_channel_arg_integer_create(
389 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
390 grpc_channel_arg_integer_create(
391 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 1 * 1000),
392 grpc_channel_arg_integer_create(
393 const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0)};
394 grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
395 client_args};
396 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
397 grpc_channel* channel =
398 grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
399 grpc_channel* channel_dup =
400 grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
401 grpc_channel_credentials_release(creds);
402 int expected_keepalive_time_sec = 1;
403 // We need 3 GOAWAY frames to throttle the keepalive time from 1 second to 8
404 // seconds (> 5sec).
405 for (int i = 0; i < 3; i++) {
406 gpr_log(GPR_INFO, "Expected keepalive time : %d",
407 expected_keepalive_time_sec);
408 EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE);
409 expected_keepalive_time_sec *= 2;
410 }
411 gpr_log(
412 GPR_INFO,
413 "Client keepalive time %d should now be in sync with the server settings",
414 expected_keepalive_time_sec);
415 EXPECT_EQ(PerformWaitingCall(channel, server, cq),
416 GRPC_STATUS_DEADLINE_EXCEEDED);
417 // Since the subchannel is shared, the second channel should also have
418 // keepalive settings in sync with the server.
419 gpr_log(GPR_INFO, "Now testing second channel sharing the same subchannel");
420 EXPECT_EQ(PerformWaitingCall(channel_dup, server, cq),
421 GRPC_STATUS_DEADLINE_EXCEEDED);
422 // shutdown and destroy the client and server
423 grpc_channel_destroy(channel);
424 grpc_channel_destroy(channel_dup);
425 ServerShutdownAndDestroy(server, cq);
426 grpc_completion_queue_shutdown(cq);
427 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
428 nullptr)
429 .type != GRPC_QUEUE_SHUTDOWN) {
430 }
431 grpc_completion_queue_destroy(cq);
432 }
433
BuildResolverResult(const std::vector<std::string> & addresses)434 grpc_core::Resolver::Result BuildResolverResult(
435 const std::vector<std::string>& addresses) {
436 grpc_core::Resolver::Result result;
437 result.addresses = grpc_core::EndpointAddressesList();
438 for (const auto& address_str : addresses) {
439 absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
440 if (!uri.ok()) {
441 gpr_log(GPR_ERROR, "Failed to parse uri. Error: %s",
442 uri.status().ToString().c_str());
443 GPR_ASSERT(uri.ok());
444 }
445 grpc_resolved_address address;
446 GPR_ASSERT(grpc_parse_uri(*uri, &address));
447 result.addresses->emplace_back(address, grpc_core::ChannelArgs());
448 }
449 return result;
450 }
451
452 // Tests that when new subchannels are created due to a change in resolved
453 // addresses, the new subchannels use the updated keepalive time.
TEST_F(KeepaliveThrottlingTest,NewSubchannelsUseUpdatedKeepaliveTime)454 TEST_F(KeepaliveThrottlingTest, NewSubchannelsUseUpdatedKeepaliveTime) {
455 grpc_core::ExecCtx exec_ctx;
456 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
457 std::string server_address1 = grpc_core::JoinHostPort(
458 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
459 std::string server_address2 = grpc_core::JoinHostPort(
460 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
461 grpc_server* server1 = ServerStart(server_address1.c_str(), cq);
462 grpc_server* server2 = ServerStart(server_address2.c_str(), cq);
463 // create a single channel with multiple subchannels with a keepalive ping
464 // interval of 1 second. To get finer control on subchannel connection times,
465 // we are using pick_first instead of round_robin and using the fake resolver
466 // response generator to switch between the two.
467 auto response_generator =
468 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
469 auto client_channel_args =
470 grpc_core::ChannelArgs()
471 .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
472 .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
473 .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
474 .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
475 .SetObject(response_generator)
476 .ToC();
477 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
478 grpc_channel* channel =
479 grpc_channel_create("fake:///", creds, client_channel_args.get());
480 grpc_channel_credentials_release(creds);
481 // For a single subchannel 3 GOAWAYs would be sufficient to increase the
482 // keepalive time from 1 second to beyond 5 seconds. Even though we are
483 // alternating between two subchannels, 3 GOAWAYs should still be enough since
484 // the channel should start all new transports with the new keepalive value
485 // (even those from a different subchannel).
486 int expected_keepalive_time_sec = 1;
487 for (int i = 0; i < 3; i++) {
488 gpr_log(GPR_INFO, "Expected keepalive time : %d",
489 expected_keepalive_time_sec);
490 response_generator->SetResponseSynchronously(
491 BuildResolverResult({absl::StrCat(
492 "ipv4:", i % 2 == 0 ? server_address1 : server_address2)}));
493 // ExecCtx::Flush() might not be enough to make sure that the resolver
494 // result has been propagated, so sleep for a bit.
495 grpc_core::ExecCtx::Get()->Flush();
496 gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
497 EXPECT_EQ(PerformWaitingCall(channel, i % 2 == 0 ? server1 : server2, cq),
498 GRPC_STATUS_UNAVAILABLE);
499 expected_keepalive_time_sec *= 2;
500 }
501 gpr_log(
502 GPR_INFO,
503 "Client keepalive time %d should now be in sync with the server settings",
504 expected_keepalive_time_sec);
505 response_generator->SetResponseSynchronously(
506 BuildResolverResult({absl::StrCat("ipv4:", server_address2)}));
507 grpc_core::ExecCtx::Get()->Flush();
508 gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
509 EXPECT_EQ(PerformWaitingCall(channel, server2, cq),
510 GRPC_STATUS_DEADLINE_EXCEEDED);
511 // shutdown and destroy the client and server
512 grpc_channel_destroy(channel);
513 ServerShutdownAndDestroy(server1, cq);
514 ServerShutdownAndDestroy(server2, cq);
515 grpc_completion_queue_shutdown(cq);
516 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
517 nullptr)
518 .type != GRPC_QUEUE_SHUTDOWN) {
519 }
520 grpc_completion_queue_destroy(cq);
521 }
522
523 // Tests that when a channel has multiple subchannels and receives a GOAWAY with
524 // "too_many_pings" on one of them, all subchannels start any new transports
525 // with an updated keepalive time.
TEST_F(KeepaliveThrottlingTest,ExistingSubchannelsUseNewKeepaliveTimeWhenReconnecting)526 TEST_F(KeepaliveThrottlingTest,
527 ExistingSubchannelsUseNewKeepaliveTimeWhenReconnecting) {
528 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
529 std::string server_address1 = grpc_core::JoinHostPort(
530 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
531 std::string server_address2 = grpc_core::JoinHostPort(
532 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
533 // create a single channel with round robin load balancing policy.
534 auto response_generator =
535 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
536 auto client_channel_args =
537 grpc_core::ChannelArgs()
538 .Set(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0)
539 .Set(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0)
540 .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 1 * 1000)
541 .Set(GRPC_ARG_HTTP2_BDP_PROBE, 0)
542 .SetObject(response_generator)
543 .ToC();
544 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
545 grpc_channel* channel =
546 grpc_channel_create("fake:///", creds, client_channel_args.get());
547 grpc_channel_credentials_release(creds);
548 response_generator->SetResponseSynchronously(
549 BuildResolverResult({absl::StrCat("ipv4:", server_address1),
550 absl::StrCat("ipv4:", server_address2)}));
551 // For a single subchannel 3 GOAWAYs would be sufficient to increase the
552 // keepalive time from 1 second to beyond 5 seconds. Even though we are
553 // alternating between two subchannels, 3 GOAWAYs should still be enough since
554 // the channel should start all new transports with the new keepalive value
555 // (even those from a different subchannel).
556 int expected_keepalive_time_sec = 1;
557 for (int i = 0; i < 3; i++) {
558 gpr_log(GPR_ERROR, "Expected keepalive time : %d",
559 expected_keepalive_time_sec);
560 grpc_server* server = ServerStart(
561 i % 2 == 0 ? server_address1.c_str() : server_address2.c_str(), cq);
562 VerifyChannelReady(channel, cq);
563 EXPECT_EQ(PerformWaitingCall(channel, server, cq), GRPC_STATUS_UNAVAILABLE);
564 ServerShutdownAndDestroy(server, cq);
565 VerifyChannelDisconnected(channel, cq);
566 expected_keepalive_time_sec *= 2;
567 }
568 gpr_log(
569 GPR_INFO,
570 "Client keepalive time %d should now be in sync with the server settings",
571 expected_keepalive_time_sec);
572 grpc_server* server = ServerStart(server_address1.c_str(), cq);
573 VerifyChannelReady(channel, cq);
574 EXPECT_EQ(PerformWaitingCall(channel, server, cq),
575 GRPC_STATUS_DEADLINE_EXCEEDED);
576 ServerShutdownAndDestroy(server, cq);
577 // shutdown and destroy the client
578 grpc_channel_destroy(channel);
579 grpc_completion_queue_shutdown(cq);
580 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
581 nullptr)
582 .type != GRPC_QUEUE_SHUTDOWN) {
583 }
584 grpc_completion_queue_destroy(cq);
585 }
586
587 // Perform a simple RPC where the client makes a request expecting a response
588 // with payload.
PerformCallWithResponsePayload(grpc_channel * channel,grpc_server * server,grpc_completion_queue * cq)589 void PerformCallWithResponsePayload(grpc_channel* channel, grpc_server* server,
590 grpc_completion_queue* cq) {
591 grpc_slice response_payload_slice = grpc_slice_from_static_string("hello");
592
593 grpc_call* c;
594 grpc_call* s;
595 grpc_byte_buffer* response_payload =
596 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
597 grpc_core::CqVerifier cqv(cq);
598 grpc_op ops[6];
599 grpc_op* op;
600 grpc_metadata_array initial_metadata_recv;
601 grpc_metadata_array trailing_metadata_recv;
602 grpc_metadata_array request_metadata_recv;
603 grpc_byte_buffer* response_payload_recv = nullptr;
604 grpc_call_details call_details;
605 grpc_status_code status;
606 grpc_call_error error;
607 grpc_slice details;
608 int was_cancelled = 2;
609
610 gpr_timespec deadline = grpc_timeout_seconds_to_deadline(60);
611 c = grpc_channel_create_call(channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
612 grpc_slice_from_static_string("/foo"), nullptr,
613 deadline, nullptr);
614 GPR_ASSERT(c);
615
616 grpc_metadata_array_init(&initial_metadata_recv);
617 grpc_metadata_array_init(&trailing_metadata_recv);
618 grpc_metadata_array_init(&request_metadata_recv);
619 grpc_call_details_init(&call_details);
620
621 memset(ops, 0, sizeof(ops));
622 op = ops;
623 op->op = GRPC_OP_SEND_INITIAL_METADATA;
624 op->data.send_initial_metadata.count = 0;
625 op->flags = 0;
626 op->reserved = nullptr;
627 op++;
628 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
629 op->flags = 0;
630 op->reserved = nullptr;
631 op++;
632 op->op = GRPC_OP_RECV_INITIAL_METADATA;
633 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
634 op->flags = 0;
635 op->reserved = nullptr;
636 op++;
637 op->op = GRPC_OP_RECV_MESSAGE;
638 op->data.recv_message.recv_message = &response_payload_recv;
639 op->flags = 0;
640 op->reserved = nullptr;
641 op++;
642 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
643 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
644 op->data.recv_status_on_client.status = &status;
645 op->data.recv_status_on_client.status_details = &details;
646 op->flags = 0;
647 op->reserved = nullptr;
648 op++;
649 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops),
650 grpc_core::CqVerifier::tag(1), nullptr);
651 GPR_ASSERT(GRPC_CALL_OK == error);
652
653 error = grpc_server_request_call(server, &s, &call_details,
654 &request_metadata_recv, cq, cq,
655 grpc_core::CqVerifier::tag(101));
656 GPR_ASSERT(GRPC_CALL_OK == error);
657 cqv.Expect(grpc_core::CqVerifier::tag(101), true);
658 cqv.Verify();
659
660 memset(ops, 0, sizeof(ops));
661 op = ops;
662 op->op = GRPC_OP_SEND_INITIAL_METADATA;
663 op->data.send_initial_metadata.count = 0;
664 op->flags = 0;
665 op->reserved = nullptr;
666 op++;
667 error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops),
668 grpc_core::CqVerifier::tag(102), nullptr);
669 GPR_ASSERT(GRPC_CALL_OK == error);
670
671 cqv.Expect(grpc_core::CqVerifier::tag(102), true);
672 cqv.Verify();
673
674 memset(ops, 0, sizeof(ops));
675 op = ops;
676 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
677 op->data.recv_close_on_server.cancelled = &was_cancelled;
678 op->flags = 0;
679 op->reserved = nullptr;
680 op++;
681 op->op = GRPC_OP_SEND_MESSAGE;
682 op->data.send_message.send_message = response_payload;
683 op->flags = 0;
684 op->reserved = nullptr;
685 op++;
686 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
687 op->data.send_status_from_server.trailing_metadata_count = 0;
688 op->data.send_status_from_server.status = GRPC_STATUS_OK;
689 grpc_slice status_details = grpc_slice_from_static_string("xyz");
690 op->data.send_status_from_server.status_details = &status_details;
691 op->flags = 0;
692 op->reserved = nullptr;
693 op++;
694 error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops),
695 grpc_core::CqVerifier::tag(103), nullptr);
696 GPR_ASSERT(GRPC_CALL_OK == error);
697
698 cqv.Expect(grpc_core::CqVerifier::tag(103), true);
699 cqv.Expect(grpc_core::CqVerifier::tag(1), true);
700 cqv.Verify();
701
702 GPR_ASSERT(status == GRPC_STATUS_OK);
703 GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
704 GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
705 GPR_ASSERT(was_cancelled == 0);
706 GPR_ASSERT(
707 byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
708
709 grpc_slice_unref(details);
710 grpc_metadata_array_destroy(&initial_metadata_recv);
711 grpc_metadata_array_destroy(&trailing_metadata_recv);
712 grpc_metadata_array_destroy(&request_metadata_recv);
713 grpc_call_details_destroy(&call_details);
714
715 grpc_call_unref(c);
716 grpc_call_unref(s);
717
718 grpc_byte_buffer_destroy(response_payload);
719 grpc_byte_buffer_destroy(response_payload_recv);
720 }
721
TEST(TooManyPings,BdpPingNotSentWithoutReceiveSideActivity)722 TEST(TooManyPings, BdpPingNotSentWithoutReceiveSideActivity) {
723 TransportCounter::WaitForTransportsToBeDestroyed();
724 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
725 // create the server
726 std::string server_address = grpc_core::JoinHostPort(
727 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
728 grpc_arg server_args[] = {
729 grpc_channel_arg_integer_create(
730 const_cast<char*>(
731 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
732 60 * 1000),
733 grpc_channel_arg_integer_create(
734 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
735 grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
736 server_args};
737 grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
738 grpc_server_register_completion_queue(server, cq, nullptr);
739 grpc_server_credentials* server_creds =
740 grpc_insecure_server_credentials_create();
741 GPR_ASSERT(
742 grpc_server_add_http2_port(server, server_address.c_str(), server_creds));
743 grpc_server_credentials_release(server_creds);
744 grpc_server_start(server);
745 // create the channel (bdp pings are enabled by default)
746 grpc_arg client_args[] = {
747 grpc_channel_arg_integer_create(
748 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
749 grpc_channel_arg_integer_create(
750 const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1)};
751 grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
752 client_args};
753 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
754 grpc_channel* channel =
755 grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
756 grpc_channel_credentials_release(creds);
757 VerifyChannelReady(channel, cq);
758 EXPECT_EQ(TransportCounter::count(), 2 /* one each for server and client */);
759 grpc_core::CqVerifier cqv(cq);
760 // Channel should be able to send two pings without disconnect if there was no
761 // BDP sent.
762 grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(1), nullptr);
763 cqv.Expect(grpc_core::CqVerifier::tag(1), true);
764 cqv.Verify(grpc_core::Duration::Seconds(5));
765 // Second ping
766 grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(2), nullptr);
767 cqv.Expect(grpc_core::CqVerifier::tag(2), true);
768 cqv.Verify(grpc_core::Duration::Seconds(5));
769 ASSERT_EQ(grpc_channel_check_connectivity_state(channel, 0),
770 GRPC_CHANNEL_READY);
771 PerformCallWithResponsePayload(channel, server, cq);
772 // Wait a bit to make sure that the BDP ping goes out.
773 cqv.VerifyEmpty(grpc_core::Duration::Seconds(1));
774 // The call with a response payload should have triggered a BDP ping.
775 // Send two more pings to verify. The second ping should cause a disconnect.
776 // If BDP was not sent, the second ping would not cause a disconnect.
777 grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(3), nullptr);
778 cqv.Expect(grpc_core::CqVerifier::tag(3), true);
779 cqv.Verify(grpc_core::Duration::Seconds(5));
780 // Second ping
781 grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(4), nullptr);
782 cqv.Expect(grpc_core::CqVerifier::tag(4), true);
783 cqv.Verify(grpc_core::Duration::Seconds(5));
784 // Make sure that the transports have been destroyed
785 VerifyChannelDisconnected(channel, cq);
786 TransportCounter::WaitForTransportsToBeDestroyed();
787 // shutdown and destroy the client and server
788 ServerShutdownAndDestroy(server, cq);
789 grpc_channel_destroy(channel);
790 grpc_completion_queue_shutdown(cq);
791 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
792 nullptr)
793 .type != GRPC_QUEUE_SHUTDOWN) {
794 }
795 grpc_completion_queue_destroy(cq);
796 }
797
TEST(TooManyPings,TransportsGetCleanedUpOnDisconnect)798 TEST(TooManyPings, TransportsGetCleanedUpOnDisconnect) {
799 TransportCounter::WaitForTransportsToBeDestroyed();
800 grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
801 // create the client and server
802 std::string server_address = grpc_core::JoinHostPort(
803 grpc_core::LocalIp(), grpc_pick_unused_port_or_die());
804 grpc_arg server_args[] = {
805 grpc_channel_arg_integer_create(
806 const_cast<char*>(
807 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
808 60 * 1000),
809 grpc_channel_arg_integer_create(
810 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PING_STRIKES), 1)};
811 grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args),
812 server_args};
813 grpc_server* server = grpc_server_create(&server_channel_args, nullptr);
814 grpc_server_register_completion_queue(server, cq, nullptr);
815 grpc_server_credentials* server_creds =
816 grpc_insecure_server_credentials_create();
817 GPR_ASSERT(
818 grpc_server_add_http2_port(server, server_address.c_str(), server_creds));
819 grpc_server_credentials_release(server_creds);
820 grpc_server_start(server);
821 grpc_arg client_args[] = {
822 grpc_channel_arg_integer_create(
823 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
824 grpc_channel_arg_integer_create(
825 const_cast<char*>(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS), 1)};
826 grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
827 client_args};
828 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
829 grpc_channel* channel =
830 grpc_channel_create(server_address.c_str(), creds, &client_channel_args);
831 grpc_channel_credentials_release(creds);
832 VerifyChannelReady(channel, cq);
833 EXPECT_EQ(TransportCounter::count(), 2 /* one each for server and client */);
834 grpc_core::CqVerifier cqv(cq);
835 // First ping
836 grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(1), nullptr);
837 cqv.Expect(grpc_core::CqVerifier::tag(1), true);
838 cqv.Verify(grpc_core::Duration::Seconds(5));
839 // Second ping
840 grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(2), nullptr);
841 cqv.Expect(grpc_core::CqVerifier::tag(2), true);
842 cqv.Verify(grpc_core::Duration::Seconds(5));
843 // Third ping caused disconnect
844 grpc_channel_ping(channel, cq, grpc_core::CqVerifier::tag(2), nullptr);
845 cqv.Expect(grpc_core::CqVerifier::tag(2), true);
846 cqv.Verify(grpc_core::Duration::Seconds(5));
847 // Make sure that the transports have been destroyed
848 VerifyChannelDisconnected(channel, cq);
849 TransportCounter::WaitForTransportsToBeDestroyed();
850 // shutdown and destroy the client and server
851 ServerShutdownAndDestroy(server, cq);
852 grpc_channel_destroy(channel);
853 grpc_completion_queue_shutdown(cq);
854 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
855 nullptr)
856 .type != GRPC_QUEUE_SHUTDOWN) {
857 }
858 grpc_completion_queue_destroy(cq);
859 }
860
861 } // namespace
862
main(int argc,char ** argv)863 int main(int argc, char** argv) {
864 ::testing::InitGoogleTest(&argc, argv);
865 grpc::testing::TestEnvironment env(&argc, argv);
866 grpc_core::TestOnlySetGlobalHttp2TransportInitCallback(
867 TransportCounter::CounterInitCallback);
868 grpc_core::TestOnlySetGlobalHttp2TransportDestructCallback(
869 TransportCounter::CounterDestructCallback);
870 grpc_init();
871 auto result = RUN_ALL_TESTS();
872 grpc_shutdown();
873 return result;
874 }
875