• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <array>
17 #include <cstdint>
18 
19 #include "pw_allocator/allocator.h"
20 #include "pw_bytes/byte_builder.h"
21 #include "pw_bytes/span.h"
22 #include "pw_function/function.h"
23 #include "pw_grpc/send_queue.h"
24 #include "pw_multibuf/allocator.h"
25 #include "pw_multibuf/multibuf.h"
26 #include "pw_result/result.h"
27 #include "pw_status/status.h"
28 #include "pw_stream/stream.h"
29 #include "pw_string/string.h"
30 #include "pw_sync/inline_borrowable.h"
31 #include "pw_thread/thread.h"
32 #include "pw_thread/thread_core.h"
33 
34 namespace pw::grpc {
35 namespace internal {
36 
37 struct FrameHeader;
38 enum class Http2Error : uint32_t;
39 
40 // Parameters of this implementation.
41 // RFC 9113 §5.1.2
42 inline constexpr uint32_t kMaxConcurrentStreams = 16;
43 
44 // RFC 9113 §4.2 and §6.5.2
45 inline constexpr uint32_t kMaxFramePayloadSize = 16384;
46 
47 // Limits on grpc message sizes. The length prefix includes the compressed byte
48 // and 32-bit length from Length-Prefixed-Message.
49 // See: https://github.com/grpc/grpc/blob/v1.60.x/doc/PROTOCOL-HTTP2.md.
50 inline constexpr uint32_t kMaxGrpcMessageSizeWithLengthPrefix =
51     kMaxFramePayloadSize;
52 inline constexpr uint32_t kMaxGrpcMessageSize =
53     kMaxGrpcMessageSizeWithLengthPrefix - 5;
54 
55 }  // namespace internal
56 
57 // RFC 9113 §5.1.1: Streams are identified by unsigned 31-bit integers.
58 using StreamId = uint32_t;
59 
60 inline constexpr uint32_t kMaxMethodNameSize = 127;
61 
62 // Implements a gRPC over HTTP2 server.
63 //
64 // Basic usage:
65 // * Provide a Connection::RequestCallbacks implementation that handles RPC
66 //   events.
67 // * Provide a readable/writeable stream object that will be used like a
68 //   socket over which the HTTP2 frames are read/written. When the underlying
69 //   stream should be closed, the provided connection_close_callback will be
70 //   called.
71 // * Drive the connection by calling ProcessConnectionPreface then ProcessFrame
72 //   in a loop while status is Ok on one thread.
73 // * RPC responses can be sent from any thread by calling
74 //   SendResponseMessage/SendResponseComplete. The SendQueue object will
75 //   handle concurrent access.
76 //
77 // One thread should be dedicated to driving reads (ProcessFrame calls), while
78 // another thread (implemented by SendQueue) handles all writes. Refer to
79 // the ConnectionThread class for an implementation of this.
80 //
81 // By default, each gRPC message must be entirely contained within a single
82 // HTTP2 DATA frame, as supporting fragmented messages requires buffering
83 // up to the maximum message size per stream. To support fragmented messages,
84 // provide a message_assembly_allocator, which will be used to allocate
85 // temporary storage for fragmented gRPC messages when required. If no
86 // allocator is provided, or allocation fails, the stream will be closed.
87 class Connection {
88  public:
89   // Callbacks invoked on requests from the client. Called on same thread as
90   // ProcessFrame is being called on.
91   class RequestCallbacks {
92    public:
93     virtual ~RequestCallbacks() = default;
94 
95     // Called on startup of connection.
96     virtual void OnNewConnection() = 0;
97 
98     // Called on a new RPC. full_method_name is "<ServiceName>/<MethodName>".
99     // This is guaranteed to be called before any other method with the same id.
100     virtual Status OnNew(StreamId id,
101                          InlineString<kMaxMethodNameSize> full_method_name) = 0;
102 
103     // Called on a new request message for an RPC. The `message` must not be
104     // accessed after this method returns.
105     //
106     // Return an error status to cause the stream to be closed with RST_STREAM
107     // frame.
108     virtual Status OnMessage(StreamId id, ByteSpan message) = 0;
109 
110     // Called after the client has sent all request messages for an RPC.
111     virtual void OnHalfClose(StreamId id) = 0;
112 
113     // Called when an RPC has been canceled.
114     virtual void OnCancel(StreamId id) = 0;
115   };
116 
117   Connection(stream::ReaderWriter& socket,
118              SendQueue& send_queue,
119              RequestCallbacks& callbacks,
120              allocator::Allocator* message_assembly_allocator,
121              multibuf::MultiBufAllocator& multibuf_allocator);
122 
123   // Reads from stream and processes required connection preface frames. Should
124   // be called before ProcessFrame(). Return OK if connection preface was found.
ProcessConnectionPreface()125   Status ProcessConnectionPreface() {
126     return reader_.ProcessConnectionPreface();
127   }
128 
129   // Reads from stream and processes next frame on connection. Returns OK
130   // as long as connection is open. Should be called from a single thread.
ProcessFrame()131   Status ProcessFrame() { return reader_.ProcessFrame(); }
132 
133   // Sends a response message for an RPC. The `message` will not be accessed
134   // after this method returns. Thread safe.
135   //
136   // Errors are:
137   //
138   // * NOT_FOUND if stream_id does not reference an active stream, including
139   //   RPCs that have already completed and IDs that do not refer to any prior
140   //   RPC.
141   // * RESOURCE_EXHAUSTED if the flow control window is not large enough to send
142   //   this RPC immediately. In this case, no response will be send.
143   // * UNAVAILABLE if the connection is closed.
SendResponseMessage(StreamId stream_id,pw::ConstByteSpan message)144   Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message) {
145     return writer_.SendResponseMessage(stream_id, message);
146   }
147 
148   // Completes an RPC with the given status code. Thread safe. Pigweed status
149   // codes happen to align exactly with grpc status codes. Compare:
150   // https://grpc.github.io/grpc/core/md_doc_statuscodes.html
151   // https://pigweed.dev/pw_status/#quick-reference
152   //
153   // Errors are:
154   //
155   // * NOT_FOUND if stream_id does not reference an active stream, including
156   //   RPCs that have already completed, or if stream_id does not refer to any
157   //   prior RPC.
158   // * UNAVAILABLE if the connection is closed.
SendResponseComplete(StreamId stream_id,pw::Status response_code)159   Status SendResponseComplete(StreamId stream_id, pw::Status response_code) {
160     return writer_.SendResponseComplete(stream_id, response_code);
161   }
162 
163  private:
164   // RFC 9113 §6.9.2. Flow control windows are unsigned 31-bit numbers, but
165   // because of the following requirement from §6.9.2, we track flow control
166   // windows with signed integers. "A change to SETTINGS_INITIAL_WINDOW_SIZE can
167   // cause the available space in a flow-control window to become negative. A
168   // sender MUST track the negative flow-control window ..."
169   static inline constexpr int32_t kDefaultInitialWindowSize = 65535;
170 
171   // From RFC 9113 §5.1, we use only the following states:
172   // * idle, which have `id > last_stream_id_`
173   // * open, which are in `streams_` with `half_closed = false`
174   // * half-closed (remote), which are in `streams_` with `half_closed = true`
175   //
176   // Regarding other states:
177   // * reserved is ignored because we do not sent PUSH_PROMISE
178   // * half-closed (local) is merged into close, because once a grpc server has
179   //   sent a response, the RPC is complete
180   struct Stream {
181     StreamId id;
182     bool half_closed;
183     bool started_response;
184     int32_t send_window;
185 
186     // Response messages that are waiting for window to send.
187     multibuf::MultiBuf response_queue;
188 
189     // Fragmented gRPC message assembly, nullptr if not assembling a message.
190     std::byte* assembly_buffer;
191     union {
192       struct {
193         // Buffer for the length-prefix, if fragmented.
194         std::array<std::byte, 5> prefix_buffer;
195         // Bytes of the prefix received so far.
196         uint8_t prefix_received;
197       };
198       struct {
199         // Total length of the message.
200         uint32_t message_length;
201         // Length of the message received so far (during assembly).
202         uint32_t message_received;
203       };
204     };
205 
ResetStream206     void Reset() {
207       id = 0;
208       half_closed = false;
209       started_response = false;
210       send_window = 0;
211       response_queue = {};
212 
213       assembly_buffer = nullptr;
214       message_length = 0;
215       message_received = 0;
216       prefix_received = 0;
217     }
218   };
219 
220   // Internal state is divided into what is needed for reading/writing/shared to
221   // both.
222 
223   class SharedState {
224    public:
SharedState(allocator::Allocator * message_assembly_allocator,multibuf::MultiBufAllocator & multibuf_allocator,SendQueue & send_queue)225     SharedState(allocator::Allocator* message_assembly_allocator,
226                 multibuf::MultiBufAllocator& multibuf_allocator,
227                 SendQueue& send_queue)
228         : message_assembly_allocator_(message_assembly_allocator),
229           multibuf_allocator_(multibuf_allocator),
230           send_queue_(send_queue) {}
231 
232     // Create stream if space available.
233     pw::Status CreateStream(StreamId id, int32_t initial_send_window);
234 
235     // Update stream with `id` with new send window delta.
236     Status AddStreamSendWindow(StreamId id, int32_t delta);
237     // Update all stream with new send window delta.
238     Status AddAllStreamsSendWindow(int32_t delta);
239     // Update connection send window with new delta.
240     Status AddConnectionSendWindow(int32_t delta);
241 
242     // Returns nullptr if stream not found. Note that a reference to locked
243     // SharedState should be retained while using the returned Stream*.
244     Stream* LookupStream(StreamId id);
245 
246     void ForAllStreams(Function<void(Stream*)>&& callback);
247 
248     // Queue response buffer for sending on `id` stream. Will send right away if
249     // window is available.
250     Status QueueStreamResponse(StreamId id, multibuf::MultiBuf&& buffer);
251 
252     // Write raw bytes directly to send queue.
253     Status SendBytes(ConstByteSpan message);
254 
255     // Construct and write header message directly to send queue.
256     Status SendHeaders(StreamId stream_id,
257                        ConstByteSpan payload1,
258                        ConstByteSpan payload2,
259                        bool end_stream);
260 
261     // Frame send functions.
262     Status SendRstStream(StreamId stream_id, internal::Http2Error code);
263     Status SendWindowUpdates(StreamId stream_id, uint32_t increment);
264     Status SendSettingsAck();
265 
message_assembly_allocator()266     allocator::Allocator* message_assembly_allocator() {
267       return message_assembly_allocator_;
268     }
269 
multibuf_allocator()270     multibuf::MultiBufAllocator& multibuf_allocator() {
271       return multibuf_allocator_;
272     }
273 
connection_send_window()274     int32_t connection_send_window() const { return connection_send_window_; }
275 
276    private:
277     // Called whenever there is new data to send or a WINDOW_UPDATE message has
278     // increased a send window. Should attempt to drain any queued data across
279     // all active streams.
280     Status DrainResponseQueues();
281 
282     Status DrainResponseQueue(Stream& stream);
283 
284     Status SendQueued(Stream& stream, multibuf::OwnedChunk&& chunk);
285 
286     // Write DATA frame to send queue. Chunk should already have prefix space
287     // for headers.
288     Status SendData(StreamId stream_id, multibuf::OwnedChunk&& chunk);
289 
290     // Stream state
291     std::array<Stream, internal::kMaxConcurrentStreams> streams_{};
292     int32_t connection_send_window_ = kDefaultInitialWindowSize;
293 
294     // Allocator for fragmented grpc message reassembly
295     allocator::Allocator* message_assembly_allocator_;
296 
297     // Allocator for creating send buffers to queue.
298     multibuf::MultiBufAllocator& multibuf_allocator_;
299 
300     SendQueue& send_queue_;
301   };
302 
303   class Writer {
304    public:
Writer(Connection & connection)305     Writer(Connection& connection) : connection_(connection) {}
306 
307     Status SendResponseMessage(StreamId stream_id, pw::ConstByteSpan message);
308     Status SendResponseComplete(StreamId stream_id, pw::Status response_code);
309 
310    private:
311     Connection& connection_;
312   };
313 
314   class Reader {
315    public:
Reader(Connection & connection,RequestCallbacks & callbacks)316     Reader(Connection& connection, RequestCallbacks& callbacks)
317         : connection_(connection), callbacks_(callbacks) {}
318 
319     Status ProcessConnectionPreface();
320     Status ProcessFrame();
321 
322    private:
323     void CloseStream(Stream* stream);
324 
325     Status ProcessDataFrame(const internal::FrameHeader&);
326     Status ProcessHeadersFrame(const internal::FrameHeader&);
327     Status ProcessRstStreamFrame(const internal::FrameHeader&);
328     Status ProcessSettingsFrame(const internal::FrameHeader&, bool send_ack);
329     Status ProcessPingFrame(const internal::FrameHeader&);
330     Status ProcessWindowUpdateFrame(const internal::FrameHeader&);
331     Status ProcessIgnoredFrame(const internal::FrameHeader&);
332     Result<ByteSpan> ReadFramePayload(const internal::FrameHeader&);
333 
334     // Send GOAWAY frame and signal connection should be closed.
335     void SendGoAway(internal::Http2Error code);
336     Status SendRstStreamAndClose(sync::BorrowedPointer<SharedState>& state,
337                                  Stream* stream,
338                                  internal::Http2Error code);
339 
340     Connection& connection_;
341     RequestCallbacks& callbacks_;
342     int32_t initial_send_window_ = kDefaultInitialWindowSize;
343     bool received_connection_preface_ = false;
344 
345     std::array<std::byte, internal::kMaxFramePayloadSize> payload_scratch_{};
346     StreamId last_stream_id_ = 0;
347   };
348 
LockState()349   sync::BorrowedPointer<SharedState> LockState() {
350     return shared_state_.acquire();
351   }
352 
UnlockState(sync::BorrowedPointer<SharedState> && state)353   void UnlockState(sync::BorrowedPointer<SharedState>&& state) {
354     sync::BorrowedPointer<SharedState> moved_state = std::move(state);
355     static_cast<void>(moved_state);
356   }
357 
358   // Shared state that is thread-safe.
359   stream::ReaderWriter& socket_;
360 
361   sync::InlineBorrowable<SharedState> shared_state_;
362   Reader reader_;
363   Writer writer_;
364 };
365 
366 class ConnectionThread : public Connection, public thread::ThreadCore {
367  public:
368   // The ConnectionCloseCallback will be called when this thread is shutting
369   // down and all data has finished sending. It will be called from this
370   // ConnectionThread.
371   using ConnectionCloseCallback = Function<void()>;
372 
ConnectionThread(stream::NonSeekableReaderWriter & stream,const thread::Options & send_thread_options,RequestCallbacks & callbacks,ConnectionCloseCallback && connection_close_callback,allocator::Allocator * message_assembly_allocator,multibuf::MultiBufAllocator & multibuf_allocator)373   ConnectionThread(stream::NonSeekableReaderWriter& stream,
374                    const thread::Options& send_thread_options,
375                    RequestCallbacks& callbacks,
376                    ConnectionCloseCallback&& connection_close_callback,
377                    allocator::Allocator* message_assembly_allocator,
378                    multibuf::MultiBufAllocator& multibuf_allocator)
379       : Connection(stream,
380                    send_queue_,
381                    callbacks,
382                    message_assembly_allocator,
383                    multibuf_allocator),
384         send_queue_(stream),
385         send_queue_thread_options_(send_thread_options),
386         connection_close_callback_(std::move(connection_close_callback)) {}
387 
388   // Process the connection. Does not return until the connection is closed.
Run()389   void Run() override {
390     Thread send_thread(send_queue_thread_options_, send_queue_);
391     Status status = ProcessConnectionPreface();
392     while (status.ok()) {
393       status = ProcessFrame();
394     }
395     send_queue_.RequestStop();
396     send_thread.join();
397     if (connection_close_callback_) {
398       connection_close_callback_();
399     }
400   };
401 
402  private:
403   SendQueue send_queue_;
404   const thread::Options& send_queue_thread_options_;
405   ConnectionCloseCallback connection_close_callback_;
406 };
407 
408 }  // namespace pw::grpc
409