1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
#include <array>
#include <atomic>
#include <mutex>
#include <utility>

#include "pw_rpc/nanopb/client_server_testing_threaded.h"
#include "pw_rpc_test_protos/test.rpc.pb.h"
#include "pw_sync/binary_semaphore.h"
#include "pw_sync/mutex.h"
#include "pw_thread/non_portable_test_thread_options.h"
#include "pw_unit_test/framework.h"
23
24 namespace pw::rpc {
25 namespace {
26
27 using GeneratedService = ::pw::rpc::test::pw_rpc::nanopb::TestService;
28
29 class TestService final : public GeneratedService::Service<TestService> {
30 public:
TestUnaryRpc(const pw_rpc_test_TestRequest & request,pw_rpc_test_TestResponse & response)31 Status TestUnaryRpc(const pw_rpc_test_TestRequest& request,
32 pw_rpc_test_TestResponse& response) {
33 response.value = request.integer + 1;
34 return static_cast<Status::Code>(request.status_code);
35 }
36
TestAnotherUnaryRpc(const pw_rpc_test_TestRequest & request,pw_rpc_test_TestResponse & response)37 Status TestAnotherUnaryRpc(const pw_rpc_test_TestRequest& request,
38 pw_rpc_test_TestResponse& response) {
39 using ArgType = std::array<uint32_t, 3>;
40 // The values array needs to be kept in memory until after this method call
41 // returns since the response is not encoded until after returning from this
42 // method.
43 static const ArgType values = {7, 8, 9};
44 response.repeated_field.funcs.encode = +[](pb_ostream_t* stream,
45 const pb_field_t* field,
46 void* const* arg) -> bool {
47 // Note: nanopb passes the pointer to the repeated_filed.arg member as
48 // arg, not its contents.
49 for (auto elem : *static_cast<const ArgType*>(*arg)) {
50 if (!pb_encode_tag_for_field(stream, field) ||
51 !pb_encode_varint(stream, elem))
52 return false;
53 }
54 return true;
55 };
56 response.repeated_field.arg = const_cast<ArgType*>(&values);
57 return static_cast<Status::Code>(request.status_code);
58 }
59
TestServerStreamRpc(const pw_rpc_test_TestRequest &,ServerWriter<pw_rpc_test_TestStreamResponse> &)60 static void TestServerStreamRpc(
61 const pw_rpc_test_TestRequest&,
62 ServerWriter<pw_rpc_test_TestStreamResponse>&) {}
63
TestClientStreamRpc(ServerReader<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)64 void TestClientStreamRpc(
65 ServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>&) {}
66
TestBidirectionalStreamRpc(ServerReaderWriter<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)67 void TestBidirectionalStreamRpc(
68 ServerReaderWriter<pw_rpc_test_TestRequest,
69 pw_rpc_test_TestStreamResponse>&) {}
70 };
71
72 class RpcCaller {
73 public:
74 template <auto kMethod = GeneratedService::TestUnaryRpc>
BlockOnResponse(uint32_t i,Client & client,uint32_t channel_id)75 Status BlockOnResponse(uint32_t i, Client& client, uint32_t channel_id) {
76 response_status_ = OkStatus();
77 pw_rpc_test_TestRequest request{.integer = i,
78 .status_code = OkStatus().code()};
79 auto call = kMethod(
80 client,
81 channel_id,
82 request,
83 [this](const pw_rpc_test_TestResponse&, Status status) {
84 response_status_ = status;
85 semaphore_.release();
86 },
87 [this](Status status) {
88 response_status_ = status;
89 semaphore_.release();
90 });
91
92 semaphore_.acquire();
93 return response_status_;
94 }
95
96 private:
97 pw::sync::BinarySemaphore semaphore_;
98 Status response_status_ = OkStatus();
99 };
100
// Issues one unary RPC through the threaded test context, expects it to
// complete with OK, and checks the request and response fetched from the
// context at index 0.
TEST(NanopbClientServerTestContextThreaded, ReceivesUnaryRpcResponseThreaded) {
  // TODO: b/290860904 - Replace TestOptionsThread0 with TestThreadContext.
  NanopbClientServerTestContextThreaded<> ctx(
      thread::test::TestOptionsThread0());

  TestService service;
  ctx.server().RegisterService(service);

  constexpr int kSent = 1;
  RpcCaller caller;
  const Status call_status =
      caller.BlockOnResponse(kSent, ctx.client(), ctx.channel().id());
  EXPECT_EQ(call_status, OkStatus());

  const auto sent_request = ctx.request<GeneratedService::TestUnaryRpc>(0);
  const auto received_response =
      ctx.response<GeneratedService::TestUnaryRpc>(0);

  // TestUnaryRpc replies with the request integer plus one.
  EXPECT_EQ(kSent, sent_request.integer);
  EXPECT_EQ(kSent + 1, received_response.value);
}
121
// Issues two unary RPCs back to back through the threaded test context and
// checks that the context exposes both request/response pairs, in call order,
// at indices 0 and 1.
TEST(NanopbClientServerTestContextThreaded, ReceivesMultipleResponsesThreaded) {
  NanopbClientServerTestContextThreaded<> ctx(
      thread::test::TestOptionsThread0());

  TestService service;
  ctx.server().RegisterService(service);

  constexpr int kFirst = 1;
  constexpr int kSecond = 2;

  RpcCaller caller;
  const Status first_status =
      caller.BlockOnResponse(kFirst, ctx.client(), ctx.channel().id());
  EXPECT_EQ(first_status, OkStatus());
  const Status second_status =
      caller.BlockOnResponse(kSecond, ctx.client(), ctx.channel().id());
  EXPECT_EQ(second_status, OkStatus());

  // First call.
  const auto first_request = ctx.request<GeneratedService::TestUnaryRpc>(0);
  const auto first_response = ctx.response<GeneratedService::TestUnaryRpc>(0);
  // Second call.
  const auto second_request = ctx.request<GeneratedService::TestUnaryRpc>(1);
  const auto second_response = ctx.response<GeneratedService::TestUnaryRpc>(1);

  // TestUnaryRpc replies with the request integer plus one.
  EXPECT_EQ(kFirst, first_request.integer);
  EXPECT_EQ(kSecond, second_request.integer);
  EXPECT_EQ(kFirst + 1, first_response.value);
  EXPECT_EQ(kSecond + 1, second_response.value);
}
150
// Passes custom server and client packet processors to the threaded test
// context, issues two unary RPCs, and checks both the exchanged payloads and
// that each processor was invoked exactly twice.
TEST(NanopbClientServerTestContextThreaded,
     ReceivesMultipleResponsesThreadedWithPacketProcessor) {
  // A counter (.first) paired with the mutex (.second) that guards it.
  // NOTE(review): the processors presumably run on the context's own thread,
  // which is why access is synchronized - confirm against the test context.
  using ProtectedInt = std::pair<int, pw::sync::Mutex>;

  // The counters are declared before `ctx` so that they are destroyed after
  // it; the processors below hold references to them.
  ProtectedInt server_counter{};
  auto server_processor = [&server_counter](
                              ClientServer& client_server,
                              pw::ConstByteSpan packet) -> pw::Status {
    {
      // Scoped guard: the mutex is released before the packet is processed.
      std::lock_guard lock(server_counter.second);
      ++server_counter.first;
    }
    return client_server.ProcessPacket(packet);
  };

  ProtectedInt client_counter{};
  auto client_processor = [&client_counter](
                              ClientServer& client_server,
                              pw::ConstByteSpan packet) -> pw::Status {
    {
      // Scoped guard: the mutex is released before the packet is processed.
      std::lock_guard lock(client_counter.second);
      ++client_counter.first;
    }
    return client_server.ProcessPacket(packet);
  };

  NanopbClientServerTestContextThreaded<> ctx(
      thread::test::TestOptionsThread0(), server_processor, client_processor);
  TestService service;
  ctx.server().RegisterService(service);

  RpcCaller caller;
  constexpr auto value1 = 1;
  constexpr auto value2 = 2;
  EXPECT_EQ(caller.BlockOnResponse(value1, ctx.client(), ctx.channel().id()),
            OkStatus());
  EXPECT_EQ(caller.BlockOnResponse(value2, ctx.client(), ctx.channel().id()),
            OkStatus());

  const auto request1 =
      ctx.request<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
  const auto request2 =
      ctx.request<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(1);
  const auto response1 =
      ctx.response<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
  const auto response2 =
      ctx.response<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(1);

  // TestUnaryRpc replies with the request integer plus one.
  EXPECT_EQ(value1, request1.integer);
  EXPECT_EQ(value2, request2.integer);
  EXPECT_EQ(value1 + 1, response1.value);
  EXPECT_EQ(value2 + 1, response2.value);

  // Each processor is expected to have been invoked once per RPC.
  {
    std::lock_guard lock(server_counter.second);
    EXPECT_EQ(server_counter.first, 2);
  }
  {
    std::lock_guard lock(client_counter.second);
    EXPECT_EQ(client_counter.first, 2);
  }
}
208
// Calls TestAnotherUnaryRpc, whose response carries a callback-encoded
// repeated field, and decodes that response through a caller-supplied decode
// callback to check the three values set by the service (7, 8, 9).
TEST(NanopbClientServerTestContextThreaded, ResponseWithCallbacks) {
  NanopbClientServerTestContextThreaded<> ctx(
      thread::test::TestOptionsThread0());
  TestService service;
  ctx.server().RegisterService(service);

  RpcCaller caller;
  EXPECT_EQ(caller.BlockOnResponse<GeneratedService::TestAnotherUnaryRpc>(
                0, ctx.client(), ctx.channel().id()),
            OkStatus());

  // To decode a response object that requires pb_callback_t members to be
  // set, pass it to the response() method as a parameter.
  constexpr size_t kMaxNumValues = 4;
  struct DecoderContext {
    // Number of times the decode callback has run, including any calls whose
    // value did not fit in `values`.
    uint32_t num_calls = 0;
    // Zero-initialized so the expectations below never read indeterminate
    // memory if fewer values than expected are decoded.
    uint32_t values[kMaxNumValues] = {};
    // Set when a varint could not be decoded from the stream.
    bool failed = false;
  } decoder_context;

  pw_rpc_test_TestResponse response = pw_rpc_test_TestResponse_init_default;
  response.repeated_field.funcs.decode = +[](pb_istream_t* stream,
                                             const pb_field_t* /* field */,
                                             void** arg) -> bool {
    // arg points at the callback's arg member, which holds the context.
    auto* dec_ctx = static_cast<DecoderContext*>(*arg);
    uint64_t value = 0;
    if (!pb_decode_varint(stream, &value)) {
      dec_ctx->failed = true;
      return false;
    }
    // Store only what fits; extra values are still counted via num_calls.
    if (dec_ctx->num_calls < kMaxNumValues) {
      // The service encodes uint32_t values, so narrowing is intentional.
      dec_ctx->values[dec_ctx->num_calls] = static_cast<uint32_t>(value);
    }
    dec_ctx->num_calls++;
    return true;
  };
  response.repeated_field.arg = &decoder_context;
  ctx.response<GeneratedService::TestAnotherUnaryRpc>(0, response);

  EXPECT_FALSE(decoder_context.failed);
  EXPECT_EQ(3u, decoder_context.num_calls);
  EXPECT_EQ(7u, decoder_context.values[0]);
  EXPECT_EQ(8u, decoder_context.values[1]);
  EXPECT_EQ(9u, decoder_context.values[2]);
}
254
255 } // namespace
256 } // namespace pw::rpc
257