1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "gtest/gtest.h"
16 #include "pw_assert/check.h"
17 #include "pw_rpc/benchmark.rpc.pwpb.h"
18 #include "pw_rpc/integration_testing.h"
19 #include "pw_sync/binary_semaphore.h"
20
21 namespace pwpb_rpc_test {
22 namespace {
23
24 using namespace std::chrono_literals;
25 using pw::ByteSpan;
26 using pw::ConstByteSpan;
27 using pw::Function;
28 using pw::OkStatus;
29 using pw::Status;
30
31 using pw::rpc::pw_rpc::pwpb::Benchmark;
32
// Number of times each looping test below repeats its RPC exchange.
constexpr int kIterations{10};
34
35 class PayloadReceiver {
36 public:
Wait()37 const char* Wait() {
38 PW_CHECK(sem_.try_acquire_for(1500ms));
39 return reinterpret_cast<const char*>(payload_.payload.data());
40 }
41
UnaryOnCompleted()42 Function<void(const pw::rpc::Payload::Message&, Status)> UnaryOnCompleted() {
43 return [this](const pw::rpc::Payload::Message& data, Status) {
44 CopyPayload(data);
45 };
46 }
47
OnNext()48 Function<void(const pw::rpc::Payload::Message&)> OnNext() {
49 return [this](const pw::rpc::Payload::Message& data) { CopyPayload(data); };
50 }
51
52 private:
CopyPayload(const pw::rpc::Payload::Message & data)53 void CopyPayload(const pw::rpc::Payload::Message& data) {
54 payload_ = data;
55 sem_.release();
56 }
57
58 pw::sync::BinarySemaphore sem_;
59 pw::rpc::Payload::Message payload_ = {};
60 };
61
62 template <size_t kSize>
Payload(const char (& string)[kSize])63 pw::rpc::Payload::Message Payload(const char (&string)[kSize]) {
64 static_assert(kSize <= sizeof(pw::rpc::Payload::Message::payload));
65 pw::rpc::Payload::Message payload{};
66 payload.payload.resize(kSize);
67 std::memcpy(payload.payload.data(), string, kSize);
68 return payload;
69 }
70
71 const Benchmark::Client kClient(pw::rpc::integration_test::client(),
72 pw::rpc::integration_test::kChannelId);
73
// Issues kIterations unary echo calls, each with a slightly different payload,
// and checks that every response matches the string that was sent.
TEST(PwpbRpcIntegrationTest, Unary) {
  char value[] = {"hello, world!"};

  for (int i = 0; i < kIterations; ++i) {
    PayloadReceiver receiver;

    // Vary the first character with a printable digit. Storing the raw loop
    // index here would write '\0' on the first iteration, making `value` an
    // empty C string so that ASSERT_STREQ would only compare a single byte.
    value[0] = static_cast<char>('0' + i % 10);
    pw::rpc::PwpbUnaryReceiver call =
        kClient.UnaryEcho(Payload(value), receiver.UnaryOnCompleted());
    ASSERT_STREQ(receiver.Wait(), value);
  }
}
86
// Issues kIterations unary echo calls, assigning each new call to the same
// call object, and checks that every response matches the string sent.
TEST(PwpbRpcIntegrationTest, Unary_ReuseCall) {
  pw::rpc::PwpbUnaryReceiver<pw::rpc::Payload::Message> call;
  char value[] = {"O_o "};

  for (int i = 0; i < kIterations; ++i) {
    PayloadReceiver receiver;

    // Replace the trailing space with a printable digit. Storing the raw loop
    // index would write '\0' on the first iteration, ending the C string
    // before the character that is meant to differ between iterations.
    value[sizeof(value) - 2] = static_cast<char>('0' + i % 10);
    call = kClient.UnaryEcho(Payload(value), receiver.UnaryOnCompleted());
    ASSERT_STREQ(receiver.Wait(), value);
  }
}
99
// Starts unary echo calls and drops the returned call objects right away,
// without registering a callback or waiting for a response. 10000 calls are
// made when PW_RPC_USE_GLOBAL_MUTEX is nonzero; otherwise one call is made.
TEST(PwpbRpcIntegrationTest, Unary_DiscardCalls) {
  constexpr int kDiscardedCalls = PW_RPC_USE_GLOBAL_MUTEX ? 10000 : 1;

  int remaining = kDiscardedCalls;
  while (remaining-- > 0) {
    kClient.UnaryEcho(Payload("O_o"));
  }
}
106
// Moves an active bidirectional echo call between two call objects. After each
// move, the test expects the object that currently holds the call to write
// successfully and the moved-from object to report FAILED_PRECONDITION.
TEST(PwpbRpcIntegrationTest, BidirectionalStreaming_MoveCalls) {
  using Message = pw::rpc::Payload::Message;
  using ReaderWriter = pw::rpc::PwpbClientReaderWriter<Message, Message>;

  for (int round = 0; round != kIterations; ++round) {
    PayloadReceiver receiver;
    ReaderWriter call = kClient.BidirectionalEcho(receiver.OnNext());

    // The freshly started call echoes what is written to it.
    ASSERT_EQ(OkStatus(), call.Write(Payload("Yello")));
    ASSERT_STREQ(receiver.Wait(), "Yello");

    // Move-construct a second call object from the first.
    ReaderWriter new_call = std::move(call);

    // NOLINTNEXTLINE(bugprone-use-after-move)
    EXPECT_EQ(Status::FailedPrecondition(), call.Write(Payload("Dello")));

    ASSERT_EQ(OkStatus(), new_call.Write(Payload("Dello")));
    ASSERT_STREQ(receiver.Wait(), "Dello");

    // Move-assign the call back into the original object.
    call = std::move(new_call);

    // NOLINTNEXTLINE(bugprone-use-after-move)
    EXPECT_EQ(Status::FailedPrecondition(), new_call.Write(Payload("Dello")));

    ASSERT_EQ(OkStatus(), call.Write(Payload("???")));
    ASSERT_STREQ(receiver.Wait(), "???");

    // Only the object that currently holds the call is expected to cancel it.
    EXPECT_EQ(OkStatus(), call.Cancel());
    EXPECT_EQ(Status::FailedPrecondition(), new_call.Cancel());
  }
}
138
// Reuses one call object for kIterations bidirectional echo calls. Each
// iteration assigns a newly started call over the previous one, which is never
// explicitly cancelled, and then checks that three writes are echoed back.
TEST(PwpbRpcIntegrationTest, BidirectionalStreaming_ReuseCall) {
  using Message = pw::rpc::Payload::Message;
  using ReaderWriter = pw::rpc::PwpbClientReaderWriter<Message, Message>;

  ReaderWriter call;

  for (int round = 0; round != kIterations; ++round) {
    PayloadReceiver receiver;
    call = kClient.BidirectionalEcho(receiver.OnNext());

    // First message.
    ASSERT_EQ(OkStatus(), call.Write(Payload("Yello")));
    ASSERT_STREQ(receiver.Wait(), "Yello");

    // Second message.
    ASSERT_EQ(OkStatus(), call.Write(Payload("Dello")));
    ASSERT_STREQ(receiver.Wait(), "Dello");

    // Third message.
    ASSERT_EQ(OkStatus(), call.Write(Payload("???")));
    ASSERT_STREQ(receiver.Wait(), "???");
  }
}
158
159 } // namespace
160 } // namespace pwpb_rpc_test
161