1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/transfer.h"
16
17 #include "gtest/gtest.h"
18 #include "pw_bytes/array.h"
19 #include "pw_rpc/raw/test_method_context.h"
20 #include "pw_rpc/thread_testing.h"
21 #include "pw_thread/thread.h"
22 #include "pw_thread_stl/options.h"
23 #include "pw_transfer/transfer.pwpb.h"
24 #include "pw_transfer_private/chunk_testing.h"
25
26 namespace pw::transfer::test {
27 namespace {
28
29 using namespace std::chrono_literals;
30
31 PW_MODIFY_DIAGNOSTICS_PUSH();
32 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
33
34 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()35 thread::Options& TransferThreadOptions() {
36 static thread::stl::Options options;
37 return options;
38 }
39
40 using internal::Chunk;
41
42 class TestMemoryReader : public stream::SeekableReader {
43 public:
TestMemoryReader(std::span<const std::byte> data)44 constexpr TestMemoryReader(std::span<const std::byte> data)
45 : memory_reader_(data) {}
46
DoSeek(ptrdiff_t offset,Whence origin)47 Status DoSeek(ptrdiff_t offset, Whence origin) override {
48 if (seek_status.ok()) {
49 return memory_reader_.Seek(offset, origin);
50 }
51 return seek_status;
52 }
53
DoRead(ByteSpan dest)54 StatusWithSize DoRead(ByteSpan dest) final {
55 if (!read_status.ok()) {
56 return StatusWithSize(read_status, 0);
57 }
58
59 auto result = memory_reader_.Read(dest);
60 return result.ok() ? StatusWithSize(result->size())
61 : StatusWithSize(result.status(), 0);
62 }
63
64 Status seek_status;
65 Status read_status;
66
67 private:
68 stream::MemoryReader memory_reader_;
69 };
70
71 class SimpleReadTransfer final : public ReadOnlyHandler {
72 public:
SimpleReadTransfer(uint32_t transfer_id,ConstByteSpan data)73 SimpleReadTransfer(uint32_t transfer_id, ConstByteSpan data)
74 : ReadOnlyHandler(transfer_id),
75 prepare_read_called(false),
76 finalize_read_called(false),
77 finalize_read_status(Status::Unknown()),
78 reader_(data) {}
79
PrepareRead()80 Status PrepareRead() final {
81 prepare_read_called = true;
82
83 if (!prepare_read_return_status.ok()) {
84 return prepare_read_return_status;
85 }
86
87 EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
88 set_reader(reader_);
89 return OkStatus();
90 }
91
FinalizeRead(Status status)92 void FinalizeRead(Status status) final {
93 finalize_read_called = true;
94 finalize_read_status = status;
95 }
96
set_seek_status(Status status)97 void set_seek_status(Status status) { reader_.seek_status = status; }
set_read_status(Status status)98 void set_read_status(Status status) { reader_.read_status = status; }
99
100 bool prepare_read_called;
101 bool finalize_read_called;
102 Status prepare_read_return_status;
103 Status finalize_read_status;
104
105 private:
106 TestMemoryReader reader_;
107 };
108
__anon215251c30202(size_t i) 109 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
110
111 class ReadTransfer : public ::testing::Test {
112 protected:
ReadTransfer(size_t max_chunk_size_bytes=64)113 ReadTransfer(size_t max_chunk_size_bytes = 64)
114 : handler_(3, kData),
115 transfer_thread_(std::span(data_buffer_).first(max_chunk_size_bytes),
116 encode_buffer_),
117 ctx_(transfer_thread_, 64),
118 system_thread_(TransferThreadOptions(), transfer_thread_) {
119 ctx_.service().RegisterHandler(handler_);
120
121 ASSERT_FALSE(handler_.prepare_read_called);
122 ASSERT_FALSE(handler_.finalize_read_called);
123
124 ctx_.call(); // Open the read stream
125 transfer_thread_.WaitUntilEventIsProcessed();
126 }
127
~ReadTransfer()128 ~ReadTransfer() {
129 transfer_thread_.Terminate();
130 system_thread_.join();
131 }
132
133 SimpleReadTransfer handler_;
134 Thread<1, 1> transfer_thread_;
135 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
136 thread::Thread system_thread_;
137 std::array<std::byte, 64> data_buffer_;
138 std::array<std::byte, 64> encode_buffer_;
139 };
140
// A read whose window (64 bytes) exceeds the size of kData is served with one
// data chunk followed by an end-of-data chunk.
TEST_F(ReadTransfer, SingleChunk) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    const auto start_request =
        EncodeChunk({.transfer_id = 3,
                     .window_end_offset = 64,
                     .pending_bytes = 64,
                     .offset = 0,
                     .type = Chunk::Type::kTransferStart});
    ctx_.SendClientStream(start_request);

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[0]);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[1]);

  // The data chunk carries the whole of kData, starting at offset 0.
  EXPECT_EQ(data_chunk.transfer_id, 3u);
  EXPECT_EQ(data_chunk.offset, 0u);
  ASSERT_EQ(data_chunk.data.size(), kData.size());
  EXPECT_EQ(
      std::memcmp(data_chunk.data.data(), kData.data(), data_chunk.data.size()),
      0);

  // The end chunk has no payload and reports remaining_bytes = 0.
  EXPECT_EQ(end_chunk.transfer_id, 3u);
  EXPECT_EQ(end_chunk.data.size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes.has_value());
  EXPECT_EQ(end_chunk.remaining_bytes.value(), 0u);

  // The client's OK status chunk finalizes the handler with OK.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
