1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/transfer_thread.h"
16
17 #include "gtest/gtest.h"
18 #include "pw_assert/check.h"
19 #include "pw_bytes/array.h"
20 #include "pw_rpc/raw/client_testing.h"
21 #include "pw_rpc/raw/test_method_context.h"
22 #include "pw_rpc/test_helpers.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread_stl/options.h"
25 #include "pw_transfer/handler.h"
26 #include "pw_transfer/transfer.h"
27 #include "pw_transfer/transfer.raw_rpc.pb.h"
28 #include "pw_transfer_private/chunk_testing.h"
29
30 namespace pw::transfer::test {
31 namespace {
32
33 using internal::Chunk;
34
35 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()36 thread::Options& TransferThreadOptions() {
37 static thread::stl::Options options;
38 return options;
39 }
40
41 class TransferThreadTest : public ::testing::Test {
42 public:
TransferThreadTest()43 TransferThreadTest()
44 : ctx_(transfer_thread_, 512),
45 max_parameters_(chunk_buffer_.size(),
46 chunk_buffer_.size(),
47 cfg::kDefaultExtendWindowDivisor),
48 transfer_thread_(chunk_buffer_, encode_buffer_),
49 system_thread_(TransferThreadOptions(), transfer_thread_) {}
50
~TransferThreadTest()51 ~TransferThreadTest() override {
52 transfer_thread_.Terminate();
53 system_thread_.join();
54 }
55
56 protected:
57 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
58
59 std::array<std::byte, 64> chunk_buffer_;
60 std::array<std::byte, 64> encode_buffer_;
61
62 rpc::RawClientTestContext<> rpc_client_context_;
63 internal::TransferParameters max_parameters_;
64
65 transfer::Thread<1, 1> transfer_thread_;
66
67 thread::Thread system_thread_;
68 };
69
70 class SimpleReadTransfer final : public ReadOnlyHandler {
71 public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)72 SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
73 : ReadOnlyHandler(session_id),
74 prepare_read_called(false),
75 finalize_read_called(false),
76 finalize_read_status(Status::Unknown()),
77 reader_(data) {}
78
PrepareRead()79 Status PrepareRead() final {
80 PW_CHECK_OK(reader_.Seek(0));
81 set_reader(reader_);
82 prepare_read_called = true;
83 return OkStatus();
84 }
85
FinalizeRead(Status status)86 void FinalizeRead(Status status) final {
87 finalize_read_called = true;
88 finalize_read_status = status;
89 }
90
91 bool prepare_read_called;
92 bool finalize_read_called;
93 Status finalize_read_status;
94
95 private:
96 stream::MemoryReader reader_;
97 };
98
__anonfb73af530202(size_t i) 99 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
100
// A handler registered with the transfer thread must have PrepareRead()
// invoked when a server transfer is started for its ID.
TEST_F(TransferThreadTest, AddTransferHandler) {
  // The same ID is used for the handler, the session and the resource.
  constexpr uint32_t kId = 3;

  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  SimpleReadTransfer read_handler(kId, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/kId,
                                       /*resource_id=*/kId,
                                       {},
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);

  // The start request is handled on the transfer thread; wait for it before
  // inspecting the handler.
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(read_handler.prepare_read_called);

  // Unregister the handler before it goes out of scope.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
124
// Once a handler has been removed, starting a server transfer for its ID must
// not reach the handler; a single NOT_FOUND status chunk is sent instead.
TEST_F(TransferThreadTest, RemoveTransferHandler) {
  // The same ID is used for the handler, the session and the resource.
  constexpr uint32_t kId = 3;

  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  // Register the handler and immediately take it away again.
  SimpleReadTransfer read_handler(kId, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/kId,
                                       /*resource_id=*/kId,
                                       {},
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  // Exactly one response is expected: the terminating status chunk.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  auto status_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(status_chunk.session_id(), kId);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::NotFound());

  // Second removal of the same handler, kept from the original test.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
155
// Starting a transmit transfer with a parameters chunk that opens a 16-byte
// window with 8-byte chunks must produce two data chunks covering
// kData[0, 8) and kData[8, 16).
TEST_F(TransferThreadTest, ProcessChunk_SendsWindow) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  // Start the transfer and block until two packets have been written to the
  // context's output.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    transfer_thread_.StartServerTransfer(
        internal::TransferType::kTransmit,
        ProtocolVersion::kLegacy,
        3,
        3,
        EncodeChunk(
            Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
                .set_session_id(3)
                .set_window_end_offset(16)
                .set_max_chunk_size_bytes(8)
                .set_offset(0)),
        max_parameters_,
        std::chrono::seconds(2),
        3,
        10);
  });

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // First data chunk: bytes [0, 8) of the source data.
  auto first_chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(first_chunk.session_id(), 3u);
  EXPECT_EQ(first_chunk.offset(), 0u);
  EXPECT_EQ(first_chunk.payload().size(), 8u);
  EXPECT_EQ(std::memcmp(first_chunk.payload().data(),
                        kData.data(),
                        first_chunk.payload().size()),
            0);

  // Second data chunk: bytes [8, 16) of the source data.
  auto second_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(second_chunk.session_id(), 3u);
  EXPECT_EQ(second_chunk.offset(), 8u);
  EXPECT_EQ(second_chunk.payload().size(), 8u);
  EXPECT_EQ(std::memcmp(second_chunk.payload().data(),
                        kData.data() + 8,
                        second_chunk.payload().size()),
            0);

  // Unregister the handler before it goes out of scope.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
201
// With one server transfer already running, a second simultaneous server
// transfer must be rejected with RESOURCE_EXHAUSTED and must not reach its
// handler.
TEST_F(TransferThreadTest, StartTransferExhausted_Server) {
  constexpr uint32_t kFirstId = 3;
  constexpr uint32_t kSecondId = 4;

  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  SimpleReadTransfer handler3(kFirstId, kData);
  SimpleReadTransfer handler4(kSecondId, kData);
  transfer_thread_.AddTransferHandler(handler3);
  transfer_thread_.AddTransferHandler(handler4);

  // Parameters chunk that opens the first transfer.
  const auto first_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(kFirstId)
          .set_window_end_offset(16)
          .set_max_chunk_size_bytes(8)
          .set_offset(0));
  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/kFirstId,
                                       /*resource_id=*/kFirstId,
                                       first_request,
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The first transfer starts normally and has produced one response.
  EXPECT_TRUE(handler3.prepare_read_called);
  EXPECT_FALSE(handler4.prepare_read_called);
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Attempt a simultaneous transfer to resource 4, for which the thread has
  // no free context.
  const auto second_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(kSecondId)
          .set_window_end_offset(16)
          .set_max_chunk_size_bytes(8)
          .set_offset(0));
  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/kSecondId,
                                       /*resource_id=*/kSecondId,
                                       second_request,
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(handler4.prepare_read_called);

  // The newest response is the rejection for the second transfer.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  auto rejection = DecodeChunk(ctx_.response());
  EXPECT_EQ(rejection.session_id(), kSecondId);
  ASSERT_TRUE(rejection.status().has_value());
  EXPECT_EQ(rejection.status().value(), Status::ResourceExhausted());

  // Unregister both handlers before they go out of scope.
  transfer_thread_.RemoveTransferHandler(handler3);
  transfer_thread_.RemoveTransferHandler(handler4);
}
263
// With one client transfer already running, a second simultaneous client
// transfer must complete immediately with RESOURCE_EXHAUSTED while the first
// one is left untouched.
TEST_F(TransferThreadTest, StartTransferExhausted_Client) {
  rpc::RawClientReaderWriter read_stream = pw_rpc::raw::Transfer::Read(
      rpc_client_context_.client(), rpc_client_context_.channel().id());
  transfer_thread_.SetClientReadStream(read_stream);

  // Completion statuses reported by the callbacks below. UNKNOWN means the
  // corresponding callback has not run yet.
  Status status3 = Status::Unknown();
  Status status4 = Status::Unknown();

  stream::MemoryWriterBuffer<16> buffer3;
  stream::MemoryWriterBuffer<16> buffer4;

  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      3,
      &buffer3,
      max_parameters_,
      [&status3](Status status) { status3 = status; },
      std::chrono::seconds(2),
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The first transfer is in progress: neither callback has fired.
  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::Unknown());

  // Try to start a simultaneous transfer to resource 4, for which the thread
  // does not have an available context.
  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      4,
      &buffer4,
      max_parameters_,
      [&status4](Status status) { status4 = status; },
      std::chrono::seconds(2),
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::ResourceExhausted());

  transfer_thread_.EndClientTransfer(3, Status::Cancelled());
  transfer_thread_.EndClientTransfer(4, Status::Cancelled());

  // The callbacks capture status3/status4 by reference and the first transfer
  // writes into buffer3, all of which are locals of this test body. Wait for
  // the queued end-transfer events to be handled so the transfer thread cannot
  // touch those objects after they have gone out of scope.
  transfer_thread_.WaitUntilEventIsProcessed();
}
310
// Starting a version-two server transfer for a resource with no registered
// handler must yield a single NOT_FOUND status chunk. The chunk is expected to
// be identified by the requested resource ID (7), not by the session ID (421)
// that was passed in.
TEST_F(TransferThreadTest, VersionTwo_NoHandler) {
  constexpr uint32_t kSessionId = 421;
  constexpr uint32_t kResourceId = 7;

  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  // A handler for an unrelated ID is registered and removed again, so no
  // handler is registered when the transfer is requested.
  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kVersionTwo,
                                       /*session_id=*/kSessionId,
                                       /*resource_id=*/kResourceId,
                                       {},
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Check the identifier as extracted from the raw response...
  Result<Chunk::Identifier> identifier =
      Chunk::ExtractIdentifier(ctx_.response());
  ASSERT_TRUE(identifier.ok());
  EXPECT_EQ(identifier->value(), kResourceId);

  // ...and the fields of the fully decoded chunk.
  auto status_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(status_chunk.session_id(), kResourceId);
  EXPECT_EQ(status_chunk.resource_id(), kResourceId);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::NotFound());

  // Second removal of the same handler, kept from the original test.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
345
346 } // namespace
347 } // namespace pw::transfer::test
348