1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_transfer/transfer.h"
16
17 #include "gtest/gtest.h"
18 #include "pw_assert/check.h"
19 #include "pw_bytes/array.h"
20 #include "pw_containers/algorithm.h"
21 #include "pw_rpc/raw/test_method_context.h"
22 #include "pw_rpc/test_helpers.h"
23 #include "pw_thread/thread.h"
24 #include "pw_thread_stl/options.h"
25 #include "pw_transfer/transfer.pwpb.h"
26 #include "pw_transfer_private/chunk_testing.h"
27
28 namespace pw::transfer::test {
29 namespace {
30
31 using namespace std::chrono_literals;
32
33 // TODO(frolv): Have a generic way to obtain a thread for testing on any system.
TransferThreadOptions()34 thread::Options& TransferThreadOptions() {
35 static thread::stl::Options options;
36 return options;
37 }
38
39 using internal::Chunk;
40
41 class TestMemoryReader : public stream::SeekableReader {
42 public:
TestMemoryReader(span<const std::byte> data)43 constexpr TestMemoryReader(span<const std::byte> data)
44 : memory_reader_(data) {}
45
DoSeek(ptrdiff_t offset,Whence origin)46 Status DoSeek(ptrdiff_t offset, Whence origin) override {
47 if (seek_status.ok()) {
48 return memory_reader_.Seek(offset, origin);
49 }
50 return seek_status;
51 }
52
DoRead(ByteSpan dest)53 StatusWithSize DoRead(ByteSpan dest) final {
54 if (!read_status.ok()) {
55 return StatusWithSize(read_status, 0);
56 }
57
58 auto result = memory_reader_.Read(dest);
59 return result.ok() ? StatusWithSize(result->size())
60 : StatusWithSize(result.status(), 0);
61 }
62
63 Status seek_status;
64 Status read_status;
65
66 private:
67 stream::MemoryReader memory_reader_;
68 };
69
70 class SimpleReadTransfer final : public ReadOnlyHandler {
71 public:
SimpleReadTransfer(uint32_t session_id,ConstByteSpan data)72 SimpleReadTransfer(uint32_t session_id, ConstByteSpan data)
73 : ReadOnlyHandler(session_id),
74 prepare_read_called(false),
75 finalize_read_called(false),
76 finalize_read_status(Status::Unknown()),
77 reader_(data) {}
78
PrepareRead()79 Status PrepareRead() final {
80 prepare_read_called = true;
81
82 if (!prepare_read_return_status.ok()) {
83 return prepare_read_return_status;
84 }
85
86 EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
87 set_reader(reader_);
88 return OkStatus();
89 }
90
FinalizeRead(Status status)91 void FinalizeRead(Status status) final {
92 finalize_read_called = true;
93 finalize_read_status = status;
94 }
95
set_seek_status(Status status)96 void set_seek_status(Status status) { reader_.seek_status = status; }
set_read_status(Status status)97 void set_read_status(Status status) { reader_.read_status = status; }
98
99 bool prepare_read_called;
100 bool finalize_read_called;
101 Status prepare_read_return_status;
102 Status finalize_read_status;
103
104 private:
105 TestMemoryReader reader_;
106 };
107
__anon88b3d5bc0202(size_t i) 108 constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
109
110 class ReadTransfer : public ::testing::Test {
111 protected:
ReadTransfer(size_t max_chunk_size_bytes=64)112 ReadTransfer(size_t max_chunk_size_bytes = 64)
113 : handler_(3, kData),
114 transfer_thread_(span(data_buffer_).first(max_chunk_size_bytes),
115 encode_buffer_),
116 ctx_(transfer_thread_, 64),
117 system_thread_(TransferThreadOptions(), transfer_thread_) {
118 ctx_.service().RegisterHandler(handler_);
119
120 PW_CHECK(!handler_.prepare_read_called);
121 PW_CHECK(!handler_.finalize_read_called);
122
123 ctx_.call(); // Open the read stream
124 transfer_thread_.WaitUntilEventIsProcessed();
125 }
126
~ReadTransfer()127 ~ReadTransfer() override {
128 transfer_thread_.Terminate();
129 system_thread_.join();
130 }
131
132 SimpleReadTransfer handler_;
133 Thread<1, 1> transfer_thread_;
134 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
135 thread::Thread system_thread_;
136 std::array<std::byte, 64> data_buffer_;
137 std::array<std::byte, 64> encode_buffer_;
138 };
139
TEST_F(ReadTransfer, SingleChunk) {
  // Start a transfer whose 64-byte window is larger than kData and wait until
  // two response packets have been sent.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
    start.set_session_id(3).set_window_end_offset(64).set_offset(0);
    ctx_.SendClientStream(EncodeChunk(start));

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[0]);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[1]);

  // The first response carries the entire contents of kData.
  EXPECT_EQ(data_chunk.session_id(), 3u);
  EXPECT_EQ(data_chunk.offset(), 0u);
  ASSERT_EQ(data_chunk.payload().size(), kData.size());
  EXPECT_EQ(std::memcmp(data_chunk.payload().data(),
                        kData.data(),
                        data_chunk.payload().size()),
            0);

  // The second response has no payload and reports remaining_bytes = 0.
  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_FALSE(end_chunk.has_payload());
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  // The client acknowledges completion, which finalizes the handler with OK.
  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
178
TEST_F(ReadTransfer, MultiChunk) {
  // Begin with a window that ends at offset 16.
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(3).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk first_data = DecodeChunk(ctx_.responses().back());

  EXPECT_EQ(first_data.session_id(), 3u);
  EXPECT_EQ(first_data.offset(), 0u);
  ASSERT_EQ(first_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(first_data.payload().data(),
                        kData.data(),
                        first_data.payload().size()),
            0);

  // Move the window end to offset 32 to request the second 16 bytes.
  Chunk continue_to_32(ProtocolVersion::kLegacy,
                       Chunk::Type::kParametersContinue);
  continue_to_32.set_session_id(3).set_window_end_offset(32).set_offset(16);
  ctx_.SendClientStream(EncodeChunk(continue_to_32));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk second_data = DecodeChunk(ctx_.responses().back());

  EXPECT_EQ(second_data.session_id(), 3u);
  EXPECT_EQ(second_data.offset(), 16u);
  ASSERT_EQ(second_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(second_data.payload().data(),
                        kData.data() + 16,
                        second_data.payload().size()),
            0);

  // Continue from offset 32; the response is expected to have no payload and
  // to report remaining_bytes = 0.
  Chunk continue_to_48(ProtocolVersion::kLegacy,
                       Chunk::Type::kParametersContinue);
  continue_to_48.set_session_id(3).set_window_end_offset(48).set_offset(32);
  ctx_.SendClientStream(EncodeChunk(continue_to_48));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[2]);

  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_FALSE(end_chunk.has_payload());
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
239
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(3).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start));

  transfer_thread_.WaitUntilEventIsProcessed();

  // Encode the CONTINUE packet once so the identical bytes can be resent.
  Chunk continue_params(ProtocolVersion::kLegacy,
                        Chunk::Type::kParametersContinue);
  continue_params.set_session_id(3).set_window_end_offset(24).set_offset(16);
  const auto encoded_continue = EncodeChunk(continue_params);
  ctx_.SendClientStream(encoded_continue);

  transfer_thread_.WaitUntilEventIsProcessed();

  // Resend the same CONTINUE packet, which does not advance the window.
  for (int resend = 0; resend < 3; ++resend) {
    ctx_.SendClientStream(encoded_continue);
    transfer_thread_.WaitUntilEventIsProcessed();
  }

  // Only one packet was sent in response to all of the CONTINUE packets.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[1]);

  EXPECT_EQ(data_chunk.session_id(), 3u);
  EXPECT_EQ(data_chunk.offset(), 16u);
  ASSERT_EQ(data_chunk.payload().size(), 8u);
  EXPECT_EQ(std::memcmp(data_chunk.payload().data(),
                        kData.data() + 16,
                        data_chunk.payload().size()),
            0);
}
274
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
  rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
    // Request bytes [0, 16).
    Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
    start.set_session_id(3).set_window_end_offset(16).set_offset(0);
    ctx_.SendClientStream(EncodeChunk(start));

    transfer_thread_.WaitUntilEventIsProcessed();

    Chunk initial_data = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(
        pw::containers::Equal(span(kData).first(16), initial_data.payload()));

    // Jump backwards and ask for bytes [2, 10) again.
    Chunk rewind(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit);
    rewind.set_session_id(3).set_window_end_offset(10).set_offset(2);
    ctx_.SendClientStream(EncodeChunk(rewind));

    transfer_thread_.WaitUntilEventIsProcessed();

    Chunk rewound_data = DecodeChunk(ctx_.responses().back());
    EXPECT_TRUE(pw::containers::Equal(span(kData).subspan(2, 8),
                                      rewound_data.payload()));

    // Jump forwards to offset 17 with a window that extends to offset 64.
    Chunk skip_ahead(ProtocolVersion::kLegacy,
                     Chunk::Type::kParametersRetransmit);
    skip_ahead.set_session_id(3).set_window_end_offset(64).set_offset(17);
    ctx_.SendClientStream(EncodeChunk(skip_ahead));
  });

  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk tail_data = DecodeChunk(ctx_.responses()[2]);
  EXPECT_TRUE(pw::containers::Equal(span(&kData[17], kData.end()),
                                    tail_data.payload()));
}
312
TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
  // Make every seek on the handler's reader report UNIMPLEMENTED.
  handler_.set_seek_status(Status::Unimplemented());

  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(3).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start));

  // Ask for data at a different offset than the one the transfer is at.
  Chunk rewind(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit);
  rewind.set_session_id(3).set_window_end_offset(10).set_offset(2);
  ctx_.SendClientStream(EncodeChunk(rewind));

  transfer_thread_.WaitUntilEventIsProcessed();

  // The last response must carry the UNIMPLEMENTED status.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk error_chunk = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(error_chunk.status().has_value());
  EXPECT_EQ(error_chunk.status().value(), Status::Unimplemented());
}
334
TEST_F(ReadTransfer, MaxChunkSize_Client) {
  // The client limits chunks to 8 bytes, so five responses are awaited: four
  // data chunks and one end-of-data chunk.
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
    start.set_session_id(3)
        .set_window_end_offset(64)
        .set_max_chunk_size_bytes(8)
        .set_offset(0);
    ctx_.SendClientStream(EncodeChunk(start));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);

  // Responses 0 through 3 each hold the next 8 bytes of kData.
  for (size_t index = 0; index < 4; ++index) {
    const size_t expected_offset = index * 8;
    Chunk data_chunk = DecodeChunk(ctx_.responses()[index]);

    EXPECT_EQ(data_chunk.session_id(), 3u);
    EXPECT_EQ(data_chunk.offset(), expected_offset);
    ASSERT_EQ(data_chunk.payload().size(), 8u);
    EXPECT_EQ(std::memcmp(data_chunk.payload().data(),
                          kData.data() + expected_offset,
                          data_chunk.payload().size()),
              0);
  }

  // The fifth response is empty and reports remaining_bytes = 0.
  Chunk end_chunk = DecodeChunk(ctx_.responses()[4]);
  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_EQ(end_chunk.payload().size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
394
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
  // Run a first transfer: start it and immediately send the final chunk.
  Chunk first_start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  first_start.set_session_id(3).set_window_end_offset(64).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(first_start));
  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_TRUE(handler_.finalize_read_called);
  ASSERT_EQ(OkStatus(), handler_.finalize_read_status);

  // Reset the recorded state and make the next PrepareRead() fail.
  handler_.prepare_read_return_status = Status::FailedPrecondition();
  handler_.prepare_read_called = false;
  handler_.finalize_read_called = false;

  // Attempt a second transfer with the same session ID.
  Chunk second_start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  second_start.set_session_id(3).set_window_end_offset(64).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(second_start));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Because preparation failed, the handler should not have been stored in the
  // context, so FinalizeRead() must not have run.
  ASSERT_TRUE(handler_.prepare_read_called);
  ASSERT_FALSE(handler_.finalize_read_called);
}
427
428 class ReadTransferMaxChunkSize8 : public ReadTransfer {
429 protected:
ReadTransferMaxChunkSize8()430 ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
431 };
432
TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
  // Intended scenario: the client asks for max 16-byte chunks, but the service
  // places a limit of 8 bytes. The client-side request is currently disabled
  // (see the commented-out setter below), so only the service limit applies.
  // TODO(frolv): Fix
  rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
    Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
    start.set_session_id(3)
        .set_window_end_offset(64)
        // .set_max_chunk_size_bytes(16)
        .set_offset(0);
    ctx_.SendClientStream(EncodeChunk(start));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 5u);

  // Responses 0 through 3 each hold the next 8 bytes of kData.
  for (size_t index = 0; index < 4; ++index) {
    const size_t expected_offset = index * 8;
    Chunk data_chunk = DecodeChunk(ctx_.responses()[index]);

    EXPECT_EQ(data_chunk.session_id(), 3u);
    EXPECT_EQ(data_chunk.offset(), expected_offset);
    ASSERT_EQ(data_chunk.payload().size(), 8u);
    EXPECT_EQ(std::memcmp(data_chunk.payload().data(),
                          kData.data() + expected_offset,
                          data_chunk.payload().size()),
              0);
  }

  // The fifth response is empty and reports remaining_bytes = 0.
  Chunk end_chunk = DecodeChunk(ctx_.responses()[4]);
  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_EQ(end_chunk.payload().size(), 0u);
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
494
TEST_F(ReadTransfer, ClientError) {
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(3).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The client terminates the transfer with an OUT_OF_RANGE error.
  ctx_.SendClientStream(EncodeChunk(
      Chunk::Final(ProtocolVersion::kLegacy, 3, Status::OutOfRange())));
  transfer_thread_.WaitUntilEventIsProcessed();

  // No further response is sent, and the handler is finalized with the
  // client's status.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
}
517
TEST_F(ReadTransfer, UnregisteredHandler) {
  // Session 11 is not the session the fixture registered a handler for.
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(11).set_window_end_offset(32).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The single response echoes the session ID and reports NOT_FOUND.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk error_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(error_chunk.session_id(), 11u);
  ASSERT_TRUE(error_chunk.status().has_value());
  EXPECT_EQ(error_chunk.status().value(), Status::NotFound());
}
532
TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
  // Send three non-initial chunks for session 3 without ever starting it:
  // retransmit parameters, a data chunk, and a final chunk.
  Chunk retransmit(ProtocolVersion::kLegacy,
                   Chunk::Type::kParametersRetransmit);
  retransmit.set_session_id(3).set_window_end_offset(32).set_offset(3);
  ctx_.SendClientStream(EncodeChunk(retransmit));

  Chunk data(ProtocolVersion::kLegacy, Chunk::Type::kData);
  data.set_session_id(3).set_payload(span(kData).first(10)).set_offset(3);
  ctx_.SendClientStream(EncodeChunk(data));

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  // A transfer only starts for an initial packet, so the handler is untouched.
  EXPECT_FALSE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
}
552
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(3).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);
  // Clear the flag so a second PrepareRead() call can be detected.
  handler_.prepare_read_called = false;

  // Send an identical starting chunk while the transfer is in progress.
  Chunk restart(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  restart.set_session_id(3).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(restart));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The first transfer is finalized as ABORTED and the handler is prepared
  // again.
  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
  // Clear the flag so the final FinalizeRead() call can be detected.
  handler_.finalize_read_called = false;

  // Finish the restarted transfer: request the rest, then acknowledge.
  Chunk retransmit(ProtocolVersion::kLegacy,
                   Chunk::Type::kParametersRetransmit);
  retransmit.set_session_id(3).set_window_end_offset(32).set_offset(16);
  ctx_.SendClientStream(EncodeChunk(retransmit));
  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
