1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/credentials.h>
20 #include <grpc/grpc.h>
21 #include <grpc/grpc_security.h>
22 #include <grpc/impl/channel_arg_names.h>
23 #include <grpc/impl/propagation_bits.h>
24 #include <grpc/slice.h>
25 #include <grpc/slice_buffer.h>
26 #include <grpc/status.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/port_platform.h>
29 #include <limits.h>
30 #include <stdint.h>
31 #include <stdlib.h>
32 #include <string.h>
33
34 #include <atomic>
35 #include <memory>
36 #include <new>
37 #include <string>
38 #include <thread>
39 #include <vector>
40
41 #include "absl/base/thread_annotations.h"
42 #include "absl/log/check.h"
43 #include "absl/status/status.h"
44 #include "absl/strings/match.h"
45 #include "absl/strings/str_cat.h"
46 #include "absl/strings/string_view.h"
47 #include "absl/time/time.h"
48 #include "absl/types/optional.h"
49 #include "gtest/gtest.h"
50 #include "src/core/config/core_configuration.h"
51 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
52 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
53 #include "src/core/lib/channel/channel_args.h"
54 #include "src/core/lib/channel/channel_fwd.h"
55 #include "src/core/lib/channel/channel_stack.h"
56 #include "src/core/lib/iomgr/closure.h"
57 #include "src/core/lib/iomgr/endpoint.h"
58 #include "src/core/lib/iomgr/error.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/iomgr/iomgr_fwd.h"
61 #include "src/core/lib/iomgr/tcp_server.h"
62 #include "src/core/lib/slice/slice.h"
63 #include "src/core/lib/slice/slice_internal.h"
64 #include "src/core/lib/surface/channel_stack_type.h"
65 #include "src/core/lib/transport/metadata_batch.h"
66 #include "src/core/lib/transport/transport.h"
67 #include "src/core/util/debug_location.h"
68 #include "src/core/util/host_port.h"
69 #include "src/core/util/notification.h"
70 #include "src/core/util/sync.h"
71 #include "src/core/util/time.h"
72 #include "src/core/util/useful.h"
73 #include "test/core/end2end/cq_verifier.h"
74 #include "test/core/test_util/port.h"
75 #include "test/core/test_util/test_config.h"
76 #include "test/core/test_util/test_tcp_server.h"
77
78 namespace grpc_core {
79 namespace {
80
Tag(intptr_t t)81 void* Tag(intptr_t t) { return reinterpret_cast<void*>(t); }
82
83 // A filter that records state about trailing metadata.
84 class TrailingMetadataRecordingFilter {
85 public:
86 static grpc_channel_filter kFilterVtable;
87
trailing_metadata_available()88 static bool trailing_metadata_available() {
89 return trailing_metadata_available_;
90 }
91
reset_trailing_metadata_available()92 static void reset_trailing_metadata_available() {
93 trailing_metadata_available_ = false;
94 }
95
96 static absl::optional<GrpcStreamNetworkState::ValueType>
stream_network_state()97 stream_network_state() {
98 return stream_network_state_;
99 }
100
reset_stream_network_state()101 static void reset_stream_network_state() {
102 stream_network_state_ = absl::nullopt;
103 }
104
reset_state()105 static void reset_state() {
106 reset_trailing_metadata_available();
107 reset_stream_network_state();
108 }
109
110 private:
111 class CallData {
112 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)113 static grpc_error_handle Init(grpc_call_element* elem,
114 const grpc_call_element_args* args) {
115 new (elem->call_data) CallData(args);
116 return absl::OkStatus();
117 }
118
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)119 static void Destroy(grpc_call_element* elem,
120 const grpc_call_final_info* /*final_info*/,
121 grpc_closure* /*ignored*/) {
122 auto* calld = static_cast<CallData*>(elem->call_data);
123 calld->~CallData();
124 }
125
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)126 static void StartTransportStreamOpBatch(
127 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
128 auto* calld = static_cast<CallData*>(elem->call_data);
129 if (batch->recv_initial_metadata) {
130 calld->trailing_metadata_available_ =
131 batch->payload->recv_initial_metadata.trailing_metadata_available;
132 calld->original_recv_initial_metadata_ready_ =
133 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
134 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
135 &calld->recv_initial_metadata_ready_;
136 }
137 if (batch->recv_trailing_metadata) {
138 calld->recv_trailing_metadata_ =
139 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
140 calld->original_recv_trailing_metadata_ready_ =
141 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
142 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
143 &calld->recv_trailing_metadata_ready_;
144 }
145 grpc_call_next_op(elem, batch);
146 }
147
148 private:
CallData(const grpc_call_element_args *)149 explicit CallData(const grpc_call_element_args* /*args*/) {
150 GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
151 this, nullptr);
152 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
153 RecvTrailingMetadataReady, this, nullptr);
154 }
155
RecvInitialMetadataReady(void * arg,grpc_error_handle error)156 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error) {
157 auto* calld = static_cast<CallData*>(arg);
158 TrailingMetadataRecordingFilter::trailing_metadata_available_ =
159 *calld->trailing_metadata_available_;
160 Closure::Run(DEBUG_LOCATION, calld->original_recv_initial_metadata_ready_,
161 error);
162 }
163
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)164 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
165 auto* calld = static_cast<CallData*>(arg);
166 stream_network_state_ =
167 calld->recv_trailing_metadata_->get(GrpcStreamNetworkState());
168 Closure::Run(DEBUG_LOCATION,
169 calld->original_recv_trailing_metadata_ready_, error);
170 }
171
172 bool* trailing_metadata_available_ = nullptr;
173 grpc_closure recv_initial_metadata_ready_;
174 grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
175 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
176 grpc_closure recv_trailing_metadata_ready_;
177 grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
178 };
179
Init(grpc_channel_element * elem,grpc_channel_element_args *)180 static grpc_error_handle Init(grpc_channel_element* elem,
181 grpc_channel_element_args* /*args*/) {
182 new (elem->channel_data) TrailingMetadataRecordingFilter();
183 return absl::OkStatus();
184 }
185
Destroy(grpc_channel_element * elem)186 static void Destroy(grpc_channel_element* elem) {
187 auto* chand =
188 static_cast<TrailingMetadataRecordingFilter*>(elem->channel_data);
189 chand->~TrailingMetadataRecordingFilter();
190 }
191
192 static bool trailing_metadata_available_;
193 static absl::optional<GrpcStreamNetworkState::ValueType>
194 stream_network_state_;
195 };
196
197 grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
198 CallData::StartTransportStreamOpBatch,
199 grpc_channel_next_op,
200 sizeof(CallData),
201 CallData::Init,
202 grpc_call_stack_ignore_set_pollset_or_pollset_set,
203 CallData::Destroy,
204 sizeof(TrailingMetadataRecordingFilter),
205 Init,
206 grpc_channel_stack_no_post_init,
207 Destroy,
208 grpc_channel_next_get_info,
209 // Want to add the filter as close to the end as possible, to
210 // make sure that all of the filters work well together.
211 // However, we can't add it at the very end, because the
212 // connected channel filter must be the last one.
213 // Channel init code falls back to lexical ordering of filters if there are
214 // otherwise no dependencies, so we leverage that.
215 GRPC_UNIQUE_TYPE_NAME_HERE("zzzzzz_trailing-metadata-recording-filter"),
216 };
217 bool TrailingMetadataRecordingFilter::trailing_metadata_available_;
218 absl::optional<GrpcStreamNetworkState::ValueType>
219 TrailingMetadataRecordingFilter::stream_network_state_;
220
221 class StreamsNotSeenTest : public ::testing::Test {
222 protected:
StreamsNotSeenTest(bool server_allows_streams=true)223 explicit StreamsNotSeenTest(bool server_allows_streams = true)
224 : server_allows_streams_(server_allows_streams) {
225 // Reset the filter state
226 TrailingMetadataRecordingFilter::reset_state();
227 grpc_slice_buffer_init(&read_buffer_);
228 GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
229 GRPC_CLOSURE_INIT(&on_read_done_scheduler_, OnReadDoneScheduler, this,
230 nullptr);
231 // Start the test tcp server
232 port_ = grpc_pick_unused_port_or_die();
233 test_tcp_server_init(&server_, OnConnect, this);
234 test_tcp_server_start(&server_, port_);
235 // Start polling on the test tcp server
236 server_poll_thread_ = std::make_unique<std::thread>([this]() {
237 while (!shutdown_) {
238 test_tcp_server_poll(&server_, 10);
239 }
240 });
241 // Create the channel
242 cq_ = grpc_completion_queue_create_for_next(nullptr);
243 cqv_ = std::make_unique<CqVerifier>(cq_);
244 grpc_arg client_args[] = {
245 grpc_channel_arg_integer_create(
246 const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
247 grpc_channel_arg_integer_create(
248 const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
249 grpc_channel_arg_integer_create(
250 const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0)};
251 grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
252 client_args};
253 grpc_channel_credentials* creds = grpc_insecure_credentials_create();
254 channel_ = grpc_channel_create(JoinHostPort("127.0.0.1", port_).c_str(),
255 creds, &client_channel_args);
256 grpc_channel_credentials_release(creds);
257 // Wait for the channel to connect
258 grpc_connectivity_state state = grpc_channel_check_connectivity_state(
259 channel_, /*try_to_connect=*/true);
260 while (state != GRPC_CHANNEL_READY) {
261 grpc_channel_watch_connectivity_state(
262 channel_, state, grpc_timeout_seconds_to_deadline(1), cq_, Tag(1));
263 cqv_->Expect(Tag(1), true);
264 cqv_->Verify(Duration::Seconds(5));
265 state = grpc_channel_check_connectivity_state(channel_, false);
266 }
267 ExecCtx::Get()->Flush();
268 CHECK(
269 connect_notification_.WaitForNotificationWithTimeout(absl::Seconds(1)));
270 }
271
~StreamsNotSeenTest()272 ~StreamsNotSeenTest() override {
273 cqv_.reset();
274 grpc_completion_queue_shutdown(cq_);
275 grpc_event ev;
276 do {
277 ev = grpc_completion_queue_next(cq_, grpc_timeout_seconds_to_deadline(1),
278 nullptr);
279 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
280 grpc_completion_queue_destroy(cq_);
281 grpc_channel_destroy(channel_);
282 if (tcp_ != nullptr) grpc_endpoint_destroy(tcp_);
283 ExecCtx::Get()->Flush();
284 CHECK(read_end_notification_.WaitForNotificationWithTimeout(
285 absl::Seconds(5)));
286 shutdown_ = true;
287 server_poll_thread_->join();
288 test_tcp_server_destroy(&server_);
289 ExecCtx::Get()->Flush();
290 }
291
OnConnect(void * arg,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)292 static void OnConnect(void* arg, grpc_endpoint* tcp,
293 grpc_pollset* /* accepting_pollset */,
294 grpc_tcp_server_acceptor* acceptor) {
295 gpr_free(acceptor);
296 StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
297 self->tcp_ = tcp;
298 grpc_endpoint_add_to_pollset(tcp, self->server_.pollset[0]);
299 grpc_endpoint_read(tcp, &self->read_buffer_, &self->on_read_done_, false,
300 /*min_progress_size=*/1);
301 std::thread([self]() {
302 ExecCtx exec_ctx;
303 // Send settings frame from server
304 if (self->server_allows_streams_) {
305 constexpr char kHttp2SettingsFrame[] =
306 "\x00\x00\x00\x04\x00\x00\x00\x00\x00";
307 self->Write(absl::string_view(kHttp2SettingsFrame,
308 sizeof(kHttp2SettingsFrame) - 1));
309 } else {
310 // Create a settings frame with a max concurrent stream setting of 0
311 constexpr char kHttp2SettingsFrame[] =
312 "\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00";
313 self->Write(absl::string_view(kHttp2SettingsFrame,
314 sizeof(kHttp2SettingsFrame) - 1));
315 }
316 self->connect_notification_.Notify();
317 }).detach();
318 }
319
320 // This is a blocking call. It waits for the write callback to be invoked
321 // before returning. (In other words, do not call this from a thread that
322 // should not be blocked, for example, a polling thread.)
Write(absl::string_view bytes)323 void Write(absl::string_view bytes) {
324 grpc_slice slice =
325 StaticSlice::FromStaticBuffer(bytes.data(), bytes.size()).TakeCSlice();
326 grpc_slice_buffer buffer;
327 grpc_slice_buffer_init(&buffer);
328 grpc_slice_buffer_add(&buffer, slice);
329 WriteBuffer(&buffer);
330 grpc_slice_buffer_destroy(&buffer);
331 }
332
SendPing()333 void SendPing() {
334 // Send and recv ping ack
335 const char ping_bytes[] =
336 "\x00\x00\x08\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
337 const char ping_ack_bytes[] =
338 "\x00\x00\x08\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
339 Write(absl::string_view(ping_bytes, sizeof(ping_bytes) - 1));
340 WaitForReadBytes(
341 absl::string_view(ping_ack_bytes, sizeof(ping_ack_bytes) - 1));
342 }
343
SendGoaway(uint32_t last_stream_id)344 void SendGoaway(uint32_t last_stream_id) {
345 grpc_slice_buffer buffer;
346 grpc_slice_buffer_init(&buffer);
347 grpc_chttp2_goaway_append(last_stream_id, 0, grpc_empty_slice(), &buffer);
348 WriteBuffer(&buffer);
349 grpc_slice_buffer_destroy(&buffer);
350 }
351
WriteBuffer(grpc_slice_buffer * buffer)352 void WriteBuffer(grpc_slice_buffer* buffer) {
353 Notification on_write_done_notification_;
354 GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
355 &on_write_done_notification_, nullptr);
356 {
357 MutexLock lock(&tcp_destroy_mu_);
358 if (tcp_ != nullptr) {
359 grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
360 /*max_frame_size=*/INT_MAX);
361 }
362 }
363 ExecCtx::Get()->Flush();
364 CHECK(on_write_done_notification_.WaitForNotificationWithTimeout(
365 absl::Seconds(5)));
366 }
367
OnWriteDone(void * arg,grpc_error_handle error)368 static void OnWriteDone(void* arg, grpc_error_handle error) {
369 CHECK_OK(error);
370 Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
371 on_write_done_notification_->Notify();
372 }
373
OnReadDone(void * arg,grpc_error_handle error)374 static void OnReadDone(void* arg, grpc_error_handle error) {
375 StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
376 if (error.ok()) {
377 {
378 MutexLock lock(&self->mu_);
379 for (size_t i = 0; i < self->read_buffer_.count; ++i) {
380 absl::StrAppend(&self->read_bytes_,
381 StringViewFromSlice(self->read_buffer_.slices[i]));
382 }
383 self->read_cv_.SignalAll();
384 }
385 MutexLock lock(&self->tcp_destroy_mu_);
386 if (self->tcp_ != nullptr) {
387 grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
388 grpc_endpoint_read(self->tcp_, &self->read_buffer_,
389 &self->on_read_done_scheduler_, false,
390 /*min_progress_size=*/1);
391 return;
392 }
393 }
394 grpc_slice_buffer_destroy(&self->read_buffer_);
395 self->read_end_notification_.Notify();
396 }
397
398 // Async hop for OnReadDone(), in case grpc_endpoint_read() invokes
399 // the callback synchronously while holding the lock.
OnReadDoneScheduler(void * arg,grpc_error_handle error)400 static void OnReadDoneScheduler(void* arg, grpc_error_handle error) {
401 StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
402 ExecCtx::Run(DEBUG_LOCATION, &self->on_read_done_, std::move(error));
403 }
404
CloseServerConnection()405 void CloseServerConnection() {
406 MutexLock lock(&tcp_destroy_mu_);
407 grpc_endpoint_destroy(tcp_);
408 tcp_ = nullptr;
409 }
410
411 // Waits for \a bytes to show up in read_bytes_
WaitForReadBytes(absl::string_view bytes)412 void WaitForReadBytes(absl::string_view bytes) {
413 std::atomic<bool> done{false};
414 std::thread cq_driver([&]() {
415 while (!done) {
416 grpc_event ev = grpc_completion_queue_next(
417 cq_, grpc_timeout_milliseconds_to_deadline(10), nullptr);
418 CHECK(ev.type == GRPC_QUEUE_TIMEOUT);
419 }
420 });
421 {
422 MutexLock lock(&mu_);
423 while (!absl::StrContains(read_bytes_, bytes)) {
424 read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
425 }
426 }
427 done = true;
428 cq_driver.join();
429 }
430
431 // Flag to check whether the server's MAX_CONCURRENT_STREAM setting is
432 // non-zero or not.
433 bool server_allows_streams_;
434 int port_;
435 test_tcp_server server_;
436 std::unique_ptr<std::thread> server_poll_thread_;
437 // Guards destroying tcp_, so that we know not to start the next read/write.
438 Mutex tcp_destroy_mu_;
439 grpc_endpoint* tcp_ = nullptr;
440 Notification connect_notification_;
441 grpc_slice_buffer read_buffer_;
442 grpc_closure on_write_done_;
443 grpc_closure on_read_done_;
444 grpc_closure on_read_done_scheduler_;
445 Notification read_end_notification_;
446 std::string read_bytes_ ABSL_GUARDED_BY(mu_);
447 grpc_channel* channel_ = nullptr;
448 grpc_completion_queue* cq_ = nullptr;
449 std::unique_ptr<CqVerifier> cqv_;
450 Mutex mu_;
451 CondVar read_cv_;
452 std::atomic<bool> shutdown_{false};
453 };
454
455 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
456 // but receives a GOAWAY with a stream ID of 0, meaning that the request was
457 // unseen by the server.The test verifies that the HTTP2 transport adds
458 // GrpcNetworkStreamState with a value of kNotSeenByServer to the trailing
459 // metadata.
TEST_F(StreamsNotSeenTest,StartStreamBeforeGoaway)460 TEST_F(StreamsNotSeenTest, StartStreamBeforeGoaway) {
461 grpc_call* c =
462 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
463 grpc_slice_from_static_string("/foo"), nullptr,
464 grpc_timeout_seconds_to_deadline(1), nullptr);
465 CHECK(c);
466 grpc_metadata_array initial_metadata_recv;
467 grpc_metadata_array trailing_metadata_recv;
468 grpc_metadata_array_init(&initial_metadata_recv);
469 grpc_metadata_array_init(&trailing_metadata_recv);
470 grpc_op* op;
471 grpc_op ops[6];
472 grpc_status_code status;
473 const char* error_string;
474 grpc_call_error error;
475 grpc_slice details;
476 // Send the request
477 memset(ops, 0, sizeof(ops));
478 op = ops;
479 op->op = GRPC_OP_SEND_INITIAL_METADATA;
480 op->data.send_initial_metadata.count = 0;
481 op->flags = 0;
482 op->reserved = nullptr;
483 op++;
484 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
485 op->flags = 0;
486 op->reserved = nullptr;
487 op++;
488 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
489 nullptr);
490 cqv_->Expect(Tag(101), true);
491 cqv_->Verify();
492 // Send a goaway from server signalling that the request was unseen by the
493 // server.
494 SendGoaway(0);
495 memset(ops, 0, sizeof(ops));
496 op = ops;
497 op->op = GRPC_OP_RECV_INITIAL_METADATA;
498 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
499 op->flags = 0;
500 op->reserved = nullptr;
501 op++;
502 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
503 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
504 op->data.recv_status_on_client.status = &status;
505 op->data.recv_status_on_client.status_details = &details;
506 op->data.recv_status_on_client.error_string = &error_string;
507 op->flags = 0;
508 op->reserved = nullptr;
509 op++;
510 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
511 nullptr);
512 CHECK_EQ(error, GRPC_CALL_OK);
513 cqv_->Expect(Tag(102), true);
514 cqv_->Verify();
515 // Verify status and metadata
516 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
517 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
518 ASSERT_TRUE(
519 TrailingMetadataRecordingFilter::stream_network_state().has_value());
520 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
521 GrpcStreamNetworkState::kNotSeenByServer);
522 grpc_slice_unref(details);
523 gpr_free(const_cast<char*>(error_string));
524 grpc_metadata_array_destroy(&initial_metadata_recv);
525 grpc_metadata_array_destroy(&trailing_metadata_recv);
526 grpc_call_unref(c);
527 ExecCtx::Get()->Flush();
528 }
529
530 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
531 // notices that the transport is destroyed. The test verifies that the HTTP2
532 // transport does not add GrpcNetworkStreamState metadata since we don't know
533 // whether the server saw the request or not.
TEST_F(StreamsNotSeenTest,TransportDestroyed)534 TEST_F(StreamsNotSeenTest, TransportDestroyed) {
535 grpc_call* c =
536 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
537 grpc_slice_from_static_string("/foo"), nullptr,
538 grpc_timeout_seconds_to_deadline(1), nullptr);
539 CHECK(c);
540 grpc_metadata_array initial_metadata_recv;
541 grpc_metadata_array trailing_metadata_recv;
542 grpc_metadata_array_init(&initial_metadata_recv);
543 grpc_metadata_array_init(&trailing_metadata_recv);
544 grpc_op* op;
545 grpc_op ops[6];
546 grpc_status_code status;
547 const char* error_string;
548 grpc_call_error error;
549 grpc_slice details;
550 // Send the request
551 memset(ops, 0, sizeof(ops));
552 op = ops;
553 op->op = GRPC_OP_SEND_INITIAL_METADATA;
554 op->data.send_initial_metadata.count = 0;
555 op->flags = 0;
556 op->reserved = nullptr;
557 op++;
558 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
559 op->flags = 0;
560 op->reserved = nullptr;
561 op++;
562 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
563 nullptr);
564 cqv_->Expect(Tag(101), true);
565 cqv_->Verify();
566 // Shutdown the server endpoint
567 CloseServerConnection();
568 memset(ops, 0, sizeof(ops));
569 op = ops;
570 op->op = GRPC_OP_RECV_INITIAL_METADATA;
571 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
572 op->flags = 0;
573 op->reserved = nullptr;
574 op++;
575 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
576 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
577 op->data.recv_status_on_client.status = &status;
578 op->data.recv_status_on_client.status_details = &details;
579 op->data.recv_status_on_client.error_string = &error_string;
580 op->flags = 0;
581 op->reserved = nullptr;
582 op++;
583 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
584 nullptr);
585 CHECK_EQ(error, GRPC_CALL_OK);
586 cqv_->Expect(Tag(102), true);
587 cqv_->Verify();
588 // Verify status and metadata
589 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
590 EXPECT_FALSE(
591 TrailingMetadataRecordingFilter::stream_network_state().has_value());
592 grpc_slice_unref(details);
593 gpr_free(const_cast<char*>(error_string));
594 grpc_metadata_array_destroy(&initial_metadata_recv);
595 grpc_metadata_array_destroy(&trailing_metadata_recv);
596 grpc_call_unref(c);
597 ExecCtx::Get()->Flush();
598 }
599
600 // Client's HTTP2 transport tries to send an RPC after having received a GOAWAY
601 // frame. The test verifies that the HTTP2 transport adds GrpcNetworkStreamState
602 // with a value of kNotSentOnWire to the trailing metadata.
TEST_F(StreamsNotSeenTest,StartStreamAfterGoaway)603 TEST_F(StreamsNotSeenTest, StartStreamAfterGoaway) {
604 // Send Goaway from the server
605 SendGoaway(0);
606 // Send a ping to make sure that the goaway was received.
607 SendPing();
608 // Try sending an RPC
609 grpc_call* c =
610 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
611 grpc_slice_from_static_string("/foo"), nullptr,
612 grpc_timeout_seconds_to_deadline(1), nullptr);
613 CHECK(c);
614 grpc_metadata_array initial_metadata_recv;
615 grpc_metadata_array trailing_metadata_recv;
616 grpc_metadata_array_init(&initial_metadata_recv);
617 grpc_metadata_array_init(&trailing_metadata_recv);
618 grpc_op* op;
619 grpc_op ops[6];
620 grpc_status_code status;
621 const char* error_string;
622 grpc_call_error error;
623 grpc_slice details;
624 memset(ops, 0, sizeof(ops));
625 op = ops;
626 op->op = GRPC_OP_SEND_INITIAL_METADATA;
627 op->data.send_initial_metadata.count = 0;
628 op->flags = 0;
629 op->reserved = nullptr;
630 op++;
631 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
632 op->flags = 0;
633 op->reserved = nullptr;
634 op++;
635 op->op = GRPC_OP_RECV_INITIAL_METADATA;
636 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
637 op->flags = 0;
638 op->reserved = nullptr;
639 op++;
640 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
641 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
642 op->data.recv_status_on_client.status = &status;
643 op->data.recv_status_on_client.status_details = &details;
644 op->data.recv_status_on_client.error_string = &error_string;
645 op->flags = 0;
646 op->reserved = nullptr;
647 op++;
648 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
649 nullptr);
650 CHECK_EQ(error, GRPC_CALL_OK);
651 cqv_->Expect(Tag(101), true);
652 cqv_->Verify();
653 // Verify status and metadata
654 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
655 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
656 ASSERT_TRUE(
657 TrailingMetadataRecordingFilter::stream_network_state().has_value());
658 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
659 GrpcStreamNetworkState::kNotSentOnWire);
660 grpc_slice_unref(details);
661 gpr_free(const_cast<char*>(error_string));
662 grpc_metadata_array_destroy(&initial_metadata_recv);
663 grpc_metadata_array_destroy(&trailing_metadata_recv);
664 grpc_call_unref(c);
665 ExecCtx::Get()->Flush();
666 }
667
668 // These tests have the server sending a SETTINGS_FRAME with a max concurrent
669 // streams settings of 0 which denies the client the chance to start a stream.
670 // Note that in the future, these tests might become outdated if the
671 // client_channel learns about the max concurrent streams setting.
672 class ZeroConcurrencyTest : public StreamsNotSeenTest {
673 protected:
ZeroConcurrencyTest()674 ZeroConcurrencyTest() : StreamsNotSeenTest(/*server_allows_streams=*/false) {}
675 };
676
677 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
678 // because of the max concurrent streams setting. A goaway frame is then
679 // received which should result in the RPC getting cancelled with
680 // kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,StartStreamBeforeGoaway)681 TEST_F(ZeroConcurrencyTest, StartStreamBeforeGoaway) {
682 grpc_call* c =
683 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
684 grpc_slice_from_static_string("/foo"), nullptr,
685 grpc_timeout_seconds_to_deadline(5), nullptr);
686 CHECK(c);
687 grpc_metadata_array initial_metadata_recv;
688 grpc_metadata_array trailing_metadata_recv;
689 grpc_metadata_array_init(&initial_metadata_recv);
690 grpc_metadata_array_init(&trailing_metadata_recv);
691 grpc_op* op;
692 grpc_op ops[6];
693 grpc_status_code status;
694 const char* error_string;
695 grpc_call_error error;
696 grpc_slice details;
697 // Send the request
698 memset(ops, 0, sizeof(ops));
699 op = ops;
700 op->op = GRPC_OP_SEND_INITIAL_METADATA;
701 op->data.send_initial_metadata.count = 0;
702 op->flags = 0;
703 op->reserved = nullptr;
704 op++;
705 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
706 op->flags = 0;
707 op->reserved = nullptr;
708 op++;
709 op->op = GRPC_OP_RECV_INITIAL_METADATA;
710 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
711 op->flags = 0;
712 op->reserved = nullptr;
713 op++;
714 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
715 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
716 op->data.recv_status_on_client.status = &status;
717 op->data.recv_status_on_client.status_details = &details;
718 op->data.recv_status_on_client.error_string = &error_string;
719 op->flags = 0;
720 op->reserved = nullptr;
721 op++;
722 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
723 nullptr);
724 // This test assumes that nothing would pause the RPC before its received by
725 // the transport. If that no longer holds true, we might need to drive the cq
726 // for some time to make sure that the RPC reaches the HTTP2 layer.
727 SendGoaway(0);
728 CHECK_EQ(error, GRPC_CALL_OK);
729 cqv_->Expect(Tag(101), true);
730 cqv_->Verify();
731 // Verify status and metadata
732 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
733 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
734 ASSERT_TRUE(
735 TrailingMetadataRecordingFilter::stream_network_state().has_value());
736 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
737 GrpcStreamNetworkState::kNotSentOnWire);
738 grpc_slice_unref(details);
739 gpr_free(const_cast<char*>(error_string));
740 grpc_metadata_array_destroy(&initial_metadata_recv);
741 grpc_metadata_array_destroy(&trailing_metadata_recv);
742 grpc_call_unref(c);
743 ExecCtx::Get()->Flush();
744 }
745
746 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
747 // because of the max concurrent streams setting. Server then shuts its endpoint
748 // which should result in the RPC getting cancelled with kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,TransportDestroyed)749 TEST_F(ZeroConcurrencyTest, TransportDestroyed) {
750 grpc_call* c =
751 grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
752 grpc_slice_from_static_string("/foo"), nullptr,
753 grpc_timeout_seconds_to_deadline(5), nullptr);
754 CHECK(c);
755 grpc_metadata_array initial_metadata_recv;
756 grpc_metadata_array trailing_metadata_recv;
757 grpc_metadata_array_init(&initial_metadata_recv);
758 grpc_metadata_array_init(&trailing_metadata_recv);
759 grpc_op* op;
760 grpc_op ops[6];
761 grpc_status_code status;
762 const char* error_string;
763 grpc_call_error error;
764 grpc_slice details;
765 // Send the request
766 memset(ops, 0, sizeof(ops));
767 op = ops;
768 op->op = GRPC_OP_SEND_INITIAL_METADATA;
769 op->data.send_initial_metadata.count = 0;
770 op->flags = 0;
771 op->reserved = nullptr;
772 op++;
773 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
774 op->flags = 0;
775 op->reserved = nullptr;
776 op++;
777 op->op = GRPC_OP_RECV_INITIAL_METADATA;
778 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
779 op->flags = 0;
780 op->reserved = nullptr;
781 op++;
782 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
783 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
784 op->data.recv_status_on_client.status = &status;
785 op->data.recv_status_on_client.status_details = &details;
786 op->data.recv_status_on_client.error_string = &error_string;
787 op->flags = 0;
788 op->reserved = nullptr;
789 op++;
790 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
791 nullptr);
792 CloseServerConnection();
793 CHECK_EQ(error, GRPC_CALL_OK);
794 cqv_->Expect(Tag(101), true);
795 cqv_->Verify();
796 // Verify status and metadata
797 EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
798 ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
799 ASSERT_TRUE(
800 TrailingMetadataRecordingFilter::stream_network_state().has_value());
801 EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
802 GrpcStreamNetworkState::kNotSentOnWire);
803 grpc_slice_unref(details);
804 gpr_free(const_cast<char*>(error_string));
805 grpc_metadata_array_destroy(&initial_metadata_recv);
806 grpc_metadata_array_destroy(&trailing_metadata_recv);
807 grpc_call_unref(c);
808 ExecCtx::Get()->Flush();
809 }
810
811 } // namespace
812 } // namespace grpc_core
813
main(int argc,char ** argv)814 int main(int argc, char** argv) {
815 ::testing::InitGoogleTest(&argc, argv);
816 grpc::testing::TestEnvironment env(&argc, argv);
817 int result;
818 grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
819 [](grpc_core::CoreConfiguration::Builder* builder) {
820 grpc_core::BuildCoreConfiguration(builder);
821 builder->channel_init()->RegisterFilter(
822 GRPC_CLIENT_SUBCHANNEL,
823 &grpc_core::TrailingMetadataRecordingFilter::kFilterVtable);
824 },
825 [&] {
826 grpc_core::
827 TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
828 true);
829 grpc_init();
830 {
831 grpc_core::ExecCtx exec_ctx;
832 result = RUN_ALL_TESTS();
833 }
834 grpc_shutdown();
835 });
836 return result;
837 }
838