• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_transfer/transfer.h"
16 
17 #include "gtest/gtest.h"
18 #include "pw_bytes/array.h"
19 #include "pw_rpc/raw/test_method_context.h"
20 #include "pw_rpc/thread_testing.h"
21 #include "pw_thread/thread.h"
22 #include "pw_thread_stl/options.h"
23 #include "pw_transfer/transfer.pwpb.h"
24 #include "pw_transfer_private/chunk_testing.h"
25 
26 namespace pw::transfer::test {
27 namespace {
28 
29 using namespace std::chrono_literals;
30 
31 PW_MODIFY_DIAGNOSTICS_PUSH();
32 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
33 
34 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()35 thread::Options& TransferThreadOptions() {
36   static thread::stl::Options options;
37   return options;
38 }
39 
40 using internal::Chunk;
41 
42 class TestMemoryReader : public stream::SeekableReader {
43  public:
TestMemoryReader(std::span<const std::byte> data)44   constexpr TestMemoryReader(std::span<const std::byte> data)
45       : memory_reader_(data) {}
46 
DoSeek(ptrdiff_t offset,Whence origin)47   Status DoSeek(ptrdiff_t offset, Whence origin) override {
48     if (seek_status.ok()) {
49       return memory_reader_.Seek(offset, origin);
50     }
51     return seek_status;
52   }
53 
DoRead(ByteSpan dest)54   StatusWithSize DoRead(ByteSpan dest) final {
55     if (!read_status.ok()) {
56       return StatusWithSize(read_status, 0);
57     }
58 
59     auto result = memory_reader_.Read(dest);
60     return result.ok() ? StatusWithSize(result->size())
61                        : StatusWithSize(result.status(), 0);
62   }
63 
64   Status seek_status;
65   Status read_status;
66 
67  private:
68   stream::MemoryReader memory_reader_;
69 };
70 
71 class SimpleReadTransfer final : public ReadOnlyHandler {
72  public:
SimpleReadTransfer(uint32_t transfer_id,ConstByteSpan data)73   SimpleReadTransfer(uint32_t transfer_id, ConstByteSpan data)
74       : ReadOnlyHandler(transfer_id),
75         prepare_read_called(false),
76         finalize_read_called(false),
77         finalize_read_status(Status::Unknown()),
78         reader_(data) {}
79 
PrepareRead()80   Status PrepareRead() final {
81     prepare_read_called = true;
82 
83     if (!prepare_read_return_status.ok()) {
84       return prepare_read_return_status;
85     }
86 
87     EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
88     set_reader(reader_);
89     return OkStatus();
90   }
91 
FinalizeRead(Status status)92   void FinalizeRead(Status status) final {
93     finalize_read_called = true;
94     finalize_read_status = status;
95   }
96 
set_seek_status(Status status)97   void set_seek_status(Status status) { reader_.seek_status = status; }
set_read_status(Status status)98   void set_read_status(Status status) { reader_.read_status = status; }
99 
100   bool prepare_read_called;
101   bool finalize_read_called;
102   Status prepare_read_return_status;
103   Status finalize_read_status;
104 
105  private:
106   TestMemoryReader reader_;
107 };
108 
__anon215251c30202(size_t i) 109 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
110 
111 class ReadTransfer : public ::testing::Test {
112  protected:
ReadTransfer(size_t max_chunk_size_bytes=64)113   ReadTransfer(size_t max_chunk_size_bytes = 64)
114       : handler_(3, kData),
115         transfer_thread_(std::span(data_buffer_).first(max_chunk_size_bytes),
116                          encode_buffer_),
117         ctx_(transfer_thread_, 64),
118         system_thread_(TransferThreadOptions(), transfer_thread_) {
119     ctx_.service().RegisterHandler(handler_);
120 
121     ASSERT_FALSE(handler_.prepare_read_called);
122     ASSERT_FALSE(handler_.finalize_read_called);
123 
124     ctx_.call();  // Open the read stream
125     transfer_thread_.WaitUntilEventIsProcessed();
126   }
127 
~ReadTransfer()128   ~ReadTransfer() {
129     transfer_thread_.Terminate();
130     system_thread_.join();
131   }
132 
133   SimpleReadTransfer handler_;
134   Thread<1, 1> transfer_thread_;
135   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
136   thread::Thread system_thread_;
137   std::array<std::byte, 64> data_buffer_;
138   std::array<std::byte, 64> encode_buffer_;
139 };
140 
// Requests the whole payload with a 64-byte window. Expects one chunk holding
// all of kData followed by an empty chunk with remaining_bytes = 0, and checks
// that the client's OK status chunk finalizes the handler with OK.
TEST_F(ReadTransfer, SingleChunk) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                       .window_end_offset = 64,
                                       .pending_bytes = 64,
                                       .offset = 0,
                                       .type = Chunk::Type::kTransferStart}));

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  // First chunk should have all the read data.
  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.data.size(), kData.size());
  EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);

  // Second chunk should be empty and set remaining_bytes = 0.
  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.data.size(), 0u);
  ASSERT_TRUE(c1.remaining_bytes.has_value());
  EXPECT_EQ(c1.remaining_bytes.value(), 0u);

  // The client acknowledges completion, which should finalize the handler.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
177 
// Same flow as SingleChunk, but the start chunk sets only pending_bytes and
// leaves window_end_offset unset.
TEST_F(ReadTransfer, PendingBytes_SingleChunk) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                       .pending_bytes = 64,
                                       .offset = 0,
                                       .type = Chunk::Type::kTransferStart}));

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  // First chunk should have all the read data.
  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.data.size(), kData.size());
  EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);

  // Second chunk should be empty and set remaining_bytes = 0.
  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.data.size(), 0u);
  ASSERT_TRUE(c1.remaining_bytes.has_value());
  EXPECT_EQ(c1.remaining_bytes.value(), 0u);

  // The client acknowledges completion, which should finalize the handler.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
