1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20
21 #include "absl/strings/str_format.h"
22
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/string_util.h>
28 #include <inttypes.h>
29 #include <limits.h>
30 #include <math.h>
31 #include <stdio.h>
32 #include <string.h>
33
34 #include "src/core/ext/transport/chttp2/transport/context_list.h"
35 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
36 #include "src/core/ext/transport/chttp2/transport/internal.h"
37 #include "src/core/ext/transport/chttp2/transport/varint.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/compression/stream_compression.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/env.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/memory.h"
44 #include "src/core/lib/http/parser.h"
45 #include "src/core/lib/iomgr/executor.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/profiling/timers.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52 #include "src/core/lib/transport/http2_errors.h"
53 #include "src/core/lib/transport/static_metadata.h"
54 #include "src/core/lib/transport/status_conversion.h"
55 #include "src/core/lib/transport/timeout_encoding.h"
56 #include "src/core/lib/transport/transport.h"
57 #include "src/core/lib/transport/transport_impl.h"
58 #include "src/core/lib/uri/uri_parser.h"
59
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
// Upper bound accepted for the GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE channel arg.
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
// Value queued for the MAX_HEADER_LIST_SIZE setting when a transport is
// constructed (channel args may queue a different value afterwards).
#define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)

// Built-in keepalive defaults. A keepalive time or timeout of INT_MAX is
// translated to GRPC_MILLIS_INF_FUTURE when applied to a transport; an
// infinite keepalive time leaves keepalive pings disabled.
#define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
#define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
#define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000  /* 2 hours */
#define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
#define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2

// Built-in ping policy defaults.
#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
#define DEFAULT_MAX_PING_STRIKES 2

#define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000

// Process-wide keepalive defaults, seeded from the macros above. Each new
// transport copies the client or server variants (depending on its role)
// before channel args are applied.
static int g_default_client_keepalive_time_ms =
    DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
static int g_default_client_keepalive_timeout_ms =
    DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
static int g_default_server_keepalive_time_ms =
    DEFAULT_SERVER_KEEPALIVE_TIME_MS;
static int g_default_server_keepalive_timeout_ms =
    DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
static bool g_default_client_keepalive_permit_without_calls =
    DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
static bool g_default_server_keepalive_permit_without_calls =
    DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;

// Process-wide ping policy defaults, copied into each new transport's
// ping_policy and used as fallbacks when parsing the matching channel args.
static int g_default_min_recv_ping_interval_without_data_ms =
    DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;

#define MAX_CLIENT_STREAM_ID 0x7fffffffu
// Trace flags; each is constructed with `false` and the tracer name shown.
grpc_core::TraceFlag grpc_http_trace(false, "http");
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
                                                         "chttp2_refcount");
