• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
22 
23 #include <grpc/slice_buffer.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/string_util.h>
28 #include <inttypes.h>
29 #include <limits.h>
30 #include <math.h>
31 #include <stdio.h>
32 #include <string.h>
33 
34 #include "absl/strings/str_format.h"
35 #include "src/core/ext/transport/chttp2/transport/context_list.h"
36 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
37 #include "src/core/ext/transport/chttp2/transport/internal.h"
38 #include "src/core/ext/transport/chttp2/transport/varint.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/compression/stream_compression.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/env.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gprpp/memory.h"
45 #include "src/core/lib/http/parser.h"
46 #include "src/core/lib/iomgr/executor.h"
47 #include "src/core/lib/iomgr/iomgr.h"
48 #include "src/core/lib/iomgr/timer.h"
49 #include "src/core/lib/profiling/timers.h"
50 #include "src/core/lib/slice/slice_internal.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/transport/error_utils.h"
53 #include "src/core/lib/transport/http2_errors.h"
54 #include "src/core/lib/transport/static_metadata.h"
55 #include "src/core/lib/transport/status_conversion.h"
56 #include "src/core/lib/transport/timeout_encoding.h"
57 #include "src/core/lib/transport/transport.h"
58 #include "src/core/lib/transport/transport_impl.h"
59 #include "src/core/lib/uri/uri_parser.h"
60 
61 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
62 #define MAX_WINDOW 0x7fffffffu
63 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
64 #define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
65 
66 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
67 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
68 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000  /* 2 hours */
69 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
70 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
71 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
72 
73 #define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
74 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
75 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
76 #define DEFAULT_MAX_PING_STRIKES 2
77 
78 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
79 
80 static int g_default_client_keepalive_time_ms =
81     DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
82 static int g_default_client_keepalive_timeout_ms =
83     DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
84 static int g_default_server_keepalive_time_ms =
85     DEFAULT_SERVER_KEEPALIVE_TIME_MS;
86 static int g_default_server_keepalive_timeout_ms =
87     DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
88 static bool g_default_client_keepalive_permit_without_calls =
89     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
90 static bool g_default_server_keepalive_permit_without_calls =
91     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
92 
93 static int g_default_min_sent_ping_interval_without_data_ms =
94     DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS;
95 static int g_default_min_recv_ping_interval_without_data_ms =
96     DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
97 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
98 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
99 
100 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
101 grpc_core::TraceFlag grpc_http_trace(false, "http");
102 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
103 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
104                                                          "chttp2_refcount");
105 
106 /* forward declarations of various callbacks that we'll build closures around */
107 static void write_action_begin_locked(void* t, grpc_error* error);
108 static void write_action(void* t, grpc_error* error);
109 static void write_action_end(void* t, grpc_error* error);
110 static void write_action_end_locked(void* t, grpc_error* error);
111 
112 static void read_action(void* t, grpc_error* error);
113 static void read_action_locked(void* t, grpc_error* error);
114 static void continue_read_action_locked(grpc_chttp2_transport* t);
115 
116 static void complete_fetch(void* gs, grpc_error* error);
117 static void complete_fetch_locked(void* gs, grpc_error* error);
118 /** Set a transport level setting, and push it to our peer */
119 static void queue_setting_update(grpc_chttp2_transport* t,
120                                  grpc_chttp2_setting_id id, uint32_t value);
121 
122 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
123                            grpc_error* error);
124 
125 /** Start new streams that have been created if we can */
126 static void maybe_start_some_streams(grpc_chttp2_transport* t);
127 
128 static void connectivity_state_set(grpc_chttp2_transport* t,
129                                    grpc_connectivity_state state,
130                                    const char* reason);
131 
132 static void benign_reclaimer(void* t, grpc_error* error);
133 static void destructive_reclaimer(void* t, grpc_error* error);
134 static void benign_reclaimer_locked(void* t, grpc_error* error);
135 static void destructive_reclaimer_locked(void* t, grpc_error* error);
136 
137 static void post_benign_reclaimer(grpc_chttp2_transport* t);
138 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
139 
140 static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
141 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
142 
143 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
144 static void start_bdp_ping(void* tp, grpc_error* error);
145 static void finish_bdp_ping(void* tp, grpc_error* error);
146 static void start_bdp_ping_locked(void* tp, grpc_error* error);
147 static void finish_bdp_ping_locked(void* tp, grpc_error* error);
148 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error);
149 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
150 
151 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
152 static void send_ping_locked(grpc_chttp2_transport* t,
153                              grpc_closure* on_initiate,
154                              grpc_closure* on_complete);
155 static void retry_initiate_ping_locked(void* tp, grpc_error* error);
156 
157 /** keepalive-relevant functions */
158 static void init_keepalive_ping(void* arg, grpc_error* error);
159 static void init_keepalive_ping_locked(void* arg, grpc_error* error);
160 static void start_keepalive_ping(void* arg, grpc_error* error);
161 static void finish_keepalive_ping(void* arg, grpc_error* error);
162 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
163 static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
164 static void keepalive_watchdog_fired(void* arg, grpc_error* error);
165 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
166 
167 static void reset_byte_stream(void* arg, grpc_error* error);
168 
169 // Flow control default enabled. Can be disabled by setting
170 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
171 bool g_flow_control_enabled = true;
172 
173 /*******************************************************************************
174  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
175  */
176 
~grpc_chttp2_transport()177 grpc_chttp2_transport::~grpc_chttp2_transport() {
178   size_t i;
179 
180   if (channelz_socket != nullptr) {
181     channelz_socket.reset();
182   }
183 
184   grpc_endpoint_destroy(ep);
185 
186   grpc_slice_buffer_destroy_internal(&qbuf);
187 
188   grpc_slice_buffer_destroy_internal(&outbuf);
189   grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
190 
191   grpc_error* error =
192       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed");
193   // ContextList::Execute follows semantics of a callback function and does not
194   // take a ref on error
195   grpc_core::ContextList::Execute(cl, nullptr, error);
196   GRPC_ERROR_UNREF(error);
197   cl = nullptr;
198 
199   grpc_slice_buffer_destroy_internal(&read_buffer);
200   grpc_chttp2_hpack_parser_destroy(&hpack_parser);
201   grpc_chttp2_goaway_parser_destroy(&goaway_parser);
202 
203   for (i = 0; i < STREAM_LIST_COUNT; i++) {
204     GPR_ASSERT(lists[i].head == nullptr);
205     GPR_ASSERT(lists[i].tail == nullptr);
206   }
207 
208   GRPC_ERROR_UNREF(goaway_error);
209 
210   GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0);
211 
212   grpc_chttp2_stream_map_destroy(&stream_map);
213 
214   GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
215 
216   cancel_pings(this,
217                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
218 
219   while (write_cb_pool) {
220     grpc_chttp2_write_cb* next = write_cb_pool->next;
221     gpr_free(write_cb_pool);
222     write_cb_pool = next;
223   }
224 
225   flow_control.Destroy();
226 
227   GRPC_ERROR_UNREF(closed_with_error);
228   gpr_free(ping_acks);
229   gpr_free(peer_string);
230 }
231 
232 static const grpc_transport_vtable* get_vtable(void);
233 
234 /* Returns whether bdp is enabled */
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)235 static bool read_channel_args(grpc_chttp2_transport* t,
236                               const grpc_channel_args* channel_args,
237                               bool is_client) {
238   bool enable_bdp = true;
239   bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT;
240   size_t i;
241   int j;
242 
243   for (i = 0; i < channel_args->num_args; i++) {
244     if (0 == strcmp(channel_args->args[i].key,
245                     GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
246       const grpc_integer_options options = {-1, 0, INT_MAX};
247       const int value =
248           grpc_channel_arg_get_integer(&channel_args->args[i], options);
249       if (value >= 0) {
250         if ((t->next_stream_id & 1) != (value & 1)) {
251           gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
252                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
253                   is_client ? "client" : "server");
254         } else {
255           t->next_stream_id = static_cast<uint32_t>(value);
256         }
257       }
258     } else if (0 == strcmp(channel_args->args[i].key,
259                            GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
260       const grpc_integer_options options = {-1, 0, INT_MAX};
261       const int value =
262           grpc_channel_arg_get_integer(&channel_args->args[i], options);
263       if (value >= 0) {
264         grpc_chttp2_hpack_compressor_set_max_usable_size(
265             &t->hpack_compressor, static_cast<uint32_t>(value));
266       }
267     } else if (0 == strcmp(channel_args->args[i].key,
268                            GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
269       t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
270           &channel_args->args[i],
271           {g_default_max_pings_without_data, 0, INT_MAX});
272     } else if (0 == strcmp(channel_args->args[i].key,
273                            GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
274       t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
275           &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
276     } else if (0 ==
277                strcmp(channel_args->args[i].key,
278                       GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
279       t->ping_policy.min_sent_ping_interval_without_data =
280           grpc_channel_arg_get_integer(
281               &channel_args->args[i],
282               grpc_integer_options{
283                   g_default_min_sent_ping_interval_without_data_ms, 0,
284                   INT_MAX});
285     } else if (0 ==
286                strcmp(channel_args->args[i].key,
287                       GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
288       t->ping_policy.min_recv_ping_interval_without_data =
289           grpc_channel_arg_get_integer(
290               &channel_args->args[i],
291               grpc_integer_options{
292                   g_default_min_recv_ping_interval_without_data_ms, 0,
293                   INT_MAX});
294     } else if (0 == strcmp(channel_args->args[i].key,
295                            GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
296       t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
297           &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
298     } else if (0 ==
299                strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
300       enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
301     } else if (0 ==
302                strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
303       const int value = grpc_channel_arg_get_integer(
304           &channel_args->args[i],
305           grpc_integer_options{t->is_client
306                                    ? g_default_client_keepalive_time_ms
307                                    : g_default_server_keepalive_time_ms,
308                                1, INT_MAX});
309       t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
310     } else if (0 == strcmp(channel_args->args[i].key,
311                            GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
312       const int value = grpc_channel_arg_get_integer(
313           &channel_args->args[i],
314           grpc_integer_options{t->is_client
315                                    ? g_default_client_keepalive_timeout_ms
316                                    : g_default_server_keepalive_timeout_ms,
317                                0, INT_MAX});
318       t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
319     } else if (0 == strcmp(channel_args->args[i].key,
320                            GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
321       t->keepalive_permit_without_calls = static_cast<uint32_t>(
322           grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
323     } else if (0 == strcmp(channel_args->args[i].key,
324                            GRPC_ARG_OPTIMIZATION_TARGET)) {
325       gpr_log(GPR_INFO, "GRPC_ARG_OPTIMIZATION_TARGET is deprecated");
326     } else if (0 ==
327                strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
328       channelz_enabled = grpc_channel_arg_get_bool(
329           &channel_args->args[i], GRPC_ENABLE_CHANNELZ_DEFAULT);
330     } else {
331       static const struct {
332         const char* channel_arg_name;
333         grpc_chttp2_setting_id setting_id;
334         grpc_integer_options integer_options;
335         bool availability[2] /* server, client */;
336       } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
337                            GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
338                            {-1, 0, INT32_MAX},
339                            {true, false}},
340                           {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
341                            GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
342                            {-1, 0, INT32_MAX},
343                            {true, true}},
344                           {GRPC_ARG_MAX_METADATA_SIZE,
345                            GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
346                            {-1, 0, INT32_MAX},
347                            {true, true}},
348                           {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
349                            GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
350                            {-1, 16384, 16777215},
351                            {true, true}},
352                           {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
353                            GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
354                            {1, 0, 1},
355                            {true, true}},
356                           {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
357                            GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
358                            {-1, 5, INT32_MAX},
359                            {true, true}}};
360       for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
361         if (0 == strcmp(channel_args->args[i].key,
362                         settings_map[j].channel_arg_name)) {
363           if (!settings_map[j].availability[is_client]) {
364             gpr_log(GPR_DEBUG, "%s is not available on %s",
365                     settings_map[j].channel_arg_name,
366                     is_client ? "clients" : "servers");
367           } else {
368             int value = grpc_channel_arg_get_integer(
369                 &channel_args->args[i], settings_map[j].integer_options);
370             if (value >= 0) {
371               queue_setting_update(t, settings_map[j].setting_id,
372                                    static_cast<uint32_t>(value));
373             }
374           }
375           break;
376         }
377       }
378     }
379   }
380   if (channelz_enabled) {
381     // TODO(ncteisen): add an API to endpoint to query for local addr, and pass
382     // it in here, so SocketNode knows its own address.
383     t->channelz_socket =
384         grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
385             "", t->peer_string,
386             absl::StrFormat("%s %s", get_vtable()->name, t->peer_string));
387   }
388   return enable_bdp;
389 }
390 
init_transport_keepalive_settings(grpc_chttp2_transport * t)391 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
392   if (t->is_client) {
393     t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
394                             ? GRPC_MILLIS_INF_FUTURE
395                             : g_default_client_keepalive_time_ms;
396     t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
397                                ? GRPC_MILLIS_INF_FUTURE
398                                : g_default_client_keepalive_timeout_ms;
399     t->keepalive_permit_without_calls =
400         g_default_client_keepalive_permit_without_calls;
401   } else {
402     t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
403                             ? GRPC_MILLIS_INF_FUTURE
404                             : g_default_server_keepalive_time_ms;
405     t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
406                                ? GRPC_MILLIS_INF_FUTURE
407                                : g_default_server_keepalive_timeout_ms;
408     t->keepalive_permit_without_calls =
409         g_default_server_keepalive_permit_without_calls;
410   }
411 }
412 
configure_transport_ping_policy(grpc_chttp2_transport * t)413 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
414   t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
415   t->ping_policy.min_sent_ping_interval_without_data =
416       g_default_min_sent_ping_interval_without_data_ms;
417   t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
418   t->ping_policy.min_recv_ping_interval_without_data =
419       g_default_min_recv_ping_interval_without_data_ms;
420 }
421 
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)422 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
423   if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
424     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
425     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
426     GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
427                       grpc_schedule_on_exec_ctx);
428     grpc_timer_init(&t->keepalive_ping_timer,
429                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
430                     &t->init_keepalive_ping_locked);
431   } else {
432     /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
433        inflight keeaplive timers */
434     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
435   }
436 }
437 
grpc_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)438 grpc_chttp2_transport::grpc_chttp2_transport(
439     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
440     grpc_resource_user* resource_user)
441     : refs(1, &grpc_trace_chttp2_refcount),
442       ep(ep),
443       peer_string(grpc_endpoint_get_peer(ep)),
444       resource_user(resource_user),
445       combiner(grpc_combiner_create()),
446       state_tracker(is_client ? "client_transport" : "server_transport",
447                     GRPC_CHANNEL_READY),
448       is_client(is_client),
449       next_stream_id(is_client ? 1 : 2),
450       deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
451   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
452              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
453   base.vtable = get_vtable();
454   /* 8 is a random stab in the dark as to a good initial size: it's small enough
455      that it shouldn't waste memory for infrequently used connections, yet
456      large enough that the exponential growth should happen nicely when it's
457      needed.
458      TODO(ctiller): tune this */
459   grpc_chttp2_stream_map_init(&stream_map, 8);
460 
461   grpc_slice_buffer_init(&read_buffer);
462   grpc_slice_buffer_init(&outbuf);
463   if (is_client) {
464     grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
465                                        GRPC_CHTTP2_CLIENT_CONNECT_STRING));
466   }
467   grpc_chttp2_hpack_compressor_init(&hpack_compressor);
468   grpc_slice_buffer_init(&qbuf);
469   /* copy in initial settings to all setting sets */
470   size_t i;
471   int j;
472   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
473     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
474       settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
475     }
476   }
477   grpc_chttp2_hpack_parser_init(&hpack_parser);
478   grpc_chttp2_goaway_parser_init(&goaway_parser);
479 
480   /* configure http2 the way we like it */
481   if (is_client) {
482     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
483     queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
484   }
485   queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
486                        DEFAULT_MAX_HEADER_LIST_SIZE);
487   queue_setting_update(this,
488                        GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
489 
490   configure_transport_ping_policy(this);
491   init_transport_keepalive_settings(this);
492 
493   bool enable_bdp = true;
494   if (channel_args) {
495     enable_bdp = read_channel_args(this, channel_args, is_client);
496   }
497 
498   if (g_flow_control_enabled) {
499     flow_control.Init<grpc_core::chttp2::TransportFlowControl>(this,
500                                                                enable_bdp);
501   } else {
502     flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(this);
503     enable_bdp = false;
504   }
505 
506   /* No pings allowed before receiving a header or data frame. */
507   ping_state.pings_before_data_required = 0;
508   ping_state.is_delayed_ping_timer_set = false;
509   ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
510 
511   ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
512   ping_recv_state.ping_strikes = 0;
513 
514   init_keepalive_pings_if_enabled(this);
515 
516   if (enable_bdp) {
517     GRPC_CHTTP2_REF_TRANSPORT(this, "bdp_ping");
518     schedule_bdp_ping_locked(this);
519     grpc_chttp2_act_on_flowctl_action(flow_control->PeriodicUpdate(), this,
520                                       nullptr);
521   }
522 
523   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
524   post_benign_reclaimer(this);
525 }
526 
destroy_transport_locked(void * tp,grpc_error *)527 static void destroy_transport_locked(void* tp, grpc_error* /*error*/) {
528   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
529   t->destroying = 1;
530   close_transport_locked(
531       t, grpc_error_set_int(
532              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
533              GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
534   // Must be the last line.
535   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
536 }
537 
destroy_transport(grpc_transport * gt)538 static void destroy_transport(grpc_transport* gt) {
539   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
540   t->combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, t, nullptr),
541                    GRPC_ERROR_NONE);
542 }
543 
close_transport_locked(grpc_chttp2_transport * t,grpc_error * error)544 static void close_transport_locked(grpc_chttp2_transport* t,
545                                    grpc_error* error) {
546   end_all_the_calls(t, GRPC_ERROR_REF(error));
547   cancel_pings(t, GRPC_ERROR_REF(error));
548   if (t->closed_with_error == GRPC_ERROR_NONE) {
549     if (!grpc_error_has_clear_grpc_status(error)) {
550       error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
551                                  GRPC_STATUS_UNAVAILABLE);
552     }
553     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
554       if (t->close_transport_on_writes_finished == nullptr) {
555         t->close_transport_on_writes_finished =
556             GRPC_ERROR_CREATE_FROM_STATIC_STRING(
557                 "Delayed close due to in-progress write");
558       }
559       t->close_transport_on_writes_finished =
560           grpc_error_add_child(t->close_transport_on_writes_finished, error);
561       return;
562     }
563     GPR_ASSERT(error != GRPC_ERROR_NONE);
564     t->closed_with_error = GRPC_ERROR_REF(error);
565     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, "close_transport");
566     if (t->ping_state.is_delayed_ping_timer_set) {
567       grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
568     }
569     if (t->have_next_bdp_ping_timer) {
570       grpc_timer_cancel(&t->next_bdp_ping_timer);
571     }
572     switch (t->keepalive_state) {
573       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
574         grpc_timer_cancel(&t->keepalive_ping_timer);
575         break;
576       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
577         grpc_timer_cancel(&t->keepalive_ping_timer);
578         grpc_timer_cancel(&t->keepalive_watchdog_timer);
579         break;
580       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
581       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
582         /* keepalive timers are not set in these two states */
583         break;
584     }
585 
586     /* flush writable stream list to avoid dangling references */
587     grpc_chttp2_stream* s;
588     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
589       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
590     }
591     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
592     grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
593   }
594   if (t->notify_on_receive_settings != nullptr) {
595     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
596                             GRPC_ERROR_CANCELLED);
597     t->notify_on_receive_settings = nullptr;
598   }
599   GRPC_ERROR_UNREF(error);
600 }
601 
602 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)603 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
604   grpc_stream_ref(s->refcount, reason);
605 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)606 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
607   grpc_stream_unref(s->refcount, reason);
608 }
609 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)610 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
611   grpc_stream_ref(s->refcount);
612 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)613 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
614   grpc_stream_unref(s->refcount);
615 }
616 #endif
617 
Reffer(grpc_chttp2_stream * s)618 grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) {
619   /* We reserve one 'active stream' that's dropped when the stream is
620      read-closed. The others are for Chttp2IncomingByteStreams that are
621      actively reading */
622   GRPC_CHTTP2_STREAM_REF(s, "chttp2");
623   GRPC_CHTTP2_REF_TRANSPORT(s->t, "stream");
624 }
625 
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)626 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
627                                        grpc_stream_refcount* refcount,
628                                        const void* server_data,
629                                        grpc_core::Arena* arena)
630     : t(t),
631       refcount(refcount),
632       reffer(this),
633       metadata_buffer{grpc_chttp2_incoming_metadata_buffer(arena),
634                       grpc_chttp2_incoming_metadata_buffer(arena)} {
635   if (server_data) {
636     id = static_cast<uint32_t>((uintptr_t)server_data);
637     *t->accepting_stream = this;
638     grpc_chttp2_stream_map_add(&t->stream_map, id, this);
639     post_destructive_reclaimer(t);
640   }
641   if (t->flow_control->flow_control_enabled()) {
642     flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
643         static_cast<grpc_core::chttp2::TransportFlowControl*>(
644             t->flow_control.get()),
645         this);
646   } else {
647     flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
648   }
649 
650   grpc_slice_buffer_init(&frame_storage);
651   grpc_slice_buffer_init(&unprocessed_incoming_frames_buffer);
652   grpc_slice_buffer_init(&flow_controlled_buffer);
653   GRPC_CLOSURE_INIT(&reset_byte_stream, ::reset_byte_stream, this, nullptr);
654 }
655 
~grpc_chttp2_stream()656 grpc_chttp2_stream::~grpc_chttp2_stream() {
657   if (t->channelz_socket != nullptr) {
658     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
659       t->channelz_socket->RecordStreamSucceeded();
660     } else {
661       t->channelz_socket->RecordStreamFailed();
662     }
663   }
664 
665   GPR_ASSERT((write_closed && read_closed) || id == 0);
666   if (id != 0) {
667     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr);
668   }
669 
670   grpc_slice_buffer_destroy_internal(&unprocessed_incoming_frames_buffer);
671   grpc_slice_buffer_destroy_internal(&frame_storage);
672   if (stream_compression_method != GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
673     grpc_slice_buffer_destroy_internal(&compressed_data_buffer);
674   }
675   if (stream_decompression_method !=
676       GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
677     grpc_slice_buffer_destroy_internal(&decompressed_data_buffer);
678   }
679 
680   grpc_chttp2_list_remove_stalled_by_transport(t, this);
681   grpc_chttp2_list_remove_stalled_by_stream(t, this);
682 
683   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
684     if (GPR_UNLIKELY(included[i])) {
685       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
686               t->is_client ? "client" : "server", id, i);
687       abort();
688     }
689   }
690 
691   GPR_ASSERT(send_initial_metadata_finished == nullptr);
692   GPR_ASSERT(fetching_send_message == nullptr);
693   GPR_ASSERT(send_trailing_metadata_finished == nullptr);
694   GPR_ASSERT(recv_initial_metadata_ready == nullptr);
695   GPR_ASSERT(recv_message_ready == nullptr);
696   GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
697   grpc_slice_buffer_destroy_internal(&flow_controlled_buffer);
698   GRPC_ERROR_UNREF(read_closed_error);
699   GRPC_ERROR_UNREF(write_closed_error);
700   GRPC_ERROR_UNREF(byte_stream_error);
701 
702   flow_control.Destroy();
703 
704   if (t->resource_user != nullptr) {
705     grpc_resource_user_free(t->resource_user, GRPC_RESOURCE_QUOTA_CALL_SIZE);
706   }
707 
708   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
709   grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
710 }
711 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)712 static int init_stream(grpc_transport* gt, grpc_stream* gs,
713                        grpc_stream_refcount* refcount, const void* server_data,
714                        grpc_core::Arena* arena) {
715   GPR_TIMER_SCOPE("init_stream", 0);
716   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
717   new (gs) grpc_chttp2_stream(t, refcount, server_data, arena);
718   return 0;
719 }
720 
destroy_stream_locked(void * sp,grpc_error *)721 static void destroy_stream_locked(void* sp, grpc_error* /*error*/) {
722   GPR_TIMER_SCOPE("destroy_stream", 0);
723   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
724   s->~grpc_chttp2_stream();
725 }
726 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)727 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
728                            grpc_closure* then_schedule_closure) {
729   GPR_TIMER_SCOPE("destroy_stream", 0);
730   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
731   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
732   if (s->stream_compression_method !=
733           GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS &&
734       s->stream_compression_ctx != nullptr) {
735     grpc_stream_compression_context_destroy(s->stream_compression_ctx);
736     s->stream_compression_ctx = nullptr;
737   }
738   if (s->stream_decompression_method !=
739           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS &&
740       s->stream_decompression_ctx != nullptr) {
741     grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
742     s->stream_decompression_ctx = nullptr;
743   }
744 
745   s->destroy_stream_arg = then_schedule_closure;
746   t->combiner->Run(
747       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
748       GRPC_ERROR_NONE);
749 }
750 
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)751 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
752                                                       uint32_t id) {
753   if (t->accept_stream_cb == nullptr) {
754     return nullptr;
755   }
756   // Don't accept the stream if memory quota doesn't allow. Note that we should
757   // simply refuse the stream here instead of canceling the stream after it's
758   // accepted since the latter will create the call which costs much memory.
759   if (t->resource_user != nullptr &&
760       !grpc_resource_user_safe_alloc(t->resource_user,
761                                      GRPC_RESOURCE_QUOTA_CALL_SIZE)) {
762     gpr_log(GPR_ERROR, "Memory exhausted, rejecting the stream.");
763     grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_REFUSED_STREAM,
764                                              nullptr);
765     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
766     return nullptr;
767   }
768   grpc_chttp2_stream* accepting = nullptr;
769   GPR_ASSERT(t->accepting_stream == nullptr);
770   t->accepting_stream = &accepting;
771   t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base,
772                       (void*)static_cast<uintptr_t>(id));
773   t->accepting_stream = nullptr;
774   return accepting;
775 }
776 
777 /*******************************************************************************
778  * OUTPUT PROCESSING
779  */
780 
write_state_name(grpc_chttp2_write_state st)781 static const char* write_state_name(grpc_chttp2_write_state st) {
782   switch (st) {
783     case GRPC_CHTTP2_WRITE_STATE_IDLE:
784       return "IDLE";
785     case GRPC_CHTTP2_WRITE_STATE_WRITING:
786       return "WRITING";
787     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
788       return "WRITING+MORE";
789   }
790   GPR_UNREACHABLE_CODE(return "UNKNOWN");
791 }
792 
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)793 static void set_write_state(grpc_chttp2_transport* t,
794                             grpc_chttp2_write_state st, const char* reason) {
795   GRPC_CHTTP2_IF_TRACING(
796       gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
797               t->is_client ? "CLIENT" : "SERVER", t->peer_string,
798               write_state_name(t->write_state), write_state_name(st), reason));
799   t->write_state = st;
800   /* If the state is being reset back to idle, it means a write was just
801    * finished. Make sure all the run_after_write closures are scheduled.
802    *
803    * This is also our chance to close the transport if the transport was marked
804    * to be closed after all writes finish (for example, if we received a go-away
805    * from peer while we had some pending writes) */
806   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
807     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
808     if (t->close_transport_on_writes_finished != nullptr) {
809       grpc_error* err = t->close_transport_on_writes_finished;
810       t->close_transport_on_writes_finished = nullptr;
811       close_transport_locked(t, err);
812     }
813   }
814 }
815 
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)816 static void inc_initiate_write_reason(
817     grpc_chttp2_initiate_write_reason reason) {
818   switch (reason) {
819     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
820       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
821       break;
822     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
823       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
824       break;
825     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
826       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
827       break;
828     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
829       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
830       break;
831     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
832       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
833       break;
834     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
835       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
836       break;
837     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
838       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
839       break;
840     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
841       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
842       break;
843     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
844       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
845       break;
846     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
847       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
848       break;
849     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
850       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
851       break;
852     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
853       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
854       break;
855     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
856       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
857       break;
858     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
859       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
860       break;
861     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
862       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
863       break;
864     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
865       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
866       break;
867     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
868       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
869       break;
870     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
871       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
872       break;
873     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
874       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
875       break;
876     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
877       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
878       break;
879   }
880 }
881 
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)882 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
883                                 grpc_chttp2_initiate_write_reason reason) {
884   GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
885 
886   switch (t->write_state) {
887     case GRPC_CHTTP2_WRITE_STATE_IDLE:
888       inc_initiate_write_reason(reason);
889       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
890                       grpc_chttp2_initiate_write_reason_string(reason));
891       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
892       /* Note that the 'write_action_begin_locked' closure is being scheduled
893        * on the 'finally_scheduler' of t->combiner. This means that
894        * 'write_action_begin_locked' is called only *after* all the other
895        * closures (some of which are potentially initiating more writes on the
896        * transport) are executed on the t->combiner.
897        *
898        * The reason for scheduling on finally_scheduler is to make sure we batch
899        * as many writes as possible. 'write_action_begin_locked' is the function
900        * that gathers all the relevant bytes (which are at various places in the
901        * grpc_chttp2_transport structure) and append them to 'outbuf' field in
902        * grpc_chttp2_transport thereby batching what would have been potentially
903        * multiple write operations.
904        *
905        * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
906        * It does not call the endpoint to write the bytes. That is done by the
907        * 'write_action' (which is scheduled by 'write_action_begin_locked') */
908       t->combiner->FinallyRun(
909           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
910                             write_action_begin_locked, t, nullptr),
911           GRPC_ERROR_NONE);
912       break;
913     case GRPC_CHTTP2_WRITE_STATE_WRITING:
914       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
915                       grpc_chttp2_initiate_write_reason_string(reason));
916       break;
917     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
918       break;
919   }
920 }
921 
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)922 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
923                                       grpc_chttp2_stream* s) {
924   if (t->closed_with_error == GRPC_ERROR_NONE &&
925       grpc_chttp2_list_add_writable_stream(t, s)) {
926     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
927   }
928 }
929 
begin_writing_desc(bool partial)930 static const char* begin_writing_desc(bool partial) {
931   if (partial) {
932     return "begin partial write in background";
933   } else {
934     return "begin write in current thread";
935   }
936 }
937 
write_action_begin_locked(void * gt,grpc_error *)938 static void write_action_begin_locked(void* gt, grpc_error* /*error_ignored*/) {
939   GPR_TIMER_SCOPE("write_action_begin_locked", 0);
940   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
941   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
942   grpc_chttp2_begin_write_result r;
943   if (t->closed_with_error != GRPC_ERROR_NONE) {
944     r.writing = false;
945   } else {
946     r = grpc_chttp2_begin_write(t);
947   }
948   if (r.writing) {
949     if (r.partial) {
950       GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
951     }
952     set_write_state(t,
953                     r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
954                               : GRPC_CHTTP2_WRITE_STATE_WRITING,
955                     begin_writing_desc(r.partial));
956     write_action(t, GRPC_ERROR_NONE);
957     if (t->reading_paused_on_pending_induced_frames) {
958       GPR_ASSERT(t->num_pending_induced_frames == 0);
959       /* We had paused reading, because we had many induced frames (SETTINGS
960        * ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
961        * been able to flush qbuf, we can resume reading. */
962       GRPC_CHTTP2_IF_TRACING(gpr_log(
963           GPR_INFO,
964           "transport %p : Resuming reading after being paused due to too "
965           "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
966           t));
967       t->reading_paused_on_pending_induced_frames = false;
968       continue_read_action_locked(t);
969     }
970   } else {
971     GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
972     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
973     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
974   }
975 }
976 
write_action(void * gt,grpc_error *)977 static void write_action(void* gt, grpc_error* /*error*/) {
978   GPR_TIMER_SCOPE("write_action", 0);
979   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
980   void* cl = t->cl;
981   t->cl = nullptr;
982   grpc_endpoint_write(
983       t->ep, &t->outbuf,
984       GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t,
985                         grpc_schedule_on_exec_ctx),
986       cl);
987 }
988 
write_action_end(void * tp,grpc_error * error)989 static void write_action_end(void* tp, grpc_error* error) {
990   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
991   t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked,
992                                      write_action_end_locked, t, nullptr),
993                    GRPC_ERROR_REF(error));
994 }
995 
996 /* Callback from the grpc_endpoint after bytes have been written by calling
997  * sendmsg */
write_action_end_locked(void * tp,grpc_error * error)998 static void write_action_end_locked(void* tp, grpc_error* error) {
999   GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
1000   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1001 
1002   bool closed = false;
1003   if (error != GRPC_ERROR_NONE) {
1004     close_transport_locked(t, GRPC_ERROR_REF(error));
1005     closed = true;
1006   }
1007 
1008   if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
1009     t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
1010     closed = true;
1011     if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1012       close_transport_locked(
1013           t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
1014     }
1015   }
1016 
1017   switch (t->write_state) {
1018     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1019       GPR_UNREACHABLE_CODE(break);
1020     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1021       GPR_TIMER_MARK("state=writing", 0);
1022       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1023       break;
1024     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1025       GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1026       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1027       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1028       // If the transport is closed, we will retry writing on the endpoint
1029       // and next write may contain part of the currently serialized frames.
1030       // So, we should only call the run_after_write callbacks when the next
1031       // write finishes, or the callbacks will be invoked when the stream is
1032       // closed.
1033       if (!closed) {
1034         grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1035       }
1036       t->combiner->FinallyRun(
1037           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1038                             write_action_begin_locked, t, nullptr),
1039           GRPC_ERROR_NONE);
1040       break;
1041   }
1042 
1043   grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1044   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1045 }
1046 
1047 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1048 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1049 static void queue_setting_update(grpc_chttp2_transport* t,
1050                                  grpc_chttp2_setting_id id, uint32_t value) {
1051   const grpc_chttp2_setting_parameters* sp =
1052       &grpc_chttp2_settings_parameters[id];
1053   uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1054   if (use_value != value) {
1055     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1056             value, use_value);
1057   }
1058   if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1059     t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1060     t->dirtied_local_settings = 1;
1061   }
1062 }
1063 
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,const grpc_slice & goaway_text)1064 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1065                                      uint32_t goaway_error,
1066                                      uint32_t last_stream_id,
1067                                      const grpc_slice& goaway_text) {
1068   // Discard the error from a previous goaway frame (if any)
1069   if (t->goaway_error != GRPC_ERROR_NONE) {
1070     GRPC_ERROR_UNREF(t->goaway_error);
1071   }
1072   t->goaway_error = grpc_error_set_str(
1073       grpc_error_set_int(
1074           grpc_error_set_int(
1075               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1076               GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1077           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
1078       GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1079 
1080   GRPC_CHTTP2_IF_TRACING(
1081       gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1082               last_stream_id));
1083   /* We want to log this irrespective of whether http tracing is enabled if we
1084    * received a GOAWAY with a non NO_ERROR code. */
1085   if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1086     gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", t->peer_string,
1087             goaway_error, grpc_error_string(t->goaway_error));
1088   }
1089   /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1090    * data equal to "too_many_pings", it should log the occurrence at a log level
1091    * that is enabled by default and double the configured KEEPALIVE_TIME used
1092    * for new connections on that channel. */
1093   if (GPR_UNLIKELY(t->is_client &&
1094                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1095                    grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1096     gpr_log(GPR_ERROR,
1097             "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1098             "data equal to \"too_many_pings\"");
1099     double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1100     constexpr int max_keepalive_time_ms =
1101         INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1102     t->keepalive_time =
1103         current_keepalive_time_ms > static_cast<double>(max_keepalive_time_ms)
1104             ? GRPC_MILLIS_INF_FUTURE
1105             : static_cast<grpc_millis>(current_keepalive_time_ms *
1106                                        KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1107   }
1108   /* lie: use transient failure from the transport to indicate goaway has been
1109    * received */
1110   connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, "got_goaway");
1111 }
1112 
maybe_start_some_streams(grpc_chttp2_transport * t)1113 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1114   grpc_chttp2_stream* s;
1115   /* cancel out streams that haven't yet started if we have received a GOAWAY */
1116   if (t->goaway_error != GRPC_ERROR_NONE) {
1117     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1118       grpc_chttp2_cancel_stream(
1119           t, s,
1120           grpc_error_set_int(
1121               GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1122               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1123     }
1124     return;
1125   }
1126   /* start streams where we have free grpc_chttp2_stream ids and free
1127    * concurrency */
1128   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1129          grpc_chttp2_stream_map_size(&t->stream_map) <
1130              t->settings[GRPC_PEER_SETTINGS]
1131                         [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1132          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1133     /* safe since we can't (legally) be parsing this stream yet */
1134     GRPC_CHTTP2_IF_TRACING(gpr_log(
1135         GPR_INFO,
1136         "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1137         t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1138 
1139     GPR_ASSERT(s->id == 0);
1140     s->id = t->next_stream_id;
1141     t->next_stream_id += 2;
1142 
1143     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1144       connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1145                              "no_more_stream_ids");
1146     }
1147 
1148     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1149     post_destructive_reclaimer(t);
1150     grpc_chttp2_mark_stream_writable(t, s);
1151     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1152   }
1153   /* cancel out streams that will never be started */
1154   if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1155     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1156       grpc_chttp2_cancel_stream(
1157           t, s,
1158           grpc_error_set_int(
1159               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1160               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1161     }
1162   }
1163 }
1164 
1165 /* Flag that this closure barrier may be covering a write in a pollset, and so
1166    we should not complete this closure until we can prove that the write got
1167    scheduled */
1168 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
1169 /* First bit of the reference count, stored in the high order bits (with the low
1170    bits being used for flags defined above) */
1171 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1172 
add_closure_barrier(grpc_closure * closure)1173 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1174   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1175   return closure;
1176 }
1177 
null_then_sched_closure(grpc_closure ** closure)1178 static void null_then_sched_closure(grpc_closure** closure) {
1179   grpc_closure* c = *closure;
1180   *closure = nullptr;
1181   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
1182 }
1183 
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream *,grpc_closure ** pclosure,grpc_error * error,const char * desc)1184 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1185                                        grpc_chttp2_stream* /*s*/,
1186                                        grpc_closure** pclosure,
1187                                        grpc_error* error, const char* desc) {
1188   grpc_closure* closure = *pclosure;
1189   *pclosure = nullptr;
1190   if (closure == nullptr) {
1191     GRPC_ERROR_UNREF(error);
1192     return;
1193   }
1194   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1195   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1196     const char* errstr = grpc_error_string(error);
1197     gpr_log(
1198         GPR_INFO,
1199         "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1200         "write_state=%s",
1201         t, closure,
1202         static_cast<int>(closure->next_data.scratch /
1203                          CLOSURE_BARRIER_FIRST_REF_BIT),
1204         static_cast<int>(closure->next_data.scratch %
1205                          CLOSURE_BARRIER_FIRST_REF_BIT),
1206         desc, errstr, write_state_name(t->write_state));
1207   }
1208   if (error != GRPC_ERROR_NONE) {
1209     if (closure->error_data.error == GRPC_ERROR_NONE) {
1210       closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1211           "Error in HTTP transport completing operation");
1212       closure->error_data.error = grpc_error_set_str(
1213           closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1214           grpc_slice_from_copied_string(t->peer_string));
1215     }
1216     closure->error_data.error =
1217         grpc_error_add_child(closure->error_data.error, error);
1218   }
1219   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1220     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1221         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1222       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1223       // closures earlier than when it is safe to do so.
1224       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
1225                               closure->error_data.error);
1226     } else {
1227       grpc_closure_list_append(&t->run_after_write, closure,
1228                                closure->error_data.error);
1229     }
1230   }
1231 }
1232 
contains_non_ok_status(grpc_metadata_batch * batch)1233 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1234   if (batch->idx.named.grpc_status != nullptr) {
1235     return !grpc_mdelem_static_value_eq(batch->idx.named.grpc_status->md,
1236                                         GRPC_MDELEM_GRPC_STATUS_0);
1237   }
1238   return false;
1239 }
1240 
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1241 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1242                                                   grpc_chttp2_stream* s) {
1243   if (s->id != 0 && (!s->write_buffering ||
1244                      s->flow_controlled_buffer.length > t->write_buffer_size)) {
1245     grpc_chttp2_mark_stream_writable(t, s);
1246     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1247   }
1248 }
1249 
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1250 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1251                                      grpc_chttp2_stream* s) {
1252   s->fetched_send_message_length +=
1253       static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1254   grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1255   maybe_become_writable_due_to_send_msg(t, s);
1256 }
1257 
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1258 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1259                                           grpc_chttp2_stream* s) {
1260   for (;;) {
1261     if (s->fetching_send_message == nullptr) {
1262       /* Stream was cancelled before message fetch completed */
1263       abort(); /* TODO(ctiller): what cleanup here? */
1264       return;  /* early out */
1265     }
1266     if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1267       int64_t notify_offset = s->next_message_end_offset;
1268       if (notify_offset <= s->flow_controlled_bytes_written) {
1269         grpc_chttp2_complete_closure_step(
1270             t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1271             "fetching_send_message_finished");
1272       } else {
1273         grpc_chttp2_write_cb* cb = t->write_cb_pool;
1274         if (cb == nullptr) {
1275           cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1276         } else {
1277           t->write_cb_pool = cb->next;
1278         }
1279         cb->call_at_byte = notify_offset;
1280         cb->closure = s->fetching_send_message_finished;
1281         s->fetching_send_message_finished = nullptr;
1282         grpc_chttp2_write_cb** list =
1283             s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1284                 ? &s->on_write_finished_cbs
1285                 : &s->on_flow_controlled_cbs;
1286         cb->next = *list;
1287         *list = cb;
1288       }
1289       s->fetching_send_message.reset();
1290       return; /* early out */
1291     } else if (s->fetching_send_message->Next(
1292                    UINT32_MAX, GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1293                                                  ::complete_fetch, s,
1294                                                  grpc_schedule_on_exec_ctx))) {
1295       grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1296       if (error != GRPC_ERROR_NONE) {
1297         s->fetching_send_message.reset();
1298         grpc_chttp2_cancel_stream(t, s, error);
1299       } else {
1300         add_fetched_slice_locked(t, s);
1301       }
1302     }
1303   }
1304 }
1305 
complete_fetch(void * gs,grpc_error * error)1306 static void complete_fetch(void* gs, grpc_error* error) {
1307   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1308   s->t->combiner->Run(GRPC_CLOSURE_INIT(&s->complete_fetch_locked,
1309                                         ::complete_fetch_locked, s, nullptr),
1310                       GRPC_ERROR_REF(error));
1311 }
1312 
complete_fetch_locked(void * gs,grpc_error * error)1313 static void complete_fetch_locked(void* gs, grpc_error* error) {
1314   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1315   grpc_chttp2_transport* t = s->t;
1316   if (error == GRPC_ERROR_NONE) {
1317     error = s->fetching_send_message->Pull(&s->fetching_slice);
1318     if (error == GRPC_ERROR_NONE) {
1319       add_fetched_slice_locked(t, s);
1320       continue_fetching_send_locked(t, s);
1321     }
1322   }
1323   if (error != GRPC_ERROR_NONE) {
1324     s->fetching_send_message.reset();
1325     grpc_chttp2_cancel_stream(t, s, error);
1326   }
1327 }
1328 
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1329 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1330                          bool is_client, bool is_initial) {
1331   for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1332        md = md->next) {
1333     char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1334     char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1335     gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1336             is_client ? "CLI" : "SVR", key, value);
1337     gpr_free(key);
1338     gpr_free(value);
1339   }
1340 }
1341 
perform_stream_op_locked(void * stream_op,grpc_error *)1342 static void perform_stream_op_locked(void* stream_op,
1343                                      grpc_error* /*error_ignored*/) {
1344   GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1345 
1346   grpc_transport_stream_op_batch* op =
1347       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1348   grpc_chttp2_stream* s =
1349       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1350   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1351   grpc_chttp2_transport* t = s->t;
1352 
1353   GRPC_STATS_INC_HTTP2_OP_BATCHES();
1354 
1355   s->context = op->payload->context;
1356   s->traced = op->is_traced;
1357   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1358     gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p",
1359             grpc_transport_stream_op_batch_string(op).c_str(), op->on_complete);
1360     if (op->send_initial_metadata) {
1361       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1362                    s->id, t->is_client, true);
1363     }
1364     if (op->send_trailing_metadata) {
1365       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1366                    s->id, t->is_client, false);
1367     }
1368   }
1369 
1370   grpc_closure* on_complete = op->on_complete;
1371   // on_complete will be null if and only if there are no send ops in the batch.
1372   if (on_complete != nullptr) {
1373     // This batch has send ops. Use final_data as a barrier until enqueue time;
1374     // the initial counter is dropped at the end of this function.
1375     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1376     on_complete->error_data.error = GRPC_ERROR_NONE;
1377   }
1378 
1379   if (op->cancel_stream) {
1380     GRPC_STATS_INC_HTTP2_OP_CANCEL();
1381     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1382   }
1383 
1384   if (op->send_initial_metadata) {
1385     if (t->is_client && t->channelz_socket != nullptr) {
1386       t->channelz_socket->RecordStreamStartedFromLocal();
1387     }
1388     GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1389     GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1390     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1391 
1392     /* Identify stream compression */
1393     if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1394                 .content_encoding == nullptr ||
1395         grpc_stream_compression_method_parse(
1396             GRPC_MDVALUE(
1397                 op_payload->send_initial_metadata.send_initial_metadata->idx
1398                     .named.content_encoding->md),
1399             true, &s->stream_compression_method) == 0) {
1400       s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1401     }
1402     if (s->stream_compression_method !=
1403         GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
1404       s->uncompressed_data_size = 0;
1405       s->stream_compression_ctx = nullptr;
1406       grpc_slice_buffer_init(&s->compressed_data_buffer);
1407     }
1408     s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1409     s->send_initial_metadata =
1410         op_payload->send_initial_metadata.send_initial_metadata;
1411     const size_t metadata_size =
1412         grpc_metadata_batch_size(s->send_initial_metadata);
1413     const size_t metadata_peer_limit =
1414         t->settings[GRPC_PEER_SETTINGS]
1415                    [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1416     if (t->is_client) {
1417       s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1418     }
1419     if (metadata_size > metadata_peer_limit) {
1420       grpc_chttp2_cancel_stream(
1421           t, s,
1422           grpc_error_set_int(
1423               grpc_error_set_int(
1424                   grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1425                                          "to-be-sent initial metadata size "
1426                                          "exceeds peer limit"),
1427                                      GRPC_ERROR_INT_SIZE,
1428                                      static_cast<intptr_t>(metadata_size)),
1429                   GRPC_ERROR_INT_LIMIT,
1430                   static_cast<intptr_t>(metadata_peer_limit)),
1431               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1432     } else {
1433       if (contains_non_ok_status(s->send_initial_metadata)) {
1434         s->seen_error = true;
1435       }
1436       if (!s->write_closed) {
1437         if (t->is_client) {
1438           if (t->closed_with_error == GRPC_ERROR_NONE) {
1439             GPR_ASSERT(s->id == 0);
1440             grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1441             maybe_start_some_streams(t);
1442           } else {
1443             grpc_chttp2_cancel_stream(
1444                 t, s,
1445                 grpc_error_set_int(
1446                     GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1447                         "Transport closed", &t->closed_with_error, 1),
1448                     GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1449           }
1450         } else {
1451           GPR_ASSERT(s->id != 0);
1452           grpc_chttp2_mark_stream_writable(t, s);
1453           if (!(op->send_message &&
1454                 (op->payload->send_message.send_message->flags() &
1455                  GRPC_WRITE_BUFFER_HINT))) {
1456             grpc_chttp2_initiate_write(
1457                 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1458           }
1459         }
1460       } else {
1461         s->send_initial_metadata = nullptr;
1462         grpc_chttp2_complete_closure_step(
1463             t, s, &s->send_initial_metadata_finished,
1464             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1465                 "Attempt to send initial metadata after stream was closed",
1466                 &s->write_closed_error, 1),
1467             "send_initial_metadata_finished");
1468       }
1469     }
1470     if (op_payload->send_initial_metadata.peer_string != nullptr) {
1471       gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1472                         (gpr_atm)t->peer_string);
1473     }
1474   }
1475 
1476   if (op->send_message) {
1477     GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1478     t->num_messages_in_next_write++;
1479     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1480         op->payload->send_message.send_message->length());
1481     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1482     s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1483     if (s->write_closed) {
1484       op->payload->send_message.stream_write_closed = true;
1485       // We should NOT return an error here, so as to avoid a cancel OP being
1486       // started. The surface layer will notice that the stream has been closed
1487       // for writes and fail the send message op.
1488       op->payload->send_message.send_message.reset();
1489       grpc_chttp2_complete_closure_step(
1490           t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1491           "fetching_send_message_finished");
1492     } else {
1493       GPR_ASSERT(s->fetching_send_message == nullptr);
1494       uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1495           &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1496       uint32_t flags = op_payload->send_message.send_message->flags();
1497       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1498       size_t len = op_payload->send_message.send_message->length();
1499       frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1500       frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1501       frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1502       frame_hdr[4] = static_cast<uint8_t>(len);
1503       s->fetching_send_message =
1504           std::move(op_payload->send_message.send_message);
1505       s->fetched_send_message_length = 0;
1506       s->next_message_end_offset =
1507           s->flow_controlled_bytes_written +
1508           static_cast<int64_t>(s->flow_controlled_buffer.length) +
1509           static_cast<int64_t>(len);
1510       if (flags & GRPC_WRITE_BUFFER_HINT) {
1511         s->next_message_end_offset -= t->write_buffer_size;
1512         s->write_buffering = true;
1513       } else {
1514         s->write_buffering = false;
1515       }
1516       continue_fetching_send_locked(t, s);
1517       maybe_become_writable_due_to_send_msg(t, s);
1518     }
1519   }
1520 
1521   if (op->send_trailing_metadata) {
1522     GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1523     GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1524     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1525     s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1526     s->send_trailing_metadata =
1527         op_payload->send_trailing_metadata.send_trailing_metadata;
1528     s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1529     s->write_buffering = false;
1530     const size_t metadata_size =
1531         grpc_metadata_batch_size(s->send_trailing_metadata);
1532     const size_t metadata_peer_limit =
1533         t->settings[GRPC_PEER_SETTINGS]
1534                    [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1535     if (metadata_size > metadata_peer_limit) {
1536       grpc_chttp2_cancel_stream(
1537           t, s,
1538           grpc_error_set_int(
1539               grpc_error_set_int(
1540                   grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1541                                          "to-be-sent trailing metadata size "
1542                                          "exceeds peer limit"),
1543                                      GRPC_ERROR_INT_SIZE,
1544                                      static_cast<intptr_t>(metadata_size)),
1545                   GRPC_ERROR_INT_LIMIT,
1546                   static_cast<intptr_t>(metadata_peer_limit)),
1547               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1548     } else {
1549       if (contains_non_ok_status(s->send_trailing_metadata)) {
1550         s->seen_error = true;
1551       }
1552       if (s->write_closed) {
1553         s->send_trailing_metadata = nullptr;
1554         s->sent_trailing_metadata_op = nullptr;
1555         grpc_chttp2_complete_closure_step(
1556             t, s, &s->send_trailing_metadata_finished,
1557             grpc_metadata_batch_is_empty(
1558                 op->payload->send_trailing_metadata.send_trailing_metadata)
1559                 ? GRPC_ERROR_NONE
1560                 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1561                       "Attempt to send trailing metadata after "
1562                       "stream was closed"),
1563             "send_trailing_metadata_finished");
1564       } else if (s->id != 0) {
1565         /* TODO(ctiller): check if there's flow control for any outstanding
1566            bytes before going writable */
1567         grpc_chttp2_mark_stream_writable(t, s);
1568         grpc_chttp2_initiate_write(
1569             t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1570       }
1571     }
1572   }
1573 
1574   if (op->recv_initial_metadata) {
1575     GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1576     GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1577     s->recv_initial_metadata_ready =
1578         op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1579     s->recv_initial_metadata =
1580         op_payload->recv_initial_metadata.recv_initial_metadata;
1581     s->trailing_metadata_available =
1582         op_payload->recv_initial_metadata.trailing_metadata_available;
1583     if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1584       gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1585                         (gpr_atm)t->peer_string);
1586     }
1587     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1588   }
1589 
1590   if (op->recv_message) {
1591     GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1592     size_t before = 0;
1593     GPR_ASSERT(s->recv_message_ready == nullptr);
1594     GPR_ASSERT(!s->pending_byte_stream);
1595     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1596     s->recv_message = op_payload->recv_message.recv_message;
1597     if (s->id != 0) {
1598       if (!s->read_closed) {
1599         before = s->frame_storage.length +
1600                  s->unprocessed_incoming_frames_buffer.length;
1601       }
1602     }
1603     grpc_chttp2_maybe_complete_recv_message(t, s);
1604     if (s->id != 0) {
1605       if (!s->read_closed && s->frame_storage.length == 0) {
1606         size_t after = s->frame_storage.length +
1607                        s->unprocessed_incoming_frames_buffer_cached_length;
1608         s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1609                                                   before - after);
1610         grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1611       }
1612     }
1613   }
1614 
1615   if (op->recv_trailing_metadata) {
1616     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1617     GPR_ASSERT(s->collecting_stats == nullptr);
1618     s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1619     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1620     s->recv_trailing_metadata_finished =
1621         op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1622     s->recv_trailing_metadata =
1623         op_payload->recv_trailing_metadata.recv_trailing_metadata;
1624     s->final_metadata_requested = true;
1625     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1626   }
1627 
1628   if (on_complete != nullptr) {
1629     grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1630                                       "op->on_complete");
1631   }
1632 
1633   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1634 }
1635 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1636 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1637                               grpc_transport_stream_op_batch* op) {
1638   GPR_TIMER_SCOPE("perform_stream_op", 0);
1639   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1640   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1641 
1642   if (!t->is_client) {
1643     if (op->send_initial_metadata) {
1644       grpc_millis deadline =
1645           op->payload->send_initial_metadata.send_initial_metadata->deadline;
1646       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1647     }
1648     if (op->send_trailing_metadata) {
1649       grpc_millis deadline =
1650           op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1651       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1652     }
1653   }
1654 
1655   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1656     gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s,
1657             grpc_transport_stream_op_batch_string(op).c_str());
1658   }
1659 
1660   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1661   op->handler_private.extra_arg = gs;
1662   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1663                                      perform_stream_op_locked, op, nullptr),
1664                    GRPC_ERROR_NONE);
1665 }
1666 
cancel_pings(grpc_chttp2_transport * t,grpc_error * error)1667 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1668   /* callback remaining pings: they're not allowed to call into the transport,
1669      and maybe they hold resources that need to be freed */
1670   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1671   GPR_ASSERT(error != GRPC_ERROR_NONE);
1672   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1673     grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1674     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]);
1675   }
1676   GRPC_ERROR_UNREF(error);
1677 }
1678 
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1679 static void send_ping_locked(grpc_chttp2_transport* t,
1680                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1681   if (t->closed_with_error != GRPC_ERROR_NONE) {
1682     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
1683                             GRPC_ERROR_REF(t->closed_with_error));
1684     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
1685                             GRPC_ERROR_REF(t->closed_with_error));
1686     return;
1687   }
1688   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1689   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1690                            GRPC_ERROR_NONE);
1691   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1692                            GRPC_ERROR_NONE);
1693 }
1694 
1695 /*
1696  * Specialized form of send_ping_locked for keepalive ping. If there is already
1697  * a ping in progress, the keepalive ping would piggyback onto that ping,
1698  * instead of waiting for that ping to complete and then starting a new ping.
1699  */
send_keepalive_ping_locked(grpc_chttp2_transport * t)1700 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1701   if (t->closed_with_error != GRPC_ERROR_NONE) {
1702     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1703                                        start_keepalive_ping_locked, t, nullptr),
1704                      GRPC_ERROR_REF(t->closed_with_error));
1705     t->combiner->Run(
1706         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1707                           finish_keepalive_ping_locked, t, nullptr),
1708         GRPC_ERROR_REF(t->closed_with_error));
1709     return;
1710   }
1711   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1712   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1713     /* There is a ping in flight. Add yourself to the inflight closure list. */
1714     t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
1715                                        start_keepalive_ping_locked, t, nullptr),
1716                      GRPC_ERROR_REF(t->closed_with_error));
1717     grpc_closure_list_append(
1718         &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1719         GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
1720                           finish_keepalive_ping, t, grpc_schedule_on_exec_ctx),
1721         GRPC_ERROR_NONE);
1722     return;
1723   }
1724   grpc_closure_list_append(
1725       &pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1726       GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping,
1727                         t, grpc_schedule_on_exec_ctx),
1728       GRPC_ERROR_NONE);
1729   grpc_closure_list_append(
1730       &pq->lists[GRPC_CHTTP2_PCL_NEXT],
1731       GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping,
1732                         t, grpc_schedule_on_exec_ctx),
1733       GRPC_ERROR_NONE);
1734 }
1735 
grpc_chttp2_retry_initiate_ping(void * tp,grpc_error * error)1736 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error) {
1737   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1738   t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked,
1739                                      retry_initiate_ping_locked, t, nullptr),
1740                    GRPC_ERROR_REF(error));
1741 }
1742 
retry_initiate_ping_locked(void * tp,grpc_error * error)1743 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1744   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1745   t->ping_state.is_delayed_ping_timer_set = false;
1746   if (error == GRPC_ERROR_NONE) {
1747     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1748   }
1749   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1750 }
1751 
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1752 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1753   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1754   if (pq->inflight_id != id) {
1755     char* from = grpc_endpoint_get_peer(t->ep);
1756     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
1757     gpr_free(from);
1758     return;
1759   }
1760   grpc_core::ExecCtx::RunList(DEBUG_LOCATION,
1761                               &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1762   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1763     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1764   }
1765 }
1766 
send_goaway(grpc_chttp2_transport * t,grpc_error * error)1767 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1768   /* We want to log this irrespective of whether http tracing is enabled */
1769   gpr_log(GPR_INFO, "%s: Sending goaway err=%s", t->peer_string,
1770           grpc_error_string(error));
1771   t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1772   grpc_http2_error_code http_error;
1773   grpc_slice slice;
1774   grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1775                         &http_error, nullptr);
1776   grpc_chttp2_goaway_append(t->last_new_stream_id,
1777                             static_cast<uint32_t>(http_error),
1778                             grpc_slice_ref_internal(slice), &t->qbuf);
1779   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1780   GRPC_ERROR_UNREF(error);
1781 }
1782 
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1783 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1784   if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1785       t->ping_policy.max_ping_strikes != 0) {
1786     send_goaway(t,
1787                 grpc_error_set_int(
1788                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1789                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1790     /*The transport will be closed after the write is done */
1791     close_transport_locked(
1792         t, grpc_error_set_int(
1793                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1794                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1795   }
1796 }
1797 
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1798 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1799   if (!t->is_client) {
1800     t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
1801     t->ping_recv_state.ping_strikes = 0;
1802   }
1803   t->ping_state.pings_before_data_required =
1804       t->ping_policy.max_pings_without_data;
1805 }
1806 
perform_transport_op_locked(void * stream_op,grpc_error *)1807 static void perform_transport_op_locked(void* stream_op,
1808                                         grpc_error* /*error_ignored*/) {
1809   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1810   grpc_chttp2_transport* t =
1811       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1812 
1813   if (op->goaway_error) {
1814     send_goaway(t, op->goaway_error);
1815   }
1816 
1817   if (op->set_accept_stream) {
1818     t->accept_stream_cb = op->set_accept_stream_fn;
1819     t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1820   }
1821 
1822   if (op->bind_pollset) {
1823     grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1824   }
1825 
1826   if (op->bind_pollset_set) {
1827     grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1828   }
1829 
1830   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1831     send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1832     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1833   }
1834 
1835   if (op->start_connectivity_watch != nullptr) {
1836     t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1837                                 std::move(op->start_connectivity_watch));
1838   }
1839   if (op->stop_connectivity_watch != nullptr) {
1840     t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1841   }
1842 
1843   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1844     close_transport_locked(t, op->disconnect_with_error);
1845   }
1846 
1847   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1848 
1849   GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1850 }
1851 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1852 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1853   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1854   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1855     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t,
1856             grpc_transport_op_string(op).c_str());
1857   }
1858   op->handler_private.extra_arg = gt;
1859   GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1860   t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1861                                      perform_transport_op_locked, op, nullptr),
1862                    GRPC_ERROR_NONE);
1863 }
1864 
1865 /*******************************************************************************
1866  * INPUT PROCESSING - GENERAL
1867  */
1868 
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport *,grpc_chttp2_stream * s)1869 void grpc_chttp2_maybe_complete_recv_initial_metadata(
1870     grpc_chttp2_transport* /*t*/, grpc_chttp2_stream* s) {
1871   if (s->recv_initial_metadata_ready != nullptr &&
1872       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1873     if (s->seen_error) {
1874       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1875       if (!s->pending_byte_stream) {
1876         grpc_slice_buffer_reset_and_unref_internal(
1877             &s->unprocessed_incoming_frames_buffer);
1878       }
1879     }
1880     grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1881                                                  s->recv_initial_metadata);
1882     null_then_sched_closure(&s->recv_initial_metadata_ready);
1883   }
1884 }
1885 
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport *,grpc_chttp2_stream * s)1886 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* /*t*/,
1887                                              grpc_chttp2_stream* s) {
1888   grpc_error* error = GRPC_ERROR_NONE;
1889   if (s->recv_message_ready != nullptr) {
1890     *s->recv_message = nullptr;
1891     if (s->final_metadata_requested && s->seen_error) {
1892       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1893       if (!s->pending_byte_stream) {
1894         grpc_slice_buffer_reset_and_unref_internal(
1895             &s->unprocessed_incoming_frames_buffer);
1896       }
1897     }
1898     if (!s->pending_byte_stream) {
1899       while (s->unprocessed_incoming_frames_buffer.length > 0 ||
1900              s->frame_storage.length > 0) {
1901         if (s->unprocessed_incoming_frames_buffer.length == 0) {
1902           grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
1903                                  &s->frame_storage);
1904           s->unprocessed_incoming_frames_decompressed = false;
1905         }
1906         if (!s->unprocessed_incoming_frames_decompressed &&
1907             s->stream_decompression_method !=
1908                 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1909           GPR_ASSERT(s->decompressed_data_buffer.length == 0);
1910           bool end_of_context;
1911           if (!s->stream_decompression_ctx) {
1912             s->stream_decompression_ctx =
1913                 grpc_stream_compression_context_create(
1914                     s->stream_decompression_method);
1915           }
1916           if (!grpc_stream_decompress(
1917                   s->stream_decompression_ctx,
1918                   &s->unprocessed_incoming_frames_buffer,
1919                   &s->decompressed_data_buffer, nullptr,
1920                   GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
1921                   &end_of_context)) {
1922             grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1923             grpc_slice_buffer_reset_and_unref_internal(
1924                 &s->unprocessed_incoming_frames_buffer);
1925             error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1926                 "Stream decompression error.");
1927           } else {
1928             s->decompressed_header_bytes += s->decompressed_data_buffer.length;
1929             if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
1930               s->decompressed_header_bytes = 0;
1931             }
1932             error = grpc_deframe_unprocessed_incoming_frames(
1933                 &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
1934                 s->recv_message);
1935             if (end_of_context) {
1936               grpc_stream_compression_context_destroy(
1937                   s->stream_decompression_ctx);
1938               s->stream_decompression_ctx = nullptr;
1939             }
1940           }
1941         } else {
1942           error = grpc_deframe_unprocessed_incoming_frames(
1943               &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
1944               nullptr, s->recv_message);
1945         }
1946         if (error != GRPC_ERROR_NONE) {
1947           s->seen_error = true;
1948           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1949           grpc_slice_buffer_reset_and_unref_internal(
1950               &s->unprocessed_incoming_frames_buffer);
1951           break;
1952         } else if (*s->recv_message != nullptr) {
1953           break;
1954         }
1955       }
1956     }
1957     // save the length of the buffer before handing control back to application
1958     // threads. Needed to support correct flow control bookkeeping
1959     s->unprocessed_incoming_frames_buffer_cached_length =
1960         s->unprocessed_incoming_frames_buffer.length;
1961     if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
1962       null_then_sched_closure(&s->recv_message_ready);
1963     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
1964       *s->recv_message = nullptr;
1965       null_then_sched_closure(&s->recv_message_ready);
1966     }
1967     GRPC_ERROR_UNREF(error);
1968   }
1969 }
1970 
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1971 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
1972                                                        grpc_chttp2_stream* s) {
1973   grpc_chttp2_maybe_complete_recv_message(t, s);
1974   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
1975       s->write_closed) {
1976     if (s->seen_error || !t->is_client) {
1977       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1978       if (!s->pending_byte_stream) {
1979         grpc_slice_buffer_reset_and_unref_internal(
1980             &s->unprocessed_incoming_frames_buffer);
1981       }
1982     }
1983     bool pending_data = s->pending_byte_stream ||
1984                         s->unprocessed_incoming_frames_buffer.length > 0;
1985     if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
1986         !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
1987       /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
1988        * maybe decompress the next 5 bytes in the stream. */
1989       if (s->stream_decompression_method ==
1990           GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1991         grpc_slice_buffer_move_first(
1992             &s->frame_storage,
1993             GPR_MIN(s->frame_storage.length, GRPC_HEADER_SIZE_IN_BYTES),
1994             &s->unprocessed_incoming_frames_buffer);
1995         if (s->unprocessed_incoming_frames_buffer.length > 0) {
1996           s->unprocessed_incoming_frames_decompressed = true;
1997           pending_data = true;
1998         }
1999       } else {
2000         bool end_of_context;
2001         if (!s->stream_decompression_ctx) {
2002           s->stream_decompression_ctx = grpc_stream_compression_context_create(
2003               s->stream_decompression_method);
2004         }
2005         if (!grpc_stream_decompress(
2006                 s->stream_decompression_ctx, &s->frame_storage,
2007                 &s->unprocessed_incoming_frames_buffer, nullptr,
2008                 GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
2009           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
2010           grpc_slice_buffer_reset_and_unref_internal(
2011               &s->unprocessed_incoming_frames_buffer);
2012           s->seen_error = true;
2013         } else {
2014           if (s->unprocessed_incoming_frames_buffer.length > 0) {
2015             s->unprocessed_incoming_frames_decompressed = true;
2016             pending_data = true;
2017           }
2018           if (end_of_context) {
2019             grpc_stream_compression_context_destroy(
2020                 s->stream_decompression_ctx);
2021             s->stream_decompression_ctx = nullptr;
2022           }
2023         }
2024       }
2025     }
2026     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
2027         s->recv_trailing_metadata_finished != nullptr) {
2028       grpc_transport_move_stats(&s->stats, s->collecting_stats);
2029       s->collecting_stats = nullptr;
2030       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
2031                                                    s->recv_trailing_metadata);
2032       null_then_sched_closure(&s->recv_trailing_metadata_finished);
2033     }
2034   }
2035 }
2036 
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error * error)2037 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2038                           grpc_error* error) {
2039   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2040       grpc_chttp2_stream_map_delete(&t->stream_map, id));
2041   GPR_DEBUG_ASSERT(s);
2042   if (t->incoming_stream == s) {
2043     t->incoming_stream = nullptr;
2044     grpc_chttp2_parsing_become_skip_parser(t);
2045   }
2046   if (s->pending_byte_stream) {
2047     if (s->on_next != nullptr) {
2048       grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2049       if (error == GRPC_ERROR_NONE) {
2050         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2051       }
2052       bs->PublishError(error);
2053       bs->Unref();
2054       s->data_parser.parsing_frame = nullptr;
2055     } else {
2056       GRPC_ERROR_UNREF(s->byte_stream_error);
2057       s->byte_stream_error = GRPC_ERROR_REF(error);
2058     }
2059   }
2060 
2061   if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2062     post_benign_reclaimer(t);
2063     if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2064       close_transport_locked(
2065           t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2066                  "Last stream closed after sending GOAWAY", &error, 1));
2067     }
2068   }
2069   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2070     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2071   }
2072 
2073   GRPC_ERROR_UNREF(error);
2074 
2075   maybe_start_some_streams(t);
2076 }
2077 
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * due_to_error)2078 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2079                                grpc_error* due_to_error) {
2080   if (!t->is_client && !s->sent_trailing_metadata &&
2081       grpc_error_has_clear_grpc_status(due_to_error)) {
2082     close_from_api(t, s, due_to_error);
2083     return;
2084   }
2085 
2086   if (!s->read_closed || !s->write_closed) {
2087     if (s->id != 0) {
2088       grpc_http2_error_code http_error;
2089       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2090                             &http_error, nullptr);
2091       grpc_chttp2_add_rst_stream_to_next_write(
2092           t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
2093       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2094     }
2095   }
2096   if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2097     s->seen_error = true;
2098   }
2099   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2100 }
2101 
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2102 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2103                              grpc_error* error) {
2104   grpc_status_code status;
2105   grpc_slice slice;
2106   grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2107   if (status != GRPC_STATUS_OK) {
2108     s->seen_error = true;
2109   }
2110   /* stream_global->recv_trailing_metadata_finished gives us a
2111      last chance replacement: we've received trailing metadata,
2112      but something more important has become available to signal
2113      to the upper layers - drop what we've got, and then publish
2114      what we want - which is safe because we haven't told anyone
2115      about the metadata yet */
2116   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2117       s->recv_trailing_metadata_finished != nullptr) {
2118     char status_string[GPR_LTOA_MIN_BUFSIZE];
2119     gpr_ltoa(status, status_string);
2120     GRPC_LOG_IF_ERROR("add_status",
2121                       grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2122                           &s->metadata_buffer[1],
2123                           grpc_mdelem_from_slices(
2124                               GRPC_MDSTR_GRPC_STATUS,
2125                               grpc_core::UnmanagedMemorySlice(status_string))));
2126     if (!GRPC_SLICE_IS_EMPTY(slice)) {
2127       GRPC_LOG_IF_ERROR(
2128           "add_status_message",
2129           grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2130               &s->metadata_buffer[1],
2131               grpc_mdelem_create(GRPC_MDSTR_GRPC_MESSAGE, slice, nullptr)));
2132     }
2133     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2134     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2135   }
2136 
2137   GRPC_ERROR_UNREF(error);
2138 }
2139 
add_error(grpc_error * error,grpc_error ** refs,size_t * nrefs)2140 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2141   if (error == GRPC_ERROR_NONE) return;
2142   for (size_t i = 0; i < *nrefs; i++) {
2143     if (error == refs[i]) {
2144       return;
2145     }
2146   }
2147   refs[*nrefs] = error;
2148   ++*nrefs;
2149 }
2150 
removal_error(grpc_error * extra_error,grpc_chttp2_stream * s,const char * master_error_msg)2151 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2152                                  const char* master_error_msg) {
2153   grpc_error* refs[3];
2154   size_t nrefs = 0;
2155   add_error(s->read_closed_error, refs, &nrefs);
2156   add_error(s->write_closed_error, refs, &nrefs);
2157   add_error(extra_error, refs, &nrefs);
2158   grpc_error* error = GRPC_ERROR_NONE;
2159   if (nrefs > 0) {
2160     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2161                                                              refs, nrefs);
2162   }
2163   GRPC_ERROR_UNREF(extra_error);
2164   return error;
2165 }
2166 
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error * error)2167 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2168                              grpc_chttp2_write_cb** list, grpc_error* error) {
2169   while (*list) {
2170     grpc_chttp2_write_cb* cb = *list;
2171     *list = cb->next;
2172     grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2173                                       "on_write_finished_cb");
2174     cb->next = t->write_cb_pool;
2175     t->write_cb_pool = cb;
2176   }
2177   GRPC_ERROR_UNREF(error);
2178 }
2179 
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2180 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2181                                      grpc_chttp2_stream* s, grpc_error* error) {
2182   error =
2183       removal_error(error, s, "Pending writes failed due to stream closure");
2184   s->send_initial_metadata = nullptr;
2185   grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2186                                     GRPC_ERROR_REF(error),
2187                                     "send_initial_metadata_finished");
2188 
2189   s->send_trailing_metadata = nullptr;
2190   s->sent_trailing_metadata_op = nullptr;
2191   grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2192                                     GRPC_ERROR_REF(error),
2193                                     "send_trailing_metadata_finished");
2194 
2195   s->fetching_send_message.reset();
2196   grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2197                                     GRPC_ERROR_REF(error),
2198                                     "fetching_send_message_finished");
2199   flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2200   flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2201 }
2202 
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error * error)2203 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2204                                     grpc_chttp2_stream* s, int close_reads,
2205                                     int close_writes, grpc_error* error) {
2206   if (s->read_closed && s->write_closed) {
2207     /* already closed, but we should still fake the status if needed. */
2208     grpc_error* overall_error = removal_error(error, s, "Stream removed");
2209     if (overall_error != GRPC_ERROR_NONE) {
2210       grpc_chttp2_fake_status(t, s, overall_error);
2211     }
2212     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2213     return;
2214   }
2215   bool closed_read = false;
2216   bool became_closed = false;
2217   if (close_reads && !s->read_closed) {
2218     s->read_closed_error = GRPC_ERROR_REF(error);
2219     s->read_closed = true;
2220     closed_read = true;
2221   }
2222   if (close_writes && !s->write_closed) {
2223     s->write_closed_error = GRPC_ERROR_REF(error);
2224     s->write_closed = true;
2225     grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
2226   }
2227   if (s->read_closed && s->write_closed) {
2228     became_closed = true;
2229     grpc_error* overall_error =
2230         removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
2231     if (s->id != 0) {
2232       remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
2233     } else {
2234       /* Purge streams waiting on concurrency still waiting for id assignment */
2235       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2236     }
2237     if (overall_error != GRPC_ERROR_NONE) {
2238       grpc_chttp2_fake_status(t, s, overall_error);
2239     }
2240   }
2241   if (closed_read) {
2242     for (int i = 0; i < 2; i++) {
2243       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2244         s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2245       }
2246     }
2247     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2248     grpc_chttp2_maybe_complete_recv_message(t, s);
2249   }
2250   if (became_closed) {
2251     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2252     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2253   }
2254   GRPC_ERROR_UNREF(error);
2255 }
2256 
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2257 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2258                            grpc_error* error) {
2259   grpc_slice hdr;
2260   grpc_slice status_hdr;
2261   grpc_slice http_status_hdr;
2262   grpc_slice content_type_hdr;
2263   grpc_slice message_pfx;
2264   uint8_t* p;
2265   uint32_t len = 0;
2266   grpc_status_code grpc_status;
2267   grpc_slice slice;
2268   grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2269                         nullptr);
2270 
2271   GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2272 
2273   /* Hand roll a header block.
2274      This is unnecessarily ugly - at some point we should find a more
2275      elegant solution.
2276      It's complicated by the fact that our send machinery would be dead by
2277      the time we got around to sending this, so instead we ignore HPACK
2278      compression and just write the uncompressed bytes onto the wire. */
2279   if (!s->sent_initial_metadata) {
2280     http_status_hdr = GRPC_SLICE_MALLOC(13);
2281     p = GRPC_SLICE_START_PTR(http_status_hdr);
2282     *p++ = 0x00;
2283     *p++ = 7;
2284     *p++ = ':';
2285     *p++ = 's';
2286     *p++ = 't';
2287     *p++ = 'a';
2288     *p++ = 't';
2289     *p++ = 'u';
2290     *p++ = 's';
2291     *p++ = 3;
2292     *p++ = '2';
2293     *p++ = '0';
2294     *p++ = '0';
2295     GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2296     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2297 
2298     content_type_hdr = GRPC_SLICE_MALLOC(31);
2299     p = GRPC_SLICE_START_PTR(content_type_hdr);
2300     *p++ = 0x00;
2301     *p++ = 12;
2302     *p++ = 'c';
2303     *p++ = 'o';
2304     *p++ = 'n';
2305     *p++ = 't';
2306     *p++ = 'e';
2307     *p++ = 'n';
2308     *p++ = 't';
2309     *p++ = '-';
2310     *p++ = 't';
2311     *p++ = 'y';
2312     *p++ = 'p';
2313     *p++ = 'e';
2314     *p++ = 16;
2315     *p++ = 'a';
2316     *p++ = 'p';
2317     *p++ = 'p';
2318     *p++ = 'l';
2319     *p++ = 'i';
2320     *p++ = 'c';
2321     *p++ = 'a';
2322     *p++ = 't';
2323     *p++ = 'i';
2324     *p++ = 'o';
2325     *p++ = 'n';
2326     *p++ = '/';
2327     *p++ = 'g';
2328     *p++ = 'r';
2329     *p++ = 'p';
2330     *p++ = 'c';
2331     GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2332     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2333   }
2334 
2335   status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2336   p = GRPC_SLICE_START_PTR(status_hdr);
2337   *p++ = 0x00; /* literal header, not indexed */
2338   *p++ = 11;   /* len(grpc-status) */
2339   *p++ = 'g';
2340   *p++ = 'r';
2341   *p++ = 'p';
2342   *p++ = 'c';
2343   *p++ = '-';
2344   *p++ = 's';
2345   *p++ = 't';
2346   *p++ = 'a';
2347   *p++ = 't';
2348   *p++ = 'u';
2349   *p++ = 's';
2350   if (grpc_status < 10) {
2351     *p++ = 1;
2352     *p++ = static_cast<uint8_t>('0' + grpc_status);
2353   } else {
2354     *p++ = 2;
2355     *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2356     *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2357   }
2358   GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2359   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2360 
2361   size_t msg_len = GRPC_SLICE_LENGTH(slice);
2362   GPR_ASSERT(msg_len <= UINT32_MAX);
2363   uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2364   message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2365   p = GRPC_SLICE_START_PTR(message_pfx);
2366   *p++ = 0x00; /* literal header, not indexed */
2367   *p++ = 12;   /* len(grpc-message) */
2368   *p++ = 'g';
2369   *p++ = 'r';
2370   *p++ = 'p';
2371   *p++ = 'c';
2372   *p++ = '-';
2373   *p++ = 'm';
2374   *p++ = 'e';
2375   *p++ = 's';
2376   *p++ = 's';
2377   *p++ = 'a';
2378   *p++ = 'g';
2379   *p++ = 'e';
2380   GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2381   p += msg_len_len;
2382   GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2383   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2384   len += static_cast<uint32_t>(msg_len);
2385 
2386   hdr = GRPC_SLICE_MALLOC(9);
2387   p = GRPC_SLICE_START_PTR(hdr);
2388   *p++ = static_cast<uint8_t>(len >> 16);
2389   *p++ = static_cast<uint8_t>(len >> 8);
2390   *p++ = static_cast<uint8_t>(len);
2391   *p++ = GRPC_CHTTP2_FRAME_HEADER;
2392   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2393   *p++ = static_cast<uint8_t>(s->id >> 24);
2394   *p++ = static_cast<uint8_t>(s->id >> 16);
2395   *p++ = static_cast<uint8_t>(s->id >> 8);
2396   *p++ = static_cast<uint8_t>(s->id);
2397   GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2398 
2399   grpc_slice_buffer_add(&t->qbuf, hdr);
2400   if (!s->sent_initial_metadata) {
2401     grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2402     grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2403   }
2404   grpc_slice_buffer_add(&t->qbuf, status_hdr);
2405   grpc_slice_buffer_add(&t->qbuf, message_pfx);
2406   grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2407   grpc_chttp2_reset_ping_clock(t);
2408   grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
2409                                            &s->stats.outgoing);
2410 
2411   grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2412   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2413 }
2414 
2415 struct cancel_stream_cb_args {
2416   grpc_error* error;
2417   grpc_chttp2_transport* t;
2418 };
2419 
cancel_stream_cb(void * user_data,uint32_t,void * stream)2420 static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) {
2421   cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2422   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2423   grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2424 }
2425 
end_all_the_calls(grpc_chttp2_transport * t,grpc_error * error)2426 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2427   intptr_t http2_error;
2428   // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2429   if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2430       !grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &http2_error)) {
2431     error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
2432                                GRPC_STATUS_UNAVAILABLE);
2433   }
2434   cancel_stream_cb_args args = {error, t};
2435   grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2436   GRPC_ERROR_UNREF(error);
2437 }
2438 
2439 /*******************************************************************************
2440  * INPUT PROCESSING - PARSING
2441  */
2442 
2443 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2444 static void WithUrgency(grpc_chttp2_transport* t,
2445                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2446                         grpc_chttp2_initiate_write_reason reason, F action) {
2447   switch (urgency) {
2448     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2449       break;
2450     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2451       grpc_chttp2_initiate_write(t, reason);
2452     // fallthrough
2453     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2454       action();
2455       break;
2456   }
2457 }
2458 
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2459 void grpc_chttp2_act_on_flowctl_action(
2460     const grpc_core::chttp2::FlowControlAction& action,
2461     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2462   WithUrgency(t, action.send_stream_update(),
2463               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2464               [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2465   WithUrgency(t, action.send_transport_update(),
2466               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2467   WithUrgency(t, action.send_initial_window_update(),
2468               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2469                 queue_setting_update(t,
2470                                      GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2471                                      action.initial_window_size());
2472               });
2473   WithUrgency(t, action.send_max_frame_size_update(),
2474               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2475                 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2476                                      action.max_frame_size());
2477               });
2478 }
2479 
try_http_parsing(grpc_chttp2_transport * t)2480 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2481   grpc_http_parser parser;
2482   size_t i = 0;
2483   grpc_error* error = GRPC_ERROR_NONE;
2484   grpc_http_response response;
2485 
2486   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2487 
2488   grpc_error* parse_error = GRPC_ERROR_NONE;
2489   for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2490     parse_error =
2491         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2492   }
2493   if (parse_error == GRPC_ERROR_NONE &&
2494       (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2495     error = grpc_error_set_int(
2496         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2497                                "Trying to connect an http1.x server"),
2498                            GRPC_ERROR_INT_HTTP_STATUS, response.status),
2499         GRPC_ERROR_INT_GRPC_STATUS,
2500         grpc_http2_status_to_grpc_status(response.status));
2501   }
2502   GRPC_ERROR_UNREF(parse_error);
2503 
2504   grpc_http_parser_destroy(&parser);
2505   grpc_http_response_destroy(&response);
2506   return error;
2507 }
2508 
read_action(void * tp,grpc_error * error)2509 static void read_action(void* tp, grpc_error* error) {
2510   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2511   t->combiner->Run(
2512       GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
2513       GRPC_ERROR_REF(error));
2514 }
2515 
read_action_locked(void * tp,grpc_error * error)2516 static void read_action_locked(void* tp, grpc_error* error) {
2517   GPR_TIMER_SCOPE("reading_action_locked", 0);
2518 
2519   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2520 
2521   GRPC_ERROR_REF(error);
2522 
2523   grpc_error* err = error;
2524   if (err != GRPC_ERROR_NONE) {
2525     err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2526                                  "Endpoint read failed", &err, 1),
2527                              GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2528                              t->write_state);
2529   }
2530   GPR_SWAP(grpc_error*, err, error);
2531   GRPC_ERROR_UNREF(err);
2532   if (t->closed_with_error == GRPC_ERROR_NONE) {
2533     GPR_TIMER_SCOPE("reading_action.parse", 0);
2534     size_t i = 0;
2535     grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2536                              GRPC_ERROR_NONE};
2537     for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2538       grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
2539       if (bdp_est) {
2540         bdp_est->AddIncomingBytes(
2541             static_cast<int64_t> GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
2542       }
2543       errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2544     }
2545     if (errors[1] != GRPC_ERROR_NONE) {
2546       errors[2] = try_http_parsing(t);
2547       GRPC_ERROR_UNREF(error);
2548       error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2549           "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2550     }
2551     for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2552       GRPC_ERROR_UNREF(errors[i]);
2553     }
2554 
2555     GPR_TIMER_SCOPE("post_parse_locked", 0);
2556     if (t->initial_window_update != 0) {
2557       if (t->initial_window_update > 0) {
2558         grpc_chttp2_stream* s;
2559         while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2560           grpc_chttp2_mark_stream_writable(t, s);
2561           grpc_chttp2_initiate_write(
2562               t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2563         }
2564       }
2565       t->initial_window_update = 0;
2566     }
2567   }
2568 
2569   GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2570   bool keep_reading = false;
2571   if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2572     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2573         "Transport closed", &t->closed_with_error, 1);
2574   }
2575   if (error != GRPC_ERROR_NONE) {
2576     /* If a goaway frame was received, this might be the reason why the read
2577      * failed. Add this info to the error */
2578     if (t->goaway_error != GRPC_ERROR_NONE) {
2579       error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2580     }
2581 
2582     close_transport_locked(t, GRPC_ERROR_REF(error));
2583     t->endpoint_reading = 0;
2584   } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2585     keep_reading = true;
2586     /* Since we have read a byte, reset the keepalive timer */
2587     if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2588       grpc_timer_cancel(&t->keepalive_ping_timer);
2589     }
2590   }
2591   grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2592 
2593   if (keep_reading) {
2594     if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2595       t->reading_paused_on_pending_induced_frames = true;
2596       GRPC_CHTTP2_IF_TRACING(
2597           gpr_log(GPR_INFO,
2598                   "transport %p : Pausing reading due to too "
2599                   "many unwritten SETTINGS ACK and RST_STREAM frames",
2600                   t));
2601     } else {
2602       continue_read_action_locked(t);
2603     }
2604   } else {
2605     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2606   }
2607 
2608   GRPC_ERROR_UNREF(error);
2609 }
2610 
continue_read_action_locked(grpc_chttp2_transport * t)2611 static void continue_read_action_locked(grpc_chttp2_transport* t) {
2612   const bool urgent = t->goaway_error != GRPC_ERROR_NONE;
2613   GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t,
2614                     grpc_schedule_on_exec_ctx);
2615   grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, urgent);
2616   grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t, nullptr);
2617 }
2618 
2619 // t is reffed prior to calling the first time, and once the callback chain
2620 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2621 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2622   t->flow_control->bdp_estimator()->SchedulePing();
2623   send_ping_locked(
2624       t,
2625       GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t,
2626                         grpc_schedule_on_exec_ctx),
2627       GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t,
2628                         grpc_schedule_on_exec_ctx));
2629 }
2630 
start_bdp_ping(void * tp,grpc_error * error)2631 static void start_bdp_ping(void* tp, grpc_error* error) {
2632   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2633   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked,
2634                                      start_bdp_ping_locked, t, nullptr),
2635                    GRPC_ERROR_REF(error));
2636 }
2637 
start_bdp_ping_locked(void * tp,grpc_error * error)2638 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2639   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2640   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2641     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
2642             grpc_error_string(error));
2643   }
2644   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2645     return;
2646   }
2647   /* Reset the keepalive ping timer */
2648   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2649     grpc_timer_cancel(&t->keepalive_ping_timer);
2650   }
2651   t->flow_control->bdp_estimator()->StartPing();
2652   t->bdp_ping_started = true;
2653 }
2654 
finish_bdp_ping(void * tp,grpc_error * error)2655 static void finish_bdp_ping(void* tp, grpc_error* error) {
2656   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2657   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2658                                      finish_bdp_ping_locked, t, nullptr),
2659                    GRPC_ERROR_REF(error));
2660 }
2661 
finish_bdp_ping_locked(void * tp,grpc_error * error)2662 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2663   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2664   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2665     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
2666             grpc_error_string(error));
2667   }
2668   if (error != GRPC_ERROR_NONE || t->closed_with_error != GRPC_ERROR_NONE) {
2669     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2670     return;
2671   }
2672   if (!t->bdp_ping_started) {
2673     /* start_bdp_ping_locked has not been run yet. Schedule
2674      * finish_bdp_ping_locked to be run later. */
2675     t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked,
2676                                        finish_bdp_ping_locked, t, nullptr),
2677                      GRPC_ERROR_REF(error));
2678     return;
2679   }
2680   t->bdp_ping_started = false;
2681   grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2682   grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2683                                     nullptr);
2684   GPR_ASSERT(!t->have_next_bdp_ping_timer);
2685   t->have_next_bdp_ping_timer = true;
2686   GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2687                     next_bdp_ping_timer_expired, t, grpc_schedule_on_exec_ctx);
2688   grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2689                   &t->next_bdp_ping_timer_expired_locked);
2690 }
2691 
next_bdp_ping_timer_expired(void * tp,grpc_error * error)2692 static void next_bdp_ping_timer_expired(void* tp, grpc_error* error) {
2693   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2694   t->combiner->Run(
2695       GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
2696                         next_bdp_ping_timer_expired_locked, t, nullptr),
2697       GRPC_ERROR_REF(error));
2698 }
2699 
next_bdp_ping_timer_expired_locked(void * tp,grpc_error * error)2700 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2701   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2702   GPR_ASSERT(t->have_next_bdp_ping_timer);
2703   t->have_next_bdp_ping_timer = false;
2704   if (error != GRPC_ERROR_NONE) {
2705     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2706     return;
2707   }
2708   schedule_bdp_ping_locked(t);
2709 }
2710 
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2711 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2712                                                bool is_client) {
2713   size_t i;
2714   if (args) {
2715     for (i = 0; i < args->num_args; i++) {
2716       if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2717         const int value = grpc_channel_arg_get_integer(
2718             &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2719                                        : g_default_server_keepalive_time_ms,
2720                              1, INT_MAX});
2721         if (is_client) {
2722           g_default_client_keepalive_time_ms = value;
2723         } else {
2724           g_default_server_keepalive_time_ms = value;
2725         }
2726       } else if (0 ==
2727                  strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2728         const int value = grpc_channel_arg_get_integer(
2729             &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2730                                        : g_default_server_keepalive_timeout_ms,
2731                              0, INT_MAX});
2732         if (is_client) {
2733           g_default_client_keepalive_timeout_ms = value;
2734         } else {
2735           g_default_server_keepalive_timeout_ms = value;
2736         }
2737       } else if (0 == strcmp(args->args[i].key,
2738                              GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2739         const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2740             &args->args[i],
2741             {is_client ? g_default_client_keepalive_permit_without_calls
2742                        : g_default_server_keepalive_timeout_ms,
2743              0, 1}));
2744         if (is_client) {
2745           g_default_client_keepalive_permit_without_calls = value;
2746         } else {
2747           g_default_server_keepalive_permit_without_calls = value;
2748         }
2749       } else if (0 ==
2750                  strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2751         g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2752             &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2753       } else if (0 == strcmp(args->args[i].key,
2754                              GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2755         g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2756             &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2757       } else if (0 ==
2758                  strcmp(
2759                      args->args[i].key,
2760                      GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
2761         g_default_min_sent_ping_interval_without_data_ms =
2762             grpc_channel_arg_get_integer(
2763                 &args->args[i],
2764                 {g_default_min_sent_ping_interval_without_data_ms, 0, INT_MAX});
2765       } else if (0 ==
2766                  strcmp(
2767                      args->args[i].key,
2768                      GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2769         g_default_min_recv_ping_interval_without_data_ms =
2770             grpc_channel_arg_get_integer(
2771                 &args->args[i],
2772                 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2773       }
2774     }
2775   }
2776 }
2777 
init_keepalive_ping(void * arg,grpc_error * error)2778 static void init_keepalive_ping(void* arg, grpc_error* error) {
2779   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2780   t->combiner->Run(GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked,
2781                                      init_keepalive_ping_locked, t, nullptr),
2782                    GRPC_ERROR_REF(error));
2783 }
2784 
init_keepalive_ping_locked(void * arg,grpc_error * error)2785 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2786   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2787   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2788   if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2789     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2790   } else if (error == GRPC_ERROR_NONE) {
2791     if (t->keepalive_permit_without_calls ||
2792         grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2793       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2794       GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2795       grpc_timer_init_unset(&t->keepalive_watchdog_timer);
2796       send_keepalive_ping_locked(t);
2797       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2798     } else {
2799       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2800       GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2801                         grpc_schedule_on_exec_ctx);
2802       grpc_timer_init(&t->keepalive_ping_timer,
2803                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2804                       &t->init_keepalive_ping_locked);
2805     }
2806   } else if (error == GRPC_ERROR_CANCELLED) {
2807     /* The keepalive ping timer may be cancelled by bdp */
2808     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2809     GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2810                       grpc_schedule_on_exec_ctx);
2811     grpc_timer_init(&t->keepalive_ping_timer,
2812                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2813                     &t->init_keepalive_ping_locked);
2814   }
2815   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2816 }
2817 
start_keepalive_ping(void * arg,grpc_error * error)2818 static void start_keepalive_ping(void* arg, grpc_error* error) {
2819   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2820   t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
2821                                      start_keepalive_ping_locked, t, nullptr),
2822                    GRPC_ERROR_REF(error));
2823 }
2824 
start_keepalive_ping_locked(void * arg,grpc_error * error)2825 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2826   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2827   if (error != GRPC_ERROR_NONE) {
2828     return;
2829   }
2830   if (t->channelz_socket != nullptr) {
2831     t->channelz_socket->RecordKeepaliveSent();
2832   }
2833   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2834       GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2835     gpr_log(GPR_INFO, "%s: Start keepalive ping", t->peer_string);
2836   }
2837   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2838   GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2839                     keepalive_watchdog_fired, t, grpc_schedule_on_exec_ctx);
2840   grpc_timer_init(&t->keepalive_watchdog_timer,
2841                   grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2842                   &t->keepalive_watchdog_fired_locked);
2843   t->keepalive_ping_started = true;
2844 }
2845 
finish_keepalive_ping(void * arg,grpc_error * error)2846 static void finish_keepalive_ping(void* arg, grpc_error* error) {
2847   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2848   t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2849                                      finish_keepalive_ping_locked, t, nullptr),
2850                    GRPC_ERROR_REF(error));
2851 }
2852 
finish_keepalive_ping_locked(void * arg,grpc_error * error)2853 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2854   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2855   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2856     if (error == GRPC_ERROR_NONE) {
2857       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2858           GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2859         gpr_log(GPR_INFO, "%s: Finish keepalive ping", t->peer_string);
2860       }
2861       if (!t->keepalive_ping_started) {
2862         /* start_keepalive_ping_locked has not run yet. Reschedule
2863          * finish_keepalive_ping_locked for it to be run later. */
2864         t->combiner->Run(
2865             GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
2866                               finish_keepalive_ping_locked, t, nullptr),
2867             GRPC_ERROR_REF(error));
2868         return;
2869       }
2870       t->keepalive_ping_started = false;
2871       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2872       grpc_timer_cancel(&t->keepalive_watchdog_timer);
2873       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2874       GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping, t,
2875                         grpc_schedule_on_exec_ctx);
2876       grpc_timer_init(&t->keepalive_ping_timer,
2877                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2878                       &t->init_keepalive_ping_locked);
2879     }
2880   }
2881   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2882 }
2883 
keepalive_watchdog_fired(void * arg,grpc_error * error)2884 static void keepalive_watchdog_fired(void* arg, grpc_error* error) {
2885   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2886   t->combiner->Run(
2887       GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
2888                         keepalive_watchdog_fired_locked, t, nullptr),
2889       GRPC_ERROR_REF(error));
2890 }
2891 
keepalive_watchdog_fired_locked(void * arg,grpc_error * error)2892 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2893   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2894   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2895     if (error == GRPC_ERROR_NONE) {
2896       gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.",
2897               t->peer_string);
2898       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2899       close_transport_locked(
2900           t, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2901                                     "keepalive watchdog timeout"),
2902                                 GRPC_ERROR_INT_GRPC_STATUS,
2903                                 GRPC_STATUS_UNAVAILABLE));
2904     }
2905   } else {
2906     /* The watchdog timer should have been cancelled by
2907      * finish_keepalive_ping_locked. */
2908     if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2909       gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2910               t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2911     }
2912   }
2913   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2914 }
2915 
2916 /*******************************************************************************
2917  * CALLBACK LOOP
2918  */
2919 
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const char * reason)2920 static void connectivity_state_set(grpc_chttp2_transport* t,
2921                                    grpc_connectivity_state state,
2922                                    const char* reason) {
2923   GRPC_CHTTP2_IF_TRACING(
2924       gpr_log(GPR_INFO, "transport %p set connectivity_state=%d", t, state));
2925   t->state_tracker.SetState(state, reason);
2926 }
2927 
2928 /*******************************************************************************
2929  * POLLSET STUFF
2930  */
2931 
set_pollset(grpc_transport * gt,grpc_stream *,grpc_pollset * pollset)2932 static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/,
2933                         grpc_pollset* pollset) {
2934   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2935   grpc_endpoint_add_to_pollset(t->ep, pollset);
2936 }
2937 
set_pollset_set(grpc_transport * gt,grpc_stream *,grpc_pollset_set * pollset_set)2938 static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/,
2939                             grpc_pollset_set* pollset_set) {
2940   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2941   grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2942 }
2943 
2944 /*******************************************************************************
2945  * BYTE STREAM
2946  */
2947 
reset_byte_stream(void * arg,grpc_error * error)2948 static void reset_byte_stream(void* arg, grpc_error* error) {
2949   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2950   s->pending_byte_stream = false;
2951   if (error == GRPC_ERROR_NONE) {
2952     grpc_chttp2_maybe_complete_recv_message(s->t, s);
2953     grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2954   } else {
2955     GPR_ASSERT(error != GRPC_ERROR_NONE);
2956     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
2957     s->on_next = nullptr;
2958     GRPC_ERROR_UNREF(s->byte_stream_error);
2959     s->byte_stream_error = GRPC_ERROR_NONE;
2960     grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2961     s->byte_stream_error = GRPC_ERROR_REF(error);
2962   }
2963 }
2964 
2965 namespace grpc_core {
2966 
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2967 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2968     grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2969     uint32_t frame_size, uint32_t flags)
2970     : ByteStream(frame_size, flags),
2971       transport_(transport),
2972       stream_(stream),
2973       refs_(2),
2974       remaining_bytes_(frame_size) {
2975   GRPC_ERROR_UNREF(stream->byte_stream_error);
2976   stream->byte_stream_error = GRPC_ERROR_NONE;
2977 }
2978 
OrphanLocked(void * arg,grpc_error *)2979 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2980                                             grpc_error* /*error_ignored*/) {
2981   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2982   grpc_chttp2_stream* s = bs->stream_;
2983   grpc_chttp2_transport* t = s->t;
2984   bs->Unref();
2985   s->pending_byte_stream = false;
2986   grpc_chttp2_maybe_complete_recv_message(t, s);
2987   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2988 }
2989 
Orphan()2990 void Chttp2IncomingByteStream::Orphan() {
2991   GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2992   transport_->combiner->Run(
2993       GRPC_CLOSURE_INIT(&destroy_action_,
2994                         &Chttp2IncomingByteStream::OrphanLocked, this, nullptr),
2995       GRPC_ERROR_NONE);
2996 }
2997 
NextLocked(void * arg,grpc_error *)2998 void Chttp2IncomingByteStream::NextLocked(void* arg,
2999                                           grpc_error* /*error_ignored*/) {
3000   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
3001   grpc_chttp2_transport* t = bs->transport_;
3002   grpc_chttp2_stream* s = bs->stream_;
3003   size_t cur_length = s->frame_storage.length;
3004   if (!s->read_closed) {
3005     s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
3006                                               cur_length);
3007     grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
3008   }
3009   GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
3010   if (s->frame_storage.length > 0) {
3011     grpc_slice_buffer_swap(&s->frame_storage,
3012                            &s->unprocessed_incoming_frames_buffer);
3013     s->unprocessed_incoming_frames_decompressed = false;
3014     grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3015                             GRPC_ERROR_NONE);
3016   } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
3017     grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3018                             GRPC_ERROR_REF(s->byte_stream_error));
3019     if (s->data_parser.parsing_frame != nullptr) {
3020       s->data_parser.parsing_frame->Unref();
3021       s->data_parser.parsing_frame = nullptr;
3022     }
3023   } else if (s->read_closed) {
3024     if (bs->remaining_bytes_ != 0) {
3025       s->byte_stream_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3026           "Truncated message", &s->read_closed_error, 1);
3027       grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
3028                               GRPC_ERROR_REF(s->byte_stream_error));
3029       if (s->data_parser.parsing_frame != nullptr) {
3030         s->data_parser.parsing_frame->Unref();
3031         s->data_parser.parsing_frame = nullptr;
3032       }
3033     } else {
3034       /* Should never reach here. */
3035       GPR_ASSERT(false);
3036     }
3037   } else {
3038     s->on_next = bs->next_action_.on_complete;
3039   }
3040   bs->Unref();
3041 }
3042 
Next(size_t max_size_hint,grpc_closure * on_complete)3043 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
3044                                     grpc_closure* on_complete) {
3045   GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
3046   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3047     return true;
3048   } else {
3049     Ref();
3050     next_action_.max_size_hint = max_size_hint;
3051     next_action_.on_complete = on_complete;
3052     transport_->combiner->Run(
3053         GRPC_CLOSURE_INIT(&next_action_.closure,
3054                           &Chttp2IncomingByteStream::NextLocked, this, nullptr),
3055         GRPC_ERROR_NONE);
3056     return false;
3057   }
3058 }
3059 
MaybeCreateStreamDecompressionCtx()3060 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
3061   GPR_DEBUG_ASSERT(stream_->stream_decompression_method !=
3062                    GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS);
3063   if (!stream_->stream_decompression_ctx) {
3064     stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
3065         stream_->stream_decompression_method);
3066   }
3067 }
3068 
Pull(grpc_slice * slice)3069 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
3070   GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
3071   grpc_error* error;
3072   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
3073     if (!stream_->unprocessed_incoming_frames_decompressed &&
3074         stream_->stream_decompression_method !=
3075             GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
3076       bool end_of_context;
3077       MaybeCreateStreamDecompressionCtx();
3078       if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
3079                                   &stream_->unprocessed_incoming_frames_buffer,
3080                                   &stream_->decompressed_data_buffer, nullptr,
3081                                   MAX_SIZE_T, &end_of_context)) {
3082         error =
3083             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
3084         return error;
3085       }
3086       GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
3087       grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
3088                              &stream_->decompressed_data_buffer);
3089       stream_->unprocessed_incoming_frames_decompressed = true;
3090       if (end_of_context) {
3091         grpc_stream_compression_context_destroy(
3092             stream_->stream_decompression_ctx);
3093         stream_->stream_decompression_ctx = nullptr;
3094       }
3095       if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
3096         *slice = grpc_empty_slice();
3097       }
3098     }
3099     error = grpc_deframe_unprocessed_incoming_frames(
3100         &stream_->data_parser, stream_,
3101         &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
3102     if (error != GRPC_ERROR_NONE) {
3103       return error;
3104     }
3105   } else {
3106     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3107     stream_->t->combiner->Run(&stream_->reset_byte_stream,
3108                               GRPC_ERROR_REF(error));
3109     return error;
3110   }
3111   return GRPC_ERROR_NONE;
3112 }
3113 
PublishError(grpc_error * error)3114 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
3115   GPR_ASSERT(error != GRPC_ERROR_NONE);
3116   grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
3117                           GRPC_ERROR_REF(error));
3118   stream_->on_next = nullptr;
3119   GRPC_ERROR_UNREF(stream_->byte_stream_error);
3120   stream_->byte_stream_error = GRPC_ERROR_REF(error);
3121   grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
3122 }
3123 
Push(const grpc_slice & slice,grpc_slice * slice_out)3124 grpc_error* Chttp2IncomingByteStream::Push(const grpc_slice& slice,
3125                                            grpc_slice* slice_out) {
3126   if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
3127     grpc_error* error =
3128         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
3129     transport_->combiner->Run(&stream_->reset_byte_stream,
3130                               GRPC_ERROR_REF(error));
3131     grpc_slice_unref_internal(slice);
3132     return error;
3133   } else {
3134     remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
3135     if (slice_out != nullptr) {
3136       *slice_out = slice;
3137     }
3138     return GRPC_ERROR_NONE;
3139   }
3140 }
3141 
Finished(grpc_error * error,bool reset_on_error)3142 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
3143                                                bool reset_on_error) {
3144   if (error == GRPC_ERROR_NONE) {
3145     if (remaining_bytes_ != 0) {
3146       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
3147     }
3148   }
3149   if (error != GRPC_ERROR_NONE && reset_on_error) {
3150     transport_->combiner->Run(&stream_->reset_byte_stream,
3151                               GRPC_ERROR_REF(error));
3152   }
3153   Unref();
3154   return error;
3155 }
3156 
Shutdown(grpc_error * error)3157 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
3158   GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
3159 }
3160 
3161 }  // namespace grpc_core
3162 
3163 /*******************************************************************************
3164  * RESOURCE QUOTAS
3165  */
3166 
post_benign_reclaimer(grpc_chttp2_transport * t)3167 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3168   if (!t->benign_reclaimer_registered) {
3169     t->benign_reclaimer_registered = true;
3170     GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3171     GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer, t,
3172                       grpc_schedule_on_exec_ctx);
3173     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3174                                       false, &t->benign_reclaimer_locked);
3175   }
3176 }
3177 
post_destructive_reclaimer(grpc_chttp2_transport * t)3178 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3179   if (!t->destructive_reclaimer_registered) {
3180     t->destructive_reclaimer_registered = true;
3181     GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3182     GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer,
3183                       t, grpc_schedule_on_exec_ctx);
3184     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3185                                       true, &t->destructive_reclaimer_locked);
3186   }
3187 }
3188 
benign_reclaimer(void * arg,grpc_error * error)3189 static void benign_reclaimer(void* arg, grpc_error* error) {
3190   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3191   t->combiner->Run(GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked,
3192                                      benign_reclaimer_locked, t, nullptr),
3193                    GRPC_ERROR_REF(error));
3194 }
3195 
benign_reclaimer_locked(void * arg,grpc_error * error)3196 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3197   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3198   if (error == GRPC_ERROR_NONE &&
3199       grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3200     /* Channel with no active streams: send a goaway to try and make it
3201      * disconnect cleanly */
3202     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3203       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3204               t->peer_string);
3205     }
3206     send_goaway(t,
3207                 grpc_error_set_int(
3208                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3209                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3210   } else if (error == GRPC_ERROR_NONE &&
3211              GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3212     gpr_log(GPR_INFO,
3213             "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3214             " streams",
3215             t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
3216   }
3217   t->benign_reclaimer_registered = false;
3218   if (error != GRPC_ERROR_CANCELLED) {
3219     grpc_resource_user_finish_reclamation(
3220         grpc_endpoint_get_resource_user(t->ep));
3221   }
3222   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3223 }
3224 
destructive_reclaimer(void * arg,grpc_error * error)3225 static void destructive_reclaimer(void* arg, grpc_error* error) {
3226   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3227   t->combiner->Run(GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
3228                                      destructive_reclaimer_locked, t, nullptr),
3229                    GRPC_ERROR_REF(error));
3230 }
3231 
destructive_reclaimer_locked(void * arg,grpc_error * error)3232 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3233   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3234   size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3235   t->destructive_reclaimer_registered = false;
3236   if (error == GRPC_ERROR_NONE && n > 0) {
3237     grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3238         grpc_chttp2_stream_map_rand(&t->stream_map));
3239     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3240       gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
3241               s->id);
3242     }
3243     grpc_chttp2_cancel_stream(
3244         t, s,
3245         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3246                            GRPC_ERROR_INT_HTTP2_ERROR,
3247                            GRPC_HTTP2_ENHANCE_YOUR_CALM));
3248     if (n > 1) {
3249       /* Since we cancel one stream per destructive reclamation, if
3250          there are more streams left, we can immediately post a new
3251          reclaimer in case the resource quota needs to free more
3252          memory */
3253       post_destructive_reclaimer(t);
3254     }
3255   }
3256   if (error != GRPC_ERROR_CANCELLED) {
3257     grpc_resource_user_finish_reclamation(
3258         grpc_endpoint_get_resource_user(t->ep));
3259   }
3260   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3261 }
3262 
3263 /*******************************************************************************
3264  * MONITORING
3265  */
3266 
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3267 const char* grpc_chttp2_initiate_write_reason_string(
3268     grpc_chttp2_initiate_write_reason reason) {
3269   switch (reason) {
3270     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3271       return "INITIAL_WRITE";
3272     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3273       return "START_NEW_STREAM";
3274     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3275       return "SEND_MESSAGE";
3276     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3277       return "SEND_INITIAL_METADATA";
3278     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3279       return "SEND_TRAILING_METADATA";
3280     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3281       return "RETRY_SEND_PING";
3282     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3283       return "CONTINUE_PINGS";
3284     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3285       return "GOAWAY_SENT";
3286     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3287       return "RST_STREAM";
3288     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3289       return "CLOSE_FROM_API";
3290     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3291       return "STREAM_FLOW_CONTROL";
3292     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3293       return "TRANSPORT_FLOW_CONTROL";
3294     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3295       return "SEND_SETTINGS";
3296     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3297       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3298     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3299       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3300     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3301       return "APPLICATION_PING";
3302     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3303       return "KEEPALIVE_PING";
3304     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3305       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3306     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3307       return "PING_RESPONSE";
3308     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3309       return "FORCE_RST_STREAM";
3310   }
3311   GPR_UNREACHABLE_CODE(return "unknown");
3312 }
3313 
chttp2_get_endpoint(grpc_transport * t)3314 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3315   return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3316 }
3317 
3318 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3319                                              "chttp2",
3320                                              init_stream,
3321                                              set_pollset,
3322                                              set_pollset_set,
3323                                              perform_stream_op,
3324                                              perform_transport_op,
3325                                              destroy_stream,
3326                                              destroy_transport,
3327                                              chttp2_get_endpoint};
3328 
get_vtable(void)3329 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3330 
3331 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_transport * transport)3332 grpc_chttp2_transport_get_socket_node(grpc_transport* transport) {
3333   grpc_chttp2_transport* t =
3334       reinterpret_cast<grpc_chttp2_transport*>(transport);
3335   return t->channelz_socket;
3336 }
3337 
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client,grpc_resource_user * resource_user)3338 grpc_transport* grpc_create_chttp2_transport(
3339     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client,
3340     grpc_resource_user* resource_user) {
3341   auto t =
3342       new grpc_chttp2_transport(channel_args, ep, is_client, resource_user);
3343   return &t->base;
3344 }
3345 
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings)3346 void grpc_chttp2_transport_start_reading(
3347     grpc_transport* transport, grpc_slice_buffer* read_buffer,
3348     grpc_closure* notify_on_receive_settings) {
3349   grpc_chttp2_transport* t =
3350       reinterpret_cast<grpc_chttp2_transport*>(transport);
3351   GRPC_CHTTP2_REF_TRANSPORT(
3352       t, "reading_action"); /* matches unref inside reading_action */
3353   if (read_buffer != nullptr) {
3354     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3355     gpr_free(read_buffer);
3356   }
3357   t->notify_on_receive_settings = notify_on_receive_settings;
3358   t->combiner->Run(
3359       GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr),
3360       GRPC_ERROR_NONE);
3361 }
3362