1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20
21 #include "absl/strings/str_format.h"
22
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/string_util.h>
28 #include <inttypes.h>
29 #include <limits.h>
30 #include <math.h>
31 #include <stdio.h>
32 #include <string.h>
33
34 #include "src/core/ext/transport/chttp2/transport/context_list.h"
35 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
36 #include "src/core/ext/transport/chttp2/transport/internal.h"
37 #include "src/core/ext/transport/chttp2/transport/varint.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/compression/stream_compression.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/env.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/memory.h"
44 #include "src/core/lib/http/parser.h"
45 #include "src/core/lib/iomgr/executor.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/profiling/timers.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52 #include "src/core/lib/transport/http2_errors.h"
53 #include "src/core/lib/transport/static_metadata.h"
54 #include "src/core/lib/transport/status_conversion.h"
55 #include "src/core/lib/transport/timeout_encoding.h"
56 #include "src/core/lib/transport/transport.h"
57 #include "src/core/lib/transport/transport_impl.h"
58 #include "src/core/lib/uri/uri_parser.h"
59
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
// Largest value accepted for GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE.
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
// Queued as SETTINGS_MAX_HEADER_LIST_SIZE when a transport is constructed.
#define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)

// Keepalive defaults, in milliseconds. INT_MAX is translated to
// GRPC_MILLIS_INF_FUTURE when copied into a transport, and a keepalive time
// of GRPC_MILLIS_INF_FUTURE leaves keepalive pings disabled.
#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
// 20 seconds
#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000
// 2 hours
#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000
// 20 seconds
#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2

// Ping policy defaults.
// 5 minutes
#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
#define DEFAULT_MAX_PING_STRIKES 2

