• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <sys/types.h>
17 
18 #include <mutex>
19 
20 #include "pw_bytes/span.h"
21 #include "pw_metric/metric.h"
22 #include "pw_rpc/channel.h"
23 #include "pw_rpc/packet_meta.h"
24 #include "pw_rpc_transport/hdlc_framing.h"
25 #include "pw_rpc_transport/rpc_transport.h"
26 #include "pw_rpc_transport/simple_framing.h"
27 #include "pw_status/status.h"
28 #include "pw_sync/lock_annotations.h"
29 #include "pw_sync/mutex.h"
30 #include "rpc_transport.h"
31 
32 namespace pw::rpc {
33 namespace internal {
34 
35 void LogBadPacket();
36 void LogChannelIdOverflow(uint32_t channel_id, uint32_t max_channel_id);
37 void LogMissingEgressForChannel(uint32_t channel_id);
38 void LogIngressSendFailure(uint32_t channel_id, Status status);
39 
40 }  // namespace internal
41 
42 // Ties RPC transport and RPC frame encoder together.
43 template <typename Encoder>
44 class RpcEgress : public RpcEgressHandler, public ChannelOutput {
45  public:
RpcEgress(std::string_view channel_name,RpcFrameSender & transport)46   RpcEgress(std::string_view channel_name, RpcFrameSender& transport)
47       : ChannelOutput(channel_name.data()), transport_(transport) {}
48 
49   // Implements both rpc::ChannelOutput and RpcEgressHandler. Encodes the
50   // provided packet using the target transport's MTU as max frame size and
51   // sends it over that transport.
52   //
53   // Sending a packet may result in multiple RpcTransport::Write calls which
54   // must not be interleaved in order for the packet to be successfully
55   // reassembled from the transport-level frames by the receiver. RpcEgress
56   // is using a mutex to ensure this. Technically we could just rely on pw_rpc
57   // global lock but that would unnecessarily couple transport logic to pw_rpc
58   // internals.
SendRpcPacket(ConstByteSpan rpc_packet)59   Status SendRpcPacket(ConstByteSpan rpc_packet) override {
60     std::lock_guard lock(mutex_);
61     return encoder_.Encode(rpc_packet,
62                            transport_.MaximumTransmissionUnit(),
63                            [this](RpcFrame& frame) {
64                              // Encoders must call this callback inline so that
65                              // we're still holding `mutex_` here. Unfortunately
66                              // the lock annotations cannot be used on
67                              // `transport_` to enforce this.
68                              return transport_.Send(frame);
69                            });
70   }
71 
72   // Implements ChannelOutput.
Send(ConstByteSpan buffer)73   Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); }
74 
75  private:
76   sync::Mutex mutex_;
77   RpcFrameSender& transport_;
78   Encoder encoder_ PW_GUARDED_BY(mutex_);
79 };
80 
81 // Ties a channel id and the egress that packets on that channel should be sent
82 // to.
83 struct ChannelEgress {
ChannelEgressChannelEgress84   ChannelEgress(uint32_t id, RpcEgressHandler& egress_handler)
85       : channel_id(id), egress(&egress_handler) {}
86 
87   const uint32_t channel_id;
88   RpcEgressHandler* const egress = nullptr;
89 };
90 
91 // Handler for incoming RPC packets. RpcIngress is not thread-safe and must be
92 // accessed from a single thread (typically the RPC RX thread).
93 template <typename Decoder>
94 class RpcIngress : public RpcIngressHandler {
95  public:
96   static constexpr size_t kMaxChannelId = 64;
97   RpcIngress() = default;
98 
RpcIngress(span<ChannelEgress> channel_egresses)99   explicit RpcIngress(span<ChannelEgress> channel_egresses) {
100     for (auto& channel : channel_egresses) {
101       PW_ASSERT(channel.channel_id <= kMaxChannelId);
102       channel_egresses_[channel.channel_id] = channel.egress;
103     }
104   }
105 
metrics()106   const metric::Group& metrics() const { return metrics_; }
107 
num_bad_packets()108   uint32_t num_bad_packets() const { return bad_packets_.value(); }
109 
num_overflow_channel_ids()110   uint32_t num_overflow_channel_ids() const {
111     return overflow_channel_ids_.value();
112   }
113 
num_missing_egresses()114   uint32_t num_missing_egresses() const { return missing_egresses_.value(); }
115 
num_egress_errors()116   uint32_t num_egress_errors() const { return egress_errors_.value(); }
117 
118   // Finds RPC packets in `buffer`, extracts pw_rpc channel ID from each
119   // packet and sends the packet to the egress registered for that channel.
ProcessIncomingData(ConstByteSpan buffer)120   Status ProcessIncomingData(ConstByteSpan buffer) override {
121     return decoder_.Decode(buffer, [this](ConstByteSpan packet) {
122       const auto packet_meta = rpc::PacketMeta::FromBuffer(packet);
123       if (!packet_meta.ok()) {
124         bad_packets_.Increment();
125         internal::LogBadPacket();
126         return;
127       }
128       if (packet_meta->channel_id() > kMaxChannelId) {
129         overflow_channel_ids_.Increment();
130         internal::LogChannelIdOverflow(packet_meta->channel_id(),
131                                        kMaxChannelId);
132         return;
133       }
134       auto* egress = channel_egresses_[packet_meta->channel_id()];
135       if (egress == nullptr) {
136         missing_egresses_.Increment();
137         internal::LogMissingEgressForChannel(packet_meta->channel_id());
138         return;
139       }
140       const auto status = egress->SendRpcPacket(packet);
141       if (!status.ok()) {
142         egress_errors_.Increment();
143         internal::LogIngressSendFailure(packet_meta->channel_id(), status);
144       }
145     });
146   }
147 
148  private:
149   std::array<RpcEgressHandler*, kMaxChannelId + 1> channel_egresses_{};
150   Decoder decoder_;
151   PW_METRIC_GROUP(metrics_, "pw_rpc_transport");
152   PW_METRIC(metrics_, bad_packets_, "bad_packets", 0u);
153   PW_METRIC(metrics_, overflow_channel_ids_, "overflow_channel_ids", 0u);
154   PW_METRIC(metrics_, missing_egresses_, "missing_egresses", 0u);
155   PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u);
156 };
157 
158 template <size_t kMaxPacketSize>
159 using HdlcRpcEgress = RpcEgress<HdlcRpcPacketEncoder<kMaxPacketSize>>;
160 
161 template <size_t kMaxPacketSize>
162 using HdlcRpcIngress = RpcIngress<HdlcRpcPacketDecoder<kMaxPacketSize>>;
163 
164 template <size_t kMaxPacketSize>
165 using SimpleRpcEgress = RpcEgress<SimpleRpcPacketEncoder<kMaxPacketSize>>;
166 
167 template <size_t kMaxPacketSize>
168 using SimpleRpcIngress = RpcIngress<SimpleRpcPacketDecoder<kMaxPacketSize>>;
169 
170 }  // namespace pw::rpc
171