1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15
16 #include "pw_assert/assert.h"
17 #include "pw_bytes/span.h"
18 #include "pw_rpc_transport/rpc_transport.h"
19 #include "pw_status/status.h"
20 #include "pw_status/try.h"
21 #include "rpc_transport.h"
22
23 namespace pw::rpc {
24
25 // The following encoder and decoder implement a very simple RPC framing
26 // protocol where the first frame contains the total packet size in the header
27 // and up to max frame size bytes in the payload. The subsequent frames of the
28 // same packet have an empty header and the rest of the packet in their payload.
29 //
30 // First frame header also contains a special marker as an attempt to
31 // resynchronize the receiver if some frames were not sent (although we expect
32 // all transports using this framing type to be reliable, it's still possible
33 // that some random transport write timeout result in only the first few frames
34 // being sent and others dropped; in that case we attempt best effort recovery
35 // by effectively skipping the input until we see something that resembles a
36 // valid header).
37 //
38 // Both encoder and decoder are not thread-safe. The caller must ensure their
39 // correct use in a multi-threaded environment.
40
41 namespace internal {
42
43 void LogReceivedRpcPacketTooLarge(size_t packet_size, size_t max_packet_size);
44 void LogMalformedRpcFrameHeader();
45
46 } // namespace internal
47
48 template <size_t kMaxPacketSize>
49 class SimpleRpcPacketEncoder
50 : public RpcPacketEncoder<SimpleRpcPacketEncoder<kMaxPacketSize>> {
51 static_assert(kMaxPacketSize <= 1 << 16);
52
53 public:
54 static constexpr size_t kHeaderSize = 4;
55 static constexpr uint16_t kFrameMarker = 0x27f1;
56
57 // Encodes `packet` with a simple framing protocol and split the resulting
58 // frame into chunks of `RpcFrame`s where every `RpcFrame` is no longer than
59 // `max_frame_size`. Calls `callback` for for each of the resulting
60 // `RpcFrame`s.
Encode(ConstByteSpan rpc_packet,size_t max_frame_size,OnRpcFrameEncodedCallback && callback)61 Status Encode(ConstByteSpan rpc_packet,
62 size_t max_frame_size,
63 OnRpcFrameEncodedCallback&& callback) {
64 if (rpc_packet.size() > kMaxPacketSize) {
65 return Status::FailedPrecondition();
66 }
67 if (max_frame_size <= kHeaderSize) {
68 return Status::FailedPrecondition();
69 }
70
71 // First frame. This is the only frame that contains a header.
72 const auto first_frame_size =
73 std::min(max_frame_size - kHeaderSize, rpc_packet.size());
74
75 std::array<std::byte, kHeaderSize> header{
76 std::byte{kFrameMarker & 0xff},
77 std::byte{(kFrameMarker >> 8) & 0xff},
78 static_cast<std::byte>(rpc_packet.size() & 0xff),
79 static_cast<std::byte>((rpc_packet.size() >> 8) & 0xff),
80 };
81
82 RpcFrame frame{.header = span(header),
83 .payload = rpc_packet.first(first_frame_size)};
84 PW_TRY(callback(frame));
85 auto remaining = rpc_packet.subspan(first_frame_size);
86
87 // Second and subsequent frames (if any).
88 while (!remaining.empty()) {
89 auto fragment_size = std::min(max_frame_size, remaining.size());
90 RpcFrame next_frame{.header = {},
91 .payload = remaining.first(fragment_size)};
92 PW_TRY(callback(next_frame));
93 remaining = remaining.subspan(fragment_size);
94 }
95
96 return OkStatus();
97 }
98 };
99
100 template <size_t kMaxPacketSize>
101 class SimpleRpcPacketDecoder
102 : public RpcPacketDecoder<SimpleRpcPacketDecoder<kMaxPacketSize>> {
103 using Encoder = SimpleRpcPacketEncoder<kMaxPacketSize>;
104
105 public:
SimpleRpcPacketDecoder()106 SimpleRpcPacketDecoder() { ExpectHeader(); }
107
108 // Find and decodes `RpcFrame`s in `buffer`. `buffer` may contain zero or
109 // more frames for zero or more packets. Calls `callback` for each
110 // well-formed packet. Malformed packets are ignored and dropped.
Decode(ConstByteSpan buffer,OnRpcPacketDecodedCallback && callback)111 Status Decode(ConstByteSpan buffer, OnRpcPacketDecodedCallback&& callback) {
112 while (!buffer.empty()) {
113 switch (state_) {
114 case State::kReadingHeader: {
115 buffer = buffer.subspan(ReadHeader(buffer));
116 break;
117 }
118 case State::kReadingPayload: {
119 // Payload can only follow a valid header, reset the flag here so
120 // that next invalid header logs again.
121 already_logged_invalid_header_ = false;
122 buffer = buffer.subspan(ReadPayload(buffer, callback));
123 break;
124 }
125 }
126 }
127 return OkStatus();
128 }
129
130 private:
131 enum class State {
132 kReadingHeader,
133 kReadingPayload,
134 };
135
136 size_t ReadHeader(ConstByteSpan buffer);
137
138 size_t ReadPayload(ConstByteSpan buffer,
139 const OnRpcPacketDecodedCallback& callback);
140
ExpectHeader()141 void ExpectHeader() {
142 state_ = State::kReadingHeader;
143 bytes_read_ = 0;
144 bytes_remaining_ = Encoder::kHeaderSize;
145 }
146
ExpectPayload(size_t size)147 void ExpectPayload(size_t size) {
148 state_ = State::kReadingPayload;
149 bytes_read_ = 0;
150 bytes_remaining_ = size;
151 }
152
153 std::array<std::byte, kMaxPacketSize> packet_{};
154 std::array<std::byte, Encoder::kHeaderSize> header_{};
155
156 // Current decoder state.
157 State state_;
158 // How many bytes were read in the current state.
159 size_t bytes_read_ = 0;
160 // How many bytes remain to read in the current state.
161 size_t bytes_remaining_ = 0;
162 // When true, discard the received payload instead of buffering it (because
163 // it's too big to buffer).
164 bool discard_payload_ = false;
165 // When true, skip logging on invalid header if we already logged. This is
166 // to prevent logging on every payload byte of a malformed frame.
167 bool already_logged_invalid_header_ = false;
168 };
169
170 template <size_t kMaxPacketSize>
ReadHeader(ConstByteSpan buffer)171 size_t SimpleRpcPacketDecoder<kMaxPacketSize>::ReadHeader(
172 ConstByteSpan buffer) {
173 const auto read_size = std::min(buffer.size(), bytes_remaining_);
174 bool header_available = false;
175 PW_DASSERT(read_size <= Encoder::kHeaderSize);
176
177 std::memcpy(header_.data() + bytes_read_, buffer.data(), read_size);
178 bytes_read_ += read_size;
179 bytes_remaining_ -= read_size;
180 header_available = bytes_remaining_ == 0;
181
182 if (header_available) {
183 uint16_t marker = (static_cast<uint16_t>(header_[1]) << 8) |
184 static_cast<uint16_t>(header_[0]);
185 uint16_t packet_size = (static_cast<uint16_t>(header_[3]) << 8) |
186 static_cast<uint16_t>(header_[2]);
187
188 if (marker != Encoder::kFrameMarker) {
189 // We expected a header but received some data that is definitely not
190 // a header. Skip it and keep waiting for the next header. This could
191 // also be a false positive, e.g. in the worst case we treat some
192 // random data as a header: even then we should eventually be able to
193 // stumble upon a real header and start processing packets again.
194 ExpectHeader();
195 // Consume only a single byte since we're looking for a header in a
196 // broken stream and it could start at the next byte.
197 if (!already_logged_invalid_header_) {
198 internal::LogMalformedRpcFrameHeader();
199 already_logged_invalid_header_ = true;
200 }
201 return 1;
202 }
203 if (packet_size > kMaxPacketSize) {
204 // Consume both header and packet without saving it, as it's too big
205 // for the buffer. This is likely due to max packet size mismatch
206 // between the encoder and the decoder.
207 internal::LogReceivedRpcPacketTooLarge(packet_size, kMaxPacketSize);
208 discard_payload_ = true;
209 }
210 ExpectPayload(packet_size);
211 }
212
213 return read_size;
214 }
215
216 template <size_t kMaxPacketSize>
ReadPayload(ConstByteSpan buffer,const OnRpcPacketDecodedCallback & callback)217 size_t SimpleRpcPacketDecoder<kMaxPacketSize>::ReadPayload(
218 ConstByteSpan buffer, const OnRpcPacketDecodedCallback& callback) {
219 if (buffer.size() >= bytes_remaining_ && bytes_read_ == 0) {
220 const auto read_size = bytes_remaining_;
221 // We have the whole packet available in the buffer, no need to copy
222 // it into an internal buffer.
223 callback(buffer.first(read_size));
224 ExpectHeader();
225 return read_size;
226 }
227 // Frame has been split between multiple inputs: assembling it in
228 // an internal buffer.
229 const auto read_size = std::min(buffer.size(), bytes_remaining_);
230
231 // We could be discarding the payload if it was too big to fit into our
232 // packet buffer.
233 if (!discard_payload_) {
234 PW_DASSERT(bytes_read_ + read_size <= packet_.size());
235 std::memcpy(packet_.data() + bytes_read_, buffer.data(), read_size);
236 }
237
238 bytes_read_ += read_size;
239 bytes_remaining_ -= read_size;
240 if (bytes_remaining_ == 0) {
241 if (discard_payload_) {
242 discard_payload_ = false;
243 } else {
244 auto packet_span = span(packet_);
245 callback(packet_span.first(bytes_read_));
246 }
247 ExpectHeader();
248 }
249 return read_size;
250 }
251
252 } // namespace pw::rpc
253