• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/internal.h"
22 
23 #include <limits.h>
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/debug/stats.h"
28 #include "src/core/lib/profiling/timers.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "src/core/lib/transport/http2_errors.h"
31 
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)32 static void add_to_write_list(grpc_chttp2_write_cb** list,
33                               grpc_chttp2_write_cb* cb) {
34   cb->next = *list;
35   *list = cb;
36 }
37 
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb * cb,grpc_error * error)38 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
39                             grpc_chttp2_write_cb* cb, grpc_error* error) {
40   grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
41                                     "finish_write_cb");
42   cb->next = t->write_cb_pool;
43   t->write_cb_pool = cb;
44 }
45 
maybe_initiate_ping(grpc_chttp2_transport * t)46 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
47   grpc_chttp2_ping_queue* pq = &t->ping_queue;
48   if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
49     /* no ping needed: wait */
50     return;
51   }
52   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
53     /* ping already in-flight: wait */
54     if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
55       gpr_log(GPR_INFO, "%s: Ping delayed [%p]: already pinging",
56               t->is_client ? "CLIENT" : "SERVER", t->peer_string);
57     }
58     return;
59   }
60   if (t->ping_state.pings_before_data_required == 0 &&
61       t->ping_policy.max_pings_without_data != 0) {
62     /* need to receive something of substance before sending a ping again */
63     if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
64       gpr_log(GPR_INFO, "%s: Ping delayed [%p]: too many recent pings: %d/%d",
65               t->is_client ? "CLIENT" : "SERVER", t->peer_string,
66               t->ping_state.pings_before_data_required,
67               t->ping_policy.max_pings_without_data);
68     }
69     return;
70   }
71   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
72 
73   grpc_millis next_allowed_ping_interval =
74       (t->keepalive_permit_without_calls == 0 &&
75        grpc_chttp2_stream_map_size(&t->stream_map) == 0)
76           ? 7200 * GPR_MS_PER_SEC
77           : t->ping_policy.min_sent_ping_interval_without_data;
78   grpc_millis next_allowed_ping =
79       t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
80 
81   if (next_allowed_ping > now) {
82     /* not enough elapsed time between successive pings */
83     if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
84       gpr_log(GPR_INFO,
85               "%s: Ping delayed [%p]: not enough time elapsed since last ping. "
86               " Last ping %f: Next ping %f: Now %f",
87               t->is_client ? "CLIENT" : "SERVER", t->peer_string,
88               static_cast<double>(t->ping_state.last_ping_sent_time),
89               static_cast<double>(next_allowed_ping), static_cast<double>(now));
90     }
91     if (!t->ping_state.is_delayed_ping_timer_set) {
92       t->ping_state.is_delayed_ping_timer_set = true;
93       GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
94       grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
95                       &t->retry_initiate_ping_locked);
96     }
97     return;
98   }
99 
100   pq->inflight_id = t->ping_ctr;
101   t->ping_ctr++;
102   GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
103   grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
104                          &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
105   grpc_slice_buffer_add(&t->outbuf,
106                         grpc_chttp2_ping_create(false, pq->inflight_id));
107   GRPC_STATS_INC_HTTP2_PINGS_SENT();
108   t->ping_state.last_ping_sent_time = now;
109   if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
110     gpr_log(GPR_INFO, "%s: Ping sent [%p]: %d/%d",
111             t->is_client ? "CLIENT" : "SERVER", t->peer_string,
112             t->ping_state.pings_before_data_required,
113             t->ping_policy.max_pings_without_data);
114   }
115   t->ping_state.pings_before_data_required -=
116       (t->ping_state.pings_before_data_required != 0);
117 }
118 
update_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error * error)119 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
120                         int64_t send_bytes, grpc_chttp2_write_cb** list,
121                         int64_t* ctr, grpc_error* error) {
122   bool sched_any = false;
123   grpc_chttp2_write_cb* cb = *list;
124   *list = nullptr;
125   *ctr += send_bytes;
126   while (cb) {
127     grpc_chttp2_write_cb* next = cb->next;
128     if (cb->call_at_byte <= *ctr) {
129       sched_any = true;
130       finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
131     } else {
132       add_to_write_list(list, cb);
133     }
134     cb = next;
135   }
136   GRPC_ERROR_UNREF(error);
137   return sched_any;
138 }
139 
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)140 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
141                          const char* staller) {
142   if (grpc_flowctl_trace.enabled()) {
143     gpr_log(
144         GPR_DEBUG,
145         "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
146         "to happen in a healthy program that is not seeing flow control stalls."
147         " However, if you know that there are unwanted stalls, here is some "
148         "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
149         ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
150         ":s_win=%d:s_delta=%" PRId64 "]",
151         t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length,
152         s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed,
153         t->settings[GRPC_ACKED_SETTINGS]
154                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
155         t->flow_control->remote_window(),
156         static_cast<uint32_t> GPR_MAX(
157             0,
158             s->flow_control->remote_window_delta() +
159                 (int64_t)t->settings[GRPC_PEER_SETTINGS]
160                                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
161         s->flow_control->remote_window_delta());
162   }
163 }
164 
stream_ref_if_not_destroyed(gpr_refcount * r)165 static bool stream_ref_if_not_destroyed(gpr_refcount* r) {
166   gpr_atm count;
167   do {
168     count = gpr_atm_acq_load(&r->count);
169     if (count == 0) return false;
170   } while (!gpr_atm_rel_cas(&r->count, count, count + 1));
171   return true;
172 }
173 
174 /* How many bytes would we like to put on the wire during a single syscall */
target_write_size(grpc_chttp2_transport * t)175 static uint32_t target_write_size(grpc_chttp2_transport* t) {
176   return 1024 * 1024;
177 }
178 
179 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)180 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
181   return initial_metadata->list.default_count == initial_metadata->list.count;
182 }
183 
184 namespace {
185 class StreamWriteContext;
186 
187 class WriteContext {
188  public:
WriteContext(grpc_chttp2_transport * t)189   WriteContext(grpc_chttp2_transport* t) : t_(t) {
190     GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
191     GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
192   }
193 
194   // TODO(ctiller): make this the destructor
FlushStats()195   void FlushStats() {
196     GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
197         initial_metadata_writes_);
198     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
199     GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
200         trailing_metadata_writes_);
201     GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
202   }
203 
FlushSettings()204   void FlushSettings() {
205     if (t_->dirtied_local_settings && !t_->sent_local_settings) {
206       grpc_slice_buffer_add(
207           &t_->outbuf, grpc_chttp2_settings_create(
208                            t_->settings[GRPC_SENT_SETTINGS],
209                            t_->settings[GRPC_LOCAL_SETTINGS],
210                            t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
211       t_->force_send_settings = false;
212       t_->dirtied_local_settings = false;
213       t_->sent_local_settings = true;
214       GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
215     }
216   }
217 
FlushQueuedBuffers()218   void FlushQueuedBuffers() {
219     /* simple writes are queued to qbuf, and flushed here */
220     grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
221     GPR_ASSERT(t_->qbuf.count == 0);
222   }
223 
FlushWindowUpdates()224   void FlushWindowUpdates() {
225     uint32_t transport_announce =
226         t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
227     if (transport_announce) {
228       grpc_transport_one_way_stats throwaway_stats;
229       grpc_slice_buffer_add(
230           &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
231                                                         &throwaway_stats));
232       ResetPingClock();
233     }
234   }
235 
FlushPingAcks()236   void FlushPingAcks() {
237     for (size_t i = 0; i < t_->ping_ack_count; i++) {
238       grpc_slice_buffer_add(&t_->outbuf,
239                             grpc_chttp2_ping_create(true, t_->ping_acks[i]));
240     }
241     t_->ping_ack_count = 0;
242   }
243 
EnactHpackSettings()244   void EnactHpackSettings() {
245     grpc_chttp2_hpack_compressor_set_max_table_size(
246         &t_->hpack_compressor,
247         t_->settings[GRPC_PEER_SETTINGS]
248                     [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
249   }
250 
UpdateStreamsNoLongerStalled()251   void UpdateStreamsNoLongerStalled() {
252     grpc_chttp2_stream* s;
253     while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
254       if (t_->closed_with_error == GRPC_ERROR_NONE &&
255           grpc_chttp2_list_add_writable_stream(t_, s)) {
256         if (!stream_ref_if_not_destroyed(&s->refcount->refs)) {
257           grpc_chttp2_list_remove_writable_stream(t_, s);
258         }
259       }
260     }
261   }
262 
NextStream()263   grpc_chttp2_stream* NextStream() {
264     if (t_->outbuf.length > target_write_size(t_)) {
265       result_.partial = true;
266       return nullptr;
267     }
268 
269     grpc_chttp2_stream* s;
270     if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
271       return nullptr;
272     }
273 
274     return s;
275   }
276 
ResetPingClock()277   void ResetPingClock() {
278     if (!t_->is_client) {
279       t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
280       t_->ping_recv_state.ping_strikes = 0;
281     }
282     t_->ping_state.pings_before_data_required =
283         t_->ping_policy.max_pings_without_data;
284   }
285 
IncInitialMetadataWrites()286   void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()287   void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()288   void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()289   void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
290 
NoteScheduledResults()291   void NoteScheduledResults() { result_.early_results_scheduled = true; }
292 
transport() const293   grpc_chttp2_transport* transport() const { return t_; }
294 
Result()295   grpc_chttp2_begin_write_result Result() {
296     result_.writing = t_->outbuf.count > 0;
297     return result_;
298   }
299 
300  private:
301   grpc_chttp2_transport* const t_;
302 
303   /* stats histogram counters: we increment these throughout this function,
304      and at the end publish to the central stats histograms */
305   int flow_control_writes_ = 0;
306   int initial_metadata_writes_ = 0;
307   int trailing_metadata_writes_ = 0;
308   int message_writes_ = 0;
309   grpc_chttp2_begin_write_result result_ = {false, false, false};
310 };
311 
312 class DataSendContext {
313  public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)314   DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
315                   grpc_chttp2_stream* s)
316       : write_context_(write_context),
317         t_(t),
318         s_(s),
319         sending_bytes_before_(s_->sending_bytes) {}
320 
stream_remote_window() const321   uint32_t stream_remote_window() const {
322     return static_cast<uint32_t> GPR_MAX(
323         0, s_->flow_control->remote_window_delta() +
324                (int64_t)t_->settings[GRPC_PEER_SETTINGS]
325                                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
326   }
327 
max_outgoing() const328   uint32_t max_outgoing() const {
329     return static_cast<uint32_t> GPR_MIN(
330         t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
331         GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
332   }
333 
AnyOutgoing() const334   bool AnyOutgoing() const { return max_outgoing() > 0; }
335 
FlushCompressedBytes()336   void FlushCompressedBytes() {
337     uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
338         max_outgoing(), s_->compressed_data_buffer.length);
339     bool is_last_data_frame =
340         (send_bytes == s_->compressed_data_buffer.length &&
341          s_->flow_controlled_buffer.length == 0 &&
342          s_->fetching_send_message == nullptr);
343     if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
344         s_->stream_compression_ctx != nullptr) {
345       if (GPR_UNLIKELY(!grpc_stream_compress(
346               s_->stream_compression_ctx, &s_->flow_controlled_buffer,
347               &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
348               GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
349         gpr_log(GPR_ERROR, "Stream compression failed.");
350       }
351       grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
352       s_->stream_compression_ctx = nullptr;
353       /* After finish, bytes in s->compressed_data_buffer may be
354        * more than max_outgoing. Start another round of the current
355        * while loop so that send_bytes and is_last_data_frame are
356        * recalculated. */
357       return;
358     }
359     is_last_frame_ = is_last_data_frame &&
360                      s_->send_trailing_metadata != nullptr &&
361                      grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
362     grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
363                             is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
364     s_->flow_control->SentData(send_bytes);
365     if (s_->compressed_data_buffer.length == 0) {
366       s_->sending_bytes += s_->uncompressed_data_size;
367     }
368   }
369 
CompressMoreBytes()370   void CompressMoreBytes() {
371     if (s_->stream_compression_ctx == nullptr) {
372       s_->stream_compression_ctx =
373           grpc_stream_compression_context_create(s_->stream_compression_method);
374     }
375     s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
376     if (GPR_UNLIKELY(!grpc_stream_compress(
377             s_->stream_compression_ctx, &s_->flow_controlled_buffer,
378             &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
379             GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
380       gpr_log(GPR_ERROR, "Stream compression failed.");
381     }
382   }
383 
is_last_frame() const384   bool is_last_frame() const { return is_last_frame_; }
385 
CallCallbacks()386   void CallCallbacks() {
387     if (update_list(
388             t_, s_,
389             static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
390             &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
391             GRPC_ERROR_NONE)) {
392       write_context_->NoteScheduledResults();
393     }
394   }
395 
396  private:
397   WriteContext* write_context_;
398   grpc_chttp2_transport* t_;
399   grpc_chttp2_stream* s_;
400   const size_t sending_bytes_before_;
401   bool is_last_frame_ = false;
402 };
403 
404 class StreamWriteContext {
405  public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)406   StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
407       : write_context_(write_context), t_(write_context->transport()), s_(s) {
408     GRPC_CHTTP2_IF_TRACING(
409         gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
410                 t_->is_client ? "CLIENT" : "SERVER", s->id,
411                 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
412                 (int)(s->flow_control->local_window_delta() -
413                       s->flow_control->announced_window_delta())));
414   }
415 
FlushInitialMetadata()416   void FlushInitialMetadata() {
417     /* send initial metadata if it's available */
418     if (s_->sent_initial_metadata) return;
419     if (s_->send_initial_metadata == nullptr) return;
420 
421     // We skip this on the server side if there is no custom initial
422     // metadata, there are no messages to send, and we are also sending
423     // trailing metadata.  This results in a Trailers-Only response,
424     // which is required for retries, as per:
425     // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
426     if (!t_->is_client && s_->fetching_send_message == nullptr &&
427         s_->flow_controlled_buffer.length == 0 &&
428         s_->compressed_data_buffer.length == 0 &&
429         s_->send_trailing_metadata != nullptr &&
430         is_default_initial_metadata(s_->send_initial_metadata)) {
431       ConvertInitialMetadataToTrailingMetadata();
432     } else {
433       grpc_encode_header_options hopt = {
434           s_->id,  // stream_id
435           false,   // is_eof
436           t_->settings[GRPC_PEER_SETTINGS]
437                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
438               0,  // use_true_binary_metadata
439           t_->settings[GRPC_PEER_SETTINGS]
440                       [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],  // max_frame_size
441           &s_->stats.outgoing                                 // stats
442       };
443       grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
444                                 s_->send_initial_metadata, &hopt, &t_->outbuf);
445       write_context_->ResetPingClock();
446       write_context_->IncInitialMetadataWrites();
447     }
448 
449     s_->send_initial_metadata = nullptr;
450     s_->sent_initial_metadata = true;
451     write_context_->NoteScheduledResults();
452     grpc_chttp2_complete_closure_step(
453         t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
454         "send_initial_metadata_finished");
455   }
456 
FlushWindowUpdates()457   void FlushWindowUpdates() {
458     /* send any window updates */
459     const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
460     if (stream_announce == 0) return;
461 
462     grpc_slice_buffer_add(
463         &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
464                                                       &s_->stats.outgoing));
465     write_context_->ResetPingClock();
466     write_context_->IncWindowUpdateWrites();
467   }
468 
FlushData()469   void FlushData() {
470     if (!s_->sent_initial_metadata) return;
471 
472     if (s_->flow_controlled_buffer.length == 0 &&
473         s_->compressed_data_buffer.length == 0) {
474       return;  // early out: nothing to do
475     }
476 
477     DataSendContext data_send_context(write_context_, t_, s_);
478 
479     if (!data_send_context.AnyOutgoing()) {
480       if (t_->flow_control->remote_window() <= 0) {
481         report_stall(t_, s_, "transport");
482         grpc_chttp2_list_add_stalled_by_transport(t_, s_);
483       } else if (data_send_context.stream_remote_window() <= 0) {
484         report_stall(t_, s_, "stream");
485         grpc_chttp2_list_add_stalled_by_stream(t_, s_);
486       }
487       return;  // early out: nothing to do
488     }
489 
490     while ((s_->flow_controlled_buffer.length > 0 ||
491             s_->compressed_data_buffer.length > 0) &&
492            data_send_context.max_outgoing() > 0) {
493       if (s_->compressed_data_buffer.length > 0) {
494         data_send_context.FlushCompressedBytes();
495       } else {
496         data_send_context.CompressMoreBytes();
497       }
498     }
499     write_context_->ResetPingClock();
500     if (data_send_context.is_last_frame()) {
501       SentLastFrame();
502     }
503     data_send_context.CallCallbacks();
504     stream_became_writable_ = true;
505     if (s_->flow_controlled_buffer.length > 0 ||
506         s_->compressed_data_buffer.length > 0) {
507       GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
508       grpc_chttp2_list_add_writable_stream(t_, s_);
509     }
510     write_context_->IncMessageWrites();
511   }
512 
FlushTrailingMetadata()513   void FlushTrailingMetadata() {
514     if (!s_->sent_initial_metadata) return;
515 
516     if (s_->send_trailing_metadata == nullptr) return;
517     if (s_->fetching_send_message != nullptr) return;
518     if (s_->flow_controlled_buffer.length != 0) return;
519     if (s_->compressed_data_buffer.length != 0) return;
520 
521     GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
522     if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {
523       grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
524                               &s_->stats.outgoing, &t_->outbuf);
525     } else {
526       grpc_encode_header_options hopt = {
527           s_->id, true,
528           t_->settings[GRPC_PEER_SETTINGS]
529                       [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
530               0,
531 
532           t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
533           &s_->stats.outgoing};
534       grpc_chttp2_encode_header(&t_->hpack_compressor,
535                                 extra_headers_for_trailing_metadata_,
536                                 num_extra_headers_for_trailing_metadata_,
537                                 s_->send_trailing_metadata, &hopt, &t_->outbuf);
538     }
539     write_context_->IncTrailingMetadataWrites();
540     write_context_->ResetPingClock();
541     SentLastFrame();
542 
543     write_context_->NoteScheduledResults();
544     grpc_chttp2_complete_closure_step(
545         t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
546         "send_trailing_metadata_finished");
547   }
548 
stream_became_writable()549   bool stream_became_writable() { return stream_became_writable_; }
550 
551  private:
ConvertInitialMetadataToTrailingMetadata()552   void ConvertInitialMetadataToTrailingMetadata() {
553     GRPC_CHTTP2_IF_TRACING(
554         gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
555     // When sending Trailers-Only, we need to move the :status and
556     // content-type headers to the trailers.
557     if (s_->send_initial_metadata->idx.named.status != nullptr) {
558       extra_headers_for_trailing_metadata_
559           [num_extra_headers_for_trailing_metadata_++] =
560               &s_->send_initial_metadata->idx.named.status->md;
561     }
562     if (s_->send_initial_metadata->idx.named.content_type != nullptr) {
563       extra_headers_for_trailing_metadata_
564           [num_extra_headers_for_trailing_metadata_++] =
565               &s_->send_initial_metadata->idx.named.content_type->md;
566     }
567   }
568 
SentLastFrame()569   void SentLastFrame() {
570     s_->send_trailing_metadata = nullptr;
571     s_->sent_trailing_metadata = true;
572 
573     if (!t_->is_client && !s_->read_closed) {
574       grpc_slice_buffer_add(
575           &t_->outbuf, grpc_chttp2_rst_stream_create(
576                            s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
577     }
578     grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
579                                    GRPC_ERROR_NONE);
580   }
581 
582   WriteContext* const write_context_;
583   grpc_chttp2_transport* const t_;
584   grpc_chttp2_stream* const s_;
585   bool stream_became_writable_ = false;
586   grpc_mdelem* extra_headers_for_trailing_metadata_[2];
587   size_t num_extra_headers_for_trailing_metadata_ = 0;
588 };
589 }  // namespace
590 
grpc_chttp2_begin_write(grpc_chttp2_transport * t)591 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
592     grpc_chttp2_transport* t) {
593   WriteContext ctx(t);
594   ctx.FlushSettings();
595   ctx.FlushPingAcks();
596   ctx.FlushQueuedBuffers();
597   ctx.EnactHpackSettings();
598 
599   if (t->flow_control->remote_window() > 0) {
600     ctx.UpdateStreamsNoLongerStalled();
601   }
602 
603   /* for each grpc_chttp2_stream that's become writable, frame it's data
604      (according to available window sizes) and add to the output buffer */
605   while (grpc_chttp2_stream* s = ctx.NextStream()) {
606     StreamWriteContext stream_ctx(&ctx, s);
607     stream_ctx.FlushInitialMetadata();
608     stream_ctx.FlushWindowUpdates();
609     stream_ctx.FlushData();
610     stream_ctx.FlushTrailingMetadata();
611 
612     if (stream_ctx.stream_became_writable()) {
613       if (!grpc_chttp2_list_add_writing_stream(t, s)) {
614         /* already in writing list: drop ref */
615         GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
616       } else {
617         /* ref will be dropped at end of write */
618       }
619     } else {
620       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
621     }
622   }
623 
624   ctx.FlushWindowUpdates();
625 
626   maybe_initiate_ping(t);
627 
628   return ctx.Result();
629 }
630 
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error * error)631 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
632   GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
633   grpc_chttp2_stream* s;
634 
635   while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
636     if (s->sending_bytes != 0) {
637       update_list(t, s, static_cast<int64_t>(s->sending_bytes),
638                   &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
639                   GRPC_ERROR_REF(error));
640       s->sending_bytes = 0;
641     }
642     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
643   }
644   grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
645   GRPC_ERROR_UNREF(error);
646 }
647