594
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
  // Start a transfer whose window ends at offset 0, leaving no room for data.
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(3).set_window_end_offset(0).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The transfer ends with RESOURCE_EXHAUSTED, both in the handler and in the
  // chunk sent back to the client.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());

  Chunk error_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(error_chunk.status(), Status::ResourceExhausted());
}
610
TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
  // Make the next read appear to be the end of the stream.
  handler_.set_read_status(Status::OutOfRange());

  // Start a transfer whose window ends at offset 0.
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(3)
                      .set_window_end_offset(0)
                      .set_offset(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Verify a response exists before reading it: calling back() on an empty
  // response list would be undefined behavior rather than a test failure.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.session_id(), 3u);
  EXPECT_EQ(chunk.remaining_bytes(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The final chunk from the client produces no additional response and
  // finalizes the handler with OK.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  ASSERT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
634
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
  // Run a complete single-chunk transfer first.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
    start.set_session_id(3).set_window_end_offset(64).set_offset(0);
    ctx_.SendClientStream(EncodeChunk(start));
  });

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk data_chunk = DecodeChunk(ctx_.responses()[0]);
  Chunk end_chunk = DecodeChunk(ctx_.responses()[1]);

  // The first response carries the entire contents of kData.
  EXPECT_EQ(data_chunk.session_id(), 3u);
  EXPECT_EQ(data_chunk.offset(), 0u);
  ASSERT_EQ(data_chunk.payload().size(), kData.size());
  EXPECT_EQ(std::memcmp(data_chunk.payload().data(),
                        kData.data(),
                        data_chunk.payload().size()),
            0);

  // The second response has no payload and reports remaining_bytes = 0.
  EXPECT_EQ(end_chunk.session_id(), 3u);
  EXPECT_FALSE(end_chunk.has_payload());
  ASSERT_TRUE(end_chunk.remaining_bytes().has_value());
  EXPECT_EQ(end_chunk.remaining_bytes().value(), 0u);

  ctx_.SendClientStream(
      EncodeChunk(Chunk::Final(ProtocolVersion::kLegacy, 3, OkStatus())));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());

  // The transfer should now be in a completed state. Clear the finalize flag
  // and send a non-initial chunk as if the transfer were still running.
  handler_.finalize_read_called = false;

  Chunk late_params(ProtocolVersion::kLegacy,
                    Chunk::Type::kParametersRetransmit);
  late_params.set_session_id(3).set_window_end_offset(64).set_offset(16);
  ctx_.SendClientStream(EncodeChunk(late_params));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  // The late chunk is answered with FAILED_PRECONDITION.
  Chunk error_chunk = DecodeChunk(ctx_.responses()[2]);
  ASSERT_TRUE(error_chunk.status().has_value());
  EXPECT_EQ(error_chunk.status().value(), Status::FailedPrecondition());

  // FinalizeRead() must not be invoked a second time.
  EXPECT_FALSE(handler_.finalize_read_called);
}
691
692 class SimpleWriteTransfer final : public WriteOnlyHandler {
693 public:
SimpleWriteTransfer(uint32_t session_id,ByteSpan data)694 SimpleWriteTransfer(uint32_t session_id, ByteSpan data)
695 : WriteOnlyHandler(session_id),
696 prepare_write_called(false),
697 finalize_write_called(false),
698 finalize_write_status(Status::Unknown()),
699 writer_(data) {}
700
PrepareWrite()701 Status PrepareWrite() final {
702 EXPECT_EQ(OkStatus(), writer_.Seek(0));
703 set_writer(writer_);
704 prepare_write_called = true;
705 return OkStatus();
706 }
707
FinalizeWrite(Status status)708 Status FinalizeWrite(Status status) final {
709 finalize_write_called = true;
710 finalize_write_status = status;
711 return finalize_write_return_status_;
712 }
713
set_finalize_write_return(Status status)714 void set_finalize_write_return(Status status) {
715 finalize_write_return_status_ = status;
716 }
717
718 bool prepare_write_called;
719 bool finalize_write_called;
720 Status finalize_write_status;
721
722 private:
723 Status finalize_write_return_status_;
724 stream::MemoryWriter writer_;
725 };
726
727 class WriteTransfer : public ::testing::Test {
728 protected:
WriteTransfer(size_t max_bytes_to_receive=64)729 WriteTransfer(size_t max_bytes_to_receive = 64)
730 : buffer{},
731 handler_(7, buffer),
732 transfer_thread_(data_buffer_, encode_buffer_),
733 system_thread_(TransferThreadOptions(), transfer_thread_),
734 ctx_(transfer_thread_,
735 max_bytes_to_receive,
736 // Use a long timeout to avoid accidentally triggering timeouts.
737 std::chrono::minutes(1)) {
738 ctx_.service().RegisterHandler(handler_);
739
740 PW_CHECK(!handler_.prepare_write_called);
741 PW_CHECK(!handler_.finalize_write_called);
742
743 ctx_.call(); // Open the write stream
744 transfer_thread_.WaitUntilEventIsProcessed();
745 }
746
~WriteTransfer()747 ~WriteTransfer() override {
748 transfer_thread_.Terminate();
749 system_thread_.join();
750 }
751
752 std::array<std::byte, kData.size()> buffer;
753 SimpleWriteTransfer handler_;
754
755 Thread<1, 1> transfer_thread_;
756 thread::Thread system_thread_;
757 std::array<std::byte, 64> data_buffer_;
758 std::array<std::byte, 64> encode_buffer_;
759 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
760 };
761
TEST_F(WriteTransfer, SingleChunk) {
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(7);
  ctx_.SendClientStream(EncodeChunk(start));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The service replies with its transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(parameters.session_id(), 7u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes().has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes().value(), 37u);

  // Send all of kData in one chunk that also marks the end of the data.
  Chunk data(ProtocolVersion::kLegacy, Chunk::Type::kData);
  data.set_session_id(7).set_offset(0).set_payload(kData).set_remaining_bytes(
      0);
  ctx_.SendClientStream<64>(EncodeChunk(data));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The service acknowledges the completed transfer with an OK status.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.session_id(), 7u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
795
TEST_F(WriteTransfer, FinalizeFails) {
  // Have the handler's FinalizeWrite() report an error.
  handler_.set_finalize_write_return(Status::FailedPrecondition());

  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(7);
  ctx_.SendClientStream(EncodeChunk(start));

  Chunk data(ProtocolVersion::kLegacy, Chunk::Type::kData);
  data.set_session_id(7).set_offset(0).set_payload(kData).set_remaining_bytes(
      0);
  ctx_.SendClientStream<64>(EncodeChunk(data));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The client is told DATA_LOSS, while FinalizeWrite() itself was called
  // with OK.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(completion.session_id(), 7u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), Status::DataLoss());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
819
TEST_F(WriteTransfer, SendingFinalPacketFails) {
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(7);
  ctx_.SendClientStream(EncodeChunk(start));
  transfer_thread_.WaitUntilEventIsProcessed();

  // From here on, sends on the RPC output report UNKNOWN.
  ctx_.output().set_send_status(Status::Unknown());

  Chunk data(ProtocolVersion::kLegacy, Chunk::Type::kData);
  data.set_session_id(7).set_offset(0).set_payload(kData).set_remaining_bytes(
      0);
  ctx_.SendClientStream<64>(EncodeChunk(data));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Only the initial transfer parameters should have been sent.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.session_id(), 7u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes().has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes().value(), 37u);

  // The transfer was considered successful when FinalizeWrite() was called.
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
847
TEST_F(WriteTransfer, MultiChunk) {
  Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start.set_session_id(7);
  ctx_.SendClientStream(EncodeChunk(start));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk parameters = DecodeChunk(ctx_.responses()[0]);
  EXPECT_EQ(parameters.session_id(), 7u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);

  // Send the first 8 bytes of kData; no response is expected for them.
  Chunk head(ProtocolVersion::kLegacy, Chunk::Type::kData);
  head.set_session_id(7).set_offset(0).set_payload(span(kData).first(8));
  ctx_.SendClientStream<64>(EncodeChunk(head));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Send the rest of kData and mark the end of the data.
  Chunk tail(ProtocolVersion::kLegacy, Chunk::Type::kData);
  tail.set_session_id(7)
      .set_offset(8)
      .set_payload(span(kData).subspan(8))
      .set_remaining_bytes(0);
  ctx_.SendClientStream<64>(EncodeChunk(tail));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The service acknowledges the completed transfer with an OK status.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.session_id(), 7u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
888
TEST_F(WriteTransfer, WriteFailsOnRetry) {
  // Let one packet through before sends fail, so the failure hits the retry.
  ctx_.output().set_send_status(Status::FailedPrecondition(), 1);

  // Three send attempts are awaited: the initial parameters, the retry, and
  // the final error.
  rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
    // Only the start chunk is sent, so the service is left waiting for data.
    Chunk start(ProtocolVersion::kLegacy, Chunk::Type::kStart);
    start.set_session_id(7);
    ctx_.SendClientStream(EncodeChunk(start));

    // Simulate a timeout for session 7 to trigger the retry.
    transfer_thread_.SimulateServerTimeout(7);
  });

  // Of the three attempted packets the second one was dropped, leaving two
  // recorded responses. The last one is the INTERNAL error produced by the
  // failed RPC write.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk error_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(error_chunk.session_id(), 7u);
  ASSERT_TRUE(error_chunk.status().has_value());
  EXPECT_EQ(error_chunk.status().value(), Status::Internal());
}
910
// Puts the receiver into recovery by skipping a data offset, then simulates a
// timeout and checks that the same recovery parameters (offset 8, window end
// 32) are sent again.
TEST_F(WriteTransfer, TimeoutInRecoveryState) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Initial transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.offset(), 0u);
  EXPECT_EQ(initial_params.window_end_offset(), 32u);

  // Bytes [0, 8) arrive as expected.
  const auto first_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(span(kData).first(8)));
  ctx_.SendClientStream(first_packet);

  // The next chunk starts at offset 12 instead of 8, which forces the
  // receiver into a recovery state.
  const auto skipping_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(12)
                      .set_payload(span(kData).subspan(12, 4)));
  ctx_.SendClientStream(skipping_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The receiver asks for the data starting at offset 8.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk recovery_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery_params.session_id(), 7u);
  EXPECT_EQ(recovery_params.offset(), 8u);
  EXPECT_EQ(recovery_params.window_end_offset(), 32u);

  // Time out while still recovering.
  transfer_thread_.SimulateServerTimeout(7);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The recovery parameters are repeated unchanged.
  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk resent_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent_params.session_id(), 7u);
  EXPECT_EQ(resent_params.offset(), 8u);
  EXPECT_EQ(resent_params.window_end_offset(), 32u);
}
956
// Sends the data in 4-byte chunks and checks that a PARAMETERS_CONTINUE chunk
// is emitted once half of the initial 32-byte window has been received, then
// completes the transfer with the rest of the data.
TEST_F(WriteTransfer, ExtendWindow) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 32u);

  constexpr span data(kData);

  // The window starts at 32 bytes and should be extended when half of it has
  // been sent. The chunks at offsets 0, 4 and 8 stay below that threshold, so
  // none of them may produce a response.
  for (uint32_t offset = 0; offset < 12; offset += 4) {
    const auto quiet_packet =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                        .set_session_id(7)
                        .set_offset(offset)
                        .set_payload(data.subspan(offset, 4)));
    ctx_.SendClientStream(quiet_packet);
    transfer_thread_.WaitUntilEventIsProcessed();
    ASSERT_EQ(ctx_.total_responses(), 1u);
  }

  // The chunk at offset 12 brings the received total to 16 bytes.
  const auto halfway_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(12)
                      .set_payload(data.subspan(12, 4)));
  ctx_.SendClientStream(halfway_packet);
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Extend parameters chunk.
  Chunk extend_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(extend_params.session_id(), 7u);
  EXPECT_EQ(extend_params.window_end_offset(), 32u);
  EXPECT_EQ(extend_params.type(), Chunk::Type::kParametersContinue);

  // Remainder of the data, flagged as final.
  const auto last_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(16)
                      .set_payload(data.subspan(16))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(last_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk status_chunk = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(status_chunk.session_id(), 7u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1027
// Variant of the WriteTransfer fixture that passes 16 as max_bytes_to_receive
// to the base fixture's constructor. Tests using it expect an initial
// window_end_offset of 16.
class WriteTransferMaxBytes16 : public WriteTransfer {
 protected:
  WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
};
1032
// The transmitter announces a smaller window (end offset 12) than the one the
// receiver offered (32). After receiving those 12 bytes, the receiver is
// expected to answer right away with a PARAMETERS_RETRANSMIT chunk.
TEST_F(WriteTransfer, TransmitterReducesWindow) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 32u);

  // Transmit 12 bytes and declare offset 12 to be the new end of the window.
  constexpr span data(kData);
  const auto reduced_window_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_window_end_offset(12)
                      .set_payload(data.first(12)));
  ctx_.SendClientStream<64>(reduced_window_packet);
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Since the end of the (reduced) window was reached, the receiver responds
  // immediately with a retransmit chunk starting at offset 12.
  Chunk retransmit_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(retransmit_params.session_id(), 7u);
  EXPECT_EQ(retransmit_params.offset(), 12u);
  EXPECT_EQ(retransmit_params.window_end_offset(), 32u);
  EXPECT_EQ(retransmit_params.type(), Chunk::Type::kParametersRetransmit);
}
1064
// The transmitter claims a window end offset (48) beyond the one granted by
// the receiver (32). The receiver is expected to terminate the transfer.
// NOTE(review): despite "Invalid" in the test name, the status asserted below
// is INTERNAL.
TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 32u);

  constexpr span data(kData);
  const auto oversized_window_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      // Window end offset exceeds what the receiver offered.
                      .set_window_end_offset(48)
                      .set_payload(data.first(16)));
  ctx_.SendClientStream<64>(oversized_window_packet);
  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The second response is the terminating status chunk.
  Chunk status_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(status_chunk.session_id(), 7u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::Internal());
}
1093
// With a 16-byte receive limit, every 8-byte data chunk is expected to be
// answered with a new parameters chunk until the final chunk completes the
// transfer with an OK status.
TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // Initial parameters: window of 16 bytes.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 16u);

  constexpr span data(kData);

  // Bytes [0, 8).
  const auto first_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(8)));
  ctx_.SendClientStream<64>(first_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk second_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(second_params.session_id(), 7u);
  EXPECT_EQ(second_params.offset(), 8u);
  EXPECT_EQ(second_params.window_end_offset(), 24u);

  // Bytes [8, 16).
  const auto second_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(8)
                      .set_payload(data.subspan(8, 8)));
  ctx_.SendClientStream<64>(second_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk third_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(third_params.session_id(), 7u);
  EXPECT_EQ(third_params.offset(), 16u);
  EXPECT_EQ(third_params.window_end_offset(), 32u);

  // Bytes [16, 24).
  const auto third_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(16)
                      .set_payload(data.subspan(16, 8)));
  ctx_.SendClientStream<64>(third_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk fourth_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(fourth_params.session_id(), 7u);
  EXPECT_EQ(fourth_params.offset(), 24u);
  EXPECT_EQ(fourth_params.window_end_offset(), 32u);

  // Everything from offset 24 onwards, flagged as the end of the data.
  const auto last_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(24)
                      .set_payload(data.subspan(24))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(last_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 5u);
  Chunk status_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(status_chunk.session_id(), 7u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1164
// When the configured max bytes (16) is smaller than the destination buffer,
// the initial parameters are expected to advertise window_end_offset = 16.
TEST_F(WriteTransferMaxBytes16, SetsDefaultWindowEndOffset) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 16u);
}
1176
// Registers a second write handler (session 987) backed by an 8-byte buffer
// and expects the initial parameters for that session to advertise
// window_end_offset = 8, i.e. the size of the small buffer.
TEST_F(WriteTransfer, SetsWriterWindowEndOffset) {
  // Buffer is smaller than constructor's default max bytes.
  std::array<std::byte, 8> small_buffer = {};

  // Deliberately not named handler_, which would shadow the fixture's member.
  SimpleWriteTransfer small_handler(987, small_buffer);
  ctx_.service().RegisterHandler(small_handler);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(987)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // A fatal ASSERT_* would return from the test body before the
  // stack-allocated handler is unregistered below, so use a non-fatal check
  // and guard the access to the response instead.
  EXPECT_EQ(ctx_.total_responses(), 1u);
  if (ctx_.total_responses() == 1u) {
    Chunk chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_EQ(chunk.session_id(), 987u);
    EXPECT_EQ(chunk.window_end_offset(), 8u);
  }

  // Always runs, so the service never keeps a handler that is about to go out
  // of scope.
  ctx_.service().UnregisterHandler(small_handler);
}
1196
// Sends a data chunk with a wrong offset in the middle of a transfer. The
// receiver is expected to re-request the data from offset 8 and to finish
// normally once the chunk is repeated with the right offset.
TEST_F(WriteTransfer, UnexpectedOffset) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.offset(), 0u);
  EXPECT_EQ(initial_params.window_end_offset(), 32u);

  constexpr span data(kData);

  // Bytes [0, 8) are accepted without a response.
  const auto first_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(8)));
  ctx_.SendClientStream<64>(first_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The remaining data is sent with offset 4 although 8 is expected.
  const auto misplaced_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(4)  // incorrect
                      .set_payload(data.subspan(8))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(misplaced_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The receiver asks again for the data starting at offset 8.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk recovery_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery_params.session_id(), 7u);
  EXPECT_EQ(recovery_params.offset(), 8u);
  EXPECT_EQ(recovery_params.window_end_offset(), 32u);

  // Same payload again, this time at the expected offset.
  const auto corrected_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(8)  // correct
                      .set_payload(data.subspan(8))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(corrected_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk status_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(status_chunk.session_id(), 7u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), OkStatus());

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1252
// Sends 24 bytes in one chunk although the receiver only offered a 16-byte
// window. The transfer is expected to be terminated with an INTERNAL status.
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 16u);

  // The payload (24 bytes) overshoots the window end offset of 16.
  constexpr span data(kData);
  const auto oversized_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(24)));
  ctx_.SendClientStream<64>(oversized_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk status_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(status_chunk.session_id(), 7u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::Internal());
}
1280
// Starting a transfer for session 999 is expected to be rejected with a
// NOT_FOUND status chunk for that session.
TEST_F(WriteTransfer, UnregisteredHandler) {
  const auto start_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(999));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk status_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(status_chunk.session_id(), 999u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::NotFound());
}
1293
// The client aborts an active write transfer with a DATA_LOSS status. The
// handler is expected to be finalized with that status and the service is
// expected not to send any further chunk.
TEST_F(WriteTransfer, ClientError) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 32u);

  // Final chunk from the client reporting DATA_LOSS for session 7.
  const auto abort_packet = EncodeChunk(
      Chunk::Final(ProtocolVersion::kLegacy, 7, Status::DataLoss()));
  ctx_.SendClientStream<64>(abort_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  // Still only the initial parameters chunk.
  EXPECT_EQ(ctx_.total_responses(), 1u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
}
1316
// Drops the chunk at offset 1 and keeps sending the following single-byte
// chunks. Exactly one parameters update (requesting offset 1) is expected for
// the whole burst; resending from offset 1 then completes the transfer.
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  constexpr span data(kData);

  // Byte 0 arrives normally.
  const auto first_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(1)));
  ctx_.SendClientStream<64>(first_packet);

  // Offset 1 is never sent; every later byte goes out in its own chunk.
  for (uint32_t offset = 2; offset < kData.size(); ++offset) {
    const auto byte_packet =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                        .set_session_id(7)
                        .set_offset(offset)
                        .set_payload(data.subspan(offset, 1)));
    ctx_.SendClientStream<64>(byte_packet);
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  // Only one additional response, asking for the data at offset 1.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk recovery_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery_params.session_id(), 7u);
  EXPECT_EQ(recovery_params.offset(), 1u);

  // Deliver everything from offset 1 in a single final chunk.
  const auto remainder_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(1)
                      .set_payload(data.subspan(1, 31))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(remainder_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1359
// While the receiver is recovering from a missing offset 0, the same data
// chunk is sent twice more. Each repetition is expected to trigger another
// parameters chunk, and the transfer is expected to complete once data at the
// requested offset arrives.
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  constexpr span data(kData);

  // Offset 0 is left out; all following bytes are sent one chunk each.
  for (uint32_t offset = 1; offset < kData.size(); ++offset) {
    const auto byte_packet =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                        .set_session_id(7)
                        .set_offset(offset)
                        .set_payload(data.subspan(offset, 1)));
    ctx_.SendClientStream<64>(byte_packet);
  }

  transfer_thread_.WaitUntilEventIsProcessed();

  // The whole burst results in a single resend of the transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Chunk holding the last byte of the data; it is sent twice below.
  const auto repeated_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(kData.size() - 1)
                      .set_payload(data.last(1)));
  ctx_.SendClientStream<64>(repeated_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  // A repeated packet makes the receiver send its parameters again.
  ASSERT_EQ(ctx_.total_responses(), 3u);

  ctx_.SendClientStream<64>(repeated_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);

  // The latest parameters still request the data from offset 0.
  Chunk recovery_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(recovery_params.session_id(), 7u);
  EXPECT_EQ(recovery_params.offset(), 0u);
  EXPECT_EQ(recovery_params.window_end_offset(), 32u);

  // Normal operation resumes when the requested offset is finally sent.
  const auto full_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(full_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}
1415
// After a transfer has completed with OK, the client sends its final data
// chunk once more. The service is expected to answer with the OK status again.
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // The entire data in one final chunk.
  const auto full_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(full_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk first_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(first_status.status().has_value());
  EXPECT_EQ(first_status.status().value(), OkStatus());

  // Client retry: the identical chunk is encoded and sent a second time.
  const auto retried_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(retried_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk second_status = DecodeChunk(ctx_.responses().back());
  ASSERT_TRUE(second_status.status().has_value());
  EXPECT_EQ(second_status.status().value(), OkStatus());
}
1449
// Data chunks that arrive for session 7 without a preceding start chunk must
// not reach the handler: neither PrepareWrite nor FinalizeWrite is expected
// to have been called.
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
  // Data chunk without payload at offset 3.
  const auto stray_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(3));
  ctx_.SendClientStream<64>(stray_packet);

  // Data chunk with 10 bytes of payload, flagged as final.
  constexpr span data(kData);
  const auto stray_final_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(10))
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(stray_final_packet);

  transfer_thread_.WaitUntilEventIsProcessed();

  // A transfer is only started by an initial packet.
  EXPECT_FALSE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);
}
1468
// A second start chunk arrives while a transfer for the same session is in
// progress. The running transfer is expected to be finalized with ABORTED and
// a new one started, which then completes normally.
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  // Begin writing: bytes [0, 8).
  constexpr span data(kData);
  const auto partial_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(8)));
  ctx_.SendClientStream<64>(partial_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  ASSERT_TRUE(handler_.prepare_write_called);
  ASSERT_FALSE(handler_.finalize_write_called);
  // Clear the flag so that a second PrepareWrite call can be detected.
  handler_.prepare_write_called = false;

  // The client vanishes and starts the same transfer from scratch.
  const auto restart_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(restart_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());

  // Clear the flag so that a second FinalizeWrite call can be detected.
  handler_.finalize_write_called = false;

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Complete the restarted transfer with all of the data in one chunk.
  const auto full_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0));
  ctx_.SendClientStream<64>(full_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
1516
1517 class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
1518 public:
SometimesUnavailableReadHandler(uint32_t session_id,ConstByteSpan data)1519 SometimesUnavailableReadHandler(uint32_t session_id, ConstByteSpan data)
1520 : ReadOnlyHandler(session_id), reader_(data), call_count_(0) {}
1521
PrepareRead()1522 Status PrepareRead() final {
1523 if ((call_count_++ % 2) == 0) {
1524 return Status::Unavailable();
1525 }
1526
1527 set_reader(reader_);
1528 return OkStatus();
1529 }
1530
1531 private:
1532 stream::MemoryReader reader_;
1533 int call_count_;
1534 };
1535
// Starts a read transfer against a handler whose first PrepareRead() call
// fails. The service is expected to answer with a DATA_LOSS status chunk.
TEST_F(ReadTransfer, PrepareError) {
  SometimesUnavailableReadHandler unavailable_handler(88, kData);
  ctx_.service().RegisterHandler(unavailable_handler);

  const auto start_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                      .set_session_id(88)
                      .set_window_end_offset(128)
                      .set_offset(0));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk status_chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(status_chunk.session_id(), 88u);
  ASSERT_TRUE(status_chunk.status().has_value());
  EXPECT_EQ(status_chunk.status().value(), Status::DataLoss());

  // A second attempt should succeed, since the handler only fails every other
  // call.
  // TODO(frolv): This won't work until completion ACKs are supported.
  if (false) {
    const auto retry_packet =
        EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart)
                        .set_session_id(88)
                        .set_window_end_offset(128)
                        .set_offset(0));
    ctx_.SendClientStream(retry_packet);
    transfer_thread_.WaitUntilEventIsProcessed();

    ASSERT_EQ(ctx_.total_responses(), 2u);
    Chunk data_chunk = DecodeChunk(ctx_.responses().back());
    EXPECT_EQ(data_chunk.session_id(), 88u);
    ASSERT_EQ(data_chunk.payload().size(), kData.size());
    EXPECT_EQ(std::memcmp(data_chunk.payload().data(),
                          kData.data(),
                          data_chunk.payload().size()),
              0);
  }
}
1572
// Changes the service's max pending bytes in the middle of a transfer and
// checks that the next parameters chunk uses the new value (12) for its
// window instead of the original 16.
TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
  const auto start_packet = EncodeChunk(
      Chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart).set_session_id(7));
  ctx_.SendClientStream(start_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The initial parameters carry the default window end offset of 16.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk initial_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(initial_params.session_id(), 7u);
  EXPECT_EQ(initial_params.window_end_offset(), 16u);

  // Lower the pending bytes limit while the transfer is running.
  ctx_.service().set_max_pending_bytes(12);

  constexpr span data(kData);
  const auto first_packet =
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(7)
                      .set_offset(0)
                      .set_payload(data.first(8)));
  ctx_.SendClientStream<64>(first_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  // The follow-up parameters start at offset 8 and span the new 12 bytes.
  ASSERT_EQ(ctx_.total_responses(), 2u);
  Chunk updated_params = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(updated_params.session_id(), 7u);
  EXPECT_EQ(updated_params.offset(), 8u);
  EXPECT_EQ(updated_params.window_end_offset(), 8u + 12u);
}
1604
// Runs a complete version 2 read transfer of resource 3: START / START_ACK
// handshake, one data chunk holding all of kData, an empty chunk with
// remaining_bytes = 0, and the client's final OK status.
TEST_F(ReadTransfer, Version2_SimpleTransfer) {
  const auto start_packet =
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(3));
  ctx_.SendClientStream(start_packet);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server's first response is a START_ACK that assigns the session ID
  // and confirms the protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Finish the handshake: confirm the server's ACK and, in the same chunk,
  // provide the first read transfer parameters.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    const auto confirmation_packet = EncodeChunk(
        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
            .set_session_id(1)
            .set_window_end_offset(64)
            .set_offset(0));
    ctx_.SendClientStream(confirmation_packet);

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  // The server starts the data transfer: its only data chunk, followed by a
  // chunk with remaining_bytes = 0.
  ASSERT_EQ(ctx_.total_responses(), 3u);

  Chunk data_chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(data_chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(data_chunk.type(), Chunk::Type::kData);
  EXPECT_EQ(data_chunk.session_id(), 1u);
  EXPECT_EQ(data_chunk.offset(), 0u);
  ASSERT_TRUE(data_chunk.has_payload());
  ASSERT_EQ(data_chunk.payload().size(), kData.size());
  EXPECT_EQ(std::memcmp(data_chunk.payload().data(),
                        kData.data(),
                        data_chunk.payload().size()),
            0);

  Chunk end_chunk = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(end_chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(end_chunk.type(), Chunk::Type::kData);
  EXPECT_EQ(end_chunk.session_id(), 1u);
  EXPECT_FALSE(end_chunk.has_payload());
  EXPECT_EQ(end_chunk.remaining_bytes(), 0u);

  // The client closes the transfer with an OK status.
  const auto final_packet =
      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus()));
  ctx_.SendClientStream(final_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
1664
// Version 2 read transfer of resource 3 with max_chunk_size_bytes = 16: the
// data is expected in two 16-byte chunks followed by an empty chunk with
// remaining_bytes = 0, after which the client sends its final OK status.
TEST_F(ReadTransfer, Version2_MultiChunk) {
  const auto start_packet =
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(3));
  ctx_.SendClientStream(start_packet);

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server's first response is a START_ACK that assigns the session ID
  // and confirms the protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Finish the handshake: confirm the server's ACK and, in the same chunk,
  // provide the first read transfer parameters (including the chunk size
  // limit of 16 bytes).
  rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
    const auto confirmation_packet = EncodeChunk(
        Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
            .set_session_id(1)
            .set_window_end_offset(64)
            .set_max_chunk_size_bytes(16)
            .set_offset(0));
    ctx_.SendClientStream(confirmation_packet);

    transfer_thread_.WaitUntilEventIsProcessed();
  });

  ASSERT_EQ(ctx_.total_responses(), 4u);

  // First data chunk: bytes [0, 16).
  Chunk first_data = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(first_data.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(first_data.type(), Chunk::Type::kData);
  EXPECT_EQ(first_data.session_id(), 1u);
  EXPECT_EQ(first_data.offset(), 0u);
  ASSERT_TRUE(first_data.has_payload());
  ASSERT_EQ(first_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(first_data.payload().data(),
                        kData.data(),
                        first_data.payload().size()),
            0);

  // Second data chunk: compared against kData at the chunk's own offset.
  Chunk second_data = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(second_data.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(second_data.type(), Chunk::Type::kData);
  EXPECT_EQ(second_data.session_id(), 1u);
  EXPECT_EQ(second_data.offset(), 16u);
  ASSERT_TRUE(second_data.has_payload());
  ASSERT_EQ(second_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(second_data.payload().data(),
                        kData.data() + second_data.offset(),
                        second_data.payload().size()),
            0);

  // Last chunk: no payload, remaining_bytes = 0.
  Chunk end_chunk = DecodeChunk(ctx_.responses()[3]);
  EXPECT_EQ(end_chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(end_chunk.type(), Chunk::Type::kData);
  EXPECT_EQ(end_chunk.session_id(), 1u);
  EXPECT_FALSE(end_chunk.has_payload());
  EXPECT_EQ(end_chunk.remaining_bytes(), 0u);

  // The client closes the transfer with an OK status.
  const auto final_packet =
      EncodeChunk(Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus()));
  ctx_.SendClientStream(final_packet);
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
1735
// Exercises a version 2 read transfer in which the client first grants a
// 16-byte window and then extends it with a PARAMETERS_CONTINUE chunk.
TEST_F(ReadTransfer, Version2_MultiParameters) {
  // Open the transfer for resource 3.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(3);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Finish the handshake. The confirmation doubles as the first set of read
  // parameters, with a window that ends at offset 16.
  Chunk confirmation(ProtocolVersion::kVersionTwo,
                     Chunk::Type::kStartAckConfirmation);
  confirmation.set_session_id(1).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(confirmation));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Expect a single 16-byte data chunk from the start of the resource.
  Chunk first_data = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(first_data.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(first_data.type(), Chunk::Type::kData);
  EXPECT_EQ(first_data.session_id(), 1u);
  EXPECT_EQ(first_data.offset(), 0u);
  ASSERT_TRUE(first_data.has_payload());
  ASSERT_EQ(first_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(first_data.payload().data(),
                        kData.data(),
                        first_data.payload().size()),
            0);

  // Extend the window to offset 64 and wait for the two packets it produces.
  rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
    Chunk window_update(ProtocolVersion::kVersionTwo,
                        Chunk::Type::kParametersContinue);
    window_update.set_session_id(1).set_window_end_offset(64).set_offset(16);
    ctx_.SendClientStream(EncodeChunk(window_update));
    transfer_thread_.WaitUntilEventIsProcessed();
  });

  ASSERT_EQ(ctx_.total_responses(), 4u);

  // The next chunk carries 16 more bytes, starting at offset 16.
  Chunk second_data = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(second_data.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(second_data.type(), Chunk::Type::kData);
  EXPECT_EQ(second_data.session_id(), 1u);
  EXPECT_EQ(second_data.offset(), 16u);
  ASSERT_TRUE(second_data.has_payload());
  ASSERT_EQ(second_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(second_data.payload().data(),
                        kData.data() + second_data.offset(),
                        second_data.payload().size()),
            0);

  // The last data chunk has no payload and reports zero remaining bytes.
  Chunk end_of_data = DecodeChunk(ctx_.responses()[3]);
  EXPECT_EQ(end_of_data.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(end_of_data.type(), Chunk::Type::kData);
  EXPECT_EQ(end_of_data.session_id(), 1u);
  EXPECT_FALSE(end_of_data.has_payload());
  EXPECT_EQ(end_of_data.remaining_bytes(), 0u);

  // Close the transfer from the client side with an OK status.
  Chunk client_final =
      Chunk::Final(ProtocolVersion::kVersionTwo, 1, OkStatus());
  ctx_.SendClientStream(EncodeChunk(client_final));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, OkStatus());
}
1813
// Checks that a version 2 read transfer is finalized with the client's status
// when the client sends a final chunk instead of confirming the START_ACK.
TEST_F(ReadTransfer, Version2_ClientTerminatesDuringHandshake) {
  // Open the transfer for resource 3.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(3);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Abort with RESOURCE_EXHAUSTED where the third handshake step would go.
  Chunk termination = Chunk::Final(
      ProtocolVersion::kVersionTwo, 1, Status::ResourceExhausted());
  ctx_.SendClientStream(EncodeChunk(termination));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
}
1841
// Checks that the server ends a version 2 read transfer with INTERNAL when
// the client sends a legacy-versioned parameters chunk mid-transfer.
TEST_F(ReadTransfer, Version2_ClientSendsWrongProtocolVersion) {
  // Open the transfer for resource 3.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(3);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Finish the handshake. The confirmation doubles as the first set of read
  // parameters, with a window that ends at offset 16.
  Chunk confirmation(ProtocolVersion::kVersionTwo,
                     Chunk::Type::kStartAckConfirmation);
  confirmation.set_session_id(1).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(confirmation));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Expect a single 16-byte data chunk from the start of the resource.
  Chunk first_data = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(first_data.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(first_data.type(), Chunk::Type::kData);
  EXPECT_EQ(first_data.session_id(), 1u);
  EXPECT_EQ(first_data.offset(), 0u);
  ASSERT_TRUE(first_data.has_payload());
  ASSERT_EQ(first_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(first_data.payload().data(),
                        kData.data(),
                        first_data.payload().size()),
            0);

  // Send a parameters update tagged with the legacy protocol version. The
  // server is expected to terminate the transfer in response.
  Chunk legacy_update(ProtocolVersion::kLegacy,
                      Chunk::Type::kParametersContinue);
  legacy_update.set_session_id(1).set_window_end_offset(64).set_offset(16);
  ctx_.SendClientStream(EncodeChunk(legacy_update));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  // The termination arrives as a version 2 completion chunk with INTERNAL.
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(completion.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(completion.session_id(), 1u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), Status::Internal());

  EXPECT_TRUE(handler_.finalize_read_called);
  EXPECT_EQ(handler_.finalize_read_status, Status::Internal());
}
1903
// Checks that the server ends a version 2 read transfer with
// RESOURCE_EXHAUSTED when the handshake confirmation carries a window that
// ends at offset 0.
TEST_F(ReadTransfer, Version2_BadParametersInHandshake) {
  // Open the transfer for resource 3.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(3);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server answers the start request with a START_ACK.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 3u);

  // Confirm the handshake with invalid parameters: offset and window end are
  // both 0. The server is expected to terminate the transfer.
  Chunk bad_confirmation(ProtocolVersion::kVersionTwo,
                         Chunk::Type::kStartAckConfirmation);
  bad_confirmation.set_session_id(1).set_window_end_offset(0).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(bad_confirmation));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 2u);

  // The second response is a completion chunk with RESOURCE_EXHAUSTED.
  Chunk completion = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(completion.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(completion.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(completion.session_id(), 1u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), Status::ResourceExhausted());
}
1940
// Checks that starting a version 2 read transfer for resource ID 99 is
// answered with a single completion chunk carrying NOT_FOUND.
TEST_F(ReadTransfer, Version2_InvalidResourceId) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(99)));

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  // Stop here if the status is absent so that value() is never called on an
  // empty optional.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::NotFound());
}
1955
// Checks the response when a version 2 read transfer is started on a resource
// served by a SometimesUnavailableReadHandler: a single completion chunk for
// resource 99 carrying DATA_LOSS.
TEST_F(ReadTransfer, Version2_PrepareError) {
  SometimesUnavailableReadHandler unavailable_handler(99, kData);
  ctx_.service().RegisterHandler(unavailable_handler);

  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(99)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(chunk.resource_id(), 99u);
  // Stop here if the status is absent so that value() is never called on an
  // empty optional.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::DataLoss());
}
1972
// Exercises a complete version 2 write transfer in which the client delivers
// all of kData in a single data chunk.
TEST_F(WriteTransfer, Version2_SimpleTransfer) {
  // Open the transfer for resource 7.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(7);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 7u);

  // Confirm the server's ACK to finish the handshake.
  Chunk confirmation(ProtocolVersion::kVersionTwo,
                     Chunk::Type::kStartAckConfirmation);
  confirmation.set_session_id(1);
  ctx_.SendClientStream(EncodeChunk(confirmation));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The server then sends its initial transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 2u);

  Chunk parameters = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(parameters.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(parameters.type(), Chunk::Type::kParametersRetransmit);
  EXPECT_EQ(parameters.session_id(), 1u);
  EXPECT_EQ(parameters.offset(), 0u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes().has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes().value(), 37u);

  // Deliver the entire payload at once, marking it as the end of the data.
  Chunk data(ProtocolVersion::kVersionTwo, Chunk::Type::kData);
  data.set_session_id(1).set_offset(0).set_payload(kData).set_remaining_bytes(
      0);
  ctx_.SendClientStream<64>(EncodeChunk(data));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  // The server reports successful completion.
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(completion.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(completion.session_id(), 1u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), OkStatus());

  // Acknowledge the completion; no further response is expected.
  Chunk completion_ack(ProtocolVersion::kVersionTwo,
                       Chunk::Type::kCompletionAck);
  completion_ack.set_session_id(1);
  ctx_.SendClientStream(EncodeChunk(completion_ack));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
2040
// Exercises a version 2 write transfer in which the client splits kData
// across two back-to-back data chunks.
TEST_F(WriteTransfer, Version2_Multichunk) {
  // Open the transfer for resource 7.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(7);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 7u);

  // Confirm the server's ACK to finish the handshake.
  Chunk confirmation(ProtocolVersion::kVersionTwo,
                     Chunk::Type::kStartAckConfirmation);
  confirmation.set_session_id(1);
  ctx_.SendClientStream(EncodeChunk(confirmation));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The server then sends its initial transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 2u);

  Chunk parameters = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(parameters.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(parameters.type(), Chunk::Type::kParametersRetransmit);
  EXPECT_EQ(parameters.session_id(), 1u);
  EXPECT_EQ(parameters.offset(), 0u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes().has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes().value(), 37u);

  // First chunk: the leading 8 bytes of kData.
  Chunk head_data(ProtocolVersion::kVersionTwo, Chunk::Type::kData);
  head_data.set_session_id(1).set_offset(0).set_payload(span(kData).first(8));
  ctx_.SendClientStream<64>(EncodeChunk(head_data));

  // Second chunk: everything from offset 8 onward, flagged as the end of the
  // data.
  Chunk tail_data(ProtocolVersion::kVersionTwo, Chunk::Type::kData);
  tail_data.set_session_id(1)
      .set_offset(8)
      .set_payload(span(kData).subspan(8))
      .set_remaining_bytes(0);
  ctx_.SendClientStream<64>(EncodeChunk(tail_data));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  // The server reports successful completion.
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(completion.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(completion.session_id(), 1u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), OkStatus());

  // Acknowledge the completion; no further response is expected.
  Chunk completion_ack(ProtocolVersion::kVersionTwo,
                       Chunk::Type::kCompletionAck);
  completion_ack.set_session_id(1);
  ctx_.SendClientStream(EncodeChunk(completion_ack));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
2113
// Exercises a version 2 write transfer sent in 8-byte pieces and checks the
// PARAMETERS_CONTINUE chunks the server emits along the way.
TEST_F(WriteTransfer, Version2_ContinueParameters) {
  // Open the transfer for resource 7.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(7);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 7u);

  // Confirm the server's ACK to finish the handshake.
  Chunk confirmation(ProtocolVersion::kVersionTwo,
                     Chunk::Type::kStartAckConfirmation);
  confirmation.set_session_id(1);
  ctx_.SendClientStream(EncodeChunk(confirmation));
  transfer_thread_.WaitUntilEventIsProcessed();

  // The server then sends its initial transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 2u);

  Chunk parameters = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(parameters.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(parameters.type(), Chunk::Type::kParametersRetransmit);
  EXPECT_EQ(parameters.session_id(), 1u);
  EXPECT_EQ(parameters.offset(), 0u);
  EXPECT_EQ(parameters.window_end_offset(), 32u);
  ASSERT_TRUE(parameters.max_chunk_size_bytes().has_value());
  EXPECT_EQ(parameters.max_chunk_size_bytes().value(), 37u);

  // Bytes [0, 8): no new response is expected yet.
  Chunk first_piece(ProtocolVersion::kVersionTwo, Chunk::Type::kData);
  first_piece.set_session_id(1).set_offset(0).set_payload(
      span(kData).first(8));
  ctx_.SendClientStream<64>(EncodeChunk(first_piece));

  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 2u);

  // Bytes [8, 16): the server responds with PARAMETERS_CONTINUE at offset 16.
  Chunk second_piece(ProtocolVersion::kVersionTwo, Chunk::Type::kData);
  second_piece.set_session_id(1).set_offset(8).set_payload(
      span(kData).subspan(8, 8));
  ctx_.SendClientStream<64>(EncodeChunk(second_piece));

  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 3u);

  Chunk first_continue = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(first_continue.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(first_continue.type(), Chunk::Type::kParametersContinue);
  EXPECT_EQ(first_continue.session_id(), 1u);
  EXPECT_EQ(first_continue.offset(), 16u);
  EXPECT_EQ(first_continue.window_end_offset(), 32u);

  // Bytes [16, 24): another PARAMETERS_CONTINUE follows, now at offset 24.
  Chunk third_piece(ProtocolVersion::kVersionTwo, Chunk::Type::kData);
  third_piece.set_session_id(1).set_offset(16).set_payload(
      span(kData).subspan(16, 8));
  ctx_.SendClientStream<64>(EncodeChunk(third_piece));

  transfer_thread_.WaitUntilEventIsProcessed();
  ASSERT_EQ(ctx_.total_responses(), 4u);

  Chunk second_continue = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(second_continue.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(second_continue.type(), Chunk::Type::kParametersContinue);
  EXPECT_EQ(second_continue.session_id(), 1u);
  EXPECT_EQ(second_continue.offset(), 24u);
  EXPECT_EQ(second_continue.window_end_offset(), 32u);

  // Everything from offset 24 onward, flagged as the end of the data.
  Chunk last_piece(ProtocolVersion::kVersionTwo, Chunk::Type::kData);
  last_piece.set_session_id(1)
      .set_offset(24)
      .set_payload(span(kData).subspan(24))
      .set_remaining_bytes(0);
  ctx_.SendClientStream<64>(EncodeChunk(last_piece));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 5u);

  // The server reports successful completion.
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(completion.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(completion.session_id(), 1u);
  ASSERT_TRUE(completion.status().has_value());
  EXPECT_EQ(completion.status().value(), OkStatus());

  // Acknowledge the completion; no further response is expected.
  Chunk completion_ack(ProtocolVersion::kVersionTwo,
                       Chunk::Type::kCompletionAck);
  completion_ack.set_session_id(1);
  ctx_.SendClientStream(EncodeChunk(completion_ack));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 5u);

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, OkStatus());
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}
2222
// Checks that a version 2 write transfer is finalized with the client's
// status when the client sends a final chunk instead of confirming the
// START_ACK.
TEST_F(WriteTransfer, Version2_ClientTerminatesDuringHandshake) {
  // Open the transfer for resource 7.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(7);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 7u);

  // Abort with FAILED_PRECONDITION rather than completing the handshake.
  Chunk termination = Chunk::Final(
      ProtocolVersion::kVersionTwo, 1, Status::FailedPrecondition());
  ctx_.SendClientStream(EncodeChunk(termination));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.finalize_write_called);
  EXPECT_EQ(handler_.finalize_write_status, Status::FailedPrecondition());
}
2250
// Checks that the server ends a version 2 write transfer with INTERNAL when
// the client sends a legacy-versioned data chunk after the handshake.
TEST_F(WriteTransfer, Version2_ClientSendsWrongProtocolVersion) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(7)));

  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_write_called);
  EXPECT_FALSE(handler_.finalize_write_called);

  // First, the server responds with a START_ACK, assigning a session ID and
  // confirming the protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(chunk.session_id(), 1u);
  EXPECT_EQ(chunk.resource_id(), 7u);

  // Complete the handshake by confirming the server's ACK.
  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
          .set_session_id(1)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Server should respond by sending its initial transfer parameters.
  ASSERT_EQ(ctx_.total_responses(), 2u);

  chunk = DecodeChunk(ctx_.responses()[1]);
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
  EXPECT_EQ(chunk.session_id(), 1u);
  EXPECT_EQ(chunk.offset(), 0u);
  EXPECT_EQ(chunk.window_end_offset(), 32u);
  ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
  EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);

  // The transfer was configured to use protocol version 2. Send a legacy chunk
  // instead.
  ctx_.SendClientStream<64>(
      EncodeChunk(Chunk(ProtocolVersion::kLegacy, Chunk::Type::kData)
                      .set_session_id(1)
                      .set_offset(0)
                      .set_payload(kData)
                      .set_remaining_bytes(0)));
  transfer_thread_.WaitUntilEventIsProcessed();

  // Server should terminate the transfer.
  ASSERT_EQ(ctx_.total_responses(), 3u);

  chunk = DecodeChunk(ctx_.responses()[2]);
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  // Stop here if the status is absent so that value() is never called on an
  // empty optional.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::Internal());

  // Send the completion acknowledgement; no further response is expected.
  ctx_.SendClientStream(EncodeChunk(
      Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
          .set_session_id(1)));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 3u);
}
2314
// Checks that starting a version 2 write transfer for resource ID 99 is
// answered with a single completion chunk carrying NOT_FOUND.
TEST_F(WriteTransfer, Version2_InvalidResourceId) {
  ctx_.SendClientStream(
      EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
                      .set_resource_id(99)));

  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 1u);

  Chunk chunk = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(chunk.type(), Chunk::Type::kCompletion);
  // Stop here if the status is absent so that value() is never called on an
  // empty optional.
  ASSERT_TRUE(chunk.status().has_value());
  EXPECT_EQ(chunk.status().value(), Status::NotFound());
}
2329
2330 class ReadTransferLowMaxRetries : public ::testing::Test {
2331 protected:
2332 static constexpr uint32_t kMaxRetries = 3;
2333 static constexpr uint32_t kMaxLifetimeRetries = 4;
2334
ReadTransferLowMaxRetries()2335 ReadTransferLowMaxRetries()
2336 : handler_(9, kData),
2337 transfer_thread_(data_buffer_, encode_buffer_),
2338 ctx_(transfer_thread_,
2339 64,
2340 // Use a long timeout to avoid accidentally triggering timeouts.
2341 std::chrono::minutes(1),
2342 kMaxRetries,
2343 cfg::kDefaultExtendWindowDivisor,
2344 kMaxLifetimeRetries),
2345 system_thread_(TransferThreadOptions(), transfer_thread_) {
2346 ctx_.service().RegisterHandler(handler_);
2347
2348 PW_CHECK(!handler_.prepare_read_called);
2349 PW_CHECK(!handler_.finalize_read_called);
2350
2351 ctx_.call(); // Open the read stream
2352 transfer_thread_.WaitUntilEventIsProcessed();
2353 }
2354
~ReadTransferLowMaxRetries()2355 ~ReadTransferLowMaxRetries() override {
2356 transfer_thread_.Terminate();
2357 system_thread_.join();
2358 }
2359
2360 SimpleReadTransfer handler_;
2361 Thread<1, 1> transfer_thread_;
2362 PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read, 10) ctx_;
2363 thread::Thread system_thread_;
2364 std::array<std::byte, 64> data_buffer_;
2365 std::array<std::byte, 64> encode_buffer_;
2366 };
2367
// Checks that a legacy read transfer retries on each simulated timeout until
// the fifth one, which produces a DEADLINE_EXCEEDED status, even though the
// client makes progress partway through.
TEST_F(ReadTransferLowMaxRetries, FailsAfterLifetimeRetryCount) {
  // Start a legacy transfer on session 9 with a window ending at offset 16.
  Chunk start_request(ProtocolVersion::kLegacy, Chunk::Type::kStart);
  start_request.set_session_id(9).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server sends the first 16 bytes of the resource.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk first_data = DecodeChunk(ctx_.responses().back());

  EXPECT_EQ(first_data.session_id(), 9u);
  EXPECT_EQ(first_data.offset(), 0u);
  ASSERT_EQ(first_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(first_data.payload().data(),
                        kData.data(),
                        first_data.payload().size()),
            0);

  // Two timeouts in a row: the server resends the same chunk each time.
  transfer_thread_.SimulateServerTimeout(9);
  transfer_thread_.SimulateServerTimeout(9);
  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk resent_first = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent_first.session_id(), 9u);
  EXPECT_EQ(resent_first.offset(), 0u);
  ASSERT_EQ(resent_first.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(resent_first.payload().data(),
                        kData.data(),
                        resent_first.payload().size()),
            0);

  // The client makes progress by extending the window to offset 32.
  Chunk window_update(ProtocolVersion::kLegacy,
                      Chunk::Type::kParametersContinue);
  window_update.set_session_id(9).set_window_end_offset(32).set_offset(16);
  ctx_.SendClientStream(EncodeChunk(window_update));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk second_data = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(second_data.session_id(), 9u);
  EXPECT_EQ(second_data.offset(), 16u);
  ASSERT_EQ(second_data.payload().size(), 16u);
  EXPECT_EQ(std::memcmp(second_data.payload().data(),
                        kData.data() + 16,
                        second_data.payload().size()),
            0);

  // Time out three more times. The first two timeouts each trigger a resend
  // of the second chunk (responses 5 and 6) ...
  for (unsigned expected_responses = 5u; expected_responses <= 6u;
       ++expected_responses) {
    transfer_thread_.SimulateServerTimeout(9);
    ASSERT_EQ(ctx_.total_responses(), expected_responses);
    Chunk resent_second = DecodeChunk(ctx_.responses().back());
    EXPECT_EQ(resent_second.session_id(), 9u);
    EXPECT_EQ(resent_second.offset(), 16u);
    ASSERT_EQ(resent_second.payload().size(), 16u);
    EXPECT_EQ(std::memcmp(resent_second.payload().data(),
                          kData.data() + 16,
                          resent_second.payload().size()),
              0);
  }

  // ... and the third one terminates the transfer with DEADLINE_EXCEEDED.
  transfer_thread_.SimulateServerTimeout(9);
  ASSERT_EQ(ctx_.total_responses(), 7u);
  Chunk termination = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(termination.status(), Status::DeadlineExceeded());
}
2447
// Checks that a version 2 read transfer retries on each simulated timeout
// until the fifth one, which produces a DEADLINE_EXCEEDED completion. The
// timeouts span both the handshake and the data phase.
TEST_F(ReadTransferLowMaxRetries, Version2_FailsAfterLifetimeRetryCount) {
  // Open the transfer for resource 9.
  Chunk start_request(ProtocolVersion::kVersionTwo, Chunk::Type::kStart);
  start_request.set_resource_id(9);
  ctx_.SendClientStream(EncodeChunk(start_request));
  transfer_thread_.WaitUntilEventIsProcessed();

  EXPECT_TRUE(handler_.prepare_read_called);
  EXPECT_FALSE(handler_.finalize_read_called);

  // The server's first response is a START_ACK carrying the assigned session
  // ID and the confirmed protocol version.
  ASSERT_EQ(ctx_.total_responses(), 1u);
  Chunk start_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(start_ack.protocol_version(), ProtocolVersion::kVersionTwo);
  EXPECT_EQ(start_ack.type(), Chunk::Type::kStartAck);
  EXPECT_EQ(start_ack.session_id(), 1u);
  EXPECT_EQ(start_ack.resource_id(), 9u);

  // Two timeouts in a row during the handshake: the server resends its
  // START_ACK each time.
  transfer_thread_.SimulateServerTimeout(1);
  transfer_thread_.SimulateServerTimeout(1);
  ASSERT_EQ(ctx_.total_responses(), 3u);
  Chunk resent_ack = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(resent_ack.type(), Chunk::Type::kStartAck);

  // Finish the handshake so that the transfer can move on to sending data.
  Chunk confirmation(ProtocolVersion::kVersionTwo,
                     Chunk::Type::kStartAckConfirmation);
  confirmation.set_session_id(1).set_window_end_offset(16).set_offset(0);
  ctx_.SendClientStream(EncodeChunk(confirmation));
  transfer_thread_.WaitUntilEventIsProcessed();

  ASSERT_EQ(ctx_.total_responses(), 4u);
  Chunk first_data = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(first_data.type(), Chunk::Type::kData);

  // Time out three more times. The first two timeouts each produce another
  // data chunk (responses 5 and 6) ...
  for (unsigned expected_responses = 5u; expected_responses <= 6u;
       ++expected_responses) {
    transfer_thread_.SimulateServerTimeout(1);
    ASSERT_EQ(ctx_.total_responses(), expected_responses);
    Chunk resent_data = DecodeChunk(ctx_.responses().back());
    EXPECT_EQ(resent_data.type(), Chunk::Type::kData);
  }

  // ... and the third one terminates the transfer with DEADLINE_EXCEEDED.
  transfer_thread_.SimulateServerTimeout(1);
  ASSERT_EQ(ctx_.total_responses(), 7u);
  Chunk completion = DecodeChunk(ctx_.responses().back());
  EXPECT_EQ(completion.type(), Chunk::Type::kCompletion);
  EXPECT_EQ(completion.status(), Status::DeadlineExceeded());
}
2503
2504 } // namespace
2505 } // namespace pw::transfer::test
2506