• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/slice.h>
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/port_platform.h>
23 #include <inttypes.h>
24 #include <string.h>
25 
26 #include <atomic>
27 #include <initializer_list>
28 #include <limits>
29 #include <memory>
30 #include <string>
31 #include <utility>
32 
33 #include "absl/base/attributes.h"
34 #include "absl/container/flat_hash_map.h"
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "absl/random/bit_gen_ref.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/types/variant.h"
43 #include "src/core/channelz/channelz.h"
44 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
45 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
46 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
47 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
48 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
49 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
50 #include "src/core/ext/transport/chttp2/transport/frame_security.h"
51 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
52 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
53 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
54 #include "src/core/ext/transport/chttp2/transport/hpack_parser_table.h"
55 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
56 #include "src/core/ext/transport/chttp2/transport/internal.h"
57 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
58 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
59 #include "src/core/lib/debug/trace.h"
60 #include "src/core/lib/experiments/experiments.h"
61 #include "src/core/lib/iomgr/closure.h"
62 #include "src/core/lib/iomgr/combiner.h"
63 #include "src/core/lib/iomgr/endpoint.h"
64 #include "src/core/lib/iomgr/error.h"
65 #include "src/core/lib/slice/slice.h"
66 #include "src/core/lib/transport/bdp_estimator.h"
67 #include "src/core/lib/transport/error_utils.h"
68 #include "src/core/lib/transport/http2_errors.h"
69 #include "src/core/lib/transport/metadata_batch.h"
70 #include "src/core/lib/transport/transport.h"
71 #include "src/core/telemetry/call_tracer.h"
72 #include "src/core/telemetry/stats.h"
73 #include "src/core/telemetry/stats_data.h"
74 #include "src/core/util/random_early_detection.h"
75 #include "src/core/util/ref_counted_ptr.h"
76 #include "src/core/util/status_helper.h"
77 
78 using grpc_core::HPackParser;
79 
80 static grpc_error_handle init_frame_parser(grpc_chttp2_transport* t,
81                                            size_t& requests_started);
82 static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
83                                                   int is_continuation,
84                                                   size_t& requests_started);
85 static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t);
86 static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t);
87 static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t);
88 static grpc_error_handle init_window_update_frame_parser(
89     grpc_chttp2_transport* t);
90 static grpc_error_handle init_ping_parser(grpc_chttp2_transport* t);
91 static grpc_error_handle init_goaway_parser(grpc_chttp2_transport* t);
92 static grpc_error_handle init_security_frame_parser(grpc_chttp2_transport* t);
93 static grpc_error_handle init_non_header_skip_frame_parser(
94     grpc_chttp2_transport* t);
95 
96 static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
97                                            const grpc_slice& slice,
98                                            int is_last);
99 
// Returns `c` unchanged when it is a 7-bit value (below 128 when viewed as an
// unsigned byte); any byte with the high bit set is replaced by character
// code 32 so that it can be embedded in an error message.
static char get_utf8_safe_char(char c) {
  constexpr char kReplacement = 32;
  const bool is_seven_bit = static_cast<unsigned char>(c) < 128;
  if (is_seven_bit) {
    return c;
  }
  return kReplacement;
}
103 
grpc_chttp2_min_read_progress_size(grpc_chttp2_transport * t)104 uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t) {
105   switch (t->deframe_state) {
106     case GRPC_DTS_CLIENT_PREFIX_0:
107     case GRPC_DTS_CLIENT_PREFIX_1:
108     case GRPC_DTS_CLIENT_PREFIX_2:
109     case GRPC_DTS_CLIENT_PREFIX_3:
110     case GRPC_DTS_CLIENT_PREFIX_4:
111     case GRPC_DTS_CLIENT_PREFIX_5:
112     case GRPC_DTS_CLIENT_PREFIX_6:
113     case GRPC_DTS_CLIENT_PREFIX_7:
114     case GRPC_DTS_CLIENT_PREFIX_8:
115     case GRPC_DTS_CLIENT_PREFIX_9:
116     case GRPC_DTS_CLIENT_PREFIX_10:
117     case GRPC_DTS_CLIENT_PREFIX_11:
118     case GRPC_DTS_CLIENT_PREFIX_12:
119     case GRPC_DTS_CLIENT_PREFIX_13:
120     case GRPC_DTS_CLIENT_PREFIX_14:
121     case GRPC_DTS_CLIENT_PREFIX_15:
122     case GRPC_DTS_CLIENT_PREFIX_16:
123     case GRPC_DTS_CLIENT_PREFIX_17:
124     case GRPC_DTS_CLIENT_PREFIX_18:
125     case GRPC_DTS_CLIENT_PREFIX_19:
126     case GRPC_DTS_CLIENT_PREFIX_20:
127     case GRPC_DTS_CLIENT_PREFIX_21:
128     case GRPC_DTS_CLIENT_PREFIX_22:
129     case GRPC_DTS_CLIENT_PREFIX_23:
130       // Need the client prefix *and* the first fixed header to make progress.
131       return 9 + 24 - (t->deframe_state - GRPC_DTS_CLIENT_PREFIX_0);
132     case GRPC_DTS_FH_0:
133     case GRPC_DTS_FH_1:
134     case GRPC_DTS_FH_2:
135     case GRPC_DTS_FH_3:
136     case GRPC_DTS_FH_4:
137     case GRPC_DTS_FH_5:
138     case GRPC_DTS_FH_6:
139     case GRPC_DTS_FH_7:
140     case GRPC_DTS_FH_8:
141       return 9 - (t->deframe_state - GRPC_DTS_FH_0);
142     case GRPC_DTS_FRAME:
143       return t->incoming_frame_size;
144   }
145   GPR_UNREACHABLE_CODE(return 1);
146 }
147 
148 namespace {
149 struct KnownFlag {
150   uint8_t flag;
151   absl::string_view name;
152 };
153 
MakeFrameTypeString(absl::string_view frame_type,uint8_t flags,std::initializer_list<KnownFlag> known_flags)154 std::string MakeFrameTypeString(absl::string_view frame_type, uint8_t flags,
155                                 std::initializer_list<KnownFlag> known_flags) {
156   std::string result(frame_type);
157   for (const KnownFlag& known_flag : known_flags) {
158     if (flags & known_flag.flag) {
159       absl::StrAppend(&result, ":", known_flag.name);
160       flags &= ~known_flag.flag;
161     }
162   }
163   if (flags != 0) {
164     absl::StrAppend(&result, ":UNKNOWN_FLAGS=0x",
165                     absl::Hex(flags, absl::kZeroPad2));
166   }
167   return result;
168 }
169 
FrameTypeString(uint8_t frame_type,uint8_t flags)170 std::string FrameTypeString(uint8_t frame_type, uint8_t flags) {
171   switch (frame_type) {
172     case GRPC_CHTTP2_FRAME_DATA:
173       return MakeFrameTypeString(
174           "DATA", flags, {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"}});
175     case GRPC_CHTTP2_FRAME_HEADER:
176       return MakeFrameTypeString(
177           "HEADERS", flags,
178           {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"},
179            {GRPC_CHTTP2_DATA_FLAG_END_HEADERS, "END_HEADERS"},
180            {GRPC_CHTTP2_FLAG_HAS_PRIORITY, "PRIORITY"}});
181     case GRPC_CHTTP2_FRAME_CONTINUATION:
182       return MakeFrameTypeString(
183           "HEADERS", flags,
184           {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"},
185            {GRPC_CHTTP2_DATA_FLAG_END_HEADERS, "END_HEADERS"},
186            {GRPC_CHTTP2_FLAG_HAS_PRIORITY, "PRIORITY"}});
187     case GRPC_CHTTP2_FRAME_RST_STREAM:
188       return MakeFrameTypeString("RST_STREAM", flags, {});
189     case GRPC_CHTTP2_FRAME_SETTINGS:
190       return MakeFrameTypeString("SETTINGS", flags,
191                                  {{GRPC_CHTTP2_FLAG_ACK, "ACK"}});
192     case GRPC_CHTTP2_FRAME_PING:
193       return MakeFrameTypeString("PING", flags,
194                                  {{GRPC_CHTTP2_FLAG_ACK, "ACK"}});
195     case GRPC_CHTTP2_FRAME_GOAWAY:
196       return MakeFrameTypeString("GOAWAY", flags, {});
197     case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
198       return MakeFrameTypeString("WINDOW_UPDATE", flags, {});
199     case GRPC_CHTTP2_FRAME_SECURITY:
200       return MakeFrameTypeString("SECURITY", flags, {});
201     default:
202       return MakeFrameTypeString(
203           absl::StrCat("UNKNOWN_FRAME_TYPE_", static_cast<int>(frame_type)),
204           flags, {});
205   }
206 }
207 }  // namespace
208 
grpc_chttp2_perform_read(grpc_chttp2_transport * t,const grpc_slice & slice,size_t & requests_started)209 absl::variant<size_t, absl::Status> grpc_chttp2_perform_read(
210     grpc_chttp2_transport* t, const grpc_slice& slice,
211     size_t& requests_started) {
212   GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_perform_read");
213 
214   const uint8_t* beg = GRPC_SLICE_START_PTR(slice);
215   const uint8_t* end = GRPC_SLICE_END_PTR(slice);
216   const uint8_t* cur = beg;
217   grpc_error_handle err;
218 
219   if (cur == end) return absl::OkStatus();
220 
221   switch (t->deframe_state) {
222     case GRPC_DTS_CLIENT_PREFIX_0:
223     case GRPC_DTS_CLIENT_PREFIX_1:
224     case GRPC_DTS_CLIENT_PREFIX_2:
225     case GRPC_DTS_CLIENT_PREFIX_3:
226     case GRPC_DTS_CLIENT_PREFIX_4:
227     case GRPC_DTS_CLIENT_PREFIX_5:
228     case GRPC_DTS_CLIENT_PREFIX_6:
229     case GRPC_DTS_CLIENT_PREFIX_7:
230     case GRPC_DTS_CLIENT_PREFIX_8:
231     case GRPC_DTS_CLIENT_PREFIX_9:
232     case GRPC_DTS_CLIENT_PREFIX_10:
233     case GRPC_DTS_CLIENT_PREFIX_11:
234     case GRPC_DTS_CLIENT_PREFIX_12:
235     case GRPC_DTS_CLIENT_PREFIX_13:
236     case GRPC_DTS_CLIENT_PREFIX_14:
237     case GRPC_DTS_CLIENT_PREFIX_15:
238     case GRPC_DTS_CLIENT_PREFIX_16:
239     case GRPC_DTS_CLIENT_PREFIX_17:
240     case GRPC_DTS_CLIENT_PREFIX_18:
241     case GRPC_DTS_CLIENT_PREFIX_19:
242     case GRPC_DTS_CLIENT_PREFIX_20:
243     case GRPC_DTS_CLIENT_PREFIX_21:
244     case GRPC_DTS_CLIENT_PREFIX_22:
245     case GRPC_DTS_CLIENT_PREFIX_23:
246       while (cur != end && t->deframe_state != GRPC_DTS_FH_0) {
247         if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state]) {
248           return GRPC_ERROR_CREATE(absl::StrFormat(
249               "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
250               "at byte %d",
251               get_utf8_safe_char(
252                   GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state]),
253               static_cast<int>(static_cast<uint8_t>(
254                   GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state])),
255               get_utf8_safe_char(*cur), static_cast<int>(*cur),
256               t->deframe_state));
257         }
258         ++cur;
259         // NOLINTNEXTLINE(bugprone-misplaced-widening-cast)
260         t->deframe_state = static_cast<grpc_chttp2_deframe_transport_state>(
261             1 + static_cast<int>(t->deframe_state));
262       }
263       if (cur == end) {
264         return absl::OkStatus();
265       }
266     dts_fh_0:
267       if (requests_started >= t->max_requests_per_read) {
268         t->deframe_state = GRPC_DTS_FH_0;
269         return static_cast<size_t>(cur - beg);
270       }
271       ABSL_FALLTHROUGH_INTENDED;
272     case GRPC_DTS_FH_0:
273       DCHECK_LT(cur, end);
274       t->incoming_frame_size = (static_cast<uint32_t>(*cur)) << 16;
275       if (++cur == end) {
276         t->deframe_state = GRPC_DTS_FH_1;
277         return absl::OkStatus();
278       }
279       ABSL_FALLTHROUGH_INTENDED;
280     case GRPC_DTS_FH_1:
281       DCHECK_LT(cur, end);
282       t->incoming_frame_size |= (static_cast<uint32_t>(*cur)) << 8;
283       if (++cur == end) {
284         t->deframe_state = GRPC_DTS_FH_2;
285         return absl::OkStatus();
286       }
287       ABSL_FALLTHROUGH_INTENDED;
288     case GRPC_DTS_FH_2:
289       DCHECK_LT(cur, end);
290       t->incoming_frame_size |= *cur;
291       if (++cur == end) {
292         t->deframe_state = GRPC_DTS_FH_3;
293         return absl::OkStatus();
294       }
295       ABSL_FALLTHROUGH_INTENDED;
296     case GRPC_DTS_FH_3:
297       DCHECK_LT(cur, end);
298       t->incoming_frame_type = *cur;
299       if (++cur == end) {
300         t->deframe_state = GRPC_DTS_FH_4;
301         return absl::OkStatus();
302       }
303       ABSL_FALLTHROUGH_INTENDED;
304     case GRPC_DTS_FH_4:
305       DCHECK_LT(cur, end);
306       t->incoming_frame_flags = *cur;
307       if (++cur == end) {
308         t->deframe_state = GRPC_DTS_FH_5;
309         return absl::OkStatus();
310       }
311       ABSL_FALLTHROUGH_INTENDED;
312     case GRPC_DTS_FH_5:
313       DCHECK_LT(cur, end);
314       t->incoming_stream_id = ((static_cast<uint32_t>(*cur)) & 0x7f) << 24;
315       if (++cur == end) {
316         t->deframe_state = GRPC_DTS_FH_6;
317         return absl::OkStatus();
318       }
319       ABSL_FALLTHROUGH_INTENDED;
320     case GRPC_DTS_FH_6:
321       DCHECK_LT(cur, end);
322       t->incoming_stream_id |= (static_cast<uint32_t>(*cur)) << 16;
323       if (++cur == end) {
324         t->deframe_state = GRPC_DTS_FH_7;
325         return absl::OkStatus();
326       }
327       ABSL_FALLTHROUGH_INTENDED;
328     case GRPC_DTS_FH_7:
329       DCHECK_LT(cur, end);
330       t->incoming_stream_id |= (static_cast<uint32_t>(*cur)) << 8;
331       if (++cur == end) {
332         t->deframe_state = GRPC_DTS_FH_8;
333         return absl::OkStatus();
334       }
335       ABSL_FALLTHROUGH_INTENDED;
336     case GRPC_DTS_FH_8:
337       DCHECK_LT(cur, end);
338       t->incoming_stream_id |= (static_cast<uint32_t>(*cur));
339       GRPC_TRACE_LOG(http, INFO)
340           << "INCOMING[" << t << "]: "
341           << FrameTypeString(t->incoming_frame_type, t->incoming_frame_flags)
342           << " len:" << t->incoming_frame_size
343           << absl::StrFormat(" id:0x%08x", t->incoming_stream_id);
344       t->deframe_state = GRPC_DTS_FRAME;
345       err = init_frame_parser(t, requests_started);
346       if (!err.ok()) {
347         return err;
348       }
349       if (t->incoming_frame_size == 0) {
350         err = parse_frame_slice(t, grpc_empty_slice(), 1);
351         if (!err.ok()) {
352           return err;
353         }
354         t->incoming_stream = nullptr;
355         if (++cur == end) {
356           t->deframe_state = GRPC_DTS_FH_0;
357           return absl::OkStatus();
358         }
359         goto dts_fh_0;  // loop
360       } else if (t->incoming_frame_size >
361                  t->settings.acked().max_frame_size()) {
362         return GRPC_ERROR_CREATE(absl::StrFormat(
363             "Frame size %d is larger than max frame size %d",
364             t->incoming_frame_size, t->settings.acked().max_frame_size()));
365       }
366       if (++cur == end) {
367         return absl::OkStatus();
368       }
369       ABSL_FALLTHROUGH_INTENDED;
370     case GRPC_DTS_FRAME:
371       DCHECK_LT(cur, end);
372       if (static_cast<uint32_t>(end - cur) == t->incoming_frame_size) {
373         err = parse_frame_slice(
374             t,
375             grpc_slice_sub_no_ref(slice, static_cast<size_t>(cur - beg),
376                                   static_cast<size_t>(end - beg)),
377             1);
378         if (!err.ok()) {
379           return err;
380         }
381         t->deframe_state = GRPC_DTS_FH_0;
382         t->incoming_stream = nullptr;
383         return absl::OkStatus();
384       } else if (static_cast<uint32_t>(end - cur) > t->incoming_frame_size) {
385         size_t cur_offset = static_cast<size_t>(cur - beg);
386         err = parse_frame_slice(
387             t,
388             grpc_slice_sub_no_ref(slice, cur_offset,
389                                   cur_offset + t->incoming_frame_size),
390             1);
391         if (!err.ok()) {
392           return err;
393         }
394         cur += t->incoming_frame_size;
395         t->incoming_stream = nullptr;
396         if (t->incoming_frame_type == GRPC_CHTTP2_FRAME_RST_STREAM) {
397           requests_started = std::numeric_limits<size_t>::max();
398         }
399         goto dts_fh_0;  // loop
400       } else {
401         err = parse_frame_slice(
402             t,
403             grpc_slice_sub_no_ref(slice, static_cast<size_t>(cur - beg),
404                                   static_cast<size_t>(end - beg)),
405             0);
406         if (!err.ok()) {
407           return err;
408         }
409         t->incoming_frame_size -= static_cast<uint32_t>(end - cur);
410         return absl::OkStatus();
411       }
412       GPR_UNREACHABLE_CODE(return absl::OkStatus());
413   }
414 
415   GPR_UNREACHABLE_CODE(return absl::OkStatus());
416 }
417 
init_frame_parser(grpc_chttp2_transport * t,size_t & requests_started)418 static grpc_error_handle init_frame_parser(grpc_chttp2_transport* t,
419                                            size_t& requests_started) {
420   if (t->is_first_frame &&
421       t->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) {
422     return GRPC_ERROR_CREATE(absl::StrCat(
423         "Expected SETTINGS frame as the first frame, got frame type ",
424         t->incoming_frame_type));
425   }
426   t->is_first_frame = false;
427   if (t->expect_continuation_stream_id != 0) {
428     if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
429       return GRPC_ERROR_CREATE(
430           absl::StrFormat("Expected CONTINUATION frame, got frame type %02x",
431                           t->incoming_frame_type));
432     }
433     if (t->expect_continuation_stream_id != t->incoming_stream_id) {
434       return GRPC_ERROR_CREATE(absl::StrFormat(
435           "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got "
436           "grpc_chttp2_stream %08x",
437           t->expect_continuation_stream_id, t->incoming_stream_id));
438     }
439     return init_header_frame_parser(t, 1, requests_started);
440   }
441   switch (t->incoming_frame_type) {
442     case GRPC_CHTTP2_FRAME_DATA:
443       return init_data_frame_parser(t);
444     case GRPC_CHTTP2_FRAME_HEADER:
445       return init_header_frame_parser(t, 0, requests_started);
446     case GRPC_CHTTP2_FRAME_CONTINUATION:
447       return GRPC_ERROR_CREATE("Unexpected CONTINUATION frame");
448     case GRPC_CHTTP2_FRAME_RST_STREAM:
449       return init_rst_stream_parser(t);
450     case GRPC_CHTTP2_FRAME_SETTINGS:
451       return init_settings_frame_parser(t);
452     case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
453       return init_window_update_frame_parser(t);
454     case GRPC_CHTTP2_FRAME_PING:
455       return init_ping_parser(t);
456     case GRPC_CHTTP2_FRAME_GOAWAY:
457       return init_goaway_parser(t);
458     case GRPC_CHTTP2_FRAME_SECURITY:
459       if (!t->settings.peer().allow_security_frame()) {
460         if (GRPC_TRACE_FLAG_ENABLED(http)) {
461           LOG(ERROR) << "Security frame received but not allowed, ignoring";
462         }
463         return init_non_header_skip_frame_parser(t);
464       }
465       return init_security_frame_parser(t);
466     default:
467       GRPC_TRACE_LOG(http, ERROR)
468           << "Unknown frame type "
469           << absl::StrFormat("%02x", t->incoming_frame_type);
470       return init_non_header_skip_frame_parser(t);
471   }
472 }
473 
skip_parser(void *,grpc_chttp2_transport *,grpc_chttp2_stream *,const grpc_slice &,int)474 static grpc_error_handle skip_parser(void* /*parser*/,
475                                      grpc_chttp2_transport* /*t*/,
476                                      grpc_chttp2_stream* /*s*/,
477                                      const grpc_slice& /*slice*/,
478                                      int /*is_last*/) {
479   return absl::OkStatus();
480 }
481 
hpack_boundary_type(grpc_chttp2_transport * t,bool is_eoh)482 static HPackParser::Boundary hpack_boundary_type(grpc_chttp2_transport* t,
483                                                  bool is_eoh) {
484   if (is_eoh) {
485     if (t->header_eof) {
486       return HPackParser::Boundary::EndOfStream;
487     } else {
488       return HPackParser::Boundary::EndOfHeaders;
489     }
490   } else {
491     return HPackParser::Boundary::None;
492   }
493 }
494 
hpack_parser_log_info(grpc_chttp2_transport * t,HPackParser::LogInfo::Type type)495 static HPackParser::LogInfo hpack_parser_log_info(
496     grpc_chttp2_transport* t, HPackParser::LogInfo::Type type) {
497   return HPackParser::LogInfo{
498       t->incoming_stream_id,
499       type,
500       t->is_client,
501   };
502 }
503 
init_header_skip_frame_parser(grpc_chttp2_transport * t,HPackParser::Priority priority_type,bool is_eoh)504 static grpc_error_handle init_header_skip_frame_parser(
505     grpc_chttp2_transport* t, HPackParser::Priority priority_type,
506     bool is_eoh) {
507   t->parser = grpc_chttp2_transport::Parser{
508       "header", grpc_chttp2_header_parser_parse, &t->hpack_parser};
509   t->hpack_parser.BeginFrame(
510       nullptr,
511       /*metadata_size_soft_limit=*/
512       t->max_header_list_size_soft_limit,
513       /*metadata_size_hard_limit=*/
514       t->settings.acked().max_header_list_size(),
515       hpack_boundary_type(t, is_eoh), priority_type,
516       hpack_parser_log_info(t, HPackParser::LogInfo::kDontKnow));
517   return absl::OkStatus();
518 }
519 
init_non_header_skip_frame_parser(grpc_chttp2_transport * t)520 static grpc_error_handle init_non_header_skip_frame_parser(
521     grpc_chttp2_transport* t) {
522   t->parser =
523       grpc_chttp2_transport::Parser{"skip_parser", skip_parser, nullptr};
524   return absl::OkStatus();
525 }
526 
grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport * t)527 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t) {
528   if (t->parser.parser == grpc_chttp2_header_parser_parse) {
529     t->hpack_parser.StopBufferingFrame();
530   } else {
531     t->parser =
532         grpc_chttp2_transport::Parser{"skip_parser", skip_parser, nullptr};
533   }
534 }
535 
init_data_frame_parser(grpc_chttp2_transport * t)536 static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t) {
537   // Update BDP accounting since we have received a data frame.
538   grpc_core::BdpEstimator* bdp_est = t->flow_control.bdp_estimator();
539   if (bdp_est) {
540     if (t->bdp_ping_blocked) {
541       t->bdp_ping_blocked = false;
542       schedule_bdp_ping_locked(t->Ref());
543     }
544     bdp_est->AddIncomingBytes(t->incoming_frame_size);
545   }
546   grpc_chttp2_stream* s =
547       grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
548   absl::Status status;
549   grpc_core::chttp2::FlowControlAction action;
550   if (s == nullptr) {
551     grpc_core::chttp2::TransportFlowControl::IncomingUpdateContext upd(
552         &t->flow_control);
553     status = upd.RecvData(t->incoming_frame_size);
554     action = upd.MakeAction();
555   } else {
556     grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
557         &s->flow_control);
558     status = upd.RecvData(t->incoming_frame_size);
559     action = upd.MakeAction();
560   }
561   grpc_chttp2_act_on_flowctl_action(action, t, s);
562   if (!status.ok()) {
563     goto error_handler;
564   }
565   if (s == nullptr) {
566     return init_non_header_skip_frame_parser(t);
567   }
568   s->received_bytes += t->incoming_frame_size;
569   s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
570   if (s->read_closed) {
571     return init_non_header_skip_frame_parser(t);
572   }
573   status =
574       grpc_chttp2_data_parser_begin_frame(t->incoming_frame_flags, s->id, s);
575 error_handler:
576   if (status.ok()) {
577     t->incoming_stream = s;
578     t->parser = grpc_chttp2_transport::Parser{
579         "data", grpc_chttp2_data_parser_parse, nullptr};
580     t->ping_rate_policy.ReceivedDataFrame();
581     return absl::OkStatus();
582   } else if (s != nullptr) {
583     // handle stream errors by closing the stream
584     grpc_chttp2_mark_stream_closed(t, s, true, false,
585                                    absl_status_to_grpc_error(status));
586     grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id,
587                                              GRPC_HTTP2_PROTOCOL_ERROR,
588                                              &s->call_tracer_wrapper);
589     return init_non_header_skip_frame_parser(t);
590   } else {
591     return absl_status_to_grpc_error(status);
592   }
593 }
594 
init_header_frame_parser(grpc_chttp2_transport * t,int is_continuation,size_t & requests_started)595 static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
596                                                   int is_continuation,
597                                                   size_t& requests_started) {
598   const bool is_eoh =
599       (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
600   grpc_chttp2_stream* s;
601 
602   // TODO(ctiller): when to increment header_frames_received?
603 
604   if (is_eoh) {
605     t->expect_continuation_stream_id = 0;
606   } else {
607     t->expect_continuation_stream_id = t->incoming_stream_id;
608   }
609 
610   if (!is_continuation) {
611     t->header_eof =
612         (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
613   }
614 
615   const auto priority_type = !is_continuation && (t->incoming_frame_flags &
616                                                   GRPC_CHTTP2_FLAG_HAS_PRIORITY)
617                                  ? HPackParser::Priority::Included
618                                  : HPackParser::Priority::None;
619 
620   t->ping_rate_policy.ReceivedDataFrame();
621 
622   // could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream
623   s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
624   if (s == nullptr) {
625     if (GPR_UNLIKELY(is_continuation)) {
626       GRPC_CHTTP2_IF_TRACING(ERROR)
627           << "grpc_chttp2_stream disbanded before CONTINUATION received";
628       return init_header_skip_frame_parser(t, priority_type, is_eoh);
629     }
630     if (t->is_client) {
631       if (GPR_LIKELY((t->incoming_stream_id & 1) &&
632                      t->incoming_stream_id < t->next_stream_id)) {
633         // this is an old (probably cancelled) grpc_chttp2_stream
634       } else {
635         GRPC_CHTTP2_IF_TRACING(ERROR)
636             << "ignoring new grpc_chttp2_stream creation on client";
637       }
638       return init_header_skip_frame_parser(t, priority_type, is_eoh);
639     } else if (GPR_UNLIKELY(t->last_new_stream_id >= t->incoming_stream_id)) {
640       GRPC_CHTTP2_IF_TRACING(ERROR)
641           << "ignoring out of order new grpc_chttp2_stream request on server; "
642              "last grpc_chttp2_stream id="
643           << t->last_new_stream_id
644           << ", new grpc_chttp2_stream id=" << t->incoming_stream_id;
645       return init_header_skip_frame_parser(t, priority_type, is_eoh);
646     } else if (GPR_UNLIKELY((t->incoming_stream_id & 1) == 0)) {
647       GRPC_CHTTP2_IF_TRACING(ERROR)
648           << "ignoring grpc_chttp2_stream with non-client generated index "
649           << t->incoming_stream_id;
650       return init_header_skip_frame_parser(t, priority_type, is_eoh);
651     } else if (GPR_UNLIKELY(t->stream_map.size() + t->extra_streams >=
652                             t->settings.acked().max_concurrent_streams())) {
653       ++t->num_pending_induced_frames;
654       grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(
655                                           t->incoming_stream_id,
656                                           GRPC_HTTP2_REFUSED_STREAM, nullptr));
657       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
658       return init_header_skip_frame_parser(t, priority_type, is_eoh);
659     } else if (grpc_core::IsRqFastRejectEnabled() &&
660                GPR_UNLIKELY(t->memory_owner.IsMemoryPressureHigh())) {
661       // We have more streams allocated than we'd like, so apply some pushback
662       // by refusing this stream.
663       grpc_core::global_stats().IncrementRqCallsRejected();
664       ++t->num_pending_induced_frames;
665       grpc_slice_buffer_add(
666           &t->qbuf,
667           grpc_chttp2_rst_stream_create(t->incoming_stream_id,
668                                         GRPC_HTTP2_ENHANCE_YOUR_CALM, nullptr));
669       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
670       return init_header_skip_frame_parser(t, priority_type, is_eoh);
671     } else if (GPR_UNLIKELY(
672                    t->max_concurrent_streams_overload_protection &&
673                    t->streams_allocated.load(std::memory_order_relaxed) >
674                        t->settings.local().max_concurrent_streams())) {
675       // We have more streams allocated than we'd like, so apply some pushback
676       // by refusing this stream.
677       ++t->num_pending_induced_frames;
678       grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(
679                                           t->incoming_stream_id,
680                                           GRPC_HTTP2_REFUSED_STREAM, nullptr));
681       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
682       return init_header_skip_frame_parser(t, priority_type, is_eoh);
683     } else if (GPR_UNLIKELY(t->stream_map.size() >=
684                                 t->settings.local().max_concurrent_streams() &&
685                             grpc_core::RandomEarlyDetection(
686                                 t->settings.local().max_concurrent_streams(),
687                                 t->settings.acked().max_concurrent_streams())
688                                 .Reject(t->stream_map.size(), t->bitgen))) {
689       // We are under the limit of max concurrent streams for the current
690       // setting, but are over the next value that will be advertised.
691       // Apply some backpressure by randomly not accepting new streams.
692       ++t->num_pending_induced_frames;
693       grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(
694                                           t->incoming_stream_id,
695                                           GRPC_HTTP2_REFUSED_STREAM, nullptr));
696       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
697       return init_header_skip_frame_parser(t, priority_type, is_eoh);
698     } else if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT ||
699                t->sent_goaway_state ==
700                    GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
701       GRPC_CHTTP2_IF_TRACING(INFO)
702           << "transport:" << t
703           << " SERVER peer:" << t->peer_string.as_string_view()
704           << " Final GOAWAY sent. Ignoring new grpc_chttp2_stream request "
705              "id="
706           << t->incoming_stream_id
707           << ", last grpc_chttp2_stream id=" << t->last_new_stream_id;
708       ;
709       return init_header_skip_frame_parser(t, priority_type, is_eoh);
710     } else if (t->num_incoming_streams_before_settings_ack == 0) {
711       GRPC_CHTTP2_IF_TRACING(ERROR)
712           << "transport:" << t
713           << " SERVER peer:" << t->peer_string.as_string_view()
714           << " rejecting grpc_chttp2_stream id=" << t->incoming_stream_id
715           << ", last grpc_chttp2_stream id=" << t->last_new_stream_id
716           << " before settings have been acknowledged";
717       ++t->num_pending_induced_frames;
718       grpc_slice_buffer_add(
719           &t->qbuf,
720           grpc_chttp2_rst_stream_create(t->incoming_stream_id,
721                                         GRPC_HTTP2_ENHANCE_YOUR_CALM, nullptr));
722       grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
723       t->last_new_stream_id = t->incoming_stream_id;
724       return init_header_skip_frame_parser(t, priority_type, is_eoh);
725     }
726     --t->num_incoming_streams_before_settings_ack;
727     t->last_new_stream_id = t->incoming_stream_id;
728     s = t->incoming_stream =
729         grpc_chttp2_parsing_accept_stream(t, t->incoming_stream_id);
730     ++requests_started;
731     if (GPR_UNLIKELY(s == nullptr)) {
732       GRPC_CHTTP2_IF_TRACING(ERROR) << "grpc_chttp2_stream not accepted";
733       return init_header_skip_frame_parser(t, priority_type, is_eoh);
734     }
735     if (GRPC_TRACE_FLAG_ENABLED(http) ||
736         GRPC_TRACE_FLAG_ENABLED(chttp2_new_stream)) {
737       LOG(INFO) << "[t:" << t << " fd:" << grpc_endpoint_get_fd(t->ep.get())
738                 << " peer:" << t->peer_string.as_string_view()
739                 << "] Accepting new stream; "
740                    "num_incoming_streams_before_settings_ack="
741                 << t->num_incoming_streams_before_settings_ack;
742     }
743     if (t->channelz_socket != nullptr) {
744       t->channelz_socket->RecordStreamStartedFromRemote();
745     }
746   } else {
747     t->incoming_stream = s;
748   }
749   DCHECK_NE(s, nullptr);
750   s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
751   if (GPR_UNLIKELY(s->read_closed)) {
752     GRPC_CHTTP2_IF_TRACING(ERROR)
753         << "skipping already closed grpc_chttp2_stream header";
754     t->incoming_stream = nullptr;
755     return init_header_skip_frame_parser(t, priority_type, is_eoh);
756   }
757   t->parser = grpc_chttp2_transport::Parser{
758       "header", grpc_chttp2_header_parser_parse, &t->hpack_parser};
759   if (t->header_eof) {
760     s->eos_received = true;
761   }
762   grpc_metadata_batch* incoming_metadata_buffer = nullptr;
763   HPackParser::LogInfo::Type frame_type = HPackParser::LogInfo::kDontKnow;
764   switch (s->header_frames_received) {
765     case 0:
766       if (t->is_client && t->header_eof) {
767         GRPC_CHTTP2_IF_TRACING(INFO) << "parsing Trailers-Only";
768         if (s->trailing_metadata_available != nullptr) {
769           *s->trailing_metadata_available = true;
770         }
771         s->parsed_trailers_only = true;
772         s->trailing_metadata_buffer.Set(grpc_core::GrpcTrailersOnly(), true);
773         s->initial_metadata_buffer.Set(grpc_core::GrpcTrailersOnly(), true);
774         incoming_metadata_buffer = &s->trailing_metadata_buffer;
775         frame_type = HPackParser::LogInfo::kTrailers;
776       } else {
777         GRPC_CHTTP2_IF_TRACING(INFO) << "parsing initial_metadata";
778         incoming_metadata_buffer = &s->initial_metadata_buffer;
779         frame_type = HPackParser::LogInfo::kHeaders;
780       }
781       break;
782     case 1:
783       GRPC_CHTTP2_IF_TRACING(INFO) << "parsing trailing_metadata";
784       incoming_metadata_buffer = &s->trailing_metadata_buffer;
785       frame_type = HPackParser::LogInfo::kTrailers;
786       break;
787     case 2:
788       LOG(ERROR) << "too many header frames received";
789       return init_header_skip_frame_parser(t, priority_type, is_eoh);
790   }
791   if (frame_type == HPackParser::LogInfo::kTrailers && !t->header_eof) {
792     return GRPC_ERROR_CREATE(
793         "Trailing metadata frame received without an end-o-stream");
794   }
795   t->hpack_parser.BeginFrame(incoming_metadata_buffer,
796                              /*metadata_size_soft_limit=*/
797                              t->max_header_list_size_soft_limit,
798                              /*metadata_size_hard_limit=*/
799                              t->settings.acked().max_header_list_size(),
800                              hpack_boundary_type(t, is_eoh), priority_type,
801                              hpack_parser_log_info(t, frame_type));
802   return absl::OkStatus();
803 }
804 
init_window_update_frame_parser(grpc_chttp2_transport * t)805 static grpc_error_handle init_window_update_frame_parser(
806     grpc_chttp2_transport* t) {
807   grpc_error_handle err = grpc_chttp2_window_update_parser_begin_frame(
808       &t->simple.window_update, t->incoming_frame_size,
809       t->incoming_frame_flags);
810   if (!err.ok()) return err;
811   if (t->incoming_stream_id != 0) {
812     grpc_chttp2_stream* s = t->incoming_stream =
813         grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
814     if (s == nullptr) {
815       GRPC_TRACE_LOG(http, ERROR) << "Stream " << t->incoming_stream_id
816                                   << " not found, ignoring WINDOW_UPDATE";
817       return init_non_header_skip_frame_parser(t);
818     }
819     s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
820   }
821   t->parser = grpc_chttp2_transport::Parser{
822       "window_update", grpc_chttp2_window_update_parser_parse,
823       &t->simple.window_update};
824   return absl::OkStatus();
825 }
826 
init_ping_parser(grpc_chttp2_transport * t)827 static grpc_error_handle init_ping_parser(grpc_chttp2_transport* t) {
828   grpc_error_handle err = grpc_chttp2_ping_parser_begin_frame(
829       &t->simple.ping, t->incoming_frame_size, t->incoming_frame_flags);
830   if (!err.ok()) return err;
831   t->parser = grpc_chttp2_transport::Parser{
832       "ping", grpc_chttp2_ping_parser_parse, &t->simple.ping};
833   return absl::OkStatus();
834 }
835 
init_rst_stream_parser(grpc_chttp2_transport * t)836 static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) {
837   grpc_error_handle err = grpc_chttp2_rst_stream_parser_begin_frame(
838       &t->simple.rst_stream, t->incoming_frame_size, t->incoming_frame_flags);
839   if (!err.ok()) return err;
840   grpc_chttp2_stream* s = t->incoming_stream =
841       grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
842   if (!t->incoming_stream) {
843     return init_non_header_skip_frame_parser(t);
844   }
845   s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
846   t->parser = grpc_chttp2_transport::Parser{
847       "rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream};
848   return absl::OkStatus();
849 }
850 
init_goaway_parser(grpc_chttp2_transport * t)851 static grpc_error_handle init_goaway_parser(grpc_chttp2_transport* t) {
852   grpc_error_handle err = grpc_chttp2_goaway_parser_begin_frame(
853       &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
854   if (!err.ok()) return err;
855   t->parser = grpc_chttp2_transport::Parser{
856       "goaway", grpc_chttp2_goaway_parser_parse, &t->goaway_parser};
857   return absl::OkStatus();
858 }
859 
init_settings_frame_parser(grpc_chttp2_transport * t)860 static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) {
861   if (t->incoming_stream_id != 0) {
862     return GRPC_ERROR_CREATE("Settings frame received for grpc_chttp2_stream");
863   }
864 
865   grpc_error_handle err = grpc_chttp2_settings_parser_begin_frame(
866       &t->simple.settings, t->incoming_frame_size, t->incoming_frame_flags,
867       t->settings.mutable_peer());
868   if (!err.ok()) {
869     return err;
870   }
871   if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
872     if (!t->settings.AckLastSend()) {
873       return GRPC_ERROR_CREATE("Received unexpected settings ack");
874     }
875     t->hpack_parser.hpack_table()->SetMaxBytes(
876         t->settings.acked().header_table_size());
877     grpc_chttp2_act_on_flowctl_action(
878         t->flow_control.SetAckedInitialWindow(
879             t->settings.acked().initial_window_size()),
880         t, nullptr);
881     if (t->settings_ack_watchdog !=
882         grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
883       t->event_engine->Cancel(std::exchange(
884           t->settings_ack_watchdog,
885           grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
886     }
887     // This is more streams than can be started in http2, so setting this
888     // effictively removes the limit for the rest of the connection.
889     t->num_incoming_streams_before_settings_ack =
890         std::numeric_limits<uint32_t>::max();
891   }
892   t->parser = grpc_chttp2_transport::Parser{
893       "settings", grpc_chttp2_settings_parser_parse, &t->simple.settings};
894   return absl::OkStatus();
895 }
896 
init_security_frame_parser(grpc_chttp2_transport * t)897 static grpc_error_handle init_security_frame_parser(grpc_chttp2_transport* t) {
898   grpc_error_handle err =
899       grpc_chttp2_security_frame_parser_begin_frame(&t->security_frame_parser);
900   if (!err.ok()) return err;
901   t->parser = grpc_chttp2_transport::Parser{
902       "security_frame", grpc_chttp2_security_frame_parser_parse,
903       &t->security_frame_parser};
904   return absl::OkStatus();
905 }
906 
parse_frame_slice(grpc_chttp2_transport * t,const grpc_slice & slice,int is_last)907 static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
908                                            const grpc_slice& slice,
909                                            int is_last) {
910   grpc_chttp2_stream* s = t->incoming_stream;
911   GRPC_TRACE_VLOG(http, 2) << "INCOMING[" << t << ";" << s << "]: Parse "
912                            << GRPC_SLICE_LENGTH(slice) << "b "
913                            << (is_last ? "last " : "") << "frame fragment with "
914                            << t->parser.name;
915   grpc_error_handle err =
916       t->parser.parser(t->parser.user_data, t, s, slice, is_last);
917   intptr_t unused;
918   if (GPR_LIKELY(err.ok())) {
919     return err;
920   }
921   GRPC_TRACE_LOG(http, ERROR)
922       << "INCOMING[" << t << ";" << s << "]: Parse failed with " << err;
923   if (grpc_error_get_int(err, grpc_core::StatusIntProperty::kStreamId,
924                          &unused)) {
925     grpc_chttp2_parsing_become_skip_parser(t);
926     if (s) {
927       grpc_chttp2_cancel_stream(t, s, err, true);
928     }
929     return absl::OkStatus();
930   }
931   return err;
932 }
933 
934 typedef void (*maybe_complete_func_type)(grpc_chttp2_transport* t,
935                                          grpc_chttp2_stream* s);
936 static const maybe_complete_func_type maybe_complete_funcs[] = {
937     grpc_chttp2_maybe_complete_recv_initial_metadata,
938     grpc_chttp2_maybe_complete_recv_trailing_metadata};
939 
force_client_rst_stream(void * sp,grpc_error_handle)940 static void force_client_rst_stream(void* sp, grpc_error_handle /*error*/) {
941   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
942   grpc_chttp2_transport* t = s->t.get();
943   if (!s->write_closed) {
944     grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
945                                              &s->call_tracer_wrapper);
946     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
947     grpc_chttp2_mark_stream_closed(t, s, true, true, absl::OkStatus());
948   }
949   GRPC_CHTTP2_STREAM_UNREF(s, "final_rst");
950 }
951 
grpc_chttp2_header_parser_parse(void * hpack_parser,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)952 grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser,
953                                                   grpc_chttp2_transport* t,
954                                                   grpc_chttp2_stream* s,
955                                                   const grpc_slice& slice,
956                                                   int is_last) {
957   auto* parser = static_cast<grpc_core::HPackParser*>(hpack_parser);
958   grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
959   if (s != nullptr) {
960     s->call_tracer_wrapper.RecordIncomingBytes(
961         {0, 0, GRPC_SLICE_LENGTH(slice)});
962     call_tracer =
963         grpc_core::IsCallTracerInTransportEnabled()
964             ? s->arena->GetContext<grpc_core::CallTracerInterface>()
965             : s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
966   }
967   grpc_error_handle error = parser->Parse(
968       slice, is_last != 0, absl::BitGenRef(t->bitgen), call_tracer);
969   if (!error.ok()) {
970     return error;
971   }
972   if (is_last) {
973     // need to check for null stream: this can occur if we receive an invalid
974     // stream id on a header
975     if (s != nullptr) {
976       if (parser->is_boundary()) {
977         if (s->header_frames_received == 2) {
978           return GRPC_ERROR_CREATE("Too many trailer frames");
979         }
980         s->published_metadata[s->header_frames_received] =
981             GRPC_METADATA_PUBLISHED_FROM_WIRE;
982         maybe_complete_funcs[s->header_frames_received](t, s);
983         s->header_frames_received++;
984       }
985       if (parser->is_eof()) {
986         if (t->is_client && !s->write_closed) {
987           // server eof ==> complete closure; we may need to forcefully close
988           // the stream. Wait until the combiner lock is ready to be released
989           // however -- it might be that we receive a RST_STREAM following this
990           // and can avoid the extra write
991           GRPC_CHTTP2_STREAM_REF(s, "final_rst");
992           t->combiner->FinallyRun(
993               GRPC_CLOSURE_CREATE(force_client_rst_stream, s, nullptr),
994               absl::OkStatus());
995         }
996         grpc_chttp2_mark_stream_closed(t, s, true, false, absl::OkStatus());
997       }
998     }
999     parser->FinishFrame();
1000   }
1001   return absl::OkStatus();
1002 }
1003