1#!/usr/bin/env python3 2# Copyright 2023 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Tests for the transfer service client.""" 16 17import enum 18import math 19import os 20import unittest 21from typing import Iterable 22 23from pw_status import Status 24from pw_rpc import callback_client, client, ids, packets 25from pw_rpc.internal import packet_pb2 26 27import pw_transfer 28from pw_transfer import ProtocolVersion 29 30try: 31 from pw_transfer import transfer_pb2 32except ImportError: 33 # For the bazel build, which puts generated protos in a different location. 34 from pigweed.pw_transfer import transfer_pb2 # type: ignore 35 36_TRANSFER_SERVICE_ID = ids.calculate('pw.transfer.Transfer') 37_FIRST_SESSION_ID = 1 38_ARBITRARY_TRANSFER_ID = 66 39 40# If the default timeout is too short, some tests become flaky on Windows. 41DEFAULT_TIMEOUT_S = 0.3 42 43 44class _Method(enum.Enum): 45 READ = ids.calculate('Read') 46 WRITE = ids.calculate('Write') 47 48 49# pylint: disable=missing-function-docstring, missing-class-docstring 50 51 52class TransferManagerTest(unittest.TestCase): 53 # pylint: disable=too-many-public-methods 54 """Tests for the transfer manager.""" 55 56 def setUp(self) -> None: 57 self._client = client.Client.from_modules( 58 callback_client.Impl(), 59 [client.Channel(1, self._handle_request)], 60 (transfer_pb2,), 61 ) 62 self._service = self._client.channel(1).rpcs.pw.transfer.Transfer 63 64 self._sent_chunks: list[transfer_pb2.Chunk] = [] 65 self._packets_to_send: list[list[packet_pb2.RpcPacket]] = [] 66 67 def _enqueue_server_responses( 68 self, method: _Method, responses: Iterable[Iterable[transfer_pb2.Chunk]] 69 ) -> None: 70 for group in responses: 71 serialized_group = [] 72 for response in group: 73 serialized_group.append( 74 packet_pb2.RpcPacket( 75 type=packet_pb2.PacketType.SERVER_STREAM, 76 channel_id=1, 77 service_id=_TRANSFER_SERVICE_ID, 78 method_id=method.value, 79 status=Status.OK.value, 80 payload=response.SerializeToString(), 81 ) 82 ) 83 self._packets_to_send.append(serialized_group) 84 85 def _enqueue_server_error(self, method: _Method, error: Status) -> None: 86 self._packets_to_send.append( 87 [ 88 packet_pb2.RpcPacket( 89 type=packet_pb2.PacketType.SERVER_ERROR, 90 channel_id=1, 91 service_id=_TRANSFER_SERVICE_ID, 92 method_id=method.value, 93 status=error.value, 94 ) 95 ] 96 ) 97 98 def _handle_request(self, data: bytes) -> None: 99 packet = packets.decode(data) 100 if packet.type is not packet_pb2.PacketType.CLIENT_STREAM: 101 return 102 103 chunk = transfer_pb2.Chunk() 104 chunk.MergeFromString(packet.payload) 105 self._sent_chunks.append(chunk) 106 107 if self._packets_to_send: 108 responses = self._packets_to_send.pop(0) 109 for response in responses: 110 response.call_id = packet.call_id 111 self._client.process_packet(response.SerializeToString()) 112 113 def _received_data(self) -> bytearray: 114 data = bytearray() 115 for chunk in self._sent_chunks: 116 data.extend(chunk.data) 117 return data 118 119 def test_read_transfer_basic(self): 120 manager = pw_transfer.Manager( 121 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 122 ) 123 124 self._enqueue_server_responses( 125 _Method.READ, 126 ( 127 ( 128 transfer_pb2.Chunk( 129 transfer_id=3, offset=0, data=b'abc', remaining_bytes=0 130 ), 131 ), 132 ), 133 ) 134 135 data = manager.read(3) 136 self.assertEqual(data, b'abc') 137 self.assertEqual(len(self._sent_chunks), 2) 138 self.assertTrue(self._sent_chunks[-1].HasField('status')) 139 self.assertEqual(self._sent_chunks[-1].status, 0) 140 141 def test_read_transfer_multichunk(self) -> None: 142 manager = pw_transfer.Manager( 143 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 144 ) 145 146 self._enqueue_server_responses( 147 _Method.READ, 148 ( 149 ( 150 transfer_pb2.Chunk( 151 transfer_id=3, offset=0, data=b'abc', remaining_bytes=3 152 ), 153 transfer_pb2.Chunk( 154 transfer_id=3, offset=3, data=b'def', remaining_bytes=0 155 ), 156 ), 157 ), 158 ) 159 160 data = manager.read(3) 161 self.assertEqual(data, b'abcdef') 162 self.assertEqual(len(self._sent_chunks), 2) 163 self.assertTrue(self._sent_chunks[-1].HasField('status')) 164 self.assertEqual(self._sent_chunks[-1].status, 0) 165 166 def test_read_transfer_progress_callback(self) -> None: 167 manager = pw_transfer.Manager( 168 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 169 ) 170 171 self._enqueue_server_responses( 172 _Method.READ, 173 ( 174 ( 175 transfer_pb2.Chunk( 176 transfer_id=3, offset=0, data=b'abc', remaining_bytes=3 177 ), 178 transfer_pb2.Chunk( 179 transfer_id=3, offset=3, data=b'def', remaining_bytes=0 180 ), 181 ), 182 ), 183 ) 184 185 progress: list[pw_transfer.ProgressStats] = [] 186 187 data = manager.read(3, progress.append) 188 self.assertEqual(data, b'abcdef') 189 self.assertEqual(len(self._sent_chunks), 2) 190 self.assertTrue(self._sent_chunks[-1].HasField('status')) 191 self.assertEqual(self._sent_chunks[-1].status, 0) 192 self.assertEqual( 193 progress, 194 [ 195 pw_transfer.ProgressStats(3, 3, 6), 196 pw_transfer.ProgressStats(6, 6, 6), 197 ], 198 ) 199 200 def test_read_transfer_retry_bad_offset(self) -> None: 201 """Server responds with an unexpected offset in a read transfer.""" 202 manager = pw_transfer.Manager( 203 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 204 ) 205 206 self._enqueue_server_responses( 207 _Method.READ, 208 ( 209 ( 210 transfer_pb2.Chunk( 211 transfer_id=3, offset=0, data=b'123', remaining_bytes=6 212 ), 213 # Incorrect offset; expecting 3. 214 transfer_pb2.Chunk( 215 transfer_id=3, offset=1, data=b'456', remaining_bytes=3 216 ), 217 ), 218 ( 219 transfer_pb2.Chunk( 220 transfer_id=3, offset=3, data=b'456', remaining_bytes=3 221 ), 222 transfer_pb2.Chunk( 223 transfer_id=3, offset=6, data=b'789', remaining_bytes=0 224 ), 225 ), 226 ), 227 ) 228 229 data = manager.read(3) 230 self.assertEqual(data, b'123456789') 231 232 # Two transfer parameter requests should have been sent. 233 self.assertEqual(len(self._sent_chunks), 3) 234 self.assertTrue(self._sent_chunks[-1].HasField('status')) 235 self.assertEqual(self._sent_chunks[-1].status, 0) 236 237 def test_read_transfer_recovery_sends_parameters_on_retry(self) -> None: 238 """Server sends the same chunk twice (retry) in a read transfer.""" 239 manager = pw_transfer.Manager( 240 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 241 ) 242 243 self._enqueue_server_responses( 244 _Method.READ, 245 ( 246 ( 247 # Bad offset, enter recovery state. Only one parameters 248 # chunk should be sent. 249 transfer_pb2.Chunk( 250 transfer_id=3, offset=1, data=b'234', remaining_bytes=5 251 ), 252 transfer_pb2.Chunk( 253 transfer_id=3, offset=4, data=b'567', remaining_bytes=2 254 ), 255 transfer_pb2.Chunk( 256 transfer_id=3, offset=7, data=b'8', remaining_bytes=1 257 ), 258 ), 259 ( 260 # Only one parameters chunk should be sent after the server 261 # retries the same offset twice. 262 transfer_pb2.Chunk( 263 transfer_id=3, offset=1, data=b'234', remaining_bytes=5 264 ), 265 transfer_pb2.Chunk( 266 transfer_id=3, offset=4, data=b'567', remaining_bytes=2 267 ), 268 transfer_pb2.Chunk( 269 transfer_id=3, offset=7, data=b'8', remaining_bytes=1 270 ), 271 transfer_pb2.Chunk( 272 transfer_id=3, offset=7, data=b'8', remaining_bytes=1 273 ), 274 ), 275 ( 276 transfer_pb2.Chunk( 277 transfer_id=3, 278 offset=0, 279 data=b'123456789', 280 remaining_bytes=0, 281 ), 282 ), 283 ), 284 ) 285 286 data = manager.read(3) 287 self.assertEqual(data, b'123456789') 288 289 self.assertEqual(len(self._sent_chunks), 4) 290 self.assertEqual( 291 self._sent_chunks[0].type, transfer_pb2.Chunk.Type.START 292 ) 293 self.assertEqual(self._sent_chunks[0].offset, 0) 294 self.assertEqual( 295 self._sent_chunks[1].type, 296 transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 297 ) 298 self.assertEqual(self._sent_chunks[1].offset, 0) 299 self.assertEqual( 300 self._sent_chunks[2].type, 301 transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 302 ) 303 self.assertEqual(self._sent_chunks[2].offset, 0) 304 self.assertEqual( 305 self._sent_chunks[3].type, transfer_pb2.Chunk.Type.COMPLETION 306 ) 307 308 def test_read_transfer_retry_timeout(self) -> None: 309 """Server doesn't respond to read transfer parameters.""" 310 manager = pw_transfer.Manager( 311 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 312 ) 313 314 self._enqueue_server_responses( 315 _Method.READ, 316 ( 317 (), # Send nothing in response to the initial parameters. 318 ( 319 transfer_pb2.Chunk( 320 transfer_id=3, offset=0, data=b'xyz', remaining_bytes=0 321 ), 322 ), 323 ), 324 ) 325 326 data = manager.read(3) 327 self.assertEqual(data, b'xyz') 328 329 # Two transfer parameter requests should have been sent. 330 self.assertEqual(len(self._sent_chunks), 3) 331 self.assertTrue(self._sent_chunks[-1].HasField('status')) 332 self.assertEqual(self._sent_chunks[-1].status, 0) 333 334 def test_read_transfer_lifetime_retries(self) -> None: 335 """Server doesn't respond several times during the transfer.""" 336 manager = pw_transfer.Manager( 337 self._service, 338 default_response_timeout_s=DEFAULT_TIMEOUT_S, 339 max_retries=2**32 - 1, 340 max_lifetime_retries=4, 341 ) 342 343 self._enqueue_server_responses( 344 _Method.READ, 345 ( 346 (), # Retry 1 347 (), # Retry 2 348 ( 349 transfer_pb2.Chunk( # Expected chunk. 350 transfer_id=43, offset=0, data=b'xyz' 351 ), 352 ), 353 # Don't send anything else. The maximum lifetime retry count 354 # should be hit. 355 ), 356 ) 357 358 with self.assertRaises(pw_transfer.Error) as context: 359 manager.read(43) 360 361 self.assertEqual(len(self._sent_chunks), 5) 362 363 exception = context.exception 364 self.assertEqual(exception.resource_id, 43) 365 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 366 367 def test_read_transfer_timeout(self) -> None: 368 manager = pw_transfer.Manager( 369 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 370 ) 371 372 with self.assertRaises(pw_transfer.Error) as context: 373 manager.read(27) 374 375 exception = context.exception 376 self.assertEqual(exception.resource_id, 27) 377 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 378 379 # The client should have sent four transfer parameters requests: one 380 # initial, and three retries. 381 self.assertEqual(len(self._sent_chunks), 4) 382 383 def test_read_transfer_error(self) -> None: 384 manager = pw_transfer.Manager( 385 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 386 ) 387 388 self._enqueue_server_responses( 389 _Method.READ, 390 ( 391 ( 392 transfer_pb2.Chunk( 393 transfer_id=31, status=Status.NOT_FOUND.value 394 ), 395 ), 396 ), 397 ) 398 399 with self.assertRaises(pw_transfer.Error) as context: 400 manager.read(31) 401 402 exception = context.exception 403 self.assertEqual(exception.resource_id, 31) 404 self.assertEqual(exception.status, Status.NOT_FOUND) 405 406 def test_read_transfer_server_error(self) -> None: 407 manager = pw_transfer.Manager( 408 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 409 ) 410 411 self._enqueue_server_error(_Method.READ, Status.NOT_FOUND) 412 413 with self.assertRaises(pw_transfer.Error) as context: 414 manager.read(31) 415 416 exception = context.exception 417 self.assertEqual(exception.resource_id, 31) 418 self.assertEqual(exception.status, Status.INTERNAL) 419 420 def test_read_transfer_adaptive_window_slow_start(self) -> None: 421 test_max_chunk_size = 16 422 423 manager = pw_transfer.Manager( 424 self._service, 425 default_response_timeout_s=DEFAULT_TIMEOUT_S, 426 max_chunk_size_bytes=test_max_chunk_size, 427 default_protocol_version=ProtocolVersion.LEGACY, 428 ) 429 430 self._enqueue_server_responses( 431 _Method.READ, 432 ( 433 # First window: 1 chunk. 434 ( 435 transfer_pb2.Chunk( 436 transfer_id=_ARBITRARY_TRANSFER_ID, 437 type=transfer_pb2.Chunk.Type.DATA, 438 offset=0, 439 data=b'#' * test_max_chunk_size, 440 ), 441 ), 442 # Second window: 2 chunks. 443 ( 444 transfer_pb2.Chunk( 445 transfer_id=_ARBITRARY_TRANSFER_ID, 446 type=transfer_pb2.Chunk.Type.DATA, 447 offset=test_max_chunk_size, 448 data=b'#' * test_max_chunk_size, 449 ), 450 transfer_pb2.Chunk( 451 transfer_id=_ARBITRARY_TRANSFER_ID, 452 type=transfer_pb2.Chunk.Type.DATA, 453 offset=2 * test_max_chunk_size, 454 data=b'#' * test_max_chunk_size, 455 ), 456 ), 457 # Third window: finish transfer. 458 ( 459 transfer_pb2.Chunk( 460 transfer_id=_ARBITRARY_TRANSFER_ID, 461 type=transfer_pb2.Chunk.Type.DATA, 462 offset=3 * test_max_chunk_size, 463 data=b'#' * test_max_chunk_size, 464 remaining_bytes=0, 465 ), 466 ), 467 ), 468 ) 469 470 data = manager.read(_ARBITRARY_TRANSFER_ID) 471 472 self.assertEqual( 473 self._sent_chunks, 474 [ 475 # First parameters: 1 chunk window. 476 transfer_pb2.Chunk( 477 type=transfer_pb2.Chunk.Type.START, 478 transfer_id=_ARBITRARY_TRANSFER_ID, 479 resource_id=_ARBITRARY_TRANSFER_ID, 480 pending_bytes=test_max_chunk_size, 481 max_chunk_size_bytes=test_max_chunk_size, 482 window_end_offset=test_max_chunk_size, 483 ), 484 # Second parameters: 2 chunk window. 485 transfer_pb2.Chunk( 486 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 487 transfer_id=_ARBITRARY_TRANSFER_ID, 488 offset=test_max_chunk_size, 489 pending_bytes=2 * test_max_chunk_size, 490 max_chunk_size_bytes=test_max_chunk_size, 491 window_end_offset=( 492 test_max_chunk_size + 2 * test_max_chunk_size 493 ), 494 ), 495 # Third parameters: 4 chunk window. 496 transfer_pb2.Chunk( 497 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 498 transfer_id=_ARBITRARY_TRANSFER_ID, 499 offset=2 * test_max_chunk_size, 500 pending_bytes=4 * test_max_chunk_size, 501 max_chunk_size_bytes=test_max_chunk_size, 502 window_end_offset=( 503 2 * test_max_chunk_size + 4 * test_max_chunk_size 504 ), 505 ), 506 transfer_pb2.Chunk( 507 type=transfer_pb2.Chunk.Type.COMPLETION, 508 transfer_id=_ARBITRARY_TRANSFER_ID, 509 status=Status.OK.value, 510 ), 511 ], 512 ) 513 self.assertEqual(data, b'#' * (4 * test_max_chunk_size)) 514 515 def test_read_transfer_adaptive_window_congestion_avoidance(self) -> None: 516 test_max_chunk_size = 16 517 518 manager = pw_transfer.Manager( 519 self._service, 520 default_response_timeout_s=DEFAULT_TIMEOUT_S, 521 max_chunk_size_bytes=test_max_chunk_size, 522 default_protocol_version=ProtocolVersion.LEGACY, 523 ) 524 525 self._enqueue_server_responses( 526 _Method.READ, 527 ( 528 # First window: 1 chunk. 529 ( 530 transfer_pb2.Chunk( 531 transfer_id=_ARBITRARY_TRANSFER_ID, 532 type=transfer_pb2.Chunk.Type.DATA, 533 offset=0, 534 data=b'#' * test_max_chunk_size, 535 ), 536 ), 537 # Second window: 2 chunks. 538 ( 539 transfer_pb2.Chunk( 540 transfer_id=_ARBITRARY_TRANSFER_ID, 541 type=transfer_pb2.Chunk.Type.DATA, 542 offset=test_max_chunk_size, 543 data=b'#' * test_max_chunk_size, 544 ), 545 transfer_pb2.Chunk( 546 transfer_id=_ARBITRARY_TRANSFER_ID, 547 type=transfer_pb2.Chunk.Type.DATA, 548 offset=2 * test_max_chunk_size, 549 data=b'#' * test_max_chunk_size, 550 ), 551 ), 552 # Third window: send the wrong offset, triggering a 553 # retransmission. 554 ( 555 transfer_pb2.Chunk( 556 transfer_id=_ARBITRARY_TRANSFER_ID, 557 type=transfer_pb2.Chunk.Type.DATA, 558 offset=2 * test_max_chunk_size, 559 data=b'#' * test_max_chunk_size, 560 ), 561 ), 562 # Fourth window: send the expected offset. 563 ( 564 transfer_pb2.Chunk( 565 transfer_id=_ARBITRARY_TRANSFER_ID, 566 type=transfer_pb2.Chunk.Type.DATA, 567 offset=3 * test_max_chunk_size, 568 data=b'#' * test_max_chunk_size, 569 ), 570 transfer_pb2.Chunk( 571 transfer_id=_ARBITRARY_TRANSFER_ID, 572 type=transfer_pb2.Chunk.Type.DATA, 573 offset=4 * test_max_chunk_size, 574 data=b'#' * test_max_chunk_size, 575 ), 576 ), 577 # Fifth window: finish the transfer. 578 ( 579 transfer_pb2.Chunk( 580 transfer_id=_ARBITRARY_TRANSFER_ID, 581 type=transfer_pb2.Chunk.Type.DATA, 582 offset=5 * test_max_chunk_size, 583 data=b'#' * test_max_chunk_size, 584 remaining_bytes=0, 585 ), 586 ), 587 ), 588 ) 589 590 data = manager.read(_ARBITRARY_TRANSFER_ID) 591 592 self.assertEqual( 593 self._sent_chunks, 594 [ 595 # First parameters: 1 chunk window. 596 transfer_pb2.Chunk( 597 type=transfer_pb2.Chunk.Type.START, 598 transfer_id=_ARBITRARY_TRANSFER_ID, 599 resource_id=_ARBITRARY_TRANSFER_ID, 600 pending_bytes=test_max_chunk_size, 601 max_chunk_size_bytes=test_max_chunk_size, 602 window_end_offset=test_max_chunk_size, 603 ), 604 # Second parameters: 2 chunk window. 605 transfer_pb2.Chunk( 606 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 607 transfer_id=_ARBITRARY_TRANSFER_ID, 608 offset=test_max_chunk_size, 609 pending_bytes=2 * test_max_chunk_size, 610 max_chunk_size_bytes=test_max_chunk_size, 611 window_end_offset=( 612 test_max_chunk_size + 2 * test_max_chunk_size 613 ), 614 ), 615 # Third parameters: 4 chunk window. 616 transfer_pb2.Chunk( 617 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 618 transfer_id=_ARBITRARY_TRANSFER_ID, 619 offset=2 * test_max_chunk_size, 620 pending_bytes=4 * test_max_chunk_size, 621 max_chunk_size_bytes=test_max_chunk_size, 622 window_end_offset=( 623 2 * test_max_chunk_size + 4 * test_max_chunk_size 624 ), 625 ), 626 # Fourth parameters: data loss, retransmit and halve window. 627 transfer_pb2.Chunk( 628 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 629 transfer_id=_ARBITRARY_TRANSFER_ID, 630 offset=3 * test_max_chunk_size, 631 pending_bytes=2 * test_max_chunk_size, 632 max_chunk_size_bytes=test_max_chunk_size, 633 window_end_offset=( 634 3 * test_max_chunk_size + 2 * test_max_chunk_size 635 ), 636 ), 637 # Fifth parameters: in congestion avoidance, window size now 638 # only increases by one chunk instead of doubling. 639 transfer_pb2.Chunk( 640 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 641 transfer_id=_ARBITRARY_TRANSFER_ID, 642 offset=4 * test_max_chunk_size, 643 pending_bytes=3 * test_max_chunk_size, 644 max_chunk_size_bytes=test_max_chunk_size, 645 window_end_offset=( 646 4 * test_max_chunk_size + 3 * test_max_chunk_size 647 ), 648 ), 649 transfer_pb2.Chunk( 650 type=transfer_pb2.Chunk.Type.COMPLETION, 651 transfer_id=_ARBITRARY_TRANSFER_ID, 652 status=Status.OK.value, 653 ), 654 ], 655 ) 656 self.assertEqual(data, b'#' * (6 * test_max_chunk_size)) 657 658 def test_read_transfer_v2_adaptive_window_slow_start(self) -> None: 659 test_max_chunk_size = 16 660 661 manager = pw_transfer.Manager( 662 self._service, 663 default_response_timeout_s=DEFAULT_TIMEOUT_S, 664 max_chunk_size_bytes=test_max_chunk_size, 665 default_protocol_version=ProtocolVersion.VERSION_TWO, 666 ) 667 668 self._enqueue_server_responses( 669 _Method.READ, 670 ( 671 ( 672 transfer_pb2.Chunk( 673 session_id=_FIRST_SESSION_ID, 674 type=transfer_pb2.Chunk.Type.START_ACK, 675 protocol_version=ProtocolVersion.VERSION_TWO.value, 676 ), 677 ), 678 # First window: 1 chunk. 679 ( 680 transfer_pb2.Chunk( 681 session_id=_FIRST_SESSION_ID, 682 type=transfer_pb2.Chunk.Type.DATA, 683 offset=0, 684 data=b'#' * test_max_chunk_size, 685 ), 686 ), 687 # Second window: 2 chunks. 688 ( 689 transfer_pb2.Chunk( 690 session_id=_FIRST_SESSION_ID, 691 type=transfer_pb2.Chunk.Type.DATA, 692 offset=test_max_chunk_size, 693 data=b'#' * test_max_chunk_size, 694 ), 695 transfer_pb2.Chunk( 696 session_id=_FIRST_SESSION_ID, 697 type=transfer_pb2.Chunk.Type.DATA, 698 offset=2 * test_max_chunk_size, 699 data=b'#' * test_max_chunk_size, 700 ), 701 ), 702 # Third window: finish transfer. 703 ( 704 transfer_pb2.Chunk( 705 session_id=_FIRST_SESSION_ID, 706 type=transfer_pb2.Chunk.Type.DATA, 707 offset=3 * test_max_chunk_size, 708 data=b'#' * test_max_chunk_size, 709 remaining_bytes=0, 710 ), 711 ), 712 ( 713 transfer_pb2.Chunk( 714 session_id=_FIRST_SESSION_ID, 715 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 716 ), 717 ), 718 ), 719 ) 720 721 data = manager.read(_ARBITRARY_TRANSFER_ID) 722 723 self.assertEqual( 724 self._sent_chunks, 725 [ 726 transfer_pb2.Chunk( 727 transfer_id=_ARBITRARY_TRANSFER_ID, 728 resource_id=_ARBITRARY_TRANSFER_ID, 729 desired_session_id=_FIRST_SESSION_ID, 730 pending_bytes=test_max_chunk_size, 731 max_chunk_size_bytes=test_max_chunk_size, 732 window_end_offset=test_max_chunk_size, 733 type=transfer_pb2.Chunk.Type.START, 734 protocol_version=ProtocolVersion.VERSION_TWO.value, 735 ), 736 # First parameters: 1 chunk window. 737 transfer_pb2.Chunk( 738 session_id=_FIRST_SESSION_ID, 739 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 740 offset=0, 741 max_chunk_size_bytes=test_max_chunk_size, 742 window_end_offset=test_max_chunk_size, 743 protocol_version=ProtocolVersion.VERSION_TWO.value, 744 ), 745 # Second parameters: 2 chunk window. 746 transfer_pb2.Chunk( 747 session_id=_FIRST_SESSION_ID, 748 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 749 offset=test_max_chunk_size, 750 max_chunk_size_bytes=test_max_chunk_size, 751 window_end_offset=( 752 test_max_chunk_size + 2 * test_max_chunk_size 753 ), 754 ), 755 # Third parameters: 4 chunk window. 756 transfer_pb2.Chunk( 757 session_id=_FIRST_SESSION_ID, 758 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 759 offset=2 * test_max_chunk_size, 760 max_chunk_size_bytes=test_max_chunk_size, 761 window_end_offset=( 762 2 * test_max_chunk_size + 4 * test_max_chunk_size 763 ), 764 ), 765 transfer_pb2.Chunk( 766 session_id=_FIRST_SESSION_ID, 767 type=transfer_pb2.Chunk.Type.COMPLETION, 768 status=Status.OK.value, 769 ), 770 ], 771 ) 772 self.assertEqual(data, b'#' * (4 * test_max_chunk_size)) 773 774 def test_read_transfer_v2_adaptive_window_congestion_avoidance( 775 self, 776 ) -> None: 777 test_max_chunk_size = 16 778 779 manager = pw_transfer.Manager( 780 self._service, 781 default_response_timeout_s=DEFAULT_TIMEOUT_S, 782 max_chunk_size_bytes=test_max_chunk_size, 783 default_protocol_version=ProtocolVersion.VERSION_TWO, 784 ) 785 786 self._enqueue_server_responses( 787 _Method.READ, 788 ( 789 ( 790 transfer_pb2.Chunk( 791 session_id=_FIRST_SESSION_ID, 792 type=transfer_pb2.Chunk.Type.START_ACK, 793 protocol_version=ProtocolVersion.VERSION_TWO.value, 794 ), 795 ), 796 # First window: 1 chunk. 797 ( 798 transfer_pb2.Chunk( 799 session_id=_FIRST_SESSION_ID, 800 type=transfer_pb2.Chunk.Type.DATA, 801 offset=0, 802 data=b'#' * test_max_chunk_size, 803 ), 804 ), 805 # Second window: 2 chunks. 806 ( 807 transfer_pb2.Chunk( 808 session_id=_FIRST_SESSION_ID, 809 type=transfer_pb2.Chunk.Type.DATA, 810 offset=test_max_chunk_size, 811 data=b'#' * test_max_chunk_size, 812 ), 813 transfer_pb2.Chunk( 814 session_id=_FIRST_SESSION_ID, 815 type=transfer_pb2.Chunk.Type.DATA, 816 offset=2 * test_max_chunk_size, 817 data=b'#' * test_max_chunk_size, 818 ), 819 ), 820 # Third window: send the wrong offset, triggering a 821 # retransmission. 822 ( 823 transfer_pb2.Chunk( 824 session_id=_FIRST_SESSION_ID, 825 type=transfer_pb2.Chunk.Type.DATA, 826 offset=2 * test_max_chunk_size, 827 data=b'#' * test_max_chunk_size, 828 ), 829 ), 830 # Fourth window: send the expected offset. 831 ( 832 transfer_pb2.Chunk( 833 session_id=_FIRST_SESSION_ID, 834 type=transfer_pb2.Chunk.Type.DATA, 835 offset=3 * test_max_chunk_size, 836 data=b'#' * test_max_chunk_size, 837 ), 838 transfer_pb2.Chunk( 839 session_id=_FIRST_SESSION_ID, 840 type=transfer_pb2.Chunk.Type.DATA, 841 offset=4 * test_max_chunk_size, 842 data=b'#' * test_max_chunk_size, 843 ), 844 ), 845 # Fifth window: finish the transfer. 846 ( 847 transfer_pb2.Chunk( 848 session_id=_FIRST_SESSION_ID, 849 type=transfer_pb2.Chunk.Type.DATA, 850 offset=5 * test_max_chunk_size, 851 data=b'#' * test_max_chunk_size, 852 remaining_bytes=0, 853 ), 854 ), 855 ( 856 transfer_pb2.Chunk( 857 session_id=_FIRST_SESSION_ID, 858 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 859 ), 860 ), 861 ), 862 ) 863 864 data = manager.read(_ARBITRARY_TRANSFER_ID) 865 866 self.assertEqual( 867 self._sent_chunks, 868 [ 869 transfer_pb2.Chunk( 870 type=transfer_pb2.Chunk.Type.START, 871 transfer_id=_ARBITRARY_TRANSFER_ID, 872 resource_id=_ARBITRARY_TRANSFER_ID, 873 desired_session_id=_FIRST_SESSION_ID, 874 pending_bytes=test_max_chunk_size, 875 max_chunk_size_bytes=test_max_chunk_size, 876 window_end_offset=test_max_chunk_size, 877 protocol_version=ProtocolVersion.VERSION_TWO.value, 878 ), 879 # First parameters: 1 chunk window. 880 transfer_pb2.Chunk( 881 session_id=_FIRST_SESSION_ID, 882 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 883 offset=0, 884 max_chunk_size_bytes=test_max_chunk_size, 885 window_end_offset=test_max_chunk_size, 886 protocol_version=ProtocolVersion.VERSION_TWO.value, 887 ), 888 # Second parameters: 2 chunk window. 889 transfer_pb2.Chunk( 890 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 891 session_id=_FIRST_SESSION_ID, 892 offset=test_max_chunk_size, 893 max_chunk_size_bytes=test_max_chunk_size, 894 window_end_offset=( 895 test_max_chunk_size + 2 * test_max_chunk_size 896 ), 897 ), 898 # Third parameters: 4 chunk window. 899 transfer_pb2.Chunk( 900 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 901 session_id=_FIRST_SESSION_ID, 902 offset=2 * test_max_chunk_size, 903 max_chunk_size_bytes=test_max_chunk_size, 904 window_end_offset=( 905 2 * test_max_chunk_size + 4 * test_max_chunk_size 906 ), 907 ), 908 # Fourth parameters: data loss, retransmit and halve window. 909 transfer_pb2.Chunk( 910 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 911 session_id=_FIRST_SESSION_ID, 912 offset=3 * test_max_chunk_size, 913 max_chunk_size_bytes=test_max_chunk_size, 914 window_end_offset=( 915 3 * test_max_chunk_size + 2 * test_max_chunk_size 916 ), 917 ), 918 # Fifth parameters: in congestion avoidance, window size now 919 # only increases by one chunk instead of doubling. 920 transfer_pb2.Chunk( 921 type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE, 922 session_id=_FIRST_SESSION_ID, 923 offset=4 * test_max_chunk_size, 924 max_chunk_size_bytes=test_max_chunk_size, 925 window_end_offset=( 926 4 * test_max_chunk_size + 3 * test_max_chunk_size 927 ), 928 ), 929 transfer_pb2.Chunk( 930 type=transfer_pb2.Chunk.Type.COMPLETION, 931 session_id=_FIRST_SESSION_ID, 932 status=Status.OK.value, 933 ), 934 ], 935 ) 936 self.assertEqual(data, b'#' * (6 * test_max_chunk_size)) 937 938 def test_write_transfer_basic(self) -> None: 939 manager = pw_transfer.Manager( 940 self._service, 941 default_response_timeout_s=DEFAULT_TIMEOUT_S, 942 ) 943 944 self._enqueue_server_responses( 945 _Method.WRITE, 946 ( 947 ( 948 transfer_pb2.Chunk( 949 transfer_id=4, 950 offset=0, 951 pending_bytes=32, 952 max_chunk_size_bytes=8, 953 ), 954 ), 955 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 956 ), 957 ) 958 959 manager.write(4, b'hello') 960 self.assertEqual(len(self._sent_chunks), 2) 961 self.assertEqual(self._received_data(), b'hello') 962 963 def test_write_transfer_max_chunk_size(self) -> None: 964 manager = pw_transfer.Manager( 965 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 966 ) 967 968 self._enqueue_server_responses( 969 _Method.WRITE, 970 ( 971 ( 972 transfer_pb2.Chunk( 973 transfer_id=4, 974 offset=0, 975 pending_bytes=32, 976 max_chunk_size_bytes=8, 977 ), 978 ), 979 (), 980 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 981 ), 982 ) 983 984 manager.write(4, b'hello world') 985 self.assertEqual(len(self._sent_chunks), 3) 986 self.assertEqual(self._received_data(), b'hello world') 987 self.assertEqual(self._sent_chunks[1].data, b'hello wo') 988 self.assertEqual(self._sent_chunks[2].data, b'rld') 989 990 def test_write_transfer_multiple_parameters(self) -> None: 991 manager = pw_transfer.Manager( 992 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 993 ) 994 995 self._enqueue_server_responses( 996 _Method.WRITE, 997 ( 998 ( 999 transfer_pb2.Chunk( 1000 transfer_id=4, 1001 offset=0, 1002 pending_bytes=8, 1003 max_chunk_size_bytes=8, 1004 ), 1005 ), 1006 ( 1007 transfer_pb2.Chunk( 1008 transfer_id=4, 1009 offset=8, 1010 pending_bytes=8, 1011 max_chunk_size_bytes=8, 1012 ), 1013 ), 1014 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1015 ), 1016 ) 1017 1018 manager.write(4, b'data to write') 1019 self.assertEqual(len(self._sent_chunks), 3) 1020 self.assertEqual(self._received_data(), b'data to write') 1021 self.assertEqual(self._sent_chunks[1].data, b'data to ') 1022 self.assertEqual(self._sent_chunks[2].data, b'write') 1023 1024 def test_write_transfer_progress_callback(self) -> None: 1025 manager = pw_transfer.Manager( 1026 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1027 ) 1028 1029 self._enqueue_server_responses( 1030 _Method.WRITE, 1031 ( 1032 ( 1033 transfer_pb2.Chunk( 1034 transfer_id=4, 1035 offset=0, 1036 pending_bytes=8, 1037 max_chunk_size_bytes=8, 1038 ), 1039 ), 1040 ( 1041 transfer_pb2.Chunk( 1042 transfer_id=4, 1043 offset=8, 1044 pending_bytes=8, 1045 max_chunk_size_bytes=8, 1046 ), 1047 ), 1048 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1049 ), 1050 ) 1051 1052 progress: list[pw_transfer.ProgressStats] = [] 1053 1054 manager.write(4, b'data to write', progress.append) 1055 self.assertEqual(len(self._sent_chunks), 3) 1056 self.assertEqual(self._received_data(), b'data to write') 1057 self.assertEqual(self._sent_chunks[1].data, b'data to ') 1058 self.assertEqual(self._sent_chunks[2].data, b'write') 1059 self.assertEqual( 1060 progress, 1061 [ 1062 pw_transfer.ProgressStats(8, 0, 13), 1063 pw_transfer.ProgressStats(13, 8, 13), 1064 pw_transfer.ProgressStats(13, 13, 13), 1065 ], 1066 ) 1067 1068 def test_write_transfer_rewind(self) -> None: 1069 """Write transfer in which the server re-requests an earlier offset.""" 1070 manager = pw_transfer.Manager( 1071 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1072 ) 1073 1074 self._enqueue_server_responses( 1075 _Method.WRITE, 1076 ( 1077 ( 1078 transfer_pb2.Chunk( 1079 transfer_id=4, 1080 offset=0, 1081 pending_bytes=8, 1082 max_chunk_size_bytes=8, 1083 ), 1084 ), 1085 ( 1086 transfer_pb2.Chunk( 1087 transfer_id=4, 1088 offset=8, 1089 pending_bytes=8, 1090 max_chunk_size_bytes=8, 1091 ), 1092 ), 1093 ( 1094 transfer_pb2.Chunk( 1095 transfer_id=4, 1096 offset=4, # rewind 1097 pending_bytes=8, 1098 max_chunk_size_bytes=8, 1099 ), 1100 ), 1101 ( 1102 transfer_pb2.Chunk( 1103 transfer_id=4, 1104 offset=12, 1105 pending_bytes=16, # update max size 1106 max_chunk_size_bytes=16, 1107 ), 1108 ), 1109 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1110 ), 1111 ) 1112 1113 manager.write(4, b'pigweed data transfer') 1114 self.assertEqual(len(self._sent_chunks), 5) 1115 self.assertEqual(self._sent_chunks[1].data, b'pigweed ') 1116 self.assertEqual(self._sent_chunks[2].data, b'data tra') 1117 self.assertEqual(self._sent_chunks[3].data, b'eed data') 1118 self.assertEqual(self._sent_chunks[4].data, b' transfer') 1119 1120 def test_write_transfer_bad_offset(self) -> None: 1121 manager = pw_transfer.Manager( 1122 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1123 ) 1124 1125 self._enqueue_server_responses( 1126 _Method.WRITE, 1127 ( 1128 ( 1129 transfer_pb2.Chunk( 1130 transfer_id=4, 1131 offset=0, 1132 pending_bytes=8, 1133 max_chunk_size_bytes=8, 1134 ), 1135 ), 1136 ( 1137 transfer_pb2.Chunk( 1138 transfer_id=4, 1139 offset=100, # larger offset than data 1140 pending_bytes=8, 1141 max_chunk_size_bytes=8, 1142 ), 1143 ), 1144 (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),), 1145 ), 1146 ) 1147 1148 with self.assertRaises(pw_transfer.Error) as context: 1149 manager.write(4, b'small data') 1150 1151 exception = context.exception 1152 self.assertEqual(exception.resource_id, 4) 1153 self.assertEqual(exception.status, Status.OUT_OF_RANGE) 1154 1155 def test_write_transfer_error(self) -> None: 1156 manager = pw_transfer.Manager( 1157 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1158 ) 1159 1160 self._enqueue_server_responses( 1161 _Method.WRITE, 1162 ( 1163 ( 1164 transfer_pb2.Chunk( 1165 transfer_id=21, status=Status.UNAVAILABLE.value 1166 ), 1167 ), 1168 ), 1169 ) 1170 1171 with self.assertRaises(pw_transfer.Error) as context: 1172 manager.write(21, b'no write') 1173 1174 exception = context.exception 1175 self.assertEqual(exception.resource_id, 21) 1176 self.assertEqual(exception.status, Status.UNAVAILABLE) 1177 1178 def test_write_transfer_server_error(self) -> None: 1179 manager = pw_transfer.Manager( 1180 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1181 ) 1182 1183 self._enqueue_server_error(_Method.WRITE, Status.NOT_FOUND) 1184 1185 with self.assertRaises(pw_transfer.Error) as context: 1186 manager.write(21, b'server error') 1187 1188 exception = context.exception 1189 self.assertEqual(exception.resource_id, 21) 1190 self.assertEqual(exception.status, Status.INTERNAL) 1191 1192 def test_write_transfer_timeout_after_initial_chunk(self) -> None: 1193 manager = pw_transfer.Manager( 1194 self._service, 1195 default_response_timeout_s=0.001, 1196 max_retries=2, 1197 default_protocol_version=ProtocolVersion.LEGACY, 1198 ) 1199 1200 with self.assertRaises(pw_transfer.Error) as context: 1201 manager.write(22, b'no server response!') 1202 1203 self.assertEqual( 1204 self._sent_chunks, 1205 [ 1206 transfer_pb2.Chunk( 1207 transfer_id=22, 1208 resource_id=22, 1209 type=transfer_pb2.Chunk.Type.START, 1210 ), # initial chunk 1211 transfer_pb2.Chunk( 1212 transfer_id=22, 1213 resource_id=22, 1214 type=transfer_pb2.Chunk.Type.START, 1215 ), # retry 1 1216 transfer_pb2.Chunk( 1217 transfer_id=22, 1218 resource_id=22, 1219 type=transfer_pb2.Chunk.Type.START, 1220 ), # retry 2 1221 ], 1222 ) 1223 1224 exception = context.exception 1225 self.assertEqual(exception.resource_id, 22) 1226 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 1227 1228 def test_write_transfer_timeout_after_intermediate_chunk(self) -> None: 1229 """Tests write transfers that timeout after the initial chunk.""" 1230 manager = pw_transfer.Manager( 1231 self._service, 1232 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1233 max_retries=2, 1234 default_protocol_version=ProtocolVersion.LEGACY, 1235 ) 1236 1237 self._enqueue_server_responses( 1238 _Method.WRITE, 1239 [ 1240 [ 1241 transfer_pb2.Chunk( 1242 transfer_id=22, pending_bytes=10, max_chunk_size_bytes=5 1243 ) 1244 ] 1245 ], 1246 ) 1247 1248 with self.assertRaises(pw_transfer.Error) as context: 1249 manager.write(22, b'0123456789') 1250 1251 last_data_chunk = transfer_pb2.Chunk( 1252 transfer_id=22, 1253 data=b'56789', 1254 offset=5, 1255 remaining_bytes=0, 1256 type=transfer_pb2.Chunk.Type.DATA, 1257 ) 1258 1259 self.assertEqual( 1260 self._sent_chunks, 1261 [ 1262 transfer_pb2.Chunk( 1263 transfer_id=22, 1264 resource_id=22, 1265 type=transfer_pb2.Chunk.Type.START, 1266 ), 1267 transfer_pb2.Chunk( 1268 transfer_id=22, 1269 data=b'01234', 1270 type=transfer_pb2.Chunk.Type.DATA, 1271 ), 1272 last_data_chunk, # last chunk 1273 last_data_chunk, # retry 1 1274 last_data_chunk, # retry 2 1275 ], 1276 ) 1277 1278 exception = context.exception 1279 self.assertEqual(exception.resource_id, 22) 1280 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 1281 1282 def test_write_zero_pending_bytes_is_internal_error(self) -> None: 1283 manager = pw_transfer.Manager( 1284 self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S 1285 ) 1286 1287 self._enqueue_server_responses( 1288 _Method.WRITE, 1289 ((transfer_pb2.Chunk(transfer_id=23, pending_bytes=0),),), 1290 ) 1291 1292 with self.assertRaises(pw_transfer.Error) as context: 1293 manager.write(23, b'no write') 1294 1295 exception = context.exception 1296 self.assertEqual(exception.resource_id, 23) 1297 self.assertEqual(exception.status, Status.INTERNAL) 1298 1299 def test_v2_read_transfer_basic(self) -> None: 1300 """Tests a simple protocol version 2 read transfer.""" 1301 manager = pw_transfer.Manager( 1302 self._service, 1303 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1304 default_protocol_version=ProtocolVersion.VERSION_TWO, 1305 ) 1306 1307 self._enqueue_server_responses( 1308 _Method.READ, 1309 ( 1310 ( 1311 transfer_pb2.Chunk( 1312 resource_id=39, 1313 session_id=_FIRST_SESSION_ID, 1314 type=transfer_pb2.Chunk.Type.START_ACK, 1315 protocol_version=ProtocolVersion.VERSION_TWO.value, 1316 ), 1317 ), 1318 ( 1319 transfer_pb2.Chunk( 1320 session_id=_FIRST_SESSION_ID, 1321 type=transfer_pb2.Chunk.Type.DATA, 1322 offset=0, 1323 data=b'version two', 1324 remaining_bytes=0, 1325 ), 1326 ), 1327 ( 1328 transfer_pb2.Chunk( 1329 session_id=_FIRST_SESSION_ID, 1330 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1331 ), 1332 ), 1333 ), 1334 ) 1335 1336 data = manager.read(39) 1337 1338 self.assertEqual( 1339 self._sent_chunks, 1340 [ 1341 transfer_pb2.Chunk( 1342 transfer_id=39, 1343 resource_id=39, 1344 desired_session_id=_FIRST_SESSION_ID, 1345 pending_bytes=1024, 1346 max_chunk_size_bytes=1024, 1347 window_end_offset=1024, 1348 type=transfer_pb2.Chunk.Type.START, 1349 protocol_version=ProtocolVersion.VERSION_TWO.value, 1350 ), 1351 transfer_pb2.Chunk( 1352 session_id=_FIRST_SESSION_ID, 1353 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1354 max_chunk_size_bytes=1024, 1355 window_end_offset=1024, 1356 # pending_bytes should no longer exist as server and client 1357 # have agreed on v2. 1358 protocol_version=ProtocolVersion.VERSION_TWO.value, 1359 ), 1360 transfer_pb2.Chunk( 1361 session_id=_FIRST_SESSION_ID, 1362 type=transfer_pb2.Chunk.Type.COMPLETION, 1363 status=Status.OK.value, 1364 ), 1365 ], 1366 ) 1367 1368 self.assertEqual(data, b'version two') 1369 1370 def test_v2_read_transfer_legacy_fallback(self) -> None: 1371 """Tests a v2 read transfer when the server only supports legacy.""" 1372 manager = pw_transfer.Manager( 1373 self._service, 1374 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1375 default_protocol_version=ProtocolVersion.VERSION_TWO, 1376 ) 1377 1378 # Respond to the START chunk with a legacy data transfer chunk instead 1379 # of a START_ACK. 1380 self._enqueue_server_responses( 1381 _Method.READ, 1382 ( 1383 ( 1384 transfer_pb2.Chunk( 1385 transfer_id=40, 1386 type=transfer_pb2.Chunk.Type.DATA, 1387 offset=0, 1388 data=b'sorry, legacy only', 1389 remaining_bytes=0, 1390 ), 1391 ), 1392 ), 1393 ) 1394 1395 data = manager.read(40) 1396 1397 self.assertEqual( 1398 self._sent_chunks, 1399 [ 1400 transfer_pb2.Chunk( 1401 transfer_id=40, 1402 resource_id=40, 1403 desired_session_id=_FIRST_SESSION_ID, 1404 pending_bytes=1024, 1405 max_chunk_size_bytes=1024, 1406 window_end_offset=1024, 1407 type=transfer_pb2.Chunk.Type.START, 1408 protocol_version=ProtocolVersion.VERSION_TWO.value, 1409 ), 1410 transfer_pb2.Chunk( 1411 transfer_id=40, 1412 type=transfer_pb2.Chunk.Type.COMPLETION, 1413 status=Status.OK.value, 1414 ), 1415 ], 1416 ) 1417 1418 self.assertEqual(data, b'sorry, legacy only') 1419 1420 def test_v2_write_transfer_basic(self) -> None: 1421 """Tests a simple protocol version 2 write transfer.""" 1422 manager = pw_transfer.Manager( 1423 self._service, 1424 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1425 default_protocol_version=ProtocolVersion.VERSION_TWO, 1426 ) 1427 1428 self._enqueue_server_responses( 1429 _Method.WRITE, 1430 ( 1431 ( 1432 transfer_pb2.Chunk( 1433 resource_id=72, 1434 session_id=_FIRST_SESSION_ID, 1435 type=transfer_pb2.Chunk.Type.START_ACK, 1436 protocol_version=ProtocolVersion.VERSION_TWO.value, 1437 ), 1438 ), 1439 ( 1440 transfer_pb2.Chunk( 1441 session_id=_FIRST_SESSION_ID, 1442 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 1443 offset=0, 1444 window_end_offset=32, 1445 max_chunk_size_bytes=8, 1446 ), 1447 ), 1448 (), # In response to the first data chunk. 1449 ( 1450 transfer_pb2.Chunk( 1451 session_id=_FIRST_SESSION_ID, 1452 type=transfer_pb2.Chunk.Type.COMPLETION, 1453 status=Status.OK.value, 1454 ), 1455 ), 1456 ), 1457 ) 1458 1459 manager.write(72, b'write version 2') 1460 1461 self.assertEqual( 1462 self._sent_chunks, 1463 [ 1464 transfer_pb2.Chunk( 1465 transfer_id=72, 1466 resource_id=72, 1467 desired_session_id=_FIRST_SESSION_ID, 1468 type=transfer_pb2.Chunk.Type.START, 1469 protocol_version=ProtocolVersion.VERSION_TWO.value, 1470 ), 1471 transfer_pb2.Chunk( 1472 session_id=_FIRST_SESSION_ID, 1473 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1474 protocol_version=ProtocolVersion.VERSION_TWO.value, 1475 ), 1476 transfer_pb2.Chunk( 1477 session_id=_FIRST_SESSION_ID, 1478 type=transfer_pb2.Chunk.Type.DATA, 1479 offset=0, 1480 data=b'write ve', 1481 ), 1482 transfer_pb2.Chunk( 1483 session_id=_FIRST_SESSION_ID, 1484 type=transfer_pb2.Chunk.Type.DATA, 1485 offset=8, 1486 data=b'rsion 2', 1487 remaining_bytes=0, 1488 ), 1489 transfer_pb2.Chunk( 1490 session_id=_FIRST_SESSION_ID, 1491 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1492 ), 1493 ], 1494 ) 1495 1496 self.assertEqual(self._received_data(), b'write version 2') 1497 1498 def test_v2_write_transfer_legacy_fallback(self) -> None: 1499 """Tests a v2 write transfer when the server only supports legacy.""" 1500 manager = pw_transfer.Manager( 1501 self._service, 1502 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1503 default_protocol_version=ProtocolVersion.VERSION_TWO, 1504 ) 1505 1506 self._enqueue_server_responses( 1507 _Method.WRITE, 1508 ( 1509 # Send a parameters chunk immediately per the legacy protocol. 1510 ( 1511 transfer_pb2.Chunk( 1512 transfer_id=76, 1513 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 1514 offset=0, 1515 pending_bytes=32, 1516 window_end_offset=32, 1517 max_chunk_size_bytes=8, 1518 ), 1519 ), 1520 (), # In response to the first data chunk. 1521 ( 1522 transfer_pb2.Chunk( 1523 transfer_id=76, 1524 type=transfer_pb2.Chunk.Type.COMPLETION, 1525 status=Status.OK.value, 1526 ), 1527 ), 1528 ), 1529 ) 1530 1531 manager.write(76, b'write v... NOPE') 1532 1533 self.assertEqual( 1534 self._sent_chunks, 1535 [ 1536 transfer_pb2.Chunk( 1537 transfer_id=76, 1538 resource_id=76, 1539 desired_session_id=_FIRST_SESSION_ID, 1540 type=transfer_pb2.Chunk.Type.START, 1541 protocol_version=ProtocolVersion.VERSION_TWO.value, 1542 ), 1543 transfer_pb2.Chunk( 1544 transfer_id=76, 1545 type=transfer_pb2.Chunk.Type.DATA, 1546 offset=0, 1547 data=b'write v.', 1548 ), 1549 transfer_pb2.Chunk( 1550 transfer_id=76, 1551 type=transfer_pb2.Chunk.Type.DATA, 1552 offset=8, 1553 data=b'.. NOPE', 1554 remaining_bytes=0, 1555 ), 1556 ], 1557 ) 1558 1559 self.assertEqual(self._received_data(), b'write v... NOPE') 1560 1561 def test_v2_server_error(self) -> None: 1562 """Tests a server error occurring during the opening handshake.""" 1563 1564 manager = pw_transfer.Manager( 1565 self._service, 1566 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1567 default_protocol_version=ProtocolVersion.VERSION_TWO, 1568 ) 1569 1570 self._enqueue_server_responses( 1571 _Method.READ, 1572 ( 1573 ( 1574 transfer_pb2.Chunk( 1575 resource_id=43, 1576 session_id=_FIRST_SESSION_ID, 1577 type=transfer_pb2.Chunk.Type.START_ACK, 1578 protocol_version=ProtocolVersion.VERSION_TWO.value, 1579 ), 1580 ), 1581 ( 1582 transfer_pb2.Chunk( 1583 session_id=_FIRST_SESSION_ID, 1584 type=transfer_pb2.Chunk.Type.COMPLETION, 1585 status=Status.DATA_LOSS.value, 1586 ), 1587 ), 1588 ), 1589 ) 1590 1591 with self.assertRaises(pw_transfer.Error) as context: 1592 manager.read(43) 1593 1594 self.assertEqual( 1595 self._sent_chunks, 1596 [ 1597 transfer_pb2.Chunk( 1598 transfer_id=43, 1599 desired_session_id=_FIRST_SESSION_ID, 1600 resource_id=43, 1601 pending_bytes=1024, 1602 max_chunk_size_bytes=1024, 1603 window_end_offset=1024, 1604 type=transfer_pb2.Chunk.Type.START, 1605 protocol_version=ProtocolVersion.VERSION_TWO.value, 1606 ), 1607 transfer_pb2.Chunk( 1608 session_id=_FIRST_SESSION_ID, 1609 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1610 max_chunk_size_bytes=1024, 1611 window_end_offset=1024, 1612 protocol_version=ProtocolVersion.VERSION_TWO.value, 1613 ), 1614 # Client sends a COMPLETION_ACK in response to the server. 1615 transfer_pb2.Chunk( 1616 session_id=_FIRST_SESSION_ID, 1617 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1618 ), 1619 ], 1620 ) 1621 1622 exception = context.exception 1623 self.assertEqual(exception.resource_id, 43) 1624 self.assertEqual(exception.status, Status.DATA_LOSS) 1625 1626 def test_v2_timeout_during_opening_handshake(self) -> None: 1627 """Tests a timeout occurring during the opening handshake.""" 1628 manager = pw_transfer.Manager( 1629 self._service, 1630 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1631 default_protocol_version=ProtocolVersion.VERSION_TWO, 1632 ) 1633 1634 # Don't enqueue any server responses. 1635 1636 with self.assertRaises(pw_transfer.Error) as context: 1637 manager.read(41) 1638 1639 start_chunk = transfer_pb2.Chunk( 1640 transfer_id=41, 1641 resource_id=41, 1642 desired_session_id=_FIRST_SESSION_ID, 1643 pending_bytes=1024, 1644 max_chunk_size_bytes=1024, 1645 window_end_offset=1024, 1646 type=transfer_pb2.Chunk.Type.START, 1647 protocol_version=ProtocolVersion.VERSION_TWO.value, 1648 ) 1649 1650 # The opening chunk should be sent initially, then retried three times. 1651 self.assertEqual(self._sent_chunks, [start_chunk] * 4) 1652 1653 exception = context.exception 1654 self.assertEqual(exception.resource_id, 41) 1655 self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED) 1656 1657 def test_v2_timeout_recovery_during_opening_handshake(self) -> None: 1658 """Tests a timeout during the opening handshake which recovers.""" 1659 manager = pw_transfer.Manager( 1660 self._service, 1661 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1662 default_protocol_version=ProtocolVersion.VERSION_TWO, 1663 ) 1664 1665 self._enqueue_server_responses( 1666 _Method.WRITE, 1667 ( 1668 ( 1669 transfer_pb2.Chunk( 1670 resource_id=73, 1671 session_id=_FIRST_SESSION_ID, 1672 type=transfer_pb2.Chunk.Type.START_ACK, 1673 protocol_version=ProtocolVersion.VERSION_TWO.value, 1674 ), 1675 ), 1676 (), # Don't respond to the START_ACK_CONFIRMATION. 1677 (), # Don't respond to the first START_ACK_CONFIRMATION retry. 1678 ( 1679 transfer_pb2.Chunk( 1680 session_id=_FIRST_SESSION_ID, 1681 type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT, 1682 offset=0, 1683 window_end_offset=32, 1684 max_chunk_size_bytes=8, 1685 ), 1686 ), 1687 (), # In response to the first data chunk. 1688 ( 1689 transfer_pb2.Chunk( 1690 session_id=_FIRST_SESSION_ID, 1691 type=transfer_pb2.Chunk.Type.COMPLETION, 1692 status=Status.OK.value, 1693 ), 1694 ), 1695 ), 1696 ) 1697 1698 manager.write(73, b'write timeout 2') 1699 1700 start_ack_confirmation = transfer_pb2.Chunk( 1701 session_id=_FIRST_SESSION_ID, 1702 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1703 protocol_version=ProtocolVersion.VERSION_TWO.value, 1704 ) 1705 1706 self.assertEqual( 1707 self._sent_chunks, 1708 [ 1709 transfer_pb2.Chunk( 1710 transfer_id=73, 1711 resource_id=73, 1712 desired_session_id=_FIRST_SESSION_ID, 1713 type=transfer_pb2.Chunk.Type.START, 1714 protocol_version=ProtocolVersion.VERSION_TWO.value, 1715 ), 1716 start_ack_confirmation, # Initial transmission 1717 start_ack_confirmation, # Retry 1 1718 start_ack_confirmation, # Retry 2 1719 transfer_pb2.Chunk( 1720 session_id=_FIRST_SESSION_ID, 1721 type=transfer_pb2.Chunk.Type.DATA, 1722 offset=0, 1723 data=b'write ti', 1724 ), 1725 transfer_pb2.Chunk( 1726 session_id=_FIRST_SESSION_ID, 1727 type=transfer_pb2.Chunk.Type.DATA, 1728 offset=8, 1729 data=b'meout 2', 1730 remaining_bytes=0, 1731 ), 1732 transfer_pb2.Chunk( 1733 session_id=_FIRST_SESSION_ID, 1734 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1735 ), 1736 ], 1737 ) 1738 1739 self.assertEqual(self._received_data(), b'write timeout 2') 1740 1741 def test_v2_closing_handshake_bad_chunk(self) -> None: 1742 """Tests an unexpected chunk response during the closing handshake.""" 1743 manager = pw_transfer.Manager( 1744 self._service, 1745 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1746 default_protocol_version=ProtocolVersion.VERSION_TWO, 1747 ) 1748 1749 self._enqueue_server_responses( 1750 _Method.READ, 1751 ( 1752 ( 1753 transfer_pb2.Chunk( 1754 resource_id=47, 1755 session_id=_FIRST_SESSION_ID, 1756 type=transfer_pb2.Chunk.Type.START_ACK, 1757 protocol_version=ProtocolVersion.VERSION_TWO.value, 1758 ), 1759 ), 1760 ( 1761 transfer_pb2.Chunk( 1762 session_id=_FIRST_SESSION_ID, 1763 type=transfer_pb2.Chunk.Type.DATA, 1764 offset=0, 1765 data=b'version two', 1766 remaining_bytes=0, 1767 ), 1768 ), 1769 # In response to the COMPLETION, re-send the last chunk instead 1770 # of a COMPLETION_ACK. 1771 ( 1772 transfer_pb2.Chunk( 1773 session_id=_FIRST_SESSION_ID, 1774 type=transfer_pb2.Chunk.Type.DATA, 1775 offset=0, 1776 data=b'version two', 1777 remaining_bytes=0, 1778 ), 1779 ), 1780 ( 1781 transfer_pb2.Chunk( 1782 session_id=_FIRST_SESSION_ID, 1783 type=transfer_pb2.Chunk.Type.COMPLETION_ACK, 1784 ), 1785 ), 1786 ), 1787 ) 1788 1789 data = manager.read(47) 1790 1791 self.assertEqual( 1792 self._sent_chunks, 1793 [ 1794 transfer_pb2.Chunk( 1795 transfer_id=47, 1796 resource_id=47, 1797 desired_session_id=_FIRST_SESSION_ID, 1798 pending_bytes=1024, 1799 max_chunk_size_bytes=1024, 1800 window_end_offset=1024, 1801 type=transfer_pb2.Chunk.Type.START, 1802 protocol_version=ProtocolVersion.VERSION_TWO.value, 1803 ), 1804 transfer_pb2.Chunk( 1805 session_id=_FIRST_SESSION_ID, 1806 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1807 max_chunk_size_bytes=1024, 1808 window_end_offset=1024, 1809 protocol_version=ProtocolVersion.VERSION_TWO.value, 1810 ), 1811 transfer_pb2.Chunk( 1812 session_id=_FIRST_SESSION_ID, 1813 type=transfer_pb2.Chunk.Type.COMPLETION, 1814 status=Status.OK.value, 1815 ), 1816 # Completion should be re-sent following the repeated chunk. 1817 transfer_pb2.Chunk( 1818 session_id=_FIRST_SESSION_ID, 1819 type=transfer_pb2.Chunk.Type.COMPLETION, 1820 status=Status.OK.value, 1821 ), 1822 ], 1823 ) 1824 1825 self.assertEqual(data, b'version two') 1826 1827 def test_v2_timeout_during_closing_handshake(self) -> None: 1828 """Tests a timeout occurring during the closing handshake.""" 1829 manager = pw_transfer.Manager( 1830 self._service, 1831 default_response_timeout_s=DEFAULT_TIMEOUT_S, 1832 default_protocol_version=ProtocolVersion.VERSION_TWO, 1833 ) 1834 1835 self._enqueue_server_responses( 1836 _Method.READ, 1837 ( 1838 ( 1839 transfer_pb2.Chunk( 1840 resource_id=47, 1841 session_id=_FIRST_SESSION_ID, 1842 type=transfer_pb2.Chunk.Type.START_ACK, 1843 protocol_version=ProtocolVersion.VERSION_TWO.value, 1844 ), 1845 ), 1846 ( 1847 transfer_pb2.Chunk( 1848 session_id=_FIRST_SESSION_ID, 1849 type=transfer_pb2.Chunk.Type.DATA, 1850 offset=0, 1851 data=b'dropped completion', 1852 remaining_bytes=0, 1853 ), 1854 ), 1855 # Never send the expected COMPLETION_ACK chunk. 1856 ), 1857 ) 1858 1859 data = manager.read(47) 1860 1861 self.assertEqual( 1862 self._sent_chunks, 1863 [ 1864 transfer_pb2.Chunk( 1865 transfer_id=47, 1866 resource_id=47, 1867 desired_session_id=_FIRST_SESSION_ID, 1868 pending_bytes=1024, 1869 max_chunk_size_bytes=1024, 1870 window_end_offset=1024, 1871 type=transfer_pb2.Chunk.Type.START, 1872 protocol_version=ProtocolVersion.VERSION_TWO.value, 1873 ), 1874 transfer_pb2.Chunk( 1875 session_id=_FIRST_SESSION_ID, 1876 type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION, 1877 max_chunk_size_bytes=1024, 1878 window_end_offset=1024, 1879 protocol_version=ProtocolVersion.VERSION_TWO.value, 1880 ), 1881 transfer_pb2.Chunk( 1882 session_id=_FIRST_SESSION_ID, 1883 type=transfer_pb2.Chunk.Type.COMPLETION, 1884 status=Status.OK.value, 1885 ), 1886 # The completion should be retried per the usual retry flow. 1887 transfer_pb2.Chunk( 1888 session_id=_FIRST_SESSION_ID, 1889 type=transfer_pb2.Chunk.Type.COMPLETION, 1890 status=Status.OK.value, 1891 ), 1892 transfer_pb2.Chunk( 1893 session_id=_FIRST_SESSION_ID, 1894 type=transfer_pb2.Chunk.Type.COMPLETION, 1895 status=Status.OK.value, 1896 ), 1897 transfer_pb2.Chunk( 1898 session_id=_FIRST_SESSION_ID, 1899 type=transfer_pb2.Chunk.Type.COMPLETION, 1900 status=Status.OK.value, 1901 ), 1902 ], 1903 ) 1904 1905 # Despite timing out following several retries, the transfer should 1906 # still conclude successfully, as failing to receive a COMPLETION_ACK 1907 # is not fatal. 1908 self.assertEqual(data, b'dropped completion') 1909 1910 1911class ProgressStatsTest(unittest.TestCase): 1912 def test_received_percent_known_total(self) -> None: 1913 self.assertEqual( 1914 pw_transfer.ProgressStats(75, 0, 100).percent_received(), 0.0 1915 ) 1916 self.assertEqual( 1917 pw_transfer.ProgressStats(75, 50, 100).percent_received(), 50.0 1918 ) 1919 self.assertEqual( 1920 pw_transfer.ProgressStats(100, 100, 100).percent_received(), 100.0 1921 ) 1922 1923 def test_received_percent_unknown_total(self) -> None: 1924 self.assertTrue( 1925 math.isnan( 1926 pw_transfer.ProgressStats(75, 50, None).percent_received() 1927 ) 1928 ) 1929 self.assertTrue( 1930 math.isnan( 1931 pw_transfer.ProgressStats(100, 100, None).percent_received() 1932 ) 1933 ) 1934 1935 def test_str_known_total(self) -> None: 1936 stats = str(pw_transfer.ProgressStats(75, 50, 100)) 1937 self.assertIn('75', stats) 1938 self.assertIn('50', stats) 1939 self.assertIn('100', stats) 1940 1941 def test_str_unknown_total(self) -> None: 1942 stats = str(pw_transfer.ProgressStats(75, 50, None)) 1943 self.assertIn('75', stats) 1944 self.assertIn('50', stats) 1945 self.assertIn('unknown', stats) 1946 1947 1948if __name__ == '__main__': 1949 # TODO: b/265975025 - Only run this test in upstream Pigweed until the 1950 # occasional hangs are fixed. 1951 if os.environ.get('PW_ROOT') and os.environ.get( 1952 'PW_ROOT' 1953 ) == os.environ.get('PW_PROJECT_ROOT'): 1954 unittest.main() 1955 else: 1956 print('Skipping transfer_test.py due to possible hangs (b/265975025).') 1957