• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14 
15 #include "pw_transfer/transfer_thread.h"
16 
17 #include "gtest/gtest.h"
18 #include "pw_assert/check.h"
19 #include "pw_bytes/array.h"
20 #include "pw_rpc/raw/client_testing.h"
21 #include "pw_rpc/raw/test_method_context.h"
22 #include "pw_rpc/test_helpers.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread_stl/options.h"
25 #include "pw_transfer/handler.h"
26 #include "pw_transfer/transfer.h"
27 #include "pw_transfer/transfer.raw_rpc.pb.h"
28 #include "pw_transfer_private/chunk_testing.h"
29 
30 namespace pw::transfer::test {
31 namespace {
32 
33 using internal::Chunk;
34 
35 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()36 thread::Options& TransferThreadOptions() {
37   static thread::stl::Options options;
38   return options;
39 }
40 
41 class TransferThreadTest : public ::testing::Test {
42  public:
TransferThreadTest()43   TransferThreadTest()
44       : ctx_(transfer_thread_, 512),
45         max_parameters_(chunk_buffer_.size(),
46                         chunk_buffer_.size(),
47                         cfg::kDefaultExtendWindowDivisor),
48         transfer_thread_(chunk_buffer_, encode_buffer_),
49         system_thread_(TransferThreadOptions(), transfer_thread_) {}
50 
~TransferThreadTest()51   ~TransferThreadTest() override {
52     transfer_thread_.Terminate();
53     system_thread_.join();
54   }
55 
56  protected:
57   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
58 
59   std::array<std::byte, 64> chunk_buffer_;
60   std::array<std::byte, 64> encode_buffer_;
61 
62   rpc::RawClientTestContext<> rpc_client_context_;
63   internal::TransferParameters max_parameters_;
64 
65   transfer::Thread<1, 1> transfer_thread_;
66 
67   thread::Thread system_thread_;
68 };
69 
70 class SimpleReadTransfer final : public ReadOnlyHandler {
71  public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)72   SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
73       : ReadOnlyHandler(session_id),
74         prepare_read_called(false),
75         finalize_read_called(false),
76         finalize_read_status(Status::Unknown()),
77         reader_(data) {}
78 
PrepareRead()79   Status PrepareRead() final {
80     PW_CHECK_OK(reader_.Seek(0));
81     set_reader(reader_);
82     prepare_read_called = true;
83     return OkStatus();
84   }
85 
FinalizeRead(Status status)86   void FinalizeRead(Status status) final {
87     finalize_read_called = true;
88     finalize_read_status = status;
89   }
90 
91   bool prepare_read_called;
92   bool finalize_read_called;
93   Status finalize_read_status;
94 
95  private:
96   stream::MemoryReader reader_;
97 };
98 
__anonfb73af530202(size_t i) 99 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
100 
// A handler registered with the thread has its PrepareRead() callback invoked
// when a server transmit transfer is started for its ID.
TEST_F(TransferThreadTest, AddTransferHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);

  // Start a legacy-protocol transfer for ID 3 with an empty initial chunk.
  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/3,
                                       /*resource_id=*/3,
                                       {},
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);

  // Block until the thread has handled the start event before inspecting the
  // handler's flags.
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(read_handler.prepare_read_called);

  transfer_thread_.RemoveTransferHandler(read_handler);
}
124 
// Once a handler has been removed, starting a transfer for its ID must not
// reach the handler; the thread is expected to answer with a single NOT_FOUND
// status chunk instead.
TEST_F(TransferThreadTest, RemoveTransferHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  // Register the handler and immediately unregister it again.
  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kLegacy,
                                       /*session_id=*/3,
                                       /*resource_id=*/3,
                                       {},
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  // Exactly one response is expected: a status chunk for session 3.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  auto response_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(response_chunk.session_id(), 3u);
  ASSERT_TRUE(response_chunk.status().has_value());
  EXPECT_EQ(response_chunk.status().value(), Status::NotFound());

  // The handler was already removed above; this second removal mirrors the
  // cleanup performed by the other tests.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
155 
// Starts a server transmit transfer with an initial parameters chunk (window
// end offset 16, max chunk size 8) and expects the thread to respond with two
// 8-byte data chunks, at offsets 0 and 8, whose payloads match kData.
TEST_F(TransferThreadTest, ProcessChunk_SendsWindow) {
  auto reader_writer = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(reader_writer);

  SimpleReadTransfer handler(3, kData);
  transfer_thread_.AddTransferHandler(handler);

  // Unregister the handler on every exit path. A failed ASSERT_* returns from
  // the test body immediately; without this guard the trailing
  // RemoveTransferHandler() call would be skipped and `handler` would be
  // destroyed while still registered with the running transfer thread.
  // Declared after `handler`, so it is destroyed (and removes the handler)
  // before `handler` itself goes away.
  struct HandlerRemover {
    ~HandlerRemover() { thread.RemoveTransferHandler(handler); }
    transfer::Thread<1, 1>& thread;
    SimpleReadTransfer& handler;
  } remover{transfer_thread_, handler};

  // Run the start request and wait until two packets have been sent on the
  // server's output before examining the responses.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    transfer_thread_.StartServerTransfer(
        internal::TransferType::kTransmit,
        ProtocolVersion::kLegacy,
        /*session_id=*/3,
        /*resource_id=*/3,
        EncodeChunk(
            Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
                .set_session_id(3)
                .set_window_end_offset(16)
                .set_max_chunk_size_bytes(8)
                .set_offset(0)),
        max_parameters_,
        std::chrono::seconds(2),
        3,
        10);
  });

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // First data chunk: bytes [0, 8).
  auto chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.session_id(), 3u);
  EXPECT_EQ(chunk.offset(), 0u);
  EXPECT_EQ(chunk.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
      0);

  // Second data chunk: bytes [8, 16).
  chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(chunk.session_id(), 3u);
  EXPECT_EQ(chunk.offset(), 8u);
  EXPECT_EQ(chunk.payload().size(), 8u);
  EXPECT_EQ(
      std::memcmp(
          chunk.payload().data(), kData.data() + 8, chunk.payload().size()),
      0);
}
201 
TEST_F(TransferThreadTest,StartTransferExhausted_Server)202 TEST_F(TransferThreadTest, StartTransferExhausted_Server) {
203   auto reader_writer = ctx_.reader_writer();
204   transfer_thread_.SetServerReadStream(reader_writer);
205 
206   SimpleReadTransfer handler3(3, kData);
207   SimpleReadTransfer handler4(4, kData);
208   transfer_thread_.AddTransferHandler(handler3);
209   transfer_thread_.AddTransferHandler(handler4);
210 
211   transfer_thread_.StartServerTransfer(
212       internal::TransferType::kTransmit,
213       ProtocolVersion::kLegacy,
214       3,
215       3,
216       EncodeChunk(
217           Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
218               .set_session_id(3)
219               .set_window_end_offset(16)
220               .set_max_chunk_size_bytes(8)
221               .set_offset(0)),
222       max_parameters_,
223       std::chrono::seconds(2),
224       3,
225       10);
226   transfer_thread_.WaitUntilEventIsProcessed();
227 
228   // First transfer starts correctly.
229   EXPECT_TRUE(handler3.prepare_read_called);
230   EXPECT_FALSE(handler4.prepare_read_called);
231   ASSERT_EQ(ctx_.total_responses(), 1u);
232 
233   // Try to start a simultaneous transfer to resource 4, for which the thread
234   // does not have an available context.
235   transfer_thread_.StartServerTransfer(
236       internal::TransferType::kTransmit,
237       ProtocolVersion::kLegacy,
238       4,
239       4,
240       EncodeChunk(
241           Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
242               .set_session_id(4)
243               .set_window_end_offset(16)
244               .set_max_chunk_size_bytes(8)
245               .set_offset(0)),
246       max_parameters_,
247       std::chrono::seconds(2),
248       3,
249       10);
250   transfer_thread_.WaitUntilEventIsProcessed();
251 
252   EXPECT_FALSE(handler4.prepare_read_called);
253 
254   ASSERT_EQ(ctx_.total_responses(), 2u);
255   auto chunk = DecodeChunk(ctx_.response());
256   EXPECT_EQ(chunk.session_id(), 4u);
257   ASSERT_TRUE(chunk.status().has_value());
258   EXPECT_EQ(chunk.status().value(), Status::ResourceExhausted());
259 
260   transfer_thread_.RemoveTransferHandler(handler3);
261   transfer_thread_.RemoveTransferHandler(handler4);
262 }
263 
// With a client receive transfer already active for resource 3, a second
// simultaneous client transfer (resource 4) is expected to complete
// immediately with RESOURCE_EXHAUSTED while the first one stays pending.
TEST_F(TransferThreadTest, StartTransferExhausted_Client) {
  rpc::RawClientReaderWriter client_stream = pw_rpc::raw::Transfer::Read(
      rpc_client_context_.client(), rpc_client_context_.channel().id());
  transfer_thread_.SetClientReadStream(client_stream);

  // Completion statuses reported through each transfer's callback; Unknown
  // means the callback has not run yet.
  Status status3 = Status::Unknown();
  Status status4 = Status::Unknown();

  // Destination buffers for the two receive transfers.
  stream::MemoryWriterBuffer<16> buffer3;
  stream::MemoryWriterBuffer<16> buffer4;

  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      3,
      &buffer3,
      max_parameters_,
      [&status3](Status result) { status3 = result; },
      std::chrono::seconds(2),
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Neither completion callback has fired after the first transfer starts.
  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::Unknown());

  // Try to start a simultaneous transfer to resource 4, for which the thread
  // does not have an available context.
  transfer_thread_.StartClientTransfer(
      internal::TransferType::kReceive,
      ProtocolVersion::kLegacy,
      4,
      &buffer4,
      max_parameters_,
      [&status4](Status result) { status4 = result; },
      std::chrono::seconds(2),
      3,
      10);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the second transfer has completed, and it failed.
  EXPECT_EQ(status3, Status::Unknown());
  EXPECT_EQ(status4, Status::ResourceExhausted());

  // Cancel both transfers before the callbacks' captured locals go away.
  transfer_thread_.EndClientTransfer(3, Status::Cancelled());
  transfer_thread_.EndClientTransfer(4, Status::Cancelled());
}
310 
// Starts a version-two server transfer for a resource (7) that has no
// registered handler. The test asserts a single NOT_FOUND status chunk whose
// identifier, session ID and resource ID all equal the resource ID (7) rather
// than the session ID (421) passed to StartServerTransfer().
TEST_F(TransferThreadTest, VersionTwo_NoHandler) {
  auto server_stream = ctx_.reader_writer();
  transfer_thread_.SetServerReadStream(server_stream);

  // A handler for resource 3 is registered and removed again, so no handler
  // is registered when the transfer is started.
  SimpleReadTransfer read_handler(3, kData);
  transfer_thread_.AddTransferHandler(read_handler);
  transfer_thread_.RemoveTransferHandler(read_handler);

  transfer_thread_.StartServerTransfer(internal::TransferType::kTransmit,
                                       ProtocolVersion::kVersionTwo,
                                       /*session_id=*/421,
                                       /*resource_id=*/7,
                                       {},
                                       max_parameters_,
                                       std::chrono::seconds(2),
                                       3,
                                       10);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_FALSE(read_handler.prepare_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Check the identifier extracted from the raw response first.
  Result<Chunk::Identifier> identifier =
      Chunk::ExtractIdentifier(ctx_.response());
  ASSERT_TRUE(identifier.ok());
  EXPECT_EQ(identifier->value(), 7u);

  // Then check the fully decoded chunk.
  auto response_chunk = DecodeChunk(ctx_.response());
  EXPECT_EQ(response_chunk.session_id(), 7u);
  EXPECT_EQ(response_chunk.resource_id(), 7u);
  ASSERT_TRUE(response_chunk.status().has_value());
  EXPECT_EQ(response_chunk.status().value(), Status::NotFound());

  // The handler was already removed above; this second removal mirrors the
  // cleanup performed by the other tests.
  transfer_thread_.RemoveTransferHandler(read_handler);
}
345 
346 }  // namespace
347 }  // namespace pw::transfer::test
348