• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_transfer/transfer.h"
16 
17 #include "gtest/gtest.h"
18 #include "pw_assert/check.h"
19 #include "pw_bytes/array.h"
20 #include "pw_containers/algorithm.h"
21 #include "pw_rpc/raw/test_method_context.h"
22 #include "pw_rpc/test_helpers.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread_stl/options.h"
25 #include "pw_transfer/transfer.pwpb.h"
26 #include "pw_transfer_private/chunk_testing.h"
27 
28 namespace pw::transfer::test {
29 namespace {
30 
31 using namespace std::chrono_literals;
32 
33 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()34 thread::Options& TransferThreadOptions() {
35   static thread::stl::Options options;
36   return options;
37 }
38 
39 using internal::Chunk;
40 
41 class TestMemoryReader : public stream::SeekableReader {
42  public:
TestMemoryReader(span<const std::byte> data)43   constexpr TestMemoryReader(span<const std::byte> data)
44       : memory_reader_(data) {}
45 
DoSeek(ptrdiff_t offset,Whence origin)46   Status DoSeek(ptrdiff_t offset, Whence origin) override {
47     if (seek_status.ok()) {
48       return memory_reader_.Seek(offset, origin);
49     }
50     return seek_status;
51   }
52 
DoRead(ByteSpan dest)53   StatusWithSize DoRead(ByteSpan dest) final {
54     if (!read_status.ok()) {
55       return StatusWithSize(read_status, 0);
56     }
57 
58     auto result = memory_reader_.Read(dest);
59     return result.ok() ? StatusWithSize(result->size())
60                        : StatusWithSize(result.status(), 0);
61   }
62 
63   Status seek_status;
64   Status read_status;
65 
66  private:
67   stream::MemoryReader memory_reader_;
68 };
69 
70 class SimpleReadTransfer final : public ReadOnlyHandler {
71  public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)72   SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
73       : ReadOnlyHandler(session_id),
74         prepare_read_called(false),
75         finalize_read_called(false),
76         finalize_read_status(Status::Unknown()),
77         reader_(data) {}
78 
PrepareRead()79   Status PrepareRead() final {
80     prepare_read_called = true;
81 
82     if (!prepare_read_return_status.ok()) {
83       return prepare_read_return_status;
84     }
85 
86     EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
87     set_reader(reader_);
88     return OkStatus();
89   }
90 
FinalizeRead(Status status)91   void FinalizeRead(Status status) final {
92     finalize_read_called = true;
93     finalize_read_status = status;
94   }
95 
set_seek_status(Status status)96   void set_seek_status(Status status) { reader_.seek_status = status; }
set_read_status(Status status)97   void set_read_status(Status status) { reader_.read_status = status; }
98 
99   bool prepare_read_called;
100   bool finalize_read_called;
101   Status prepare_read_return_status;
102   Status finalize_read_status;
103 
104  private:
105   TestMemoryReader reader_;
106 };
107 
__anon88b3d5bc0202(size_t i) 108 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
109 
110 class ReadTransfer : public ::testing::Test {
111  protected:
ReadTransfer(size_t max_chunk_size_bytes=64)112   ReadTransfer(size_t max_chunk_size_bytes = 64)
113       : handler_(3, kData),
114         transfer_thread_(span(data_buffer_).first(max_chunk_size_bytes),
115                          encode_buffer_),
116         ctx_(transfer_thread_, 64),
117         system_thread_(TransferThreadOptions(), transfer_thread_) {
118     ctx_.service().RegisterHandler(handler_);
119 
120     PW_CHECK(!handler_.prepare_read_called);
121     PW_CHECK(!handler_.finalize_read_called);
122 
123     ctx_.call();  // Open the read stream
124     transfer_thread_.WaitUntilEventIsProcessed();
125   }
126 
~ReadTransfer()127   ~ReadTransfer() override {
128     transfer_thread_.Terminate();
129     system_thread_.join();
130   }
131 
132   SimpleReadTransfer handler_;
133   Thread<1, 1> transfer_thread_;
134   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
135   thread::Thread system_thread_;
136   std::array<std::byte, 64> data_buffer_;
137   std::array<std::byte, 64> encode_buffer_;
138 };
139 
// With a 64-byte window, all of kData is expected in a single data chunk,
// followed by an empty chunk reporting remaining_bytes == 0.
TEST_F(ReadTransfer, SingleChunk) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    const auto start_chunk =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                        .set_session_id(3)
                        .set_window_end_offset(64)
                        .set_offset(0));
    ctx_.SendClientStream(start_chunk);

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[0]);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[1]);

  // The data chunk carries the entire resource.
  EXPECT_EQ(data_chunk.session_id(), 3u);
  EXPECT_EQ(data_chunk.offset(), 0u);
  ASSERT_EQ(data_chunk.payload().size(), kData.size());
  EXPECT_TRUE(pw::containers::Equal(span(kData), data_chunk.payload()));

  // The end chunk has no payload and reports that nothing remains.
  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_FALSE(end_chunk.has_payload());
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  // The client acknowledges completion, which finalizes the handler.
  const auto final_chunk =
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()));
  ctx_.SendClientStream(final_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
178 
// The client opens the window 16 bytes at a time, so kData is expected in two
// 16-byte chunks followed by an empty chunk reporting remaining_bytes == 0.
TEST_F(ReadTransfer, MultiChunk) {
  // Initial window: bytes [0, 16).
  const auto start_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0));
  ctx_.SendClientStream(start_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk first_half = DecodeChunk(ctx_.responses().back());

  EXPECT_EQ(first_half.session_id(), 3u);
  EXPECT_EQ(first_half.offset(), 0u);
  ASSERT_EQ(first_half.payload().size(), 16u);
  EXPECT_TRUE(
      pw::containers::Equal(span(kData).first(16), first_half.payload()));

  // Extend the window to bytes [16, 32).
  const auto second_window = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
          .set_session_id(3)
          .set_window_end_offset(32)
          .set_offset(16));
  ctx_.SendClientStream(second_window);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk second_half = DecodeChunk(ctx_.responses().back());

  EXPECT_EQ(second_half.session_id(), 3u);
  EXPECT_EQ(second_half.offset(), 16u);
  ASSERT_EQ(second_half.payload().size(), 16u);
  EXPECT_TRUE(pw::containers::Equal(span(kData).subspan(16, 16),
                                    second_half.payload()));

  // Extend the window past the end of the data.
  const auto third_window = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
          .set_session_id(3)
          .set_window_end_offset(48)
          .set_offset(32));
  ctx_.SendClientStream(third_window);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[2]);

  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_FALSE(end_chunk.has_payload());
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  const auto final_chunk =
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()));
  ctx_.SendClientStream(final_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
239 
// Re-sending a CONTINUE chunk that does not advance the window is expected to
// produce no additional data chunks.
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
  const auto start_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0));
  ctx_.SendClientStream(start_chunk);

  transfer_thread_.WaitUntilEventIsProcessed();

  const auto continue_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
          .set_session_id(3)
          .set_window_end_offset(24)
          .set_offset(16));

  // The first send extends the window; the remaining three are resent
  // CONTINUE packets that don't actually advance it.
  constexpr int kContinueSends = 4;
  for (int attempt = 0; attempt < kContinueSends; ++attempt) {
    ctx_.SendClientStream(continue_chunk);
    transfer_thread_.WaitUntilEventIsProcessed();
  }

  // Only one packet was sent in response to all of the CONTINUE chunks.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk continued = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(continued.session_id(), 3u);
  EXPECT_EQ(continued.offset(), 16u);
  ASSERT_EQ(continued.payload().size(), 8u);
  EXPECT_TRUE(
      pw::containers::Equal(span(kData).subspan(16, 8), continued.payload()));
}
274 
// Retransmit requests at other offsets are expected to be served from the
// requested offset when the reader can seek.
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
  rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
    const auto start_chunk =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                        .set_session_id(3)
                        .set_window_end_offset(16)
                        .set_offset(0));
    ctx_.SendClientStream(start_chunk);

    transfer_thread_.WaitUntilEventIsProcessed();

    // The initial window yields bytes [0, 16).
    Chunk initial = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(
        pw::containers::Equal(span(kData).first(16), initial.payload()));

    // Rewind: request bytes [2, 10).
    const auto rewind_chunk = EncodeChunk(
        Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
            .set_session_id(3)
            .set_window_end_offset(10)
            .set_offset(2));
    ctx_.SendClientStream(rewind_chunk);

    transfer_thread_.WaitUntilEventIsProcessed();

    Chunk rewound = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(
        pw::containers::Equal(span(kData).subspan(2, 8), rewound.payload()));

    // Skip ahead: request everything from offset 17 onwards.
    const auto skip_chunk = EncodeChunk(
        Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
            .set_session_id(3)
            .set_window_end_offset(64)
            .set_offset(17));
    ctx_.SendClientStream(skip_chunk);
  });

  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk tail = DecodeChunk(ctx_.responses()[2]);
  EXPECT_TRUE(pw::containers::Equal(span(kData).subspan(17), tail.payload()));
}
312 
// When the reader reports UNIMPLEMENTED for seeks, a retransmit request at a
// different offset is expected to end the transfer with UNIMPLEMENTED.
TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
  handler_.set_seek_status(Status::Unimplemented());

  const auto start_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0));
  const auto rewind_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(10)
          .set_offset(2));

  // Both chunks are queued before waiting on the transfer thread.
  ctx_.SendClientStream(start_chunk);
  ctx_.SendClientStream(rewind_chunk);

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk last = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(last.status().has_value());
  EXPECT_EQ(last.status().value(), Status::Unimplemented());
}
334 
// The client limits chunks to 8 bytes, so kData is expected in four 8-byte
// chunks followed by an empty chunk reporting remaining_bytes == 0.
TEST_F(ReadTransfer, MaxChunkSize_Client) {
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    const auto start_chunk =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                        .set_session_id(3)
                        .set_window_end_offset(64)
                        .set_max_chunk_size_bytes(8)
                        .set_offset(0));
    ctx_.SendClientStream(start_chunk);
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);

  // Responses 0-3 each carry the next 8 bytes of kData.
  constexpr size_t kChunkSize = 8;
  constexpr size_t kDataChunks = 4;
  for (size_t index = 0; index < kDataChunks; ++index) {
    const size_t expected_offset = index * kChunkSize;
    Chunk data_chunk = DecodeChunk(ctx_.responses()[index]);

    EXPECT_EQ(data_chunk.session_id(), 3u);
    EXPECT_EQ(data_chunk.offset(), expected_offset);
    ASSERT_EQ(data_chunk.payload().size(), kChunkSize);
    EXPECT_TRUE(pw::containers::Equal(
        span(kData).subspan(expected_offset, kChunkSize),
        data_chunk.payload()));
  }

  // Response 4 is empty and reports that nothing remains.
  Chunk end_chunk = DecodeChunk(ctx_.responses()[4]);
  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_EQ(end_chunk.payload().size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  const auto final_chunk =
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()));
  ctx_.SendClientStream(final_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
394 
// After one transfer completes, a second transfer whose PrepareRead() fails is
// expected not to trigger FinalizeRead().
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
  // The same start chunk opens both transfers.
  const auto start_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(64)
                      .set_offset(0));
  const auto final_chunk =
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()));

  // First transfer: start and immediately complete.
  ctx_.SendClientStream(start_chunk);
  ctx_.SendClientStream(final_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_TRUE(handler_.finalize_read_called);
  ASSERT_EQ(OkStatus(), handler_.finalize_read_status);

  // Now, clear state and start a second transfer
  handler_.prepare_read_return_status = Status::FailedPrecondition();
  handler_.prepare_read_called = false;
  handler_.finalize_read_called = false;

  ctx_.SendClientStream(start_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Prepare failed, so the handler should not have been stored in the context,
  // and finalize should not have been called.
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_FALSE(handler_.finalize_read_called);
}
427 
428 class ReadTransferMaxChunkSize8 : public ReadTransfer {
429  protected:
ReadTransferMaxChunkSize8()430   ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
431 };
432 
TEST_F(ReadTransferMaxChunkSize8,MaxChunkSize_Server)433 TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
434   // Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
435   // TODO(frolv): Fix
436   rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
437     ctx_.SendClientStream(
438         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
439                         .set_session_id(3)
440                         .set_window_end_offset(64)
441                         // .set_max_chunk_size_bytes(16)
442                         .set_offset(0)));
443   });
444 
445   EXPECT_TRUE(handler_.prepare_read_called);
446   EXPECT_FALSE(handler_.finalize_read_called);
447 
448   ASSERT_EQ(ctx_.total_responses(), 5u);
449   Chunk c0 = DecodeChunk(ctx_.responses()[0]);
450   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
451   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
452   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
453   Chunk c4 = DecodeChunk(ctx_.responses()[4]);
454 
455   EXPECT_EQ(c0.session_id(), 3u);
456   EXPECT_EQ(c0.offset(), 0u);
457   ASSERT_EQ(c0.payload().size(), 8u);
458   EXPECT_EQ(std::memcmp(c0.payload().data(), kData.data(), c0.payload().size()),
459             0);
460 
461   EXPECT_EQ(c1.session_id(), 3u);
462   EXPECT_EQ(c1.offset(), 8u);
463   ASSERT_EQ(c1.payload().size(), 8u);
464   EXPECT_EQ(
465       std::memcmp(c1.payload().data(), kData.data() + 8, c1.payload().size()),
466       0);
467 
468   EXPECT_EQ(c2.session_id(), 3u);
469   EXPECT_EQ(c2.offset(), 16u);
470   ASSERT_EQ(c2.payload().size(), 8u);
471   EXPECT_EQ(
472       std::memcmp(c2.payload().data(), kData.data() + 16, c2.payload().size()),
473       0);
474 
475   EXPECT_EQ(c3.session_id(), 3u);
476   EXPECT_EQ(c3.offset(), 24u);
477   ASSERT_EQ(c3.payload().size(), 8u);
478   EXPECT_EQ(
479       std::memcmp(c3.payload().data(), kData.data() + 24, c3.payload().size()),
480       0);
481 
482   EXPECT_EQ(c4.session_id(), 3u);
483   EXPECT_EQ(c4.payload().size(), 0u);
484   ASSERT_TRUE(c4.remaining_bytes().has_value());
485   EXPECT_EQ(c4.remaining_bytes().value(), 0u);
486 
487   ctx_.SendClientStream(
488       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
489   transfer_thread_.WaitUntilEventIsProcessed();
490 
491   EXPECT_TRUE(handler_.finalize_read_called);
492   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
493 }
494 
// An error status sent by the client is expected to be passed to
// FinalizeRead() without the service sending any further response.
TEST_F(ReadTransfer, ClientError) {
  const auto start_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0));
  ctx_.SendClientStream(start_chunk);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Send client error.
  const auto client_error = EncodeChunk(
      Chunk::Final(ProtocolVersion::kLegacy, 3, Status::OutOfRange()));
  ctx_.SendClientStream(client_error);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The response count is unchanged; only the handler is notified.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
}
517 
// Starting a transfer for session 11, which has no registered handler, is
// expected to be answered with a NOT_FOUND status chunk for that session.
TEST_F(ReadTransfer, UnregisteredHandler) {
  const auto start_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(11)
                      .set_window_end_offset(32)
                      .set_offset(0));
  ctx_.SendClientStream(start_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk response = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(response.session_id(), 11u);
  ASSERT_TRUE(response.status().has_value());
  EXPECT_EQ(response.status().value(), Status::NotFound());
}
532 
// Non-initial chunks for a session with no transfer in progress are expected
// not to invoke the handler.
TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
  const auto retransmit_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(32)
          .set_offset(3));
  const auto data_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(3)
                      .set_payload(span(kData).first(10))
                      .set_offset(3));
  const auto final_chunk =
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()));

  // None of these is a start chunk.
  ctx_.SendClientStream(retransmit_chunk);
  ctx_.SendClientStream(data_chunk);
  ctx_.SendClientStream(final_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only start transfer for an initial packet.
  EXPECT_FALSE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
}
552 
// A second start chunk during an active transfer is expected to finalize the
// first transfer with ABORTED and begin a new one.
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  // The identical start chunk is sent twice.
  const auto start_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(16)
                      .set_offset(0));

  ctx_.SendClientStream(start_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  handler_.prepare_read_called = false;  // Reset so can check if called again.

  ctx_.SendClientStream(start_chunk);  // Resend starting chunk
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
  handler_.finalize_read_called = false;  // Reset so can check later

  // Continue and then complete the restarted transfer.
  const auto retransmit_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(32)
          .set_offset(16));
  const auto final_chunk =
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()));
  ctx_.SendClientStream(retransmit_chunk);
  ctx_.SendClientStream(final_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
594 
// A start chunk with an empty window while data is still available is expected
// to end the transfer with RESOURCE_EXHAUSTED.
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
  const auto empty_window_start =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(0)
                      .set_offset(0));
  ctx_.SendClientStream(empty_window_start);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());

  // The same status is reported back to the client.
  Chunk response = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(response.status(), Status::ResourceExhausted());
}
610 
// A start chunk with an empty window when the reader has nothing left to read
// is expected to complete normally: the service reports remaining_bytes == 0
// and the transfer finalizes with OK once the client acknowledges it.
TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
  // Make the next read appear to be the end of the stream.
  handler_.set_read_status(Status::OutOfRange());

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(0)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Confirm a response exists before reading the last one; calling back() on
  // an empty response list would not be a clean test failure.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 3u);
  ASSERT_TRUE(chunk.remaining_bytes().has_value());
  EXPECT_EQ(chunk.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client's final chunk must not trigger another response.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
634 
// A non-initial chunk arriving after a transfer has completed is expected to
// be answered with FAILED_PRECONDITION, without finalizing the handler again.
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    const auto start_chunk =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                        .set_session_id(3)
                        .set_window_end_offset(64)
                        .set_offset(0));
    ctx_.SendClientStream(start_chunk);
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[0]);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[1]);

  // The data chunk carries the entire resource.
  EXPECT_EQ(data_chunk.session_id(), 3u);
  EXPECT_EQ(data_chunk.offset(), 0u);
  ASSERT_EQ(data_chunk.payload().size(), kData.size());
  EXPECT_TRUE(pw::containers::Equal(span(kData), data_chunk.payload()));

  // The end chunk has no payload and reports that nothing remains.
  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_FALSE(end_chunk.has_payload());
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  const auto final_chunk =
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus()));
  ctx_.SendClientStream(final_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());

  // At this point the transfer should be in a completed state. Send a
  // non-initial chunk as a continuation of the transfer.
  handler_.finalize_read_called = false;

  const auto late_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
          .set_session_id(3)
          .set_window_end_offset(64)
          .set_offset(16));
  ctx_.SendClientStream(late_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  Chunk error_chunk = DecodeChunk(ctx_.responses()[2]);
  ASSERT_TRUE(error_chunk.status().has_value());
  EXPECT_EQ(error_chunk.status().value(), Status::FailedPrecondition());

  // FinalizeRead should not be called again.
  EXPECT_FALSE(handler_.finalize_read_called);
}
691 
692 class SimpleWriteTransfer final : public WriteOnlyHandler {
693  public:
SimpleWriteTransfer(uint32_t session_id,ByteSpan data)694   SimpleWriteTransfer(uint32_t session_id, ByteSpan data)
695       : WriteOnlyHandler(session_id),
696         prepare_write_called(false),
697         finalize_write_called(false),
698         finalize_write_status(Status::Unknown()),
699         writer_(data) {}
700 
PrepareWrite()701   Status PrepareWrite() final {
702     EXPECT_EQ(OkStatus(), writer_.Seek(0));
703     set_writer(writer_);
704     prepare_write_called = true;
705     return OkStatus();
706   }
707 
FinalizeWrite(Status status)708   Status FinalizeWrite(Status status) final {
709     finalize_write_called = true;
710     finalize_write_status = status;
711     return finalize_write_return_status_;
712   }
713 
set_finalize_write_return(Status status)714   void set_finalize_write_return(Status status) {
715     finalize_write_return_status_ = status;
716   }
717 
718   bool prepare_write_called;
719   bool finalize_write_called;
720   Status finalize_write_status;
721 
722  private:
723   Status finalize_write_return_status_;
724   stream::MemoryWriter writer_;
725 };
726 
727 class WriteTransfer : public ::testing::Test {
728  protected:
WriteTransfer(size_t max_bytes_to_receive=64)729   WriteTransfer(size_t max_bytes_to_receive = 64)
730       : buffer{},
731         handler_(7, buffer),
732         transfer_thread_(data_buffer_, encode_buffer_),
733         system_thread_(TransferThreadOptions(), transfer_thread_),
734         ctx_(transfer_thread_,
735              max_bytes_to_receive,
736              // Use a long timeout to avoid accidentally triggering timeouts.
737              std::chrono::minutes(1)) {
738     ctx_.service().RegisterHandler(handler_);
739 
740     PW_CHECK(!handler_.prepare_write_called);
741     PW_CHECK(!handler_.finalize_write_called);
742 
743     ctx_.call();  // Open the write stream
744     transfer_thread_.WaitUntilEventIsProcessed();
745   }
746 
~WriteTransfer()747   ~WriteTransfer() override {
748     transfer_thread_.Terminate();
749     system_thread_.join();
750   }
751 
752   std::array<std::byte, kData.size()> buffer;
753   SimpleWriteTransfer handler_;
754 
755   Thread<1, 1> transfer_thread_;
756   thread::Thread system_thread_;
757   std::array<std::byte, 64> data_buffer_;
758   std::array<std::byte, 64> encode_buffer_;
759   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
760 };
761 
// Writing all of kData in one data chunk is expected to finish the transfer
// with OK and leave the data in the destination buffer.
TEST_F(WriteTransfer, SingleChunk) {
  const auto start_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The service answers the start chunk with its transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk response = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(response.session_id(), 7u);
  EXPECT_EQ(response.window_end_offset(), 32u);
  ASSERT_TRUE(response.max_chunk_size_bytes().has_value());
  EXPECT_EQ(response.max_chunk_size_bytes().value(), 37u);

  // Send every byte at once and mark it as the last chunk.
  const auto data_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(data_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  response = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(response.session_id(), 7u);
  ASSERT_TRUE(response.status().has_value());
  EXPECT_EQ(response.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
795 
// When FinalizeWrite() returns an error, the client is expected to receive
// DATA_LOSS even though the handler itself was finalized with OK.
TEST_F(WriteTransfer, FinalizeFails) {
  // Return an error when FinalizeWrite is called.
  handler_.set_finalize_write_return(Status::FailedPrecondition());

  const auto start_chunk = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  const auto data_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));

  // Both chunks are queued before waiting on the transfer thread.
  ctx_.SendClientStream(start_chunk);
  ctx_.SendClientStream<64>(data_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk final_response = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(final_response.session_id(), 7u);
  ASSERT_TRUE(final_response.status().has_value());
  EXPECT_EQ(final_response.status().value(), Status::DataLoss());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
819 
// Makes the output fail after the transfer has started and checks that
// FinalizeWrite is still called with an OK status.
TEST_F(WriteTransfer, SendingFinalPacketFails) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Set the output's send status to UNKNOWN before the final data arrives.
  ctx_.output().set_send_status(Status::Unknown());

  const auto data_request =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(data_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer parameters are the only response that went out.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.session_id(), 7u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes().has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes().value(), 37u);

  // The transfer was considered successful at the time FinalizeWrite() ran.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
847 
// Writes kData as two data chunks and checks that the OK status chunk is only
// sent once the second chunk (remaining_bytes = 0) has been processed.
TEST_F(WriteTransfer, MultiChunk) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.session_id(), 7u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);

  // Leading 8 bytes of kData.
  const auto leading_bytes =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(span(kData).first(8)));
  ctx_.SendClientStream<64>(leading_bytes);
  transfer_thread_.WaitUntilEventIsProcessed();

  // No additional response after the first data chunk.
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Everything from offset 8 onwards, with remaining_bytes set to 0.
  const auto trailing_bytes =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(8)
                      .set_payload(span(kData).subspan(8))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(trailing_bytes);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.session_id(), 7u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
888 
// Drops one outgoing packet so that the retry triggered by a simulated timeout
// fails, then checks the error chunk that follows.
TEST_F(WriteTransfer, WriteFailsOnRetry) {
  // Skip a single packet so that the failure lands on the retry.
  ctx_.output().set_send_status(Status::FailedPrecondition(), 1);

  // Three packets are awaited: the initial parameters, the retry attempt and
  // the final error.
  rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
    // Only a single client packet is sent so that the service times out.
    const auto start_request =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                        .set_session_id(7));
    ctx_.SendClientStream(start_request);
    // The simulated timeout is what triggers the retry.
    transfer_thread_.SimulateServerTimeout(7);
  });

  // Of the three attempted packets the second was dropped, so two responses
  // remain. The last one must be the INTERNAL error caused by the failed RPC
  // write.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk error_response = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(error_response.session_id(), 7u);
  ASSERT_TRUE(error_response.status().has_value());
  EXPECT_EQ(error_response.status().value(), Status::Internal());
}
910 
// Enters the recovery state by skipping an offset, then simulates a timeout
// and checks that the same recovery parameters are sent again.
TEST_F(WriteTransfer, TimeoutInRecoveryState) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_parameters.session_id(), 7u);
  EXPECT_EQ(initial_parameters.offset(), 0u);
  EXPECT_EQ(initial_parameters.window_end_offset(), 32u);

  constexpr span data(kData);

  // Bytes [0, 8) arrive as expected.
  const auto in_order_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(8)));
  ctx_.SendClientStream(in_order_chunk);

  // Offset 8 is skipped; jumping straight to offset 12 enters recovery.
  const auto out_of_order_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(12)
                      .set_payload(data.subspan(12, 4)));
  ctx_.SendClientStream(out_of_order_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The service asks for offset 8 with its recovery parameters.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk recovery_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery_parameters.session_id(), 7u);
  EXPECT_EQ(recovery_parameters.offset(), 8u);
  EXPECT_EQ(recovery_parameters.window_end_offset(), 32u);

  // Let the transfer time out while it is still recovering.
  transfer_thread_.SimulateServerTimeout(7);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The re-sent chunk repeats the recovery parameters.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk resent_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent_parameters.session_id(), 7u);
  EXPECT_EQ(resent_parameters.offset(), 8u);
  EXPECT_EQ(resent_parameters.window_end_offset(), 32u);
}
956 
TEST_F(WriteTransfer,ExtendWindow)957 TEST_F(WriteTransfer, ExtendWindow) {
958   ctx_.SendClientStream(EncodeChunk(
959       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
960   transfer_thread_.WaitUntilEventIsProcessed();
961 
962   EXPECT_TRUE(handler_.prepare_write_called);
963   EXPECT_FALSE(handler_.finalize_write_called);
964 
965   ASSERT_EQ(ctx_.total_responses(), 1u);
966   Chunk chunk = DecodeChunk(ctx_.responses().back());
967   EXPECT_EQ(chunk.session_id(), 7u);
968   EXPECT_EQ(chunk.window_end_offset(), 32u);
969 
970   // Window starts at 32 bytes and should extend when half of that is sent.
971   ctx_.SendClientStream(
972       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
973                       .set_session_id(7)
974                       .set_offset(0)
975                       .set_payload(span(kData).first(4))));
976   transfer_thread_.WaitUntilEventIsProcessed();
977   ASSERT_EQ(ctx_.total_responses(), 1u);
978 
979   ctx_.SendClientStream(
980       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
981                       .set_session_id(7)
982                       .set_offset(4)
983                       .set_payload(span(kData).subspan(4, 4))));
984   transfer_thread_.WaitUntilEventIsProcessed();
985   ASSERT_EQ(ctx_.total_responses(), 1u);
986 
987   ctx_.SendClientStream(
988       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
989                       .set_session_id(7)
990                       .set_offset(8)
991                       .set_payload(span(kData).subspan(8, 4))));
992   transfer_thread_.WaitUntilEventIsProcessed();
993   ASSERT_EQ(ctx_.total_responses(), 1u);
994 
995   ctx_.SendClientStream(
996       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
997                       .set_session_id(7)
998                       .set_offset(12)
999                       .set_payload(span(kData).subspan(12, 4))));
1000   transfer_thread_.WaitUntilEventIsProcessed();
1001   ASSERT_EQ(ctx_.total_responses(), 2u);
1002 
1003   // Extend parameters chunk.
1004   chunk = DecodeChunk(ctx_.responses().back());
1005   EXPECT_EQ(chunk.session_id(), 7u);
1006   EXPECT_EQ(chunk.window_end_offset(), 32u);
1007   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
1008 
1009   ctx_.SendClientStream<64>(
1010       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1011                       .set_session_id(7)
1012                       .set_offset(16)
1013                       .set_payload(span(kData).subspan(16))
1014                       .set_remaining_bytes(0)));
1015   transfer_thread_.WaitUntilEventIsProcessed();
1016 
1017   ASSERT_EQ(ctx_.total_responses(), 3u);
1018   chunk = DecodeChunk(ctx_.responses()[2]);
1019   EXPECT_EQ(chunk.session_id(), 7u);
1020   ASSERT_TRUE(chunk.status().has_value());
1021   EXPECT_EQ(chunk.status().value(), OkStatus());
1022 
1023   EXPECT_TRUE(handler_.finalize_write_called);
1024   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1025   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1026 }
1027 
1028 class WriteTransferMaxBytes16 : public WriteTransfer {
1029  protected:
WriteTransferMaxBytes16()1030   WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
1031 };
1032 
// The client declares a window end offset smaller than the one it was offered;
// the service must answer with a kParametersRetransmit chunk.
TEST_F(WriteTransfer, TransmitterReducesWindow) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_parameters.session_id(), 7u);
  EXPECT_EQ(initial_parameters.window_end_offset(), 32u);

  // Only 12 bytes are sent, and 12 is declared as the new window end offset.
  const auto shortened_window_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_window_end_offset(12)
                      .set_payload(span(kData).first(12)));
  ctx_.SendClientStream<64>(shortened_window_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Because the end of the window was reached, the receiver replies right away
  // with a retransmit chunk.
  Chunk retransmit = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(retransmit.session_id(), 7u);
  EXPECT_EQ(retransmit.offset(), 12u);
  EXPECT_EQ(retransmit.window_end_offset(), 32u);
  EXPECT_EQ(retransmit.type(), Chunk::Type::kParametersRetransmit);
}
1064 
// The client declares a window end offset larger than the one it was offered;
// the service must respond with an INTERNAL status chunk.
TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_parameters.session_id(), 7u);
  EXPECT_EQ(initial_parameters.window_end_offset(), 32u);

  // A window end offset of 48 exceeds the receiver's own value of 32.
  const auto oversized_window_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_window_end_offset(48)
                      .set_payload(span(kData).first(16)));
  ctx_.SendClientStream<64>(oversized_window_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  Chunk termination = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(termination.session_id(), 7u);
  ASSERT_TRUE(termination.status().has_value());
  EXPECT_EQ(termination.status().value(), Status::Internal());
}
1093 
TEST_F(WriteTransferMaxBytes16,MultipleParameters)1094 TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
1095   ctx_.SendClientStream(EncodeChunk(
1096       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7)));
1097   transfer_thread_.WaitUntilEventIsProcessed();
1098 
1099   EXPECT_TRUE(handler_.prepare_write_called);
1100   EXPECT_FALSE(handler_.finalize_write_called);
1101 
1102   ASSERT_EQ(ctx_.total_responses(), 1u);
1103   Chunk chunk = DecodeChunk(ctx_.responses().back());
1104   EXPECT_EQ(chunk.session_id(), 7u);
1105   EXPECT_EQ(chunk.window_end_offset(), 16u);
1106 
1107   ctx_.SendClientStream<64>(
1108       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1109                       .set_session_id(7)
1110                       .set_offset(0)
1111                       .set_payload(span(kData).first(8))));
1112   transfer_thread_.WaitUntilEventIsProcessed();
1113 
1114   ASSERT_EQ(ctx_.total_responses(), 2u);
1115   chunk = DecodeChunk(ctx_.responses().back());
1116   EXPECT_EQ(chunk.session_id(), 7u);
1117   EXPECT_EQ(chunk.offset(), 8u);
1118   EXPECT_EQ(chunk.window_end_offset(), 24u);
1119 
1120   ctx_.SendClientStream<64>(
1121       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1122                       .set_session_id(7)
1123                       .set_offset(8)
1124                       .set_payload(span(kData).subspan(8, 8))));
1125   transfer_thread_.WaitUntilEventIsProcessed();
1126 
1127   ASSERT_EQ(ctx_.total_responses(), 3u);
1128   chunk = DecodeChunk(ctx_.responses().back());
1129   EXPECT_EQ(chunk.session_id(), 7u);
1130   EXPECT_EQ(chunk.offset(), 16u);
1131   EXPECT_EQ(chunk.window_end_offset(), 32u);
1132 
1133   ctx_.SendClientStream<64>(
1134       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1135                       .set_session_id(7)
1136                       .set_offset(16)
1137                       .set_payload(span(kData).subspan(16, 8))));
1138   transfer_thread_.WaitUntilEventIsProcessed();
1139 
1140   ASSERT_EQ(ctx_.total_responses(), 4u);
1141   chunk = DecodeChunk(ctx_.responses().back());
1142   EXPECT_EQ(chunk.session_id(), 7u);
1143   EXPECT_EQ(chunk.offset(), 24u);
1144   EXPECT_EQ(chunk.window_end_offset(), 32u);
1145 
1146   ctx_.SendClientStream<64>(
1147       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1148                       .set_session_id(7)
1149                       .set_offset(24)
1150                       .set_payload(span(kData).subspan(24))
1151                       .set_remaining_bytes(0)));
1152   transfer_thread_.WaitUntilEventIsProcessed();
1153 
1154   ASSERT_EQ(ctx_.total_responses(), 5u);
1155   chunk = DecodeChunk(ctx_.responses().back());
1156   EXPECT_EQ(chunk.session_id(), 7u);
1157   ASSERT_TRUE(chunk.status().has_value());
1158   EXPECT_EQ(chunk.status().value(), OkStatus());
1159 
1160   EXPECT_TRUE(handler_.finalize_write_called);
1161   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
1162   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
1163 }
1164 
// The initial parameters chunk advertises the fixture's 16-byte limit as the
// window end offset.
TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
  // Here the default max bytes value is smaller than the buffer.
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(parameters.session_id(), 7u);
  EXPECT_EQ(parameters.window_end_offset(), 16u);
}
1176 
// Registers a handler backed by an 8-byte buffer and checks that the initial
// parameters chunk advertises a window end offset of 8.
TEST_F(WriteTransfer, SetsWriterWindowEndOffset) {
  // This buffer is smaller than the constructor's default max bytes.
  std::array<std::byte, 8> small_buffer = {};

  // Separate handler for session 987, local to this test.
  SimpleWriteTransfer small_handler(987, small_buffer);
  ctx_.service().RegisterHandler(small_handler);

  const auto start_request =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(987));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(parameters.session_id(), 987u);
  EXPECT_EQ(parameters.window_end_offset(), 8u);

  ctx_.service().UnregisterHandler(small_handler);
}
1196 
// A data chunk with the wrong offset is answered with parameters naming the
// expected offset; re-sending at that offset completes the transfer.
TEST_F(WriteTransfer, UnexpectedOffset) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_parameters.session_id(), 7u);
  EXPECT_EQ(initial_parameters.offset(), 0u);
  EXPECT_EQ(initial_parameters.window_end_offset(), 32u);

  const auto leading_bytes =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(span(kData).first(8)));
  ctx_.SendClientStream<64>(leading_bytes);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Encodes the rest of kData (from byte 8 on, remaining_bytes = 0) with the
  // given offset field.
  const auto encode_trailing_bytes_at = [](uint32_t claimed_offset) {
    return EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                           .set_session_id(7)
                           .set_offset(claimed_offset)
                           .set_payload(span(kData).subspan(8))
                           .set_remaining_bytes(0));
  };

  // Offset 4 is incorrect; 8 is what the service expects next.
  const auto misplaced_chunk = encode_trailing_bytes_at(4);
  ctx_.SendClientStream<64>(misplaced_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk recovery_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery_parameters.session_id(), 7u);
  EXPECT_EQ(recovery_parameters.offset(), 8u);
  EXPECT_EQ(recovery_parameters.window_end_offset(), 32u);

  // Same payload again, this time with the correct offset.
  const auto corrected_chunk = encode_trailing_bytes_at(8);
  ctx_.SendClientStream<64>(corrected_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.session_id(), 7u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1252 
// Sending more bytes than the advertised window allows must be answered with
// an INTERNAL status chunk.
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_parameters.session_id(), 7u);
  EXPECT_EQ(initial_parameters.window_end_offset(), 16u);

  // The window ends at offset 16, yet 24 bytes of data are sent.
  const auto oversized_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(span(kData).first(24)));
  ctx_.SendClientStream<64>(oversized_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk termination = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(termination.session_id(), 7u);
  ASSERT_TRUE(termination.status().has_value());
  EXPECT_EQ(termination.status().value(), Status::Internal());
}
1280 
// Starting a transfer for session 999 (not the session ID used by the other
// tests in this fixture) must be answered with a NOT_FOUND status chunk.
TEST_F(WriteTransfer, UnregisteredHandler) {
  const auto start_request =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(999));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk rejection = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(rejection.session_id(), 999u);
  ASSERT_TRUE(rejection.status().has_value());
  EXPECT_EQ(rejection.status().value(), Status::NotFound());
}
1293 
// A final chunk with DATA_LOSS from the client ends the transfer: no further
// response is sent and FinalizeWrite receives DATA_LOSS.
TEST_F(WriteTransfer, ClientError) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_parameters.session_id(), 7u);
  EXPECT_EQ(initial_parameters.window_end_offset(), 32u);

  // The client terminates session 7 with a DATA_LOSS status.
  const auto client_failure = EncodeChunk(
      Chunk::Final(ProtocolVersion::kLegacy, 7, Status::DataLoss()));
  ctx_.SendClientStream<64>(client_failure);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The response count is unchanged.
  EXPECT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
}
1316 
// After one dropped chunk, a stream of out-of-order chunks must result in just
// one additional parameters chunk.
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  constexpr span data(kData);

  // Byte 0 is delivered normally.
  const auto first_byte =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(1)));
  ctx_.SendClientStream<64>(first_byte);

  // Offset 1 is dropped; every later byte is sent as its own chunk.
  for (uint32_t offset = 2; offset < kData.size(); ++offset) {
    const auto single_byte =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                        .set_session_id(7)
                        .set_offset(offset)
                        .set_payload(data.subspan(offset, 1)));
    ctx_.SendClientStream<64>(single_byte);
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  // Exactly one new response, asking for offset 1.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk recovery_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery_parameters.session_id(), 7u);
  EXPECT_EQ(recovery_parameters.offset(), 1u);

  // Deliver the rest of the data starting at offset 1.
  const auto remaining_data =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(1)
                      .set_payload(data.subspan(1, 31))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(remaining_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1359 
// While recovering from a skipped offset, each repeated chunk must cause the
// transfer parameters to be sent again.
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  constexpr span data(kData);

  // Offset 0 is skipped; every later byte is sent as its own chunk.
  for (uint32_t offset = 1; offset < kData.size(); ++offset) {
    const auto single_byte =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                        .set_session_id(7)
                        .set_offset(offset)
                        .set_payload(data.subspan(offset, 1)));
    ctx_.SendClientStream<64>(single_byte);
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer parameters were re-sent a single time.
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The last byte of kData, which was already sent by the loop above.
  const auto repeated_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(kData.size() - 1)
                      .set_payload(data.last(1)));
  ctx_.SendClientStream<64>(repeated_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  // A repeated packet causes the parameters to be sent again.
  ASSERT_EQ(ctx_.total_responses(), 3u);

  // Repeating it once more yields one more response.
  ctx_.SendClientStream<64>(repeated_chunk);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);

  Chunk resent_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent_parameters.session_id(), 7u);
  EXPECT_EQ(resent_parameters.offset(), 0u);
  EXPECT_EQ(resent_parameters.window_end_offset(), 32u);

  // Once the correct offset arrives, the transfer carries on normally.
  const auto full_data =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(full_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1415 
// If the client repeats its final data chunk after the transfer has completed,
// the OK status chunk must be sent again.
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The whole of kData in one chunk; the identical bytes are sent twice below.
  const auto full_data =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));

  ctx_.SendClientStream<64>(full_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk first_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(first_status.status().has_value());
  EXPECT_EQ(first_status.status().value(), OkStatus());

  // The client retries the same chunk after the status chunk was sent.
  ctx_.SendClientStream<64>(full_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk resent_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(resent_status.status().has_value());
  EXPECT_EQ(resent_status.status().value(), OkStatus());
}
1449 
// Data chunks that arrive without a preceding start chunk must not reach the
// handler.
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
  // A data chunk at offset 3 with no payload.
  const auto stray_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(3));
  ctx_.SendClientStream<64>(stray_chunk);

  // A data chunk at offset 0 with 10 bytes and remaining_bytes set to 0.
  const auto stray_final_chunk =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(span(kData).first(10))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(stray_final_chunk);

  transfer_thread_.WaitUntilEventIsProcessed();

  // A transfer is only started by an initial packet, so neither handler hook
  // has been invoked.
  EXPECT_FALSE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);
}
1468 
// A second start chunk for an in-progress session must finalize the running
// transfer with ABORTED and begin a fresh one that can then complete.
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  // The same start chunk is used for the first attempt and for the restart.
  const auto start_request = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));

  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  const auto partial_data =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(span(kData).first(8)));
  ctx_.SendClientStream<64>(partial_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ASSERT_TRUE(handler_.prepare_write_called);
  ASSERT_FALSE(handler_.finalize_write_called);
  // Clear the flag so a second PrepareWrite call can be detected.
  handler_.prepare_write_called = false;

  // Simulate the client vanishing and then starting the transfer over.
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());

  // Clear the flag so a second FinalizeWrite call can be detected.
  handler_.finalize_write_called = false;

  ASSERT_EQ(ctx_.total_responses(), 2u);

  const auto full_data =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(full_data);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1516 
