1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_hdlc/router.h"
16
17 #include <inttypes.h>
18
19 #include <algorithm>
20
21 #include "pw_hdlc/encoder.h"
22 #include "pw_log/log.h"
23 #include "pw_multibuf/multibuf.h"
24 #include "pw_multibuf/stream.h"
25 #include "pw_result/result.h"
26 #include "pw_stream/null_stream.h"
27
28 namespace pw::hdlc {
29
30 using ::pw::async2::Context;
31 using ::pw::async2::Pending;
32 using ::pw::async2::Poll;
33 using ::pw::async2::Ready;
34 using ::pw::channel::ByteReaderWriter;
35 using ::pw::channel::DatagramReaderWriter;
36 using ::pw::multibuf::Chunk;
37 using ::pw::multibuf::MultiBuf;
38 using ::pw::stream::CountingNullStream;
39
40 namespace {
41
42 /// HDLC encodes the contents of ``payload`` to ``writer``.
WriteMultiBufUIFrame(uint64_t address,const MultiBuf & payload,stream::Writer & writer)43 Status WriteMultiBufUIFrame(uint64_t address,
44 const MultiBuf& payload,
45 stream::Writer& writer) {
46 Encoder encoder(writer);
47 if (Status status = encoder.StartUnnumberedFrame(address); !status.ok()) {
48 return status;
49 }
50 for (const Chunk& chunk : payload.Chunks()) {
51 if (Status status = encoder.WriteData(chunk); !status.ok()) {
52 return status;
53 }
54 }
55 return encoder.FinishFrame();
56 }
57
58 /// Calculates the size of ``payload`` once HDLC-encoded.
CalculateSizeOnceEncoded(uint64_t address,const MultiBuf & payload)59 Result<size_t> CalculateSizeOnceEncoded(uint64_t address,
60 const MultiBuf& payload) {
61 CountingNullStream null_stream;
62 Status encode_status = WriteMultiBufUIFrame(address, payload, null_stream);
63 if (!encode_status.ok()) {
64 return encode_status;
65 }
66 return null_stream.bytes_written();
67 }
68
69 /// Attempts to decode a frame from ``data``, advancing ``data`` forwards by
70 /// any bytes that are consumed.
DecodeFrame(Decoder & decoder,MultiBuf & data)71 std::optional<Frame> DecodeFrame(Decoder& decoder, MultiBuf& data) {
72 size_t processed = 0;
73 for (std::byte byte : data) {
74 Result<Frame> frame_result = decoder.Process(byte);
75 ++processed;
76 if (frame_result.status().IsUnavailable()) {
77 // No frame is yet available.
78 } else if (frame_result.ok()) {
79 data.DiscardPrefix(processed);
80 return std::move(*frame_result);
81 } else if (frame_result.status().IsDataLoss()) {
82 PW_LOG_ERROR("Discarding invalid incoming HDLC frame.");
83 } else if (frame_result.status().IsResourceExhausted()) {
84 PW_LOG_ERROR("Discarding incoming HDLC frame: too large for buffer.");
85 }
86 }
87 data.DiscardPrefix(processed);
88 return std::nullopt;
89 }
90
91 } // namespace
92
AddChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)93 Status Router::AddChannel(DatagramReaderWriter& channel,
94 uint64_t receive_address,
95 uint64_t send_address) {
96 for (const ChannelData& data : channel_datas_) {
97 if ((data.channel == &channel) ||
98 (data.receive_address == receive_address) ||
99 (data.send_address == send_address)) {
100 return Status::AlreadyExists();
101 }
102 }
103 channel_datas_.emplace_back(channel, receive_address, send_address);
104 return OkStatus();
105 }
106
RemoveChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)107 Status Router::RemoveChannel(DatagramReaderWriter& channel,
108 uint64_t receive_address,
109 uint64_t send_address) {
110 auto channel_entry = std::find_if(
111 channel_datas_.begin(),
112 channel_datas_.end(),
113 [&channel, receive_address, send_address](const ChannelData& data) {
114 return (data.channel == &channel) &&
115 (data.receive_address == receive_address) &&
116 (data.send_address == send_address);
117 });
118 if (channel_entry == channel_datas_.end()) {
119 return Status::NotFound();
120 }
121 if (channel_datas_.size() == 1) {
122 channel_datas_.clear();
123 } else {
124 // Put the ChannelData in the back of
125 // the list and pop it out to avoid shifting
126 // all elements.
127 std::swap(*channel_entry, channel_datas_.back());
128 channel_datas_.pop_back();
129 }
130 return OkStatus();
131 }
132
FindChannelForReceiveAddress(uint64_t receive_address)133 Router::ChannelData* Router::FindChannelForReceiveAddress(
134 uint64_t receive_address) {
135 for (auto& channel : channel_datas_) {
136 if (channel.receive_address == receive_address) {
137 return &channel;
138 }
139 }
140 return nullptr;
141 }
142
PollDeliverIncomingFrame(Context & cx,const Frame & frame)143 Poll<> Router::PollDeliverIncomingFrame(Context& cx, const Frame& frame) {
144 ConstByteSpan data = frame.data();
145 uint64_t address = frame.address();
146 ChannelData* channel = FindChannelForReceiveAddress(address);
147 if (channel == nullptr) {
148 PW_LOG_ERROR("Received incoming HDLC packet with address %" PRIu64
149 ", but no channel with that incoming address is registered.",
150 address);
151 incoming_allocation_future_ = std::nullopt;
152 return Ready();
153 }
154 Poll<Status> ready_to_write = channel->channel->PendReadyToWrite(cx);
155 if (ready_to_write.IsPending()) {
156 return Pending();
157 }
158 if (!ready_to_write->ok()) {
159 PW_LOG_ERROR("Channel at incoming HDLC address %" PRIu64
160 " became unwriteable. Status: %d",
161 channel->receive_address,
162 ready_to_write->code());
163 return Ready();
164 }
165 if (!incoming_allocation_future_.has_value()) {
166 incoming_allocation_future_ =
167 channel->channel->GetWriteAllocator().AllocateAsync(data.size());
168 }
169 Poll<std::optional<MultiBuf>> buffer = incoming_allocation_future_->Pend(cx);
170 if (buffer.IsPending()) {
171 return Pending();
172 }
173 incoming_allocation_future_ = std::nullopt;
174 if (!buffer->has_value()) {
175 PW_LOG_ERROR(
176 "Unable to allocate a buffer of size %zu destined for incoming "
177 "HDLC address %" PRIu64 ". Packet will be discarded.",
178 data.size(),
179 frame.address());
180 return Ready();
181 }
182 std::copy(frame.data().begin(), frame.data().end(), (**buffer).begin());
183 Status write_status = channel->channel->Write(std::move(**buffer)).status();
184 if (!write_status.ok()) {
185 PW_LOG_ERROR(
186 "Failed to write a buffer of size %zu destined for incoming HDLC "
187 "address %" PRIu64 ". Status: %d",
188 data.size(),
189 channel->receive_address,
190 write_status.code());
191 }
192 return Ready();
193 }
194
DecodeAndWriteIncoming(Context & cx)195 void Router::DecodeAndWriteIncoming(Context& cx) {
196 while (true) {
197 if (decoded_frame_.has_value()) {
198 if (PollDeliverIncomingFrame(cx, *decoded_frame_).IsPending()) {
199 return;
200 }
201 // Zero out the frame delivery state.
202 decoded_frame_ = std::nullopt;
203 }
204
205 while (incoming_data_.empty()) {
206 Poll<Result<MultiBuf>> incoming = io_channel_.PendRead(cx);
207 if (incoming.IsPending()) {
208 return;
209 }
210 if (!incoming->ok()) {
211 if (incoming->status().IsFailedPrecondition()) {
212 PW_LOG_WARN("HDLC io_channel has closed.");
213 } else {
214 PW_LOG_ERROR("Unable to read from HDLC io_channel. Status: %d",
215 incoming->status().code());
216 }
217 return;
218 }
219 incoming_data_ = std::move(**incoming);
220 }
221
222 decoded_frame_ = DecodeFrame(decoder_, incoming_data_);
223 }
224 }
225
TryFillBufferToEncodeAndSend(Context & cx)226 void Router::TryFillBufferToEncodeAndSend(Context& cx) {
227 if (buffer_to_encode_and_send_.has_value()) {
228 return;
229 }
230 for (size_t i = 0; i < channel_datas_.size(); ++i) {
231 ChannelData& cd =
232 channel_datas_[(next_first_read_index_ + i) % channel_datas_.size()];
233 Poll<Result<MultiBuf>> buf = cd.channel->PendRead(cx);
234 if (buf.IsPending()) {
235 continue;
236 }
237 if (!buf->ok()) {
238 if (buf->status().IsUnimplemented()) {
239 PW_LOG_ERROR("Channel registered for outgoing HDLC address %" PRIu64
240 " is not readable.",
241 cd.send_address);
242 }
243 // We ignore FAILED_PRECONDITION (closed) because it will be handled
244 // elsewhere. OUT_OF_RANGE just means we have finished writing. No
245 // action is needed because the channel may still be receiving data.
246 continue;
247 }
248 buffer_to_encode_and_send_ = std::move(**buf);
249 address_to_encode_and_send_to_ = cd.send_address;
250 // We received data, so ensure that we start by reading from a different
251 // index next time.
252 next_first_read_index_ =
253 (next_first_read_index_ + 1) % channel_datas_.size();
254 return;
255 }
256 }
257
WriteOutgoingMessages(Context & cx)258 void Router::WriteOutgoingMessages(Context& cx) {
259 while (io_channel_.is_write_open() &&
260 io_channel_.PendReadyToWrite(cx).IsReady()) {
261 TryFillBufferToEncodeAndSend(cx);
262 if (!buffer_to_encode_and_send_.has_value()) {
263 // No channels have new data to send.
264 return;
265 }
266 if (!outgoing_allocation_future_.has_value()) {
267 Result<size_t> encoded_size = CalculateSizeOnceEncoded(
268 address_to_encode_and_send_to_, *buffer_to_encode_and_send_);
269 if (!encoded_size.ok()) {
270 PW_LOG_ERROR(
271 "Unable to compute size of encoded packet for outgoing buffer of "
272 "size %zu destined for outgoing HDLC address %" PRIu64
273 ". Packet will be discarded.",
274 buffer_to_encode_and_send_->size(),
275 address_to_encode_and_send_to_);
276 buffer_to_encode_and_send_ = std::nullopt;
277 continue;
278 }
279 outgoing_allocation_future_ =
280 io_channel_.GetWriteAllocator().AllocateAsync(*encoded_size);
281 }
282 Poll<std::optional<MultiBuf>> maybe_write_buffer =
283 outgoing_allocation_future_->Pend(cx);
284 if (maybe_write_buffer.IsPending()) {
285 // Channel cannot write any further messages until we can allocate.
286 return;
287 }
288 // We've gotten the allocation: discard the future.
289 size_t buffer_size = outgoing_allocation_future_->min_size();
290 outgoing_allocation_future_ = std::nullopt;
291 if (!maybe_write_buffer->has_value()) {
292 // We can't allocate a write buffer large enough for our encoded frame.
293 // Sadly, we have to throw the frame away.
294 PW_LOG_ERROR(
295 "Unable to allocate a buffer of size %zu destined for outgoing "
296 "HDLC address %" PRIu64 ". Packet will be discarded.",
297 buffer_size,
298 address_to_encode_and_send_to_);
299 buffer_to_encode_and_send_ = std::nullopt;
300 continue;
301 }
302 MultiBuf write_buffer = std::move(**maybe_write_buffer);
303 Status encode_status =
304 WriteMultiBufUIFrame(address_to_encode_and_send_to_,
305 *buffer_to_encode_and_send_,
306 pw::multibuf::Stream(write_buffer));
307 buffer_to_encode_and_send_ = std::nullopt;
308 if (!encode_status.ok()) {
309 PW_LOG_ERROR(
310 "Failed to encode a buffer destined for outgoing HDLC address "
311 "%" PRIu64 ". Status: %d",
312 address_to_encode_and_send_to_,
313 encode_status.code());
314 continue;
315 }
316 Status write_status = io_channel_.Write(std::move(write_buffer)).status();
317 if (!write_status.ok()) {
318 PW_LOG_ERROR(
319 "Failed to write a buffer of size %zu destined for outgoing HDLC "
320 "address %" PRIu64 ". Status: %d",
321 buffer_size,
322 address_to_encode_and_send_to_,
323 write_status.code());
324 }
325 }
326 }
327
Pend(Context & cx)328 Poll<> Router::Pend(Context& cx) {
329 // We check for ability to read, but not write, because we may not always
330 // attempt a write, which would cause us to miss that the channel has closed
331 // for writes.
332 //
333 // Additionally, it is uncommon for a channel to remain readable but not
334 // writeable: the reverse is more common (still readable while no longer
335 // writeable).
336 if (!io_channel_.is_read_open()) {
337 return PendClose(cx);
338 }
339 DecodeAndWriteIncoming(cx);
340 WriteOutgoingMessages(cx);
341 RemoveClosedChannels();
342 if (!io_channel_.is_read_open()) {
343 return PendClose(cx);
344 }
345 return Pending();
346 }
347
PendClose(Context & cx)348 Poll<> Router::PendClose(Context& cx) {
349 for (ChannelData& cd : channel_datas_) {
350 // We ignore the status value from close.
351 // If one or more channels are unable to close, they will remain after
352 // `RemoveClosedChannels` and `channel_datas_.size()` will be nonzero.
353 cd.channel->PendClose(cx).IgnorePoll();
354 }
355 RemoveClosedChannels();
356 if (io_channel_.PendClose(cx).IsPending()) {
357 return Pending();
358 }
359 if (channel_datas_.empty()) {
360 return Ready();
361 } else {
362 return Pending();
363 }
364 }
365
RemoveClosedChannels()366 void Router::RemoveClosedChannels() {
367 auto first_to_remove = std::remove_if(
368 channel_datas_.begin(), channel_datas_.end(), [](const ChannelData& cd) {
369 return !cd.channel->is_read_or_write_open();
370 });
371 channel_datas_.erase(first_to_remove, channel_datas_.end());
372 }
373
374 } // namespace pw::hdlc
375