1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <math.h>
26 #include <stdio.h>
27 #include <string.h>
28
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33
34 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
35 #include "src/core/ext/transport/chttp2/transport/internal.h"
36 #include "src/core/ext/transport/chttp2/transport/varint.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/compression/stream_compression.h"
39 #include "src/core/lib/debug/stats.h"
40 #include "src/core/lib/gpr/env.h"
41 #include "src/core/lib/gpr/string.h"
42 #include "src/core/lib/gprpp/memory.h"
43 #include "src/core/lib/http/parser.h"
44 #include "src/core/lib/iomgr/executor.h"
45 #include "src/core/lib/iomgr/timer.h"
46 #include "src/core/lib/profiling/timers.h"
47 #include "src/core/lib/slice/slice_internal.h"
48 #include "src/core/lib/slice/slice_string_helpers.h"
49 #include "src/core/lib/transport/error_utils.h"
50 #include "src/core/lib/transport/http2_errors.h"
51 #include "src/core/lib/transport/static_metadata.h"
52 #include "src/core/lib/transport/status_conversion.h"
53 #include "src/core/lib/transport/timeout_encoding.h"
54 #include "src/core/lib/transport/transport.h"
55 #include "src/core/lib/transport/transport_impl.h"
56
/* Size constants. */
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) /* 1 MiB */
#define MAX_WINDOW 0x7fffffffu                          /* 2^31 - 1 */
/* Upper bound accepted for GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE (64 MiB). */
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
/* Value init_transport queues for GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE
   (8 KiB). */
#define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)

/* Keepalive defaults. A time/timeout of INT_MAX is translated to
   GRPC_MILLIS_INF_FUTURE when it is copied into a transport; a keepalive time
   of GRPC_MILLIS_INF_FUTURE leaves keepalive pings disabled. */
#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000  /* 2 hours */
#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2

/* Ping policy defaults. */
#define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
#define DEFAULT_MAX_PING_STRIKES 2

/* Mutable process-wide keepalive defaults. init_transport_keepalive_settings
   copies the client or server variants into each new transport, and
   read_channel_args uses the time/timeout values as fallbacks for the
   corresponding channel args. */
static int g_default_client_keepalive_time_ms =
    DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
static int g_default_client_keepalive_timeout_ms =
    DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
static int g_default_server_keepalive_time_ms =
    DEFAULT_SERVER_KEEPALIVE_TIME_MS;
static int g_default_server_keepalive_timeout_ms =
    DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
static bool g_default_client_keepalive_permit_without_calls =
    DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
static bool g_default_server_keepalive_permit_without_calls =
    DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;

/* Mutable process-wide ping policy defaults. configure_transport_ping_policy
   copies them into each new transport, and read_channel_args uses them as
   fallbacks for the corresponding channel args. */
static int g_default_min_sent_ping_interval_without_data_ms =
    DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS;
static int g_default_min_recv_ping_interval_without_data_ms =
    DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;

#define MAX_CLIENT_STREAM_ID 0x7fffffffu
/* Trace flags; both start disabled. grpc_trace_chttp2_refcount gates the
   ref/unref logging in the debug builds of grpc_chttp2_{ref,unref}_transport. */
grpc_core::TraceFlag grpc_http_trace(false, "http");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
                                                         "chttp2_refcount");
98
/* forward declarations of various callbacks that we'll build closures around */
static void write_action_begin_locked(void* t, grpc_error* error);
static void write_action(void* t, grpc_error* error);
static void write_action_end_locked(void* t, grpc_error* error);

/* Bound to t->read_action_locked by init_transport_closures. */
static void read_action_locked(void* t, grpc_error* error);

/* Bound to s->complete_fetch_locked by init_stream. */
static void complete_fetch_locked(void* gs, grpc_error* error);
/** Set a transport level setting, and push it to our peer */
static void queue_setting_update(grpc_chttp2_transport* t,
                                 grpc_chttp2_setting_id id, uint32_t value);

static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                           grpc_error* error);

/** Start new streams that have been created if we can */
static void maybe_start_some_streams(grpc_chttp2_transport* t);

static void connectivity_state_set(grpc_chttp2_transport* t,
                                   grpc_connectivity_state state,
                                   grpc_error* error, const char* reason);

/* Reclaimer callbacks; bound to closures by init_transport_closures. */
static void benign_reclaimer_locked(void* t, grpc_error* error);
static void destructive_reclaimer_locked(void* t, grpc_error* error);

static void post_benign_reclaimer(grpc_chttp2_transport* t);
static void post_destructive_reclaimer(grpc_chttp2_transport* t);

static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);

/* BDP ping functions; the *_locked callbacks are bound to closures by
   init_transport_closures. */
static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
static void start_bdp_ping_locked(void* tp, grpc_error* error);
static void finish_bdp_ping_locked(void* tp, grpc_error* error);
static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);

static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
static void send_ping_locked(grpc_chttp2_transport* t,
                             grpc_closure* on_initiate,
                             grpc_closure* on_complete);
static void retry_initiate_ping_locked(void* tp, grpc_error* error);

/** keepalive-relevant functions */
static void init_keepalive_ping_locked(void* arg, grpc_error* error);
static void start_keepalive_ping_locked(void* arg, grpc_error* error);
static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);

/* Bound to s->reset_byte_stream by init_stream. */
static void reset_byte_stream(void* arg, grpc_error* error);

