1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/pwpb/server_reader_writer.h"
16
17 #include <optional>
18
19 #include "gtest/gtest.h"
20 #include "pw_rpc/pwpb/fake_channel_output.h"
21 #include "pw_rpc/pwpb/test_method_context.h"
22 #include "pw_rpc/service.h"
23 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
24
25 namespace pw::rpc {
26 namespace {
27
28 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
29 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
30 namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse;
31
32 class TestServiceImpl final
33 : public test::pw_rpc::pwpb::TestService::Service<TestServiceImpl> {
34 public:
TestUnaryRpc(const TestRequest::Message &,TestResponse::Message &)35 Status TestUnaryRpc(const TestRequest::Message&, TestResponse::Message&) {
36 return OkStatus();
37 }
38
TestAnotherUnaryRpc(const TestRequest::Message &,PwpbUnaryResponder<TestResponse::Message> &)39 void TestAnotherUnaryRpc(const TestRequest::Message&,
40 PwpbUnaryResponder<TestResponse::Message>&) {}
41
TestServerStreamRpc(const TestRequest::Message &,PwpbServerWriter<TestStreamResponse::Message> &)42 void TestServerStreamRpc(const TestRequest::Message&,
43 PwpbServerWriter<TestStreamResponse::Message>&) {}
44
TestClientStreamRpc(PwpbServerReader<TestRequest::Message,TestStreamResponse::Message> &)45 void TestClientStreamRpc(
46 PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>&) {}
47
TestBidirectionalStreamRpc(PwpbServerReaderWriter<TestRequest::Message,TestStreamResponse::Message> &)48 void TestBidirectionalStreamRpc(
49 PwpbServerReaderWriter<TestRequest::Message,
50 TestStreamResponse::Message>&) {}
51 };
52
53 template <auto kMethod>
54 struct ReaderWriterTestContext {
55 using Info = internal::MethodInfo<kMethod>;
56
57 static constexpr uint32_t kChannelId = 1;
58
ReaderWriterTestContextpw::rpc::__anon850436840111::ReaderWriterTestContext59 ReaderWriterTestContext()
60 : channel(Channel::Create<kChannelId>(&output)),
61 server(span(&channel, 1)) {}
62
63 TestServiceImpl service;
64 PwpbFakeChannelOutput<4> output;
65 Channel channel;
66 Server server;
67 };
68
69 using test::pw_rpc::pwpb::TestService;
70
// A default-constructed unary responder is inactive, has no channel, and
// rejects Finish().
TEST(PwpbUnaryResponder, DefaultConstructed) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;
  Responder responder;

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  // The call was never opened, so there is nothing to finish.
  EXPECT_EQ(Status::FailedPrecondition(), responder.Finish({}, OkStatus()));

  // Installing a callback on the inactive call is exercised but not asserted.
  responder.set_on_error([](Status /*unused*/) {});
}
81
// A default-constructed server writer is inactive, has no channel, and
// rejects both Write() and Finish().
TEST(PwpbServerWriter, DefaultConstructed) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;
  Writer writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  // The call was never opened, so neither operation can succeed.
  EXPECT_EQ(Status::FailedPrecondition(), writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), writer.Finish(OkStatus()));

  // Installing a callback on the inactive call is exercised but not asserted.
  writer.set_on_error([](Status /*unused*/) {});
}
93
// A default-constructed server reader is inactive, has no channel, and
// rejects Finish().
TEST(PwpbServerReader, DefaultConstructed) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;
  Reader reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  // The call was never opened, so there is nothing to finish.
  EXPECT_EQ(Status::FailedPrecondition(), reader.Finish({}, OkStatus()));

  // Installing callbacks on the inactive call is exercised but not asserted.
  reader.set_on_next([](const TestRequest::Message& /*unused*/) {});
  reader.set_on_error([](Status /*unused*/) {});
}
105
// A default-constructed server reader/writer is inactive, has no channel, and
// rejects both Write() and Finish().
TEST(PwpbServerReaderWriter, DefaultConstructed) {
  using ReaderWriter =
      PwpbServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>;
  ReaderWriter reader_writer;

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  // The call was never opened, so neither operation can succeed.
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Finish(OkStatus()));

  // Installing callbacks on the inactive call is exercised but not asserted.
  reader_writer.set_on_next([](const TestRequest::Message& /*unused*/) {});
  reader_writer.set_on_error([](Status /*unused*/) {});
}
119
// Once an opened unary responder has been finished, it is inactive, has no
// channel, and rejects a second Finish().
TEST(PwpbUnaryResponder, Closed) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> context;
  auto responder = Responder::Open<TestService::TestUnaryRpc>(
      context.server, context.channel.id(), context.service);

  // The first Finish() closes the call.
  ASSERT_EQ(OkStatus(), responder.Finish({}, OkStatus()));

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  // The call is already closed, so finishing again is rejected.
  EXPECT_EQ(Status::FailedPrecondition(), responder.Finish({}, OkStatus()));

  // Installing a callback on the closed call is exercised but not asserted.
  responder.set_on_error([](Status /*unused*/) {});
}
133
// Once an opened server writer has been finished, it is inactive, has no
// channel, and rejects further Write() and Finish() calls.
TEST(PwpbServerWriter, Closed) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> context;
  auto writer = Writer::Open<TestService::TestServerStreamRpc>(
      context.server, context.channel.id(), context.service);

  // The first Finish() closes the call.
  ASSERT_EQ(OkStatus(), writer.Finish(OkStatus()));

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  // The call is already closed, so neither operation can succeed.
  EXPECT_EQ(Status::FailedPrecondition(), writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), writer.Finish(OkStatus()));

  // Installing a callback on the closed call is exercised but not asserted.
  writer.set_on_error([](Status /*unused*/) {});
}
149
// Once an opened server reader has been finished, it is inactive, has no
// channel, and rejects a second Finish().
TEST(PwpbServerReader, Closed) {
  using Reader =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestClientStreamRpc> context;
  auto reader = Reader::Open<TestService::TestClientStreamRpc>(
      context.server, context.channel.id(), context.service);

  // The first Finish() closes the call.
  ASSERT_EQ(OkStatus(), reader.Finish({}, OkStatus()));

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  // The call is already closed, so finishing again is rejected.
  EXPECT_EQ(Status::FailedPrecondition(), reader.Finish({}, OkStatus()));

  // Installing callbacks on the closed call is exercised but not asserted.
  reader.set_on_next([](const TestRequest::Message& /*unused*/) {});
  reader.set_on_error([](Status /*unused*/) {});
}
166
// Once an opened server reader/writer has been finished, it is inactive, has
// no channel, and rejects further Write() and Finish() calls.
TEST(PwpbServerReaderWriter, Closed) {
  using ReaderWriter =
      PwpbServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> context;
  auto reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.channel.id(), context.service);

  // The first Finish() closes the call.
  ASSERT_EQ(OkStatus(), reader_writer.Finish(OkStatus()));

  ASSERT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);

  // The call is already closed, so neither operation can succeed.
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Write({}));
  EXPECT_EQ(Status::FailedPrecondition(), reader_writer.Finish(OkStatus()));

  // Installing callbacks on the closed call is exercised but not asserted.
  reader_writer.set_on_next([](const TestRequest::Message& /*unused*/) {});
  reader_writer.set_on_error([](Status /*unused*/) {});
}
185
// Open() yields a responder whose Finish() delivers the response payload and
// an OK status to the channel output.
TEST(PwpbUnaryResponder, Open_ReturnsUsableResponder) {
  using Responder = PwpbUnaryResponder<TestResponse::Message>;

  ReaderWriterTestContext<TestService::TestUnaryRpc> context;
  auto call = Responder::Open<TestService::TestUnaryRpc>(
      context.server, context.channel.id(), context.service);

  ASSERT_EQ(OkStatus(), call.Finish({.value = 4321, .repeated_field = {}}));

  // The fake output must have recorded the payload and the status.
  EXPECT_EQ(context.output.last_response<TestService::TestUnaryRpc>().value,
            4321);
  EXPECT_EQ(context.output.last_status(), OkStatus());
}
198
// Open() yields a writer that can stream a response and then finish; the
// channel output records the streamed payload and an OK status.
TEST(PwpbServerWriter, Open_ReturnsUsableWriter) {
  using Writer = PwpbServerWriter<TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestServerStreamRpc> context;
  auto call = Writer::Open<TestService::TestServerStreamRpc>(
      context.server, context.channel.id(), context.service);

  ASSERT_EQ(OkStatus(), call.Write({.chunk = {}, .number = 321}));
  ASSERT_EQ(OkStatus(), call.Finish());

  // The fake output must have recorded the payload and the status.
  EXPECT_EQ(
      context.output.last_response<TestService::TestServerStreamRpc>().number,
      321u);
  EXPECT_EQ(context.output.last_status(), OkStatus());
}
213
// Open() yields a reader whose Finish() delivers the response payload and an
// OK status to the channel output.
TEST(PwpbServerReader, Open_ReturnsUsableReader) {
  ReaderWriterTestContext<TestService::TestClientStreamRpc> ctx;
  PwpbServerReader responder =
      PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>::Open<
          TestService::TestClientStreamRpc>(
          ctx.server, ctx.channel.id(), ctx.service);

  // Finish() is called without an explicit status here.
  ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321}));

  EXPECT_EQ(ctx.output.last_response<TestService::TestClientStreamRpc>().number,
            321u);
  // Also verify the status sent with the response, as the sibling
  // Open_ReturnsUsable* tests do.
  EXPECT_EQ(ctx.output.last_status(), OkStatus());
}
226
// Open() yields a reader/writer that can stream a response and finish with a
// non-OK status; the channel output records both.
TEST(PwpbServerReaderWriter, Open_ReturnsUsableReaderWriter) {
  using ReaderWriter =
      PwpbServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> context;
  auto call = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      context.server, context.channel.id(), context.service);

  ASSERT_EQ(OkStatus(), call.Write({.chunk = {}, .number = 321}));
  ASSERT_EQ(OkStatus(), call.Finish(Status::NotFound()));

  // The fake output must have recorded the payload and the NotFound status.
  EXPECT_EQ(
      context.output.last_response<TestService::TestBidirectionalStreamRpc>()
          .number,
      321u);
  EXPECT_EQ(context.output.last_status(), Status::NotFound());
}
243
// Opening a call on a channel the server does not currently know about still
// yields an active call; writes fail with Unavailable until the channel is
// opened, after which the call works normally.
//
// NOTE(review): the suite is named RawServerReaderWriter although the test
// exercises PwpbServerReaderWriter; confirm whether the name should change.
TEST(RawServerReaderWriter, Open_UnknownChannel) {
  using ReaderWriter =
      PwpbServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> context;

  // Remove the only channel so that Open() targets an unknown channel ID.
  ASSERT_EQ(OkStatus(), context.server.CloseChannel(context.kChannelId));

  auto reader_writer =
      ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.kChannelId, context.service);

  EXPECT_TRUE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), context.kChannelId);
  EXPECT_EQ(Status::Unavailable(), reader_writer.Write({}));

  // Re-register the channel; the already-open call can now send.
  ASSERT_EQ(OkStatus(),
            context.server.OpenChannel(context.kChannelId, context.output));

  EXPECT_EQ(OkStatus(), reader_writer.Write({}));
  EXPECT_TRUE(reader_writer.active());

  // Finishing closes the call and clears its channel ID.
  EXPECT_EQ(OkStatus(), reader_writer.Finish());
  EXPECT_FALSE(reader_writer.active());
  EXPECT_EQ(reader_writer.channel_id(), Channel::kUnassignedChannelId);
}
267
// Opening the same method on the same channel a second time deactivates the
// first call and reports Cancelled through its on_error callback.
//
// NOTE(review): the suite is named RawServerReaderWriter although the test
// exercises PwpbServerReaderWriter; confirm whether the name should change.
TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) {
  using ReaderWriter =
      PwpbServerReaderWriter<TestRequest::Message, TestStreamResponse::Message>;

  ReaderWriterTestContext<TestService::TestBidirectionalStreamRpc> context;

  auto first = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      context.server, context.kChannelId, context.service);

  // Capture whatever status the first call's error callback receives.
  std::optional<Status> reported_error;
  first.set_on_error(
      [&reported_error](Status status) { reported_error = status; });

  ASSERT_TRUE(first.active());

  auto second = ReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
      context.server, context.kChannelId, context.service);

  EXPECT_FALSE(first.active());
  EXPECT_TRUE(second.active());

  EXPECT_EQ(Status::Cancelled(), reported_error);
}
293
// An on_next callback installed on a reader keeps working after the reader is
// move-assigned into another object.
TEST(PwpbServerReader, CallbacksMoveCorrectly) {
  PW_PWPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) context;

  PwpbServerReader original_call = context.reader();

  ASSERT_TRUE(original_call.active());

  // Start from values that differ from the request sent below, so a missed
  // callback is detectable.
  TestRequest::Message last_received = {.integer = 12345678, .status_code = 1};

  original_call.set_on_next(
      [&last_received](const TestRequest::Message& incoming) {
        last_received = incoming;
      });

  PwpbServerReader<TestRequest::Message, TestStreamResponse::Message>
      moved_call;
  moved_call = std::move(original_call);

  // Deliver a client-stream message; the moved callback should record it.
  constexpr TestRequest::Message sent{.integer = 600613, .status_code = 2};
  context.SendClientStream(sent);
  EXPECT_EQ(sent.integer, last_received.integer);
  EXPECT_EQ(sent.status_code, last_received.status_code);
}
316
317 } // namespace
318 } // namespace pw::rpc
319