1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/internal.h"
22
23 #include <limits.h>
24
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/debug/stats.h"
28 #include "src/core/lib/profiling/timers.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "src/core/lib/transport/http2_errors.h"
31
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)32 static void add_to_write_list(grpc_chttp2_write_cb** list,
33 grpc_chttp2_write_cb* cb) {
34 cb->next = *list;
35 *list = cb;
36 }
37
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb * cb,grpc_error * error)38 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
39 grpc_chttp2_write_cb* cb, grpc_error* error) {
40 grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
41 "finish_write_cb");
42 cb->next = t->write_cb_pool;
43 t->write_cb_pool = cb;
44 }
45
maybe_initiate_ping(grpc_chttp2_transport * t)46 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
47 grpc_chttp2_ping_queue* pq = &t->ping_queue;
48 if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
49 /* no ping needed: wait */
50 return;
51 }
52 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
53 /* ping already in-flight: wait */
54 if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
55 gpr_log(GPR_INFO, "%s: Ping delayed [%p]: already pinging",
56 t->is_client ? "CLIENT" : "SERVER", t->peer_string);
57 }
58 return;
59 }
60 if (t->ping_state.pings_before_data_required == 0 &&
61 t->ping_policy.max_pings_without_data != 0) {
62 /* need to receive something of substance before sending a ping again */
63 if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
64 gpr_log(GPR_INFO, "%s: Ping delayed [%p]: too many recent pings: %d/%d",
65 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
66 t->ping_state.pings_before_data_required,
67 t->ping_policy.max_pings_without_data);
68 }
69 return;
70 }
71 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
72
73 grpc_millis next_allowed_ping_interval =
74 (t->keepalive_permit_without_calls == 0 &&
75 grpc_chttp2_stream_map_size(&t->stream_map) == 0)
76 ? 7200 * GPR_MS_PER_SEC
77 : t->ping_policy.min_sent_ping_interval_without_data;
78 grpc_millis next_allowed_ping =
79 t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
80
81 if (next_allowed_ping > now) {
82 /* not enough elapsed time between successive pings */
83 if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
84 gpr_log(GPR_INFO,
85 "%s: Ping delayed [%p]: not enough time elapsed since last ping. "
86 " Last ping %f: Next ping %f: Now %f",
87 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
88 static_cast<double>(t->ping_state.last_ping_sent_time),
89 static_cast<double>(next_allowed_ping), static_cast<double>(now));
90 }
91 if (!t->ping_state.is_delayed_ping_timer_set) {
92 t->ping_state.is_delayed_ping_timer_set = true;
93 GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
94 grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
95 &t->retry_initiate_ping_locked);
96 }
97 return;
98 }
99
100 pq->inflight_id = t->ping_ctr;
101 t->ping_ctr++;
102 GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
103 grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
104 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
105 grpc_slice_buffer_add(&t->outbuf,
106 grpc_chttp2_ping_create(false, pq->inflight_id));
107 GRPC_STATS_INC_HTTP2_PINGS_SENT();
108 t->ping_state.last_ping_sent_time = now;
109 if (grpc_http_trace.enabled() || grpc_bdp_estimator_trace.enabled()) {
110 gpr_log(GPR_INFO, "%s: Ping sent [%p]: %d/%d",
111 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
112 t->ping_state.pings_before_data_required,
113 t->ping_policy.max_pings_without_data);
114 }
115 t->ping_state.pings_before_data_required -=
116 (t->ping_state.pings_before_data_required != 0);
117 }
118
update_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error * error)119 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
120 int64_t send_bytes, grpc_chttp2_write_cb** list,
121 int64_t* ctr, grpc_error* error) {
122 bool sched_any = false;
123 grpc_chttp2_write_cb* cb = *list;
124 *list = nullptr;
125 *ctr += send_bytes;
126 while (cb) {
127 grpc_chttp2_write_cb* next = cb->next;
128 if (cb->call_at_byte <= *ctr) {
129 sched_any = true;
130 finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
131 } else {
132 add_to_write_list(list, cb);
133 }
134 cb = next;
135 }
136 GRPC_ERROR_UNREF(error);
137 return sched_any;
138 }
139
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)140 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
141 const char* staller) {
142 if (grpc_flowctl_trace.enabled()) {
143 gpr_log(
144 GPR_DEBUG,
145 "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
146 "to happen in a healthy program that is not seeing flow control stalls."
147 " However, if you know that there are unwanted stalls, here is some "
148 "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
149 ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
150 ":s_win=%d:s_delta=%" PRId64 "]",
151 t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length,
152 s->compressed_data_buffer.length, s->flow_controlled_bytes_flowed,
153 t->settings[GRPC_ACKED_SETTINGS]
154 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
155 t->flow_control->remote_window(),
156 static_cast<uint32_t> GPR_MAX(
157 0,
158 s->flow_control->remote_window_delta() +
159 (int64_t)t->settings[GRPC_PEER_SETTINGS]
160 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
161 s->flow_control->remote_window_delta());
162 }
163 }
164
stream_ref_if_not_destroyed(gpr_refcount * r)165 static bool stream_ref_if_not_destroyed(gpr_refcount* r) {
166 gpr_atm count;
167 do {
168 count = gpr_atm_acq_load(&r->count);
169 if (count == 0) return false;
170 } while (!gpr_atm_rel_cas(&r->count, count, count + 1));
171 return true;
172 }
173
174 /* How many bytes would we like to put on the wire during a single syscall */
target_write_size(grpc_chttp2_transport * t)175 static uint32_t target_write_size(grpc_chttp2_transport* t) {
176 return 1024 * 1024;
177 }
178
179 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)180 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
181 return initial_metadata->list.default_count == initial_metadata->list.count;
182 }
183
184 namespace {
185 class StreamWriteContext;
186
187 class WriteContext {
188 public:
WriteContext(grpc_chttp2_transport * t)189 WriteContext(grpc_chttp2_transport* t) : t_(t) {
190 GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
191 GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
192 }
193
194 // TODO(ctiller): make this the destructor
FlushStats()195 void FlushStats() {
196 GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
197 initial_metadata_writes_);
198 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
199 GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
200 trailing_metadata_writes_);
201 GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
202 }
203
FlushSettings()204 void FlushSettings() {
205 if (t_->dirtied_local_settings && !t_->sent_local_settings) {
206 grpc_slice_buffer_add(
207 &t_->outbuf, grpc_chttp2_settings_create(
208 t_->settings[GRPC_SENT_SETTINGS],
209 t_->settings[GRPC_LOCAL_SETTINGS],
210 t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
211 t_->force_send_settings = false;
212 t_->dirtied_local_settings = false;
213 t_->sent_local_settings = true;
214 GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
215 }
216 }
217
FlushQueuedBuffers()218 void FlushQueuedBuffers() {
219 /* simple writes are queued to qbuf, and flushed here */
220 grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
221 GPR_ASSERT(t_->qbuf.count == 0);
222 }
223
FlushWindowUpdates()224 void FlushWindowUpdates() {
225 uint32_t transport_announce =
226 t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
227 if (transport_announce) {
228 grpc_transport_one_way_stats throwaway_stats;
229 grpc_slice_buffer_add(
230 &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
231 &throwaway_stats));
232 ResetPingClock();
233 }
234 }
235
FlushPingAcks()236 void FlushPingAcks() {
237 for (size_t i = 0; i < t_->ping_ack_count; i++) {
238 grpc_slice_buffer_add(&t_->outbuf,
239 grpc_chttp2_ping_create(true, t_->ping_acks[i]));
240 }
241 t_->ping_ack_count = 0;
242 }
243
EnactHpackSettings()244 void EnactHpackSettings() {
245 grpc_chttp2_hpack_compressor_set_max_table_size(
246 &t_->hpack_compressor,
247 t_->settings[GRPC_PEER_SETTINGS]
248 [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
249 }
250
UpdateStreamsNoLongerStalled()251 void UpdateStreamsNoLongerStalled() {
252 grpc_chttp2_stream* s;
253 while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
254 if (t_->closed_with_error == GRPC_ERROR_NONE &&
255 grpc_chttp2_list_add_writable_stream(t_, s)) {
256 if (!stream_ref_if_not_destroyed(&s->refcount->refs)) {
257 grpc_chttp2_list_remove_writable_stream(t_, s);
258 }
259 }
260 }
261 }
262
NextStream()263 grpc_chttp2_stream* NextStream() {
264 if (t_->outbuf.length > target_write_size(t_)) {
265 result_.partial = true;
266 return nullptr;
267 }
268
269 grpc_chttp2_stream* s;
270 if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
271 return nullptr;
272 }
273
274 return s;
275 }
276
ResetPingClock()277 void ResetPingClock() {
278 if (!t_->is_client) {
279 t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
280 t_->ping_recv_state.ping_strikes = 0;
281 }
282 t_->ping_state.pings_before_data_required =
283 t_->ping_policy.max_pings_without_data;
284 }
285
IncInitialMetadataWrites()286 void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()287 void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()288 void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()289 void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
290
NoteScheduledResults()291 void NoteScheduledResults() { result_.early_results_scheduled = true; }
292
transport() const293 grpc_chttp2_transport* transport() const { return t_; }
294
Result()295 grpc_chttp2_begin_write_result Result() {
296 result_.writing = t_->outbuf.count > 0;
297 return result_;
298 }
299
300 private:
301 grpc_chttp2_transport* const t_;
302
303 /* stats histogram counters: we increment these throughout this function,
304 and at the end publish to the central stats histograms */
305 int flow_control_writes_ = 0;
306 int initial_metadata_writes_ = 0;
307 int trailing_metadata_writes_ = 0;
308 int message_writes_ = 0;
309 grpc_chttp2_begin_write_result result_ = {false, false, false};
310 };
311
312 class DataSendContext {
313 public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)314 DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
315 grpc_chttp2_stream* s)
316 : write_context_(write_context),
317 t_(t),
318 s_(s),
319 sending_bytes_before_(s_->sending_bytes) {}
320
stream_remote_window() const321 uint32_t stream_remote_window() const {
322 return static_cast<uint32_t> GPR_MAX(
323 0, s_->flow_control->remote_window_delta() +
324 (int64_t)t_->settings[GRPC_PEER_SETTINGS]
325 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
326 }
327
max_outgoing() const328 uint32_t max_outgoing() const {
329 return static_cast<uint32_t> GPR_MIN(
330 t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
331 GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
332 }
333
AnyOutgoing() const334 bool AnyOutgoing() const { return max_outgoing() > 0; }
335
FlushCompressedBytes()336 void FlushCompressedBytes() {
337 uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
338 max_outgoing(), s_->compressed_data_buffer.length);
339 bool is_last_data_frame =
340 (send_bytes == s_->compressed_data_buffer.length &&
341 s_->flow_controlled_buffer.length == 0 &&
342 s_->fetching_send_message == nullptr);
343 if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
344 s_->stream_compression_ctx != nullptr) {
345 if (GPR_UNLIKELY(!grpc_stream_compress(
346 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
347 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
348 GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
349 gpr_log(GPR_ERROR, "Stream compression failed.");
350 }
351 grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
352 s_->stream_compression_ctx = nullptr;
353 /* After finish, bytes in s->compressed_data_buffer may be
354 * more than max_outgoing. Start another round of the current
355 * while loop so that send_bytes and is_last_data_frame are
356 * recalculated. */
357 return;
358 }
359 is_last_frame_ = is_last_data_frame &&
360 s_->send_trailing_metadata != nullptr &&
361 grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
362 grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
363 is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
364 s_->flow_control->SentData(send_bytes);
365 if (s_->compressed_data_buffer.length == 0) {
366 s_->sending_bytes += s_->uncompressed_data_size;
367 }
368 }
369
CompressMoreBytes()370 void CompressMoreBytes() {
371 if (s_->stream_compression_ctx == nullptr) {
372 s_->stream_compression_ctx =
373 grpc_stream_compression_context_create(s_->stream_compression_method);
374 }
375 s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
376 if (GPR_UNLIKELY(!grpc_stream_compress(
377 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
378 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
379 GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
380 gpr_log(GPR_ERROR, "Stream compression failed.");
381 }
382 }
383
is_last_frame() const384 bool is_last_frame() const { return is_last_frame_; }
385
CallCallbacks()386 void CallCallbacks() {
387 if (update_list(
388 t_, s_,
389 static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
390 &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
391 GRPC_ERROR_NONE)) {
392 write_context_->NoteScheduledResults();
393 }
394 }
395
396 private:
397 WriteContext* write_context_;
398 grpc_chttp2_transport* t_;
399 grpc_chttp2_stream* s_;
400 const size_t sending_bytes_before_;
401 bool is_last_frame_ = false;
402 };
403
404 class StreamWriteContext {
405 public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)406 StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
407 : write_context_(write_context), t_(write_context->transport()), s_(s) {
408 GRPC_CHTTP2_IF_TRACING(
409 gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
410 t_->is_client ? "CLIENT" : "SERVER", s->id,
411 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
412 (int)(s->flow_control->local_window_delta() -
413 s->flow_control->announced_window_delta())));
414 }
415
FlushInitialMetadata()416 void FlushInitialMetadata() {
417 /* send initial metadata if it's available */
418 if (s_->sent_initial_metadata) return;
419 if (s_->send_initial_metadata == nullptr) return;
420
421 // We skip this on the server side if there is no custom initial
422 // metadata, there are no messages to send, and we are also sending
423 // trailing metadata. This results in a Trailers-Only response,
424 // which is required for retries, as per:
425 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
426 if (!t_->is_client && s_->fetching_send_message == nullptr &&
427 s_->flow_controlled_buffer.length == 0 &&
428 s_->compressed_data_buffer.length == 0 &&
429 s_->send_trailing_metadata != nullptr &&
430 is_default_initial_metadata(s_->send_initial_metadata)) {
431 ConvertInitialMetadataToTrailingMetadata();
432 } else {
433 grpc_encode_header_options hopt = {
434 s_->id, // stream_id
435 false, // is_eof
436 t_->settings[GRPC_PEER_SETTINGS]
437 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
438 0, // use_true_binary_metadata
439 t_->settings[GRPC_PEER_SETTINGS]
440 [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
441 &s_->stats.outgoing // stats
442 };
443 grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
444 s_->send_initial_metadata, &hopt, &t_->outbuf);
445 write_context_->ResetPingClock();
446 write_context_->IncInitialMetadataWrites();
447 }
448
449 s_->send_initial_metadata = nullptr;
450 s_->sent_initial_metadata = true;
451 write_context_->NoteScheduledResults();
452 grpc_chttp2_complete_closure_step(
453 t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
454 "send_initial_metadata_finished");
455 }
456
FlushWindowUpdates()457 void FlushWindowUpdates() {
458 /* send any window updates */
459 const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
460 if (stream_announce == 0) return;
461
462 grpc_slice_buffer_add(
463 &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
464 &s_->stats.outgoing));
465 write_context_->ResetPingClock();
466 write_context_->IncWindowUpdateWrites();
467 }
468
FlushData()469 void FlushData() {
470 if (!s_->sent_initial_metadata) return;
471
472 if (s_->flow_controlled_buffer.length == 0 &&
473 s_->compressed_data_buffer.length == 0) {
474 return; // early out: nothing to do
475 }
476
477 DataSendContext data_send_context(write_context_, t_, s_);
478
479 if (!data_send_context.AnyOutgoing()) {
480 if (t_->flow_control->remote_window() <= 0) {
481 report_stall(t_, s_, "transport");
482 grpc_chttp2_list_add_stalled_by_transport(t_, s_);
483 } else if (data_send_context.stream_remote_window() <= 0) {
484 report_stall(t_, s_, "stream");
485 grpc_chttp2_list_add_stalled_by_stream(t_, s_);
486 }
487 return; // early out: nothing to do
488 }
489
490 while ((s_->flow_controlled_buffer.length > 0 ||
491 s_->compressed_data_buffer.length > 0) &&
492 data_send_context.max_outgoing() > 0) {
493 if (s_->compressed_data_buffer.length > 0) {
494 data_send_context.FlushCompressedBytes();
495 } else {
496 data_send_context.CompressMoreBytes();
497 }
498 }
499 write_context_->ResetPingClock();
500 if (data_send_context.is_last_frame()) {
501 SentLastFrame();
502 }
503 data_send_context.CallCallbacks();
504 stream_became_writable_ = true;
505 if (s_->flow_controlled_buffer.length > 0 ||
506 s_->compressed_data_buffer.length > 0) {
507 GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
508 grpc_chttp2_list_add_writable_stream(t_, s_);
509 }
510 write_context_->IncMessageWrites();
511 }
512
FlushTrailingMetadata()513 void FlushTrailingMetadata() {
514 if (!s_->sent_initial_metadata) return;
515
516 if (s_->send_trailing_metadata == nullptr) return;
517 if (s_->fetching_send_message != nullptr) return;
518 if (s_->flow_controlled_buffer.length != 0) return;
519 if (s_->compressed_data_buffer.length != 0) return;
520
521 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
522 if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {
523 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
524 &s_->stats.outgoing, &t_->outbuf);
525 } else {
526 grpc_encode_header_options hopt = {
527 s_->id, true,
528 t_->settings[GRPC_PEER_SETTINGS]
529 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
530 0,
531
532 t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
533 &s_->stats.outgoing};
534 grpc_chttp2_encode_header(&t_->hpack_compressor,
535 extra_headers_for_trailing_metadata_,
536 num_extra_headers_for_trailing_metadata_,
537 s_->send_trailing_metadata, &hopt, &t_->outbuf);
538 }
539 write_context_->IncTrailingMetadataWrites();
540 write_context_->ResetPingClock();
541 SentLastFrame();
542
543 write_context_->NoteScheduledResults();
544 grpc_chttp2_complete_closure_step(
545 t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
546 "send_trailing_metadata_finished");
547 }
548
stream_became_writable()549 bool stream_became_writable() { return stream_became_writable_; }
550
551 private:
ConvertInitialMetadataToTrailingMetadata()552 void ConvertInitialMetadataToTrailingMetadata() {
553 GRPC_CHTTP2_IF_TRACING(
554 gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
555 // When sending Trailers-Only, we need to move the :status and
556 // content-type headers to the trailers.
557 if (s_->send_initial_metadata->idx.named.status != nullptr) {
558 extra_headers_for_trailing_metadata_
559 [num_extra_headers_for_trailing_metadata_++] =
560 &s_->send_initial_metadata->idx.named.status->md;
561 }
562 if (s_->send_initial_metadata->idx.named.content_type != nullptr) {
563 extra_headers_for_trailing_metadata_
564 [num_extra_headers_for_trailing_metadata_++] =
565 &s_->send_initial_metadata->idx.named.content_type->md;
566 }
567 }
568
SentLastFrame()569 void SentLastFrame() {
570 s_->send_trailing_metadata = nullptr;
571 s_->sent_trailing_metadata = true;
572
573 if (!t_->is_client && !s_->read_closed) {
574 grpc_slice_buffer_add(
575 &t_->outbuf, grpc_chttp2_rst_stream_create(
576 s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
577 }
578 grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
579 GRPC_ERROR_NONE);
580 }
581
582 WriteContext* const write_context_;
583 grpc_chttp2_transport* const t_;
584 grpc_chttp2_stream* const s_;
585 bool stream_became_writable_ = false;
586 grpc_mdelem* extra_headers_for_trailing_metadata_[2];
587 size_t num_extra_headers_for_trailing_metadata_ = 0;
588 };
589 } // namespace
590
grpc_chttp2_begin_write(grpc_chttp2_transport * t)591 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
592 grpc_chttp2_transport* t) {
593 WriteContext ctx(t);
594 ctx.FlushSettings();
595 ctx.FlushPingAcks();
596 ctx.FlushQueuedBuffers();
597 ctx.EnactHpackSettings();
598
599 if (t->flow_control->remote_window() > 0) {
600 ctx.UpdateStreamsNoLongerStalled();
601 }
602
603 /* for each grpc_chttp2_stream that's become writable, frame it's data
604 (according to available window sizes) and add to the output buffer */
605 while (grpc_chttp2_stream* s = ctx.NextStream()) {
606 StreamWriteContext stream_ctx(&ctx, s);
607 stream_ctx.FlushInitialMetadata();
608 stream_ctx.FlushWindowUpdates();
609 stream_ctx.FlushData();
610 stream_ctx.FlushTrailingMetadata();
611
612 if (stream_ctx.stream_became_writable()) {
613 if (!grpc_chttp2_list_add_writing_stream(t, s)) {
614 /* already in writing list: drop ref */
615 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
616 } else {
617 /* ref will be dropped at end of write */
618 }
619 } else {
620 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
621 }
622 }
623
624 ctx.FlushWindowUpdates();
625
626 maybe_initiate_ping(t);
627
628 return ctx.Result();
629 }
630
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error * error)631 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
632 GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
633 grpc_chttp2_stream* s;
634
635 while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
636 if (s->sending_bytes != 0) {
637 update_list(t, s, static_cast<int64_t>(s->sending_bytes),
638 &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
639 GRPC_ERROR_REF(error));
640 s->sending_bytes = 0;
641 }
642 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
643 }
644 grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
645 GRPC_ERROR_UNREF(error);
646 }
647