• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "quiche/http2/adapter/oghttp2_session.h"
2 
3 #include <cstdint>
4 #include <memory>
5 #include <utility>
6 #include <vector>
7 
8 #include "absl/memory/memory.h"
9 #include "absl/strings/escaping.h"
10 #include "quiche/http2/adapter/header_validator.h"
11 #include "quiche/http2/adapter/http2_protocol.h"
12 #include "quiche/http2/adapter/http2_util.h"
13 #include "quiche/http2/adapter/http2_visitor_interface.h"
14 #include "quiche/http2/adapter/noop_header_validator.h"
15 #include "quiche/http2/adapter/oghttp2_util.h"
16 #include "quiche/spdy/core/spdy_protocol.h"
17 
18 namespace http2 {
19 namespace adapter {
20 
21 namespace {
22 
23 using ConnectionError = Http2VisitorInterface::ConnectionError;
24 using SpdyFramerError = Http2DecoderAdapter::SpdyFramerError;
25 
26 using ::spdy::SpdySettingsIR;
27 
28 const uint32_t kMaxAllowedMetadataFrameSize = 65536u;
29 const uint32_t kDefaultHpackTableCapacity = 4096u;
30 const uint32_t kMaximumHpackTableCapacity = 65536u;
31 
32 // Corresponds to NGHTTP2_ERR_CALLBACK_FAILURE.
33 const int kSendError = -902;
34 
35 constexpr absl::string_view kHeadValue = "HEAD";
36 
37 // TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
38 class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
39  public:
40   FrameAttributeCollector() = default;
VisitData(const spdy::SpdyDataIR & data)41   void VisitData(const spdy::SpdyDataIR& data) override {
42     frame_type_ = static_cast<uint8_t>(data.frame_type());
43     stream_id_ = data.stream_id();
44     flags_ =
45         (data.fin() ? END_STREAM_FLAG : 0) | (data.padded() ? PADDED_FLAG : 0);
46   }
VisitHeaders(const spdy::SpdyHeadersIR & headers)47   void VisitHeaders(const spdy::SpdyHeadersIR& headers) override {
48     frame_type_ = static_cast<uint8_t>(headers.frame_type());
49     stream_id_ = headers.stream_id();
50     flags_ = END_HEADERS_FLAG | (headers.fin() ? END_STREAM_FLAG : 0) |
51              (headers.padded() ? PADDED_FLAG : 0) |
52              (headers.has_priority() ? PRIORITY_FLAG : 0);
53   }
VisitPriority(const spdy::SpdyPriorityIR & priority)54   void VisitPriority(const spdy::SpdyPriorityIR& priority) override {
55     frame_type_ = static_cast<uint8_t>(priority.frame_type());
56     frame_type_ = 2;
57     stream_id_ = priority.stream_id();
58   }
VisitRstStream(const spdy::SpdyRstStreamIR & rst_stream)59   void VisitRstStream(const spdy::SpdyRstStreamIR& rst_stream) override {
60     frame_type_ = static_cast<uint8_t>(rst_stream.frame_type());
61     frame_type_ = 3;
62     stream_id_ = rst_stream.stream_id();
63     error_code_ = rst_stream.error_code();
64   }
VisitSettings(const spdy::SpdySettingsIR & settings)65   void VisitSettings(const spdy::SpdySettingsIR& settings) override {
66     frame_type_ = static_cast<uint8_t>(settings.frame_type());
67     frame_type_ = 4;
68     flags_ = (settings.is_ack() ? ACK_FLAG : 0);
69   }
VisitPushPromise(const spdy::SpdyPushPromiseIR & push_promise)70   void VisitPushPromise(const spdy::SpdyPushPromiseIR& push_promise) override {
71     frame_type_ = static_cast<uint8_t>(push_promise.frame_type());
72     frame_type_ = 5;
73     stream_id_ = push_promise.stream_id();
74     flags_ = (push_promise.padded() ? PADDED_FLAG : 0);
75   }
VisitPing(const spdy::SpdyPingIR & ping)76   void VisitPing(const spdy::SpdyPingIR& ping) override {
77     frame_type_ = static_cast<uint8_t>(ping.frame_type());
78     frame_type_ = 6;
79     flags_ = (ping.is_ack() ? ACK_FLAG : 0);
80   }
VisitGoAway(const spdy::SpdyGoAwayIR & goaway)81   void VisitGoAway(const spdy::SpdyGoAwayIR& goaway) override {
82     frame_type_ = static_cast<uint8_t>(goaway.frame_type());
83     frame_type_ = 7;
84     error_code_ = goaway.error_code();
85   }
VisitWindowUpdate(const spdy::SpdyWindowUpdateIR & window_update)86   void VisitWindowUpdate(
87       const spdy::SpdyWindowUpdateIR& window_update) override {
88     frame_type_ = static_cast<uint8_t>(window_update.frame_type());
89     frame_type_ = 8;
90     stream_id_ = window_update.stream_id();
91   }
VisitContinuation(const spdy::SpdyContinuationIR & continuation)92   void VisitContinuation(
93       const spdy::SpdyContinuationIR& continuation) override {
94     frame_type_ = static_cast<uint8_t>(continuation.frame_type());
95     stream_id_ = continuation.stream_id();
96     flags_ = continuation.end_headers() ? END_HEADERS_FLAG : 0;
97   }
VisitUnknown(const spdy::SpdyUnknownIR & unknown)98   void VisitUnknown(const spdy::SpdyUnknownIR& unknown) override {
99     frame_type_ = static_cast<uint8_t>(unknown.frame_type());
100     stream_id_ = unknown.stream_id();
101     flags_ = unknown.flags();
102   }
VisitAltSvc(const spdy::SpdyAltSvcIR &)103   void VisitAltSvc(const spdy::SpdyAltSvcIR& /*altsvc*/) override {}
VisitPriorityUpdate(const spdy::SpdyPriorityUpdateIR &)104   void VisitPriorityUpdate(
105       const spdy::SpdyPriorityUpdateIR& /*priority_update*/) override {}
VisitAcceptCh(const spdy::SpdyAcceptChIR &)106   void VisitAcceptCh(const spdy::SpdyAcceptChIR& /*accept_ch*/) override {}
107 
stream_id()108   uint32_t stream_id() { return stream_id_; }
error_code()109   uint32_t error_code() { return error_code_; }
frame_type()110   uint8_t frame_type() { return frame_type_; }
flags()111   uint8_t flags() { return flags_; }
112 
113  private:
114   uint32_t stream_id_ = 0;
115   uint32_t error_code_ = 0;
116   uint8_t frame_type_ = 0;
117   uint8_t flags_ = 0;
118 };
119 
TracePerspectiveAsString(Perspective p)120 absl::string_view TracePerspectiveAsString(Perspective p) {
121   switch (p) {
122     case Perspective::kClient:
123       return "OGHTTP2_CLIENT";
124     case Perspective::kServer:
125       return "OGHTTP2_SERVER";
126   }
127   return "OGHTTP2_SERVER";
128 }
129 
130 class RunOnExit {
131  public:
132   RunOnExit() = default;
RunOnExit(std::function<void ()> f)133   explicit RunOnExit(std::function<void()> f) : f_(std::move(f)) {}
134 
135   RunOnExit(const RunOnExit& other) = delete;
136   RunOnExit& operator=(const RunOnExit& other) = delete;
137   RunOnExit(RunOnExit&& other) = delete;
138   RunOnExit& operator=(RunOnExit&& other) = delete;
139 
~RunOnExit()140   ~RunOnExit() {
141     if (f_) {
142       f_();
143     }
144     f_ = {};
145   }
146 
emplace(std::function<void ()> f)147   void emplace(std::function<void()> f) { f_ = std::move(f); }
148 
149  private:
150   std::function<void()> f_;
151 };
152 
GetHttp2ErrorCode(SpdyFramerError error)153 Http2ErrorCode GetHttp2ErrorCode(SpdyFramerError error) {
154   switch (error) {
155     case SpdyFramerError::SPDY_NO_ERROR:
156       return Http2ErrorCode::HTTP2_NO_ERROR;
157     case SpdyFramerError::SPDY_INVALID_STREAM_ID:
158     case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME:
159     case SpdyFramerError::SPDY_INVALID_PADDING:
160     case SpdyFramerError::SPDY_INVALID_DATA_FRAME_FLAGS:
161     case SpdyFramerError::SPDY_UNEXPECTED_FRAME:
162       return Http2ErrorCode::PROTOCOL_ERROR;
163     case SpdyFramerError::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
164     case SpdyFramerError::SPDY_INVALID_CONTROL_FRAME_SIZE:
165     case SpdyFramerError::SPDY_OVERSIZED_PAYLOAD:
166       return Http2ErrorCode::FRAME_SIZE_ERROR;
167     case SpdyFramerError::SPDY_DECOMPRESS_FAILURE:
168     case SpdyFramerError::SPDY_HPACK_INDEX_VARINT_ERROR:
169     case SpdyFramerError::SPDY_HPACK_NAME_LENGTH_VARINT_ERROR:
170     case SpdyFramerError::SPDY_HPACK_VALUE_LENGTH_VARINT_ERROR:
171     case SpdyFramerError::SPDY_HPACK_NAME_TOO_LONG:
172     case SpdyFramerError::SPDY_HPACK_VALUE_TOO_LONG:
173     case SpdyFramerError::SPDY_HPACK_NAME_HUFFMAN_ERROR:
174     case SpdyFramerError::SPDY_HPACK_VALUE_HUFFMAN_ERROR:
175     case SpdyFramerError::SPDY_HPACK_MISSING_DYNAMIC_TABLE_SIZE_UPDATE:
176     case SpdyFramerError::SPDY_HPACK_INVALID_INDEX:
177     case SpdyFramerError::SPDY_HPACK_INVALID_NAME_INDEX:
178     case SpdyFramerError::SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_NOT_ALLOWED:
179     case SpdyFramerError::
180         SPDY_HPACK_INITIAL_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_LOW_WATER_MARK:
181     case SpdyFramerError::
182         SPDY_HPACK_DYNAMIC_TABLE_SIZE_UPDATE_IS_ABOVE_ACKNOWLEDGED_SETTING:
183     case SpdyFramerError::SPDY_HPACK_TRUNCATED_BLOCK:
184     case SpdyFramerError::SPDY_HPACK_FRAGMENT_TOO_LONG:
185     case SpdyFramerError::SPDY_HPACK_COMPRESSED_HEADER_SIZE_EXCEEDS_LIMIT:
186       return Http2ErrorCode::COMPRESSION_ERROR;
187     case SpdyFramerError::SPDY_INTERNAL_FRAMER_ERROR:
188     case SpdyFramerError::SPDY_STOP_PROCESSING:
189     case SpdyFramerError::LAST_ERROR:
190       return Http2ErrorCode::INTERNAL_ERROR;
191   }
192   return Http2ErrorCode::INTERNAL_ERROR;
193 }
194 
IsResponse(HeaderType type)195 bool IsResponse(HeaderType type) {
196   return type == HeaderType::RESPONSE_100 || type == HeaderType::RESPONSE;
197 }
198 
StatusIs1xx(absl::string_view status)199 bool StatusIs1xx(absl::string_view status) {
200   return status.size() == 3 && status[0] == '1';
201 }
202 
203 // Returns the upper bound on HPACK encoder table capacity. If not specified in
204 // the Options, a reasonable default upper bound is used.
HpackCapacityBound(const OgHttp2Session::Options & o)205 uint32_t HpackCapacityBound(const OgHttp2Session::Options& o) {
206   return o.max_hpack_encoding_table_capacity.value_or(
207       kMaximumHpackTableCapacity);
208 }
209 
IsNonAckSettings(const spdy::SpdyFrameIR & frame)210 bool IsNonAckSettings(const spdy::SpdyFrameIR& frame) {
211   return frame.frame_type() == spdy::SpdyFrameType::SETTINGS &&
212          !reinterpret_cast<const SpdySettingsIR&>(frame).is_ack();
213 }
214 
215 }  // namespace
216 
PassthroughHeadersHandler(OgHttp2Session & session,Http2VisitorInterface & visitor)217 OgHttp2Session::PassthroughHeadersHandler::PassthroughHeadersHandler(
218     OgHttp2Session& session, Http2VisitorInterface& visitor)
219     : session_(session), visitor_(visitor) {
220   if (session_.options_.validate_http_headers) {
221     QUICHE_VLOG(2) << "instantiating regular header validator";
222     validator_ = std::make_unique<HeaderValidator>();
223   } else {
224     QUICHE_VLOG(2) << "instantiating noop header validator";
225     validator_ = std::make_unique<NoopHeaderValidator>();
226   }
227 }
228 
OnHeaderBlockStart()229 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockStart() {
230   result_ = Http2VisitorInterface::HEADER_OK;
231   const bool status = visitor_.OnBeginHeadersForStream(stream_id_);
232   if (!status) {
233     QUICHE_VLOG(1)
234         << "Visitor rejected header block, returning HEADER_CONNECTION_ERROR";
235     result_ = Http2VisitorInterface::HEADER_CONNECTION_ERROR;
236   }
237   validator_->StartHeaderBlock();
238 }
239 
InterpretHeaderStatus(HeaderValidator::HeaderStatus status)240 Http2VisitorInterface::OnHeaderResult InterpretHeaderStatus(
241     HeaderValidator::HeaderStatus status) {
242   switch (status) {
243     case HeaderValidator::HEADER_OK:
244     case HeaderValidator::HEADER_SKIP:
245       return Http2VisitorInterface::HEADER_OK;
246     case HeaderValidator::HEADER_FIELD_INVALID:
247       return Http2VisitorInterface::HEADER_FIELD_INVALID;
248     case HeaderValidator::HEADER_FIELD_TOO_LONG:
249       return Http2VisitorInterface::HEADER_RST_STREAM;
250   }
251   return Http2VisitorInterface::HEADER_CONNECTION_ERROR;
252 }
253 
OnHeader(absl::string_view key,absl::string_view value)254 void OgHttp2Session::PassthroughHeadersHandler::OnHeader(
255     absl::string_view key, absl::string_view value) {
256   if (result_ != Http2VisitorInterface::HEADER_OK) {
257     QUICHE_VLOG(2) << "Early return; status not HEADER_OK";
258     return;
259   }
260   const HeaderValidator::HeaderStatus validation_result =
261       validator_->ValidateSingleHeader(key, value);
262   if (validation_result == HeaderValidator::HEADER_SKIP) {
263     return;
264   }
265   if (validation_result != HeaderValidator::HEADER_OK) {
266     QUICHE_VLOG(2) << "Header validation failed with result "
267                    << static_cast<int>(validation_result);
268     result_ = InterpretHeaderStatus(validation_result);
269     return;
270   }
271   result_ = visitor_.OnHeaderForStream(stream_id_, key, value);
272 }
273 
OnHeaderBlockEnd(size_t,size_t)274 void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockEnd(
275     size_t /* uncompressed_header_bytes */,
276     size_t /* compressed_header_bytes */) {
277   if (result_ == Http2VisitorInterface::HEADER_OK) {
278     if (!validator_->FinishHeaderBlock(type_)) {
279       QUICHE_VLOG(1) << "FinishHeaderBlock returned false; returning "
280                         "HEADER_HTTP_MESSAGING";
281       result_ = Http2VisitorInterface::HEADER_HTTP_MESSAGING;
282     }
283   }
284   if (frame_contains_fin_ && IsResponse(type_) &&
285       StatusIs1xx(status_header())) {
286     QUICHE_VLOG(1) << "Unexpected end of stream without final headers";
287     result_ = Http2VisitorInterface::HEADER_HTTP_MESSAGING;
288   }
289   if (result_ == Http2VisitorInterface::HEADER_OK) {
290     const bool result = visitor_.OnEndHeadersForStream(stream_id_);
291     if (!result) {
292       session_.fatal_visitor_callback_failure_ = true;
293       session_.decoder_.StopProcessing();
294     }
295   } else {
296     session_.OnHeaderStatus(stream_id_, result_);
297   }
298   frame_contains_fin_ = false;
299 }
300 
301 // TODO(diannahu): Add checks for request methods.
CanReceiveBody() const302 bool OgHttp2Session::PassthroughHeadersHandler::CanReceiveBody() const {
303   switch (header_type()) {
304     case HeaderType::REQUEST_TRAILER:
305     case HeaderType::RESPONSE_TRAILER:
306     case HeaderType::RESPONSE_100:
307       return false;
308     case HeaderType::RESPONSE:
309       // 304 responses should not have a body:
310       // https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.2
311       // Neither should 204 responses:
312       // https://httpwg.org/specs/rfc7231.html#rfc.section.6.3.5
313       return status_header() != "304" && status_header() != "204";
314     case HeaderType::REQUEST:
315       return true;
316   }
317   return true;
318 }
319 
320 // A visitor that extracts an int64_t from each type of a ProcessBytesResult.
321 struct OgHttp2Session::ProcessBytesResultVisitor {
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor322   int64_t operator()(const int64_t bytes) const { return bytes; }
323 
operator ()http2::adapter::OgHttp2Session::ProcessBytesResultVisitor324   int64_t operator()(const ProcessBytesError error) const {
325     switch (error) {
326       case ProcessBytesError::kUnspecified:
327         return -1;
328       case ProcessBytesError::kInvalidConnectionPreface:
329         return -903;  // NGHTTP2_ERR_BAD_CLIENT_MAGIC
330       case ProcessBytesError::kVisitorCallbackFailed:
331         return -902;  // NGHTTP2_ERR_CALLBACK_FAILURE
332     }
333     return -1;
334   }
335 };
336 
OgHttp2Session(Http2VisitorInterface & visitor,Options options)337 OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
338     : visitor_(visitor),
339       options_(options),
340       event_forwarder_([this]() { return !latched_error_; }, *this),
341       receive_logger_(
342           &event_forwarder_, TracePerspectiveAsString(options.perspective),
__anon1df836620302() 343           [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
344             return logging_enabled;
345           },
346           this),
347       send_logger_(
348           TracePerspectiveAsString(options.perspective),
__anon1df836620402() 349           [logging_enabled = GetQuicheFlag(quiche_oghttp2_debug_trace)]() {
350             return logging_enabled;
351           },
352           this),
353       headers_handler_(*this, visitor),
354       noop_headers_handler_(/*listener=*/nullptr),
355       connection_window_manager_(
356           kInitialFlowControlWindowSize,
__anon1df836620502(size_t window_update_delta) 357           [this](size_t window_update_delta) {
358             SendWindowUpdate(kConnectionStreamId, window_update_delta);
359           },
360           options.should_window_update_fn,
361           /*update_window_on_notify=*/false) {
362   decoder_.set_visitor(&receive_logger_);
363   if (options_.max_header_list_bytes) {
364     // Limit buffering of encoded HPACK data to 2x the decoded limit.
365     decoder_.GetHpackDecoder()->set_max_decode_buffer_size_bytes(
366         2 * *options_.max_header_list_bytes);
367     // Limit the total bytes accepted for HPACK decoding to 4x the limit.
368     decoder_.GetHpackDecoder()->set_max_header_block_bytes(
369         4 * *options_.max_header_list_bytes);
370   }
371   if (IsServerSession()) {
372     remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
373                           spdy::kHttp2ConnectionHeaderPrefixSize};
374   }
375   if (options_.max_header_field_size.has_value()) {
376     headers_handler_.SetMaxFieldSize(options_.max_header_field_size.value());
377   }
378   headers_handler_.SetAllowObsText(options_.allow_obs_text);
379 }
380 
~OgHttp2Session()381 OgHttp2Session::~OgHttp2Session() {}
382 
SetStreamUserData(Http2StreamId stream_id,void * user_data)383 void OgHttp2Session::SetStreamUserData(Http2StreamId stream_id,
384                                        void* user_data) {
385   auto it = stream_map_.find(stream_id);
386   if (it != stream_map_.end()) {
387     it->second.user_data = user_data;
388   }
389 }
390 
GetStreamUserData(Http2StreamId stream_id)391 void* OgHttp2Session::GetStreamUserData(Http2StreamId stream_id) {
392   auto it = stream_map_.find(stream_id);
393   if (it != stream_map_.end()) {
394     return it->second.user_data;
395   }
396   auto p = pending_streams_.find(stream_id);
397   if (p != pending_streams_.end()) {
398     return p->second.user_data;
399   }
400   return nullptr;
401 }
402 
ResumeStream(Http2StreamId stream_id)403 bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
404   auto it = stream_map_.find(stream_id);
405   if (it == stream_map_.end() || it->second.outbound_body == nullptr ||
406       !write_scheduler_.StreamRegistered(stream_id)) {
407     return false;
408   }
409   it->second.data_deferred = false;
410   write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
411   return true;
412 }
413 
GetStreamSendWindowSize(Http2StreamId stream_id) const414 int OgHttp2Session::GetStreamSendWindowSize(Http2StreamId stream_id) const {
415   auto it = stream_map_.find(stream_id);
416   if (it != stream_map_.end()) {
417     return it->second.send_window;
418   }
419   return -1;
420 }
421 
GetStreamReceiveWindowLimit(Http2StreamId stream_id) const422 int OgHttp2Session::GetStreamReceiveWindowLimit(Http2StreamId stream_id) const {
423   auto it = stream_map_.find(stream_id);
424   if (it != stream_map_.end()) {
425     return it->second.window_manager.WindowSizeLimit();
426   }
427   return -1;
428 }
429 
GetStreamReceiveWindowSize(Http2StreamId stream_id) const430 int OgHttp2Session::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
431   auto it = stream_map_.find(stream_id);
432   if (it != stream_map_.end()) {
433     return it->second.window_manager.CurrentWindowSize();
434   }
435   return -1;
436 }
437 
GetReceiveWindowSize() const438 int OgHttp2Session::GetReceiveWindowSize() const {
439   return connection_window_manager_.CurrentWindowSize();
440 }
441 
GetHpackEncoderDynamicTableSize() const442 int OgHttp2Session::GetHpackEncoderDynamicTableSize() const {
443   const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
444   return encoder == nullptr ? 0 : encoder->GetDynamicTableSize();
445 }
446 
GetHpackEncoderDynamicTableCapacity() const447 int OgHttp2Session::GetHpackEncoderDynamicTableCapacity() const {
448   const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
449   return encoder == nullptr ? kDefaultHpackTableCapacity
450                             : encoder->CurrentHeaderTableSizeSetting();
451 }
452 
GetHpackDecoderDynamicTableSize() const453 int OgHttp2Session::GetHpackDecoderDynamicTableSize() const {
454   const spdy::HpackDecoderAdapter* decoder = decoder_.GetHpackDecoder();
455   return decoder == nullptr ? 0 : decoder->GetDynamicTableSize();
456 }
457 
GetHpackDecoderSizeLimit() const458 int OgHttp2Session::GetHpackDecoderSizeLimit() const {
459   const spdy::HpackDecoderAdapter* decoder = decoder_.GetHpackDecoder();
460   return decoder == nullptr ? 0 : decoder->GetCurrentHeaderTableSizeSetting();
461 }
462 
ProcessBytes(absl::string_view bytes)463 int64_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
464   QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
465                  << " processing [" << absl::CEscape(bytes) << "]";
466   return absl::visit(ProcessBytesResultVisitor(), ProcessBytesImpl(bytes));
467 }
468 
469 absl::variant<int64_t, OgHttp2Session::ProcessBytesError>
ProcessBytesImpl(absl::string_view bytes)470 OgHttp2Session::ProcessBytesImpl(absl::string_view bytes) {
471   if (processing_bytes_) {
472     QUICHE_VLOG(1) << "Returning early; already processing bytes.";
473     return 0;
474   }
475   processing_bytes_ = true;
476   RunOnExit r{[this]() { processing_bytes_ = false; }};
477 
478   if (options_.blackhole_data_on_connection_error && latched_error_) {
479     return static_cast<int64_t>(bytes.size());
480   }
481 
482   int64_t preface_consumed = 0;
483   if (!remaining_preface_.empty()) {
484     QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
485     // decoder_ does not understand the client connection preface.
486     size_t min_size = std::min(remaining_preface_.size(), bytes.size());
487     if (!absl::StartsWith(remaining_preface_, bytes.substr(0, min_size))) {
488       // Preface doesn't match!
489       QUICHE_DLOG(INFO) << "Preface doesn't match! Expected: ["
490                         << absl::CEscape(remaining_preface_) << "], actual: ["
491                         << absl::CEscape(bytes) << "]";
492       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
493                           ConnectionError::kInvalidConnectionPreface);
494       return ProcessBytesError::kInvalidConnectionPreface;
495     }
496     remaining_preface_.remove_prefix(min_size);
497     bytes.remove_prefix(min_size);
498     if (!remaining_preface_.empty()) {
499       QUICHE_VLOG(2) << "Preface bytes remaining: "
500                      << remaining_preface_.size();
501       return static_cast<int64_t>(min_size);
502     }
503     preface_consumed = min_size;
504   }
505   int64_t result = decoder_.ProcessInput(bytes.data(), bytes.size());
506   QUICHE_VLOG(2) << "ProcessBytes result: " << result;
507   if (fatal_visitor_callback_failure_) {
508     QUICHE_DCHECK(latched_error_);
509     QUICHE_VLOG(2) << "Visitor callback failed while processing bytes.";
510     return ProcessBytesError::kVisitorCallbackFailed;
511   }
512   if (latched_error_ || result < 0) {
513     QUICHE_VLOG(2) << "ProcessBytes encountered an error.";
514     if (options_.blackhole_data_on_connection_error) {
515       return static_cast<int64_t>(bytes.size() + preface_consumed);
516     } else {
517       return ProcessBytesError::kUnspecified;
518     }
519   }
520   return result + preface_consumed;
521 }
522 
Consume(Http2StreamId stream_id,size_t num_bytes)523 int OgHttp2Session::Consume(Http2StreamId stream_id, size_t num_bytes) {
524   auto it = stream_map_.find(stream_id);
525   if (it == stream_map_.end()) {
526     QUICHE_LOG(ERROR) << "Stream " << stream_id << " not found when consuming "
527                       << num_bytes << " bytes";
528   } else {
529     it->second.window_manager.MarkDataFlushed(num_bytes);
530   }
531   connection_window_manager_.MarkDataFlushed(num_bytes);
532   return 0;  // Remove?
533 }
534 
StartGracefulShutdown()535 void OgHttp2Session::StartGracefulShutdown() {
536   if (IsServerSession()) {
537     if (!queued_goaway_) {
538       EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
539           std::numeric_limits<int32_t>::max(), spdy::ERROR_CODE_NO_ERROR,
540           "graceful_shutdown"));
541     }
542   } else {
543     QUICHE_LOG(ERROR) << "Graceful shutdown not needed for clients.";
544   }
545 }
546 
EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame)547 void OgHttp2Session::EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame) {
548   if (queued_immediate_goaway_) {
549     // Do not allow additional frames to be enqueued after the GOAWAY.
550     return;
551   }
552 
553   const bool is_non_ack_settings = IsNonAckSettings(*frame);
554   MaybeSetupPreface(is_non_ack_settings);
555 
556   if (frame->frame_type() == spdy::SpdyFrameType::GOAWAY) {
557     queued_goaway_ = true;
558     if (latched_error_) {
559       PrepareForImmediateGoAway();
560     }
561   } else if (frame->fin() ||
562              frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
563     auto iter = stream_map_.find(frame->stream_id());
564     if (iter != stream_map_.end()) {
565       iter->second.half_closed_local = true;
566     }
567     if (frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
568       // TODO(diannahu): Condition on existence in the stream map?
569       streams_reset_.insert(frame->stream_id());
570     }
571   } else if (frame->frame_type() == spdy::SpdyFrameType::WINDOW_UPDATE) {
572     UpdateReceiveWindow(
573         frame->stream_id(),
574         reinterpret_cast<spdy::SpdyWindowUpdateIR&>(*frame).delta());
575   } else if (is_non_ack_settings) {
576     HandleOutboundSettings(
577         *reinterpret_cast<spdy::SpdySettingsIR*>(frame.get()));
578   }
579   if (frame->stream_id() != 0) {
580     auto result = queued_frames_.insert({frame->stream_id(), 1});
581     if (!result.second) {
582       ++(result.first->second);
583     }
584   }
585   frames_.push_back(std::move(frame));
586 }
587 
Send()588 int OgHttp2Session::Send() {
589   if (sending_) {
590     QUICHE_VLOG(1) << TracePerspectiveAsString(options_.perspective)
591                    << " returning early; already sending.";
592     return 0;
593   }
594   sending_ = true;
595   RunOnExit r{[this]() { sending_ = false; }};
596 
597   if (fatal_send_error_) {
598     return kSendError;
599   }
600 
601   MaybeSetupPreface(/*sending_outbound_settings=*/false);
602 
603   SendResult continue_writing = SendQueuedFrames();
604   if (queued_immediate_goaway_) {
605     // If an immediate GOAWAY was queued, then the above flush either sent the
606     // GOAWAY or buffered it to be sent on the next successful flush. In either
607     // case, return early here to avoid sending other frames.
608     return InterpretSendResult(continue_writing);
609   }
610   // Notify on new/pending streams closed due to GOAWAY receipt.
611   CloseGoAwayRejectedStreams();
612   // Wake streams for writes.
613   while (continue_writing == SendResult::SEND_OK && HasReadyStream()) {
614     const Http2StreamId stream_id = GetNextReadyStream();
615     // TODO(birenroy): Add a return value to indicate write blockage, so streams
616     // aren't woken unnecessarily.
617     QUICHE_VLOG(1) << "Waking stream " << stream_id << " for writes.";
618     continue_writing = WriteForStream(stream_id);
619   }
620   if (continue_writing == SendResult::SEND_OK) {
621     continue_writing = SendQueuedFrames();
622   }
623   return InterpretSendResult(continue_writing);
624 }
625 
InterpretSendResult(SendResult result)626 int OgHttp2Session::InterpretSendResult(SendResult result) {
627   if (result == SendResult::SEND_ERROR) {
628     fatal_send_error_ = true;
629     return kSendError;
630   } else {
631     return 0;
632   }
633 }
634 
HasReadyStream() const635 bool OgHttp2Session::HasReadyStream() const {
636   return !trailers_ready_.empty() ||
637          (write_scheduler_.HasReadyStreams() && connection_send_window_ > 0);
638 }
639 
GetNextReadyStream()640 Http2StreamId OgHttp2Session::GetNextReadyStream() {
641   QUICHE_DCHECK(HasReadyStream());
642   if (!trailers_ready_.empty()) {
643     const Http2StreamId stream_id = *trailers_ready_.begin();
644     // WriteForStream() will re-mark the stream as ready, if necessary.
645     write_scheduler_.MarkStreamNotReady(stream_id);
646     return stream_id;
647   }
648   return write_scheduler_.PopNextReadyStream();
649 }
650 
MaybeSendBufferedData()651 OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
652   int64_t result = std::numeric_limits<int64_t>::max();
653   while (result > 0 && !buffered_data_.empty()) {
654     result = visitor_.OnReadyToSend(buffered_data_);
655     if (result > 0) {
656       buffered_data_.erase(0, result);
657     }
658   }
659   if (result < 0) {
660     LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
661                         ConnectionError::kSendError);
662     return SendResult::SEND_ERROR;
663   }
664   return buffered_data_.empty() ? SendResult::SEND_OK
665                                 : SendResult::SEND_BLOCKED;
666 }
667 
SendQueuedFrames()668 OgHttp2Session::SendResult OgHttp2Session::SendQueuedFrames() {
669   // Flush any serialized prefix.
670   const SendResult result = MaybeSendBufferedData();
671   if (result != SendResult::SEND_OK) {
672     return result;
673   }
674   // Serialize and send frames in the queue.
675   while (!frames_.empty()) {
676     const auto& frame_ptr = frames_.front();
677     FrameAttributeCollector c;
678     frame_ptr->Visit(&c);
679 
680     // DATA frames should never be queued.
681     QUICHE_DCHECK_NE(c.frame_type(), 0);
682 
683     const bool stream_reset =
684         c.stream_id() != 0 && streams_reset_.count(c.stream_id()) > 0;
685     if (stream_reset &&
686         c.frame_type() != static_cast<uint8_t>(FrameType::RST_STREAM)) {
687       // The stream has been reset, so any other remaining frames can be
688       // skipped.
689       // TODO(birenroy): inform the visitor of frames that are skipped.
690       DecrementQueuedFrameCount(c.stream_id(), c.frame_type());
691       frames_.pop_front();
692       continue;
693     } else if (!IsServerSession() && received_goaway_ &&
694                c.stream_id() >
695                    static_cast<uint32_t>(received_goaway_stream_id_)) {
696       // This frame will be ignored by the server, so don't send it. The stream
697       // associated with this frame should have been closed in OnGoAway().
698       frames_.pop_front();
699       continue;
700     }
701     // Frames can't accurately report their own length; the actual serialized
702     // length must be used instead.
703     spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frame_ptr);
704     const size_t frame_payload_length = frame.size() - spdy::kFrameHeaderSize;
705     frame_ptr->Visit(&send_logger_);
706     visitor_.OnBeforeFrameSent(c.frame_type(), c.stream_id(),
707                                frame_payload_length, c.flags());
708     const int64_t result = visitor_.OnReadyToSend(absl::string_view(frame));
709     if (result < 0) {
710       LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
711                           ConnectionError::kSendError);
712       return SendResult::SEND_ERROR;
713     } else if (result == 0) {
714       // Write blocked.
715       return SendResult::SEND_BLOCKED;
716     } else {
717       frames_.pop_front();
718 
719       const bool ok =
720           AfterFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
721                          c.flags(), c.error_code());
722       if (!ok) {
723         LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
724                             ConnectionError::kSendError);
725         return SendResult::SEND_ERROR;
726       }
727       if (static_cast<size_t>(result) < frame.size()) {
728         // The frame was partially written, so the rest must be buffered.
729         buffered_data_.append(frame.data() + result, frame.size() - result);
730         return SendResult::SEND_BLOCKED;
731       }
732     }
733   }
734   return SendResult::SEND_OK;
735 }
736 
AfterFrameSent(uint8_t frame_type_int,uint32_t stream_id,size_t payload_length,uint8_t flags,uint32_t error_code)737 bool OgHttp2Session::AfterFrameSent(uint8_t frame_type_int, uint32_t stream_id,
738                                     size_t payload_length, uint8_t flags,
739                                     uint32_t error_code) {
740   const FrameType frame_type = static_cast<FrameType>(frame_type_int);
741   int result = visitor_.OnFrameSent(frame_type_int, stream_id, payload_length,
742                                     flags, error_code);
743   if (result < 0) {
744     return false;
745   }
746   if (stream_id == 0) {
747     if (frame_type == FrameType::SETTINGS) {
748       const bool is_settings_ack = (flags & ACK_FLAG);
749       if (is_settings_ack && encoder_header_table_capacity_when_acking_) {
750         framer_.UpdateHeaderEncoderTableSize(
751             encoder_header_table_capacity_when_acking_.value());
752         encoder_header_table_capacity_when_acking_ = absl::nullopt;
753       } else if (!is_settings_ack) {
754         sent_non_ack_settings_ = true;
755       }
756     }
757     return true;
758   }
759 
760   const bool contains_fin =
761       (frame_type == FrameType::DATA || frame_type == FrameType::HEADERS) &&
762       (flags & END_STREAM_FLAG) == END_STREAM_FLAG;
763   auto it = stream_map_.find(stream_id);
764   const bool still_open_remote =
765       it != stream_map_.end() && !it->second.half_closed_remote;
766   if (contains_fin && still_open_remote &&
767       options_.rst_stream_no_error_when_incomplete && IsServerSession()) {
768     // Since the peer has not yet ended the stream, this endpoint should
769     // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
770     frames_.push_front(std::make_unique<spdy::SpdyRstStreamIR>(
771         stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
772     auto queued_result = queued_frames_.insert({stream_id, 1});
773     if (!queued_result.second) {
774       ++(queued_result.first->second);
775     }
776     it->second.half_closed_remote = true;
777   }
778 
779   DecrementQueuedFrameCount(stream_id, frame_type_int);
780   return true;
781 }
782 
WriteForStream(Http2StreamId stream_id)783 OgHttp2Session::SendResult OgHttp2Session::WriteForStream(
784     Http2StreamId stream_id) {
785   auto it = stream_map_.find(stream_id);
786   if (it == stream_map_.end()) {
787     QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
788                       << " which is ready to write!";
789     return SendResult::SEND_OK;
790   }
791   StreamState& state = it->second;
792   auto reset_it = streams_reset_.find(stream_id);
793   if (reset_it != streams_reset_.end()) {
794     // The stream has been reset; there's no point in sending DATA or trailing
795     // HEADERS.
796     state.outbound_body = nullptr;
797     state.trailers = nullptr;
798     return SendResult::SEND_OK;
799   }
800 
801   SendResult connection_can_write = SendResult::SEND_OK;
802   if (state.outbound_body == nullptr ||
803       (!options_.trailers_require_end_data && state.data_deferred)) {
804     // No data to send, but there might be trailers.
805     if (state.trailers != nullptr) {
806       // Trailers will include END_STREAM, so the data source can be discarded.
807       // Since data_deferred is true, there is no data waiting to be flushed for
808       // this stream.
809       state.outbound_body = nullptr;
810       auto block_ptr = std::move(state.trailers);
811       if (state.half_closed_local) {
812         QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
813       } else {
814         SendTrailers(stream_id, std::move(*block_ptr));
815       }
816     }
817     return SendResult::SEND_OK;
818   }
819   int32_t available_window =
820       std::min({connection_send_window_, state.send_window,
821                 static_cast<int32_t>(max_frame_payload_)});
822   while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
823          state.outbound_body != nullptr && !state.data_deferred) {
824     auto [length, end_data] =
825         state.outbound_body->SelectPayloadLength(available_window);
826     QUICHE_VLOG(2) << "WriteForStream | length: " << length
827                    << " end_data: " << end_data
828                    << " trailers: " << state.trailers.get();
829     if (length == 0 && !end_data &&
830         (options_.trailers_require_end_data || state.trailers == nullptr)) {
831       // An unproductive call to SelectPayloadLength() results in this stream
832       // entering the "deferred" state only if either no trailers are available
833       // to send, or trailers require an explicit end_data before being sent.
834       state.data_deferred = true;
835       break;
836     } else if (length == DataFrameSource::kError) {
837       // TODO(birenroy): Consider queuing a RST_STREAM INTERNAL_ERROR instead.
838       CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
839       // No more work on the stream; it has been closed.
840       break;
841     }
842     const bool fin = end_data ? state.outbound_body->send_fin() : false;
843     if (length > 0 || fin) {
844       spdy::SpdyDataIR data(stream_id);
845       data.set_fin(fin);
846       data.SetDataShallow(length);
847       spdy::SpdySerializedFrame header =
848           spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(
849               data);
850       QUICHE_DCHECK(buffered_data_.empty() && frames_.empty());
851       const bool success =
852           state.outbound_body->Send(absl::string_view(header), length);
853       if (!success) {
854         connection_can_write = SendResult::SEND_BLOCKED;
855         break;
856       }
857       connection_send_window_ -= length;
858       state.send_window -= length;
859       available_window = std::min({connection_send_window_, state.send_window,
860                                    static_cast<int32_t>(max_frame_payload_)});
861       if (fin) {
862         state.half_closed_local = true;
863         MaybeFinWithRstStream(it);
864       }
865       const bool ok = AfterFrameSent(/* DATA */ 0, stream_id, length,
866                                      fin ? END_STREAM_FLAG : 0x0, 0);
867       if (!ok) {
868         LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
869                             ConnectionError::kSendError);
870         return SendResult::SEND_ERROR;
871       }
872       if (!stream_map_.contains(stream_id)) {
873         // Note: the stream may have been closed if `fin` is true.
874         break;
875       }
876     }
877     if (end_data || (length == 0 && state.trailers != nullptr &&
878                      !options_.trailers_require_end_data)) {
879       // If SelectPayloadLength() returned {0, false}, and there are trailers to
880       // send, and the safety feature is disabled, it's okay to send the
881       // trailers.
882       if (state.trailers != nullptr) {
883         auto block_ptr = std::move(state.trailers);
884         if (fin) {
885           QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
886         } else {
887           SendTrailers(stream_id, std::move(*block_ptr));
888         }
889       }
890       state.outbound_body = nullptr;
891     }
892   }
893   // If the stream still exists and has data to send, it should be marked as
894   // ready in the write scheduler.
895   if (stream_map_.contains(stream_id) && !state.data_deferred &&
896       state.send_window > 0 && state.outbound_body != nullptr) {
897     write_scheduler_.MarkStreamReady(stream_id, false);
898   }
899   // Streams can continue writing as long as the connection is not write-blocked
900   // and there is additional flow control quota available.
901   if (connection_can_write != SendResult::SEND_OK) {
902     return connection_can_write;
903   }
904   return connection_send_window_ <= 0 ? SendResult::SEND_BLOCKED
905                                       : SendResult::SEND_OK;
906 }
907 
SerializeMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)908 void OgHttp2Session::SerializeMetadata(Http2StreamId stream_id,
909                                        std::unique_ptr<MetadataSource> source) {
910   const uint32_t max_payload_size =
911       std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
912   auto payload_buffer = std::make_unique<uint8_t[]>(max_payload_size);
913 
914   while (true) {
915     auto [written, end_metadata] =
916         source->Pack(payload_buffer.get(), max_payload_size);
917     if (written < 0) {
918       // Unable to pack any metadata.
919       return;
920     }
921     QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
922     auto payload = absl::string_view(
923         reinterpret_cast<const char*>(payload_buffer.get()), written);
924     EnqueueFrame(std::make_unique<spdy::SpdyUnknownIR>(
925         stream_id, kMetadataFrameType, end_metadata ? kMetadataEndFlag : 0u,
926         std::string(payload)));
927     if (end_metadata) {
928       return;
929     }
930   }
931 }
932 
SubmitRequest(absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)933 int32_t OgHttp2Session::SubmitRequest(
934     absl::Span<const Header> headers,
935     std::unique_ptr<DataFrameSource> data_source, void* user_data) {
936   // TODO(birenroy): return an error for the incorrect perspective
937   const Http2StreamId stream_id = next_stream_id_;
938   next_stream_id_ += 2;
939   if (!pending_streams_.empty() || !CanCreateStream()) {
940     // TODO(diannahu): There should probably be a limit to the number of allowed
941     // pending streams.
942     pending_streams_.insert(
943         {stream_id, PendingStreamState{ToHeaderBlock(headers),
944                                        std::move(data_source), user_data}});
945     StartPendingStreams();
946   } else {
947     StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
948                  user_data);
949   }
950   return stream_id;
951 }
952 
SubmitResponse(Http2StreamId stream_id,absl::Span<const Header> headers,std::unique_ptr<DataFrameSource> data_source)953 int OgHttp2Session::SubmitResponse(
954     Http2StreamId stream_id, absl::Span<const Header> headers,
955     std::unique_ptr<DataFrameSource> data_source) {
956   // TODO(birenroy): return an error for the incorrect perspective
957   auto iter = stream_map_.find(stream_id);
958   if (iter == stream_map_.end()) {
959     QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
960     return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
961   }
962   const bool end_stream = data_source == nullptr;
963   if (!end_stream) {
964     // Add data source to stream state
965     iter->second.outbound_body = std::move(data_source);
966     write_scheduler_.MarkStreamReady(stream_id, false);
967   }
968   SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
969   return 0;
970 }
971 
SubmitTrailer(Http2StreamId stream_id,absl::Span<const Header> trailers)972 int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id,
973                                   absl::Span<const Header> trailers) {
974   // TODO(birenroy): Reject trailers when acting as a client?
975   auto iter = stream_map_.find(stream_id);
976   if (iter == stream_map_.end()) {
977     QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
978     return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
979   }
980   StreamState& state = iter->second;
981   if (state.half_closed_local) {
982     QUICHE_LOG(ERROR) << "Stream " << stream_id << " is half closed (local)";
983     return -514;  // NGHTTP2_ERR_INVALID_STREAM_STATE
984   }
985   if (state.trailers != nullptr) {
986     QUICHE_LOG(ERROR) << "Stream " << stream_id
987                       << " already has trailers queued";
988     return -514;  // NGHTTP2_ERR_INVALID_STREAM_STATE
989   }
990   if (state.outbound_body == nullptr) {
991     // Enqueue trailers immediately.
992     SendTrailers(stream_id, ToHeaderBlock(trailers));
993   } else {
994     QUICHE_LOG_IF(ERROR, state.outbound_body->send_fin())
995         << "DataFrameSource will send fin, preventing trailers!";
996     // Save trailers so they can be written once data is done.
997     state.trailers =
998         std::make_unique<spdy::Http2HeaderBlock>(ToHeaderBlock(trailers));
999     if (!options_.trailers_require_end_data || !iter->second.data_deferred) {
1000       trailers_ready_.insert(stream_id);
1001     }
1002   }
1003   return 0;
1004 }
1005 
SubmitMetadata(Http2StreamId stream_id,std::unique_ptr<MetadataSource> source)1006 void OgHttp2Session::SubmitMetadata(Http2StreamId stream_id,
1007                                     std::unique_ptr<MetadataSource> source) {
1008   SerializeMetadata(stream_id, std::move(source));
1009 }
1010 
SubmitSettings(absl::Span<const Http2Setting> settings)1011 void OgHttp2Session::SubmitSettings(absl::Span<const Http2Setting> settings) {
1012   auto frame = PrepareSettingsFrame(settings);
1013   EnqueueFrame(std::move(frame));
1014 }
1015 
OnError(SpdyFramerError error,std::string detailed_error)1016 void OgHttp2Session::OnError(SpdyFramerError error,
1017                              std::string detailed_error) {
1018   QUICHE_VLOG(1) << "Error: "
1019                  << http2::Http2DecoderAdapter::SpdyFramerErrorToString(error)
1020                  << " details: " << detailed_error;
1021   // TODO(diannahu): Consider propagating `detailed_error`.
1022   LatchErrorAndNotify(GetHttp2ErrorCode(error), ConnectionError::kParseError);
1023 }
1024 
OnCommonHeader(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1025 void OgHttp2Session::OnCommonHeader(spdy::SpdyStreamId stream_id, size_t length,
1026                                     uint8_t type, uint8_t flags) {
1027   highest_received_stream_id_ = std::max(static_cast<Http2StreamId>(stream_id),
1028                                          highest_received_stream_id_);
1029   if (streams_reset_.contains(stream_id)) {
1030     return;
1031   }
1032   const bool result = visitor_.OnFrameHeader(stream_id, length, type, flags);
1033   if (!result) {
1034     fatal_visitor_callback_failure_ = true;
1035     decoder_.StopProcessing();
1036   }
1037 }
1038 
OnDataFrameHeader(spdy::SpdyStreamId stream_id,size_t length,bool)1039 void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
1040                                        size_t length, bool /*fin*/) {
1041   auto iter = stream_map_.find(stream_id);
1042   if (iter == stream_map_.end() || streams_reset_.contains(stream_id)) {
1043     // The stream does not exist; it could be an error or a benign close, e.g.,
1044     // getting data for a stream this connection recently closed.
1045     if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
1046       // Receiving DATA before HEADERS is a connection error.
1047       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1048                           ConnectionError::kWrongFrameSequence);
1049     }
1050     return;
1051   }
1052 
1053   if (static_cast<int64_t>(length) >
1054       connection_window_manager_.CurrentWindowSize()) {
1055     // Peer exceeded the connection flow control limit.
1056     LatchErrorAndNotify(
1057         Http2ErrorCode::FLOW_CONTROL_ERROR,
1058         Http2VisitorInterface::ConnectionError::kFlowControlError);
1059     return;
1060   }
1061 
1062   if (static_cast<int64_t>(length) >
1063       iter->second.window_manager.CurrentWindowSize()) {
1064     // Peer exceeded the stream flow control limit.
1065     EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1066         stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1067     return;
1068   }
1069 
1070   const bool result = visitor_.OnBeginDataForStream(stream_id, length);
1071   if (!result) {
1072     fatal_visitor_callback_failure_ = true;
1073     decoder_.StopProcessing();
1074   }
1075 
1076   if (!iter->second.can_receive_body && length > 0) {
1077     EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1078         stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1079     return;
1080   }
1081 
1082   // Validate against the content-length if it exists.
1083   if (iter->second.remaining_content_length.has_value()) {
1084     if (length > *iter->second.remaining_content_length) {
1085       HandleContentLengthError(stream_id);
1086       iter->second.remaining_content_length.reset();
1087     } else {
1088       *iter->second.remaining_content_length -= length;
1089     }
1090   }
1091 }
1092 
OnStreamFrameData(spdy::SpdyStreamId stream_id,const char * data,size_t len)1093 void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
1094                                        const char* data, size_t len) {
1095   // Count the data against flow control, even if the stream is unknown.
1096   MarkDataBuffered(stream_id, len);
1097 
1098   if (!stream_map_.contains(stream_id) || streams_reset_.contains(stream_id)) {
1099     // If the stream was unknown due to a protocol error, the visitor was
1100     // informed in OnDataFrameHeader().
1101     return;
1102   }
1103 
1104   const bool result =
1105       visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
1106   if (!result) {
1107     fatal_visitor_callback_failure_ = true;
1108     decoder_.StopProcessing();
1109   }
1110 }
1111 
OnStreamEnd(spdy::SpdyStreamId stream_id)1112 void OgHttp2Session::OnStreamEnd(spdy::SpdyStreamId stream_id) {
1113   auto iter = stream_map_.find(stream_id);
1114   if (iter != stream_map_.end()) {
1115     iter->second.half_closed_remote = true;
1116     if (streams_reset_.contains(stream_id)) {
1117       return;
1118     }
1119 
1120     // Validate against the content-length if it exists.
1121     if (iter->second.remaining_content_length.has_value() &&
1122         *iter->second.remaining_content_length != 0) {
1123       HandleContentLengthError(stream_id);
1124       return;
1125     }
1126 
1127     const bool result = visitor_.OnEndStream(stream_id);
1128     if (!result) {
1129       fatal_visitor_callback_failure_ = true;
1130       decoder_.StopProcessing();
1131     }
1132   }
1133 
1134   auto queued_frames_iter = queued_frames_.find(stream_id);
1135   const bool no_queued_frames = queued_frames_iter == queued_frames_.end() ||
1136                                 queued_frames_iter->second == 0;
1137   if (iter != stream_map_.end() && iter->second.half_closed_local &&
1138       !IsServerSession() && no_queued_frames) {
1139     // From the client's perspective, the stream can be closed if it's already
1140     // half_closed_local.
1141     CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1142   }
1143 }
1144 
OnStreamPadLength(spdy::SpdyStreamId stream_id,size_t value)1145 void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId stream_id,
1146                                        size_t value) {
1147   bool result = visitor_.OnDataPaddingLength(stream_id, 1 + value);
1148   if (!result) {
1149     fatal_visitor_callback_failure_ = true;
1150     decoder_.StopProcessing();
1151   }
1152   MarkDataBuffered(stream_id, 1 + value);
1153 }
1154 
OnStreamPadding(spdy::SpdyStreamId,size_t)1155 void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId /*stream_id*/, size_t
1156                                      /*len*/) {
1157   // Flow control was accounted for in OnStreamPadLength().
1158   // TODO(181586191): Pass padding to the visitor?
1159 }
1160 
OnHeaderFrameStart(spdy::SpdyStreamId stream_id)1161 spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
1162     spdy::SpdyStreamId stream_id) {
1163   auto it = stream_map_.find(stream_id);
1164   if (it != stream_map_.end() && !streams_reset_.contains(stream_id)) {
1165     headers_handler_.set_stream_id(stream_id);
1166     headers_handler_.set_header_type(
1167         NextHeaderType(it->second.received_header_type));
1168     return &headers_handler_;
1169   } else {
1170     return &noop_headers_handler_;
1171   }
1172 }
1173 
OnHeaderFrameEnd(spdy::SpdyStreamId stream_id)1174 void OgHttp2Session::OnHeaderFrameEnd(spdy::SpdyStreamId stream_id) {
1175   auto it = stream_map_.find(stream_id);
1176   if (it != stream_map_.end()) {
1177     if (headers_handler_.header_type() == HeaderType::RESPONSE &&
1178         !headers_handler_.status_header().empty() &&
1179         headers_handler_.status_header()[0] == '1') {
1180       // If response headers carried a 1xx response code, final response headers
1181       // should still be forthcoming.
1182       headers_handler_.set_header_type(HeaderType::RESPONSE_100);
1183     }
1184     it->second.received_header_type = headers_handler_.header_type();
1185 
1186     // Track the content-length if the headers indicate that a body can follow.
1187     it->second.can_receive_body =
1188         headers_handler_.CanReceiveBody() && !it->second.sent_head_method;
1189     if (it->second.can_receive_body) {
1190       it->second.remaining_content_length = headers_handler_.content_length();
1191     }
1192 
1193     headers_handler_.set_stream_id(0);
1194   }
1195 }
1196 
OnRstStream(spdy::SpdyStreamId stream_id,spdy::SpdyErrorCode error_code)1197 void OgHttp2Session::OnRstStream(spdy::SpdyStreamId stream_id,
1198                                  spdy::SpdyErrorCode error_code) {
1199   auto iter = stream_map_.find(stream_id);
1200   if (iter != stream_map_.end()) {
1201     iter->second.half_closed_remote = true;
1202     iter->second.outbound_body = nullptr;
1203   } else if (static_cast<Http2StreamId>(stream_id) >
1204              highest_processed_stream_id_) {
1205     // Receiving RST_STREAM before HEADERS is a connection error.
1206     LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1207                         ConnectionError::kWrongFrameSequence);
1208     return;
1209   }
1210   if (streams_reset_.contains(stream_id)) {
1211     return;
1212   }
1213   visitor_.OnRstStream(stream_id, TranslateErrorCode(error_code));
1214   // TODO(birenroy): Consider whether there are outbound frames queued for the
1215   // stream.
1216   CloseStream(stream_id, TranslateErrorCode(error_code));
1217 }
1218 
OnSettings()1219 void OgHttp2Session::OnSettings() {
1220   visitor_.OnSettingsStart();
1221   auto settings = std::make_unique<SpdySettingsIR>();
1222   settings->set_is_ack(true);
1223   EnqueueFrame(std::move(settings));
1224 }
1225 
OnSetting(spdy::SpdySettingsId id,uint32_t value)1226 void OgHttp2Session::OnSetting(spdy::SpdySettingsId id, uint32_t value) {
1227   switch (id) {
1228     case HEADER_TABLE_SIZE:
1229       value = std::min(value, HpackCapacityBound(options_));
1230       if (value < framer_.GetHpackEncoder()->CurrentHeaderTableSizeSetting()) {
1231         // Safe to apply a smaller table capacity immediately.
1232         QUICHE_VLOG(2) << TracePerspectiveAsString(options_.perspective)
1233                        << " applying encoder table capacity " << value;
1234         framer_.GetHpackEncoder()->ApplyHeaderTableSizeSetting(value);
1235       } else {
1236         QUICHE_VLOG(2)
1237             << TracePerspectiveAsString(options_.perspective)
1238             << " NOT applying encoder table capacity until writing ack: "
1239             << value;
1240         encoder_header_table_capacity_when_acking_ = value;
1241       }
1242       break;
1243     case ENABLE_PUSH:
1244       if (value > 1u) {
1245         visitor_.OnInvalidFrame(
1246             0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1247         // The specification says this is a connection-level protocol error.
1248         LatchErrorAndNotify(
1249             Http2ErrorCode::PROTOCOL_ERROR,
1250             Http2VisitorInterface::ConnectionError::kInvalidSetting);
1251         return;
1252       }
1253       // Aside from validation, this setting is ignored.
1254       break;
1255     case MAX_CONCURRENT_STREAMS:
1256       max_outbound_concurrent_streams_ = value;
1257       if (!IsServerSession()) {
1258         // We may now be able to start pending streams.
1259         StartPendingStreams();
1260       }
1261       break;
1262     case INITIAL_WINDOW_SIZE:
1263       if (value > spdy::kSpdyMaximumWindowSize) {
1264         visitor_.OnInvalidFrame(
1265             0, Http2VisitorInterface::InvalidFrameError::kFlowControl);
1266         // The specification says this is a connection-level flow control error.
1267         LatchErrorAndNotify(
1268             Http2ErrorCode::FLOW_CONTROL_ERROR,
1269             Http2VisitorInterface::ConnectionError::kFlowControlError);
1270         return;
1271       } else {
1272         UpdateStreamSendWindowSizes(value);
1273       }
1274       break;
1275     case MAX_FRAME_SIZE:
1276       if (value < kDefaultFramePayloadSizeLimit ||
1277           value > kMaximumFramePayloadSizeLimit) {
1278         visitor_.OnInvalidFrame(
1279             0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1280         // The specification says this is a connection-level protocol error.
1281         LatchErrorAndNotify(
1282             Http2ErrorCode::PROTOCOL_ERROR,
1283             Http2VisitorInterface::ConnectionError::kInvalidSetting);
1284         return;
1285       }
1286       max_frame_payload_ = value;
1287       break;
1288     case ENABLE_CONNECT_PROTOCOL:
1289       if (value > 1u || (value == 0 && peer_enables_connect_protocol_)) {
1290         visitor_.OnInvalidFrame(
1291             0, Http2VisitorInterface::InvalidFrameError::kProtocol);
1292         LatchErrorAndNotify(
1293             Http2ErrorCode::PROTOCOL_ERROR,
1294             Http2VisitorInterface::ConnectionError::kInvalidSetting);
1295         return;
1296       }
1297       peer_enables_connect_protocol_ = (value == 1u);
1298       break;
1299     default:
1300       // TODO(bnc): See if C++17 inline constants are allowed in QUICHE.
1301       if (id == kMetadataExtensionId) {
1302         peer_supports_metadata_ = (value != 0);
1303       } else {
1304         QUICHE_VLOG(1) << "Unimplemented SETTING id: " << id;
1305       }
1306   }
1307   visitor_.OnSetting({id, value});
1308 }
1309 
OnSettingsEnd()1310 void OgHttp2Session::OnSettingsEnd() { visitor_.OnSettingsEnd(); }
1311 
OnSettingsAck()1312 void OgHttp2Session::OnSettingsAck() {
1313   if (!settings_ack_callbacks_.empty()) {
1314     SettingsAckCallback callback = std::move(settings_ack_callbacks_.front());
1315     settings_ack_callbacks_.pop_front();
1316     callback();
1317   }
1318 
1319   visitor_.OnSettingsAck();
1320 }
1321 
OnPing(spdy::SpdyPingId unique_id,bool is_ack)1322 void OgHttp2Session::OnPing(spdy::SpdyPingId unique_id, bool is_ack) {
1323   visitor_.OnPing(unique_id, is_ack);
1324   if (options_.auto_ping_ack && !is_ack) {
1325     auto ping = std::make_unique<spdy::SpdyPingIR>(unique_id);
1326     ping->set_is_ack(true);
1327     EnqueueFrame(std::move(ping));
1328   }
1329 }
1330 
OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,spdy::SpdyErrorCode error_code)1331 void OgHttp2Session::OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,
1332                               spdy::SpdyErrorCode error_code) {
1333   if (received_goaway_ &&
1334       last_accepted_stream_id >
1335           static_cast<spdy::SpdyStreamId>(received_goaway_stream_id_)) {
1336     // This GOAWAY has a higher `last_accepted_stream_id` than a previous
1337     // GOAWAY, a connection-level spec violation.
1338     const bool ok = visitor_.OnInvalidFrame(
1339         kConnectionStreamId,
1340         Http2VisitorInterface::InvalidFrameError::kProtocol);
1341     if (!ok) {
1342       fatal_visitor_callback_failure_ = true;
1343     }
1344     LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1345                         ConnectionError::kInvalidGoAwayLastStreamId);
1346     return;
1347   }
1348 
1349   received_goaway_ = true;
1350   received_goaway_stream_id_ = last_accepted_stream_id;
1351   const bool result = visitor_.OnGoAway(last_accepted_stream_id,
1352                                         TranslateErrorCode(error_code), "");
1353   if (!result) {
1354     fatal_visitor_callback_failure_ = true;
1355     decoder_.StopProcessing();
1356   }
1357 
1358   // Close the streams above `last_accepted_stream_id`. Only applies if the
1359   // session receives a GOAWAY as a client, as we do not support server push.
1360   if (last_accepted_stream_id == spdy::kMaxStreamId || IsServerSession()) {
1361     return;
1362   }
1363   std::vector<Http2StreamId> streams_to_close;
1364   for (const auto& [stream_id, stream_state] : stream_map_) {
1365     if (static_cast<spdy::SpdyStreamId>(stream_id) > last_accepted_stream_id) {
1366       streams_to_close.push_back(stream_id);
1367     }
1368   }
1369   for (Http2StreamId stream_id : streams_to_close) {
1370     CloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1371   }
1372 }
1373 
OnGoAwayFrameData(const char *,size_t)1374 bool OgHttp2Session::OnGoAwayFrameData(const char* /*goaway_data*/, size_t
1375                                        /*len*/) {
1376   // Opaque data is currently ignored.
1377   return true;
1378 }
1379 
OnHeaders(spdy::SpdyStreamId stream_id,size_t,bool,int,spdy::SpdyStreamId,bool,bool fin,bool)1380 void OgHttp2Session::OnHeaders(spdy::SpdyStreamId stream_id,
1381                                size_t /*payload_length*/, bool /*has_priority*/,
1382                                int /*weight*/,
1383                                spdy::SpdyStreamId /*parent_stream_id*/,
1384                                bool /*exclusive*/, bool fin, bool /*end*/) {
1385   if (stream_id % 2 == 0) {
1386     // Server push is disabled; receiving push HEADERS is a connection error.
1387     LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1388                         ConnectionError::kInvalidNewStreamId);
1389     return;
1390   }
1391   if (fin) {
1392     headers_handler_.set_frame_contains_fin();
1393   }
1394   if (IsServerSession()) {
1395     const auto new_stream_id = static_cast<Http2StreamId>(stream_id);
1396     if (stream_map_.find(new_stream_id) != stream_map_.end() && fin) {
1397       // Not a new stream, must be trailers.
1398       return;
1399     }
1400     if (new_stream_id <= highest_processed_stream_id_) {
1401       // A new stream ID lower than the watermark is a connection error.
1402       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1403                           ConnectionError::kInvalidNewStreamId);
1404       return;
1405     }
1406 
1407     if (stream_map_.size() >= max_inbound_concurrent_streams_) {
1408       // The new stream would exceed our advertised and acknowledged
1409       // MAX_CONCURRENT_STREAMS. For parity with nghttp2, treat this error as a
1410       // connection-level PROTOCOL_ERROR.
1411       bool ok = visitor_.OnInvalidFrame(
1412           stream_id, Http2VisitorInterface::InvalidFrameError::kProtocol);
1413       if (!ok) {
1414         fatal_visitor_callback_failure_ = true;
1415       }
1416       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1417                           ConnectionError::kExceededMaxConcurrentStreams);
1418       return;
1419     }
1420     if (stream_map_.size() >= pending_max_inbound_concurrent_streams_) {
1421       // The new stream would exceed our advertised but unacked
1422       // MAX_CONCURRENT_STREAMS. Refuse the stream for parity with nghttp2.
1423       EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1424           stream_id, spdy::ERROR_CODE_REFUSED_STREAM));
1425       const bool ok = visitor_.OnInvalidFrame(
1426           stream_id, Http2VisitorInterface::InvalidFrameError::kRefusedStream);
1427       if (!ok) {
1428         fatal_visitor_callback_failure_ = true;
1429         LatchErrorAndNotify(Http2ErrorCode::REFUSED_STREAM,
1430                             ConnectionError::kExceededMaxConcurrentStreams);
1431       }
1432       return;
1433     }
1434 
1435     CreateStream(stream_id);
1436   }
1437 }
1438 
OnWindowUpdate(spdy::SpdyStreamId stream_id,int delta_window_size)1439 void OgHttp2Session::OnWindowUpdate(spdy::SpdyStreamId stream_id,
1440                                     int delta_window_size) {
1441   constexpr int kMaxWindowValue = 2147483647;  // (1 << 31) - 1
1442   if (stream_id == 0) {
1443     if (delta_window_size == 0) {
1444       // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1445       LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1446                           ConnectionError::kFlowControlError);
1447       return;
1448     }
1449     if (connection_send_window_ > 0 &&
1450         delta_window_size > (kMaxWindowValue - connection_send_window_)) {
1451       // Window overflow is a FLOW_CONTROL_ERROR.
1452       LatchErrorAndNotify(Http2ErrorCode::FLOW_CONTROL_ERROR,
1453                           ConnectionError::kFlowControlError);
1454       return;
1455     }
1456     connection_send_window_ += delta_window_size;
1457   } else {
1458     if (delta_window_size == 0) {
1459       // A PROTOCOL_ERROR, according to RFC 9113 Section 6.9.
1460       EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1461           stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1462       return;
1463     }
1464     auto it = stream_map_.find(stream_id);
1465     if (it == stream_map_.end()) {
1466       QUICHE_VLOG(1) << "Stream " << stream_id << " not found!";
1467       if (static_cast<Http2StreamId>(stream_id) >
1468           highest_processed_stream_id_) {
1469         // Receiving WINDOW_UPDATE before HEADERS is a connection error.
1470         LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1471                             ConnectionError::kWrongFrameSequence);
1472       }
1473       // Do not inform the visitor of a WINDOW_UPDATE for a non-existent stream.
1474       return;
1475     } else {
1476       if (streams_reset_.contains(stream_id)) {
1477         return;
1478       }
1479       if (it->second.send_window > 0 &&
1480           delta_window_size > (kMaxWindowValue - it->second.send_window)) {
1481         // Window overflow is a FLOW_CONTROL_ERROR.
1482         EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1483             stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
1484         return;
1485       }
1486       const bool was_blocked = (it->second.send_window <= 0);
1487       it->second.send_window += delta_window_size;
1488       if (was_blocked && it->second.send_window > 0) {
1489         // The stream was blocked on flow control.
1490         QUICHE_VLOG(1) << "Marking stream " << stream_id << " ready to write.";
1491         write_scheduler_.MarkStreamReady(stream_id, false);
1492       }
1493     }
1494   }
1495   visitor_.OnWindowUpdate(stream_id, delta_window_size);
1496 }
1497 
OnPushPromise(spdy::SpdyStreamId,spdy::SpdyStreamId,bool)1498 void OgHttp2Session::OnPushPromise(spdy::SpdyStreamId /*stream_id*/,
1499                                    spdy::SpdyStreamId /*promised_stream_id*/,
1500                                    bool /*end*/) {
1501   // Server push is disabled; PUSH_PROMISE is an invalid frame.
1502   LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
1503                       ConnectionError::kInvalidPushPromise);
1504 }
1505 
OnContinuation(spdy::SpdyStreamId,size_t,bool)1506 void OgHttp2Session::OnContinuation(spdy::SpdyStreamId /*stream_id*/,
1507                                     size_t /*payload_length*/, bool /*end*/) {}
1508 
OnAltSvc(spdy::SpdyStreamId,absl::string_view,const spdy::SpdyAltSvcWireFormat::AlternativeServiceVector &)1509 void OgHttp2Session::OnAltSvc(spdy::SpdyStreamId /*stream_id*/,
1510                               absl::string_view /*origin*/,
1511                               const spdy::SpdyAltSvcWireFormat::
1512                                   AlternativeServiceVector& /*altsvc_vector*/) {
1513 }
1514 
OnPriority(spdy::SpdyStreamId,spdy::SpdyStreamId,int,bool)1515 void OgHttp2Session::OnPriority(spdy::SpdyStreamId /*stream_id*/,
1516                                 spdy::SpdyStreamId /*parent_stream_id*/,
1517                                 int /*weight*/, bool /*exclusive*/) {}
1518 
OnPriorityUpdate(spdy::SpdyStreamId,absl::string_view)1519 void OgHttp2Session::OnPriorityUpdate(
1520     spdy::SpdyStreamId /*prioritized_stream_id*/,
1521     absl::string_view /*priority_field_value*/) {}
1522 
OnUnknownFrame(spdy::SpdyStreamId,uint8_t)1523 bool OgHttp2Session::OnUnknownFrame(spdy::SpdyStreamId /*stream_id*/,
1524                                     uint8_t /*frame_type*/) {
1525   return true;
1526 }
1527 
OnUnknownFrameStart(spdy::SpdyStreamId stream_id,size_t length,uint8_t type,uint8_t flags)1528 void OgHttp2Session::OnUnknownFrameStart(spdy::SpdyStreamId stream_id,
1529                                          size_t length, uint8_t type,
1530                                          uint8_t flags) {
1531   process_metadata_ = false;
1532   if (streams_reset_.contains(stream_id)) {
1533     return;
1534   }
1535   if (type == kMetadataFrameType) {
1536     QUICHE_DCHECK_EQ(metadata_length_, 0u);
1537     visitor_.OnBeginMetadataForStream(stream_id, length);
1538     metadata_length_ = length;
1539     process_metadata_ = true;
1540     end_metadata_ = flags & kMetadataEndFlag;
1541 
1542     // Empty metadata payloads will not trigger OnUnknownFramePayload(), so
1543     // handle that possibility here.
1544     MaybeHandleMetadataEndForStream(stream_id);
1545   } else {
1546     QUICHE_DLOG(INFO) << "Received unexpected frame type "
1547                       << static_cast<int>(type);
1548   }
1549 }
1550 
OnUnknownFramePayload(spdy::SpdyStreamId stream_id,absl::string_view payload)1551 void OgHttp2Session::OnUnknownFramePayload(spdy::SpdyStreamId stream_id,
1552                                            absl::string_view payload) {
1553   if (!process_metadata_) {
1554     return;
1555   }
1556   if (streams_reset_.contains(stream_id)) {
1557     return;
1558   }
1559   if (metadata_length_ > 0) {
1560     QUICHE_DCHECK_LE(payload.size(), metadata_length_);
1561     const bool payload_success =
1562         visitor_.OnMetadataForStream(stream_id, payload);
1563     if (payload_success) {
1564       metadata_length_ -= payload.size();
1565       MaybeHandleMetadataEndForStream(stream_id);
1566     } else {
1567       fatal_visitor_callback_failure_ = true;
1568       decoder_.StopProcessing();
1569     }
1570   } else {
1571     QUICHE_DLOG(INFO) << "Unexpected metadata payload for stream " << stream_id;
1572   }
1573 }
1574 
OnHeaderStatus(Http2StreamId stream_id,Http2VisitorInterface::OnHeaderResult result)1575 void OgHttp2Session::OnHeaderStatus(
1576     Http2StreamId stream_id, Http2VisitorInterface::OnHeaderResult result) {
1577   QUICHE_DCHECK_NE(result, Http2VisitorInterface::HEADER_OK);
1578   QUICHE_VLOG(1) << "OnHeaderStatus(stream_id=" << stream_id
1579                  << ", result=" << result << ")";
1580   const bool should_reset_stream =
1581       result == Http2VisitorInterface::HEADER_RST_STREAM ||
1582       result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1583       result == Http2VisitorInterface::HEADER_HTTP_MESSAGING;
1584   if (should_reset_stream) {
1585     const Http2ErrorCode error_code =
1586         (result == Http2VisitorInterface::HEADER_RST_STREAM)
1587             ? Http2ErrorCode::INTERNAL_ERROR
1588             : Http2ErrorCode::PROTOCOL_ERROR;
1589     const spdy::SpdyErrorCode spdy_error_code = TranslateErrorCode(error_code);
1590     const Http2VisitorInterface::InvalidFrameError frame_error =
1591         (result == Http2VisitorInterface::HEADER_RST_STREAM ||
1592          result == Http2VisitorInterface::HEADER_FIELD_INVALID)
1593             ? Http2VisitorInterface::InvalidFrameError::kHttpHeader
1594             : Http2VisitorInterface::InvalidFrameError::kHttpMessaging;
1595     auto it = streams_reset_.find(stream_id);
1596     if (it == streams_reset_.end()) {
1597       EnqueueFrame(
1598           std::make_unique<spdy::SpdyRstStreamIR>(stream_id, spdy_error_code));
1599 
1600       if (result == Http2VisitorInterface::HEADER_FIELD_INVALID ||
1601           result == Http2VisitorInterface::HEADER_HTTP_MESSAGING) {
1602         const bool ok = visitor_.OnInvalidFrame(stream_id, frame_error);
1603         if (!ok) {
1604           fatal_visitor_callback_failure_ = true;
1605           LatchErrorAndNotify(error_code, ConnectionError::kHeaderError);
1606         }
1607       }
1608     }
1609   } else if (result == Http2VisitorInterface::HEADER_CONNECTION_ERROR) {
1610     fatal_visitor_callback_failure_ = true;
1611     LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
1612                         ConnectionError::kHeaderError);
1613   } else if (result == Http2VisitorInterface::HEADER_COMPRESSION_ERROR) {
1614     LatchErrorAndNotify(Http2ErrorCode::COMPRESSION_ERROR,
1615                         ConnectionError::kHeaderError);
1616   }
1617 }
1618 
MaybeSetupPreface(bool sending_outbound_settings)1619 void OgHttp2Session::MaybeSetupPreface(bool sending_outbound_settings) {
1620   if (!queued_preface_) {
1621     queued_preface_ = true;
1622     if (!IsServerSession()) {
1623       buffered_data_.assign(spdy::kHttp2ConnectionHeaderPrefix,
1624                             spdy::kHttp2ConnectionHeaderPrefixSize);
1625     }
1626     if (!sending_outbound_settings) {
1627       QUICHE_DCHECK(frames_.empty());
1628       // First frame must be a non-ack SETTINGS.
1629       EnqueueFrame(PrepareSettingsFrame(GetInitialSettings()));
1630     }
1631   }
1632 }
1633 
GetInitialSettings() const1634 std::vector<Http2Setting> OgHttp2Session::GetInitialSettings() const {
1635   std::vector<Http2Setting> settings;
1636   if (!IsServerSession()) {
1637     // Disable server push. Note that server push from clients is already
1638     // disabled, so the server does not need to send this disabling setting.
1639     // TODO(diannahu): Consider applying server push disabling on SETTINGS ack.
1640     settings.push_back({Http2KnownSettingsId::ENABLE_PUSH, 0});
1641   }
1642   if (options_.max_header_list_bytes) {
1643     settings.push_back({Http2KnownSettingsId::MAX_HEADER_LIST_SIZE,
1644                         *options_.max_header_list_bytes});
1645   }
1646   if (options_.allow_extended_connect && IsServerSession()) {
1647     settings.push_back({Http2KnownSettingsId::ENABLE_CONNECT_PROTOCOL, 1u});
1648   }
1649   return settings;
1650 }
1651 
PrepareSettingsFrame(absl::Span<const Http2Setting> settings)1652 std::unique_ptr<SpdySettingsIR> OgHttp2Session::PrepareSettingsFrame(
1653     absl::Span<const Http2Setting> settings) {
1654   auto settings_ir = std::make_unique<SpdySettingsIR>();
1655   for (const Http2Setting& setting : settings) {
1656     settings_ir->AddSetting(setting.id, setting.value);
1657   }
1658   return settings_ir;
1659 }
1660 
HandleOutboundSettings(const spdy::SpdySettingsIR & settings_frame)1661 void OgHttp2Session::HandleOutboundSettings(
1662     const spdy::SpdySettingsIR& settings_frame) {
1663   for (const auto& [id, value] : settings_frame.values()) {
1664     switch (static_cast<Http2KnownSettingsId>(id)) {
1665       case MAX_CONCURRENT_STREAMS:
1666         pending_max_inbound_concurrent_streams_ = value;
1667         break;
1668       case ENABLE_CONNECT_PROTOCOL:
1669         if (value == 1u && IsServerSession()) {
1670           // Allow extended CONNECT semantics even before SETTINGS are acked, to
1671           // make things easier for clients.
1672           headers_handler_.SetAllowExtendedConnect();
1673         }
1674         break;
1675       case HEADER_TABLE_SIZE:
1676       case ENABLE_PUSH:
1677       case INITIAL_WINDOW_SIZE:
1678       case MAX_FRAME_SIZE:
1679       case MAX_HEADER_LIST_SIZE:
1680         QUICHE_VLOG(2)
1681             << "Not adjusting internal state for outbound setting with id "
1682             << id;
1683         break;
1684     }
1685   }
1686 
1687   // Copy the (small) map of settings we are about to send so that we can set
1688   // values in the SETTINGS ack callback.
1689   settings_ack_callbacks_.push_back(
1690       [this, settings_map = settings_frame.values()]() {
1691         for (const auto& [id, value] : settings_map) {
1692           switch (static_cast<Http2KnownSettingsId>(id)) {
1693             case MAX_CONCURRENT_STREAMS:
1694               max_inbound_concurrent_streams_ = value;
1695               break;
1696             case HEADER_TABLE_SIZE:
1697               decoder_.GetHpackDecoder()->ApplyHeaderTableSizeSetting(value);
1698               break;
1699             case INITIAL_WINDOW_SIZE:
1700               UpdateStreamReceiveWindowSizes(value);
1701               initial_stream_receive_window_ = value;
1702               break;
1703             case MAX_FRAME_SIZE:
1704               decoder_.SetMaxFrameSize(value);
1705               break;
1706             case ENABLE_PUSH:
1707             case MAX_HEADER_LIST_SIZE:
1708             case ENABLE_CONNECT_PROTOCOL:
1709               QUICHE_VLOG(2)
1710                   << "No action required in ack for outbound setting with id "
1711                   << id;
1712               break;
1713           }
1714         }
1715       });
1716 }
1717 
SendWindowUpdate(Http2StreamId stream_id,size_t update_delta)1718 void OgHttp2Session::SendWindowUpdate(Http2StreamId stream_id,
1719                                       size_t update_delta) {
1720   EnqueueFrame(
1721       std::make_unique<spdy::SpdyWindowUpdateIR>(stream_id, update_delta));
1722 }
1723 
SendHeaders(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,bool end_stream)1724 void OgHttp2Session::SendHeaders(Http2StreamId stream_id,
1725                                  spdy::Http2HeaderBlock headers,
1726                                  bool end_stream) {
1727   auto frame =
1728       std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(headers));
1729   frame->set_fin(end_stream);
1730   EnqueueFrame(std::move(frame));
1731 }
1732 
SendTrailers(Http2StreamId stream_id,spdy::Http2HeaderBlock trailers)1733 void OgHttp2Session::SendTrailers(Http2StreamId stream_id,
1734                                   spdy::Http2HeaderBlock trailers) {
1735   auto frame =
1736       std::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(trailers));
1737   frame->set_fin(true);
1738   EnqueueFrame(std::move(frame));
1739   trailers_ready_.erase(stream_id);
1740 }
1741 
MaybeFinWithRstStream(StreamStateMap::iterator iter)1742 void OgHttp2Session::MaybeFinWithRstStream(StreamStateMap::iterator iter) {
1743   QUICHE_DCHECK(iter != stream_map_.end() && iter->second.half_closed_local);
1744 
1745   if (options_.rst_stream_no_error_when_incomplete && IsServerSession() &&
1746       !iter->second.half_closed_remote) {
1747     // Since the peer has not yet ended the stream, this endpoint should
1748     // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
1749     EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1750         iter->first, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
1751     iter->second.half_closed_remote = true;
1752   }
1753 }
1754 
MarkDataBuffered(Http2StreamId stream_id,size_t bytes)1755 void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
1756   connection_window_manager_.MarkDataBuffered(bytes);
1757   if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
1758     it->second.window_manager.MarkDataBuffered(bytes);
1759   }
1760 }
1761 
CreateStream(Http2StreamId stream_id)1762 OgHttp2Session::StreamStateMap::iterator OgHttp2Session::CreateStream(
1763     Http2StreamId stream_id) {
1764   WindowManager::WindowUpdateListener listener =
1765       [this, stream_id](size_t window_update_delta) {
1766         SendWindowUpdate(stream_id, window_update_delta);
1767       };
1768   auto [iter, inserted] = stream_map_.try_emplace(
1769       stream_id,
1770       StreamState(initial_stream_receive_window_, initial_stream_send_window_,
1771                   std::move(listener), options_.should_window_update_fn));
1772   if (inserted) {
1773     // Add the stream to the write scheduler.
1774     const spdy::SpdyPriority priority = 3;
1775     write_scheduler_.RegisterStream(stream_id, priority);
1776 
1777     highest_processed_stream_id_ =
1778         std::max(highest_processed_stream_id_, stream_id);
1779   }
1780   return iter;
1781 }
1782 
StartRequest(Http2StreamId stream_id,spdy::Http2HeaderBlock headers,std::unique_ptr<DataFrameSource> data_source,void * user_data)1783 void OgHttp2Session::StartRequest(Http2StreamId stream_id,
1784                                   spdy::Http2HeaderBlock headers,
1785                                   std::unique_ptr<DataFrameSource> data_source,
1786                                   void* user_data) {
1787   if (received_goaway_) {
1788     // Do not start new streams after receiving a GOAWAY.
1789     goaway_rejected_streams_.insert(stream_id);
1790     return;
1791   }
1792 
1793   auto iter = CreateStream(stream_id);
1794   const bool end_stream = data_source == nullptr;
1795   if (!end_stream) {
1796     iter->second.outbound_body = std::move(data_source);
1797     write_scheduler_.MarkStreamReady(stream_id, false);
1798   }
1799   iter->second.user_data = user_data;
1800   for (const auto& [name, value] : headers) {
1801     if (name == kHttp2MethodPseudoHeader && value == kHeadValue) {
1802       iter->second.sent_head_method = true;
1803     }
1804   }
1805   SendHeaders(stream_id, std::move(headers), end_stream);
1806 }
1807 
StartPendingStreams()1808 void OgHttp2Session::StartPendingStreams() {
1809   while (!pending_streams_.empty() && CanCreateStream()) {
1810     auto& [stream_id, pending_stream] = pending_streams_.front();
1811     StartRequest(stream_id, std::move(pending_stream.headers),
1812                  std::move(pending_stream.data_source),
1813                  pending_stream.user_data);
1814     pending_streams_.pop_front();
1815   }
1816 }
1817 
CloseStream(Http2StreamId stream_id,Http2ErrorCode error_code)1818 void OgHttp2Session::CloseStream(Http2StreamId stream_id,
1819                                  Http2ErrorCode error_code) {
1820   const bool result = visitor_.OnCloseStream(stream_id, error_code);
1821   if (!result) {
1822     latched_error_ = true;
1823     decoder_.StopProcessing();
1824   }
1825   stream_map_.erase(stream_id);
1826   trailers_ready_.erase(stream_id);
1827   streams_reset_.erase(stream_id);
1828   auto queued_it = queued_frames_.find(stream_id);
1829   if (queued_it != queued_frames_.end()) {
1830     // Remove any queued frames for this stream.
1831     int frames_remaining = queued_it->second;
1832     queued_frames_.erase(queued_it);
1833     for (auto it = frames_.begin();
1834          frames_remaining > 0 && it != frames_.end();) {
1835       if (static_cast<Http2StreamId>((*it)->stream_id()) == stream_id) {
1836         it = frames_.erase(it);
1837         --frames_remaining;
1838       } else {
1839         ++it;
1840       }
1841     }
1842   }
1843   if (write_scheduler_.StreamRegistered(stream_id)) {
1844     write_scheduler_.UnregisterStream(stream_id);
1845   }
1846 
1847   StartPendingStreams();
1848 }
1849 
CanCreateStream() const1850 bool OgHttp2Session::CanCreateStream() const {
1851   return stream_map_.size() < max_outbound_concurrent_streams_;
1852 }
1853 
NextHeaderType(absl::optional<HeaderType> current_type)1854 HeaderType OgHttp2Session::NextHeaderType(
1855     absl::optional<HeaderType> current_type) {
1856   if (IsServerSession()) {
1857     if (!current_type) {
1858       return HeaderType::REQUEST;
1859     } else {
1860       QUICHE_DCHECK(current_type == HeaderType::REQUEST);
1861       return HeaderType::REQUEST_TRAILER;
1862     }
1863   } else if (!current_type ||
1864              current_type.value() == HeaderType::RESPONSE_100) {
1865     return HeaderType::RESPONSE;
1866   } else {
1867     return HeaderType::RESPONSE_TRAILER;
1868   }
1869 }
1870 
LatchErrorAndNotify(Http2ErrorCode error_code,ConnectionError error)1871 void OgHttp2Session::LatchErrorAndNotify(Http2ErrorCode error_code,
1872                                          ConnectionError error) {
1873   if (latched_error_) {
1874     // Do not kick a connection when it is down.
1875     return;
1876   }
1877 
1878   latched_error_ = true;
1879   visitor_.OnConnectionError(error);
1880   decoder_.StopProcessing();
1881   EnqueueFrame(std::make_unique<spdy::SpdyGoAwayIR>(
1882       highest_processed_stream_id_, TranslateErrorCode(error_code),
1883       ConnectionErrorToString(error)));
1884 }
1885 
CloseStreamIfReady(uint8_t frame_type,uint32_t stream_id)1886 void OgHttp2Session::CloseStreamIfReady(uint8_t frame_type,
1887                                         uint32_t stream_id) {
1888   auto iter = stream_map_.find(stream_id);
1889   if (iter == stream_map_.end()) {
1890     return;
1891   }
1892   const StreamState& state = iter->second;
1893   if (static_cast<FrameType>(frame_type) == FrameType::RST_STREAM ||
1894       (state.half_closed_local && state.half_closed_remote)) {
1895     CloseStream(stream_id, Http2ErrorCode::HTTP2_NO_ERROR);
1896   }
1897 }
1898 
CloseGoAwayRejectedStreams()1899 void OgHttp2Session::CloseGoAwayRejectedStreams() {
1900   for (Http2StreamId stream_id : goaway_rejected_streams_) {
1901     const bool result =
1902         visitor_.OnCloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
1903     if (!result) {
1904       latched_error_ = true;
1905       decoder_.StopProcessing();
1906     }
1907   }
1908   goaway_rejected_streams_.clear();
1909 }
1910 
PrepareForImmediateGoAway()1911 void OgHttp2Session::PrepareForImmediateGoAway() {
1912   queued_immediate_goaway_ = true;
1913 
1914   // Keep the initial SETTINGS frame if the session has SETTINGS at the front of
1915   // the queue but has not sent SETTINGS yet. The session should send initial
1916   // SETTINGS before GOAWAY.
1917   std::unique_ptr<spdy::SpdyFrameIR> initial_settings;
1918   if (!sent_non_ack_settings_ && !frames_.empty() &&
1919       IsNonAckSettings(*frames_.front())) {
1920     initial_settings = std::move(frames_.front());
1921     frames_.pop_front();
1922   }
1923 
1924   // Remove all pending frames except for RST_STREAMs. It is important to send
1925   // RST_STREAMs so the peer knows of errors below the GOAWAY last stream ID.
1926   // TODO(diannahu): Consider informing the visitor of dropped frames. This may
1927   // mean keeping the frames and invoking a frame-not-sent callback, similar to
1928   // nghttp2. Could add a closure to each frame in the frames queue.
1929   frames_.remove_if([](const auto& frame) {
1930     return frame->frame_type() != spdy::SpdyFrameType::RST_STREAM;
1931   });
1932 
1933   if (initial_settings != nullptr) {
1934     frames_.push_front(std::move(initial_settings));
1935   }
1936 }
1937 
MaybeHandleMetadataEndForStream(Http2StreamId stream_id)1938 void OgHttp2Session::MaybeHandleMetadataEndForStream(Http2StreamId stream_id) {
1939   if (metadata_length_ == 0 && end_metadata_) {
1940     const bool completion_success = visitor_.OnMetadataEndForStream(stream_id);
1941     if (!completion_success) {
1942       fatal_visitor_callback_failure_ = true;
1943       decoder_.StopProcessing();
1944     }
1945     process_metadata_ = false;
1946     end_metadata_ = false;
1947   }
1948 }
1949 
DecrementQueuedFrameCount(uint32_t stream_id,uint8_t frame_type)1950 void OgHttp2Session::DecrementQueuedFrameCount(uint32_t stream_id,
1951                                                uint8_t frame_type) {
1952   auto iter = queued_frames_.find(stream_id);
1953   if (iter == queued_frames_.end()) {
1954     QUICHE_LOG(ERROR) << "Unable to find a queued frame count for stream "
1955                       << stream_id;
1956     return;
1957   }
1958   if (static_cast<FrameType>(frame_type) != FrameType::DATA) {
1959     --iter->second;
1960   }
1961   if (iter->second == 0) {
1962     // TODO(birenroy): Consider passing through `error_code` here.
1963     CloseStreamIfReady(frame_type, stream_id);
1964   }
1965 }
1966 
HandleContentLengthError(Http2StreamId stream_id)1967 void OgHttp2Session::HandleContentLengthError(Http2StreamId stream_id) {
1968   EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
1969       stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
1970 }
1971 
UpdateReceiveWindow(Http2StreamId stream_id,int32_t delta)1972 void OgHttp2Session::UpdateReceiveWindow(Http2StreamId stream_id,
1973                                          int32_t delta) {
1974   if (stream_id == 0) {
1975     connection_window_manager_.IncreaseWindow(delta);
1976     // TODO(b/181586191): Provide an explicit way to set the desired window
1977     // limit, remove the upsize-on-window-update behavior.
1978     const int64_t current_window =
1979         connection_window_manager_.CurrentWindowSize();
1980     if (current_window > connection_window_manager_.WindowSizeLimit()) {
1981       connection_window_manager_.SetWindowSizeLimit(current_window);
1982     }
1983   } else {
1984     auto iter = stream_map_.find(stream_id);
1985     if (iter != stream_map_.end()) {
1986       WindowManager& manager = iter->second.window_manager;
1987       manager.IncreaseWindow(delta);
1988       // TODO(b/181586191): Provide an explicit way to set the desired window
1989       // limit, remove the upsize-on-window-update behavior.
1990       const int64_t current_window = manager.CurrentWindowSize();
1991       if (current_window > manager.WindowSizeLimit()) {
1992         manager.SetWindowSizeLimit(current_window);
1993       }
1994     }
1995   }
1996 }
1997 
UpdateStreamSendWindowSizes(uint32_t new_value)1998 void OgHttp2Session::UpdateStreamSendWindowSizes(uint32_t new_value) {
1999   const int32_t delta =
2000       static_cast<int32_t>(new_value) - initial_stream_send_window_;
2001   initial_stream_send_window_ = new_value;
2002   for (auto& [stream_id, stream_state] : stream_map_) {
2003     const int64_t current_window_size = stream_state.send_window;
2004     const int64_t new_window_size = current_window_size + delta;
2005     if (new_window_size > spdy::kSpdyMaximumWindowSize) {
2006       EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
2007           stream_id, spdy::ERROR_CODE_FLOW_CONTROL_ERROR));
2008     } else {
2009       stream_state.send_window += delta;
2010     }
2011     if (current_window_size <= 0 && new_window_size > 0) {
2012       write_scheduler_.MarkStreamReady(stream_id, false);
2013     }
2014   }
2015 }
2016 
UpdateStreamReceiveWindowSizes(uint32_t new_value)2017 void OgHttp2Session::UpdateStreamReceiveWindowSizes(uint32_t new_value) {
2018   for (auto& [stream_id, stream_state] : stream_map_) {
2019     stream_state.window_manager.OnWindowSizeLimitChange(new_value);
2020   }
2021 }
2022 
2023 }  // namespace adapter
2024 }  // namespace http2
2025