# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Classes for read and write transfers."""

import abc
import asyncio
from dataclasses import dataclass
import enum
import logging
import math
import threading
from typing import Any, Callable

from pw_status import Status
from pw_transfer.chunk import Chunk, ProtocolVersion

_LOG = logging.getLogger(__package__)


@dataclass(frozen=True)
class ProgressStats:
    """Immutable snapshot of a transfer's progress, measured in bytes."""

    bytes_sent: int
    bytes_confirmed_received: int
    # None when the total size of the transfer is not known.
    total_size_bytes: int | None

    def percent_received(self) -> float:
        """Returns the confirmed-received percentage of the total size.

        Returns NaN when the total size is unknown (None) or zero.
        """
        if self.total_size_bytes is None or self.total_size_bytes == 0:
            return math.nan

        return self.bytes_confirmed_received / self.total_size_bytes * 100

    def __str__(self) -> str:
        # A total of None or 0 is displayed as 'unknown'.
        total = (
            str(self.total_size_bytes) if self.total_size_bytes else 'unknown'
        )
        return (
            f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, '
            f'{self.bytes_confirmed_received} B received of {total} B)'
        )


# Signature of the user-supplied callback invoked with progress updates.
ProgressCallback = Callable[[ProgressStats], Any]


class _Timer:
    """A timer which invokes a callback after a certain timeout."""

    def __init__(self, timeout_s: float, callback: Callable[[], Any]):
        """Creates a stopped timer.

        Args:
          timeout_s: Default timeout, in seconds, used by start().
          callback: Called with no arguments when the timer expires.
        """
        self.timeout_s = timeout_s
        self._callback = callback
        self._task: asyncio.Task[Any] | None = None

    def start(self, timeout_s: float | None = None) -> None:
        """Starts a new timer.

        If a timer is already running, it is stopped and a new timer started.
        This can be used to implement watchdog-like behavior, where a callback
        is invoked after some time without a kick.

        Args:
          timeout_s: Timeout for this run only; the default timeout is used
            when None.
        """
        self.stop()
        timeout_s = self.timeout_s if timeout_s is None else timeout_s
        self._task = asyncio.create_task(self._run(timeout_s))

    def stop(self) -> None:
        """Terminates a running timer."""
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def _run(self, timeout_s: float) -> None:
        """Sleeps for the timeout, then invokes the callback."""
        await asyncio.sleep(timeout_s)
        # Clear the task reference first so that the callback may call start()
        # without stop() cancelling the task that is currently running.
        self._task = None
        self._callback()


