• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Classes for read and write transfers."""
15
16import abc
17import asyncio
18from dataclasses import dataclass
19import logging
20import math
21import threading
22from typing import Any, Callable, Optional
23
24from pw_status import Status
25from pw_transfer.transfer_pb2 import Chunk
26
27_LOG = logging.getLogger(__package__)
28
29
30@dataclass(frozen=True)
31class ProgressStats:
32    bytes_sent: int
33    bytes_confirmed_received: int
34    total_size_bytes: Optional[int]
35
36    def percent_received(self) -> float:
37        if self.total_size_bytes is None:
38            return math.nan
39
40        return self.bytes_confirmed_received / self.total_size_bytes * 100
41
42    def __str__(self) -> str:
43        total = str(
44            self.total_size_bytes) if self.total_size_bytes else 'unknown'
45        return (f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, '
46                f'{self.bytes_confirmed_received} B received of {total} B)')
47
48
49ProgressCallback = Callable[[ProgressStats], Any]
50
51
52class _Timer:
53    """A timer which invokes a callback after a certain timeout."""
54    def __init__(self, timeout_s: float, callback: Callable[[], Any]):
55        self.timeout_s = timeout_s
56        self._callback = callback
57        self._task: Optional[asyncio.Task[Any]] = None
58
59    def start(self, timeout_s: float = None) -> None:
60        """Starts a new timer.
61
62        If a timer is already running, it is stopped and a new timer started.
63        This can be used to implement watchdog-like behavior, where a callback
64        is invoked after some time without a kick.
65        """
66        self.stop()
67        timeout_s = self.timeout_s if timeout_s is None else timeout_s
68        self._task = asyncio.create_task(self._run(timeout_s))
69
70    def stop(self) -> None:
71        """Terminates a running timer."""
72        if self._task is not None:
73            self._task.cancel()
74            self._task = None
75
76    async def _run(self, timeout_s: float) -> None:
77        await asyncio.sleep(timeout_s)
78        self._task = None
79        self._callback()
80
81
82class Transfer(abc.ABC):
83    """A client-side data transfer through a Manager.
84
85    Subclasses are responsible for implementing all of the logic for their type
86    of transfer, receiving messages from the server and sending the appropriate
87    messages in response.
88    """
89    def __init__(self,
90                 transfer_id: int,
91                 send_chunk: Callable[[Chunk], None],
92                 end_transfer: Callable[['Transfer'], None],
93                 response_timeout_s: float,
94                 initial_response_timeout_s: float,
95                 max_retries: int,
96                 progress_callback: ProgressCallback = None):
97        self.id = transfer_id
98        self.status = Status.OK
99        self.done = threading.Event()
100
101        self._send_chunk = send_chunk
102        self._end_transfer = end_transfer
103
104        self._retries = 0
105        self._max_retries = max_retries
106        self._response_timer = _Timer(response_timeout_s, self._on_timeout)
107        self._initial_response_timeout_s = initial_response_timeout_s
108
109        self._progress_callback = progress_callback
110
111    async def begin(self) -> None:
112        """Sends the initial chunk of the transfer."""
113        self._send_chunk(self._initial_chunk())
114        self._response_timer.start(self._initial_response_timeout_s)
115
116    @property
117    @abc.abstractmethod
118    def data(self) -> bytes:
119        """Returns the data read or written in this transfer."""
120
121    @abc.abstractmethod
122    def _initial_chunk(self) -> Chunk:
123        """Returns the initial chunk to notify the sever of the transfer."""
124
125    async def handle_chunk(self, chunk: Chunk) -> None:
126        """Processes an incoming chunk from the server.
127
128        Handles terminating chunks (i.e. those with a status) and forwards
129        non-terminating chunks to handle_data_chunk.
130        """
131        self._response_timer.stop()
132        self._retries = 0  # Received data from service, so reset the retries.
133
134        _LOG.debug('Received chunk\n%s', str(chunk).rstrip())
135
136        # Status chunks are only used to terminate a transfer. They do not
137        # contain any data that requires processing.
138        if chunk.HasField('status'):
139            self.finish(Status(chunk.status))
140            return
141
142        await self._handle_data_chunk(chunk)
143
144        # Start the timeout for the server to send a chunk in response.
145        self._response_timer.start()
146
147    @abc.abstractmethod
148    async def _handle_data_chunk(self, chunk: Chunk) -> None:
149        """Handles a chunk that contains or requests data."""
150
151    @abc.abstractmethod
152    def _retry_after_timeout(self) -> None:
153        """Retries after a timeout occurs."""
154
155    def _on_timeout(self) -> None:
156        """Handles a timeout while waiting for a chunk."""
157        if self.done.is_set():
158            return
159
160        self._retries += 1
161        if self._retries > self._max_retries:
162            self.finish(Status.DEADLINE_EXCEEDED)
163            return
164
165        _LOG.debug('Received no responses for %.3fs; retrying %d/%d',
166                   self._response_timer.timeout_s, self._retries,
167                   self._max_retries)
168        self._retry_after_timeout()
169        self._response_timer.start()
170
171    def finish(self, status: Status, skip_callback: bool = False) -> None:
172        """Ends the transfer with the specified status."""
173        self._response_timer.stop()
174        self.status = status
175
176        if status.ok():
177            total_size = len(self.data)
178            self._update_progress(total_size, total_size, total_size)
179
180        if not skip_callback:
181            self._end_transfer(self)
182
183        # Set done last so that the transfer has been fully cleaned up.
184        self.done.set()
185
186    def _update_progress(self, bytes_sent: int, bytes_confirmed_received: int,
187                         total_size_bytes: Optional[int]) -> None:
188        """Invokes the provided progress callback, if any, with the progress."""
189
190        stats = ProgressStats(bytes_sent, bytes_confirmed_received,
191                              total_size_bytes)
192        _LOG.debug('Transfer %d progress: %s', self.id, stats)
193
194        if self._progress_callback:
195            self._progress_callback(stats)
196
197    def _send_error(self, error: Status) -> None:
198        """Sends an error chunk to the server and finishes the transfer."""
199        self._send_chunk(
200            Chunk(transfer_id=self.id,
201                  status=error.value,
202                  type=Chunk.Type.TRANSFER_COMPLETION))
203        self.finish(error)
204
205
206class WriteTransfer(Transfer):
207    """A client -> server write transfer."""
208    def __init__(
209        self,
210        transfer_id: int,
211        data: bytes,
212        send_chunk: Callable[[Chunk], None],
213        end_transfer: Callable[[Transfer], None],
214        response_timeout_s: float,
215        initial_response_timeout_s: float,
216        max_retries: int,
217        progress_callback: ProgressCallback = None,
218    ):
219        super().__init__(transfer_id, send_chunk, end_transfer,
220                         response_timeout_s, initial_response_timeout_s,
221                         max_retries, progress_callback)
222        self._data = data
223
224        # Guard this class with a lock since a transfer parameters update might
225        # arrive while responding to a prior update.
226        self._lock = asyncio.Lock()
227        self._offset = 0
228        self._window_end_offset = 0
229        self._max_chunk_size = 0
230        self._chunk_delay_us: Optional[int] = None
231
232        # The window ID increments for each parameters update.
233        self._window_id = 0
234
235        self._last_chunk = self._initial_chunk()
236
237    @property
238    def data(self) -> bytes:
239        return self._data
240
241    def _initial_chunk(self) -> Chunk:
242        return Chunk(transfer_id=self.id, type=Chunk.Type.TRANSFER_START)
243
244    async def _handle_data_chunk(self, chunk: Chunk) -> None:
245        """Processes an incoming chunk from the server.
246
247        In a write transfer, the server only sends transfer parameter updates
248        to the client. When a message is received, update local parameters and
249        send data accordingly.
250        """
251
252        async with self._lock:
253            self._window_id += 1
254            window_id = self._window_id
255
256            if not self._handle_parameters_update(chunk):
257                return
258
259            bytes_acknowledged = chunk.offset
260
261        while True:
262            if self._chunk_delay_us:
263                await asyncio.sleep(self._chunk_delay_us / 1e6)
264
265            async with self._lock:
266                if self.done.is_set():
267                    return
268
269                if window_id != self._window_id:
270                    _LOG.debug('Transfer %d: Skipping stale window', self.id)
271                    return
272
273                write_chunk = self._next_chunk()
274                self._offset += len(write_chunk.data)
275                sent_requested_bytes = self._offset == self._window_end_offset
276
277            self._send_chunk(write_chunk)
278
279            self._update_progress(self._offset, bytes_acknowledged,
280                                  len(self.data))
281
282            if sent_requested_bytes:
283                break
284
285        self._last_chunk = write_chunk
286
287    def _handle_parameters_update(self, chunk: Chunk) -> bool:
288        """Updates transfer state based on a transfer parameters update."""
289
290        retransmit = True
291        if chunk.HasField('type'):
292            retransmit = (chunk.type == Chunk.Type.PARAMETERS_RETRANSMIT
293                          or chunk.type == Chunk.Type.TRANSFER_START)
294
295        if chunk.offset > len(self.data):
296            # Bad offset; terminate the transfer.
297            _LOG.error(
298                'Transfer %d: server requested invalid offset %d (size %d)',
299                self.id, chunk.offset, len(self.data))
300
301            self._send_error(Status.OUT_OF_RANGE)
302            return False
303
304        if chunk.pending_bytes == 0:
305            _LOG.error(
306                'Transfer %d: service requested 0 bytes (invalid); aborting',
307                self.id)
308            self._send_error(Status.INTERNAL)
309            return False
310
311        if retransmit:
312            # Check whether the client has sent a previous data offset, which
313            # indicates that some chunks were lost in transmission.
314            if chunk.offset < self._offset:
315                _LOG.debug('Write transfer %d rolling back: offset %d from %d',
316                           self.id, chunk.offset, self._offset)
317
318            self._offset = chunk.offset
319
320            # Retransmit is the default behavior for older versions of the
321            # transfer protocol. The window_end_offset field is not guaranteed
322            # to be set in these version, so it must be calculated.
323            max_bytes_to_send = min(chunk.pending_bytes,
324                                    len(self.data) - self._offset)
325            self._window_end_offset = self._offset + max_bytes_to_send
326        else:
327            assert chunk.type == Chunk.Type.PARAMETERS_CONTINUE
328
329            # Extend the window to the new end offset specified by the server.
330            self._window_end_offset = min(chunk.window_end_offset,
331                                          len(self.data))
332
333        if chunk.HasField('max_chunk_size_bytes'):
334            self._max_chunk_size = chunk.max_chunk_size_bytes
335
336        if chunk.HasField('min_delay_microseconds'):
337            self._chunk_delay_us = chunk.min_delay_microseconds
338
339        return True
340
341    def _retry_after_timeout(self) -> None:
342        self._send_chunk(self._last_chunk)
343
344    def _next_chunk(self) -> Chunk:
345        """Returns the next Chunk message to send in the data transfer."""
346        chunk = Chunk(transfer_id=self.id,
347                      offset=self._offset,
348                      type=Chunk.Type.TRANSFER_DATA)
349        max_bytes_in_chunk = min(self._max_chunk_size,
350                                 self._window_end_offset - self._offset)
351
352        chunk.data = self.data[self._offset:self._offset + max_bytes_in_chunk]
353
354        # Mark the final chunk of the transfer.
355        if len(self.data) - self._offset <= max_bytes_in_chunk:
356            chunk.remaining_bytes = 0
357
358        return chunk
359
360
361class ReadTransfer(Transfer):
362    """A client <- server read transfer.
363
364    Although Python can effectively handle an unlimited transfer window, this
365    client sets a conservative window and chunk size to avoid overloading the
366    device. These are configurable in the constructor.
367    """
368
369    # The fractional position within a window at which a receive transfer should
370    # extend its window size to minimize the amount of time the transmitter
371    # spends blocked.
372    #
373    # For example, a divisor of 2 will extend the window when half of the
374    # requested data has been received, a divisor of three will extend at a
375    # third of the window, and so on.
376    EXTEND_WINDOW_DIVISOR = 2
377
378    def __init__(  # pylint: disable=too-many-arguments
379            self,
380            transfer_id: int,
381            send_chunk: Callable[[Chunk], None],
382            end_transfer: Callable[[Transfer], None],
383            response_timeout_s: float,
384            initial_response_timeout_s: float,
385            max_retries: int,
386            max_bytes_to_receive: int = 8192,
387            max_chunk_size: int = 1024,
388            chunk_delay_us: int = None,
389            progress_callback: ProgressCallback = None):
390        super().__init__(transfer_id, send_chunk, end_transfer,
391                         response_timeout_s, initial_response_timeout_s,
392                         max_retries, progress_callback)
393        self._max_bytes_to_receive = max_bytes_to_receive
394        self._max_chunk_size = max_chunk_size
395        self._chunk_delay_us = chunk_delay_us
396
397        self._remaining_transfer_size: Optional[int] = None
398        self._data = bytearray()
399        self._offset = 0
400        self._pending_bytes = max_bytes_to_receive
401        self._window_end_offset = max_bytes_to_receive
402
403    @property
404    def data(self) -> bytes:
405        """Returns an immutable copy of the data that has been read."""
406        return bytes(self._data)
407
408    def _initial_chunk(self) -> Chunk:
409        return self._transfer_parameters(Chunk.Type.TRANSFER_START)
410
411    async def _handle_data_chunk(self, chunk: Chunk) -> None:
412        """Processes an incoming chunk from the server.
413
414        In a read transfer, the client receives data chunks from the server.
415        Once all pending data is received, the transfer parameters are updated.
416        """
417
418        if chunk.offset != self._offset:
419            # Initially, the transfer service only supports in-order transfers.
420            # If data is received out of order, request that the server
421            # retransmit from the previous offset.
422            self._send_chunk(
423                self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT))
424            return
425
426        self._data += chunk.data
427        self._pending_bytes -= len(chunk.data)
428        self._offset += len(chunk.data)
429
430        if chunk.HasField('remaining_bytes'):
431            if chunk.remaining_bytes == 0:
432                # No more data to read. Acknowledge receipt and finish.
433                self._send_chunk(
434                    Chunk(transfer_id=self.id,
435                          status=Status.OK.value,
436                          type=Chunk.Type.TRANSFER_COMPLETION))
437                self.finish(Status.OK)
438                return
439
440            # The server may indicate if the amount of remaining data is known.
441            self._remaining_transfer_size = chunk.remaining_bytes
442        elif self._remaining_transfer_size is not None:
443            # Update the remaining transfer size, if it is known.
444            self._remaining_transfer_size -= len(chunk.data)
445
446            # If the transfer size drops to zero, the estimate was inaccurate.
447            if self._remaining_transfer_size <= 0:
448                self._remaining_transfer_size = None
449
450        total_size = None if self._remaining_transfer_size is None else (
451            self._remaining_transfer_size + self._offset)
452        self._update_progress(self._offset, self._offset, total_size)
453
454        if chunk.window_end_offset != 0:
455            if chunk.window_end_offset < self._offset:
456                _LOG.error(
457                    'Transfer %d: transmitter sent invalid earlier end offset '
458                    '%d (receiver offset %d)', self.id,
459                    chunk.window_end_offset, self._offset)
460                self._send_error(Status.INTERNAL)
461                return
462
463            if chunk.window_end_offset > self._window_end_offset:
464                _LOG.error(
465                    'Transfer %d: transmitter sent invalid later end offset '
466                    '%d (receiver end offset %d)', self.id,
467                    chunk.window_end_offset, self._window_end_offset)
468                self._send_error(Status.INTERNAL)
469                return
470
471            self._window_end_offset = chunk.window_end_offset
472            self._pending_bytes -= chunk.window_end_offset - self._offset
473
474        remaining_window_size = self._window_end_offset - self._offset
475        extend_window = (remaining_window_size <= self._max_bytes_to_receive /
476                         ReadTransfer.EXTEND_WINDOW_DIVISOR)
477
478        if self._pending_bytes == 0:
479            # All pending data was received. Send out a new parameters chunk for
480            # the next block.
481            self._send_chunk(
482                self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT))
483        elif extend_window:
484            self._send_chunk(
485                self._transfer_parameters(Chunk.Type.PARAMETERS_CONTINUE))
486
487    def _retry_after_timeout(self) -> None:
488        self._send_chunk(
489            self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT))
490
491    def _transfer_parameters(self, chunk_type: Any) -> Chunk:
492        """Sends an updated transfer parameters chunk to the server."""
493
494        self._pending_bytes = self._max_bytes_to_receive
495        self._window_end_offset = self._offset + self._max_bytes_to_receive
496
497        chunk = Chunk(transfer_id=self.id,
498                      pending_bytes=self._pending_bytes,
499                      window_end_offset=self._window_end_offset,
500                      max_chunk_size_bytes=self._max_chunk_size,
501                      offset=self._offset,
502                      type=chunk_type)
503
504        if self._chunk_delay_us:
505            chunk.min_delay_microseconds = self._chunk_delay_us
506
507        return chunk
508