• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20 
21 #include "absl/strings/str_format.h"
22 
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/string_util.h>
28 #include <inttypes.h>
29 #include <limits.h>
30 #include <math.h>
31 #include <stdio.h>
32 #include <string.h>
33 
34 #include "src/core/ext/transport/chttp2/transport/context_list.h"
35 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
36 #include "src/core/ext/transport/chttp2/transport/internal.h"
37 #include "src/core/ext/transport/chttp2/transport/varint.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/compression/stream_compression.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/env.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/memory.h"
44 #include "src/core/lib/http/parser.h"
45 #include "src/core/lib/iomgr/executor.h"
46 #include "src/core/lib/iomgr/iomgr.h"
47 #include "src/core/lib/iomgr/timer.h"
48 #include "src/core/lib/profiling/timers.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/slice/slice_string_helpers.h"
51 #include "src/core/lib/transport/error_utils.h"
52 #include "src/core/lib/transport/http2_errors.h"
53 #include "src/core/lib/transport/static_metadata.h"
54 #include "src/core/lib/transport/status_conversion.h"
55 #include "src/core/lib/transport/timeout_encoding.h"
56 #include "src/core/lib/transport/transport.h"
57 #include "src/core/lib/transport/transport_impl.h"
58 #include "src/core/lib/uri/uri_parser.h"
59 
60 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
61 #define MAX_WINDOW 0x7fffffffu
62 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
63 #define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
64 
65 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
66 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
67 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000  /* 2 hours */
68 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
69 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
70 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
71 
72 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
73 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
74 #define DEFAULT_MAX_PING_STRIKES 2
75 
76 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
77 
78 static int g_default_client_keepalive_time_ms =
79     DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
80 static int g_default_client_keepalive_timeout_ms =
81     DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
82 static int g_default_server_keepalive_time_ms =
83     DEFAULT_SERVER_KEEPALIVE_TIME_MS;
84 static int g_default_server_keepalive_timeout_ms =
85     DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
86 static bool g_default_client_keepalive_permit_without_calls =
87     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
88 static bool g_default_server_keepalive_permit_without_calls =
89     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
90 
91 static int g_default_min_recv_ping_interval_without_data_ms =
92     DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
93 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
94 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
95 
96 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
97 grpc_core::TraceFlag grpc_http_trace(false, "http");
98 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
99 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
100                                                          "chttp2_refcount");
101 
102 // forward declarations of various callbacks that we'll build closures around
103 static void write_action_begin_locked(void* t, grpc_error* error);
104 static void write_action(void* t, grpc_error* error);
105 static void write_action_end(void* t, grpc_error* error);
106 static void write_action_end_locked(void* t, grpc_error* error);
107 
108 static void read_action(void* t, grpc_error* error);
109 static void read_action_locked(void* t, grpc_error* error);
110 static void continue_read_action_locked(grpc_chttp2_transport* t);
111 
112 static void complete_fetch(void* gs, grpc_error* error);
113 static void complete_fetch_locked(void* gs, grpc_error* error);
114 // Set a transport level setting, and push it to our peer
115 static void queue_setting_update(grpc_chttp2_transport* t,
116                                  grpc_chttp2_setting_id id, uint32_t value);
117 
118 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
119                            grpc_error* error);
120 
121 // Start new streams that have been created if we can
122 static void maybe_start_some_streams(grpc_chttp2_transport* t);
123 
124 static void connectivity_state_set(grpc_chttp2_transport* t,
125                                    grpc_connectivity_state state,
126                                    const absl::Status& status,
127                                    const char* reason);
128 
129 static void benign_reclaimer(void* arg, grpc_error* error);
130 static void destructive_reclaimer(void* arg, grpc_error* error);
131 static void benign_reclaimer_locked(void* arg, grpc_error* error);
132 static void destructive_reclaimer_locked(void* arg, grpc_error* error);
133 
134 static void post_benign_reclaimer(grpc_chttp2_transport* t);
135 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
136 
137 static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
138 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
139 
140 static void start_bdp_ping(void* tp, grpc_error* error);
141 static void finish_bdp_ping(void* tp, grpc_error* error);
142 static void start_bdp_ping_locked(void* tp, grpc_error* error);
143 static void finish_bdp_ping_locked(void* tp, grpc_error* error);
144 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error);
145 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
146 
147 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
148 static void send_ping_locked(grpc_chttp2_transport* t,
149                              grpc_closure* on_initiate, grpc_closure* on_ack);
150 static void retry_initiate_ping_locked(void* tp, grpc_error* error);
151 
152 // keepalive-relevant functions
153 static void init_keepalive_ping(void* arg, grpc_error* error);
154 static void init_keepalive_ping_locked(void* arg, grpc_error* error);
155 static void start_keepalive_ping(void* arg, grpc_error* error);
156 static void finish_keepalive_ping(void* arg, grpc_error* error);
157 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
158 static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
159 static void keepalive_watchdog_fired(void* arg, grpc_error* error);
160 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
161 
162 static void reset_byte_stream(void* arg, grpc_error* error);
163 
164 // Flow control default enabled. Can be disabled by setting
165 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
166 bool g_flow_control_enabled = true;
167 
168 //
169 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
170 //
171 
~grpc_chttp2_transport()172 grpc_chttp2_transport::~grpc_chttp2_transport() {
173   size_t i;
174 
175   if (channelz_socket != nullptr) {
176     channelz_socket.reset();
177   }
178 
179   grpc_endpoint_destroy(ep);
180 
181   grpc_slice_buffer_destroy_internal(&qbuf);
182 
183   grpc_slice_buffer_destroy_internal(&outbuf);
184   grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
185 
186   grpc_error* error =
187       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
188   // ContextList::Execute follows semantics of a callback function and does not
189   // take a ref on error
190   grpc_core::ContextList::Execute(cl, nullptr, error);
191   GRPC_ERROR_UNREF(error);
192   cl = nullptr;
193 
194   grpc_slice_buffer_destroy_internal(&read_buffer);
195   grpc_chttp2_hpack_parser_destroy(&hpack_parser);
196   grpc_chttp2_goaway_parser_destroy(&goaway_parser);
197 
198   for (i = 0; i < STREAM_LIST_COUNT; i++) {
199     GPR_ASSERT(lists[i].head == nullptr);
200     GPR_ASSERT(lists[i].tail == nullptr);
201   }
202 
203   GRPC_ERROR_UNREF(goaway_error);
204 
205   GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
206 
207   grpc_chttp2_stream_map_destroy(&stream_map);
208 
209   GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
210 
211   cancel_pings(this,
212                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
213 
214   while (write_cb_pool) {
215     grpc_chttp2_write_cb* next = write_cb_pool->next;
216     gpr_free(write_cb_pool);
217     write_cb_pool = next;
218   }
219 
220   flow_control.Destroy();
221 
222   GRPC_ERROR_UNREF(closed_with_error);
223   gpr_free(ping_acks);
224 }
225 
226 static const grpc_transport_vtable* get_vtable(void);
227 
228 // Returns whether bdp is enabled
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)229 static bool read_channel_args(grpc_chttp2_transport* t,
230                               const grpc_channel_args* channel_args,
231                               bool is_client) {
232   bool enable_bdp = true;
233   bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
234   size_t i;
235   int j;
236 
237   for (i = 0; i < channel_args->num_args; i++) {
238     if (0 == strcmp(channel_args->args[i].key,
239                     GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
240       const grpc_integer_options options = {-1, 0, INT_MAX};
241       const int value =
242           grpc_channel_arg_get_integer(&channel_args->args[i], options);
243       if (value >= 0) {
244         if ((t->next_stream_id & 1) != (value & 1)) {
245           gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
246                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
247                   is_client ? "client" : "server");
248         } else {
249           t->next_stream_id = static_cast<uint32_t>(value);
250         }
251       }
252     } else if (0 == strcmp(channel_args->args[i].key,
253                            GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
254       const grpc_integer_options options = {-1, 0, INT_MAX};
255       const int value =
256           grpc_channel_arg_get_integer(&channel_args->args[i], options);
257       if (value >= 0) {
258         grpc_chttp2_hpack_compressor_set_max_usable_size(
259             &t->hpack_compressor, static_cast<uint32_t>(value));
260       }
261     } else if (0 == strcmp(channel_args->args[i].key,
262                            GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
263       t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
264           &channel_args->args[i],
265           {g_default_max_pings_without_data, 0, INT_MAX});
266     } else if (0 == strcmp(channel_args->args[i].key,
267                            GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
268       t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
269           &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
270     } else if (0 ==
271                strcmp(channel_args->args[i].key,
272                       GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
273       t->ping_policy.min_recv_ping_interval_without_data =
274           grpc_channel_arg_get_integer(
275               &channel_args->args[i],
276               grpc_integer_options{
277                   g_default_min_recv_ping_interval_without_data_ms, 0,
278                   INT_MAX});
279     } else if (0 == strcmp(channel_args->args[i].key,
280                            GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
281       t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
282           &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
283     } else if (0 ==
284                strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
285       enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
286     } else if (0 ==
287                strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
288       const int value = grpc_channel_arg_get_integer(
289           &channel_args->args[i],
290           grpc_integer_options{t->is_client
291                                    ? g_default_client_keepalive_time_ms
292                                    : g_default_server_keepalive_time_ms,
293                                1, INT_MAX});
294       t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
295     } else if (0 == strcmp(channel_args->args[i].key,
296                            GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
297       const int value = grpc_channel_arg_get_integer(
298           &channel_args->args[i],
299           grpc_integer_options{t->is_client
300                                    ? g_default_client_keepalive_timeout_ms
301                                    : g_default_server_keepalive_timeout_ms,
302                                0, INT_MAX});
303       t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
304     } else if (0 == strcmp(channel_args->args[i].key,
305                            GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
306       t->keepalive_permit_without_calls = static_cast<uint32_t>(
307           grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
308     } else if (0 == strcmp(channel_args->args[i].key,
309                            GRPC_ARG_OPTIMIZATION_TARGET)) {
310       gpr_log(GPR_INFO, "GRPC_ARG_OPTIMIZATION_TARGET is deprecated");
311     } else if (0 ==
312                strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
313       channelz_enabled = grpc_channel_arg_get_bool(
314           &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
315     } else {
316       static const struct {
317         const char* channel_arg_name;
318         grpc_chttp2_setting_id setting_id;
319         grpc_integer_options integer_options;
320         bool availability[2] /* server, client */;
321       } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
322                            GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
323                            {-1, 0, INT32_MAX},
324                            {true, false}},
325                           {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
326                            GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
327                            {-1, 0, INT32_MAX},
328                            {true, true}},
329                           {GRPC_ARG_MAX_METADATA_SIZE,
330                            GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
331                            {-1, 0, INT32_MAX},
332                            {true, true}},
333                           {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
334                            GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
335                            {-1, 16384, 16777215},
336                            {true, true}},
337                           {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
338                            GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
339                            {1, 0, 1},
340                            {true, true}},
341                           {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
342                            GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
343                            {-1, 5, INT32_MAX},
344                            {true, true}}};
345       for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
346         if (0 == strcmp(channel_args->args[i].key,
347                         settings_map[j].channel_arg_name)) {
348           if (!settings_map[j].availability[is_client]) {
349             gpr_log(GPR_DEBUG, "%s is not available on %s",
350                     settings_map[j].channel_arg_name,
351                     is_client ? "clients" : "servers");
352           } else {
353             int value = grpc_channel_arg_get_integer(
354                 &channel_args->args[i], settings_map[j].integer_options);
355             if (value >= 0) {
356               queue_setting_update(t, settings_map[j].setting_id,
357                                    static_cast<uint32_t>(value));
358             }
359           }
360           break;
361         }
362       }
363     }
364   }
365   if (channelz_enabled) {
366     t->channelz_socket =
367         grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
368             std::string(grpc_endpoint_get_local_address(t->ep)), t->peer_string,
369             absl::StrFormat("%s %s", get_vtable()->name, t->peer_string));
370   }
371   return enable_bdp;
372 }
373 
init_transport_keepalive_settings(grpc_chttp2_transport * t)374 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
375   if (t->is_client) {
376     t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
377                             ? GRPC_MILLIS_INF_FUTURE
378                             : g_default_client_keepalive_time_ms;
379     t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
380                                ? GRPC_MILLIS_INF_FUTURE
381                                : g_default_client_keepalive_timeout_ms;
382     t->keepalive_permit_without_calls =
383         g_default_client_keepalive_permit_without_calls;
384   } else {
385     t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
386                             ? GRPC_MILLIS_INF_FUTURE
387                             : g_default_server_keepalive_time_ms;
388     t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
389                                ? GRPC_MILLIS_INF_FUTURE
390                                : g_default_server_keepalive_timeout_ms;
391     t->keepalive_permit_without_calls =
392         g_default_server_keepalive_permit_without_calls;
393   }
394 }
395 
configure_transport_ping_policy(grpc_chttp2_transport * t)396 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
397   t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
398   t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
399   t->ping_policy.min_recv_ping_interval_without_data =
400       g_default_min_recv_ping_interval_without_data_ms;
401 }
402 
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)403 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
404   if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
405     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
406     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
407     GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
408                       grpc_schedule_on_exec_ctx);
409     grpc_timer_init(&t->keepalive_ping_timer,
410                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
411                     &t->init_keepalive_ping_locked);
412   } else {
413     // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
414     //   inflight keeaplive timers
415     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
416   }
417 }
418 
grpc_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)419 grpc_chttp2_transport::grpc_chttp2_transport(
420     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
421     grpc_resource_user* resource_user)
422     : refs(1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
423                   ? "chttp2_refcount"
424                   : nullptr),
425       ep(ep),
426       peer_string(grpc_endpoint_get_peer(ep)),
427       resource_user(resource_user),
428       combiner(grpc_combiner_create()),
429       state_tracker(is_client ? "client_transport" : "server_transport",
430                     GRPC_CHANNEL_READY),
431       is_client(is_client),
432       next_stream_id(is_client ? 1 : 2),
433       deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
434   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
435              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
436   base.vtable = get_vtable();
437   // 8 is a random stab in the dark as to a good initial size: it's small enough
438   //   that it shouldn't waste memory for infrequently used connections, yet
439   //   large enough that the exponential growth should happen nicely when it's
440   //   needed.
441   //   TODO(ctiller): tune this
442   grpc_chttp2_stream_map_init(&stream_map, 8);
443 
444   grpc_slice_buffer_init(&read_buffer);
445   grpc_slice_buffer_init(&outbuf);
446   if (is_client) {
447     grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
448                                        GRPC_CHTTP2_CLIENT_CONNECT_STRING));
449   }
450   grpc_chttp2_hpack_compressor_init(&hpack_compressor);
451   grpc_slice_buffer_init(&qbuf);
452   // copy in initial settings to all setting sets
453   size_t i;
454   int j;
455   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
456     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
457       settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
458     }
459   }
460   grpc_chttp2_hpack_parser_init(&hpack_parser);
461   grpc_chttp2_goaway_parser_init(&goaway_parser);
462 
463   // configure http2 the way we like it
464   if (is_client) {
465     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
466     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
467   }
468   queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
469                        DEFAULT_MAX_HEADER_LIST_SIZE);
470   queue_setting_update(this,
471                        GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
472 
473   configure_transport_ping_policy(this);
474   init_transport_keepalive_settings(this);
475 
476   bool enable_bdp = true;
477   if (channel_args) {
478     enable_bdp = read_channel_args(this, channel_args, is_client);
479   }
480 
481   if (g_flow_control_enabled) {
482     flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
483                                                                enable_bdp);
484   } else {
485     flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
486     enable_bdp = false;
487   }
488 
489   // No pings allowed before receiving a header or data frame.
490   ping_state.pings_before_data_required = 0;
491   ping_state.is_delayed_ping_timer_set = false;
492   ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
493 
494   ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
495   ping_recv_state.ping_strikes = 0;
496 
497   init_keepalive_pings_if_enabled(this);
498 
499   if (enable_bdp) {
500     bdp_ping_blocked = true;
501     grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
502                                       nullptr);
503   }
504 
505   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
506   post_benign_reclaimer(this);
507 }
508 
destroy_transport_locked(void * tp,grpc_error *)509 static void destroy_transport_locked(void* tp, grpc_error* /*error*/) {
510   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
511   t->destroying = 1;
512   close_transport_locked(
513       t, grpc_error_set_int(
514              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
515              GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
516   // Must be the last line.
517   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
518 }
519 
destroy_transport(grpc_transport * gt)520 static void destroy_transport(grpc_transport* gt) {
521   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
522   t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
523                    GRPC_ERROR_NONE);
524 }
525 
close_transport_locked(grpc_chttp2_transport * t,grpc_error * error)526 static void close_transport_locked(grpc_chttp2_transport* t,
527                                    grpc_error* error) {
528   end_all_the_calls(t, GRPC_ERROR_REF(error));
529   cancel_pings(t, GRPC_ERROR_REF(error));
530   if (t->closed_with_error == GRPC_ERROR_NONE) {
531     if (!grpc_error_has_clear_grpc_status(error)) {
532       error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
533                                  GRPC_STATUS_UNAVAILABLE);
534     }
535     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
536       if (t->close_transport_on_writes_finished == nullptr) {
537         t->close_transport_on_writes_finished =
538             GRPC_ERROR_CREATE_FROM_STATIC_STRING(
539                 "Delayed close due to in-progress write");
540       }
541       t->close_transport_on_writes_finished =
542           grpc_error_add_child(t->close_transport_on_writes_finished, error);
543       return;
544     }
545     GPR_ASSERT(error != GRPC_ERROR_NONE);
546     t->closed_with_error = GRPC_ERROR_REF(error);
547     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
548                            "close_transport");
549     if (t->ping_state.is_delayed_ping_timer_set) {
550       grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
551     }
552     if (t->have_next_bdp_ping_timer) {
553       grpc_timer_cancel(&t->next_bdp_ping_timer);
554     }
555     switch (t->keepalive_state) {
556       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
557         grpc_timer_cancel(&t->keepalive_ping_timer);
558         break;
559       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
560         grpc_timer_cancel(&t->keepalive_ping_timer);
561         grpc_timer_cancel(&t->keepalive_watchdog_timer);
562         break;
563       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
564       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
565         // keepalive timers are not set in these two states
566         break;
567     }
568 
569     // flush writable stream list to avoid dangling references
570     grpc_chttp2_stream* s;
571     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
572       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
573     }
574     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
575     grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
576   }
577   if (t->notify_on_receive_settings != nullptr) {
578     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
579                             GRPC_ERROR_REF(error));
580     t->notify_on_receive_settings = nullptr;
581   }
582   GRPC_ERROR_UNREF(error);
583 }
584 
585 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)586 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
587   grpc_stream_ref(s->refcount, reason);
588 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)589 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
590   grpc_stream_unref(s->refcount, reason);
591 }
592 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)593 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
594   grpc_stream_ref(s->refcount);
595 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)596 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
597   grpc_stream_unref(s->refcount);
598 }
599 #endif
600 
Reffer(grpc_chttp2_stream * s)601 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
602   // We reserve one 'active stream' that's dropped when the stream is
603   //   read-closed. The others are for Chttp2IncomingByteStreams that are
604   //   actively reading
605   GRPC_CHTTP2_STREAM_REF(s, "chttp2");
606   GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
607 }
608 
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)609 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
610                                        grpc_stream_refcount* refcount,
611                                        const void* server_data,
612                                        grpc_core::Arena* arena)
613     : t(t),
614       refcount(refcount),
615       reffer(this),
616       metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
617                       grpc_chttp2_incoming_metadata_buffer(arena)} {
618   if (server_data) {
619     id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
620     *t->accepting_stream = this;
621     grpc_chttp2_stream_map_add(&t->stream_map, id, this);
622     post_destructive_reclaimer(t);
623   }
624   if (t->flow_control->flow_control_enabled()) {
625     flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
626         static_cast<grpc_core::chttp2::TransportFlowControl*>(
627             t->flow_control.get()),
628         this);
629   } else {
630     flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
631   }
632 
633   grpc_slice_buffer_init(&frame_storage);
634   grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
635   grpc_slice_buffer_init(&flow_controlled_buffer);
636   GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr);
637 }
638 
~grpc_chttp2_stream()639 grpc_chttp2_stream::~grpc_chttp2_stream() {
640   if (t->channelz_socket != nullptr) {
641     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
642       t->channelz_socket->RecordStreamSucceeded();
643     } else {
644       t->channelz_socket->RecordStreamFailed();
645     }
646   }
647 
648   GPR_ASSERT((write_closed && read_closed) || id == 0);
649   if (id != 0) {
650     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
651   }
652 
653   grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
654   grpc_slice_buffer_destroy_internal(&frame_storage);
655   if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
656     grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
657   }
658   if (stream_decompression_method !=
659       GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
660     grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
661   }
662 
663   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
664     if (GPR_UNLIKELY(included[i])) {
665       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
666               t->is_client ? "client" : "server", id, i);
667       abort();
668     }
669   }
670 
671   GPR_ASSERT(send_initial_metadata_finished == nullptr);
672   GPR_ASSERT(fetching_send_message == nullptr);
673   GPR_ASSERT(send_trailing_metadata_finished == nullptr);
674   GPR_ASSERT(recv_initial_metadata_ready == nullptr);
675   GPR_ASSERT(recv_message_ready == nullptr);
676   GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
677   grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
678   GRPC_ERROR_UNREF(read_closed_error);
679   GRPC_ERROR_UNREF(write_closed_error);
680   GRPC_ERROR_UNREF(byte_stream_error);
681 
682   flow_control.Destroy();
683 
684   if (t->resource_user != nullptr) {
685     grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
686   }
687 
688   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
689   grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
690 }
691 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)692 static int init_stream(grpc_transport* gt, grpc_stream* gs,
693                        grpc_stream_refcount* refcount, const void* server_data,
694                        grpc_core::Arena* arena) {
695   GPR_TIMER_SCOPE("init_stream", 0);
696   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
697   new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
698   return 0;
699 }
700 
destroy_stream_locked(void * sp,grpc_error *)701 static void destroy_stream_locked(void* sp, grpc_error* /*error*/) {
702   GPR_TIMER_SCOPE("destroy_stream", 0);
703   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
704   s->~grpc_chttp2_stream();
705 }
706 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)707 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
708                            grpc_closure* then_schedule_closure) {
709   GPR_TIMER_SCOPE("destroy_stream", 0);
710   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
711   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
712   if (s->stream_compression_method !=
713           GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
714       s->stream_compression_ctx != nullptr) {
715     grpc_stream_compression_context_destroy(s->stream_compression_ctx);
716     s->stream_compression_ctx = nullptr;
717   }
718   if (s->stream_decompression_method !=
719           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
720       s->stream_decompression_ctx != nullptr) {
721     grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
722     s->stream_decompression_ctx = nullptr;
723   }
724 
725   s->destroy_stream_arg = then_schedule_closure;
726   t->combiner->Run(
727       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
728       GRPC_ERROR_NONE);
729 }
730 
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)731 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
732                                                       uint32_t id) {
733   if (t->accept_stream_cb == nullptr) {
734     return nullptr;
735   }
736   // Don't accept the stream if memory quota doesn't allow. Note that we should
737   // simply refuse the stream here instead of canceling the stream after it's
738   // accepted since the latter will create the call which costs much memory.
739   if (t->resource_user != nullptr &&
740       !grpc_resource_user_safe_alloc(t->resource_user,
741                                      GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
742     gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
743     grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
744                                              nullptr);
745     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
746     return nullptr;
747   }
748   grpc_chttp2_stream* accepting = nullptr;
749   GPR_ASSERT(t->accepting_stream == nullptr);
750   t->accepting_stream = &accepting;
751   t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
752                       reinterpret_cast<void*>(id));
753   t->accepting_stream = nullptr;
754   return accepting;
755 }
756 
757 //
758 // OUTPUT PROCESSING
759 //
760 
write_state_name(grpc_chttp2_write_state st)761 static const char* write_state_name(grpc_chttp2_write_state st) {
762   switch (st) {
763     case GRPC_CHTTP2_WRITE_STATE_IDLE:
764       return "IDLE";
765     case GRPC_CHTTP2_WRITE_STATE_WRITING:
766       return "WRITING";
767     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
768       return "WRITING+MORE";
769   }
770   GPR_UNREACHABLE_CODE(return "UNKNOWN");
771 }
772 
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)773 static void set_write_state(grpc_chttp2_transport* t,
774                             grpc_chttp2_write_state st, const char* reason) {
775   GRPC_CHTTP2_IF_TRACING(
776       gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
777               t->is_client ? "CLIENT" : "SERVER", t->peer_string.c_str(),
778               write_state_name(t->write_state), write_state_name(st), reason));
779   t->write_state = st;
780   // If the state is being reset back to idle, it means a write was just
781   // finished. Make sure all the run_after_write closures are scheduled.
782   //
783   // This is also our chance to close the transport if the transport was marked
784   // to be closed after all writes finish (for example, if we received a go-away
785   // from peer while we had some pending writes)
786   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
787     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
788     if (t->close_transport_on_writes_finished != nullptr) {
789       grpc_error* err = t->close_transport_on_writes_finished;
790       t->close_transport_on_writes_finished = nullptr;
791       close_transport_locked(t, err);
792     }
793   }
794 }
795 
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)796 static void inc_initiate_write_reason(
797     grpc_chttp2_initiate_write_reason reason) {
798   switch (reason) {
799     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
800       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
801       break;
802     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
803       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
804       break;
805     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
806       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
807       break;
808     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
809       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
810       break;
811     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
812       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
813       break;
814     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
815       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
816       break;
817     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
818       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
819       break;
820     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
821       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
822       break;
823     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
824       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
825       break;
826     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
827       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
828       break;
829     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
830       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
831       break;
832     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
833       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
834       break;
835     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
836       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
837       break;
838     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
839       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
840       break;
841     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
842       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
843       break;
844     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
845       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
846       break;
847     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
848       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING();
849       break;
850     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
851       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
852       break;
853     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
854       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
855       break;
856     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
857       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
858       break;
859     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
860       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
861       break;
862   }
863 }
864 
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)865 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
866                                 grpc_chttp2_initiate_write_reason reason) {
867   GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
868 
869   switch (t->write_state) {
870     case GRPC_CHTTP2_WRITE_STATE_IDLE:
871       inc_initiate_write_reason(reason);
872       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
873                       grpc_chttp2_initiate_write_reason_string(reason));
874       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
875       // Note that the 'write_action_begin_locked' closure is being scheduled
876       // on the 'finally_scheduler' of t->combiner. This means that
877       // 'write_action_begin_locked' is called only *after* all the other
878       // closures (some of which are potentially initiating more writes on the
879       // transport) are executed on the t->combiner.
880       //
881       // The reason for scheduling on finally_scheduler is to make sure we batch
882       // as many writes as possible. 'write_action_begin_locked' is the function
883       // that gathers all the relevant bytes (which are at various places in the
884       // grpc_chttp2_transport structure) and append them to 'outbuf' field in
885       // grpc_chttp2_transport thereby batching what would have been potentially
886       // multiple write operations.
887       //
888       // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
889       // It does not call the endpoint to write the bytes. That is done by the
890       // 'write_action' (which is scheduled by 'write_action_begin_locked')
891       t->combiner->FinallyRun(
892           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
893                             write_action_begin_locked, t, nullptr),
894           GRPC_ERROR_NONE);
895       break;
896     case GRPC_CHTTP2_WRITE_STATE_WRITING:
897       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
898                       grpc_chttp2_initiate_write_reason_string(reason));
899       break;
900     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
901       break;
902   }
903 }
904 
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)905 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
906                                       grpc_chttp2_stream* s) {
907   if (t->closed_with_error == GRPC_ERROR_NONE &&
908       grpc_chttp2_list_add_writable_stream(t, s)) {
909     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
910   }
911 }
912 
begin_writing_desc(bool partial)913 static const char* begin_writing_desc(bool partial) {
914   if (partial) {
915     return "begin partial write in background";
916   } else {
917     return "begin write in current thread";
918   }
919 }
920 
write_action_begin_locked(void * gt,grpc_error *)921 static void write_action_begin_locked(void* gt, grpc_error* /*error_ignored*/) {
922   GPR_TIMER_SCOPE("write_action_begin_locked", 0);
923   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
924   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
925   grpc_chttp2_begin_write_result r;
926   if (t->closed_with_error != GRPC_ERROR_NONE) {
927     r.writing = false;
928   } else {
929     r = grpc_chttp2_begin_write(t);
930   }
931   if (r.writing) {
932     if (r.partial) {
933       GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
934     }
935     set_write_state(t,
936                     r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
937                               : GRPC_CHTTP2_WRITE_STATE_WRITING,
938                     begin_writing_desc(r.partial));
939     write_action(t, GRPC_ERROR_NONE);
940     if (t->reading_paused_on_pending_induced_frames) {
941       GPR_ASSERT(t->num_pending_induced_frames == 0);
942       // We had paused reading, because we had many induced frames (SETTINGS
943       // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
944       // been able to flush qbuf, we can resume reading.
945       GRPC_CHTTP2_IF_TRACING(gpr_log(
946           GPR_INFO,
947           "transport %p : Resuming reading after being paused due to too "
948           "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
949           t));
950       t->reading_paused_on_pending_induced_frames = false;
951       continue_read_action_locked(t);
952     }
953   } else {
954     GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
955     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
956     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
957   }
958 }
959 
write_action(void * gt,grpc_error *)960 static void write_action(void* gt, grpc_error* /*error*/) {
961   GPR_TIMER_SCOPE("write_action", 0);
962   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
963   void* cl = t->cl;
964   t->cl = nullptr;
965   grpc_endpoint_write(
966       t->ep, &t->outbuf,
967       GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
968                         grpc_schedule_on_exec_ctx),
969       cl);
970 }
971 
write_action_end(void * tp,grpc_error * error)972 static void write_action_end(void* tp, grpc_error* error) {
973   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
974   t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
975                                      write_action_end_locked, t, nullptr),
976                    GRPC_ERROR_REF(error));
977 }
978 
979 // Callback from the grpc_endpoint after bytes have been written by calling
980 // sendmsg
write_action_end_locked(void * tp,grpc_error * error)981 static void write_action_end_locked(void* tp, grpc_error* error) {
982   GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
983   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
984 
985   bool closed = false;
986   if (error != GRPC_ERROR_NONE) {
987     close_transport_locked(t, GRPC_ERROR_REF(error));
988     closed = true;
989   }
990 
991   if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
992     t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
993     closed = true;
994     if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
995       close_transport_locked(
996           t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
997     }
998   }
999 
1000   switch (t->write_state) {
1001     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1002       GPR_UNREACHABLE_CODE(break);
1003     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1004       GPR_TIMER_MARK("state=writing", 0);
1005       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1006       break;
1007     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1008       GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1009       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1010       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1011       // If the transport is closed, we will retry writing on the endpoint
1012       // and next write may contain part of the currently serialized frames.
1013       // So, we should only call the run_after_write callbacks when the next
1014       // write finishes, or the callbacks will be invoked when the stream is
1015       // closed.
1016       if (!closed) {
1017         grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1018       }
1019       t->combiner->FinallyRun(
1020           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1021                             write_action_begin_locked, t, nullptr),
1022           GRPC_ERROR_NONE);
1023       break;
1024   }
1025 
1026   grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1027   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1028 }
1029 
1030 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1031 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1032 static void queue_setting_update(grpc_chttp2_transport* t,
1033                                  grpc_chttp2_setting_id id, uint32_t value) {
1034   const grpc_chttp2_setting_parameters* sp =
1035       &grpc_chttp2_settings_parameters[id];
1036   uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1037   if (use_value != value) {
1038     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1039             value, use_value);
1040   }
1041   if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1042     t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1043     t->dirtied_local_settings = true;
1044   }
1045 }
1046 
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,const grpc_slice & goaway_text)1047 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1048                                      uint32_t goaway_error,
1049                                      uint32_t last_stream_id,
1050                                      const grpc_slice& goaway_text) {
1051   // Discard the error from a previous goaway frame (if any)
1052   if (t->goaway_error != GRPC_ERROR_NONE) {
1053     GRPC_ERROR_UNREF(t->goaway_error);
1054   }
1055   t->goaway_error = grpc_error_set_str(
1056       grpc_error_set_int(
1057           grpc_error_set_int(
1058               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1059               GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1060           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1061       GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1062 
1063   GRPC_CHTTP2_IF_TRACING(
1064       gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1065               last_stream_id));
1066   // We want to log this irrespective of whether http tracing is enabled if we
1067   // received a GOAWAY with a non NO_ERROR code.
1068   if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1069     gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string.c_str(),
1070             goaway_error, grpc_error_string(t->goaway_error));
1071   }
1072   absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1073   // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1074   // data equal to "too_many_pings", it should log the occurrence at a log level
1075   // that is enabled by default and double the configured KEEPALIVE_TIME used
1076   // for new connections on that channel.
1077   if (GPR_UNLIKELY(t->is_client &&
1078                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1079                    grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1080     gpr_log(GPR_ERROR,
1081             "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1082             "data equal to \"too_many_pings\"");
1083     double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1084     constexpr int max_keepalive_time_ms =
1085         INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1086     t->keepalive_time =
1087         current_keepalive_time_ms > static_cast<double>(max_keepalive_time_ms)
1088             ? GRPC_MILLIS_INF_FUTURE
1089             : static_cast<grpc_millis>(current_keepalive_time_ms *
1090                                        KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1091     status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1092                       absl::Cord(std::to_string(t->keepalive_time)));
1093   }
1094   // lie: use transient failure from the transport to indicate goaway has been
1095   // received.
1096   connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1097                          "got_goaway");
1098 }
1099 
maybe_start_some_streams(grpc_chttp2_transport * t)1100 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1101   grpc_chttp2_stream* s;
1102   // cancel out streams that haven't yet started if we have received a GOAWAY
1103   if (t->goaway_error != GRPC_ERROR_NONE) {
1104     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1105       grpc_chttp2_cancel_stream(
1106           t, s,
1107           grpc_error_set_int(
1108               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1109               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1110     }
1111     return;
1112   }
1113   // start streams where we have free grpc_chttp2_stream ids and free
1114   // * concurrency
1115   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1116          grpc_chttp2_stream_map_size(&t->stream_map) <
1117              t->settings[GRPC_PEER_SETTINGS]
1118                         [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1119          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1120     // safe since we can't (legally) be parsing this stream yet
1121     GRPC_CHTTP2_IF_TRACING(gpr_log(
1122         GPR_INFO,
1123         "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1124         t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1125 
1126     GPR_ASSERT(s->id == 0);
1127     s->id = t->next_stream_id;
1128     t->next_stream_id += 2;
1129 
1130     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1131       connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1132                              absl::Status(absl::StatusCode::kUnavailable,
1133                                           "Transport Stream IDs exhausted"),
1134                              "no_more_stream_ids");
1135     }
1136 
1137     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1138     post_destructive_reclaimer(t);
1139     grpc_chttp2_mark_stream_writable(t, s);
1140     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1141   }
1142   // cancel out streams that will never be started
1143   if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1144     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1145       grpc_chttp2_cancel_stream(
1146           t, s,
1147           grpc_error_set_int(
1148               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1149               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1150     }
1151   }
1152 }
1153 
1154 // Flag that this closure barrier may be covering a write in a pollset, and so
1155 //   we should not complete this closure until we can prove that the write got
1156 //   scheduled
1157 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
1158 // First bit of the reference count, stored in the high order bits (with the low
1159 //   bits being used for flags defined above)
1160 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1161 
add_closure_barrier(grpc_closure * closure)1162 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1163   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1164   return closure;
1165 }
1166 
null_then_sched_closure(grpc_closure ** closure)1167 static void null_then_sched_closure(grpc_closure** closure) {
1168   grpc_closure* c = *closure;
1169   *closure = nullptr;
1170   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
1171 }
1172 
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream *,grpc_closure ** pclosure,grpc_error * error,const char * desc)1173 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1174                                        grpc_chttp2_stream* /*s*/,
1175                                        grpc_closure** pclosure,
1176                                        grpc_error* error, const char* desc) {
1177   grpc_closure* closure = *pclosure;
1178   *pclosure = nullptr;
1179   if (closure == nullptr) {
1180     GRPC_ERROR_UNREF(error);
1181     return;
1182   }
1183   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1184   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1185     const char* errstr = grpc_error_string(error);
1186     gpr_log(
1187         GPR_INFO,
1188         "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1189         "write_state=%s",
1190         t, closure,
1191         static_cast<int>(closure->next_data.scratch /
1192                          CLOSURE_BARRIER_FIRST_REF_BIT),
1193         static_cast<int>(closure->next_data.scratch %
1194                          CLOSURE_BARRIER_FIRST_REF_BIT),
1195         desc, errstr, write_state_name(t->write_state));
1196   }
1197   if (error != GRPC_ERROR_NONE) {
1198     if (closure->error_data.error == GRPC_ERROR_NONE) {
1199       closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1200           "Error in HTTP transport completing operation");
1201       closure->error_data.error = grpc_error_set_str(
1202           closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1203           grpc_slice_from_copied_string(t->peer_string.c_str()));
1204     }
1205     closure->error_data.error =
1206         grpc_error_add_child(closure->error_data.error, error);
1207   }
1208   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1209     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1210         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1211       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1212       // closures earlier than when it is safe to do so.
1213       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
1214                               closure->error_data.error);
1215     } else {
1216       grpc_closure_list_append(&t->run_after_write, closure,
1217                                closure->error_data.error);
1218     }
1219   }
1220 }
1221 
contains_non_ok_status(grpc_metadata_batch * batch)1222 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1223   if (batch->idx.named.grpc_status != nullptr) {
1224     return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1225                                         GRPC_MDELEM_GRPC_STATUS_0);
1226   }
1227   return false;
1228 }
1229 
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1230 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1231                                                   grpc_chttp2_stream* s) {
1232   if (s->id != 0 && (!s->write_buffering ||
1233                      s->flow_controlled_buffer.length > t->write_buffer_size)) {
1234     grpc_chttp2_mark_stream_writable(t, s);
1235     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1236   }
1237 }
1238 
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1239 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1240                                      grpc_chttp2_stream* s) {
1241   s->fetched_send_message_length +=
1242       static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1243   grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1244   maybe_become_writable_due_to_send_msg(t, s);
1245 }
1246 
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1247 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1248                                           grpc_chttp2_stream* s) {
1249   for (;;) {
1250     if (s->fetching_send_message == nullptr) {
1251       // Stream was cancelled before message fetch completed
1252       abort(); /* TODO(ctiller): what cleanup here? */
1253       return;  /* early out */
1254     }
1255     if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1256       int64_t notify_offset = s->next_message_end_offset;
1257       if (notify_offset <= s->flow_controlled_bytes_written) {
1258         grpc_chttp2_complete_closure_step(
1259             t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1260             "fetching_send_message_finished");
1261       } else {
1262         grpc_chttp2_write_cb* cb = t->write_cb_pool;
1263         if (cb == nullptr) {
1264           cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1265         } else {
1266           t->write_cb_pool = cb->next;
1267         }
1268         cb->call_at_byte = notify_offset;
1269         cb->closure = s->fetching_send_message_finished;
1270         s->fetching_send_message_finished = nullptr;
1271         grpc_chttp2_write_cb** list =
1272             s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1273                 ? &s->on_write_finished_cbs
1274                 : &s->on_flow_controlled_cbs;
1275         cb->next = *list;
1276         *list = cb;
1277       }
1278       s->fetching_send_message.reset();
1279       return; /* early out */
1280     } else if (s->fetching_send_message->Next(
1281                    UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1282                                                  ::complete_fetch, s,
1283                                                  grpc_schedule_on_exec_ctx))) {
1284       grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1285       if (error != GRPC_ERROR_NONE) {
1286         s->fetching_send_message.reset();
1287         grpc_chttp2_cancel_stream(t, s, error);
1288       } else {
1289         add_fetched_slice_locked(t, s);
1290       }
1291     }
1292   }
1293 }
1294 
complete_fetch(void * gs,grpc_error * error)1295 static void complete_fetch(void* gs, grpc_error* error) {
1296   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1297   s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1298                                         ::complete_fetch_locked, s, nullptr),
1299                       GRPC_ERROR_REF(error));
1300 }
1301 
complete_fetch_locked(void * gs,grpc_error * error)1302 static void complete_fetch_locked(void* gs, grpc_error* error) {
1303   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1304   grpc_chttp2_transport* t = s->t;
1305   if (error == GRPC_ERROR_NONE) {
1306     error = s->fetching_send_message->Pull(&s->fetching_slice);
1307     if (error == GRPC_ERROR_NONE) {
1308       add_fetched_slice_locked(t, s);
1309       continue_fetching_send_locked(t, s);
1310     }
1311   }
1312   if (error != GRPC_ERROR_NONE) {
1313     s->fetching_send_message.reset();
1314     grpc_chttp2_cancel_stream(t, s, error);
1315   }
1316 }
1317 
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1318 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1319                          bool is_client, bool is_initial) {
1320   for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1321        md = md->next) {
1322     char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1323     char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1324     gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1325             is_client ? "CLI" : "SVR", key, value);
1326     gpr_free(key);
1327     gpr_free(value);
1328   }
1329 }
1330 
perform_stream_op_locked(void * stream_op,grpc_error *)1331 static void perform_stream_op_locked(void* stream_op,
1332                                      grpc_error* /*error_ignored*/) {
1333   GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1334 
1335   grpc_transport_stream_op_batch* op =
1336       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1337   grpc_chttp2_stream* s =
1338       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1339   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1340   grpc_chttp2_transport* t = s->t;
1341 
1342   GRPC_STATS_INC_HTTP2_OP_BATCHES();
1343 
1344   s->context = op->payload->context;
1345   s->traced = op->is_traced;
1346   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1347     gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p",
1348             grpc_transport_stream_op_batch_string(op).c_str(), op->on_complete);
1349     if (op->send_initial_metadata) {
1350       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1351                    s->id, t->is_client, true);
1352     }
1353     if (op->send_trailing_metadata) {
1354       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1355                    s->id, t->is_client, false);
1356     }
1357   }
1358 
1359   grpc_closure* on_complete = op->on_complete;
1360   // on_complete will be null if and only if there are no send ops in the batch.
1361   if (on_complete != nullptr) {
1362     // This batch has send ops. Use final_data as a barrier until enqueue time;
1363     // the initial counter is dropped at the end of this function.
1364     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1365     on_complete->error_data.error = GRPC_ERROR_NONE;
1366   }
1367 
1368   if (op->cancel_stream) {
1369     GRPC_STATS_INC_HTTP2_OP_CANCEL();
1370     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1371   }
1372 
1373   if (op->send_initial_metadata) {
1374     if (t->is_client && t->channelz_socket != nullptr) {
1375       t->channelz_socket->RecordStreamStartedFromLocal();
1376     }
1377     GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1378     GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1379     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1380 
1381     // Identify stream compression
1382     if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1383                 .content_encoding == nullptr ||
1384         grpc_stream_compression_method_parse(
1385             GRPC_MDVALUE(
1386                 op_payload->send_initial_metadata.send_initial_metadata->idx
1387                     .named.content_encoding->md),
1388             true, &s->stream_compression_method) == 0) {
1389       s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1390     }
1391     if (s->stream_compression_method !=
1392         GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1393       s->uncompressed_data_size = 0;
1394       s->stream_compression_ctx = nullptr;
1395       grpc_slice_buffer_init(&s->compressed_data_buffer);
1396     }
1397     s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1398     s->send_initial_metadata =
1399         op_payload->send_initial_metadata.send_initial_metadata;
1400     if (t->is_client) {
1401       s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1402     }
1403     if (contains_non_ok_status(s->send_initial_metadata)) {
1404       s->seen_error = true;
1405     }
1406     if (!s->write_closed) {
1407       if (t->is_client) {
1408         if (t->closed_with_error == GRPC_ERROR_NONE) {
1409           GPR_ASSERT(s->id == 0);
1410           grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1411           maybe_start_some_streams(t);
1412         } else {
1413           grpc_chttp2_cancel_stream(
1414               t, s,
1415               grpc_error_set_int(
1416                   GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1417                       "Transport closed", &t->closed_with_error, 1),
1418                   GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1419         }
1420       } else {
1421         GPR_ASSERT(s->id != 0);
1422         grpc_chttp2_mark_stream_writable(t, s);
1423         if (!(op->send_message &&
1424               (op->payload->send_message.send_message->flags() &
1425                GRPC_WRITE_BUFFER_HINT))) {
1426           grpc_chttp2_initiate_write(
1427               t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1428         }
1429       }
1430     } else {
1431       s->send_initial_metadata = nullptr;
1432       grpc_chttp2_complete_closure_step(
1433           t, s, &s->send_initial_metadata_finished,
1434           GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1435               "Attempt to send initial metadata after stream was closed",
1436               &s->write_closed_error, 1),
1437           "send_initial_metadata_finished");
1438     }
1439     if (op_payload->send_initial_metadata.peer_string != nullptr) {
1440       gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1441                         (gpr_atm)t->peer_string.c_str());
1442     }
1443   }
1444 
1445   if (op->send_message) {
1446     GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1447     t->num_messages_in_next_write++;
1448     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1449         op->payload->send_message.send_message->length());
1450     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1451     s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1452     if (s->write_closed) {
1453       op->payload->send_message.stream_write_closed = true;
1454       // We should NOT return an error here, so as to avoid a cancel OP being
1455       // started. The surface layer will notice that the stream has been closed
1456       // for writes and fail the send message op.
1457       op->payload->send_message.send_message.reset();
1458       grpc_chttp2_complete_closure_step(
1459           t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1460           "fetching_send_message_finished");
1461     } else {
1462       GPR_ASSERT(s->fetching_send_message == nullptr);
1463       uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1464           &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1465       uint32_t flags = op_payload->send_message.send_message->flags();
1466       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1467       size_t len = op_payload->send_message.send_message->length();
1468       frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1469       frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1470       frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1471       frame_hdr[4] = static_cast<uint8_t>(len);
1472       s->fetching_send_message =
1473           std::move(op_payload->send_message.send_message);
1474       s->fetched_send_message_length = 0;
1475       s->next_message_end_offset =
1476           s->flow_controlled_bytes_written +
1477           static_cast<int64_t>(s->flow_controlled_buffer.length) +
1478           static_cast<int64_t>(len);
1479       if (flags & GRPC_WRITE_BUFFER_HINT) {
1480         s->next_message_end_offset -= t->write_buffer_size;
1481         s->write_buffering = true;
1482       } else {
1483         s->write_buffering = false;
1484       }
1485       continue_fetching_send_locked(t, s);
1486       maybe_become_writable_due_to_send_msg(t, s);
1487     }
1488   }
1489 
1490   if (op->send_trailing_metadata) {
1491     GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1492     GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1493     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1494     s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1495     s->send_trailing_metadata =
1496         op_payload->send_trailing_metadata.send_trailing_metadata;
1497     s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1498     s->write_buffering = false;
1499     if (contains_non_ok_status(s->send_trailing_metadata)) {
1500       s->seen_error = true;
1501     }
1502     if (s->write_closed) {
1503       s->send_trailing_metadata = nullptr;
1504       s->sent_trailing_metadata_op = nullptr;
1505       grpc_chttp2_complete_closure_step(
1506           t, s, &s->send_trailing_metadata_finished,
1507           grpc_metadata_batch_is_empty(
1508               op->payload->send_trailing_metadata.send_trailing_metadata)
1509               ? GRPC_ERROR_NONE
1510               : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1511                     "Attempt to send trailing metadata after "
1512                     "stream was closed"),
1513           "send_trailing_metadata_finished");
1514     } else if (s->id != 0) {
1515       // TODO(ctiller): check if there's flow control for any outstanding
1516       //   bytes before going writable
1517       grpc_chttp2_mark_stream_writable(t, s);
1518       grpc_chttp2_initiate_write(
1519           t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1520     }
1521   }
1522 
1523   if (op->recv_initial_metadata) {
1524     GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1525     GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1526     s->recv_initial_metadata_ready =
1527         op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1528     s->recv_initial_metadata =
1529         op_payload->recv_initial_metadata.recv_initial_metadata;
1530     s->trailing_metadata_available =
1531         op_payload->recv_initial_metadata.trailing_metadata_available;
1532     if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1533       gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1534                         (gpr_atm)t->peer_string.c_str());
1535     }
1536     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1537   }
1538 
1539   if (op->recv_message) {
1540     GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1541     size_t before = 0;
1542     GPR_ASSERT(s->recv_message_ready == nullptr);
1543     GPR_ASSERT(!s->pending_byte_stream);
1544     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1545     s->recv_message = op_payload->recv_message.recv_message;
1546     if (s->id != 0) {
1547       if (!s->read_closed) {
1548         before = s->frame_storage.length +
1549                  s->unprocessed_incoming_frames_buffer.length;
1550       }
1551     }
1552     grpc_chttp2_maybe_complete_recv_message(t, s);
1553     if (s->id != 0) {
1554       if (!s->read_closed && s->frame_storage.length == 0) {
1555         size_t after = s->frame_storage.length +
1556                        s->unprocessed_incoming_frames_buffer_cached_length;
1557         s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1558                                                   before - after);
1559         grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1560       }
1561     }
1562   }
1563 
1564   if (op->recv_trailing_metadata) {
1565     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1566     GPR_ASSERT(s->collecting_stats == nullptr);
1567     s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1568     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1569     s->recv_trailing_metadata_finished =
1570         op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1571     s->recv_trailing_metadata =
1572         op_payload->recv_trailing_metadata.recv_trailing_metadata;
1573     s->final_metadata_requested = true;
1574     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1575   }
1576 
1577   if (on_complete != nullptr) {
1578     grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1579                                       "op->on_complete");
1580   }
1581 
1582   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1583 }
1584 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1585 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1586                               grpc_transport_stream_op_batch* op) {
1587   GPR_TIMER_SCOPE("perform_stream_op", 0);
1588   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1589   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1590 
1591   if (!t->is_client) {
1592     if (op->send_initial_metadata) {
1593       grpc_millis deadline =
1594           op->payload->send_initial_metadata.send_initial_metadata->deadline;
1595       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1596     }
1597     if (op->send_trailing_metadata) {
1598       grpc_millis deadline =
1599           op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1600       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1601     }
1602   }
1603 
1604   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1605     gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s,
1606             grpc_transport_stream_op_batch_string(op).c_str());
1607   }
1608 
1609   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1610   op->handler_private.extra_arg = gs;
1611   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1612                                      perform_stream_op_locked, op, nullptr),
1613                    GRPC_ERROR_NONE);
1614 }
1615 
cancel_pings(grpc_chttp2_transport * t,grpc_error * error)1616 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1617   // callback remaining pings: they're not allowed to call into the transport,
1618   //   and maybe they hold resources that need to be freed
1619   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1620   GPR_ASSERT(error != GRPC_ERROR_NONE);
1621   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1622     grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1623     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1624   }
1625   GRPC_ERROR_UNREF(error);
1626 }
1627 
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1628 static void send_ping_locked(grpc_chttp2_transport* t,
1629                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1630   if (t->closed_with_error != GRPC_ERROR_NONE) {
1631     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
1632                             GRPC_ERROR_REF(t->closed_with_error));
1633     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
1634                             GRPC_ERROR_REF(t->closed_with_error));
1635     return;
1636   }
1637   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1638   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1639                            GRPC_ERROR_NONE);
1640   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1641                            GRPC_ERROR_NONE);
1642 }
1643 
1644 // Specialized form of send_ping_locked for keepalive ping. If there is already
1645 // a ping in progress, the keepalive ping would piggyback onto that ping,
1646 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_chttp2_transport * t)1647 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1648   if (t->closed_with_error != GRPC_ERROR_NONE) {
1649     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1650                                        start_keepalive_ping_locked, t, nullptr),
1651                      GRPC_ERROR_REF(t->closed_with_error));
1652     t->combiner->Run(
1653         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1654                           finish_keepalive_ping_locked, t, nullptr),
1655         GRPC_ERROR_REF(t->closed_with_error));
1656     return;
1657   }
1658   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1659   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1660     // There is a ping in flight. Add yourself to the inflight closure list.
1661     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1662                                        start_keepalive_ping_locked, t, nullptr),
1663                      GRPC_ERROR_REF(t->closed_with_error));
1664     grpc_closure_list_append(
1665         &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1666         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1667                           finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1668         GRPC_ERROR_NONE);
1669     return;
1670   }
1671   grpc_closure_list_append(
1672       &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1673       GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1674                         t, grpc_schedule_on_exec_ctx),
1675       GRPC_ERROR_NONE);
1676   grpc_closure_list_append(
1677       &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1678       GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1679                         t, grpc_schedule_on_exec_ctx),
1680       GRPC_ERROR_NONE);
1681 }
1682 
grpc_chttp2_retry_initiate_ping(void * tp,grpc_error * error)1683 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error) {
1684   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1685   t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1686                                      retry_initiate_ping_locked, t, nullptr),
1687                    GRPC_ERROR_REF(error));
1688 }
1689 
retry_initiate_ping_locked(void * tp,grpc_error * error)1690 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1691   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1692   t->ping_state.is_delayed_ping_timer_set = false;
1693   if (error == GRPC_ERROR_NONE) {
1694     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1695   }
1696   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1697 }
1698 
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1699 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1700   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1701   if (pq->inflight_id != id) {
1702     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1703             t->peer_string.c_str(), id);
1704     return;
1705   }
1706   grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1707                               &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1708   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1709     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1710   }
1711 }
1712 
send_goaway(grpc_chttp2_transport * t,grpc_error * error)1713 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1714   // We want to log this irrespective of whether http tracing is enabled
1715   gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string.c_str(),
1716           grpc_error_string(error));
1717   t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1718   grpc_http2_error_code http_error;
1719   grpc_slice slice;
1720   grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1721                         &http_error, nullptr);
1722   grpc_chttp2_goaway_append(t->last_new_stream_id,
1723                             static_cast<uint32_t>(http_error),
1724                             grpc_slice_ref_internal(slice), &t->qbuf);
1725   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1726   GRPC_ERROR_UNREF(error);
1727 }
1728 
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1729 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1730   if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1731       t->ping_policy.max_ping_strikes != 0) {
1732     send_goaway(t,
1733                 grpc_error_set_int(
1734                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1735                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1736     // The transport will be closed after the write is done
1737     close_transport_locked(
1738         t, grpc_error_set_int(
1739                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1740                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1741   }
1742 }
1743 
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1744 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1745   if (!t->is_client) {
1746     t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
1747     t->ping_recv_state.ping_strikes = 0;
1748   }
1749   t->ping_state.pings_before_data_required =
1750       t->ping_policy.max_pings_without_data;
1751 }
1752 
perform_transport_op_locked(void * stream_op,grpc_error *)1753 static void perform_transport_op_locked(void* stream_op,
1754                                         grpc_error* /*error_ignored*/) {
1755   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1756   grpc_chttp2_transport* t =
1757       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1758 
1759   if (op->goaway_error) {
1760     send_goaway(t, op->goaway_error);
1761   }
1762 
1763   if (op->set_accept_stream) {
1764     t->accept_stream_cb = op->set_accept_stream_fn;
1765     t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1766   }
1767 
1768   if (op->bind_pollset) {
1769     grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1770   }
1771 
1772   if (op->bind_pollset_set) {
1773     grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1774   }
1775 
1776   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1777     send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1778     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1779   }
1780 
1781   if (op->start_connectivity_watch != nullptr) {
1782     t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1783                                 std::move(op->start_connectivity_watch));
1784   }
1785   if (op->stop_connectivity_watch != nullptr) {
1786     t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1787   }
1788 
1789   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1790     close_transport_locked(t, op->disconnect_with_error);
1791   }
1792 
1793   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1794 
1795   GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1796 }
1797 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1798 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1799   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1800   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1801     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1802             grpc_transport_op_string(op).c_str());
1803   }
1804   op->handler_private.extra_arg = gt;
1805   GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1806   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1807                                      perform_transport_op_locked, op, nullptr),
1808                    GRPC_ERROR_NONE);
1809 }
1810 
1811 //
1812 // INPUT PROCESSING - GENERAL
1813 //
1814 
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport *,grpc_chttp2_stream * s)1815 void grpc_chttp2_maybe_complete_recv_initial_metadata(
1816     grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) {
1817   if (s->recv_initial_metadata_ready != nullptr &&
1818       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1819     if (s->seen_error) {
1820       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1821       if (!s->pending_byte_stream) {
1822         grpc_slice_buffer_reset_and_unref_internal(
1823             &s->unprocessed_incoming_frames_buffer);
1824       }
1825     }
1826     grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1827                                                  s->recv_initial_metadata);
1828     null_then_sched_closure(&s->recv_initial_metadata_ready);
1829   }
1830 }
1831 
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport *,grpc_chttp2_stream * s)1832 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
1833                                              grpc_chttp2_stream* s) {
1834   grpc_error* error = GRPC_ERROR_NONE;
1835   if (s->recv_message_ready != nullptr) {
1836     *s->recv_message = nullptr;
1837     if (s->final_metadata_requested && s->seen_error) {
1838       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1839       if (!s->pending_byte_stream) {
1840         grpc_slice_buffer_reset_and_unref_internal(
1841             &s->unprocessed_incoming_frames_buffer);
1842       }
1843     }
1844     if (!s->pending_byte_stream) {
1845       while (s->unprocessed_incoming_frames_buffer.length > 0 ||
1846              s->frame_storage.length > 0) {
1847         if (s->unprocessed_incoming_frames_buffer.length == 0) {
1848           grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
1849                                  &s->frame_storage);
1850           s->unprocessed_incoming_frames_decompressed = false;
1851         }
1852         if (!s->unprocessed_incoming_frames_decompressed &&
1853             s->stream_decompression_method !=
1854                 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1855           GPR_ASSERT(s->decompressed_data_buffer.length == 0);
1856           bool end_of_context;
1857           if (!s->stream_decompression_ctx) {
1858             s->stream_decompression_ctx =
1859                 grpc_stream_compression_context_create(
1860                     s->stream_decompression_method);
1861           }
1862           if (!grpc_stream_decompress(
1863                   s->stream_decompression_ctx,
1864                   &s->unprocessed_incoming_frames_buffer,
1865                   &s->decompressed_data_buffer, nullptr,
1866                   GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
1867                   &end_of_context)) {
1868             grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1869             grpc_slice_buffer_reset_and_unref_internal(
1870                 &s->unprocessed_incoming_frames_buffer);
1871             error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1872                 "Stream decompression error.");
1873           } else {
1874             s->decompressed_header_bytes += s->decompressed_data_buffer.length;
1875             if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
1876               s->decompressed_header_bytes = 0;
1877             }
1878             error = grpc_deframe_unprocessed_incoming_frames(
1879                 &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
1880                 s->recv_message);
1881             if (end_of_context) {
1882               grpc_stream_compression_context_destroy(
1883                   s->stream_decompression_ctx);
1884               s->stream_decompression_ctx = nullptr;
1885             }
1886           }
1887         } else {
1888           error = grpc_deframe_unprocessed_incoming_frames(
1889               &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
1890               nullptr, s->recv_message);
1891         }
1892         if (error != GRPC_ERROR_NONE) {
1893           s->seen_error = true;
1894           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1895           grpc_slice_buffer_reset_and_unref_internal(
1896               &s->unprocessed_incoming_frames_buffer);
1897           break;
1898         } else if (*s->recv_message != nullptr) {
1899           break;
1900         }
1901       }
1902     }
1903     // save the length of the buffer before handing control back to application
1904     // threads. Needed to support correct flow control bookkeeping
1905     s->unprocessed_incoming_frames_buffer_cached_length =
1906         s->unprocessed_incoming_frames_buffer.length;
1907     if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
1908       null_then_sched_closure(&s->recv_message_ready);
1909     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
1910       *s->recv_message = nullptr;
1911       null_then_sched_closure(&s->recv_message_ready);
1912     }
1913     GRPC_ERROR_UNREF(error);
1914   }
1915 }
1916 
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1917 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
1918                                                        grpc_chttp2_stream* s) {
1919   grpc_chttp2_maybe_complete_recv_message(t, s);
1920   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
1921       s->write_closed) {
1922     if (s->seen_error || !t->is_client) {
1923       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1924       if (!s->pending_byte_stream) {
1925         grpc_slice_buffer_reset_and_unref_internal(
1926             &s->unprocessed_incoming_frames_buffer);
1927       }
1928     }
1929     bool pending_data = s->pending_byte_stream ||
1930                         s->unprocessed_incoming_frames_buffer.length > 0;
1931     if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
1932         !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
1933       // Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
1934       // maybe decompress the next 5 bytes in the stream.
1935       if (s->stream_decompression_method ==
1936           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1937         grpc_slice_buffer_move_first(
1938             &s->frame_storage,
1939             GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
1940             &s->unprocessed_incoming_frames_buffer);
1941         if (s->unprocessed_incoming_frames_buffer.length > 0) {
1942           s->unprocessed_incoming_frames_decompressed = true;
1943           pending_data = true;
1944         }
1945       } else {
1946         bool end_of_context;
1947         if (!s->stream_decompression_ctx) {
1948           s->stream_decompression_ctx = grpc_stream_compression_context_create(
1949               s->stream_decompression_method);
1950         }
1951         if (!grpc_stream_decompress(
1952                 s->stream_decompression_ctx, &s->frame_storage,
1953                 &s->unprocessed_incoming_frames_buffer, nullptr,
1954                 GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
1955           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1956           grpc_slice_buffer_reset_and_unref_internal(
1957               &s->unprocessed_incoming_frames_buffer);
1958           s->seen_error = true;
1959         } else {
1960           if (s->unprocessed_incoming_frames_buffer.length > 0) {
1961             s->unprocessed_incoming_frames_decompressed = true;
1962             pending_data = true;
1963           }
1964           if (end_of_context) {
1965             grpc_stream_compression_context_destroy(
1966                 s->stream_decompression_ctx);
1967             s->stream_decompression_ctx = nullptr;
1968           }
1969         }
1970       }
1971     }
1972     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
1973         s->recv_trailing_metadata_finished != nullptr) {
1974       grpc_transport_move_stats(&s->stats, s->collecting_stats);
1975       s->collecting_stats = nullptr;
1976       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
1977                                                    s->recv_trailing_metadata);
1978       null_then_sched_closure(&s->recv_trailing_metadata_finished);
1979     }
1980   }
1981 }
1982 
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error * error)1983 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
1984                           grpc_error* error) {
1985   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
1986       grpc_chttp2_stream_map_delete(&t->stream_map, id));
1987   GPR_DEBUG_ASSERT(s);
1988   if (t->incoming_stream == s) {
1989     t->incoming_stream = nullptr;
1990     grpc_chttp2_parsing_become_skip_parser(t);
1991   }
1992   if (s->pending_byte_stream) {
1993     if (s->on_next != nullptr) {
1994       grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
1995       if (error == GRPC_ERROR_NONE) {
1996         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
1997       }
1998       bs->PublishError(error);
1999       bs->Unref();
2000       s->data_parser.parsing_frame = nullptr;
2001     } else {
2002       GRPC_ERROR_UNREF(s->byte_stream_error);
2003       s->byte_stream_error = GRPC_ERROR_REF(error);
2004     }
2005   }
2006 
2007   if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2008     post_benign_reclaimer(t);
2009     if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2010       close_transport_locked(
2011           t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2012                  "Last stream closed after sending GOAWAY", &error, 1));
2013     }
2014   }
2015   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2016     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2017   }
2018   grpc_chttp2_list_remove_stalled_by_stream(t, s);
2019   grpc_chttp2_list_remove_stalled_by_transport(t, s);
2020 
2021   GRPC_ERROR_UNREF(error);
2022 
2023   maybe_start_some_streams(t);
2024 }
2025 
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * due_to_error)2026 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2027                                grpc_error* due_to_error) {
2028   if (!t->is_client && !s->sent_trailing_metadata &&
2029       grpc_error_has_clear_grpc_status(due_to_error)) {
2030     close_from_api(t, s, due_to_error);
2031     return;
2032   }
2033 
2034   if (!s->read_closed || !s->write_closed) {
2035     if (s->id != 0) {
2036       grpc_http2_error_code http_error;
2037       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2038                             &http_error, nullptr);
2039       grpc_chttp2_add_rst_stream_to_next_write(
2040           t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2041       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2042     }
2043   }
2044   if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2045     s->seen_error = true;
2046   }
2047   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2048 }
2049 
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2050 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2051                              grpc_error* error) {
2052   grpc_status_code status;
2053   grpc_slice slice;
2054   grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2055   if (status != GRPC_STATUS_OK) {
2056     s->seen_error = true;
2057   }
2058   // stream_global->recv_trailing_metadata_finished gives us a
2059   //   last chance replacement: we've received trailing metadata,
2060   //   but something more important has become available to signal
2061   //   to the upper layers - drop what we've got, and then publish
2062   //   what we want - which is safe because we haven't told anyone
2063   //   about the metadata yet
2064   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2065       s->recv_trailing_metadata_finished != nullptr) {
2066     char status_string[GPR_LTOA_MIN_BUFSIZE];
2067     gpr_ltoa(status, status_string);
2068     GRPC_LOG_IF_ERROR("add_status",
2069                       grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2070                           &s->metadata_buffer[1],
2071                           grpc_mdelem_from_slices(
2072                               GRPC_MDSTR_GRPC_STATUS,
2073                               grpc_core::UnmanagedMemorySlice(status_string))));
2074     if (!GRPC_SLICE_IS_EMPTY(slice)) {
2075       GRPC_LOG_IF_ERROR(
2076           "add_status_message",
2077           grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2078               &s->metadata_buffer[1],
2079               grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2080     }
2081     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2082     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2083   }
2084 
2085   GRPC_ERROR_UNREF(error);
2086 }
2087 
add_error(grpc_error * error,grpc_error ** refs,size_t * nrefs)2088 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2089   if (error == GRPC_ERROR_NONE) return;
2090   for (size_t i = 0; i < *nrefs; i++) {
2091     if (error == refs[i]) {
2092       return;
2093     }
2094   }
2095   refs[*nrefs] = error;
2096   ++*nrefs;
2097 }
2098 
removal_error(grpc_error * extra_error,grpc_chttp2_stream * s,const char * master_error_msg)2099 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2100                                  const char* master_error_msg) {
2101   grpc_error* refs[3];
2102   size_t nrefs = 0;
2103   add_error(s->read_closed_error, refs, &nrefs);
2104   add_error(s->write_closed_error, refs, &nrefs);
2105   add_error(extra_error, refs, &nrefs);
2106   grpc_error* error = GRPC_ERROR_NONE;
2107   if (nrefs > 0) {
2108     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2109                                                              refs, nrefs);
2110   }
2111   GRPC_ERROR_UNREF(extra_error);
2112   return error;
2113 }
2114 
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error * error)2115 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2116                              grpc_chttp2_write_cb** list, grpc_error* error) {
2117   while (*list) {
2118     grpc_chttp2_write_cb* cb = *list;
2119     *list = cb->next;
2120     grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2121                                       "on_write_finished_cb");
2122     cb->next = t->write_cb_pool;
2123     t->write_cb_pool = cb;
2124   }
2125   GRPC_ERROR_UNREF(error);
2126 }
2127 
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2128 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2129                                      grpc_chttp2_stream* s, grpc_error* error) {
2130   error =
2131       removal_error(error, s, "Pending writes failed due to stream closure");
2132   s->send_initial_metadata = nullptr;
2133   grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2134                                     GRPC_ERROR_REF(error),
2135                                     "send_initial_metadata_finished");
2136 
2137   s->send_trailing_metadata = nullptr;
2138   s->sent_trailing_metadata_op = nullptr;
2139   grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2140                                     GRPC_ERROR_REF(error),
2141                                     "send_trailing_metadata_finished");
2142 
2143   s->fetching_send_message.reset();
2144   grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2145                                     GRPC_ERROR_REF(error),
2146                                     "fetching_send_message_finished");
2147   flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2148   flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2149 }
2150 
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error * error)2151 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2152                                     grpc_chttp2_stream* s, int close_reads,
2153                                     int close_writes, grpc_error* error) {
2154   if (s->read_closed && s->write_closed) {
2155     // already closed, but we should still fake the status if needed.
2156     grpc_error* overall_error = removal_error(error, s, "Stream removed");
2157     if (overall_error != GRPC_ERROR_NONE) {
2158       grpc_chttp2_fake_status(t, s, overall_error);
2159     }
2160     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2161     return;
2162   }
2163   bool closed_read = false;
2164   bool became_closed = false;
2165   if (close_reads && !s->read_closed) {
2166     s->read_closed_error = GRPC_ERROR_REF(error);
2167     s->read_closed = true;
2168     closed_read = true;
2169   }
2170   if (close_writes && !s->write_closed) {
2171     s->write_closed_error = GRPC_ERROR_REF(error);
2172     s->write_closed = true;
2173     grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
2174   }
2175   if (s->read_closed && s->write_closed) {
2176     became_closed = true;
2177     grpc_error* overall_error =
2178         removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
2179     if (s->id != 0) {
2180       remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
2181     } else {
2182       // Purge streams waiting on concurrency still waiting for id assignment
2183       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2184     }
2185     if (overall_error != GRPC_ERROR_NONE) {
2186       grpc_chttp2_fake_status(t, s, overall_error);
2187     }
2188   }
2189   if (closed_read) {
2190     for (int i = 0; i < 2; i++) {
2191       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2192         s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2193       }
2194     }
2195     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2196     grpc_chttp2_maybe_complete_recv_message(t, s);
2197   }
2198   if (became_closed) {
2199     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2200     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2201   }
2202   GRPC_ERROR_UNREF(error);
2203 }
2204 
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2205 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2206                            grpc_error* error) {
2207   grpc_slice hdr;
2208   grpc_slice status_hdr;
2209   grpc_slice http_status_hdr;
2210   grpc_slice content_type_hdr;
2211   grpc_slice message_pfx;
2212   uint8_t* p;
2213   uint32_t len = 0;
2214   grpc_status_code grpc_status;
2215   grpc_slice slice;
2216   grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2217                         nullptr);
2218 
2219   GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2220 
2221   // Hand roll a header block.
2222   //   This is unnecessarily ugly - at some point we should find a more
2223   //   elegant solution.
2224   //   It's complicated by the fact that our send machinery would be dead by
2225   //   the time we got around to sending this, so instead we ignore HPACK
2226   //   compression and just write the uncompressed bytes onto the wire.
2227   if (!s->sent_initial_metadata) {
2228     http_status_hdr = GRPC_SLICE_MALLOC(13);
2229     p = GRPC_SLICE_START_PTR(http_status_hdr);
2230     *p++ = 0x00;
2231     *p++ = 7;
2232     *p++ = ':';
2233     *p++ = 's';
2234     *p++ = 't';
2235     *p++ = 'a';
2236     *p++ = 't';
2237     *p++ = 'u';
2238     *p++ = 's';
2239     *p++ = 3;
2240     *p++ = '2';
2241     *p++ = '0';
2242     *p++ = '0';
2243     GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2244     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2245 
2246     content_type_hdr = GRPC_SLICE_MALLOC(31);
2247     p = GRPC_SLICE_START_PTR(content_type_hdr);
2248     *p++ = 0x00;
2249     *p++ = 12;
2250     *p++ = 'c';
2251     *p++ = 'o';
2252     *p++ = 'n';
2253     *p++ = 't';
2254     *p++ = 'e';
2255     *p++ = 'n';
2256     *p++ = 't';
2257     *p++ = '-';
2258     *p++ = 't';
2259     *p++ = 'y';
2260     *p++ = 'p';
2261     *p++ = 'e';
2262     *p++ = 16;
2263     *p++ = 'a';
2264     *p++ = 'p';
2265     *p++ = 'p';
2266     *p++ = 'l';
2267     *p++ = 'i';
2268     *p++ = 'c';
2269     *p++ = 'a';
2270     *p++ = 't';
2271     *p++ = 'i';
2272     *p++ = 'o';
2273     *p++ = 'n';
2274     *p++ = '/';
2275     *p++ = 'g';
2276     *p++ = 'r';
2277     *p++ = 'p';
2278     *p++ = 'c';
2279     GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2280     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2281   }
2282 
2283   status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2284   p = GRPC_SLICE_START_PTR(status_hdr);
2285   *p++ = 0x00; /* literal header, not indexed */
2286   *p++ = 11;   /* len(grpc-status) */
2287   *p++ = 'g';
2288   *p++ = 'r';
2289   *p++ = 'p';
2290   *p++ = 'c';
2291   *p++ = '-';
2292   *p++ = 's';
2293   *p++ = 't';
2294   *p++ = 'a';
2295   *p++ = 't';
2296   *p++ = 'u';
2297   *p++ = 's';
2298   if (grpc_status < 10) {
2299     *p++ = 1;
2300     *p++ = static_cast<uint8_t>('0' + grpc_status);
2301   } else {
2302     *p++ = 2;
2303     *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2304     *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2305   }
2306   GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2307   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2308 
2309   size_t msg_len = GRPC_SLICE_LENGTH(slice);
2310   GPR_ASSERT(msg_len <= UINT32_MAX);
2311   uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2312   message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2313   p = GRPC_SLICE_START_PTR(message_pfx);
2314   *p++ = 0x00; /* literal header, not indexed */
2315   *p++ = 12;   /* len(grpc-message) */
2316   *p++ = 'g';
2317   *p++ = 'r';
2318   *p++ = 'p';
2319   *p++ = 'c';
2320   *p++ = '-';
2321   *p++ = 'm';
2322   *p++ = 'e';
2323   *p++ = 's';
2324   *p++ = 's';
2325   *p++ = 'a';
2326   *p++ = 'g';
2327   *p++ = 'e';
2328   GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2329   p += msg_len_len;
2330   GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2331   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2332   len += static_cast<uint32_t>(msg_len);
2333 
2334   hdr = GRPC_SLICE_MALLOC(9);
2335   p = GRPC_SLICE_START_PTR(hdr);
2336   *p++ = static_cast<uint8_t>(len >> 16);
2337   *p++ = static_cast<uint8_t>(len >> 8);
2338   *p++ = static_cast<uint8_t>(len);
2339   *p++ = GRPC_CHTTP2_FRAME_HEADER;
2340   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2341   *p++ = static_cast<uint8_t>(s->id >> 24);
2342   *p++ = static_cast<uint8_t>(s->id >> 16);
2343   *p++ = static_cast<uint8_t>(s->id >> 8);
2344   *p++ = static_cast<uint8_t>(s->id);
2345   GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2346 
2347   grpc_slice_buffer_add(&t->qbuf, hdr);
2348   if (!s->sent_initial_metadata) {
2349     grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2350     grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2351   }
2352   grpc_slice_buffer_add(&t->qbuf, status_hdr);
2353   grpc_slice_buffer_add(&t->qbuf, message_pfx);
2354   grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2355   grpc_chttp2_reset_ping_clock(t);
2356   grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2357                                            &s->stats.outgoing);
2358 
2359   grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2360   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2361 }
2362 
2363 struct cancel_stream_cb_args {
2364   grpc_error* error;
2365   grpc_chttp2_transport* t;
2366 };
2367 
cancel_stream_cb(void * user_data,uint32_t,void * stream)2368 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2369   cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2370   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2371   grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2372 }
2373 
end_all_the_calls(grpc_chttp2_transport * t,grpc_error * error)2374 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2375   intptr_t http2_error;
2376   // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2377   if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2378       !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2379     error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2380                                GRPC_STATUS_UNAVAILABLE);
2381   }
2382   cancel_stream_cb_args args = {error, t};
2383   grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2384   GRPC_ERROR_UNREF(error);
2385 }
2386 
2387 //
2388 // INPUT PROCESSING - PARSING
2389 //
2390 
2391 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2392 static void WithUrgency(grpc_chttp2_transport* t,
2393                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2394                         grpc_chttp2_initiate_write_reason reason, F action) {
2395   switch (urgency) {
2396     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2397       break;
2398     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2399       grpc_chttp2_initiate_write(t, reason);
2400     // fallthrough
2401     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2402       action();
2403       break;
2404   }
2405 }
2406 
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2407 void grpc_chttp2_act_on_flowctl_action(
2408     const grpc_core::chttp2::FlowControlAction& action,
2409     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2410   WithUrgency(t, action.send_stream_update(),
2411               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2412               [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2413   WithUrgency(t, action.send_transport_update(),
2414               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2415   WithUrgency(t, action.send_initial_window_update(),
2416               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2417                 queue_setting_update(t,
2418                                      GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2419                                      action.initial_window_size());
2420               });
2421   WithUrgency(t, action.send_max_frame_size_update(),
2422               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2423                 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2424                                      action.max_frame_size());
2425               });
2426 }
2427 
try_http_parsing(grpc_chttp2_transport * t)2428 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2429   grpc_http_parser parser;
2430   size_t i = 0;
2431   grpc_error* error = GRPC_ERROR_NONE;
2432   grpc_http_response response;
2433 
2434   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2435 
2436   grpc_error* parse_error = GRPC_ERROR_NONE;
2437   for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2438     parse_error =
2439         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2440   }
2441   if (parse_error == GRPC_ERROR_NONE &&
2442       (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2443     error = grpc_error_set_int(
2444         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2445                                "Trying to connect an http1.x server"),
2446                            GRPC_ERROR_INT_HTTP_STATUS, response.status),
2447         GRPC_ERROR_INT_GRPC_STATUS,
2448         grpc_http2_status_to_grpc_status(response.status));
2449   }
2450   GRPC_ERROR_UNREF(parse_error);
2451 
2452   grpc_http_parser_destroy(&parser);
2453   grpc_http_response_destroy(&response);
2454   return error;
2455 }
2456 
read_action(void * tp,grpc_error * error)2457 static void read_action(void* tp, grpc_error* error) {
2458   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2459   t->combiner->Run(
2460       GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2461       GRPC_ERROR_REF(error));
2462 }
2463 
read_action_locked(void * tp,grpc_error * error)2464 static void read_action_locked(void* tp, grpc_error* error) {
2465   GPR_TIMER_SCOPE("reading_action_locked", 0);
2466 
2467   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2468 
2469   GRPC_ERROR_REF(error);
2470 
2471   grpc_error* err = error;
2472   if (err != GRPC_ERROR_NONE) {
2473     err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2474                                  "Endpoint read failed", &err, 1),
2475                              GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2476                              t->write_state);
2477   }
2478   GPR_SWAP(grpc_error*, err, error);
2479   GRPC_ERROR_UNREF(err);
2480   if (t->closed_with_error == GRPC_ERROR_NONE) {
2481     GPR_TIMER_SCOPE("reading_action.parse", 0);
2482     size_t i = 0;
2483     grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2484                              GRPC_ERROR_NONE};
2485     for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2486       errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2487     }
2488     if (errors[1] != GRPC_ERROR_NONE) {
2489       errors[2] = try_http_parsing(t);
2490       GRPC_ERROR_UNREF(error);
2491       error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2492           "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2493     }
2494     for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2495       GRPC_ERROR_UNREF(errors[i]);
2496     }
2497 
2498     GPR_TIMER_SCOPE("post_parse_locked", 0);
2499     if (t->initial_window_update != 0) {
2500       if (t->initial_window_update > 0) {
2501         grpc_chttp2_stream* s;
2502         while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2503           grpc_chttp2_mark_stream_writable(t, s);
2504           grpc_chttp2_initiate_write(
2505               t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2506         }
2507       }
2508       t->initial_window_update = 0;
2509     }
2510   }
2511 
2512   GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2513   bool keep_reading = false;
2514   if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2515     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2516         "Transport closed", &t->closed_with_error, 1);
2517   }
2518   if (error != GRPC_ERROR_NONE) {
2519     // If a goaway frame was received, this might be the reason why the read
2520     // failed. Add this info to the error
2521     if (t->goaway_error != GRPC_ERROR_NONE) {
2522       error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2523     }
2524 
2525     close_transport_locked(t, GRPC_ERROR_REF(error));
2526     t->endpoint_reading = 0;
2527   } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2528     keep_reading = true;
2529     // Since we have read a byte, reset the keepalive timer
2530     if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2531       grpc_timer_cancel(&t->keepalive_ping_timer);
2532     }
2533   }
2534   grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2535 
2536   if (keep_reading) {
2537     if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2538       t->reading_paused_on_pending_induced_frames = true;
2539       GRPC_CHTTP2_IF_TRACING(
2540           gpr_log(GPR_INFO,
2541                   "transport %p : Pausing reading due to too "
2542                   "many unwritten SETTINGS ACK and RST_STREAM frames",
2543                   t));
2544     } else {
2545       continue_read_action_locked(t);
2546     }
2547   } else {
2548     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2549   }
2550 
2551   GRPC_ERROR_UNREF(error);
2552 }
2553 
continue_read_action_locked(grpc_chttp2_transport * t)2554 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2555   const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2556   GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2557                     grpc_schedule_on_exec_ctx);
2558   grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2559   grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2560 }
2561 
2562 // t is reffed prior to calling the first time, and once the callback chain
2563 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2564 void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2565   t->flow_control->bdp_estimator()->SchedulePing();
2566   send_ping_locked(
2567       t,
2568       GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2569                         grpc_schedule_on_exec_ctx),
2570       GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2571                         grpc_schedule_on_exec_ctx));
2572   // TODO(yashykt): Enabling this causes internal b/168345569. Re-enable once
2573   // fixed.
2574   // grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2575 }
2576 
start_bdp_ping(void * tp,grpc_error * error)2577 static void start_bdp_ping(void* tp, grpc_error* error) {
2578   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2579   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2580                                      start_bdp_ping_locked, t, nullptr),
2581                    GRPC_ERROR_REF(error));
2582 }
2583 
start_bdp_ping_locked(void * tp,grpc_error * error)2584 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2585   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2586   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2587     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string.c_str(),
2588             grpc_error_string(error));
2589   }
2590   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2591     return;
2592   }
2593   // Reset the keepalive ping timer
2594   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2595     grpc_timer_cancel(&t->keepalive_ping_timer);
2596   }
2597   t->flow_control->bdp_estimator()->StartPing();
2598   t->bdp_ping_started = true;
2599 }
2600 
finish_bdp_ping(void * tp,grpc_error * error)2601 static void finish_bdp_ping(void* tp, grpc_error* error) {
2602   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2603   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2604                                      finish_bdp_ping_locked, t, nullptr),
2605                    GRPC_ERROR_REF(error));
2606 }
2607 
finish_bdp_ping_locked(void * tp,grpc_error * error)2608 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2609   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2610   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2611     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string.c_str(),
2612             grpc_error_string(error));
2613   }
2614   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2615     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2616     return;
2617   }
2618   if (!t->bdp_ping_started) {
2619     // start_bdp_ping_locked has not been run yet. Schedule
2620     // finish_bdp_ping_locked to be run later.
2621     t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2622                                        finish_bdp_ping_locked, t, nullptr),
2623                      GRPC_ERROR_REF(error));
2624     return;
2625   }
2626   t->bdp_ping_started = false;
2627   grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2628   grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2629                                     nullptr);
2630   GPR_ASSERT(!t->have_next_bdp_ping_timer);
2631   t->have_next_bdp_ping_timer = true;
2632   GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2633                     next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
2634   grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2635                   &t->next_bdp_ping_timer_expired_locked);
2636 }
2637 
next_bdp_ping_timer_expired(void * tp,grpc_error * error)2638 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) {
2639   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2640   t->combiner->Run(
2641       GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2642                         next_bdp_ping_timer_expired_locked, t, nullptr),
2643       GRPC_ERROR_REF(error));
2644 }
2645 
next_bdp_ping_timer_expired_locked(void * tp,grpc_error * error)2646 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2647   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2648   GPR_ASSERT(t->have_next_bdp_ping_timer);
2649   t->have_next_bdp_ping_timer = false;
2650   if (error != GRPC_ERROR_NONE) {
2651     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2652     return;
2653   }
2654   if (t->flow_control->bdp_estimator()->accumulator() == 0) {
2655     // Block the bdp ping till we receive more data.
2656     t->bdp_ping_blocked = true;
2657     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2658   } else {
2659     schedule_bdp_ping_locked(t);
2660   }
2661 }
2662 
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2663 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2664                                                bool is_client) {
2665   size_t i;
2666   if (args) {
2667     for (i = 0; i < args->num_args; i++) {
2668       if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2669         const int value = grpc_channel_arg_get_integer(
2670             &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2671                                        : g_default_server_keepalive_time_ms,
2672                              1, INT_MAX});
2673         if (is_client) {
2674           g_default_client_keepalive_time_ms = value;
2675         } else {
2676           g_default_server_keepalive_time_ms = value;
2677         }
2678       } else if (0 ==
2679                  strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2680         const int value = grpc_channel_arg_get_integer(
2681             &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2682                                        : g_default_server_keepalive_timeout_ms,
2683                              0, INT_MAX});
2684         if (is_client) {
2685           g_default_client_keepalive_timeout_ms = value;
2686         } else {
2687           g_default_server_keepalive_timeout_ms = value;
2688         }
2689       } else if (0 == strcmp(args->args[i].key,
2690                              GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2691         const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2692             &args->args[i],
2693             {is_client ? g_default_client_keepalive_permit_without_calls
2694                        : g_default_server_keepalive_timeout_ms,
2695              0, 1}));
2696         if (is_client) {
2697           g_default_client_keepalive_permit_without_calls = value;
2698         } else {
2699           g_default_server_keepalive_permit_without_calls = value;
2700         }
2701       } else if (0 ==
2702                  strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2703         g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2704             &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2705       } else if (0 == strcmp(args->args[i].key,
2706                              GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2707         g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2708             &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2709       } else if (0 ==
2710                  strcmp(
2711                      args->args[i].key,
2712                      GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2713         g_default_min_recv_ping_interval_without_data_ms =
2714             grpc_channel_arg_get_integer(
2715                 &args->args[i],
2716                 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2717       }
2718     }
2719   }
2720 }
2721 
init_keepalive_ping(void * arg,grpc_error * error)2722 static void init_keepalive_ping(void* arg, grpc_error* error) {
2723   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2724   t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2725                                      init_keepalive_ping_locked, t, nullptr),
2726                    GRPC_ERROR_REF(error));
2727 }
2728 
init_keepalive_ping_locked(void * arg,grpc_error * error)2729 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2730   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2731   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2732   if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2733     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2734   } else if (error == GRPC_ERROR_NONE) {
2735     if (t->keepalive_permit_without_calls ||
2736         grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2737       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2738       GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2739       grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2740       send_keepalive_ping_locked(t);
2741       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2742     } else {
2743       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2744       GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2745                         grpc_schedule_on_exec_ctx);
2746       grpc_timer_init(&t->keepalive_ping_timer,
2747                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2748                       &t->init_keepalive_ping_locked);
2749     }
2750   } else if (error == GRPC_ERROR_CANCELLED) {
2751     // The keepalive ping timer may be cancelled by bdp
2752     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2753     GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2754                       grpc_schedule_on_exec_ctx);
2755     grpc_timer_init(&t->keepalive_ping_timer,
2756                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2757                     &t->init_keepalive_ping_locked);
2758   }
2759   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2760 }
2761 
start_keepalive_ping(void * arg,grpc_error * error)2762 static void start_keepalive_ping(void* arg, grpc_error* error) {
2763   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2764   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2765                                      start_keepalive_ping_locked, t, nullptr),
2766                    GRPC_ERROR_REF(error));
2767 }
2768 
start_keepalive_ping_locked(void * arg,grpc_error * error)2769 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2770   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2771   if (error != GRPC_ERROR_NONE) {
2772     return;
2773   }
2774   if (t->channelz_socket != nullptr) {
2775     t->channelz_socket->RecordKeepaliveSent();
2776   }
2777   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2778       GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2779     gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string.c_str());
2780   }
2781   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2782   GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2783                     keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
2784   grpc_timer_init(&t->keepalive_watchdog_timer,
2785                   grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2786                   &t->keepalive_watchdog_fired_locked);
2787   t->keepalive_ping_started = true;
2788 }
2789 
finish_keepalive_ping(void * arg,grpc_error * error)2790 static void finish_keepalive_ping(void* arg, grpc_error* error) {
2791   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2792   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2793                                      finish_keepalive_ping_locked, t, nullptr),
2794                    GRPC_ERROR_REF(error));
2795 }
2796 
finish_keepalive_ping_locked(void * arg,grpc_error * error)2797 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2798   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2799   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2800     if (error == GRPC_ERROR_NONE) {
2801       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2802           GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2803         gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string.c_str());
2804       }
2805       if (!t->keepalive_ping_started) {
2806         // start_keepalive_ping_locked has not run yet. Reschedule
2807         // finish_keepalive_ping_locked for it to be run later.
2808         t->combiner->Run(
2809             GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2810                               finish_keepalive_ping_locked, t, nullptr),
2811             GRPC_ERROR_REF(error));
2812         return;
2813       }
2814       t->keepalive_ping_started = false;
2815       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2816       grpc_timer_cancel(&t->keepalive_watchdog_timer);
2817       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2818       GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2819                         grpc_schedule_on_exec_ctx);
2820       grpc_timer_init(&t->keepalive_ping_timer,
2821                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2822                       &t->init_keepalive_ping_locked);
2823     }
2824   }
2825   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2826 }
2827 
keepalive_watchdog_fired(void * arg,grpc_error * error)2828 static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
2829   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2830   t->combiner->Run(
2831       GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2832                         keepalive_watchdog_fired_locked, t, nullptr),
2833       GRPC_ERROR_REF(error));
2834 }
2835 
keepalive_watchdog_fired_locked(void * arg,grpc_error * error)2836 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2837   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2838   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2839     if (error == GRPC_ERROR_NONE) {
2840       gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2841               t->peer_string.c_str());
2842       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2843       close_transport_locked(
2844           t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2845                                     "keepalive watchdog timeout"),
2846                                 GRPC_ERROR_INT_GRPC_STATUS,
2847                                 GRPC_STATUS_UNAVAILABLE));
2848     }
2849   } else {
2850     // The watchdog timer should have been cancelled by
2851     // finish_keepalive_ping_locked.
2852     if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2853       gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2854               t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2855     }
2856   }
2857   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2858 }
2859 
2860 //
2861 // CALLBACK LOOP
2862 //
2863 
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)2864 static void connectivity_state_set(grpc_chttp2_transport* t,
2865                                    grpc_connectivity_state state,
2866                                    const absl::Status& status,
2867                                    const char* reason) {
2868   GRPC_CHTTP2_IF_TRACING(
2869       gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2870   t->state_tracker.SetState(state, status, reason);
2871 }
2872 
2873 //
2874 // POLLSET STUFF
2875 //
2876 
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2877 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2878                         grpc_pollset* pollset) {
2879   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2880   grpc_endpoint_add_to_pollset(t->ep, pollset);
2881 }
2882 
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2883 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2884                             grpc_pollset_set* pollset_set) {
2885   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2886   grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2887 }
2888 
2889 //
2890 // BYTE STREAM
2891 //
2892 
reset_byte_stream(void * arg,grpc_error * error)2893 static void reset_byte_stream(void* arg, grpc_error* error) {
2894   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2895   s->pending_byte_stream = false;
2896   if (error == GRPC_ERROR_NONE) {
2897     grpc_chttp2_maybe_complete_recv_message(s->t, s);
2898     grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2899   } else {
2900     GPR_ASSERT(error != GRPC_ERROR_NONE);
2901     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
2902     s->on_next = nullptr;
2903     GRPC_ERROR_UNREF(s->byte_stream_error);
2904     s->byte_stream_error = GRPC_ERROR_NONE;
2905     grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2906     s->byte_stream_error = GRPC_ERROR_REF(error);
2907   }
2908 }
2909 
2910 namespace grpc_core {
2911 
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2912 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2913     grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2914     uint32_t frame_size, uint32_t flags)
2915     : ByteStream(frame_size, flags),
2916       transport_(transport),
2917       stream_(stream),
2918       refs_(2),
2919       remaining_bytes_(frame_size) {
2920   GRPC_ERROR_UNREF(stream->byte_stream_error);
2921   stream->byte_stream_error = GRPC_ERROR_NONE;
2922 }
2923 
OrphanLocked(void * arg,grpc_error *)2924 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2925                                             grpc_error* /*error_ignored*/) {
2926   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2927   grpc_chttp2_stream* s = bs->stream_;
2928   grpc_chttp2_transport* t = s->t;
2929   bs->Unref();
2930   s->pending_byte_stream = false;
2931   grpc_chttp2_maybe_complete_recv_message(t, s);
2932   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2933 }
2934 
Orphan()2935 void Chttp2IncomingByteStream::Orphan() {
2936   GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2937   transport_->combiner->Run(
2938       GRPC_CLOSURE_INIT(&destroy_action_,
2939                         &Chttp2IncomingByteStream::OrphanLocked, this, nullptr),
2940       GRPC_ERROR_NONE);
2941 }
2942 
NextLocked(void * arg,grpc_error *)2943 void Chttp2IncomingByteStream::NextLocked(void* arg,
2944                                           grpc_error* /*error_ignored*/) {
2945   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2946   grpc_chttp2_transport* t = bs->transport_;
2947   grpc_chttp2_stream* s = bs->stream_;
2948   size_t cur_length = s->frame_storage.length;
2949   if (!s->read_closed) {
2950     s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2951                                               cur_length);
2952     grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2953   }
2954   GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
2955   if (s->frame_storage.length > 0) {
2956     grpc_slice_buffer_swap(&s->frame_storage,
2957                            &s->unprocessed_incoming_frames_buffer);
2958     s->unprocessed_incoming_frames_decompressed = false;
2959     grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
2960                             GRPC_ERROR_NONE);
2961   } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
2962     grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
2963                             GRPC_ERROR_REF(s->byte_stream_error));
2964     if (s->data_parser.parsing_frame != nullptr) {
2965       s->data_parser.parsing_frame->Unref();
2966       s->data_parser.parsing_frame = nullptr;
2967     }
2968   } else if (s->read_closed) {
2969     if (bs->remaining_bytes_ != 0) {
2970       s->byte_stream_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2971           "Truncated message", &s->read_closed_error, 1);
2972       grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
2973                               GRPC_ERROR_REF(s->byte_stream_error));
2974       if (s->data_parser.parsing_frame != nullptr) {
2975         s->data_parser.parsing_frame->Unref();
2976         s->data_parser.parsing_frame = nullptr;
2977       }
2978     } else {
2979       // Should never reach here.
2980       GPR_ASSERT(false);
2981     }
2982   } else {
2983     s->on_next = bs->next_action_.on_complete;
2984   }
2985   bs->Unref();
2986 }
2987 
Next(size_t max_size_hint,grpc_closure * on_complete)2988 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
2989                                     grpc_closure* on_complete) {
2990   GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
2991   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2992     return true;
2993   } else {
2994     Ref();
2995     next_action_.max_size_hint = max_size_hint;
2996     next_action_.on_complete = on_complete;
2997     transport_->combiner->Run(
2998         GRPC_CLOSURE_INIT(&next_action_.closure,
2999                           &Chttp2IncomingByteStream::NextLocked, this, nullptr),
3000         GRPC_ERROR_NONE);
3001     return false;
3002   }
3003 }
3004 
MaybeCreateStreamDecompressionCtx()3005 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3006   GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3007                    GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3008   if (!stream_->stream_decompression_ctx) {
3009     stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3010         stream_->stream_decompression_method);
3011   }
3012 }
3013 
Pull(grpc_slice * slice)3014 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3015   GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3016   grpc_error* error;
3017   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3018     if (!stream_->unprocessed_incoming_frames_decompressed &&
3019         stream_->stream_decompression_method !=
3020             GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3021       bool end_of_context;
3022       MaybeCreateStreamDecompressionCtx();
3023       if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3024                                   &stream_->unprocessed_incoming_frames_buffer,
3025                                   &stream_->decompressed_data_buffer, nullptr,
3026                                   MAX_SIZE_T, &end_of_context)) {
3027         error =
3028             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
3029         return error;
3030       }
3031       GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3032       grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3033                              &stream_->decompressed_data_buffer);
3034       stream_->unprocessed_incoming_frames_decompressed = true;
3035       if (end_of_context) {
3036         grpc_stream_compression_context_destroy(
3037             stream_->stream_decompression_ctx);
3038         stream_->stream_decompression_ctx = nullptr;
3039       }
3040       if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3041         *slice = grpc_empty_slice();
3042       }
3043     }
3044     error = grpc_deframe_unprocessed_incoming_frames(
3045         &stream_->data_parser, stream_,
3046         &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3047     if (error != GRPC_ERROR_NONE) {
3048       return error;
3049     }
3050   } else {
3051     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3052     stream_->t->combiner->Run(&stream_->reset_byte_stream,
3053                               GRPC_ERROR_REF(error));
3054     return error;
3055   }
3056   return GRPC_ERROR_NONE;
3057 }
3058 
PublishError(grpc_error * error)3059 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
3060   GPR_ASSERT(error != GRPC_ERROR_NONE);
3061   grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
3062                           GRPC_ERROR_REF(error));
3063   stream_->on_next = nullptr;
3064   GRPC_ERROR_UNREF(stream_->byte_stream_error);
3065   stream_->byte_stream_error = GRPC_ERROR_REF(error);
3066   grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
3067 }
3068 
Push(const grpc_slice & slice,grpc_slice * slice_out)3069 grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3070                                            grpc_slice* slice_out) {
3071   if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3072     grpc_error* error =
3073         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3074     transport_->combiner->Run(&stream_->reset_byte_stream,
3075                               GRPC_ERROR_REF(error));
3076     grpc_slice_unref_internal(slice);
3077     return error;
3078   } else {
3079     remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3080     if (slice_out != nullptr) {
3081       *slice_out = slice;
3082     }
3083     return GRPC_ERROR_NONE;
3084   }
3085 }
3086 
Finished(grpc_error * error,bool reset_on_error)3087 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
3088                                                bool reset_on_error) {
3089   if (error == GRPC_ERROR_NONE) {
3090     if (remaining_bytes_ != 0) {
3091       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3092     }
3093   }
3094   if (error != GRPC_ERROR_NONE && reset_on_error) {
3095     transport_->combiner->Run(&stream_->reset_byte_stream,
3096                               GRPC_ERROR_REF(error));
3097   }
3098   Unref();
3099   return error;
3100 }
3101 
Shutdown(grpc_error * error)3102 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
3103   GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3104 }
3105 
3106 }  // namespace grpc_core
3107 
3108 //
3109 // RESOURCE QUOTAS
3110 //
3111 
post_benign_reclaimer(grpc_chttp2_transport * t)3112 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3113   if (!t->benign_reclaimer_registered) {
3114     t->benign_reclaimer_registered = true;
3115     GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3116     GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t,
3117                       grpc_schedule_on_exec_ctx);
3118     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3119                                       false, &t->benign_reclaimer_locked);
3120   }
3121 }
3122 
post_destructive_reclaimer(grpc_chttp2_transport * t)3123 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3124   if (!t->destructive_reclaimer_registered) {
3125     t->destructive_reclaimer_registered = true;
3126     GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3127     GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer,
3128                       t, grpc_schedule_on_exec_ctx);
3129     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3130                                       true, &t->destructive_reclaimer_locked);
3131   }
3132 }
3133 
benign_reclaimer(void * arg,grpc_error * error)3134 static void benign_reclaimer(void* arg, grpc_error* error) {
3135   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3136   t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3137                                      benign_reclaimer_locked, t, nullptr),
3138                    GRPC_ERROR_REF(error));
3139 }
3140 
benign_reclaimer_locked(void * arg,grpc_error * error)3141 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3142   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3143   if (error == GRPC_ERROR_NONE &&
3144       grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3145     // Channel with no active streams: send a goaway to try and make it
3146     // disconnect cleanly
3147     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3148       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3149               t->peer_string.c_str());
3150     }
3151     send_goaway(t,
3152                 grpc_error_set_int(
3153                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3154                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3155   } else if (error == GRPC_ERROR_NONE &&
3156              GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3157     gpr_log(GPR_INFO,
3158             "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3159             " streams",
3160             t->peer_string.c_str(),
3161             grpc_chttp2_stream_map_size(&t->stream_map));
3162   }
3163   t->benign_reclaimer_registered = false;
3164   if (error != GRPC_ERROR_CANCELLED) {
3165     grpc_resource_user_finish_reclamation(
3166         grpc_endpoint_get_resource_user(t->ep));
3167   }
3168   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3169 }
3170 
destructive_reclaimer(void * arg,grpc_error * error)3171 static void destructive_reclaimer(void* arg, grpc_error* error) {
3172   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3173   t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3174                                      destructive_reclaimer_locked, t, nullptr),
3175                    GRPC_ERROR_REF(error));
3176 }
3177 
destructive_reclaimer_locked(void * arg,grpc_error * error)3178 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3179   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3180   size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3181   t->destructive_reclaimer_registered = false;
3182   if (error == GRPC_ERROR_NONE && n > 0) {
3183     grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3184         grpc_chttp2_stream_map_rand(&t->stream_map));
3185     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3186       gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3187               t->peer_string.c_str(), s->id);
3188     }
3189     grpc_chttp2_cancel_stream(
3190         t, s,
3191         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3192                            GRPC_ERROR_INT_HTTP2_ERROR,
3193                            GRPC_HTTP2_ENHANCE_YOUR_CALM));
3194     if (n > 1) {
3195       // Since we cancel one stream per destructive reclamation, if
3196       //   there are more streams left, we can immediately post a new
3197       //   reclaimer in case the resource quota needs to free more
3198       //   memory
3199       post_destructive_reclaimer(t);
3200     }
3201   }
3202   if (error != GRPC_ERROR_CANCELLED) {
3203     grpc_resource_user_finish_reclamation(
3204         grpc_endpoint_get_resource_user(t->ep));
3205   }
3206   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3207 }
3208 
3209 //
3210 // MONITORING
3211 //
3212 
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3213 const char* grpc_chttp2_initiate_write_reason_string(
3214     grpc_chttp2_initiate_write_reason reason) {
3215   switch (reason) {
3216     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3217       return "INITIAL_WRITE";
3218     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3219       return "START_NEW_STREAM";
3220     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3221       return "SEND_MESSAGE";
3222     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3223       return "SEND_INITIAL_METADATA";
3224     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3225       return "SEND_TRAILING_METADATA";
3226     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3227       return "RETRY_SEND_PING";
3228     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3229       return "CONTINUE_PINGS";
3230     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3231       return "GOAWAY_SENT";
3232     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3233       return "RST_STREAM";
3234     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3235       return "CLOSE_FROM_API";
3236     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3237       return "STREAM_FLOW_CONTROL";
3238     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3239       return "TRANSPORT_FLOW_CONTROL";
3240     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3241       return "SEND_SETTINGS";
3242     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3243       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3244     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3245       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3246     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3247       return "APPLICATION_PING";
3248     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3249       return "BDP_PING";
3250     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3251       return "KEEPALIVE_PING";
3252     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3253       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3254     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3255       return "PING_RESPONSE";
3256     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3257       return "FORCE_RST_STREAM";
3258   }
3259   GPR_UNREACHABLE_CODE(return "unknown");
3260 }
3261 
chttp2_get_endpoint(grpc_transport * t)3262 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3263   return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3264 }
3265 
3266 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3267                                              "chttp2",
3268                                              init_stream,
3269                                              set_pollset,
3270                                              set_pollset_set,
3271                                              perform_stream_op,
3272                                              perform_transport_op,
3273                                              destroy_stream,
3274                                              destroy_transport,
3275                                              chttp2_get_endpoint};
3276 
get_vtable(void)3277 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3278 
3279 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3280 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3281   grpc_chttp2_transport* t =
3282       reinterpret_cast<grpc_chttp2_transport*>(transport);
3283   return t->channelz_socket;
3284 }
3285 
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)3286 grpc_transport* grpc_create_chttp2_transport(
3287     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3288     grpc_resource_user* resource_user) {
3289   auto t =
3290       new grpc_chttp2_transport(channel_args, ep, is_client, resource_user);
3291   return &t->base;
3292 }
3293 
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings)3294 void grpc_chttp2_transport_start_reading(
3295     grpc_transport* transport, grpc_slice_buffer* read_buffer,
3296     grpc_closure* notify_on_receive_settings) {
3297   grpc_chttp2_transport* t =
3298       reinterpret_cast<grpc_chttp2_transport*>(transport);
3299   GRPC_CHTTP2_REF_TRANSPORT(
3300       t, "reading_action"); /* matches unref inside reading_action */
3301   if (read_buffer != nullptr) {
3302     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3303     gpr_free(read_buffer);
3304   }
3305   t->notify_on_receive_settings = notify_on_receive_settings;
3306   t->combiner->Run(
3307       GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
3308       GRPC_ERROR_NONE);
3309 }
3310