1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/synchronous_call.h"
16
17 #include <chrono>
18
19 #include "gtest/gtest.h"
20 #include "pw_chrono/system_clock.h"
21 #include "pw_rpc/channel.h"
22 #include "pw_rpc/internal/packet.h"
23 #include "pw_rpc/pwpb/fake_channel_output.h"
24 #include "pw_rpc_test_protos/test.rpc.pwpb.h"
25 #include "pw_status/status.h"
26 #include "pw_status/status_with_size.h"
27 #include "pw_thread/thread.h"
28 #include "pw_work_queue/test_thread.h"
29 #include "pw_work_queue/work_queue.h"
30
31 namespace pw::rpc::test {
32 namespace {
33
34 using pw::rpc::test::pw_rpc::pwpb::TestService;
35 using MethodInfo = internal::MethodInfo<TestService::TestUnaryRpc>;
36
37 namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest;
38 namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse;
39
40 class SynchronousCallTest : public ::testing::Test {
41 public:
SynchronousCallTest()42 SynchronousCallTest()
43 : channels_({{Channel::Create<42>(&fake_output_)}}), client_(channels_) {}
44
SetUp()45 void SetUp() override {
46 work_thread_ =
47 thread::Thread(work_queue::test::WorkQueueThreadOptions(), work_queue_);
48 }
49
TearDown()50 void TearDown() override {
51 work_queue_.RequestStop();
52 work_thread_.join();
53 }
54
55 protected:
56 using FakeChannelOutput = PwpbFakeChannelOutput<2>;
57
OnSend(span<const std::byte> buffer,Status status)58 void OnSend(span<const std::byte> buffer, Status status) {
59 if (!status.ok()) {
60 return;
61 }
62 auto result = internal::Packet::FromBuffer(buffer);
63 EXPECT_TRUE(result.ok());
64 request_packet_ = *result;
65
66 EXPECT_TRUE(work_queue_.PushWork([this]() { SendResponse(); }).ok());
67 }
68
SendResponse()69 void SendResponse() {
70 std::array<std::byte, 256> buffer;
71 std::array<std::byte, 32> payload_buffer;
72
73 StatusWithSize size_status =
74 MethodInfo::serde().response().Encode(response_, payload_buffer);
75 EXPECT_TRUE(size_status.ok());
76
77 auto response =
78 internal::Packet::Response(request_packet_, response_status_);
79 response.set_payload({payload_buffer.data(), size_status.size()});
80 EXPECT_TRUE(client_.ProcessPacket(response.Encode(buffer).value()).ok());
81 }
82
set_response(const TestResponse::Message & response,Status response_status=OkStatus ())83 void set_response(const TestResponse::Message& response,
84 Status response_status = OkStatus()) {
85 response_ = response;
86 response_status_ = response_status;
87 output().set_on_send([this](span<const std::byte> buffer, Status status) {
88 OnSend(buffer, status);
89 });
90 }
91
generated_client()92 MethodInfo::GeneratedClient generated_client() {
93 return MethodInfo::GeneratedClient(client(), channel().id());
94 }
95
output()96 FakeChannelOutput& output() { return fake_output_; }
channel() const97 const Channel& channel() const { return channels_.front(); }
client()98 Client& client() { return client_; }
99
100 private:
101 FakeChannelOutput fake_output_;
102 std::array<Channel, 1> channels_;
103 Client client_;
104 thread::Thread work_thread_;
105 work_queue::WorkQueueWithBuffer<1> work_queue_;
106 TestResponse::Message response_{};
107 Status response_status_ = OkStatus();
108 internal::Packet request_packet_;
109 };
110
// A unary call made with an explicit client and channel ID should complete
// with OK and carry the canned response installed through set_response().
TEST_F(SynchronousCallTest, SynchronousCallSuccess) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, OkStatus());

  auto result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), request);
  // Fatal check: only inspect the response once the call is known to have
  // succeeded, so a failed call is reported as a test failure instead of
  // reaching the response accessor.
  ASSERT_TRUE(result.ok());
  EXPECT_EQ(result.response().value, 42);
}
122
// When the canned reply carries a non-OK status, the call should report that
// status as an error while still exposing the response message.
TEST_F(SynchronousCallTest, SynchronousCallServerError) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, Status::Internal());

  auto result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), request);
  EXPECT_TRUE(result.is_error());
  EXPECT_EQ(result.status(), Status::Internal());

  // We should still receive the response. The check is fatal so that the
  // response is only read when the result reports that it holds one.
  ASSERT_TRUE(result.is_server_response());
  EXPECT_EQ(result.response().value, 42);
}
138
// A failure to send the request should surface as an RPC error.
TEST_F(SynchronousCallTest, SynchronousCallRpcError) {
  // Make every send on the fake output fail. Internally, whenever Channel gets
  // a non-ok status back from ChannelOutput::Send, it always returns Unknown.
  output().set_send_status(Status::Unknown());

  const TestRequest::Message kRequest{.integer = 5, .status_code = 0};
  const auto call_result = SynchronousCall<TestService::TestUnaryRpc>(
      client(), channel().id(), kRequest);

  EXPECT_TRUE(call_result.is_rpc_error());
  EXPECT_EQ(call_result.status(), Status::Unknown());
}
151
// No canned response is installed here, so a call bounded by a relative
// timeout is expected to report DeadlineExceeded.
TEST_F(SynchronousCallTest, SynchronousCallForTimeoutError) {
  const TestRequest::Message kRequest{.integer = 5, .status_code = 0};
  const chrono::SystemClock::duration kTimeout =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  const auto call_result = SynchronousCallFor<TestService::TestUnaryRpc>(
      client(), channel().id(), kRequest, kTimeout);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
164
// No canned response is installed here, so a call whose deadline is the
// current time is expected to report DeadlineExceeded.
TEST_F(SynchronousCallTest, SynchronousCallUntilTimeoutError) {
  const TestRequest::Message kRequest{.integer = 5, .status_code = 0};
  const chrono::SystemClock::time_point kDeadline = chrono::SystemClock::now();

  const auto call_result = SynchronousCallUntil<TestService::TestUnaryRpc>(
      client(), channel().id(), kRequest, kDeadline);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
174
// A unary call made through the generated service client should complete
// with OK and carry the canned response installed through set_response().
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallSuccess) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, OkStatus());

  auto result =
      SynchronousCall<TestService::TestUnaryRpc>(generated_client(), request);
  // Fatal check: only inspect the response once the call is known to have
  // succeeded, so a failed call is reported as a test failure instead of
  // reaching the response accessor.
  ASSERT_TRUE(result.ok());
  EXPECT_EQ(result.response().value, 42);
}
186
// When the canned reply carries a non-OK status, a call made through the
// generated service client should report that status as an error while still
// exposing the response message.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallServerError) {
  TestRequest::Message request{.integer = 5, .status_code = 0};
  TestResponse::Message response{.value = 42, .repeated_field{}};

  set_response(response, Status::Internal());

  auto result =
      SynchronousCall<TestService::TestUnaryRpc>(generated_client(), request);
  EXPECT_TRUE(result.is_error());
  EXPECT_EQ(result.status(), Status::Internal());

  // We should still receive the response. The check is fatal so that the
  // response is only read when the result reports that it holds one.
  ASSERT_TRUE(result.is_server_response());
  EXPECT_EQ(result.response().value, 42);
}
202
// A failure to send the request should surface as an RPC error when the call
// is made through the generated service client.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallRpcError) {
  // Make every send on the fake output fail. Internally, whenever Channel gets
  // a non-ok status back from ChannelOutput::Send, it always returns Unknown.
  output().set_send_status(Status::Unknown());

  const TestRequest::Message kRequest{.integer = 5, .status_code = 0};
  const auto call_result =
      SynchronousCall<TestService::TestUnaryRpc>(generated_client(), kRequest);

  EXPECT_TRUE(call_result.is_rpc_error());
  EXPECT_EQ(call_result.status(), Status::Unknown());
}
215
// No canned response is installed here, so a generated-client call bounded by
// a relative timeout is expected to report DeadlineExceeded.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallForTimeoutError) {
  const TestRequest::Message kRequest{.integer = 5, .status_code = 0};
  const chrono::SystemClock::duration kTimeout =
      chrono::SystemClock::for_at_least(std::chrono::milliseconds(1));

  const auto call_result = SynchronousCallFor<TestService::TestUnaryRpc>(
      generated_client(), kRequest, kTimeout);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
227
// No canned response is installed here, so a generated-client call whose
// deadline is the current time is expected to report DeadlineExceeded.
TEST_F(SynchronousCallTest, GeneratedClientSynchronousCallUntilTimeoutError) {
  const TestRequest::Message kRequest{.integer = 5, .status_code = 0};
  const chrono::SystemClock::time_point kDeadline = chrono::SystemClock::now();

  const auto call_result = SynchronousCallUntil<TestService::TestUnaryRpc>(
      generated_client(), kRequest, kDeadline);

  EXPECT_TRUE(call_result.is_timeout());
  EXPECT_EQ(call_result.status(), Status::DeadlineExceeded());
}
237 } // namespace
238 } // namespace pw::rpc::test
239