• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20 
21 #include "absl/strings/str_format.h"
22 
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/string_util.h>
28 #include <inttypes.h>
29 #include <limits.h>
30 #include <math.h>
31 #include <stdio.h>
32 #include <string.h>
33 
34 #include "src/core/ext/transport/chttp2/transport/context_list.h"
35 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
36 #include "src/core/ext/transport/chttp2/transport/internal.h"
37 #include "src/core/ext/transport/chttp2/transport/varint.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/compression/stream_compression.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/env.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/memory.h"
44 #include "src/core/lib/http/parser.h"
45 #include "src/core/lib/iomgr/executor.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/profiling/timers.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52 #include "src/core/lib/transport/http2_errors.h"
53 #include "src/core/lib/transport/static_metadata.h"
54 #include "src/core/lib/transport/status_conversion.h"
55 #include "src/core/lib/transport/timeout_encoding.h"
56 #include "src/core/lib/transport/transport.h"
57 #include "src/core/lib/transport/transport_impl.h"
58 #include "src/core/lib/uri/uri_parser.h"
59 
60 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
61 #define MAX_WINDOW 0x7fffffffu
62 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
63 #define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
64 
65 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
66 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
67 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000  /* 2 hours */
68 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
69 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
70 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
71 
72 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
73 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
74 #define DEFAULT_MAX_PING_STRIKES 2
75 
76 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
77 
78 static int g_default_client_keepalive_time_ms =
79     DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
80 static int g_default_client_keepalive_timeout_ms =
81     DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
82 static int g_default_server_keepalive_time_ms =
83     DEFAULT_SERVER_KEEPALIVE_TIME_MS;
84 static int g_default_server_keepalive_timeout_ms =
85     DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
86 static bool g_default_client_keepalive_permit_without_calls =
87     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
88 static bool g_default_server_keepalive_permit_without_calls =
89     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
90 
91 static int g_default_min_recv_ping_interval_without_data_ms =
92     DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
93 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
94 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
95 
96 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
97 grpc_core::TraceFlag grpc_http_trace(false, "http");
98 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
99 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
100                                                          "chttp2_refcount");
101 
102 // forward declarations of various callbacks that we'll build closures around
103 static void write_action_begin_locked(void* t, grpc_error_handle error);
104 static void write_action(void* t, grpc_error_handle error);
105 static void write_action_end(void* t, grpc_error_handle error);
106 static void write_action_end_locked(void* t, grpc_error_handle error);
107 
108 static void read_action(void* t, grpc_error_handle error);
109 static void read_action_locked(void* t, grpc_error_handle error);
110 static void continue_read_action_locked(grpc_chttp2_transport* t);
111 
112 static void complete_fetch(void* gs, grpc_error_handle error);
113 static void complete_fetch_locked(void* gs, grpc_error_handle error);
114 // Set a transport level setting, and push it to our peer
115 static void queue_setting_update(grpc_chttp2_transport* t,
116                                  grpc_chttp2_setting_id id, uint32_t value);
117 
118 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
119                            grpc_error_handle error);
120 
121 // Start new streams that have been created if we can
122 static void maybe_start_some_streams(grpc_chttp2_transport* t);
123 
124 static void connectivity_state_set(grpc_chttp2_transport* t,
125                                    grpc_connectivity_state state,
126                                    const absl::Status& status,
127                                    const char* reason);
128 
129 static void benign_reclaimer(void* arg, grpc_error_handle error);
130 static void destructive_reclaimer(void* arg, grpc_error_handle error);
131 static void benign_reclaimer_locked(void* arg, grpc_error_handle error);
132 static void destructive_reclaimer_locked(void* arg, grpc_error_handle error);
133 
134 static void post_benign_reclaimer(grpc_chttp2_transport* t);
135 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
136 
137 static void close_transport_locked(grpc_chttp2_transport* t,
138                                    grpc_error_handle error);
139 static void end_all_the_calls(grpc_chttp2_transport* t,
140                               grpc_error_handle error);
141 
142 static void start_bdp_ping(void* tp, grpc_error_handle error);
143 static void finish_bdp_ping(void* tp, grpc_error_handle error);
144 static void start_bdp_ping_locked(void* tp, grpc_error_handle error);
145 static void finish_bdp_ping_locked(void* tp, grpc_error_handle error);
146 static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error);
147 static void next_bdp_ping_timer_expired_locked(void* tp,
148                                                grpc_error_handle error);
149 
150 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
151 static void send_ping_locked(grpc_chttp2_transport* t,
152                              grpc_closure* on_initiate, grpc_closure* on_ack);
153 static void retry_initiate_ping_locked(void* tp, grpc_error_handle error);
154 
155 // keepalive-relevant functions
156 static void init_keepalive_ping(void* arg, grpc_error_handle error);
157 static void init_keepalive_ping_locked(void* arg, grpc_error_handle error);
158 static void start_keepalive_ping(void* arg, grpc_error_handle error);
159 static void finish_keepalive_ping(void* arg, grpc_error_handle error);
160 static void start_keepalive_ping_locked(void* arg, grpc_error_handle error);
161 static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error);
162 static void keepalive_watchdog_fired(void* arg, grpc_error_handle error);
163 static void keepalive_watchdog_fired_locked(void* arg, grpc_error_handle error);
164 
165 static void reset_byte_stream(void* arg, grpc_error_handle error);
166 
167 // Flow control default enabled. Can be disabled by setting
168 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
169 bool g_flow_control_enabled = true;
170 
171 namespace grpc_core {
172 
173 namespace {
174 TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
175 TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
176     nullptr;
177 }  // namespace
178 
TestOnlySetGlobalHttp2TransportInitCallback(TestOnlyGlobalHttp2TransportInitCallback callback)179 void TestOnlySetGlobalHttp2TransportInitCallback(
180     TestOnlyGlobalHttp2TransportInitCallback callback) {
181   test_only_init_callback = callback;
182 }
183 
TestOnlySetGlobalHttp2TransportDestructCallback(TestOnlyGlobalHttp2TransportDestructCallback callback)184 void TestOnlySetGlobalHttp2TransportDestructCallback(
185     TestOnlyGlobalHttp2TransportDestructCallback callback) {
186   test_only_destruct_callback = callback;
187 }
188 
189 }  // namespace grpc_core
190 
191 //
192 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
193 //
194 
~grpc_chttp2_transport()195 grpc_chttp2_transport::~grpc_chttp2_transport() {
196   size_t i;
197 
198   if (channelz_socket != nullptr) {
199     channelz_socket.reset();
200   }
201 
202   grpc_endpoint_destroy(ep);
203 
204   grpc_slice_buffer_destroy_internal(&qbuf);
205 
206   grpc_slice_buffer_destroy_internal(&outbuf);
207   grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
208 
209   grpc_error_handle error =
210       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
211   // ContextList::Execute follows semantics of a callback function and does not
212   // take a ref on error
213   grpc_core::ContextList::Execute(cl, nullptr, error);
214   GRPC_ERROR_UNREF(error);
215   cl = nullptr;
216 
217   grpc_slice_buffer_destroy_internal(&read_buffer);
218   grpc_chttp2_hpack_parser_destroy(&hpack_parser);
219   grpc_chttp2_goaway_parser_destroy(&goaway_parser);
220 
221   for (i = 0; i < STREAM_LIST_COUNT; i++) {
222     GPR_ASSERT(lists[i].head == nullptr);
223     GPR_ASSERT(lists[i].tail == nullptr);
224   }
225 
226   GRPC_ERROR_UNREF(goaway_error);
227 
228   GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
229 
230   grpc_chttp2_stream_map_destroy(&stream_map);
231 
232   GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
233 
234   cancel_pings(this,
235                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
236 
237   while (write_cb_pool) {
238     grpc_chttp2_write_cb* next = write_cb_pool->next;
239     gpr_free(write_cb_pool);
240     write_cb_pool = next;
241   }
242 
243   flow_control.Destroy();
244 
245   GRPC_ERROR_UNREF(closed_with_error);
246   gpr_free(ping_acks);
247   if (grpc_core::test_only_destruct_callback != nullptr) {
248     grpc_core::test_only_destruct_callback();
249   }
250 }
251 
252 static const grpc_transport_vtable* get_vtable(void);
253 
254 // Returns whether bdp is enabled
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)255 static bool read_channel_args(grpc_chttp2_transport* t,
256                               const grpc_channel_args* channel_args,
257                               bool is_client) {
258   bool enable_bdp = true;
259   bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
260   size_t i;
261   int j;
262 
263   for (i = 0; i < channel_args->num_args; i++) {
264     if (0 == strcmp(channel_args->args[i].key,
265                     GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
266       const grpc_integer_options options = {-1, 0, INT_MAX};
267       const int value =
268           grpc_channel_arg_get_integer(&channel_args->args[i], options);
269       if (value >= 0) {
270         if ((t->next_stream_id & 1) != (value & 1)) {
271           gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
272                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
273                   is_client ? "client" : "server");
274         } else {
275           t->next_stream_id = static_cast<uint32_t>(value);
276         }
277       }
278     } else if (0 == strcmp(channel_args->args[i].key,
279                            GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
280       const grpc_integer_options options = {-1, 0, INT_MAX};
281       const int value =
282           grpc_channel_arg_get_integer(&channel_args->args[i], options);
283       if (value >= 0) {
284         grpc_chttp2_hpack_compressor_set_max_usable_size(
285             &t->hpack_compressor, static_cast<uint32_t>(value));
286       }
287     } else if (0 == strcmp(channel_args->args[i].key,
288                            GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
289       t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
290           &channel_args->args[i],
291           {g_default_max_pings_without_data, 0, INT_MAX});
292     } else if (0 == strcmp(channel_args->args[i].key,
293                            GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
294       t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
295           &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
296     } else if (0 ==
297                strcmp(channel_args->args[i].key,
298                       GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
299       t->ping_policy.min_recv_ping_interval_without_data =
300           grpc_channel_arg_get_integer(
301               &channel_args->args[i],
302               grpc_integer_options{
303                   g_default_min_recv_ping_interval_without_data_ms, 0,
304                   INT_MAX});
305     } else if (0 == strcmp(channel_args->args[i].key,
306                            GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
307       t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
308           &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
309     } else if (0 ==
310                strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
311       enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
312     } else if (0 ==
313                strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
314       const int value = grpc_channel_arg_get_integer(
315           &channel_args->args[i],
316           grpc_integer_options{t->is_client
317                                    ? g_default_client_keepalive_time_ms
318                                    : g_default_server_keepalive_time_ms,
319                                1, INT_MAX});
320       t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
321     } else if (0 == strcmp(channel_args->args[i].key,
322                            GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
323       const int value = grpc_channel_arg_get_integer(
324           &channel_args->args[i],
325           grpc_integer_options{t->is_client
326                                    ? g_default_client_keepalive_timeout_ms
327                                    : g_default_server_keepalive_timeout_ms,
328                                0, INT_MAX});
329       t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
330     } else if (0 == strcmp(channel_args->args[i].key,
331                            GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
332       t->keepalive_permit_without_calls = static_cast<uint32_t>(
333           grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
334     } else if (0 == strcmp(channel_args->args[i].key,
335                            GRPC_ARG_OPTIMIZATION_TARGET)) {
336       gpr_log(GPR_INFO, "GRPC_ARG_OPTIMIZATION_TARGET is deprecated");
337     } else if (0 ==
338                strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
339       channelz_enabled = grpc_channel_arg_get_bool(
340           &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
341     } else {
342       static const struct {
343         const char* channel_arg_name;
344         grpc_chttp2_setting_id setting_id;
345         grpc_integer_options integer_options;
346         bool availability[2] /* server, client */;
347       } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
348                            GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
349                            {-1, 0, INT32_MAX},
350                            {true, false}},
351                           {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
352                            GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
353                            {-1, 0, INT32_MAX},
354                            {true, true}},
355                           {GRPC_ARG_MAX_METADATA_SIZE,
356                            GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
357                            {-1, 0, INT32_MAX},
358                            {true, true}},
359                           {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
360                            GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
361                            {-1, 16384, 16777215},
362                            {true, true}},
363                           {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
364                            GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
365                            {1, 0, 1},
366                            {true, true}},
367                           {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
368                            GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
369                            {-1, 5, INT32_MAX},
370                            {true, true}}};
371       for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
372         if (0 == strcmp(channel_args->args[i].key,
373                         settings_map[j].channel_arg_name)) {
374           if (!settings_map[j].availability[is_client]) {
375             gpr_log(GPR_DEBUG, "%s is not available on %s",
376                     settings_map[j].channel_arg_name,
377                     is_client ? "clients" : "servers");
378           } else {
379             int value = grpc_channel_arg_get_integer(
380                 &channel_args->args[i], settings_map[j].integer_options);
381             if (value >= 0) {
382               queue_setting_update(t, settings_map[j].setting_id,
383                                    static_cast<uint32_t>(value));
384             }
385           }
386           break;
387         }
388       }
389     }
390   }
391   if (channelz_enabled) {
392     t->channelz_socket =
393         grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
394             std::string(grpc_endpoint_get_local_address(t->ep)), t->peer_string,
395             absl::StrFormat("%s %s", get_vtable()->name, t->peer_string),
396             grpc_core::channelz::SocketNode::Security::GetFromChannelArgs(
397                 channel_args));
398   }
399   return enable_bdp;
400 }
401 
init_transport_keepalive_settings(grpc_chttp2_transport * t)402 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
403   if (t->is_client) {
404     t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
405                             ? GRPC_MILLIS_INF_FUTURE
406                             : g_default_client_keepalive_time_ms;
407     t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
408                                ? GRPC_MILLIS_INF_FUTURE
409                                : g_default_client_keepalive_timeout_ms;
410     t->keepalive_permit_without_calls =
411         g_default_client_keepalive_permit_without_calls;
412   } else {
413     t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
414                             ? GRPC_MILLIS_INF_FUTURE
415                             : g_default_server_keepalive_time_ms;
416     t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
417                                ? GRPC_MILLIS_INF_FUTURE
418                                : g_default_server_keepalive_timeout_ms;
419     t->keepalive_permit_without_calls =
420         g_default_server_keepalive_permit_without_calls;
421   }
422 }
423 
configure_transport_ping_policy(grpc_chttp2_transport * t)424 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
425   t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
426   t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
427   t->ping_policy.min_recv_ping_interval_without_data =
428       g_default_min_recv_ping_interval_without_data_ms;
429 }
430 
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)431 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
432   if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
433     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
434     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
435     GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
436                       grpc_schedule_on_exec_ctx);
437     grpc_timer_init(&t->keepalive_ping_timer,
438                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
439                     &t->init_keepalive_ping_locked);
440   } else {
441     // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
442     //   inflight keeaplive timers
443     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
444   }
445 }
446 
grpc_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)447 grpc_chttp2_transport::grpc_chttp2_transport(
448     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
449     grpc_resource_user* resource_user)
450     : refs(1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
451                   ? "chttp2_refcount"
452                   : nullptr),
453       ep(ep),
454       peer_string(grpc_endpoint_get_peer(ep)),
455       resource_user(resource_user),
456       combiner(grpc_combiner_create()),
457       state_tracker(is_client ? "client_transport" : "server_transport",
458                     GRPC_CHANNEL_READY),
459       is_client(is_client),
460       next_stream_id(is_client ? 1 : 2),
461       deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
462   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
463              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
464   base.vtable = get_vtable();
465   // 8 is a random stab in the dark as to a good initial size: it's small enough
466   //   that it shouldn't waste memory for infrequently used connections, yet
467   //   large enough that the exponential growth should happen nicely when it's
468   //   needed.
469   //   TODO(ctiller): tune this
470   grpc_chttp2_stream_map_init(&stream_map, 8);
471 
472   grpc_slice_buffer_init(&read_buffer);
473   grpc_slice_buffer_init(&outbuf);
474   if (is_client) {
475     grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
476                                        GRPC_CHTTP2_CLIENT_CONNECT_STRING));
477   }
478   grpc_chttp2_hpack_compressor_init(&hpack_compressor);
479   grpc_slice_buffer_init(&qbuf);
480   // copy in initial settings to all setting sets
481   size_t i;
482   int j;
483   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
484     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
485       settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
486     }
487   }
488   grpc_chttp2_hpack_parser_init(&hpack_parser);
489   grpc_chttp2_goaway_parser_init(&goaway_parser);
490 
491   // configure http2 the way we like it
492   if (is_client) {
493     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
494     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
495   }
496   queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
497                        DEFAULT_MAX_HEADER_LIST_SIZE);
498   queue_setting_update(this,
499                        GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
500 
501   configure_transport_ping_policy(this);
502   init_transport_keepalive_settings(this);
503 
504   bool enable_bdp = true;
505   if (channel_args) {
506     enable_bdp = read_channel_args(this, channel_args, is_client);
507   }
508 
509   if (g_flow_control_enabled) {
510     flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
511                                                                enable_bdp);
512   } else {
513     flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
514     enable_bdp = false;
515   }
516 
517   // No pings allowed before receiving a header or data frame.
518   ping_state.pings_before_data_required = 0;
519   ping_state.is_delayed_ping_timer_set = false;
520   ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
521 
522   ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
523   ping_recv_state.ping_strikes = 0;
524 
525   init_keepalive_pings_if_enabled(this);
526 
527   if (enable_bdp) {
528     bdp_ping_blocked = true;
529     grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
530                                       nullptr);
531   }
532 
533   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
534   post_benign_reclaimer(this);
535   if (grpc_core::test_only_init_callback != nullptr) {
536     grpc_core::test_only_init_callback();
537   }
538 }
539 
destroy_transport_locked(void * tp,grpc_error_handle)540 static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
541   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
542   t->destroying = 1;
543   close_transport_locked(
544       t, grpc_error_set_int(
545              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
546              GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
547   // Must be the last line.
548   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
549 }
550 
destroy_transport(grpc_transport * gt)551 static void destroy_transport(grpc_transport* gt) {
552   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
553   t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
554                    GRPC_ERROR_NONE);
555 }
556 
close_transport_locked(grpc_chttp2_transport * t,grpc_error_handle error)557 static void close_transport_locked(grpc_chttp2_transport* t,
558                                    grpc_error_handle error) {
559   end_all_the_calls(t, GRPC_ERROR_REF(error));
560   cancel_pings(t, GRPC_ERROR_REF(error));
561   if (t->closed_with_error == GRPC_ERROR_NONE) {
562     if (!grpc_error_has_clear_grpc_status(error)) {
563       error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
564                                  GRPC_STATUS_UNAVAILABLE);
565     }
566     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
567       if (t->close_transport_on_writes_finished == nullptr) {
568         t->close_transport_on_writes_finished =
569             GRPC_ERROR_CREATE_FROM_STATIC_STRING(
570                 "Delayed close due to in-progress write");
571       }
572       t->close_transport_on_writes_finished =
573           grpc_error_add_child(t->close_transport_on_writes_finished, error);
574       return;
575     }
576     GPR_ASSERT(error != GRPC_ERROR_NONE);
577     t->closed_with_error = GRPC_ERROR_REF(error);
578     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
579                            "close_transport");
580     if (t->ping_state.is_delayed_ping_timer_set) {
581       grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
582     }
583     if (t->have_next_bdp_ping_timer) {
584       grpc_timer_cancel(&t->next_bdp_ping_timer);
585     }
586     switch (t->keepalive_state) {
587       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
588         grpc_timer_cancel(&t->keepalive_ping_timer);
589         break;
590       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
591         grpc_timer_cancel(&t->keepalive_ping_timer);
592         grpc_timer_cancel(&t->keepalive_watchdog_timer);
593         break;
594       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
595       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
596         // keepalive timers are not set in these two states
597         break;
598     }
599 
600     // flush writable stream list to avoid dangling references
601     grpc_chttp2_stream* s;
602     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
603       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
604     }
605     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
606     grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
607   }
608   if (t->notify_on_receive_settings != nullptr) {
609     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
610                             GRPC_ERROR_REF(error));
611     t->notify_on_receive_settings = nullptr;
612   }
613   if (t->notify_on_close != nullptr) {
614     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close,
615                             GRPC_ERROR_REF(error));
616     t->notify_on_close = nullptr;
617   }
618   GRPC_ERROR_UNREF(error);
619 }
620 
621 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)622 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
623   grpc_stream_ref(s->refcount, reason);
624 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)625 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
626   grpc_stream_unref(s->refcount, reason);
627 }
628 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)629 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
630   grpc_stream_ref(s->refcount);
631 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)632 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
633   grpc_stream_unref(s->refcount);
634 }
635 #endif
636 
Reffer(grpc_chttp2_stream * s)637 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
638   // We reserve one 'active stream' that's dropped when the stream is
639   //   read-closed. The others are for Chttp2IncomingByteStreams that are
640   //   actively reading
641   GRPC_CHTTP2_STREAM_REF(s, "chttp2");
642   GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
643 }
644 
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)645 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
646                                        grpc_stream_refcount* refcount,
647                                        const void* server_data,
648                                        grpc_core::Arena* arena)
649     : t(t),
650       refcount(refcount),
651       reffer(this),
652       metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
653                       grpc_chttp2_incoming_metadata_buffer(arena)} {
654   if (server_data) {
655     id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
656     *t->accepting_stream = this;
657     grpc_chttp2_stream_map_add(&t->stream_map, id, this);
658     post_destructive_reclaimer(t);
659   }
660   if (t->flow_control->flow_control_enabled()) {
661     flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
662         static_cast<grpc_core::chttp2::TransportFlowControl*>(
663             t->flow_control.get()),
664         this);
665   } else {
666     flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
667   }
668 
669   grpc_slice_buffer_init(&frame_storage);
670   grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
671   grpc_slice_buffer_init(&flow_controlled_buffer);
672   GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr);
673 }
674 
~grpc_chttp2_stream()675 grpc_chttp2_stream::~grpc_chttp2_stream() {
676   if (t->channelz_socket != nullptr) {
677     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
678       t->channelz_socket->RecordStreamSucceeded();
679     } else {
680       t->channelz_socket->RecordStreamFailed();
681     }
682   }
683 
684   GPR_ASSERT((write_closed && read_closed) || id == 0);
685   if (id != 0) {
686     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
687   }
688 
689   grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
690   grpc_slice_buffer_destroy_internal(&frame_storage);
691   if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
692     grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
693   }
694   if (stream_decompression_method !=
695       GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
696     grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
697   }
698 
699   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
700     if (GPR_UNLIKELY(included[i])) {
701       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
702               t->is_client ? "client" : "server", id, i);
703       abort();
704     }
705   }
706 
707   GPR_ASSERT(send_initial_metadata_finished == nullptr);
708   GPR_ASSERT(fetching_send_message == nullptr);
709   GPR_ASSERT(send_trailing_metadata_finished == nullptr);
710   GPR_ASSERT(recv_initial_metadata_ready == nullptr);
711   GPR_ASSERT(recv_message_ready == nullptr);
712   GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
713   grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
714   GRPC_ERROR_UNREF(read_closed_error);
715   GRPC_ERROR_UNREF(write_closed_error);
716   GRPC_ERROR_UNREF(byte_stream_error);
717 
718   flow_control.Destroy();
719 
720   if (t->resource_user != nullptr) {
721     grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
722   }
723 
724   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
725   grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
726 }
727 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)728 static int init_stream(grpc_transport* gt, grpc_stream* gs,
729                        grpc_stream_refcount* refcount, const void* server_data,
730                        grpc_core::Arena* arena) {
731   GPR_TIMER_SCOPE("init_stream", 0);
732   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
733   new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
734   return 0;
735 }
736 
destroy_stream_locked(void * sp,grpc_error_handle)737 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
738   GPR_TIMER_SCOPE("destroy_stream", 0);
739   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
740   s->~grpc_chttp2_stream();
741 }
742 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)743 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
744                            grpc_closure* then_schedule_closure) {
745   GPR_TIMER_SCOPE("destroy_stream", 0);
746   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
747   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
748   if (s->stream_compression_method !=
749           GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
750       s->stream_compression_ctx != nullptr) {
751     grpc_stream_compression_context_destroy(s->stream_compression_ctx);
752     s->stream_compression_ctx = nullptr;
753   }
754   if (s->stream_decompression_method !=
755           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
756       s->stream_decompression_ctx != nullptr) {
757     grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
758     s->stream_decompression_ctx = nullptr;
759   }
760 
761   s->destroy_stream_arg = then_schedule_closure;
762   t->combiner->Run(
763       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
764       GRPC_ERROR_NONE);
765 }
766 
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)767 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
768                                                       uint32_t id) {
769   if (t->accept_stream_cb == nullptr) {
770     return nullptr;
771   }
772   // Don't accept the stream if memory quota doesn't allow. Note that we should
773   // simply refuse the stream here instead of canceling the stream after it's
774   // accepted since the latter will create the call which costs much memory.
775   if (t->resource_user != nullptr &&
776       !grpc_resource_user_safe_alloc(t->resource_user,
777                                      GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
778     gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
779     grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
780                                              nullptr);
781     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
782     return nullptr;
783   }
784   grpc_chttp2_stream* accepting = nullptr;
785   GPR_ASSERT(t->accepting_stream == nullptr);
786   t->accepting_stream = &accepting;
787   t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
788                       reinterpret_cast<void*>(id));
789   t->accepting_stream = nullptr;
790   return accepting;
791 }
792 
793 //
794 // OUTPUT PROCESSING
795 //
796 
write_state_name(grpc_chttp2_write_state st)797 static const char* write_state_name(grpc_chttp2_write_state st) {
798   switch (st) {
799     case GRPC_CHTTP2_WRITE_STATE_IDLE:
800       return "IDLE";
801     case GRPC_CHTTP2_WRITE_STATE_WRITING:
802       return "WRITING";
803     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
804       return "WRITING+MORE";
805   }
806   GPR_UNREACHABLE_CODE(return "UNKNOWN");
807 }
808 
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)809 static void set_write_state(grpc_chttp2_transport* t,
810                             grpc_chttp2_write_state st, const char* reason) {
811   GRPC_CHTTP2_IF_TRACING(
812       gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
813               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
814               write_state_name(t->write_state), write_state_name(st), reason));
815   t->write_state = st;
816   // If the state is being reset back to idle, it means a write was just
817   // finished. Make sure all the run_after_write closures are scheduled.
818   //
819   // This is also our chance to close the transport if the transport was marked
820   // to be closed after all writes finish (for example, if we received a go-away
821   // from peer while we had some pending writes)
822   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
823     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
824     if (t->close_transport_on_writes_finished != nullptr) {
825       grpc_error_handle err = t->close_transport_on_writes_finished;
826       t->close_transport_on_writes_finished = nullptr;
827       close_transport_locked(t, err);
828     }
829   }
830 }
831 
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)832 static void inc_initiate_write_reason(
833     grpc_chttp2_initiate_write_reason reason) {
834   switch (reason) {
835     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
836       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
837       break;
838     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
839       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
840       break;
841     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
842       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
843       break;
844     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
845       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
846       break;
847     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
848       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
849       break;
850     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
851       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
852       break;
853     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
854       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
855       break;
856     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
857       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
858       break;
859     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
860       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
861       break;
862     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
863       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
864       break;
865     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
866       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
867       break;
868     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
869       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
870       break;
871     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
872       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
873       break;
874     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
875       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
876       break;
877     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
878       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
879       break;
880     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
881       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
882       break;
883     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
884       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING();
885       break;
886     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
887       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
888       break;
889     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
890       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
891       break;
892     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
893       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
894       break;
895     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
896       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
897       break;
898   }
899 }
900 
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)901 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
902                                 grpc_chttp2_initiate_write_reason reason) {
903   GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
904 
905   switch (t->write_state) {
906     case GRPC_CHTTP2_WRITE_STATE_IDLE:
907       inc_initiate_write_reason(reason);
908       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
909                       grpc_chttp2_initiate_write_reason_string(reason));
910       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
911       // Note that the 'write_action_begin_locked' closure is being scheduled
912       // on the 'finally_scheduler' of t->combiner. This means that
913       // 'write_action_begin_locked' is called only *after* all the other
914       // closures (some of which are potentially initiating more writes on the
915       // transport) are executed on the t->combiner.
916       //
917       // The reason for scheduling on finally_scheduler is to make sure we batch
918       // as many writes as possible. 'write_action_begin_locked' is the function
919       // that gathers all the relevant bytes (which are at various places in the
920       // grpc_chttp2_transport structure) and append them to 'outbuf' field in
921       // grpc_chttp2_transport thereby batching what would have been potentially
922       // multiple write operations.
923       //
924       // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
925       // It does not call the endpoint to write the bytes. That is done by the
926       // 'write_action' (which is scheduled by 'write_action_begin_locked')
927       t->combiner->FinallyRun(
928           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
929                             write_action_begin_locked, t, nullptr),
930           GRPC_ERROR_NONE);
931       break;
932     case GRPC_CHTTP2_WRITE_STATE_WRITING:
933       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
934                       grpc_chttp2_initiate_write_reason_string(reason));
935       break;
936     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
937       break;
938   }
939 }
940 
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)941 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
942                                       grpc_chttp2_stream* s) {
943   if (t->closed_with_error == GRPC_ERROR_NONE &&
944       grpc_chttp2_list_add_writable_stream(t, s)) {
945     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
946   }
947 }
948 
begin_writing_desc(bool partial)949 static const char* begin_writing_desc(bool partial) {
950   if (partial) {
951     return "begin partial write in background";
952   } else {
953     return "begin write in current thread";
954   }
955 }
956 
write_action_begin_locked(void * gt,grpc_error_handle)957 static void write_action_begin_locked(void* gt,
958                                       grpc_error_handle /*error_ignored*/) {
959   GPR_TIMER_SCOPE("write_action_begin_locked", 0);
960   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
961   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
962   grpc_chttp2_begin_write_result r;
963   if (t->closed_with_error != GRPC_ERROR_NONE) {
964     r.writing = false;
965   } else {
966     r = grpc_chttp2_begin_write(t);
967   }
968   if (r.writing) {
969     if (r.partial) {
970       GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
971     }
972     set_write_state(t,
973                     r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
974                               : GRPC_CHTTP2_WRITE_STATE_WRITING,
975                     begin_writing_desc(r.partial));
976     write_action(t, GRPC_ERROR_NONE);
977     if (t->reading_paused_on_pending_induced_frames) {
978       GPR_ASSERT(t->num_pending_induced_frames == 0);
979       // We had paused reading, because we had many induced frames (SETTINGS
980       // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
981       // been able to flush qbuf, we can resume reading.
982       GRPC_CHTTP2_IF_TRACING(gpr_log(
983           GPR_INFO,
984           "transport %p : Resuming reading after being paused due to too "
985           "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
986           t));
987       t->reading_paused_on_pending_induced_frames = false;
988       continue_read_action_locked(t);
989     }
990   } else {
991     GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
992     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
993     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
994   }
995 }
996 
write_action(void * gt,grpc_error_handle)997 static void write_action(void* gt, grpc_error_handle /*error*/) {
998   GPR_TIMER_SCOPE("write_action", 0);
999   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1000   void* cl = t->cl;
1001   t->cl = nullptr;
1002   grpc_endpoint_write(
1003       t->ep, &t->outbuf,
1004       GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
1005                         grpc_schedule_on_exec_ctx),
1006       cl);
1007 }
1008 
write_action_end(void * tp,grpc_error_handle error)1009 static void write_action_end(void* tp, grpc_error_handle error) {
1010   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1011   t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
1012                                      write_action_end_locked, t, nullptr),
1013                    GRPC_ERROR_REF(error));
1014 }
1015 
1016 // Callback from the grpc_endpoint after bytes have been written by calling
1017 // sendmsg
write_action_end_locked(void * tp,grpc_error_handle error)1018 static void write_action_end_locked(void* tp, grpc_error_handle error) {
1019   GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
1020   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1021 
1022   bool closed = false;
1023   if (error != GRPC_ERROR_NONE) {
1024     close_transport_locked(t, GRPC_ERROR_REF(error));
1025     closed = true;
1026   }
1027 
1028   if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
1029     t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
1030     closed = true;
1031     if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1032       close_transport_locked(
1033           t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
1034     }
1035   }
1036 
1037   switch (t->write_state) {
1038     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1039       GPR_UNREACHABLE_CODE(break);
1040     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1041       GPR_TIMER_MARK("state=writing", 0);
1042       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1043       break;
1044     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1045       GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1046       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1047       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1048       // If the transport is closed, we will retry writing on the endpoint
1049       // and next write may contain part of the currently serialized frames.
1050       // So, we should only call the run_after_write callbacks when the next
1051       // write finishes, or the callbacks will be invoked when the stream is
1052       // closed.
1053       if (!closed) {
1054         grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1055       }
1056       t->combiner->FinallyRun(
1057           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1058                             write_action_begin_locked, t, nullptr),
1059           GRPC_ERROR_NONE);
1060       break;
1061   }
1062 
1063   grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1064   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1065 }
1066 
1067 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1068 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1069 static void queue_setting_update(grpc_chttp2_transport* t,
1070                                  grpc_chttp2_setting_id id, uint32_t value) {
1071   const grpc_chttp2_setting_parameters* sp =
1072       &grpc_chttp2_settings_parameters[id];
1073   uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1074   if (use_value != value) {
1075     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1076             value, use_value);
1077   }
1078   if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1079     t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1080     t->dirtied_local_settings = true;
1081   }
1082 }
1083 
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,const grpc_slice & goaway_text)1084 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1085                                      uint32_t goaway_error,
1086                                      uint32_t last_stream_id,
1087                                      const grpc_slice& goaway_text) {
1088   // Discard the error from a previous goaway frame (if any)
1089   if (t->goaway_error != GRPC_ERROR_NONE) {
1090     GRPC_ERROR_UNREF(t->goaway_error);
1091   }
1092   t->goaway_error = grpc_error_set_str(
1093       grpc_error_set_int(
1094           grpc_error_set_int(
1095               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1096               GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1097           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1098       GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1099 
1100   GRPC_CHTTP2_IF_TRACING(
1101       gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1102               last_stream_id));
1103   // We want to log this irrespective of whether http tracing is enabled if we
1104   // received a GOAWAY with a non NO_ERROR code.
1105   if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1106     gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string.c_str(),
1107             goaway_error, grpc_error_std_string(t->goaway_error).c_str());
1108   }
1109   absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1110   // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1111   // data equal to "too_many_pings", it should log the occurrence at a log level
1112   // that is enabled by default and double the configured KEEPALIVE_TIME used
1113   // for new connections on that channel.
1114   if (GPR_UNLIKELY(t->is_client &&
1115                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1116                    grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1117     gpr_log(GPR_ERROR,
1118             "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1119             "data equal to \"too_many_pings\"");
1120     double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1121     constexpr int max_keepalive_time_ms =
1122         INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1123     t->keepalive_time =
1124         current_keepalive_time_ms > static_cast<double>(max_keepalive_time_ms)
1125             ? GRPC_MILLIS_INF_FUTURE
1126             : static_cast<grpc_millis>(current_keepalive_time_ms *
1127                                        KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1128     status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1129                       absl::Cord(std::to_string(t->keepalive_time)));
1130   }
1131   // lie: use transient failure from the transport to indicate goaway has been
1132   // received.
1133   connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1134                          "got_goaway");
1135 }
1136 
maybe_start_some_streams(grpc_chttp2_transport * t)1137 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1138   grpc_chttp2_stream* s;
1139   // cancel out streams that haven't yet started if we have received a GOAWAY
1140   if (t->goaway_error != GRPC_ERROR_NONE) {
1141     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1142       grpc_chttp2_cancel_stream(
1143           t, s,
1144           grpc_error_set_int(
1145               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1146               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1147     }
1148     return;
1149   }
1150   // start streams where we have free grpc_chttp2_stream ids and free
1151   // * concurrency
1152   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1153          grpc_chttp2_stream_map_size(&t->stream_map) <
1154              t->settings[GRPC_PEER_SETTINGS]
1155                         [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1156          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1157     // safe since we can't (legally) be parsing this stream yet
1158     GRPC_CHTTP2_IF_TRACING(gpr_log(
1159         GPR_INFO,
1160         "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1161         t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1162 
1163     GPR_ASSERT(s->id == 0);
1164     s->id = t->next_stream_id;
1165     t->next_stream_id += 2;
1166 
1167     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1168       connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1169                              absl::Status(absl::StatusCode::kUnavailable,
1170                                           "Transport Stream IDs exhausted"),
1171                              "no_more_stream_ids");
1172     }
1173 
1174     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1175     post_destructive_reclaimer(t);
1176     grpc_chttp2_mark_stream_writable(t, s);
1177     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1178   }
1179   // cancel out streams that will never be started
1180   if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1181     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1182       grpc_chttp2_cancel_stream(
1183           t, s,
1184           grpc_error_set_int(
1185               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1186               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1187     }
1188   }
1189 }
1190 
1191 // Flag that this closure barrier may be covering a write in a pollset, and so
1192 //   we should not complete this closure until we can prove that the write got
1193 //   scheduled
1194 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
1195 // First bit of the reference count, stored in the high order bits (with the low
1196 //   bits being used for flags defined above)
1197 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1198 
add_closure_barrier(grpc_closure * closure)1199 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1200   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1201   return closure;
1202 }
1203 
null_then_sched_closure(grpc_closure ** closure)1204 static void null_then_sched_closure(grpc_closure** closure) {
1205   grpc_closure* c = *closure;
1206   *closure = nullptr;
1207   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
1208 }
1209 
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream *,grpc_closure ** pclosure,grpc_error_handle error,const char * desc)1210 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1211                                        grpc_chttp2_stream* /*s*/,
1212                                        grpc_closure** pclosure,
1213                                        grpc_error_handle error,
1214                                        const char* desc) {
1215   grpc_closure* closure = *pclosure;
1216   *pclosure = nullptr;
1217   if (closure == nullptr) {
1218     GRPC_ERROR_UNREF(error);
1219     return;
1220   }
1221   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1222   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1223     gpr_log(
1224         GPR_INFO,
1225         "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1226         "write_state=%s",
1227         t, closure,
1228         static_cast<int>(closure->next_data.scratch /
1229                          CLOSURE_BARRIER_FIRST_REF_BIT),
1230         static_cast<int>(closure->next_data.scratch %
1231                          CLOSURE_BARRIER_FIRST_REF_BIT),
1232         desc, grpc_error_std_string(error).c_str(),
1233         write_state_name(t->write_state));
1234   }
1235   if (error != GRPC_ERROR_NONE) {
1236     if (closure->error_data.error == GRPC_ERROR_NONE) {
1237       closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1238           "Error in HTTP transport completing operation");
1239       closure->error_data.error = grpc_error_set_str(
1240           closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1241           grpc_slice_from_copied_string(t->peer_string.c_str()));
1242     }
1243     closure->error_data.error =
1244         grpc_error_add_child(closure->error_data.error, error);
1245   }
1246   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1247     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1248         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1249       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1250       // closures earlier than when it is safe to do so.
1251       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
1252                               closure->error_data.error);
1253     } else {
1254       grpc_closure_list_append(&t->run_after_write, closure,
1255                                closure->error_data.error);
1256     }
1257   }
1258 }
1259 
contains_non_ok_status(grpc_metadata_batch * batch)1260 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1261   if (batch->idx.named.grpc_status != nullptr) {
1262     return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1263                                         GRPC_MDELEM_GRPC_STATUS_0);
1264   }
1265   return false;
1266 }
1267 
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1268 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1269                                                   grpc_chttp2_stream* s) {
1270   if (s->id != 0 && (!s->write_buffering ||
1271                      s->flow_controlled_buffer.length > t->write_buffer_size)) {
1272     grpc_chttp2_mark_stream_writable(t, s);
1273     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1274   }
1275 }
1276 
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1277 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1278                                      grpc_chttp2_stream* s) {
1279   s->fetched_send_message_length +=
1280       static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1281   grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1282   maybe_become_writable_due_to_send_msg(t, s);
1283 }
1284 
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1285 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1286                                           grpc_chttp2_stream* s) {
1287   for (;;) {
1288     if (s->fetching_send_message == nullptr) {
1289       // Stream was cancelled before message fetch completed
1290       abort(); /* TODO(ctiller): what cleanup here? */
1291       return;  /* early out */
1292     }
1293     if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1294       int64_t notify_offset = s->next_message_end_offset;
1295       if (notify_offset <= s->flow_controlled_bytes_written) {
1296         grpc_chttp2_complete_closure_step(
1297             t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1298             "fetching_send_message_finished");
1299       } else {
1300         grpc_chttp2_write_cb* cb = t->write_cb_pool;
1301         if (cb == nullptr) {
1302           cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1303         } else {
1304           t->write_cb_pool = cb->next;
1305         }
1306         cb->call_at_byte = notify_offset;
1307         cb->closure = s->fetching_send_message_finished;
1308         s->fetching_send_message_finished = nullptr;
1309         grpc_chttp2_write_cb** list =
1310             s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1311                 ? &s->on_write_finished_cbs
1312                 : &s->on_flow_controlled_cbs;
1313         cb->next = *list;
1314         *list = cb;
1315       }
1316       s->fetching_send_message.reset();
1317       return; /* early out */
1318     } else if (s->fetching_send_message->Next(
1319                    UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1320                                                  ::complete_fetch, s,
1321                                                  grpc_schedule_on_exec_ctx))) {
1322       grpc_error_handle error =
1323           s->fetching_send_message->Pull(&s->fetching_slice);
1324       if (error != GRPC_ERROR_NONE) {
1325         s->fetching_send_message.reset();
1326         grpc_chttp2_cancel_stream(t, s, error);
1327       } else {
1328         add_fetched_slice_locked(t, s);
1329       }
1330     }
1331   }
1332 }
1333 
complete_fetch(void * gs,grpc_error_handle error)1334 static void complete_fetch(void* gs, grpc_error_handle error) {
1335   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1336   s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1337                                         ::complete_fetch_locked, s, nullptr),
1338                       GRPC_ERROR_REF(error));
1339 }
1340 
complete_fetch_locked(void * gs,grpc_error_handle error)1341 static void complete_fetch_locked(void* gs, grpc_error_handle error) {
1342   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1343   grpc_chttp2_transport* t = s->t;
1344   if (error == GRPC_ERROR_NONE) {
1345     error = s->fetching_send_message->Pull(&s->fetching_slice);
1346     if (error == GRPC_ERROR_NONE) {
1347       add_fetched_slice_locked(t, s);
1348       continue_fetching_send_locked(t, s);
1349     }
1350   }
1351   if (error != GRPC_ERROR_NONE) {
1352     s->fetching_send_message.reset();
1353     grpc_chttp2_cancel_stream(t, s, error);
1354   }
1355 }
1356 
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1357 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1358                          bool is_client, bool is_initial) {
1359   for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1360        md = md->next) {
1361     char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1362     char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1363     gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1364             is_client ? "CLI" : "SVR", key, value);
1365     gpr_free(key);
1366     gpr_free(value);
1367   }
1368 }
1369 
perform_stream_op_locked(void * stream_op,grpc_error_handle)1370 static void perform_stream_op_locked(void* stream_op,
1371                                      grpc_error_handle /*error_ignored*/) {
1372   GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1373 
1374   grpc_transport_stream_op_batch* op =
1375       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1376   grpc_chttp2_stream* s =
1377       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1378   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1379   grpc_chttp2_transport* t = s->t;
1380 
1381   GRPC_STATS_INC_HTTP2_OP_BATCHES();
1382 
1383   s->context = op->payload->context;
1384   s->traced = op->is_traced;
1385   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1386     gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p",
1387             grpc_transport_stream_op_batch_string(op).c_str(), op->on_complete);
1388     if (op->send_initial_metadata) {
1389       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1390                    s->id, t->is_client, true);
1391     }
1392     if (op->send_trailing_metadata) {
1393       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1394                    s->id, t->is_client, false);
1395     }
1396   }
1397 
1398   grpc_closure* on_complete = op->on_complete;
1399   // on_complete will be null if and only if there are no send ops in the batch.
1400   if (on_complete != nullptr) {
1401     // This batch has send ops. Use final_data as a barrier until enqueue time;
1402     // the initial counter is dropped at the end of this function.
1403     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1404     on_complete->error_data.error = GRPC_ERROR_NONE;
1405   }
1406 
1407   if (op->cancel_stream) {
1408     GRPC_STATS_INC_HTTP2_OP_CANCEL();
1409     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1410   }
1411 
1412   if (op->send_initial_metadata) {
1413     if (t->is_client && t->channelz_socket != nullptr) {
1414       t->channelz_socket->RecordStreamStartedFromLocal();
1415     }
1416     GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1417     GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1418     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1419 
1420     // Identify stream compression
1421     if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1422                 .content_encoding == nullptr ||
1423         grpc_stream_compression_method_parse(
1424             GRPC_MDVALUE(
1425                 op_payload->send_initial_metadata.send_initial_metadata->idx
1426                     .named.content_encoding->md),
1427             true, &s->stream_compression_method) == 0) {
1428       s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1429     }
1430     if (s->stream_compression_method !=
1431         GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1432       s->uncompressed_data_size = 0;
1433       s->stream_compression_ctx = nullptr;
1434       grpc_slice_buffer_init(&s->compressed_data_buffer);
1435     }
1436     s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1437     s->send_initial_metadata =
1438         op_payload->send_initial_metadata.send_initial_metadata;
1439     if (t->is_client) {
1440       s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1441     }
1442     if (contains_non_ok_status(s->send_initial_metadata)) {
1443       s->seen_error = true;
1444     }
1445     if (!s->write_closed) {
1446       if (t->is_client) {
1447         if (t->closed_with_error == GRPC_ERROR_NONE) {
1448           GPR_ASSERT(s->id == 0);
1449           grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1450           maybe_start_some_streams(t);
1451         } else {
1452           grpc_chttp2_cancel_stream(
1453               t, s,
1454               grpc_error_set_int(
1455                   GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1456                       "Transport closed", &t->closed_with_error, 1),
1457                   GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1458         }
1459       } else {
1460         GPR_ASSERT(s->id != 0);
1461         grpc_chttp2_mark_stream_writable(t, s);
1462         if (!(op->send_message &&
1463               (op->payload->send_message.send_message->flags() &
1464                GRPC_WRITE_BUFFER_HINT))) {
1465           grpc_chttp2_initiate_write(
1466               t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1467         }
1468       }
1469     } else {
1470       s->send_initial_metadata = nullptr;
1471       grpc_chttp2_complete_closure_step(
1472           t, s, &s->send_initial_metadata_finished,
1473           GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1474               "Attempt to send initial metadata after stream was closed",
1475               &s->write_closed_error, 1),
1476           "send_initial_metadata_finished");
1477     }
1478     if (op_payload->send_initial_metadata.peer_string != nullptr) {
1479       gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1480                         (gpr_atm)t->peer_string.c_str());
1481     }
1482   }
1483 
1484   if (op->send_message) {
1485     GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1486     t->num_messages_in_next_write++;
1487     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1488         op->payload->send_message.send_message->length());
1489     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1490     s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1491     if (s->write_closed) {
1492       op->payload->send_message.stream_write_closed = true;
1493       // We should NOT return an error here, so as to avoid a cancel OP being
1494       // started. The surface layer will notice that the stream has been closed
1495       // for writes and fail the send message op.
1496       op->payload->send_message.send_message.reset();
1497       grpc_chttp2_complete_closure_step(
1498           t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1499           "fetching_send_message_finished");
1500     } else {
1501       GPR_ASSERT(s->fetching_send_message == nullptr);
1502       uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1503           &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1504       uint32_t flags = op_payload->send_message.send_message->flags();
1505       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1506       size_t len = op_payload->send_message.send_message->length();
1507       frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1508       frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1509       frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1510       frame_hdr[4] = static_cast<uint8_t>(len);
1511       s->fetching_send_message =
1512           std::move(op_payload->send_message.send_message);
1513       s->fetched_send_message_length = 0;
1514       s->next_message_end_offset =
1515           s->flow_controlled_bytes_written +
1516           static_cast<int64_t>(s->flow_controlled_buffer.length) +
1517           static_cast<int64_t>(len);
1518       if (flags & GRPC_WRITE_BUFFER_HINT) {
1519         s->next_message_end_offset -= t->write_buffer_size;
1520         s->write_buffering = true;
1521       } else {
1522         s->write_buffering = false;
1523       }
1524       continue_fetching_send_locked(t, s);
1525       maybe_become_writable_due_to_send_msg(t, s);
1526     }
1527   }
1528 
1529   if (op->send_trailing_metadata) {
1530     GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1531     GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1532     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1533     s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1534     s->send_trailing_metadata =
1535         op_payload->send_trailing_metadata.send_trailing_metadata;
1536     s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1537     s->write_buffering = false;
1538     if (contains_non_ok_status(s->send_trailing_metadata)) {
1539       s->seen_error = true;
1540     }
1541     if (s->write_closed) {
1542       s->send_trailing_metadata = nullptr;
1543       s->sent_trailing_metadata_op = nullptr;
1544       grpc_chttp2_complete_closure_step(
1545           t, s, &s->send_trailing_metadata_finished,
1546           grpc_metadata_batch_is_empty(
1547               op->payload->send_trailing_metadata.send_trailing_metadata)
1548               ? GRPC_ERROR_NONE
1549               : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1550                     "Attempt to send trailing metadata after "
1551                     "stream was closed"),
1552           "send_trailing_metadata_finished");
1553     } else if (s->id != 0) {
1554       // TODO(ctiller): check if there's flow control for any outstanding
1555       //   bytes before going writable
1556       grpc_chttp2_mark_stream_writable(t, s);
1557       grpc_chttp2_initiate_write(
1558           t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1559     }
1560   }
1561 
1562   if (op->recv_initial_metadata) {
1563     GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1564     GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1565     s->recv_initial_metadata_ready =
1566         op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1567     s->recv_initial_metadata =
1568         op_payload->recv_initial_metadata.recv_initial_metadata;
1569     s->trailing_metadata_available =
1570         op_payload->recv_initial_metadata.trailing_metadata_available;
1571     if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1572       gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1573                         (gpr_atm)t->peer_string.c_str());
1574     }
1575     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1576   }
1577 
1578   if (op->recv_message) {
1579     GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1580     size_t before = 0;
1581     GPR_ASSERT(s->recv_message_ready == nullptr);
1582     GPR_ASSERT(!s->pending_byte_stream);
1583     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1584     s->recv_message = op_payload->recv_message.recv_message;
1585     if (s->id != 0) {
1586       if (!s->read_closed) {
1587         before = s->frame_storage.length +
1588                  s->unprocessed_incoming_frames_buffer.length;
1589       }
1590     }
1591     grpc_chttp2_maybe_complete_recv_message(t, s);
1592     if (s->id != 0) {
1593       if (!s->read_closed && s->frame_storage.length == 0) {
1594         size_t after = s->frame_storage.length +
1595                        s->unprocessed_incoming_frames_buffer_cached_length;
1596         s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1597                                                   before - after);
1598         grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1599       }
1600     }
1601   }
1602 
1603   if (op->recv_trailing_metadata) {
1604     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1605     GPR_ASSERT(s->collecting_stats == nullptr);
1606     s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1607     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1608     s->recv_trailing_metadata_finished =
1609         op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1610     s->recv_trailing_metadata =
1611         op_payload->recv_trailing_metadata.recv_trailing_metadata;
1612     s->final_metadata_requested = true;
1613     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1614   }
1615 
1616   if (on_complete != nullptr) {
1617     grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1618                                       "op->on_complete");
1619   }
1620 
1621   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1622 }
1623 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1624 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1625                               grpc_transport_stream_op_batch* op) {
1626   GPR_TIMER_SCOPE("perform_stream_op", 0);
1627   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1628   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1629 
1630   if (!t->is_client) {
1631     if (op->send_initial_metadata) {
1632       grpc_millis deadline =
1633           op->payload->send_initial_metadata.send_initial_metadata->deadline;
1634       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1635     }
1636     if (op->send_trailing_metadata) {
1637       grpc_millis deadline =
1638           op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1639       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1640     }
1641   }
1642 
1643   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1644     gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s,
1645             grpc_transport_stream_op_batch_string(op).c_str());
1646   }
1647 
1648   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1649   op->handler_private.extra_arg = gs;
1650   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1651                                      perform_stream_op_locked, op, nullptr),
1652                    GRPC_ERROR_NONE);
1653 }
1654 
cancel_pings(grpc_chttp2_transport * t,grpc_error_handle error)1655 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1656   // callback remaining pings: they're not allowed to call into the transport,
1657   //   and maybe they hold resources that need to be freed
1658   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1659   GPR_ASSERT(error != GRPC_ERROR_NONE);
1660   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1661     grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1662     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1663   }
1664   GRPC_ERROR_UNREF(error);
1665 }
1666 
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1667 static void send_ping_locked(grpc_chttp2_transport* t,
1668                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1669   if (t->closed_with_error != GRPC_ERROR_NONE) {
1670     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
1671                             GRPC_ERROR_REF(t->closed_with_error));
1672     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
1673                             GRPC_ERROR_REF(t->closed_with_error));
1674     return;
1675   }
1676   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1677   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1678                            GRPC_ERROR_NONE);
1679   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1680                            GRPC_ERROR_NONE);
1681 }
1682 
1683 // Specialized form of send_ping_locked for keepalive ping. If there is already
1684 // a ping in progress, the keepalive ping would piggyback onto that ping,
1685 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_chttp2_transport * t)1686 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1687   if (t->closed_with_error != GRPC_ERROR_NONE) {
1688     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1689                                        start_keepalive_ping_locked, t, nullptr),
1690                      GRPC_ERROR_REF(t->closed_with_error));
1691     t->combiner->Run(
1692         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1693                           finish_keepalive_ping_locked, t, nullptr),
1694         GRPC_ERROR_REF(t->closed_with_error));
1695     return;
1696   }
1697   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1698   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1699     // There is a ping in flight. Add yourself to the inflight closure list.
1700     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1701                                        start_keepalive_ping_locked, t, nullptr),
1702                      GRPC_ERROR_REF(t->closed_with_error));
1703     grpc_closure_list_append(
1704         &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1705         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1706                           finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1707         GRPC_ERROR_NONE);
1708     return;
1709   }
1710   grpc_closure_list_append(
1711       &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1712       GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1713                         t, grpc_schedule_on_exec_ctx),
1714       GRPC_ERROR_NONE);
1715   grpc_closure_list_append(
1716       &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1717       GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1718                         t, grpc_schedule_on_exec_ctx),
1719       GRPC_ERROR_NONE);
1720 }
1721 
grpc_chttp2_retry_initiate_ping(void * tp,grpc_error_handle error)1722 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error) {
1723   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1724   t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1725                                      retry_initiate_ping_locked, t, nullptr),
1726                    GRPC_ERROR_REF(error));
1727 }
1728 
retry_initiate_ping_locked(void * tp,grpc_error_handle error)1729 static void retry_initiate_ping_locked(void* tp, grpc_error_handle error) {
1730   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1731   t->ping_state.is_delayed_ping_timer_set = false;
1732   if (error == GRPC_ERROR_NONE) {
1733     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1734   }
1735   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1736 }
1737 
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1738 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1739   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1740   if (pq->inflight_id != id) {
1741     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1742             t->peer_string.c_str(), id);
1743     return;
1744   }
1745   grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1746                               &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1747   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1748     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1749   }
1750 }
1751 
send_goaway(grpc_chttp2_transport * t,grpc_error_handle error)1752 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error) {
1753   // We want to log this irrespective of whether http tracing is enabled
1754   gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string.c_str(),
1755           grpc_error_std_string(error).c_str());
1756   t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1757   grpc_http2_error_code http_error;
1758   grpc_slice slice;
1759   grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1760                         &http_error, nullptr);
1761   grpc_chttp2_goaway_append(t->last_new_stream_id,
1762                             static_cast<uint32_t>(http_error),
1763                             grpc_slice_ref_internal(slice), &t->qbuf);
1764   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1765   GRPC_ERROR_UNREF(error);
1766 }
1767 
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1768 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1769   if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1770       t->ping_policy.max_ping_strikes != 0) {
1771     send_goaway(t,
1772                 grpc_error_set_int(
1773                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1774                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1775     // The transport will be closed after the write is done
1776     close_transport_locked(
1777         t, grpc_error_set_int(
1778                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1779                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1780   }
1781 }
1782 
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1783 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1784   if (!t->is_client) {
1785     t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
1786     t->ping_recv_state.ping_strikes = 0;
1787   }
1788   t->ping_state.pings_before_data_required =
1789       t->ping_policy.max_pings_without_data;
1790 }
1791 
perform_transport_op_locked(void * stream_op,grpc_error_handle)1792 static void perform_transport_op_locked(void* stream_op,
1793                                         grpc_error_handle /*error_ignored*/) {
1794   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1795   grpc_chttp2_transport* t =
1796       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1797 
1798   if (op->goaway_error) {
1799     send_goaway(t, op->goaway_error);
1800   }
1801 
1802   if (op->set_accept_stream) {
1803     t->accept_stream_cb = op->set_accept_stream_fn;
1804     t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1805   }
1806 
1807   if (op->bind_pollset) {
1808     grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1809   }
1810 
1811   if (op->bind_pollset_set) {
1812     grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1813   }
1814 
1815   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1816     send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1817     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1818   }
1819 
1820   if (op->start_connectivity_watch != nullptr) {
1821     t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1822                                 std::move(op->start_connectivity_watch));
1823   }
1824   if (op->stop_connectivity_watch != nullptr) {
1825     t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1826   }
1827 
1828   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1829     close_transport_locked(t, op->disconnect_with_error);
1830   }
1831 
1832   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1833 
1834   GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1835 }
1836 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1837 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1838   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1839   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1840     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1841             grpc_transport_op_string(op).c_str());
1842   }
1843   op->handler_private.extra_arg = gt;
1844   GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1845   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1846                                      perform_transport_op_locked, op, nullptr),
1847                    GRPC_ERROR_NONE);
1848 }
1849 
1850 //
1851 // INPUT PROCESSING - GENERAL
1852 //
1853 
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport *,grpc_chttp2_stream * s)1854 void grpc_chttp2_maybe_complete_recv_initial_metadata(
1855     grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) {
1856   if (s->recv_initial_metadata_ready != nullptr &&
1857       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1858     if (s->seen_error) {
1859       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1860       if (!s->pending_byte_stream) {
1861         grpc_slice_buffer_reset_and_unref_internal(
1862             &s->unprocessed_incoming_frames_buffer);
1863       }
1864     }
1865     grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1866                                                  s->recv_initial_metadata);
1867     null_then_sched_closure(&s->recv_initial_metadata_ready);
1868   }
1869 }
1870 
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport *,grpc_chttp2_stream * s)1871 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
1872                                              grpc_chttp2_stream* s) {
1873   grpc_error_handle error = GRPC_ERROR_NONE;
1874   if (s->recv_message_ready != nullptr) {
1875     *s->recv_message = nullptr;
1876     if (s->final_metadata_requested && s->seen_error) {
1877       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1878       if (!s->pending_byte_stream) {
1879         grpc_slice_buffer_reset_and_unref_internal(
1880             &s->unprocessed_incoming_frames_buffer);
1881       }
1882     }
1883     if (!s->pending_byte_stream) {
1884       while (s->unprocessed_incoming_frames_buffer.length > 0 ||
1885              s->frame_storage.length > 0) {
1886         if (s->unprocessed_incoming_frames_buffer.length == 0) {
1887           grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
1888                                  &s->frame_storage);
1889           s->unprocessed_incoming_frames_decompressed = false;
1890         }
1891         if (!s->unprocessed_incoming_frames_decompressed &&
1892             s->stream_decompression_method !=
1893                 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1894           GPR_ASSERT(s->decompressed_data_buffer.length == 0);
1895           bool end_of_context;
1896           if (!s->stream_decompression_ctx) {
1897             s->stream_decompression_ctx =
1898                 grpc_stream_compression_context_create(
1899                     s->stream_decompression_method);
1900           }
1901           if (!grpc_stream_decompress(
1902                   s->stream_decompression_ctx,
1903                   &s->unprocessed_incoming_frames_buffer,
1904                   &s->decompressed_data_buffer, nullptr,
1905                   GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
1906                   &end_of_context)) {
1907             grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1908             grpc_slice_buffer_reset_and_unref_internal(
1909                 &s->unprocessed_incoming_frames_buffer);
1910             error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1911                 "Stream decompression error.");
1912           } else {
1913             s->decompressed_header_bytes += s->decompressed_data_buffer.length;
1914             if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
1915               s->decompressed_header_bytes = 0;
1916             }
1917             error = grpc_deframe_unprocessed_incoming_frames(
1918                 &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
1919                 s->recv_message);
1920             if (end_of_context) {
1921               grpc_stream_compression_context_destroy(
1922                   s->stream_decompression_ctx);
1923               s->stream_decompression_ctx = nullptr;
1924             }
1925           }
1926         } else {
1927           error = grpc_deframe_unprocessed_incoming_frames(
1928               &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
1929               nullptr, s->recv_message);
1930         }
1931         if (error != GRPC_ERROR_NONE) {
1932           s->seen_error = true;
1933           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1934           grpc_slice_buffer_reset_and_unref_internal(
1935               &s->unprocessed_incoming_frames_buffer);
1936           break;
1937         } else if (*s->recv_message != nullptr) {
1938           break;
1939         }
1940       }
1941     }
1942     // save the length of the buffer before handing control back to application
1943     // threads. Needed to support correct flow control bookkeeping
1944     s->unprocessed_incoming_frames_buffer_cached_length =
1945         s->unprocessed_incoming_frames_buffer.length;
1946     if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
1947       null_then_sched_closure(&s->recv_message_ready);
1948     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
1949       *s->recv_message = nullptr;
1950       null_then_sched_closure(&s->recv_message_ready);
1951     }
1952     GRPC_ERROR_UNREF(error);
1953   }
1954 }
1955 
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1956 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
1957                                                        grpc_chttp2_stream* s) {
1958   grpc_chttp2_maybe_complete_recv_message(t, s);
1959   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
1960       s->write_closed) {
1961     if (s->seen_error || !t->is_client) {
1962       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1963       if (!s->pending_byte_stream) {
1964         grpc_slice_buffer_reset_and_unref_internal(
1965             &s->unprocessed_incoming_frames_buffer);
1966       }
1967     }
1968     bool pending_data = s->pending_byte_stream ||
1969                         s->unprocessed_incoming_frames_buffer.length > 0;
1970     if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
1971         !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
1972       // Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
1973       // maybe decompress the next 5 bytes in the stream.
1974       if (s->stream_decompression_method ==
1975           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1976         grpc_slice_buffer_move_first(
1977             &s->frame_storage,
1978             GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
1979             &s->unprocessed_incoming_frames_buffer);
1980         if (s->unprocessed_incoming_frames_buffer.length > 0) {
1981           s->unprocessed_incoming_frames_decompressed = true;
1982           pending_data = true;
1983         }
1984       } else {
1985         bool end_of_context;
1986         if (!s->stream_decompression_ctx) {
1987           s->stream_decompression_ctx = grpc_stream_compression_context_create(
1988               s->stream_decompression_method);
1989         }
1990         if (!grpc_stream_decompress(
1991                 s->stream_decompression_ctx, &s->frame_storage,
1992                 &s->unprocessed_incoming_frames_buffer, nullptr,
1993                 GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
1994           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1995           grpc_slice_buffer_reset_and_unref_internal(
1996               &s->unprocessed_incoming_frames_buffer);
1997           s->seen_error = true;
1998         } else {
1999           if (s->unprocessed_incoming_frames_buffer.length > 0) {
2000             s->unprocessed_incoming_frames_decompressed = true;
2001             pending_data = true;
2002           }
2003           if (end_of_context) {
2004             grpc_stream_compression_context_destroy(
2005                 s->stream_decompression_ctx);
2006             s->stream_decompression_ctx = nullptr;
2007           }
2008         }
2009       }
2010     }
2011     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
2012         s->recv_trailing_metadata_finished != nullptr) {
2013       grpc_transport_move_stats(&s->stats, s->collecting_stats);
2014       s->collecting_stats = nullptr;
2015       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
2016                                                    s->recv_trailing_metadata);
2017       null_then_sched_closure(&s->recv_trailing_metadata_finished);
2018     }
2019   }
2020 }
2021 
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error_handle error)2022 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2023                           grpc_error_handle error) {
2024   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2025       grpc_chttp2_stream_map_delete(&t->stream_map, id));
2026   GPR_DEBUG_ASSERT(s);
2027   if (t->incoming_stream == s) {
2028     t->incoming_stream = nullptr;
2029     grpc_chttp2_parsing_become_skip_parser(t);
2030   }
2031   if (s->pending_byte_stream) {
2032     if (s->on_next != nullptr) {
2033       grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2034       if (error == GRPC_ERROR_NONE) {
2035         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2036       }
2037       bs->PublishError(error);
2038       bs->Unref();
2039       s->data_parser.parsing_frame = nullptr;
2040     } else {
2041       GRPC_ERROR_UNREF(s->byte_stream_error);
2042       s->byte_stream_error = GRPC_ERROR_REF(error);
2043     }
2044   }
2045 
2046   if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2047     post_benign_reclaimer(t);
2048     if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2049       close_transport_locked(
2050           t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2051                  "Last stream closed after sending GOAWAY", &error, 1));
2052     }
2053   }
2054   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2055     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2056   }
2057   grpc_chttp2_list_remove_stalled_by_stream(t, s);
2058   grpc_chttp2_list_remove_stalled_by_transport(t, s);
2059 
2060   GRPC_ERROR_UNREF(error);
2061 
2062   maybe_start_some_streams(t);
2063 }
2064 
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle due_to_error)2065 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2066                                grpc_error_handle due_to_error) {
2067   if (!t->is_client && !s->sent_trailing_metadata &&
2068       grpc_error_has_clear_grpc_status(due_to_error)) {
2069     close_from_api(t, s, due_to_error);
2070     return;
2071   }
2072 
2073   if (!s->read_closed || !s->write_closed) {
2074     if (s->id != 0) {
2075       grpc_http2_error_code http_error;
2076       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2077                             &http_error, nullptr);
2078       grpc_chttp2_add_rst_stream_to_next_write(
2079           t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2080       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2081     }
2082   }
2083   if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2084     s->seen_error = true;
2085   }
2086   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2087 }
2088 
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2089 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2090                              grpc_error_handle error) {
2091   grpc_status_code status;
2092   grpc_slice slice;
2093   grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2094   if (status != GRPC_STATUS_OK) {
2095     s->seen_error = true;
2096   }
2097   // stream_global->recv_trailing_metadata_finished gives us a
2098   //   last chance replacement: we've received trailing metadata,
2099   //   but something more important has become available to signal
2100   //   to the upper layers - drop what we've got, and then publish
2101   //   what we want - which is safe because we haven't told anyone
2102   //   about the metadata yet
2103   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2104       s->recv_trailing_metadata_finished != nullptr) {
2105     char status_string[GPR_LTOA_MIN_BUFSIZE];
2106     gpr_ltoa(status, status_string);
2107     GRPC_LOG_IF_ERROR("add_status",
2108                       grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2109                           &s->metadata_buffer[1],
2110                           grpc_mdelem_from_slices(
2111                               GRPC_MDSTR_GRPC_STATUS,
2112                               grpc_core::UnmanagedMemorySlice(status_string))));
2113     if (!GRPC_SLICE_IS_EMPTY(slice)) {
2114       GRPC_LOG_IF_ERROR(
2115           "add_status_message",
2116           grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2117               &s->metadata_buffer[1],
2118               grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2119     }
2120     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2121     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2122   }
2123 
2124   GRPC_ERROR_UNREF(error);
2125 }
2126 
add_error(grpc_error_handle error,grpc_error_handle * refs,size_t * nrefs)2127 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2128                       size_t* nrefs) {
2129   if (error == GRPC_ERROR_NONE) return;
2130   for (size_t i = 0; i < *nrefs; i++) {
2131     if (error == refs[i]) {
2132       return;
2133     }
2134   }
2135   refs[*nrefs] = error;
2136   ++*nrefs;
2137 }
2138 
removal_error(grpc_error_handle extra_error,grpc_chttp2_stream * s,const char * main_error_msg)2139 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2140                                        grpc_chttp2_stream* s,
2141                                        const char* main_error_msg) {
2142   grpc_error_handle refs[3];
2143   size_t nrefs = 0;
2144   add_error(s->read_closed_error, refs, &nrefs);
2145   add_error(s->write_closed_error, refs, &nrefs);
2146   add_error(extra_error, refs, &nrefs);
2147   grpc_error_handle error = GRPC_ERROR_NONE;
2148   if (nrefs > 0) {
2149     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(main_error_msg,
2150                                                              refs, nrefs);
2151   }
2152   GRPC_ERROR_UNREF(extra_error);
2153   return error;
2154 }
2155 
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error_handle error)2156 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2157                              grpc_chttp2_write_cb** list,
2158                              grpc_error_handle error) {
2159   while (*list) {
2160     grpc_chttp2_write_cb* cb = *list;
2161     *list = cb->next;
2162     grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2163                                       "on_write_finished_cb");
2164     cb->next = t->write_cb_pool;
2165     t->write_cb_pool = cb;
2166   }
2167   GRPC_ERROR_UNREF(error);
2168 }
2169 
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2170 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2171                                      grpc_chttp2_stream* s,
2172                                      grpc_error_handle error) {
2173   error =
2174       removal_error(error, s, "Pending writes failed due to stream closure");
2175   s->send_initial_metadata = nullptr;
2176   grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2177                                     GRPC_ERROR_REF(error),
2178                                     "send_initial_metadata_finished");
2179 
2180   s->send_trailing_metadata = nullptr;
2181   s->sent_trailing_metadata_op = nullptr;
2182   grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2183                                     GRPC_ERROR_REF(error),
2184                                     "send_trailing_metadata_finished");
2185 
2186   s->fetching_send_message.reset();
2187   grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2188                                     GRPC_ERROR_REF(error),
2189                                     "fetching_send_message_finished");
2190   flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2191   flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2192 }
2193 
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error_handle error)2194 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2195                                     grpc_chttp2_stream* s, int close_reads,
2196                                     int close_writes, grpc_error_handle error) {
2197   if (s->read_closed && s->write_closed) {
2198     // already closed, but we should still fake the status if needed.
2199     grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2200     if (overall_error != GRPC_ERROR_NONE) {
2201       grpc_chttp2_fake_status(t, s, overall_error);
2202     }
2203     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2204     return;
2205   }
2206   bool closed_read = false;
2207   bool became_closed = false;
2208   if (close_reads && !s->read_closed) {
2209     s->read_closed_error = GRPC_ERROR_REF(error);
2210     s->read_closed = true;
2211     closed_read = true;
2212   }
2213   if (close_writes && !s->write_closed) {
2214     s->write_closed_error = GRPC_ERROR_REF(error);
2215     s->write_closed = true;
2216     grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
2217   }
2218   if (s->read_closed && s->write_closed) {
2219     became_closed = true;
2220     grpc_error_handle overall_error =
2221         removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
2222     if (s->id != 0) {
2223       remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
2224     } else {
2225       // Purge streams waiting on concurrency still waiting for id assignment
2226       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2227     }
2228     if (overall_error != GRPC_ERROR_NONE) {
2229       grpc_chttp2_fake_status(t, s, overall_error);
2230     }
2231   }
2232   if (closed_read) {
2233     for (int i = 0; i < 2; i++) {
2234       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2235         s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2236       }
2237     }
2238     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2239     grpc_chttp2_maybe_complete_recv_message(t, s);
2240   }
2241   if (became_closed) {
2242     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2243     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2244   }
2245   GRPC_ERROR_UNREF(error);
2246 }
2247 
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2248 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2249                            grpc_error_handle error) {
2250   grpc_slice hdr;
2251   grpc_slice status_hdr;
2252   grpc_slice http_status_hdr;
2253   grpc_slice content_type_hdr;
2254   grpc_slice message_pfx;
2255   uint8_t* p;
2256   uint32_t len = 0;
2257   grpc_status_code grpc_status;
2258   grpc_slice slice;
2259   grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2260                         nullptr);
2261 
2262   GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2263 
2264   // Hand roll a header block.
2265   //   This is unnecessarily ugly - at some point we should find a more
2266   //   elegant solution.
2267   //   It's complicated by the fact that our send machinery would be dead by
2268   //   the time we got around to sending this, so instead we ignore HPACK
2269   //   compression and just write the uncompressed bytes onto the wire.
2270   if (!s->sent_initial_metadata) {
2271     http_status_hdr = GRPC_SLICE_MALLOC(13);
2272     p = GRPC_SLICE_START_PTR(http_status_hdr);
2273     *p++ = 0x00;
2274     *p++ = 7;
2275     *p++ = ':';
2276     *p++ = 's';
2277     *p++ = 't';
2278     *p++ = 'a';
2279     *p++ = 't';
2280     *p++ = 'u';
2281     *p++ = 's';
2282     *p++ = 3;
2283     *p++ = '2';
2284     *p++ = '0';
2285     *p++ = '0';
2286     GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2287     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2288 
2289     content_type_hdr = GRPC_SLICE_MALLOC(31);
2290     p = GRPC_SLICE_START_PTR(content_type_hdr);
2291     *p++ = 0x00;
2292     *p++ = 12;
2293     *p++ = 'c';
2294     *p++ = 'o';
2295     *p++ = 'n';
2296     *p++ = 't';
2297     *p++ = 'e';
2298     *p++ = 'n';
2299     *p++ = 't';
2300     *p++ = '-';
2301     *p++ = 't';
2302     *p++ = 'y';
2303     *p++ = 'p';
2304     *p++ = 'e';
2305     *p++ = 16;
2306     *p++ = 'a';
2307     *p++ = 'p';
2308     *p++ = 'p';
2309     *p++ = 'l';
2310     *p++ = 'i';
2311     *p++ = 'c';
2312     *p++ = 'a';
2313     *p++ = 't';
2314     *p++ = 'i';
2315     *p++ = 'o';
2316     *p++ = 'n';
2317     *p++ = '/';
2318     *p++ = 'g';
2319     *p++ = 'r';
2320     *p++ = 'p';
2321     *p++ = 'c';
2322     GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2323     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2324   }
2325 
2326   status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2327   p = GRPC_SLICE_START_PTR(status_hdr);
2328   *p++ = 0x00; /* literal header, not indexed */
2329   *p++ = 11;   /* len(grpc-status) */
2330   *p++ = 'g';
2331   *p++ = 'r';
2332   *p++ = 'p';
2333   *p++ = 'c';
2334   *p++ = '-';
2335   *p++ = 's';
2336   *p++ = 't';
2337   *p++ = 'a';
2338   *p++ = 't';
2339   *p++ = 'u';
2340   *p++ = 's';
2341   if (grpc_status < 10) {
2342     *p++ = 1;
2343     *p++ = static_cast<uint8_t>('0' + grpc_status);
2344   } else {
2345     *p++ = 2;
2346     *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2347     *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2348   }
2349   GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2350   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2351 
2352   size_t msg_len = GRPC_SLICE_LENGTH(slice);
2353   GPR_ASSERT(msg_len <= UINT32_MAX);
2354   uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2355   message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2356   p = GRPC_SLICE_START_PTR(message_pfx);
2357   *p++ = 0x00; /* literal header, not indexed */
2358   *p++ = 12;   /* len(grpc-message) */
2359   *p++ = 'g';
2360   *p++ = 'r';
2361   *p++ = 'p';
2362   *p++ = 'c';
2363   *p++ = '-';
2364   *p++ = 'm';
2365   *p++ = 'e';
2366   *p++ = 's';
2367   *p++ = 's';
2368   *p++ = 'a';
2369   *p++ = 'g';
2370   *p++ = 'e';
2371   GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2372   p += msg_len_len;
2373   GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2374   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2375   len += static_cast<uint32_t>(msg_len);
2376 
2377   hdr = GRPC_SLICE_MALLOC(9);
2378   p = GRPC_SLICE_START_PTR(hdr);
2379   *p++ = static_cast<uint8_t>(len >> 16);
2380   *p++ = static_cast<uint8_t>(len >> 8);
2381   *p++ = static_cast<uint8_t>(len);
2382   *p++ = GRPC_CHTTP2_FRAME_HEADER;
2383   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2384   *p++ = static_cast<uint8_t>(s->id >> 24);
2385   *p++ = static_cast<uint8_t>(s->id >> 16);
2386   *p++ = static_cast<uint8_t>(s->id >> 8);
2387   *p++ = static_cast<uint8_t>(s->id);
2388   GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2389 
2390   grpc_slice_buffer_add(&t->qbuf, hdr);
2391   if (!s->sent_initial_metadata) {
2392     grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2393     grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2394   }
2395   grpc_slice_buffer_add(&t->qbuf, status_hdr);
2396   grpc_slice_buffer_add(&t->qbuf, message_pfx);
2397   grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2398   grpc_chttp2_reset_ping_clock(t);
2399   grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2400                                            &s->stats.outgoing);
2401 
2402   grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2403   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2404 }
2405 
2406 struct cancel_stream_cb_args {
2407   grpc_error_handle error;
2408   grpc_chttp2_transport* t;
2409 };
2410 
cancel_stream_cb(void * user_data,uint32_t,void * stream)2411 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2412   cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2413   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2414   grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2415 }
2416 
end_all_the_calls(grpc_chttp2_transport * t,grpc_error_handle error)2417 static void end_all_the_calls(grpc_chttp2_transport* t,
2418                               grpc_error_handle error) {
2419   intptr_t http2_error;
2420   // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2421   if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2422       !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2423     error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2424                                GRPC_STATUS_UNAVAILABLE);
2425   }
2426   cancel_stream_cb_args args = {error, t};
2427   grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2428   GRPC_ERROR_UNREF(error);
2429 }
2430 
2431 //
2432 // INPUT PROCESSING - PARSING
2433 //
2434 
2435 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2436 static void WithUrgency(grpc_chttp2_transport* t,
2437                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2438                         grpc_chttp2_initiate_write_reason reason, F action) {
2439   switch (urgency) {
2440     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2441       break;
2442     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2443       grpc_chttp2_initiate_write(t, reason);
2444     // fallthrough
2445     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2446       action();
2447       break;
2448   }
2449 }
2450 
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2451 void grpc_chttp2_act_on_flowctl_action(
2452     const grpc_core::chttp2::FlowControlAction& action,
2453     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2454   WithUrgency(t, action.send_stream_update(),
2455               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2456               [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2457   WithUrgency(t, action.send_transport_update(),
2458               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2459   WithUrgency(t, action.send_initial_window_update(),
2460               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2461                 queue_setting_update(t,
2462                                      GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2463                                      action.initial_window_size());
2464               });
2465   WithUrgency(t, action.send_max_frame_size_update(),
2466               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2467                 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2468                                      action.max_frame_size());
2469               });
2470 }
2471 
try_http_parsing(grpc_chttp2_transport * t)2472 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2473   grpc_http_parser parser;
2474   size_t i = 0;
2475   grpc_error_handle error = GRPC_ERROR_NONE;
2476   grpc_http_response response;
2477 
2478   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2479 
2480   grpc_error_handle parse_error = GRPC_ERROR_NONE;
2481   for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2482     parse_error =
2483         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2484   }
2485   if (parse_error == GRPC_ERROR_NONE &&
2486       (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2487     error = grpc_error_set_int(
2488         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2489                                "Trying to connect an http1.x server"),
2490                            GRPC_ERROR_INT_HTTP_STATUS, response.status),
2491         GRPC_ERROR_INT_GRPC_STATUS,
2492         grpc_http2_status_to_grpc_status(response.status));
2493   }
2494   GRPC_ERROR_UNREF(parse_error);
2495 
2496   grpc_http_parser_destroy(&parser);
2497   grpc_http_response_destroy(&response);
2498   return error;
2499 }
2500 
read_action(void * tp,grpc_error_handle error)2501 static void read_action(void* tp, grpc_error_handle error) {
2502   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2503   t->combiner->Run(
2504       GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2505       GRPC_ERROR_REF(error));
2506 }
2507 
read_action_locked(void * tp,grpc_error_handle error)2508 static void read_action_locked(void* tp, grpc_error_handle error) {
2509   GPR_TIMER_SCOPE("reading_action_locked", 0);
2510 
2511   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2512 
2513   GRPC_ERROR_REF(error);
2514 
2515   grpc_error_handle err = error;
2516   if (err != GRPC_ERROR_NONE) {
2517     err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2518                                  "Endpoint read failed", &err, 1),
2519                              GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2520                              t->write_state);
2521   }
2522   GPR_SWAP(grpc_error_handle, err, error);
2523   GRPC_ERROR_UNREF(err);
2524   if (t->closed_with_error == GRPC_ERROR_NONE) {
2525     GPR_TIMER_SCOPE("reading_action.parse", 0);
2526     size_t i = 0;
2527     grpc_error_handle errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2528                                    GRPC_ERROR_NONE};
2529     for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2530       errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2531     }
2532     if (errors[1] != GRPC_ERROR_NONE) {
2533       errors[2] = try_http_parsing(t);
2534       GRPC_ERROR_UNREF(error);
2535       error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2536           "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2537     }
2538     for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2539       GRPC_ERROR_UNREF(errors[i]);
2540     }
2541 
2542     GPR_TIMER_SCOPE("post_parse_locked", 0);
2543     if (t->initial_window_update != 0) {
2544       if (t->initial_window_update > 0) {
2545         grpc_chttp2_stream* s;
2546         while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2547           grpc_chttp2_mark_stream_writable(t, s);
2548           grpc_chttp2_initiate_write(
2549               t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2550         }
2551       }
2552       t->initial_window_update = 0;
2553     }
2554   }
2555 
2556   GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2557   bool keep_reading = false;
2558   if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2559     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2560         "Transport closed", &t->closed_with_error, 1);
2561   }
2562   if (error != GRPC_ERROR_NONE) {
2563     // If a goaway frame was received, this might be the reason why the read
2564     // failed. Add this info to the error
2565     if (t->goaway_error != GRPC_ERROR_NONE) {
2566       error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2567     }
2568 
2569     close_transport_locked(t, GRPC_ERROR_REF(error));
2570     t->endpoint_reading = 0;
2571   } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2572     keep_reading = true;
2573     // Since we have read a byte, reset the keepalive timer
2574     if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2575       grpc_timer_cancel(&t->keepalive_ping_timer);
2576     }
2577   }
2578   grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2579 
2580   if (keep_reading) {
2581     if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2582       t->reading_paused_on_pending_induced_frames = true;
2583       GRPC_CHTTP2_IF_TRACING(
2584           gpr_log(GPR_INFO,
2585                   "transport %p : Pausing reading due to too "
2586                   "many unwritten SETTINGS ACK and RST_STREAM frames",
2587                   t));
2588     } else {
2589       continue_read_action_locked(t);
2590     }
2591   } else {
2592     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2593   }
2594 
2595   GRPC_ERROR_UNREF(error);
2596 }
2597 
continue_read_action_locked(grpc_chttp2_transport * t)2598 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2599   const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2600   GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2601                     grpc_schedule_on_exec_ctx);
2602   grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2603   grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2604 }
2605 
2606 // t is reffed prior to calling the first time, and once the callback chain
2607 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2608 void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2609   t->flow_control->bdp_estimator()->SchedulePing();
2610   send_ping_locked(
2611       t,
2612       GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2613                         grpc_schedule_on_exec_ctx),
2614       GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2615                         grpc_schedule_on_exec_ctx));
2616   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2617 }
2618 
start_bdp_ping(void * tp,grpc_error_handle error)2619 static void start_bdp_ping(void* tp, grpc_error_handle error) {
2620   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2621   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2622                                      start_bdp_ping_locked, t, nullptr),
2623                    GRPC_ERROR_REF(error));
2624 }
2625 
start_bdp_ping_locked(void * tp,grpc_error_handle error)2626 static void start_bdp_ping_locked(void* tp, grpc_error_handle error) {
2627   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2628   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2629     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string.c_str(),
2630             grpc_error_std_string(error).c_str());
2631   }
2632   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2633     return;
2634   }
2635   // Reset the keepalive ping timer
2636   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2637     grpc_timer_cancel(&t->keepalive_ping_timer);
2638   }
2639   t->flow_control->bdp_estimator()->StartPing();
2640   t->bdp_ping_started = true;
2641 }
2642 
finish_bdp_ping(void * tp,grpc_error_handle error)2643 static void finish_bdp_ping(void* tp, grpc_error_handle error) {
2644   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2645   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2646                                      finish_bdp_ping_locked, t, nullptr),
2647                    GRPC_ERROR_REF(error));
2648 }
2649 
finish_bdp_ping_locked(void * tp,grpc_error_handle error)2650 static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) {
2651   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2652   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2653     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string.c_str(),
2654             grpc_error_std_string(error).c_str());
2655   }
2656   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2657     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2658     return;
2659   }
2660   if (!t->bdp_ping_started) {
2661     // start_bdp_ping_locked has not been run yet. Schedule
2662     // finish_bdp_ping_locked to be run later.
2663     t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2664                                        finish_bdp_ping_locked, t, nullptr),
2665                      GRPC_ERROR_REF(error));
2666     return;
2667   }
2668   t->bdp_ping_started = false;
2669   grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2670   grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2671                                     nullptr);
2672   GPR_ASSERT(!t->have_next_bdp_ping_timer);
2673   t->have_next_bdp_ping_timer = true;
2674   GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2675                     next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
2676   grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2677                   &t->next_bdp_ping_timer_expired_locked);
2678 }
2679 
next_bdp_ping_timer_expired(void * tp,grpc_error_handle error)2680 static void next_bdp_ping_timer_expired(void* tp, grpc_error_handle error) {
2681   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2682   t->combiner->Run(
2683       GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2684                         next_bdp_ping_timer_expired_locked, t, nullptr),
2685       GRPC_ERROR_REF(error));
2686 }
2687 
next_bdp_ping_timer_expired_locked(void * tp,grpc_error_handle error)2688 static void next_bdp_ping_timer_expired_locked(void* tp,
2689                                                grpc_error_handle error) {
2690   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2691   GPR_ASSERT(t->have_next_bdp_ping_timer);
2692   t->have_next_bdp_ping_timer = false;
2693   if (error != GRPC_ERROR_NONE) {
2694     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2695     return;
2696   }
2697   if (t->flow_control->bdp_estimator()->accumulator() == 0) {
2698     // Block the bdp ping till we receive more data.
2699     t->bdp_ping_blocked = true;
2700     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2701   } else {
2702     schedule_bdp_ping_locked(t);
2703   }
2704 }
2705 
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2706 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2707                                                bool is_client) {
2708   size_t i;
2709   if (args) {
2710     for (i = 0; i < args->num_args; i++) {
2711       if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2712         const int value = grpc_channel_arg_get_integer(
2713             &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2714                                        : g_default_server_keepalive_time_ms,
2715                              1, INT_MAX});
2716         if (is_client) {
2717           g_default_client_keepalive_time_ms = value;
2718         } else {
2719           g_default_server_keepalive_time_ms = value;
2720         }
2721       } else if (0 ==
2722                  strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2723         const int value = grpc_channel_arg_get_integer(
2724             &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2725                                        : g_default_server_keepalive_timeout_ms,
2726                              0, INT_MAX});
2727         if (is_client) {
2728           g_default_client_keepalive_timeout_ms = value;
2729         } else {
2730           g_default_server_keepalive_timeout_ms = value;
2731         }
2732       } else if (0 == strcmp(args->args[i].key,
2733                              GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2734         const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2735             &args->args[i],
2736             {is_client ? g_default_client_keepalive_permit_without_calls
2737                        : g_default_server_keepalive_timeout_ms,
2738              0, 1}));
2739         if (is_client) {
2740           g_default_client_keepalive_permit_without_calls = value;
2741         } else {
2742           g_default_server_keepalive_permit_without_calls = value;
2743         }
2744       } else if (0 ==
2745                  strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2746         g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2747             &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2748       } else if (0 == strcmp(args->args[i].key,
2749                              GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2750         g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2751             &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2752       } else if (0 ==
2753                  strcmp(
2754                      args->args[i].key,
2755                      GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2756         g_default_min_recv_ping_interval_without_data_ms =
2757             grpc_channel_arg_get_integer(
2758                 &args->args[i],
2759                 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2760       }
2761     }
2762   }
2763 }
2764 
init_keepalive_ping(void * arg,grpc_error_handle error)2765 static void init_keepalive_ping(void* arg, grpc_error_handle error) {
2766   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2767   t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2768                                      init_keepalive_ping_locked, t, nullptr),
2769                    GRPC_ERROR_REF(error));
2770 }
2771 
init_keepalive_ping_locked(void * arg,grpc_error_handle error)2772 static void init_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2773   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2774   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2775   if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2776     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2777   } else if (error == GRPC_ERROR_NONE) {
2778     if (t->keepalive_permit_without_calls ||
2779         grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2780       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2781       GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2782       grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2783       send_keepalive_ping_locked(t);
2784       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2785     } else {
2786       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2787       GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2788                         grpc_schedule_on_exec_ctx);
2789       grpc_timer_init(&t->keepalive_ping_timer,
2790                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2791                       &t->init_keepalive_ping_locked);
2792     }
2793   } else if (error == GRPC_ERROR_CANCELLED) {
2794     // The keepalive ping timer may be cancelled by bdp
2795     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2796     GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2797                       grpc_schedule_on_exec_ctx);
2798     grpc_timer_init(&t->keepalive_ping_timer,
2799                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2800                     &t->init_keepalive_ping_locked);
2801   }
2802   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2803 }
2804 
start_keepalive_ping(void * arg,grpc_error_handle error)2805 static void start_keepalive_ping(void* arg, grpc_error_handle error) {
2806   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2807   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2808                                      start_keepalive_ping_locked, t, nullptr),
2809                    GRPC_ERROR_REF(error));
2810 }
2811 
start_keepalive_ping_locked(void * arg,grpc_error_handle error)2812 static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2813   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2814   if (error != GRPC_ERROR_NONE) {
2815     return;
2816   }
2817   if (t->channelz_socket != nullptr) {
2818     t->channelz_socket->RecordKeepaliveSent();
2819   }
2820   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2821       GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2822     gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str());
2823   }
2824   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2825   GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2826                     keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
2827   grpc_timer_init(&t->keepalive_watchdog_timer,
2828                   grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2829                   &t->keepalive_watchdog_fired_locked);
2830   t->keepalive_ping_started = true;
2831 }
2832 
finish_keepalive_ping(void * arg,grpc_error_handle error)2833 static void finish_keepalive_ping(void* arg, grpc_error_handle error) {
2834   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2835   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2836                                      finish_keepalive_ping_locked, t, nullptr),
2837                    GRPC_ERROR_REF(error));
2838 }
2839 
finish_keepalive_ping_locked(void * arg,grpc_error_handle error)2840 static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) {
2841   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2842   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2843     if (error == GRPC_ERROR_NONE) {
2844       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2845           GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2846         gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string.c_str());
2847       }
2848       if (!t->keepalive_ping_started) {
2849         // start_keepalive_ping_locked has not run yet. Reschedule
2850         // finish_keepalive_ping_locked for it to be run later.
2851         t->combiner->Run(
2852             GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2853                               finish_keepalive_ping_locked, t, nullptr),
2854             GRPC_ERROR_REF(error));
2855         return;
2856       }
2857       t->keepalive_ping_started = false;
2858       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2859       grpc_timer_cancel(&t->keepalive_watchdog_timer);
2860       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2861       GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2862                         grpc_schedule_on_exec_ctx);
2863       grpc_timer_init(&t->keepalive_ping_timer,
2864                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2865                       &t->init_keepalive_ping_locked);
2866     }
2867   }
2868   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2869 }
2870 
keepalive_watchdog_fired(void * arg,grpc_error_handle error)2871 static void keepalive_watchdog_fired(void* arg, grpc_error_handle error) {
2872   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2873   t->combiner->Run(
2874       GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2875                         keepalive_watchdog_fired_locked, t, nullptr),
2876       GRPC_ERROR_REF(error));
2877 }
2878 
keepalive_watchdog_fired_locked(void * arg,grpc_error_handle error)2879 static void keepalive_watchdog_fired_locked(void* arg,
2880                                             grpc_error_handle error) {
2881   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2882   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2883     if (error == GRPC_ERROR_NONE) {
2884       gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2885               t->peer_string.c_str());
2886       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2887       close_transport_locked(
2888           t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2889                                     "keepalive watchdog timeout"),
2890                                 GRPC_ERROR_INT_GRPC_STATUS,
2891                                 GRPC_STATUS_UNAVAILABLE));
2892     }
2893   } else {
2894     // The watchdog timer should have been cancelled by
2895     // finish_keepalive_ping_locked.
2896     if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2897       gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2898               t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2899     }
2900   }
2901   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2902 }
2903 
2904 //
2905 // CALLBACK LOOP
2906 //
2907 
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)2908 static void connectivity_state_set(grpc_chttp2_transport* t,
2909                                    grpc_connectivity_state state,
2910                                    const absl::Status& status,
2911                                    const char* reason) {
2912   GRPC_CHTTP2_IF_TRACING(
2913       gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2914   t->state_tracker.SetState(state, status, reason);
2915 }
2916 
2917 //
2918 // POLLSET STUFF
2919 //
2920 
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2921 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2922                         grpc_pollset* pollset) {
2923   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2924   grpc_endpoint_add_to_pollset(t->ep, pollset);
2925 }
2926 
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2927 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2928                             grpc_pollset_set* pollset_set) {
2929   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2930   grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2931 }
2932 
2933 //
2934 // BYTE STREAM
2935 //
2936 
reset_byte_stream(void * arg,grpc_error_handle error)2937 static void reset_byte_stream(void* arg, grpc_error_handle error) {
2938   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2939   s->pending_byte_stream = false;
2940   if (error == GRPC_ERROR_NONE) {
2941     grpc_chttp2_maybe_complete_recv_message(s->t, s);
2942     grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2943   } else {
2944     GPR_ASSERT(error != GRPC_ERROR_NONE);
2945     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
2946     s->on_next = nullptr;
2947     GRPC_ERROR_UNREF(s->byte_stream_error);
2948     s->byte_stream_error = GRPC_ERROR_NONE;
2949     grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2950     s->byte_stream_error = GRPC_ERROR_REF(error);
2951   }
2952 }
2953 
2954 namespace grpc_core {
2955 
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2956 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2957     grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2958     uint32_t frame_size, uint32_t flags)
2959     : ByteStream(frame_size, flags),
2960       transport_(transport),
2961       stream_(stream),
2962       refs_(2),
2963       remaining_bytes_(frame_size) {
2964   GRPC_ERROR_UNREF(stream->byte_stream_error);
2965   stream->byte_stream_error = GRPC_ERROR_NONE;
2966 }
2967 
OrphanLocked(void * arg,grpc_error_handle)2968 void Chttp2IncomingByteStream::OrphanLocked(
2969     void* arg, grpc_error_handle /*error_ignored*/) {
2970   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2971   grpc_chttp2_stream* s = bs->stream_;
2972   grpc_chttp2_transport* t = s->t;
2973   bs->Unref();
2974   s->pending_byte_stream = false;
2975   grpc_chttp2_maybe_complete_recv_message(t, s);
2976   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2977 }
2978 
Orphan()2979 void Chttp2IncomingByteStream::Orphan() {
2980   GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2981   transport_->combiner->Run(
2982       GRPC_CLOSURE_INIT(&destroy_action_,
2983                         &Chttp2IncomingByteStream::OrphanLocked, this, nullptr),
2984       GRPC_ERROR_NONE);
2985 }
2986 
NextLocked(void * arg,grpc_error_handle)2987 void Chttp2IncomingByteStream::NextLocked(void* arg,
2988                                           grpc_error_handle /*error_ignored*/) {
2989   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2990   grpc_chttp2_transport* t = bs->transport_;
2991   grpc_chttp2_stream* s = bs->stream_;
2992   size_t cur_length = s->frame_storage.length;
2993   if (!s->read_closed) {
2994     s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2995                                               cur_length);
2996     grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2997   }
2998   GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
2999   if (s->frame_storage.length > 0) {
3000     grpc_slice_buffer_swap(&s->frame_storage,
3001                            &s->unprocessed_incoming_frames_buffer);
3002     s->unprocessed_incoming_frames_decompressed = false;
3003     grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3004                             GRPC_ERROR_NONE);
3005   } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
3006     grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3007                             GRPC_ERROR_REF(s->byte_stream_error));
3008     if (s->data_parser.parsing_frame != nullptr) {
3009       s->data_parser.parsing_frame->Unref();
3010       s->data_parser.parsing_frame = nullptr;
3011     }
3012   } else if (s->read_closed) {
3013     if (bs->remaining_bytes_ != 0) {
3014       s->byte_stream_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3015           "Truncated message", &s->read_closed_error, 1);
3016       grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3017                               GRPC_ERROR_REF(s->byte_stream_error));
3018       if (s->data_parser.parsing_frame != nullptr) {
3019         s->data_parser.parsing_frame->Unref();
3020         s->data_parser.parsing_frame = nullptr;
3021       }
3022     } else {
3023       // Should never reach here.
3024       GPR_ASSERT(false);
3025     }
3026   } else {
3027     s->on_next = bs->next_action_.on_complete;
3028   }
3029   bs->Unref();
3030 }
3031 
Next(size_t max_size_hint,grpc_closure * on_complete)3032 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
3033                                     grpc_closure* on_complete) {
3034   GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
3035   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3036     return true;
3037   } else {
3038     Ref();
3039     next_action_.max_size_hint = max_size_hint;
3040     next_action_.on_complete = on_complete;
3041     transport_->combiner->Run(
3042         GRPC_CLOSURE_INIT(&next_action_.closure,
3043                           &Chttp2IncomingByteStream::NextLocked, this, nullptr),
3044         GRPC_ERROR_NONE);
3045     return false;
3046   }
3047 }
3048 
MaybeCreateStreamDecompressionCtx()3049 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3050   GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3051                    GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3052   if (!stream_->stream_decompression_ctx) {
3053     stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3054         stream_->stream_decompression_method);
3055   }
3056 }
3057 
Pull(grpc_slice * slice)3058 grpc_error_handle Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3059   GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3060   grpc_error_handle error;
3061   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3062     if (!stream_->unprocessed_incoming_frames_decompressed &&
3063         stream_->stream_decompression_method !=
3064             GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3065       bool end_of_context;
3066       MaybeCreateStreamDecompressionCtx();
3067       if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3068                                   &stream_->unprocessed_incoming_frames_buffer,
3069                                   &stream_->decompressed_data_buffer, nullptr,
3070                                   MAX_SIZE_T, &end_of_context)) {
3071         error =
3072             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
3073         return error;
3074       }
3075       GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3076       grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3077                              &stream_->decompressed_data_buffer);
3078       stream_->unprocessed_incoming_frames_decompressed = true;
3079       if (end_of_context) {
3080         grpc_stream_compression_context_destroy(
3081             stream_->stream_decompression_ctx);
3082         stream_->stream_decompression_ctx = nullptr;
3083       }
3084       if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3085         *slice = grpc_empty_slice();
3086       }
3087     }
3088     error = grpc_deframe_unprocessed_incoming_frames(
3089         &stream_->data_parser, stream_,
3090         &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3091     if (error != GRPC_ERROR_NONE) {
3092       return error;
3093     }
3094   } else {
3095     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3096     stream_->t->combiner->Run(&stream_->reset_byte_stream,
3097                               GRPC_ERROR_REF(error));
3098     return error;
3099   }
3100   return GRPC_ERROR_NONE;
3101 }
3102 
PublishError(grpc_error_handle error)3103 void Chttp2IncomingByteStream::PublishError(grpc_error_handle error) {
3104   GPR_ASSERT(error != GRPC_ERROR_NONE);
3105   grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
3106                           GRPC_ERROR_REF(error));
3107   stream_->on_next = nullptr;
3108   GRPC_ERROR_UNREF(stream_->byte_stream_error);
3109   stream_->byte_stream_error = GRPC_ERROR_REF(error);
3110   grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
3111 }
3112 
Push(const grpc_slice & slice,grpc_slice * slice_out)3113 grpc_error_handle Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3114                                                  grpc_slice* slice_out) {
3115   if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3116     grpc_error_handle error =
3117         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3118     transport_->combiner->Run(&stream_->reset_byte_stream,
3119                               GRPC_ERROR_REF(error));
3120     grpc_slice_unref_internal(slice);
3121     return error;
3122   } else {
3123     remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3124     if (slice_out != nullptr) {
3125       *slice_out = slice;
3126     }
3127     return GRPC_ERROR_NONE;
3128   }
3129 }
3130 
Finished(grpc_error_handle error,bool reset_on_error)3131 grpc_error_handle Chttp2IncomingByteStream::Finished(grpc_error_handle error,
3132                                                      bool reset_on_error) {
3133   if (error == GRPC_ERROR_NONE) {
3134     if (remaining_bytes_ != 0) {
3135       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3136     }
3137   }
3138   if (error != GRPC_ERROR_NONE && reset_on_error) {
3139     transport_->combiner->Run(&stream_->reset_byte_stream,
3140                               GRPC_ERROR_REF(error));
3141   }
3142   Unref();
3143   return error;
3144 }
3145 
Shutdown(grpc_error_handle error)3146 void Chttp2IncomingByteStream::Shutdown(grpc_error_handle error) {
3147   GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3148 }
3149 
3150 }  // namespace grpc_core
3151 
3152 //
3153 // RESOURCE QUOTAS
3154 //
3155 
post_benign_reclaimer(grpc_chttp2_transport * t)3156 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3157   if (!t->benign_reclaimer_registered) {
3158     t->benign_reclaimer_registered = true;
3159     GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3160     GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t,
3161                       grpc_schedule_on_exec_ctx);
3162     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3163                                       false, &t->benign_reclaimer_locked);
3164   }
3165 }
3166 
post_destructive_reclaimer(grpc_chttp2_transport * t)3167 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3168   if (!t->destructive_reclaimer_registered) {
3169     t->destructive_reclaimer_registered = true;
3170     GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3171     GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer,
3172                       t, grpc_schedule_on_exec_ctx);
3173     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3174                                       true, &t->destructive_reclaimer_locked);
3175   }
3176 }
3177 
benign_reclaimer(void * arg,grpc_error_handle error)3178 static void benign_reclaimer(void* arg, grpc_error_handle error) {
3179   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3180   t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3181                                      benign_reclaimer_locked, t, nullptr),
3182                    GRPC_ERROR_REF(error));
3183 }
3184 
benign_reclaimer_locked(void * arg,grpc_error_handle error)3185 static void benign_reclaimer_locked(void* arg, grpc_error_handle error) {
3186   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3187   if (error == GRPC_ERROR_NONE &&
3188       grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3189     // Channel with no active streams: send a goaway to try and make it
3190     // disconnect cleanly
3191     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3192       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3193               t->peer_string.c_str());
3194     }
3195     send_goaway(t,
3196                 grpc_error_set_int(
3197                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3198                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3199   } else if (error == GRPC_ERROR_NONE &&
3200              GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3201     gpr_log(GPR_INFO,
3202             "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3203             " streams",
3204             t->peer_string.c_str(),
3205             grpc_chttp2_stream_map_size(&t->stream_map));
3206   }
3207   t->benign_reclaimer_registered = false;
3208   if (error != GRPC_ERROR_CANCELLED) {
3209     grpc_resource_user_finish_reclamation(
3210         grpc_endpoint_get_resource_user(t->ep));
3211   }
3212   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3213 }
3214 
destructive_reclaimer(void * arg,grpc_error_handle error)3215 static void destructive_reclaimer(void* arg, grpc_error_handle error) {
3216   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3217   t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3218                                      destructive_reclaimer_locked, t, nullptr),
3219                    GRPC_ERROR_REF(error));
3220 }
3221 
destructive_reclaimer_locked(void * arg,grpc_error_handle error)3222 static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) {
3223   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3224   size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3225   t->destructive_reclaimer_registered = false;
3226   if (error == GRPC_ERROR_NONE && n > 0) {
3227     grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3228         grpc_chttp2_stream_map_rand(&t->stream_map));
3229     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3230       gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3231               t->peer_string.c_str(), s->id);
3232     }
3233     grpc_chttp2_cancel_stream(
3234         t, s,
3235         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3236                            GRPC_ERROR_INT_HTTP2_ERROR,
3237                            GRPC_HTTP2_ENHANCE_YOUR_CALM));
3238     if (n > 1) {
3239       // Since we cancel one stream per destructive reclamation, if
3240       //   there are more streams left, we can immediately post a new
3241       //   reclaimer in case the resource quota needs to free more
3242       //   memory
3243       post_destructive_reclaimer(t);
3244     }
3245   }
3246   if (error != GRPC_ERROR_CANCELLED) {
3247     grpc_resource_user_finish_reclamation(
3248         grpc_endpoint_get_resource_user(t->ep));
3249   }
3250   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3251 }
3252 
3253 //
3254 // MONITORING
3255 //
3256 
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3257 const char* grpc_chttp2_initiate_write_reason_string(
3258     grpc_chttp2_initiate_write_reason reason) {
3259   switch (reason) {
3260     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3261       return "INITIAL_WRITE";
3262     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3263       return "START_NEW_STREAM";
3264     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3265       return "SEND_MESSAGE";
3266     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3267       return "SEND_INITIAL_METADATA";
3268     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3269       return "SEND_TRAILING_METADATA";
3270     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3271       return "RETRY_SEND_PING";
3272     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3273       return "CONTINUE_PINGS";
3274     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3275       return "GOAWAY_SENT";
3276     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3277       return "RST_STREAM";
3278     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3279       return "CLOSE_FROM_API";
3280     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3281       return "STREAM_FLOW_CONTROL";
3282     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3283       return "TRANSPORT_FLOW_CONTROL";
3284     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3285       return "SEND_SETTINGS";
3286     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3287       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3288     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3289       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3290     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3291       return "APPLICATION_PING";
3292     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3293       return "BDP_PING";
3294     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3295       return "KEEPALIVE_PING";
3296     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3297       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3298     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3299       return "PING_RESPONSE";
3300     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3301       return "FORCE_RST_STREAM";
3302   }
3303   GPR_UNREACHABLE_CODE(return "unknown");
3304 }
3305 
chttp2_get_endpoint(grpc_transport * t)3306 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3307   return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3308 }
3309 
3310 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3311                                              "chttp2",
3312                                              init_stream,
3313                                              set_pollset,
3314                                              set_pollset_set,
3315                                              perform_stream_op,
3316                                              perform_transport_op,
3317                                              destroy_stream,
3318                                              destroy_transport,
3319                                              chttp2_get_endpoint};
3320 
get_vtable(void)3321 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3322 
3323 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3324 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3325   grpc_chttp2_transport* t =
3326       reinterpret_cast<grpc_chttp2_transport*>(transport);
3327   return t->channelz_socket;
3328 }
3329 
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)3330 grpc_transport* grpc_create_chttp2_transport(
3331     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3332     grpc_resource_user* resource_user) {
3333   auto t =
3334       new grpc_chttp2_transport(channel_args, ep, is_client, resource_user);
3335   return &t->base;
3336 }
3337 
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings,grpc_closure * notify_on_close)3338 void grpc_chttp2_transport_start_reading(
3339     grpc_transport* transport, grpc_slice_buffer* read_buffer,
3340     grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
3341   grpc_chttp2_transport* t =
3342       reinterpret_cast<grpc_chttp2_transport*>(transport);
3343   GRPC_CHTTP2_REF_TRANSPORT(
3344       t, "reading_action"); /* matches unref inside reading_action */
3345   if (read_buffer != nullptr) {
3346     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3347     gpr_free(read_buffer);
3348   }
3349   t->notify_on_receive_settings = notify_on_receive_settings;
3350   t->notify_on_close = notify_on_close;
3351   t->combiner->Run(
3352       GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
3353       GRPC_ERROR_NONE);
3354 }
3355