213 
// Reads the payload in 16-byte windows. The first two parameter chunks each
// yield 16 bytes of data; the third (offset 32, past the end of kData) yields
// an empty chunk with remaining_bytes = 0. The client's OK status then
// finalizes the handler.
TEST_F(ReadTransfer, MultiChunk) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .window_end_offset = 16,
                                     .pending_bytes = 16,
                                     .offset = 0,
                                     .type = Chunk::Type::kTransferStart}));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // First window: bytes [0, 16).
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);

  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.data.size(), 16u);
  EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);

  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 32,
                   .pending_bytes = 16,
                   .offset = 16,
                   .type = Chunk::Type::kParametersContinue}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Second window: bytes [16, 32).
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.offset, 16u);
  ASSERT_EQ(c1.data.size(), 16u);
  EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);

  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 48,
                   .pending_bytes = 16,
                   .offset = 32,
                   .type = Chunk::Type::kParametersContinue}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Third window: no data left, so an empty chunk with remaining_bytes = 0.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk c2 = DecodeChunk(ctx_.responses()[2]);

  EXPECT_EQ(c2.transfer_id, 3u);
  EXPECT_EQ(c2.data.size(), 0u);
  ASSERT_TRUE(c2.remaining_bytes.has_value());
  EXPECT_EQ(c2.remaining_bytes.value(), 0u);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
272 
// Sends the same CONTINUE parameters chunk four times and checks that only the
// first copy produces a data chunk.
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .window_end_offset = 16,
                                     .pending_bytes = 16,
                                     .offset = 0,
                                     .type = Chunk::Type::kTransferStart}));

  transfer_thread_.WaitUntilEventIsProcessed();

  const auto continue_chunk =
      EncodeChunk({.transfer_id = 3,
                   .window_end_offset = 24,
                   .pending_bytes = 8,
                   .offset = 16,
                   .type = Chunk::Type::kParametersContinue});
  ctx_.SendClientStream(continue_chunk);

  transfer_thread_.WaitUntilEventIsProcessed();

  // Resend the CONTINUE packets that don't actually advance the window.
  for (int i = 0; i < 3; ++i) {
    ctx_.SendClientStream(continue_chunk);
    transfer_thread_.WaitUntilEventIsProcessed();
  }

  // Two responses in total: one for the start chunk and one for the first
  // CONTINUE. The repeated CONTINUE packets must not add any more.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.offset, 16u);
  ASSERT_EQ(c1.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);
}
306 
// Same flow as MultiChunk, but every parameters chunk sets only pending_bytes
// and offset (no window_end_offset and no chunk type).
TEST_F(ReadTransfer, PendingBytes_MultiChunk) {
  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // First window: bytes [0, 16).
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);

  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.data.size(), 16u);
  EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);

  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Second window: bytes [16, 32).
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.offset, 16u);
  ASSERT_EQ(c1.data.size(), 16u);
  EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);

  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 32}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Third window: no data left, so an empty chunk with remaining_bytes = 0.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk c2 = DecodeChunk(ctx_.responses()[2]);

  EXPECT_EQ(c2.transfer_id, 3u);
  EXPECT_EQ(c2.data.size(), 0u);
  ASSERT_TRUE(c2.remaining_bytes.has_value());
  EXPECT_EQ(c2.remaining_bytes.value(), 0u);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
354 
// Requests data at non-sequential offsets (0, then 2, then 17) and checks that
// each response carries the bytes of kData starting at the requested offset.
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
  rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));

    transfer_thread_.WaitUntilEventIsProcessed();

    // Expect bytes [0, 16).
    Chunk chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(std::equal(
        &kData[0], &kData[16], chunk.data.begin(), chunk.data.end()));

    // Jump backwards to offset 2 and expect bytes [2, 10).
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2}));

    transfer_thread_.WaitUntilEventIsProcessed();

    chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(std::equal(
        &kData[2], &kData[10], chunk.data.begin(), chunk.data.end()));

    // Jump forwards to offset 17 with a window larger than the remaining data.
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 17}));
  });

  // The third response (index 2) should hold everything from offset 17 to the
  // end of kData.
  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk chunk = DecodeChunk(ctx_.responses()[2]);
  EXPECT_TRUE(std::equal(
      &kData[17], kData.end(), chunk.data.begin(), chunk.data.end()));
}
384 
// Makes the reader's seeks return Unimplemented, then requests an earlier
// offset after the first window. The last response is expected to carry an
// Unimplemented status.
TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
  handler_.set_seek_status(Status::Unimplemented());

  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2}));

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.status, Status::Unimplemented());
}
399 
// The client requests a 64-byte window but limits chunks to 8 bytes through
// max_chunk_size_bytes. Expects four 8-byte data chunks at offsets 0, 8, 16
// and 24, followed by an empty chunk with remaining_bytes = 0.
TEST_F(ReadTransfer, MaxChunkSize_Client) {
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                       .pending_bytes = 64,
                                       .max_chunk_size_bytes = 8,
                                       .offset = 0,
                                       .type = Chunk::Type::kTransferStart}));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
  Chunk c3 = DecodeChunk(ctx_.responses()[3]);
  Chunk c4 = DecodeChunk(ctx_.responses()[4]);

  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);

  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.offset, 8u);
  ASSERT_EQ(c1.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 8, c1.data.size()), 0);

  EXPECT_EQ(c2.transfer_id, 3u);
  EXPECT_EQ(c2.offset, 16u);
  ASSERT_EQ(c2.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c2.data.data(), kData.data() + 16, c2.data.size()), 0);

  EXPECT_EQ(c3.transfer_id, 3u);
  EXPECT_EQ(c3.offset, 24u);
  ASSERT_EQ(c3.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c3.data.data(), kData.data() + 24, c3.data.size()), 0);

  // Final chunk carries no data and marks the end of the transfer.
  EXPECT_EQ(c4.transfer_id, 3u);
  EXPECT_EQ(c4.data.size(), 0u);
  ASSERT_TRUE(c4.remaining_bytes.has_value());
  EXPECT_EQ(c4.remaining_bytes.value(), 0u);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
