1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/client.h"
16
17 #include <cstring>
18
19 #include "gtest/gtest.h"
20 #include "pw_assert/check.h"
21 #include "pw_bytes/array.h"
22 #include "pw_rpc/raw/client_testing.h"
23 #include "pw_rpc/test_helpers.h"
24 #include "pw_thread/sleep.h"
25 #include "pw_thread/thread.h"
26 #include "pw_thread_stl/options.h"
27 #include "pw_transfer_private/chunk_testing.h"
28
29 namespace pw::transfer::test {
30 namespace {
31
32 using internal::Chunk;
33 using pw_rpc::raw::Transfer;
34
35 using namespace std::chrono_literals;
36
TransferThreadOptions()37 thread::Options& TransferThreadOptions() {
38 static thread::stl::Options options;
39 return options;
40 }
41
42 class ReadTransfer : public ::testing::Test {
43 protected:
ReadTransfer(size_t max_bytes_to_receive=0)44 ReadTransfer(size_t max_bytes_to_receive = 0)
45 : transfer_thread_(chunk_buffer_, encode_buffer_),
46 client_(context_.client(),
47 context_.channel().id(),
48 transfer_thread_,
49 max_bytes_to_receive),
50 system_thread_(TransferThreadOptions(), transfer_thread_) {}
51
~ReadTransfer()52 ~ReadTransfer() override {
53 transfer_thread_.Terminate();
54 system_thread_.join();
55 }
56
57 rpc::RawClientTestContext<> context_;
58
59 Thread<1, 1> transfer_thread_;
60 Client client_;
61
62 std::array<std::byte, 64> chunk_buffer_;
63 std::array<std::byte, 64> encode_buffer_;
64
65 thread::Thread system_thread_;
66 };
67
__anond3adec360202(size_t i) 68 constexpr auto kData32 = bytes::Initialized<32>([](size_t i) { return i; });
__anond3adec360302(size_t i) 69 constexpr auto kData64 = bytes::Initialized<64>([](size_t i) { return i; });
70
TEST_F(ReadTransfer,SingleChunk)71 TEST_F(ReadTransfer, SingleChunk) {
72 stream::MemoryWriterBuffer<64> writer;
73 Status transfer_status = Status::Unknown();
74
75 ASSERT_EQ(OkStatus(),
76 client_.Read(3, writer, [&transfer_status](Status status) {
77 transfer_status = status;
78 }));
79
80 transfer_thread_.WaitUntilEventIsProcessed();
81
82 // First transfer parameters chunk is sent.
83 rpc::PayloadsView payloads =
84 context_.output().payloads<Transfer::Read>(context_.channel().id());
85 ASSERT_EQ(payloads.size(), 1u);
86 EXPECT_EQ(transfer_status, Status::Unknown());
87
88 Chunk c0 = DecodeChunk(payloads[0]);
89 EXPECT_EQ(c0.session_id(), 3u);
90 EXPECT_EQ(c0.offset(), 0u);
91 EXPECT_EQ(c0.window_end_offset(), 64u);
92 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
93
94 context_.server().SendServerStream<Transfer::Read>(
95 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
96 .set_session_id(3)
97 .set_offset(0)
98 .set_payload(kData32)
99 .set_remaining_bytes(0)));
100 transfer_thread_.WaitUntilEventIsProcessed();
101
102 ASSERT_EQ(payloads.size(), 2u);
103
104 Chunk c1 = DecodeChunk(payloads.back());
105 EXPECT_EQ(c1.session_id(), 3u);
106 ASSERT_TRUE(c1.status().has_value());
107 EXPECT_EQ(c1.status().value(), OkStatus());
108
109 EXPECT_EQ(transfer_status, OkStatus());
110 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
111 0);
112 }
113
TEST_F(ReadTransfer,MultiChunk)114 TEST_F(ReadTransfer, MultiChunk) {
115 stream::MemoryWriterBuffer<64> writer;
116 Status transfer_status = Status::Unknown();
117
118 ASSERT_EQ(OkStatus(),
119 client_.Read(4, writer, [&transfer_status](Status status) {
120 transfer_status = status;
121 }));
122
123 transfer_thread_.WaitUntilEventIsProcessed();
124
125 // First transfer parameters chunk is sent.
126 rpc::PayloadsView payloads =
127 context_.output().payloads<Transfer::Read>(context_.channel().id());
128 ASSERT_EQ(payloads.size(), 1u);
129 EXPECT_EQ(transfer_status, Status::Unknown());
130
131 Chunk c0 = DecodeChunk(payloads[0]);
132 EXPECT_EQ(c0.session_id(), 4u);
133 EXPECT_EQ(c0.offset(), 0u);
134 EXPECT_EQ(c0.window_end_offset(), 64u);
135 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
136
137 constexpr ConstByteSpan data(kData32);
138 context_.server().SendServerStream<Transfer::Read>(
139 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
140 .set_session_id(4)
141 .set_offset(0)
142 .set_payload(data.first(16))));
143 transfer_thread_.WaitUntilEventIsProcessed();
144
145 ASSERT_EQ(payloads.size(), 1u);
146
147 context_.server().SendServerStream<Transfer::Read>(
148 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
149 .set_session_id(4)
150 .set_offset(16)
151 .set_payload(data.subspan(16))
152 .set_remaining_bytes(0)));
153 transfer_thread_.WaitUntilEventIsProcessed();
154
155 ASSERT_EQ(payloads.size(), 2u);
156
157 Chunk c1 = DecodeChunk(payloads[1]);
158 EXPECT_EQ(c1.session_id(), 4u);
159 ASSERT_TRUE(c1.status().has_value());
160 EXPECT_EQ(c1.status().value(), OkStatus());
161
162 EXPECT_EQ(transfer_status, OkStatus());
163 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
164 0);
165 }
166
TEST_F(ReadTransfer,MultipleTransfers)167 TEST_F(ReadTransfer, MultipleTransfers) {
168 stream::MemoryWriterBuffer<64> writer;
169 Status transfer_status = Status::Unknown();
170
171 ASSERT_EQ(OkStatus(),
172 client_.Read(3, writer, [&transfer_status](Status status) {
173 transfer_status = status;
174 }));
175 transfer_thread_.WaitUntilEventIsProcessed();
176
177 context_.server().SendServerStream<Transfer::Read>(
178 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
179 .set_session_id(3)
180 .set_offset(0)
181 .set_payload(kData32)
182 .set_remaining_bytes(0)));
183 transfer_thread_.WaitUntilEventIsProcessed();
184
185 ASSERT_EQ(transfer_status, OkStatus());
186 transfer_status = Status::Unknown();
187
188 ASSERT_EQ(OkStatus(),
189 client_.Read(3, writer, [&transfer_status](Status status) {
190 transfer_status = status;
191 }));
192 transfer_thread_.WaitUntilEventIsProcessed();
193
194 context_.server().SendServerStream<Transfer::Read>(
195 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
196 .set_session_id(3)
197 .set_offset(0)
198 .set_payload(kData32)
199 .set_remaining_bytes(0)));
200 transfer_thread_.WaitUntilEventIsProcessed();
201
202 EXPECT_EQ(transfer_status, OkStatus());
203 }
204
205 class ReadTransferMaxBytes32 : public ReadTransfer {
206 protected:
ReadTransferMaxBytes32()207 ReadTransferMaxBytes32() : ReadTransfer(/*max_bytes_to_receive=*/32) {}
208 };
209
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromConstructorArg)210 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromConstructorArg) {
211 stream::MemoryWriterBuffer<64> writer;
212 EXPECT_EQ(OkStatus(), client_.Read(5, writer, [](Status) {}));
213 transfer_thread_.WaitUntilEventIsProcessed();
214
215 // First transfer parameters chunk is sent.
216 rpc::PayloadsView payloads =
217 context_.output().payloads<Transfer::Read>(context_.channel().id());
218 ASSERT_EQ(payloads.size(), 1u);
219
220 Chunk c0 = DecodeChunk(payloads[0]);
221 EXPECT_EQ(c0.session_id(), 5u);
222 EXPECT_EQ(c0.offset(), 0u);
223 EXPECT_EQ(c0.window_end_offset(), 32u);
224 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
225 }
226
TEST_F(ReadTransferMaxBytes32,SetsPendingBytesFromWriterLimit)227 TEST_F(ReadTransferMaxBytes32, SetsPendingBytesFromWriterLimit) {
228 stream::MemoryWriterBuffer<16> small_writer;
229 EXPECT_EQ(OkStatus(), client_.Read(5, small_writer, [](Status) {}));
230 transfer_thread_.WaitUntilEventIsProcessed();
231
232 // First transfer parameters chunk is sent.
233 rpc::PayloadsView payloads =
234 context_.output().payloads<Transfer::Read>(context_.channel().id());
235 ASSERT_EQ(payloads.size(), 1u);
236
237 Chunk c0 = DecodeChunk(payloads[0]);
238 EXPECT_EQ(c0.session_id(), 5u);
239 EXPECT_EQ(c0.offset(), 0u);
240 EXPECT_EQ(c0.window_end_offset(), 16u);
241 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
242 }
243
TEST_F(ReadTransferMaxBytes32,MultiParameters)244 TEST_F(ReadTransferMaxBytes32, MultiParameters) {
245 stream::MemoryWriterBuffer<64> writer;
246 Status transfer_status = Status::Unknown();
247
248 ASSERT_EQ(OkStatus(),
249 client_.Read(6, writer, [&transfer_status](Status status) {
250 transfer_status = status;
251 }));
252 transfer_thread_.WaitUntilEventIsProcessed();
253
254 // First transfer parameters chunk is sent.
255 rpc::PayloadsView payloads =
256 context_.output().payloads<Transfer::Read>(context_.channel().id());
257 ASSERT_EQ(payloads.size(), 1u);
258 EXPECT_EQ(transfer_status, Status::Unknown());
259
260 Chunk c0 = DecodeChunk(payloads[0]);
261 EXPECT_EQ(c0.session_id(), 6u);
262 EXPECT_EQ(c0.offset(), 0u);
263 ASSERT_EQ(c0.window_end_offset(), 32u);
264
265 constexpr ConstByteSpan data(kData64);
266 context_.server().SendServerStream<Transfer::Read>(
267 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
268 .set_session_id(6)
269 .set_offset(0)
270 .set_payload(data.first(32))));
271 transfer_thread_.WaitUntilEventIsProcessed();
272
273 ASSERT_EQ(payloads.size(), 2u);
274 EXPECT_EQ(transfer_status, Status::Unknown());
275
276 // Second parameters chunk.
277 Chunk c1 = DecodeChunk(payloads[1]);
278 EXPECT_EQ(c1.session_id(), 6u);
279 EXPECT_EQ(c1.offset(), 32u);
280 ASSERT_EQ(c1.window_end_offset(), 64u);
281
282 context_.server().SendServerStream<Transfer::Read>(
283 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
284 .set_session_id(6)
285 .set_offset(32)
286 .set_payload(data.subspan(32))
287 .set_remaining_bytes(0)));
288 transfer_thread_.WaitUntilEventIsProcessed();
289
290 ASSERT_EQ(payloads.size(), 3u);
291
292 Chunk c2 = DecodeChunk(payloads[2]);
293 EXPECT_EQ(c2.session_id(), 6u);
294 ASSERT_TRUE(c2.status().has_value());
295 EXPECT_EQ(c2.status().value(), OkStatus());
296
297 EXPECT_EQ(transfer_status, OkStatus());
298 EXPECT_EQ(std::memcmp(writer.data(), data.data(), writer.bytes_written()), 0);
299 }
300
TEST_F(ReadTransfer,UnexpectedOffset)301 TEST_F(ReadTransfer, UnexpectedOffset) {
302 stream::MemoryWriterBuffer<64> writer;
303 Status transfer_status = Status::Unknown();
304
305 ASSERT_EQ(OkStatus(),
306 client_.Read(7, writer, [&transfer_status](Status status) {
307 transfer_status = status;
308 }));
309 transfer_thread_.WaitUntilEventIsProcessed();
310
311 // First transfer parameters chunk is sent.
312 rpc::PayloadsView payloads =
313 context_.output().payloads<Transfer::Read>(context_.channel().id());
314 ASSERT_EQ(payloads.size(), 1u);
315 EXPECT_EQ(transfer_status, Status::Unknown());
316
317 Chunk c0 = DecodeChunk(payloads[0]);
318 EXPECT_EQ(c0.session_id(), 7u);
319 EXPECT_EQ(c0.offset(), 0u);
320 EXPECT_EQ(c0.window_end_offset(), 64u);
321
322 constexpr ConstByteSpan data(kData32);
323 context_.server().SendServerStream<Transfer::Read>(
324 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
325 .set_session_id(7)
326 .set_offset(0)
327 .set_payload(data.first(16))));
328 transfer_thread_.WaitUntilEventIsProcessed();
329
330 ASSERT_EQ(payloads.size(), 1u);
331 EXPECT_EQ(transfer_status, Status::Unknown());
332
333 // Send a chunk with an incorrect offset. The client should resend parameters.
334 context_.server().SendServerStream<Transfer::Read>(
335 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
336 .set_session_id(7)
337 .set_offset(8) // wrong!
338 .set_payload(data.subspan(16))
339 .set_remaining_bytes(0)));
340 transfer_thread_.WaitUntilEventIsProcessed();
341
342 ASSERT_EQ(payloads.size(), 2u);
343 EXPECT_EQ(transfer_status, Status::Unknown());
344
345 Chunk c1 = DecodeChunk(payloads[1]);
346 EXPECT_EQ(c1.session_id(), 7u);
347 EXPECT_EQ(c1.offset(), 16u);
348 EXPECT_EQ(c1.window_end_offset(), 64u);
349
350 // Send the correct chunk, completing the transfer.
351 context_.server().SendServerStream<Transfer::Read>(
352 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
353 .set_session_id(7)
354 .set_offset(16)
355 .set_payload(data.subspan(16))
356 .set_remaining_bytes(0)));
357 transfer_thread_.WaitUntilEventIsProcessed();
358
359 ASSERT_EQ(payloads.size(), 3u);
360
361 Chunk c2 = DecodeChunk(payloads[2]);
362 EXPECT_EQ(c2.session_id(), 7u);
363 ASSERT_TRUE(c2.status().has_value());
364 EXPECT_EQ(c2.status().value(), OkStatus());
365
366 EXPECT_EQ(transfer_status, OkStatus());
367 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
368 0);
369 }
370
TEST_F(ReadTransferMaxBytes32,TooMuchData)371 TEST_F(ReadTransferMaxBytes32, TooMuchData) {
372 stream::MemoryWriterBuffer<32> writer;
373 Status transfer_status = Status::Unknown();
374
375 ASSERT_EQ(OkStatus(),
376 client_.Read(8, writer, [&transfer_status](Status status) {
377 transfer_status = status;
378 }));
379 transfer_thread_.WaitUntilEventIsProcessed();
380
381 // First transfer parameters chunk is sent.
382 rpc::PayloadsView payloads =
383 context_.output().payloads<Transfer::Read>(context_.channel().id());
384 ASSERT_EQ(payloads.size(), 1u);
385 EXPECT_EQ(transfer_status, Status::Unknown());
386
387 Chunk c0 = DecodeChunk(payloads[0]);
388 EXPECT_EQ(c0.session_id(), 8u);
389 EXPECT_EQ(c0.offset(), 0u);
390 ASSERT_EQ(c0.window_end_offset(), 32u);
391
392 constexpr ConstByteSpan data(kData64);
393
394 // pending_bytes == 32
395 context_.server().SendServerStream<Transfer::Read>(
396 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
397 .set_session_id(8)
398 .set_offset(0)
399 .set_payload(data.first(16))));
400
401 // pending_bytes == 16
402 context_.server().SendServerStream<Transfer::Read>(
403 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
404 .set_session_id(8)
405 .set_offset(16)
406 .set_payload(data.subspan(16, 8))));
407
408 // pending_bytes == 8, send 16 instead.
409 context_.server().SendServerStream<Transfer::Read>(
410 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
411 .set_session_id(8)
412 .set_offset(24)
413 .set_payload(data.subspan(24, 16))));
414 transfer_thread_.WaitUntilEventIsProcessed();
415
416 ASSERT_EQ(payloads.size(), 4u);
417
418 Chunk c1 = DecodeChunk(payloads[3]);
419 EXPECT_EQ(c1.session_id(), 8u);
420 ASSERT_TRUE(c1.status().has_value());
421 EXPECT_EQ(c1.status().value(), Status::Internal());
422
423 EXPECT_EQ(transfer_status, Status::Internal());
424 }
425
TEST_F(ReadTransfer,ServerError)426 TEST_F(ReadTransfer, ServerError) {
427 stream::MemoryWriterBuffer<64> writer;
428 Status transfer_status = Status::Unknown();
429
430 ASSERT_EQ(OkStatus(),
431 client_.Read(9, writer, [&transfer_status](Status status) {
432 transfer_status = status;
433 }));
434 transfer_thread_.WaitUntilEventIsProcessed();
435
436 // First transfer parameters chunk is sent.
437 rpc::PayloadsView payloads =
438 context_.output().payloads<Transfer::Read>(context_.channel().id());
439 ASSERT_EQ(payloads.size(), 1u);
440 EXPECT_EQ(transfer_status, Status::Unknown());
441
442 Chunk c0 = DecodeChunk(payloads[0]);
443 EXPECT_EQ(c0.session_id(), 9u);
444 EXPECT_EQ(c0.offset(), 0u);
445 ASSERT_EQ(c0.window_end_offset(), 64u);
446
447 // Server sends an error. Client should not respond and terminate the
448 // transfer.
449 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
450 Chunk::Final(ProtocolVersion::kLegacy, 9, Status::NotFound())));
451 transfer_thread_.WaitUntilEventIsProcessed();
452
453 ASSERT_EQ(payloads.size(), 1u);
454 EXPECT_EQ(transfer_status, Status::NotFound());
455 }
456
TEST_F(ReadTransfer,OnlySendsParametersOnceAfterDrop)457 TEST_F(ReadTransfer, OnlySendsParametersOnceAfterDrop) {
458 stream::MemoryWriterBuffer<64> writer;
459 Status transfer_status = Status::Unknown();
460
461 ASSERT_EQ(OkStatus(),
462 client_.Read(10, writer, [&transfer_status](Status status) {
463 transfer_status = status;
464 }));
465 transfer_thread_.WaitUntilEventIsProcessed();
466
467 // First transfer parameters chunk is sent.
468 rpc::PayloadsView payloads =
469 context_.output().payloads<Transfer::Read>(context_.channel().id());
470 ASSERT_EQ(payloads.size(), 1u);
471 EXPECT_EQ(transfer_status, Status::Unknown());
472
473 Chunk c0 = DecodeChunk(payloads[0]);
474 EXPECT_EQ(c0.session_id(), 10u);
475 EXPECT_EQ(c0.offset(), 0u);
476 ASSERT_EQ(c0.window_end_offset(), 64u);
477
478 constexpr ConstByteSpan data(kData32);
479
480 // Send the first 8 bytes of the transfer.
481 context_.server().SendServerStream<Transfer::Read>(
482 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
483 .set_session_id(10)
484 .set_offset(0)
485 .set_payload(data.first(8))));
486
487 // Skip offset 8, send the rest starting from 16.
488 for (uint32_t offset = 16; offset < data.size(); offset += 8) {
489 context_.server().SendServerStream<Transfer::Read>(
490 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
491 .set_session_id(10)
492 .set_offset(offset)
493 .set_payload(data.subspan(offset, 8))));
494 }
495 transfer_thread_.WaitUntilEventIsProcessed();
496
497 // Only one parameters update should be sent, with the offset of the initial
498 // dropped packet.
499 ASSERT_EQ(payloads.size(), 2u);
500
501 Chunk c1 = DecodeChunk(payloads[1]);
502 EXPECT_EQ(c1.session_id(), 10u);
503 EXPECT_EQ(c1.offset(), 8u);
504 ASSERT_EQ(c1.window_end_offset(), 64u);
505
506 // Send the remaining data to complete the transfer.
507 context_.server().SendServerStream<Transfer::Read>(
508 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
509 .set_session_id(10)
510 .set_offset(8)
511 .set_payload(data.subspan(8))
512 .set_remaining_bytes(0)));
513 transfer_thread_.WaitUntilEventIsProcessed();
514
515 ASSERT_EQ(payloads.size(), 3u);
516
517 Chunk c2 = DecodeChunk(payloads[2]);
518 EXPECT_EQ(c2.session_id(), 10u);
519 ASSERT_TRUE(c2.status().has_value());
520 EXPECT_EQ(c2.status().value(), OkStatus());
521
522 EXPECT_EQ(transfer_status, OkStatus());
523 }
524
TEST_F(ReadTransfer,ResendsParametersIfSentRepeatedChunkDuringRecovery)525 TEST_F(ReadTransfer, ResendsParametersIfSentRepeatedChunkDuringRecovery) {
526 stream::MemoryWriterBuffer<64> writer;
527 Status transfer_status = Status::Unknown();
528
529 ASSERT_EQ(OkStatus(),
530 client_.Read(11, writer, [&transfer_status](Status status) {
531 transfer_status = status;
532 }));
533 transfer_thread_.WaitUntilEventIsProcessed();
534
535 // First transfer parameters chunk is sent.
536 rpc::PayloadsView payloads =
537 context_.output().payloads<Transfer::Read>(context_.channel().id());
538 ASSERT_EQ(payloads.size(), 1u);
539 EXPECT_EQ(transfer_status, Status::Unknown());
540
541 Chunk c0 = DecodeChunk(payloads[0]);
542 EXPECT_EQ(c0.session_id(), 11u);
543 EXPECT_EQ(c0.offset(), 0u);
544 ASSERT_EQ(c0.window_end_offset(), 64u);
545
546 constexpr ConstByteSpan data(kData32);
547
548 // Send the first 8 bytes of the transfer.
549 context_.server().SendServerStream<Transfer::Read>(
550 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
551 .set_session_id(11)
552 .set_offset(0)
553 .set_payload(data.first(8))));
554
555 // Skip offset 8, send the rest starting from 16.
556 for (uint32_t offset = 16; offset < data.size(); offset += 8) {
557 context_.server().SendServerStream<Transfer::Read>(
558 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
559 .set_session_id(11)
560 .set_offset(offset)
561 .set_payload(data.subspan(offset, 8))));
562 }
563 transfer_thread_.WaitUntilEventIsProcessed();
564
565 // Only one parameters update should be sent, with the offset of the initial
566 // dropped packet.
567 ASSERT_EQ(payloads.size(), 2u);
568
569 const Chunk last_chunk = Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
570 .set_session_id(11)
571 .set_offset(24)
572 .set_payload(data.subspan(24));
573
574 // Re-send the final chunk of the block.
575 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
576 transfer_thread_.WaitUntilEventIsProcessed();
577
578 // The original drop parameters should be re-sent.
579 ASSERT_EQ(payloads.size(), 3u);
580 Chunk c2 = DecodeChunk(payloads[2]);
581 EXPECT_EQ(c2.session_id(), 11u);
582 EXPECT_EQ(c2.offset(), 8u);
583 ASSERT_EQ(c2.window_end_offset(), 64u);
584
585 // Do it again.
586 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(last_chunk));
587 transfer_thread_.WaitUntilEventIsProcessed();
588
589 ASSERT_EQ(payloads.size(), 4u);
590 Chunk c3 = DecodeChunk(payloads[3]);
591 EXPECT_EQ(c3.session_id(), 11u);
592 EXPECT_EQ(c3.offset(), 8u);
593 ASSERT_EQ(c3.window_end_offset(), 64u);
594
595 // Finish the transfer normally.
596 context_.server().SendServerStream<Transfer::Read>(
597 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
598 .set_session_id(11)
599 .set_offset(8)
600 .set_payload(data.subspan(8))
601 .set_remaining_bytes(0)));
602 transfer_thread_.WaitUntilEventIsProcessed();
603
604 ASSERT_EQ(payloads.size(), 5u);
605
606 Chunk c4 = DecodeChunk(payloads[4]);
607 EXPECT_EQ(c4.session_id(), 11u);
608 ASSERT_TRUE(c4.status().has_value());
609 EXPECT_EQ(c4.status().value(), OkStatus());
610
611 EXPECT_EQ(transfer_status, OkStatus());
612 }
613
614 constexpr chrono::SystemClock::duration kTestTimeout =
615 std::chrono::milliseconds(50);
616 constexpr uint8_t kTestRetries = 3;
617
TEST_F(ReadTransfer,Timeout_ResendsCurrentParameters)618 TEST_F(ReadTransfer, Timeout_ResendsCurrentParameters) {
619 stream::MemoryWriterBuffer<64> writer;
620 Status transfer_status = Status::Unknown();
621
622 ASSERT_EQ(OkStatus(),
623 client_.Read(
624 12,
625 writer,
626 [&transfer_status](Status status) { transfer_status = status; },
627 kTestTimeout));
628 transfer_thread_.WaitUntilEventIsProcessed();
629
630 // First transfer parameters chunk is sent.
631 rpc::PayloadsView payloads =
632 context_.output().payloads<Transfer::Read>(context_.channel().id());
633 ASSERT_EQ(payloads.size(), 1u);
634 EXPECT_EQ(transfer_status, Status::Unknown());
635
636 Chunk c0 = DecodeChunk(payloads.back());
637 EXPECT_EQ(c0.session_id(), 12u);
638 EXPECT_EQ(c0.offset(), 0u);
639 EXPECT_EQ(c0.window_end_offset(), 64u);
640 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
641
642 // Wait for the timeout to expire without doing anything. The client should
643 // resend its initial parameters chunk.
644 transfer_thread_.SimulateClientTimeout(12);
645 ASSERT_EQ(payloads.size(), 2u);
646
647 Chunk c = DecodeChunk(payloads.back());
648 EXPECT_EQ(c.session_id(), 12u);
649 EXPECT_EQ(c.offset(), 0u);
650 EXPECT_EQ(c.window_end_offset(), 64u);
651 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
652
653 // Transfer has not yet completed.
654 EXPECT_EQ(transfer_status, Status::Unknown());
655
656 // Finish the transfer following the timeout.
657 context_.server().SendServerStream<Transfer::Read>(
658 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
659 .set_session_id(12)
660 .set_offset(0)
661 .set_payload(kData32)
662 .set_remaining_bytes(0)));
663 transfer_thread_.WaitUntilEventIsProcessed();
664
665 ASSERT_EQ(payloads.size(), 3u);
666
667 Chunk c4 = DecodeChunk(payloads.back());
668 EXPECT_EQ(c4.session_id(), 12u);
669 ASSERT_TRUE(c4.status().has_value());
670 EXPECT_EQ(c4.status().value(), OkStatus());
671
672 EXPECT_EQ(transfer_status, OkStatus());
673 }
674
TEST_F(ReadTransfer,Timeout_ResendsUpdatedParameters)675 TEST_F(ReadTransfer, Timeout_ResendsUpdatedParameters) {
676 stream::MemoryWriterBuffer<64> writer;
677 Status transfer_status = Status::Unknown();
678
679 ASSERT_EQ(OkStatus(),
680 client_.Read(
681 13,
682 writer,
683 [&transfer_status](Status status) { transfer_status = status; },
684 kTestTimeout));
685 transfer_thread_.WaitUntilEventIsProcessed();
686
687 // First transfer parameters chunk is sent.
688 rpc::PayloadsView payloads =
689 context_.output().payloads<Transfer::Read>(context_.channel().id());
690 ASSERT_EQ(payloads.size(), 1u);
691 EXPECT_EQ(transfer_status, Status::Unknown());
692
693 Chunk c0 = DecodeChunk(payloads.back());
694 EXPECT_EQ(c0.session_id(), 13u);
695 EXPECT_EQ(c0.offset(), 0u);
696 EXPECT_EQ(c0.window_end_offset(), 64u);
697 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
698
699 constexpr ConstByteSpan data(kData32);
700
701 // Send some data, but not everything.
702 context_.server().SendServerStream<Transfer::Read>(
703 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
704 .set_session_id(13)
705 .set_offset(0)
706 .set_payload(data.first(16))));
707 transfer_thread_.WaitUntilEventIsProcessed();
708
709 ASSERT_EQ(payloads.size(), 1u);
710
711 // Wait for the timeout to expire without sending more data. The client should
712 // send an updated parameters chunk, accounting for the data already received.
713 transfer_thread_.SimulateClientTimeout(13);
714 ASSERT_EQ(payloads.size(), 2u);
715
716 Chunk c = DecodeChunk(payloads.back());
717 EXPECT_EQ(c.session_id(), 13u);
718 EXPECT_EQ(c.offset(), 16u);
719 EXPECT_EQ(c.window_end_offset(), 64u);
720 EXPECT_EQ(c.type(), Chunk::Type::kParametersRetransmit);
721
722 // Transfer has not yet completed.
723 EXPECT_EQ(transfer_status, Status::Unknown());
724
725 // Send the rest of the data, finishing the transfer.
726 context_.server().SendServerStream<Transfer::Read>(
727 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
728 .set_session_id(13)
729 .set_offset(16)
730 .set_payload(data.subspan(16))
731 .set_remaining_bytes(0)));
732 transfer_thread_.WaitUntilEventIsProcessed();
733
734 ASSERT_EQ(payloads.size(), 3u);
735
736 Chunk c4 = DecodeChunk(payloads.back());
737 EXPECT_EQ(c4.session_id(), 13u);
738 ASSERT_TRUE(c4.status().has_value());
739 EXPECT_EQ(c4.status().value(), OkStatus());
740
741 EXPECT_EQ(transfer_status, OkStatus());
742 }
743
TEST_F(ReadTransfer,Timeout_EndsTransferAfterMaxRetries)744 TEST_F(ReadTransfer, Timeout_EndsTransferAfterMaxRetries) {
745 stream::MemoryWriterBuffer<64> writer;
746 Status transfer_status = Status::Unknown();
747
748 ASSERT_EQ(OkStatus(),
749 client_.Read(
750 14,
751 writer,
752 [&transfer_status](Status status) { transfer_status = status; },
753 kTestTimeout));
754 transfer_thread_.WaitUntilEventIsProcessed();
755
756 // First transfer parameters chunk is sent.
757 rpc::PayloadsView payloads =
758 context_.output().payloads<Transfer::Read>(context_.channel().id());
759 ASSERT_EQ(payloads.size(), 1u);
760 EXPECT_EQ(transfer_status, Status::Unknown());
761
762 Chunk c0 = DecodeChunk(payloads.back());
763 EXPECT_EQ(c0.session_id(), 14u);
764 EXPECT_EQ(c0.offset(), 0u);
765 EXPECT_EQ(c0.window_end_offset(), 64u);
766 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
767
768 for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
769 // Wait for the timeout to expire without doing anything. The client should
770 // resend its parameters chunk.
771 transfer_thread_.SimulateClientTimeout(14);
772 ASSERT_EQ(payloads.size(), retry + 1);
773
774 Chunk c = DecodeChunk(payloads.back());
775 EXPECT_EQ(c.session_id(), 14u);
776 EXPECT_EQ(c.offset(), 0u);
777 EXPECT_EQ(c.window_end_offset(), 64u);
778
779 // Transfer has not yet completed.
780 EXPECT_EQ(transfer_status, Status::Unknown());
781 }
782
783 // Sleep one more time after the final retry. The client should cancel the
784 // transfer at this point. As no packets were received from the server, no
785 // final status chunk should be sent.
786 transfer_thread_.SimulateClientTimeout(14);
787 ASSERT_EQ(payloads.size(), 4u);
788
789 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
790
791 // After finishing the transfer, nothing else should be sent. Verify this by
792 // waiting for a bit.
793 this_thread::sleep_for(kTestTimeout * 4);
794 ASSERT_EQ(payloads.size(), 4u);
795 }
796
TEST_F(ReadTransfer,Timeout_ReceivingDataResetsRetryCount)797 TEST_F(ReadTransfer, Timeout_ReceivingDataResetsRetryCount) {
798 stream::MemoryWriterBuffer<64> writer;
799 Status transfer_status = Status::Unknown();
800
801 constexpr ConstByteSpan data(kData32);
802
803 ASSERT_EQ(OkStatus(),
804 client_.Read(
805 14,
806 writer,
807 [&transfer_status](Status status) { transfer_status = status; },
808 kTestTimeout));
809 transfer_thread_.WaitUntilEventIsProcessed();
810
811 // First transfer parameters chunk is sent.
812 rpc::PayloadsView payloads =
813 context_.output().payloads<Transfer::Read>(context_.channel().id());
814 ASSERT_EQ(payloads.size(), 1u);
815 EXPECT_EQ(transfer_status, Status::Unknown());
816
817 Chunk c0 = DecodeChunk(payloads.back());
818 EXPECT_EQ(c0.session_id(), 14u);
819 EXPECT_EQ(c0.offset(), 0u);
820 EXPECT_EQ(c0.window_end_offset(), 64u);
821
822 // Simulate one less timeout than the maximum amount of retries.
823 for (unsigned retry = 1; retry <= kTestRetries - 1; ++retry) {
824 transfer_thread_.SimulateClientTimeout(14);
825 ASSERT_EQ(payloads.size(), retry + 1);
826
827 Chunk c = DecodeChunk(payloads.back());
828 EXPECT_EQ(c.session_id(), 14u);
829 EXPECT_EQ(c.offset(), 0u);
830 EXPECT_EQ(c.window_end_offset(), 64u);
831
832 // Transfer has not yet completed.
833 EXPECT_EQ(transfer_status, Status::Unknown());
834 }
835
836 // Send some data.
837 context_.server().SendServerStream<Transfer::Read>(
838 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
839 .set_session_id(14)
840 .set_offset(0)
841 .set_payload(data.first(16))));
842 transfer_thread_.WaitUntilEventIsProcessed();
843 ASSERT_EQ(payloads.size(), 3u);
844
845 // Time out a couple more times. The context's retry count should have been
846 // reset, so it should go through the standard retry flow instead of
847 // terminating the transfer.
848 transfer_thread_.SimulateClientTimeout(14);
849 ASSERT_EQ(payloads.size(), 4u);
850
851 Chunk c = DecodeChunk(payloads.back());
852 EXPECT_FALSE(c.status().has_value());
853 EXPECT_EQ(c.session_id(), 14u);
854 EXPECT_EQ(c.offset(), 16u);
855 EXPECT_EQ(c.window_end_offset(), 64u);
856
857 transfer_thread_.SimulateClientTimeout(14);
858 ASSERT_EQ(payloads.size(), 5u);
859
860 c = DecodeChunk(payloads.back());
861 EXPECT_FALSE(c.status().has_value());
862 EXPECT_EQ(c.session_id(), 14u);
863 EXPECT_EQ(c.offset(), 16u);
864 EXPECT_EQ(c.window_end_offset(), 64u);
865
866 // Ensure we don't leave a dangling reference to transfer_status.
867 client_.CancelTransfer(14);
868 transfer_thread_.WaitUntilEventIsProcessed();
869 }
870
TEST_F(ReadTransfer,InitialPacketFails_OnCompletedCalledWithDataLoss)871 TEST_F(ReadTransfer, InitialPacketFails_OnCompletedCalledWithDataLoss) {
872 stream::MemoryWriterBuffer<64> writer;
873 Status transfer_status = Status::Unknown();
874
875 context_.output().set_send_status(Status::Unauthenticated());
876
877 ASSERT_EQ(OkStatus(),
878 client_.Read(
879 14,
880 writer,
881 [&transfer_status](Status status) {
882 ASSERT_EQ(transfer_status,
883 Status::Unknown()); // Must only call once
884 transfer_status = status;
885 },
886 kTestTimeout));
887 transfer_thread_.WaitUntilEventIsProcessed();
888
889 EXPECT_EQ(transfer_status, Status::Internal());
890 }
891
892 class WriteTransfer : public ::testing::Test {
893 protected:
WriteTransfer()894 WriteTransfer()
895 : transfer_thread_(chunk_buffer_, encode_buffer_),
896 client_(context_.client(), context_.channel().id(), transfer_thread_),
897 system_thread_(TransferThreadOptions(), transfer_thread_) {}
898
~WriteTransfer()899 ~WriteTransfer() override {
900 transfer_thread_.Terminate();
901 system_thread_.join();
902 }
903
904 rpc::RawClientTestContext<> context_;
905
906 Thread<1, 1> transfer_thread_;
907 Client client_;
908
909 std::array<std::byte, 64> chunk_buffer_;
910 std::array<std::byte, 64> encode_buffer_;
911
912 thread::Thread system_thread_;
913 };
914
TEST_F(WriteTransfer,SingleChunk)915 TEST_F(WriteTransfer, SingleChunk) {
916 stream::MemoryReader reader(kData32);
917 Status transfer_status = Status::Unknown();
918
919 ASSERT_EQ(OkStatus(),
920 client_.Write(3, reader, [&transfer_status](Status status) {
921 transfer_status = status;
922 }));
923 transfer_thread_.WaitUntilEventIsProcessed();
924
925 // The client begins by sending the ID of the resource to transfer.
926 rpc::PayloadsView payloads =
927 context_.output().payloads<Transfer::Write>(context_.channel().id());
928 ASSERT_EQ(payloads.size(), 1u);
929 EXPECT_EQ(transfer_status, Status::Unknown());
930
931 Chunk c0 = DecodeChunk(payloads[0]);
932 EXPECT_EQ(c0.session_id(), 3u);
933 EXPECT_EQ(c0.resource_id(), 3u);
934 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
935
936 // Send transfer parameters. Client should send a data chunk and the final
937 // chunk.
938 rpc::test::WaitForPackets(context_.output(), 2, [this] {
939 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
940 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
941 .set_session_id(3)
942 .set_offset(0)
943 .set_window_end_offset(64)
944 .set_max_chunk_size_bytes(32)));
945 });
946
947 ASSERT_EQ(payloads.size(), 3u);
948
949 Chunk c1 = DecodeChunk(payloads[1]);
950 EXPECT_EQ(c1.session_id(), 3u);
951 EXPECT_EQ(c1.offset(), 0u);
952 EXPECT_TRUE(c1.has_payload());
953 EXPECT_EQ(
954 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
955
956 Chunk c2 = DecodeChunk(payloads[2]);
957 EXPECT_EQ(c2.session_id(), 3u);
958 ASSERT_TRUE(c2.remaining_bytes().has_value());
959 EXPECT_EQ(c2.remaining_bytes().value(), 0u);
960
961 EXPECT_EQ(transfer_status, Status::Unknown());
962
963 // Send the final status chunk to complete the transfer.
964 context_.server().SendServerStream<Transfer::Write>(
965 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
966 transfer_thread_.WaitUntilEventIsProcessed();
967
968 EXPECT_EQ(payloads.size(), 3u);
969 EXPECT_EQ(transfer_status, OkStatus());
970 }
971
TEST_F(WriteTransfer,MultiChunk)972 TEST_F(WriteTransfer, MultiChunk) {
973 stream::MemoryReader reader(kData32);
974 Status transfer_status = Status::Unknown();
975
976 ASSERT_EQ(OkStatus(),
977 client_.Write(4, reader, [&transfer_status](Status status) {
978 transfer_status = status;
979 }));
980 transfer_thread_.WaitUntilEventIsProcessed();
981
982 // The client begins by sending the ID of the resource to transfer.
983 rpc::PayloadsView payloads =
984 context_.output().payloads<Transfer::Write>(context_.channel().id());
985 ASSERT_EQ(payloads.size(), 1u);
986 EXPECT_EQ(transfer_status, Status::Unknown());
987
988 Chunk c0 = DecodeChunk(payloads[0]);
989 EXPECT_EQ(c0.session_id(), 4u);
990 EXPECT_EQ(c0.resource_id(), 4u);
991 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
992
993 // Send transfer parameters with a chunk size smaller than the data.
994
995 // Client should send two data chunks and the final chunk.
996 rpc::test::WaitForPackets(context_.output(), 3, [this] {
997 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
998 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
999 .set_session_id(4)
1000 .set_offset(0)
1001 .set_window_end_offset(64)
1002 .set_max_chunk_size_bytes(16)));
1003 });
1004
1005 ASSERT_EQ(payloads.size(), 4u);
1006
1007 Chunk c1 = DecodeChunk(payloads[1]);
1008 EXPECT_EQ(c1.session_id(), 4u);
1009 EXPECT_EQ(c1.offset(), 0u);
1010 EXPECT_TRUE(c1.has_payload());
1011 EXPECT_EQ(
1012 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1013
1014 Chunk c2 = DecodeChunk(payloads[2]);
1015 EXPECT_EQ(c2.session_id(), 4u);
1016 EXPECT_EQ(c2.offset(), 16u);
1017 EXPECT_TRUE(c2.has_payload());
1018 EXPECT_EQ(std::memcmp(c2.payload().data(),
1019 kData32.data() + c2.offset(),
1020 c2.payload().size()),
1021 0);
1022
1023 Chunk c3 = DecodeChunk(payloads[3]);
1024 EXPECT_EQ(c3.session_id(), 4u);
1025 ASSERT_TRUE(c3.remaining_bytes().has_value());
1026 EXPECT_EQ(c3.remaining_bytes().value(), 0u);
1027
1028 EXPECT_EQ(transfer_status, Status::Unknown());
1029
1030 // Send the final status chunk to complete the transfer.
1031 context_.server().SendServerStream<Transfer::Write>(
1032 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 4, OkStatus())));
1033 transfer_thread_.WaitUntilEventIsProcessed();
1034
1035 EXPECT_EQ(payloads.size(), 4u);
1036 EXPECT_EQ(transfer_status, OkStatus());
1037 }
1038
TEST_F(WriteTransfer,OutOfOrder_SeekSupported)1039 TEST_F(WriteTransfer, OutOfOrder_SeekSupported) {
1040 stream::MemoryReader reader(kData32);
1041 Status transfer_status = Status::Unknown();
1042
1043 ASSERT_EQ(OkStatus(),
1044 client_.Write(5, reader, [&transfer_status](Status status) {
1045 transfer_status = status;
1046 }));
1047 transfer_thread_.WaitUntilEventIsProcessed();
1048
1049 // The client begins by sending the ID of the resource to transfer.
1050 rpc::PayloadsView payloads =
1051 context_.output().payloads<Transfer::Write>(context_.channel().id());
1052 ASSERT_EQ(payloads.size(), 1u);
1053 EXPECT_EQ(transfer_status, Status::Unknown());
1054
1055 Chunk c0 = DecodeChunk(payloads[0]);
1056 EXPECT_EQ(c0.session_id(), 5u);
1057 EXPECT_EQ(c0.resource_id(), 5u);
1058 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1059
1060 // Send transfer parameters with a nonzero offset, requesting a seek.
1061 // Client should send a data chunk and the final chunk.
1062 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1063 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1064 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1065 .set_session_id(5)
1066 .set_offset(16)
1067 .set_window_end_offset(64)
1068 .set_max_chunk_size_bytes(32)));
1069 });
1070
1071 ASSERT_EQ(payloads.size(), 3u);
1072
1073 Chunk c1 = DecodeChunk(payloads[1]);
1074 EXPECT_EQ(c1.session_id(), 5u);
1075 EXPECT_EQ(c1.offset(), 16u);
1076 EXPECT_TRUE(c1.has_payload());
1077 EXPECT_EQ(std::memcmp(c1.payload().data(),
1078 kData32.data() + c1.offset(),
1079 c1.payload().size()),
1080 0);
1081
1082 Chunk c2 = DecodeChunk(payloads[2]);
1083 EXPECT_EQ(c2.session_id(), 5u);
1084 ASSERT_TRUE(c2.remaining_bytes().has_value());
1085 EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1086
1087 EXPECT_EQ(transfer_status, Status::Unknown());
1088
1089 // Send the final status chunk to complete the transfer.
1090 context_.server().SendServerStream<Transfer::Write>(
1091 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 5, OkStatus())));
1092 transfer_thread_.WaitUntilEventIsProcessed();
1093
1094 EXPECT_EQ(payloads.size(), 3u);
1095 EXPECT_EQ(transfer_status, OkStatus());
1096 }
1097
1098 class FakeNonSeekableReader final : public stream::NonSeekableReader {
1099 public:
FakeNonSeekableReader(ConstByteSpan data)1100 FakeNonSeekableReader(ConstByteSpan data) : data_(data), position_(0) {}
1101
1102 private:
DoRead(ByteSpan out)1103 StatusWithSize DoRead(ByteSpan out) final {
1104 if (position_ == data_.size()) {
1105 return StatusWithSize::OutOfRange();
1106 }
1107
1108 size_t to_copy = std::min(out.size(), data_.size() - position_);
1109 std::memcpy(out.data(), data_.data() + position_, to_copy);
1110 position_ += to_copy;
1111
1112 return StatusWithSize(to_copy);
1113 }
1114
1115 ConstByteSpan data_;
1116 size_t position_;
1117 };
1118
TEST_F(WriteTransfer,OutOfOrder_SeekNotSupported)1119 TEST_F(WriteTransfer, OutOfOrder_SeekNotSupported) {
1120 FakeNonSeekableReader reader(kData32);
1121 Status transfer_status = Status::Unknown();
1122
1123 ASSERT_EQ(OkStatus(),
1124 client_.Write(6, reader, [&transfer_status](Status status) {
1125 transfer_status = status;
1126 }));
1127 transfer_thread_.WaitUntilEventIsProcessed();
1128
1129 // The client begins by sending the ID of the resource to transfer.
1130 rpc::PayloadsView payloads =
1131 context_.output().payloads<Transfer::Write>(context_.channel().id());
1132 ASSERT_EQ(payloads.size(), 1u);
1133 EXPECT_EQ(transfer_status, Status::Unknown());
1134
1135 Chunk c0 = DecodeChunk(payloads[0]);
1136 EXPECT_EQ(c0.session_id(), 6u);
1137 EXPECT_EQ(c0.resource_id(), 6u);
1138 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1139
1140 // Send transfer parameters with a nonzero offset, requesting a seek.
1141 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1142 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1143 .set_session_id(6)
1144 .set_offset(16)
1145 .set_window_end_offset(64)
1146 .set_max_chunk_size_bytes(32)));
1147 transfer_thread_.WaitUntilEventIsProcessed();
1148
1149 // Client should send a status chunk and end the transfer.
1150 ASSERT_EQ(payloads.size(), 2u);
1151
1152 Chunk c1 = DecodeChunk(payloads[1]);
1153 EXPECT_EQ(c1.session_id(), 6u);
1154 EXPECT_EQ(c1.type(), Chunk::Type::kCompletion);
1155 ASSERT_TRUE(c1.status().has_value());
1156 EXPECT_EQ(c1.status().value(), Status::Unimplemented());
1157
1158 EXPECT_EQ(transfer_status, Status::Unimplemented());
1159 }
1160
TEST_F(WriteTransfer,ServerError)1161 TEST_F(WriteTransfer, ServerError) {
1162 stream::MemoryReader reader(kData32);
1163 Status transfer_status = Status::Unknown();
1164
1165 ASSERT_EQ(OkStatus(),
1166 client_.Write(7, reader, [&transfer_status](Status status) {
1167 transfer_status = status;
1168 }));
1169 transfer_thread_.WaitUntilEventIsProcessed();
1170
1171 // The client begins by sending the ID of the resource to transfer.
1172 rpc::PayloadsView payloads =
1173 context_.output().payloads<Transfer::Write>(context_.channel().id());
1174 ASSERT_EQ(payloads.size(), 1u);
1175 EXPECT_EQ(transfer_status, Status::Unknown());
1176
1177 Chunk c0 = DecodeChunk(payloads[0]);
1178 EXPECT_EQ(c0.session_id(), 7u);
1179 EXPECT_EQ(c0.resource_id(), 7u);
1180 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1181
1182 // Send an error from the server.
1183 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1184 Chunk::Final(ProtocolVersion::kLegacy, 7, Status::NotFound())));
1185 transfer_thread_.WaitUntilEventIsProcessed();
1186
1187 // Client should not respond and terminate the transfer.
1188 EXPECT_EQ(payloads.size(), 1u);
1189 EXPECT_EQ(transfer_status, Status::NotFound());
1190 }
1191
TEST_F(WriteTransfer,AbortIfZeroBytesAreRequested)1192 TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
1193 stream::MemoryReader reader(kData32);
1194 Status transfer_status = Status::Unknown();
1195
1196 ASSERT_EQ(OkStatus(),
1197 client_.Write(9, reader, [&transfer_status](Status status) {
1198 transfer_status = status;
1199 }));
1200 transfer_thread_.WaitUntilEventIsProcessed();
1201
1202 // The client begins by sending the ID of the resource to transfer.
1203 rpc::PayloadsView payloads =
1204 context_.output().payloads<Transfer::Write>(context_.channel().id());
1205 ASSERT_EQ(payloads.size(), 1u);
1206 EXPECT_EQ(transfer_status, Status::Unknown());
1207
1208 Chunk c0 = DecodeChunk(payloads[0]);
1209 EXPECT_EQ(c0.session_id(), 9u);
1210 EXPECT_EQ(c0.resource_id(), 9u);
1211 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1212
1213 // Send an invalid transfer parameters chunk with 0 pending bytes.
1214 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1215 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1216 .set_session_id(9)
1217 .set_offset(0)
1218 .set_window_end_offset(0)
1219 .set_max_chunk_size_bytes(32)));
1220 transfer_thread_.WaitUntilEventIsProcessed();
1221
1222 // Client should send a status chunk and end the transfer.
1223 ASSERT_EQ(payloads.size(), 2u);
1224
1225 Chunk c1 = DecodeChunk(payloads[1]);
1226 EXPECT_EQ(c1.session_id(), 9u);
1227 ASSERT_TRUE(c1.status().has_value());
1228 EXPECT_EQ(c1.status().value(), Status::ResourceExhausted());
1229
1230 EXPECT_EQ(transfer_status, Status::ResourceExhausted());
1231 }
1232
TEST_F(WriteTransfer,Timeout_RetriesWithInitialChunk)1233 TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
1234 stream::MemoryReader reader(kData32);
1235 Status transfer_status = Status::Unknown();
1236
1237 ASSERT_EQ(OkStatus(),
1238 client_.Write(
1239 10,
1240 reader,
1241 [&transfer_status](Status status) { transfer_status = status; },
1242 kTestTimeout));
1243 transfer_thread_.WaitUntilEventIsProcessed();
1244
1245 // The client begins by sending the ID of the resource to transfer.
1246 rpc::PayloadsView payloads =
1247 context_.output().payloads<Transfer::Write>(context_.channel().id());
1248 ASSERT_EQ(payloads.size(), 1u);
1249 EXPECT_EQ(transfer_status, Status::Unknown());
1250
1251 Chunk c0 = DecodeChunk(payloads.back());
1252 EXPECT_EQ(c0.session_id(), 10u);
1253 EXPECT_EQ(c0.resource_id(), 10u);
1254 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1255
1256 // Wait for the timeout to expire without doing anything. The client should
1257 // resend the initial transmit chunk.
1258 transfer_thread_.SimulateClientTimeout(10);
1259 ASSERT_EQ(payloads.size(), 2u);
1260
1261 Chunk c = DecodeChunk(payloads.back());
1262 EXPECT_EQ(c.session_id(), 10u);
1263 EXPECT_EQ(c.resource_id(), 10u);
1264 EXPECT_EQ(c.type(), Chunk::Type::kStart);
1265
1266 // Transfer has not yet completed.
1267 EXPECT_EQ(transfer_status, Status::Unknown());
1268
1269 // Ensure we don't leave a dangling reference to transfer_status.
1270 client_.CancelTransfer(10);
1271 transfer_thread_.WaitUntilEventIsProcessed();
1272 }
1273
TEST_F(WriteTransfer,Timeout_RetriesWithMostRecentChunk)1274 TEST_F(WriteTransfer, Timeout_RetriesWithMostRecentChunk) {
1275 stream::MemoryReader reader(kData32);
1276 Status transfer_status = Status::Unknown();
1277
1278 ASSERT_EQ(OkStatus(),
1279 client_.Write(
1280 11,
1281 reader,
1282 [&transfer_status](Status status) { transfer_status = status; },
1283 kTestTimeout));
1284 transfer_thread_.WaitUntilEventIsProcessed();
1285
1286 // The client begins by sending the ID of the resource to transfer.
1287 rpc::PayloadsView payloads =
1288 context_.output().payloads<Transfer::Write>(context_.channel().id());
1289 ASSERT_EQ(payloads.size(), 1u);
1290 EXPECT_EQ(transfer_status, Status::Unknown());
1291
1292 Chunk c0 = DecodeChunk(payloads.back());
1293 EXPECT_EQ(c0.session_id(), 11u);
1294 EXPECT_EQ(c0.resource_id(), 11u);
1295 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1296
1297 // Send the first parameters chunk.
1298 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1299 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1300 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1301 .set_session_id(11)
1302 .set_offset(0)
1303 .set_window_end_offset(16)
1304 .set_max_chunk_size_bytes(8)));
1305 });
1306 ASSERT_EQ(payloads.size(), 3u);
1307
1308 EXPECT_EQ(transfer_status, Status::Unknown());
1309
1310 Chunk c1 = DecodeChunk(payloads[1]);
1311 EXPECT_EQ(c1.session_id(), 11u);
1312 EXPECT_EQ(c1.offset(), 0u);
1313 EXPECT_EQ(c1.payload().size(), 8u);
1314 EXPECT_EQ(
1315 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1316
1317 Chunk c2 = DecodeChunk(payloads[2]);
1318 EXPECT_EQ(c2.session_id(), 11u);
1319 EXPECT_EQ(c2.offset(), 8u);
1320 EXPECT_EQ(c2.payload().size(), 8u);
1321 EXPECT_EQ(std::memcmp(c2.payload().data(),
1322 kData32.data() + c2.offset(),
1323 c2.payload().size()),
1324 0);
1325
1326 // Wait for the timeout to expire without doing anything. The client should
1327 // resend the most recently sent chunk.
1328 transfer_thread_.SimulateClientTimeout(11);
1329 ASSERT_EQ(payloads.size(), 4u);
1330
1331 Chunk c3 = DecodeChunk(payloads[3]);
1332 EXPECT_EQ(c3.session_id(), c2.session_id());
1333 EXPECT_EQ(c3.offset(), c2.offset());
1334 EXPECT_EQ(c3.payload().size(), c2.payload().size());
1335 EXPECT_EQ(std::memcmp(
1336 c3.payload().data(), c2.payload().data(), c3.payload().size()),
1337 0);
1338
1339 // Transfer has not yet completed.
1340 EXPECT_EQ(transfer_status, Status::Unknown());
1341
1342 // Ensure we don't leave a dangling reference to transfer_status.
1343 client_.CancelTransfer(11);
1344 transfer_thread_.WaitUntilEventIsProcessed();
1345 }
1346
TEST_F(WriteTransfer,Timeout_RetriesWithSingleChunkTransfer)1347 TEST_F(WriteTransfer, Timeout_RetriesWithSingleChunkTransfer) {
1348 stream::MemoryReader reader(kData32);
1349 Status transfer_status = Status::Unknown();
1350
1351 ASSERT_EQ(OkStatus(),
1352 client_.Write(
1353 12,
1354 reader,
1355 [&transfer_status](Status status) { transfer_status = status; },
1356 kTestTimeout));
1357 transfer_thread_.WaitUntilEventIsProcessed();
1358
1359 // The client begins by sending the ID of the resource to transfer.
1360 rpc::PayloadsView payloads =
1361 context_.output().payloads<Transfer::Write>(context_.channel().id());
1362 ASSERT_EQ(payloads.size(), 1u);
1363 EXPECT_EQ(transfer_status, Status::Unknown());
1364
1365 Chunk c0 = DecodeChunk(payloads.back());
1366 EXPECT_EQ(c0.session_id(), 12u);
1367 EXPECT_EQ(c0.resource_id(), 12u);
1368 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1369
1370 // Send the first parameters chunk, requesting all the data. The client should
1371 // respond with one data chunk and a remaining_bytes = 0 chunk.
1372 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1373 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1374 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1375 .set_session_id(12)
1376 .set_offset(0)
1377 .set_window_end_offset(64)
1378 .set_max_chunk_size_bytes(64)));
1379 });
1380 ASSERT_EQ(payloads.size(), 3u);
1381
1382 EXPECT_EQ(transfer_status, Status::Unknown());
1383
1384 Chunk c1 = DecodeChunk(payloads[1]);
1385 EXPECT_EQ(c1.session_id(), 12u);
1386 EXPECT_EQ(c1.offset(), 0u);
1387 EXPECT_EQ(c1.payload().size(), 32u);
1388 EXPECT_EQ(
1389 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1390
1391 Chunk c2 = DecodeChunk(payloads[2]);
1392 EXPECT_EQ(c2.session_id(), 12u);
1393 ASSERT_TRUE(c2.remaining_bytes().has_value());
1394 EXPECT_EQ(c2.remaining_bytes().value(), 0u);
1395
1396 // Wait for the timeout to expire without doing anything. The client should
1397 // resend the data chunk.
1398 transfer_thread_.SimulateClientTimeout(12);
1399 ASSERT_EQ(payloads.size(), 4u);
1400
1401 Chunk c3 = DecodeChunk(payloads[3]);
1402 EXPECT_EQ(c3.session_id(), c1.session_id());
1403 EXPECT_EQ(c3.offset(), c1.offset());
1404 EXPECT_EQ(c3.payload().size(), c1.payload().size());
1405 EXPECT_EQ(std::memcmp(
1406 c3.payload().data(), c1.payload().data(), c3.payload().size()),
1407 0);
1408
1409 // The remaining_bytes = 0 chunk should be resent on the next parameters.
1410 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1411 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1412 .set_session_id(12)
1413 .set_offset(32)
1414 .set_window_end_offset(64)));
1415 transfer_thread_.WaitUntilEventIsProcessed();
1416
1417 ASSERT_EQ(payloads.size(), 5u);
1418
1419 Chunk c4 = DecodeChunk(payloads[4]);
1420 EXPECT_EQ(c4.session_id(), 12u);
1421 ASSERT_TRUE(c4.remaining_bytes().has_value());
1422 EXPECT_EQ(c4.remaining_bytes().value(), 0u);
1423
1424 context_.server().SendServerStream<Transfer::Write>(
1425 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 12, OkStatus())));
1426 transfer_thread_.WaitUntilEventIsProcessed();
1427
1428 EXPECT_EQ(transfer_status, OkStatus());
1429 }
1430
TEST_F(WriteTransfer,Timeout_EndsTransferAfterMaxRetries)1431 TEST_F(WriteTransfer, Timeout_EndsTransferAfterMaxRetries) {
1432 stream::MemoryReader reader(kData32);
1433 Status transfer_status = Status::Unknown();
1434
1435 ASSERT_EQ(OkStatus(),
1436 client_.Write(
1437 13,
1438 reader,
1439 [&transfer_status](Status status) { transfer_status = status; },
1440 kTestTimeout));
1441 transfer_thread_.WaitUntilEventIsProcessed();
1442
1443 // The client begins by sending the ID of the resource to transfer.
1444 rpc::PayloadsView payloads =
1445 context_.output().payloads<Transfer::Write>(context_.channel().id());
1446 ASSERT_EQ(payloads.size(), 1u);
1447 EXPECT_EQ(transfer_status, Status::Unknown());
1448
1449 Chunk c0 = DecodeChunk(payloads.back());
1450 EXPECT_EQ(c0.session_id(), 13u);
1451 EXPECT_EQ(c0.resource_id(), 13u);
1452 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1453
1454 for (unsigned retry = 1; retry <= kTestRetries; ++retry) {
1455 // Wait for the timeout to expire without doing anything. The client should
1456 // resend the initial transmit chunk.
1457 transfer_thread_.SimulateClientTimeout(13);
1458 ASSERT_EQ(payloads.size(), retry + 1);
1459
1460 Chunk c = DecodeChunk(payloads.back());
1461 EXPECT_EQ(c.session_id(), 13u);
1462 EXPECT_EQ(c.resource_id(), 13u);
1463 EXPECT_EQ(c.type(), Chunk::Type::kStart);
1464
1465 // Transfer has not yet completed.
1466 EXPECT_EQ(transfer_status, Status::Unknown());
1467 }
1468
1469 // Sleep one more time after the final retry. The client should cancel the
1470 // transfer at this point. As no packets were received from the server, no
1471 // final status chunk should be sent.
1472 transfer_thread_.SimulateClientTimeout(13);
1473 ASSERT_EQ(payloads.size(), 4u);
1474
1475 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1476
1477 // After finishing the transfer, nothing else should be sent. Verify this by
1478 // waiting for a bit.
1479 this_thread::sleep_for(kTestTimeout * 4);
1480 ASSERT_EQ(payloads.size(), 4u);
1481
1482 // Ensure we don't leave a dangling reference to transfer_status.
1483 client_.CancelTransfer(13);
1484 transfer_thread_.WaitUntilEventIsProcessed();
1485 }
1486
TEST_F(WriteTransfer,Timeout_NonSeekableReaderEndsTransfer)1487 TEST_F(WriteTransfer, Timeout_NonSeekableReaderEndsTransfer) {
1488 FakeNonSeekableReader reader(kData32);
1489 Status transfer_status = Status::Unknown();
1490
1491 ASSERT_EQ(OkStatus(),
1492 client_.Write(
1493 14,
1494 reader,
1495 [&transfer_status](Status status) { transfer_status = status; },
1496 kTestTimeout));
1497 transfer_thread_.WaitUntilEventIsProcessed();
1498
1499 // The client begins by sending the ID of the resource to transfer.
1500 rpc::PayloadsView payloads =
1501 context_.output().payloads<Transfer::Write>(context_.channel().id());
1502 ASSERT_EQ(payloads.size(), 1u);
1503 EXPECT_EQ(transfer_status, Status::Unknown());
1504
1505 Chunk c0 = DecodeChunk(payloads.back());
1506 EXPECT_EQ(c0.session_id(), 14u);
1507 EXPECT_EQ(c0.resource_id(), 14u);
1508 EXPECT_EQ(c0.type(), Chunk::Type::kStart);
1509
1510 // Send the first parameters chunk.
1511 rpc::test::WaitForPackets(context_.output(), 2, [this] {
1512 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1513 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1514 .set_session_id(14)
1515 .set_offset(0)
1516 .set_window_end_offset(16)
1517 .set_max_chunk_size_bytes(8)));
1518 });
1519 ASSERT_EQ(payloads.size(), 3u);
1520
1521 EXPECT_EQ(transfer_status, Status::Unknown());
1522
1523 Chunk c1 = DecodeChunk(payloads[1]);
1524 EXPECT_EQ(c1.session_id(), 14u);
1525 EXPECT_EQ(c1.offset(), 0u);
1526 EXPECT_TRUE(c1.has_payload());
1527 EXPECT_EQ(c1.payload().size(), 8u);
1528 EXPECT_EQ(
1529 std::memcmp(c1.payload().data(), kData32.data(), c1.payload().size()), 0);
1530
1531 Chunk c2 = DecodeChunk(payloads[2]);
1532 EXPECT_EQ(c2.session_id(), 14u);
1533 EXPECT_EQ(c2.offset(), 8u);
1534 EXPECT_TRUE(c2.has_payload());
1535 EXPECT_EQ(c2.payload().size(), 8u);
1536 EXPECT_EQ(std::memcmp(c2.payload().data(),
1537 kData32.data() + c2.offset(),
1538 c2.payload().size()),
1539 0);
1540
1541 // Wait for the timeout to expire without doing anything. The client should
1542 // fail to seek back and end the transfer.
1543 transfer_thread_.SimulateClientTimeout(14);
1544 ASSERT_EQ(payloads.size(), 4u);
1545
1546 Chunk c3 = DecodeChunk(payloads[3]);
1547 EXPECT_EQ(c3.protocol_version(), ProtocolVersion::kLegacy);
1548 EXPECT_EQ(c3.session_id(), 14u);
1549 ASSERT_TRUE(c3.status().has_value());
1550 EXPECT_EQ(c3.status().value(), Status::DeadlineExceeded());
1551
1552 EXPECT_EQ(transfer_status, Status::DeadlineExceeded());
1553
1554 // Ensure we don't leave a dangling reference to transfer_status.
1555 client_.CancelTransfer(14);
1556 transfer_thread_.WaitUntilEventIsProcessed();
1557 }
1558
TEST_F(WriteTransfer,ManualCancel)1559 TEST_F(WriteTransfer, ManualCancel) {
1560 stream::MemoryReader reader(kData32);
1561 Status transfer_status = Status::Unknown();
1562
1563 ASSERT_EQ(OkStatus(),
1564 client_.Write(
1565 15,
1566 reader,
1567 [&transfer_status](Status status) { transfer_status = status; },
1568 kTestTimeout));
1569 transfer_thread_.WaitUntilEventIsProcessed();
1570
1571 // The client begins by sending the ID of the resource to transfer.
1572 rpc::PayloadsView payloads =
1573 context_.output().payloads<Transfer::Write>(context_.channel().id());
1574 ASSERT_EQ(payloads.size(), 1u);
1575 EXPECT_EQ(transfer_status, Status::Unknown());
1576
1577 Chunk chunk = DecodeChunk(payloads.back());
1578 EXPECT_EQ(chunk.session_id(), 15u);
1579 EXPECT_EQ(chunk.resource_id(), 15u);
1580 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1581
1582 // Get a response from the server, then cancel the transfer.
1583 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
1584 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
1585 .set_session_id(15)
1586 .set_offset(0)
1587 .set_window_end_offset(64)
1588 .set_max_chunk_size_bytes(32)));
1589 transfer_thread_.WaitUntilEventIsProcessed();
1590 ASSERT_EQ(payloads.size(), 2u);
1591
1592 client_.CancelTransfer(15);
1593 transfer_thread_.WaitUntilEventIsProcessed();
1594
1595 // Client should send a cancellation chunk to the server.
1596 ASSERT_EQ(payloads.size(), 3u);
1597 chunk = DecodeChunk(payloads.back());
1598 EXPECT_EQ(chunk.session_id(), 15u);
1599 ASSERT_EQ(chunk.type(), Chunk::Type::kCompletion);
1600 EXPECT_EQ(chunk.status().value(), Status::Cancelled());
1601
1602 EXPECT_EQ(transfer_status, Status::Cancelled());
1603 }
1604
TEST_F(WriteTransfer,ManualCancel_NoContact)1605 TEST_F(WriteTransfer, ManualCancel_NoContact) {
1606 stream::MemoryReader reader(kData32);
1607 Status transfer_status = Status::Unknown();
1608
1609 ASSERT_EQ(OkStatus(),
1610 client_.Write(
1611 15,
1612 reader,
1613 [&transfer_status](Status status) { transfer_status = status; },
1614 kTestTimeout));
1615 transfer_thread_.WaitUntilEventIsProcessed();
1616
1617 // The client begins by sending the ID of the resource to transfer.
1618 rpc::PayloadsView payloads =
1619 context_.output().payloads<Transfer::Write>(context_.channel().id());
1620 ASSERT_EQ(payloads.size(), 1u);
1621 EXPECT_EQ(transfer_status, Status::Unknown());
1622
1623 Chunk chunk = DecodeChunk(payloads.back());
1624 EXPECT_EQ(chunk.session_id(), 15u);
1625 EXPECT_EQ(chunk.resource_id(), 15u);
1626 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1627
1628 // Cancel transfer without a server response. No final chunk should be sent.
1629 client_.CancelTransfer(15);
1630 transfer_thread_.WaitUntilEventIsProcessed();
1631
1632 ASSERT_EQ(payloads.size(), 1u);
1633
1634 EXPECT_EQ(transfer_status, Status::Cancelled());
1635 }
1636
TEST_F(ReadTransfer,Version2_SingleChunk)1637 TEST_F(ReadTransfer, Version2_SingleChunk) {
1638 stream::MemoryWriterBuffer<64> writer;
1639 Status transfer_status = Status::Unknown();
1640
1641 ASSERT_EQ(OkStatus(),
1642 client_.Read(
1643 3,
1644 writer,
1645 [&transfer_status](Status status) { transfer_status = status; },
1646 cfg::kDefaultChunkTimeout,
1647 ProtocolVersion::kVersionTwo));
1648
1649 transfer_thread_.WaitUntilEventIsProcessed();
1650
1651 // Initial chunk of the transfer is sent. This chunk should contain all the
1652 // fields from both legacy and version 2 protocols for backwards
1653 // compatibility.
1654 rpc::PayloadsView payloads =
1655 context_.output().payloads<Transfer::Read>(context_.channel().id());
1656 ASSERT_EQ(payloads.size(), 1u);
1657 EXPECT_EQ(transfer_status, Status::Unknown());
1658
1659 Chunk chunk = DecodeChunk(payloads[0]);
1660 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1661 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1662 EXPECT_EQ(chunk.session_id(), 3u);
1663 EXPECT_EQ(chunk.resource_id(), 3u);
1664 EXPECT_EQ(chunk.offset(), 0u);
1665 EXPECT_EQ(chunk.window_end_offset(), 64u);
1666 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1667
1668 // The server responds with a START_ACK, continuing the version 2 handshake
1669 // and assigning a session_id to the transfer.
1670 context_.server().SendServerStream<Transfer::Read>(
1671 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1672 .set_session_id(29)
1673 .set_resource_id(3)));
1674 transfer_thread_.WaitUntilEventIsProcessed();
1675
1676 ASSERT_EQ(payloads.size(), 2u);
1677
1678 // Client should accept the session_id with a START_ACK_CONFIRMATION,
1679 // additionally containing the initial parameters for the read transfer.
1680 chunk = DecodeChunk(payloads.back());
1681 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1682 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1683 EXPECT_EQ(chunk.session_id(), 29u);
1684 EXPECT_FALSE(chunk.resource_id().has_value());
1685 EXPECT_EQ(chunk.offset(), 0u);
1686 EXPECT_EQ(chunk.window_end_offset(), 64u);
1687 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1688
1689 // Send all the transfer data. Client should accept it and complete the
1690 // transfer.
1691 context_.server().SendServerStream<Transfer::Read>(
1692 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1693 .set_session_id(29)
1694 .set_offset(0)
1695 .set_payload(kData32)
1696 .set_remaining_bytes(0)));
1697 transfer_thread_.WaitUntilEventIsProcessed();
1698
1699 ASSERT_EQ(payloads.size(), 3u);
1700
1701 chunk = DecodeChunk(payloads.back());
1702 EXPECT_EQ(chunk.session_id(), 29u);
1703 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1704 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1705 ASSERT_TRUE(chunk.status().has_value());
1706 EXPECT_EQ(chunk.status().value(), OkStatus());
1707
1708 EXPECT_EQ(transfer_status, OkStatus());
1709 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1710 0);
1711
1712 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1713 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1714 .set_session_id(29)));
1715 }
1716
TEST_F(ReadTransfer,Version2_ServerRunsLegacy)1717 TEST_F(ReadTransfer, Version2_ServerRunsLegacy) {
1718 stream::MemoryWriterBuffer<64> writer;
1719 Status transfer_status = Status::Unknown();
1720
1721 ASSERT_EQ(OkStatus(),
1722 client_.Read(
1723 3,
1724 writer,
1725 [&transfer_status](Status status) { transfer_status = status; },
1726 cfg::kDefaultChunkTimeout,
1727 ProtocolVersion::kVersionTwo));
1728
1729 transfer_thread_.WaitUntilEventIsProcessed();
1730
1731 // Initial chunk of the transfer is sent. This chunk should contain all the
1732 // fields from both legacy and version 2 protocols for backwards
1733 // compatibility.
1734 rpc::PayloadsView payloads =
1735 context_.output().payloads<Transfer::Read>(context_.channel().id());
1736 ASSERT_EQ(payloads.size(), 1u);
1737 EXPECT_EQ(transfer_status, Status::Unknown());
1738
1739 Chunk chunk = DecodeChunk(payloads[0]);
1740 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1741 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1742 EXPECT_EQ(chunk.session_id(), 3u);
1743 EXPECT_EQ(chunk.resource_id(), 3u);
1744 EXPECT_EQ(chunk.offset(), 0u);
1745 EXPECT_EQ(chunk.window_end_offset(), 64u);
1746 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1747
1748 // Instead of a START_ACK to continue the handshake, the server responds with
1749 // an immediate data chunk, indicating that it is running the legacy protocol
1750 // version. Client should revert to legacy, using the resource_id of 3 as the
1751 // session_id, and complete the transfer.
1752 context_.server().SendServerStream<Transfer::Read>(
1753 EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
1754 .set_session_id(3)
1755 .set_offset(0)
1756 .set_payload(kData32)
1757 .set_remaining_bytes(0)));
1758 transfer_thread_.WaitUntilEventIsProcessed();
1759
1760 ASSERT_EQ(payloads.size(), 2u);
1761
1762 chunk = DecodeChunk(payloads.back());
1763 EXPECT_EQ(chunk.session_id(), 3u);
1764 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1765 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
1766 ASSERT_TRUE(chunk.status().has_value());
1767 EXPECT_EQ(chunk.status().value(), OkStatus());
1768
1769 EXPECT_EQ(transfer_status, OkStatus());
1770 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1771 0);
1772 }
1773
TEST_F(ReadTransfer,Version2_TimeoutDuringHandshake)1774 TEST_F(ReadTransfer, Version2_TimeoutDuringHandshake) {
1775 stream::MemoryWriterBuffer<64> writer;
1776 Status transfer_status = Status::Unknown();
1777
1778 ASSERT_EQ(OkStatus(),
1779 client_.Read(
1780 3,
1781 writer,
1782 [&transfer_status](Status status) { transfer_status = status; },
1783 cfg::kDefaultChunkTimeout,
1784 ProtocolVersion::kVersionTwo));
1785
1786 transfer_thread_.WaitUntilEventIsProcessed();
1787
1788 // Initial chunk of the transfer is sent. This chunk should contain all the
1789 // fields from both legacy and version 2 protocols for backwards
1790 // compatibility.
1791 rpc::PayloadsView payloads =
1792 context_.output().payloads<Transfer::Read>(context_.channel().id());
1793 ASSERT_EQ(payloads.size(), 1u);
1794 EXPECT_EQ(transfer_status, Status::Unknown());
1795
1796 Chunk chunk = DecodeChunk(payloads.back());
1797 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1798 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1799 EXPECT_EQ(chunk.session_id(), 3u);
1800 EXPECT_EQ(chunk.resource_id(), 3u);
1801 EXPECT_EQ(chunk.offset(), 0u);
1802 EXPECT_EQ(chunk.window_end_offset(), 64u);
1803 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1804
1805 // Wait for the timeout to expire without doing anything. The client should
1806 // resend the initial chunk.
1807 transfer_thread_.SimulateClientTimeout(3);
1808 ASSERT_EQ(payloads.size(), 2u);
1809
1810 chunk = DecodeChunk(payloads.back());
1811 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1812 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1813 EXPECT_EQ(chunk.session_id(), 3u);
1814 EXPECT_EQ(chunk.resource_id(), 3u);
1815 EXPECT_EQ(chunk.offset(), 0u);
1816 EXPECT_EQ(chunk.window_end_offset(), 64u);
1817 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1818
1819 // This time, the server responds, continuing the handshake and transfer.
1820 context_.server().SendServerStream<Transfer::Read>(
1821 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1822 .set_session_id(31)
1823 .set_resource_id(3)));
1824 transfer_thread_.WaitUntilEventIsProcessed();
1825
1826 ASSERT_EQ(payloads.size(), 3u);
1827
1828 chunk = DecodeChunk(payloads.back());
1829 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1830 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1831 EXPECT_EQ(chunk.session_id(), 31u);
1832 EXPECT_FALSE(chunk.resource_id().has_value());
1833 EXPECT_EQ(chunk.offset(), 0u);
1834 EXPECT_EQ(chunk.window_end_offset(), 64u);
1835 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1836
1837 context_.server().SendServerStream<Transfer::Read>(
1838 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1839 .set_session_id(31)
1840 .set_offset(0)
1841 .set_payload(kData32)
1842 .set_remaining_bytes(0)));
1843 transfer_thread_.WaitUntilEventIsProcessed();
1844
1845 ASSERT_EQ(payloads.size(), 4u);
1846
1847 chunk = DecodeChunk(payloads.back());
1848 EXPECT_EQ(chunk.session_id(), 31u);
1849 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1850 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1851 ASSERT_TRUE(chunk.status().has_value());
1852 EXPECT_EQ(chunk.status().value(), OkStatus());
1853
1854 EXPECT_EQ(transfer_status, OkStatus());
1855 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1856 0);
1857
1858 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1859 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1860 .set_session_id(31)));
1861 }
1862
TEST_F(ReadTransfer,Version2_TimeoutAfterHandshake)1863 TEST_F(ReadTransfer, Version2_TimeoutAfterHandshake) {
1864 stream::MemoryWriterBuffer<64> writer;
1865 Status transfer_status = Status::Unknown();
1866
1867 ASSERT_EQ(OkStatus(),
1868 client_.Read(
1869 3,
1870 writer,
1871 [&transfer_status](Status status) { transfer_status = status; },
1872 cfg::kDefaultChunkTimeout,
1873 ProtocolVersion::kVersionTwo));
1874
1875 transfer_thread_.WaitUntilEventIsProcessed();
1876
1877 // Initial chunk of the transfer is sent. This chunk should contain all the
1878 // fields from both legacy and version 2 protocols for backwards
1879 // compatibility.
1880 rpc::PayloadsView payloads =
1881 context_.output().payloads<Transfer::Read>(context_.channel().id());
1882 ASSERT_EQ(payloads.size(), 1u);
1883 EXPECT_EQ(transfer_status, Status::Unknown());
1884
1885 Chunk chunk = DecodeChunk(payloads.back());
1886 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1887 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1888 EXPECT_EQ(chunk.session_id(), 3u);
1889 EXPECT_EQ(chunk.resource_id(), 3u);
1890 EXPECT_EQ(chunk.offset(), 0u);
1891 EXPECT_EQ(chunk.window_end_offset(), 64u);
1892 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1893
1894 // The server responds with a START_ACK, continuing the version 2 handshake
1895 // and assigning a session_id to the transfer.
1896 context_.server().SendServerStream<Transfer::Read>(
1897 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
1898 .set_session_id(33)
1899 .set_resource_id(3)));
1900 transfer_thread_.WaitUntilEventIsProcessed();
1901
1902 ASSERT_EQ(payloads.size(), 2u);
1903
1904 // Client should accept the session_id with a START_ACK_CONFIRMATION,
1905 // additionally containing the initial parameters for the read transfer.
1906 chunk = DecodeChunk(payloads.back());
1907 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1908 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1909 EXPECT_EQ(chunk.session_id(), 33u);
1910 EXPECT_FALSE(chunk.resource_id().has_value());
1911 EXPECT_EQ(chunk.offset(), 0u);
1912 EXPECT_EQ(chunk.window_end_offset(), 64u);
1913 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1914
1915 // Wait for the timeout to expire without doing anything. The client should
1916 // resend the confirmation chunk.
1917 transfer_thread_.SimulateClientTimeout(33);
1918 ASSERT_EQ(payloads.size(), 3u);
1919
1920 chunk = DecodeChunk(payloads.back());
1921 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
1922 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1923 EXPECT_EQ(chunk.session_id(), 33u);
1924 EXPECT_FALSE(chunk.resource_id().has_value());
1925 EXPECT_EQ(chunk.offset(), 0u);
1926 EXPECT_EQ(chunk.window_end_offset(), 64u);
1927 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1928
1929 // The server responds and the transfer should continue normally.
1930 context_.server().SendServerStream<Transfer::Read>(
1931 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
1932 .set_session_id(33)
1933 .set_offset(0)
1934 .set_payload(kData32)
1935 .set_remaining_bytes(0)));
1936 transfer_thread_.WaitUntilEventIsProcessed();
1937
1938 ASSERT_EQ(payloads.size(), 4u);
1939
1940 chunk = DecodeChunk(payloads.back());
1941 EXPECT_EQ(chunk.session_id(), 33u);
1942 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
1943 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1944 ASSERT_TRUE(chunk.status().has_value());
1945 EXPECT_EQ(chunk.status().value(), OkStatus());
1946
1947 EXPECT_EQ(transfer_status, OkStatus());
1948 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
1949 0);
1950
1951 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
1952 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
1953 .set_session_id(33)));
1954 }
1955
TEST_F(ReadTransfer,Version2_ServerErrorDuringHandshake)1956 TEST_F(ReadTransfer, Version2_ServerErrorDuringHandshake) {
1957 stream::MemoryWriterBuffer<64> writer;
1958 Status transfer_status = Status::Unknown();
1959
1960 ASSERT_EQ(OkStatus(),
1961 client_.Read(
1962 3,
1963 writer,
1964 [&transfer_status](Status status) { transfer_status = status; },
1965 cfg::kDefaultChunkTimeout,
1966 ProtocolVersion::kVersionTwo));
1967
1968 transfer_thread_.WaitUntilEventIsProcessed();
1969
1970 // Initial chunk of the transfer is sent. This chunk should contain all the
1971 // fields from both legacy and version 2 protocols for backwards
1972 // compatibility.
1973 rpc::PayloadsView payloads =
1974 context_.output().payloads<Transfer::Read>(context_.channel().id());
1975 ASSERT_EQ(payloads.size(), 1u);
1976 EXPECT_EQ(transfer_status, Status::Unknown());
1977
1978 Chunk chunk = DecodeChunk(payloads.back());
1979 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
1980 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
1981 EXPECT_EQ(chunk.session_id(), 3u);
1982 EXPECT_EQ(chunk.resource_id(), 3u);
1983 EXPECT_EQ(chunk.offset(), 0u);
1984 EXPECT_EQ(chunk.window_end_offset(), 64u);
1985 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
1986
1987 // The server responds to the start request with an error.
1988 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(Chunk::Final(
1989 ProtocolVersion::kVersionTwo, 3, Status::Unauthenticated())));
1990 transfer_thread_.WaitUntilEventIsProcessed();
1991
1992 EXPECT_EQ(payloads.size(), 1u);
1993 EXPECT_EQ(transfer_status, Status::Unauthenticated());
1994 }
1995
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckRetries)1996 TEST_F(ReadTransfer, Version2_TimeoutWaitingForCompletionAckRetries) {
1997 stream::MemoryWriterBuffer<64> writer;
1998 Status transfer_status = Status::Unknown();
1999
2000 ASSERT_EQ(OkStatus(),
2001 client_.Read(
2002 3,
2003 writer,
2004 [&transfer_status](Status status) { transfer_status = status; },
2005 cfg::kDefaultChunkTimeout,
2006 ProtocolVersion::kVersionTwo));
2007
2008 transfer_thread_.WaitUntilEventIsProcessed();
2009
2010 // Initial chunk of the transfer is sent. This chunk should contain all the
2011 // fields from both legacy and version 2 protocols for backwards
2012 // compatibility.
2013 rpc::PayloadsView payloads =
2014 context_.output().payloads<Transfer::Read>(context_.channel().id());
2015 ASSERT_EQ(payloads.size(), 1u);
2016 EXPECT_EQ(transfer_status, Status::Unknown());
2017
2018 Chunk chunk = DecodeChunk(payloads[0]);
2019 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2020 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2021 EXPECT_EQ(chunk.session_id(), 3u);
2022 EXPECT_EQ(chunk.resource_id(), 3u);
2023 EXPECT_EQ(chunk.offset(), 0u);
2024 EXPECT_EQ(chunk.window_end_offset(), 64u);
2025 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2026
2027 // The server responds with a START_ACK, continuing the version 2 handshake
2028 // and assigning a session_id to the transfer.
2029 context_.server().SendServerStream<Transfer::Read>(
2030 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2031 .set_session_id(29)
2032 .set_resource_id(3)));
2033 transfer_thread_.WaitUntilEventIsProcessed();
2034
2035 ASSERT_EQ(payloads.size(), 2u);
2036
2037 // Client should accept the session_id with a START_ACK_CONFIRMATION,
2038 // additionally containing the initial parameters for the read transfer.
2039 chunk = DecodeChunk(payloads.back());
2040 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2041 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2042 EXPECT_EQ(chunk.session_id(), 29u);
2043 EXPECT_FALSE(chunk.resource_id().has_value());
2044 EXPECT_EQ(chunk.offset(), 0u);
2045 EXPECT_EQ(chunk.window_end_offset(), 64u);
2046 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2047
2048 // Send all the transfer data. Client should accept it and complete the
2049 // transfer.
2050 context_.server().SendServerStream<Transfer::Read>(
2051 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2052 .set_session_id(29)
2053 .set_offset(0)
2054 .set_payload(kData32)
2055 .set_remaining_bytes(0)));
2056 transfer_thread_.WaitUntilEventIsProcessed();
2057
2058 ASSERT_EQ(payloads.size(), 3u);
2059
2060 chunk = DecodeChunk(payloads.back());
2061 EXPECT_EQ(chunk.session_id(), 29u);
2062 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2063 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2064 ASSERT_TRUE(chunk.status().has_value());
2065 EXPECT_EQ(chunk.status().value(), OkStatus());
2066
2067 EXPECT_EQ(transfer_status, OkStatus());
2068 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2069 0);
2070
2071 // Time out instead of sending a completion ACK. THe transfer should resend
2072 // its completion chunk.
2073 transfer_thread_.SimulateClientTimeout(29);
2074 ASSERT_EQ(payloads.size(), 4u);
2075
2076 // Reset transfer_status to check whether the handler is called again.
2077 transfer_status = Status::Unknown();
2078
2079 chunk = DecodeChunk(payloads.back());
2080 EXPECT_EQ(chunk.session_id(), 29u);
2081 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2082 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2083 ASSERT_TRUE(chunk.status().has_value());
2084 EXPECT_EQ(chunk.status().value(), OkStatus());
2085
2086 // Transfer handler should not be called a second time in response to the
2087 // re-sent completion chunk.
2088 EXPECT_EQ(transfer_status, Status::Unknown());
2089
2090 // Send a completion ACK to end the transfer.
2091 context_.server().SendServerStream<Transfer::Read>(EncodeChunk(
2092 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
2093 .set_session_id(29)));
2094 transfer_thread_.WaitUntilEventIsProcessed();
2095
2096 // No further chunks should be sent following the ACK.
2097 transfer_thread_.SimulateClientTimeout(29);
2098 ASSERT_EQ(payloads.size(), 4u);
2099 }
2100
TEST_F(ReadTransfer,Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries)2101 TEST_F(ReadTransfer,
2102 Version2_TimeoutWaitingForCompletionAckEndsTransferAfterRetries) {
2103 stream::MemoryWriterBuffer<64> writer;
2104 Status transfer_status = Status::Unknown();
2105
2106 ASSERT_EQ(OkStatus(),
2107 client_.Read(
2108 3,
2109 writer,
2110 [&transfer_status](Status status) { transfer_status = status; },
2111 cfg::kDefaultChunkTimeout,
2112 ProtocolVersion::kVersionTwo));
2113
2114 transfer_thread_.WaitUntilEventIsProcessed();
2115
2116 // Initial chunk of the transfer is sent. This chunk should contain all the
2117 // fields from both legacy and version 2 protocols for backwards
2118 // compatibility.
2119 rpc::PayloadsView payloads =
2120 context_.output().payloads<Transfer::Read>(context_.channel().id());
2121 ASSERT_EQ(payloads.size(), 1u);
2122 EXPECT_EQ(transfer_status, Status::Unknown());
2123
2124 Chunk chunk = DecodeChunk(payloads[0]);
2125 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2126 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2127 EXPECT_EQ(chunk.session_id(), 3u);
2128 EXPECT_EQ(chunk.resource_id(), 3u);
2129 EXPECT_EQ(chunk.offset(), 0u);
2130 EXPECT_EQ(chunk.window_end_offset(), 64u);
2131 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2132
2133 // The server responds with a START_ACK, continuing the version 2 handshake
2134 // and assigning a session_id to the transfer.
2135 context_.server().SendServerStream<Transfer::Read>(
2136 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2137 .set_session_id(29)
2138 .set_resource_id(3)));
2139 transfer_thread_.WaitUntilEventIsProcessed();
2140
2141 ASSERT_EQ(payloads.size(), 2u);
2142
2143 // Client should accept the session_id with a START_ACK_CONFIRMATION,
2144 // additionally containing the initial parameters for the read transfer.
2145 chunk = DecodeChunk(payloads.back());
2146 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2147 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2148 EXPECT_EQ(chunk.session_id(), 29u);
2149 EXPECT_FALSE(chunk.resource_id().has_value());
2150 EXPECT_EQ(chunk.offset(), 0u);
2151 EXPECT_EQ(chunk.window_end_offset(), 64u);
2152 EXPECT_EQ(chunk.max_chunk_size_bytes(), 37u);
2153
2154 // Send all the transfer data. Client should accept it and complete the
2155 // transfer.
2156 context_.server().SendServerStream<Transfer::Read>(
2157 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
2158 .set_session_id(29)
2159 .set_offset(0)
2160 .set_payload(kData32)
2161 .set_remaining_bytes(0)));
2162 transfer_thread_.WaitUntilEventIsProcessed();
2163
2164 ASSERT_EQ(payloads.size(), 3u);
2165
2166 chunk = DecodeChunk(payloads.back());
2167 EXPECT_EQ(chunk.session_id(), 29u);
2168 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
2169 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2170 ASSERT_TRUE(chunk.status().has_value());
2171 EXPECT_EQ(chunk.status().value(), OkStatus());
2172
2173 EXPECT_EQ(transfer_status, OkStatus());
2174 EXPECT_EQ(std::memcmp(writer.data(), kData32.data(), writer.bytes_written()),
2175 0);
2176
2177 // Time out instead of sending a completion ACK. THe transfer should resend
2178 // its completion chunk at first, then terminate after the maximum number of
2179 // retries.
2180 transfer_thread_.SimulateClientTimeout(29);
2181 ASSERT_EQ(payloads.size(), 4u); // Retry 1.
2182
2183 transfer_thread_.SimulateClientTimeout(29);
2184 ASSERT_EQ(payloads.size(), 5u); // Retry 2.
2185
2186 transfer_thread_.SimulateClientTimeout(29);
2187 ASSERT_EQ(payloads.size(), 6u); // Retry 3.
2188
2189 transfer_thread_.SimulateClientTimeout(29);
2190 ASSERT_EQ(payloads.size(), 6u); // No more retries; transfer has ended.
2191 }
2192
TEST_F(WriteTransfer,Version2_SingleChunk)2193 TEST_F(WriteTransfer, Version2_SingleChunk) {
2194 stream::MemoryReader reader(kData32);
2195 Status transfer_status = Status::Unknown();
2196
2197 ASSERT_EQ(OkStatus(),
2198 client_.Write(
2199 3,
2200 reader,
2201 [&transfer_status](Status status) { transfer_status = status; },
2202 cfg::kDefaultChunkTimeout,
2203 ProtocolVersion::kVersionTwo));
2204 transfer_thread_.WaitUntilEventIsProcessed();
2205
2206 // The client begins by sending the ID of the resource to transfer.
2207 rpc::PayloadsView payloads =
2208 context_.output().payloads<Transfer::Write>(context_.channel().id());
2209 ASSERT_EQ(payloads.size(), 1u);
2210 EXPECT_EQ(transfer_status, Status::Unknown());
2211
2212 Chunk chunk = DecodeChunk(payloads.back());
2213 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2214 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2215 EXPECT_EQ(chunk.session_id(), 3u);
2216 EXPECT_EQ(chunk.resource_id(), 3u);
2217
2218 // The server responds with a START_ACK, continuing the version 2 handshake
2219 // and assigning a session_id to the transfer.
2220 context_.server().SendServerStream<Transfer::Write>(
2221 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2222 .set_session_id(29)
2223 .set_resource_id(3)));
2224 transfer_thread_.WaitUntilEventIsProcessed();
2225
2226 ASSERT_EQ(payloads.size(), 2u);
2227
2228 // Client should accept the session_id with a START_ACK_CONFIRMATION.
2229 chunk = DecodeChunk(payloads.back());
2230 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2231 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2232 EXPECT_EQ(chunk.session_id(), 29u);
2233 EXPECT_FALSE(chunk.resource_id().has_value());
2234
2235 // The server can then begin the data transfer by sending its transfer
2236 // parameters. Client should respond with a data chunk and the final chunk.
2237 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2238 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2239 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2240 .set_session_id(29)
2241 .set_offset(0)
2242 .set_window_end_offset(64)
2243 .set_max_chunk_size_bytes(32)));
2244 });
2245
2246 ASSERT_EQ(payloads.size(), 4u);
2247
2248 chunk = DecodeChunk(payloads[2]);
2249 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2250 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2251 EXPECT_EQ(chunk.session_id(), 29u);
2252 EXPECT_EQ(chunk.offset(), 0u);
2253 EXPECT_TRUE(chunk.has_payload());
2254 EXPECT_EQ(std::memcmp(
2255 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2256 0);
2257
2258 chunk = DecodeChunk(payloads[3]);
2259 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2260 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2261 EXPECT_EQ(chunk.session_id(), 29u);
2262 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2263 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2264
2265 EXPECT_EQ(transfer_status, Status::Unknown());
2266
2267 // Send the final status chunk to complete the transfer.
2268 context_.server().SendServerStream<Transfer::Write>(
2269 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 29, OkStatus())));
2270 transfer_thread_.WaitUntilEventIsProcessed();
2271
2272 // Client should acknowledge the completion of the transfer.
2273 EXPECT_EQ(payloads.size(), 5u);
2274
2275 chunk = DecodeChunk(payloads[4]);
2276 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2277 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2278 EXPECT_EQ(chunk.session_id(), 29u);
2279
2280 EXPECT_EQ(transfer_status, OkStatus());
2281 }
2282
TEST_F(WriteTransfer,Version2_ServerRunsLegacy)2283 TEST_F(WriteTransfer, Version2_ServerRunsLegacy) {
2284 stream::MemoryReader reader(kData32);
2285 Status transfer_status = Status::Unknown();
2286
2287 ASSERT_EQ(OkStatus(),
2288 client_.Write(
2289 3,
2290 reader,
2291 [&transfer_status](Status status) { transfer_status = status; },
2292 cfg::kDefaultChunkTimeout,
2293 ProtocolVersion::kVersionTwo));
2294 transfer_thread_.WaitUntilEventIsProcessed();
2295
2296 // The client begins by sending the ID of the resource to transfer.
2297 rpc::PayloadsView payloads =
2298 context_.output().payloads<Transfer::Write>(context_.channel().id());
2299 ASSERT_EQ(payloads.size(), 1u);
2300 EXPECT_EQ(transfer_status, Status::Unknown());
2301
2302 Chunk chunk = DecodeChunk(payloads.back());
2303 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2304 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2305 EXPECT_EQ(chunk.session_id(), 3u);
2306 EXPECT_EQ(chunk.resource_id(), 3u);
2307
2308 // Instead of continuing the handshake with a START_ACK, the server
2309 // immediately sends parameters, indicating that it only supports the legacy
2310 // protocol. Client should switch over to legacy and continue the transfer.
2311 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2312 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2313 Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
2314 .set_session_id(3)
2315 .set_offset(0)
2316 .set_window_end_offset(64)
2317 .set_max_chunk_size_bytes(32)));
2318 });
2319
2320 ASSERT_EQ(payloads.size(), 3u);
2321
2322 chunk = DecodeChunk(payloads[1]);
2323 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2324 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2325 EXPECT_EQ(chunk.session_id(), 3u);
2326 EXPECT_EQ(chunk.offset(), 0u);
2327 EXPECT_TRUE(chunk.has_payload());
2328 EXPECT_EQ(std::memcmp(
2329 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2330 0);
2331
2332 chunk = DecodeChunk(payloads[2]);
2333 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2334 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kLegacy);
2335 EXPECT_EQ(chunk.session_id(), 3u);
2336 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2337 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2338
2339 EXPECT_EQ(transfer_status, Status::Unknown());
2340
2341 // Send the final status chunk to complete the transfer.
2342 context_.server().SendServerStream<Transfer::Write>(
2343 EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
2344 transfer_thread_.WaitUntilEventIsProcessed();
2345
2346 EXPECT_EQ(payloads.size(), 3u);
2347 EXPECT_EQ(transfer_status, OkStatus());
2348 }
2349
TEST_F(WriteTransfer,Version2_RetryDuringHandshake)2350 TEST_F(WriteTransfer, Version2_RetryDuringHandshake) {
2351 stream::MemoryReader reader(kData32);
2352 Status transfer_status = Status::Unknown();
2353
2354 ASSERT_EQ(OkStatus(),
2355 client_.Write(
2356 3,
2357 reader,
2358 [&transfer_status](Status status) { transfer_status = status; },
2359 cfg::kDefaultChunkTimeout,
2360 ProtocolVersion::kVersionTwo));
2361 transfer_thread_.WaitUntilEventIsProcessed();
2362
2363 // The client begins by sending the ID of the resource to transfer.
2364 rpc::PayloadsView payloads =
2365 context_.output().payloads<Transfer::Write>(context_.channel().id());
2366 ASSERT_EQ(payloads.size(), 1u);
2367 EXPECT_EQ(transfer_status, Status::Unknown());
2368
2369 Chunk chunk = DecodeChunk(payloads.back());
2370 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2371 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2372 EXPECT_EQ(chunk.session_id(), 3u);
2373 EXPECT_EQ(chunk.resource_id(), 3u);
2374
2375 // Time out waiting for a server response. The client should resend the
2376 // initial packet.
2377 transfer_thread_.SimulateClientTimeout(3);
2378 ASSERT_EQ(payloads.size(), 2u);
2379
2380 chunk = DecodeChunk(payloads.back());
2381 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2382 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2383 EXPECT_EQ(chunk.session_id(), 3u);
2384 EXPECT_EQ(chunk.resource_id(), 3u);
2385
2386 // This time, respond with the correct continuation packet. The transfer
2387 // should resume and complete normally.
2388 context_.server().SendServerStream<Transfer::Write>(
2389 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2390 .set_session_id(31)
2391 .set_resource_id(3)));
2392 transfer_thread_.WaitUntilEventIsProcessed();
2393
2394 ASSERT_EQ(payloads.size(), 3u);
2395
2396 chunk = DecodeChunk(payloads.back());
2397 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2398 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2399 EXPECT_EQ(chunk.session_id(), 31u);
2400 EXPECT_FALSE(chunk.resource_id().has_value());
2401
2402 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2403 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2404 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2405 .set_session_id(31)
2406 .set_offset(0)
2407 .set_window_end_offset(64)
2408 .set_max_chunk_size_bytes(32)));
2409 });
2410
2411 ASSERT_EQ(payloads.size(), 5u);
2412
2413 chunk = DecodeChunk(payloads[3]);
2414 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2415 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2416 EXPECT_EQ(chunk.session_id(), 31u);
2417 EXPECT_EQ(chunk.offset(), 0u);
2418 EXPECT_TRUE(chunk.has_payload());
2419 EXPECT_EQ(std::memcmp(
2420 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2421 0);
2422
2423 chunk = DecodeChunk(payloads[4]);
2424 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2425 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2426 EXPECT_EQ(chunk.session_id(), 31u);
2427 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2428 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2429
2430 EXPECT_EQ(transfer_status, Status::Unknown());
2431
2432 context_.server().SendServerStream<Transfer::Write>(
2433 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 31, OkStatus())));
2434 transfer_thread_.WaitUntilEventIsProcessed();
2435
2436 // Client should acknowledge the completion of the transfer.
2437 EXPECT_EQ(payloads.size(), 6u);
2438
2439 chunk = DecodeChunk(payloads[5]);
2440 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2441 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2442 EXPECT_EQ(chunk.session_id(), 31u);
2443
2444 EXPECT_EQ(transfer_status, OkStatus());
2445 }
2446
TEST_F(WriteTransfer,Version2_RetryAfterHandshake)2447 TEST_F(WriteTransfer, Version2_RetryAfterHandshake) {
2448 stream::MemoryReader reader(kData32);
2449 Status transfer_status = Status::Unknown();
2450
2451 ASSERT_EQ(OkStatus(),
2452 client_.Write(
2453 3,
2454 reader,
2455 [&transfer_status](Status status) { transfer_status = status; },
2456 cfg::kDefaultChunkTimeout,
2457 ProtocolVersion::kVersionTwo));
2458 transfer_thread_.WaitUntilEventIsProcessed();
2459
2460 // The client begins by sending the ID of the resource to transfer.
2461 rpc::PayloadsView payloads =
2462 context_.output().payloads<Transfer::Write>(context_.channel().id());
2463 ASSERT_EQ(payloads.size(), 1u);
2464 EXPECT_EQ(transfer_status, Status::Unknown());
2465
2466 Chunk chunk = DecodeChunk(payloads.back());
2467 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2468 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2469 EXPECT_EQ(chunk.session_id(), 3u);
2470 EXPECT_EQ(chunk.resource_id(), 3u);
2471
2472 // The server responds with a START_ACK, continuing the version 2 handshake
2473 // and assigning a session_id to the transfer.
2474 context_.server().SendServerStream<Transfer::Write>(
2475 EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAck)
2476 .set_session_id(33)
2477 .set_resource_id(3)));
2478 transfer_thread_.WaitUntilEventIsProcessed();
2479
2480 ASSERT_EQ(payloads.size(), 2u);
2481
2482 // Client should accept the session_id with a START_ACK_CONFIRMATION.
2483 chunk = DecodeChunk(payloads.back());
2484 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2485 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2486 EXPECT_EQ(chunk.session_id(), 33u);
2487 EXPECT_FALSE(chunk.resource_id().has_value());
2488
2489 // Time out waiting for a server response. The client should resend the
2490 // initial packet.
2491 transfer_thread_.SimulateClientTimeout(33);
2492 ASSERT_EQ(payloads.size(), 3u);
2493
2494 chunk = DecodeChunk(payloads.back());
2495 EXPECT_EQ(chunk.type(), Chunk::Type::kStartAckConfirmation);
2496 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2497 EXPECT_EQ(chunk.session_id(), 33u);
2498 EXPECT_FALSE(chunk.resource_id().has_value());
2499
2500 // This time, respond with the first transfer parameters chunk. The transfer
2501 // should resume and complete normally.
2502 rpc::test::WaitForPackets(context_.output(), 2, [this] {
2503 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2504 Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kParametersRetransmit)
2505 .set_session_id(33)
2506 .set_offset(0)
2507 .set_window_end_offset(64)
2508 .set_max_chunk_size_bytes(32)));
2509 });
2510
2511 ASSERT_EQ(payloads.size(), 5u);
2512
2513 chunk = DecodeChunk(payloads[3]);
2514 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2515 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2516 EXPECT_EQ(chunk.session_id(), 33u);
2517 EXPECT_EQ(chunk.offset(), 0u);
2518 EXPECT_TRUE(chunk.has_payload());
2519 EXPECT_EQ(std::memcmp(
2520 chunk.payload().data(), kData32.data(), chunk.payload().size()),
2521 0);
2522
2523 chunk = DecodeChunk(payloads[4]);
2524 EXPECT_EQ(chunk.type(), Chunk::Type::kData);
2525 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2526 EXPECT_EQ(chunk.session_id(), 33u);
2527 ASSERT_TRUE(chunk.remaining_bytes().has_value());
2528 EXPECT_EQ(chunk.remaining_bytes().value(), 0u);
2529
2530 EXPECT_EQ(transfer_status, Status::Unknown());
2531
2532 context_.server().SendServerStream<Transfer::Write>(
2533 EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 33, OkStatus())));
2534 transfer_thread_.WaitUntilEventIsProcessed();
2535
2536 // Client should acknowledge the completion of the transfer.
2537 EXPECT_EQ(payloads.size(), 6u);
2538
2539 chunk = DecodeChunk(payloads[5]);
2540 EXPECT_EQ(chunk.type(), Chunk::Type::kCompletionAck);
2541 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2542 EXPECT_EQ(chunk.session_id(), 33u);
2543
2544 EXPECT_EQ(transfer_status, OkStatus());
2545 }
2546
TEST_F(WriteTransfer,Version2_ServerErrorDuringHandshake)2547 TEST_F(WriteTransfer, Version2_ServerErrorDuringHandshake) {
2548 stream::MemoryReader reader(kData32);
2549 Status transfer_status = Status::Unknown();
2550
2551 ASSERT_EQ(OkStatus(),
2552 client_.Write(
2553 3,
2554 reader,
2555 [&transfer_status](Status status) { transfer_status = status; },
2556 cfg::kDefaultChunkTimeout,
2557 ProtocolVersion::kVersionTwo));
2558 transfer_thread_.WaitUntilEventIsProcessed();
2559
2560 // The client begins by sending the ID of the resource to transfer.
2561 rpc::PayloadsView payloads =
2562 context_.output().payloads<Transfer::Write>(context_.channel().id());
2563 ASSERT_EQ(payloads.size(), 1u);
2564 EXPECT_EQ(transfer_status, Status::Unknown());
2565
2566 Chunk chunk = DecodeChunk(payloads.back());
2567 EXPECT_EQ(chunk.type(), Chunk::Type::kStart);
2568 EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
2569 EXPECT_EQ(chunk.session_id(), 3u);
2570 EXPECT_EQ(chunk.resource_id(), 3u);
2571
2572 // The server responds to the start request with an error.
2573 context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
2574 Chunk::Final(ProtocolVersion::kVersionTwo, 3, Status::NotFound())));
2575 transfer_thread_.WaitUntilEventIsProcessed();
2576
2577 EXPECT_EQ(payloads.size(), 1u);
2578 EXPECT_EQ(transfer_status, Status::NotFound());
2579 }
2580
2581 } // namespace
2582 } // namespace pw::transfer::test
2583