• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/credentials.h>
20 #include <grpc/grpc.h>
21 #include <grpc/grpc_security.h>
22 #include <grpc/impl/channel_arg_names.h>
23 #include <grpc/impl/propagation_bits.h>
24 #include <grpc/slice.h>
25 #include <grpc/slice_buffer.h>
26 #include <grpc/status.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/port_platform.h>
29 #include <limits.h>
30 #include <stdint.h>
31 #include <stdlib.h>
32 #include <string.h>
33 
34 #include <atomic>
35 #include <memory>
36 #include <new>
37 #include <string>
38 #include <thread>
39 #include <vector>
40 
41 #include "absl/base/thread_annotations.h"
42 #include "absl/log/check.h"
43 #include "absl/status/status.h"
44 #include "absl/strings/match.h"
45 #include "absl/strings/str_cat.h"
46 #include "absl/strings/string_view.h"
47 #include "absl/time/time.h"
48 #include "absl/types/optional.h"
49 #include "gtest/gtest.h"
50 #include "src/core/config/core_configuration.h"
51 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
52 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
53 #include "src/core/lib/channel/channel_args.h"
54 #include "src/core/lib/channel/channel_fwd.h"
55 #include "src/core/lib/channel/channel_stack.h"
56 #include "src/core/lib/iomgr/closure.h"
57 #include "src/core/lib/iomgr/endpoint.h"
58 #include "src/core/lib/iomgr/error.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/iomgr/iomgr_fwd.h"
61 #include "src/core/lib/iomgr/tcp_server.h"
62 #include "src/core/lib/slice/slice.h"
63 #include "src/core/lib/slice/slice_internal.h"
64 #include "src/core/lib/surface/channel_stack_type.h"
65 #include "src/core/lib/transport/metadata_batch.h"
66 #include "src/core/lib/transport/transport.h"
67 #include "src/core/util/debug_location.h"
68 #include "src/core/util/host_port.h"
69 #include "src/core/util/notification.h"
70 #include "src/core/util/sync.h"
71 #include "src/core/util/time.h"
72 #include "src/core/util/useful.h"
73 #include "test/core/end2end/cq_verifier.h"
74 #include "test/core/test_util/port.h"
75 #include "test/core/test_util/test_config.h"
76 #include "test/core/test_util/test_tcp_server.h"
77 
78 namespace grpc_core {
79 namespace {
80 
// Wraps an integer in the opaque void* form used for completion queue tags.
void* Tag(intptr_t t) {
  void* const tag = reinterpret_cast<void*>(t);
  return tag;
}
82 
83 // A filter that records state about trailing metadata.
84 class TrailingMetadataRecordingFilter {
85  public:
86   static grpc_channel_filter kFilterVtable;
87 
trailing_metadata_available()88   static bool trailing_metadata_available() {
89     return trailing_metadata_available_;
90   }
91 
reset_trailing_metadata_available()92   static void reset_trailing_metadata_available() {
93     trailing_metadata_available_ = false;
94   }
95 
96   static absl::optional<GrpcStreamNetworkState::ValueType>
stream_network_state()97   stream_network_state() {
98     return stream_network_state_;
99   }
100 
reset_stream_network_state()101   static void reset_stream_network_state() {
102     stream_network_state_ = absl::nullopt;
103   }
104 
reset_state()105   static void reset_state() {
106     reset_trailing_metadata_available();
107     reset_stream_network_state();
108   }
109 
110  private:
111   class CallData {
112    public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)113     static grpc_error_handle Init(grpc_call_element* elem,
114                                   const grpc_call_element_args* args) {
115       new (elem->call_data) CallData(args);
116       return absl::OkStatus();
117     }
118 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)119     static void Destroy(grpc_call_element* elem,
120                         const grpc_call_final_info* /*final_info*/,
121                         grpc_closure* /*ignored*/) {
122       auto* calld = static_cast<CallData*>(elem->call_data);
123       calld->~CallData();
124     }
125 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)126     static void StartTransportStreamOpBatch(
127         grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
128       auto* calld = static_cast<CallData*>(elem->call_data);
129       if (batch->recv_initial_metadata) {
130         calld->trailing_metadata_available_ =
131             batch->payload->recv_initial_metadata.trailing_metadata_available;
132         calld->original_recv_initial_metadata_ready_ =
133             batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
134         batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
135             &calld->recv_initial_metadata_ready_;
136       }
137       if (batch->recv_trailing_metadata) {
138         calld->recv_trailing_metadata_ =
139             batch->payload->recv_trailing_metadata.recv_trailing_metadata;
140         calld->original_recv_trailing_metadata_ready_ =
141             batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
142         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
143             &calld->recv_trailing_metadata_ready_;
144       }
145       grpc_call_next_op(elem, batch);
146     }
147 
148    private:
CallData(const grpc_call_element_args *)149     explicit CallData(const grpc_call_element_args* /*args*/) {
150       GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
151                         this, nullptr);
152       GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
153                         RecvTrailingMetadataReady, this, nullptr);
154     }
155 
RecvInitialMetadataReady(void * arg,grpc_error_handle error)156     static void RecvInitialMetadataReady(void* arg, grpc_error_handle error) {
157       auto* calld = static_cast<CallData*>(arg);
158       TrailingMetadataRecordingFilter::trailing_metadata_available_ =
159           *calld->trailing_metadata_available_;
160       Closure::Run(DEBUG_LOCATION, calld->original_recv_initial_metadata_ready_,
161                    error);
162     }
163 
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)164     static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
165       auto* calld = static_cast<CallData*>(arg);
166       stream_network_state_ =
167           calld->recv_trailing_metadata_->get(GrpcStreamNetworkState());
168       Closure::Run(DEBUG_LOCATION,
169                    calld->original_recv_trailing_metadata_ready_, error);
170     }
171 
172     bool* trailing_metadata_available_ = nullptr;
173     grpc_closure recv_initial_metadata_ready_;
174     grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
175     grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
176     grpc_closure recv_trailing_metadata_ready_;
177     grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
178   };
179 
Init(grpc_channel_element * elem,grpc_channel_element_args *)180   static grpc_error_handle Init(grpc_channel_element* elem,
181                                 grpc_channel_element_args* /*args*/) {
182     new (elem->channel_data) TrailingMetadataRecordingFilter();
183     return absl::OkStatus();
184   }
185 
Destroy(grpc_channel_element * elem)186   static void Destroy(grpc_channel_element* elem) {
187     auto* chand =
188         static_cast<TrailingMetadataRecordingFilter*>(elem->channel_data);
189     chand->~TrailingMetadataRecordingFilter();
190   }
191 
192   static bool trailing_metadata_available_;
193   static absl::optional<GrpcStreamNetworkState::ValueType>
194       stream_network_state_;
195 };
196 
197 grpc_channel_filter TrailingMetadataRecordingFilter::kFilterVtable = {
198     CallData::StartTransportStreamOpBatch,
199     grpc_channel_next_op,
200     sizeof(CallData),
201     CallData::Init,
202     grpc_call_stack_ignore_set_pollset_or_pollset_set,
203     CallData::Destroy,
204     sizeof(TrailingMetadataRecordingFilter),
205     Init,
206     grpc_channel_stack_no_post_init,
207     Destroy,
208     grpc_channel_next_get_info,
209     // Want to add the filter as close to the end as possible, to
210     // make sure that all of the filters work well together.
211     // However, we can't add it at the very end, because the
212     // connected channel filter must be the last one.
213     // Channel init code falls back to lexical ordering of filters if there are
214     // otherwise no dependencies, so we leverage that.
215     GRPC_UNIQUE_TYPE_NAME_HERE("zzzzzz_trailing-metadata-recording-filter"),
216 };
217 bool TrailingMetadataRecordingFilter::trailing_metadata_available_;
218 absl::optional<GrpcStreamNetworkState::ValueType>
219     TrailingMetadataRecordingFilter::stream_network_state_;
220 
221 class StreamsNotSeenTest : public ::testing::Test {
222  protected:
StreamsNotSeenTest(bool server_allows_streams=true)223   explicit StreamsNotSeenTest(bool server_allows_streams = true)
224       : server_allows_streams_(server_allows_streams) {
225     // Reset the filter state
226     TrailingMetadataRecordingFilter::reset_state();
227     grpc_slice_buffer_init(&read_buffer_);
228     GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
229     GRPC_CLOSURE_INIT(&on_read_done_scheduler_, OnReadDoneScheduler, this,
230                       nullptr);
231     // Start the test tcp server
232     port_ = grpc_pick_unused_port_or_die();
233     test_tcp_server_init(&server_, OnConnect, this);
234     test_tcp_server_start(&server_, port_);
235     // Start polling on the test tcp server
236     server_poll_thread_ = std::make_unique<std::thread>([this]() {
237       while (!shutdown_) {
238         test_tcp_server_poll(&server_, 10);
239       }
240     });
241     // Create the channel
242     cq_ = grpc_completion_queue_create_for_next(nullptr);
243     cqv_ = std::make_unique<CqVerifier>(cq_);
244     grpc_arg client_args[] = {
245         grpc_channel_arg_integer_create(
246             const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
247         grpc_channel_arg_integer_create(
248             const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
249         grpc_channel_arg_integer_create(
250             const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0)};
251     grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
252                                              client_args};
253     grpc_channel_credentials* creds = grpc_insecure_credentials_create();
254     channel_ = grpc_channel_create(JoinHostPort("127.0.0.1", port_).c_str(),
255                                    creds, &client_channel_args);
256     grpc_channel_credentials_release(creds);
257     // Wait for the channel to connect
258     grpc_connectivity_state state = grpc_channel_check_connectivity_state(
259         channel_, /*try_to_connect=*/true);
260     while (state != GRPC_CHANNEL_READY) {
261       grpc_channel_watch_connectivity_state(
262           channel_, state, grpc_timeout_seconds_to_deadline(1), cq_, Tag(1));
263       cqv_->Expect(Tag(1), true);
264       cqv_->Verify(Duration::Seconds(5));
265       state = grpc_channel_check_connectivity_state(channel_, false);
266     }
267     ExecCtx::Get()->Flush();
268     CHECK(
269         connect_notification_.WaitForNotificationWithTimeout(absl::Seconds(1)));
270   }
271 
~StreamsNotSeenTest()272   ~StreamsNotSeenTest() override {
273     cqv_.reset();
274     grpc_completion_queue_shutdown(cq_);
275     grpc_event ev;
276     do {
277       ev = grpc_completion_queue_next(cq_, grpc_timeout_seconds_to_deadline(1),
278                                       nullptr);
279     } while (ev.type != GRPC_QUEUE_SHUTDOWN);
280     grpc_completion_queue_destroy(cq_);
281     grpc_channel_destroy(channel_);
282     if (tcp_ != nullptr) grpc_endpoint_destroy(tcp_);
283     ExecCtx::Get()->Flush();
284     CHECK(read_end_notification_.WaitForNotificationWithTimeout(
285         absl::Seconds(5)));
286     shutdown_ = true;
287     server_poll_thread_->join();
288     test_tcp_server_destroy(&server_);
289     ExecCtx::Get()->Flush();
290   }
291 
OnConnect(void * arg,grpc_endpoint * tcp,grpc_pollset *,grpc_tcp_server_acceptor * acceptor)292   static void OnConnect(void* arg, grpc_endpoint* tcp,
293                         grpc_pollset* /* accepting_pollset */,
294                         grpc_tcp_server_acceptor* acceptor) {
295     gpr_free(acceptor);
296     StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
297     self->tcp_ = tcp;
298     grpc_endpoint_add_to_pollset(tcp, self->server_.pollset[0]);
299     grpc_endpoint_read(tcp, &self->read_buffer_, &self->on_read_done_, false,
300                        /*min_progress_size=*/1);
301     std::thread([self]() {
302       ExecCtx exec_ctx;
303       // Send settings frame from server
304       if (self->server_allows_streams_) {
305         constexpr char kHttp2SettingsFrame[] =
306             "\x00\x00\x00\x04\x00\x00\x00\x00\x00";
307         self->Write(absl::string_view(kHttp2SettingsFrame,
308                                       sizeof(kHttp2SettingsFrame) - 1));
309       } else {
310         // Create a settings frame with a max concurrent stream setting of 0
311         constexpr char kHttp2SettingsFrame[] =
312             "\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00";
313         self->Write(absl::string_view(kHttp2SettingsFrame,
314                                       sizeof(kHttp2SettingsFrame) - 1));
315       }
316       self->connect_notification_.Notify();
317     }).detach();
318   }
319 
320   // This is a blocking call. It waits for the write callback to be invoked
321   // before returning. (In other words, do not call this from a thread that
322   // should not be blocked, for example, a polling thread.)
Write(absl::string_view bytes)323   void Write(absl::string_view bytes) {
324     grpc_slice slice =
325         StaticSlice::FromStaticBuffer(bytes.data(), bytes.size()).TakeCSlice();
326     grpc_slice_buffer buffer;
327     grpc_slice_buffer_init(&buffer);
328     grpc_slice_buffer_add(&buffer, slice);
329     WriteBuffer(&buffer);
330     grpc_slice_buffer_destroy(&buffer);
331   }
332 
SendPing()333   void SendPing() {
334     // Send and recv ping ack
335     const char ping_bytes[] =
336         "\x00\x00\x08\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
337     const char ping_ack_bytes[] =
338         "\x00\x00\x08\x06\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00";
339     Write(absl::string_view(ping_bytes, sizeof(ping_bytes) - 1));
340     WaitForReadBytes(
341         absl::string_view(ping_ack_bytes, sizeof(ping_ack_bytes) - 1));
342   }
343 
SendGoaway(uint32_t last_stream_id)344   void SendGoaway(uint32_t last_stream_id) {
345     grpc_slice_buffer buffer;
346     grpc_slice_buffer_init(&buffer);
347     grpc_chttp2_goaway_append(last_stream_id, 0, grpc_empty_slice(), &buffer);
348     WriteBuffer(&buffer);
349     grpc_slice_buffer_destroy(&buffer);
350   }
351 
WriteBuffer(grpc_slice_buffer * buffer)352   void WriteBuffer(grpc_slice_buffer* buffer) {
353     Notification on_write_done_notification_;
354     GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
355                       &on_write_done_notification_, nullptr);
356     {
357       MutexLock lock(&tcp_destroy_mu_);
358       if (tcp_ != nullptr) {
359         grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
360                             /*max_frame_size=*/INT_MAX);
361       }
362     }
363     ExecCtx::Get()->Flush();
364     CHECK(on_write_done_notification_.WaitForNotificationWithTimeout(
365         absl::Seconds(5)));
366   }
367 
OnWriteDone(void * arg,grpc_error_handle error)368   static void OnWriteDone(void* arg, grpc_error_handle error) {
369     CHECK_OK(error);
370     Notification* on_write_done_notification_ = static_cast<Notification*>(arg);
371     on_write_done_notification_->Notify();
372   }
373 
OnReadDone(void * arg,grpc_error_handle error)374   static void OnReadDone(void* arg, grpc_error_handle error) {
375     StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
376     if (error.ok()) {
377       {
378         MutexLock lock(&self->mu_);
379         for (size_t i = 0; i < self->read_buffer_.count; ++i) {
380           absl::StrAppend(&self->read_bytes_,
381                           StringViewFromSlice(self->read_buffer_.slices[i]));
382         }
383         self->read_cv_.SignalAll();
384       }
385       MutexLock lock(&self->tcp_destroy_mu_);
386       if (self->tcp_ != nullptr) {
387         grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
388         grpc_endpoint_read(self->tcp_, &self->read_buffer_,
389                            &self->on_read_done_scheduler_, false,
390                            /*min_progress_size=*/1);
391         return;
392       }
393     }
394     grpc_slice_buffer_destroy(&self->read_buffer_);
395     self->read_end_notification_.Notify();
396   }
397 
398   // Async hop for OnReadDone(), in case grpc_endpoint_read() invokes
399   // the callback synchronously while holding the lock.
OnReadDoneScheduler(void * arg,grpc_error_handle error)400   static void OnReadDoneScheduler(void* arg, grpc_error_handle error) {
401     StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
402     ExecCtx::Run(DEBUG_LOCATION, &self->on_read_done_, std::move(error));
403   }
404 
CloseServerConnection()405   void CloseServerConnection() {
406     MutexLock lock(&tcp_destroy_mu_);
407     grpc_endpoint_destroy(tcp_);
408     tcp_ = nullptr;
409   }
410 
411   // Waits for \a bytes to show up in read_bytes_
WaitForReadBytes(absl::string_view bytes)412   void WaitForReadBytes(absl::string_view bytes) {
413     std::atomic<bool> done{false};
414     std::thread cq_driver([&]() {
415       while (!done) {
416         grpc_event ev = grpc_completion_queue_next(
417             cq_, grpc_timeout_milliseconds_to_deadline(10), nullptr);
418         CHECK(ev.type == GRPC_QUEUE_TIMEOUT);
419       }
420     });
421     {
422       MutexLock lock(&mu_);
423       while (!absl::StrContains(read_bytes_, bytes)) {
424         read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5));
425       }
426     }
427     done = true;
428     cq_driver.join();
429   }
430 
431   // Flag to check whether the server's MAX_CONCURRENT_STREAM setting is
432   // non-zero or not.
433   bool server_allows_streams_;
434   int port_;
435   test_tcp_server server_;
436   std::unique_ptr<std::thread> server_poll_thread_;
437   // Guards destroying tcp_, so that we know not to start the next read/write.
438   Mutex tcp_destroy_mu_;
439   grpc_endpoint* tcp_ = nullptr;
440   Notification connect_notification_;
441   grpc_slice_buffer read_buffer_;
442   grpc_closure on_write_done_;
443   grpc_closure on_read_done_;
444   grpc_closure on_read_done_scheduler_;
445   Notification read_end_notification_;
446   std::string read_bytes_ ABSL_GUARDED_BY(mu_);
447   grpc_channel* channel_ = nullptr;
448   grpc_completion_queue* cq_ = nullptr;
449   std::unique_ptr<CqVerifier> cqv_;
450   Mutex mu_;
451   CondVar read_cv_;
452   std::atomic<bool> shutdown_{false};
453 };
454 
455 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
456 // but receives a GOAWAY with a stream ID of 0, meaning that the request was
457 // unseen by the server.The test verifies that the HTTP2 transport adds
458 // GrpcNetworkStreamState with a value of kNotSeenByServer to the trailing
459 // metadata.
TEST_F(StreamsNotSeenTest,StartStreamBeforeGoaway)460 TEST_F(StreamsNotSeenTest, StartStreamBeforeGoaway) {
461   grpc_call* c =
462       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
463                                grpc_slice_from_static_string("/foo"), nullptr,
464                                grpc_timeout_seconds_to_deadline(1), nullptr);
465   CHECK(c);
466   grpc_metadata_array initial_metadata_recv;
467   grpc_metadata_array trailing_metadata_recv;
468   grpc_metadata_array_init(&initial_metadata_recv);
469   grpc_metadata_array_init(&trailing_metadata_recv);
470   grpc_op* op;
471   grpc_op ops[6];
472   grpc_status_code status;
473   const char* error_string;
474   grpc_call_error error;
475   grpc_slice details;
476   // Send the request
477   memset(ops, 0, sizeof(ops));
478   op = ops;
479   op->op = GRPC_OP_SEND_INITIAL_METADATA;
480   op->data.send_initial_metadata.count = 0;
481   op->flags = 0;
482   op->reserved = nullptr;
483   op++;
484   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
485   op->flags = 0;
486   op->reserved = nullptr;
487   op++;
488   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
489                                 nullptr);
490   cqv_->Expect(Tag(101), true);
491   cqv_->Verify();
492   // Send a goaway from server signalling that the request was unseen by the
493   // server.
494   SendGoaway(0);
495   memset(ops, 0, sizeof(ops));
496   op = ops;
497   op->op = GRPC_OP_RECV_INITIAL_METADATA;
498   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
499   op->flags = 0;
500   op->reserved = nullptr;
501   op++;
502   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
503   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
504   op->data.recv_status_on_client.status = &status;
505   op->data.recv_status_on_client.status_details = &details;
506   op->data.recv_status_on_client.error_string = &error_string;
507   op->flags = 0;
508   op->reserved = nullptr;
509   op++;
510   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
511                                 nullptr);
512   CHECK_EQ(error, GRPC_CALL_OK);
513   cqv_->Expect(Tag(102), true);
514   cqv_->Verify();
515   // Verify status and metadata
516   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
517   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
518   ASSERT_TRUE(
519       TrailingMetadataRecordingFilter::stream_network_state().has_value());
520   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
521             GrpcStreamNetworkState::kNotSeenByServer);
522   grpc_slice_unref(details);
523   gpr_free(const_cast<char*>(error_string));
524   grpc_metadata_array_destroy(&initial_metadata_recv);
525   grpc_metadata_array_destroy(&trailing_metadata_recv);
526   grpc_call_unref(c);
527   ExecCtx::Get()->Flush();
528 }
529 
530 // Client's HTTP2 transport starts a new stream, sends the request on the wire,
531 // notices that the transport is destroyed. The test verifies that the HTTP2
532 // transport does not add GrpcNetworkStreamState metadata since we don't know
533 // whether the server saw the request or not.
TEST_F(StreamsNotSeenTest,TransportDestroyed)534 TEST_F(StreamsNotSeenTest, TransportDestroyed) {
535   grpc_call* c =
536       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
537                                grpc_slice_from_static_string("/foo"), nullptr,
538                                grpc_timeout_seconds_to_deadline(1), nullptr);
539   CHECK(c);
540   grpc_metadata_array initial_metadata_recv;
541   grpc_metadata_array trailing_metadata_recv;
542   grpc_metadata_array_init(&initial_metadata_recv);
543   grpc_metadata_array_init(&trailing_metadata_recv);
544   grpc_op* op;
545   grpc_op ops[6];
546   grpc_status_code status;
547   const char* error_string;
548   grpc_call_error error;
549   grpc_slice details;
550   // Send the request
551   memset(ops, 0, sizeof(ops));
552   op = ops;
553   op->op = GRPC_OP_SEND_INITIAL_METADATA;
554   op->data.send_initial_metadata.count = 0;
555   op->flags = 0;
556   op->reserved = nullptr;
557   op++;
558   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
559   op->flags = 0;
560   op->reserved = nullptr;
561   op++;
562   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
563                                 nullptr);
564   cqv_->Expect(Tag(101), true);
565   cqv_->Verify();
566   // Shutdown the server endpoint
567   CloseServerConnection();
568   memset(ops, 0, sizeof(ops));
569   op = ops;
570   op->op = GRPC_OP_RECV_INITIAL_METADATA;
571   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
572   op->flags = 0;
573   op->reserved = nullptr;
574   op++;
575   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
576   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
577   op->data.recv_status_on_client.status = &status;
578   op->data.recv_status_on_client.status_details = &details;
579   op->data.recv_status_on_client.error_string = &error_string;
580   op->flags = 0;
581   op->reserved = nullptr;
582   op++;
583   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(102),
584                                 nullptr);
585   CHECK_EQ(error, GRPC_CALL_OK);
586   cqv_->Expect(Tag(102), true);
587   cqv_->Verify();
588   // Verify status and metadata
589   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
590   EXPECT_FALSE(
591       TrailingMetadataRecordingFilter::stream_network_state().has_value());
592   grpc_slice_unref(details);
593   gpr_free(const_cast<char*>(error_string));
594   grpc_metadata_array_destroy(&initial_metadata_recv);
595   grpc_metadata_array_destroy(&trailing_metadata_recv);
596   grpc_call_unref(c);
597   ExecCtx::Get()->Flush();
598 }
599 
600 // Client's HTTP2 transport tries to send an RPC after having received a GOAWAY
601 // frame. The test verifies that the HTTP2 transport adds GrpcNetworkStreamState
602 // with a value of kNotSentOnWire to the trailing metadata.
TEST_F(StreamsNotSeenTest,StartStreamAfterGoaway)603 TEST_F(StreamsNotSeenTest, StartStreamAfterGoaway) {
604   // Send Goaway from the server
605   SendGoaway(0);
606   // Send a ping to make sure that the goaway was received.
607   SendPing();
608   // Try sending an RPC
609   grpc_call* c =
610       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
611                                grpc_slice_from_static_string("/foo"), nullptr,
612                                grpc_timeout_seconds_to_deadline(1), nullptr);
613   CHECK(c);
614   grpc_metadata_array initial_metadata_recv;
615   grpc_metadata_array trailing_metadata_recv;
616   grpc_metadata_array_init(&initial_metadata_recv);
617   grpc_metadata_array_init(&trailing_metadata_recv);
618   grpc_op* op;
619   grpc_op ops[6];
620   grpc_status_code status;
621   const char* error_string;
622   grpc_call_error error;
623   grpc_slice details;
624   memset(ops, 0, sizeof(ops));
625   op = ops;
626   op->op = GRPC_OP_SEND_INITIAL_METADATA;
627   op->data.send_initial_metadata.count = 0;
628   op->flags = 0;
629   op->reserved = nullptr;
630   op++;
631   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
632   op->flags = 0;
633   op->reserved = nullptr;
634   op++;
635   op->op = GRPC_OP_RECV_INITIAL_METADATA;
636   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
637   op->flags = 0;
638   op->reserved = nullptr;
639   op++;
640   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
641   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
642   op->data.recv_status_on_client.status = &status;
643   op->data.recv_status_on_client.status_details = &details;
644   op->data.recv_status_on_client.error_string = &error_string;
645   op->flags = 0;
646   op->reserved = nullptr;
647   op++;
648   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
649                                 nullptr);
650   CHECK_EQ(error, GRPC_CALL_OK);
651   cqv_->Expect(Tag(101), true);
652   cqv_->Verify();
653   // Verify status and metadata
654   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
655   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
656   ASSERT_TRUE(
657       TrailingMetadataRecordingFilter::stream_network_state().has_value());
658   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
659             GrpcStreamNetworkState::kNotSentOnWire);
660   grpc_slice_unref(details);
661   gpr_free(const_cast<char*>(error_string));
662   grpc_metadata_array_destroy(&initial_metadata_recv);
663   grpc_metadata_array_destroy(&trailing_metadata_recv);
664   grpc_call_unref(c);
665   ExecCtx::Get()->Flush();
666 }
667 
668 // These tests have the server sending a SETTINGS_FRAME with a max concurrent
669 // streams settings of 0 which denies the client the chance to start a stream.
670 // Note that in the future, these tests might become outdated if the
671 // client_channel learns about the max concurrent streams setting.
672 class ZeroConcurrencyTest : public StreamsNotSeenTest {
673  protected:
ZeroConcurrencyTest()674   ZeroConcurrencyTest() : StreamsNotSeenTest(/*server_allows_streams=*/false) {}
675 };
676 
677 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
678 // because of the max concurrent streams setting. A goaway frame is then
679 // received which should result in the RPC getting cancelled with
680 // kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,StartStreamBeforeGoaway)681 TEST_F(ZeroConcurrencyTest, StartStreamBeforeGoaway) {
682   grpc_call* c =
683       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
684                                grpc_slice_from_static_string("/foo"), nullptr,
685                                grpc_timeout_seconds_to_deadline(5), nullptr);
686   CHECK(c);
687   grpc_metadata_array initial_metadata_recv;
688   grpc_metadata_array trailing_metadata_recv;
689   grpc_metadata_array_init(&initial_metadata_recv);
690   grpc_metadata_array_init(&trailing_metadata_recv);
691   grpc_op* op;
692   grpc_op ops[6];
693   grpc_status_code status;
694   const char* error_string;
695   grpc_call_error error;
696   grpc_slice details;
697   // Send the request
698   memset(ops, 0, sizeof(ops));
699   op = ops;
700   op->op = GRPC_OP_SEND_INITIAL_METADATA;
701   op->data.send_initial_metadata.count = 0;
702   op->flags = 0;
703   op->reserved = nullptr;
704   op++;
705   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
706   op->flags = 0;
707   op->reserved = nullptr;
708   op++;
709   op->op = GRPC_OP_RECV_INITIAL_METADATA;
710   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
711   op->flags = 0;
712   op->reserved = nullptr;
713   op++;
714   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
715   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
716   op->data.recv_status_on_client.status = &status;
717   op->data.recv_status_on_client.status_details = &details;
718   op->data.recv_status_on_client.error_string = &error_string;
719   op->flags = 0;
720   op->reserved = nullptr;
721   op++;
722   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
723                                 nullptr);
724   // This test assumes that nothing would pause the RPC before its received by
725   // the transport. If that no longer holds true, we might need to drive the cq
726   // for some time to make sure that the RPC reaches the HTTP2 layer.
727   SendGoaway(0);
728   CHECK_EQ(error, GRPC_CALL_OK);
729   cqv_->Expect(Tag(101), true);
730   cqv_->Verify();
731   // Verify status and metadata
732   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
733   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
734   ASSERT_TRUE(
735       TrailingMetadataRecordingFilter::stream_network_state().has_value());
736   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
737             GrpcStreamNetworkState::kNotSentOnWire);
738   grpc_slice_unref(details);
739   gpr_free(const_cast<char*>(error_string));
740   grpc_metadata_array_destroy(&initial_metadata_recv);
741   grpc_metadata_array_destroy(&trailing_metadata_recv);
742   grpc_call_unref(c);
743   ExecCtx::Get()->Flush();
744 }
745 
746 // Client's HTTP2 transport receives a RPC request, but it cannot start the RPC
747 // because of the max concurrent streams setting. Server then shuts its endpoint
748 // which should result in the RPC getting cancelled with kNotSentOnWire.
TEST_F(ZeroConcurrencyTest,TransportDestroyed)749 TEST_F(ZeroConcurrencyTest, TransportDestroyed) {
750   grpc_call* c =
751       grpc_channel_create_call(channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, cq_,
752                                grpc_slice_from_static_string("/foo"), nullptr,
753                                grpc_timeout_seconds_to_deadline(5), nullptr);
754   CHECK(c);
755   grpc_metadata_array initial_metadata_recv;
756   grpc_metadata_array trailing_metadata_recv;
757   grpc_metadata_array_init(&initial_metadata_recv);
758   grpc_metadata_array_init(&trailing_metadata_recv);
759   grpc_op* op;
760   grpc_op ops[6];
761   grpc_status_code status;
762   const char* error_string;
763   grpc_call_error error;
764   grpc_slice details;
765   // Send the request
766   memset(ops, 0, sizeof(ops));
767   op = ops;
768   op->op = GRPC_OP_SEND_INITIAL_METADATA;
769   op->data.send_initial_metadata.count = 0;
770   op->flags = 0;
771   op->reserved = nullptr;
772   op++;
773   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
774   op->flags = 0;
775   op->reserved = nullptr;
776   op++;
777   op->op = GRPC_OP_RECV_INITIAL_METADATA;
778   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
779   op->flags = 0;
780   op->reserved = nullptr;
781   op++;
782   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
783   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
784   op->data.recv_status_on_client.status = &status;
785   op->data.recv_status_on_client.status_details = &details;
786   op->data.recv_status_on_client.error_string = &error_string;
787   op->flags = 0;
788   op->reserved = nullptr;
789   op++;
790   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
791                                 nullptr);
792   CloseServerConnection();
793   CHECK_EQ(error, GRPC_CALL_OK);
794   cqv_->Expect(Tag(101), true);
795   cqv_->Verify();
796   // Verify status and metadata
797   EXPECT_EQ(status, GRPC_STATUS_UNAVAILABLE);
798   ASSERT_TRUE(TrailingMetadataRecordingFilter::trailing_metadata_available());
799   ASSERT_TRUE(
800       TrailingMetadataRecordingFilter::stream_network_state().has_value());
801   EXPECT_EQ(TrailingMetadataRecordingFilter::stream_network_state().value(),
802             GrpcStreamNetworkState::kNotSentOnWire);
803   grpc_slice_unref(details);
804   gpr_free(const_cast<char*>(error_string));
805   grpc_metadata_array_destroy(&initial_metadata_recv);
806   grpc_metadata_array_destroy(&trailing_metadata_recv);
807   grpc_call_unref(c);
808   ExecCtx::Get()->Flush();
809 }
810 
811 }  // namespace
812 }  // namespace grpc_core
813 
main(int argc,char ** argv)814 int main(int argc, char** argv) {
815   ::testing::InitGoogleTest(&argc, argv);
816   grpc::testing::TestEnvironment env(&argc, argv);
817   int result;
818   grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
819       [](grpc_core::CoreConfiguration::Builder* builder) {
820         grpc_core::BuildCoreConfiguration(builder);
821         builder->channel_init()->RegisterFilter(
822             GRPC_CLIENT_SUBCHANNEL,
823             &grpc_core::TrailingMetadataRecordingFilter::kFilterVtable);
824       },
825       [&] {
826         grpc_core::
827             TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
828                 true);
829         grpc_init();
830         {
831           grpc_core::ExecCtx exec_ctx;
832           result = RUN_ALL_TESTS();
833         }
834         grpc_shutdown();
835       });
836   return result;
837 }
838