101
102 // forward declarations of various callbacks that we'll build closures around
103 static void write_action_begin_locked(void* t, grpc_error* error);
104 static void write_action(void* t, grpc_error* error);
105 static void write_action_end(void* t, grpc_error* error);
106 static void write_action_end_locked(void* t, grpc_error* error);
107
108 static void read_action(void* t, grpc_error* error);
109 static void read_action_locked(void* t, grpc_error* error);
110 static void continue_read_action_locked(grpc_chttp2_transport* t);
111
112 static void complete_fetch(void* gs, grpc_error* error);
113 static void complete_fetch_locked(void* gs, grpc_error* error);
114 // Set a transport level setting, and push it to our peer
115 static void queue_setting_update(grpc_chttp2_transport* t,
116 grpc_chttp2_setting_id id, uint32_t value);
117
118 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
119 grpc_error* error);
120
121 // Start new streams that have been created if we can
122 static void maybe_start_some_streams(grpc_chttp2_transport* t);
123
124 static void connectivity_state_set(grpc_chttp2_transport* t,
125 grpc_connectivity_state state,
126 const absl::Status& status,
127 const char* reason);
128
129 static void benign_reclaimer(void* arg, grpc_error* error);
130 static void destructive_reclaimer(void* arg, grpc_error* error);
131 static void benign_reclaimer_locked(void* arg, grpc_error* error);
132 static void destructive_reclaimer_locked(void* arg, grpc_error* error);
133
134 static void post_benign_reclaimer(grpc_chttp2_transport* t);
135 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
136
137 static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
138 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
139
140 static void start_bdp_ping(void* tp, grpc_error* error);
141 static void finish_bdp_ping(void* tp, grpc_error* error);
142 static void start_bdp_ping_locked(void* tp, grpc_error* error);
143 static void finish_bdp_ping_locked(void* tp, grpc_error* error);
144 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error);
145 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
146
147 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
148 static void send_ping_locked(grpc_chttp2_transport* t,
149 grpc_closure* on_initiate, grpc_closure* on_ack);
150 static void retry_initiate_ping_locked(void* tp, grpc_error* error);
151
152 // keepalive-relevant functions
153 static void init_keepalive_ping(void* arg, grpc_error* error);
154 static void init_keepalive_ping_locked(void* arg, grpc_error* error);
155 static void start_keepalive_ping(void* arg, grpc_error* error);
156 static void finish_keepalive_ping(void* arg, grpc_error* error);
157 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
158 static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
159 static void keepalive_watchdog_fired(void* arg, grpc_error* error);
160 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
161
162 static void reset_byte_stream(void* arg, grpc_error* error);
163
// Flow control default enabled. Can be disabled by setting
// GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
// The transport constructor reads this flag to choose between
// TransportFlowControl and TransportFlowControlDisabled; when it is false,
// BDP probing is turned off for that transport as well.
bool g_flow_control_enabled = true;
167
168 //
169 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
170 //
171
~grpc_chttp2_transport()172 grpc_chttp2_transport::~grpc_chttp2_transport() {
173 size_t i;
174
175 if (channelz_socket != nullptr) {
176 channelz_socket.reset();
177 }
178
179 grpc_endpoint_destroy(ep);
180
181 grpc_slice_buffer_destroy_internal(&qbuf);
182
183 grpc_slice_buffer_destroy_internal(&outbuf);
184 grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
185
186 grpc_error* error =
187 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
188 // ContextList::Execute follows semantics of a callback function and does not
189 // take a ref on error
190 grpc_core::ContextList::Execute(cl, nullptr, error);
191 GRPC_ERROR_UNREF(error);
192 cl = nullptr;
193
194 grpc_slice_buffer_destroy_internal(&read_buffer);
195 grpc_chttp2_hpack_parser_destroy(&hpack_parser);
196 grpc_chttp2_goaway_parser_destroy(&goaway_parser);
197
198 for (i = 0; i < STREAM_LIST_COUNT; i++) {
199 GPR_ASSERT(lists[i].head == nullptr);
200 GPR_ASSERT(lists[i].tail == nullptr);
201 }
202
203 GRPC_ERROR_UNREF(goaway_error);
204
205 GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
206
207 grpc_chttp2_stream_map_destroy(&stream_map);
208
209 GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
210
211 cancel_pings(this,
212 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
213
214 while (write_cb_pool) {
215 grpc_chttp2_write_cb* next = write_cb_pool->next;
216 gpr_free(write_cb_pool);
217 write_cb_pool = next;
218 }
219
220 flow_control.Destroy();
221
222 GRPC_ERROR_UNREF(closed_with_error);
223 gpr_free(ping_acks);
224 }
225
226 static const grpc_transport_vtable* get_vtable(void);
227
228 // Returns whether bdp is enabled
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)229 static bool read_channel_args(grpc_chttp2_transport* t,
230 const grpc_channel_args* channel_args,
231 bool is_client) {
232 bool enable_bdp = true;
233 bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
234 size_t i;
235 int j;
236
237 for (i = 0; i < channel_args->num_args; i++) {
238 if (0 == strcmp(channel_args->args[i].key,
239 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
240 const grpc_integer_options options = {-1, 0, INT_MAX};
241 const int value =
242 grpc_channel_arg_get_integer(&channel_args->args[i], options);
243 if (value >= 0) {
244 if ((t->next_stream_id & 1) != (value & 1)) {
245 gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
246 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
247 is_client ? "client" : "server");
248 } else {
249 t->next_stream_id = static_cast<uint32_t>(value);
250 }
251 }
252 } else if (0 == strcmp(channel_args->args[i].key,
253 GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
254 const grpc_integer_options options = {-1, 0, INT_MAX};
255 const int value =
256 grpc_channel_arg_get_integer(&channel_args->args[i], options);
257 if (value >= 0) {
258 grpc_chttp2_hpack_compressor_set_max_usable_size(
259 &t->hpack_compressor, static_cast<uint32_t>(value));
260 }
261 } else if (0 == strcmp(channel_args->args[i].key,
262 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
263 t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
264 &channel_args->args[i],
265 {g_default_max_pings_without_data, 0, INT_MAX});
266 } else if (0 == strcmp(channel_args->args[i].key,
267 GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
268 t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
269 &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
270 } else if (0 ==
271 strcmp(channel_args->args[i].key,
272 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
273 t->ping_policy.min_recv_ping_interval_without_data =
274 grpc_channel_arg_get_integer(
275 &channel_args->args[i],
276 grpc_integer_options{
277 g_default_min_recv_ping_interval_without_data_ms, 0,
278 INT_MAX});
279 } else if (0 == strcmp(channel_args->args[i].key,
280 GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
281 t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
282 &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
283 } else if (0 ==
284 strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
285 enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
286 } else if (0 ==
287 strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
288 const int value = grpc_channel_arg_get_integer(
289 &channel_args->args[i],
290 grpc_integer_options{t->is_client
291 ? g_default_client_keepalive_time_ms
292 : g_default_server_keepalive_time_ms,
293 1, INT_MAX});
294 t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
295 } else if (0 == strcmp(channel_args->args[i].key,
296 GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
297 const int value = grpc_channel_arg_get_integer(
298 &channel_args->args[i],
299 grpc_integer_options{t->is_client
300 ? g_default_client_keepalive_timeout_ms
301 : g_default_server_keepalive_timeout_ms,
302 0, INT_MAX});
303 t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
304 } else if (0 == strcmp(channel_args->args[i].key,
305 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
306 t->keepalive_permit_without_calls = static_cast<uint32_t>(
307 grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
308 } else if (0 == strcmp(channel_args->args[i].key,
309 GRPC_ARG_OPTIMIZATION_TARGET)) {
310 gpr_log(GPR_INFO, "GRPC_ARG_OPTIMIZATION_TARGET is deprecated");
311 } else if (0 ==
312 strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
313 channelz_enabled = grpc_channel_arg_get_bool(
314 &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
315 } else {
316 static const struct {
317 const char* channel_arg_name;
318 grpc_chttp2_setting_id setting_id;
319 grpc_integer_options integer_options;
320 bool availability[2] /* server, client */;
321 } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
322 GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
323 {-1, 0, INT32_MAX},
324 {true, false}},
325 {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
326 GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
327 {-1, 0, INT32_MAX},
328 {true, true}},
329 {GRPC_ARG_MAX_METADATA_SIZE,
330 GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
331 {-1, 0, INT32_MAX},
332 {true, true}},
333 {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
334 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
335 {-1, 16384, 16777215},
336 {true, true}},
337 {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
338 GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
339 {1, 0, 1},
340 {true, true}},
341 {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
342 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
343 {-1, 5, INT32_MAX},
344 {true, true}}};
345 for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
346 if (0 == strcmp(channel_args->args[i].key,
347 settings_map[j].channel_arg_name)) {
348 if (!settings_map[j].availability[is_client]) {
349 gpr_log(GPR_DEBUG, "%s is not available on %s",
350 settings_map[j].channel_arg_name,
351 is_client ? "clients" : "servers");
352 } else {
353 int value = grpc_channel_arg_get_integer(
354 &channel_args->args[i], settings_map[j].integer_options);
355 if (value >= 0) {
356 queue_setting_update(t, settings_map[j].setting_id,
357 static_cast<uint32_t>(value));
358 }
359 }
360 break;
361 }
362 }
363 }
364 }
365 if (channelz_enabled) {
366 t->channelz_socket =
367 grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
368 std::string(grpc_endpoint_get_local_address(t->ep)), t->peer_string,
369 absl::StrFormat("%s %s", get_vtable()->name, t->peer_string));
370 }
371 return enable_bdp;
372 }
373
init_transport_keepalive_settings(grpc_chttp2_transport * t)374 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
375 if (t->is_client) {
376 t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
377 ? GRPC_MILLIS_INF_FUTURE
378 : g_default_client_keepalive_time_ms;
379 t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
380 ? GRPC_MILLIS_INF_FUTURE
381 : g_default_client_keepalive_timeout_ms;
382 t->keepalive_permit_without_calls =
383 g_default_client_keepalive_permit_without_calls;
384 } else {
385 t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
386 ? GRPC_MILLIS_INF_FUTURE
387 : g_default_server_keepalive_time_ms;
388 t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
389 ? GRPC_MILLIS_INF_FUTURE
390 : g_default_server_keepalive_timeout_ms;
391 t->keepalive_permit_without_calls =
392 g_default_server_keepalive_permit_without_calls;
393 }
394 }
395
configure_transport_ping_policy(grpc_chttp2_transport * t)396 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
397 t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
398 t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
399 t->ping_policy.min_recv_ping_interval_without_data =
400 g_default_min_recv_ping_interval_without_data_ms;
401 }
402
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)403 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
404 if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
405 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
406 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
407 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
408 grpc_schedule_on_exec_ctx);
409 grpc_timer_init(&t->keepalive_ping_timer,
410 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
411 &t->init_keepalive_ping_locked);
412 } else {
413 // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
414 // inflight keeaplive timers
415 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
416 }
417 }
418
// Constructs a chttp2 transport over endpoint `ep`.
//
// Initialization happens in a fixed order: buffers, parsers and settings
// tables are set up first; built-in HTTP/2 settings, ping policy and
// keepalive defaults are applied next; then `channel_args` (if non-null) may
// override them. After that, flow control is initialized, ping bookkeeping is
// reset, keepalive is armed if enabled, and the initial write is initiated.
//
// `is_client` selects the role: it determines the state tracker name, the
// first stream id, the initial deframer state, and whether the client connect
// string is queued for sending.
grpc_chttp2_transport::grpc_chttp2_transport(
    const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
    grpc_resource_user* resource_user)
    : refs(1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
                  ? "chttp2_refcount"
                  : nullptr),
      ep(ep),
      peer_string(grpc_endpoint_get_peer(ep)),
      resource_user(resource_user),
      combiner(grpc_combiner_create()),
      state_tracker(is_client ? "client_transport" : "server_transport",
                    GRPC_CHANNEL_READY),
      is_client(is_client),
      // Clients start at stream id 1, servers at 2.
      next_stream_id(is_client ? 1 : 2),
      // Servers begin deframing at the client prefix state; clients begin at
      // the first frame-header state.
      deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
  GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
             GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
  base.vtable = get_vtable();
  // 8 is a random stab in the dark as to a good initial size: it's small enough
  // that it shouldn't waste memory for infrequently used connections, yet
  // large enough that the exponential growth should happen nicely when it's
  // needed.
  // TODO(ctiller): tune this
  grpc_chttp2_stream_map_init(&stream_map, 8);

  grpc_slice_buffer_init(&read_buffer);
  grpc_slice_buffer_init(&outbuf);
  // A client queues the connect string as the first bytes of its output.
  if (is_client) {
    grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
                                       GRPC_CHTTP2_CLIENT_CONNECT_STRING));
  }
  grpc_chttp2_hpack_compressor_init(&hpack_compressor);
  grpc_slice_buffer_init(&qbuf);
  // copy in initial settings to all setting sets
  size_t i;
  int j;
  for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
    for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
      settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
    }
  }
  grpc_chttp2_hpack_parser_init(&hpack_parser);
  grpc_chttp2_goaway_parser_init(&goaway_parser);

  // configure http2 the way we like it
  if (is_client) {
    queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
    queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
  }
  queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
                       DEFAULT_MAX_HEADER_LIST_SIZE);
  queue_setting_update(this,
                       GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);

  // Apply process-wide defaults first so that channel args, read below, can
  // override them.
  configure_transport_ping_policy(this);
  init_transport_keepalive_settings(this);

  bool enable_bdp = true;
  if (channel_args) {
    enable_bdp = read_channel_args(this, channel_args, is_client);
  }

  // With flow control globally disabled, BDP probing is forced off too.
  if (g_flow_control_enabled) {
    flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
                                                               enable_bdp);
  } else {
    flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
    enable_bdp = false;
  }

  // No pings allowed before receiving a header or data frame.
  ping_state.pings_before_data_required = 0;
  ping_state.is_delayed_ping_timer_set = false;
  ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;

  ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
  ping_recv_state.ping_strikes = 0;

  // Must run after channel args are read: keepalive_time may have changed.
  init_keepalive_pings_if_enabled(this);

  if (enable_bdp) {
    bdp_ping_blocked = true;
    grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
                                      nullptr);
  }

  grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
  post_benign_reclaimer(this);
}
508
destroy_transport_locked(void * tp,grpc_error *)509 static void destroy_transport_locked(void* tp, grpc_error* /*error*/) {
510 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
511 t->destroying = 1;
512 close_transport_locked(
513 t, grpc_error_set_int(
514 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
515 GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
516 // Must be the last line.
517 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
518 }
519
destroy_transport(grpc_transport * gt)520 static void destroy_transport(grpc_transport* gt) {
521 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
522 t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
523 GRPC_ERROR_NONE);
524 }
525
// Closes transport `t` because of `error`.
//
// All calls are ended and all pings are cancelled with their own refs of
// `error`. If the transport has not been closed before:
//  - the error is given GRPC_STATUS_UNAVAILABLE unless
//    grpc_error_has_clear_grpc_status() reports it already has a status;
//  - if a write is in progress, the close is deferred: the error is added as
//    a child of close_transport_on_writes_finished and the function returns.
//    set_write_state() calls back into this function with that accumulated
//    error once the write state returns to IDLE;
//  - otherwise the error is recorded in closed_with_error, connectivity moves
//    to GRPC_CHANNEL_SHUTDOWN, pending ping/BDP/keepalive timers are
//    cancelled, the writable stream list is drained, and the endpoint is shut
//    down.
// Finally, a pending notify_on_receive_settings closure (if any) is scheduled
// with the error, and the reference passed in by the caller is released.
static void close_transport_locked(grpc_chttp2_transport* t,
                                   grpc_error* error) {
  end_all_the_calls(t, GRPC_ERROR_REF(error));
  cancel_pings(t, GRPC_ERROR_REF(error));
  if (t->closed_with_error == GRPC_ERROR_NONE) {
    if (!grpc_error_has_clear_grpc_status(error)) {
      error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
                                 GRPC_STATUS_UNAVAILABLE);
    }
    if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
      if (t->close_transport_on_writes_finished == nullptr) {
        t->close_transport_on_writes_finished =
            GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "Delayed close due to in-progress write");
      }
      // NOTE(review): `error` is not unreffed on this path; presumably
      // grpc_error_add_child takes ownership of it - confirm.
      t->close_transport_on_writes_finished =
          grpc_error_add_child(t->close_transport_on_writes_finished, error);
      return;
    }
    GPR_ASSERT(error != GRPC_ERROR_NONE);
    t->closed_with_error = GRPC_ERROR_REF(error);
    connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
                           "close_transport");
    if (t->ping_state.is_delayed_ping_timer_set) {
      grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
    }
    if (t->have_next_bdp_ping_timer) {
      grpc_timer_cancel(&t->next_bdp_ping_timer);
    }
    // Cancel whichever keepalive timers the current keepalive state implies.
    switch (t->keepalive_state) {
      case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
        grpc_timer_cancel(&t->keepalive_ping_timer);
        break;
      case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
        grpc_timer_cancel(&t->keepalive_ping_timer);
        grpc_timer_cancel(&t->keepalive_watchdog_timer);
        break;
      case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
      case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
        // keepalive timers are not set in these two states
        break;
    }

    // flush writable stream list to avoid dangling references
    grpc_chttp2_stream* s;
    while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
      GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
    }
    GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
    grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
  }
  // Runs whether or not the transport had already been closed.
  if (t->notify_on_receive_settings != nullptr) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
                            GRPC_ERROR_REF(error));
    t->notify_on_receive_settings = nullptr;
  }
  GRPC_ERROR_UNREF(error);
}
584
585 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)586 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
587 grpc_stream_ref(s->refcount, reason);
588 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)589 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
590 grpc_stream_unref(s->refcount, reason);
591 }
592 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)593 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
594 grpc_stream_ref(s->refcount);
595 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)596 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
597 grpc_stream_unref(s->refcount);
598 }
599 #endif
600
Reffer(grpc_chttp2_stream * s)601 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
602 // We reserve one 'active stream' that's dropped when the stream is
603 // read-closed. The others are for Chttp2IncomingByteStreams that are
604 // actively reading
605 GRPC_CHTTP2_STREAM_REF(s, "chttp2");
606 GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
607 }
608
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)609 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
610 grpc_stream_refcount* refcount,
611 const void* server_data,
612 grpc_core::Arena* arena)
613 : t(t),
614 refcount(refcount),
615 reffer(this),
616 metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
617 grpc_chttp2_incoming_metadata_buffer(arena)} {
618 if (server_data) {
619 id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
620 *t->accepting_stream = this;
621 grpc_chttp2_stream_map_add(&t->stream_map, id, this);
622 post_destructive_reclaimer(t);
623 }
624 if (t->flow_control->flow_control_enabled()) {
625 flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
626 static_cast<grpc_core::chttp2::TransportFlowControl*>(
627 t->flow_control.get()),
628 this);
629 } else {
630 flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
631 }
632
633 grpc_slice_buffer_init(&frame_storage);
634 grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
635 grpc_slice_buffer_init(&flow_controlled_buffer);
636 GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr);
637 }
638
~grpc_chttp2_stream()639 grpc_chttp2_stream::~grpc_chttp2_stream() {
640 if (t->channelz_socket != nullptr) {
641 if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
642 t->channelz_socket->RecordStreamSucceeded();
643 } else {
644 t->channelz_socket->RecordStreamFailed();
645 }
646 }
647
648 GPR_ASSERT((write_closed && read_closed) || id == 0);
649 if (id != 0) {
650 GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
651 }
652
653 grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
654 grpc_slice_buffer_destroy_internal(&frame_storage);
655 if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
656 grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
657 }
658 if (stream_decompression_method !=
659 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
660 grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
661 }
662
663 for (int i = 0; i < STREAM_LIST_COUNT; i++) {
664 if (GPR_UNLIKELY(included[i])) {
665 gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
666 t->is_client ? "client" : "server", id, i);
667 abort();
668 }
669 }
670
671 GPR_ASSERT(send_initial_metadata_finished == nullptr);
672 GPR_ASSERT(fetching_send_message == nullptr);
673 GPR_ASSERT(send_trailing_metadata_finished == nullptr);
674 GPR_ASSERT(recv_initial_metadata_ready == nullptr);
675 GPR_ASSERT(recv_message_ready == nullptr);
676 GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
677 grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
678 GRPC_ERROR_UNREF(read_closed_error);
679 GRPC_ERROR_UNREF(write_closed_error);
680 GRPC_ERROR_UNREF(byte_stream_error);
681
682 flow_control.Destroy();
683
684 if (t->resource_user != nullptr) {
685 grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
686 }
687
688 GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
689 grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
690 }
691
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)692 static int init_stream(grpc_transport* gt, grpc_stream* gs,
693 grpc_stream_refcount* refcount, const void* server_data,
694 grpc_core::Arena* arena) {
695 GPR_TIMER_SCOPE("init_stream", 0);
696 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
697 new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
698 return 0;
699 }
700
destroy_stream_locked(void * sp,grpc_error *)701 static void destroy_stream_locked(void* sp, grpc_error* /*error*/) {
702 GPR_TIMER_SCOPE("destroy_stream", 0);
703 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
704 s->~grpc_chttp2_stream();
705 }
706
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)707 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
708 grpc_closure* then_schedule_closure) {
709 GPR_TIMER_SCOPE("destroy_stream", 0);
710 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
711 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
712 if (s->stream_compression_method !=
713 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
714 s->stream_compression_ctx != nullptr) {
715 grpc_stream_compression_context_destroy(s->stream_compression_ctx);
716 s->stream_compression_ctx = nullptr;
717 }
718 if (s->stream_decompression_method !=
719 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
720 s->stream_decompression_ctx != nullptr) {
721 grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
722 s->stream_decompression_ctx = nullptr;
723 }
724
725 s->destroy_stream_arg = then_schedule_closure;
726 t->combiner->Run(
727 GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
728 GRPC_ERROR_NONE);
729 }
730
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)731 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
732 uint32_t id) {
733 if (t->accept_stream_cb == nullptr) {
734 return nullptr;
735 }
736 // Don't accept the stream if memory quota doesn't allow. Note that we should
737 // simply refuse the stream here instead of canceling the stream after it's
738 // accepted since the latter will create the call which costs much memory.
739 if (t->resource_user != nullptr &&
740 !grpc_resource_user_safe_alloc(t->resource_user,
741 GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
742 gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
743 grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
744 nullptr);
745 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
746 return nullptr;
747 }
748 grpc_chttp2_stream* accepting = nullptr;
749 GPR_ASSERT(t->accepting_stream == nullptr);
750 t->accepting_stream = &accepting;
751 t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
752 reinterpret_cast<void*>(id));
753 t->accepting_stream = nullptr;
754 return accepting;
755 }
756
757 //
758 // OUTPUT PROCESSING
759 //
760
write_state_name(grpc_chttp2_write_state st)761 static const char* write_state_name(grpc_chttp2_write_state st) {
762 switch (st) {
763 case GRPC_CHTTP2_WRITE_STATE_IDLE:
764 return "IDLE";
765 case GRPC_CHTTP2_WRITE_STATE_WRITING:
766 return "WRITING";
767 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
768 return "WRITING+MORE";
769 }
770 GPR_UNREACHABLE_CODE(return "UNKNOWN");
771 }
772
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)773 static void set_write_state(grpc_chttp2_transport* t,
774 grpc_chttp2_write_state st, const char* reason) {
775 GRPC_CHTTP2_IF_TRACING(
776 gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
777 t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
778 write_state_name(t->write_state), write_state_name(st), reason));
779 t->write_state = st;
780 // If the state is being reset back to idle, it means a write was just
781 // finished. Make sure all the run_after_write closures are scheduled.
782 //
783 // This is also our chance to close the transport if the transport was marked
784 // to be closed after all writes finish (for example, if we received a go-away
785 // from peer while we had some pending writes)
786 if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
787 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
788 if (t->close_transport_on_writes_finished != nullptr) {
789 grpc_error* err = t->close_transport_on_writes_finished;
790 t->close_transport_on_writes_finished = nullptr;
791 close_transport_locked(t, err);
792 }
793 }
794 }
795
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)796 static void inc_initiate_write_reason(
797 grpc_chttp2_initiate_write_reason reason) {
798 switch (reason) {
799 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
800 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
801 break;
802 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
803 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
804 break;
805 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
806 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
807 break;
808 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
809 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
810 break;
811 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
812 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
813 break;
814 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
815 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
816 break;
817 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
818 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
819 break;
820 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
821 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
822 break;
823 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
824 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
825 break;
826 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
827 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
828 break;
829 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
830 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
831 break;
832 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
833 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
834 break;
835 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
836 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
837 break;
838 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
839 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
840 break;
841 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
842 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
843 break;
844 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
845 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
846 break;
847 case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
848 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING();
849 break;
850 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
851 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
852 break;
853 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
854 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
855 break;
856 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
857 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
858 break;
859 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
860 GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
861 break;
862 }
863 }
864
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)865 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
866 grpc_chttp2_initiate_write_reason reason) {
867 GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
868
869 switch (t->write_state) {
870 case GRPC_CHTTP2_WRITE_STATE_IDLE:
871 inc_initiate_write_reason(reason);
872 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
873 grpc_chttp2_initiate_write_reason_string(reason));
874 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
875 // Note that the 'write_action_begin_locked' closure is being scheduled
876 // on the 'finally_scheduler' of t->combiner. This means that
877 // 'write_action_begin_locked' is called only *after* all the other
878 // closures (some of which are potentially initiating more writes on the
879 // transport) are executed on the t->combiner.
880 //
881 // The reason for scheduling on finally_scheduler is to make sure we batch
882 // as many writes as possible. 'write_action_begin_locked' is the function
883 // that gathers all the relevant bytes (which are at various places in the
884 // grpc_chttp2_transport structure) and append them to 'outbuf' field in
885 // grpc_chttp2_transport thereby batching what would have been potentially
886 // multiple write operations.
887 //
888 // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
889 // It does not call the endpoint to write the bytes. That is done by the
890 // 'write_action' (which is scheduled by 'write_action_begin_locked')
891 t->combiner->FinallyRun(
892 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
893 write_action_begin_locked, t, nullptr),
894 GRPC_ERROR_NONE);
895 break;
896 case GRPC_CHTTP2_WRITE_STATE_WRITING:
897 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
898 grpc_chttp2_initiate_write_reason_string(reason));
899 break;
900 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
901 break;
902 }
903 }
904
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)905 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
906 grpc_chttp2_stream* s) {
907 if (t->closed_with_error == GRPC_ERROR_NONE &&
908 grpc_chttp2_list_add_writable_stream(t, s)) {
909 GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
910 }
911 }
912
// Returns the trace description used when a write begins; `partial` selects
// the wording for a partial write.
static const char* begin_writing_desc(bool partial) {
  return partial ? "begin partial write in background"
                 : "begin write in current thread";
}
920
write_action_begin_locked(void * gt,grpc_error *)921 static void write_action_begin_locked(void* gt, grpc_error* /*error_ignored*/) {
922 GPR_TIMER_SCOPE("write_action_begin_locked", 0);
923 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
924 GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
925 grpc_chttp2_begin_write_result r;
926 if (t->closed_with_error != GRPC_ERROR_NONE) {
927 r.writing = false;
928 } else {
929 r = grpc_chttp2_begin_write(t);
930 }
931 if (r.writing) {
932 if (r.partial) {
933 GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
934 }
935 set_write_state(t,
936 r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
937 : GRPC_CHTTP2_WRITE_STATE_WRITING,
938 begin_writing_desc(r.partial));
939 write_action(t, GRPC_ERROR_NONE);
940 if (t->reading_paused_on_pending_induced_frames) {
941 GPR_ASSERT(t->num_pending_induced_frames == 0);
942 // We had paused reading, because we had many induced frames (SETTINGS
943 // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
944 // been able to flush qbuf, we can resume reading.
945 GRPC_CHTTP2_IF_TRACING(gpr_log(
946 GPR_INFO,
947 "transport %p : Resuming reading after being paused due to too "
948 "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
949 t));
950 t->reading_paused_on_pending_induced_frames = false;
951 continue_read_action_locked(t);
952 }
953 } else {
954 GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
955 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
956 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
957 }
958 }
959
write_action(void * gt,grpc_error *)960 static void write_action(void* gt, grpc_error* /*error*/) {
961 GPR_TIMER_SCOPE("write_action", 0);
962 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
963 void* cl = t->cl;
964 t->cl = nullptr;
965 grpc_endpoint_write(
966 t->ep, &t->outbuf,
967 GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
968 grpc_schedule_on_exec_ctx),
969 cl);
970 }
971
write_action_end(void * tp,grpc_error * error)972 static void write_action_end(void* tp, grpc_error* error) {
973 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
974 t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
975 write_action_end_locked, t, nullptr),
976 GRPC_ERROR_REF(error));
977 }
978
979 // Callback from the grpc_endpoint after bytes have been written by calling
980 // sendmsg
write_action_end_locked(void * tp,grpc_error * error)981 static void write_action_end_locked(void* tp, grpc_error* error) {
982 GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
983 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
984
985 bool closed = false;
986 if (error != GRPC_ERROR_NONE) {
987 close_transport_locked(t, GRPC_ERROR_REF(error));
988 closed = true;
989 }
990
991 if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
992 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
993 closed = true;
994 if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
995 close_transport_locked(
996 t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
997 }
998 }
999
1000 switch (t->write_state) {
1001 case GRPC_CHTTP2_WRITE_STATE_IDLE:
1002 GPR_UNREACHABLE_CODE(break);
1003 case GRPC_CHTTP2_WRITE_STATE_WRITING:
1004 GPR_TIMER_MARK("state=writing", 0);
1005 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1006 break;
1007 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1008 GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1009 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1010 GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1011 // If the transport is closed, we will retry writing on the endpoint
1012 // and next write may contain part of the currently serialized frames.
1013 // So, we should only call the run_after_write callbacks when the next
1014 // write finishes, or the callbacks will be invoked when the stream is
1015 // closed.
1016 if (!closed) {
1017 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1018 }
1019 t->combiner->FinallyRun(
1020 GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1021 write_action_begin_locked, t, nullptr),
1022 GRPC_ERROR_NONE);
1023 break;
1024 }
1025
1026 grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1027 GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1028 }
1029
1030 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1031 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1032 static void queue_setting_update(grpc_chttp2_transport* t,
1033 grpc_chttp2_setting_id id, uint32_t value) {
1034 const grpc_chttp2_setting_parameters* sp =
1035 &grpc_chttp2_settings_parameters[id];
1036 uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1037 if (use_value != value) {
1038 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1039 value, use_value);
1040 }
1041 if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1042 t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1043 t->dirtied_local_settings = true;
1044 }
1045 }
1046
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,const grpc_slice & goaway_text)1047 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1048 uint32_t goaway_error,
1049 uint32_t last_stream_id,
1050 const grpc_slice& goaway_text) {
1051 // Discard the error from a previous goaway frame (if any)
1052 if (t->goaway_error != GRPC_ERROR_NONE) {
1053 GRPC_ERROR_UNREF(t->goaway_error);
1054 }
1055 t->goaway_error = grpc_error_set_str(
1056 grpc_error_set_int(
1057 grpc_error_set_int(
1058 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1059 GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1060 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1061 GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1062
1063 GRPC_CHTTP2_IF_TRACING(
1064 gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1065 last_stream_id));
1066 // We want to log this irrespective of whether http tracing is enabled if we
1067 // received a GOAWAY with a non NO_ERROR code.
1068 if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1069 gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string.c_str(),
1070 goaway_error, grpc_error_string(t->goaway_error));
1071 }
1072 absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1073 // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1074 // data equal to "too_many_pings", it should log the occurrence at a log level
1075 // that is enabled by default and double the configured KEEPALIVE_TIME used
1076 // for new connections on that channel.
1077 if (GPR_UNLIKELY(t->is_client &&
1078 goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1079 grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1080 gpr_log(GPR_ERROR,
1081 "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1082 "data equal to \"too_many_pings\"");
1083 double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1084 constexpr int max_keepalive_time_ms =
1085 INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1086 t->keepalive_time =
1087 current_keepalive_time_ms > static_cast<double>(max_keepalive_time_ms)
1088 ? GRPC_MILLIS_INF_FUTURE
1089 : static_cast<grpc_millis>(current_keepalive_time_ms *
1090 KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1091 status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1092 absl::Cord(std::to_string(t->keepalive_time)));
1093 }
1094 // lie: use transient failure from the transport to indicate goaway has been
1095 // received.
1096 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1097 "got_goaway");
1098 }
1099
maybe_start_some_streams(grpc_chttp2_transport * t)1100 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1101 grpc_chttp2_stream* s;
1102 // cancel out streams that haven't yet started if we have received a GOAWAY
1103 if (t->goaway_error != GRPC_ERROR_NONE) {
1104 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1105 grpc_chttp2_cancel_stream(
1106 t, s,
1107 grpc_error_set_int(
1108 GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1109 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1110 }
1111 return;
1112 }
1113 // start streams where we have free grpc_chttp2_stream ids and free
1114 // * concurrency
1115 while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1116 grpc_chttp2_stream_map_size(&t->stream_map) <
1117 t->settings[GRPC_PEER_SETTINGS]
1118 [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1119 grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1120 // safe since we can't (legally) be parsing this stream yet
1121 GRPC_CHTTP2_IF_TRACING(gpr_log(
1122 GPR_INFO,
1123 "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1124 t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1125
1126 GPR_ASSERT(s->id == 0);
1127 s->id = t->next_stream_id;
1128 t->next_stream_id += 2;
1129
1130 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1131 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1132 absl::Status(absl::StatusCode::kUnavailable,
1133 "Transport Stream IDs exhausted"),
1134 "no_more_stream_ids");
1135 }
1136
1137 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1138 post_destructive_reclaimer(t);
1139 grpc_chttp2_mark_stream_writable(t, s);
1140 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1141 }
1142 // cancel out streams that will never be started
1143 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1144 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1145 grpc_chttp2_cancel_stream(
1146 t, s,
1147 grpc_error_set_int(
1148 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1149 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1150 }
1151 }
1152 }
1153
// Flag bit kept in a closure's next_data.scratch word: this closure barrier
// may be covering a write in a pollset, and so we should not complete this
// closure until we can prove that the write got scheduled.
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
// First bit of the reference count, stored in the high order bits (with the low
// bits being used for flags defined above). Adding or subtracting this value
// from the scratch word adds or drops one barrier reference.
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1161
add_closure_barrier(grpc_closure * closure)1162 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1163 closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1164 return closure;
1165 }
1166
null_then_sched_closure(grpc_closure ** closure)1167 static void null_then_sched_closure(grpc_closure** closure) {
1168 grpc_closure* c = *closure;
1169 *closure = nullptr;
1170 grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
1171 }
1172
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream *,grpc_closure ** pclosure,grpc_error * error,const char * desc)1173 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1174 grpc_chttp2_stream* /*s*/,
1175 grpc_closure** pclosure,
1176 grpc_error* error, const char* desc) {
1177 grpc_closure* closure = *pclosure;
1178 *pclosure = nullptr;
1179 if (closure == nullptr) {
1180 GRPC_ERROR_UNREF(error);
1181 return;
1182 }
1183 closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1184 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1185 const char* errstr = grpc_error_string(error);
1186 gpr_log(
1187 GPR_INFO,
1188 "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1189 "write_state=%s",
1190 t, closure,
1191 static_cast<int>(closure->next_data.scratch /
1192 CLOSURE_BARRIER_FIRST_REF_BIT),
1193 static_cast<int>(closure->next_data.scratch %
1194 CLOSURE_BARRIER_FIRST_REF_BIT),
1195 desc, errstr, write_state_name(t->write_state));
1196 }
1197 if (error != GRPC_ERROR_NONE) {
1198 if (closure->error_data.error == GRPC_ERROR_NONE) {
1199 closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1200 "Error in HTTP transport completing operation");
1201 closure->error_data.error = grpc_error_set_str(
1202 closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1203 grpc_slice_from_copied_string(t->peer_string.c_str()));
1204 }
1205 closure->error_data.error =
1206 grpc_error_add_child(closure->error_data.error, error);
1207 }
1208 if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1209 if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1210 !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1211 // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1212 // closures earlier than when it is safe to do so.
1213 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
1214 closure->error_data.error);
1215 } else {
1216 grpc_closure_list_append(&t->run_after_write, closure,
1217 closure->error_data.error);
1218 }
1219 }
1220 }
1221
contains_non_ok_status(grpc_metadata_batch * batch)1222 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1223 if (batch->idx.named.grpc_status != nullptr) {
1224 return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1225 GRPC_MDELEM_GRPC_STATUS_0);
1226 }
1227 return false;
1228 }
1229
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1230 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1231 grpc_chttp2_stream* s) {
1232 if (s->id != 0 && (!s->write_buffering ||
1233 s->flow_controlled_buffer.length > t->write_buffer_size)) {
1234 grpc_chttp2_mark_stream_writable(t, s);
1235 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1236 }
1237 }
1238
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1239 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1240 grpc_chttp2_stream* s) {
1241 s->fetched_send_message_length +=
1242 static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1243 grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1244 maybe_become_writable_due_to_send_msg(t, s);
1245 }
1246
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1247 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1248 grpc_chttp2_stream* s) {
1249 for (;;) {
1250 if (s->fetching_send_message == nullptr) {
1251 // Stream was cancelled before message fetch completed
1252 abort(); /* TODO(ctiller): what cleanup here? */
1253 return; /* early out */
1254 }
1255 if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1256 int64_t notify_offset = s->next_message_end_offset;
1257 if (notify_offset <= s->flow_controlled_bytes_written) {
1258 grpc_chttp2_complete_closure_step(
1259 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1260 "fetching_send_message_finished");
1261 } else {
1262 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1263 if (cb == nullptr) {
1264 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1265 } else {
1266 t->write_cb_pool = cb->next;
1267 }
1268 cb->call_at_byte = notify_offset;
1269 cb->closure = s->fetching_send_message_finished;
1270 s->fetching_send_message_finished = nullptr;
1271 grpc_chttp2_write_cb** list =
1272 s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1273 ? &s->on_write_finished_cbs
1274 : &s->on_flow_controlled_cbs;
1275 cb->next = *list;
1276 *list = cb;
1277 }
1278 s->fetching_send_message.reset();
1279 return; /* early out */
1280 } else if (s->fetching_send_message->Next(
1281 UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1282 ::complete_fetch, s,
1283 grpc_schedule_on_exec_ctx))) {
1284 grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1285 if (error != GRPC_ERROR_NONE) {
1286 s->fetching_send_message.reset();
1287 grpc_chttp2_cancel_stream(t, s, error);
1288 } else {
1289 add_fetched_slice_locked(t, s);
1290 }
1291 }
1292 }
1293 }
1294
complete_fetch(void * gs,grpc_error * error)1295 static void complete_fetch(void* gs, grpc_error* error) {
1296 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1297 s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1298 ::complete_fetch_locked, s, nullptr),
1299 GRPC_ERROR_REF(error));
1300 }
1301
complete_fetch_locked(void * gs,grpc_error * error)1302 static void complete_fetch_locked(void* gs, grpc_error* error) {
1303 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1304 grpc_chttp2_transport* t = s->t;
1305 if (error == GRPC_ERROR_NONE) {
1306 error = s->fetching_send_message->Pull(&s->fetching_slice);
1307 if (error == GRPC_ERROR_NONE) {
1308 add_fetched_slice_locked(t, s);
1309 continue_fetching_send_locked(t, s);
1310 }
1311 }
1312 if (error != GRPC_ERROR_NONE) {
1313 s->fetching_send_message.reset();
1314 grpc_chttp2_cancel_stream(t, s, error);
1315 }
1316 }
1317
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1318 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1319 bool is_client, bool is_initial) {
1320 for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1321 md = md->next) {
1322 char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1323 char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1324 gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1325 is_client ? "CLI" : "SVR", key, value);
1326 gpr_free(key);
1327 gpr_free(value);
1328 }
1329 }
1330
perform_stream_op_locked(void * stream_op,grpc_error *)1331 static void perform_stream_op_locked(void* stream_op,
1332 grpc_error* /*error_ignored*/) {
1333 GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1334
1335 grpc_transport_stream_op_batch* op =
1336 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1337 grpc_chttp2_stream* s =
1338 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1339 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1340 grpc_chttp2_transport* t = s->t;
1341
1342 GRPC_STATS_INC_HTTP2_OP_BATCHES();
1343
1344 s->context = op->payload->context;
1345 s->traced = op->is_traced;
1346 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1347 gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p",
1348 grpc_transport_stream_op_batch_string(op).c_str(), op->on_complete);
1349 if (op->send_initial_metadata) {
1350 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1351 s->id, t->is_client, true);
1352 }
1353 if (op->send_trailing_metadata) {
1354 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1355 s->id, t->is_client, false);
1356 }
1357 }
1358
1359 grpc_closure* on_complete = op->on_complete;
1360 // on_complete will be null if and only if there are no send ops in the batch.
1361 if (on_complete != nullptr) {
1362 // This batch has send ops. Use final_data as a barrier until enqueue time;
1363 // the initial counter is dropped at the end of this function.
1364 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1365 on_complete->error_data.error = GRPC_ERROR_NONE;
1366 }
1367
1368 if (op->cancel_stream) {
1369 GRPC_STATS_INC_HTTP2_OP_CANCEL();
1370 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1371 }
1372
1373 if (op->send_initial_metadata) {
1374 if (t->is_client && t->channelz_socket != nullptr) {
1375 t->channelz_socket->RecordStreamStartedFromLocal();
1376 }
1377 GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1378 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1379 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1380
1381 // Identify stream compression
1382 if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1383 .content_encoding == nullptr ||
1384 grpc_stream_compression_method_parse(
1385 GRPC_MDVALUE(
1386 op_payload->send_initial_metadata.send_initial_metadata->idx
1387 .named.content_encoding->md),
1388 true, &s->stream_compression_method) == 0) {
1389 s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1390 }
1391 if (s->stream_compression_method !=
1392 GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1393 s->uncompressed_data_size = 0;
1394 s->stream_compression_ctx = nullptr;
1395 grpc_slice_buffer_init(&s->compressed_data_buffer);
1396 }
1397 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1398 s->send_initial_metadata =
1399 op_payload->send_initial_metadata.send_initial_metadata;
1400 if (t->is_client) {
1401 s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1402 }
1403 if (contains_non_ok_status(s->send_initial_metadata)) {
1404 s->seen_error = true;
1405 }
1406 if (!s->write_closed) {
1407 if (t->is_client) {
1408 if (t->closed_with_error == GRPC_ERROR_NONE) {
1409 GPR_ASSERT(s->id == 0);
1410 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1411 maybe_start_some_streams(t);
1412 } else {
1413 grpc_chttp2_cancel_stream(
1414 t, s,
1415 grpc_error_set_int(
1416 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1417 "Transport closed", &t->closed_with_error, 1),
1418 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1419 }
1420 } else {
1421 GPR_ASSERT(s->id != 0);
1422 grpc_chttp2_mark_stream_writable(t, s);
1423 if (!(op->send_message &&
1424 (op->payload->send_message.send_message->flags() &
1425 GRPC_WRITE_BUFFER_HINT))) {
1426 grpc_chttp2_initiate_write(
1427 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1428 }
1429 }
1430 } else {
1431 s->send_initial_metadata = nullptr;
1432 grpc_chttp2_complete_closure_step(
1433 t, s, &s->send_initial_metadata_finished,
1434 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1435 "Attempt to send initial metadata after stream was closed",
1436 &s->write_closed_error, 1),
1437 "send_initial_metadata_finished");
1438 }
1439 if (op_payload->send_initial_metadata.peer_string != nullptr) {
1440 gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1441 (gpr_atm)t->peer_string.c_str());
1442 }
1443 }
1444
1445 if (op->send_message) {
1446 GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1447 t->num_messages_in_next_write++;
1448 GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1449 op->payload->send_message.send_message->length());
1450 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1451 s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1452 if (s->write_closed) {
1453 op->payload->send_message.stream_write_closed = true;
1454 // We should NOT return an error here, so as to avoid a cancel OP being
1455 // started. The surface layer will notice that the stream has been closed
1456 // for writes and fail the send message op.
1457 op->payload->send_message.send_message.reset();
1458 grpc_chttp2_complete_closure_step(
1459 t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1460 "fetching_send_message_finished");
1461 } else {
1462 GPR_ASSERT(s->fetching_send_message == nullptr);
1463 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1464 &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1465 uint32_t flags = op_payload->send_message.send_message->flags();
1466 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1467 size_t len = op_payload->send_message.send_message->length();
1468 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1469 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1470 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1471 frame_hdr[4] = static_cast<uint8_t>(len);
1472 s->fetching_send_message =
1473 std::move(op_payload->send_message.send_message);
1474 s->fetched_send_message_length = 0;
1475 s->next_message_end_offset =
1476 s->flow_controlled_bytes_written +
1477 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1478 static_cast<int64_t>(len);
1479 if (flags & GRPC_WRITE_BUFFER_HINT) {
1480 s->next_message_end_offset -= t->write_buffer_size;
1481 s->write_buffering = true;
1482 } else {
1483 s->write_buffering = false;
1484 }
1485 continue_fetching_send_locked(t, s);
1486 maybe_become_writable_due_to_send_msg(t, s);
1487 }
1488 }
1489
1490 if (op->send_trailing_metadata) {
1491 GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1492 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1493 on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1494 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1495 s->send_trailing_metadata =
1496 op_payload->send_trailing_metadata.send_trailing_metadata;
1497 s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1498 s->write_buffering = false;
1499 if (contains_non_ok_status(s->send_trailing_metadata)) {
1500 s->seen_error = true;
1501 }
1502 if (s->write_closed) {
1503 s->send_trailing_metadata = nullptr;
1504 s->sent_trailing_metadata_op = nullptr;
1505 grpc_chttp2_complete_closure_step(
1506 t, s, &s->send_trailing_metadata_finished,
1507 grpc_metadata_batch_is_empty(
1508 op->payload->send_trailing_metadata.send_trailing_metadata)
1509 ? GRPC_ERROR_NONE
1510 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1511 "Attempt to send trailing metadata after "
1512 "stream was closed"),
1513 "send_trailing_metadata_finished");
1514 } else if (s->id != 0) {
1515 // TODO(ctiller): check if there's flow control for any outstanding
1516 // bytes before going writable
1517 grpc_chttp2_mark_stream_writable(t, s);
1518 grpc_chttp2_initiate_write(
1519 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1520 }
1521 }
1522
1523 if (op->recv_initial_metadata) {
1524 GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1525 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1526 s->recv_initial_metadata_ready =
1527 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1528 s->recv_initial_metadata =
1529 op_payload->recv_initial_metadata.recv_initial_metadata;
1530 s->trailing_metadata_available =
1531 op_payload->recv_initial_metadata.trailing_metadata_available;
1532 if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1533 gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1534 (gpr_atm)t->peer_string.c_str());
1535 }
1536 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1537 }
1538
1539 if (op->recv_message) {
1540 GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1541 size_t before = 0;
1542 GPR_ASSERT(s->recv_message_ready == nullptr);
1543 GPR_ASSERT(!s->pending_byte_stream);
1544 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1545 s->recv_message = op_payload->recv_message.recv_message;
1546 if (s->id != 0) {
1547 if (!s->read_closed) {
1548 before = s->frame_storage.length +
1549 s->unprocessed_incoming_frames_buffer.length;
1550 }
1551 }
1552 grpc_chttp2_maybe_complete_recv_message(t, s);
1553 if (s->id != 0) {
1554 if (!s->read_closed && s->frame_storage.length == 0) {
1555 size_t after = s->frame_storage.length +
1556 s->unprocessed_incoming_frames_buffer_cached_length;
1557 s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1558 before - after);
1559 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1560 }
1561 }
1562 }
1563
1564 if (op->recv_trailing_metadata) {
1565 GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1566 GPR_ASSERT(s->collecting_stats == nullptr);
1567 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1568 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1569 s->recv_trailing_metadata_finished =
1570 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1571 s->recv_trailing_metadata =
1572 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1573 s->final_metadata_requested = true;
1574 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1575 }
1576
1577 if (on_complete != nullptr) {
1578 grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1579 "op->on_complete");
1580 }
1581
1582 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1583 }
1584
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1585 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1586 grpc_transport_stream_op_batch* op) {
1587 GPR_TIMER_SCOPE("perform_stream_op", 0);
1588 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1589 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1590
1591 if (!t->is_client) {
1592 if (op->send_initial_metadata) {
1593 grpc_millis deadline =
1594 op->payload->send_initial_metadata.send_initial_metadata->deadline;
1595 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1596 }
1597 if (op->send_trailing_metadata) {
1598 grpc_millis deadline =
1599 op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1600 GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1601 }
1602 }
1603
1604 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1605 gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s,
1606 grpc_transport_stream_op_batch_string(op).c_str());
1607 }
1608
1609 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1610 op->handler_private.extra_arg = gs;
1611 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1612 perform_stream_op_locked, op, nullptr),
1613 GRPC_ERROR_NONE);
1614 }
1615
cancel_pings(grpc_chttp2_transport * t,grpc_error * error)1616 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1617 // callback remaining pings: they're not allowed to call into the transport,
1618 // and maybe they hold resources that need to be freed
1619 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1620 GPR_ASSERT(error != GRPC_ERROR_NONE);
1621 for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1622 grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1623 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1624 }
1625 GRPC_ERROR_UNREF(error);
1626 }
1627
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1628 static void send_ping_locked(grpc_chttp2_transport* t,
1629 grpc_closure* on_initiate, grpc_closure* on_ack) {
1630 if (t->closed_with_error != GRPC_ERROR_NONE) {
1631 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
1632 GRPC_ERROR_REF(t->closed_with_error));
1633 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
1634 GRPC_ERROR_REF(t->closed_with_error));
1635 return;
1636 }
1637 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1638 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1639 GRPC_ERROR_NONE);
1640 grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1641 GRPC_ERROR_NONE);
1642 }
1643
1644 // Specialized form of send_ping_locked for keepalive ping. If there is already
1645 // a ping in progress, the keepalive ping would piggyback onto that ping,
1646 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_chttp2_transport * t)1647 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1648 if (t->closed_with_error != GRPC_ERROR_NONE) {
1649 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1650 start_keepalive_ping_locked, t, nullptr),
1651 GRPC_ERROR_REF(t->closed_with_error));
1652 t->combiner->Run(
1653 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1654 finish_keepalive_ping_locked, t, nullptr),
1655 GRPC_ERROR_REF(t->closed_with_error));
1656 return;
1657 }
1658 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1659 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1660 // There is a ping in flight. Add yourself to the inflight closure list.
1661 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1662 start_keepalive_ping_locked, t, nullptr),
1663 GRPC_ERROR_REF(t->closed_with_error));
1664 grpc_closure_list_append(
1665 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1666 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1667 finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1668 GRPC_ERROR_NONE);
1669 return;
1670 }
1671 grpc_closure_list_append(
1672 &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1673 GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1674 t, grpc_schedule_on_exec_ctx),
1675 GRPC_ERROR_NONE);
1676 grpc_closure_list_append(
1677 &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1678 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1679 t, grpc_schedule_on_exec_ctx),
1680 GRPC_ERROR_NONE);
1681 }
1682
grpc_chttp2_retry_initiate_ping(void * tp,grpc_error * error)1683 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error) {
1684 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1685 t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1686 retry_initiate_ping_locked, t, nullptr),
1687 GRPC_ERROR_REF(error));
1688 }
1689
retry_initiate_ping_locked(void * tp,grpc_error * error)1690 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1691 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1692 t->ping_state.is_delayed_ping_timer_set = false;
1693 if (error == GRPC_ERROR_NONE) {
1694 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1695 }
1696 GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1697 }
1698
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1699 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1700 grpc_chttp2_ping_queue* pq = &t->ping_queue;
1701 if (pq->inflight_id != id) {
1702 gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1703 t->peer_string.c_str(), id);
1704 return;
1705 }
1706 grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1707 &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1708 if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1709 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1710 }
1711 }
1712
send_goaway(grpc_chttp2_transport * t,grpc_error * error)1713 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1714 // We want to log this irrespective of whether http tracing is enabled
1715 gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string.c_str(),
1716 grpc_error_string(error));
1717 t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1718 grpc_http2_error_code http_error;
1719 grpc_slice slice;
1720 grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1721 &http_error, nullptr);
1722 grpc_chttp2_goaway_append(t->last_new_stream_id,
1723 static_cast<uint32_t>(http_error),
1724 grpc_slice_ref_internal(slice), &t->qbuf);
1725 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1726 GRPC_ERROR_UNREF(error);
1727 }
1728
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1729 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1730 if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1731 t->ping_policy.max_ping_strikes != 0) {
1732 send_goaway(t,
1733 grpc_error_set_int(
1734 GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1735 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1736 // The transport will be closed after the write is done
1737 close_transport_locked(
1738 t, grpc_error_set_int(
1739 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1740 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1741 }
1742 }
1743
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1744 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1745 if (!t->is_client) {
1746 t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
1747 t->ping_recv_state.ping_strikes = 0;
1748 }
1749 t->ping_state.pings_before_data_required =
1750 t->ping_policy.max_pings_without_data;
1751 }
1752
perform_transport_op_locked(void * stream_op,grpc_error *)1753 static void perform_transport_op_locked(void* stream_op,
1754 grpc_error* /*error_ignored*/) {
1755 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1756 grpc_chttp2_transport* t =
1757 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1758
1759 if (op->goaway_error) {
1760 send_goaway(t, op->goaway_error);
1761 }
1762
1763 if (op->set_accept_stream) {
1764 t->accept_stream_cb = op->set_accept_stream_fn;
1765 t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1766 }
1767
1768 if (op->bind_pollset) {
1769 grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1770 }
1771
1772 if (op->bind_pollset_set) {
1773 grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1774 }
1775
1776 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1777 send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1778 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1779 }
1780
1781 if (op->start_connectivity_watch != nullptr) {
1782 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1783 std::move(op->start_connectivity_watch));
1784 }
1785 if (op->stop_connectivity_watch != nullptr) {
1786 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1787 }
1788
1789 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1790 close_transport_locked(t, op->disconnect_with_error);
1791 }
1792
1793 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1794
1795 GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1796 }
1797
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1798 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1799 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1800 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1801 gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1802 grpc_transport_op_string(op).c_str());
1803 }
1804 op->handler_private.extra_arg = gt;
1805 GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1806 t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1807 perform_transport_op_locked, op, nullptr),
1808 GRPC_ERROR_NONE);
1809 }
1810
1811 //
1812 // INPUT PROCESSING - GENERAL
1813 //
1814
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport *,grpc_chttp2_stream * s)1815 void grpc_chttp2_maybe_complete_recv_initial_metadata(
1816 grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) {
1817 if (s->recv_initial_metadata_ready != nullptr &&
1818 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1819 if (s->seen_error) {
1820 grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1821 if (!s->pending_byte_stream) {
1822 grpc_slice_buffer_reset_and_unref_internal(
1823 &s->unprocessed_incoming_frames_buffer);
1824 }
1825 }
1826 grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1827 s->recv_initial_metadata);
1828 null_then_sched_closure(&s->recv_initial_metadata_ready);
1829 }
1830 }
1831
// Tries to complete a pending recv_message op on stream `s`.
//
// Does nothing unless s->recv_message_ready is set. Otherwise *s->recv_message
// is cleared and, when no byte stream is pending, buffered incoming data is
// deframed (after stream decompression when a non-identity decompression
// method is in use) until a message is produced, an error occurs, or both
// incoming buffers are empty.
//
// recv_message_ready is scheduled either when a message was produced without
// error, or when trailing metadata (published_metadata[1]) is no longer
// NOT_PUBLISHED, in which case *s->recv_message is nullptr. In any other case
// the op stays pending.
void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
                                             grpc_chttp2_stream* s) {
  grpc_error* error = GRPC_ERROR_NONE;
  if (s->recv_message_ready != nullptr) {
    *s->recv_message = nullptr;
    // Once trailing metadata has been requested and an error was seen,
    // buffered incoming data is discarded rather than delivered.
    if (s->final_metadata_requested && s->seen_error) {
      grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
      if (!s->pending_byte_stream) {
        grpc_slice_buffer_reset_and_unref_internal(
            &s->unprocessed_incoming_frames_buffer);
      }
    }
    if (!s->pending_byte_stream) {
      while (s->unprocessed_incoming_frames_buffer.length > 0 ||
             s->frame_storage.length > 0) {
        // Working buffer drained: swap in frame_storage. The swapped-in data
        // is marked as not yet decompressed.
        if (s->unprocessed_incoming_frames_buffer.length == 0) {
          grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
                                 &s->frame_storage);
          s->unprocessed_incoming_frames_decompressed = false;
        }
        if (!s->unprocessed_incoming_frames_decompressed &&
            s->stream_decompression_method !=
                GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
          GPR_ASSERT(s->decompressed_data_buffer.length == 0);
          bool end_of_context;
          // The decompression context is created lazily on first use.
          if (!s->stream_decompression_ctx) {
            s->stream_decompression_ctx =
                grpc_stream_compression_context_create(
                    s->stream_decompression_method);
          }
          // NOTE(review): the size argument looks like a cap on the number of
          // output bytes (the part of the message header not yet decompressed)
          // - confirm against grpc_stream_decompress.
          if (!grpc_stream_decompress(
                  s->stream_decompression_ctx,
                  &s->unprocessed_incoming_frames_buffer,
                  &s->decompressed_data_buffer, nullptr,
                  GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
                  &end_of_context)) {
            grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
            grpc_slice_buffer_reset_and_unref_internal(
                &s->unprocessed_incoming_frames_buffer);
            error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "Stream decompression error.");
          } else {
            // Track how many bytes have been decompressed so far; the counter
            // wraps back to 0 once it reaches GRPC_HEADER_SIZE_IN_BYTES.
            s->decompressed_header_bytes += s->decompressed_data_buffer.length;
            if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
              s->decompressed_header_bytes = 0;
            }
            error = grpc_deframe_unprocessed_incoming_frames(
                &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
                s->recv_message);
            if (end_of_context) {
              grpc_stream_compression_context_destroy(
                  s->stream_decompression_ctx);
              s->stream_decompression_ctx = nullptr;
            }
          }
        } else {
          // Identity method or already-decompressed data: deframe directly
          // from the unprocessed buffer.
          error = grpc_deframe_unprocessed_incoming_frames(
              &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
              nullptr, s->recv_message);
        }
        if (error != GRPC_ERROR_NONE) {
          // On any failure the stream is flagged and all buffered incoming
          // data is dropped.
          s->seen_error = true;
          grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
          grpc_slice_buffer_reset_and_unref_internal(
              &s->unprocessed_incoming_frames_buffer);
          break;
        } else if (*s->recv_message != nullptr) {
          // A message is available; stop consuming input.
          break;
        }
      }
    }
    // save the length of the buffer before handing control back to application
    // threads. Needed to support correct flow control bookkeeping
    s->unprocessed_incoming_frames_buffer_cached_length =
        s->unprocessed_incoming_frames_buffer.length;
    if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
      null_then_sched_closure(&s->recv_message_ready);
    } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
      // Trailing metadata is already available: complete the op with no
      // message.
      *s->recv_message = nullptr;
      null_then_sched_closure(&s->recv_message_ready);
    }
    GRPC_ERROR_UNREF(error);
  }
}
1916
// Tries to complete a pending recv_trailing_metadata op on stream `s`.
//
// Any pending recv_message is given a chance to complete first. Then, provided
// a recv_trailing_metadata_finished closure is pending and both halves of the
// stream are closed, leftover data in frame_storage is pulled into the
// unprocessed buffer (decompressing if needed). Trailing metadata is published
// and the closure scheduled only when no incoming data remains pending;
// otherwise the op stays pending.
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
                                                       grpc_chttp2_stream* s) {
  grpc_chttp2_maybe_complete_recv_message(t, s);
  if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
      s->write_closed) {
    // After an error, or on a server transport, remaining buffered incoming
    // data is discarded.
    if (s->seen_error || !t->is_client) {
      grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
      if (!s->pending_byte_stream) {
        grpc_slice_buffer_reset_and_unref_internal(
            &s->unprocessed_incoming_frames_buffer);
      }
    }
    // Data is still pending while a byte stream is outstanding or the
    // unprocessed buffer is non-empty.
    bool pending_data = s->pending_byte_stream ||
                        s->unprocessed_incoming_frames_buffer.length > 0;
    if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
        !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
      // Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
      // maybe decompress the next 5 bytes in the stream.
      if (s->stream_decompression_method ==
          GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
        // No decompression: move at most GRPC_HEADER_SIZE_IN_BYTES bytes over
        // to the unprocessed buffer.
        grpc_slice_buffer_move_first(
            &s->frame_storage,
            GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
            &s->unprocessed_incoming_frames_buffer);
        if (s->unprocessed_incoming_frames_buffer.length > 0) {
          s->unprocessed_incoming_frames_decompressed = true;
          pending_data = true;
        }
      } else {
        bool end_of_context;
        // The decompression context is created lazily on first use.
        if (!s->stream_decompression_ctx) {
          s->stream_decompression_ctx = grpc_stream_compression_context_create(
              s->stream_decompression_method);
        }
        if (!grpc_stream_decompress(
                s->stream_decompression_ctx, &s->frame_storage,
                &s->unprocessed_incoming_frames_buffer, nullptr,
                GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
          // Decompression failed: drop all buffered data and flag the stream.
          grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
          grpc_slice_buffer_reset_and_unref_internal(
              &s->unprocessed_incoming_frames_buffer);
          s->seen_error = true;
        } else {
          if (s->unprocessed_incoming_frames_buffer.length > 0) {
            s->unprocessed_incoming_frames_decompressed = true;
            pending_data = true;
          }
          if (end_of_context) {
            grpc_stream_compression_context_destroy(
                s->stream_decompression_ctx);
            s->stream_decompression_ctx = nullptr;
          }
        }
      }
    }
    // Nothing left to deliver: hand over stats, publish trailing metadata and
    // schedule the completion closure.
    if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
        s->recv_trailing_metadata_finished != nullptr) {
      grpc_transport_move_stats(&s->stats, s->collecting_stats);
      s->collecting_stats = nullptr;
      grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
                                                   s->recv_trailing_metadata);
      null_then_sched_closure(&s->recv_trailing_metadata_finished);
    }
  }
}
1982
// Removes the stream registered under `id` from t->stream_map and detaches it
// from the transport's bookkeeping:
//  - if it is the stream currently being parsed, parsing switches to the skip
//    parser;
//  - a pending byte stream is failed via PublishError when s->on_next is set,
//    otherwise `error` is recorded in s->byte_stream_error;
//  - when the stream map becomes empty the benign reclaimer is posted and, if
//    a GOAWAY has already been sent, the transport is closed;
//  - the stream is removed from the writable and stalled lists.
// `error` is unreffed before returning, after which more streams may be
// started.
static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
                          grpc_error* error) {
  grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
      grpc_chttp2_stream_map_delete(&t->stream_map, id));
  GPR_DEBUG_ASSERT(s);
  if (t->incoming_stream == s) {
    t->incoming_stream = nullptr;
    grpc_chttp2_parsing_become_skip_parser(t);
  }
  if (s->pending_byte_stream) {
    if (s->on_next != nullptr) {
      grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
      // A removal without an error still has to fail the byte stream, so an
      // error is synthesized here.
      if (error == GRPC_ERROR_NONE) {
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
      }
      bs->PublishError(error);
      bs->Unref();
      s->data_parser.parsing_frame = nullptr;
    } else {
      // Replace any previously stored byte stream error with this one.
      GRPC_ERROR_UNREF(s->byte_stream_error);
      s->byte_stream_error = GRPC_ERROR_REF(error);
    }
  }

  if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
    post_benign_reclaimer(t);
    if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
      close_transport_locked(
          t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                 "Last stream closed after sending GOAWAY", &error, 1));
    }
  }
  // The writable list holds a stream ref; release it if the stream was listed.
  if (grpc_chttp2_list_remove_writable_stream(t, s)) {
    GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
  }
  grpc_chttp2_list_remove_stalled_by_stream(t, s);
  grpc_chttp2_list_remove_stalled_by_transport(t, s);

  GRPC_ERROR_UNREF(error);

  maybe_start_some_streams(t);
}
2025
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * due_to_error)2026 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2027 grpc_error* due_to_error) {
2028 if (!t->is_client && !s->sent_trailing_metadata &&
2029 grpc_error_has_clear_grpc_status(due_to_error)) {
2030 close_from_api(t, s, due_to_error);
2031 return;
2032 }
2033
2034 if (!s->read_closed || !s->write_closed) {
2035 if (s->id != 0) {
2036 grpc_http2_error_code http_error;
2037 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2038 &http_error, nullptr);
2039 grpc_chttp2_add_rst_stream_to_next_write(
2040 t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2041 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2042 }
2043 }
2044 if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2045 s->seen_error = true;
2046 }
2047 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2048 }
2049
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2050 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2051 grpc_error* error) {
2052 grpc_status_code status;
2053 grpc_slice slice;
2054 grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2055 if (status != GRPC_STATUS_OK) {
2056 s->seen_error = true;
2057 }
2058 // stream_global->recv_trailing_metadata_finished gives us a
2059 // last chance replacement: we've received trailing metadata,
2060 // but something more important has become available to signal
2061 // to the upper layers - drop what we've got, and then publish
2062 // what we want - which is safe because we haven't told anyone
2063 // about the metadata yet
2064 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2065 s->recv_trailing_metadata_finished != nullptr) {
2066 char status_string[GPR_LTOA_MIN_BUFSIZE];
2067 gpr_ltoa(status, status_string);
2068 GRPC_LOG_IF_ERROR("add_status",
2069 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2070 &s->metadata_buffer[1],
2071 grpc_mdelem_from_slices(
2072 GRPC_MDSTR_GRPC_STATUS,
2073 grpc_core::UnmanagedMemorySlice(status_string))));
2074 if (!GRPC_SLICE_IS_EMPTY(slice)) {
2075 GRPC_LOG_IF_ERROR(
2076 "add_status_message",
2077 grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2078 &s->metadata_buffer[1],
2079 grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2080 }
2081 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2082 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2083 }
2084
2085 GRPC_ERROR_UNREF(error);
2086 }
2087
add_error(grpc_error * error,grpc_error ** refs,size_t * nrefs)2088 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2089 if (error == GRPC_ERROR_NONE) return;
2090 for (size_t i = 0; i < *nrefs; i++) {
2091 if (error == refs[i]) {
2092 return;
2093 }
2094 }
2095 refs[*nrefs] = error;
2096 ++*nrefs;
2097 }
2098
removal_error(grpc_error * extra_error,grpc_chttp2_stream * s,const char * master_error_msg)2099 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2100 const char* master_error_msg) {
2101 grpc_error* refs[3];
2102 size_t nrefs = 0;
2103 add_error(s->read_closed_error, refs, &nrefs);
2104 add_error(s->write_closed_error, refs, &nrefs);
2105 add_error(extra_error, refs, &nrefs);
2106 grpc_error* error = GRPC_ERROR_NONE;
2107 if (nrefs > 0) {
2108 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2109 refs, nrefs);
2110 }
2111 GRPC_ERROR_UNREF(extra_error);
2112 return error;
2113 }
2114
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error * error)2115 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2116 grpc_chttp2_write_cb** list, grpc_error* error) {
2117 while (*list) {
2118 grpc_chttp2_write_cb* cb = *list;
2119 *list = cb->next;
2120 grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2121 "on_write_finished_cb");
2122 cb->next = t->write_cb_pool;
2123 t->write_cb_pool = cb;
2124 }
2125 GRPC_ERROR_UNREF(error);
2126 }
2127
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2128 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2129 grpc_chttp2_stream* s, grpc_error* error) {
2130 error =
2131 removal_error(error, s, "Pending writes failed due to stream closure");
2132 s->send_initial_metadata = nullptr;
2133 grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2134 GRPC_ERROR_REF(error),
2135 "send_initial_metadata_finished");
2136
2137 s->send_trailing_metadata = nullptr;
2138 s->sent_trailing_metadata_op = nullptr;
2139 grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2140 GRPC_ERROR_REF(error),
2141 "send_trailing_metadata_finished");
2142
2143 s->fetching_send_message.reset();
2144 grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2145 GRPC_ERROR_REF(error),
2146 "fetching_send_message_finished");
2147 flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2148 flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2149 }
2150
// Closes the read half and/or the write half of stream `s` because of `error`.
//
// close_reads / close_writes: non-zero to close the respective half. Halves
// that are already closed are left untouched.
//
// If the stream is already fully closed on entry, only the status is faked
// (when there is an error to report) and completion of recv_trailing_metadata
// is attempted. Otherwise, closing writes fails all pending writes; once both
// halves are closed the stream is removed from the transport (or from the
// waiting-for-concurrency list if it has no id yet) and the "chttp2" stream
// ref is dropped.
//
// One ref on `error` is consumed on every path.
void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
                                    grpc_chttp2_stream* s, int close_reads,
                                    int close_writes, grpc_error* error) {
  if (s->read_closed && s->write_closed) {
    // already closed, but we should still fake the status if needed.
    // removal_error consumes `error` here, so no unref is needed on this path.
    grpc_error* overall_error = removal_error(error, s, "Stream removed");
    if (overall_error != GRPC_ERROR_NONE) {
      grpc_chttp2_fake_status(t, s, overall_error);
    }
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
    return;
  }
  // closed_read: the read half was closed by this call.
  // became_closed: this call made the stream fully closed.
  bool closed_read = false;
  bool became_closed = false;
  if (close_reads && !s->read_closed) {
    s->read_closed_error = GRPC_ERROR_REF(error);
    s->read_closed = true;
    closed_read = true;
  }
  if (close_writes && !s->write_closed) {
    s->write_closed_error = GRPC_ERROR_REF(error);
    s->write_closed = true;
    grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
  }
  if (s->read_closed && s->write_closed) {
    became_closed = true;
    grpc_error* overall_error =
        removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
    if (s->id != 0) {
      remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
    } else {
      // Purge streams waiting on concurrency still waiting for id assignment
      grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
    }
    if (overall_error != GRPC_ERROR_NONE) {
      grpc_chttp2_fake_status(t, s, overall_error);
    }
  }
  if (closed_read) {
    // Metadata that was never published is marked as published-at-close so
    // that pending receive ops get a chance to complete.
    for (int i = 0; i < 2; i++) {
      if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
        s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
      }
    }
    grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
    grpc_chttp2_maybe_complete_recv_message(t, s);
  }
  if (became_closed) {
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
    GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
  }
  GRPC_ERROR_UNREF(error);
}
2204
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2205 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2206 grpc_error* error) {
2207 grpc_slice hdr;
2208 grpc_slice status_hdr;
2209 grpc_slice http_status_hdr;
2210 grpc_slice content_type_hdr;
2211 grpc_slice message_pfx;
2212 uint8_t* p;
2213 uint32_t len = 0;
2214 grpc_status_code grpc_status;
2215 grpc_slice slice;
2216 grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2217 nullptr);
2218
2219 GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2220
2221 // Hand roll a header block.
2222 // This is unnecessarily ugly - at some point we should find a more
2223 // elegant solution.
2224 // It's complicated by the fact that our send machinery would be dead by
2225 // the time we got around to sending this, so instead we ignore HPACK
2226 // compression and just write the uncompressed bytes onto the wire.
2227 if (!s->sent_initial_metadata) {
2228 http_status_hdr = GRPC_SLICE_MALLOC(13);
2229 p = GRPC_SLICE_START_PTR(http_status_hdr);
2230 *p++ = 0x00;
2231 *p++ = 7;
2232 *p++ = ':';
2233 *p++ = 's';
2234 *p++ = 't';
2235 *p++ = 'a';
2236 *p++ = 't';
2237 *p++ = 'u';
2238 *p++ = 's';
2239 *p++ = 3;
2240 *p++ = '2';
2241 *p++ = '0';
2242 *p++ = '0';
2243 GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2244 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2245
2246 content_type_hdr = GRPC_SLICE_MALLOC(31);
2247 p = GRPC_SLICE_START_PTR(content_type_hdr);
2248 *p++ = 0x00;
2249 *p++ = 12;
2250 *p++ = 'c';
2251 *p++ = 'o';
2252 *p++ = 'n';
2253 *p++ = 't';
2254 *p++ = 'e';
2255 *p++ = 'n';
2256 *p++ = 't';
2257 *p++ = '-';
2258 *p++ = 't';
2259 *p++ = 'y';
2260 *p++ = 'p';
2261 *p++ = 'e';
2262 *p++ = 16;
2263 *p++ = 'a';
2264 *p++ = 'p';
2265 *p++ = 'p';
2266 *p++ = 'l';
2267 *p++ = 'i';
2268 *p++ = 'c';
2269 *p++ = 'a';
2270 *p++ = 't';
2271 *p++ = 'i';
2272 *p++ = 'o';
2273 *p++ = 'n';
2274 *p++ = '/';
2275 *p++ = 'g';
2276 *p++ = 'r';
2277 *p++ = 'p';
2278 *p++ = 'c';
2279 GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2280 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2281 }
2282
2283 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2284 p = GRPC_SLICE_START_PTR(status_hdr);
2285 *p++ = 0x00; /* literal header, not indexed */
2286 *p++ = 11; /* len(grpc-status) */
2287 *p++ = 'g';
2288 *p++ = 'r';
2289 *p++ = 'p';
2290 *p++ = 'c';
2291 *p++ = '-';
2292 *p++ = 's';
2293 *p++ = 't';
2294 *p++ = 'a';
2295 *p++ = 't';
2296 *p++ = 'u';
2297 *p++ = 's';
2298 if (grpc_status < 10) {
2299 *p++ = 1;
2300 *p++ = static_cast<uint8_t>('0' + grpc_status);
2301 } else {
2302 *p++ = 2;
2303 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2304 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2305 }
2306 GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2307 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2308
2309 size_t msg_len = GRPC_SLICE_LENGTH(slice);
2310 GPR_ASSERT(msg_len <= UINT32_MAX);
2311 uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2312 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2313 p = GRPC_SLICE_START_PTR(message_pfx);
2314 *p++ = 0x00; /* literal header, not indexed */
2315 *p++ = 12; /* len(grpc-message) */
2316 *p++ = 'g';
2317 *p++ = 'r';
2318 *p++ = 'p';
2319 *p++ = 'c';
2320 *p++ = '-';
2321 *p++ = 'm';
2322 *p++ = 'e';
2323 *p++ = 's';
2324 *p++ = 's';
2325 *p++ = 'a';
2326 *p++ = 'g';
2327 *p++ = 'e';
2328 GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2329 p += msg_len_len;
2330 GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2331 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2332 len += static_cast<uint32_t>(msg_len);
2333
2334 hdr = GRPC_SLICE_MALLOC(9);
2335 p = GRPC_SLICE_START_PTR(hdr);
2336 *p++ = static_cast<uint8_t>(len >> 16);
2337 *p++ = static_cast<uint8_t>(len >> 8);
2338 *p++ = static_cast<uint8_t>(len);
2339 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2340 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2341 *p++ = static_cast<uint8_t>(s->id >> 24);
2342 *p++ = static_cast<uint8_t>(s->id >> 16);
2343 *p++ = static_cast<uint8_t>(s->id >> 8);
2344 *p++ = static_cast<uint8_t>(s->id);
2345 GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2346
2347 grpc_slice_buffer_add(&t->qbuf, hdr);
2348 if (!s->sent_initial_metadata) {
2349 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2350 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2351 }
2352 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2353 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2354 grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2355 grpc_chttp2_reset_ping_clock(t);
2356 grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2357 &s->stats.outgoing);
2358
2359 grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2360 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2361 }
2362
2363 struct cancel_stream_cb_args {
2364 grpc_error* error;
2365 grpc_chttp2_transport* t;
2366 };
2367
cancel_stream_cb(void * user_data,uint32_t,void * stream)2368 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2369 cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2370 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2371 grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2372 }
2373
end_all_the_calls(grpc_chttp2_transport * t,grpc_error * error)2374 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2375 intptr_t http2_error;
2376 // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2377 if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2378 !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2379 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2380 GRPC_STATUS_UNAVAILABLE);
2381 }
2382 cancel_stream_cb_args args = {error, t};
2383 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2384 GRPC_ERROR_UNREF(error);
2385 }
2386
2387 //
2388 // INPUT PROCESSING - PARSING
2389 //
2390
2391 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2392 static void WithUrgency(grpc_chttp2_transport* t,
2393 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2394 grpc_chttp2_initiate_write_reason reason, F action) {
2395 switch (urgency) {
2396 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2397 break;
2398 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2399 grpc_chttp2_initiate_write(t, reason);
2400 // fallthrough
2401 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2402 action();
2403 break;
2404 }
2405 }
2406
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2407 void grpc_chttp2_act_on_flowctl_action(
2408 const grpc_core::chttp2::FlowControlAction& action,
2409 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2410 WithUrgency(t, action.send_stream_update(),
2411 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2412 [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2413 WithUrgency(t, action.send_transport_update(),
2414 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2415 WithUrgency(t, action.send_initial_window_update(),
2416 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2417 queue_setting_update(t,
2418 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2419 action.initial_window_size());
2420 });
2421 WithUrgency(t, action.send_max_frame_size_update(),
2422 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2423 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2424 action.max_frame_size());
2425 });
2426 }
2427
try_http_parsing(grpc_chttp2_transport * t)2428 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2429 grpc_http_parser parser;
2430 size_t i = 0;
2431 grpc_error* error = GRPC_ERROR_NONE;
2432 grpc_http_response response;
2433
2434 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2435
2436 grpc_error* parse_error = GRPC_ERROR_NONE;
2437 for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2438 parse_error =
2439 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2440 }
2441 if (parse_error == GRPC_ERROR_NONE &&
2442 (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2443 error = grpc_error_set_int(
2444 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2445 "Trying to connect an http1.x server"),
2446 GRPC_ERROR_INT_HTTP_STATUS, response.status),
2447 GRPC_ERROR_INT_GRPC_STATUS,
2448 grpc_http2_status_to_grpc_status(response.status));
2449 }
2450 GRPC_ERROR_UNREF(parse_error);
2451
2452 grpc_http_parser_destroy(&parser);
2453 grpc_http_response_destroy(&response);
2454 return error;
2455 }
2456
read_action(void * tp,grpc_error * error)2457 static void read_action(void* tp, grpc_error* error) {
2458 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2459 t->combiner->Run(
2460 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2461 GRPC_ERROR_REF(error));
2462 }
2463
read_action_locked(void * tp,grpc_error * error)2464 static void read_action_locked(void* tp, grpc_error* error) {
2465 GPR_TIMER_SCOPE("reading_action_locked", 0);
2466
2467 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2468
2469 GRPC_ERROR_REF(error);
2470
2471 grpc_error* err = error;
2472 if (err != GRPC_ERROR_NONE) {
2473 err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2474 "Endpoint read failed", &err, 1),
2475 GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2476 t->write_state);
2477 }
2478 GPR_SWAP(grpc_error*, err, error);
2479 GRPC_ERROR_UNREF(err);
2480 if (t->closed_with_error == GRPC_ERROR_NONE) {
2481 GPR_TIMER_SCOPE("reading_action.parse", 0);
2482 size_t i = 0;
2483 grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2484 GRPC_ERROR_NONE};
2485 for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2486 errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2487 }
2488 if (errors[1] != GRPC_ERROR_NONE) {
2489 errors[2] = try_http_parsing(t);
2490 GRPC_ERROR_UNREF(error);
2491 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2492 "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2493 }
2494 for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2495 GRPC_ERROR_UNREF(errors[i]);
2496 }
2497
2498 GPR_TIMER_SCOPE("post_parse_locked", 0);
2499 if (t->initial_window_update != 0) {
2500 if (t->initial_window_update > 0) {
2501 grpc_chttp2_stream* s;
2502 while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2503 grpc_chttp2_mark_stream_writable(t, s);
2504 grpc_chttp2_initiate_write(
2505 t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2506 }
2507 }
2508 t->initial_window_update = 0;
2509 }
2510 }
2511
2512 GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2513 bool keep_reading = false;
2514 if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2515 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2516 "Transport closed", &t->closed_with_error, 1);
2517 }
2518 if (error != GRPC_ERROR_NONE) {
2519 // If a goaway frame was received, this might be the reason why the read
2520 // failed. Add this info to the error
2521 if (t->goaway_error != GRPC_ERROR_NONE) {
2522 error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2523 }
2524
2525 close_transport_locked(t, GRPC_ERROR_REF(error));
2526 t->endpoint_reading = 0;
2527 } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2528 keep_reading = true;
2529 // Since we have read a byte, reset the keepalive timer
2530 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2531 grpc_timer_cancel(&t->keepalive_ping_timer);
2532 }
2533 }
2534 grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2535
2536 if (keep_reading) {
2537 if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2538 t->reading_paused_on_pending_induced_frames = true;
2539 GRPC_CHTTP2_IF_TRACING(
2540 gpr_log(GPR_INFO,
2541 "transport %p : Pausing reading due to too "
2542 "many unwritten SETTINGS ACK and RST_STREAM frames",
2543 t));
2544 } else {
2545 continue_read_action_locked(t);
2546 }
2547 } else {
2548 GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2549 }
2550
2551 GRPC_ERROR_UNREF(error);
2552 }
2553
continue_read_action_locked(grpc_chttp2_transport * t)2554 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2555 const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2556 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2557 grpc_schedule_on_exec_ctx);
2558 grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2559 grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2560 }
2561
2562 // t is reffed prior to calling the first time, and once the callback chain
2563 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2564 void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2565 t->flow_control->bdp_estimator()->SchedulePing();
2566 send_ping_locked(
2567 t,
2568 GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2569 grpc_schedule_on_exec_ctx),
2570 GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2571 grpc_schedule_on_exec_ctx));
2572 // TODO(yashykt): Enabling this causes internal b/168345569. Re-enable once
2573 // fixed.
2574 // grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2575 }
2576
start_bdp_ping(void * tp,grpc_error * error)2577 static void start_bdp_ping(void* tp, grpc_error* error) {
2578 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2579 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2580 start_bdp_ping_locked, t, nullptr),
2581 GRPC_ERROR_REF(error));
2582 }
2583
start_bdp_ping_locked(void * tp,grpc_error * error)2584 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2585 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2586 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2587 gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string.c_str(),
2588 grpc_error_string(error));
2589 }
2590 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2591 return;
2592 }
2593 // Reset the keepalive ping timer
2594 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2595 grpc_timer_cancel(&t->keepalive_ping_timer);
2596 }
2597 t->flow_control->bdp_estimator()->StartPing();
2598 t->bdp_ping_started = true;
2599 }
2600
finish_bdp_ping(void * tp,grpc_error * error)2601 static void finish_bdp_ping(void* tp, grpc_error* error) {
2602 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2603 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2604 finish_bdp_ping_locked, t, nullptr),
2605 GRPC_ERROR_REF(error));
2606 }
2607
finish_bdp_ping_locked(void * tp,grpc_error * error)2608 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2609 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2610 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2611 gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string.c_str(),
2612 grpc_error_string(error));
2613 }
2614 if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2615 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2616 return;
2617 }
2618 if (!t->bdp_ping_started) {
2619 // start_bdp_ping_locked has not been run yet. Schedule
2620 // finish_bdp_ping_locked to be run later.
2621 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2622 finish_bdp_ping_locked, t, nullptr),
2623 GRPC_ERROR_REF(error));
2624 return;
2625 }
2626 t->bdp_ping_started = false;
2627 grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2628 grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2629 nullptr);
2630 GPR_ASSERT(!t->have_next_bdp_ping_timer);
2631 t->have_next_bdp_ping_timer = true;
2632 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2633 next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
2634 grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2635 &t->next_bdp_ping_timer_expired_locked);
2636 }
2637
next_bdp_ping_timer_expired(void * tp,grpc_error * error)2638 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) {
2639 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2640 t->combiner->Run(
2641 GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2642 next_bdp_ping_timer_expired_locked, t, nullptr),
2643 GRPC_ERROR_REF(error));
2644 }
2645
next_bdp_ping_timer_expired_locked(void * tp,grpc_error * error)2646 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2647 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2648 GPR_ASSERT(t->have_next_bdp_ping_timer);
2649 t->have_next_bdp_ping_timer = false;
2650 if (error != GRPC_ERROR_NONE) {
2651 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2652 return;
2653 }
2654 if (t->flow_control->bdp_estimator()->accumulator() == 0) {
2655 // Block the bdp ping till we receive more data.
2656 t->bdp_ping_blocked = true;
2657 GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2658 } else {
2659 schedule_bdp_ping_locked(t);
2660 }
2661 }
2662
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2663 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2664 bool is_client) {
2665 size_t i;
2666 if (args) {
2667 for (i = 0; i < args->num_args; i++) {
2668 if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2669 const int value = grpc_channel_arg_get_integer(
2670 &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2671 : g_default_server_keepalive_time_ms,
2672 1, INT_MAX});
2673 if (is_client) {
2674 g_default_client_keepalive_time_ms = value;
2675 } else {
2676 g_default_server_keepalive_time_ms = value;
2677 }
2678 } else if (0 ==
2679 strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2680 const int value = grpc_channel_arg_get_integer(
2681 &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2682 : g_default_server_keepalive_timeout_ms,
2683 0, INT_MAX});
2684 if (is_client) {
2685 g_default_client_keepalive_timeout_ms = value;
2686 } else {
2687 g_default_server_keepalive_timeout_ms = value;
2688 }
2689 } else if (0 == strcmp(args->args[i].key,
2690 GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2691 const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2692 &args->args[i],
2693 {is_client ? g_default_client_keepalive_permit_without_calls
2694 : g_default_server_keepalive_timeout_ms,
2695 0, 1}));
2696 if (is_client) {
2697 g_default_client_keepalive_permit_without_calls = value;
2698 } else {
2699 g_default_server_keepalive_permit_without_calls = value;
2700 }
2701 } else if (0 ==
2702 strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2703 g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2704 &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2705 } else if (0 == strcmp(args->args[i].key,
2706 GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2707 g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2708 &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2709 } else if (0 ==
2710 strcmp(
2711 args->args[i].key,
2712 GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2713 g_default_min_recv_ping_interval_without_data_ms =
2714 grpc_channel_arg_get_integer(
2715 &args->args[i],
2716 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2717 }
2718 }
2719 }
2720 }
2721
init_keepalive_ping(void * arg,grpc_error * error)2722 static void init_keepalive_ping(void* arg, grpc_error* error) {
2723 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2724 t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2725 init_keepalive_ping_locked, t, nullptr),
2726 GRPC_ERROR_REF(error));
2727 }
2728
init_keepalive_ping_locked(void * arg,grpc_error * error)2729 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2730 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2731 GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2732 if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2733 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2734 } else if (error == GRPC_ERROR_NONE) {
2735 if (t->keepalive_permit_without_calls ||
2736 grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2737 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2738 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2739 grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2740 send_keepalive_ping_locked(t);
2741 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2742 } else {
2743 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2744 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2745 grpc_schedule_on_exec_ctx);
2746 grpc_timer_init(&t->keepalive_ping_timer,
2747 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2748 &t->init_keepalive_ping_locked);
2749 }
2750 } else if (error == GRPC_ERROR_CANCELLED) {
2751 // The keepalive ping timer may be cancelled by bdp
2752 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2753 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2754 grpc_schedule_on_exec_ctx);
2755 grpc_timer_init(&t->keepalive_ping_timer,
2756 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2757 &t->init_keepalive_ping_locked);
2758 }
2759 GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2760 }
2761
start_keepalive_ping(void * arg,grpc_error * error)2762 static void start_keepalive_ping(void* arg, grpc_error* error) {
2763 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2764 t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2765 start_keepalive_ping_locked, t, nullptr),
2766 GRPC_ERROR_REF(error));
2767 }
2768
start_keepalive_ping_locked(void * arg,grpc_error * error)2769 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2770 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2771 if (error != GRPC_ERROR_NONE) {
2772 return;
2773 }
2774 if (t->channelz_socket != nullptr) {
2775 t->channelz_socket->RecordKeepaliveSent();
2776 }
2777 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2778 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2779 gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str());
2780 }
2781 GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2782 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2783 keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
2784 grpc_timer_init(&t->keepalive_watchdog_timer,
2785 grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2786 &t->keepalive_watchdog_fired_locked);
2787 t->keepalive_ping_started = true;
2788 }
2789
finish_keepalive_ping(void * arg,grpc_error * error)2790 static void finish_keepalive_ping(void* arg, grpc_error* error) {
2791 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2792 t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2793 finish_keepalive_ping_locked, t, nullptr),
2794 GRPC_ERROR_REF(error));
2795 }
2796
finish_keepalive_ping_locked(void * arg,grpc_error * error)2797 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2798 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2799 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2800 if (error == GRPC_ERROR_NONE) {
2801 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2802 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2803 gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string.c_str());
2804 }
2805 if (!t->keepalive_ping_started) {
2806 // start_keepalive_ping_locked has not run yet. Reschedule
2807 // finish_keepalive_ping_locked for it to be run later.
2808 t->combiner->Run(
2809 GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2810 finish_keepalive_ping_locked, t, nullptr),
2811 GRPC_ERROR_REF(error));
2812 return;
2813 }
2814 t->keepalive_ping_started = false;
2815 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2816 grpc_timer_cancel(&t->keepalive_watchdog_timer);
2817 GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2818 GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2819 grpc_schedule_on_exec_ctx);
2820 grpc_timer_init(&t->keepalive_ping_timer,
2821 grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2822 &t->init_keepalive_ping_locked);
2823 }
2824 }
2825 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2826 }
2827
keepalive_watchdog_fired(void * arg,grpc_error * error)2828 static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
2829 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2830 t->combiner->Run(
2831 GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2832 keepalive_watchdog_fired_locked, t, nullptr),
2833 GRPC_ERROR_REF(error));
2834 }
2835
keepalive_watchdog_fired_locked(void * arg,grpc_error * error)2836 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2837 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2838 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2839 if (error == GRPC_ERROR_NONE) {
2840 gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2841 t->peer_string.c_str());
2842 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2843 close_transport_locked(
2844 t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2845 "keepalive watchdog timeout"),
2846 GRPC_ERROR_INT_GRPC_STATUS,
2847 GRPC_STATUS_UNAVAILABLE));
2848 }
2849 } else {
2850 // The watchdog timer should have been cancelled by
2851 // finish_keepalive_ping_locked.
2852 if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2853 gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2854 t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2855 }
2856 }
2857 GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2858 }
2859
2860 //
2861 // CALLBACK LOOP
2862 //
2863
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)2864 static void connectivity_state_set(grpc_chttp2_transport* t,
2865 grpc_connectivity_state state,
2866 const absl::Status& status,
2867 const char* reason) {
2868 GRPC_CHTTP2_IF_TRACING(
2869 gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2870 t->state_tracker.SetState(state, status, reason);
2871 }
2872
2873 //
2874 // POLLSET STUFF
2875 //
2876
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2877 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2878 grpc_pollset* pollset) {
2879 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2880 grpc_endpoint_add_to_pollset(t->ep, pollset);
2881 }
2882
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2883 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2884 grpc_pollset_set* pollset_set) {
2885 grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2886 grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2887 }
2888
2889 //
2890 // BYTE STREAM
2891 //
2892
reset_byte_stream(void * arg,grpc_error * error)2893 static void reset_byte_stream(void* arg, grpc_error* error) {
2894 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2895 s->pending_byte_stream = false;
2896 if (error == GRPC_ERROR_NONE) {
2897 grpc_chttp2_maybe_complete_recv_message(s->t, s);
2898 grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2899 } else {
2900 GPR_ASSERT(error != GRPC_ERROR_NONE);
2901 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
2902 s->on_next = nullptr;
2903 GRPC_ERROR_UNREF(s->byte_stream_error);
2904 s->byte_stream_error = GRPC_ERROR_NONE;
2905 grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2906 s->byte_stream_error = GRPC_ERROR_REF(error);
2907 }
2908 }
2909
2910 namespace grpc_core {
2911
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2912 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2913 grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2914 uint32_t frame_size, uint32_t flags)
2915 : ByteStream(frame_size, flags),
2916 transport_(transport),
2917 stream_(stream),
2918 refs_(2),
2919 remaining_bytes_(frame_size) {
2920 GRPC_ERROR_UNREF(stream->byte_stream_error);
2921 stream->byte_stream_error = GRPC_ERROR_NONE;
2922 }
2923
OrphanLocked(void * arg,grpc_error *)2924 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2925 grpc_error* /*error_ignored*/) {
2926 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2927 grpc_chttp2_stream* s = bs->stream_;
2928 grpc_chttp2_transport* t = s->t;
2929 bs->Unref();
2930 s->pending_byte_stream = false;
2931 grpc_chttp2_maybe_complete_recv_message(t, s);
2932 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2933 }
2934
Orphan()2935 void Chttp2IncomingByteStream::Orphan() {
2936 GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2937 transport_->combiner->Run(
2938 GRPC_CLOSURE_INIT(&destroy_action_,
2939 &Chttp2IncomingByteStream::OrphanLocked, this, nullptr),
2940 GRPC_ERROR_NONE);
2941 }
2942
NextLocked(void * arg,grpc_error *)2943 void Chttp2IncomingByteStream::NextLocked(void* arg,
2944 grpc_error* /*error_ignored*/) {
2945 Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2946 grpc_chttp2_transport* t = bs->transport_;
2947 grpc_chttp2_stream* s = bs->stream_;
2948 size_t cur_length = s->frame_storage.length;
2949 if (!s->read_closed) {
2950 s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2951 cur_length);
2952 grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2953 }
2954 GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
2955 if (s->frame_storage.length > 0) {
2956 grpc_slice_buffer_swap(&s->frame_storage,
2957 &s->unprocessed_incoming_frames_buffer);
2958 s->unprocessed_incoming_frames_decompressed = false;
2959 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
2960 GRPC_ERROR_NONE);
2961 } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
2962 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
2963 GRPC_ERROR_REF(s->byte_stream_error));
2964 if (s->data_parser.parsing_frame != nullptr) {
2965 s->data_parser.parsing_frame->Unref();
2966 s->data_parser.parsing_frame = nullptr;
2967 }
2968 } else if (s->read_closed) {
2969 if (bs->remaining_bytes_ != 0) {
2970 s->byte_stream_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2971 "Truncated message", &s->read_closed_error, 1);
2972 grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
2973 GRPC_ERROR_REF(s->byte_stream_error));
2974 if (s->data_parser.parsing_frame != nullptr) {
2975 s->data_parser.parsing_frame->Unref();
2976 s->data_parser.parsing_frame = nullptr;
2977 }
2978 } else {
2979 // Should never reach here.
2980 GPR_ASSERT(false);
2981 }
2982 } else {
2983 s->on_next = bs->next_action_.on_complete;
2984 }
2985 bs->Unref();
2986 }
2987
Next(size_t max_size_hint,grpc_closure * on_complete)2988 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
2989 grpc_closure* on_complete) {
2990 GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
2991 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2992 return true;
2993 } else {
2994 Ref();
2995 next_action_.max_size_hint = max_size_hint;
2996 next_action_.on_complete = on_complete;
2997 transport_->combiner->Run(
2998 GRPC_CLOSURE_INIT(&next_action_.closure,
2999 &Chttp2IncomingByteStream::NextLocked, this, nullptr),
3000 GRPC_ERROR_NONE);
3001 return false;
3002 }
3003 }
3004
MaybeCreateStreamDecompressionCtx()3005 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3006 GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3007 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3008 if (!stream_->stream_decompression_ctx) {
3009 stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3010 stream_->stream_decompression_method);
3011 }
3012 }
3013
Pull(grpc_slice * slice)3014 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3015 GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3016 grpc_error* error;
3017 if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3018 if (!stream_->unprocessed_incoming_frames_decompressed &&
3019 stream_->stream_decompression_method !=
3020 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3021 bool end_of_context;
3022 MaybeCreateStreamDecompressionCtx();
3023 if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3024 &stream_->unprocessed_incoming_frames_buffer,
3025 &stream_->decompressed_data_buffer, nullptr,
3026 MAX_SIZE_T, &end_of_context)) {
3027 error =
3028 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
3029 return error;
3030 }
3031 GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3032 grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3033 &stream_->decompressed_data_buffer);
3034 stream_->unprocessed_incoming_frames_decompressed = true;
3035 if (end_of_context) {
3036 grpc_stream_compression_context_destroy(
3037 stream_->stream_decompression_ctx);
3038 stream_->stream_decompression_ctx = nullptr;
3039 }
3040 if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3041 *slice = grpc_empty_slice();
3042 }
3043 }
3044 error = grpc_deframe_unprocessed_incoming_frames(
3045 &stream_->data_parser, stream_,
3046 &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3047 if (error != GRPC_ERROR_NONE) {
3048 return error;
3049 }
3050 } else {
3051 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3052 stream_->t->combiner->Run(&stream_->reset_byte_stream,
3053 GRPC_ERROR_REF(error));
3054 return error;
3055 }
3056 return GRPC_ERROR_NONE;
3057 }
3058
PublishError(grpc_error * error)3059 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
3060 GPR_ASSERT(error != GRPC_ERROR_NONE);
3061 grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
3062 GRPC_ERROR_REF(error));
3063 stream_->on_next = nullptr;
3064 GRPC_ERROR_UNREF(stream_->byte_stream_error);
3065 stream_->byte_stream_error = GRPC_ERROR_REF(error);
3066 grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
3067 }
3068
Push(const grpc_slice & slice,grpc_slice * slice_out)3069 grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3070 grpc_slice* slice_out) {
3071 if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3072 grpc_error* error =
3073 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3074 transport_->combiner->Run(&stream_->reset_byte_stream,
3075 GRPC_ERROR_REF(error));
3076 grpc_slice_unref_internal(slice);
3077 return error;
3078 } else {
3079 remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3080 if (slice_out != nullptr) {
3081 *slice_out = slice;
3082 }
3083 return GRPC_ERROR_NONE;
3084 }
3085 }
3086
Finished(grpc_error * error,bool reset_on_error)3087 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
3088 bool reset_on_error) {
3089 if (error == GRPC_ERROR_NONE) {
3090 if (remaining_bytes_ != 0) {
3091 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3092 }
3093 }
3094 if (error != GRPC_ERROR_NONE && reset_on_error) {
3095 transport_->combiner->Run(&stream_->reset_byte_stream,
3096 GRPC_ERROR_REF(error));
3097 }
3098 Unref();
3099 return error;
3100 }
3101
Shutdown(grpc_error * error)3102 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
3103 GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3104 }
3105
3106 } // namespace grpc_core
3107
3108 //
3109 // RESOURCE QUOTAS
3110 //
3111
post_benign_reclaimer(grpc_chttp2_transport * t)3112 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3113 if (!t->benign_reclaimer_registered) {
3114 t->benign_reclaimer_registered = true;
3115 GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3116 GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t,
3117 grpc_schedule_on_exec_ctx);
3118 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3119 false, &t->benign_reclaimer_locked);
3120 }
3121 }
3122
post_destructive_reclaimer(grpc_chttp2_transport * t)3123 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3124 if (!t->destructive_reclaimer_registered) {
3125 t->destructive_reclaimer_registered = true;
3126 GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3127 GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer,
3128 t, grpc_schedule_on_exec_ctx);
3129 grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3130 true, &t->destructive_reclaimer_locked);
3131 }
3132 }
3133
benign_reclaimer(void * arg,grpc_error * error)3134 static void benign_reclaimer(void* arg, grpc_error* error) {
3135 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3136 t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3137 benign_reclaimer_locked, t, nullptr),
3138 GRPC_ERROR_REF(error));
3139 }
3140
benign_reclaimer_locked(void * arg,grpc_error * error)3141 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3142 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3143 if (error == GRPC_ERROR_NONE &&
3144 grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3145 // Channel with no active streams: send a goaway to try and make it
3146 // disconnect cleanly
3147 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3148 gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3149 t->peer_string.c_str());
3150 }
3151 send_goaway(t,
3152 grpc_error_set_int(
3153 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3154 GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3155 } else if (error == GRPC_ERROR_NONE &&
3156 GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3157 gpr_log(GPR_INFO,
3158 "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3159 " streams",
3160 t->peer_string.c_str(),
3161 grpc_chttp2_stream_map_size(&t->stream_map));
3162 }
3163 t->benign_reclaimer_registered = false;
3164 if (error != GRPC_ERROR_CANCELLED) {
3165 grpc_resource_user_finish_reclamation(
3166 grpc_endpoint_get_resource_user(t->ep));
3167 }
3168 GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3169 }
3170
destructive_reclaimer(void * arg,grpc_error * error)3171 static void destructive_reclaimer(void* arg, grpc_error* error) {
3172 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3173 t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3174 destructive_reclaimer_locked, t, nullptr),
3175 GRPC_ERROR_REF(error));
3176 }
3177
destructive_reclaimer_locked(void * arg,grpc_error * error)3178 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3179 grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3180 size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3181 t->destructive_reclaimer_registered = false;
3182 if (error == GRPC_ERROR_NONE && n > 0) {
3183 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3184 grpc_chttp2_stream_map_rand(&t->stream_map));
3185 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3186 gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3187 t->peer_string.c_str(), s->id);
3188 }
3189 grpc_chttp2_cancel_stream(
3190 t, s,
3191 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3192 GRPC_ERROR_INT_HTTP2_ERROR,
3193 GRPC_HTTP2_ENHANCE_YOUR_CALM));
3194 if (n > 1) {
3195 // Since we cancel one stream per destructive reclamation, if
3196 // there are more streams left, we can immediately post a new
3197 // reclaimer in case the resource quota needs to free more
3198 // memory
3199 post_destructive_reclaimer(t);
3200 }
3201 }
3202 if (error != GRPC_ERROR_CANCELLED) {
3203 grpc_resource_user_finish_reclamation(
3204 grpc_endpoint_get_resource_user(t->ep));
3205 }
3206 GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3207 }
3208
3209 //
3210 // MONITORING
3211 //
3212
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3213 const char* grpc_chttp2_initiate_write_reason_string(
3214 grpc_chttp2_initiate_write_reason reason) {
3215 switch (reason) {
3216 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3217 return "INITIAL_WRITE";
3218 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3219 return "START_NEW_STREAM";
3220 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3221 return "SEND_MESSAGE";
3222 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3223 return "SEND_INITIAL_METADATA";
3224 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3225 return "SEND_TRAILING_METADATA";
3226 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3227 return "RETRY_SEND_PING";
3228 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3229 return "CONTINUE_PINGS";
3230 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3231 return "GOAWAY_SENT";
3232 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3233 return "RST_STREAM";
3234 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3235 return "CLOSE_FROM_API";
3236 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3237 return "STREAM_FLOW_CONTROL";
3238 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3239 return "TRANSPORT_FLOW_CONTROL";
3240 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3241 return "SEND_SETTINGS";
3242 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3243 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3244 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3245 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3246 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3247 return "APPLICATION_PING";
3248 case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3249 return "BDP_PING";
3250 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3251 return "KEEPALIVE_PING";
3252 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3253 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3254 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3255 return "PING_RESPONSE";
3256 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3257 return "FORCE_RST_STREAM";
3258 }
3259 GPR_UNREACHABLE_CODE(return "unknown");
3260 }
3261
chttp2_get_endpoint(grpc_transport * t)3262 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3263 return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3264 }
3265
3266 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3267 "chttp2",
3268 init_stream,
3269 set_pollset,
3270 set_pollset_set,
3271 perform_stream_op,
3272 perform_transport_op,
3273 destroy_stream,
3274 destroy_transport,
3275 chttp2_get_endpoint};
3276
get_vtable(void)3277 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3278
3279 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3280 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3281 grpc_chttp2_transport* t =
3282 reinterpret_cast<grpc_chttp2_transport*>(transport);
3283 return t->channelz_socket;
3284 }
3285
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)3286 grpc_transport* grpc_create_chttp2_transport(
3287 const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3288 grpc_resource_user* resource_user) {
3289 auto t =
3290 new grpc_chttp2_transport(channel_args, ep, is_client, resource_user);
3291 return &t->base;
3292 }
3293
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings)3294 void grpc_chttp2_transport_start_reading(
3295 grpc_transport* transport, grpc_slice_buffer* read_buffer,
3296 grpc_closure* notify_on_receive_settings) {
3297 grpc_chttp2_transport* t =
3298 reinterpret_cast<grpc_chttp2_transport*>(transport);
3299 GRPC_CHTTP2_REF_TRANSPORT(
3300 t, "reading_action"); /* matches unref inside reading_action */
3301 if (read_buffer != nullptr) {
3302 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3303 gpr_free(read_buffer);
3304 }
3305 t->notify_on_receive_settings = notify_on_receive_settings;
3306 t->combiner->Run(
3307 GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
3308 GRPC_ERROR_NONE);
3309 }
3310