1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
22
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/string_util.h>
28 #include <inttypes.h>
29 #include <limits.h>
30 #include <math.h>
31 #include <stdio.h>
32 #include <string.h>
33
34 #include "absl/strings/str_format.h"
35 #include "src/core/ext/transport/chttp2/transport/context_list.h"
36 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
37 #include "src/core/ext/transport/chttp2/transport/internal.h"
38 #include "src/core/ext/transport/chttp2/transport/varint.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/compression/stream_compression.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/env.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gprpp/memory.h"
45 #include "src/core/lib/http/parser.h"
46 #include "src/core/lib/iomgr/executor.h"
47 #include "src/core/lib/iomgr/iomgr.h"
48 #include "src/core/lib/iomgr/timer.h"
49 #include "src/core/lib/profiling/timers.h"
50 #include "src/core/lib/slice/slice_internal.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/transport/error_utils.h"
53 #include "src/core/lib/transport/http2_errors.h"
54 #include "src/core/lib/transport/static_metadata.h"
55 #include "src/core/lib/transport/status_conversion.h"
56 #include "src/core/lib/transport/timeout_encoding.h"
57 #include "src/core/lib/transport/transport.h"
58 #include "src/core/lib/transport/transport_impl.h"
59 #include "src/core/lib/uri/uri_parser.h"
60
61 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
62 #define MAX_WINDOW 0x7fffffffu
63 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
64 #define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
65
66 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
67 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
68 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000 /* 2 hours */
69 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
70 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
71 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
72
73 #define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
74 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
75 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
76 #define DEFAULT_MAX_PING_STRIKES 2
77
78 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
79
80 static int g_default_client_keepalive_time_ms =
81 DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
82 static int g_default_client_keepalive_timeout_ms =
83 DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
84 static int g_default_server_keepalive_time_ms =
85 DEFAULT_SERVER_KEEPALIVE_TIME_MS;
86 static int g_default_server_keepalive_timeout_ms =
87 DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
88 static bool g_default_client_keepalive_permit_without_calls =
89 DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
90 static bool g_default_server_keepalive_permit_without_calls =
91 DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
92
93 static int g_default_min_sent_ping_interval_without_data_ms =
94 DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS;
95 static int g_default_min_recv_ping_interval_without_data_ms =
96 DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
97 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
98 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
99
100 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
101 grpc_core::TraceFlag grpc_http_trace(false, "http");
102 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
103 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
104 "chttp2_refcount");
105
106 /* forward declarations of various callbacks that we'll build closures around */
107 static void write_action_begin_locked(void* t, grpc_error* error);
108 static void write_action(void* t, grpc_error* error);
109 static void write_action_end(void* t, grpc_error* error);
110 static void write_action_end_locked(void* t, grpc_error* error);
111
112 static void read_action(void* t, grpc_error* error);
113 static void read_action_locked(void* t, grpc_error* error);
114 static void continue_read_action_locked(grpc_chttp2_transport* t);
115
116 static void complete_fetch(void* gs, grpc_error* error);
117 static void complete_fetch_locked(void* gs, grpc_error* error);
118 /** Set a transport level setting, and push it to our peer */
119 static void queue_setting_update(grpc_chttp2_transport* t,
120 grpc_chttp2_setting_id id, uint32_t value);
121
122 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
123 grpc_error* error);
124
125 /** Start new streams that have been created if we can */
126 static void maybe_start_some_streams(grpc_chttp2_transport* t);
127
128 static void connectivity_state_set(grpc_chttp2_transport* t,
129 grpc_connectivity_state state,
130 const char* reason);
131
132 static void benign_reclaimer(void* t, grpc_error* error);
133 static void destructive_reclaimer(void* t, grpc_error* error);
134 static void benign_reclaimer_locked(void* t, grpc_error* error);
135 static void destructive_reclaimer_locked(void* t, grpc_error* error);
136
137 static void post_benign_reclaimer(grpc_chttp2_transport* t);
138 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
139
140 static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
141 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
142
143 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
144 static void start_bdp_ping(void* tp, grpc_error* error);
145 static void finish_bdp_ping(void* tp, grpc_error* error);
146 static void start_bdp_ping_locked(void* tp, grpc_error* error);
147 static void finish_bdp_ping_locked(void* tp, grpc_error* error);
148 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error);
149 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
150
151 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
152 static void send_ping_locked(grpc_chttp2_transport* t,
153 grpc_closure* on_initiate,
154 grpc_closure* on_complete);
155 static void retry_initiate_ping_locked(void* tp, grpc_error* error);
156
157 /** keepalive-relevant functions */
158 static void init_keepalive_ping(void* arg, grpc_error* error);
159 static void init_keepalive_ping_locked(void* arg, grpc_error* error);
160 static void start_keepalive_ping(void* arg, grpc_error* error);
161 static void finish_keepalive_ping(void* arg, grpc_error* error);
162 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
163 static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
164 static void keepalive_watchdog_fired(void* arg, grpc_error* error);
165 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
166
167 static void reset_byte_stream(void* arg, grpc_error* error);
168
169 // Flow control default enabled. Can be disabled by setting
170 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
171 bool g_flow_control_enabled = true;
172
173 /*******************************************************************************
174 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
175 */
176
~grpc_chttp2_transport()177 grpc_chttp2_transport::~grpc_chttp2_transport() {
178 size_t i;
179
180 if (channelz_socket != nullptr) {
181 channelz_socket.reset();
182 }
183
184 grpc_endpoint_destroy(ep);
185
186 grpc_slice_buffer_destroy_internal(&qbuf);
187
188 grpc_slice_buffer_destroy_internal(&outbuf);
189 grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
190
191 grpc_error* error =
192 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
193 // ContextList::Execute follows semantics of a callback function and does not
194 // take a ref on error
195 grpc_core::ContextList::Execute(cl, nullptr, error);
196 GRPC_ERROR_UNREF(error);
197 cl = nullptr;
198
199 grpc_slice_buffer_destroy_internal(&read_buffer);
200 grpc_chttp2_hpack_parser_destroy(&hpack_parser);
201 grpc_chttp2_goaway_parser_destroy(&goaway_parser);
202
203 for (i = 0; i < STREAM_LIST_COUNT; i++) {
204 GPR_ASSERT(lists[i].head == nullptr);
205 GPR_ASSERT(lists[i].tail == nullptr);
206 }
207
208 GRPC_ERROR_UNREF(goaway_error);
209
210 GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
211
212 grpc_chttp2_stream_map_destroy(&stream_map);
213
214 GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
215
216 cancel_pings(this,
217 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
218
219 while (write_cb_pool) {
220 grpc_chttp2_write_cb* next = write_cb_pool->next;
221 gpr_free(write_cb_pool);
222 write_cb_pool = next;
223 }
224
225 flow_control.Destroy();
226
227 GRPC_ERROR_UNREF(closed_with_error);
228 gpr_free(ping_acks);
229 gpr_free(peer_string);
230 }
231
232 static const grpc_transport_vtable* get_vtable(void);
233
234 /* Returns whether bdp is enabled */
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)235 static bool read_channel_args(grpc_chttp2_transport* t,
236 const grpc_channel_args* channel_args,
237 bool is_client) {
238 bool enable_bdp = true;
239 bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
240 size_t i;
241 int j;
242
243 for (i = 0; i < channel_args->num_args; i++) {
244 if (0 == strcmp(channel_args->args[i].key,
245 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
246 const grpc_integer_options options = {-1, 0, INT_MAX};
247 const int value =
248 grpc_channel_arg_get_integer(&channel_args->args[i], options);
249 if (value >= 0) {
250 if ((t->next_stream_id & 1) != (value & 1)) {
251 gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
252 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
253 is_client ? "client" : "server");
254 } else {
255 t->next_stream_id = static_cast<uint32_t>(value);
256 }
257 }
258 } else if (0 == strcmp(channel_args->args[i].key,
259 GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
260 const grpc_integer_options options = {-1, 0, INT_MAX};
261 const int value =
262 grpc_channel_arg_get_integer(&channel_args->args[i], options);
263 if (value >= 0) {
264 grpc_chttp2_hpack_compressor_set_max_usable_size(
265 &t->hpack_compressor, static_cast<uint32_t>(value));
266 }
267 } else if (0 == strcmp(channel_args->args[i].key,
268 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
269 t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
270 &channel_args->args[i],
271 {g_default_max_pings_without_data, 0, INT_MAX});
272 } else if (0 == strcmp(channel_args->args[i].key,
273 GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
274 t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
275 &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
276 } else if (0 ==
277 strcmp(channel_args->args[i].key,
278 GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
279 t->ping_policy.min_sent_ping_interval_without_data =
280 grpc_channel_arg_get_integer(
281 &channel_args->args[i],
282 grpc_integer_options{
283 g_default_min_sent_ping_interval_without_data_ms, 0,
284 INT_MAX});
285 } else if (0 ==
286 strcmp(channel_args->args[i].key,
287 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
288 t->ping_policy.min_recv_ping_interval_without_data =
289 grpc_channel_arg_get_integer(
290 &channel_args->args[i],
291 grpc_integer_options{
292 g_default_min_recv_ping_interval_without_data_ms, 0,
293 INT_MAX});
294 } else if (0 == strcmp(channel_args->args[i].key,
295 GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
296 t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
297 &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
298 } else if (0 ==
299 strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
300 enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
301 } else if (0 ==
302 strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
303 const int value = grpc_channel_arg_get_integer(
304 &channel_args->args[i],
305 grpc_integer_options{t->is_client
306 ? g_default_client_keepalive_time_ms
307 : g_default_server_keepalive_time_ms,
308 1, INT_MAX});
309 t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
310 } else if (0 == strcmp(channel_args->args[i].key,
311 GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
312 const int value = grpc_channel_arg_get_integer(
313 &channel_args->args[i],
314 grpc_integer_options{t->is_client
315 ? g_default_client_keepalive_timeout_ms
316 : g_default_server_keepalive_timeout_ms,
317 0, INT_MAX});
318 t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
319 } else if (0 == strcmp(channel_args->args[i].key,
320 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
321 t->keepalive_permit_without_calls = static_cast<uint32_t>(
322 grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
323 } else if (0 == strcmp(channel_args->args[i].key,
324 GRPC_ARG_OPTIMIZATION_TARGET)) {
325 gpr_log(GPR_INFO, "GRPC_ARG_OPTIMIZATION_TARGET is deprecated");
326 } else if (0 ==
327 strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
328 channelz_enabled = grpc_channel_arg_get_bool(
329 &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
330 } else {
331 static const struct {
332 const char* channel_arg_name;
333 grpc_chttp2_setting_id setting_id;
334 grpc_integer_options integer_options;
335 bool availability[2] /* server, client */;
336 } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
337 GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
338 {-1, 0, INT32_MAX},
339 {true, false}},
340 {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
341 GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
342 {-1, 0, INT32_MAX},
343 {true, true}},
344 {GRPC_ARG_MAX_METADATA_SIZE,
345 GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
346 {-1, 0, INT32_MAX},
347 {true, true}},
348 {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
349 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
350 {-1, 16384, 16777215},
351 {true, true}},
352 {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
353 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
354 {1, 0, 1},
355 {true, true}},
356 {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
357 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
358 {-1, 5, INT32_MAX},
359 {true, true}}};
360 for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
361 if (0 == strcmp(channel_args->args[i].key,
362 settings_map[j].channel_arg_name)) {
363 if (!settings_map[j].availability[is_client]) {
364 gpr_log(GPR_DEBUG, "%s is not available on %s",
365 settings_map[j].channel_arg_name,
366 is_client ? "clients" : "servers");
367 } else {
368 int value = grpc_channel_arg_get_integer(
369 &channel_args->args[i], settings_map[j].integer_options);
370 if (value >= 0) {
371 queue_setting_update(t, settings_map[j].setting_id,
372 static_cast<uint32_t>(value));
373 }
374 }
375 break;
376 }
377 }
378 }
379 }
380 if (channelz_enabled) {
381 // TODO(ncteisen): add an API to endpoint to query for local addr, and pass
382 // it in here, so SocketNode knows its own address.
383 t->channelz_socket =
384 grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
385 "", t->peer_string,
386 absl::StrFormat("%s %s", get_vtable()->name, t->peer_string));
387 }
388 return enable_bdp;
389 }
390
init_transport_keepalive_settings(grpc_chttp2_transport * t)391 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
392 if (t->is_client) {
393 t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
394 ? GRPC_MILLIS_INF_FUTURE
395 : g_default_client_keepalive_time_ms;
396 t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
397 ? GRPC_MILLIS_INF_FUTURE
398 : g_default_client_keepalive_timeout_ms;
399 t->keepalive_permit_without_calls =
400 g_default_client_keepalive_permit_without_calls;
401 } else {
402 t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
403 ? GRPC_MILLIS_INF_FUTURE
404 : g_default_server_keepalive_time_ms;
405 t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
406 ? GRPC_MILLIS_INF_FUTURE
407 : g_default_server_keepalive_timeout_ms;
408 t->keepalive_permit_without_calls =
409 g_default_server_keepalive_permit_without_calls;
410 }
411 }
412
configure_transport_ping_policy(grpc_chttp2_transport * t)413 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
414 t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
415 t->ping_policy.min_sent_ping_interval_without_data =
416 g_default_min_sent_ping_interval_without_data_ms;
417 t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
418 t->ping_policy.min_recv_ping_interval_without_data =
419 g_default_min_recv_ping_interval_without_data_ms;
420 }
421
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)422 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
423 if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
424 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
425 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
426 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
427 grpc_schedule_on_exec_ctx);
428 grpc_timer_init(&t->keepalive_ping_timer,
429 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
430 &t->init_keepalive_ping_locked);
431 } else {
432 /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
433 inflight keeaplive timers */
434 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
435 }
436 }
437
grpc_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)438 grpc_chttp2_transport::grpc_chttp2_transport(
439 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
440 grpc_resource_user* resource_user)
441 : refs(1, &grpc_trace_chttp2_refcount),
442 ep(ep),
443 peer_string(grpc_endpoint_get_peer(ep)),
444 resource_user(resource_user),
445 combiner(grpc_combiner_create()),
446 state_tracker(is_client ? "client_transport" : "server_transport",
447 GRPC_CHANNEL_READY),
448 is_client(is_client),
449 next_stream_id(is_client ? 1 : 2),
450 deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
451 GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
452 GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
453 base.vtable = get_vtable();
454 /* 8 is a random stab in the dark as to a good initial size: it's small enough
455 that it shouldn't waste memory for infrequently used connections, yet
456 large enough that the exponential growth should happen nicely when it's
457 needed.
458 TODO(ctiller): tune this */
459 grpc_chttp2_stream_map_init(&stream_map, 8);
460
461 grpc_slice_buffer_init(&read_buffer);
462 grpc_slice_buffer_init(&outbuf);
463 if (is_client) {
464 grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
465 GRPC_CHTTP2_CLIENT_CONNECT_STRING));
466 }
467 grpc_chttp2_hpack_compressor_init(&hpack_compressor);
468 grpc_slice_buffer_init(&qbuf);
469 /* copy in initial settings to all setting sets */
470 size_t i;
471 int j;
472 for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
473 for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
474 settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
475 }
476 }
477 grpc_chttp2_hpack_parser_init(&hpack_parser);
478 grpc_chttp2_goaway_parser_init(&goaway_parser);
479
480 /* configure http2 the way we like it */
481 if (is_client) {
482 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
483 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
484 }
485 queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
486 DEFAULT_MAX_HEADER_LIST_SIZE);
487 queue_setting_update(this,
488 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
489
490 configure_transport_ping_policy(this);
491 init_transport_keepalive_settings(this);
492
493 bool enable_bdp = true;
494 if (channel_args) {
495 enable_bdp = read_channel_args(this, channel_args, is_client);
496 }
497
498 if (g_flow_control_enabled) {
499 flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
500 enable_bdp);
501 } else {
502 flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
503 enable_bdp = false;
504 }
505
506 /* No pings allowed before receiving a header or data frame. */
507 ping_state.pings_before_data_required = 0;
508 ping_state.is_delayed_ping_timer_set = false;
509 ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
510
511 ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
512 ping_recv_state.ping_strikes = 0;
513
514 init_keepalive_pings_if_enabled(this);
515
516 if (enable_bdp) {
517 GRPC_CHTTP2_REF_TRANSPORT(this, "bdp_ping");
518 schedule_bdp_ping_locked(this);
519 grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
520 nullptr);
521 }
522
523 grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
524 post_benign_reclaimer(this);
525 }
526
destroy_transport_locked(void * tp,grpc_error *)527 static void destroy_transport_locked(void* tp, grpc_error* /*error*/) {
528 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
529 t->destroying = 1;
530 close_transport_locked(
531 t, grpc_error_set_int(
532 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
533 GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
534 // Must be the last line.
535 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
536 }
537
destroy_transport(grpc_transport * gt)538 static void destroy_transport(grpc_transport* gt) {
539 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
540 t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
541 GRPC_ERROR_NONE);
542 }
543
close_transport_locked(grpc_chttp2_transport * t,grpc_error * error)544 static void close_transport_locked(grpc_chttp2_transport* t,
545 grpc_error* error) {
546 end_all_the_calls(t, GRPC_ERROR_REF(error));
547 cancel_pings(t, GRPC_ERROR_REF(error));
548 if (t->closed_with_error == GRPC_ERROR_NONE) {
549 if (!grpc_error_has_clear_grpc_status(error)) {
550 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
551 GRPC_STATUS_UNAVAILABLE);
552 }
553 if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
554 if (t->close_transport_on_writes_finished == nullptr) {
555 t->close_transport_on_writes_finished =
556 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
557 "Delayed close due to in-progress write");
558 }
559 t->close_transport_on_writes_finished =
560 grpc_error_add_child(t->close_transport_on_writes_finished, error);
561 return;
562 }
563 GPR_ASSERT(error != GRPC_ERROR_NONE);
564 t->closed_with_error = GRPC_ERROR_REF(error);
565 connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, "close_transport");
566 if (t->ping_state.is_delayed_ping_timer_set) {
567 grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
568 }
569 if (t->have_next_bdp_ping_timer) {
570 grpc_timer_cancel(&t->next_bdp_ping_timer);
571 }
572 switch (t->keepalive_state) {
573 case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
574 grpc_timer_cancel(&t->keepalive_ping_timer);
575 break;
576 case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
577 grpc_timer_cancel(&t->keepalive_ping_timer);
578 grpc_timer_cancel(&t->keepalive_watchdog_timer);
579 break;
580 case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
581 case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
582 /* keepalive timers are not set in these two states */
583 break;
584 }
585
586 /* flush writable stream list to avoid dangling references */
587 grpc_chttp2_stream* s;
588 while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
589 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
590 }
591 GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
592 grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
593 }
594 if (t->notify_on_receive_settings != nullptr) {
595 grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
596 GRPC_ERROR_CANCELLED);
597 t->notify_on_receive_settings = nullptr;
598 }
599 GRPC_ERROR_UNREF(error);
600 }
601
602 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)603 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
604 grpc_stream_ref(s->refcount, reason);
605 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)606 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
607 grpc_stream_unref(s->refcount, reason);
608 }
609 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)610 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
611 grpc_stream_ref(s->refcount);
612 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)613 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
614 grpc_stream_unref(s->refcount);
615 }
616 #endif
617
Reffer(grpc_chttp2_stream * s)618 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
619 /* We reserve one 'active stream' that's dropped when the stream is
620 read-closed. The others are for Chttp2IncomingByteStreams that are
621 actively reading */
622 GRPC_CHTTP2_STREAM_REF(s, "chttp2");
623 GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
624 }
625
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)626 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
627 grpc_stream_refcount* refcount,
628 const void* server_data,
629 grpc_core::Arena* arena)
630 : t(t),
631 refcount(refcount),
632 reffer(this),
633 metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
634 grpc_chttp2_incoming_metadata_buffer(arena)} {
635 if (server_data) {
636 id = static_cast<uint32_t>((uintptr_t)server_data);
637 *t->accepting_stream = this;
638 grpc_chttp2_stream_map_add(&t->stream_map, id, this);
639 post_destructive_reclaimer(t);
640 }
641 if (t->flow_control->flow_control_enabled()) {
642 flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
643 static_cast<grpc_core::chttp2::TransportFlowControl*>(
644 t->flow_control.get()),
645 this);
646 } else {
647 flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
648 }
649
650 grpc_slice_buffer_init(&frame_storage);
651 grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
652 grpc_slice_buffer_init(&flow_controlled_buffer);
653 GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr);
654 }
655
~grpc_chttp2_stream()656 grpc_chttp2_stream::~grpc_chttp2_stream() {
657 if (t->channelz_socket != nullptr) {
658 if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
659 t->channelz_socket->RecordStreamSucceeded();
660 } else {
661 t->channelz_socket->RecordStreamFailed();
662 }
663 }
664
665 GPR_ASSERT((write_closed && read_closed) || id == 0);
666 if (id != 0) {
667 GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
668 }
669
670 grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
671 grpc_slice_buffer_destroy_internal(&frame_storage);
672 if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
673 grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
674 }
675 if (stream_decompression_method !=
676 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
677 grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
678 }
679
680 grpc_chttp2_list_remove_stalled_by_transport(t, this);
681 grpc_chttp2_list_remove_stalled_by_stream(t, this);
682
683 for (int i = 0; i < STREAM_LIST_COUNT; i++) {
684 if (GPR_UNLIKELY(included[i])) {
685 gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
686 t->is_client ? "client" : "server", id, i);
687 abort();
688 }
689 }
690
691 GPR_ASSERT(send_initial_metadata_finished == nullptr);
692 GPR_ASSERT(fetching_send_message == nullptr);
693 GPR_ASSERT(send_trailing_metadata_finished == nullptr);
694 GPR_ASSERT(recv_initial_metadata_ready == nullptr);
695 GPR_ASSERT(recv_message_ready == nullptr);
696 GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
697 grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
698 GRPC_ERROR_UNREF(read_closed_error);
699 GRPC_ERROR_UNREF(write_closed_error);
700 GRPC_ERROR_UNREF(byte_stream_error);
701
702 flow_control.Destroy();
703
704 if (t->resource_user != nullptr) {
705 grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
706 }
707
708 GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
709 grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
710 }
711
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)712 static int init_stream(grpc_transport* gt, grpc_stream* gs,
713 grpc_stream_refcount* refcount, const void* server_data,
714 grpc_core::Arena* arena) {
715 GPR_TIMER_SCOPE("init_stream", 0);
716 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
717 new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
718 return 0;
719 }
720
destroy_stream_locked(void * sp,grpc_error *)721 static void destroy_stream_locked(void* sp, grpc_error* /*error*/) {
722 GPR_TIMER_SCOPE("destroy_stream", 0);
723 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
724 s->~grpc_chttp2_stream();
725 }
726
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)727 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
728 grpc_closure* then_schedule_closure) {
729 GPR_TIMER_SCOPE("destroy_stream", 0);
730 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
731 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
732 if (s->stream_compression_method !=
733 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
734 s->stream_compression_ctx != nullptr) {
735 grpc_stream_compression_context_destroy(s->stream_compression_ctx);
736 s->stream_compression_ctx = nullptr;
737 }
738 if (s->stream_decompression_method !=
739 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
740 s->stream_decompression_ctx != nullptr) {
741 grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
742 s->stream_decompression_ctx = nullptr;
743 }
744
745 s->destroy_stream_arg = then_schedule_closure;
746 t->combiner->Run(
747 GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
748 GRPC_ERROR_NONE);
749 }
750
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)751 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
752 uint32_t id) {
753 if (t->accept_stream_cb == nullptr) {
754 return nullptr;
755 }
756 // Don't accept the stream if memory quota doesn't allow. Note that we should
757 // simply refuse the stream here instead of canceling the stream after it's
758 // accepted since the latter will create the call which costs much memory.
759 if (t->resource_user != nullptr &&
760 !grpc_resource_user_safe_alloc(t->resource_user,
761 GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
762 gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
763 grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
764 nullptr);
765 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
766 return nullptr;
767 }
768 grpc_chttp2_stream* accepting = nullptr;
769 GPR_ASSERT(t->accepting_stream == nullptr);
770 t->accepting_stream = &accepting;
771 t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
772 (void*)static_cast<uintptr_t>(id));
773 t->accepting_stream = nullptr;
774 return accepting;
775 }
776
777 /*******************************************************************************
778 * OUTPUT PROCESSING
779 */
780
write_state_name(grpc_chttp2_write_state st)781 static const char* write_state_name(grpc_chttp2_write_state st) {
782 switch (st) {
783 case GRPC_CHTTP2_WRITE_STATE_IDLE:
784 return "IDLE";
785 case GRPC_CHTTP2_WRITE_STATE_WRITING:
786 return "WRITING";
787 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
788 return "WRITING+MORE";
789 }
790 GPR_UNREACHABLE_CODE(return "UNKNOWN");
791 }
792
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)793 static void set_write_state(grpc_chttp2_transport* t,
794 grpc_chttp2_write_state st, const char* reason) {
795 GRPC_CHTTP2_IF_TRACING(
796 gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
797 t->is_client ? "CLIENT" : "SERVER", t->peer_string,
798 write_state_name(t->write_state), write_state_name(st), reason));
799 t->write_state = st;
800 /* If the state is being reset back to idle, it means a write was just
801 * finished. Make sure all the run_after_write closures are scheduled.
802 *
803 * This is also our chance to close the transport if the transport was marked
804 * to be closed after all writes finish (for example, if we received a go-away
805 * from peer while we had some pending writes) */
806 if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
807 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
808 if (t->close_transport_on_writes_finished != nullptr) {
809 grpc_error* err = t->close_transport_on_writes_finished;
810 t->close_transport_on_writes_finished = nullptr;
811 close_transport_locked(t, err);
812 }
813 }
814 }
815
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)816 static void inc_initiate_write_reason(
817 grpc_chttp2_initiate_write_reason reason) {
818 switch (reason) {
819 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
820 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
821 break;
822 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
823 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
824 break;
825 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
826 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
827 break;
828 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
829 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
830 break;
831 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
832 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
833 break;
834 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
835 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
836 break;
837 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
838 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
839 break;
840 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
841 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
842 break;
843 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
844 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
845 break;
846 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
847 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
848 break;
849 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
850 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
851 break;
852 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
853 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
854 break;
855 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
856 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
857 break;
858 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
859 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
860 break;
861 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
862 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
863 break;
864 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
865 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
866 break;
867 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
868 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
869 break;
870 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
871 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
872 break;
873 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
874 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
875 break;
876 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
877 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
878 break;
879 }
880 }
881
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)882 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
883 grpc_chttp2_initiate_write_reason reason) {
884 GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
885
886 switch (t->write_state) {
887 case GRPC_CHTTP2_WRITE_STATE_IDLE:
888 inc_initiate_write_reason(reason);
889 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
890 grpc_chttp2_initiate_write_reason_string(reason));
891 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
892 /* Note that the 'write_action_begin_locked' closure is being scheduled
893 * on the 'finally_scheduler' of t->combiner. This means that
894 * 'write_action_begin_locked' is called only *after* all the other
895 * closures (some of which are potentially initiating more writes on the
896 * transport) are executed on the t->combiner.
897 *
898 * The reason for scheduling on finally_scheduler is to make sure we batch
899 * as many writes as possible. 'write_action_begin_locked' is the function
900 * that gathers all the relevant bytes (which are at various places in the
901 * grpc_chttp2_transport structure) and append them to 'outbuf' field in
902 * grpc_chttp2_transport thereby batching what would have been potentially
903 * multiple write operations.
904 *
905 * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
906 * It does not call the endpoint to write the bytes. That is done by the
907 * 'write_action' (which is scheduled by 'write_action_begin_locked') */
908 t->combiner->FinallyRun(
909 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
910 write_action_begin_locked, t, nullptr),
911 GRPC_ERROR_NONE);
912 break;
913 case GRPC_CHTTP2_WRITE_STATE_WRITING:
914 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
915 grpc_chttp2_initiate_write_reason_string(reason));
916 break;
917 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
918 break;
919 }
920 }
921
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)922 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
923 grpc_chttp2_stream* s) {
924 if (t->closed_with_error == GRPC_ERROR_NONE &&
925 grpc_chttp2_list_add_writable_stream(t, s)) {
926 GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
927 }
928 }
929
/* Returns the trace description used when a write begins: one text for a
 * partial write, another for a complete one. */
