• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests for the transfer service client."""
16
17import enum
18import math
19import os
20import unittest
21from typing import Iterable
22
23from pw_status import Status
24from pw_rpc import callback_client, client, ids, packets
25from pw_rpc.internal import packet_pb2
26
27import pw_transfer
28from pw_transfer import ProtocolVersion
29
30try:
31    from pw_transfer import transfer_pb2
32except ImportError:
33    # For the bazel build, which puts generated protos in a different location.
34    from pigweed.pw_transfer import transfer_pb2  # type: ignore
35
36_TRANSFER_SERVICE_ID = ids.calculate('pw.transfer.Transfer')
37_FIRST_SESSION_ID = 1
38_ARBITRARY_TRANSFER_ID = 66
39
40# If the default timeout is too short, some tests become flaky on Windows.
41DEFAULT_TIMEOUT_S = 0.3
42
43
44class _Method(enum.Enum):
45    READ = ids.calculate('Read')
46    WRITE = ids.calculate('Write')
47
48
49# pylint: disable=missing-function-docstring, missing-class-docstring
50
51
52class TransferManagerTest(unittest.TestCase):
53    # pylint: disable=too-many-public-methods
54    """Tests for the transfer manager."""
55
56    def setUp(self) -> None:
57        self._client = client.Client.from_modules(
58            callback_client.Impl(),
59            [client.Channel(1, self._handle_request)],
60            (transfer_pb2,),
61        )
62        self._service = self._client.channel(1).rpcs.pw.transfer.Transfer
63
64        self._sent_chunks: list[transfer_pb2.Chunk] = []
65        self._packets_to_send: list[list[packet_pb2.RpcPacket]] = []
66
67    def _enqueue_server_responses(
68        self, method: _Method, responses: Iterable[Iterable[transfer_pb2.Chunk]]
69    ) -> None:
70        for group in responses:
71            serialized_group = []
72            for response in group:
73                serialized_group.append(
74                    packet_pb2.RpcPacket(
75                        type=packet_pb2.PacketType.SERVER_STREAM,
76                        channel_id=1,
77                        service_id=_TRANSFER_SERVICE_ID,
78                        method_id=method.value,
79                        status=Status.OK.value,
80                        payload=response.SerializeToString(),
81                    )
82                )
83            self._packets_to_send.append(serialized_group)
84
85    def _enqueue_server_error(self, method: _Method, error: Status) -> None:
86        self._packets_to_send.append(
87            [
88                packet_pb2.RpcPacket(
89                    type=packet_pb2.PacketType.SERVER_ERROR,
90                    channel_id=1,
91                    service_id=_TRANSFER_SERVICE_ID,
92                    method_id=method.value,
93                    status=error.value,
94                )
95            ]
96        )
97
98    def _handle_request(self, data: bytes) -> None:
99        packet = packets.decode(data)
100        if packet.type is not packet_pb2.PacketType.CLIENT_STREAM:
101            return
102
103        chunk = transfer_pb2.Chunk()
104        chunk.MergeFromString(packet.payload)
105        self._sent_chunks.append(chunk)
106
107        if self._packets_to_send:
108            responses = self._packets_to_send.pop(0)
109            for response in responses:
110                response.call_id = packet.call_id
111                self._client.process_packet(response.SerializeToString())
112
113    def _received_data(self) -> bytearray:
114        data = bytearray()
115        for chunk in self._sent_chunks:
116            data.extend(chunk.data)
117        return data
118
119    def test_read_transfer_basic(self):
120        manager = pw_transfer.Manager(
121            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
122        )
123
124        self._enqueue_server_responses(
125            _Method.READ,
126            (
127                (
128                    transfer_pb2.Chunk(
129                        transfer_id=3, offset=0, data=b'abc', remaining_bytes=0
130                    ),
131                ),
132            ),
133        )
134
135        data = manager.read(3)
136        self.assertEqual(data, b'abc')
137        self.assertEqual(len(self._sent_chunks), 2)
138        self.assertTrue(self._sent_chunks[-1].HasField('status'))
139        self.assertEqual(self._sent_chunks[-1].status, 0)
140
141    def test_read_transfer_multichunk(self) -> None:
142        manager = pw_transfer.Manager(
143            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
144        )
145
146        self._enqueue_server_responses(
147            _Method.READ,
148            (
149                (
150                    transfer_pb2.Chunk(
151                        transfer_id=3, offset=0, data=b'abc', remaining_bytes=3
152                    ),
153                    transfer_pb2.Chunk(
154                        transfer_id=3, offset=3, data=b'def', remaining_bytes=0
155                    ),
156                ),
157            ),
158        )
159
160        data = manager.read(3)
161        self.assertEqual(data, b'abcdef')
162        self.assertEqual(len(self._sent_chunks), 2)
163        self.assertTrue(self._sent_chunks[-1].HasField('status'))
164        self.assertEqual(self._sent_chunks[-1].status, 0)
165
166    def test_read_transfer_progress_callback(self) -> None:
167        manager = pw_transfer.Manager(
168            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
169        )
170
171        self._enqueue_server_responses(
172            _Method.READ,
173            (
174                (
175                    transfer_pb2.Chunk(
176                        transfer_id=3, offset=0, data=b'abc', remaining_bytes=3
177                    ),
178                    transfer_pb2.Chunk(
179                        transfer_id=3, offset=3, data=b'def', remaining_bytes=0
180                    ),
181                ),
182            ),
183        )
184
185        progress: list[pw_transfer.ProgressStats] = []
186
187        data = manager.read(3, progress.append)
188        self.assertEqual(data, b'abcdef')
189        self.assertEqual(len(self._sent_chunks), 2)
190        self.assertTrue(self._sent_chunks[-1].HasField('status'))
191        self.assertEqual(self._sent_chunks[-1].status, 0)
192        self.assertEqual(
193            progress,
194            [
195                pw_transfer.ProgressStats(3, 3, 6),
196                pw_transfer.ProgressStats(6, 6, 6),
197            ],
198        )
199
200    def test_read_transfer_retry_bad_offset(self) -> None:
201        """Server responds with an unexpected offset in a read transfer."""
202        manager = pw_transfer.Manager(
203            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
204        )
205
206        self._enqueue_server_responses(
207            _Method.READ,
208            (
209                (
210                    transfer_pb2.Chunk(
211                        transfer_id=3, offset=0, data=b'123', remaining_bytes=6
212                    ),
213                    # Incorrect offset; expecting 3.
214                    transfer_pb2.Chunk(
215                        transfer_id=3, offset=1, data=b'456', remaining_bytes=3
216                    ),
217                ),
218                (
219                    transfer_pb2.Chunk(
220                        transfer_id=3, offset=3, data=b'456', remaining_bytes=3
221                    ),
222                    transfer_pb2.Chunk(
223                        transfer_id=3, offset=6, data=b'789', remaining_bytes=0
224                    ),
225                ),
226            ),
227        )
228
229        data = manager.read(3)
230        self.assertEqual(data, b'123456789')
231
232        # Two transfer parameter requests should have been sent.
233        self.assertEqual(len(self._sent_chunks), 3)
234        self.assertTrue(self._sent_chunks[-1].HasField('status'))
235        self.assertEqual(self._sent_chunks[-1].status, 0)
236
237    def test_read_transfer_recovery_sends_parameters_on_retry(self) -> None:
238        """Server sends the same chunk twice (retry) in a read transfer."""
239        manager = pw_transfer.Manager(
240            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
241        )
242
243        self._enqueue_server_responses(
244            _Method.READ,
245            (
246                (
247                    # Bad offset, enter recovery state. Only one parameters
248                    # chunk should be sent.
249                    transfer_pb2.Chunk(
250                        transfer_id=3, offset=1, data=b'234', remaining_bytes=5
251                    ),
252                    transfer_pb2.Chunk(
253                        transfer_id=3, offset=4, data=b'567', remaining_bytes=2
254                    ),
255                    transfer_pb2.Chunk(
256                        transfer_id=3, offset=7, data=b'8', remaining_bytes=1
257                    ),
258                ),
259                (
260                    # Only one parameters chunk should be sent after the server
261                    # retries the same offset twice.
262                    transfer_pb2.Chunk(
263                        transfer_id=3, offset=1, data=b'234', remaining_bytes=5
264                    ),
265                    transfer_pb2.Chunk(
266                        transfer_id=3, offset=4, data=b'567', remaining_bytes=2
267                    ),
268                    transfer_pb2.Chunk(
269                        transfer_id=3, offset=7, data=b'8', remaining_bytes=1
270                    ),
271                    transfer_pb2.Chunk(
272                        transfer_id=3, offset=7, data=b'8', remaining_bytes=1
273                    ),
274                ),
275                (
276                    transfer_pb2.Chunk(
277                        transfer_id=3,
278                        offset=0,
279                        data=b'123456789',
280                        remaining_bytes=0,
281                    ),
282                ),
283            ),
284        )
285
286        data = manager.read(3)
287        self.assertEqual(data, b'123456789')
288
289        self.assertEqual(len(self._sent_chunks), 4)
290        self.assertEqual(
291            self._sent_chunks[0].type, transfer_pb2.Chunk.Type.START
292        )
293        self.assertEqual(self._sent_chunks[0].offset, 0)
294        self.assertEqual(
295            self._sent_chunks[1].type,
296            transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
297        )
298        self.assertEqual(self._sent_chunks[1].offset, 0)
299        self.assertEqual(
300            self._sent_chunks[2].type,
301            transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
302        )
303        self.assertEqual(self._sent_chunks[2].offset, 0)
304        self.assertEqual(
305            self._sent_chunks[3].type, transfer_pb2.Chunk.Type.COMPLETION
306        )
307
308    def test_read_transfer_retry_timeout(self) -> None:
309        """Server doesn't respond to read transfer parameters."""
310        manager = pw_transfer.Manager(
311            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
312        )
313
314        self._enqueue_server_responses(
315            _Method.READ,
316            (
317                (),  # Send nothing in response to the initial parameters.
318                (
319                    transfer_pb2.Chunk(
320                        transfer_id=3, offset=0, data=b'xyz', remaining_bytes=0
321                    ),
322                ),
323            ),
324        )
325
326        data = manager.read(3)
327        self.assertEqual(data, b'xyz')
328
329        # Two transfer parameter requests should have been sent.
330        self.assertEqual(len(self._sent_chunks), 3)
331        self.assertTrue(self._sent_chunks[-1].HasField('status'))
332        self.assertEqual(self._sent_chunks[-1].status, 0)
333
334    def test_read_transfer_lifetime_retries(self) -> None:
335        """Server doesn't respond several times during the transfer."""
336        manager = pw_transfer.Manager(
337            self._service,
338            default_response_timeout_s=DEFAULT_TIMEOUT_S,
339            max_retries=2**32 - 1,
340            max_lifetime_retries=4,
341        )
342
343        self._enqueue_server_responses(
344            _Method.READ,
345            (
346                (),  # Retry 1
347                (),  # Retry 2
348                (
349                    transfer_pb2.Chunk(  # Expected chunk.
350                        transfer_id=43, offset=0, data=b'xyz'
351                    ),
352                ),
353                # Don't send anything else. The maximum lifetime retry count
354                # should be hit.
355            ),
356        )
357
358        with self.assertRaises(pw_transfer.Error) as context:
359            manager.read(43)
360
361        self.assertEqual(len(self._sent_chunks), 5)
362
363        exception = context.exception
364        self.assertEqual(exception.resource_id, 43)
365        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
366
367    def test_read_transfer_timeout(self) -> None:
368        manager = pw_transfer.Manager(
369            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
370        )
371
372        with self.assertRaises(pw_transfer.Error) as context:
373            manager.read(27)
374
375        exception = context.exception
376        self.assertEqual(exception.resource_id, 27)
377        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
378
379        # The client should have sent four transfer parameters requests: one
380        # initial, and three retries.
381        self.assertEqual(len(self._sent_chunks), 4)
382
383    def test_read_transfer_error(self) -> None:
384        manager = pw_transfer.Manager(
385            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
386        )
387
388        self._enqueue_server_responses(
389            _Method.READ,
390            (
391                (
392                    transfer_pb2.Chunk(
393                        transfer_id=31, status=Status.NOT_FOUND.value
394                    ),
395                ),
396            ),
397        )
398
399        with self.assertRaises(pw_transfer.Error) as context:
400            manager.read(31)
401
402        exception = context.exception
403        self.assertEqual(exception.resource_id, 31)
404        self.assertEqual(exception.status, Status.NOT_FOUND)
405
406    def test_read_transfer_server_error(self) -> None:
407        manager = pw_transfer.Manager(
408            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
409        )
410
411        self._enqueue_server_error(_Method.READ, Status.NOT_FOUND)
412
413        with self.assertRaises(pw_transfer.Error) as context:
414            manager.read(31)
415
416        exception = context.exception
417        self.assertEqual(exception.resource_id, 31)
418        self.assertEqual(exception.status, Status.INTERNAL)
419
420    def test_read_transfer_adaptive_window_slow_start(self) -> None:
421        test_max_chunk_size = 16
422
423        manager = pw_transfer.Manager(
424            self._service,
425            default_response_timeout_s=DEFAULT_TIMEOUT_S,
426            max_chunk_size_bytes=test_max_chunk_size,
427            default_protocol_version=ProtocolVersion.LEGACY,
428        )
429
430        self._enqueue_server_responses(
431            _Method.READ,
432            (
433                # First window: 1 chunk.
434                (
435                    transfer_pb2.Chunk(
436                        transfer_id=_ARBITRARY_TRANSFER_ID,
437                        type=transfer_pb2.Chunk.Type.DATA,
438                        offset=0,
439                        data=b'#' * test_max_chunk_size,
440                    ),
441                ),
442                # Second window: 2 chunks.
443                (
444                    transfer_pb2.Chunk(
445                        transfer_id=_ARBITRARY_TRANSFER_ID,
446                        type=transfer_pb2.Chunk.Type.DATA,
447                        offset=test_max_chunk_size,
448                        data=b'#' * test_max_chunk_size,
449                    ),
450                    transfer_pb2.Chunk(
451                        transfer_id=_ARBITRARY_TRANSFER_ID,
452                        type=transfer_pb2.Chunk.Type.DATA,
453                        offset=2 * test_max_chunk_size,
454                        data=b'#' * test_max_chunk_size,
455                    ),
456                ),
457                # Third window: finish transfer.
458                (
459                    transfer_pb2.Chunk(
460                        transfer_id=_ARBITRARY_TRANSFER_ID,
461                        type=transfer_pb2.Chunk.Type.DATA,
462                        offset=3 * test_max_chunk_size,
463                        data=b'#' * test_max_chunk_size,
464                        remaining_bytes=0,
465                    ),
466                ),
467            ),
468        )
469
470        data = manager.read(_ARBITRARY_TRANSFER_ID)
471
472        self.assertEqual(
473            self._sent_chunks,
474            [
475                # First parameters: 1 chunk window.
476                transfer_pb2.Chunk(
477                    type=transfer_pb2.Chunk.Type.START,
478                    transfer_id=_ARBITRARY_TRANSFER_ID,
479                    resource_id=_ARBITRARY_TRANSFER_ID,
480                    pending_bytes=test_max_chunk_size,
481                    max_chunk_size_bytes=test_max_chunk_size,
482                    window_end_offset=test_max_chunk_size,
483                ),
484                # Second parameters: 2 chunk window.
485                transfer_pb2.Chunk(
486                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
487                    transfer_id=_ARBITRARY_TRANSFER_ID,
488                    offset=test_max_chunk_size,
489                    pending_bytes=2 * test_max_chunk_size,
490                    max_chunk_size_bytes=test_max_chunk_size,
491                    window_end_offset=(
492                        test_max_chunk_size + 2 * test_max_chunk_size
493                    ),
494                ),
495                # Third parameters: 4 chunk window.
496                transfer_pb2.Chunk(
497                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
498                    transfer_id=_ARBITRARY_TRANSFER_ID,
499                    offset=2 * test_max_chunk_size,
500                    pending_bytes=4 * test_max_chunk_size,
501                    max_chunk_size_bytes=test_max_chunk_size,
502                    window_end_offset=(
503                        2 * test_max_chunk_size + 4 * test_max_chunk_size
504                    ),
505                ),
506                transfer_pb2.Chunk(
507                    type=transfer_pb2.Chunk.Type.COMPLETION,
508                    transfer_id=_ARBITRARY_TRANSFER_ID,
509                    status=Status.OK.value,
510                ),
511            ],
512        )
513        self.assertEqual(data, b'#' * (4 * test_max_chunk_size))
514
515    def test_read_transfer_adaptive_window_congestion_avoidance(self) -> None:
516        test_max_chunk_size = 16
517
518        manager = pw_transfer.Manager(
519            self._service,
520            default_response_timeout_s=DEFAULT_TIMEOUT_S,
521            max_chunk_size_bytes=test_max_chunk_size,
522            default_protocol_version=ProtocolVersion.LEGACY,
523        )
524
525        self._enqueue_server_responses(
526            _Method.READ,
527            (
528                # First window: 1 chunk.
529                (
530                    transfer_pb2.Chunk(
531                        transfer_id=_ARBITRARY_TRANSFER_ID,
532                        type=transfer_pb2.Chunk.Type.DATA,
533                        offset=0,
534                        data=b'#' * test_max_chunk_size,
535                    ),
536                ),
537                # Second window: 2 chunks.
538                (
539                    transfer_pb2.Chunk(
540                        transfer_id=_ARBITRARY_TRANSFER_ID,
541                        type=transfer_pb2.Chunk.Type.DATA,
542                        offset=test_max_chunk_size,
543                        data=b'#' * test_max_chunk_size,
544                    ),
545                    transfer_pb2.Chunk(
546                        transfer_id=_ARBITRARY_TRANSFER_ID,
547                        type=transfer_pb2.Chunk.Type.DATA,
548                        offset=2 * test_max_chunk_size,
549                        data=b'#' * test_max_chunk_size,
550                    ),
551                ),
552                # Third window: send the wrong offset, triggering a
553                # retransmission.
554                (
555                    transfer_pb2.Chunk(
556                        transfer_id=_ARBITRARY_TRANSFER_ID,
557                        type=transfer_pb2.Chunk.Type.DATA,
558                        offset=2 * test_max_chunk_size,
559                        data=b'#' * test_max_chunk_size,
560                    ),
561                ),
562                # Fourth window: send the expected offset.
563                (
564                    transfer_pb2.Chunk(
565                        transfer_id=_ARBITRARY_TRANSFER_ID,
566                        type=transfer_pb2.Chunk.Type.DATA,
567                        offset=3 * test_max_chunk_size,
568                        data=b'#' * test_max_chunk_size,
569                    ),
570                    transfer_pb2.Chunk(
571                        transfer_id=_ARBITRARY_TRANSFER_ID,
572                        type=transfer_pb2.Chunk.Type.DATA,
573                        offset=4 * test_max_chunk_size,
574                        data=b'#' * test_max_chunk_size,
575                    ),
576                ),
577                # Fifth window: finish the transfer.
578                (
579                    transfer_pb2.Chunk(
580                        transfer_id=_ARBITRARY_TRANSFER_ID,
581                        type=transfer_pb2.Chunk.Type.DATA,
582                        offset=5 * test_max_chunk_size,
583                        data=b'#' * test_max_chunk_size,
584                        remaining_bytes=0,
585                    ),
586                ),
587            ),
588        )
589
590        data = manager.read(_ARBITRARY_TRANSFER_ID)
591
592        self.assertEqual(
593            self._sent_chunks,
594            [
595                # First parameters: 1 chunk window.
596                transfer_pb2.Chunk(
597                    type=transfer_pb2.Chunk.Type.START,
598                    transfer_id=_ARBITRARY_TRANSFER_ID,
599                    resource_id=_ARBITRARY_TRANSFER_ID,
600                    pending_bytes=test_max_chunk_size,
601                    max_chunk_size_bytes=test_max_chunk_size,
602                    window_end_offset=test_max_chunk_size,
603                ),
604                # Second parameters: 2 chunk window.
605                transfer_pb2.Chunk(
606                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
607                    transfer_id=_ARBITRARY_TRANSFER_ID,
608                    offset=test_max_chunk_size,
609                    pending_bytes=2 * test_max_chunk_size,
610                    max_chunk_size_bytes=test_max_chunk_size,
611                    window_end_offset=(
612                        test_max_chunk_size + 2 * test_max_chunk_size
613                    ),
614                ),
615                # Third parameters: 4 chunk window.
616                transfer_pb2.Chunk(
617                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
618                    transfer_id=_ARBITRARY_TRANSFER_ID,
619                    offset=2 * test_max_chunk_size,
620                    pending_bytes=4 * test_max_chunk_size,
621                    max_chunk_size_bytes=test_max_chunk_size,
622                    window_end_offset=(
623                        2 * test_max_chunk_size + 4 * test_max_chunk_size
624                    ),
625                ),
626                # Fourth parameters: data loss, retransmit and halve window.
627                transfer_pb2.Chunk(
628                    type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
629                    transfer_id=_ARBITRARY_TRANSFER_ID,
630                    offset=3 * test_max_chunk_size,
631                    pending_bytes=2 * test_max_chunk_size,
632                    max_chunk_size_bytes=test_max_chunk_size,
633                    window_end_offset=(
634                        3 * test_max_chunk_size + 2 * test_max_chunk_size
635                    ),
636                ),
637                # Fifth parameters: in congestion avoidance, window size now
638                # only increases by one chunk instead of doubling.
639                transfer_pb2.Chunk(
640                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
641                    transfer_id=_ARBITRARY_TRANSFER_ID,
642                    offset=4 * test_max_chunk_size,
643                    pending_bytes=3 * test_max_chunk_size,
644                    max_chunk_size_bytes=test_max_chunk_size,
645                    window_end_offset=(
646                        4 * test_max_chunk_size + 3 * test_max_chunk_size
647                    ),
648                ),
649                transfer_pb2.Chunk(
650                    type=transfer_pb2.Chunk.Type.COMPLETION,
651                    transfer_id=_ARBITRARY_TRANSFER_ID,
652                    status=Status.OK.value,
653                ),
654            ],
655        )
656        self.assertEqual(data, b'#' * (6 * test_max_chunk_size))
657
658    def test_read_transfer_v2_adaptive_window_slow_start(self) -> None:
659        test_max_chunk_size = 16
660
661        manager = pw_transfer.Manager(
662            self._service,
663            default_response_timeout_s=DEFAULT_TIMEOUT_S,
664            max_chunk_size_bytes=test_max_chunk_size,
665            default_protocol_version=ProtocolVersion.VERSION_TWO,
666        )
667
668        self._enqueue_server_responses(
669            _Method.READ,
670            (
671                (
672                    transfer_pb2.Chunk(
673                        session_id=_FIRST_SESSION_ID,
674                        type=transfer_pb2.Chunk.Type.START_ACK,
675                        protocol_version=ProtocolVersion.VERSION_TWO.value,
676                    ),
677                ),
678                # First window: 1 chunk.
679                (
680                    transfer_pb2.Chunk(
681                        session_id=_FIRST_SESSION_ID,
682                        type=transfer_pb2.Chunk.Type.DATA,
683                        offset=0,
684                        data=b'#' * test_max_chunk_size,
685                    ),
686                ),
687                # Second window: 2 chunks.
688                (
689                    transfer_pb2.Chunk(
690                        session_id=_FIRST_SESSION_ID,
691                        type=transfer_pb2.Chunk.Type.DATA,
692                        offset=test_max_chunk_size,
693                        data=b'#' * test_max_chunk_size,
694                    ),
695                    transfer_pb2.Chunk(
696                        session_id=_FIRST_SESSION_ID,
697                        type=transfer_pb2.Chunk.Type.DATA,
698                        offset=2 * test_max_chunk_size,
699                        data=b'#' * test_max_chunk_size,
700                    ),
701                ),
702                # Third window: finish transfer.
703                (
704                    transfer_pb2.Chunk(
705                        session_id=_FIRST_SESSION_ID,
706                        type=transfer_pb2.Chunk.Type.DATA,
707                        offset=3 * test_max_chunk_size,
708                        data=b'#' * test_max_chunk_size,
709                        remaining_bytes=0,
710                    ),
711                ),
712                (
713                    transfer_pb2.Chunk(
714                        session_id=_FIRST_SESSION_ID,
715                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
716                    ),
717                ),
718            ),
719        )
720
721        data = manager.read(_ARBITRARY_TRANSFER_ID)
722
723        self.assertEqual(
724            self._sent_chunks,
725            [
726                transfer_pb2.Chunk(
727                    transfer_id=_ARBITRARY_TRANSFER_ID,
728                    resource_id=_ARBITRARY_TRANSFER_ID,
729                    desired_session_id=_FIRST_SESSION_ID,
730                    pending_bytes=test_max_chunk_size,
731                    max_chunk_size_bytes=test_max_chunk_size,
732                    window_end_offset=test_max_chunk_size,
733                    type=transfer_pb2.Chunk.Type.START,
734                    protocol_version=ProtocolVersion.VERSION_TWO.value,
735                ),
736                # First parameters: 1 chunk window.
737                transfer_pb2.Chunk(
738                    session_id=_FIRST_SESSION_ID,
739                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
740                    offset=0,
741                    max_chunk_size_bytes=test_max_chunk_size,
742                    window_end_offset=test_max_chunk_size,
743                    protocol_version=ProtocolVersion.VERSION_TWO.value,
744                ),
745                # Second parameters: 2 chunk window.
746                transfer_pb2.Chunk(
747                    session_id=_FIRST_SESSION_ID,
748                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
749                    offset=test_max_chunk_size,
750                    max_chunk_size_bytes=test_max_chunk_size,
751                    window_end_offset=(
752                        test_max_chunk_size + 2 * test_max_chunk_size
753                    ),
754                ),
755                # Third parameters: 4 chunk window.
756                transfer_pb2.Chunk(
757                    session_id=_FIRST_SESSION_ID,
758                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
759                    offset=2 * test_max_chunk_size,
760                    max_chunk_size_bytes=test_max_chunk_size,
761                    window_end_offset=(
762                        2 * test_max_chunk_size + 4 * test_max_chunk_size
763                    ),
764                ),
765                transfer_pb2.Chunk(
766                    session_id=_FIRST_SESSION_ID,
767                    type=transfer_pb2.Chunk.Type.COMPLETION,
768                    status=Status.OK.value,
769                ),
770            ],
771        )
772        self.assertEqual(data, b'#' * (4 * test_max_chunk_size))
773
774    def test_read_transfer_v2_adaptive_window_congestion_avoidance(
775        self,
776    ) -> None:
777        test_max_chunk_size = 16
778
779        manager = pw_transfer.Manager(
780            self._service,
781            default_response_timeout_s=DEFAULT_TIMEOUT_S,
782            max_chunk_size_bytes=test_max_chunk_size,
783            default_protocol_version=ProtocolVersion.VERSION_TWO,
784        )
785
786        self._enqueue_server_responses(
787            _Method.READ,
788            (
789                (
790                    transfer_pb2.Chunk(
791                        session_id=_FIRST_SESSION_ID,
792                        type=transfer_pb2.Chunk.Type.START_ACK,
793                        protocol_version=ProtocolVersion.VERSION_TWO.value,
794                    ),
795                ),
796                # First window: 1 chunk.
797                (
798                    transfer_pb2.Chunk(
799                        session_id=_FIRST_SESSION_ID,
800                        type=transfer_pb2.Chunk.Type.DATA,
801                        offset=0,
802                        data=b'#' * test_max_chunk_size,
803                    ),
804                ),
805                # Second window: 2 chunks.
806                (
807                    transfer_pb2.Chunk(
808                        session_id=_FIRST_SESSION_ID,
809                        type=transfer_pb2.Chunk.Type.DATA,
810                        offset=test_max_chunk_size,
811                        data=b'#' * test_max_chunk_size,
812                    ),
813                    transfer_pb2.Chunk(
814                        session_id=_FIRST_SESSION_ID,
815                        type=transfer_pb2.Chunk.Type.DATA,
816                        offset=2 * test_max_chunk_size,
817                        data=b'#' * test_max_chunk_size,
818                    ),
819                ),
820                # Third window: send the wrong offset, triggering a
821                # retransmission.
822                (
823                    transfer_pb2.Chunk(
824                        session_id=_FIRST_SESSION_ID,
825                        type=transfer_pb2.Chunk.Type.DATA,
826                        offset=2 * test_max_chunk_size,
827                        data=b'#' * test_max_chunk_size,
828                    ),
829                ),
830                # Fourth window: send the expected offset.
831                (
832                    transfer_pb2.Chunk(
833                        session_id=_FIRST_SESSION_ID,
834                        type=transfer_pb2.Chunk.Type.DATA,
835                        offset=3 * test_max_chunk_size,
836                        data=b'#' * test_max_chunk_size,
837                    ),
838                    transfer_pb2.Chunk(
839                        session_id=_FIRST_SESSION_ID,
840                        type=transfer_pb2.Chunk.Type.DATA,
841                        offset=4 * test_max_chunk_size,
842                        data=b'#' * test_max_chunk_size,
843                    ),
844                ),
845                # Fifth window: finish the transfer.
846                (
847                    transfer_pb2.Chunk(
848                        session_id=_FIRST_SESSION_ID,
849                        type=transfer_pb2.Chunk.Type.DATA,
850                        offset=5 * test_max_chunk_size,
851                        data=b'#' * test_max_chunk_size,
852                        remaining_bytes=0,
853                    ),
854                ),
855                (
856                    transfer_pb2.Chunk(
857                        session_id=_FIRST_SESSION_ID,
858                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
859                    ),
860                ),
861            ),
862        )
863
864        data = manager.read(_ARBITRARY_TRANSFER_ID)
865
866        self.assertEqual(
867            self._sent_chunks,
868            [
869                transfer_pb2.Chunk(
870                    type=transfer_pb2.Chunk.Type.START,
871                    transfer_id=_ARBITRARY_TRANSFER_ID,
872                    resource_id=_ARBITRARY_TRANSFER_ID,
873                    desired_session_id=_FIRST_SESSION_ID,
874                    pending_bytes=test_max_chunk_size,
875                    max_chunk_size_bytes=test_max_chunk_size,
876                    window_end_offset=test_max_chunk_size,
877                    protocol_version=ProtocolVersion.VERSION_TWO.value,
878                ),
879                # First parameters: 1 chunk window.
880                transfer_pb2.Chunk(
881                    session_id=_FIRST_SESSION_ID,
882                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
883                    offset=0,
884                    max_chunk_size_bytes=test_max_chunk_size,
885                    window_end_offset=test_max_chunk_size,
886                    protocol_version=ProtocolVersion.VERSION_TWO.value,
887                ),
888                # Second parameters: 2 chunk window.
889                transfer_pb2.Chunk(
890                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
891                    session_id=_FIRST_SESSION_ID,
892                    offset=test_max_chunk_size,
893                    max_chunk_size_bytes=test_max_chunk_size,
894                    window_end_offset=(
895                        test_max_chunk_size + 2 * test_max_chunk_size
896                    ),
897                ),
898                # Third parameters: 4 chunk window.
899                transfer_pb2.Chunk(
900                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
901                    session_id=_FIRST_SESSION_ID,
902                    offset=2 * test_max_chunk_size,
903                    max_chunk_size_bytes=test_max_chunk_size,
904                    window_end_offset=(
905                        2 * test_max_chunk_size + 4 * test_max_chunk_size
906                    ),
907                ),
908                # Fourth parameters: data loss, retransmit and halve window.
909                transfer_pb2.Chunk(
910                    type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
911                    session_id=_FIRST_SESSION_ID,
912                    offset=3 * test_max_chunk_size,
913                    max_chunk_size_bytes=test_max_chunk_size,
914                    window_end_offset=(
915                        3 * test_max_chunk_size + 2 * test_max_chunk_size
916                    ),
917                ),
918                # Fifth parameters: in congestion avoidance, window size now
919                # only increases by one chunk instead of doubling.
920                transfer_pb2.Chunk(
921                    type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
922                    session_id=_FIRST_SESSION_ID,
923                    offset=4 * test_max_chunk_size,
924                    max_chunk_size_bytes=test_max_chunk_size,
925                    window_end_offset=(
926                        4 * test_max_chunk_size + 3 * test_max_chunk_size
927                    ),
928                ),
929                transfer_pb2.Chunk(
930                    type=transfer_pb2.Chunk.Type.COMPLETION,
931                    session_id=_FIRST_SESSION_ID,
932                    status=Status.OK.value,
933                ),
934            ],
935        )
936        self.assertEqual(data, b'#' * (6 * test_max_chunk_size))
937
938    def test_write_transfer_basic(self) -> None:
939        manager = pw_transfer.Manager(
940            self._service,
941            default_response_timeout_s=DEFAULT_TIMEOUT_S,
942        )
943
944        self._enqueue_server_responses(
945            _Method.WRITE,
946            (
947                (
948                    transfer_pb2.Chunk(
949                        transfer_id=4,
950                        offset=0,
951                        pending_bytes=32,
952                        max_chunk_size_bytes=8,
953                    ),
954                ),
955                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
956            ),
957        )
958
959        manager.write(4, b'hello')
960        self.assertEqual(len(self._sent_chunks), 2)
961        self.assertEqual(self._received_data(), b'hello')
962
963    def test_write_transfer_max_chunk_size(self) -> None:
964        manager = pw_transfer.Manager(
965            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
966        )
967
968        self._enqueue_server_responses(
969            _Method.WRITE,
970            (
971                (
972                    transfer_pb2.Chunk(
973                        transfer_id=4,
974                        offset=0,
975                        pending_bytes=32,
976                        max_chunk_size_bytes=8,
977                    ),
978                ),
979                (),
980                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
981            ),
982        )
983
984        manager.write(4, b'hello world')
985        self.assertEqual(len(self._sent_chunks), 3)
986        self.assertEqual(self._received_data(), b'hello world')
987        self.assertEqual(self._sent_chunks[1].data, b'hello wo')
988        self.assertEqual(self._sent_chunks[2].data, b'rld')
989
990    def test_write_transfer_multiple_parameters(self) -> None:
991        manager = pw_transfer.Manager(
992            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
993        )
994
995        self._enqueue_server_responses(
996            _Method.WRITE,
997            (
998                (
999                    transfer_pb2.Chunk(
1000                        transfer_id=4,
1001                        offset=0,
1002                        pending_bytes=8,
1003                        max_chunk_size_bytes=8,
1004                    ),
1005                ),
1006                (
1007                    transfer_pb2.Chunk(
1008                        transfer_id=4,
1009                        offset=8,
1010                        pending_bytes=8,
1011                        max_chunk_size_bytes=8,
1012                    ),
1013                ),
1014                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1015            ),
1016        )
1017
1018        manager.write(4, b'data to write')
1019        self.assertEqual(len(self._sent_chunks), 3)
1020        self.assertEqual(self._received_data(), b'data to write')
1021        self.assertEqual(self._sent_chunks[1].data, b'data to ')
1022        self.assertEqual(self._sent_chunks[2].data, b'write')
1023
1024    def test_write_transfer_progress_callback(self) -> None:
1025        manager = pw_transfer.Manager(
1026            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1027        )
1028
1029        self._enqueue_server_responses(
1030            _Method.WRITE,
1031            (
1032                (
1033                    transfer_pb2.Chunk(
1034                        transfer_id=4,
1035                        offset=0,
1036                        pending_bytes=8,
1037                        max_chunk_size_bytes=8,
1038                    ),
1039                ),
1040                (
1041                    transfer_pb2.Chunk(
1042                        transfer_id=4,
1043                        offset=8,
1044                        pending_bytes=8,
1045                        max_chunk_size_bytes=8,
1046                    ),
1047                ),
1048                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1049            ),
1050        )
1051
1052        progress: list[pw_transfer.ProgressStats] = []
1053
1054        manager.write(4, b'data to write', progress.append)
1055        self.assertEqual(len(self._sent_chunks), 3)
1056        self.assertEqual(self._received_data(), b'data to write')
1057        self.assertEqual(self._sent_chunks[1].data, b'data to ')
1058        self.assertEqual(self._sent_chunks[2].data, b'write')
1059        self.assertEqual(
1060            progress,
1061            [
1062                pw_transfer.ProgressStats(8, 0, 13),
1063                pw_transfer.ProgressStats(13, 8, 13),
1064                pw_transfer.ProgressStats(13, 13, 13),
1065            ],
1066        )
1067
1068    def test_write_transfer_rewind(self) -> None:
1069        """Write transfer in which the server re-requests an earlier offset."""
1070        manager = pw_transfer.Manager(
1071            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1072        )
1073
1074        self._enqueue_server_responses(
1075            _Method.WRITE,
1076            (
1077                (
1078                    transfer_pb2.Chunk(
1079                        transfer_id=4,
1080                        offset=0,
1081                        pending_bytes=8,
1082                        max_chunk_size_bytes=8,
1083                    ),
1084                ),
1085                (
1086                    transfer_pb2.Chunk(
1087                        transfer_id=4,
1088                        offset=8,
1089                        pending_bytes=8,
1090                        max_chunk_size_bytes=8,
1091                    ),
1092                ),
1093                (
1094                    transfer_pb2.Chunk(
1095                        transfer_id=4,
1096                        offset=4,  # rewind
1097                        pending_bytes=8,
1098                        max_chunk_size_bytes=8,
1099                    ),
1100                ),
1101                (
1102                    transfer_pb2.Chunk(
1103                        transfer_id=4,
1104                        offset=12,
1105                        pending_bytes=16,  # update max size
1106                        max_chunk_size_bytes=16,
1107                    ),
1108                ),
1109                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1110            ),
1111        )
1112
1113        manager.write(4, b'pigweed data transfer')
1114        self.assertEqual(len(self._sent_chunks), 5)
1115        self.assertEqual(self._sent_chunks[1].data, b'pigweed ')
1116        self.assertEqual(self._sent_chunks[2].data, b'data tra')
1117        self.assertEqual(self._sent_chunks[3].data, b'eed data')
1118        self.assertEqual(self._sent_chunks[4].data, b' transfer')
1119
1120    def test_write_transfer_bad_offset(self) -> None:
1121        manager = pw_transfer.Manager(
1122            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1123        )
1124
1125        self._enqueue_server_responses(
1126            _Method.WRITE,
1127            (
1128                (
1129                    transfer_pb2.Chunk(
1130                        transfer_id=4,
1131                        offset=0,
1132                        pending_bytes=8,
1133                        max_chunk_size_bytes=8,
1134                    ),
1135                ),
1136                (
1137                    transfer_pb2.Chunk(
1138                        transfer_id=4,
1139                        offset=100,  # larger offset than data
1140                        pending_bytes=8,
1141                        max_chunk_size_bytes=8,
1142                    ),
1143                ),
1144                (transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value),),
1145            ),
1146        )
1147
1148        with self.assertRaises(pw_transfer.Error) as context:
1149            manager.write(4, b'small data')
1150
1151        exception = context.exception
1152        self.assertEqual(exception.resource_id, 4)
1153        self.assertEqual(exception.status, Status.OUT_OF_RANGE)
1154
1155    def test_write_transfer_error(self) -> None:
1156        manager = pw_transfer.Manager(
1157            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1158        )
1159
1160        self._enqueue_server_responses(
1161            _Method.WRITE,
1162            (
1163                (
1164                    transfer_pb2.Chunk(
1165                        transfer_id=21, status=Status.UNAVAILABLE.value
1166                    ),
1167                ),
1168            ),
1169        )
1170
1171        with self.assertRaises(pw_transfer.Error) as context:
1172            manager.write(21, b'no write')
1173
1174        exception = context.exception
1175        self.assertEqual(exception.resource_id, 21)
1176        self.assertEqual(exception.status, Status.UNAVAILABLE)
1177
1178    def test_write_transfer_server_error(self) -> None:
1179        manager = pw_transfer.Manager(
1180            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1181        )
1182
1183        self._enqueue_server_error(_Method.WRITE, Status.NOT_FOUND)
1184
1185        with self.assertRaises(pw_transfer.Error) as context:
1186            manager.write(21, b'server error')
1187
1188        exception = context.exception
1189        self.assertEqual(exception.resource_id, 21)
1190        self.assertEqual(exception.status, Status.INTERNAL)
1191
1192    def test_write_transfer_timeout_after_initial_chunk(self) -> None:
1193        manager = pw_transfer.Manager(
1194            self._service,
1195            default_response_timeout_s=0.001,
1196            max_retries=2,
1197            default_protocol_version=ProtocolVersion.LEGACY,
1198        )
1199
1200        with self.assertRaises(pw_transfer.Error) as context:
1201            manager.write(22, b'no server response!')
1202
1203        self.assertEqual(
1204            self._sent_chunks,
1205            [
1206                transfer_pb2.Chunk(
1207                    transfer_id=22,
1208                    resource_id=22,
1209                    type=transfer_pb2.Chunk.Type.START,
1210                ),  # initial chunk
1211                transfer_pb2.Chunk(
1212                    transfer_id=22,
1213                    resource_id=22,
1214                    type=transfer_pb2.Chunk.Type.START,
1215                ),  # retry 1
1216                transfer_pb2.Chunk(
1217                    transfer_id=22,
1218                    resource_id=22,
1219                    type=transfer_pb2.Chunk.Type.START,
1220                ),  # retry 2
1221            ],
1222        )
1223
1224        exception = context.exception
1225        self.assertEqual(exception.resource_id, 22)
1226        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
1227
1228    def test_write_transfer_timeout_after_intermediate_chunk(self) -> None:
1229        """Tests write transfers that timeout after the initial chunk."""
1230        manager = pw_transfer.Manager(
1231            self._service,
1232            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1233            max_retries=2,
1234            default_protocol_version=ProtocolVersion.LEGACY,
1235        )
1236
1237        self._enqueue_server_responses(
1238            _Method.WRITE,
1239            [
1240                [
1241                    transfer_pb2.Chunk(
1242                        transfer_id=22, pending_bytes=10, max_chunk_size_bytes=5
1243                    )
1244                ]
1245            ],
1246        )
1247
1248        with self.assertRaises(pw_transfer.Error) as context:
1249            manager.write(22, b'0123456789')
1250
1251        last_data_chunk = transfer_pb2.Chunk(
1252            transfer_id=22,
1253            data=b'56789',
1254            offset=5,
1255            remaining_bytes=0,
1256            type=transfer_pb2.Chunk.Type.DATA,
1257        )
1258
1259        self.assertEqual(
1260            self._sent_chunks,
1261            [
1262                transfer_pb2.Chunk(
1263                    transfer_id=22,
1264                    resource_id=22,
1265                    type=transfer_pb2.Chunk.Type.START,
1266                ),
1267                transfer_pb2.Chunk(
1268                    transfer_id=22,
1269                    data=b'01234',
1270                    type=transfer_pb2.Chunk.Type.DATA,
1271                ),
1272                last_data_chunk,  # last chunk
1273                last_data_chunk,  # retry 1
1274                last_data_chunk,  # retry 2
1275            ],
1276        )
1277
1278        exception = context.exception
1279        self.assertEqual(exception.resource_id, 22)
1280        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
1281
1282    def test_write_zero_pending_bytes_is_internal_error(self) -> None:
1283        manager = pw_transfer.Manager(
1284            self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
1285        )
1286
1287        self._enqueue_server_responses(
1288            _Method.WRITE,
1289            ((transfer_pb2.Chunk(transfer_id=23, pending_bytes=0),),),
1290        )
1291
1292        with self.assertRaises(pw_transfer.Error) as context:
1293            manager.write(23, b'no write')
1294
1295        exception = context.exception
1296        self.assertEqual(exception.resource_id, 23)
1297        self.assertEqual(exception.status, Status.INTERNAL)
1298
1299    def test_v2_read_transfer_basic(self) -> None:
1300        """Tests a simple protocol version 2 read transfer."""
1301        manager = pw_transfer.Manager(
1302            self._service,
1303            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1304            default_protocol_version=ProtocolVersion.VERSION_TWO,
1305        )
1306
1307        self._enqueue_server_responses(
1308            _Method.READ,
1309            (
1310                (
1311                    transfer_pb2.Chunk(
1312                        resource_id=39,
1313                        session_id=_FIRST_SESSION_ID,
1314                        type=transfer_pb2.Chunk.Type.START_ACK,
1315                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1316                    ),
1317                ),
1318                (
1319                    transfer_pb2.Chunk(
1320                        session_id=_FIRST_SESSION_ID,
1321                        type=transfer_pb2.Chunk.Type.DATA,
1322                        offset=0,
1323                        data=b'version two',
1324                        remaining_bytes=0,
1325                    ),
1326                ),
1327                (
1328                    transfer_pb2.Chunk(
1329                        session_id=_FIRST_SESSION_ID,
1330                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1331                    ),
1332                ),
1333            ),
1334        )
1335
1336        data = manager.read(39)
1337
1338        self.assertEqual(
1339            self._sent_chunks,
1340            [
1341                transfer_pb2.Chunk(
1342                    transfer_id=39,
1343                    resource_id=39,
1344                    desired_session_id=_FIRST_SESSION_ID,
1345                    pending_bytes=1024,
1346                    max_chunk_size_bytes=1024,
1347                    window_end_offset=1024,
1348                    type=transfer_pb2.Chunk.Type.START,
1349                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1350                ),
1351                transfer_pb2.Chunk(
1352                    session_id=_FIRST_SESSION_ID,
1353                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1354                    max_chunk_size_bytes=1024,
1355                    window_end_offset=1024,
1356                    # pending_bytes should no longer exist as server and client
1357                    # have agreed on v2.
1358                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1359                ),
1360                transfer_pb2.Chunk(
1361                    session_id=_FIRST_SESSION_ID,
1362                    type=transfer_pb2.Chunk.Type.COMPLETION,
1363                    status=Status.OK.value,
1364                ),
1365            ],
1366        )
1367
1368        self.assertEqual(data, b'version two')
1369
1370    def test_v2_read_transfer_legacy_fallback(self) -> None:
1371        """Tests a v2 read transfer when the server only supports legacy."""
1372        manager = pw_transfer.Manager(
1373            self._service,
1374            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1375            default_protocol_version=ProtocolVersion.VERSION_TWO,
1376        )
1377
1378        # Respond to the START chunk with a legacy data transfer chunk instead
1379        # of a START_ACK.
1380        self._enqueue_server_responses(
1381            _Method.READ,
1382            (
1383                (
1384                    transfer_pb2.Chunk(
1385                        transfer_id=40,
1386                        type=transfer_pb2.Chunk.Type.DATA,
1387                        offset=0,
1388                        data=b'sorry, legacy only',
1389                        remaining_bytes=0,
1390                    ),
1391                ),
1392            ),
1393        )
1394
1395        data = manager.read(40)
1396
1397        self.assertEqual(
1398            self._sent_chunks,
1399            [
1400                transfer_pb2.Chunk(
1401                    transfer_id=40,
1402                    resource_id=40,
1403                    desired_session_id=_FIRST_SESSION_ID,
1404                    pending_bytes=1024,
1405                    max_chunk_size_bytes=1024,
1406                    window_end_offset=1024,
1407                    type=transfer_pb2.Chunk.Type.START,
1408                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1409                ),
1410                transfer_pb2.Chunk(
1411                    transfer_id=40,
1412                    type=transfer_pb2.Chunk.Type.COMPLETION,
1413                    status=Status.OK.value,
1414                ),
1415            ],
1416        )
1417
1418        self.assertEqual(data, b'sorry, legacy only')
1419
1420    def test_v2_write_transfer_basic(self) -> None:
1421        """Tests a simple protocol version 2 write transfer."""
1422        manager = pw_transfer.Manager(
1423            self._service,
1424            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1425            default_protocol_version=ProtocolVersion.VERSION_TWO,
1426        )
1427
1428        self._enqueue_server_responses(
1429            _Method.WRITE,
1430            (
1431                (
1432                    transfer_pb2.Chunk(
1433                        resource_id=72,
1434                        session_id=_FIRST_SESSION_ID,
1435                        type=transfer_pb2.Chunk.Type.START_ACK,
1436                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1437                    ),
1438                ),
1439                (
1440                    transfer_pb2.Chunk(
1441                        session_id=_FIRST_SESSION_ID,
1442                        type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
1443                        offset=0,
1444                        window_end_offset=32,
1445                        max_chunk_size_bytes=8,
1446                    ),
1447                ),
1448                (),  # In response to the first data chunk.
1449                (
1450                    transfer_pb2.Chunk(
1451                        session_id=_FIRST_SESSION_ID,
1452                        type=transfer_pb2.Chunk.Type.COMPLETION,
1453                        status=Status.OK.value,
1454                    ),
1455                ),
1456            ),
1457        )
1458
1459        manager.write(72, b'write version 2')
1460
1461        self.assertEqual(
1462            self._sent_chunks,
1463            [
1464                transfer_pb2.Chunk(
1465                    transfer_id=72,
1466                    resource_id=72,
1467                    desired_session_id=_FIRST_SESSION_ID,
1468                    type=transfer_pb2.Chunk.Type.START,
1469                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1470                ),
1471                transfer_pb2.Chunk(
1472                    session_id=_FIRST_SESSION_ID,
1473                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1474                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1475                ),
1476                transfer_pb2.Chunk(
1477                    session_id=_FIRST_SESSION_ID,
1478                    type=transfer_pb2.Chunk.Type.DATA,
1479                    offset=0,
1480                    data=b'write ve',
1481                ),
1482                transfer_pb2.Chunk(
1483                    session_id=_FIRST_SESSION_ID,
1484                    type=transfer_pb2.Chunk.Type.DATA,
1485                    offset=8,
1486                    data=b'rsion 2',
1487                    remaining_bytes=0,
1488                ),
1489                transfer_pb2.Chunk(
1490                    session_id=_FIRST_SESSION_ID,
1491                    type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1492                ),
1493            ],
1494        )
1495
1496        self.assertEqual(self._received_data(), b'write version 2')
1497
1498    def test_v2_write_transfer_legacy_fallback(self) -> None:
1499        """Tests a v2 write transfer when the server only supports legacy."""
1500        manager = pw_transfer.Manager(
1501            self._service,
1502            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1503            default_protocol_version=ProtocolVersion.VERSION_TWO,
1504        )
1505
1506        self._enqueue_server_responses(
1507            _Method.WRITE,
1508            (
1509                # Send a parameters chunk immediately per the legacy protocol.
1510                (
1511                    transfer_pb2.Chunk(
1512                        transfer_id=76,
1513                        type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
1514                        offset=0,
1515                        pending_bytes=32,
1516                        window_end_offset=32,
1517                        max_chunk_size_bytes=8,
1518                    ),
1519                ),
1520                (),  # In response to the first data chunk.
1521                (
1522                    transfer_pb2.Chunk(
1523                        transfer_id=76,
1524                        type=transfer_pb2.Chunk.Type.COMPLETION,
1525                        status=Status.OK.value,
1526                    ),
1527                ),
1528            ),
1529        )
1530
1531        manager.write(76, b'write v... NOPE')
1532
1533        self.assertEqual(
1534            self._sent_chunks,
1535            [
1536                transfer_pb2.Chunk(
1537                    transfer_id=76,
1538                    resource_id=76,
1539                    desired_session_id=_FIRST_SESSION_ID,
1540                    type=transfer_pb2.Chunk.Type.START,
1541                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1542                ),
1543                transfer_pb2.Chunk(
1544                    transfer_id=76,
1545                    type=transfer_pb2.Chunk.Type.DATA,
1546                    offset=0,
1547                    data=b'write v.',
1548                ),
1549                transfer_pb2.Chunk(
1550                    transfer_id=76,
1551                    type=transfer_pb2.Chunk.Type.DATA,
1552                    offset=8,
1553                    data=b'.. NOPE',
1554                    remaining_bytes=0,
1555                ),
1556            ],
1557        )
1558
1559        self.assertEqual(self._received_data(), b'write v... NOPE')
1560
1561    def test_v2_server_error(self) -> None:
1562        """Tests a server error occurring during the opening handshake."""
1563
1564        manager = pw_transfer.Manager(
1565            self._service,
1566            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1567            default_protocol_version=ProtocolVersion.VERSION_TWO,
1568        )
1569
1570        self._enqueue_server_responses(
1571            _Method.READ,
1572            (
1573                (
1574                    transfer_pb2.Chunk(
1575                        resource_id=43,
1576                        session_id=_FIRST_SESSION_ID,
1577                        type=transfer_pb2.Chunk.Type.START_ACK,
1578                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1579                    ),
1580                ),
1581                (
1582                    transfer_pb2.Chunk(
1583                        session_id=_FIRST_SESSION_ID,
1584                        type=transfer_pb2.Chunk.Type.COMPLETION,
1585                        status=Status.DATA_LOSS.value,
1586                    ),
1587                ),
1588            ),
1589        )
1590
1591        with self.assertRaises(pw_transfer.Error) as context:
1592            manager.read(43)
1593
1594        self.assertEqual(
1595            self._sent_chunks,
1596            [
1597                transfer_pb2.Chunk(
1598                    transfer_id=43,
1599                    desired_session_id=_FIRST_SESSION_ID,
1600                    resource_id=43,
1601                    pending_bytes=1024,
1602                    max_chunk_size_bytes=1024,
1603                    window_end_offset=1024,
1604                    type=transfer_pb2.Chunk.Type.START,
1605                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1606                ),
1607                transfer_pb2.Chunk(
1608                    session_id=_FIRST_SESSION_ID,
1609                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1610                    max_chunk_size_bytes=1024,
1611                    window_end_offset=1024,
1612                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1613                ),
1614                # Client sends a COMPLETION_ACK in response to the server.
1615                transfer_pb2.Chunk(
1616                    session_id=_FIRST_SESSION_ID,
1617                    type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1618                ),
1619            ],
1620        )
1621
1622        exception = context.exception
1623        self.assertEqual(exception.resource_id, 43)
1624        self.assertEqual(exception.status, Status.DATA_LOSS)
1625
1626    def test_v2_timeout_during_opening_handshake(self) -> None:
1627        """Tests a timeout occurring during the opening handshake."""
1628        manager = pw_transfer.Manager(
1629            self._service,
1630            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1631            default_protocol_version=ProtocolVersion.VERSION_TWO,
1632        )
1633
1634        # Don't enqueue any server responses.
1635
1636        with self.assertRaises(pw_transfer.Error) as context:
1637            manager.read(41)
1638
1639        start_chunk = transfer_pb2.Chunk(
1640            transfer_id=41,
1641            resource_id=41,
1642            desired_session_id=_FIRST_SESSION_ID,
1643            pending_bytes=1024,
1644            max_chunk_size_bytes=1024,
1645            window_end_offset=1024,
1646            type=transfer_pb2.Chunk.Type.START,
1647            protocol_version=ProtocolVersion.VERSION_TWO.value,
1648        )
1649
1650        # The opening chunk should be sent initially, then retried three times.
1651        self.assertEqual(self._sent_chunks, [start_chunk] * 4)
1652
1653        exception = context.exception
1654        self.assertEqual(exception.resource_id, 41)
1655        self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
1656
1657    def test_v2_timeout_recovery_during_opening_handshake(self) -> None:
1658        """Tests a timeout during the opening handshake which recovers."""
1659        manager = pw_transfer.Manager(
1660            self._service,
1661            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1662            default_protocol_version=ProtocolVersion.VERSION_TWO,
1663        )
1664
1665        self._enqueue_server_responses(
1666            _Method.WRITE,
1667            (
1668                (
1669                    transfer_pb2.Chunk(
1670                        resource_id=73,
1671                        session_id=_FIRST_SESSION_ID,
1672                        type=transfer_pb2.Chunk.Type.START_ACK,
1673                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1674                    ),
1675                ),
1676                (),  # Don't respond to the START_ACK_CONFIRMATION.
1677                (),  # Don't respond to the first START_ACK_CONFIRMATION retry.
1678                (
1679                    transfer_pb2.Chunk(
1680                        session_id=_FIRST_SESSION_ID,
1681                        type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
1682                        offset=0,
1683                        window_end_offset=32,
1684                        max_chunk_size_bytes=8,
1685                    ),
1686                ),
1687                (),  # In response to the first data chunk.
1688                (
1689                    transfer_pb2.Chunk(
1690                        session_id=_FIRST_SESSION_ID,
1691                        type=transfer_pb2.Chunk.Type.COMPLETION,
1692                        status=Status.OK.value,
1693                    ),
1694                ),
1695            ),
1696        )
1697
1698        manager.write(73, b'write timeout 2')
1699
1700        start_ack_confirmation = transfer_pb2.Chunk(
1701            session_id=_FIRST_SESSION_ID,
1702            type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1703            protocol_version=ProtocolVersion.VERSION_TWO.value,
1704        )
1705
1706        self.assertEqual(
1707            self._sent_chunks,
1708            [
1709                transfer_pb2.Chunk(
1710                    transfer_id=73,
1711                    resource_id=73,
1712                    desired_session_id=_FIRST_SESSION_ID,
1713                    type=transfer_pb2.Chunk.Type.START,
1714                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1715                ),
1716                start_ack_confirmation,  # Initial transmission
1717                start_ack_confirmation,  # Retry 1
1718                start_ack_confirmation,  # Retry 2
1719                transfer_pb2.Chunk(
1720                    session_id=_FIRST_SESSION_ID,
1721                    type=transfer_pb2.Chunk.Type.DATA,
1722                    offset=0,
1723                    data=b'write ti',
1724                ),
1725                transfer_pb2.Chunk(
1726                    session_id=_FIRST_SESSION_ID,
1727                    type=transfer_pb2.Chunk.Type.DATA,
1728                    offset=8,
1729                    data=b'meout 2',
1730                    remaining_bytes=0,
1731                ),
1732                transfer_pb2.Chunk(
1733                    session_id=_FIRST_SESSION_ID,
1734                    type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1735                ),
1736            ],
1737        )
1738
1739        self.assertEqual(self._received_data(), b'write timeout 2')
1740
1741    def test_v2_closing_handshake_bad_chunk(self) -> None:
1742        """Tests an unexpected chunk response during the closing handshake."""
1743        manager = pw_transfer.Manager(
1744            self._service,
1745            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1746            default_protocol_version=ProtocolVersion.VERSION_TWO,
1747        )
1748
1749        self._enqueue_server_responses(
1750            _Method.READ,
1751            (
1752                (
1753                    transfer_pb2.Chunk(
1754                        resource_id=47,
1755                        session_id=_FIRST_SESSION_ID,
1756                        type=transfer_pb2.Chunk.Type.START_ACK,
1757                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1758                    ),
1759                ),
1760                (
1761                    transfer_pb2.Chunk(
1762                        session_id=_FIRST_SESSION_ID,
1763                        type=transfer_pb2.Chunk.Type.DATA,
1764                        offset=0,
1765                        data=b'version two',
1766                        remaining_bytes=0,
1767                    ),
1768                ),
1769                # In response to the COMPLETION, re-send the last chunk instead
1770                # of a COMPLETION_ACK.
1771                (
1772                    transfer_pb2.Chunk(
1773                        session_id=_FIRST_SESSION_ID,
1774                        type=transfer_pb2.Chunk.Type.DATA,
1775                        offset=0,
1776                        data=b'version two',
1777                        remaining_bytes=0,
1778                    ),
1779                ),
1780                (
1781                    transfer_pb2.Chunk(
1782                        session_id=_FIRST_SESSION_ID,
1783                        type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
1784                    ),
1785                ),
1786            ),
1787        )
1788
1789        data = manager.read(47)
1790
1791        self.assertEqual(
1792            self._sent_chunks,
1793            [
1794                transfer_pb2.Chunk(
1795                    transfer_id=47,
1796                    resource_id=47,
1797                    desired_session_id=_FIRST_SESSION_ID,
1798                    pending_bytes=1024,
1799                    max_chunk_size_bytes=1024,
1800                    window_end_offset=1024,
1801                    type=transfer_pb2.Chunk.Type.START,
1802                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1803                ),
1804                transfer_pb2.Chunk(
1805                    session_id=_FIRST_SESSION_ID,
1806                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1807                    max_chunk_size_bytes=1024,
1808                    window_end_offset=1024,
1809                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1810                ),
1811                transfer_pb2.Chunk(
1812                    session_id=_FIRST_SESSION_ID,
1813                    type=transfer_pb2.Chunk.Type.COMPLETION,
1814                    status=Status.OK.value,
1815                ),
1816                # Completion should be re-sent following the repeated chunk.
1817                transfer_pb2.Chunk(
1818                    session_id=_FIRST_SESSION_ID,
1819                    type=transfer_pb2.Chunk.Type.COMPLETION,
1820                    status=Status.OK.value,
1821                ),
1822            ],
1823        )
1824
1825        self.assertEqual(data, b'version two')
1826
1827    def test_v2_timeout_during_closing_handshake(self) -> None:
1828        """Tests a timeout occurring during the closing handshake."""
1829        manager = pw_transfer.Manager(
1830            self._service,
1831            default_response_timeout_s=DEFAULT_TIMEOUT_S,
1832            default_protocol_version=ProtocolVersion.VERSION_TWO,
1833        )
1834
1835        self._enqueue_server_responses(
1836            _Method.READ,
1837            (
1838                (
1839                    transfer_pb2.Chunk(
1840                        resource_id=47,
1841                        session_id=_FIRST_SESSION_ID,
1842                        type=transfer_pb2.Chunk.Type.START_ACK,
1843                        protocol_version=ProtocolVersion.VERSION_TWO.value,
1844                    ),
1845                ),
1846                (
1847                    transfer_pb2.Chunk(
1848                        session_id=_FIRST_SESSION_ID,
1849                        type=transfer_pb2.Chunk.Type.DATA,
1850                        offset=0,
1851                        data=b'dropped completion',
1852                        remaining_bytes=0,
1853                    ),
1854                ),
1855                # Never send the expected COMPLETION_ACK chunk.
1856            ),
1857        )
1858
1859        data = manager.read(47)
1860
1861        self.assertEqual(
1862            self._sent_chunks,
1863            [
1864                transfer_pb2.Chunk(
1865                    transfer_id=47,
1866                    resource_id=47,
1867                    desired_session_id=_FIRST_SESSION_ID,
1868                    pending_bytes=1024,
1869                    max_chunk_size_bytes=1024,
1870                    window_end_offset=1024,
1871                    type=transfer_pb2.Chunk.Type.START,
1872                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1873                ),
1874                transfer_pb2.Chunk(
1875                    session_id=_FIRST_SESSION_ID,
1876                    type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
1877                    max_chunk_size_bytes=1024,
1878                    window_end_offset=1024,
1879                    protocol_version=ProtocolVersion.VERSION_TWO.value,
1880                ),
1881                transfer_pb2.Chunk(
1882                    session_id=_FIRST_SESSION_ID,
1883                    type=transfer_pb2.Chunk.Type.COMPLETION,
1884                    status=Status.OK.value,
1885                ),
1886                # The completion should be retried per the usual retry flow.
1887                transfer_pb2.Chunk(
1888                    session_id=_FIRST_SESSION_ID,
1889                    type=transfer_pb2.Chunk.Type.COMPLETION,
1890                    status=Status.OK.value,
1891                ),
1892                transfer_pb2.Chunk(
1893                    session_id=_FIRST_SESSION_ID,
1894                    type=transfer_pb2.Chunk.Type.COMPLETION,
1895                    status=Status.OK.value,
1896                ),
1897                transfer_pb2.Chunk(
1898                    session_id=_FIRST_SESSION_ID,
1899                    type=transfer_pb2.Chunk.Type.COMPLETION,
1900                    status=Status.OK.value,
1901                ),
1902            ],
1903        )
1904
1905        # Despite timing out following several retries, the transfer should
1906        # still conclude successfully, as failing to receive a COMPLETION_ACK
1907        # is not fatal.
1908        self.assertEqual(data, b'dropped completion')
1909
1910
1911class ProgressStatsTest(unittest.TestCase):
1912    def test_received_percent_known_total(self) -> None:
1913        self.assertEqual(
1914            pw_transfer.ProgressStats(75, 0, 100).percent_received(), 0.0
1915        )
1916        self.assertEqual(
1917            pw_transfer.ProgressStats(75, 50, 100).percent_received(), 50.0
1918        )
1919        self.assertEqual(
1920            pw_transfer.ProgressStats(100, 100, 100).percent_received(), 100.0
1921        )
1922
1923    def test_received_percent_unknown_total(self) -> None:
1924        self.assertTrue(
1925            math.isnan(
1926                pw_transfer.ProgressStats(75, 50, None).percent_received()
1927            )
1928        )
1929        self.assertTrue(
1930            math.isnan(
1931                pw_transfer.ProgressStats(100, 100, None).percent_received()
1932            )
1933        )
1934
1935    def test_str_known_total(self) -> None:
1936        stats = str(pw_transfer.ProgressStats(75, 50, 100))
1937        self.assertIn('75', stats)
1938        self.assertIn('50', stats)
1939        self.assertIn('100', stats)
1940
1941    def test_str_unknown_total(self) -> None:
1942        stats = str(pw_transfer.ProgressStats(75, 50, None))
1943        self.assertIn('75', stats)
1944        self.assertIn('50', stats)
1945        self.assertIn('unknown', stats)
1946
1947
1948if __name__ == '__main__':
1949    # TODO: b/265975025 - Only run this test in upstream Pigweed until the
1950    #     occasional hangs are fixed.
1951    if os.environ.get('PW_ROOT') and os.environ.get(
1952        'PW_ROOT'
1953    ) == os.environ.get('PW_PROJECT_ROOT'):
1954        unittest.main()
1955    else:
1956        print('Skipping transfer_test.py due to possible hangs (b/265975025).')
1957