• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include "pw_assert/assert.h"
17 #include "pw_bytes/span.h"
18 #include "pw_rpc_transport/rpc_transport.h"
19 #include "pw_status/status.h"
20 #include "pw_status/try.h"
21 #include "rpc_transport.h"
22 
23 namespace pw::rpc {
24 
25 // The following encoder and decoder implement a very simple RPC framing
26 // protocol where the first frame contains the total packet size in the header
27 // and up to max frame size bytes in the payload. The subsequent frames of the
28 // same packet have an empty header and the rest of the packet in their payload.
29 //
30 // First frame header also contains a special marker as an attempt to
31 // resynchronize the receiver if some frames were not sent (although we expect
32 // all transports using this framing type to be reliable, it's still possible
33 // that some random transport write timeout result in only the first few frames
34 // being sent and others dropped; in that case we attempt best effort recovery
35 // by effectively skipping the input until we see something that resembles a
36 // valid header).
37 //
38 // Both encoder and decoder are not thread-safe. The caller must ensure their
39 // correct use in a multi-threaded environment.
40 
41 namespace internal {
42 
43 void LogReceivedRpcPacketTooLarge(size_t packet_size, size_t max_packet_size);
44 void LogMalformedRpcFrameHeader();
45 
46 }  // namespace internal
47 
48 template <size_t kMaxPacketSize>
49 class SimpleRpcPacketEncoder
50     : public RpcPacketEncoder<SimpleRpcPacketEncoder<kMaxPacketSize>> {
51   static_assert(kMaxPacketSize <= 1 << 16);
52 
53  public:
54   static constexpr size_t kHeaderSize = 4;
55   static constexpr uint16_t kFrameMarker = 0x27f1;
56 
57   // Encodes `packet` with a simple framing protocol and split the resulting
58   // frame into chunks of `RpcFrame`s where every `RpcFrame` is no longer than
59   // `max_frame_size`. Calls `callback` for for each of the resulting
60   // `RpcFrame`s.
Encode(ConstByteSpan rpc_packet,size_t max_frame_size,OnRpcFrameEncodedCallback && callback)61   Status Encode(ConstByteSpan rpc_packet,
62                 size_t max_frame_size,
63                 OnRpcFrameEncodedCallback&& callback) {
64     if (rpc_packet.size() > kMaxPacketSize) {
65       return Status::FailedPrecondition();
66     }
67     if (max_frame_size <= kHeaderSize) {
68       return Status::FailedPrecondition();
69     }
70 
71     // First frame. This is the only frame that contains a header.
72     const auto first_frame_size =
73         std::min(max_frame_size - kHeaderSize, rpc_packet.size());
74 
75     std::array<std::byte, kHeaderSize> header{
76         std::byte{kFrameMarker & 0xff},
77         std::byte{(kFrameMarker >> 8) & 0xff},
78         static_cast<std::byte>(rpc_packet.size() & 0xff),
79         static_cast<std::byte>((rpc_packet.size() >> 8) & 0xff),
80     };
81 
82     RpcFrame frame{.header = span(header),
83                    .payload = rpc_packet.first(first_frame_size)};
84     PW_TRY(callback(frame));
85     auto remaining = rpc_packet.subspan(first_frame_size);
86 
87     // Second and subsequent frames (if any).
88     while (!remaining.empty()) {
89       auto fragment_size = std::min(max_frame_size, remaining.size());
90       RpcFrame next_frame{.header = {},
91                           .payload = remaining.first(fragment_size)};
92       PW_TRY(callback(next_frame));
93       remaining = remaining.subspan(fragment_size);
94     }
95 
96     return OkStatus();
97   }
98 };
99 
100 template <size_t kMaxPacketSize>
101 class SimpleRpcPacketDecoder
102     : public RpcPacketDecoder<SimpleRpcPacketDecoder<kMaxPacketSize>> {
103   using Encoder = SimpleRpcPacketEncoder<kMaxPacketSize>;
104 
105  public:
SimpleRpcPacketDecoder()106   SimpleRpcPacketDecoder() { ExpectHeader(); }
107 
108   // Find and decodes `RpcFrame`s in `buffer`. `buffer` may contain zero or
109   // more frames for zero or more packets. Calls `callback` for each
110   // well-formed packet. Malformed packets are ignored and dropped.
Decode(ConstByteSpan buffer,OnRpcPacketDecodedCallback && callback)111   Status Decode(ConstByteSpan buffer, OnRpcPacketDecodedCallback&& callback) {
112     while (!buffer.empty()) {
113       switch (state_) {
114         case State::kReadingHeader: {
115           buffer = buffer.subspan(ReadHeader(buffer));
116           break;
117         }
118         case State::kReadingPayload: {
119           // Payload can only follow a valid header, reset the flag here so
120           // that next invalid header logs again.
121           already_logged_invalid_header_ = false;
122           buffer = buffer.subspan(ReadPayload(buffer, callback));
123           break;
124         }
125       }
126     }
127     return OkStatus();
128   }
129 
130  private:
131   enum class State {
132     kReadingHeader,
133     kReadingPayload,
134   };
135 
136   size_t ReadHeader(ConstByteSpan buffer);
137 
138   size_t ReadPayload(ConstByteSpan buffer,
139                      const OnRpcPacketDecodedCallback& callback);
140 
ExpectHeader()141   void ExpectHeader() {
142     state_ = State::kReadingHeader;
143     bytes_read_ = 0;
144     bytes_remaining_ = Encoder::kHeaderSize;
145   }
146 
ExpectPayload(size_t size)147   void ExpectPayload(size_t size) {
148     state_ = State::kReadingPayload;
149     bytes_read_ = 0;
150     bytes_remaining_ = size;
151   }
152 
153   std::array<std::byte, kMaxPacketSize> packet_{};
154   std::array<std::byte, Encoder::kHeaderSize> header_{};
155 
156   // Current decoder state.
157   State state_;
158   // How many bytes were read in the current state.
159   size_t bytes_read_ = 0;
160   // How many bytes remain to read in the current state.
161   size_t bytes_remaining_ = 0;
162   // When true, discard the received payload instead of buffering it (because
163   // it's too big to buffer).
164   bool discard_payload_ = false;
165   // When true, skip logging on invalid header if we already logged. This is
166   // to prevent logging on every payload byte of a malformed frame.
167   bool already_logged_invalid_header_ = false;
168 };
169 
170 template <size_t kMaxPacketSize>
ReadHeader(ConstByteSpan buffer)171 size_t SimpleRpcPacketDecoder<kMaxPacketSize>::ReadHeader(
172     ConstByteSpan buffer) {
173   const auto read_size = std::min(buffer.size(), bytes_remaining_);
174   bool header_available = false;
175   PW_DASSERT(read_size <= Encoder::kHeaderSize);
176 
177   std::memcpy(header_.data() + bytes_read_, buffer.data(), read_size);
178   bytes_read_ += read_size;
179   bytes_remaining_ -= read_size;
180   header_available = bytes_remaining_ == 0;
181 
182   if (header_available) {
183     uint16_t marker = (static_cast<uint16_t>(header_[1]) << 8) |
184                       static_cast<uint16_t>(header_[0]);
185     uint16_t packet_size = (static_cast<uint16_t>(header_[3]) << 8) |
186                            static_cast<uint16_t>(header_[2]);
187 
188     if (marker != Encoder::kFrameMarker) {
189       // We expected a header but received some data that is definitely not
190       // a header. Skip it and keep waiting for the next header. This could
191       // also be a false positive, e.g. in the worst case we treat some
192       // random data as a header: even then we should eventually be able to
193       // stumble upon a real header and start processing packets again.
194       ExpectHeader();
195       // Consume only a single byte since we're looking for a header in a
196       // broken stream and it could start at the next byte.
197       if (!already_logged_invalid_header_) {
198         internal::LogMalformedRpcFrameHeader();
199         already_logged_invalid_header_ = true;
200       }
201       return 1;
202     }
203     if (packet_size > kMaxPacketSize) {
204       // Consume both header and packet without saving it, as it's too big
205       // for the buffer. This is likely due to max packet size mismatch
206       // between the encoder and the decoder.
207       internal::LogReceivedRpcPacketTooLarge(packet_size, kMaxPacketSize);
208       discard_payload_ = true;
209     }
210     ExpectPayload(packet_size);
211   }
212 
213   return read_size;
214 }
215 
216 template <size_t kMaxPacketSize>
ReadPayload(ConstByteSpan buffer,const OnRpcPacketDecodedCallback & callback)217 size_t SimpleRpcPacketDecoder<kMaxPacketSize>::ReadPayload(
218     ConstByteSpan buffer, const OnRpcPacketDecodedCallback& callback) {
219   if (buffer.size() >= bytes_remaining_ && bytes_read_ == 0) {
220     const auto read_size = bytes_remaining_;
221     // We have the whole packet available in the buffer, no need to copy
222     // it into an internal buffer.
223     callback(buffer.first(read_size));
224     ExpectHeader();
225     return read_size;
226   }
227   // Frame has been split between multiple inputs: assembling it in
228   // an internal buffer.
229   const auto read_size = std::min(buffer.size(), bytes_remaining_);
230 
231   // We could be discarding the payload if it was too big to fit into our
232   // packet buffer.
233   if (!discard_payload_) {
234     PW_DASSERT(bytes_read_ + read_size <= packet_.size());
235     std::memcpy(packet_.data() + bytes_read_, buffer.data(), read_size);
236   }
237 
238   bytes_read_ += read_size;
239   bytes_remaining_ -= read_size;
240   if (bytes_remaining_ == 0) {
241     if (discard_payload_) {
242       discard_payload_ = false;
243     } else {
244       auto packet_span = span(packet_);
245       callback(packet_span.first(bytes_read_));
246     }
247     ExpectHeader();
248   }
249   return read_size;
250 }
251 
252 }  // namespace pw::rpc
253