class Transfer(abc.ABC):
    """A client-side data transfer through a Manager.

    Subclasses are responsible for implementing all of the logic for their type
    of transfer, receiving messages from the server and sending the appropriate
    messages in response.
    """

    # pylint: disable=too-many-instance-attributes

    class _State(enum.Enum):
        # Transfer is starting. The server and client are performing an initial
        # handshake and negotiating protocol and feature flags.
        INITIATING = 0

        # Waiting for the other end to send a chunk.
        WAITING = 1

        # Transmitting a window of data to a receiver.
        TRANSMITTING = 2

        # Recovering after one or more chunks was dropped in an active transfer.
        RECOVERY = 3

        # Transfer has completed locally and is waiting for the peer to
        # acknowledge its final status. Only entered by the terminating side of
        # the transfer.
        #
        # The context remains in a TERMINATING state until it receives an
        # acknowledgement from the peer or times out.
        TERMINATING = 4

        # A transfer has fully completed.
        COMPLETE = 5

    def __init__(  # pylint: disable=too-many-arguments
        self,
        session_id: int,
        resource_id: int,
        send_chunk: Callable[[Chunk], None],
        end_transfer: Callable[['Transfer'], None],
        response_timeout_s: float,
        initial_response_timeout_s: float,
        max_retries: int,
        max_lifetime_retries: int,
        protocol_version: ProtocolVersion,
        progress_callback: ProgressCallback | None = None,
        initial_offset: int = 0,
    ):
        """Initializes the state shared by read and write transfers.

        Args:
          session_id: Identifier for this transfer session. Replaced by
            resource_id when the legacy protocol is requested.
          resource_id: Identifier of the resource being transferred.
          send_chunk: Function used to send a chunk to the server.
          end_transfer: Function called with this transfer when it finishes.
          response_timeout_s: Default time to wait for a chunk before retrying.
          initial_response_timeout_s: Time to wait for a response to the
            opening START chunk.
          max_retries: Maximum timeouts tolerated since the last received
            chunk.
          max_lifetime_retries: Maximum timeouts tolerated over the whole
            transfer.
          protocol_version: Protocol version the client wishes to use.
          progress_callback: Optional callback invoked with ProgressStats.
          initial_offset: Offset at which the transfer starts.
        """
        self.status = Status.OK
        self.done = threading.Event()

        self._session_id = session_id
        self._resource_id = resource_id
        self._offset = initial_offset

        self._send_chunk_fn = send_chunk
        self._end_transfer = end_transfer

        self._desired_protocol_version = protocol_version
        self._configured_protocol_version = ProtocolVersion.UNKNOWN

        if self._desired_protocol_version is ProtocolVersion.LEGACY:
            # In a legacy transfer, there is no protocol negotiation stage.
            # Automatically configure the context to run the legacy protocol and
            # proceed to waiting for a chunk.
            self._configured_protocol_version = ProtocolVersion.LEGACY
            self._state = Transfer._State.WAITING
            self._session_id = self._resource_id
        else:
            self._state = Transfer._State.INITIATING

        # Most recently sent chunk, kept so that it can be resent on timeout.
        self._last_chunk: Chunk | None = None

        # _retries is reset whenever a chunk is received; _lifetime_retries
        # accumulates for the duration of the transfer.
        self._retries = 0
        self._max_retries = max_retries
        self._lifetime_retries = 0
        self._max_lifetime_retries = max_lifetime_retries
        self._response_timer = _Timer(response_timeout_s, self._on_timeout)
        self._initial_response_timeout_s = initial_response_timeout_s

        self._progress_callback = progress_callback

    async def begin(self) -> None:
        """Sends the initial chunk of the transfer."""

        if (
            self._desired_protocol_version is ProtocolVersion.UNKNOWN
            or self._desired_protocol_version.value
            > ProtocolVersion.LATEST.value
        ):
            _LOG.error(
                'Cannot start a transfer with unsupported protocol version %d',
                self._desired_protocol_version.value,
            )
            self.finish(Status.INVALID_ARGUMENT)
            return

        initial_chunk = Chunk(
            self._desired_protocol_version,
            Chunk.Type.START,
            resource_id=self._resource_id,
        )

        if self._offset != 0:
            initial_chunk.initial_offset = self._offset

        if self._desired_protocol_version is ProtocolVersion.VERSION_TWO:
            initial_chunk.desired_session_id = self._session_id

        # Regardless of the desired protocol version, set any additional fields
        # on the opening chunk, in case the server only runs legacy.
        self._set_initial_chunk_fields(initial_chunk)

        self._send_chunk(initial_chunk)
        self._response_timer.start(self._initial_response_timeout_s)

    @property
    def id(self) -> int:
        """Returns the identifier for the active transfer."""
        return self._session_id

    @property
    def resource_id(self) -> int:
        """Returns the identifier of the resource being transferred."""
        return self._resource_id

    @property
    @abc.abstractmethod
    def data(self) -> bytes:
        """Returns the data read or written in this transfer."""

    @abc.abstractmethod
    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
        """Sets fields for the initial non-handshake chunk of the transfer."""

    def _send_chunk(self, chunk: Chunk) -> None:
        """Sends a chunk to the server, keeping track of the last chunk sent."""
        self._send_chunk_fn(chunk)
        self._last_chunk = chunk

    async def handle_chunk(self, chunk: Chunk) -> None:
        """Processes an incoming chunk from the server.

        Handles terminating chunks (i.e. those with a status) and forwards
        non-terminating chunks to handle_data_chunk.
        """
        self._response_timer.stop()
        self._retries = 0  # Received data from service, so reset the retries.

        _LOG.debug('Received chunk\n%s', str(chunk.to_message()).rstrip())

        # Status chunks are only used to terminate a transfer. They do not
        # contain any data that requires processing.
        if chunk.status is not None:
            if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
                self._send_chunk(
                    Chunk(
                        self._configured_protocol_version,
                        Chunk.Type.COMPLETION_ACK,
                        session_id=self._session_id,
                    )
                )

            self.finish(Status(chunk.status))
            return

        if self._state is Transfer._State.INITIATING:
            await self._perform_initial_handshake(chunk)
        elif self._state is Transfer._State.TERMINATING:
            if chunk.type is Chunk.Type.COMPLETION_ACK:
                self.finish(self.status)
            else:
                # Expecting a completion ACK but didn't receive one. Go through
                # the retry process.
                self._on_timeout()
        # Only ignoring START_ACK, tests were unhappy with other non-data chunks
        elif chunk.type not in [Chunk.Type.START_ACK]:
            await self._handle_data_chunk(chunk)
        else:
            # The timer was stopped above, so fall through to restart it.
            # Returning here would leave the transfer without a watchdog, and a
            # lost START_ACK_CONFIRMATION would never be retried.
            _LOG.warning("Ignoring extra START_ACK chunk")

        # Start the timeout for the server to send a chunk in response. Skip
        # this if the chunk caused the transfer to finish, so that no stray
        # timer task outlives the transfer.
        if self._state is not Transfer._State.COMPLETE:
            self._response_timer.start()

    async def _perform_initial_handshake(self, chunk: Chunk) -> None:
        """Progresses the initial handshake phase of a v2+ transfer."""
        assert self._state is Transfer._State.INITIATING

        # If a non-handshake chunk is received during an INITIATING state, the
        # transfer server is running a legacy protocol version, which does not
        # perform a handshake. End the handshake, revert to the legacy protocol,
        # and process the chunk appropriately.
        if chunk.type is not Chunk.Type.START_ACK:
            _LOG.debug(
                'Transfer %d got non-handshake chunk, reverting to legacy',
                self.id,
            )

            if self._offset != 0:
                _LOG.error(
                    'Non-zero offset transfers not supported by legacy protocol'
                )
                self.finish(Status.INTERNAL)
                return

            self._configured_protocol_version = ProtocolVersion.LEGACY
            self._state = Transfer._State.WAITING

            # Update the transfer's session ID, which will map to the
            # transfer_id of the legacy chunk.
            self._session_id = chunk.session_id

            await self._handle_data_chunk(chunk)
            return

        # Use the highest protocol version supported by both ends.
        self._configured_protocol_version = ProtocolVersion(
            min(
                self._desired_protocol_version.value,
                chunk.protocol_version.value,
            )
        )
        _LOG.debug(
            'Transfer %d negotiating protocol version: ours=%d, theirs=%d',
            self.id,
            self._desired_protocol_version.value,
            chunk.protocol_version.value,
        )

        if self._offset != chunk.initial_offset:
            # If our offsets don't match, let user handle it
            self.finish(Status.UNIMPLEMENTED)
            return

        # Send a confirmation chunk to the server accepting the assigned session
        # ID and protocol version. Tag any initial transfer parameters onto the
        # chunk to begin the data transfer.
        start_ack_confirmation = Chunk(
            self._configured_protocol_version,
            Chunk.Type.START_ACK_CONFIRMATION,
            session_id=self._session_id,
            offset=self._offset,
        )

        self._set_initial_chunk_fields(start_ack_confirmation)

        self._state = Transfer._State.WAITING
        self._send_chunk(start_ack_confirmation)

    @abc.abstractmethod
    async def _handle_data_chunk(self, chunk: Chunk) -> None:
        """Handles a chunk that contains or requests data."""

    @abc.abstractmethod
    def _retry_after_data_timeout(self) -> None:
        """Retries after a timeout occurs during the data transfer phase.

        Only invoked when in the data transfer phase (i.e. state is in
        {WAITING, TRANSMITTING, RECOVERY}). Timeouts occurring during an
        opening or closing handshake are handled by the base Transfer.
        """

    def _on_timeout(self) -> None:
        """Handles a timeout while waiting for a chunk."""
        if self._state is Transfer._State.COMPLETE:
            return

        self._retries += 1
        self._lifetime_retries += 1

        if (
            self._retries > self._max_retries
            or self._lifetime_retries > self._max_lifetime_retries
        ):
            if self._state is Transfer._State.TERMINATING:
                # If the server never responded to the sent completion chunk,
                # simply end the transfer locally with its original status.
                self.finish(self.status)
            else:
                self.finish(Status.DEADLINE_EXCEEDED)
            return

        _LOG.debug(
            'Received no responses for %.3fs; retrying %d/%d',
            self._response_timer.timeout_s,
            self._retries,
            self._max_retries,
        )

        # Handshake chunks are resent verbatim by the base class. This includes
        # the START_ACK_CONFIRMATION, which is sent after the state has already
        # moved on to WAITING.
        retry_handshake_chunk = self._state in (
            Transfer._State.INITIATING,
            Transfer._State.TERMINATING,
        ) or (
            self._last_chunk is not None
            and self._last_chunk.type is Chunk.Type.START_ACK_CONFIRMATION
        )

        if retry_handshake_chunk:
            assert self._last_chunk is not None
            self._send_chunk(self._last_chunk)
        else:
            self._retry_after_data_timeout()

        # A resent START chunk gets the initial response timeout; everything
        # else gets the default one.
        if (
            self._last_chunk is not None
            and self._last_chunk.type is Chunk.Type.START
        ):
            self._response_timer.start(self._initial_response_timeout_s)
        else:
            self._response_timer.start()

    def finish(self, status: Status, skip_callback: bool = False) -> None:
        """Ends the transfer with the specified status.

        Args:
          status: Final status of the transfer.
          skip_callback: If True, the end_transfer function is not invoked.
        """
        self._response_timer.stop()
        self.status = status

        if status.ok():
            # Report the transfer as fully sent and received.
            total_size = len(self.data)
            self._update_progress(total_size, total_size, total_size)

        if not skip_callback:
            self._end_transfer(self)

        # Set done last so that the transfer has been fully cleaned up.
        self._state = Transfer._State.COMPLETE
        self.done.set()

    def _update_progress(
        self,
        bytes_sent: int,
        bytes_confirmed_received: int,
        total_size_bytes: int | None,
    ) -> None:
        """Invokes the provided progress callback, if any, with the progress."""

        stats = ProgressStats(
            bytes_sent, bytes_confirmed_received, total_size_bytes
        )
        _LOG.debug('Transfer %d progress: %s', self.id, stats)

        if self._progress_callback:
            self._progress_callback(stats)

    def _send_final_chunk(self, status: Status) -> None:
        """Sends a status chunk to the server and finishes the transfer."""
        self._send_chunk(
            Chunk(
                self._configured_protocol_version,
                Chunk.Type.COMPLETION,
                session_id=self.id,
                status=status,
            )
        )

        if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
            # Wait for a completion ACK from the server.
            self.status = status
            self._state = Transfer._State.TERMINATING
            self._response_timer.start()
        else:
            self.finish(status)