#define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
77
78 static int g_default_client_keepalive_time_ms =
79 DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
80 static int g_default_client_keepalive_timeout_ms =
81 DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
82 static int g_default_server_keepalive_time_ms =
83 DEFAULT_SERVER_KEEPALIVE_TIME_MS;
84 static int g_default_server_keepalive_timeout_ms =
85 DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
86 static bool g_default_client_keepalive_permit_without_calls =
87 DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
88 static bool g_default_server_keepalive_permit_without_calls =
89 DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
90
91 static int g_default_min_recv_ping_interval_without_data_ms =
92 DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
93 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
94 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
95
96 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
97 grpc_core::TraceFlag grpc_http_trace(false, "http");
98 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
99 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
100 "chttp2_refcount");
101
102 // forward declarations of various callbacks that we'll build closures around
103 static void write_action_begin_locked(void* t, grpc_error_handle error);
104 static void write_action(void* t, grpc_error_handle error);
105 static void write_action_end(void* t, grpc_error_handle error);
106 static void write_action_end_locked(void* t, grpc_error_handle error);
107
108 static void read_action(void* t, grpc_error_handle error);
109 static void read_action_locked(void* t, grpc_error_handle error);
110 static void continue_read_action_locked(grpc_chttp2_transport* t);
111
112 static void complete_fetch(void* gs, grpc_error_handle error);
113 static void complete_fetch_locked(void* gs, grpc_error_handle error);
114 // Set a transport level setting, and push it to our peer
115 static void queue_setting_update(grpc_chttp2_transport* t,
116 grpc_chttp2_setting_id id, uint32_t value);
117
118 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
119 grpc_error_handle error);
120
121 // Start new streams that have been created if we can
122 static void maybe_start_some_streams(grpc_chttp2_transport* t);
123
124 static void connectivity_state_set(grpc_chttp2_transport* t,
125 grpc_connectivity_state state,
126 const absl::Status& status,
127 const char* reason);
128
129 static void benign_reclaimer(void* arg, grpc_error_handle error);
130 static void destructive_reclaimer(void* arg, grpc_error_handle error);
131 static void benign_reclaimer_locked(void* arg, grpc_error_handle error);
132 static void destructive_reclaimer_locked(void* arg, grpc_error_handle error);
133
134 static void post_benign_reclaimer(grpc_chttp2_transport* t);
135 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
136
137 static void close_transport_locked(grpc_chttp2_transport* t,
138 grpc_error_handle error);
139 static void end_all_the_calls(grpc_chttp2_transport* t,
140 grpc_error_handle error);
141
142 static void start_bdp_ping(void* tp, grpc_error_handle error);
143 static void finish_bdp_ping(void* tp, grpc_error_handle error);
144 static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
145 static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
146 static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error);
147 static void next_bdp_ping_timer_expired_locked(void* tp,
148 grpc_error_handle error);
149
150 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
151 static void send_ping_locked(grpc_chttp2_transport* t,
152 grpc_closure* on_initiate, grpc_closure* on_ack);
153 static void retry_initiate_ping_locked(void* tp, grpc_error_handle error);
154
155 // keepalive-relevant functions
156 static void init_keepalive_ping(void* arg, grpc_error_handle error);
157 static void init_keepalive_ping_locked(void* arg, grpc_error_handle error);
158 static void start_keepalive_ping(void* arg, grpc_error_handle error);
159 static void finish_keepalive_ping(void* arg, grpc_error_handle error);
160 static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
161 static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
162 static void keepalive_watchdog_fired(void* arg, grpc_error_handle error);
163 static void keepalive_watchdog_fired_locked(void* arg, grpc_error_handle error);
164
165 static void reset_byte_stream(void* arg, grpc_error_handle error);
166
167 // Flow control default enabled. Can be disabled by setting
168 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
169 bool g_flow_control_enabled = true;
170
171 namespace grpc_core {
172
173 namespace {
174 TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
175 TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
176 nullptr;
177 } // namespace
178
TestOnlySetGlobalHttp2TransportInitCallback(TestOnlyGlobalHttp2TransportInitCallback callback)179 void TestOnlySetGlobalHttp2TransportInitCallback(
180 TestOnlyGlobalHttp2TransportInitCallback callback) {
181 test_only_init_callback = callback;
182 }
183
TestOnlySetGlobalHttp2TransportDestructCallback(TestOnlyGlobalHttp2TransportDestructCallback callback)184 void TestOnlySetGlobalHttp2TransportDestructCallback(
185 TestOnlyGlobalHttp2TransportDestructCallback callback) {
186 test_only_destruct_callback = callback;
187 }
188
189 } // namespace grpc_core
190
191 //
192 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
193 //
194
~grpc_chttp2_transport()195 grpc_chttp2_transport::~grpc_chttp2_transport() {
196 size_t i;
197
198 if (channelz_socket != nullptr) {
199 channelz_socket.reset();
200 }
201
202 grpc_endpoint_destroy(ep);
203
204 grpc_slice_buffer_destroy_internal(&qbuf);
205
206 grpc_slice_buffer_destroy_internal(&outbuf);
207 grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
208
209 grpc_error_handle error =
210 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
211 // ContextList::Execute follows semantics of a callback function and does not
212 // take a ref on error
213 grpc_core::ContextList::Execute(cl, nullptr, error);
214 GRPC_ERROR_UNREF(error);
215 cl = nullptr;
216
217 grpc_slice_buffer_destroy_internal(&read_buffer);
218 grpc_chttp2_hpack_parser_destroy(&hpack_parser);
219 grpc_chttp2_goaway_parser_destroy(&goaway_parser);
220
221 for (i = 0; i < STREAM_LIST_COUNT; i++) {
222 GPR_ASSERT(lists[i].head == nullptr);
223 GPR_ASSERT(lists[i].tail == nullptr);
224 }
225
226 GRPC_ERROR_UNREF(goaway_error);
227
228 GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
229
230 grpc_chttp2_stream_map_destroy(&stream_map);
231
232 GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
233
234 cancel_pings(this,
235 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
236
237 while (write_cb_pool) {
238 grpc_chttp2_write_cb* next = write_cb_pool->next;
239 gpr_free(write_cb_pool);
240 write_cb_pool = next;
241 }
242
243 flow_control.Destroy();
244
245 GRPC_ERROR_UNREF(closed_with_error);
246 gpr_free(ping_acks);
247 if (grpc_core::test_only_destruct_callback != nullptr) {
248 grpc_core::test_only_destruct_callback();
249 }
250 }
251
252 static const grpc_transport_vtable* get_vtable(void);
253
254 // Returns whether bdp is enabled
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)255 static bool read_channel_args(grpc_chttp2_transport* t,
256 const grpc_channel_args* channel_args,
257 bool is_client) {
258 bool enable_bdp = true;
259 bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
260 size_t i;
261 int j;
262
263 for (i = 0; i < channel_args->num_args; i++) {
264 if (0 == strcmp(channel_args->args[i].key,
265 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
266 const grpc_integer_options options = {-1, 0, INT_MAX};
267 const int value =
268 grpc_channel_arg_get_integer(&channel_args->args[i], options);
269 if (value >= 0) {
270 if ((t->next_stream_id & 1) != (value & 1)) {
271 gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
272 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
273 is_client ? "client" : "server");
274 } else {
275 t->next_stream_id = static_cast<uint32_t>(value);
276 }
277 }
278 } else if (0 == strcmp(channel_args->args[i].key,
279 GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
280 const grpc_integer_options options = {-1, 0, INT_MAX};
281 const int value =
282 grpc_channel_arg_get_integer(&channel_args->args[i], options);
283 if (value >= 0) {
284 grpc_chttp2_hpack_compressor_set_max_usable_size(
285 &t->hpack_compressor, static_cast<uint32_t>(value));
286 }
287 } else if (0 == strcmp(channel_args->args[i].key,
288 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
289 t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
290 &channel_args->args[i],
291 {g_default_max_pings_without_data, 0, INT_MAX});
292 } else if (0 == strcmp(channel_args->args[i].key,
293 GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
294 t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
295 &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
296 } else if (0 ==
297 strcmp(channel_args->args[i].key,
298 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
299 t->ping_policy.min_recv_ping_interval_without_data =
300 grpc_channel_arg_get_integer(
301 &channel_args->args[i],
302 grpc_integer_options{
303 g_default_min_recv_ping_interval_without_data_ms, 0,
304 INT_MAX});
305 } else if (0 == strcmp(channel_args->args[i].key,
306 GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
307 t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
308 &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
309 } else if (0 ==
310 strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
311 enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
312 } else if (0 ==
313 strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
314 const int value = grpc_channel_arg_get_integer(
315 &channel_args->args[i],
316 grpc_integer_options{t->is_client
317 ? g_default_client_keepalive_time_ms
318 : g_default_server_keepalive_time_ms,
319 1, INT_MAX});
320 t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
321 } else if (0 == strcmp(channel_args->args[i].key,
322 GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
323 const int value = grpc_channel_arg_get_integer(
324 &channel_args->args[i],
325 grpc_integer_options{t->is_client
326 ? g_default_client_keepalive_timeout_ms
327 : g_default_server_keepalive_timeout_ms,
328 0, INT_MAX});
329 t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
330 } else if (0 == strcmp(channel_args->args[i].key,
331 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
332 t->keepalive_permit_without_calls = static_cast<uint32_t>(
333 grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
334 } else if (0 == strcmp(channel_args->args[i].key,
335 GRPC_ARG_OPTIMIZATION_TARGET)) {
336 gpr_log(GPR_INFO, "GRPC_ARG_OPTIMIZATION_TARGET is deprecated");
337 } else if (0 ==
338 strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
339 channelz_enabled = grpc_channel_arg_get_bool(
340 &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
341 } else {
342 static const struct {
343 const char* channel_arg_name;
344 grpc_chttp2_setting_id setting_id;
345 grpc_integer_options integer_options;
346 bool availability[2] /* server, client */;
347 } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
348 GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
349 {-1, 0, INT32_MAX},
350 {true, false}},
351 {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
352 GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
353 {-1, 0, INT32_MAX},
354 {true, true}},
355 {GRPC_ARG_MAX_METADATA_SIZE,
356 GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
357 {-1, 0, INT32_MAX},
358 {true, true}},
359 {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
360 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
361 {-1, 16384, 16777215},
362 {true, true}},
363 {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
364 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
365 {1, 0, 1},
366 {true, true}},
367 {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
368 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
369 {-1, 5, INT32_MAX},
370 {true, true}}};
371 for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
372 if (0 == strcmp(channel_args->args[i].key,
373 settings_map[j].channel_arg_name)) {
374 if (!settings_map[j].availability[is_client]) {
375 gpr_log(GPR_DEBUG, "%s is not available on %s",
376 settings_map[j].channel_arg_name,
377 is_client ? "clients" : "servers");
378 } else {
379 int value = grpc_channel_arg_get_integer(
380 &channel_args->args[i], settings_map[j].integer_options);
381 if (value >= 0) {
382 queue_setting_update(t, settings_map[j].setting_id,
383 static_cast<uint32_t>(value));
384 }
385 }
386 break;
387 }
388 }
389 }
390 }
391 if (channelz_enabled) {
392 t->channelz_socket =
393 grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
394 std::string(grpc_endpoint_get_local_address(t->ep)), t->peer_string,
395 absl::StrFormat("%s %s", get_vtable()->name, t->peer_string),
396 grpc_core::channelz::SocketNode::Security::GetFromChannelArgs(
397 channel_args));
398 }
399 return enable_bdp;
400 }
401
init_transport_keepalive_settings(grpc_chttp2_transport * t)402 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
403 if (t->is_client) {
404 t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
405 ? GRPC_MILLIS_INF_FUTURE
406 : g_default_client_keepalive_time_ms;
407 t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
408 ? GRPC_MILLIS_INF_FUTURE
409 : g_default_client_keepalive_timeout_ms;
410 t->keepalive_permit_without_calls =
411 g_default_client_keepalive_permit_without_calls;
412 } else {
413 t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
414 ? GRPC_MILLIS_INF_FUTURE
415 : g_default_server_keepalive_time_ms;
416 t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
417 ? GRPC_MILLIS_INF_FUTURE
418 : g_default_server_keepalive_timeout_ms;
419 t->keepalive_permit_without_calls =
420 g_default_server_keepalive_permit_without_calls;
421 }
422 }
423
configure_transport_ping_policy(grpc_chttp2_transport * t)424 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
425 t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
426 t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
427 t->ping_policy.min_recv_ping_interval_without_data =
428 g_default_min_recv_ping_interval_without_data_ms;
429 }
430
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)431 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
432 if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
433 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
434 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
435 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
436 grpc_schedule_on_exec_ctx);
437 grpc_timer_init(&t->keepalive_ping_timer,
438 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
439 &t->init_keepalive_ping_locked);
440 } else {
441 // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
442 // inflight keeaplive timers
443 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
444 }
445 }
446
grpc_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)447 grpc_chttp2_transport::grpc_chttp2_transport(
448 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
449 grpc_resource_user* resource_user)
450 : refs(1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
451 ? "chttp2_refcount"
452 : nullptr),
453 ep(ep),
454 peer_string(grpc_endpoint_get_peer(ep)),
455 resource_user(resource_user),
456 combiner(grpc_combiner_create()),
457 state_tracker(is_client ? "client_transport" : "server_transport",
458 GRPC_CHANNEL_READY),
459 is_client(is_client),
460 next_stream_id(is_client ? 1 : 2),
461 deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
462 GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
463 GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
464 base.vtable = get_vtable();
465 // 8 is a random stab in the dark as to a good initial size: it's small enough
466 // that it shouldn't waste memory for infrequently used connections, yet
467 // large enough that the exponential growth should happen nicely when it's
468 // needed.
469 // TODO(ctiller): tune this
470 grpc_chttp2_stream_map_init(&stream_map, 8);
471
472 grpc_slice_buffer_init(&read_buffer);
473 grpc_slice_buffer_init(&outbuf);
474 if (is_client) {
475 grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
476 GRPC_CHTTP2_CLIENT_CONNECT_STRING));
477 }
478 grpc_chttp2_hpack_compressor_init(&hpack_compressor);
479 grpc_slice_buffer_init(&qbuf);
480 // copy in initial settings to all setting sets
481 size_t i;
482 int j;
483 for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
484 for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
485 settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
486 }
487 }
488 grpc_chttp2_hpack_parser_init(&hpack_parser);
489 grpc_chttp2_goaway_parser_init(&goaway_parser);
490
491 // configure http2 the way we like it
492 if (is_client) {
493 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
494 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
495 }
496 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
497 DEFAULT_MAX_HEADER_LIST_SIZE);
498 queue_setting_update(this,
499 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
500
501 configure_transport_ping_policy(this);
502 init_transport_keepalive_settings(this);
503
504 bool enable_bdp = true;
505 if (channel_args) {
506 enable_bdp = read_channel_args(this, channel_args, is_client);
507 }
508
509 if (g_flow_control_enabled) {
510 flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
511 enable_bdp);
512 } else {
513 flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
514 enable_bdp = false;
515 }
516
517 // No pings allowed before receiving a header or data frame.
518 ping_state.pings_before_data_required = 0;
519 ping_state.is_delayed_ping_timer_set = false;
520 ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
521
522 ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
523 ping_recv_state.ping_strikes = 0;
524
525 init_keepalive_pings_if_enabled(this);
526
527 if (enable_bdp) {
528 bdp_ping_blocked = true;
529 grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
530 nullptr);
531 }
532
533 grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
534 post_benign_reclaimer(this);
535 if (grpc_core::test_only_init_callback != nullptr) {
536 grpc_core::test_only_init_callback();
537 }
538 }
539
destroy_transport_locked(void * tp,grpc_error_handle)540 static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
541 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
542 t->destroying = 1;
543 close_transport_locked(
544 t, grpc_error_set_int(
545 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
546 GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
547 // Must be the last line.
548 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
549 }
550
destroy_transport(grpc_transport * gt)551 static void destroy_transport(grpc_transport* gt) {
552 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
553 t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
554 GRPC_ERROR_NONE);
555 }
556
close_transport_locked(grpc_chttp2_transport * t,grpc_error_handle error)557 static void close_transport_locked(grpc_chttp2_transport* t,
558 grpc_error_handle error) {
559 end_all_the_calls(t, GRPC_ERROR_REF(error));
560 cancel_pings(t, GRPC_ERROR_REF(error));
561 if (t->closed_with_error == GRPC_ERROR_NONE) {
562 if (!grpc_error_has_clear_grpc_status(error)) {
563 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
564 GRPC_STATUS_UNAVAILABLE);
565 }
566 if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
567 if (t->close_transport_on_writes_finished == nullptr) {
568 t->close_transport_on_writes_finished =
569 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
570 "Delayed close due to in-progress write");
571 }
572 t->close_transport_on_writes_finished =
573 grpc_error_add_child(t->close_transport_on_writes_finished, error);
574 return;
575 }
576 GPR_ASSERT(error != GRPC_ERROR_NONE);
577 t->closed_with_error = GRPC_ERROR_REF(error);
578 connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
579 "close_transport");
580 if (t->ping_state.is_delayed_ping_timer_set) {
581 grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
582 }
583 if (t->have_next_bdp_ping_timer) {
584 grpc_timer_cancel(&t->next_bdp_ping_timer);
585 }
586 switch (t->keepalive_state) {
587 case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
588 grpc_timer_cancel(&t->keepalive_ping_timer);
589 break;
590 case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
591 grpc_timer_cancel(&t->keepalive_ping_timer);
592 grpc_timer_cancel(&t->keepalive_watchdog_timer);
593 break;
594 case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
595 case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
596 // keepalive timers are not set in these two states
597 break;
598 }
599
600 // flush writable stream list to avoid dangling references
601 grpc_chttp2_stream* s;
602 while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
603 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
604 }
605 GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
606 grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
607 }
608 if (t->notify_on_receive_settings != nullptr) {
609 grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
610 GRPC_ERROR_REF(error));
611 t->notify_on_receive_settings = nullptr;
612 }
613 if (t->notify_on_close != nullptr) {
614 grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close,
615 GRPC_ERROR_REF(error));
616 t->notify_on_close = nullptr;
617 }
618 GRPC_ERROR_UNREF(error);
619 }
620
621 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)622 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
623 grpc_stream_ref(s->refcount, reason);
624 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)625 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
626 grpc_stream_unref(s->refcount, reason);
627 }
628 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)629 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
630 grpc_stream_ref(s->refcount);
631 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)632 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
633 grpc_stream_unref(s->refcount);
634 }
635 #endif
636
Reffer(grpc_chttp2_stream * s)637 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
638 // We reserve one 'active stream' that's dropped when the stream is
639 // read-closed. The others are for Chttp2IncomingByteStreams that are
640 // actively reading
641 GRPC_CHTTP2_STREAM_REF(s, "chttp2");
642 GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
643 }
644
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)645 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
646 grpc_stream_refcount* refcount,
647 const void* server_data,
648 grpc_core::Arena* arena)
649 : t(t),
650 refcount(refcount),
651 reffer(this),
652 metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
653 grpc_chttp2_incoming_metadata_buffer(arena)} {
654 if (server_data) {
655 id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
656 *t->accepting_stream = this;
657 grpc_chttp2_stream_map_add(&t->stream_map, id, this);
658 post_destructive_reclaimer(t);
659 }
660 if (t->flow_control->flow_control_enabled()) {
661 flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
662 static_cast<grpc_core::chttp2::TransportFlowControl*>(
663 t->flow_control.get()),
664 this);
665 } else {
666 flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
667 }
668
669 grpc_slice_buffer_init(&frame_storage);
670 grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
671 grpc_slice_buffer_init(&flow_controlled_buffer);
672 GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr);
673 }
674
~grpc_chttp2_stream()675 grpc_chttp2_stream::~grpc_chttp2_stream() {
676 if (t->channelz_socket != nullptr) {
677 if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
678 t->channelz_socket->RecordStreamSucceeded();
679 } else {
680 t->channelz_socket->RecordStreamFailed();
681 }
682 }
683
684 GPR_ASSERT((write_closed && read_closed) || id == 0);
685 if (id != 0) {
686 GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
687 }
688
689 grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
690 grpc_slice_buffer_destroy_internal(&frame_storage);
691 if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
692 grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
693 }
694 if (stream_decompression_method !=
695 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
696 grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
697 }
698
699 for (int i = 0; i < STREAM_LIST_COUNT; i++) {
700 if (GPR_UNLIKELY(included[i])) {
701 gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
702 t->is_client ? "client" : "server", id, i);
703 abort();
704 }
705 }
706
707 GPR_ASSERT(send_initial_metadata_finished == nullptr);
708 GPR_ASSERT(fetching_send_message == nullptr);
709 GPR_ASSERT(send_trailing_metadata_finished == nullptr);
710 GPR_ASSERT(recv_initial_metadata_ready == nullptr);
711 GPR_ASSERT(recv_message_ready == nullptr);
712 GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
713 grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
714 GRPC_ERROR_UNREF(read_closed_error);
715 GRPC_ERROR_UNREF(write_closed_error);
716 GRPC_ERROR_UNREF(byte_stream_error);
717
718 flow_control.Destroy();
719
720 if (t->resource_user != nullptr) {
721 grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
722 }
723
724 GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
725 grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
726 }
727
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)728 static int init_stream(grpc_transport* gt, grpc_stream* gs,
729 grpc_stream_refcount* refcount, const void* server_data,
730 grpc_core::Arena* arena) {
731 GPR_TIMER_SCOPE("init_stream", 0);
732 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
733 new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
734 return 0;
735 }
736
destroy_stream_locked(void * sp,grpc_error_handle)737 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
738 GPR_TIMER_SCOPE("destroy_stream", 0);
739 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
740 s->~grpc_chttp2_stream();
741 }
742
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)743 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
744 grpc_closure* then_schedule_closure) {
745 GPR_TIMER_SCOPE("destroy_stream", 0);
746 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
747 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
748 if (s->stream_compression_method !=
749 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
750 s->stream_compression_ctx != nullptr) {
751 grpc_stream_compression_context_destroy(s->stream_compression_ctx);
752 s->stream_compression_ctx = nullptr;
753 }
754 if (s->stream_decompression_method !=
755 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
756 s->stream_decompression_ctx != nullptr) {
757 grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
758 s->stream_decompression_ctx = nullptr;
759 }
760
761 s->destroy_stream_arg = then_schedule_closure;
762 t->combiner->Run(
763 GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
764 GRPC_ERROR_NONE);
765 }
766
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)767 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
768 uint32_t id) {
769 if (t->accept_stream_cb == nullptr) {
770 return nullptr;
771 }
772 // Don't accept the stream if memory quota doesn't allow. Note that we should
773 // simply refuse the stream here instead of canceling the stream after it's
774 // accepted since the latter will create the call which costs much memory.
775 if (t->resource_user != nullptr &&
776 !grpc_resource_user_safe_alloc(t->resource_user,
777 GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
778 gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
779 grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
780 nullptr);
781 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
782 return nullptr;
783 }
784 grpc_chttp2_stream* accepting = nullptr;
785 GPR_ASSERT(t->accepting_stream == nullptr);
786 t->accepting_stream = &accepting;
787 t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
788 reinterpret_cast<void*>(id));
789 t->accepting_stream = nullptr;
790 return accepting;
791 }
792
793 //
794 // OUTPUT PROCESSING
795 //
796
write_state_name(grpc_chttp2_write_state st)797 static const char* write_state_name(grpc_chttp2_write_state st) {
798 switch (st) {
799 case GRPC_CHTTP2_WRITE_STATE_IDLE:
800 return "IDLE";
801 case GRPC_CHTTP2_WRITE_STATE_WRITING:
802 return "WRITING";
803 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
804 return "WRITING+MORE";
805 }
806 GPR_UNREACHABLE_CODE(return "UNKNOWN");
807 }
808
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)809 static void set_write_state(grpc_chttp2_transport* t,
810 grpc_chttp2_write_state st, const char* reason) {
811 GRPC_CHTTP2_IF_TRACING(
812 gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
813 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
814 write_state_name(t->write_state), write_state_name(st), reason));
815 t->write_state = st;
816 // If the state is being reset back to idle, it means a write was just
817 // finished. Make sure all the run_after_write closures are scheduled.
818 //
819 // This is also our chance to close the transport if the transport was marked
820 // to be closed after all writes finish (for example, if we received a go-away
821 // from peer while we had some pending writes)
822 if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
823 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
824 if (t->close_transport_on_writes_finished != nullptr) {
825 grpc_error_handle err = t->close_transport_on_writes_finished;
826 t->close_transport_on_writes_finished = nullptr;
827 close_transport_locked(t, err);
828 }
829 }
830 }
831
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)832 static void inc_initiate_write_reason(
833 grpc_chttp2_initiate_write_reason reason) {
834 switch (reason) {
835 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
836 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
837 break;
838 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
839 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
840 break;
841 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
842 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
843 break;
844 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
845 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
846 break;
847 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
848 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
849 break;
850 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
851 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
852 break;
853 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
854 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
855 break;
856 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
857 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
858 break;
859 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
860 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
861 break;
862 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
863 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
864 break;
865 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
866 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
867 break;
868 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
869 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
870 break;
871 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
872 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
873 break;
874 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
875 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
876 break;
877 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
878 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
879 break;
880 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
881 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
882 break;
883 case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
884 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING();
885 break;
886 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
887 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
888 break;
889 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
890 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
891 break;
892 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
893 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
894 break;
895 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
896 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
897 break;
898 }
899 }
900
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)901 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
902 grpc_chttp2_initiate_write_reason reason) {
903 GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
904
905 switch (t->write_state) {
906 case GRPC_CHTTP2_WRITE_STATE_IDLE:
907 inc_initiate_write_reason(reason);
908 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
909 grpc_chttp2_initiate_write_reason_string(reason));
910 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
911 // Note that the 'write_action_begin_locked' closure is being scheduled
912 // on the 'finally_scheduler' of t->combiner. This means that
913 // 'write_action_begin_locked' is called only *after* all the other
914 // closures (some of which are potentially initiating more writes on the
915 // transport) are executed on the t->combiner.
916 //
917 // The reason for scheduling on finally_scheduler is to make sure we batch
918 // as many writes as possible. 'write_action_begin_locked' is the function
919 // that gathers all the relevant bytes (which are at various places in the
920 // grpc_chttp2_transport structure) and append them to 'outbuf' field in
921 // grpc_chttp2_transport thereby batching what would have been potentially
922 // multiple write operations.
923 //
924 // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
925 // It does not call the endpoint to write the bytes. That is done by the
926 // 'write_action' (which is scheduled by 'write_action_begin_locked')
927 t->combiner->FinallyRun(
928 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
929 write_action_begin_locked, t, nullptr),
930 GRPC_ERROR_NONE);
931 break;
932 case GRPC_CHTTP2_WRITE_STATE_WRITING:
933 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
934 grpc_chttp2_initiate_write_reason_string(reason));
935 break;
936 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
937 break;
938 }
939 }
940
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)941 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
942 grpc_chttp2_stream* s) {
943 if (t->closed_with_error == GRPC_ERROR_NONE &&
944 grpc_chttp2_list_add_writable_stream(t, s)) {
945 GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
946 }
947 }
948
// Returns the trace description for the start of a write: one string for a
// partial write, another for a complete one.
static const char* begin_writing_desc(bool partial) {
  return partial ? "begin partial write in background"
                 : "begin write in current thread";
}
956
write_action_begin_locked(void * gt,grpc_error_handle)957 static void write_action_begin_locked(void* gt,
958 grpc_error_handle /*error_ignored*/) {
959 GPR_TIMER_SCOPE("write_action_begin_locked", 0);
960 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
961 GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
962 grpc_chttp2_begin_write_result r;
963 if (t->closed_with_error != GRPC_ERROR_NONE) {
964 r.writing = false;
965 } else {
966 r = grpc_chttp2_begin_write(t);
967 }
968 if (r.writing) {
969 if (r.partial) {
970 GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
971 }
972 set_write_state(t,
973 r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
974 : GRPC_CHTTP2_WRITE_STATE_WRITING,
975 begin_writing_desc(r.partial));
976 write_action(t, GRPC_ERROR_NONE);
977 if (t->reading_paused_on_pending_induced_frames) {
978 GPR_ASSERT(t->num_pending_induced_frames == 0);
979 // We had paused reading, because we had many induced frames (SETTINGS
980 // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
981 // been able to flush qbuf, we can resume reading.
982 GRPC_CHTTP2_IF_TRACING(gpr_log(
983 GPR_INFO,
984 "transport %p : Resuming reading after being paused due to too "
985 "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
986 t));
987 t->reading_paused_on_pending_induced_frames = false;
988 continue_read_action_locked(t);
989 }
990 } else {
991 GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
992 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
993 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
994 }
995 }
996
write_action(void * gt,grpc_error_handle)997 static void write_action(void* gt, grpc_error_handle /*error*/) {
998 GPR_TIMER_SCOPE("write_action", 0);
999 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1000 void* cl = t->cl;
1001 t->cl = nullptr;
1002 grpc_endpoint_write(
1003 t->ep, &t->outbuf,
1004 GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
1005 grpc_schedule_on_exec_ctx),
1006 cl);
1007 }
1008
write_action_end(void * tp,grpc_error_handle error)1009 static void write_action_end(void* tp, grpc_error_handle error) {
1010 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1011 t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
1012 write_action_end_locked, t, nullptr),
1013 GRPC_ERROR_REF(error));
1014 }
1015
1016 // Callback from the grpc_endpoint after bytes have been written by calling
1017 // sendmsg
write_action_end_locked(void * tp,grpc_error_handle error)1018 static void write_action_end_locked(void* tp, grpc_error_handle error) {
1019 GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
1020 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1021
1022 bool closed = false;
1023 if (error != GRPC_ERROR_NONE) {
1024 close_transport_locked(t, GRPC_ERROR_REF(error));
1025 closed = true;
1026 }
1027
1028 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
1029 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
1030 closed = true;
1031 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1032 close_transport_locked(
1033 t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
1034 }
1035 }
1036
1037 switch (t->write_state) {
1038 case GRPC_CHTTP2_WRITE_STATE_IDLE:
1039 GPR_UNREACHABLE_CODE(break);
1040 case GRPC_CHTTP2_WRITE_STATE_WRITING:
1041 GPR_TIMER_MARK("state=writing", 0);
1042 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1043 break;
1044 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1045 GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1046 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1047 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1048 // If the transport is closed, we will retry writing on the endpoint
1049 // and next write may contain part of the currently serialized frames.
1050 // So, we should only call the run_after_write callbacks when the next
1051 // write finishes, or the callbacks will be invoked when the stream is
1052 // closed.
1053 if (!closed) {
1054 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1055 }
1056 t->combiner->FinallyRun(
1057 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1058 write_action_begin_locked, t, nullptr),
1059 GRPC_ERROR_NONE);
1060 break;
1061 }
1062
1063 grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1064 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1065 }
1066
1067 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1068 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1069 static void queue_setting_update(grpc_chttp2_transport* t,
1070 grpc_chttp2_setting_id id, uint32_t value) {
1071 const grpc_chttp2_setting_parameters* sp =
1072 &grpc_chttp2_settings_parameters[id];
1073 uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1074 if (use_value != value) {
1075 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1076 value, use_value);
1077 }
1078 if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1079 t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1080 t->dirtied_local_settings = true;
1081 }
1082 }
1083
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,const grpc_slice & goaway_text)1084 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1085 uint32_t goaway_error,
1086 uint32_t last_stream_id,
1087 const grpc_slice& goaway_text) {
1088 // Discard the error from a previous goaway frame (if any)
1089 if (t->goaway_error != GRPC_ERROR_NONE) {
1090 GRPC_ERROR_UNREF(t->goaway_error);
1091 }
1092 t->goaway_error = grpc_error_set_str(
1093 grpc_error_set_int(
1094 grpc_error_set_int(
1095 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1096 GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1097 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1098 GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1099
1100 GRPC_CHTTP2_IF_TRACING(
1101 gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1102 last_stream_id));
1103 // We want to log this irrespective of whether http tracing is enabled if we
1104 // received a GOAWAY with a non NO_ERROR code.
1105 if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1106 gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string.c_str(),
1107 goaway_error, grpc_error_std_string(t->goaway_error).c_str());
1108 }
1109 absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1110 // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1111 // data equal to "too_many_pings", it should log the occurrence at a log level
1112 // that is enabled by default and double the configured KEEPALIVE_TIME used
1113 // for new connections on that channel.
1114 if (GPR_UNLIKELY(t->is_client &&
1115 goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1116 grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1117 gpr_log(GPR_ERROR,
1118 "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1119 "data equal to \"too_many_pings\"");
1120 double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1121 constexpr int max_keepalive_time_ms =
1122 INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1123 t->keepalive_time =
1124 current_keepalive_time_ms > static_cast<double>(max_keepalive_time_ms)
1125 ? GRPC_MILLIS_INF_FUTURE
1126 : static_cast<grpc_millis>(current_keepalive_time_ms *
1127 KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1128 status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1129 absl::Cord(std::to_string(t->keepalive_time)));
1130 }
1131 // lie: use transient failure from the transport to indicate goaway has been
1132 // received.
1133 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1134 "got_goaway");
1135 }
1136
maybe_start_some_streams(grpc_chttp2_transport * t)1137 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1138 grpc_chttp2_stream* s;
1139 // cancel out streams that haven't yet started if we have received a GOAWAY
1140 if (t->goaway_error != GRPC_ERROR_NONE) {
1141 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1142 grpc_chttp2_cancel_stream(
1143 t, s,
1144 grpc_error_set_int(
1145 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1146 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1147 }
1148 return;
1149 }
1150 // start streams where we have free grpc_chttp2_stream ids and free
1151 // * concurrency
1152 while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1153 grpc_chttp2_stream_map_size(&t->stream_map) <
1154 t->settings[GRPC_PEER_SETTINGS]
1155 [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1156 grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1157 // safe since we can't (legally) be parsing this stream yet
1158 GRPC_CHTTP2_IF_TRACING(gpr_log(
1159 GPR_INFO,
1160 "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1161 t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1162
1163 GPR_ASSERT(s->id == 0);
1164 s->id = t->next_stream_id;
1165 t->next_stream_id += 2;
1166
1167 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1168 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1169 absl::Status(absl::StatusCode::kUnavailable,
1170 "Transport Stream IDs exhausted"),
1171 "no_more_stream_ids");
1172 }
1173
1174 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1175 post_destructive_reclaimer(t);
1176 grpc_chttp2_mark_stream_writable(t, s);
1177 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1178 }
1179 // cancel out streams that will never be started
1180 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1181 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1182 grpc_chttp2_cancel_stream(
1183 t, s,
1184 grpc_error_set_int(
1185 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1186 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1187 }
1188 }
1189 }
1190
// Layout of a barrier closure's scratch word (closure->next_data.scratch):
// flag bits live in the low-order bits, the reference count in the bits from
// CLOSURE_BARRIER_FIRST_REF_BIT upwards.
//
// Flag: this closure barrier may be covering a write in a pollset, so the
// closure must not be completed until we can prove that the write got
// scheduled.
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
// Weight of one reference, i.e. the lowest bit of the reference count that is
// stored above the flag bits.
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1198
add_closure_barrier(grpc_closure * closure)1199 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1200 closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1201 return closure;
1202 }
1203
null_then_sched_closure(grpc_closure ** closure)1204 static void null_then_sched_closure(grpc_closure** closure) {
1205 grpc_closure* c = *closure;
1206 *closure = nullptr;
1207 grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
1208 }
1209
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream *,grpc_closure ** pclosure,grpc_error_handle error,const char * desc)1210 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1211 grpc_chttp2_stream* /*s*/,
1212 grpc_closure** pclosure,
1213 grpc_error_handle error,
1214 const char* desc) {
1215 grpc_closure* closure = *pclosure;
1216 *pclosure = nullptr;
1217 if (closure == nullptr) {
1218 GRPC_ERROR_UNREF(error);
1219 return;
1220 }
1221 closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1222 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1223 gpr_log(
1224 GPR_INFO,
1225 "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1226 "write_state=%s",
1227 t, closure,
1228 static_cast<int>(closure->next_data.scratch /
1229 CLOSURE_BARRIER_FIRST_REF_BIT),
1230 static_cast<int>(closure->next_data.scratch %
1231 CLOSURE_BARRIER_FIRST_REF_BIT),
1232 desc, grpc_error_std_string(error).c_str(),
1233 write_state_name(t->write_state));
1234 }
1235 if (error != GRPC_ERROR_NONE) {
1236 if (closure->error_data.error == GRPC_ERROR_NONE) {
1237 closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1238 "Error in HTTP transport completing operation");
1239 closure->error_data.error = grpc_error_set_str(
1240 closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1241 grpc_slice_from_copied_string(t->peer_string.c_str()));
1242 }
1243 closure->error_data.error =
1244 grpc_error_add_child(closure->error_data.error, error);
1245 }
1246 if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1247 if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1248 !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1249 // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1250 // closures earlier than when it is safe to do so.
1251 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
1252 closure->error_data.error);
1253 } else {
1254 grpc_closure_list_append(&t->run_after_write, closure,
1255 closure->error_data.error);
1256 }
1257 }
1258 }
1259
contains_non_ok_status(grpc_metadata_batch * batch)1260 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1261 if (batch->idx.named.grpc_status != nullptr) {
1262 return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1263 GRPC_MDELEM_GRPC_STATUS_0);
1264 }
1265 return false;
1266 }
1267
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1268 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1269 grpc_chttp2_stream* s) {
1270 if (s->id != 0 && (!s->write_buffering ||
1271 s->flow_controlled_buffer.length > t->write_buffer_size)) {
1272 grpc_chttp2_mark_stream_writable(t, s);
1273 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1274 }
1275 }
1276
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1277 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1278 grpc_chttp2_stream* s) {
1279 s->fetched_send_message_length +=
1280 static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1281 grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1282 maybe_become_writable_due_to_send_msg(t, s);
1283 }
1284
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1285 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1286 grpc_chttp2_stream* s) {
1287 for (;;) {
1288 if (s->fetching_send_message == nullptr) {
1289 // Stream was cancelled before message fetch completed
1290 abort(); /* TODO(ctiller): what cleanup here? */
1291 return; /* early out */
1292 }
1293 if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1294 int64_t notify_offset = s->next_message_end_offset;
1295 if (notify_offset <= s->flow_controlled_bytes_written) {
1296 grpc_chttp2_complete_closure_step(
1297 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1298 "fetching_send_message_finished");
1299 } else {
1300 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1301 if (cb == nullptr) {
1302 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1303 } else {
1304 t->write_cb_pool = cb->next;
1305 }
1306 cb->call_at_byte = notify_offset;
1307 cb->closure = s->fetching_send_message_finished;
1308 s->fetching_send_message_finished = nullptr;
1309 grpc_chttp2_write_cb** list =
1310 s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1311 ? &s->on_write_finished_cbs
1312 : &s->on_flow_controlled_cbs;
1313 cb->next = *list;
1314 *list = cb;
1315 }
1316 s->fetching_send_message.reset();
1317 return; /* early out */
1318 } else if (s->fetching_send_message->Next(
1319 UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1320 ::complete_fetch, s,
1321 grpc_schedule_on_exec_ctx))) {
1322 grpc_error_handle error =
1323 s->fetching_send_message->Pull(&s->fetching_slice);
1324 if (error != GRPC_ERROR_NONE) {
1325 s->fetching_send_message.reset();
1326 grpc_chttp2_cancel_stream(t, s, error);
1327 } else {
1328 add_fetched_slice_locked(t, s);
1329 }
1330 }
1331 }
1332 }
1333
complete_fetch(void * gs,grpc_error_handle error)1334 static void complete_fetch(void* gs, grpc_error_handle error) {
1335 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1336 s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1337 ::complete_fetch_locked, s, nullptr),
1338 GRPC_ERROR_REF(error));
1339 }
1340
complete_fetch_locked(void * gs,grpc_error_handle error)1341 static void complete_fetch_locked(void* gs, grpc_error_handle error) {
1342 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1343 grpc_chttp2_transport* t = s->t;
1344 if (error == GRPC_ERROR_NONE) {
1345 error = s->fetching_send_message->Pull(&s->fetching_slice);
1346 if (error == GRPC_ERROR_NONE) {
1347 add_fetched_slice_locked(t, s);
1348 continue_fetching_send_locked(t, s);
1349 }
1350 }
1351 if (error != GRPC_ERROR_NONE) {
1352 s->fetching_send_message.reset();
1353 grpc_chttp2_cancel_stream(t, s, error);
1354 }
1355 }
1356
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1357 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1358 bool is_client, bool is_initial) {
1359 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1360 md = md->next) {
1361 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1362 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1363 gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1364 is_client ? "CLI" : "SVR", key, value);
1365 gpr_free(key);
1366 gpr_free(value);
1367 }
1368 }
1369
perform_stream_op_locked(void * stream_op,grpc_error_handle)1370 static void perform_stream_op_locked(void* stream_op,
1371 grpc_error_handle /*error_ignored*/) {
1372 GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1373
1374 grpc_transport_stream_op_batch* op =
1375 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1376 grpc_chttp2_stream* s =
1377 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1378 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1379 grpc_chttp2_transport* t = s->t;
1380
1381 GRPC_STATS_INC_HTTP2_OP_BATCHES();
1382
1383 s->context = op->payload->context;
1384 s->traced = op->is_traced;
1385 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1386 gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p",
1387 grpc_transport_stream_op_batch_string(op).c_str(), op->on_complete);
1388 if (op->send_initial_metadata) {
1389 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1390 s->id, t->is_client, true);
1391 }
1392 if (op->send_trailing_metadata) {
1393 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1394 s->id, t->is_client, false);
1395 }
1396 }
1397
1398 grpc_closure* on_complete = op->on_complete;
1399 // on_complete will be null if and only if there are no send ops in the batch.
1400 if (on_complete != nullptr) {
1401 // This batch has send ops. Use final_data as a barrier until enqueue time;
1402 // the initial counter is dropped at the end of this function.
1403 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1404 on_complete->error_data.error = GRPC_ERROR_NONE;
1405 }
1406
1407 if (op->cancel_stream) {
1408 GRPC_STATS_INC_HTTP2_OP_CANCEL();
1409 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1410 }
1411
1412 if (op->send_initial_metadata) {
1413 if (t->is_client && t->channelz_socket != nullptr) {
1414 t->channelz_socket->RecordStreamStartedFromLocal();
1415 }
1416 GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1417 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1418 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1419
1420 // Identify stream compression
1421 if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1422 .content_encoding == nullptr ||
1423 grpc_stream_compression_method_parse(
1424 GRPC_MDVALUE(
1425 op_payload->send_initial_metadata.send_initial_metadata->idx
1426 .named.content_encoding->md),
1427 true, &s->stream_compression_method) == 0) {
1428 s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1429 }
1430 if (s->stream_compression_method !=
1431 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1432 s->uncompressed_data_size = 0;
1433 s->stream_compression_ctx = nullptr;
1434 grpc_slice_buffer_init(&s->compressed_data_buffer);
1435 }
1436 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1437 s->send_initial_metadata =
1438 op_payload->send_initial_metadata.send_initial_metadata;
1439 if (t->is_client) {
1440 s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1441 }
1442 if (contains_non_ok_status(s->send_initial_metadata)) {
1443 s->seen_error = true;
1444 }
1445 if (!s->write_closed) {
1446 if (t->is_client) {
1447 if (t->closed_with_error == GRPC_ERROR_NONE) {
1448 GPR_ASSERT(s->id == 0);
1449 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1450 maybe_start_some_streams(t);
1451 } else {
1452 grpc_chttp2_cancel_stream(
1453 t, s,
1454 grpc_error_set_int(
1455 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1456 "Transport closed", &t->closed_with_error, 1),
1457 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1458 }
1459 } else {
1460 GPR_ASSERT(s->id != 0);
1461 grpc_chttp2_mark_stream_writable(t, s);
1462 if (!(op->send_message &&
1463 (op->payload->send_message.send_message->flags() &
1464 GRPC_WRITE_BUFFER_HINT))) {
1465 grpc_chttp2_initiate_write(
1466 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1467 }
1468 }
1469 } else {
1470 s->send_initial_metadata = nullptr;
1471 grpc_chttp2_complete_closure_step(
1472 t, s, &s->send_initial_metadata_finished,
1473 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1474 "Attempt to send initial metadata after stream was closed",
1475 &s->write_closed_error, 1),
1476 "send_initial_metadata_finished");
1477 }
1478 if (op_payload->send_initial_metadata.peer_string != nullptr) {
1479 gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1480 (gpr_atm)t->peer_string.c_str());
1481 }
1482 }
1483
1484 if (op->send_message) {
1485 GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1486 t->num_messages_in_next_write++;
1487 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1488 op->payload->send_message.send_message->length());
1489 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1490 s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1491 if (s->write_closed) {
1492 op->payload->send_message.stream_write_closed = true;
1493 // We should NOT return an error here, so as to avoid a cancel OP being
1494 // started. The surface layer will notice that the stream has been closed
1495 // for writes and fail the send message op.
1496 op->payload->send_message.send_message.reset();
1497 grpc_chttp2_complete_closure_step(
1498 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1499 "fetching_send_message_finished");
1500 } else {
1501 GPR_ASSERT(s->fetching_send_message == nullptr);
1502 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1503 &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1504 uint32_t flags = op_payload->send_message.send_message->flags();
1505 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1506 size_t len = op_payload->send_message.send_message->length();
1507 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1508 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1509 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1510 frame_hdr[4] = static_cast<uint8_t>(len);
1511 s->fetching_send_message =
1512 std::move(op_payload->send_message.send_message);
1513 s->fetched_send_message_length = 0;
1514 s->next_message_end_offset =
1515 s->flow_controlled_bytes_written +
1516 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1517 static_cast<int64_t>(len);
1518 if (flags & GRPC_WRITE_BUFFER_HINT) {
1519 s->next_message_end_offset -= t->write_buffer_size;
1520 s->write_buffering = true;
1521 } else {
1522 s->write_buffering = false;
1523 }
1524 continue_fetching_send_locked(t, s);
1525 maybe_become_writable_due_to_send_msg(t, s);
1526 }
1527 }
1528
1529 if (op->send_trailing_metadata) {
1530 GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1531 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1532 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1533 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1534 s->send_trailing_metadata =
1535 op_payload->send_trailing_metadata.send_trailing_metadata;
1536 s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1537 s->write_buffering = false;
1538 if (contains_non_ok_status(s->send_trailing_metadata)) {
1539 s->seen_error = true;
1540 }
1541 if (s->write_closed) {
1542 s->send_trailing_metadata = nullptr;
1543 s->sent_trailing_metadata_op = nullptr;
1544 grpc_chttp2_complete_closure_step(
1545 t, s, &s->send_trailing_metadata_finished,
1546 grpc_metadata_batch_is_empty(
1547 op->payload->send_trailing_metadata.send_trailing_metadata)
1548 ? GRPC_ERROR_NONE
1549 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1550 "Attempt to send trailing metadata after "
1551 "stream was closed"),
1552 "send_trailing_metadata_finished");
1553 } else if (s->id != 0) {
1554 // TODO(ctiller): check if there's flow control for any outstanding
1555 // bytes before going writable
1556 grpc_chttp2_mark_stream_writable(t, s);
1557 grpc_chttp2_initiate_write(
1558 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1559 }
1560 }
1561
1562 if (op->recv_initial_metadata) {
1563 GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1564 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1565 s->recv_initial_metadata_ready =
1566 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1567 s->recv_initial_metadata =
1568 op_payload->recv_initial_metadata.recv_initial_metadata;
1569 s->trailing_metadata_available =
1570 op_payload->recv_initial_metadata.trailing_metadata_available;
1571 if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1572 gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1573 (gpr_atm)t->peer_string.c_str());
1574 }
1575 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1576 }
1577
1578 if (op->recv_message) {
1579 GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1580 size_t before = 0;
1581 GPR_ASSERT(s->recv_message_ready == nullptr);
1582 GPR_ASSERT(!s->pending_byte_stream);
1583 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1584 s->recv_message = op_payload->recv_message.recv_message;
1585 if (s->id != 0) {
1586 if (!s->read_closed) {
1587 before = s->frame_storage.length +
1588 s->unprocessed_incoming_frames_buffer.length;
1589 }
1590 }
1591 grpc_chttp2_maybe_complete_recv_message(t, s);
1592 if (s->id != 0) {
1593 if (!s->read_closed && s->frame_storage.length == 0) {
1594 size_t after = s->frame_storage.length +
1595 s->unprocessed_incoming_frames_buffer_cached_length;
1596 s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1597 before - after);
1598 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1599 }
1600 }
1601 }
1602
1603 if (op->recv_trailing_metadata) {
1604 GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1605 GPR_ASSERT(s->collecting_stats == nullptr);
1606 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1607 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1608 s->recv_trailing_metadata_finished =
1609 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1610 s->recv_trailing_metadata =
1611 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1612 s->final_metadata_requested = true;
1613 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1614 }
1615
1616 if (on_complete != nullptr) {
1617 grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1618 "op->on_complete");
1619 }
1620
1621 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1622 }
1623
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1624 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1625 grpc_transport_stream_op_batch* op) {
1626 GPR_TIMER_SCOPE("perform_stream_op", 0);
1627 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1628 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1629
1630 if (!t->is_client) {
1631 if (op->send_initial_metadata) {
1632 grpc_millis deadline =
1633 op->payload->send_initial_metadata.send_initial_metadata->deadline;
1634 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1635 }
1636 if (op->send_trailing_metadata) {
1637 grpc_millis deadline =
1638 op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1639 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1640 }
1641 }
1642
1643 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1644 gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s,
1645 grpc_transport_stream_op_batch_string(op).c_str());
1646 }
1647
1648 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1649 op->handler_private.extra_arg = gs;
1650 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1651 perform_stream_op_locked, op, nullptr),
1652 GRPC_ERROR_NONE);
1653 }
1654
cancel_pings(grpc_chttp2_transport * t,grpc_error_handle error)1655 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1656 // callback remaining pings: they're not allowed to call into the transport,
1657 // and maybe they hold resources that need to be freed
1658 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1659 GPR_ASSERT(error != GRPC_ERROR_NONE);
1660 for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1661 grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1662 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1663 }
1664 GRPC_ERROR_UNREF(error);
1665 }
1666
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1667 static void send_ping_locked(grpc_chttp2_transport* t,
1668 grpc_closure* on_initiate, grpc_closure* on_ack) {
1669 if (t->closed_with_error != GRPC_ERROR_NONE) {
1670 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
1671 GRPC_ERROR_REF(t->closed_with_error));
1672 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
1673 GRPC_ERROR_REF(t->closed_with_error));
1674 return;
1675 }
1676 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1677 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1678 GRPC_ERROR_NONE);
1679 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1680 GRPC_ERROR_NONE);
1681 }
1682
1683 // Specialized form of send_ping_locked for keepalive ping. If there is already
1684 // a ping in progress, the keepalive ping would piggyback onto that ping,
1685 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_chttp2_transport * t)1686 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1687 if (t->closed_with_error != GRPC_ERROR_NONE) {
1688 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1689 start_keepalive_ping_locked, t, nullptr),
1690 GRPC_ERROR_REF(t->closed_with_error));
1691 t->combiner->Run(
1692 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1693 finish_keepalive_ping_locked, t, nullptr),
1694 GRPC_ERROR_REF(t->closed_with_error));
1695 return;
1696 }
1697 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1698 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1699 // There is a ping in flight. Add yourself to the inflight closure list.
1700 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1701 start_keepalive_ping_locked, t, nullptr),
1702 GRPC_ERROR_REF(t->closed_with_error));
1703 grpc_closure_list_append(
1704 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1705 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1706 finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1707 GRPC_ERROR_NONE);
1708 return;
1709 }
1710 grpc_closure_list_append(
1711 &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1712 GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1713 t, grpc_schedule_on_exec_ctx),
1714 GRPC_ERROR_NONE);
1715 grpc_closure_list_append(
1716 &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1717 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1718 t, grpc_schedule_on_exec_ctx),
1719 GRPC_ERROR_NONE);
1720 }
1721
grpc_chttp2_retry_initiate_ping(void * tp,grpc_error_handle error)1722 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error) {
1723 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1724 t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1725 retry_initiate_ping_locked, t, nullptr),
1726 GRPC_ERROR_REF(error));
1727 }
1728
retry_initiate_ping_locked(void * tp,grpc_error_handle error)1729 static void retry_initiate_ping_locked(void* tp, grpc_error_handle error) {
1730 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1731 t->ping_state.is_delayed_ping_timer_set = false;
1732 if (error == GRPC_ERROR_NONE) {
1733 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1734 }
1735 GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1736 }
1737
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1738 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1739 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1740 if (pq->inflight_id != id) {
1741 gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1742 t->peer_string.c_str(), id);
1743 return;
1744 }
1745 grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1746 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1747 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1748 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1749 }
1750 }
1751
send_goaway(grpc_chttp2_transport * t,grpc_error_handle error)1752 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error) {
1753 // We want to log this irrespective of whether http tracing is enabled
1754 gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string.c_str(),
1755 grpc_error_std_string(error).c_str());
1756 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1757 grpc_http2_error_code http_error;
1758 grpc_slice slice;
1759 grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1760 &http_error, nullptr);
1761 grpc_chttp2_goaway_append(t->last_new_stream_id,
1762 static_cast<uint32_t>(http_error),
1763 grpc_slice_ref_internal(slice), &t->qbuf);
1764 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1765 GRPC_ERROR_UNREF(error);
1766 }
1767
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1768 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1769 if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1770 t->ping_policy.max_ping_strikes != 0) {
1771 send_goaway(t,
1772 grpc_error_set_int(
1773 GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1774 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1775 // The transport will be closed after the write is done
1776 close_transport_locked(
1777 t, grpc_error_set_int(
1778 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1779 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1780 }
1781 }
1782
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1783 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1784 if (!t->is_client) {
1785 t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
1786 t->ping_recv_state.ping_strikes = 0;
1787 }
1788 t->ping_state.pings_before_data_required =
1789 t->ping_policy.max_pings_without_data;
1790 }
1791
perform_transport_op_locked(void * stream_op,grpc_error_handle)1792 static void perform_transport_op_locked(void* stream_op,
1793 grpc_error_handle /*error_ignored*/) {
1794 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1795 grpc_chttp2_transport* t =
1796 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1797
1798 if (op->goaway_error) {
1799 send_goaway(t, op->goaway_error);
1800 }
1801
1802 if (op->set_accept_stream) {
1803 t->accept_stream_cb = op->set_accept_stream_fn;
1804 t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1805 }
1806
1807 if (op->bind_pollset) {
1808 grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1809 }
1810
1811 if (op->bind_pollset_set) {
1812 grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1813 }
1814
1815 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1816 send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1817 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1818 }
1819
1820 if (op->start_connectivity_watch != nullptr) {
1821 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1822 std::move(op->start_connectivity_watch));
1823 }
1824 if (op->stop_connectivity_watch != nullptr) {
1825 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1826 }
1827
1828 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1829 close_transport_locked(t, op->disconnect_with_error);
1830 }
1831
1832 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1833
1834 GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1835 }
1836
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1837 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1838 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1839 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1840 gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1841 grpc_transport_op_string(op).c_str());
1842 }
1843 op->handler_private.extra_arg = gt;
1844 GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1845 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1846 perform_transport_op_locked, op, nullptr),
1847 GRPC_ERROR_NONE);
1848 }
1849
1850 //
1851 // INPUT PROCESSING - GENERAL
1852 //
1853
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport *,grpc_chttp2_stream * s)1854 void grpc_chttp2_maybe_complete_recv_initial_metadata(
1855 grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) {
1856 if (s->recv_initial_metadata_ready != nullptr &&
1857 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1858 if (s->seen_error) {
1859 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1860 if (!s->pending_byte_stream) {
1861 grpc_slice_buffer_reset_and_unref_internal(
1862 &s->unprocessed_incoming_frames_buffer);
1863 }
1864 }
1865 grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1866 s->recv_initial_metadata);
1867 null_then_sched_closure(&s->recv_initial_metadata_ready);
1868 }
1869 }
1870
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport *,grpc_chttp2_stream * s)1871 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
1872 grpc_chttp2_stream* s) {
1873 grpc_error_handle error = GRPC_ERROR_NONE;
1874 if (s->recv_message_ready != nullptr) {
1875 *s->recv_message = nullptr;
1876 if (s->final_metadata_requested && s->seen_error) {
1877 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1878 if (!s->pending_byte_stream) {
1879 grpc_slice_buffer_reset_and_unref_internal(
1880 &s->unprocessed_incoming_frames_buffer);
1881 }
1882 }
1883 if (!s->pending_byte_stream) {
1884 while (s->unprocessed_incoming_frames_buffer.length > 0 ||
1885 s->frame_storage.length > 0) {
1886 if (s->unprocessed_incoming_frames_buffer.length == 0) {
1887 grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
1888 &s->frame_storage);
1889 s->unprocessed_incoming_frames_decompressed = false;
1890 }
1891 if (!s->unprocessed_incoming_frames_decompressed &&
1892 s->stream_decompression_method !=
1893 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1894 GPR_ASSERT(s->decompressed_data_buffer.length == 0);
1895 bool end_of_context;
1896 if (!s->stream_decompression_ctx) {
1897 s->stream_decompression_ctx =
1898 grpc_stream_compression_context_create(
1899 s->stream_decompression_method);
1900 }
1901 if (!grpc_stream_decompress(
1902 s->stream_decompression_ctx,
1903 &s->unprocessed_incoming_frames_buffer,
1904 &s->decompressed_data_buffer, nullptr,
1905 GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
1906 &end_of_context)) {
1907 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1908 grpc_slice_buffer_reset_and_unref_internal(
1909 &s->unprocessed_incoming_frames_buffer);
1910 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1911 "Stream decompression error.");
1912 } else {
1913 s->decompressed_header_bytes += s->decompressed_data_buffer.length;
1914 if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
1915 s->decompressed_header_bytes = 0;
1916 }
1917 error = grpc_deframe_unprocessed_incoming_frames(
1918 &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
1919 s->recv_message);
1920 if (end_of_context) {
1921 grpc_stream_compression_context_destroy(
1922 s->stream_decompression_ctx);
1923 s->stream_decompression_ctx = nullptr;
1924 }
1925 }
1926 } else {
1927 error = grpc_deframe_unprocessed_incoming_frames(
1928 &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
1929 nullptr, s->recv_message);
1930 }
1931 if (error != GRPC_ERROR_NONE) {
1932 s->seen_error = true;
1933 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1934 grpc_slice_buffer_reset_and_unref_internal(
1935 &s->unprocessed_incoming_frames_buffer);
1936 break;
1937 } else if (*s->recv_message != nullptr) {
1938 break;
1939 }
1940 }
1941 }
1942 // save the length of the buffer before handing control back to application
1943 // threads. Needed to support correct flow control bookkeeping
1944 s->unprocessed_incoming_frames_buffer_cached_length =
1945 s->unprocessed_incoming_frames_buffer.length;
1946 if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
1947 null_then_sched_closure(&s->recv_message_ready);
1948 } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
1949 *s->recv_message = nullptr;
1950 null_then_sched_closure(&s->recv_message_ready);
1951 }
1952 GRPC_ERROR_UNREF(error);
1953 }
1954 }
1955
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1956 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
1957 grpc_chttp2_stream* s) {
1958 grpc_chttp2_maybe_complete_recv_message(t, s);
1959 if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
1960 s->write_closed) {
1961 if (s->seen_error || !t->is_client) {
1962 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1963 if (!s->pending_byte_stream) {
1964 grpc_slice_buffer_reset_and_unref_internal(
1965 &s->unprocessed_incoming_frames_buffer);
1966 }
1967 }
1968 bool pending_data = s->pending_byte_stream ||
1969 s->unprocessed_incoming_frames_buffer.length > 0;
1970 if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
1971 !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
1972 // Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
1973 // maybe decompress the next 5 bytes in the stream.
1974 if (s->stream_decompression_method ==
1975 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1976 grpc_slice_buffer_move_first(
1977 &s->frame_storage,
1978 GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
1979 &s->unprocessed_incoming_frames_buffer);
1980 if (s->unprocessed_incoming_frames_buffer.length > 0) {
1981 s->unprocessed_incoming_frames_decompressed = true;
1982 pending_data = true;
1983 }
1984 } else {
1985 bool end_of_context;
1986 if (!s->stream_decompression_ctx) {
1987 s->stream_decompression_ctx = grpc_stream_compression_context_create(
1988 s->stream_decompression_method);
1989 }
1990 if (!grpc_stream_decompress(
1991 s->stream_decompression_ctx, &s->frame_storage,
1992 &s->unprocessed_incoming_frames_buffer, nullptr,
1993 GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
1994 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1995 grpc_slice_buffer_reset_and_unref_internal(
1996 &s->unprocessed_incoming_frames_buffer);
1997 s->seen_error = true;
1998 } else {
1999 if (s->unprocessed_incoming_frames_buffer.length > 0) {
2000 s->unprocessed_incoming_frames_decompressed = true;
2001 pending_data = true;
2002 }
2003 if (end_of_context) {
2004 grpc_stream_compression_context_destroy(
2005 s->stream_decompression_ctx);
2006 s->stream_decompression_ctx = nullptr;
2007 }
2008 }
2009 }
2010 }
2011 if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
2012 s->recv_trailing_metadata_finished != nullptr) {
2013 grpc_transport_move_stats(&s->stats, s->collecting_stats);
2014 s->collecting_stats = nullptr;
2015 grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
2016 s->recv_trailing_metadata);
2017 null_then_sched_closure(&s->recv_trailing_metadata_finished);
2018 }
2019 }
2020 }
2021
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error_handle error)2022 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2023 grpc_error_handle error) {
2024 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2025 grpc_chttp2_stream_map_delete(&t->stream_map, id));
2026 GPR_DEBUG_ASSERT(s);
2027 if (t->incoming_stream == s) {
2028 t->incoming_stream = nullptr;
2029 grpc_chttp2_parsing_become_skip_parser(t);
2030 }
2031 if (s->pending_byte_stream) {
2032 if (s->on_next != nullptr) {
2033 grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2034 if (error == GRPC_ERROR_NONE) {
2035 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2036 }
2037 bs->PublishError(error);
2038 bs->Unref();
2039 s->data_parser.parsing_frame = nullptr;
2040 } else {
2041 GRPC_ERROR_UNREF(s->byte_stream_error);
2042 s->byte_stream_error = GRPC_ERROR_REF(error);
2043 }
2044 }
2045
2046 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2047 post_benign_reclaimer(t);
2048 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2049 close_transport_locked(
2050 t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2051 "Last stream closed after sending GOAWAY", &error, 1));
2052 }
2053 }
2054 if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2055 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2056 }
2057 grpc_chttp2_list_remove_stalled_by_stream(t, s);
2058 grpc_chttp2_list_remove_stalled_by_transport(t, s);
2059
2060 GRPC_ERROR_UNREF(error);
2061
2062 maybe_start_some_streams(t);
2063 }
2064
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle due_to_error)2065 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2066 grpc_error_handle due_to_error) {
2067 if (!t->is_client && !s->sent_trailing_metadata &&
2068 grpc_error_has_clear_grpc_status(due_to_error)) {
2069 close_from_api(t, s, due_to_error);
2070 return;
2071 }
2072
2073 if (!s->read_closed || !s->write_closed) {
2074 if (s->id != 0) {
2075 grpc_http2_error_code http_error;
2076 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2077 &http_error, nullptr);
2078 grpc_chttp2_add_rst_stream_to_next_write(
2079 t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2080 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2081 }
2082 }
2083 if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2084 s->seen_error = true;
2085 }
2086 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2087 }
2088
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2089 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2090 grpc_error_handle error) {
2091 grpc_status_code status;
2092 grpc_slice slice;
2093 grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2094 if (status != GRPC_STATUS_OK) {
2095 s->seen_error = true;
2096 }
2097 // stream_global->recv_trailing_metadata_finished gives us a
2098 // last chance replacement: we've received trailing metadata,
2099 // but something more important has become available to signal
2100 // to the upper layers - drop what we've got, and then publish
2101 // what we want - which is safe because we haven't told anyone
2102 // about the metadata yet
2103 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2104 s->recv_trailing_metadata_finished != nullptr) {
2105 char status_string[GPR_LTOA_MIN_BUFSIZE];
2106 gpr_ltoa(status, status_string);
2107 GRPC_LOG_IF_ERROR("add_status",
2108 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2109 &s->metadata_buffer[1],
2110 grpc_mdelem_from_slices(
2111 GRPC_MDSTR_GRPC_STATUS,
2112 grpc_core::UnmanagedMemorySlice(status_string))));
2113 if (!GRPC_SLICE_IS_EMPTY(slice)) {
2114 GRPC_LOG_IF_ERROR(
2115 "add_status_message",
2116 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2117 &s->metadata_buffer[1],
2118 grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2119 }
2120 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2121 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2122 }
2123
2124 GRPC_ERROR_UNREF(error);
2125 }
2126
add_error(grpc_error_handle error,grpc_error_handle * refs,size_t * nrefs)2127 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2128 size_t* nrefs) {
2129 if (error == GRPC_ERROR_NONE) return;
2130 for (size_t i = 0; i < *nrefs; i++) {
2131 if (error == refs[i]) {
2132 return;
2133 }
2134 }
2135 refs[*nrefs] = error;
2136 ++*nrefs;
2137 }
2138
removal_error(grpc_error_handle extra_error,grpc_chttp2_stream * s,const char * main_error_msg)2139 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2140 grpc_chttp2_stream* s,
2141 const char* main_error_msg) {
2142 grpc_error_handle refs[3];
2143 size_t nrefs = 0;
2144 add_error(s->read_closed_error, refs, &nrefs);
2145 add_error(s->write_closed_error, refs, &nrefs);
2146 add_error(extra_error, refs, &nrefs);
2147 grpc_error_handle error = GRPC_ERROR_NONE;
2148 if (nrefs > 0) {
2149 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(main_error_msg,
2150 refs, nrefs);
2151 }
2152 GRPC_ERROR_UNREF(extra_error);
2153 return error;
2154 }
2155
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error_handle error)2156 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2157 grpc_chttp2_write_cb** list,
2158 grpc_error_handle error) {
2159 while (*list) {
2160 grpc_chttp2_write_cb* cb = *list;
2161 *list = cb->next;
2162 grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2163 "on_write_finished_cb");
2164 cb->next = t->write_cb_pool;
2165 t->write_cb_pool = cb;
2166 }
2167 GRPC_ERROR_UNREF(error);
2168 }
2169
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2170 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2171 grpc_chttp2_stream* s,
2172 grpc_error_handle error) {
2173 error =
2174 removal_error(error, s, "Pending writes failed due to stream closure");
2175 s->send_initial_metadata = nullptr;
2176 grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2177 GRPC_ERROR_REF(error),
2178 "send_initial_metadata_finished");
2179
2180 s->send_trailing_metadata = nullptr;
2181 s->sent_trailing_metadata_op = nullptr;
2182 grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2183 GRPC_ERROR_REF(error),
2184 "send_trailing_metadata_finished");
2185
2186 s->fetching_send_message.reset();
2187 grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2188 GRPC_ERROR_REF(error),
2189 "fetching_send_message_finished");
2190 flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2191 flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2192 }
2193
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error_handle error)2194 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2195 grpc_chttp2_stream* s, int close_reads,
2196 int close_writes, grpc_error_handle error) {
2197 if (s->read_closed && s->write_closed) {
2198 // already closed, but we should still fake the status if needed.
2199 grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2200 if (overall_error != GRPC_ERROR_NONE) {
2201 grpc_chttp2_fake_status(t, s, overall_error);
2202 }
2203 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2204 return;
2205 }
2206 bool closed_read = false;
2207 bool became_closed = false;
2208 if (close_reads && !s->read_closed) {
2209 s->read_closed_error = GRPC_ERROR_REF(error);
2210 s->read_closed = true;
2211 closed_read = true;
2212 }
2213 if (close_writes && !s->write_closed) {
2214 s->write_closed_error = GRPC_ERROR_REF(error);
2215 s->write_closed = true;
2216 grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
2217 }
2218 if (s->read_closed && s->write_closed) {
2219 became_closed = true;
2220 grpc_error_handle overall_error =
2221 removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
2222 if (s->id != 0) {
2223 remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
2224 } else {
2225 // Purge streams waiting on concurrency still waiting for id assignment
2226 grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2227 }
2228 if (overall_error != GRPC_ERROR_NONE) {
2229 grpc_chttp2_fake_status(t, s, overall_error);
2230 }
2231 }
2232 if (closed_read) {
2233 for (int i = 0; i < 2; i++) {
2234 if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2235 s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2236 }
2237 }
2238 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2239 grpc_chttp2_maybe_complete_recv_message(t, s);
2240 }
2241 if (became_closed) {
2242 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2243 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2244 }
2245 GRPC_ERROR_UNREF(error);
2246 }
2247
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2248 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2249 grpc_error_handle error) {
2250 grpc_slice hdr;
2251 grpc_slice status_hdr;
2252 grpc_slice http_status_hdr;
2253 grpc_slice content_type_hdr;
2254 grpc_slice message_pfx;
2255 uint8_t* p;
2256 uint32_t len = 0;
2257 grpc_status_code grpc_status;
2258 grpc_slice slice;
2259 grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2260 nullptr);
2261
2262 GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2263
2264 // Hand roll a header block.
2265 // This is unnecessarily ugly - at some point we should find a more
2266 // elegant solution.
2267 // It's complicated by the fact that our send machinery would be dead by
2268 // the time we got around to sending this, so instead we ignore HPACK
2269 // compression and just write the uncompressed bytes onto the wire.
2270 if (!s->sent_initial_metadata) {
2271 http_status_hdr = GRPC_SLICE_MALLOC(13);
2272 p = GRPC_SLICE_START_PTR(http_status_hdr);
2273 *p++ = 0x00;
2274 *p++ = 7;
2275 *p++ = ':';
2276 *p++ = 's';
2277 *p++ = 't';
2278 *p++ = 'a';
2279 *p++ = 't';
2280 *p++ = 'u';
2281 *p++ = 's';
2282 *p++ = 3;
2283 *p++ = '2';
2284 *p++ = '0';
2285 *p++ = '0';
2286 GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2287 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2288
2289 content_type_hdr = GRPC_SLICE_MALLOC(31);
2290 p = GRPC_SLICE_START_PTR(content_type_hdr);
2291 *p++ = 0x00;
2292 *p++ = 12;
2293 *p++ = 'c';
2294 *p++ = 'o';
2295 *p++ = 'n';
2296 *p++ = 't';
2297 *p++ = 'e';
2298 *p++ = 'n';
2299 *p++ = 't';
2300 *p++ = '-';
2301 *p++ = 't';
2302 *p++ = 'y';
2303 *p++ = 'p';
2304 *p++ = 'e';
2305 *p++ = 16;
2306 *p++ = 'a';
2307 *p++ = 'p';
2308 *p++ = 'p';
2309 *p++ = 'l';
2310 *p++ = 'i';
2311 *p++ = 'c';
2312 *p++ = 'a';
2313 *p++ = 't';
2314 *p++ = 'i';
2315 *p++ = 'o';
2316 *p++ = 'n';
2317 *p++ = '/';
2318 *p++ = 'g';
2319 *p++ = 'r';
2320 *p++ = 'p';
2321 *p++ = 'c';
2322 GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2323 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2324 }
2325
2326 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2327 p = GRPC_SLICE_START_PTR(status_hdr);
2328 *p++ = 0x00; /* literal header, not indexed */
2329 *p++ = 11; /* len(grpc-status) */
2330 *p++ = 'g';
2331 *p++ = 'r';
2332 *p++ = 'p';
2333 *p++ = 'c';
2334 *p++ = '-';
2335 *p++ = 's';
2336 *p++ = 't';
2337 *p++ = 'a';
2338 *p++ = 't';
2339 *p++ = 'u';
2340 *p++ = 's';
2341 if (grpc_status < 10) {
2342 *p++ = 1;
2343 *p++ = static_cast<uint8_t>('0' + grpc_status);
2344 } else {
2345 *p++ = 2;
2346 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2347 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2348 }
2349 GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2350 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2351
2352 size_t msg_len = GRPC_SLICE_LENGTH(slice);
2353 GPR_ASSERT(msg_len <= UINT32_MAX);
2354 uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2355 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2356 p = GRPC_SLICE_START_PTR(message_pfx);
2357 *p++ = 0x00; /* literal header, not indexed */
2358 *p++ = 12; /* len(grpc-message) */
2359 *p++ = 'g';
2360 *p++ = 'r';
2361 *p++ = 'p';
2362 *p++ = 'c';
2363 *p++ = '-';
2364 *p++ = 'm';
2365 *p++ = 'e';
2366 *p++ = 's';
2367 *p++ = 's';
2368 *p++ = 'a';
2369 *p++ = 'g';
2370 *p++ = 'e';
2371 GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2372 p += msg_len_len;
2373 GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2374 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2375 len += static_cast<uint32_t>(msg_len);
2376
2377 hdr = GRPC_SLICE_MALLOC(9);
2378 p = GRPC_SLICE_START_PTR(hdr);
2379 *p++ = static_cast<uint8_t>(len >> 16);
2380 *p++ = static_cast<uint8_t>(len >> 8);
2381 *p++ = static_cast<uint8_t>(len);
2382 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2383 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2384 *p++ = static_cast<uint8_t>(s->id >> 24);
2385 *p++ = static_cast<uint8_t>(s->id >> 16);
2386 *p++ = static_cast<uint8_t>(s->id >> 8);
2387 *p++ = static_cast<uint8_t>(s->id);
2388 GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2389
2390 grpc_slice_buffer_add(&t->qbuf, hdr);
2391 if (!s->sent_initial_metadata) {
2392 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2393 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2394 }
2395 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2396 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2397 grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2398 grpc_chttp2_reset_ping_clock(t);
2399 grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2400 &s->stats.outgoing);
2401
2402 grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2403 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2404 }
2405
// User data handed to cancel_stream_cb() while iterating over a transport's
// stream map. Field order matters: the struct is brace-initialized
// positionally by its user.
struct cancel_stream_cb_args {
  // Error each stream is cancelled with; the callback takes its own ref.
  grpc_error_handle error;
  // Transport whose streams are being cancelled.
  grpc_chttp2_transport* t;
};
2410
cancel_stream_cb(void * user_data,uint32_t,void * stream)2411 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2412 cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2413 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2414 grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2415 }
2416
end_all_the_calls(grpc_chttp2_transport * t,grpc_error_handle error)2417 static void end_all_the_calls(grpc_chttp2_transport* t,
2418 grpc_error_handle error) {
2419 intptr_t http2_error;
2420 // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2421 if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2422 !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2423 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2424 GRPC_STATUS_UNAVAILABLE);
2425 }
2426 cancel_stream_cb_args args = {error, t};
2427 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2428 GRPC_ERROR_UNREF(error);
2429 }
2430
2431 //
2432 // INPUT PROCESSING - PARSING
2433 //
2434
2435 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2436 static void WithUrgency(grpc_chttp2_transport* t,
2437 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2438 grpc_chttp2_initiate_write_reason reason, F action) {
2439 switch (urgency) {
2440 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2441 break;
2442 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2443 grpc_chttp2_initiate_write(t, reason);
2444 // fallthrough
2445 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2446 action();
2447 break;
2448 }
2449 }
2450
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2451 void grpc_chttp2_act_on_flowctl_action(
2452 const grpc_core::chttp2::FlowControlAction& action,
2453 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2454 WithUrgency(t, action.send_stream_update(),
2455 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2456 [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2457 WithUrgency(t, action.send_transport_update(),
2458 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2459 WithUrgency(t, action.send_initial_window_update(),
2460 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2461 queue_setting_update(t,
2462 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2463 action.initial_window_size());
2464 });
2465 WithUrgency(t, action.send_max_frame_size_update(),
2466 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2467 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2468 action.max_frame_size());
2469 });
2470 }
2471
try_http_parsing(grpc_chttp2_transport * t)2472 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2473 grpc_http_parser parser;
2474 size_t i = 0;
2475 grpc_error_handle error = GRPC_ERROR_NONE;
2476 grpc_http_response response;
2477
2478 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2479
2480 grpc_error_handle parse_error = GRPC_ERROR_NONE;
2481 for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2482 parse_error =
2483 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2484 }
2485 if (parse_error == GRPC_ERROR_NONE &&
2486 (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2487 error = grpc_error_set_int(
2488 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2489 "Trying to connect an http1.x server"),
2490 GRPC_ERROR_INT_HTTP_STATUS, response.status),
2491 GRPC_ERROR_INT_GRPC_STATUS,
2492 grpc_http2_status_to_grpc_status(response.status));
2493 }
2494 GRPC_ERROR_UNREF(parse_error);
2495
2496 grpc_http_parser_destroy(&parser);
2497 grpc_http_response_destroy(&response);
2498 return error;
2499 }
2500
read_action(void * tp,grpc_error_handle error)2501 static void read_action(void* tp, grpc_error_handle error) {
2502 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2503 t->combiner->Run(
2504 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2505 GRPC_ERROR_REF(error));
2506 }
2507
read_action_locked(void * tp,grpc_error_handle error)2508 static void read_action_locked(void* tp, grpc_error_handle error) {
2509 GPR_TIMER_SCOPE("reading_action_locked", 0);
2510
2511 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2512
2513 GRPC_ERROR_REF(error);
2514
2515 grpc_error_handle err = error;
2516 if (err != GRPC_ERROR_NONE) {
2517 err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2518 "Endpoint read failed", &err, 1),
2519 GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2520 t->write_state);
2521 }
2522 GPR_SWAP(grpc_error_handle, err, error);
2523 GRPC_ERROR_UNREF(err);
2524 if (t->closed_with_error == GRPC_ERROR_NONE) {
2525 GPR_TIMER_SCOPE("reading_action.parse", 0);
2526 size_t i = 0;
2527 grpc_error_handle errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2528 GRPC_ERROR_NONE};
2529 for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2530 errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2531 }
2532 if (errors[1] != GRPC_ERROR_NONE) {
2533 errors[2] = try_http_parsing(t);
2534 GRPC_ERROR_UNREF(error);
2535 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2536 "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2537 }
2538 for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2539 GRPC_ERROR_UNREF(errors[i]);
2540 }
2541
2542 GPR_TIMER_SCOPE("post_parse_locked", 0);
2543 if (t->initial_window_update != 0) {
2544 if (t->initial_window_update > 0) {
2545 grpc_chttp2_stream* s;
2546 while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2547 grpc_chttp2_mark_stream_writable(t, s);
2548 grpc_chttp2_initiate_write(
2549 t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2550 }
2551 }
2552 t->initial_window_update = 0;
2553 }
2554 }
2555
2556 GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2557 bool keep_reading = false;
2558 if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2559 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2560 "Transport closed", &t->closed_with_error, 1);
2561 }
2562 if (error != GRPC_ERROR_NONE) {
2563 // If a goaway frame was received, this might be the reason why the read
2564 // failed. Add this info to the error
2565 if (t->goaway_error != GRPC_ERROR_NONE) {
2566 error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2567 }
2568
2569 close_transport_locked(t, GRPC_ERROR_REF(error));
2570 t->endpoint_reading = 0;
2571 } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2572 keep_reading = true;
2573 // Since we have read a byte, reset the keepalive timer
2574 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2575 grpc_timer_cancel(&t->keepalive_ping_timer);
2576 }
2577 }
2578 grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2579
2580 if (keep_reading) {
2581 if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2582 t->reading_paused_on_pending_induced_frames = true;
2583 GRPC_CHTTP2_IF_TRACING(
2584 gpr_log(GPR_INFO,
2585 "transport %p : Pausing reading due to too "
2586 "many unwritten SETTINGS ACK and RST_STREAM frames",
2587 t));
2588 } else {
2589 continue_read_action_locked(t);
2590 }
2591 } else {
2592 GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2593 }
2594
2595 GRPC_ERROR_UNREF(error);
2596 }
2597
continue_read_action_locked(grpc_chttp2_transport * t)2598 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2599 const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2600 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2601 grpc_schedule_on_exec_ctx);
2602 grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2603 grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2604 }
2605
2606 // t is reffed prior to calling the first time, and once the callback chain
2607 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2608 void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2609 t->flow_control->bdp_estimator()->SchedulePing();
2610 send_ping_locked(
2611 t,
2612 GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2613 grpc_schedule_on_exec_ctx),
2614 GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2615 grpc_schedule_on_exec_ctx));
2616 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2617 }
2618
start_bdp_ping(void * tp,grpc_error_handle error)2619 static void start_bdp_ping(void* tp, grpc_error_handle error) {
2620 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2621 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2622 start_bdp_ping_locked, t, nullptr),
2623 GRPC_ERROR_REF(error));
2624 }
2625
start_bdp_ping_locked(void * tp,grpc_error_handle error)2626 static void start_bdp_ping_locked(void* tp, grpc_error_handle error) {
2627 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2628 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2629 gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string.c_str(),
2630 grpc_error_std_string(error).c_str());
2631 }
2632 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2633 return;
2634 }
2635 // Reset the keepalive ping timer
2636 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2637 grpc_timer_cancel(&t->keepalive_ping_timer);
2638 }
2639 t->flow_control->bdp_estimator()->StartPing();
2640 t->bdp_ping_started = true;
2641 }
2642
finish_bdp_ping(void * tp,grpc_error_handle error)2643 static void finish_bdp_ping(void* tp, grpc_error_handle error) {
2644 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2645 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2646 finish_bdp_ping_locked, t, nullptr),
2647 GRPC_ERROR_REF(error));
2648 }
2649
finish_bdp_ping_locked(void * tp,grpc_error_handle error)2650 static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) {
2651 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2652 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2653 gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string.c_str(),
2654 grpc_error_std_string(error).c_str());
2655 }
2656 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2657 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2658 return;
2659 }
2660 if (!t->bdp_ping_started) {
2661 // start_bdp_ping_locked has not been run yet. Schedule
2662 // finish_bdp_ping_locked to be run later.
2663 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2664 finish_bdp_ping_locked, t, nullptr),
2665 GRPC_ERROR_REF(error));
2666 return;
2667 }
2668 t->bdp_ping_started = false;
2669 grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2670 grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2671 nullptr);
2672 GPR_ASSERT(!t->have_next_bdp_ping_timer);
2673 t->have_next_bdp_ping_timer = true;
2674 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2675 next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
2676 grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2677 &t->next_bdp_ping_timer_expired_locked);
2678 }
2679
next_bdp_ping_timer_expired(void * tp,grpc_error_handle error)2680 static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error) {
2681 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2682 t->combiner->Run(
2683 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2684 next_bdp_ping_timer_expired_locked, t, nullptr),
2685 GRPC_ERROR_REF(error));
2686 }
2687
next_bdp_ping_timer_expired_locked(void * tp,grpc_error_handle error)2688 static void next_bdp_ping_timer_expired_locked(void* tp,
2689 grpc_error_handle error) {
2690 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2691 GPR_ASSERT(t->have_next_bdp_ping_timer);
2692 t->have_next_bdp_ping_timer = false;
2693 if (error != GRPC_ERROR_NONE) {
2694 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2695 return;
2696 }
2697 if (t->flow_control->bdp_estimator()->accumulator() == 0) {
2698 // Block the bdp ping till we receive more data.
2699 t->bdp_ping_blocked = true;
2700 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2701 } else {
2702 schedule_bdp_ping_locked(t);
2703 }
2704 }
2705
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2706 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2707 bool is_client) {
2708 size_t i;
2709 if (args) {
2710 for (i = 0; i < args->num_args; i++) {
2711 if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2712 const int value = grpc_channel_arg_get_integer(
2713 &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2714 : g_default_server_keepalive_time_ms,
2715 1, INT_MAX});
2716 if (is_client) {
2717 g_default_client_keepalive_time_ms = value;
2718 } else {
2719 g_default_server_keepalive_time_ms = value;
2720 }
2721 } else if (0 ==
2722 strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2723 const int value = grpc_channel_arg_get_integer(
2724 &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2725 : g_default_server_keepalive_timeout_ms,
2726 0, INT_MAX});
2727 if (is_client) {
2728 g_default_client_keepalive_timeout_ms = value;
2729 } else {
2730 g_default_server_keepalive_timeout_ms = value;
2731 }
2732 } else if (0 == strcmp(args->args[i].key,
2733 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2734 const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2735 &args->args[i],
2736 {is_client ? g_default_client_keepalive_permit_without_calls
2737 : g_default_server_keepalive_timeout_ms,
2738 0, 1}));
2739 if (is_client) {
2740 g_default_client_keepalive_permit_without_calls = value;
2741 } else {
2742 g_default_server_keepalive_permit_without_calls = value;
2743 }
2744 } else if (0 ==
2745 strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2746 g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2747 &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2748 } else if (0 == strcmp(args->args[i].key,
2749 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2750 g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2751 &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2752 } else if (0 ==
2753 strcmp(
2754 args->args[i].key,
2755 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2756 g_default_min_recv_ping_interval_without_data_ms =
2757 grpc_channel_arg_get_integer(
2758 &args->args[i],
2759 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2760 }
2761 }
2762 }
2763 }
2764
init_keepalive_ping(void * arg,grpc_error_handle error)2765 static void init_keepalive_ping(void* arg, grpc_error_handle error) {
2766 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2767 t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2768 init_keepalive_ping_locked, t, nullptr),
2769 GRPC_ERROR_REF(error));
2770 }
2771
init_keepalive_ping_locked(void * arg,grpc_error_handle error)2772 static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2773 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2774 GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2775 if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2776 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2777 } else if (error == GRPC_ERROR_NONE) {
2778 if (t->keepalive_permit_without_calls ||
2779 grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2780 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2781 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2782 grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2783 send_keepalive_ping_locked(t);
2784 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2785 } else {
2786 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2787 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2788 grpc_schedule_on_exec_ctx);
2789 grpc_timer_init(&t->keepalive_ping_timer,
2790 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2791 &t->init_keepalive_ping_locked);
2792 }
2793 } else if (error == GRPC_ERROR_CANCELLED) {
2794 // The keepalive ping timer may be cancelled by bdp
2795 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2796 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2797 grpc_schedule_on_exec_ctx);
2798 grpc_timer_init(&t->keepalive_ping_timer,
2799 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2800 &t->init_keepalive_ping_locked);
2801 }
2802 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2803 }
2804
start_keepalive_ping(void * arg,grpc_error_handle error)2805 static void start_keepalive_ping(void* arg, grpc_error_handle error) {
2806 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2807 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2808 start_keepalive_ping_locked, t, nullptr),
2809 GRPC_ERROR_REF(error));
2810 }
2811
start_keepalive_ping_locked(void * arg,grpc_error_handle error)2812 static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2813 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2814 if (error != GRPC_ERROR_NONE) {
2815 return;
2816 }
2817 if (t->channelz_socket != nullptr) {
2818 t->channelz_socket->RecordKeepaliveSent();
2819 }
2820 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2821 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2822 gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str());
2823 }
2824 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2825 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2826 keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
2827 grpc_timer_init(&t->keepalive_watchdog_timer,
2828 grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2829 &t->keepalive_watchdog_fired_locked);
2830 t->keepalive_ping_started = true;
2831 }
2832
finish_keepalive_ping(void * arg,grpc_error_handle error)2833 static void finish_keepalive_ping(void* arg, grpc_error_handle error) {
2834 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2835 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2836 finish_keepalive_ping_locked, t, nullptr),
2837 GRPC_ERROR_REF(error));
2838 }
2839
finish_keepalive_ping_locked(void * arg,grpc_error_handle error)2840 static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2841 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2842 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2843 if (error == GRPC_ERROR_NONE) {
2844 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2845 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2846 gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string.c_str());
2847 }
2848 if (!t->keepalive_ping_started) {
2849 // start_keepalive_ping_locked has not run yet. Reschedule
2850 // finish_keepalive_ping_locked for it to be run later.
2851 t->combiner->Run(
2852 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2853 finish_keepalive_ping_locked, t, nullptr),
2854 GRPC_ERROR_REF(error));
2855 return;
2856 }
2857 t->keepalive_ping_started = false;
2858 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2859 grpc_timer_cancel(&t->keepalive_watchdog_timer);
2860 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2861 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2862 grpc_schedule_on_exec_ctx);
2863 grpc_timer_init(&t->keepalive_ping_timer,
2864 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2865 &t->init_keepalive_ping_locked);
2866 }
2867 }
2868 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2869 }
2870
keepalive_watchdog_fired(void * arg,grpc_error_handle error)2871 static void keepalive_watchdog_fired(void* arg, grpc_error_handle error) {
2872 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2873 t->combiner->Run(
2874 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2875 keepalive_watchdog_fired_locked, t, nullptr),
2876 GRPC_ERROR_REF(error));
2877 }
2878
keepalive_watchdog_fired_locked(void * arg,grpc_error_handle error)2879 static void keepalive_watchdog_fired_locked(void* arg,
2880 grpc_error_handle error) {
2881 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2882 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2883 if (error == GRPC_ERROR_NONE) {
2884 gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2885 t->peer_string.c_str());
2886 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2887 close_transport_locked(
2888 t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2889 "keepalive watchdog timeout"),
2890 GRPC_ERROR_INT_GRPC_STATUS,
2891 GRPC_STATUS_UNAVAILABLE));
2892 }
2893 } else {
2894 // The watchdog timer should have been cancelled by
2895 // finish_keepalive_ping_locked.
2896 if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2897 gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2898 t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2899 }
2900 }
2901 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2902 }
2903
2904 //
2905 // CALLBACK LOOP
2906 //
2907
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)2908 static void connectivity_state_set(grpc_chttp2_transport* t,
2909 grpc_connectivity_state state,
2910 const absl::Status& status,
2911 const char* reason) {
2912 GRPC_CHTTP2_IF_TRACING(
2913 gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2914 t->state_tracker.SetState(state, status, reason);
2915 }
2916
2917 //
2918 // POLLSET STUFF
2919 //
2920
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2921 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2922 grpc_pollset* pollset) {
2923 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2924 grpc_endpoint_add_to_pollset(t->ep, pollset);
2925 }
2926
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2927 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2928 grpc_pollset_set* pollset_set) {
2929 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2930 grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2931 }
2932
2933 //
2934 // BYTE STREAM
2935 //
2936
reset_byte_stream(void * arg,grpc_error_handle error)2937 static void reset_byte_stream(void* arg, grpc_error_handle error) {
2938 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2939 s->pending_byte_stream = false;
2940 if (error == GRPC_ERROR_NONE) {
2941 grpc_chttp2_maybe_complete_recv_message(s->t, s);
2942 grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2943 } else {
2944 GPR_ASSERT(error != GRPC_ERROR_NONE);
2945 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
2946 s->on_next = nullptr;
2947 GRPC_ERROR_UNREF(s->byte_stream_error);
2948 s->byte_stream_error = GRPC_ERROR_NONE;
2949 grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2950 s->byte_stream_error = GRPC_ERROR_REF(error);
2951 }
2952 }
2953
2954 namespace grpc_core {
2955
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2956 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2957 grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2958 uint32_t frame_size, uint32_t flags)
2959 : ByteStream(frame_size, flags),
2960 transport_(transport),
2961 stream_(stream),
2962 refs_(2),
2963 remaining_bytes_(frame_size) {
2964 GRPC_ERROR_UNREF(stream->byte_stream_error);
2965 stream->byte_stream_error = GRPC_ERROR_NONE;
2966 }
2967
OrphanLocked(void * arg,grpc_error_handle)2968 void Chttp2IncomingByteStream::OrphanLocked(
2969 void* arg, grpc_error_handle /*error_ignored*/) {
2970 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2971 grpc_chttp2_stream* s = bs->stream_;
2972 grpc_chttp2_transport* t = s->t;
2973 bs->Unref();
2974 s->pending_byte_stream = false;
2975 grpc_chttp2_maybe_complete_recv_message(t, s);
2976 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2977 }
2978
Orphan()2979 void Chttp2IncomingByteStream::Orphan() {
2980 GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2981 transport_->combiner->Run(
2982 GRPC_CLOSURE_INIT(&destroy_action_,
2983 &Chttp2IncomingByteStream::OrphanLocked, this, nullptr),
2984 GRPC_ERROR_NONE);
2985 }
2986
NextLocked(void * arg,grpc_error_handle)2987 void Chttp2IncomingByteStream::NextLocked(void* arg,
2988 grpc_error_handle /*error_ignored*/) {
2989 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2990 grpc_chttp2_transport* t = bs->transport_;
2991 grpc_chttp2_stream* s = bs->stream_;
2992 size_t cur_length = s->frame_storage.length;
2993 if (!s->read_closed) {
2994 s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2995 cur_length);
2996 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2997 }
2998 GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
2999 if (s->frame_storage.length > 0) {
3000 grpc_slice_buffer_swap(&s->frame_storage,
3001 &s->unprocessed_incoming_frames_buffer);
3002 s->unprocessed_incoming_frames_decompressed = false;
3003 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3004 GRPC_ERROR_NONE);
3005 } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
3006 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3007 GRPC_ERROR_REF(s->byte_stream_error));
3008 if (s->data_parser.parsing_frame != nullptr) {
3009 s->data_parser.parsing_frame->Unref();
3010 s->data_parser.parsing_frame = nullptr;
3011 }
3012 } else if (s->read_closed) {
3013 if (bs->remaining_bytes_ != 0) {
3014 s->byte_stream_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3015 "Truncated message", &s->read_closed_error, 1);
3016 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3017 GRPC_ERROR_REF(s->byte_stream_error));
3018 if (s->data_parser.parsing_frame != nullptr) {
3019 s->data_parser.parsing_frame->Unref();
3020 s->data_parser.parsing_frame = nullptr;
3021 }
3022 } else {
3023 // Should never reach here.
3024 GPR_ASSERT(false);
3025 }
3026 } else {
3027 s->on_next = bs->next_action_.on_complete;
3028 }
3029 bs->Unref();
3030 }
3031
Next(size_t max_size_hint,grpc_closure * on_complete)3032 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
3033 grpc_closure* on_complete) {
3034 GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
3035 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3036 return true;
3037 } else {
3038 Ref();
3039 next_action_.max_size_hint = max_size_hint;
3040 next_action_.on_complete = on_complete;
3041 transport_->combiner->Run(
3042 GRPC_CLOSURE_INIT(&next_action_.closure,
3043 &Chttp2IncomingByteStream::NextLocked, this, nullptr),
3044 GRPC_ERROR_NONE);
3045 return false;
3046 }
3047 }
3048
MaybeCreateStreamDecompressionCtx()3049 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3050 GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3051 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3052 if (!stream_->stream_decompression_ctx) {
3053 stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3054 stream_->stream_decompression_method);
3055 }
3056 }
3057
Pull(grpc_slice * slice)3058 grpc_error_handle Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3059 GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3060 grpc_error_handle error;
3061 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3062 if (!stream_->unprocessed_incoming_frames_decompressed &&
3063 stream_->stream_decompression_method !=
3064 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3065 bool end_of_context;
3066 MaybeCreateStreamDecompressionCtx();
3067 if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3068 &stream_->unprocessed_incoming_frames_buffer,
3069 &stream_->decompressed_data_buffer, nullptr,
3070 MAX_SIZE_T, &end_of_context)) {
3071 error =
3072 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
3073 return error;
3074 }
3075 GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3076 grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3077 &stream_->decompressed_data_buffer);
3078 stream_->unprocessed_incoming_frames_decompressed = true;
3079 if (end_of_context) {
3080 grpc_stream_compression_context_destroy(
3081 stream_->stream_decompression_ctx);
3082 stream_->stream_decompression_ctx = nullptr;
3083 }
3084 if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3085 *slice = grpc_empty_slice();
3086 }
3087 }
3088 error = grpc_deframe_unprocessed_incoming_frames(
3089 &stream_->data_parser, stream_,
3090 &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3091 if (error != GRPC_ERROR_NONE) {
3092 return error;
3093 }
3094 } else {
3095 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3096 stream_->t->combiner->Run(&stream_->reset_byte_stream,
3097 GRPC_ERROR_REF(error));
3098 return error;
3099 }
3100 return GRPC_ERROR_NONE;
3101 }
3102
PublishError(grpc_error_handle error)3103 void Chttp2IncomingByteStream::PublishError(grpc_error_handle error) {
3104 GPR_ASSERT(error != GRPC_ERROR_NONE);
3105 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
3106 GRPC_ERROR_REF(error));
3107 stream_->on_next = nullptr;
3108 GRPC_ERROR_UNREF(stream_->byte_stream_error);
3109 stream_->byte_stream_error = GRPC_ERROR_REF(error);
3110 grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
3111 }
3112
Push(const grpc_slice & slice,grpc_slice * slice_out)3113 grpc_error_handle Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3114 grpc_slice* slice_out) {
3115 if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3116 grpc_error_handle error =
3117 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3118 transport_->combiner->Run(&stream_->reset_byte_stream,
3119 GRPC_ERROR_REF(error));
3120 grpc_slice_unref_internal(slice);
3121 return error;
3122 } else {
3123 remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3124 if (slice_out != nullptr) {
3125 *slice_out = slice;
3126 }
3127 return GRPC_ERROR_NONE;
3128 }
3129 }
3130
Finished(grpc_error_handle error,bool reset_on_error)3131 grpc_error_handle Chttp2IncomingByteStream::Finished(grpc_error_handle error,
3132 bool reset_on_error) {
3133 if (error == GRPC_ERROR_NONE) {
3134 if (remaining_bytes_ != 0) {
3135 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3136 }
3137 }
3138 if (error != GRPC_ERROR_NONE && reset_on_error) {
3139 transport_->combiner->Run(&stream_->reset_byte_stream,
3140 GRPC_ERROR_REF(error));
3141 }
3142 Unref();
3143 return error;
3144 }
3145
Shutdown(grpc_error_handle error)3146 void Chttp2IncomingByteStream::Shutdown(grpc_error_handle error) {
3147 GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3148 }
3149
3150 } // namespace grpc_core
3151
3152 //
3153 // RESOURCE QUOTAS
3154 //
3155
post_benign_reclaimer(grpc_chttp2_transport * t)3156 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3157 if (!t->benign_reclaimer_registered) {
3158 t->benign_reclaimer_registered = true;
3159 GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3160 GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t,
3161 grpc_schedule_on_exec_ctx);
3162 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3163 false, &t->benign_reclaimer_locked);
3164 }
3165 }
3166
post_destructive_reclaimer(grpc_chttp2_transport * t)3167 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3168 if (!t->destructive_reclaimer_registered) {
3169 t->destructive_reclaimer_registered = true;
3170 GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3171 GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer,
3172 t, grpc_schedule_on_exec_ctx);
3173 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3174 true, &t->destructive_reclaimer_locked);
3175 }
3176 }
3177
benign_reclaimer(void * arg,grpc_error_handle error)3178 static void benign_reclaimer(void* arg, grpc_error_handle error) {
3179 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3180 t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3181 benign_reclaimer_locked, t, nullptr),
3182 GRPC_ERROR_REF(error));
3183 }
3184
benign_reclaimer_locked(void * arg,grpc_error_handle error)3185 static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {
3186 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3187 if (error == GRPC_ERROR_NONE &&
3188 grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3189 // Channel with no active streams: send a goaway to try and make it
3190 // disconnect cleanly
3191 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3192 gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3193 t->peer_string.c_str());
3194 }
3195 send_goaway(t,
3196 grpc_error_set_int(
3197 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3198 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3199 } else if (error == GRPC_ERROR_NONE &&
3200 GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3201 gpr_log(GPR_INFO,
3202 "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3203 " streams",
3204 t->peer_string.c_str(),
3205 grpc_chttp2_stream_map_size(&t->stream_map));
3206 }
3207 t->benign_reclaimer_registered = false;
3208 if (error != GRPC_ERROR_CANCELLED) {
3209 grpc_resource_user_finish_reclamation(
3210 grpc_endpoint_get_resource_user(t->ep));
3211 }
3212 GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3213 }
3214
destructive_reclaimer(void * arg,grpc_error_handle error)3215 static void destructive_reclaimer(void* arg, grpc_error_handle error) {
3216 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3217 t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3218 destructive_reclaimer_locked, t, nullptr),
3219 GRPC_ERROR_REF(error));
3220 }
3221
destructive_reclaimer_locked(void * arg,grpc_error_handle error)3222 static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) {
3223 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3224 size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3225 t->destructive_reclaimer_registered = false;
3226 if (error == GRPC_ERROR_NONE && n > 0) {
3227 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3228 grpc_chttp2_stream_map_rand(&t->stream_map));
3229 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3230 gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3231 t->peer_string.c_str(), s->id);
3232 }
3233 grpc_chttp2_cancel_stream(
3234 t, s,
3235 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3236 GRPC_ERROR_INT_HTTP2_ERROR,
3237 GRPC_HTTP2_ENHANCE_YOUR_CALM));
3238 if (n > 1) {
3239 // Since we cancel one stream per destructive reclamation, if
3240 // there are more streams left, we can immediately post a new
3241 // reclaimer in case the resource quota needs to free more
3242 // memory
3243 post_destructive_reclaimer(t);
3244 }
3245 }
3246 if (error != GRPC_ERROR_CANCELLED) {
3247 grpc_resource_user_finish_reclamation(
3248 grpc_endpoint_get_resource_user(t->ep));
3249 }
3250 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3251 }
3252
3253 //
3254 // MONITORING
3255 //
3256
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3257 const char* grpc_chttp2_initiate_write_reason_string(
3258 grpc_chttp2_initiate_write_reason reason) {
3259 switch (reason) {
3260 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3261 return "INITIAL_WRITE";
3262 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3263 return "START_NEW_STREAM";
3264 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3265 return "SEND_MESSAGE";
3266 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3267 return "SEND_INITIAL_METADATA";
3268 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3269 return "SEND_TRAILING_METADATA";
3270 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3271 return "RETRY_SEND_PING";
3272 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3273 return "CONTINUE_PINGS";
3274 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3275 return "GOAWAY_SENT";
3276 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3277 return "RST_STREAM";
3278 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3279 return "CLOSE_FROM_API";
3280 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3281 return "STREAM_FLOW_CONTROL";
3282 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3283 return "TRANSPORT_FLOW_CONTROL";
3284 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3285 return "SEND_SETTINGS";
3286 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3287 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3288 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3289 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3290 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3291 return "APPLICATION_PING";
3292 case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3293 return "BDP_PING";
3294 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3295 return "KEEPALIVE_PING";
3296 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3297 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3298 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3299 return "PING_RESPONSE";
3300 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3301 return "FORCE_RST_STREAM";
3302 }
3303 GPR_UNREACHABLE_CODE(return "unknown");
3304 }
3305
chttp2_get_endpoint(grpc_transport * t)3306 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3307 return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3308 }
3309
3310 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3311 "chttp2",
3312 init_stream,
3313 set_pollset,
3314 set_pollset_set,
3315 perform_stream_op,
3316 perform_transport_op,
3317 destroy_stream,
3318 destroy_transport,
3319 chttp2_get_endpoint};
3320
get_vtable(void)3321 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3322
3323 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3324 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3325 grpc_chttp2_transport* t =
3326 reinterpret_cast<grpc_chttp2_transport*>(transport);
3327 return t->channelz_socket;
3328 }
3329
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)3330 grpc_transport* grpc_create_chttp2_transport(
3331 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3332 grpc_resource_user* resource_user) {
3333 auto t =
3334 new grpc_chttp2_transport(channel_args, ep, is_client, resource_user);
3335 return &t->base;
3336 }
3337
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings,grpc_closure * notify_on_close)3338 void grpc_chttp2_transport_start_reading(
3339 grpc_transport* transport, grpc_slice_buffer* read_buffer,
3340 grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
3341 grpc_chttp2_transport* t =
3342 reinterpret_cast<grpc_chttp2_transport*>(transport);
3343 GRPC_CHTTP2_REF_TRANSPORT(
3344 t, "reading_action"); /* matches unref inside reading_action */
3345 if (read_buffer != nullptr) {
3346 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3347 gpr_free(read_buffer);
3348 }
3349 t->notify_on_receive_settings = notify_on_receive_settings;
3350 t->notify_on_close = notify_on_close;
3351 t->combiner->Run(
3352 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
3353 GRPC_ERROR_NONE);
3354 }
3355