• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <math.h>
26 #include <stdio.h>
27 #include <string.h>
28 
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33 
34 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
35 #include "src/core/ext/transport/chttp2/transport/internal.h"
36 #include "src/core/ext/transport/chttp2/transport/varint.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/compression/stream_compression.h"
39 #include "src/core/lib/debug/stats.h"
40 #include "src/core/lib/gpr/env.h"
41 #include "src/core/lib/gpr/string.h"
42 #include "src/core/lib/gprpp/memory.h"
43 #include "src/core/lib/http/parser.h"
44 #include "src/core/lib/iomgr/executor.h"
45 #include "src/core/lib/iomgr/timer.h"
46 #include "src/core/lib/profiling/timers.h"
47 #include "src/core/lib/slice/slice_internal.h"
48 #include "src/core/lib/slice/slice_string_helpers.h"
49 #include "src/core/lib/transport/error_utils.h"
50 #include "src/core/lib/transport/http2_errors.h"
51 #include "src/core/lib/transport/static_metadata.h"
52 #include "src/core/lib/transport/status_conversion.h"
53 #include "src/core/lib/transport/timeout_encoding.h"
54 #include "src/core/lib/transport/transport.h"
55 #include "src/core/lib/transport/transport_impl.h"
56 
57 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
58 #define MAX_WINDOW 0x7fffffffu
59 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
60 #define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
61 
62 #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
63 #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
64 #define DEFAULT_SERVER_KEEPALIVE_TIME_MS 7200000  /* 2 hours */
65 #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
66 #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false
67 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
68 
69 #define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
70 #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */
71 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2
72 #define DEFAULT_MAX_PING_STRIKES 2
73 
74 static int g_default_client_keepalive_time_ms =
75     DEFAULT_CLIENT_KEEPALIVE_TIME_MS;
76 static int g_default_client_keepalive_timeout_ms =
77     DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS;
78 static int g_default_server_keepalive_time_ms =
79     DEFAULT_SERVER_KEEPALIVE_TIME_MS;
80 static int g_default_server_keepalive_timeout_ms =
81     DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
82 static bool g_default_client_keepalive_permit_without_calls =
83     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
84 static bool g_default_server_keepalive_permit_without_calls =
85     DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
86 
87 static int g_default_min_sent_ping_interval_without_data_ms =
88     DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS;
89 static int g_default_min_recv_ping_interval_without_data_ms =
90     DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS;
91 static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA;
92 static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES;
93 
94 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
95 grpc_core::TraceFlag grpc_http_trace(false, "http");
96 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
97                                                          "chttp2_refcount");
98 
99 /* forward declarations of various callbacks that we'll build closures around */
100 static void write_action_begin_locked(void* t, grpc_error* error);
101 static void write_action(void* t, grpc_error* error);
102 static void write_action_end_locked(void* t, grpc_error* error);
103 
104 static void read_action_locked(void* t, grpc_error* error);
105 
106 static void complete_fetch_locked(void* gs, grpc_error* error);
107 /** Set a transport level setting, and push it to our peer */
108 static void queue_setting_update(grpc_chttp2_transport* t,
109                                  grpc_chttp2_setting_id id, uint32_t value);
110 
111 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
112                            grpc_error* error);
113 
114 /** Start new streams that have been created if we can */
115 static void maybe_start_some_streams(grpc_chttp2_transport* t);
116 
117 static void connectivity_state_set(grpc_chttp2_transport* t,
118                                    grpc_connectivity_state state,
119                                    grpc_error* error, const char* reason);
120 
121 static void benign_reclaimer_locked(void* t, grpc_error* error);
122 static void destructive_reclaimer_locked(void* t, grpc_error* error);
123 
124 static void post_benign_reclaimer(grpc_chttp2_transport* t);
125 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
126 
127 static void close_transport_locked(grpc_chttp2_transport* t, grpc_error* error);
128 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error);
129 
130 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
131 static void start_bdp_ping_locked(void* tp, grpc_error* error);
132 static void finish_bdp_ping_locked(void* tp, grpc_error* error);
133 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error);
134 
135 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error);
136 static void send_ping_locked(grpc_chttp2_transport* t,
137                              grpc_closure* on_initiate,
138                              grpc_closure* on_complete);
139 static void retry_initiate_ping_locked(void* tp, grpc_error* error);
140 
141 /** keepalive-relevant functions */
142 static void init_keepalive_ping_locked(void* arg, grpc_error* error);
143 static void start_keepalive_ping_locked(void* arg, grpc_error* error);
144 static void finish_keepalive_ping_locked(void* arg, grpc_error* error);
145 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error);
146 
147 static void reset_byte_stream(void* arg, grpc_error* error);
148 
149 // Flow control default enabled. Can be disabled by setting
150 // GRPC_EXPERIMENTAL_DISABLE_FLOW_CONTROL
151 bool g_flow_control_enabled = true;
152 
153 /*******************************************************************************
154  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
155  */
156 
destruct_transport(grpc_chttp2_transport * t)157 static void destruct_transport(grpc_chttp2_transport* t) {
158   size_t i;
159 
160   grpc_endpoint_destroy(t->ep);
161 
162   grpc_slice_buffer_destroy_internal(&t->qbuf);
163 
164   grpc_slice_buffer_destroy_internal(&t->outbuf);
165   grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
166 
167   grpc_slice_buffer_destroy_internal(&t->read_buffer);
168   grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
169   grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
170 
171   for (i = 0; i < STREAM_LIST_COUNT; i++) {
172     GPR_ASSERT(t->lists[i].head == nullptr);
173     GPR_ASSERT(t->lists[i].tail == nullptr);
174   }
175 
176   GRPC_ERROR_UNREF(t->goaway_error);
177 
178   GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
179 
180   grpc_chttp2_stream_map_destroy(&t->stream_map);
181   grpc_connectivity_state_destroy(&t->channel_callback.state_tracker);
182 
183   GRPC_COMBINER_UNREF(t->combiner, "chttp2_transport");
184 
185   cancel_pings(t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"));
186 
187   while (t->write_cb_pool) {
188     grpc_chttp2_write_cb* next = t->write_cb_pool->next;
189     gpr_free(t->write_cb_pool);
190     t->write_cb_pool = next;
191   }
192 
193   t->flow_control.Destroy();
194 
195   GRPC_ERROR_UNREF(t->closed_with_error);
196   gpr_free(t->ping_acks);
197   gpr_free(t->peer_string);
198   gpr_free(t);
199 }
200 
201 #ifndef NDEBUG
grpc_chttp2_unref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)202 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason,
203                                  const char* file, int line) {
204   if (grpc_trace_chttp2_refcount.enabled()) {
205     gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
206     gpr_log(GPR_DEBUG, "chttp2:unref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
207             t, val, val - 1, reason, file, line);
208   }
209   if (!gpr_unref(&t->refs)) return;
210   destruct_transport(t);
211 }
212 
grpc_chttp2_ref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)213 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason,
214                                const char* file, int line) {
215   if (grpc_trace_chttp2_refcount.enabled()) {
216     gpr_atm val = gpr_atm_no_barrier_load(&t->refs.count);
217     gpr_log(GPR_DEBUG, "chttp2:  ref:%p %" PRIdPTR "->%" PRIdPTR " %s [%s:%d]",
218             t, val, val + 1, reason, file, line);
219   }
220   gpr_ref(&t->refs);
221 }
222 #else
grpc_chttp2_unref_transport(grpc_chttp2_transport * t)223 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
224   if (!gpr_unref(&t->refs)) return;
225   destruct_transport(t);
226 }
227 
grpc_chttp2_ref_transport(grpc_chttp2_transport * t)228 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); }
229 #endif
230 
231 static const grpc_transport_vtable* get_vtable(void);
232 
233 /* Returns whether bdp is enabled */
read_channel_args(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,bool is_client)234 static bool read_channel_args(grpc_chttp2_transport* t,
235                               const grpc_channel_args* channel_args,
236                               bool is_client) {
237   bool enable_bdp = true;
238   size_t i;
239   int j;
240 
241   for (i = 0; i < channel_args->num_args; i++) {
242     if (0 == strcmp(channel_args->args[i].key,
243                     GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
244       const grpc_integer_options options = {-1, 0, INT_MAX};
245       const int value =
246           grpc_channel_arg_get_integer(&channel_args->args[i], options);
247       if (value >= 0) {
248         if ((t->next_stream_id & 1) != (value & 1)) {
249           gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
250                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
251                   is_client ? "client" : "server");
252         } else {
253           t->next_stream_id = static_cast<uint32_t>(value);
254         }
255       }
256     } else if (0 == strcmp(channel_args->args[i].key,
257                            GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
258       const grpc_integer_options options = {-1, 0, INT_MAX};
259       const int value =
260           grpc_channel_arg_get_integer(&channel_args->args[i], options);
261       if (value >= 0) {
262         grpc_chttp2_hpack_compressor_set_max_usable_size(
263             &t->hpack_compressor, static_cast<uint32_t>(value));
264       }
265     } else if (0 == strcmp(channel_args->args[i].key,
266                            GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
267       t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
268           &channel_args->args[i],
269           {g_default_max_pings_without_data, 0, INT_MAX});
270     } else if (0 == strcmp(channel_args->args[i].key,
271                            GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
272       t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
273           &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
274     } else if (0 ==
275                strcmp(channel_args->args[i].key,
276                       GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
277       t->ping_policy.min_sent_ping_interval_without_data =
278           grpc_channel_arg_get_integer(
279               &channel_args->args[i],
280               grpc_integer_options{
281                   g_default_min_sent_ping_interval_without_data_ms, 0,
282                   INT_MAX});
283     } else if (0 ==
284                strcmp(channel_args->args[i].key,
285                       GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
286       t->ping_policy.min_recv_ping_interval_without_data =
287           grpc_channel_arg_get_integer(
288               &channel_args->args[i],
289               grpc_integer_options{
290                   g_default_min_recv_ping_interval_without_data_ms, 0,
291                   INT_MAX});
292     } else if (0 == strcmp(channel_args->args[i].key,
293                            GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
294       t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
295           &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
296     } else if (0 ==
297                strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
298       enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
299     } else if (0 ==
300                strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
301       const int value = grpc_channel_arg_get_integer(
302           &channel_args->args[i],
303           grpc_integer_options{t->is_client
304                                    ? g_default_client_keepalive_time_ms
305                                    : g_default_server_keepalive_time_ms,
306                                1, INT_MAX});
307       t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
308     } else if (0 == strcmp(channel_args->args[i].key,
309                            GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
310       const int value = grpc_channel_arg_get_integer(
311           &channel_args->args[i],
312           grpc_integer_options{t->is_client
313                                    ? g_default_client_keepalive_timeout_ms
314                                    : g_default_server_keepalive_timeout_ms,
315                                0, INT_MAX});
316       t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
317     } else if (0 == strcmp(channel_args->args[i].key,
318                            GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
319       t->keepalive_permit_without_calls = static_cast<uint32_t>(
320           grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
321     } else if (0 == strcmp(channel_args->args[i].key,
322                            GRPC_ARG_OPTIMIZATION_TARGET)) {
323       if (channel_args->args[i].type != GRPC_ARG_STRING) {
324         gpr_log(GPR_ERROR, "%s should be a string",
325                 GRPC_ARG_OPTIMIZATION_TARGET);
326       } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
327         t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
328       } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
329         t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
330       } else if (0 ==
331                  strcmp(channel_args->args[i].value.string, "throughput")) {
332         t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
333       } else {
334         gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
335                 GRPC_ARG_OPTIMIZATION_TARGET,
336                 channel_args->args[i].value.string);
337       }
338     } else {
339       static const struct {
340         const char* channel_arg_name;
341         grpc_chttp2_setting_id setting_id;
342         grpc_integer_options integer_options;
343         bool availability[2] /* server, client */;
344       } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
345                            GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
346                            {-1, 0, INT32_MAX},
347                            {true, false}},
348                           {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
349                            GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
350                            {-1, 0, INT32_MAX},
351                            {true, true}},
352                           {GRPC_ARG_MAX_METADATA_SIZE,
353                            GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
354                            {-1, 0, INT32_MAX},
355                            {true, true}},
356                           {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
357                            GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
358                            {-1, 16384, 16777215},
359                            {true, true}},
360                           {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
361                            GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
362                            {1, 0, 1},
363                            {true, true}},
364                           {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
365                            GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
366                            {-1, 5, INT32_MAX},
367                            {true, true}}};
368       for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
369         if (0 == strcmp(channel_args->args[i].key,
370                         settings_map[j].channel_arg_name)) {
371           if (!settings_map[j].availability[is_client]) {
372             gpr_log(GPR_DEBUG, "%s is not available on %s",
373                     settings_map[j].channel_arg_name,
374                     is_client ? "clients" : "servers");
375           } else {
376             int value = grpc_channel_arg_get_integer(
377                 &channel_args->args[i], settings_map[j].integer_options);
378             if (value >= 0) {
379               queue_setting_update(t, settings_map[j].setting_id,
380                                    static_cast<uint32_t>(value));
381             }
382           }
383           break;
384         }
385       }
386     }
387   }
388   return enable_bdp;
389 }
390 
init_transport_closures(grpc_chttp2_transport * t)391 static void init_transport_closures(grpc_chttp2_transport* t) {
392   GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
393                     grpc_combiner_scheduler(t->combiner));
394   GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
395                     grpc_combiner_scheduler(t->combiner));
396   GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked,
397                     destructive_reclaimer_locked, t,
398                     grpc_combiner_scheduler(t->combiner));
399   GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, retry_initiate_ping_locked,
400                     t, grpc_combiner_scheduler(t->combiner));
401   GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
402                     grpc_combiner_scheduler(t->combiner));
403   GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
404                     grpc_combiner_scheduler(t->combiner));
405   GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked,
406                     next_bdp_ping_timer_expired_locked, t,
407                     grpc_combiner_scheduler(t->combiner));
408   GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, init_keepalive_ping_locked,
409                     t, grpc_combiner_scheduler(t->combiner));
410   GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked,
411                     start_keepalive_ping_locked, t,
412                     grpc_combiner_scheduler(t->combiner));
413   GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked,
414                     finish_keepalive_ping_locked, t,
415                     grpc_combiner_scheduler(t->combiner));
416   GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
417                     keepalive_watchdog_fired_locked, t,
418                     grpc_combiner_scheduler(t->combiner));
419 }
420 
init_transport_keepalive_settings(grpc_chttp2_transport * t)421 static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
422   if (t->is_client) {
423     t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
424                             ? GRPC_MILLIS_INF_FUTURE
425                             : g_default_client_keepalive_time_ms;
426     t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
427                                ? GRPC_MILLIS_INF_FUTURE
428                                : g_default_client_keepalive_timeout_ms;
429     t->keepalive_permit_without_calls =
430         g_default_client_keepalive_permit_without_calls;
431   } else {
432     t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
433                             ? GRPC_MILLIS_INF_FUTURE
434                             : g_default_server_keepalive_time_ms;
435     t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
436                                ? GRPC_MILLIS_INF_FUTURE
437                                : g_default_server_keepalive_timeout_ms;
438     t->keepalive_permit_without_calls =
439         g_default_server_keepalive_permit_without_calls;
440   }
441 }
442 
configure_transport_ping_policy(grpc_chttp2_transport * t)443 static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
444   t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
445   t->ping_policy.min_sent_ping_interval_without_data =
446       g_default_min_sent_ping_interval_without_data_ms;
447   t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
448   t->ping_policy.min_recv_ping_interval_without_data =
449       g_default_min_recv_ping_interval_without_data_ms;
450 }
451 
init_keepalive_pings_if_enabled(grpc_chttp2_transport * t)452 static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
453   if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
454     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
455     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
456     grpc_timer_init(&t->keepalive_ping_timer,
457                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
458                     &t->init_keepalive_ping_locked);
459   } else {
460     /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
461        inflight keeaplive timers */
462     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
463   }
464 }
465 
init_transport(grpc_chttp2_transport * t,const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client)466 static void init_transport(grpc_chttp2_transport* t,
467                            const grpc_channel_args* channel_args,
468                            grpc_endpoint* ep, bool is_client) {
469   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
470              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
471 
472   t->base.vtable = get_vtable();
473   t->ep = ep;
474   /* one ref is for destroy */
475   gpr_ref_init(&t->refs, 1);
476   t->combiner = grpc_combiner_create();
477   t->peer_string = grpc_endpoint_get_peer(ep);
478   t->endpoint_reading = 1;
479   t->next_stream_id = is_client ? 1 : 2;
480   t->is_client = is_client;
481   t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
482   t->is_first_frame = true;
483   grpc_connectivity_state_init(
484       &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
485       is_client ? "client_transport" : "server_transport");
486 
487   grpc_slice_buffer_init(&t->qbuf);
488   grpc_slice_buffer_init(&t->outbuf);
489   grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
490 
491   init_transport_closures(t);
492 
493   t->goaway_error = GRPC_ERROR_NONE;
494   grpc_chttp2_goaway_parser_init(&t->goaway_parser);
495   grpc_chttp2_hpack_parser_init(&t->hpack_parser);
496 
497   grpc_slice_buffer_init(&t->read_buffer);
498 
499   /* 8 is a random stab in the dark as to a good initial size: it's small enough
500      that it shouldn't waste memory for infrequently used connections, yet
501      large enough that the exponential growth should happen nicely when it's
502      needed.
503      TODO(ctiller): tune this */
504   grpc_chttp2_stream_map_init(&t->stream_map, 8);
505 
506   /* copy in initial settings to all setting sets */
507   size_t i;
508   int j;
509   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
510     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
511       t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
512     }
513   }
514   t->dirtied_local_settings = 1;
515   /* Hack: it's common for implementations to assume 65536 bytes initial send
516      window -- this should by rights be 0 */
517   t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
518   t->sent_local_settings = 0;
519   t->write_buffer_size = grpc_core::chttp2::kDefaultWindow;
520 
521   if (is_client) {
522     grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
523                                           GRPC_CHTTP2_CLIENT_CONNECT_STRING));
524   }
525 
526   /* configure http2 the way we like it */
527   if (is_client) {
528     queue_setting_update(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
529     queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
530   }
531   queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
532                        DEFAULT_MAX_HEADER_LIST_SIZE);
533   queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
534                        1);
535 
536   configure_transport_ping_policy(t);
537   init_transport_keepalive_settings(t);
538 
539   t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
540 
541   bool enable_bdp = true;
542   if (channel_args) {
543     enable_bdp = read_channel_args(t, channel_args, is_client);
544   }
545 
546   if (g_flow_control_enabled) {
547     t->flow_control.Init<grpc_core::chttp2::TransportFlowControl>(t,
548                                                                   enable_bdp);
549   } else {
550     t->flow_control.Init<grpc_core::chttp2::TransportFlowControlDisabled>(t);
551     enable_bdp = false;
552   }
553 
554   /* No pings allowed before receiving a header or data frame. */
555   t->ping_state.pings_before_data_required = 0;
556   t->ping_state.is_delayed_ping_timer_set = false;
557   t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST;
558 
559   t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
560   t->ping_recv_state.ping_strikes = 0;
561 
562   init_keepalive_pings_if_enabled(t);
563 
564   if (enable_bdp) {
565     GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
566     schedule_bdp_ping_locked(t);
567     grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
568                                       nullptr);
569   }
570 
571   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
572   post_benign_reclaimer(t);
573 }
574 
destroy_transport_locked(void * tp,grpc_error * error)575 static void destroy_transport_locked(void* tp, grpc_error* error) {
576   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
577   t->destroying = 1;
578   close_transport_locked(
579       t, grpc_error_set_int(
580              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport destroyed"),
581              GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
582   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destroy");
583 }
584 
destroy_transport(grpc_transport * gt)585 static void destroy_transport(grpc_transport* gt) {
586   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
587   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(destroy_transport_locked, t,
588                                          grpc_combiner_scheduler(t->combiner)),
589                      GRPC_ERROR_NONE);
590 }
591 
close_transport_locked(grpc_chttp2_transport * t,grpc_error * error)592 static void close_transport_locked(grpc_chttp2_transport* t,
593                                    grpc_error* error) {
594   end_all_the_calls(t, GRPC_ERROR_REF(error));
595   cancel_pings(t, GRPC_ERROR_REF(error));
596   if (t->closed_with_error == GRPC_ERROR_NONE) {
597     if (!grpc_error_has_clear_grpc_status(error)) {
598       error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
599                                  GRPC_STATUS_UNAVAILABLE);
600     }
601     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
602       if (t->close_transport_on_writes_finished == nullptr) {
603         t->close_transport_on_writes_finished =
604             GRPC_ERROR_CREATE_FROM_STATIC_STRING(
605                 "Delayed close due to in-progress write");
606       }
607       t->close_transport_on_writes_finished =
608           grpc_error_add_child(t->close_transport_on_writes_finished, error);
609       return;
610     }
611     GPR_ASSERT(error != GRPC_ERROR_NONE);
612     t->closed_with_error = GRPC_ERROR_REF(error);
613     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
614                            "close_transport");
615     if (t->ping_state.is_delayed_ping_timer_set) {
616       grpc_timer_cancel(&t->ping_state.delayed_ping_timer);
617     }
618     if (t->have_next_bdp_ping_timer) {
619       grpc_timer_cancel(&t->next_bdp_ping_timer);
620     }
621     switch (t->keepalive_state) {
622       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
623         grpc_timer_cancel(&t->keepalive_ping_timer);
624         break;
625       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
626         grpc_timer_cancel(&t->keepalive_ping_timer);
627         grpc_timer_cancel(&t->keepalive_watchdog_timer);
628         break;
629       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
630       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
631         /* keepalive timers are not set in these two states */
632         break;
633     }
634 
635     /* flush writable stream list to avoid dangling references */
636     grpc_chttp2_stream* s;
637     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
638       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
639     }
640     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
641     grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
642   }
643   if (t->notify_on_receive_settings != nullptr) {
644     GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
645     t->notify_on_receive_settings = nullptr;
646   }
647   GRPC_ERROR_UNREF(error);
648 }
649 
650 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)651 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
652   grpc_stream_ref(s->refcount, reason);
653 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)654 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
655   grpc_stream_unref(s->refcount, reason);
656 }
657 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)658 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
659   grpc_stream_ref(s->refcount);
660 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)661 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
662   grpc_stream_unref(s->refcount);
663 }
664 #endif
665 
init_stream(grpc_transport * gt,grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,gpr_arena * arena)666 static int init_stream(grpc_transport* gt, grpc_stream* gs,
667                        grpc_stream_refcount* refcount, const void* server_data,
668                        gpr_arena* arena) {
669   GPR_TIMER_SCOPE("init_stream", 0);
670   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
671   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
672 
673   s->t = t;
674   s->refcount = refcount;
675   /* We reserve one 'active stream' that's dropped when the stream is
676      read-closed. The others are for Chttp2IncomingByteStreams that are
677      actively reading */
678   GRPC_CHTTP2_STREAM_REF(s, "chttp2");
679 
680   grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
681   grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
682   grpc_chttp2_data_parser_init(&s->data_parser);
683   grpc_slice_buffer_init(&s->flow_controlled_buffer);
684   s->deadline = GRPC_MILLIS_INF_FUTURE;
685   GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s,
686                     grpc_schedule_on_exec_ctx);
687   grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
688   s->unprocessed_incoming_frames_buffer_cached_length = 0;
689   grpc_slice_buffer_init(&s->frame_storage);
690   grpc_slice_buffer_init(&s->compressed_data_buffer);
691   grpc_slice_buffer_init(&s->decompressed_data_buffer);
692   s->pending_byte_stream = false;
693   s->decompressed_header_bytes = 0;
694   GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s,
695                     grpc_combiner_scheduler(t->combiner));
696 
697   GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
698 
699   if (server_data) {
700     s->id = static_cast<uint32_t>((uintptr_t)server_data);
701     *t->accepting_stream = s;
702     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
703     post_destructive_reclaimer(t);
704   }
705 
706   if (t->flow_control->flow_control_enabled()) {
707     s->flow_control.Init<grpc_core::chttp2::StreamFlowControl>(
708         static_cast<grpc_core::chttp2::TransportFlowControl*>(
709             t->flow_control.get()),
710         s);
711   } else {
712     s->flow_control.Init<grpc_core::chttp2::StreamFlowControlDisabled>();
713   }
714 
715   return 0;
716 }
717 
destroy_stream_locked(void * sp,grpc_error * error)718 static void destroy_stream_locked(void* sp, grpc_error* error) {
719   GPR_TIMER_SCOPE("destroy_stream", 0);
720   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
721   grpc_chttp2_transport* t = s->t;
722 
723   GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0);
724   if (s->id != 0) {
725     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr);
726   }
727 
728   grpc_slice_buffer_destroy_internal(&s->unprocessed_incoming_frames_buffer);
729   grpc_slice_buffer_destroy_internal(&s->frame_storage);
730   grpc_slice_buffer_destroy_internal(&s->compressed_data_buffer);
731   grpc_slice_buffer_destroy_internal(&s->decompressed_data_buffer);
732 
733   grpc_chttp2_list_remove_stalled_by_transport(t, s);
734   grpc_chttp2_list_remove_stalled_by_stream(t, s);
735 
736   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
737     if (GPR_UNLIKELY(s->included[i])) {
738       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
739               t->is_client ? "client" : "server", s->id, i);
740       abort();
741     }
742   }
743 
744   GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
745   GPR_ASSERT(s->fetching_send_message == nullptr);
746   GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
747   GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
748   GPR_ASSERT(s->recv_message_ready == nullptr);
749   GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
750   grpc_chttp2_data_parser_destroy(&s->data_parser);
751   grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]);
752   grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]);
753   grpc_slice_buffer_destroy_internal(&s->flow_controlled_buffer);
754   GRPC_ERROR_UNREF(s->read_closed_error);
755   GRPC_ERROR_UNREF(s->write_closed_error);
756   GRPC_ERROR_UNREF(s->byte_stream_error);
757 
758   s->flow_control.Destroy();
759 
760   GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
761 
762   GRPC_CLOSURE_SCHED(s->destroy_stream_arg, GRPC_ERROR_NONE);
763 }
764 
destroy_stream(grpc_transport * gt,grpc_stream * gs,grpc_closure * then_schedule_closure)765 static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
766                            grpc_closure* then_schedule_closure) {
767   GPR_TIMER_SCOPE("destroy_stream", 0);
768   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
769   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
770 
771   if (s->stream_compression_ctx != nullptr) {
772     grpc_stream_compression_context_destroy(s->stream_compression_ctx);
773     s->stream_compression_ctx = nullptr;
774   }
775   if (s->stream_decompression_ctx != nullptr) {
776     grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
777     s->stream_decompression_ctx = nullptr;
778   }
779 
780   s->destroy_stream_arg = then_schedule_closure;
781   GRPC_CLOSURE_SCHED(
782       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s,
783                         grpc_combiner_scheduler(t->combiner)),
784       GRPC_ERROR_NONE);
785 }
786 
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)787 grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t,
788                                                       uint32_t id) {
789   return static_cast<grpc_chttp2_stream*>(
790       grpc_chttp2_stream_map_find(&t->stream_map, id));
791 }
792 
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)793 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
794                                                       uint32_t id) {
795   if (t->channel_callback.accept_stream == nullptr) {
796     return nullptr;
797   }
798   grpc_chttp2_stream* accepting;
799   GPR_ASSERT(t->accepting_stream == nullptr);
800   t->accepting_stream = &accepting;
801   t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
802                                     &t->base,
803                                     (void*)static_cast<uintptr_t>(id));
804   t->accepting_stream = nullptr;
805   return accepting;
806 }
807 
808 /*******************************************************************************
809  * OUTPUT PROCESSING
810  */
811 
write_state_name(grpc_chttp2_write_state st)812 static const char* write_state_name(grpc_chttp2_write_state st) {
813   switch (st) {
814     case GRPC_CHTTP2_WRITE_STATE_IDLE:
815       return "IDLE";
816     case GRPC_CHTTP2_WRITE_STATE_WRITING:
817       return "WRITING";
818     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
819       return "WRITING+MORE";
820   }
821   GPR_UNREACHABLE_CODE(return "UNKNOWN");
822 }
823 
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)824 static void set_write_state(grpc_chttp2_transport* t,
825                             grpc_chttp2_write_state st, const char* reason) {
826   GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "W:%p %s state %s -> %s [%s]", t,
827                                  t->is_client ? "CLIENT" : "SERVER",
828                                  write_state_name(t->write_state),
829                                  write_state_name(st), reason));
830   t->write_state = st;
831   /* If the state is being reset back to idle, it means a write was just
832    * finished. Make sure all the run_after_write closures are scheduled.
833    *
834    * This is also our chance to close the transport if the transport was marked
835    * to be closed after all writes finish (for example, if we received a go-away
836    * from peer while we had some pending writes) */
837   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
838     GRPC_CLOSURE_LIST_SCHED(&t->run_after_write);
839     if (t->close_transport_on_writes_finished != nullptr) {
840       grpc_error* err = t->close_transport_on_writes_finished;
841       t->close_transport_on_writes_finished = nullptr;
842       close_transport_locked(t, err);
843     }
844   }
845 }
846 
inc_initiate_write_reason(grpc_chttp2_initiate_write_reason reason)847 static void inc_initiate_write_reason(
848     grpc_chttp2_initiate_write_reason reason) {
849   switch (reason) {
850     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
851       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE();
852       break;
853     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
854       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM();
855       break;
856     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
857       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE();
858       break;
859     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
860       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA();
861       break;
862     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
863       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA();
864       break;
865     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
866       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING();
867       break;
868     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
869       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS();
870       break;
871     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
872       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT();
873       break;
874     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
875       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM();
876       break;
877     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
878       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API();
879       break;
880     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
881       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL();
882       break;
883     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
884       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL();
885       break;
886     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
887       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS();
888       break;
889     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
890       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING();
891       break;
892     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
893       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE();
894       break;
895     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
896       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING();
897       break;
898     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
899       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING();
900       break;
901     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
902       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED();
903       break;
904     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
905       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE();
906       break;
907     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
908       GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM();
909       break;
910   }
911 }
912 
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)913 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
914                                 grpc_chttp2_initiate_write_reason reason) {
915   GPR_TIMER_SCOPE("grpc_chttp2_initiate_write", 0);
916 
917   switch (t->write_state) {
918     case GRPC_CHTTP2_WRITE_STATE_IDLE:
919       inc_initiate_write_reason(reason);
920       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
921                       grpc_chttp2_initiate_write_reason_string(reason));
922       t->is_first_write_in_batch = true;
923       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
924       /* Note that the 'write_action_begin_locked' closure is being scheduled
925        * on the 'finally_scheduler' of t->combiner. This means that
926        * 'write_action_begin_locked' is called only *after* all the other
927        * closures (some of which are potentially initiating more writes on the
928        * transport) are executed on the t->combiner.
929        *
930        * The reason for scheduling on finally_scheduler is to make sure we batch
931        * as many writes as possible. 'write_action_begin_locked' is the function
932        * that gathers all the relevant bytes (which are at various places in the
933        * grpc_chttp2_transport structure) and append them to 'outbuf' field in
934        * grpc_chttp2_transport thereby batching what would have been potentially
935        * multiple write operations.
936        *
937        * Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
938        * It does not call the endpoint to write the bytes. That is done by the
939        * 'write_action' (which is scheduled by 'write_action_begin_locked') */
940       GRPC_CLOSURE_SCHED(
941           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
942                             write_action_begin_locked, t,
943                             grpc_combiner_finally_scheduler(t->combiner)),
944           GRPC_ERROR_NONE);
945       break;
946     case GRPC_CHTTP2_WRITE_STATE_WRITING:
947       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
948                       grpc_chttp2_initiate_write_reason_string(reason));
949       break;
950     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
951       break;
952   }
953 }
954 
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)955 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
956                                       grpc_chttp2_stream* s) {
957   if (t->closed_with_error == GRPC_ERROR_NONE &&
958       grpc_chttp2_list_add_writable_stream(t, s)) {
959     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
960   }
961 }
962 
write_scheduler(grpc_chttp2_transport * t,bool early_results_scheduled,bool partial_write)963 static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
964                                                bool early_results_scheduled,
965                                                bool partial_write) {
966   /* if it's not the first write in a batch, always offload to the executor:
967      we'll probably end up queuing against the kernel anyway, so we'll likely
968      get better latency overall if we switch writing work elsewhere and continue
969      with application work above */
970   if (!t->is_first_write_in_batch) {
971     return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
972   }
973   /* equivalently, if it's a partial write, we *know* we're going to be taking a
974      thread jump to write it because of the above, may as well do so
975      immediately */
976   if (partial_write) {
977     return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
978   }
979   switch (t->opt_target) {
980     case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT:
981       /* executor gives us the largest probability of being able to batch a
982        * write with others on this transport */
983       return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
984     case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY:
985       return grpc_schedule_on_exec_ctx;
986   }
987   GPR_UNREACHABLE_CODE(return nullptr);
988 }
989 
990 #define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
begin_writing_desc(bool partial,bool inlined)991 static const char* begin_writing_desc(bool partial, bool inlined) {
992   switch (WRITE_STATE_TUPLE_TO_INT(partial, inlined)) {
993     case WRITE_STATE_TUPLE_TO_INT(false, false):
994       return "begin write in background";
995     case WRITE_STATE_TUPLE_TO_INT(false, true):
996       return "begin write in current thread";
997     case WRITE_STATE_TUPLE_TO_INT(true, false):
998       return "begin partial write in background";
999     case WRITE_STATE_TUPLE_TO_INT(true, true):
1000       return "begin partial write in current thread";
1001   }
1002   GPR_UNREACHABLE_CODE(return "bad state tuple");
1003 }
1004 
write_action_begin_locked(void * gt,grpc_error * error_ignored)1005 static void write_action_begin_locked(void* gt, grpc_error* error_ignored) {
1006   GPR_TIMER_SCOPE("write_action_begin_locked", 0);
1007   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1008   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
1009   grpc_chttp2_begin_write_result r;
1010   if (t->closed_with_error != GRPC_ERROR_NONE) {
1011     r.writing = false;
1012   } else {
1013     r = grpc_chttp2_begin_write(t);
1014   }
1015   if (r.writing) {
1016     if (r.partial) {
1017       GRPC_STATS_INC_HTTP2_PARTIAL_WRITES();
1018     }
1019     if (!t->is_first_write_in_batch) {
1020       GRPC_STATS_INC_HTTP2_WRITES_CONTINUED();
1021     }
1022     grpc_closure_scheduler* scheduler =
1023         write_scheduler(t, r.early_results_scheduled, r.partial);
1024     if (scheduler != grpc_schedule_on_exec_ctx) {
1025       GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED();
1026     }
1027     set_write_state(
1028         t,
1029         r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1030                   : GRPC_CHTTP2_WRITE_STATE_WRITING,
1031         begin_writing_desc(r.partial, scheduler == grpc_schedule_on_exec_ctx));
1032     GRPC_CLOSURE_SCHED(
1033         GRPC_CLOSURE_INIT(&t->write_action, write_action, t, scheduler),
1034         GRPC_ERROR_NONE);
1035   } else {
1036     GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN();
1037     set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
1038     GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1039   }
1040 }
1041 
write_action(void * gt,grpc_error * error)1042 static void write_action(void* gt, grpc_error* error) {
1043   GPR_TIMER_SCOPE("write_action", 0);
1044   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
1045   grpc_endpoint_write(
1046       t->ep, &t->outbuf,
1047       GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
1048                         grpc_combiner_scheduler(t->combiner)),
1049       nullptr);
1050 }
1051 
1052 /* Callback from the grpc_endpoint after bytes have been written by calling
1053  * sendmsg */
write_action_end_locked(void * tp,grpc_error * error)1054 static void write_action_end_locked(void* tp, grpc_error* error) {
1055   GPR_TIMER_SCOPE("terminate_writing_with_lock", 0);
1056   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1057 
1058   if (error != GRPC_ERROR_NONE) {
1059     close_transport_locked(t, GRPC_ERROR_REF(error));
1060   }
1061 
1062   if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) {
1063     t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT;
1064     if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
1065       close_transport_locked(
1066           t, GRPC_ERROR_CREATE_FROM_STATIC_STRING("goaway sent"));
1067     }
1068   }
1069 
1070   switch (t->write_state) {
1071     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1072       GPR_UNREACHABLE_CODE(break);
1073     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1074       GPR_TIMER_MARK("state=writing", 0);
1075       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1076       break;
1077     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1078       GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
1079       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
1080       t->is_first_write_in_batch = false;
1081       GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
1082       GRPC_CLOSURE_RUN(
1083           GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
1084                             write_action_begin_locked, t,
1085                             grpc_combiner_finally_scheduler(t->combiner)),
1086           GRPC_ERROR_NONE);
1087       break;
1088   }
1089 
1090   grpc_chttp2_end_write(t, GRPC_ERROR_REF(error));
1091 
1092   GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
1093 }
1094 
1095 // Dirties an HTTP2 setting to be sent out next time a writing path occurs.
1096 // If the change needs to occur immediately, manually initiate a write.
queue_setting_update(grpc_chttp2_transport * t,grpc_chttp2_setting_id id,uint32_t value)1097 static void queue_setting_update(grpc_chttp2_transport* t,
1098                                  grpc_chttp2_setting_id id, uint32_t value) {
1099   const grpc_chttp2_setting_parameters* sp =
1100       &grpc_chttp2_settings_parameters[id];
1101   uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
1102   if (use_value != value) {
1103     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
1104             value, use_value);
1105   }
1106   if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
1107     t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
1108     t->dirtied_local_settings = 1;
1109   }
1110 }
1111 
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,grpc_slice goaway_text)1112 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1113                                      uint32_t goaway_error,
1114                                      grpc_slice goaway_text) {
1115   // GRPC_CHTTP2_IF_TRACING(
1116   //     gpr_log(GPR_INFO, "got goaway [%d]: %s", goaway_error, msg));
1117 
1118   // Discard the error from a previous goaway frame (if any)
1119   if (t->goaway_error != GRPC_ERROR_NONE) {
1120     GRPC_ERROR_UNREF(t->goaway_error);
1121   }
1122   t->goaway_error = grpc_error_set_str(
1123       grpc_error_set_int(
1124           GRPC_ERROR_CREATE_FROM_STATIC_STRING("GOAWAY received"),
1125           GRPC_ERROR_INT_HTTP2_ERROR, static_cast<intptr_t>(goaway_error)),
1126       GRPC_ERROR_STR_RAW_BYTES, goaway_text);
1127 
1128   /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1129    * data equal to "too_many_pings", it should log the occurrence at a log level
1130    * that is enabled by default and double the configured KEEPALIVE_TIME used
1131    * for new connections on that channel. */
1132   if (GPR_UNLIKELY(t->is_client &&
1133                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1134                    grpc_slice_str_cmp(goaway_text, "too_many_pings") == 0)) {
1135     gpr_log(GPR_ERROR,
1136             "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1137             "data equal to \"too_many_pings\"");
1138     double current_keepalive_time_ms = static_cast<double>(t->keepalive_time);
1139     t->keepalive_time =
1140         current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER
1141             ? GRPC_MILLIS_INF_FUTURE
1142             : static_cast<grpc_millis>(current_keepalive_time_ms *
1143                                        KEEPALIVE_TIME_BACKOFF_MULTIPLIER);
1144   }
1145 
1146   /* lie: use transient failure from the transport to indicate goaway has been
1147    * received */
1148   connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1149                          GRPC_ERROR_REF(t->goaway_error), "got_goaway");
1150 }
1151 
maybe_start_some_streams(grpc_chttp2_transport * t)1152 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1153   grpc_chttp2_stream* s;
1154   /* start streams where we have free grpc_chttp2_stream ids and free
1155    * concurrency */
1156   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1157          grpc_chttp2_stream_map_size(&t->stream_map) <
1158              t->settings[GRPC_PEER_SETTINGS]
1159                         [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
1160          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1161     /* safe since we can't (legally) be parsing this stream yet */
1162     GRPC_CHTTP2_IF_TRACING(gpr_log(
1163         GPR_INFO, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
1164         t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
1165 
1166     GPR_ASSERT(s->id == 0);
1167     s->id = t->next_stream_id;
1168     t->next_stream_id += 2;
1169 
1170     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1171       connectivity_state_set(
1172           t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1173           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1174           "no_more_stream_ids");
1175     }
1176 
1177     grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1178     post_destructive_reclaimer(t);
1179     grpc_chttp2_mark_stream_writable(t, s);
1180     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1181   }
1182   /* cancel out streams that will never be started */
1183   while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
1184          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1185     grpc_chttp2_cancel_stream(
1186         t, s,
1187         grpc_error_set_int(
1188             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream IDs exhausted"),
1189             GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1190   }
1191 }
1192 
1193 /* Flag that this closure barrier may be covering a write in a pollset, and so
1194    we should not complete this closure until we can prove that the write got
1195    scheduled */
1196 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
1197 /* First bit of the reference count, stored in the high order bits (with the low
1198    bits being used for flags defined above) */
1199 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
1200 
add_closure_barrier(grpc_closure * closure)1201 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1202   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1203   return closure;
1204 }
1205 
null_then_run_closure(grpc_closure ** closure,grpc_error * error)1206 static void null_then_run_closure(grpc_closure** closure, grpc_error* error) {
1207   grpc_closure* c = *closure;
1208   *closure = nullptr;
1209   GRPC_CLOSURE_RUN(c, error);
1210 }
1211 
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_closure ** pclosure,grpc_error * error,const char * desc)1212 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1213                                        grpc_chttp2_stream* s,
1214                                        grpc_closure** pclosure,
1215                                        grpc_error* error, const char* desc) {
1216   grpc_closure* closure = *pclosure;
1217   *pclosure = nullptr;
1218   if (closure == nullptr) {
1219     GRPC_ERROR_UNREF(error);
1220     return;
1221   }
1222   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1223   if (grpc_http_trace.enabled()) {
1224     const char* errstr = grpc_error_string(error);
1225     gpr_log(
1226         GPR_INFO,
1227         "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1228         "write_state=%s",
1229         t, closure,
1230         static_cast<int>(closure->next_data.scratch /
1231                          CLOSURE_BARRIER_FIRST_REF_BIT),
1232         static_cast<int>(closure->next_data.scratch %
1233                          CLOSURE_BARRIER_FIRST_REF_BIT),
1234         desc, errstr, write_state_name(t->write_state));
1235   }
1236   if (error != GRPC_ERROR_NONE) {
1237     if (closure->error_data.error == GRPC_ERROR_NONE) {
1238       closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1239           "Error in HTTP transport completing operation");
1240       closure->error_data.error = grpc_error_set_str(
1241           closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
1242           grpc_slice_from_copied_string(t->peer_string));
1243     }
1244     closure->error_data.error =
1245         grpc_error_add_child(closure->error_data.error, error);
1246   }
1247   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1248     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1249         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1250       GRPC_CLOSURE_RUN(closure, closure->error_data.error);
1251     } else {
1252       grpc_closure_list_append(&t->run_after_write, closure,
1253                                closure->error_data.error);
1254     }
1255   }
1256 }
1257 
contains_non_ok_status(grpc_metadata_batch * batch)1258 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1259   if (batch->idx.named.grpc_status != nullptr) {
1260     return !grpc_mdelem_eq(batch->idx.named.grpc_status->md,
1261                            GRPC_MDELEM_GRPC_STATUS_0);
1262   }
1263   return false;
1264 }
1265 
maybe_become_writable_due_to_send_msg(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1266 static void maybe_become_writable_due_to_send_msg(grpc_chttp2_transport* t,
1267                                                   grpc_chttp2_stream* s) {
1268   if (s->id != 0 && (!s->write_buffering ||
1269                      s->flow_controlled_buffer.length > t->write_buffer_size)) {
1270     grpc_chttp2_mark_stream_writable(t, s);
1271     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1272   }
1273 }
1274 
add_fetched_slice_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1275 static void add_fetched_slice_locked(grpc_chttp2_transport* t,
1276                                      grpc_chttp2_stream* s) {
1277   s->fetched_send_message_length +=
1278       static_cast<uint32_t> GRPC_SLICE_LENGTH(s->fetching_slice);
1279   grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
1280   maybe_become_writable_due_to_send_msg(t, s);
1281 }
1282 
continue_fetching_send_locked(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1283 static void continue_fetching_send_locked(grpc_chttp2_transport* t,
1284                                           grpc_chttp2_stream* s) {
1285   for (;;) {
1286     if (s->fetching_send_message == nullptr) {
1287       /* Stream was cancelled before message fetch completed */
1288       abort(); /* TODO(ctiller): what cleanup here? */
1289       return;  /* early out */
1290     }
1291     if (s->fetched_send_message_length == s->fetching_send_message->length()) {
1292       int64_t notify_offset = s->next_message_end_offset;
1293       if (notify_offset <= s->flow_controlled_bytes_written) {
1294         grpc_chttp2_complete_closure_step(
1295             t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
1296             "fetching_send_message_finished");
1297       } else {
1298         grpc_chttp2_write_cb* cb = t->write_cb_pool;
1299         if (cb == nullptr) {
1300           cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1301         } else {
1302           t->write_cb_pool = cb->next;
1303         }
1304         cb->call_at_byte = notify_offset;
1305         cb->closure = s->fetching_send_message_finished;
1306         s->fetching_send_message_finished = nullptr;
1307         grpc_chttp2_write_cb** list =
1308             s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
1309                 ? &s->on_write_finished_cbs
1310                 : &s->on_flow_controlled_cbs;
1311         cb->next = *list;
1312         *list = cb;
1313       }
1314       s->fetching_send_message.reset();
1315       return; /* early out */
1316     } else if (s->fetching_send_message->Next(UINT32_MAX,
1317                                               &s->complete_fetch_locked)) {
1318       grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
1319       if (error != GRPC_ERROR_NONE) {
1320         s->fetching_send_message.reset();
1321         grpc_chttp2_cancel_stream(t, s, error);
1322       } else {
1323         add_fetched_slice_locked(t, s);
1324       }
1325     }
1326   }
1327 }
1328 
complete_fetch_locked(void * gs,grpc_error * error)1329 static void complete_fetch_locked(void* gs, grpc_error* error) {
1330   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
1331   grpc_chttp2_transport* t = s->t;
1332   if (error == GRPC_ERROR_NONE) {
1333     error = s->fetching_send_message->Pull(&s->fetching_slice);
1334     if (error == GRPC_ERROR_NONE) {
1335       add_fetched_slice_locked(t, s);
1336       continue_fetching_send_locked(t, s);
1337     }
1338   }
1339   if (error != GRPC_ERROR_NONE) {
1340     s->fetching_send_message.reset();
1341     grpc_chttp2_cancel_stream(t, s, error);
1342   }
1343 }
1344 
do_nothing(void * arg,grpc_error * error)1345 static void do_nothing(void* arg, grpc_error* error) {}
1346 
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1347 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1348                          bool is_client, bool is_initial) {
1349   for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
1350        md = md->next) {
1351     char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
1352     char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
1353     gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
1354             is_client ? "CLI" : "SVR", key, value);
1355     gpr_free(key);
1356     gpr_free(value);
1357   }
1358 }
1359 
perform_stream_op_locked(void * stream_op,grpc_error * error_ignored)1360 static void perform_stream_op_locked(void* stream_op,
1361                                      grpc_error* error_ignored) {
1362   GPR_TIMER_SCOPE("perform_stream_op_locked", 0);
1363 
1364   grpc_transport_stream_op_batch* op =
1365       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1366   grpc_chttp2_stream* s =
1367       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1368   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1369   grpc_chttp2_transport* t = s->t;
1370 
1371   GRPC_STATS_INC_HTTP2_OP_BATCHES();
1372 
1373   if (grpc_http_trace.enabled()) {
1374     char* str = grpc_transport_stream_op_batch_string(op);
1375     gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
1376             op->on_complete);
1377     gpr_free(str);
1378     if (op->send_initial_metadata) {
1379       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1380                    s->id, t->is_client, true);
1381     }
1382     if (op->send_trailing_metadata) {
1383       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1384                    s->id, t->is_client, false);
1385     }
1386   }
1387 
1388   grpc_closure* on_complete = op->on_complete;
1389   // TODO(roth): This is a hack needed because we use data inside of the
1390   // closure itself to do the barrier calculation (i.e., to ensure that
1391   // we don't schedule the closure until all ops in the batch have been
1392   // completed).  This can go away once we move to a new C++ closure API
1393   // that provides the ability to create a barrier closure.
1394   if (on_complete == nullptr) {
1395     on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
1396                                     nullptr, grpc_schedule_on_exec_ctx);
1397   }
1398 
1399   /* use final_data as a barrier until enqueue time; the inital counter is
1400      dropped at the end of this function */
1401   on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1402   on_complete->error_data.error = GRPC_ERROR_NONE;
1403 
1404   if (op->cancel_stream) {
1405     GRPC_STATS_INC_HTTP2_OP_CANCEL();
1406     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
1407   }
1408 
1409   if (op->send_initial_metadata) {
1410     GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
1411     GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1412     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1413 
1414     /* Identify stream compression */
1415     if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
1416                 .content_encoding == nullptr ||
1417         grpc_stream_compression_method_parse(
1418             GRPC_MDVALUE(
1419                 op_payload->send_initial_metadata.send_initial_metadata->idx
1420                     .named.content_encoding->md),
1421             true, &s->stream_compression_method) == 0) {
1422       s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
1423     }
1424 
1425     s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1426     s->send_initial_metadata =
1427         op_payload->send_initial_metadata.send_initial_metadata;
1428     const size_t metadata_size =
1429         grpc_metadata_batch_size(s->send_initial_metadata);
1430     const size_t metadata_peer_limit =
1431         t->settings[GRPC_PEER_SETTINGS]
1432                    [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1433     if (t->is_client) {
1434       s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
1435     }
1436     if (metadata_size > metadata_peer_limit) {
1437       grpc_chttp2_cancel_stream(
1438           t, s,
1439           grpc_error_set_int(
1440               grpc_error_set_int(
1441                   grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1442                                          "to-be-sent initial metadata size "
1443                                          "exceeds peer limit"),
1444                                      GRPC_ERROR_INT_SIZE,
1445                                      static_cast<intptr_t>(metadata_size)),
1446                   GRPC_ERROR_INT_LIMIT,
1447                   static_cast<intptr_t>(metadata_peer_limit)),
1448               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1449     } else {
1450       if (contains_non_ok_status(s->send_initial_metadata)) {
1451         s->seen_error = true;
1452       }
1453       if (!s->write_closed) {
1454         if (t->is_client) {
1455           if (t->closed_with_error == GRPC_ERROR_NONE) {
1456             GPR_ASSERT(s->id == 0);
1457             grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1458             maybe_start_some_streams(t);
1459           } else {
1460             grpc_chttp2_cancel_stream(
1461                 t, s,
1462                 grpc_error_set_int(
1463                     GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1464                         "Transport closed", &t->closed_with_error, 1),
1465                     GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1466           }
1467         } else {
1468           GPR_ASSERT(s->id != 0);
1469           grpc_chttp2_mark_stream_writable(t, s);
1470           if (!(op->send_message &&
1471                 (op->payload->send_message.send_message->flags() &
1472                  GRPC_WRITE_BUFFER_HINT))) {
1473             grpc_chttp2_initiate_write(
1474                 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1475           }
1476         }
1477       } else {
1478         s->send_initial_metadata = nullptr;
1479         grpc_chttp2_complete_closure_step(
1480             t, s, &s->send_initial_metadata_finished,
1481             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1482                 "Attempt to send initial metadata after stream was closed",
1483                 &s->write_closed_error, 1),
1484             "send_initial_metadata_finished");
1485       }
1486     }
1487     if (op_payload->send_initial_metadata.peer_string != nullptr) {
1488       gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
1489                         (gpr_atm)t->peer_string);
1490     }
1491   }
1492 
1493   if (op->send_message) {
1494     GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
1495     GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
1496         op->payload->send_message.send_message->length());
1497     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1498     s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
1499     if (s->write_closed) {
1500       // Return an error unless the client has already received trailing
1501       // metadata from the server, since an application using a
1502       // streaming call might send another message before getting a
1503       // recv_message failure, breaking out of its loop, and then
1504       // starting recv_trailing_metadata.
1505       op->payload->send_message.send_message.reset();
1506       grpc_chttp2_complete_closure_step(
1507           t, s, &s->fetching_send_message_finished,
1508           t->is_client && s->received_trailing_metadata
1509               ? GRPC_ERROR_NONE
1510               : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1511                     "Attempt to send message after stream was closed",
1512                     &s->write_closed_error, 1),
1513           "fetching_send_message_finished");
1514     } else {
1515       GPR_ASSERT(s->fetching_send_message == nullptr);
1516       uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1517           &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1518       uint32_t flags = op_payload->send_message.send_message->flags();
1519       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1520       size_t len = op_payload->send_message.send_message->length();
1521       frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1522       frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1523       frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1524       frame_hdr[4] = static_cast<uint8_t>(len);
1525       s->fetching_send_message =
1526           std::move(op_payload->send_message.send_message);
1527       s->fetched_send_message_length = 0;
1528       s->next_message_end_offset =
1529           s->flow_controlled_bytes_written +
1530           static_cast<int64_t>(s->flow_controlled_buffer.length) +
1531           static_cast<int64_t>(len);
1532       if (flags & GRPC_WRITE_BUFFER_HINT) {
1533         s->next_message_end_offset -= t->write_buffer_size;
1534         s->write_buffering = true;
1535       } else {
1536         s->write_buffering = false;
1537       }
1538       continue_fetching_send_locked(t, s);
1539       maybe_become_writable_due_to_send_msg(t, s);
1540     }
1541   }
1542 
1543   if (op->send_trailing_metadata) {
1544     GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
1545     GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1546     on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
1547     s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1548     s->send_trailing_metadata =
1549         op_payload->send_trailing_metadata.send_trailing_metadata;
1550     s->write_buffering = false;
1551     const size_t metadata_size =
1552         grpc_metadata_batch_size(s->send_trailing_metadata);
1553     const size_t metadata_peer_limit =
1554         t->settings[GRPC_PEER_SETTINGS]
1555                    [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
1556     if (metadata_size > metadata_peer_limit) {
1557       grpc_chttp2_cancel_stream(
1558           t, s,
1559           grpc_error_set_int(
1560               grpc_error_set_int(
1561                   grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1562                                          "to-be-sent trailing metadata size "
1563                                          "exceeds peer limit"),
1564                                      GRPC_ERROR_INT_SIZE,
1565                                      static_cast<intptr_t>(metadata_size)),
1566                   GRPC_ERROR_INT_LIMIT,
1567                   static_cast<intptr_t>(metadata_peer_limit)),
1568               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
1569     } else {
1570       if (contains_non_ok_status(s->send_trailing_metadata)) {
1571         s->seen_error = true;
1572       }
1573       if (s->write_closed) {
1574         s->send_trailing_metadata = nullptr;
1575         grpc_chttp2_complete_closure_step(
1576             t, s, &s->send_trailing_metadata_finished,
1577             grpc_metadata_batch_is_empty(
1578                 op->payload->send_trailing_metadata.send_trailing_metadata)
1579                 ? GRPC_ERROR_NONE
1580                 : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1581                       "Attempt to send trailing metadata after "
1582                       "stream was closed"),
1583             "send_trailing_metadata_finished");
1584       } else if (s->id != 0) {
1585         /* TODO(ctiller): check if there's flow control for any outstanding
1586            bytes before going writable */
1587         grpc_chttp2_mark_stream_writable(t, s);
1588         grpc_chttp2_initiate_write(
1589             t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1590       }
1591     }
1592   }
1593 
1594   if (op->recv_initial_metadata) {
1595     GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
1596     GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1597     s->recv_initial_metadata_ready =
1598         op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1599     s->recv_initial_metadata =
1600         op_payload->recv_initial_metadata.recv_initial_metadata;
1601     s->trailing_metadata_available =
1602         op_payload->recv_initial_metadata.trailing_metadata_available;
1603     if (op_payload->recv_initial_metadata.peer_string != nullptr) {
1604       gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
1605                         (gpr_atm)t->peer_string);
1606     }
1607     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1608   }
1609 
1610   if (op->recv_message) {
1611     GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
1612     size_t before = 0;
1613     GPR_ASSERT(s->recv_message_ready == nullptr);
1614     GPR_ASSERT(!s->pending_byte_stream);
1615     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1616     s->recv_message = op_payload->recv_message.recv_message;
1617     if (s->id != 0) {
1618       if (!s->read_closed) {
1619         before = s->frame_storage.length +
1620                  s->unprocessed_incoming_frames_buffer.length;
1621       }
1622     }
1623     grpc_chttp2_maybe_complete_recv_message(t, s);
1624     if (s->id != 0) {
1625       if (!s->read_closed && s->frame_storage.length == 0) {
1626         size_t after = s->frame_storage.length +
1627                        s->unprocessed_incoming_frames_buffer_cached_length;
1628         s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
1629                                                   before - after);
1630         grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
1631       }
1632     }
1633   }
1634 
1635   if (op->recv_trailing_metadata) {
1636     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
1637     GPR_ASSERT(s->collecting_stats == nullptr);
1638     s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1639     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1640     s->recv_trailing_metadata_finished =
1641         op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1642     s->recv_trailing_metadata =
1643         op_payload->recv_trailing_metadata.recv_trailing_metadata;
1644     s->final_metadata_requested = true;
1645     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1646   }
1647 
1648   grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
1649                                     "op->on_complete");
1650 
1651   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1652 }
1653 
perform_stream_op(grpc_transport * gt,grpc_stream * gs,grpc_transport_stream_op_batch * op)1654 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
1655                               grpc_transport_stream_op_batch* op) {
1656   GPR_TIMER_SCOPE("perform_stream_op", 0);
1657   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1658   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1659 
1660   if (!t->is_client) {
1661     if (op->send_initial_metadata) {
1662       grpc_millis deadline =
1663           op->payload->send_initial_metadata.send_initial_metadata->deadline;
1664       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1665     }
1666     if (op->send_trailing_metadata) {
1667       grpc_millis deadline =
1668           op->payload->send_trailing_metadata.send_trailing_metadata->deadline;
1669       GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE);
1670     }
1671   }
1672 
1673   if (grpc_http_trace.enabled()) {
1674     char* str = grpc_transport_stream_op_batch_string(op);
1675     gpr_log(GPR_INFO, "perform_stream_op[s=%p]: %s", s, str);
1676     gpr_free(str);
1677   }
1678 
1679   op->handler_private.extra_arg = gs;
1680   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1681   GRPC_CLOSURE_SCHED(
1682       GRPC_CLOSURE_INIT(&op->handler_private.closure, perform_stream_op_locked,
1683                         op, grpc_combiner_scheduler(t->combiner)),
1684       GRPC_ERROR_NONE);
1685 }
1686 
cancel_pings(grpc_chttp2_transport * t,grpc_error * error)1687 static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
1688   /* callback remaining pings: they're not allowed to call into the transpot,
1689      and maybe they hold resources that need to be freed */
1690   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1691   GPR_ASSERT(error != GRPC_ERROR_NONE);
1692   for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
1693     grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
1694     GRPC_CLOSURE_LIST_SCHED(&pq->lists[j]);
1695   }
1696   GRPC_ERROR_UNREF(error);
1697 }
1698 
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1699 static void send_ping_locked(grpc_chttp2_transport* t,
1700                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1701   if (t->closed_with_error != GRPC_ERROR_NONE) {
1702     GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
1703     GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
1704     return;
1705   }
1706   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1707   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
1708                            GRPC_ERROR_NONE);
1709   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
1710                            GRPC_ERROR_NONE);
1711 }
1712 
1713 /*
1714  * Specialized form of send_ping_locked for keepalive ping. If there is already
1715  * a ping in progress, the keepalive ping would piggyback onto that ping,
1716  * instead of waiting for that ping to complete and then starting a new ping.
1717  */
send_keepalive_ping_locked(grpc_chttp2_transport * t)1718 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
1719   if (t->closed_with_error != GRPC_ERROR_NONE) {
1720     GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
1721                      GRPC_ERROR_REF(t->closed_with_error));
1722     GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
1723                      GRPC_ERROR_REF(t->closed_with_error));
1724     return;
1725   }
1726   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1727   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
1728     /* There is a ping in flight. Add yourself to the inflight closure list. */
1729     GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1730     grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
1731                              &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
1732     return;
1733   }
1734   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE],
1735                            &t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
1736   grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
1737                            &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
1738 }
1739 
retry_initiate_ping_locked(void * tp,grpc_error * error)1740 static void retry_initiate_ping_locked(void* tp, grpc_error* error) {
1741   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
1742   t->ping_state.is_delayed_ping_timer_set = false;
1743   if (error == GRPC_ERROR_NONE) {
1744     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1745   }
1746   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked");
1747 }
1748 
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1749 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1750   grpc_chttp2_ping_queue* pq = &t->ping_queue;
1751   if (pq->inflight_id != id) {
1752     char* from = grpc_endpoint_get_peer(t->ep);
1753     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
1754     gpr_free(from);
1755     return;
1756   }
1757   GRPC_CLOSURE_LIST_SCHED(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
1758   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
1759     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1760   }
1761 }
1762 
send_goaway(grpc_chttp2_transport * t,grpc_error * error)1763 static void send_goaway(grpc_chttp2_transport* t, grpc_error* error) {
1764   t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED;
1765   grpc_http2_error_code http_error;
1766   grpc_slice slice;
1767   grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, nullptr, &slice,
1768                         &http_error, nullptr);
1769   grpc_chttp2_goaway_append(t->last_new_stream_id,
1770                             static_cast<uint32_t>(http_error),
1771                             grpc_slice_ref_internal(slice), &t->qbuf);
1772   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1773   GRPC_ERROR_UNREF(error);
1774 }
1775 
grpc_chttp2_add_ping_strike(grpc_chttp2_transport * t)1776 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) {
1777   if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes &&
1778       t->ping_policy.max_ping_strikes != 0) {
1779     send_goaway(t,
1780                 grpc_error_set_int(
1781                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("too_many_pings"),
1782                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
1783     /*The transport will be closed after the write is done */
1784     close_transport_locked(
1785         t, grpc_error_set_int(
1786                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many pings"),
1787                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
1788   }
1789 }
1790 
perform_transport_op_locked(void * stream_op,grpc_error * error_ignored)1791 static void perform_transport_op_locked(void* stream_op,
1792                                         grpc_error* error_ignored) {
1793   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1794   grpc_chttp2_transport* t =
1795       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg);
1796 
1797   if (op->goaway_error) {
1798     send_goaway(t, op->goaway_error);
1799   }
1800 
1801   if (op->set_accept_stream) {
1802     t->channel_callback.accept_stream = op->set_accept_stream_fn;
1803     t->channel_callback.accept_stream_user_data =
1804         op->set_accept_stream_user_data;
1805   }
1806 
1807   if (op->bind_pollset) {
1808     grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1809   }
1810 
1811   if (op->bind_pollset_set) {
1812     grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1813   }
1814 
1815   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1816     send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack);
1817     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1818   }
1819 
1820   if (op->on_connectivity_state_change != nullptr) {
1821     grpc_connectivity_state_notify_on_state_change(
1822         &t->channel_callback.state_tracker, op->connectivity_state,
1823         op->on_connectivity_state_change);
1824   }
1825 
1826   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1827     close_transport_locked(t, op->disconnect_with_error);
1828   }
1829 
1830   GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
1831 
1832   GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
1833 }
1834 
perform_transport_op(grpc_transport * gt,grpc_transport_op * op)1835 static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
1836   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
1837   if (grpc_http_trace.enabled()) {
1838     char* msg = grpc_transport_op_string(op);
1839     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, msg);
1840     gpr_free(msg);
1841   }
1842   op->handler_private.extra_arg = gt;
1843   GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
1844   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1845                                        perform_transport_op_locked, op,
1846                                        grpc_combiner_scheduler(t->combiner)),
1847                      GRPC_ERROR_NONE);
1848 }
1849 
1850 /*******************************************************************************
1851  * INPUT PROCESSING - GENERAL
1852  */
1853 
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1854 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1855                                                       grpc_chttp2_stream* s) {
1856   if (s->recv_initial_metadata_ready != nullptr &&
1857       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1858     if (s->seen_error) {
1859       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1860       if (!s->pending_byte_stream) {
1861         grpc_slice_buffer_reset_and_unref_internal(
1862             &s->unprocessed_incoming_frames_buffer);
1863       }
1864     }
1865     grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[0],
1866                                                  s->recv_initial_metadata);
1867     null_then_run_closure(&s->recv_initial_metadata_ready, GRPC_ERROR_NONE);
1868   }
1869 }
1870 
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1871 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
1872                                              grpc_chttp2_stream* s) {
1873   grpc_error* error = GRPC_ERROR_NONE;
1874   if (s->recv_message_ready != nullptr) {
1875     *s->recv_message = nullptr;
1876     if (s->final_metadata_requested && s->seen_error) {
1877       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1878       if (!s->pending_byte_stream) {
1879         grpc_slice_buffer_reset_and_unref_internal(
1880             &s->unprocessed_incoming_frames_buffer);
1881       }
1882     }
1883     if (!s->pending_byte_stream) {
1884       while (s->unprocessed_incoming_frames_buffer.length > 0 ||
1885              s->frame_storage.length > 0) {
1886         if (s->unprocessed_incoming_frames_buffer.length == 0) {
1887           grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
1888                                  &s->frame_storage);
1889           s->unprocessed_incoming_frames_decompressed = false;
1890         }
1891         if (!s->unprocessed_incoming_frames_decompressed &&
1892             s->stream_decompression_method !=
1893                 GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS) {
1894           GPR_ASSERT(s->decompressed_data_buffer.length == 0);
1895           bool end_of_context;
1896           if (!s->stream_decompression_ctx) {
1897             s->stream_decompression_ctx =
1898                 grpc_stream_compression_context_create(
1899                     s->stream_decompression_method);
1900           }
1901           if (!grpc_stream_decompress(
1902                   s->stream_decompression_ctx,
1903                   &s->unprocessed_incoming_frames_buffer,
1904                   &s->decompressed_data_buffer, nullptr,
1905                   GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes,
1906                   &end_of_context)) {
1907             grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1908             grpc_slice_buffer_reset_and_unref_internal(
1909                 &s->unprocessed_incoming_frames_buffer);
1910             error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1911                 "Stream decompression error.");
1912           } else {
1913             s->decompressed_header_bytes += s->decompressed_data_buffer.length;
1914             if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) {
1915               s->decompressed_header_bytes = 0;
1916             }
1917             error = grpc_deframe_unprocessed_incoming_frames(
1918                 &s->data_parser, s, &s->decompressed_data_buffer, nullptr,
1919                 s->recv_message);
1920             if (end_of_context) {
1921               grpc_stream_compression_context_destroy(
1922                   s->stream_decompression_ctx);
1923               s->stream_decompression_ctx = nullptr;
1924             }
1925           }
1926         } else {
1927           error = grpc_deframe_unprocessed_incoming_frames(
1928               &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
1929               nullptr, s->recv_message);
1930         }
1931         if (error != GRPC_ERROR_NONE) {
1932           s->seen_error = true;
1933           grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1934           grpc_slice_buffer_reset_and_unref_internal(
1935               &s->unprocessed_incoming_frames_buffer);
1936           break;
1937         } else if (*s->recv_message != nullptr) {
1938           break;
1939         }
1940       }
1941     }
1942     // save the length of the buffer before handing control back to application
1943     // threads. Needed to support correct flow control bookkeeping
1944     s->unprocessed_incoming_frames_buffer_cached_length =
1945         s->unprocessed_incoming_frames_buffer.length;
1946     if (error == GRPC_ERROR_NONE && *s->recv_message != nullptr) {
1947       null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
1948     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
1949       *s->recv_message = nullptr;
1950       null_then_run_closure(&s->recv_message_ready, GRPC_ERROR_NONE);
1951     }
1952     GRPC_ERROR_UNREF(error);
1953   }
1954 }
1955 
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1956 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
1957                                                        grpc_chttp2_stream* s) {
1958   grpc_chttp2_maybe_complete_recv_message(t, s);
1959   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
1960       s->write_closed) {
1961     if (s->seen_error || !t->is_client) {
1962       grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1963       if (!s->pending_byte_stream) {
1964         grpc_slice_buffer_reset_and_unref_internal(
1965             &s->unprocessed_incoming_frames_buffer);
1966       }
1967     }
1968     bool pending_data = s->pending_byte_stream ||
1969                         s->unprocessed_incoming_frames_buffer.length > 0;
1970     if (s->read_closed && s->frame_storage.length > 0 && !pending_data &&
1971         !s->seen_error && s->recv_trailing_metadata_finished != nullptr) {
1972       /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and
1973        * maybe decompress the next 5 bytes in the stream. */
1974       bool end_of_context;
1975       if (!s->stream_decompression_ctx) {
1976         s->stream_decompression_ctx = grpc_stream_compression_context_create(
1977             s->stream_decompression_method);
1978       }
1979       if (!grpc_stream_decompress(
1980               s->stream_decompression_ctx, &s->frame_storage,
1981               &s->unprocessed_incoming_frames_buffer, nullptr,
1982               GRPC_HEADER_SIZE_IN_BYTES, &end_of_context)) {
1983         grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
1984         grpc_slice_buffer_reset_and_unref_internal(
1985             &s->unprocessed_incoming_frames_buffer);
1986         s->seen_error = true;
1987       } else {
1988         if (s->unprocessed_incoming_frames_buffer.length > 0) {
1989           s->unprocessed_incoming_frames_decompressed = true;
1990           pending_data = true;
1991         }
1992         if (end_of_context) {
1993           grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
1994           s->stream_decompression_ctx = nullptr;
1995         }
1996       }
1997     }
1998     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
1999         s->recv_trailing_metadata_finished != nullptr) {
2000       grpc_transport_move_stats(&s->stats, s->collecting_stats);
2001       s->collecting_stats = nullptr;
2002       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
2003                                                    s->recv_trailing_metadata);
2004       null_then_run_closure(&s->recv_trailing_metadata_finished,
2005                             GRPC_ERROR_NONE);
2006     }
2007   }
2008 }
2009 
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error * error)2010 static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
2011                           grpc_error* error) {
2012   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
2013       grpc_chttp2_stream_map_delete(&t->stream_map, id));
2014   GPR_ASSERT(s);
2015   if (t->incoming_stream == s) {
2016     t->incoming_stream = nullptr;
2017     grpc_chttp2_parsing_become_skip_parser(t);
2018   }
2019   if (s->pending_byte_stream) {
2020     if (s->on_next != nullptr) {
2021       grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
2022       if (error == GRPC_ERROR_NONE) {
2023         error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2024       }
2025       bs->PublishError(error);
2026       bs->Unref();
2027       s->data_parser.parsing_frame = nullptr;
2028     } else {
2029       GRPC_ERROR_UNREF(s->byte_stream_error);
2030       s->byte_stream_error = GRPC_ERROR_REF(error);
2031     }
2032   }
2033 
2034   if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
2035     post_benign_reclaimer(t);
2036     if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) {
2037       close_transport_locked(
2038           t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2039                  "Last stream closed after sending GOAWAY", &error, 1));
2040     }
2041   }
2042   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2043     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2044   }
2045 
2046   GRPC_ERROR_UNREF(error);
2047 
2048   maybe_start_some_streams(t);
2049 }
2050 
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * due_to_error)2051 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2052                                grpc_error* due_to_error) {
2053   if (!t->is_client && !s->sent_trailing_metadata &&
2054       grpc_error_has_clear_grpc_status(due_to_error)) {
2055     close_from_api(t, s, due_to_error);
2056     return;
2057   }
2058 
2059   if (!s->read_closed || !s->write_closed) {
2060     if (s->id != 0) {
2061       grpc_http2_error_code http_error;
2062       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2063                             &http_error, nullptr);
2064       grpc_slice_buffer_add(
2065           &t->qbuf,
2066           grpc_chttp2_rst_stream_create(
2067               s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing));
2068       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2069     }
2070   }
2071   if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
2072     s->seen_error = true;
2073   }
2074   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2075 }
2076 
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2077 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2078                              grpc_error* error) {
2079   grpc_status_code status;
2080   grpc_slice slice;
2081   grpc_error_get_status(error, s->deadline, &status, &slice, nullptr, nullptr);
2082   if (status != GRPC_STATUS_OK) {
2083     s->seen_error = true;
2084   }
2085   /* stream_global->recv_trailing_metadata_finished gives us a
2086      last chance replacement: we've received trailing metadata,
2087      but something more important has become available to signal
2088      to the upper layers - drop what we've got, and then publish
2089      what we want - which is safe because we haven't told anyone
2090      about the metadata yet */
2091   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2092       s->recv_trailing_metadata_finished != nullptr) {
2093     char status_string[GPR_LTOA_MIN_BUFSIZE];
2094     gpr_ltoa(status, status_string);
2095     GRPC_LOG_IF_ERROR("add_status",
2096                       grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2097                           &s->metadata_buffer[1],
2098                           grpc_mdelem_from_slices(
2099                               GRPC_MDSTR_GRPC_STATUS,
2100                               grpc_slice_from_copied_string(status_string))));
2101     if (!GRPC_SLICE_IS_EMPTY(slice)) {
2102       GRPC_LOG_IF_ERROR(
2103           "add_status_message",
2104           grpc_chttp2_incoming_metadata_buffer_replace_or_add(
2105               &s->metadata_buffer[1],
2106               grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_MESSAGE,
2107                                       grpc_slice_ref_internal(slice))));
2108     }
2109     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2110     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2111   }
2112 
2113   GRPC_ERROR_UNREF(error);
2114 }
2115 
add_error(grpc_error * error,grpc_error ** refs,size_t * nrefs)2116 static void add_error(grpc_error* error, grpc_error** refs, size_t* nrefs) {
2117   if (error == GRPC_ERROR_NONE) return;
2118   for (size_t i = 0; i < *nrefs; i++) {
2119     if (error == refs[i]) {
2120       return;
2121     }
2122   }
2123   refs[*nrefs] = error;
2124   ++*nrefs;
2125 }
2126 
removal_error(grpc_error * extra_error,grpc_chttp2_stream * s,const char * master_error_msg)2127 static grpc_error* removal_error(grpc_error* extra_error, grpc_chttp2_stream* s,
2128                                  const char* master_error_msg) {
2129   grpc_error* refs[3];
2130   size_t nrefs = 0;
2131   add_error(s->read_closed_error, refs, &nrefs);
2132   add_error(s->write_closed_error, refs, &nrefs);
2133   add_error(extra_error, refs, &nrefs);
2134   grpc_error* error = GRPC_ERROR_NONE;
2135   if (nrefs > 0) {
2136     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(master_error_msg,
2137                                                              refs, nrefs);
2138   }
2139   GRPC_ERROR_UNREF(extra_error);
2140   return error;
2141 }
2142 
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_chttp2_write_cb ** list,grpc_error * error)2143 static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2144                              grpc_chttp2_write_cb** list, grpc_error* error) {
2145   while (*list) {
2146     grpc_chttp2_write_cb* cb = *list;
2147     *list = cb->next;
2148     grpc_chttp2_complete_closure_step(t, s, &cb->closure, GRPC_ERROR_REF(error),
2149                                       "on_write_finished_cb");
2150     cb->next = t->write_cb_pool;
2151     t->write_cb_pool = cb;
2152   }
2153   GRPC_ERROR_UNREF(error);
2154 }
2155 
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2156 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2157                                      grpc_chttp2_stream* s, grpc_error* error) {
2158   error =
2159       removal_error(error, s, "Pending writes failed due to stream closure");
2160   s->send_initial_metadata = nullptr;
2161   grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished,
2162                                     GRPC_ERROR_REF(error),
2163                                     "send_initial_metadata_finished");
2164 
2165   s->send_trailing_metadata = nullptr;
2166   grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished,
2167                                     GRPC_ERROR_REF(error),
2168                                     "send_trailing_metadata_finished");
2169 
2170   s->fetching_send_message.reset();
2171   grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
2172                                     GRPC_ERROR_REF(error),
2173                                     "fetching_send_message_finished");
2174   flush_write_list(t, s, &s->on_write_finished_cbs, GRPC_ERROR_REF(error));
2175   flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
2176 }
2177 
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error * error)2178 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
2179                                     grpc_chttp2_stream* s, int close_reads,
2180                                     int close_writes, grpc_error* error) {
2181   if (s->read_closed && s->write_closed) {
2182     /* already closed */
2183     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2184     GRPC_ERROR_UNREF(error);
2185     return;
2186   }
2187   bool closed_read = false;
2188   bool became_closed = false;
2189   if (close_reads && !s->read_closed) {
2190     s->read_closed_error = GRPC_ERROR_REF(error);
2191     s->read_closed = true;
2192     closed_read = true;
2193   }
2194   if (close_writes && !s->write_closed) {
2195     s->write_closed_error = GRPC_ERROR_REF(error);
2196     s->write_closed = true;
2197     grpc_chttp2_fail_pending_writes(t, s, GRPC_ERROR_REF(error));
2198   }
2199   if (s->read_closed && s->write_closed) {
2200     became_closed = true;
2201     grpc_error* overall_error =
2202         removal_error(GRPC_ERROR_REF(error), s, "Stream removed");
2203     if (s->id != 0) {
2204       remove_stream(t, s->id, GRPC_ERROR_REF(overall_error));
2205     } else {
2206       /* Purge streams waiting on concurrency still waiting for id assignment */
2207       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2208     }
2209     if (overall_error != GRPC_ERROR_NONE) {
2210       grpc_chttp2_fake_status(t, s, overall_error);
2211     }
2212   }
2213   if (closed_read) {
2214     for (int i = 0; i < 2; i++) {
2215       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2216         s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
2217       }
2218     }
2219     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2220     grpc_chttp2_maybe_complete_recv_message(t, s);
2221   }
2222   if (became_closed) {
2223     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2224     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2225   }
2226   GRPC_ERROR_UNREF(error);
2227 }
2228 
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error * error)2229 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2230                            grpc_error* error) {
2231   grpc_slice hdr;
2232   grpc_slice status_hdr;
2233   grpc_slice http_status_hdr;
2234   grpc_slice content_type_hdr;
2235   grpc_slice message_pfx;
2236   uint8_t* p;
2237   uint32_t len = 0;
2238   grpc_status_code grpc_status;
2239   grpc_slice slice;
2240   grpc_error_get_status(error, s->deadline, &grpc_status, &slice, nullptr,
2241                         nullptr);
2242 
2243   GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2244 
2245   /* Hand roll a header block.
2246      This is unnecessarily ugly - at some point we should find a more
2247      elegant solution.
2248      It's complicated by the fact that our send machinery would be dead by
2249      the time we got around to sending this, so instead we ignore HPACK
2250      compression and just write the uncompressed bytes onto the wire. */
2251   if (!s->sent_initial_metadata) {
2252     http_status_hdr = GRPC_SLICE_MALLOC(13);
2253     p = GRPC_SLICE_START_PTR(http_status_hdr);
2254     *p++ = 0x00;
2255     *p++ = 7;
2256     *p++ = ':';
2257     *p++ = 's';
2258     *p++ = 't';
2259     *p++ = 'a';
2260     *p++ = 't';
2261     *p++ = 'u';
2262     *p++ = 's';
2263     *p++ = 3;
2264     *p++ = '2';
2265     *p++ = '0';
2266     *p++ = '0';
2267     GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2268     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2269 
2270     content_type_hdr = GRPC_SLICE_MALLOC(31);
2271     p = GRPC_SLICE_START_PTR(content_type_hdr);
2272     *p++ = 0x00;
2273     *p++ = 12;
2274     *p++ = 'c';
2275     *p++ = 'o';
2276     *p++ = 'n';
2277     *p++ = 't';
2278     *p++ = 'e';
2279     *p++ = 'n';
2280     *p++ = 't';
2281     *p++ = '-';
2282     *p++ = 't';
2283     *p++ = 'y';
2284     *p++ = 'p';
2285     *p++ = 'e';
2286     *p++ = 16;
2287     *p++ = 'a';
2288     *p++ = 'p';
2289     *p++ = 'p';
2290     *p++ = 'l';
2291     *p++ = 'i';
2292     *p++ = 'c';
2293     *p++ = 'a';
2294     *p++ = 't';
2295     *p++ = 'i';
2296     *p++ = 'o';
2297     *p++ = 'n';
2298     *p++ = '/';
2299     *p++ = 'g';
2300     *p++ = 'r';
2301     *p++ = 'p';
2302     *p++ = 'c';
2303     GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2304     len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2305   }
2306 
2307   status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2308   p = GRPC_SLICE_START_PTR(status_hdr);
2309   *p++ = 0x00; /* literal header, not indexed */
2310   *p++ = 11;   /* len(grpc-status) */
2311   *p++ = 'g';
2312   *p++ = 'r';
2313   *p++ = 'p';
2314   *p++ = 'c';
2315   *p++ = '-';
2316   *p++ = 's';
2317   *p++ = 't';
2318   *p++ = 'a';
2319   *p++ = 't';
2320   *p++ = 'u';
2321   *p++ = 's';
2322   if (grpc_status < 10) {
2323     *p++ = 1;
2324     *p++ = static_cast<uint8_t>('0' + grpc_status);
2325   } else {
2326     *p++ = 2;
2327     *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2328     *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2329   }
2330   GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2331   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2332 
2333   size_t msg_len = GRPC_SLICE_LENGTH(slice);
2334   GPR_ASSERT(msg_len <= UINT32_MAX);
2335   uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1);
2336   message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len);
2337   p = GRPC_SLICE_START_PTR(message_pfx);
2338   *p++ = 0x00; /* literal header, not indexed */
2339   *p++ = 12;   /* len(grpc-message) */
2340   *p++ = 'g';
2341   *p++ = 'r';
2342   *p++ = 'p';
2343   *p++ = 'c';
2344   *p++ = '-';
2345   *p++ = 'm';
2346   *p++ = 'e';
2347   *p++ = 's';
2348   *p++ = 's';
2349   *p++ = 'a';
2350   *p++ = 'g';
2351   *p++ = 'e';
2352   GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 1, 0, p, (uint32_t)msg_len_len);
2353   p += msg_len_len;
2354   GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2355   len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2356   len += static_cast<uint32_t>(msg_len);
2357 
2358   hdr = GRPC_SLICE_MALLOC(9);
2359   p = GRPC_SLICE_START_PTR(hdr);
2360   *p++ = static_cast<uint8_t>(len >> 16);
2361   *p++ = static_cast<uint8_t>(len >> 8);
2362   *p++ = static_cast<uint8_t>(len);
2363   *p++ = GRPC_CHTTP2_FRAME_HEADER;
2364   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2365   *p++ = static_cast<uint8_t>(s->id >> 24);
2366   *p++ = static_cast<uint8_t>(s->id >> 16);
2367   *p++ = static_cast<uint8_t>(s->id >> 8);
2368   *p++ = static_cast<uint8_t>(s->id);
2369   GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2370 
2371   grpc_slice_buffer_add(&t->qbuf, hdr);
2372   if (!s->sent_initial_metadata) {
2373     grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2374     grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2375   }
2376   grpc_slice_buffer_add(&t->qbuf, status_hdr);
2377   grpc_slice_buffer_add(&t->qbuf, message_pfx);
2378   grpc_slice_buffer_add(&t->qbuf, grpc_slice_ref_internal(slice));
2379   grpc_slice_buffer_add(
2380       &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
2381                                               &s->stats.outgoing));
2382 
2383   grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2384   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2385 }
2386 
2387 typedef struct {
2388   grpc_error* error;
2389   grpc_chttp2_transport* t;
2390 } cancel_stream_cb_args;
2391 
cancel_stream_cb(void * user_data,uint32_t key,void * stream)2392 static void cancel_stream_cb(void* user_data, uint32_t key, void* stream) {
2393   cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data);
2394   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream);
2395   grpc_chttp2_cancel_stream(args->t, s, GRPC_ERROR_REF(args->error));
2396 }
2397 
end_all_the_calls(grpc_chttp2_transport * t,grpc_error * error)2398 static void end_all_the_calls(grpc_chttp2_transport* t, grpc_error* error) {
2399   cancel_stream_cb_args args = {error, t};
2400   grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args);
2401   GRPC_ERROR_UNREF(error);
2402 }
2403 
2404 /*******************************************************************************
2405  * INPUT PROCESSING - PARSING
2406  */
2407 
2408 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2409 static void WithUrgency(grpc_chttp2_transport* t,
2410                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2411                         grpc_chttp2_initiate_write_reason reason, F action) {
2412   switch (urgency) {
2413     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2414       break;
2415     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2416       grpc_chttp2_initiate_write(t, reason);
2417     // fallthrough
2418     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2419       action();
2420       break;
2421   }
2422 }
2423 
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2424 void grpc_chttp2_act_on_flowctl_action(
2425     const grpc_core::chttp2::FlowControlAction& action,
2426     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2427   WithUrgency(t, action.send_stream_update(),
2428               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
2429               [t, s]() { grpc_chttp2_mark_stream_writable(t, s); });
2430   WithUrgency(t, action.send_transport_update(),
2431               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2432   WithUrgency(t, action.send_initial_window_update(),
2433               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2434                 queue_setting_update(t,
2435                                      GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
2436                                      action.initial_window_size());
2437               });
2438   WithUrgency(t, action.send_max_frame_size_update(),
2439               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2440                 queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
2441                                      action.max_frame_size());
2442               });
2443 }
2444 
try_http_parsing(grpc_chttp2_transport * t)2445 static grpc_error* try_http_parsing(grpc_chttp2_transport* t) {
2446   grpc_http_parser parser;
2447   size_t i = 0;
2448   grpc_error* error = GRPC_ERROR_NONE;
2449   grpc_http_response response;
2450   memset(&response, 0, sizeof(response));
2451 
2452   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2453 
2454   grpc_error* parse_error = GRPC_ERROR_NONE;
2455   for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
2456     parse_error =
2457         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2458   }
2459   if (parse_error == GRPC_ERROR_NONE &&
2460       (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
2461     error = grpc_error_set_int(
2462         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2463                                "Trying to connect an http1.x server"),
2464                            GRPC_ERROR_INT_HTTP_STATUS, response.status),
2465         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
2466   }
2467   GRPC_ERROR_UNREF(parse_error);
2468 
2469   grpc_http_parser_destroy(&parser);
2470   grpc_http_response_destroy(&response);
2471   return error;
2472 }
2473 
read_action_locked(void * tp,grpc_error * error)2474 static void read_action_locked(void* tp, grpc_error* error) {
2475   GPR_TIMER_SCOPE("reading_action_locked", 0);
2476 
2477   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2478 
2479   GRPC_ERROR_REF(error);
2480 
2481   grpc_error* err = error;
2482   if (err != GRPC_ERROR_NONE) {
2483     err = grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2484                                  "Endpoint read failed", &err, 1),
2485                              GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
2486                              t->write_state);
2487   }
2488   GPR_SWAP(grpc_error*, err, error);
2489   GRPC_ERROR_UNREF(err);
2490   if (t->closed_with_error == GRPC_ERROR_NONE) {
2491     GPR_TIMER_SCOPE("reading_action.parse", 0);
2492     size_t i = 0;
2493     grpc_error* errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
2494                              GRPC_ERROR_NONE};
2495     for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
2496       grpc_core::BdpEstimator* bdp_est = t->flow_control->bdp_estimator();
2497       if (bdp_est) {
2498         bdp_est->AddIncomingBytes(
2499             static_cast<int64_t> GRPC_SLICE_LENGTH(t->read_buffer.slices[i]));
2500       }
2501       errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]);
2502     }
2503     if (errors[1] != GRPC_ERROR_NONE) {
2504       errors[2] = try_http_parsing(t);
2505       GRPC_ERROR_UNREF(error);
2506       error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2507           "Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors));
2508     }
2509     for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
2510       GRPC_ERROR_UNREF(errors[i]);
2511     }
2512 
2513     GPR_TIMER_SCOPE("post_parse_locked", 0);
2514     if (t->initial_window_update != 0) {
2515       if (t->initial_window_update > 0) {
2516         grpc_chttp2_stream* s;
2517         while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
2518           grpc_chttp2_mark_stream_writable(t, s);
2519           grpc_chttp2_initiate_write(
2520               t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2521         }
2522       }
2523       t->initial_window_update = 0;
2524     }
2525   }
2526 
2527   GPR_TIMER_SCOPE("post_reading_action_locked", 0);
2528   bool keep_reading = false;
2529   if (error == GRPC_ERROR_NONE && t->closed_with_error != GRPC_ERROR_NONE) {
2530     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2531         "Transport closed", &t->closed_with_error, 1);
2532   }
2533   if (error != GRPC_ERROR_NONE) {
2534     /* If a goaway frame was received, this might be the reason why the read
2535      * failed. Add this info to the error */
2536     if (t->goaway_error != GRPC_ERROR_NONE) {
2537       error = grpc_error_add_child(error, GRPC_ERROR_REF(t->goaway_error));
2538     }
2539 
2540     close_transport_locked(t, GRPC_ERROR_REF(error));
2541     t->endpoint_reading = 0;
2542   } else if (t->closed_with_error == GRPC_ERROR_NONE) {
2543     keep_reading = true;
2544     GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
2545   }
2546   grpc_slice_buffer_reset_and_unref_internal(&t->read_buffer);
2547 
2548   if (keep_reading) {
2549     grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked);
2550     grpc_chttp2_act_on_flowctl_action(t->flow_control->MakeAction(), t,
2551                                       nullptr);
2552     GRPC_CHTTP2_UNREF_TRANSPORT(t, "keep_reading");
2553   } else {
2554     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action");
2555   }
2556 
2557   GRPC_ERROR_UNREF(error);
2558 }
2559 
2560 // t is reffed prior to calling the first time, and once the callback chain
2561 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_chttp2_transport * t)2562 static void schedule_bdp_ping_locked(grpc_chttp2_transport* t) {
2563   t->flow_control->bdp_estimator()->SchedulePing();
2564   send_ping_locked(t, &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
2565 }
2566 
start_bdp_ping_locked(void * tp,grpc_error * error)2567 static void start_bdp_ping_locked(void* tp, grpc_error* error) {
2568   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2569   if (grpc_http_trace.enabled()) {
2570     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", t->peer_string,
2571             grpc_error_string(error));
2572   }
2573   /* Reset the keepalive ping timer */
2574   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2575     grpc_timer_cancel(&t->keepalive_ping_timer);
2576   }
2577   t->flow_control->bdp_estimator()->StartPing();
2578 }
2579 
finish_bdp_ping_locked(void * tp,grpc_error * error)2580 static void finish_bdp_ping_locked(void* tp, grpc_error* error) {
2581   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2582   if (grpc_http_trace.enabled()) {
2583     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", t->peer_string,
2584             grpc_error_string(error));
2585   }
2586   if (error != GRPC_ERROR_NONE) {
2587     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2588     return;
2589   }
2590   grpc_millis next_ping = t->flow_control->bdp_estimator()->CompletePing();
2591   grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
2592                                     nullptr);
2593   GPR_ASSERT(!t->have_next_bdp_ping_timer);
2594   t->have_next_bdp_ping_timer = true;
2595   grpc_timer_init(&t->next_bdp_ping_timer, next_ping,
2596                   &t->next_bdp_ping_timer_expired_locked);
2597 }
2598 
next_bdp_ping_timer_expired_locked(void * tp,grpc_error * error)2599 static void next_bdp_ping_timer_expired_locked(void* tp, grpc_error* error) {
2600   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);
2601   GPR_ASSERT(t->have_next_bdp_ping_timer);
2602   t->have_next_bdp_ping_timer = false;
2603   if (error != GRPC_ERROR_NONE) {
2604     GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping");
2605     return;
2606   }
2607   schedule_bdp_ping_locked(t);
2608 }
2609 
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2610 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2611                                                bool is_client) {
2612   size_t i;
2613   if (args) {
2614     for (i = 0; i < args->num_args; i++) {
2615       if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
2616         const int value = grpc_channel_arg_get_integer(
2617             &args->args[i], {is_client ? g_default_client_keepalive_time_ms
2618                                        : g_default_server_keepalive_time_ms,
2619                              1, INT_MAX});
2620         if (is_client) {
2621           g_default_client_keepalive_time_ms = value;
2622         } else {
2623           g_default_server_keepalive_time_ms = value;
2624         }
2625       } else if (0 ==
2626                  strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
2627         const int value = grpc_channel_arg_get_integer(
2628             &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
2629                                        : g_default_server_keepalive_timeout_ms,
2630                              0, INT_MAX});
2631         if (is_client) {
2632           g_default_client_keepalive_timeout_ms = value;
2633         } else {
2634           g_default_server_keepalive_timeout_ms = value;
2635         }
2636       } else if (0 == strcmp(args->args[i].key,
2637                              GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
2638         const bool value = static_cast<uint32_t>(grpc_channel_arg_get_integer(
2639             &args->args[i],
2640             {is_client ? g_default_client_keepalive_permit_without_calls
2641                        : g_default_server_keepalive_timeout_ms,
2642              0, 1}));
2643         if (is_client) {
2644           g_default_client_keepalive_permit_without_calls = value;
2645         } else {
2646           g_default_server_keepalive_permit_without_calls = value;
2647         }
2648       } else if (0 ==
2649                  strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
2650         g_default_max_ping_strikes = grpc_channel_arg_get_integer(
2651             &args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
2652       } else if (0 == strcmp(args->args[i].key,
2653                              GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
2654         g_default_max_pings_without_data = grpc_channel_arg_get_integer(
2655             &args->args[i], {g_default_max_pings_without_data, 0, INT_MAX});
2656       } else if (0 ==
2657                  strcmp(
2658                      args->args[i].key,
2659                      GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
2660         g_default_min_sent_ping_interval_without_data_ms =
2661             grpc_channel_arg_get_integer(
2662                 &args->args[i],
2663                 {g_default_min_sent_ping_interval_without_data_ms, 0, INT_MAX});
2664       } else if (0 ==
2665                  strcmp(
2666                      args->args[i].key,
2667                      GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
2668         g_default_min_recv_ping_interval_without_data_ms =
2669             grpc_channel_arg_get_integer(
2670                 &args->args[i],
2671                 {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX});
2672       }
2673     }
2674   }
2675 }
2676 
init_keepalive_ping_locked(void * arg,grpc_error * error)2677 static void init_keepalive_ping_locked(void* arg, grpc_error* error) {
2678   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2679   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2680   if (t->destroying || t->closed_with_error != GRPC_ERROR_NONE) {
2681     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2682   } else if (error == GRPC_ERROR_NONE) {
2683     if (t->keepalive_permit_without_calls ||
2684         grpc_chttp2_stream_map_size(&t->stream_map) > 0) {
2685       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2686       GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
2687       send_keepalive_ping_locked(t);
2688       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2689     } else {
2690       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2691       grpc_timer_init(&t->keepalive_ping_timer,
2692                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2693                       &t->init_keepalive_ping_locked);
2694     }
2695   } else if (error == GRPC_ERROR_CANCELLED) {
2696     /* The keepalive ping timer may be cancelled by bdp */
2697     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2698     grpc_timer_init(&t->keepalive_ping_timer,
2699                     grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2700                     &t->init_keepalive_ping_locked);
2701   }
2702   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping");
2703 }
2704 
start_keepalive_ping_locked(void * arg,grpc_error * error)2705 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
2706   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2707   if (error != GRPC_ERROR_NONE) {
2708     return;
2709   }
2710   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
2711   grpc_timer_init(&t->keepalive_watchdog_timer,
2712                   grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
2713                   &t->keepalive_watchdog_fired_locked);
2714 }
2715 
finish_keepalive_ping_locked(void * arg,grpc_error * error)2716 static void finish_keepalive_ping_locked(void* arg, grpc_error* error) {
2717   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2718   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2719     if (error == GRPC_ERROR_NONE) {
2720       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2721       grpc_timer_cancel(&t->keepalive_watchdog_timer);
2722       GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
2723       grpc_timer_init(&t->keepalive_ping_timer,
2724                       grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
2725                       &t->init_keepalive_ping_locked);
2726     }
2727   }
2728   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end");
2729 }
2730 
keepalive_watchdog_fired_locked(void * arg,grpc_error * error)2731 static void keepalive_watchdog_fired_locked(void* arg, grpc_error* error) {
2732   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
2733   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2734     if (error == GRPC_ERROR_NONE) {
2735       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2736       close_transport_locked(
2737           t,
2738           grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2739                                  "keepalive watchdog timeout"),
2740                              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL));
2741     }
2742   } else {
2743     /* The watchdog timer should have been cancelled by
2744      * finish_keepalive_ping_locked. */
2745     if (GPR_UNLIKELY(error != GRPC_ERROR_CANCELLED)) {
2746       gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
2747               t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
2748     }
2749   }
2750   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog");
2751 }
2752 
2753 /*******************************************************************************
2754  * CALLBACK LOOP
2755  */
2756 
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,grpc_error * error,const char * reason)2757 static void connectivity_state_set(grpc_chttp2_transport* t,
2758                                    grpc_connectivity_state state,
2759                                    grpc_error* error, const char* reason) {
2760   GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "set connectivity_state=%d", state));
2761   grpc_connectivity_state_set(&t->channel_callback.state_tracker, state, error,
2762                               reason);
2763 }
2764 
2765 /*******************************************************************************
2766  * POLLSET STUFF
2767  */
2768 
set_pollset(grpc_transport * gt,grpc_stream * gs,grpc_pollset * pollset)2769 static void set_pollset(grpc_transport* gt, grpc_stream* gs,
2770                         grpc_pollset* pollset) {
2771   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2772   grpc_endpoint_add_to_pollset(t->ep, pollset);
2773 }
2774 
set_pollset_set(grpc_transport * gt,grpc_stream * gs,grpc_pollset_set * pollset_set)2775 static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
2776                             grpc_pollset_set* pollset_set) {
2777   grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt);
2778   grpc_endpoint_add_to_pollset_set(t->ep, pollset_set);
2779 }
2780 
2781 /*******************************************************************************
2782  * BYTE STREAM
2783  */
2784 
reset_byte_stream(void * arg,grpc_error * error)2785 static void reset_byte_stream(void* arg, grpc_error* error) {
2786   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
2787   s->pending_byte_stream = false;
2788   if (error == GRPC_ERROR_NONE) {
2789     grpc_chttp2_maybe_complete_recv_message(s->t, s);
2790     grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
2791   } else {
2792     GPR_ASSERT(error != GRPC_ERROR_NONE);
2793     GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
2794     s->on_next = nullptr;
2795     GRPC_ERROR_UNREF(s->byte_stream_error);
2796     s->byte_stream_error = GRPC_ERROR_NONE;
2797     grpc_chttp2_cancel_stream(s->t, s, GRPC_ERROR_REF(error));
2798     s->byte_stream_error = GRPC_ERROR_REF(error);
2799   }
2800 }
2801 
2802 namespace grpc_core {
2803 
Chttp2IncomingByteStream(grpc_chttp2_transport * transport,grpc_chttp2_stream * stream,uint32_t frame_size,uint32_t flags)2804 Chttp2IncomingByteStream::Chttp2IncomingByteStream(
2805     grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
2806     uint32_t frame_size, uint32_t flags)
2807     : ByteStream(frame_size, flags),
2808       transport_(transport),
2809       stream_(stream),
2810       remaining_bytes_(frame_size) {
2811   gpr_ref_init(&refs_, 2);
2812   GRPC_ERROR_UNREF(stream->byte_stream_error);
2813   stream->byte_stream_error = GRPC_ERROR_NONE;
2814 }
2815 
OrphanLocked(void * arg,grpc_error * error_ignored)2816 void Chttp2IncomingByteStream::OrphanLocked(void* arg,
2817                                             grpc_error* error_ignored) {
2818   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2819   grpc_chttp2_stream* s = bs->stream_;
2820   grpc_chttp2_transport* t = s->t;
2821   bs->Unref();
2822   s->pending_byte_stream = false;
2823   grpc_chttp2_maybe_complete_recv_message(t, s);
2824   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2825 }
2826 
Orphan()2827 void Chttp2IncomingByteStream::Orphan() {
2828   GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
2829   GRPC_CLOSURE_SCHED(
2830       GRPC_CLOSURE_INIT(&destroy_action_,
2831                         &Chttp2IncomingByteStream::OrphanLocked, this,
2832                         grpc_combiner_scheduler(transport_->combiner)),
2833       GRPC_ERROR_NONE);
2834 }
2835 
Unref()2836 void Chttp2IncomingByteStream::Unref() {
2837   if (gpr_unref(&refs_)) {
2838     Delete(this);
2839   }
2840 }
2841 
Ref()2842 void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); }
2843 
NextLocked(void * arg,grpc_error * error_ignored)2844 void Chttp2IncomingByteStream::NextLocked(void* arg,
2845                                           grpc_error* error_ignored) {
2846   Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
2847   grpc_chttp2_transport* t = bs->transport_;
2848   grpc_chttp2_stream* s = bs->stream_;
2849   size_t cur_length = s->frame_storage.length;
2850   if (!s->read_closed) {
2851     s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
2852                                               cur_length);
2853     grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
2854   }
2855   GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
2856   if (s->frame_storage.length > 0) {
2857     grpc_slice_buffer_swap(&s->frame_storage,
2858                            &s->unprocessed_incoming_frames_buffer);
2859     s->unprocessed_incoming_frames_decompressed = false;
2860     GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
2861   } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
2862     GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2863                        GRPC_ERROR_REF(s->byte_stream_error));
2864     if (s->data_parser.parsing_frame != nullptr) {
2865       s->data_parser.parsing_frame->Unref();
2866       s->data_parser.parsing_frame = nullptr;
2867     }
2868   } else if (s->read_closed) {
2869     if (bs->remaining_bytes_ != 0) {
2870       s->byte_stream_error =
2871           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2872       GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
2873                          GRPC_ERROR_REF(s->byte_stream_error));
2874       if (s->data_parser.parsing_frame != nullptr) {
2875         s->data_parser.parsing_frame->Unref();
2876         s->data_parser.parsing_frame = nullptr;
2877       }
2878     } else {
2879       /* Should never reach here. */
2880       GPR_ASSERT(false);
2881     }
2882   } else {
2883     s->on_next = bs->next_action_.on_complete;
2884   }
2885   bs->Unref();
2886 }
2887 
Next(size_t max_size_hint,grpc_closure * on_complete)2888 bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
2889                                     grpc_closure* on_complete) {
2890   GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
2891   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2892     return true;
2893   } else {
2894     Ref();
2895     next_action_.max_size_hint = max_size_hint;
2896     next_action_.on_complete = on_complete;
2897     GRPC_CLOSURE_SCHED(
2898         GRPC_CLOSURE_INIT(&next_action_.closure,
2899                           &Chttp2IncomingByteStream::NextLocked, this,
2900                           grpc_combiner_scheduler(transport_->combiner)),
2901         GRPC_ERROR_NONE);
2902     return false;
2903   }
2904 }
2905 
MaybeCreateStreamDecompressionCtx()2906 void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
2907   if (!stream_->stream_decompression_ctx) {
2908     stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
2909         stream_->stream_decompression_method);
2910   }
2911 }
2912 
Pull(grpc_slice * slice)2913 grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
2914   GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
2915   grpc_error* error;
2916   if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
2917     if (!stream_->unprocessed_incoming_frames_decompressed) {
2918       bool end_of_context;
2919       MaybeCreateStreamDecompressionCtx();
2920       if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
2921                                   &stream_->unprocessed_incoming_frames_buffer,
2922                                   &stream_->decompressed_data_buffer, nullptr,
2923                                   MAX_SIZE_T, &end_of_context)) {
2924         error =
2925             GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
2926         return error;
2927       }
2928       GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
2929       grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
2930                              &stream_->decompressed_data_buffer);
2931       stream_->unprocessed_incoming_frames_decompressed = true;
2932       if (end_of_context) {
2933         grpc_stream_compression_context_destroy(
2934             stream_->stream_decompression_ctx);
2935         stream_->stream_decompression_ctx = nullptr;
2936       }
2937       if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
2938         *slice = grpc_empty_slice();
2939       }
2940     }
2941     error = grpc_deframe_unprocessed_incoming_frames(
2942         &stream_->data_parser, stream_,
2943         &stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
2944     if (error != GRPC_ERROR_NONE) {
2945       return error;
2946     }
2947   } else {
2948     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2949     GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
2950     return error;
2951   }
2952   return GRPC_ERROR_NONE;
2953 }
2954 
PublishError(grpc_error * error)2955 void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
2956   GPR_ASSERT(error != GRPC_ERROR_NONE);
2957   GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
2958   stream_->on_next = nullptr;
2959   GRPC_ERROR_UNREF(stream_->byte_stream_error);
2960   stream_->byte_stream_error = GRPC_ERROR_REF(error);
2961   grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
2962 }
2963 
Push(grpc_slice slice,grpc_slice * slice_out)2964 grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice,
2965                                            grpc_slice* slice_out) {
2966   if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
2967     grpc_error* error =
2968         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
2969     GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
2970     grpc_slice_unref_internal(slice);
2971     return error;
2972   } else {
2973     remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
2974     if (slice_out != nullptr) {
2975       *slice_out = slice;
2976     }
2977     return GRPC_ERROR_NONE;
2978   }
2979 }
2980 
Finished(grpc_error * error,bool reset_on_error)2981 grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
2982                                                bool reset_on_error) {
2983   if (error == GRPC_ERROR_NONE) {
2984     if (remaining_bytes_ != 0) {
2985       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
2986     }
2987   }
2988   if (error != GRPC_ERROR_NONE && reset_on_error) {
2989     GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
2990   }
2991   Unref();
2992   return error;
2993 }
2994 
Shutdown(grpc_error * error)2995 void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
2996   GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
2997 }
2998 
2999 }  // namespace grpc_core
3000 
3001 /*******************************************************************************
3002  * RESOURCE QUOTAS
3003  */
3004 
post_benign_reclaimer(grpc_chttp2_transport * t)3005 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3006   if (!t->benign_reclaimer_registered) {
3007     t->benign_reclaimer_registered = true;
3008     GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
3009     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3010                                       false, &t->benign_reclaimer_locked);
3011   }
3012 }
3013 
post_destructive_reclaimer(grpc_chttp2_transport * t)3014 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3015   if (!t->destructive_reclaimer_registered) {
3016     t->destructive_reclaimer_registered = true;
3017     GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
3018     grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
3019                                       true, &t->destructive_reclaimer_locked);
3020   }
3021 }
3022 
benign_reclaimer_locked(void * arg,grpc_error * error)3023 static void benign_reclaimer_locked(void* arg, grpc_error* error) {
3024   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3025   if (error == GRPC_ERROR_NONE &&
3026       grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
3027     /* Channel with no active streams: send a goaway to try and make it
3028      * disconnect cleanly */
3029     if (grpc_resource_quota_trace.enabled()) {
3030       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3031               t->peer_string);
3032     }
3033     send_goaway(t,
3034                 grpc_error_set_int(
3035                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3036                     GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM));
3037   } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace.enabled()) {
3038     gpr_log(GPR_INFO,
3039             "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3040             " streams",
3041             t->peer_string, grpc_chttp2_stream_map_size(&t->stream_map));
3042   }
3043   t->benign_reclaimer_registered = false;
3044   if (error != GRPC_ERROR_CANCELLED) {
3045     grpc_resource_user_finish_reclamation(
3046         grpc_endpoint_get_resource_user(t->ep));
3047   }
3048   GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer");
3049 }
3050 
destructive_reclaimer_locked(void * arg,grpc_error * error)3051 static void destructive_reclaimer_locked(void* arg, grpc_error* error) {
3052   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
3053   size_t n = grpc_chttp2_stream_map_size(&t->stream_map);
3054   t->destructive_reclaimer_registered = false;
3055   if (error == GRPC_ERROR_NONE && n > 0) {
3056     grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(
3057         grpc_chttp2_stream_map_rand(&t->stream_map));
3058     if (grpc_resource_quota_trace.enabled()) {
3059       gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", t->peer_string,
3060               s->id);
3061     }
3062     grpc_chttp2_cancel_stream(
3063         t, s,
3064         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"),
3065                            GRPC_ERROR_INT_HTTP2_ERROR,
3066                            GRPC_HTTP2_ENHANCE_YOUR_CALM));
3067     if (n > 1) {
3068       /* Since we cancel one stream per destructive reclamation, if
3069          there are more streams left, we can immediately post a new
3070          reclaimer in case the resource quota needs to free more
3071          memory */
3072       post_destructive_reclaimer(t);
3073     }
3074   }
3075   if (error != GRPC_ERROR_CANCELLED) {
3076     grpc_resource_user_finish_reclamation(
3077         grpc_endpoint_get_resource_user(t->ep));
3078   }
3079   GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer");
3080 }
3081 
3082 /*******************************************************************************
3083  * MONITORING
3084  */
3085 
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3086 const char* grpc_chttp2_initiate_write_reason_string(
3087     grpc_chttp2_initiate_write_reason reason) {
3088   switch (reason) {
3089     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3090       return "INITIAL_WRITE";
3091     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3092       return "START_NEW_STREAM";
3093     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3094       return "SEND_MESSAGE";
3095     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3096       return "SEND_INITIAL_METADATA";
3097     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3098       return "SEND_TRAILING_METADATA";
3099     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3100       return "RETRY_SEND_PING";
3101     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3102       return "CONTINUE_PINGS";
3103     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3104       return "GOAWAY_SENT";
3105     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3106       return "RST_STREAM";
3107     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3108       return "CLOSE_FROM_API";
3109     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3110       return "STREAM_FLOW_CONTROL";
3111     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3112       return "TRANSPORT_FLOW_CONTROL";
3113     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3114       return "SEND_SETTINGS";
3115     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3116       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3117     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3118       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3119     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3120       return "APPLICATION_PING";
3121     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3122       return "KEEPALIVE_PING";
3123     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3124       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3125     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3126       return "PING_RESPONSE";
3127     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3128       return "FORCE_RST_STREAM";
3129   }
3130   GPR_UNREACHABLE_CODE(return "unknown");
3131 }
3132 
chttp2_get_endpoint(grpc_transport * t)3133 static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) {
3134   return (reinterpret_cast<grpc_chttp2_transport*>(t))->ep;
3135 }
3136 
3137 static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
3138                                              "chttp2",
3139                                              init_stream,
3140                                              set_pollset,
3141                                              set_pollset_set,
3142                                              perform_stream_op,
3143                                              perform_transport_op,
3144                                              destroy_stream,
3145                                              destroy_transport,
3146                                              chttp2_get_endpoint};
3147 
get_vtable(void)3148 static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
3149 
grpc_create_chttp2_transport(const grpc_channel_args * channel_args,grpc_endpoint * ep,bool is_client)3150 grpc_transport* grpc_create_chttp2_transport(
3151     const grpc_channel_args* channel_args, grpc_endpoint* ep, bool is_client) {
3152   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(
3153       gpr_zalloc(sizeof(grpc_chttp2_transport)));
3154   init_transport(t, channel_args, ep, is_client);
3155   return &t->base;
3156 }
3157 
grpc_chttp2_transport_start_reading(grpc_transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings)3158 void grpc_chttp2_transport_start_reading(
3159     grpc_transport* transport, grpc_slice_buffer* read_buffer,
3160     grpc_closure* notify_on_receive_settings) {
3161   grpc_chttp2_transport* t =
3162       reinterpret_cast<grpc_chttp2_transport*>(transport);
3163   GRPC_CHTTP2_REF_TRANSPORT(
3164       t, "reading_action"); /* matches unref inside reading_action */
3165   if (read_buffer != nullptr) {
3166     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3167     gpr_free(read_buffer);
3168   }
3169   t->notify_on_receive_settings = notify_on_receive_settings;
3170   GRPC_CLOSURE_SCHED(&t->read_action_locked, GRPC_ERROR_NONE);
3171 }
3172