177
// Same flow as SingleChunk, but the start chunk specifies only pending_bytes
// (no window_end_offset).
TEST_F(ReadTransfer, PendingBytes_SingleChunk) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    const auto start_request =
        EncodeChunk({.transfer_id = 3,
                     .pending_bytes = 64,
                     .offset = 0,
                     .type = Chunk::Type::kTransferStart});
    ctx_.SendClientStream(start_request);

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[0]);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[1]);

  // The data chunk carries the whole of kData, starting at offset 0.
  EXPECT_EQ(data_chunk.transfer_id, 3u);
  EXPECT_EQ(data_chunk.offset, 0u);
  ASSERT_EQ(data_chunk.data.size(), kData.size());
  EXPECT_EQ(
      std::memcmp(data_chunk.data.data(), kData.data(), data_chunk.data.size()),
      0);

  // The end chunk has no payload and reports remaining_bytes = 0.
  EXPECT_EQ(end_chunk.transfer_id, 3u);
  EXPECT_EQ(end_chunk.data.size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes.has_value());
  EXPECT_EQ(end_chunk.remaining_bytes.value(), 0u);

  // The client's OK status chunk finalizes the handler with OK.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
213
// Reads kData in two 16-byte windows; a third window request yields only the
// end-of-data chunk.
TEST_F(ReadTransfer, MultiChunk) {
  // Window 1: bytes [0, 16).
  const auto start_request =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 16,
                   .pending_bytes = 16,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(start_request);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk first_chunk = DecodeChunk(ctx_.responses()[0]);

  EXPECT_EQ(first_chunk.transfer_id, 3u);
  EXPECT_EQ(first_chunk.offset, 0u);
  ASSERT_EQ(first_chunk.data.size(), 16u);
  EXPECT_EQ(std::memcmp(
                first_chunk.data.data(), kData.data(), first_chunk.data.size()),
            0);

  // Window 2: bytes [16, 32).
  const auto second_window_request =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 32,
                   .pending_bytes = 16,
                   .offset = 16,
                   .type = Chunk::Type::kParametersContinue});
  ctx_.SendClientStream(second_window_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk second_chunk = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(second_chunk.transfer_id, 3u);
  EXPECT_EQ(second_chunk.offset, 16u);
  ASSERT_EQ(second_chunk.data.size(), 16u);
  EXPECT_EQ(std::memcmp(second_chunk.data.data(),
                        kData.data() + 16,
                        second_chunk.data.size()),
            0);

  // Window 3: nothing is left to read, so only the end chunk comes back.
  const auto third_window_request =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 48,
                   .pending_bytes = 16,
                   .offset = 32,
                   .type = Chunk::Type::kParametersContinue});
  ctx_.SendClientStream(third_window_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[2]);

  EXPECT_EQ(end_chunk.transfer_id, 3u);
  EXPECT_EQ(end_chunk.data.size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes.has_value());
  EXPECT_EQ(end_chunk.remaining_bytes.value(), 0u);

  // The client's OK status chunk finalizes the handler with OK.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
272
// Receiving the same CONTINUE parameters several times must not cause the
// window's data to be sent again.
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
  const auto start_request =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 16,
                   .pending_bytes = 16,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(start_request);

  transfer_thread_.WaitUntilEventIsProcessed();

  // Extend the window by eight bytes, to [16, 24).
  const auto repeated_continue =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 24,
                   .pending_bytes = 8,
                   .offset = 16,
                   .type = Chunk::Type::kParametersContinue});
  ctx_.SendClientStream(repeated_continue);

  transfer_thread_.WaitUntilEventIsProcessed();

  // Resend the identical CONTINUE packet three more times. None of the resends
  // advances the window.
  for (int resend = 0; resend != 3; ++resend) {
    ctx_.SendClientStream(repeated_continue);
    transfer_thread_.WaitUntilEventIsProcessed();
  }

  // Two responses in total: the initial data chunk plus exactly one chunk for
  // the extended window, despite the resends.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk window_chunk = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(window_chunk.transfer_id, 3u);
  EXPECT_EQ(window_chunk.offset, 16u);
  ASSERT_EQ(window_chunk.data.size(), 8u);
  EXPECT_EQ(std::memcmp(window_chunk.data.data(),
                        kData.data() + 16,
                        window_chunk.data.size()),
            0);
}
306
// Same flow as MultiChunk, but every parameters chunk specifies only
// pending_bytes and offset (no window_end_offset and no chunk type).
TEST_F(ReadTransfer, PendingBytes_MultiChunk) {
  // Request bytes [0, 16).
  const auto first_request =
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0});
  ctx_.SendClientStream(first_request);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk first_chunk = DecodeChunk(ctx_.responses()[0]);

  EXPECT_EQ(first_chunk.transfer_id, 3u);
  EXPECT_EQ(first_chunk.offset, 0u);
  ASSERT_EQ(first_chunk.data.size(), 16u);
  EXPECT_EQ(std::memcmp(
                first_chunk.data.data(), kData.data(), first_chunk.data.size()),
            0);

  // Request bytes [16, 32).
  const auto second_request =
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16});
  ctx_.SendClientStream(second_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk second_chunk = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(second_chunk.transfer_id, 3u);
  EXPECT_EQ(second_chunk.offset, 16u);
  ASSERT_EQ(second_chunk.data.size(), 16u);
  EXPECT_EQ(std::memcmp(second_chunk.data.data(),
                        kData.data() + 16,
                        second_chunk.data.size()),
            0);

  // Request more data past the end of kData: only the end chunk comes back.
  const auto third_request =
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 32});
  ctx_.SendClientStream(third_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[2]);

  EXPECT_EQ(end_chunk.transfer_id, 3u);
  EXPECT_EQ(end_chunk.data.size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes.has_value());
  EXPECT_EQ(end_chunk.remaining_bytes.value(), 0u);

  // The client's OK status chunk finalizes the handler with OK.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