// Flow control default enabled. Can be disabled by setting
// GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
// init_transport reads this flag to choose between TransportFlowControl and
// TransportFlowControlDisabled; when it is false, BDP probing is also turned
// off for the transport.
bool g_flow_control_enabled = true;
152
153 /*******************************************************************************
154 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
155 */
156
destruct_transport(grpc_chttp2_transport * t)157 static void destruct_transport(grpc_chttp2_transport* t) {
158 size_t i;
159
160 grpc_endpoint_destroy(t->ep);
161
162 grpc_slice_buffer_destroy_internal(&t->qbuf);
163
164 grpc_slice_buffer_destroy_internal(&t->outbuf);
165 grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
166
167 grpc_slice_buffer_destroy_internal(&t->read_buffer);
168 grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
169 grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
170
171 for (i = 0; i < STREAM_LIST_COUNT; i++) {
172 GPR_ASSERT(t->lists[i].head == nullptr);
173 GPR_ASSERT(t->lists[i].tail == nullptr);
174 }
175
176 GRPC_ERROR_UNREF(t->goaway_error);
177
178 GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
179
180 grpc_chttp2_stream_map_destroy(&t->stream_map);
181 grpc_connectivity_state_destroy(&t->channel_callback.state_tracker);
182
183 GRPC_COMBINER_UNREF(t->combiner, "chttp2_transport");
184
185 cancel_pings(t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
186
187 while (t->write_cb_pool) {
188 grpc_chttp2_write_cb* next = t->write_cb_pool->next;
189 gpr_free(t->write_cb_pool);
190 t->write_cb_pool = next;
191 }
192
193 t->flow_control.Destroy();
194
195 GRPC_ERROR_UNREF(t->closed_with_error);
196 gpr_free(t->ping_acks);
197 gpr_free(t->peer_string);
198 gpr_free(t);
199 }
200
201 #ifndef NDEBUG
grpc_chttp2_unref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)202 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason,
203 const char* file, int line) {
204 if (grpc_trace_chttp2_refcount.enabled()) {
205 gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
206 gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
207 t, val, val - 1, reason, file, line);
208 }
209 if (!gpr_unref(&t->refs)) return;
210 destruct_transport(t);
211 }
212
grpc_chttp2_ref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)213 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason,
214 const char* file, int line) {
215 if (grpc_trace_chttp2_refcount.enabled()) {
216 gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
217 gpr_log(GPR_DEBUG, "chttp2: ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
218 t, val, val + 1, reason, file, line);
219 }
220 gpr_ref(&t->refs);
221 }
222 #else
grpc_chttp2_unref_transport(grpc_chttp2_transport * t)223 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
224 if (!gpr_unref(&t->refs)) return;
225 destruct_transport(t);
226 }
227
grpc_chttp2_ref_transport(grpc_chttp2_transport * t)228 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); }
229 #endif
230
231 static const grpc_transport_vtable* get_vtable(void);
232
233 /* Returns whether bdp is enabled */
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)234 static bool read_channel_args(grpc_chttp2_transport* t,
235 const grpc_channel_args* channel_args,
236 bool is_client) {
237 bool enable_bdp = true;
238 size_t i;
239 int j;
240
241 for (i = 0; i < channel_args->num_args; i++) {
242 if (0 == strcmp(channel_args->args[i].key,
243 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
244 const grpc_integer_options options = {-1, 0, INT_MAX};
245 const int value =
246 grpc_channel_arg_get_integer(&channel_args->args[i], options);
247 if (value >= 0) {
248 if ((t->next_stream_id & 1) != (value & 1)) {
249 gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
250 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
251 is_client ? "client" : "server");
252 } else {
253 t->next_stream_id = static_cast<uint32_t>(value);
254 }
255 }
256 } else if (0 == strcmp(channel_args->args[i].key,
257 GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
258 const grpc_integer_options options = {-1, 0, INT_MAX};
259 const int value =
260 grpc_channel_arg_get_integer(&channel_args->args[i], options);
261 if (value >= 0) {
262 grpc_chttp2_hpack_compressor_set_max_usable_size(
263 &t->hpack_compressor, static_cast<uint32_t>(value));
264 }
265 } else if (0 == strcmp(channel_args->args[i].key,
266 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
267 t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
268 &channel_args->args[i],
269 {g_default_max_pings_without_data, 0, INT_MAX});
270 } else if (0 == strcmp(channel_args->args[i].key,
271 GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
272 t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
273 &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
274 } else if (0 ==
275 strcmp(channel_args->args[i].key,
276 GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
277 t->ping_policy.min_sent_ping_interval_without_data =
278 grpc_channel_arg_get_integer(
279 &channel_args->args[i],
280 grpc_integer_options{
281 g_default_min_sent_ping_interval_without_data_ms, 0,
282 INT_MAX});
283 } else if (0 ==
284 strcmp(channel_args->args[i].key,
285 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
286 t->ping_policy.min_recv_ping_interval_without_data =
287 grpc_channel_arg_get_integer(
288 &channel_args->args[i],
289 grpc_integer_options{
290 g_default_min_recv_ping_interval_without_data_ms, 0,
291 INT_MAX});
292 } else if (0 == strcmp(channel_args->args[i].key,
293 GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
294 t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
295 &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
296 } else if (0 ==
297 strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
298 enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
299 } else if (0 ==
300 strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
301 const int value = grpc_channel_arg_get_integer(
302 &channel_args->args[i],
303 grpc_integer_options{t->is_client
304 ? g_default_client_keepalive_time_ms
305 : g_default_server_keepalive_time_ms,
306 1, INT_MAX});
307 t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
308 } else if (0 == strcmp(channel_args->args[i].key,
309 GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
310 const int value = grpc_channel_arg_get_integer(
311 &channel_args->args[i],
312 grpc_integer_options{t->is_client
313 ? g_default_client_keepalive_timeout_ms
314 : g_default_server_keepalive_timeout_ms,
315 0, INT_MAX});
316 t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
317 } else if (0 == strcmp(channel_args->args[i].key,
318 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
319 t->keepalive_permit_without_calls = static_cast<uint32_t>(
320 grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
321 } else if (0 == strcmp(channel_args->args[i].key,
322 GRPC_ARG_OPTIMIZATION_TARGET)) {
323 if (channel_args->args[i].type != GRPC_ARG_STRING) {
324 gpr_log(GPR_ERROR, "%s should be a string",
325 GRPC_ARG_OPTIMIZATION_TARGET);
326 } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
327 t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
328 } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
329 t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
330 } else if (0 ==
331 strcmp(channel_args->args[i].value.string, "throughput")) {
332 t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
333 } else {
334 gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
335 GRPC_ARG_OPTIMIZATION_TARGET,
336 channel_args->args[i].value.string);
337 }
338 } else {
339 static const struct {
340 const char* channel_arg_name;
341 grpc_chttp2_setting_id setting_id;
342 grpc_integer_options integer_options;
343 bool availability[2] /* server, client */;
344 } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
345 GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
346 {-1, 0, INT32_MAX},
347 {true, false}},
348 {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
349 GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
350 {-1, 0, INT32_MAX},
351 {true, true}},
352 {GRPC_ARG_MAX_METADATA_SIZE,
353 GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
354 {-1, 0, INT32_MAX},
355 {true, true}},
356 {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
357 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
358 {-1, 16384, 16777215},
359 {true, true}},
360 {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
361 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
362 {1, 0, 1},
363 {true, true}},
364 {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
365 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
366 {-1, 5, INT32_MAX},
367 {true, true}}};
368 for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
369 if (0 == strcmp(channel_args->args[i].key,
370 settings_map[j].channel_arg_name)) {
371 if (!settings_map[j].availability[is_client]) {
372 gpr_log(GPR_DEBUG, "%s is not available on %s",
373 settings_map[j].channel_arg_name,
374 is_client ? "clients" : "servers");
375 } else {
376 int value = grpc_channel_arg_get_integer(
377 &channel_args->args[i], settings_map[j].integer_options);
378 if (value >= 0) {
379 queue_setting_update(t, settings_map[j].setting_id,
380 static_cast<uint32_t>(value));
381 }
382 }
383 break;
384 }
385 }
386 }
387 }
388 return enable_bdp;
389 }
390
init_transport_closures(grpc_chttp2_transport * t)391 static void init_transport_closures(grpc_chttp2_transport* t) {
392 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
393 grpc_combiner_scheduler(t->combiner));
394 GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
395 grpc_combiner_scheduler(t->combiner));
396 GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
397 destructive_reclaimer_locked, t,
398 grpc_combiner_scheduler(t->combiner));
399 GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
400 t, grpc_combiner_scheduler(t->combiner));
401 GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
402 grpc_combiner_scheduler(t->combiner));
403 GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
404 grpc_combiner_scheduler(t->combiner));
405 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
406 next_bdp_ping_timer_expired_locked, t,
407 grpc_combiner_scheduler(t->combiner));
408 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
409 t, grpc_combiner_scheduler(t->combiner));
410 GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
411 start_keepalive_ping_locked, t,
412 grpc_combiner_scheduler(t->combiner));
413 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
414 finish_keepalive_ping_locked, t,
415 grpc_combiner_scheduler(t->combiner));
416 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
417 keepalive_watchdog_fired_locked, t,
418 grpc_combiner_scheduler(t->combiner));
419 }
420
init_transport_keepalive_settings(grpc_chttp2_transport * t)421 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
422 if (t->is_client) {
423 t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
424 ? GRPC_MILLIS_INF_FUTURE
425 : g_default_client_keepalive_time_ms;
426 t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
427 ? GRPC_MILLIS_INF_FUTURE
428 : g_default_client_keepalive_timeout_ms;
429 t->keepalive_permit_without_calls =
430 g_default_client_keepalive_permit_without_calls;
431 } else {
432 t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
433 ? GRPC_MILLIS_INF_FUTURE
434 : g_default_server_keepalive_time_ms;
435 t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
436 ? GRPC_MILLIS_INF_FUTURE
437 : g_default_server_keepalive_timeout_ms;
438 t->keepalive_permit_without_calls =
439 g_default_server_keepalive_permit_without_calls;
440 }
441 }
442
configure_transport_ping_policy(grpc_chttp2_transport * t)443 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
444 t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
445 t->ping_policy.min_sent_ping_interval_without_data =
446 g_default_min_sent_ping_interval_without_data_ms;
447 t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
448 t->ping_policy.min_recv_ping_interval_without_data =
449 g_default_min_recv_ping_interval_without_data_ms;
450 }
451
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)452 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
453 if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
454 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
455 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
456 grpc_timer_init(&t->keepalive_ping_timer,
457 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
458 &t->init_keepalive_ping_locked);
459 } else {
460 /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
461 inflight keeaplive timers */
462 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
463 }
464 }
465
init_transport(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client)466 static void init_transport(grpc_chttp2_transport* t,
467 const grpc_channel_args* channel_args,
468 grpc_endpoint* ep, bool is_client) {
469 GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
470 GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
471
472 t->base.vtable = get_vtable();
473 t->ep = ep;
474 /* one ref is for destroy */
475 gpr_ref_init(&t->refs, 1);
476 t->combiner = grpc_combiner_create();
477 t->peer_string = grpc_endpoint_get_peer(ep);
478 t->endpoint_reading = 1;
479 t->next_stream_id = is_client ? 1 : 2;
480 t->is_client = is_client;
481 t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
482 t->is_first_frame = true;
483 grpc_connectivity_state_init(
484 &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
485 is_client ? "client_transport" : "server_transport");
486
487 grpc_slice_buffer_init(&t->qbuf);
488 grpc_slice_buffer_init(&t->outbuf);
489 grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
490
491 init_transport_closures(t);
492
493 t->goaway_error = GRPC_ERROR_NONE;
494 grpc_chttp2_goaway_parser_init(&t->goaway_parser);
495 grpc_chttp2_hpack_parser_init(&t->hpack_parser);
496
497 grpc_slice_buffer_init(&t->read_buffer);
498
499 /* 8 is a random stab in the dark as to a good initial size: it's small enough
500 that it shouldn't waste memory for infrequently used connections, yet
501 large enough that the exponential growth should happen nicely when it's
502 needed.
503 TODO(ctiller): tune this */
504 grpc_chttp2_stream_map_init(&t->stream_map, 8);
505
506 /* copy in initial settings to all setting sets */
507 size_t i;
508 int j;
509 for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
510 for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
511 t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
512 }
513 }
514 t->dirtied_local_settings = 1;
515 /* Hack: it's common for implementations to assume 65536 bytes initial send
516 window -- this should by rights be 0 */
517 t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
518 t->sent_local_settings = 0;
519 t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
520
521 if (is_client) {
522 grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
523 GRPC_CHTTP2_CLIENT_CONNECT_STRING));
524 }
525
526 /* configure http2 the way we like it */
527 if (is_client) {
528 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
529 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
530 }
531 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
532 DEFAULT_MAX_HEADER_LIST_SIZE);
533 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
534 1);
535
536 configure_transport_ping_policy(t);
537 init_transport_keepalive_settings(t);
538
539 t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
540
541 bool enable_bdp = true;
542 if (channel_args) {
543 enable_bdp = read_channel_args(t, channel_args, is_client);
544 }
545
546 if (g_flow_control_enabled) {
547 t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(t,
548 enable_bdp);
549 } else {
550 t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(t);
551 enable_bdp = false;
552 }
553
554 /* No pings allowed before receiving a header or data frame. */
555 t->ping_state.pings_before_data_required = 0;
556 t->ping_state.is_delayed_ping_timer_set = false;
557 t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
558
559 t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
560 t->ping_recv_state.ping_strikes = 0;
561
562 init_keepalive_pings_if_enabled(t);
563
564 if (enable_bdp) {
565 GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
566 schedule_bdp_ping_locked(t);
567 grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
568 nullptr);
569 }
570
571 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
572 post_benign_reclaimer(t);
573 }
574
destroy_transport_locked(void * tp,grpc_error * error)575 static void destroy_transport_locked(void* tp, grpc_error* error) {
576 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
577 t->destroying = 1;
578 close_transport_locked(
579 t, grpc_error_set_int(
580 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
581 GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
582 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
583 }
584
destroy_transport(grpc_transport * gt)585 static void destroy_transport(grpc_transport* gt) {
586 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
587 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
588 grpc_combiner_scheduler(t->combiner)),
589 GRPC_ERROR_NONE);
590 }
591
close_transport_locked(grpc_chttp2_transport * t,grpc_error * error)592 static void close_transport_locked(grpc_chttp2_transport* t,
593 grpc_error* error) {
594 end_all_the_calls(t, GRPC_ERROR_REF(error));
595 cancel_pings(t, GRPC_ERROR_REF(error));
596 if (t->closed_with_error == GRPC_ERROR_NONE) {
597 if (!grpc_error_has_clear_grpc_status(error)) {
598 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
599 GRPC_STATUS_UNAVAILABLE);
600 }
601 if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
602 if (t->close_transport_on_writes_finished == nullptr) {
603 t->close_transport_on_writes_finished =
604 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
605 "Delayed close due to in-progress write");
606 }
607 t->close_transport_on_writes_finished =
608 grpc_error_add_child(t->close_transport_on_writes_finished, error);
609 return;
610 }
611 GPR_ASSERT(error != GRPC_ERROR_NONE);
612 t->closed_with_error = GRPC_ERROR_REF(error);
613 connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
614 "close_transport");
615 if (t->ping_state.is_delayed_ping_timer_set) {
616 grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
617 }
618 if (t->have_next_bdp_ping_timer) {
619 grpc_timer_cancel(&t->next_bdp_ping_timer);
620 }
621 switch (t->keepalive_state) {
622 case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
623 grpc_timer_cancel(&t->keepalive_ping_timer);
624 break;
625 case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
626 grpc_timer_cancel(&t->keepalive_ping_timer);
627 grpc_timer_cancel(&t->keepalive_watchdog_timer);
628 break;
629 case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
630 case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
631 /* keepalive timers are not set in these two states */
632 break;
633 }
634
635 /* flush writable stream list to avoid dangling references */
636 grpc_chttp2_stream* s;
637 while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
638 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
639 }
640 GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
641 grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
642 }
643 if (t->notify_on_receive_settings != nullptr) {
644 GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
645 t->notify_on_receive_settings = nullptr;
646 }
647 GRPC_ERROR_UNREF(error);
648 }
649
#ifndef NDEBUG
/* Debug build: takes a reference on stream s by forwarding to grpc_stream_ref
   on s->refcount, passing reason along. */
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
  grpc_stream_ref(s->refcount, reason);
}
/* Debug build: releases a reference on stream s by forwarding to
   grpc_stream_unref on s->refcount, passing reason along. */
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
  grpc_stream_unref(s->refcount, reason);
}
#else
/* Release build: takes a reference on stream s via s->refcount. */
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
  grpc_stream_ref(s->refcount);
}
/* Release build: releases a reference on stream s via s->refcount. */
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
  grpc_stream_unref(s->refcount);
}
#endif
665
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,gpr_arena * arena)666 static int init_stream(grpc_transport* gt, grpc_stream* gs,
667 grpc_stream_refcount* refcount, const void* server_data,
668 gpr_arena* arena) {
669 GPR_TIMER_SCOPE("init_stream", 0);
670 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
671 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
672
673 s->t = t;
674 s->refcount = refcount;
675 /* We reserve one 'active stream' that's dropped when the stream is
676 read-closed. The others are for Chttp2IncomingByteStreams that are
677 actively reading */
678 GRPC_CHTTP2_STREAM_REF(s, "chttp2");
679
680 grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
681 grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
682 grpc_chttp2_data_parser_init(&s->data_parser);
683 grpc_slice_buffer_init(&s->flow_controlled_buffer);
684 s->deadline = GRPC_MILLIS_INF_FUTURE;
685 GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s,
686 grpc_schedule_on_exec_ctx);
687 grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
688 s->unprocessed_incoming_frames_buffer_cached_length = 0;
689 grpc_slice_buffer_init(&s->frame_storage);
690 grpc_slice_buffer_init(&s->compressed_data_buffer);
691 grpc_slice_buffer_init(&s->decompressed_data_buffer);
692 s->pending_byte_stream = false;
693 s->decompressed_header_bytes = 0;
694 GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s,
695 grpc_combiner_scheduler(t->combiner));
696
697 GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
698
699 if (server_data) {
700 s->id = static_cast<uint32_t>((uintptr_t)server_data);
701 *t->accepting_stream = s;
702 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
703 post_destructive_reclaimer(t);
704 }
705
706 if (t->flow_control->flow_control_enabled()) {
707 s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
708 static_cast<grpc_core::chttp2::TransportFlowControl*>(
709 t->flow_control.get()),
710 s);
711 } else {
712 s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
713 }
714
715 return 0;
716 }
717
destroy_stream_locked(void * sp,grpc_error * error)718 static void destroy_stream_locked(void* sp, grpc_error* error) {
719 GPR_TIMER_SCOPE("destroy_stream", 0);
720 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
721 grpc_chttp2_transport* t = s->t;
722
723 GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0);
724 if (s->id != 0) {
725 GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr);
726 }
727
728 grpc_slice_buffer_destroy_internal(&s->unprocessed_incoming_frames_buffer);
729 grpc_slice_buffer_destroy_internal(&s->frame_storage);
730 grpc_slice_buffer_destroy_internal(&s->compressed_data_buffer);
731 grpc_slice_buffer_destroy_internal(&s->decompressed_data_buffer);
732
733 grpc_chttp2_list_remove_stalled_by_transport(t, s);
734 grpc_chttp2_list_remove_stalled_by_stream(t, s);
735
736 for (int i = 0; i < STREAM_LIST_COUNT; i++) {
737 if (GPR_UNLIKELY(s->included[i])) {
738 gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
739 t->is_client ? "client" : "server", s->id, i);
740 abort();
741 }
742 }
743
744 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
745 GPR_ASSERT(s->fetching_send_message == nullptr);
746 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
747 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
748 GPR_ASSERT(s->recv_message_ready == nullptr);
749 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
750 grpc_chttp2_data_parser_destroy(&s->data_parser);
751 grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]);
752 grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]);
753 grpc_slice_buffer_destroy_internal(&s->flow_controlled_buffer);
754 GRPC_ERROR_UNREF(s->read_closed_error);
755 GRPC_ERROR_UNREF(s->write_closed_error);
756 GRPC_ERROR_UNREF(s->byte_stream_error);
757
758 s->flow_control.Destroy();
759
760 GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
761
762 GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE);
763 }
764
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)765 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
766 grpc_closure* then_schedule_closure) {
767 GPR_TIMER_SCOPE("destroy_stream", 0);
768 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
769 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
770
771 if (s->stream_compression_ctx != nullptr) {
772 grpc_stream_compression_context_destroy(s->stream_compression_ctx);
773 s->stream_compression_ctx = nullptr;
774 }
775 if (s->stream_decompression_ctx != nullptr) {
776 grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
777 s->stream_decompression_ctx = nullptr;
778 }
779
780 s->destroy_stream_arg = then_schedule_closure;
781 GRPC_CLOSURE_SCHED(
782 GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
783 grpc_combiner_scheduler(t->combiner)),
784 GRPC_ERROR_NONE);
785 }
786
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)787 grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t,
788 uint32_t id) {
789 return static_cast<grpc_chttp2_stream*>(
790 grpc_chttp2_stream_map_find(&t->stream_map, id));
791 }
792
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)793 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
794 uint32_t id) {
795 if (t->channel_callback.accept_stream == nullptr) {
796 return nullptr;
797 }
798 grpc_chttp2_stream* accepting;
799 GPR_ASSERT(t->accepting_stream == nullptr);
800 t->accepting_stream = &accepting;
801 t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
802 &t->base,
803 (void*)static_cast<uintptr_t>(id));
804 t->accepting_stream = nullptr;
805 return accepting;
806 }
807
808 /*******************************************************************************
809 * OUTPUT PROCESSING
810 */
811
write_state_name(grpc_chttp2_write_state st)812 static const char* write_state_name(grpc_chttp2_write_state st) {
813 switch (st) {
814 case GRPC_CHTTP2_WRITE_STATE_IDLE:
815 return "IDLE";
816 case GRPC_CHTTP2_WRITE_STATE_WRITING:
817 return "WRITING";
818 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
819 return "WRITING+MORE";
820 }
821 GPR_UNREACHABLE_CODE(return "UNKNOWN");
822 }
823
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)824 static void set_write_state(grpc_chttp2_transport* t,
825 grpc_chttp2_write_state st, const char* reason) {
826 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "W:%p %s state %s -> %s [%s]", t,
827 t->is_client ? "CLIENT" : "SERVER",
828 write_state_name(t->write_state),
829 write_state_name(st), reason));
830 t->write_state = st;
831 /* If the state is being reset back to idle, it means a write was just
832 * finished. Make sure all the run_after_write closures are scheduled.
833 *
834 * This is also our chance to close the transport if the transport was marked
835 * to be closed after all writes finish (for example, if we received a go-away
836 * from peer while we had some pending writes) */
837 if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
838 GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
839 if (t->close_transport_on_writes_finished != nullptr) {
840 grpc_error* err = t->close_transport_on_writes_finished;
841 t->close_transport_on_writes_finished = nullptr;
842 close_transport_locked(t, err);
843 }
844 }
845 }
846
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)847 static void inc_initiate_write_reason(
848 grpc_chttp2_initiate_write_reason reason) {
849 switch (reason) {
850 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
851 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
852 break;
853 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
854 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
855 break;
856 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
857 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
858 break;
859 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
860 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
861 break;
862 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
863 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
864 break;
865 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
866 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
867 break;
868 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
869 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
870 break;
871 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
872 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
873 break;
874 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
875 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
876 break;
877 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
878 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
879 break;
880 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
881 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
882 break;
883 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
884 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
885 break;
886 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
887 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
888 break;
889 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
890 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
891 break;
892 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
893 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
894 break;
895 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
896 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
897 break;
898 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
899 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
900 break;
901 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
902 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
903 break;
904 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
905 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
906 break;
907 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
908 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
909 break;
910 }
911 }
912
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)913 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
914 grpc_chttp2_initiate_write_reason reason) {
915 GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
916
917 switch (t->write_state) {
918 case GRPC_CHTTP2_WRITE_STATE_IDLE:
919 inc_initiate_write_reason(reason);
920 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
921 grpc_chttp2_initiate_write_reason_string(reason));
922 t->is_first_write_in_batch = true;
923 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
924 /* Note that the 'write_action_begin_locked' closure is being scheduled
925 * on the 'finally_scheduler' of t->combiner. This means that
926 * 'write_action_begin_locked' is called only *after* all the other
927 * closures (some of which are potentially initiating more writes on the
928 * transport) are executed on the t->combiner.
929 *
930 * The reason for scheduling on finally_scheduler is to make sure we batch
931 * as many writes as possible. 'write_action_begin_locked' is the function
932 * that gathers all the relevant bytes (which are at various places in the
933 * grpc_chttp2_transport structure) and append them to 'outbuf' field in
934 * grpc_chttp2_transport thereby batching what would have been potentially
935 * multiple write operations.
936 *
937 * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
938 * It does not call the endpoint to write the bytes. That is done by the
939 * 'write_action' (which is scheduled by 'write_action_begin_locked') */
940 GRPC_CLOSURE_SCHED(
941 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
942 write_action_begin_locked, t,
943 grpc_combiner_finally_scheduler(t->combiner)),
944 GRPC_ERROR_NONE);
945 break;
946 case GRPC_CHTTP2_WRITE_STATE_WRITING:
947 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
948 grpc_chttp2_initiate_write_reason_string(reason));
949 break;
950 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
951 break;
952 }
953 }
954
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)955 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
956 grpc_chttp2_stream* s) {
957 if (t->closed_with_error == GRPC_ERROR_NONE &&
958 grpc_chttp2_list_add_writable_stream(t, s)) {
959 GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
960 }
961 }
962
write_scheduler(grpc_chttp2_transport * t,bool early_results_scheduled,bool partial_write)963 static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
964 bool early_results_scheduled,
965 bool partial_write) {
966 /* if it's not the first write in a batch, always offload to the executor:
967 we'll probably end up queuing against the kernel anyway, so we'll likely
968 get better latency overall if we switch writing work elsewhere and continue
969 with application work above */
970 if (!t->is_first_write_in_batch) {
971 return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
972 }
973 /* equivalently, if it's a partial write, we *know* we're going to be taking a
974 thread jump to write it because of the above, may as well do so
975 immediately */
976 if (partial_write) {
977 return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
978 }
979 switch (t->opt_target) {
980 case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
981 /* executor gives us the largest probability of being able to batch a
982 * write with others on this transport */
983 return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
984 case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
985 return grpc_schedule_on_exec_ctx;
986 }
987 GPR_UNREACHABLE_CODE(return nullptr);
988 }
989
990 #define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
begin_writing_desc(bool partial,bool inlined)991 static const char* begin_writing_desc(bool partial, bool inlined) {
992 switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
993 case WRITE_STATE_TUPLE_TO_INT(false, false):
994 return "begin write in background";
995 case WRITE_STATE_TUPLE_TO_INT(false, true):
996 return "begin write in current thread";
997 case WRITE_STATE_TUPLE_TO_INT(true, false):
998 return "begin partial write in background";
999 case WRITE_STATE_TUPLE_TO_INT(true, true):
1000 return "begin partial write in current thread";
1001 }
1002 GPR_UNREACHABLE_CODE(return "bad state tuple");
1003 }
1004
write_action_begin_locked(void * gt,grpc_error * error_ignored)1005 static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
1006 GPR_TIMER_SCOPE("write_action_begin_locked", 0);
1007 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1008 GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
1009 grpc_chttp2_begin_write_result r;
1010 if (t->closed_with_error != GRPC_ERROR_NONE) {
1011 r.writing = false;
1012 } else {
1013 r = grpc_chttp2_begin_write(t);
1014 }
1015 if (r.writing) {
1016 if (r.partial) {
1017 GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
1018 }
1019 if (!t->is_first_write_in_batch) {
1020 GRPC_STATS_INC_HTTP2_WRITES_CONTINUED();
1021 }
1022 grpc_closure_scheduler* scheduler =
1023 write_scheduler(t, r.early_results_scheduled, r.partial);
1024 if (scheduler != grpc_schedule_on_exec_ctx) {
1025 GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED();
1026 }
1027 set_write_state(
1028 t,
1029 r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1030 : GRPC_CHTTP2_WRITE_STATE_WRITING,
1031 begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
1032 GRPC_CLOSURE_SCHED(
1033 GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
1034 GRPC_ERROR_NONE);
1035 } else {
1036 GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
1037 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
1038 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1039 }
1040 }
1041
write_action(void * gt,grpc_error * error)1042 static void write_action(void* gt, grpc_error* error) {
1043 GPR_TIMER_SCOPE("write_action", 0);
1044 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1045 grpc_endpoint_write(
1046 t->ep, &t->outbuf,
1047 GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
1048 grpc_combiner_scheduler(t->combiner)),
1049 nullptr);
1050 }
1051
/* Callback from the grpc_endpoint after bytes have been written by calling
 * sendmsg.
 *
 * tp: the grpc_chttp2_transport whose write just completed.
 * error: result of the endpoint write; this function passes its own refs of
 *   it onward (via GRPC_ERROR_REF) rather than handing over the argument.
 *
 * Steps, in order:
 *   1. a failed write closes the transport;
 *   2. a scheduled GOAWAY is now considered sent, and the transport is closed
 *      if no streams remain in the stream map;
 *   3. the write state machine advances (WRITING -> IDLE, or
 *      WRITING_WITH_MORE -> WRITING plus another write pass);
 *   4. grpc_chttp2_end_write is told about the outcome;
 *   5. the "writing" transport ref is released. */
