• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <atomic>
16 
17 #include "pw_rpc/nanopb/client_server_testing_threaded.h"
18 #include "pw_rpc_test_protos/test.rpc.pb.h"
19 #include "pw_sync/binary_semaphore.h"
20 #include "pw_sync/mutex.h"
21 #include "pw_thread/non_portable_test_thread_options.h"
22 #include "pw_unit_test/framework.h"
23 
24 namespace pw::rpc {
25 namespace {
26 
27 using GeneratedService = ::pw::rpc::test::pw_rpc::nanopb::TestService;
28 
29 class TestService final : public GeneratedService::Service<TestService> {
30  public:
TestUnaryRpc(const pw_rpc_test_TestRequest & request,pw_rpc_test_TestResponse & response)31   Status TestUnaryRpc(const pw_rpc_test_TestRequest& request,
32                       pw_rpc_test_TestResponse& response) {
33     response.value = request.integer + 1;
34     return static_cast<Status::Code>(request.status_code);
35   }
36 
TestAnotherUnaryRpc(const pw_rpc_test_TestRequest & request,pw_rpc_test_TestResponse & response)37   Status TestAnotherUnaryRpc(const pw_rpc_test_TestRequest& request,
38                              pw_rpc_test_TestResponse& response) {
39     using ArgType = std::array<uint32_t, 3>;
40     // The values array needs to be kept in memory until after this method call
41     // returns since the response is not encoded until after returning from this
42     // method.
43     static const ArgType values = {7, 8, 9};
44     response.repeated_field.funcs.encode = +[](pb_ostream_t* stream,
45                                                const pb_field_t* field,
46                                                void* const* arg) -> bool {
47       // Note: nanopb passes the pointer to the repeated_filed.arg member as
48       // arg, not its contents.
49       for (auto elem : *static_cast<const ArgType*>(*arg)) {
50         if (!pb_encode_tag_for_field(stream, field) ||
51             !pb_encode_varint(stream, elem))
52           return false;
53       }
54       return true;
55     };
56     response.repeated_field.arg = const_cast<ArgType*>(&values);
57     return static_cast<Status::Code>(request.status_code);
58   }
59 
TestServerStreamRpc(const pw_rpc_test_TestRequest &,ServerWriter<pw_rpc_test_TestStreamResponse> &)60   static void TestServerStreamRpc(
61       const pw_rpc_test_TestRequest&,
62       ServerWriter<pw_rpc_test_TestStreamResponse>&) {}
63 
TestClientStreamRpc(ServerReader<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)64   void TestClientStreamRpc(
65       ServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>&) {}
66 
TestBidirectionalStreamRpc(ServerReaderWriter<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)67   void TestBidirectionalStreamRpc(
68       ServerReaderWriter<pw_rpc_test_TestRequest,
69                          pw_rpc_test_TestStreamResponse>&) {}
70 };
71 
72 class RpcCaller {
73  public:
74   template <auto kMethod = GeneratedService::TestUnaryRpc>
BlockOnResponse(uint32_t i,Client & client,uint32_t channel_id)75   Status BlockOnResponse(uint32_t i, Client& client, uint32_t channel_id) {
76     response_status_ = OkStatus();
77     pw_rpc_test_TestRequest request{.integer = i,
78                                     .status_code = OkStatus().code()};
79     auto call = kMethod(
80         client,
81         channel_id,
82         request,
83         [this](const pw_rpc_test_TestResponse&, Status status) {
84           response_status_ = status;
85           semaphore_.release();
86         },
87         [this](Status status) {
88           response_status_ = status;
89           semaphore_.release();
90         });
91 
92     semaphore_.acquire();
93     return response_status_;
94   }
95 
96  private:
97   pw::sync::BinarySemaphore semaphore_;
98   Status response_status_ = OkStatus();
99 };
100 
TEST(NanopbClientServerTestContextThreaded,ReceivesUnaryRpcResponseThreaded)101 TEST(NanopbClientServerTestContextThreaded, ReceivesUnaryRpcResponseThreaded) {
102   NanopbClientServerTestContextThreaded<> ctx(
103       // TODO: b/290860904 - Replace TestOptionsThread0 with TestThreadContext.
104       thread::test::TestOptionsThread0());
105   TestService service;
106   ctx.server().RegisterService(service);
107 
108   RpcCaller caller;
109   constexpr auto value = 1;
110   EXPECT_EQ(caller.BlockOnResponse(value, ctx.client(), ctx.channel().id()),
111             OkStatus());
112 
113   const auto request =
114       ctx.request<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
115   const auto response =
116       ctx.response<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
117 
118   EXPECT_EQ(value, request.integer);
119   EXPECT_EQ(value + 1, response.value);
120 }
121 
TEST(NanopbClientServerTestContextThreaded,ReceivesMultipleResponsesThreaded)122 TEST(NanopbClientServerTestContextThreaded, ReceivesMultipleResponsesThreaded) {
123   NanopbClientServerTestContextThreaded<> ctx(
124       thread::test::TestOptionsThread0());
125   TestService service;
126   ctx.server().RegisterService(service);
127 
128   RpcCaller caller;
129   constexpr auto value1 = 1;
130   constexpr auto value2 = 2;
131   EXPECT_EQ(caller.BlockOnResponse(value1, ctx.client(), ctx.channel().id()),
132             OkStatus());
133   EXPECT_EQ(caller.BlockOnResponse(value2, ctx.client(), ctx.channel().id()),
134             OkStatus());
135 
136   const auto request1 =
137       ctx.request<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
138   const auto request2 =
139       ctx.request<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(1);
140   const auto response1 =
141       ctx.response<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
142   const auto response2 =
143       ctx.response<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(1);
144 
145   EXPECT_EQ(value1, request1.integer);
146   EXPECT_EQ(value2, request2.integer);
147   EXPECT_EQ(value1 + 1, response1.value);
148   EXPECT_EQ(value2 + 1, response2.value);
149 }
150 
TEST(NanopbClientServerTestContextThreaded,ReceivesMultipleResponsesThreadedWithPacketProcessor)151 TEST(NanopbClientServerTestContextThreaded,
152      ReceivesMultipleResponsesThreadedWithPacketProcessor) {
153   using ProtectedInt = std::pair<int, pw::sync::Mutex>;
154   ProtectedInt server_counter{};
155   auto server_processor = [&server_counter](
156                               ClientServer& client_server,
157                               pw::ConstByteSpan packet) -> pw::Status {
158     server_counter.second.lock();
159     ++server_counter.first;
160     server_counter.second.unlock();
161     return client_server.ProcessPacket(packet);
162   };
163 
164   ProtectedInt client_counter{};
165   auto client_processor = [&client_counter](
166                               ClientServer& client_server,
167                               pw::ConstByteSpan packet) -> pw::Status {
168     client_counter.second.lock();
169     ++client_counter.first;
170     client_counter.second.unlock();
171     return client_server.ProcessPacket(packet);
172   };
173 
174   NanopbClientServerTestContextThreaded<> ctx(
175       thread::test::TestOptionsThread0(), server_processor, client_processor);
176   TestService service;
177   ctx.server().RegisterService(service);
178 
179   RpcCaller caller;
180   constexpr auto value1 = 1;
181   constexpr auto value2 = 2;
182   EXPECT_EQ(caller.BlockOnResponse(value1, ctx.client(), ctx.channel().id()),
183             OkStatus());
184   EXPECT_EQ(caller.BlockOnResponse(value2, ctx.client(), ctx.channel().id()),
185             OkStatus());
186 
187   const auto request1 =
188       ctx.request<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
189   const auto request2 =
190       ctx.request<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(1);
191   const auto response1 =
192       ctx.response<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(0);
193   const auto response2 =
194       ctx.response<test::pw_rpc::nanopb::TestService::TestUnaryRpc>(1);
195 
196   EXPECT_EQ(value1, request1.integer);
197   EXPECT_EQ(value2, request2.integer);
198   EXPECT_EQ(value1 + 1, response1.value);
199   EXPECT_EQ(value2 + 1, response2.value);
200 
201   server_counter.second.lock();
202   EXPECT_EQ(server_counter.first, 2);
203   server_counter.second.unlock();
204   client_counter.second.lock();
205   EXPECT_EQ(client_counter.first, 2);
206   client_counter.second.unlock();
207 }
208 
TEST(NanopbClientServerTestContextThreaded,ResponseWithCallbacks)209 TEST(NanopbClientServerTestContextThreaded, ResponseWithCallbacks) {
210   NanopbClientServerTestContextThreaded<> ctx(
211       thread::test::TestOptionsThread0());
212   TestService service;
213   ctx.server().RegisterService(service);
214 
215   RpcCaller caller;
216   EXPECT_EQ(caller.BlockOnResponse<GeneratedService::TestAnotherUnaryRpc>(
217                 0, ctx.client(), ctx.channel().id()),
218             OkStatus());
219 
220   // To decode a response object that requires to set pb_callback_t members,
221   // pass it to the response() method as a parameter.
222   constexpr size_t kMaxNumValues = 4;
223   struct DecoderContext {
224     uint32_t num_calls = 0;
225     uint32_t values[kMaxNumValues];
226     bool failed = false;
227   } decoder_context;
228 
229   pw_rpc_test_TestResponse response = pw_rpc_test_TestResponse_init_default;
230   response.repeated_field.funcs.decode = +[](pb_istream_t* stream,
231                                              const pb_field_t* /* field */,
232                                              void** arg) -> bool {
233     DecoderContext* dec_ctx = static_cast<DecoderContext*>(*arg);
234     uint64_t value;
235     if (!pb_decode_varint(stream, &value)) {
236       dec_ctx->failed = true;
237       return false;
238     }
239     if (dec_ctx->num_calls < kMaxNumValues) {
240       dec_ctx->values[dec_ctx->num_calls] = value;
241     }
242     dec_ctx->num_calls++;
243     return true;
244   };
245   response.repeated_field.arg = &decoder_context;
246   ctx.response<GeneratedService::TestAnotherUnaryRpc>(0, response);
247 
248   EXPECT_FALSE(decoder_context.failed);
249   EXPECT_EQ(3u, decoder_context.num_calls);
250   EXPECT_EQ(7u, decoder_context.values[0]);
251   EXPECT_EQ(8u, decoder_context.values[1]);
252   EXPECT_EQ(9u, decoder_context.values[2]);
253 }
254 
255 }  // namespace
256 }  // namespace pw::rpc
257