354
// When the reader supports seeking, parameters chunks that request an offset
// other than the next sequential one are served from the requested offset.
//
// All kData ranges below are expressed with std::array iterators. Mixing a raw
// pointer (&kData[n]) with kData.end() in a single std::equal call only
// compiles on standard libraries where std::array::iterator is a raw pointer.
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
  rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
    // Bytes [0, 16).
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));

    transfer_thread_.WaitUntilEventIsProcessed();

    Chunk chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(std::equal(kData.begin(),
                           kData.begin() + 16,
                           chunk.data.begin(),
                           chunk.data.end()));

    // Seek backwards: bytes [2, 10).
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2}));

    transfer_thread_.WaitUntilEventIsProcessed();

    chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(std::equal(kData.begin() + 2,
                           kData.begin() + 10,
                           chunk.data.begin(),
                           chunk.data.end()));

    // Seek forwards: everything from offset 17 onwards. WaitForPackets returns
    // once all four expected packets have been sent.
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 17}));
  });

  ASSERT_EQ(ctx_.total_responses(), 4u);
  // The third response holds the data from offset 17 to the end of kData.
  Chunk chunk = DecodeChunk(ctx_.responses()[2]);
  EXPECT_TRUE(std::equal(
      kData.begin() + 17, kData.end(), chunk.data.begin(), chunk.data.end()));
}
384
// When the reader's seek reports UNIMPLEMENTED, a parameters chunk requesting
// an out-of-order offset ends the transfer with an UNIMPLEMENTED status chunk.
TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
  handler_.set_seek_status(Status::Unimplemented());

  // Start at offset 0, then immediately request an earlier offset (2).
  const auto in_order_request =
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0});
  ctx_.SendClientStream(in_order_request);
  const auto out_of_order_request =
      EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2});
  ctx_.SendClientStream(out_of_order_request);

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk final_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(final_chunk.status, Status::Unimplemented());
}
399
// The client limits chunks to 8 bytes, so the 32 bytes of kData arrive as four
// 8-byte data chunks followed by the end-of-data chunk.
TEST_F(ReadTransfer, MaxChunkSize_Client) {
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    const auto start_request =
        EncodeChunk({.transfer_id = 3,
                     .pending_bytes = 64,
                     .max_chunk_size_bytes = 8,
                     .offset = 0,
                     .type = Chunk::Type::kTransferStart});
    ctx_.SendClientStream(start_request);
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);

  // Responses 0-3 each carry the next eight bytes of kData.
  for (unsigned int index = 0; index < 4; ++index) {
    const unsigned int expected_offset = 8u * index;
    Chunk data_chunk = DecodeChunk(ctx_.responses()[index]);

    EXPECT_EQ(data_chunk.transfer_id, 3u);
    EXPECT_EQ(data_chunk.offset, expected_offset);
    ASSERT_EQ(data_chunk.data.size(), 8u);
    EXPECT_EQ(std::memcmp(data_chunk.data.data(),
                          kData.data() + expected_offset,
                          data_chunk.data.size()),
              0);
  }

  // Response 4 is the empty end-of-data chunk.
  Chunk end_chunk = DecodeChunk(ctx_.responses()[4]);
  EXPECT_EQ(end_chunk.transfer_id, 3u);
  EXPECT_EQ(end_chunk.data.size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes.has_value());
  EXPECT_EQ(end_chunk.remaining_bytes.value(), 0u);

  // The client's OK status chunk finalizes the handler with OK.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
450
// After a transfer completes, a new transfer with the same ID whose
// PrepareRead() fails must not result in FinalizeRead() being called.
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
  // First transfer: start it and complete it straight away with an OK status.
  const auto first_start =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 64,
                   .pending_bytes = 64,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(first_start);
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_TRUE(handler_.finalize_read_called);
  ASSERT_EQ(OkStatus(), handler_.finalize_read_status);

  // Reset the recorded state and make the next PrepareRead() fail.
  handler_.prepare_read_called = false;
  handler_.finalize_read_called = false;
  handler_.prepare_read_return_status = Status::FailedPrecondition();

  // Second transfer: same ID and parameters.
  const auto second_start =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 64,
                   .pending_bytes = 64,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(second_start);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Prepare failed, so the handler should not have been stored in the context,
  // and finalize should not have been called.
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_FALSE(handler_.finalize_read_called);
}
482
483 class ReadTransferMaxChunkSize8 : public ReadTransfer {
484 protected:
ReadTransferMaxChunkSize8()485 ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
486 };
487
// The service-side limit of 8 bytes wins over the client's requested 16-byte
// chunks, so kData arrives as four 8-byte data chunks plus the end chunk.
TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
  // Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    const auto start_request =
        EncodeChunk({.transfer_id = 3,
                     .pending_bytes = 64,
                     .max_chunk_size_bytes = 16,
                     .offset = 0,
                     .type = Chunk::Type::kTransferStart});
    ctx_.SendClientStream(start_request);
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);

  // Responses 0-3 each carry the next eight bytes of kData.
  for (unsigned int index = 0; index < 4; ++index) {
    const unsigned int expected_offset = 8u * index;
    Chunk data_chunk = DecodeChunk(ctx_.responses()[index]);

    EXPECT_EQ(data_chunk.transfer_id, 3u);
    EXPECT_EQ(data_chunk.offset, expected_offset);
    ASSERT_EQ(data_chunk.data.size(), 8u);
    EXPECT_EQ(std::memcmp(data_chunk.data.data(),
                          kData.data() + expected_offset,
                          data_chunk.data.size()),
              0);
  }

  // Response 4 is the empty end-of-data chunk.
  Chunk end_chunk = DecodeChunk(ctx_.responses()[4]);
  EXPECT_EQ(end_chunk.transfer_id, 3u);
  EXPECT_EQ(end_chunk.data.size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes.has_value());
  EXPECT_EQ(end_chunk.remaining_bytes.value(), 0u);

  // The client's OK status chunk finalizes the handler with OK.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
