1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/slice.h>
21 #include <grpc/slice_buffer.h>
22 #include <grpc/support/port_platform.h>
23 #include <inttypes.h>
24 #include <string.h>
25
26 #include <atomic>
27 #include <initializer_list>
28 #include <limits>
29 #include <memory>
30 #include <string>
31 #include <utility>
32
33 #include "absl/base/attributes.h"
34 #include "absl/container/flat_hash_map.h"
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "absl/random/bit_gen_ref.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/types/variant.h"
43 #include "src/core/channelz/channelz.h"
44 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
45 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
46 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
47 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
48 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
49 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
50 #include "src/core/ext/transport/chttp2/transport/frame_security.h"
51 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
52 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
53 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
54 #include "src/core/ext/transport/chttp2/transport/hpack_parser_table.h"
55 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
56 #include "src/core/ext/transport/chttp2/transport/internal.h"
57 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
58 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
59 #include "src/core/lib/debug/trace.h"
60 #include "src/core/lib/experiments/experiments.h"
61 #include "src/core/lib/iomgr/closure.h"
62 #include "src/core/lib/iomgr/combiner.h"
63 #include "src/core/lib/iomgr/endpoint.h"
64 #include "src/core/lib/iomgr/error.h"
65 #include "src/core/lib/slice/slice.h"
66 #include "src/core/lib/transport/bdp_estimator.h"
67 #include "src/core/lib/transport/error_utils.h"
68 #include "src/core/lib/transport/http2_errors.h"
69 #include "src/core/lib/transport/metadata_batch.h"
70 #include "src/core/lib/transport/transport.h"
71 #include "src/core/telemetry/call_tracer.h"
72 #include "src/core/telemetry/stats.h"
73 #include "src/core/telemetry/stats_data.h"
74 #include "src/core/util/random_early_detection.h"
75 #include "src/core/util/ref_counted_ptr.h"
76 #include "src/core/util/status_helper.h"
77
78 using grpc_core::HPackParser;
79
// Forward declarations for the file-local helpers used by the deframer.
//
// init_frame_parser() is called once a full frame header has been read; it
// either returns an error or installs a parser in t->parser, delegating to
// one of the init_*_parser() helpers below based on t->incoming_frame_type.
// The two helpers that take `requests_started` forward it by reference so the
// HEADERS path can count newly started requests.
80 static grpc_error_handle init_frame_parser(grpc_chttp2_transport* t,
81 size_t& requests_started);
82 static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
83 int is_continuation,
84 size_t& requests_started);
85 static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t);
86 static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t);
87 static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t);
88 static grpc_error_handle init_window_update_frame_parser(
89 grpc_chttp2_transport* t);
90 static grpc_error_handle init_ping_parser(grpc_chttp2_transport* t);
91 static grpc_error_handle init_goaway_parser(grpc_chttp2_transport* t);
92 static grpc_error_handle init_security_frame_parser(grpc_chttp2_transport* t);
93 static grpc_error_handle init_non_header_skip_frame_parser(
94 grpc_chttp2_transport* t);
95
// Hands one piece of frame payload to the currently installed parser.
// `is_last` is passed as 1 by the deframer when `slice` completes the frame
// and as 0 when more payload for the same frame is still expected.
96 static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
97 const grpc_slice& slice,
98 int is_last);
99
// Makes a raw byte safe to embed in an error message: values below 128 are
// returned unchanged, anything with the high bit set is replaced by 32
// (an ASCII space).
static char get_utf8_safe_char(char c) {
  const bool high_bit_set = (static_cast<unsigned char>(c) & 0x80u) != 0;
  if (high_bit_set) {
    return 32;
  }
  return c;
}
103
grpc_chttp2_min_read_progress_size(grpc_chttp2_transport * t)104 uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t) {
105 switch (t->deframe_state) {
106 case GRPC_DTS_CLIENT_PREFIX_0:
107 case GRPC_DTS_CLIENT_PREFIX_1:
108 case GRPC_DTS_CLIENT_PREFIX_2:
109 case GRPC_DTS_CLIENT_PREFIX_3:
110 case GRPC_DTS_CLIENT_PREFIX_4:
111 case GRPC_DTS_CLIENT_PREFIX_5:
112 case GRPC_DTS_CLIENT_PREFIX_6:
113 case GRPC_DTS_CLIENT_PREFIX_7:
114 case GRPC_DTS_CLIENT_PREFIX_8:
115 case GRPC_DTS_CLIENT_PREFIX_9:
116 case GRPC_DTS_CLIENT_PREFIX_10:
117 case GRPC_DTS_CLIENT_PREFIX_11:
118 case GRPC_DTS_CLIENT_PREFIX_12:
119 case GRPC_DTS_CLIENT_PREFIX_13:
120 case GRPC_DTS_CLIENT_PREFIX_14:
121 case GRPC_DTS_CLIENT_PREFIX_15:
122 case GRPC_DTS_CLIENT_PREFIX_16:
123 case GRPC_DTS_CLIENT_PREFIX_17:
124 case GRPC_DTS_CLIENT_PREFIX_18:
125 case GRPC_DTS_CLIENT_PREFIX_19:
126 case GRPC_DTS_CLIENT_PREFIX_20:
127 case GRPC_DTS_CLIENT_PREFIX_21:
128 case GRPC_DTS_CLIENT_PREFIX_22:
129 case GRPC_DTS_CLIENT_PREFIX_23:
130 // Need the client prefix *and* the first fixed header to make progress.
131 return 9 + 24 - (t->deframe_state - GRPC_DTS_CLIENT_PREFIX_0);
132 case GRPC_DTS_FH_0:
133 case GRPC_DTS_FH_1:
134 case GRPC_DTS_FH_2:
135 case GRPC_DTS_FH_3:
136 case GRPC_DTS_FH_4:
137 case GRPC_DTS_FH_5:
138 case GRPC_DTS_FH_6:
139 case GRPC_DTS_FH_7:
140 case GRPC_DTS_FH_8:
141 return 9 - (t->deframe_state - GRPC_DTS_FH_0);
142 case GRPC_DTS_FRAME:
143 return t->incoming_frame_size;
144 }
145 GPR_UNREACHABLE_CODE(return 1);
146 }
147
148 namespace {
149 struct KnownFlag {
150 uint8_t flag;
151 absl::string_view name;
152 };
153
MakeFrameTypeString(absl::string_view frame_type,uint8_t flags,std::initializer_list<KnownFlag> known_flags)154 std::string MakeFrameTypeString(absl::string_view frame_type, uint8_t flags,
155 std::initializer_list<KnownFlag> known_flags) {
156 std::string result(frame_type);
157 for (const KnownFlag& known_flag : known_flags) {
158 if (flags & known_flag.flag) {
159 absl::StrAppend(&result, ":", known_flag.name);
160 flags &= ~known_flag.flag;
161 }
162 }
163 if (flags != 0) {
164 absl::StrAppend(&result, ":UNKNOWN_FLAGS=0x",
165 absl::Hex(flags, absl::kZeroPad2));
166 }
167 return result;
168 }
169
FrameTypeString(uint8_t frame_type,uint8_t flags)170 std::string FrameTypeString(uint8_t frame_type, uint8_t flags) {
171 switch (frame_type) {
172 case GRPC_CHTTP2_FRAME_DATA:
173 return MakeFrameTypeString(
174 "DATA", flags, {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"}});
175 case GRPC_CHTTP2_FRAME_HEADER:
176 return MakeFrameTypeString(
177 "HEADERS", flags,
178 {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"},
179 {GRPC_CHTTP2_DATA_FLAG_END_HEADERS, "END_HEADERS"},
180 {GRPC_CHTTP2_FLAG_HAS_PRIORITY, "PRIORITY"}});
181 case GRPC_CHTTP2_FRAME_CONTINUATION:
182 return MakeFrameTypeString(
183 "HEADERS", flags,
184 {{GRPC_CHTTP2_DATA_FLAG_END_STREAM, "END_STREAM"},
185 {GRPC_CHTTP2_DATA_FLAG_END_HEADERS, "END_HEADERS"},
186 {GRPC_CHTTP2_FLAG_HAS_PRIORITY, "PRIORITY"}});
187 case GRPC_CHTTP2_FRAME_RST_STREAM:
188 return MakeFrameTypeString("RST_STREAM", flags, {});
189 case GRPC_CHTTP2_FRAME_SETTINGS:
190 return MakeFrameTypeString("SETTINGS", flags,
191 {{GRPC_CHTTP2_FLAG_ACK, "ACK"}});
192 case GRPC_CHTTP2_FRAME_PING:
193 return MakeFrameTypeString("PING", flags,
194 {{GRPC_CHTTP2_FLAG_ACK, "ACK"}});
195 case GRPC_CHTTP2_FRAME_GOAWAY:
196 return MakeFrameTypeString("GOAWAY", flags, {});
197 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
198 return MakeFrameTypeString("WINDOW_UPDATE", flags, {});
199 case GRPC_CHTTP2_FRAME_SECURITY:
200 return MakeFrameTypeString("SECURITY", flags, {});
201 default:
202 return MakeFrameTypeString(
203 absl::StrCat("UNKNOWN_FRAME_TYPE_", static_cast<int>(frame_type)),
204 flags, {});
205 }
206 }
207 } // namespace
208
// Feeds the bytes of `slice` through the deframing state machine whose
// position is kept in t->deframe_state, resuming wherever the previous call
// stopped.
//
// Returns one of:
//   - absl::OkStatus() when `slice` is empty or has been consumed entirely;
//   - an error status when the client connect string does not match, when a
//     frame is larger than the acked max frame size, or when
//     init_frame_parser() / parse_frame_slice() fails;
//   - a size_t with the number of bytes of `slice` consumed so far, when
//     processing stops at a frame boundary because
//     requests_started >= t->max_requests_per_read.
//
// `requests_started` is forwarded to init_frame_parser(). It is also forced
// to its maximum value after a complete RST_STREAM frame that is followed by
// more bytes in the same slice.
grpc_chttp2_perform_read(grpc_chttp2_transport * t,const grpc_slice & slice,size_t & requests_started)209 absl::variant<size_t, absl::Status> grpc_chttp2_perform_read(
210 grpc_chttp2_transport* t, const grpc_slice& slice,
211 size_t& requests_started) {
212 GRPC_LATENT_SEE_INNER_SCOPE("grpc_chttp2_perform_read");
213
// beg/end delimit the slice; cur is the read cursor within it.
214 const uint8_t* beg = GRPC_SLICE_START_PTR(slice);
215 const uint8_t* end = GRPC_SLICE_END_PTR(slice);
216 const uint8_t* cur = beg;
217 grpc_error_handle err;
218
219 if (cur == end) return absl::OkStatus();
220
// Enter at the handler for the current state. The cases fall through, so a
// single call processes as many consecutive states as the slice has bytes for.
221 switch (t->deframe_state) {
222 case GRPC_DTS_CLIENT_PREFIX_0:
223 case GRPC_DTS_CLIENT_PREFIX_1:
224 case GRPC_DTS_CLIENT_PREFIX_2:
225 case GRPC_DTS_CLIENT_PREFIX_3:
226 case GRPC_DTS_CLIENT_PREFIX_4:
227 case GRPC_DTS_CLIENT_PREFIX_5:
228 case GRPC_DTS_CLIENT_PREFIX_6:
229 case GRPC_DTS_CLIENT_PREFIX_7:
230 case GRPC_DTS_CLIENT_PREFIX_8:
231 case GRPC_DTS_CLIENT_PREFIX_9:
232 case GRPC_DTS_CLIENT_PREFIX_10:
233 case GRPC_DTS_CLIENT_PREFIX_11:
234 case GRPC_DTS_CLIENT_PREFIX_12:
235 case GRPC_DTS_CLIENT_PREFIX_13:
236 case GRPC_DTS_CLIENT_PREFIX_14:
237 case GRPC_DTS_CLIENT_PREFIX_15:
238 case GRPC_DTS_CLIENT_PREFIX_16:
239 case GRPC_DTS_CLIENT_PREFIX_17:
240 case GRPC_DTS_CLIENT_PREFIX_18:
241 case GRPC_DTS_CLIENT_PREFIX_19:
242 case GRPC_DTS_CLIENT_PREFIX_20:
243 case GRPC_DTS_CLIENT_PREFIX_21:
244 case GRPC_DTS_CLIENT_PREFIX_22:
245 case GRPC_DTS_CLIENT_PREFIX_23:
// Compare incoming bytes against the client connect string one byte at a
// time. The state value doubles as the index into the string and is advanced
// by one per matched byte until it reaches GRPC_DTS_FH_0.
246 while (cur != end && t->deframe_state != GRPC_DTS_FH_0) {
247 if (*cur != GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state]) {
248 return GRPC_ERROR_CREATE(absl::StrFormat(
249 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
250 "at byte %d",
251 get_utf8_safe_char(
252 GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state]),
253 static_cast<int>(static_cast<uint8_t>(
254 GRPC_CHTTP2_CLIENT_CONNECT_STRING[t->deframe_state])),
255 get_utf8_safe_char(*cur), static_cast<int>(*cur),
256 t->deframe_state));
257 }
258 ++cur;
259 // NOLINTNEXTLINE(bugprone-misplaced-widening-cast)
260 t->deframe_state = static_cast<grpc_chttp2_deframe_transport_state>(
261 1 + static_cast<int>(t->deframe_state));
262 }
// Slice exhausted (possibly mid-prefix); the state already records progress.
263 if (cur == end) {
264 return absl::OkStatus();
265 }
// Frame boundary. Also re-entered via goto after each completed frame while
// bytes remain. If the per-read request budget is used up, stop here and
// report how many bytes were consumed.
266 dts_fh_0:
267 if (requests_started >= t->max_requests_per_read) {
268 t->deframe_state = GRPC_DTS_FH_0;
269 return static_cast<size_t>(cur - beg);
270 }
271 ABSL_FALLTHROUGH_INTENDED;
// Header bytes 0-2: frame length, most significant byte first.
272 case GRPC_DTS_FH_0:
273 DCHECK_LT(cur, end);
274 t->incoming_frame_size = (static_cast<uint32_t>(*cur)) << 16;
275 if (++cur == end) {
276 t->deframe_state = GRPC_DTS_FH_1;
277 return absl::OkStatus();
278 }
279 ABSL_FALLTHROUGH_INTENDED;
280 case GRPC_DTS_FH_1:
281 DCHECK_LT(cur, end);
282 t->incoming_frame_size |= (static_cast<uint32_t>(*cur)) << 8;
283 if (++cur == end) {
284 t->deframe_state = GRPC_DTS_FH_2;
285 return absl::OkStatus();
286 }
287 ABSL_FALLTHROUGH_INTENDED;
288 case GRPC_DTS_FH_2:
289 DCHECK_LT(cur, end);
290 t->incoming_frame_size |= *cur;
291 if (++cur == end) {
292 t->deframe_state = GRPC_DTS_FH_3;
293 return absl::OkStatus();
294 }
295 ABSL_FALLTHROUGH_INTENDED;
// Header byte 3: frame type.
296 case GRPC_DTS_FH_3:
297 DCHECK_LT(cur, end);
298 t->incoming_frame_type = *cur;
299 if (++cur == end) {
300 t->deframe_state = GRPC_DTS_FH_4;
301 return absl::OkStatus();
302 }
303 ABSL_FALLTHROUGH_INTENDED;
// Header byte 4: frame flags.
304 case GRPC_DTS_FH_4:
305 DCHECK_LT(cur, end);
306 t->incoming_frame_flags = *cur;
307 if (++cur == end) {
308 t->deframe_state = GRPC_DTS_FH_5;
309 return absl::OkStatus();
310 }
311 ABSL_FALLTHROUGH_INTENDED;
// Header bytes 5-8: stream id, most significant byte first. The top bit of
// the first byte is masked off.
312 case GRPC_DTS_FH_5:
313 DCHECK_LT(cur, end);
314 t->incoming_stream_id = ((static_cast<uint32_t>(*cur)) & 0x7f) << 24;
315 if (++cur == end) {
316 t->deframe_state = GRPC_DTS_FH_6;
317 return absl::OkStatus();
318 }
319 ABSL_FALLTHROUGH_INTENDED;
320 case GRPC_DTS_FH_6:
321 DCHECK_LT(cur, end);
322 t->incoming_stream_id |= (static_cast<uint32_t>(*cur)) << 16;
323 if (++cur == end) {
324 t->deframe_state = GRPC_DTS_FH_7;
325 return absl::OkStatus();
326 }
327 ABSL_FALLTHROUGH_INTENDED;
328 case GRPC_DTS_FH_7:
329 DCHECK_LT(cur, end);
330 t->incoming_stream_id |= (static_cast<uint32_t>(*cur)) << 8;
331 if (++cur == end) {
332 t->deframe_state = GRPC_DTS_FH_8;
333 return absl::OkStatus();
334 }
335 ABSL_FALLTHROUGH_INTENDED;
336 case GRPC_DTS_FH_8:
337 DCHECK_LT(cur, end);
338 t->incoming_stream_id |= (static_cast<uint32_t>(*cur));
339 GRPC_TRACE_LOG(http, INFO)
340 << "INCOMING[" << t << "]: "
341 << FrameTypeString(t->incoming_frame_type, t->incoming_frame_flags)
342 << " len:" << t->incoming_frame_size
343 << absl::StrFormat(" id:0x%08x", t->incoming_stream_id);
// The header is complete: switch to the payload state and choose a parser for
// this frame. Note that cur still points at the last header byte here; it is
// advanced below.
344 t->deframe_state = GRPC_DTS_FRAME;
345 err = init_frame_parser(t, requests_started);
346 if (!err.ok()) {
347 return err;
348 }
// A zero-length frame has no payload bytes to wait for, so the parser is
// given an empty final slice immediately and the machine returns to the
// frame boundary.
349 if (t->incoming_frame_size == 0) {
350 err = parse_frame_slice(t, grpc_empty_slice(), 1);
351 if (!err.ok()) {
352 return err;
353 }
354 t->incoming_stream = nullptr;
355 if (++cur == end) {
356 t->deframe_state = GRPC_DTS_FH_0;
357 return absl::OkStatus();
358 }
359 goto dts_fh_0; // loop
// Non-empty frames are checked against the acked max frame size. This check
// runs after init_frame_parser() has already been called for the frame.
360 } else if (t->incoming_frame_size >
361 t->settings.acked().max_frame_size()) {
362 return GRPC_ERROR_CREATE(absl::StrFormat(
363 "Frame size %d is larger than max frame size %d",
364 t->incoming_frame_size, t->settings.acked().max_frame_size()));
365 }
366 if (++cur == end) {
367 return absl::OkStatus();
368 }
369 ABSL_FALLTHROUGH_INTENDED;
// Payload. Three cases, depending on how the bytes left in the slice compare
// with the bytes still owed for this frame (t->incoming_frame_size).
370 case GRPC_DTS_FRAME:
371 DCHECK_LT(cur, end);
// (1) The slice ends exactly where the frame ends: parse the final piece and
// go back to expecting a frame header.
372 if (static_cast<uint32_t>(end - cur) == t->incoming_frame_size) {
373 err = parse_frame_slice(
374 t,
375 grpc_slice_sub_no_ref(slice, static_cast<size_t>(cur - beg),
376 static_cast<size_t>(end - beg)),
377 1);
378 if (!err.ok()) {
379 return err;
380 }
381 t->deframe_state = GRPC_DTS_FH_0;
382 t->incoming_stream = nullptr;
383 return absl::OkStatus();
// (2) The slice holds the rest of this frame plus more data: parse just this
// frame's bytes, then loop back to the frame boundary for the remainder.
384 } else if (static_cast<uint32_t>(end - cur) > t->incoming_frame_size) {
385 size_t cur_offset = static_cast<size_t>(cur - beg);
386 err = parse_frame_slice(
387 t,
388 grpc_slice_sub_no_ref(slice, cur_offset,
389 cur_offset + t->incoming_frame_size),
390 1);
391 if (!err.ok()) {
392 return err;
393 }
394 cur += t->incoming_frame_size;
395 t->incoming_stream = nullptr;
// After a RST_STREAM, max out the counter so the budget check at dts_fh_0
// trips and this call returns at the next frame boundary.
396 if (t->incoming_frame_type == GRPC_CHTTP2_FRAME_RST_STREAM) {
397 requests_started = std::numeric_limits<size_t>::max();
398 }
399 goto dts_fh_0; // loop
// (3) The slice ends before the frame does: parse what is available as a
// non-final piece and record how many payload bytes are still outstanding.
400 } else {
401 err = parse_frame_slice(
402 t,
403 grpc_slice_sub_no_ref(slice, static_cast<size_t>(cur - beg),
404 static_cast<size_t>(end - beg)),
405 0);
406 if (!err.ok()) {
407 return err;
408 }
409 t->incoming_frame_size -= static_cast<uint32_t>(end - cur);
410 return absl::OkStatus();
411 }
412 GPR_UNREACHABLE_CODE(return absl::OkStatus());
413 }
414
415 GPR_UNREACHABLE_CODE(return absl::OkStatus());
416 }
417
init_frame_parser(grpc_chttp2_transport * t,size_t & requests_started)418 static grpc_error_handle init_frame_parser(grpc_chttp2_transport* t,
419 size_t& requests_started) {
420 if (t->is_first_frame &&
421 t->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) {
422 return GRPC_ERROR_CREATE(absl::StrCat(
423 "Expected SETTINGS frame as the first frame, got frame type ",
424 t->incoming_frame_type));
425 }
426 t->is_first_frame = false;
427 if (t->expect_continuation_stream_id != 0) {
428 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
429 return GRPC_ERROR_CREATE(
430 absl::StrFormat("Expected CONTINUATION frame, got frame type %02x",
431 t->incoming_frame_type));
432 }
433 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
434 return GRPC_ERROR_CREATE(absl::StrFormat(
435 "Expected CONTINUATION frame for grpc_chttp2_stream %08x, got "
436 "grpc_chttp2_stream %08x",
437 t->expect_continuation_stream_id, t->incoming_stream_id));
438 }
439 return init_header_frame_parser(t, 1, requests_started);
440 }
441 switch (t->incoming_frame_type) {
442 case GRPC_CHTTP2_FRAME_DATA:
443 return init_data_frame_parser(t);
444 case GRPC_CHTTP2_FRAME_HEADER:
445 return init_header_frame_parser(t, 0, requests_started);
446 case GRPC_CHTTP2_FRAME_CONTINUATION:
447 return GRPC_ERROR_CREATE("Unexpected CONTINUATION frame");
448 case GRPC_CHTTP2_FRAME_RST_STREAM:
449 return init_rst_stream_parser(t);
450 case GRPC_CHTTP2_FRAME_SETTINGS:
451 return init_settings_frame_parser(t);
452 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
453 return init_window_update_frame_parser(t);
454 case GRPC_CHTTP2_FRAME_PING:
455 return init_ping_parser(t);
456 case GRPC_CHTTP2_FRAME_GOAWAY:
457 return init_goaway_parser(t);
458 case GRPC_CHTTP2_FRAME_SECURITY:
459 if (!t->settings.peer().allow_security_frame()) {
460 if (GRPC_TRACE_FLAG_ENABLED(http)) {
461 LOG(ERROR) << "Security frame received but not allowed, ignoring";
462 }
463 return init_non_header_skip_frame_parser(t);
464 }
465 return init_security_frame_parser(t);
466 default:
467 GRPC_TRACE_LOG(http, ERROR)
468 << "Unknown frame type "
469 << absl::StrFormat("%02x", t->incoming_frame_type);
470 return init_non_header_skip_frame_parser(t);
471 }
472 }
473
skip_parser(void *,grpc_chttp2_transport *,grpc_chttp2_stream *,const grpc_slice &,int)474 static grpc_error_handle skip_parser(void* /*parser*/,
475 grpc_chttp2_transport* /*t*/,
476 grpc_chttp2_stream* /*s*/,
477 const grpc_slice& /*slice*/,
478 int /*is_last*/) {
479 return absl::OkStatus();
480 }
481
hpack_boundary_type(grpc_chttp2_transport * t,bool is_eoh)482 static HPackParser::Boundary hpack_boundary_type(grpc_chttp2_transport* t,
483 bool is_eoh) {
484 if (is_eoh) {
485 if (t->header_eof) {
486 return HPackParser::Boundary::EndOfStream;
487 } else {
488 return HPackParser::Boundary::EndOfHeaders;
489 }
490 } else {
491 return HPackParser::Boundary::None;
492 }
493 }
494
hpack_parser_log_info(grpc_chttp2_transport * t,HPackParser::LogInfo::Type type)495 static HPackParser::LogInfo hpack_parser_log_info(
496 grpc_chttp2_transport* t, HPackParser::LogInfo::Type type) {
497 return HPackParser::LogInfo{
498 t->incoming_stream_id,
499 type,
500 t->is_client,
501 };
502 }
503
init_header_skip_frame_parser(grpc_chttp2_transport * t,HPackParser::Priority priority_type,bool is_eoh)504 static grpc_error_handle init_header_skip_frame_parser(
505 grpc_chttp2_transport* t, HPackParser::Priority priority_type,
506 bool is_eoh) {
507 t->parser = grpc_chttp2_transport::Parser{
508 "header", grpc_chttp2_header_parser_parse, &t->hpack_parser};
509 t->hpack_parser.BeginFrame(
510 nullptr,
511 /*metadata_size_soft_limit=*/
512 t->max_header_list_size_soft_limit,
513 /*metadata_size_hard_limit=*/
514 t->settings.acked().max_header_list_size(),
515 hpack_boundary_type(t, is_eoh), priority_type,
516 hpack_parser_log_info(t, HPackParser::LogInfo::kDontKnow));
517 return absl::OkStatus();
518 }
519
init_non_header_skip_frame_parser(grpc_chttp2_transport * t)520 static grpc_error_handle init_non_header_skip_frame_parser(
521 grpc_chttp2_transport* t) {
522 t->parser =
523 grpc_chttp2_transport::Parser{"skip_parser", skip_parser, nullptr};
524 return absl::OkStatus();
525 }
526
grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport * t)527 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t) {
528 if (t->parser.parser == grpc_chttp2_header_parser_parse) {
529 t->hpack_parser.StopBufferingFrame();
530 } else {
531 t->parser =
532 grpc_chttp2_transport::Parser{"skip_parser", skip_parser, nullptr};
533 }
534 }
535
init_data_frame_parser(grpc_chttp2_transport * t)536 static grpc_error_handle init_data_frame_parser(grpc_chttp2_transport* t) {
537 // Update BDP accounting since we have received a data frame.
538 grpc_core::BdpEstimator* bdp_est = t->flow_control.bdp_estimator();
539 if (bdp_est) {
540 if (t->bdp_ping_blocked) {
541 t->bdp_ping_blocked = false;
542 schedule_bdp_ping_locked(t->Ref());
543 }
544 bdp_est->AddIncomingBytes(t->incoming_frame_size);
545 }
546 grpc_chttp2_stream* s =
547 grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
548 absl::Status status;
549 grpc_core::chttp2::FlowControlAction action;
550 if (s == nullptr) {
551 grpc_core::chttp2::TransportFlowControl::IncomingUpdateContext upd(
552 &t->flow_control);
553 status = upd.RecvData(t->incoming_frame_size);
554 action = upd.MakeAction();
555 } else {
556 grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
557 &s->flow_control);
558 status = upd.RecvData(t->incoming_frame_size);
559 action = upd.MakeAction();
560 }
561 grpc_chttp2_act_on_flowctl_action(action, t, s);
562 if (!status.ok()) {
563 goto error_handler;
564 }
565 if (s == nullptr) {
566 return init_non_header_skip_frame_parser(t);
567 }
568 s->received_bytes += t->incoming_frame_size;
569 s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
570 if (s->read_closed) {
571 return init_non_header_skip_frame_parser(t);
572 }
573 status =
574 grpc_chttp2_data_parser_begin_frame(t->incoming_frame_flags, s->id, s);
575 error_handler:
576 if (status.ok()) {
577 t->incoming_stream = s;
578 t->parser = grpc_chttp2_transport::Parser{
579 "data", grpc_chttp2_data_parser_parse, nullptr};
580 t->ping_rate_policy.ReceivedDataFrame();
581 return absl::OkStatus();
582 } else if (s != nullptr) {
583 // handle stream errors by closing the stream
584 grpc_chttp2_mark_stream_closed(t, s, true, false,
585 absl_status_to_grpc_error(status));
586 grpc_chttp2_add_rst_stream_to_next_write(t, t->incoming_stream_id,
587 GRPC_HTTP2_PROTOCOL_ERROR,
588 &s->call_tracer_wrapper);
589 return init_non_header_skip_frame_parser(t);
590 } else {
591 return absl_status_to_grpc_error(status);
592 }
593 }
594
init_header_frame_parser(grpc_chttp2_transport * t,int is_continuation,size_t & requests_started)595 static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
596 int is_continuation,
597 size_t& requests_started) {
598 const bool is_eoh =
599 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
600 grpc_chttp2_stream* s;
601
602 // TODO(ctiller): when to increment header_frames_received?
603
604 if (is_eoh) {
605 t->expect_continuation_stream_id = 0;
606 } else {
607 t->expect_continuation_stream_id = t->incoming_stream_id;
608 }
609
610 if (!is_continuation) {
611 t->header_eof =
612 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
613 }
614
615 const auto priority_type = !is_continuation && (t->incoming_frame_flags &
616 GRPC_CHTTP2_FLAG_HAS_PRIORITY)
617 ? HPackParser::Priority::Included
618 : HPackParser::Priority::None;
619
620 t->ping_rate_policy.ReceivedDataFrame();
621
622 // could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream
623 s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
624 if (s == nullptr) {
625 if (GPR_UNLIKELY(is_continuation)) {
626 GRPC_CHTTP2_IF_TRACING(ERROR)
627 << "grpc_chttp2_stream disbanded before CONTINUATION received";
628 return init_header_skip_frame_parser(t, priority_type, is_eoh);
629 }
630 if (t->is_client) {
631 if (GPR_LIKELY((t->incoming_stream_id & 1) &&
632 t->incoming_stream_id < t->next_stream_id)) {
633 // this is an old (probably cancelled) grpc_chttp2_stream
634 } else {
635 GRPC_CHTTP2_IF_TRACING(ERROR)
636 << "ignoring new grpc_chttp2_stream creation on client";
637 }
638 return init_header_skip_frame_parser(t, priority_type, is_eoh);
639 } else if (GPR_UNLIKELY(t->last_new_stream_id >= t->incoming_stream_id)) {
640 GRPC_CHTTP2_IF_TRACING(ERROR)
641 << "ignoring out of order new grpc_chttp2_stream request on server; "
642 "last grpc_chttp2_stream id="
643 << t->last_new_stream_id
644 << ", new grpc_chttp2_stream id=" << t->incoming_stream_id;
645 return init_header_skip_frame_parser(t, priority_type, is_eoh);
646 } else if (GPR_UNLIKELY((t->incoming_stream_id & 1) == 0)) {
647 GRPC_CHTTP2_IF_TRACING(ERROR)
648 << "ignoring grpc_chttp2_stream with non-client generated index "
649 << t->incoming_stream_id;
650 return init_header_skip_frame_parser(t, priority_type, is_eoh);
651 } else if (GPR_UNLIKELY(t->stream_map.size() + t->extra_streams >=
652 t->settings.acked().max_concurrent_streams())) {
653 ++t->num_pending_induced_frames;
654 grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(
655 t->incoming_stream_id,
656 GRPC_HTTP2_REFUSED_STREAM, nullptr));
657 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
658 return init_header_skip_frame_parser(t, priority_type, is_eoh);
659 } else if (grpc_core::IsRqFastRejectEnabled() &&
660 GPR_UNLIKELY(t->memory_owner.IsMemoryPressureHigh())) {
661 // We have more streams allocated than we'd like, so apply some pushback
662 // by refusing this stream.
663 grpc_core::global_stats().IncrementRqCallsRejected();
664 ++t->num_pending_induced_frames;
665 grpc_slice_buffer_add(
666 &t->qbuf,
667 grpc_chttp2_rst_stream_create(t->incoming_stream_id,
668 GRPC_HTTP2_ENHANCE_YOUR_CALM, nullptr));
669 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
670 return init_header_skip_frame_parser(t, priority_type, is_eoh);
671 } else if (GPR_UNLIKELY(
672 t->max_concurrent_streams_overload_protection &&
673 t->streams_allocated.load(std::memory_order_relaxed) >
674 t->settings.local().max_concurrent_streams())) {
675 // We have more streams allocated than we'd like, so apply some pushback
676 // by refusing this stream.
677 ++t->num_pending_induced_frames;
678 grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(
679 t->incoming_stream_id,
680 GRPC_HTTP2_REFUSED_STREAM, nullptr));
681 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
682 return init_header_skip_frame_parser(t, priority_type, is_eoh);
683 } else if (GPR_UNLIKELY(t->stream_map.size() >=
684 t->settings.local().max_concurrent_streams() &&
685 grpc_core::RandomEarlyDetection(
686 t->settings.local().max_concurrent_streams(),
687 t->settings.acked().max_concurrent_streams())
688 .Reject(t->stream_map.size(), t->bitgen))) {
689 // We are under the limit of max concurrent streams for the current
690 // setting, but are over the next value that will be advertised.
691 // Apply some backpressure by randomly not accepting new streams.
692 ++t->num_pending_induced_frames;
693 grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create(
694 t->incoming_stream_id,
695 GRPC_HTTP2_REFUSED_STREAM, nullptr));
696 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
697 return init_header_skip_frame_parser(t, priority_type, is_eoh);
698 } else if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT ||
699 t->sent_goaway_state ==
700 GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
701 GRPC_CHTTP2_IF_TRACING(INFO)
702 << "transport:" << t
703 << " SERVER peer:" << t->peer_string.as_string_view()
704 << " Final GOAWAY sent. Ignoring new grpc_chttp2_stream request "
705 "id="
706 << t->incoming_stream_id
707 << ", last grpc_chttp2_stream id=" << t->last_new_stream_id;
708 ;
709 return init_header_skip_frame_parser(t, priority_type, is_eoh);
710 } else if (t->num_incoming_streams_before_settings_ack == 0) {
711 GRPC_CHTTP2_IF_TRACING(ERROR)
712 << "transport:" << t
713 << " SERVER peer:" << t->peer_string.as_string_view()
714 << " rejecting grpc_chttp2_stream id=" << t->incoming_stream_id
715 << ", last grpc_chttp2_stream id=" << t->last_new_stream_id
716 << " before settings have been acknowledged";
717 ++t->num_pending_induced_frames;
718 grpc_slice_buffer_add(
719 &t->qbuf,
720 grpc_chttp2_rst_stream_create(t->incoming_stream_id,
721 GRPC_HTTP2_ENHANCE_YOUR_CALM, nullptr));
722 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
723 t->last_new_stream_id = t->incoming_stream_id;
724 return init_header_skip_frame_parser(t, priority_type, is_eoh);
725 }
726 --t->num_incoming_streams_before_settings_ack;
727 t->last_new_stream_id = t->incoming_stream_id;
728 s = t->incoming_stream =
729 grpc_chttp2_parsing_accept_stream(t, t->incoming_stream_id);
730 ++requests_started;
731 if (GPR_UNLIKELY(s == nullptr)) {
732 GRPC_CHTTP2_IF_TRACING(ERROR) << "grpc_chttp2_stream not accepted";
733 return init_header_skip_frame_parser(t, priority_type, is_eoh);
734 }
735 if (GRPC_TRACE_FLAG_ENABLED(http) ||
736 GRPC_TRACE_FLAG_ENABLED(chttp2_new_stream)) {
737 LOG(INFO) << "[t:" << t << " fd:" << grpc_endpoint_get_fd(t->ep.get())
738 << " peer:" << t->peer_string.as_string_view()
739 << "] Accepting new stream; "
740 "num_incoming_streams_before_settings_ack="
741 << t->num_incoming_streams_before_settings_ack;
742 }
743 if (t->channelz_socket != nullptr) {
744 t->channelz_socket->RecordStreamStartedFromRemote();
745 }
746 } else {
747 t->incoming_stream = s;
748 }
749 DCHECK_NE(s, nullptr);
750 s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
751 if (GPR_UNLIKELY(s->read_closed)) {
752 GRPC_CHTTP2_IF_TRACING(ERROR)
753 << "skipping already closed grpc_chttp2_stream header";
754 t->incoming_stream = nullptr;
755 return init_header_skip_frame_parser(t, priority_type, is_eoh);
756 }
757 t->parser = grpc_chttp2_transport::Parser{
758 "header", grpc_chttp2_header_parser_parse, &t->hpack_parser};
759 if (t->header_eof) {
760 s->eos_received = true;
761 }
762 grpc_metadata_batch* incoming_metadata_buffer = nullptr;
763 HPackParser::LogInfo::Type frame_type = HPackParser::LogInfo::kDontKnow;
764 switch (s->header_frames_received) {
765 case 0:
766 if (t->is_client && t->header_eof) {
767 GRPC_CHTTP2_IF_TRACING(INFO) << "parsing Trailers-Only";
768 if (s->trailing_metadata_available != nullptr) {
769 *s->trailing_metadata_available = true;
770 }
771 s->parsed_trailers_only = true;
772 s->trailing_metadata_buffer.Set(grpc_core::GrpcTrailersOnly(), true);
773 s->initial_metadata_buffer.Set(grpc_core::GrpcTrailersOnly(), true);
774 incoming_metadata_buffer = &s->trailing_metadata_buffer;
775 frame_type = HPackParser::LogInfo::kTrailers;
776 } else {
777 GRPC_CHTTP2_IF_TRACING(INFO) << "parsing initial_metadata";
778 incoming_metadata_buffer = &s->initial_metadata_buffer;
779 frame_type = HPackParser::LogInfo::kHeaders;
780 }
781 break;
782 case 1:
783 GRPC_CHTTP2_IF_TRACING(INFO) << "parsing trailing_metadata";
784 incoming_metadata_buffer = &s->trailing_metadata_buffer;
785 frame_type = HPackParser::LogInfo::kTrailers;
786 break;
787 case 2:
788 LOG(ERROR) << "too many header frames received";
789 return init_header_skip_frame_parser(t, priority_type, is_eoh);
790 }
791 if (frame_type == HPackParser::LogInfo::kTrailers && !t->header_eof) {
792 return GRPC_ERROR_CREATE(
793 "Trailing metadata frame received without an end-o-stream");
794 }
795 t->hpack_parser.BeginFrame(incoming_metadata_buffer,
796 /*metadata_size_soft_limit=*/
797 t->max_header_list_size_soft_limit,
798 /*metadata_size_hard_limit=*/
799 t->settings.acked().max_header_list_size(),
800 hpack_boundary_type(t, is_eoh), priority_type,
801 hpack_parser_log_info(t, frame_type));
802 return absl::OkStatus();
803 }
804
init_window_update_frame_parser(grpc_chttp2_transport * t)805 static grpc_error_handle init_window_update_frame_parser(
806 grpc_chttp2_transport* t) {
807 grpc_error_handle err = grpc_chttp2_window_update_parser_begin_frame(
808 &t->simple.window_update, t->incoming_frame_size,
809 t->incoming_frame_flags);
810 if (!err.ok()) return err;
811 if (t->incoming_stream_id != 0) {
812 grpc_chttp2_stream* s = t->incoming_stream =
813 grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
814 if (s == nullptr) {
815 GRPC_TRACE_LOG(http, ERROR) << "Stream " << t->incoming_stream_id
816 << " not found, ignoring WINDOW_UPDATE";
817 return init_non_header_skip_frame_parser(t);
818 }
819 s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
820 }
821 t->parser = grpc_chttp2_transport::Parser{
822 "window_update", grpc_chttp2_window_update_parser_parse,
823 &t->simple.window_update};
824 return absl::OkStatus();
825 }
826
init_ping_parser(grpc_chttp2_transport * t)827 static grpc_error_handle init_ping_parser(grpc_chttp2_transport* t) {
828 grpc_error_handle err = grpc_chttp2_ping_parser_begin_frame(
829 &t->simple.ping, t->incoming_frame_size, t->incoming_frame_flags);
830 if (!err.ok()) return err;
831 t->parser = grpc_chttp2_transport::Parser{
832 "ping", grpc_chttp2_ping_parser_parse, &t->simple.ping};
833 return absl::OkStatus();
834 }
835
init_rst_stream_parser(grpc_chttp2_transport * t)836 static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) {
837 grpc_error_handle err = grpc_chttp2_rst_stream_parser_begin_frame(
838 &t->simple.rst_stream, t->incoming_frame_size, t->incoming_frame_flags);
839 if (!err.ok()) return err;
840 grpc_chttp2_stream* s = t->incoming_stream =
841 grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id);
842 if (!t->incoming_stream) {
843 return init_non_header_skip_frame_parser(t);
844 }
845 s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0});
846 t->parser = grpc_chttp2_transport::Parser{
847 "rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream};
848 return absl::OkStatus();
849 }
850
init_goaway_parser(grpc_chttp2_transport * t)851 static grpc_error_handle init_goaway_parser(grpc_chttp2_transport* t) {
852 grpc_error_handle err = grpc_chttp2_goaway_parser_begin_frame(
853 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
854 if (!err.ok()) return err;
855 t->parser = grpc_chttp2_transport::Parser{
856 "goaway", grpc_chttp2_goaway_parser_parse, &t->goaway_parser};
857 return absl::OkStatus();
858 }
859
init_settings_frame_parser(grpc_chttp2_transport * t)860 static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) {
861 if (t->incoming_stream_id != 0) {
862 return GRPC_ERROR_CREATE("Settings frame received for grpc_chttp2_stream");
863 }
864
865 grpc_error_handle err = grpc_chttp2_settings_parser_begin_frame(
866 &t->simple.settings, t->incoming_frame_size, t->incoming_frame_flags,
867 t->settings.mutable_peer());
868 if (!err.ok()) {
869 return err;
870 }
871 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
872 if (!t->settings.AckLastSend()) {
873 return GRPC_ERROR_CREATE("Received unexpected settings ack");
874 }
875 t->hpack_parser.hpack_table()->SetMaxBytes(
876 t->settings.acked().header_table_size());
877 grpc_chttp2_act_on_flowctl_action(
878 t->flow_control.SetAckedInitialWindow(
879 t->settings.acked().initial_window_size()),
880 t, nullptr);
881 if (t->settings_ack_watchdog !=
882 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
883 t->event_engine->Cancel(std::exchange(
884 t->settings_ack_watchdog,
885 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
886 }
887 // This is more streams than can be started in http2, so setting this
888 // effictively removes the limit for the rest of the connection.
889 t->num_incoming_streams_before_settings_ack =
890 std::numeric_limits<uint32_t>::max();
891 }
892 t->parser = grpc_chttp2_transport::Parser{
893 "settings", grpc_chttp2_settings_parser_parse, &t->simple.settings};
894 return absl::OkStatus();
895 }
896
init_security_frame_parser(grpc_chttp2_transport * t)897 static grpc_error_handle init_security_frame_parser(grpc_chttp2_transport* t) {
898 grpc_error_handle err =
899 grpc_chttp2_security_frame_parser_begin_frame(&t->security_frame_parser);
900 if (!err.ok()) return err;
901 t->parser = grpc_chttp2_transport::Parser{
902 "security_frame", grpc_chttp2_security_frame_parser_parse,
903 &t->security_frame_parser};
904 return absl::OkStatus();
905 }
906
parse_frame_slice(grpc_chttp2_transport * t,const grpc_slice & slice,int is_last)907 static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
908 const grpc_slice& slice,
909 int is_last) {
910 grpc_chttp2_stream* s = t->incoming_stream;
911 GRPC_TRACE_VLOG(http, 2) << "INCOMING[" << t << ";" << s << "]: Parse "
912 << GRPC_SLICE_LENGTH(slice) << "b "
913 << (is_last ? "last " : "") << "frame fragment with "
914 << t->parser.name;
915 grpc_error_handle err =
916 t->parser.parser(t->parser.user_data, t, s, slice, is_last);
917 intptr_t unused;
918 if (GPR_LIKELY(err.ok())) {
919 return err;
920 }
921 GRPC_TRACE_LOG(http, ERROR)
922 << "INCOMING[" << t << ";" << s << "]: Parse failed with " << err;
923 if (grpc_error_get_int(err, grpc_core::StatusIntProperty::kStreamId,
924 &unused)) {
925 grpc_chttp2_parsing_become_skip_parser(t);
926 if (s) {
927 grpc_chttp2_cancel_stream(t, s, err, true);
928 }
929 return absl::OkStatus();
930 }
931 return err;
932 }
933
934 typedef void (*maybe_complete_func_type)(grpc_chttp2_transport* t,
935 grpc_chttp2_stream* s);
936 static const maybe_complete_func_type maybe_complete_funcs[] = {
937 grpc_chttp2_maybe_complete_recv_initial_metadata,
938 grpc_chttp2_maybe_complete_recv_trailing_metadata};
939
force_client_rst_stream(void * sp,grpc_error_handle)940 static void force_client_rst_stream(void* sp, grpc_error_handle /*error*/) {
941 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
942 grpc_chttp2_transport* t = s->t.get();
943 if (!s->write_closed) {
944 grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
945 &s->call_tracer_wrapper);
946 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
947 grpc_chttp2_mark_stream_closed(t, s, true, true, absl::OkStatus());
948 }
949 GRPC_CHTTP2_STREAM_UNREF(s, "final_rst");
950 }
951
grpc_chttp2_header_parser_parse(void * hpack_parser,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)952 grpc_error_handle grpc_chttp2_header_parser_parse(void* hpack_parser,
953 grpc_chttp2_transport* t,
954 grpc_chttp2_stream* s,
955 const grpc_slice& slice,
956 int is_last) {
957 auto* parser = static_cast<grpc_core::HPackParser*>(hpack_parser);
958 grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
959 if (s != nullptr) {
960 s->call_tracer_wrapper.RecordIncomingBytes(
961 {0, 0, GRPC_SLICE_LENGTH(slice)});
962 call_tracer =
963 grpc_core::IsCallTracerInTransportEnabled()
964 ? s->arena->GetContext<grpc_core::CallTracerInterface>()
965 : s->arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
966 }
967 grpc_error_handle error = parser->Parse(
968 slice, is_last != 0, absl::BitGenRef(t->bitgen), call_tracer);
969 if (!error.ok()) {
970 return error;
971 }
972 if (is_last) {
973 // need to check for null stream: this can occur if we receive an invalid
974 // stream id on a header
975 if (s != nullptr) {
976 if (parser->is_boundary()) {
977 if (s->header_frames_received == 2) {
978 return GRPC_ERROR_CREATE("Too many trailer frames");
979 }
980 s->published_metadata[s->header_frames_received] =
981 GRPC_METADATA_PUBLISHED_FROM_WIRE;
982 maybe_complete_funcs[s->header_frames_received](t, s);
983 s->header_frames_received++;
984 }
985 if (parser->is_eof()) {
986 if (t->is_client && !s->write_closed) {
987 // server eof ==> complete closure; we may need to forcefully close
988 // the stream. Wait until the combiner lock is ready to be released
989 // however -- it might be that we receive a RST_STREAM following this
990 // and can avoid the extra write
991 GRPC_CHTTP2_STREAM_REF(s, "final_rst");
992 t->combiner->FinallyRun(
993 GRPC_CLOSURE_CREATE(force_client_rst_stream, s, nullptr),
994 absl::OkStatus());
995 }
996 grpc_chttp2_mark_stream_closed(t, s, true, false, absl::OkStatus());
997 }
998 }
999 parser->FinishFrame();
1000 }
1001 return absl::OkStatus();
1002 }
1003