• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/pwpb/server_reader_writer.h"
16 
17 #include <optional>
18 
19 #include "gtest/gtest.h"
20 #include "pw_rpc/pwpb/fake_channel_output.h"
21 #include "pw_rpc/pwpb/test_method_context.h"
22 #include "pw_rpc/service.h"
23 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
24 
25 namespace pw::rpc {
26 namespace {
27 
28 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
29 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
30 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
31 
32 class TestServiceImpl final
33     : public test::pw_rpc::pwpb::TestService::Service<TestServiceImpl> {
34  public:
TestUnaryRpc(const TestRequest::Message &,TestResponse::Message &)35   Status TestUnaryRpc(const TestRequest::Message&, TestResponse::Message&) {
36     return OkStatus();
37   }
38 
TestAnotherUnaryRpc(const TestRequest::Message &,PwpbUnaryResponder<TestResponse::Message> &)39   void TestAnotherUnaryRpc(const TestRequest::Message&,
40                            PwpbUnaryResponder<TestResponse::Message>&) {}
41 
TestServerStreamRpc(const TestRequest::Message &,PwpbServerWriter<TestStreamResponse::Message> &)42   void TestServerStreamRpc(const TestRequest::Message&,
43                            PwpbServerWriter<TestStreamResponse::Message>&) {}
44 
TestClientStreamRpc(PwpbServerReader<TestRequest::Message,TestStreamResponse::Message> &)45   void TestClientStreamRpc(
46       PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>&) {}
47 
TestBidirectionalStreamRpc(PwpbServerReaderWriter<TestRequest::Message,TestStreamResponse::Message> &)48   void TestBidirectionalStreamRpc(
49       PwpbServerReaderWriter<TestRequest::Message,
50                              TestStreamResponse::Message>&) {}
51 };
52 
53 template <auto kMethod>
54 struct ReaderWriterTestContext {
55   using Info = internal::MethodInfo<kMethod>;
56 
57   static constexpr uint32_t kChannelId = 1;
58 
ReaderWriterTestContextpw::rpc::__anon850436840111::ReaderWriterTestContext59   ReaderWriterTestContext()
60       : channel(Channel::Create<kChannelId>(&output)),
61         server(span(&channel, 1)) {}
62 
63   TestServiceImpl service;
64   PwpbFakeChannelOutput<4> output;
65   Channel channel;
66   Server server;
67 };
68 
69 using test::pw_rpc::pwpb::TestService;
70 
// A default-constructed unary responder is inactive, has no channel assigned,
// rejects Finish() with FAILED_PRECONDITION, and still accepts a callback.
TEST(PwpbUnaryResponder, DefaultConstructed) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;
  Responder responder;

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  // There is no open call to complete.
  EXPECT_EQ(Status::FailedPrecondition(), responder.Finish({}, OkStatus()));

  // Registering a callback on an inactive call must be accepted.
  responder.set_on_error([](Status) {});
}
81 
// A default-constructed server writer is inactive, has no channel assigned,
// rejects Write() and Finish() with FAILED_PRECONDITION, and still accepts a
// callback.
TEST(PwpbServerWriter, DefaultConstructed) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;
  Writer writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  // Neither streaming nor completing is possible without an open call.
  EXPECT_EQ(Status::FailedPrecondition(), writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), writer.Finish(OkStatus()));

  // Registering a callback on an inactive call must be accepted.
  writer.set_on_error([](Status) {});
}
93 
// A default-constructed server reader is inactive, has no channel assigned,
// rejects Finish() with FAILED_PRECONDITION, and still accepts callbacks.
TEST(PwpbServerReader, DefaultConstructed) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;
  Reader reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  // There is no open call to complete.
  EXPECT_EQ(Status::FailedPrecondition(), reader.Finish({}, OkStatus()));

  // Registering callbacks on an inactive call must be accepted.
  reader.set_on_next([](const TestRequest::Message&) {});
  reader.set_on_error([](Status) {});
}
105 
// A default-constructed reader/writer is inactive, has no channel assigned,
// rejects Write() and Finish() with FAILED_PRECONDITION, and still accepts
// callbacks.
TEST(PwpbServerReaderWriter, DefaultConstructed) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;
  ReaderWriter reader_writer;

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  // Neither streaming nor completing is possible without an open call.
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Finish(OkStatus()));

  // Registering callbacks on an inactive call must be accepted.
  reader_writer.set_on_next([](const TestRequest::Message&) {});
  reader_writer.set_on_error([](Status) {});
}
119 
// After a successful Finish(), a unary responder behaves like a
// default-constructed one: inactive, unassigned channel, further Finish()
// calls fail with FAILED_PRECONDITION, and callbacks can still be set.
TEST(PwpbUnaryResponder, Closed) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  Responder responder = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // The first Finish() completes the open call.
  ASSERT_EQ(OkStatus(), responder.Finish({}, OkStatus()));

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  // A second Finish() has nothing left to complete.
  EXPECT_EQ(Status::FailedPrecondition(), responder.Finish({}, OkStatus()));

  responder.set_on_error([](Status) {});
}
133 
// After a successful Finish(), a server writer is inactive, has no channel
// assigned, rejects Write() and Finish() with FAILED_PRECONDITION, and still
// accepts a callback.
TEST(PwpbServerWriter, Closed) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer writer = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // The first Finish() completes the open call.
  ASSERT_EQ(OkStatus(), writer.Finish(OkStatus()));

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  // Once closed, both streaming and completing are rejected.
  EXPECT_EQ(Status::FailedPrecondition(), writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), writer.Finish(OkStatus()));

  writer.set_on_error([](Status) {});
}
149 
// After a successful Finish(), a server reader is inactive, has no channel
// assigned, rejects a second Finish() with FAILED_PRECONDITION, and still
// accepts callbacks.
TEST(PwpbServerReader, Closed) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  Reader reader = Reader::Open<TestService::TestClientStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // The first Finish() completes the open call.
  ASSERT_EQ(OkStatus(), reader.Finish({}, OkStatus()));

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  // A second Finish() has nothing left to complete.
  EXPECT_EQ(Status::FailedPrecondition(), reader.Finish({}, OkStatus()));

  reader.set_on_next([](const TestRequest::Message&) {});
  reader.set_on_error([](Status) {});
}
166 
// After a successful Finish(), a reader/writer is inactive, has no channel
// assigned, rejects Write() and Finish() with FAILED_PRECONDITION, and still
// accepts callbacks.
TEST(PwpbServerReaderWriter, Closed) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // The first Finish() completes the open call.
  ASSERT_EQ(OkStatus(), reader_writer.Finish(OkStatus()));

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  // Once closed, both streaming and completing are rejected.
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Finish(OkStatus()));

  reader_writer.set_on_next([](const TestRequest::Message&) {});
  reader_writer.set_on_error([](Status) {});
}
185 
// A responder obtained from Open() can finish the call, and the response
// payload and status are observable on the channel output.
TEST(PwpbUnaryResponder, Open_ReturnsUsableResponder) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> ctx;
  Responder call = Responder::Open<TestService::TestUnaryRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Finish() is called with its status argument omitted.
  ASSERT_EQ(OkStatus(), call.Finish({.value = 4321, .repeated_field = {}}));

  EXPECT_EQ(ctx.output.last_response<TestService::TestUnaryRpc>().value, 4321);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
