1# Copyright 2022 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Classes for read and write transfers.""" 15 16import abc 17import asyncio 18from dataclasses import dataclass 19import logging 20import math 21import threading 22from typing import Any, Callable, Optional 23 24from pw_status import Status 25from pw_transfer.transfer_pb2 import Chunk 26 27_LOG = logging.getLogger(__package__) 28 29 30@dataclass(frozen=True) 31class ProgressStats: 32 bytes_sent: int 33 bytes_confirmed_received: int 34 total_size_bytes: Optional[int] 35 36 def percent_received(self) -> float: 37 if self.total_size_bytes is None: 38 return math.nan 39 40 return self.bytes_confirmed_received / self.total_size_bytes * 100 41 42 def __str__(self) -> str: 43 total = str( 44 self.total_size_bytes) if self.total_size_bytes else 'unknown' 45 return (f'{self.percent_received():5.1f}% ({self.bytes_sent} B sent, ' 46 f'{self.bytes_confirmed_received} B received of {total} B)') 47 48 49ProgressCallback = Callable[[ProgressStats], Any] 50 51 52class _Timer: 53 """A timer which invokes a callback after a certain timeout.""" 54 def __init__(self, timeout_s: float, callback: Callable[[], Any]): 55 self.timeout_s = timeout_s 56 self._callback = callback 57 self._task: Optional[asyncio.Task[Any]] = None 58 59 def start(self, timeout_s: float = None) -> None: 60 """Starts a new timer. 61 62 If a timer is already running, it is stopped and a new timer started. 63 This can be used to implement watchdog-like behavior, where a callback 64 is invoked after some time without a kick. 65 """ 66 self.stop() 67 timeout_s = self.timeout_s if timeout_s is None else timeout_s 68 self._task = asyncio.create_task(self._run(timeout_s)) 69 70 def stop(self) -> None: 71 """Terminates a running timer.""" 72 if self._task is not None: 73 self._task.cancel() 74 self._task = None 75 76 async def _run(self, timeout_s: float) -> None: 77 await asyncio.sleep(timeout_s) 78 self._task = None 79 self._callback() 80 81 82class Transfer(abc.ABC): 83 """A client-side data transfer through a Manager. 84 85 Subclasses are responsible for implementing all of the logic for their type 86 of transfer, receiving messages from the server and sending the appropriate 87 messages in response. 88 """ 89 def __init__(self, 90 transfer_id: int, 91 send_chunk: Callable[[Chunk], None], 92 end_transfer: Callable[['Transfer'], None], 93 response_timeout_s: float, 94 initial_response_timeout_s: float, 95 max_retries: int, 96 progress_callback: ProgressCallback = None): 97 self.id = transfer_id 98 self.status = Status.OK 99 self.done = threading.Event() 100 101 self._send_chunk = send_chunk 102 self._end_transfer = end_transfer 103 104 self._retries = 0 105 self._max_retries = max_retries 106 self._response_timer = _Timer(response_timeout_s, self._on_timeout) 107 self._initial_response_timeout_s = initial_response_timeout_s 108 109 self._progress_callback = progress_callback 110 111 async def begin(self) -> None: 112 """Sends the initial chunk of the transfer.""" 113 self._send_chunk(self._initial_chunk()) 114 self._response_timer.start(self._initial_response_timeout_s) 115 116 @property 117 @abc.abstractmethod 118 def data(self) -> bytes: 119 """Returns the data read or written in this transfer.""" 120 121 @abc.abstractmethod 122 def _initial_chunk(self) -> Chunk: 123 """Returns the initial chunk to notify the sever of the transfer.""" 124 125 async def handle_chunk(self, chunk: Chunk) -> None: 126 """Processes an incoming chunk from the server. 127 128 Handles terminating chunks (i.e. those with a status) and forwards 129 non-terminating chunks to handle_data_chunk. 130 """ 131 self._response_timer.stop() 132 self._retries = 0 # Received data from service, so reset the retries. 133 134 _LOG.debug('Received chunk\n%s', str(chunk).rstrip()) 135 136 # Status chunks are only used to terminate a transfer. They do not 137 # contain any data that requires processing. 138 if chunk.HasField('status'): 139 self.finish(Status(chunk.status)) 140 return 141 142 await self._handle_data_chunk(chunk) 143 144 # Start the timeout for the server to send a chunk in response. 145 self._response_timer.start() 146 147 @abc.abstractmethod 148 async def _handle_data_chunk(self, chunk: Chunk) -> None: 149 """Handles a chunk that contains or requests data.""" 150 151 @abc.abstractmethod 152 def _retry_after_timeout(self) -> None: 153 """Retries after a timeout occurs.""" 154 155 def _on_timeout(self) -> None: 156 """Handles a timeout while waiting for a chunk.""" 157 if self.done.is_set(): 158 return 159 160 self._retries += 1 161 if self._retries > self._max_retries: 162 self.finish(Status.DEADLINE_EXCEEDED) 163 return 164 165 _LOG.debug('Received no responses for %.3fs; retrying %d/%d', 166 self._response_timer.timeout_s, self._retries, 167 self._max_retries) 168 self._retry_after_timeout() 169 self._response_timer.start() 170 171 def finish(self, status: Status, skip_callback: bool = False) -> None: 172 """Ends the transfer with the specified status.""" 173 self._response_timer.stop() 174 self.status = status 175 176 if status.ok(): 177 total_size = len(self.data) 178 self._update_progress(total_size, total_size, total_size) 179 180 if not skip_callback: 181 self._end_transfer(self) 182 183 # Set done last so that the transfer has been fully cleaned up. 184 self.done.set() 185 186 def _update_progress(self, bytes_sent: int, bytes_confirmed_received: int, 187 total_size_bytes: Optional[int]) -> None: 188 """Invokes the provided progress callback, if any, with the progress.""" 189 190 stats = ProgressStats(bytes_sent, bytes_confirmed_received, 191 total_size_bytes) 192 _LOG.debug('Transfer %d progress: %s', self.id, stats) 193 194 if self._progress_callback: 195 self._progress_callback(stats) 196 197 def _send_error(self, error: Status) -> None: 198 """Sends an error chunk to the server and finishes the transfer.""" 199 self._send_chunk( 200 Chunk(transfer_id=self.id, 201 status=error.value, 202 type=Chunk.Type.TRANSFER_COMPLETION)) 203 self.finish(error) 204 205 206class WriteTransfer(Transfer): 207 """A client -> server write transfer.""" 208 def __init__( 209 self, 210 transfer_id: int, 211 data: bytes, 212 send_chunk: Callable[[Chunk], None], 213 end_transfer: Callable[[Transfer], None], 214 response_timeout_s: float, 215 initial_response_timeout_s: float, 216 max_retries: int, 217 progress_callback: ProgressCallback = None, 218 ): 219 super().__init__(transfer_id, send_chunk, end_transfer, 220 response_timeout_s, initial_response_timeout_s, 221 max_retries, progress_callback) 222 self._data = data 223 224 # Guard this class with a lock since a transfer parameters update might 225 # arrive while responding to a prior update. 226 self._lock = asyncio.Lock() 227 self._offset = 0 228 self._window_end_offset = 0 229 self._max_chunk_size = 0 230 self._chunk_delay_us: Optional[int] = None 231 232 # The window ID increments for each parameters update. 233 self._window_id = 0 234 235 self._last_chunk = self._initial_chunk() 236 237 @property 238 def data(self) -> bytes: 239 return self._data 240 241 def _initial_chunk(self) -> Chunk: 242 return Chunk(transfer_id=self.id, type=Chunk.Type.TRANSFER_START) 243 244 async def _handle_data_chunk(self, chunk: Chunk) -> None: 245 """Processes an incoming chunk from the server. 246 247 In a write transfer, the server only sends transfer parameter updates 248 to the client. When a message is received, update local parameters and 249 send data accordingly. 250 """ 251 252 async with self._lock: 253 self._window_id += 1 254 window_id = self._window_id 255 256 if not self._handle_parameters_update(chunk): 257 return 258 259 bytes_acknowledged = chunk.offset 260 261 while True: 262 if self._chunk_delay_us: 263 await asyncio.sleep(self._chunk_delay_us / 1e6) 264 265 async with self._lock: 266 if self.done.is_set(): 267 return 268 269 if window_id != self._window_id: 270 _LOG.debug('Transfer %d: Skipping stale window', self.id) 271 return 272 273 write_chunk = self._next_chunk() 274 self._offset += len(write_chunk.data) 275 sent_requested_bytes = self._offset == self._window_end_offset 276 277 self._send_chunk(write_chunk) 278 279 self._update_progress(self._offset, bytes_acknowledged, 280 len(self.data)) 281 282 if sent_requested_bytes: 283 break 284 285 self._last_chunk = write_chunk 286 287 def _handle_parameters_update(self, chunk: Chunk) -> bool: 288 """Updates transfer state based on a transfer parameters update.""" 289 290 retransmit = True 291 if chunk.HasField('type'): 292 retransmit = (chunk.type == Chunk.Type.PARAMETERS_RETRANSMIT 293 or chunk.type == Chunk.Type.TRANSFER_START) 294 295 if chunk.offset > len(self.data): 296 # Bad offset; terminate the transfer. 297 _LOG.error( 298 'Transfer %d: server requested invalid offset %d (size %d)', 299 self.id, chunk.offset, len(self.data)) 300 301 self._send_error(Status.OUT_OF_RANGE) 302 return False 303 304 if chunk.pending_bytes == 0: 305 _LOG.error( 306 'Transfer %d: service requested 0 bytes (invalid); aborting', 307 self.id) 308 self._send_error(Status.INTERNAL) 309 return False 310 311 if retransmit: 312 # Check whether the client has sent a previous data offset, which 313 # indicates that some chunks were lost in transmission. 314 if chunk.offset < self._offset: 315 _LOG.debug('Write transfer %d rolling back: offset %d from %d', 316 self.id, chunk.offset, self._offset) 317 318 self._offset = chunk.offset 319 320 # Retransmit is the default behavior for older versions of the 321 # transfer protocol. The window_end_offset field is not guaranteed 322 # to be set in these version, so it must be calculated. 323 max_bytes_to_send = min(chunk.pending_bytes, 324 len(self.data) - self._offset) 325 self._window_end_offset = self._offset + max_bytes_to_send 326 else: 327 assert chunk.type == Chunk.Type.PARAMETERS_CONTINUE 328 329 # Extend the window to the new end offset specified by the server. 330 self._window_end_offset = min(chunk.window_end_offset, 331 len(self.data)) 332 333 if chunk.HasField('max_chunk_size_bytes'): 334 self._max_chunk_size = chunk.max_chunk_size_bytes 335 336 if chunk.HasField('min_delay_microseconds'): 337 self._chunk_delay_us = chunk.min_delay_microseconds 338 339 return True 340 341 def _retry_after_timeout(self) -> None: 342 self._send_chunk(self._last_chunk) 343 344 def _next_chunk(self) -> Chunk: 345 """Returns the next Chunk message to send in the data transfer.""" 346 chunk = Chunk(transfer_id=self.id, 347 offset=self._offset, 348 type=Chunk.Type.TRANSFER_DATA) 349 max_bytes_in_chunk = min(self._max_chunk_size, 350 self._window_end_offset - self._offset) 351 352 chunk.data = self.data[self._offset:self._offset + max_bytes_in_chunk] 353 354 # Mark the final chunk of the transfer. 355 if len(self.data) - self._offset <= max_bytes_in_chunk: 356 chunk.remaining_bytes = 0 357 358 return chunk 359 360 361class ReadTransfer(Transfer): 362 """A client <- server read transfer. 363 364 Although Python can effectively handle an unlimited transfer window, this 365 client sets a conservative window and chunk size to avoid overloading the 366 device. These are configurable in the constructor. 367 """ 368 369 # The fractional position within a window at which a receive transfer should 370 # extend its window size to minimize the amount of time the transmitter 371 # spends blocked. 372 # 373 # For example, a divisor of 2 will extend the window when half of the 374 # requested data has been received, a divisor of three will extend at a 375 # third of the window, and so on. 376 EXTEND_WINDOW_DIVISOR = 2 377 378 def __init__( # pylint: disable=too-many-arguments 379 self, 380 transfer_id: int, 381 send_chunk: Callable[[Chunk], None], 382 end_transfer: Callable[[Transfer], None], 383 response_timeout_s: float, 384 initial_response_timeout_s: float, 385 max_retries: int, 386 max_bytes_to_receive: int = 8192, 387 max_chunk_size: int = 1024, 388 chunk_delay_us: int = None, 389 progress_callback: ProgressCallback = None): 390 super().__init__(transfer_id, send_chunk, end_transfer, 391 response_timeout_s, initial_response_timeout_s, 392 max_retries, progress_callback) 393 self._max_bytes_to_receive = max_bytes_to_receive 394 self._max_chunk_size = max_chunk_size 395 self._chunk_delay_us = chunk_delay_us 396 397 self._remaining_transfer_size: Optional[int] = None 398 self._data = bytearray() 399 self._offset = 0 400 self._pending_bytes = max_bytes_to_receive 401 self._window_end_offset = max_bytes_to_receive 402 403 @property 404 def data(self) -> bytes: 405 """Returns an immutable copy of the data that has been read.""" 406 return bytes(self._data) 407 408 def _initial_chunk(self) -> Chunk: 409 return self._transfer_parameters(Chunk.Type.TRANSFER_START) 410 411 async def _handle_data_chunk(self, chunk: Chunk) -> None: 412 """Processes an incoming chunk from the server. 413 414 In a read transfer, the client receives data chunks from the server. 415 Once all pending data is received, the transfer parameters are updated. 416 """ 417 418 if chunk.offset != self._offset: 419 # Initially, the transfer service only supports in-order transfers. 420 # If data is received out of order, request that the server 421 # retransmit from the previous offset. 422 self._send_chunk( 423 self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) 424 return 425 426 self._data += chunk.data 427 self._pending_bytes -= len(chunk.data) 428 self._offset += len(chunk.data) 429 430 if chunk.HasField('remaining_bytes'): 431 if chunk.remaining_bytes == 0: 432 # No more data to read. Acknowledge receipt and finish. 433 self._send_chunk( 434 Chunk(transfer_id=self.id, 435 status=Status.OK.value, 436 type=Chunk.Type.TRANSFER_COMPLETION)) 437 self.finish(Status.OK) 438 return 439 440 # The server may indicate if the amount of remaining data is known. 441 self._remaining_transfer_size = chunk.remaining_bytes 442 elif self._remaining_transfer_size is not None: 443 # Update the remaining transfer size, if it is known. 444 self._remaining_transfer_size -= len(chunk.data) 445 446 # If the transfer size drops to zero, the estimate was inaccurate. 447 if self._remaining_transfer_size <= 0: 448 self._remaining_transfer_size = None 449 450 total_size = None if self._remaining_transfer_size is None else ( 451 self._remaining_transfer_size + self._offset) 452 self._update_progress(self._offset, self._offset, total_size) 453 454 if chunk.window_end_offset != 0: 455 if chunk.window_end_offset < self._offset: 456 _LOG.error( 457 'Transfer %d: transmitter sent invalid earlier end offset ' 458 '%d (receiver offset %d)', self.id, 459 chunk.window_end_offset, self._offset) 460 self._send_error(Status.INTERNAL) 461 return 462 463 if chunk.window_end_offset > self._window_end_offset: 464 _LOG.error( 465 'Transfer %d: transmitter sent invalid later end offset ' 466 '%d (receiver end offset %d)', self.id, 467 chunk.window_end_offset, self._window_end_offset) 468 self._send_error(Status.INTERNAL) 469 return 470 471 self._window_end_offset = chunk.window_end_offset 472 self._pending_bytes -= chunk.window_end_offset - self._offset 473 474 remaining_window_size = self._window_end_offset - self._offset 475 extend_window = (remaining_window_size <= self._max_bytes_to_receive / 476 ReadTransfer.EXTEND_WINDOW_DIVISOR) 477 478 if self._pending_bytes == 0: 479 # All pending data was received. Send out a new parameters chunk for 480 # the next block. 481 self._send_chunk( 482 self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) 483 elif extend_window: 484 self._send_chunk( 485 self._transfer_parameters(Chunk.Type.PARAMETERS_CONTINUE)) 486 487 def _retry_after_timeout(self) -> None: 488 self._send_chunk( 489 self._transfer_parameters(Chunk.Type.PARAMETERS_RETRANSMIT)) 490 491 def _transfer_parameters(self, chunk_type: Any) -> Chunk: 492 """Sends an updated transfer parameters chunk to the server.""" 493 494 self._pending_bytes = self._max_bytes_to_receive 495 self._window_end_offset = self._offset + self._max_bytes_to_receive 496 497 chunk = Chunk(transfer_id=self.id, 498 pending_bytes=self._pending_bytes, 499 window_end_offset=self._window_end_offset, 500 max_chunk_size_bytes=self._max_chunk_size, 501 offset=self._offset, 502 type=chunk_type) 503 504 if self._chunk_delay_us: 505 chunk.min_delay_microseconds = self._chunk_delay_us 506 507 return chunk 508