539
// An error status sent by the client ends the transfer: the handler is
// finalized with that status and the service sends nothing in reply.
TEST_F(ReadTransfer, ClientError) {
  const auto start_request =
      EncodeChunk({.transfer_id = 3,
                   .pending_bytes = 16,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(start_request);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The client reports OUT_OF_RANGE.
  const auto client_error =
      EncodeChunk({.transfer_id = 3, .status = Status::OutOfRange()});
  ctx_.SendClientStream(client_error);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The response count is unchanged; the handler saw the client's status.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
}
561
// A parameters chunk without pending_bytes is rejected with INVALID_ARGUMENT,
// both towards the handler and in the chunk sent back to the client.
TEST_F(ReadTransfer, MalformedParametersChunk) {
  // pending_bytes is required in a parameters chunk.
  const auto malformed_request = EncodeChunk({.transfer_id = 3});
  ctx_.SendClientStream(malformed_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The handler was prepared and then finalized with the error.
  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::InvalidArgument());

  // The single response is a status chunk carrying the same error.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk error_chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(error_chunk.transfer_id, 3u);
  ASSERT_TRUE(error_chunk.status.has_value());
  EXPECT_EQ(error_chunk.status.value(), Status::InvalidArgument());
}
577
// Starting a transfer whose ID (11) has no registered handler is answered with
// a NOT_FOUND status chunk for that ID.
TEST_F(ReadTransfer, UnregisteredHandler) {
  const auto start_request =
      EncodeChunk({.transfer_id = 11,
                   .pending_bytes = 32,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk error_chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(error_chunk.transfer_id, 11u);
  ASSERT_TRUE(error_chunk.status.has_value());
  EXPECT_EQ(error_chunk.status.value(), Status::NotFound());
}
591
// Chunks that are not an initial packet never start a transfer: neither
// PrepareRead() nor FinalizeRead() is called for them.
TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
  // A chunk at a non-zero offset.
  const auto offset_only = EncodeChunk({.transfer_id = 3, .offset = 3});
  ctx_.SendClientStream(offset_only);

  // A chunk carrying data.
  const auto data_chunk = EncodeChunk(
      {.transfer_id = 3, .offset = 0, .data = std::span(kData).first(10)});
  ctx_.SendClientStream(data_chunk);

  // A status chunk.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only start transfer for initial packet.
  EXPECT_FALSE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
}
603
// A second start chunk for a transfer that is already running aborts the
// running transfer (the handler is finalized with ABORTED) and begins a new
// one, which can then run to completion.
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  const auto first_start =
      EncodeChunk({.transfer_id = 3,
                   .pending_bytes = 16,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(first_start);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  // Clear the flag so a second PrepareRead() call can be detected.
  handler_.prepare_read_called = false;

  // Send the same starting chunk again.
  const auto repeated_start =
      EncodeChunk({.transfer_id = 3,
                   .pending_bytes = 16,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(repeated_start);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The handler was prepared again, and the first transfer was aborted.
  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
  // Clear the flag so the final FinalizeRead() call can be detected.
  handler_.finalize_read_called = false;

  // Continue the restarted transfer and complete it with an OK status.
  const auto continue_request =
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16});
  ctx_.SendClientStream(continue_request);
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
640
// Starting a transfer with pending_bytes = 0 while the reader still has data
// ends the transfer with RESOURCE_EXHAUSTED.
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
  const auto start_request =
      EncodeChunk({.transfer_id = 3,
                   .pending_bytes = 0,
                   .type = Chunk::Type::kTransferStart});
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The handler is finalized with the error ...
  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());

  // ... and the same status is reported to the client.
  Chunk error_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(error_chunk.status, Status::ResourceExhausted());
}
654
// Starting a transfer with pending_bytes = 0 when the reader has nothing left
// to read completes normally: the service sends an end-of-data chunk and the
// handler is finalized with OK once the client acknowledges.
TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
  // Make the next read appear to be the end of the stream.
  handler_.set_read_status(Status::OutOfRange());

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .pending_bytes = 0,
                                     .type = Chunk::Type::kTransferStart}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Confirm that a response was sent before reading it: calling back() on an
  // empty response list would be invalid, so fail the test cleanly instead.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.transfer_id, 3u);
  EXPECT_EQ(chunk.remaining_bytes, 0u);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client's final status chunk must not produce another response.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
675
// A non-initial chunk that arrives after the transfer has completed is
// answered with FAILED_PRECONDITION and does not finalize the handler again.
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    const auto start_request =
        EncodeChunk({.transfer_id = 3,
                     .pending_bytes = 64,
                     .offset = 0,
                     .type = Chunk::Type::kTransferStart});
    ctx_.SendClientStream(start_request);
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[0]);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[1]);

  // The data chunk carries the whole of kData, starting at offset 0.
  EXPECT_EQ(data_chunk.transfer_id, 3u);
  EXPECT_EQ(data_chunk.offset, 0u);
  ASSERT_EQ(data_chunk.data.size(), kData.size());
  EXPECT_EQ(
      std::memcmp(data_chunk.data.data(), kData.data(), data_chunk.data.size()),
      0);

  // The end chunk has no payload and reports remaining_bytes = 0.
  EXPECT_EQ(end_chunk.transfer_id, 3u);
  EXPECT_EQ(end_chunk.data.size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes.has_value());
  EXPECT_EQ(end_chunk.remaining_bytes.value(), 0u);

  // Complete the transfer from the client side.
  const auto ok_status_chunk =
      EncodeChunk({.transfer_id = 3, .status = OkStatus()});
  ctx_.SendClientStream(ok_status_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());

  // At this point the transfer should be in a completed state. Send a
  // non-initial chunk as a continuation of the transfer.
  handler_.finalize_read_called = false;

  const auto late_request =
      EncodeChunk({.transfer_id = 3, .pending_bytes = 48, .offset = 16});
  ctx_.SendClientStream(late_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  Chunk error_chunk = DecodeChunk(ctx_.responses()[2]);
  ASSERT_TRUE(error_chunk.status.has_value());
  EXPECT_EQ(error_chunk.status.value(), Status::FailedPrecondition());

  // FinalizeRead should not be called again.
  EXPECT_FALSE(handler_.finalize_read_called);
}
726
727 class SimpleWriteTransfer final : public WriteOnlyHandler {
728 public:
SimpleWriteTransfer(uint32_t transfer_id,ByteSpan data)729 SimpleWriteTransfer(uint32_t transfer_id, ByteSpan data)
730 : WriteOnlyHandler(transfer_id),
731 prepare_write_called(false),
732 finalize_write_called(false),
733 finalize_write_status(Status::Unknown()),
734 writer_(data) {}
735
PrepareWrite()736 Status PrepareWrite() final {
737 EXPECT_EQ(OkStatus(), writer_.Seek(0));
738 set_writer(writer_);
739 prepare_write_called = true;
740 return OkStatus();
741 }
742
FinalizeWrite(Status status)743 Status FinalizeWrite(Status status) final {
744 finalize_write_called = true;
745 finalize_write_status = status;
746 return finalize_write_return_status_;
747 }
748
set_finalize_write_return(Status status)749 void set_finalize_write_return(Status status) {
750 finalize_write_return_status_ = status;
751 }
752
753 bool prepare_write_called;
754 bool finalize_write_called;
755 Status finalize_write_status;
756
757 private:
758 Status finalize_write_return_status_;
759 stream::MemoryWriter writer_;
760 };
761
762 class WriteTransfer : public ::testing::Test {
763 protected:
WriteTransfer(size_t max_bytes_to_receive=64)764 WriteTransfer(size_t max_bytes_to_receive = 64)
765 : buffer{},
766 handler_(7, buffer),
767 transfer_thread_(data_buffer_, encode_buffer_),
768 system_thread_(TransferThreadOptions(), transfer_thread_),
769 ctx_(transfer_thread_,
770 max_bytes_to_receive,
771 // Use a long timeout to avoid accidentally triggering timeouts.
772 std::chrono::minutes(1)) {
773 ctx_.service().RegisterHandler(handler_);
774
775 ASSERT_FALSE(handler_.prepare_write_called);
776 ASSERT_FALSE(handler_.finalize_write_called);
777
778 ctx_.call(); // Open the write stream
779 transfer_thread_.WaitUntilEventIsProcessed();
780 }
781
~WriteTransfer()782 ~WriteTransfer() {
783 transfer_thread_.Terminate();
784 system_thread_.join();
785 }
786
787 std::array<std::byte, kData.size()> buffer;
788 SimpleWriteTransfer handler_;
789
790 Thread<1, 1> transfer_thread_;
791 thread::Thread system_thread_;
792 std::array<std::byte, 64> data_buffer_;
793 std::array<std::byte, 64> encode_buffer_;
794 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
795 };
796
// Writes all of kData in a single data chunk and checks both the transfer
// parameters the service advertises and the final OK status.
TEST_F(WriteTransfer, SingleChunk) {
  const auto start_request = EncodeChunk({.transfer_id = 7});
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The first response carries the transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.transfer_id, 7u);
  ASSERT_TRUE(parameters.pending_bytes.has_value());
  EXPECT_EQ(parameters.pending_bytes.value(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes.has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes.value(), 37u);

  // Send every byte at once and mark it as the last chunk.
  const auto data_request = EncodeChunk({.transfer_id = 7,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0});
  ctx_.SendClientStream<64>(data_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The second response reports the transfer's final status.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(completion.transfer_id, 7u);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  // The handler was finalized with OK and the buffer now matches kData.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
828
// If the handler's FinalizeWrite() returns an error, the client is told the
// transfer ended with DATA_LOSS even though the handler was finalized with OK.
TEST_F(WriteTransfer, FinalizeFails) {
  // Return an error when FinalizeWrite is called.
  handler_.set_finalize_write_return(Status::FailedPrecondition());

  // Start the transfer and immediately send all of the data.
  const auto start_request = EncodeChunk({.transfer_id = 7});
  ctx_.SendClientStream(start_request);
  const auto data_request = EncodeChunk({.transfer_id = 7,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0});
  ctx_.SendClientStream<64>(data_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The final status chunk reports DATA_LOSS.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(completion.transfer_id, 7u);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), Status::DataLoss());

  // FinalizeWrite() itself was invoked with OK.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
849
// If the final status packet cannot be sent, the handler is still finalized
// with OK; only the earlier parameters packet is recorded as a response.
TEST_F(WriteTransfer, SendingFinalPacketFails) {
  const auto start_request = EncodeChunk({.transfer_id = 7});
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // From here on, sends through the test output report UNKNOWN.
  ctx_.output().set_send_status(Status::Unknown());

  const auto data_request = EncodeChunk({.transfer_id = 7,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0});
  ctx_.SendClientStream<64>(data_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Should only have sent the transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.transfer_id, 7u);
  ASSERT_TRUE(parameters.pending_bytes.has_value());
  EXPECT_EQ(parameters.pending_bytes.value(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes.has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes.value(), 37u);

  // When FinalizeWrite() was called, the transfer was considered successful.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
875
// Writes kData in two data chunks: the first eight bytes, then the rest marked
// as the final chunk.
TEST_F(WriteTransfer, MultiChunk) {
  const auto start_request = EncodeChunk({.transfer_id = 7});
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The first response carries the transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.transfer_id, 7u);
  ASSERT_TRUE(parameters.pending_bytes.has_value());
  EXPECT_EQ(parameters.pending_bytes.value(), 32u);

  // First data chunk: bytes [0, 8). No response is expected for it.
  const auto first_data = EncodeChunk(
      {.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)});
  ctx_.SendClientStream<64>(first_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Second data chunk: everything from offset 8, flagged as the last chunk.
  const auto final_data = EncodeChunk({.transfer_id = 7,
                                       .offset = 8,
                                       .data = std::span(kData).subspan(8),
                                       .remaining_bytes = 0});
  ctx_.SendClientStream<64>(final_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The second response reports the transfer's final status.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(completion.transfer_id, 7u);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  // The handler was finalized with OK and the buffer now matches kData.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
911
// Verifies that a failed RPC write during a retry terminates the transfer with
// an INTERNAL status.
TEST_F(WriteTransfer, WriteFailsOnRetry) {
  // Skip one packet to fail on a retry.
  ctx_.output().set_send_status(Status::FailedPrecondition(), 1);

  // Sends a single client packet and then forces the service to time out,
  // which triggers the retry.
  auto start_then_time_out = [this] {
    ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
    transfer_thread_.SimulateServerTimeout(7);
  };

  // Three packets are awaited: initial parameters, retry attempt, final error.
  rpc::test::WaitForPackets(ctx_.output(), 3, start_then_time_out);

  // Three sends were attempted, but the 2nd packet was dropped, so only two
  // responses were recorded. The last one is the INTERNAL error produced by
  // the RPC write failure.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk error_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(error_chunk.transfer_id, 7u);
  ASSERT_TRUE(error_chunk.status.has_value());
  EXPECT_EQ(error_chunk.status.value(), Status::Internal());
}
931
// Verifies that a timeout while recovering from a skipped chunk re-sends the
// same recovery parameters.
TEST_F(WriteTransfer, TimeoutInRecoveryState) {
  constexpr std::span data(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Initial parameters: offset 0 with 32 pending bytes.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial.transfer_id, 7u);
  EXPECT_EQ(initial.offset, 0u);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 32u);

  // Bytes [0, 8) arrive as expected...
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .offset = 0, .data = data.first(8)}));

  // ...but offset 8 is skipped, which puts the transfer into recovery.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = 7, .offset = 12, .data = data.subspan(12, 4)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Recovery parameters ask for the data starting at offset 8.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk recovery = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery.transfer_id, 7u);
  EXPECT_EQ(recovery.offset, 8u);
  ASSERT_TRUE(recovery.pending_bytes.has_value());
  EXPECT_EQ(recovery.pending_bytes.value(), 24u);

  // Time out while still in the recovery state.
  transfer_thread_.SimulateServerTimeout(7);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The very same recovery parameters go out again.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk resent = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent.transfer_id, 7u);
  EXPECT_EQ(resent.offset, 8u);
  ASSERT_TRUE(resent.pending_bytes.has_value());
  EXPECT_EQ(resent.pending_bytes.value(), 24u);
}
973
// Verifies that the receiver sends a "parameters continue" chunk once half of
// the 32-byte window has been received, and that the transfer then completes.
TEST_F(WriteTransfer, ExtendWindow) {
  const std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(initial.transfer_id, 7u);
  EXPECT_EQ(initial.window_end_offset, 32u);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 32u);

  // The window starts at 32 bytes and should extend when half of that has
  // been sent. The first three 4-byte chunks (offsets 0, 4 and 8) stay below
  // that threshold, so none of them produces a response.
  for (uint32_t offset = 0; offset < 12; offset += 4) {
    ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                           .offset = offset,
                                           .data = payload.subspan(offset, 4)}));
    transfer_thread_.WaitUntilEventIsProcessed();
    ASSERT_EQ(ctx_.total_responses(), 1u);
  }

  // The fourth 4-byte chunk brings the received total to 16 bytes.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = 7, .offset = 12, .data = payload.subspan(12, 4)}));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Extend parameters chunk.
  const Chunk extension = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(extension.transfer_id, 7u);
  EXPECT_EQ(extension.window_end_offset, 32u);
  EXPECT_EQ(extension.type, Chunk::Type::kParametersContinue);
  ASSERT_TRUE(extension.pending_bytes.has_value());
  EXPECT_EQ(extension.pending_bytes.value(), 16u);

  // Send the second half of the payload with no bytes remaining afterwards.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 16,
                                         .data = payload.subspan(16),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk completion = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(completion.transfer_id, 7u);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1035