450 
// Completes one transfer, then starts a second one whose PrepareRead() fails.
// FinalizeRead() must not run for the second transfer, showing that the
// handler from the first transfer was not retained.
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .window_end_offset = 64,
                                     .pending_bytes = 64,
                                     .offset = 0,
                                     .type = Chunk::Type::kTransferStart}));
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_TRUE(handler_.finalize_read_called);
  ASSERT_EQ(OkStatus(), handler_.finalize_read_status);

  // Now, clear state and start a second transfer
  handler_.prepare_read_return_status = Status::FailedPrecondition();
  handler_.prepare_read_called = false;
  handler_.finalize_read_called = false;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .window_end_offset = 64,
                                     .pending_bytes = 64,
                                     .offset = 0,
                                     .type = Chunk::Type::kTransferStart}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Prepare failed, so the handler should not have been stored in the context,
  // and finalize should not have been called.
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_FALSE(handler_.finalize_read_called);
}
482 
483 class ReadTransferMaxChunkSize8 : public ReadTransfer {
484  protected:
ReadTransferMaxChunkSize8()485   ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
486 };
487 
// The service-side limit (8 bytes, from the fixture) is smaller than the
// client's requested max_chunk_size_bytes (16). Expects four 8-byte data
// chunks followed by an empty chunk with remaining_bytes = 0.
TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
  // Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                       .pending_bytes = 64,
                                       .max_chunk_size_bytes = 16,
                                       .offset = 0,
                                       .type = Chunk::Type::kTransferStart}));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);
  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
  Chunk c3 = DecodeChunk(ctx_.responses()[3]);
  Chunk c4 = DecodeChunk(ctx_.responses()[4]);

  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);

  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.offset, 8u);
  ASSERT_EQ(c1.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 8, c1.data.size()), 0);

  EXPECT_EQ(c2.transfer_id, 3u);
  EXPECT_EQ(c2.offset, 16u);
  ASSERT_EQ(c2.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c2.data.data(), kData.data() + 16, c2.data.size()), 0);

  EXPECT_EQ(c3.transfer_id, 3u);
  EXPECT_EQ(c3.offset, 24u);
  ASSERT_EQ(c3.data.size(), 8u);
  EXPECT_EQ(std::memcmp(c3.data.data(), kData.data() + 24, c3.data.size()), 0);

  // Final chunk carries no data and marks the end of the transfer.
  EXPECT_EQ(c4.transfer_id, 3u);
  EXPECT_EQ(c4.data.size(), 0u);
  ASSERT_TRUE(c4.remaining_bytes.has_value());
  EXPECT_EQ(c4.remaining_bytes.value(), 0u);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
539 
// Starts a transfer, then has the client send an OutOfRange status chunk.
// The handler should be finalized with that status and the service should not
// send any further response.
TEST_F(ReadTransfer, ClientError) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .pending_bytes = 16,
                                     .offset = 0,
                                     .type = Chunk::Type::kTransferStart}));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Send client error.
  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .status = Status::OutOfRange()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The response count is unchanged: the error is not answered.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
}
561 
// Sends a parameters chunk that carries only a transfer ID. The transfer is
// expected to be prepared and then immediately finalized with
// InvalidArgument, and the same status is sent back to the client.
TEST_F(ReadTransfer, MalformedParametersChunk) {
  // pending_bytes is required in a parameters chunk.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::InvalidArgument());

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.transfer_id, 3u);
  ASSERT_TRUE(chunk.status.has_value());
  EXPECT_EQ(chunk.status.value(), Status::InvalidArgument());
}
577 
// Starts a transfer for ID 11, which has no registered handler (the fixture
// only registers ID 3). Expects a single NotFound status response.
TEST_F(ReadTransfer, UnregisteredHandler) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 11,
                                     .pending_bytes = 32,
                                     .offset = 0,
                                     .type = Chunk::Type::kTransferStart}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(chunk.transfer_id, 11u);
  ASSERT_TRUE(chunk.status.has_value());
  EXPECT_EQ(chunk.status.value(), Status::NotFound());
}
591 
// Sends chunks for transfer ID 3 without ever starting that transfer and
// checks that the handler is neither prepared nor finalized.
TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .offset = 3}));
  ctx_.SendClientStream(EncodeChunk(
      {.transfer_id = 3, .offset = 0, .data = std::span(kData).first(10)}));
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // A transfer is only started by an initial packet; none of the chunks above
  // is expected to count as one, so the handler must stay untouched.
  EXPECT_FALSE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
}
603 
// Sends a second start chunk while a transfer is already in progress. The
// running transfer should be finalized with Aborted and a new one prepared,
// which then completes normally.
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .pending_bytes = 16,
                                     .offset = 0,
                                     .type = Chunk::Type::kTransferStart}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  handler_.prepare_read_called = false;  // Reset so can check if called again.

  ctx_.SendClientStream(  // Resend starting chunk
      EncodeChunk({.transfer_id = 3,
                   .pending_bytes = 16,
                   .offset = 0,
                   .type = Chunk::Type::kTransferStart}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The first transfer was aborted and the handler was prepared again.
  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
  handler_.finalize_read_called = false;  // Reset so can check later

  // Continue and finish the restarted transfer.
  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16}));
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
640 
// Starts a transfer with pending_bytes = 0 while the reader still has data.
// The transfer is expected to end with ResourceExhausted, both in the
// handler's finalize status and in the chunk sent to the client.
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .pending_bytes = 0,
                                     .type = Chunk::Type::kTransferStart}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.status, Status::ResourceExhausted());
}
654 
// Starts a transfer with pending_bytes = 0 when the reader reports end of
// stream. The service should answer with remaining_bytes = 0 and the transfer
// should complete with OK once the client acknowledges it.
TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
  // Make the next read appear to be the end of the stream.
  handler_.set_read_status(Status::OutOfRange());

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                     .pending_bytes = 0,
                                     .type = Chunk::Type::kTransferStart}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Guard the back() access below: fail cleanly instead of reading from an
  // empty response list if the service sent nothing.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.transfer_id, 3u);
  EXPECT_EQ(chunk.remaining_bytes, 0u);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client's final status chunk must not trigger another response.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
