1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <array> 17 #include <cstdint> 18 19 #include "pw_allocator/allocator.h" 20 #include "pw_bytes/byte_builder.h" 21 #include "pw_bytes/span.h" 22 #include "pw_function/function.h" 23 #include "pw_grpc/send_queue.h" 24 #include "pw_multibuf/allocator.h" 25 #include "pw_multibuf/multibuf.h" 26 #include "pw_result/result.h" 27 #include "pw_status/status.h" 28 #include "pw_stream/stream.h" 29 #include "pw_string/string.h" 30 #include "pw_sync/inline_borrowable.h" 31 #include "pw_thread/thread.h" 32 #include "pw_thread/thread_core.h" 33 34 namespace pw::grpc { 35 namespace internal { 36 37 struct FrameHeader; 38 enum class Http2Error : uint32_t; 39 40 // Parameters of this implementation. 41 // RFC 9113 §5.1.2 42 inline constexpr uint32_t kMaxConcurrentStreams = 16; 43 44 // RFC 9113 §4.2 and §6.5.2 45 inline constexpr uint32_t kMaxFramePayloadSize = 16384; 46 47 // Limits on grpc message sizes. The length prefix includes the compressed byte 48 // and 32-bit length from Length-Prefixed-Message. 49 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md. 50 inline constexpr uint32_t kMaxGrpcMessageSizeWithLengthPrefix = 51 kMaxFramePayloadSize; 52 inline constexpr uint32_t kMaxGrpcMessageSize = 53 kMaxGrpcMessageSizeWithLengthPrefix - 5; 54 55 } // namespace internal 56 57 // RFC 9113 §5.1.1: Streams are identified by unsigned 31-bit integers. 58 using StreamId = uint32_t; 59 60 inline constexpr uint32_t kMaxMethodNameSize = 127; 61 62 // Implements a gRPC over HTTP2 server. 63 // 64 // Basic usage: 65 // * Provide a Connection::RequestCallbacks implementation that handles RPC 66 // events. 67 // * Provide a readable/writeable stream object that will be used like a 68 // socket over which the HTTP2 frames are read/written. When the underlying 69 // stream should be closed, the provided connection_close_callback will be 70 // called. 71 // * Drive the connection by calling ProcessConnectionPreface then ProcessFrame 72 // in a loop while status is Ok on one thread. 73 // * RPC responses can be sent from any thread by calling 74 // SendResponseMessage/SendResponseComplete. The SendQueue object will 75 // handle concurrent access. 76 // 77 // One thread should be dedicated to driving reads (ProcessFrame calls), while 78 // another thread (implemented by SendQueue) handles all writes. Refer to 79 // the ConnectionThread class for an implementation of this. 80 // 81 // By default, each gRPC message must be entirely contained within a single 82 // HTTP2 DATA frame, as supporting fragmented messages requires buffering 83 // up to the maximum message size per stream. To support fragmented messages, 84 // provide a message_assembly_allocator, which will be used to allocate 85 // temporary storage for fragmented gRPC messages when required. If no 86 // allocator is provided, or allocation fails, the stream will be closed. 87 class Connection { 88 public: 89 // Callbacks invoked on requests from the client. Called on same thread as 90 // ProcessFrame is being called on. 91 class RequestCallbacks { 92 public: 93 virtual ~RequestCallbacks() = default; 94 95 // Called on startup of connection. 96 virtual void OnNewConnection() = 0; 97 98 // Called on a new RPC. full_method_name is "<ServiceName>/<MethodName>". 99 // This is guaranteed to be called before any other method with the same id. 100 virtual Status OnNew(StreamId id, 101 InlineString<kMaxMethodNameSize> full_method_name) = 0; 102 103 // Called on a new request message for an RPC. The `message` must not be 104 // accessed after this method returns. 105 // 106 // Return an error status to cause the stream to be closed with RST_STREAM 107 // frame. 108 virtual Status OnMessage(StreamId id, ByteSpan message) = 0; 109 110 // Called after the client has sent all request messages for an RPC. 111 virtual void OnHalfClose(StreamId id) = 0; 112 113 // Called when an RPC has been canceled. 114 virtual void OnCancel(StreamId id) = 0; 115 }; 116 117 Connection(stream::ReaderWriter& socket, 118 SendQueue& send_queue, 119 RequestCallbacks& callbacks, 120 allocator::Allocator* message_assembly_allocator, 121 multibuf::MultiBufAllocator& multibuf_allocator); 122 123 // Reads from stream and processes required connection preface frames. Should 124 // be called before ProcessFrame(). Return OK if connection preface was found. ProcessConnectionPreface()125 Status ProcessConnectionPreface() { 126 return reader_.ProcessConnectionPreface(); 127 } 128 129 // Reads from stream and processes next frame on connection. Returns OK 130 // as long as connection is open. Should be called from a single thread. ProcessFrame()131 Status ProcessFrame() { return reader_.ProcessFrame(); } 132 133 // Sends a response message for an RPC. The `message` will not be accessed 134 // after this method returns. Thread safe. 135 // 136 // Errors are: 137 // 138 // * NOT_FOUND if stream_id does not reference an active stream, including 139 // RPCs that have already completed and IDs that do not refer to any prior 140 // RPC. 141 // * RESOURCE_EXHAUSTED if the flow control window is not large enough to send 142 // this RPC immediately. In this case, no response will be send. 143 // * UNAVAILABLE if the connection is closed. SendResponseMessage(StreamId stream_id,pw::ConstByteSpan message)144 Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message) { 145 return writer_.SendResponseMessage(stream_id, message); 146 } 147 148 // Completes an RPC with the given status code. Thread safe. Pigweed status 149 // codes happen to align exactly with grpc status codes. Compare: 150 // https://grpc.github.io/grpc/core/md_doc_statuscodes.html 151 // https://pigweed.dev/pw_status/#quick-reference 152 // 153 // Errors are: 154 // 155 // * NOT_FOUND if stream_id does not reference an active stream, including 156 // RPCs that have already completed, or if stream_id does not refer to any 157 // prior RPC. 158 // * UNAVAILABLE if the connection is closed. SendResponseComplete(StreamId stream_id,pw::Status response_code)159 Status SendResponseComplete(StreamId stream_id, pw::Status response_code) { 160 return writer_.SendResponseComplete(stream_id, response_code); 161 } 162 163 private: 164 // RFC 9113 §6.9.2. Flow control windows are unsigned 31-bit numbers, but 165 // because of the following requirement from §6.9.2, we track flow control 166 // windows with signed integers. "A change to SETTINGS_INITIAL_WINDOW_SIZE can 167 // cause the available space in a flow-control window to become negative. A 168 // sender MUST track the negative flow-control window ..." 169 static inline constexpr int32_t kDefaultInitialWindowSize = 65535; 170 171 // From RFC 9113 §5.1, we use only the following states: 172 // * idle, which have `id > last_stream_id_` 173 // * open, which are in `streams_` with `half_closed = false` 174 // * half-closed (remote), which are in `streams_` with `half_closed = true` 175 // 176 // Regarding other states: 177 // * reserved is ignored because we do not sent PUSH_PROMISE 178 // * half-closed (local) is merged into close, because once a grpc server has 179 // sent a response, the RPC is complete 180 struct Stream { 181 StreamId id; 182 bool half_closed; 183 bool started_response; 184 int32_t send_window; 185 186 // Response messages that are waiting for window to send. 187 multibuf::MultiBuf response_queue; 188 189 // Fragmented gRPC message assembly, nullptr if not assembling a message. 190 std::byte* assembly_buffer; 191 union { 192 struct { 193 // Buffer for the length-prefix, if fragmented. 194 std::array<std::byte, 5> prefix_buffer; 195 // Bytes of the prefix received so far. 196 uint8_t prefix_received; 197 }; 198 struct { 199 // Total length of the message. 200 uint32_t message_length; 201 // Length of the message received so far (during assembly). 202 uint32_t message_received; 203 }; 204 }; 205 ResetStream206 void Reset() { 207 id = 0; 208 half_closed = false; 209 started_response = false; 210 send_window = 0; 211 response_queue = {}; 212 213 assembly_buffer = nullptr; 214 message_length = 0; 215 message_received = 0; 216 prefix_received = 0; 217 } 218 }; 219 220 // Internal state is divided into what is needed for reading/writing/shared to 221 // both. 222 223 class SharedState { 224 public: SharedState(allocator::Allocator * message_assembly_allocator,multibuf::MultiBufAllocator & multibuf_allocator,SendQueue & send_queue)225 SharedState(allocator::Allocator* message_assembly_allocator, 226 multibuf::MultiBufAllocator& multibuf_allocator, 227 SendQueue& send_queue) 228 : message_assembly_allocator_(message_assembly_allocator), 229 multibuf_allocator_(multibuf_allocator), 230 send_queue_(send_queue) {} 231 232 // Create stream if space available. 233 pw::Status CreateStream(StreamId id, int32_t initial_send_window); 234 235 // Update stream with `id` with new send window delta. 236 Status AddStreamSendWindow(StreamId id, int32_t delta); 237 // Update all stream with new send window delta. 238 Status AddAllStreamsSendWindow(int32_t delta); 239 // Update connection send window with new delta. 240 Status AddConnectionSendWindow(int32_t delta); 241 242 // Returns nullptr if stream not found. Note that a reference to locked 243 // SharedState should be retained while using the returned Stream*. 244 Stream* LookupStream(StreamId id); 245 246 void ForAllStreams(Function<void(Stream*)>&& callback); 247 248 // Queue response buffer for sending on `id` stream. Will send right away if 249 // window is available. 250 Status QueueStreamResponse(StreamId id, multibuf::MultiBuf&& buffer); 251 252 // Write raw bytes directly to send queue. 253 Status SendBytes(ConstByteSpan message); 254 255 // Construct and write header message directly to send queue. 256 Status SendHeaders(StreamId stream_id, 257 ConstByteSpan payload1, 258 ConstByteSpan payload2, 259 bool end_stream); 260 261 // Frame send functions. 262 Status SendRstStream(StreamId stream_id, internal::Http2Error code); 263 Status SendWindowUpdates(StreamId stream_id, uint32_t increment); 264 Status SendSettingsAck(); 265 message_assembly_allocator()266 allocator::Allocator* message_assembly_allocator() { 267 return message_assembly_allocator_; 268 } 269 multibuf_allocator()270 multibuf::MultiBufAllocator& multibuf_allocator() { 271 return multibuf_allocator_; 272 } 273 connection_send_window()274 int32_t connection_send_window() const { return connection_send_window_; } 275 276 private: 277 // Called whenever there is new data to send or a WINDOW_UPDATE message has 278 // increased a send window. Should attempt to drain any queued data across 279 // all active streams. 280 Status DrainResponseQueues(); 281 282 Status DrainResponseQueue(Stream& stream); 283 284 Status SendQueued(Stream& stream, multibuf::OwnedChunk&& chunk); 285 286 // Write DATA frame to send queue. Chunk should already have prefix space 287 // for headers. 288 Status SendData(StreamId stream_id, multibuf::OwnedChunk&& chunk); 289 290 // Stream state 291 std::array<Stream, internal::kMaxConcurrentStreams> streams_{}; 292 int32_t connection_send_window_ = kDefaultInitialWindowSize; 293 294 // Allocator for fragmented grpc message reassembly 295 allocator::Allocator* message_assembly_allocator_; 296 297 // Allocator for creating send buffers to queue. 298 multibuf::MultiBufAllocator& multibuf_allocator_; 299 300 SendQueue& send_queue_; 301 }; 302 303 class Writer { 304 public: Writer(Connection & connection)305 Writer(Connection& connection) : connection_(connection) {} 306 307 Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message); 308 Status SendResponseComplete(StreamId stream_id, pw::Status response_code); 309 310 private: 311 Connection& connection_; 312 }; 313 314 class Reader { 315 public: Reader(Connection & connection,RequestCallbacks & callbacks)316 Reader(Connection& connection, RequestCallbacks& callbacks) 317 : connection_(connection), callbacks_(callbacks) {} 318 319 Status ProcessConnectionPreface(); 320 Status ProcessFrame(); 321 322 private: 323 void CloseStream(Stream* stream); 324 325 Status ProcessDataFrame(const internal::FrameHeader&); 326 Status ProcessHeadersFrame(const internal::FrameHeader&); 327 Status ProcessRstStreamFrame(const internal::FrameHeader&); 328 Status ProcessSettingsFrame(const internal::FrameHeader&, bool send_ack); 329 Status ProcessPingFrame(const internal::FrameHeader&); 330 Status ProcessWindowUpdateFrame(const internal::FrameHeader&); 331 Status ProcessIgnoredFrame(const internal::FrameHeader&); 332 Result<ByteSpan> ReadFramePayload(const internal::FrameHeader&); 333 334 // Send GOAWAY frame and signal connection should be closed. 335 void SendGoAway(internal::Http2Error code); 336 Status SendRstStreamAndClose(sync::BorrowedPointer<SharedState>& state, 337 Stream* stream, 338 internal::Http2Error code); 339 340 Connection& connection_; 341 RequestCallbacks& callbacks_; 342 int32_t initial_send_window_ = kDefaultInitialWindowSize; 343 bool received_connection_preface_ = false; 344 345 std::array<std::byte, internal::kMaxFramePayloadSize> payload_scratch_{}; 346 StreamId last_stream_id_ = 0; 347 }; 348 LockState()349 sync::BorrowedPointer<SharedState> LockState() { 350 return shared_state_.acquire(); 351 } 352 UnlockState(sync::BorrowedPointer<SharedState> && state)353 void UnlockState(sync::BorrowedPointer<SharedState>&& state) { 354 sync::BorrowedPointer<SharedState> moved_state = std::move(state); 355 static_cast<void>(moved_state); 356 } 357 358 // Shared state that is thread-safe. 359 stream::ReaderWriter& socket_; 360 361 sync::InlineBorrowable<SharedState> shared_state_; 362 Reader reader_; 363 Writer writer_; 364 }; 365 366 class ConnectionThread : public Connection, public thread::ThreadCore { 367 public: 368 // The ConnectionCloseCallback will be called when this thread is shutting 369 // down and all data has finished sending. It will be called from this 370 // ConnectionThread. 371 using ConnectionCloseCallback = Function<void()>; 372 ConnectionThread(stream::NonSeekableReaderWriter & stream,const thread::Options & send_thread_options,RequestCallbacks & callbacks,ConnectionCloseCallback && connection_close_callback,allocator::Allocator * message_assembly_allocator,multibuf::MultiBufAllocator & multibuf_allocator)373 ConnectionThread(stream::NonSeekableReaderWriter& stream, 374 const thread::Options& send_thread_options, 375 RequestCallbacks& callbacks, 376 ConnectionCloseCallback&& connection_close_callback, 377 allocator::Allocator* message_assembly_allocator, 378 multibuf::MultiBufAllocator& multibuf_allocator) 379 : Connection(stream, 380 send_queue_, 381 callbacks, 382 message_assembly_allocator, 383 multibuf_allocator), 384 send_queue_(stream), 385 send_queue_thread_options_(send_thread_options), 386 connection_close_callback_(std::move(connection_close_callback)) {} 387 388 // Process the connection. Does not return until the connection is closed. Run()389 void Run() override { 390 Thread send_thread(send_queue_thread_options_, send_queue_); 391 Status status = ProcessConnectionPreface(); 392 while (status.ok()) { 393 status = ProcessFrame(); 394 } 395 send_queue_.RequestStop(); 396 send_thread.join(); 397 if (connection_close_callback_) { 398 connection_close_callback_(); 399 } 400 }; 401 402 private: 403 SendQueue send_queue_; 404 const thread::Options& send_queue_thread_options_; 405 ConnectionCloseCallback connection_close_callback_; 406 }; 407 408 } // namespace pw::grpc 409