1# Copyright 2023 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Protocol version-aware chunk message wrapper.""" 15 16from __future__ import annotations 17 18import enum 19from typing import Any 20 21from pw_status import Status 22 23try: 24 from pw_transfer import transfer_pb2 25except ImportError: 26 # For the bazel build, which puts generated protos in a different location. 27 from pigweed.pw_transfer import transfer_pb2 # type: ignore 28 29 30class ProtocolVersion(enum.Enum): 31 """Supported versions of pw_transfer's RPC data transfer protocol.""" 32 33 # Protocol version not known or not set. 34 UNKNOWN = 0 35 36 # The original transfer protocol, prior to transfer start/end handshakes. 37 LEGACY = 1 38 39 # Second version of the transfer protocol. Guarantees type fields on all 40 # chunks, deprecates pending_bytes in favor of window_end_offset, splits 41 # transfer resource IDs from ephemeral session IDs, and adds a handshake 42 # to the start and end of all transfer sessions. 43 VERSION_TWO = 2 44 45 # Alias to the most up-to-date version of the transfer protocol. 46 LATEST = VERSION_TWO 47 48 49_ChunkType = transfer_pb2.Chunk.Type 50 51 52class Chunk: 53 """A chunk exchanged in a pw_transfer stream. 54 55 Wraps the generated protobuf Chunk class with protocol-aware field encoding 56 and decoding. 57 58 Attributes: 59 protocol_version: Version of the transfer protocol with which the chunk 60 is encoded. 61 chunk_type: Type of the chunk within the protocol. 62 session_id: ID for the transfer session to which the chunk belongs. 63 desired_session_id: For a v2 START chunk, the client-assigned session ID 64 to request from the server. 65 resource_id: For a v2 START chunk, ID of the resource to transfer. 66 offset: Offset of the data to be transferred. 67 window_end_offset: In a parameters chunk, end offset of the available 68 window. 69 data: Raw transfer data. 70 remaining_bytes: Optional number of bytes remaining in the transfer. 71 Set to 0 when the data is fully transferred. 72 max_chunk_size_bytes: Maximum number of bytes to send in a single data 73 chunk. 74 min_delay_microseconds: Delay between data chunks to be sent. 75 """ 76 77 # pylint: disable=too-many-instance-attributes 78 79 Type = transfer_pb2.Chunk.Type 80 81 # TODO(frolv): Figure out how to make the chunk type annotation work. 82 # pylint: disable=too-many-arguments 83 def __init__( 84 self, 85 protocol_version: ProtocolVersion, 86 chunk_type: Any, 87 session_id: int = 0, 88 desired_session_id: int | None = None, 89 resource_id: int | None = None, 90 offset: int = 0, 91 window_end_offset: int = 0, 92 data: bytes = b'', 93 remaining_bytes: int | None = None, 94 max_chunk_size_bytes: int | None = None, 95 min_delay_microseconds: int | None = None, 96 status: Status | None = None, 97 initial_offset: int = 0, 98 ): 99 """Creates a new transfer chunk. 100 101 Args: 102 protocol_version: Version of the transfer protocol with which to 103 encode the chunk. 104 chunk_type: Type of the chunk within the protocol. 105 session_id: ID for the transfer session to which the chunk belongs. 106 desired_session_id: For a v2 START chunk, the client-assigned 107 session ID to request from the server. 108 resource_id: For a v2 START chunk, ID of the resource to transfer. 109 offset: Offset of the data to be transferred. 110 window_end_offset: In a parameters chunk, end offset of the 111 available window. 112 data: Raw transfer data. 113 remaining_bytes: Optional number of bytes remaining in the transfer. 114 Set to 0 when the data is fully transferred. 115 max_chunk_size_bytes: Maximum number of bytes to send in a single 116 data chunk. 117 min_delay_microseconds: Delay between data chunks to be sent. 118 status: In a COMPLETION chunk, final status of the transfer. 119 initial_offset: Initial offset for non-zero starting offset 120 transfers 121 """ 122 self.protocol_version = protocol_version 123 self.type = chunk_type 124 self.session_id = session_id 125 self.desired_session_id = desired_session_id 126 self.resource_id = resource_id 127 self.offset = offset 128 self.window_end_offset = window_end_offset 129 self.data = data 130 self.remaining_bytes = remaining_bytes 131 self.max_chunk_size_bytes = max_chunk_size_bytes 132 self.min_delay_microseconds = min_delay_microseconds 133 self.status = status 134 self.initial_offset = initial_offset 135 136 @classmethod 137 def from_message(cls, message: transfer_pb2.Chunk) -> Chunk: 138 """Parses a Chunk from a protobuf message.""" 139 140 # Some very old versions of transfer don't always encode chunk types, 141 # so they must be deduced. 142 # 143 # The type-less legacy transfer protocol doesn't support handshakes or 144 # continuation parameters. Therefore, there are only three possible 145 # types: start, data, and retransmit. 146 if message.HasField('type'): 147 chunk_type = message.type 148 elif ( 149 message.offset == 0 150 and not message.data 151 and not message.HasField('status') 152 ): 153 chunk_type = Chunk.Type.START 154 elif message.data: 155 chunk_type = Chunk.Type.DATA 156 else: 157 chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT 158 159 chunk = cls( 160 ProtocolVersion.UNKNOWN, 161 chunk_type, 162 offset=message.offset, 163 window_end_offset=message.window_end_offset, 164 data=message.data, 165 initial_offset=message.initial_offset, 166 ) 167 168 if message.HasField('session_id'): 169 chunk.protocol_version = ProtocolVersion.VERSION_TWO 170 chunk.session_id = message.session_id 171 else: 172 chunk.protocol_version = ProtocolVersion.LEGACY 173 chunk.session_id = message.transfer_id 174 175 if message.HasField('desired_session_id'): 176 chunk.protocol_version = ProtocolVersion.VERSION_TWO 177 chunk.desired_session_id = message.desired_session_id 178 179 if message.HasField('resource_id'): 180 chunk.resource_id = message.resource_id 181 182 if message.HasField('protocol_version'): 183 # An explicitly specified protocol version overrides any inferred 184 # one. 185 chunk.protocol_version = ProtocolVersion(message.protocol_version) 186 187 if message.HasField('pending_bytes'): 188 chunk.window_end_offset = message.offset + message.pending_bytes 189 190 if message.HasField('remaining_bytes'): 191 chunk.remaining_bytes = message.remaining_bytes 192 193 if message.HasField('max_chunk_size_bytes'): 194 chunk.max_chunk_size_bytes = message.max_chunk_size_bytes 195 196 if message.HasField('min_delay_microseconds'): 197 chunk.min_delay_microseconds = message.min_delay_microseconds 198 199 if message.HasField('status'): 200 chunk.status = Status(message.status) 201 202 if chunk.protocol_version is ProtocolVersion.UNKNOWN: 203 # If no fields in the chunk specified its protocol version, 204 # assume it is a legacy chunk. 205 chunk.protocol_version = ProtocolVersion.LEGACY 206 207 return chunk 208 209 def to_message(self) -> transfer_pb2.Chunk: 210 """Converts the chunk to a protobuf message.""" 211 message = transfer_pb2.Chunk( 212 offset=self.offset, 213 window_end_offset=self.window_end_offset, 214 type=self.type, 215 ) 216 217 if self.resource_id is not None: 218 message.resource_id = self.resource_id 219 220 if self.protocol_version is ProtocolVersion.VERSION_TWO: 221 if self.session_id != 0: 222 assert self.desired_session_id is None 223 message.session_id = self.session_id 224 225 if self.desired_session_id is not None: 226 message.desired_session_id = self.desired_session_id 227 228 if self._should_encode_legacy_fields(): 229 if self.resource_id is not None: 230 message.transfer_id = self.resource_id 231 else: 232 assert self.session_id != 0 233 message.transfer_id = self.session_id 234 235 # In the legacy protocol, the pending_bytes field must be set 236 # alongside window_end_offset, as some transfer implementations 237 # require it. 238 if self.window_end_offset != 0: 239 message.pending_bytes = self.window_end_offset - self.offset 240 241 if self.data: 242 message.data = self.data 243 244 if self.remaining_bytes is not None: 245 message.remaining_bytes = self.remaining_bytes 246 247 if self.max_chunk_size_bytes is not None: 248 message.max_chunk_size_bytes = self.max_chunk_size_bytes 249 250 if self.min_delay_microseconds is not None: 251 message.min_delay_microseconds = self.min_delay_microseconds 252 253 if self.status is not None: 254 message.status = self.status.value 255 256 if self._is_initial_handshake_chunk(): 257 # During the initial handshake, the desired protocol version is 258 # explictly encoded. 259 message.protocol_version = self.protocol_version.value 260 261 message.initial_offset = self.initial_offset 262 263 return message 264 265 def id(self) -> int: 266 """Returns the transfer context identifier for a chunk. 267 268 Depending on the protocol version and type of chunk, this may correspond 269 to one of several proto fields. 270 """ 271 return self.session_id 272 273 def requests_transmission_from_offset(self) -> bool: 274 """Returns True if this chunk is requesting a retransmission.""" 275 return ( 276 self.type is Chunk.Type.PARAMETERS_RETRANSMIT 277 or self.type is Chunk.Type.START 278 or self.type is Chunk.Type.START_ACK_CONFIRMATION 279 ) 280 281 def _is_initial_handshake_chunk(self) -> bool: 282 return self.protocol_version is ProtocolVersion.VERSION_TWO and ( 283 self.type is Chunk.Type.START 284 or self.type is Chunk.Type.START_ACK 285 or self.type is Chunk.Type.START_ACK_CONFIRMATION 286 ) 287 288 def _should_encode_legacy_fields(self) -> bool: 289 return ( 290 self.protocol_version is ProtocolVersion.LEGACY 291 or self.type is Chunk.Type.START 292 ) 293