1# Copyright 2024 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Classes for read and write transfers.""" 15 16import abc 17import asyncio 18from dataclasses import dataclass 19import enum 20import logging 21import math 22import threading 23from typing import Any, Callable 24 25from pw_status import Status 26from pw_transfer.chunk import Chunk, ProtocolVersion 27 28_LOG = logging.getLogger(__package__) 29 30 31@dataclass(frozen=True) 32class ProgressStats: 33 bytes_sent: int 34 bytes_confirmed_received: int 35 total_size_bytes: int | None 36 37 def percent_received(self) -> float: 38 if self.total_size_bytes is None or self.total_size_bytes == 0: 39 return math.nan 40 41 return self.bytes_confirmed_received / self.total_size_bytes * 100 42 43 def __str__(self) -> str: 44 total = ( 45 str(self.total_size_bytes) if self.total_size_bytes else 'unknown' 46 ) 47 return ( 48 f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, ' 49 f'{self.bytes_confirmed_received} B received of {total} B)' 50 ) 51 52 53ProgressCallback = Callable[[ProgressStats], Any] 54 55 56class _Timer: 57 """A timer which invokes a callback after a certain timeout.""" 58 59 def __init__(self, timeout_s: float, callback: Callable[[], Any]): 60 self.timeout_s = timeout_s 61 self._callback = callback 62 self._task: asyncio.Task[Any] | None = None 63 64 def start(self, timeout_s: float | None = None) -> None: 65 """Starts a new timer. 66 67 If a timer is already running, it is stopped and a new timer started. 68 This can be used to implement watchdog-like behavior, where a callback 69 is invoked after some time without a kick. 70 """ 71 self.stop() 72 timeout_s = self.timeout_s if timeout_s is None else timeout_s 73 self._task = asyncio.create_task(self._run(timeout_s)) 74 75 def stop(self) -> None: 76 """Terminates a running timer.""" 77 if self._task is not None: 78 self._task.cancel() 79 self._task = None 80 81 async def _run(self, timeout_s: float) -> None: 82 await asyncio.sleep(timeout_s) 83 self._task = None 84 self._callback() 85 86 87class Transfer(abc.ABC): 88 """A client-side data transfer through a Manager. 89 90 Subclasses are responsible for implementing all of the logic for their type 91 of transfer, receiving messages from the server and sending the appropriate 92 messages in response. 93 """ 94 95 # pylint: disable=too-many-instance-attributes 96 97 class _State(enum.Enum): 98 # Transfer is starting. The server and client are performing an initial 99 # handshake and negotiating protocol and feature flags. 100 INITIATING = 0 101 102 # Waiting for the other end to send a chunk. 103 WAITING = 1 104 105 # Transmitting a window of data to a receiver. 106 TRANSMITTING = 2 107 108 # Recovering after one or more chunks was dropped in an active transfer. 109 RECOVERY = 3 110 111 # Transfer has completed locally and is waiting for the peer to 112 # acknowledge its final status. Only entered by the terminating side of 113 # the transfer. 114 # 115 # The context remains in a TERMINATING state until it receives an 116 # acknowledgement from the peer or times out. 117 TERMINATING = 4 118 119 # A transfer has fully completed. 120 COMPLETE = 5 121 122 def __init__( # pylint: disable=too-many-arguments 123 self, 124 session_id: int, 125 resource_id: int, 126 send_chunk: Callable[[Chunk], None], 127 end_transfer: Callable[['Transfer'], None], 128 response_timeout_s: float, 129 initial_response_timeout_s: float, 130 max_retries: int, 131 max_lifetime_retries: int, 132 protocol_version: ProtocolVersion, 133 progress_callback: ProgressCallback | None = None, 134 initial_offset: int = 0, 135 ): 136 self.status = Status.OK 137 self.done = threading.Event() 138 139 self._session_id = session_id 140 self._resource_id = resource_id 141 self._offset = initial_offset 142 143 self._send_chunk_fn = send_chunk 144 self._end_transfer = end_transfer 145 146 self._desired_protocol_version = protocol_version 147 self._configured_protocol_version = ProtocolVersion.UNKNOWN 148 149 if self._desired_protocol_version is ProtocolVersion.LEGACY: 150 # In a legacy transfer, there is no protocol negotiation stage. 151 # Automatically configure the context to run the legacy protocol and 152 # proceed to waiting for a chunk. 153 self._configured_protocol_version = ProtocolVersion.LEGACY 154 self._state = Transfer._State.WAITING 155 self._session_id = self._resource_id 156 else: 157 self._state = Transfer._State.INITIATING 158 159 self._last_chunk: Chunk | None = None 160 161 self._retries = 0 162 self._max_retries = max_retries 163 self._lifetime_retries = 0 164 self._max_lifetime_retries = max_lifetime_retries 165 self._response_timer = _Timer(response_timeout_s, self._on_timeout) 166 self._initial_response_timeout_s = initial_response_timeout_s 167 168 self._progress_callback = progress_callback 169 170 async def begin(self) -> None: 171 """Sends the initial chunk of the transfer.""" 172 173 if ( 174 self._desired_protocol_version is ProtocolVersion.UNKNOWN 175 or self._desired_protocol_version.value 176 > ProtocolVersion.LATEST.value 177 ): 178 _LOG.error( 179 'Cannot start a transfer with unsupported protocol version %d', 180 self._desired_protocol_version.value, 181 ) 182 self.finish(Status.INVALID_ARGUMENT) 183 return 184 185 initial_chunk = Chunk( 186 self._desired_protocol_version, 187 Chunk.Type.START, 188 resource_id=self._resource_id, 189 ) 190 191 if self._offset != 0: 192 initial_chunk.initial_offset = self._offset 193 194 if self._desired_protocol_version is ProtocolVersion.VERSION_TWO: 195 initial_chunk.desired_session_id = self._session_id 196 197 # Regardless of the desired protocol version, set any additional fields 198 # on the opening chunk, in case the server only runs legacy. 199 self._set_initial_chunk_fields(initial_chunk) 200 201 self._send_chunk(initial_chunk) 202 self._response_timer.start(self._initial_response_timeout_s) 203 204 @property 205 def id(self) -> int: 206 """Returns the identifier for the active transfer.""" 207 return self._session_id 208 209 @property 210 def resource_id(self) -> int: 211 """Returns the identifier of the resource being transferred.""" 212 return self._resource_id 213 214 @property 215 @abc.abstractmethod 216 def data(self) -> bytes: 217 """Returns the data read or written in this transfer.""" 218 219 @abc.abstractmethod 220 def _set_initial_chunk_fields(self, chunk: Chunk) -> None: 221 """Sets fields for the initial non-handshake chunk of the transfer.""" 222 223 def _send_chunk(self, chunk: Chunk) -> None: 224 """Sends a chunk to the server, keeping track of the last chunk sent.""" 225 self._send_chunk_fn(chunk) 226 self._last_chunk = chunk 227 228 async def handle_chunk(self, chunk: Chunk) -> None: 229 """Processes an incoming chunk from the server. 230 231 Handles terminating chunks (i.e. those with a status) and forwards 232 non-terminating chunks to handle_data_chunk. 233 """ 234 self._response_timer.stop() 235 self._retries = 0 # Received data from service, so reset the retries. 236 237 _LOG.debug('Received chunk\n%s', str(chunk.to_message()).rstrip()) 238 239 # Status chunks are only used to terminate a transfer. They do not 240 # contain any data that requires processing. 241 if chunk.status is not None: 242 if self._configured_protocol_version is ProtocolVersion.VERSION_TWO: 243 self._send_chunk( 244 Chunk( 245 self._configured_protocol_version, 246 Chunk.Type.COMPLETION_ACK, 247 session_id=self._session_id, 248 ) 249 ) 250 251 self.finish(Status(chunk.status)) 252 return 253 254 if self._state is Transfer._State.INITIATING: 255 await self._perform_initial_handshake(chunk) 256 elif self._state is Transfer._State.TERMINATING: 257 if chunk.type is Chunk.Type.COMPLETION_ACK: 258 self.finish(self.status) 259 else: 260 # Expecting a completion ACK but didn't receive one. Go through 261 # the retry process. 262 self._on_timeout() 263 # Only ignoring START_ACK, tests were unhappy with other non-data chunks 264 elif chunk.type not in [Chunk.Type.START_ACK]: 265 await self._handle_data_chunk(chunk) 266 else: 267 _LOG.warning("Ignoring extra START_ACK chunk") 268 return 269 270 # Start the timeout for the server to send a chunk in response. 271 self._response_timer.start() 272 273 async def _perform_initial_handshake(self, chunk: Chunk) -> None: 274 """Progresses the initial handshake phase of a v2+ transfer.""" 275 assert self._state is Transfer._State.INITIATING 276 277 # If a non-handshake chunk is received during an INITIATING state, the 278 # transfer server is running a legacy protocol version, which does not 279 # perform a handshake. End the handshake, revert to the legacy protocol, 280 # and process the chunk appropriately. 281 if chunk.type is not Chunk.Type.START_ACK: 282 _LOG.debug( 283 'Transfer %d got non-handshake chunk, reverting to legacy', 284 self.id, 285 ) 286 287 if self._offset != 0: 288 _LOG.error( 289 'Non-zero offset transfers not supported by legacy protocol' 290 ) 291 self.finish(Status.INTERNAL) 292 return 293 294 self._configured_protocol_version = ProtocolVersion.LEGACY 295 self._state = Transfer._State.WAITING 296 297 # Update the transfer's session ID, which will map to the 298 # transfer_id of the legacy chunk. 299 self._session_id = chunk.session_id 300 301 await self._handle_data_chunk(chunk) 302 return 303 304 self._configured_protocol_version = ProtocolVersion( 305 min( 306 self._desired_protocol_version.value, 307 chunk.protocol_version.value, 308 ) 309 ) 310 _LOG.debug( 311 'Transfer %d negotiating protocol version: ours=%d, theirs=%d', 312 self.id, 313 self._desired_protocol_version.value, 314 chunk.protocol_version.value, 315 ) 316 317 if self._offset != chunk.initial_offset: 318 # If our offsets don't match, let user handle it 319 self.finish(Status.UNIMPLEMENTED) 320 return 321 322 # Send a confirmation chunk to the server accepting the assigned session 323 # ID and protocol version. Tag any initial transfer parameters onto the 324 # chunk to begin the data transfer. 325 start_ack_confirmation = Chunk( 326 self._configured_protocol_version, 327 Chunk.Type.START_ACK_CONFIRMATION, 328 session_id=self._session_id, 329 offset=self._offset, 330 ) 331 332 self._set_initial_chunk_fields(start_ack_confirmation) 333 334 self._state = Transfer._State.WAITING 335 self._send_chunk(start_ack_confirmation) 336 337 @abc.abstractmethod 338 async def _handle_data_chunk(self, chunk: Chunk) -> None: 339 """Handles a chunk that contains or requests data.""" 340 341 @abc.abstractmethod 342 def _retry_after_data_timeout(self) -> None: 343 """Retries after a timeout occurs during the data transfer phase. 344 345 Only invoked when in the data transfer phase (i.e. state is in 346 {WAITING, TRANSMITTING, RECOVERY}). Timeouts occurring during an 347 opening or closing handshake are handled by the base Transfer. 348 """ 349 350 def _on_timeout(self) -> None: 351 """Handles a timeout while waiting for a chunk.""" 352 if self._state is Transfer._State.COMPLETE: 353 return 354 355 self._retries += 1 356 self._lifetime_retries += 1 357 358 if ( 359 self._retries > self._max_retries 360 or self._lifetime_retries > self._max_lifetime_retries 361 ): 362 if self._state is Transfer._State.TERMINATING: 363 # If the server never responded to the sent completion chunk, 364 # simply end the transfer locally with its original status. 365 self.finish(self.status) 366 else: 367 self.finish(Status.DEADLINE_EXCEEDED) 368 return 369 370 _LOG.debug( 371 'Received no responses for %.3fs; retrying %d/%d', 372 self._response_timer.timeout_s, 373 self._retries, 374 self._max_retries, 375 ) 376 377 retry_handshake_chunk = self._state in ( 378 Transfer._State.INITIATING, 379 Transfer._State.TERMINATING, 380 ) or ( 381 self._last_chunk is not None 382 and self._last_chunk.type is Chunk.Type.START_ACK_CONFIRMATION 383 ) 384 385 if retry_handshake_chunk: 386 assert self._last_chunk is not None 387 self._send_chunk(self._last_chunk) 388 else: 389 self._retry_after_data_timeout() 390 391 self._response_timer.start() 392 393 def finish(self, status: Status, skip_callback: bool = False) -> None: 394 """Ends the transfer with the specified status.""" 395 self._response_timer.stop() 396 self.status = status 397 398 if status.ok(): 399 total_size = len(self.data) 400 self._update_progress(total_size, total_size, total_size) 401 402 if not skip_callback: 403 self._end_transfer(self) 404 405 # Set done last so that the transfer has been fully cleaned up. 406 self._state = Transfer._State.COMPLETE 407 self.done.set() 408 409 def _update_progress( 410 self, 411 bytes_sent: int, 412 bytes_confirmed_received: int, 413 total_size_bytes: int | None, 414 ) -> None: 415 """Invokes the provided progress callback, if any, with the progress.""" 416 417 stats = ProgressStats( 418 bytes_sent, bytes_confirmed_received, total_size_bytes 419 ) 420 _LOG.debug('Transfer %d progress: %s', self.id, stats) 421 422 if self._progress_callback: 423 self._progress_callback(stats) 424 425 def _send_final_chunk(self, status: Status) -> None: 426 """Sends a status chunk to the server and finishes the transfer.""" 427 self._send_chunk( 428 Chunk( 429 self._configured_protocol_version, 430 Chunk.Type.COMPLETION, 431 session_id=self.id, 432 status=status, 433 ) 434 ) 435 436 if self._configured_protocol_version is ProtocolVersion.VERSION_TWO: 437 # Wait for a completion ACK from the server. 438 self.status = status 439 self._state = Transfer._State.TERMINATING 440 self._response_timer.start() 441 else: 442 self.finish(status) 443 444 445class WriteTransfer(Transfer): 446 """A client -> server write transfer.""" 447 448 def __init__( # pylint: disable=too-many-arguments 449 self, 450 session_id: int, 451 resource_id: int, 452 data: bytes, 453 send_chunk: Callable[[Chunk], None], 454 end_transfer: Callable[[Transfer], None], 455 response_timeout_s: float, 456 initial_response_timeout_s: float, 457 max_retries: int, 458 max_lifetime_retries: int, 459 protocol_version: ProtocolVersion, 460 progress_callback: ProgressCallback | None = None, 461 initial_offset: int = 0, 462 ): 463 super().__init__( 464 session_id, 465 resource_id, 466 send_chunk, 467 end_transfer, 468 response_timeout_s, 469 initial_response_timeout_s, 470 max_retries, 471 max_lifetime_retries, 472 protocol_version, 473 progress_callback, 474 initial_offset=initial_offset, 475 ) 476 self._data = data 477 self.initial_offset = initial_offset 478 479 self._window_end_offset = 0 480 self._max_chunk_size = 0 481 self._chunk_delay_us: int | None = None 482 483 # The window ID increments for each parameters update. 484 self._window_id = 0 485 486 self._bytes_confirmed_received = 0 487 488 @property 489 def data(self) -> bytes: 490 return self._data 491 492 def _set_initial_chunk_fields(self, chunk: Chunk) -> None: 493 # Nothing to tag onto the initial chunk in a write transfer. 494 pass 495 496 async def _handle_data_chunk(self, chunk: Chunk) -> None: 497 """Processes an incoming chunk from the server. 498 499 In a write transfer, the server only sends transfer parameter updates 500 to the client. When a message is received, update local parameters and 501 send data accordingly. 502 """ 503 504 if self._state is Transfer._State.TRANSMITTING: 505 self._state = Transfer._State.WAITING 506 507 assert self._state is Transfer._State.WAITING 508 509 if not self._handle_parameters_update(chunk): 510 return 511 512 self._bytes_confirmed_received = chunk.offset 513 self._state = Transfer._State.TRANSMITTING 514 515 self._window_id += 1 516 asyncio.create_task(self._transmit_next_chunk(self._window_id)) 517 518 async def _transmit_next_chunk( 519 self, window_id: int, timeout_us: int | None = None 520 ) -> None: 521 """Transmits a single data chunk to the server. 522 523 If the chunk completes the active window, returns to a WAITING state. 524 Otherwise, schedules another transmission for the next chunk. 525 """ 526 if timeout_us is not None: 527 await asyncio.sleep(timeout_us / 1e6) 528 529 if self._state is not Transfer._State.TRANSMITTING: 530 return 531 532 if window_id != self._window_id: 533 _LOG.debug('Transfer %d: Skipping stale window', self.id) 534 return 535 536 chunk = self._next_chunk() 537 self._offset += len(chunk.data) 538 539 sent_requested_bytes = self._offset == self._window_end_offset 540 541 self._send_chunk(chunk) 542 self._update_progress( 543 self._offset, 544 self._bytes_confirmed_received, 545 len(self.data) + self.initial_offset, 546 ) 547 548 if sent_requested_bytes: 549 self._state = Transfer._State.WAITING 550 else: 551 asyncio.create_task( 552 self._transmit_next_chunk( 553 window_id, timeout_us=self._chunk_delay_us 554 ) 555 ) 556 557 def _handle_parameters_update(self, chunk: Chunk) -> bool: 558 """Updates transfer state based on a transfer parameters update.""" 559 560 if chunk.offset > len(self.data) + self.initial_offset: 561 # Bad offset; terminate the transfer. 562 _LOG.error( 563 'Transfer %d: server requested invalid offset %d (size %d)', 564 self.id, 565 chunk.offset, 566 len(self.data) + self.initial_offset, 567 ) 568 569 self._send_final_chunk(Status.OUT_OF_RANGE) 570 return False 571 572 if chunk.offset == chunk.window_end_offset: 573 _LOG.error( 574 'Transfer %d: service requested 0 bytes (invalid); aborting', 575 self.id, 576 ) 577 self._send_final_chunk(Status.INTERNAL) 578 return False 579 580 # Extend the window to the new end offset specified by the server. 581 self._window_end_offset = min( 582 chunk.window_end_offset, len(self.data) + self.initial_offset 583 ) 584 585 if chunk.requests_transmission_from_offset(): 586 # Check whether the client has sent a previous data offset, which 587 # indicates that some chunks were lost in transmission. 588 if chunk.offset < self._offset: 589 _LOG.debug( 590 'Write transfer %d rolling back: offset %d from %d', 591 self.id, 592 chunk.offset, 593 self._offset, 594 ) 595 596 self._offset = chunk.offset 597 598 if chunk.max_chunk_size_bytes is not None: 599 self._max_chunk_size = chunk.max_chunk_size_bytes 600 601 if chunk.min_delay_microseconds is not None: 602 self._chunk_delay_us = chunk.min_delay_microseconds 603 604 return True 605 606 def _retry_after_data_timeout(self) -> None: 607 if ( 608 self._state is Transfer._State.WAITING 609 and self._last_chunk is not None 610 ): 611 self._send_chunk(self._last_chunk) 612 613 def _next_chunk(self) -> Chunk: 614 """Returns the next Chunk message to send in the data transfer.""" 615 chunk = Chunk( 616 self._configured_protocol_version, 617 Chunk.Type.DATA, 618 session_id=self.id, 619 offset=self._offset, 620 ) 621 622 max_bytes_in_chunk = min( 623 self._max_chunk_size, self._window_end_offset - self._offset 624 ) 625 chunk.data = self.data[ 626 self._offset 627 - self.initial_offset : self._offset 628 - self.initial_offset 629 + max_bytes_in_chunk 630 ] 631 632 # Mark the final chunk of the transfer. 633 if ( 634 len(self.data) - self._offset + self.initial_offset 635 <= max_bytes_in_chunk 636 ): 637 chunk.remaining_bytes = 0 638 639 return chunk 640 641 642class ReadTransfer(Transfer): 643 """A client <- server read transfer. 644 645 Although Python can effectively handle an unlimited transfer window, this 646 client sets a conservative window and chunk size to avoid overloading the 647 device. These are configurable in the constructor. 648 """ 649 650 # pylint: disable=too-many-instance-attributes 651 652 # The fractional position within a window at which a receive transfer should 653 # extend its window size to minimize the amount of time the transmitter 654 # spends blocked. 655 # 656 # For example, a divisor of 2 will extend the window when half of the 657 # requested data has been received, a divisor of three will extend at a 658 # third of the window, and so on. 659 EXTEND_WINDOW_DIVISOR = 2 660 661 # Slow start and congestion avoidance are analogues to the equally named 662 # phases in TCP congestion control. 663 class _TransmitPhase(enum.Enum): 664 SLOW_START = 0 665 CONGESTION_AVOIDANCE = 1 666 667 # The type of data transmission the transfer is requesting. 668 class _TransmitAction(enum.Enum): 669 # Immediate parameters sent at the start of a new transfer for legacy 670 # compatibility. 671 BEGIN = 0 672 673 # Initial parameters chunk following the opening handshake. 674 FIRST_PARAMETERS = 1 675 676 # Extend the current transmission window. 677 EXTEND = 2 678 679 # Rewind the transfer to a certain offset following data loss. 680 RETRANSMIT = 3 681 682 def __init__( # pylint: disable=too-many-arguments 683 self, 684 session_id: int, 685 resource_id: int, 686 send_chunk: Callable[[Chunk], None], 687 end_transfer: Callable[[Transfer], None], 688 response_timeout_s: float, 689 initial_response_timeout_s: float, 690 max_retries: int, 691 max_lifetime_retries: int, 692 protocol_version: ProtocolVersion, 693 max_window_size_bytes: int = 32768, 694 max_chunk_size: int = 1024, 695 chunk_delay_us: int | None = None, 696 progress_callback: ProgressCallback | None = None, 697 initial_offset: int = 0, 698 ): 699 super().__init__( 700 session_id, 701 resource_id, 702 send_chunk, 703 end_transfer, 704 response_timeout_s, 705 initial_response_timeout_s, 706 max_retries, 707 max_lifetime_retries, 708 protocol_version, 709 progress_callback, 710 initial_offset=initial_offset, 711 ) 712 self._max_window_size_bytes = max_window_size_bytes 713 self._max_chunk_size = max_chunk_size 714 self._chunk_delay_us = chunk_delay_us 715 716 self._remaining_transfer_size: int | None = None 717 self._data = bytearray() 718 self._window_end_offset = max_chunk_size 719 self._window_size_multiplier = 1 720 self._window_size = self._max_chunk_size * self._window_size_multiplier 721 self._transmit_phase = ReadTransfer._TransmitPhase.SLOW_START 722 self._last_chunk_offset: int | None = None 723 724 @property 725 def data(self) -> bytes: 726 """Returns an immutable copy of the data that has been read.""" 727 return bytes(self._data) 728 729 def _set_initial_chunk_fields(self, chunk: Chunk) -> None: 730 self._update_and_set_transfer_parameters( 731 chunk, ReadTransfer._TransmitAction.BEGIN 732 ) 733 734 async def _handle_data_chunk(self, chunk: Chunk) -> None: 735 """Processes an incoming chunk from the server. 736 737 In a read transfer, the client receives data chunks from the server. 738 Once all pending data is received, the transfer parameters are updated. 739 """ 740 741 if self._state is Transfer._State.RECOVERY: 742 if chunk.offset != self._offset: 743 if self._last_chunk_offset == chunk.offset: 744 _LOG.debug( 745 'Transfer %d received repeated offset %d: ' 746 'retry detected, resending transfer parameters', 747 self.id, 748 chunk.offset, 749 ) 750 self._send_chunk( 751 self._transfer_parameters( 752 ReadTransfer._TransmitAction.RETRANSMIT 753 ) 754 ) 755 else: 756 _LOG.debug( 757 'Transfer %d waiting for offset %d, ignoring %d', 758 self.id, 759 self._offset, 760 chunk.offset, 761 ) 762 self._last_chunk_offset = chunk.offset 763 return 764 765 _LOG.info( 766 'Transfer %d received expected offset %d, resuming transfer', 767 self.id, 768 chunk.offset, 769 ) 770 self._state = Transfer._State.WAITING 771 772 assert self._state is Transfer._State.WAITING 773 774 if chunk.offset != self._offset: 775 # Initially, the transfer service only supports in-order transfers. 776 # If data is received out of order, request that the server 777 # retransmit from the previous offset. 778 _LOG.debug( 779 'Transfer %d expected offset %d, received %d: ' 780 'entering recovery state', 781 self.id, 782 self._offset, 783 chunk.offset, 784 ) 785 self._state = Transfer._State.RECOVERY 786 787 self._send_chunk( 788 self._transfer_parameters( 789 ReadTransfer._TransmitAction.RETRANSMIT 790 ) 791 ) 792 return 793 794 self._data += chunk.data 795 self._offset += len(chunk.data) 796 797 # Update the last offset seen so that retries can be detected. 798 self._last_chunk_offset = chunk.offset 799 800 if chunk.remaining_bytes is not None: 801 if chunk.remaining_bytes == 0: 802 # No more data to read. Acknowledge receipt and finish. 803 self._send_final_chunk(Status.OK) 804 return 805 806 # The server may indicate if the amount of remaining data is known. 807 self._remaining_transfer_size = chunk.remaining_bytes 808 elif self._remaining_transfer_size is not None: 809 # Update the remaining transfer size, if it is known. 810 self._remaining_transfer_size -= len(chunk.data) 811 812 # If the transfer size drops to zero, the estimate was inaccurate. 813 if self._remaining_transfer_size <= 0: 814 self._remaining_transfer_size = None 815 816 total_size = ( 817 None 818 if self._remaining_transfer_size is None 819 else (self._remaining_transfer_size + self._offset) 820 ) 821 self._update_progress(self._offset, self._offset, total_size) 822 823 if chunk.window_end_offset != 0: 824 if chunk.window_end_offset < self._offset: 825 _LOG.error( 826 'Transfer %d: transmitter sent invalid earlier end offset ' 827 '%d (receiver offset %d)', 828 self.id, 829 chunk.window_end_offset, 830 self._offset, 831 ) 832 self._send_final_chunk(Status.INTERNAL) 833 return 834 835 if chunk.window_end_offset > self._window_end_offset: 836 _LOG.error( 837 'Transfer %d: transmitter sent invalid later end offset ' 838 '%d (receiver end offset %d)', 839 self.id, 840 chunk.window_end_offset, 841 self._window_end_offset, 842 ) 843 self._send_final_chunk(Status.INTERNAL) 844 return 845 846 self._window_end_offset = chunk.window_end_offset 847 848 if self._offset == self._window_end_offset: 849 # All pending data was received. Send out a new parameters chunk for 850 # the next block. 851 self._send_chunk( 852 self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND) 853 ) 854 return 855 856 remaining_window_size = self._window_end_offset - self._offset 857 extend_window = ( 858 remaining_window_size 859 <= self._window_size / ReadTransfer.EXTEND_WINDOW_DIVISOR 860 ) 861 862 if extend_window: 863 self._send_chunk( 864 self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND) 865 ) 866 867 def _retry_after_data_timeout(self) -> None: 868 if ( 869 self._state is Transfer._State.WAITING 870 or self._state is Transfer._State.RECOVERY 871 ): 872 self._send_chunk( 873 self._transfer_parameters( 874 ReadTransfer._TransmitAction.RETRANSMIT 875 ) 876 ) 877 878 def _update_and_set_transfer_parameters( 879 self, chunk: Chunk, action: 'ReadTransfer._TransmitAction' 880 ) -> None: 881 if action is ReadTransfer._TransmitAction.EXTEND: 882 # Window was received successfully without packet loss and should 883 # grow. Double the window size during slow start, or increase it by 884 # a single chunk in congestion avoidance. 885 if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START: 886 self._window_size_multiplier *= 2 887 else: 888 self._window_size_multiplier += 1 889 890 # The window size can never exceed the user-specified maximum bytes. 891 # If it does, reduce the multiplier to the largest size that fits. 892 if ( 893 self._window_size_multiplier * self._max_chunk_size 894 > self._max_window_size_bytes 895 ): 896 self._window_size_multiplier = ( 897 self._max_window_size_bytes // self._max_chunk_size 898 ) 899 900 elif action is ReadTransfer._TransmitAction.RETRANSMIT: 901 # A packet was lost: shrink the window size. Additionally, after the 902 # first packet loss, transition from the slow start to the 903 # congestion avoidance phase of the transfer. 904 if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START: 905 self._transmit_phase = ( 906 ReadTransfer._TransmitPhase.CONGESTION_AVOIDANCE 907 ) 908 self._window_size_multiplier = max( 909 self._window_size_multiplier // 2, 1 910 ) 911 912 self._window_size = min( 913 self._max_chunk_size * self._window_size_multiplier, 914 self._max_window_size_bytes, 915 ) 916 917 self._window_end_offset = self._offset + self._window_size 918 919 chunk.offset = self._offset 920 chunk.window_end_offset = self._window_end_offset 921 chunk.max_chunk_size_bytes = self._max_chunk_size 922 923 if self._chunk_delay_us: 924 chunk.min_delay_microseconds = self._chunk_delay_us 925 926 def _transfer_parameters( 927 self, action: 'ReadTransfer._TransmitAction' 928 ) -> Chunk: 929 """Returns an updated transfer parameters chunk.""" 930 931 if action is ReadTransfer._TransmitAction.BEGIN: 932 chunk_type = Chunk.Type.START 933 elif action is ReadTransfer._TransmitAction.EXTEND: 934 chunk_type = Chunk.Type.PARAMETERS_CONTINUE 935 else: 936 chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT 937 938 chunk = Chunk( 939 self._configured_protocol_version, chunk_type, session_id=self.id 940 ) 941 self._update_and_set_transfer_parameters(chunk, action) 942 943 return chunk 944