1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc_transport/egress_ingress.h"
16
17 #include <random>
18
19 #include "pw_bytes/span.h"
20 #include "pw_metric/metric.h"
21 #include "pw_rpc/client_server.h"
22 #include "pw_rpc/packet_meta.h"
23 #include "pw_rpc_transport/hdlc_framing.h"
24 #include "pw_rpc_transport/internal/test.rpc.pwpb.h"
25 #include "pw_rpc_transport/rpc_transport.h"
26 #include "pw_rpc_transport/service_registry.h"
27 #include "pw_rpc_transport/simple_framing.h"
28 #include "pw_status/status.h"
29 #include "pw_string/string.h"
30 #include "pw_sync/thread_notification.h"
31 #include "pw_unit_test/framework.h"
32
33 namespace pw::rpc {
34 namespace {
35
36 constexpr size_t kMaxPacketSize = 256;
37
38 class TestService final
39 : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
40 TestService> {
41 public:
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)42 Status Echo(
43 const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
44 pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
45 response.msg = request.msg;
46 return OkStatus();
47 }
48 };
49
50 // A transport that stores all received frames so they can be manually retrieved
51 // by the ingress later.
52 class TestTransport : public RpcFrameSender {
53 public:
TestTransport(size_t mtu,bool is_faulty=false)54 explicit TestTransport(size_t mtu, bool is_faulty = false)
55 : mtu_(mtu), is_faulty_(is_faulty) {}
56
MaximumTransmissionUnit() const57 size_t MaximumTransmissionUnit() const override { return mtu_; }
58
Send(RpcFrame frame)59 Status Send(RpcFrame frame) override {
60 if (is_faulty_) {
61 return Status::Internal();
62 }
63 std::copy(
64 frame.header.begin(), frame.header.end(), std::back_inserter(buffer_));
65 std::copy(frame.payload.begin(),
66 frame.payload.end(),
67 std::back_inserter(buffer_));
68 return OkStatus();
69 }
70
buffer()71 ByteSpan buffer() { return buffer_; }
72
73 private:
74 size_t mtu_;
75 bool is_faulty_ = false;
76 std::vector<std::byte> buffer_;
77 };
78
79 // An egress handler that passes the received RPC packet to the service
80 // registry.
81 class TestLocalEgress : public RpcEgressHandler {
82 public:
SendRpcPacket(ConstByteSpan packet)83 Status SendRpcPacket(ConstByteSpan packet) override {
84 if (!registry_) {
85 return Status::FailedPrecondition();
86 }
87 return registry_->ProcessRpcPacket(packet);
88 }
89
set_registry(ServiceRegistry & registry)90 void set_registry(ServiceRegistry& registry) { registry_ = ®istry; }
91
92 private:
93 ServiceRegistry* registry_ = nullptr;
94 };
95
// Sends two Echo requests from endpoint A to endpoint B using the simple
// framing egress/ingress pair and checks that both responses arrive back at A
// with the original message text.
TEST(RpcEgressIngressTest, SimpleFramingRoundtrip) {
  constexpr uint32_t kChannelAtoB = 1;
  constexpr size_t kMaxMessageLength = 200;
  constexpr size_t kAtoBMtu = 33;
  constexpr size_t kBtoAMtu = 72;

  // One capturing transport per direction.
  TestTransport transport_a_to_b(kAtoBMtu);
  TestTransport transport_b_to_a(kBtoAMtu);

  SimpleRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
  SimpleRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);

  std::array a_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
  std::array b_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};

  ServiceRegistry registry_a(a_tx_channels);
  ServiceRegistry registry_b(b_tx_channels);

  // Only endpoint B hosts the service; endpoint A acts as the client.
  TestService test_service;
  registry_b.RegisterService(test_service);

  // Incoming packets on each side are handed to that side's registry.
  TestLocalEgress local_egress_a;
  local_egress_a.set_registry(registry_a);

  TestLocalEgress local_egress_b;
  local_egress_b.set_registry(registry_b);

  std::array a_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_a},
  };
  std::array b_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_b},
  };

  SimpleRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
  SimpleRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);

  auto client =
      registry_a
          .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
              kChannelAtoB);

  // Per-call state: the message that was sent and a notification released by
  // whichever callback fires for that call.
  struct ReceiverState {
    InlineString<kMaxMessageLength> message;
    sync::ThreadNotification done;
  };

  ReceiverState receiver1;
  ReceiverState receiver2;
  // Each message is twice the corresponding transport's MTU, so it is longer
  // than the MTU the transport reports.
  receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
  receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');

  auto call1 = client.Echo(
      {.msg = receiver1.message},
      [&receiver1](
          const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
          Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver1.message);
        receiver1.done.release();
      },
      [&receiver1](Status status) {
        // Error callback: records a failure for any non-OK status and still
        // releases the notification so the test cannot block forever.
        EXPECT_EQ(status, OkStatus());
        receiver1.done.release();
      });

  auto call2 = client.Echo(
      {.msg = receiver2.message},
      [&receiver2](
          const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
          Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver2.message);
        receiver2.done.release();
      },
      [&receiver2](Status status) {
        EXPECT_EQ(status, OkStatus());
        receiver2.done.release();
      });

  // Calling `ingress_b.ProcessIncomingData` reads all packets from the
  // transport and dispatches them according to the ingress configuration.
  // Dispatching a packet generates a reply message: we then read it back at the
  // sender by calling `ingress_a.ProcessIncomingData`.
  EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
            OkStatus());
  EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
            OkStatus());

  // Both calls must have completed (successfully or not) by now.
  receiver1.done.acquire();
  receiver2.done.acquire();
}
193
// Sends two Echo requests from endpoint A to endpoint B using the HDLC framing
// egress/ingress pair, checks that both responses arrive back at A with the
// original message text, and checks the packet counters on both ingresses.
TEST(RpcEgressIngressTest, HdlcFramingRoundtrip) {
  constexpr uint32_t kChannelAtoB = 1;
  constexpr size_t kMaxMessageLength = 200;
  constexpr size_t kAtoBMtu = 33;
  constexpr size_t kBtoAMtu = 72;

  // One capturing transport per direction.
  TestTransport transport_a_to_b(kAtoBMtu);
  TestTransport transport_b_to_a(kBtoAMtu);

  HdlcRpcEgress<kMaxPacketSize> egress_a_to_b("a->b", transport_a_to_b);
  HdlcRpcEgress<kMaxPacketSize> egress_b_to_a("b->a", transport_b_to_a);

  std::array a_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_a_to_b)};
  std::array b_tx_channels = {
      rpc::Channel::Create<kChannelAtoB>(&egress_b_to_a)};

  ServiceRegistry registry_a(a_tx_channels);
  ServiceRegistry registry_b(b_tx_channels);

  // Only endpoint B hosts the service; endpoint A acts as the client.
  TestService test_service;
  registry_b.RegisterService(test_service);

  // Incoming packets on each side are handed to that side's registry.
  TestLocalEgress local_egress_a;
  local_egress_a.set_registry(registry_a);

  TestLocalEgress local_egress_b;
  local_egress_b.set_registry(registry_b);

  std::array a_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_a},
  };
  std::array b_rx_channels = {
      ChannelEgress{kChannelAtoB, local_egress_b},
  };

  HdlcRpcIngress<kMaxPacketSize> ingress_a(a_rx_channels);
  HdlcRpcIngress<kMaxPacketSize> ingress_b(b_rx_channels);

  auto client =
      registry_a
          .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
              kChannelAtoB);

  // Per-call state: the message that was sent and a notification released by
  // whichever callback fires for that call.
  struct ReceiverState {
    InlineString<kMaxMessageLength> message;
    sync::ThreadNotification done;
  };

  ReceiverState receiver1;
  ReceiverState receiver2;
  // Each message is twice the corresponding transport's MTU, so it is longer
  // than the MTU the transport reports.
  receiver1.message.append(2 * transport_a_to_b.MaximumTransmissionUnit(), '*');
  receiver2.message.append(2 * transport_b_to_a.MaximumTransmissionUnit(), '>');

  auto call1 = client.Echo(
      {.msg = receiver1.message},
      [&receiver1](
          const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
          Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver1.message);
        receiver1.done.release();
      },
      [&receiver1](Status status) {
        // Error callback: records a failure for any non-OK status and still
        // releases the notification so the test cannot block forever.
        EXPECT_EQ(status, OkStatus());
        receiver1.done.release();
      });

  auto call2 = client.Echo(
      {.msg = receiver2.message},
      [&receiver2](
          const pw_rpc_transport::testing::pwpb::EchoMessage::Message& response,
          Status status) {
        EXPECT_EQ(status, OkStatus());
        EXPECT_EQ(response.msg, receiver2.message);
        receiver2.done.release();
      },
      [&receiver2](Status status) {
        EXPECT_EQ(status, OkStatus());
        receiver2.done.release();
      });

  // Calling `ingress_b.ProcessIncomingData` reads all packets from the
  // transport and dispatches them according to the ingress configuration.
  // Dispatching a packet generates a reply message: we then read it back at the
  // sender by calling `ingress_a.ProcessIncomingData`.
  EXPECT_EQ(ingress_b.ProcessIncomingData(transport_a_to_b.buffer()),
            OkStatus());
  EXPECT_EQ(ingress_a.ProcessIncomingData(transport_b_to_a.buffer()),
            OkStatus());
  // Two requests reached B and two responses reached A.
  EXPECT_EQ(ingress_a.num_total_packets(), 2u);
  EXPECT_EQ(ingress_b.num_total_packets(), 2u);

  // Both calls must have completed (successfully or not) by now.
  receiver1.done.acquire();
  receiver2.done.acquire();
}
293
// Feeds a two-byte payload that is not a valid RPC packet through the egress
// and back into the ingress, and checks that the ingress counts it as exactly
// one bad packet and nothing else.
TEST(RpcEgressIngressTest, MalformedRpcPacket) {
  constexpr uint32_t kTestChannel = 1;
  constexpr size_t kMtu = 33;
  // Fixed, read-only test input: a compile-time array matches the `k` constant
  // naming and avoids a heap allocation.
  constexpr std::array<std::byte, 2> kMalformedPacket = {std::byte{0x42},
                                                         std::byte{0x74}};

  TestTransport transport(kMtu);
  SimpleRpcEgress<kMaxPacketSize> egress("test", transport);

  // No registry is attached to `local_egress`; the expectations below require
  // that the packet is rejected before any egress is invoked.
  TestLocalEgress local_egress;
  std::array rx_channels = {
      ChannelEgress{kTestChannel, local_egress},
  };

  SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);

  EXPECT_EQ(egress.Send(kMalformedPacket), OkStatus());
  EXPECT_EQ(ingress.ProcessIncomingData(transport.buffer()), OkStatus());

  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 1u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(ingress.num_egress_errors(), 0u);
}
318
// Sends one Echo request on channel 65 into an ingress with no configured
// channels and expects it to be counted as a channel-ID overflow only.
// NOTE(review): 65 is presumably just above the ingress's highest supported
// channel ID -- confirm against the ingress implementation.
TEST(RpcEgressIngressTest, ChannelIdOverflow) {
  using EchoService = pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr size_t kMtu = 128;
  constexpr uint32_t kInvalidChannelId = 65;

  // Sender side: a capturing transport behind a simple-framing egress.
  TestTransport capture(kMtu);
  SimpleRpcEgress<kMaxPacketSize> sender_egress("test", capture);

  std::array tx_channels = {
      rpc::Channel::Create<kInvalidChannelId>(&sender_egress)};
  ServiceRegistry sender_registry(tx_channels);
  auto echo_client = sender_registry.CreateClient<EchoService>(kInvalidChannelId);

  // Receiver side: default-constructed ingress, i.e. no channel egresses.
  SimpleRpcIngress<kMaxPacketSize> ingress;

  // Keep the call object alive while the captured bytes are processed.
  auto pending_call = echo_client.Echo({.msg = "test"});

  const Status process_status = ingress.ProcessIncomingData(capture.buffer());
  EXPECT_EQ(process_status, OkStatus());

  // Exactly one packet was seen and it was classified as an overflow.
  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 0u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 1u);
  EXPECT_EQ(ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(ingress.num_egress_errors(), 0u);
}
347
// Sends one Echo request on channel 22 into an ingress that only has an egress
// registered for channel 33, and expects the packet to be counted as having no
// matching egress.
TEST(RpcEgressIngressTest, MissingEgressForIncomingPacket) {
  using EchoService = pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr size_t kMtu = 128;
  constexpr uint32_t kChannelA = 22;
  constexpr uint32_t kChannelB = 33;

  // Sender side: a capturing transport behind a simple-framing egress.
  TestTransport capture(kMtu);
  SimpleRpcEgress<kMaxPacketSize> sender_egress("test", capture);

  std::array tx_channels = {rpc::Channel::Create<kChannelA>(&sender_egress)};
  ServiceRegistry sender_registry(tx_channels);
  auto echo_client = sender_registry.CreateClient<EchoService>(kChannelA);

  // Receiver side: the only configured channel differs from the one used by
  // the sender.
  std::array rx_channels = {ChannelEgress(kChannelB, sender_egress)};
  SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);

  // Keep the call object alive while the captured bytes are processed.
  auto pending_call = echo_client.Echo({.msg = "test"});

  const Status process_status = ingress.ProcessIncomingData(capture.buffer());
  EXPECT_EQ(process_status, OkStatus());

  // Exactly one packet was seen and it had no egress to go to.
  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 0u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(ingress.num_missing_egresses(), 1u);
  EXPECT_EQ(ingress.num_egress_errors(), 0u);
}
377
// Sends one Echo request through a working transport, then feeds it to an
// ingress whose matching channel egress sits on a transport that always fails.
// The packet must be counted as an egress error.
TEST(RpcEgressIngressTest, EgressSendFailureForIncomingPacket) {
  using EchoService = pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr size_t kMtu = 128;
  constexpr uint32_t kChannelId = 22;

  // `working_transport` captures bytes; `failing_transport` rejects every Send.
  TestTransport working_transport(kMtu, /*is_faulty=*/false);
  TestTransport failing_transport(kMtu, /*is_faulty=*/true);
  SimpleRpcEgress<kMaxPacketSize> working_egress("test", working_transport);
  SimpleRpcEgress<kMaxPacketSize> failing_egress("test", failing_transport);

  std::array tx_channels = {rpc::Channel::Create<kChannelId>(&working_egress)};
  ServiceRegistry sender_registry(tx_channels);
  auto echo_client = sender_registry.CreateClient<EchoService>(kChannelId);

  // Receiver side: the channel matches, but its egress cannot send.
  std::array rx_channels = {ChannelEgress(kChannelId, failing_egress)};
  SimpleRpcIngress<kMaxPacketSize> ingress(rx_channels);

  // Keep the call object alive while the captured bytes are processed.
  auto pending_call = echo_client.Echo({.msg = "test"});

  const Status process_status =
      ingress.ProcessIncomingData(working_transport.buffer());
  EXPECT_EQ(process_status, OkStatus());

  // Exactly one packet was seen and forwarding it failed.
  EXPECT_EQ(ingress.num_total_packets(), 1u);
  EXPECT_EQ(ingress.num_bad_packets(), 0u);
  EXPECT_EQ(ingress.num_overflow_channel_ids(), 0u);
  EXPECT_EQ(ingress.num_missing_egresses(), 0u);
  EXPECT_EQ(ingress.num_egress_errors(), 1u);
}
409
410 } // namespace
411 } // namespace pw::rpc
412