1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <sys/types.h> 17 18 #include <mutex> 19 20 #include "pw_bytes/span.h" 21 #include "pw_metric/metric.h" 22 #include "pw_rpc/channel.h" 23 #include "pw_rpc/packet_meta.h" 24 #include "pw_rpc_transport/hdlc_framing.h" 25 #include "pw_rpc_transport/rpc_transport.h" 26 #include "pw_rpc_transport/simple_framing.h" 27 #include "pw_status/status.h" 28 #include "pw_sync/lock_annotations.h" 29 #include "pw_sync/mutex.h" 30 #include "rpc_transport.h" 31 32 namespace pw::rpc { 33 namespace internal { 34 35 void LogBadPacket(); 36 void LogChannelIdOverflow(uint32_t channel_id, uint32_t max_channel_id); 37 void LogMissingEgressForChannel(uint32_t channel_id); 38 void LogIngressSendFailure(uint32_t channel_id, Status status); 39 40 } // namespace internal 41 42 // Ties RPC transport and RPC frame encoder together. 43 template <typename Encoder> 44 class RpcEgress : public RpcEgressHandler, public ChannelOutput { 45 public: RpcEgress(std::string_view channel_name,RpcFrameSender & transport)46 RpcEgress(std::string_view channel_name, RpcFrameSender& transport) 47 : ChannelOutput(channel_name.data()), transport_(transport) {} 48 49 // Implements both rpc::ChannelOutput and RpcEgressHandler. Encodes the 50 // provided packet using the target transport's MTU as max frame size and 51 // sends it over that transport. 52 // 53 // Sending a packet may result in multiple RpcTransport::Write calls which 54 // must not be interleaved in order for the packet to be successfully 55 // reassembled from the transport-level frames by the receiver. RpcEgress 56 // is using a mutex to ensure this. Technically we could just rely on pw_rpc 57 // global lock but that would unnecessarily couple transport logic to pw_rpc 58 // internals. SendRpcPacket(ConstByteSpan rpc_packet)59 Status SendRpcPacket(ConstByteSpan rpc_packet) override { 60 std::lock_guard lock(mutex_); 61 return encoder_.Encode(rpc_packet, 62 transport_.MaximumTransmissionUnit(), 63 [this](RpcFrame& frame) { 64 // Encoders must call this callback inline so that 65 // we're still holding `mutex_` here. Unfortunately 66 // the lock annotations cannot be used on 67 // `transport_` to enforce this. 68 return transport_.Send(frame); 69 }); 70 } 71 72 // Implements ChannelOutput. Send(ConstByteSpan buffer)73 Status Send(ConstByteSpan buffer) override { return SendRpcPacket(buffer); } 74 75 private: 76 sync::Mutex mutex_; 77 RpcFrameSender& transport_; 78 Encoder encoder_ PW_GUARDED_BY(mutex_); 79 }; 80 81 // Ties a channel id and the egress that packets on that channel should be sent 82 // to. 83 struct ChannelEgress { ChannelEgressChannelEgress84 ChannelEgress(uint32_t id, RpcEgressHandler& egress_handler) 85 : channel_id(id), egress(&egress_handler) {} 86 87 const uint32_t channel_id; 88 RpcEgressHandler* const egress = nullptr; 89 }; 90 91 // Handler for incoming RPC packets. RpcIngress is not thread-safe and must be 92 // accessed from a single thread (typically the RPC RX thread). 93 template <typename Decoder> 94 class RpcIngress : public RpcIngressHandler { 95 public: 96 static constexpr size_t kMaxChannelId = 64; 97 RpcIngress() = default; 98 RpcIngress(span<ChannelEgress> channel_egresses)99 explicit RpcIngress(span<ChannelEgress> channel_egresses) { 100 for (auto& channel : channel_egresses) { 101 PW_ASSERT(channel.channel_id <= kMaxChannelId); 102 channel_egresses_[channel.channel_id] = channel.egress; 103 } 104 } 105 metrics()106 const metric::Group& metrics() const { return metrics_; } 107 num_bad_packets()108 uint32_t num_bad_packets() const { return bad_packets_.value(); } 109 num_overflow_channel_ids()110 uint32_t num_overflow_channel_ids() const { 111 return overflow_channel_ids_.value(); 112 } 113 num_missing_egresses()114 uint32_t num_missing_egresses() const { return missing_egresses_.value(); } 115 num_egress_errors()116 uint32_t num_egress_errors() const { return egress_errors_.value(); } 117 118 // Finds RPC packets in `buffer`, extracts pw_rpc channel ID from each 119 // packet and sends the packet to the egress registered for that channel. ProcessIncomingData(ConstByteSpan buffer)120 Status ProcessIncomingData(ConstByteSpan buffer) override { 121 return decoder_.Decode(buffer, [this](ConstByteSpan packet) { 122 const auto packet_meta = rpc::PacketMeta::FromBuffer(packet); 123 if (!packet_meta.ok()) { 124 bad_packets_.Increment(); 125 internal::LogBadPacket(); 126 return; 127 } 128 if (packet_meta->channel_id() > kMaxChannelId) { 129 overflow_channel_ids_.Increment(); 130 internal::LogChannelIdOverflow(packet_meta->channel_id(), 131 kMaxChannelId); 132 return; 133 } 134 auto* egress = channel_egresses_[packet_meta->channel_id()]; 135 if (egress == nullptr) { 136 missing_egresses_.Increment(); 137 internal::LogMissingEgressForChannel(packet_meta->channel_id()); 138 return; 139 } 140 const auto status = egress->SendRpcPacket(packet); 141 if (!status.ok()) { 142 egress_errors_.Increment(); 143 internal::LogIngressSendFailure(packet_meta->channel_id(), status); 144 } 145 }); 146 } 147 148 private: 149 std::array<RpcEgressHandler*, kMaxChannelId + 1> channel_egresses_{}; 150 Decoder decoder_; 151 PW_METRIC_GROUP(metrics_, "pw_rpc_transport"); 152 PW_METRIC(metrics_, bad_packets_, "bad_packets", 0u); 153 PW_METRIC(metrics_, overflow_channel_ids_, "overflow_channel_ids", 0u); 154 PW_METRIC(metrics_, missing_egresses_, "missing_egresses", 0u); 155 PW_METRIC(metrics_, egress_errors_, "egress_errors", 0u); 156 }; 157 158 template <size_t kMaxPacketSize> 159 using HdlcRpcEgress = RpcEgress<HdlcRpcPacketEncoder<kMaxPacketSize>>; 160 161 template <size_t kMaxPacketSize> 162 using HdlcRpcIngress = RpcIngress<HdlcRpcPacketDecoder<kMaxPacketSize>>; 163 164 template <size_t kMaxPacketSize> 165 using SimpleRpcEgress = RpcEgress<SimpleRpcPacketEncoder<kMaxPacketSize>>; 166 167 template <size_t kMaxPacketSize> 168 using SimpleRpcIngress = RpcIngress<SimpleRpcPacketDecoder<kMaxPacketSize>>; 169 170 } // namespace pw::rpc 171