• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/nanopb/server_reader_writer.h"
16 
17 #include <optional>
18 
19 #include "pw_rpc/nanopb/fake_channel_output.h"
20 #include "pw_rpc/nanopb/test_method_context.h"
21 #include "pw_rpc/service.h"
22 #include "pw_rpc_test_protos/test.rpc.pb.h"
23 #include "pw_unit_test/framework.h"
24 
25 namespace pw::rpc {
26 
27 class TestServiceImpl final
28     : public test::pw_rpc::nanopb::TestService::Service<TestServiceImpl> {
29  public:
TestUnaryRpc(const pw_rpc_test_TestRequest &,pw_rpc_test_TestResponse &)30   Status TestUnaryRpc(const pw_rpc_test_TestRequest&,
31                       pw_rpc_test_TestResponse&) {
32     return OkStatus();
33   }
34 
TestAnotherUnaryRpc(const pw_rpc_test_TestRequest &,NanopbUnaryResponder<pw_rpc_test_TestResponse> &)35   void TestAnotherUnaryRpc(const pw_rpc_test_TestRequest&,
36                            NanopbUnaryResponder<pw_rpc_test_TestResponse>&) {}
37 
TestServerStreamRpc(const pw_rpc_test_TestRequest &,NanopbServerWriter<pw_rpc_test_TestStreamResponse> &)38   void TestServerStreamRpc(
39       const pw_rpc_test_TestRequest&,
40       NanopbServerWriter<pw_rpc_test_TestStreamResponse>&) {}
41 
TestClientStreamRpc(NanopbServerReader<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)42   void TestClientStreamRpc(
43       NanopbServerReader<pw_rpc_test_TestRequest,
44                          pw_rpc_test_TestStreamResponse>&) {}
45 
TestBidirectionalStreamRpc(NanopbServerReaderWriter<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)46   void TestBidirectionalStreamRpc(
47       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
48                                pw_rpc_test_TestStreamResponse>&) {}
49 };
50 
51 template <auto kMethod>
52 struct ReaderWriterTestContext {
53   using Info = internal::MethodInfo<kMethod>;
54 
55   static constexpr uint32_t kChannelId = 1;
56 
ReaderWriterTestContextpw::rpc::ReaderWriterTestContext57   ReaderWriterTestContext()
58       : channel(Channel::Create<kChannelId>(&output)),
59         server(span(&channel, 1)) {}
60 
61   TestServiceImpl service;
62   NanopbFakeChannelOutput<4> output;
63   Channel channel;
64   Server server;
65 };
66 
67 using test::pw_rpc::nanopb::TestService;
68 
TEST(NanopbUnaryResponder,DefaultConstructed)69 TEST(NanopbUnaryResponder, DefaultConstructed) {
70   NanopbUnaryResponder<pw_rpc_test_TestResponse> call;
71 
72   ASSERT_FALSE(call.active());
73   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
74 
75   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
76 
77   call.set_on_error([](Status) {});
78 }
79 
TEST(NanopbServerWriter,DefaultConstructed)80 TEST(NanopbServerWriter, DefaultConstructed) {
81   NanopbServerWriter<pw_rpc_test_TestStreamResponse> call;
82 
83   ASSERT_FALSE(call.active());
84   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
85 
86   EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
87   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
88 
89   call.set_on_error([](Status) {});
90 }
91 
TEST(NanopbServerReader,DefaultConstructed)92 TEST(NanopbServerReader, DefaultConstructed) {
93   NanopbServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>
94       call;
95 
96   ASSERT_FALSE(call.active());
97   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
98 
99   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
100 
101   call.set_on_next([](const pw_rpc_test_TestRequest&) {});
102   call.set_on_error([](Status) {});
103 }
104 
TEST(NanopbServerReaderWriter,DefaultConstructed)105 TEST(NanopbServerReaderWriter, DefaultConstructed) {
106   NanopbServerReaderWriter<pw_rpc_test_TestRequest,
107                            pw_rpc_test_TestStreamResponse>
108       call;
109 
110   ASSERT_FALSE(call.active());
111   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
112 
113   EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
114   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
115 
116   call.set_on_next([](const pw_rpc_test_TestRequest&) {});
117   call.set_on_error([](Status) {});
118 }
119 
TEST(NanopbUnaryResponder,Closed)120 TEST(NanopbUnaryResponder, Closed) {
121   ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
122   NanopbUnaryResponder call =
123       NanopbUnaryResponder<pw_rpc_test_TestResponse>::Open<
124           TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
125   ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
126 
127   ASSERT_FALSE(call.active());
128   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
129 
130   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
131 
132   call.set_on_error([](Status) {});
133 }
134 
TEST(NanopbServerWriter,Closed)135 TEST(NanopbServerWriter, Closed) {
136   ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
137   NanopbServerWriter call =
138       NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
139           TestService::TestServerStreamRpc>(
140           ctx.server, ctx.channel.id(), ctx.service);
141   ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
142 
143   ASSERT_FALSE(call.active());
144   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
145 
146   EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
147   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
148 
149   call.set_on_error([](Status) {});
150 }
151 
TEST(NanopbServerWriter,TryClosedFailed)152 TEST(NanopbServerWriter, TryClosedFailed) {
153   ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
154   NanopbServerWriter call =
155       NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
156           TestService::TestServerStreamRpc>(
157           ctx.server, ctx.channel.id(), ctx.service);
158   // Sets ChannelOutput to always return false.
159   ctx.output.set_send_status(Status::Unknown());
160   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
161 
162   // Call should be still alive.
163   ASSERT_TRUE(call.active());
164 }
165 
TEST(NanopbServerWriter,TryCloseSuccessful)166 TEST(NanopbServerWriter, TryCloseSuccessful) {
167   ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
168   NanopbServerWriter call =
169       NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
170           TestService::TestServerStreamRpc>(
171           ctx.server, ctx.channel.id(), ctx.service);
172   // Sets ChannelOutput to always return false.
173   ctx.output.set_send_status(Status::Unknown());
174   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
175 
176   // Call should be still alive.
177   ASSERT_TRUE(call.active());
178 
179   // Tries to close the call again, with ChannelOutput set to return ok.
180   ctx.output.set_send_status(OkStatus());
181   ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
182   // Call should be closed.
183   ASSERT_FALSE(call.active());
184 }
185 
TEST(NanopbServerReader,Closed)186 TEST(NanopbServerReader, Closed) {
187   ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
188   NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
189                                                pw_rpc_test_TestStreamResponse>::
190       Open<TestService::TestClientStreamRpc>(
191           ctx.server, ctx.channel.id(), ctx.service);
192   ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus()));
193 
194   ASSERT_FALSE(call.active());
195   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
196 
197   EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus()));
198 
199   call.set_on_next([](const pw_rpc_test_TestRequest&) {});
200   call.set_on_error([](Status) {});
201 }
202 
TEST(NanopbServerReader,TryClosedFailed)203 TEST(NanopbServerReader, TryClosedFailed) {
204   ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
205   NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
206                                                pw_rpc_test_TestStreamResponse>::
207       Open<TestService::TestClientStreamRpc>(
208           ctx.server, ctx.channel.id(), ctx.service);
209   // Sets ChannelOutput to always return false.
210   ctx.output.set_send_status(Status::Unknown());
211   ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
212 
213   // Call should be still alive.
214   ASSERT_TRUE(call.active());
215 }
216 
TEST(NanopbServerReader,TryCloseSuccessful)217 TEST(NanopbServerReader, TryCloseSuccessful) {
218   ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
219   NanopbServerReader call = NanopbServerReader<pw_rpc_test_TestRequest,
220                                                pw_rpc_test_TestStreamResponse>::
221       Open<TestService::TestClientStreamRpc>(
222           ctx.server, ctx.channel.id(), ctx.service);
223   // Sets ChannelOutput to always return false.
224   ctx.output.set_send_status(Status::Unknown());
225   ASSERT_EQ(Status::Unknown(), call.TryFinish({}, OkStatus()));
226 
227   // Call should be still alive.
228   ASSERT_TRUE(call.active());
229 
230   // Tries to close the call again, with ChannelOutput set to return ok.
231   ctx.output.set_send_status(OkStatus());
232   ASSERT_EQ(OkStatus(), call.TryFinish({}, OkStatus()));
233   // Call should be closed.
234   ASSERT_FALSE(call.active());
235 }
236 
TEST(NanopbServerReaderWriter,Closed)237 TEST(NanopbServerReaderWriter, Closed) {
238   ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
239   NanopbServerReaderWriter call =
240       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
241                                pw_rpc_test_TestStreamResponse>::
242           Open<TestService::TestBidirectionalStreamRpc>(
243               ctx.server, ctx.channel.id(), ctx.service);
244   ASSERT_EQ(OkStatus(), call.Finish(OkStatus()));
245 
246   ASSERT_FALSE(call.active());
247   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
248 
249   EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
250   EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus()));
251 
252   call.set_on_next([](const pw_rpc_test_TestRequest&) {});
253   call.set_on_error([](Status) {});
254 }
255 
TEST(NanopbServerReaderWriter,TryClosedFailed)256 TEST(NanopbServerReaderWriter, TryClosedFailed) {
257   ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
258   NanopbServerReaderWriter call =
259       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
260                                pw_rpc_test_TestStreamResponse>::
261           Open<TestService::TestBidirectionalStreamRpc>(
262               ctx.server, ctx.channel.id(), ctx.service);
263   // Sets ChannelOutput to always return false.
264   ctx.output.set_send_status(Status::Unknown());
265   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
266 
267   // Call should be still alive.
268   ASSERT_TRUE(call.active());
269 }
270 
TEST(NanopbServerReaderWriter,TryCloseSuccessful)271 TEST(NanopbServerReaderWriter, TryCloseSuccessful) {
272   ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
273   NanopbServerReaderWriter call =
274       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
275                                pw_rpc_test_TestStreamResponse>::
276           Open<TestService::TestBidirectionalStreamRpc>(
277               ctx.server, ctx.channel.id(), ctx.service);
278   // Sets ChannelOutput to always return false.
279   ctx.output.set_send_status(Status::Unknown());
280   ASSERT_EQ(Status::Unknown(), call.TryFinish(OkStatus()));
281 
282   // Call should be still alive.
283   ASSERT_TRUE(call.active());
284 
285   // Tries to close the call again, with ChannelOutput set to return ok.
286   ctx.output.set_send_status(OkStatus());
287   ASSERT_EQ(OkStatus(), call.TryFinish(OkStatus()));
288   // Call should be closed.
289   ASSERT_FALSE(call.active());
290 }
291 
TEST(NanopbUnaryResponder,Open_ReturnsUsableResponder)292 TEST(NanopbUnaryResponder, Open_ReturnsUsableResponder) {
293   ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
294   NanopbUnaryResponder responder =
295       NanopbUnaryResponder<pw_rpc_test_TestResponse>::Open<
296           TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service);
297 
298   ASSERT_EQ(OkStatus(),
299             responder.Finish({.value = 4321, .repeated_field = {}}));
300 
301   EXPECT_EQ(ctx.output.last_response<TestService::TestUnaryRpc>().value, 4321);
302   EXPECT_EQ(ctx.output.last_status(), OkStatus());
303 }
304 
TEST(NanopbServerWriter,Open_ReturnsUsableWriter)305 TEST(NanopbServerWriter, Open_ReturnsUsableWriter) {
306   ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
307   NanopbServerWriter responder =
308       NanopbServerWriter<pw_rpc_test_TestStreamResponse>::Open<
309           TestService::TestServerStreamRpc>(
310           ctx.server, ctx.channel.id(), ctx.service);
311 
312   ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
313   ASSERT_EQ(OkStatus(), responder.Finish());
314 
315   EXPECT_EQ(ctx.output.last_response<TestService::TestServerStreamRpc>().number,
316             321u);
317   EXPECT_EQ(ctx.output.last_status(), OkStatus());
318 }
319 
TEST(NanopbServerReader,Open_ReturnsUsableReader)320 TEST(NanopbServerReader, Open_ReturnsUsableReader) {
321   ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
322   NanopbServerReader responder =
323       NanopbServerReader<pw_rpc_test_TestRequest,
324                          pw_rpc_test_TestStreamResponse>::
325           Open<TestService::TestClientStreamRpc>(
326               ctx.server, ctx.channel.id(), ctx.service);
327 
328   ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));
329 
330   EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
331             321u);
332 }
333 
TEST(NanopbServerReaderWriter,Open_ReturnsUsableReaderWriter)334 TEST(NanopbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
335   ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
336   NanopbServerReaderWriter responder =
337       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
338                                pw_rpc_test_TestStreamResponse>::
339           Open<TestService::TestBidirectionalStreamRpc>(
340               ctx.server, ctx.channel.id(), ctx.service);
341 
342   ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321}));
343   ASSERT_EQ(OkStatus(), responder.Finish(Status::NotFound()));
344 
345   EXPECT_EQ(ctx.output.last_response<TestService::TestBidirectionalStreamRpc>()
346                 .number,
347             321u);
348   EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
349 }
350 
TEST(RawServerReaderWriter,Open_UnknownChannel)351 TEST(RawServerReaderWriter, Open_UnknownChannel) {
352   ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
353   ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));
354 
355   NanopbServerReaderWriter call =
356       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
357                                pw_rpc_test_TestStreamResponse>::
358           Open<TestService::TestBidirectionalStreamRpc>(
359               ctx.server, ctx.kChannelId, ctx.service);
360 
361   EXPECT_TRUE(call.active());
362   EXPECT_EQ(call.channel_id(), ctx.kChannelId);
363   EXPECT_EQ(Status::Unavailable(), call.Write({}));
364 
365   ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));
366 
367   EXPECT_EQ(OkStatus(), call.Write({}));
368   EXPECT_TRUE(call.active());
369 
370   EXPECT_EQ(OkStatus(), call.Finish());
371   EXPECT_FALSE(call.active());
372   EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
373 }
374 
TEST(RawServerReaderWriter,Open_MultipleTimes_CancelsPrevious)375 TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
376   ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
377 
378   NanopbServerReaderWriter one =
379       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
380                                pw_rpc_test_TestStreamResponse>::
381           Open<TestService::TestBidirectionalStreamRpc>(
382               ctx.server, ctx.kChannelId, ctx.service);
383 
384   std::optional<Status> error;
385   one.set_on_error([&error](Status status) { error = status; });
386 
387   ASSERT_TRUE(one.active());
388 
389   NanopbServerReaderWriter two =
390       NanopbServerReaderWriter<pw_rpc_test_TestRequest,
391                                pw_rpc_test_TestStreamResponse>::
392           Open<TestService::TestBidirectionalStreamRpc>(
393               ctx.server, ctx.kChannelId, ctx.service);
394 
395   EXPECT_FALSE(one.active());
396   EXPECT_TRUE(two.active());
397 
398   EXPECT_EQ(Status::Cancelled(), error);
399 }
400 
TEST(NanopbServerReader,CallbacksMoveCorrectly)401 TEST(NanopbServerReader, CallbacksMoveCorrectly) {
402   PW_NANOPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx;
403 
404   NanopbServerReader call_1 = ctx.reader();
405 
406   ASSERT_TRUE(call_1.active());
407 
408   pw_rpc_test_TestRequest received_request = {.integer = 12345678,
409                                               .status_code = 1};
410 
411   call_1.set_on_next([&received_request](const pw_rpc_test_TestRequest& value) {
412     received_request = value;
413   });
414 
415   NanopbServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>
416       call_2;
417   call_2 = std::move(call_1);
418 
419   constexpr pw_rpc_test_TestRequest request{.integer = 600613,
420                                             .status_code = 2};
421   ctx.SendClientStream(request);
422   EXPECT_EQ(request.integer, received_request.integer);
423   EXPECT_EQ(request.status_code, received_request.status_code);
424 }
425 
426 }  // namespace pw::rpc
427