1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc_transport/local_rpc_egress.h"
16
17 #include "pw_chrono/system_clock.h"
18 #include "pw_log/log.h"
19 #include "pw_rpc/client_server.h"
20 #include "pw_rpc/packet_meta.h"
21 #include "pw_rpc_transport/internal/test.rpc.pwpb.h"
22 #include "pw_rpc_transport/rpc_transport.h"
23 #include "pw_rpc_transport/service_registry.h"
24 #include "pw_status/status.h"
25 #include "pw_sync/counting_semaphore.h"
26 #include "pw_sync/thread_notification.h"
27 #include "pw_thread/thread.h"
28 #include "pw_thread_stl/options.h"
29 #include "pw_unit_test/framework.h"
30
31 namespace pw::rpc {
32 namespace {
33
34 using namespace std::literals::chrono_literals;
35 using namespace std::literals::string_view_literals;
36
37 const auto kTestMessage = "I hope that someone gets my message in a bottle"sv;
38
39 class TestEchoService final
40 : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
41 TestEchoService> {
42 public:
43 uint32_t msg_count = 0;
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)44 Status Echo(
45 const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
46 pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
47 response.msg = request.msg;
48 return OkStatus();
49 }
50 };
51
52 // Test service that can be controlled from the test, e.g. the test can tell the
53 // service when it's OK to proceed. Useful for testing packet queue exhaustion.
54 class ControlledTestEchoService final
55 : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service<
56 ControlledTestEchoService> {
57 public:
Echo(const pw_rpc_transport::testing::pwpb::EchoMessage::Message & request,pw_rpc_transport::testing::pwpb::EchoMessage::Message & response)58 Status Echo(
59 const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request,
60 pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) {
61 start_.release();
62 process_.acquire();
63 response.msg = request.msg;
64 return OkStatus();
65 }
66
Wait()67 void Wait() { start_.acquire(); }
Proceed()68 void Proceed() { process_.release(); }
69
70 private:
71 sync::ThreadNotification start_;
72 sync::ThreadNotification process_;
73 };
74
75 template <size_t kPacketQueueSize, size_t kMaxPacketSize>
LocalRpcEgressTest(LocalRpcEgress<kPacketQueueSize,kMaxPacketSize> & egress,size_t kNumRequests)76 void LocalRpcEgressTest(
77 LocalRpcEgress<kPacketQueueSize, kMaxPacketSize>& egress,
78 size_t kNumRequests) {
79 constexpr uint32_t kChannelId = 1;
80
81 std::array channels = {rpc::Channel::Create<kChannelId>(&egress)};
82 ServiceRegistry registry(channels);
83
84 TestEchoService service;
85 registry.RegisterService(service);
86
87 egress.set_packet_processor(registry);
88 auto egress_thread = Thread(thread::stl::Options(), egress);
89
90 auto client =
91 registry
92 .CreateClient<pw_rpc_transport::testing::pw_rpc::pwpb::TestService>(
93 kChannelId);
94
95 std::vector<rpc::PwpbUnaryReceiver<
96 pw_rpc_transport::testing::pwpb::EchoMessage::Message>>
97 receivers;
98
99 struct State {
100 // Stash the receivers to keep the calls alive.
101 std::atomic<uint32_t> successes = 0;
102 std::atomic<uint32_t> errors = 0;
103 sync::CountingSemaphore sem;
104 } state;
105
106 receivers.reserve(kNumRequests);
107 for (size_t i = 0; i < kNumRequests; i++) {
108 receivers.push_back(client.Echo(
109 {.msg = kTestMessage},
110 [&state](const pw_rpc_transport::testing::pwpb::EchoMessage::Message&
111 response,
112 Status status) {
113 EXPECT_EQ(status, OkStatus());
114 EXPECT_EQ(response.msg, kTestMessage);
115 state.successes++;
116 state.sem.release();
117 },
118 [&state](Status) {
119 state.errors++;
120 state.sem.release();
121 }));
122 }
123
124 for (size_t i = 0; i < kNumRequests; i++) {
125 state.sem.acquire();
126 }
127
128 EXPECT_EQ(state.successes.load(), kNumRequests);
129 EXPECT_EQ(state.errors.load(), 0u);
130
131 egress.Stop();
132 egress_thread.join();
133 }
134
// Runs the shared echo scenario against a plain LocalRpcEgress.
TEST(LocalRpcEgressTest, PacketsGetDeliveredToPacketProcessor) {
  constexpr size_t kNumRequests = 10;
  constexpr size_t kMaxPacketSize = 100;
  // Exhaustion is tested separately; here the queue is sized at twice the
  // number of requests so that it is not exhausted and the test does not flake.
  constexpr size_t kPacketQueueSize = kNumRequests * 2;

  LocalRpcEgress<kPacketQueueSize, kMaxPacketSize> egress;
  LocalRpcEgressTest(egress, kNumRequests);
}
145
// Verifies that the PacketQueued()/PacketProcessed() hooks can be overridden
// by a subclass and are invoked once per packet.
TEST(LocalRpcEgressTest, OverridePacketFunctions) {
  constexpr size_t kNumRequests = 10;
  constexpr size_t kMaxPacketSize = 100;
  // Exhaustion is tested separately; here the queue is sized at twice the
  // number of requests so that it is not exhausted and the test does not flake.
  constexpr size_t kPacketQueueSize = kNumRequests * 2;

  // Egress that counts how many times each hook fires.
  // NOTE(review): the counters are plain (non-atomic) size_t values; confirm
  // that the hooks are never invoked concurrently from different threads.
  class LocalRpcEgressWithOverrides
      : public LocalRpcEgress<kPacketQueueSize, kMaxPacketSize> {
   public:
    size_t GetPacketsQueued() const { return packets_queued_; }
    size_t GetPacketsProcessed() const { return packets_processed_; }

   private:
    void PacketQueued() final { ++packets_queued_; }
    void PacketProcessed() final { ++packets_processed_; }

    size_t packets_queued_ = 0;
    size_t packets_processed_ = 0;
  };

  LocalRpcEgressWithOverrides egress;
  LocalRpcEgressTest(egress, kNumRequests);

  // Each request will create a response that will be queued up and processed
  // as well, hence twice as many packets as requests.
  constexpr size_t kExpectedPackets = kNumRequests * 2;
  EXPECT_EQ(egress.GetPacketsQueued(), kExpectedPackets);
  EXPECT_EQ(egress.GetPacketsProcessed(), kExpectedPackets);
}
174
// With a single-slot packet queue, a Send() issued while the only buffer is
// still in use must fail with RESOURCE_EXHAUSTED; once the in-flight packet
// has been handled, Send() is expected to succeed again.
TEST(LocalRpcEgressTest, PacketQueueExhausted) {
  using EchoServiceApi = pw_rpc_transport::testing::pw_rpc::pwpb::TestService;

  constexpr uint32_t kChannelId = 1;
  constexpr size_t kPacketQueueSize = 1;
  constexpr size_t kMaxPacketSize = 100;

  LocalRpcEgress<kPacketQueueSize, kMaxPacketSize> egress;
  std::array channels = {rpc::Channel::Create<kChannelId>(&egress)};
  ServiceRegistry registry(channels);

  ControlledTestEchoService service;
  registry.RegisterService(service);

  egress.set_packet_processor(registry);
  auto egress_thread = Thread(thread::stl::Options(), egress);

  auto client = registry.CreateClient<EchoServiceApi>(kChannelId);

  // Start one Echo call and wait until its handler has been entered.
  auto receiver = client.Echo({.msg = kTestMessage});
  service.Wait();

  // The Echo handler is now blocked waiting for the Proceed() call. Since
  // there is only one packet queue buffer available at a time, other packets
  // will get rejected with RESOURCE_EXHAUSTED error until the first one is
  // handled.
  EXPECT_EQ(egress.Send({}), Status::ResourceExhausted());
  service.Proceed();

  // Expecting egress to return the packet queue buffer within a reasonable
  // amount of time; currently there is no way to explicitly synchronize on
  // its availability, so keep retrying Send() for up to a few seconds.
  const auto deadline = chrono::SystemClock::now() + 5s;
  bool egress_ok = false;
  while (!egress_ok && chrono::SystemClock::now() <= deadline) {
    egress_ok = egress.Send({}).ok();
  }

  EXPECT_TRUE(egress_ok);

  egress.Stop();
  egress_thread.join();
}
222
// Sending through an egress that has no packet processor set is expected to
// fail with FAILED_PRECONDITION.
TEST(LocalRpcEgressTest, NoPacketProcessor) {
  constexpr size_t kMaxPacketSize = 10;
  constexpr size_t kPacketQueueSize = 10;
  LocalRpcEgress<kPacketQueueSize, kMaxPacketSize> egress;

  const auto send_status = egress.Send({});
  EXPECT_EQ(send_status, Status::FailedPrecondition());
}
229
// A packet one byte larger than kMaxPacketSize is expected to be rejected with
// INVALID_ARGUMENT.
TEST(LocalRpcEgressTest, PacketTooBig) {
  constexpr uint32_t kChannelId = 1;
  constexpr size_t kMaxPacketSize = 10;
  constexpr size_t kPacketQueueSize = 10;
  LocalRpcEgress<kPacketQueueSize, kMaxPacketSize> egress;

  // Zero-filled packet that exceeds the maximum packet size by one byte.
  std::array<std::byte, kMaxPacketSize + 1> oversized_packet{};

  std::array channels = {rpc::Channel::Create<kChannelId>(&egress)};
  ServiceRegistry registry(channels);
  egress.set_packet_processor(registry);

  const auto send_status = egress.Send(oversized_packet);
  EXPECT_EQ(send_status, Status::InvalidArgument());
}
243
// Send() is expected to succeed while the egress is running and to fail with
// FAILED_PRECONDITION once Stop() has been called.
TEST(LocalRpcEgressTest, EgressStopped) {
  constexpr uint32_t kChannelId = 1;
  constexpr size_t kMaxPacketSize = 10;
  constexpr size_t kPacketQueueSize = 10;
  LocalRpcEgress<kPacketQueueSize, kMaxPacketSize> egress;

  std::array channels = {rpc::Channel::Create<kChannelId>(&egress)};
  ServiceRegistry registry(channels);
  egress.set_packet_processor(registry);

  auto egress_thread = Thread(thread::stl::Options(), egress);

  // Before Stop(): the packet is accepted.
  EXPECT_EQ(egress.Send({}), OkStatus());

  // After Stop(): further packets are refused.
  egress.Stop();
  EXPECT_EQ(egress.Send({}), Status::FailedPrecondition());

  egress_thread.join();
}
261
262 } // namespace
263 } // namespace pw::rpc
264