• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_grpc/connection.h"
16 
17 #include <cinttypes>
18 #include <cstring>
19 #include <string_view>
20 #include <type_traits>
21 
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_grpc_private/hpack.h"
25 #include "pw_log/log.h"
26 #include "pw_preprocessor/compiler.h"
27 #include "pw_status/try.h"
28 
29 namespace pw::grpc {
30 namespace internal {
31 
32 // RFC 9113 §6
33 // Enum names left in naming style of RFC
34 enum class FrameType : uint8_t {
35   DATA = 0x00,
36   HEADERS = 0x01,
37   PRIORITY = 0x02,
38   RST_STREAM = 0x03,
39   SETTINGS = 0x04,
40   PUSH_PROMISE = 0x05,
41   PING = 0x06,
42   GOAWAY = 0x07,
43   WINDOW_UPDATE = 0x08,
44   CONTINUATION = 0x09,
45 };
46 
47 // RFC 9113 §4.1
48 constexpr size_t kFrameHeaderEncodedSize = 9;
49 struct FrameHeader {
50   uint32_t payload_length;
51   FrameType type;
52   uint8_t flags;
53   StreamId stream_id;
54 };
55 
56 // RFC 9113 §7
57 // Enum names left in naming style of RFC
58 enum class Http2Error : uint32_t {
59   NO_ERROR = 0x00,
60   PROTOCOL_ERROR = 0x01,
61   INTERNAL_ERROR = 0x02,
62   FLOW_CONTROL_ERROR = 0x03,
63   SETTINGS_TIMEOUT = 0x04,
64   STREAM_CLOSED = 0x05,
65   FRAME_SIZE_ERROR = 0x06,
66   REFUSED_STREAM = 0x07,
67   CANCEL = 0x08,
68   COMPRESSION_ERROR = 0x09,
69   CONNECT_ERROR = 0x0a,
70   ENHANCE_YOUR_CALM = 0x0b,
71   INADEQUATE_SECURITY = 0x0c,
72   HTTP_1_1_REQUIRED = 0x0d,
73 };
74 
75 }  // namespace internal
76 
77 namespace {
78 
79 using internal::FrameHeader;
80 using internal::FrameType;
81 using internal::Http2Error;
82 using internal::kMaxConcurrentStreams;
83 using internal::kMaxGrpcMessageSize;
84 
85 // RFC 9113 §3.4
86 constexpr std::string_view kExpectedConnectionPrefaceLiteral(
87     "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
88 
89 static_assert(kMaxMethodNameSize == kHpackMaxStringSize);
90 
91 constexpr size_t kLengthPrefixedMessageHdrSize = 5;
92 
93 enum {
94   FLAGS_ACK = 0x01,
95   FLAGS_END_STREAM = 0x01,
96   FLAGS_END_HEADERS = 0x04,
97   FLAGS_PADDED = 0x08,
98   FLAGS_PRIORITY = 0x20,
99 };
100 
101 // RFC 9113 §6.5.2
102 enum SettingType : uint16_t {
103   SETTINGS_HEADER_TABLE_SIZE = 0x01,
104   SETTINGS_ENABLE_PUSH = 0x02,
105   SETTINGS_MAX_CONCURRENT_STREAMS = 0x03,
106   SETTINGS_INITIAL_WINDOW_SIZE = 0x04,
107   SETTINGS_MAX_FRAME_SIZE = 0x05,
108   SETTINGS_MAX_HEADER_LIST_SIZE = 0x06,
109 };
110 
ReadExactly(stream::Reader & reader,ByteSpan buffer)111 Status ReadExactly(stream::Reader& reader, ByteSpan buffer) {
112   size_t bytes_read = 0;
113   while (bytes_read < buffer.size()) {
114     PW_TRY_ASSIGN(auto out, reader.Read(buffer.subspan(bytes_read)));
115     bytes_read += out.size();
116   }
117   return OkStatus();
118 }
119 
ReadFrameHeader(stream::Reader & reader)120 Result<FrameHeader> ReadFrameHeader(stream::Reader& reader) {
121   std::array<std::byte, internal::kFrameHeaderEncodedSize> buffer;
122   PW_TRY(ReadExactly(reader, buffer));
123 
124   // RFC 9113 §4.1
125   FrameHeader out;
126   ByteBuilder builder(as_writable_bytes(span{buffer}));
127   auto it = builder.begin();
128   auto type_and_length = it.ReadUint32(endian::big);
129   out.payload_length = type_and_length >> 8;
130   out.type = static_cast<FrameType>(type_and_length & 0xff);
131   out.flags = it.ReadUint8();
132   out.stream_id = it.ReadUint32(endian::big) & 0x7fffffff;
133   return out;
134 }
135 
136 template <typename T, std::enable_if_t<std::is_integral_v<T>, bool> = true>
ToNetworkOrder(T value)137 constexpr T ToNetworkOrder(T value) {
138   return bytes::ConvertOrder(/*from=*/endian::native,
139                              /*to=*/endian::big,
140                              value);
141 }
142 
143 template <typename T, std::enable_if_t<std::is_enum_v<T>, bool> = true>
ToNetworkOrder(T value)144 constexpr std::underlying_type_t<T> ToNetworkOrder(T value) {
145   return ToNetworkOrder(static_cast<std::underlying_type_t<T>>(value));
146 }
147 
148 // Use this instead of FrameHeader when writing frames.
PW_PACKED(struct)149 PW_PACKED(struct) WireFrameHeader {
150   WireFrameHeader(FrameHeader h)
151       : payload_length_and_type(ToNetworkOrder(h.payload_length << 8 |
152                                                static_cast<uint32_t>(h.type))),
153         flags(h.flags),
154         stream_id(ToNetworkOrder(h.stream_id)) {}
155 
156   uint32_t payload_length_and_type;
157   uint8_t flags;
158   uint32_t stream_id;
159 };
160 
161 template <typename T>
AsBytes(T & object)162 ConstByteSpan AsBytes(T& object) {
163   return as_bytes(span<T, 1>{&object, 1});
164 }
165 
166 }  // namespace
167 
Connection(stream::ReaderWriter & socket,SendQueue & send_queue,RequestCallbacks & callbacks,allocator::Allocator * message_assembly_allocator,multibuf::MultiBufAllocator & multibuf_allocator)168 Connection::Connection(stream::ReaderWriter& socket,
169                        SendQueue& send_queue,
170                        RequestCallbacks& callbacks,
171                        allocator::Allocator* message_assembly_allocator,
172                        multibuf::MultiBufAllocator& multibuf_allocator)
173     : socket_(socket),
174       shared_state_(std::in_place,
175                     message_assembly_allocator,
176                     multibuf_allocator,
177                     send_queue),
178       reader_(*this, callbacks),
179       writer_(*this) {}
180 
ProcessFrame()181 Status Connection::Reader::ProcessFrame() {
182   if (!received_connection_preface_) {
183     return Status::FailedPrecondition();
184   }
185 
186   PW_TRY_ASSIGN(auto frame, ReadFrameHeader(connection_.socket_.as_reader()));
187   switch (frame.type) {
188     // Frames that we handle.
189     case FrameType::DATA:
190       PW_TRY(ProcessDataFrame(frame));
191       break;
192     case FrameType::HEADERS:
193       PW_TRY(ProcessHeadersFrame(frame));
194       break;
195     case FrameType::PRIORITY:
196       PW_TRY(ProcessIgnoredFrame(frame));
197       break;
198     case FrameType::RST_STREAM:
199       PW_TRY(ProcessRstStreamFrame(frame));
200       break;
201     case FrameType::SETTINGS:
202       PW_TRY(ProcessSettingsFrame(frame, /*send_ack=*/true));
203       break;
204     case FrameType::PING:
205       PW_TRY(ProcessPingFrame(frame));
206       break;
207     case FrameType::WINDOW_UPDATE:
208       PW_TRY(ProcessWindowUpdateFrame(frame));
209       break;
210 
211     // Frames that trigger an immediate connection close.
212     case FrameType::GOAWAY:
213       PW_LOG_ERROR("Client sent GOAWAY");
214       // don't bother sending GOAWAY in response
215       return Status::Internal();
216     case FrameType::PUSH_PROMISE:
217       PW_LOG_ERROR("Client sent PUSH_PROMISE");
218       SendGoAway(Http2Error::PROTOCOL_ERROR);
219       return Status::Internal();
220     case FrameType::CONTINUATION:
221       PW_LOG_ERROR("Client sent CONTINUATION: unsupported");
222       SendGoAway(Http2Error::INTERNAL_ERROR);
223       return Status::Internal();
224   }
225 
226   return OkStatus();
227 }
228 
CreateStream(StreamId id,int32_t initial_send_window)229 pw::Status Connection::SharedState::CreateStream(StreamId id,
230                                                  int32_t initial_send_window) {
231   for (size_t i = 0; i < streams_.size(); i++) {
232     if (streams_[i].id != 0) {
233       continue;
234     }
235     PW_LOG_DEBUG("Conn.CreateStream id=%" PRIu32 " at slot=%" PRIu32,
236                  id,
237                  static_cast<uint32_t>(i));
238     streams_[i].id = id;
239     streams_[i].half_closed = false;
240     streams_[i].started_response = false;
241     streams_[i].send_window = initial_send_window;
242     return OkStatus();
243   }
244   PW_LOG_WARN("Conn.CreateStream id=%" PRIu32 " OUT OF SPACE", id);
245   return Status::ResourceExhausted();
246 }
247 
ForAllStreams(Function<void (Stream *)> && callback)248 void Connection::SharedState::ForAllStreams(
249     Function<void(Stream*)>&& callback) {
250   for (size_t i = 0; i < streams_.size(); i++) {
251     if (streams_[i].id != 0) {
252       callback(&streams_[i]);
253     }
254   }
255 }
256 
LookupStream(StreamId id)257 Connection::Stream* Connection::SharedState::LookupStream(StreamId id) {
258   for (size_t i = 0; i < streams_.size(); i++) {
259     if (streams_[i].id == id) {
260       return &streams_[i];
261     }
262   }
263   return nullptr;
264 }
265 
DrainResponseQueue(Stream & stream)266 Status Connection::SharedState::DrainResponseQueue(Stream& stream) {
267   while (stream.response_queue.size() > 0) {
268     multibuf::MultiBufChunks& chunks = stream.response_queue.Chunks();
269 
270     size_t message_size = chunks.front().size();
271 
272     if (static_cast<int32_t>(message_size) > stream.send_window ||
273         static_cast<int32_t>(message_size) > connection_send_window_) {
274       break;
275     }
276 
277     PW_TRY(SendQueued(stream, stream.response_queue.TakeFrontChunk()));
278   }
279   return OkStatus();
280 }
281 
DrainResponseQueues()282 Status Connection::SharedState::DrainResponseQueues() {
283   for (Stream& stream : streams_) {
284     PW_TRY(DrainResponseQueue(stream));
285   }
286   return OkStatus();
287 }
288 
SendBytes(ConstByteSpan message)289 Status Connection::SharedState::SendBytes(ConstByteSpan message) {
290   std::optional<multibuf::MultiBuf> buffer =
291       multibuf_allocator_.AllocateContiguous(message.size());
292   if (!buffer.has_value()) {
293     return Status::ResourceExhausted();
294   }
295   PW_TRY(buffer->CopyFrom(message, 0));
296   send_queue_.QueueSend(std::move(*buffer));
297   return OkStatus();
298 }
299 
300 // RFC 9113 §6.1
301 //
302 // multibuf chunk should have already been allocated with enough prefix space
303 // for headers.
SendData(StreamId stream_id,multibuf::OwnedChunk && chunk)304 Status Connection::SharedState::SendData(StreamId stream_id,
305                                          multibuf::OwnedChunk&& chunk) {
306   size_t message_size = chunk.size();
307   PW_LOG_DEBUG("Conn.Send DATA with id=%" PRIu32 " len=%" PRIu32,
308                stream_id,
309                static_cast<uint32_t>(message_size));
310 
311   // Write a Length-Prefixed-Message payload.
312   if (!chunk->ClaimPrefix(kLengthPrefixedMessageHdrSize)) {
313     return Status::ResourceExhausted();
314   }
315 
316   ByteBuilder prefix(chunk);
317   prefix.PutUint8(0);
318   prefix.PutUint32(message_size, endian::big);
319 
320   // Write FrameHeader
321   if (!chunk->ClaimPrefix(sizeof(WireFrameHeader))) {
322     return Status::ResourceExhausted();
323   }
324 
325   WireFrameHeader frame(FrameHeader{
326       .payload_length =
327           static_cast<uint32_t>(message_size + kLengthPrefixedMessageHdrSize),
328       .type = FrameType::DATA,
329       .flags = 0,
330       .stream_id = stream_id,
331   });
332   ConstByteSpan frame_span = AsBytes(frame);
333   std::copy(frame_span.begin(), frame_span.end(), chunk->begin());
334 
335   auto buffer = multibuf::MultiBuf::FromChunk(std::move(chunk));
336   send_queue_.QueueSend(std::move(buffer));
337   return OkStatus();
338 }
339 
340 // RFC 9113 §6.2
SendHeaders(StreamId stream_id,ConstByteSpan payload1,ConstByteSpan payload2,bool end_stream)341 Status Connection::SharedState::SendHeaders(StreamId stream_id,
342                                             ConstByteSpan payload1,
343                                             ConstByteSpan payload2,
344                                             bool end_stream) {
345   PW_LOG_DEBUG("Conn.Send HEADERS with id=%" PRIu32 " len1=%" PRIu32
346                " len2=%" PRIu32 " end=%d",
347                stream_id,
348                static_cast<uint32_t>(payload1.size()),
349                static_cast<uint32_t>(payload2.size()),
350                end_stream);
351   WireFrameHeader frame(FrameHeader{
352       .payload_length =
353           static_cast<uint32_t>(payload1.size() + payload2.size()),
354       .type = FrameType::HEADERS,
355       .flags = FLAGS_END_HEADERS,
356       .stream_id = stream_id,
357   });
358 
359   if (end_stream) {
360     frame.flags |= FLAGS_END_STREAM;
361   }
362 
363   ConstByteSpan frame_span = AsBytes(frame);
364   std::optional<multibuf::MultiBuf> buffer =
365       multibuf_allocator_.AllocateContiguous(frame_span.size() +
366                                              payload1.size() + payload2.size());
367   if (!buffer.has_value()) {
368     return Status::ResourceExhausted();
369   }
370 
371   size_t offset = 0;
372   PW_TRY(buffer->CopyFrom(frame_span, offset));
373   offset += frame_span.size();
374 
375   if (!payload1.empty()) {
376     PW_TRY(buffer->CopyFrom(payload1, offset));
377     offset += payload1.size();
378   }
379   if (!payload2.empty()) {
380     PW_TRY(buffer->CopyFrom(payload2, offset));
381     offset += payload2.size();
382   }
383 
384   send_queue_.QueueSend(std::move(*buffer));
385   return OkStatus();
386 }
387 
388 // RFC 9113 §6.4
SendRstStream(StreamId stream_id,Http2Error code)389 Status Connection::SharedState::SendRstStream(StreamId stream_id,
390                                               Http2Error code) {
391   PW_PACKED(struct) RstStreamFrame {
392     WireFrameHeader header;
393     uint32_t error_code;
394   };
395   RstStreamFrame frame{
396       .header = WireFrameHeader(FrameHeader{
397           .payload_length = 4,
398           .type = FrameType::RST_STREAM,
399           .flags = 0,
400           .stream_id = stream_id,
401       }),
402       .error_code = ToNetworkOrder(code),
403   };
404   return SendBytes(AsBytes(frame));
405 }
406 
407 // RFC 9113 §6.9
SendWindowUpdates(StreamId stream_id,uint32_t increment)408 Status Connection::SharedState::SendWindowUpdates(StreamId stream_id,
409                                                   uint32_t increment) {
410   // It is illegal to send updates with increment=0.
411   if (increment == 0) {
412     return OkStatus();
413   }
414   if (increment & 0x80000000) {
415     // Upper bit is reserved, error.
416     return Status::InvalidArgument();
417   }
418 
419   PW_LOG_DEBUG("Conn.Send WINDOW_UPDATE frames with id=%" PRIu32
420                " increment=%" PRIu32,
421                stream_id,
422                increment);
423 
424   PW_PACKED(struct) WindowUpdateFrame {
425     WireFrameHeader header;
426     uint32_t increment;
427   };
428   WindowUpdateFrame frames[2] = {
429       {
430           .header = WireFrameHeader(FrameHeader{
431               .payload_length = 4,
432               .type = FrameType::WINDOW_UPDATE,
433               .flags = 0,
434               .stream_id = 0,
435           }),
436           .increment = ToNetworkOrder(increment),
437       },
438       {
439           .header = WireFrameHeader(FrameHeader{
440               .payload_length = 4,
441               .type = FrameType::WINDOW_UPDATE,
442               .flags = 0,
443               .stream_id = stream_id,
444           }),
445           .increment = ToNetworkOrder(increment),
446       },
447   };
448   return SendBytes(as_bytes(span{frames}));
449 }
450 
451 // RFC 9113 §6.5
SendSettingsAck()452 Status Connection::SharedState::SendSettingsAck() {
453   PW_LOG_DEBUG("Conn.Send SETTINGS ACK");
454   WireFrameHeader frame(FrameHeader{
455       .payload_length = 0,
456       .type = FrameType::SETTINGS,
457       .flags = FLAGS_ACK,
458       .stream_id = 0,
459   });
460   return SendBytes(AsBytes(frame));
461 }
462 
SendResponseMessage(StreamId stream_id,ConstByteSpan message)463 Status Connection::Writer::SendResponseMessage(StreamId stream_id,
464                                                ConstByteSpan message) {
465   auto state = connection_.LockState();
466 
467   if (message.size() > kMaxGrpcMessageSize) {
468     PW_LOG_WARN("Message %" PRIu32 " bytes on id=%" PRIu32
469                 " exceeds maximum message size",
470                 static_cast<uint32_t>(message.size()),
471                 stream_id);
472     return Status::InvalidArgument();
473   }
474 
475   // Create contiguous buffer big enough to hold the response message plus
476   // headers.
477   std::optional<multibuf::MultiBuf> buffer =
478       state->multibuf_allocator().AllocateContiguous(
479           message.size() + kLengthPrefixedMessageHdrSize +
480           sizeof(WireFrameHeader));
481 
482   if (!buffer.has_value()) {
483     return Status::ResourceExhausted();
484   }
485 
486   // Before copying message in, move internal offset forward past header region.
487   buffer->DiscardPrefix(kLengthPrefixedMessageHdrSize +
488                         sizeof(WireFrameHeader));
489   PW_TRY(buffer->CopyFrom(message, 0));
490 
491   return state->QueueStreamResponse(stream_id, std::move(*buffer));
492 }
493 
QueueStreamResponse(StreamId id,multibuf::MultiBuf && buffer)494 Status Connection::SharedState::QueueStreamResponse(
495     StreamId id, multibuf::MultiBuf&& buffer) {
496   auto stream = LookupStream(id);
497   if (!stream) {
498     return Status::NotFound();
499   }
500   stream->response_queue.PushSuffix(std::move(buffer));
501   // Try and send if we have window
502   return DrainResponseQueues();
503 }
504 
SendQueued(Connection::Stream & stream,multibuf::OwnedChunk && chunk)505 Status Connection::SharedState::SendQueued(Connection::Stream& stream,
506                                            multibuf::OwnedChunk&& chunk) {
507   size_t message_size = chunk.size();
508 
509   auto status = OkStatus();
510   if (!stream.started_response) {
511     stream.started_response = true;
512     status = SendHeaders(stream.id,
513                          ResponseHeadersPayload(),
514                          ConstByteSpan(),
515                          /*end_stream=*/false);
516   }
517 
518   if (status.ok()) {
519     status = SendData(stream.id, std::move(chunk));
520   }
521 
522   if (!status.ok()) {
523     PW_LOG_WARN("Failed sending response message on id=%" PRIu32 " error=%d",
524                 stream.id,
525                 status.code());
526     return status;
527   }
528 
529   stream.send_window -= message_size;
530   connection_send_window_ -= message_size;
531 
532   return OkStatus();
533 }
534 
SendResponseComplete(StreamId stream_id,Status response_code)535 Status Connection::Writer::SendResponseComplete(StreamId stream_id,
536                                                 Status response_code) {
537   auto state = connection_.LockState();
538   auto stream = state->LookupStream(stream_id);
539   if (!stream) {
540     return Status::NotFound();
541   }
542 
543   Status status;
544   if (!stream->started_response) {
545     // If the response has not started yet, we need to include the initial
546     // headers.
547     PW_LOG_DEBUG("Conn.SendResponseWithTrailers id=%" PRIu32 " code=%d",
548                  stream_id,
549                  response_code.code());
550     status = state->SendHeaders(stream_id,
551                                 ResponseHeadersPayload(),
552                                 ResponseTrailersPayload(response_code),
553                                 /*end_stream=*/true);
554   } else {
555     PW_LOG_DEBUG("Conn.SendTrailers id=%" PRIu32 " code=%d",
556                  stream_id,
557                  response_code.code());
558     status = state->SendHeaders(stream_id,
559                                 ConstByteSpan(),
560                                 ResponseTrailersPayload(response_code),
561                                 /*end_stream=*/true);
562   }
563 
564   if (!status.ok()) {
565     PW_LOG_WARN("Failed sending response complete on id=%" PRIu32 " error=%d",
566                 stream_id,
567                 status.code());
568     return Status::Unavailable();
569   }
570 
571   PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, stream_id);
572   stream->Reset();
573 
574   return OkStatus();
575 }
576 
CloseStream(Connection::Stream * stream)577 void Connection::Reader::CloseStream(Connection::Stream* stream) {
578   StreamId id = stream->id;
579   PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, id);
580   stream->Reset();
581   callbacks_.OnCancel(id);
582 }
583 
584 // RFC 9113 §3.4
ProcessConnectionPreface()585 Status Connection::Reader::ProcessConnectionPreface() {
586   if (received_connection_preface_) {
587     return OkStatus();
588   }
589 
590   callbacks_.OnNewConnection();
591 
592   // The preface starts with a literal string.
593   auto literal = span{payload_scratch_}.subspan(
594       0, kExpectedConnectionPrefaceLiteral.size());
595 
596   PW_TRY(ReadExactly(connection_.socket_.as_reader(), literal));
597   if (std::memcmp(literal.data(),
598                   kExpectedConnectionPrefaceLiteral.data(),
599                   kExpectedConnectionPrefaceLiteral.size()) != 0) {
600     PW_LOG_ERROR("Invalid connection preface literal");
601     return Status::Internal();
602   }
603 
604   PW_LOG_DEBUG("Conn.Preface received literal");
605 
606   // Client must send a SETTINGS frames.
607   PW_TRY_ASSIGN(auto client_frame,
608                 ReadFrameHeader(connection_.socket_.as_reader()));
609   if (client_frame.type != FrameType::SETTINGS) {
610     PW_LOG_ERROR(
611         "Connection preface missing SETTINGS frame, found frame.type=%d",
612         static_cast<int>(client_frame.type));
613     return Status::Internal();
614   }
615 
616   // Don't send an ACK yet, we'll do that below.
617   PW_TRY(ProcessSettingsFrame(client_frame, /*send_ack=*/false));
618   PW_LOG_DEBUG("Conn.Preface received SETTINGS");
619 
620   // We must send a SETTINGS frame.
621   // RFC 9113 §6.5.2
622   PW_PACKED(struct) Setting {
623     uint16_t id;
624     uint32_t value;
625   };
626   PW_PACKED(struct) SettingsFrame {
627     WireFrameHeader header;
628     Setting settings[2];
629   };
630   SettingsFrame server_frame{
631       .header = WireFrameHeader(FrameHeader{
632           .payload_length = 12,
633           .type = FrameType::SETTINGS,
634           .flags = 0,
635           .stream_id = 0,
636       }),
637       .settings =
638           {
639               {
640                   .id = ToNetworkOrder(SETTINGS_HEADER_TABLE_SIZE),
641                   .value = ToNetworkOrder(kHpackDynamicHeaderTableSize),
642               },
643               {
644                   .id = ToNetworkOrder(SETTINGS_MAX_CONCURRENT_STREAMS),
645                   .value = ToNetworkOrder(kMaxConcurrentStreams),
646               },
647           },
648   };
649   PW_LOG_DEBUG("Conn.Send SETTINGS");
650 
651   {
652     auto state = connection_.LockState();
653     PW_TRY(state->SendBytes(AsBytes(server_frame)));
654 
655     // We must ack the client's SETTINGS frame *after* sending our SETTINGS.
656     PW_TRY(state->SendSettingsAck());
657   }
658 
659   received_connection_preface_ = true;
660   PW_LOG_DEBUG("Conn.Preface complete");
661   return OkStatus();
662 }
663 
664 // RFC 9113 §6.1
ProcessDataFrame(const FrameHeader & frame)665 Status Connection::Reader::ProcessDataFrame(const FrameHeader& frame) {
666   PW_LOG_DEBUG("Conn.Recv DATA id=%" PRIu32 " flags=0x%x len=%" PRIu32,
667                frame.stream_id,
668                frame.flags,
669                frame.payload_length);
670 
671   if (frame.stream_id == 0) {
672     // RFC 9113 §6.1: "If a DATA frame is received whose Stream Identifier field
673     // is 0x00, the recipient MUST respond with a connection error of type
674     // PROTOCOL_ERROR."
675     SendGoAway(Http2Error::PROTOCOL_ERROR);
676     return Status::Internal();
677   }
678 
679   {
680     auto state = connection_.LockState();
681 
682     // From RFC 9113 §6.9: "A receiver that receives a flow-controlled frame
683     // MUST always account for its contribution against the connection
684     // flow-control window, unless the receiver treats this as a connection
685     // error. This is necessary even if the frame is in error. The sender counts
686     // the frame toward the flow-control window, but if the receiver does not,
687     // the flow-control window at the sender and receiver can become different."
688     //
689     // To simplify this, we send WINDOW_UPDATE frames eagerly.
690     //
691     // In the future we should do something less chatty.
692     PW_TRY(state->SendWindowUpdates(frame.stream_id, frame.payload_length));
693 
694     auto stream = state->LookupStream(frame.stream_id);
695     if (!stream) {
696       PW_LOG_DEBUG("Ignoring DATA on closed stream id=%" PRIu32,
697                    frame.stream_id);
698       // Unlock since ProcessIgnoredFrame will try and read all the ignored
699       // data, and also may try and take the lock in the error case.
700       connection_.UnlockState(std::move(state));
701       PW_TRY(ProcessIgnoredFrame(frame));
702       // Stream has been fully closed: silently ignore.
703       return OkStatus();
704     }
705 
706     if (stream->half_closed) {
707       PW_LOG_ERROR("Recv DATA on half-closed stream id=%" PRIu32,
708                    frame.stream_id);
709       // Unlock since ProcessIgnoredFrame will try and read all the ignored
710       // data, and also may try and take the lock in the error case.
711       connection_.UnlockState(std::move(state));
712       PW_TRY(ProcessIgnoredFrame(frame));
713       state = connection_.LockState();
714       stream = state->LookupStream(frame.stream_id);
715       if (stream) {
716         // RFC 9113 §6.1: "If a DATA frame is received whose stream is not in
717         // the "open" or "half-closed (local)" state, the recipient MUST respond
718         // with a stream error of type STREAM_CLOSED."
719         PW_TRY(SendRstStreamAndClose(state, stream, Http2Error::STREAM_CLOSED));
720       }
721       return OkStatus();
722     }
723   }
724 
725   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
726 
727   // Drop padding.
728   if ((frame.flags & FLAGS_PADDED) != 0) {
729     if (payload.size() < 1) {
730       // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
731       // if a frame ... is too small to contain mandatory frame data."
732       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
733       return Status::Internal();
734     }
735 
736     uint32_t pad_length = static_cast<uint32_t>(payload[0]);
737     if (pad_length >= frame.payload_length) {
738       // RFC 9113 §6.1: "If the length of the padding is the length of the frame
739       // payload or greater, the recipient MUST treat this as a connection error
740       // of type PROTOCOL_ERROR."
741       SendGoAway(Http2Error::PROTOCOL_ERROR);
742       return Status::Internal();
743     }
744     payload = payload.subspan(1, payload.size() - pad_length - 1);
745   }
746 
747   auto state = connection_.LockState();
748   Stream* stream = state->LookupStream(frame.stream_id);
749   if (!stream) {
750     return OkStatus();
751   }
752 
753   // Parse repeated grpc Length-Prefix-Message.
754   // https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md#requests
755   while (!payload.empty()) {
756     uint32_t message_length;
757 
758     // If we aren't reassembling a message, read the next length prefix.
759     if (!stream->assembly_buffer) {
760       size_t read = std::min(5 - static_cast<size_t>(stream->prefix_received),
761                              payload.size());
762       std::copy(payload.begin(),
763                 payload.begin() + read,
764                 stream->prefix_buffer.data() + stream->prefix_received);
765       stream->prefix_received += read;
766       payload = payload.subspan(read);
767 
768       // Read the length prefix.
769       if (stream->prefix_received < 5) {
770         continue;
771       }
772       stream->prefix_received = 0;
773 
774       ByteBuilder builder(stream->prefix_buffer);
775       auto it = builder.begin();
776       auto message_compressed = it.ReadUint8();
777       message_length = it.ReadUint32(endian::big);
778       if (message_compressed != 0) {
779         PW_LOG_ERROR("Unsupported: grpc message is compressed");
780         PW_TRY(
781             SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
782         return OkStatus();
783       }
784 
785       if (message_length > payload.size()) {
786         // gRPC message is split across DATA frames, must allocate buffer.
787         if (!state->message_assembly_allocator()) {
788           PW_LOG_ERROR(
789               "Unsupported: split grpc message without allocator provided");
790           PW_TRY(
791               SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
792           return OkStatus();
793         }
794 
795         stream->assembly_buffer = static_cast<std::byte*>(
796             state->message_assembly_allocator()->Allocate(
797                 allocator::Layout(message_length)));
798         if (stream->assembly_buffer == nullptr) {
799           PW_LOG_ERROR("Partial message reassembly buffer allocation failed");
800           PW_TRY(
801               SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
802           return OkStatus();
803         }
804         stream->message_length = message_length;
805         stream->message_received = 0;
806         continue;
807       }
808     }
809 
810     pw::ByteSpan message;
811 
812     // Reading message payload.
813     if (stream->assembly_buffer != nullptr) {
814       uint32_t read =
815           std::min(stream->message_length - stream->message_received,
816                    static_cast<uint32_t>(payload.size()));
817       std::copy(payload.begin(),
818                 payload.begin() + read,
819                 stream->assembly_buffer + stream->message_received);
820       payload = payload.subspan(read);
821       stream->message_received += read;
822       if (stream->message_received < stream->message_length) {
823         continue;
824       }
825       // Fully received message.
826       message = pw::span(stream->assembly_buffer, stream->message_length);
827     } else {
828       message = payload.subspan(0, message_length);
829       payload = payload.subspan(message_length);
830     }
831 
832     // Release state lock before callback, reacquire after.
833     connection_.UnlockState(std::move(state));
834     const auto status = callbacks_.OnMessage(frame.stream_id, message);
835     state = connection_.LockState();
836     stream = state->LookupStream(frame.stream_id);
837     if (!stream) {
838       return OkStatus();
839     }
840 
841     if (!status.ok()) {
842       PW_TRY(SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
843       return OkStatus();
844     }
845 
846     if (stream->assembly_buffer != nullptr) {
847       state->message_assembly_allocator()->Deallocate(stream->assembly_buffer);
848       stream->assembly_buffer = nullptr;
849       stream->message_length = 0;
850       stream->message_received = 0;
851     }
852   }
853 
854   // grpc requires every request stream to end with an empty DATA frame with
855   // FLAGS_END_STREAM. If a client sends FLAGS_END_STREAM with a non-empty
856   // payload, it's not specified how the server should respond. We choose to
857   // accept the payload before ending the stream.
858   // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
859   if ((frame.flags & FLAGS_END_STREAM) != 0) {
860     stream->half_closed = true;
861     connection_.UnlockState(std::move(state));
862     callbacks_.OnHalfClose(frame.stream_id);
863   }
864 
865   return OkStatus();
866 }
867 
868 // RFC 9113 §6.2
ProcessHeadersFrame(const FrameHeader & frame)869 Status Connection::Reader::ProcessHeadersFrame(const FrameHeader& frame) {
870   PW_LOG_DEBUG("Conn.Recv HEADERS id=%" PRIu32 " len=%" PRIu32,
871                frame.stream_id,
872                frame.payload_length);
873 
874   if (frame.stream_id == 0) {
875     // RFC 9113 §6.2: "If a HEADERS frame is received whose Stream Identifier
876     // field is 0x00, the recipient MUST respond with a connection error of type
877     // PROTOCOL_ERROR."
878     SendGoAway(Http2Error::PROTOCOL_ERROR);
879     return Status::Internal();
880   }
881   if (frame.stream_id % 2 != 1 || frame.stream_id <= last_stream_id_) {
882     // RFC 9113 §5.1.1: "Streams initiated by a client MUST use odd-numbered
883     // stream identifiers ... The identifier of a newly established stream MUST
884     // be numerically greater than all streams that the initiating endpoint has
885     // opened ... An endpoint that receives an unexpected stream identifier MUST
886     // respond with a connection error of type PROTOCOL_ERROR."
887     SendGoAway(Http2Error::PROTOCOL_ERROR);
888     return Status::Internal();
889   }
890 
891   last_stream_id_ = frame.stream_id;
892 
893   {
894     auto state = connection_.LockState();
895     if (Stream* stream = state->LookupStream(frame.stream_id);
896         stream != nullptr) {
897       PW_LOG_DEBUG("Client sent HEADERS after the first stream message");
898       // Unlock since ProcessIgnoredFrame will try and read all the ignored
899       // data, and also may try and take the lock in the error case.
900       connection_.UnlockState(std::move(state));
901       PW_TRY(ProcessIgnoredFrame(frame));
902       state = connection_.LockState();
903       stream = state->LookupStream(frame.stream_id);
904       if (stream) {
905         // grpc requests cannot contain trailers.
906         // See:
907         // https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
908         PW_TRY(
909             SendRstStreamAndClose(state, stream, Http2Error::PROTOCOL_ERROR));
910       }
911       return OkStatus();
912     }
913   }
914 
915   if ((frame.flags & FLAGS_END_STREAM) != 0) {
916     PW_LOG_DEBUG("Client sent HEADERS with END_STREAM");
917     PW_TRY(ProcessIgnoredFrame(frame));
918     // grpc requests must send END_STREAM in an empty DATA frame.
919     // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
920     auto state = connection_.LockState();
921     PW_TRY(state->SendRstStream(frame.stream_id, Http2Error::PROTOCOL_ERROR));
922     return OkStatus();
923   }
924   if ((frame.flags & FLAGS_END_HEADERS) == 0) {
925     PW_LOG_ERROR("Client sent HEADERS frame without END_HEADERS: unsupported");
926     SendGoAway(Http2Error::INTERNAL_ERROR);
927     return Status::Internal();
928   }
929 
930   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
931 
932   // Drop padding.
933   if ((frame.flags & FLAGS_PADDED) != 0) {
934     if (payload.size() < 1) {
935       // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
936       // if a frame ... is too small to contain mandatory frame data. A frame
937       // size error in a frame that could alter the state of the entire
938       // connection MUST be treated as a connection error"
939       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
940       return Status::Internal();
941     }
942     uint32_t pad_length = static_cast<uint32_t>(payload[0]);
943     if (pad_length >= frame.payload_length) {
944       // RFC 9113 §6.2: "If the length of the padding is the length of the frame
945       // payload or greater, the recipient MUST treat this as a connection error
946       // of type PROTOCOL_ERROR."
947       SendGoAway(Http2Error::PROTOCOL_ERROR);
948       return Status::Internal();
949     }
950     payload = payload.subspan(1, payload.size() - pad_length - 1);
951   }
952 
953   // Drop priority fields.
954   if ((frame.flags & FLAGS_PRIORITY) != 0) {
955     if (payload.size() < 5) {
956       // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
957       // if a frame ... is too small to contain mandatory frame data."
958       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
959       return Status::Internal();
960     }
961     payload = payload.subspan(5);
962   }
963 
964   PW_TRY_ASSIGN(auto method_name, HpackParseRequestHeaders(payload));
965   {
966     auto state = connection_.LockState();
967     if (!state->CreateStream(frame.stream_id, initial_send_window_).ok()) {
968       PW_LOG_WARN("Too many streams, rejecting id=%" PRIu32, frame.stream_id);
969       return state->SendRstStream(frame.stream_id, Http2Error::REFUSED_STREAM);
970     }
971   }
972 
973   if (const auto status = callbacks_.OnNew(frame.stream_id, method_name);
974       !status.ok()) {
975     auto state = connection_.LockState();
976     if (Stream* stream = state->LookupStream(frame.stream_id);
977         stream != nullptr) {
978       return SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR);
979     }
980   }
981 
982   return OkStatus();
983 }
984 
AddConnectionSendWindow(int32_t delta)985 Status Connection::SharedState::AddConnectionSendWindow(int32_t delta) {
986   if (PW_ADD_OVERFLOW(
987           connection_send_window_, delta, &connection_send_window_)) {
988     return Status::Internal();
989   }
990 
991   DrainResponseQueues().IgnoreError();
992 
993   return OkStatus();
994 }
995 
AddAllStreamsSendWindow(int32_t delta)996 Status Connection::SharedState::AddAllStreamsSendWindow(int32_t delta) {
997   for (size_t i = 0; i < streams_.size(); i++) {
998     if (streams_[i].id == 0) {
999       continue;
1000     }
1001     if (PW_ADD_OVERFLOW(
1002             streams_[i].send_window, delta, &streams_[i].send_window)) {
1003       return Status::Internal();
1004     }
1005   }
1006 
1007   DrainResponseQueues().IgnoreError();
1008 
1009   return OkStatus();
1010 }
1011 
AddStreamSendWindow(StreamId id,int32_t delta)1012 Status Connection::SharedState::AddStreamSendWindow(StreamId id,
1013                                                     int32_t delta) {
1014   Stream* stream = LookupStream(id);
1015   if (!stream) {
1016     return Status::NotFound();
1017   }
1018 
1019   if (PW_ADD_OVERFLOW(stream->send_window, delta, &stream->send_window)) {
1020     return Status::Internal();
1021   }
1022 
1023   DrainResponseQueues().IgnoreError();
1024 
1025   return OkStatus();
1026 }
1027 
1028 // RFC 9113 §6.4
ProcessRstStreamFrame(const FrameHeader & frame)1029 Status Connection::Reader::ProcessRstStreamFrame(const FrameHeader& frame) {
1030   PW_LOG_DEBUG("Conn.Recv RST_STREAM id=%" PRIu32 " len=%" PRIu32,
1031                frame.stream_id,
1032                frame.payload_length);
1033 
1034   if (frame.stream_id == 0) {
1035     // RFC 9113 §6.4: "If a RST_STREAM frame is received with a stream
1036     // identifier of 0x00, the recipient MUST treat this as a connection error
1037     // of type PROTOCOL_ERROR".
1038     SendGoAway(Http2Error::PROTOCOL_ERROR);
1039     return Status::Internal();
1040   }
1041   if (frame.stream_id > last_stream_id_) {
1042     // RFC 9113 §6.4: "If a RST_STREAM frame identifying an idle stream is
1043     // received, the recipient MUST treat this as a connection error of type
1044     // PROTOCOL_ERROR."
1045     SendGoAway(Http2Error::PROTOCOL_ERROR);
1046     return Status::Internal();
1047   }
1048   if (frame.payload_length != 4) {
1049     // RFC 9113 §6.4: "A RST_STREAM frame with a length other than 4 octets MUST
1050     // be treated as a connection error of type FRAME_SIZE_ERROR."
1051     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1052     return Status::Internal();
1053   }
1054 
1055   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1056   ByteBuilder builder(payload);
1057   auto error_code = builder.begin().ReadUint32(endian::big);
1058 
1059   PW_LOG_DEBUG("Conn.RstStream id=%" PRIu32 " error=%" PRIu32,
1060                frame.stream_id,
1061                error_code);
1062   auto state = connection_.LockState();
1063   if (Stream* stream = state->LookupStream(frame.stream_id);
1064       stream != nullptr) {
1065     CloseStream(stream);
1066   }
1067   return OkStatus();
1068 }
1069 
1070 // RFC 9113 §6.5
ProcessSettingsFrame(const FrameHeader & frame,bool send_ack)1071 Status Connection::Reader::ProcessSettingsFrame(const FrameHeader& frame,
1072                                                 bool send_ack) {
1073   PW_LOG_DEBUG("Conn.Recv SETTINGS len=%" PRIu32 " flags=0x%x",
1074                frame.payload_length,
1075                frame.flags);
1076 
1077   if ((frame.flags & FLAGS_ACK) != 0) {
1078     // RFC 9113 §6.5: "Receipt of a SETTINGS frame with the ACK flag set and a
1079     // length field value other than 0 MUST be treated as a connection error of
1080     // type FRAME_SIZE_ERROR."
1081     if (frame.payload_length != 0) {
1082       PW_LOG_ERROR("Invalid SETTINGS frame: has ACK with non-empty payload");
1083       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1084       return Status::Internal();
1085     }
1086     // Don't ACK an ACK.
1087     send_ack = false;
1088   } else {
1089     // RFC 9113 §6.5: "A SETTINGS frame with a length other than a multiple of 6
1090     // octets MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1091     if (frame.payload_length % 6 != 0) {
1092       PW_LOG_ERROR("Invalid SETTINGS frame: payload size invalid");
1093       SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1094       return Status::Internal();
1095     }
1096   }
1097 
1098   if (frame.stream_id != 0) {
1099     // RFC 9113 §6.5: "If an endpoint receives a SETTINGS frame whose Stream
1100     // Identifier field is anything other than 0x00, the endpoint MUST respond
1101     // with a connection error of type PROTOCOL_ERROR."
1102     SendGoAway(Http2Error::PROTOCOL_ERROR);
1103     return Status::Internal();
1104   }
1105 
1106   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1107 
1108   // RFC 9113 §6.5.2
1109   ByteBuilder builder(payload);
1110   for (auto it = builder.begin(); it != builder.end();) {
1111     auto id = it.ReadUint16(endian::big);
1112     auto value = it.ReadUint32(endian::big);
1113     PW_LOG_DEBUG("Applying SETTING id=%" PRIu16 " value=%" PRIu32, id, value);
1114     switch (id) {
1115       case SETTINGS_INITIAL_WINDOW_SIZE: {
1116         // RFC 9113 §6.5.2: "Values above the maximum flow-control window size
1117         // of 2^31-1 MUST be treated as a connection error of type
1118         // FLOW_CONTROL_ERROR."
1119         if ((value & (1 << 31)) != 0) {
1120           SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1121           return Status::Internal();
1122         }
1123         // RFC 9113 §6.9.2: "When the value of SETTINGS_INITIAL_WINDOW_SIZE
1124         // changes, a receiver MUST adjust the size of all stream flow-control
1125         // windows that it maintains by the difference between the new value and
1126         // the old value."
1127         int32_t newval = static_cast<int32_t>(value);
1128         int32_t delta = newval - initial_send_window_;
1129         auto state = connection_.LockState();
1130         if (const auto status = state->AddAllStreamsSendWindow(delta);
1131             !status.ok()) {
1132           SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1133           return Status::Internal();
1134         }
1135         initial_send_window_ = newval;
1136         break;
1137       }
1138       case SETTINGS_MAX_FRAME_SIZE:
1139         // RFC 9113 §6.5.2: "Values outside this range MUST be treated as a
1140         // connection error of type PROTOCOL_ERROR".
1141         if (value < 16384 || 16777215 < value) {
1142           SendGoAway(Http2Error::PROTOCOL_ERROR);
1143           return Status::Internal();
1144         }
1145         // We never send frame payloads larger than 16384, so we don't need to
1146         // track the client's preference.
1147         break;
1148       // Ignore these.
1149       // SETTINGS_HEADER_TABLE_SIZE: our responses don't use the dynamic table
1150       // SETTINGS_ENABLE_PUSH: we don't support push
1151       // SETTINGS_MAX_CONCURRENT_STREAMS: we don't support push
1152       // SETTINGS_MAX_HEADER_LIST_SIZE: we send very tiny response HEADERS
1153       default:
1154         break;
1155     }
1156   }
1157 
1158   if (send_ack) {
1159     auto state = connection_.LockState();
1160     PW_TRY(state->SendSettingsAck());
1161   }
1162 
1163   return OkStatus();
1164 }
1165 
1166 // RFC 9113 §6.7
ProcessPingFrame(const FrameHeader & frame)1167 Status Connection::Reader::ProcessPingFrame(const FrameHeader& frame) {
1168   PW_LOG_DEBUG("Conn.Recv PING len=%" PRIu32, frame.payload_length);
1169 
1170   if (frame.stream_id != 0) {
1171     // RFC 9113 §6.7: "If a PING frame is received with a Stream Identifier
1172     // field value other than 0x00, the recipient MUST respond with a connection
1173     // error of type PROTOCOL_ERROR."
1174     SendGoAway(Http2Error::PROTOCOL_ERROR);
1175     return Status::Internal();
1176   }
1177   if (frame.payload_length != 8) {
1178     // RFC 9113 §6.7: "Receipt of a PING frame with a length field value other
1179     // than 8 MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1180     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1181     return Status::Internal();
1182   }
1183 
1184   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1185 
1186   // Don't ACK an ACK.
1187   if ((frame.flags & FLAGS_ACK) != 0) {
1188     return OkStatus();
1189   }
1190 
1191   // Send an ACK.
1192   PW_PACKED(struct) PingFrame {
1193     WireFrameHeader header;
1194     uint64_t opaque_data;
1195   };
1196   ByteBuilder builder(payload);
1197   PingFrame ack_frame = {
1198       .header = WireFrameHeader(FrameHeader{
1199           .payload_length = 8,
1200           .type = FrameType::PING,
1201           .flags = FLAGS_ACK,
1202           .stream_id = 0,
1203       }),
1204       // Since we're going to echo this, read as native endian so it gets echoed
1205       // exactly as-is.
1206       .opaque_data = builder.begin().ReadUint64(endian::native),
1207   };
1208 
1209   {
1210     auto state = connection_.LockState();
1211     PW_TRY(state->SendBytes(AsBytes(ack_frame)));
1212   }
1213 
1214   return OkStatus();
1215 }
1216 
1217 // RFC 9113 §6.9
ProcessWindowUpdateFrame(const FrameHeader & frame)1218 Status Connection::Reader::ProcessWindowUpdateFrame(const FrameHeader& frame) {
1219   PW_LOG_DEBUG("Conn.Recv WINDOW_UPDATE id=%" PRIu32 " len=%" PRIu32,
1220                frame.stream_id,
1221                frame.payload_length);
1222 
1223   if (frame.payload_length != 4) {
1224     // RFC 9113 §6.9: "A WINDOW_UPDATE frame with a length other than 4 octets
1225     // MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1226     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1227     return Status::Internal();
1228   }
1229 
1230   // Read window size increment.
1231   PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1232   ByteBuilder builder(payload);
1233   int32_t delta = static_cast<int32_t>(builder.begin().ReadUint32(endian::big) &
1234                                        0x7fffffff);
1235 
1236   auto state = connection_.LockState();
1237   auto stream = state->LookupStream(frame.stream_id);
1238 
1239   if (delta == 0) {
1240     // RFC 9113 §6.9: "A receiver MUST treat a WINDOW_UPDATE frame with an
1241     // increment of 0 as a stream error of type PROTOCOL_ERROR; errors on the
1242     // connection flow-control window MUST be treated as a connection error."
1243     if (frame.stream_id == 0) {
1244       SendGoAway(Http2Error::PROTOCOL_ERROR);
1245       return Status::Internal();
1246     } else {
1247       if (!stream) {
1248         // Already closed
1249         return OkStatus();
1250       }
1251       return SendRstStreamAndClose(state, stream, Http2Error::PROTOCOL_ERROR);
1252     }
1253   }
1254 
1255   // RFC 9113 §6.9.1: "If a sender receives a WINDOW_UPDATE that causes a
1256   // flow-control window to exceed 2^31-1 bytes, it MUST terminate either the
1257   // stream or the connection, as appropriate ... with an error code of
1258   // FLOW_CONTROL_ERROR"
1259   if (frame.stream_id == 0) {
1260     if (const auto status = state->AddConnectionSendWindow(delta);
1261         !status.ok()) {
1262       SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1263       return Status::Internal();
1264     }
1265   } else {
1266     if (!stream) {
1267       // Already closed
1268       return OkStatus();
1269     }
1270     if (const auto status = state->AddStreamSendWindow(stream->id, delta);
1271         !status.ok()) {
1272       return SendRstStreamAndClose(
1273           state, stream, Http2Error::FLOW_CONTROL_ERROR);
1274     }
1275   }
1276 
1277   return OkStatus();
1278 }
1279 
1280 // Advance past the payload.
ProcessIgnoredFrame(const FrameHeader & frame)1281 Status Connection::Reader::ProcessIgnoredFrame(const FrameHeader& frame) {
1282   size_t to_read = frame.payload_length;
1283   while (to_read > 0) {
1284     auto chunk = span{payload_scratch_}.subspan(
1285         0, std::min(payload_scratch_.size(), to_read));
1286     PW_TRY(ReadExactly(connection_.socket_.as_reader(), chunk));
1287     to_read -= chunk.size();
1288   }
1289   return OkStatus();
1290 }
1291 
ReadFramePayload(const FrameHeader & frame)1292 Result<ByteSpan> Connection::Reader::ReadFramePayload(
1293     const FrameHeader& frame) {
1294   if (frame.payload_length == 0) {
1295     return ByteSpan();
1296   }
1297   if (frame.payload_length > payload_scratch_.size()) {
1298     PW_LOG_ERROR("Frame type=%d payload too large: %" PRIu32 " > %" PRIu32,
1299                  static_cast<int>(frame.type),
1300                  frame.payload_length,
1301                  static_cast<uint32_t>(payload_scratch_.size()));
1302     SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1303     return Status::Internal();
1304   }
1305   auto payload = span{payload_scratch_}.subspan(0, frame.payload_length);
1306   PW_TRY(ReadExactly(connection_.socket_.as_reader(), payload));
1307   return payload;
1308 }
1309 
1310 // RFC 9113 §6.8
SendGoAway(Http2Error code)1311 void Connection::Reader::SendGoAway(Http2Error code) {
1312   if (!received_connection_preface_) {
1313     // RFC 9113 §3.4: "A GOAWAY frame MAY be omitted in this case, since an
1314     // invalid preface indicates that the peer is not using HTTP/2."
1315     return;
1316   }
1317 
1318   PW_PACKED(struct) GoAwayFrame {
1319     WireFrameHeader header;
1320     uint32_t last_stream_id;
1321     uint32_t error_code;
1322   };
1323   GoAwayFrame frame{
1324       .header = WireFrameHeader(FrameHeader{
1325           .payload_length = 8,
1326           .type = FrameType::GOAWAY,
1327           .flags = 0,
1328           .stream_id = 0,
1329       }),
1330       .last_stream_id = ToNetworkOrder(last_stream_id_),
1331       .error_code = ToNetworkOrder(code),
1332   };
1333 
1334   {
1335     auto state = connection_.LockState();
1336     // Close all open streams.
1337     state->ForAllStreams([this](Stream* stream) { CloseStream(stream); });
1338     // Ignore errors since we're about to close the connection anyway.
1339     state->SendBytes(AsBytes(frame)).IgnoreError();
1340   }
1341 }
1342 
1343 // RFC 9113 §6.4
SendRstStreamAndClose(sync::BorrowedPointer<SharedState> & state,Stream * stream,Http2Error code)1344 Status Connection::Reader::SendRstStreamAndClose(
1345     sync::BorrowedPointer<SharedState>& state,
1346     Stream* stream,
1347     Http2Error code) {
1348   state->SendRstStream(stream->id, code).IgnoreError();
1349   CloseStream(stream);
1350   return OkStatus();
1351 }
1352 
1353 }  // namespace pw::grpc
1354