1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_grpc/connection.h"
16
17 #include <cinttypes>
18 #include <cstring>
19 #include <string_view>
20 #include <type_traits>
21
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_grpc_private/hpack.h"
25 #include "pw_log/log.h"
26 #include "pw_preprocessor/compiler.h"
27 #include "pw_status/try.h"
28
29 namespace pw::grpc {
30 namespace internal {
31
32 // RFC 9113 §6
33 // Enum names left in naming style of RFC
34 enum class FrameType : uint8_t {
35 DATA = 0x00,
36 HEADERS = 0x01,
37 PRIORITY = 0x02,
38 RST_STREAM = 0x03,
39 SETTINGS = 0x04,
40 PUSH_PROMISE = 0x05,
41 PING = 0x06,
42 GOAWAY = 0x07,
43 WINDOW_UPDATE = 0x08,
44 CONTINUATION = 0x09,
45 };
46
47 // RFC 9113 §4.1
48 constexpr size_t kFrameHeaderEncodedSize = 9;
49 struct FrameHeader {
50 uint32_t payload_length;
51 FrameType type;
52 uint8_t flags;
53 StreamId stream_id;
54 };
55
56 // RFC 9113 §7
57 // Enum names left in naming style of RFC
58 enum class Http2Error : uint32_t {
59 NO_ERROR = 0x00,
60 PROTOCOL_ERROR = 0x01,
61 INTERNAL_ERROR = 0x02,
62 FLOW_CONTROL_ERROR = 0x03,
63 SETTINGS_TIMEOUT = 0x04,
64 STREAM_CLOSED = 0x05,
65 FRAME_SIZE_ERROR = 0x06,
66 REFUSED_STREAM = 0x07,
67 CANCEL = 0x08,
68 COMPRESSION_ERROR = 0x09,
69 CONNECT_ERROR = 0x0a,
70 ENHANCE_YOUR_CALM = 0x0b,
71 INADEQUATE_SECURITY = 0x0c,
72 HTTP_1_1_REQUIRED = 0x0d,
73 };
74
75 } // namespace internal
76
77 namespace {
78
79 using internal::FrameHeader;
80 using internal::FrameType;
81 using internal::Http2Error;
82 using internal::kMaxConcurrentStreams;
83 using internal::kMaxGrpcMessageSize;
84
85 // RFC 9113 §3.4
86 constexpr std::string_view kExpectedConnectionPrefaceLiteral(
87 "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
88
89 static_assert(kMaxMethodNameSize == kHpackMaxStringSize);
90
91 constexpr size_t kLengthPrefixedMessageHdrSize = 5;
92
93 enum {
94 FLAGS_ACK = 0x01,
95 FLAGS_END_STREAM = 0x01,
96 FLAGS_END_HEADERS = 0x04,
97 FLAGS_PADDED = 0x08,
98 FLAGS_PRIORITY = 0x20,
99 };
100
101 // RFC 9113 §6.5.2
102 enum SettingType : uint16_t {
103 SETTINGS_HEADER_TABLE_SIZE = 0x01,
104 SETTINGS_ENABLE_PUSH = 0x02,
105 SETTINGS_MAX_CONCURRENT_STREAMS = 0x03,
106 SETTINGS_INITIAL_WINDOW_SIZE = 0x04,
107 SETTINGS_MAX_FRAME_SIZE = 0x05,
108 SETTINGS_MAX_HEADER_LIST_SIZE = 0x06,
109 };
110
ReadExactly(stream::Reader & reader,ByteSpan buffer)111 Status ReadExactly(stream::Reader& reader, ByteSpan buffer) {
112 size_t bytes_read = 0;
113 while (bytes_read < buffer.size()) {
114 PW_TRY_ASSIGN(auto out, reader.Read(buffer.subspan(bytes_read)));
115 bytes_read += out.size();
116 }
117 return OkStatus();
118 }
119
ReadFrameHeader(stream::Reader & reader)120 Result<FrameHeader> ReadFrameHeader(stream::Reader& reader) {
121 std::array<std::byte, internal::kFrameHeaderEncodedSize> buffer;
122 PW_TRY(ReadExactly(reader, buffer));
123
124 // RFC 9113 §4.1
125 FrameHeader out;
126 ByteBuilder builder(as_writable_bytes(span{buffer}));
127 auto it = builder.begin();
128 auto type_and_length = it.ReadUint32(endian::big);
129 out.payload_length = type_and_length >> 8;
130 out.type = static_cast<FrameType>(type_and_length & 0xff);
131 out.flags = it.ReadUint8();
132 out.stream_id = it.ReadUint32(endian::big) & 0x7fffffff;
133 return out;
134 }
135
136 template <typename T, std::enable_if_t<std::is_integral_v<T>, bool> = true>
ToNetworkOrder(T value)137 constexpr T ToNetworkOrder(T value) {
138 return bytes::ConvertOrder(/*from=*/endian::native,
139 /*to=*/endian::big,
140 value);
141 }
142
143 template <typename T, std::enable_if_t<std::is_enum_v<T>, bool> = true>
ToNetworkOrder(T value)144 constexpr std::underlying_type_t<T> ToNetworkOrder(T value) {
145 return ToNetworkOrder(static_cast<std::underlying_type_t<T>>(value));
146 }
147
148 // Use this instead of FrameHeader when writing frames.
PW_PACKED(struct)149 PW_PACKED(struct) WireFrameHeader {
150 WireFrameHeader(FrameHeader h)
151 : payload_length_and_type(ToNetworkOrder(h.payload_length << 8 |
152 static_cast<uint32_t>(h.type))),
153 flags(h.flags),
154 stream_id(ToNetworkOrder(h.stream_id)) {}
155
156 uint32_t payload_length_and_type;
157 uint8_t flags;
158 uint32_t stream_id;
159 };
160
161 template <typename T>
AsBytes(T & object)162 ConstByteSpan AsBytes(T& object) {
163 return as_bytes(span<T, 1>{&object, 1});
164 }
165
166 } // namespace
167
Connection(stream::ReaderWriter & socket,SendQueue & send_queue,RequestCallbacks & callbacks,allocator::Allocator * message_assembly_allocator,multibuf::MultiBufAllocator & multibuf_allocator)168 Connection::Connection(stream::ReaderWriter& socket,
169 SendQueue& send_queue,
170 RequestCallbacks& callbacks,
171 allocator::Allocator* message_assembly_allocator,
172 multibuf::MultiBufAllocator& multibuf_allocator)
173 : socket_(socket),
174 shared_state_(std::in_place,
175 message_assembly_allocator,
176 multibuf_allocator,
177 send_queue),
178 reader_(*this, callbacks),
179 writer_(*this) {}
180
ProcessFrame()181 Status Connection::Reader::ProcessFrame() {
182 if (!received_connection_preface_) {
183 return Status::FailedPrecondition();
184 }
185
186 PW_TRY_ASSIGN(auto frame, ReadFrameHeader(connection_.socket_.as_reader()));
187 switch (frame.type) {
188 // Frames that we handle.
189 case FrameType::DATA:
190 PW_TRY(ProcessDataFrame(frame));
191 break;
192 case FrameType::HEADERS:
193 PW_TRY(ProcessHeadersFrame(frame));
194 break;
195 case FrameType::PRIORITY:
196 PW_TRY(ProcessIgnoredFrame(frame));
197 break;
198 case FrameType::RST_STREAM:
199 PW_TRY(ProcessRstStreamFrame(frame));
200 break;
201 case FrameType::SETTINGS:
202 PW_TRY(ProcessSettingsFrame(frame, /*send_ack=*/true));
203 break;
204 case FrameType::PING:
205 PW_TRY(ProcessPingFrame(frame));
206 break;
207 case FrameType::WINDOW_UPDATE:
208 PW_TRY(ProcessWindowUpdateFrame(frame));
209 break;
210
211 // Frames that trigger an immediate connection close.
212 case FrameType::GOAWAY:
213 PW_LOG_ERROR("Client sent GOAWAY");
214 // don't bother sending GOAWAY in response
215 return Status::Internal();
216 case FrameType::PUSH_PROMISE:
217 PW_LOG_ERROR("Client sent PUSH_PROMISE");
218 SendGoAway(Http2Error::PROTOCOL_ERROR);
219 return Status::Internal();
220 case FrameType::CONTINUATION:
221 PW_LOG_ERROR("Client sent CONTINUATION: unsupported");
222 SendGoAway(Http2Error::INTERNAL_ERROR);
223 return Status::Internal();
224 }
225
226 return OkStatus();
227 }
228
CreateStream(StreamId id,int32_t initial_send_window)229 pw::Status Connection::SharedState::CreateStream(StreamId id,
230 int32_t initial_send_window) {
231 for (size_t i = 0; i < streams_.size(); i++) {
232 if (streams_[i].id != 0) {
233 continue;
234 }
235 PW_LOG_DEBUG("Conn.CreateStream id=%" PRIu32 " at slot=%" PRIu32,
236 id,
237 static_cast<uint32_t>(i));
238 streams_[i].id = id;
239 streams_[i].half_closed = false;
240 streams_[i].started_response = false;
241 streams_[i].send_window = initial_send_window;
242 return OkStatus();
243 }
244 PW_LOG_WARN("Conn.CreateStream id=%" PRIu32 " OUT OF SPACE", id);
245 return Status::ResourceExhausted();
246 }
247
ForAllStreams(Function<void (Stream *)> && callback)248 void Connection::SharedState::ForAllStreams(
249 Function<void(Stream*)>&& callback) {
250 for (size_t i = 0; i < streams_.size(); i++) {
251 if (streams_[i].id != 0) {
252 callback(&streams_[i]);
253 }
254 }
255 }
256
LookupStream(StreamId id)257 Connection::Stream* Connection::SharedState::LookupStream(StreamId id) {
258 for (size_t i = 0; i < streams_.size(); i++) {
259 if (streams_[i].id == id) {
260 return &streams_[i];
261 }
262 }
263 return nullptr;
264 }
265
DrainResponseQueue(Stream & stream)266 Status Connection::SharedState::DrainResponseQueue(Stream& stream) {
267 while (stream.response_queue.size() > 0) {
268 multibuf::MultiBufChunks& chunks = stream.response_queue.Chunks();
269
270 size_t message_size = chunks.front().size();
271
272 if (static_cast<int32_t>(message_size) > stream.send_window ||
273 static_cast<int32_t>(message_size) > connection_send_window_) {
274 break;
275 }
276
277 PW_TRY(SendQueued(stream, stream.response_queue.TakeFrontChunk()));
278 }
279 return OkStatus();
280 }
281
DrainResponseQueues()282 Status Connection::SharedState::DrainResponseQueues() {
283 for (Stream& stream : streams_) {
284 PW_TRY(DrainResponseQueue(stream));
285 }
286 return OkStatus();
287 }
288
SendBytes(ConstByteSpan message)289 Status Connection::SharedState::SendBytes(ConstByteSpan message) {
290 std::optional<multibuf::MultiBuf> buffer =
291 multibuf_allocator_.AllocateContiguous(message.size());
292 if (!buffer.has_value()) {
293 return Status::ResourceExhausted();
294 }
295 PW_TRY(buffer->CopyFrom(message, 0));
296 send_queue_.QueueSend(std::move(*buffer));
297 return OkStatus();
298 }
299
300 // RFC 9113 §6.1
301 //
302 // multibuf chunk should have already been allocated with enough prefix space
303 // for headers.
SendData(StreamId stream_id,multibuf::OwnedChunk && chunk)304 Status Connection::SharedState::SendData(StreamId stream_id,
305 multibuf::OwnedChunk&& chunk) {
306 size_t message_size = chunk.size();
307 PW_LOG_DEBUG("Conn.Send DATA with id=%" PRIu32 " len=%" PRIu32,
308 stream_id,
309 static_cast<uint32_t>(message_size));
310
311 // Write a Length-Prefixed-Message payload.
312 if (!chunk->ClaimPrefix(kLengthPrefixedMessageHdrSize)) {
313 return Status::ResourceExhausted();
314 }
315
316 ByteBuilder prefix(chunk);
317 prefix.PutUint8(0);
318 prefix.PutUint32(message_size, endian::big);
319
320 // Write FrameHeader
321 if (!chunk->ClaimPrefix(sizeof(WireFrameHeader))) {
322 return Status::ResourceExhausted();
323 }
324
325 WireFrameHeader frame(FrameHeader{
326 .payload_length =
327 static_cast<uint32_t>(message_size + kLengthPrefixedMessageHdrSize),
328 .type = FrameType::DATA,
329 .flags = 0,
330 .stream_id = stream_id,
331 });
332 ConstByteSpan frame_span = AsBytes(frame);
333 std::copy(frame_span.begin(), frame_span.end(), chunk->begin());
334
335 auto buffer = multibuf::MultiBuf::FromChunk(std::move(chunk));
336 send_queue_.QueueSend(std::move(buffer));
337 return OkStatus();
338 }
339
340 // RFC 9113 §6.2
SendHeaders(StreamId stream_id,ConstByteSpan payload1,ConstByteSpan payload2,bool end_stream)341 Status Connection::SharedState::SendHeaders(StreamId stream_id,
342 ConstByteSpan payload1,
343 ConstByteSpan payload2,
344 bool end_stream) {
345 PW_LOG_DEBUG("Conn.Send HEADERS with id=%" PRIu32 " len1=%" PRIu32
346 " len2=%" PRIu32 " end=%d",
347 stream_id,
348 static_cast<uint32_t>(payload1.size()),
349 static_cast<uint32_t>(payload2.size()),
350 end_stream);
351 WireFrameHeader frame(FrameHeader{
352 .payload_length =
353 static_cast<uint32_t>(payload1.size() + payload2.size()),
354 .type = FrameType::HEADERS,
355 .flags = FLAGS_END_HEADERS,
356 .stream_id = stream_id,
357 });
358
359 if (end_stream) {
360 frame.flags |= FLAGS_END_STREAM;
361 }
362
363 ConstByteSpan frame_span = AsBytes(frame);
364 std::optional<multibuf::MultiBuf> buffer =
365 multibuf_allocator_.AllocateContiguous(frame_span.size() +
366 payload1.size() + payload2.size());
367 if (!buffer.has_value()) {
368 return Status::ResourceExhausted();
369 }
370
371 size_t offset = 0;
372 PW_TRY(buffer->CopyFrom(frame_span, offset));
373 offset += frame_span.size();
374
375 if (!payload1.empty()) {
376 PW_TRY(buffer->CopyFrom(payload1, offset));
377 offset += payload1.size();
378 }
379 if (!payload2.empty()) {
380 PW_TRY(buffer->CopyFrom(payload2, offset));
381 offset += payload2.size();
382 }
383
384 send_queue_.QueueSend(std::move(*buffer));
385 return OkStatus();
386 }
387
388 // RFC 9113 §6.4
SendRstStream(StreamId stream_id,Http2Error code)389 Status Connection::SharedState::SendRstStream(StreamId stream_id,
390 Http2Error code) {
391 PW_PACKED(struct) RstStreamFrame {
392 WireFrameHeader header;
393 uint32_t error_code;
394 };
395 RstStreamFrame frame{
396 .header = WireFrameHeader(FrameHeader{
397 .payload_length = 4,
398 .type = FrameType::RST_STREAM,
399 .flags = 0,
400 .stream_id = stream_id,
401 }),
402 .error_code = ToNetworkOrder(code),
403 };
404 return SendBytes(AsBytes(frame));
405 }
406
407 // RFC 9113 §6.9
SendWindowUpdates(StreamId stream_id,uint32_t increment)408 Status Connection::SharedState::SendWindowUpdates(StreamId stream_id,
409 uint32_t increment) {
410 // It is illegal to send updates with increment=0.
411 if (increment == 0) {
412 return OkStatus();
413 }
414 if (increment & 0x80000000) {
415 // Upper bit is reserved, error.
416 return Status::InvalidArgument();
417 }
418
419 PW_LOG_DEBUG("Conn.Send WINDOW_UPDATE frames with id=%" PRIu32
420 " increment=%" PRIu32,
421 stream_id,
422 increment);
423
424 PW_PACKED(struct) WindowUpdateFrame {
425 WireFrameHeader header;
426 uint32_t increment;
427 };
428 WindowUpdateFrame frames[2] = {
429 {
430 .header = WireFrameHeader(FrameHeader{
431 .payload_length = 4,
432 .type = FrameType::WINDOW_UPDATE,
433 .flags = 0,
434 .stream_id = 0,
435 }),
436 .increment = ToNetworkOrder(increment),
437 },
438 {
439 .header = WireFrameHeader(FrameHeader{
440 .payload_length = 4,
441 .type = FrameType::WINDOW_UPDATE,
442 .flags = 0,
443 .stream_id = stream_id,
444 }),
445 .increment = ToNetworkOrder(increment),
446 },
447 };
448 return SendBytes(as_bytes(span{frames}));
449 }
450
451 // RFC 9113 §6.5
SendSettingsAck()452 Status Connection::SharedState::SendSettingsAck() {
453 PW_LOG_DEBUG("Conn.Send SETTINGS ACK");
454 WireFrameHeader frame(FrameHeader{
455 .payload_length = 0,
456 .type = FrameType::SETTINGS,
457 .flags = FLAGS_ACK,
458 .stream_id = 0,
459 });
460 return SendBytes(AsBytes(frame));
461 }
462
SendResponseMessage(StreamId stream_id,ConstByteSpan message)463 Status Connection::Writer::SendResponseMessage(StreamId stream_id,
464 ConstByteSpan message) {
465 auto state = connection_.LockState();
466
467 if (message.size() > kMaxGrpcMessageSize) {
468 PW_LOG_WARN("Message %" PRIu32 " bytes on id=%" PRIu32
469 " exceeds maximum message size",
470 static_cast<uint32_t>(message.size()),
471 stream_id);
472 return Status::InvalidArgument();
473 }
474
475 // Create contiguous buffer big enough to hold the response message plus
476 // headers.
477 std::optional<multibuf::MultiBuf> buffer =
478 state->multibuf_allocator().AllocateContiguous(
479 message.size() + kLengthPrefixedMessageHdrSize +
480 sizeof(WireFrameHeader));
481
482 if (!buffer.has_value()) {
483 return Status::ResourceExhausted();
484 }
485
486 // Before copying message in, move internal offset forward past header region.
487 buffer->DiscardPrefix(kLengthPrefixedMessageHdrSize +
488 sizeof(WireFrameHeader));
489 PW_TRY(buffer->CopyFrom(message, 0));
490
491 return state->QueueStreamResponse(stream_id, std::move(*buffer));
492 }
493
QueueStreamResponse(StreamId id,multibuf::MultiBuf && buffer)494 Status Connection::SharedState::QueueStreamResponse(
495 StreamId id, multibuf::MultiBuf&& buffer) {
496 auto stream = LookupStream(id);
497 if (!stream) {
498 return Status::NotFound();
499 }
500 stream->response_queue.PushSuffix(std::move(buffer));
501 // Try and send if we have window
502 return DrainResponseQueues();
503 }
504
SendQueued(Connection::Stream & stream,multibuf::OwnedChunk && chunk)505 Status Connection::SharedState::SendQueued(Connection::Stream& stream,
506 multibuf::OwnedChunk&& chunk) {
507 size_t message_size = chunk.size();
508
509 auto status = OkStatus();
510 if (!stream.started_response) {
511 stream.started_response = true;
512 status = SendHeaders(stream.id,
513 ResponseHeadersPayload(),
514 ConstByteSpan(),
515 /*end_stream=*/false);
516 }
517
518 if (status.ok()) {
519 status = SendData(stream.id, std::move(chunk));
520 }
521
522 if (!status.ok()) {
523 PW_LOG_WARN("Failed sending response message on id=%" PRIu32 " error=%d",
524 stream.id,
525 status.code());
526 return status;
527 }
528
529 stream.send_window -= message_size;
530 connection_send_window_ -= message_size;
531
532 return OkStatus();
533 }
534
SendResponseComplete(StreamId stream_id,Status response_code)535 Status Connection::Writer::SendResponseComplete(StreamId stream_id,
536 Status response_code) {
537 auto state = connection_.LockState();
538 auto stream = state->LookupStream(stream_id);
539 if (!stream) {
540 return Status::NotFound();
541 }
542
543 Status status;
544 if (!stream->started_response) {
545 // If the response has not started yet, we need to include the initial
546 // headers.
547 PW_LOG_DEBUG("Conn.SendResponseWithTrailers id=%" PRIu32 " code=%d",
548 stream_id,
549 response_code.code());
550 status = state->SendHeaders(stream_id,
551 ResponseHeadersPayload(),
552 ResponseTrailersPayload(response_code),
553 /*end_stream=*/true);
554 } else {
555 PW_LOG_DEBUG("Conn.SendTrailers id=%" PRIu32 " code=%d",
556 stream_id,
557 response_code.code());
558 status = state->SendHeaders(stream_id,
559 ConstByteSpan(),
560 ResponseTrailersPayload(response_code),
561 /*end_stream=*/true);
562 }
563
564 if (!status.ok()) {
565 PW_LOG_WARN("Failed sending response complete on id=%" PRIu32 " error=%d",
566 stream_id,
567 status.code());
568 return Status::Unavailable();
569 }
570
571 PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, stream_id);
572 stream->Reset();
573
574 return OkStatus();
575 }
576
CloseStream(Connection::Stream * stream)577 void Connection::Reader::CloseStream(Connection::Stream* stream) {
578 StreamId id = stream->id;
579 PW_LOG_DEBUG("Conn.CloseStream id=%" PRIu32, id);
580 stream->Reset();
581 callbacks_.OnCancel(id);
582 }
583
584 // RFC 9113 §3.4
ProcessConnectionPreface()585 Status Connection::Reader::ProcessConnectionPreface() {
586 if (received_connection_preface_) {
587 return OkStatus();
588 }
589
590 callbacks_.OnNewConnection();
591
592 // The preface starts with a literal string.
593 auto literal = span{payload_scratch_}.subspan(
594 0, kExpectedConnectionPrefaceLiteral.size());
595
596 PW_TRY(ReadExactly(connection_.socket_.as_reader(), literal));
597 if (std::memcmp(literal.data(),
598 kExpectedConnectionPrefaceLiteral.data(),
599 kExpectedConnectionPrefaceLiteral.size()) != 0) {
600 PW_LOG_ERROR("Invalid connection preface literal");
601 return Status::Internal();
602 }
603
604 PW_LOG_DEBUG("Conn.Preface received literal");
605
606 // Client must send a SETTINGS frames.
607 PW_TRY_ASSIGN(auto client_frame,
608 ReadFrameHeader(connection_.socket_.as_reader()));
609 if (client_frame.type != FrameType::SETTINGS) {
610 PW_LOG_ERROR(
611 "Connection preface missing SETTINGS frame, found frame.type=%d",
612 static_cast<int>(client_frame.type));
613 return Status::Internal();
614 }
615
616 // Don't send an ACK yet, we'll do that below.
617 PW_TRY(ProcessSettingsFrame(client_frame, /*send_ack=*/false));
618 PW_LOG_DEBUG("Conn.Preface received SETTINGS");
619
620 // We must send a SETTINGS frame.
621 // RFC 9113 §6.5.2
622 PW_PACKED(struct) Setting {
623 uint16_t id;
624 uint32_t value;
625 };
626 PW_PACKED(struct) SettingsFrame {
627 WireFrameHeader header;
628 Setting settings[2];
629 };
630 SettingsFrame server_frame{
631 .header = WireFrameHeader(FrameHeader{
632 .payload_length = 12,
633 .type = FrameType::SETTINGS,
634 .flags = 0,
635 .stream_id = 0,
636 }),
637 .settings =
638 {
639 {
640 .id = ToNetworkOrder(SETTINGS_HEADER_TABLE_SIZE),
641 .value = ToNetworkOrder(kHpackDynamicHeaderTableSize),
642 },
643 {
644 .id = ToNetworkOrder(SETTINGS_MAX_CONCURRENT_STREAMS),
645 .value = ToNetworkOrder(kMaxConcurrentStreams),
646 },
647 },
648 };
649 PW_LOG_DEBUG("Conn.Send SETTINGS");
650
651 {
652 auto state = connection_.LockState();
653 PW_TRY(state->SendBytes(AsBytes(server_frame)));
654
655 // We must ack the client's SETTINGS frame *after* sending our SETTINGS.
656 PW_TRY(state->SendSettingsAck());
657 }
658
659 received_connection_preface_ = true;
660 PW_LOG_DEBUG("Conn.Preface complete");
661 return OkStatus();
662 }
663
664 // RFC 9113 §6.1
ProcessDataFrame(const FrameHeader & frame)665 Status Connection::Reader::ProcessDataFrame(const FrameHeader& frame) {
666 PW_LOG_DEBUG("Conn.Recv DATA id=%" PRIu32 " flags=0x%x len=%" PRIu32,
667 frame.stream_id,
668 frame.flags,
669 frame.payload_length);
670
671 if (frame.stream_id == 0) {
672 // RFC 9113 §6.1: "If a DATA frame is received whose Stream Identifier field
673 // is 0x00, the recipient MUST respond with a connection error of type
674 // PROTOCOL_ERROR."
675 SendGoAway(Http2Error::PROTOCOL_ERROR);
676 return Status::Internal();
677 }
678
679 {
680 auto state = connection_.LockState();
681
682 // From RFC 9113 §6.9: "A receiver that receives a flow-controlled frame
683 // MUST always account for its contribution against the connection
684 // flow-control window, unless the receiver treats this as a connection
685 // error. This is necessary even if the frame is in error. The sender counts
686 // the frame toward the flow-control window, but if the receiver does not,
687 // the flow-control window at the sender and receiver can become different."
688 //
689 // To simplify this, we send WINDOW_UPDATE frames eagerly.
690 //
691 // In the future we should do something less chatty.
692 PW_TRY(state->SendWindowUpdates(frame.stream_id, frame.payload_length));
693
694 auto stream = state->LookupStream(frame.stream_id);
695 if (!stream) {
696 PW_LOG_DEBUG("Ignoring DATA on closed stream id=%" PRIu32,
697 frame.stream_id);
698 // Unlock since ProcessIgnoredFrame will try and read all the ignored
699 // data, and also may try and take the lock in the error case.
700 connection_.UnlockState(std::move(state));
701 PW_TRY(ProcessIgnoredFrame(frame));
702 // Stream has been fully closed: silently ignore.
703 return OkStatus();
704 }
705
706 if (stream->half_closed) {
707 PW_LOG_ERROR("Recv DATA on half-closed stream id=%" PRIu32,
708 frame.stream_id);
709 // Unlock since ProcessIgnoredFrame will try and read all the ignored
710 // data, and also may try and take the lock in the error case.
711 connection_.UnlockState(std::move(state));
712 PW_TRY(ProcessIgnoredFrame(frame));
713 state = connection_.LockState();
714 stream = state->LookupStream(frame.stream_id);
715 if (stream) {
716 // RFC 9113 §6.1: "If a DATA frame is received whose stream is not in
717 // the "open" or "half-closed (local)" state, the recipient MUST respond
718 // with a stream error of type STREAM_CLOSED."
719 PW_TRY(SendRstStreamAndClose(state, stream, Http2Error::STREAM_CLOSED));
720 }
721 return OkStatus();
722 }
723 }
724
725 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
726
727 // Drop padding.
728 if ((frame.flags & FLAGS_PADDED) != 0) {
729 if (payload.size() < 1) {
730 // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
731 // if a frame ... is too small to contain mandatory frame data."
732 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
733 return Status::Internal();
734 }
735
736 uint32_t pad_length = static_cast<uint32_t>(payload[0]);
737 if (pad_length >= frame.payload_length) {
738 // RFC 9113 §6.1: "If the length of the padding is the length of the frame
739 // payload or greater, the recipient MUST treat this as a connection error
740 // of type PROTOCOL_ERROR."
741 SendGoAway(Http2Error::PROTOCOL_ERROR);
742 return Status::Internal();
743 }
744 payload = payload.subspan(1, payload.size() - pad_length - 1);
745 }
746
747 auto state = connection_.LockState();
748 Stream* stream = state->LookupStream(frame.stream_id);
749 if (!stream) {
750 return OkStatus();
751 }
752
753 // Parse repeated grpc Length-Prefix-Message.
754 // https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md#requests
755 while (!payload.empty()) {
756 uint32_t message_length;
757
758 // If we aren't reassembling a message, read the next length prefix.
759 if (!stream->assembly_buffer) {
760 size_t read = std::min(5 - static_cast<size_t>(stream->prefix_received),
761 payload.size());
762 std::copy(payload.begin(),
763 payload.begin() + read,
764 stream->prefix_buffer.data() + stream->prefix_received);
765 stream->prefix_received += read;
766 payload = payload.subspan(read);
767
768 // Read the length prefix.
769 if (stream->prefix_received < 5) {
770 continue;
771 }
772 stream->prefix_received = 0;
773
774 ByteBuilder builder(stream->prefix_buffer);
775 auto it = builder.begin();
776 auto message_compressed = it.ReadUint8();
777 message_length = it.ReadUint32(endian::big);
778 if (message_compressed != 0) {
779 PW_LOG_ERROR("Unsupported: grpc message is compressed");
780 PW_TRY(
781 SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
782 return OkStatus();
783 }
784
785 if (message_length > payload.size()) {
786 // gRPC message is split across DATA frames, must allocate buffer.
787 if (!state->message_assembly_allocator()) {
788 PW_LOG_ERROR(
789 "Unsupported: split grpc message without allocator provided");
790 PW_TRY(
791 SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
792 return OkStatus();
793 }
794
795 stream->assembly_buffer = static_cast<std::byte*>(
796 state->message_assembly_allocator()->Allocate(
797 allocator::Layout(message_length)));
798 if (stream->assembly_buffer == nullptr) {
799 PW_LOG_ERROR("Partial message reassembly buffer allocation failed");
800 PW_TRY(
801 SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
802 return OkStatus();
803 }
804 stream->message_length = message_length;
805 stream->message_received = 0;
806 continue;
807 }
808 }
809
810 pw::ByteSpan message;
811
812 // Reading message payload.
813 if (stream->assembly_buffer != nullptr) {
814 uint32_t read =
815 std::min(stream->message_length - stream->message_received,
816 static_cast<uint32_t>(payload.size()));
817 std::copy(payload.begin(),
818 payload.begin() + read,
819 stream->assembly_buffer + stream->message_received);
820 payload = payload.subspan(read);
821 stream->message_received += read;
822 if (stream->message_received < stream->message_length) {
823 continue;
824 }
825 // Fully received message.
826 message = pw::span(stream->assembly_buffer, stream->message_length);
827 } else {
828 message = payload.subspan(0, message_length);
829 payload = payload.subspan(message_length);
830 }
831
832 // Release state lock before callback, reacquire after.
833 connection_.UnlockState(std::move(state));
834 const auto status = callbacks_.OnMessage(frame.stream_id, message);
835 state = connection_.LockState();
836 stream = state->LookupStream(frame.stream_id);
837 if (!stream) {
838 return OkStatus();
839 }
840
841 if (!status.ok()) {
842 PW_TRY(SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR));
843 return OkStatus();
844 }
845
846 if (stream->assembly_buffer != nullptr) {
847 state->message_assembly_allocator()->Deallocate(stream->assembly_buffer);
848 stream->assembly_buffer = nullptr;
849 stream->message_length = 0;
850 stream->message_received = 0;
851 }
852 }
853
854 // grpc requires every request stream to end with an empty DATA frame with
855 // FLAGS_END_STREAM. If a client sends FLAGS_END_STREAM with a non-empty
856 // payload, it's not specified how the server should respond. We choose to
857 // accept the payload before ending the stream.
858 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
859 if ((frame.flags & FLAGS_END_STREAM) != 0) {
860 stream->half_closed = true;
861 connection_.UnlockState(std::move(state));
862 callbacks_.OnHalfClose(frame.stream_id);
863 }
864
865 return OkStatus();
866 }
867
868 // RFC 9113 §6.2
ProcessHeadersFrame(const FrameHeader & frame)869 Status Connection::Reader::ProcessHeadersFrame(const FrameHeader& frame) {
870 PW_LOG_DEBUG("Conn.Recv HEADERS id=%" PRIu32 " len=%" PRIu32,
871 frame.stream_id,
872 frame.payload_length);
873
874 if (frame.stream_id == 0) {
875 // RFC 9113 §6.2: "If a HEADERS frame is received whose Stream Identifier
876 // field is 0x00, the recipient MUST respond with a connection error of type
877 // PROTOCOL_ERROR."
878 SendGoAway(Http2Error::PROTOCOL_ERROR);
879 return Status::Internal();
880 }
881 if (frame.stream_id % 2 != 1 || frame.stream_id <= last_stream_id_) {
882 // RFC 9113 §5.1.1: "Streams initiated by a client MUST use odd-numbered
883 // stream identifiers ... The identifier of a newly established stream MUST
884 // be numerically greater than all streams that the initiating endpoint has
885 // opened ... An endpoint that receives an unexpected stream identifier MUST
886 // respond with a connection error of type PROTOCOL_ERROR."
887 SendGoAway(Http2Error::PROTOCOL_ERROR);
888 return Status::Internal();
889 }
890
891 last_stream_id_ = frame.stream_id;
892
893 {
894 auto state = connection_.LockState();
895 if (Stream* stream = state->LookupStream(frame.stream_id);
896 stream != nullptr) {
897 PW_LOG_DEBUG("Client sent HEADERS after the first stream message");
898 // Unlock since ProcessIgnoredFrame will try and read all the ignored
899 // data, and also may try and take the lock in the error case.
900 connection_.UnlockState(std::move(state));
901 PW_TRY(ProcessIgnoredFrame(frame));
902 state = connection_.LockState();
903 stream = state->LookupStream(frame.stream_id);
904 if (stream) {
905 // grpc requests cannot contain trailers.
906 // See:
907 // https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
908 PW_TRY(
909 SendRstStreamAndClose(state, stream, Http2Error::PROTOCOL_ERROR));
910 }
911 return OkStatus();
912 }
913 }
914
915 if ((frame.flags & FLAGS_END_STREAM) != 0) {
916 PW_LOG_DEBUG("Client sent HEADERS with END_STREAM");
917 PW_TRY(ProcessIgnoredFrame(frame));
918 // grpc requests must send END_STREAM in an empty DATA frame.
919 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
920 auto state = connection_.LockState();
921 PW_TRY(state->SendRstStream(frame.stream_id, Http2Error::PROTOCOL_ERROR));
922 return OkStatus();
923 }
924 if ((frame.flags & FLAGS_END_HEADERS) == 0) {
925 PW_LOG_ERROR("Client sent HEADERS frame without END_HEADERS: unsupported");
926 SendGoAway(Http2Error::INTERNAL_ERROR);
927 return Status::Internal();
928 }
929
930 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
931
932 // Drop padding.
933 if ((frame.flags & FLAGS_PADDED) != 0) {
934 if (payload.size() < 1) {
935 // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
936 // if a frame ... is too small to contain mandatory frame data. A frame
937 // size error in a frame that could alter the state of the entire
938 // connection MUST be treated as a connection error"
939 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
940 return Status::Internal();
941 }
942 uint32_t pad_length = static_cast<uint32_t>(payload[0]);
943 if (pad_length >= frame.payload_length) {
944 // RFC 9113 §6.2: "If the length of the padding is the length of the frame
945 // payload or greater, the recipient MUST treat this as a connection error
946 // of type PROTOCOL_ERROR."
947 SendGoAway(Http2Error::PROTOCOL_ERROR);
948 return Status::Internal();
949 }
950 payload = payload.subspan(1, payload.size() - pad_length - 1);
951 }
952
953 // Drop priority fields.
954 if ((frame.flags & FLAGS_PRIORITY) != 0) {
955 if (payload.size() < 5) {
956 // RFC 9113 §4.2: "An endpoint MUST send an error code of FRAME_SIZE_ERROR
957 // if a frame ... is too small to contain mandatory frame data."
958 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
959 return Status::Internal();
960 }
961 payload = payload.subspan(5);
962 }
963
964 PW_TRY_ASSIGN(auto method_name, HpackParseRequestHeaders(payload));
965 {
966 auto state = connection_.LockState();
967 if (!state->CreateStream(frame.stream_id, initial_send_window_).ok()) {
968 PW_LOG_WARN("Too many streams, rejecting id=%" PRIu32, frame.stream_id);
969 return state->SendRstStream(frame.stream_id, Http2Error::REFUSED_STREAM);
970 }
971 }
972
973 if (const auto status = callbacks_.OnNew(frame.stream_id, method_name);
974 !status.ok()) {
975 auto state = connection_.LockState();
976 if (Stream* stream = state->LookupStream(frame.stream_id);
977 stream != nullptr) {
978 return SendRstStreamAndClose(state, stream, Http2Error::INTERNAL_ERROR);
979 }
980 }
981
982 return OkStatus();
983 }
984
AddConnectionSendWindow(int32_t delta)985 Status Connection::SharedState::AddConnectionSendWindow(int32_t delta) {
986 if (PW_ADD_OVERFLOW(
987 connection_send_window_, delta, &connection_send_window_)) {
988 return Status::Internal();
989 }
990
991 DrainResponseQueues().IgnoreError();
992
993 return OkStatus();
994 }
995
AddAllStreamsSendWindow(int32_t delta)996 Status Connection::SharedState::AddAllStreamsSendWindow(int32_t delta) {
997 for (size_t i = 0; i < streams_.size(); i++) {
998 if (streams_[i].id == 0) {
999 continue;
1000 }
1001 if (PW_ADD_OVERFLOW(
1002 streams_[i].send_window, delta, &streams_[i].send_window)) {
1003 return Status::Internal();
1004 }
1005 }
1006
1007 DrainResponseQueues().IgnoreError();
1008
1009 return OkStatus();
1010 }
1011
AddStreamSendWindow(StreamId id,int32_t delta)1012 Status Connection::SharedState::AddStreamSendWindow(StreamId id,
1013 int32_t delta) {
1014 Stream* stream = LookupStream(id);
1015 if (!stream) {
1016 return Status::NotFound();
1017 }
1018
1019 if (PW_ADD_OVERFLOW(stream->send_window, delta, &stream->send_window)) {
1020 return Status::Internal();
1021 }
1022
1023 DrainResponseQueues().IgnoreError();
1024
1025 return OkStatus();
1026 }
1027
1028 // RFC 9113 §6.4
ProcessRstStreamFrame(const FrameHeader & frame)1029 Status Connection::Reader::ProcessRstStreamFrame(const FrameHeader& frame) {
1030 PW_LOG_DEBUG("Conn.Recv RST_STREAM id=%" PRIu32 " len=%" PRIu32,
1031 frame.stream_id,
1032 frame.payload_length);
1033
1034 if (frame.stream_id == 0) {
1035 // RFC 9113 §6.4: "If a RST_STREAM frame is received with a stream
1036 // identifier of 0x00, the recipient MUST treat this as a connection error
1037 // of type PROTOCOL_ERROR".
1038 SendGoAway(Http2Error::PROTOCOL_ERROR);
1039 return Status::Internal();
1040 }
1041 if (frame.stream_id > last_stream_id_) {
1042 // RFC 9113 §6.4: "If a RST_STREAM frame identifying an idle stream is
1043 // received, the recipient MUST treat this as a connection error of type
1044 // PROTOCOL_ERROR."
1045 SendGoAway(Http2Error::PROTOCOL_ERROR);
1046 return Status::Internal();
1047 }
1048 if (frame.payload_length != 4) {
1049 // RFC 9113 §6.4: "A RST_STREAM frame with a length other than 4 octets MUST
1050 // be treated as a connection error of type FRAME_SIZE_ERROR."
1051 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1052 return Status::Internal();
1053 }
1054
1055 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1056 ByteBuilder builder(payload);
1057 auto error_code = builder.begin().ReadUint32(endian::big);
1058
1059 PW_LOG_DEBUG("Conn.RstStream id=%" PRIu32 " error=%" PRIu32,
1060 frame.stream_id,
1061 error_code);
1062 auto state = connection_.LockState();
1063 if (Stream* stream = state->LookupStream(frame.stream_id);
1064 stream != nullptr) {
1065 CloseStream(stream);
1066 }
1067 return OkStatus();
1068 }
1069
1070 // RFC 9113 §6.5
ProcessSettingsFrame(const FrameHeader & frame,bool send_ack)1071 Status Connection::Reader::ProcessSettingsFrame(const FrameHeader& frame,
1072 bool send_ack) {
1073 PW_LOG_DEBUG("Conn.Recv SETTINGS len=%" PRIu32 " flags=0x%x",
1074 frame.payload_length,
1075 frame.flags);
1076
1077 if ((frame.flags & FLAGS_ACK) != 0) {
1078 // RFC 9113 §6.5: "Receipt of a SETTINGS frame with the ACK flag set and a
1079 // length field value other than 0 MUST be treated as a connection error of
1080 // type FRAME_SIZE_ERROR."
1081 if (frame.payload_length != 0) {
1082 PW_LOG_ERROR("Invalid SETTINGS frame: has ACK with non-empty payload");
1083 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1084 return Status::Internal();
1085 }
1086 // Don't ACK an ACK.
1087 send_ack = false;
1088 } else {
1089 // RFC 9113 §6.5: "A SETTINGS frame with a length other than a multiple of 6
1090 // octets MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1091 if (frame.payload_length % 6 != 0) {
1092 PW_LOG_ERROR("Invalid SETTINGS frame: payload size invalid");
1093 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1094 return Status::Internal();
1095 }
1096 }
1097
1098 if (frame.stream_id != 0) {
1099 // RFC 9113 §6.5: "If an endpoint receives a SETTINGS frame whose Stream
1100 // Identifier field is anything other than 0x00, the endpoint MUST respond
1101 // with a connection error of type PROTOCOL_ERROR."
1102 SendGoAway(Http2Error::PROTOCOL_ERROR);
1103 return Status::Internal();
1104 }
1105
1106 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1107
1108 // RFC 9113 §6.5.2
1109 ByteBuilder builder(payload);
1110 for (auto it = builder.begin(); it != builder.end();) {
1111 auto id = it.ReadUint16(endian::big);
1112 auto value = it.ReadUint32(endian::big);
1113 PW_LOG_DEBUG("Applying SETTING id=%" PRIu16 " value=%" PRIu32, id, value);
1114 switch (id) {
1115 case SETTINGS_INITIAL_WINDOW_SIZE: {
1116 // RFC 9113 §6.5.2: "Values above the maximum flow-control window size
1117 // of 2^31-1 MUST be treated as a connection error of type
1118 // FLOW_CONTROL_ERROR."
1119 if ((value & (1 << 31)) != 0) {
1120 SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1121 return Status::Internal();
1122 }
1123 // RFC 9113 §6.9.2: "When the value of SETTINGS_INITIAL_WINDOW_SIZE
1124 // changes, a receiver MUST adjust the size of all stream flow-control
1125 // windows that it maintains by the difference between the new value and
1126 // the old value."
1127 int32_t newval = static_cast<int32_t>(value);
1128 int32_t delta = newval - initial_send_window_;
1129 auto state = connection_.LockState();
1130 if (const auto status = state->AddAllStreamsSendWindow(delta);
1131 !status.ok()) {
1132 SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1133 return Status::Internal();
1134 }
1135 initial_send_window_ = newval;
1136 break;
1137 }
1138 case SETTINGS_MAX_FRAME_SIZE:
1139 // RFC 9113 §6.5.2: "Values outside this range MUST be treated as a
1140 // connection error of type PROTOCOL_ERROR".
1141 if (value < 16384 || 16777215 < value) {
1142 SendGoAway(Http2Error::PROTOCOL_ERROR);
1143 return Status::Internal();
1144 }
1145 // We never send frame payloads larger than 16384, so we don't need to
1146 // track the client's preference.
1147 break;
1148 // Ignore these.
1149 // SETTINGS_HEADER_TABLE_SIZE: our responses don't use the dynamic table
1150 // SETTINGS_ENABLE_PUSH: we don't support push
1151 // SETTINGS_MAX_CONCURRENT_STREAMS: we don't support push
1152 // SETTINGS_MAX_HEADER_LIST_SIZE: we send very tiny response HEADERS
1153 default:
1154 break;
1155 }
1156 }
1157
1158 if (send_ack) {
1159 auto state = connection_.LockState();
1160 PW_TRY(state->SendSettingsAck());
1161 }
1162
1163 return OkStatus();
1164 }
1165
1166 // RFC 9113 §6.7
ProcessPingFrame(const FrameHeader & frame)1167 Status Connection::Reader::ProcessPingFrame(const FrameHeader& frame) {
1168 PW_LOG_DEBUG("Conn.Recv PING len=%" PRIu32, frame.payload_length);
1169
1170 if (frame.stream_id != 0) {
1171 // RFC 9113 §6.7: "If a PING frame is received with a Stream Identifier
1172 // field value other than 0x00, the recipient MUST respond with a connection
1173 // error of type PROTOCOL_ERROR."
1174 SendGoAway(Http2Error::PROTOCOL_ERROR);
1175 return Status::Internal();
1176 }
1177 if (frame.payload_length != 8) {
1178 // RFC 9113 §6.7: "Receipt of a PING frame with a length field value other
1179 // than 8 MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1180 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1181 return Status::Internal();
1182 }
1183
1184 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1185
1186 // Don't ACK an ACK.
1187 if ((frame.flags & FLAGS_ACK) != 0) {
1188 return OkStatus();
1189 }
1190
1191 // Send an ACK.
1192 PW_PACKED(struct) PingFrame {
1193 WireFrameHeader header;
1194 uint64_t opaque_data;
1195 };
1196 ByteBuilder builder(payload);
1197 PingFrame ack_frame = {
1198 .header = WireFrameHeader(FrameHeader{
1199 .payload_length = 8,
1200 .type = FrameType::PING,
1201 .flags = FLAGS_ACK,
1202 .stream_id = 0,
1203 }),
1204 // Since we're going to echo this, read as native endian so it gets echoed
1205 // exactly as-is.
1206 .opaque_data = builder.begin().ReadUint64(endian::native),
1207 };
1208
1209 {
1210 auto state = connection_.LockState();
1211 PW_TRY(state->SendBytes(AsBytes(ack_frame)));
1212 }
1213
1214 return OkStatus();
1215 }
1216
1217 // RFC 9113 §6.9
ProcessWindowUpdateFrame(const FrameHeader & frame)1218 Status Connection::Reader::ProcessWindowUpdateFrame(const FrameHeader& frame) {
1219 PW_LOG_DEBUG("Conn.Recv WINDOW_UPDATE id=%" PRIu32 " len=%" PRIu32,
1220 frame.stream_id,
1221 frame.payload_length);
1222
1223 if (frame.payload_length != 4) {
1224 // RFC 9113 §6.9: "A WINDOW_UPDATE frame with a length other than 4 octets
1225 // MUST be treated as a connection error of type FRAME_SIZE_ERROR."
1226 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1227 return Status::Internal();
1228 }
1229
1230 // Read window size increment.
1231 PW_TRY_ASSIGN(auto payload, ReadFramePayload(frame));
1232 ByteBuilder builder(payload);
1233 int32_t delta = static_cast<int32_t>(builder.begin().ReadUint32(endian::big) &
1234 0x7fffffff);
1235
1236 auto state = connection_.LockState();
1237 auto stream = state->LookupStream(frame.stream_id);
1238
1239 if (delta == 0) {
1240 // RFC 9113 §6.9: "A receiver MUST treat a WINDOW_UPDATE frame with an
1241 // increment of 0 as a stream error of type PROTOCOL_ERROR; errors on the
1242 // connection flow-control window MUST be treated as a connection error."
1243 if (frame.stream_id == 0) {
1244 SendGoAway(Http2Error::PROTOCOL_ERROR);
1245 return Status::Internal();
1246 } else {
1247 if (!stream) {
1248 // Already closed
1249 return OkStatus();
1250 }
1251 return SendRstStreamAndClose(state, stream, Http2Error::PROTOCOL_ERROR);
1252 }
1253 }
1254
1255 // RFC 9113 §6.9.1: "If a sender receives a WINDOW_UPDATE that causes a
1256 // flow-control window to exceed 2^31-1 bytes, it MUST terminate either the
1257 // stream or the connection, as appropriate ... with an error code of
1258 // FLOW_CONTROL_ERROR"
1259 if (frame.stream_id == 0) {
1260 if (const auto status = state->AddConnectionSendWindow(delta);
1261 !status.ok()) {
1262 SendGoAway(Http2Error::FLOW_CONTROL_ERROR);
1263 return Status::Internal();
1264 }
1265 } else {
1266 if (!stream) {
1267 // Already closed
1268 return OkStatus();
1269 }
1270 if (const auto status = state->AddStreamSendWindow(stream->id, delta);
1271 !status.ok()) {
1272 return SendRstStreamAndClose(
1273 state, stream, Http2Error::FLOW_CONTROL_ERROR);
1274 }
1275 }
1276
1277 return OkStatus();
1278 }
1279
1280 // Advance past the payload.
ProcessIgnoredFrame(const FrameHeader & frame)1281 Status Connection::Reader::ProcessIgnoredFrame(const FrameHeader& frame) {
1282 size_t to_read = frame.payload_length;
1283 while (to_read > 0) {
1284 auto chunk = span{payload_scratch_}.subspan(
1285 0, std::min(payload_scratch_.size(), to_read));
1286 PW_TRY(ReadExactly(connection_.socket_.as_reader(), chunk));
1287 to_read -= chunk.size();
1288 }
1289 return OkStatus();
1290 }
1291
ReadFramePayload(const FrameHeader & frame)1292 Result<ByteSpan> Connection::Reader::ReadFramePayload(
1293 const FrameHeader& frame) {
1294 if (frame.payload_length == 0) {
1295 return ByteSpan();
1296 }
1297 if (frame.payload_length > payload_scratch_.size()) {
1298 PW_LOG_ERROR("Frame type=%d payload too large: %" PRIu32 " > %" PRIu32,
1299 static_cast<int>(frame.type),
1300 frame.payload_length,
1301 static_cast<uint32_t>(payload_scratch_.size()));
1302 SendGoAway(Http2Error::FRAME_SIZE_ERROR);
1303 return Status::Internal();
1304 }
1305 auto payload = span{payload_scratch_}.subspan(0, frame.payload_length);
1306 PW_TRY(ReadExactly(connection_.socket_.as_reader(), payload));
1307 return payload;
1308 }
1309
1310 // RFC 9113 §6.8
SendGoAway(Http2Error code)1311 void Connection::Reader::SendGoAway(Http2Error code) {
1312 if (!received_connection_preface_) {
1313 // RFC 9113 §3.4: "A GOAWAY frame MAY be omitted in this case, since an
1314 // invalid preface indicates that the peer is not using HTTP/2."
1315 return;
1316 }
1317
1318 PW_PACKED(struct) GoAwayFrame {
1319 WireFrameHeader header;
1320 uint32_t last_stream_id;
1321 uint32_t error_code;
1322 };
1323 GoAwayFrame frame{
1324 .header = WireFrameHeader(FrameHeader{
1325 .payload_length = 8,
1326 .type = FrameType::GOAWAY,
1327 .flags = 0,
1328 .stream_id = 0,
1329 }),
1330 .last_stream_id = ToNetworkOrder(last_stream_id_),
1331 .error_code = ToNetworkOrder(code),
1332 };
1333
1334 {
1335 auto state = connection_.LockState();
1336 // Close all open streams.
1337 state->ForAllStreams([this](Stream* stream) { CloseStream(stream); });
1338 // Ignore errors since we're about to close the connection anyway.
1339 state->SendBytes(AsBytes(frame)).IgnoreError();
1340 }
1341 }
1342
1343 // RFC 9113 §6.4
SendRstStreamAndClose(sync::BorrowedPointer<SharedState> & state,Stream * stream,Http2Error code)1344 Status Connection::Reader::SendRstStreamAndClose(
1345 sync::BorrowedPointer<SharedState>& state,
1346 Stream* stream,
1347 Http2Error code) {
1348 state->SendRstStream(stream->id, code).IgnoreError();
1349 CloseStream(stream);
1350 return OkStatus();
1351 }
1352
1353 } // namespace pw::grpc
1354