1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
22 #include "src/core/ext/transport/chttp2/transport/context_list.h"
23 #include "src/core/ext/transport/chttp2/transport/internal.h"
24
25 #include <limits.h>
26
27 #include <grpc/support/log.h>
28
29 #include "src/core/lib/compression/stream_compression.h"
30 #include "src/core/lib/debug/stats.h"
31 #include "src/core/lib/profiling/timers.h"
32 #include "src/core/lib/slice/slice_internal.h"
33 #include "src/core/lib/transport/http2_errors.h"
34
add_to_write_list(grpc_chttp2_write_cb ** list,grpc_chttp2_write_cb * cb)35 static void add_to_write_list(grpc_chttp2_write_cb** list,
36 grpc_chttp2_write_cb* cb) {
37 cb->next = *list;
38 *list = cb;
39 }
40
finish_write_cb(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb * cb,grpc_error * error)41 static void finish_write_cb(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
42 grpc_chttp2_write_cb* cb, grpc_error* error) {
43 grpc_chttp2_complete_closure_step(t, s, &cb->closure, error,
44 "finish_write_cb");
45 cb->next = t->write_cb_pool;
46 t->write_cb_pool = cb;
47 }
48
maybe_initiate_ping(grpc_chttp2_transport * t)49 static void maybe_initiate_ping(grpc_chttp2_transport* t) {
50 grpc_chttp2_ping_queue* pq = &t->ping_queue;
51 if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
52 /* no ping needed: wait */
53 return;
54 }
55 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
56 /* ping already in-flight: wait */
57 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
58 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
59 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
60 gpr_log(GPR_INFO, "%s: Ping delayed [%s]: already pinging",
61 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str());
62 }
63 return;
64 }
65 if (t->ping_state.pings_before_data_required == 0 &&
66 t->ping_policy.max_pings_without_data != 0) {
67 /* need to receive something of substance before sending a ping again */
68 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
69 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
70 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
71 gpr_log(GPR_INFO, "%s: Ping delayed [%s]: too many recent pings: %d/%d",
72 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
73 t->ping_state.pings_before_data_required,
74 t->ping_policy.max_pings_without_data);
75 }
76 return;
77 }
78 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
79
80 grpc_millis next_allowed_ping_interval =
81 (t->keepalive_permit_without_calls == 0 &&
82 grpc_chttp2_stream_map_size(&t->stream_map) == 0)
83 ? 7200 * GPR_MS_PER_SEC
84 : (GPR_MS_PER_SEC); /* A second is added to deal with network delays
85 and timing imprecision */
86 grpc_millis next_allowed_ping =
87 t->ping_state.last_ping_sent_time + next_allowed_ping_interval;
88
89 if (next_allowed_ping > now) {
90 /* not enough elapsed time between successive pings */
91 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
92 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
93 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
94 gpr_log(GPR_INFO,
95 "%s: Ping delayed [%s]: not enough time elapsed since last ping. "
96 " Last ping %f: Next ping %f: Now %f",
97 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
98 static_cast<double>(t->ping_state.last_ping_sent_time),
99 static_cast<double>(next_allowed_ping), static_cast<double>(now));
100 }
101 if (!t->ping_state.is_delayed_ping_timer_set) {
102 t->ping_state.is_delayed_ping_timer_set = true;
103 GRPC_CHTTP2_REF_TRANSPORT(t, "retry_initiate_ping_locked");
104 GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
105 grpc_chttp2_retry_initiate_ping, t,
106 grpc_schedule_on_exec_ctx);
107 grpc_timer_init(&t->ping_state.delayed_ping_timer, next_allowed_ping,
108 &t->retry_initiate_ping_locked);
109 }
110 return;
111 }
112
113 pq->inflight_id = t->ping_ctr;
114 t->ping_ctr++;
115 grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
116 &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
117 grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
118 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
119 grpc_slice_buffer_add(&t->outbuf,
120 grpc_chttp2_ping_create(false, pq->inflight_id));
121 GRPC_STATS_INC_HTTP2_PINGS_SENT();
122 t->ping_state.last_ping_sent_time = now;
123 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
124 GRPC_TRACE_FLAG_ENABLED(grpc_bdp_estimator_trace) ||
125 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
126 gpr_log(GPR_INFO, "%s: Ping sent [%s]: %d/%d",
127 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
128 t->ping_state.pings_before_data_required,
129 t->ping_policy.max_pings_without_data);
130 }
131 t->ping_state.pings_before_data_required -=
132 (t->ping_state.pings_before_data_required != 0);
133 }
134
update_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int64_t send_bytes,grpc_chttp2_write_cb ** list,int64_t * ctr,grpc_error * error)135 static bool update_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
136 int64_t send_bytes, grpc_chttp2_write_cb** list,
137 int64_t* ctr, grpc_error* error) {
138 bool sched_any = false;
139 grpc_chttp2_write_cb* cb = *list;
140 *list = nullptr;
141 *ctr += send_bytes;
142 while (cb) {
143 grpc_chttp2_write_cb* next = cb->next;
144 if (cb->call_at_byte <= *ctr) {
145 sched_any = true;
146 finish_write_cb(t, s, cb, GRPC_ERROR_REF(error));
147 } else {
148 add_to_write_list(list, cb);
149 }
150 cb = next;
151 }
152 GRPC_ERROR_UNREF(error);
153 return sched_any;
154 }
155
report_stall(grpc_chttp2_transport * t,grpc_chttp2_stream * s,const char * staller)156 static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
157 const char* staller) {
158 if (GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)) {
159 gpr_log(
160 GPR_DEBUG,
161 "%s:%p stream %d moved to stalled list by %s. This is FULLY expected "
162 "to happen in a healthy program that is not seeing flow control stalls."
163 " However, if you know that there are unwanted stalls, here is some "
164 "helpful data: [fc:pending=%" PRIdPTR ":pending-compressed=%" PRIdPTR
165 ":flowed=%" PRId64 ":peer_initwin=%d:t_win=%" PRId64
166 ":s_win=%d:s_delta=%" PRId64 "]",
167 t->peer_string.c_str(), t, s->id, staller,
168 s->flow_controlled_buffer.length,
169 s->stream_compression_method ==
170 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
171 ? 0
172 : s->compressed_data_buffer.length,
173 s->flow_controlled_bytes_flowed,
174 t->settings[GRPC_ACKED_SETTINGS]
175 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
176 t->flow_control->remote_window(),
177 static_cast<uint32_t> GPR_MAX(
178 0,
179 s->flow_control->remote_window_delta() +
180 (int64_t)t->settings[GRPC_PEER_SETTINGS]
181 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]),
182 s->flow_control->remote_window_delta());
183 }
184 }
185
186 /* How many bytes would we like to put on the wire during a single syscall */
target_write_size(grpc_chttp2_transport *)187 static uint32_t target_write_size(grpc_chttp2_transport* /*t*/) {
188 return 1024 * 1024;
189 }
190
191 // Returns true if initial_metadata contains only default headers.
is_default_initial_metadata(grpc_metadata_batch * initial_metadata)192 static bool is_default_initial_metadata(grpc_metadata_batch* initial_metadata) {
193 return initial_metadata->list.default_count == initial_metadata->list.count;
194 }
195
196 namespace {
197 class StreamWriteContext;
198
199 class WriteContext {
200 public:
WriteContext(grpc_chttp2_transport * t)201 explicit WriteContext(grpc_chttp2_transport* t) : t_(t) {
202 GRPC_STATS_INC_HTTP2_WRITES_BEGUN();
203 GPR_TIMER_SCOPE("grpc_chttp2_begin_write", 0);
204 }
205
206 // TODO(ctiller): make this the destructor
FlushStats()207 void FlushStats() {
208 GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE(
209 initial_metadata_writes_);
210 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(message_writes_);
211 GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE(
212 trailing_metadata_writes_);
213 GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(flow_control_writes_);
214 }
215
FlushSettings()216 void FlushSettings() {
217 if (t_->dirtied_local_settings && !t_->sent_local_settings) {
218 grpc_slice_buffer_add(
219 &t_->outbuf, grpc_chttp2_settings_create(
220 t_->settings[GRPC_SENT_SETTINGS],
221 t_->settings[GRPC_LOCAL_SETTINGS],
222 t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
223 t_->force_send_settings = false;
224 t_->dirtied_local_settings = false;
225 t_->sent_local_settings = true;
226 GRPC_STATS_INC_HTTP2_SETTINGS_WRITES();
227 }
228 }
229
FlushQueuedBuffers()230 void FlushQueuedBuffers() {
231 /* simple writes are queued to qbuf, and flushed here */
232 grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
233 t_->num_pending_induced_frames = 0;
234 GPR_ASSERT(t_->qbuf.count == 0);
235 }
236
FlushWindowUpdates()237 void FlushWindowUpdates() {
238 uint32_t transport_announce =
239 t_->flow_control->MaybeSendUpdate(t_->outbuf.count > 0);
240 if (transport_announce) {
241 grpc_transport_one_way_stats throwaway_stats;
242 grpc_slice_buffer_add(
243 &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
244 &throwaway_stats));
245 grpc_chttp2_reset_ping_clock(t_);
246 }
247 }
248
FlushPingAcks()249 void FlushPingAcks() {
250 for (size_t i = 0; i < t_->ping_ack_count; i++) {
251 grpc_slice_buffer_add(&t_->outbuf,
252 grpc_chttp2_ping_create(true, t_->ping_acks[i]));
253 }
254 t_->ping_ack_count = 0;
255 }
256
EnactHpackSettings()257 void EnactHpackSettings() {
258 grpc_chttp2_hpack_compressor_set_max_table_size(
259 &t_->hpack_compressor,
260 t_->settings[GRPC_PEER_SETTINGS]
261 [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
262 }
263
UpdateStreamsNoLongerStalled()264 void UpdateStreamsNoLongerStalled() {
265 grpc_chttp2_stream* s;
266 while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) {
267 if (t_->closed_with_error == GRPC_ERROR_NONE &&
268 grpc_chttp2_list_add_writable_stream(t_, s)) {
269 if (!s->refcount->refs.RefIfNonZero()) {
270 grpc_chttp2_list_remove_writable_stream(t_, s);
271 }
272 }
273 }
274 }
275
NextStream()276 grpc_chttp2_stream* NextStream() {
277 if (t_->outbuf.length > target_write_size(t_)) {
278 result_.partial = true;
279 return nullptr;
280 }
281
282 grpc_chttp2_stream* s;
283 if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) {
284 return nullptr;
285 }
286
287 return s;
288 }
289
IncInitialMetadataWrites()290 void IncInitialMetadataWrites() { ++initial_metadata_writes_; }
IncWindowUpdateWrites()291 void IncWindowUpdateWrites() { ++flow_control_writes_; }
IncMessageWrites()292 void IncMessageWrites() { ++message_writes_; }
IncTrailingMetadataWrites()293 void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; }
294
NoteScheduledResults()295 void NoteScheduledResults() { result_.early_results_scheduled = true; }
296
transport() const297 grpc_chttp2_transport* transport() const { return t_; }
298
Result()299 grpc_chttp2_begin_write_result Result() {
300 result_.writing = t_->outbuf.count > 0;
301 return result_;
302 }
303
304 private:
305 grpc_chttp2_transport* const t_;
306
307 /* stats histogram counters: we increment these throughout this function,
308 and at the end publish to the central stats histograms */
309 int flow_control_writes_ = 0;
310 int initial_metadata_writes_ = 0;
311 int trailing_metadata_writes_ = 0;
312 int message_writes_ = 0;
313 grpc_chttp2_begin_write_result result_ = {false, false, false};
314 };
315
316 class DataSendContext {
317 public:
DataSendContext(WriteContext * write_context,grpc_chttp2_transport * t,grpc_chttp2_stream * s)318 DataSendContext(WriteContext* write_context, grpc_chttp2_transport* t,
319 grpc_chttp2_stream* s)
320 : write_context_(write_context),
321 t_(t),
322 s_(s),
323 sending_bytes_before_(s_->sending_bytes) {}
324
stream_remote_window() const325 uint32_t stream_remote_window() const {
326 return static_cast<uint32_t> GPR_MAX(
327 0, s_->flow_control->remote_window_delta() +
328 (int64_t)t_->settings[GRPC_PEER_SETTINGS]
329 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
330 }
331
max_outgoing() const332 uint32_t max_outgoing() const {
333 return static_cast<uint32_t> GPR_MIN(
334 t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
335 GPR_MIN(stream_remote_window(), t_->flow_control->remote_window()));
336 }
337
AnyOutgoing() const338 bool AnyOutgoing() const { return max_outgoing() > 0; }
339
FlushUncompressedBytes()340 void FlushUncompressedBytes() {
341 uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
342 max_outgoing(), s_->flow_controlled_buffer.length);
343 is_last_frame_ = send_bytes == s_->flow_controlled_buffer.length &&
344 s_->fetching_send_message == nullptr &&
345 s_->send_trailing_metadata != nullptr &&
346 grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
347 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
348 is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
349 s_->flow_control->SentData(send_bytes);
350 s_->sending_bytes += send_bytes;
351 }
352
FlushCompressedBytes()353 void FlushCompressedBytes() {
354 GPR_DEBUG_ASSERT(s_->stream_compression_method !=
355 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
356
357 uint32_t send_bytes = static_cast<uint32_t> GPR_MIN(
358 max_outgoing(), s_->compressed_data_buffer.length);
359 bool is_last_data_frame =
360 (send_bytes == s_->compressed_data_buffer.length &&
361 s_->flow_controlled_buffer.length == 0 &&
362 s_->fetching_send_message == nullptr);
363 if (is_last_data_frame && s_->send_trailing_metadata != nullptr &&
364 s_->stream_compression_ctx != nullptr) {
365 if (GPR_UNLIKELY(!grpc_stream_compress(
366 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
367 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
368 GRPC_STREAM_COMPRESSION_FLUSH_FINISH))) {
369 gpr_log(GPR_ERROR, "Stream compression failed.");
370 }
371 grpc_stream_compression_context_destroy(s_->stream_compression_ctx);
372 s_->stream_compression_ctx = nullptr;
373 /* After finish, bytes in s->compressed_data_buffer may be
374 * more than max_outgoing. Start another round of the current
375 * while loop so that send_bytes and is_last_data_frame are
376 * recalculated. */
377 return;
378 }
379 is_last_frame_ = is_last_data_frame &&
380 s_->send_trailing_metadata != nullptr &&
381 grpc_metadata_batch_is_empty(s_->send_trailing_metadata);
382 grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
383 is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
384 s_->flow_control->SentData(send_bytes);
385 if (s_->compressed_data_buffer.length == 0) {
386 s_->sending_bytes += s_->uncompressed_data_size;
387 }
388 }
389
CompressMoreBytes()390 void CompressMoreBytes() {
391 GPR_DEBUG_ASSERT(s_->stream_compression_method !=
392 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS);
393
394 if (s_->stream_compression_ctx == nullptr) {
395 s_->stream_compression_ctx =
396 grpc_stream_compression_context_create(s_->stream_compression_method);
397 }
398 s_->uncompressed_data_size = s_->flow_controlled_buffer.length;
399 if (GPR_UNLIKELY(!grpc_stream_compress(
400 s_->stream_compression_ctx, &s_->flow_controlled_buffer,
401 &s_->compressed_data_buffer, nullptr, MAX_SIZE_T,
402 GRPC_STREAM_COMPRESSION_FLUSH_SYNC))) {
403 gpr_log(GPR_ERROR, "Stream compression failed.");
404 }
405 }
406
is_last_frame() const407 bool is_last_frame() const { return is_last_frame_; }
408
CallCallbacks()409 void CallCallbacks() {
410 if (update_list(
411 t_, s_,
412 static_cast<int64_t>(s_->sending_bytes - sending_bytes_before_),
413 &s_->on_flow_controlled_cbs, &s_->flow_controlled_bytes_flowed,
414 GRPC_ERROR_NONE)) {
415 write_context_->NoteScheduledResults();
416 }
417 }
418
419 private:
420 WriteContext* write_context_;
421 grpc_chttp2_transport* t_;
422 grpc_chttp2_stream* s_;
423 const size_t sending_bytes_before_;
424 bool is_last_frame_ = false;
425 };
426
427 class StreamWriteContext {
428 public:
StreamWriteContext(WriteContext * write_context,grpc_chttp2_stream * s)429 StreamWriteContext(WriteContext* write_context, grpc_chttp2_stream* s)
430 : write_context_(write_context), t_(write_context->transport()), s_(s) {
431 GRPC_CHTTP2_IF_TRACING(
432 gpr_log(GPR_INFO, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_,
433 t_->is_client ? "CLIENT" : "SERVER", s->id,
434 s->sent_initial_metadata, s->send_initial_metadata != nullptr,
435 (int)(s->flow_control->local_window_delta() -
436 s->flow_control->announced_window_delta())));
437 }
438
FlushInitialMetadata()439 void FlushInitialMetadata() {
440 /* send initial metadata if it's available */
441 if (s_->sent_initial_metadata) return;
442 if (s_->send_initial_metadata == nullptr) return;
443
444 // We skip this on the server side if there is no custom initial
445 // metadata, there are no messages to send, and we are also sending
446 // trailing metadata. This results in a Trailers-Only response,
447 // which is required for retries, as per:
448 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
449 if (!t_->is_client && s_->fetching_send_message == nullptr &&
450 s_->flow_controlled_buffer.length == 0 &&
451 compressed_data_buffer_len() == 0 &&
452 s_->send_trailing_metadata != nullptr &&
453 is_default_initial_metadata(s_->send_initial_metadata)) {
454 ConvertInitialMetadataToTrailingMetadata();
455 } else {
456 grpc_encode_header_options hopt = {
457 s_->id, // stream_id
458 false, // is_eof
459 t_->settings[GRPC_PEER_SETTINGS]
460 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
461 0, // use_true_binary_metadata
462 t_->settings[GRPC_PEER_SETTINGS]
463 [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
464 &s_->stats.outgoing // stats
465 };
466 grpc_chttp2_encode_header(&t_->hpack_compressor, nullptr, 0,
467 s_->send_initial_metadata, &hopt, &t_->outbuf);
468 grpc_chttp2_reset_ping_clock(t_);
469 write_context_->IncInitialMetadataWrites();
470 }
471
472 s_->send_initial_metadata = nullptr;
473 s_->sent_initial_metadata = true;
474 write_context_->NoteScheduledResults();
475 grpc_chttp2_complete_closure_step(
476 t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE,
477 "send_initial_metadata_finished");
478 }
479
compressed_data_buffer_len()480 size_t compressed_data_buffer_len() {
481 return s_->stream_compression_method ==
482 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS
483 ? 0
484 : s_->compressed_data_buffer.length;
485 }
486
FlushWindowUpdates()487 void FlushWindowUpdates() {
488 /* send any window updates */
489 const uint32_t stream_announce = s_->flow_control->MaybeSendUpdate();
490 if (stream_announce == 0) return;
491
492 grpc_slice_buffer_add(
493 &t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
494 &s_->stats.outgoing));
495 grpc_chttp2_reset_ping_clock(t_);
496 write_context_->IncWindowUpdateWrites();
497 }
498
FlushData()499 void FlushData() {
500 if (!s_->sent_initial_metadata) return;
501
502 if (s_->flow_controlled_buffer.length == 0 &&
503 compressed_data_buffer_len() == 0) {
504 return; // early out: nothing to do
505 }
506
507 DataSendContext data_send_context(write_context_, t_, s_);
508
509 if (!data_send_context.AnyOutgoing()) {
510 if (t_->flow_control->remote_window() <= 0) {
511 report_stall(t_, s_, "transport");
512 grpc_chttp2_list_add_stalled_by_transport(t_, s_);
513 } else if (data_send_context.stream_remote_window() <= 0) {
514 report_stall(t_, s_, "stream");
515 grpc_chttp2_list_add_stalled_by_stream(t_, s_);
516 }
517 return; // early out: nothing to do
518 }
519
520 if (s_->stream_compression_method ==
521 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
522 while (s_->flow_controlled_buffer.length > 0 &&
523 data_send_context.max_outgoing() > 0) {
524 data_send_context.FlushUncompressedBytes();
525 }
526 } else {
527 while ((s_->flow_controlled_buffer.length > 0 ||
528 s_->compressed_data_buffer.length > 0) &&
529 data_send_context.max_outgoing() > 0) {
530 if (s_->compressed_data_buffer.length > 0) {
531 data_send_context.FlushCompressedBytes();
532 } else {
533 data_send_context.CompressMoreBytes();
534 }
535 }
536 }
537 grpc_chttp2_reset_ping_clock(t_);
538 if (data_send_context.is_last_frame()) {
539 SentLastFrame();
540 }
541 data_send_context.CallCallbacks();
542 stream_became_writable_ = true;
543 if (s_->flow_controlled_buffer.length > 0 ||
544 compressed_data_buffer_len() > 0) {
545 GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork");
546 grpc_chttp2_list_add_writable_stream(t_, s_);
547 }
548 write_context_->IncMessageWrites();
549 }
550
FlushTrailingMetadata()551 void FlushTrailingMetadata() {
552 if (!s_->sent_initial_metadata) return;
553
554 if (s_->send_trailing_metadata == nullptr) return;
555 if (s_->fetching_send_message != nullptr) return;
556 if (s_->flow_controlled_buffer.length != 0) return;
557 if (compressed_data_buffer_len() != 0) return;
558
559 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
560 if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) {
561 grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
562 &s_->stats.outgoing, &t_->outbuf);
563 } else {
564 grpc_encode_header_options hopt = {
565 s_->id, true,
566 t_->settings[GRPC_PEER_SETTINGS]
567 [GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
568 0,
569
570 t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
571 &s_->stats.outgoing};
572 grpc_chttp2_encode_header(&t_->hpack_compressor,
573 extra_headers_for_trailing_metadata_,
574 num_extra_headers_for_trailing_metadata_,
575 s_->send_trailing_metadata, &hopt, &t_->outbuf);
576 }
577 write_context_->IncTrailingMetadataWrites();
578 grpc_chttp2_reset_ping_clock(t_);
579 SentLastFrame();
580
581 write_context_->NoteScheduledResults();
582 grpc_chttp2_complete_closure_step(
583 t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE,
584 "send_trailing_metadata_finished");
585 }
586
stream_became_writable()587 bool stream_became_writable() { return stream_became_writable_; }
588
589 private:
ConvertInitialMetadataToTrailingMetadata()590 void ConvertInitialMetadataToTrailingMetadata() {
591 GRPC_CHTTP2_IF_TRACING(
592 gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
593 // When sending Trailers-Only, we need to move the :status and
594 // content-type headers to the trailers.
595 if (s_->send_initial_metadata->idx.named.status != nullptr) {
596 extra_headers_for_trailing_metadata_
597 [num_extra_headers_for_trailing_metadata_++] =
598 &s_->send_initial_metadata->idx.named.status->md;
599 }
600 if (s_->send_initial_metadata->idx.named.content_type != nullptr) {
601 extra_headers_for_trailing_metadata_
602 [num_extra_headers_for_trailing_metadata_++] =
603 &s_->send_initial_metadata->idx.named.content_type->md;
604 }
605 }
606
SentLastFrame()607 void SentLastFrame() {
608 s_->send_trailing_metadata = nullptr;
609 if (s_->sent_trailing_metadata_op) {
610 *s_->sent_trailing_metadata_op = true;
611 s_->sent_trailing_metadata_op = nullptr;
612 }
613 s_->sent_trailing_metadata = true;
614 s_->eos_sent = true;
615
616 if (!t_->is_client && !s_->read_closed) {
617 grpc_slice_buffer_add(
618 &t_->outbuf, grpc_chttp2_rst_stream_create(
619 s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
620 }
621 grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
622 GRPC_ERROR_NONE);
623 }
624
625 WriteContext* const write_context_;
626 grpc_chttp2_transport* const t_;
627 grpc_chttp2_stream* const s_;
628 bool stream_became_writable_ = false;
629 grpc_mdelem* extra_headers_for_trailing_metadata_[2];
630 size_t num_extra_headers_for_trailing_metadata_ = 0;
631 };
632 } // namespace
633
grpc_chttp2_begin_write(grpc_chttp2_transport * t)634 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
635 grpc_chttp2_transport* t) {
636 WriteContext ctx(t);
637 ctx.FlushSettings();
638 ctx.FlushPingAcks();
639 ctx.FlushQueuedBuffers();
640 ctx.EnactHpackSettings();
641
642 if (t->flow_control->remote_window() > 0) {
643 ctx.UpdateStreamsNoLongerStalled();
644 }
645
646 /* for each grpc_chttp2_stream that's become writable, frame it's data
647 (according to available window sizes) and add to the output buffer */
648 while (grpc_chttp2_stream* s = ctx.NextStream()) {
649 StreamWriteContext stream_ctx(&ctx, s);
650 size_t orig_len = t->outbuf.length;
651 stream_ctx.FlushInitialMetadata();
652 stream_ctx.FlushWindowUpdates();
653 stream_ctx.FlushData();
654 stream_ctx.FlushTrailingMetadata();
655 if (t->outbuf.length > orig_len) {
656 /* Add this stream to the list of the contexts to be traced at TCP */
657 s->byte_counter += t->outbuf.length - orig_len;
658 if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
659 grpc_core::ContextList::Append(&t->cl, s);
660 }
661 }
662 if (stream_ctx.stream_became_writable()) {
663 if (!grpc_chttp2_list_add_writing_stream(t, s)) {
664 /* already in writing list: drop ref */
665 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:already_writing");
666 } else {
667 /* ref will be dropped at end of write */
668 }
669 } else {
670 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:no_write");
671 }
672 }
673
674 ctx.FlushWindowUpdates();
675
676 maybe_initiate_ping(t);
677
678 return ctx.Result();
679 }
680
grpc_chttp2_end_write(grpc_chttp2_transport * t,grpc_error * error)681 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) {
682 GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0);
683 grpc_chttp2_stream* s;
684
685 if (t->channelz_socket != nullptr) {
686 t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write);
687 }
688 t->num_messages_in_next_write = 0;
689
690 while (grpc_chttp2_list_pop_writing_stream(t, &s)) {
691 if (s->sending_bytes != 0) {
692 update_list(t, s, static_cast<int64_t>(s->sending_bytes),
693 &s->on_write_finished_cbs, &s->flow_controlled_bytes_written,
694 GRPC_ERROR_REF(error));
695 s->sending_bytes = 0;
696 }
697 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
698 }
699 grpc_slice_buffer_reset_and_unref_internal(&t->outbuf);
700 GRPC_ERROR_UNREF(error);
701 }
702