1 #include "quiche/http2/adapter/oghttp2_session.h"
2
3 #include <cstdint>
4 #include <memory>
5 #include <utility>
6 #include <vector>
7
8 #include "absl/memory/memory.h"
9 #include "absl/strings/escaping.h"
10 #include "quiche/http2/adapter/header_validator.h"
11 #include "quiche/http2/adapter/http2_protocol.h"
12 #include "quiche/http2/adapter/http2_util.h"
13 #include "quiche/http2/adapter/http2_visitor_interface.h"
14 #include "quiche/http2/adapter/noop_header_validator.h"
15 #include "quiche/http2/adapter/oghttp2_util.h"
16 #include "quiche/spdy/core/spdy_protocol.h"
17
18 namespace http2 {
19 namespace adapter {
20
21 namespace {
22
23 using ConnectionError = Http2VisitorInterface::ConnectionError;
24 using SpdyFramerError = Http2DecoderAdapter::SpdyFramerError;
25
26 using ::spdy::SpdySettingsIR;
27
28 const uint32_t kMaxAllowedMetadataFrameSize = 65536u;
29 const uint32_t kDefaultHpackTableCapacity = 4096u;
30 const uint32_t kMaximumHpackTableCapacity = 65536u;
31
32 // Corresponds to NGHTTP2_ERR_CALLBACK_FAILURE.
33 const int kSendError = -902;
34
35 constexpr absl::string_view kHeadValue = "HEAD";
36
37 // TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
38 class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
39 public:
40 FrameAttributeCollector() = default;
VisitData(const spdy::SpdyDataIR & data)41 void VisitData(const spdy::SpdyDataIR& data) override {
42 frame_type_ = static_cast<uint8_t>(data.frame_type());
43 stream_id_ = data.stream_id();
44 flags_ =
45 (data.fin() ? END_STREAM_FLAG : 0) | (data.padded() ? PADDED_FLAG : 0);
46 }
VisitHeaders(const spdy::SpdyHeadersIR & headers)47 void VisitHeaders(const spdy::SpdyHeadersIR& headers) override {
48 frame_type_ = static_cast<uint8_t>(headers.frame_type());
49 stream_id_ = headers.stream_id();
50 flags_ = END_HEADERS_FLAG | (headers.fin() ? END_STREAM_FLAG : 0) |
51 (headers.padded() ? PADDED_FLAG : 0) |
52 (headers.has_priority() ? PRIORITY_FLAG : 0);
53 }
VisitPriority(const spdy::SpdyPriorityIR & priority)54 void VisitPriority(const spdy::SpdyPriorityIR& priority) override {
55 frame_type_ = static_cast<uint8_t>(priority.frame_type());
56 frame_type_ = 2;
57 stream_id_ = priority.stream_id();
58 }
VisitRstStream(const spdy::SpdyRstStreamIR & rst_stream)59 void VisitRstStream(const spdy::SpdyRstStreamIR& rst_stream) override {
60 frame_type_ = static_cast<uint8_t>(rst_stream.frame_type());
61 frame_type_ = 3;
62 stream_id_ = rst_stream.stream_id();
63 error_code_ = rst_stream.error_code();
64 }
VisitSettings(const spdy::SpdySettingsIR & settings)65 void VisitSettings(const spdy::SpdySettingsIR& settings) override {
66 frame_type_ = static_cast<uint8_t>(settings.frame_type());
67 frame_type_ = 4;
68 flags_ = (settings.is_ack() ? ACK_FLAG : 0);
69 }
VisitPushPromise(const spdy::SpdyPushPromiseIR & push_promise)70 void VisitPushPromise(const spdy::SpdyPushPromiseIR& push_promise) override {
71 frame_type_ = static_cast<uint8_t>(push_promise.frame_type());
72 frame_type_ = 5;
73 stream_id_ = push_promise.stream_id();
74 flags_ = (push_promise.padded() ? PADDED_FLAG : 0);
75 }
VisitPing(const spdy::SpdyPingIR & ping)76 void VisitPing(const spdy::SpdyPingIR& ping) override {
77 frame_type_ = static_cast<uint8_t>(ping.frame_type());
78 frame_type_ = 6;
79 flags_ = (ping.is_ack() ? ACK_FLAG : 0);
80 }
VisitGoAway(const spdy::SpdyGoAwayIR & goaway)81 void VisitGoAway(const spdy::SpdyGoAwayIR& goaway) override {
82 frame_type_ = static_cast<uint8_t>(goaway.frame_type());
83 frame_type_ = 7;
84 error_code_ = goaway.error_code();
85 }
VisitWindowUpdate(const spdy::SpdyWindowUpdateIR & window_update)86 void VisitWindowUpdate(
87 const spdy::SpdyWindowUpdateIR& window_update) override {
88 frame_type_ = static_cast<uint8_t>(window_update.frame_type());
89 frame_type_ = 8;
90 stream_id_ = window_update.stream_id();
91 }
VisitContinuation(const spdy::SpdyContinuationIR & continuation)92 void VisitContinuation(
93 const spdy::SpdyContinuationIR& continuation) override {
94 frame_type_ = static_cast<uint8_t>(continuation.frame_type());
95 stream_id_ = continuation.stream_id();
96 flags_ = continuation.end_headers() ? END_HEADERS_FLAG : 0;
97 }
VisitUnknown(const spdy::SpdyUnknownIR & unknown)98 void VisitUnknown(const spdy::SpdyUnknownIR& unknown) override {
99 frame_type_ = static_cast<uint8_t>(unknown.frame_type());
100 stream_id_ = unknown.stream_id();
101 flags_ = unknown.flags();
102 }
VisitAltSvc(const spdy::SpdyAltSvcIR &)103 void VisitAltSvc(const spdy::SpdyAltSvcIR& /*altsvc*/) override {}
VisitPriorityUpdate(const spdy::SpdyPriorityUpdateIR &)104 void VisitPriorityUpdate(
105 const spdy::SpdyPriorityUpdateIR& /*priority_update*/) override {}
VisitAcceptCh(const spdy::SpdyAcceptChIR &)106 void VisitAcceptCh(const spdy::SpdyAcceptChIR& /*accept_ch*/) override {}
107
stream_id()108 uint32_t stream_id() { return stream_id_; }
error_code()109 uint32_t error_code() { return error_code_; }
frame_type()110 uint8_t frame_type() { return frame_type_; }
flags()111 uint8_t flags() { return flags_; }
112
113 private:
114 uint32_t stream_id_ = 0;
115 uint32_t error_code_ = 0;
116 uint8_t frame_type_ = 0;
117 uint8_t flags_ = 0;
118 };
119
TracePerspectiveAsString(Perspective p)120 absl::string_view TracePerspectiveAsString(Perspective p) {
121 switch (p) {
122 case Perspective::kClient:
123 return "OGHTTP2_CLIENT";
124 case Perspective::kServer:
125 return "OGHTTP2_SERVER";
126 }
127 return "OGHTTP2_SERVER";
128 }
129
// Scope guard: invokes the stored callable, if any, when the guard is
// destroyed. The guard can be neither copied nor moved.
class RunOnExit {
 public:
  RunOnExit() = default;
  explicit RunOnExit(std::function<void()> f) : f_(std::move(f)) {}

  RunOnExit(const RunOnExit& other) = delete;
  RunOnExit& operator=(const RunOnExit& other) = delete;
  RunOnExit(RunOnExit&& other) = delete;
  RunOnExit& operator=(RunOnExit&& other) = delete;

  ~RunOnExit() {
    if (f_ != nullptr) {
      f_();
      f_ = nullptr;
    }
  }

  // Installs `f` as the callable to run on destruction, replacing any callable
  // that was stored before.
  void emplace(std::function<void()> f) { f_ = std::move(f); }

 private:
  std::function<void()> f_;
};
152
GetHttp2ErrorCode(SpdyFramerError error)153 Http2ErrorCode GetHttp2ErrorCode(SpdyFramerError error) {
154 switch (error) {
155 case SpdyFramerError::SPDY_NO_ERROR:
156 return Http2ErrorCode::HTTP2_NO_ERROR;
157 case SpdyFramerError::SPDY_INVALID_STREAM_ID:
158 case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME:
159 case SpdyFramerError::SPDY_INVALID_PADDING:
160 case SpdyFramerError::SPDY_INVALID_DATA_FRAME_FLAGS:
161 case SpdyFramerError::SPDY_UNEXPECTED_FRAME:
162 return Http2ErrorCode::PROTOCOL_ERROR;
163 case SpdyFramerError::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
164 case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME_SIZE:
165 case SpdyFramerError::SPDY_OVERSIZED_PAYLOAD:
166 return Http2ErrorCode::FRAME_SIZE_ERROR;
167 case SpdyFramerError::SPDY_DECOMPRESS_FAILURE:
168 case SpdyFramerError::SPDY_HPACK_INDEX_VARINT_ERROR:
169 case SpdyFramerError::SPDY_HPACK_NAME_LENGTH_VARINT_ERROR:
170 case SpdyFramerError::SPDY_HPACK_VALUE_LENGTH_VARINT_ERROR:
171 case SpdyFramerError::SPDY_HPACK_NAME_TOO_LONG:
172 case SpdyFramerError::SPDY_HPACK_VALUE_TOO_LONG:
173 case SpdyFramerError::SPDY_HPACK_NAME_HUFFMAN_ERROR:
174 case SpdyFramerError::SPDY_HPACK_VALUE_HUFFMAN_ERROR:
175 case SpdyFramerError::SPDY_HPACK_MISSING_DYNAMIC_TABLE_SIZE_UPDATE:
176 case SpdyFramerError::SPDY_HPACK_INVALID_INDEX:
177 case SpdyFramerError::SPDY_HPACK_INVALID_NAME_INDEX:
178 case SpdyFramerError::SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_NOT_ALLOWED:
179 case SpdyFramerError::
180 SPDY_HPACK_INITIAL_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_LOW_WATER_MARK:
181 case SpdyFramerError::
182 SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_ACKNOWLEDGED_SETTING:
183 case SpdyFramerError::SPDY_HPACK_TRUNCATED_BLOCK:
184 case SpdyFramerError::SPDY_HPACK_FRAGMENT_TOO_LONG:
185 case SpdyFramerError::SPDY_HPACK_COMPRESSED_HEADER_SIZE_EXCEEDS_LIMIT:
186 return Http2ErrorCode::COMPRESSION_ERROR;
187 case SpdyFramerError::SPDY_INTERNAL_FRAMER_ERROR:
188 case SpdyFramerError::SPDY_STOP_PROCESSING:
189 case SpdyFramerError::LAST_ERROR:
190 return Http2ErrorCode::INTERNAL_ERROR;
191 }
192 return Http2ErrorCode::INTERNAL_ERROR;
193 }
194
IsResponse(HeaderType type)195 bool IsResponse(HeaderType type) {
196 return type == HeaderType::RESPONSE_100 || type == HeaderType::RESPONSE;
197 }
198
StatusIs1xx(absl::string_view status)199 bool StatusIs1xx(absl::string_view status) {
200 return status.size() == 3 && status[0] == '1';
201 }
202
203 // Returns the upper bound on HPACK encoder table capacity. If not specified in
204 // the Options, a reasonable default upper bound is used.
HpackCapacityBound(const OgHttp2Session::Options & o)205 uint32_t HpackCapacityBound(const OgHttp2Session::Options& o) {
206 return o.max_hpack_encoding_table_capacity.value_or(
207 kMaximumHpackTableCapacity);
208 }
209
IsNonAckSettings(const spdy::SpdyFrameIR & frame)210 bool IsNonAckSettings(const spdy::SpdyFrameIR& frame) {
211 return frame.frame_type() == spdy::SpdyFrameType::SETTINGS &&
212 !reinterpret_cast<const SpdySettingsIR&>(frame).is_ack();
213 }
214
215 } // namespace
216
PassthroughHeadersHandler(OgHttp2Session & session,Http2VisitorInterface & visitor)217 OgHttp2Session::PassthroughHeadersHandler::PassthroughHeadersHandler(
218 OgHttp2Session& session, Http2VisitorInterface& visitor)
219 : session_(session), visitor_(visitor) {
220 if (session_.options_.validate_http_headers) {
221 QUICHE_VLOG(2) << "instantiating regular header validator";
222 validator_ = std::make_unique<HeaderValidator>();
223 } else {
224 QUICHE_VLOG(2) << "instantiating noop header validator";
225 validator_ = std::make_unique<NoopHeaderValidator>();
226 }
227 }
228
OnHeaderBlockStart()229 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockStart() {
230 result_ = Http2VisitorInterface::HEADER_OK;
231 const bool status = visitor_.OnBeginHeadersForStream(stream_id_);
232 if (!status) {
233 QUICHE_VLOG(1)
234 << "Visitor rejected header block, returning HEADER_CONNECTION_ERROR";
235 result_ = Http2VisitorInterface::HEADER_CONNECTION_ERROR;
236 }
237 validator_->StartHeaderBlock();
238 }
239
InterpretHeaderStatus(HeaderValidator::HeaderStatus status)240 Http2VisitorInterface::OnHeaderResult InterpretHeaderStatus(
241 HeaderValidator::HeaderStatus status) {
242 switch (status) {
243 case HeaderValidator::HEADER_OK:
244 case HeaderValidator::HEADER_SKIP:
245 return Http2VisitorInterface::HEADER_OK;
246 case HeaderValidator::HEADER_FIELD_INVALID:
247 return Http2VisitorInterface::HEADER_FIELD_INVALID;
248 case HeaderValidator::HEADER_FIELD_TOO_LONG:
249 return Http2VisitorInterface::HEADER_RST_STREAM;
250 }
251 return Http2VisitorInterface::HEADER_CONNECTION_ERROR;
252 }
253
OnHeader(absl::string_view key,absl::string_view value)254 void OgHttp2Session::PassthroughHeadersHandler::OnHeader(
255 absl::string_view key, absl::string_view value) {
256 if (result_ != Http2VisitorInterface::HEADER_OK) {
257 QUICHE_VLOG(2) << "Early return; status not HEADER_OK";
258 return;
259 }
260 const HeaderValidator::HeaderStatus validation_result =
261 validator_->ValidateSingleHeader(key, value);
262 if (validation_result == HeaderValidator::HEADER_SKIP) {
263 return;
264 }
265 if (validation_result != HeaderValidator::HEADER_OK) {
266 QUICHE_VLOG(2) << "Header validation failed with result "
267 << static_cast<int>(validation_result);
268 result_ = InterpretHeaderStatus(validation_result);
269 return;
270 }
271 result_ = visitor_.OnHeaderForStream(stream_id_, key, value);
272 }
273
OnHeaderBlockEnd(size_t,size_t)274 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockEnd(
275 size_t /* uncompressed_header_bytes */,
276 size_t /* compressed_header_bytes */) {
277 if (result_ == Http2VisitorInterface::HEADER_OK) {
278 if (!validator_->FinishHeaderBlock(type_)) {
279 QUICHE_VLOG(1) << "FinishHeaderBlock returned false; returning "
280 "HEADER_HTTP_MESSAGING";
281 result_ = Http2VisitorInterface::HEADER_HTTP_MESSAGING;
282 }
283 }
284 if (frame_contains_fin_ && IsResponse(type_) &&
285 StatusIs1xx(status_header())) {
286 QUICHE_VLOG(1) << "Unexpected end of stream without final headers";
287 result_ = Http2VisitorInterface::HEADER_HTTP_MESSAGING;
288 }
289 if (result_ == Http2VisitorInterface::HEADER_OK) {
290 const bool result = visitor_.OnEndHeadersForStream(stream_id_);
291 if (!result) {
292 session_.fatal_visitor_callback_failure_ = true;
293 session_.decoder_.StopProcessing();
294 }
295 } else {
296 session_.OnHeaderStatus(stream_id_, result_);
297 }
298 frame_contains_fin_ = false;
299 }
300
301 // TODO(diannahu): Add checks for request methods.
CanReceiveBody() const302 bool OgHttp2Session::PassthroughHeadersHandler::CanReceiveBody() const {
303 switch (header_type()) {
304 case HeaderType::REQUEST_TRAILER:
305 case HeaderType::RESPONSE_TRAILER:
306 case HeaderType::RESPONSE_100:
307 return false;
308 case HeaderType::RESPONSE:
309 // 304 responses should not have a body:
310 // https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.2
311 // Neither should 204 responses:
312 // https://httpwg.org/specs/rfc7231.html#rfc.section.6.3.5
313 return status_header() != "304" && status_header() != "204";
314 case HeaderType::REQUEST:
315 return true;
316 }
317 return true;
318 }
319
320 // A visitor that extracts an int64_t from each type of a ProcessBytesResult.
321 struct OgHttp2Session::ProcessBytesResultVisitor {
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor322 int64_t operator()(const int64_t bytes) const { return bytes; }
323
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor324 int64_t operator()(const ProcessBytesError error) const {
325 switch (error) {
326 case ProcessBytesError::kUnspecified:
327 return -1;
328 case ProcessBytesError::kInvalidConnectionPreface:
329 return -903; // NGHTTP2_ERR_BAD_CLIENT_MAGIC
330 case ProcessBytesError::kVisitorCallbackFailed:
331 return -902; // NGHTTP2_ERR_CALLBACK_FAILURE
332 }
333 return -1;
334 }
335 };
336
OgHttp2Session(Http2VisitorInterface & visitor,Options options)337 OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
338 : visitor_(visitor),
339 options_(options),
340 event_forwarder_([this]() { return !latched_error_; }, *this),
341 receive_logger_(
342 &event_forwarder_, TracePerspectiveAsString(options.perspective),
__anon1df836620302() 343 [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
344 return logging_enabled;
345 },
346 this),
347 send_logger_(
348 TracePerspectiveAsString(options.perspective),
__anon1df836620402() 349 [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
350 return logging_enabled;
351 },
352 this),
353 headers_handler_(*this, visitor),
354 noop_headers_handler_(/*listener=*/nullptr),
355 connection_window_manager_(
356 kInitialFlowControlWindowSize,
__anon1df836620502(size_t window_update_delta) 357 [this](size_t window_update_delta) {
358 SendWindowUpdate(kConnectionStreamId, window_update_delta);
359 },
360 options.should_window_update_fn,
361 /*update_window_on_notify=*/false) {
362 decoder_.set_visitor(&receive_logger_);
363 if (options_.max_header_list_bytes) {
364 // Limit buffering of encoded HPACK data to 2x the decoded limit.
365 decoder_.GetHpackDecoder()->set_max_decode_buffer_size_bytes(
366 2 * *options_.max_header_list_bytes);
367 // Limit the total bytes accepted for HPACK decoding to 4x the limit.
368 decoder_.GetHpackDecoder()->set_max_header_block_bytes(
369 4 * *options_.max_header_list_bytes);
370 }
371 if (IsServerSession()) {
372 remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
373 spdy::kHttp2ConnectionHeaderPrefixSize};
374 }
375 if (options_.max_header_field_size.has_value()) {
376 headers_handler_.SetMaxFieldSize(options_.max_header_field_size.value());
377 }
378 headers_handler_.SetAllowObsText(options_.allow_obs_text);
379 }
380
~OgHttp2Session()381 OgHttp2Session::~OgHttp2Session() {}
382
SetStreamUserData(Http2StreamId stream_id,void * user_data)383 void OgHttp2Session::SetStreamUserData(Http2StreamId stream_id,
384 void* user_data) {
385 auto it = stream_map_.find(stream_id);
386 if (it != stream_map_.end()) {
387 it->second.user_data = user_data;
388 }
389 }
390
GetStreamUserData(Http2StreamId stream_id)391 void* OgHttp2Session::GetStreamUserData(Http2StreamId stream_id) {
392 auto it = stream_map_.find(stream_id);
393 if (it != stream_map_.end()) {
394 return it->second.user_data;
395 }
396 auto p = pending_streams_.find(stream_id);
397 if (p != pending_streams_.end()) {
398 return p->second.user_data;
399 }
400 return nullptr;
401 }
402
ResumeStream(Http2StreamId stream_id)403 bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
404 auto it = stream_map_.find(stream_id);
405 if (it == stream_map_.end() || it->second.outbound_body == nullptr ||
406 !write_scheduler_.StreamRegistered(stream_id)) {
407 return false;
408 }
409 it->second.data_deferred = false;
410 write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
411 return true;
412 }
413
GetStreamSendWindowSize(Http2StreamId stream_id) const414 int OgHttp2Session::GetStreamSendWindowSize(Http2StreamId stream_id) const {
415 auto it = stream_map_.find(stream_id);
416 if (it != stream_map_.end()) {
417 return it->second.send_window;
418 }
419 return -1;
420 }
421
GetStreamReceiveWindowLimit(Http2StreamId stream_id) const422 int OgHttp2Session::GetStreamReceiveWindowLimit(Http2StreamId stream_id) const {
423 auto it = stream_map_.find(stream_id);
424 if (it != stream_map_.end()) {
425 return it->second.window_manager.WindowSizeLimit();
426 }
427 return -1;
428 }
429
GetStreamReceiveWindowSize(Http2StreamId stream_id) const430 int OgHttp2Session::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
431 auto it = stream_map_.find(stream_id);
432 if (it != stream_map_.end()) {
433 return it->second.window_manager.CurrentWindowSize();
434 }
435 return -1;
436 }
437
GetReceiveWindowSize() const438 int OgHttp2Session::GetReceiveWindowSize() const {
439 return connection_window_manager_.CurrentWindowSize();
440 }
441
GetHpackEncoderDynamicTableSize() const442 int OgHttp2Session::GetHpackEncoderDynamicTableSize() const {
443 const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
444 return encoder == nullptr ? 0 : encoder->GetDynamicTableSize();
445 }
446
GetHpackEncoderDynamicTableCapacity() const447 int OgHttp2Session::GetHpackEncoderDynamicTableCapacity() const {
448 const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
449 return encoder == nullptr ? kDefaultHpackTableCapacity
450 : encoder->CurrentHeaderTableSizeSetting();
451 }
452
GetHpackDecoderDynamicTableSize() const453 int OgHttp2Session::GetHpackDecoderDynamicTableSize() const {
454 const spdy::HpackDecoderAdapter* decoder = decoder_.GetHpackDecoder();
455 return decoder == nullptr ? 0 : decoder->GetDynamicTableSize();
456 }
457
GetHpackDecoderSizeLimit() const458 int OgHttp2Session::GetHpackDecoderSizeLimit() const {
459 const spdy::HpackDecoderAdapter* decoder = decoder_.GetHpackDecoder();
460 return decoder == nullptr ? 0 : decoder->GetCurrentHeaderTableSizeSetting();
461 }
462
ProcessBytes(absl::string_view bytes)463 int64_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
464 QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
465 << " processing [" << absl::CEscape(bytes) << "]";
466 return absl::visit(ProcessBytesResultVisitor(), ProcessBytesImpl(bytes));
467 }
468
469 absl::variant<int64_t, OgHttp2Session::ProcessBytesError>
ProcessBytesImpl(absl::string_view bytes)470 OgHttp2Session::ProcessBytesImpl(absl::string_view bytes) {
471 if (processing_bytes_) {
472 QUICHE_VLOG(1) << "Returning early; already processing bytes.";
473 return 0;
474 }
475 processing_bytes_ = true;
476 RunOnExit r{[this]() { processing_bytes_ = false; }};
477
478 if (options_.blackhole_data_on_connection_error && latched_error_) {
479 return static_cast<int64_t>(bytes.size());
480 }
481
482 int64_t preface_consumed = 0;
483 if (!remaining_preface_.empty()) {
484 QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
485 // decoder_ does not understand the client connection preface.
486 size_t min_size = std::min(remaining_preface_.size(), bytes.size());
487 if (!absl::StartsWith(remaining_preface_, bytes.substr(0, min_size))) {
488 // Preface doesn't match!
489 QUICHE_DLOG(INFO) << "Preface doesn't match! Expected: ["
490 << absl::CEscape(remaining_preface_) << "], actual: ["
491 << absl::CEscape(bytes) << "]";
492 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
493 ConnectionError::kInvalidConnectionPreface);
494 return ProcessBytesError::kInvalidConnectionPreface;
495 }
496 remaining_preface_.remove_prefix(min_size);
497 bytes.remove_prefix(min_size);
498 if (!remaining_preface_.empty()) {
499 QUICHE_VLOG(2) << "Preface bytes remaining: "
500 << remaining_preface_.size();
501 return static_cast<int64_t>(min_size);
502 }
503 preface_consumed = min_size;
504 }
505 int64_t result = decoder_.ProcessInput(bytes.data(), bytes.size());
506 QUICHE_VLOG(2) << "ProcessBytes result: " << result;
507 if (fatal_visitor_callback_failure_) {
508 QUICHE_DCHECK(latched_error_);
509 QUICHE_VLOG(2) << "Visitor callback failed while processing bytes.";
510 return ProcessBytesError::kVisitorCallbackFailed;
511 }
512 if (latched_error_ || result < 0) {
513 QUICHE_VLOG(2) << "ProcessBytes encountered an error.";
514 if (options_.blackhole_data_on_connection_error) {
515 return static_cast<int64_t>(bytes.size() + preface_consumed);
516 } else {
517 return ProcessBytesError::kUnspecified;
518 }
519 }
520 return result + preface_consumed;
521 }
522
Consume(Http2StreamId stream_id,size_t num_bytes)523 int OgHttp2Session::Consume(Http2StreamId stream_id, size_t num_bytes) {
524 auto it = stream_map_.find(stream_id);
525 if (it == stream_map_.end()) {
526 QUICHE_LOG(ERROR) << "Stream " << stream_id << " not found when consuming "
527 << num_bytes << " bytes";
528 } else {
529 it->second.window_manager.MarkDataFlushed(num_bytes);
530 }
531 connection_window_manager_.MarkDataFlushed(num_bytes);
532 return 0; // Remove?
533 }
534
StartGracefulShutdown()535 void OgHttp2Session::StartGracefulShutdown() {
536 if (IsServerSession()) {
537 if (!queued_goaway_) {
538 EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
539 std::numeric_limits<int32_t>::max(), spdy::ERROR_CODE_NO_ERROR,
540 "graceful_shutdown"));
541 }
542 } else {
543 QUICHE_LOG(ERROR) << "Graceful shutdown not needed for clients.";
544 }
545 }
546
EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame)547 void OgHttp2Session::EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame) {
548 if (queued_immediate_goaway_) {
549 // Do not allow additional frames to be enqueued after the GOAWAY.
550 return;
551 }
552
553 const bool is_non_ack_settings = IsNonAckSettings(*frame);
554 MaybeSetupPreface(is_non_ack_settings);
555
556 if (frame->frame_type() == spdy::SpdyFrameType::GOAWAY) {
557 queued_goaway_ = true;
558 if (latched_error_) {
559 PrepareForImmediateGoAway();
560 }
561 } else if (frame->fin() ||
562 frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
563 auto iter = stream_map_.find(frame->stream_id());
564 if (iter != stream_map_.end()) {
565 iter->second.half_closed_local = true;
566 }
567 if (frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
568 // TODO(diannahu): Condition on existence in the stream map?
569 streams_reset_.insert(frame->stream_id());
570 }
571 } else if (frame->frame_type() == spdy::SpdyFrameType::WINDOW_UPDATE) {
572 UpdateReceiveWindow(
573 frame->stream_id(),
574 reinterpret_cast<spdy::SpdyWindowUpdateIR&>(*frame).delta());
575 } else if (is_non_ack_settings) {
576 HandleOutboundSettings(
577 *reinterpret_cast<spdy::SpdySettingsIR*>(frame.get()));
578 }
579 if (frame->stream_id() != 0) {
580 auto result = queued_frames_.insert({frame->stream_id(), 1});
581 if (!result.second) {
582 ++(result.first->second);
583 }
584 }
585 frames_.push_back(std::move(frame));
586 }
587
Send()588 int OgHttp2Session::Send() {
589 if (sending_) {
590 QUICHE_VLOG(1) << TracePerspectiveAsString(options_.perspective)
591 << " returning early; already sending.";
592 return 0;
593 }
594 sending_ = true;
595 RunOnExit r{[this]() { sending_ = false; }};
596
597 if (fatal_send_error_) {
598 return kSendError;
599 }
600
601 MaybeSetupPreface(/*sending_outbound_settings=*/false);
602
603 SendResult continue_writing = SendQueuedFrames();
604 if (queued_immediate_goaway_) {
605 // If an immediate GOAWAY was queued, then the above flush either sent the
606 // GOAWAY or buffered it to be sent on the next successful flush. In either
607 // case, return early here to avoid sending other frames.
608 return InterpretSendResult(continue_writing);
609 }
610 // Notify on new/pending streams closed due to GOAWAY receipt.
611 CloseGoAwayRejectedStreams();
612 // Wake streams for writes.
613 while (continue_writing == SendResult::SEND_OK && HasReadyStream()) {
614 const Http2StreamId stream_id = GetNextReadyStream();
615 // TODO(birenroy): Add a return value to indicate write blockage, so streams
616 // aren't woken unnecessarily.
617 QUICHE_VLOG(1) << "Waking stream " << stream_id << " for writes.";
618 continue_writing = WriteForStream(stream_id);
619 }
620 if (continue_writing == SendResult::SEND_OK) {
621 continue_writing = SendQueuedFrames();
622 }
623 return InterpretSendResult(continue_writing);
624 }
625
InterpretSendResult(SendResult result)626 int OgHttp2Session::InterpretSendResult(SendResult result) {
627 if (result == SendResult::SEND_ERROR) {
628 fatal_send_error_ = true;
629 return kSendError;
630 } else {
631 return 0;
632 }
633 }
634
HasReadyStream() const635 bool OgHttp2Session::HasReadyStream() const {
636 return !trailers_ready_.empty() ||
637 (write_scheduler_.HasReadyStreams() && connection_send_window_ > 0);
638 }
639
GetNextReadyStream()640 Http2StreamId OgHttp2Session::GetNextReadyStream() {
641 QUICHE_DCHECK(HasReadyStream());
642 if (!trailers_ready_.empty()) {
643 const Http2StreamId stream_id = *trailers_ready_.begin();
644 // WriteForStream() will re-mark the stream as ready, if necessary.
645 write_scheduler_.MarkStreamNotReady(stream_id);
646 return stream_id;
647 }
648 return write_scheduler_.PopNextReadyStream();
649 }
650
MaybeSendBufferedData()651 OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
652 int64_t result = std::numeric_limits<int64_t>::max();
653 while (result > 0 && !buffered_data_.empty()) {
654 result = visitor_.OnReadyToSend(buffered_data_);
655 if (result > 0) {
656 buffered_data_.erase(0, result);
657 }
658 }
659 if (result < 0) {
660 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
661 ConnectionError::kSendError);
662 return SendResult::SEND_ERROR;
663 }
664 return buffered_data_.empty() ? SendResult::SEND_OK
665 : SendResult::SEND_BLOCKED;
666 }
667
SendQueuedFrames()668 OgHttp2Session::SendResult OgHttp2Session::SendQueuedFrames() {
669 // Flush any serialized prefix.
670 const SendResult result = MaybeSendBufferedData();
671 if (result != SendResult::SEND_OK) {
672 return result;
673 }
674 // Serialize and send frames in the queue.
675 while (!frames_.empty()) {
676 const auto& frame_ptr = frames_.front();
677 FrameAttributeCollector c;
678 frame_ptr->Visit(&c);
679
680 // DATA frames should never be queued.
681 QUICHE_DCHECK_NE(c.frame_type(), 0);
682
683 const bool stream_reset =
684 c.stream_id() != 0 && streams_reset_.count(c.stream_id()) > 0;
685 if (stream_reset &&
686 c.frame_type() != static_cast<uint8_t>(FrameType::RST_STREAM)) {
687 // The stream has been reset, so any other remaining frames can be
688 // skipped.
689 // TODO(birenroy): inform the visitor of frames that are skipped.
690 DecrementQueuedFrameCount(c.stream_id(), c.frame_type());
691 frames_.pop_front();
692 continue;
693 } else if (!IsServerSession() && received_goaway_ &&
694 c.stream_id() >
695 static_cast<uint32_t>(received_goaway_stream_id_)) {
696 // This frame will be ignored by the server, so don't send it. The stream
697 // associated with this frame should have been closed in OnGoAway().
698 frames_.pop_front();
699 continue;
700 }
701 // Frames can't accurately report their own length; the actual serialized
702 // length must be used instead.
703 spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frame_ptr);
704 const size_t frame_payload_length = frame.size() - spdy::kFrameHeaderSize;
705 frame_ptr->Visit(&send_logger_);
706 visitor_.OnBeforeFrameSent(c.frame_type(), c.stream_id(),
707 frame_payload_length, c.flags());
708 const int64_t result = visitor_.OnReadyToSend(absl::string_view(frame));
709 if (result < 0) {
710 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
711 ConnectionError::kSendError);
712 return SendResult::SEND_ERROR;
713 } else if (result == 0) {
714 // Write blocked.
715 return SendResult::SEND_BLOCKED;
716 } else {
717 frames_.pop_front();
718
719 const bool ok =
720 AfterFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
721 c.flags(), c.error_code());
722 if (!ok) {
723 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
724 ConnectionError::kSendError);
725 return SendResult::SEND_ERROR;
726 }
727 if (static_cast<size_t>(result) < frame.size()) {
728 // The frame was partially written, so the rest must be buffered.
729 buffered_data_.append(frame.data() + result, frame.size() - result);
730 return SendResult::SEND_BLOCKED;
731 }
732 }
733 }
734 return SendResult::SEND_OK;
735 }
736
AfterFrameSent(uint8_t frame_type_int,uint32_t stream_id,size_t payload_length,uint8_t flags,uint32_t error_code)737 bool OgHttp2Session::AfterFrameSent(uint8_t frame_type_int, uint32_t stream_id,
738 size_t payload_length, uint8_t flags,
739 uint32_t error_code) {
740 const FrameType frame_type = static_cast<FrameType>(frame_type_int);
741 int result = visitor_.OnFrameSent(frame_type_int, stream_id, payload_length,
742 flags, error_code);
743 if (result < 0) {
744 return false;
745 }
746 if (stream_id == 0) {
747 if (frame_type == FrameType::SETTINGS) {
748 const bool is_settings_ack = (flags & ACK_FLAG);
749 if (is_settings_ack && encoder_header_table_capacity_when_acking_) {
750 framer_.UpdateHeaderEncoderTableSize(
751 encoder_header_table_capacity_when_acking_.value());
752 encoder_header_table_capacity_when_acking_ = absl::nullopt;
753 } else if (!is_settings_ack) {
754 sent_non_ack_settings_ = true;
755 }
756 }
757 return true;
758 }
759
760 const bool contains_fin =
761 (frame_type == FrameType::DATA || frame_type == FrameType::HEADERS) &&
762 (flags & END_STREAM_FLAG) == END_STREAM_FLAG;
763 auto it = stream_map_.find(stream_id);
764 const bool still_open_remote =
765 it != stream_map_.end() && !it->second.half_closed_remote;
766 if (contains_fin && still_open_remote &&
767 options_.rst_stream_no_error_when_incomplete && IsServerSession()) {
768 // Since the peer has not yet ended the stream, this endpoint should
769 // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
770 frames_.push_front(std::make_unique<spdy::SpdyRstStreamIR>(
771 stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
772 auto queued_result = queued_frames_.insert({stream_id, 1});
773 if (!queued_result.second) {
774 ++(queued_result.first->second);
775 }
776 it->second.half_closed_remote = true;
777 }
778
779 DecrementQueuedFrameCount(stream_id, frame_type_int);
780 return true;
781 }
782
WriteForStream(Http2StreamId stream_id)783 OgHttp2Session::SendResult OgHttp2Session::WriteForStream(
784 Http2StreamId stream_id) {
785 auto it = stream_map_.find(stream_id);
786 if (it == stream_map_.end()) {
787 QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
788 << " which is ready to write!";
789 return SendResult::SEND_OK;
790 }
791 StreamState& state = it->second;
792 auto reset_it = streams_reset_.find(stream_id);
793 if (reset_it != streams_reset_.end()) {
794 // The stream has been reset; there's no point in sending DATA or trailing
795 // HEADERS.
796 state.outbound_body = nullptr;
797 state.trailers = nullptr;
798 return SendResult::SEND_OK;
799 }
800
801 SendResult connection_can_write = SendResult::SEND_OK;
802 if (state.outbound_body == nullptr ||
803 (!options_.trailers_require_end_data && state.data_deferred)) {
804 // No data to send, but there might be trailers.
805 if (state.trailers != nullptr) {
806 // Trailers will include END_STREAM, so the data source can be discarded.
807 // Since data_deferred is true, there is no data waiting to be flushed for
808 // this stream.
809 state.outbound_body = nullptr;
810 auto block_ptr = std::move(state.trailers);
811 if (state.half_closed_local) {
812 QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
813 } else {
814 SendTrailers(stream_id, std::move(*block_ptr));
815 }
816 }
817 return SendResult::SEND_OK;
818 }
819 int32_t available_window =
820 std::min({connection_send_window_, state.send_window,
821 static_cast<int32_t>(max_frame_payload_)});
822 while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
823 state.outbound_body != nullptr && !state.data_deferred) {
824 auto [length, end_data] =
825 state.outbound_body->SelectPayloadLength(available_window);
826 QUICHE_VLOG(2) << "WriteForStream | length: " << length
827 << " end_data: " << end_data
828 << " trailers: " << state.trailers.get();
829 if (length == 0 && !end_data &&
830 (options_.trailers_require_end_data || state.trailers == nullptr)) {
831 // An unproductive call to SelectPayloadLength() results in this stream
832 // entering the "deferred" state only if either no trailers are available
833 // to send, or trailers require an explicit end_data before being sent.
834 state.data_deferred = true;
835 break;
836 } else if (length == DataFrameSource::kError) {
837 // TODO(birenroy): Consider queuing a RST_STREAM INTERNAL_ERROR instead.
838 CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
839 // No more work on the stream; it has been closed.
840 break;
841 }
842 const bool fin = end_data ? state.outbound_body->send_fin() : false;
843 if (length > 0 || fin) {
844 spdy::SpdyDataIR data(stream_id);
845 data.set_fin(fin);
846 data.SetDataShallow(length);
847 spdy::SpdySerializedFrame header =
848 spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(
849 data);
850 QUICHE_DCHECK(buffered_data_.empty() && frames_.empty());
851 const bool success =
852 state.outbound_body->Send(absl::string_view(header), length);
853 if (!success) {
854 connection_can_write = SendResult::SEND_BLOCKED;
855 break;
856 }
857 connection_send_window_ -= length;
858 state.send_window -= length;
859 available_window = std::min({connection_send_window_, state.send_window,
860 static_cast<int32_t>(max_frame_payload_)});
861 if (fin) {
862 state.half_closed_local = true;
863 MaybeFinWithRstStream(it);
864 }
865 const bool ok = AfterFrameSent(/* DATA */ 0, stream_id, length,
866 fin ? END_STREAM_FLAG : 0x0, 0);
867 if (!ok) {
868 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
869 ConnectionError::kSendError);
870 return SendResult::SEND_ERROR;
871 }
872 if (!stream_map_.contains(stream_id)) {
873 // Note: the stream may have been closed if `fin` is true.
874 break;
875 }
876 }
877 if (end_data || (length == 0 && state.trailers != nullptr &&
878 !options_.trailers_require_end_data)) {
879 // If SelectPayloadLength() returned {0, false}, and there are trailers to
880 // send, and the safety feature is disabled, it's okay to send the
881 // trailers.
882 if (state.trailers != nullptr) {
883 auto block_ptr = std::move(state.trailers);
884 if (fin) {
885 QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
886 } else {
887 SendTrailers(stream_id, std::move(*block_ptr));
888 }
889 }
890 state.outbound_body = nullptr;
891 }
892 }
893 // If the stream still exists and has data to send, it should be marked as
894 // ready in the write scheduler.
895 if (stream_map_.contains(stream_id) && !state.data_deferred &&
896 state.send_window > 0 && state.outbound_body != nullptr) {
897 write_scheduler_.MarkStreamReady(stream_id, false);
898 }
899 // Streams can continue writing as long as the connection is not write-blocked
900 // and there is additional flow control quota available.
901 if (connection_can_write != SendResult::SEND_OK) {
902 return connection_can_write;
903 }
904 return connection_send_window_ <= 0 ? SendResult::SEND_BLOCKED
905 : SendResult::SEND_OK;
906 }
907
SerializeMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)908 void OgHttp2Session::SerializeMetadata(Http2StreamId stream_id,
909 std::unique_ptr<MetadataSource> source) {
910 const uint32_t max_payload_size =
911 std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
912 auto payload_buffer = std::make_unique<uint8_t[]>(max_payload_size);
913
914 while (true) {
915 auto [written, end_metadata] =
916 source->Pack(payload_buffer.get(), max_payload_size);
917 if (written < 0) {
918 // Unable to pack any metadata.
919 return;
920 }
921 QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
922 auto payload = absl::string_view(
923 reinterpret_cast<const char*>(payload_buffer.get()), written);
924 EnqueueFrame(std::make_unique<spdy::SpdyUnknownIR>(
925 stream_id, kMetadataFrameType, end_metadata ? kMetadataEndFlag : 0u,
926 std::string(payload)));
927 if (end_metadata) {
928 return;
929 }
930 }
931 }
932
SubmitRequest(absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)933 int32_t OgHttp2Session::SubmitRequest(
934 absl::Span<const Header> headers,
935 std::unique_ptr<DataFrameSource> data_source, void* user_data) {
936 // TODO(birenroy): return an error for the incorrect perspective
937 const Http2StreamId stream_id = next_stream_id_;
938 next_stream_id_ += 2;
939 if (!pending_streams_.empty() || !CanCreateStream()) {
940 // TODO(diannahu): There should probably be a limit to the number of allowed
941 // pending streams.
942 pending_streams_.insert(
943 {stream_id, PendingStreamState{ToHeaderBlock(headers),
944 std::move(data_source), user_data}});
945 StartPendingStreams();
946 } else {
947 StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
948 user_data);
949 }
950 return stream_id;
951 }
952
SubmitResponse(Http2StreamId stream_id,absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source)953 int OgHttp2Session::SubmitResponse(
954 Http2StreamId stream_id, absl::Span<const Header> headers,
955 std::unique_ptr<DataFrameSource> data_source) {
956 // TODO(birenroy): return an error for the incorrect perspective
957 auto iter = stream_map_.find(stream_id);
958 if (iter == stream_map_.end()) {
959 QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
960 return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
961 }
962 const bool end_stream = data_source == nullptr;
963 if (!end_stream) {
964 // Add data source to stream state
965 iter->second.outbound_body = std::move(data_source);
966 write_scheduler_.MarkStreamReady(stream_id, false);
967 }
968 SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
969 return 0;
970 }
971
SubmitTrailer(Http2StreamId stream_id,absl::Span<const Header> trailers)972 int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id,
973 absl::Span<const Header> trailers) {
974 // TODO(birenroy): Reject trailers when acting as a client?
975 auto iter = stream_map_.find(stream_id);
976 if (iter == stream_map_.end()) {
977 QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
978 return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
979 }
980 StreamState& state = iter->second;
981 if (state.half_closed_local) {
982 QUICHE_LOG(ERROR) << "Stream " << stream_id << " is half closed (local)";
983 return -514; // NGHTTP2_ERR_INVALID_STREAM_STATE
984 }
985 if (state.trailers != nullptr) {
986 QUICHE_LOG(ERROR) << "Stream " << stream_id
987 << " already has trailers queued";
988 return -514; // NGHTTP2_ERR_INVALID_STREAM_STATE
989 }
990 if (state.outbound_body == nullptr) {
991 // Enqueue trailers immediately.
992 SendTrailers(stream_id, ToHeaderBlock(trailers));
993 } else {
994 QUICHE_LOG_IF(ERROR, state.outbound_body->send_fin())
995 << "DataFrameSource will send fin, preventing trailers!";
996 // Save trailers so they can be written once data is done.
997 state.trailers =
998 std::make_unique<spdy::Http2HeaderBlock>(ToHeaderBlock(trailers));
999 if (!options_.trailers_require_end_data || !iter->second.data_deferred) {
1000 trailers_ready_.insert(stream_id);
1001 }
1002 }
1003 return 0;
1004 }
1005
SubmitMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)1006 void OgHttp2Session::SubmitMetadata(Http2StreamId stream_id,
1007 std::unique_ptr<MetadataSource> source) {
1008 SerializeMetadata(stream_id, std::move(source));
1009 }
1010
SubmitSettings(absl::Span<const Http2Setting> settings)1011 void OgHttp2Session::SubmitSettings(absl::Span<const Http2Setting> settings) {
1012 auto frame = PrepareSettingsFrame(settings);
1013 EnqueueFrame(std::move(frame));
1014 }
1015
OnError(SpdyFramerError error,std::string detailed_error)1016 void OgHttp2Session::OnError(SpdyFramerError error,
1017 std::string detailed_error) {
1018 QUICHE_VLOG(1) << "Error: "
1019 << http2::Http2DecoderAdapter::SpdyFramerErrorToString(error)
1020 << " details: " << detailed_error;
1021 // TODO(diannahu): Consider propagating `detailed_error`.
1022 LatchErrorAndNotify(GetHttp2ErrorCode(error), ConnectionError::kParseError);
1023 }
1024
OnCommonHeader(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1025 void OgHttp2Session::OnCommonHeader(spdy::SpdyStreamId stream_id, size_t length,
1026 uint8_t type, uint8_t flags) {
1027 highest_received_stream_id_ = std::max(static_cast<Http2StreamId>(stream_id),
1028 highest_received_stream_id_);
1029 if (streams_reset_.contains(stream_id)) {
1030 return;
1031 }
1032 const bool result = visitor_.OnFrameHeader(stream_id, length, type, flags);
1033 if (!result) {
1034 fatal_visitor_callback_failure_ = true;
1035 decoder_.StopProcessing();
1036 }
1037 }
1038
OnDataFrameHeader(spdy::SpdyStreamId stream_id,size_t length,bool)1039 void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
1040 size_t length, bool /*fin*/) {
1041 auto iter = stream_map_.find(stream_id);
1042 if (iter == stream_map_.end() || streams_reset_.contains(stream_id)) {
1043 // The stream does not exist; it could be an error or a benign close, e.g.,
1044 // getting data for a stream this connection recently closed.
1045 if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
1046 // Receiving DATA before HEADERS is a connection error.
1047 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1048 ConnectionError::kWrongFrameSequence);
1049 }
1050 return;
1051 }
1052
1053 if (static_cast<int64_t>(length) >
1054 connection_window_manager_.CurrentWindowSize()) {
1055 // Peer exceeded the connection flow control limit.
1056 LatchErrorAndNotify(
1057 Http2ErrorCode::FLOW_CONTROL_ERROR,
1058 Http2VisitorInterface::ConnectionError::kFlowControlError);
1059 return;
1060 }
1061
1062 if (static_cast<int64_t>(length) >
1063 iter->second.window_manager.CurrentWindowSize()) {
1064 // Peer exceeded the stream flow control limit.
1065 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1066 stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1067 return;
1068 }
1069
1070 const bool result = visitor_.OnBeginDataForStream(stream_id, length);
1071 if (!result) {
1072 fatal_visitor_callback_failure_ = true;
1073 decoder_.StopProcessing();
1074 }
1075
1076 if (!iter->second.can_receive_body && length > 0) {
1077 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1078 stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1079 return;
1080 }
1081
1082 // Validate against the content-length if it exists.
1083 if (iter->second.remaining_content_length.has_value()) {
1084 if (length > *iter->second.remaining_content_length) {
1085 HandleContentLengthError(stream_id);
1086 iter->second.remaining_content_length.reset();
1087 } else {
1088 *iter->second.remaining_content_length -= length;
1089 }
1090 }
1091 }
1092
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1093 void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1094 const char* data, size_t len) {
1095 // Count the data against flow control, even if the stream is unknown.
1096 MarkDataBuffered(stream_id, len);
1097
1098 if (!stream_map_.contains(stream_id) || streams_reset_.contains(stream_id)) {
1099 // If the stream was unknown due to a protocol error, the visitor was
1100 // informed in OnDataFrameHeader().
1101 return;
1102 }
1103
1104 const bool result =
1105 visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
1106 if (!result) {
1107 fatal_visitor_callback_failure_ = true;
1108 decoder_.StopProcessing();
1109 }
1110 }
1111
OnStreamEnd(spdy::SpdyStreamId stream_id)1112 void OgHttp2Session::OnStreamEnd(spdy::SpdyStreamId stream_id) {
1113 auto iter = stream_map_.find(stream_id);
1114 if (iter != stream_map_.end()) {
1115 iter->second.half_closed_remote = true;
1116 if (streams_reset_.contains(stream_id)) {
1117 return;
1118 }
1119
1120 // Validate against the content-length if it exists.
1121 if (iter->second.remaining_content_length.has_value() &&
1122 *iter->second.remaining_content_length != 0) {
1123 HandleContentLengthError(stream_id);
1124 return;
1125 }
1126
1127 const bool result = visitor_.OnEndStream(stream_id);
1128 if (!result) {
1129 fatal_visitor_callback_failure_ = true;
1130 decoder_.StopProcessing();
1131 }
1132 }
1133
1134 auto queued_frames_iter = queued_frames_.find(stream_id);
1135 const bool no_queued_frames = queued_frames_iter == queued_frames_.end() ||
1136 queued_frames_iter->second == 0;
1137 if (iter != stream_map_.end() && iter->second.half_closed_local &&
1138 !IsServerSession() && no_queued_frames) {
1139 // From the client's perspective, the stream can be closed if it's already
1140 // half_closed_local.
1141 CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1142 }
1143 }
1144
OnStreamPadLength(spdy::SpdyStreamId stream_id,size_t value)1145 void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId stream_id,
1146 size_t value) {
1147 bool result = visitor_.OnDataPaddingLength(stream_id, 1 + value);
1148 if (!result) {
1149 fatal_visitor_callback_failure_ = true;
1150 decoder_.StopProcessing();
1151 }
1152 MarkDataBuffered(stream_id, 1 + value);
1153 }
1154
OnStreamPadding(spdy::SpdyStreamId,size_t)1155 void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId /*stream_id*/, size_t
1156 /*len*/) {
1157 // Flow control was accounted for in OnStreamPadLength().
1158 // TODO(181586191): Pass padding to the visitor?
1159 }
1160
OnHeaderFrameStart(spdy::SpdyStreamId stream_id)1161 spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
1162 spdy::SpdyStreamId stream_id) {
1163 auto it = stream_map_.find(stream_id);
1164 if (it != stream_map_.end() && !streams_reset_.contains(stream_id)) {
1165 headers_handler_.set_stream_id(stream_id);
1166 headers_handler_.set_header_type(
1167 NextHeaderType(it->second.received_header_type));
1168 return &headers_handler_;
1169 } else {
1170 return &noop_headers_handler_;
1171 }
1172 }
1173
OnHeaderFrameEnd(spdy::SpdyStreamId stream_id)1174 void OgHttp2Session::OnHeaderFrameEnd(spdy::SpdyStreamId stream_id) {
1175 auto it = stream_map_.find(stream_id);
1176 if (it != stream_map_.end()) {
1177 if (headers_handler_.header_type() == HeaderType::RESPONSE &&
1178 !headers_handler_.status_header().empty() &&
1179 headers_handler_.status_header()[0] == '1') {
1180 // If response headers carried a 1xx response code, final response headers
1181 // should still be forthcoming.
1182 headers_handler_.set_header_type(HeaderType::RESPONSE_100);
1183 }
1184 it->second.received_header_type = headers_handler_.header_type();
1185
1186 // Track the content-length if the headers indicate that a body can follow.
1187 it->second.can_receive_body =
1188 headers_handler_.CanReceiveBody() && !it->second.sent_head_method;
1189 if (it->second.can_receive_body) {
1190 it->second.remaining_content_length = headers_handler_.content_length();
1191 }
1192
1193 headers_handler_.set_stream_id(0);
1194 }
1195 }
1196
OnRstStream(spdy::SpdyStreamId stream_id,spdy::SpdyErrorCode error_code)1197 void OgHttp2Session::OnRstStream(spdy::SpdyStreamId stream_id,
1198 spdy::SpdyErrorCode error_code) {
1199 auto iter = stream_map_.find(stream_id);
1200 if (iter != stream_map_.end()) {
1201 iter->second.half_closed_remote = true;
1202 iter->second.outbound_body = nullptr;
1203 } else if (static_cast<Http2StreamId>(stream_id) >
1204 highest_processed_stream_id_) {
1205 // Receiving RST_STREAM before HEADERS is a connection error.
1206 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1207 ConnectionError::kWrongFrameSequence);
1208 return;
1209 }
1210 if (streams_reset_.contains(stream_id)) {
1211 return;
1212 }
1213 visitor_.OnRstStream(stream_id, TranslateErrorCode(error_code));
1214 // TODO(birenroy): Consider whether there are outbound frames queued for the
1215 // stream.
1216 CloseStream(stream_id, TranslateErrorCode(error_code));
1217 }
1218
OnSettings()1219 void OgHttp2Session::OnSettings() {
1220 visitor_.OnSettingsStart();
1221 auto settings = std::make_unique<SpdySettingsIR>();
1222 settings->set_is_ack(true);
1223 EnqueueFrame(std::move(settings));
1224 }
1225
OnSetting(spdy::SpdySettingsId id,uint32_t value)1226 void OgHttp2Session::OnSetting(spdy::SpdySettingsId id, uint32_t value) {
1227 switch (id) {
1228 case HEADER_TABLE_SIZE:
1229 value = std::min(value, HpackCapacityBound(options_));
1230 if (value < framer_.GetHpackEncoder()->CurrentHeaderTableSizeSetting()) {
1231 // Safe to apply a smaller table capacity immediately.
1232 QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
1233 << " applying encoder table capacity " << value;
1234 framer_.GetHpackEncoder()->ApplyHeaderTableSizeSetting(value);
1235 } else {
1236 QUICHE_VLOG(2)
1237 << TracePerspectiveAsString(options_.perspective)
1238 << " NOT applying encoder table capacity until writing ack: "
1239 << value;
1240 encoder_header_table_capacity_when_acking_ = value;
1241 }
1242 break;
1243 case ENABLE_PUSH:
1244 if (value > 1u) {
1245 visitor_.OnInvalidFrame(
1246 0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1247 // The specification says this is a connection-level protocol error.
1248 LatchErrorAndNotify(
1249 Http2ErrorCode::PROTOCOL_ERROR,
1250 Http2VisitorInterface::ConnectionError::kInvalidSetting);
1251 return;
1252 }
1253 // Aside from validation, this setting is ignored.
1254 break;
1255 case MAX_CONCURRENT_STREAMS:
1256 max_outbound_concurrent_streams_ = value;
1257 if (!IsServerSession()) {
1258 // We may now be able to start pending streams.
1259 StartPendingStreams();
1260 }
1261 break;
1262 case INITIAL_WINDOW_SIZE:
1263 if (value > spdy::kSpdyMaximumWindowSize) {
1264 visitor_.OnInvalidFrame(
1265 0, Http2VisitorInterface::InvalidFrameError::kFlowControl);
1266 // The specification says this is a connection-level flow control error.
1267 LatchErrorAndNotify(
1268 Http2ErrorCode::FLOW_CONTROL_ERROR,
1269 Http2VisitorInterface::ConnectionError::kFlowControlError);
1270 return;
1271 } else {
1272 UpdateStreamSendWindowSizes(value);
1273 }
1274 break;
1275 case MAX_FRAME_SIZE:
1276 if (value < kDefaultFramePayloadSizeLimit ||
1277 value > kMaximumFramePayloadSizeLimit) {
1278 visitor_.OnInvalidFrame(
1279 0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1280 // The specification says this is a connection-level protocol error.
1281 LatchErrorAndNotify(
1282 Http2ErrorCode::PROTOCOL_ERROR,
1283 Http2VisitorInterface::ConnectionError::kInvalidSetting);
1284 return;
1285 }
1286 max_frame_payload_ = value;
1287 break;
1288 case ENABLE_CONNECT_PROTOCOL:
1289 if (value > 1u || (value == 0 && peer_enables_connect_protocol_)) {
1290 visitor_.OnInvalidFrame(
1291 0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1292 LatchErrorAndNotify(
1293 Http2ErrorCode::PROTOCOL_ERROR,
1294 Http2VisitorInterface::ConnectionError::kInvalidSetting);
1295 return;
1296 }
1297 peer_enables_connect_protocol_ = (value == 1u);
1298 break;
1299 default:
1300 // TODO(bnc): See if C++17 inline constants are allowed in QUICHE.
1301 if (id == kMetadataExtensionId) {
1302 peer_supports_metadata_ = (value != 0);
1303 } else {
1304 QUICHE_VLOG(1) << "Unimplemented SETTING id: " << id;
1305 }
1306 }
1307 visitor_.OnSetting({id, value});
1308 }
1309
OnSettingsEnd()1310 void OgHttp2Session::OnSettingsEnd() { visitor_.OnSettingsEnd(); }
1311
OnSettingsAck()1312 void OgHttp2Session::OnSettingsAck() {
1313 if (!settings_ack_callbacks_.empty()) {
1314 SettingsAckCallback callback = std::move(settings_ack_callbacks_.front());
1315 settings_ack_callbacks_.pop_front();
1316 callback();
1317 }
1318
1319 visitor_.OnSettingsAck();
1320 }
1321
OnPing(spdy::SpdyPingId unique_id,bool is_ack)1322 void OgHttp2Session::OnPing(spdy::SpdyPingId unique_id, bool is_ack) {
1323 visitor_.OnPing(unique_id, is_ack);
1324 if (options_.auto_ping_ack && !is_ack) {
1325 auto ping = std::make_unique<spdy::SpdyPingIR>(unique_id);
1326 ping->set_is_ack(true);
1327 EnqueueFrame(std::move(ping));
1328 }
1329 }
1330
OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,spdy::SpdyErrorCode error_code)1331 void OgHttp2Session::OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,
1332 spdy::SpdyErrorCode error_code) {
1333 if (received_goaway_ &&
1334 last_accepted_stream_id >
1335 static_cast<spdy::SpdyStreamId>(received_goaway_stream_id_)) {
1336 // This GOAWAY has a higher `last_accepted_stream_id` than a previous
1337 // GOAWAY, a connection-level spec violation.
1338 const bool ok = visitor_.OnInvalidFrame(
1339 kConnectionStreamId,
1340 Http2VisitorInterface::InvalidFrameError::kProtocol);
1341 if (!ok) {
1342 fatal_visitor_callback_failure_ = true;
1343 }
1344 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1345 ConnectionError::kInvalidGoAwayLastStreamId);
1346 return;
1347 }
1348
1349 received_goaway_ = true;
1350 received_goaway_stream_id_ = last_accepted_stream_id;
1351 const bool result = visitor_.OnGoAway(last_accepted_stream_id,
1352 TranslateErrorCode(error_code), "");
1353 if (!result) {
1354 fatal_visitor_callback_failure_ = true;
1355 decoder_.StopProcessing();
1356 }
1357
1358 // Close the streams above `last_accepted_stream_id`. Only applies if the
1359 // session receives a GOAWAY as a client, as we do not support server push.
1360 if (last_accepted_stream_id == spdy::kMaxStreamId || IsServerSession()) {
1361 return;
1362 }
1363 std::vector<Http2StreamId> streams_to_close;
1364 for (const auto& [stream_id, stream_state] : stream_map_) {
1365 if (static_cast<spdy::SpdyStreamId>(stream_id) > last_accepted_stream_id) {
1366 streams_to_close.push_back(stream_id);
1367 }
1368 }
1369 for (Http2StreamId stream_id : streams_to_close) {
1370 CloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1371 }
1372 }
1373
OnGoAwayFrameData(const char *,size_t)1374 bool OgHttp2Session::OnGoAwayFrameData(const char* /*goaway_data*/, size_t
1375 /*len*/) {
1376 // Opaque data is currently ignored.
1377 return true;
1378 }
1379
OnHeaders(spdy::SpdyStreamId stream_id,size_t,bool,int,spdy::SpdyStreamId,bool,bool fin,bool)1380 void OgHttp2Session::OnHeaders(spdy::SpdyStreamId stream_id,
1381 size_t /*payload_length*/, bool /*has_priority*/,
1382 int /*weight*/,
1383 spdy::SpdyStreamId /*parent_stream_id*/,
1384 bool /*exclusive*/, bool fin, bool /*end*/) {
1385 if (stream_id % 2 == 0) {
1386 // Server push is disabled; receiving push HEADERS is a connection error.
1387 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1388 ConnectionError::kInvalidNewStreamId);
1389 return;
1390 }
1391 if (fin) {
1392 headers_handler_.set_frame_contains_fin();
1393 }
1394 if (IsServerSession()) {
1395 const auto new_stream_id = static_cast<Http2StreamId>(stream_id);
1396 if (stream_map_.find(new_stream_id) != stream_map_.end() && fin) {
1397 // Not a new stream, must be trailers.
1398 return;
1399 }
1400 if (new_stream_id <= highest_processed_stream_id_) {
1401 // A new stream ID lower than the watermark is a connection error.
1402 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1403 ConnectionError::kInvalidNewStreamId);
1404 return;
1405 }
1406
1407 if (stream_map_.size() >= max_inbound_concurrent_streams_) {
1408 // The new stream would exceed our advertised and acknowledged
1409 // MAX_CONCURRENT_STREAMS. For parity with nghttp2, treat this error as a
1410 // connection-level PROTOCOL_ERROR.
1411 bool ok = visitor_.OnInvalidFrame(
1412 stream_id, Http2VisitorInterface::InvalidFrameError::kProtocol);
1413 if (!ok) {
1414 fatal_visitor_callback_failure_ = true;
1415 }
1416 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1417 ConnectionError::kExceededMaxConcurrentStreams);
1418 return;
1419 }
1420 if (stream_map_.size() >= pending_max_inbound_concurrent_streams_) {
1421 // The new stream would exceed our advertised but unacked
1422 // MAX_CONCURRENT_STREAMS. Refuse the stream for parity with nghttp2.
1423 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1424 stream_id, spdy::ERROR_CODE_REFUSED_STREAM));
1425 const bool ok = visitor_.OnInvalidFrame(
1426 stream_id, Http2VisitorInterface::InvalidFrameError::kRefusedStream);
1427 if (!ok) {
1428 fatal_visitor_callback_failure_ = true;
1429 LatchErrorAndNotify(Http2ErrorCode::REFUSED_STREAM,
1430 ConnectionError::kExceededMaxConcurrentStreams);
1431 }
1432 return;
1433 }
1434
1435 CreateStream(stream_id);
1436 }
1437 }
1438
OnWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1439 void OgHttp2Session::OnWindowUpdate(spdy::SpdyStreamId stream_id,
1440 int delta_window_size) {
1441 constexpr int kMaxWindowValue = 2147483647; // (1 << 31) - 1
1442 if (stream_id == 0) {
1443 if (delta_window_size == 0) {
1444 // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1445 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1446 ConnectionError::kFlowControlError);
1447 return;
1448 }
1449 if (connection_send_window_ > 0 &&
1450 delta_window_size > (kMaxWindowValue - connection_send_window_)) {
1451 // Window overflow is a FLOW_CONTROL_ERROR.
1452 LatchErrorAndNotify(Http2ErrorCode::FLOW_CONTROL_ERROR,
1453 ConnectionError::kFlowControlError);
1454 return;
1455 }
1456 connection_send_window_ += delta_window_size;
1457 } else {
1458 if (delta_window_size == 0) {
1459 // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1460 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1461 stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1462 return;
1463 }
1464 auto it = stream_map_.find(stream_id);
1465 if (it == stream_map_.end()) {
1466 QUICHE_VLOG(1) << "Stream " << stream_id << " not found!";
1467 if (static_cast<Http2StreamId>(stream_id) >
1468 highest_processed_stream_id_) {
1469 // Receiving WINDOW_UPDATE before HEADERS is a connection error.
1470 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1471 ConnectionError::kWrongFrameSequence);
1472 }
1473 // Do not inform the visitor of a WINDOW_UPDATE for a non-existent stream.
1474 return;
1475 } else {
1476 if (streams_reset_.contains(stream_id)) {
1477 return;
1478 }
1479 if (it->second.send_window > 0 &&
1480 delta_window_size > (kMaxWindowValue - it->second.send_window)) {
1481 // Window overflow is a FLOW_CONTROL_ERROR.
1482 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1483 stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1484 return;
1485 }
1486 const bool was_blocked = (it->second.send_window <= 0);
1487 it->second.send_window += delta_window_size;
1488 if (was_blocked && it->second.send_window > 0) {
1489 // The stream was blocked on flow control.
1490 QUICHE_VLOG(1) << "Marking stream " << stream_id << " ready to write.";
1491 write_scheduler_.MarkStreamReady(stream_id, false);
1492 }
1493 }
1494 }
1495 visitor_.OnWindowUpdate(stream_id, delta_window_size);
1496 }
1497
OnPushPromise(spdy::SpdyStreamId,spdy::SpdyStreamId,bool)1498 void OgHttp2Session::OnPushPromise(spdy::SpdyStreamId /*stream_id*/,
1499 spdy::SpdyStreamId /*promised_stream_id*/,
1500 bool /*end*/) {
1501 // Server push is disabled; PUSH_PROMISE is an invalid frame.
1502 LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1503 ConnectionError::kInvalidPushPromise);
1504 }
1505
OnContinuation(spdy::SpdyStreamId,size_t,bool)1506 void OgHttp2Session::OnContinuation(spdy::SpdyStreamId /*stream_id*/,
1507 size_t /*payload_length*/, bool /*end*/) {}
1508
OnAltSvc(spdy::SpdyStreamId,absl::string_view,const spdy::SpdyAltSvcWireFormat::AlternativeServiceVector &)1509 void OgHttp2Session::OnAltSvc(spdy::SpdyStreamId /*stream_id*/,
1510 absl::string_view /*origin*/,
1511 const spdy::SpdyAltSvcWireFormat::
1512 AlternativeServiceVector& /*altsvc_vector*/) {
1513 }
1514
OnPriority(spdy::SpdyStreamId,spdy::SpdyStreamId,int,bool)1515 void OgHttp2Session::OnPriority(spdy::SpdyStreamId /*stream_id*/,
1516 spdy::SpdyStreamId /*parent_stream_id*/,
1517 int /*weight*/, bool /*exclusive*/) {}
1518
OnPriorityUpdate(spdy::SpdyStreamId,absl::string_view)1519 void OgHttp2Session::OnPriorityUpdate(
1520 spdy::SpdyStreamId /*prioritized_stream_id*/,
1521 absl::string_view /*priority_field_value*/) {}
1522
OnUnknownFrame(spdy::SpdyStreamId,uint8_t)1523 bool OgHttp2Session::OnUnknownFrame(spdy::SpdyStreamId /*stream_id*/,
1524 uint8_t /*frame_type*/) {
1525 return true;
1526 }
1527
OnUnknownFrameStart(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1528 void OgHttp2Session::OnUnknownFrameStart(spdy::SpdyStreamId stream_id,
1529 size_t length, uint8_t type,
1530 uint8_t flags) {
1531 process_metadata_ = false;
1532 if (streams_reset_.contains(stream_id)) {
1533 return;
1534 }
1535 if (type == kMetadataFrameType) {
1536 QUICHE_DCHECK_EQ(metadata_length_, 0u);
1537 visitor_.OnBeginMetadataForStream(stream_id, length);
1538 metadata_length_ = length;
1539 process_metadata_ = true;
1540 end_metadata_ = flags & kMetadataEndFlag;
1541
1542 // Empty metadata payloads will not trigger OnUnknownFramePayload(), so
1543 // handle that possibility here.
1544 MaybeHandleMetadataEndForStream(stream_id);
1545 } else {
1546 QUICHE_DLOG(INFO) << "Received unexpected frame type "
1547 << static_cast<int>(type);
1548 }
1549 }
1550
OnUnknownFramePayload(spdy::SpdyStreamId stream_id,absl::string_view payload)1551 void OgHttp2Session::OnUnknownFramePayload(spdy::SpdyStreamId stream_id,
1552 absl::string_view payload) {
1553 if (!process_metadata_) {
1554 return;
1555 }
1556 if (streams_reset_.contains(stream_id)) {
1557 return;
1558 }
1559 if (metadata_length_ > 0) {
1560 QUICHE_DCHECK_LE(payload.size(), metadata_length_);
1561 const bool payload_success =
1562 visitor_.OnMetadataForStream(stream_id, payload);
1563 if (payload_success) {
1564 metadata_length_ -= payload.size();
1565 MaybeHandleMetadataEndForStream(stream_id);
1566 } else {
1567 fatal_visitor_callback_failure_ = true;
1568 decoder_.StopProcessing();
1569 }
1570 } else {
1571 QUICHE_DLOG(INFO) << "Unexpected metadata payload for stream " << stream_id;
1572 }
1573 }
1574
OnHeaderStatus(Http2StreamId stream_id,Http2VisitorInterface::OnHeaderResult result)1575 void OgHttp2Session::OnHeaderStatus(
1576 Http2StreamId stream_id, Http2VisitorInterface::OnHeaderResult result) {
1577 QUICHE_DCHECK_NE(result, Http2VisitorInterface::HEADER_OK);
1578 QUICHE_VLOG(1) << "OnHeaderStatus(stream_id=" << stream_id
1579 << ", result=" << result << ")";
1580 const bool should_reset_stream =
1581 result == Http2VisitorInterface::HEADER_RST_STREAM ||
1582 result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1583 result == Http2VisitorInterface::HEADER_HTTP_MESSAGING;
1584 if (should_reset_stream) {
1585 const Http2ErrorCode error_code =
1586 (result == Http2VisitorInterface::HEADER_RST_STREAM)
1587 ? Http2ErrorCode::INTERNAL_ERROR
1588 : Http2ErrorCode::PROTOCOL_ERROR;
1589 const spdy::SpdyErrorCode spdy_error_code = TranslateErrorCode(error_code);
1590 const Http2VisitorInterface::InvalidFrameError frame_error =
1591 (result == Http2VisitorInterface::HEADER_RST_STREAM ||
1592 result == Http2VisitorInterface::HEADER_FIELD_INVALID)
1593 ? Http2VisitorInterface::InvalidFrameError::kHttpHeader
1594 : Http2VisitorInterface::InvalidFrameError::kHttpMessaging;
1595 auto it = streams_reset_.find(stream_id);
1596 if (it == streams_reset_.end()) {
1597 EnqueueFrame(
1598 std::make_unique<spdy::SpdyRstStreamIR>(stream_id, spdy_error_code));
1599
1600 if (result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1601 result == Http2VisitorInterface::HEADER_HTTP_MESSAGING) {
1602 const bool ok = visitor_.OnInvalidFrame(stream_id, frame_error);
1603 if (!ok) {
1604 fatal_visitor_callback_failure_ = true;
1605 LatchErrorAndNotify(error_code, ConnectionError::kHeaderError);
1606 }
1607 }
1608 }
1609 } else if (result == Http2VisitorInterface::HEADER_CONNECTION_ERROR) {
1610 fatal_visitor_callback_failure_ = true;
1611 LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
1612 ConnectionError::kHeaderError);
1613 } else if (result == Http2VisitorInterface::HEADER_COMPRESSION_ERROR) {
1614 LatchErrorAndNotify(Http2ErrorCode::COMPRESSION_ERROR,
1615 ConnectionError::kHeaderError);
1616 }
1617 }
1618
MaybeSetupPreface(bool sending_outbound_settings)1619 void OgHttp2Session::MaybeSetupPreface(bool sending_outbound_settings) {
1620 if (!queued_preface_) {
1621 queued_preface_ = true;
1622 if (!IsServerSession()) {
1623 buffered_data_.assign(spdy::kHttp2ConnectionHeaderPrefix,
1624 spdy::kHttp2ConnectionHeaderPrefixSize);
1625 }
1626 if (!sending_outbound_settings) {
1627 QUICHE_DCHECK(frames_.empty());
1628 // First frame must be a non-ack SETTINGS.
1629 EnqueueFrame(PrepareSettingsFrame(GetInitialSettings()));
1630 }
1631 }
1632 }
1633
GetInitialSettings() const1634 std::vector<Http2Setting> OgHttp2Session::GetInitialSettings() const {
1635 std::vector<Http2Setting> settings;
1636 if (!IsServerSession()) {
1637 // Disable server push. Note that server push from clients is already
1638 // disabled, so the server does not need to send this disabling setting.
1639 // TODO(diannahu): Consider applying server push disabling on SETTINGS ack.
1640 settings.push_back({Http2KnownSettingsId::ENABLE_PUSH, 0});
1641 }
1642 if (options_.max_header_list_bytes) {
1643 settings.push_back({Http2KnownSettingsId::MAX_HEADER_LIST_SIZE,
1644 *options_.max_header_list_bytes});
1645 }
1646 if (options_.allow_extended_connect && IsServerSession()) {
1647 settings.push_back({Http2KnownSettingsId::ENABLE_CONNECT_PROTOCOL, 1u});
1648 }
1649 return settings;
1650 }
1651
PrepareSettingsFrame(absl::Span<const Http2Setting> settings)1652 std::unique_ptr<SpdySettingsIR> OgHttp2Session::PrepareSettingsFrame(
1653 absl::Span<const Http2Setting> settings) {
1654 auto settings_ir = std::make_unique<SpdySettingsIR>();
1655 for (const Http2Setting& setting : settings) {
1656 settings_ir->AddSetting(setting.id, setting.value);
1657 }
1658 return settings_ir;
1659 }
1660
HandleOutboundSettings(const spdy::SpdySettingsIR & settings_frame)1661 void OgHttp2Session::HandleOutboundSettings(
1662 const spdy::SpdySettingsIR& settings_frame) {
1663 for (const auto& [id, value] : settings_frame.values()) {
1664 switch (static_cast<Http2KnownSettingsId>(id)) {
1665 case MAX_CONCURRENT_STREAMS:
1666 pending_max_inbound_concurrent_streams_ = value;
1667 break;
1668 case ENABLE_CONNECT_PROTOCOL:
1669 if (value == 1u && IsServerSession()) {
1670 // Allow extended CONNECT semantics even before SETTINGS are acked, to
1671 // make things easier for clients.
1672 headers_handler_.SetAllowExtendedConnect();
1673 }
1674 break;
1675 case HEADER_TABLE_SIZE:
1676 case ENABLE_PUSH:
1677 case INITIAL_WINDOW_SIZE:
1678 case MAX_FRAME_SIZE:
1679 case MAX_HEADER_LIST_SIZE:
1680 QUICHE_VLOG(2)
1681 << "Not adjusting internal state for outbound setting with id "
1682 << id;
1683 break;
1684 }
1685 }
1686
1687 // Copy the (small) map of settings we are about to send so that we can set
1688 // values in the SETTINGS ack callback.
1689 settings_ack_callbacks_.push_back(
1690 [this, settings_map = settings_frame.values()]() {
1691 for (const auto& [id, value] : settings_map) {
1692 switch (static_cast<Http2KnownSettingsId>(id)) {
1693 case MAX_CONCURRENT_STREAMS:
1694 max_inbound_concurrent_streams_ = value;
1695 break;
1696 case HEADER_TABLE_SIZE:
1697 decoder_.GetHpackDecoder()->ApplyHeaderTableSizeSetting(value);
1698 break;
1699 case INITIAL_WINDOW_SIZE:
1700 UpdateStreamReceiveWindowSizes(value);
1701 initial_stream_receive_window_ = value;
1702 break;
1703 case MAX_FRAME_SIZE:
1704 decoder_.SetMaxFrameSize(value);
1705 break;
1706 case ENABLE_PUSH:
1707 case MAX_HEADER_LIST_SIZE:
1708 case ENABLE_CONNECT_PROTOCOL:
1709 QUICHE_VLOG(2)
1710 << "No action required in ack for outbound setting with id "
1711 << id;
1712 break;
1713 }
1714 }
1715 });
1716 }
1717
SendWindowUpdate(Http2StreamId stream_id,size_t update_delta)1718 void OgHttp2Session::SendWindowUpdate(Http2StreamId stream_id,
1719 size_t update_delta) {
1720 EnqueueFrame(
1721 std::make_unique<spdy::SpdyWindowUpdateIR>(stream_id, update_delta));
1722 }
1723
SendHeaders(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,bool end_stream)1724 void OgHttp2Session::SendHeaders(Http2StreamId stream_id,
1725 spdy::Http2HeaderBlock headers,
1726 bool end_stream) {
1727 auto frame =
1728 std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(headers));
1729 frame->set_fin(end_stream);
1730 EnqueueFrame(std::move(frame));
1731 }
1732
SendTrailers(Http2StreamId stream_id,spdy::Http2HeaderBlock trailers)1733 void OgHttp2Session::SendTrailers(Http2StreamId stream_id,
1734 spdy::Http2HeaderBlock trailers) {
1735 auto frame =
1736 std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(trailers));
1737 frame->set_fin(true);
1738 EnqueueFrame(std::move(frame));
1739 trailers_ready_.erase(stream_id);
1740 }
1741
MaybeFinWithRstStream(StreamStateMap::iterator iter)1742 void OgHttp2Session::MaybeFinWithRstStream(StreamStateMap::iterator iter) {
1743 QUICHE_DCHECK(iter != stream_map_.end() && iter->second.half_closed_local);
1744
1745 if (options_.rst_stream_no_error_when_incomplete && IsServerSession() &&
1746 !iter->second.half_closed_remote) {
1747 // Since the peer has not yet ended the stream, this endpoint should
1748 // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
1749 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1750 iter->first, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
1751 iter->second.half_closed_remote = true;
1752 }
1753 }
1754
MarkDataBuffered(Http2StreamId stream_id,size_t bytes)1755 void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
1756 connection_window_manager_.MarkDataBuffered(bytes);
1757 if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
1758 it->second.window_manager.MarkDataBuffered(bytes);
1759 }
1760 }
1761
CreateStream(Http2StreamId stream_id)1762 OgHttp2Session::StreamStateMap::iterator OgHttp2Session::CreateStream(
1763 Http2StreamId stream_id) {
1764 WindowManager::WindowUpdateListener listener =
1765 [this, stream_id](size_t window_update_delta) {
1766 SendWindowUpdate(stream_id, window_update_delta);
1767 };
1768 auto [iter, inserted] = stream_map_.try_emplace(
1769 stream_id,
1770 StreamState(initial_stream_receive_window_, initial_stream_send_window_,
1771 std::move(listener), options_.should_window_update_fn));
1772 if (inserted) {
1773 // Add the stream to the write scheduler.
1774 const spdy::SpdyPriority priority = 3;
1775 write_scheduler_.RegisterStream(stream_id, priority);
1776
1777 highest_processed_stream_id_ =
1778 std::max(highest_processed_stream_id_, stream_id);
1779 }
1780 return iter;
1781 }
1782
StartRequest(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)1783 void OgHttp2Session::StartRequest(Http2StreamId stream_id,
1784 spdy::Http2HeaderBlock headers,
1785 std::unique_ptr<DataFrameSource> data_source,
1786 void* user_data) {
1787 if (received_goaway_) {
1788 // Do not start new streams after receiving a GOAWAY.
1789 goaway_rejected_streams_.insert(stream_id);
1790 return;
1791 }
1792
1793 auto iter = CreateStream(stream_id);
1794 const bool end_stream = data_source == nullptr;
1795 if (!end_stream) {
1796 iter->second.outbound_body = std::move(data_source);
1797 write_scheduler_.MarkStreamReady(stream_id, false);
1798 }
1799 iter->second.user_data = user_data;
1800 for (const auto& [name, value] : headers) {
1801 if (name == kHttp2MethodPseudoHeader && value == kHeadValue) {
1802 iter->second.sent_head_method = true;
1803 }
1804 }
1805 SendHeaders(stream_id, std::move(headers), end_stream);
1806 }
1807
StartPendingStreams()1808 void OgHttp2Session::StartPendingStreams() {
1809 while (!pending_streams_.empty() && CanCreateStream()) {
1810 auto& [stream_id, pending_stream] = pending_streams_.front();
1811 StartRequest(stream_id, std::move(pending_stream.headers),
1812 std::move(pending_stream.data_source),
1813 pending_stream.user_data);
1814 pending_streams_.pop_front();
1815 }
1816 }
1817
CloseStream(Http2StreamId stream_id,Http2ErrorCode error_code)1818 void OgHttp2Session::CloseStream(Http2StreamId stream_id,
1819 Http2ErrorCode error_code) {
1820 const bool result = visitor_.OnCloseStream(stream_id, error_code);
1821 if (!result) {
1822 latched_error_ = true;
1823 decoder_.StopProcessing();
1824 }
1825 stream_map_.erase(stream_id);
1826 trailers_ready_.erase(stream_id);
1827 streams_reset_.erase(stream_id);
1828 auto queued_it = queued_frames_.find(stream_id);
1829 if (queued_it != queued_frames_.end()) {
1830 // Remove any queued frames for this stream.
1831 int frames_remaining = queued_it->second;
1832 queued_frames_.erase(queued_it);
1833 for (auto it = frames_.begin();
1834 frames_remaining > 0 && it != frames_.end();) {
1835 if (static_cast<Http2StreamId>((*it)->stream_id()) == stream_id) {
1836 it = frames_.erase(it);
1837 --frames_remaining;
1838 } else {
1839 ++it;
1840 }
1841 }
1842 }
1843 if (write_scheduler_.StreamRegistered(stream_id)) {
1844 write_scheduler_.UnregisterStream(stream_id);
1845 }
1846
1847 StartPendingStreams();
1848 }
1849
CanCreateStream() const1850 bool OgHttp2Session::CanCreateStream() const {
1851 return stream_map_.size() < max_outbound_concurrent_streams_;
1852 }
1853
NextHeaderType(absl::optional<HeaderType> current_type)1854 HeaderType OgHttp2Session::NextHeaderType(
1855 absl::optional<HeaderType> current_type) {
1856 if (IsServerSession()) {
1857 if (!current_type) {
1858 return HeaderType::REQUEST;
1859 } else {
1860 QUICHE_DCHECK(current_type == HeaderType::REQUEST);
1861 return HeaderType::REQUEST_TRAILER;
1862 }
1863 } else if (!current_type ||
1864 current_type.value() == HeaderType::RESPONSE_100) {
1865 return HeaderType::RESPONSE;
1866 } else {
1867 return HeaderType::RESPONSE_TRAILER;
1868 }
1869 }
1870
LatchErrorAndNotify(Http2ErrorCode error_code,ConnectionError error)1871 void OgHttp2Session::LatchErrorAndNotify(Http2ErrorCode error_code,
1872 ConnectionError error) {
1873 if (latched_error_) {
1874 // Do not kick a connection when it is down.
1875 return;
1876 }
1877
1878 latched_error_ = true;
1879 visitor_.OnConnectionError(error);
1880 decoder_.StopProcessing();
1881 EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
1882 highest_processed_stream_id_, TranslateErrorCode(error_code),
1883 ConnectionErrorToString(error)));
1884 }
1885
CloseStreamIfReady(uint8_t frame_type,uint32_t stream_id)1886 void OgHttp2Session::CloseStreamIfReady(uint8_t frame_type,
1887 uint32_t stream_id) {
1888 auto iter = stream_map_.find(stream_id);
1889 if (iter == stream_map_.end()) {
1890 return;
1891 }
1892 const StreamState& state = iter->second;
1893 if (static_cast<FrameType>(frame_type) == FrameType::RST_STREAM ||
1894 (state.half_closed_local && state.half_closed_remote)) {
1895 CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1896 }
1897 }
1898
CloseGoAwayRejectedStreams()1899 void OgHttp2Session::CloseGoAwayRejectedStreams() {
1900 for (Http2StreamId stream_id : goaway_rejected_streams_) {
1901 const bool result =
1902 visitor_.OnCloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1903 if (!result) {
1904 latched_error_ = true;
1905 decoder_.StopProcessing();
1906 }
1907 }
1908 goaway_rejected_streams_.clear();
1909 }
1910
PrepareForImmediateGoAway()1911 void OgHttp2Session::PrepareForImmediateGoAway() {
1912 queued_immediate_goaway_ = true;
1913
1914 // Keep the initial SETTINGS frame if the session has SETTINGS at the front of
1915 // the queue but has not sent SETTINGS yet. The session should send initial
1916 // SETTINGS before GOAWAY.
1917 std::unique_ptr<spdy::SpdyFrameIR> initial_settings;
1918 if (!sent_non_ack_settings_ && !frames_.empty() &&
1919 IsNonAckSettings(*frames_.front())) {
1920 initial_settings = std::move(frames_.front());
1921 frames_.pop_front();
1922 }
1923
1924 // Remove all pending frames except for RST_STREAMs. It is important to send
1925 // RST_STREAMs so the peer knows of errors below the GOAWAY last stream ID.
1926 // TODO(diannahu): Consider informing the visitor of dropped frames. This may
1927 // mean keeping the frames and invoking a frame-not-sent callback, similar to
1928 // nghttp2. Could add a closure to each frame in the frames queue.
1929 frames_.remove_if([](const auto& frame) {
1930 return frame->frame_type() != spdy::SpdyFrameType::RST_STREAM;
1931 });
1932
1933 if (initial_settings != nullptr) {
1934 frames_.push_front(std::move(initial_settings));
1935 }
1936 }
1937
MaybeHandleMetadataEndForStream(Http2StreamId stream_id)1938 void OgHttp2Session::MaybeHandleMetadataEndForStream(Http2StreamId stream_id) {
1939 if (metadata_length_ == 0 && end_metadata_) {
1940 const bool completion_success = visitor_.OnMetadataEndForStream(stream_id);
1941 if (!completion_success) {
1942 fatal_visitor_callback_failure_ = true;
1943 decoder_.StopProcessing();
1944 }
1945 process_metadata_ = false;
1946 end_metadata_ = false;
1947 }
1948 }
1949
DecrementQueuedFrameCount(uint32_t stream_id,uint8_t frame_type)1950 void OgHttp2Session::DecrementQueuedFrameCount(uint32_t stream_id,
1951 uint8_t frame_type) {
1952 auto iter = queued_frames_.find(stream_id);
1953 if (iter == queued_frames_.end()) {
1954 QUICHE_LOG(ERROR) << "Unable to find a queued frame count for stream "
1955 << stream_id;
1956 return;
1957 }
1958 if (static_cast<FrameType>(frame_type) != FrameType::DATA) {
1959 --iter->second;
1960 }
1961 if (iter->second == 0) {
1962 // TODO(birenroy): Consider passing through `error_code` here.
1963 CloseStreamIfReady(frame_type, stream_id);
1964 }
1965 }
1966
HandleContentLengthError(Http2StreamId stream_id)1967 void OgHttp2Session::HandleContentLengthError(Http2StreamId stream_id) {
1968 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1969 stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1970 }
1971
UpdateReceiveWindow(Http2StreamId stream_id,int32_t delta)1972 void OgHttp2Session::UpdateReceiveWindow(Http2StreamId stream_id,
1973 int32_t delta) {
1974 if (stream_id == 0) {
1975 connection_window_manager_.IncreaseWindow(delta);
1976 // TODO(b/181586191): Provide an explicit way to set the desired window
1977 // limit, remove the upsize-on-window-update behavior.
1978 const int64_t current_window =
1979 connection_window_manager_.CurrentWindowSize();
1980 if (current_window > connection_window_manager_.WindowSizeLimit()) {
1981 connection_window_manager_.SetWindowSizeLimit(current_window);
1982 }
1983 } else {
1984 auto iter = stream_map_.find(stream_id);
1985 if (iter != stream_map_.end()) {
1986 WindowManager& manager = iter->second.window_manager;
1987 manager.IncreaseWindow(delta);
1988 // TODO(b/181586191): Provide an explicit way to set the desired window
1989 // limit, remove the upsize-on-window-update behavior.
1990 const int64_t current_window = manager.CurrentWindowSize();
1991 if (current_window > manager.WindowSizeLimit()) {
1992 manager.SetWindowSizeLimit(current_window);
1993 }
1994 }
1995 }
1996 }
1997
UpdateStreamSendWindowSizes(uint32_t new_value)1998 void OgHttp2Session::UpdateStreamSendWindowSizes(uint32_t new_value) {
1999 const int32_t delta =
2000 static_cast<int32_t>(new_value) - initial_stream_send_window_;
2001 initial_stream_send_window_ = new_value;
2002 for (auto& [stream_id, stream_state] : stream_map_) {
2003 const int64_t current_window_size = stream_state.send_window;
2004 const int64_t new_window_size = current_window_size + delta;
2005 if (new_window_size > spdy::kSpdyMaximumWindowSize) {
2006 EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
2007 stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
2008 } else {
2009 stream_state.send_window += delta;
2010 }
2011 if (current_window_size <= 0 && new_window_size > 0) {
2012 write_scheduler_.MarkStreamReady(stream_id, false);
2013 }
2014 }
2015 }
2016
UpdateStreamReceiveWindowSizes(uint32_t new_value)2017 void OgHttp2Session::UpdateStreamReceiveWindowSizes(uint32_t new_value) {
2018 for (auto& [stream_id, stream_state] : stream_map_) {
2019 stream_state.window_manager.OnWindowSizeLimitChange(new_value);
2020 }
2021 }
2022
2023 } // namespace adapter
2024 } // namespace http2
2025