• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Classes for read and write transfers."""
15
16import abc
17import asyncio
18from dataclasses import dataclass
19import enum
20import logging
21import math
22import threading
23from typing import Any, Callable
24
25from pw_status import Status
26from pw_transfer.chunk import Chunk, ProtocolVersion
27
28_LOG = logging.getLogger(__package__)
29
30
31@dataclass(frozen=True)
32class ProgressStats:
33    bytes_sent: int
34    bytes_confirmed_received: int
35    total_size_bytes: int | None
36
37    def percent_received(self) -> float:
38        if self.total_size_bytes is None or self.total_size_bytes == 0:
39            return math.nan
40
41        return self.bytes_confirmed_received / self.total_size_bytes * 100
42
43    def __str__(self) -> str:
44        total = (
45            str(self.total_size_bytes) if self.total_size_bytes else 'unknown'
46        )
47        return (
48            f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, '
49            f'{self.bytes_confirmed_received} B received of {total} B)'
50        )
51
52
53ProgressCallback = Callable[[ProgressStats], Any]
54
55
56class _Timer:
57    """A timer which invokes a callback after a certain timeout."""
58
59    def __init__(self, timeout_s: float, callback: Callable[[], Any]):
60        self.timeout_s = timeout_s
61        self._callback = callback
62        self._task: asyncio.Task[Any] | None = None
63
64    def start(self, timeout_s: float | None = None) -> None:
65        """Starts a new timer.
66
67        If a timer is already running, it is stopped and a new timer started.
68        This can be used to implement watchdog-like behavior, where a callback
69        is invoked after some time without a kick.
70        """
71        self.stop()
72        timeout_s = self.timeout_s if timeout_s is None else timeout_s
73        self._task = asyncio.create_task(self._run(timeout_s))
74
75    def stop(self) -> None:
76        """Terminates a running timer."""
77        if self._task is not None:
78            self._task.cancel()
79            self._task = None
80
81    async def _run(self, timeout_s: float) -> None:
82        await asyncio.sleep(timeout_s)
83        self._task = None
84        self._callback()
85
86
87class Transfer(abc.ABC):
88    """A client-side data transfer through a Manager.
89
90    Subclasses are responsible for implementing all of the logic for their type
91    of transfer, receiving messages from the server and sending the appropriate
92    messages in response.
93    """
94
95    # pylint: disable=too-many-instance-attributes
96
97    class _State(enum.Enum):
98        # Transfer is starting. The server and client are performing an initial
99        # handshake and negotiating protocol and feature flags.
100        INITIATING = 0
101
102        # Waiting for the other end to send a chunk.
103        WAITING = 1
104
105        # Transmitting a window of data to a receiver.
106        TRANSMITTING = 2
107
108        # Recovering after one or more chunks was dropped in an active transfer.
109        RECOVERY = 3
110
111        # Transfer has completed locally and is waiting for the peer to
112        # acknowledge its final status. Only entered by the terminating side of
113        # the transfer.
114        #
115        # The context remains in a TERMINATING state until it receives an
116        # acknowledgement from the peer or times out.
117        TERMINATING = 4
118
119        # A transfer has fully completed.
120        COMPLETE = 5
121
122    def __init__(  # pylint: disable=too-many-arguments
123        self,
124        session_id: int,
125        resource_id: int,
126        send_chunk: Callable[[Chunk], None],
127        end_transfer: Callable[['Transfer'], None],
128        response_timeout_s: float,
129        initial_response_timeout_s: float,
130        max_retries: int,
131        max_lifetime_retries: int,
132        protocol_version: ProtocolVersion,
133        progress_callback: ProgressCallback | None = None,
134        initial_offset: int = 0,
135    ):
136        self.status = Status.OK
137        self.done = threading.Event()
138
139        self._session_id = session_id
140        self._resource_id = resource_id
141        self._offset = initial_offset
142
143        self._send_chunk_fn = send_chunk
144        self._end_transfer = end_transfer
145
146        self._desired_protocol_version = protocol_version
147        self._configured_protocol_version = ProtocolVersion.UNKNOWN
148
149        if self._desired_protocol_version is ProtocolVersion.LEGACY:
150            # In a legacy transfer, there is no protocol negotiation stage.
151            # Automatically configure the context to run the legacy protocol and
152            # proceed to waiting for a chunk.
153            self._configured_protocol_version = ProtocolVersion.LEGACY
154            self._state = Transfer._State.WAITING
155            self._session_id = self._resource_id
156        else:
157            self._state = Transfer._State.INITIATING
158
159        self._last_chunk: Chunk | None = None
160
161        self._retries = 0
162        self._max_retries = max_retries
163        self._lifetime_retries = 0
164        self._max_lifetime_retries = max_lifetime_retries
165        self._response_timer = _Timer(response_timeout_s, self._on_timeout)
166        self._initial_response_timeout_s = initial_response_timeout_s
167
168        self._progress_callback = progress_callback
169
170    async def begin(self) -> None:
171        """Sends the initial chunk of the transfer."""
172
173        if (
174            self._desired_protocol_version is ProtocolVersion.UNKNOWN
175            or self._desired_protocol_version.value
176            > ProtocolVersion.LATEST.value
177        ):
178            _LOG.error(
179                'Cannot start a transfer with unsupported protocol version %d',
180                self._desired_protocol_version.value,
181            )
182            self.finish(Status.INVALID_ARGUMENT)
183            return
184
185        initial_chunk = Chunk(
186            self._desired_protocol_version,
187            Chunk.Type.START,
188            resource_id=self._resource_id,
189        )
190
191        if self._offset != 0:
192            initial_chunk.initial_offset = self._offset
193
194        if self._desired_protocol_version is ProtocolVersion.VERSION_TWO:
195            initial_chunk.desired_session_id = self._session_id
196
197        # Regardless of the desired protocol version, set any additional fields
198        # on the opening chunk, in case the server only runs legacy.
199        self._set_initial_chunk_fields(initial_chunk)
200
201        self._send_chunk(initial_chunk)
202        self._response_timer.start(self._initial_response_timeout_s)
203
204    @property
205    def id(self) -> int:
206        """Returns the identifier for the active transfer."""
207        return self._session_id
208
209    @property
210    def resource_id(self) -> int:
211        """Returns the identifier of the resource being transferred."""
212        return self._resource_id
213
214    @property
215    @abc.abstractmethod
216    def data(self) -> bytes:
217        """Returns the data read or written in this transfer."""
218
219    @abc.abstractmethod
220    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
221        """Sets fields for the initial non-handshake chunk of the transfer."""
222
223    def _send_chunk(self, chunk: Chunk) -> None:
224        """Sends a chunk to the server, keeping track of the last chunk sent."""
225        self._send_chunk_fn(chunk)
226        self._last_chunk = chunk
227
228    async def handle_chunk(self, chunk: Chunk) -> None:
229        """Processes an incoming chunk from the server.
230
231        Handles terminating chunks (i.e. those with a status) and forwards
232        non-terminating chunks to handle_data_chunk.
233        """
234        self._response_timer.stop()
235        self._retries = 0  # Received data from service, so reset the retries.
236
237        _LOG.debug('Received chunk\n%s', str(chunk.to_message()).rstrip())
238
239        # Status chunks are only used to terminate a transfer. They do not
240        # contain any data that requires processing.
241        if chunk.status is not None:
242            if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
243                self._send_chunk(
244                    Chunk(
245                        self._configured_protocol_version,
246                        Chunk.Type.COMPLETION_ACK,
247                        session_id=self._session_id,
248                    )
249                )
250
251            self.finish(Status(chunk.status))
252            return
253
254        if self._state is Transfer._State.INITIATING:
255            await self._perform_initial_handshake(chunk)
256        elif self._state is Transfer._State.TERMINATING:
257            if chunk.type is Chunk.Type.COMPLETION_ACK:
258                self.finish(self.status)
259            else:
260                # Expecting a completion ACK but didn't receive one. Go through
261                # the retry process.
262                self._on_timeout()
263        # Only ignoring START_ACK, tests were unhappy with other non-data chunks
264        elif chunk.type not in [Chunk.Type.START_ACK]:
265            await self._handle_data_chunk(chunk)
266        else:
267            _LOG.warning("Ignoring extra START_ACK chunk")
268            return
269
270        # Start the timeout for the server to send a chunk in response.
271        self._response_timer.start()
272
273    async def _perform_initial_handshake(self, chunk: Chunk) -> None:
274        """Progresses the initial handshake phase of a v2+ transfer."""
275        assert self._state is Transfer._State.INITIATING
276
277        # If a non-handshake chunk is received during an INITIATING state, the
278        # transfer server is running a legacy protocol version, which does not
279        # perform a handshake. End the handshake, revert to the legacy protocol,
280        # and process the chunk appropriately.
281        if chunk.type is not Chunk.Type.START_ACK:
282            _LOG.debug(
283                'Transfer %d got non-handshake chunk, reverting to legacy',
284                self.id,
285            )
286
287            if self._offset != 0:
288                _LOG.error(
289                    'Non-zero offset transfers not supported by legacy protocol'
290                )
291                self.finish(Status.INTERNAL)
292                return
293
294            self._configured_protocol_version = ProtocolVersion.LEGACY
295            self._state = Transfer._State.WAITING
296
297            # Update the transfer's session ID, which will map to the
298            # transfer_id of the legacy chunk.
299            self._session_id = chunk.session_id
300
301            await self._handle_data_chunk(chunk)
302            return
303
304        self._configured_protocol_version = ProtocolVersion(
305            min(
306                self._desired_protocol_version.value,
307                chunk.protocol_version.value,
308            )
309        )
310        _LOG.debug(
311            'Transfer %d negotiating protocol version: ours=%d, theirs=%d',
312            self.id,
313            self._desired_protocol_version.value,
314            chunk.protocol_version.value,
315        )
316
317        if self._offset != chunk.initial_offset:
318            # If our offsets don't match, let user handle it
319            self.finish(Status.UNIMPLEMENTED)
320            return
321
322        # Send a confirmation chunk to the server accepting the assigned session
323        # ID and protocol version. Tag any initial transfer parameters onto the
324        # chunk to begin the data transfer.
325        start_ack_confirmation = Chunk(
326            self._configured_protocol_version,
327            Chunk.Type.START_ACK_CONFIRMATION,
328            session_id=self._session_id,
329            offset=self._offset,
330        )
331
332        self._set_initial_chunk_fields(start_ack_confirmation)
333
334        self._state = Transfer._State.WAITING
335        self._send_chunk(start_ack_confirmation)
336
337    @abc.abstractmethod
338    async def _handle_data_chunk(self, chunk: Chunk) -> None:
339        """Handles a chunk that contains or requests data."""
340
341    @abc.abstractmethod
342    def _retry_after_data_timeout(self) -> None:
343        """Retries after a timeout occurs during the data transfer phase.
344
345        Only invoked when in the data transfer phase (i.e. state is in
346        {WAITING, TRANSMITTING, RECOVERY}). Timeouts occurring during an
347        opening or closing handshake are handled by the base Transfer.
348        """
349
350    def _on_timeout(self) -> None:
351        """Handles a timeout while waiting for a chunk."""
352        if self._state is Transfer._State.COMPLETE:
353            return
354
355        self._retries += 1
356        self._lifetime_retries += 1
357
358        if (
359            self._retries > self._max_retries
360            or self._lifetime_retries > self._max_lifetime_retries
361        ):
362            if self._state is Transfer._State.TERMINATING:
363                # If the server never responded to the sent completion chunk,
364                # simply end the transfer locally with its original status.
365                self.finish(self.status)
366            else:
367                self.finish(Status.DEADLINE_EXCEEDED)
368            return
369
370        _LOG.debug(
371            'Received no responses for %.3fs; retrying %d/%d',
372            self._response_timer.timeout_s,
373            self._retries,
374            self._max_retries,
375        )
376
377        retry_handshake_chunk = self._state in (
378            Transfer._State.INITIATING,
379            Transfer._State.TERMINATING,
380        ) or (
381            self._last_chunk is not None
382            and self._last_chunk.type is Chunk.Type.START_ACK_CONFIRMATION
383        )
384
385        if retry_handshake_chunk:
386            assert self._last_chunk is not None
387            self._send_chunk(self._last_chunk)
388        else:
389            self._retry_after_data_timeout()
390
391        self._response_timer.start()
392
393    def finish(self, status: Status, skip_callback: bool = False) -> None:
394        """Ends the transfer with the specified status."""
395        self._response_timer.stop()
396        self.status = status
397
398        if status.ok():
399            total_size = len(self.data)
400            self._update_progress(total_size, total_size, total_size)
401
402        if not skip_callback:
403            self._end_transfer(self)
404
405        # Set done last so that the transfer has been fully cleaned up.
406        self._state = Transfer._State.COMPLETE
407        self.done.set()
408
409    def _update_progress(
410        self,
411        bytes_sent: int,
412        bytes_confirmed_received: int,
413        total_size_bytes: int | None,
414    ) -> None:
415        """Invokes the provided progress callback, if any, with the progress."""
416
417        stats = ProgressStats(
418            bytes_sent, bytes_confirmed_received, total_size_bytes
419        )
420        _LOG.debug('Transfer %d progress: %s', self.id, stats)
421
422        if self._progress_callback:
423            self._progress_callback(stats)
424
425    def _send_final_chunk(self, status: Status) -> None:
426        """Sends a status chunk to the server and finishes the transfer."""
427        self._send_chunk(
428            Chunk(
429                self._configured_protocol_version,
430                Chunk.Type.COMPLETION,
431                session_id=self.id,
432                status=status,
433            )
434        )
435
436        if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
437            # Wait for a completion ACK from the server.
438            self.status = status
439            self._state = Transfer._State.TERMINATING
440            self._response_timer.start()
441        else:
442            self.finish(status)
443
444
445class WriteTransfer(Transfer):
446    """A client -> server write transfer."""
447
448    def __init__(  # pylint: disable=too-many-arguments
449        self,
450        session_id: int,
451        resource_id: int,
452        data: bytes,
453        send_chunk: Callable[[Chunk], None],
454        end_transfer: Callable[[Transfer], None],
455        response_timeout_s: float,
456        initial_response_timeout_s: float,
457        max_retries: int,
458        max_lifetime_retries: int,
459        protocol_version: ProtocolVersion,
460        progress_callback: ProgressCallback | None = None,
461        initial_offset: int = 0,
462    ):
463        super().__init__(
464            session_id,
465            resource_id,
466            send_chunk,
467            end_transfer,
468            response_timeout_s,
469            initial_response_timeout_s,
470            max_retries,
471            max_lifetime_retries,
472            protocol_version,
473            progress_callback,
474            initial_offset=initial_offset,
475        )
476        self._data = data
477        self.initial_offset = initial_offset
478
479        self._window_end_offset = 0
480        self._max_chunk_size = 0
481        self._chunk_delay_us: int | None = None
482
483        # The window ID increments for each parameters update.
484        self._window_id = 0
485
486        self._bytes_confirmed_received = 0
487
488    @property
489    def data(self) -> bytes:
490        return self._data
491
492    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
493        # Nothing to tag onto the initial chunk in a write transfer.
494        pass
495
496    async def _handle_data_chunk(self, chunk: Chunk) -> None:
497        """Processes an incoming chunk from the server.
498
499        In a write transfer, the server only sends transfer parameter updates
500        to the client. When a message is received, update local parameters and
501        send data accordingly.
502        """
503
504        if self._state is Transfer._State.TRANSMITTING:
505            self._state = Transfer._State.WAITING
506
507        assert self._state is Transfer._State.WAITING
508
509        if not self._handle_parameters_update(chunk):
510            return
511
512        self._bytes_confirmed_received = chunk.offset
513        self._state = Transfer._State.TRANSMITTING
514
515        self._window_id += 1
516        asyncio.create_task(self._transmit_next_chunk(self._window_id))
517
518    async def _transmit_next_chunk(
519        self, window_id: int, timeout_us: int | None = None
520    ) -> None:
521        """Transmits a single data chunk to the server.
522
523        If the chunk completes the active window, returns to a WAITING state.
524        Otherwise, schedules another transmission for the next chunk.
525        """
526        if timeout_us is not None:
527            await asyncio.sleep(timeout_us / 1e6)
528
529        if self._state is not Transfer._State.TRANSMITTING:
530            return
531
532        if window_id != self._window_id:
533            _LOG.debug('Transfer %d: Skipping stale window', self.id)
534            return
535
536        chunk = self._next_chunk()
537        self._offset += len(chunk.data)
538
539        sent_requested_bytes = self._offset == self._window_end_offset
540
541        self._send_chunk(chunk)
542        self._update_progress(
543            self._offset,
544            self._bytes_confirmed_received,
545            len(self.data) + self.initial_offset,
546        )
547
548        if sent_requested_bytes:
549            self._state = Transfer._State.WAITING
550        else:
551            asyncio.create_task(
552                self._transmit_next_chunk(
553                    window_id, timeout_us=self._chunk_delay_us
554                )
555            )
556
557    def _handle_parameters_update(self, chunk: Chunk) -> bool:
558        """Updates transfer state based on a transfer parameters update."""
559
560        if chunk.offset > len(self.data) + self.initial_offset:
561            # Bad offset; terminate the transfer.
562            _LOG.error(
563                'Transfer %d: server requested invalid offset %d (size %d)',
564                self.id,
565                chunk.offset,
566                len(self.data) + self.initial_offset,
567            )
568
569            self._send_final_chunk(Status.OUT_OF_RANGE)
570            return False
571
572        if chunk.offset == chunk.window_end_offset:
573            _LOG.error(
574                'Transfer %d: service requested 0 bytes (invalid); aborting',
575                self.id,
576            )
577            self._send_final_chunk(Status.INTERNAL)
578            return False
579
580        # Extend the window to the new end offset specified by the server.
581        self._window_end_offset = min(
582            chunk.window_end_offset, len(self.data) + self.initial_offset
583        )
584
585        if chunk.requests_transmission_from_offset():
586            # Check whether the client has sent a previous data offset, which
587            # indicates that some chunks were lost in transmission.
588            if chunk.offset < self._offset:
589                _LOG.debug(
590                    'Write transfer %d rolling back: offset %d from %d',
591                    self.id,
592                    chunk.offset,
593                    self._offset,
594                )
595
596            self._offset = chunk.offset
597
598        if chunk.max_chunk_size_bytes is not None:
599            self._max_chunk_size = chunk.max_chunk_size_bytes
600
601        if chunk.min_delay_microseconds is not None:
602            self._chunk_delay_us = chunk.min_delay_microseconds
603
604        return True
605
606    def _retry_after_data_timeout(self) -> None:
607        if (
608            self._state is Transfer._State.WAITING
609            and self._last_chunk is not None
610        ):
611            self._send_chunk(self._last_chunk)
612
613    def _next_chunk(self) -> Chunk:
614        """Returns the next Chunk message to send in the data transfer."""
615        chunk = Chunk(
616            self._configured_protocol_version,
617            Chunk.Type.DATA,
618            session_id=self.id,
619            offset=self._offset,
620        )
621
622        max_bytes_in_chunk = min(
623            self._max_chunk_size, self._window_end_offset - self._offset
624        )
625        chunk.data = self.data[
626            self._offset
627            - self.initial_offset : self._offset
628            - self.initial_offset
629            + max_bytes_in_chunk
630        ]
631
632        # Mark the final chunk of the transfer.
633        if (
634            len(self.data) - self._offset + self.initial_offset
635            <= max_bytes_in_chunk
636        ):
637            chunk.remaining_bytes = 0
638
639        return chunk
640
641
642class ReadTransfer(Transfer):
643    """A client <- server read transfer.
644
645    Although Python can effectively handle an unlimited transfer window, this
646    client sets a conservative window and chunk size to avoid overloading the
647    device. These are configurable in the constructor.
648    """
649
650    # pylint: disable=too-many-instance-attributes
651
652    # The fractional position within a window at which a receive transfer should
653    # extend its window size to minimize the amount of time the transmitter
654    # spends blocked.
655    #
656    # For example, a divisor of 2 will extend the window when half of the
657    # requested data has been received, a divisor of three will extend at a
658    # third of the window, and so on.
659    EXTEND_WINDOW_DIVISOR = 2
660
661    # Slow start and congestion avoidance are analogues to the equally named
662    # phases in TCP congestion control.
663    class _TransmitPhase(enum.Enum):
664        SLOW_START = 0
665        CONGESTION_AVOIDANCE = 1
666
667    # The type of data transmission the transfer is requesting.
668    class _TransmitAction(enum.Enum):
669        # Immediate parameters sent at the start of a new transfer for legacy
670        # compatibility.
671        BEGIN = 0
672
673        # Initial parameters chunk following the opening handshake.
674        FIRST_PARAMETERS = 1
675
676        # Extend the current transmission window.
677        EXTEND = 2
678
679        # Rewind the transfer to a certain offset following data loss.
680        RETRANSMIT = 3
681
682    def __init__(  # pylint: disable=too-many-arguments
683        self,
684        session_id: int,
685        resource_id: int,
686        send_chunk: Callable[[Chunk], None],
687        end_transfer: Callable[[Transfer], None],
688        response_timeout_s: float,
689        initial_response_timeout_s: float,
690        max_retries: int,
691        max_lifetime_retries: int,
692        protocol_version: ProtocolVersion,
693        max_window_size_bytes: int = 32768,
694        max_chunk_size: int = 1024,
695        chunk_delay_us: int | None = None,
696        progress_callback: ProgressCallback | None = None,
697        initial_offset: int = 0,
698    ):
699        super().__init__(
700            session_id,
701            resource_id,
702            send_chunk,
703            end_transfer,
704            response_timeout_s,
705            initial_response_timeout_s,
706            max_retries,
707            max_lifetime_retries,
708            protocol_version,
709            progress_callback,
710            initial_offset=initial_offset,
711        )
712        self._max_window_size_bytes = max_window_size_bytes
713        self._max_chunk_size = max_chunk_size
714        self._chunk_delay_us = chunk_delay_us
715
716        self._remaining_transfer_size: int | None = None
717        self._data = bytearray()
718        self._window_end_offset = max_chunk_size
719        self._window_size_multiplier = 1
720        self._window_size = self._max_chunk_size * self._window_size_multiplier
721        self._transmit_phase = ReadTransfer._TransmitPhase.SLOW_START
722        self._last_chunk_offset: int | None = None
723
724    @property
725    def data(self) -> bytes:
726        """Returns an immutable copy of the data that has been read."""
727        return bytes(self._data)
728
729    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
730        self._update_and_set_transfer_parameters(
731            chunk, ReadTransfer._TransmitAction.BEGIN
732        )
733
734    async def _handle_data_chunk(self, chunk: Chunk) -> None:
735        """Processes an incoming chunk from the server.
736
737        In a read transfer, the client receives data chunks from the server.
738        Once all pending data is received, the transfer parameters are updated.
739        """
740
741        if self._state is Transfer._State.RECOVERY:
742            if chunk.offset != self._offset:
743                if self._last_chunk_offset == chunk.offset:
744                    _LOG.debug(
745                        'Transfer %d received repeated offset %d: '
746                        'retry detected, resending transfer parameters',
747                        self.id,
748                        chunk.offset,
749                    )
750                    self._send_chunk(
751                        self._transfer_parameters(
752                            ReadTransfer._TransmitAction.RETRANSMIT
753                        )
754                    )
755                else:
756                    _LOG.debug(
757                        'Transfer %d waiting for offset %d, ignoring %d',
758                        self.id,
759                        self._offset,
760                        chunk.offset,
761                    )
762                    self._last_chunk_offset = chunk.offset
763                return
764
765            _LOG.info(
766                'Transfer %d received expected offset %d, resuming transfer',
767                self.id,
768                chunk.offset,
769            )
770            self._state = Transfer._State.WAITING
771
772        assert self._state is Transfer._State.WAITING
773
774        if chunk.offset != self._offset:
775            # Initially, the transfer service only supports in-order transfers.
776            # If data is received out of order, request that the server
777            # retransmit from the previous offset.
778            _LOG.debug(
779                'Transfer %d expected offset %d, received %d: '
780                'entering recovery state',
781                self.id,
782                self._offset,
783                chunk.offset,
784            )
785            self._state = Transfer._State.RECOVERY
786
787            self._send_chunk(
788                self._transfer_parameters(
789                    ReadTransfer._TransmitAction.RETRANSMIT
790                )
791            )
792            return
793
794        self._data += chunk.data
795        self._offset += len(chunk.data)
796
797        # Update the last offset seen so that retries can be detected.
798        self._last_chunk_offset = chunk.offset
799
800        if chunk.remaining_bytes is not None:
801            if chunk.remaining_bytes == 0:
802                # No more data to read. Acknowledge receipt and finish.
803                self._send_final_chunk(Status.OK)
804                return
805
806            # The server may indicate if the amount of remaining data is known.
807            self._remaining_transfer_size = chunk.remaining_bytes
808        elif self._remaining_transfer_size is not None:
809            # Update the remaining transfer size, if it is known.
810            self._remaining_transfer_size -= len(chunk.data)
811
812            # If the transfer size drops to zero, the estimate was inaccurate.
813            if self._remaining_transfer_size <= 0:
814                self._remaining_transfer_size = None
815
816        total_size = (
817            None
818            if self._remaining_transfer_size is None
819            else (self._remaining_transfer_size + self._offset)
820        )
821        self._update_progress(self._offset, self._offset, total_size)
822
823        if chunk.window_end_offset != 0:
824            if chunk.window_end_offset < self._offset:
825                _LOG.error(
826                    'Transfer %d: transmitter sent invalid earlier end offset '
827                    '%d (receiver offset %d)',
828                    self.id,
829                    chunk.window_end_offset,
830                    self._offset,
831                )
832                self._send_final_chunk(Status.INTERNAL)
833                return
834
835            if chunk.window_end_offset > self._window_end_offset:
836                _LOG.error(
837                    'Transfer %d: transmitter sent invalid later end offset '
838                    '%d (receiver end offset %d)',
839                    self.id,
840                    chunk.window_end_offset,
841                    self._window_end_offset,
842                )
843                self._send_final_chunk(Status.INTERNAL)
844                return
845
846            self._window_end_offset = chunk.window_end_offset
847
848        if self._offset == self._window_end_offset:
849            # All pending data was received. Send out a new parameters chunk for
850            # the next block.
851            self._send_chunk(
852                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
853            )
854            return
855
856        remaining_window_size = self._window_end_offset - self._offset
857        extend_window = (
858            remaining_window_size
859            <= self._window_size / ReadTransfer.EXTEND_WINDOW_DIVISOR
860        )
861
862        if extend_window:
863            self._send_chunk(
864                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
865            )
866
867    def _retry_after_data_timeout(self) -> None:
868        if (
869            self._state is Transfer._State.WAITING
870            or self._state is Transfer._State.RECOVERY
871        ):
872            self._send_chunk(
873                self._transfer_parameters(
874                    ReadTransfer._TransmitAction.RETRANSMIT
875                )
876            )
877
878    def _update_and_set_transfer_parameters(
879        self, chunk: Chunk, action: 'ReadTransfer._TransmitAction'
880    ) -> None:
881        if action is ReadTransfer._TransmitAction.EXTEND:
882            # Window was received successfully without packet loss and should
883            # grow. Double the window size during slow start, or increase it by
884            # a single chunk in congestion avoidance.
885            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
886                self._window_size_multiplier *= 2
887            else:
888                self._window_size_multiplier += 1
889
890            # The window size can never exceed the user-specified maximum bytes.
891            # If it does, reduce the multiplier to the largest size that fits.
892            if (
893                self._window_size_multiplier * self._max_chunk_size
894                > self._max_window_size_bytes
895            ):
896                self._window_size_multiplier = (
897                    self._max_window_size_bytes // self._max_chunk_size
898                )
899
900        elif action is ReadTransfer._TransmitAction.RETRANSMIT:
901            # A packet was lost: shrink the window size. Additionally, after the
902            # first packet loss, transition from the slow start to the
903            # congestion avoidance phase of the transfer.
904            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
905                self._transmit_phase = (
906                    ReadTransfer._TransmitPhase.CONGESTION_AVOIDANCE
907                )
908            self._window_size_multiplier = max(
909                self._window_size_multiplier // 2, 1
910            )
911
912        self._window_size = min(
913            self._max_chunk_size * self._window_size_multiplier,
914            self._max_window_size_bytes,
915        )
916
917        self._window_end_offset = self._offset + self._window_size
918
919        chunk.offset = self._offset
920        chunk.window_end_offset = self._window_end_offset
921        chunk.max_chunk_size_bytes = self._max_chunk_size
922
923        if self._chunk_delay_us:
924            chunk.min_delay_microseconds = self._chunk_delay_us
925
926    def _transfer_parameters(
927        self, action: 'ReadTransfer._TransmitAction'
928    ) -> Chunk:
929        """Returns an updated transfer parameters chunk."""
930
931        if action is ReadTransfer._TransmitAction.BEGIN:
932            chunk_type = Chunk.Type.START
933        elif action is ReadTransfer._TransmitAction.EXTEND:
934            chunk_type = Chunk.Type.PARAMETERS_CONTINUE
935        else:
936            chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT
937
938        chunk = Chunk(
939            self._configured_protocol_version, chunk_type, session_id=self.id
940        )
941        self._update_and_set_transfer_parameters(chunk, action)
942
943        return chunk
944