• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Classes for read and write transfers."""
15
16import abc
17import asyncio
18from dataclasses import dataclass
19import enum
20import logging
21import math
22import threading
23from typing import Any, Callable
24
25from pw_status import Status
26from pw_transfer.chunk import Chunk, ProtocolVersion
27
28_LOG = logging.getLogger(__package__)
29
30
31@dataclass(frozen=True)
32class ProgressStats:
33    bytes_sent: int
34    bytes_confirmed_received: int
35    total_size_bytes: int | None
36
37    def percent_received(self) -> float:
38        if self.total_size_bytes is None or self.total_size_bytes == 0:
39            return math.nan
40
41        return self.bytes_confirmed_received / self.total_size_bytes * 100
42
43    def __str__(self) -> str:
44        total = (
45            str(self.total_size_bytes) if self.total_size_bytes else 'unknown'
46        )
47        return (
48            f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, '
49            f'{self.bytes_confirmed_received} B received of {total} B)'
50        )
51
52
53ProgressCallback = Callable[[ProgressStats], Any]
54
55
56class _Timer:
57    """A timer which invokes a callback after a certain timeout."""
58
59    def __init__(self, timeout_s: float, callback: Callable[[], Any]):
60        self.timeout_s = timeout_s
61        self._callback = callback
62        self._task: asyncio.Task[Any] | None = None
63
64    def start(self, timeout_s: float | None = None) -> None:
65        """Starts a new timer.
66
67        If a timer is already running, it is stopped and a new timer started.
68        This can be used to implement watchdog-like behavior, where a callback
69        is invoked after some time without a kick.
70        """
71        self.stop()
72        timeout_s = self.timeout_s if timeout_s is None else timeout_s
73        self._task = asyncio.create_task(self._run(timeout_s))
74
75    def stop(self) -> None:
76        """Terminates a running timer."""
77        if self._task is not None:
78            self._task.cancel()
79            self._task = None
80
81    async def _run(self, timeout_s: float) -> None:
82        await asyncio.sleep(timeout_s)
83        self._task = None
84        self._callback()
85
86
87class Transfer(abc.ABC):
88    """A client-side data transfer through a Manager.
89
90    Subclasses are responsible for implementing all of the logic for their type
91    of transfer, receiving messages from the server and sending the appropriate
92    messages in response.
93    """
94
95    # pylint: disable=too-many-instance-attributes
96
97    class _State(enum.Enum):
98        # Transfer is starting. The server and client are performing an initial
99        # handshake and negotiating protocol and feature flags.
100        INITIATING = 0
101
102        # Waiting for the other end to send a chunk.
103        WAITING = 1
104
105        # Transmitting a window of data to a receiver.
106        TRANSMITTING = 2
107
108        # Recovering after one or more chunks was dropped in an active transfer.
109        RECOVERY = 3
110
111        # Transfer has completed locally and is waiting for the peer to
112        # acknowledge its final status. Only entered by the terminating side of
113        # the transfer.
114        #
115        # The context remains in a TERMINATING state until it receives an
116        # acknowledgement from the peer or times out.
117        TERMINATING = 4
118
119        # A transfer has fully completed.
120        COMPLETE = 5
121
122    def __init__(  # pylint: disable=too-many-arguments
123        self,
124        session_id: int,
125        resource_id: int,
126        send_chunk: Callable[[Chunk], None],
127        end_transfer: Callable[['Transfer'], None],
128        response_timeout_s: float,
129        initial_response_timeout_s: float,
130        max_retries: int,
131        max_lifetime_retries: int,
132        protocol_version: ProtocolVersion,
133        progress_callback: ProgressCallback | None = None,
134        initial_offset: int = 0,
135    ):
136        self.status = Status.OK
137        self.done = threading.Event()
138
139        self._session_id = session_id
140        self._resource_id = resource_id
141        self._offset = initial_offset
142
143        self._send_chunk_fn = send_chunk
144        self._end_transfer = end_transfer
145
146        self._desired_protocol_version = protocol_version
147        self._configured_protocol_version = ProtocolVersion.UNKNOWN
148
149        if self._desired_protocol_version is ProtocolVersion.LEGACY:
150            # In a legacy transfer, there is no protocol negotiation stage.
151            # Automatically configure the context to run the legacy protocol and
152            # proceed to waiting for a chunk.
153            self._configured_protocol_version = ProtocolVersion.LEGACY
154            self._state = Transfer._State.WAITING
155            self._session_id = self._resource_id
156        else:
157            self._state = Transfer._State.INITIATING
158
159        self._last_chunk: Chunk | None = None
160
161        self._retries = 0
162        self._max_retries = max_retries
163        self._lifetime_retries = 0
164        self._max_lifetime_retries = max_lifetime_retries
165        self._response_timer = _Timer(response_timeout_s, self._on_timeout)
166        self._initial_response_timeout_s = initial_response_timeout_s
167
168        self._progress_callback = progress_callback
169
170    async def begin(self) -> None:
171        """Sends the initial chunk of the transfer."""
172
173        if (
174            self._desired_protocol_version is ProtocolVersion.UNKNOWN
175            or self._desired_protocol_version.value
176            > ProtocolVersion.LATEST.value
177        ):
178            _LOG.error(
179                'Cannot start a transfer with unsupported protocol version %d',
180                self._desired_protocol_version.value,
181            )
182            self.finish(Status.INVALID_ARGUMENT)
183            return
184
185        initial_chunk = Chunk(
186            self._desired_protocol_version,
187            Chunk.Type.START,
188            resource_id=self._resource_id,
189        )
190
191        if self._offset != 0:
192            initial_chunk.initial_offset = self._offset
193
194        if self._desired_protocol_version is ProtocolVersion.VERSION_TWO:
195            initial_chunk.desired_session_id = self._session_id
196
197        # Regardless of the desired protocol version, set any additional fields
198        # on the opening chunk, in case the server only runs legacy.
199        self._set_initial_chunk_fields(initial_chunk)
200
201        self._send_chunk(initial_chunk)
202        self._response_timer.start(self._initial_response_timeout_s)
203
204    @property
205    def id(self) -> int:
206        """Returns the identifier for the active transfer."""
207        return self._session_id
208
209    @property
210    def resource_id(self) -> int:
211        """Returns the identifier of the resource being transferred."""
212        return self._resource_id
213
214    @property
215    @abc.abstractmethod
216    def data(self) -> bytes:
217        """Returns the data read or written in this transfer."""
218
219    @abc.abstractmethod
220    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
221        """Sets fields for the initial non-handshake chunk of the transfer."""
222
223    def _send_chunk(self, chunk: Chunk) -> None:
224        """Sends a chunk to the server, keeping track of the last chunk sent."""
225        self._send_chunk_fn(chunk)
226        self._last_chunk = chunk
227
228    async def handle_chunk(self, chunk: Chunk) -> None:
229        """Processes an incoming chunk from the server.
230
231        Handles terminating chunks (i.e. those with a status) and forwards
232        non-terminating chunks to handle_data_chunk.
233        """
234        self._response_timer.stop()
235        self._retries = 0  # Received data from service, so reset the retries.
236
237        _LOG.debug('Received chunk\n%s', str(chunk.to_message()).rstrip())
238
239        # Status chunks are only used to terminate a transfer. They do not
240        # contain any data that requires processing.
241        if chunk.status is not None:
242            if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
243                self._send_chunk(
244                    Chunk(
245                        self._configured_protocol_version,
246                        Chunk.Type.COMPLETION_ACK,
247                        session_id=self._session_id,
248                    )
249                )
250
251            self.finish(Status(chunk.status))
252            return
253
254        if self._state is Transfer._State.INITIATING:
255            await self._perform_initial_handshake(chunk)
256        elif self._state is Transfer._State.TERMINATING:
257            if chunk.type is Chunk.Type.COMPLETION_ACK:
258                self.finish(self.status)
259            else:
260                # Expecting a completion ACK but didn't receive one. Go through
261                # the retry process.
262                self._on_timeout()
263        # Only ignoring START_ACK, tests were unhappy with other non-data chunks
264        elif chunk.type not in [Chunk.Type.START_ACK]:
265            await self._handle_data_chunk(chunk)
266        else:
267            _LOG.warning("Ignoring extra START_ACK chunk")
268            return
269
270        # Start the timeout for the server to send a chunk in response.
271        self._response_timer.start()
272
273    async def _perform_initial_handshake(self, chunk: Chunk) -> None:
274        """Progresses the initial handshake phase of a v2+ transfer."""
275        assert self._state is Transfer._State.INITIATING
276
277        # If a non-handshake chunk is received during an INITIATING state, the
278        # transfer server is running a legacy protocol version, which does not
279        # perform a handshake. End the handshake, revert to the legacy protocol,
280        # and process the chunk appropriately.
281        if chunk.type is not Chunk.Type.START_ACK:
282            _LOG.debug(
283                'Transfer %d got non-handshake chunk, reverting to legacy',
284                self.id,
285            )
286
287            if self._offset != 0:
288                _LOG.error(
289                    'Non-zero offset transfers not supported by legacy protocol'
290                )
291                self.finish(Status.INTERNAL)
292                return
293
294            self._configured_protocol_version = ProtocolVersion.LEGACY
295            self._state = Transfer._State.WAITING
296
297            # Update the transfer's session ID, which will map to the
298            # transfer_id of the legacy chunk.
299            self._session_id = chunk.session_id
300
301            await self._handle_data_chunk(chunk)
302            return
303
304        self._configured_protocol_version = ProtocolVersion(
305            min(
306                self._desired_protocol_version.value,
307                chunk.protocol_version.value,
308            )
309        )
310        _LOG.debug(
311            'Transfer %d negotiating protocol version: ours=%d, theirs=%d',
312            self.id,
313            self._desired_protocol_version.value,
314            chunk.protocol_version.value,
315        )
316
317        if self._offset != chunk.initial_offset:
318            # If our offsets don't match, let user handle it
319            self.finish(Status.UNIMPLEMENTED)
320            return
321
322        # Send a confirmation chunk to the server accepting the assigned session
323        # ID and protocol version. Tag any initial transfer parameters onto the
324        # chunk to begin the data transfer.
325        start_ack_confirmation = Chunk(
326            self._configured_protocol_version,
327            Chunk.Type.START_ACK_CONFIRMATION,
328            session_id=self._session_id,
329            offset=self._offset,
330        )
331
332        self._set_initial_chunk_fields(start_ack_confirmation)
333
334        self._state = Transfer._State.WAITING
335        self._send_chunk(start_ack_confirmation)
336
337    @abc.abstractmethod
338    async def _handle_data_chunk(self, chunk: Chunk) -> None:
339        """Handles a chunk that contains or requests data."""
340
341    @abc.abstractmethod
342    def _retry_after_data_timeout(self) -> None:
343        """Retries after a timeout occurs during the data transfer phase.
344
345        Only invoked when in the data transfer phase (i.e. state is in
346        {WAITING, TRANSMITTING, RECOVERY}). Timeouts occurring during an
347        opening or closing handshake are handled by the base Transfer.
348        """
349
350    def _on_timeout(self) -> None:
351        """Handles a timeout while waiting for a chunk."""
352        if self._state is Transfer._State.COMPLETE:
353            return
354
355        self._retries += 1
356        self._lifetime_retries += 1
357
358        if (
359            self._retries > self._max_retries
360            or self._lifetime_retries > self._max_lifetime_retries
361        ):
362            if self._state is Transfer._State.TERMINATING:
363                # If the server never responded to the sent completion chunk,
364                # simply end the transfer locally with its original status.
365                self.finish(self.status)
366            else:
367                self.finish(Status.DEADLINE_EXCEEDED)
368            return
369
370        _LOG.debug(
371            'Received no responses for %.3fs; retrying %d/%d',
372            self._response_timer.timeout_s,
373            self._retries,
374            self._max_retries,
375        )
376
377        retry_handshake_chunk = self._state in (
378            Transfer._State.INITIATING,
379            Transfer._State.TERMINATING,
380        ) or (
381            self._last_chunk is not None
382            and self._last_chunk.type is Chunk.Type.START_ACK_CONFIRMATION
383        )
384
385        if retry_handshake_chunk:
386            assert self._last_chunk is not None
387            self._send_chunk(self._last_chunk)
388        else:
389            self._retry_after_data_timeout()
390
391        if (
392            self._last_chunk is not None
393            and self._last_chunk.type is Chunk.Type.START
394        ):
395            self._response_timer.start(self._initial_response_timeout_s)
396        else:
397            self._response_timer.start()
398
399    def finish(self, status: Status, skip_callback: bool = False) -> None:
400        """Ends the transfer with the specified status."""
401        self._response_timer.stop()
402        self.status = status
403
404        if status.ok():
405            total_size = len(self.data)
406            self._update_progress(total_size, total_size, total_size)
407
408        if not skip_callback:
409            self._end_transfer(self)
410
411        # Set done last so that the transfer has been fully cleaned up.
412        self._state = Transfer._State.COMPLETE
413        self.done.set()
414
415    def _update_progress(
416        self,
417        bytes_sent: int,
418        bytes_confirmed_received: int,
419        total_size_bytes: int | None,
420    ) -> None:
421        """Invokes the provided progress callback, if any, with the progress."""
422
423        stats = ProgressStats(
424            bytes_sent, bytes_confirmed_received, total_size_bytes
425        )
426        _LOG.debug('Transfer %d progress: %s', self.id, stats)
427
428        if self._progress_callback:
429            self._progress_callback(stats)
430
431    def _send_final_chunk(self, status: Status) -> None:
432        """Sends a status chunk to the server and finishes the transfer."""
433        self._send_chunk(
434            Chunk(
435                self._configured_protocol_version,
436                Chunk.Type.COMPLETION,
437                session_id=self.id,
438                status=status,
439            )
440        )
441
442        if self._configured_protocol_version is ProtocolVersion.VERSION_TWO:
443            # Wait for a completion ACK from the server.
444            self.status = status
445            self._state = Transfer._State.TERMINATING
446            self._response_timer.start()
447        else:
448            self.finish(status)
449
450
451class WriteTransfer(Transfer):
452    """A client -> server write transfer."""
453
454    def __init__(  # pylint: disable=too-many-arguments
455        self,
456        session_id: int,
457        resource_id: int,
458        data: bytes,
459        send_chunk: Callable[[Chunk], None],
460        end_transfer: Callable[[Transfer], None],
461        response_timeout_s: float,
462        initial_response_timeout_s: float,
463        max_retries: int,
464        max_lifetime_retries: int,
465        protocol_version: ProtocolVersion,
466        progress_callback: ProgressCallback | None = None,
467        initial_offset: int = 0,
468    ):
469        super().__init__(
470            session_id,
471            resource_id,
472            send_chunk,
473            end_transfer,
474            response_timeout_s,
475            initial_response_timeout_s,
476            max_retries,
477            max_lifetime_retries,
478            protocol_version,
479            progress_callback,
480            initial_offset=initial_offset,
481        )
482        self._data = data
483        self.initial_offset = initial_offset
484
485        self._window_end_offset = 0
486        self._max_chunk_size = 0
487        self._chunk_delay_us: int | None = None
488
489        # The window ID increments for each parameters update.
490        self._window_id = 0
491
492        self._bytes_confirmed_received = 0
493
494    @property
495    def data(self) -> bytes:
496        return self._data
497
498    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
499        # Nothing to tag onto the initial chunk in a write transfer.
500        pass
501
502    async def _handle_data_chunk(self, chunk: Chunk) -> None:
503        """Processes an incoming chunk from the server.
504
505        In a write transfer, the server only sends transfer parameter updates
506        to the client. When a message is received, update local parameters and
507        send data accordingly.
508        """
509
510        if self._state is Transfer._State.TRANSMITTING:
511            self._state = Transfer._State.WAITING
512
513        assert self._state is Transfer._State.WAITING
514
515        if not self._handle_parameters_update(chunk):
516            return
517
518        self._bytes_confirmed_received = chunk.offset
519        self._state = Transfer._State.TRANSMITTING
520
521        self._window_id += 1
522        asyncio.create_task(self._transmit_next_chunk(self._window_id))
523
524    async def _transmit_next_chunk(
525        self, window_id: int, timeout_us: int | None = None
526    ) -> None:
527        """Transmits a single data chunk to the server.
528
529        If the chunk completes the active window, returns to a WAITING state.
530        Otherwise, schedules another transmission for the next chunk.
531        """
532        if timeout_us is not None:
533            await asyncio.sleep(timeout_us / 1e6)
534
535        if self._state is not Transfer._State.TRANSMITTING:
536            return
537
538        if window_id != self._window_id:
539            _LOG.debug('Transfer %d: Skipping stale window', self.id)
540            return
541
542        chunk = self._next_chunk()
543        self._offset += len(chunk.data)
544
545        sent_requested_bytes = self._offset == self._window_end_offset
546
547        self._send_chunk(chunk)
548        self._update_progress(
549            self._offset,
550            self._bytes_confirmed_received,
551            len(self.data) + self.initial_offset,
552        )
553
554        if sent_requested_bytes:
555            self._state = Transfer._State.WAITING
556        else:
557            asyncio.create_task(
558                self._transmit_next_chunk(
559                    window_id, timeout_us=self._chunk_delay_us
560                )
561            )
562
563    def _handle_parameters_update(self, chunk: Chunk) -> bool:
564        """Updates transfer state based on a transfer parameters update."""
565
566        if chunk.offset > len(self.data) + self.initial_offset:
567            # Bad offset; terminate the transfer.
568            _LOG.error(
569                'Transfer %d: server requested invalid offset %d (size %d)',
570                self.id,
571                chunk.offset,
572                len(self.data) + self.initial_offset,
573            )
574
575            self._send_final_chunk(Status.OUT_OF_RANGE)
576            return False
577
578        if chunk.offset == chunk.window_end_offset:
579            _LOG.error(
580                'Transfer %d: service requested 0 bytes (invalid); aborting',
581                self.id,
582            )
583            self._send_final_chunk(Status.INTERNAL)
584            return False
585
586        # Extend the window to the new end offset specified by the server.
587        self._window_end_offset = min(
588            chunk.window_end_offset, len(self.data) + self.initial_offset
589        )
590
591        if chunk.requests_transmission_from_offset():
592            # Check whether the client has sent a previous data offset, which
593            # indicates that some chunks were lost in transmission.
594            if chunk.offset < self._offset:
595                _LOG.debug(
596                    'Write transfer %d rolling back: offset %d from %d',
597                    self.id,
598                    chunk.offset,
599                    self._offset,
600                )
601
602            self._offset = chunk.offset
603        elif (
604            chunk.type is Chunk.Type.PARAMETERS_CONTINUE
605            and chunk.window_end_offset <= self._offset
606        ):
607            _LOG.debug(
608                'Write transfer %d ignoring old rolling window chunk',
609                self.id,
610            )
611            return False
612
613        if chunk.max_chunk_size_bytes is not None:
614            self._max_chunk_size = chunk.max_chunk_size_bytes
615
616        if chunk.min_delay_microseconds is not None:
617            self._chunk_delay_us = chunk.min_delay_microseconds
618
619        return True
620
621    def _retry_after_data_timeout(self) -> None:
622        if (
623            self._state is Transfer._State.WAITING
624            and self._last_chunk is not None
625        ):
626            self._send_chunk(self._last_chunk)
627
628    def _next_chunk(self) -> Chunk:
629        """Returns the next Chunk message to send in the data transfer."""
630        chunk = Chunk(
631            self._configured_protocol_version,
632            Chunk.Type.DATA,
633            session_id=self.id,
634            offset=self._offset,
635        )
636
637        max_bytes_in_chunk = min(
638            self._max_chunk_size, self._window_end_offset - self._offset
639        )
640        chunk.data = self.data[
641            self._offset
642            - self.initial_offset : self._offset
643            - self.initial_offset
644            + max_bytes_in_chunk
645        ]
646
647        # Mark the final chunk of the transfer.
648        if (
649            len(self.data) - self._offset + self.initial_offset
650            <= max_bytes_in_chunk
651        ):
652            chunk.remaining_bytes = 0
653
654        return chunk
655
656
657class ReadTransfer(Transfer):
658    """A client <- server read transfer.
659
660    Although Python can effectively handle an unlimited transfer window, this
661    client sets a conservative window and chunk size to avoid overloading the
662    device. These are configurable in the constructor.
663    """
664
665    # pylint: disable=too-many-instance-attributes
666
667    # The fractional position within a window at which a receive transfer should
668    # extend its window size to minimize the amount of time the transmitter
669    # spends blocked.
670    #
671    # For example, a divisor of 2 will extend the window when half of the
672    # requested data has been received, a divisor of three will extend at a
673    # third of the window, and so on.
674    EXTEND_WINDOW_DIVISOR = 2
675
676    # Slow start and congestion avoidance are analogues to the equally named
677    # phases in TCP congestion control.
678    class _TransmitPhase(enum.Enum):
679        SLOW_START = 0
680        CONGESTION_AVOIDANCE = 1
681
682    # The type of data transmission the transfer is requesting.
683    class _TransmitAction(enum.Enum):
684        # Immediate parameters sent at the start of a new transfer for legacy
685        # compatibility.
686        BEGIN = 0
687
688        # Initial parameters chunk following the opening handshake.
689        FIRST_PARAMETERS = 1
690
691        # Extend the current transmission window.
692        EXTEND = 2
693
694        # Rewind the transfer to a certain offset following data loss.
695        RETRANSMIT = 3
696
697    def __init__(  # pylint: disable=too-many-arguments
698        self,
699        session_id: int,
700        resource_id: int,
701        send_chunk: Callable[[Chunk], None],
702        end_transfer: Callable[[Transfer], None],
703        response_timeout_s: float,
704        initial_response_timeout_s: float,
705        max_retries: int,
706        max_lifetime_retries: int,
707        protocol_version: ProtocolVersion,
708        max_window_size_bytes: int = 32768,
709        max_chunk_size: int = 1024,
710        chunk_delay_us: int | None = None,
711        progress_callback: ProgressCallback | None = None,
712        initial_offset: int = 0,
713    ):
714        super().__init__(
715            session_id,
716            resource_id,
717            send_chunk,
718            end_transfer,
719            response_timeout_s,
720            initial_response_timeout_s,
721            max_retries,
722            max_lifetime_retries,
723            protocol_version,
724            progress_callback,
725            initial_offset=initial_offset,
726        )
727        self._max_window_size_bytes = max_window_size_bytes
728        self._max_chunk_size = max_chunk_size
729        self._chunk_delay_us = chunk_delay_us
730
731        self._remaining_transfer_size: int | None = None
732        self._data = bytearray()
733        self._window_end_offset = max_chunk_size
734        self._window_size_multiplier = 1
735        self._window_size = self._max_chunk_size * self._window_size_multiplier
736        self._transmit_phase = ReadTransfer._TransmitPhase.SLOW_START
737        self._last_chunk_offset: int | None = None
738
739    @property
740    def data(self) -> bytes:
741        """Returns an immutable copy of the data that has been read."""
742        return bytes(self._data)
743
744    def _set_initial_chunk_fields(self, chunk: Chunk) -> None:
745        self._update_and_set_transfer_parameters(
746            chunk, ReadTransfer._TransmitAction.BEGIN
747        )
748
749    async def _handle_data_chunk(self, chunk: Chunk) -> None:
750        """Processes an incoming chunk from the server.
751
752        In a read transfer, the client receives data chunks from the server.
753        Once all pending data is received, the transfer parameters are updated.
754        """
755
756        if self._state is Transfer._State.RECOVERY:
757            if chunk.offset != self._offset:
758                if self._last_chunk_offset == chunk.offset:
759                    _LOG.debug(
760                        'Transfer %d received repeated offset %d: '
761                        'retry detected, resending transfer parameters',
762                        self.id,
763                        chunk.offset,
764                    )
765                    self._send_chunk(
766                        self._transfer_parameters(
767                            ReadTransfer._TransmitAction.RETRANSMIT
768                        )
769                    )
770                else:
771                    _LOG.debug(
772                        'Transfer %d waiting for offset %d, ignoring %d',
773                        self.id,
774                        self._offset,
775                        chunk.offset,
776                    )
777                    self._last_chunk_offset = chunk.offset
778                return
779
780            _LOG.info(
781                'Transfer %d received expected offset %d, resuming transfer',
782                self.id,
783                chunk.offset,
784            )
785            self._state = Transfer._State.WAITING
786
787        assert self._state is Transfer._State.WAITING
788
789        if chunk.offset != self._offset:
790            if chunk.offset + len(chunk.data) <= self._offset:
791                # If the chunk's data has already been received, don't go
792                # through a full recovery cycle to avoid shrinking the window
793                # size and potentially thrashing. The expected data may already
794                # be in-flight, so just allow the transmitter to keep going with
795                # a CONTINUE parameters chunk.
796                _LOG.debug(
797                    'Transfer %d received duplicate chunk with offset %d',
798                    self.id,
799                    chunk.offset,
800                )
801                self._send_chunk(
802                    self._transfer_parameters(
803                        ReadTransfer._TransmitAction.EXTEND,
804                        update=False,
805                    )
806                )
807            else:
808                # Initially, the transfer service only supports in-order
809                # transfers. If data is received out of order, request that the
810                # server retransmit from the previous offset.
811                _LOG.debug(
812                    'Transfer %d expected offset %d, received %d: '
813                    'entering recovery state',
814                    self.id,
815                    self._offset,
816                    chunk.offset,
817                )
818                self._state = Transfer._State.RECOVERY
819
820                self._send_chunk(
821                    self._transfer_parameters(
822                        ReadTransfer._TransmitAction.RETRANSMIT
823                    )
824                )
825            return
826
827        self._data += chunk.data
828        self._offset += len(chunk.data)
829
830        # Update the last offset seen so that retries can be detected.
831        self._last_chunk_offset = chunk.offset
832
833        if chunk.remaining_bytes is not None:
834            if chunk.remaining_bytes == 0:
835                # No more data to read. Acknowledge receipt and finish.
836                self._send_final_chunk(Status.OK)
837                return
838
839            # The server may indicate if the amount of remaining data is known.
840            self._remaining_transfer_size = chunk.remaining_bytes
841        elif self._remaining_transfer_size is not None:
842            # Update the remaining transfer size, if it is known.
843            self._remaining_transfer_size -= len(chunk.data)
844
845            # If the transfer size drops to zero, the estimate was inaccurate.
846            if self._remaining_transfer_size <= 0:
847                self._remaining_transfer_size = None
848
849        total_size = (
850            None
851            if self._remaining_transfer_size is None
852            else (self._remaining_transfer_size + self._offset)
853        )
854        self._update_progress(self._offset, self._offset, total_size)
855
856        if chunk.window_end_offset != 0:
857            if chunk.window_end_offset < self._offset:
858                _LOG.error(
859                    'Transfer %d: transmitter sent invalid earlier end offset '
860                    '%d (receiver offset %d)',
861                    self.id,
862                    chunk.window_end_offset,
863                    self._offset,
864                )
865                self._send_final_chunk(Status.INTERNAL)
866                return
867
868            if chunk.window_end_offset > self._window_end_offset:
869                _LOG.error(
870                    'Transfer %d: transmitter sent invalid later end offset '
871                    '%d (receiver end offset %d)',
872                    self.id,
873                    chunk.window_end_offset,
874                    self._window_end_offset,
875                )
876                self._send_final_chunk(Status.INTERNAL)
877                return
878
879            self._window_end_offset = chunk.window_end_offset
880
881        if self._offset == self._window_end_offset:
882            # All pending data was received. Send out a new parameters chunk for
883            # the next block.
884            self._send_chunk(
885                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
886            )
887            return
888
889        remaining_window_size = self._window_end_offset - self._offset
890        extend_window = (
891            remaining_window_size
892            <= self._window_size / ReadTransfer.EXTEND_WINDOW_DIVISOR
893        )
894
895        if extend_window:
896            self._send_chunk(
897                self._transfer_parameters(ReadTransfer._TransmitAction.EXTEND)
898            )
899
900    def _retry_after_data_timeout(self) -> None:
901        if (
902            self._state is Transfer._State.WAITING
903            or self._state is Transfer._State.RECOVERY
904        ):
905            self._send_chunk(
906                self._transfer_parameters(
907                    ReadTransfer._TransmitAction.RETRANSMIT
908                )
909            )
910
911    def _set_transfer_parameters(
912        self,
913        chunk: Chunk,
914    ) -> None:
915        chunk.offset = self._offset
916        chunk.window_end_offset = self._window_end_offset
917        chunk.max_chunk_size_bytes = self._max_chunk_size
918
919        if self._chunk_delay_us:
920            chunk.min_delay_microseconds = self._chunk_delay_us
921
922    def _update_and_set_transfer_parameters(
923        self, chunk: Chunk, action: 'ReadTransfer._TransmitAction'
924    ) -> None:
925        if action is ReadTransfer._TransmitAction.EXTEND:
926            # Window was received successfully without packet loss and should
927            # grow. Double the window size during slow start, or increase it by
928            # a single chunk in congestion avoidance.
929            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
930                self._window_size_multiplier *= 2
931            else:
932                self._window_size_multiplier += 1
933
934            # The window size can never exceed the user-specified maximum bytes.
935            # If it does, reduce the multiplier to the largest size that fits.
936            if (
937                self._window_size_multiplier * self._max_chunk_size
938                > self._max_window_size_bytes
939            ):
940                self._window_size_multiplier = (
941                    self._max_window_size_bytes // self._max_chunk_size
942                )
943
944        elif action is ReadTransfer._TransmitAction.RETRANSMIT:
945            # A packet was lost: shrink the window size. Additionally, after the
946            # first packet loss, transition from the slow start to the
947            # congestion avoidance phase of the transfer.
948            if self._transmit_phase == ReadTransfer._TransmitPhase.SLOW_START:
949                self._transmit_phase = (
950                    ReadTransfer._TransmitPhase.CONGESTION_AVOIDANCE
951                )
952            self._window_size_multiplier = max(
953                self._window_size_multiplier // 2, 1
954            )
955
956        self._window_size = min(
957            self._max_chunk_size * self._window_size_multiplier,
958            self._max_window_size_bytes,
959        )
960
961        self._window_end_offset = self._offset + self._window_size
962        self._set_transfer_parameters(chunk)
963
964    def _transfer_parameters(
965        self,
966        action: 'ReadTransfer._TransmitAction',
967        update: bool = True,
968    ) -> Chunk:
969        """Returns an updated transfer parameters chunk."""
970
971        if action is ReadTransfer._TransmitAction.BEGIN:
972            chunk_type = Chunk.Type.START
973        elif action is ReadTransfer._TransmitAction.EXTEND:
974            chunk_type = Chunk.Type.PARAMETERS_CONTINUE
975        else:
976            chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT
977
978        chunk = Chunk(
979            self._configured_protocol_version, chunk_type, session_id=self.id
980        )
981
982        if update:
983            self._update_and_set_transfer_parameters(chunk, action)
984        else:
985            self._set_transfer_parameters(chunk)
986
987        return chunk
988