//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <new>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/base/attributes.h"
#include "absl/container/flat_hash_map.h"
#include "absl/hash/hash.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "src/core/config/config_vars.h"
#include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
#include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
#include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
#include "src/core/ext/transport/chttp2/transport/frame_security.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/http2_settings.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
#include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/ext/transport/chttp2/transport/stream_lists.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/extensions/tcp_trace.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/metadata_info.h"
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_framing_endpoint_extension.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
#include "src/core/telemetry/tcp_tracer.h"
#include "src/core/util/bitset.h"
#include "src/core/util/crash.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/http_client/parser.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/status_helper.h"
#include "src/core/util/string.h"
#include "src/core/util/time.h"
#include "src/core/util/useful.h"

#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)

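// Factor by which keepalive_time is increased when the peer sends a GOAWAY
// with ENHANCE_YOUR_CALM and "too_many_pings" debug data (see
// grpc_chttp2_add_incoming_goaway below).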
#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2

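// Limit on queued "induced" frames (SETTINGS ACKs, PING ACKs, RST_STREAMs):
// once this many are pending in qbuf, reading is paused until a write flushes
// them; see the resume logic in write_action_begin_locked.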
#define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000

#define GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT \
  "grpc.http2.ping_on_rst_stream_percent"

static grpc_core::Duration g_default_client_keepalive_time =
    grpc_core::Duration::Infinity();
static grpc_core::Duration g_default_client_keepalive_timeout =
    grpc_core::Duration::Seconds(20);
static grpc_core::Duration g_default_server_keepalive_time =
    grpc_core::Duration::Hours(2);
static grpc_core::Duration g_default_server_keepalive_timeout =
    grpc_core::Duration::Seconds(20);
static bool g_default_client_keepalive_permit_without_calls = false;
static bool g_default_server_keepalive_permit_without_calls = false;
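// Note: these defaults only act as fallbacks; read_channel_args() lets
// GRPC_ARG_KEEPALIVE_TIME_MS, GRPC_ARG_KEEPALIVE_TIMEOUT_MS and
// GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS override them per channel.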

// EXPERIMENTAL: control tarpitting in chttp2
#define GRPC_ARG_HTTP_ALLOW_TARPIT "grpc.http.tarpit"
#define GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS "grpc.http.tarpit_min_duration_ms"
#define GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS "grpc.http.tarpit_max_duration_ms"

#define MAX_CLIENT_STREAM_ID 0x7fffffffu

// forward declarations of various callbacks that we'll build closures around
static void write_action_begin_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
static void write_action(grpc_chttp2_transport* t);
static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
                             grpc_error_handle error);
static void write_action_end_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);

static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
                        grpc_error_handle error);
static void read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
                               grpc_error_handle error);
static void continue_read_action_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t);

static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                           grpc_error_handle error, bool tarpit);

// Start new streams that have been created if we can
static void maybe_start_some_streams(grpc_chttp2_transport* t);

static void connectivity_state_set(grpc_chttp2_transport* t,
                                   grpc_connectivity_state state,
                                   const absl::Status& status,
                                   const char* reason);

static void benign_reclaimer_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
static void destructive_reclaimer_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);

static void post_benign_reclaimer(grpc_chttp2_transport* t);
static void post_destructive_reclaimer(grpc_chttp2_transport* t);

static void close_transport_locked(grpc_chttp2_transport* t,
                                   grpc_error_handle error);
static void end_all_the_calls(grpc_chttp2_transport* t,
                              grpc_error_handle error);

static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
                           grpc_error_handle error);
static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
                            grpc_error_handle error);
static void start_bdp_ping_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
static void finish_bdp_ping_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
static void next_bdp_ping_timer_expired_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> tp,
    GRPC_UNUSED grpc_error_handle error);

static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
static void send_ping_locked(grpc_chttp2_transport* t,
                             grpc_closure* on_initiate, grpc_closure* on_ack);
static void retry_initiate_ping_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
    GRPC_UNUSED grpc_error_handle error);

// keepalive-relevant functions
static void init_keepalive_ping(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
static void init_keepalive_ping_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
    GRPC_UNUSED grpc_error_handle error);
static void finish_keepalive_ping(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
static void finish_keepalive_ping_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);

static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
                        bool immediate_disconnect_hint);

// Timeout for getting an ack back on settings changes
#define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout"

namespace {

using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;

grpc_core::CallTracerAnnotationInterface* CallTracerIfSampled(
    grpc_chttp2_stream* s) {
  if (!grpc_core::IsTraceRecordCallopsEnabled()) {
    return nullptr;
  }
  auto* call_tracer =
      s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
  if (call_tracer == nullptr || !call_tracer->IsSampled()) {
    return nullptr;
  }
  return call_tracer;
}

std::shared_ptr<grpc_core::TcpTracerInterface> TcpTracerIfSampled(
    grpc_chttp2_stream* s) {
  if (!grpc_core::IsTraceRecordCallopsEnabled()) {
    return nullptr;
  }
  auto* call_attempt_tracer =
      s->arena->GetContext<grpc_core::CallTracerInterface>();
  if (call_attempt_tracer == nullptr || !call_attempt_tracer->IsSampled()) {
    return nullptr;
  }
  return call_attempt_tracer->StartNewTcpTrace();
}

grpc_core::WriteTimestampsCallback g_write_timestamps_callback = nullptr;
grpc_core::CopyContextFn g_get_copied_context_fn = nullptr;
}  // namespace

namespace grpc_core {

namespace {

// Initialize a grpc_closure \a c to call \a Fn with \a t and \a error. Holds
// the passed in reference to \a t until it's moved into Fn.
template <void (*Fn)(RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle)>
grpc_closure* InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,
                                   grpc_closure* c) {
  GRPC_CLOSURE_INIT(
      c,
      [](void* tp, grpc_error_handle error) {
        Fn(RefCountedPtr<grpc_chttp2_transport>(
               static_cast<grpc_chttp2_transport*>(tp)),
           std::move(error));
      },
      t.release(), nullptr);
  return c;
}
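
// Typical usage, as seen later in this file: wrap a *_locked callback and hand
// it to the combiner together with a transport ref, e.g.
//   t->combiner->Run(
//       InitTransportClosure<write_action_end_locked>(
//           t->Ref(), &t->write_action_end_locked),
//       error);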

TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
    nullptr;
bool test_only_disable_transient_failure_state_notification = false;

}  // namespace

void TestOnlySetGlobalHttp2TransportInitCallback(
    TestOnlyGlobalHttp2TransportInitCallback callback) {
  test_only_init_callback = callback;
}

void TestOnlySetGlobalHttp2TransportDestructCallback(
    TestOnlyGlobalHttp2TransportDestructCallback callback) {
  test_only_destruct_callback = callback;
}

void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
    bool disable) {
  test_only_disable_transient_failure_state_notification = disable;
}

void GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn) {
  g_write_timestamps_callback = fn;
}

void GrpcHttp2SetCopyContextFn(CopyContextFn fn) {
  g_get_copied_context_fn = fn;
}

WriteTimestampsCallback GrpcHttp2GetWriteTimestampsCallback() {
  return g_write_timestamps_callback;
}

CopyContextFn GrpcHttp2GetCopyContextFn() { return g_get_copied_context_fn; }

// For each entry in the passed ContextList, invokes the callback registered
// via GrpcHttp2SetWriteTimestampsCallback with the entry's context and \a ts.
// It also deletes/frees the passed ContextList after this operation.
void ForEachContextListEntryExecute(void* arg, Timestamps* ts,
                                    grpc_error_handle error) {
  ContextList* context_list = reinterpret_cast<ContextList*>(arg);
  if (!context_list) {
    return;
  }
  for (auto it = context_list->begin(); it != context_list->end(); it++) {
    ContextListEntry& entry = (*it);
    if (ts) {
      ts->byte_offset = static_cast<uint32_t>(entry.ByteOffsetInStream());
    }
    g_write_timestamps_callback(entry.TraceContext(), ts, error);
  }
  delete context_list;
}

HttpAnnotation::HttpAnnotation(Type type, gpr_timespec time)
    : CallTracerAnnotationInterface::Annotation(
          CallTracerAnnotationInterface::AnnotationType::kHttpTransport),
      type_(type),
      time_(time) {}

std::string HttpAnnotation::ToString() const {
  std::string s = "HttpAnnotation type: ";
  switch (type_) {
    case Type::kStart:
      absl::StrAppend(&s, "Start");
      break;
    case Type::kHeadWritten:
      absl::StrAppend(&s, "HeadWritten");
      break;
    case Type::kEnd:
      absl::StrAppend(&s, "End");
      break;
    default:
      absl::StrAppend(&s, "Unknown");
  }
  absl::StrAppend(&s, " time: ", gpr_format_timespec(time_));
  if (transport_stats_.has_value()) {
    absl::StrAppend(&s, " transport:[", transport_stats_->ToString(), "]");
  }
  if (stream_stats_.has_value()) {
    absl::StrAppend(&s, " stream:[", stream_stats_->ToString(), "]");
  }
  return s;
}

}  // namespace grpc_core

//
// CONSTRUCTION/DESTRUCTION/REFCOUNTING
//

grpc_chttp2_transport::~grpc_chttp2_transport() {
  size_t i;

  cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));

  event_engine.reset();

  if (channelz_socket != nullptr) {
    channelz_socket.reset();
  }

  grpc_slice_buffer_destroy(&qbuf);

  grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
  // ContextList::Execute follows the semantics of a callback function and does
  // not take a ref on error.
  if (context_list != nullptr) {
    grpc_core::ForEachContextListEntryExecute(context_list, nullptr, error);
  }
  context_list = nullptr;

  grpc_slice_buffer_destroy(&read_buffer);
  grpc_chttp2_goaway_parser_destroy(&goaway_parser);

  for (i = 0; i < STREAM_LIST_COUNT; i++) {
    CHECK_EQ(lists[i].head, nullptr);
    CHECK_EQ(lists[i].tail, nullptr);
  }

  CHECK(stream_map.empty());
  GRPC_COMBINER_UNREF(combiner, "chttp2_transport");

  while (write_cb_pool) {
    grpc_chttp2_write_cb* next = write_cb_pool->next;
    gpr_free(write_cb_pool);
    write_cb_pool = next;
  }

  gpr_free(ping_acks);
  if (grpc_core::test_only_destruct_callback != nullptr) {
    grpc_core::test_only_destruct_callback();
  }
}

static void read_channel_args(grpc_chttp2_transport* t,
                              const grpc_core::ChannelArgs& channel_args,
                              const bool is_client) {
  const int initial_sequence_number =
      channel_args.GetInt(GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER).value_or(-1);
  if (initial_sequence_number > 0) {
    if ((t->next_stream_id & 1) != (initial_sequence_number & 1)) {
      LOG(ERROR) << GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
                 << ": low bit must be " << (t->next_stream_id & 1) << " on "
                 << (is_client ? "client" : "server");
    } else {
      t->next_stream_id = static_cast<uint32_t>(initial_sequence_number);
    }
  }

  const int max_hpack_table_size =
      channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER).value_or(-1);
  if (max_hpack_table_size >= 0) {
    t->hpack_compressor.SetMaxUsableSize(max_hpack_table_size);
  }

  t->write_buffer_size =
      std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)
                      .value_or(grpc_core::chttp2::kDefaultWindow));
  t->keepalive_time =
      std::max(grpc_core::Duration::Milliseconds(1),
               channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
                   .value_or(t->is_client ? g_default_client_keepalive_time
                                          : g_default_server_keepalive_time));
  t->keepalive_timeout = std::max(
      grpc_core::Duration::Zero(),
      channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
          .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
                        ? grpc_core::Duration::Infinity()
                        : (t->is_client ? g_default_client_keepalive_timeout
                                        : g_default_server_keepalive_timeout)));
  t->ping_timeout = std::max(
      grpc_core::Duration::Zero(),
      channel_args.GetDurationFromIntMillis(GRPC_ARG_PING_TIMEOUT_MS)
          .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
                        ? grpc_core::Duration::Infinity()
                        : grpc_core::Duration::Minutes(1)));
  if (t->is_client) {
    t->keepalive_permit_without_calls =
        channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
            .value_or(g_default_client_keepalive_permit_without_calls);
  } else {
    t->keepalive_permit_without_calls =
        channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
            .value_or(g_default_server_keepalive_permit_without_calls);
  }

  t->settings_timeout =
      channel_args.GetDurationFromIntMillis(GRPC_ARG_SETTINGS_TIMEOUT)
          .value_or(std::max(t->keepalive_timeout * 2,
                             grpc_core::Duration::Minutes(1)));

  // Only send the preferred rx frame size HTTP/2 setting if we are instructed
  // to auto-size the buffers allocated at the TCP level and we can also adjust
  // the sending frame size.
  t->enable_preferred_rx_crypto_frame_advertisement =
      channel_args
          .GetBool(GRPC_ARG_EXPERIMENTAL_HTTP2_PREFERRED_CRYPTO_FRAME_SIZE)
          .value_or(false);

  const auto max_requests_per_read =
      channel_args.GetInt("grpc.http2.max_requests_per_read");
  if (max_requests_per_read.has_value()) {
    t->max_requests_per_read =
        grpc_core::Clamp(*max_requests_per_read, 1, 10000);
  } else {
    t->max_requests_per_read = 32;
  }

  if (channel_args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
          .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
    t->channelz_socket =
        grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
            std::string(grpc_endpoint_get_local_address(t->ep.get())),
            std::string(t->peer_string.as_string_view()),
            absl::StrCat(t->GetTransportName(), " ",
                         t->peer_string.as_string_view()),
            channel_args
                .GetObjectRef<grpc_core::channelz::SocketNode::Security>());
  }

  t->ack_pings = channel_args.GetBool("grpc.http2.ack_pings").value_or(true);

  t->allow_tarpit =
      channel_args.GetBool(GRPC_ARG_HTTP_ALLOW_TARPIT).value_or(true);
  t->min_tarpit_duration_ms =
      channel_args
          .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS)
          .value_or(grpc_core::Duration::Milliseconds(100))
          .millis();
  t->max_tarpit_duration_ms =
      channel_args
          .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS)
          .value_or(grpc_core::Duration::Seconds(1))
          .millis();
  t->max_header_list_size_soft_limit =
      grpc_core::GetSoftLimitFromChannelArgs(channel_args);

  int value;
  if (!is_client) {
    value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1);
    if (value >= 0) {
      t->settings.mutable_local().SetMaxConcurrentStreams(value);
    }
  } else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
    VLOG(2) << GRPC_ARG_MAX_CONCURRENT_STREAMS
            << " is not available on clients";
  }
  value =
      channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER).value_or(-1);
  if (value >= 0) {
    t->settings.mutable_local().SetHeaderTableSize(value);
  }
  t->settings.mutable_local().SetMaxHeaderListSize(
      grpc_core::GetHardLimitFromChannelArgs(channel_args));
  value = channel_args.GetInt(GRPC_ARG_HTTP2_MAX_FRAME_SIZE).value_or(-1);
  if (value >= 0) {
    t->settings.mutable_local().SetMaxFrameSize(value);
  }
  value =
      channel_args.GetInt(GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES).value_or(-1);
  if (value >= 0) {
    t->settings.mutable_local().SetInitialWindowSize(value);
  }
  value = channel_args.GetInt(GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY).value_or(-1);
  if (value >= 0) {
    t->settings.mutable_local().SetAllowTrueBinaryMetadata(value != 0);
  }

  if (t->enable_preferred_rx_crypto_frame_advertisement) {
    t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(INT_MAX);
  }

  t->settings.mutable_local().SetAllowSecurityFrame(
      channel_args.GetBool(GRPC_ARG_SECURITY_FRAME_ALLOWED).value_or(false));

  t->ping_on_rst_stream_percent = grpc_core::Clamp(
      channel_args.GetInt(GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT)
          .value_or(1),
      0, 100);

  t->max_concurrent_streams_overload_protection =
      channel_args.GetBool(GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION)
          .value_or(true);

  t->max_concurrent_streams_reject_on_client =
      channel_args.GetBool(GRPC_ARG_MAX_CONCURRENT_STREAMS_REJECT_ON_CLIENT)
          .value_or(false);
}

static void init_keepalive_pings_if_enabled_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
    GRPC_UNUSED grpc_error_handle error) {
  DCHECK(error.ok());
  if (t->keepalive_time != grpc_core::Duration::Infinity()) {
    t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
    t->keepalive_ping_timer_handle =
        t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
          grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
          grpc_core::ExecCtx exec_ctx;
          init_keepalive_ping(std::move(t));
        });
  } else {
    // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
    // inflight keepalive timers
    t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
  }
}

// TODO(alishananda): add unit testing as part of chttp2 promise conversion work
void grpc_chttp2_transport::WriteSecurityFrame(grpc_core::SliceBuffer* data) {
  grpc_core::ExecCtx exec_ctx;
  combiner->Run(grpc_core::NewClosure(
                    [transport = Ref(), data](grpc_error_handle) mutable {
                      transport->WriteSecurityFrameLocked(data);
                    }),
                absl::OkStatus());
}

void grpc_chttp2_transport::WriteSecurityFrameLocked(
    grpc_core::SliceBuffer* data) {
  if (data == nullptr) {
    return;
  }
  if (!settings.peer().allow_security_frame()) {
    close_transport_locked(
        this,
        grpc_error_set_int(
            GRPC_ERROR_CREATE("Unexpected SECURITY frame scheduled for write"),
            grpc_core::StatusIntProperty::kRpcStatus,
            GRPC_STATUS_FAILED_PRECONDITION));
  }
  grpc_core::SliceBuffer security_frame;
  grpc_chttp2_security_frame_create(data->c_slice_buffer(), data->Length(),
                                    security_frame.c_slice_buffer());
  grpc_slice_buffer_move_into(security_frame.c_slice_buffer(), &qbuf);
  grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
}

using grpc_event_engine::experimental::QueryExtension;
using grpc_event_engine::experimental::TcpTraceExtension;

grpc_chttp2_transport::grpc_chttp2_transport(
    const grpc_core::ChannelArgs& channel_args,
    grpc_core::OrphanablePtr<grpc_endpoint> endpoint, const bool is_client)
    : ep(std::move(endpoint)),
      peer_string(
          grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep.get()))),
      memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()
                       ->memory_quota()
                       ->CreateMemoryOwner()),
      self_reservation(
          memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))),
      event_engine(
          channel_args
              .GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
      combiner(grpc_combiner_create(event_engine)),
      state_tracker(is_client ? "client_transport" : "server_transport",
                    GRPC_CHANNEL_READY),
      next_stream_id(is_client ? 1 : 2),
      ping_abuse_policy(channel_args),
      ping_rate_policy(channel_args, is_client),
      flow_control(
          peer_string.as_string_view(),
          channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
          &memory_owner),
      deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
      is_client(is_client) {
  context_list = new grpc_core::ContextList();

  if (channel_args.GetBool(GRPC_ARG_TCP_TRACING_ENABLED).value_or(false) &&
      grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
          ep.get())) {
    auto epte = QueryExtension<TcpTraceExtension>(
        grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint(
            ep.get()));
    if (epte != nullptr) {
      epte->InitializeAndReturnTcpTracer();
    }
  }

  if (channel_args.GetBool(GRPC_ARG_SECURITY_FRAME_ALLOWED).value_or(false)) {
    transport_framing_endpoint_extension = QueryExtension<
        grpc_core::TransportFramingEndpointExtension>(
        grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint(
            ep.get()));
    if (transport_framing_endpoint_extension != nullptr) {
      transport_framing_endpoint_extension->SetSendFrameCallback(
          [this](grpc_core::SliceBuffer* data) { WriteSecurityFrame(data); });
    }
  }

  CHECK(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
        GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);

  grpc_slice_buffer_init(&read_buffer);
  if (is_client) {
    grpc_slice_buffer_add(
        outbuf.c_slice_buffer(),
        grpc_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
  }
  grpc_slice_buffer_init(&qbuf);
  grpc_chttp2_goaway_parser_init(&goaway_parser);

  // configure http2 the way we like it
  if (is_client) {
    settings.mutable_local().SetEnablePush(false);
    settings.mutable_local().SetMaxConcurrentStreams(0);
  }
  settings.mutable_local().SetMaxHeaderListSize(DEFAULT_MAX_HEADER_LIST_SIZE);
  settings.mutable_local().SetAllowTrueBinaryMetadata(true);

  read_channel_args(this, channel_args, is_client);

  // Initially allow *UP TO* MAX_CONCURRENT_STREAMS incoming before we start
  // blanket cancelling them.
  num_incoming_streams_before_settings_ack =
      settings.local().max_concurrent_streams();

  grpc_core::ExecCtx exec_ctx;
  combiner->Run(
      grpc_core::InitTransportClosure<init_keepalive_pings_if_enabled_locked>(
          Ref(), &init_keepalive_ping_locked),
      absl::OkStatus());

  if (flow_control.bdp_probe()) {
    bdp_ping_blocked = true;
    grpc_chttp2_act_on_flowctl_action(flow_control.PeriodicUpdate(), this,
                                      nullptr);
  }

  grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
  post_benign_reclaimer(this);
  if (grpc_core::test_only_init_callback != nullptr) {
    grpc_core::test_only_init_callback();
  }

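  // Decide whether completion closures must be held back until a covering
  // write finishes: when the EventEngine runs closures in the background and
  // cancellation can be scheduled over a write, the
  // CLOSURE_BARRIER_MAY_COVER_WRITE bit is not needed.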
#ifdef GRPC_POSIX_SOCKET_TCP
  closure_barrier_may_cover_write =
      grpc_event_engine_run_in_background() &&
              grpc_core::IsScheduleCancellationOverWriteEnabled()
          ? 0
          : CLOSURE_BARRIER_MAY_COVER_WRITE;
#endif
}

static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
  grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
      static_cast<grpc_chttp2_transport*>(tp));
  t->destroying = 1;
  close_transport_locked(
      t.get(),
      grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
                         grpc_core::StatusIntProperty::kOccurredDuringWrite,
                         t->write_state));
  t->memory_owner.Reset();
}

void grpc_chttp2_transport::Orphan() {
  combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, this, nullptr),
                absl::OkStatus());
}

static void close_transport_locked(grpc_chttp2_transport* t,
                                   grpc_error_handle error) {
  end_all_the_calls(t, error);
  cancel_pings(t, error);
  if (t->closed_with_error.ok()) {
    if (!grpc_error_has_clear_grpc_status(error)) {
      error =
          grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
                             GRPC_STATUS_UNAVAILABLE);
    }
    if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
      if (t->close_transport_on_writes_finished.ok()) {
        t->close_transport_on_writes_finished =
            GRPC_ERROR_CREATE("Delayed close due to in-progress write");
      }
      t->close_transport_on_writes_finished =
          grpc_error_add_child(t->close_transport_on_writes_finished, error);
      return;
    }
    CHECK(!error.ok());
    t->closed_with_error = error;
    connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
                           "close_transport");
    if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
      t->event_engine->Cancel(std::exchange(t->keepalive_ping_timeout_handle,
                                            TaskHandle::kInvalid));
    }
    if (t->settings_ack_watchdog != TaskHandle::kInvalid) {
      t->event_engine->Cancel(
          std::exchange(t->settings_ack_watchdog, TaskHandle::kInvalid));
    }
    if (t->delayed_ping_timer_handle != TaskHandle::kInvalid &&
        t->event_engine->Cancel(t->delayed_ping_timer_handle)) {
      t->delayed_ping_timer_handle = TaskHandle::kInvalid;
    }
    if (t->next_bdp_ping_timer_handle != TaskHandle::kInvalid &&
        t->event_engine->Cancel(t->next_bdp_ping_timer_handle)) {
      t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
    }
    switch (t->keepalive_state) {
      case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
        if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
            t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
          t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
        }
        break;
      case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
        if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
            t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
          t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
        }
        break;
      case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
      case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
        // keepalive timers are not set in these two states
        break;
    }

    // flush writable stream list to avoid dangling references
    grpc_chttp2_stream* s;
    while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
      GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
    }
    CHECK(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
    if (t->interested_parties_until_recv_settings != nullptr) {
      grpc_endpoint_delete_from_pollset_set(
          t->ep.get(), t->interested_parties_until_recv_settings);
      t->interested_parties_until_recv_settings = nullptr;
    }
    grpc_core::MutexLock lock(&t->ep_destroy_mu);
    t->ep.reset();
  }
  if (t->notify_on_receive_settings != nullptr) {
    if (t->interested_parties_until_recv_settings != nullptr) {
      grpc_endpoint_delete_from_pollset_set(
          t->ep.get(), t->interested_parties_until_recv_settings);
      t->interested_parties_until_recv_settings = nullptr;
    }
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
                            error);
    t->notify_on_receive_settings = nullptr;
  }
  if (t->notify_on_close != nullptr) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, error);
    t->notify_on_close = nullptr;
  }
}

#ifndef NDEBUG
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
  grpc_stream_ref(s->refcount, reason);
}
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
  grpc_stream_unref(s->refcount, reason);
}
#else
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
  grpc_stream_ref(s->refcount);
}
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
  grpc_stream_unref(s->refcount);
}
#endif

grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
                                       grpc_stream_refcount* refcount,
                                       const void* server_data,
                                       grpc_core::Arena* arena)
    : t(t->Ref()),
      refcount([refcount]() {
        // We reserve one 'active stream' that's dropped when the stream is
        // read-closed. The others are for Chttp2IncomingByteStreams that are
        // actively reading.
        // We do this here to avoid cache misses.
#ifndef NDEBUG
        grpc_stream_ref(refcount, "chttp2");
#else
        grpc_stream_ref(refcount);
#endif
        return refcount;
      }()),
      arena(arena),
      flow_control(&t->flow_control),
      call_tracer_wrapper(this) {
  t->streams_allocated.fetch_add(1, std::memory_order_relaxed);
  if (server_data) {
    id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
    GRPC_TRACE_VLOG(http, 2)
        << "HTTP:" << t << "/" << this << " creating accept stream " << id
        << " [from " << server_data << "]";
    *t->accepting_stream = this;
    t->stream_map.emplace(id, this);
    post_destructive_reclaimer(t);
  }

  grpc_slice_buffer_init(&frame_storage);
  grpc_slice_buffer_init(&flow_controlled_buffer);
}

grpc_chttp2_stream::~grpc_chttp2_stream() {
  t->streams_allocated.fetch_sub(1, std::memory_order_relaxed);
  grpc_chttp2_list_remove_stalled_by_stream(t.get(), this);
  grpc_chttp2_list_remove_stalled_by_transport(t.get(), this);

  if (t->channelz_socket != nullptr) {
    if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
      t->channelz_socket->RecordStreamSucceeded();
    } else {
      t->channelz_socket->RecordStreamFailed();
    }
  }

  CHECK((write_closed && read_closed) || id == 0);
  if (id != 0) {
    CHECK_EQ(t->stream_map.count(id), 0u);
  }

  grpc_slice_buffer_destroy(&frame_storage);

  for (int i = 0; i < STREAM_LIST_COUNT; i++) {
    if (GPR_UNLIKELY(included.is_set(i))) {
      grpc_core::Crash(absl::StrFormat("%s stream %d still included in list %d",
                                       t->is_client ? "client" : "server", id,
                                       i));
    }
  }

  CHECK_EQ(send_initial_metadata_finished, nullptr);
  CHECK_EQ(send_trailing_metadata_finished, nullptr);
  CHECK_EQ(recv_initial_metadata_ready, nullptr);
  CHECK_EQ(recv_message_ready, nullptr);
  CHECK_EQ(recv_trailing_metadata_finished, nullptr);
  grpc_slice_buffer_destroy(&flow_controlled_buffer);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus());
}

void grpc_chttp2_transport::InitStream(grpc_stream* gs,
                                       grpc_stream_refcount* refcount,
                                       const void* server_data,
                                       grpc_core::Arena* arena) {
  new (gs) grpc_chttp2_stream(this, refcount, server_data, arena);
}

static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
  grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
  s->~grpc_chttp2_stream();
}

void grpc_chttp2_transport::DestroyStream(grpc_stream* gs,
                                          grpc_closure* then_schedule_closure) {
  grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);

  s->destroy_stream_arg = then_schedule_closure;
  combiner->Run(
      GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
      absl::OkStatus());
}

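// Called from frame parsing when a new incoming stream id is seen. The
// registered accept_stream_cb is expected to synchronously create the stream;
// the grpc_chttp2_stream constructor then publishes itself through
// t->accepting_stream, which is how `accepting` gets filled in below.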
grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
                                                      uint32_t id) {
  if (t->accept_stream_cb == nullptr) {
    return nullptr;
  }
  grpc_chttp2_stream* accepting = nullptr;
  CHECK_EQ(t->accepting_stream, nullptr);
  t->accepting_stream = &accepting;
  t->accept_stream_cb(t->accept_stream_cb_user_data, t,
                      reinterpret_cast<void*>(id));
  t->accepting_stream = nullptr;
  return accepting;
}

//
// OUTPUT PROCESSING
//

static const char* get_write_state_name(grpc_chttp2_write_state st) {
  switch (st) {
    case GRPC_CHTTP2_WRITE_STATE_IDLE:
      return "IDLE";
    case GRPC_CHTTP2_WRITE_STATE_WRITING:
      return "WRITING";
    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
      return "WRITING+MORE";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

static void set_write_state(grpc_chttp2_transport* t,
                            grpc_chttp2_write_state st, const char* reason) {
  GRPC_TRACE_LOG(http, INFO)
      << "W:" << t << " " << (t->is_client ? "CLIENT" : "SERVER") << " ["
      << t->peer_string.as_string_view() << "] state "
      << get_write_state_name(t->write_state) << " -> "
      << get_write_state_name(st) << " [" << reason << "]";
  t->write_state = st;
  // If the state is being reset back to idle, it means a write was just
  // finished. Make sure all the run_after_write closures are scheduled.
  //
  // This is also our chance to close the transport if the transport was marked
  // to be closed after all writes finish (for example, if we received a GOAWAY
  // from the peer while we had some pending writes).
  if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
    grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
    if (!t->close_transport_on_writes_finished.ok()) {
      grpc_error_handle err = t->close_transport_on_writes_finished;
      t->close_transport_on_writes_finished = absl::OkStatus();
      close_transport_locked(t, err);
    }
  }
}

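// The write path is a small state machine: IDLE -> WRITING when a write is
// initiated, WRITING -> WRITING+MORE if further work arrives while a write is
// in flight, and write_action_end_locked steps the state back down once the
// endpoint write completes.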
void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
                                grpc_chttp2_initiate_write_reason reason) {
  switch (t->write_state) {
    case GRPC_CHTTP2_WRITE_STATE_IDLE:
      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
                      grpc_chttp2_initiate_write_reason_string(reason));
      // Note that the 'write_action_begin_locked' closure is being scheduled
      // on the 'finally_scheduler' of t->combiner. This means that
      // 'write_action_begin_locked' is called only *after* all the other
      // closures (some of which are potentially initiating more writes on the
      // transport) are executed on the t->combiner.
      //
      // The reason for scheduling on finally_scheduler is to make sure we
      // batch as many writes as possible. 'write_action_begin_locked' is the
      // function that gathers all the relevant bytes (which are at various
      // places in the grpc_chttp2_transport structure) and appends them to the
      // 'outbuf' field in grpc_chttp2_transport, thereby batching what would
      // otherwise have been multiple write operations.
      //
      // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
      // It does not call the endpoint to write the bytes. That is done by
      // 'write_action' (which is scheduled by 'write_action_begin_locked').
      t->combiner->FinallyRun(
          grpc_core::InitTransportClosure<write_action_begin_locked>(
              t->Ref(), &t->write_action_begin_locked),
          absl::OkStatus());
      break;
    case GRPC_CHTTP2_WRITE_STATE_WRITING:
      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
                      grpc_chttp2_initiate_write_reason_string(reason));
      break;
    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
      break;
  }
}

void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
                                      grpc_chttp2_stream* s) {
  if (t->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t, s)) {
    GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
  }
}

static const char* begin_writing_desc(bool partial) {
  if (partial) {
    return "begin partial write in background";
  } else {
    return "begin write in current thread";
  }
}

static void write_action_begin_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
    grpc_error_handle /*error_ignored*/) {
  GRPC_LATENT_SEE_INNER_SCOPE("write_action_begin_locked");
  CHECK(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
  grpc_chttp2_begin_write_result r;
  if (!t->closed_with_error.ok()) {
    r.writing = false;
  } else {
    r = grpc_chttp2_begin_write(t.get());
  }
  if (r.writing) {
    set_write_state(t.get(),
                    r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
                              : GRPC_CHTTP2_WRITE_STATE_WRITING,
                    begin_writing_desc(r.partial));
    write_action(t.get());
    if (t->reading_paused_on_pending_induced_frames) {
      CHECK_EQ(t->num_pending_induced_frames, 0u);
      // We had paused reading, because we had many induced frames (SETTINGS
      // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
      // been able to flush qbuf, we can resume reading.
      GRPC_TRACE_LOG(http, INFO)
          << "transport " << t.get()
          << " : Resuming reading after being paused due to too many unwritten "
             "SETTINGS ACK, PINGS ACK and RST_STREAM frames";
      t->reading_paused_on_pending_induced_frames = false;
      continue_read_action_locked(std::move(t));
    }
  } else {
    set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE,
                    "begin writing nothing");
  }
}

static void write_action(grpc_chttp2_transport* t) {
  void* cl = t->context_list;
  if (!t->context_list->empty()) {
    // Transfer ownership of the context list to the endpoint and create and
    // associate a new context list with the transport.
    // The old context list is stored in the cl local variable, which is passed
    // to the endpoint. It is up to the endpoint to manage its lifetime.
    t->context_list = new grpc_core::ContextList();
  } else {
    // t->context_list is empty. There is nothing to trace in this
    // endpoint_write, so set cl to nullptr.
    cl = nullptr;
  }
  // Choose max_frame_size as the preferred rx crypto frame size indicated by
  // the peer.
  int max_frame_size =
      t->settings.peer().preferred_receive_crypto_message_size();
  // Note: max frame size is 0 if the remote peer does not support adjusting
  // the sending frame size.
  if (max_frame_size == 0) {
    max_frame_size = INT_MAX;
  }
  GRPC_TRACE_LOG(http2_ping, INFO)
      << (t->is_client ? "CLIENT" : "SERVER") << "[" << t << "]: Write "
      << t->outbuf.Length() << " bytes";
  t->write_size_policy.BeginWrite(t->outbuf.Length());
  grpc_endpoint_write(t->ep.get(), t->outbuf.c_slice_buffer(),
                      grpc_core::InitTransportClosure<write_action_end>(
                          t->Ref(), &t->write_action_end_locked),
                      cl, max_frame_size);
}

static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
                             grpc_error_handle error) {
  auto* tp = t.get();
  GRPC_TRACE_LOG(http2_ping, INFO) << (t->is_client ? "CLIENT" : "SERVER")
                                   << "[" << t.get() << "]: Finish write";
  tp->combiner->Run(grpc_core::InitTransportClosure<write_action_end_locked>(
                        std::move(t), &tp->write_action_end_locked),
                    error);
}

// Callback from the grpc_endpoint after bytes have been written by calling
// sendmsg
static void write_action_end_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
    grpc_error_handle error) {
  t->write_size_policy.EndWrite(error.ok());

  bool closed = false;
  if (!error.ok()) {
    close_transport_locked(t.get(), error);
    closed = true;
  }

  if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
    t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
    closed = true;
    if (t->stream_map.empty()) {
      close_transport_locked(t.get(), GRPC_ERROR_CREATE("goaway sent"));
    }
  }

  switch (t->write_state) {
    case GRPC_CHTTP2_WRITE_STATE_IDLE:
      GPR_UNREACHABLE_CODE(break);
    case GRPC_CHTTP2_WRITE_STATE_WRITING:
      set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
      break;
    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
      set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_WRITING,
                      "continue writing");
      // If the transport is closed, we will retry writing on the endpoint and
      // the next write may contain part of the currently serialized frames.
      // So we should only call the run_after_write callbacks when the next
      // write finishes, or the callbacks will be invoked when the stream is
      // closed.
      if (!closed) {
        grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
      }
      t->combiner->FinallyRun(
          grpc_core::InitTransportClosure<write_action_begin_locked>(
              t, &t->write_action_begin_locked),
          absl::OkStatus());
      break;
  }

  grpc_chttp2_end_write(t.get(), error);
}

// Cancel out streams that haven't yet started if we have received a GOAWAY
static void cancel_unstarted_streams(grpc_chttp2_transport* t,
                                     grpc_error_handle error, bool tarpit) {
  grpc_chttp2_stream* s;
  while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
    s->trailing_metadata_buffer.Set(
        grpc_core::GrpcStreamNetworkState(),
        grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
    grpc_chttp2_cancel_stream(t, s, error, tarpit);
  }
}

void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
                                     uint32_t goaway_error,
                                     uint32_t last_stream_id,
                                     absl::string_view goaway_text) {
  t->goaway_error = grpc_error_set_int(
      grpc_error_set_int(
          grpc_core::StatusCreate(
              absl::StatusCode::kUnavailable,
              absl::StrFormat("GOAWAY received; Error code: %u; Debug Text: %s",
                              goaway_error, goaway_text),
              DEBUG_LOCATION, {}),
          grpc_core::StatusIntProperty::kHttp2Error,
          static_cast<intptr_t>(goaway_error)),
      grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);

  GRPC_TRACE_LOG(http, INFO)
      << "transport " << t << " got goaway with last stream id "
      << last_stream_id;
  // We want to log this irrespective of whether http tracing is enabled if we
  // received a GOAWAY with a non-NO_ERROR code.
  if (goaway_error != GRPC_HTTP2_NO_ERROR) {
    LOG(INFO) << t->peer_string.as_string_view() << ": Got goaway ["
              << goaway_error
              << "] err=" << grpc_core::StatusToString(t->goaway_error);
  }
  if (t->is_client) {
    cancel_unstarted_streams(t, t->goaway_error, false);
    // Cancel all unseen streams
    std::vector<grpc_chttp2_stream*> to_cancel;
    for (auto id_stream : t->stream_map) {
      if (id_stream.first > last_stream_id) {
        to_cancel.push_back(id_stream.second);
      }
    }
    for (auto s : to_cancel) {
      s->trailing_metadata_buffer.Set(
          grpc_core::GrpcStreamNetworkState(),
          grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
      grpc_chttp2_cancel_stream(s->t.get(), s, s->t->goaway_error, false);
    }
  }
  absl::Status status = grpc_error_to_absl_status(t->goaway_error);
  // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and
  // debug data equal to "too_many_pings", it should log the occurrence at a
  // log level that is enabled by default and double the configured
  // KEEPALIVE_TIME used for new connections on that channel.
  if (GPR_UNLIKELY(t->is_client &&
                   goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
                   goaway_text == "too_many_pings")) {
    LOG(ERROR) << t->peer_string.as_string_view()
               << ": Received a GOAWAY with error code ENHANCE_YOUR_CALM and "
                  "debug data equal to \"too_many_pings\". Current keepalive "
                  "time (before throttling): "
               << t->keepalive_time.ToString();
    constexpr int max_keepalive_time_millis =
        INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
    int64_t throttled_keepalive_time =
        t->keepalive_time.millis() > max_keepalive_time_millis
            ? INT_MAX
            : t->keepalive_time.millis() * KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
    status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
                      absl::Cord(std::to_string(throttled_keepalive_time)));
  }
  // lie: use transient failure from the transport to indicate goaway has been
  // received.
  if (!grpc_core::test_only_disable_transient_failure_state_notification) {
    connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
                           "got_goaway");
  }
}

static void maybe_start_some_streams(grpc_chttp2_transport* t) {
  grpc_chttp2_stream* s;
  // maybe cancel out streams that haven't yet started if we have received a
  // GOAWAY
  if (!t->goaway_error.ok()) {
    cancel_unstarted_streams(t, t->goaway_error, false);
    return;
  }
  // start streams where we have free grpc_chttp2_stream ids and free
  // concurrency
  while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
         t->stream_map.size() < t->settings.peer().max_concurrent_streams() &&
         grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
    // safe since we can't (legally) be parsing this stream yet
    GRPC_TRACE_LOG(http, INFO)
        << "HTTP:" << (t->is_client ? "CLI" : "SVR") << ": Transport " << t
        << " allocating new grpc_chttp2_stream " << s << " to id "
        << t->next_stream_id;

    CHECK_EQ(s->id, 0u);
    s->id = t->next_stream_id;
    t->next_stream_id += 2;

    if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
      connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
                             absl::Status(absl::StatusCode::kUnavailable,
                                          "Transport Stream IDs exhausted"),
                             "no_more_stream_ids");
    }

    t->stream_map.emplace(s->id, s);
    post_destructive_reclaimer(t);
    grpc_chttp2_mark_stream_writable(t, s);
    grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
  }
  // cancel out streams that will never be started
  if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
    while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
      s->trailing_metadata_buffer.Set(
          grpc_core::GrpcStreamNetworkState(),
          grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
      grpc_chttp2_cancel_stream(
          t, s,
          grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"),
                             grpc_core::StatusIntProperty::kRpcStatus,
                             GRPC_STATUS_UNAVAILABLE),
          false);
    }
  }
}

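// Closure "barrier" bookkeeping: next_data.scratch packs a reference count (in
// multiples of CLOSURE_BARRIER_FIRST_REF_BIT) above low flag bits such as
// CLOSURE_BARRIER_MAY_COVER_WRITE. grpc_chttp2_complete_closure_step drops one
// reference and only runs the closure once the count reaches zero.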
static grpc_closure* add_closure_barrier(grpc_closure* closure) {
  closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
  return closure;
}

static void null_then_sched_closure(grpc_closure** closure) {
  grpc_closure* c = *closure;
  *closure = nullptr;
  // null_then_schedule_closure might be run during a start_batch which might
  // subsequently examine the batch for more operations contained within.
  // However, the closure run might make it back to the call object, push a
  // completion, have the application see it, and make a new operation on the
  // call which recycles the batch BEFORE the call to start_batch completes,
  // forcing a race.
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, absl::OkStatus());
}

void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
                                       grpc_closure** pclosure,
                                       grpc_error_handle error,
                                       const char* desc,
                                       grpc_core::DebugLocation whence) {
  grpc_closure* closure = *pclosure;
  *pclosure = nullptr;
  if (closure == nullptr) {
    return;
  }
  closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
  GRPC_TRACE_LOG(http, INFO)
      << "complete_closure_step: t=" << t << " " << closure << " refs="
      << (closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT)
      << " flags="
      << (closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT)
      << " desc=" << desc << " err=" << grpc_core::StatusToString(error)
      << " write_state=" << get_write_state_name(t->write_state)
      << " whence=" << whence.file() << ":" << whence.line();

  if (!error.ok()) {
    grpc_error_handle cl_err =
        grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
    if (cl_err.ok()) {
      cl_err = GRPC_ERROR_CREATE(absl::StrCat(
          "Error in HTTP transport completing operation: ", desc,
          " write_state=", get_write_state_name(t->write_state),
          " refs=", closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT,
          " flags=", closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT,
          " peer_address=", t->peer_string.as_string_view()));
    }
    cl_err = grpc_error_add_child(cl_err, error);
    closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
  }
  if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
    if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
        !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
      // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
      // closures earlier than when it is safe to do so.
      grpc_error_handle run_error =
          grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
      closure->error_data.error = 0;
      grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
    } else {
      grpc_closure_list_append(&t->run_after_write, closure);
    }
  }
}

static bool contains_non_ok_status(grpc_metadata_batch* batch) {
  return batch->get(grpc_core::GrpcStatusMetadata()).value_or(GRPC_STATUS_OK) !=
         GRPC_STATUS_OK;
}
1376
1377 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1378 const bool is_client, const bool is_initial) {
1379 VLOG(2) << "--metadata--";
1380 const std::string prefix = absl::StrCat(
1381 "HTTP:", id, is_initial ? ":HDR" : ":TRL", is_client ? ":CLI:" : ":SVR:");
1382 md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
1383 VLOG(2) << prefix << key << ": " << value;
1384 });
1385 }
1386
1387 static void trace_annotations(grpc_chttp2_stream* s) {
1388 if (!grpc_core::IsCallTracerInTransportEnabled()) {
1389 if (s->call_tracer != nullptr) {
1390 s->call_tracer->RecordAnnotation(
1391 grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
1392 gpr_now(GPR_CLOCK_REALTIME))
1393 .Add(s->t->flow_control.stats())
1394 .Add(s->flow_control.stats()));
1395 }
1396 } else if (grpc_core::IsTraceRecordCallopsEnabled()) {
1397 auto* call_tracer = s->arena->GetContext<grpc_core::CallTracerInterface>();
1398 if (call_tracer != nullptr && call_tracer->IsSampled()) {
1399 call_tracer->RecordAnnotation(
1400 grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
1401 gpr_now(GPR_CLOCK_REALTIME))
1402 .Add(s->t->flow_control.stats())
1403 .Add(s->flow_control.stats()));
1404 }
1405 }
1406 }
1407
1408 static void send_initial_metadata_locked(
1409 grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
1410 grpc_transport_stream_op_batch_payload* op_payload,
1411 grpc_chttp2_transport* t, grpc_closure* on_complete) {
1412 trace_annotations(s);
1413 if (t->is_client && t->channelz_socket != nullptr) {
1414 t->channelz_socket->RecordStreamStartedFromLocal();
1415 }
1416 CHECK_EQ(s->send_initial_metadata_finished, nullptr);
1417 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1418
1419 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1420 s->send_initial_metadata =
1421 op_payload->send_initial_metadata.send_initial_metadata;
1422 if (t->is_client) {
1423 s->deadline =
1424 std::min(s->deadline,
1425 s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
1426 .value_or(grpc_core::Timestamp::InfFuture()));
1427 }
1428 if (contains_non_ok_status(s->send_initial_metadata)) {
1429 s->seen_error = true;
1430 }
1431 if (!s->write_closed) {
1432 if (t->is_client) {
1433 if (t->closed_with_error.ok()) {
1434 CHECK_EQ(s->id, 0u);
1435 if (t->max_concurrent_streams_reject_on_client &&
1436 t->stream_map.size() >=
1437 t->settings.peer().max_concurrent_streams()) {
1438 s->trailing_metadata_buffer.Set(
1439 grpc_core::GrpcStreamNetworkState(),
1440 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1441 grpc_chttp2_cancel_stream(
1442 t, s,
1443 grpc_error_set_int(
1444 GRPC_ERROR_CREATE_REFERENCING("Too many streams",
1445 &t->closed_with_error, 1),
1446 grpc_core::StatusIntProperty::kRpcStatus,
1447 GRPC_STATUS_RESOURCE_EXHAUSTED),
1448 false);
1449 } else {
1450 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1451 maybe_start_some_streams(t);
1452 }
1453 } else {
1454 s->trailing_metadata_buffer.Set(
1455 grpc_core::GrpcStreamNetworkState(),
1456 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1457 grpc_chttp2_cancel_stream(
1458 t, s,
1459 grpc_error_set_int(
1460 GRPC_ERROR_CREATE_REFERENCING("Transport closed",
1461 &t->closed_with_error, 1),
1462 grpc_core::StatusIntProperty::kRpcStatus,
1463 GRPC_STATUS_UNAVAILABLE),
1464 false);
1465 }
1466 } else {
1467 CHECK_NE(s->id, 0u);
1468 grpc_chttp2_mark_stream_writable(t, s);
1469 if (!(op->send_message &&
1470 (op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
1471 grpc_chttp2_initiate_write(
1472 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1473 }
1474 }
1475 } else {
1476 s->send_initial_metadata = nullptr;
1477 grpc_chttp2_complete_closure_step(
1478 t, &s->send_initial_metadata_finished,
1479 GRPC_ERROR_CREATE_REFERENCING(
1480 "Attempt to send initial metadata after stream was closed",
1481 &s->write_closed_error, 1),
1482 "send_initial_metadata_finished");
1483 }
1484 }
1485
1486 static void send_message_locked(
1487 grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
1488 grpc_transport_stream_op_batch_payload* op_payload,
1489 grpc_chttp2_transport* t, grpc_closure* on_complete) {
1490 t->num_messages_in_next_write++;
1491 grpc_core::global_stats().IncrementHttp2SendMessageSize(
1492 op->payload->send_message.send_message->Length());
1493 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1494 s->send_message_finished = add_closure_barrier(op->on_complete);
1495 uint32_t flags = 0;
1496 if (s->write_closed) {
1497 op->payload->send_message.stream_write_closed = true;
1498 // We should NOT return an error here, so as to avoid a cancel OP being
1499 // started. The surface layer will notice that the stream has been closed
1500 // for writes and fail the send message op.
1501 grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1502 absl::OkStatus(),
1503 "fetching_send_message_finished");
1504 } else {
1505 // Buffer hint is used to buffer the message in the transport until the
1506 // write buffer size (specified through GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE) is
1507 // reached. This is to batch writes sent down to tcp. However, if the memory
1508 // pressure is high, disable the buffer hint to flush data down to tcp as
1509 // soon as possible to avoid OOM.
1510 if (grpc_core::IsDisableBufferHintOnHighMemoryPressureEnabled() &&
1511 t->memory_owner.GetPressureInfo().pressure_control_value >= 0.8) {
1512 // Disable write buffer hint if memory pressure is high. The value of 0.8
1513 // is chosen to match the threshold used by the tcp endpoint (in
1514 // allocating memory for socket reads).
1515 op_payload->send_message.flags &= ~GRPC_WRITE_BUFFER_HINT;
1516 }
1517 flags = op_payload->send_message.flags;
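// Write the 5-byte gRPC length-prefixed message header straight into the
// stream's flow-controlled buffer: one byte carrying the compressed flag,
// followed by the message length as a big-endian 32-bit integer.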
1518 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer,
1519 GRPC_HEADER_SIZE_IN_BYTES);
1520 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1521 size_t len = op_payload->send_message.send_message->Length();
1522 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1523 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1524 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1525 frame_hdr[4] = static_cast<uint8_t>(len);
1526
1527 s->call_tracer_wrapper.RecordOutgoingBytes(
1528 {GRPC_HEADER_SIZE_IN_BYTES, len, 0});
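// next_message_end_offset marks the flow-controlled byte offset at which this
// message will have been fully written. With GRPC_WRITE_BUFFER_HINT the
// target is pulled back by write_buffer_size so the completion can fire while
// the tail of the message is still sitting in the buffer.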
1529 s->next_message_end_offset =
1530 s->flow_controlled_bytes_written +
1531 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1532 static_cast<int64_t>(len);
1533 if (flags & GRPC_WRITE_BUFFER_HINT) {
1534 s->next_message_end_offset -= t->write_buffer_size;
1535 s->write_buffering = true;
1536 } else {
1537 s->write_buffering = false;
1538 }
1539
1540 grpc_slice* const slices =
1541 op_payload->send_message.send_message->c_slice_buffer()->slices;
1542 grpc_slice* const end =
1543 slices + op_payload->send_message.send_message->Count();
1544 for (grpc_slice* slice = slices; slice != end; slice++) {
1545 grpc_slice_buffer_add(&s->flow_controlled_buffer,
1546 grpc_core::CSliceRef(*slice));
1547 }
1548
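// If the message is already covered by bytes written, complete the send-op
// closure now; otherwise register a grpc_chttp2_write_cb (reusing the
// transport's free list) that fires once call_at_byte bytes have gone out, on
// either the write-finished or flow-controlled callback list depending on
// GRPC_WRITE_THROUGH.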
1549 int64_t notify_offset = s->next_message_end_offset;
1550 if (notify_offset <= s->flow_controlled_bytes_written) {
1551 grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1552 absl::OkStatus(),
1553 "fetching_send_message_finished");
1554 } else {
1555 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1556 if (cb == nullptr) {
1557 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1558 } else {
1559 t->write_cb_pool = cb->next;
1560 }
1561 cb->call_at_byte = notify_offset;
1562 cb->closure = s->send_message_finished;
1563 s->send_message_finished = nullptr;
1564 grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
1565 ? &s->on_write_finished_cbs
1566 : &s->on_flow_controlled_cbs;
1567 cb->next = *list;
1568 *list = cb;
1569 }
1570
1571 if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length >
1572 t->write_buffer_size)) {
1573 grpc_chttp2_mark_stream_writable(t, s);
1574 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1575 }
1576 }
1577 }
1578
1579 static void send_trailing_metadata_locked(
1580 grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
1581 grpc_transport_stream_op_batch_payload* op_payload,
1582 grpc_chttp2_transport* t, grpc_closure* on_complete) {
1583 CHECK_EQ(s->send_trailing_metadata_finished, nullptr);
1584 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1585 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1586 s->send_trailing_metadata =
1587 op_payload->send_trailing_metadata.send_trailing_metadata;
1588 s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1589 s->write_buffering = false;
1590 if (contains_non_ok_status(s->send_trailing_metadata)) {
1591 s->seen_error = true;
1592 }
1593 if (s->write_closed) {
1594 s->send_trailing_metadata = nullptr;
1595 s->sent_trailing_metadata_op = nullptr;
1596 grpc_chttp2_complete_closure_step(
1597 t, &s->send_trailing_metadata_finished,
1598 op->payload->send_trailing_metadata.send_trailing_metadata->empty()
1599 ? absl::OkStatus()
1600 : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
1601 "stream was closed"),
1602 "send_trailing_metadata_finished");
1603 } else if (s->id != 0) {
1604 // TODO(ctiller): check if there's flow control for any outstanding
1605 // bytes before going writable
1606 grpc_chttp2_mark_stream_writable(t, s);
1607 grpc_chttp2_initiate_write(
1608 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1609 }
1610 }
1611
1612 static void recv_initial_metadata_locked(
1613 grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
1614 grpc_chttp2_transport* t) {
1615 CHECK_EQ(s->recv_initial_metadata_ready, nullptr);
1616 s->recv_initial_metadata_ready =
1617 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1618 s->recv_initial_metadata =
1619 op_payload->recv_initial_metadata.recv_initial_metadata;
1620 s->trailing_metadata_available =
1621 op_payload->recv_initial_metadata.trailing_metadata_available;
1622 if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
1623 *s->trailing_metadata_available = true;
1624 }
1625 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1626 }
1627
1628 static void recv_message_locked(
1629 grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
1630 grpc_chttp2_transport* t) {
1631 CHECK_EQ(s->recv_message_ready, nullptr);
1632 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1633 s->recv_message = op_payload->recv_message.recv_message;
1634 s->recv_message->emplace();
1635 s->recv_message_flags = op_payload->recv_message.flags;
1636 s->call_failed_before_recv_message =
1637 op_payload->recv_message.call_failed_before_recv_message;
1638 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1639 }
1640
1641 static void recv_trailing_metadata_locked(
1642 grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
1643 grpc_chttp2_transport* t) {
1644 CHECK_EQ(s->collecting_stats, nullptr);
1645 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1646 CHECK_EQ(s->recv_trailing_metadata_finished, nullptr);
1647 s->recv_trailing_metadata_finished =
1648 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1649 s->recv_trailing_metadata =
1650 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1651 s->final_metadata_requested = true;
1652 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1653 }
1654
1655 static void perform_stream_op_locked(void* stream_op,
1656 grpc_error_handle /*error_ignored*/) {
1657 grpc_transport_stream_op_batch* op =
1658 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1659 grpc_chttp2_stream* s =
1660 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1661 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1662 grpc_chttp2_transport* t = s->t.get();
1663
1664 s->traced = op->is_traced;
1665 if (!grpc_core::IsCallTracerInTransportEnabled()) {
1666 s->call_tracer = CallTracerIfSampled(s);
1667 }
1668 s->tcp_tracer = TcpTracerIfSampled(s);
1669 if (GRPC_TRACE_FLAG_ENABLED(http)) {
1670 LOG(INFO) << "perform_stream_op_locked[s=" << s << "; op=" << op
1671 << "]: " << grpc_transport_stream_op_batch_string(op, false)
1672 << "; on_complete = " << op->on_complete;
1673 if (op->send_initial_metadata) {
1674 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1675 s->id, t->is_client, true);
1676 }
1677 if (op->send_trailing_metadata) {
1678 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1679 s->id, t->is_client, false);
1680 }
1681 }
1682
1683 grpc_closure* on_complete = op->on_complete;
1684 // on_complete will be null if and only if there are no send ops in the batch.
1685 if (on_complete != nullptr) {
1686 // This batch has send ops. Use next_data.scratch as a barrier until enqueue
1687 // time; the initial reference is dropped at the end of this function.
1688 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1689 on_complete->error_data.error = 0;
1690 }
1691
1692 if (op->cancel_stream) {
1693 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error,
1694 op_payload->cancel_stream.tarpit);
1695 }
1696
1697 if (op->send_initial_metadata) {
1698 send_initial_metadata_locked(op, s, op_payload, t, on_complete);
1699 }
1700
1701 if (op->send_message) {
1702 send_message_locked(op, s, op_payload, t, on_complete);
1703 }
1704
1705 if (op->send_trailing_metadata) {
1706 send_trailing_metadata_locked(op, s, op_payload, t, on_complete);
1707 }
1708
1709 if (op->recv_initial_metadata) {
1710 recv_initial_metadata_locked(s, op_payload, t);
1711 }
1712
1713 if (op->recv_message) {
1714 recv_message_locked(s, op_payload, t);
1715 }
1716
1717 if (op->recv_trailing_metadata) {
1718 recv_trailing_metadata_locked(s, op_payload, t);
1719 }
1720
1721 if (on_complete != nullptr) {
1722 grpc_chttp2_complete_closure_step(t, &on_complete, absl::OkStatus(),
1723 "op->on_complete");
1724 }
1725
1726 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1727 }
1728
1729 void grpc_chttp2_transport::PerformStreamOp(
1730 grpc_stream* gs, grpc_transport_stream_op_batch* op) {
1731 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1732
1733 if (!is_client) {
1734 if (op->send_initial_metadata) {
1735 CHECK(!op->payload->send_initial_metadata.send_initial_metadata
1736 ->get(grpc_core::GrpcTimeoutMetadata())
1737 .has_value());
1738 }
1739 if (op->send_trailing_metadata) {
1740 CHECK(!op->payload->send_trailing_metadata.send_trailing_metadata
1741 ->get(grpc_core::GrpcTimeoutMetadata())
1742 .has_value());
1743 }
1744 }
1745
1746 GRPC_TRACE_LOG(http, INFO)
1747 << "perform_stream_op[s=" << s << "; op=" << op
1748 << "]: " << grpc_transport_stream_op_batch_string(op, false);
1749
1750 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1751 op->handler_private.extra_arg = gs;
1752 combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1753 perform_stream_op_locked, op, nullptr),
1754 absl::OkStatus());
1755 }
1756
1757 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1758 GRPC_TRACE_LOG(http, INFO)
1759 << t << " CANCEL PINGS: " << grpc_core::StatusToString(error);
1760 // Call back any remaining ping closures: they're not allowed to call into the
1761 // transport, and they may hold resources that need to be freed.
1762 t->ping_callbacks.CancelAll(t->event_engine.get());
1763 }
1764
1765 namespace {
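// Move-only adaptor from grpc_closure to the ping callback interface:
// invoking the wrapper schedules the closure with OkStatus, while destroying
// an un-invoked wrapper schedules it with CancelledError, so the caller is
// always notified exactly once.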
1766 class PingClosureWrapper {
1767 public:
1768 explicit PingClosureWrapper(grpc_closure* closure) : closure_(closure) {}
1769 PingClosureWrapper(const PingClosureWrapper&) = delete;
1770 PingClosureWrapper& operator=(const PingClosureWrapper&) = delete;
1771 PingClosureWrapper(PingClosureWrapper&& other) noexcept
1772 : closure_(other.Take()) {}
1773 PingClosureWrapper& operator=(PingClosureWrapper&& other) noexcept {
1774 std::swap(closure_, other.closure_);
1775 return *this;
1776 }
1777 ~PingClosureWrapper() {
1778 if (closure_ != nullptr) {
1779 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, absl::CancelledError());
1780 }
1781 }
1782
1783 void operator()() {
1784 grpc_core::ExecCtx::Run(DEBUG_LOCATION, Take(), absl::OkStatus());
1785 }
1786
1787 private:
1788 grpc_closure* Take() { return std::exchange(closure_, nullptr); }
1789
1790 grpc_closure* closure_ = nullptr;
1791 };
1792 } // namespace
1793
1794 static void send_ping_locked(grpc_chttp2_transport* t,
1795 grpc_closure* on_initiate, grpc_closure* on_ack) {
1796 if (!t->closed_with_error.ok()) {
1797 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, t->closed_with_error);
1798 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error);
1799 return;
1800 }
1801 t->ping_callbacks.OnPing(PingClosureWrapper(on_initiate),
1802 PingClosureWrapper(on_ack));
1803 }
1804
1805 // Specialized form of send_ping_locked for the keepalive ping. If there is
1806 // already a ping in progress, the keepalive ping piggybacks onto that ping
1807 // instead of waiting for it to complete and then starting a new one.
1808 static void send_keepalive_ping_locked(
1809 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1810 if (!t->closed_with_error.ok()) {
1811 t->combiner->Run(
1812 grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
1813 t->Ref(), &t->finish_keepalive_ping_locked),
1814 t->closed_with_error);
1815 return;
1816 }
1817 t->ping_callbacks.OnPingAck(
1818 PingClosureWrapper(grpc_core::InitTransportClosure<finish_keepalive_ping>(
1819 t->Ref(), &t->finish_keepalive_ping_locked)));
1820 }
1821
1822 void grpc_chttp2_retry_initiate_ping(
1823 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1824 auto tp = t.get();
1825 tp->combiner->Run(grpc_core::InitTransportClosure<retry_initiate_ping_locked>(
1826 std::move(t), &tp->retry_initiate_ping_locked),
1827 absl::OkStatus());
1828 }
1829
1830 static void retry_initiate_ping_locked(
1831 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1832 GRPC_UNUSED grpc_error_handle error) {
1833 DCHECK(error.ok());
1834 CHECK(t->delayed_ping_timer_handle != TaskHandle::kInvalid);
1835 t->delayed_ping_timer_handle = TaskHandle::kInvalid;
1836 grpc_chttp2_initiate_write(t.get(),
1837 GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1838 }
1839
1840 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1841 if (!t->ping_callbacks.AckPing(id, t->event_engine.get())) {
1842 VLOG(2) << "Unknown ping response from " << t->peer_string.as_string_view()
1843 << ": " << id;
1844 return;
1845 }
1846 if (t->ping_callbacks.ping_requested()) {
1847 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1848 }
1849 }
1850
1851 void grpc_chttp2_keepalive_timeout(
1852 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1853 t->combiner->Run(
1854 grpc_core::NewClosure([t](grpc_error_handle) {
1855 GRPC_TRACE_LOG(http, INFO) << t->peer_string.as_string_view()
1856 << ": Keepalive timeout. Closing transport.";
1857 send_goaway(
1858 t.get(),
1859 grpc_error_set_int(GRPC_ERROR_CREATE("keepalive_timeout"),
1860 grpc_core::StatusIntProperty::kHttp2Error,
1861 GRPC_HTTP2_ENHANCE_YOUR_CALM),
1862 /*immediate_disconnect_hint=*/true);
1863 close_transport_locked(
1864 t.get(),
1865 grpc_error_set_int(GRPC_ERROR_CREATE("keepalive timeout"),
1866 grpc_core::StatusIntProperty::kRpcStatus,
1867 GRPC_STATUS_UNAVAILABLE));
1868 }),
1869 absl::OkStatus());
1870 }
1871
1872 void grpc_chttp2_ping_timeout(
1873 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1874 t->combiner->Run(
1875 grpc_core::NewClosure([t](grpc_error_handle) {
1876 GRPC_TRACE_LOG(http, INFO) << t->peer_string.as_string_view()
1877 << ": Ping timeout. Closing transport.";
1878 send_goaway(
1879 t.get(),
1880 grpc_error_set_int(GRPC_ERROR_CREATE("ping_timeout"),
1881 grpc_core::StatusIntProperty::kHttp2Error,
1882 GRPC_HTTP2_ENHANCE_YOUR_CALM),
1883 /*immediate_disconnect_hint=*/true);
1884 close_transport_locked(
1885 t.get(),
1886 grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"),
1887 grpc_core::StatusIntProperty::kRpcStatus,
1888 GRPC_STATUS_UNAVAILABLE));
1889 }),
1890 absl::OkStatus());
1891 }
1892
1893 void grpc_chttp2_settings_timeout(
1894 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1895 t->combiner->Run(
1896 grpc_core::NewClosure([t](grpc_error_handle) {
1897 GRPC_TRACE_LOG(http, INFO) << t->peer_string.as_string_view()
1898 << ": Settings timeout. Closing transport.";
1899 send_goaway(
1900 t.get(),
1901 grpc_error_set_int(GRPC_ERROR_CREATE("settings_timeout"),
1902 grpc_core::StatusIntProperty::kHttp2Error,
1903 GRPC_HTTP2_SETTINGS_TIMEOUT),
1904 /*immediate_disconnect_hint=*/true);
1905 close_transport_locked(
1906 t.get(),
1907 grpc_error_set_int(GRPC_ERROR_CREATE("settings timeout"),
1908 grpc_core::StatusIntProperty::kRpcStatus,
1909 GRPC_STATUS_UNAVAILABLE));
1910 }),
1911 absl::OkStatus());
1912 }
1913
1914 namespace {
1915
1916 // Fire and forget (deletes itself on completion). Does a graceful shutdown by
1917 // sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping
1918 // and waiting for its ack (effectively waiting for an RTT), and then sending a
1919 // final GOAWAY frame with an updated last stream id. This helps ensure that a
1920 // connection can be cleanly shut down without losing requests.
1921 // In the event that the client does not respond to the ping for some reason,
1922 // we add a 20 second deadline, after which we send the second GOAWAY.
1923 class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
1924 public:
1925 static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); }
1926
1927 private:
1928 using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
1929
1930 explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t->Ref()) {
1931 t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY;
1932 grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf);
1933 t->keepalive_timeout =
1934 std::min(t->keepalive_timeout, grpc_core::Duration::Seconds(20));
1935 t->ping_timeout =
1936 std::min(t->ping_timeout, grpc_core::Duration::Seconds(20));
1937 send_ping_locked(
1938 t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr));
1939 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1940 }
1941
1942 void MaybeSendFinalGoawayLocked() {
1943 if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1944 // We already sent the final GOAWAY.
1945 return;
1946 }
1947 if (t_->destroying || !t_->closed_with_error.ok()) {
1948 GRPC_TRACE_LOG(http, INFO) << "transport:" << t_.get() << " "
1949 << (t_->is_client ? "CLIENT" : "SERVER")
1950 << " peer:" << t_->peer_string.as_string_view()
1951 << " Transport already shutting down. "
1952 "Graceful GOAWAY abandoned.";
1953 return;
1954 }
1955 // Ping completed. Send final goaway.
1956 GRPC_TRACE_LOG(http, INFO)
1957 << "transport:" << t_.get() << " "
1958 << (t_->is_client ? "CLIENT" : "SERVER")
1959 << " peer:" << std::string(t_->peer_string.as_string_view())
1960 << " Graceful shutdown: Ping received. "
1961 "Sending final GOAWAY with stream_id:"
1962 << t_->last_new_stream_id;
1963 t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1964 grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(),
1965 &t_->qbuf);
1966 grpc_chttp2_initiate_write(t_.get(),
1967 GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1968 }
1969
1970 static void OnPingAck(void* arg, grpc_error_handle /* error */) {
1971 auto* self = static_cast<GracefulGoaway*>(arg);
1972 self->t_->combiner->Run(
1973 GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr),
1974 absl::OkStatus());
1975 }
1976
1977 static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) {
1978 auto* self = static_cast<GracefulGoaway*>(arg);
1979 self->MaybeSendFinalGoawayLocked();
1980 self->Unref();
1981 }
1982
1983 const grpc_core::RefCountedPtr<grpc_chttp2_transport> t_;
1984 grpc_closure on_ping_ack_;
1985 };
1986
1987 } // namespace
1988
1989 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
1990 const bool immediate_disconnect_hint) {
1991 grpc_http2_error_code http_error;
1992 std::string message;
1993 grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr,
1994 &message, &http_error, nullptr);
1995 if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR &&
1996 !immediate_disconnect_hint) {
1997 // Do a graceful shutdown.
1998 if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND) {
1999 GracefulGoaway::Start(t);
2000 } else {
2001 // Graceful GOAWAY is already in progress.
2002 }
2003 } else if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND ||
2004 t->sent_goaway_state == GRPC_CHTTP2_GRACEFUL_GOAWAY) {
2005 // We want to log this irrespective of whether http tracing is enabled
2006 VLOG(2) << t->peer_string.as_string_view() << " "
2007 << (t->is_client ? "CLIENT" : "SERVER")
2008 << ": Sending goaway last_new_stream_id=" << t->last_new_stream_id
2009 << " err=" << grpc_core::StatusToString(error);
2010 t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
2011 grpc_chttp2_goaway_append(
2012 t->last_new_stream_id, static_cast<uint32_t>(http_error),
2013 grpc_slice_from_cpp_string(std::move(message)), &t->qbuf);
2014 } else {
2015 // Final GOAWAY has already been sent.
2016 }
2017 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
2018 }
2019
2020 void grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport* t) {
2021 send_goaway(t,
2022 grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"),
2023 grpc_core::StatusIntProperty::kHttp2Error,
2024 GRPC_HTTP2_ENHANCE_YOUR_CALM),
2025 /*immediate_disconnect_hint=*/true);
2026 // The transport will be closed after the write is done
2027 close_transport_locked(
2028 t, grpc_error_set_int(GRPC_ERROR_CREATE("Too many pings"),
2029 grpc_core::StatusIntProperty::kRpcStatus,
2030 GRPC_STATUS_UNAVAILABLE));
2031 }
2032
2033 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
2034 if (!t->is_client) {
2035 t->ping_abuse_policy.ResetPingStrikes();
2036 }
2037 t->ping_rate_policy.ResetPingsBeforeDataRequired();
2038 }
2039
2040 static void perform_transport_op_locked(void* stream_op,
2041 grpc_error_handle /*error_ignored*/) {
2042 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
2043 grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
2044 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg));
2045
2046 if (!op->goaway_error.ok()) {
2047 send_goaway(t.get(), op->goaway_error, /*immediate_disconnect_hint=*/false);
2048 }
2049
2050 if (op->set_accept_stream) {
2051 t->accept_stream_cb = op->set_accept_stream_fn;
2052 t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
2053 t->registered_method_matcher_cb = op->set_registered_method_matcher_fn;
2054 }
2055
2056 if (op->bind_pollset) {
2057 if (t->ep != nullptr) {
2058 grpc_endpoint_add_to_pollset(t->ep.get(), op->bind_pollset);
2059 }
2060 }
2061
2062 if (op->bind_pollset_set) {
2063 if (t->ep != nullptr) {
2064 grpc_endpoint_add_to_pollset_set(t->ep.get(), op->bind_pollset_set);
2065 }
2066 }
2067
2068 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
2069 send_ping_locked(t.get(), op->send_ping.on_initiate, op->send_ping.on_ack);
2070 grpc_chttp2_initiate_write(t.get(),
2071 GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
2072 }
2073
2074 if (op->start_connectivity_watch != nullptr) {
2075 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
2076 std::move(op->start_connectivity_watch));
2077 }
2078 if (op->stop_connectivity_watch != nullptr) {
2079 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
2080 }
2081
2082 if (!op->disconnect_with_error.ok()) {
2083 send_goaway(t.get(), op->disconnect_with_error,
2084 /*immediate_disconnect_hint=*/true);
2085 close_transport_locked(t.get(), op->disconnect_with_error);
2086 }
2087
2088 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
2089 }
2090
2091 void grpc_chttp2_transport::PerformOp(grpc_transport_op* op) {
2092 GRPC_TRACE_LOG(http, INFO) << "perform_transport_op[t=" << this
2093 << "]: " << grpc_transport_op_string(op);
2094 op->handler_private.extra_arg = this;
2095 Ref().release()->combiner->Run(
2096 GRPC_CLOSURE_INIT(&op->handler_private.closure,
2097 perform_transport_op_locked, op, nullptr),
2098 absl::OkStatus());
2099 }
2100
2101 //
2102 // INPUT PROCESSING - GENERAL
2103 //
2104
2105 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
2106 grpc_chttp2_stream* s) {
2107 if (s->recv_initial_metadata_ready != nullptr &&
2108 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
2109 if (s->seen_error) {
2110 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2111 }
2112 *s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
2113 s->recv_initial_metadata->Set(grpc_core::PeerString(),
2114 t->peer_string.Ref());
2115 // If we didn't receive initial metadata from the wire and instead faked a
2116 // status (due to stream cancellations for example), let upper layers know
2117 // that trailing metadata is immediately available.
2118 if (s->trailing_metadata_available != nullptr &&
2119 s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE &&
2120 s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE) {
2121 *s->trailing_metadata_available = true;
2122 s->trailing_metadata_available = nullptr;
2123 }
2124 if (t->registered_method_matcher_cb != nullptr) {
2125 t->registered_method_matcher_cb(t->accept_stream_cb_user_data,
2126 s->recv_initial_metadata);
2127 }
2128 null_then_sched_closure(&s->recv_initial_metadata_ready);
2129 }
2130 }
2131
2132 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
2133 grpc_chttp2_stream* s) {
2134 if (s->recv_message_ready == nullptr) return;
2135
2136 grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
2137 &s->flow_control);
2138 grpc_error_handle error;
2139
2140 // Lambda is immediately invoked as a big scoped section that can be
2141 // exited out of at any point by returning.
2142 [&]() {
2143 GRPC_TRACE_VLOG(http, 2)
2144 << "maybe_complete_recv_message " << s
2145 << " final_metadata_requested=" << s->final_metadata_requested
2146 << " seen_error=" << s->seen_error;
2147 if (s->final_metadata_requested && s->seen_error) {
2148 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2149 s->recv_message->reset();
2150 } else {
2151 if (s->frame_storage.length != 0) {
2152 while (true) {
2153 CHECK_GT(s->frame_storage.length, 0u);
2154 int64_t min_progress_size;
2155 auto r = grpc_deframe_unprocessed_incoming_frames(
2156 s, &min_progress_size, &**s->recv_message, s->recv_message_flags);
2157 GRPC_TRACE_VLOG(http, 2)
2158 << "Deframe data frame: "
2159 << grpc_core::PollToString(
2160 r, [](absl::Status r) { return r.ToString(); });
2161 if (r.pending()) {
2162 if (s->read_closed) {
2163 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2164 s->recv_message->reset();
2165 break;
2166 } else {
2167 upd.SetMinProgressSize(min_progress_size);
2168 return; // Out of lambda to enclosing function
2169 }
2170 } else {
2171 error = std::move(r.value());
2172 if (!error.ok()) {
2173 s->seen_error = true;
2174 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2175 break;
2176 } else {
2177 if (t->channelz_socket != nullptr) {
2178 t->channelz_socket->RecordMessageReceived();
2179 }
2180 break;
2181 }
2182 }
2183 }
2184 } else if (s->read_closed) {
2185 s->recv_message->reset();
2186 } else {
2187 upd.SetMinProgressSize(GRPC_HEADER_SIZE_IN_BYTES);
2188 return; // Out of lambda to enclosing function
2189 }
2190 }
2191 // save the length of the buffer before handing control back to application
2192 // threads. Needed to support correct flow control bookkeeping
2193 if (error.ok() && s->recv_message->has_value()) {
2194 null_then_sched_closure(&s->recv_message_ready);
2195 } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2196 if (s->call_failed_before_recv_message != nullptr) {
2197 *s->call_failed_before_recv_message =
2198 (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
2199 }
2200 null_then_sched_closure(&s->recv_message_ready);
2201 }
2202 }();
2203
2204 upd.SetPendingSize(s->frame_storage.length);
2205 grpc_chttp2_act_on_flowctl_action(upd.MakeAction(), t, s);
2206 }
2207
2208 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2209 grpc_chttp2_stream* s) {
2210 grpc_chttp2_maybe_complete_recv_message(t, s);
2211 GRPC_TRACE_VLOG(http, 2) << "maybe_complete_recv_trailing_metadata cli="
2212 << t->is_client << " s=" << s
2213 << " closure=" << s->recv_trailing_metadata_finished
2214 << " read_closed=" << s->read_closed
2215 << " write_closed=" << s->write_closed << " "
2216 << s->frame_storage.length;
2217 if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2218 s->write_closed) {
2219 if (s->seen_error || !t->is_client) {
2220 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2221 }
2222 if (s->read_closed && s->frame_storage.length == 0 &&
2223 s->recv_trailing_metadata_finished != nullptr) {
2224 grpc_transport_move_stats(&s->stats, s->collecting_stats);
2225 s->collecting_stats = nullptr;
2226 *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer);
2227 null_then_sched_closure(&s->recv_trailing_metadata_finished);
2228 }
2229 }
2230 }
2231
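// Removes the stream from the transport's stream map. When the last stream
// goes away we post a benign reclaimer and, if the final GOAWAY has already
// been sent, close the transport. On servers the returned handle holds a
// transport ref; clients return an empty handle.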
2232 static grpc_chttp2_transport::RemovedStreamHandle remove_stream(
2233 grpc_chttp2_transport* t, uint32_t id, grpc_error_handle error) {
2234 grpc_chttp2_stream* s = t->stream_map.extract(id).mapped();
2235 DCHECK(s);
2236 if (t->incoming_stream == s) {
2237 t->incoming_stream = nullptr;
2238 grpc_chttp2_parsing_become_skip_parser(t);
2239 }
2240
2241 if (t->stream_map.empty()) {
2242 post_benign_reclaimer(t);
2243 if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
2244 close_transport_locked(
2245 t, GRPC_ERROR_CREATE_REFERENCING(
2246 "Last stream closed after sending GOAWAY", &error, 1));
2247 }
2248 }
2249 if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2250 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2251 }
2252 grpc_chttp2_list_remove_stalled_by_stream(t, s);
2253 grpc_chttp2_list_remove_stalled_by_transport(t, s);
2254
2255 maybe_start_some_streams(t);
2256
2257 if (t->is_client) return grpc_chttp2_transport::RemovedStreamHandle();
2258 return grpc_chttp2_transport::RemovedStreamHandle(t->Ref());
2259 }
2260
2261 namespace grpc_core {
2262 namespace {
2263
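// Tarpitting: when enabled on a server, the response to certain stream
// cancellations is delayed by a random interval (log-uniform, in milliseconds,
// between the configured min and max tarpit durations), presumably to slow
// down misbehaving peers. MaybeTarpit runs `fn` immediately for clients or
// when tarpitting is disabled, and otherwise re-enters the combiner after the
// delay, skipping the work if the transport has closed in the meantime.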
2264 Duration TarpitDuration(grpc_chttp2_transport* t) {
2265 return Duration::Milliseconds(absl::LogUniform<int>(
2266 absl::BitGen(), t->min_tarpit_duration_ms, t->max_tarpit_duration_ms));
2267 }
2268
2269 template <typename F>
2270 void MaybeTarpit(grpc_chttp2_transport* t, bool tarpit, F fn) {
2271 if (!tarpit || !t->allow_tarpit || t->is_client) {
2272 fn(t);
2273 return;
2274 }
2275 const auto duration = TarpitDuration(t);
2276 t->event_engine->RunAfter(
2277 duration, [t = t->Ref(), fn = std::move(fn)]() mutable {
2278 ApplicationCallbackExecCtx app_exec_ctx;
2279 ExecCtx exec_ctx;
2280 t->combiner->Run(
2281 NewClosure([t, fn = std::move(fn)](grpc_error_handle) mutable {
2282 // TODO(ctiller): this can result in not sending RST_STREAMs if a
2283 // request gets tarpitted behind a transport close.
2284 if (!t->closed_with_error.ok()) return;
2285 fn(t.get());
2286 }),
2287 absl::OkStatus());
2288 });
2289 }
2290
2291 } // namespace
2292 } // namespace grpc_core
2293
2294 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2295 grpc_error_handle due_to_error, bool tarpit) {
2296 if (!t->is_client && !s->sent_trailing_metadata &&
2297 grpc_error_has_clear_grpc_status(due_to_error) &&
2298 !(s->read_closed && s->write_closed)) {
2299 close_from_api(t, s, due_to_error, tarpit);
2300 return;
2301 }
2302
2303 if (!due_to_error.ok() && !s->seen_error) {
2304 s->seen_error = true;
2305 }
2306 if (!s->read_closed || !s->write_closed) {
2307 if (s->id != 0) {
2308 grpc_http2_error_code http_error;
2309 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2310 &http_error, nullptr);
2311 grpc_core::MaybeTarpit(
2312 t, tarpit,
2313 [id = s->id, http_error,
2314 remove_stream_handle = grpc_chttp2_mark_stream_closed(
2315 t, s, 1, 1, due_to_error)](grpc_chttp2_transport* t) {
2316 grpc_chttp2_add_rst_stream_to_next_write(
2317 t, id, static_cast<uint32_t>(http_error), nullptr);
2318 grpc_chttp2_initiate_write(t,
2319 GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2320 });
2321 return;
2322 }
2323 }
2324 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2325 }
2326
2327 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2328 grpc_error_handle error) {
2329 grpc_status_code status;
2330 std::string message;
2331 grpc_error_get_status(error, s->deadline, &status, &message, nullptr,
2332 nullptr);
2333 if (status != GRPC_STATUS_OK) {
2334 s->seen_error = true;
2335 }
2336 // stream_global->recv_trailing_metadata_finished gives us a
2337 // last chance replacement: we've received trailing metadata,
2338 // but something more important has become available to signal
2339 // to the upper layers - drop what we've got, and then publish
2340 // what we want - which is safe because we haven't told anyone
2341 // about the metadata yet
2342 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2343 s->recv_trailing_metadata_finished != nullptr ||
2344 !s->final_metadata_requested) {
2345 s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status);
2346 if (!message.empty()) {
2347 s->trailing_metadata_buffer.Set(
2348 grpc_core::GrpcMessageMetadata(),
2349 grpc_core::Slice::FromCopiedBuffer(message));
2350 }
2351 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2352 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2353 }
2354 }
2355
2356 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2357 size_t* nrefs) {
2358 if (error.ok()) return;
2359 for (size_t i = 0; i < *nrefs; i++) {
2360 if (error == refs[i]) {
2361 return;
2362 }
2363 }
2364 refs[*nrefs] = error;
2365 ++*nrefs;
2366 }
2367
2368 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2369 grpc_chttp2_stream* s,
2370 const char* main_error_msg) {
2371 grpc_error_handle refs[3];
2372 size_t nrefs = 0;
2373 add_error(s->read_closed_error, refs, &nrefs);
2374 add_error(s->write_closed_error, refs, &nrefs);
2375 add_error(extra_error, refs, &nrefs);
2376 grpc_error_handle error;
2377 if (nrefs > 0) {
2378 error = GRPC_ERROR_CREATE_REFERENCING(main_error_msg, refs, nrefs);
2379 }
2380 return error;
2381 }
2382
2383 static void flush_write_list(grpc_chttp2_transport* t,
2384 grpc_chttp2_write_cb** list,
2385 grpc_error_handle error) {
2386 while (*list) {
2387 grpc_chttp2_write_cb* cb = *list;
2388 *list = cb->next;
2389 grpc_chttp2_complete_closure_step(t, &cb->closure, error,
2390 "on_write_finished_cb");
2391 cb->next = t->write_cb_pool;
2392 t->write_cb_pool = cb;
2393 }
2394 }
2395
2396 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2397 grpc_chttp2_stream* s,
2398 grpc_error_handle error) {
2399 error =
2400 removal_error(error, s, "Pending writes failed due to stream closure");
2401 s->send_initial_metadata = nullptr;
2402 grpc_chttp2_complete_closure_step(t, &s->send_initial_metadata_finished,
2403 error, "send_initial_metadata_finished");
2404
2405 s->send_trailing_metadata = nullptr;
2406 s->sent_trailing_metadata_op = nullptr;
2407 grpc_chttp2_complete_closure_step(t, &s->send_trailing_metadata_finished,
2408 error, "send_trailing_metadata_finished");
2409
2410 grpc_chttp2_complete_closure_step(t, &s->send_message_finished, error,
2411 "fetching_send_message_finished");
2412 flush_write_list(t, &s->on_write_finished_cbs, error);
2413 flush_write_list(t, &s->on_flow_controlled_cbs, error);
2414 }
2415
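// Closes the read and/or write halves of a stream. Once both halves are
// closed the stream is removed from the transport (or from the
// waiting-for-concurrency list if it never got an id), a status is faked
// toward the surface if needed, and the transport's stream ref is dropped.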
2416 grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
2417 grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
2418 int close_writes, grpc_error_handle error) {
2419 grpc_chttp2_transport::RemovedStreamHandle rsh;
2420 GRPC_TRACE_VLOG(http, 2)
2421 << "MARK_STREAM_CLOSED: t=" << t << " s=" << s << "(id=" << s->id << ") "
2422 << ((close_reads && close_writes)
2423 ? "read+write"
2424 : (close_reads ? "read" : (close_writes ? "write" : "nothing??")))
2425 << " [" << grpc_core::StatusToString(error) << "]";
2426 if (s->read_closed && s->write_closed) {
2427 // already closed, but we should still fake the status if needed.
2428 grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2429 if (!overall_error.ok()) {
2430 grpc_chttp2_fake_status(t, s, overall_error);
2431 }
2432 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2433 return rsh;
2434 }
2435 bool closed_read = false;
2436 bool became_closed = false;
2437 if (close_reads && !s->read_closed) {
2438 s->read_closed_error = error;
2439 s->read_closed = true;
2440 closed_read = true;
2441 }
2442 if (close_writes && !s->write_closed) {
2443 s->write_closed_error = error;
2444 s->write_closed = true;
2445 grpc_chttp2_fail_pending_writes(t, s, error);
2446 }
2447 if (s->read_closed && s->write_closed) {
2448 became_closed = true;
2449 grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2450 if (s->id != 0) {
2451 rsh = remove_stream(t, s->id, overall_error);
2452 } else {
2453 // Purge streams waiting on concurrency (i.e. still waiting for id assignment).
2454 grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2455 }
2456 if (!overall_error.ok()) {
2457 grpc_chttp2_fake_status(t, s, overall_error);
2458 }
2459 }
2460 if (closed_read) {
2461 for (int i = 0; i < 2; i++) {
2462 if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2463 s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2464 }
2465 }
2466 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2467 grpc_chttp2_maybe_complete_recv_message(t, s);
2468 }
2469 if (became_closed) {
2470 s->stats.latency =
2471 gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), s->creation_time);
2472 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2473 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2474 }
2475 return rsh;
2476 }
2477
2478 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2479 grpc_error_handle error, bool tarpit) {
2480 grpc_status_code grpc_status;
2481 std::string message;
2482 grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,
2483 nullptr);
2484
2485 CHECK_GE(grpc_status, 0);
2486 CHECK_LT((int)grpc_status, 100);
2487
2488 auto remove_stream_handle = grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2489 grpc_core::MaybeTarpit(
2490 t, tarpit,
2491 [error = std::move(error),
2492 sent_initial_metadata = s->sent_initial_metadata, id = s->id,
2493 grpc_status, message = std::move(message),
2494 remove_stream_handle =
2495 std::move(remove_stream_handle)](grpc_chttp2_transport* t) mutable {
2496 grpc_slice hdr;
2497 grpc_slice status_hdr;
2498 grpc_slice http_status_hdr;
2499 grpc_slice content_type_hdr;
2500 grpc_slice message_pfx;
2501 uint8_t* p;
2502 uint32_t len = 0;
2503
2504 // Hand roll a header block.
2505 // This is unnecessarily ugly - at some point we should find a more
2506 // elegant solution.
2507 // It's complicated by the fact that our send machinery would be dead
2508 // by the time we got around to sending this, so instead we ignore
2509 // HPACK compression and just write the uncompressed bytes onto the
2510 // wire.
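// Each field below is an HPACK "literal header field without indexing -- new
// name" (leading 0x00 byte), followed by a plain (non-Huffman) length-prefixed
// name and value, e.g. 0x00, 7, ":status", 3, "200".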
2511 if (!sent_initial_metadata) {
2512 http_status_hdr = GRPC_SLICE_MALLOC(13);
2513 p = GRPC_SLICE_START_PTR(http_status_hdr);
2514 *p++ = 0x00;
2515 *p++ = 7;
2516 *p++ = ':';
2517 *p++ = 's';
2518 *p++ = 't';
2519 *p++ = 'a';
2520 *p++ = 't';
2521 *p++ = 'u';
2522 *p++ = 's';
2523 *p++ = 3;
2524 *p++ = '2';
2525 *p++ = '0';
2526 *p++ = '0';
2527 CHECK(p == GRPC_SLICE_END_PTR(http_status_hdr));
2528 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2529
2530 content_type_hdr = GRPC_SLICE_MALLOC(31);
2531 p = GRPC_SLICE_START_PTR(content_type_hdr);
2532 *p++ = 0x00;
2533 *p++ = 12;
2534 *p++ = 'c';
2535 *p++ = 'o';
2536 *p++ = 'n';
2537 *p++ = 't';
2538 *p++ = 'e';
2539 *p++ = 'n';
2540 *p++ = 't';
2541 *p++ = '-';
2542 *p++ = 't';
2543 *p++ = 'y';
2544 *p++ = 'p';
2545 *p++ = 'e';
2546 *p++ = 16;
2547 *p++ = 'a';
2548 *p++ = 'p';
2549 *p++ = 'p';
2550 *p++ = 'l';
2551 *p++ = 'i';
2552 *p++ = 'c';
2553 *p++ = 'a';
2554 *p++ = 't';
2555 *p++ = 'i';
2556 *p++ = 'o';
2557 *p++ = 'n';
2558 *p++ = '/';
2559 *p++ = 'g';
2560 *p++ = 'r';
2561 *p++ = 'p';
2562 *p++ = 'c';
2563 CHECK(p == GRPC_SLICE_END_PTR(content_type_hdr));
2564 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2565 }
2566
2567 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2568 p = GRPC_SLICE_START_PTR(status_hdr);
2569 *p++ = 0x00; // literal header, not indexed
2570 *p++ = 11; // len(grpc-status)
2571 *p++ = 'g';
2572 *p++ = 'r';
2573 *p++ = 'p';
2574 *p++ = 'c';
2575 *p++ = '-';
2576 *p++ = 's';
2577 *p++ = 't';
2578 *p++ = 'a';
2579 *p++ = 't';
2580 *p++ = 'u';
2581 *p++ = 's';
2582 if (grpc_status < 10) {
2583 *p++ = 1;
2584 *p++ = static_cast<uint8_t>('0' + grpc_status);
2585 } else {
2586 *p++ = 2;
2587 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2588 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2589 }
2590 CHECK(p == GRPC_SLICE_END_PTR(status_hdr));
2591 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2592
2593 size_t msg_len = message.length();
2594 CHECK(msg_len <= UINT32_MAX);
2595 grpc_core::VarintWriter<1> msg_len_writer(
2596 static_cast<uint32_t>(msg_len));
2597 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
2598 p = GRPC_SLICE_START_PTR(message_pfx);
2599 *p++ = 0x00; // literal header, not indexed
2600 *p++ = 12; // len(grpc-message)
2601 *p++ = 'g';
2602 *p++ = 'r';
2603 *p++ = 'p';
2604 *p++ = 'c';
2605 *p++ = '-';
2606 *p++ = 'm';
2607 *p++ = 'e';
2608 *p++ = 's';
2609 *p++ = 's';
2610 *p++ = 'a';
2611 *p++ = 'g';
2612 *p++ = 'e';
2613 msg_len_writer.Write(0, p);
2614 p += msg_len_writer.length();
2615 CHECK(p == GRPC_SLICE_END_PTR(message_pfx));
2616 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2617 len += static_cast<uint32_t>(msg_len);
2618
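// Standard 9-byte HTTP/2 frame header: 24-bit payload length, frame type
// (HEADERS), flags (END_STREAM | END_HEADERS), and the 31-bit stream id.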
2619 hdr = GRPC_SLICE_MALLOC(9);
2620 p = GRPC_SLICE_START_PTR(hdr);
2621 *p++ = static_cast<uint8_t>(len >> 16);
2622 *p++ = static_cast<uint8_t>(len >> 8);
2623 *p++ = static_cast<uint8_t>(len);
2624 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2625 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM |
2626 GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2627 *p++ = static_cast<uint8_t>(id >> 24);
2628 *p++ = static_cast<uint8_t>(id >> 16);
2629 *p++ = static_cast<uint8_t>(id >> 8);
2630 *p++ = static_cast<uint8_t>(id);
2631 CHECK(p == GRPC_SLICE_END_PTR(hdr));
2632
2633 grpc_slice_buffer_add(&t->qbuf, hdr);
2634 if (!sent_initial_metadata) {
2635 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2636 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2637 }
2638 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2639 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2640 grpc_slice_buffer_add(&t->qbuf,
2641 grpc_slice_from_cpp_string(std::move(message)));
2642 grpc_chttp2_reset_ping_clock(t);
2643 grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_NO_ERROR,
2644 nullptr);
2645
2646 grpc_chttp2_initiate_write(t,
2647 GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2648 });
2649 }
2650
2651 static void end_all_the_calls(grpc_chttp2_transport* t,
2652 grpc_error_handle error) {
2653 intptr_t http2_error;
2654 // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2655 if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2656 !grpc_error_get_int(error, grpc_core::StatusIntProperty::kHttp2Error,
2657 &http2_error)) {
2658 error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
2659 GRPC_STATUS_UNAVAILABLE);
2660 }
2661 cancel_unstarted_streams(t, error, false);
2662 std::vector<grpc_chttp2_stream*> to_cancel;
2663 for (auto id_stream : t->stream_map) {
2664 to_cancel.push_back(id_stream.second);
2665 }
2666 for (auto s : to_cancel) {
2667 grpc_chttp2_cancel_stream(t, s, error, false);
2668 }
2669 }
2670
2671 //
2672 // INPUT PROCESSING - PARSING
2673 //
2674
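// Helper that maps a flow-control urgency onto write behaviour: QUEUE_UPDATE
// only applies the queued settings change, UPDATE_IMMEDIATELY additionally
// kicks off a write with the given reason, and NO_ACTION_NEEDED does nothing.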
2675 template <class F>
2676 static void WithUrgency(grpc_chttp2_transport* t,
2677 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2678 grpc_chttp2_initiate_write_reason reason, F action) {
2679 switch (urgency) {
2680 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2681 break;
2682 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2683 grpc_chttp2_initiate_write(t, reason);
2684 ABSL_FALLTHROUGH_INTENDED;
2685 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2686 action();
2687 break;
2688 }
2689 }
2690
2691 void grpc_chttp2_act_on_flowctl_action(
2692 const grpc_core::chttp2::FlowControlAction& action,
2693 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2694 WithUrgency(t, action.send_stream_update(),
2695 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() {
2696 if (s->id != 0 && !s->read_closed) {
2697 grpc_chttp2_mark_stream_writable(t, s);
2698 }
2699 });
2700 WithUrgency(t, action.send_transport_update(),
2701 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2702 WithUrgency(t, action.send_initial_window_update(),
2703 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2704 t->settings.mutable_local().SetInitialWindowSize(
2705 action.initial_window_size());
2706 });
2707 WithUrgency(
2708 t, action.send_max_frame_size_update(),
2709 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2710 t->settings.mutable_local().SetMaxFrameSize(action.max_frame_size());
2711 });
2712 if (t->enable_preferred_rx_crypto_frame_advertisement) {
2713 WithUrgency(
2714 t, action.preferred_rx_crypto_frame_size_update(),
2715 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2716 t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(
2717 action.preferred_rx_crypto_frame_size());
2718 });
2719 }
2720 }
2721
2722 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2723 grpc_http_parser parser;
2724 size_t i = 0;
2725 grpc_error_handle error;
2726 grpc_http_response response;
2727
2728 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2729
2730 grpc_error_handle parse_error;
2731 for (; i < t->read_buffer.count && parse_error.ok(); i++) {
2732 parse_error =
2733 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2734 }
2735 if (parse_error.ok() &&
2736 (parse_error = grpc_http_parser_eof(&parser)) == absl::OkStatus()) {
2737 error = grpc_error_set_int(
2738 GRPC_ERROR_CREATE(
2739 absl::StrCat("Trying to connect an http1.x server (HTTP status ",
2740 response.status, ")")),
2741 grpc_core::StatusIntProperty::kRpcStatus,
2742 grpc_http2_status_to_grpc_status(response.status));
2743 }
2744
2745 grpc_http_parser_destroy(&parser);
2746 grpc_http_response_destroy(&response);
2747 return error;
2748 }
2749
2750 static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2751 grpc_error_handle error) {
2752 auto* tp = t.get();
2753 tp->combiner->Run(grpc_core::InitTransportClosure<read_action_locked>(
2754 std::move(t), &tp->read_action_locked),
2755 error);
2756 }
2757
2758 static void read_action_parse_loop_locked(
2759 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2760 grpc_error_handle error) {
2761 GRPC_LATENT_SEE_INNER_SCOPE("read_action_parse_loop_locked");
2762 if (t->closed_with_error.ok()) {
2763 grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
2764 size_t requests_started = 0;
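// Feed each read slice to the HTTP/2 parser. If the parser stops early and
// reports how many bytes it consumed, drop the fully-consumed slices, trim
// the consumed prefix off the current one, and re-queue this parse loop on
// the combiner (after ForceOffload) rather than parsing everything in one
// pass.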
2765 for (size_t i = 0;
2766 i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) {
2767 auto r = grpc_chttp2_perform_read(t.get(), t->read_buffer.slices[i],
2768 requests_started);
2769 if (auto* partial_read_size = absl::get_if<size_t>(&r)) {
2770 for (size_t j = 0; j < i; j++) {
2771 grpc_core::CSliceUnref(grpc_slice_buffer_take_first(&t->read_buffer));
2772 }
2773 grpc_slice_buffer_sub_first(
2774 &t->read_buffer, *partial_read_size,
2775 GRPC_SLICE_LENGTH(t->read_buffer.slices[0]));
2776 t->combiner->ForceOffload();
2777 auto* tp = t.get();
2778 tp->combiner->Run(
2779 grpc_core::InitTransportClosure<read_action_parse_loop_locked>(
2780 std::move(t), &tp->read_action_locked),
2781 std::move(errors[0]));
2782 // Early return: we queued to retry later.
2783 return;
2784 } else {
2785 errors[1] = std::move(absl::get<absl::Status>(r));
2786 }
2787 }
2788 if (errors[1] != absl::OkStatus()) {
2789 errors[2] = try_http_parsing(t.get());
2790 error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
2791 GPR_ARRAY_SIZE(errors));
2792 }
2793
2794 if (t->initial_window_update != 0) {
2795 if (t->initial_window_update > 0) {
2796 grpc_chttp2_stream* s;
2797 while (grpc_chttp2_list_pop_stalled_by_stream(t.get(), &s)) {
2798 grpc_chttp2_mark_stream_writable(t.get(), s);
2799 grpc_chttp2_initiate_write(
2800 t.get(),
2801 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2802 }
2803 }
2804 t->initial_window_update = 0;
2805 }
2806 }
2807
2808 bool keep_reading = false;
2809 if (error.ok() && !t->closed_with_error.ok()) {
2810 error = GRPC_ERROR_CREATE_REFERENCING("Transport closed",
2811 &t->closed_with_error, 1);
2812 }
2813 if (!error.ok()) {
2814 // If a goaway frame was received, this might be the reason why the read
2815 // failed. Add this info to the error.
2816 if (!t->goaway_error.ok()) {
2817 error = grpc_error_add_child(error, t->goaway_error);
2818 }
2819
2820 close_transport_locked(t.get(), error);
2821 } else if (t->closed_with_error.ok()) {
2822 keep_reading = true;
2823 // Since we have read a byte, reset the keepalive timer
2824 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2825 maybe_reset_keepalive_ping_timer_locked(t.get());
2826 }
2827 }
2828 grpc_slice_buffer_reset_and_unref(&t->read_buffer);
2829
2830 if (keep_reading) {
2831 if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2832 t->reading_paused_on_pending_induced_frames = true;
2833 GRPC_TRACE_LOG(http, INFO)
2834 << "transport " << t.get()
2835 << " : Pausing reading due to too many unwritten "
2836 "SETTINGS ACK and RST_STREAM frames";
2837 } else {
2838 continue_read_action_locked(std::move(t));
2839 }
2840 }
2841 }
2842
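// Combiner-side handler for a completed endpoint read: incoming data cancels
// any pending keepalive ping timeout, read errors are annotated with the
// current write state, and the bytes are then consumed by
// read_action_parse_loop_locked.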
2843 static void read_action_locked(
2844 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2845 grpc_error_handle error) {
2846 // got an incoming read, cancel any pending keepalive timers
2847 t->keepalive_incoming_data_wanted = false;
2848 if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
2849 if (GRPC_TRACE_FLAG_ENABLED(http2_ping) ||
2850 GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
2851 LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t.get()
2852 << "]: Clear keepalive timer because data was received";
2853 }
2854 t->event_engine->Cancel(
2855 std::exchange(t->keepalive_ping_timeout_handle, TaskHandle::kInvalid));
2856 }
2857 grpc_error_handle err = error;
2858 if (!err.ok()) {
2859 err = grpc_error_set_int(
2860 GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
2861 grpc_core::StatusIntProperty::kOccurredDuringWrite, t->write_state);
2862 }
2863 std::swap(err, error);
2864 read_action_parse_loop_locked(std::move(t), std::move(err));
2865 }
2866
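// Issues the next endpoint read into t->read_buffer. The read is flagged
// urgent once a GOAWAY has been received, and the endpoint is asked for at
// least grpc_chttp2_min_read_progress_size(t) bytes before calling back.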
2867 static void continue_read_action_locked(
2868 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2869 const bool urgent = !t->goaway_error.ok();
2870 auto* tp = t.get();
2871 grpc_endpoint_read(tp->ep.get(), &tp->read_buffer,
2872 grpc_core::InitTransportClosure<read_action>(
2873 std::move(t), &tp->read_action_locked),
2874 urgent, grpc_chttp2_min_read_progress_size(tp));
2875 }
2876
2877 // t is reffed prior to the first call, and unreffed once the callback chain
2878 // that it kicks off finishes.
2879 void schedule_bdp_ping_locked(
2880 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2881 auto* tp = t.get();
2882 tp->flow_control.bdp_estimator()->SchedulePing();
2883 send_ping_locked(tp,
2884 grpc_core::InitTransportClosure<start_bdp_ping>(
2885 tp->Ref(), &tp->start_bdp_ping_locked),
2886 grpc_core::InitTransportClosure<finish_bdp_ping>(
2887 std::move(t), &tp->finish_bdp_ping_locked));
2888 grpc_chttp2_initiate_write(tp, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2889 }
2890
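// BDP ping lifecycle: schedule_bdp_ping_locked queues the ping with the
// estimator, start_bdp_ping(_locked) runs when the ping goes out and begins
// the measurement, and finish_bdp_ping(_locked) runs on the ack, applies the
// resulting flow-control action, and arms a timer for the next ping.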
2891 static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2892 grpc_error_handle error) {
2893 grpc_chttp2_transport* tp = t.get();
2894 tp->combiner->Run(grpc_core::InitTransportClosure<start_bdp_ping_locked>(
2895 std::move(t), &tp->start_bdp_ping_locked),
2896 error);
2897 }
2898
2899 static void start_bdp_ping_locked(
2900 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2901 grpc_error_handle error) {
2902 GRPC_TRACE_LOG(http, INFO)
2903 << t->peer_string.as_string_view()
2904 << ": Start BDP ping err=" << grpc_core::StatusToString(error);
2905 if (!error.ok() || !t->closed_with_error.ok()) {
2906 return;
2907 }
2908 // Reset the keepalive ping timer
2909 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2910 maybe_reset_keepalive_ping_timer_locked(t.get());
2911 }
2912 t->flow_control.bdp_estimator()->StartPing();
2913 t->bdp_ping_started = true;
2914 }
2915
2916 static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2917 grpc_error_handle error) {
2918 grpc_chttp2_transport* tp = t.get();
2919 tp->combiner->Run(grpc_core::InitTransportClosure<finish_bdp_ping_locked>(
2920 std::move(t), &tp->finish_bdp_ping_locked),
2921 error);
2922 }
2923
2924 static void finish_bdp_ping_locked(
2925 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2926 grpc_error_handle error) {
2927 GRPC_TRACE_LOG(http, INFO)
2928 << t->peer_string.as_string_view()
2929 << ": Complete BDP ping err=" << grpc_core::StatusToString(error);
2930 if (!error.ok() || !t->closed_with_error.ok()) {
2931 return;
2932 }
2933 if (!t->bdp_ping_started) {
2934 // start_bdp_ping_locked has not been run yet. Schedule
2935 // finish_bdp_ping_locked to be run later.
2936 finish_bdp_ping(std::move(t), std::move(error));
2937 return;
2938 }
2939 t->bdp_ping_started = false;
2940 grpc_core::Timestamp next_ping =
2941 t->flow_control.bdp_estimator()->CompletePing();
2942 grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t.get(),
2943 nullptr);
2944 CHECK(t->next_bdp_ping_timer_handle == TaskHandle::kInvalid);
2945 t->next_bdp_ping_timer_handle =
2946 t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
2947 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2948 grpc_core::ExecCtx exec_ctx;
2949 next_bdp_ping_timer_expired(t.get());
2950 });
2951 }
2952
2953 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
2954 t->combiner->Run(
2955 grpc_core::InitTransportClosure<next_bdp_ping_timer_expired_locked>(
2956 t->Ref(), &t->next_bdp_ping_timer_expired_locked),
2957 absl::OkStatus());
2958 }
2959
2960 static void next_bdp_ping_timer_expired_locked(
2961 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2962 GRPC_UNUSED grpc_error_handle error) {
2963 DCHECK(error.ok());
2964 t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
2965 if (t->flow_control.bdp_estimator()->accumulator() == 0) {
2966 // Block the bdp ping till we receive more data.
2967 t->bdp_ping_blocked = true;
2968 } else {
2969 schedule_bdp_ping_locked(std::move(t));
2970 }
2971 }
2972
2973 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2974 const bool is_client) {
2975 grpc_chttp2_config_default_keepalive_args(grpc_core::ChannelArgs::FromC(args),
2976 is_client);
2977 }
2978
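// The process-wide keepalive defaults below are seeded from channel args:
// keepalive time is clamped to at least 1ms, keepalive timeout to at least
// zero, and permit-without-calls is taken as-is, tracked separately for
// clients and servers. A hypothetical caller-side sketch:
//
//   auto args = grpc_core::ChannelArgs()
//                   .Set(GRPC_ARG_KEEPALIVE_TIME_MS, 20000)
//                   .Set(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
//   grpc_chttp2_config_default_keepalive_args(args, /*is_client=*/true);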
2979 static void grpc_chttp2_config_default_keepalive_args_client(
2980 const grpc_core::ChannelArgs& channel_args) {
2981 g_default_client_keepalive_time =
2982 std::max(grpc_core::Duration::Milliseconds(1),
2983 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
2984 .value_or(g_default_client_keepalive_time));
2985 g_default_client_keepalive_timeout = std::max(
2986 grpc_core::Duration::Zero(),
2987 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
2988 .value_or(g_default_client_keepalive_timeout));
2989 g_default_client_keepalive_permit_without_calls =
2990 channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
2991 .value_or(g_default_client_keepalive_permit_without_calls);
2992 }
2993
2994 static void grpc_chttp2_config_default_keepalive_args_server(
2995 const grpc_core::ChannelArgs& channel_args) {
2996 g_default_server_keepalive_time =
2997 std::max(grpc_core::Duration::Milliseconds(1),
2998 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
2999 .value_or(g_default_server_keepalive_time));
3000 g_default_server_keepalive_timeout = std::max(
3001 grpc_core::Duration::Zero(),
3002 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
3003 .value_or(g_default_server_keepalive_timeout));
3004 g_default_server_keepalive_permit_without_calls =
3005 channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
3006 .value_or(g_default_server_keepalive_permit_without_calls);
3007 }
3008
3009 void grpc_chttp2_config_default_keepalive_args(
3010 const grpc_core::ChannelArgs& channel_args, const bool is_client) {
3011 if (is_client) {
3012 grpc_chttp2_config_default_keepalive_args_client(channel_args);
3013 } else {
3014 grpc_chttp2_config_default_keepalive_args_server(channel_args);
3015 }
3016 grpc_core::Chttp2PingAbusePolicy::SetDefaults(channel_args);
3017 grpc_core::Chttp2PingRatePolicy::SetDefaults(channel_args);
3018 }
3019
3020 static void init_keepalive_ping(
3021 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
3022 auto* tp = t.get();
3023 tp->combiner->Run(grpc_core::InitTransportClosure<init_keepalive_ping_locked>(
3024 std::move(t), &tp->init_keepalive_ping_locked),
3025 absl::OkStatus());
3026 }
3027
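// Keepalive timer fired. If the transport is being destroyed or is already
// closed we move to DYING; if a ping is permitted (permit_without_calls is
// set or streams are active) we enter PINGING and send a keepalive ping;
// otherwise we just re-arm the timer for another keepalive_time interval.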
3028 static void init_keepalive_ping_locked(
3029 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3030 GRPC_UNUSED grpc_error_handle error) {
3031 DCHECK(error.ok());
3032 CHECK(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
3033 CHECK(t->keepalive_ping_timer_handle != TaskHandle::kInvalid);
3034 t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
3035 if (t->destroying || !t->closed_with_error.ok()) {
3036 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
3037 } else {
3038 if (t->keepalive_permit_without_calls || !t->stream_map.empty()) {
3039 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
3040 send_keepalive_ping_locked(t);
3041 grpc_chttp2_initiate_write(t.get(),
3042 GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
3043 } else {
3044 t->keepalive_ping_timer_handle =
3045 t->event_engine->RunAfter(t->keepalive_time, [t] {
3046 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
3047 grpc_core::ExecCtx exec_ctx;
3048 init_keepalive_ping(t);
3049 });
3050 }
3051 }
3052 }
3053
3054 static void finish_keepalive_ping(
3055 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3056 grpc_error_handle error) {
3057 auto* tp = t.get();
3058 tp->combiner->Run(
3059 grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
3060 std::move(t), &tp->finish_keepalive_ping_locked),
3061 error);
3062 }
3063
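// Completion callback for the keepalive ping. If it finished successfully
// while the transport is still PINGING, return to WAITING and re-arm the
// keepalive timer for the next interval.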
3064 static void finish_keepalive_ping_locked(
3065 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3066 grpc_error_handle error) {
3067 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
3068 if (error.ok()) {
3069 if (GRPC_TRACE_FLAG_ENABLED(http) ||
3070 GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
3071 LOG(INFO) << t->peer_string.as_string_view()
3072 << ": Finish keepalive ping";
3073 }
3074 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
3075 CHECK(t->keepalive_ping_timer_handle == TaskHandle::kInvalid);
3076 t->keepalive_ping_timer_handle =
3077 t->event_engine->RunAfter(t->keepalive_time, [t] {
3078 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
3079 grpc_core::ExecCtx exec_ctx;
3080 init_keepalive_ping(t);
3081 });
3082 }
3083 }
3084 }
3085
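// Called on transport activity while a keepalive ping is scheduled but has
// not yet fired: if the pending timer can still be cancelled, push the next
// keepalive ping out by another keepalive_time interval.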
3086 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
3087 if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
3088 t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
3089 // The cancel succeeded, so reset the keepalive ping timer. Note that we
3090 // don't need to Ref or Unref here since we still hold the Ref.
3091 if (GRPC_TRACE_FLAG_ENABLED(http) ||
3092 GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
3093 LOG(INFO) << t->peer_string.as_string_view()
3094 << ": Keepalive ping cancelled. Resetting timer.";
3095 }
3096 t->keepalive_ping_timer_handle =
3097 t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
3098 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
3099 grpc_core::ExecCtx exec_ctx;
3100 init_keepalive_ping(std::move(t));
3101 });
3102 }
3103 }
3104
3105 //
3106 // CALLBACK LOOP
3107 //
3108
3109 static void connectivity_state_set(grpc_chttp2_transport* t,
3110 grpc_connectivity_state state,
3111 const absl::Status& status,
3112 const char* reason) {
3113 GRPC_TRACE_LOG(http, INFO)
3114 << "transport " << t << " set connectivity_state=" << state
3115 << "; status=" << status.ToString() << "; reason=" << reason;
3116 t->state_tracker.SetState(state, status, reason);
3117 }
3118
3119 //
3120 // POLLSET STUFF
3121 //
3122
3123 void grpc_chttp2_transport::SetPollset(grpc_stream* /*gs*/,
3124 grpc_pollset* pollset) {
3125 // We don't want the overhead of acquiring the mutex unless we're
3126 // using the "poll" polling engine, which is the only one that
3127 // actually uses pollsets.
3128 if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
3129 grpc_core::MutexLock lock(&ep_destroy_mu);
3130 if (ep != nullptr) grpc_endpoint_add_to_pollset(ep.get(), pollset);
3131 }
3132
3133 void grpc_chttp2_transport::SetPollsetSet(grpc_stream* /*gs*/,
3134 grpc_pollset_set* pollset_set) {
3135 // We don't want the overhead of acquiring the mutex unless we're
3136 // using the "poll" polling engine, which is the only one that
3137 // actually uses pollsets.
3138 if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
3139 grpc_core::MutexLock lock(&ep_destroy_mu);
3140 if (ep != nullptr) grpc_endpoint_add_to_pollset_set(ep.get(), pollset_set);
3141 }
3142
3143 //
3144 // RESOURCE QUOTAS
3145 //
3146
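// Memory-pressure reclaimers, registered with the transport's memory owner.
// The benign pass sends a GOAWAY when the transport has no active streams;
// the destructive pass cancels one stream at a time with ENHANCE_YOUR_CALM,
// re-posting itself while streams remain.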
3147 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3148 if (!t->benign_reclaimer_registered) {
3149 t->benign_reclaimer_registered = true;
3150 t->memory_owner.PostReclaimer(
3151 grpc_core::ReclamationPass::kBenign,
3152 [t = t->Ref()](
3153 absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3154 if (sweep.has_value()) {
3155 auto* tp = t.get();
3156 tp->active_reclamation = std::move(*sweep);
3157 tp->combiner->Run(
3158 grpc_core::InitTransportClosure<benign_reclaimer_locked>(
3159 std::move(t), &tp->benign_reclaimer_locked),
3160 absl::OkStatus());
3161 }
3162 });
3163 }
3164 }
3165
3166 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3167 if (!t->destructive_reclaimer_registered) {
3168 t->destructive_reclaimer_registered = true;
3169 t->memory_owner.PostReclaimer(
3170 grpc_core::ReclamationPass::kDestructive,
3171 [t = t->Ref()](
3172 absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3173 if (sweep.has_value()) {
3174 auto* tp = t.get();
3175 tp->active_reclamation = std::move(*sweep);
3176 tp->combiner->Run(
3177 grpc_core::InitTransportClosure<destructive_reclaimer_locked>(
3178 std::move(t), &tp->destructive_reclaimer_locked),
3179 absl::OkStatus());
3180 }
3181 });
3182 }
3183 }
3184
3185 static void benign_reclaimer_locked(
3186 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3187 grpc_error_handle error) {
3188 if (error.ok() && t->stream_map.empty()) {
3189 // Channel with no active streams: send a goaway to try to make it
3190 // disconnect cleanly.
3191 grpc_core::global_stats().IncrementRqConnectionsDropped();
3192 GRPC_TRACE_LOG(resource_quota, INFO)
3193 << "HTTP2: " << t->peer_string.as_string_view()
3194 << " - send goaway to free memory";
3195 send_goaway(t.get(),
3196 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3197 grpc_core::StatusIntProperty::kHttp2Error,
3198 GRPC_HTTP2_ENHANCE_YOUR_CALM),
3199 /*immediate_disconnect_hint=*/true);
3200 } else if (error.ok() && GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
3201 LOG(INFO) << "HTTP2: " << t->peer_string.as_string_view()
3202 << " - skip benign reclamation, there are still "
3203 << t->stream_map.size() << " streams";
3204 }
3205 t->benign_reclaimer_registered = false;
3206 if (error != absl::CancelledError()) {
3207 t->active_reclamation.Finish();
3208 }
3209 }
3210
3211 static void destructive_reclaimer_locked(
3212 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3213 grpc_error_handle error) {
3214 t->destructive_reclaimer_registered = false;
3215 if (error.ok() && !t->stream_map.empty()) {
3216 // As stream_map is a hash map, this selects effectively a random stream.
3217 grpc_chttp2_stream* s = t->stream_map.begin()->second;
3218 GRPC_TRACE_LOG(resource_quota, INFO)
3219 << "HTTP2: " << t->peer_string.as_string_view()
3220 << " - abandon stream id " << s->id;
3221 grpc_core::global_stats().IncrementRqCallsDropped();
3222 grpc_chttp2_cancel_stream(
3223 t.get(), s,
3224 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3225 grpc_core::StatusIntProperty::kHttp2Error,
3226 GRPC_HTTP2_ENHANCE_YOUR_CALM),
3227 false);
3228 if (!t->stream_map.empty()) {
3229 // Since we cancel one stream per destructive reclamation, if
3230 // there are more streams left, we can immediately post a new
3231 // reclaimer in case the resource quota needs to free more
3232 // memory
3233 post_destructive_reclaimer(t.get());
3234 }
3235 }
3236 if (error != absl::CancelledError()) {
3237 t->active_reclamation.Finish();
3238 }
3239 }
3240
3241 //
3242 // MONITORING
3243 //
3244
3245 const char* grpc_chttp2_initiate_write_reason_string(
3246 grpc_chttp2_initiate_write_reason reason) {
3247 switch (reason) {
3248 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3249 return "INITIAL_WRITE";
3250 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3251 return "START_NEW_STREAM";
3252 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3253 return "SEND_MESSAGE";
3254 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3255 return "SEND_INITIAL_METADATA";
3256 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3257 return "SEND_TRAILING_METADATA";
3258 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3259 return "RETRY_SEND_PING";
3260 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3261 return "CONTINUE_PINGS";
3262 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3263 return "GOAWAY_SENT";
3264 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3265 return "RST_STREAM";
3266 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3267 return "CLOSE_FROM_API";
3268 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3269 return "STREAM_FLOW_CONTROL";
3270 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3271 return "TRANSPORT_FLOW_CONTROL";
3272 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3273 return "SEND_SETTINGS";
3274 case GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK:
3275 return "SETTINGS_ACK";
3276 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3277 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3278 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3279 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3280 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3281 return "APPLICATION_PING";
3282 case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3283 return "BDP_PING";
3284 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3285 return "KEEPALIVE_PING";
3286 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3287 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3288 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3289 return "PING_RESPONSE";
3290 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3291 return "FORCE_RST_STREAM";
3292 }
3293 GPR_UNREACHABLE_CODE(return "unknown");
3294 }
3295
3296 size_t grpc_chttp2_transport::SizeOfStream() const {
3297 return sizeof(grpc_chttp2_stream);
3298 }
3299
3300 bool grpc_chttp2_transport::
3301 HackyDisableStreamOpBatchCoalescingInConnectedChannel() const {
3302 return false;
3303 }
3304
3305 absl::string_view grpc_chttp2_transport::GetTransportName() const {
3306 return "chttp2";
3307 }
3308
3309 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
3310 grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport) {
3311 grpc_chttp2_transport* t =
3312 reinterpret_cast<grpc_chttp2_transport*>(transport);
3313 return t->channelz_socket;
3314 }
3315
3316 grpc_core::Transport* grpc_create_chttp2_transport(
3317 const grpc_core::ChannelArgs& channel_args,
3318 grpc_core::OrphanablePtr<grpc_endpoint> ep, const bool is_client) {
3319 return new grpc_chttp2_transport(channel_args, std::move(ep), is_client);
3320 }
3321
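// Kicks off the transport's read loop. Any bytes already read (e.g. during
// the handshake) are moved into t->read_buffer first; if the transport has
// already been closed, the receive-settings and close notifications are run
// immediately with the closing error instead of starting to read.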
3322 void grpc_chttp2_transport_start_reading(
3323 grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
3324 grpc_closure* notify_on_receive_settings,
3325 grpc_pollset_set* interested_parties_until_recv_settings,
3326 grpc_closure* notify_on_close) {
3327 auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
3328 if (read_buffer != nullptr) {
3329 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3330 }
3331 auto* tp = t.get();
3332 tp->combiner->Run(
3333 grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
3334 interested_parties_until_recv_settings,
3335 notify_on_close](grpc_error_handle) mutable {
3336 if (!t->closed_with_error.ok()) {
3337 if (notify_on_receive_settings != nullptr) {
3338 if (t->ep != nullptr &&
3339 interested_parties_until_recv_settings != nullptr) {
3340 grpc_endpoint_delete_from_pollset_set(
3341 t->ep.get(), interested_parties_until_recv_settings);
3342 }
3343 grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
3344 t->closed_with_error);
3345 }
3346 if (notify_on_close != nullptr) {
3347 grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close,
3348 t->closed_with_error);
3349 }
3350 return;
3351 }
3352 t->interested_parties_until_recv_settings =
3353 interested_parties_until_recv_settings;
3354 t->notify_on_receive_settings = notify_on_receive_settings;
3355 t->notify_on_close = notify_on_close;
3356 read_action_locked(std::move(t), absl::OkStatus());
3357 }),
3358 absl::OkStatus());
3359 }
3360