static const char* begin_writing_desc(bool partial) {
  return partial ? "begin partial write in background"
                 : "begin write in current thread";
}
937
write_action_begin_locked(void * gt,grpc_error *)938 static void write_action_begin_locked(void* gt, grpc_error* /*error_ignored*/) {
939 GPR_TIMER_SCOPE("write_action_begin_locked", 0);
940 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
941 GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
942 grpc_chttp2_begin_write_result r;
943 if (t->closed_with_error != GRPC_ERROR_NONE) {
944 r.writing = false;
945 } else {
946 r = grpc_chttp2_begin_write(t);
947 }
948 if (r.writing) {
949 if (r.partial) {
950 GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
951 }
952 set_write_state(t,
953 r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
954 : GRPC_CHTTP2_WRITE_STATE_WRITING,
955 begin_writing_desc(r.partial));
956 write_action(t, GRPC_ERROR_NONE);
957 if (t->reading_paused_on_pending_induced_frames) {
958 GPR_ASSERT(t->num_pending_induced_frames == 0);
959 /* We had paused reading, because we had many induced frames (SETTINGS
960 * ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
961 * been able to flush qbuf, we can resume reading. */
962 GRPC_CHTTP2_IF_TRACING(gpr_log(
963 GPR_INFO,
964 "transport %p : Resuming reading after being paused due to too "
965 "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
966 t));
967 t->reading_paused_on_pending_induced_frames = false;
968 continue_read_action_locked(t);
969 }
970 } else {
971 GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
972 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
973 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
974 }
975 }
976
write_action(void * gt,grpc_error *)977 static void write_action(void* gt, grpc_error* /*error*/) {
978 GPR_TIMER_SCOPE("write_action", 0);
979 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
980 void* cl = t->cl;
981 t->cl = nullptr;
982 grpc_endpoint_write(
983 t->ep, &t->outbuf,
984 GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
985 grpc_schedule_on_exec_ctx),
986 cl);
987 }
988
write_action_end(void * tp,grpc_error * error)989 static void write_action_end(void* tp, grpc_error* error) {
990 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
991 t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
992 write_action_end_locked, t, nullptr),
993 GRPC_ERROR_REF(error));
994 }
995
996 /* Callback from the grpc_endpoint after bytes have been written by calling
997 * sendmsg */
write_action_end_locked(void * tp,grpc_error * error)998 static void write_action_end_locked(void* tp, grpc_error* error) {
999 GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
1000 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1001
1002 bool closed = false;
1003 if (error != GRPC_ERROR_NONE) {
1004 close_transport_locked(t, GRPC_ERROR_REF(error));
1005 closed = true;
1006 }
1007
1008 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
1009 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
1010 closed = true;
1011 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1012 close_transport_locked(
1013 t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
1014 }
1015 }
1016
1017 switch (t->write_state) {
1018 case GRPC_CHTTP2_WRITE_STATE_IDLE:
1019 GPR_UNREACHABLE_CODE(break);
1020 case GRPC_CHTTP2_WRITE_STATE_WRITING:
1021 GPR_TIMER_MARK("state=writing", 0);
1022 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1023 break;
1024 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1025 GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1026 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1027 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1028 // If the transport is closed, we will retry writing on the endpoint
1029 // and next write may contain part of the currently serialized frames.
1030 // So, we should only call the run_after_write callbacks when the next
1031 // write finishes, or the callbacks will be invoked when the stream is
1032 // closed.
1033 if (!closed) {
1034 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1035 }
1036 t->combiner->FinallyRun(
1037 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1038 write_action_begin_locked, t, nullptr),
1039 GRPC_ERROR_NONE);
1040 break;
1041 }
1042
1043 grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1044 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1045 }
1046
1047 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1048 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1049 static void queue_setting_update(grpc_chttp2_transport* t,
1050 grpc_chttp2_setting_id id, uint32_t value) {
1051 const grpc_chttp2_setting_parameters* sp =
1052 &grpc_chttp2_settings_parameters[id];
1053 uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1054 if (use_value != value) {
1055 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1056 value, use_value);
1057 }
1058 if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1059 t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1060 t->dirtied_local_settings = 1;
1061 }
1062 }
1063
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,const grpc_slice & goaway_text)1064 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1065 uint32_t goaway_error,
1066 uint32_t last_stream_id,
1067 const grpc_slice& goaway_text) {
1068 // Discard the error from a previous goaway frame (if any)
1069 if (t->goaway_error != GRPC_ERROR_NONE) {
1070 GRPC_ERROR_UNREF(t->goaway_error);
1071 }
1072 t->goaway_error = grpc_error_set_str(
1073 grpc_error_set_int(
1074 grpc_error_set_int(
1075 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1076 GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1077 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1078 GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1079
1080 GRPC_CHTTP2_IF_TRACING(
1081 gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1082 last_stream_id));
1083 /* We want to log this irrespective of whether http tracing is enabled if we
1084 * received a GOAWAY with a non NO_ERROR code. */
1085 if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1086 gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
1087 goaway_error, grpc_error_string(t->goaway_error));
1088 }
1089 /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1090 * data equal to "too_many_pings", it should log the occurrence at a log level
1091 * that is enabled by default and double the configured KEEPALIVE_TIME used
1092 * for new connections on that channel. */
1093 if (GPR_UNLIKELY(t->is_client &&
1094 goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1095 grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1096 gpr_log(GPR_ERROR,
1097 "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1098 "data equal to \"too_many_pings\"");
1099 double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1100 constexpr int max_keepalive_time_ms =
1101 INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1102 t->keepalive_time =
1103 current_keepalive_time_ms > static_cast<double>(max_keepalive_time_ms)
1104 ? GRPC_MILLIS_INF_FUTURE
1105 : static_cast<grpc_millis>(current_keepalive_time_ms *
1106 KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1107 }
1108 /* lie: use transient failure from the transport to indicate goaway has been
1109 * received */
1110 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, "got_goaway");
1111 }
1112
maybe_start_some_streams(grpc_chttp2_transport * t)1113 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1114 grpc_chttp2_stream* s;
1115 /* cancel out streams that haven't yet started if we have received a GOAWAY */
1116 if (t->goaway_error != GRPC_ERROR_NONE) {
1117 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1118 grpc_chttp2_cancel_stream(
1119 t, s,
1120 grpc_error_set_int(
1121 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1122 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1123 }
1124 return;
1125 }
1126 /* start streams where we have free grpc_chttp2_stream ids and free
1127 * concurrency */
1128 while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1129 grpc_chttp2_stream_map_size(&t->stream_map) <
1130 t->settings[GRPC_PEER_SETTINGS]
1131 [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1132 grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1133 /* safe since we can't (legally) be parsing this stream yet */
1134 GRPC_CHTTP2_IF_TRACING(gpr_log(
1135 GPR_INFO,
1136 "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1137 t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1138
1139 GPR_ASSERT(s->id == 0);
1140 s->id = t->next_stream_id;
1141 t->next_stream_id += 2;
1142
1143 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1144 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1145 "no_more_stream_ids");
1146 }
1147
1148 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1149 post_destructive_reclaimer(t);
1150 grpc_chttp2_mark_stream_writable(t, s);
1151 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1152 }
1153 /* cancel out streams that will never be started */
1154 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1155 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1156 grpc_chttp2_cancel_stream(
1157 t, s,
1158 grpc_error_set_int(
1159 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1160 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1161 }
1162 }
1163 }
1164
/* Flag kept in the low-order bits of a closure's next_data.scratch word: this
   closure barrier may be covering a write in a pollset, and so we should not
   complete this closure until we can prove that the write got scheduled */
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
/* Weight of one reference in the barrier's reference count. The count is
   stored in the high-order bits of next_data.scratch (with the low bits being
   used for the flags defined above), so taking or dropping a reference adds or
   subtracts this value */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1172
add_closure_barrier(grpc_closure * closure)1173 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1174 closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1175 return closure;
1176 }
1177
null_then_sched_closure(grpc_closure ** closure)1178 static void null_then_sched_closure(grpc_closure** closure) {
1179 grpc_closure* c = *closure;
1180 *closure = nullptr;
1181 grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
1182 }
1183
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream *,grpc_closure ** pclosure,grpc_error * error,const char * desc)1184 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1185 grpc_chttp2_stream* /*s*/,
1186 grpc_closure** pclosure,
1187 grpc_error* error, const char* desc) {
1188 grpc_closure* closure = *pclosure;
1189 *pclosure = nullptr;
1190 if (closure == nullptr) {
1191 GRPC_ERROR_UNREF(error);
1192 return;
1193 }
1194 closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1195 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1196 const char* errstr = grpc_error_string(error);
1197 gpr_log(
1198 GPR_INFO,
1199 "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1200 "write_state=%s",
1201 t, closure,
1202 static_cast<int>(closure->next_data.scratch /
1203 CLOSURE_BARRIER_FIRST_REF_BIT),
1204 static_cast<int>(closure->next_data.scratch %
1205 CLOSURE_BARRIER_FIRST_REF_BIT),
1206 desc, errstr, write_state_name(t->write_state));
1207 }
1208 if (error != GRPC_ERROR_NONE) {
1209 if (closure->error_data.error == GRPC_ERROR_NONE) {
1210 closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1211 "Error in HTTP transport completing operation");
1212 closure->error_data.error = grpc_error_set_str(
1213 closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1214 grpc_slice_from_copied_string(t->peer_string));
1215 }
1216 closure->error_data.error =
1217 grpc_error_add_child(closure->error_data.error, error);
1218 }
1219 if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1220 if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1221 !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1222 // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1223 // closures earlier than when it is safe to do so.
1224 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
1225 closure->error_data.error);
1226 } else {
1227 grpc_closure_list_append(&t->run_after_write, closure,
1228 closure->error_data.error);
1229 }
1230 }
1231 }
1232
contains_non_ok_status(grpc_metadata_batch * batch)1233 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1234 if (batch->idx.named.grpc_status != nullptr) {
1235 return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1236 GRPC_MDELEM_GRPC_STATUS_0);
1237 }
1238 return false;
1239 }
1240
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1241 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1242 grpc_chttp2_stream* s) {
1243 if (s->id != 0 && (!s->write_buffering ||
1244 s->flow_controlled_buffer.length > t->write_buffer_size)) {
1245 grpc_chttp2_mark_stream_writable(t, s);
1246 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1247 }
1248 }
1249
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1250 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1251 grpc_chttp2_stream* s) {
1252 s->fetched_send_message_length +=
1253 static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1254 grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1255 maybe_become_writable_due_to_send_msg(t, s);
1256 }
1257
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1258 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1259 grpc_chttp2_stream* s) {
1260 for (;;) {
1261 if (s->fetching_send_message == nullptr) {
1262 /* Stream was cancelled before message fetch completed */
1263 abort(); /* TODO(ctiller): what cleanup here? */
1264 return; /* early out */
1265 }
1266 if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1267 int64_t notify_offset = s->next_message_end_offset;
1268 if (notify_offset <= s->flow_controlled_bytes_written) {
1269 grpc_chttp2_complete_closure_step(
1270 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1271 "fetching_send_message_finished");
1272 } else {
1273 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1274 if (cb == nullptr) {
1275 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1276 } else {
1277 t->write_cb_pool = cb->next;
1278 }
1279 cb->call_at_byte = notify_offset;
1280 cb->closure = s->fetching_send_message_finished;
1281 s->fetching_send_message_finished = nullptr;
1282 grpc_chttp2_write_cb** list =
1283 s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1284 ? &s->on_write_finished_cbs
1285 : &s->on_flow_controlled_cbs;
1286 cb->next = *list;
1287 *list = cb;
1288 }
1289 s->fetching_send_message.reset();
1290 return; /* early out */
1291 } else if (s->fetching_send_message->Next(
1292 UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1293 ::complete_fetch, s,
1294 grpc_schedule_on_exec_ctx))) {
1295 grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1296 if (error != GRPC_ERROR_NONE) {
1297 s->fetching_send_message.reset();
1298 grpc_chttp2_cancel_stream(t, s, error);
1299 } else {
1300 add_fetched_slice_locked(t, s);
1301 }
1302 }
1303 }
1304 }
1305
complete_fetch(void * gs,grpc_error * error)1306 static void complete_fetch(void* gs, grpc_error* error) {
1307 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1308 s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1309 ::complete_fetch_locked, s, nullptr),
1310 GRPC_ERROR_REF(error));
1311 }
1312
complete_fetch_locked(void * gs,grpc_error * error)1313 static void complete_fetch_locked(void* gs, grpc_error* error) {
1314 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1315 grpc_chttp2_transport* t = s->t;
1316 if (error == GRPC_ERROR_NONE) {
1317 error = s->fetching_send_message->Pull(&s->fetching_slice);
1318 if (error == GRPC_ERROR_NONE) {
1319 add_fetched_slice_locked(t, s);
1320 continue_fetching_send_locked(t, s);
1321 }
1322 }
1323 if (error != GRPC_ERROR_NONE) {
1324 s->fetching_send_message.reset();
1325 grpc_chttp2_cancel_stream(t, s, error);
1326 }
1327 }
1328
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1329 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1330 bool is_client, bool is_initial) {
1331 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1332 md = md->next) {
1333 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1334 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1335 gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1336 is_client ? "CLI" : "SVR", key, value);
1337 gpr_free(key);
1338 gpr_free(value);
1339 }
1340 }
1341
perform_stream_op_locked(void * stream_op,grpc_error *)1342 static void perform_stream_op_locked(void* stream_op,
1343 grpc_error* /*error_ignored*/) {
1344 GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1345
1346 grpc_transport_stream_op_batch* op =
1347 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1348 grpc_chttp2_stream* s =
1349 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1350 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1351 grpc_chttp2_transport* t = s->t;
1352
1353 GRPC_STATS_INC_HTTP2_OP_BATCHES();
1354
1355 s->context = op->payload->context;
1356 s->traced = op->is_traced;
1357 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1358 gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p",
1359 grpc_transport_stream_op_batch_string(op).c_str(), op->on_complete);
1360 if (op->send_initial_metadata) {
1361 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1362 s->id, t->is_client, true);
1363 }
1364 if (op->send_trailing_metadata) {
1365 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1366 s->id, t->is_client, false);
1367 }
1368 }
1369
1370 grpc_closure* on_complete = op->on_complete;
1371 // on_complete will be null if and only if there are no send ops in the batch.
1372 if (on_complete != nullptr) {
1373 // This batch has send ops. Use final_data as a barrier until enqueue time;
1374 // the initial counter is dropped at the end of this function.
1375 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1376 on_complete->error_data.error = GRPC_ERROR_NONE;
1377 }
1378
1379 if (op->cancel_stream) {
1380 GRPC_STATS_INC_HTTP2_OP_CANCEL();
1381 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1382 }
1383
1384 if (op->send_initial_metadata) {
1385 if (t->is_client && t->channelz_socket != nullptr) {
1386 t->channelz_socket->RecordStreamStartedFromLocal();
1387 }
1388 GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1389 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1390 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1391
1392 /* Identify stream compression */
1393 if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1394 .content_encoding == nullptr ||
1395 grpc_stream_compression_method_parse(
1396 GRPC_MDVALUE(
1397 op_payload->send_initial_metadata.send_initial_metadata->idx
1398 .named.content_encoding->md),
1399 true, &s->stream_compression_method) == 0) {
1400 s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1401 }
1402 if (s->stream_compression_method !=
1403 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1404 s->uncompressed_data_size = 0;
1405 s->stream_compression_ctx = nullptr;
1406 grpc_slice_buffer_init(&s->compressed_data_buffer);
1407 }
1408 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1409 s->send_initial_metadata =
1410 op_payload->send_initial_metadata.send_initial_metadata;
1411 const size_t metadata_size =
1412 grpc_metadata_batch_size(s->send_initial_metadata);
1413 const size_t metadata_peer_limit =
1414 t->settings[GRPC_PEER_SETTINGS]
1415 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1416 if (t->is_client) {
1417 s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1418 }
1419 if (metadata_size > metadata_peer_limit) {
1420 grpc_chttp2_cancel_stream(
1421 t, s,
1422 grpc_error_set_int(
1423 grpc_error_set_int(
1424 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1425 "to-be-sent initial metadata size "
1426 "exceeds peer limit"),
1427 GRPC_ERROR_INT_SIZE,
1428 static_cast<intptr_t>(metadata_size)),
1429 GRPC_ERROR_INT_LIMIT,
1430 static_cast<intptr_t>(metadata_peer_limit)),
1431 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1432 } else {
1433 if (contains_non_ok_status(s->send_initial_metadata)) {
1434 s->seen_error = true;
1435 }
1436 if (!s->write_closed) {
1437 if (t->is_client) {
1438 if (t->closed_with_error == GRPC_ERROR_NONE) {
1439 GPR_ASSERT(s->id == 0);
1440 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1441 maybe_start_some_streams(t);
1442 } else {
1443 grpc_chttp2_cancel_stream(
1444 t, s,
1445 grpc_error_set_int(
1446 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1447 "Transport closed", &t->closed_with_error, 1),
1448 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1449 }
1450 } else {
1451 GPR_ASSERT(s->id != 0);
1452 grpc_chttp2_mark_stream_writable(t, s);
1453 if (!(op->send_message &&
1454 (op->payload->send_message.send_message->flags() &
1455 GRPC_WRITE_BUFFER_HINT))) {
1456 grpc_chttp2_initiate_write(
1457 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1458 }
1459 }
1460 } else {
1461 s->send_initial_metadata = nullptr;
1462 grpc_chttp2_complete_closure_step(
1463 t, s, &s->send_initial_metadata_finished,
1464 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1465 "Attempt to send initial metadata after stream was closed",
1466 &s->write_closed_error, 1),
1467 "send_initial_metadata_finished");
1468 }
1469 }
1470 if (op_payload->send_initial_metadata.peer_string != nullptr) {
1471 gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1472 (gpr_atm)t->peer_string);
1473 }
1474 }
1475
1476 if (op->send_message) {
1477 GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1478 t->num_messages_in_next_write++;
1479 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1480 op->payload->send_message.send_message->length());
1481 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1482 s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1483 if (s->write_closed) {
1484 op->payload->send_message.stream_write_closed = true;
1485 // We should NOT return an error here, so as to avoid a cancel OP being
1486 // started. The surface layer will notice that the stream has been closed
1487 // for writes and fail the send message op.
1488 op->payload->send_message.send_message.reset();
1489 grpc_chttp2_complete_closure_step(
1490 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1491 "fetching_send_message_finished");
1492 } else {
1493 GPR_ASSERT(s->fetching_send_message == nullptr);
1494 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1495 &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1496 uint32_t flags = op_payload->send_message.send_message->flags();
1497 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1498 size_t len = op_payload->send_message.send_message->length();
1499 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1500 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1501 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1502 frame_hdr[4] = static_cast<uint8_t>(len);
1503 s->fetching_send_message =
1504 std::move(op_payload->send_message.send_message);
1505 s->fetched_send_message_length = 0;
1506 s->next_message_end_offset =
1507 s->flow_controlled_bytes_written +
1508 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1509 static_cast<int64_t>(len);
1510 if (flags & GRPC_WRITE_BUFFER_HINT) {
1511 s->next_message_end_offset -= t->write_buffer_size;
1512 s->write_buffering = true;
1513 } else {
1514 s->write_buffering = false;
1515 }
1516 continue_fetching_send_locked(t, s);
1517 maybe_become_writable_due_to_send_msg(t, s);
1518 }
1519 }
1520
1521 if (op->send_trailing_metadata) {
1522 GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1523 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1524 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1525 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1526 s->send_trailing_metadata =
1527 op_payload->send_trailing_metadata.send_trailing_metadata;
1528 s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1529 s->write_buffering = false;
1530 const size_t metadata_size =
1531 grpc_metadata_batch_size(s->send_trailing_metadata);
1532 const size_t metadata_peer_limit =
1533 t->settings[GRPC_PEER_SETTINGS]
1534 [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1535 if (metadata_size > metadata_peer_limit) {
1536 grpc_chttp2_cancel_stream(
1537 t, s,
1538 grpc_error_set_int(
1539 grpc_error_set_int(
1540 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1541 "to-be-sent trailing metadata size "
1542 "exceeds peer limit"),
1543 GRPC_ERROR_INT_SIZE,
1544 static_cast<intptr_t>(metadata_size)),
1545 GRPC_ERROR_INT_LIMIT,
1546 static_cast<intptr_t>(metadata_peer_limit)),
1547 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1548 } else {
1549 if (contains_non_ok_status(s->send_trailing_metadata)) {
1550 s->seen_error = true;
1551 }
1552 if (s->write_closed) {
1553 s->send_trailing_metadata = nullptr;
1554 s->sent_trailing_metadata_op = nullptr;
1555 grpc_chttp2_complete_closure_step(
1556 t, s, &s->send_trailing_metadata_finished,
1557 grpc_metadata_batch_is_empty(
1558 op->payload->send_trailing_metadata.send_trailing_metadata)
1559 ? GRPC_ERROR_NONE
1560 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1561 "Attempt to send trailing metadata after "
1562 "stream was closed"),
1563 "send_trailing_metadata_finished");
1564 } else if (s->id != 0) {
1565 /* TODO(ctiller): check if there's flow control for any outstanding
1566 bytes before going writable */
1567 grpc_chttp2_mark_stream_writable(t, s);
1568 grpc_chttp2_initiate_write(
1569 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1570 }
1571 }
1572 }
1573
1574 if (op->recv_initial_metadata) {
1575 GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1576 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1577 s->recv_initial_metadata_ready =
1578 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1579 s->recv_initial_metadata =
1580 op_payload->recv_initial_metadata.recv_initial_metadata;
1581 s->trailing_metadata_available =
1582 op_payload->recv_initial_metadata.trailing_metadata_available;
1583 if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1584 gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1585 (gpr_atm)t->peer_string);
1586 }
1587 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1588 }
1589
1590 if (op->recv_message) {
1591 GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1592 size_t before = 0;
1593 GPR_ASSERT(s->recv_message_ready == nullptr);
1594 GPR_ASSERT(!s->pending_byte_stream);
1595 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1596 s->recv_message = op_payload->recv_message.recv_message;
1597 if (s->id != 0) {
1598 if (!s->read_closed) {
1599 before = s->frame_storage.length +
1600 s->unprocessed_incoming_frames_buffer.length;
1601 }
1602 }
1603 grpc_chttp2_maybe_complete_recv_message(t, s);
1604 if (s->id != 0) {
1605 if (!s->read_closed && s->frame_storage.length == 0) {
1606 size_t after = s->frame_storage.length +
1607 s->unprocessed_incoming_frames_buffer_cached_length;
1608 s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1609 before - after);
1610 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1611 }
1612 }
1613 }
1614
1615 if (op->recv_trailing_metadata) {
1616 GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1617 GPR_ASSERT(s->collecting_stats == nullptr);
1618 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1619 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1620 s->recv_trailing_metadata_finished =
1621 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1622 s->recv_trailing_metadata =
1623 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1624 s->final_metadata_requested = true;
1625 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1626 }
1627
1628 if (on_complete != nullptr) {
1629 grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1630 "op->on_complete");
1631 }
1632
1633 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1634 }
1635
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1636 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1637 grpc_transport_stream_op_batch* op) {
1638 GPR_TIMER_SCOPE("perform_stream_op", 0);
1639 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1640 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1641
1642 if (!t->is_client) {
1643 if (op->send_initial_metadata) {
1644 grpc_millis deadline =
1645 op->payload->send_initial_metadata.send_initial_metadata->deadline;
1646 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1647 }
1648 if (op->send_trailing_metadata) {
1649 grpc_millis deadline =
1650 op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1651 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1652 }
1653 }
1654
1655 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1656 gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s,
1657 grpc_transport_stream_op_batch_string(op).c_str());
1658 }
1659
1660 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1661 op->handler_private.extra_arg = gs;
1662 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1663 perform_stream_op_locked, op, nullptr),
1664 GRPC_ERROR_NONE);
1665 }
1666
cancel_pings(grpc_chttp2_transport * t,grpc_error * error)1667 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1668 /* callback remaining pings: they're not allowed to call into the transport,
1669 and maybe they hold resources that need to be freed */
1670 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1671 GPR_ASSERT(error != GRPC_ERROR_NONE);
1672 for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1673 grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1674 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1675 }
1676 GRPC_ERROR_UNREF(error);
1677 }
1678
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1679 static void send_ping_locked(grpc_chttp2_transport* t,
1680 grpc_closure* on_initiate, grpc_closure* on_ack) {
1681 if (t->closed_with_error != GRPC_ERROR_NONE) {
1682 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
1683 GRPC_ERROR_REF(t->closed_with_error));
1684 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
1685 GRPC_ERROR_REF(t->closed_with_error));
1686 return;
1687 }
1688 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1689 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1690 GRPC_ERROR_NONE);
1691 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1692 GRPC_ERROR_NONE);
1693 }
1694
1695 /*
1696 * Specialized form of send_ping_locked for keepalive ping. If there is already
1697 * a ping in progress, the keepalive ping would piggyback onto that ping,
1698 * instead of waiting for that ping to complete and then starting a new ping.
1699 */
send_keepalive_ping_locked(grpc_chttp2_transport * t)1700 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1701 if (t->closed_with_error != GRPC_ERROR_NONE) {
1702 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1703 start_keepalive_ping_locked, t, nullptr),
1704 GRPC_ERROR_REF(t->closed_with_error));
1705 t->combiner->Run(
1706 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1707 finish_keepalive_ping_locked, t, nullptr),
1708 GRPC_ERROR_REF(t->closed_with_error));
1709 return;
1710 }
1711 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1712 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1713 /* There is a ping in flight. Add yourself to the inflight closure list. */
1714 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1715 start_keepalive_ping_locked, t, nullptr),
1716 GRPC_ERROR_REF(t->closed_with_error));
1717 grpc_closure_list_append(
1718 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1719 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1720 finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1721 GRPC_ERROR_NONE);
1722 return;
1723 }
1724 grpc_closure_list_append(
1725 &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1726 GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1727 t, grpc_schedule_on_exec_ctx),
1728 GRPC_ERROR_NONE);
1729 grpc_closure_list_append(
1730 &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1731 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1732 t, grpc_schedule_on_exec_ctx),
1733 GRPC_ERROR_NONE);
1734 }
1735
grpc_chttp2_retry_initiate_ping(void * tp,grpc_error * error)1736 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error) {
1737 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1738 t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1739 retry_initiate_ping_locked, t, nullptr),
1740 GRPC_ERROR_REF(error));
1741 }
1742
retry_initiate_ping_locked(void * tp,grpc_error * error)1743 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1744 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1745 t->ping_state.is_delayed_ping_timer_set = false;
1746 if (error == GRPC_ERROR_NONE) {
1747 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1748 }
1749 GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1750 }
1751
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1752 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1753 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1754 if (pq->inflight_id != id) {
1755 char* from = grpc_endpoint_get_peer(t->ep);
1756 gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
1757 gpr_free(from);
1758 return;
1759 }
1760 grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1761 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1762 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1763 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1764 }
1765 }
1766
send_goaway(grpc_chttp2_transport * t,grpc_error * error)1767 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1768 /* We want to log this irrespective of whether http tracing is enabled */
1769 gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string,
1770 grpc_error_string(error));
1771 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1772 grpc_http2_error_code http_error;
1773 grpc_slice slice;
1774 grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1775 &http_error, nullptr);
1776 grpc_chttp2_goaway_append(t->last_new_stream_id,
1777 static_cast<uint32_t>(http_error),
1778 grpc_slice_ref_internal(slice), &t->qbuf);
1779 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1780 GRPC_ERROR_UNREF(error);
1781 }
1782
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1783 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1784 if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1785 t->ping_policy.max_ping_strikes != 0) {
1786 send_goaway(t,
1787 grpc_error_set_int(
1788 GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1789 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1790 /*The transport will be closed after the write is done */
1791 close_transport_locked(
1792 t, grpc_error_set_int(
1793 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1794 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1795 }
1796 }
1797
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1798 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1799 if (!t->is_client) {
1800 t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
1801 t->ping_recv_state.ping_strikes = 0;
1802 }
1803 t->ping_state.pings_before_data_required =
1804 t->ping_policy.max_pings_without_data;
1805 }
1806
perform_transport_op_locked(void * stream_op,grpc_error *)1807 static void perform_transport_op_locked(void* stream_op,
1808 grpc_error* /*error_ignored*/) {
1809 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1810 grpc_chttp2_transport* t =
1811 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1812
1813 if (op->goaway_error) {
1814 send_goaway(t, op->goaway_error);
1815 }
1816
1817 if (op->set_accept_stream) {
1818 t->accept_stream_cb = op->set_accept_stream_fn;
1819 t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1820 }
1821
1822 if (op->bind_pollset) {
1823 grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1824 }
1825
1826 if (op->bind_pollset_set) {
1827 grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1828 }
1829
1830 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1831 send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1832 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1833 }
1834
1835 if (op->start_connectivity_watch != nullptr) {
1836 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1837 std::move(op->start_connectivity_watch));
1838 }
1839 if (op->stop_connectivity_watch != nullptr) {
1840 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1841 }
1842
1843 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1844 close_transport_locked(t, op->disconnect_with_error);
1845 }
1846
1847 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1848
1849 GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1850 }
1851
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1852 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1853 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1854 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1855 gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1856 grpc_transport_op_string(op).c_str());
1857 }
1858 op->handler_private.extra_arg = gt;
1859 GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1860 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1861 perform_transport_op_locked, op, nullptr),
1862 GRPC_ERROR_NONE);
1863 }
1864
1865 /*******************************************************************************
1866 * INPUT PROCESSING - GENERAL
1867 */
1868
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport *,grpc_chttp2_stream * s)1869 void grpc_chttp2_maybe_complete_recv_initial_metadata(
1870 grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) {
1871 if (s->recv_initial_metadata_ready != nullptr &&
1872 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1873 if (s->seen_error) {
1874 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1875 if (!s->pending_byte_stream) {
1876 grpc_slice_buffer_reset_and_unref_internal(
1877 &s->unprocessed_incoming_frames_buffer);
1878 }
1879 }
1880 grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1881 s->recv_initial_metadata);
1882 null_then_sched_closure(&s->recv_initial_metadata_ready);
1883 }
1884 }
1885
// Completes a pending recv_message op if possible.
//
// Does nothing unless s->recv_message_ready is set. Otherwise it tries to
// deframe one message out of the stream's buffered incoming data
// (unprocessed_incoming_frames_buffer, refilled from frame_storage), running
// stream decompression first when a non-identity method is configured.
//
// The callback is scheduled when either
//  - a message was produced without error, or
//  - published_metadata[1] is no longer NOT_PUBLISHED, in which case
//    *s->recv_message is reported as nullptr.
// In any other case the op stays pending. The transport argument is unused.
void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
                                             grpc_chttp2_stream* s) {
  grpc_error* error = GRPC_ERROR_NONE;
  if (s->recv_message_ready != nullptr) {
    *s->recv_message = nullptr;
    // Once final metadata was requested and an error was seen, buffered data
    // is discarded rather than delivered.
    if (s->final_metadata_requested && s->seen_error) {
      grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
      if (!s->pending_byte_stream) {
        grpc_slice_buffer_reset_and_unref_internal(
            &s->unprocessed_incoming_frames_buffer);
      }
    }
    if (!s->pending_byte_stream) {
      // Loop until a message is produced, an error occurs, or both buffers
      // are drained.
      while (s->unprocessed_incoming_frames_buffer.length > 0 ||
             s->frame_storage.length > 0) {
        if (s->unprocessed_incoming_frames_buffer.length == 0) {
          // Working buffer is empty: swap in frame_storage and mark the data
          // as not yet decompressed.
          grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
                                 &s->frame_storage);
          s->unprocessed_incoming_frames_decompressed = false;
        }
        if (!s->unprocessed_incoming_frames_decompressed &&
            s->stream_decompression_method !=
                GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
          GPR_ASSERT(s->decompressed_data_buffer.length == 0);
          bool end_of_context;
          // The decompression context is created lazily.
          if (!s->stream_decompression_ctx) {
            s->stream_decompression_ctx =
                grpc_stream_compression_context_create(
                    s->stream_decompression_method);
          }
          // NOTE(review): the size argument presumably limits output to the
          // header bytes still missing -- confirm against
          // grpc_stream_decompress.
          if (!grpc_stream_decompress(
                  s->stream_decompression_ctx,
                  &s->unprocessed_incoming_frames_buffer,
                  &s->decompressed_data_buffer, nullptr,
                  GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
                  &end_of_context)) {
            grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
            grpc_slice_buffer_reset_and_unref_internal(
                &s->unprocessed_incoming_frames_buffer);
            error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "Stream decompression error.");
          } else {
            // Track how much of the header has been decompressed so far;
            // wrap to 0 once a full header has been seen.
            s->decompressed_header_bytes += s->decompressed_data_buffer.length;
            if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
              s->decompressed_header_bytes = 0;
            }
            error = grpc_deframe_unprocessed_incoming_frames(
                &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
                s->recv_message);
            if (end_of_context) {
              grpc_stream_compression_context_destroy(
                  s->stream_decompression_ctx);
              s->stream_decompression_ctx = nullptr;
            }
          }
        } else {
          // Identity (or already-decompressed) data: deframe directly.
          error = grpc_deframe_unprocessed_incoming_frames(
              &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
              nullptr, s->recv_message);
        }
        if (error != GRPC_ERROR_NONE) {
          // Any failure poisons the stream and drops all buffered data.
          s->seen_error = true;
          grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
          grpc_slice_buffer_reset_and_unref_internal(
              &s->unprocessed_incoming_frames_buffer);
          break;
        } else if (*s->recv_message != nullptr) {
          break;
        }
      }
    }
    // Save the length of the buffer before handing control back to
    // application threads. Needed to support correct flow control bookkeeping.
    s->unprocessed_incoming_frames_buffer_cached_length =
        s->unprocessed_incoming_frames_buffer.length;
    if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
      null_then_sched_closure(&s->recv_message_ready);
    } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
      // No message will be delivered: complete the op with a null message.
      *s->recv_message = nullptr;
      null_then_sched_closure(&s->recv_message_ready);
    }
    GRPC_ERROR_UNREF(error);
  }
}
1970
// Completes a pending recv_trailing_metadata op if possible.
//
// Always first gives a pending recv_message op a chance to complete. The
// trailing-metadata op itself is only considered when its callback is set and
// the stream is closed for both reads and writes. It completes once no
// incoming data remains (frame_storage empty, no pending byte stream, nothing
// in the unprocessed buffer): stats are moved to the collector, metadata
// buffer [1] is published into s->recv_trailing_metadata, and the callback is
// scheduled and cleared.
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
                                                       grpc_chttp2_stream* s) {
  grpc_chttp2_maybe_complete_recv_message(t, s);
  if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
      s->write_closed) {
    // After an error, or on any non-client transport, leftover incoming data
    // is discarded (the unprocessed buffer only if no byte stream is pending).
    if (s->seen_error || !t->is_client) {
      grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
      if (!s->pending_byte_stream) {
        grpc_slice_buffer_reset_and_unref_internal(
            &s->unprocessed_incoming_frames_buffer);
      }
    }
    bool pending_data = s->pending_byte_stream ||
                        s->unprocessed_incoming_frames_buffer.length > 0;
    if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
        !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
      /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
       * maybe decompress the next 5 bytes in the stream. */
      if (s->stream_decompression_method ==
          GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
        // Identity: move up to one header's worth of bytes into the
        // unprocessed buffer without decompressing.
        grpc_slice_buffer_move_first(
            &s->frame_storage,
            GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
            &s->unprocessed_incoming_frames_buffer);
        if (s->unprocessed_incoming_frames_buffer.length > 0) {
          s->unprocessed_incoming_frames_decompressed = true;
          pending_data = true;
        }
      } else {
        bool end_of_context;
        // The decompression context is created lazily.
        if (!s->stream_decompression_ctx) {
          s->stream_decompression_ctx = grpc_stream_compression_context_create(
              s->stream_decompression_method);
        }
        if (!grpc_stream_decompress(
                s->stream_decompression_ctx, &s->frame_storage,
                &s->unprocessed_incoming_frames_buffer, nullptr,
                GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
          // Decompression failure: drop everything and mark the stream as
          // errored.
          grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
          grpc_slice_buffer_reset_and_unref_internal(
              &s->unprocessed_incoming_frames_buffer);
          s->seen_error = true;
        } else {
          if (s->unprocessed_incoming_frames_buffer.length > 0) {
            s->unprocessed_incoming_frames_decompressed = true;
            pending_data = true;
          }
          if (end_of_context) {
            grpc_stream_compression_context_destroy(
                s->stream_decompression_ctx);
            s->stream_decompression_ctx = nullptr;
          }
        }
      }
    }
    // Publish only when nothing is left to deliver; otherwise the op stays
    // pending until a later call.
    if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
        s->recv_trailing_metadata_finished != nullptr) {
      grpc_transport_move_stats(&s->stats, s->collecting_stats);
      s->collecting_stats = nullptr;
      grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
                                                   s->recv_trailing_metadata);
      null_then_sched_closure(&s->recv_trailing_metadata_finished);
    }
  }
}
2036
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error * error)2037 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2038 grpc_error* error) {
2039 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2040 grpc_chttp2_stream_map_delete(&t->stream_map, id));
2041 GPR_DEBUG_ASSERT(s);
2042 if (t->incoming_stream == s) {
2043 t->incoming_stream = nullptr;
2044 grpc_chttp2_parsing_become_skip_parser(t);
2045 }
2046 if (s->pending_byte_stream) {
2047 if (s->on_next != nullptr) {
2048 grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2049 if (error == GRPC_ERROR_NONE) {
2050 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2051 }
2052 bs->PublishError(error);
2053 bs->Unref();
2054 s->data_parser.parsing_frame = nullptr;
2055 } else {
2056 GRPC_ERROR_UNREF(s->byte_stream_error);
2057 s->byte_stream_error = GRPC_ERROR_REF(error);
2058 }
2059 }
2060
2061 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2062 post_benign_reclaimer(t);
2063 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2064 close_transport_locked(
2065 t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2066 "Last stream closed after sending GOAWAY", &error, 1));
2067 }
2068 }
2069 if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2070 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2071 }
2072
2073 GRPC_ERROR_UNREF(error);
2074
2075 maybe_start_some_streams(t);
2076 }
2077
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * due_to_error)2078 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2079 grpc_error* due_to_error) {
2080 if (!t->is_client && !s->sent_trailing_metadata &&
2081 grpc_error_has_clear_grpc_status(due_to_error)) {
2082 close_from_api(t, s, due_to_error);
2083 return;
2084 }
2085
2086 if (!s->read_closed || !s->write_closed) {
2087 if (s->id != 0) {
2088 grpc_http2_error_code http_error;
2089 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2090 &http_error, nullptr);
2091 grpc_chttp2_add_rst_stream_to_next_write(
2092 t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2093 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2094 }
2095 }
2096 if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2097 s->seen_error = true;
2098 }
2099 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2100 }
2101
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2102 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2103 grpc_error* error) {
2104 grpc_status_code status;
2105 grpc_slice slice;
2106 grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2107 if (status != GRPC_STATUS_OK) {
2108 s->seen_error = true;
2109 }
2110 /* stream_global->recv_trailing_metadata_finished gives us a
2111 last chance replacement: we've received trailing metadata,
2112 but something more important has become available to signal
2113 to the upper layers - drop what we've got, and then publish
2114 what we want - which is safe because we haven't told anyone
2115 about the metadata yet */
2116 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2117 s->recv_trailing_metadata_finished != nullptr) {
2118 char status_string[GPR_LTOA_MIN_BUFSIZE];
2119 gpr_ltoa(status, status_string);
2120 GRPC_LOG_IF_ERROR("add_status",
2121 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2122 &s->metadata_buffer[1],
2123 grpc_mdelem_from_slices(
2124 GRPC_MDSTR_GRPC_STATUS,
2125 grpc_core::UnmanagedMemorySlice(status_string))));
2126 if (!GRPC_SLICE_IS_EMPTY(slice)) {
2127 GRPC_LOG_IF_ERROR(
2128 "add_status_message",
2129 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2130 &s->metadata_buffer[1],
2131 grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2132 }
2133 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2134 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2135 }
2136
2137 GRPC_ERROR_UNREF(error);
2138 }
2139
add_error(grpc_error * error,grpc_error ** refs,size_t * nrefs)2140 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2141 if (error == GRPC_ERROR_NONE) return;
2142 for (size_t i = 0; i < *nrefs; i++) {
2143 if (error == refs[i]) {
2144 return;
2145 }
2146 }
2147 refs[*nrefs] = error;
2148 ++*nrefs;
2149 }
2150
removal_error(grpc_error * extra_error,grpc_chttp2_stream * s,const char * master_error_msg)2151 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2152 const char* master_error_msg) {
2153 grpc_error* refs[3];
2154 size_t nrefs = 0;
2155 add_error(s->read_closed_error, refs, &nrefs);
2156 add_error(s->write_closed_error, refs, &nrefs);
2157 add_error(extra_error, refs, &nrefs);
2158 grpc_error* error = GRPC_ERROR_NONE;
2159 if (nrefs > 0) {
2160 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2161 refs, nrefs);
2162 }
2163 GRPC_ERROR_UNREF(extra_error);
2164 return error;
2165 }
2166
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error * error)2167 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2168 grpc_chttp2_write_cb** list, grpc_error* error) {
2169 while (*list) {
2170 grpc_chttp2_write_cb* cb = *list;
2171 *list = cb->next;
2172 grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2173 "on_write_finished_cb");
2174 cb->next = t->write_cb_pool;
2175 t->write_cb_pool = cb;
2176 }
2177 GRPC_ERROR_UNREF(error);
2178 }
2179
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2180 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2181 grpc_chttp2_stream* s, grpc_error* error) {
2182 error =
2183 removal_error(error, s, "Pending writes failed due to stream closure");
2184 s->send_initial_metadata = nullptr;
2185 grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2186 GRPC_ERROR_REF(error),
2187 "send_initial_metadata_finished");
2188
2189 s->send_trailing_metadata = nullptr;
2190 s->sent_trailing_metadata_op = nullptr;
2191 grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2192 GRPC_ERROR_REF(error),
2193 "send_trailing_metadata_finished");
2194
2195 s->fetching_send_message.reset();
2196 grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2197 GRPC_ERROR_REF(error),
2198 "fetching_send_message_finished");
2199 flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2200 flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2201 }
2202
// Marks stream `s` closed for reads and/or writes.
//
// `close_reads` / `close_writes` are used as booleans selecting which
// directions to close. `error` describes the reason and may be
// GRPC_ERROR_NONE; the caller's ref on it is consumed.
//
// If the stream was already closed in both directions, only the status is
// (re)faked when there is an error to report and completion of the
// trailing-metadata op is attempted. Otherwise each newly closed direction
// records a ref of `error`; closing writes also fails all pending writes. When
// both directions end up closed the stream is removed from the transport (or
// from the waiting-for-concurrency list if it never got an id), a status is
// faked from the combined error, and the "chttp2" stream ref is dropped.
void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
                                    grpc_chttp2_stream* s, int close_reads,
                                    int close_writes, grpc_error* error) {
  if (s->read_closed && s->write_closed) {
    /* already closed, but we should still fake the status if needed. */
    // removal_error releases the ref on `error` passed to it.
    grpc_error* overall_error = removal_error(error, s, "Stream removed");
    if (overall_error != GRPC_ERROR_NONE) {
      grpc_chttp2_fake_status(t, s, overall_error);
    }
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
    return;
  }
  bool closed_read = false;
  bool became_closed = false;
  if (close_reads && !s->read_closed) {
    s->read_closed_error = GRPC_ERROR_REF(error);
    s->read_closed = true;
    closed_read = true;
  }
  if (close_writes && !s->write_closed) {
    s->write_closed_error = GRPC_ERROR_REF(error);
    s->write_closed = true;
    grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
  }
  if (s->read_closed && s->write_closed) {
    became_closed = true;
    grpc_error* overall_error =
        removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
    if (s->id != 0) {
      remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
    } else {
      /* Purge streams waiting on concurrency still waiting for id assignment */
      grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
    }
    if (overall_error != GRPC_ERROR_NONE) {
      grpc_chttp2_fake_status(t, s, overall_error);
    }
  }
  if (closed_read) {
    // Reads just closed: any metadata not published so far is marked as
    // published-at-close so pending recv ops can complete.
    for (int i = 0; i < 2; i++) {
      if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
        s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
      }
    }
    grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
    grpc_chttp2_maybe_complete_recv_message(t, s);
  }
  if (became_closed) {
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
    GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
  }
  GRPC_ERROR_UNREF(error);
}
2256
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2257 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2258 grpc_error* error) {
2259 grpc_slice hdr;
2260 grpc_slice status_hdr;
2261 grpc_slice http_status_hdr;
2262 grpc_slice content_type_hdr;
2263 grpc_slice message_pfx;
2264 uint8_t* p;
2265 uint32_t len = 0;
2266 grpc_status_code grpc_status;
2267 grpc_slice slice;
2268 grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2269 nullptr);
2270
2271 GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2272
2273 /* Hand roll a header block.
2274 This is unnecessarily ugly - at some point we should find a more
2275 elegant solution.
2276 It's complicated by the fact that our send machinery would be dead by
2277 the time we got around to sending this, so instead we ignore HPACK
2278 compression and just write the uncompressed bytes onto the wire. */
2279 if (!s->sent_initial_metadata) {
2280 http_status_hdr = GRPC_SLICE_MALLOC(13);
2281 p = GRPC_SLICE_START_PTR(http_status_hdr);
2282 *p++ = 0x00;
2283 *p++ = 7;
2284 *p++ = ':';
2285 *p++ = 's';
2286 *p++ = 't';
2287 *p++ = 'a';
2288 *p++ = 't';
2289 *p++ = 'u';
2290 *p++ = 's';
2291 *p++ = 3;
2292 *p++ = '2';
2293 *p++ = '0';
2294 *p++ = '0';
2295 GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2296 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2297
2298 content_type_hdr = GRPC_SLICE_MALLOC(31);
2299 p = GRPC_SLICE_START_PTR(content_type_hdr);
2300 *p++ = 0x00;
2301 *p++ = 12;
2302 *p++ = 'c';
2303 *p++ = 'o';
2304 *p++ = 'n';
2305 *p++ = 't';
2306 *p++ = 'e';
2307 *p++ = 'n';
2308 *p++ = 't';
2309 *p++ = '-';
2310 *p++ = 't';
2311 *p++ = 'y';
2312 *p++ = 'p';
2313 *p++ = 'e';
2314 *p++ = 16;
2315 *p++ = 'a';
2316 *p++ = 'p';
2317 *p++ = 'p';
2318 *p++ = 'l';
2319 *p++ = 'i';
2320 *p++ = 'c';
2321 *p++ = 'a';
2322 *p++ = 't';
2323 *p++ = 'i';
2324 *p++ = 'o';
2325 *p++ = 'n';
2326 *p++ = '/';
2327 *p++ = 'g';
2328 *p++ = 'r';
2329 *p++ = 'p';
2330 *p++ = 'c';
2331 GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2332 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2333 }
2334
2335 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2336 p = GRPC_SLICE_START_PTR(status_hdr);
2337 *p++ = 0x00; /* literal header, not indexed */
2338 *p++ = 11; /* len(grpc-status) */
2339 *p++ = 'g';
2340 *p++ = 'r';
2341 *p++ = 'p';
2342 *p++ = 'c';
2343 *p++ = '-';
2344 *p++ = 's';
2345 *p++ = 't';
2346 *p++ = 'a';
2347 *p++ = 't';
2348 *p++ = 'u';
2349 *p++ = 's';
2350 if (grpc_status < 10) {
2351 *p++ = 1;
2352 *p++ = static_cast<uint8_t>('0' + grpc_status);
2353 } else {
2354 *p++ = 2;
2355 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2356 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2357 }
2358 GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2359 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2360
2361 size_t msg_len = GRPC_SLICE_LENGTH(slice);
2362 GPR_ASSERT(msg_len <= UINT32_MAX);
2363 uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2364 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2365 p = GRPC_SLICE_START_PTR(message_pfx);
2366 *p++ = 0x00; /* literal header, not indexed */
2367 *p++ = 12; /* len(grpc-message) */
2368 *p++ = 'g';
2369 *p++ = 'r';
2370 *p++ = 'p';
2371 *p++ = 'c';
2372 *p++ = '-';
2373 *p++ = 'm';
2374 *p++ = 'e';
2375 *p++ = 's';
2376 *p++ = 's';
2377 *p++ = 'a';
2378 *p++ = 'g';
2379 *p++ = 'e';
2380 GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2381 p += msg_len_len;
2382 GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2383 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2384 len += static_cast<uint32_t>(msg_len);
2385
2386 hdr = GRPC_SLICE_MALLOC(9);
2387 p = GRPC_SLICE_START_PTR(hdr);
2388 *p++ = static_cast<uint8_t>(len >> 16);
2389 *p++ = static_cast<uint8_t>(len >> 8);
2390 *p++ = static_cast<uint8_t>(len);
2391 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2392 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2393 *p++ = static_cast<uint8_t>(s->id >> 24);
2394 *p++ = static_cast<uint8_t>(s->id >> 16);
2395 *p++ = static_cast<uint8_t>(s->id >> 8);
2396 *p++ = static_cast<uint8_t>(s->id);
2397 GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2398
2399 grpc_slice_buffer_add(&t->qbuf, hdr);
2400 if (!s->sent_initial_metadata) {
2401 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2402 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2403 }
2404 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2405 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2406 grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2407 grpc_chttp2_reset_ping_clock(t);
2408 grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2409 &s->stats.outgoing);
2410
2411 grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2412 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2413 }
2414
/* User data handed to cancel_stream_cb() while iterating the stream map.
   Field order matters: end_all_the_calls() brace-initializes this struct
   positionally as {error, t}. */
