• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_hdlc/router.h"
16 
17 #include <inttypes.h>
18 
19 #include <algorithm>
20 
21 #include "pw_hdlc/encoder.h"
22 #include "pw_log/log.h"
23 #include "pw_multibuf/multibuf.h"
24 #include "pw_multibuf/stream.h"
25 #include "pw_result/result.h"
26 #include "pw_stream/null_stream.h"
27 
28 namespace pw::hdlc {
29 
30 using ::pw::async2::Context;
31 using ::pw::async2::Pending;
32 using ::pw::async2::Poll;
33 using ::pw::async2::Ready;
34 using ::pw::channel::ByteReaderWriter;
35 using ::pw::channel::DatagramReaderWriter;
36 using ::pw::multibuf::Chunk;
37 using ::pw::multibuf::MultiBuf;
38 using ::pw::stream::CountingNullStream;
39 
40 namespace {
41 
42 /// HDLC encodes the contents of ``payload`` to ``writer``.
WriteMultiBufUIFrame(uint64_t address,const MultiBuf & payload,stream::Writer & writer)43 Status WriteMultiBufUIFrame(uint64_t address,
44                             const MultiBuf& payload,
45                             stream::Writer& writer) {
46   Encoder encoder(writer);
47   if (Status status = encoder.StartUnnumberedFrame(address); !status.ok()) {
48     return status;
49   }
50   for (const Chunk& chunk : payload.Chunks()) {
51     if (Status status = encoder.WriteData(chunk); !status.ok()) {
52       return status;
53     }
54   }
55   return encoder.FinishFrame();
56 }
57 
58 /// Calculates the size of ``payload`` once HDLC-encoded.
CalculateSizeOnceEncoded(uint64_t address,const MultiBuf & payload)59 Result<size_t> CalculateSizeOnceEncoded(uint64_t address,
60                                         const MultiBuf& payload) {
61   CountingNullStream null_stream;
62   Status encode_status = WriteMultiBufUIFrame(address, payload, null_stream);
63   if (!encode_status.ok()) {
64     return encode_status;
65   }
66   return null_stream.bytes_written();
67 }
68 
69 /// Attempts to decode a frame from ``data``, advancing ``data`` forwards by
70 /// any bytes that are consumed.
DecodeFrame(Decoder & decoder,MultiBuf & data)71 std::optional<Frame> DecodeFrame(Decoder& decoder, MultiBuf& data) {
72   size_t processed = 0;
73   for (std::byte byte : data) {
74     Result<Frame> frame_result = decoder.Process(byte);
75     ++processed;
76     if (frame_result.status().IsUnavailable()) {
77       // No frame is yet available.
78     } else if (frame_result.ok()) {
79       data.DiscardPrefix(processed);
80       return std::move(*frame_result);
81     } else if (frame_result.status().IsDataLoss()) {
82       PW_LOG_ERROR("Discarding invalid incoming HDLC frame.");
83     } else if (frame_result.status().IsResourceExhausted()) {
84       PW_LOG_ERROR("Discarding incoming HDLC frame: too large for buffer.");
85     }
86   }
87   data.DiscardPrefix(processed);
88   return std::nullopt;
89 }
90 
91 }  // namespace
92 
AddChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)93 Status Router::AddChannel(DatagramReaderWriter& channel,
94                           uint64_t receive_address,
95                           uint64_t send_address) {
96   for (const ChannelData& data : channel_datas_) {
97     if ((data.channel == &channel) ||
98         (data.receive_address == receive_address) ||
99         (data.send_address == send_address)) {
100       return Status::AlreadyExists();
101     }
102   }
103   channel_datas_.emplace_back(channel, receive_address, send_address);
104   return OkStatus();
105 }
106 
RemoveChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)107 Status Router::RemoveChannel(DatagramReaderWriter& channel,
108                              uint64_t receive_address,
109                              uint64_t send_address) {
110   auto channel_entry = std::find_if(
111       channel_datas_.begin(),
112       channel_datas_.end(),
113       [&channel, receive_address, send_address](const ChannelData& data) {
114         return (data.channel == &channel) &&
115                (data.receive_address == receive_address) &&
116                (data.send_address == send_address);
117       });
118   if (channel_entry == channel_datas_.end()) {
119     return Status::NotFound();
120   }
121   if (channel_datas_.size() == 1) {
122     channel_datas_.clear();
123   } else {
124     // Put the ChannelData in the back of
125     // the list and pop it out to avoid shifting
126     // all elements.
127     std::swap(*channel_entry, channel_datas_.back());
128     channel_datas_.pop_back();
129   }
130   return OkStatus();
131 }
132 
FindChannelForReceiveAddress(uint64_t receive_address)133 Router::ChannelData* Router::FindChannelForReceiveAddress(
134     uint64_t receive_address) {
135   for (auto& channel : channel_datas_) {
136     if (channel.receive_address == receive_address) {
137       return &channel;
138     }
139   }
140   return nullptr;
141 }
142 
PollDeliverIncomingFrame(Context & cx,const Frame & frame)143 Poll<> Router::PollDeliverIncomingFrame(Context& cx, const Frame& frame) {
144   ConstByteSpan data = frame.data();
145   uint64_t address = frame.address();
146   ChannelData* channel = FindChannelForReceiveAddress(address);
147   if (channel == nullptr) {
148     PW_LOG_ERROR("Received incoming HDLC packet with address %" PRIu64
149                  ", but no channel with that incoming address is registered.",
150                  address);
151     incoming_allocation_future_ = std::nullopt;
152     return Ready();
153   }
154   Poll<Status> ready_to_write = channel->channel->PendReadyToWrite(cx);
155   if (ready_to_write.IsPending()) {
156     return Pending();
157   }
158   if (!ready_to_write->ok()) {
159     PW_LOG_ERROR("Channel at incoming HDLC address %" PRIu64
160                  " became unwriteable. Status: %d",
161                  channel->receive_address,
162                  ready_to_write->code());
163     return Ready();
164   }
165   if (!incoming_allocation_future_.has_value()) {
166     incoming_allocation_future_ =
167         channel->channel->GetWriteAllocator().AllocateAsync(data.size());
168   }
169   Poll<std::optional<MultiBuf>> buffer = incoming_allocation_future_->Pend(cx);
170   if (buffer.IsPending()) {
171     return Pending();
172   }
173   incoming_allocation_future_ = std::nullopt;
174   if (!buffer->has_value()) {
175     PW_LOG_ERROR(
176         "Unable to allocate a buffer of size %zu destined for incoming "
177         "HDLC address %" PRIu64 ". Packet will be discarded.",
178         data.size(),
179         frame.address());
180     return Ready();
181   }
182   std::copy(frame.data().begin(), frame.data().end(), (**buffer).begin());
183   Status write_status = channel->channel->Write(std::move(**buffer)).status();
184   if (!write_status.ok()) {
185     PW_LOG_ERROR(
186         "Failed to write a buffer of size %zu destined for incoming HDLC "
187         "address %" PRIu64 ". Status: %d",
188         data.size(),
189         channel->receive_address,
190         write_status.code());
191   }
192   return Ready();
193 }
194 
DecodeAndWriteIncoming(Context & cx)195 void Router::DecodeAndWriteIncoming(Context& cx) {
196   while (true) {
197     if (decoded_frame_.has_value()) {
198       if (PollDeliverIncomingFrame(cx, *decoded_frame_).IsPending()) {
199         return;
200       }
201       // Zero out the frame delivery state.
202       decoded_frame_ = std::nullopt;
203     }
204 
205     while (incoming_data_.empty()) {
206       Poll<Result<MultiBuf>> incoming = io_channel_.PendRead(cx);
207       if (incoming.IsPending()) {
208         return;
209       }
210       if (!incoming->ok()) {
211         if (incoming->status().IsFailedPrecondition()) {
212           PW_LOG_WARN("HDLC io_channel has closed.");
213         } else {
214           PW_LOG_ERROR("Unable to read from HDLC io_channel. Status: %d",
215                        incoming->status().code());
216         }
217         return;
218       }
219       incoming_data_ = std::move(**incoming);
220     }
221 
222     decoded_frame_ = DecodeFrame(decoder_, incoming_data_);
223   }
224 }
225 
TryFillBufferToEncodeAndSend(Context & cx)226 void Router::TryFillBufferToEncodeAndSend(Context& cx) {
227   if (buffer_to_encode_and_send_.has_value()) {
228     return;
229   }
230   for (size_t i = 0; i < channel_datas_.size(); ++i) {
231     ChannelData& cd =
232         channel_datas_[(next_first_read_index_ + i) % channel_datas_.size()];
233     Poll<Result<MultiBuf>> buf = cd.channel->PendRead(cx);
234     if (buf.IsPending()) {
235       continue;
236     }
237     if (!buf->ok()) {
238       if (buf->status().IsUnimplemented()) {
239         PW_LOG_ERROR("Channel registered for outgoing HDLC address %" PRIu64
240                      " is not readable.",
241                      cd.send_address);
242       }
243       // We ignore FAILED_PRECONDITION (closed) because it will be handled
244       // elsewhere. OUT_OF_RANGE just means we have finished writing. No
245       // action is needed because the channel may still be receiving data.
246       continue;
247     }
248     buffer_to_encode_and_send_ = std::move(**buf);
249     address_to_encode_and_send_to_ = cd.send_address;
250     // We received data, so ensure that we start by reading from a different
251     // index next time.
252     next_first_read_index_ =
253         (next_first_read_index_ + 1) % channel_datas_.size();
254     return;
255   }
256 }
257 
WriteOutgoingMessages(Context & cx)258 void Router::WriteOutgoingMessages(Context& cx) {
259   while (io_channel_.is_write_open() &&
260          io_channel_.PendReadyToWrite(cx).IsReady()) {
261     TryFillBufferToEncodeAndSend(cx);
262     if (!buffer_to_encode_and_send_.has_value()) {
263       // No channels have new data to send.
264       return;
265     }
266     if (!outgoing_allocation_future_.has_value()) {
267       Result<size_t> encoded_size = CalculateSizeOnceEncoded(
268           address_to_encode_and_send_to_, *buffer_to_encode_and_send_);
269       if (!encoded_size.ok()) {
270         PW_LOG_ERROR(
271             "Unable to compute size of encoded packet for outgoing buffer of "
272             "size %zu destined for outgoing HDLC address %" PRIu64
273             ". Packet will be discarded.",
274             buffer_to_encode_and_send_->size(),
275             address_to_encode_and_send_to_);
276         buffer_to_encode_and_send_ = std::nullopt;
277         continue;
278       }
279       outgoing_allocation_future_ =
280           io_channel_.GetWriteAllocator().AllocateAsync(*encoded_size);
281     }
282     Poll<std::optional<MultiBuf>> maybe_write_buffer =
283         outgoing_allocation_future_->Pend(cx);
284     if (maybe_write_buffer.IsPending()) {
285       // Channel cannot write any further messages until we can allocate.
286       return;
287     }
288     // We've gotten the allocation: discard the future.
289     size_t buffer_size = outgoing_allocation_future_->min_size();
290     outgoing_allocation_future_ = std::nullopt;
291     if (!maybe_write_buffer->has_value()) {
292       // We can't allocate a write buffer large enough for our encoded frame.
293       // Sadly, we have to throw the frame away.
294       PW_LOG_ERROR(
295           "Unable to allocate a buffer of size %zu destined for outgoing "
296           "HDLC address %" PRIu64 ". Packet will be discarded.",
297           buffer_size,
298           address_to_encode_and_send_to_);
299       buffer_to_encode_and_send_ = std::nullopt;
300       continue;
301     }
302     MultiBuf write_buffer = std::move(**maybe_write_buffer);
303     Status encode_status =
304         WriteMultiBufUIFrame(address_to_encode_and_send_to_,
305                              *buffer_to_encode_and_send_,
306                              pw::multibuf::Stream(write_buffer));
307     buffer_to_encode_and_send_ = std::nullopt;
308     if (!encode_status.ok()) {
309       PW_LOG_ERROR(
310           "Failed to encode a buffer destined for outgoing HDLC address "
311           "%" PRIu64 ". Status: %d",
312           address_to_encode_and_send_to_,
313           encode_status.code());
314       continue;
315     }
316     Status write_status = io_channel_.Write(std::move(write_buffer)).status();
317     if (!write_status.ok()) {
318       PW_LOG_ERROR(
319           "Failed to write a buffer of size %zu destined for outgoing HDLC "
320           "address %" PRIu64 ". Status: %d",
321           buffer_size,
322           address_to_encode_and_send_to_,
323           write_status.code());
324     }
325   }
326 }
327 
Pend(Context & cx)328 Poll<> Router::Pend(Context& cx) {
329   // We check for ability to read, but not write, because we may not always
330   // attempt a write, which would cause us to miss that the channel has closed
331   // for writes.
332   //
333   // Additionally, it is uncommon for a channel to remain readable but not
334   // writeable: the reverse is more common (still readable while no longer
335   // writeable).
336   if (!io_channel_.is_read_open()) {
337     return PendClose(cx);
338   }
339   DecodeAndWriteIncoming(cx);
340   WriteOutgoingMessages(cx);
341   RemoveClosedChannels();
342   if (!io_channel_.is_read_open()) {
343     return PendClose(cx);
344   }
345   return Pending();
346 }
347 
PendClose(Context & cx)348 Poll<> Router::PendClose(Context& cx) {
349   for (ChannelData& cd : channel_datas_) {
350     // We ignore the status value from close.
351     // If one or more channels are unable to close, they will remain after
352     // `RemoveClosedChannels` and `channel_datas_.size()` will be nonzero.
353     cd.channel->PendClose(cx).IgnorePoll();
354   }
355   RemoveClosedChannels();
356   if (io_channel_.PendClose(cx).IsPending()) {
357     return Pending();
358   }
359   if (channel_datas_.empty()) {
360     return Ready();
361   } else {
362     return Pending();
363   }
364 }
365 
RemoveClosedChannels()366 void Router::RemoveClosedChannels() {
367   auto first_to_remove = std::remove_if(
368       channel_datas_.begin(), channel_datas_.end(), [](const ChannelData& cd) {
369         return !cd.channel->is_read_or_write_open();
370       });
371   channel_datas_.erase(first_to_remove, channel_datas_.end());
372 }
373 
374 }  // namespace pw::hdlc
375