static void write_action_end_locked(void* tp, grpc_error* error) {
  /* NOTE(review): the timer label below does not match the function name;
   * it looks like a leftover from an earlier name. */
  GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
  grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);

  if (error != GRPC_ERROR_NONE) {
    close_transport_locked(t, GRPC_ERROR_REF(error));
  }

  if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
    t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
    if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
      close_transport_locked(
          t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
    }
  }

  switch (t->write_state) {
    case GRPC_CHTTP2_WRITE_STATE_IDLE:
      /* A write cannot complete while the state machine is idle. */
      GPR_UNREACHABLE_CODE(break);
    case GRPC_CHTTP2_WRITE_STATE_WRITING:
      GPR_TIMER_MARK("state=writing", 0);
      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
      break;
    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
      /* More data was queued while this write was in flight: start another
       * pass. The extra "writing" ref taken here belongs to that new pass;
       * the unref at the end of this function releases the ref of the pass
       * that just finished. */
      GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
      t->is_first_write_in_batch = false;
      GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
      GRPC_CLOSURE_RUN(
          GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
                            write_action_begin_locked, t,
                            grpc_combiner_finally_scheduler(t->combiner)),
          GRPC_ERROR_NONE);
      break;
  }

  grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));

  GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
}
1094
1095 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1096 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1097 static void queue_setting_update(grpc_chttp2_transport* t,
1098 grpc_chttp2_setting_id id, uint32_t value) {
1099 const grpc_chttp2_setting_parameters* sp =
1100 &grpc_chttp2_settings_parameters[id];
1101 uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1102 if (use_value != value) {
1103 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1104 value, use_value);
1105 }
1106 if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1107 t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1108 t->dirtied_local_settings = 1;
1109 }
1110 }
1111
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,grpc_slice goaway_text)1112 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1113 uint32_t goaway_error,
1114 grpc_slice goaway_text) {
1115 // GRPC_CHTTP2_IF_TRACING(
1116 // gpr_log(GPR_INFO, "got goaway [%d]: %s", goaway_error, msg));
1117
1118 // Discard the error from a previous goaway frame (if any)
1119 if (t->goaway_error != GRPC_ERROR_NONE) {
1120 GRPC_ERROR_UNREF(t->goaway_error);
1121 }
1122 t->goaway_error = grpc_error_set_str(
1123 grpc_error_set_int(
1124 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1125 GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1126 GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1127
1128 /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1129 * data equal to "too_many_pings", it should log the occurrence at a log level
1130 * that is enabled by default and double the configured KEEPALIVE_TIME used
1131 * for new connections on that channel. */
1132 if (GPR_UNLIKELY(t->is_client &&
1133 goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1134 grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1135 gpr_log(GPR_ERROR,
1136 "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1137 "data equal to \"too_many_pings\"");
1138 double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1139 t->keepalive_time =
1140 current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
1141 ? GRPC_MILLIS_INF_FUTURE
1142 : static_cast<grpc_millis>(current_keepalive_time_ms *
1143 KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1144 }
1145
1146 /* lie: use transient failure from the transport to indicate goaway has been
1147 * received */
1148 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1149 GRPC_ERROR_REF(t->goaway_error), "got_goaway");
1150 }
1151
/* Starts as many streams from the waiting-for-concurrency list as stream ids
 * and the peer's MAX_CONCURRENT_STREAMS setting allow, then cancels any
 * waiting streams that can never start because stream ids have run out.
 *
 * Each started stream is assigned the next stream id (ids advance by 2),
 * registered in the stream map, marked writable, and a write is initiated for
 * it. */
static void maybe_start_some_streams(grpc_chttp2_transport* t) {
  grpc_chttp2_stream* s;
  /* start streams where we have free grpc_chttp2_stream ids and free
   * concurrency */
  /* The pop is the last operand of the && chain, so a stream is only removed
   * from the waiting list once the id and concurrency checks have passed. */
  while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
         grpc_chttp2_stream_map_size(&t->stream_map) <
             t->settings[GRPC_PEER_SETTINGS]
                        [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
         grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
    /* safe since we can't (legally) be parsing this stream yet */
    GRPC_CHTTP2_IF_TRACING(gpr_log(
        GPR_INFO, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
        t->is_client ? "CLI" : "SVR", s, t->next_stream_id));

    GPR_ASSERT(s->id == 0);
    s->id = t->next_stream_id;
    t->next_stream_id += 2;

    /* Report id exhaustion as soon as the id just handed out was the last
     * usable one. */
    if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
      connectivity_state_set(
          t, GRPC_CHANNEL_TRANSIENT_FAILURE,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
          "no_more_stream_ids");
    }

    grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
    post_destructive_reclaimer(t);
    grpc_chttp2_mark_stream_writable(t, s);
    grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
  }
  /* cancel out streams that will never be started */
  while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
         grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
    grpc_chttp2_cancel_stream(
        t, s,
        grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
  }
}
1192
1193 /* Flag that this closure barrier may be covering a write in a pollset, and so
1194 we should not complete this closure until we can prove that the write got
1195 scheduled */
1196 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
1197 /* First bit of the reference count, stored in the high order bits (with the low
1198 bits being used for flags defined above) */
1199 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1200
add_closure_barrier(grpc_closure * closure)1201 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1202 closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1203 return closure;
1204 }
1205
null_then_run_closure(grpc_closure ** closure,grpc_error * error)1206 static void null_then_run_closure(grpc_closure** closure, grpc_error* error) {
1207 grpc_closure* c = *closure;
1208 *closure = nullptr;
1209 GRPC_CLOSURE_RUN(c, error);
1210 }
1211
/* Completes one step of a barrier closure.
 *
 * t:        transport; supplies the peer string and the current write state.
 * s:        stream the step belongs to (not consulted here).
 * pclosure: slot holding the barrier closure. The slot is always cleared. If
 *           it was already empty, `error` is released and nothing else
 *           happens.
 * error:    outcome of this step. A non-OK error is added as a child of the
 *           closure's accumulated error.
 * desc:     label for trace logging.
 *
 * The closure's scratch word holds a reference count in units of
 * CLOSURE_BARRIER_FIRST_REF_BIT, with flag bits below that. This call drops
 * one reference. Once no references remain the closure is either run
 * immediately or, if it may cover a write and a write is still in progress,
 * appended to t->run_after_write. */
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
                                       grpc_chttp2_stream* s,
                                       grpc_closure** pclosure,
                                       grpc_error* error, const char* desc) {
  grpc_closure* closure = *pclosure;
  *pclosure = nullptr;
  if (closure == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  /* Drop the reference held for this step. */
  closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
  if (grpc_http_trace.enabled()) {
    const char* errstr = grpc_error_string(error);
    /* refs = remaining reference units, flags = low-order flag bits. */
    gpr_log(
        GPR_INFO,
        "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
        "write_state=%s",
        t, closure,
        static_cast<int>(closure->next_data.scratch /
                         CLOSURE_BARRIER_FIRST_REF_BIT),
        static_cast<int>(closure->next_data.scratch %
                         CLOSURE_BARRIER_FIRST_REF_BIT),
        desc, errstr, write_state_name(t->write_state));
  }
  if (error != GRPC_ERROR_NONE) {
    /* The first failure creates the umbrella error (tagged with the peer
     * address); every failure is then attached to it as a child. */
    if (closure->error_data.error == GRPC_ERROR_NONE) {
      closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "Error in HTTP transport completing operation");
      closure->error_data.error = grpc_error_set_str(
          closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
          grpc_slice_from_copied_string(t->peer_string));
    }
    closure->error_data.error =
        grpc_error_add_child(closure->error_data.error, error);
  }
  /* Below one reference unit means the count is zero; only flag bits remain. */
  if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
    if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
        !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
      GRPC_CLOSURE_RUN(closure, closure->error_data.error);
    } else {
      /* Deferred until the write state returns to IDLE. */
      grpc_closure_list_append(&t->run_after_write, closure,
                               closure->error_data.error);
    }
  }
}
1257
contains_non_ok_status(grpc_metadata_batch * batch)1258 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1259 if (batch->idx.named.grpc_status != nullptr) {
1260 return !grpc_mdelem_eq(batch->idx.named.grpc_status->md,
1261 GRPC_MDELEM_GRPC_STATUS_0);
1262 }
1263 return false;
1264 }
1265
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1266 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1267 grpc_chttp2_stream* s) {
1268 if (s->id != 0 && (!s->write_buffering ||
1269 s->flow_controlled_buffer.length > t->write_buffer_size)) {
1270 grpc_chttp2_mark_stream_writable(t, s);
1271 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1272 }
1273 }
1274
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1275 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1276 grpc_chttp2_stream* s) {
1277 s->fetched_send_message_length +=
1278 static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1279 grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1280 maybe_become_writable_due_to_send_msg(t, s);
1281 }
1282
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1283 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1284 grpc_chttp2_stream* s) {
1285 for (;;) {
1286 if (s->fetching_send_message == nullptr) {
1287 /* Stream was cancelled before message fetch completed */
1288 abort(); /* TODO(ctiller): what cleanup here? */
1289 return; /* early out */
1290 }
1291 if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1292 int64_t notify_offset = s->next_message_end_offset;
1293 if (notify_offset <= s->flow_controlled_bytes_written) {
1294 grpc_chttp2_complete_closure_step(
1295 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1296 "fetching_send_message_finished");
1297 } else {
1298 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1299 if (cb == nullptr) {
1300 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1301 } else {
1302 t->write_cb_pool = cb->next;
1303 }
1304 cb->call_at_byte = notify_offset;
1305 cb->closure = s->fetching_send_message_finished;
1306 s->fetching_send_message_finished = nullptr;
1307 grpc_chttp2_write_cb** list =
1308 s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1309 ? &s->on_write_finished_cbs
1310 : &s->on_flow_controlled_cbs;
1311 cb->next = *list;
1312 *list = cb;
1313 }
1314 s->fetching_send_message.reset();
1315 return; /* early out */
1316 } else if (s->fetching_send_message->Next(UINT32_MAX,
1317 &s->complete_fetch_locked)) {
1318 grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1319 if (error != GRPC_ERROR_NONE) {
1320 s->fetching_send_message.reset();
1321 grpc_chttp2_cancel_stream(t, s, error);
1322 } else {
1323 add_fetched_slice_locked(t, s);
1324 }
1325 }
1326 }
1327 }
1328
complete_fetch_locked(void * gs,grpc_error * error)1329 static void complete_fetch_locked(void* gs, grpc_error* error) {
1330 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1331 grpc_chttp2_transport* t = s->t;
1332 if (error == GRPC_ERROR_NONE) {
1333 error = s->fetching_send_message->Pull(&s->fetching_slice);
1334 if (error == GRPC_ERROR_NONE) {
1335 add_fetched_slice_locked(t, s);
1336 continue_fetching_send_locked(t, s);
1337 }
1338 }
1339 if (error != GRPC_ERROR_NONE) {
1340 s->fetching_send_message.reset();
1341 grpc_chttp2_cancel_stream(t, s, error);
1342 }
1343 }
1344
do_nothing(void * arg,grpc_error * error)1345 static void do_nothing(void* arg, grpc_error* error) {}
1346
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1347 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1348 bool is_client, bool is_initial) {
1349 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1350 md = md->next) {
1351 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1352 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1353 gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1354 is_client ? "CLI" : "SVR", key, value);
1355 gpr_free(key);
1356 gpr_free(value);
1357 }
1358 }
1359
perform_stream_op_locked(void * stream_op,grpc_error * error_ignored)1360 static void perform_stream_op_locked(void* stream_op,
1361 grpc_error* error_ignored) {
1362 GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1363
1364 grpc_transport_stream_op_batch* op =
1365 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1366 grpc_chttp2_stream* s =
1367 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1368 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1369 grpc_chttp2_transport* t = s->t;
1370
1371 GRPC_STATS_INC_HTTP2_OP_BATCHES();
1372
1373 if (grpc_http_trace.enabled()) {
1374 char* str = grpc_transport_stream_op_batch_string(op);
1375 gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
1376 op->on_complete);
1377 gpr_free(str);
1378 if (op->send_initial_metadata) {
1379 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1380 s->id, t->is_client, true);
1381 }
1382 if (op->send_trailing_metadata) {
1383 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1384 s->id, t->is_client, false);
1385 }
1386 }
1387
1388 grpc_closure* on_complete = op->on_complete;
1389 // TODO(roth): This is a hack needed because we use data inside of the
1390 // closure itself to do the barrier calculation (i.e., to ensure that
1391 // we don't schedule the closure until all ops in the batch have been
1392 // completed). This can go away once we move to a new C++ closure API
1393 // that provides the ability to create a barrier closure.
1394 if (on_complete == nullptr) {
1395 on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
1396 nullptr, grpc_schedule_on_exec_ctx);
1397 }
1398
  /* use next_data.scratch as a barrier until enqueue time; the initial counter
     is dropped at the end of this function */
