• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_transfer/client.h"
16 
17 #include <cstring>
18 
19 #include "gtest/gtest.h"
20 #include "pw_assert/check.h"
21 #include "pw_bytes/array.h"
22 #include "pw_rpc/raw/client_testing.h"
23 #include "pw_rpc/thread_testing.h"
24 #include "pw_thread/sleep.h"
25 #include "pw_thread/thread.h"
26 #include "pw_thread_stl/options.h"
27 #include "pw_transfer_private/chunk_testing.h"
28 
29 namespace pw::transfer::test {
30 namespace {
31 
32 using internal::Chunk;
33 using pw_rpc::raw::Transfer;
34 
35 using namespace std::chrono_literals;
36 
37 PW_MODIFY_DIAGNOSTICS_PUSH();
38 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
39 
TransferThreadOptions()40 thread::Options& TransferThreadOptions() {
41   static thread::stl::Options options;
42   return options;
43 }
44 
45 class ReadTransfer : public ::testing::Test {
46  protected:
ReadTransfer(size_t max_bytes_to_receive=0)47   ReadTransfer(size_t max_bytes_to_receive = 0)
48       : transfer_thread_(chunk_buffer_, encode_buffer_),
49         client_(context_.client(),
50                 context_.channel().id(),
51                 transfer_thread_,
52                 max_bytes_to_receive),
53         system_thread_(TransferThreadOptions(), transfer_thread_) {}
54 
~ReadTransfer()55   ~ReadTransfer() {
56     transfer_thread_.Terminate();
57     system_thread_.join();
58   }
59 
60   rpc::RawClientTestContext<> context_;
61 
62   Thread<1, 1> transfer_thread_;
63   Client client_;
64 
65   std::array<std::byte, 64> chunk_buffer_;
66   std::array<std::byte, 64> encode_buffer_;
67 
68   thread::Thread system_thread_;
69 };
70 
__anon689fba7d0202(size_t i) 71 constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
__anon689fba7d0302(size_t i) 72 constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
73 
TEST_F(ReadTransfer,SingleChunk)74 TEST_F(ReadTransfer, SingleChunk) {
75   stream::MemoryWriterBuffer<64> writer;
76   Status transfer_status = Status::Unknown();
77 
78   ASSERT_EQ(OkStatus(),
79             client_.Read(3, writer, [&transfer_status](Status status) {
80               transfer_status = status;
81             }));
82 
83   transfer_thread_.WaitUntilEventIsProcessed();
84 
85   // First transfer parameters chunk is sent.
86   rpc::PayloadsView payloads =
87       context_.output().payloads<Transfer::Read>(context_.channel().id());
88   ASSERT_EQ(payloads.size(), 1u);
89   EXPECT_EQ(transfer_status, Status::Unknown());
90 
91   Chunk c0 = DecodeChunk(payloads[0]);
92   EXPECT_EQ(c0.transfer_id, 3u);
93   EXPECT_EQ(c0.offset, 0u);
94   EXPECT_EQ(c0.pending_bytes.value(), 64u);
95 
96   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
97       {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
98   transfer_thread_.WaitUntilEventIsProcessed();
99 
100   ASSERT_EQ(payloads.size(), 2u);
101 
102   Chunk c1 = DecodeChunk(payloads[1]);
103   EXPECT_EQ(c1.transfer_id, 3u);
104   ASSERT_TRUE(c1.status.has_value());
105   EXPECT_EQ(c1.status.value(), OkStatus());
106 
107   EXPECT_EQ(transfer_status, OkStatus());
108   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
109             0);
110 }
111 
TEST_F(ReadTransfer,MultiChunk)112 TEST_F(ReadTransfer, MultiChunk) {
113   stream::MemoryWriterBuffer<64> writer;
114   Status transfer_status = Status::Unknown();
115 
116   ASSERT_EQ(OkStatus(),
117             client_.Read(4, writer, [&transfer_status](Status status) {
118               transfer_status = status;
119             }));
120 
121   transfer_thread_.WaitUntilEventIsProcessed();
122 
123   // First transfer parameters chunk is sent.
124   rpc::PayloadsView payloads =
125       context_.output().payloads<Transfer::Read>(context_.channel().id());
126   ASSERT_EQ(payloads.size(), 1u);
127   EXPECT_EQ(transfer_status, Status::Unknown());
128 
129   Chunk c0 = DecodeChunk(payloads[0]);
130   EXPECT_EQ(c0.transfer_id, 4u);
131   EXPECT_EQ(c0.offset, 0u);
132   EXPECT_EQ(c0.pending_bytes.value(), 64u);
133 
134   constexpr ConstByteSpan data(kData32);
135   context_.server().SendServerStream<Transfer::Read>(
136       EncodeChunk({.transfer_id = 4u, .offset = 0, .data = data.first(16)}));
137   transfer_thread_.WaitUntilEventIsProcessed();
138 
139   ASSERT_EQ(payloads.size(), 1u);
140 
141   context_.server().SendServerStream<Transfer::Read>(
142       EncodeChunk({.transfer_id = 4u,
143                    .offset = 16,
144                    .data = data.subspan(16),
145                    .remaining_bytes = 0}));
146   transfer_thread_.WaitUntilEventIsProcessed();
147 
148   ASSERT_EQ(payloads.size(), 2u);
149 
150   Chunk c1 = DecodeChunk(payloads[1]);
151   EXPECT_EQ(c1.transfer_id, 4u);
152   ASSERT_TRUE(c1.status.has_value());
153   EXPECT_EQ(c1.status.value(), OkStatus());
154 
155   EXPECT_EQ(transfer_status, OkStatus());
156   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
157             0);
158 }
159 
TEST_F(ReadTransfer,MultipleTransfers)160 TEST_F(ReadTransfer, MultipleTransfers) {
161   stream::MemoryWriterBuffer<64> writer;
162   Status transfer_status = Status::Unknown();
163 
164   ASSERT_EQ(OkStatus(),
165             client_.Read(3, writer, [&transfer_status](Status status) {
166               transfer_status = status;
167             }));
168   transfer_thread_.WaitUntilEventIsProcessed();
169 
170   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
171       {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
172   transfer_thread_.WaitUntilEventIsProcessed();
173 
174   ASSERT_EQ(transfer_status, OkStatus());
175   transfer_status = Status::Unknown();
176 
177   ASSERT_EQ(OkStatus(),
178             client_.Read(3, writer, [&transfer_status](Status status) {
179               transfer_status = status;
180             }));
181   transfer_thread_.WaitUntilEventIsProcessed();
182 
183   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
184       {.transfer_id = 3u, .offset = 0, .data = kData32, .remaining_bytes = 0}));
185   transfer_thread_.WaitUntilEventIsProcessed();
186 
187   EXPECT_EQ(transfer_status, OkStatus());
188 }
189 
190 class ReadTransferMaxBytes32 : public ReadTransfer {
191  protected:
ReadTransferMaxBytes32()192   ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
193 };
194 
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromConstructorArg)195 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
196   stream::MemoryWriterBuffer<64> writer;
197   EXPECT_EQ(OkStatus(), client_.Read(5, writer, [](Status) {}));
198   transfer_thread_.WaitUntilEventIsProcessed();
199 
200   // First transfer parameters chunk is sent.
201   rpc::PayloadsView payloads =
202       context_.output().payloads<Transfer::Read>(context_.channel().id());
203   ASSERT_EQ(payloads.size(), 1u);
204 
205   Chunk c0 = DecodeChunk(payloads[0]);
206   EXPECT_EQ(c0.transfer_id, 5u);
207   EXPECT_EQ(c0.offset, 0u);
208   ASSERT_EQ(c0.pending_bytes.value(), 32u);
209 }
210 
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromWriterLimit)211 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
212   stream::MemoryWriterBuffer<16> small_writer;
213   EXPECT_EQ(OkStatus(), client_.Read(5, small_writer, [](Status) {}));
214   transfer_thread_.WaitUntilEventIsProcessed();
215 
216   // First transfer parameters chunk is sent.
217   rpc::PayloadsView payloads =
218       context_.output().payloads<Transfer::Read>(context_.channel().id());
219   ASSERT_EQ(payloads.size(), 1u);
220 
221   Chunk c0 = DecodeChunk(payloads[0]);
222   EXPECT_EQ(c0.transfer_id, 5u);
223   EXPECT_EQ(c0.offset, 0u);
224   ASSERT_EQ(c0.pending_bytes.value(), 16u);
225 }
226 
TEST_F(ReadTransferMaxBytes32,MultiParameters)227 TEST_F(ReadTransferMaxBytes32, MultiParameters) {
228   stream::MemoryWriterBuffer<64> writer;
229   Status transfer_status = Status::Unknown();
230 
231   ASSERT_EQ(OkStatus(),
232             client_.Read(6, writer, [&transfer_status](Status status) {
233               transfer_status = status;
234             }));
235   transfer_thread_.WaitUntilEventIsProcessed();
236 
237   // First transfer parameters chunk is sent.
238   rpc::PayloadsView payloads =
239       context_.output().payloads<Transfer::Read>(context_.channel().id());
240   ASSERT_EQ(payloads.size(), 1u);
241   EXPECT_EQ(transfer_status, Status::Unknown());
242 
243   Chunk c0 = DecodeChunk(payloads[0]);
244   EXPECT_EQ(c0.transfer_id, 6u);
245   EXPECT_EQ(c0.offset, 0u);
246   ASSERT_EQ(c0.pending_bytes.value(), 32u);
247 
248   constexpr ConstByteSpan data(kData64);
249   context_.server().SendServerStream<Transfer::Read>(
250       EncodeChunk({.transfer_id = 6u, .offset = 0, .data = data.first(32)}));
251   transfer_thread_.WaitUntilEventIsProcessed();
252 
253   ASSERT_EQ(payloads.size(), 2u);
254   EXPECT_EQ(transfer_status, Status::Unknown());
255 
256   // Second parameters chunk.
257   Chunk c1 = DecodeChunk(payloads[1]);
258   EXPECT_EQ(c1.transfer_id, 6u);
259   EXPECT_EQ(c1.offset, 32u);
260   ASSERT_EQ(c1.pending_bytes.value(), 32u);
261 
262   context_.server().SendServerStream<Transfer::Read>(
263       EncodeChunk({.transfer_id = 6u,
264                    .offset = 32,
265                    .data = data.subspan(32),
266                    .remaining_bytes = 0}));
267   transfer_thread_.WaitUntilEventIsProcessed();
268 
269   ASSERT_EQ(payloads.size(), 3u);
270 
271   Chunk c2 = DecodeChunk(payloads[2]);
272   EXPECT_EQ(c2.transfer_id, 6u);
273   ASSERT_TRUE(c2.status.has_value());
274   EXPECT_EQ(c2.status.value(), OkStatus());
275 
276   EXPECT_EQ(transfer_status, OkStatus());
277   EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
278 }
279 
TEST_F(ReadTransfer,UnexpectedOffset)280 TEST_F(ReadTransfer, UnexpectedOffset) {
281   stream::MemoryWriterBuffer<64> writer;
282   Status transfer_status = Status::Unknown();
283 
284   ASSERT_EQ(OkStatus(),
285             client_.Read(7, writer, [&transfer_status](Status status) {
286               transfer_status = status;
287             }));
288   transfer_thread_.WaitUntilEventIsProcessed();
289 
290   // First transfer parameters chunk is sent.
291   rpc::PayloadsView payloads =
292       context_.output().payloads<Transfer::Read>(context_.channel().id());
293   ASSERT_EQ(payloads.size(), 1u);
294   EXPECT_EQ(transfer_status, Status::Unknown());
295 
296   Chunk c0 = DecodeChunk(payloads[0]);
297   EXPECT_EQ(c0.transfer_id, 7u);
298   EXPECT_EQ(c0.offset, 0u);
299   EXPECT_EQ(c0.pending_bytes.value(), 64u);
300 
301   constexpr ConstByteSpan data(kData32);
302   context_.server().SendServerStream<Transfer::Read>(
303       EncodeChunk({.transfer_id = 7u, .offset = 0, .data = data.first(16)}));
304   transfer_thread_.WaitUntilEventIsProcessed();
305 
306   ASSERT_EQ(payloads.size(), 1u);
307   EXPECT_EQ(transfer_status, Status::Unknown());
308 
309   // Send a chunk with an incorrect offset. The client should resend parameters.
310   context_.server().SendServerStream<Transfer::Read>(
311       EncodeChunk({.transfer_id = 7u,
312                    .offset = 8,  // wrong!
313                    .data = data.subspan(16),
314                    .remaining_bytes = 0}));
315   transfer_thread_.WaitUntilEventIsProcessed();
316 
317   ASSERT_EQ(payloads.size(), 2u);
318   EXPECT_EQ(transfer_status, Status::Unknown());
319 
320   Chunk c1 = DecodeChunk(payloads[1]);
321   EXPECT_EQ(c1.transfer_id, 7u);
322   EXPECT_EQ(c1.offset, 16u);
323   EXPECT_EQ(c1.pending_bytes.value(), 48u);
324 
325   // Send the correct chunk, completing the transfer.
326   context_.server().SendServerStream<Transfer::Read>(
327       EncodeChunk({.transfer_id = 7u,
328                    .offset = 16,
329                    .data = data.subspan(16),
330                    .remaining_bytes = 0}));
331   transfer_thread_.WaitUntilEventIsProcessed();
332 
333   ASSERT_EQ(payloads.size(), 3u);
334 
335   Chunk c2 = DecodeChunk(payloads[2]);
336   EXPECT_EQ(c2.transfer_id, 7u);
337   ASSERT_TRUE(c2.status.has_value());
338   EXPECT_EQ(c2.status.value(), OkStatus());
339 
340   EXPECT_EQ(transfer_status, OkStatus());
341   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
342             0);
343 }
344 
TEST_F(ReadTransferMaxBytes32,TooMuchData)345 TEST_F(ReadTransferMaxBytes32, TooMuchData) {
346   stream::MemoryWriterBuffer<32> writer;
347   Status transfer_status = Status::Unknown();
348 
349   ASSERT_EQ(OkStatus(),
350             client_.Read(8, writer, [&transfer_status](Status status) {
351               transfer_status = status;
352             }));
353   transfer_thread_.WaitUntilEventIsProcessed();
354 
355   // First transfer parameters chunk is sent.
356   rpc::PayloadsView payloads =
357       context_.output().payloads<Transfer::Read>(context_.channel().id());
358   ASSERT_EQ(payloads.size(), 1u);
359   EXPECT_EQ(transfer_status, Status::Unknown());
360 
361   Chunk c0 = DecodeChunk(payloads[0]);
362   EXPECT_EQ(c0.transfer_id, 8u);
363   EXPECT_EQ(c0.offset, 0u);
364   ASSERT_EQ(c0.pending_bytes.value(), 32u);
365 
366   constexpr ConstByteSpan data(kData64);
367 
368   // pending_bytes == 32
369   context_.server().SendServerStream<Transfer::Read>(
370       EncodeChunk({.transfer_id = 8u, .offset = 0, .data = data.first(16)}));
371 
372   // pending_bytes == 16
373   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
374       {.transfer_id = 8u, .offset = 16, .data = data.subspan(16, 8)}));
375 
376   // pending_bytes == 8, send 16 instead.
377   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
378       {.transfer_id = 8u, .offset = 24, .data = data.subspan(24, 16)}));
379   transfer_thread_.WaitUntilEventIsProcessed();
380 
381   ASSERT_EQ(payloads.size(), 4u);
382 
383   Chunk c1 = DecodeChunk(payloads[3]);
384   EXPECT_EQ(c1.transfer_id, 8u);
385   ASSERT_TRUE(c1.status.has_value());
386   EXPECT_EQ(c1.status.value(), Status::Internal());
387 
388   EXPECT_EQ(transfer_status, Status::Internal());
389 }
390 
TEST_F(ReadTransfer,ServerError)391 TEST_F(ReadTransfer, ServerError) {
392   stream::MemoryWriterBuffer<64> writer;
393   Status transfer_status = Status::Unknown();
394 
395   ASSERT_EQ(OkStatus(),
396             client_.Read(9, writer, [&transfer_status](Status status) {
397               transfer_status = status;
398             }));
399   transfer_thread_.WaitUntilEventIsProcessed();
400 
401   // First transfer parameters chunk is sent.
402   rpc::PayloadsView payloads =
403       context_.output().payloads<Transfer::Read>(context_.channel().id());
404   ASSERT_EQ(payloads.size(), 1u);
405   EXPECT_EQ(transfer_status, Status::Unknown());
406 
407   Chunk c0 = DecodeChunk(payloads[0]);
408   EXPECT_EQ(c0.transfer_id, 9u);
409   EXPECT_EQ(c0.offset, 0u);
410   ASSERT_EQ(c0.pending_bytes.value(), 64u);
411 
412   // Server sends an error. Client should not respond and terminate the
413   // transfer.
414   context_.server().SendServerStream<Transfer::Read>(
415       EncodeChunk({.transfer_id = 9u, .status = Status::NotFound()}));
416   transfer_thread_.WaitUntilEventIsProcessed();
417 
418   ASSERT_EQ(payloads.size(), 1u);
419   EXPECT_EQ(transfer_status, Status::NotFound());
420 }
421 
TEST_F(ReadTransfer,OnlySendsParametersOnceAfterDrop)422 TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
423   stream::MemoryWriterBuffer<64> writer;
424   Status transfer_status = Status::Unknown();
425 
426   ASSERT_EQ(OkStatus(),
427             client_.Read(10, writer, [&transfer_status](Status status) {
428               transfer_status = status;
429             }));
430   transfer_thread_.WaitUntilEventIsProcessed();
431 
432   // First transfer parameters chunk is sent.
433   rpc::PayloadsView payloads =
434       context_.output().payloads<Transfer::Read>(context_.channel().id());
435   ASSERT_EQ(payloads.size(), 1u);
436   EXPECT_EQ(transfer_status, Status::Unknown());
437 
438   Chunk c0 = DecodeChunk(payloads[0]);
439   EXPECT_EQ(c0.transfer_id, 10u);
440   EXPECT_EQ(c0.offset, 0u);
441   ASSERT_EQ(c0.pending_bytes.value(), 64u);
442 
443   constexpr ConstByteSpan data(kData64);
444 
445   // Send the first 8 bytes of the transfer.
446   context_.server().SendServerStream<Transfer::Read>(
447       EncodeChunk({.transfer_id = 10u, .offset = 0, .data = data.first(8)}));
448 
449   // Skip offset 8, send the rest starting from 16.
450   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
451     context_.server().SendServerStream<Transfer::Read>(
452         EncodeChunk({.transfer_id = 10u,
453                      .offset = offset,
454                      .data = data.subspan(offset, 8)}));
455   }
456   transfer_thread_.WaitUntilEventIsProcessed();
457 
458   // Only one parameters update should be sent, with the offset of the initial
459   // dropped packet.
460   ASSERT_EQ(payloads.size(), 2u);
461 
462   Chunk c1 = DecodeChunk(payloads[1]);
463   EXPECT_EQ(c1.transfer_id, 10u);
464   EXPECT_EQ(c1.offset, 8u);
465   ASSERT_EQ(c1.pending_bytes.value(), 56u);
466 
467   // Send the remaining data to complete the transfer.
468   context_.server().SendServerStream<Transfer::Read>(
469       EncodeChunk({.transfer_id = 10u,
470                    .offset = 8,
471                    .data = data.subspan(8, 56),
472                    .remaining_bytes = 0}));
473   transfer_thread_.WaitUntilEventIsProcessed();
474 
475   ASSERT_EQ(payloads.size(), 3u);
476 
477   Chunk c2 = DecodeChunk(payloads[2]);
478   EXPECT_EQ(c2.transfer_id, 10u);
479   ASSERT_TRUE(c2.status.has_value());
480   EXPECT_EQ(c2.status.value(), OkStatus());
481 
482   EXPECT_EQ(transfer_status, OkStatus());
483 }
484 
TEST_F(ReadTransfer,ResendsParametersIfSentRepeatedChunkDuringRecovery)485 TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
486   stream::MemoryWriterBuffer<64> writer;
487   Status transfer_status = Status::Unknown();
488 
489   ASSERT_EQ(OkStatus(),
490             client_.Read(11, writer, [&transfer_status](Status status) {
491               transfer_status = status;
492             }));
493   transfer_thread_.WaitUntilEventIsProcessed();
494 
495   // First transfer parameters chunk is sent.
496   rpc::PayloadsView payloads =
497       context_.output().payloads<Transfer::Read>(context_.channel().id());
498   ASSERT_EQ(payloads.size(), 1u);
499   EXPECT_EQ(transfer_status, Status::Unknown());
500 
501   Chunk c0 = DecodeChunk(payloads[0]);
502   EXPECT_EQ(c0.transfer_id, 11u);
503   EXPECT_EQ(c0.offset, 0u);
504   ASSERT_EQ(c0.pending_bytes.value(), 64u);
505 
506   constexpr ConstByteSpan data(kData64);
507 
508   // Send the first 8 bytes of the transfer.
509   context_.server().SendServerStream<Transfer::Read>(
510       EncodeChunk({.transfer_id = 11u, .offset = 0, .data = data.first(8)}));
511 
512   // Skip offset 8, send the rest starting from 16.
513   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
514     context_.server().SendServerStream<Transfer::Read>(
515         EncodeChunk({.transfer_id = 11u,
516                      .offset = offset,
517                      .data = data.subspan(offset, 8)}));
518   }
519   transfer_thread_.WaitUntilEventIsProcessed();
520 
521   // Only one parameters update should be sent, with the offset of the initial
522   // dropped packet.
523   ASSERT_EQ(payloads.size(), 2u);
524 
525   const Chunk last_chunk = {
526       .transfer_id = 11u, .offset = 56, .data = data.subspan(56)};
527 
528   // Re-send the final chunk of the block.
529   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
530   transfer_thread_.WaitUntilEventIsProcessed();
531 
532   // The original drop parameters should be re-sent.
533   ASSERT_EQ(payloads.size(), 3u);
534   Chunk c2 = DecodeChunk(payloads[2]);
535   EXPECT_EQ(c2.transfer_id, 11u);
536   EXPECT_EQ(c2.offset, 8u);
537   ASSERT_EQ(c2.pending_bytes.value(), 56u);
538 
539   // Do it again.
540   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
541   transfer_thread_.WaitUntilEventIsProcessed();
542 
543   ASSERT_EQ(payloads.size(), 4u);
544   Chunk c3 = DecodeChunk(payloads[3]);
545   EXPECT_EQ(c3.transfer_id, 11u);
546   EXPECT_EQ(c3.offset, 8u);
547   ASSERT_EQ(c3.pending_bytes.value(), 56u);
548 
549   // Finish the transfer normally.
550   context_.server().SendServerStream<Transfer::Read>(
551       EncodeChunk({.transfer_id = 11u,
552                    .offset = 8,
553                    .data = data.subspan(8, 56),
554                    .remaining_bytes = 0}));
555   transfer_thread_.WaitUntilEventIsProcessed();
556 
557   ASSERT_EQ(payloads.size(), 5u);
558 
559   Chunk c4 = DecodeChunk(payloads[4]);
560   EXPECT_EQ(c4.transfer_id, 11u);
561   ASSERT_TRUE(c4.status.has_value());
562   EXPECT_EQ(c4.status.value(), OkStatus());
563 
564   EXPECT_EQ(transfer_status, OkStatus());
565 }
566 
567 constexpr chrono::SystemClock::duration kTestTimeout =
568     std::chrono::milliseconds(50);
569 constexpr uint8_t kTestRetries = 3;
570 
TEST_F(ReadTransfer,Timeout_ResendsCurrentParameters)571 TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
572   stream::MemoryWriterBuffer<64> writer;
573   Status transfer_status = Status::Unknown();
574 
575   ASSERT_EQ(OkStatus(),
576             client_.Read(
577                 12,
578                 writer,
579                 [&transfer_status](Status status) { transfer_status = status; },
580                 kTestTimeout));
581   transfer_thread_.WaitUntilEventIsProcessed();
582 
583   // First transfer parameters chunk is sent.
584   rpc::PayloadsView payloads =
585       context_.output().payloads<Transfer::Read>(context_.channel().id());
586   ASSERT_EQ(payloads.size(), 1u);
587   EXPECT_EQ(transfer_status, Status::Unknown());
588 
589   Chunk c0 = DecodeChunk(payloads.back());
590   EXPECT_EQ(c0.transfer_id, 12u);
591   EXPECT_EQ(c0.offset, 0u);
592   EXPECT_EQ(c0.pending_bytes.value(), 64u);
593 
594   // Wait for the timeout to expire without doing anything. The client should
595   // resend its parameters chunk.
596   transfer_thread_.SimulateClientTimeout(12);
597   ASSERT_EQ(payloads.size(), 2u);
598 
599   Chunk c = DecodeChunk(payloads.back());
600   EXPECT_EQ(c.transfer_id, 12u);
601   EXPECT_EQ(c.offset, 0u);
602   EXPECT_EQ(c.pending_bytes.value(), 64u);
603 
604   // Transfer has not yet completed.
605   EXPECT_EQ(transfer_status, Status::Unknown());
606 
607   // Finish the transfer following the timeout.
608   context_.server().SendServerStream<Transfer::Read>(
609       EncodeChunk({.transfer_id = 12u,
610                    .offset = 0,
611                    .data = kData32,
612                    .remaining_bytes = 0}));
613   transfer_thread_.WaitUntilEventIsProcessed();
614 
615   ASSERT_EQ(payloads.size(), 3u);
616 
617   Chunk c4 = DecodeChunk(payloads.back());
618   EXPECT_EQ(c4.transfer_id, 12u);
619   ASSERT_TRUE(c4.status.has_value());
620   EXPECT_EQ(c4.status.value(), OkStatus());
621 
622   EXPECT_EQ(transfer_status, OkStatus());
623 }
624 
TEST_F(ReadTransfer,Timeout_ResendsUpdatedParameters)625 TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
626   stream::MemoryWriterBuffer<64> writer;
627   Status transfer_status = Status::Unknown();
628 
629   ASSERT_EQ(OkStatus(),
630             client_.Read(
631                 13,
632                 writer,
633                 [&transfer_status](Status status) { transfer_status = status; },
634                 kTestTimeout));
635   transfer_thread_.WaitUntilEventIsProcessed();
636 
637   // First transfer parameters chunk is sent.
638   rpc::PayloadsView payloads =
639       context_.output().payloads<Transfer::Read>(context_.channel().id());
640   ASSERT_EQ(payloads.size(), 1u);
641   EXPECT_EQ(transfer_status, Status::Unknown());
642 
643   Chunk c0 = DecodeChunk(payloads.back());
644   EXPECT_EQ(c0.transfer_id, 13u);
645   EXPECT_EQ(c0.offset, 0u);
646   EXPECT_EQ(c0.pending_bytes.value(), 64u);
647 
648   constexpr ConstByteSpan data(kData32);
649 
650   // Send some data, but not everything.
651   context_.server().SendServerStream<Transfer::Read>(
652       EncodeChunk({.transfer_id = 13u, .offset = 0, .data = data.first(16)}));
653   transfer_thread_.WaitUntilEventIsProcessed();
654 
655   ASSERT_EQ(payloads.size(), 1u);
656 
657   // Wait for the timeout to expire without sending more data. The client should
658   // send an updated parameters chunk, accounting for the data already received.
659   transfer_thread_.SimulateClientTimeout(13);
660   ASSERT_EQ(payloads.size(), 2u);
661 
662   Chunk c = DecodeChunk(payloads.back());
663   EXPECT_EQ(c.transfer_id, 13u);
664   EXPECT_EQ(c.offset, 16u);
665   EXPECT_EQ(c.pending_bytes.value(), 48u);
666 
667   // Transfer has not yet completed.
668   EXPECT_EQ(transfer_status, Status::Unknown());
669 
670   // Send the rest of the data, finishing the transfer.
671   context_.server().SendServerStream<Transfer::Read>(
672       EncodeChunk({.transfer_id = 13u,
673                    .offset = 16,
674                    .data = data.subspan(16),
675                    .remaining_bytes = 0}));
676   transfer_thread_.WaitUntilEventIsProcessed();
677 
678   ASSERT_EQ(payloads.size(), 3u);
679 
680   Chunk c4 = DecodeChunk(payloads.back());
681   EXPECT_EQ(c4.transfer_id, 13u);
682   ASSERT_TRUE(c4.status.has_value());
683   EXPECT_EQ(c4.status.value(), OkStatus());
684 
685   EXPECT_EQ(transfer_status, OkStatus());
686 }
687 
TEST_F(ReadTransfer,Timeout_EndsTransferAfterMaxRetries)688 TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
689   stream::MemoryWriterBuffer<64> writer;
690   Status transfer_status = Status::Unknown();
691 
692   ASSERT_EQ(OkStatus(),
693             client_.Read(
694                 14,
695                 writer,
696                 [&transfer_status](Status status) { transfer_status = status; },
697                 kTestTimeout));
698   transfer_thread_.WaitUntilEventIsProcessed();
699 
700   // First transfer parameters chunk is sent.
701   rpc::PayloadsView payloads =
702       context_.output().payloads<Transfer::Read>(context_.channel().id());
703   ASSERT_EQ(payloads.size(), 1u);
704   EXPECT_EQ(transfer_status, Status::Unknown());
705 
706   Chunk c0 = DecodeChunk(payloads.back());
707   EXPECT_EQ(c0.transfer_id, 14u);
708   EXPECT_EQ(c0.offset, 0u);
709   EXPECT_EQ(c0.pending_bytes.value(), 64u);
710 
711   for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
712     // Wait for the timeout to expire without doing anything. The client should
713     // resend its parameters chunk.
714     transfer_thread_.SimulateClientTimeout(14);
715     ASSERT_EQ(payloads.size(), retry + 1);
716 
717     Chunk c = DecodeChunk(payloads.back());
718     EXPECT_EQ(c.transfer_id, 14u);
719     EXPECT_EQ(c.offset, 0u);
720     EXPECT_EQ(c.pending_bytes.value(), 64u);
721 
722     // Transfer has not yet completed.
723     EXPECT_EQ(transfer_status, Status::Unknown());
724   }
725 
726   // Sleep one more time after the final retry. The client should cancel the
727   // transfer at this point and send a DEADLINE_EXCEEDED chunk.
728   transfer_thread_.SimulateClientTimeout(14);
729   ASSERT_EQ(payloads.size(), 5u);
730 
731   Chunk c4 = DecodeChunk(payloads.back());
732   EXPECT_EQ(c4.transfer_id, 14u);
733   ASSERT_TRUE(c4.status.has_value());
734   EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
735 
736   EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
737 
738   // After finishing the transfer, nothing else should be sent. Verify this by
739   // waiting for a bit.
740   this_thread::sleep_for(kTestTimeout * 4);
741   ASSERT_EQ(payloads.size(), 5u);
742 }
743 
TEST_F(ReadTransfer,Timeout_ReceivingDataResetsRetryCount)744 TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
745   stream::MemoryWriterBuffer<64> writer;
746   Status transfer_status = Status::Unknown();
747 
748   constexpr ConstByteSpan data(kData32);
749 
750   ASSERT_EQ(OkStatus(),
751             client_.Read(
752                 14,
753                 writer,
754                 [&transfer_status](Status status) { transfer_status = status; },
755                 kTestTimeout));
756   transfer_thread_.WaitUntilEventIsProcessed();
757 
758   // First transfer parameters chunk is sent.
759   rpc::PayloadsView payloads =
760       context_.output().payloads<Transfer::Read>(context_.channel().id());
761   ASSERT_EQ(payloads.size(), 1u);
762   EXPECT_EQ(transfer_status, Status::Unknown());
763 
764   Chunk c0 = DecodeChunk(payloads.back());
765   EXPECT_EQ(c0.transfer_id, 14u);
766   EXPECT_EQ(c0.offset, 0u);
767   EXPECT_EQ(c0.window_end_offset, 64u);
768 
769   // Simulate one less timeout than the maximum amount of retries.
770   for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) {
771     transfer_thread_.SimulateClientTimeout(14);
772     ASSERT_EQ(payloads.size(), retry + 1);
773 
774     Chunk c = DecodeChunk(payloads.back());
775     EXPECT_EQ(c.transfer_id, 14u);
776     EXPECT_EQ(c.offset, 0u);
777     EXPECT_EQ(c.window_end_offset, 64u);
778 
779     // Transfer has not yet completed.
780     EXPECT_EQ(transfer_status, Status::Unknown());
781   }
782 
783   // Send some data.
784   context_.server().SendServerStream<Transfer::Read>(
785       EncodeChunk({.transfer_id = 14u, .offset = 0, .data = data.first(16)}));
786   transfer_thread_.WaitUntilEventIsProcessed();
787   ASSERT_EQ(payloads.size(), 3u);
788 
789   // Time out a couple more times. The context's retry count should have been
790   // reset, so it should go through the standard retry flow instead of
791   // terminating the transfer.
792   transfer_thread_.SimulateClientTimeout(14);
793   ASSERT_EQ(payloads.size(), 4u);
794 
795   Chunk c = DecodeChunk(payloads.back());
796   EXPECT_FALSE(c.status.has_value());
797   EXPECT_EQ(c.transfer_id, 14u);
798   EXPECT_EQ(c.offset, 16u);
799   EXPECT_EQ(c.window_end_offset, 64u);
800 
801   transfer_thread_.SimulateClientTimeout(14);
802   ASSERT_EQ(payloads.size(), 5u);
803 
804   c = DecodeChunk(payloads.back());
805   EXPECT_FALSE(c.status.has_value());
806   EXPECT_EQ(c.transfer_id, 14u);
807   EXPECT_EQ(c.offset, 16u);
808   EXPECT_EQ(c.window_end_offset, 64u);
809 }
810 
TEST_F(ReadTransfer,InitialPacketFails_OnCompletedCalledWithDataLoss)811 TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
812   stream::MemoryWriterBuffer<64> writer;
813   Status transfer_status = Status::Unknown();
814 
815   context_.output().set_send_status(Status::Unauthenticated());
816 
817   ASSERT_EQ(OkStatus(),
818             client_.Read(
819                 14,
820                 writer,
821                 [&transfer_status](Status status) {
822                   ASSERT_EQ(transfer_status,
823                             Status::Unknown());  // Must only call once
824                   transfer_status = status;
825                 },
826                 kTestTimeout));
827   transfer_thread_.WaitUntilEventIsProcessed();
828 
829   EXPECT_EQ(transfer_status, Status::Internal());
830 }
831 
832 class WriteTransfer : public ::testing::Test {
833  protected:
WriteTransfer()834   WriteTransfer()
835       : transfer_thread_(chunk_buffer_, encode_buffer_),
836         client_(context_.client(), context_.channel().id(), transfer_thread_),
837         system_thread_(TransferThreadOptions(), transfer_thread_) {}
838 
~WriteTransfer()839   ~WriteTransfer() {
840     transfer_thread_.Terminate();
841     system_thread_.join();
842   }
843 
844   rpc::RawClientTestContext<> context_;
845 
846   Thread<1, 1> transfer_thread_;
847   Client client_;
848 
849   std::array<std::byte, 64> chunk_buffer_;
850   std::array<std::byte, 64> encode_buffer_;
851 
852   thread::Thread system_thread_;
853 };
854 
TEST_F(WriteTransfer,SingleChunk)855 TEST_F(WriteTransfer, SingleChunk) {
856   stream::MemoryReader reader(kData32);
857   Status transfer_status = Status::Unknown();
858 
859   ASSERT_EQ(OkStatus(),
860             client_.Write(3, reader, [&transfer_status](Status status) {
861               transfer_status = status;
862             }));
863   transfer_thread_.WaitUntilEventIsProcessed();
864 
865   // The client begins by just sending the transfer ID.
866   rpc::PayloadsView payloads =
867       context_.output().payloads<Transfer::Write>(context_.channel().id());
868   ASSERT_EQ(payloads.size(), 1u);
869   EXPECT_EQ(transfer_status, Status::Unknown());
870 
871   Chunk c0 = DecodeChunk(payloads[0]);
872   EXPECT_EQ(c0.transfer_id, 3u);
873 
874   // Send transfer parameters. Client should send a data chunk and the final
875   // chunk.
876   rpc::test::WaitForPackets(context_.output(), 2, [this] {
877     context_.server().SendServerStream<Transfer::Write>(
878         EncodeChunk({.transfer_id = 3,
879                      .pending_bytes = 64,
880                      .max_chunk_size_bytes = 32,
881                      .offset = 0}));
882   });
883 
884   ASSERT_EQ(payloads.size(), 3u);
885 
886   Chunk c1 = DecodeChunk(payloads[1]);
887   EXPECT_EQ(c1.transfer_id, 3u);
888   EXPECT_EQ(c1.offset, 0u);
889   EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
890 
891   Chunk c2 = DecodeChunk(payloads[2]);
892   EXPECT_EQ(c2.transfer_id, 3u);
893   ASSERT_TRUE(c2.remaining_bytes.has_value());
894   EXPECT_EQ(c2.remaining_bytes.value(), 0u);
895 
896   EXPECT_EQ(transfer_status, Status::Unknown());
897 
898   // Send the final status chunk to complete the transfer.
899   context_.server().SendServerStream<Transfer::Write>(
900       EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
901   transfer_thread_.WaitUntilEventIsProcessed();
902 
903   EXPECT_EQ(payloads.size(), 3u);
904   EXPECT_EQ(transfer_status, OkStatus());
905 }
906 
TEST_F(WriteTransfer,MultiChunk)907 TEST_F(WriteTransfer, MultiChunk) {
908   stream::MemoryReader reader(kData32);
909   Status transfer_status = Status::Unknown();
910 
911   ASSERT_EQ(OkStatus(),
912             client_.Write(4, reader, [&transfer_status](Status status) {
913               transfer_status = status;
914             }));
915   transfer_thread_.WaitUntilEventIsProcessed();
916 
917   // The client begins by just sending the transfer ID.
918   rpc::PayloadsView payloads =
919       context_.output().payloads<Transfer::Write>(context_.channel().id());
920   ASSERT_EQ(payloads.size(), 1u);
921   EXPECT_EQ(transfer_status, Status::Unknown());
922 
923   Chunk c0 = DecodeChunk(payloads[0]);
924   EXPECT_EQ(c0.transfer_id, 4u);
925 
926   // Send transfer parameters with a chunk size smaller than the data.
927 
928   // Client should send two data chunks and the final chunk.
929   rpc::test::WaitForPackets(context_.output(), 3, [this] {
930     context_.server().SendServerStream<Transfer::Write>(
931         EncodeChunk({.transfer_id = 4,
932                      .pending_bytes = 64,
933                      .max_chunk_size_bytes = 16,
934                      .offset = 0}));
935   });
936 
937   ASSERT_EQ(payloads.size(), 4u);
938 
939   Chunk c1 = DecodeChunk(payloads[1]);
940   EXPECT_EQ(c1.transfer_id, 4u);
941   EXPECT_EQ(c1.offset, 0u);
942   EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
943 
944   Chunk c2 = DecodeChunk(payloads[2]);
945   EXPECT_EQ(c2.transfer_id, 4u);
946   EXPECT_EQ(c2.offset, 16u);
947   EXPECT_EQ(
948       std::memcmp(c2.data.data(), kData32.data() + c2.offset, c2.data.size()),
949       0);
950 
951   Chunk c3 = DecodeChunk(payloads[3]);
952   EXPECT_EQ(c3.transfer_id, 4u);
953   ASSERT_TRUE(c3.remaining_bytes.has_value());
954   EXPECT_EQ(c3.remaining_bytes.value(), 0u);
955 
956   EXPECT_EQ(transfer_status, Status::Unknown());
957 
958   // Send the final status chunk to complete the transfer.
959   context_.server().SendServerStream<Transfer::Write>(
960       EncodeChunk({.transfer_id = 4, .status = OkStatus()}));
961   transfer_thread_.WaitUntilEventIsProcessed();
962 
963   EXPECT_EQ(payloads.size(), 4u);
964   EXPECT_EQ(transfer_status, OkStatus());
965 }
966 
TEST_F(WriteTransfer,OutOfOrder_SeekSupported)967 TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
968   stream::MemoryReader reader(kData32);
969   Status transfer_status = Status::Unknown();
970 
971   ASSERT_EQ(OkStatus(),
972             client_.Write(5, reader, [&transfer_status](Status status) {
973               transfer_status = status;
974             }));
975   transfer_thread_.WaitUntilEventIsProcessed();
976 
977   // The client begins by just sending the transfer ID.
978   rpc::PayloadsView payloads =
979       context_.output().payloads<Transfer::Write>(context_.channel().id());
980   ASSERT_EQ(payloads.size(), 1u);
981   EXPECT_EQ(transfer_status, Status::Unknown());
982 
983   Chunk c0 = DecodeChunk(payloads[0]);
984   EXPECT_EQ(c0.transfer_id, 5u);
985 
986   // Send transfer parameters with a nonzero offset, requesting a seek.
987   // Client should send a data chunk and the final chunk.
988   rpc::test::WaitForPackets(context_.output(), 2, [this] {
989     context_.server().SendServerStream<Transfer::Write>(
990         EncodeChunk({.transfer_id = 5,
991                      .pending_bytes = 64,
992                      .max_chunk_size_bytes = 32,
993                      .offset = 16}));
994   });
995 
996   ASSERT_EQ(payloads.size(), 3u);
997 
998   Chunk c1 = DecodeChunk(payloads[1]);
999   EXPECT_EQ(c1.transfer_id, 5u);
1000   EXPECT_EQ(c1.offset, 16u);
1001   EXPECT_EQ(
1002       std::memcmp(c1.data.data(), kData32.data() + c1.offset, c1.data.size()),
1003       0);
1004 
1005   Chunk c2 = DecodeChunk(payloads[2]);
1006   EXPECT_EQ(c2.transfer_id, 5u);
1007   ASSERT_TRUE(c2.remaining_bytes.has_value());
1008   EXPECT_EQ(c2.remaining_bytes.value(), 0u);
1009 
1010   EXPECT_EQ(transfer_status, Status::Unknown());
1011 
1012   // Send the final status chunk to complete the transfer.
1013   context_.server().SendServerStream<Transfer::Write>(
1014       EncodeChunk({.transfer_id = 5, .status = OkStatus()}));
1015   transfer_thread_.WaitUntilEventIsProcessed();
1016 
1017   EXPECT_EQ(payloads.size(), 3u);
1018   EXPECT_EQ(transfer_status, OkStatus());
1019 }
1020 
1021 class FakeNonSeekableReader final : public stream::NonSeekableReader {
1022  public:
FakeNonSeekableReader(ConstByteSpan data)1023   FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
1024 
1025  private:
DoRead(ByteSpan out)1026   StatusWithSize DoRead(ByteSpan out) final {
1027     if (position_ == data_.size()) {
1028       return StatusWithSize::OutOfRange();
1029     }
1030 
1031     size_t to_copy = std::min(out.size(), data_.size() - position_);
1032     std::memcpy(out.data(), data_.data() + position_, to_copy);
1033     position_ += to_copy;
1034 
1035     return StatusWithSize(to_copy);
1036   }
1037 
1038   ConstByteSpan data_;
1039   size_t position_;
1040 };
1041 
TEST_F(WriteTransfer,OutOfOrder_SeekNotSupported)1042 TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
1043   FakeNonSeekableReader reader(kData32);
1044   Status transfer_status = Status::Unknown();
1045 
1046   ASSERT_EQ(OkStatus(),
1047             client_.Write(6, reader, [&transfer_status](Status status) {
1048               transfer_status = status;
1049             }));
1050   transfer_thread_.WaitUntilEventIsProcessed();
1051 
1052   // The client begins by just sending the transfer ID.
1053   rpc::PayloadsView payloads =
1054       context_.output().payloads<Transfer::Write>(context_.channel().id());
1055   ASSERT_EQ(payloads.size(), 1u);
1056   EXPECT_EQ(transfer_status, Status::Unknown());
1057 
1058   Chunk c0 = DecodeChunk(payloads[0]);
1059   EXPECT_EQ(c0.transfer_id, 6u);
1060 
1061   // Send transfer parameters with a nonzero offset, requesting a seek.
1062   context_.server().SendServerStream<Transfer::Write>(
1063       EncodeChunk({.transfer_id = 6,
1064                    .pending_bytes = 64,
1065                    .max_chunk_size_bytes = 32,
1066                    .offset = 16}));
1067   transfer_thread_.WaitUntilEventIsProcessed();
1068 
1069   // Client should send a status chunk and end the transfer.
1070   ASSERT_EQ(payloads.size(), 2u);
1071 
1072   Chunk c1 = DecodeChunk(payloads[1]);
1073   EXPECT_EQ(c1.transfer_id, 6u);
1074   ASSERT_TRUE(c1.status.has_value());
1075   EXPECT_EQ(c1.status.value(), Status::Unimplemented());
1076 
1077   EXPECT_EQ(transfer_status, Status::Unimplemented());
1078 }
1079 
TEST_F(WriteTransfer,ServerError)1080 TEST_F(WriteTransfer, ServerError) {
1081   stream::MemoryReader reader(kData32);
1082   Status transfer_status = Status::Unknown();
1083 
1084   ASSERT_EQ(OkStatus(),
1085             client_.Write(7, reader, [&transfer_status](Status status) {
1086               transfer_status = status;
1087             }));
1088   transfer_thread_.WaitUntilEventIsProcessed();
1089 
1090   // The client begins by just sending the transfer ID.
1091   rpc::PayloadsView payloads =
1092       context_.output().payloads<Transfer::Write>(context_.channel().id());
1093   ASSERT_EQ(payloads.size(), 1u);
1094   EXPECT_EQ(transfer_status, Status::Unknown());
1095 
1096   Chunk c0 = DecodeChunk(payloads[0]);
1097   EXPECT_EQ(c0.transfer_id, 7u);
1098 
1099   // Send an error from the server.
1100   context_.server().SendServerStream<Transfer::Write>(
1101       EncodeChunk({.transfer_id = 7, .status = Status::NotFound()}));
1102   transfer_thread_.WaitUntilEventIsProcessed();
1103 
1104   // Client should not respond and terminate the transfer.
1105   EXPECT_EQ(payloads.size(), 1u);
1106   EXPECT_EQ(transfer_status, Status::NotFound());
1107 }
1108 
TEST_F(WriteTransfer,MalformedParametersChunk)1109 TEST_F(WriteTransfer, MalformedParametersChunk) {
1110   stream::MemoryReader reader(kData32);
1111   Status transfer_status = Status::Unknown();
1112 
1113   ASSERT_EQ(OkStatus(),
1114             client_.Write(8, reader, [&transfer_status](Status status) {
1115               transfer_status = status;
1116             }));
1117   transfer_thread_.WaitUntilEventIsProcessed();
1118 
1119   // The client begins by just sending the transfer ID.
1120   rpc::PayloadsView payloads =
1121       context_.output().payloads<Transfer::Write>(context_.channel().id());
1122   ASSERT_EQ(payloads.size(), 1u);
1123   EXPECT_EQ(transfer_status, Status::Unknown());
1124 
1125   Chunk c0 = DecodeChunk(payloads[0]);
1126   EXPECT_EQ(c0.transfer_id, 8u);
1127 
1128   // Send an invalid transfer parameters chunk without pending_bytes.
1129   context_.server().SendServerStream<Transfer::Write>(
1130       EncodeChunk({.transfer_id = 8, .max_chunk_size_bytes = 32}));
1131   transfer_thread_.WaitUntilEventIsProcessed();
1132 
1133   // Client should send a status chunk and end the transfer.
1134   ASSERT_EQ(payloads.size(), 2u);
1135 
1136   Chunk c1 = DecodeChunk(payloads[1]);
1137   EXPECT_EQ(c1.transfer_id, 8u);
1138   ASSERT_TRUE(c1.status.has_value());
1139   EXPECT_EQ(c1.status.value(), Status::InvalidArgument());
1140 
1141   EXPECT_EQ(transfer_status, Status::InvalidArgument());
1142 }
1143 
TEST_F(WriteTransfer,AbortIfZeroBytesAreRequested)1144 TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
1145   stream::MemoryReader reader(kData32);
1146   Status transfer_status = Status::Unknown();
1147 
1148   ASSERT_EQ(OkStatus(),
1149             client_.Write(9, reader, [&transfer_status](Status status) {
1150               transfer_status = status;
1151             }));
1152   transfer_thread_.WaitUntilEventIsProcessed();
1153 
1154   // The client begins by just sending the transfer ID.
1155   rpc::PayloadsView payloads =
1156       context_.output().payloads<Transfer::Write>(context_.channel().id());
1157   ASSERT_EQ(payloads.size(), 1u);
1158   EXPECT_EQ(transfer_status, Status::Unknown());
1159 
1160   Chunk c0 = DecodeChunk(payloads[0]);
1161   EXPECT_EQ(c0.transfer_id, 9u);
1162 
1163   // Send an invalid transfer parameters chunk with 0 pending_bytes.
1164   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1165       {.transfer_id = 9, .pending_bytes = 0, .max_chunk_size_bytes = 32}));
1166   transfer_thread_.WaitUntilEventIsProcessed();
1167 
1168   // Client should send a status chunk and end the transfer.
1169   ASSERT_EQ(payloads.size(), 2u);
1170 
1171   Chunk c1 = DecodeChunk(payloads[1]);
1172   EXPECT_EQ(c1.transfer_id, 9u);
1173   ASSERT_TRUE(c1.status.has_value());
1174   EXPECT_EQ(c1.status.value(), Status::ResourceExhausted());
1175 
1176   EXPECT_EQ(transfer_status, Status::ResourceExhausted());
1177 }
1178 
TEST_F(WriteTransfer,Timeout_RetriesWithInitialChunk)1179 TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
1180   stream::MemoryReader reader(kData32);
1181   Status transfer_status = Status::Unknown();
1182 
1183   ASSERT_EQ(OkStatus(),
1184             client_.Write(
1185                 10,
1186                 reader,
1187                 [&transfer_status](Status status) { transfer_status = status; },
1188                 kTestTimeout));
1189   transfer_thread_.WaitUntilEventIsProcessed();
1190 
1191   // The client begins by just sending the transfer ID.
1192   rpc::PayloadsView payloads =
1193       context_.output().payloads<Transfer::Write>(context_.channel().id());
1194   ASSERT_EQ(payloads.size(), 1u);
1195   EXPECT_EQ(transfer_status, Status::Unknown());
1196 
1197   Chunk c0 = DecodeChunk(payloads.back());
1198   EXPECT_EQ(c0.transfer_id, 10u);
1199 
1200   // Wait for the timeout to expire without doing anything. The client should
1201   // resend the initial transmit chunk.
1202   transfer_thread_.SimulateClientTimeout(10);
1203   ASSERT_EQ(payloads.size(), 2u);
1204 
1205   Chunk c = DecodeChunk(payloads.back());
1206   EXPECT_EQ(c.transfer_id, 10u);
1207 
1208   // Transfer has not yet completed.
1209   EXPECT_EQ(transfer_status, Status::Unknown());
1210 }
1211 
TEST_F(WriteTransfer,Timeout_RetriesWithMostRecentChunk)1212 TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
1213   stream::MemoryReader reader(kData32);
1214   Status transfer_status = Status::Unknown();
1215 
1216   ASSERT_EQ(OkStatus(),
1217             client_.Write(
1218                 11,
1219                 reader,
1220                 [&transfer_status](Status status) { transfer_status = status; },
1221                 kTestTimeout));
1222   transfer_thread_.WaitUntilEventIsProcessed();
1223 
1224   // The client begins by just sending the transfer ID.
1225   rpc::PayloadsView payloads =
1226       context_.output().payloads<Transfer::Write>(context_.channel().id());
1227   ASSERT_EQ(payloads.size(), 1u);
1228   EXPECT_EQ(transfer_status, Status::Unknown());
1229 
1230   Chunk c0 = DecodeChunk(payloads.back());
1231   EXPECT_EQ(c0.transfer_id, 11u);
1232 
1233   // Send the first parameters chunk.
1234   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1235     context_.server().SendServerStream<Transfer::Write>(
1236         EncodeChunk({.transfer_id = 11,
1237                      .pending_bytes = 16,
1238                      .max_chunk_size_bytes = 8,
1239                      .offset = 0}));
1240   });
1241   ASSERT_EQ(payloads.size(), 3u);
1242 
1243   EXPECT_EQ(transfer_status, Status::Unknown());
1244 
1245   Chunk c1 = DecodeChunk(payloads[1]);
1246   EXPECT_EQ(c1.transfer_id, 11u);
1247   EXPECT_EQ(c1.offset, 0u);
1248   EXPECT_EQ(c1.data.size(), 8u);
1249   EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
1250 
1251   Chunk c2 = DecodeChunk(payloads[2]);
1252   EXPECT_EQ(c2.transfer_id, 11u);
1253   EXPECT_EQ(c2.offset, 8u);
1254   EXPECT_EQ(c2.data.size(), 8u);
1255   EXPECT_EQ(
1256       std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
1257       0);
1258 
1259   // Wait for the timeout to expire without doing anything. The client should
1260   // resend the most recently sent chunk.
1261   transfer_thread_.SimulateClientTimeout(11);
1262   ASSERT_EQ(payloads.size(), 4u);
1263 
1264   Chunk c3 = DecodeChunk(payloads[3]);
1265   EXPECT_EQ(c3.transfer_id, c2.transfer_id);
1266   EXPECT_EQ(c3.offset, c2.offset);
1267   EXPECT_EQ(c3.data.size(), c2.data.size());
1268   EXPECT_EQ(std::memcmp(c3.data.data(), c2.data.data(), c3.data.size()), 0);
1269 
1270   // Transfer has not yet completed.
1271   EXPECT_EQ(transfer_status, Status::Unknown());
1272 }
1273 
TEST_F(WriteTransfer,Timeout_RetriesWithSingleChunkTransfer)1274 TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
1275   stream::MemoryReader reader(kData32);
1276   Status transfer_status = Status::Unknown();
1277 
1278   ASSERT_EQ(OkStatus(),
1279             client_.Write(
1280                 12,
1281                 reader,
1282                 [&transfer_status](Status status) { transfer_status = status; },
1283                 kTestTimeout));
1284   transfer_thread_.WaitUntilEventIsProcessed();
1285 
1286   // The client begins by just sending the transfer ID.
1287   rpc::PayloadsView payloads =
1288       context_.output().payloads<Transfer::Write>(context_.channel().id());
1289   ASSERT_EQ(payloads.size(), 1u);
1290   EXPECT_EQ(transfer_status, Status::Unknown());
1291 
1292   Chunk c0 = DecodeChunk(payloads.back());
1293   EXPECT_EQ(c0.transfer_id, 12u);
1294 
1295   // Send the first parameters chunk, requesting all the data. The client should
1296   // respond with one data chunk and a remaining_bytes = 0 chunk.
1297   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1298     context_.server().SendServerStream<Transfer::Write>(
1299         EncodeChunk({.transfer_id = 12,
1300                      .pending_bytes = 64,
1301                      .max_chunk_size_bytes = 64,
1302                      .offset = 0}));
1303   });
1304   ASSERT_EQ(payloads.size(), 3u);
1305 
1306   EXPECT_EQ(transfer_status, Status::Unknown());
1307 
1308   Chunk c1 = DecodeChunk(payloads[1]);
1309   EXPECT_EQ(c1.transfer_id, 12u);
1310   EXPECT_EQ(c1.offset, 0u);
1311   EXPECT_EQ(c1.data.size(), 32u);
1312   EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
1313 
1314   Chunk c2 = DecodeChunk(payloads[2]);
1315   EXPECT_EQ(c2.transfer_id, 12u);
1316   ASSERT_TRUE(c2.remaining_bytes.has_value());
1317   EXPECT_EQ(c2.remaining_bytes.value(), 0u);
1318 
1319   // Wait for the timeout to expire without doing anything. The client should
1320   // resend the data chunk.
1321   transfer_thread_.SimulateClientTimeout(12);
1322   ASSERT_EQ(payloads.size(), 4u);
1323 
1324   Chunk c3 = DecodeChunk(payloads[3]);
1325   EXPECT_EQ(c3.transfer_id, c1.transfer_id);
1326   EXPECT_EQ(c3.offset, c1.offset);
1327   EXPECT_EQ(c3.data.size(), c1.data.size());
1328   EXPECT_EQ(std::memcmp(c3.data.data(), c1.data.data(), c3.data.size()), 0);
1329 
1330   // The remaining_bytes = 0 chunk should be resent on the next parameters.
1331   context_.server().SendServerStream<Transfer::Write>(
1332       EncodeChunk({.transfer_id = 12,
1333                    .pending_bytes = 64,
1334                    .max_chunk_size_bytes = 64,
1335                    .offset = 32}));
1336   transfer_thread_.WaitUntilEventIsProcessed();
1337 
1338   ASSERT_EQ(payloads.size(), 5u);
1339 
1340   Chunk c4 = DecodeChunk(payloads[4]);
1341   EXPECT_EQ(c4.transfer_id, 12u);
1342   ASSERT_TRUE(c4.remaining_bytes.has_value());
1343   EXPECT_EQ(c4.remaining_bytes.value(), 0u);
1344 
1345   context_.server().SendServerStream<Transfer::Write>(
1346       EncodeChunk({.transfer_id = 12, .status = OkStatus()}));
1347   transfer_thread_.WaitUntilEventIsProcessed();
1348 
1349   EXPECT_EQ(transfer_status, OkStatus());
1350 }
1351 
TEST_F(WriteTransfer,Timeout_EndsTransferAfterMaxRetries)1352 TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
1353   stream::MemoryReader reader(kData32);
1354   Status transfer_status = Status::Unknown();
1355 
1356   ASSERT_EQ(OkStatus(),
1357             client_.Write(
1358                 13,
1359                 reader,
1360                 [&transfer_status](Status status) { transfer_status = status; },
1361                 kTestTimeout));
1362   transfer_thread_.WaitUntilEventIsProcessed();
1363 
1364   // The client begins by just sending the transfer ID.
1365   rpc::PayloadsView payloads =
1366       context_.output().payloads<Transfer::Write>(context_.channel().id());
1367   ASSERT_EQ(payloads.size(), 1u);
1368   EXPECT_EQ(transfer_status, Status::Unknown());
1369 
1370   Chunk c0 = DecodeChunk(payloads.back());
1371   EXPECT_EQ(c0.transfer_id, 13u);
1372 
1373   for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
1374     // Wait for the timeout to expire without doing anything. The client should
1375     // resend the initial transmit chunk.
1376     transfer_thread_.SimulateClientTimeout(13);
1377     ASSERT_EQ(payloads.size(), retry + 1);
1378 
1379     Chunk c = DecodeChunk(payloads.back());
1380     EXPECT_EQ(c.transfer_id, 13u);
1381 
1382     // Transfer has not yet completed.
1383     EXPECT_EQ(transfer_status, Status::Unknown());
1384   }
1385 
1386   // Sleep one more time after the final retry. The client should cancel the
1387   // transfer at this point and send a DEADLINE_EXCEEDED chunk.
1388   transfer_thread_.SimulateClientTimeout(13);
1389   ASSERT_EQ(payloads.size(), 5u);
1390 
1391   Chunk c4 = DecodeChunk(payloads.back());
1392   EXPECT_EQ(c4.transfer_id, 13u);
1393   ASSERT_TRUE(c4.status.has_value());
1394   EXPECT_EQ(c4.status.value(), Status::DeadlineExceeded());
1395 
1396   EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1397 
1398   // After finishing the transfer, nothing else should be sent. Verify this by
1399   // waiting for a bit.
1400   this_thread::sleep_for(kTestTimeout * 4);
1401   ASSERT_EQ(payloads.size(), 5u);
1402 }
1403 
TEST_F(WriteTransfer,Timeout_NonSeekableReaderEndsTransfer)1404 TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
1405   FakeNonSeekableReader reader(kData32);
1406   Status transfer_status = Status::Unknown();
1407 
1408   ASSERT_EQ(OkStatus(),
1409             client_.Write(
1410                 14,
1411                 reader,
1412                 [&transfer_status](Status status) { transfer_status = status; },
1413                 kTestTimeout));
1414   transfer_thread_.WaitUntilEventIsProcessed();
1415 
1416   // The client begins by just sending the transfer ID.
1417   rpc::PayloadsView payloads =
1418       context_.output().payloads<Transfer::Write>(context_.channel().id());
1419   ASSERT_EQ(payloads.size(), 1u);
1420   EXPECT_EQ(transfer_status, Status::Unknown());
1421 
1422   Chunk c0 = DecodeChunk(payloads.back());
1423   EXPECT_EQ(c0.transfer_id, 14u);
1424 
1425   // Send the first parameters chunk.
1426   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1427     context_.server().SendServerStream<Transfer::Write>(
1428         EncodeChunk({.transfer_id = 14,
1429                      .pending_bytes = 16,
1430                      .max_chunk_size_bytes = 8,
1431                      .offset = 0}));
1432   });
1433   ASSERT_EQ(payloads.size(), 3u);
1434 
1435   EXPECT_EQ(transfer_status, Status::Unknown());
1436 
1437   Chunk c1 = DecodeChunk(payloads[1]);
1438   EXPECT_EQ(c1.transfer_id, 14u);
1439   EXPECT_EQ(c1.offset, 0u);
1440   EXPECT_EQ(c1.data.size(), 8u);
1441   EXPECT_EQ(std::memcmp(c1.data.data(), kData32.data(), c1.data.size()), 0);
1442 
1443   Chunk c2 = DecodeChunk(payloads[2]);
1444   EXPECT_EQ(c2.transfer_id, 14u);
1445   EXPECT_EQ(c2.offset, 8u);
1446   EXPECT_EQ(c2.data.size(), 8u);
1447   EXPECT_EQ(
1448       std::memcmp(c2.data.data(), kData32.data() + c2.offset, c1.data.size()),
1449       0);
1450 
1451   // Wait for the timeout to expire without doing anything. The client should
1452   // fail to seek back and end the transfer.
1453   transfer_thread_.SimulateClientTimeout(14);
1454   ASSERT_EQ(payloads.size(), 4u);
1455 
1456   Chunk c3 = DecodeChunk(payloads[3]);
1457   EXPECT_EQ(c3.transfer_id, 14u);
1458   ASSERT_TRUE(c3.status.has_value());
1459   EXPECT_EQ(c3.status.value(), Status::DeadlineExceeded());
1460 
1461   EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1462 }
1463 
1464 PW_MODIFY_DIAGNOSTICS_POP();
1465 
1466 }  // namespace
1467 }  // namespace pw::transfer::test
1468