struct cancel_stream_cb_args {
  /* Error each stream is cancelled with; the callback takes its own ref. */
  grpc_error* error;
  /* Transport that owns the streams being cancelled. */
  grpc_chttp2_transport* t;
};
2419
cancel_stream_cb(void * user_data,uint32_t,void * stream)2420 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2421 cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2422 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2423 grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2424 }
2425
end_all_the_calls(grpc_chttp2_transport * t,grpc_error * error)2426 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2427 intptr_t http2_error;
2428 // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2429 if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2430 !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2431 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2432 GRPC_STATUS_UNAVAILABLE);
2433 }
2434 cancel_stream_cb_args args = {error, t};
2435 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2436 GRPC_ERROR_UNREF(error);
2437 }
2438
2439 /*******************************************************************************
2440 * INPUT PROCESSING - PARSING
2441 */
2442
2443 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2444 static void WithUrgency(grpc_chttp2_transport* t,
2445 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2446 grpc_chttp2_initiate_write_reason reason, F action) {
2447 switch (urgency) {
2448 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2449 break;
2450 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2451 grpc_chttp2_initiate_write(t, reason);
2452 // fallthrough
2453 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2454 action();
2455 break;
2456 }
2457 }
2458
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2459 void grpc_chttp2_act_on_flowctl_action(
2460 const grpc_core::chttp2::FlowControlAction& action,
2461 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2462 WithUrgency(t, action.send_stream_update(),
2463 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2464 [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2465 WithUrgency(t, action.send_transport_update(),
2466 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2467 WithUrgency(t, action.send_initial_window_update(),
2468 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2469 queue_setting_update(t,
2470 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2471 action.initial_window_size());
2472 });
2473 WithUrgency(t, action.send_max_frame_size_update(),
2474 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2475 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2476 action.max_frame_size());
2477 });
2478 }
2479
try_http_parsing(grpc_chttp2_transport * t)2480 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2481 grpc_http_parser parser;
2482 size_t i = 0;
2483 grpc_error* error = GRPC_ERROR_NONE;
2484 grpc_http_response response;
2485
2486 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2487
2488 grpc_error* parse_error = GRPC_ERROR_NONE;
2489 for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2490 parse_error =
2491 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2492 }
2493 if (parse_error == GRPC_ERROR_NONE &&
2494 (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2495 error = grpc_error_set_int(
2496 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2497 "Trying to connect an http1.x server"),
2498 GRPC_ERROR_INT_HTTP_STATUS, response.status),
2499 GRPC_ERROR_INT_GRPC_STATUS,
2500 grpc_http2_status_to_grpc_status(response.status));
2501 }
2502 GRPC_ERROR_UNREF(parse_error);
2503
2504 grpc_http_parser_destroy(&parser);
2505 grpc_http_response_destroy(&response);
2506 return error;
2507 }
2508
read_action(void * tp,grpc_error * error)2509 static void read_action(void* tp, grpc_error* error) {
2510 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2511 t->combiner->Run(
2512 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2513 GRPC_ERROR_REF(error));
2514 }
2515
/* Handles the completion of an endpoint read, scheduled on t->combiner by
   read_action().

   Steps, in order:
     1. wrap a non-NONE incoming error as "Endpoint read failed";
     2. if the transport is not already closed, feed every slice of
        t->read_buffer to grpc_chttp2_perform_read() and process any pending
        initial-window update;
     3. on any error close the transport, otherwise arrange for the next read
        (or pause reading when too many induced frames are pending);
     4. release the read buffer contents and the local error reference.

   `tp` is the grpc_chttp2_transport*. */