class WriteTransfer(Transfer):
    """A client -> server write transfer."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        session_id: int,
        resource_id: int,
        data: bytes,
        send_chunk: Callable[[Chunk], None],
        end_transfer: Callable[[Transfer], None],
        response_timeout_s: float,
        initial_response_timeout_s: float,
        max_retries: int,
        max_lifetime_retries: int,
        protocol_version: ProtocolVersion,
        progress_callback: ProgressCallback | None = None,
        initial_offset: int = 0,
    ):
        """Initializes a write transfer.

        Args:
          data: The bytes to write, beginning at initial_offset.

        The remaining arguments are forwarded unchanged to Transfer.__init__.
        """
        super().__init__(
            session_id,
            resource_id,
            send_chunk,
            end_transfer,
            response_timeout_s,
            initial_response_timeout_s,
            max_retries,
            max_lifetime_retries,
            protocol_version,
            progress_callback,
            initial_offset=initial_offset,
        )
        self._data = data
        self.initial_offset = initial_offset

        self._window_end_offset = 0
        # NOTE(review): this stays 0 until a parameters chunk provides
        # max_chunk_size_bytes, in which case _next_chunk() produces empty
        # chunks. Confirm that servers always set the field.
        self._max_chunk_size = 0
        self._chunk_delay_us: int | None = None

        # The window ID increments for each parameters update.
        self._window_id = 0

        self._bytes_confirmed_received = 0

        # Strong references to in-flight transmit tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced task could be
        # garbage collected before it finishes.
        self._transmit_tasks: set[asyncio.Task[Any]] = set()

    @property
    def data(self) -> bytes:
        """Returns the data being written in this transfer."""
        return self._data

    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
        # Nothing to tag onto the initial chunk in a write transfer.
        pass

    def _schedule_transmit(
        self, window_id: int, timeout_us: int | None = None
    ) -> None:
        """Schedules _transmit_next_chunk as a task and retains a reference."""
        task = asyncio.create_task(
            self._transmit_next_chunk(window_id, timeout_us=timeout_us)
        )
        self._transmit_tasks.add(task)
        task.add_done_callback(self._transmit_tasks.discard)

    async def _handle_data_chunk(self, chunk: Chunk) -> None:
        """Processes an incoming chunk from the server.

        In a write transfer, the server only sends transfer parameter updates
        to the client. When a message is received, update local parameters and
        send data accordingly.
        """

        # A parameters update received mid-window interrupts the transmission.
        if self._state is Transfer._State.TRANSMITTING:
            self._state = Transfer._State.WAITING

        assert self._state is Transfer._State.WAITING

        if not self._handle_parameters_update(chunk):
            return

        self._bytes_confirmed_received = chunk.offset
        self._state = Transfer._State.TRANSMITTING

        # Bumping the window ID makes any transmit task that is still pending
        # for the previous window stop when it next runs.
        self._window_id += 1
        self._schedule_transmit(self._window_id)

    async def _transmit_next_chunk(
        self, window_id: int, timeout_us: int | None = None
    ) -> None:
        """Transmits a single data chunk to the server.

        If the chunk completes the active window, returns to a WAITING state.
        Otherwise, schedules another transmission for the next chunk.

        Args:
          window_id: ID of the window this transmission belongs to.
          timeout_us: Optional delay, in microseconds, before transmitting.
        """
        if timeout_us is not None:
            await asyncio.sleep(timeout_us / 1e6)

        if self._state is not Transfer._State.TRANSMITTING:
            return

        if window_id != self._window_id:
            _LOG.debug('Transfer %d: Skipping stale window', self.id)
            return

        chunk = self._next_chunk()
        self._offset += len(chunk.data)

        sent_requested_bytes = self._offset == self._window_end_offset

        self._send_chunk(chunk)
        self._update_progress(
            self._offset,
            self._bytes_confirmed_received,
            len(self.data) + self.initial_offset,
        )

        if sent_requested_bytes:
            self._state = Transfer._State.WAITING
        else:
            self._schedule_transmit(window_id, timeout_us=self._chunk_delay_us)

    def _handle_parameters_update(self, chunk: Chunk) -> bool:
        """Updates transfer state based on a transfer parameters update.

        Returns:
          True if data should be transmitted in response to the chunk; False if
          the chunk was ignored or caused the transfer to be terminated.
        """

        if chunk.offset > len(self.data) + self.initial_offset:
            # Bad offset; terminate the transfer.
            _LOG.error(
                'Transfer %d: server requested invalid offset %d (size %d)',
                self.id,
                chunk.offset,
                len(self.data) + self.initial_offset,
            )

            self._send_final_chunk(Status.OUT_OF_RANGE)
            return False

        if chunk.offset == chunk.window_end_offset:
            _LOG.error(
                'Transfer %d: service requested 0 bytes (invalid); aborting',
                self.id,
            )
            self._send_final_chunk(Status.INTERNAL)
            return False

        # Extend the window to the new end offset specified by the server.
        self._window_end_offset = min(
            chunk.window_end_offset, len(self.data) + self.initial_offset
        )

        if chunk.requests_transmission_from_offset():
            # Check whether the client has sent a previous data offset, which
            # indicates that some chunks were lost in transmission.
            if chunk.offset < self._offset:
                _LOG.debug(
                    'Write transfer %d rolling back: offset %d from %d',
                    self.id,
                    chunk.offset,
                    self._offset,
                )

            self._offset = chunk.offset
        elif (
            chunk.type is Chunk.Type.PARAMETERS_CONTINUE
            and chunk.window_end_offset <= self._offset
        ):
            _LOG.debug(
                'Write transfer %d ignoring old rolling window chunk',
                self.id,
            )
            return False

        if chunk.max_chunk_size_bytes is not None:
            self._max_chunk_size = chunk.max_chunk_size_bytes

        if chunk.min_delay_microseconds is not None:
            self._chunk_delay_us = chunk.min_delay_microseconds

        return True

    def _retry_after_data_timeout(self) -> None:
        """Resends the last chunk if waiting for the server to respond."""
        if (
            self._state is Transfer._State.WAITING
            and self._last_chunk is not None
        ):
            self._send_chunk(self._last_chunk)

    def _next_chunk(self) -> Chunk:
        """Returns the next Chunk message to send in the data transfer."""
        chunk = Chunk(
            self._configured_protocol_version,
            Chunk.Type.DATA,
            session_id=self.id,
            offset=self._offset,
        )

        max_bytes_in_chunk = min(
            self._max_chunk_size, self._window_end_offset - self._offset
        )

        # Transfer offsets include initial_offset, while self.data starts at
        # index 0, so convert to an index into self.data.
        data_start = self._offset - self.initial_offset
        chunk.data = self.data[data_start : data_start + max_bytes_in_chunk]

        # Mark the final chunk of the transfer.
        if (
            len(self.data) - self._offset + self.initial_offset
            <= max_bytes_in_chunk
        ):
            chunk.remaining_bytes = 0

        return chunk


class ReadTransfer(Transfer):
    """A client <- server read transfer.

    Although Python can effectively handle an unlimited transfer window, this
    client sets a conservative window and chunk size to avoid overloading the
    device. These are configurable in the constructor.
    """

    # pylint: disable=too-many-instance-attributes

    # The fractional position within a window at which a receive transfer should
    # extend its window size to minimize the amount of time the transmitter
    # spends blocked.
    #
    # For example, a divisor of 2 will extend the window when half of the
    # requested data has been received, a divisor of three will extend at a
    # third of the window, and so on.
    EXTEND_WINDOW_DIVISOR = 2

    # Slow start and congestion avoidance are analogues to the equally named
    # phases in TCP congestion control.
    class _TransmitPhase(enum.Enum):
        SLOW_START = 0
        CONGESTION_AVOIDANCE = 1

    # The type of data transmission the transfer is requesting.
    class _TransmitAction(enum.Enum):
        # Immediate parameters sent at the start of a new transfer for legacy
        # compatibility.
        BEGIN = 0

        # Initial parameters chunk following the opening handshake.
        FIRST_PARAMETERS = 1

        # Extend the current transmission window.
        EXTEND = 2

        # Rewind the transfer to a certain offset following data loss.
        RETRANSMIT = 3

    def __init__(  # pylint: disable=too-many-arguments
        self,
        session_id: int,
        resource_id: int,
        send_chunk: Callable[[Chunk], None],
        end_transfer: Callable[[Transfer], None],
        response_timeout_s: float,
        initial_response_timeout_s: float,
        max_retries: int,
        max_lifetime_retries: int,
        protocol_version: ProtocolVersion,
        max_window_size_bytes: int = 32768,
        max_chunk_size: int = 1024,
        chunk_delay_us: int | None = None,
        progress_callback: ProgressCallback | None = None,
        initial_offset: int = 0,
    ):
        """Initializes a read transfer.

        Args:
          max_window_size_bytes: Upper bound on the receive window size.
          max_chunk_size: Maximum chunk size requested from the server; also
            the unit by which the window grows and shrinks.
          chunk_delay_us: Optional minimum delay between chunks requested from
            the server, in microseconds.

        The remaining arguments are forwarded unchanged to Transfer.__init__.
        """
        super().__init__(
            session_id,
            resource_id,
            send_chunk,
            end_transfer,
            response_timeout_s,
            initial_response_timeout_s,
            max_retries,
            max_lifetime_retries,
            protocol_version,
            progress_callback,
            initial_offset=initial_offset,
        )
        self._max_window_size_bytes = max_window_size_bytes
        self._max_chunk_size = max_chunk_size
        self._chunk_delay_us = chunk_delay_us

        self._remaining_transfer_size: int | None = None
        self._data = bytearray()
        self._window_end_offset = max_chunk_size
        # The window size is max_chunk_size * multiplier, capped at
        # max_window_size_bytes.
        self._window_size_multiplier = 1
        self._window_size = self._max_chunk_size * self._window_size_multiplier
        self._transmit_phase = ReadTransfer._TransmitPhase.SLOW_START
        self._last_chunk_offset: int | None = None

    @property
    def data(self) -> bytes:
        """Returns an immutable copy of the data that has been read."""
        return bytes(self._data)

    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
        """Tags the initial transfer parameters onto the opening chunk."""
        self._update_and_set_transfer_parameters(
            chunk, ReadTransfer._TransmitAction.BEGIN
        )

    async def _handle_data_chunk(self, chunk: Chunk) -> None:
        """Processes an incoming chunk from the server.

        In a read transfer, the client receives data chunks from the server.
        Once all pending data is received, the transfer parameters are updated.
        """

        if self._state is Transfer._State.RECOVERY:
            if chunk.offset != self._offset:
                if self._last_chunk_offset == chunk.offset:
                    _LOG.debug(
                        'Transfer %d received repeated offset %d: '
                        'retry detected, resending transfer parameters',
                        self.id,
                        chunk.offset,
                    )
                    self._send_chunk(
                        self._transfer_parameters(
                            ReadTransfer._TransmitAction.RETRANSMIT
                        )
                    )
                else:
                    _LOG.debug(
                        'Transfer %d waiting for offset %d, ignoring %d',
                        self.id,
                        self._offset,
                        chunk.offset,
                    )
                    self._last_chunk_offset = chunk.offset
                return

            _LOG.info(
                'Transfer %d received expected offset %d, resuming transfer',
                self.id,
                chunk.offset,
            )
            self._state = Transfer._State.WAITING

        assert self._state is Transfer._State.WAITING

        if chunk.offset != self._offset:
            if chunk.offset + len(chunk.data) <= self._offset:
                # If the chunk's data has already been received, don't go
                # through a full recovery cycle to avoid shrinking the window
                # size and potentially thrashing. The expected data may already
                # be in-flight, so just allow the transmitter to keep going with
                # a CONTINUE parameters chunk.
                _LOG.debug(
                    'Transfer %d received duplicate chunk with offset %d',
                    self.id,
                    chunk.offset,
                )
                self._send_chunk(
                    self._transfer_parameters(
                        ReadTransfer._TransmitAction.EXTEND,
                        update=False,
                    )
                )
            else:
                # Initially, the transfer service only supports in-order
                # transfers. If data is received out of order, request that the
                # server retransmit from the previous offset.
                _LOG.debug(
                    'Transfer %d expected offset %d, received %d: '
                    'entering recovery state',
                    self.id,
                    self._offset,
                    chunk.offset,
                )
                self._state = Transfer._State.RECOVERY

                self._send_chunk(
                    self._transfer_parameters(
                        ReadTransfer._TransmitAction.RETRANSMIT
                    )
                )
            return

        self._data += chunk.data
        self._offset += len(chunk.data)

        # Update the last offset seen so that retries can be detected.
        self._last_chunk_offset = chunk.offset

        if chunk.remaining_bytes is not None:
            if chunk.remaining_bytes == 0:
                # No more data to read. Acknowledge receipt and finish.
                self._send_final_chunk(Status.OK)
                return

            # The server may indicate if the amount of remaining data is known.
            self._remaining_transfer_size = chunk.remaining_bytes
        elif self._remaining_transfer_size is not None:
            # Update the remaining transfer size, if it is known.
            self._remaining_transfer_size -= len(chunk.data)

            # If the transfer size drops to zero, the estimate was inaccurate.
            if self._remaining_transfer_size <= 0:
                self._remaining_transfer_size = None

        total_size = (
            None
            if self._remaining_transfer_size is None
            else (self._remaining_transfer_size + self._offset)
        )
        self._update_progress(self._offset, self._offset, total_size)

        if chunk.window_end_offset != 0:
            if chunk.window_end_offset < self._offset:
                _LOG.error(
                    'Transfer %d: transmitter sent invalid earlier end offset '
                    '%d (receiver offset %d)',
                    self.id,
                    chunk.window_end_offset,
                    self._offset,
                )
                self._send_final_chunk(Status.INTERNAL)
                return

            if chunk.window_end_offset > self._window_end_offset:
                _LOG.error(
                    'Transfer %d: transmitter sent invalid later end offset '
                    '%d (receiver end offset %d)',
                    self.id,
                    chunk.window_end_offset,
                    self._window_end_offset,
                )
                self._send_final_chunk(Status.INTERNAL)
                return

            self._window_end_offset = chunk.window_end_offset

        if self._offset == self._window_end_offset:
            # All pending data was received. Send out a new parameters chunk for
            # the next block.
            self._send_chunk(
                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
            )
            return

        # Extend the window early, once the remaining portion of the window
        # drops to the configured fraction of the window size.
        remaining_window_size = self._window_end_offset - self._offset
        extend_window = (
            remaining_window_size
            <= self._window_size / ReadTransfer.EXTEND_WINDOW_DIVISOR
        )

        if extend_window:
            self._send_chunk(
                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
            )

    def _retry_after_data_timeout(self) -> None:
        """Requests retransmission from the current offset after a timeout."""
        if (
            self._state is Transfer._State.WAITING
            or self._state is Transfer._State.RECOVERY
        ):
            self._send_chunk(
                self._transfer_parameters(
                    ReadTransfer._TransmitAction.RETRANSMIT
                )
            )

    def _set_transfer_parameters(
        self,
        chunk: Chunk,
    ) -> None:
        """Copies the current transfer parameters onto the chunk."""
        chunk.offset = self._offset
        chunk.window_end_offset = self._window_end_offset
        chunk.max_chunk_size_bytes = self._max_chunk_size

        # A delay of None or 0 is not sent.
        if self._chunk_delay_us:
            chunk.min_delay_microseconds = self._chunk_delay_us

    def _update_and_set_transfer_parameters(
        self, chunk: Chunk, action: 'ReadTransfer._TransmitAction'
    ) -> None:
        """Resizes the window for the action and sets the chunk's parameters.

        Args:
          chunk: Chunk onto which the updated parameters are written.
          action: The transmission being requested. EXTEND grows the window,
            RETRANSMIT shrinks it and other actions leave its multiplier
            unchanged.
        """
        if action is ReadTransfer._TransmitAction.EXTEND:
            # Window was received successfully without packet loss and should
            # grow. Double the window size during slow start, or increase it by
            # a single chunk in congestion avoidance.
            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
                self._window_size_multiplier *= 2
            else:
                self._window_size_multiplier += 1

            # The window size can never exceed the user-specified maximum bytes.
            # If it does, reduce the multiplier to the largest size that fits.
            if (
                self._window_size_multiplier * self._max_chunk_size
                > self._max_window_size_bytes
            ):
                # Never let the multiplier reach zero. When the maximum window
                # is smaller than one chunk, a zero multiplier would request a
                # zero-byte window. The min() below still caps the window at
                # the maximum size.
                self._window_size_multiplier = max(
                    self._max_window_size_bytes // self._max_chunk_size, 1
                )

        elif action is ReadTransfer._TransmitAction.RETRANSMIT:
            # A packet was lost: shrink the window size. Additionally, after the
            # first packet loss, transition from the slow start to the
            # congestion avoidance phase of the transfer.
            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
                self._transmit_phase = (
                    ReadTransfer._TransmitPhase.CONGESTION_AVOIDANCE
                )
            self._window_size_multiplier = max(
                self._window_size_multiplier // 2, 1
            )

        self._window_size = min(
            self._max_chunk_size * self._window_size_multiplier,
            self._max_window_size_bytes,
        )

        self._window_end_offset = self._offset + self._window_size
        self._set_transfer_parameters(chunk)

    def _transfer_parameters(
        self,
        action: 'ReadTransfer._TransmitAction',
        update: bool = True,
    ) -> Chunk:
        """Returns an updated transfer parameters chunk.

        Args:
          action: The transmission being requested; selects the chunk type.
          update: If True, the window is resized for the action before the
            parameters are set. If False, the current parameters are sent
            unchanged.
        """

        if action is ReadTransfer._TransmitAction.BEGIN:
            chunk_type = Chunk.Type.START
        elif action is ReadTransfer._TransmitAction.EXTEND:
            chunk_type = Chunk.Type.PARAMETERS_CONTINUE
        else:
            chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT

        chunk = Chunk(
            self._configured_protocol_version, chunk_type, session_id=self.id
        )

        if update:
            self._update_and_set_transfer_parameters(chunk, action)
        else:
            self._set_transfer_parameters(chunk)

        return chunk