• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Protocol version-aware chunk message wrapper."""
15
16from __future__ import annotations
17
18import enum
19from typing import Any
20
21from pw_status import Status
22
23try:
24    from pw_transfer import transfer_pb2
25except ImportError:
26    # For the bazel build, which puts generated protos in a different location.
27    from pigweed.pw_transfer import transfer_pb2  # type: ignore
28
29
class ProtocolVersion(enum.Enum):
    """Versions of the pw_transfer RPC data transfer protocol.

    Members:
        UNKNOWN: The protocol version is not known or has not been set.
        LEGACY: The original transfer protocol, which predates the transfer
            start/end handshakes.
        VERSION_TWO: The second protocol revision. It guarantees a type field
            on every chunk, deprecates pending_bytes in favor of
            window_end_offset, separates transfer resource IDs from ephemeral
            session IDs, and adds a handshake at the start and end of every
            transfer session.
        LATEST: Alias of the most recent protocol version.
    """

    UNKNOWN = 0
    LEGACY = 1
    VERSION_TWO = 2

    # Enum alias rather than a distinct member: it resolves to VERSION_TWO
    # and does not show up when iterating over the enum.
    LATEST = VERSION_TWO
47
48
# Module-private alias for the generated protobuf chunk type enum
# (transfer_pb2.Chunk.Type).
# NOTE(review): not referenced anywhere else in the visible part of this
# module; presumably kept for the chunk type annotation mentioned in the TODO
# on Chunk.__init__ -- confirm before removing.
_ChunkType = transfer_pb2.Chunk.Type
50
51
class Chunk:
    """A chunk exchanged in a pw_transfer stream.

    Wraps the generated protobuf Chunk class with protocol-aware field encoding
    and decoding.

    Attributes:
        protocol_version: Version of the transfer protocol with which the chunk
            is encoded.
        type: Type of the chunk within the protocol (a value of
            transfer_pb2.Chunk.Type).
        session_id: ID for the transfer session to which the chunk belongs.
        desired_session_id: For a v2 START chunk, the client-assigned session ID
            to request from the server.
        resource_id: For a v2 START chunk, ID of the resource to transfer.
        offset: Offset of the data to be transferred.
        window_end_offset: In a parameters chunk, end offset of the available
            window.
        data: Raw transfer data.
        remaining_bytes: Optional number of bytes remaining in the transfer.
            Set to 0 when the data is fully transferred.
        max_chunk_size_bytes: Maximum number of bytes to send in a single data
            chunk.
        min_delay_microseconds: Delay between data chunks to be sent.
        status: In a COMPLETION chunk, final status of the transfer.
        initial_offset: Initial offset for non-zero starting offset transfers.
    """

    # pylint: disable=too-many-instance-attributes

    Type = transfer_pb2.Chunk.Type

    # TODO(frolv): Figure out how to make the chunk type annotation work.
    # pylint: disable=too-many-arguments
    def __init__(
        self,
        protocol_version: ProtocolVersion,
        chunk_type: Any,
        session_id: int = 0,
        desired_session_id: int | None = None,
        resource_id: int | None = None,
        offset: int = 0,
        window_end_offset: int = 0,
        data: bytes = b'',
        remaining_bytes: int | None = None,
        max_chunk_size_bytes: int | None = None,
        min_delay_microseconds: int | None = None,
        status: Status | None = None,
        initial_offset: int = 0,
    ) -> None:
        """Creates a new transfer chunk.

        Args:
            protocol_version: Version of the transfer protocol with which to
                encode the chunk.
            chunk_type: Type of the chunk within the protocol. Stored on the
                instance as the ``type`` attribute.
            session_id: ID for the transfer session to which the chunk belongs.
            desired_session_id: For a v2 START chunk, the client-assigned
                session ID to request from the server.
            resource_id: For a v2 START chunk, ID of the resource to transfer.
            offset: Offset of the data to be transferred.
            window_end_offset: In a parameters chunk, end offset of the
                available window.
            data: Raw transfer data.
            remaining_bytes: Optional number of bytes remaining in the transfer.
                Set to 0 when the data is fully transferred.
            max_chunk_size_bytes: Maximum number of bytes to send in a single
                data chunk.
            min_delay_microseconds: Delay between data chunks to be sent.
            status: In a COMPLETION chunk, final status of the transfer.
            initial_offset: Initial offset for non-zero starting offset
                transfers.
        """
        self.protocol_version = protocol_version
        self.type = chunk_type
        self.session_id = session_id
        self.desired_session_id = desired_session_id
        self.resource_id = resource_id
        self.offset = offset
        self.window_end_offset = window_end_offset
        self.data = data
        self.remaining_bytes = remaining_bytes
        self.max_chunk_size_bytes = max_chunk_size_bytes
        self.min_delay_microseconds = min_delay_microseconds
        self.status = status
        self.initial_offset = initial_offset

    @classmethod
    def from_message(cls, message: transfer_pb2.Chunk) -> Chunk:
        """Parses a Chunk from a protobuf message.

        The chunk type is taken from the message when present and deduced
        otherwise. The protocol version is inferred from which ID fields are
        set, unless the message states a version explicitly.

        Args:
            message: The protobuf chunk to decode.

        Returns:
            The decoded Chunk. Its protocol_version is never
            ProtocolVersion.UNKNOWN; a chunk whose version cannot be
            determined is treated as LEGACY.

        Raises:
            ValueError: If the message carries an explicit protocol_version
                that is not a value of ProtocolVersion.
        """

        # Some very old versions of transfer don't always encode chunk types,
        # so they must be deduced.
        #
        # The type-less legacy transfer protocol doesn't support handshakes or
        # continuation parameters. Therefore, there are only three possible
        # types: start, data, and retransmit.
        if message.HasField('type'):
            chunk_type = message.type
        elif (
            message.offset == 0
            and not message.data
            and not message.HasField('status')
        ):
            chunk_type = Chunk.Type.START
        elif message.data:
            chunk_type = Chunk.Type.DATA
        else:
            chunk_type = Chunk.Type.PARAMETERS_RETRANSMIT

        chunk = cls(
            ProtocolVersion.UNKNOWN,
            chunk_type,
            offset=message.offset,
            window_end_offset=message.window_end_offset,
            data=message.data,
            initial_offset=message.initial_offset,
        )

        # A session_id field marks a v2 chunk; without it, the chunk is
        # identified by the legacy transfer_id field.
        if message.HasField('session_id'):
            chunk.protocol_version = ProtocolVersion.VERSION_TWO
            chunk.session_id = message.session_id
        else:
            chunk.protocol_version = ProtocolVersion.LEGACY
            chunk.session_id = message.transfer_id

        if message.HasField('desired_session_id'):
            chunk.protocol_version = ProtocolVersion.VERSION_TWO
            chunk.desired_session_id = message.desired_session_id

        if message.HasField('resource_id'):
            chunk.resource_id = message.resource_id

        if message.HasField('protocol_version'):
            # An explicitly specified protocol version overrides any inferred
            # one.
            chunk.protocol_version = ProtocolVersion(message.protocol_version)

        if message.HasField('pending_bytes'):
            # When pending_bytes is present it takes precedence over the
            # message's own window_end_offset.
            chunk.window_end_offset = message.offset + message.pending_bytes

        if message.HasField('remaining_bytes'):
            chunk.remaining_bytes = message.remaining_bytes

        if message.HasField('max_chunk_size_bytes'):
            chunk.max_chunk_size_bytes = message.max_chunk_size_bytes

        if message.HasField('min_delay_microseconds'):
            chunk.min_delay_microseconds = message.min_delay_microseconds

        if message.HasField('status'):
            chunk.status = Status(message.status)

        if chunk.protocol_version is ProtocolVersion.UNKNOWN:
            # If no fields in the chunk specified its protocol version,
            # assume it is a legacy chunk.
            chunk.protocol_version = ProtocolVersion.LEGACY

        return chunk

    def to_message(self) -> transfer_pb2.Chunk:
        """Converts the chunk to a protobuf message.

        Optional fields are only written when they hold a value. Which ID
        fields are written depends on the protocol version and chunk type.

        Returns:
            A new transfer_pb2.Chunk populated from this chunk.

        Raises:
            AssertionError: If a v2 chunk sets both a non-zero session_id and
                a desired_session_id, or if legacy fields must be encoded but
                neither resource_id nor a non-zero session_id is available.
        """
        message = transfer_pb2.Chunk(
            offset=self.offset,
            window_end_offset=self.window_end_offset,
            type=self.type,
        )

        if self.resource_id is not None:
            message.resource_id = self.resource_id

        if self.protocol_version is ProtocolVersion.VERSION_TWO:
            if self.session_id != 0:
                assert (
                    self.desired_session_id is None
                ), 'session_id and desired_session_id are mutually exclusive'
                message.session_id = self.session_id

            if self.desired_session_id is not None:
                message.desired_session_id = self.desired_session_id

        if self._should_encode_legacy_fields():
            if self.resource_id is not None:
                message.transfer_id = self.resource_id
            else:
                assert (
                    self.session_id != 0
                ), 'a legacy transfer_id requires a resource_id or session_id'
                message.transfer_id = self.session_id

            # In the legacy protocol, the pending_bytes field must be set
            # alongside window_end_offset, as some transfer implementations
            # require it.
            if self.window_end_offset != 0:
                message.pending_bytes = self.window_end_offset - self.offset

        if self.data:
            message.data = self.data

        if self.remaining_bytes is not None:
            message.remaining_bytes = self.remaining_bytes

        if self.max_chunk_size_bytes is not None:
            message.max_chunk_size_bytes = self.max_chunk_size_bytes

        if self.min_delay_microseconds is not None:
            message.min_delay_microseconds = self.min_delay_microseconds

        if self.status is not None:
            message.status = self.status.value

        if self._is_initial_handshake_chunk():
            # During the initial handshake, the desired protocol version is
            # explicitly encoded.
            message.protocol_version = self.protocol_version.value

        # initial_offset is written unconditionally, including when it is 0.
        message.initial_offset = self.initial_offset

        return message

    def id(self) -> int:
        """Returns the transfer context identifier for a chunk.

        Depending on the protocol version and type of chunk, this may correspond
        to one of several proto fields.
        """
        return self.session_id

    def requests_transmission_from_offset(self) -> bool:
        """Returns True if this chunk is requesting a retransmission.

        That is the case for PARAMETERS_RETRANSMIT, START and
        START_ACK_CONFIRMATION chunks.
        """
        # Chunk types are plain protobuf enum integers, so they are compared
        # by value rather than by identity.
        return self.type in (
            Chunk.Type.PARAMETERS_RETRANSMIT,
            Chunk.Type.START,
            Chunk.Type.START_ACK_CONFIRMATION,
        )

    def _is_initial_handshake_chunk(self) -> bool:
        """Returns True for a v2 START, START_ACK or START_ACK_CONFIRMATION."""
        if self.protocol_version is not ProtocolVersion.VERSION_TWO:
            return False
        return self.type in (
            Chunk.Type.START,
            Chunk.Type.START_ACK,
            Chunk.Type.START_ACK_CONFIRMATION,
        )

    def _should_encode_legacy_fields(self) -> bool:
        """Returns True if legacy fields must be written to the message.

        This holds for every LEGACY chunk and for START chunks of any
        protocol version.
        """
        return (
            self.protocol_version is ProtocolVersion.LEGACY
            or self.type == Chunk.Type.START
        )
293