1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/slice_buffer.h>
21 #include <grpc/support/port_platform.h>
22 #include <grpc/support/time.h>
23 #include <inttypes.h>
24 #include <stddef.h>
25
26 #include <algorithm>
27 #include <limits>
28 #include <memory>
29 #include <string>
30 #include <utility>
31
32 #include "absl/container/flat_hash_map.h"
33 #include "absl/log/check.h"
34 #include "absl/log/log.h"
35 #include "absl/status/status.h"
36 #include "absl/types/optional.h"
37 #include "src/core/channelz/channelz.h"
38 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
41 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
42 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
43 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
44 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
45 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
46 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
47 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
48 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
49 #include "src/core/ext/transport/chttp2/transport/internal.h"
50 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
51 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
52 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
53 #include "src/core/ext/transport/chttp2/transport/stream_lists.h"
54 #include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
55 #include "src/core/lib/debug/trace.h"
56 #include "src/core/lib/experiments/experiments.h"
57 #include "src/core/lib/iomgr/endpoint.h"
58 #include "src/core/lib/iomgr/error.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/slice/slice.h"
61 #include "src/core/lib/slice/slice_buffer.h"
62 #include "src/core/lib/transport/bdp_estimator.h"
63 #include "src/core/lib/transport/http2_errors.h"
64 #include "src/core/lib/transport/metadata_batch.h"
65 #include "src/core/lib/transport/transport.h"
66 #include "src/core/telemetry/call_tracer.h"
67 #include "src/core/telemetry/stats.h"
68 #include "src/core/telemetry/stats_data.h"
69 #include "src/core/util/match.h"
70 #include "src/core/util/ref_counted.h"
71 #include "src/core/util/ref_counted_ptr.h"
72 #include "src/core/util/time.h"
73 #include "src/core/util/useful.h"
74
75 // IWYU pragma: no_include "src/core/util/orphanable.h"
76
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)77 static void add_to_write_list(grpc_chttp2_write_cb** list,
78 grpc_chttp2_write_cb* cb) {
79 cb->next = *list;
80 *list = cb;
81 }
82
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_write_cb * cb,grpc_error_handle error)83 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_write_cb* cb,
84 grpc_error_handle error) {
85 grpc_chttp2_complete_closure_step(t, &cb->closure, error, "finish_write_cb");
86 cb->next = t->write_cb_pool;
87 t->write_cb_pool = cb;
88 }
89
NextAllowedPingInterval(grpc_chttp2_transport * t)90 static grpc_core::Duration NextAllowedPingInterval(grpc_chttp2_transport* t) {
91 if (t->is_client) {
92 return (t->keepalive_permit_without_calls == 0 && t->stream_map.empty())
93 ? grpc_core::Duration::Hours(2)
94 : grpc_core::Duration::Seconds(
95 1); // A second is added to deal with
96 // network delays and timing imprecision
97 }
98 if (t->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
99 // The gRPC keepalive spec doesn't call for any throttling on the server
100 // side, but we are adding some throttling for protection anyway, unless
101 // we are doing a graceful GOAWAY in which case we don't want to wait.
102 if (grpc_core::IsMultipingEnabled()) {
103 return grpc_core::Duration::Seconds(1);
104 }
105 return t->keepalive_time == grpc_core::Duration::Infinity()
106 ? grpc_core::Duration::Seconds(20)
107 : t->keepalive_time / 2;
108 }
109 return grpc_core::Duration::Zero();
110 }
111
maybe_initiate_ping(grpc_chttp2_transport * t)112 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
113 if (!t->ping_callbacks.ping_requested()) {
114 // no ping needed: wait
115 return;
116 }
117 // InvalidateNow to avoid getting stuck re-initializing the ping timer
118 // in a loop while draining the currently-held combiner. Also see
119 // https://github.com/grpc/grpc/issues/26079.
120 grpc_core::ExecCtx::Get()->InvalidateNow();
121 Match(
122 t->ping_rate_policy.RequestSendPing(NextAllowedPingInterval(t),
123 t->ping_callbacks.pings_inflight()),
124 [t](grpc_core::Chttp2PingRatePolicy::SendGranted) {
125 t->ping_rate_policy.SentPing();
126 const uint64_t id = t->ping_callbacks.StartPing(t->bitgen);
127 grpc_slice_buffer_add(t->outbuf.c_slice_buffer(),
128 grpc_chttp2_ping_create(false, id));
129 t->keepalive_incoming_data_wanted = true;
130 if (t->channelz_socket != nullptr) {
131 t->channelz_socket->RecordKeepaliveSent();
132 }
133 grpc_core::global_stats().IncrementHttp2PingsSent();
134 if (GRPC_TRACE_FLAG_ENABLED(http) ||
135 GRPC_TRACE_FLAG_ENABLED(bdp_estimator) ||
136 GRPC_TRACE_FLAG_ENABLED(http_keepalive) ||
137 GRPC_TRACE_FLAG_ENABLED(http2_ping)) {
138 LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
139 << "]: Ping " << id << " sent ["
140 << std::string(t->peer_string.as_string_view())
141 << "]: " << t->ping_rate_policy.GetDebugString();
142 }
143 },
144 [t](grpc_core::Chttp2PingRatePolicy::TooManyRecentPings) {
145 // need to receive something of substance before sending a ping again
146 if (GRPC_TRACE_FLAG_ENABLED(http) ||
147 GRPC_TRACE_FLAG_ENABLED(bdp_estimator) ||
148 GRPC_TRACE_FLAG_ENABLED(http_keepalive) ||
149 GRPC_TRACE_FLAG_ENABLED(http2_ping)) {
150 LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
151 << "]: Ping delayed ["
152 << std::string(t->peer_string.as_string_view())
153 << "]: too many recent pings: "
154 << t->ping_rate_policy.GetDebugString();
155 }
156 },
157 [t](grpc_core::Chttp2PingRatePolicy::TooSoon too_soon) {
158 // not enough elapsed time between successive pings
159 if (GRPC_TRACE_FLAG_ENABLED(http) ||
160 GRPC_TRACE_FLAG_ENABLED(bdp_estimator) ||
161 GRPC_TRACE_FLAG_ENABLED(http_keepalive) ||
162 GRPC_TRACE_FLAG_ENABLED(http2_ping)) {
163 LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
164 << "]: Ping delayed ["
165 << std::string(t->peer_string.as_string_view())
166 << "]: not enough time elapsed since last "
167 "ping. Last ping:"
168 << too_soon.last_ping
169 << ", minimum wait:" << too_soon.next_allowed_ping_interval
170 << ", need to wait:" << too_soon.wait;
171 }
172 if (t->delayed_ping_timer_handle ==
173 grpc_event_engine::experimental::EventEngine::TaskHandle::
174 kInvalid) {
175 t->delayed_ping_timer_handle = t->event_engine->RunAfter(
176 too_soon.wait, [t = t->Ref()]() mutable {
177 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
178 grpc_core::ExecCtx exec_ctx;
179 grpc_chttp2_retry_initiate_ping(std::move(t));
180 });
181 }
182 });
183 }
184
update_list(grpc_chttp2_transport * t,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error_handle error)185 static bool update_list(grpc_chttp2_transport* t, int64_t send_bytes,
186 grpc_chttp2_write_cb** list, int64_t* ctr,
187 grpc_error_handle error) {
188 bool sched_any = false;
189 grpc_chttp2_write_cb* cb = *list;
190 *list = nullptr;
191 *ctr += send_bytes;
192 while (cb) {
193 grpc_chttp2_write_cb* next = cb->next;
194 if (cb->call_at_byte <= *ctr) {
195 sched_any = true;
196 finish_write_cb(t, cb, error);
197 } else {
198 add_to_write_list(list, cb);
199 }
200 cb = next;
201 }
202 return sched_any;
203 }
204
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)205 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
206 const char* staller) {
207 GRPC_TRACE_VLOG(flowctl, 2)
208 << t->peer_string.as_string_view() << ":" << t << " stream " << s->id
209 << " moved to stalled list by " << staller
210 << ". This is FULLY expected to happen in a healthy program that is not "
211 "seeing flow control stalls. However, if you know that there are "
212 "unwanted stalls, here is some helpful data: [fc:pending="
213 << s->flow_controlled_buffer.length
214 << ":flowed=" << s->flow_controlled_bytes_flowed
215 << ":peer_initwin=" << t->settings.acked().initial_window_size()
216 << ":t_win=" << t->flow_control.remote_window() << ":s_win="
217 << static_cast<uint32_t>(std::max(
218 int64_t{0}, s->flow_control.remote_window_delta() +
219 static_cast<int64_t>(
220 t->settings.peer().initial_window_size())))
221 << ":s_delta=" << s->flow_control.remote_window_delta() << "]";
222 }
223
224 namespace {
225
226 class CountDefaultMetadataEncoder {
227 public:
count() const228 size_t count() const { return count_; }
229
Encode(const grpc_core::Slice &,const grpc_core::Slice &)230 void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {}
231
232 template <typename Which>
Encode(Which,const typename Which::ValueType &)233 void Encode(Which, const typename Which::ValueType&) {
234 count_++;
235 }
236
237 private:
238 size_t count_ = 0;
239 };
240
241 } // namespace
242
243 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)244 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
245 CountDefaultMetadataEncoder enc;
246 initial_metadata->Encode(&enc);
247 return enc.count() == initial_metadata->count();
248 }
249
250 namespace {
251
252 class WriteContext {
253 public:
WriteContext(grpc_chttp2_transport * t)254 explicit WriteContext(grpc_chttp2_transport* t) : t_(t) {
255 grpc_core::global_stats().IncrementHttp2WritesBegun();
256 }
257
FlushSettings()258 void FlushSettings() {
259 auto update = t_->settings.MaybeSendUpdate();
260 if (update.has_value()) {
261 grpc_core::Http2Frame frame(std::move(*update));
262 Serialize(absl::Span<grpc_core::Http2Frame>(&frame, 1), t_->outbuf);
263 if (t_->keepalive_timeout != grpc_core::Duration::Infinity()) {
264 CHECK(
265 t_->settings_ack_watchdog ==
266 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid);
267 // We base settings timeout on keepalive timeout, but double it to allow
268 // for implementations taking some more time about acking a setting.
269 t_->settings_ack_watchdog = t_->event_engine->RunAfter(
270 t_->settings_timeout, [t = t_->Ref()]() mutable {
271 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
272 grpc_core::ExecCtx exec_ctx;
273 grpc_chttp2_settings_timeout(std::move(t));
274 });
275 }
276 t_->flow_control.FlushedSettings();
277 grpc_core::global_stats().IncrementHttp2SettingsWrites();
278 }
279 }
280
FlushQueuedBuffers()281 void FlushQueuedBuffers() {
282 // simple writes are queued to qbuf, and flushed here
283 grpc_slice_buffer_move_into(&t_->qbuf, t_->outbuf.c_slice_buffer());
284 t_->num_pending_induced_frames = 0;
285 CHECK_EQ(t_->qbuf.count, 0u);
286 }
287
FlushWindowUpdates()288 void FlushWindowUpdates() {
289 uint32_t transport_announce = t_->flow_control.MaybeSendUpdate(
290 t_->outbuf.c_slice_buffer()->count > 0);
291 if (transport_announce) {
292 grpc_slice_buffer_add(
293 t_->outbuf.c_slice_buffer(),
294 grpc_chttp2_window_update_create(0, transport_announce, nullptr));
295 grpc_chttp2_reset_ping_clock(t_);
296 }
297 }
298
FlushPingAcks()299 void FlushPingAcks() {
300 if (t_->ping_ack_count == 0) return;
301 // Limit the size of writes if we include ping acks - to avoid the ack being
302 // delayed by crypto operations.
303 target_write_size_ = 0;
304 for (size_t i = 0; i < t_->ping_ack_count; i++) {
305 grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(),
306 grpc_chttp2_ping_create(true, t_->ping_acks[i]));
307 }
308 t_->ping_ack_count = 0;
309 }
310
EnactHpackSettings()311 void EnactHpackSettings() {
312 t_->hpack_compressor.SetMaxTableSize(
313 t_->settings.peer().header_table_size());
314 }
315
UpdateStreamsNoLongerStalled()316 void UpdateStreamsNoLongerStalled() {
317 grpc_chttp2_stream* s;
318 while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
319 if (t_->closed_with_error.ok() &&
320 grpc_chttp2_list_add_writable_stream(t_, s)) {
321 if (!s->refcount->refs.RefIfNonZero()) {
322 grpc_chttp2_list_remove_writable_stream(t_, s);
323 }
324 }
325 }
326 }
327
NextStream()328 grpc_chttp2_stream* NextStream() {
329 if (t_->outbuf.c_slice_buffer()->length > target_write_size_) {
330 result_.partial = true;
331 return nullptr;
332 }
333
334 grpc_chttp2_stream* s;
335 if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
336 return nullptr;
337 }
338
339 return s;
340 }
341
IncInitialMetadataWrites()342 void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()343 void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()344 void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()345 void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
346
NoteScheduledResults()347 void NoteScheduledResults() { result_.early_results_scheduled = true; }
348
transport() const349 grpc_chttp2_transport* transport() const { return t_; }
350
Result()351 grpc_chttp2_begin_write_result Result() {
352 result_.writing = t_->outbuf.c_slice_buffer()->count > 0;
353 return result_;
354 }
355
target_write_size() const356 size_t target_write_size() const { return target_write_size_; }
357
358 private:
359 grpc_chttp2_transport* const t_;
360 size_t target_write_size_ = t_->write_size_policy.WriteTargetSize();
361
362 // stats histogram counters: we increment these throughout this function,
363 // and at the end publish to the central stats histograms
364 int flow_control_writes_ = 0;
365 int initial_metadata_writes_ = 0;
366 int trailing_metadata_writes_ = 0;
367 int message_writes_ = 0;
368 grpc_chttp2_begin_write_result result_ = {false, false, false};
369 };
370
371 class DataSendContext {
372 public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)373 DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
374 grpc_chttp2_stream* s)
375 : write_context_(write_context),
376 t_(t),
377 s_(s),
378 sending_bytes_before_(s_->sending_bytes) {}
379
stream_remote_window() const380 uint32_t stream_remote_window() const {
381 return static_cast<uint32_t>(std::max(
382 int64_t{0},
383 s_->flow_control.remote_window_delta() +
384 static_cast<int64_t>(t_->settings.peer().initial_window_size())));
385 }
386
max_outgoing() const387 uint32_t max_outgoing() const {
388 return grpc_core::Clamp<uint32_t>(
389 std::min<int64_t>(
390 {t_->settings.peer().max_frame_size(), stream_remote_window(),
391 t_->flow_control.remote_window(),
392 static_cast<int64_t>(write_context_->target_write_size())}),
393 0, std::numeric_limits<uint32_t>::max());
394 }
395
AnyOutgoing() const396 bool AnyOutgoing() const { return max_outgoing() > 0; }
397
FlushBytes()398 void FlushBytes() {
399 uint32_t send_bytes =
400 static_cast<uint32_t>(std::min(static_cast<size_t>(max_outgoing()),
401 s_->flow_controlled_buffer.length));
402 is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
403 s_->send_trailing_metadata != nullptr &&
404 s_->send_trailing_metadata->empty();
405 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
406 is_last_frame_, &s_->call_tracer_wrapper,
407 t_->outbuf.c_slice_buffer());
408 sfc_upd_.SentData(send_bytes);
409 s_->sending_bytes += send_bytes;
410 }
411
is_last_frame() const412 bool is_last_frame() const { return is_last_frame_; }
413
CallCallbacks()414 void CallCallbacks() {
415 if (update_list(
416 t_, static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
417 &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
418 absl::OkStatus())) {
419 write_context_->NoteScheduledResults();
420 }
421 }
422
423 private:
424 WriteContext* write_context_;
425 grpc_chttp2_transport* t_;
426 grpc_chttp2_stream* s_;
427 grpc_core::chttp2::StreamFlowControl::OutgoingUpdateContext sfc_upd_{
428 &s_->flow_control};
429 const size_t sending_bytes_before_;
430 bool is_last_frame_ = false;
431 };
432
433 class StreamWriteContext {
434 public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)435 StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
436 : write_context_(write_context), t_(write_context->transport()), s_(s) {
437 GRPC_CHTTP2_IF_TRACING(INFO)
438 << "W:" << t_ << " " << (t_->is_client ? "CLIENT" : "SERVER") << "["
439 << s->id << "] im-(sent,send)=(" << s->sent_initial_metadata << ","
440 << (s->send_initial_metadata != nullptr) << ")";
441 }
442
FlushInitialMetadata()443 void FlushInitialMetadata() {
444 // send initial metadata if it's available
445 if (s_->sent_initial_metadata) return;
446 if (s_->send_initial_metadata == nullptr) return;
447
448 // We skip this on the server side if there is no custom initial
449 // metadata, there are no messages to send, and we are also sending
450 // trailing metadata. This results in a Trailers-Only response,
451 // which is required for retries, as per:
452 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
453 if (!t_->is_client && s_->flow_controlled_buffer.length == 0 &&
454 s_->send_trailing_metadata != nullptr &&
455 is_default_initial_metadata(s_->send_initial_metadata)) {
456 ConvertInitialMetadataToTrailingMetadata();
457 } else {
458 t_->hpack_compressor.EncodeHeaders(
459 grpc_core::HPackCompressor::EncodeHeaderOptions{
460 s_->id, // stream_id
461 false, // is_eof
462 t_->settings.peer()
463 .allow_true_binary_metadata(), // use_true_binary_metadata
464 t_->settings.peer().max_frame_size(), // max_frame_size
465 &s_->call_tracer_wrapper},
466 *s_->send_initial_metadata, t_->outbuf.c_slice_buffer());
467 grpc_chttp2_reset_ping_clock(t_);
468 write_context_->IncInitialMetadataWrites();
469 }
470
471 s_->send_initial_metadata = nullptr;
472 s_->sent_initial_metadata = true;
473 write_context_->NoteScheduledResults();
474 grpc_chttp2_complete_closure_step(t_, &s_->send_initial_metadata_finished,
475 absl::OkStatus(),
476 "send_initial_metadata_finished");
477 if (!grpc_core::IsCallTracerInTransportEnabled()) {
478 if (s_->call_tracer) {
479 grpc_core::HttpAnnotation::WriteStats write_stats;
480 write_stats.target_write_size = write_context_->target_write_size();
481 s_->call_tracer->RecordAnnotation(
482 grpc_core::HttpAnnotation(
483 grpc_core::HttpAnnotation::Type::kHeadWritten,
484 gpr_now(GPR_CLOCK_REALTIME))
485 .Add(s_->t->flow_control.stats())
486 .Add(s_->flow_control.stats())
487 .Add(write_stats));
488 }
489 } else if (grpc_core::IsTraceRecordCallopsEnabled()) {
490 auto* call_tracer =
491 s_->arena->GetContext<grpc_core::CallTracerInterface>();
492 if (call_tracer != nullptr && call_tracer->IsSampled()) {
493 grpc_core::HttpAnnotation::WriteStats write_stats;
494 write_stats.target_write_size = write_context_->target_write_size();
495 call_tracer->RecordAnnotation(
496 grpc_core::HttpAnnotation(
497 grpc_core::HttpAnnotation::Type::kHeadWritten,
498 gpr_now(GPR_CLOCK_REALTIME))
499 .Add(s_->t->flow_control.stats())
500 .Add(s_->flow_control.stats())
501 .Add(write_stats));
502 }
503 }
504 }
505
FlushWindowUpdates()506 void FlushWindowUpdates() {
507 if (s_->read_closed) return;
508
509 // send any window updates
510 const uint32_t stream_announce = s_->flow_control.MaybeSendUpdate();
511 if (stream_announce == 0) return;
512
513 grpc_slice_buffer_add(
514 t_->outbuf.c_slice_buffer(),
515 grpc_chttp2_window_update_create(s_->id, stream_announce,
516 &s_->call_tracer_wrapper));
517 grpc_chttp2_reset_ping_clock(t_);
518 write_context_->IncWindowUpdateWrites();
519 }
520
FlushData()521 void FlushData() {
522 if (!s_->sent_initial_metadata) return;
523
524 if (s_->flow_controlled_buffer.length == 0) {
525 return; // early out: nothing to do
526 }
527
528 DataSendContext data_send_context(write_context_, t_, s_);
529
530 if (!data_send_context.AnyOutgoing()) {
531 if (t_->flow_control.remote_window() <= 0) {
532 grpc_core::global_stats().IncrementHttp2TransportStalls();
533 report_stall(t_, s_, "transport");
534 grpc_chttp2_list_add_stalled_by_transport(t_, s_);
535 } else if (data_send_context.stream_remote_window() <= 0) {
536 grpc_core::global_stats().IncrementHttp2StreamStalls();
537 report_stall(t_, s_, "stream");
538 grpc_chttp2_list_add_stalled_by_stream(t_, s_);
539 }
540 return; // early out: nothing to do
541 }
542
543 while (s_->flow_controlled_buffer.length > 0 &&
544 data_send_context.max_outgoing() > 0) {
545 data_send_context.FlushBytes();
546 }
547 grpc_chttp2_reset_ping_clock(t_);
548 if (data_send_context.is_last_frame()) {
549 SentLastFrame();
550 }
551 data_send_context.CallCallbacks();
552 stream_became_writable_ = true;
553 if (s_->flow_controlled_buffer.length > 0) {
554 GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
555 grpc_chttp2_list_add_writable_stream(t_, s_);
556 }
557 write_context_->IncMessageWrites();
558 }
559
FlushTrailingMetadata()560 void FlushTrailingMetadata() {
561 if (!s_->sent_initial_metadata) return;
562
563 if (s_->send_trailing_metadata == nullptr) return;
564 if (s_->flow_controlled_buffer.length != 0) return;
565
566 GRPC_CHTTP2_IF_TRACING(INFO) << "sending trailing_metadata";
567 if (s_->send_trailing_metadata->empty()) {
568 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
569 &s_->call_tracer_wrapper,
570 t_->outbuf.c_slice_buffer());
571 } else {
572 t_->hpack_compressor.EncodeHeaders(
573 grpc_core::HPackCompressor::EncodeHeaderOptions{
574 s_->id, true, t_->settings.peer().allow_true_binary_metadata(),
575 t_->settings.peer().max_frame_size(), &s_->call_tracer_wrapper},
576 *s_->send_trailing_metadata, t_->outbuf.c_slice_buffer());
577 }
578 write_context_->IncTrailingMetadataWrites();
579 grpc_chttp2_reset_ping_clock(t_);
580 SentLastFrame();
581
582 write_context_->NoteScheduledResults();
583 grpc_chttp2_complete_closure_step(t_, &s_->send_trailing_metadata_finished,
584 absl::OkStatus(),
585 "send_trailing_metadata_finished");
586 }
587
stream_became_writable()588 bool stream_became_writable() { return stream_became_writable_; }
589
590 private:
591 class TrailersOnlyMetadataEncoder {
592 public:
TrailersOnlyMetadataEncoder(grpc_metadata_batch * trailing_md)593 explicit TrailersOnlyMetadataEncoder(grpc_metadata_batch* trailing_md)
594 : trailing_md_(trailing_md) {}
595
596 template <typename Which, typename Value>
Encode(Which which,Value value)597 void Encode(Which which, Value value) {
598 if (Which::kTransferOnTrailersOnly) {
599 trailing_md_->Set(which, value);
600 }
601 }
602
603 template <typename Which>
Encode(Which which,const grpc_core::Slice & value)604 void Encode(Which which, const grpc_core::Slice& value) {
605 if (Which::kTransferOnTrailersOnly) {
606 trailing_md_->Set(which, value.Ref());
607 }
608 }
609
610 // Non-grpc metadata should not be transferred.
Encode(const grpc_core::Slice &,const grpc_core::Slice &)611 void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {}
612
613 private:
614 grpc_metadata_batch* trailing_md_;
615 };
616
ConvertInitialMetadataToTrailingMetadata()617 void ConvertInitialMetadataToTrailingMetadata() {
618 GRPC_CHTTP2_IF_TRACING(INFO)
619 << "not sending initial_metadata (Trailers-Only)";
620 // When sending Trailers-Only, we need to move metadata from headers to
621 // trailers.
622 TrailersOnlyMetadataEncoder encoder(s_->send_trailing_metadata);
623 s_->send_initial_metadata->Encode(&encoder);
624 }
625
SentLastFrame()626 void SentLastFrame() {
627 s_->send_trailing_metadata = nullptr;
628 if (s_->sent_trailing_metadata_op) {
629 *s_->sent_trailing_metadata_op = true;
630 s_->sent_trailing_metadata_op = nullptr;
631 }
632 s_->sent_trailing_metadata = true;
633 s_->eos_sent = true;
634
635 if (!t_->is_client && !s_->read_closed) {
636 grpc_slice_buffer_add(
637 t_->outbuf.c_slice_buffer(),
638 grpc_chttp2_rst_stream_create(s_->id, GRPC_HTTP2_NO_ERROR,
639 &s_->call_tracer_wrapper));
640 }
641 grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
642 absl::OkStatus());
643 if (!grpc_core::IsCallTracerInTransportEnabled()) {
644 if (s_->call_tracer) {
645 s_->call_tracer->RecordAnnotation(
646 grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
647 gpr_now(GPR_CLOCK_REALTIME))
648 .Add(s_->t->flow_control.stats())
649 .Add(s_->flow_control.stats()));
650 }
651 } else if (grpc_core::IsTraceRecordCallopsEnabled()) {
652 auto* call_tracer =
653 s_->arena->GetContext<grpc_core::CallTracerInterface>();
654 if (call_tracer != nullptr && call_tracer->IsSampled()) {
655 call_tracer->RecordAnnotation(
656 grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
657 gpr_now(GPR_CLOCK_REALTIME))
658 .Add(s_->t->flow_control.stats())
659 .Add(s_->flow_control.stats()));
660 }
661 }
662 }
663
664 WriteContext* const write_context_;
665 grpc_chttp2_transport* const t_;
666 grpc_chttp2_stream* const s_;
667 bool stream_became_writable_ = false;
668 };
669 } // namespace
670
grpc_chttp2_begin_write(grpc_chttp2_transport * t)671 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
672 grpc_chttp2_transport* t) {
673 GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_begin_write");
674
675 int64_t outbuf_relative_start_pos = 0;
676 WriteContext ctx(t);
677 ctx.FlushSettings();
678 ctx.FlushPingAcks();
679 ctx.FlushQueuedBuffers();
680 ctx.EnactHpackSettings();
681
682 if (t->flow_control.remote_window() > 0) {
683 ctx.UpdateStreamsNoLongerStalled();
684 }
685
686 // for each grpc_chttp2_stream that's become writable, frame it's data
687 // (according to available window sizes) and add to the output buffer
688 while (grpc_chttp2_stream* s = ctx.NextStream()) {
689 StreamWriteContext stream_ctx(&ctx, s);
690 size_t orig_len = t->outbuf.c_slice_buffer()->length;
691 int64_t num_stream_bytes = 0;
692 stream_ctx.FlushInitialMetadata();
693 stream_ctx.FlushWindowUpdates();
694 stream_ctx.FlushData();
695 stream_ctx.FlushTrailingMetadata();
696 if (t->outbuf.c_slice_buffer()->length > orig_len) {
697 // Add this stream to the list of the contexts to be traced at TCP
698 num_stream_bytes = t->outbuf.c_slice_buffer()->length - orig_len;
699 s->byte_counter += static_cast<size_t>(num_stream_bytes);
700 ++s->write_counter;
701 if (s->traced && grpc_endpoint_can_track_err(t->ep.get())) {
702 grpc_core::CopyContextFn copy_context_fn =
703 grpc_core::GrpcHttp2GetCopyContextFn();
704 if (copy_context_fn != nullptr &&
705 grpc_core::GrpcHttp2GetWriteTimestampsCallback() != nullptr) {
706 t->context_list->emplace_back(copy_context_fn(s->arena),
707 outbuf_relative_start_pos,
708 num_stream_bytes, s->byte_counter,
709 s->write_counter - 1, s->tcp_tracer);
710 }
711 }
712 outbuf_relative_start_pos += num_stream_bytes;
713 }
714 if (stream_ctx.stream_became_writable()) {
715 if (!grpc_chttp2_list_add_writing_stream(t, s)) {
716 // already in writing list: drop ref
717 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
718 } else {
719 // ref will be dropped at end of write
720 }
721 } else {
722 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
723 }
724 }
725
726 ctx.FlushWindowUpdates();
727
728 maybe_initiate_ping(t);
729
730 t->write_flow.Begin(GRPC_LATENT_SEE_METADATA("write"));
731
732 return ctx.Result();
733 }
734
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error_handle error)735 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
736 GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_end_write");
737 grpc_chttp2_stream* s;
738
739 t->write_flow.End();
740
741 if (t->channelz_socket != nullptr) {
742 t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
743 }
744 t->num_messages_in_next_write = 0;
745
746 if (t->ping_callbacks.started_new_ping_without_setting_timeout() &&
747 t->keepalive_timeout != grpc_core::Duration::Infinity()) {
748 // Set ping timeout after finishing write so we don't measure our own send
749 // time.
750 const auto timeout = t->ping_timeout;
751 auto id = t->ping_callbacks.OnPingTimeout(
752 timeout, t->event_engine.get(), [t = t->Ref()] {
753 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
754 grpc_core::ExecCtx exec_ctx;
755 grpc_chttp2_ping_timeout(t);
756 });
757 if (GRPC_TRACE_FLAG_ENABLED(http2_ping) && id.has_value()) {
758 LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
759 << "]: Set ping timeout timer of " << timeout.ToString()
760 << " for ping id " << id.value();
761 }
762
763 if (t->keepalive_incoming_data_wanted &&
764 t->keepalive_timeout < t->ping_timeout &&
765 t->keepalive_ping_timeout_handle !=
766 grpc_event_engine::experimental::EventEngine::TaskHandle::
767 kInvalid) {
768 if (GRPC_TRACE_FLAG_ENABLED(http2_ping) ||
769 GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
770 LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
771 << "]: Set keepalive ping timeout timer of "
772 << t->keepalive_timeout.ToString();
773 }
774 t->keepalive_ping_timeout_handle =
775 t->event_engine->RunAfter(t->keepalive_timeout, [t = t->Ref()] {
776 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
777 grpc_core::ExecCtx exec_ctx;
778 grpc_chttp2_keepalive_timeout(t);
779 });
780 }
781 }
782
783 while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
784 if (s->sending_bytes != 0) {
785 update_list(t, static_cast<int64_t>(s->sending_bytes),
786 &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
787 error);
788 s->sending_bytes = 0;
789 }
790 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
791 }
792 grpc_slice_buffer_reset_and_unref(t->outbuf.c_slice_buffer());
793 }
794