1401 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1402 on_complete->error_data.error = GRPC_ERROR_NONE;
1403
1404 if (op->cancel_stream) {
1405 GRPC_STATS_INC_HTTP2_OP_CANCEL();
1406 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1407 }
1408
1409 if (op->send_initial_metadata) {
1410 GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1411 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1412 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1413
1414 /* Identify stream compression */
1415 if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1416 .content_encoding == nullptr ||
1417 grpc_stream_compression_method_parse(
1418 GRPC_MDVALUE(
1419 op_payload->send_initial_metadata.send_initial_metadata->idx
1420 .named.content_encoding->md),
1421 true, &s->stream_compression_method) == 0) {
1422 s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1423 }
1424
1425 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1426 s->send_initial_metadata =
1427 op_payload->send_initial_metadata.send_initial_metadata;
1428 const size_t metadata_size =
1429 grpc_metadata_batch_size(s->send_initial_metadata);
1430 const size_t metadata_peer_limit =
1431 t->settings[GRPC_PEER_SETTINGS]
1432 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1433 if (t->is_client) {
1434 s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1435 }
1436 if (metadata_size > metadata_peer_limit) {
1437 grpc_chttp2_cancel_stream(
1438 t, s,
1439 grpc_error_set_int(
1440 grpc_error_set_int(
1441 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1442 "to-be-sent initial metadata size "
1443 "exceeds peer limit"),
1444 GRPC_ERROR_INT_SIZE,
1445 static_cast<intptr_t>(metadata_size)),
1446 GRPC_ERROR_INT_LIMIT,
1447 static_cast<intptr_t>(metadata_peer_limit)),
1448 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1449 } else {
1450 if (contains_non_ok_status(s->send_initial_metadata)) {
1451 s->seen_error = true;
1452 }
1453 if (!s->write_closed) {
1454 if (t->is_client) {
1455 if (t->closed_with_error == GRPC_ERROR_NONE) {
1456 GPR_ASSERT(s->id == 0);
1457 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1458 maybe_start_some_streams(t);
1459 } else {
1460 grpc_chttp2_cancel_stream(
1461 t, s,
1462 grpc_error_set_int(
1463 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1464 "Transport closed", &t->closed_with_error, 1),
1465 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1466 }
1467 } else {
1468 GPR_ASSERT(s->id != 0);
1469 grpc_chttp2_mark_stream_writable(t, s);
1470 if (!(op->send_message &&
1471 (op->payload->send_message.send_message->flags() &
1472 GRPC_WRITE_BUFFER_HINT))) {
1473 grpc_chttp2_initiate_write(
1474 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1475 }
1476 }
1477 } else {
1478 s->send_initial_metadata = nullptr;
1479 grpc_chttp2_complete_closure_step(
1480 t, s, &s->send_initial_metadata_finished,
1481 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1482 "Attempt to send initial metadata after stream was closed",
1483 &s->write_closed_error, 1),
1484 "send_initial_metadata_finished");
1485 }
1486 }
1487 if (op_payload->send_initial_metadata.peer_string != nullptr) {
1488 gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1489 (gpr_atm)t->peer_string);
1490 }
1491 }
1492
1493 if (op->send_message) {
1494 GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1495 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1496 op->payload->send_message.send_message->length());
1497 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1498 s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1499 if (s->write_closed) {
1500 // Return an error unless the client has already received trailing
1501 // metadata from the server, since an application using a
1502 // streaming call might send another message before getting a
1503 // recv_message failure, breaking out of its loop, and then
1504 // starting recv_trailing_metadata.
1505 op->payload->send_message.send_message.reset();
1506 grpc_chttp2_complete_closure_step(
1507 t, s, &s->fetching_send_message_finished,
1508 t->is_client && s->received_trailing_metadata
1509 ? GRPC_ERROR_NONE
1510 : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1511 "Attempt to send message after stream was closed",
1512 &s->write_closed_error, 1),
1513 "fetching_send_message_finished");
1514 } else {
1515 GPR_ASSERT(s->fetching_send_message == nullptr);
1516 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1517 &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1518 uint32_t flags = op_payload->send_message.send_message->flags();
1519 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1520 size_t len = op_payload->send_message.send_message->length();
1521 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1522 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1523 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1524 frame_hdr[4] = static_cast<uint8_t>(len);
1525 s->fetching_send_message =
1526 std::move(op_payload->send_message.send_message);
1527 s->fetched_send_message_length = 0;
1528 s->next_message_end_offset =
1529 s->flow_controlled_bytes_written +
1530 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1531 static_cast<int64_t>(len);
1532 if (flags & GRPC_WRITE_BUFFER_HINT) {
1533 s->next_message_end_offset -= t->write_buffer_size;
1534 s->write_buffering = true;
1535 } else {
1536 s->write_buffering = false;
1537 }
1538 continue_fetching_send_locked(t, s);
1539 maybe_become_writable_due_to_send_msg(t, s);
1540 }
1541 }
1542
1543 if (op->send_trailing_metadata) {
1544 GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1545 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1546 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1547 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1548 s->send_trailing_metadata =
1549 op_payload->send_trailing_metadata.send_trailing_metadata;
1550 s->write_buffering = false;
1551 const size_t metadata_size =
1552 grpc_metadata_batch_size(s->send_trailing_metadata);
1553 const size_t metadata_peer_limit =
1554 t->settings[GRPC_PEER_SETTINGS]
1555 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1556 if (metadata_size > metadata_peer_limit) {
1557 grpc_chttp2_cancel_stream(
1558 t, s,
1559 grpc_error_set_int(
1560 grpc_error_set_int(
1561 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1562 "to-be-sent trailing metadata size "
1563 "exceeds peer limit"),
1564 GRPC_ERROR_INT_SIZE,
1565 static_cast<intptr_t>(metadata_size)),
1566 GRPC_ERROR_INT_LIMIT,
1567 static_cast<intptr_t>(metadata_peer_limit)),
1568 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1569 } else {
1570 if (contains_non_ok_status(s->send_trailing_metadata)) {
1571 s->seen_error = true;
1572 }
1573 if (s->write_closed) {
1574 s->send_trailing_metadata = nullptr;
1575 grpc_chttp2_complete_closure_step(
1576 t, s, &s->send_trailing_metadata_finished,
1577 grpc_metadata_batch_is_empty(
1578 op->payload->send_trailing_metadata.send_trailing_metadata)
1579 ? GRPC_ERROR_NONE
1580 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1581 "Attempt to send trailing metadata after "
1582 "stream was closed"),
1583 "send_trailing_metadata_finished");
1584 } else if (s->id != 0) {
1585 /* TODO(ctiller): check if there's flow control for any outstanding
1586 bytes before going writable */
1587 grpc_chttp2_mark_stream_writable(t, s);
1588 grpc_chttp2_initiate_write(
1589 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1590 }
1591 }
1592 }
1593
1594 if (op->recv_initial_metadata) {
1595 GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1596 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1597 s->recv_initial_metadata_ready =
1598 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1599 s->recv_initial_metadata =
1600 op_payload->recv_initial_metadata.recv_initial_metadata;
1601 s->trailing_metadata_available =
1602 op_payload->recv_initial_metadata.trailing_metadata_available;
1603 if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1604 gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1605 (gpr_atm)t->peer_string);
1606 }
1607 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1608 }
1609
1610 if (op->recv_message) {
1611 GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1612 size_t before = 0;
1613 GPR_ASSERT(s->recv_message_ready == nullptr);
1614 GPR_ASSERT(!s->pending_byte_stream);
1615 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1616 s->recv_message = op_payload->recv_message.recv_message;
1617 if (s->id != 0) {
1618 if (!s->read_closed) {
1619 before = s->frame_storage.length +
1620 s->unprocessed_incoming_frames_buffer.length;
1621 }
1622 }
1623 grpc_chttp2_maybe_complete_recv_message(t, s);
1624 if (s->id != 0) {
1625 if (!s->read_closed && s->frame_storage.length == 0) {
1626 size_t after = s->frame_storage.length +
1627 s->unprocessed_incoming_frames_buffer_cached_length;
1628 s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1629 before - after);
1630 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1631 }
1632 }
1633 }
1634
1635 if (op->recv_trailing_metadata) {
1636 GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1637 GPR_ASSERT(s->collecting_stats == nullptr);
1638 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1639 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1640 s->recv_trailing_metadata_finished =
1641 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1642 s->recv_trailing_metadata =
1643 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1644 s->final_metadata_requested = true;
1645 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1646 }
1647
1648 grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1649 "op->on_complete");
1650
1651 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1652 }
1653
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1654 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1655 grpc_transport_stream_op_batch* op) {
1656 GPR_TIMER_SCOPE("perform_stream_op", 0);
1657 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1658 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1659
1660 if (!t->is_client) {
1661 if (op->send_initial_metadata) {
1662 grpc_millis deadline =
1663 op->payload->send_initial_metadata.send_initial_metadata->deadline;
1664 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1665 }
1666 if (op->send_trailing_metadata) {
1667 grpc_millis deadline =
1668 op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1669 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1670 }
1671 }
1672
1673 if (grpc_http_trace.enabled()) {
1674 char* str = grpc_transport_stream_op_batch_string(op);
1675 gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
1676 gpr_free(str);
1677 }
1678
1679 op->handler_private.extra_arg = gs;
1680 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1681 GRPC_CLOSURE_SCHED(
1682 GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
1683 op, grpc_combiner_scheduler(t->combiner)),
1684 GRPC_ERROR_NONE);
1685 }
1686
cancel_pings(grpc_chttp2_transport * t,grpc_error * error)1687 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1688 /* callback remaining pings: they're not allowed to call into the transpot,
1689 and maybe they hold resources that need to be freed */
1690 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1691 GPR_ASSERT(error != GRPC_ERROR_NONE);
1692 for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1693 grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1694 GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
1695 }
1696 GRPC_ERROR_UNREF(error);
1697 }
1698
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1699 static void send_ping_locked(grpc_chttp2_transport* t,
1700 grpc_closure* on_initiate, grpc_closure* on_ack) {
1701 if (t->closed_with_error != GRPC_ERROR_NONE) {
1702 GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
1703 GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
1704 return;
1705 }
1706 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1707 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1708 GRPC_ERROR_NONE);
1709 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1710 GRPC_ERROR_NONE);
1711 }
1712
1713 /*
1714 * Specialized form of send_ping_locked for keepalive ping. If there is already
1715 * a ping in progress, the keepalive ping would piggyback onto that ping,
1716 * instead of waiting for that ping to complete and then starting a new ping.
1717 */
send_keepalive_ping_locked(grpc_chttp2_transport * t)1718 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1719 if (t->closed_with_error != GRPC_ERROR_NONE) {
1720 GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
1721 GRPC_ERROR_REF(t->closed_with_error));
1722 GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
1723 GRPC_ERROR_REF(t->closed_with_error));
1724 return;
1725 }
1726 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1727 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1728 /* There is a ping in flight. Add yourself to the inflight closure list. */
1729 GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1730 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1731 &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
1732 return;
1733 }
1734 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1735 &t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1736 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
1737 &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
1738 }
1739
retry_initiate_ping_locked(void * tp,grpc_error * error)1740 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1741 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1742 t->ping_state.is_delayed_ping_timer_set = false;
1743 if (error == GRPC_ERROR_NONE) {
1744 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1745 }
1746 GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1747 }
1748
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1749 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1750 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1751 if (pq->inflight_id != id) {
1752 char* from = grpc_endpoint_get_peer(t->ep);
1753 gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
1754 gpr_free(from);
1755 return;
1756 }
1757 GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1758 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1759 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1760 }
1761 }
1762
send_goaway(grpc_chttp2_transport * t,grpc_error * error)1763 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1764 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1765 grpc_http2_error_code http_error;
1766 grpc_slice slice;
1767 grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1768 &http_error, nullptr);
1769 grpc_chttp2_goaway_append(t->last_new_stream_id,
1770 static_cast<uint32_t>(http_error),
1771 grpc_slice_ref_internal(slice), &t->qbuf);
1772 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1773 GRPC_ERROR_UNREF(error);
1774 }
1775
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1776 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1777 if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1778 t->ping_policy.max_ping_strikes != 0) {
1779 send_goaway(t,
1780 grpc_error_set_int(
1781 GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1782 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1783 /*The transport will be closed after the write is done */
1784 close_transport_locked(
1785 t, grpc_error_set_int(
1786 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1787 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1788 }
1789 }
1790
perform_transport_op_locked(void * stream_op,grpc_error * error_ignored)1791 static void perform_transport_op_locked(void* stream_op,
1792 grpc_error* error_ignored) {
1793 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1794 grpc_chttp2_transport* t =
1795 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1796
1797 if (op->goaway_error) {
1798 send_goaway(t, op->goaway_error);
1799 }
1800
1801 if (op->set_accept_stream) {
1802 t->channel_callback.accept_stream = op->set_accept_stream_fn;
1803 t->channel_callback.accept_stream_user_data =
1804 op->set_accept_stream_user_data;
1805 }
1806
1807 if (op->bind_pollset) {
1808 grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1809 }
1810
1811 if (op->bind_pollset_set) {
1812 grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1813 }
1814
1815 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1816 send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1817 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1818 }
1819
1820 if (op->on_connectivity_state_change != nullptr) {
1821 grpc_connectivity_state_notify_on_state_change(
1822 &t->channel_callback.state_tracker, op->connectivity_state,
1823 op->on_connectivity_state_change);
1824 }
1825
1826 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1827 close_transport_locked(t, op->disconnect_with_error);
1828 }
1829
1830 GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
1831
1832 GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1833 }
1834
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1835 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1836 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1837 if (grpc_http_trace.enabled()) {
1838 char* msg = grpc_transport_op_string(op);
1839 gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
1840 gpr_free(msg);
1841 }
1842 op->handler_private.extra_arg = gt;
1843 GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1844 GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1845 perform_transport_op_locked, op,
1846 grpc_combiner_scheduler(t->combiner)),
1847 GRPC_ERROR_NONE);
1848 }
1849
1850 /*******************************************************************************
1851 * INPUT PROCESSING - GENERAL
1852 */
1853
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1854 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1855 grpc_chttp2_stream* s) {
1856 if (s->recv_initial_metadata_ready != nullptr &&
1857 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1858 if (s->seen_error) {
1859 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1860 if (!s->pending_byte_stream) {
1861 grpc_slice_buffer_reset_and_unref_internal(
1862 &s->unprocessed_incoming_frames_buffer);
1863 }
1864 }
1865 grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1866 s->recv_initial_metadata);
1867 null_then_run_closure(&s->recv_initial_metadata_ready, GRPC_ERROR_NONE);
1868 }
1869 }
1870
/* Tries to complete a pending recv_message op on stream `s`.
   Does nothing unless s->recv_message_ready is set. Otherwise, when no byte
   stream is already pending, buffered incoming data is (optionally
   stream-decompressed and then) deframed until either a message is produced
   in *s->recv_message, an error occurs, or the buffers run dry. The ready
   closure is run when a message was produced without error, or - with a null
   message - when trailing metadata (slot 1) is no longer NOT_PUBLISHED. */