1036 class WriteTransferMaxBytes16 : public WriteTransfer {
1037 protected:
WriteTransferMaxBytes16()1038 WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
1039 };
1040
// Verifies the receiver's reaction when the transmitter shrinks the window to
// less than what the receiver offered.
TEST_F(WriteTransfer, TransmitterReducesWindow) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The receiver initially offers a window that ends at offset 32.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial.transfer_id, 7u);
  EXPECT_EQ(initial.window_end_offset, 32u);

  // The transmitter sends only 12 bytes and declares 12 as the new end offset.
  const std::span payload(kData);
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .window_end_offset = 12,
                                         .offset = 0,
                                         .data = payload.first(12)}));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Since the end of the (reduced) window has been reached, the receiver
  // should respond right away with a retransmit chunk starting at offset 12.
  const Chunk retransmit = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(retransmit.transfer_id, 7u);
  EXPECT_EQ(retransmit.offset, 12u);
  EXPECT_EQ(retransmit.window_end_offset, 32u);
  EXPECT_EQ(retransmit.type, Chunk::Type::kParametersRetransmit);
}
1069
// Verifies that the transfer is terminated with an INTERNAL status when the
// transmitter claims a window end offset beyond the one the receiver offered.
TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The receiver offers a window that ends at offset 32.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.transfer_id, 7u);
  EXPECT_EQ(chunk.window_end_offset, 32u);

  // Send 16 bytes while claiming a window end offset of 48, which exceeds the
  // 32 offered by the receiver.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7,
                   // Larger window end offset than the receiver's.
                   .window_end_offset = 48,
                   .offset = 0,
                   .data = std::span(kData).first(16)}));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The receiver responds with a status chunk carrying INTERNAL.
  chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.transfer_id, 7u);
  ASSERT_TRUE(chunk.status.has_value());
  EXPECT_EQ(chunk.status.value(), Status::Internal());
}
1097
// Verifies that, with the 16-byte fixture, the receiver issues a fresh
// parameters chunk after every 8-byte data chunk until the write completes.
TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
  const std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // Initial parameters: 16 pending bytes.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(initial.transfer_id, 7u);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 16u);

  // Bytes [0, 8): the window moves to [8, 24).
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .offset = 0, .data = payload.first(8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk second = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(second.transfer_id, 7u);
  EXPECT_EQ(second.offset, 8u);
  EXPECT_EQ(second.window_end_offset, 24u);
  ASSERT_TRUE(second.pending_bytes.has_value());
  EXPECT_EQ(second.pending_bytes.value(), 16u);

  // Bytes [8, 16): the window moves to [16, 32).
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = 7, .offset = 8, .data = payload.subspan(8, 8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk third = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(third.transfer_id, 7u);
  EXPECT_EQ(third.offset, 16u);
  EXPECT_EQ(third.window_end_offset, 32u);
  ASSERT_TRUE(third.pending_bytes.has_value());
  EXPECT_EQ(third.pending_bytes.value(), 16u);

  // Bytes [16, 24): the window end stays at 32, leaving 8 pending bytes.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = 7, .offset = 16, .data = payload.subspan(16, 8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);
  const Chunk fourth = DecodeChunk(ctx_.responses()[3]);
  EXPECT_EQ(fourth.transfer_id, 7u);
  EXPECT_EQ(fourth.offset, 24u);
  EXPECT_EQ(fourth.window_end_offset, 32u);
  ASSERT_TRUE(fourth.pending_bytes.has_value());
  EXPECT_EQ(fourth.pending_bytes.value(), 8u);

  // The rest of the payload, with no bytes remaining afterwards.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 24,
                                         .data = payload.subspan(24),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 5u);
  const Chunk completion = DecodeChunk(ctx_.responses()[4]);
  EXPECT_EQ(completion.transfer_id, 7u);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1165
// Verifies that the first parameters chunk advertises the fixture's 16-byte
// limit as its pending bytes.
TEST_F(WriteTransferMaxBytes16, SetsDefaultPendingBytes) {
  // Default max bytes is smaller than buffer.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.transfer_id, 7u);
  // Guard the optional before reading it, so that a missing field shows up as
  // a regular test failure instead of an invalid value() access.
  ASSERT_TRUE(chunk.pending_bytes.has_value());
  EXPECT_EQ(chunk.pending_bytes.value(), 16u);
}
1176
// Verifies that the advertised pending bytes are limited to the size of the
// handler's buffer when that buffer is smaller than the default max bytes.
TEST_F(WriteTransfer, SetsWriterPendingBytes) {
  // Buffer is smaller than constructor's default max bytes.
  std::array<std::byte, 8> small_buffer = {};

  // Named distinctly so that it does not shadow the fixture's handler_.
  SimpleWriteTransfer small_handler(987, small_buffer);
  ctx_.service().RegisterHandler(small_handler);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 987}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.transfer_id, 987u);
  // Guard the optional before reading it, so that a missing field shows up as
  // a regular test failure instead of an invalid value() access.
  ASSERT_TRUE(chunk.pending_bytes.has_value());
  EXPECT_EQ(chunk.pending_bytes.value(), 8u);
}
1192
// Verifies that a chunk with an unexpected offset makes the receiver re-send
// parameters for the offset it needs, and that the transfer completes once
// the correct offset is supplied.
TEST_F(WriteTransfer, UnexpectedOffset) {
  const std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(initial.transfer_id, 7u);
  EXPECT_EQ(initial.offset, 0u);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 32u);

  // Bytes [0, 8) are accepted without a response.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .offset = 0, .data = payload.first(8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The next chunk claims offset 4, but the receiver is waiting for offset 8.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 4,  // incorrect
                                         .data = payload.subspan(16),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Parameters are re-sent for offset 8 with the 24 outstanding bytes.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk recovery = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(recovery.transfer_id, 7u);
  EXPECT_EQ(recovery.offset, 8u);
  ASSERT_TRUE(recovery.pending_bytes.has_value());
  EXPECT_EQ(recovery.pending_bytes.value(), 24u);

  // Supplying the data at the requested offset completes the transfer.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 8,  // correct
                                         .data = payload.subspan(8),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk completion = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(completion.transfer_id, 7u);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1242