675 
// Completes a transfer, then sends a non-initial parameters chunk for the same
// ID. The service should reply with FailedPrecondition and must not finalize
// the handler a second time.
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
                                       .pending_bytes = 64,
                                       .offset = 0,
                                       .type = Chunk::Type::kTransferStart}));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk c0 = DecodeChunk(ctx_.responses()[0]);
  Chunk c1 = DecodeChunk(ctx_.responses()[1]);

  // First chunk should have all the read data.
  EXPECT_EQ(c0.transfer_id, 3u);
  EXPECT_EQ(c0.offset, 0u);
  ASSERT_EQ(c0.data.size(), kData.size());
  EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);

  // Second chunk should be empty and set remaining_bytes = 0.
  EXPECT_EQ(c1.transfer_id, 3u);
  EXPECT_EQ(c1.data.size(), 0u);
  ASSERT_TRUE(c1.remaining_bytes.has_value());
  EXPECT_EQ(c1.remaining_bytes.value(), 0u);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());

  // At this point the transfer should be in a completed state. Send a
  // non-initial chunk as a continuation of the transfer.
  handler_.finalize_read_called = false;

  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = 3, .pending_bytes = 48, .offset = 16}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  Chunk c2 = DecodeChunk(ctx_.responses()[2]);
  ASSERT_TRUE(c2.status.has_value());
  EXPECT_EQ(c2.status.value(), Status::FailedPrecondition());

  // FinalizeRead should not be called again.
  EXPECT_FALSE(handler_.finalize_read_called);
}
726 
727 class SimpleWriteTransfer final : public WriteOnlyHandler {
728  public:
SimpleWriteTransfer(uint32_t transfer_id,ByteSpan data)729   SimpleWriteTransfer(uint32_t transfer_id, ByteSpan data)
730       : WriteOnlyHandler(transfer_id),
731         prepare_write_called(false),
732         finalize_write_called(false),
733         finalize_write_status(Status::Unknown()),
734         writer_(data) {}
735 
PrepareWrite()736   Status PrepareWrite() final {
737     EXPECT_EQ(OkStatus(), writer_.Seek(0));
738     set_writer(writer_);
739     prepare_write_called = true;
740     return OkStatus();
741   }
742 
FinalizeWrite(Status status)743   Status FinalizeWrite(Status status) final {
744     finalize_write_called = true;
745     finalize_write_status = status;
746     return finalize_write_return_status_;
747   }
748 
set_finalize_write_return(Status status)749   void set_finalize_write_return(Status status) {
750     finalize_write_return_status_ = status;
751   }
752 
753   bool prepare_write_called;
754   bool finalize_write_called;
755   Status finalize_write_status;
756 
757  private:
758   Status finalize_write_return_status_;
759   stream::MemoryWriter writer_;
760 };
761 
762 class WriteTransfer : public ::testing::Test {
763  protected:
WriteTransfer(size_t max_bytes_to_receive=64)764   WriteTransfer(size_t max_bytes_to_receive = 64)
765       : buffer{},
766         handler_(7, buffer),
767         transfer_thread_(data_buffer_, encode_buffer_),
768         system_thread_(TransferThreadOptions(), transfer_thread_),
769         ctx_(transfer_thread_,
770              max_bytes_to_receive,
771              // Use a long timeout to avoid accidentally triggering timeouts.
772              std::chrono::minutes(1)) {
773     ctx_.service().RegisterHandler(handler_);
774 
775     ASSERT_FALSE(handler_.prepare_write_called);
776     ASSERT_FALSE(handler_.finalize_write_called);
777 
778     ctx_.call();  // Open the write stream
779     transfer_thread_.WaitUntilEventIsProcessed();
780   }
781 
~WriteTransfer()782   ~WriteTransfer() {
783     transfer_thread_.Terminate();
784     system_thread_.join();
785   }
786 
787   std::array<std::byte, kData.size()> buffer;
788   SimpleWriteTransfer handler_;
789 
790   Thread<1, 1> transfer_thread_;
791   thread::Thread system_thread_;
792   std::array<std::byte, 64> data_buffer_;
793   std::array<std::byte, 64> encode_buffer_;
794   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
795 };
796 
// Sends all of kData in a single data chunk and checks the parameters chunk,
// the final OK status chunk, the handler callbacks, and the stored bytes.
TEST_F(WriteTransfer, SingleChunk) {
  constexpr uint32_t kId = 7;

  // Start the transfer.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // First response: transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk params = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(params.transfer_id, kId);
  ASSERT_TRUE(params.pending_bytes.has_value());
  EXPECT_EQ(params.pending_bytes.value(), 32u);
  ASSERT_TRUE(params.max_chunk_size_bytes.has_value());
  EXPECT_EQ(params.max_chunk_size_bytes.value(), 37u);

  // Deliver the full payload, flagged as the final chunk.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Second response: completion status.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(completion.transfer_id, kId);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
828 
// When the handler's FinalizeWrite() returns an error, the client is sent
// DATA_LOSS even though FinalizeWrite() itself was invoked with OK.
TEST_F(WriteTransfer, FinalizeFails) {
  constexpr uint32_t kId = 7;

  // Configure the handler so that FinalizeWrite() reports a failure.
  handler_.set_finalize_write_return(Status::FailedPrecondition());

  // Start the transfer and immediately send the full payload.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Responses: parameters chunk, then the final status chunk.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk final_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(final_chunk.transfer_id, kId);
  ASSERT_TRUE(final_chunk.status.has_value());
  EXPECT_EQ(final_chunk.status.value(), Status::DataLoss());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
849 
// If the final status packet cannot be sent, the handler is still finalized
// with OK and only the parameters chunk reaches the client.
TEST_F(WriteTransfer, SendingFinalPacketFails) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // From here on, sends on the output report UNKNOWN.
  ctx_.output().set_send_status(Status::Unknown());

  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Should only have sent the transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk params = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(params.transfer_id, kId);
  ASSERT_TRUE(params.pending_bytes.has_value());
  EXPECT_EQ(params.pending_bytes.value(), 32u);
  ASSERT_TRUE(params.max_chunk_size_bytes.has_value());
  EXPECT_EQ(params.max_chunk_size_bytes.value(), 37u);

  // When FinalizeWrite() was called, the transfer was considered successful.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
875 
// Splits kData into two data chunks (8 bytes, then the remainder) and checks
// that no response is sent between them and that the transfer finishes OK.
TEST_F(WriteTransfer, MultiChunk) {
  constexpr uint32_t kId = 7;
  constexpr uint32_t kSplit = 8;  // Size of the first data chunk.

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // Initial transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk params = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(params.transfer_id, kId);
  ASSERT_TRUE(params.pending_bytes.has_value());
  EXPECT_EQ(params.pending_bytes.value(), 32u);

  // First part of the payload; no additional response is expected.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = kId,
                   .offset = 0,
                   .data = std::span(kData).first(kSplit)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Remainder of the payload, flagged as the final chunk.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = kId,
                   .offset = kSplit,
                   .data = std::span(kData).subspan(kSplit),
                   .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(completion.transfer_id, kId);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
911 
// A send failure while retrying after a timeout terminates the transfer with
// an INTERNAL status chunk.
TEST_F(WriteTransfer, WriteFailsOnRetry) {
  constexpr uint32_t kId = 7;

  // Skip one packet to fail on a retry.
  ctx_.output().set_send_status(Status::FailedPrecondition(), 1);

  // Wait for 3 packets: initial params, retry attempt, final error.
  rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
    // Only the start chunk is sent by the client, so the service times out.
    ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
    // Simulated timeout that triggers the retry.
    transfer_thread_.SimulateServerTimeout(kId);
  });

  // Attempted to send 3 packets, but the 2nd packet was dropped.
  // Check that the last packet is an INTERNAL error from the RPC write failure.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk error_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(error_chunk.transfer_id, kId);
  ASSERT_TRUE(error_chunk.status.has_value());
  EXPECT_EQ(error_chunk.status.value(), Status::Internal());
}
931 
// A timeout that fires while the receiver is recovering from a skipped
// offset re-sends the same recovery parameters.
TEST_F(WriteTransfer, TimeoutInRecoveryState) {
  constexpr uint32_t kId = 7;
  constexpr std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Initial parameters: start at offset 0 with 32 bytes pending.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial.transfer_id, kId);
  EXPECT_EQ(initial.offset, 0u);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 32u);

  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = payload.first(8)}));

  // Skip offset 8 to enter a recovery state.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 12, .data = payload.subspan(12, 4)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Recovery parameters should be sent for offset 8.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk recovery = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery.transfer_id, kId);
  EXPECT_EQ(recovery.offset, 8u);
  ASSERT_TRUE(recovery.pending_bytes.has_value());
  EXPECT_EQ(recovery.pending_bytes.value(), 24u);

  // Timeout while in the recovery state.
  transfer_thread_.SimulateServerTimeout(kId);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Same recovery parameters should be re-sent.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk resent = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent.transfer_id, kId);
  EXPECT_EQ(resent.offset, 8u);
  ASSERT_TRUE(resent.pending_bytes.has_value());
  EXPECT_EQ(resent.pending_bytes.value(), 24u);
}
973 
TEST_F(WriteTransfer,ExtendWindow)974 TEST_F(WriteTransfer, ExtendWindow) {
975   ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
976   transfer_thread_.WaitUntilEventIsProcessed();
977 
978   EXPECT_TRUE(handler_.prepare_write_called);
979   EXPECT_FALSE(handler_.finalize_write_called);
980 
981   ASSERT_EQ(ctx_.total_responses(), 1u);
982   Chunk chunk = DecodeChunk(ctx_.responses()[0]);
983   EXPECT_EQ(chunk.transfer_id, 7u);
984   EXPECT_EQ(chunk.window_end_offset, 32u);
985   ASSERT_TRUE(chunk.pending_bytes.has_value());
986   EXPECT_EQ(chunk.pending_bytes.value(), 32u);
987 
988   // Window starts at 32 bytes and should extend when half of that is sent.
989   ctx_.SendClientStream<64>(EncodeChunk(
990       {.transfer_id = 7, .offset = 0, .data = std::span(kData).first(4)}));
991   transfer_thread_.WaitUntilEventIsProcessed();
992   ASSERT_EQ(ctx_.total_responses(), 1u);
993 
994   ctx_.SendClientStream<64>(EncodeChunk(
995       {.transfer_id = 7, .offset = 4, .data = std::span(kData).subspan(4, 4)}));
996   transfer_thread_.WaitUntilEventIsProcessed();
997   ASSERT_EQ(ctx_.total_responses(), 1u);
998 
999   ctx_.SendClientStream<64>(EncodeChunk(
1000       {.transfer_id = 7, .offset = 8, .data = std::span(kData).subspan(8, 4)}));
1001   transfer_thread_.WaitUntilEventIsProcessed();
1002   ASSERT_EQ(ctx_.total_responses(), 1u);
1003 
1004   ctx_.SendClientStream<64>(
1005       EncodeChunk({.transfer_id = 7,
1006                    .offset = 12,
1007                    .data = std::span(kData).subspan(12, 4)}));
1008   transfer_thread_.WaitUntilEventIsProcessed();
1009   ASSERT_EQ(ctx_.total_responses(), 2u);
1010 
1011   // Extend parameters chunk.
1012   chunk = DecodeChunk(ctx_.responses()[1]);
1013   EXPECT_EQ(chunk.transfer_id, 7u);
1014   EXPECT_EQ(chunk.window_end_offset, 32u);
1015   EXPECT_EQ(chunk.type, Chunk::Type::kParametersContinue);
1016   ASSERT_TRUE(chunk.pending_bytes.has_value());
1017   EXPECT_EQ(chunk.pending_bytes.value(), 16u);
1018 
1019   ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
1020                                          .offset = 16,
1021                                          .data = std::span(kData).subspan(16),
1022                                          .remaining_bytes = 0}));
1023   transfer_thread_.WaitUntilEventIsProcessed();
1024 
1025   ASSERT_EQ(ctx_.total_responses(), 3u);
1026   chunk = DecodeChunk(ctx_.responses()[2]);
1027   EXPECT_EQ(chunk.transfer_id, 7u);
1028   ASSERT_TRUE(chunk.status.has_value());
1029   EXPECT_EQ(chunk.status.value(), OkStatus());
1030 
1031   EXPECT_TRUE(handler_.finalize_write_called);
1032   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1033   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1034 }
1035 
1036 class WriteTransferMaxBytes16 : public WriteTransfer {
1037  protected:
WriteTransferMaxBytes16()1038   WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
1039 };
1040 
// The transmitter may shrink the window: when it declares a smaller
// window_end_offset and fills it, the receiver answers with retransmit
// parameters starting where the data ended.
TEST_F(WriteTransfer, TransmitterReducesWindow) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial.transfer_id, kId);
  EXPECT_EQ(initial.window_end_offset, 32u);

  // Send only 12 bytes and set that as the new end offset.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .window_end_offset = 12,
                                         .offset = 0,
                                         .data = std::span(kData).first(12)}));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Receiver should respond immediately with a retransmit chunk as the end of
  // the window has been reached.
  const Chunk retransmit = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(retransmit.transfer_id, kId);
  EXPECT_EQ(retransmit.offset, 12u);
  EXPECT_EQ(retransmit.window_end_offset, 32u);
  EXPECT_EQ(retransmit.type, Chunk::Type::kParametersRetransmit);
}
1069 
// The transmitter is not allowed to grow the window: declaring a
// window_end_offset beyond the receiver's ends the transfer with INTERNAL.
TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial.transfer_id, kId);
  EXPECT_EQ(initial.window_end_offset, 32u);

  // Send 16 bytes while claiming a window that ends at offset 48.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = kId,
                   // Larger window end offset than the receiver's.
                   .window_end_offset = 48,
                   .offset = 0,
                   .data = std::span(kData).first(16)}));
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The receiver terminates the transfer with an error status.
  const Chunk termination = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(termination.transfer_id, kId);
  ASSERT_TRUE(termination.status.has_value());
  EXPECT_EQ(termination.status.value(), Status::Internal());
}
1097 
TEST_F(WriteTransferMaxBytes16,MultipleParameters)1098 TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
1099   ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
1100   transfer_thread_.WaitUntilEventIsProcessed();
1101 
1102   EXPECT_TRUE(handler_.prepare_write_called);
1103   EXPECT_FALSE(handler_.finalize_write_called);
1104 
1105   ASSERT_EQ(ctx_.total_responses(), 1u);
1106   Chunk chunk = DecodeChunk(ctx_.responses()[0]);
1107   EXPECT_EQ(chunk.transfer_id, 7u);
1108   ASSERT_TRUE(chunk.pending_bytes.has_value());
1109   EXPECT_EQ(chunk.pending_bytes.value(), 16u);
1110 
1111   ctx_.SendClientStream<64>(EncodeChunk(
1112       {.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
1113   transfer_thread_.WaitUntilEventIsProcessed();
1114 
1115   ASSERT_EQ(ctx_.total_responses(), 2u);
1116   chunk = DecodeChunk(ctx_.responses()[1]);
1117   EXPECT_EQ(chunk.transfer_id, 7u);
1118   EXPECT_EQ(chunk.offset, 8u);
1119   EXPECT_EQ(chunk.window_end_offset, 24u);
1120   ASSERT_TRUE(chunk.pending_bytes.has_value());
1121   EXPECT_EQ(chunk.pending_bytes.value(), 16u);
1122 
1123   ctx_.SendClientStream<64>(EncodeChunk(
1124       {.transfer_id = 7, .offset = 8, .data = std::span(kData).subspan(8, 8)}));
1125   transfer_thread_.WaitUntilEventIsProcessed();
1126 
1127   ASSERT_EQ(ctx_.total_responses(), 3u);
1128   chunk = DecodeChunk(ctx_.responses()[2]);
1129   EXPECT_EQ(chunk.transfer_id, 7u);
1130   EXPECT_EQ(chunk.offset, 16u);
1131   EXPECT_EQ(chunk.window_end_offset, 32u);
1132   ASSERT_TRUE(chunk.pending_bytes.has_value());
1133   EXPECT_EQ(chunk.pending_bytes.value(), 16u);
1134 
1135   ctx_.SendClientStream<64>(
1136       EncodeChunk({.transfer_id = 7,
1137                    .offset = 16,
1138                    .data = std::span(kData).subspan(16, 8)}));
1139   transfer_thread_.WaitUntilEventIsProcessed();
1140 
1141   ASSERT_EQ(ctx_.total_responses(), 4u);
1142   chunk = DecodeChunk(ctx_.responses()[3]);
1143   EXPECT_EQ(chunk.transfer_id, 7u);
1144   EXPECT_EQ(chunk.offset, 24u);
1145   EXPECT_EQ(chunk.window_end_offset, 32u);
1146   ASSERT_TRUE(chunk.pending_bytes.has_value());
1147   EXPECT_EQ(chunk.pending_bytes.value(), 8u);
1148 
1149   ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
1150                                          .offset = 24,
1151                                          .data = std::span(kData).subspan(24),
1152                                          .remaining_bytes = 0}));
1153   transfer_thread_.WaitUntilEventIsProcessed();
1154 
1155   ASSERT_EQ(ctx_.total_responses(), 5u);
1156   chunk = DecodeChunk(ctx_.responses()[4]);
1157   EXPECT_EQ(chunk.transfer_id, 7u);
1158   ASSERT_TRUE(chunk.status.has_value());
1159   EXPECT_EQ(chunk.status.value(), OkStatus());
1160 
1161   EXPECT_TRUE(handler_.finalize_write_called);
1162   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1163   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1164 }
1165 
// The service's 16-byte limit, not the larger handler buffer, determines the
// advertised pending bytes.
TEST_F(WriteTransferMaxBytes16, SetsDefaultPendingBytes) {
  constexpr uint32_t kId = 7;

  // Default max bytes is smaller than buffer.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk params = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(params.transfer_id, kId);
  EXPECT_EQ(params.pending_bytes.value(), 16u);
}
1176 
// When the handler's buffer is smaller than the service's limit, the
// advertised pending bytes match the buffer size.
TEST_F(WriteTransfer, SetsWriterPendingBytes) {
  constexpr uint32_t kSmallTransferId = 987;

  // Buffer is smaller than constructor's default max bytes.
  std::array<std::byte, 8> small_buffer = {};

  // Additional handler, registered alongside the fixture's own handler.
  SimpleWriteTransfer small_handler(kSmallTransferId, small_buffer);
  ctx_.service().RegisterHandler(small_handler);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kSmallTransferId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk params = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(params.transfer_id, kSmallTransferId);
  EXPECT_EQ(params.pending_bytes.value(), 8u);
}
1192 
// A data chunk at the wrong offset makes the receiver re-send parameters for
// the offset it expects; the transfer then completes once the correct chunk
// arrives.
TEST_F(WriteTransfer, UnexpectedOffset) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(initial.transfer_id, kId);
  EXPECT_EQ(initial.offset, 0u);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 32u);

  // First 8 bytes arrive at the expected offset; no new response.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = std::span(kData).first(8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 4,  // incorrect
                                         .data = std::span(kData).subspan(16),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The receiver asks again for data starting at offset 8.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk recovery = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(recovery.transfer_id, kId);
  EXPECT_EQ(recovery.offset, 8u);
  ASSERT_TRUE(recovery.pending_bytes.has_value());
  EXPECT_EQ(recovery.pending_bytes.value(), 24u);

  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 8,  // correct
                                         .data = std::span(kData).subspan(8),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk completion = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(completion.transfer_id, kId);
  ASSERT_TRUE(completion.status.has_value());
  EXPECT_EQ(completion.status.value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1242 
// Sending more data than the advertised pending bytes terminates the
// transfer with INTERNAL.
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk params = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(params.transfer_id, kId);
  ASSERT_TRUE(params.pending_bytes.has_value());
  EXPECT_EQ(params.pending_bytes.value(), 16u);

  // pending_bytes = 16 but send 24
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = std::span(kData).first(24)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk termination = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(termination.transfer_id, kId);
  ASSERT_TRUE(termination.status.has_value());
  EXPECT_EQ(termination.status.value(), Status::Internal());
}
1267 
// Starting a transfer whose ID has no registered handler yields NOT_FOUND.
TEST_F(WriteTransfer, UnregisteredHandler) {
  constexpr uint32_t kUnknownId = 999;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kUnknownId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk rejection = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(rejection.transfer_id, kUnknownId);
  ASSERT_TRUE(rejection.status.has_value());
  EXPECT_EQ(rejection.status.value(), Status::NotFound());
}
1278 
// An error status sent by the client finalizes the handler with that status
// and produces no further response.
TEST_F(WriteTransfer, ClientError) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk params = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(params.transfer_id, kId);
  ASSERT_TRUE(params.pending_bytes.has_value());
  EXPECT_EQ(params.pending_bytes.value(), 32u);

  // The client gives up with DATA_LOSS.
  ctx_.SendClientStream<64>(
      EncodeChunk({.transfer_id = kId, .status = Status::DataLoss()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Still just the parameters chunk.
  EXPECT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
}
1301 
// After one dropped chunk, the stream of out-of-order chunks that follows
// results in a single parameters update rather than one per chunk.
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
  constexpr uint32_t kId = 7;
  constexpr std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Byte 0 arrives normally.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = payload.first(1)}));

  // Drop offset 1, then send the rest of the data.
  for (uint32_t offset = 2; offset < kData.size(); ++offset) {
    ctx_.SendClientStream<64>(
        EncodeChunk({.transfer_id = kId,
                     .offset = offset,
                     .data = payload.subspan(offset, 1)}));
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one additional response, requesting offset 1.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk update = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(update.transfer_id, kId);
  EXPECT_EQ(update.offset, 1u);

  // Send the remaining data and the final status.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 1,
                                         .data = payload.subspan(1, 31),
                                         .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1335 
// While recovering from a missed offset, a repeated chunk causes the
// parameters to be re-sent each time it is received.
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
  constexpr uint32_t kId = 7;
  constexpr std::span payload(kData);

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Skip offset 0, then send the rest of the data.
  for (uint32_t offset = 1; offset < kData.size(); ++offset) {
    ctx_.SendClientStream<64>(
        EncodeChunk({.transfer_id = kId,
                     .offset = offset,
                     .data = payload.subspan(offset, 1)}));
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);  // Resent transfer parameters once.

  // Re-send the very last byte of the payload.
  const auto repeated_chunk = EncodeChunk(
      {.transfer_id = kId, .offset = kData.size() - 1, .data = payload.last(1)});
  ctx_.SendClientStream<64>(repeated_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Resent transfer parameters since the packet is repeated
  ASSERT_EQ(ctx_.total_responses(), 3u);

  // A second repetition triggers another re-send.
  ctx_.SendClientStream<64>(repeated_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);

  const Chunk resent = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent.transfer_id, kId);
  EXPECT_EQ(resent.offset, 0u);
  EXPECT_TRUE(resent.pending_bytes.has_value());

  // Resumes normal operation when correct offset is sent.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = kData, .status = OkStatus()}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1380 
// If the client repeats its final data chunk after the transfer completed,
// the OK status chunk is sent again.
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Complete the transfer with a single full-payload chunk.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk first_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(first_status.status.has_value());
  EXPECT_EQ(first_status.status.value(), OkStatus());

  // The client sends the same final chunk a second time.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  const Chunk repeated_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(repeated_status.status.has_value());
  EXPECT_EQ(repeated_status.status.value(), OkStatus());
}
1409 
// Chunks that are not a transfer's initial packet must not start a transfer:
// the handler's callbacks stay untouched.
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
  constexpr uint32_t kId = 7;

  // A chunk with a non-zero offset, a data chunk, and a status chunk.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId, .offset = 3}));
  ctx_.SendClientStream(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = std::span(kData).first(10)}));
  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = kId, .status = OkStatus()}));

  transfer_thread_.WaitUntilEventIsProcessed();

  // Only start transfer for initial packet.
  EXPECT_FALSE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);
}
1422 
// A new initial packet for an in-progress transfer aborts it (the handler is
// finalized with ABORTED) and starts over; the restarted transfer can then
// run to completion.
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Partially progress the transfer.
  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = std::span(kData).first(8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ASSERT_TRUE(handler_.prepare_write_called);
  ASSERT_FALSE(handler_.finalize_write_called);
  // Clear the flag so a second PrepareWrite() call can be detected.
  handler_.prepare_write_called = false;

  // Simulate client disappearing then restarting the transfer.
  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());

  // Clear the flag so the final FinalizeWrite() call can be detected.
  handler_.finalize_write_called = false;

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Run the restarted transfer to completion.
  ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = kId,
                                         .offset = 0,
                                         .data = std::span(kData),
                                         .remaining_bytes = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1463 
