1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/client.h"
16
17 #include <cstring>
18
19 #include "gtest/gtest.h"
20 #include "pw_assert/check.h"
21 #include "pw_bytes/array.h"
22 #include "pw_rpc/raw/client_testing.h"
23 #include "pw_rpc/thread_testing.h"
24 #include "pw_thread/sleep.h"
25 #include "pw_thread/thread.h"
26 #include "pw_thread_stl/options.h"
27 #include "pw_transfer_private/chunk_testing.h"
28
29 namespace pw::transfer::test {
30 namespace {
31
// Short names for the transfer chunk type and the raw pw_rpc Transfer service
// whose Read/Write methods the tests below exercise.
using internal::Chunk;
using pw_rpc::raw::Transfer;

// Brings the standard duration literal suffixes (e.g. `ms`) into scope.
using namespace std::chrono_literals;

// The tests build Chunks with designated initializers that name only a few
// fields, so the missing-field-initializers warning is silenced from here on.
// NOTE(review): the matching PW_MODIFY_DIAGNOSTICS_POP() is presumably later
// in the file, outside this excerpt -- confirm.
PW_MODIFY_DIAGNOSTICS_PUSH();
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
39
TransferThreadOptions()40 thread::Options& TransferThreadOptions() {
41 static thread::stl::Options options;
42 return options;
43 }
44
45 class ReadTransfer : public ::testing::Test {
46 protected:
ReadTransfer(size_t max_bytes_to_receive=0)47 ReadTransfer(size_t max_bytes_to_receive = 0)
48 : transfer_thread_(chunk_buffer_, encode_buffer_),
49 client_(context_.client(),
50 context_.channel().id(),
51 transfer_thread_,
52 max_bytes_to_receive),
53 system_thread_(TransferThreadOptions(), transfer_thread_) {}
54
~ReadTransfer()55 ~ReadTransfer() {
56 transfer_thread_.Terminate();
57 system_thread_.join();
58 }
59
60 rpc::RawClientTestContext<> context_;
61
62 Thread<1, 1> transfer_thread_;
63 Client client_;
64
65 std::array<std::byte, 64> chunk_buffer_;
66 std::array<std::byte, 64> encode_buffer_;
67
68 thread::Thread system_thread_;
69 };
70
__anon689fba7d0202(size_t i) 71 constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
__anon689fba7d0302(size_t i) 72 constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
73
// Reads transfer 3 when the server delivers the whole 32-byte payload in one
// chunk marked remaining_bytes = 0. Expects an opening parameters chunk, then
// a final OK status chunk, an OK completion callback, and the payload in the
// writer.
TEST_F(ReadTransfer, SingleChunk) {
  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(3, writer, [&transfer_status](Status status) {
              transfer_status = status;
            }));

  transfer_thread_.WaitUntilEventIsProcessed();

  // First transfer parameters chunk is sent.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads[0]);
  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  EXPECT_EQ(c0.pending_bytes.value(), 64u);

  // Server sends all of the data in a single, final chunk.
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
      {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 2u);

  // The client acknowledges completion with an OK status chunk.
  Chunk c1 = DecodeChunk(payloads[1]);
  EXPECT_EQ(c1.transfer_id, 3u);
  ASSERT_TRUE(c1.status.has_value());
  EXPECT_EQ(c1.status.value(), OkStatus());

  EXPECT_EQ(transfer_status, OkStatus());

  // Check the length first: memcmp over bytes_written() bytes would pass
  // trivially if nothing had been written.
  ASSERT_EQ(writer.bytes_written(), kData32.size());
  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
            0);
}
111
// Reads transfer 4 when the server splits the 32-byte payload into two
// 16-byte chunks. The client must stay silent after the first chunk and send
// its OK status chunk only after the second (final) one.
TEST_F(ReadTransfer, MultiChunk) {
  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(4, writer, [&transfer_status](Status status) {
              transfer_status = status;
            }));

  transfer_thread_.WaitUntilEventIsProcessed();

  // First transfer parameters chunk is sent.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads[0]);
  EXPECT_EQ(c0.transfer_id, 4u);
  EXPECT_EQ(c0.offset, 0u);
  EXPECT_EQ(c0.pending_bytes.value(), 64u);

  // First half of the data; no response is expected yet.
  constexpr ConstByteSpan data(kData32);
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = 4u, .offset = 0, .data = data.first(16)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 1u);

  // Second half, marked as the end of the transfer.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = 4u,
                   .offset = 16,
                   .data = data.subspan(16),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 2u);

  Chunk c1 = DecodeChunk(payloads[1]);
  EXPECT_EQ(c1.transfer_id, 4u);
  ASSERT_TRUE(c1.status.has_value());
  EXPECT_EQ(c1.status.value(), OkStatus());

  EXPECT_EQ(transfer_status, OkStatus());

  // Check the length first: memcmp over bytes_written() bytes would pass
  // trivially if nothing had been written.
  ASSERT_EQ(writer.bytes_written(), data.size());
  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
            0);
}
159
// Runs two complete read transfers one after the other, reusing the same
// transfer ID and the same writer, and checks that each reports OK.
TEST_F(ReadTransfer, MultipleTransfers) {
  constexpr uint32_t kTransferId = 3;
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  // --- First transfer ---
  ASSERT_EQ(OkStatus(),
            client_.Read(kTransferId, sink, [&result](Status outcome) {
              result = outcome;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 0,
                   .data = kData32,
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(result, OkStatus());

  // Reset the observed status so the second completion can be told apart from
  // the first.
  result = Status::Unknown();

  // --- Second transfer, same ID ---
  ASSERT_EQ(OkStatus(),
            client_.Read(kTransferId, sink, [&result](Status outcome) {
              result = outcome;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 0,
                   .data = kData32,
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(result, OkStatus());
}
189
190 class ReadTransferMaxBytes32 : public ReadTransfer {
191 protected:
ReadTransferMaxBytes32()192 ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
193 };
194
// With a 64-byte writer, the opening parameters chunk should request the 32
// bytes configured through the fixture's constructor argument.
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
  constexpr uint32_t kTransferId = 5;
  stream::MemoryWriterBuffer<64> sink;

  EXPECT_EQ(OkStatus(), client_.Read(kTransferId, sink, [](Status) {}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one packet, the opening parameters chunk, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  ASSERT_EQ(opening.pending_bytes.value(), 32u);
}
210
// With a writer that holds only 16 bytes, the opening parameters chunk should
// request 16 bytes even though the client is configured for 32.
TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
  constexpr uint32_t kTransferId = 5;
  stream::MemoryWriterBuffer<16> tiny_sink;

  EXPECT_EQ(OkStatus(), client_.Read(kTransferId, tiny_sink, [](Status) {}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one packet, the opening parameters chunk, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  ASSERT_EQ(opening.pending_bytes.value(), 16u);
}
226
// Reads 64 bytes through a client limited to 32 bytes per window. After the
// first 32 bytes arrive the client must send a second parameters chunk for
// the next window, and finish with an OK status chunk after the last 32.
TEST_F(ReadTransferMaxBytes32, MultiParameters) {
  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(6, writer, [&transfer_status](Status status) {
              transfer_status = status;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // First transfer parameters chunk is sent.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads[0]);
  EXPECT_EQ(c0.transfer_id, 6u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.pending_bytes.value(), 32u);

  // Fill the first window completely.
  constexpr ConstByteSpan data(kData64);
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = 6u, .offset = 0, .data = data.first(32)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 2u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  // Second parameters chunk.
  Chunk c1 = DecodeChunk(payloads[1]);
  EXPECT_EQ(c1.transfer_id, 6u);
  EXPECT_EQ(c1.offset, 32u);
  ASSERT_EQ(c1.pending_bytes.value(), 32u);

  // Remaining 32 bytes, marked as the end of the transfer.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = 6u,
                   .offset = 32,
                   .data = data.subspan(32),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 3u);

  Chunk c2 = DecodeChunk(payloads[2]);
  EXPECT_EQ(c2.transfer_id, 6u);
  ASSERT_TRUE(c2.status.has_value());
  EXPECT_EQ(c2.status.value(), OkStatus());

  EXPECT_EQ(transfer_status, OkStatus());

  // Check the length first: memcmp over bytes_written() bytes would pass
  // trivially if nothing had been written.
  ASSERT_EQ(writer.bytes_written(), data.size());
  EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
}
279
// Sends a chunk whose offset does not match what the client has received so
// far. The client must answer with fresh parameters for the offset it
// expects, and complete normally once the correct chunk arrives.
TEST_F(ReadTransfer, UnexpectedOffset) {
  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(7, writer, [&transfer_status](Status status) {
              transfer_status = status;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // First transfer parameters chunk is sent.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads[0]);
  EXPECT_EQ(c0.transfer_id, 7u);
  EXPECT_EQ(c0.offset, 0u);
  EXPECT_EQ(c0.pending_bytes.value(), 64u);

  // A valid first chunk; no response is expected.
  constexpr ConstByteSpan data(kData32);
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = 7u, .offset = 0, .data = data.first(16)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  // Send a chunk with an incorrect offset. The client should resend parameters.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = 7u,
                   .offset = 8,  // wrong!
                   .data = data.subspan(16),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 2u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  // The resent parameters pick up after the 16 bytes already accepted.
  Chunk c1 = DecodeChunk(payloads[1]);
  EXPECT_EQ(c1.transfer_id, 7u);
  EXPECT_EQ(c1.offset, 16u);
  EXPECT_EQ(c1.pending_bytes.value(), 48u);

  // Send the correct chunk, completing the transfer.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = 7u,
                   .offset = 16,
                   .data = data.subspan(16),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(payloads.size(), 3u);

  Chunk c2 = DecodeChunk(payloads[2]);
  EXPECT_EQ(c2.transfer_id, 7u);
  ASSERT_TRUE(c2.status.has_value());
  EXPECT_EQ(c2.status.value(), OkStatus());

  EXPECT_EQ(transfer_status, OkStatus());

  // Check the length first: memcmp over bytes_written() bytes would pass
  // trivially if nothing had been written (or if the bad chunk's data had been
  // dropped along with the good one).
  ASSERT_EQ(writer.bytes_written(), data.size());
  EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
            0);
}
344
// The server sends more data than the client asked for. The client is
// expected to abort: it sends an INTERNAL status chunk and reports INTERNAL
// to the completion callback.
TEST_F(ReadTransferMaxBytes32, TooMuchData) {
  constexpr uint32_t kTransferId = 8;
  stream::MemoryWriterBuffer<32> sink;
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(kTransferId, sink, [&result](Status outcome) {
              result = outcome;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening parameters chunk asks for 32 bytes.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  ASSERT_EQ(opening.pending_bytes.value(), 32u);

  constexpr ConstByteSpan source(kData64);

  // 32 bytes are pending; deliver 16 of them.
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
      {.transfer_id = kTransferId, .offset = 0, .data = source.first(16)}));

  // 16 bytes are pending; deliver 8 of them.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 16,
                   .data = source.subspan(16, 8)}));

  // Only 8 bytes are pending, but 16 are delivered.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 24,
                   .data = source.subspan(24, 16)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Four packets are expected in total; the last one carries the error.
  ASSERT_EQ(sent.size(), 4u);

  const Chunk failure = DecodeChunk(sent[3]);
  EXPECT_EQ(failure.transfer_id, kTransferId);
  ASSERT_TRUE(failure.status.has_value());
  EXPECT_EQ(failure.status.value(), Status::Internal());

  EXPECT_EQ(result, Status::Internal());
}
390
// The server answers the opening parameters chunk with an error status. The
// client must end the transfer with that status and send nothing in reply.
TEST_F(ReadTransfer, ServerError) {
  constexpr uint32_t kTransferId = 9;
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(kTransferId, sink, [&result](Status outcome) {
              result = outcome;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening parameters chunk goes out as usual.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  ASSERT_EQ(opening.pending_bytes.value(), 64u);

  // Deliver NOT_FOUND from the server side.
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
      {.transfer_id = kTransferId, .status = Status::NotFound()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // No additional packet was sent, and the callback saw the server's status.
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::NotFound());
}
421
// Simulates a single dropped chunk followed by a burst of later chunks. The
// client should ask for retransmission once, from the dropped offset, rather
// than once per out-of-order chunk.
TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
  constexpr uint32_t kTransferId = 10;
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(kTransferId, sink, [&result](Status outcome) {
              result = outcome;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening parameters chunk requests the full 64 bytes.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  ASSERT_EQ(opening.pending_bytes.value(), 64u);

  constexpr ConstByteSpan source(kData64);

  // Bytes [0, 8) arrive normally.
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
      {.transfer_id = kTransferId, .offset = 0, .data = source.first(8)}));

  // The chunk at offset 8 is never sent; everything from 16 onward is.
  for (uint32_t position = 16; position < source.size(); position += 8) {
    context_.server().SendServerStream<Transfer::Read>(
        EncodeChunk({.transfer_id = kTransferId,
                     .offset = position,
                     .data = source.subspan(position, 8)}));
  }
  transfer_thread_.WaitUntilEventIsProcessed();

  // A single recovery parameters chunk, positioned at the missing offset.
  ASSERT_EQ(sent.size(), 2u);

  const Chunk recovery = DecodeChunk(sent[1]);
  EXPECT_EQ(recovery.transfer_id, kTransferId);
  EXPECT_EQ(recovery.offset, 8u);
  ASSERT_EQ(recovery.pending_bytes.value(), 56u);

  // Retransmit everything from offset 8 as one final chunk.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 8,
                   .data = source.subspan(8, 56),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 3u);

  const Chunk completion = DecodeChunk(sent[2]);
  EXPECT_EQ(completion.transfer_id, kTransferId);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_EQ(result, OkStatus());
}
484
// After a dropped chunk puts the client into recovery, every repeat of the
// block's last chunk should make the client send its recovery parameters
// again. The transfer then completes once the missing data is retransmitted.
TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
  constexpr uint32_t kTransferId = 11;
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(kTransferId, sink, [&result](Status outcome) {
              result = outcome;
            }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening parameters chunk requests the full 64 bytes.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  ASSERT_EQ(opening.pending_bytes.value(), 64u);

  constexpr ConstByteSpan source(kData64);

  // Bytes [0, 8) arrive normally.
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
      {.transfer_id = kTransferId, .offset = 0, .data = source.first(8)}));

  // The chunk at offset 8 is never sent; everything from 16 onward is.
  for (uint32_t position = 16; position < source.size(); position += 8) {
    context_.server().SendServerStream<Transfer::Read>(
        EncodeChunk({.transfer_id = kTransferId,
                     .offset = position,
                     .data = source.subspan(position, 8)}));
  }
  transfer_thread_.WaitUntilEventIsProcessed();

  // The burst produced exactly one recovery parameters chunk.
  ASSERT_EQ(sent.size(), 2u);

  // The last chunk of the block, which the server is about to repeat.
  const Chunk repeated_tail = {
      .transfer_id = kTransferId, .offset = 56, .data = source.subspan(56)};

  // First repeat.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(repeated_tail));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client repeats the same recovery parameters.
  ASSERT_EQ(sent.size(), 3u);
  const Chunk first_resend = DecodeChunk(sent[2]);
  EXPECT_EQ(first_resend.transfer_id, kTransferId);
  EXPECT_EQ(first_resend.offset, 8u);
  ASSERT_EQ(first_resend.pending_bytes.value(), 56u);

  // Second repeat, same expectation.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk(repeated_tail));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 4u);
  const Chunk second_resend = DecodeChunk(sent[3]);
  EXPECT_EQ(second_resend.transfer_id, kTransferId);
  EXPECT_EQ(second_resend.offset, 8u);
  ASSERT_EQ(second_resend.pending_bytes.value(), 56u);

  // Retransmit everything from offset 8 as one final chunk.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 8,
                   .data = source.subspan(8, 56),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 5u);

  const Chunk completion = DecodeChunk(sent[4]);
  EXPECT_EQ(completion.transfer_id, kTransferId);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_EQ(result, OkStatus());
}
566
567 constexpr chrono::SystemClock::duration kTestTimeout =
568 std::chrono::milliseconds(50);
569 constexpr uint8_t kTestRetries = 3;
570
// If a timeout occurs before any data has arrived, the client should repeat
// its opening parameters unchanged and still be able to finish the transfer.
TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
  constexpr uint32_t kTransferId = 12;
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(
                kTransferId,
                sink,
                [&result](Status outcome) { result = outcome; },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening parameters chunk requests the full 64 bytes.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent.back());
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  EXPECT_EQ(opening.pending_bytes.value(), 64u);

  // Trigger a timeout with nothing received; the same parameters go out again.
  transfer_thread_.SimulateClientTimeout(kTransferId);
  ASSERT_EQ(sent.size(), 2u);

  const Chunk resent = DecodeChunk(sent.back());
  EXPECT_EQ(resent.transfer_id, kTransferId);
  EXPECT_EQ(resent.offset, 0u);
  EXPECT_EQ(resent.pending_bytes.value(), 64u);

  // The transfer is still in progress.
  EXPECT_EQ(result, Status::Unknown());

  // Now deliver all of the data and let the transfer complete.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 0,
                   .data = kData32,
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 3u);

  const Chunk completion = DecodeChunk(sent.back());
  EXPECT_EQ(completion.transfer_id, kTransferId);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_EQ(result, OkStatus());
}
624
// If a timeout occurs after part of the data has arrived, the parameters the
// client resends must reflect the bytes already received.
TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
  constexpr uint32_t kTransferId = 13;
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(
                kTransferId,
                sink,
                [&result](Status outcome) { result = outcome; },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening parameters chunk requests the full 64 bytes.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent.back());
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  EXPECT_EQ(opening.pending_bytes.value(), 64u);

  constexpr ConstByteSpan source(kData32);

  // Deliver only the first 16 bytes; the client sends nothing in response.
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
      {.transfer_id = kTransferId, .offset = 0, .data = source.first(16)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 1u);

  // Trigger a timeout. The resent parameters start at offset 16 and ask for
  // the 48 bytes that remain of the original 64.
  transfer_thread_.SimulateClientTimeout(kTransferId);
  ASSERT_EQ(sent.size(), 2u);

  const Chunk resent = DecodeChunk(sent.back());
  EXPECT_EQ(resent.transfer_id, kTransferId);
  EXPECT_EQ(resent.offset, 16u);
  EXPECT_EQ(resent.pending_bytes.value(), 48u);

  // The transfer is still in progress.
  EXPECT_EQ(result, Status::Unknown());

  // Deliver the remainder as the final chunk.
  context_.server().SendServerStream<Transfer::Read>(
      EncodeChunk({.transfer_id = kTransferId,
                   .offset = 16,
                   .data = source.subspan(16),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 3u);

  const Chunk completion = DecodeChunk(sent.back());
  EXPECT_EQ(completion.transfer_id, kTransferId);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_EQ(result, OkStatus());
}
687
// Lets the transfer time out repeatedly without ever sending data. The client
// should resend its parameters kTestRetries times, then give up on the next
// timeout with a DEADLINE_EXCEEDED status chunk, and send nothing afterwards.
TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
  // Opening parameters + one resend per retry + the final status chunk.
  // Derived from kTestRetries so the expectation cannot drift from the loop
  // below if the retry count is ever changed.
  constexpr size_t kExpectedPackets = 1u + kTestRetries + 1u;

  stream::MemoryWriterBuffer<64> writer;
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Read(
                14,
                writer,
                [&transfer_status](Status status) { transfer_status = status; },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  // First transfer parameters chunk is sent.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads.back());
  EXPECT_EQ(c0.transfer_id, 14u);
  EXPECT_EQ(c0.offset, 0u);
  EXPECT_EQ(c0.pending_bytes.value(), 64u);

  for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
    // Wait for the timeout to expire without doing anything. The client should
    // resend its parameters chunk.
    transfer_thread_.SimulateClientTimeout(14);
    ASSERT_EQ(payloads.size(), retry + 1);

    Chunk c = DecodeChunk(payloads.back());
    EXPECT_EQ(c.transfer_id, 14u);
    EXPECT_EQ(c.offset, 0u);
    EXPECT_EQ(c.pending_bytes.value(), 64u);

    // Transfer has not yet completed.
    EXPECT_EQ(transfer_status, Status::Unknown());
  }

  // Time out one more time after the final retry. The client should cancel the
  // transfer at this point and send a DEADLINE_EXCEEDED chunk.
  transfer_thread_.SimulateClientTimeout(14);
  ASSERT_EQ(payloads.size(), kExpectedPackets);

  Chunk c4 = DecodeChunk(payloads.back());
  EXPECT_EQ(c4.transfer_id, 14u);
  ASSERT_TRUE(c4.status.has_value());
  EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());

  EXPECT_EQ(transfer_status, Status::DeadlineExceeded());

  // After finishing the transfer, nothing else should be sent. Verify this by
  // waiting for a bit.
  this_thread::sleep_for(kTestTimeout * 4);
  ASSERT_EQ(payloads.size(), kExpectedPackets);
}
743
// Uses up all but one retry, then delivers some data. Timeouts after that
// should again produce parameter resends instead of ending the transfer,
// showing that receiving data starts the retry budget over.
//
// NOTE(review): the transfer is never completed or cancelled here, so it is
// still active when the locals it references go out of scope -- confirm that
// the fixture teardown makes this safe.
TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
  constexpr uint32_t kTransferId = 14;
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  constexpr ConstByteSpan source(kData32);

  ASSERT_EQ(OkStatus(),
            client_.Read(
                kTransferId,
                sink,
                [&result](Status outcome) { result = outcome; },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The opening parameters chunk covers a window ending at offset 64.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Read>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent.back());
  EXPECT_EQ(opening.transfer_id, kTransferId);
  EXPECT_EQ(opening.offset, 0u);
  EXPECT_EQ(opening.window_end_offset, 64u);

  // Time out kTestRetries - 1 times; each one yields an identical resend.
  for (unsigned attempt = 1; attempt <= kTestRetries - 1; ++attempt) {
    transfer_thread_.SimulateClientTimeout(kTransferId);
    ASSERT_EQ(sent.size(), attempt + 1);

    const Chunk resent = DecodeChunk(sent.back());
    EXPECT_EQ(resent.transfer_id, kTransferId);
    EXPECT_EQ(resent.offset, 0u);
    EXPECT_EQ(resent.window_end_offset, 64u);

    // The transfer is still in progress.
    EXPECT_EQ(result, Status::Unknown());
  }

  // Deliver the first 16 bytes. No packet is sent in response.
  context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
      {.transfer_id = kTransferId, .offset = 0, .data = source.first(16)}));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(sent.size(), 3u);

  // Two further timeouts. Each must be answered with parameters (no status
  // field) that start at the new offset, not with a terminating status chunk.
  transfer_thread_.SimulateClientTimeout(kTransferId);
  ASSERT_EQ(sent.size(), 4u);

  const Chunk first_after_data = DecodeChunk(sent.back());
  EXPECT_FALSE(first_after_data.status.has_value());
  EXPECT_EQ(first_after_data.transfer_id, kTransferId);
  EXPECT_EQ(first_after_data.offset, 16u);
  EXPECT_EQ(first_after_data.window_end_offset, 64u);

  transfer_thread_.SimulateClientTimeout(kTransferId);
  ASSERT_EQ(sent.size(), 5u);

  const Chunk second_after_data = DecodeChunk(sent.back());
  EXPECT_FALSE(second_after_data.status.has_value());
  EXPECT_EQ(second_after_data.transfer_id, kTransferId);
  EXPECT_EQ(second_after_data.offset, 16u);
  EXPECT_EQ(second_after_data.window_end_offset, 64u);
}
810
// Makes the fake channel output reject sends, then starts a read. Read()
// itself still returns OK; the failure is reported through the completion
// callback, which must run exactly once.
//
// NOTE(review): the test name says DataLoss, but the status asserted below is
// INTERNAL -- confirm which one is intended.
TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
  stream::MemoryWriterBuffer<64> sink;
  Status result = Status::Unknown();

  // Every packet the client tries to send will fail with this status.
  context_.output().set_send_status(Status::Unauthenticated());

  ASSERT_EQ(OkStatus(),
            client_.Read(
                14,
                sink,
                [&result](Status outcome) {
                  // Still Unknown here means this is the first invocation.
                  ASSERT_EQ(result, Status::Unknown());
                  result = outcome;
                },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(result, Status::Internal());
}
831
832 class WriteTransfer : public ::testing::Test {
833 protected:
WriteTransfer()834 WriteTransfer()
835 : transfer_thread_(chunk_buffer_, encode_buffer_),
836 client_(context_.client(), context_.channel().id(), transfer_thread_),
837 system_thread_(TransferThreadOptions(), transfer_thread_) {}
838
~WriteTransfer()839 ~WriteTransfer() {
840 transfer_thread_.Terminate();
841 system_thread_.join();
842 }
843
844 rpc::RawClientTestContext<> context_;
845
846 Thread<1, 1> transfer_thread_;
847 Client client_;
848
849 std::array<std::byte, 64> chunk_buffer_;
850 std::array<std::byte, 64> encode_buffer_;
851
852 thread::Thread system_thread_;
853 };
854
// Writes 32 bytes when the server's parameters allow them to fit in a single
// chunk. Expects the opening chunk, one data chunk, a final chunk with
// remaining_bytes = 0, and completion once the server sends an OK status.
TEST_F(WriteTransfer, SingleChunk) {
  stream::MemoryReader source(kData32);
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(), client_.Write(3, source, [&result](Status outcome) {
    result = outcome;
  }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer opens with a single chunk identifying the transfer.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, 3u);

  // Deliver the server's transfer parameters and wait for the two packets the
  // client should send in response: the data and the final chunk.
  rpc::test::WaitForPackets(context_.output(), 2, [this] {
    context_.server().SendServerStream<Transfer::Write>(
        EncodeChunk({.transfer_id = 3,
                     .pending_bytes = 64,
                     .max_chunk_size_bytes = 32,
                     .offset = 0}));
  });

  ASSERT_EQ(sent.size(), 3u);

  // Data chunk: starts at offset 0 and matches the source bytes.
  const Chunk payload = DecodeChunk(sent[1]);
  EXPECT_EQ(payload.transfer_id, 3u);
  EXPECT_EQ(payload.offset, 0u);
  EXPECT_EQ(
      std::memcmp(payload.data.data(), kData32.data(), payload.data.size()),
      0);

  // Final chunk: announces that no bytes remain.
  const Chunk last = DecodeChunk(sent[2]);
  EXPECT_EQ(last.transfer_id, 3u);
  ASSERT_TRUE(last.remaining_bytes.has_value());
  EXPECT_EQ(last.remaining_bytes.value(), 0u);

  // The callback has not run yet; the server's status is still outstanding.
  EXPECT_EQ(result, Status::Unknown());

  // The server's OK status completes the transfer without further packets.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(sent.size(), 3u);
  EXPECT_EQ(result, OkStatus());
}
906
// Writes 32 bytes when the server limits chunks to 16 bytes. Expects the
// opening chunk, two data chunks, a final chunk with remaining_bytes = 0, and
// completion once the server sends an OK status.
TEST_F(WriteTransfer, MultiChunk) {
  stream::MemoryReader source(kData32);
  Status result = Status::Unknown();

  ASSERT_EQ(OkStatus(), client_.Write(4, source, [&result](Status outcome) {
    result = outcome;
  }));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer opens with a single chunk identifying the transfer.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(result, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, 4u);

  // Deliver parameters whose maximum chunk size is half the data, and wait for
  // the three packets the client should send: two data chunks and the final
  // chunk.
  rpc::test::WaitForPackets(context_.output(), 3, [this] {
    context_.server().SendServerStream<Transfer::Write>(
        EncodeChunk({.transfer_id = 4,
                     .pending_bytes = 64,
                     .max_chunk_size_bytes = 16,
                     .offset = 0}));
  });

  ASSERT_EQ(sent.size(), 4u);

  // First data chunk: starts at offset 0.
  const Chunk first_half = DecodeChunk(sent[1]);
  EXPECT_EQ(first_half.transfer_id, 4u);
  EXPECT_EQ(first_half.offset, 0u);
  EXPECT_EQ(std::memcmp(first_half.data.data(),
                        kData32.data(),
                        first_half.data.size()),
            0);

  // Second data chunk: starts at offset 16 and matches the source from there.
  const Chunk second_half = DecodeChunk(sent[2]);
  EXPECT_EQ(second_half.transfer_id, 4u);
  EXPECT_EQ(second_half.offset, 16u);
  EXPECT_EQ(std::memcmp(second_half.data.data(),
                        kData32.data() + second_half.offset,
                        second_half.data.size()),
            0);

  // Final chunk: announces that no bytes remain.
  const Chunk last = DecodeChunk(sent[3]);
  EXPECT_EQ(last.transfer_id, 4u);
  ASSERT_TRUE(last.remaining_bytes.has_value());
  EXPECT_EQ(last.remaining_bytes.value(), 0u);

  // The callback has not run yet; the server's status is still outstanding.
  EXPECT_EQ(result, Status::Unknown());

  // The server's OK status completes the transfer without further packets.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 4, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(sent.size(), 4u);
  EXPECT_EQ(result, OkStatus());
}
966
// When the server asks for data starting at a nonzero offset and the reader is
// a stream::MemoryReader, the client sends data from the requested offset.
TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const auto start_result = client_.Write(
      5, source, [&final_status](Status status) { final_status = status; });
  ASSERT_EQ(OkStatus(), start_result);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the opening chunk, which carries the transfer ID, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, 5u);

  // Request a seek by sending parameters with a nonzero offset. The client is
  // expected to answer with one data chunk and the final chunk.
  rpc::test::WaitForPackets(context_.output(), 2, [this] {
    context_.server().SendServerStream<Transfer::Write>(
        EncodeChunk({.transfer_id = 5,
                     .pending_bytes = 64,
                     .max_chunk_size_bytes = 32,
                     .offset = 16}));
  });

  ASSERT_EQ(sent.size(), 3u);

  // The data chunk must begin at the requested offset of the source data.
  const Chunk data_chunk = DecodeChunk(sent[1]);
  EXPECT_EQ(data_chunk.transfer_id, 5u);
  EXPECT_EQ(data_chunk.offset, 16u);
  EXPECT_EQ(std::memcmp(data_chunk.data.data(),
                        kData32.data() + data_chunk.offset,
                        data_chunk.data.size()),
            0);

  // Closing chunk: announces that no bytes remain.
  const Chunk closing = DecodeChunk(sent[2]);
  EXPECT_EQ(closing.transfer_id, 5u);
  ASSERT_TRUE(closing.remaining_bytes.has_value());
  EXPECT_EQ(closing.remaining_bytes.value(), 0u);

  EXPECT_EQ(final_status, Status::Unknown());

  // Deliver the server's final status chunk, which completes the transfer.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 5, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(sent.size(), 3u);
  EXPECT_EQ(final_status, OkStatus());
}
1020
1021 class FakeNonSeekableReader final : public stream::NonSeekableReader {
1022 public:
FakeNonSeekableReader(ConstByteSpan data)1023 FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
1024
1025 private:
DoRead(ByteSpan out)1026 StatusWithSize DoRead(ByteSpan out) final {
1027 if (position_ == data_.size()) {
1028 return StatusWithSize::OutOfRange();
1029 }
1030
1031 size_t to_copy = std::min(out.size(), data_.size() - position_);
1032 std::memcpy(out.data(), data_.data() + position_, to_copy);
1033 position_ += to_copy;
1034
1035 return StatusWithSize(to_copy);
1036 }
1037
1038 ConstByteSpan data_;
1039 size_t position_;
1040 };
1041
// When the server asks for a nonzero offset but the reader is a
// FakeNonSeekableReader, the client ends the transfer with UNIMPLEMENTED.
TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
  FakeNonSeekableReader source(kData32);
  Status final_status = Status::Unknown();

  const auto start_result = client_.Write(
      6, source, [&final_status](Status status) { final_status = status; });
  ASSERT_EQ(OkStatus(), start_result);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the opening chunk, which carries the transfer ID, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, 6u);

  // Request a seek by sending parameters with a nonzero offset.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 6,
                   .pending_bytes = 64,
                   .max_chunk_size_bytes = 32,
                   .offset = 16}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client is expected to reply with a status chunk and stop.
  ASSERT_EQ(sent.size(), 2u);

  const Chunk status_chunk = DecodeChunk(sent[1]);
  EXPECT_EQ(status_chunk.transfer_id, 6u);
  ASSERT_TRUE(status_chunk.status.has_value());
  EXPECT_EQ(status_chunk.status.value(), Status::Unimplemented());

  // The same status is reported through the completion callback.
  EXPECT_EQ(final_status, Status::Unimplemented());
}
1079
// An error status chunk from the server ends the transfer with that status,
// and the client sends nothing further.
TEST_F(WriteTransfer, ServerError) {
  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const auto start_result = client_.Write(
      7, source, [&final_status](Status status) { final_status = status; });
  ASSERT_EQ(OkStatus(), start_result);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the opening chunk, which carries the transfer ID, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, 7u);

  // The server reports a failure.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 7, .status = Status::NotFound()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // No reply is expected; the transfer simply terminates with the server's
  // status.
  EXPECT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::NotFound());
}
1108
// A parameters chunk that omits pending_bytes is rejected: the client replies
// with INVALID_ARGUMENT and ends the transfer.
TEST_F(WriteTransfer, MalformedParametersChunk) {
  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const auto start_result = client_.Write(
      8, source, [&final_status](Status status) { final_status = status; });
  ASSERT_EQ(OkStatus(), start_result);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the opening chunk, which carries the transfer ID, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, 8u);

  // Deliver an invalid parameters chunk: pending_bytes is missing.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 8, .max_chunk_size_bytes = 32}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client is expected to reply with a status chunk and stop.
  ASSERT_EQ(sent.size(), 2u);

  const Chunk status_chunk = DecodeChunk(sent[1]);
  EXPECT_EQ(status_chunk.transfer_id, 8u);
  ASSERT_TRUE(status_chunk.status.has_value());
  EXPECT_EQ(status_chunk.status.value(), Status::InvalidArgument());

  // The same status is reported through the completion callback.
  EXPECT_EQ(final_status, Status::InvalidArgument());
}
1143
// A parameters chunk requesting zero bytes is rejected: the client replies
// with RESOURCE_EXHAUSTED and ends the transfer.
TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const auto start_result = client_.Write(
      9, source, [&final_status](Status status) { final_status = status; });
  ASSERT_EQ(OkStatus(), start_result);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the opening chunk, which carries the transfer ID, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  const Chunk opening = DecodeChunk(sent[0]);
  EXPECT_EQ(opening.transfer_id, 9u);

  // Deliver an invalid parameters chunk: pending_bytes is 0.
  context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
      {.transfer_id = 9, .pending_bytes = 0, .max_chunk_size_bytes = 32}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client is expected to reply with a status chunk and stop.
  ASSERT_EQ(sent.size(), 2u);

  const Chunk status_chunk = DecodeChunk(sent[1]);
  EXPECT_EQ(status_chunk.transfer_id, 9u);
  ASSERT_TRUE(status_chunk.status.has_value());
  EXPECT_EQ(status_chunk.status.value(), Status::ResourceExhausted());

  // The same status is reported through the completion callback.
  EXPECT_EQ(final_status, Status::ResourceExhausted());
}
1178
// If the server never answers the opening chunk, a client timeout makes the
// client send the opening chunk again without completing the transfer.
TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const auto start_result = client_.Write(
      10,
      source,
      [&final_status](Status status) { final_status = status; },
      kTestTimeout);
  ASSERT_EQ(OkStatus(), start_result);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the opening chunk, which carries the transfer ID, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  const Chunk opening = DecodeChunk(sent.back());
  EXPECT_EQ(opening.transfer_id, 10u);

  // Let the timeout fire with no server activity. One more chunk for the same
  // transfer is expected.
  transfer_thread_.SimulateClientTimeout(10);
  ASSERT_EQ(sent.size(), 2u);

  const Chunk resent = DecodeChunk(sent.back());
  EXPECT_EQ(resent.transfer_id, 10u);

  // The transfer is still in progress.
  EXPECT_EQ(final_status, Status::Unknown());
}
1211
// After data chunks have been sent, a client timeout makes the client resend
// the most recently sent data chunk without completing the transfer.
TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
  stream::MemoryReader reader(kData32);
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Write(
                11,
                reader,
                [&transfer_status](Status status) { transfer_status = status; },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client begins by just sending the transfer ID.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads.back());
  EXPECT_EQ(c0.transfer_id, 11u);

  // Send the first parameters chunk. It requests 16 bytes in chunks of at most
  // 8 bytes, so two data chunks are expected.
  rpc::test::WaitForPackets(context_.output(), 2, [this] {
    context_.server().SendServerStream<Transfer::Write>(
        EncodeChunk({.transfer_id = 11,
                     .pending_bytes = 16,
                     .max_chunk_size_bytes = 8,
                     .offset = 0}));
  });
  ASSERT_EQ(payloads.size(), 3u);

  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c1 = DecodeChunk(payloads[1]);
  EXPECT_EQ(c1.transfer_id, 11u);
  EXPECT_EQ(c1.offset, 0u);
  EXPECT_EQ(c1.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);

  Chunk c2 = DecodeChunk(payloads[2]);
  EXPECT_EQ(c2.transfer_id, 11u);
  EXPECT_EQ(c2.offset, 8u);
  EXPECT_EQ(c2.data.size(), 8u);
  // Compare over c2's own length so the check can never read past c2's
  // payload, even if the two chunks ever differ in size.
  EXPECT_EQ(
      std::memcmp(c2.data.data(), kData32.data() + c2.offset, c2.data.size()),
      0);

  // Wait for the timeout to expire without doing anything. The client should
  // resend the most recently sent chunk.
  transfer_thread_.SimulateClientTimeout(11);
  ASSERT_EQ(payloads.size(), 4u);

  // The resent chunk must match c2 in ID, offset, length and contents.
  Chunk c3 = DecodeChunk(payloads[3]);
  EXPECT_EQ(c3.transfer_id, c2.transfer_id);
  EXPECT_EQ(c3.offset, c2.offset);
  EXPECT_EQ(c3.data.size(), c2.data.size());
  EXPECT_EQ(std::memcmp(c3.data.data(), c2.data.data(), c3.data.size()), 0);

  // Transfer has not yet completed.
  EXPECT_EQ(transfer_status, Status::Unknown());
}
1273
// When all data fits in one chunk, a client timeout resends that data chunk,
// and the next parameters chunk triggers the remaining_bytes = 0 chunk again.
TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
  stream::MemoryReader source(kData32);
  Status final_status = Status::Unknown();

  const auto start_result = client_.Write(
      12,
      source,
      [&final_status](Status status) { final_status = status; },
      kTestTimeout);
  ASSERT_EQ(OkStatus(), start_result);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the opening chunk, which carries the transfer ID, has gone out.
  rpc::PayloadsView sent =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(sent.size(), 1u);
  EXPECT_EQ(final_status, Status::Unknown());

  const Chunk opening = DecodeChunk(sent.back());
  EXPECT_EQ(opening.transfer_id, 12u);

  // Request all of the data with the first parameters chunk. One data chunk
  // and one remaining_bytes = 0 chunk are expected in response.
  rpc::test::WaitForPackets(context_.output(), 2, [this] {
    context_.server().SendServerStream<Transfer::Write>(
        EncodeChunk({.transfer_id = 12,
                     .pending_bytes = 64,
                     .max_chunk_size_bytes = 64,
                     .offset = 0}));
  });
  ASSERT_EQ(sent.size(), 3u);

  EXPECT_EQ(final_status, Status::Unknown());

  // The single data chunk carries all 32 bytes of the source.
  const Chunk data_chunk = DecodeChunk(sent[1]);
  EXPECT_EQ(data_chunk.transfer_id, 12u);
  EXPECT_EQ(data_chunk.offset, 0u);
  EXPECT_EQ(data_chunk.data.size(), 32u);
  EXPECT_EQ(std::memcmp(data_chunk.data.data(),
                        kData32.data(),
                        data_chunk.data.size()),
            0);

  const Chunk closing = DecodeChunk(sent[2]);
  EXPECT_EQ(closing.transfer_id, 12u);
  ASSERT_TRUE(closing.remaining_bytes.has_value());
  EXPECT_EQ(closing.remaining_bytes.value(), 0u);

  // Let the timeout fire with no server activity. The data chunk is expected
  // to be sent again.
  transfer_thread_.SimulateClientTimeout(12);
  ASSERT_EQ(sent.size(), 4u);

  const Chunk resent_data = DecodeChunk(sent[3]);
  EXPECT_EQ(resent_data.transfer_id, data_chunk.transfer_id);
  EXPECT_EQ(resent_data.offset, data_chunk.offset);
  EXPECT_EQ(resent_data.data.size(), data_chunk.data.size());
  EXPECT_EQ(std::memcmp(resent_data.data.data(),
                        data_chunk.data.data(),
                        resent_data.data.size()),
            0);

  // New parameters at the end of the data should make the client resend the
  // remaining_bytes = 0 chunk.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 12,
                   .pending_bytes = 64,
                   .max_chunk_size_bytes = 64,
                   .offset = 32}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(sent.size(), 5u);

  const Chunk resent_closing = DecodeChunk(sent[4]);
  EXPECT_EQ(resent_closing.transfer_id, 12u);
  ASSERT_TRUE(resent_closing.remaining_bytes.has_value());
  EXPECT_EQ(resent_closing.remaining_bytes.value(), 0u);

  // Deliver the server's final status chunk, which completes the transfer.
  context_.server().SendServerStream<Transfer::Write>(
      EncodeChunk({.transfer_id = 12, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_EQ(final_status, OkStatus());
}
1351
// The client resends the opening chunk on each of kTestRetries timeouts; one
// more timeout ends the transfer with DEADLINE_EXCEEDED.
TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
  stream::MemoryReader reader(kData32);
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Write(
                13,
                reader,
                [&transfer_status](Status status) { transfer_status = status; },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client begins by just sending the transfer ID.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads.back());
  EXPECT_EQ(c0.transfer_id, 13u);

  for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
    // Wait for the timeout to expire without doing anything. The client should
    // resend the initial transmit chunk.
    transfer_thread_.SimulateClientTimeout(13);
    ASSERT_EQ(payloads.size(), retry + 1);

    Chunk c = DecodeChunk(payloads.back());
    EXPECT_EQ(c.transfer_id, 13u);

    // Transfer has not yet completed.
    EXPECT_EQ(transfer_status, Status::Unknown());
  }

  // Total payloads once the transfer has ended: the initial chunk, one resend
  // per retry, and the terminating status chunk. Derived from kTestRetries so
  // the expectation stays in step with the loop above.
  const size_t expected_total_payloads = kTestRetries + 2u;

  // Sleep one more time after the final retry. The client should cancel the
  // transfer at this point and send a DEADLINE_EXCEEDED chunk.
  transfer_thread_.SimulateClientTimeout(13);
  ASSERT_EQ(payloads.size(), expected_total_payloads);

  Chunk final_chunk = DecodeChunk(payloads.back());
  EXPECT_EQ(final_chunk.transfer_id, 13u);
  ASSERT_TRUE(final_chunk.status.has_value());
  EXPECT_EQ(final_chunk.status.value(), Status::DeadlineExceeded());

  EXPECT_EQ(transfer_status, Status::DeadlineExceeded());

  // After finishing the transfer, nothing else should be sent. Verify this by
  // waiting for a bit.
  this_thread::sleep_for(kTestTimeout * 4);
  ASSERT_EQ(payloads.size(), expected_total_payloads);
}
1403
// With a FakeNonSeekableReader, a client timeout after data has been sent ends
// the transfer with DEADLINE_EXCEEDED instead of resending a chunk.
TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
  FakeNonSeekableReader reader(kData32);
  Status transfer_status = Status::Unknown();

  ASSERT_EQ(OkStatus(),
            client_.Write(
                14,
                reader,
                [&transfer_status](Status status) { transfer_status = status; },
                kTestTimeout));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client begins by just sending the transfer ID.
  rpc::PayloadsView payloads =
      context_.output().payloads<Transfer::Write>(context_.channel().id());
  ASSERT_EQ(payloads.size(), 1u);
  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c0 = DecodeChunk(payloads.back());
  EXPECT_EQ(c0.transfer_id, 14u);

  // Send the first parameters chunk. It requests 16 bytes in chunks of at most
  // 8 bytes, so two data chunks are expected.
  rpc::test::WaitForPackets(context_.output(), 2, [this] {
    context_.server().SendServerStream<Transfer::Write>(
        EncodeChunk({.transfer_id = 14,
                     .pending_bytes = 16,
                     .max_chunk_size_bytes = 8,
                     .offset = 0}));
  });
  ASSERT_EQ(payloads.size(), 3u);

  EXPECT_EQ(transfer_status, Status::Unknown());

  Chunk c1 = DecodeChunk(payloads[1]);
  EXPECT_EQ(c1.transfer_id, 14u);
  EXPECT_EQ(c1.offset, 0u);
  EXPECT_EQ(c1.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);

  Chunk c2 = DecodeChunk(payloads[2]);
  EXPECT_EQ(c2.transfer_id, 14u);
  EXPECT_EQ(c2.offset, 8u);
  EXPECT_EQ(c2.data.size(), 8u);
  // Compare over c2's own length so the check can never read past c2's
  // payload, even if the two chunks ever differ in size.
  EXPECT_EQ(
      std::memcmp(c2.data.data(), kData32.data() + c2.offset, c2.data.size()),
      0);

  // Wait for the timeout to expire without doing anything. The client should
  // fail to seek back and end the transfer.
  transfer_thread_.SimulateClientTimeout(14);
  ASSERT_EQ(payloads.size(), 4u);

  // The extra payload is a status chunk carrying DEADLINE_EXCEEDED.
  Chunk c3 = DecodeChunk(payloads[3]);
  EXPECT_EQ(c3.transfer_id, 14u);
  ASSERT_TRUE(c3.status.has_value());
  EXPECT_EQ(c3.status.value(), Status::DeadlineExceeded());

  EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
}
1463
1464 PW_MODIFY_DIAGNOSTICS_POP();
1465
1466 } // namespace
1467 } // namespace pw::transfer::test
1468