• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_transfer/client.h"
16 
17 #include <cstring>
18 
19 #include "gtest/gtest.h"
20 #include "pw_assert/check.h"
21 #include "pw_bytes/array.h"
22 #include "pw_rpc/raw/client_testing.h"
23 #include "pw_rpc/test_helpers.h"
24 #include "pw_thread/sleep.h"
25 #include "pw_thread/thread.h"
26 #include "pw_thread_stl/options.h"
27 #include "pw_transfer_private/chunk_testing.h"
28 
29 namespace pw::transfer::test {
30 namespace {
31 
32 using internal::Chunk;
33 using pw_rpc::raw::Transfer;
34 
35 using namespace std::chrono_literals;
36 
TransferThreadOptions()37 thread::Options& TransferThreadOptions() {
38   static thread::stl::Options options;
39   return options;
40 }
41 
42 class ReadTransfer : public ::testing::Test {
43  protected:
ReadTransfer(size_t max_bytes_to_receive=0)44   ReadTransfer(size_t max_bytes_to_receive = 0)
45       : transfer_thread_(chunk_buffer_, encode_buffer_),
46         client_(context_.client(),
47                 context_.channel().id(),
48                 transfer_thread_,
49                 max_bytes_to_receive),
50         system_thread_(TransferThreadOptions(), transfer_thread_) {}
51 
~ReadTransfer()52   ~ReadTransfer() override {
53     transfer_thread_.Terminate();
54     system_thread_.join();
55   }
56 
57   rpc::RawClientTestContext<> context_;
58 
59   Thread<1, 1> transfer_thread_;
60   Client client_;
61 
62   std::array<std::byte, 64> chunk_buffer_;
63   std::array<std::byte, 64> encode_buffer_;
64 
65   thread::Thread system_thread_;
66 };
67 
__anond3adec360202(size_t i) 68 constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
__anond3adec360302(size_t i) 69 constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
70 
TEST_F(ReadTransfer,SingleChunk)71 TEST_F(ReadTransfer, SingleChunk) {
72   stream::MemoryWriterBuffer<64> writer;
73   Status transfer_status = Status::Unknown();
74 
75   ASSERT_EQ(OkStatus(),
76             client_.Read(3, writer, [&transfer_status](Status status) {
77               transfer_status = status;
78             }));
79 
80   transfer_thread_.WaitUntilEventIsProcessed();
81 
82   // First transfer parameters chunk is sent.
83   rpc::PayloadsView payloads =
84       context_.output().payloads<Transfer::Read>(context_.channel().id());
85   ASSERT_EQ(payloads.size(), 1u);
86   EXPECT_EQ(transfer_status, Status::Unknown());
87 
88   Chunk c0 = DecodeChunk(payloads[0]);
89   EXPECT_EQ(c0.session_id(), 3u);
90   EXPECT_EQ(c0.offset(), 0u);
91   EXPECT_EQ(c0.window_end_offset(), 64u);
92   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
93 
94   context_.server().SendServerStream<Transfer::Read>(
95       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
96                       .set_session_id(3)
97                       .set_offset(0)
98                       .set_payload(kData32)
99                       .set_remaining_bytes(0)));
100   transfer_thread_.WaitUntilEventIsProcessed();
101 
102   ASSERT_EQ(payloads.size(), 2u);
103 
104   Chunk c1 = DecodeChunk(payloads.back());
105   EXPECT_EQ(c1.session_id(), 3u);
106   ASSERT_TRUE(c1.status().has_value());
107   EXPECT_EQ(c1.status().value(), OkStatus());
108 
109   EXPECT_EQ(transfer_status, OkStatus());
110   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
111             0);
112 }
113 
TEST_F(ReadTransfer,MultiChunk)114 TEST_F(ReadTransfer, MultiChunk) {
115   stream::MemoryWriterBuffer<64> writer;
116   Status transfer_status = Status::Unknown();
117 
118   ASSERT_EQ(OkStatus(),
119             client_.Read(4, writer, [&transfer_status](Status status) {
120               transfer_status = status;
121             }));
122 
123   transfer_thread_.WaitUntilEventIsProcessed();
124 
125   // First transfer parameters chunk is sent.
126   rpc::PayloadsView payloads =
127       context_.output().payloads<Transfer::Read>(context_.channel().id());
128   ASSERT_EQ(payloads.size(), 1u);
129   EXPECT_EQ(transfer_status, Status::Unknown());
130 
131   Chunk c0 = DecodeChunk(payloads[0]);
132   EXPECT_EQ(c0.session_id(), 4u);
133   EXPECT_EQ(c0.offset(), 0u);
134   EXPECT_EQ(c0.window_end_offset(), 64u);
135   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
136 
137   constexpr ConstByteSpan data(kData32);
138   context_.server().SendServerStream<Transfer::Read>(
139       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
140                       .set_session_id(4)
141                       .set_offset(0)
142                       .set_payload(data.first(16))));
143   transfer_thread_.WaitUntilEventIsProcessed();
144 
145   ASSERT_EQ(payloads.size(), 1u);
146 
147   context_.server().SendServerStream<Transfer::Read>(
148       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
149                       .set_session_id(4)
150                       .set_offset(16)
151                       .set_payload(data.subspan(16))
152                       .set_remaining_bytes(0)));
153   transfer_thread_.WaitUntilEventIsProcessed();
154 
155   ASSERT_EQ(payloads.size(), 2u);
156 
157   Chunk c1 = DecodeChunk(payloads[1]);
158   EXPECT_EQ(c1.session_id(), 4u);
159   ASSERT_TRUE(c1.status().has_value());
160   EXPECT_EQ(c1.status().value(), OkStatus());
161 
162   EXPECT_EQ(transfer_status, OkStatus());
163   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
164             0);
165 }
166 
TEST_F(ReadTransfer,MultipleTransfers)167 TEST_F(ReadTransfer, MultipleTransfers) {
168   stream::MemoryWriterBuffer<64> writer;
169   Status transfer_status = Status::Unknown();
170 
171   ASSERT_EQ(OkStatus(),
172             client_.Read(3, writer, [&transfer_status](Status status) {
173               transfer_status = status;
174             }));
175   transfer_thread_.WaitUntilEventIsProcessed();
176 
177   context_.server().SendServerStream<Transfer::Read>(
178       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
179                       .set_session_id(3)
180                       .set_offset(0)
181                       .set_payload(kData32)
182                       .set_remaining_bytes(0)));
183   transfer_thread_.WaitUntilEventIsProcessed();
184 
185   ASSERT_EQ(transfer_status, OkStatus());
186   transfer_status = Status::Unknown();
187 
188   ASSERT_EQ(OkStatus(),
189             client_.Read(3, writer, [&transfer_status](Status status) {
190               transfer_status = status;
191             }));
192   transfer_thread_.WaitUntilEventIsProcessed();
193 
194   context_.server().SendServerStream<Transfer::Read>(
195       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
196                       .set_session_id(3)
197                       .set_offset(0)
198                       .set_payload(kData32)
199                       .set_remaining_bytes(0)));
200   transfer_thread_.WaitUntilEventIsProcessed();
201 
202   EXPECT_EQ(transfer_status, OkStatus());
203 }
204 
205 class ReadTransferMaxBytes32 : public ReadTransfer {
206  protected:
ReadTransferMaxBytes32()207   ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
208 };
209 
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromConstructorArg)210 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
211   stream::MemoryWriterBuffer<64> writer;
212   EXPECT_EQ(OkStatus(), client_.Read(5, writer, [](Status) {}));
213   transfer_thread_.WaitUntilEventIsProcessed();
214 
215   // First transfer parameters chunk is sent.
216   rpc::PayloadsView payloads =
217       context_.output().payloads<Transfer::Read>(context_.channel().id());
218   ASSERT_EQ(payloads.size(), 1u);
219 
220   Chunk c0 = DecodeChunk(payloads[0]);
221   EXPECT_EQ(c0.session_id(), 5u);
222   EXPECT_EQ(c0.offset(), 0u);
223   EXPECT_EQ(c0.window_end_offset(), 32u);
224   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
225 }
226 
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromWriterLimit)227 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
228   stream::MemoryWriterBuffer<16> small_writer;
229   EXPECT_EQ(OkStatus(), client_.Read(5, small_writer, [](Status) {}));
230   transfer_thread_.WaitUntilEventIsProcessed();
231 
232   // First transfer parameters chunk is sent.
233   rpc::PayloadsView payloads =
234       context_.output().payloads<Transfer::Read>(context_.channel().id());
235   ASSERT_EQ(payloads.size(), 1u);
236 
237   Chunk c0 = DecodeChunk(payloads[0]);
238   EXPECT_EQ(c0.session_id(), 5u);
239   EXPECT_EQ(c0.offset(), 0u);
240   EXPECT_EQ(c0.window_end_offset(), 16u);
241   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
242 }
243 
TEST_F(ReadTransferMaxBytes32,MultiParameters)244 TEST_F(ReadTransferMaxBytes32, MultiParameters) {
245   stream::MemoryWriterBuffer<64> writer;
246   Status transfer_status = Status::Unknown();
247 
248   ASSERT_EQ(OkStatus(),
249             client_.Read(6, writer, [&transfer_status](Status status) {
250               transfer_status = status;
251             }));
252   transfer_thread_.WaitUntilEventIsProcessed();
253 
254   // First transfer parameters chunk is sent.
255   rpc::PayloadsView payloads =
256       context_.output().payloads<Transfer::Read>(context_.channel().id());
257   ASSERT_EQ(payloads.size(), 1u);
258   EXPECT_EQ(transfer_status, Status::Unknown());
259 
260   Chunk c0 = DecodeChunk(payloads[0]);
261   EXPECT_EQ(c0.session_id(), 6u);
262   EXPECT_EQ(c0.offset(), 0u);
263   ASSERT_EQ(c0.window_end_offset(), 32u);
264 
265   constexpr ConstByteSpan data(kData64);
266   context_.server().SendServerStream<Transfer::Read>(
267       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
268                       .set_session_id(6)
269                       .set_offset(0)
270                       .set_payload(data.first(32))));
271   transfer_thread_.WaitUntilEventIsProcessed();
272 
273   ASSERT_EQ(payloads.size(), 2u);
274   EXPECT_EQ(transfer_status, Status::Unknown());
275 
276   // Second parameters chunk.
277   Chunk c1 = DecodeChunk(payloads[1]);
278   EXPECT_EQ(c1.session_id(), 6u);
279   EXPECT_EQ(c1.offset(), 32u);
280   ASSERT_EQ(c1.window_end_offset(), 64u);
281 
282   context_.server().SendServerStream<Transfer::Read>(
283       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
284                       .set_session_id(6)
285                       .set_offset(32)
286                       .set_payload(data.subspan(32))
287                       .set_remaining_bytes(0)));
288   transfer_thread_.WaitUntilEventIsProcessed();
289 
290   ASSERT_EQ(payloads.size(), 3u);
291 
292   Chunk c2 = DecodeChunk(payloads[2]);
293   EXPECT_EQ(c2.session_id(), 6u);
294   ASSERT_TRUE(c2.status().has_value());
295   EXPECT_EQ(c2.status().value(), OkStatus());
296 
297   EXPECT_EQ(transfer_status, OkStatus());
298   EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
299 }
300 
TEST_F(ReadTransfer,UnexpectedOffset)301 TEST_F(ReadTransfer, UnexpectedOffset) {
302   stream::MemoryWriterBuffer<64> writer;
303   Status transfer_status = Status::Unknown();
304 
305   ASSERT_EQ(OkStatus(),
306             client_.Read(7, writer, [&transfer_status](Status status) {
307               transfer_status = status;
308             }));
309   transfer_thread_.WaitUntilEventIsProcessed();
310 
311   // First transfer parameters chunk is sent.
312   rpc::PayloadsView payloads =
313       context_.output().payloads<Transfer::Read>(context_.channel().id());
314   ASSERT_EQ(payloads.size(), 1u);
315   EXPECT_EQ(transfer_status, Status::Unknown());
316 
317   Chunk c0 = DecodeChunk(payloads[0]);
318   EXPECT_EQ(c0.session_id(), 7u);
319   EXPECT_EQ(c0.offset(), 0u);
320   EXPECT_EQ(c0.window_end_offset(), 64u);
321 
322   constexpr ConstByteSpan data(kData32);
323   context_.server().SendServerStream<Transfer::Read>(
324       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
325                       .set_session_id(7)
326                       .set_offset(0)
327                       .set_payload(data.first(16))));
328   transfer_thread_.WaitUntilEventIsProcessed();
329 
330   ASSERT_EQ(payloads.size(), 1u);
331   EXPECT_EQ(transfer_status, Status::Unknown());
332 
333   // Send a chunk with an incorrect offset. The client should resend parameters.
334   context_.server().SendServerStream<Transfer::Read>(
335       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
336                       .set_session_id(7)
337                       .set_offset(8)  // wrong!
338                       .set_payload(data.subspan(16))
339                       .set_remaining_bytes(0)));
340   transfer_thread_.WaitUntilEventIsProcessed();
341 
342   ASSERT_EQ(payloads.size(), 2u);
343   EXPECT_EQ(transfer_status, Status::Unknown());
344 
345   Chunk c1 = DecodeChunk(payloads[1]);
346   EXPECT_EQ(c1.session_id(), 7u);
347   EXPECT_EQ(c1.offset(), 16u);
348   EXPECT_EQ(c1.window_end_offset(), 64u);
349 
350   // Send the correct chunk, completing the transfer.
351   context_.server().SendServerStream<Transfer::Read>(
352       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
353                       .set_session_id(7)
354                       .set_offset(16)
355                       .set_payload(data.subspan(16))
356                       .set_remaining_bytes(0)));
357   transfer_thread_.WaitUntilEventIsProcessed();
358 
359   ASSERT_EQ(payloads.size(), 3u);
360 
361   Chunk c2 = DecodeChunk(payloads[2]);
362   EXPECT_EQ(c2.session_id(), 7u);
363   ASSERT_TRUE(c2.status().has_value());
364   EXPECT_EQ(c2.status().value(), OkStatus());
365 
366   EXPECT_EQ(transfer_status, OkStatus());
367   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
368             0);
369 }
370 
TEST_F(ReadTransferMaxBytes32,TooMuchData)371 TEST_F(ReadTransferMaxBytes32, TooMuchData) {
372   stream::MemoryWriterBuffer<32> writer;
373   Status transfer_status = Status::Unknown();
374 
375   ASSERT_EQ(OkStatus(),
376             client_.Read(8, writer, [&transfer_status](Status status) {
377               transfer_status = status;
378             }));
379   transfer_thread_.WaitUntilEventIsProcessed();
380 
381   // First transfer parameters chunk is sent.
382   rpc::PayloadsView payloads =
383       context_.output().payloads<Transfer::Read>(context_.channel().id());
384   ASSERT_EQ(payloads.size(), 1u);
385   EXPECT_EQ(transfer_status, Status::Unknown());
386 
387   Chunk c0 = DecodeChunk(payloads[0]);
388   EXPECT_EQ(c0.session_id(), 8u);
389   EXPECT_EQ(c0.offset(), 0u);
390   ASSERT_EQ(c0.window_end_offset(), 32u);
391 
392   constexpr ConstByteSpan data(kData64);
393 
394   // pending_bytes == 32
395   context_.server().SendServerStream<Transfer::Read>(
396       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
397                       .set_session_id(8)
398                       .set_offset(0)
399                       .set_payload(data.first(16))));
400 
401   // pending_bytes == 16
402   context_.server().SendServerStream<Transfer::Read>(
403       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
404                       .set_session_id(8)
405                       .set_offset(16)
406                       .set_payload(data.subspan(16, 8))));
407 
408   // pending_bytes == 8, send 16 instead.
409   context_.server().SendServerStream<Transfer::Read>(
410       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
411                       .set_session_id(8)
412                       .set_offset(24)
413                       .set_payload(data.subspan(24, 16))));
414   transfer_thread_.WaitUntilEventIsProcessed();
415 
416   ASSERT_EQ(payloads.size(), 4u);
417 
418   Chunk c1 = DecodeChunk(payloads[3]);
419   EXPECT_EQ(c1.session_id(), 8u);
420   ASSERT_TRUE(c1.status().has_value());
421   EXPECT_EQ(c1.status().value(), Status::Internal());
422 
423   EXPECT_EQ(transfer_status, Status::Internal());
424 }
425 
TEST_F(ReadTransfer,ServerError)426 TEST_F(ReadTransfer, ServerError) {
427   stream::MemoryWriterBuffer<64> writer;
428   Status transfer_status = Status::Unknown();
429 
430   ASSERT_EQ(OkStatus(),
431             client_.Read(9, writer, [&transfer_status](Status status) {
432               transfer_status = status;
433             }));
434   transfer_thread_.WaitUntilEventIsProcessed();
435 
436   // First transfer parameters chunk is sent.
437   rpc::PayloadsView payloads =
438       context_.output().payloads<Transfer::Read>(context_.channel().id());
439   ASSERT_EQ(payloads.size(), 1u);
440   EXPECT_EQ(transfer_status, Status::Unknown());
441 
442   Chunk c0 = DecodeChunk(payloads[0]);
443   EXPECT_EQ(c0.session_id(), 9u);
444   EXPECT_EQ(c0.offset(), 0u);
445   ASSERT_EQ(c0.window_end_offset(), 64u);
446 
447   // Server sends an error. Client should not respond and terminate the
448   // transfer.
449   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
450       Chunk::Final(ProtocolVersion::kLegacy, 9, Status::NotFound())));
451   transfer_thread_.WaitUntilEventIsProcessed();
452 
453   ASSERT_EQ(payloads.size(), 1u);
454   EXPECT_EQ(transfer_status, Status::NotFound());
455 }
456 
TEST_F(ReadTransfer,OnlySendsParametersOnceAfterDrop)457 TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
458   stream::MemoryWriterBuffer<64> writer;
459   Status transfer_status = Status::Unknown();
460 
461   ASSERT_EQ(OkStatus(),
462             client_.Read(10, writer, [&transfer_status](Status status) {
463               transfer_status = status;
464             }));
465   transfer_thread_.WaitUntilEventIsProcessed();
466 
467   // First transfer parameters chunk is sent.
468   rpc::PayloadsView payloads =
469       context_.output().payloads<Transfer::Read>(context_.channel().id());
470   ASSERT_EQ(payloads.size(), 1u);
471   EXPECT_EQ(transfer_status, Status::Unknown());
472 
473   Chunk c0 = DecodeChunk(payloads[0]);
474   EXPECT_EQ(c0.session_id(), 10u);
475   EXPECT_EQ(c0.offset(), 0u);
476   ASSERT_EQ(c0.window_end_offset(), 64u);
477 
478   constexpr ConstByteSpan data(kData32);
479 
480   // Send the first 8 bytes of the transfer.
481   context_.server().SendServerStream<Transfer::Read>(
482       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
483                       .set_session_id(10)
484                       .set_offset(0)
485                       .set_payload(data.first(8))));
486 
487   // Skip offset 8, send the rest starting from 16.
488   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
489     context_.server().SendServerStream<Transfer::Read>(
490         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
491                         .set_session_id(10)
492                         .set_offset(offset)
493                         .set_payload(data.subspan(offset, 8))));
494   }
495   transfer_thread_.WaitUntilEventIsProcessed();
496 
497   // Only one parameters update should be sent, with the offset of the initial
498   // dropped packet.
499   ASSERT_EQ(payloads.size(), 2u);
500 
501   Chunk c1 = DecodeChunk(payloads[1]);
502   EXPECT_EQ(c1.session_id(), 10u);
503   EXPECT_EQ(c1.offset(), 8u);
504   ASSERT_EQ(c1.window_end_offset(), 64u);
505 
506   // Send the remaining data to complete the transfer.
507   context_.server().SendServerStream<Transfer::Read>(
508       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
509                       .set_session_id(10)
510                       .set_offset(8)
511                       .set_payload(data.subspan(8))
512                       .set_remaining_bytes(0)));
513   transfer_thread_.WaitUntilEventIsProcessed();
514 
515   ASSERT_EQ(payloads.size(), 3u);
516 
517   Chunk c2 = DecodeChunk(payloads[2]);
518   EXPECT_EQ(c2.session_id(), 10u);
519   ASSERT_TRUE(c2.status().has_value());
520   EXPECT_EQ(c2.status().value(), OkStatus());
521 
522   EXPECT_EQ(transfer_status, OkStatus());
523 }
524 
TEST_F(ReadTransfer,ResendsParametersIfSentRepeatedChunkDuringRecovery)525 TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
526   stream::MemoryWriterBuffer<64> writer;
527   Status transfer_status = Status::Unknown();
528 
529   ASSERT_EQ(OkStatus(),
530             client_.Read(11, writer, [&transfer_status](Status status) {
531               transfer_status = status;
532             }));
533   transfer_thread_.WaitUntilEventIsProcessed();
534 
535   // First transfer parameters chunk is sent.
536   rpc::PayloadsView payloads =
537       context_.output().payloads<Transfer::Read>(context_.channel().id());
538   ASSERT_EQ(payloads.size(), 1u);
539   EXPECT_EQ(transfer_status, Status::Unknown());
540 
541   Chunk c0 = DecodeChunk(payloads[0]);
542   EXPECT_EQ(c0.session_id(), 11u);
543   EXPECT_EQ(c0.offset(), 0u);
544   ASSERT_EQ(c0.window_end_offset(), 64u);
545 
546   constexpr ConstByteSpan data(kData32);
547 
548   // Send the first 8 bytes of the transfer.
549   context_.server().SendServerStream<Transfer::Read>(
550       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
551                       .set_session_id(11)
552                       .set_offset(0)
553                       .set_payload(data.first(8))));
554 
555   // Skip offset 8, send the rest starting from 16.
556   for (uint32_t offset = 16; offset < data.size(); offset += 8) {
557     context_.server().SendServerStream<Transfer::Read>(
558         EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
559                         .set_session_id(11)
560                         .set_offset(offset)
561                         .set_payload(data.subspan(offset, 8))));
562   }
563   transfer_thread_.WaitUntilEventIsProcessed();
564 
565   // Only one parameters update should be sent, with the offset of the initial
566   // dropped packet.
567   ASSERT_EQ(payloads.size(), 2u);
568 
569   const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
570                                .set_session_id(11)
571                                .set_offset(24)
572                                .set_payload(data.subspan(24));
573 
574   // Re-send the final chunk of the block.
575   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
576   transfer_thread_.WaitUntilEventIsProcessed();
577 
578   // The original drop parameters should be re-sent.
579   ASSERT_EQ(payloads.size(), 3u);
580   Chunk c2 = DecodeChunk(payloads[2]);
581   EXPECT_EQ(c2.session_id(), 11u);
582   EXPECT_EQ(c2.offset(), 8u);
583   ASSERT_EQ(c2.window_end_offset(), 64u);
584 
585   // Do it again.
586   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
587   transfer_thread_.WaitUntilEventIsProcessed();
588 
589   ASSERT_EQ(payloads.size(), 4u);
590   Chunk c3 = DecodeChunk(payloads[3]);
591   EXPECT_EQ(c3.session_id(), 11u);
592   EXPECT_EQ(c3.offset(), 8u);
593   ASSERT_EQ(c3.window_end_offset(), 64u);
594 
595   // Finish the transfer normally.
596   context_.server().SendServerStream<Transfer::Read>(
597       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
598                       .set_session_id(11)
599                       .set_offset(8)
600                       .set_payload(data.subspan(8))
601                       .set_remaining_bytes(0)));
602   transfer_thread_.WaitUntilEventIsProcessed();
603 
604   ASSERT_EQ(payloads.size(), 5u);
605 
606   Chunk c4 = DecodeChunk(payloads[4]);
607   EXPECT_EQ(c4.session_id(), 11u);
608   ASSERT_TRUE(c4.status().has_value());
609   EXPECT_EQ(c4.status().value(), OkStatus());
610 
611   EXPECT_EQ(transfer_status, OkStatus());
612 }
613 
614 constexpr chrono::SystemClock::duration kTestTimeout =
615     std::chrono::milliseconds(50);
616 constexpr uint8_t kTestRetries = 3;
617 
TEST_F(ReadTransfer,Timeout_ResendsCurrentParameters)618 TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
619   stream::MemoryWriterBuffer<64> writer;
620   Status transfer_status = Status::Unknown();
621 
622   ASSERT_EQ(OkStatus(),
623             client_.Read(
624                 12,
625                 writer,
626                 [&transfer_status](Status status) { transfer_status = status; },
627                 kTestTimeout));
628   transfer_thread_.WaitUntilEventIsProcessed();
629 
630   // First transfer parameters chunk is sent.
631   rpc::PayloadsView payloads =
632       context_.output().payloads<Transfer::Read>(context_.channel().id());
633   ASSERT_EQ(payloads.size(), 1u);
634   EXPECT_EQ(transfer_status, Status::Unknown());
635 
636   Chunk c0 = DecodeChunk(payloads.back());
637   EXPECT_EQ(c0.session_id(), 12u);
638   EXPECT_EQ(c0.offset(), 0u);
639   EXPECT_EQ(c0.window_end_offset(), 64u);
640   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
641 
642   // Wait for the timeout to expire without doing anything. The client should
643   // resend its initial parameters chunk.
644   transfer_thread_.SimulateClientTimeout(12);
645   ASSERT_EQ(payloads.size(), 2u);
646 
647   Chunk c = DecodeChunk(payloads.back());
648   EXPECT_EQ(c.session_id(), 12u);
649   EXPECT_EQ(c.offset(), 0u);
650   EXPECT_EQ(c.window_end_offset(), 64u);
651   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
652 
653   // Transfer has not yet completed.
654   EXPECT_EQ(transfer_status, Status::Unknown());
655 
656   // Finish the transfer following the timeout.
657   context_.server().SendServerStream<Transfer::Read>(
658       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
659                       .set_session_id(12)
660                       .set_offset(0)
661                       .set_payload(kData32)
662                       .set_remaining_bytes(0)));
663   transfer_thread_.WaitUntilEventIsProcessed();
664 
665   ASSERT_EQ(payloads.size(), 3u);
666 
667   Chunk c4 = DecodeChunk(payloads.back());
668   EXPECT_EQ(c4.session_id(), 12u);
669   ASSERT_TRUE(c4.status().has_value());
670   EXPECT_EQ(c4.status().value(), OkStatus());
671 
672   EXPECT_EQ(transfer_status, OkStatus());
673 }
674 
TEST_F(ReadTransfer,Timeout_ResendsUpdatedParameters)675 TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
676   stream::MemoryWriterBuffer<64> writer;
677   Status transfer_status = Status::Unknown();
678 
679   ASSERT_EQ(OkStatus(),
680             client_.Read(
681                 13,
682                 writer,
683                 [&transfer_status](Status status) { transfer_status = status; },
684                 kTestTimeout));
685   transfer_thread_.WaitUntilEventIsProcessed();
686 
687   // First transfer parameters chunk is sent.
688   rpc::PayloadsView payloads =
689       context_.output().payloads<Transfer::Read>(context_.channel().id());
690   ASSERT_EQ(payloads.size(), 1u);
691   EXPECT_EQ(transfer_status, Status::Unknown());
692 
693   Chunk c0 = DecodeChunk(payloads.back());
694   EXPECT_EQ(c0.session_id(), 13u);
695   EXPECT_EQ(c0.offset(), 0u);
696   EXPECT_EQ(c0.window_end_offset(), 64u);
697   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
698 
699   constexpr ConstByteSpan data(kData32);
700 
701   // Send some data, but not everything.
702   context_.server().SendServerStream<Transfer::Read>(
703       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
704                       .set_session_id(13)
705                       .set_offset(0)
706                       .set_payload(data.first(16))));
707   transfer_thread_.WaitUntilEventIsProcessed();
708 
709   ASSERT_EQ(payloads.size(), 1u);
710 
711   // Wait for the timeout to expire without sending more data. The client should
712   // send an updated parameters chunk, accounting for the data already received.
713   transfer_thread_.SimulateClientTimeout(13);
714   ASSERT_EQ(payloads.size(), 2u);
715 
716   Chunk c = DecodeChunk(payloads.back());
717   EXPECT_EQ(c.session_id(), 13u);
718   EXPECT_EQ(c.offset(), 16u);
719   EXPECT_EQ(c.window_end_offset(), 64u);
720   EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit);
721 
722   // Transfer has not yet completed.
723   EXPECT_EQ(transfer_status, Status::Unknown());
724 
725   // Send the rest of the data, finishing the transfer.
726   context_.server().SendServerStream<Transfer::Read>(
727       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
728                       .set_session_id(13)
729                       .set_offset(16)
730                       .set_payload(data.subspan(16))
731                       .set_remaining_bytes(0)));
732   transfer_thread_.WaitUntilEventIsProcessed();
733 
734   ASSERT_EQ(payloads.size(), 3u);
735 
736   Chunk c4 = DecodeChunk(payloads.back());
737   EXPECT_EQ(c4.session_id(), 13u);
738   ASSERT_TRUE(c4.status().has_value());
739   EXPECT_EQ(c4.status().value(), OkStatus());
740 
741   EXPECT_EQ(transfer_status, OkStatus());
742 }
743 
TEST_F(ReadTransfer,Timeout_EndsTransferAfterMaxRetries)744 TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
745   stream::MemoryWriterBuffer<64> writer;
746   Status transfer_status = Status::Unknown();
747 
748   ASSERT_EQ(OkStatus(),
749             client_.Read(
750                 14,
751                 writer,
752                 [&transfer_status](Status status) { transfer_status = status; },
753                 kTestTimeout));
754   transfer_thread_.WaitUntilEventIsProcessed();
755 
756   // First transfer parameters chunk is sent.
757   rpc::PayloadsView payloads =
758       context_.output().payloads<Transfer::Read>(context_.channel().id());
759   ASSERT_EQ(payloads.size(), 1u);
760   EXPECT_EQ(transfer_status, Status::Unknown());
761 
762   Chunk c0 = DecodeChunk(payloads.back());
763   EXPECT_EQ(c0.session_id(), 14u);
764   EXPECT_EQ(c0.offset(), 0u);
765   EXPECT_EQ(c0.window_end_offset(), 64u);
766   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
767 
768   for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
769     // Wait for the timeout to expire without doing anything. The client should
770     // resend its parameters chunk.
771     transfer_thread_.SimulateClientTimeout(14);
772     ASSERT_EQ(payloads.size(), retry + 1);
773 
774     Chunk c = DecodeChunk(payloads.back());
775     EXPECT_EQ(c.session_id(), 14u);
776     EXPECT_EQ(c.offset(), 0u);
777     EXPECT_EQ(c.window_end_offset(), 64u);
778 
779     // Transfer has not yet completed.
780     EXPECT_EQ(transfer_status, Status::Unknown());
781   }
782 
783   // Sleep one more time after the final retry. The client should cancel the
784   // transfer at this point. As no packets were received from the server, no
785   // final status chunk should be sent.
786   transfer_thread_.SimulateClientTimeout(14);
787   ASSERT_EQ(payloads.size(), 4u);
788 
789   EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
790 
791   // After finishing the transfer, nothing else should be sent. Verify this by
792   // waiting for a bit.
793   this_thread::sleep_for(kTestTimeout * 4);
794   ASSERT_EQ(payloads.size(), 4u);
795 }
796 
TEST_F(ReadTransfer,Timeout_ReceivingDataResetsRetryCount)797 TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
798   stream::MemoryWriterBuffer<64> writer;
799   Status transfer_status = Status::Unknown();
800 
801   constexpr ConstByteSpan data(kData32);
802 
803   ASSERT_EQ(OkStatus(),
804             client_.Read(
805                 14,
806                 writer,
807                 [&transfer_status](Status status) { transfer_status = status; },
808                 kTestTimeout));
809   transfer_thread_.WaitUntilEventIsProcessed();
810 
811   // First transfer parameters chunk is sent.
812   rpc::PayloadsView payloads =
813       context_.output().payloads<Transfer::Read>(context_.channel().id());
814   ASSERT_EQ(payloads.size(), 1u);
815   EXPECT_EQ(transfer_status, Status::Unknown());
816 
817   Chunk c0 = DecodeChunk(payloads.back());
818   EXPECT_EQ(c0.session_id(), 14u);
819   EXPECT_EQ(c0.offset(), 0u);
820   EXPECT_EQ(c0.window_end_offset(), 64u);
821 
822   // Simulate one less timeout than the maximum amount of retries.
823   for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) {
824     transfer_thread_.SimulateClientTimeout(14);
825     ASSERT_EQ(payloads.size(), retry + 1);
826 
827     Chunk c = DecodeChunk(payloads.back());
828     EXPECT_EQ(c.session_id(), 14u);
829     EXPECT_EQ(c.offset(), 0u);
830     EXPECT_EQ(c.window_end_offset(), 64u);
831 
832     // Transfer has not yet completed.
833     EXPECT_EQ(transfer_status, Status::Unknown());
834   }
835 
836   // Send some data.
837   context_.server().SendServerStream<Transfer::Read>(
838       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
839                       .set_session_id(14)
840                       .set_offset(0)
841                       .set_payload(data.first(16))));
842   transfer_thread_.WaitUntilEventIsProcessed();
843   ASSERT_EQ(payloads.size(), 3u);
844 
845   // Time out a couple more times. The context's retry count should have been
846   // reset, so it should go through the standard retry flow instead of
847   // terminating the transfer.
848   transfer_thread_.SimulateClientTimeout(14);
849   ASSERT_EQ(payloads.size(), 4u);
850 
851   Chunk c = DecodeChunk(payloads.back());
852   EXPECT_FALSE(c.status().has_value());
853   EXPECT_EQ(c.session_id(), 14u);
854   EXPECT_EQ(c.offset(), 16u);
855   EXPECT_EQ(c.window_end_offset(), 64u);
856 
857   transfer_thread_.SimulateClientTimeout(14);
858   ASSERT_EQ(payloads.size(), 5u);
859 
860   c = DecodeChunk(payloads.back());
861   EXPECT_FALSE(c.status().has_value());
862   EXPECT_EQ(c.session_id(), 14u);
863   EXPECT_EQ(c.offset(), 16u);
864   EXPECT_EQ(c.window_end_offset(), 64u);
865 
866   // Ensure we don't leave a dangling reference to transfer_status.
867   client_.CancelTransfer(14);
868   transfer_thread_.WaitUntilEventIsProcessed();
869 }
870 
TEST_F(ReadTransfer,InitialPacketFails_OnCompletedCalledWithDataLoss)871 TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
872   stream::MemoryWriterBuffer<64> writer;
873   Status transfer_status = Status::Unknown();
874 
875   context_.output().set_send_status(Status::Unauthenticated());
876 
877   ASSERT_EQ(OkStatus(),
878             client_.Read(
879                 14,
880                 writer,
881                 [&transfer_status](Status status) {
882                   ASSERT_EQ(transfer_status,
883                             Status::Unknown());  // Must only call once
884                   transfer_status = status;
885                 },
886                 kTestTimeout));
887   transfer_thread_.WaitUntilEventIsProcessed();
888 
889   EXPECT_EQ(transfer_status, Status::Internal());
890 }
891 
892 class WriteTransfer : public ::testing::Test {
893  protected:
WriteTransfer()894   WriteTransfer()
895       : transfer_thread_(chunk_buffer_, encode_buffer_),
896         client_(context_.client(), context_.channel().id(), transfer_thread_),
897         system_thread_(TransferThreadOptions(), transfer_thread_) {}
898 
~WriteTransfer()899   ~WriteTransfer() override {
900     transfer_thread_.Terminate();
901     system_thread_.join();
902   }
903 
904   rpc::RawClientTestContext<> context_;
905 
906   Thread<1, 1> transfer_thread_;
907   Client client_;
908 
909   std::array<std::byte, 64> chunk_buffer_;
910   std::array<std::byte, 64> encode_buffer_;
911 
912   thread::Thread system_thread_;
913 };
914 
TEST_F(WriteTransfer,SingleChunk)915 TEST_F(WriteTransfer, SingleChunk) {
916   stream::MemoryReader reader(kData32);
917   Status transfer_status = Status::Unknown();
918 
919   ASSERT_EQ(OkStatus(),
920             client_.Write(3, reader, [&transfer_status](Status status) {
921               transfer_status = status;
922             }));
923   transfer_thread_.WaitUntilEventIsProcessed();
924 
925   // The client begins by sending the ID of the resource to transfer.
926   rpc::PayloadsView payloads =
927       context_.output().payloads<Transfer::Write>(context_.channel().id());
928   ASSERT_EQ(payloads.size(), 1u);
929   EXPECT_EQ(transfer_status, Status::Unknown());
930 
931   Chunk c0 = DecodeChunk(payloads[0]);
932   EXPECT_EQ(c0.session_id(), 3u);
933   EXPECT_EQ(c0.resource_id(), 3u);
934   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
935 
936   // Send transfer parameters. Client should send a data chunk and the final
937   // chunk.
938   rpc::test::WaitForPackets(context_.output(), 2, [this] {
939     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
940         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
941             .set_session_id(3)
942             .set_offset(0)
943             .set_window_end_offset(64)
944             .set_max_chunk_size_bytes(32)));
945   });
946 
947   ASSERT_EQ(payloads.size(), 3u);
948 
949   Chunk c1 = DecodeChunk(payloads[1]);
950   EXPECT_EQ(c1.session_id(), 3u);
951   EXPECT_EQ(c1.offset(), 0u);
952   EXPECT_TRUE(c1.has_payload());
953   EXPECT_EQ(
954       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
955 
956   Chunk c2 = DecodeChunk(payloads[2]);
957   EXPECT_EQ(c2.session_id(), 3u);
958   ASSERT_TRUE(c2.remaining_bytes().has_value());
959   EXPECT_EQ(c2.remaining_bytes().value(), 0u);
960 
961   EXPECT_EQ(transfer_status, Status::Unknown());
962 
963   // Send the final status chunk to complete the transfer.
964   context_.server().SendServerStream<Transfer::Write>(
965       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
966   transfer_thread_.WaitUntilEventIsProcessed();
967 
968   EXPECT_EQ(payloads.size(), 3u);
969   EXPECT_EQ(transfer_status, OkStatus());
970 }
971 
TEST_F(WriteTransfer,MultiChunk)972 TEST_F(WriteTransfer, MultiChunk) {
973   stream::MemoryReader reader(kData32);
974   Status transfer_status = Status::Unknown();
975 
976   ASSERT_EQ(OkStatus(),
977             client_.Write(4, reader, [&transfer_status](Status status) {
978               transfer_status = status;
979             }));
980   transfer_thread_.WaitUntilEventIsProcessed();
981 
982   // The client begins by sending the ID of the resource to transfer.
983   rpc::PayloadsView payloads =
984       context_.output().payloads<Transfer::Write>(context_.channel().id());
985   ASSERT_EQ(payloads.size(), 1u);
986   EXPECT_EQ(transfer_status, Status::Unknown());
987 
988   Chunk c0 = DecodeChunk(payloads[0]);
989   EXPECT_EQ(c0.session_id(), 4u);
990   EXPECT_EQ(c0.resource_id(), 4u);
991   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
992 
993   // Send transfer parameters with a chunk size smaller than the data.
994 
995   // Client should send two data chunks and the final chunk.
996   rpc::test::WaitForPackets(context_.output(), 3, [this] {
997     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
998         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
999             .set_session_id(4)
1000             .set_offset(0)
1001             .set_window_end_offset(64)
1002             .set_max_chunk_size_bytes(16)));
1003   });
1004 
1005   ASSERT_EQ(payloads.size(), 4u);
1006 
1007   Chunk c1 = DecodeChunk(payloads[1]);
1008   EXPECT_EQ(c1.session_id(), 4u);
1009   EXPECT_EQ(c1.offset(), 0u);
1010   EXPECT_TRUE(c1.has_payload());
1011   EXPECT_EQ(
1012       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1013 
1014   Chunk c2 = DecodeChunk(payloads[2]);
1015   EXPECT_EQ(c2.session_id(), 4u);
1016   EXPECT_EQ(c2.offset(), 16u);
1017   EXPECT_TRUE(c2.has_payload());
1018   EXPECT_EQ(std::memcmp(c2.payload().data(),
1019                         kData32.data() + c2.offset(),
1020                         c2.payload().size()),
1021             0);
1022 
1023   Chunk c3 = DecodeChunk(payloads[3]);
1024   EXPECT_EQ(c3.session_id(), 4u);
1025   ASSERT_TRUE(c3.remaining_bytes().has_value());
1026   EXPECT_EQ(c3.remaining_bytes().value(), 0u);
1027 
1028   EXPECT_EQ(transfer_status, Status::Unknown());
1029 
1030   // Send the final status chunk to complete the transfer.
1031   context_.server().SendServerStream<Transfer::Write>(
1032       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 4, OkStatus())));
1033   transfer_thread_.WaitUntilEventIsProcessed();
1034 
1035   EXPECT_EQ(payloads.size(), 4u);
1036   EXPECT_EQ(transfer_status, OkStatus());
1037 }
1038 
TEST_F(WriteTransfer,OutOfOrder_SeekSupported)1039 TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
1040   stream::MemoryReader reader(kData32);
1041   Status transfer_status = Status::Unknown();
1042 
1043   ASSERT_EQ(OkStatus(),
1044             client_.Write(5, reader, [&transfer_status](Status status) {
1045               transfer_status = status;
1046             }));
1047   transfer_thread_.WaitUntilEventIsProcessed();
1048 
1049   // The client begins by sending the ID of the resource to transfer.
1050   rpc::PayloadsView payloads =
1051       context_.output().payloads<Transfer::Write>(context_.channel().id());
1052   ASSERT_EQ(payloads.size(), 1u);
1053   EXPECT_EQ(transfer_status, Status::Unknown());
1054 
1055   Chunk c0 = DecodeChunk(payloads[0]);
1056   EXPECT_EQ(c0.session_id(), 5u);
1057   EXPECT_EQ(c0.resource_id(), 5u);
1058   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1059 
1060   // Send transfer parameters with a nonzero offset, requesting a seek.
1061   // Client should send a data chunk and the final chunk.
1062   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1063     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1064         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1065             .set_session_id(5)
1066             .set_offset(16)
1067             .set_window_end_offset(64)
1068             .set_max_chunk_size_bytes(32)));
1069   });
1070 
1071   ASSERT_EQ(payloads.size(), 3u);
1072 
1073   Chunk c1 = DecodeChunk(payloads[1]);
1074   EXPECT_EQ(c1.session_id(), 5u);
1075   EXPECT_EQ(c1.offset(), 16u);
1076   EXPECT_TRUE(c1.has_payload());
1077   EXPECT_EQ(std::memcmp(c1.payload().data(),
1078                         kData32.data() + c1.offset(),
1079                         c1.payload().size()),
1080             0);
1081 
1082   Chunk c2 = DecodeChunk(payloads[2]);
1083   EXPECT_EQ(c2.session_id(), 5u);
1084   ASSERT_TRUE(c2.remaining_bytes().has_value());
1085   EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1086 
1087   EXPECT_EQ(transfer_status, Status::Unknown());
1088 
1089   // Send the final status chunk to complete the transfer.
1090   context_.server().SendServerStream<Transfer::Write>(
1091       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 5, OkStatus())));
1092   transfer_thread_.WaitUntilEventIsProcessed();
1093 
1094   EXPECT_EQ(payloads.size(), 3u);
1095   EXPECT_EQ(transfer_status, OkStatus());
1096 }
1097 
1098 class FakeNonSeekableReader final : public stream::NonSeekableReader {
1099  public:
FakeNonSeekableReader(ConstByteSpan data)1100   FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
1101 
1102  private:
DoRead(ByteSpan out)1103   StatusWithSize DoRead(ByteSpan out) final {
1104     if (position_ == data_.size()) {
1105       return StatusWithSize::OutOfRange();
1106     }
1107 
1108     size_t to_copy = std::min(out.size(), data_.size() - position_);
1109     std::memcpy(out.data(), data_.data() + position_, to_copy);
1110     position_ += to_copy;
1111 
1112     return StatusWithSize(to_copy);
1113   }
1114 
1115   ConstByteSpan data_;
1116   size_t position_;
1117 };
1118 
TEST_F(WriteTransfer,OutOfOrder_SeekNotSupported)1119 TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
1120   FakeNonSeekableReader reader(kData32);
1121   Status transfer_status = Status::Unknown();
1122 
1123   ASSERT_EQ(OkStatus(),
1124             client_.Write(6, reader, [&transfer_status](Status status) {
1125               transfer_status = status;
1126             }));
1127   transfer_thread_.WaitUntilEventIsProcessed();
1128 
1129   // The client begins by sending the ID of the resource to transfer.
1130   rpc::PayloadsView payloads =
1131       context_.output().payloads<Transfer::Write>(context_.channel().id());
1132   ASSERT_EQ(payloads.size(), 1u);
1133   EXPECT_EQ(transfer_status, Status::Unknown());
1134 
1135   Chunk c0 = DecodeChunk(payloads[0]);
1136   EXPECT_EQ(c0.session_id(), 6u);
1137   EXPECT_EQ(c0.resource_id(), 6u);
1138   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1139 
1140   // Send transfer parameters with a nonzero offset, requesting a seek.
1141   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1142       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1143           .set_session_id(6)
1144           .set_offset(16)
1145           .set_window_end_offset(64)
1146           .set_max_chunk_size_bytes(32)));
1147   transfer_thread_.WaitUntilEventIsProcessed();
1148 
1149   // Client should send a status chunk and end the transfer.
1150   ASSERT_EQ(payloads.size(), 2u);
1151 
1152   Chunk c1 = DecodeChunk(payloads[1]);
1153   EXPECT_EQ(c1.session_id(), 6u);
1154   EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
1155   ASSERT_TRUE(c1.status().has_value());
1156   EXPECT_EQ(c1.status().value(), Status::Unimplemented());
1157 
1158   EXPECT_EQ(transfer_status, Status::Unimplemented());
1159 }
1160 
TEST_F(WriteTransfer,ServerError)1161 TEST_F(WriteTransfer, ServerError) {
1162   stream::MemoryReader reader(kData32);
1163   Status transfer_status = Status::Unknown();
1164 
1165   ASSERT_EQ(OkStatus(),
1166             client_.Write(7, reader, [&transfer_status](Status status) {
1167               transfer_status = status;
1168             }));
1169   transfer_thread_.WaitUntilEventIsProcessed();
1170 
1171   // The client begins by sending the ID of the resource to transfer.
1172   rpc::PayloadsView payloads =
1173       context_.output().payloads<Transfer::Write>(context_.channel().id());
1174   ASSERT_EQ(payloads.size(), 1u);
1175   EXPECT_EQ(transfer_status, Status::Unknown());
1176 
1177   Chunk c0 = DecodeChunk(payloads[0]);
1178   EXPECT_EQ(c0.session_id(), 7u);
1179   EXPECT_EQ(c0.resource_id(), 7u);
1180   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1181 
1182   // Send an error from the server.
1183   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1184       Chunk::Final(ProtocolVersion::kLegacy, 7, Status::NotFound())));
1185   transfer_thread_.WaitUntilEventIsProcessed();
1186 
1187   // Client should not respond and terminate the transfer.
1188   EXPECT_EQ(payloads.size(), 1u);
1189   EXPECT_EQ(transfer_status, Status::NotFound());
1190 }
1191 
TEST_F(WriteTransfer,AbortIfZeroBytesAreRequested)1192 TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
1193   stream::MemoryReader reader(kData32);
1194   Status transfer_status = Status::Unknown();
1195 
1196   ASSERT_EQ(OkStatus(),
1197             client_.Write(9, reader, [&transfer_status](Status status) {
1198               transfer_status = status;
1199             }));
1200   transfer_thread_.WaitUntilEventIsProcessed();
1201 
1202   // The client begins by sending the ID of the resource to transfer.
1203   rpc::PayloadsView payloads =
1204       context_.output().payloads<Transfer::Write>(context_.channel().id());
1205   ASSERT_EQ(payloads.size(), 1u);
1206   EXPECT_EQ(transfer_status, Status::Unknown());
1207 
1208   Chunk c0 = DecodeChunk(payloads[0]);
1209   EXPECT_EQ(c0.session_id(), 9u);
1210   EXPECT_EQ(c0.resource_id(), 9u);
1211   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1212 
1213   // Send an invalid transfer parameters chunk with 0 pending bytes.
1214   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1215       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1216           .set_session_id(9)
1217           .set_offset(0)
1218           .set_window_end_offset(0)
1219           .set_max_chunk_size_bytes(32)));
1220   transfer_thread_.WaitUntilEventIsProcessed();
1221 
1222   // Client should send a status chunk and end the transfer.
1223   ASSERT_EQ(payloads.size(), 2u);
1224 
1225   Chunk c1 = DecodeChunk(payloads[1]);
1226   EXPECT_EQ(c1.session_id(), 9u);
1227   ASSERT_TRUE(c1.status().has_value());
1228   EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
1229 
1230   EXPECT_EQ(transfer_status, Status::ResourceExhausted());
1231 }
1232 
TEST_F(WriteTransfer,Timeout_RetriesWithInitialChunk)1233 TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
1234   stream::MemoryReader reader(kData32);
1235   Status transfer_status = Status::Unknown();
1236 
1237   ASSERT_EQ(OkStatus(),
1238             client_.Write(
1239                 10,
1240                 reader,
1241                 [&transfer_status](Status status) { transfer_status = status; },
1242                 kTestTimeout));
1243   transfer_thread_.WaitUntilEventIsProcessed();
1244 
1245   // The client begins by sending the ID of the resource to transfer.
1246   rpc::PayloadsView payloads =
1247       context_.output().payloads<Transfer::Write>(context_.channel().id());
1248   ASSERT_EQ(payloads.size(), 1u);
1249   EXPECT_EQ(transfer_status, Status::Unknown());
1250 
1251   Chunk c0 = DecodeChunk(payloads.back());
1252   EXPECT_EQ(c0.session_id(), 10u);
1253   EXPECT_EQ(c0.resource_id(), 10u);
1254   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1255 
1256   // Wait for the timeout to expire without doing anything. The client should
1257   // resend the initial transmit chunk.
1258   transfer_thread_.SimulateClientTimeout(10);
1259   ASSERT_EQ(payloads.size(), 2u);
1260 
1261   Chunk c = DecodeChunk(payloads.back());
1262   EXPECT_EQ(c.session_id(), 10u);
1263   EXPECT_EQ(c.resource_id(), 10u);
1264   EXPECT_EQ(c.type(), Chunk::Type::kStart);
1265 
1266   // Transfer has not yet completed.
1267   EXPECT_EQ(transfer_status, Status::Unknown());
1268 
1269   // Ensure we don't leave a dangling reference to transfer_status.
1270   client_.CancelTransfer(10);
1271   transfer_thread_.WaitUntilEventIsProcessed();
1272 }
1273 
TEST_F(WriteTransfer,Timeout_RetriesWithMostRecentChunk)1274 TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
1275   stream::MemoryReader reader(kData32);
1276   Status transfer_status = Status::Unknown();
1277 
1278   ASSERT_EQ(OkStatus(),
1279             client_.Write(
1280                 11,
1281                 reader,
1282                 [&transfer_status](Status status) { transfer_status = status; },
1283                 kTestTimeout));
1284   transfer_thread_.WaitUntilEventIsProcessed();
1285 
1286   // The client begins by sending the ID of the resource to transfer.
1287   rpc::PayloadsView payloads =
1288       context_.output().payloads<Transfer::Write>(context_.channel().id());
1289   ASSERT_EQ(payloads.size(), 1u);
1290   EXPECT_EQ(transfer_status, Status::Unknown());
1291 
1292   Chunk c0 = DecodeChunk(payloads.back());
1293   EXPECT_EQ(c0.session_id(), 11u);
1294   EXPECT_EQ(c0.resource_id(), 11u);
1295   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1296 
1297   // Send the first parameters chunk.
1298   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1299     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1300         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1301             .set_session_id(11)
1302             .set_offset(0)
1303             .set_window_end_offset(16)
1304             .set_max_chunk_size_bytes(8)));
1305   });
1306   ASSERT_EQ(payloads.size(), 3u);
1307 
1308   EXPECT_EQ(transfer_status, Status::Unknown());
1309 
1310   Chunk c1 = DecodeChunk(payloads[1]);
1311   EXPECT_EQ(c1.session_id(), 11u);
1312   EXPECT_EQ(c1.offset(), 0u);
1313   EXPECT_EQ(c1.payload().size(), 8u);
1314   EXPECT_EQ(
1315       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1316 
1317   Chunk c2 = DecodeChunk(payloads[2]);
1318   EXPECT_EQ(c2.session_id(), 11u);
1319   EXPECT_EQ(c2.offset(), 8u);
1320   EXPECT_EQ(c2.payload().size(), 8u);
1321   EXPECT_EQ(std::memcmp(c2.payload().data(),
1322                         kData32.data() + c2.offset(),
1323                         c2.payload().size()),
1324             0);
1325 
1326   // Wait for the timeout to expire without doing anything. The client should
1327   // resend the most recently sent chunk.
1328   transfer_thread_.SimulateClientTimeout(11);
1329   ASSERT_EQ(payloads.size(), 4u);
1330 
1331   Chunk c3 = DecodeChunk(payloads[3]);
1332   EXPECT_EQ(c3.session_id(), c2.session_id());
1333   EXPECT_EQ(c3.offset(), c2.offset());
1334   EXPECT_EQ(c3.payload().size(), c2.payload().size());
1335   EXPECT_EQ(std::memcmp(
1336                 c3.payload().data(), c2.payload().data(), c3.payload().size()),
1337             0);
1338 
1339   // Transfer has not yet completed.
1340   EXPECT_EQ(transfer_status, Status::Unknown());
1341 
1342   // Ensure we don't leave a dangling reference to transfer_status.
1343   client_.CancelTransfer(11);
1344   transfer_thread_.WaitUntilEventIsProcessed();
1345 }
1346 
TEST_F(WriteTransfer,Timeout_RetriesWithSingleChunkTransfer)1347 TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
1348   stream::MemoryReader reader(kData32);
1349   Status transfer_status = Status::Unknown();
1350 
1351   ASSERT_EQ(OkStatus(),
1352             client_.Write(
1353                 12,
1354                 reader,
1355                 [&transfer_status](Status status) { transfer_status = status; },
1356                 kTestTimeout));
1357   transfer_thread_.WaitUntilEventIsProcessed();
1358 
1359   // The client begins by sending the ID of the resource to transfer.
1360   rpc::PayloadsView payloads =
1361       context_.output().payloads<Transfer::Write>(context_.channel().id());
1362   ASSERT_EQ(payloads.size(), 1u);
1363   EXPECT_EQ(transfer_status, Status::Unknown());
1364 
1365   Chunk c0 = DecodeChunk(payloads.back());
1366   EXPECT_EQ(c0.session_id(), 12u);
1367   EXPECT_EQ(c0.resource_id(), 12u);
1368   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1369 
1370   // Send the first parameters chunk, requesting all the data. The client should
1371   // respond with one data chunk and a remaining_bytes = 0 chunk.
1372   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1373     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1374         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1375             .set_session_id(12)
1376             .set_offset(0)
1377             .set_window_end_offset(64)
1378             .set_max_chunk_size_bytes(64)));
1379   });
1380   ASSERT_EQ(payloads.size(), 3u);
1381 
1382   EXPECT_EQ(transfer_status, Status::Unknown());
1383 
1384   Chunk c1 = DecodeChunk(payloads[1]);
1385   EXPECT_EQ(c1.session_id(), 12u);
1386   EXPECT_EQ(c1.offset(), 0u);
1387   EXPECT_EQ(c1.payload().size(), 32u);
1388   EXPECT_EQ(
1389       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1390 
1391   Chunk c2 = DecodeChunk(payloads[2]);
1392   EXPECT_EQ(c2.session_id(), 12u);
1393   ASSERT_TRUE(c2.remaining_bytes().has_value());
1394   EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1395 
1396   // Wait for the timeout to expire without doing anything. The client should
1397   // resend the data chunk.
1398   transfer_thread_.SimulateClientTimeout(12);
1399   ASSERT_EQ(payloads.size(), 4u);
1400 
1401   Chunk c3 = DecodeChunk(payloads[3]);
1402   EXPECT_EQ(c3.session_id(), c1.session_id());
1403   EXPECT_EQ(c3.offset(), c1.offset());
1404   EXPECT_EQ(c3.payload().size(), c1.payload().size());
1405   EXPECT_EQ(std::memcmp(
1406                 c3.payload().data(), c1.payload().data(), c3.payload().size()),
1407             0);
1408 
1409   // The remaining_bytes = 0 chunk should be resent on the next parameters.
1410   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1411       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1412           .set_session_id(12)
1413           .set_offset(32)
1414           .set_window_end_offset(64)));
1415   transfer_thread_.WaitUntilEventIsProcessed();
1416 
1417   ASSERT_EQ(payloads.size(), 5u);
1418 
1419   Chunk c4 = DecodeChunk(payloads[4]);
1420   EXPECT_EQ(c4.session_id(), 12u);
1421   ASSERT_TRUE(c4.remaining_bytes().has_value());
1422   EXPECT_EQ(c4.remaining_bytes().value(), 0u);
1423 
1424   context_.server().SendServerStream<Transfer::Write>(
1425       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 12, OkStatus())));
1426   transfer_thread_.WaitUntilEventIsProcessed();
1427 
1428   EXPECT_EQ(transfer_status, OkStatus());
1429 }
1430 
TEST_F(WriteTransfer,Timeout_EndsTransferAfterMaxRetries)1431 TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
1432   stream::MemoryReader reader(kData32);
1433   Status transfer_status = Status::Unknown();
1434 
1435   ASSERT_EQ(OkStatus(),
1436             client_.Write(
1437                 13,
1438                 reader,
1439                 [&transfer_status](Status status) { transfer_status = status; },
1440                 kTestTimeout));
1441   transfer_thread_.WaitUntilEventIsProcessed();
1442 
1443   // The client begins by sending the ID of the resource to transfer.
1444   rpc::PayloadsView payloads =
1445       context_.output().payloads<Transfer::Write>(context_.channel().id());
1446   ASSERT_EQ(payloads.size(), 1u);
1447   EXPECT_EQ(transfer_status, Status::Unknown());
1448 
1449   Chunk c0 = DecodeChunk(payloads.back());
1450   EXPECT_EQ(c0.session_id(), 13u);
1451   EXPECT_EQ(c0.resource_id(), 13u);
1452   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1453 
1454   for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
1455     // Wait for the timeout to expire without doing anything. The client should
1456     // resend the initial transmit chunk.
1457     transfer_thread_.SimulateClientTimeout(13);
1458     ASSERT_EQ(payloads.size(), retry + 1);
1459 
1460     Chunk c = DecodeChunk(payloads.back());
1461     EXPECT_EQ(c.session_id(), 13u);
1462     EXPECT_EQ(c.resource_id(), 13u);
1463     EXPECT_EQ(c.type(), Chunk::Type::kStart);
1464 
1465     // Transfer has not yet completed.
1466     EXPECT_EQ(transfer_status, Status::Unknown());
1467   }
1468 
1469   // Sleep one more time after the final retry. The client should cancel the
1470   // transfer at this point. As no packets were received from the server, no
1471   // final status chunk should be sent.
1472   transfer_thread_.SimulateClientTimeout(13);
1473   ASSERT_EQ(payloads.size(), 4u);
1474 
1475   EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1476 
1477   // After finishing the transfer, nothing else should be sent. Verify this by
1478   // waiting for a bit.
1479   this_thread::sleep_for(kTestTimeout * 4);
1480   ASSERT_EQ(payloads.size(), 4u);
1481 
1482   // Ensure we don't leave a dangling reference to transfer_status.
1483   client_.CancelTransfer(13);
1484   transfer_thread_.WaitUntilEventIsProcessed();
1485 }
1486 
TEST_F(WriteTransfer,Timeout_NonSeekableReaderEndsTransfer)1487 TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
1488   FakeNonSeekableReader reader(kData32);
1489   Status transfer_status = Status::Unknown();
1490 
1491   ASSERT_EQ(OkStatus(),
1492             client_.Write(
1493                 14,
1494                 reader,
1495                 [&transfer_status](Status status) { transfer_status = status; },
1496                 kTestTimeout));
1497   transfer_thread_.WaitUntilEventIsProcessed();
1498 
1499   // The client begins by sending the ID of the resource to transfer.
1500   rpc::PayloadsView payloads =
1501       context_.output().payloads<Transfer::Write>(context_.channel().id());
1502   ASSERT_EQ(payloads.size(), 1u);
1503   EXPECT_EQ(transfer_status, Status::Unknown());
1504 
1505   Chunk c0 = DecodeChunk(payloads.back());
1506   EXPECT_EQ(c0.session_id(), 14u);
1507   EXPECT_EQ(c0.resource_id(), 14u);
1508   EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1509 
1510   // Send the first parameters chunk.
1511   rpc::test::WaitForPackets(context_.output(), 2, [this] {
1512     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1513         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1514             .set_session_id(14)
1515             .set_offset(0)
1516             .set_window_end_offset(16)
1517             .set_max_chunk_size_bytes(8)));
1518   });
1519   ASSERT_EQ(payloads.size(), 3u);
1520 
1521   EXPECT_EQ(transfer_status, Status::Unknown());
1522 
1523   Chunk c1 = DecodeChunk(payloads[1]);
1524   EXPECT_EQ(c1.session_id(), 14u);
1525   EXPECT_EQ(c1.offset(), 0u);
1526   EXPECT_TRUE(c1.has_payload());
1527   EXPECT_EQ(c1.payload().size(), 8u);
1528   EXPECT_EQ(
1529       std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1530 
1531   Chunk c2 = DecodeChunk(payloads[2]);
1532   EXPECT_EQ(c2.session_id(), 14u);
1533   EXPECT_EQ(c2.offset(), 8u);
1534   EXPECT_TRUE(c2.has_payload());
1535   EXPECT_EQ(c2.payload().size(), 8u);
1536   EXPECT_EQ(std::memcmp(c2.payload().data(),
1537                         kData32.data() + c2.offset(),
1538                         c2.payload().size()),
1539             0);
1540 
1541   // Wait for the timeout to expire without doing anything. The client should
1542   // fail to seek back and end the transfer.
1543   transfer_thread_.SimulateClientTimeout(14);
1544   ASSERT_EQ(payloads.size(), 4u);
1545 
1546   Chunk c3 = DecodeChunk(payloads[3]);
1547   EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy);
1548   EXPECT_EQ(c3.session_id(), 14u);
1549   ASSERT_TRUE(c3.status().has_value());
1550   EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());
1551 
1552   EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1553 
1554   // Ensure we don't leave a dangling reference to transfer_status.
1555   client_.CancelTransfer(14);
1556   transfer_thread_.WaitUntilEventIsProcessed();
1557 }
1558 
TEST_F(WriteTransfer,ManualCancel)1559 TEST_F(WriteTransfer, ManualCancel) {
1560   stream::MemoryReader reader(kData32);
1561   Status transfer_status = Status::Unknown();
1562 
1563   ASSERT_EQ(OkStatus(),
1564             client_.Write(
1565                 15,
1566                 reader,
1567                 [&transfer_status](Status status) { transfer_status = status; },
1568                 kTestTimeout));
1569   transfer_thread_.WaitUntilEventIsProcessed();
1570 
1571   // The client begins by sending the ID of the resource to transfer.
1572   rpc::PayloadsView payloads =
1573       context_.output().payloads<Transfer::Write>(context_.channel().id());
1574   ASSERT_EQ(payloads.size(), 1u);
1575   EXPECT_EQ(transfer_status, Status::Unknown());
1576 
1577   Chunk chunk = DecodeChunk(payloads.back());
1578   EXPECT_EQ(chunk.session_id(), 15u);
1579   EXPECT_EQ(chunk.resource_id(), 15u);
1580   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1581 
1582   // Get a response from the server, then cancel the transfer.
1583   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1584       Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1585           .set_session_id(15)
1586           .set_offset(0)
1587           .set_window_end_offset(64)
1588           .set_max_chunk_size_bytes(32)));
1589   transfer_thread_.WaitUntilEventIsProcessed();
1590   ASSERT_EQ(payloads.size(), 2u);
1591 
1592   client_.CancelTransfer(15);
1593   transfer_thread_.WaitUntilEventIsProcessed();
1594 
1595   // Client should send a cancellation chunk to the server.
1596   ASSERT_EQ(payloads.size(), 3u);
1597   chunk = DecodeChunk(payloads.back());
1598   EXPECT_EQ(chunk.session_id(), 15u);
1599   ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
1600   EXPECT_EQ(chunk.status().value(), Status::Cancelled());
1601 
1602   EXPECT_EQ(transfer_status, Status::Cancelled());
1603 }
1604 
TEST_F(WriteTransfer,ManualCancel_NoContact)1605 TEST_F(WriteTransfer, ManualCancel_NoContact) {
1606   stream::MemoryReader reader(kData32);
1607   Status transfer_status = Status::Unknown();
1608 
1609   ASSERT_EQ(OkStatus(),
1610             client_.Write(
1611                 15,
1612                 reader,
1613                 [&transfer_status](Status status) { transfer_status = status; },
1614                 kTestTimeout));
1615   transfer_thread_.WaitUntilEventIsProcessed();
1616 
1617   // The client begins by sending the ID of the resource to transfer.
1618   rpc::PayloadsView payloads =
1619       context_.output().payloads<Transfer::Write>(context_.channel().id());
1620   ASSERT_EQ(payloads.size(), 1u);
1621   EXPECT_EQ(transfer_status, Status::Unknown());
1622 
1623   Chunk chunk = DecodeChunk(payloads.back());
1624   EXPECT_EQ(chunk.session_id(), 15u);
1625   EXPECT_EQ(chunk.resource_id(), 15u);
1626   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1627 
1628   // Cancel transfer without a server response. No final chunk should be sent.
1629   client_.CancelTransfer(15);
1630   transfer_thread_.WaitUntilEventIsProcessed();
1631 
1632   ASSERT_EQ(payloads.size(), 1u);
1633 
1634   EXPECT_EQ(transfer_status, Status::Cancelled());
1635 }
1636 
TEST_F(ReadTransfer,Version2_SingleChunk)1637 TEST_F(ReadTransfer, Version2_SingleChunk) {
1638   stream::MemoryWriterBuffer<64> writer;
1639   Status transfer_status = Status::Unknown();
1640 
1641   ASSERT_EQ(OkStatus(),
1642             client_.Read(
1643                 3,
1644                 writer,
1645                 [&transfer_status](Status status) { transfer_status = status; },
1646                 cfg::kDefaultChunkTimeout,
1647                 ProtocolVersion::kVersionTwo));
1648 
1649   transfer_thread_.WaitUntilEventIsProcessed();
1650 
1651   // Initial chunk of the transfer is sent. This chunk should contain all the
1652   // fields from both legacy and version 2 protocols for backwards
1653   // compatibility.
1654   rpc::PayloadsView payloads =
1655       context_.output().payloads<Transfer::Read>(context_.channel().id());
1656   ASSERT_EQ(payloads.size(), 1u);
1657   EXPECT_EQ(transfer_status, Status::Unknown());
1658 
1659   Chunk chunk = DecodeChunk(payloads[0]);
1660   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1661   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1662   EXPECT_EQ(chunk.session_id(), 3u);
1663   EXPECT_EQ(chunk.resource_id(), 3u);
1664   EXPECT_EQ(chunk.offset(), 0u);
1665   EXPECT_EQ(chunk.window_end_offset(), 64u);
1666   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1667 
1668   // The server responds with a START_ACK, continuing the version 2 handshake
1669   // and assigning a session_id to the transfer.
1670   context_.server().SendServerStream<Transfer::Read>(
1671       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1672                       .set_session_id(29)
1673                       .set_resource_id(3)));
1674   transfer_thread_.WaitUntilEventIsProcessed();
1675 
1676   ASSERT_EQ(payloads.size(), 2u);
1677 
1678   // Client should accept the session_id with a START_ACK_CONFIRMATION,
1679   // additionally containing the initial parameters for the read transfer.
1680   chunk = DecodeChunk(payloads.back());
1681   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1682   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1683   EXPECT_EQ(chunk.session_id(), 29u);
1684   EXPECT_FALSE(chunk.resource_id().has_value());
1685   EXPECT_EQ(chunk.offset(), 0u);
1686   EXPECT_EQ(chunk.window_end_offset(), 64u);
1687   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1688 
1689   // Send all the transfer data. Client should accept it and complete the
1690   // transfer.
1691   context_.server().SendServerStream<Transfer::Read>(
1692       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1693                       .set_session_id(29)
1694                       .set_offset(0)
1695                       .set_payload(kData32)
1696                       .set_remaining_bytes(0)));
1697   transfer_thread_.WaitUntilEventIsProcessed();
1698 
1699   ASSERT_EQ(payloads.size(), 3u);
1700 
1701   chunk = DecodeChunk(payloads.back());
1702   EXPECT_EQ(chunk.session_id(), 29u);
1703   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1704   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1705   ASSERT_TRUE(chunk.status().has_value());
1706   EXPECT_EQ(chunk.status().value(), OkStatus());
1707 
1708   EXPECT_EQ(transfer_status, OkStatus());
1709   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1710             0);
1711 
1712   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1713       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1714           .set_session_id(29)));
1715 }
1716 
TEST_F(ReadTransfer,Version2_ServerRunsLegacy)1717 TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
1718   stream::MemoryWriterBuffer<64> writer;
1719   Status transfer_status = Status::Unknown();
1720 
1721   ASSERT_EQ(OkStatus(),
1722             client_.Read(
1723                 3,
1724                 writer,
1725                 [&transfer_status](Status status) { transfer_status = status; },
1726                 cfg::kDefaultChunkTimeout,
1727                 ProtocolVersion::kVersionTwo));
1728 
1729   transfer_thread_.WaitUntilEventIsProcessed();
1730 
1731   // Initial chunk of the transfer is sent. This chunk should contain all the
1732   // fields from both legacy and version 2 protocols for backwards
1733   // compatibility.
1734   rpc::PayloadsView payloads =
1735       context_.output().payloads<Transfer::Read>(context_.channel().id());
1736   ASSERT_EQ(payloads.size(), 1u);
1737   EXPECT_EQ(transfer_status, Status::Unknown());
1738 
1739   Chunk chunk = DecodeChunk(payloads[0]);
1740   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1741   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1742   EXPECT_EQ(chunk.session_id(), 3u);
1743   EXPECT_EQ(chunk.resource_id(), 3u);
1744   EXPECT_EQ(chunk.offset(), 0u);
1745   EXPECT_EQ(chunk.window_end_offset(), 64u);
1746   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1747 
1748   // Instead of a START_ACK to continue the handshake, the server responds with
1749   // an immediate data chunk, indicating that it is running the legacy protocol
1750   // version. Client should revert to legacy, using the resource_id of 3 as the
1751   // session_id, and complete the transfer.
1752   context_.server().SendServerStream<Transfer::Read>(
1753       EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1754                       .set_session_id(3)
1755                       .set_offset(0)
1756                       .set_payload(kData32)
1757                       .set_remaining_bytes(0)));
1758   transfer_thread_.WaitUntilEventIsProcessed();
1759 
1760   ASSERT_EQ(payloads.size(), 2u);
1761 
1762   chunk = DecodeChunk(payloads.back());
1763   EXPECT_EQ(chunk.session_id(), 3u);
1764   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1765   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
1766   ASSERT_TRUE(chunk.status().has_value());
1767   EXPECT_EQ(chunk.status().value(), OkStatus());
1768 
1769   EXPECT_EQ(transfer_status, OkStatus());
1770   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1771             0);
1772 }
1773 
TEST_F(ReadTransfer,Version2_TimeoutDuringHandshake)1774 TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
1775   stream::MemoryWriterBuffer<64> writer;
1776   Status transfer_status = Status::Unknown();
1777 
1778   ASSERT_EQ(OkStatus(),
1779             client_.Read(
1780                 3,
1781                 writer,
1782                 [&transfer_status](Status status) { transfer_status = status; },
1783                 cfg::kDefaultChunkTimeout,
1784                 ProtocolVersion::kVersionTwo));
1785 
1786   transfer_thread_.WaitUntilEventIsProcessed();
1787 
1788   // Initial chunk of the transfer is sent. This chunk should contain all the
1789   // fields from both legacy and version 2 protocols for backwards
1790   // compatibility.
1791   rpc::PayloadsView payloads =
1792       context_.output().payloads<Transfer::Read>(context_.channel().id());
1793   ASSERT_EQ(payloads.size(), 1u);
1794   EXPECT_EQ(transfer_status, Status::Unknown());
1795 
1796   Chunk chunk = DecodeChunk(payloads.back());
1797   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1798   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1799   EXPECT_EQ(chunk.session_id(), 3u);
1800   EXPECT_EQ(chunk.resource_id(), 3u);
1801   EXPECT_EQ(chunk.offset(), 0u);
1802   EXPECT_EQ(chunk.window_end_offset(), 64u);
1803   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1804 
1805   // Wait for the timeout to expire without doing anything. The client should
1806   // resend the initial chunk.
1807   transfer_thread_.SimulateClientTimeout(3);
1808   ASSERT_EQ(payloads.size(), 2u);
1809 
1810   chunk = DecodeChunk(payloads.back());
1811   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1812   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1813   EXPECT_EQ(chunk.session_id(), 3u);
1814   EXPECT_EQ(chunk.resource_id(), 3u);
1815   EXPECT_EQ(chunk.offset(), 0u);
1816   EXPECT_EQ(chunk.window_end_offset(), 64u);
1817   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1818 
1819   // This time, the server responds, continuing the handshake and transfer.
1820   context_.server().SendServerStream<Transfer::Read>(
1821       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1822                       .set_session_id(31)
1823                       .set_resource_id(3)));
1824   transfer_thread_.WaitUntilEventIsProcessed();
1825 
1826   ASSERT_EQ(payloads.size(), 3u);
1827 
1828   chunk = DecodeChunk(payloads.back());
1829   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1830   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1831   EXPECT_EQ(chunk.session_id(), 31u);
1832   EXPECT_FALSE(chunk.resource_id().has_value());
1833   EXPECT_EQ(chunk.offset(), 0u);
1834   EXPECT_EQ(chunk.window_end_offset(), 64u);
1835   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1836 
1837   context_.server().SendServerStream<Transfer::Read>(
1838       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1839                       .set_session_id(31)
1840                       .set_offset(0)
1841                       .set_payload(kData32)
1842                       .set_remaining_bytes(0)));
1843   transfer_thread_.WaitUntilEventIsProcessed();
1844 
1845   ASSERT_EQ(payloads.size(), 4u);
1846 
1847   chunk = DecodeChunk(payloads.back());
1848   EXPECT_EQ(chunk.session_id(), 31u);
1849   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1850   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1851   ASSERT_TRUE(chunk.status().has_value());
1852   EXPECT_EQ(chunk.status().value(), OkStatus());
1853 
1854   EXPECT_EQ(transfer_status, OkStatus());
1855   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1856             0);
1857 
1858   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1859       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1860           .set_session_id(31)));
1861 }
1862 
TEST_F(ReadTransfer,Version2_TimeoutAfterHandshake)1863 TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
1864   stream::MemoryWriterBuffer<64> writer;
1865   Status transfer_status = Status::Unknown();
1866 
1867   ASSERT_EQ(OkStatus(),
1868             client_.Read(
1869                 3,
1870                 writer,
1871                 [&transfer_status](Status status) { transfer_status = status; },
1872                 cfg::kDefaultChunkTimeout,
1873                 ProtocolVersion::kVersionTwo));
1874 
1875   transfer_thread_.WaitUntilEventIsProcessed();
1876 
1877   // Initial chunk of the transfer is sent. This chunk should contain all the
1878   // fields from both legacy and version 2 protocols for backwards
1879   // compatibility.
1880   rpc::PayloadsView payloads =
1881       context_.output().payloads<Transfer::Read>(context_.channel().id());
1882   ASSERT_EQ(payloads.size(), 1u);
1883   EXPECT_EQ(transfer_status, Status::Unknown());
1884 
1885   Chunk chunk = DecodeChunk(payloads.back());
1886   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1887   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1888   EXPECT_EQ(chunk.session_id(), 3u);
1889   EXPECT_EQ(chunk.resource_id(), 3u);
1890   EXPECT_EQ(chunk.offset(), 0u);
1891   EXPECT_EQ(chunk.window_end_offset(), 64u);
1892   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1893 
1894   // The server responds with a START_ACK, continuing the version 2 handshake
1895   // and assigning a session_id to the transfer.
1896   context_.server().SendServerStream<Transfer::Read>(
1897       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1898                       .set_session_id(33)
1899                       .set_resource_id(3)));
1900   transfer_thread_.WaitUntilEventIsProcessed();
1901 
1902   ASSERT_EQ(payloads.size(), 2u);
1903 
1904   // Client should accept the session_id with a START_ACK_CONFIRMATION,
1905   // additionally containing the initial parameters for the read transfer.
1906   chunk = DecodeChunk(payloads.back());
1907   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1908   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1909   EXPECT_EQ(chunk.session_id(), 33u);
1910   EXPECT_FALSE(chunk.resource_id().has_value());
1911   EXPECT_EQ(chunk.offset(), 0u);
1912   EXPECT_EQ(chunk.window_end_offset(), 64u);
1913   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1914 
1915   // Wait for the timeout to expire without doing anything. The client should
1916   // resend the confirmation chunk.
1917   transfer_thread_.SimulateClientTimeout(33);
1918   ASSERT_EQ(payloads.size(), 3u);
1919 
1920   chunk = DecodeChunk(payloads.back());
1921   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1922   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1923   EXPECT_EQ(chunk.session_id(), 33u);
1924   EXPECT_FALSE(chunk.resource_id().has_value());
1925   EXPECT_EQ(chunk.offset(), 0u);
1926   EXPECT_EQ(chunk.window_end_offset(), 64u);
1927   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1928 
1929   // The server responds and the transfer should continue normally.
1930   context_.server().SendServerStream<Transfer::Read>(
1931       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1932                       .set_session_id(33)
1933                       .set_offset(0)
1934                       .set_payload(kData32)
1935                       .set_remaining_bytes(0)));
1936   transfer_thread_.WaitUntilEventIsProcessed();
1937 
1938   ASSERT_EQ(payloads.size(), 4u);
1939 
1940   chunk = DecodeChunk(payloads.back());
1941   EXPECT_EQ(chunk.session_id(), 33u);
1942   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1943   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1944   ASSERT_TRUE(chunk.status().has_value());
1945   EXPECT_EQ(chunk.status().value(), OkStatus());
1946 
1947   EXPECT_EQ(transfer_status, OkStatus());
1948   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1949             0);
1950 
1951   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1952       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1953           .set_session_id(33)));
1954 }
1955 
TEST_F(ReadTransfer,Version2_ServerErrorDuringHandshake)1956 TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
1957   stream::MemoryWriterBuffer<64> writer;
1958   Status transfer_status = Status::Unknown();
1959 
1960   ASSERT_EQ(OkStatus(),
1961             client_.Read(
1962                 3,
1963                 writer,
1964                 [&transfer_status](Status status) { transfer_status = status; },
1965                 cfg::kDefaultChunkTimeout,
1966                 ProtocolVersion::kVersionTwo));
1967 
1968   transfer_thread_.WaitUntilEventIsProcessed();
1969 
1970   // Initial chunk of the transfer is sent. This chunk should contain all the
1971   // fields from both legacy and version 2 protocols for backwards
1972   // compatibility.
1973   rpc::PayloadsView payloads =
1974       context_.output().payloads<Transfer::Read>(context_.channel().id());
1975   ASSERT_EQ(payloads.size(), 1u);
1976   EXPECT_EQ(transfer_status, Status::Unknown());
1977 
1978   Chunk chunk = DecodeChunk(payloads.back());
1979   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1980   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1981   EXPECT_EQ(chunk.session_id(), 3u);
1982   EXPECT_EQ(chunk.resource_id(), 3u);
1983   EXPECT_EQ(chunk.offset(), 0u);
1984   EXPECT_EQ(chunk.window_end_offset(), 64u);
1985   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1986 
1987   // The server responds to the start request with an error.
1988   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(Chunk::Final(
1989       ProtocolVersion::kVersionTwo, 3, Status::Unauthenticated())));
1990   transfer_thread_.WaitUntilEventIsProcessed();
1991 
1992   EXPECT_EQ(payloads.size(), 1u);
1993   EXPECT_EQ(transfer_status, Status::Unauthenticated());
1994 }
1995 
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckRetries)1996 TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) {
1997   stream::MemoryWriterBuffer<64> writer;
1998   Status transfer_status = Status::Unknown();
1999 
2000   ASSERT_EQ(OkStatus(),
2001             client_.Read(
2002                 3,
2003                 writer,
2004                 [&transfer_status](Status status) { transfer_status = status; },
2005                 cfg::kDefaultChunkTimeout,
2006                 ProtocolVersion::kVersionTwo));
2007 
2008   transfer_thread_.WaitUntilEventIsProcessed();
2009 
2010   // Initial chunk of the transfer is sent. This chunk should contain all the
2011   // fields from both legacy and version 2 protocols for backwards
2012   // compatibility.
2013   rpc::PayloadsView payloads =
2014       context_.output().payloads<Transfer::Read>(context_.channel().id());
2015   ASSERT_EQ(payloads.size(), 1u);
2016   EXPECT_EQ(transfer_status, Status::Unknown());
2017 
2018   Chunk chunk = DecodeChunk(payloads[0]);
2019   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2020   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2021   EXPECT_EQ(chunk.session_id(), 3u);
2022   EXPECT_EQ(chunk.resource_id(), 3u);
2023   EXPECT_EQ(chunk.offset(), 0u);
2024   EXPECT_EQ(chunk.window_end_offset(), 64u);
2025   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2026 
2027   // The server responds with a START_ACK, continuing the version 2 handshake
2028   // and assigning a session_id to the transfer.
2029   context_.server().SendServerStream<Transfer::Read>(
2030       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2031                       .set_session_id(29)
2032                       .set_resource_id(3)));
2033   transfer_thread_.WaitUntilEventIsProcessed();
2034 
2035   ASSERT_EQ(payloads.size(), 2u);
2036 
2037   // Client should accept the session_id with a START_ACK_CONFIRMATION,
2038   // additionally containing the initial parameters for the read transfer.
2039   chunk = DecodeChunk(payloads.back());
2040   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2041   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2042   EXPECT_EQ(chunk.session_id(), 29u);
2043   EXPECT_FALSE(chunk.resource_id().has_value());
2044   EXPECT_EQ(chunk.offset(), 0u);
2045   EXPECT_EQ(chunk.window_end_offset(), 64u);
2046   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2047 
2048   // Send all the transfer data. Client should accept it and complete the
2049   // transfer.
2050   context_.server().SendServerStream<Transfer::Read>(
2051       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2052                       .set_session_id(29)
2053                       .set_offset(0)
2054                       .set_payload(kData32)
2055                       .set_remaining_bytes(0)));
2056   transfer_thread_.WaitUntilEventIsProcessed();
2057 
2058   ASSERT_EQ(payloads.size(), 3u);
2059 
2060   chunk = DecodeChunk(payloads.back());
2061   EXPECT_EQ(chunk.session_id(), 29u);
2062   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2063   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2064   ASSERT_TRUE(chunk.status().has_value());
2065   EXPECT_EQ(chunk.status().value(), OkStatus());
2066 
2067   EXPECT_EQ(transfer_status, OkStatus());
2068   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2069             0);
2070 
2071   // Time out instead of sending a completion ACK. THe transfer should resend
2072   // its completion chunk.
2073   transfer_thread_.SimulateClientTimeout(29);
2074   ASSERT_EQ(payloads.size(), 4u);
2075 
2076   // Reset transfer_status to check whether the handler is called again.
2077   transfer_status = Status::Unknown();
2078 
2079   chunk = DecodeChunk(payloads.back());
2080   EXPECT_EQ(chunk.session_id(), 29u);
2081   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2082   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2083   ASSERT_TRUE(chunk.status().has_value());
2084   EXPECT_EQ(chunk.status().value(), OkStatus());
2085 
2086   // Transfer handler should not be called a second time in response to the
2087   // re-sent completion chunk.
2088   EXPECT_EQ(transfer_status, Status::Unknown());
2089 
2090   // Send a completion ACK to end the transfer.
2091   context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2092       Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2093           .set_session_id(29)));
2094   transfer_thread_.WaitUntilEventIsProcessed();
2095 
2096   // No further chunks should be sent following the ACK.
2097   transfer_thread_.SimulateClientTimeout(29);
2098   ASSERT_EQ(payloads.size(), 4u);
2099 }
2100 
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries)2101 TEST_F(ReadTransfer,
2102        Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries) {
2103   stream::MemoryWriterBuffer<64> writer;
2104   Status transfer_status = Status::Unknown();
2105 
2106   ASSERT_EQ(OkStatus(),
2107             client_.Read(
2108                 3,
2109                 writer,
2110                 [&transfer_status](Status status) { transfer_status = status; },
2111                 cfg::kDefaultChunkTimeout,
2112                 ProtocolVersion::kVersionTwo));
2113 
2114   transfer_thread_.WaitUntilEventIsProcessed();
2115 
2116   // Initial chunk of the transfer is sent. This chunk should contain all the
2117   // fields from both legacy and version 2 protocols for backwards
2118   // compatibility.
2119   rpc::PayloadsView payloads =
2120       context_.output().payloads<Transfer::Read>(context_.channel().id());
2121   ASSERT_EQ(payloads.size(), 1u);
2122   EXPECT_EQ(transfer_status, Status::Unknown());
2123 
2124   Chunk chunk = DecodeChunk(payloads[0]);
2125   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2126   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2127   EXPECT_EQ(chunk.session_id(), 3u);
2128   EXPECT_EQ(chunk.resource_id(), 3u);
2129   EXPECT_EQ(chunk.offset(), 0u);
2130   EXPECT_EQ(chunk.window_end_offset(), 64u);
2131   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2132 
2133   // The server responds with a START_ACK, continuing the version 2 handshake
2134   // and assigning a session_id to the transfer.
2135   context_.server().SendServerStream<Transfer::Read>(
2136       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2137                       .set_session_id(29)
2138                       .set_resource_id(3)));
2139   transfer_thread_.WaitUntilEventIsProcessed();
2140 
2141   ASSERT_EQ(payloads.size(), 2u);
2142 
2143   // Client should accept the session_id with a START_ACK_CONFIRMATION,
2144   // additionally containing the initial parameters for the read transfer.
2145   chunk = DecodeChunk(payloads.back());
2146   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2147   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2148   EXPECT_EQ(chunk.session_id(), 29u);
2149   EXPECT_FALSE(chunk.resource_id().has_value());
2150   EXPECT_EQ(chunk.offset(), 0u);
2151   EXPECT_EQ(chunk.window_end_offset(), 64u);
2152   EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2153 
2154   // Send all the transfer data. Client should accept it and complete the
2155   // transfer.
2156   context_.server().SendServerStream<Transfer::Read>(
2157       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2158                       .set_session_id(29)
2159                       .set_offset(0)
2160                       .set_payload(kData32)
2161                       .set_remaining_bytes(0)));
2162   transfer_thread_.WaitUntilEventIsProcessed();
2163 
2164   ASSERT_EQ(payloads.size(), 3u);
2165 
2166   chunk = DecodeChunk(payloads.back());
2167   EXPECT_EQ(chunk.session_id(), 29u);
2168   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2169   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2170   ASSERT_TRUE(chunk.status().has_value());
2171   EXPECT_EQ(chunk.status().value(), OkStatus());
2172 
2173   EXPECT_EQ(transfer_status, OkStatus());
2174   EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2175             0);
2176 
2177   // Time out instead of sending a completion ACK. THe transfer should resend
2178   // its completion chunk at first, then terminate after the maximum number of
2179   // retries.
2180   transfer_thread_.SimulateClientTimeout(29);
2181   ASSERT_EQ(payloads.size(), 4u);  // Retry 1.
2182 
2183   transfer_thread_.SimulateClientTimeout(29);
2184   ASSERT_EQ(payloads.size(), 5u);  // Retry 2.
2185 
2186   transfer_thread_.SimulateClientTimeout(29);
2187   ASSERT_EQ(payloads.size(), 6u);  // Retry 3.
2188 
2189   transfer_thread_.SimulateClientTimeout(29);
2190   ASSERT_EQ(payloads.size(), 6u);  // No more retries; transfer has ended.
2191 }
2192 
TEST_F(WriteTransfer,Version2_SingleChunk)2193 TEST_F(WriteTransfer, Version2_SingleChunk) {
2194   stream::MemoryReader reader(kData32);
2195   Status transfer_status = Status::Unknown();
2196 
2197   ASSERT_EQ(OkStatus(),
2198             client_.Write(
2199                 3,
2200                 reader,
2201                 [&transfer_status](Status status) { transfer_status = status; },
2202                 cfg::kDefaultChunkTimeout,
2203                 ProtocolVersion::kVersionTwo));
2204   transfer_thread_.WaitUntilEventIsProcessed();
2205 
2206   // The client begins by sending the ID of the resource to transfer.
2207   rpc::PayloadsView payloads =
2208       context_.output().payloads<Transfer::Write>(context_.channel().id());
2209   ASSERT_EQ(payloads.size(), 1u);
2210   EXPECT_EQ(transfer_status, Status::Unknown());
2211 
2212   Chunk chunk = DecodeChunk(payloads.back());
2213   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2214   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2215   EXPECT_EQ(chunk.session_id(), 3u);
2216   EXPECT_EQ(chunk.resource_id(), 3u);
2217 
2218   // The server responds with a START_ACK, continuing the version 2 handshake
2219   // and assigning a session_id to the transfer.
2220   context_.server().SendServerStream<Transfer::Write>(
2221       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2222                       .set_session_id(29)
2223                       .set_resource_id(3)));
2224   transfer_thread_.WaitUntilEventIsProcessed();
2225 
2226   ASSERT_EQ(payloads.size(), 2u);
2227 
2228   // Client should accept the session_id with a START_ACK_CONFIRMATION.
2229   chunk = DecodeChunk(payloads.back());
2230   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2231   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2232   EXPECT_EQ(chunk.session_id(), 29u);
2233   EXPECT_FALSE(chunk.resource_id().has_value());
2234 
2235   // The server can then begin the data transfer by sending its transfer
2236   // parameters. Client should respond with a data chunk and the final chunk.
2237   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2238     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2239         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2240             .set_session_id(29)
2241             .set_offset(0)
2242             .set_window_end_offset(64)
2243             .set_max_chunk_size_bytes(32)));
2244   });
2245 
2246   ASSERT_EQ(payloads.size(), 4u);
2247 
2248   chunk = DecodeChunk(payloads[2]);
2249   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2250   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2251   EXPECT_EQ(chunk.session_id(), 29u);
2252   EXPECT_EQ(chunk.offset(), 0u);
2253   EXPECT_TRUE(chunk.has_payload());
2254   EXPECT_EQ(std::memcmp(
2255                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2256             0);
2257 
2258   chunk = DecodeChunk(payloads[3]);
2259   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2260   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2261   EXPECT_EQ(chunk.session_id(), 29u);
2262   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2263   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2264 
2265   EXPECT_EQ(transfer_status, Status::Unknown());
2266 
2267   // Send the final status chunk to complete the transfer.
2268   context_.server().SendServerStream<Transfer::Write>(
2269       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 29, OkStatus())));
2270   transfer_thread_.WaitUntilEventIsProcessed();
2271 
2272   // Client should acknowledge the completion of the transfer.
2273   EXPECT_EQ(payloads.size(), 5u);
2274 
2275   chunk = DecodeChunk(payloads[4]);
2276   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2277   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2278   EXPECT_EQ(chunk.session_id(), 29u);
2279 
2280   EXPECT_EQ(transfer_status, OkStatus());
2281 }
2282 
TEST_F(WriteTransfer,Version2_ServerRunsLegacy)2283 TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
2284   stream::MemoryReader reader(kData32);
2285   Status transfer_status = Status::Unknown();
2286 
2287   ASSERT_EQ(OkStatus(),
2288             client_.Write(
2289                 3,
2290                 reader,
2291                 [&transfer_status](Status status) { transfer_status = status; },
2292                 cfg::kDefaultChunkTimeout,
2293                 ProtocolVersion::kVersionTwo));
2294   transfer_thread_.WaitUntilEventIsProcessed();
2295 
2296   // The client begins by sending the ID of the resource to transfer.
2297   rpc::PayloadsView payloads =
2298       context_.output().payloads<Transfer::Write>(context_.channel().id());
2299   ASSERT_EQ(payloads.size(), 1u);
2300   EXPECT_EQ(transfer_status, Status::Unknown());
2301 
2302   Chunk chunk = DecodeChunk(payloads.back());
2303   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2304   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2305   EXPECT_EQ(chunk.session_id(), 3u);
2306   EXPECT_EQ(chunk.resource_id(), 3u);
2307 
2308   // Instead of continuing the handshake with a START_ACK, the server
2309   // immediately sends parameters, indicating that it only supports the legacy
2310   // protocol. Client should switch over to legacy and continue the transfer.
2311   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2312     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2313         Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
2314             .set_session_id(3)
2315             .set_offset(0)
2316             .set_window_end_offset(64)
2317             .set_max_chunk_size_bytes(32)));
2318   });
2319 
2320   ASSERT_EQ(payloads.size(), 3u);
2321 
2322   chunk = DecodeChunk(payloads[1]);
2323   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2324   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2325   EXPECT_EQ(chunk.session_id(), 3u);
2326   EXPECT_EQ(chunk.offset(), 0u);
2327   EXPECT_TRUE(chunk.has_payload());
2328   EXPECT_EQ(std::memcmp(
2329                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2330             0);
2331 
2332   chunk = DecodeChunk(payloads[2]);
2333   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2334   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2335   EXPECT_EQ(chunk.session_id(), 3u);
2336   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2337   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2338 
2339   EXPECT_EQ(transfer_status, Status::Unknown());
2340 
2341   // Send the final status chunk to complete the transfer.
2342   context_.server().SendServerStream<Transfer::Write>(
2343       EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
2344   transfer_thread_.WaitUntilEventIsProcessed();
2345 
2346   EXPECT_EQ(payloads.size(), 3u);
2347   EXPECT_EQ(transfer_status, OkStatus());
2348 }
2349 
TEST_F(WriteTransfer,Version2_RetryDuringHandshake)2350 TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
2351   stream::MemoryReader reader(kData32);
2352   Status transfer_status = Status::Unknown();
2353 
2354   ASSERT_EQ(OkStatus(),
2355             client_.Write(
2356                 3,
2357                 reader,
2358                 [&transfer_status](Status status) { transfer_status = status; },
2359                 cfg::kDefaultChunkTimeout,
2360                 ProtocolVersion::kVersionTwo));
2361   transfer_thread_.WaitUntilEventIsProcessed();
2362 
2363   // The client begins by sending the ID of the resource to transfer.
2364   rpc::PayloadsView payloads =
2365       context_.output().payloads<Transfer::Write>(context_.channel().id());
2366   ASSERT_EQ(payloads.size(), 1u);
2367   EXPECT_EQ(transfer_status, Status::Unknown());
2368 
2369   Chunk chunk = DecodeChunk(payloads.back());
2370   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2371   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2372   EXPECT_EQ(chunk.session_id(), 3u);
2373   EXPECT_EQ(chunk.resource_id(), 3u);
2374 
2375   // Time out waiting for a server response. The client should resend the
2376   // initial packet.
2377   transfer_thread_.SimulateClientTimeout(3);
2378   ASSERT_EQ(payloads.size(), 2u);
2379 
2380   chunk = DecodeChunk(payloads.back());
2381   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2382   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2383   EXPECT_EQ(chunk.session_id(), 3u);
2384   EXPECT_EQ(chunk.resource_id(), 3u);
2385 
2386   // This time, respond with the correct continuation packet. The transfer
2387   // should resume and complete normally.
2388   context_.server().SendServerStream<Transfer::Write>(
2389       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2390                       .set_session_id(31)
2391                       .set_resource_id(3)));
2392   transfer_thread_.WaitUntilEventIsProcessed();
2393 
2394   ASSERT_EQ(payloads.size(), 3u);
2395 
2396   chunk = DecodeChunk(payloads.back());
2397   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2398   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2399   EXPECT_EQ(chunk.session_id(), 31u);
2400   EXPECT_FALSE(chunk.resource_id().has_value());
2401 
2402   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2403     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2404         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2405             .set_session_id(31)
2406             .set_offset(0)
2407             .set_window_end_offset(64)
2408             .set_max_chunk_size_bytes(32)));
2409   });
2410 
2411   ASSERT_EQ(payloads.size(), 5u);
2412 
2413   chunk = DecodeChunk(payloads[3]);
2414   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2415   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2416   EXPECT_EQ(chunk.session_id(), 31u);
2417   EXPECT_EQ(chunk.offset(), 0u);
2418   EXPECT_TRUE(chunk.has_payload());
2419   EXPECT_EQ(std::memcmp(
2420                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2421             0);
2422 
2423   chunk = DecodeChunk(payloads[4]);
2424   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2425   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2426   EXPECT_EQ(chunk.session_id(), 31u);
2427   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2428   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2429 
2430   EXPECT_EQ(transfer_status, Status::Unknown());
2431 
2432   context_.server().SendServerStream<Transfer::Write>(
2433       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 31, OkStatus())));
2434   transfer_thread_.WaitUntilEventIsProcessed();
2435 
2436   // Client should acknowledge the completion of the transfer.
2437   EXPECT_EQ(payloads.size(), 6u);
2438 
2439   chunk = DecodeChunk(payloads[5]);
2440   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2441   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2442   EXPECT_EQ(chunk.session_id(), 31u);
2443 
2444   EXPECT_EQ(transfer_status, OkStatus());
2445 }
2446 
TEST_F(WriteTransfer,Version2_RetryAfterHandshake)2447 TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
2448   stream::MemoryReader reader(kData32);
2449   Status transfer_status = Status::Unknown();
2450 
2451   ASSERT_EQ(OkStatus(),
2452             client_.Write(
2453                 3,
2454                 reader,
2455                 [&transfer_status](Status status) { transfer_status = status; },
2456                 cfg::kDefaultChunkTimeout,
2457                 ProtocolVersion::kVersionTwo));
2458   transfer_thread_.WaitUntilEventIsProcessed();
2459 
2460   // The client begins by sending the ID of the resource to transfer.
2461   rpc::PayloadsView payloads =
2462       context_.output().payloads<Transfer::Write>(context_.channel().id());
2463   ASSERT_EQ(payloads.size(), 1u);
2464   EXPECT_EQ(transfer_status, Status::Unknown());
2465 
2466   Chunk chunk = DecodeChunk(payloads.back());
2467   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2468   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2469   EXPECT_EQ(chunk.session_id(), 3u);
2470   EXPECT_EQ(chunk.resource_id(), 3u);
2471 
2472   // The server responds with a START_ACK, continuing the version 2 handshake
2473   // and assigning a session_id to the transfer.
2474   context_.server().SendServerStream<Transfer::Write>(
2475       EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2476                       .set_session_id(33)
2477                       .set_resource_id(3)));
2478   transfer_thread_.WaitUntilEventIsProcessed();
2479 
2480   ASSERT_EQ(payloads.size(), 2u);
2481 
2482   // Client should accept the session_id with a START_ACK_CONFIRMATION.
2483   chunk = DecodeChunk(payloads.back());
2484   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2485   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2486   EXPECT_EQ(chunk.session_id(), 33u);
2487   EXPECT_FALSE(chunk.resource_id().has_value());
2488 
2489   // Time out waiting for a server response. The client should resend the
2490   // initial packet.
2491   transfer_thread_.SimulateClientTimeout(33);
2492   ASSERT_EQ(payloads.size(), 3u);
2493 
2494   chunk = DecodeChunk(payloads.back());
2495   EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2496   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2497   EXPECT_EQ(chunk.session_id(), 33u);
2498   EXPECT_FALSE(chunk.resource_id().has_value());
2499 
2500   // This time, respond with the first transfer parameters chunk. The transfer
2501   // should resume and complete normally.
2502   rpc::test::WaitForPackets(context_.output(), 2, [this] {
2503     context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2504         Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2505             .set_session_id(33)
2506             .set_offset(0)
2507             .set_window_end_offset(64)
2508             .set_max_chunk_size_bytes(32)));
2509   });
2510 
2511   ASSERT_EQ(payloads.size(), 5u);
2512 
2513   chunk = DecodeChunk(payloads[3]);
2514   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2515   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2516   EXPECT_EQ(chunk.session_id(), 33u);
2517   EXPECT_EQ(chunk.offset(), 0u);
2518   EXPECT_TRUE(chunk.has_payload());
2519   EXPECT_EQ(std::memcmp(
2520                 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2521             0);
2522 
2523   chunk = DecodeChunk(payloads[4]);
2524   EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2525   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2526   EXPECT_EQ(chunk.session_id(), 33u);
2527   ASSERT_TRUE(chunk.remaining_bytes().has_value());
2528   EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2529 
2530   EXPECT_EQ(transfer_status, Status::Unknown());
2531 
2532   context_.server().SendServerStream<Transfer::Write>(
2533       EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 33, OkStatus())));
2534   transfer_thread_.WaitUntilEventIsProcessed();
2535 
2536   // Client should acknowledge the completion of the transfer.
2537   EXPECT_EQ(payloads.size(), 6u);
2538 
2539   chunk = DecodeChunk(payloads[5]);
2540   EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2541   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2542   EXPECT_EQ(chunk.session_id(), 33u);
2543 
2544   EXPECT_EQ(transfer_status, OkStatus());
2545 }
2546 
TEST_F(WriteTransfer,Version2_ServerErrorDuringHandshake)2547 TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
2548   stream::MemoryReader reader(kData32);
2549   Status transfer_status = Status::Unknown();
2550 
2551   ASSERT_EQ(OkStatus(),
2552             client_.Write(
2553                 3,
2554                 reader,
2555                 [&transfer_status](Status status) { transfer_status = status; },
2556                 cfg::kDefaultChunkTimeout,
2557                 ProtocolVersion::kVersionTwo));
2558   transfer_thread_.WaitUntilEventIsProcessed();
2559 
2560   // The client begins by sending the ID of the resource to transfer.
2561   rpc::PayloadsView payloads =
2562       context_.output().payloads<Transfer::Write>(context_.channel().id());
2563   ASSERT_EQ(payloads.size(), 1u);
2564   EXPECT_EQ(transfer_status, Status::Unknown());
2565 
2566   Chunk chunk = DecodeChunk(payloads.back());
2567   EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2568   EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2569   EXPECT_EQ(chunk.session_id(), 3u);
2570   EXPECT_EQ(chunk.resource_id(), 3u);
2571 
2572   // The server responds to the start request with an error.
2573   context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2574       Chunk::Final(ProtocolVersion::kVersionTwo, 3, Status::NotFound())));
2575   transfer_thread_.WaitUntilEventIsProcessed();
2576 
2577   EXPECT_EQ(payloads.size(), 1u);
2578   EXPECT_EQ(transfer_status, Status::NotFound());
2579 }
2580 
2581 }  // namespace
2582 }  // namespace pw::transfer::test
2583