1464 class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
1465  public:
SometimesUnavailableReadHandler(uint32_t transfer_id,ConstByteSpan data)1466   SometimesUnavailableReadHandler(uint32_t transfer_id, ConstByteSpan data)
1467       : ReadOnlyHandler(transfer_id), reader_(data), call_count_(0) {}
1468 
PrepareRead()1469   Status PrepareRead() final {
1470     if ((call_count_++ % 2) == 0) {
1471       return Status::Unavailable();
1472     }
1473 
1474     set_reader(reader_);
1475     return OkStatus();
1476   }
1477 
1478  private:
1479   stream::MemoryReader reader_;
1480   int call_count_;
1481 };
1482 
// A handler whose PrepareRead() fails makes the service answer the start
// request with DATA_LOSS.
TEST_F(ReadTransfer, PrepareError) {
  constexpr uint32_t kId = 88;

  SometimesUnavailableReadHandler unavailable_handler(kId, kData);
  ctx_.service().RegisterHandler(unavailable_handler);

  // First attempt: the handler reports UNAVAILABLE from PrepareRead().
  ctx_.SendClientStream(
      EncodeChunk({.transfer_id = kId, .pending_bytes = 128, .offset = 0}));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk failure = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(failure.transfer_id, kId);
  ASSERT_TRUE(failure.status.has_value());
  EXPECT_EQ(failure.status.value(), Status::DataLoss());

  // Try starting the transfer again. It should work this time.
  // TODO(frolv): This won't work until completion ACKs are supported.
  if (false) {
    ctx_.SendClientStream(
        EncodeChunk({.transfer_id = kId, .pending_bytes = 128, .offset = 0}));
    transfer_thread_.WaitUntilEventIsProcessed();

    ASSERT_EQ(ctx_.total_responses(), 2u);
    const Chunk retry = DecodeChunk(ctx_.responses()[1]);
    EXPECT_EQ(retry.transfer_id, kId);
    ASSERT_EQ(retry.data.size(), kData.size());
    EXPECT_EQ(std::memcmp(retry.data.data(), kData.data(), retry.data.size()),
              0);
  }
}
1512 
// Changing the service's max pending bytes mid-transfer is reflected in the
// next parameters chunk.
TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
  constexpr uint32_t kId = 7;

  ctx_.SendClientStream(EncodeChunk({.transfer_id = kId}));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // First parameters chunk has default pending bytes of 16.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  const Chunk initial = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(initial.transfer_id, kId);
  ASSERT_TRUE(initial.pending_bytes.has_value());
  EXPECT_EQ(initial.pending_bytes.value(), 16u);

  // Update the pending bytes value.
  ctx_.service().set_max_pending_bytes(12);

  ctx_.SendClientStream<64>(EncodeChunk(
      {.transfer_id = kId, .offset = 0, .data = std::span(kData).first(8)}));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Second parameters chunk should use the new max pending bytes.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  const Chunk updated = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(updated.transfer_id, kId);
  EXPECT_EQ(updated.offset, 8u);
  EXPECT_EQ(updated.window_end_offset, 20u);
  ASSERT_TRUE(updated.pending_bytes.has_value());
  EXPECT_EQ(updated.pending_bytes.value(), 12u);
}
1543 
1544 PW_MODIFY_DIAGNOSTICS_POP();
1545 
1546 }  // namespace
1547 }  // namespace pw::transfer::test
1548