• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
22 #include "src/core/ext/transport/chttp2/transport/context_list.h"
23 #include "src/core/ext/transport/chttp2/transport/internal.h"
24 
25 #include <limits.h>
26 
27 #include <grpc/support/log.h>
28 
29 #include "src/core/lib/compression/stream_compression.h"
30 #include "src/core/lib/debug/stats.h"
31 #include "src/core/lib/profiling/timers.h"
32 #include "src/core/lib/slice/slice_internal.h"
33 #include "src/core/lib/transport/http2_errors.h"
34 
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)35 static void add_to_write_list(grpc_chttp2_write_cb** list,
36                               grpc_chttp2_write_cb* cb) {
37   cb->next = *list;
38   *list = cb;
39 }
40 
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb * cb,grpc_error * error)41 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
42                             grpc_chttp2_write_cb* cb, grpc_error* error) {
43   grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
44                                     "finish_write_cb");
45   cb->next = t->write_cb_pool;
46   t->write_cb_pool = cb;
47 }
48 
maybe_initiate_ping(grpc_chttp2_transport * t)49 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
50   grpc_chttp2_ping_queue* pq = &t->ping_queue;
51   if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
52     /* no ping needed: wait */
53     return;
54   }
55   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
56     /* ping already in-flight: wait */
57     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
58         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
59         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
60       gpr_log(GPR_INFO, "%s: Ping delayed [%s]: already pinging",
61               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str());
62     }
63     return;
64   }
65   if (t->ping_state.pings_before_data_required == 0 &&
66       t->ping_policy.max_pings_without_data != 0) {
67     /* need to receive something of substance before sending a ping again */
68     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
69         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
70         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
71       gpr_log(GPR_INFO, "%s: Ping delayed [%s]: too many recent pings: %d/%d",
72               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
73               t->ping_state.pings_before_data_required,
74               t->ping_policy.max_pings_without_data);
75     }
76     return;
77   }
78   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
79 
80   grpc_millis next_allowed_ping_interval =
81       (t->keepalive_permit_without_calls == 0 &&
82        grpc_chttp2_stream_map_size(&t->stream_map) == 0)
83           ? 7200 * GPR_MS_PER_SEC
84           : (GPR_MS_PER_SEC); /* A second is added to deal with network delays
85                                  and timing imprecision */
86   grpc_millis next_allowed_ping =
87       t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
88 
89   if (next_allowed_ping > now) {
90     /* not enough elapsed time between successive pings */
91     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
92         GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
93         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
94       gpr_log(GPR_INFO,
95               "%s: Ping delayed [%s]: not enough time elapsed since last ping. "
96               " Last ping %f: Next ping %f: Now %f",
97               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
98               static_cast<double>(t->ping_state.last_ping_sent_time),
99               static_cast<double>(next_allowed_ping), static_cast<double>(now));
100     }
101     if (!t->ping_state.is_delayed_ping_timer_set) {
102       t->ping_state.is_delayed_ping_timer_set = true;
103       GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
104       GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
105                         grpc_chttp2_retry_initiate_ping, t,
106                         grpc_schedule_on_exec_ctx);
107       grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
108                       &t->retry_initiate_ping_locked);
109     }
110     return;
111   }
112 
113   pq->inflight_id = t->ping_ctr;
114   t->ping_ctr++;
115   grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
116                               &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
117   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
118                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
119   grpc_slice_buffer_add(&t->outbuf,
120                         grpc_chttp2_ping_create(false, pq->inflight_id));
121   GRPC_STATS_INC_HTTP2_PINGS_SENT();
122   t->ping_state.last_ping_sent_time = now;
123   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
124       GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
125       GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
126     gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
127             t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
128             t->ping_state.pings_before_data_required,
129             t->ping_policy.max_pings_without_data);
130   }
131   t->ping_state.pings_before_data_required -=
132       (t->ping_state.pings_before_data_required != 0);
133 }
134 
update_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error * error)135 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
136                         int64_t send_bytes, grpc_chttp2_write_cb** list,
137                         int64_t* ctr, grpc_error* error) {
138   bool sched_any = false;
139   grpc_chttp2_write_cb* cb = *list;
140   *list = nullptr;
141   *ctr += send_bytes;
142   while (cb) {
143     grpc_chttp2_write_cb* next = cb->next;
144     if (cb->call_at_byte <= *ctr) {
145       sched_any = true;
146       finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
147     } else {
148       add_to_write_list(list, cb);
149     }
150     cb = next;
151   }
152   GRPC_ERROR_UNREF(error);
153   return sched_any;
154 }
155 
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)156 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
157                          const char* staller) {
158   if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
159     gpr_log(
160         GPR_DEBUG,
161         "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
162         "to happen in a healthy program that is not seeing flow control stalls."
163         " However, if you know that there are unwanted stalls, here is some "
164         "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
165         ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
166         ":s_win=%d:s_delta=%" PRId64 "]",
167         t->peer_string.c_str(), t, s->id, staller,
168         s->flow_controlled_buffer.length,
169         s->stream_compression_method ==
170                 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
171             ? 0
172             : s->compressed_data_buffer.length,
173         s->flow_controlled_bytes_flowed,
174         t->settings[GRPC_ACKED_SETTINGS]
175                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
176         t->flow_control->remote_window(),
177         static_cast<uint32_t> GPR_MAX(
178             0,
179             s->flow_control->remote_window_delta() +
180                 (int64_t)t->settings[GRPC_PEER_SETTINGS]
181                                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
182         s->flow_control->remote_window_delta());
183   }
184 }
185 
186 /* How many bytes would we like to put on the wire during a single syscall */
target_write_size(grpc_chttp2_transport *)187 static uint32_t target_write_size(grpc_chttp2_transport* /*t*/) {
188   return 1024 * 1024;
189 }
190 
191 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)192 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
193   return initial_metadata->list.default_count == initial_metadata->list.count;
194 }
195 
196 namespace {
197 class StreamWriteContext;
198 
199 class WriteContext {
200  public:
WriteContext(grpc_chttp2_transport * t)201   explicit WriteContext(grpc_chttp2_transport* t) : t_(t) {
202     GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
203     GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
204   }
205 
206   // TODO(ctiller): make this the destructor
FlushStats()207   void FlushStats() {
208     GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
209         initial_metadata_writes_);
210     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
211     GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
212         trailing_metadata_writes_);
213     GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
214   }
215 
FlushSettings()216   void FlushSettings() {
217     if (t_->dirtied_local_settings && !t_->sent_local_settings) {
218       grpc_slice_buffer_add(
219           &t_->outbuf, grpc_chttp2_settings_create(
220                            t_->settings[GRPC_SENT_SETTINGS],
221                            t_->settings[GRPC_LOCAL_SETTINGS],
222                            t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
223       t_->force_send_settings = false;
224       t_->dirtied_local_settings = false;
225       t_->sent_local_settings = true;
226       GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
227     }
228   }
229 
FlushQueuedBuffers()230   void FlushQueuedBuffers() {
231     /* simple writes are queued to qbuf, and flushed here */
232     grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
233     t_->num_pending_induced_frames = 0;
234     GPR_ASSERT(t_->qbuf.count == 0);
235   }
236 
FlushWindowUpdates()237   void FlushWindowUpdates() {
238     uint32_t transport_announce =
239         t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
240     if (transport_announce) {
241       grpc_transport_one_way_stats throwaway_stats;
242       grpc_slice_buffer_add(
243           &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
244                                                         &throwaway_stats));
245       grpc_chttp2_reset_ping_clock(t_);
246     }
247   }
248 
FlushPingAcks()249   void FlushPingAcks() {
250     for (size_t i = 0; i < t_->ping_ack_count; i++) {
251       grpc_slice_buffer_add(&t_->outbuf,
252                             grpc_chttp2_ping_create(true, t_->ping_acks[i]));
253     }
254     t_->ping_ack_count = 0;
255   }
256 
EnactHpackSettings()257   void EnactHpackSettings() {
258     grpc_chttp2_hpack_compressor_set_max_table_size(
259         &t_->hpack_compressor,
260         t_->settings[GRPC_PEER_SETTINGS]
261                     [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
262   }
263 
UpdateStreamsNoLongerStalled()264   void UpdateStreamsNoLongerStalled() {
265     grpc_chttp2_stream* s;
266     while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
267       if (t_->closed_with_error == GRPC_ERROR_NONE &&
268           grpc_chttp2_list_add_writable_stream(t_, s)) {
269         if (!s->refcount->refs.RefIfNonZero()) {
270           grpc_chttp2_list_remove_writable_stream(t_, s);
271         }
272       }
273     }
274   }
275 
NextStream()276   grpc_chttp2_stream* NextStream() {
277     if (t_->outbuf.length > target_write_size(t_)) {
278       result_.partial = true;
279       return nullptr;
280     }
281 
282     grpc_chttp2_stream* s;
283     if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
284       return nullptr;
285     }
286 
287     return s;
288   }
289 
IncInitialMetadataWrites()290   void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()291   void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()292   void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()293   void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
294 
NoteScheduledResults()295   void NoteScheduledResults() { result_.early_results_scheduled = true; }
296 
transport() const297   grpc_chttp2_transport* transport() const { return t_; }
298 
Result()299   grpc_chttp2_begin_write_result Result() {
300     result_.writing = t_->outbuf.count > 0;
301     return result_;
302   }
303 
304  private:
305   grpc_chttp2_transport* const t_;
306 
307   /* stats histogram counters: we increment these throughout this function,
308      and at the end publish to the central stats histograms */
309   int flow_control_writes_ = 0;
310   int initial_metadata_writes_ = 0;
311   int trailing_metadata_writes_ = 0;
312   int message_writes_ = 0;
313   grpc_chttp2_begin_write_result result_ = {false, false, false};
314 };
315 
316 class DataSendContext {
317  public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)318   DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
319                   grpc_chttp2_stream* s)
320       : write_context_(write_context),
321         t_(t),
322         s_(s),
323         sending_bytes_before_(s_->sending_bytes) {}
324 
stream_remote_window() const325   uint32_t stream_remote_window() const {
326     return static_cast<uint32_t> GPR_MAX(
327         0, s_->flow_control->remote_window_delta() +
328                (int64_t)t_->settings[GRPC_PEER_SETTINGS]
329                                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
330   }
331 
max_outgoing() const332   uint32_t max_outgoing() const {
333     return static_cast<uint32_t> GPR_MIN(
334         t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
335         GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
336   }
337 
AnyOutgoing() const338   bool AnyOutgoing() const { return max_outgoing() > 0; }
339 
FlushUncompressedBytes()340   void FlushUncompressedBytes() {
341     uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
342         max_outgoing(), s_->flow_controlled_buffer.length);
343     is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
344                      s_->fetching_send_message == nullptr &&
345                      s_->send_trailing_metadata != nullptr &&
346                      grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
347     grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
348                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
349     s_->flow_control->SentData(send_bytes);
350     s_->sending_bytes += send_bytes;
351   }
352 
FlushCompressedBytes()353   void FlushCompressedBytes() {
354     GPR_DEBUG_ASSERT(s_->stream_compression_method !=
355                      GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
356 
357     uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
358         max_outgoing(), s_->compressed_data_buffer.length);
359     bool is_last_data_frame =
360         (send_bytes == s_->compressed_data_buffer.length &&
361          s_->flow_controlled_buffer.length == 0 &&
362          s_->fetching_send_message == nullptr);
363     if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
364         s_->stream_compression_ctx != nullptr) {
365       if (GPR_UNLIKELY(!grpc_stream_compress(
366               s_->stream_compression_ctx, &s_->flow_controlled_buffer,
367               &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
368               GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
369         gpr_log(GPR_ERROR, "Stream compression failed.");
370       }
371       grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
372       s_->stream_compression_ctx = nullptr;
373       /* After finish, bytes in s->compressed_data_buffer may be
374        * more than max_outgoing. Start another round of the current
375        * while loop so that send_bytes and is_last_data_frame are
376        * recalculated. */
377       return;
378     }
379     is_last_frame_ = is_last_data_frame &&
380                      s_->send_trailing_metadata != nullptr &&
381                      grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
382     grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
383                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
384     s_->flow_control->SentData(send_bytes);
385     if (s_->compressed_data_buffer.length == 0) {
386       s_->sending_bytes += s_->uncompressed_data_size;
387     }
388   }
389 
CompressMoreBytes()390   void CompressMoreBytes() {
391     GPR_DEBUG_ASSERT(s_->stream_compression_method !=
392                      GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
393 
394     if (s_->stream_compression_ctx == nullptr) {
395       s_->stream_compression_ctx =
396           grpc_stream_compression_context_create(s_->stream_compression_method);
397     }
398     s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
399     if (GPR_UNLIKELY(!grpc_stream_compress(
400             s_->stream_compression_ctx, &s_->flow_controlled_buffer,
401             &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
402             GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
403       gpr_log(GPR_ERROR, "Stream compression failed.");
404     }
405   }
406 
is_last_frame() const407   bool is_last_frame() const { return is_last_frame_; }
408 
CallCallbacks()409   void CallCallbacks() {
410     if (update_list(
411             t_, s_,
412             static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
413             &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
414             GRPC_ERROR_NONE)) {
415       write_context_->NoteScheduledResults();
416     }
417   }
418 
419  private:
420   WriteContext* write_context_;
421   grpc_chttp2_transport* t_;
422   grpc_chttp2_stream* s_;
423   const size_t sending_bytes_before_;
424   bool is_last_frame_ = false;
425 };
426 
427 class StreamWriteContext {
428  public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)429   StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
430       : write_context_(write_context), t_(write_context->transport()), s_(s) {
431     GRPC_CHTTP2_IF_TRACING(
432         gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
433                 t_->is_client ? "CLIENT" : "SERVER", s->id,
434                 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
435                 (int)(s->flow_control->local_window_delta() -
436                       s->flow_control->announced_window_delta())));
437   }
438 
FlushInitialMetadata()439   void FlushInitialMetadata() {
440     /* send initial metadata if it's available */
441     if (s_->sent_initial_metadata) return;
442     if (s_->send_initial_metadata == nullptr) return;
443 
444     // We skip this on the server side if there is no custom initial
445     // metadata, there are no messages to send, and we are also sending
446     // trailing metadata.  This results in a Trailers-Only response,
447     // which is required for retries, as per:
448     // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
449     if (!t_->is_client && s_->fetching_send_message == nullptr &&
450         s_->flow_controlled_buffer.length == 0 &&
451         compressed_data_buffer_len() == 0 &&
452         s_->send_trailing_metadata != nullptr &&
453         is_default_initial_metadata(s_->send_initial_metadata)) {
454       ConvertInitialMetadataToTrailingMetadata();
455     } else {
456       grpc_encode_header_options hopt = {
457           s_->id,  // stream_id
458           false,   // is_eof
459           t_->settings[GRPC_PEER_SETTINGS]
460                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
461               0,  // use_true_binary_metadata
462           t_->settings[GRPC_PEER_SETTINGS]
463                       [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],  // max_frame_size
464           &s_->stats.outgoing                                 // stats
465       };
466       grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
467                                 s_->send_initial_metadata, &hopt, &t_->outbuf);
468       grpc_chttp2_reset_ping_clock(t_);
469       write_context_->IncInitialMetadataWrites();
470     }
471 
472     s_->send_initial_metadata = nullptr;
473     s_->sent_initial_metadata = true;
474     write_context_->NoteScheduledResults();
475     grpc_chttp2_complete_closure_step(
476         t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
477         "send_initial_metadata_finished");
478   }
479 
compressed_data_buffer_len()480   size_t compressed_data_buffer_len() {
481     return s_->stream_compression_method ==
482                    GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
483                ? 0
484                : s_->compressed_data_buffer.length;
485   }
486 
FlushWindowUpdates()487   void FlushWindowUpdates() {
488     /* send any window updates */
489     const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
490     if (stream_announce == 0) return;
491 
492     grpc_slice_buffer_add(
493         &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
494                                                       &s_->stats.outgoing));
495     grpc_chttp2_reset_ping_clock(t_);
496     write_context_->IncWindowUpdateWrites();
497   }
498 
FlushData()499   void FlushData() {
500     if (!s_->sent_initial_metadata) return;
501 
502     if (s_->flow_controlled_buffer.length == 0 &&
503         compressed_data_buffer_len() == 0) {
504       return;  // early out: nothing to do
505     }
506 
507     DataSendContext data_send_context(write_context_, t_, s_);
508 
509     if (!data_send_context.AnyOutgoing()) {
510       if (t_->flow_control->remote_window() <= 0) {
511         report_stall(t_, s_, "transport");
512         grpc_chttp2_list_add_stalled_by_transport(t_, s_);
513       } else if (data_send_context.stream_remote_window() <= 0) {
514         report_stall(t_, s_, "stream");
515         grpc_chttp2_list_add_stalled_by_stream(t_, s_);
516       }
517       return;  // early out: nothing to do
518     }
519 
520     if (s_->stream_compression_method ==
521         GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
522       while (s_->flow_controlled_buffer.length > 0 &&
523              data_send_context.max_outgoing() > 0) {
524         data_send_context.FlushUncompressedBytes();
525       }
526     } else {
527       while ((s_->flow_controlled_buffer.length > 0 ||
528               s_->compressed_data_buffer.length > 0) &&
529              data_send_context.max_outgoing() > 0) {
530         if (s_->compressed_data_buffer.length > 0) {
531           data_send_context.FlushCompressedBytes();
532         } else {
533           data_send_context.CompressMoreBytes();
534         }
535       }
536     }
537     grpc_chttp2_reset_ping_clock(t_);
538     if (data_send_context.is_last_frame()) {
539       SentLastFrame();
540     }
541     data_send_context.CallCallbacks();
542     stream_became_writable_ = true;
543     if (s_->flow_controlled_buffer.length > 0 ||
544         compressed_data_buffer_len() > 0) {
545       GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
546       grpc_chttp2_list_add_writable_stream(t_, s_);
547     }
548     write_context_->IncMessageWrites();
549   }
550 
FlushTrailingMetadata()551   void FlushTrailingMetadata() {
552     if (!s_->sent_initial_metadata) return;
553 
554     if (s_->send_trailing_metadata == nullptr) return;
555     if (s_->fetching_send_message != nullptr) return;
556     if (s_->flow_controlled_buffer.length != 0) return;
557     if (compressed_data_buffer_len() != 0) return;
558 
559     GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
560     if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {
561       grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
562                               &s_->stats.outgoing, &t_->outbuf);
563     } else {
564       grpc_encode_header_options hopt = {
565           s_->id, true,
566           t_->settings[GRPC_PEER_SETTINGS]
567                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
568               0,
569 
570           t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
571           &s_->stats.outgoing};
572       grpc_chttp2_encode_header(&t_->hpack_compressor,
573                                 extra_headers_for_trailing_metadata_,
574                                 num_extra_headers_for_trailing_metadata_,
575                                 s_->send_trailing_metadata, &hopt, &t_->outbuf);
576     }
577     write_context_->IncTrailingMetadataWrites();
578     grpc_chttp2_reset_ping_clock(t_);
579     SentLastFrame();
580 
581     write_context_->NoteScheduledResults();
582     grpc_chttp2_complete_closure_step(
583         t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
584         "send_trailing_metadata_finished");
585   }
586 
stream_became_writable()587   bool stream_became_writable() { return stream_became_writable_; }
588 
589  private:
ConvertInitialMetadataToTrailingMetadata()590   void ConvertInitialMetadataToTrailingMetadata() {
591     GRPC_CHTTP2_IF_TRACING(
592         gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
593     // When sending Trailers-Only, we need to move the :status and
594     // content-type headers to the trailers.
595     if (s_->send_initial_metadata->idx.named.status != nullptr) {
596       extra_headers_for_trailing_metadata_
597           [num_extra_headers_for_trailing_metadata_++] =
598               &s_->send_initial_metadata->idx.named.status->md;
599     }
600     if (s_->send_initial_metadata->idx.named.content_type != nullptr) {
601       extra_headers_for_trailing_metadata_
602           [num_extra_headers_for_trailing_metadata_++] =
603               &s_->send_initial_metadata->idx.named.content_type->md;
604     }
605   }
606 
SentLastFrame()607   void SentLastFrame() {
608     s_->send_trailing_metadata = nullptr;
609     if (s_->sent_trailing_metadata_op) {
610       *s_->sent_trailing_metadata_op = true;
611       s_->sent_trailing_metadata_op = nullptr;
612     }
613     s_->sent_trailing_metadata = true;
614     s_->eos_sent = true;
615 
616     if (!t_->is_client && !s_->read_closed) {
617       grpc_slice_buffer_add(
618           &t_->outbuf, grpc_chttp2_rst_stream_create(
619                            s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
620     }
621     grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
622                                    GRPC_ERROR_NONE);
623   }
624 
625   WriteContext* const write_context_;
626   grpc_chttp2_transport* const t_;
627   grpc_chttp2_stream* const s_;
628   bool stream_became_writable_ = false;
629   grpc_mdelem* extra_headers_for_trailing_metadata_[2];
630   size_t num_extra_headers_for_trailing_metadata_ = 0;
631 };
632 }  // namespace
633 
grpc_chttp2_begin_write(grpc_chttp2_transport * t)634 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
635     grpc_chttp2_transport* t) {
636   WriteContext ctx(t);
637   ctx.FlushSettings();
638   ctx.FlushPingAcks();
639   ctx.FlushQueuedBuffers();
640   ctx.EnactHpackSettings();
641 
642   if (t->flow_control->remote_window() > 0) {
643     ctx.UpdateStreamsNoLongerStalled();
644   }
645 
646   /* for each grpc_chttp2_stream that's become writable, frame it's data
647      (according to available window sizes) and add to the output buffer */
648   while (grpc_chttp2_stream* s = ctx.NextStream()) {
649     StreamWriteContext stream_ctx(&ctx, s);
650     size_t orig_len = t->outbuf.length;
651     stream_ctx.FlushInitialMetadata();
652     stream_ctx.FlushWindowUpdates();
653     stream_ctx.FlushData();
654     stream_ctx.FlushTrailingMetadata();
655     if (t->outbuf.length > orig_len) {
656       /* Add this stream to the list of the contexts to be traced at TCP */
657       s->byte_counter += t->outbuf.length - orig_len;
658       if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
659         grpc_core::ContextList::Append(&t->cl, s);
660       }
661     }
662     if (stream_ctx.stream_became_writable()) {
663       if (!grpc_chttp2_list_add_writing_stream(t, s)) {
664         /* already in writing list: drop ref */
665         GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
666       } else {
667         /* ref will be dropped at end of write */
668       }
669     } else {
670       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
671     }
672   }
673 
674   ctx.FlushWindowUpdates();
675 
676   maybe_initiate_ping(t);
677 
678   return ctx.Result();
679 }
680 
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error * error)681 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
682   GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
683   grpc_chttp2_stream* s;
684 
685   if (t->channelz_socket != nullptr) {
686     t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
687   }
688   t->num_messages_in_next_write = 0;
689 
690   while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
691     if (s->sending_bytes != 0) {
692       update_list(t, s, static_cast<int64_t>(s->sending_bytes),
693                   &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
694                   GRPC_ERROR_REF(error));
695       s->sending_bytes = 0;
696     }
697     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
698   }
699   grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
700   GRPC_ERROR_UNREF(error);
701 }
702