• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/slice_buffer.h>
21 #include <grpc/support/port_platform.h>
22 #include <grpc/support/time.h>
23 #include <inttypes.h>
24 #include <stddef.h>
25 
26 #include <algorithm>
27 #include <limits>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 
32 #include "absl/container/flat_hash_map.h"
33 #include "absl/log/check.h"
34 #include "absl/log/log.h"
35 #include "absl/status/status.h"
36 #include "absl/types/optional.h"
37 #include "src/core/channelz/channelz.h"
38 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
39 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
40 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
41 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
42 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
43 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
44 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
45 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
46 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
47 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
48 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
49 #include "src/core/ext/transport/chttp2/transport/internal.h"
50 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
51 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
52 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
53 #include "src/core/ext/transport/chttp2/transport/stream_lists.h"
54 #include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
55 #include "src/core/lib/debug/trace.h"
56 #include "src/core/lib/experiments/experiments.h"
57 #include "src/core/lib/iomgr/endpoint.h"
58 #include "src/core/lib/iomgr/error.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/slice/slice.h"
61 #include "src/core/lib/slice/slice_buffer.h"
62 #include "src/core/lib/transport/bdp_estimator.h"
63 #include "src/core/lib/transport/http2_errors.h"
64 #include "src/core/lib/transport/metadata_batch.h"
65 #include "src/core/lib/transport/transport.h"
66 #include "src/core/telemetry/call_tracer.h"
67 #include "src/core/telemetry/stats.h"
68 #include "src/core/telemetry/stats_data.h"
69 #include "src/core/util/match.h"
70 #include "src/core/util/ref_counted.h"
71 #include "src/core/util/ref_counted_ptr.h"
72 #include "src/core/util/time.h"
73 #include "src/core/util/useful.h"
74 
75 // IWYU pragma: no_include "src/core/util/orphanable.h"
76 
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)77 static void add_to_write_list(grpc_chttp2_write_cb** list,
78                               grpc_chttp2_write_cb* cb) {
79   cb->next = *list;
80   *list = cb;
81 }
82 
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_write_cb * cb,grpc_error_handle error)83 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_write_cb* cb,
84                             grpc_error_handle error) {
85   grpc_chttp2_complete_closure_step(t, &cb->closure, error, "finish_write_cb");
86   cb->next = t->write_cb_pool;
87   t->write_cb_pool = cb;
88 }
89 
NextAllowedPingInterval(grpc_chttp2_transport * t)90 static grpc_core::Duration NextAllowedPingInterval(grpc_chttp2_transport* t) {
91   if (t->is_client) {
92     return (t->keepalive_permit_without_calls == 0 && t->stream_map.empty())
93                ? grpc_core::Duration::Hours(2)
94                : grpc_core::Duration::Seconds(
95                      1);  // A second is added to deal with
96                           // network delays and timing imprecision
97   }
98   if (t->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
99     // The gRPC keepalive spec doesn't call for any throttling on the server
100     // side, but we are adding some throttling for protection anyway, unless
101     // we are doing a graceful GOAWAY in which case we don't want to wait.
102     if (grpc_core::IsMultipingEnabled()) {
103       return grpc_core::Duration::Seconds(1);
104     }
105     return t->keepalive_time == grpc_core::Duration::Infinity()
106                ? grpc_core::Duration::Seconds(20)
107                : t->keepalive_time / 2;
108   }
109   return grpc_core::Duration::Zero();
110 }
111 
maybe_initiate_ping(grpc_chttp2_transport * t)112 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
113   if (!t->ping_callbacks.ping_requested()) {
114     // no ping needed: wait
115     return;
116   }
117   // InvalidateNow to avoid getting stuck re-initializing the ping timer
118   // in a loop while draining the currently-held combiner. Also see
119   // https://github.com/grpc/grpc/issues/26079.
120   grpc_core::ExecCtx::Get()->InvalidateNow();
121   Match(
122       t->ping_rate_policy.RequestSendPing(NextAllowedPingInterval(t),
123                                           t->ping_callbacks.pings_inflight()),
124       [t](grpc_core::Chttp2PingRatePolicy::SendGranted) {
125         t->ping_rate_policy.SentPing();
126         const uint64_t id = t->ping_callbacks.StartPing(t->bitgen);
127         grpc_slice_buffer_add(t->outbuf.c_slice_buffer(),
128                               grpc_chttp2_ping_create(false, id));
129         t->keepalive_incoming_data_wanted = true;
130         if (t->channelz_socket != nullptr) {
131           t->channelz_socket->RecordKeepaliveSent();
132         }
133         grpc_core::global_stats().IncrementHttp2PingsSent();
134         if (GRPC_TRACE_FLAG_ENABLED(http) ||
135             GRPC_TRACE_FLAG_ENABLED(bdp_estimator) ||
136             GRPC_TRACE_FLAG_ENABLED(http_keepalive) ||
137             GRPC_TRACE_FLAG_ENABLED(http2_ping)) {
138           LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
139                     << "]: Ping " << id << " sent ["
140                     << std::string(t->peer_string.as_string_view())
141                     << "]: " << t->ping_rate_policy.GetDebugString();
142         }
143       },
144       [t](grpc_core::Chttp2PingRatePolicy::TooManyRecentPings) {
145         // need to receive something of substance before sending a ping again
146         if (GRPC_TRACE_FLAG_ENABLED(http) ||
147             GRPC_TRACE_FLAG_ENABLED(bdp_estimator) ||
148             GRPC_TRACE_FLAG_ENABLED(http_keepalive) ||
149             GRPC_TRACE_FLAG_ENABLED(http2_ping)) {
150           LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
151                     << "]: Ping delayed ["
152                     << std::string(t->peer_string.as_string_view())
153                     << "]: too many recent pings: "
154                     << t->ping_rate_policy.GetDebugString();
155         }
156       },
157       [t](grpc_core::Chttp2PingRatePolicy::TooSoon too_soon) {
158         // not enough elapsed time between successive pings
159         if (GRPC_TRACE_FLAG_ENABLED(http) ||
160             GRPC_TRACE_FLAG_ENABLED(bdp_estimator) ||
161             GRPC_TRACE_FLAG_ENABLED(http_keepalive) ||
162             GRPC_TRACE_FLAG_ENABLED(http2_ping)) {
163           LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
164                     << "]: Ping delayed ["
165                     << std::string(t->peer_string.as_string_view())
166                     << "]: not enough time elapsed since last "
167                        "ping. Last ping:"
168                     << too_soon.last_ping
169                     << ", minimum wait:" << too_soon.next_allowed_ping_interval
170                     << ", need to wait:" << too_soon.wait;
171         }
172         if (t->delayed_ping_timer_handle ==
173             grpc_event_engine::experimental::EventEngine::TaskHandle::
174                 kInvalid) {
175           t->delayed_ping_timer_handle = t->event_engine->RunAfter(
176               too_soon.wait, [t = t->Ref()]() mutable {
177                 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
178                 grpc_core::ExecCtx exec_ctx;
179                 grpc_chttp2_retry_initiate_ping(std::move(t));
180               });
181         }
182       });
183 }
184 
update_list(grpc_chttp2_transport * t,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error_handle error)185 static bool update_list(grpc_chttp2_transport* t, int64_t send_bytes,
186                         grpc_chttp2_write_cb** list, int64_t* ctr,
187                         grpc_error_handle error) {
188   bool sched_any = false;
189   grpc_chttp2_write_cb* cb = *list;
190   *list = nullptr;
191   *ctr += send_bytes;
192   while (cb) {
193     grpc_chttp2_write_cb* next = cb->next;
194     if (cb->call_at_byte <= *ctr) {
195       sched_any = true;
196       finish_write_cb(t, cb, error);
197     } else {
198       add_to_write_list(list, cb);
199     }
200     cb = next;
201   }
202   return sched_any;
203 }
204 
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)205 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
206                          const char* staller) {
207   GRPC_TRACE_VLOG(flowctl, 2)
208       << t->peer_string.as_string_view() << ":" << t << " stream " << s->id
209       << " moved to stalled list by " << staller
210       << ". This is FULLY expected to happen in a healthy program that is not "
211          "seeing flow control stalls. However, if you know that there are "
212          "unwanted stalls, here is some helpful data: [fc:pending="
213       << s->flow_controlled_buffer.length
214       << ":flowed=" << s->flow_controlled_bytes_flowed
215       << ":peer_initwin=" << t->settings.acked().initial_window_size()
216       << ":t_win=" << t->flow_control.remote_window() << ":s_win="
217       << static_cast<uint32_t>(std::max(
218              int64_t{0}, s->flow_control.remote_window_delta() +
219                              static_cast<int64_t>(
220                                  t->settings.peer().initial_window_size())))
221       << ":s_delta=" << s->flow_control.remote_window_delta() << "]";
222 }
223 
224 namespace {
225 
226 class CountDefaultMetadataEncoder {
227  public:
count() const228   size_t count() const { return count_; }
229 
Encode(const grpc_core::Slice &,const grpc_core::Slice &)230   void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {}
231 
232   template <typename Which>
Encode(Which,const typename Which::ValueType &)233   void Encode(Which, const typename Which::ValueType&) {
234     count_++;
235   }
236 
237  private:
238   size_t count_ = 0;
239 };
240 
241 }  // namespace
242 
243 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)244 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
245   CountDefaultMetadataEncoder enc;
246   initial_metadata->Encode(&enc);
247   return enc.count() == initial_metadata->count();
248 }
249 
250 namespace {
251 
252 class WriteContext {
253  public:
WriteContext(grpc_chttp2_transport * t)254   explicit WriteContext(grpc_chttp2_transport* t) : t_(t) {
255     grpc_core::global_stats().IncrementHttp2WritesBegun();
256   }
257 
FlushSettings()258   void FlushSettings() {
259     auto update = t_->settings.MaybeSendUpdate();
260     if (update.has_value()) {
261       grpc_core::Http2Frame frame(std::move(*update));
262       Serialize(absl::Span<grpc_core::Http2Frame>(&frame, 1), t_->outbuf);
263       if (t_->keepalive_timeout != grpc_core::Duration::Infinity()) {
264         CHECK(
265             t_->settings_ack_watchdog ==
266             grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid);
267         // We base settings timeout on keepalive timeout, but double it to allow
268         // for implementations taking some more time about acking a setting.
269         t_->settings_ack_watchdog = t_->event_engine->RunAfter(
270             t_->settings_timeout, [t = t_->Ref()]() mutable {
271               grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
272               grpc_core::ExecCtx exec_ctx;
273               grpc_chttp2_settings_timeout(std::move(t));
274             });
275       }
276       t_->flow_control.FlushedSettings();
277       grpc_core::global_stats().IncrementHttp2SettingsWrites();
278     }
279   }
280 
FlushQueuedBuffers()281   void FlushQueuedBuffers() {
282     // simple writes are queued to qbuf, and flushed here
283     grpc_slice_buffer_move_into(&t_->qbuf, t_->outbuf.c_slice_buffer());
284     t_->num_pending_induced_frames = 0;
285     CHECK_EQ(t_->qbuf.count, 0u);
286   }
287 
FlushWindowUpdates()288   void FlushWindowUpdates() {
289     uint32_t transport_announce = t_->flow_control.MaybeSendUpdate(
290         t_->outbuf.c_slice_buffer()->count > 0);
291     if (transport_announce) {
292       grpc_slice_buffer_add(
293           t_->outbuf.c_slice_buffer(),
294           grpc_chttp2_window_update_create(0, transport_announce, nullptr));
295       grpc_chttp2_reset_ping_clock(t_);
296     }
297   }
298 
FlushPingAcks()299   void FlushPingAcks() {
300     if (t_->ping_ack_count == 0) return;
301     // Limit the size of writes if we include ping acks - to avoid the ack being
302     // delayed by crypto operations.
303     target_write_size_ = 0;
304     for (size_t i = 0; i < t_->ping_ack_count; i++) {
305       grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(),
306                             grpc_chttp2_ping_create(true, t_->ping_acks[i]));
307     }
308     t_->ping_ack_count = 0;
309   }
310 
EnactHpackSettings()311   void EnactHpackSettings() {
312     t_->hpack_compressor.SetMaxTableSize(
313         t_->settings.peer().header_table_size());
314   }
315 
UpdateStreamsNoLongerStalled()316   void UpdateStreamsNoLongerStalled() {
317     grpc_chttp2_stream* s;
318     while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
319       if (t_->closed_with_error.ok() &&
320           grpc_chttp2_list_add_writable_stream(t_, s)) {
321         if (!s->refcount->refs.RefIfNonZero()) {
322           grpc_chttp2_list_remove_writable_stream(t_, s);
323         }
324       }
325     }
326   }
327 
NextStream()328   grpc_chttp2_stream* NextStream() {
329     if (t_->outbuf.c_slice_buffer()->length > target_write_size_) {
330       result_.partial = true;
331       return nullptr;
332     }
333 
334     grpc_chttp2_stream* s;
335     if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
336       return nullptr;
337     }
338 
339     return s;
340   }
341 
IncInitialMetadataWrites()342   void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()343   void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()344   void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()345   void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
346 
NoteScheduledResults()347   void NoteScheduledResults() { result_.early_results_scheduled = true; }
348 
transport() const349   grpc_chttp2_transport* transport() const { return t_; }
350 
Result()351   grpc_chttp2_begin_write_result Result() {
352     result_.writing = t_->outbuf.c_slice_buffer()->count > 0;
353     return result_;
354   }
355 
target_write_size() const356   size_t target_write_size() const { return target_write_size_; }
357 
358  private:
359   grpc_chttp2_transport* const t_;
360   size_t target_write_size_ = t_->write_size_policy.WriteTargetSize();
361 
362   // stats histogram counters: we increment these throughout this function,
363   // and at the end publish to the central stats histograms
364   int flow_control_writes_ = 0;
365   int initial_metadata_writes_ = 0;
366   int trailing_metadata_writes_ = 0;
367   int message_writes_ = 0;
368   grpc_chttp2_begin_write_result result_ = {false, false, false};
369 };
370 
371 class DataSendContext {
372  public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)373   DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
374                   grpc_chttp2_stream* s)
375       : write_context_(write_context),
376         t_(t),
377         s_(s),
378         sending_bytes_before_(s_->sending_bytes) {}
379 
stream_remote_window() const380   uint32_t stream_remote_window() const {
381     return static_cast<uint32_t>(std::max(
382         int64_t{0},
383         s_->flow_control.remote_window_delta() +
384             static_cast<int64_t>(t_->settings.peer().initial_window_size())));
385   }
386 
max_outgoing() const387   uint32_t max_outgoing() const {
388     return grpc_core::Clamp<uint32_t>(
389         std::min<int64_t>(
390             {t_->settings.peer().max_frame_size(), stream_remote_window(),
391              t_->flow_control.remote_window(),
392              static_cast<int64_t>(write_context_->target_write_size())}),
393         0, std::numeric_limits<uint32_t>::max());
394   }
395 
AnyOutgoing() const396   bool AnyOutgoing() const { return max_outgoing() > 0; }
397 
FlushBytes()398   void FlushBytes() {
399     uint32_t send_bytes =
400         static_cast<uint32_t>(std::min(static_cast<size_t>(max_outgoing()),
401                                        s_->flow_controlled_buffer.length));
402     is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
403                      s_->send_trailing_metadata != nullptr &&
404                      s_->send_trailing_metadata->empty();
405     grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
406                             is_last_frame_, &s_->call_tracer_wrapper,
407                             t_->outbuf.c_slice_buffer());
408     sfc_upd_.SentData(send_bytes);
409     s_->sending_bytes += send_bytes;
410   }
411 
is_last_frame() const412   bool is_last_frame() const { return is_last_frame_; }
413 
CallCallbacks()414   void CallCallbacks() {
415     if (update_list(
416             t_, static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
417             &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
418             absl::OkStatus())) {
419       write_context_->NoteScheduledResults();
420     }
421   }
422 
423  private:
424   WriteContext* write_context_;
425   grpc_chttp2_transport* t_;
426   grpc_chttp2_stream* s_;
427   grpc_core::chttp2::StreamFlowControl::OutgoingUpdateContext sfc_upd_{
428       &s_->flow_control};
429   const size_t sending_bytes_before_;
430   bool is_last_frame_ = false;
431 };
432 
433 class StreamWriteContext {
434  public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)435   StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
436       : write_context_(write_context), t_(write_context->transport()), s_(s) {
437     GRPC_CHTTP2_IF_TRACING(INFO)
438         << "W:" << t_ << " " << (t_->is_client ? "CLIENT" : "SERVER") << "["
439         << s->id << "] im-(sent,send)=(" << s->sent_initial_metadata << ","
440         << (s->send_initial_metadata != nullptr) << ")";
441   }
442 
FlushInitialMetadata()443   void FlushInitialMetadata() {
444     // send initial metadata if it's available
445     if (s_->sent_initial_metadata) return;
446     if (s_->send_initial_metadata == nullptr) return;
447 
448     // We skip this on the server side if there is no custom initial
449     // metadata, there are no messages to send, and we are also sending
450     // trailing metadata.  This results in a Trailers-Only response,
451     // which is required for retries, as per:
452     // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
453     if (!t_->is_client && s_->flow_controlled_buffer.length == 0 &&
454         s_->send_trailing_metadata != nullptr &&
455         is_default_initial_metadata(s_->send_initial_metadata)) {
456       ConvertInitialMetadataToTrailingMetadata();
457     } else {
458       t_->hpack_compressor.EncodeHeaders(
459           grpc_core::HPackCompressor::EncodeHeaderOptions{
460               s_->id,  // stream_id
461               false,   // is_eof
462               t_->settings.peer()
463                   .allow_true_binary_metadata(),     // use_true_binary_metadata
464               t_->settings.peer().max_frame_size(),  // max_frame_size
465               &s_->call_tracer_wrapper},
466           *s_->send_initial_metadata, t_->outbuf.c_slice_buffer());
467       grpc_chttp2_reset_ping_clock(t_);
468       write_context_->IncInitialMetadataWrites();
469     }
470 
471     s_->send_initial_metadata = nullptr;
472     s_->sent_initial_metadata = true;
473     write_context_->NoteScheduledResults();
474     grpc_chttp2_complete_closure_step(t_, &s_->send_initial_metadata_finished,
475                                       absl::OkStatus(),
476                                       "send_initial_metadata_finished");
477     if (!grpc_core::IsCallTracerInTransportEnabled()) {
478       if (s_->call_tracer) {
479         grpc_core::HttpAnnotation::WriteStats write_stats;
480         write_stats.target_write_size = write_context_->target_write_size();
481         s_->call_tracer->RecordAnnotation(
482             grpc_core::HttpAnnotation(
483                 grpc_core::HttpAnnotation::Type::kHeadWritten,
484                 gpr_now(GPR_CLOCK_REALTIME))
485                 .Add(s_->t->flow_control.stats())
486                 .Add(s_->flow_control.stats())
487                 .Add(write_stats));
488       }
489     } else if (grpc_core::IsTraceRecordCallopsEnabled()) {
490       auto* call_tracer =
491           s_->arena->GetContext<grpc_core::CallTracerInterface>();
492       if (call_tracer != nullptr && call_tracer->IsSampled()) {
493         grpc_core::HttpAnnotation::WriteStats write_stats;
494         write_stats.target_write_size = write_context_->target_write_size();
495         call_tracer->RecordAnnotation(
496             grpc_core::HttpAnnotation(
497                 grpc_core::HttpAnnotation::Type::kHeadWritten,
498                 gpr_now(GPR_CLOCK_REALTIME))
499                 .Add(s_->t->flow_control.stats())
500                 .Add(s_->flow_control.stats())
501                 .Add(write_stats));
502       }
503     }
504   }
505 
FlushWindowUpdates()506   void FlushWindowUpdates() {
507     if (s_->read_closed) return;
508 
509     // send any window updates
510     const uint32_t stream_announce = s_->flow_control.MaybeSendUpdate();
511     if (stream_announce == 0) return;
512 
513     grpc_slice_buffer_add(
514         t_->outbuf.c_slice_buffer(),
515         grpc_chttp2_window_update_create(s_->id, stream_announce,
516                                          &s_->call_tracer_wrapper));
517     grpc_chttp2_reset_ping_clock(t_);
518     write_context_->IncWindowUpdateWrites();
519   }
520 
FlushData()521   void FlushData() {
522     if (!s_->sent_initial_metadata) return;
523 
524     if (s_->flow_controlled_buffer.length == 0) {
525       return;  // early out: nothing to do
526     }
527 
528     DataSendContext data_send_context(write_context_, t_, s_);
529 
530     if (!data_send_context.AnyOutgoing()) {
531       if (t_->flow_control.remote_window() <= 0) {
532         grpc_core::global_stats().IncrementHttp2TransportStalls();
533         report_stall(t_, s_, "transport");
534         grpc_chttp2_list_add_stalled_by_transport(t_, s_);
535       } else if (data_send_context.stream_remote_window() <= 0) {
536         grpc_core::global_stats().IncrementHttp2StreamStalls();
537         report_stall(t_, s_, "stream");
538         grpc_chttp2_list_add_stalled_by_stream(t_, s_);
539       }
540       return;  // early out: nothing to do
541     }
542 
543     while (s_->flow_controlled_buffer.length > 0 &&
544            data_send_context.max_outgoing() > 0) {
545       data_send_context.FlushBytes();
546     }
547     grpc_chttp2_reset_ping_clock(t_);
548     if (data_send_context.is_last_frame()) {
549       SentLastFrame();
550     }
551     data_send_context.CallCallbacks();
552     stream_became_writable_ = true;
553     if (s_->flow_controlled_buffer.length > 0) {
554       GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
555       grpc_chttp2_list_add_writable_stream(t_, s_);
556     }
557     write_context_->IncMessageWrites();
558   }
559 
FlushTrailingMetadata()560   void FlushTrailingMetadata() {
561     if (!s_->sent_initial_metadata) return;
562 
563     if (s_->send_trailing_metadata == nullptr) return;
564     if (s_->flow_controlled_buffer.length != 0) return;
565 
566     GRPC_CHTTP2_IF_TRACING(INFO) << "sending trailing_metadata";
567     if (s_->send_trailing_metadata->empty()) {
568       grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
569                               &s_->call_tracer_wrapper,
570                               t_->outbuf.c_slice_buffer());
571     } else {
572       t_->hpack_compressor.EncodeHeaders(
573           grpc_core::HPackCompressor::EncodeHeaderOptions{
574               s_->id, true, t_->settings.peer().allow_true_binary_metadata(),
575               t_->settings.peer().max_frame_size(), &s_->call_tracer_wrapper},
576           *s_->send_trailing_metadata, t_->outbuf.c_slice_buffer());
577     }
578     write_context_->IncTrailingMetadataWrites();
579     grpc_chttp2_reset_ping_clock(t_);
580     SentLastFrame();
581 
582     write_context_->NoteScheduledResults();
583     grpc_chttp2_complete_closure_step(t_, &s_->send_trailing_metadata_finished,
584                                       absl::OkStatus(),
585                                       "send_trailing_metadata_finished");
586   }
587 
stream_became_writable()588   bool stream_became_writable() { return stream_became_writable_; }
589 
590  private:
591   class TrailersOnlyMetadataEncoder {
592    public:
TrailersOnlyMetadataEncoder(grpc_metadata_batch * trailing_md)593     explicit TrailersOnlyMetadataEncoder(grpc_metadata_batch* trailing_md)
594         : trailing_md_(trailing_md) {}
595 
596     template <typename Which, typename Value>
Encode(Which which,Value value)597     void Encode(Which which, Value value) {
598       if (Which::kTransferOnTrailersOnly) {
599         trailing_md_->Set(which, value);
600       }
601     }
602 
603     template <typename Which>
Encode(Which which,const grpc_core::Slice & value)604     void Encode(Which which, const grpc_core::Slice& value) {
605       if (Which::kTransferOnTrailersOnly) {
606         trailing_md_->Set(which, value.Ref());
607       }
608     }
609 
610     // Non-grpc metadata should not be transferred.
Encode(const grpc_core::Slice &,const grpc_core::Slice &)611     void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {}
612 
613    private:
614     grpc_metadata_batch* trailing_md_;
615   };
616 
ConvertInitialMetadataToTrailingMetadata()617   void ConvertInitialMetadataToTrailingMetadata() {
618     GRPC_CHTTP2_IF_TRACING(INFO)
619         << "not sending initial_metadata (Trailers-Only)";
620     // When sending Trailers-Only, we need to move metadata from headers to
621     // trailers.
622     TrailersOnlyMetadataEncoder encoder(s_->send_trailing_metadata);
623     s_->send_initial_metadata->Encode(&encoder);
624   }
625 
SentLastFrame()626   void SentLastFrame() {
627     s_->send_trailing_metadata = nullptr;
628     if (s_->sent_trailing_metadata_op) {
629       *s_->sent_trailing_metadata_op = true;
630       s_->sent_trailing_metadata_op = nullptr;
631     }
632     s_->sent_trailing_metadata = true;
633     s_->eos_sent = true;
634 
635     if (!t_->is_client && !s_->read_closed) {
636       grpc_slice_buffer_add(
637           t_->outbuf.c_slice_buffer(),
638           grpc_chttp2_rst_stream_create(s_->id, GRPC_HTTP2_NO_ERROR,
639                                         &s_->call_tracer_wrapper));
640     }
641     grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
642                                    absl::OkStatus());
643     if (!grpc_core::IsCallTracerInTransportEnabled()) {
644       if (s_->call_tracer) {
645         s_->call_tracer->RecordAnnotation(
646             grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
647                                       gpr_now(GPR_CLOCK_REALTIME))
648                 .Add(s_->t->flow_control.stats())
649                 .Add(s_->flow_control.stats()));
650       }
651     } else if (grpc_core::IsTraceRecordCallopsEnabled()) {
652       auto* call_tracer =
653           s_->arena->GetContext<grpc_core::CallTracerInterface>();
654       if (call_tracer != nullptr && call_tracer->IsSampled()) {
655         call_tracer->RecordAnnotation(
656             grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kEnd,
657                                       gpr_now(GPR_CLOCK_REALTIME))
658                 .Add(s_->t->flow_control.stats())
659                 .Add(s_->flow_control.stats()));
660       }
661     }
662   }
663 
664   WriteContext* const write_context_;
665   grpc_chttp2_transport* const t_;
666   grpc_chttp2_stream* const s_;
667   bool stream_became_writable_ = false;
668 };
669 }  // namespace
670 
grpc_chttp2_begin_write(grpc_chttp2_transport * t)671 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
672     grpc_chttp2_transport* t) {
673   GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_begin_write");
674 
675   int64_t outbuf_relative_start_pos = 0;
676   WriteContext ctx(t);
677   ctx.FlushSettings();
678   ctx.FlushPingAcks();
679   ctx.FlushQueuedBuffers();
680   ctx.EnactHpackSettings();
681 
682   if (t->flow_control.remote_window() > 0) {
683     ctx.UpdateStreamsNoLongerStalled();
684   }
685 
686   // for each grpc_chttp2_stream that's become writable, frame it's data
687   // (according to available window sizes) and add to the output buffer
688   while (grpc_chttp2_stream* s = ctx.NextStream()) {
689     StreamWriteContext stream_ctx(&ctx, s);
690     size_t orig_len = t->outbuf.c_slice_buffer()->length;
691     int64_t num_stream_bytes = 0;
692     stream_ctx.FlushInitialMetadata();
693     stream_ctx.FlushWindowUpdates();
694     stream_ctx.FlushData();
695     stream_ctx.FlushTrailingMetadata();
696     if (t->outbuf.c_slice_buffer()->length > orig_len) {
697       // Add this stream to the list of the contexts to be traced at TCP
698       num_stream_bytes = t->outbuf.c_slice_buffer()->length - orig_len;
699       s->byte_counter += static_cast<size_t>(num_stream_bytes);
700       ++s->write_counter;
701       if (s->traced && grpc_endpoint_can_track_err(t->ep.get())) {
702         grpc_core::CopyContextFn copy_context_fn =
703             grpc_core::GrpcHttp2GetCopyContextFn();
704         if (copy_context_fn != nullptr &&
705             grpc_core::GrpcHttp2GetWriteTimestampsCallback() != nullptr) {
706           t->context_list->emplace_back(copy_context_fn(s->arena),
707                                         outbuf_relative_start_pos,
708                                         num_stream_bytes, s->byte_counter,
709                                         s->write_counter - 1, s->tcp_tracer);
710         }
711       }
712       outbuf_relative_start_pos += num_stream_bytes;
713     }
714     if (stream_ctx.stream_became_writable()) {
715       if (!grpc_chttp2_list_add_writing_stream(t, s)) {
716         // already in writing list: drop ref
717         GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
718       } else {
719         // ref will be dropped at end of write
720       }
721     } else {
722       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
723     }
724   }
725 
726   ctx.FlushWindowUpdates();
727 
728   maybe_initiate_ping(t);
729 
730   t->write_flow.Begin(GRPC_LATENT_SEE_METADATA("write"));
731 
732   return ctx.Result();
733 }
734 
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error_handle error)735 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
736   GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_end_write");
737   grpc_chttp2_stream* s;
738 
739   t->write_flow.End();
740 
741   if (t->channelz_socket != nullptr) {
742     t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
743   }
744   t->num_messages_in_next_write = 0;
745 
746   if (t->ping_callbacks.started_new_ping_without_setting_timeout() &&
747       t->keepalive_timeout != grpc_core::Duration::Infinity()) {
748     // Set ping timeout after finishing write so we don't measure our own send
749     // time.
750     const auto timeout = t->ping_timeout;
751     auto id = t->ping_callbacks.OnPingTimeout(
752         timeout, t->event_engine.get(), [t = t->Ref()] {
753           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
754           grpc_core::ExecCtx exec_ctx;
755           grpc_chttp2_ping_timeout(t);
756         });
757     if (GRPC_TRACE_FLAG_ENABLED(http2_ping) && id.has_value()) {
758       LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
759                 << "]: Set ping timeout timer of " << timeout.ToString()
760                 << " for ping id " << id.value();
761     }
762 
763     if (t->keepalive_incoming_data_wanted &&
764         t->keepalive_timeout < t->ping_timeout &&
765         t->keepalive_ping_timeout_handle !=
766             grpc_event_engine::experimental::EventEngine::TaskHandle::
767                 kInvalid) {
768       if (GRPC_TRACE_FLAG_ENABLED(http2_ping) ||
769           GRPC_TRACE_FLAG_ENABLED(http_keepalive)) {
770         LOG(INFO) << (t->is_client ? "CLIENT" : "SERVER") << "[" << t
771                   << "]: Set keepalive ping timeout timer of "
772                   << t->keepalive_timeout.ToString();
773       }
774       t->keepalive_ping_timeout_handle =
775           t->event_engine->RunAfter(t->keepalive_timeout, [t = t->Ref()] {
776             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
777             grpc_core::ExecCtx exec_ctx;
778             grpc_chttp2_keepalive_timeout(t);
779           });
780     }
781   }
782 
783   while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
784     if (s->sending_bytes != 0) {
785       update_list(t, static_cast<int64_t>(s->sending_bytes),
786                   &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
787                   error);
788       s->sending_bytes = 0;
789     }
790     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
791   }
792   grpc_slice_buffer_reset_and_unref(t->outbuf.c_slice_buffer());
793 }
794