1517 class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
1518  public:
SometimesUnavailableReadHandler(uint32_t session_id,ConstByteSpan data)1519   SometimesUnavailableReadHandler(uint32_t session_id, ConstByteSpan data)
1520       : ReadOnlyHandler(session_id), reader_(data), call_count_(0) {}
1521 
PrepareRead()1522   Status PrepareRead() final {
1523     if ((call_count_++ % 2) == 0) {
1524       return Status::Unavailable();
1525     }
1526 
1527     set_reader(reader_);
1528     return OkStatus();
1529   }
1530 
1531  private:
1532   stream::MemoryReader reader_;
1533   int call_count_;
1534 };
1535 
// A handler whose PrepareRead fails must cause the transfer to be rejected
// with a DATA_LOSS status chunk.
TEST_F(ReadTransfer, PrepareError) {
  // NOTE(review): unlike SetsWriterWindowEndOffset, this locally declared
  // handler is never passed to UnregisterHandler -- confirm that is intended.
  SometimesUnavailableReadHandler unavailable_handler(88, kData);
  ctx_.service().RegisterHandler(unavailable_handler);

  const auto start_request =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(88)
                      .set_window_end_offset(128)
                      .set_offset(0));
  ctx_.SendClientStream(start_request);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The handler's first PrepareRead call returns UNAVAILABLE.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk rejection = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(rejection.session_id(), 88u);
  ASSERT_TRUE(rejection.status().has_value());
  EXPECT_EQ(rejection.status().value(), Status::DataLoss());

  // Starting the transfer a second time should succeed.
  // TODO(frolv): This won't work until completion ACKs are supported.
  if (false) {
    ctx_.SendClientStream(start_request);
    transfer_thread_.WaitUntilEventIsProcessed();

    ASSERT_EQ(ctx_.total_responses(), 2u);
    Chunk data_response = DecodeChunk(ctx_.responses().back());
    EXPECT_EQ(data_response.session_id(), 88u);
    ASSERT_EQ(data_response.payload().size(), kData.size());
    EXPECT_EQ(std::memcmp(data_response.payload().data(),
                          kData.data(),
                          data_response.payload().size()),
              0);
  }
}
1572 
// Changes the service's max pending bytes in the middle of a legacy write and
// expects the next transfer parameters chunk to use the updated value.
TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
  Chunk start_request(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start_request.set_session_id(7);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The first parameters chunk carries the default window end offset of 16.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_parameters.session_id(), 7u);
  EXPECT_EQ(initial_parameters.window_end_offset(), 16u);

  // Shrink the pending bytes limit while the transfer is in flight.
  ctx_.service().set_max_pending_bytes(12);

  Chunk first_eight_bytes(ProtocolVersion::kLegacy, Chunk::Type::kData);
  first_eight_bytes.set_session_id(7).set_offset(0).set_payload(
      span(kData).first(8));
  ctx_.SendClientStream<64>(EncodeChunk(first_eight_bytes));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The follow-up parameters chunk must be sized with the new limit.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk updated_parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(updated_parameters.session_id(), 7u);
  EXPECT_EQ(updated_parameters.offset(), 8u);
  EXPECT_EQ(updated_parameters.window_end_offset(), 8u + 12u);
}
1604 
TEST_F(ReadTransfer,Version2_SimpleTransfer)1605 TEST_F(ReadTransfer, Version2_SimpleTransfer) {
1606   ctx_.SendClientStream(
1607       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1608                       .set_resource_id(3)));
1609 
1610   transfer_thread_.WaitUntilEventIsProcessed();
1611 
1612   EXPECT_TRUE(handler_.prepare_read_called);
1613   EXPECT_FALSE(handler_.finalize_read_called);
1614 
1615   // First, the server responds with a START_ACK, assigning a session ID and
1616   // confirming the protocol version.
1617   ASSERT_EQ(ctx_.total_responses(), 1u);
1618   Chunk chunk = DecodeChunk(ctx_.responses().back());
1619   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1620   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1621   EXPECT_EQ(chunk.session_id(), 1u);
1622   EXPECT_EQ(chunk.resource_id(), 3u);
1623 
1624   // Complete the handshake by confirming the server's ACK and sending the first
1625   // read transfer parameters.
1626   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
1627     ctx_.SendClientStream(EncodeChunk(
1628         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1629             .set_session_id(1)
1630             .set_window_end_offset(64)
1631             .set_offset(0)));
1632 
1633     transfer_thread_.WaitUntilEventIsProcessed();
1634   });
1635 
1636   // Server should respond by starting the data transfer, sending its sole data
1637   // chunk and a remaining_bytes 0 chunk.
1638   ASSERT_EQ(ctx_.total_responses(), 3u);
1639 
1640   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1641   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1642   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1643   EXPECT_EQ(c1.session_id(), 1u);
1644   EXPECT_EQ(c1.offset(), 0u);
1645   ASSERT_TRUE(c1.has_payload());
1646   ASSERT_EQ(c1.payload().size(), kData.size());
1647   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1648             0);
1649 
1650   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1651   EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1652   EXPECT_EQ(c2.type(), Chunk::Type::kData);
1653   EXPECT_EQ(c2.session_id(), 1u);
1654   EXPECT_FALSE(c2.has_payload());
1655   EXPECT_EQ(c2.remaining_bytes(), 0u);
1656 
1657   ctx_.SendClientStream(
1658       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
1659   transfer_thread_.WaitUntilEventIsProcessed();
1660 
1661   EXPECT_TRUE(handler_.finalize_read_called);
1662   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1663 }
1664 
TEST_F(ReadTransfer,Version2_MultiChunk)1665 TEST_F(ReadTransfer, Version2_MultiChunk) {
1666   ctx_.SendClientStream(
1667       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1668                       .set_resource_id(3)));
1669 
1670   transfer_thread_.WaitUntilEventIsProcessed();
1671 
1672   EXPECT_TRUE(handler_.prepare_read_called);
1673   EXPECT_FALSE(handler_.finalize_read_called);
1674 
1675   // First, the server responds with a START_ACK, assigning a session ID and
1676   // confirming the protocol version.
1677   ASSERT_EQ(ctx_.total_responses(), 1u);
1678   Chunk chunk = DecodeChunk(ctx_.responses().back());
1679   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1680   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1681   EXPECT_EQ(chunk.session_id(), 1u);
1682   EXPECT_EQ(chunk.resource_id(), 3u);
1683 
1684   // Complete the handshake by confirming the server's ACK and sending the first
1685   // read transfer parameters.
1686   rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
1687     ctx_.SendClientStream(EncodeChunk(
1688         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1689             .set_session_id(1)
1690             .set_window_end_offset(64)
1691             .set_max_chunk_size_bytes(16)
1692             .set_offset(0)));
1693 
1694     transfer_thread_.WaitUntilEventIsProcessed();
1695   });
1696 
1697   ASSERT_EQ(ctx_.total_responses(), 4u);
1698 
1699   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1700   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1701   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1702   EXPECT_EQ(c1.session_id(), 1u);
1703   EXPECT_EQ(c1.offset(), 0u);
1704   ASSERT_TRUE(c1.has_payload());
1705   ASSERT_EQ(c1.payload().size(), 16u);
1706   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1707             0);
1708 
1709   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1710   EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1711   EXPECT_EQ(c2.type(), Chunk::Type::kData);
1712   EXPECT_EQ(c2.session_id(), 1u);
1713   EXPECT_EQ(c2.offset(), 16u);
1714   ASSERT_TRUE(c2.has_payload());
1715   ASSERT_EQ(c2.payload().size(), 16u);
1716   EXPECT_EQ(
1717       std::memcmp(
1718           c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
1719       0);
1720 
1721   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
1722   EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
1723   EXPECT_EQ(c3.type(), Chunk::Type::kData);
1724   EXPECT_EQ(c3.session_id(), 1u);
1725   EXPECT_FALSE(c3.has_payload());
1726   EXPECT_EQ(c3.remaining_bytes(), 0u);
1727 
1728   ctx_.SendClientStream(
1729       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
1730   transfer_thread_.WaitUntilEventIsProcessed();
1731 
1732   EXPECT_TRUE(handler_.finalize_read_called);
1733   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1734 }
1735 
TEST_F(ReadTransfer,Version2_MultiParameters)1736 TEST_F(ReadTransfer, Version2_MultiParameters) {
1737   ctx_.SendClientStream(
1738       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1739                       .set_resource_id(3)));
1740 
1741   transfer_thread_.WaitUntilEventIsProcessed();
1742 
1743   EXPECT_TRUE(handler_.prepare_read_called);
1744   EXPECT_FALSE(handler_.finalize_read_called);
1745 
1746   // First, the server responds with a START_ACK, assigning a session ID and
1747   // confirming the protocol version.
1748   ASSERT_EQ(ctx_.total_responses(), 1u);
1749   Chunk chunk = DecodeChunk(ctx_.responses().back());
1750   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1751   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1752   EXPECT_EQ(chunk.session_id(), 1u);
1753   EXPECT_EQ(chunk.resource_id(), 3u);
1754 
1755   // Complete the handshake by confirming the server's ACK and sending the first
1756   // read transfer parameters.
1757   ctx_.SendClientStream(EncodeChunk(
1758       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1759           .set_session_id(1)
1760           .set_window_end_offset(16)
1761           .set_offset(0)));
1762   transfer_thread_.WaitUntilEventIsProcessed();
1763 
1764   ASSERT_EQ(ctx_.total_responses(), 2u);
1765 
1766   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1767   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1768   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1769   EXPECT_EQ(c1.session_id(), 1u);
1770   EXPECT_EQ(c1.offset(), 0u);
1771   ASSERT_TRUE(c1.has_payload());
1772   ASSERT_EQ(c1.payload().size(), 16u);
1773   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1774             0);
1775 
1776   rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
1777     ctx_.SendClientStream(EncodeChunk(
1778         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersContinue)
1779             .set_session_id(1)
1780             .set_window_end_offset(64)
1781             .set_offset(16)));
1782     transfer_thread_.WaitUntilEventIsProcessed();
1783   });
1784 
1785   ASSERT_EQ(ctx_.total_responses(), 4u);
1786 
1787   Chunk c2 = DecodeChunk(ctx_.responses()[2]);
1788   EXPECT_EQ(c2.protocol_version(), ProtocolVersion::kVersionTwo);
1789   EXPECT_EQ(c2.type(), Chunk::Type::kData);
1790   EXPECT_EQ(c2.session_id(), 1u);
1791   EXPECT_EQ(c2.offset(), 16u);
1792   ASSERT_TRUE(c2.has_payload());
1793   ASSERT_EQ(c2.payload().size(), 16u);
1794   EXPECT_EQ(
1795       std::memcmp(
1796           c2.payload().data(), kData.data() + c2.offset(), c2.payload().size()),
1797       0);
1798 
1799   Chunk c3 = DecodeChunk(ctx_.responses()[3]);
1800   EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kVersionTwo);
1801   EXPECT_EQ(c3.type(), Chunk::Type::kData);
1802   EXPECT_EQ(c3.session_id(), 1u);
1803   EXPECT_FALSE(c3.has_payload());
1804   EXPECT_EQ(c3.remaining_bytes(), 0u);
1805 
1806   ctx_.SendClientStream(
1807       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus())));
1808   transfer_thread_.WaitUntilEventIsProcessed();
1809 
1810   EXPECT_TRUE(handler_.finalize_read_called);
1811   EXPECT_EQ(handler_.finalize_read_status, OkStatus());
1812 }
1813 
// The client aborts a version 2 read right after the server's START_ACK; the
// handler is expected to be finalized with the client's status.
TEST_F(ReadTransfer, Version2_ClientTerminatesDuringHandshake) {
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(3);
  ctx_.SendClientStream(EncodeChunk(start_request));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server opens with a START_ACK which assigns the session ID and
  // confirms the protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Rather than sending the third handshake message, terminate the session.
  const Chunk client_abort = Chunk::Final(
      ProtocolVersion::kVersionTwo, 1, Status::ResourceExhausted());
  ctx_.SendClientStream(EncodeChunk(client_abort));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
}
1841 
TEST_F(ReadTransfer,Version2_ClientSendsWrongProtocolVersion)1842 TEST_F(ReadTransfer, Version2_ClientSendsWrongProtocolVersion) {
1843   ctx_.SendClientStream(
1844       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1845                       .set_resource_id(3)));
1846 
1847   transfer_thread_.WaitUntilEventIsProcessed();
1848 
1849   EXPECT_TRUE(handler_.prepare_read_called);
1850   EXPECT_FALSE(handler_.finalize_read_called);
1851 
1852   // First, the server responds with a START_ACK, assigning a session ID and
1853   // confirming the protocol version.
1854   ASSERT_EQ(ctx_.total_responses(), 1u);
1855   Chunk chunk = DecodeChunk(ctx_.responses().back());
1856   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1857   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1858   EXPECT_EQ(chunk.session_id(), 1u);
1859   EXPECT_EQ(chunk.resource_id(), 3u);
1860 
1861   // Complete the handshake by confirming the server's ACK and sending the first
1862   // read transfer parameters.
1863   ctx_.SendClientStream(EncodeChunk(
1864       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1865           .set_session_id(1)
1866           .set_window_end_offset(16)
1867           .set_offset(0)));
1868   transfer_thread_.WaitUntilEventIsProcessed();
1869 
1870   ASSERT_EQ(ctx_.total_responses(), 2u);
1871 
1872   Chunk c1 = DecodeChunk(ctx_.responses()[1]);
1873   EXPECT_EQ(c1.protocol_version(), ProtocolVersion::kVersionTwo);
1874   EXPECT_EQ(c1.type(), Chunk::Type::kData);
1875   EXPECT_EQ(c1.session_id(), 1u);
1876   EXPECT_EQ(c1.offset(), 0u);
1877   ASSERT_TRUE(c1.has_payload());
1878   ASSERT_EQ(c1.payload().size(), 16u);
1879   EXPECT_EQ(std::memcmp(c1.payload().data(), kData.data(), c1.payload().size()),
1880             0);
1881 
1882   // Send a parameters update, but with the incorrect protocol version. The
1883   // server should terminate the transfer.
1884   ctx_.SendClientStream(EncodeChunk(
1885       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
1886           .set_session_id(1)
1887           .set_window_end_offset(64)
1888           .set_offset(16)));
1889   transfer_thread_.WaitUntilEventIsProcessed();
1890 
1891   ASSERT_EQ(ctx_.total_responses(), 3u);
1892 
1893   chunk = DecodeChunk(ctx_.responses().back());
1894   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1895   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1896   EXPECT_EQ(chunk.session_id(), 1u);
1897   ASSERT_TRUE(chunk.status().has_value());
1898   EXPECT_EQ(chunk.status().value(), Status::Internal());
1899 
1900   EXPECT_TRUE(handler_.finalize_read_called);
1901   EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
1902 }
1903 
// The client completes the version 2 handshake with a zero-sized window; the
// server is expected to end the transfer with RESOURCE_EXHAUSTED.
TEST_F(ReadTransfer, Version2_BadParametersInHandshake) {
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(3);
  ctx_.SendClientStream(EncodeChunk(start_request));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Confirm the ACK, but attach invalid transfer parameters (window end
  // offset of 0). The server should terminate the transfer.
  Chunk invalid_confirmation(ProtocolVersion::kVersionTwo,
                             Chunk::Type::kStartAckConfirmation);
  invalid_confirmation.set_session_id(1).set_window_end_offset(0).set_offset(
      0);
  ctx_.SendClientStream(EncodeChunk(invalid_confirmation));

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  Chunk server_completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(server_completion.protocol_version(),
            ProtocolVersion::kVersionTwo);
  EXPECT_EQ(server_completion.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(server_completion.session_id(), 1u);
  ASSERT_TRUE(server_completion.status().has_value());
  EXPECT_EQ(server_completion.status().value(), Status::ResourceExhausted());
}
1940 
// Starts a version 2 read for resource ID 99 and expects the server to reject
// it with a single NOT_FOUND completion chunk.
TEST_F(ReadTransfer, Version2_InvalidResourceId) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(99)));

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  // Guard the value() access below, matching the other tests in this file: a
  // response without a status should be reported as a test failure instead of
  // reaching value() on an empty status.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::NotFound());
}
1955 
// Starts a version 2 read against a handler whose first PrepareRead() call
// fails, and expects a single DATA_LOSS completion chunk that names the
// requested resource.
TEST_F(ReadTransfer, Version2_PrepareError) {
  SometimesUnavailableReadHandler unavailable_handler(99, kData);
  ctx_.service().RegisterHandler(unavailable_handler);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(99)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(chunk.resource_id(), 99u);
  // Guard the value() access below, matching the other tests in this file: a
  // response without a status should be reported as a test failure instead of
  // reaching value() on an empty status.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::DataLoss());
}
1972 
TEST_F(WriteTransfer,Version2_SimpleTransfer)1973 TEST_F(WriteTransfer, Version2_SimpleTransfer) {
1974   ctx_.SendClientStream(
1975       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
1976                       .set_resource_id(7)));
1977 
1978   transfer_thread_.WaitUntilEventIsProcessed();
1979 
1980   EXPECT_TRUE(handler_.prepare_write_called);
1981   EXPECT_FALSE(handler_.finalize_write_called);
1982 
1983   // First, the server responds with a START_ACK, assigning a session ID and
1984   // confirming the protocol version.
1985   ASSERT_EQ(ctx_.total_responses(), 1u);
1986   Chunk chunk = DecodeChunk(ctx_.responses().back());
1987   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1988   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
1989   EXPECT_EQ(chunk.session_id(), 1u);
1990   EXPECT_EQ(chunk.resource_id(), 7u);
1991 
1992   // Complete the handshake by confirming the server's ACK.
1993   ctx_.SendClientStream(EncodeChunk(
1994       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
1995           .set_session_id(1)));
1996   transfer_thread_.WaitUntilEventIsProcessed();
1997 
1998   // Server should respond by sending its initial transfer parameters.
1999   ASSERT_EQ(ctx_.total_responses(), 2u);
2000 
2001   chunk = DecodeChunk(ctx_.responses()[1]);
2002   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2003   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2004   EXPECT_EQ(chunk.session_id(), 1u);
2005   EXPECT_EQ(chunk.offset(), 0u);
2006   EXPECT_EQ(chunk.window_end_offset(), 32u);
2007   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2008   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2009 
2010   // Send all of our data.
2011   ctx_.SendClientStream<64>(
2012       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2013                       .set_session_id(1)
2014                       .set_offset(0)
2015                       .set_payload(kData)
2016                       .set_remaining_bytes(0)));
2017   transfer_thread_.WaitUntilEventIsProcessed();
2018 
2019   ASSERT_EQ(ctx_.total_responses(), 3u);
2020 
2021   chunk = DecodeChunk(ctx_.responses().back());
2022   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2023   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2024   EXPECT_EQ(chunk.session_id(), 1u);
2025   ASSERT_TRUE(chunk.status().has_value());
2026   EXPECT_EQ(chunk.status().value(), OkStatus());
2027 
2028   // Send the completion acknowledgement.
2029   ctx_.SendClientStream(EncodeChunk(
2030       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2031           .set_session_id(1)));
2032   transfer_thread_.WaitUntilEventIsProcessed();
2033 
2034   ASSERT_EQ(ctx_.total_responses(), 3u);
2035 
2036   EXPECT_TRUE(handler_.finalize_write_called);
2037   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2038   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2039 }
2040 
TEST_F(WriteTransfer,Version2_Multichunk)2041 TEST_F(WriteTransfer, Version2_Multichunk) {
2042   ctx_.SendClientStream(
2043       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2044                       .set_resource_id(7)));
2045 
2046   transfer_thread_.WaitUntilEventIsProcessed();
2047 
2048   EXPECT_TRUE(handler_.prepare_write_called);
2049   EXPECT_FALSE(handler_.finalize_write_called);
2050 
2051   // First, the server responds with a START_ACK, assigning a session ID and
2052   // confirming the protocol version.
2053   ASSERT_EQ(ctx_.total_responses(), 1u);
2054   Chunk chunk = DecodeChunk(ctx_.responses().back());
2055   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2056   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2057   EXPECT_EQ(chunk.session_id(), 1u);
2058   EXPECT_EQ(chunk.resource_id(), 7u);
2059 
2060   // Complete the handshake by confirming the server's ACK.
2061   ctx_.SendClientStream(EncodeChunk(
2062       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2063           .set_session_id(1)));
2064   transfer_thread_.WaitUntilEventIsProcessed();
2065 
2066   // Server should respond by sending its initial transfer parameters.
2067   ASSERT_EQ(ctx_.total_responses(), 2u);
2068 
2069   chunk = DecodeChunk(ctx_.responses()[1]);
2070   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2071   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2072   EXPECT_EQ(chunk.session_id(), 1u);
2073   EXPECT_EQ(chunk.offset(), 0u);
2074   EXPECT_EQ(chunk.window_end_offset(), 32u);
2075   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2076   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2077 
2078   // Send all of our data across two chunks.
2079   ctx_.SendClientStream<64>(
2080       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2081                       .set_session_id(1)
2082                       .set_offset(0)
2083                       .set_payload(span(kData).first(8))));
2084   ctx_.SendClientStream<64>(
2085       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2086                       .set_session_id(1)
2087                       .set_offset(8)
2088                       .set_payload(span(kData).subspan(8))
2089                       .set_remaining_bytes(0)));
2090   transfer_thread_.WaitUntilEventIsProcessed();
2091 
2092   ASSERT_EQ(ctx_.total_responses(), 3u);
2093 
2094   chunk = DecodeChunk(ctx_.responses().back());
2095   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2096   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2097   EXPECT_EQ(chunk.session_id(), 1u);
2098   ASSERT_TRUE(chunk.status().has_value());
2099   EXPECT_EQ(chunk.status().value(), OkStatus());
2100 
2101   // Send the completion acknowledgement.
2102   ctx_.SendClientStream(EncodeChunk(
2103       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2104           .set_session_id(1)));
2105   transfer_thread_.WaitUntilEventIsProcessed();
2106 
2107   ASSERT_EQ(ctx_.total_responses(), 3u);
2108 
2109   EXPECT_TRUE(handler_.finalize_write_called);
2110   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2111   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2112 }
2113 
TEST_F(WriteTransfer,Version2_ContinueParameters)2114 TEST_F(WriteTransfer, Version2_ContinueParameters) {
2115   ctx_.SendClientStream(
2116       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2117                       .set_resource_id(7)));
2118 
2119   transfer_thread_.WaitUntilEventIsProcessed();
2120 
2121   EXPECT_TRUE(handler_.prepare_write_called);
2122   EXPECT_FALSE(handler_.finalize_write_called);
2123 
2124   // First, the server responds with a START_ACK, assigning a session ID and
2125   // confirming the protocol version.
2126   ASSERT_EQ(ctx_.total_responses(), 1u);
2127   Chunk chunk = DecodeChunk(ctx_.responses().back());
2128   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2129   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2130   EXPECT_EQ(chunk.session_id(), 1u);
2131   EXPECT_EQ(chunk.resource_id(), 7u);
2132 
2133   // Complete the handshake by confirming the server's ACK.
2134   ctx_.SendClientStream(EncodeChunk(
2135       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2136           .set_session_id(1)));
2137   transfer_thread_.WaitUntilEventIsProcessed();
2138 
2139   // Server should respond by sending its initial transfer parameters.
2140   ASSERT_EQ(ctx_.total_responses(), 2u);
2141 
2142   chunk = DecodeChunk(ctx_.responses()[1]);
2143   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2144   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2145   EXPECT_EQ(chunk.session_id(), 1u);
2146   EXPECT_EQ(chunk.offset(), 0u);
2147   EXPECT_EQ(chunk.window_end_offset(), 32u);
2148   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2149   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2150 
2151   // Send all of our data across several chunks.
2152   ctx_.SendClientStream<64>(
2153       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2154                       .set_session_id(1)
2155                       .set_offset(0)
2156                       .set_payload(span(kData).first(8))));
2157 
2158   transfer_thread_.WaitUntilEventIsProcessed();
2159   ASSERT_EQ(ctx_.total_responses(), 2u);
2160 
2161   ctx_.SendClientStream<64>(
2162       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2163                       .set_session_id(1)
2164                       .set_offset(8)
2165                       .set_payload(span(kData).subspan(8, 8))));
2166 
2167   transfer_thread_.WaitUntilEventIsProcessed();
2168   ASSERT_EQ(ctx_.total_responses(), 3u);
2169 
2170   chunk = DecodeChunk(ctx_.responses().back());
2171   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2172   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2173   EXPECT_EQ(chunk.session_id(), 1u);
2174   EXPECT_EQ(chunk.offset(), 16u);
2175   EXPECT_EQ(chunk.window_end_offset(), 32u);
2176 
2177   ctx_.SendClientStream<64>(
2178       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2179                       .set_session_id(1)
2180                       .set_offset(16)
2181                       .set_payload(span(kData).subspan(16, 8))));
2182 
2183   transfer_thread_.WaitUntilEventIsProcessed();
2184   ASSERT_EQ(ctx_.total_responses(), 4u);
2185 
2186   chunk = DecodeChunk(ctx_.responses().back());
2187   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2188   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
2189   EXPECT_EQ(chunk.session_id(), 1u);
2190   EXPECT_EQ(chunk.offset(), 24u);
2191   EXPECT_EQ(chunk.window_end_offset(), 32u);
2192 
2193   ctx_.SendClientStream<64>(
2194       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2195                       .set_session_id(1)
2196                       .set_offset(24)
2197                       .set_payload(span(kData).subspan(24))
2198                       .set_remaining_bytes(0)));
2199   transfer_thread_.WaitUntilEventIsProcessed();
2200 
2201   ASSERT_EQ(ctx_.total_responses(), 5u);
2202 
2203   chunk = DecodeChunk(ctx_.responses().back());
2204   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2205   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2206   EXPECT_EQ(chunk.session_id(), 1u);
2207   ASSERT_TRUE(chunk.status().has_value());
2208   EXPECT_EQ(chunk.status().value(), OkStatus());
2209 
2210   // Send the completion acknowledgement.
2211   ctx_.SendClientStream(EncodeChunk(
2212       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2213           .set_session_id(1)));
2214   transfer_thread_.WaitUntilEventIsProcessed();
2215 
2216   ASSERT_EQ(ctx_.total_responses(), 5u);
2217 
2218   EXPECT_TRUE(handler_.finalize_write_called);
2219   EXPECT_EQ(handler_.finalize_write_status, OkStatus());
2220   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
2221 }
2222 
// The client aborts a version 2 write right after the server's START_ACK; the
// handler is expected to be finalized with the client's status.
TEST_F(WriteTransfer, Version2_ClientTerminatesDuringHandshake) {
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(7);
  ctx_.SendClientStream(EncodeChunk(start_request));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The server opens with a START_ACK which assigns the session ID and
  // confirms the protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 7u);

  // Rather than completing the handshake, report an error to the server.
  const Chunk client_abort = Chunk::Final(
      ProtocolVersion::kVersionTwo, 1, Status::FailedPrecondition());
  ctx_.SendClientStream(EncodeChunk(client_abort));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::FailedPrecondition());
}
2250 
TEST_F(WriteTransfer,Version2_ClientSendsWrongProtocolVersion)2251 TEST_F(WriteTransfer, Version2_ClientSendsWrongProtocolVersion) {
2252   ctx_.SendClientStream(
2253       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
2254                       .set_resource_id(7)));
2255 
2256   transfer_thread_.WaitUntilEventIsProcessed();
2257 
2258   EXPECT_TRUE(handler_.prepare_write_called);
2259   EXPECT_FALSE(handler_.finalize_write_called);
2260 
2261   // First, the server responds with a START_ACK, assigning a session ID and
2262   // confirming the protocol version.
2263   ASSERT_EQ(ctx_.total_responses(), 1u);
2264   Chunk chunk = DecodeChunk(ctx_.responses().back());
2265   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2266   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
2267   EXPECT_EQ(chunk.session_id(), 1u);
2268   EXPECT_EQ(chunk.resource_id(), 7u);
2269 
2270   // Complete the handshake by confirming the server's ACK.
2271   ctx_.SendClientStream(EncodeChunk(
2272       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
2273           .set_session_id(1)));
2274   transfer_thread_.WaitUntilEventIsProcessed();
2275 
2276   // Server should respond by sending its initial transfer parameters.
2277   ASSERT_EQ(ctx_.total_responses(), 2u);
2278 
2279   chunk = DecodeChunk(ctx_.responses()[1]);
2280   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2281   EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
2282   EXPECT_EQ(chunk.session_id(), 1u);
2283   EXPECT_EQ(chunk.offset(), 0u);
2284   EXPECT_EQ(chunk.window_end_offset(), 32u);
2285   ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
2286   EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);
2287 
2288   // The transfer was configured to use protocol version 2. Send a legacy chunk
2289   // instead.
2290   ctx_.SendClientStream<64>(
2291       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
2292                       .set_session_id(1)
2293                       .set_offset(0)
2294                       .set_payload(kData)
2295                       .set_remaining_bytes(0)));
2296   transfer_thread_.WaitUntilEventIsProcessed();
2297 
2298   // Server should terminate the transfer.
2299   ASSERT_EQ(ctx_.total_responses(), 3u);
2300 
2301   chunk = DecodeChunk(ctx_.responses()[2]);
2302   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2303   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2304   EXPECT_EQ(chunk.status().value(), Status::Internal());
2305 
2306   // Send the completion acknowledgement.
2307   ctx_.SendClientStream(EncodeChunk(
2308       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2309           .set_session_id(1)));
2310   transfer_thread_.WaitUntilEventIsProcessed();
2311 
2312   ASSERT_EQ(ctx_.total_responses(), 3u);
2313 }
2314 
// Verifies that starting a version 2 write transfer for resource 99 is
// answered with a single completion chunk carrying NOT_FOUND.
TEST_F(WriteTransfer, Version2_InvalidResourceId) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(99)));

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  // Check that a status is present before reading it, so a missing status is
  // reported as an assertion failure instead of an unchecked value() call.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::NotFound());
}
2329 
2330 class ReadTransferLowMaxRetries : public ::testing::Test {
2331  protected:
2332   static constexpr uint32_t kMaxRetries = 3;
2333   static constexpr uint32_t kMaxLifetimeRetries = 4;
2334 
ReadTransferLowMaxRetries()2335   ReadTransferLowMaxRetries()
2336       : handler_(9, kData),
2337         transfer_thread_(data_buffer_, encode_buffer_),
2338         ctx_(transfer_thread_,
2339              64,
2340              // Use a long timeout to avoid accidentally triggering timeouts.
2341              std::chrono::minutes(1),
2342              kMaxRetries,
2343              cfg::kDefaultExtendWindowDivisor,
2344              kMaxLifetimeRetries),
2345         system_thread_(TransferThreadOptions(), transfer_thread_) {
2346     ctx_.service().RegisterHandler(handler_);
2347 
2348     PW_CHECK(!handler_.prepare_read_called);
2349     PW_CHECK(!handler_.finalize_read_called);
2350 
2351     ctx_.call();  // Open the read stream
2352     transfer_thread_.WaitUntilEventIsProcessed();
2353   }
2354 
~ReadTransferLowMaxRetries()2355   ~ReadTransferLowMaxRetries() override {
2356     transfer_thread_.Terminate();
2357     system_thread_.join();
2358   }
2359 
2360   SimpleReadTransfer handler_;
2361   Thread<1, 1> transfer_thread_;
2362   PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read, 10) ctx_;
2363   thread::Thread system_thread_;
2364   std::array<std::byte, 64> data_buffer_;
2365   std::array<std::byte, 64> encode_buffer_;
2366 };
2367 
TEST_F(ReadTransferLowMaxRetries,FailsAfterLifetimeRetryCount)2368 TEST_F(ReadTransferLowMaxRetries, FailsAfterLifetimeRetryCount) {
2369   ctx_.SendClientStream(
2370       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
2371                       .set_session_id(9)
2372                       .set_window_end_offset(16)
2373                       .set_offset(0)));
2374 
2375   transfer_thread_.WaitUntilEventIsProcessed();
2376 
2377   EXPECT_TRUE(handler_.prepare_read_called);
2378   EXPECT_FALSE(handler_.finalize_read_called);
2379 
2380   ASSERT_EQ(ctx_.total_responses(), 1u);
2381   Chunk chunk = DecodeChunk(ctx_.responses().back());
2382 
2383   EXPECT_EQ(chunk.session_id(), 9u);
2384   EXPECT_EQ(chunk.offset(), 0u);
2385   ASSERT_EQ(chunk.payload().size(), 16u);
2386   EXPECT_EQ(
2387       std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
2388       0);
2389 
2390   // Time out twice. Server should retry both times.
2391   transfer_thread_.SimulateServerTimeout(9);
2392   transfer_thread_.SimulateServerTimeout(9);
2393   ASSERT_EQ(ctx_.total_responses(), 3u);
2394   chunk = DecodeChunk(ctx_.responses().back());
2395   EXPECT_EQ(chunk.session_id(), 9u);
2396   EXPECT_EQ(chunk.offset(), 0u);
2397   ASSERT_EQ(chunk.payload().size(), 16u);
2398   EXPECT_EQ(
2399       std::memcmp(chunk.payload().data(), kData.data(), chunk.payload().size()),
2400       0);
2401 
2402   ctx_.SendClientStream(EncodeChunk(
2403       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
2404           .set_session_id(9)
2405           .set_window_end_offset(32)
2406           .set_offset(16)));
2407   transfer_thread_.WaitUntilEventIsProcessed();
2408 
2409   ASSERT_EQ(ctx_.total_responses(), 4u);
2410   chunk = DecodeChunk(ctx_.responses().back());
2411   EXPECT_EQ(chunk.session_id(), 9u);
2412   EXPECT_EQ(chunk.offset(), 16u);
2413   ASSERT_EQ(chunk.payload().size(), 16u);
2414   EXPECT_EQ(
2415       std::memcmp(
2416           chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2417       0);
2418 
2419   // Time out three more times. The transfer should terminate.
2420   transfer_thread_.SimulateServerTimeout(9);
2421   ASSERT_EQ(ctx_.total_responses(), 5u);
2422   chunk = DecodeChunk(ctx_.responses().back());
2423   EXPECT_EQ(chunk.session_id(), 9u);
2424   EXPECT_EQ(chunk.offset(), 16u);
2425   ASSERT_EQ(chunk.payload().size(), 16u);
2426   EXPECT_EQ(
2427       std::memcmp(
2428           chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2429       0);
2430 
2431   transfer_thread_.SimulateServerTimeout(9);
2432   ASSERT_EQ(ctx_.total_responses(), 6u);
2433   chunk = DecodeChunk(ctx_.responses().back());
2434   EXPECT_EQ(chunk.session_id(), 9u);
2435   EXPECT_EQ(chunk.offset(), 16u);
2436   ASSERT_EQ(chunk.payload().size(), 16u);
2437   EXPECT_EQ(
2438       std::memcmp(
2439           chunk.payload().data(), kData.data() + 16, chunk.payload().size()),
2440       0);
2441 
2442   transfer_thread_.SimulateServerTimeout(9);
2443   ASSERT_EQ(ctx_.total_responses(), 7u);
2444   chunk = DecodeChunk(ctx_.responses().back());
2445   EXPECT_EQ(chunk.status(), Status::DeadlineExceeded());
2446 }
2447 
// Version 2 read transfer that times out repeatedly. Two simulated timeouts
// during the handshake and two during the data phase should each be retried;
// the fifth timeout overall should end the transfer with a completion chunk
// carrying DEADLINE_EXCEEDED.
TEST_F(ReadTransferLowMaxRetries, Version2_FailsAfterLifetimeRetryCount) {
  // Open a version 2 transfer for resource 9.
  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
          .set_resource_id(9)));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // Response 1: the server's START_ACK, which assigns a session ID and
  // confirms the protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 9u);

  // Two timeouts during the handshake; the START_ACK should be resent each
  // time.
  transfer_thread_.SimulateServerTimeout(1);
  transfer_thread_.SimulateServerTimeout(1);
  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk resent_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent_ack.type(), Chunk::Type::kStartAck);

  // Finish the handshake so the server can begin sending data.
  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
          .set_session_id(1)
          .set_window_end_offset(16)
          .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Response 4: the first data chunk.
  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk first_data = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(first_data.type(), Chunk::Type::kData);

  // Three further timeouts. The first two are answered with another data
  // chunk; the third should end the transfer.
  transfer_thread_.SimulateServerTimeout(1);
  ASSERT_EQ(ctx_.total_responses(), 5u);
  Chunk third_retry = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(third_retry.type(), Chunk::Type::kData);

  transfer_thread_.SimulateServerTimeout(1);
  ASSERT_EQ(ctx_.total_responses(), 6u);
  Chunk fourth_retry = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(fourth_retry.type(), Chunk::Type::kData);

  // Fifth timeout overall: the server gives up with DEADLINE_EXCEEDED.
  transfer_thread_.SimulateServerTimeout(1);
  ASSERT_EQ(ctx_.total_responses(), 7u);
  Chunk termination = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(termination.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(termination.status(), Status::DeadlineExceeded());
}
2503 
2504 }  // namespace
2505 }  // namespace pw::transfer::test
2506