// Verifies that sending more data than the advertised pending bytes
// terminates the transfer with an INTERNAL status.
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The receiver asks for 16 bytes.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.transfer_id, 7u);
  ASSERT_TRUE(parameters.pending_bytes.has_value());
  EXPECT_EQ(parameters.pending_bytes.value(), 16u);

  // Deliver 24 bytes even though only 16 were requested.
  const std::span payload(kData);
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .offset = 0, .data = payload.first(24)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk error_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(error_chunk.transfer_id, 7u);
  ASSERT_TRUE(error_chunk.status.has_value());
  EXPECT_EQ(error_chunk.status.value(), Status::Internal());
}
1267
// Verifies that a transfer ID which the test never sets up a handler for is
// answered with NOT_FOUND.
TEST_F(WriteTransfer, UnregisteredHandler) {
  constexpr uint32_t kUnknownTransferId = 999;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kUnknownTransferId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one response: a status chunk for the requested ID.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk error_chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(error_chunk.transfer_id, kUnknownTransferId);
  ASSERT_TRUE(error_chunk.status.has_value());
  EXPECT_EQ(error_chunk.status.value(), Status::NotFound());
}
1278
// Verifies that an error status sent by the client finalizes the write with
// that status and does not trigger any further response.
TEST_F(WriteTransfer, ClientError) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.transfer_id, 7u);
  ASSERT_TRUE(parameters.pending_bytes.has_value());
  EXPECT_EQ(parameters.pending_bytes.value(), 32u);

  // The client aborts the transfer with DATA_LOSS.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .status = Status::DataLoss()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Still only the one response that was sent before the abort.
  EXPECT_EQ(ctx_.total_responses(), 1u);

  // The handler is told about the client's status.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
}
1301
// Verifies that after a dropped chunk, a stream of out-of-order chunks causes
// only one parameters update to be sent.
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
  constexpr std::span data(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Byte 0 arrives normally.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .offset = 0, .data = data.first(1)}));

  // Offset 1 is dropped; every later byte is sent as its own chunk.
  for (uint32_t offset = 2; offset < kData.size(); ++offset) {
    ctx_.SendClientStream<64>(EncodeChunk(
        {.transfer_id = 7, .offset = offset, .data = data.subspan(offset, 1)}));
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  // A single additional response, requesting the data from offset 1.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk recovery = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery.transfer_id, 7u);
  EXPECT_EQ(recovery.offset, 1u);

  // Send the remaining data and the final status.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 1,
                                         .data = data.subspan(1, 31),
                                         .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1335
// Verifies that, while recovering from a skipped chunk, each repeated chunk
// from the client causes the transfer parameters to be sent again.
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
  constexpr std::span data(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Offset 0 is skipped; every other byte is sent as its own chunk.
  for (uint32_t offset = 1; offset < kData.size(); ++offset) {
    ctx_.SendClientStream<64>(EncodeChunk(
        {.transfer_id = 7, .offset = offset, .data = data.subspan(offset, 1)}));
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer parameters were re-sent exactly once.
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Repeat the chunk that holds the last byte.
  const auto repeated_chunk = EncodeChunk(
      {.transfer_id = 7, .offset = kData.size() - 1, .data = data.last(1)});
  ctx_.SendClientStream<64>(repeated_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The packet is a repeat, so the parameters go out again.
  ASSERT_EQ(ctx_.total_responses(), 3u);

  // A second repeat produces yet another parameters chunk.
  ctx_.SendClientStream<64>(repeated_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);

  const Chunk parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(parameters.transfer_id, 7u);
  EXPECT_EQ(parameters.offset, 0u);
  EXPECT_TRUE(parameters.pending_bytes.has_value());

  // Normal operation resumes once the correct offset is sent.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = 7, .offset = 0, .data = kData, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1380
// Verifies that the OK status chunk is sent again when the client repeats its
// final data chunk after the transfer has already completed.
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
  const std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Deliver the whole payload as a single final chunk.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 0,
                                         .data = payload,
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer completes with OK.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk first_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(first_status.status.has_value());
  EXPECT_EQ(first_status.status.value(), OkStatus());

  // The client sends the very same final chunk again.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 0,
                                         .data = payload,
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Another OK status chunk is sent in response.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk repeated_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(repeated_status.status.has_value());
  EXPECT_EQ(repeated_status.status.value(), OkStatus());
}
1409
// Verifies that chunks which do not look like an initial packet do not start
// a transfer: the handler is neither prepared nor finalized.
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
  // A chunk with a non-zero offset.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7, .offset = 3}));

  // A data chunk.
  ctx_.SendClientStream(EncodeChunk(
      {.transfer_id = 7, .offset = 0, .data = std::span(kData).first(10)}));

  // A status chunk.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7, .status = OkStatus()}));

  transfer_thread_.WaitUntilEventIsProcessed();

  // Transfers are only started for an initial packet, and none was sent.
  EXPECT_FALSE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);
}
1422
// Verifies that receiving a new initial packet for an active transfer aborts
// the current attempt (finalizing it with ABORTED) and starts the transfer
// over, after which it can complete normally.
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  const std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Partially complete the first attempt.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .offset = 0, .data = payload.first(8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ASSERT_TRUE(handler_.prepare_write_called);
  ASSERT_FALSE(handler_.finalize_write_called);

  // Clear the flag so that a second PrepareWrite() call can be detected.
  handler_.prepare_write_called = false;

  // Simulate the client disappearing and then restarting the transfer.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The first attempt was finalized as ABORTED and a new one was prepared.
  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());

  // Clear the flag so that a second FinalizeWrite() call can be detected.
  handler_.finalize_write_called = false;

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Complete the restarted transfer with a single final chunk.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
                                         .offset = 0,
                                         .data = payload,
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1463
1464 class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
1465 public:
SometimesUnavailableReadHandler(uint32_t transfer_id,ConstByteSpan data)1466 SometimesUnavailableReadHandler(uint32_t transfer_id, ConstByteSpan data)
1467 : ReadOnlyHandler(transfer_id), reader_(data), call_count_(0) {}
1468
PrepareRead()1469 Status PrepareRead() final {
1470 if ((call_count_++ % 2) == 0) {
1471 return Status::Unavailable();
1472 }
1473
1474 set_reader(reader_);
1475 return OkStatus();
1476 }
1477
1478 private:
1479 stream::MemoryReader reader_;
1480 int call_count_;
1481 };
1482
// Verifies that a failure returned from the handler's PrepareRead() is
// reported to the client as DATA_LOSS.
TEST_F(ReadTransfer, PrepareError) {
  // This handler's first PrepareRead() call returns UNAVAILABLE.
  SometimesUnavailableReadHandler flaky_handler(88, kData);
  ctx_.service().RegisterHandler(flaky_handler);

  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 88, .pending_bytes = 128, .offset = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The only response is the status chunk that terminates the transfer.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk response = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(response.transfer_id, 88u);
  ASSERT_TRUE(response.status.has_value());
  EXPECT_EQ(response.status.value(), Status::DataLoss());

  // Starting the transfer a second time should succeed.
  // TODO(frolv): This won't work until completion ACKs are supported.
  if (false) {
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = 88, .pending_bytes = 128, .offset = 0}));
    transfer_thread_.WaitUntilEventIsProcessed();

    // The second response should carry the complete data.
    ASSERT_EQ(ctx_.total_responses(), 2u);
    response = DecodeChunk(ctx_.responses()[1]);
    EXPECT_EQ(response.transfer_id, 88u);
    ASSERT_EQ(response.data.size(), kData.size());
    EXPECT_EQ(
        std::memcmp(response.data.data(), kData.data(), response.data.size()),
        0);
  }
}
1512
// Verifies that changing the service's max pending bytes during a transfer is
// reflected in the next parameters chunk.
TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The first parameters chunk uses the default of 16 pending bytes.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(initial.transfer_id, 7u);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 16u);

  // Lower the service-wide limit to 12 bytes.
  ctx_.service().set_max_pending_bytes(12);

  const std::span payload(kData);
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = 7, .offset = 0, .data = payload.first(8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The second parameters chunk should use the new limit: a window of 12
  // bytes starting at offset 8.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk updated = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(updated.transfer_id, 7u);
  EXPECT_EQ(updated.offset, 8u);
  EXPECT_EQ(updated.window_end_offset, 20u);
  ASSERT_TRUE(updated.pending_bytes.has_value());
  EXPECT_EQ(updated.pending_bytes.value(), 12u);
}
1543
1544 PW_MODIFY_DIAGNOSTICS_POP();
1545
1546 } // namespace
1547 } // namespace pw::transfer::test
1548