void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
                                             grpc_chttp2_stream* s) {
  grpc_error* error = GRPC_ERROR_NONE;
  if (s->recv_message_ready != nullptr) {
    *s->recv_message = nullptr;
    /* Once trailing metadata has been requested and an error was seen, any
       buffered incoming data is dropped instead of being delivered. */
    if (s->final_metadata_requested && s->seen_error) {
      grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
      if (!s->pending_byte_stream) {
        grpc_slice_buffer_reset_and_unref_internal(
            &s->unprocessed_incoming_frames_buffer);
      }
    }
    if (!s->pending_byte_stream) {
      while (s->unprocessed_incoming_frames_buffer.length > 0 ||
             s->frame_storage.length > 0) {
        /* Refill the working buffer from frame_storage when it is empty; the
           freshly swapped-in bytes have not been decompressed yet. */
        if (s->unprocessed_incoming_frames_buffer.length == 0) {
          grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
                                 &s->frame_storage);
          s->unprocessed_incoming_frames_decompressed = false;
        }
        if (!s->unprocessed_incoming_frames_decompressed &&
            s->stream_decompression_method !=
                GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
          /* Stream compression is in use: decompress into
             decompressed_data_buffer, then deframe from there. */
          GPR_ASSERT(s->decompressed_data_buffer.length == 0);
          bool end_of_context;
          /* The decompression context is created lazily on first use. */
          if (!s->stream_decompression_ctx) {
            s->stream_decompression_ctx =
                grpc_stream_compression_context_create(
                    s->stream_decompression_method);
          }
          /* The output limit passed here is whatever remains of the
             GRPC_HEADER_SIZE_IN_BYTES-byte message header. */
          if (!grpc_stream_decompress(
                  s->stream_decompression_ctx,
                  &s->unprocessed_incoming_frames_buffer,
                  &s->decompressed_data_buffer, nullptr,
                  GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
                  &end_of_context)) {
            grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
            grpc_slice_buffer_reset_and_unref_internal(
                &s->unprocessed_incoming_frames_buffer);
            error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "Stream decompression error.");
          } else {
            /* Track how much of the header has been decompressed so far; the
               counter wraps back to 0 once a full header has been seen. */
            s->decompressed_header_bytes += s->decompressed_data_buffer.length;
            if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
              s->decompressed_header_bytes = 0;
            }
            error = grpc_deframe_unprocessed_incoming_frames(
                &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
                s->recv_message);
            if (end_of_context) {
              grpc_stream_compression_context_destroy(
                  s->stream_decompression_ctx);
              s->stream_decompression_ctx = nullptr;
            }
          }
        } else {
          /* No (further) decompression needed: deframe the working buffer
             directly. */
          error = grpc_deframe_unprocessed_incoming_frames(
              &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
              nullptr, s->recv_message);
        }
        if (error != GRPC_ERROR_NONE) {
          /* Any failure poisons the stream and discards all buffered data. */
          s->seen_error = true;
          grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
          grpc_slice_buffer_reset_and_unref_internal(
              &s->unprocessed_incoming_frames_buffer);
          break;
        } else if (*s->recv_message != nullptr) {
          /* A message is available: stop consuming input. */
          break;
        }
      }
    }
    // save the length of the buffer before handing control back to application
    // threads. Needed to support correct flow control bookkeeping
    s->unprocessed_incoming_frames_buffer_cached_length =
        s->unprocessed_incoming_frames_buffer.length;
    if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
      null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
    } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
      /* No message will be delivered: complete the op with a null message.
         Note that the closure is still run with GRPC_ERROR_NONE here. */
      *s->recv_message = nullptr;
      null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
    }
    GRPC_ERROR_UNREF(error);
  }
}
1955
/* Tries to complete a pending recv_trailing_metadata op on stream `s`.
   Any pending recv_message is given a chance to complete first. The trailing
   metadata op itself only completes once the stream is closed for both reads
   and writes and no received message data remains to be delivered; at that
   point the stream stats are moved to the collector, metadata slot 1 is
   published into the op's batch and the finished closure is run. */
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
                                                       grpc_chttp2_stream* s) {
  grpc_chttp2_maybe_complete_recv_message(t, s);
  if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
      s->write_closed) {
    /* After an error, or on the server side, buffered incoming data is
       discarded rather than held for delivery. */
    if (s->seen_error || !t->is_client) {
      grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
      if (!s->pending_byte_stream) {
        grpc_slice_buffer_reset_and_unref_internal(
            &s->unprocessed_incoming_frames_buffer);
      }
    }
    /* Data counts as pending while a byte stream is outstanding or
       unprocessed bytes remain in the working buffer. */
    bool pending_data = s->pending_byte_stream ||
                        s->unprocessed_incoming_frames_buffer.length > 0;
    if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
        !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
      /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
       * maybe decompress the next 5 bytes in the stream. */
      bool end_of_context;
      if (!s->stream_decompression_ctx) {
        s->stream_decompression_ctx = grpc_stream_compression_context_create(
            s->stream_decompression_method);
      }
      if (!grpc_stream_decompress(
              s->stream_decompression_ctx, &s->frame_storage,
              &s->unprocessed_incoming_frames_buffer, nullptr,
              GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
        /* Decompression failed: drop everything and mark the stream as
           errored. */
        grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
        grpc_slice_buffer_reset_and_unref_internal(
            &s->unprocessed_incoming_frames_buffer);
        s->seen_error = true;
      } else {
        /* Decompressed output means there is still message data to deliver,
           which holds back completion below. */
        if (s->unprocessed_incoming_frames_buffer.length > 0) {
          s->unprocessed_incoming_frames_decompressed = true;
          pending_data = true;
        }
        if (end_of_context) {
          grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
          s->stream_decompression_ctx = nullptr;
        }
      }
    }
    /* Nothing left to deliver: hand over stats and trailing metadata and
       complete the op. */
    if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
        s->recv_trailing_metadata_finished != nullptr) {
      grpc_transport_move_stats(&s->stats, s->collecting_stats);
      s->collecting_stats = nullptr;
      grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
                                                   s->recv_trailing_metadata);
      null_then_run_closure(&s->recv_trailing_metadata_finished,
                            GRPC_ERROR_NONE);
    }
  }
}
2009
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error * error)2010 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2011 grpc_error* error) {
2012 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2013 grpc_chttp2_stream_map_delete(&t->stream_map, id));
2014 GPR_ASSERT(s);
2015 if (t->incoming_stream == s) {
2016 t->incoming_stream = nullptr;
2017 grpc_chttp2_parsing_become_skip_parser(t);
2018 }
2019 if (s->pending_byte_stream) {
2020 if (s->on_next != nullptr) {
2021 grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2022 if (error == GRPC_ERROR_NONE) {
2023 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2024 }
2025 bs->PublishError(error);
2026 bs->Unref();
2027 s->data_parser.parsing_frame = nullptr;
2028 } else {
2029 GRPC_ERROR_UNREF(s->byte_stream_error);
2030 s->byte_stream_error = GRPC_ERROR_REF(error);
2031 }
2032 }
2033
2034 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2035 post_benign_reclaimer(t);
2036 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2037 close_transport_locked(
2038 t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2039 "Last stream closed after sending GOAWAY", &error, 1));
2040 }
2041 }
2042 if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2043 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2044 }
2045
2046 GRPC_ERROR_UNREF(error);
2047
2048 maybe_start_some_streams(t);
2049 }
2050
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * due_to_error)2051 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2052 grpc_error* due_to_error) {
2053 if (!t->is_client && !s->sent_trailing_metadata &&
2054 grpc_error_has_clear_grpc_status(due_to_error)) {
2055 close_from_api(t, s, due_to_error);
2056 return;
2057 }
2058
2059 if (!s->read_closed || !s->write_closed) {
2060 if (s->id != 0) {
2061 grpc_http2_error_code http_error;
2062 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2063 &http_error, nullptr);
2064 grpc_slice_buffer_add(
2065 &t->qbuf,
2066 grpc_chttp2_rst_stream_create(
2067 s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing));
2068 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2069 }
2070 }
2071 if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2072 s->seen_error = true;
2073 }
2074 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2075 }
2076
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2077 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2078 grpc_error* error) {
2079 grpc_status_code status;
2080 grpc_slice slice;
2081 grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2082 if (status != GRPC_STATUS_OK) {
2083 s->seen_error = true;
2084 }
2085 /* stream_global->recv_trailing_metadata_finished gives us a
2086 last chance replacement: we've received trailing metadata,
2087 but something more important has become available to signal
2088 to the upper layers - drop what we've got, and then publish
2089 what we want - which is safe because we haven't told anyone
2090 about the metadata yet */
2091 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2092 s->recv_trailing_metadata_finished != nullptr) {
2093 char status_string[GPR_LTOA_MIN_BUFSIZE];
2094 gpr_ltoa(status, status_string);
2095 GRPC_LOG_IF_ERROR("add_status",
2096 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2097 &s->metadata_buffer[1],
2098 grpc_mdelem_from_slices(
2099 GRPC_MDSTR_GRPC_STATUS,
2100 grpc_slice_from_copied_string(status_string))));
2101 if (!GRPC_SLICE_IS_EMPTY(slice)) {
2102 GRPC_LOG_IF_ERROR(
2103 "add_status_message",
2104 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2105 &s->metadata_buffer[1],
2106 grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_MESSAGE,
2107 grpc_slice_ref_internal(slice))));
2108 }
2109 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2110 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2111 }
2112
2113 GRPC_ERROR_UNREF(error);
2114 }
2115
add_error(grpc_error * error,grpc_error ** refs,size_t * nrefs)2116 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2117 if (error == GRPC_ERROR_NONE) return;
2118 for (size_t i = 0; i < *nrefs; i++) {
2119 if (error == refs[i]) {
2120 return;
2121 }
2122 }
2123 refs[*nrefs] = error;
2124 ++*nrefs;
2125 }
2126
removal_error(grpc_error * extra_error,grpc_chttp2_stream * s,const char * master_error_msg)2127 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2128 const char* master_error_msg) {
2129 grpc_error* refs[3];
2130 size_t nrefs = 0;
2131 add_error(s->read_closed_error, refs, &nrefs);
2132 add_error(s->write_closed_error, refs, &nrefs);
2133 add_error(extra_error, refs, &nrefs);
2134 grpc_error* error = GRPC_ERROR_NONE;
2135 if (nrefs > 0) {
2136 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2137 refs, nrefs);
2138 }
2139 GRPC_ERROR_UNREF(extra_error);
2140 return error;
2141 }
2142
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error * error)2143 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2144 grpc_chttp2_write_cb** list, grpc_error* error) {
2145 while (*list) {
2146 grpc_chttp2_write_cb* cb = *list;
2147 *list = cb->next;
2148 grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2149 "on_write_finished_cb");
2150 cb->next = t->write_cb_pool;
2151 t->write_cb_pool = cb;
2152 }
2153 GRPC_ERROR_UNREF(error);
2154 }
2155
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2156 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2157 grpc_chttp2_stream* s, grpc_error* error) {
2158 error =
2159 removal_error(error, s, "Pending writes failed due to stream closure");
2160 s->send_initial_metadata = nullptr;
2161 grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2162 GRPC_ERROR_REF(error),
2163 "send_initial_metadata_finished");
2164
2165 s->send_trailing_metadata = nullptr;
2166 grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2167 GRPC_ERROR_REF(error),
2168 "send_trailing_metadata_finished");
2169
2170 s->fetching_send_message.reset();
2171 grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2172 GRPC_ERROR_REF(error),
2173 "fetching_send_message_finished");
2174 flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2175 flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2176 }
2177
/* Marks the read half (close_reads != 0) and/or the write half
   (close_writes != 0) of stream `s` as closed, recording `error` as the
   reason for each half that this call closes. Closing the write half fails
   all pending writes. Once both halves are closed the stream is removed from
   the transport (or from the waiting-for-concurrency list if it has no id
   yet), a status is synthesized from the combined error if there is one, and
   the "chttp2" stream ref is released. Takes ownership of `error`. */