198 
// A writer obtained from Open() can stream a response and finish the call;
// both the payload and the final status are observable on the channel output.
TEST(PwpbServerWriter, Open_ReturnsUsableWriter) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> ctx;
  Writer call = Writer::Open<TestService::TestServerStreamRpc>(
      ctx.server, ctx.channel.id(), ctx.service);

  // Send one stream message, then close the call with no explicit status.
  ASSERT_EQ(OkStatus(), call.Write({.chunk = {}, .number = 321}));
  ASSERT_EQ(OkStatus(), call.Finish());

  EXPECT_EQ(ctx.output.last_response<TestService::TestServerStreamRpc>().number,
            321u);
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
213 
// A reader obtained from Open() can finish the call; the response payload and
// the final status are observable on the channel output.
TEST(PwpbServerReader, Open_ReturnsUsableReader) {
  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  PwpbServerReader responder =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>::Open<
          TestService::TestClientStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // Finish() is called with its status argument omitted.
  ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));

  EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
            321u);
  // Check the delivered status too, matching the other Open_ReturnsUsable*
  // tests in this file.
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
226 
// A reader/writer obtained from Open() can stream a response and finish with a
// non-OK status; both the payload and that status are observable on the
// channel output.
TEST(PwpbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
  using ReaderWriter = PwpbServerReaderWriter<TestRequest::Message,
                                              TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  ReaderWriter call =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // Send one stream message, then close the call with NOT_FOUND.
  ASSERT_EQ(OkStatus(), call.Write({.chunk = {}, .number = 321}));
  ASSERT_EQ(OkStatus(), call.Finish(Status::NotFound()));

  EXPECT_EQ(ctx.output.last_response<TestService::TestBidirectionalStreamRpc>()
                .number,
            321u);
  EXPECT_EQ(ctx.output.last_status(), Status::NotFound());
}
243 
// Opening a call on a channel the server does not currently have still yields
// an active call bound to that channel ID. Writes fail with UNAVAILABLE until
// the channel is opened on the server, after which the call works normally.
//
// The suite is named PwpbServerReaderWriter because that is the class under
// test; the previous "RawServerReaderWriter" label did not match it.
TEST(PwpbServerReaderWriter, Open_UnknownChannel) {
  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;
  // Remove the only channel so the ID is unknown to the server.
  ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId));

  PwpbServerReaderWriter call =
      PwpbServerReaderWriter<TestRequest::Message,
                             TestStreamResponse::Message>::
          Open<TestService::TestBidirectionalStreamRpc>(
              ctx.server, ctx.kChannelId, ctx.service);

  // The call is open and remembers its channel ID, but cannot send yet.
  EXPECT_TRUE(call.active());
  EXPECT_EQ(call.channel_id(), ctx.kChannelId);
  EXPECT_EQ(Status::Unavailable(), call.Write({}));

  // Once the channel exists again, the same call can send.
  ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output));

  EXPECT_EQ(OkStatus(), call.Write({}));
  EXPECT_TRUE(call.active());

  // Finishing closes the call and releases the channel ID.
  EXPECT_EQ(OkStatus(), call.Finish());
  EXPECT_FALSE(call.active());
  EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
}
267 
// Opening the same method on the same channel a second time deactivates the
// first call and invokes its on_error callback with CANCELLED; the second call
// is the active one.
//
// The suite is named PwpbServerReaderWriter because that is the class under
// test; the previous "RawServerReaderWriter" label did not match it.
TEST(PwpbServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> ctx;

  PwpbServerReaderWriter one =
      PwpbServerReaderWriter<TestRequest::Message,
                             TestStreamResponse::Message>::
          Open<TestService::TestBidirectionalStreamRpc>(
              ctx.server, ctx.kChannelId, ctx.service);

  // Stays empty unless the first call's error callback runs.
  std::optional<Status> error;
  one.set_on_error([&error](Status status) { error = status; });

  ASSERT_TRUE(one.active());

  // Re-open the identical method/channel pair.
  PwpbServerReaderWriter two =
      PwpbServerReaderWriter<TestRequest::Message,
                             TestStreamResponse::Message>::
          Open<TestService::TestBidirectionalStreamRpc>(
              ctx.server, ctx.kChannelId, ctx.service);

  EXPECT_FALSE(one.active());
  EXPECT_TRUE(two.active());

  // The first call must have been notified of the cancellation.
  EXPECT_EQ(Status::Cancelled(), error);
}
293 
// An on_next callback registered on a reader keeps working after the reader is
// move-assigned into another object: a client stream message sent afterwards
// must reach the callback.
TEST(PwpbServerReader, CallbacksMoveCorrectly) {
  PW_PWPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx;

  PwpbServerReader source = ctx.reader();

  ASSERT_TRUE(source.active());

  // Seeded with values that differ from the message sent below, so the final
  // checks only pass if the callback overwrote them.
  TestRequest::Message captured = {.integer = 12345678, .status_code = 1};

  source.set_on_next([&captured](const TestRequest::Message& incoming) {
    captured = incoming;
  });

  // Move the active call (and its callback) into a fresh reader.
  PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>
      destination;
  destination = std::move(source);

  constexpr TestRequest::Message request{.integer = 600613, .status_code = 2};
  ctx.SendClientStream(request);
  EXPECT_EQ(request.integer, captured.integer);
  EXPECT_EQ(request.status_code, captured.status_code);
}
316 
317 }  // namespace
318 }  // namespace pw::rpc
319