• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc_transport/egress_ingress.h"
16 
17 #include <random>
18 
19 #include "pw_bytes/span.h"
20 #include "pw_metric/metric.h"
21 #include "pw_rpc/client_server.h"
22 #include "pw_rpc/packet_meta.h"
23 #include "pw_rpc_transport/hdlc_framing.h"
24 #include "pw_rpc_transport/internal/test.rpc.pwpb.h"
25 #include "pw_rpc_transport/rpc_transport.h"
26 #include "pw_rpc_transport/service_registry.h"
27 #include "pw_rpc_transport/simple_framing.h"
28 #include "pw_status/status.h"
29 #include "pw_string/string.h"
30 #include "pw_sync/thread_notification.h"
31 #include "pw_unit_test/framework.h"
32 
33 namespace pw::rpc {
34 namespace {
35 
36 constexpr size_t kMaxPacketSize = 256;
37 
38 class TestService final
39     : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
40           TestService> {
41  public:
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)42   Status Echo(
43       const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
44       pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
45     response.msg = request.msg;
46     return OkStatus();
47   }
48 };
49 
50 // A transport that stores all received frames so they can be manually retrieved
51 // by the ingress later.
52 class TestTransport : public RpcFrameSender {
53  public:
TestTransport(size_t mtu,bool is_faulty=false)54   explicit TestTransport(size_t mtu, bool is_faulty = false)
55       : mtu_(mtu), is_faulty_(is_faulty) {}
56 
MaximumTransmissionUnit() const57   size_t MaximumTransmissionUnit() const override { return mtu_; }
58 
Send(RpcFrame frame)59   Status Send(RpcFrame frame) override {
60     if (is_faulty_) {
61       return Status::Internal();
62     }
63     std::copy(
64         frame.header.begin(), frame.header.end(), std::back_inserter(buffer_));
65     std::copy(frame.payload.begin(),
66               frame.payload.end(),
67               std::back_inserter(buffer_));
68     return OkStatus();
69   }
70 
buffer()71   ByteSpan buffer() { return buffer_; }
72 
73  private:
74   size_t mtu_;
75   bool is_faulty_ = false;
76   std::vector<std::byte> buffer_;
77 };
78 
79 // An egress handler that passes the received RPC packet to the service
80 // registry.
81 class TestLocalEgress : public RpcEgressHandler {
82  public:
SendRpcPacket(ConstByteSpan packet)83   Status SendRpcPacket(ConstByteSpan packet) override {
84     if (!registry_) {
85       return Status::FailedPrecondition();
86     }
87     return registry_->ProcessRpcPacket(packet);
88   }
89 
set_registry(ServiceRegistry & registry)90   void set_registry(ServiceRegistry& registry) { registry_ = &registry; }
91 
92  private:
93   ServiceRegistry* registry_ = nullptr;
94 };
95 
TEST(RpcEgressIngressTest,SimpleFramingRoundtrip)96 TEST(RpcEgressIngressTest, SimpleFramingRoundtrip) {
97   constexpr uint32_t kChannelAtoB = 1;
98   constexpr size_t kMaxMessageLength = 200;
99   constexpr size_t kAtoBMtu = 33;
100   constexpr size_t kBtoAMtu = 72;
101 
102   TestTransport transport_a_to_b(kAtoBMtu);
103   TestTransport transport_b_to_a(kBtoAMtu);
104 
105   SimpleRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
106   SimpleRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
107 
108   std::array a_tx_channels = {
109       rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
110   std::array b_tx_channels = {
111       rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
112 
113   ServiceRegistry registry_a(a_tx_channels);
114   ServiceRegistry registry_b(b_tx_channels);
115 
116   TestService test_service;
117   registry_b.RegisterService(test_service);
118 
119   TestLocalEgress local_egress_a;
120   local_egress_a.set_registry(registry_a);
121 
122   TestLocalEgress local_egress_b;
123   local_egress_b.set_registry(registry_b);
124 
125   std::array a_rx_channels = {
126       ChannelEgress{kChannelAtoB, local_egress_a},
127   };
128   std::array b_rx_channels = {
129       ChannelEgress{kChannelAtoB, local_egress_b},
130   };
131 
132   SimpleRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
133   SimpleRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
134 
135   auto client =
136       registry_a
137           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
138               kChannelAtoB);
139 
140   sync::ThreadNotification receiver1_done;
141   sync::ThreadNotification receiver2_done;
142 
143   struct ReceiverState {
144     InlineString<kMaxMessageLength> message;
145     sync::ThreadNotification done;
146   };
147 
148   ReceiverState receiver1;
149   ReceiverState receiver2;
150   receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
151   receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
152 
153   auto call1 = client.Echo(
154       {.msg = receiver1.message},
155       [&receiver1](
156           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
157           Status status) {
158         EXPECT_EQ(status, OkStatus());
159         EXPECT_EQ(response.msg, receiver1.message);
160         receiver1.done.release();
161       },
162       [&receiver1](Status status) {
163         EXPECT_EQ(status, OkStatus());
164         receiver1.done.release();
165       });
166 
167   auto call2 = client.Echo(
168       {.msg = receiver2.message},
169       [&receiver2](
170           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
171           Status status) {
172         EXPECT_EQ(status, OkStatus());
173         EXPECT_EQ(response.msg, receiver2.message);
174         receiver2.done.release();
175       },
176       [&receiver2](Status status) {
177         EXPECT_EQ(status, OkStatus());
178         receiver2.done.release();
179       });
180 
181   // Calling `ingress_b.ProcessIncomingData` reads all packets from the
182   // transport and dispatches them according to the ingress configuration.
183   // Dispatching a packet generates a reply message: we then read it back at the
184   // sender by calling `ingress_a.ProcessIncomingData`.
185   EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
186             OkStatus());
187   EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
188             OkStatus());
189 
190   receiver1.done.acquire();
191   receiver2.done.acquire();
192 }
193 
TEST(RpcEgressIngressTest,HdlcFramingRoundtrip)194 TEST(RpcEgressIngressTest, HdlcFramingRoundtrip) {
195   constexpr uint32_t kChannelAtoB = 1;
196   constexpr size_t kMaxMessageLength = 200;
197   constexpr size_t kAtoBMtu = 33;
198   constexpr size_t kBtoAMtu = 72;
199 
200   TestTransport transport_a_to_b(kAtoBMtu);
201   TestTransport transport_b_to_a(kBtoAMtu);
202 
203   HdlcRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
204   HdlcRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);
205 
206   std::array a_tx_channels = {
207       rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
208   std::array b_tx_channels = {
209       rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};
210 
211   ServiceRegistry registry_a(a_tx_channels);
212   ServiceRegistry registry_b(b_tx_channels);
213 
214   TestService test_service;
215   registry_b.RegisterService(test_service);
216 
217   TestLocalEgress local_egress_a;
218   local_egress_a.set_registry(registry_a);
219 
220   TestLocalEgress local_egress_b;
221   local_egress_b.set_registry(registry_b);
222 
223   std::array a_rx_channels = {
224       ChannelEgress{kChannelAtoB, local_egress_a},
225   };
226   std::array b_rx_channels = {
227       ChannelEgress{kChannelAtoB, local_egress_b},
228   };
229 
230   HdlcRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
231   HdlcRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);
232 
233   auto client =
234       registry_a
235           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
236               kChannelAtoB);
237 
238   sync::ThreadNotification receiver1_done;
239   sync::ThreadNotification receiver2_done;
240 
241   struct ReceiverState {
242     InlineString<kMaxMessageLength> message;
243     sync::ThreadNotification done;
244   };
245 
246   ReceiverState receiver1;
247   ReceiverState receiver2;
248   receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
249   receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');
250 
251   auto call1 = client.Echo(
252       {.msg = receiver1.message},
253       [&receiver1](
254           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
255           Status status) {
256         EXPECT_EQ(status, OkStatus());
257         EXPECT_EQ(response.msg, receiver1.message);
258         receiver1.done.release();
259       },
260       [&receiver1](Status status) {
261         EXPECT_EQ(status, OkStatus());
262         receiver1.done.release();
263       });
264 
265   auto call2 = client.Echo(
266       {.msg = receiver2.message},
267       [&receiver2](
268           const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
269           Status status) {
270         EXPECT_EQ(status, OkStatus());
271         EXPECT_EQ(response.msg, receiver2.message);
272         receiver2.done.release();
273       },
274       [&receiver2](Status status) {
275         EXPECT_EQ(status, OkStatus());
276         receiver2.done.release();
277       });
278 
279   // Calling `ingress_b.ProcessIncomingData` reads all packets from the
280   // transport and dispatches them according to the ingress configuration.
281   // Dispatching a packet generates a reply message: we then read it back at the
282   // sender by calling `ingress_a.ProcessIncomingData`.
283   EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
284             OkStatus());
285   EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
286             OkStatus());
287   EXPECT_EQ(ingress_a.num_total_packets(), 2u);
288   EXPECT_EQ(ingress_b.num_total_packets(), 2u);
289 
290   receiver1.done.acquire();
291   receiver2.done.acquire();
292 }
293 
TEST(RpcEgressIngressTest,MalformedRpcPacket)294 TEST(RpcEgressIngressTest, MalformedRpcPacket) {
295   constexpr uint32_t kTestChannel = 1;
296   constexpr size_t kMtu = 33;
297   std::vector<std::byte> kMalformedPacket = {std::byte{0x42}, std::byte{0x74}};
298 
299   TestTransport transport(kMtu);
300   SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
301 
302   TestLocalEgress local_egress;
303   std::array rx_channels = {
304       ChannelEgress{kTestChannel, local_egress},
305   };
306 
307   SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);
308 
309   EXPECT_EQ(egress.Send(kMalformedPacket), OkStatus());
310   EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
311 
312   EXPECT_EQ(ingress.num_total_packets(), 1u);
313   EXPECT_EQ(ingress.num_bad_packets(), 1u);
314   EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
315   EXPECT_EQ(ingress.num_missing_egresses(), 0u);
316   EXPECT_EQ(ingress.num_egress_errors(), 0u);
317 }
318 
TEST(RpcEgressIngressTest,ChannelIdOverflow)319 TEST(RpcEgressIngressTest, ChannelIdOverflow) {
320   constexpr uint32_t kInvalidChannelId = 65;
321   constexpr size_t kMtu = 128;
322 
323   TestTransport transport(kMtu);
324   SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
325 
326   std::array sender_tx_channels = {
327       rpc::Channel::Create<kInvalidChannelId>(&egress)};
328 
329   ServiceRegistry registry(sender_tx_channels);
330   auto client =
331       registry
332           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
333               kInvalidChannelId);
334 
335   SimpleRpcIngress<kMaxPacketSize> ingress;
336 
337   auto receiver = client.Echo({.msg = "test"});
338 
339   EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
340 
341   EXPECT_EQ(ingress.num_total_packets(), 1u);
342   EXPECT_EQ(ingress.num_bad_packets(), 0u);
343   EXPECT_EQ(ingress.num_overflow_channel_ids(), 1u);
344   EXPECT_EQ(ingress.num_missing_egresses(), 0u);
345   EXPECT_EQ(ingress.num_egress_errors(), 0u);
346 }
347 
TEST(RpcEgressIngressTest,MissingEgressForIncomingPacket)348 TEST(RpcEgressIngressTest, MissingEgressForIncomingPacket) {
349   constexpr uint32_t kChannelA = 22;
350   constexpr uint32_t kChannelB = 33;
351   constexpr size_t kMtu = 128;
352 
353   TestTransport transport(kMtu);
354   SimpleRpcEgress<kMaxPacketSize> egress("test", transport);
355 
356   std::array sender_tx_channels = {rpc::Channel::Create<kChannelA>(&egress)};
357 
358   ServiceRegistry registry(sender_tx_channels);
359   auto client =
360       registry
361           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
362               kChannelA);
363 
364   std::array ingress_channels = {ChannelEgress(kChannelB, egress)};
365   SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);
366 
367   auto receiver = client.Echo({.msg = "test"});
368 
369   EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());
370 
371   EXPECT_EQ(ingress.num_total_packets(), 1u);
372   EXPECT_EQ(ingress.num_bad_packets(), 0u);
373   EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
374   EXPECT_EQ(ingress.num_missing_egresses(), 1u);
375   EXPECT_EQ(ingress.num_egress_errors(), 0u);
376 }
377 
TEST(RpcEgressIngressTest,EgressSendFailureForIncomingPacket)378 TEST(RpcEgressIngressTest, EgressSendFailureForIncomingPacket) {
379   constexpr uint32_t kChannelId = 22;
380   constexpr size_t kMtu = 128;
381 
382   TestTransport good_transport(kMtu, /*is_faulty=*/false);
383   TestTransport bad_transport(kMtu, /*is_faulty=*/true);
384   SimpleRpcEgress<kMaxPacketSize> good_egress("test", good_transport);
385   SimpleRpcEgress<kMaxPacketSize> bad_egress("test", bad_transport);
386 
387   std::array sender_tx_channels = {
388       rpc::Channel::Create<kChannelId>(&good_egress)};
389 
390   ServiceRegistry registry(sender_tx_channels);
391   auto client =
392       registry
393           .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
394               kChannelId);
395 
396   std::array ingress_channels = {ChannelEgress(kChannelId, bad_egress)};
397   SimpleRpcIngress<kMaxPacketSize> ingress(ingress_channels);
398 
399   auto receiver = client.Echo({.msg = "test"});
400 
401   EXPECT_EQ(ingress.ProcessIncomingData(good_transport.buffer()), OkStatus());
402 
403   EXPECT_EQ(ingress.num_total_packets(), 1u);
404   EXPECT_EQ(ingress.num_bad_packets(), 0u);
405   EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
406   EXPECT_EQ(ingress.num_missing_egresses(), 0u);
407   EXPECT_EQ(ingress.num_egress_errors(), 1u);
408 }
409 
410 }  // namespace
411 }  // namespace pw::rpc
412