• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/impl/connectivity_state.h>
23 #include <grpc/slice_buffer.h>
24 #include <grpc/status.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/time.h>
28 #include <inttypes.h>
29 #include <limits.h>
30 #include <string.h>
31 
32 #include <algorithm>
33 #include <atomic>
34 #include <cstddef>
35 #include <cstdint>
36 #include <limits>
37 #include <memory>
38 #include <new>
39 #include <string>
40 #include <type_traits>
41 #include <utility>
42 #include <vector>
43 
44 #include "absl/base/attributes.h"
45 #include "absl/container/flat_hash_map.h"
46 #include "absl/hash/hash.h"
47 #include "absl/log/check.h"
48 #include "absl/log/log.h"
49 #include "absl/meta/type_traits.h"
50 #include "absl/random/random.h"
51 #include "absl/status/status.h"
52 #include "absl/strings/cord.h"
53 #include "absl/strings/str_cat.h"
54 #include "absl/strings/str_format.h"
55 #include "absl/strings/string_view.h"
56 #include "absl/types/optional.h"
57 #include "absl/types/variant.h"
58 #include "src/core/config/config_vars.h"
59 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
60 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
61 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
62 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
63 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
64 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
65 #include "src/core/ext/transport/chttp2/transport/frame_security.h"
66 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
67 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
68 #include "src/core/ext/transport/chttp2/transport/internal.h"
69 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
70 #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
71 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
72 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
73 #include "src/core/ext/transport/chttp2/transport/stream_lists.h"
74 #include "src/core/ext/transport/chttp2/transport/varint.h"
75 #include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
76 #include "src/core/lib/channel/channel_args.h"
77 #include "src/core/lib/event_engine/extensions/tcp_trace.h"
78 #include "src/core/lib/event_engine/query_extensions.h"
79 #include "src/core/lib/experiments/experiments.h"
80 #include "src/core/lib/iomgr/combiner.h"
81 #include "src/core/lib/iomgr/endpoint.h"
82 #include "src/core/lib/iomgr/error.h"
83 #include "src/core/lib/iomgr/ev_posix.h"
84 #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
85 #include "src/core/lib/iomgr/exec_ctx.h"
86 #include "src/core/lib/iomgr/iomgr_fwd.h"
87 #include "src/core/lib/iomgr/port.h"
88 #include "src/core/lib/promise/poll.h"
89 #include "src/core/lib/resource_quota/arena.h"
90 #include "src/core/lib/resource_quota/memory_quota.h"
91 #include "src/core/lib/resource_quota/resource_quota.h"
92 #include "src/core/lib/slice/slice.h"
93 #include "src/core/lib/slice/slice_buffer.h"
94 #include "src/core/lib/slice/slice_internal.h"
95 #include "src/core/lib/transport/bdp_estimator.h"
96 #include "src/core/lib/transport/connectivity_state.h"
97 #include "src/core/lib/transport/error_utils.h"
98 #include "src/core/lib/transport/http2_errors.h"
99 #include "src/core/lib/transport/metadata_batch.h"
100 #include "src/core/lib/transport/metadata_info.h"
101 #include "src/core/lib/transport/status_conversion.h"
102 #include "src/core/lib/transport/transport.h"
103 #include "src/core/lib/transport/transport_framing_endpoint_extension.h"
104 #include "src/core/telemetry/call_tracer.h"
105 #include "src/core/telemetry/stats.h"
106 #include "src/core/telemetry/stats_data.h"
107 #include "src/core/telemetry/tcp_tracer.h"
108 #include "src/core/util/bitset.h"
109 #include "src/core/util/crash.h"
110 #include "src/core/util/debug_location.h"
111 #include "src/core/util/http_client/parser.h"
112 #include "src/core/util/ref_counted.h"
113 #include "src/core/util/status_helper.h"
114 #include "src/core/util/string.h"
115 #include "src/core/util/time.h"
116 #include "src/core/util/useful.h"
117 
118 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
119 #define MAX_WINDOW 0x7fffffffu
120 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
121 
122 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
123 
124 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
125 
126 #define GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT \
127   "grpc.http2.ping_on_rst_stream_percent"
128 
129 static grpc_core::Duration g_default_client_keepalive_time =
130     grpc_core::Duration::Infinity();
131 static grpc_core::Duration g_default_client_keepalive_timeout =
132     grpc_core::Duration::Seconds(20);
133 static grpc_core::Duration g_default_server_keepalive_time =
134     grpc_core::Duration::Hours(2);
135 static grpc_core::Duration g_default_server_keepalive_timeout =
136     grpc_core::Duration::Seconds(20);
137 static bool g_default_client_keepalive_permit_without_calls = false;
138 static bool g_default_server_keepalive_permit_without_calls = false;
139 
140 // EXPERIMENTAL: control tarpitting in chttp2
141 #define GRPC_ARG_HTTP_ALLOW_TARPIT "grpc.http.tarpit"
142 #define GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS "grpc.http.tarpit_min_duration_ms"
143 #define GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS "grpc.http.tarpit_max_duration_ms"
144 
145 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
146 
147 // forward declarations of various callbacks that we'll build closures around
148 static void write_action_begin_locked(
149     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
150 static void write_action(grpc_chttp2_transport* t);
151 static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
152                              grpc_error_handle error);
153 static void write_action_end_locked(
154     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
155 
156 static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
157                         grpc_error_handle error);
158 static void read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
159                                grpc_error_handle error);
160 static void continue_read_action_locked(
161     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
162 
163 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
164                            grpc_error_handle error, bool tarpit);
165 
166 // Start new streams that have been created if we can
167 static void maybe_start_some_streams(grpc_chttp2_transport* t);
168 
169 static void connectivity_state_set(grpc_chttp2_transport* t,
170                                    grpc_connectivity_state state,
171                                    const absl::Status& status,
172                                    const char* reason);
173 
174 static void benign_reclaimer_locked(
175     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
176 static void destructive_reclaimer_locked(
177     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
178 
179 static void post_benign_reclaimer(grpc_chttp2_transport* t);
180 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
181 
182 static void close_transport_locked(grpc_chttp2_transport* t,
183                                    grpc_error_handle error);
184 static void end_all_the_calls(grpc_chttp2_transport* t,
185                               grpc_error_handle error);
186 
187 static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
188                            grpc_error_handle error);
189 static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
190                             grpc_error_handle error);
191 static void start_bdp_ping_locked(
192     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
193 static void finish_bdp_ping_locked(
194     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
195 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
196 static void next_bdp_ping_timer_expired_locked(
197     grpc_core::RefCountedPtr<grpc_chttp2_transport> tp,
198     GRPC_UNUSED grpc_error_handle error);
199 
200 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
201 static void send_ping_locked(grpc_chttp2_transport* t,
202                              grpc_closure* on_initiate, grpc_closure* on_ack);
203 static void retry_initiate_ping_locked(
204     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
205     GRPC_UNUSED grpc_error_handle error);
206 
207 // keepalive-relevant functions
208 static void init_keepalive_ping(
209     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
210 static void init_keepalive_ping_locked(
211     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
212     GRPC_UNUSED grpc_error_handle error);
213 static void finish_keepalive_ping(
214     grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
215 static void finish_keepalive_ping_locked(
216     grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
217 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
218 
219 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
220                         bool immediate_disconnect_hint);
221 
222 // Timeout for getting an ack back on settings changes
223 #define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout"
224 
225 namespace {
226 
227 using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
228 
CallTracerIfSampled(grpc_chttp2_stream * s)229 grpc_core::CallTracerAnnotationInterface* CallTracerIfSampled(
230     grpc_chttp2_stream* s) {
231   if (!grpc_core::IsTraceRecordCallopsEnabled()) {
232     return nullptr;
233   }
234   auto* call_tracer =
235       s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
236   if (call_tracer == nullptr || !call_tracer->IsSampled()) {
237     return nullptr;
238   }
239   return call_tracer;
240 }
241 
TcpTracerIfSampled(grpc_chttp2_stream * s)242 std::shared_ptr<grpc_core::TcpTracerInterface> TcpTracerIfSampled(
243     grpc_chttp2_stream* s) {
244   if (!grpc_core::IsTraceRecordCallopsEnabled()) {
245     return nullptr;
246   }
247   auto* call_attempt_tracer =
248       s->arena->GetContext<grpc_core::CallTracerInterface>();
249   if (call_attempt_tracer == nullptr || !call_attempt_tracer->IsSampled()) {
250     return nullptr;
251   }
252   return call_attempt_tracer->StartNewTcpTrace();
253 }
254 
255 grpc_core::WriteTimestampsCallback g_write_timestamps_callback = nullptr;
256 grpc_core::CopyContextFn g_get_copied_context_fn = nullptr;
257 }  // namespace
258 
259 namespace grpc_core {
260 
261 namespace {
262 
263 // Initialize a grpc_closure \a c to call \a Fn with \a t and \a error. Holds
264 // the passed in reference to \a t until it's moved into Fn.
265 template <void (*Fn)(RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle)>
InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,grpc_closure * c)266 grpc_closure* InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,
267                                    grpc_closure* c) {
268   GRPC_CLOSURE_INIT(
269       c,
270       [](void* tp, grpc_error_handle error) {
271         Fn(RefCountedPtr<grpc_chttp2_transport>(
272                static_cast<grpc_chttp2_transport*>(tp)),
273            std::move(error));
274       },
275       t.release(), nullptr);
276   return c;
277 }
278 
279 TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
280 TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
281     nullptr;
282 bool test_only_disable_transient_failure_state_notification = false;
283 
284 }  // namespace
285 
TestOnlySetGlobalHttp2TransportInitCallback(TestOnlyGlobalHttp2TransportInitCallback callback)286 void TestOnlySetGlobalHttp2TransportInitCallback(
287     TestOnlyGlobalHttp2TransportInitCallback callback) {
288   test_only_init_callback = callback;
289 }
290 
TestOnlySetGlobalHttp2TransportDestructCallback(TestOnlyGlobalHttp2TransportDestructCallback callback)291 void TestOnlySetGlobalHttp2TransportDestructCallback(
292     TestOnlyGlobalHttp2TransportDestructCallback callback) {
293   test_only_destruct_callback = callback;
294 }
295 
TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(bool disable)296 void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
297     bool disable) {
298   test_only_disable_transient_failure_state_notification = disable;
299 }
300 
GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn)301 void GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn) {
302   g_write_timestamps_callback = fn;
303 }
304 
GrpcHttp2SetCopyContextFn(CopyContextFn fn)305 void GrpcHttp2SetCopyContextFn(CopyContextFn fn) {
306   g_get_copied_context_fn = fn;
307 }
308 
GrpcHttp2GetWriteTimestampsCallback()309 WriteTimestampsCallback GrpcHttp2GetWriteTimestampsCallback() {
310   return g_write_timestamps_callback;
311 }
312 
GrpcHttp2GetCopyContextFn()313 CopyContextFn GrpcHttp2GetCopyContextFn() { return g_get_copied_context_fn; }
314 
315 // For each entry in the passed ContextList, it executes the function set using
316 // GrpcHttp2SetWriteTimestampsCallback method with each context in the list
317 // and \a ts. It also deletes/frees up the passed ContextList after this
318 // operation.
ForEachContextListEntryExecute(void * arg,Timestamps * ts,grpc_error_handle error)319 void ForEachContextListEntryExecute(void* arg, Timestamps* ts,
320                                     grpc_error_handle error) {
321   ContextList* context_list = reinterpret_cast<ContextList*>(arg);
322   if (!context_list) {
323     return;
324   }
325   for (auto it = context_list->begin(); it != context_list->end(); it++) {
326     ContextListEntry& entry = (*it);
327     if (ts) {
328       ts->byte_offset = static_cast<uint32_t>(entry.ByteOffsetInStream());
329     }
330     g_write_timestamps_callback(entry.TraceContext(), ts, error);
331   }
332   delete context_list;
333 }
334 
HttpAnnotation(Type type,gpr_timespec time)335 HttpAnnotation::HttpAnnotation(Type type, gpr_timespec time)
336     : CallTracerAnnotationInterface::Annotation(
337           CallTracerAnnotationInterface::AnnotationType::kHttpTransport),
338       type_(type),
339       time_(time) {}
340 
ToString() const341 std::string HttpAnnotation::ToString() const {
342   std::string s = "HttpAnnotation type: ";
343   switch (type_) {
344     case Type::kStart:
345       absl::StrAppend(&s, "Start");
346       break;
347     case Type::kHeadWritten:
348       absl::StrAppend(&s, "HeadWritten");
349       break;
350     case Type::kEnd:
351       absl::StrAppend(&s, "End");
352       break;
353     default:
354       absl::StrAppend(&s, "Unknown");
355   }
356   absl::StrAppend(&s, " time: ", gpr_format_timespec(time_));
357   if (transport_stats_.has_value()) {
358     absl::StrAppend(&s, " transport:[", transport_stats_->ToString(), "]");
359   }
360   if (stream_stats_.has_value()) {
361     absl::StrAppend(&s, " stream:[", stream_stats_->ToString(), "]");
362   }
363   return s;
364 }
365 
366 }  // namespace grpc_core
367 
368 //
369 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
370 //
371 
~grpc_chttp2_transport()372 grpc_chttp2_transport::~grpc_chttp2_transport() {
373   size_t i;
374 
375   cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
376 
377   event_engine.reset();
378 
379   if (channelz_socket != nullptr) {
380     channelz_socket.reset();
381   }
382 
383   grpc_slice_buffer_destroy(&qbuf);
384 
385   grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
386   // ContextList::Execute follows semantics of a callback function and does not
387   // take a ref on error
388   if (context_list != nullptr) {
389     grpc_core::ForEachContextListEntryExecute(context_list, nullptr, error);
390   }
391   context_list = nullptr;
392 
393   grpc_slice_buffer_destroy(&read_buffer);
394   grpc_chttp2_goaway_parser_destroy(&goaway_parser);
395 
396   for (i = 0; i < STREAM_LIST_COUNT; i++) {
397     CHECK_EQ(lists[i].head, nullptr);
398     CHECK_EQ(lists[i].tail, nullptr);
399   }
400 
401   CHECK(stream_map.empty());
402   GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
403 
404   while (write_cb_pool) {
405     grpc_chttp2_write_cb* next = write_cb_pool->next;
406     gpr_free(write_cb_pool);
407     write_cb_pool = next;
408   }
409 
410   gpr_free(ping_acks);
411   if (grpc_core::test_only_destruct_callback != nullptr) {
412     grpc_core::test_only_destruct_callback();
413   }
414 }
415 
read_channel_args(grpc_chttp2_transport * t,const grpc_core::ChannelArgs & channel_args,const bool is_client)416 static void read_channel_args(grpc_chttp2_transport* t,
417                               const grpc_core::ChannelArgs& channel_args,
418                               const bool is_client) {
419   const int initial_sequence_number =
420       channel_args.GetInt(GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER).value_or(-1);
421   if (initial_sequence_number > 0) {
422     if ((t->next_stream_id & 1) != (initial_sequence_number & 1)) {
423       LOG(ERROR) << GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
424                  << ": low bit must be " << (t->next_stream_id & 1) << " on "
425                  << (is_client ? "client" : "server");
426     } else {
427       t->next_stream_id = static_cast<uint32_t>(initial_sequence_number);
428     }
429   }
430 
431   const int max_hpack_table_size =
432       channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER).value_or(-1);
433   if (max_hpack_table_size >= 0) {
434     t->hpack_compressor.SetMaxUsableSize(max_hpack_table_size);
435   }
436 
437   t->write_buffer_size =
438       std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)
439                       .value_or(grpc_core::chttp2::kDefaultWindow));
440   t->keepalive_time =
441       std::max(grpc_core::Duration::Milliseconds(1),
442                channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
443                    .value_or(t->is_client ? g_default_client_keepalive_time
444                                           : g_default_server_keepalive_time));
445   t->keepalive_timeout = std::max(
446       grpc_core::Duration::Zero(),
447       channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
448           .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
449                         ? grpc_core::Duration::Infinity()
450                         : (t->is_client ? g_default_client_keepalive_timeout
451                                         : g_default_server_keepalive_timeout)));
452   t->ping_timeout = std::max(
453       grpc_core::Duration::Zero(),
454       channel_args.GetDurationFromIntMillis(GRPC_ARG_PING_TIMEOUT_MS)
455           .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
456                         ? grpc_core::Duration::Infinity()
457                         : grpc_core::Duration::Minutes(1)));
458   if (t->is_client) {
459     t->keepalive_permit_without_calls =
460         channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
461             .value_or(g_default_client_keepalive_permit_without_calls);
462   } else {
463     t->keepalive_permit_without_calls =
464         channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
465             .value_or(g_default_server_keepalive_permit_without_calls);
466   }
467 
468   t->settings_timeout =
469       channel_args.GetDurationFromIntMillis(GRPC_ARG_SETTINGS_TIMEOUT)
470           .value_or(std::max(t->keepalive_timeout * 2,
471                              grpc_core::Duration::Minutes(1)));
472 
473   // Only send the preferred rx frame size http2 setting if we are instructed
474   // to auto size the buffers allocated at tcp level and we also can adjust
475   // sending frame size.
476   t->enable_preferred_rx_crypto_frame_advertisement =
477       channel_args
478           .GetBool(GRPC_ARG_EXPERIMENTAL_HTTP2_PREFERRED_CRYPTO_FRAME_SIZE)
479           .value_or(false);
480 
481   const auto max_requests_per_read =
482       channel_args.GetInt("grpc.http2.max_requests_per_read");
483   if (max_requests_per_read.has_value()) {
484     t->max_requests_per_read =
485         grpc_core::Clamp(*max_requests_per_read, 1, 10000);
486   } else {
487     t->max_requests_per_read = 32;
488   }
489 
490   if (channel_args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
491           .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
492     t->channelz_socket =
493         grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
494             std::string(grpc_endpoint_get_local_address(t->ep.get())),
495             std::string(t->peer_string.as_string_view()),
496             absl::StrCat(t->GetTransportName(), " ",
497                          t->peer_string.as_string_view()),
498             channel_args
499                 .GetObjectRef<grpc_core::channelz::SocketNode::Security>());
500   }
501 
502   t->ack_pings = channel_args.GetBool("grpc.http2.ack_pings").value_or(true);
503 
504   t->allow_tarpit =
505       channel_args.GetBool(GRPC_ARG_HTTP_ALLOW_TARPIT).value_or(true);
506   t->min_tarpit_duration_ms =
507       channel_args
508           .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS)
509           .value_or(grpc_core::Duration::Milliseconds(100))
510           .millis();
511   t->max_tarpit_duration_ms =
512       channel_args
513           .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS)
514           .value_or(grpc_core::Duration::Seconds(1))
515           .millis();
516   t->max_header_list_size_soft_limit =
517       grpc_core::GetSoftLimitFromChannelArgs(channel_args);
518 
519   int value;
520   if (!is_client) {
521     value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1);
522     if (value >= 0) {
523       t->settings.mutable_local().SetMaxConcurrentStreams(value);
524     }
525   } else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
526     VLOG(2) << GRPC_ARG_MAX_CONCURRENT_STREAMS
527             << " is not available on clients";
528   }
529   value =
530       channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER).value_or(-1);
531   if (value >= 0) {
532     t->settings.mutable_local().SetHeaderTableSize(value);
533   }
534   t->settings.mutable_local().SetMaxHeaderListSize(
535       grpc_core::GetHardLimitFromChannelArgs(channel_args));
536   value = channel_args.GetInt(GRPC_ARG_HTTP2_MAX_FRAME_SIZE).value_or(-1);
537   if (value >= 0) {
538     t->settings.mutable_local().SetMaxFrameSize(value);
539   }
540   value =
541       channel_args.GetInt(GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES).value_or(-1);
542   if (value >= 0) {
543     t->settings.mutable_local().SetInitialWindowSize(value);
544   }
545   value = channel_args.GetInt(GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY).value_or(-1);
546   if (value >= 0) {
547     t->settings.mutable_local().SetAllowTrueBinaryMetadata(value != 0);
548   }
549 
550   if (t->enable_preferred_rx_crypto_frame_advertisement) {
551     t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(INT_MAX);
552   }
553 
554   t->settings.mutable_local().SetAllowSecurityFrame(
555       channel_args.GetBool(GRPC_ARG_SECURITY_FRAME_ALLOWED).value_or(false));
556 
557   t->ping_on_rst_stream_percent = grpc_core::Clamp(
558       channel_args.GetInt(GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT)
559           .value_or(1),
560       0, 100);
561 
562   t->max_concurrent_streams_overload_protection =
563       channel_args.GetBool(GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION)
564           .value_or(true);
565 
566   t->max_concurrent_streams_reject_on_client =
567       channel_args.GetBool(GRPC_ARG_MAX_CONCURRENT_STREAMS_REJECT_ON_CLIENT)
568           .value_or(false);
569 }
570 
init_keepalive_pings_if_enabled_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)571 static void init_keepalive_pings_if_enabled_locked(
572     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
573     GRPC_UNUSED grpc_error_handle error) {
574   DCHECK(error.ok());
575   if (t->keepalive_time != grpc_core::Duration::Infinity()) {
576     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
577     t->keepalive_ping_timer_handle =
578         t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
579           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
580           grpc_core::ExecCtx exec_ctx;
581           init_keepalive_ping(std::move(t));
582         });
583   } else {
584     // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
585     // inflight keepalive timers
586     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
587   }
588 }
589 
590 // TODO(alishananda): add unit testing as part of chttp2 promise conversion work
WriteSecurityFrame(grpc_core::SliceBuffer * data)591 void grpc_chttp2_transport::WriteSecurityFrame(grpc_core::SliceBuffer* data) {
592   grpc_core::ExecCtx exec_ctx;
593   combiner->Run(grpc_core::NewClosure(
594                     [transport = Ref(), data](grpc_error_handle) mutable {
595                       transport->WriteSecurityFrameLocked(data);
596                     }),
597                 absl::OkStatus());
598 }
599 
WriteSecurityFrameLocked(grpc_core::SliceBuffer * data)600 void grpc_chttp2_transport::WriteSecurityFrameLocked(
601     grpc_core::SliceBuffer* data) {
602   if (data == nullptr) {
603     return;
604   }
605   if (!settings.peer().allow_security_frame()) {
606     close_transport_locked(
607         this,
608         grpc_error_set_int(
609             GRPC_ERROR_CREATE("Unexpected SECURITY frame scheduled for write"),
610             grpc_core::StatusIntProperty::kRpcStatus,
611             GRPC_STATUS_FAILED_PRECONDITION));
612   }
613   grpc_core::SliceBuffer security_frame;
614   grpc_chttp2_security_frame_create(data->c_slice_buffer(), data->Length(),
615                                     security_frame.c_slice_buffer());
616   grpc_slice_buffer_move_into(security_frame.c_slice_buffer(), &qbuf);
617   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
618 }
619 
620 using grpc_event_engine::experimental::QueryExtension;
621 using grpc_event_engine::experimental::TcpTraceExtension;
622 
grpc_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_core::OrphanablePtr<grpc_endpoint> endpoint,const bool is_client)623 grpc_chttp2_transport::grpc_chttp2_transport(
624     const grpc_core::ChannelArgs& channel_args,
625     grpc_core::OrphanablePtr<grpc_endpoint> endpoint, const bool is_client)
626     : ep(std::move(endpoint)),
627       peer_string(
628           grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep.get()))),
629       memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()
630                        ->memory_quota()
631                        ->CreateMemoryOwner()),
632       self_reservation(
633           memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))),
634       event_engine(
635           channel_args
636               .GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
637       combiner(grpc_combiner_create(event_engine)),
638       state_tracker(is_client ? "client_transport" : "server_transport",
639                     GRPC_CHANNEL_READY),
640       next_stream_id(is_client ? 1 : 2),
641       ping_abuse_policy(channel_args),
642       ping_rate_policy(channel_args, is_client),
643       flow_control(
644           peer_string.as_string_view(),
645           channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
646           &memory_owner),
647       deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
648       is_client(is_client) {
649   context_list = new grpc_core::ContextList();
650 
651   if (channel_args.GetBool(GRPC_ARG_TCP_TRACING_ENABLED).value_or(false) &&
652       grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
653           ep.get())) {
654     auto epte = QueryExtension<TcpTraceExtension>(
655         grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint(
656             ep.get()));
657     if (epte != nullptr) {
658       epte->InitializeAndReturnTcpTracer();
659     }
660   }
661 
662   if (channel_args.GetBool(GRPC_ARG_SECURITY_FRAME_ALLOWED).value_or(false)) {
663     transport_framing_endpoint_extension = QueryExtension<
664         grpc_core::TransportFramingEndpointExtension>(
665         grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint(
666             ep.get()));
667     if (transport_framing_endpoint_extension != nullptr) {
668       transport_framing_endpoint_extension->SetSendFrameCallback(
669           [this](grpc_core::SliceBuffer* data) { WriteSecurityFrame(data); });
670     }
671   }
672 
673   CHECK(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
674         GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
675 
676   grpc_slice_buffer_init(&read_buffer);
677   if (is_client) {
678     grpc_slice_buffer_add(
679         outbuf.c_slice_buffer(),
680         grpc_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
681   }
682   grpc_slice_buffer_init(&qbuf);
683   grpc_chttp2_goaway_parser_init(&goaway_parser);
684 
685   // configure http2 the way we like it
686   if (is_client) {
687     settings.mutable_local().SetEnablePush(false);
688     settings.mutable_local().SetMaxConcurrentStreams(0);
689   }
690   settings.mutable_local().SetMaxHeaderListSize(DEFAULT_MAX_HEADER_LIST_SIZE);
691   settings.mutable_local().SetAllowTrueBinaryMetadata(true);
692 
693   read_channel_args(this, channel_args, is_client);
694 
695   // Initially allow *UP TO* MAX_CONCURRENT_STREAMS incoming before we start
696   // blanket cancelling them.
697   num_incoming_streams_before_settings_ack =
698       settings.local().max_concurrent_streams();
699 
700   grpc_core::ExecCtx exec_ctx;
701   combiner->Run(
702       grpc_core::InitTransportClosure<init_keepalive_pings_if_enabled_locked>(
703           Ref(), &init_keepalive_ping_locked),
704       absl::OkStatus());
705 
706   if (flow_control.bdp_probe()) {
707     bdp_ping_blocked = true;
708     grpc_chttp2_act_on_flowctl_action(flow_control.PeriodicUpdate(), this,
709                                       nullptr);
710   }
711 
712   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
713   post_benign_reclaimer(this);
714   if (grpc_core::test_only_init_callback != nullptr) {
715     grpc_core::test_only_init_callback();
716   }
717 
718 #ifdef GRPC_POSIX_SOCKET_TCP
719   closure_barrier_may_cover_write =
720       grpc_event_engine_run_in_background() &&
721               grpc_core::IsScheduleCancellationOverWriteEnabled()
722           ? 0
723           : CLOSURE_BARRIER_MAY_COVER_WRITE;
724 #endif
725 }
726 
destroy_transport_locked(void * tp,grpc_error_handle)727 static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
728   grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
729       static_cast<grpc_chttp2_transport*>(tp));
730   t->destroying = 1;
731   close_transport_locked(
732       t.get(),
733       grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
734                          grpc_core::StatusIntProperty::kOccurredDuringWrite,
735                          t->write_state));
736   t->memory_owner.Reset();
737 }
738 
Orphan()739 void grpc_chttp2_transport::Orphan() {
740   combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, this, nullptr),
741                 absl::OkStatus());
742 }
743 
close_transport_locked(grpc_chttp2_transport * t,grpc_error_handle error)744 static void close_transport_locked(grpc_chttp2_transport* t,
745                                    grpc_error_handle error) {
746   end_all_the_calls(t, error);
747   cancel_pings(t, error);
748   if (t->closed_with_error.ok()) {
749     if (!grpc_error_has_clear_grpc_status(error)) {
750       error =
751           grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
752                              GRPC_STATUS_UNAVAILABLE);
753     }
754     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
755       if (t->close_transport_on_writes_finished.ok()) {
756         t->close_transport_on_writes_finished =
757             GRPC_ERROR_CREATE("Delayed close due to in-progress write");
758       }
759       t->close_transport_on_writes_finished =
760           grpc_error_add_child(t->close_transport_on_writes_finished, error);
761       return;
762     }
763     CHECK(!error.ok());
764     t->closed_with_error = error;
765     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
766                            "close_transport");
767     if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
768       t->event_engine->Cancel(std::exchange(t->keepalive_ping_timeout_handle,
769                                             TaskHandle::kInvalid));
770     }
771     if (t->settings_ack_watchdog != TaskHandle::kInvalid) {
772       t->event_engine->Cancel(
773           std::exchange(t->settings_ack_watchdog, TaskHandle::kInvalid));
774     }
775     if (t->delayed_ping_timer_handle != TaskHandle::kInvalid &&
776         t->event_engine->Cancel(t->delayed_ping_timer_handle)) {
777       t->delayed_ping_timer_handle = TaskHandle::kInvalid;
778     }
779     if (t->next_bdp_ping_timer_handle != TaskHandle::kInvalid &&
780         t->event_engine->Cancel(t->next_bdp_ping_timer_handle)) {
781       t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
782     }
783     switch (t->keepalive_state) {
784       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
785         if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
786             t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
787           t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
788         }
789         break;
790       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
791         if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
792             t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
793           t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
794         }
795         break;
796       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
797       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
798         // keepalive timers are not set in these two states
799         break;
800     }
801 
802     // flush writable stream list to avoid dangling references
803     grpc_chttp2_stream* s;
804     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
805       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
806     }
807     CHECK(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
808     if (t->interested_parties_until_recv_settings != nullptr) {
809       grpc_endpoint_delete_from_pollset_set(
810           t->ep.get(), t->interested_parties_until_recv_settings);
811       t->interested_parties_until_recv_settings = nullptr;
812     }
813     grpc_core::MutexLock lock(&t->ep_destroy_mu);
814     t->ep.reset();
815   }
816   if (t->notify_on_receive_settings != nullptr) {
817     if (t->interested_parties_until_recv_settings != nullptr) {
818       grpc_endpoint_delete_from_pollset_set(
819           t->ep.get(), t->interested_parties_until_recv_settings);
820       t->interested_parties_until_recv_settings = nullptr;
821     }
822     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
823                             error);
824     t->notify_on_receive_settings = nullptr;
825   }
826   if (t->notify_on_close != nullptr) {
827     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, error);
828     t->notify_on_close = nullptr;
829   }
830 }
831 
832 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)833 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
834   grpc_stream_ref(s->refcount, reason);
835 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)836 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
837   grpc_stream_unref(s->refcount, reason);
838 }
839 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)840 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
841   grpc_stream_ref(s->refcount);
842 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)843 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
844   grpc_stream_unref(s->refcount);
845 }
846 #endif
847 
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)848 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
849                                        grpc_stream_refcount* refcount,
850                                        const void* server_data,
851                                        grpc_core::Arena* arena)
852     : t(t->Ref()),
853       refcount([refcount]() {
854 // We reserve one 'active stream' that's dropped when the stream is
855 //   read-closed. The others are for Chttp2IncomingByteStreams that are
856 //   actively reading
857 // We do this here to avoid cache misses.
858 #ifndef NDEBUG
859         grpc_stream_ref(refcount, "chttp2");
860 #else
861         grpc_stream_ref(refcount);
862 #endif
863         return refcount;
864       }()),
865       arena(arena),
866       flow_control(&t->flow_control),
867       call_tracer_wrapper(this) {
868   t->streams_allocated.fetch_add(1, std::memory_order_relaxed);
869   if (server_data) {
870     id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
871     GRPC_TRACE_VLOG(http, 2)
872         << "HTTP:" << t << "/" << this << " creating accept stream " << id
873         << " [from " << server_data << "]";
874     *t->accepting_stream = this;
875     t->stream_map.emplace(id, this);
876     post_destructive_reclaimer(t);
877   }
878 
879   grpc_slice_buffer_init(&frame_storage);
880   grpc_slice_buffer_init(&flow_controlled_buffer);
881 }
882 
~grpc_chttp2_stream()883 grpc_chttp2_stream::~grpc_chttp2_stream() {
884   t->streams_allocated.fetch_sub(1, std::memory_order_relaxed);
885   grpc_chttp2_list_remove_stalled_by_stream(t.get(), this);
886   grpc_chttp2_list_remove_stalled_by_transport(t.get(), this);
887 
888   if (t->channelz_socket != nullptr) {
889     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
890       t->channelz_socket->RecordStreamSucceeded();
891     } else {
892       t->channelz_socket->RecordStreamFailed();
893     }
894   }
895 
896   CHECK((write_closed && read_closed) || id == 0);
897   if (id != 0) {
898     CHECK_EQ(t->stream_map.count(id), 0u);
899   }
900 
901   grpc_slice_buffer_destroy(&frame_storage);
902 
903   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
904     if (GPR_UNLIKELY(included.is_set(i))) {
905       grpc_core::Crash(absl::StrFormat("%s stream %d still included in list %d",
906                                        t->is_client ? "client" : "server", id,
907                                        i));
908     }
909   }
910 
911   CHECK_EQ(send_initial_metadata_finished, nullptr);
912   CHECK_EQ(send_trailing_metadata_finished, nullptr);
913   CHECK_EQ(recv_initial_metadata_ready, nullptr);
914   CHECK_EQ(recv_message_ready, nullptr);
915   CHECK_EQ(recv_trailing_metadata_finished, nullptr);
916   grpc_slice_buffer_destroy(&flow_controlled_buffer);
917   grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus());
918 }
919 
InitStream(grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)920 void grpc_chttp2_transport::InitStream(grpc_stream* gs,
921                                        grpc_stream_refcount* refcount,
922                                        const void* server_data,
923                                        grpc_core::Arena* arena) {
924   new (gs) grpc_chttp2_stream(this, refcount, server_data, arena);
925 }
926 
destroy_stream_locked(void * sp,grpc_error_handle)927 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
928   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
929   s->~grpc_chttp2_stream();
930 }
931 
DestroyStream(grpc_stream * gs,grpc_closure * then_schedule_closure)932 void grpc_chttp2_transport::DestroyStream(grpc_stream* gs,
933                                           grpc_closure* then_schedule_closure) {
934   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
935 
936   s->destroy_stream_arg = then_schedule_closure;
937   combiner->Run(
938       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
939       absl::OkStatus());
940 }
941 
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)942 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
943                                                       uint32_t id) {
944   if (t->accept_stream_cb == nullptr) {
945     return nullptr;
946   }
947   grpc_chttp2_stream* accepting = nullptr;
948   CHECK_EQ(t->accepting_stream, nullptr);
949   t->accepting_stream = &accepting;
950   t->accept_stream_cb(t->accept_stream_cb_user_data, t,
951                       reinterpret_cast<void*>(id));
952   t->accepting_stream = nullptr;
953   return accepting;
954 }
955 
956 //
957 // OUTPUT PROCESSING
958 //
959 
get_write_state_name(grpc_chttp2_write_state st)960 static const char* get_write_state_name(grpc_chttp2_write_state st) {
961   switch (st) {
962     case GRPC_CHTTP2_WRITE_STATE_IDLE:
963       return "IDLE";
964     case GRPC_CHTTP2_WRITE_STATE_WRITING:
965       return "WRITING";
966     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
967       return "WRITING+MORE";
968   }
969   GPR_UNREACHABLE_CODE(return "UNKNOWN");
970 }
971 
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)972 static void set_write_state(grpc_chttp2_transport* t,
973                             grpc_chttp2_write_state st, const char* reason) {
974   GRPC_TRACE_LOG(http, INFO)
975       << "W:" << t << " " << (t->is_client ? "CLIENT" : "SERVER") << " ["
976       << t->peer_string.as_string_view() << "] state "
977       << get_write_state_name(t->write_state) << " -> "
978       << get_write_state_name(st) << " [" << reason << "]";
979   t->write_state = st;
980   // If the state is being reset back to idle, it means a write was just
981   // finished. Make sure all the run_after_write closures are scheduled.
982   //
983   // This is also our chance to close the transport if the transport was marked
984   // to be closed after all writes finish (for example, if we received a go-away
985   // from peer while we had some pending writes)
986   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
987     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
988     if (!t->close_transport_on_writes_finished.ok()) {
989       grpc_error_handle err = t->close_transport_on_writes_finished;
990       t->close_transport_on_writes_finished = absl::OkStatus();
991       close_transport_locked(t, err);
992     }
993   }
994 }
995 
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)996 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
997                                 grpc_chttp2_initiate_write_reason reason) {
998   switch (t->write_state) {
999     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1000       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
1001                       grpc_chttp2_initiate_write_reason_string(reason));
1002       // Note that the 'write_action_begin_locked' closure is being scheduled
1003       // on the 'finally_scheduler' of t->combiner. This means that
1004       // 'write_action_begin_locked' is called only *after* all the other
1005       // closures (some of which are potentially initiating more writes on the
1006       // transport) are executed on the t->combiner.
1007       //
1008       // The reason for scheduling on finally_scheduler is to make sure we batch
1009       // as many writes as possible. 'write_action_begin_locked' is the function
1010       // that gathers all the relevant bytes (which are at various places in the
1011       // grpc_chttp2_transport structure) and append them to 'outbuf' field in
1012       // grpc_chttp2_transport thereby batching what would have been potentially
1013       // multiple write operations.
1014       //
1015       // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
1016       // It does not call the endpoint to write the bytes. That is done by the
1017       // 'write_action' (which is scheduled by 'write_action_begin_locked')
1018       t->combiner->FinallyRun(
1019           grpc_core::InitTransportClosure<write_action_begin_locked>(
1020               t->Ref(), &t->write_action_begin_locked),
1021           absl::OkStatus());
1022       break;
1023     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1024       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
1025                       grpc_chttp2_initiate_write_reason_string(reason));
1026       break;
1027     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1028       break;
1029   }
1030 }
1031 
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1032 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
1033                                       grpc_chttp2_stream* s) {
1034   if (t->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t, s)) {
1035     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
1036   }
1037 }
1038 
// Returns the trace description used when a write begins; `partial` selects
// the wording for a partial write.
static const char* begin_writing_desc(bool partial) {
  return partial ? "begin partial write in background"
                 : "begin write in current thread";
}
1046 
write_action_begin_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle)1047 static void write_action_begin_locked(
1048     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1049     grpc_error_handle /*error_ignored*/) {
1050   GRPC_LATENT_SEE_INNER_SCOPE("write_action_begin_locked");
1051   CHECK(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
1052   grpc_chttp2_begin_write_result r;
1053   if (!t->closed_with_error.ok()) {
1054     r.writing = false;
1055   } else {
1056     r = grpc_chttp2_begin_write(t.get());
1057   }
1058   if (r.writing) {
1059     set_write_state(t.get(),
1060                     r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1061                               : GRPC_CHTTP2_WRITE_STATE_WRITING,
1062                     begin_writing_desc(r.partial));
1063     write_action(t.get());
1064     if (t->reading_paused_on_pending_induced_frames) {
1065       CHECK_EQ(t->num_pending_induced_frames, 0u);
1066       // We had paused reading, because we had many induced frames (SETTINGS
1067       // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
1068       // been able to flush qbuf, we can resume reading.
1069       GRPC_TRACE_LOG(http, INFO)
1070           << "transport " << t.get()
1071           << " : Resuming reading after being paused due to too many unwritten "
1072              "SETTINGS ACK, PINGS ACK and RST_STREAM frames";
1073       t->reading_paused_on_pending_induced_frames = false;
1074       continue_read_action_locked(std::move(t));
1075     }
1076   } else {
1077     set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE,
1078                     "begin writing nothing");
1079   }
1080 }
1081 
write_action(grpc_chttp2_transport * t)1082 static void write_action(grpc_chttp2_transport* t) {
1083   void* cl = t->context_list;
1084   if (!t->context_list->empty()) {
1085     // Transfer the ownership of the context list to the endpoint and create and
1086     // associate a new context list with the transport.
1087     // The old context list is stored in the cl local variable which is passed
1088     // to the endpoint. Its upto the endpoint to manage its lifetime.
1089     t->context_list = new grpc_core::ContextList();
1090   } else {
1091     // t->cl is Empty. There is nothing to trace in this endpoint_write. set cl
1092     // to nullptr.
1093     cl = nullptr;
1094   }
1095   // Choose max_frame_size as the preferred rx crypto frame size indicated by
1096   // the peer.
1097   int max_frame_size =
1098       t->settings.peer().preferred_receive_crypto_message_size();
1099   // Note: max frame size is 0 if the remote peer does not support adjusting the
1100   // sending frame size.
1101   if (max_frame_size == 0) {
1102     max_frame_size = INT_MAX;
1103   }
1104   GRPC_TRACE_LOG(http2_ping, INFO)
1105       << (t->is_client ? "CLIENT" : "SERVER") << "[" << t << "]: Write "
1106       << t->outbuf.Length() << " bytes";
1107   t->write_size_policy.BeginWrite(t->outbuf.Length());
1108   grpc_endpoint_write(t->ep.get(), t->outbuf.c_slice_buffer(),
1109                       grpc_core::InitTransportClosure<write_action_end>(
1110                           t->Ref(), &t->write_action_end_locked),
1111                       cl, max_frame_size);
1112 }
1113 
write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)1114 static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1115                              grpc_error_handle error) {
1116   auto* tp = t.get();
1117   GRPC_TRACE_LOG(http2_ping, INFO) << (t->is_client ? "CLIENT" : "SERVER")
1118                                    << "[" << t.get() << "]: Finish write";
1119   tp->combiner->Run(grpc_core::InitTransportClosure<write_action_end_locked>(
1120                         std::move(t), &tp->write_action_end_locked),
1121                     error);
1122 }
1123 
1124 // Callback from the grpc_endpoint after bytes have been written by calling
1125 // sendmsg
write_action_end_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)1126 static void write_action_end_locked(
1127     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1128     grpc_error_handle error) {
1129   t->write_size_policy.EndWrite(error.ok());
1130 
1131   bool closed = false;
1132   if (!error.ok()) {
1133     close_transport_locked(t.get(), error);
1134     closed = true;
1135   }
1136 
1137   if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
1138     t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
1139     closed = true;
1140     if (t->stream_map.empty()) {
1141       close_transport_locked(t.get(), GRPC_ERROR_CREATE("goaway sent"));
1142     }
1143   }
1144 
1145   switch (t->write_state) {
1146     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1147       GPR_UNREACHABLE_CODE(break);
1148     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1149       set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1150       break;
1151     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1152       set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_WRITING,
1153                       "continue writing");
1154       // If the transport is closed, we will retry writing on the endpoint
1155       // and next write may contain part of the currently serialized frames.
1156       // So, we should only call the run_after_write callbacks when the next
1157       // write finishes, or the callbacks will be invoked when the stream is
1158       // closed.
1159       if (!closed) {
1160         grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1161       }
1162       t->combiner->FinallyRun(
1163           grpc_core::InitTransportClosure<write_action_begin_locked>(
1164               t, &t->write_action_begin_locked),
1165           absl::OkStatus());
1166       break;
1167   }
1168 
1169   grpc_chttp2_end_write(t.get(), error);
1170 }
1171 
1172 // Cancel out streams that haven't yet started if we have received a GOAWAY
cancel_unstarted_streams(grpc_chttp2_transport * t,grpc_error_handle error,bool tarpit)1173 static void cancel_unstarted_streams(grpc_chttp2_transport* t,
1174                                      grpc_error_handle error, bool tarpit) {
1175   grpc_chttp2_stream* s;
1176   while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1177     s->trailing_metadata_buffer.Set(
1178         grpc_core::GrpcStreamNetworkState(),
1179         grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1180     grpc_chttp2_cancel_stream(t, s, error, tarpit);
1181   }
1182 }
1183 
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,absl::string_view goaway_text)1184 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1185                                      uint32_t goaway_error,
1186                                      uint32_t last_stream_id,
1187                                      absl::string_view goaway_text) {
1188   t->goaway_error = grpc_error_set_int(
1189       grpc_error_set_int(
1190           grpc_core::StatusCreate(
1191               absl::StatusCode::kUnavailable,
1192               absl::StrFormat("GOAWAY received; Error code: %u; Debug Text: %s",
1193                               goaway_error, goaway_text),
1194               DEBUG_LOCATION, {}),
1195           grpc_core::StatusIntProperty::kHttp2Error,
1196           static_cast<intptr_t>(goaway_error)),
1197       grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
1198 
1199   GRPC_TRACE_LOG(http, INFO)
1200       << "transport " << t << " got goaway with last stream id "
1201       << last_stream_id;
1202   // We want to log this irrespective of whether http tracing is enabled if we
1203   // received a GOAWAY with a non NO_ERROR code.
1204   if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1205     LOG(INFO) << t->peer_string.as_string_view() << ": Got goaway ["
1206               << goaway_error
1207               << "] err=" << grpc_core::StatusToString(t->goaway_error);
1208   }
1209   if (t->is_client) {
1210     cancel_unstarted_streams(t, t->goaway_error, false);
1211     // Cancel all unseen streams
1212     std::vector<grpc_chttp2_stream*> to_cancel;
1213     for (auto id_stream : t->stream_map) {
1214       if (id_stream.first > last_stream_id) {
1215         to_cancel.push_back(id_stream.second);
1216       }
1217     }
1218     for (auto s : to_cancel) {
1219       s->trailing_metadata_buffer.Set(
1220           grpc_core::GrpcStreamNetworkState(),
1221           grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
1222       grpc_chttp2_cancel_stream(s->t.get(), s, s->t->goaway_error, false);
1223     }
1224   }
1225   absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1226   // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1227   // data equal to "too_many_pings", it should log the occurrence at a log level
1228   // that is enabled by default and double the configured KEEPALIVE_TIME used
1229   // for new connections on that channel.
1230   if (GPR_UNLIKELY(t->is_client &&
1231                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1232                    goaway_text == "too_many_pings")) {
1233     LOG(ERROR) << t->peer_string.as_string_view()
1234                << ": Received a GOAWAY with error code ENHANCE_YOUR_CALM and "
1235                   "debug data equal to \"too_many_pings\". Current keepalive "
1236                   "time (before throttling): "
1237                << t->keepalive_time.ToString();
1238     constexpr int max_keepalive_time_millis =
1239         INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1240     int64_t throttled_keepalive_time =
1241         t->keepalive_time.millis() > max_keepalive_time_millis
1242             ? INT_MAX
1243             : t->keepalive_time.millis() * KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1244     status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1245                       absl::Cord(std::to_string(throttled_keepalive_time)));
1246   }
1247   // lie: use transient failure from the transport to indicate goaway has been
1248   // received.
1249   if (!grpc_core::test_only_disable_transient_failure_state_notification) {
1250     connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1251                            "got_goaway");
1252   }
1253 }
1254 
maybe_start_some_streams(grpc_chttp2_transport * t)1255 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1256   grpc_chttp2_stream* s;
1257   // maybe cancel out streams that haven't yet started if we have received a
1258   // GOAWAY
1259   if (!t->goaway_error.ok()) {
1260     cancel_unstarted_streams(t, t->goaway_error, false);
1261     return;
1262   }
1263   // start streams where we have free grpc_chttp2_stream ids and free
1264   // * concurrency
1265   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1266          t->stream_map.size() < t->settings.peer().max_concurrent_streams() &&
1267          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1268     // safe since we can't (legally) be parsing this stream yet
1269     GRPC_TRACE_LOG(http, INFO)
1270         << "HTTP:" << (t->is_client ? "CLI" : "SVR") << ": Transport " << t
1271         << " allocating new grpc_chttp2_stream " << s << " to id "
1272         << t->next_stream_id;
1273 
1274     CHECK_EQ(s->id, 0u);
1275     s->id = t->next_stream_id;
1276     t->next_stream_id += 2;
1277 
1278     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1279       connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1280                              absl::Status(absl::StatusCode::kUnavailable,
1281                                           "Transport Stream IDs exhausted"),
1282                              "no_more_stream_ids");
1283     }
1284 
1285     t->stream_map.emplace(s->id, s);
1286     post_destructive_reclaimer(t);
1287     grpc_chttp2_mark_stream_writable(t, s);
1288     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1289   }
1290   // cancel out streams that will never be started
1291   if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1292     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1293       s->trailing_metadata_buffer.Set(
1294           grpc_core::GrpcStreamNetworkState(),
1295           grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1296       grpc_chttp2_cancel_stream(
1297           t, s,
1298           grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"),
1299                              grpc_core::StatusIntProperty::kRpcStatus,
1300                              GRPC_STATUS_UNAVAILABLE),
1301           false);
1302     }
1303   }
1304 }
1305 
add_closure_barrier(grpc_closure * closure)1306 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1307   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1308   return closure;
1309 }
1310 
null_then_sched_closure(grpc_closure ** closure)1311 static void null_then_sched_closure(grpc_closure** closure) {
1312   grpc_closure* c = *closure;
1313   *closure = nullptr;
1314   // null_then_schedule_closure might be run during a start_batch which might
1315   // subsequently examine the batch for more operations contained within.
1316   // However, the closure run might make it back to the call object, push a
1317   // completion, have the application see it, and make a new operation on the
1318   // call which recycles the batch BEFORE the call to start_batch completes,
1319   // forcing a race.
1320   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, absl::OkStatus());
1321 }
1322 
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_closure ** pclosure,grpc_error_handle error,const char * desc,grpc_core::DebugLocation whence)1323 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1324                                        grpc_closure** pclosure,
1325                                        grpc_error_handle error,
1326                                        const char* desc,
1327                                        grpc_core::DebugLocation whence) {
1328   grpc_closure* closure = *pclosure;
1329   *pclosure = nullptr;
1330   if (closure == nullptr) {
1331     return;
1332   }
1333   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1334   GRPC_TRACE_LOG(http, INFO)
1335       << "complete_closure_step: t=" << t << " " << closure << " refs="
1336       << (closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT)
1337       << " flags="
1338       << (closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT)
1339       << " desc=" << desc << " err=" << grpc_core::StatusToString(error)
1340       << " write_state=" << get_write_state_name(t->write_state)
1341       << " whence=" << whence.file() << ":" << whence.line();
1342 
1343   if (!error.ok()) {
1344     grpc_error_handle cl_err =
1345         grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1346     if (cl_err.ok()) {
1347       cl_err = GRPC_ERROR_CREATE(absl::StrCat(
1348           "Error in HTTP transport completing operation: ", desc,
1349           " write_state=", get_write_state_name(t->write_state),
1350           " refs=", closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT,
1351           " flags=", closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT,
1352           " peer_address=", t->peer_string.as_string_view()));
1353     }
1354     cl_err = grpc_error_add_child(cl_err, error);
1355     closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
1356   }
1357   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1358     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1359         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1360       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1361       // closures earlier than when it is safe to do so.
1362       grpc_error_handle run_error =
1363           grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1364       closure->error_data.error = 0;
1365       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
1366     } else {
1367       grpc_closure_list_append(&t->run_after_write, closure);
1368     }
1369   }
1370 }
1371 
contains_non_ok_status(grpc_metadata_batch * batch)1372 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1373   return batch->get(grpc_core::GrpcStatusMetadata()).value_or(GRPC_STATUS_OK) !=
1374          GRPC_STATUS_OK;
1375 }
1376 
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,const bool is_client,const bool is_initial)1377 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1378                          const bool is_client, const bool is_initial) {
1379   VLOG(2) << "--metadata--";
1380   const std::string prefix = absl::StrCat(
1381       "HTTP:", id, is_initial ? ":HDR" : ":TRL", is_client ? ":CLI:" : ":SVR:");
1382   md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
1383     VLOG(2) << prefix << key << ": " << value;
1384   });
1385 }
1386 
trace_annotations(grpc_chttp2_stream * s)1387 static void trace_annotations(grpc_chttp2_stream* s) {
1388   if (!grpc_core::IsCallTracerInTransportEnabled()) {
1389     if (s->call_tracer != nullptr) {
1390       s->call_tracer->RecordAnnotation(
1391           grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
1392                                     gpr_now(GPR_CLOCK_REALTIME))
1393               .Add(s->t->flow_control.stats())
1394               .Add(s->flow_control.stats()));
1395     }
1396   } else if (grpc_core::IsTraceRecordCallopsEnabled()) {
1397     auto* call_tracer = s->arena->GetContext<grpc_core::CallTracerInterface>();
1398     if (call_tracer != nullptr && call_tracer->IsSampled()) {
1399       call_tracer->RecordAnnotation(
1400           grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
1401                                     gpr_now(GPR_CLOCK_REALTIME))
1402               .Add(s->t->flow_control.stats())
1403               .Add(s->flow_control.stats()));
1404     }
1405   }
1406 }
1407 
send_initial_metadata_locked(grpc_transport_stream_op_batch * op,grpc_chttp2_stream * s,grpc_transport_stream_op_batch_payload * op_payload,grpc_chttp2_transport * t,grpc_closure * on_complete)1408 static void send_initial_metadata_locked(
1409     grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
1410     grpc_transport_stream_op_batch_payload* op_payload,
1411     grpc_chttp2_transport* t, grpc_closure* on_complete) {
1412   trace_annotations(s);
1413   if (t->is_client && t->channelz_socket != nullptr) {
1414     t->channelz_socket->RecordStreamStartedFromLocal();
1415   }
1416   CHECK_EQ(s->send_initial_metadata_finished, nullptr);
1417   on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1418 
1419   s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1420   s->send_initial_metadata =
1421       op_payload->send_initial_metadata.send_initial_metadata;
1422   if (t->is_client) {
1423     s->deadline =
1424         std::min(s->deadline,
1425                  s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
1426                      .value_or(grpc_core::Timestamp::InfFuture()));
1427   }
1428   if (contains_non_ok_status(s->send_initial_metadata)) {
1429     s->seen_error = true;
1430   }
1431   if (!s->write_closed) {
1432     if (t->is_client) {
1433       if (t->closed_with_error.ok()) {
1434         CHECK_EQ(s->id, 0u);
1435         if (t->max_concurrent_streams_reject_on_client &&
1436             t->stream_map.size() >=
1437                 t->settings.peer().max_concurrent_streams()) {
1438           s->trailing_metadata_buffer.Set(
1439               grpc_core::GrpcStreamNetworkState(),
1440               grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1441           grpc_chttp2_cancel_stream(
1442               t, s,
1443               grpc_error_set_int(
1444                   GRPC_ERROR_CREATE_REFERENCING("Too many streams",
1445                                                 &t->closed_with_error, 1),
1446                   grpc_core::StatusIntProperty::kRpcStatus,
1447                   GRPC_STATUS_RESOURCE_EXHAUSTED),
1448               false);
1449         } else {
1450           grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1451           maybe_start_some_streams(t);
1452         }
1453       } else {
1454         s->trailing_metadata_buffer.Set(
1455             grpc_core::GrpcStreamNetworkState(),
1456             grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1457         grpc_chttp2_cancel_stream(
1458             t, s,
1459             grpc_error_set_int(
1460                 GRPC_ERROR_CREATE_REFERENCING("Transport closed",
1461                                               &t->closed_with_error, 1),
1462                 grpc_core::StatusIntProperty::kRpcStatus,
1463                 GRPC_STATUS_UNAVAILABLE),
1464             false);
1465       }
1466     } else {
1467       CHECK_NE(s->id, 0u);
1468       grpc_chttp2_mark_stream_writable(t, s);
1469       if (!(op->send_message &&
1470             (op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
1471         grpc_chttp2_initiate_write(
1472             t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1473       }
1474     }
1475   } else {
1476     s->send_initial_metadata = nullptr;
1477     grpc_chttp2_complete_closure_step(
1478         t, &s->send_initial_metadata_finished,
1479         GRPC_ERROR_CREATE_REFERENCING(
1480             "Attempt to send initial metadata after stream was closed",
1481             &s->write_closed_error, 1),
1482         "send_initial_metadata_finished");
1483   }
1484 }
1485 
send_message_locked(grpc_transport_stream_op_batch * op,grpc_chttp2_stream * s,grpc_transport_stream_op_batch_payload * op_payload,grpc_chttp2_transport * t,grpc_closure * on_complete)1486 static void send_message_locked(
1487     grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
1488     grpc_transport_stream_op_batch_payload* op_payload,
1489     grpc_chttp2_transport* t, grpc_closure* on_complete) {
1490   t->num_messages_in_next_write++;
1491   grpc_core::global_stats().IncrementHttp2SendMessageSize(
1492       op->payload->send_message.send_message->Length());
1493   on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1494   s->send_message_finished = add_closure_barrier(op->on_complete);
1495   uint32_t flags = 0;
1496   if (s->write_closed) {
1497     op->payload->send_message.stream_write_closed = true;
1498     // We should NOT return an error here, so as to avoid a cancel OP being
1499     // started. The surface layer will notice that the stream has been closed
1500     // for writes and fail the send message op.
1501     grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1502                                       absl::OkStatus(),
1503                                       "fetching_send_message_finished");
1504   } else {
1505     // Buffer hint is used to buffer the message in the transport until the
1506     // write buffer size (specified through GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE) is
1507     // reached. This is to batch writes sent down to tcp. However, if the memory
1508     // pressure is high, disable the buffer hint to flush data down to tcp as
1509     // soon as possible to avoid OOM.
1510     if (grpc_core::IsDisableBufferHintOnHighMemoryPressureEnabled() &&
1511         t->memory_owner.GetPressureInfo().pressure_control_value >= 0.8) {
1512       // Disable write buffer hint if memory pressure is high. The value of 0.8
1513       // is chosen to match the threshold used by the tcp endpoint (in
1514       // allocating memory for socket reads).
1515       op_payload->send_message.flags &= ~GRPC_WRITE_BUFFER_HINT;
1516     }
1517     flags = op_payload->send_message.flags;
1518     uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer,
1519                                                     GRPC_HEADER_SIZE_IN_BYTES);
1520     frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1521     size_t len = op_payload->send_message.send_message->Length();
1522     frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1523     frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1524     frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1525     frame_hdr[4] = static_cast<uint8_t>(len);
1526 
1527     s->call_tracer_wrapper.RecordOutgoingBytes(
1528         {GRPC_HEADER_SIZE_IN_BYTES, len, 0});
1529     s->next_message_end_offset =
1530         s->flow_controlled_bytes_written +
1531         static_cast<int64_t>(s->flow_controlled_buffer.length) +
1532         static_cast<int64_t>(len);
1533     if (flags & GRPC_WRITE_BUFFER_HINT) {
1534       s->next_message_end_offset -= t->write_buffer_size;
1535       s->write_buffering = true;
1536     } else {
1537       s->write_buffering = false;
1538     }
1539 
1540     grpc_slice* const slices =
1541         op_payload->send_message.send_message->c_slice_buffer()->slices;
1542     grpc_slice* const end =
1543         slices + op_payload->send_message.send_message->Count();
1544     for (grpc_slice* slice = slices; slice != end; slice++) {
1545       grpc_slice_buffer_add(&s->flow_controlled_buffer,
1546                             grpc_core::CSliceRef(*slice));
1547     }
1548 
1549     int64_t notify_offset = s->next_message_end_offset;
1550     if (notify_offset <= s->flow_controlled_bytes_written) {
1551       grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1552                                         absl::OkStatus(),
1553                                         "fetching_send_message_finished");
1554     } else {
1555       grpc_chttp2_write_cb* cb = t->write_cb_pool;
1556       if (cb == nullptr) {
1557         cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1558       } else {
1559         t->write_cb_pool = cb->next;
1560       }
1561       cb->call_at_byte = notify_offset;
1562       cb->closure = s->send_message_finished;
1563       s->send_message_finished = nullptr;
1564       grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
1565                                         ? &s->on_write_finished_cbs
1566                                         : &s->on_flow_controlled_cbs;
1567       cb->next = *list;
1568       *list = cb;
1569     }
1570 
1571     if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length >
1572                                                   t->write_buffer_size)) {
1573       grpc_chttp2_mark_stream_writable(t, s);
1574       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1575     }
1576   }
1577 }
1578 
send_trailing_metadata_locked(grpc_transport_stream_op_batch * op,grpc_chttp2_stream * s,grpc_transport_stream_op_batch_payload * op_payload,grpc_chttp2_transport * t,grpc_closure * on_complete)1579 static void send_trailing_metadata_locked(
1580     grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
1581     grpc_transport_stream_op_batch_payload* op_payload,
1582     grpc_chttp2_transport* t, grpc_closure* on_complete) {
1583   CHECK_EQ(s->send_trailing_metadata_finished, nullptr);
1584   on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1585   s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1586   s->send_trailing_metadata =
1587       op_payload->send_trailing_metadata.send_trailing_metadata;
1588   s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1589   s->write_buffering = false;
1590   if (contains_non_ok_status(s->send_trailing_metadata)) {
1591     s->seen_error = true;
1592   }
1593   if (s->write_closed) {
1594     s->send_trailing_metadata = nullptr;
1595     s->sent_trailing_metadata_op = nullptr;
1596     grpc_chttp2_complete_closure_step(
1597         t, &s->send_trailing_metadata_finished,
1598         op->payload->send_trailing_metadata.send_trailing_metadata->empty()
1599             ? absl::OkStatus()
1600             : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
1601                                 "stream was closed"),
1602         "send_trailing_metadata_finished");
1603   } else if (s->id != 0) {
1604     // TODO(ctiller): check if there's flow control for any outstanding
1605     //   bytes before going writable
1606     grpc_chttp2_mark_stream_writable(t, s);
1607     grpc_chttp2_initiate_write(
1608         t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1609   }
1610 }
1611 
recv_initial_metadata_locked(grpc_chttp2_stream * s,grpc_transport_stream_op_batch_payload * op_payload,grpc_chttp2_transport * t)1612 static void recv_initial_metadata_locked(
1613     grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
1614     grpc_chttp2_transport* t) {
1615   CHECK_EQ(s->recv_initial_metadata_ready, nullptr);
1616   s->recv_initial_metadata_ready =
1617       op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1618   s->recv_initial_metadata =
1619       op_payload->recv_initial_metadata.recv_initial_metadata;
1620   s->trailing_metadata_available =
1621       op_payload->recv_initial_metadata.trailing_metadata_available;
1622   if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
1623     *s->trailing_metadata_available = true;
1624   }
1625   grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1626 }
1627 
recv_message_locked(grpc_chttp2_stream * s,grpc_transport_stream_op_batch_payload * op_payload,grpc_chttp2_transport * t)1628 static void recv_message_locked(
1629     grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
1630     grpc_chttp2_transport* t) {
1631   CHECK_EQ(s->recv_message_ready, nullptr);
1632   s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1633   s->recv_message = op_payload->recv_message.recv_message;
1634   s->recv_message->emplace();
1635   s->recv_message_flags = op_payload->recv_message.flags;
1636   s->call_failed_before_recv_message =
1637       op_payload->recv_message.call_failed_before_recv_message;
1638   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1639 }
1640 
recv_trailing_metadata_locked(grpc_chttp2_stream * s,grpc_transport_stream_op_batch_payload * op_payload,grpc_chttp2_transport * t)1641 static void recv_trailing_metadata_locked(
1642     grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
1643     grpc_chttp2_transport* t) {
1644   CHECK_EQ(s->collecting_stats, nullptr);
1645   s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1646   CHECK_EQ(s->recv_trailing_metadata_finished, nullptr);
1647   s->recv_trailing_metadata_finished =
1648       op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1649   s->recv_trailing_metadata =
1650       op_payload->recv_trailing_metadata.recv_trailing_metadata;
1651   s->final_metadata_requested = true;
1652   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1653 }
1654 
perform_stream_op_locked(void * stream_op,grpc_error_handle)1655 static void perform_stream_op_locked(void* stream_op,
1656                                      grpc_error_handle /*error_ignored*/) {
1657   grpc_transport_stream_op_batch* op =
1658       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1659   grpc_chttp2_stream* s =
1660       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1661   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1662   grpc_chttp2_transport* t = s->t.get();
1663 
1664   s->traced = op->is_traced;
1665   if (!grpc_core::IsCallTracerInTransportEnabled()) {
1666     s->call_tracer = CallTracerIfSampled(s);
1667   }
1668   s->tcp_tracer = TcpTracerIfSampled(s);
1669   if (GRPC_TRACE_FLAG_ENABLED(http)) {
1670     LOG(INFO) << "perform_stream_op_locked[s=" << s << "; op=" << op
1671               << "]: " << grpc_transport_stream_op_batch_string(op, false)
1672               << "; on_complete = " << op->on_complete;
1673     if (op->send_initial_metadata) {
1674       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1675                    s->id, t->is_client, true);
1676     }
1677     if (op->send_trailing_metadata) {
1678       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1679                    s->id, t->is_client, false);
1680     }
1681   }
1682 
1683   grpc_closure* on_complete = op->on_complete;
1684   // on_complete will be null if and only if there are no send ops in the batch.
1685   if (on_complete != nullptr) {
1686     // This batch has send ops. Use final_data as a barrier until enqueue time;
1687     // the initial counter is dropped at the end of this function.
1688     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1689     on_complete->error_data.error = 0;
1690   }
1691 
1692   if (op->cancel_stream) {
1693     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error,
1694                               op_payload->cancel_stream.tarpit);
1695   }
1696 
1697   if (op->send_initial_metadata) {
1698     send_initial_metadata_locked(op, s, op_payload, t, on_complete);
1699   }
1700 
1701   if (op->send_message) {
1702     send_message_locked(op, s, op_payload, t, on_complete);
1703   }
1704 
1705   if (op->send_trailing_metadata) {
1706     send_trailing_metadata_locked(op, s, op_payload, t, on_complete);
1707   }
1708 
1709   if (op->recv_initial_metadata) {
1710     recv_initial_metadata_locked(s, op_payload, t);
1711   }
1712 
1713   if (op->recv_message) {
1714     recv_message_locked(s, op_payload, t);
1715   }
1716 
1717   if (op->recv_trailing_metadata) {
1718     recv_trailing_metadata_locked(s, op_payload, t);
1719   }
1720 
1721   if (on_complete != nullptr) {
1722     grpc_chttp2_complete_closure_step(t, &on_complete, absl::OkStatus(),
1723                                       "op->on_complete");
1724   }
1725 
1726   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1727 }
1728 
PerformStreamOp(grpc_stream * gs,grpc_transport_stream_op_batch * op)1729 void grpc_chttp2_transport::PerformStreamOp(
1730     grpc_stream* gs, grpc_transport_stream_op_batch* op) {
1731   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1732 
1733   if (!is_client) {
1734     if (op->send_initial_metadata) {
1735       CHECK(!op->payload->send_initial_metadata.send_initial_metadata
1736                  ->get(grpc_core::GrpcTimeoutMetadata())
1737                  .has_value());
1738     }
1739     if (op->send_trailing_metadata) {
1740       CHECK(!op->payload->send_trailing_metadata.send_trailing_metadata
1741                  ->get(grpc_core::GrpcTimeoutMetadata())
1742                  .has_value());
1743     }
1744   }
1745 
1746   GRPC_TRACE_LOG(http, INFO)
1747       << "perform_stream_op[s=" << s << "; op=" << op
1748       << "]: " << grpc_transport_stream_op_batch_string(op, false);
1749 
1750   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1751   op->handler_private.extra_arg = gs;
1752   combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1753                                   perform_stream_op_locked, op, nullptr),
1754                 absl::OkStatus());
1755 }
1756 
cancel_pings(grpc_chttp2_transport * t,grpc_error_handle error)1757 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1758   GRPC_TRACE_LOG(http, INFO)
1759       << t << " CANCEL PINGS: " << grpc_core::StatusToString(error);
1760   // callback remaining pings: they're not allowed to call into the transport,
1761   //   and maybe they hold resources that need to be freed
1762   t->ping_callbacks.CancelAll(t->event_engine.get());
1763 }
1764 
1765 namespace {
1766 class PingClosureWrapper {
1767  public:
PingClosureWrapper(grpc_closure * closure)1768   explicit PingClosureWrapper(grpc_closure* closure) : closure_(closure) {}
1769   PingClosureWrapper(const PingClosureWrapper&) = delete;
1770   PingClosureWrapper& operator=(const PingClosureWrapper&) = delete;
PingClosureWrapper(PingClosureWrapper && other)1771   PingClosureWrapper(PingClosureWrapper&& other) noexcept
1772       : closure_(other.Take()) {}
operator =(PingClosureWrapper && other)1773   PingClosureWrapper& operator=(PingClosureWrapper&& other) noexcept {
1774     std::swap(closure_, other.closure_);
1775     return *this;
1776   }
~PingClosureWrapper()1777   ~PingClosureWrapper() {
1778     if (closure_ != nullptr) {
1779       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, absl::CancelledError());
1780     }
1781   }
1782 
operator ()()1783   void operator()() {
1784     grpc_core::ExecCtx::Run(DEBUG_LOCATION, Take(), absl::OkStatus());
1785   }
1786 
1787  private:
Take()1788   grpc_closure* Take() { return std::exchange(closure_, nullptr); }
1789 
1790   grpc_closure* closure_ = nullptr;
1791 };
1792 }  // namespace
1793 
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1794 static void send_ping_locked(grpc_chttp2_transport* t,
1795                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1796   if (!t->closed_with_error.ok()) {
1797     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, t->closed_with_error);
1798     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error);
1799     return;
1800   }
1801   t->ping_callbacks.OnPing(PingClosureWrapper(on_initiate),
1802                            PingClosureWrapper(on_ack));
1803 }
1804 
1805 // Specialized form of send_ping_locked for keepalive ping. If there is already
1806 // a ping in progress, the keepalive ping would piggyback onto that ping,
1807 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1808 static void send_keepalive_ping_locked(
1809     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1810   if (!t->closed_with_error.ok()) {
1811     t->combiner->Run(
1812         grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
1813             t->Ref(), &t->finish_keepalive_ping_locked),
1814         t->closed_with_error);
1815     return;
1816   }
1817   t->ping_callbacks.OnPingAck(
1818       PingClosureWrapper(grpc_core::InitTransportClosure<finish_keepalive_ping>(
1819           t->Ref(), &t->finish_keepalive_ping_locked)));
1820 }
1821 
grpc_chttp2_retry_initiate_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1822 void grpc_chttp2_retry_initiate_ping(
1823     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1824   auto tp = t.get();
1825   tp->combiner->Run(grpc_core::InitTransportClosure<retry_initiate_ping_locked>(
1826                         std::move(t), &tp->retry_initiate_ping_locked),
1827                     absl::OkStatus());
1828 }
1829 
retry_initiate_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)1830 static void retry_initiate_ping_locked(
1831     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1832     GRPC_UNUSED grpc_error_handle error) {
1833   DCHECK(error.ok());
1834   CHECK(t->delayed_ping_timer_handle != TaskHandle::kInvalid);
1835   t->delayed_ping_timer_handle = TaskHandle::kInvalid;
1836   grpc_chttp2_initiate_write(t.get(),
1837                              GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1838 }
1839 
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1840 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1841   if (!t->ping_callbacks.AckPing(id, t->event_engine.get())) {
1842     VLOG(2) << "Unknown ping response from " << t->peer_string.as_string_view()
1843             << ": " << id;
1844     return;
1845   }
1846   if (t->ping_callbacks.ping_requested()) {
1847     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1848   }
1849 }
1850 
grpc_chttp2_keepalive_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1851 void grpc_chttp2_keepalive_timeout(
1852     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1853   t->combiner->Run(
1854       grpc_core::NewClosure([t](grpc_error_handle) {
1855         GRPC_TRACE_LOG(http, INFO) << t->peer_string.as_string_view()
1856                                    << ": Keepalive timeout. Closing transport.";
1857         send_goaway(
1858             t.get(),
1859             grpc_error_set_int(GRPC_ERROR_CREATE("keepalive_timeout"),
1860                                grpc_core::StatusIntProperty::kHttp2Error,
1861                                GRPC_HTTP2_ENHANCE_YOUR_CALM),
1862             /*immediate_disconnect_hint=*/true);
1863         close_transport_locked(
1864             t.get(),
1865             grpc_error_set_int(GRPC_ERROR_CREATE("keepalive timeout"),
1866                                grpc_core::StatusIntProperty::kRpcStatus,
1867                                GRPC_STATUS_UNAVAILABLE));
1868       }),
1869       absl::OkStatus());
1870 }
1871 
grpc_chttp2_ping_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1872 void grpc_chttp2_ping_timeout(
1873     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1874   t->combiner->Run(
1875       grpc_core::NewClosure([t](grpc_error_handle) {
1876         GRPC_TRACE_LOG(http, INFO) << t->peer_string.as_string_view()
1877                                    << ": Ping timeout. Closing transport.";
1878         send_goaway(
1879             t.get(),
1880             grpc_error_set_int(GRPC_ERROR_CREATE("ping_timeout"),
1881                                grpc_core::StatusIntProperty::kHttp2Error,
1882                                GRPC_HTTP2_ENHANCE_YOUR_CALM),
1883             /*immediate_disconnect_hint=*/true);
1884         close_transport_locked(
1885             t.get(),
1886             grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"),
1887                                grpc_core::StatusIntProperty::kRpcStatus,
1888                                GRPC_STATUS_UNAVAILABLE));
1889       }),
1890       absl::OkStatus());
1891 }
1892 
grpc_chttp2_settings_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1893 void grpc_chttp2_settings_timeout(
1894     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1895   t->combiner->Run(
1896       grpc_core::NewClosure([t](grpc_error_handle) {
1897         GRPC_TRACE_LOG(http, INFO) << t->peer_string.as_string_view()
1898                                    << ": Settings timeout. Closing transport.";
1899         send_goaway(
1900             t.get(),
1901             grpc_error_set_int(GRPC_ERROR_CREATE("settings_timeout"),
1902                                grpc_core::StatusIntProperty::kHttp2Error,
1903                                GRPC_HTTP2_SETTINGS_TIMEOUT),
1904             /*immediate_disconnect_hint=*/true);
1905         close_transport_locked(
1906             t.get(),
1907             grpc_error_set_int(GRPC_ERROR_CREATE("settings timeout"),
1908                                grpc_core::StatusIntProperty::kRpcStatus,
1909                                GRPC_STATUS_UNAVAILABLE));
1910       }),
1911       absl::OkStatus());
1912 }
1913 
1914 namespace {
1915 
1916 // Fire and forget (deletes itself on completion). Does a graceful shutdown by
1917 // sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping
1918 // and waiting for an ack (effective waiting for an RTT) and then sending a
1919 // final GOAWAY frame with an updated last stream identifier. This helps ensure
1920 // that a connection can be cleanly shut down without losing requests.
1921 // In the event, that the client does not respond to the ping for some reason,
1922 // we add a 20 second deadline, after which we send the second goaway.
1923 class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
1924  public:
Start(grpc_chttp2_transport * t)1925   static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); }
1926 
1927  private:
1928   using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
1929 
GracefulGoaway(grpc_chttp2_transport * t)1930   explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t->Ref()) {
1931     t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY;
1932     grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf);
1933     t->keepalive_timeout =
1934         std::min(t->keepalive_timeout, grpc_core::Duration::Seconds(20));
1935     t->ping_timeout =
1936         std::min(t->ping_timeout, grpc_core::Duration::Seconds(20));
1937     send_ping_locked(
1938         t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr));
1939     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1940   }
1941 
MaybeSendFinalGoawayLocked()1942   void MaybeSendFinalGoawayLocked() {
1943     if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1944       // We already sent the final GOAWAY.
1945       return;
1946     }
1947     if (t_->destroying || !t_->closed_with_error.ok()) {
1948       GRPC_TRACE_LOG(http, INFO) << "transport:" << t_.get() << " "
1949                                  << (t_->is_client ? "CLIENT" : "SERVER")
1950                                  << " peer:" << t_->peer_string.as_string_view()
1951                                  << " Transport already shutting down. "
1952                                     "Graceful GOAWAY abandoned.";
1953       return;
1954     }
1955     // Ping completed. Send final goaway.
1956     GRPC_TRACE_LOG(http, INFO)
1957         << "transport:" << t_.get() << " "
1958         << (t_->is_client ? "CLIENT" : "SERVER")
1959         << " peer:" << std::string(t_->peer_string.as_string_view())
1960         << " Graceful shutdown: Ping received. "
1961            "Sending final GOAWAY with stream_id:"
1962         << t_->last_new_stream_id;
1963     t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1964     grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(),
1965                               &t_->qbuf);
1966     grpc_chttp2_initiate_write(t_.get(),
1967                                GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1968   }
1969 
OnPingAck(void * arg,grpc_error_handle)1970   static void OnPingAck(void* arg, grpc_error_handle /* error */) {
1971     auto* self = static_cast<GracefulGoaway*>(arg);
1972     self->t_->combiner->Run(
1973         GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr),
1974         absl::OkStatus());
1975   }
1976 
OnPingAckLocked(void * arg,grpc_error_handle)1977   static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) {
1978     auto* self = static_cast<GracefulGoaway*>(arg);
1979     self->MaybeSendFinalGoawayLocked();
1980     self->Unref();
1981   }
1982 
1983   const grpc_core::RefCountedPtr<grpc_chttp2_transport> t_;
1984   grpc_closure on_ping_ack_;
1985 };
1986 
1987 }  // namespace
1988 
send_goaway(grpc_chttp2_transport * t,grpc_error_handle error,const bool immediate_disconnect_hint)1989 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
1990                         const bool immediate_disconnect_hint) {
1991   grpc_http2_error_code http_error;
1992   std::string message;
1993   grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr,
1994                         &message, &http_error, nullptr);
1995   if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR &&
1996       !immediate_disconnect_hint) {
1997     // Do a graceful shutdown.
1998     if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND) {
1999       GracefulGoaway::Start(t);
2000     } else {
2001       // Graceful GOAWAY is already in progress.
2002     }
2003   } else if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND ||
2004              t->sent_goaway_state == GRPC_CHTTP2_GRACEFUL_GOAWAY) {
2005     // We want to log this irrespective of whether http tracing is enabled
2006     VLOG(2) << t->peer_string.as_string_view() << " "
2007             << (t->is_client ? "CLIENT" : "SERVER")
2008             << ": Sending goaway last_new_stream_id=" << t->last_new_stream_id
2009             << " err=" << grpc_core::StatusToString(error);
2010     t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
2011     grpc_chttp2_goaway_append(
2012         t->last_new_stream_id, static_cast<uint32_t>(http_error),
2013         grpc_slice_from_cpp_string(std::move(message)), &t->qbuf);
2014   } else {
2015     // Final GOAWAY has already been sent.
2016   }
2017   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
2018 }
2019 
grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport * t)2020 void grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport* t) {
2021   send_goaway(t,
2022               grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"),
2023                                  grpc_core::StatusIntProperty::kHttp2Error,
2024                                  GRPC_HTTP2_ENHANCE_YOUR_CALM),
2025               /*immediate_disconnect_hint=*/true);
2026   // The transport will be closed after the write is done
2027   close_transport_locked(
2028       t, grpc_error_set_int(GRPC_ERROR_CREATE("Too many pings"),
2029                             grpc_core::StatusIntProperty::kRpcStatus,
2030                             GRPC_STATUS_UNAVAILABLE));
2031 }
2032 
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)2033 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
2034   if (!t->is_client) {
2035     t->ping_abuse_policy.ResetPingStrikes();
2036   }
2037   t->ping_rate_policy.ResetPingsBeforeDataRequired();
2038 }
2039 
perform_transport_op_locked(void * stream_op,grpc_error_handle)2040 static void perform_transport_op_locked(void* stream_op,
2041                                         grpc_error_handle /*error_ignored*/) {
2042   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
2043   grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
2044       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg));
2045 
2046   if (!op->goaway_error.ok()) {
2047     send_goaway(t.get(), op->goaway_error, /*immediate_disconnect_hint=*/false);
2048   }
2049 
2050   if (op->set_accept_stream) {
2051     t->accept_stream_cb = op->set_accept_stream_fn;
2052     t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
2053     t->registered_method_matcher_cb = op->set_registered_method_matcher_fn;
2054   }
2055 
2056   if (op->bind_pollset) {
2057     if (t->ep != nullptr) {
2058       grpc_endpoint_add_to_pollset(t->ep.get(), op->bind_pollset);
2059     }
2060   }
2061 
2062   if (op->bind_pollset_set) {
2063     if (t->ep != nullptr) {
2064       grpc_endpoint_add_to_pollset_set(t->ep.get(), op->bind_pollset_set);
2065     }
2066   }
2067 
2068   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
2069     send_ping_locked(t.get(), op->send_ping.on_initiate, op->send_ping.on_ack);
2070     grpc_chttp2_initiate_write(t.get(),
2071                                GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
2072   }
2073 
2074   if (op->start_connectivity_watch != nullptr) {
2075     t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
2076                                 std::move(op->start_connectivity_watch));
2077   }
2078   if (op->stop_connectivity_watch != nullptr) {
2079     t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
2080   }
2081 
2082   if (!op->disconnect_with_error.ok()) {
2083     send_goaway(t.get(), op->disconnect_with_error,
2084                 /*immediate_disconnect_hint=*/true);
2085     close_transport_locked(t.get(), op->disconnect_with_error);
2086   }
2087 
2088   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
2089 }
2090 
PerformOp(grpc_transport_op * op)2091 void grpc_chttp2_transport::PerformOp(grpc_transport_op* op) {
2092   GRPC_TRACE_LOG(http, INFO) << "perform_transport_op[t=" << this
2093                              << "]: " << grpc_transport_op_string(op);
2094   op->handler_private.extra_arg = this;
2095   Ref().release()->combiner->Run(
2096       GRPC_CLOSURE_INIT(&op->handler_private.closure,
2097                         perform_transport_op_locked, op, nullptr),
2098       absl::OkStatus());
2099 }
2100 
2101 //
2102 // INPUT PROCESSING - GENERAL
2103 //
2104 
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2105 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
2106                                                       grpc_chttp2_stream* s) {
2107   if (s->recv_initial_metadata_ready != nullptr &&
2108       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
2109     if (s->seen_error) {
2110       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2111     }
2112     *s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
2113     s->recv_initial_metadata->Set(grpc_core::PeerString(),
2114                                   t->peer_string.Ref());
2115     // If we didn't receive initial metadata from the wire and instead faked a
2116     // status (due to stream cancellations for example), let upper layers know
2117     // that trailing metadata is immediately available.
2118     if (s->trailing_metadata_available != nullptr &&
2119         s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE &&
2120         s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE) {
2121       *s->trailing_metadata_available = true;
2122       s->trailing_metadata_available = nullptr;
2123     }
2124     if (t->registered_method_matcher_cb != nullptr) {
2125       t->registered_method_matcher_cb(t->accept_stream_cb_user_data,
2126                                       s->recv_initial_metadata);
2127     }
2128     null_then_sched_closure(&s->recv_initial_metadata_ready);
2129   }
2130 }
2131 
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2132 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
2133                                              grpc_chttp2_stream* s) {
2134   if (s->recv_message_ready == nullptr) return;
2135 
2136   grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
2137       &s->flow_control);
2138   grpc_error_handle error;
2139 
2140   // Lambda is immediately invoked as a big scoped section that can be
2141   // exited out of at any point by returning.
2142   [&]() {
2143     GRPC_TRACE_VLOG(http, 2)
2144         << "maybe_complete_recv_message " << s
2145         << " final_metadata_requested=" << s->final_metadata_requested
2146         << " seen_error=" << s->seen_error;
2147     if (s->final_metadata_requested && s->seen_error) {
2148       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2149       s->recv_message->reset();
2150     } else {
2151       if (s->frame_storage.length != 0) {
2152         while (true) {
2153           CHECK_GT(s->frame_storage.length, 0u);
2154           int64_t min_progress_size;
2155           auto r = grpc_deframe_unprocessed_incoming_frames(
2156               s, &min_progress_size, &**s->recv_message, s->recv_message_flags);
2157           GRPC_TRACE_VLOG(http, 2)
2158               << "Deframe data frame: "
2159               << grpc_core::PollToString(
2160                      r, [](absl::Status r) { return r.ToString(); });
2161           if (r.pending()) {
2162             if (s->read_closed) {
2163               grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2164               s->recv_message->reset();
2165               break;
2166             } else {
2167               upd.SetMinProgressSize(min_progress_size);
2168               return;  // Out of lambda to enclosing function
2169             }
2170           } else {
2171             error = std::move(r.value());
2172             if (!error.ok()) {
2173               s->seen_error = true;
2174               grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2175               break;
2176             } else {
2177               if (t->channelz_socket != nullptr) {
2178                 t->channelz_socket->RecordMessageReceived();
2179               }
2180               break;
2181             }
2182           }
2183         }
2184       } else if (s->read_closed) {
2185         s->recv_message->reset();
2186       } else {
2187         upd.SetMinProgressSize(GRPC_HEADER_SIZE_IN_BYTES);
2188         return;  // Out of lambda to enclosing function
2189       }
2190     }
2191     // save the length of the buffer before handing control back to application
2192     // threads. Needed to support correct flow control bookkeeping
2193     if (error.ok() && s->recv_message->has_value()) {
2194       null_then_sched_closure(&s->recv_message_ready);
2195     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2196       if (s->call_failed_before_recv_message != nullptr) {
2197         *s->call_failed_before_recv_message =
2198             (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
2199       }
2200       null_then_sched_closure(&s->recv_message_ready);
2201     }
2202   }();
2203 
2204   upd.SetPendingSize(s->frame_storage.length);
2205   grpc_chttp2_act_on_flowctl_action(upd.MakeAction(), t, s);
2206 }
2207 
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2208 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2209                                                        grpc_chttp2_stream* s) {
2210   grpc_chttp2_maybe_complete_recv_message(t, s);
2211   GRPC_TRACE_VLOG(http, 2) << "maybe_complete_recv_trailing_metadata cli="
2212                            << t->is_client << " s=" << s
2213                            << " closure=" << s->recv_trailing_metadata_finished
2214                            << " read_closed=" << s->read_closed
2215                            << " write_closed=" << s->write_closed << " "
2216                            << s->frame_storage.length;
2217   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2218       s->write_closed) {
2219     if (s->seen_error || !t->is_client) {
2220       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2221     }
2222     if (s->read_closed && s->frame_storage.length == 0 &&
2223         s->recv_trailing_metadata_finished != nullptr) {
2224       grpc_transport_move_stats(&s->stats, s->collecting_stats);
2225       s->collecting_stats = nullptr;
2226       *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer);
2227       null_then_sched_closure(&s->recv_trailing_metadata_finished);
2228     }
2229   }
2230 }
2231 
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error_handle error)2232 static grpc_chttp2_transport::RemovedStreamHandle remove_stream(
2233     grpc_chttp2_transport* t, uint32_t id, grpc_error_handle error) {
2234   grpc_chttp2_stream* s = t->stream_map.extract(id).mapped();
2235   DCHECK(s);
2236   if (t->incoming_stream == s) {
2237     t->incoming_stream = nullptr;
2238     grpc_chttp2_parsing_become_skip_parser(t);
2239   }
2240 
2241   if (t->stream_map.empty()) {
2242     post_benign_reclaimer(t);
2243     if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
2244       close_transport_locked(
2245           t, GRPC_ERROR_CREATE_REFERENCING(
2246                  "Last stream closed after sending GOAWAY", &error, 1));
2247     }
2248   }
2249   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2250     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2251   }
2252   grpc_chttp2_list_remove_stalled_by_stream(t, s);
2253   grpc_chttp2_list_remove_stalled_by_transport(t, s);
2254 
2255   maybe_start_some_streams(t);
2256 
2257   if (t->is_client) return grpc_chttp2_transport::RemovedStreamHandle();
2258   return grpc_chttp2_transport::RemovedStreamHandle(t->Ref());
2259 }
2260 
2261 namespace grpc_core {
2262 namespace {
2263 
TarpitDuration(grpc_chttp2_transport * t)2264 Duration TarpitDuration(grpc_chttp2_transport* t) {
2265   return Duration::Milliseconds(absl::LogUniform<int>(
2266       absl::BitGen(), t->min_tarpit_duration_ms, t->max_tarpit_duration_ms));
2267 }
2268 
2269 template <typename F>
MaybeTarpit(grpc_chttp2_transport * t,bool tarpit,F fn)2270 void MaybeTarpit(grpc_chttp2_transport* t, bool tarpit, F fn) {
2271   if (!tarpit || !t->allow_tarpit || t->is_client) {
2272     fn(t);
2273     return;
2274   }
2275   const auto duration = TarpitDuration(t);
2276   t->event_engine->RunAfter(
2277       duration, [t = t->Ref(), fn = std::move(fn)]() mutable {
2278         ApplicationCallbackExecCtx app_exec_ctx;
2279         ExecCtx exec_ctx;
2280         t->combiner->Run(
2281             NewClosure([t, fn = std::move(fn)](grpc_error_handle) mutable {
2282               // TODO(ctiller): this can result in not sending RST_STREAMS if a
2283               // request gets tarpit behind a transport close.
2284               if (!t->closed_with_error.ok()) return;
2285               fn(t.get());
2286             }),
2287             absl::OkStatus());
2288       });
2289 }
2290 
2291 }  // namespace
2292 }  // namespace grpc_core
2293 
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle due_to_error,bool tarpit)2294 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2295                                grpc_error_handle due_to_error, bool tarpit) {
2296   if (!t->is_client && !s->sent_trailing_metadata &&
2297       grpc_error_has_clear_grpc_status(due_to_error) &&
2298       !(s->read_closed && s->write_closed)) {
2299     close_from_api(t, s, due_to_error, tarpit);
2300     return;
2301   }
2302 
2303   if (!due_to_error.ok() && !s->seen_error) {
2304     s->seen_error = true;
2305   }
2306   if (!s->read_closed || !s->write_closed) {
2307     if (s->id != 0) {
2308       grpc_http2_error_code http_error;
2309       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2310                             &http_error, nullptr);
2311       grpc_core::MaybeTarpit(
2312           t, tarpit,
2313           [id = s->id, http_error,
2314            remove_stream_handle = grpc_chttp2_mark_stream_closed(
2315                t, s, 1, 1, due_to_error)](grpc_chttp2_transport* t) {
2316             grpc_chttp2_add_rst_stream_to_next_write(
2317                 t, id, static_cast<uint32_t>(http_error), nullptr);
2318             grpc_chttp2_initiate_write(t,
2319                                        GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2320           });
2321       return;
2322     }
2323   }
2324   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2325 }
2326 
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2327 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2328                              grpc_error_handle error) {
2329   grpc_status_code status;
2330   std::string message;
2331   grpc_error_get_status(error, s->deadline, &status, &message, nullptr,
2332                         nullptr);
2333   if (status != GRPC_STATUS_OK) {
2334     s->seen_error = true;
2335   }
2336   // stream_global->recv_trailing_metadata_finished gives us a
2337   //   last chance replacement: we've received trailing metadata,
2338   //   but something more important has become available to signal
2339   //   to the upper layers - drop what we've got, and then publish
2340   //   what we want - which is safe because we haven't told anyone
2341   //   about the metadata yet
2342   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2343       s->recv_trailing_metadata_finished != nullptr ||
2344       !s->final_metadata_requested) {
2345     s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status);
2346     if (!message.empty()) {
2347       s->trailing_metadata_buffer.Set(
2348           grpc_core::GrpcMessageMetadata(),
2349           grpc_core::Slice::FromCopiedBuffer(message));
2350     }
2351     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2352     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2353   }
2354 }
2355 
add_error(grpc_error_handle error,grpc_error_handle * refs,size_t * nrefs)2356 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2357                       size_t* nrefs) {
2358   if (error.ok()) return;
2359   for (size_t i = 0; i < *nrefs; i++) {
2360     if (error == refs[i]) {
2361       return;
2362     }
2363   }
2364   refs[*nrefs] = error;
2365   ++*nrefs;
2366 }
2367 
removal_error(grpc_error_handle extra_error,grpc_chttp2_stream * s,const char * main_error_msg)2368 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2369                                        grpc_chttp2_stream* s,
2370                                        const char* main_error_msg) {
2371   grpc_error_handle refs[3];
2372   size_t nrefs = 0;
2373   add_error(s->read_closed_error, refs, &nrefs);
2374   add_error(s->write_closed_error, refs, &nrefs);
2375   add_error(extra_error, refs, &nrefs);
2376   grpc_error_handle error;
2377   if (nrefs > 0) {
2378     error = GRPC_ERROR_CREATE_REFERENCING(main_error_msg, refs, nrefs);
2379   }
2380   return error;
2381 }
2382 
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_write_cb ** list,grpc_error_handle error)2383 static void flush_write_list(grpc_chttp2_transport* t,
2384                              grpc_chttp2_write_cb** list,
2385                              grpc_error_handle error) {
2386   while (*list) {
2387     grpc_chttp2_write_cb* cb = *list;
2388     *list = cb->next;
2389     grpc_chttp2_complete_closure_step(t, &cb->closure, error,
2390                                       "on_write_finished_cb");
2391     cb->next = t->write_cb_pool;
2392     t->write_cb_pool = cb;
2393   }
2394 }
2395 
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2396 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2397                                      grpc_chttp2_stream* s,
2398                                      grpc_error_handle error) {
2399   error =
2400       removal_error(error, s, "Pending writes failed due to stream closure");
2401   s->send_initial_metadata = nullptr;
2402   grpc_chttp2_complete_closure_step(t, &s->send_initial_metadata_finished,
2403                                     error, "send_initial_metadata_finished");
2404 
2405   s->send_trailing_metadata = nullptr;
2406   s->sent_trailing_metadata_op = nullptr;
2407   grpc_chttp2_complete_closure_step(t, &s->send_trailing_metadata_finished,
2408                                     error, "send_trailing_metadata_finished");
2409 
2410   grpc_chttp2_complete_closure_step(t, &s->send_message_finished, error,
2411                                     "fetching_send_message_finished");
2412   flush_write_list(t, &s->on_write_finished_cbs, error);
2413   flush_write_list(t, &s->on_flow_controlled_cbs, error);
2414 }
2415 
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error_handle error)2416 grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
2417     grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
2418     int close_writes, grpc_error_handle error) {
2419   grpc_chttp2_transport::RemovedStreamHandle rsh;
2420   GRPC_TRACE_VLOG(http, 2)
2421       << "MARK_STREAM_CLOSED: t=" << t << " s=" << s << "(id=" << s->id << ") "
2422       << ((close_reads && close_writes)
2423               ? "read+write"
2424               : (close_reads ? "read" : (close_writes ? "write" : "nothing??")))
2425       << " [" << grpc_core::StatusToString(error) << "]";
2426   if (s->read_closed && s->write_closed) {
2427     // already closed, but we should still fake the status if needed.
2428     grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2429     if (!overall_error.ok()) {
2430       grpc_chttp2_fake_status(t, s, overall_error);
2431     }
2432     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2433     return rsh;
2434   }
2435   bool closed_read = false;
2436   bool became_closed = false;
2437   if (close_reads && !s->read_closed) {
2438     s->read_closed_error = error;
2439     s->read_closed = true;
2440     closed_read = true;
2441   }
2442   if (close_writes && !s->write_closed) {
2443     s->write_closed_error = error;
2444     s->write_closed = true;
2445     grpc_chttp2_fail_pending_writes(t, s, error);
2446   }
2447   if (s->read_closed && s->write_closed) {
2448     became_closed = true;
2449     grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2450     if (s->id != 0) {
2451       rsh = remove_stream(t, s->id, overall_error);
2452     } else {
2453       // Purge streams waiting on concurrency still waiting for id assignment
2454       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2455     }
2456     if (!overall_error.ok()) {
2457       grpc_chttp2_fake_status(t, s, overall_error);
2458     }
2459   }
2460   if (closed_read) {
2461     for (int i = 0; i < 2; i++) {
2462       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2463         s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2464       }
2465     }
2466     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2467     grpc_chttp2_maybe_complete_recv_message(t, s);
2468   }
2469   if (became_closed) {
2470     s->stats.latency =
2471         gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), s->creation_time);
2472     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2473     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2474   }
2475   return rsh;
2476 }
2477 
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error,bool tarpit)2478 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2479                            grpc_error_handle error, bool tarpit) {
2480   grpc_status_code grpc_status;
2481   std::string message;
2482   grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,
2483                         nullptr);
2484 
2485   CHECK_GE(grpc_status, 0);
2486   CHECK_LT((int)grpc_status, 100);
2487 
2488   auto remove_stream_handle = grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2489   grpc_core::MaybeTarpit(
2490       t, tarpit,
2491       [error = std::move(error),
2492        sent_initial_metadata = s->sent_initial_metadata, id = s->id,
2493        grpc_status, message = std::move(message),
2494        remove_stream_handle =
2495            std::move(remove_stream_handle)](grpc_chttp2_transport* t) mutable {
2496         grpc_slice hdr;
2497         grpc_slice status_hdr;
2498         grpc_slice http_status_hdr;
2499         grpc_slice content_type_hdr;
2500         grpc_slice message_pfx;
2501         uint8_t* p;
2502         uint32_t len = 0;
2503 
2504         // Hand roll a header block.
2505         //   This is unnecessarily ugly - at some point we should find a more
2506         //   elegant solution.
2507         //   It's complicated by the fact that our send machinery would be dead
2508         //   by the time we got around to sending this, so instead we ignore
2509         //   HPACK compression and just write the uncompressed bytes onto the
2510         //   wire.
2511         if (!sent_initial_metadata) {
2512           http_status_hdr = GRPC_SLICE_MALLOC(13);
2513           p = GRPC_SLICE_START_PTR(http_status_hdr);
2514           *p++ = 0x00;
2515           *p++ = 7;
2516           *p++ = ':';
2517           *p++ = 's';
2518           *p++ = 't';
2519           *p++ = 'a';
2520           *p++ = 't';
2521           *p++ = 'u';
2522           *p++ = 's';
2523           *p++ = 3;
2524           *p++ = '2';
2525           *p++ = '0';
2526           *p++ = '0';
2527           CHECK(p == GRPC_SLICE_END_PTR(http_status_hdr));
2528           len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2529 
2530           content_type_hdr = GRPC_SLICE_MALLOC(31);
2531           p = GRPC_SLICE_START_PTR(content_type_hdr);
2532           *p++ = 0x00;
2533           *p++ = 12;
2534           *p++ = 'c';
2535           *p++ = 'o';
2536           *p++ = 'n';
2537           *p++ = 't';
2538           *p++ = 'e';
2539           *p++ = 'n';
2540           *p++ = 't';
2541           *p++ = '-';
2542           *p++ = 't';
2543           *p++ = 'y';
2544           *p++ = 'p';
2545           *p++ = 'e';
2546           *p++ = 16;
2547           *p++ = 'a';
2548           *p++ = 'p';
2549           *p++ = 'p';
2550           *p++ = 'l';
2551           *p++ = 'i';
2552           *p++ = 'c';
2553           *p++ = 'a';
2554           *p++ = 't';
2555           *p++ = 'i';
2556           *p++ = 'o';
2557           *p++ = 'n';
2558           *p++ = '/';
2559           *p++ = 'g';
2560           *p++ = 'r';
2561           *p++ = 'p';
2562           *p++ = 'c';
2563           CHECK(p == GRPC_SLICE_END_PTR(content_type_hdr));
2564           len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2565         }
2566 
2567         status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2568         p = GRPC_SLICE_START_PTR(status_hdr);
2569         *p++ = 0x00;  // literal header, not indexed
2570         *p++ = 11;    // len(grpc-status)
2571         *p++ = 'g';
2572         *p++ = 'r';
2573         *p++ = 'p';
2574         *p++ = 'c';
2575         *p++ = '-';
2576         *p++ = 's';
2577         *p++ = 't';
2578         *p++ = 'a';
2579         *p++ = 't';
2580         *p++ = 'u';
2581         *p++ = 's';
2582         if (grpc_status < 10) {
2583           *p++ = 1;
2584           *p++ = static_cast<uint8_t>('0' + grpc_status);
2585         } else {
2586           *p++ = 2;
2587           *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2588           *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2589         }
2590         CHECK(p == GRPC_SLICE_END_PTR(status_hdr));
2591         len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2592 
2593         size_t msg_len = message.length();
2594         CHECK(msg_len <= UINT32_MAX);
2595         grpc_core::VarintWriter<1> msg_len_writer(
2596             static_cast<uint32_t>(msg_len));
2597         message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
2598         p = GRPC_SLICE_START_PTR(message_pfx);
2599         *p++ = 0x00;  // literal header, not indexed
2600         *p++ = 12;    // len(grpc-message)
2601         *p++ = 'g';
2602         *p++ = 'r';
2603         *p++ = 'p';
2604         *p++ = 'c';
2605         *p++ = '-';
2606         *p++ = 'm';
2607         *p++ = 'e';
2608         *p++ = 's';
2609         *p++ = 's';
2610         *p++ = 'a';
2611         *p++ = 'g';
2612         *p++ = 'e';
2613         msg_len_writer.Write(0, p);
2614         p += msg_len_writer.length();
2615         CHECK(p == GRPC_SLICE_END_PTR(message_pfx));
2616         len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2617         len += static_cast<uint32_t>(msg_len);
2618 
2619         hdr = GRPC_SLICE_MALLOC(9);
2620         p = GRPC_SLICE_START_PTR(hdr);
2621         *p++ = static_cast<uint8_t>(len >> 16);
2622         *p++ = static_cast<uint8_t>(len >> 8);
2623         *p++ = static_cast<uint8_t>(len);
2624         *p++ = GRPC_CHTTP2_FRAME_HEADER;
2625         *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM |
2626                GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2627         *p++ = static_cast<uint8_t>(id >> 24);
2628         *p++ = static_cast<uint8_t>(id >> 16);
2629         *p++ = static_cast<uint8_t>(id >> 8);
2630         *p++ = static_cast<uint8_t>(id);
2631         CHECK(p == GRPC_SLICE_END_PTR(hdr));
2632 
2633         grpc_slice_buffer_add(&t->qbuf, hdr);
2634         if (!sent_initial_metadata) {
2635           grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2636           grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2637         }
2638         grpc_slice_buffer_add(&t->qbuf, status_hdr);
2639         grpc_slice_buffer_add(&t->qbuf, message_pfx);
2640         grpc_slice_buffer_add(&t->qbuf,
2641                               grpc_slice_from_cpp_string(std::move(message)));
2642         grpc_chttp2_reset_ping_clock(t);
2643         grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_NO_ERROR,
2644                                                  nullptr);
2645 
2646         grpc_chttp2_initiate_write(t,
2647                                    GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2648       });
2649 }
2650 
end_all_the_calls(grpc_chttp2_transport * t,grpc_error_handle error)2651 static void end_all_the_calls(grpc_chttp2_transport* t,
2652                               grpc_error_handle error) {
2653   intptr_t http2_error;
2654   // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2655   if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2656       !grpc_error_get_int(error, grpc_core::StatusIntProperty::kHttp2Error,
2657                           &http2_error)) {
2658     error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
2659                                GRPC_STATUS_UNAVAILABLE);
2660   }
2661   cancel_unstarted_streams(t, error, false);
2662   std::vector<grpc_chttp2_stream*> to_cancel;
2663   for (auto id_stream : t->stream_map) {
2664     to_cancel.push_back(id_stream.second);
2665   }
2666   for (auto s : to_cancel) {
2667     grpc_chttp2_cancel_stream(t, s, error, false);
2668   }
2669 }
2670 
2671 //
2672 // INPUT PROCESSING - PARSING
2673 //
2674 
2675 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2676 static void WithUrgency(grpc_chttp2_transport* t,
2677                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2678                         grpc_chttp2_initiate_write_reason reason, F action) {
2679   switch (urgency) {
2680     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2681       break;
2682     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2683       grpc_chttp2_initiate_write(t, reason);
2684       ABSL_FALLTHROUGH_INTENDED;
2685     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2686       action();
2687       break;
2688   }
2689 }
2690 
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2691 void grpc_chttp2_act_on_flowctl_action(
2692     const grpc_core::chttp2::FlowControlAction& action,
2693     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2694   WithUrgency(t, action.send_stream_update(),
2695               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() {
2696                 if (s->id != 0 && !s->read_closed) {
2697                   grpc_chttp2_mark_stream_writable(t, s);
2698                 }
2699               });
2700   WithUrgency(t, action.send_transport_update(),
2701               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2702   WithUrgency(t, action.send_initial_window_update(),
2703               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2704                 t->settings.mutable_local().SetInitialWindowSize(
2705                     action.initial_window_size());
2706               });
2707   WithUrgency(
2708       t, action.send_max_frame_size_update(),
2709       GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2710         t->settings.mutable_local().SetMaxFrameSize(action.max_frame_size());
2711       });
2712   if (t->enable_preferred_rx_crypto_frame_advertisement) {
2713     WithUrgency(
2714         t, action.preferred_rx_crypto_frame_size_update(),
2715         GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2716           t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(
2717               action.preferred_rx_crypto_frame_size());
2718         });
2719   }
2720 }
2721 
try_http_parsing(grpc_chttp2_transport * t)2722 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2723   grpc_http_parser parser;
2724   size_t i = 0;
2725   grpc_error_handle error;
2726   grpc_http_response response;
2727 
2728   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2729 
2730   grpc_error_handle parse_error;
2731   for (; i < t->read_buffer.count && parse_error.ok(); i++) {
2732     parse_error =
2733         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2734   }
2735   if (parse_error.ok() &&
2736       (parse_error = grpc_http_parser_eof(&parser)) == absl::OkStatus()) {
2737     error = grpc_error_set_int(
2738         GRPC_ERROR_CREATE(
2739             absl::StrCat("Trying to connect an http1.x server (HTTP status ",
2740                          response.status, ")")),
2741         grpc_core::StatusIntProperty::kRpcStatus,
2742         grpc_http2_status_to_grpc_status(response.status));
2743   }
2744 
2745   grpc_http_parser_destroy(&parser);
2746   grpc_http_response_destroy(&response);
2747   return error;
2748 }
2749 
read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2750 static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2751                         grpc_error_handle error) {
2752   auto* tp = t.get();
2753   tp->combiner->Run(grpc_core::InitTransportClosure<read_action_locked>(
2754                         std::move(t), &tp->read_action_locked),
2755                     error);
2756 }
2757 
read_action_parse_loop_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2758 static void read_action_parse_loop_locked(
2759     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2760     grpc_error_handle error) {
2761   GRPC_LATENT_SEE_INNER_SCOPE("read_action_parse_loop_locked");
2762   if (t->closed_with_error.ok()) {
2763     grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
2764     size_t requests_started = 0;
2765     for (size_t i = 0;
2766          i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) {
2767       auto r = grpc_chttp2_perform_read(t.get(), t->read_buffer.slices[i],
2768                                         requests_started);
2769       if (auto* partial_read_size = absl::get_if<size_t>(&r)) {
2770         for (size_t j = 0; j < i; j++) {
2771           grpc_core::CSliceUnref(grpc_slice_buffer_take_first(&t->read_buffer));
2772         }
2773         grpc_slice_buffer_sub_first(
2774             &t->read_buffer, *partial_read_size,
2775             GRPC_SLICE_LENGTH(t->read_buffer.slices[0]));
2776         t->combiner->ForceOffload();
2777         auto* tp = t.get();
2778         tp->combiner->Run(
2779             grpc_core::InitTransportClosure<read_action_parse_loop_locked>(
2780                 std::move(t), &tp->read_action_locked),
2781             std::move(errors[0]));
2782         // Early return: we queued to retry later.
2783         return;
2784       } else {
2785         errors[1] = std::move(absl::get<absl::Status>(r));
2786       }
2787     }
2788     if (errors[1] != absl::OkStatus()) {
2789       errors[2] = try_http_parsing(t.get());
2790       error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
2791                                             GPR_ARRAY_SIZE(errors));
2792     }
2793 
2794     if (t->initial_window_update != 0) {
2795       if (t->initial_window_update > 0) {
2796         grpc_chttp2_stream* s;
2797         while (grpc_chttp2_list_pop_stalled_by_stream(t.get(), &s)) {
2798           grpc_chttp2_mark_stream_writable(t.get(), s);
2799           grpc_chttp2_initiate_write(
2800               t.get(),
2801               GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2802         }
2803       }
2804       t->initial_window_update = 0;
2805     }
2806   }
2807 
2808   bool keep_reading = false;
2809   if (error.ok() && !t->closed_with_error.ok()) {
2810     error = GRPC_ERROR_CREATE_REFERENCING("Transport closed",
2811                                           &t->closed_with_error, 1);
2812   }
2813   if (!error.ok()) {
2814     // If a goaway frame was received, this might be the reason why the read
2815     // failed. Add this info to the error
2816     if (!t->goaway_error.ok()) {
2817       error = grpc_error_add_child(error, t->goaway_error);
2818     }
2819 
2820     close_transport_locked(t.get(), error);
2821   } else if (t->closed_with_error.ok()) {
2822     keep_reading = true;
2823     // Since we have read a byte, reset the keepalive timer
2824     if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2825       maybe_reset_keepalive_ping_timer_locked(t.get());
2826     }
2827   }
2828   grpc_slice_buffer_reset_and_unref(&t->read_buffer);
2829 
2830   if (keep_reading) {
2831     if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2832       t->reading_paused_on_pending_induced_frames = true;
2833       GRPC_TRACE_LOG(http, INFO)
2834           << "transport " << t.get()
2835           << " : Pausing reading due to too many unwritten "
2836              "SETTINGS ACK and RST_STREAM frames";
2837     } else {
2838       continue_read_action_locked(std::move(t));
2839     }
2840   }
2841 }
2842 
read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2843 static void read_action_locked(
2844     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2845     grpc_error_handle error) {
2846   // got an incoming read, cancel any pending keepalive timers
2847   t->keepalive_incoming_data_wanted = false;
2848   if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
2849     if (GRPC_TRACE_FLAG_ENABLED(http2_ping) ||
2850         GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
2851       LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t.get()
2852                 << "]: Clear keepalive timer because data was received";
2853     }
2854     t->event_engine->Cancel(
2855         std::exchange(t->keepalive_ping_timeout_handle, TaskHandle::kInvalid));
2856   }
2857   grpc_error_handle err = error;
2858   if (!err.ok()) {
2859     err = grpc_error_set_int(
2860         GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
2861         grpc_core::StatusIntProperty::kOccurredDuringWrite, t->write_state);
2862   }
2863   std::swap(err, error);
2864   read_action_parse_loop_locked(std::move(t), std::move(err));
2865 }
2866 
continue_read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2867 static void continue_read_action_locked(
2868     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2869   const bool urgent = !t->goaway_error.ok();
2870   auto* tp = t.get();
2871   grpc_endpoint_read(tp->ep.get(), &tp->read_buffer,
2872                      grpc_core::InitTransportClosure<read_action>(
2873                          std::move(t), &tp->read_action_locked),
2874                      urgent, grpc_chttp2_min_read_progress_size(tp));
2875 }
2876 
2877 // t is reffed prior to calling the first time, and once the callback chain
2878 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2879 void schedule_bdp_ping_locked(
2880     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2881   auto* tp = t.get();
2882   tp->flow_control.bdp_estimator()->SchedulePing();
2883   send_ping_locked(tp,
2884                    grpc_core::InitTransportClosure<start_bdp_ping>(
2885                        tp->Ref(), &tp->start_bdp_ping_locked),
2886                    grpc_core::InitTransportClosure<finish_bdp_ping>(
2887                        std::move(t), &tp->finish_bdp_ping_locked));
2888   grpc_chttp2_initiate_write(tp, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2889 }
2890 
start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2891 static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2892                            grpc_error_handle error) {
2893   grpc_chttp2_transport* tp = t.get();
2894   tp->combiner->Run(grpc_core::InitTransportClosure<start_bdp_ping_locked>(
2895                         std::move(t), &tp->start_bdp_ping_locked),
2896                     error);
2897 }
2898 
start_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2899 static void start_bdp_ping_locked(
2900     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2901     grpc_error_handle error) {
2902   GRPC_TRACE_LOG(http, INFO)
2903       << t->peer_string.as_string_view()
2904       << ": Start BDP ping err=" << grpc_core::StatusToString(error);
2905   if (!error.ok() || !t->closed_with_error.ok()) {
2906     return;
2907   }
2908   // Reset the keepalive ping timer
2909   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2910     maybe_reset_keepalive_ping_timer_locked(t.get());
2911   }
2912   t->flow_control.bdp_estimator()->StartPing();
2913   t->bdp_ping_started = true;
2914 }
2915 
finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2916 static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2917                             grpc_error_handle error) {
2918   grpc_chttp2_transport* tp = t.get();
2919   tp->combiner->Run(grpc_core::InitTransportClosure<finish_bdp_ping_locked>(
2920                         std::move(t), &tp->finish_bdp_ping_locked),
2921                     error);
2922 }
2923 
finish_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2924 static void finish_bdp_ping_locked(
2925     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2926     grpc_error_handle error) {
2927   GRPC_TRACE_LOG(http, INFO)
2928       << t->peer_string.as_string_view()
2929       << ": Complete BDP ping err=" << grpc_core::StatusToString(error);
2930   if (!error.ok() || !t->closed_with_error.ok()) {
2931     return;
2932   }
2933   if (!t->bdp_ping_started) {
2934     // start_bdp_ping_locked has not been run yet. Schedule
2935     // finish_bdp_ping_locked to be run later.
2936     finish_bdp_ping(std::move(t), std::move(error));
2937     return;
2938   }
2939   t->bdp_ping_started = false;
2940   grpc_core::Timestamp next_ping =
2941       t->flow_control.bdp_estimator()->CompletePing();
2942   grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t.get(),
2943                                     nullptr);
2944   CHECK(t->next_bdp_ping_timer_handle == TaskHandle::kInvalid);
2945   t->next_bdp_ping_timer_handle =
2946       t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
2947         grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2948         grpc_core::ExecCtx exec_ctx;
2949         next_bdp_ping_timer_expired(t.get());
2950       });
2951 }
2952 
next_bdp_ping_timer_expired(grpc_chttp2_transport * t)2953 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
2954   t->combiner->Run(
2955       grpc_core::InitTransportClosure<next_bdp_ping_timer_expired_locked>(
2956           t->Ref(), &t->next_bdp_ping_timer_expired_locked),
2957       absl::OkStatus());
2958 }
2959 
next_bdp_ping_timer_expired_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)2960 static void next_bdp_ping_timer_expired_locked(
2961     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2962     GRPC_UNUSED grpc_error_handle error) {
2963   DCHECK(error.ok());
2964   t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
2965   if (t->flow_control.bdp_estimator()->accumulator() == 0) {
2966     // Block the bdp ping till we receive more data.
2967     t->bdp_ping_blocked = true;
2968   } else {
2969     schedule_bdp_ping_locked(std::move(t));
2970   }
2971 }
2972 
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,const bool is_client)2973 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2974                                                const bool is_client) {
2975   grpc_chttp2_config_default_keepalive_args(grpc_core::ChannelArgs::FromC(args),
2976                                             is_client);
2977 }
2978 
grpc_chttp2_config_default_keepalive_args_client(const grpc_core::ChannelArgs & channel_args)2979 static void grpc_chttp2_config_default_keepalive_args_client(
2980     const grpc_core::ChannelArgs& channel_args) {
2981   g_default_client_keepalive_time =
2982       std::max(grpc_core::Duration::Milliseconds(1),
2983                channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
2984                    .value_or(g_default_client_keepalive_time));
2985   g_default_client_keepalive_timeout = std::max(
2986       grpc_core::Duration::Zero(),
2987       channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
2988           .value_or(g_default_client_keepalive_timeout));
2989   g_default_client_keepalive_permit_without_calls =
2990       channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
2991           .value_or(g_default_client_keepalive_permit_without_calls);
2992 }
2993 
grpc_chttp2_config_default_keepalive_args_server(const grpc_core::ChannelArgs & channel_args)2994 static void grpc_chttp2_config_default_keepalive_args_server(
2995     const grpc_core::ChannelArgs& channel_args) {
2996   g_default_server_keepalive_time =
2997       std::max(grpc_core::Duration::Milliseconds(1),
2998                channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
2999                    .value_or(g_default_server_keepalive_time));
3000   g_default_server_keepalive_timeout = std::max(
3001       grpc_core::Duration::Zero(),
3002       channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
3003           .value_or(g_default_server_keepalive_timeout));
3004   g_default_server_keepalive_permit_without_calls =
3005       channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
3006           .value_or(g_default_server_keepalive_permit_without_calls);
3007 }
3008 
grpc_chttp2_config_default_keepalive_args(const grpc_core::ChannelArgs & channel_args,const bool is_client)3009 void grpc_chttp2_config_default_keepalive_args(
3010     const grpc_core::ChannelArgs& channel_args, const bool is_client) {
3011   if (is_client) {
3012     grpc_chttp2_config_default_keepalive_args_client(channel_args);
3013   } else {
3014     grpc_chttp2_config_default_keepalive_args_server(channel_args);
3015   }
3016   grpc_core::Chttp2PingAbusePolicy::SetDefaults(channel_args);
3017   grpc_core::Chttp2PingRatePolicy::SetDefaults(channel_args);
3018 }
3019 
init_keepalive_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)3020 static void init_keepalive_ping(
3021     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
3022   auto* tp = t.get();
3023   tp->combiner->Run(grpc_core::InitTransportClosure<init_keepalive_ping_locked>(
3024                         std::move(t), &tp->init_keepalive_ping_locked),
3025                     absl::OkStatus());
3026 }
3027 
init_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)3028 static void init_keepalive_ping_locked(
3029     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3030     GRPC_UNUSED grpc_error_handle error) {
3031   DCHECK(error.ok());
3032   CHECK(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
3033   CHECK(t->keepalive_ping_timer_handle != TaskHandle::kInvalid);
3034   t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
3035   if (t->destroying || !t->closed_with_error.ok()) {
3036     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
3037   } else {
3038     if (t->keepalive_permit_without_calls || !t->stream_map.empty()) {
3039       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
3040       send_keepalive_ping_locked(t);
3041       grpc_chttp2_initiate_write(t.get(),
3042                                  GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
3043     } else {
3044       t->keepalive_ping_timer_handle =
3045           t->event_engine->RunAfter(t->keepalive_time, [t] {
3046             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
3047             grpc_core::ExecCtx exec_ctx;
3048             init_keepalive_ping(t);
3049           });
3050     }
3051   }
3052 }
3053 
finish_keepalive_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3054 static void finish_keepalive_ping(
3055     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3056     grpc_error_handle error) {
3057   auto* tp = t.get();
3058   tp->combiner->Run(
3059       grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
3060           std::move(t), &tp->finish_keepalive_ping_locked),
3061       error);
3062 }
3063 
finish_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3064 static void finish_keepalive_ping_locked(
3065     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3066     grpc_error_handle error) {
3067   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
3068     if (error.ok()) {
3069       if (GRPC_TRACE_FLAG_ENABLED(http) ||
3070           GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
3071         LOG(INFO) << t->peer_string.as_string_view()
3072                   << ": Finish keepalive ping";
3073       }
3074       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
3075       CHECK(t->keepalive_ping_timer_handle == TaskHandle::kInvalid);
3076       t->keepalive_ping_timer_handle =
3077           t->event_engine->RunAfter(t->keepalive_time, [t] {
3078             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
3079             grpc_core::ExecCtx exec_ctx;
3080             init_keepalive_ping(t);
3081           });
3082     }
3083   }
3084 }
3085 
maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport * t)3086 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
3087   if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
3088       t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
3089     // Cancel succeeds, resets the keepalive ping timer. Note that we don't
3090     // need to Ref or Unref here since we still hold the Ref.
3091     if (GRPC_TRACE_FLAG_ENABLED(http) ||
3092         GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
3093       LOG(INFO) << t->peer_string.as_string_view()
3094                 << ": Keepalive ping cancelled. Resetting timer.";
3095     }
3096     t->keepalive_ping_timer_handle =
3097         t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
3098           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
3099           grpc_core::ExecCtx exec_ctx;
3100           init_keepalive_ping(std::move(t));
3101         });
3102   }
3103 }
3104 
3105 //
3106 // CALLBACK LOOP
3107 //
3108 
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)3109 static void connectivity_state_set(grpc_chttp2_transport* t,
3110                                    grpc_connectivity_state state,
3111                                    const absl::Status& status,
3112                                    const char* reason) {
3113   GRPC_TRACE_LOG(http, INFO)
3114       << "transport " << t << " set connectivity_state=" << state
3115       << "; status=" << status.ToString() << "; reason=" << reason;
3116   t->state_tracker.SetState(state, status, reason);
3117 }
3118 
3119 //
3120 // POLLSET STUFF
3121 //
3122 
SetPollset(grpc_stream *,grpc_pollset * pollset)3123 void grpc_chttp2_transport::SetPollset(grpc_stream* /*gs*/,
3124                                        grpc_pollset* pollset) {
3125   // We don't want the overhead of acquiring the mutex unless we're
3126   // using the "poll" polling engine, which is the only one that
3127   // actually uses pollsets.
3128   if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
3129   grpc_core::MutexLock lock(&ep_destroy_mu);
3130   if (ep != nullptr) grpc_endpoint_add_to_pollset(ep.get(), pollset);
3131 }
3132 
SetPollsetSet(grpc_stream *,grpc_pollset_set * pollset_set)3133 void grpc_chttp2_transport::SetPollsetSet(grpc_stream* /*gs*/,
3134                                           grpc_pollset_set* pollset_set) {
3135   // We don't want the overhead of acquiring the mutex unless we're
3136   // using the "poll" polling engine, which is the only one that
3137   // actually uses pollsets.
3138   if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
3139   grpc_core::MutexLock lock(&ep_destroy_mu);
3140   if (ep != nullptr) grpc_endpoint_add_to_pollset_set(ep.get(), pollset_set);
3141 }
3142 
3143 //
3144 // RESOURCE QUOTAS
3145 //
3146 
post_benign_reclaimer(grpc_chttp2_transport * t)3147 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3148   if (!t->benign_reclaimer_registered) {
3149     t->benign_reclaimer_registered = true;
3150     t->memory_owner.PostReclaimer(
3151         grpc_core::ReclamationPass::kBenign,
3152         [t = t->Ref()](
3153             absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3154           if (sweep.has_value()) {
3155             auto* tp = t.get();
3156             tp->active_reclamation = std::move(*sweep);
3157             tp->combiner->Run(
3158                 grpc_core::InitTransportClosure<benign_reclaimer_locked>(
3159                     std::move(t), &tp->benign_reclaimer_locked),
3160                 absl::OkStatus());
3161           }
3162         });
3163   }
3164 }
3165 
post_destructive_reclaimer(grpc_chttp2_transport * t)3166 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3167   if (!t->destructive_reclaimer_registered) {
3168     t->destructive_reclaimer_registered = true;
3169     t->memory_owner.PostReclaimer(
3170         grpc_core::ReclamationPass::kDestructive,
3171         [t = t->Ref()](
3172             absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3173           if (sweep.has_value()) {
3174             auto* tp = t.get();
3175             tp->active_reclamation = std::move(*sweep);
3176             tp->combiner->Run(
3177                 grpc_core::InitTransportClosure<destructive_reclaimer_locked>(
3178                     std::move(t), &tp->destructive_reclaimer_locked),
3179                 absl::OkStatus());
3180           }
3181         });
3182   }
3183 }
3184 
benign_reclaimer_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3185 static void benign_reclaimer_locked(
3186     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3187     grpc_error_handle error) {
3188   if (error.ok() && t->stream_map.empty()) {
3189     // Channel with no active streams: send a goaway to try and make it
3190     // disconnect cleanly
3191     grpc_core::global_stats().IncrementRqConnectionsDropped();
3192     GRPC_TRACE_LOG(resource_quota, INFO)
3193         << "HTTP2: " << t->peer_string.as_string_view()
3194         << " - send goaway to free memory";
3195     send_goaway(t.get(),
3196                 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3197                                    grpc_core::StatusIntProperty::kHttp2Error,
3198                                    GRPC_HTTP2_ENHANCE_YOUR_CALM),
3199                 /*immediate_disconnect_hint=*/true);
3200   } else if (error.ok() && GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
3201     LOG(INFO) << "HTTP2: " << t->peer_string.as_string_view()
3202               << " - skip benign reclamation, there are still "
3203               << t->stream_map.size() << " streams";
3204   }
3205   t->benign_reclaimer_registered = false;
3206   if (error != absl::CancelledError()) {
3207     t->active_reclamation.Finish();
3208   }
3209 }
3210 
destructive_reclaimer_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3211 static void destructive_reclaimer_locked(
3212     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3213     grpc_error_handle error) {
3214   t->destructive_reclaimer_registered = false;
3215   if (error.ok() && !t->stream_map.empty()) {
3216     // As stream_map is a hash map, this selects effectively a random stream.
3217     grpc_chttp2_stream* s = t->stream_map.begin()->second;
3218     GRPC_TRACE_LOG(resource_quota, INFO)
3219         << "HTTP2: " << t->peer_string.as_string_view()
3220         << " - abandon stream id " << s->id;
3221     grpc_core::global_stats().IncrementRqCallsDropped();
3222     grpc_chttp2_cancel_stream(
3223         t.get(), s,
3224         grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3225                            grpc_core::StatusIntProperty::kHttp2Error,
3226                            GRPC_HTTP2_ENHANCE_YOUR_CALM),
3227         false);
3228     if (!t->stream_map.empty()) {
3229       // Since we cancel one stream per destructive reclamation, if
3230       //   there are more streams left, we can immediately post a new
3231       //   reclaimer in case the resource quota needs to free more
3232       //   memory
3233       post_destructive_reclaimer(t.get());
3234     }
3235   }
3236   if (error != absl::CancelledError()) {
3237     t->active_reclamation.Finish();
3238   }
3239 }
3240 
3241 //
3242 // MONITORING
3243 //
3244 
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3245 const char* grpc_chttp2_initiate_write_reason_string(
3246     grpc_chttp2_initiate_write_reason reason) {
3247   switch (reason) {
3248     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3249       return "INITIAL_WRITE";
3250     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3251       return "START_NEW_STREAM";
3252     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3253       return "SEND_MESSAGE";
3254     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3255       return "SEND_INITIAL_METADATA";
3256     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3257       return "SEND_TRAILING_METADATA";
3258     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3259       return "RETRY_SEND_PING";
3260     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3261       return "CONTINUE_PINGS";
3262     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3263       return "GOAWAY_SENT";
3264     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3265       return "RST_STREAM";
3266     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3267       return "CLOSE_FROM_API";
3268     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3269       return "STREAM_FLOW_CONTROL";
3270     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3271       return "TRANSPORT_FLOW_CONTROL";
3272     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3273       return "SEND_SETTINGS";
3274     case GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK:
3275       return "SETTINGS_ACK";
3276     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3277       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3278     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3279       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3280     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3281       return "APPLICATION_PING";
3282     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3283       return "BDP_PING";
3284     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3285       return "KEEPALIVE_PING";
3286     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3287       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3288     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3289       return "PING_RESPONSE";
3290     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3291       return "FORCE_RST_STREAM";
3292   }
3293   GPR_UNREACHABLE_CODE(return "unknown");
3294 }
3295 
SizeOfStream() const3296 size_t grpc_chttp2_transport::SizeOfStream() const {
3297   return sizeof(grpc_chttp2_stream);
3298 }
3299 
3300 bool grpc_chttp2_transport::
HackyDisableStreamOpBatchCoalescingInConnectedChannel() const3301     HackyDisableStreamOpBatchCoalescingInConnectedChannel() const {
3302   return false;
3303 }
3304 
GetTransportName() const3305 absl::string_view grpc_chttp2_transport::GetTransportName() const {
3306   return "chttp2";
3307 }
3308 
3309 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_core::Transport * transport)3310 grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport) {
3311   grpc_chttp2_transport* t =
3312       reinterpret_cast<grpc_chttp2_transport*>(transport);
3313   return t->channelz_socket;
3314 }
3315 
// Creates a new chttp2 transport over endpoint `ep`, configured from
// `channel_args`, acting as a client or a server according to `is_client`.
// The endpoint is moved into the transport.
grpc_core::Transport* grpc_create_chttp2_transport(
    const grpc_core::ChannelArgs& channel_args,
    grpc_core::OrphanablePtr<grpc_endpoint> ep, const bool is_client) {
  grpc_core::Transport* const transport =
      new grpc_chttp2_transport(channel_args, std::move(ep), is_client);
  return transport;
}
3321 
grpc_chttp2_transport_start_reading(grpc_core::Transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings,grpc_pollset_set * interested_parties_until_recv_settings,grpc_closure * notify_on_close)3322 void grpc_chttp2_transport_start_reading(
3323     grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
3324     grpc_closure* notify_on_receive_settings,
3325     grpc_pollset_set* interested_parties_until_recv_settings,
3326     grpc_closure* notify_on_close) {
3327   auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
3328   if (read_buffer != nullptr) {
3329     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3330   }
3331   auto* tp = t.get();
3332   tp->combiner->Run(
3333       grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
3334                              interested_parties_until_recv_settings,
3335                              notify_on_close](grpc_error_handle) mutable {
3336         if (!t->closed_with_error.ok()) {
3337           if (notify_on_receive_settings != nullptr) {
3338             if (t->ep != nullptr &&
3339                 interested_parties_until_recv_settings != nullptr) {
3340               grpc_endpoint_delete_from_pollset_set(
3341                   t->ep.get(), interested_parties_until_recv_settings);
3342             }
3343             grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
3344                                     t->closed_with_error);
3345           }
3346           if (notify_on_close != nullptr) {
3347             grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close,
3348                                     t->closed_with_error);
3349           }
3350           return;
3351         }
3352         t->interested_parties_until_recv_settings =
3353             interested_parties_until_recv_settings;
3354         t->notify_on_receive_settings = notify_on_receive_settings;
3355         t->notify_on_close = notify_on_close;
3356         read_action_locked(std::move(t), absl::OkStatus());
3357       }),
3358       absl::OkStatus());
3359 }
3360