• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc_transport/egress_ingress.h"
16 
17 #include <random>
18 
19 #include "public/pw_rpc_transport/rpc_transport.h"
20 #include "pw_bytes/span.h"
21 #include "pw_metric/metric.h"
22 #include "pw_rpc/client_server.h"
23 #include "pw_rpc/packet_meta.h"
24 #include "pw_rpc_transport/hdlc_framing.h"
25 #include "pw_rpc_transport/internal/test.rpc.pwpb.h"
26 #include "pw_rpc_transport/rpc_transport.h"
27 #include "pw_rpc_transport/service_registry.h"
28 #include "pw_rpc_transport/simple_framing.h"
29 #include "pw_status/status.h"
30 #include "pw_string/string.h"
31 #include "pw_sync/thread_notification.h"
32 #include "pw_unit_test/framework.h"
33 
34 namespace pw::rpc {
35 namespace {
36 
// Size argument supplied to every egress/ingress template instantiated by the
// tests in this file.
constexpr size_t kMaxPacketSize{256};
38 
39 class TestService final
40     : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
41           TestService> {
42  public:
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)43   Status Echo(
44       const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
45       pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
46     response.msg = request.msg;
47     return OkStatus();
48   }
49 };
50 
51 // A transport that stores all received frames so they can be manually retrieved
52 // by the ingress later.
53 class TestTransport : public RpcFrameSender {
54  public:
TestTransport(size_t mtu,bool is_faulty=false)55   explicit TestTransport(size_t mtu, bool is_faulty = false)
56       : mtu_(mtu), is_faulty_(is_faulty) {}
57 
MaximumTransmissionUnit() const58   size_t MaximumTransmissionUnit() const override { return mtu_; }
59 
Send(RpcFrame frame)60   Status Send(RpcFrame frame) override {
61     if (is_faulty_) {
62       return Status::Internal();
63     }
64     std::copy(
65         frame.header.begin(), frame.header.end(), std::back_inserter(buffer_));
66     std::copy(frame.payload.begin(),
67               frame.payload.end(),
68               std::back_inserter(buffer_));
69     return OkStatus();
70   }
71 
buffer()72   ByteSpan buffer() { return buffer_; }
73 
74  private:
75   size_t mtu_;
76   bool is_faulty_ = false;
77   std::vector<std::byte> buffer_;
78 };
79 
80 // An egress handler that passes the received RPC packet to the service
81 // registry.
82 class TestLocalEgress : public RpcEgressHandler {
83  public:
SendRpcPacket(ConstByteSpan packet)84   Status SendRpcPacket(ConstByteSpan packet) override {
85     if (!registry_) {
86       return Status::FailedPrecondition();
87     }
88     return registry_->ProcessRpcPacket(packet);
89   }
90 
set_registry(ServiceRegistry & registry)91   void set_registry(ServiceRegistry& registry) { registry_ = &registry; }
92 
93  private:
94   ServiceRegistry* registry_ = nullptr;
95 };
96 
TEST(RpcEgressIngressTest,SimpleFramingRoundtrip)97 TEST(RpcEgressIngressTest, SimpleFramingRoundtrip) {
98   constexpr uint32_t kChannelAtoB = 1;
99   constexpr size_t kMaxMessageLength = 200;
100   constexpr size_t kAtoBMtu = 33;
101   constexpr size_t kBtoAMtu = 72;
102 
103   TestTransport transport_a_to_b(kAtoBMtu);
104   TestTransport transport_b_to_a(kBtoAMtu);
105 
106   SimpleRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
107   SimpleRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
108 
109   std::array a_tx_channels = {
110       rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
111   std::array b_tx_channels = {
112       rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
113 
114   ServiceRegistry registry_a(a_tx_channels);
115   ServiceRegistry registry_b(b_tx_channels);
116 
117   TestService test_service;
118   registry_b.RegisterService(test_service);
119 
120   TestLocalEgress local_egress_a;
121   local_egress_a.set_registry(registry_a);
122 
123   TestLocalEgress local_egress_b;
124   local_egress_b.set_registry(registry_b);
125 
126   std::array a_rx_channels = {
127       ChannelEgress{kChannelAtoB, local_egress_a},
128   };
129   std::array b_rx_channels = {
130       ChannelEgress{kChannelAtoB, local_egress_b},
131   };
132 
133   SimpleRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
134   SimpleRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
135 
136   auto client =
137       registry_a
138           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
139               kChannelAtoB);
140 
141   sync::ThreadNotification receiver1_done;
142   sync::ThreadNotification receiver2_done;
143 
144   struct ReceiverState {
145     InlineString<kMaxMessageLength> message;
146     sync::ThreadNotification done;
147   };
148 
149   ReceiverState receiver1;
150   ReceiverState receiver2;
151   receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
152   receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
153 
154   auto call1 = client.Echo(
155       {.msg = receiver1.message},
156       [&receiver1](
157           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
158           Status status) {
159         EXPECT_EQ(status, OkStatus());
160         EXPECT_EQ(response.msg, receiver1.message);
161         receiver1.done.release();
162       },
163       [&receiver1](Status status) {
164         EXPECT_EQ(status, OkStatus());
165         receiver1.done.release();
166       });
167 
168   auto call2 = client.Echo(
169       {.msg = receiver2.message},
170       [&receiver2](
171           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
172           Status status) {
173         EXPECT_EQ(status, OkStatus());
174         EXPECT_EQ(response.msg, receiver2.message);
175         receiver2.done.release();
176       },
177       [&receiver2](Status status) {
178         EXPECT_EQ(status, OkStatus());
179         receiver2.done.release();
180       });
181 
182   // Calling `ingress_b.ProcessIncomingData` reads all packets from the
183   // transport and dispatches them according to the ingress configuration.
184   // Dispatching a packet generates a reply message: we then read it back at the
185   // sender by calling `ingress_a.ProcessIncomingData`.
186   EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
187             OkStatus());
188   EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
189             OkStatus());
190 
191   receiver1.done.acquire();
192   receiver2.done.acquire();
193 }
194 
TEST(RpcEgressIngressTest,HdlcFramingRoundtrip)195 TEST(RpcEgressIngressTest, HdlcFramingRoundtrip) {
196   constexpr uint32_t kChannelAtoB = 1;
197   constexpr size_t kMaxMessageLength = 200;
198   constexpr size_t kAtoBMtu = 33;
199   constexpr size_t kBtoAMtu = 72;
200 
201   TestTransport transport_a_to_b(kAtoBMtu);
202   TestTransport transport_b_to_a(kBtoAMtu);
203 
204   HdlcRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
205   HdlcRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
206 
207   std::array a_tx_channels = {
208       rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
209   std::array b_tx_channels = {
210       rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
211 
212   ServiceRegistry registry_a(a_tx_channels);
213   ServiceRegistry registry_b(b_tx_channels);
214 
215   TestService test_service;
216   registry_b.RegisterService(test_service);
217 
218   TestLocalEgress local_egress_a;
219   local_egress_a.set_registry(registry_a);
220 
221   TestLocalEgress local_egress_b;
222   local_egress_b.set_registry(registry_b);
223 
224   std::array a_rx_channels = {
225       ChannelEgress{kChannelAtoB, local_egress_a},
226   };
227   std::array b_rx_channels = {
228       ChannelEgress{kChannelAtoB, local_egress_b},
229   };
230 
231   HdlcRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
232   HdlcRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
233 
234   auto client =
235       registry_a
236           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
237               kChannelAtoB);
238 
239   sync::ThreadNotification receiver1_done;
240   sync::ThreadNotification receiver2_done;
241 
242   struct ReceiverState {
243     InlineString<kMaxMessageLength> message;
244     sync::ThreadNotification done;
245   };
246 
247   ReceiverState receiver1;
248   ReceiverState receiver2;
249   receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
250   receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
251 
252   auto call1 = client.Echo(
253       {.msg = receiver1.message},
254       [&receiver1](
255           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
256           Status status) {
257         EXPECT_EQ(status, OkStatus());
258         EXPECT_EQ(response.msg, receiver1.message);
259         receiver1.done.release();
260       },
261       [&receiver1](Status status) {
262         EXPECT_EQ(status, OkStatus());
263         receiver1.done.release();
264       });
265 
266   auto call2 = client.Echo(
267       {.msg = receiver2.message},
268       [&receiver2](
269           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
270           Status status) {
271         EXPECT_EQ(status, OkStatus());
272         EXPECT_EQ(response.msg, receiver2.message);
273         receiver2.done.release();
274       },
275       [&receiver2](Status status) {
276         EXPECT_EQ(status, OkStatus());
277         receiver2.done.release();
278       });
279 
280   // Calling `ingress_b.ProcessIncomingData` reads all packets from the
281   // transport and dispatches them according to the ingress configuration.
282   // Dispatching a packet generates a reply message: we then read it back at the
283   // sender by calling `ingress_a.ProcessIncomingData`.
284   EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
285             OkStatus());
286   EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
287             OkStatus());
288 
289   receiver1.done.acquire();
290   receiver2.done.acquire();
291 }
292 
// Pushes two arbitrary bytes through a simple-framing egress and checks that
// the ingress accepts the data stream but counts exactly one bad packet.
TEST(RpcEgressIngressTest, MalformedRpcPacket) {
  constexpr uint32_t kTestChannel = 1;
  constexpr size_t kMtu = 33;
  // A true compile-time constant, matching its `k` prefix; no heap allocation
  // is needed for two fixed bytes.
  constexpr std::array kMalformedPacket = {std::byte{0x42}, std::byte{0x74}};

  TestTransport transport(kMtu);
  SimpleRpcEgress<kMaxPacketSize> egress("test", transport);

  // No registry is attached to this egress handler.
  TestLocalEgress local_egress;
  std::array rx_channels = {
      ChannelEgress{kTestChannel, local_egress},
  };

  SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);

  EXPECT_EQ(egress.Send(kMalformedPacket), OkStatus());
  EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());

  EXPECT_EQ(ingress.num_bad_packets(), 1u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(ingress.num_egress_errors(), 0u);
}
316 
// Sends a request on channel 65 and checks that an ingress with no configured
// channels records it as an overflowing channel ID and nothing else.
// NOTE(review): presumably 65 is just past the ingress's largest supported
// channel ID -- confirm against the ingress implementation.
TEST(RpcEgressIngressTest, ChannelIdOverflow) {
  using TestServiceClient =
      pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr size_t kMtu = 128;
  constexpr uint32_t kInvalidChannelId = 65;

  // Sender side: client -> registry -> egress -> capturing transport.
  TestTransport capture(kMtu);
  SimpleRpcEgress<kMaxPacketSize> sender_egress("test", capture);
  std::array sender_tx_channels = {
      rpc::Channel::Create<kInvalidChannelId>(&sender_egress)};
  ServiceRegistry sender_registry(sender_tx_channels);
  auto echo_client =
      sender_registry.CreateClient<TestServiceClient>(kInvalidChannelId);

  // Receiver side: an ingress constructed without any channel egresses.
  SimpleRpcIngress<kMaxPacketSize> receiver_ingress;

  // Keep the call object alive until the end of the test.
  auto pending_call = echo_client.Echo({.msg = "test"});

  const auto process_status =
      receiver_ingress.ProcessIncomingData(capture.buffer());
  EXPECT_EQ(process_status, OkStatus());

  EXPECT_EQ(receiver_ingress.num_bad_packets(), 0u);
  EXPECT_EQ(receiver_ingress.num_overflow_channel_ids(), 1u);
  EXPECT_EQ(receiver_ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(receiver_ingress.num_egress_errors(), 0u);
}
344 
// Sends a request on channel A to an ingress that only has an egress for
// channel B, and checks that it is counted as a missing egress.
TEST(RpcEgressIngressTest, MissingEgressForIncomingPacket) {
  using TestServiceClient =
      pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr size_t kMtu = 128;
  constexpr uint32_t kChannelA = 22;
  constexpr uint32_t kChannelB = 33;

  // Sender side: client -> registry -> egress -> capturing transport.
  TestTransport capture(kMtu);
  SimpleRpcEgress<kMaxPacketSize> sender_egress("test", capture);
  std::array sender_tx_channels = {
      rpc::Channel::Create<kChannelA>(&sender_egress)};
  ServiceRegistry sender_registry(sender_tx_channels);
  auto echo_client = sender_registry.CreateClient<TestServiceClient>(kChannelA);

  // Receiver side: the only configured channel is B, never the A used above.
  std::array ingress_channels = {ChannelEgress{kChannelB, sender_egress}};
  SimpleRpcIngress<kMaxPacketSize> receiver_ingress(ingress_channels);

  // Keep the call object alive until the end of the test.
  auto pending_call = echo_client.Echo({.msg = "test"});

  const auto process_status =
      receiver_ingress.ProcessIncomingData(capture.buffer());
  EXPECT_EQ(process_status, OkStatus());

  EXPECT_EQ(receiver_ingress.num_bad_packets(), 0u);
  EXPECT_EQ(receiver_ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(receiver_ingress.num_missing_egresses(), 1u);
  EXPECT_EQ(receiver_ingress.num_egress_errors(), 0u);
}
373 
// Delivers a valid request to an ingress whose channel egress sits on a
// transport that always fails, and checks that one egress error is counted.
TEST(RpcEgressIngressTest, EgressSendFailureForIncomingPacket) {
  using TestServiceClient =
      pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr size_t kMtu = 128;
  constexpr uint32_t kChannelId = 22;

  // The working transport captures the outgoing request.
  TestTransport good_transport(kMtu, /*is_faulty=*/false);
  SimpleRpcEgress<kMaxPacketSize> good_egress("test", good_transport);

  // The faulty transport rejects every frame sent through `bad_egress`.
  TestTransport bad_transport(kMtu, /*is_faulty=*/true);
  SimpleRpcEgress<kMaxPacketSize> bad_egress("test", bad_transport);

  std::array sender_tx_channels = {
      rpc::Channel::Create<kChannelId>(&good_egress)};
  ServiceRegistry sender_registry(sender_tx_channels);
  auto echo_client =
      sender_registry.CreateClient<TestServiceClient>(kChannelId);

  // Incoming packets on `kChannelId` are forwarded to the failing egress.
  std::array ingress_channels = {ChannelEgress{kChannelId, bad_egress}};
  SimpleRpcIngress<kMaxPacketSize> receiver_ingress(ingress_channels);

  // Keep the call object alive until the end of the test.
  auto pending_call = echo_client.Echo({.msg = "test"});

  const auto process_status =
      receiver_ingress.ProcessIncomingData(good_transport.buffer());
  EXPECT_EQ(process_status, OkStatus());

  EXPECT_EQ(receiver_ingress.num_bad_packets(), 0u);
  EXPECT_EQ(receiver_ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(receiver_ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(receiver_ingress.num_egress_errors(), 1u);
}
404 
405 }  // namespace
406 }  // namespace pw::rpc
407