static void read_action_locked(void* tp, grpc_error* error) {
  GPR_TIMER_SCOPE("reading_action_locked", 0);

  grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);

  /* Take a local ref so that `error` can be replaced below and released
     unconditionally at the end of the function. */
  GRPC_ERROR_REF(error);

  grpc_error* err = error;
  if (err != GRPC_ERROR_NONE) {
    /* Wrap the endpoint error and record the current write state on it. */
    err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Endpoint read failed", &err, 1),
                             GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
                             t->write_state);
  }
  /* After the swap `error` is the (possibly wrapped) error and `err` is the
     original, whose local ref is dropped. */
  GPR_SWAP(grpc_error*, err, error);
  GRPC_ERROR_UNREF(err);
  if (t->closed_with_error == GRPC_ERROR_NONE) {
    GPR_TIMER_SCOPE("reading_action.parse", 0);
    size_t i = 0;
    /* errors[0]: read error so far, errors[1]: HTTP/2 parse error,
       errors[2]: result of the HTTP/1.x fallback probe. */
    grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
                             GRPC_ERROR_NONE};
    /* Parse slice by slice, stopping at the first parse failure. */
    for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
      grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
      if (bdp_est) {
        bdp_est->AddIncomingBytes(
            static_cast<int64_t> GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
      }
      errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
    }
    if (errors[1] != GRPC_ERROR_NONE) {
      /* Check whether the peer is actually speaking HTTP/1.x, then replace
         `error` with a composite referencing all three errors. */
      errors[2] = try_http_parsing(t);
      GRPC_ERROR_UNREF(error);
      error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
          "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
    }
    for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
      GRPC_ERROR_UNREF(errors[i]);
    }

    GPR_TIMER_SCOPE("post_parse_locked", 0);
    if (t->initial_window_update != 0) {
      if (t->initial_window_update > 0) {
        /* A positive update: make every stream on the stalled-by-stream list
           writable again and kick a write for each. */
        grpc_chttp2_stream* s;
        while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
          grpc_chttp2_mark_stream_writable(t, s);
          grpc_chttp2_initiate_write(
              t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
        }
      }
      t->initial_window_update = 0;
    }
  }

  GPR_TIMER_SCOPE("post_reading_action_locked", 0);
  bool keep_reading = false;
  /* No read/parse error, but the transport has been closed: surface that as
     the error for this read. */
  if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "Transport closed", &t->closed_with_error, 1);
  }
  if (error != GRPC_ERROR_NONE) {
    /* If a goaway frame was received, this might be the reason why the read
     * failed. Add this info to the error */
    if (t->goaway_error != GRPC_ERROR_NONE) {
      error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
    }

    close_transport_locked(t, GRPC_ERROR_REF(error));
    t->endpoint_reading = 0;
  } else if (t->closed_with_error == GRPC_ERROR_NONE) {
    keep_reading = true;
    /* Since we have read a byte, reset the keepalive timer. Cancelling it
     * makes init_keepalive_ping_locked() run with GRPC_ERROR_CANCELLED,
     * which re-arms the timer. */
    if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
      grpc_timer_cancel(&t->keepalive_ping_timer);
    }
  }
  grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);

  if (keep_reading) {
    if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
      /* Too many induced frames are still unwritten: stop issuing reads and
         record that reading is paused for this reason. */
      t->reading_paused_on_pending_induced_frames = true;
      GRPC_CHTTP2_IF_TRACING(
          gpr_log(GPR_INFO,
                  "transport %p : Pausing reading due to too "
                  "many unwritten SETTINGS ACK and RST_STREAM frames",
                  t));
    } else {
      continue_read_action_locked(t);
    }
  } else {
    /* Reading has stopped: release the "reading_action" transport ref. */
    GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
  }

  GRPC_ERROR_UNREF(error);
}
2610
continue_read_action_locked(grpc_chttp2_transport * t)2611 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2612 const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2613 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2614 grpc_schedule_on_exec_ctx);
2615 grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2616 grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2617 }
2618
2619 // t is reffed prior to calling the first time, and once the callback chain
2620 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2621 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2622 t->flow_control->bdp_estimator()->SchedulePing();
2623 send_ping_locked(
2624 t,
2625 GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2626 grpc_schedule_on_exec_ctx),
2627 GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2628 grpc_schedule_on_exec_ctx));
2629 }
2630
start_bdp_ping(void * tp,grpc_error * error)2631 static void start_bdp_ping(void* tp, grpc_error* error) {
2632 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2633 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2634 start_bdp_ping_locked, t, nullptr),
2635 GRPC_ERROR_REF(error));
2636 }
2637
start_bdp_ping_locked(void * tp,grpc_error * error)2638 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2639 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2640 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2641 gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
2642 grpc_error_string(error));
2643 }
2644 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2645 return;
2646 }
2647 /* Reset the keepalive ping timer */
2648 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2649 grpc_timer_cancel(&t->keepalive_ping_timer);
2650 }
2651 t->flow_control->bdp_estimator()->StartPing();
2652 t->bdp_ping_started = true;
2653 }
2654
finish_bdp_ping(void * tp,grpc_error * error)2655 static void finish_bdp_ping(void* tp, grpc_error* error) {
2656 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2657 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2658 finish_bdp_ping_locked, t, nullptr),
2659 GRPC_ERROR_REF(error));
2660 }
2661
finish_bdp_ping_locked(void * tp,grpc_error * error)2662 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2663 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2664 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2665 gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
2666 grpc_error_string(error));
2667 }
2668 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2669 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2670 return;
2671 }
2672 if (!t->bdp_ping_started) {
2673 /* start_bdp_ping_locked has not been run yet. Schedule
2674 * finish_bdp_ping_locked to be run later. */
2675 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2676 finish_bdp_ping_locked, t, nullptr),
2677 GRPC_ERROR_REF(error));
2678 return;
2679 }
2680 t->bdp_ping_started = false;
2681 grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2682 grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2683 nullptr);
2684 GPR_ASSERT(!t->have_next_bdp_ping_timer);
2685 t->have_next_bdp_ping_timer = true;
2686 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2687 next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
2688 grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2689 &t->next_bdp_ping_timer_expired_locked);
2690 }
2691
next_bdp_ping_timer_expired(void * tp,grpc_error * error)2692 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) {
2693 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2694 t->combiner->Run(
2695 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2696 next_bdp_ping_timer_expired_locked, t, nullptr),
2697 GRPC_ERROR_REF(error));
2698 }
2699
next_bdp_ping_timer_expired_locked(void * tp,grpc_error * error)2700 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2701 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2702 GPR_ASSERT(t->have_next_bdp_ping_timer);
2703 t->have_next_bdp_ping_timer = false;
2704 if (error != GRPC_ERROR_NONE) {
2705 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2706 return;
2707 }
2708 schedule_bdp_ping_locked(t);
2709 }
2710
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2711 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2712 bool is_client) {
2713 size_t i;
2714 if (args) {
2715 for (i = 0; i < args->num_args; i++) {
2716 if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2717 const int value = grpc_channel_arg_get_integer(
2718 &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2719 : g_default_server_keepalive_time_ms,
2720 1, INT_MAX});
2721 if (is_client) {
2722 g_default_client_keepalive_time_ms = value;
2723 } else {
2724 g_default_server_keepalive_time_ms = value;
2725 }
2726 } else if (0 ==
2727 strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2728 const int value = grpc_channel_arg_get_integer(
2729 &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2730 : g_default_server_keepalive_timeout_ms,
2731 0, INT_MAX});
2732 if (is_client) {
2733 g_default_client_keepalive_timeout_ms = value;
2734 } else {
2735 g_default_server_keepalive_timeout_ms = value;
2736 }
2737 } else if (0 == strcmp(args->args[i].key,
2738 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2739 const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2740 &args->args[i],
2741 {is_client ? g_default_client_keepalive_permit_without_calls
2742 : g_default_server_keepalive_timeout_ms,
2743 0, 1}));
2744 if (is_client) {
2745 g_default_client_keepalive_permit_without_calls = value;
2746 } else {
2747 g_default_server_keepalive_permit_without_calls = value;
2748 }
2749 } else if (0 ==
2750 strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2751 g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2752 &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2753 } else if (0 == strcmp(args->args[i].key,
2754 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2755 g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2756 &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2757 } else if (0 ==
2758 strcmp(
2759 args->args[i].key,
2760 GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
2761 g_default_min_sent_ping_interval_without_data_ms =
2762 grpc_channel_arg_get_integer(
2763 &args->args[i],
2764 {g_default_min_sent_ping_interval_without_data_ms, 0, INT_MAX});
2765 } else if (0 ==
2766 strcmp(
2767 args->args[i].key,
2768 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2769 g_default_min_recv_ping_interval_without_data_ms =
2770 grpc_channel_arg_get_integer(
2771 &args->args[i],
2772 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2773 }
2774 }
2775 }
2776 }
2777
init_keepalive_ping(void * arg,grpc_error * error)2778 static void init_keepalive_ping(void* arg, grpc_error* error) {
2779 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2780 t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2781 init_keepalive_ping_locked, t, nullptr),
2782 GRPC_ERROR_REF(error));
2783 }
2784
init_keepalive_ping_locked(void * arg,grpc_error * error)2785 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2786 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2787 GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2788 if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2789 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2790 } else if (error == GRPC_ERROR_NONE) {
2791 if (t->keepalive_permit_without_calls ||
2792 grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2793 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2794 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2795 grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2796 send_keepalive_ping_locked(t);
2797 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2798 } else {
2799 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2800 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2801 grpc_schedule_on_exec_ctx);
2802 grpc_timer_init(&t->keepalive_ping_timer,
2803 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2804 &t->init_keepalive_ping_locked);
2805 }
2806 } else if (error == GRPC_ERROR_CANCELLED) {
2807 /* The keepalive ping timer may be cancelled by bdp */
2808 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2809 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2810 grpc_schedule_on_exec_ctx);
2811 grpc_timer_init(&t->keepalive_ping_timer,
2812 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2813 &t->init_keepalive_ping_locked);
2814 }
2815 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2816 }
2817
start_keepalive_ping(void * arg,grpc_error * error)2818 static void start_keepalive_ping(void* arg, grpc_error* error) {
2819 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2820 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2821 start_keepalive_ping_locked, t, nullptr),
2822 GRPC_ERROR_REF(error));
2823 }
2824
start_keepalive_ping_locked(void * arg,grpc_error * error)2825 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2826 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2827 if (error != GRPC_ERROR_NONE) {
2828 return;
2829 }
2830 if (t->channelz_socket != nullptr) {
2831 t->channelz_socket->RecordKeepaliveSent();
2832 }
2833 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2834 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2835 gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string);
2836 }
2837 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2838 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2839 keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
2840 grpc_timer_init(&t->keepalive_watchdog_timer,
2841 grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2842 &t->keepalive_watchdog_fired_locked);
2843 t->keepalive_ping_started = true;
2844 }
2845
finish_keepalive_ping(void * arg,grpc_error * error)2846 static void finish_keepalive_ping(void* arg, grpc_error* error) {
2847 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2848 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2849 finish_keepalive_ping_locked, t, nullptr),
2850 GRPC_ERROR_REF(error));
2851 }
2852
finish_keepalive_ping_locked(void * arg,grpc_error * error)2853 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2854 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2855 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2856 if (error == GRPC_ERROR_NONE) {
2857 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2858 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2859 gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
2860 }
2861 if (!t->keepalive_ping_started) {
2862 /* start_keepalive_ping_locked has not run yet. Reschedule
2863 * finish_keepalive_ping_locked for it to be run later. */
2864 t->combiner->Run(
2865 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2866 finish_keepalive_ping_locked, t, nullptr),
2867 GRPC_ERROR_REF(error));
2868 return;
2869 }
2870 t->keepalive_ping_started = false;
2871 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2872 grpc_timer_cancel(&t->keepalive_watchdog_timer);
2873 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2874 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2875 grpc_schedule_on_exec_ctx);
2876 grpc_timer_init(&t->keepalive_ping_timer,
2877 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2878 &t->init_keepalive_ping_locked);
2879 }
2880 }
2881 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2882 }
2883
keepalive_watchdog_fired(void * arg,grpc_error * error)2884 static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
2885 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2886 t->combiner->Run(
2887 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2888 keepalive_watchdog_fired_locked, t, nullptr),
2889 GRPC_ERROR_REF(error));
2890 }
2891
keepalive_watchdog_fired_locked(void * arg,grpc_error * error)2892 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2893 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2894 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2895 if (error == GRPC_ERROR_NONE) {
2896 gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2897 t->peer_string);
2898 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2899 close_transport_locked(
2900 t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2901 "keepalive watchdog timeout"),
2902 GRPC_ERROR_INT_GRPC_STATUS,
2903 GRPC_STATUS_UNAVAILABLE));
2904 }
2905 } else {
2906 /* The watchdog timer should have been cancelled by
2907 * finish_keepalive_ping_locked. */
2908 if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2909 gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2910 t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2911 }
2912 }
2913 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2914 }
2915
2916 /*******************************************************************************
2917 * CALLBACK LOOP
2918 */
2919
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const char * reason)2920 static void connectivity_state_set(grpc_chttp2_transport* t,
2921 grpc_connectivity_state state,
2922 const char* reason) {
2923 GRPC_CHTTP2_IF_TRACING(
2924 gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2925 t->state_tracker.SetState(state, reason);
2926 }
2927
2928 /*******************************************************************************
2929 * POLLSET STUFF
2930 */
2931
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2932 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2933 grpc_pollset* pollset) {
2934 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2935 grpc_endpoint_add_to_pollset(t->ep, pollset);
2936 }
2937
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2938 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2939 grpc_pollset_set* pollset_set) {
2940 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2941 grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2942 }
2943
2944 /*******************************************************************************
2945 * BYTE STREAM
2946 */
2947
reset_byte_stream(void * arg,grpc_error * error)2948 static void reset_byte_stream(void* arg, grpc_error* error) {
2949 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2950 s->pending_byte_stream = false;
2951 if (error == GRPC_ERROR_NONE) {
2952 grpc_chttp2_maybe_complete_recv_message(s->t, s);
2953 grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2954 } else {
2955 GPR_ASSERT(error != GRPC_ERROR_NONE);
2956 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
2957 s->on_next = nullptr;
2958 GRPC_ERROR_UNREF(s->byte_stream_error);
2959 s->byte_stream_error = GRPC_ERROR_NONE;
2960 grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2961 s->byte_stream_error = GRPC_ERROR_REF(error);
2962 }
2963 }
2964
2965 namespace grpc_core {
2966
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2967 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2968 grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2969 uint32_t frame_size, uint32_t flags)
2970 : ByteStream(frame_size, flags),
2971 transport_(transport),
2972 stream_(stream),
2973 refs_(2),
2974 remaining_bytes_(frame_size) {
2975 GRPC_ERROR_UNREF(stream->byte_stream_error);
2976 stream->byte_stream_error = GRPC_ERROR_NONE;
2977 }
2978
OrphanLocked(void * arg,grpc_error *)2979 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2980 grpc_error* /*error_ignored*/) {
2981 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2982 grpc_chttp2_stream* s = bs->stream_;
2983 grpc_chttp2_transport* t = s->t;
2984 bs->Unref();
2985 s->pending_byte_stream = false;
2986 grpc_chttp2_maybe_complete_recv_message(t, s);
2987 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2988 }
2989
Orphan()2990 void Chttp2IncomingByteStream::Orphan() {
2991 GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2992 transport_->combiner->Run(
2993 GRPC_CLOSURE_INIT(&destroy_action_,
2994 &Chttp2IncomingByteStream::OrphanLocked, this, nullptr),
2995 GRPC_ERROR_NONE);
2996 }
2997
NextLocked(void * arg,grpc_error *)2998 void Chttp2IncomingByteStream::NextLocked(void* arg,
2999 grpc_error* /*error_ignored*/) {
3000 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
3001 grpc_chttp2_transport* t = bs->transport_;
3002 grpc_chttp2_stream* s = bs->stream_;
3003 size_t cur_length = s->frame_storage.length;
3004 if (!s->read_closed) {
3005 s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
3006 cur_length);
3007 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
3008 }
3009 GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
3010 if (s->frame_storage.length > 0) {
3011 grpc_slice_buffer_swap(&s->frame_storage,
3012 &s->unprocessed_incoming_frames_buffer);
3013 s->unprocessed_incoming_frames_decompressed = false;
3014 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3015 GRPC_ERROR_NONE);
3016 } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
3017 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3018 GRPC_ERROR_REF(s->byte_stream_error));
3019 if (s->data_parser.parsing_frame != nullptr) {
3020 s->data_parser.parsing_frame->Unref();
3021 s->data_parser.parsing_frame = nullptr;
3022 }
3023 } else if (s->read_closed) {
3024 if (bs->remaining_bytes_ != 0) {
3025 s->byte_stream_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3026 "Truncated message", &s->read_closed_error, 1);
3027 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3028 GRPC_ERROR_REF(s->byte_stream_error));
3029 if (s->data_parser.parsing_frame != nullptr) {
3030 s->data_parser.parsing_frame->Unref();
3031 s->data_parser.parsing_frame = nullptr;
3032 }
3033 } else {
3034 /* Should never reach here. */
3035 GPR_ASSERT(false);
3036 }
3037 } else {
3038 s->on_next = bs->next_action_.on_complete;
3039 }
3040 bs->Unref();
3041 }
3042
Next(size_t max_size_hint,grpc_closure * on_complete)3043 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
3044 grpc_closure* on_complete) {
3045 GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
3046 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3047 return true;
3048 } else {
3049 Ref();
3050 next_action_.max_size_hint = max_size_hint;
3051 next_action_.on_complete = on_complete;
3052 transport_->combiner->Run(
3053 GRPC_CLOSURE_INIT(&next_action_.closure,
3054 &Chttp2IncomingByteStream::NextLocked, this, nullptr),
3055 GRPC_ERROR_NONE);
3056 return false;
3057 }
3058 }
3059
MaybeCreateStreamDecompressionCtx()3060 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3061 GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3062 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3063 if (!stream_->stream_decompression_ctx) {
3064 stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3065 stream_->stream_decompression_method);
3066 }
3067 }
3068
Pull(grpc_slice * slice)3069 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3070 GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3071 grpc_error* error;
3072 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3073 if (!stream_->unprocessed_incoming_frames_decompressed &&
3074 stream_->stream_decompression_method !=
3075 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3076 bool end_of_context;
3077 MaybeCreateStreamDecompressionCtx();
3078 if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3079 &stream_->unprocessed_incoming_frames_buffer,
3080 &stream_->decompressed_data_buffer, nullptr,
3081 MAX_SIZE_T, &end_of_context)) {
3082 error =
3083 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
3084 return error;
3085 }
3086 GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3087 grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3088 &stream_->decompressed_data_buffer);
3089 stream_->unprocessed_incoming_frames_decompressed = true;
3090 if (end_of_context) {
3091 grpc_stream_compression_context_destroy(
3092 stream_->stream_decompression_ctx);
3093 stream_->stream_decompression_ctx = nullptr;
3094 }
3095 if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3096 *slice = grpc_empty_slice();
3097 }
3098 }
3099 error = grpc_deframe_unprocessed_incoming_frames(
3100 &stream_->data_parser, stream_,
3101 &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3102 if (error != GRPC_ERROR_NONE) {
3103 return error;
3104 }
3105 } else {
3106 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3107 stream_->t->combiner->Run(&stream_->reset_byte_stream,
3108 GRPC_ERROR_REF(error));
3109 return error;
3110 }
3111 return GRPC_ERROR_NONE;
3112 }
3113
PublishError(grpc_error * error)3114 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
3115 GPR_ASSERT(error != GRPC_ERROR_NONE);
3116 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
3117 GRPC_ERROR_REF(error));
3118 stream_->on_next = nullptr;
3119 GRPC_ERROR_UNREF(stream_->byte_stream_error);
3120 stream_->byte_stream_error = GRPC_ERROR_REF(error);
3121 grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
3122 }
3123
Push(const grpc_slice & slice,grpc_slice * slice_out)3124 grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3125 grpc_slice* slice_out) {
3126 if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3127 grpc_error* error =
3128 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3129 transport_->combiner->Run(&stream_->reset_byte_stream,
3130 GRPC_ERROR_REF(error));
3131 grpc_slice_unref_internal(slice);
3132 return error;
3133 } else {
3134 remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3135 if (slice_out != nullptr) {
3136 *slice_out = slice;
3137 }
3138 return GRPC_ERROR_NONE;
3139 }
3140 }
3141
Finished(grpc_error * error,bool reset_on_error)3142 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
3143 bool reset_on_error) {
3144 if (error == GRPC_ERROR_NONE) {
3145 if (remaining_bytes_ != 0) {
3146 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3147 }
3148 }
3149 if (error != GRPC_ERROR_NONE && reset_on_error) {
3150 transport_->combiner->Run(&stream_->reset_byte_stream,
3151 GRPC_ERROR_REF(error));
3152 }
3153 Unref();
3154 return error;
3155 }
3156
Shutdown(grpc_error * error)3157 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
3158 GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3159 }
3160
3161 } // namespace grpc_core
3162
3163 /*******************************************************************************
3164 * RESOURCE QUOTAS
3165 */
3166
post_benign_reclaimer(grpc_chttp2_transport * t)3167 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3168 if (!t->benign_reclaimer_registered) {
3169 t->benign_reclaimer_registered = true;
3170 GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3171 GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t,
3172 grpc_schedule_on_exec_ctx);
3173 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3174 false, &t->benign_reclaimer_locked);
3175 }
3176 }
3177
post_destructive_reclaimer(grpc_chttp2_transport * t)3178 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3179 if (!t->destructive_reclaimer_registered) {
3180 t->destructive_reclaimer_registered = true;
3181 GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3182 GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer,
3183 t, grpc_schedule_on_exec_ctx);
3184 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3185 true, &t->destructive_reclaimer_locked);
3186 }
3187 }
3188
benign_reclaimer(void * arg,grpc_error * error)3189 static void benign_reclaimer(void* arg, grpc_error* error) {
3190 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3191 t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3192 benign_reclaimer_locked, t, nullptr),
3193 GRPC_ERROR_REF(error));
3194 }
3195
benign_reclaimer_locked(void * arg,grpc_error * error)3196 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3197 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3198 if (error == GRPC_ERROR_NONE &&
3199 grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3200 /* Channel with no active streams: send a goaway to try and make it
3201 * disconnect cleanly */
3202 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3203 gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3204 t->peer_string);
3205 }
3206 send_goaway(t,
3207 grpc_error_set_int(
3208 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3209 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3210 } else if (error == GRPC_ERROR_NONE &&
3211 GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3212 gpr_log(GPR_INFO,
3213 "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3214 " streams",
3215 t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
3216 }
3217 t->benign_reclaimer_registered = false;
3218 if (error != GRPC_ERROR_CANCELLED) {
3219 grpc_resource_user_finish_reclamation(
3220 grpc_endpoint_get_resource_user(t->ep));
3221 }
3222 GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3223 }
3224
destructive_reclaimer(void * arg,grpc_error * error)3225 static void destructive_reclaimer(void* arg, grpc_error* error) {
3226 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3227 t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3228 destructive_reclaimer_locked, t, nullptr),
3229 GRPC_ERROR_REF(error));
3230 }
3231
destructive_reclaimer_locked(void * arg,grpc_error * error)3232 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3233 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3234 size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3235 t->destructive_reclaimer_registered = false;
3236 if (error == GRPC_ERROR_NONE && n > 0) {
3237 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3238 grpc_chttp2_stream_map_rand(&t->stream_map));
3239 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3240 gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
3241 s->id);
3242 }
3243 grpc_chttp2_cancel_stream(
3244 t, s,
3245 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3246 GRPC_ERROR_INT_HTTP2_ERROR,
3247 GRPC_HTTP2_ENHANCE_YOUR_CALM));
3248 if (n > 1) {
3249 /* Since we cancel one stream per destructive reclamation, if
3250 there are more streams left, we can immediately post a new
3251 reclaimer in case the resource quota needs to free more
3252 memory */
3253 post_destructive_reclaimer(t);
3254 }
3255 }
3256 if (error != GRPC_ERROR_CANCELLED) {
3257 grpc_resource_user_finish_reclamation(
3258 grpc_endpoint_get_resource_user(t->ep));
3259 }
3260 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3261 }
3262
3263 /*******************************************************************************
3264 * MONITORING
3265 */
3266
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3267 const char* grpc_chttp2_initiate_write_reason_string(
3268 grpc_chttp2_initiate_write_reason reason) {
3269 switch (reason) {
3270 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3271 return "INITIAL_WRITE";
3272 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3273 return "START_NEW_STREAM";
3274 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3275 return "SEND_MESSAGE";
3276 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3277 return "SEND_INITIAL_METADATA";
3278 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3279 return "SEND_TRAILING_METADATA";
3280 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3281 return "RETRY_SEND_PING";
3282 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3283 return "CONTINUE_PINGS";
3284 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3285 return "GOAWAY_SENT";
3286 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3287 return "RST_STREAM";
3288 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3289 return "CLOSE_FROM_API";
3290 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3291 return "STREAM_FLOW_CONTROL";
3292 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3293 return "TRANSPORT_FLOW_CONTROL";
3294 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3295 return "SEND_SETTINGS";
3296 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3297 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3298 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3299 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3300 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3301 return "APPLICATION_PING";
3302 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3303 return "KEEPALIVE_PING";
3304 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3305 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3306 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3307 return "PING_RESPONSE";
3308 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3309 return "FORCE_RST_STREAM";
3310 }
3311 GPR_UNREACHABLE_CODE(return "unknown");
3312 }
3313
chttp2_get_endpoint(grpc_transport * t)3314 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3315 return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3316 }
3317
/* The grpc_transport_vtable instance for the chttp2 transport, handed out by
 * get_vtable(). This is a positional aggregate initializer, so entry order
 * must follow the field order of grpc_transport_vtable (declared outside this
 * file). NOTE(review): presumably stream size, transport name, then the
 * per-operation callbacks -- confirm against the struct declaration before
 * reordering or adding entries. */
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
                                             "chttp2",
                                             init_stream,
                                             set_pollset,
                                             set_pollset_set,
                                             perform_stream_op,
                                             perform_transport_op,
                                             destroy_stream,
                                             destroy_transport,
                                             chttp2_get_endpoint};
3328
get_vtable(void)3329 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3330
3331 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3332 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3333 grpc_chttp2_transport* t =
3334 reinterpret_cast<grpc_chttp2_transport*>(transport);
3335 return t->channelz_socket;
3336 }
3337
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)3338 grpc_transport* grpc_create_chttp2_transport(
3339 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3340 grpc_resource_user* resource_user) {
3341 auto t =
3342 new grpc_chttp2_transport(channel_args, ep, is_client, resource_user);
3343 return &t->base;
3344 }
3345
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings)3346 void grpc_chttp2_transport_start_reading(
3347 grpc_transport* transport, grpc_slice_buffer* read_buffer,
3348 grpc_closure* notify_on_receive_settings) {
3349 grpc_chttp2_transport* t =
3350 reinterpret_cast<grpc_chttp2_transport*>(transport);
3351 GRPC_CHTTP2_REF_TRANSPORT(
3352 t, "reading_action"); /* matches unref inside reading_action */
3353 if (read_buffer != nullptr) {
3354 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3355 gpr_free(read_buffer);
3356 }
3357 t->notify_on_receive_settings = notify_on_receive_settings;
3358 t->combiner->Run(
3359 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
3360 GRPC_ERROR_NONE);
3361 }
3362