1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/nanopb/server_reader_writer.h"
16
17 #include <optional>
18
19 #include "pw_rpc/nanopb/fake_channel_output.h"
20 #include "pw_rpc/nanopb/test_method_context.h"
21 #include "pw_rpc/service.h"
22 #include "pw_rpc_test_protos/test.rpc.pb.h"
23 #include "pw_unit_test/framework.h"
24
25 namespace pw::rpc {
26
27 class TestServiceImpl final
28 : public test::pw_rpc::nanopb::TestService::Service<TestServiceImpl> {
29 public:
TestUnaryRpc(const pw_rpc_test_TestRequest &,pw_rpc_test_TestResponse &)30 Status TestUnaryRpc(const pw_rpc_test_TestRequest&,
31 pw_rpc_test_TestResponse&) {
32 return OkStatus();
33 }
34
TestAnotherUnaryRpc(const pw_rpc_test_TestRequest &,NanopbUnaryResponder<pw_rpc_test_TestResponse> &)35 void TestAnotherUnaryRpc(const pw_rpc_test_TestRequest&,
36 NanopbUnaryResponder<pw_rpc_test_TestResponse>&) {}
37
TestServerStreamRpc(const pw_rpc_test_TestRequest &,NanopbServerWriter<pw_rpc_test_TestStreamResponse> &)38 void TestServerStreamRpc(
39 const pw_rpc_test_TestRequest&,
40 NanopbServerWriter<pw_rpc_test_TestStreamResponse>&) {}
41
TestClientStreamRpc(NanopbServerReader<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)42 void TestClientStreamRpc(
43 NanopbServerReader<pw_rpc_test_TestRequest,
44 pw_rpc_test_TestStreamResponse>&) {}
45
TestBidirectionalStreamRpc(NanopbServerReaderWriter<pw_rpc_test_TestRequest,pw_rpc_test_TestStreamResponse> &)46 void TestBidirectionalStreamRpc(
47 NanopbServerReaderWriter<pw_rpc_test_TestRequest,
48 pw_rpc_test_TestStreamResponse>&) {}
49 };
50
51 template <auto kMethod>
52 struct ReaderWriterTestContext {
53 using Info = internal::MethodInfo<kMethod>;
54
55 static constexpr uint32_t kChannelId = 1;
56
ReaderWriterTestContextpw::rpc::ReaderWriterTestContext57 ReaderWriterTestContext()
58 : channel(Channel::Create<kChannelId>(&output)),
59 server(span(&channel, 1)) {}
60
61 TestServiceImpl service;
62 NanopbFakeChannelOutput<4> output;
63 Channel channel;
64 Server server;
65 };
66
67 using test::pw_rpc::nanopb::TestService;
68
// A default-constructed NanopbUnaryResponder is expected to be inactive, to
// report the unassigned channel ID and to reject Finish().
TEST(NanopbUnaryResponder, DefaultConstructed) {
  NanopbUnaryResponder<pw_rpc_test_TestResponse> responder;

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_status = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Exercise the callback setter on an inactive call; nothing to assert.
  responder.set_on_error([](Status) {});
}
79
// A default-constructed NanopbServerWriter is expected to be inactive, to
// report the unassigned channel ID and to reject both Write() and Finish().
TEST(NanopbServerWriter, DefaultConstructed) {
  NanopbServerWriter<pw_rpc_test_TestStreamResponse> writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status finish_status = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Exercise the callback setter on an inactive call; nothing to assert.
  writer.set_on_error([](Status) {});
}
91
// A default-constructed NanopbServerReader is expected to be inactive, to
// report the unassigned channel ID and to reject Finish().
TEST(NanopbServerReader, DefaultConstructed) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;
  Reader reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_status = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Exercise both callback setters on an inactive call; nothing to assert.
  reader.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader.set_on_error([](Status) {});
}
104
// A default-constructed NanopbServerReaderWriter is expected to be inactive,
// to report the unassigned channel ID and to reject both Write() and Finish().
TEST(NanopbServerReaderWriter, DefaultConstructed) {
  using ReaderWriter =
      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                               pw_rpc_test_TestStreamResponse>;
  ReaderWriter reader_writer;

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = reader_writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status finish_status = reader_writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Exercise both callback setters on an inactive call; nothing to assert.
  reader_writer.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader_writer.set_on_error([](Status) {});
}
119
// After a successful Finish(), the responder is expected to look like a
// default-constructed one: inactive, unassigned channel, Finish() rejected.
TEST(NanopbUnaryResponder, Closed) {
  using Responder = NanopbUnaryResponder<pw_rpc_test_TestResponse>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  Responder responder = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Close the call; the first Finish() must succeed.
  ASSERT_EQ(OkStatus(), responder.Finish({}, OkStatus()));

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  // A second Finish() on the closed call is rejected.
  const Status second_finish = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Exercise the callback setter on a closed call; nothing to assert.
  responder.set_on_error([](Status) {});
}
134
// After a successful Finish(), the writer is expected to be inactive, to
// report the unassigned channel ID and to reject Write() and Finish().
TEST(NanopbServerWriter, Closed) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer writer = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Close the call; the first Finish() must succeed.
  ASSERT_EQ(OkStatus(), writer.Finish(OkStatus()));

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  // Further operations on the closed call are rejected.
  const Status write_status = writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status second_finish = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Exercise the callback setter on a closed call; nothing to assert.
  writer.set_on_error([](Status) {});
}
151
// TryFinish() is expected to return the send failure and keep the call open.
TEST(NanopbServerWriter, TryClosedFailed) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer writer = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output with a send status of Unknown.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_finish_status = writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), try_finish_status);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(writer.active());
}
165
// A TryFinish() that fails leaves the call open, so a later TryFinish() can
// still close it once the channel output reports success.
TEST(NanopbServerWriter, TryCloseSuccessful) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer writer = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output is given a send status of Unknown.
  ctx.output.set_send_status(Status::Unknown());
  const Status first_attempt = writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), first_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(writer.active());

  // Second attempt: the send status is restored to OK.
  ctx.output.set_send_status(OkStatus());
  const Status second_attempt = writer.TryFinish(OkStatus());
  ASSERT_EQ(OkStatus(), second_attempt);

  // The successful attempt closes the call.
  ASSERT_FALSE(writer.active());
}
185
// After a successful Finish(), the reader is expected to be inactive, to
// report the unassigned channel ID and to reject a second Finish().
TEST(NanopbServerReader, Closed) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  Reader reader = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Close the call; the first Finish() must succeed.
  ASSERT_EQ(OkStatus(), reader.Finish({}, OkStatus()));

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  // A second Finish() on the closed call is rejected.
  const Status second_finish = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Exercise both callback setters on a closed call; nothing to assert.
  reader.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader.set_on_error([](Status) {});
}
202
// TryFinish() is expected to return the send failure and keep the call open.
TEST(NanopbServerReader, TryClosedFailed) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  Reader reader = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output with a send status of Unknown.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_finish_status = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), try_finish_status);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader.active());
}
216
// A TryFinish() that fails leaves the call open, so a later TryFinish() can
// still close it once the channel output reports success.
TEST(NanopbServerReader, TryCloseSuccessful) {
  using Reader = NanopbServerReader<pw_rpc_test_TestRequest,
                                    pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  Reader reader = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output is given a send status of Unknown.
  ctx.output.set_send_status(Status::Unknown());
  const Status first_attempt = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(Status::Unknown(), first_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader.active());

  // Second attempt: the send status is restored to OK.
  ctx.output.set_send_status(OkStatus());
  const Status second_attempt = reader.TryFinish({}, OkStatus());
  ASSERT_EQ(OkStatus(), second_attempt);

  // The successful attempt closes the call.
  ASSERT_FALSE(reader.active());
}
236
// After a successful Finish(), the reader/writer is expected to be inactive,
// to report the unassigned channel ID and to reject Write() and Finish().
TEST(NanopbServerReaderWriter, Closed) {
  using ReaderWriter =
      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                               pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // Close the call; the first Finish() must succeed.
  ASSERT_EQ(OkStatus(), reader_writer.Finish(OkStatus()));

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  // Further operations on the closed call are rejected.
  const Status write_status = reader_writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status second_finish = reader_writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Exercise both callback setters on a closed call; nothing to assert.
  reader_writer.set_on_next([](const pw_rpc_test_TestRequest&) {});
  reader_writer.set_on_error([](Status) {});
}
255
// TryFinish() is expected to return the send failure and keep the call open.
TEST(NanopbServerReaderWriter, TryClosedFailed) {
  using ReaderWriter =
      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                               pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // Configure the fake channel output with a send status of Unknown.
  ctx.output.set_send_status(Status::Unknown());

  const Status try_finish_status = reader_writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), try_finish_status);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader_writer.active());
}
270
// A TryFinish() that fails leaves the call open, so a later TryFinish() can
// still close it once the channel output reports success.
TEST(NanopbServerReaderWriter, TryCloseSuccessful) {
  using ReaderWriter =
      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                               pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // First attempt: the fake channel output is given a send status of Unknown.
  ctx.output.set_send_status(Status::Unknown());
  const Status first_attempt = reader_writer.TryFinish(OkStatus());
  ASSERT_EQ(Status::Unknown(), first_attempt);

  // The failed attempt must not have closed the call.
  ASSERT_TRUE(reader_writer.active());

  // Second attempt: the send status is restored to OK.
  ctx.output.set_send_status(OkStatus());
  const Status second_attempt = reader_writer.TryFinish(OkStatus());
  ASSERT_EQ(OkStatus(), second_attempt);

  // The successful attempt closes the call.
  ASSERT_FALSE(reader_writer.active());
}
291
// A responder obtained from Open() can send a response: the payload and an OK
// status are expected to show up on the fake channel output.
TEST(NanopbUnaryResponder, Open_ReturnsUsableResponder) {
  using Responder = NanopbUnaryResponder<pw_rpc_test_TestResponse>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  Responder opened = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Finish() is called without an explicit status argument.
  const pw_rpc_test_TestResponse payload{.value = 4321, .repeated_field = {}};
  ASSERT_EQ(OkStatus(), opened.Finish(payload));

  EXPECT_EQ(ctx.output.last_response<TestService::TestUnaryRpc>().value, 4321);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
304
// A writer obtained from Open() can stream a response and finish: the written
// payload and an OK status are expected to show up on the fake channel output.
TEST(NanopbServerWriter, Open_ReturnsUsableWriter) {
  using Writer = NanopbServerWriter<pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer opened = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  const pw_rpc_test_TestStreamResponse payload{.chunk = {}, .number = 321};
  ASSERT_EQ(OkStatus(), opened.Write(payload));
  // Finish() is called without an explicit status argument.
  ASSERT_EQ(OkStatus(), opened.Finish());

  EXPECT_EQ(ctx.output.last_response<TestService::TestServerStreamRpc>().number,
            321u);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
319
// A reader obtained from Open() can send the final response: the payload and
// the status passed along with it are expected to show up on the fake channel
// output.
TEST(NanopbServerReader, Open_ReturnsUsableReader) {
  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  NanopbServerReader responder =
      NanopbServerReader<pw_rpc_test_TestRequest,
                         pw_rpc_test_TestStreamResponse>::
          Open<TestService::TestClientStreamRpc>(
              ctx.server, ctx.channel.id(), ctx.service);

  // Finish() is called without an explicit status argument.
  ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));

  EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
            321u);
  // Verify the status as well, matching the other Open_ReturnsUsable* tests.
  // NOTE(review): assumes the omitted status argument defaults to OkStatus(),
  // as the unary responder test expects for its Finish() -- confirm.
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
333
// A reader/writer obtained from Open() can stream a response and finish with a
// non-OK status: both the written payload and the NotFound status are expected
// to show up on the fake channel output.
TEST(NanopbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
  using ReaderWriter =
      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                               pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter opened =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  const pw_rpc_test_TestStreamResponse payload{.chunk = {}, .number = 321};
  ASSERT_EQ(OkStatus(), opened.Write(payload));
  ASSERT_EQ(OkStatus(), opened.Finish(Status::NotFound()));

  EXPECT_EQ(ctx.output.last_response<TestService::TestBidirectionalStreamRpc>()
                .number,
            321u);
  EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
}
350
// Open() on a channel the server has closed is expected to still produce an
// active call bound to that channel ID. Write() returns Unavailable until the
// channel is opened again, after which the call works normally.
//
// NOTE(review): the suite is named RawServerReaderWriter although the call
// under test is a NanopbServerReaderWriter -- possibly a copy-paste leftover;
// confirm before renaming.
TEST(RawServerReaderWriter, Open_UnknownChannel) {
  using ReaderWriter =
      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                               pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;

  // Remove the context's only channel from the server before opening the call.
  ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));

  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_TRUE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), ctx.kChannelId);

  // With the channel closed, writing fails but the call is not torn down.
  const Status write_while_closed = reader_writer.Write({});
  EXPECT_EQ(Status::Unavailable(), write_while_closed);

  // Re-open the channel on the same output; writing now succeeds.
  ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));

  const Status write_after_reopen = reader_writer.Write({});
  EXPECT_EQ(OkStatus(), write_after_reopen);
  EXPECT_TRUE(reader_writer.active());

  // Finishing closes the call and clears its channel ID.
  const Status finish_status = reader_writer.Finish();
  EXPECT_EQ(OkStatus(), finish_status);
  EXPECT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);
}
374
// Opening a second call for the same method on the same channel is expected to
// cancel the first: the first becomes inactive and its on_error callback
// receives Status::Cancelled().
//
// NOTE(review): the suite is named RawServerReaderWriter although the calls
// under test are NanopbServerReaderWriter objects -- possibly a copy-paste
// leftover; confirm before renaming.
TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
  using ReaderWriter =
      NanopbServerReaderWriter<pw_rpc_test_TestRequest,
                               pw_rpc_test_TestStreamResponse>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;

  ReaderWriter first_call =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.kChannelId, ctx.service);

  // Record whatever status the first call's error callback is invoked with.
  std::optional<Status> reported_error;
  first_call.set_on_error(
      [&reported_error](Status status) { reported_error = status; });

  ASSERT_TRUE(first_call.active());

  ReaderWriter second_call =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_FALSE(first_call.active());
  EXPECT_TRUE(second_call.active());

  EXPECT_EQ(Status::Cancelled(), reported_error);
}
400
// An on_next callback installed on a reader is expected to keep working after
// the reader is move-assigned into another object.
TEST(NanopbServerReader, CallbacksMoveCorrectly) {
  PW_NANOPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx;

  NanopbServerReader original = ctx.reader();

  ASSERT_TRUE(original.active());

  // Start from values that differ from the request sent below, so the final
  // comparisons only pass if the callback actually ran.
  pw_rpc_test_TestRequest received_request = {.integer = 12345678,
                                              .status_code = 1};

  original.set_on_next(
      [&received_request](const pw_rpc_test_TestRequest& incoming) {
        received_request = incoming;
      });

  // Move-assign the reader, callback included, into a default-constructed one.
  NanopbServerReader<pw_rpc_test_TestRequest, pw_rpc_test_TestStreamResponse>
      moved_to;
  moved_to = std::move(original);

  constexpr pw_rpc_test_TestRequest request{.integer = 600613,
                                            .status_code = 2};
  ctx.SendClientStream(request);

  EXPECT_EQ(request.integer, received_request.integer);
  EXPECT_EQ(request.status_code, received_request.status_code);
}
425
426 } // namespace pw::rpc
427