void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
                                    grpc_chttp2_stream* s, int close_reads,
                                    int close_writes, grpc_error* error) {
  if (s->read_closed && s->write_closed) {
    /* already closed */
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
    GRPC_ERROR_UNREF(error);
    return;
  }
  /* closed_read: this call closed the read half.
     became_closed: this call made the stream fully closed. */
  bool closed_read = false;
  bool became_closed = false;
  if (close_reads && !s->read_closed) {
    s->read_closed_error = GRPC_ERROR_REF(error);
    s->read_closed = true;
    closed_read = true;
  }
  if (close_writes && !s->write_closed) {
    s->write_closed_error = GRPC_ERROR_REF(error);
    s->write_closed = true;
    grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
  }
  if (s->read_closed && s->write_closed) {
    became_closed = true;
    /* Combine the per-direction close errors with `error`; the result is
       GRPC_ERROR_NONE when none of them is set. */
    grpc_error* overall_error =
        removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
    if (s->id != 0) {
      remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
    } else {
      /* Purge streams waiting on concurrency still waiting for id assignment */
      grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
    }
    /* grpc_chttp2_fake_status consumes overall_error; when it is NONE there
       is nothing to release. */
    if (overall_error != GRPC_ERROR_NONE) {
      grpc_chttp2_fake_status(t, s, overall_error);
    }
  }
  if (closed_read) {
    /* No more metadata can arrive: treat anything not yet published as
       published-at-close so that pending receive ops can complete. */
    for (int i = 0; i < 2; i++) {
      if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
        s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
      }
    }
    grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
    grpc_chttp2_maybe_complete_recv_message(t, s);
  }
  if (became_closed) {
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
    GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
  }
  GRPC_ERROR_UNREF(error);
}
2228
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2229 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2230 grpc_error* error) {
2231 grpc_slice hdr;
2232 grpc_slice status_hdr;
2233 grpc_slice http_status_hdr;
2234 grpc_slice content_type_hdr;
2235 grpc_slice message_pfx;
2236 uint8_t* p;
2237 uint32_t len = 0;
2238 grpc_status_code grpc_status;
2239 grpc_slice slice;
2240 grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2241 nullptr);
2242
2243 GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2244
2245 /* Hand roll a header block.
2246 This is unnecessarily ugly - at some point we should find a more
2247 elegant solution.
2248 It's complicated by the fact that our send machinery would be dead by
2249 the time we got around to sending this, so instead we ignore HPACK
2250 compression and just write the uncompressed bytes onto the wire. */
2251 if (!s->sent_initial_metadata) {
2252 http_status_hdr = GRPC_SLICE_MALLOC(13);
2253 p = GRPC_SLICE_START_PTR(http_status_hdr);
2254 *p++ = 0x00;
2255 *p++ = 7;
2256 *p++ = ':';
2257 *p++ = 's';
2258 *p++ = 't';
2259 *p++ = 'a';
2260 *p++ = 't';
2261 *p++ = 'u';
2262 *p++ = 's';
2263 *p++ = 3;
2264 *p++ = '2';
2265 *p++ = '0';
2266 *p++ = '0';
2267 GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2268 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2269
2270 content_type_hdr = GRPC_SLICE_MALLOC(31);
2271 p = GRPC_SLICE_START_PTR(content_type_hdr);
2272 *p++ = 0x00;
2273 *p++ = 12;
2274 *p++ = 'c';
2275 *p++ = 'o';
2276 *p++ = 'n';
2277 *p++ = 't';
2278 *p++ = 'e';
2279 *p++ = 'n';
2280 *p++ = 't';
2281 *p++ = '-';
2282 *p++ = 't';
2283 *p++ = 'y';
2284 *p++ = 'p';
2285 *p++ = 'e';
2286 *p++ = 16;
2287 *p++ = 'a';
2288 *p++ = 'p';
2289 *p++ = 'p';
2290 *p++ = 'l';
2291 *p++ = 'i';
2292 *p++ = 'c';
2293 *p++ = 'a';
2294 *p++ = 't';
2295 *p++ = 'i';
2296 *p++ = 'o';
2297 *p++ = 'n';
2298 *p++ = '/';
2299 *p++ = 'g';
2300 *p++ = 'r';
2301 *p++ = 'p';
2302 *p++ = 'c';
2303 GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2304 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2305 }
2306
2307 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2308 p = GRPC_SLICE_START_PTR(status_hdr);
2309 *p++ = 0x00; /* literal header, not indexed */
2310 *p++ = 11; /* len(grpc-status) */
2311 *p++ = 'g';
2312 *p++ = 'r';
2313 *p++ = 'p';
2314 *p++ = 'c';
2315 *p++ = '-';
2316 *p++ = 's';
2317 *p++ = 't';
2318 *p++ = 'a';
2319 *p++ = 't';
2320 *p++ = 'u';
2321 *p++ = 's';
2322 if (grpc_status < 10) {
2323 *p++ = 1;
2324 *p++ = static_cast<uint8_t>('0' + grpc_status);
2325 } else {
2326 *p++ = 2;
2327 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2328 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2329 }
2330 GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2331 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2332
2333 size_t msg_len = GRPC_SLICE_LENGTH(slice);
2334 GPR_ASSERT(msg_len <= UINT32_MAX);
2335 uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2336 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2337 p = GRPC_SLICE_START_PTR(message_pfx);
2338 *p++ = 0x00; /* literal header, not indexed */
2339 *p++ = 12; /* len(grpc-message) */
2340 *p++ = 'g';
2341 *p++ = 'r';
2342 *p++ = 'p';
2343 *p++ = 'c';
2344 *p++ = '-';
2345 *p++ = 'm';
2346 *p++ = 'e';
2347 *p++ = 's';
2348 *p++ = 's';
2349 *p++ = 'a';
2350 *p++ = 'g';
2351 *p++ = 'e';
2352 GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2353 p += msg_len_len;
2354 GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2355 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2356 len += static_cast<uint32_t>(msg_len);
2357
2358 hdr = GRPC_SLICE_MALLOC(9);
2359 p = GRPC_SLICE_START_PTR(hdr);
2360 *p++ = static_cast<uint8_t>(len >> 16);
2361 *p++ = static_cast<uint8_t>(len >> 8);
2362 *p++ = static_cast<uint8_t>(len);
2363 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2364 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2365 *p++ = static_cast<uint8_t>(s->id >> 24);
2366 *p++ = static_cast<uint8_t>(s->id >> 16);
2367 *p++ = static_cast<uint8_t>(s->id >> 8);
2368 *p++ = static_cast<uint8_t>(s->id);
2369 GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2370
2371 grpc_slice_buffer_add(&t->qbuf, hdr);
2372 if (!s->sent_initial_metadata) {
2373 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2374 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2375 }
2376 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2377 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2378 grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2379 grpc_slice_buffer_add(
2380 &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
2381 &s->stats.outgoing));
2382
2383 grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2384 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2385 }
2386
2387 typedef struct {
2388 grpc_error* error;
2389 grpc_chttp2_transport* t;
2390 } cancel_stream_cb_args;
2391
cancel_stream_cb(void * user_data,uint32_t key,void * stream)2392 static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) {
2393 cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2394 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2395 grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2396 }
2397
end_all_the_calls(grpc_chttp2_transport * t,grpc_error * error)2398 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2399 cancel_stream_cb_args args = {error, t};
2400 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2401 GRPC_ERROR_UNREF(error);
2402 }
2403
2404 /*******************************************************************************
2405 * INPUT PROCESSING - PARSING
2406 */
2407
2408 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2409 static void WithUrgency(grpc_chttp2_transport* t,
2410 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2411 grpc_chttp2_initiate_write_reason reason, F action) {
2412 switch (urgency) {
2413 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2414 break;
2415 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2416 grpc_chttp2_initiate_write(t, reason);
2417 // fallthrough
2418 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2419 action();
2420 break;
2421 }
2422 }
2423
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2424 void grpc_chttp2_act_on_flowctl_action(
2425 const grpc_core::chttp2::FlowControlAction& action,
2426 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2427 WithUrgency(t, action.send_stream_update(),
2428 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2429 [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2430 WithUrgency(t, action.send_transport_update(),
2431 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2432 WithUrgency(t, action.send_initial_window_update(),
2433 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2434 queue_setting_update(t,
2435 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2436 action.initial_window_size());
2437 });
2438 WithUrgency(t, action.send_max_frame_size_update(),
2439 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2440 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2441 action.max_frame_size());
2442 });
2443 }
2444
try_http_parsing(grpc_chttp2_transport * t)2445 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2446 grpc_http_parser parser;
2447 size_t i = 0;
2448 grpc_error* error = GRPC_ERROR_NONE;
2449 grpc_http_response response;
2450 memset(&response, 0, sizeof(response));
2451
2452 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2453
2454 grpc_error* parse_error = GRPC_ERROR_NONE;
2455 for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2456 parse_error =
2457 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2458 }
2459 if (parse_error == GRPC_ERROR_NONE &&
2460 (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2461 error = grpc_error_set_int(
2462 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2463 "Trying to connect an http1.x server"),
2464 GRPC_ERROR_INT_HTTP_STATUS, response.status),
2465 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
2466 }
2467 GRPC_ERROR_UNREF(parse_error);
2468
2469 grpc_http_parser_destroy(&parser);
2470 grpc_http_response_destroy(&response);
2471 return error;
2472 }
2473
read_action_locked(void * tp,grpc_error * error)2474 static void read_action_locked(void* tp, grpc_error* error) {
2475 GPR_TIMER_SCOPE("reading_action_locked", 0);
2476
2477 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2478
2479 GRPC_ERROR_REF(error);
2480
2481 grpc_error* err = error;
2482 if (err != GRPC_ERROR_NONE) {
2483 err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2484 "Endpoint read failed", &err, 1),
2485 GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2486 t->write_state);
2487 }
2488 GPR_SWAP(grpc_error*, err, error);
2489 GRPC_ERROR_UNREF(err);
2490 if (t->closed_with_error == GRPC_ERROR_NONE) {
2491 GPR_TIMER_SCOPE("reading_action.parse", 0);
2492 size_t i = 0;
2493 grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2494 GRPC_ERROR_NONE};
2495 for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2496 grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
2497 if (bdp_est) {
2498 bdp_est->AddIncomingBytes(
2499 static_cast<int64_t> GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
2500 }
2501 errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2502 }
2503 if (errors[1] != GRPC_ERROR_NONE) {
2504 errors[2] = try_http_parsing(t);
2505 GRPC_ERROR_UNREF(error);
2506 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2507 "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2508 }
2509 for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2510 GRPC_ERROR_UNREF(errors[i]);
2511 }
2512
2513 GPR_TIMER_SCOPE("post_parse_locked", 0);
2514 if (t->initial_window_update != 0) {
2515 if (t->initial_window_update > 0) {
2516 grpc_chttp2_stream* s;
2517 while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2518 grpc_chttp2_mark_stream_writable(t, s);
2519 grpc_chttp2_initiate_write(
2520 t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2521 }
2522 }
2523 t->initial_window_update = 0;
2524 }
2525 }
2526
2527 GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2528 bool keep_reading = false;
2529 if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2530 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2531 "Transport closed", &t->closed_with_error, 1);
2532 }
2533 if (error != GRPC_ERROR_NONE) {
2534 /* If a goaway frame was received, this might be the reason why the read
2535 * failed. Add this info to the error */
2536 if (t->goaway_error != GRPC_ERROR_NONE) {
2537 error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2538 }
2539
2540 close_transport_locked(t, GRPC_ERROR_REF(error));
2541 t->endpoint_reading = 0;
2542 } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2543 keep_reading = true;
2544 GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
2545 }
2546 grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2547
2548 if (keep_reading) {
2549 grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked);
2550 grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t,
2551 nullptr);
2552 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keep_reading");
2553 } else {
2554 GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2555 }
2556
2557 GRPC_ERROR_UNREF(error);
2558 }
2559
2560 // t is reffed prior to calling the first time, and once the callback chain
2561 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2562 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2563 t->flow_control->bdp_estimator()->SchedulePing();
2564 send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
2565 }
2566
start_bdp_ping_locked(void * tp,grpc_error * error)2567 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2568 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2569 if (grpc_http_trace.enabled()) {
2570 gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
2571 grpc_error_string(error));
2572 }
2573 /* Reset the keepalive ping timer */
2574 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2575 grpc_timer_cancel(&t->keepalive_ping_timer);
2576 }
2577 t->flow_control->bdp_estimator()->StartPing();
2578 }
2579
finish_bdp_ping_locked(void * tp,grpc_error * error)2580 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2581 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2582 if (grpc_http_trace.enabled()) {
2583 gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
2584 grpc_error_string(error));
2585 }
2586 if (error != GRPC_ERROR_NONE) {
2587 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2588 return;
2589 }
2590 grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2591 grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2592 nullptr);
2593 GPR_ASSERT(!t->have_next_bdp_ping_timer);
2594 t->have_next_bdp_ping_timer = true;
2595 grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2596 &t->next_bdp_ping_timer_expired_locked);
2597 }
2598
next_bdp_ping_timer_expired_locked(void * tp,grpc_error * error)2599 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2600 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2601 GPR_ASSERT(t->have_next_bdp_ping_timer);
2602 t->have_next_bdp_ping_timer = false;
2603 if (error != GRPC_ERROR_NONE) {
2604 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2605 return;
2606 }
2607 schedule_bdp_ping_locked(t);
2608 }
2609
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2610 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2611 bool is_client) {
2612 size_t i;
2613 if (args) {
2614 for (i = 0; i < args->num_args; i++) {
2615 if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2616 const int value = grpc_channel_arg_get_integer(
2617 &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2618 : g_default_server_keepalive_time_ms,
2619 1, INT_MAX});
2620 if (is_client) {
2621 g_default_client_keepalive_time_ms = value;
2622 } else {
2623 g_default_server_keepalive_time_ms = value;
2624 }
2625 } else if (0 ==
2626 strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2627 const int value = grpc_channel_arg_get_integer(
2628 &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2629 : g_default_server_keepalive_timeout_ms,
2630 0, INT_MAX});
2631 if (is_client) {
2632 g_default_client_keepalive_timeout_ms = value;
2633 } else {
2634 g_default_server_keepalive_timeout_ms = value;
2635 }
2636 } else if (0 == strcmp(args->args[i].key,
2637 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2638 const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2639 &args->args[i],
2640 {is_client ? g_default_client_keepalive_permit_without_calls
2641 : g_default_server_keepalive_timeout_ms,
2642 0, 1}));
2643 if (is_client) {
2644 g_default_client_keepalive_permit_without_calls = value;
2645 } else {
2646 g_default_server_keepalive_permit_without_calls = value;
2647 }
2648 } else if (0 ==
2649 strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2650 g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2651 &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2652 } else if (0 == strcmp(args->args[i].key,
2653 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2654 g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2655 &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2656 } else if (0 ==
2657 strcmp(
2658 args->args[i].key,
2659 GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
2660 g_default_min_sent_ping_interval_without_data_ms =
2661 grpc_channel_arg_get_integer(
2662 &args->args[i],
2663 {g_default_min_sent_ping_interval_without_data_ms, 0, INT_MAX});
2664 } else if (0 ==
2665 strcmp(
2666 args->args[i].key,
2667 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2668 g_default_min_recv_ping_interval_without_data_ms =
2669 grpc_channel_arg_get_integer(
2670 &args->args[i],
2671 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2672 }
2673 }
2674 }
2675 }
2676
init_keepalive_ping_locked(void * arg,grpc_error * error)2677 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2678 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2679 GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2680 if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2681 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2682 } else if (error == GRPC_ERROR_NONE) {
2683 if (t->keepalive_permit_without_calls ||
2684 grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2685 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2686 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2687 send_keepalive_ping_locked(t);
2688 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2689 } else {
2690 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2691 grpc_timer_init(&t->keepalive_ping_timer,
2692 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2693 &t->init_keepalive_ping_locked);
2694 }
2695 } else if (error == GRPC_ERROR_CANCELLED) {
2696 /* The keepalive ping timer may be cancelled by bdp */
2697 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2698 grpc_timer_init(&t->keepalive_ping_timer,
2699 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2700 &t->init_keepalive_ping_locked);
2701 }
2702 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2703 }
2704
start_keepalive_ping_locked(void * arg,grpc_error * error)2705 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2706 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2707 if (error != GRPC_ERROR_NONE) {
2708 return;
2709 }
2710 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2711 grpc_timer_init(&t->keepalive_watchdog_timer,
2712 grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2713 &t->keepalive_watchdog_fired_locked);
2714 }
2715
finish_keepalive_ping_locked(void * arg,grpc_error * error)2716 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2717 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2718 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2719 if (error == GRPC_ERROR_NONE) {
2720 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2721 grpc_timer_cancel(&t->keepalive_watchdog_timer);
2722 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2723 grpc_timer_init(&t->keepalive_ping_timer,
2724 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2725 &t->init_keepalive_ping_locked);
2726 }
2727 }
2728 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2729 }
2730
keepalive_watchdog_fired_locked(void * arg,grpc_error * error)2731 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2732 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2733 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2734 if (error == GRPC_ERROR_NONE) {
2735 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2736 close_transport_locked(
2737 t,
2738 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2739 "keepalive watchdog timeout"),
2740 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL));
2741 }
2742 } else {
2743 /* The watchdog timer should have been cancelled by
2744 * finish_keepalive_ping_locked. */
2745 if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2746 gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2747 t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2748 }
2749 }
2750 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2751 }
2752
2753 /*******************************************************************************
2754 * CALLBACK LOOP
2755 */
2756
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,grpc_error * error,const char * reason)2757 static void connectivity_state_set(grpc_chttp2_transport* t,
2758 grpc_connectivity_state state,
2759 grpc_error* error, const char* reason) {
2760 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "set connectivity_state=%d", state));
2761 grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, error,
2762 reason);
2763 }
2764
2765 /*******************************************************************************
2766 * POLLSET STUFF
2767 */
2768
set_pollset(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)2769 static void set_pollset(grpc_transport* gt, grpc_stream* gs,
2770 grpc_pollset* pollset) {
2771 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2772 grpc_endpoint_add_to_pollset(t->ep, pollset);
2773 }
2774
set_pollset_set(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)2775 static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
2776 grpc_pollset_set* pollset_set) {
2777 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2778 grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2779 }
2780
2781 /*******************************************************************************
2782 * BYTE STREAM
2783 */
2784
reset_byte_stream(void * arg,grpc_error * error)2785 static void reset_byte_stream(void* arg, grpc_error* error) {
2786 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2787 s->pending_byte_stream = false;
2788 if (error == GRPC_ERROR_NONE) {
2789 grpc_chttp2_maybe_complete_recv_message(s->t, s);
2790 grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2791 } else {
2792 GPR_ASSERT(error != GRPC_ERROR_NONE);
2793 GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
2794 s->on_next = nullptr;
2795 GRPC_ERROR_UNREF(s->byte_stream_error);
2796 s->byte_stream_error = GRPC_ERROR_NONE;
2797 grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2798 s->byte_stream_error = GRPC_ERROR_REF(error);
2799 }
2800 }
2801
2802 namespace grpc_core {
2803
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2804 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2805 grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2806 uint32_t frame_size, uint32_t flags)
2807 : ByteStream(frame_size, flags),
2808 transport_(transport),
2809 stream_(stream),
2810 remaining_bytes_(frame_size) {
2811 gpr_ref_init(&refs_, 2);
2812 GRPC_ERROR_UNREF(stream->byte_stream_error);
2813 stream->byte_stream_error = GRPC_ERROR_NONE;
2814 }
2815
OrphanLocked(void * arg,grpc_error * error_ignored)2816 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2817 grpc_error* error_ignored) {
2818 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2819 grpc_chttp2_stream* s = bs->stream_;
2820 grpc_chttp2_transport* t = s->t;
2821 bs->Unref();
2822 s->pending_byte_stream = false;
2823 grpc_chttp2_maybe_complete_recv_message(t, s);
2824 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2825 }
2826
Orphan()2827 void Chttp2IncomingByteStream::Orphan() {
2828 GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2829 GRPC_CLOSURE_SCHED(
2830 GRPC_CLOSURE_INIT(&destroy_action_,
2831 &Chttp2IncomingByteStream::OrphanLocked, this,
2832 grpc_combiner_scheduler(transport_->combiner)),
2833 GRPC_ERROR_NONE);
2834 }
2835
Unref()2836 void Chttp2IncomingByteStream::Unref() {
2837 if (gpr_unref(&refs_)) {
2838 Delete(this);
2839 }
2840 }
2841
Ref()2842 void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); }
2843
NextLocked(void * arg,grpc_error * error_ignored)2844 void Chttp2IncomingByteStream::NextLocked(void* arg,
2845 grpc_error* error_ignored) {
2846 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2847 grpc_chttp2_transport* t = bs->transport_;
2848 grpc_chttp2_stream* s = bs->stream_;
2849 size_t cur_length = s->frame_storage.length;
2850 if (!s->read_closed) {
2851 s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2852 cur_length);
2853 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2854 }
2855 GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
2856 if (s->frame_storage.length > 0) {
2857 grpc_slice_buffer_swap(&s->frame_storage,
2858 &s->unprocessed_incoming_frames_buffer);
2859 s->unprocessed_incoming_frames_decompressed = false;
2860 GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
2861 } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
2862 GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2863 GRPC_ERROR_REF(s->byte_stream_error));
2864 if (s->data_parser.parsing_frame != nullptr) {
2865 s->data_parser.parsing_frame->Unref();
2866 s->data_parser.parsing_frame = nullptr;
2867 }
2868 } else if (s->read_closed) {
2869 if (bs->remaining_bytes_ != 0) {
2870 s->byte_stream_error =
2871 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2872 GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2873 GRPC_ERROR_REF(s->byte_stream_error));
2874 if (s->data_parser.parsing_frame != nullptr) {
2875 s->data_parser.parsing_frame->Unref();
2876 s->data_parser.parsing_frame = nullptr;
2877 }
2878 } else {
2879 /* Should never reach here. */
2880 GPR_ASSERT(false);
2881 }
2882 } else {
2883 s->on_next = bs->next_action_.on_complete;
2884 }
2885 bs->Unref();
2886 }
2887
Next(size_t max_size_hint,grpc_closure * on_complete)2888 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
2889 grpc_closure* on_complete) {
2890 GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
2891 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2892 return true;
2893 } else {
2894 Ref();
2895 next_action_.max_size_hint = max_size_hint;
2896 next_action_.on_complete = on_complete;
2897 GRPC_CLOSURE_SCHED(
2898 GRPC_CLOSURE_INIT(&next_action_.closure,
2899 &Chttp2IncomingByteStream::NextLocked, this,
2900 grpc_combiner_scheduler(transport_->combiner)),
2901 GRPC_ERROR_NONE);
2902 return false;
2903 }
2904 }
2905
MaybeCreateStreamDecompressionCtx()2906 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
2907 if (!stream_->stream_decompression_ctx) {
2908 stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
2909 stream_->stream_decompression_method);
2910 }
2911 }
2912
Pull(grpc_slice * slice)2913 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
2914 GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
2915 grpc_error* error;
2916 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2917 if (!stream_->unprocessed_incoming_frames_decompressed) {
2918 bool end_of_context;
2919 MaybeCreateStreamDecompressionCtx();
2920 if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
2921 &stream_->unprocessed_incoming_frames_buffer,
2922 &stream_->decompressed_data_buffer, nullptr,
2923 MAX_SIZE_T, &end_of_context)) {
2924 error =
2925 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
2926 return error;
2927 }
2928 GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
2929 grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
2930 &stream_->decompressed_data_buffer);
2931 stream_->unprocessed_incoming_frames_decompressed = true;
2932 if (end_of_context) {
2933 grpc_stream_compression_context_destroy(
2934 stream_->stream_decompression_ctx);
2935 stream_->stream_decompression_ctx = nullptr;
2936 }
2937 if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
2938 *slice = grpc_empty_slice();
2939 }
2940 }
2941 error = grpc_deframe_unprocessed_incoming_frames(
2942 &stream_->data_parser, stream_,
2943 &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
2944 if (error != GRPC_ERROR_NONE) {
2945 return error;
2946 }
2947 } else {
2948 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2949 GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
2950 return error;
2951 }
2952 return GRPC_ERROR_NONE;
2953 }
2954
PublishError(grpc_error * error)2955 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
2956 GPR_ASSERT(error != GRPC_ERROR_NONE);
2957 GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
2958 stream_->on_next = nullptr;
2959 GRPC_ERROR_UNREF(stream_->byte_stream_error);
2960 stream_->byte_stream_error = GRPC_ERROR_REF(error);
2961 grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
2962 }
2963
Push(grpc_slice slice,grpc_slice * slice_out)2964 grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice,
2965 grpc_slice* slice_out) {
2966 if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
2967 grpc_error* error =
2968 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
2969 GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
2970 grpc_slice_unref_internal(slice);
2971 return error;
2972 } else {
2973 remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
2974 if (slice_out != nullptr) {
2975 *slice_out = slice;
2976 }
2977 return GRPC_ERROR_NONE;
2978 }
2979 }
2980
Finished(grpc_error * error,bool reset_on_error)2981 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
2982 bool reset_on_error) {
2983 if (error == GRPC_ERROR_NONE) {
2984 if (remaining_bytes_ != 0) {
2985 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2986 }
2987 }
2988 if (error != GRPC_ERROR_NONE && reset_on_error) {
2989 GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
2990 }
2991 Unref();
2992 return error;
2993 }
2994
Shutdown(grpc_error * error)2995 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
2996 GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
2997 }
2998
2999 } // namespace grpc_core
3000
3001 /*******************************************************************************
3002 * RESOURCE QUOTAS
3003 */
3004
post_benign_reclaimer(grpc_chttp2_transport * t)3005 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3006 if (!t->benign_reclaimer_registered) {
3007 t->benign_reclaimer_registered = true;
3008 GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3009 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3010 false, &t->benign_reclaimer_locked);
3011 }
3012 }
3013
post_destructive_reclaimer(grpc_chttp2_transport * t)3014 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3015 if (!t->destructive_reclaimer_registered) {
3016 t->destructive_reclaimer_registered = true;
3017 GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3018 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3019 true, &t->destructive_reclaimer_locked);
3020 }
3021 }
3022
benign_reclaimer_locked(void * arg,grpc_error * error)3023 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3024 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3025 if (error == GRPC_ERROR_NONE &&
3026 grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3027 /* Channel with no active streams: send a goaway to try and make it
3028 * disconnect cleanly */
3029 if (grpc_resource_quota_trace.enabled()) {
3030 gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3031 t->peer_string);
3032 }
3033 send_goaway(t,
3034 grpc_error_set_int(
3035 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3036 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3037 } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace.enabled()) {
3038 gpr_log(GPR_INFO,
3039 "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3040 " streams",
3041 t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
3042 }
3043 t->benign_reclaimer_registered = false;
3044 if (error != GRPC_ERROR_CANCELLED) {
3045 grpc_resource_user_finish_reclamation(
3046 grpc_endpoint_get_resource_user(t->ep));
3047 }
3048 GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3049 }
3050
destructive_reclaimer_locked(void * arg,grpc_error * error)3051 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3052 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3053 size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3054 t->destructive_reclaimer_registered = false;
3055 if (error == GRPC_ERROR_NONE && n > 0) {
3056 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3057 grpc_chttp2_stream_map_rand(&t->stream_map));
3058 if (grpc_resource_quota_trace.enabled()) {
3059 gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
3060 s->id);
3061 }
3062 grpc_chttp2_cancel_stream(
3063 t, s,
3064 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3065 GRPC_ERROR_INT_HTTP2_ERROR,
3066 GRPC_HTTP2_ENHANCE_YOUR_CALM));
3067 if (n > 1) {
3068 /* Since we cancel one stream per destructive reclamation, if
3069 there are more streams left, we can immediately post a new
3070 reclaimer in case the resource quota needs to free more
3071 memory */
3072 post_destructive_reclaimer(t);
3073 }
3074 }
3075 if (error != GRPC_ERROR_CANCELLED) {
3076 grpc_resource_user_finish_reclamation(
3077 grpc_endpoint_get_resource_user(t->ep));
3078 }
3079 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3080 }
3081
3082 /*******************************************************************************
3083 * MONITORING
3084 */
3085
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3086 const char* grpc_chttp2_initiate_write_reason_string(
3087 grpc_chttp2_initiate_write_reason reason) {
3088 switch (reason) {
3089 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3090 return "INITIAL_WRITE";
3091 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3092 return "START_NEW_STREAM";
3093 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3094 return "SEND_MESSAGE";
3095 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3096 return "SEND_INITIAL_METADATA";
3097 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3098 return "SEND_TRAILING_METADATA";
3099 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3100 return "RETRY_SEND_PING";
3101 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3102 return "CONTINUE_PINGS";
3103 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3104 return "GOAWAY_SENT";
3105 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3106 return "RST_STREAM";
3107 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3108 return "CLOSE_FROM_API";
3109 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3110 return "STREAM_FLOW_CONTROL";
3111 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3112 return "TRANSPORT_FLOW_CONTROL";
3113 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3114 return "SEND_SETTINGS";
3115 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3116 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3117 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3118 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3119 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3120 return "APPLICATION_PING";
3121 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3122 return "KEEPALIVE_PING";
3123 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3124 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3125 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3126 return "PING_RESPONSE";
3127 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3128 return "FORCE_RST_STREAM";
3129 }
3130 GPR_UNREACHABLE_CODE(return "unknown");
3131 }
3132
chttp2_get_endpoint(grpc_transport * t)3133 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3134 return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3135 }
3136
/* Function table for the chttp2 transport; get_vtable() returns its address.

   The initializer is positional, so the entries must stay in the same order
   as the members of grpc_transport_vtable (declared outside this file). The
   first entry is the size of a grpc_chttp2_stream, the second is the
   transport's name, and the remaining entries are the transport operation
   callbacks, ending with the endpoint accessor. */
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
                                             "chttp2",
                                             init_stream,
                                             set_pollset,
                                             set_pollset_set,
                                             perform_stream_op,
                                             perform_transport_op,
                                             destroy_stream,
                                             destroy_transport,
                                             chttp2_get_endpoint};
3147
get_vtable(void)3148 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3149
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client)3150 grpc_transport* grpc_create_chttp2_transport(
3151 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) {
3152 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(
3153 gpr_zalloc(sizeof(grpc_chttp2_transport)));
3154 init_transport(t, channel_args, ep, is_client);
3155 return &t->base;
3156 }
3157
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings)3158 void grpc_chttp2_transport_start_reading(
3159 grpc_transport* transport, grpc_slice_buffer* read_buffer,
3160 grpc_closure* notify_on_receive_settings) {
3161 grpc_chttp2_transport* t =
3162 reinterpret_cast<grpc_chttp2_transport*>(transport);
3163 GRPC_CHTTP2_REF_TRANSPORT(
3164 t, "reading_action"); /* matches unref inside reading_action */
3165 if (read_buffer != nullptr) {
3166 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3167 gpr_free(read_buffer);
3168 }
3169 t->notify_on_receive_settings = notify_on_receive_settings;
3170 GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
3171 }
3172