• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 
17 #include "pw_transfer/internal/context.h"
18 
19 #include <chrono>
20 #include <mutex>
21 
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_log/log.h"
25 #include "pw_status/try.h"
26 #include "pw_transfer/transfer.pwpb.h"
27 #include "pw_transfer/transfer_thread.h"
28 #include "pw_varint/varint.h"
29 
30 PW_MODIFY_DIAGNOSTICS_PUSH();
31 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
32 
33 namespace pw::transfer::internal {
34 
HandleEvent(const Event & event)35 void Context::HandleEvent(const Event& event) {
36   switch (event.type) {
37     case EventType::kNewClientTransfer:
38     case EventType::kNewServerTransfer: {
39       if (active()) {
40         Finish(Status::Aborted());
41       }
42 
43       Initialize(event.new_transfer);
44 
45       if (event.type == EventType::kNewClientTransfer) {
46         InitiateTransferAsClient();
47       } else {
48         StartTransferAsServer(event.new_transfer);
49       }
50       return;
51     }
52 
53     case EventType::kClientChunk:
54     case EventType::kServerChunk:
55       PW_CHECK(initialized());
56       HandleChunkEvent(event.chunk);
57       return;
58 
59     case EventType::kClientTimeout:
60     case EventType::kServerTimeout:
61       HandleTimeout();
62       return;
63 
64     case EventType::kSendStatusChunk:
65     case EventType::kSetTransferStream:
66     case EventType::kAddTransferHandler:
67     case EventType::kRemoveTransferHandler:
68     case EventType::kTerminate:
69       // These events are intended for the transfer thread and should never be
70       // forwarded through to a context.
71       PW_CRASH("Transfer context received a transfer thread event");
72   }
73 }
74 
InitiateTransferAsClient()75 void Context::InitiateTransferAsClient() {
76   PW_DCHECK(active());
77 
78   SetTimeout(chunk_timeout_);
79 
80   if (type() == TransferType::kReceive) {
81     // A receiver begins a new transfer with a parameters chunk telling the
82     // transmitter what to send.
83     UpdateAndSendTransferParameters(TransmitAction::kBegin);
84   } else {
85     SendInitialTransmitChunk();
86   }
87 
88   // Don't send an error packet. If the transfer failed to start, then there's
89   // nothing to tell the server about.
90 }
91 
StartTransferAsServer(const NewTransferEvent & new_transfer)92 void Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
93   PW_LOG_INFO("Starting transfer %u with handler %u",
94               static_cast<unsigned>(new_transfer.transfer_id),
95               static_cast<unsigned>(new_transfer.handler_id));
96 
97   if (const Status status = new_transfer.handler->Prepare(new_transfer.type);
98       !status.ok()) {
99     PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
100                 static_cast<unsigned>(new_transfer.handler->id()),
101                 status.code());
102     Finish(status.IsPermissionDenied() ? status : Status::DataLoss());
103     // Do not send the final status packet here! On the server, a start event
104     // will immediately be followed by the server chunk event. Sending the final
105     // chunk will be handled then.
106     return;
107   }
108 
109   // Initialize doesn't set the handler since it's specific to server transfers.
110   static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
111 
112   // Server transfers use the stream provided by the handler rather than the
113   // stream included in the NewTransferEvent.
114   stream_ = &new_transfer.handler->stream();
115 }
116 
SendInitialTransmitChunk()117 void Context::SendInitialTransmitChunk() {
118   // A transmitter begins a transfer by just sending its ID.
119   internal::Chunk chunk = {};
120   chunk.transfer_id = transfer_id_;
121   chunk.type = Chunk::Type::kTransferStart;
122 
123   EncodeAndSendChunk(chunk);
124 }
125 
SendTransferParameters(TransmitAction action)126 void Context::SendTransferParameters(TransmitAction action) {
127   internal::Chunk parameters = {
128       .transfer_id = transfer_id_,
129       .window_end_offset = window_end_offset_,
130       .pending_bytes = pending_bytes_,
131       .max_chunk_size_bytes = max_chunk_size_bytes_,
132       .min_delay_microseconds = kDefaultChunkDelayMicroseconds,
133       .offset = offset_,
134   };
135 
136   switch (action) {
137     case TransmitAction::kBegin:
138       parameters.type = internal::Chunk::Type::kTransferStart;
139       break;
140     case TransmitAction::kRetransmit:
141       parameters.type = internal::Chunk::Type::kParametersRetransmit;
142       break;
143     case TransmitAction::kExtend:
144       parameters.type = internal::Chunk::Type::kParametersContinue;
145       break;
146   }
147 
148   PW_LOG_DEBUG(
149       "Transfer %u sending transfer parameters: "
150       "offset=%u, window_end_offset=%u, pending_bytes=%u, chunk_size=%u",
151       static_cast<unsigned>(transfer_id_),
152       static_cast<unsigned>(offset_),
153       static_cast<unsigned>(window_end_offset_),
154       static_cast<unsigned>(pending_bytes_),
155       static_cast<unsigned>(max_chunk_size_bytes_));
156 
157   EncodeAndSendChunk(parameters);
158 }
159 
EncodeAndSendChunk(const Chunk & chunk)160 void Context::EncodeAndSendChunk(const Chunk& chunk) {
161   Result<ConstByteSpan> data =
162       internal::EncodeChunk(chunk, thread_->encode_buffer());
163   if (!data.ok()) {
164     PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
165                  static_cast<unsigned>(chunk.transfer_id),
166                  data.status().code());
167     if (active()) {
168       Finish(Status::Internal());
169     }
170     return;
171   }
172 
173   if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
174     PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
175                  static_cast<unsigned>(chunk.transfer_id),
176                  status.code());
177     if (active()) {
178       Finish(Status::Internal());
179     }
180     return;
181   }
182 }
183 
UpdateAndSendTransferParameters(TransmitAction action)184 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
185   size_t pending_bytes =
186       std::min(max_parameters_->pending_bytes(),
187                static_cast<uint32_t>(writer().ConservativeWriteLimit()));
188 
189   window_size_ = pending_bytes;
190   window_end_offset_ = offset_ + pending_bytes;
191   pending_bytes_ = pending_bytes;
192 
193   max_chunk_size_bytes_ = MaxWriteChunkSize(
194       max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
195 
196   PW_LOG_INFO("Transfer rate: %u B/s",
197               static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
198 
199   return SendTransferParameters(action);
200 }
201 
Initialize(const NewTransferEvent & new_transfer)202 void Context::Initialize(const NewTransferEvent& new_transfer) {
203   PW_DCHECK(!active());
204 
205   transfer_id_ = new_transfer.transfer_id;
206   flags_ = static_cast<uint8_t>(new_transfer.type);
207   transfer_state_ = TransferState::kWaiting;
208   retries_ = 0;
209   max_retries_ = new_transfer.max_retries;
210 
211   rpc_writer_ = new_transfer.rpc_writer;
212   stream_ = new_transfer.stream;
213 
214   offset_ = 0;
215   window_size_ = 0;
216   window_end_offset_ = 0;
217   pending_bytes_ = 0;
218   max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
219 
220   max_parameters_ = new_transfer.max_parameters;
221   thread_ = new_transfer.transfer_thread;
222 
223   last_chunk_offset_ = 0;
224   chunk_timeout_ = new_transfer.timeout;
225   interchunk_delay_ = chrono::SystemClock::for_at_least(
226       std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
227   next_timeout_ = kNoTimeout;
228 
229   transfer_rate_.Reset();
230 }
231 
HandleChunkEvent(const ChunkEvent & event)232 void Context::HandleChunkEvent(const ChunkEvent& event) {
233   PW_DCHECK(event.transfer_id == transfer_id_);
234 
235   Chunk chunk;
236   if (!DecodeChunk(ConstByteSpan(event.data, event.size), chunk).ok()) {
237     return;
238   }
239 
240   // Received some data. Reset the retry counter.
241   retries_ = 0;
242 
243   if (chunk.status.has_value()) {
244     if (active()) {
245       Finish(chunk.status.value());
246     } else {
247       PW_LOG_DEBUG("Got final status %d for completed transfer %d",
248                    static_cast<int>(chunk.status.value().code()),
249                    static_cast<int>(transfer_id_));
250     }
251     return;
252   }
253 
254   if (type() == TransferType::kTransmit) {
255     HandleTransmitChunk(chunk);
256   } else {
257     HandleReceiveChunk(chunk);
258   }
259 }
260 
HandleTransmitChunk(const Chunk & chunk)261 void Context::HandleTransmitChunk(const Chunk& chunk) {
262   switch (transfer_state_) {
263     case TransferState::kInactive:
264     case TransferState::kRecovery:
265       PW_CRASH("Never should handle chunk while inactive");
266 
267     case TransferState::kCompleted:
268       // If the transfer has already completed and another chunk is received,
269       // tell the other end that the transfer is over.
270       //
271       // TODO(frolv): Final status chunks should be ACKed by the other end. When
272       // that is added, this case should be updated to check if the received
273       // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
274       // Otherwise, the final status should be re-sent.
275       if (!chunk.IsInitialChunk()) {
276         status_ = Status::FailedPrecondition();
277       }
278       SendFinalStatusChunk();
279       return;
280 
281     case TransferState::kWaiting:
282     case TransferState::kTransmitting:
283       HandleTransferParametersUpdate(chunk);
284       if (transfer_state_ == TransferState::kCompleted) {
285         SendFinalStatusChunk();
286       }
287       return;
288   }
289 }
290 
HandleTransferParametersUpdate(const Chunk & chunk)291 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
292   if (!chunk.pending_bytes.has_value()) {
293     // Malformed chunk.
294     Finish(Status::InvalidArgument());
295     return;
296   }
297 
298   bool retransmit = true;
299   if (chunk.type.has_value()) {
300     retransmit = chunk.type == Chunk::Type::kParametersRetransmit ||
301                  chunk.type == Chunk::Type::kTransferStart;
302   }
303 
304   if (retransmit) {
305     // If the offsets don't match, attempt to seek on the reader. Not all
306     // readers support seeking; abort with UNIMPLEMENTED if this handler
307     // doesn't.
308     if (offset_ != chunk.offset) {
309       if (Status seek_status = reader().Seek(chunk.offset); !seek_status.ok()) {
310         PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
311                     static_cast<unsigned>(transfer_id_),
312                     static_cast<unsigned>(chunk.offset),
313                     seek_status.code());
314 
315         // Remap status codes to return one of the following:
316         //
317         //   INTERNAL: invalid seek, never should happen
318         //   DATA_LOSS: the reader is in a bad state
319         //   UNIMPLEMENTED: seeking is not supported
320         //
321         if (seek_status.IsOutOfRange()) {
322           seek_status = Status::Internal();
323         } else if (!seek_status.IsUnimplemented()) {
324           seek_status = Status::DataLoss();
325         }
326 
327         Finish(seek_status);
328         return;
329       }
330     }
331 
332     // Retransmit is the default behavior for older versions of the transfer
333     // protocol. The window_end_offset field is not guaranteed to be set in
334     // these versions, so it must be calculated.
335     offset_ = chunk.offset;
336     window_end_offset_ = offset_ + chunk.pending_bytes.value();
337     pending_bytes_ = chunk.pending_bytes.value();
338   } else {
339     window_end_offset_ = chunk.window_end_offset;
340   }
341 
342   if (chunk.max_chunk_size_bytes.has_value()) {
343     max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes.value(),
344                                      max_parameters_->max_chunk_size_bytes());
345   }
346 
347   if (chunk.min_delay_microseconds.has_value()) {
348     interchunk_delay_ = chrono::SystemClock::for_at_least(
349         std::chrono::microseconds(chunk.min_delay_microseconds.value()));
350   }
351 
352   PW_LOG_DEBUG(
353       "Transfer %u received parameters type=%s offset=%u window_end_offset=%u",
354       static_cast<unsigned>(transfer_id_),
355       retransmit ? "RETRANSMIT" : "CONTINUE",
356       static_cast<unsigned>(chunk.offset),
357       static_cast<unsigned>(window_end_offset_));
358 
359   // Parsed all of the parameters; start sending the window.
360   set_transfer_state(TransferState::kTransmitting);
361 
362   TransmitNextChunk(retransmit);
363 }
364 
TransmitNextChunk(bool retransmit_requested)365 void Context::TransmitNextChunk(bool retransmit_requested) {
366   ByteSpan buffer = thread_->encode_buffer();
367 
368   // Begin by doing a partial encode of all the metadata fields, leaving the
369   // buffer with usable space for the chunk data at the end.
370   transfer::Chunk::MemoryEncoder encoder{buffer};
371   encoder.WriteTransferId(transfer_id_).IgnoreError();
372   encoder.WriteOffset(offset_).IgnoreError();
373 
374   // TODO(frolv): Type field presence is currently meaningful, so this type must
375   // be serialized. Once all users of transfer always set chunk types, the field
376   // can be made non-optional and this write can be removed as TRANSFER_DATA has
377   // the default proto value of 0.
378   encoder.WriteType(transfer::Chunk::Type::TRANSFER_DATA).IgnoreError();
379 
380   // Reserve space for the data proto field overhead and use the remainder of
381   // the buffer for the chunk data.
382   size_t reserved_size = encoder.size() + 1 /* data key */ + 5 /* data size */;
383 
384   ByteSpan data_buffer = buffer.subspan(reserved_size);
385   size_t max_bytes_to_send =
386       std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
387 
388   if (max_bytes_to_send < data_buffer.size()) {
389     data_buffer = data_buffer.first(max_bytes_to_send);
390   }
391 
392   Result<ByteSpan> data = reader().Read(data_buffer);
393   if (data.status().IsOutOfRange()) {
394     // No more data to read.
395     encoder.WriteRemainingBytes(0).IgnoreError();
396     window_end_offset_ = offset_;
397     pending_bytes_ = 0;
398 
399     PW_LOG_DEBUG("Transfer %u sending final chunk with remaining_bytes=0",
400                  static_cast<unsigned>(transfer_id_));
401   } else if (data.ok()) {
402     if (offset_ == window_end_offset_) {
403       if (retransmit_requested) {
404         PW_LOG_DEBUG(
405             "Transfer %u: received an empty retransmit request, but there is "
406             "still data to send; aborting with RESOURCE_EXHAUSTED",
407             id_for_log());
408         Finish(Status::ResourceExhausted());
409       } else {
410         PW_LOG_DEBUG(
411             "Transfer %u: ignoring continuation packet for transfer window "
412             "that has already been sent",
413             id_for_log());
414         SetTimeout(chunk_timeout_);
415       }
416       return;  // No data was requested, so there is nothing else to do.
417     }
418 
419     PW_LOG_DEBUG("Transfer %u sending chunk offset=%u size=%u",
420                  static_cast<unsigned>(transfer_id_),
421                  static_cast<unsigned>(offset_),
422                  static_cast<unsigned>(data.value().size()));
423 
424     encoder.WriteData(data.value()).IgnoreError();
425     last_chunk_offset_ = offset_;
426     offset_ += data.value().size();
427     pending_bytes_ -= data.value().size();
428   } else {
429     PW_LOG_ERROR("Transfer %u Read() failed with status %u",
430                  static_cast<unsigned>(transfer_id_),
431                  data.status().code());
432     Finish(Status::DataLoss());
433     return;
434   }
435 
436   if (!encoder.status().ok()) {
437     PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
438                  static_cast<unsigned>(transfer_id_));
439     Finish(Status::Internal());
440     return;
441   }
442 
443   if (const Status status = rpc_writer_->Write(encoder); !status.ok()) {
444     PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
445                  static_cast<unsigned>(transfer_id_),
446                  status.code());
447     Finish(Status::DataLoss());
448     return;
449   }
450 
451   flags_ |= kFlagsDataSent;
452 
453   if (offset_ == window_end_offset_) {
454     // Sent all requested data. Must now wait for next parameters from the
455     // receiver.
456     set_transfer_state(TransferState::kWaiting);
457     SetTimeout(chunk_timeout_);
458   } else {
459     // More data is to be sent. Set a timeout to send the next chunk following
460     // the chunk delay.
461     SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
462   }
463 }
464 
HandleReceiveChunk(const Chunk & chunk)465 void Context::HandleReceiveChunk(const Chunk& chunk) {
466   switch (transfer_state_) {
467     case TransferState::kInactive:
468       PW_CRASH("Never should handle chunk while inactive");
469 
470     case TransferState::kTransmitting:
471       PW_CRASH("Receive transfer somehow entered TRANSMITTING state");
472 
473     case TransferState::kCompleted:
474       // If the transfer has already completed and another chunk is received,
475       // re-send the final status chunk.
476       //
477       // TODO(frolv): Final status chunks should be ACKed by the other end. When
478       // that is added, this case should be updated to check if the received
479       // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
480       // Otherwise, the final status should be re-sent.
481       SendFinalStatusChunk();
482       return;
483 
484     case TransferState::kRecovery:
485       if (chunk.offset != offset_) {
486         if (last_chunk_offset_ == chunk.offset) {
487           PW_LOG_DEBUG(
488               "Transfer %u received repeated offset %u; retry detected, "
489               "resending transfer parameters",
490               static_cast<unsigned>(transfer_id_),
491               static_cast<unsigned>(chunk.offset));
492 
493           UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
494           if (transfer_state_ == TransferState::kCompleted) {
495             SendFinalStatusChunk();
496             return;
497           }
498           PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
499                        static_cast<unsigned>(transfer_id_),
500                        static_cast<unsigned>(offset_),
501                        static_cast<unsigned>(chunk.offset));
502         }
503 
504         last_chunk_offset_ = chunk.offset;
505         SetTimeout(chunk_timeout_);
506         return;
507       }
508 
509       PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
510                    static_cast<unsigned>(transfer_id_),
511                    static_cast<unsigned>(offset_));
512       set_transfer_state(TransferState::kWaiting);
513 
514       // The correct chunk was received; process it normally.
515       [[fallthrough]];
516     case TransferState::kWaiting:
517       HandleReceivedData(chunk);
518       if (transfer_state_ == TransferState::kCompleted) {
519         SendFinalStatusChunk();
520       }
521       return;
522   }
523 }
524 
HandleReceivedData(const Chunk & chunk)525 void Context::HandleReceivedData(const Chunk& chunk) {
526   if (chunk.data.size() > pending_bytes_) {
527     // End the transfer, as this indicates a bug with the client implementation
528     // where it doesn't respect pending_bytes. Trying to recover from here
529     // could potentially result in an infinite transfer loop.
530     PW_LOG_ERROR(
531         "Transfer %u received more data than what was requested (%u received "
532         "for %u pending); terminating transfer.",
533         id_for_log(),
534         static_cast<unsigned>(chunk.data.size()),
535         static_cast<unsigned>(pending_bytes_));
536     Finish(Status::Internal());
537     return;
538   }
539 
540   if (chunk.offset != offset_) {
541     // Bad offset; reset pending_bytes to send another parameters chunk.
542     PW_LOG_DEBUG(
543         "Transfer %u expected offset %u, received %u; entering recovery state",
544         static_cast<unsigned>(transfer_id_),
545         static_cast<unsigned>(offset_),
546         static_cast<unsigned>(chunk.offset));
547 
548     set_transfer_state(TransferState::kRecovery);
549     SetTimeout(chunk_timeout_);
550 
551     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
552     return;
553   }
554 
555   // Update the last offset seen so that retries can be detected.
556   last_chunk_offset_ = chunk.offset;
557 
558   // Write staged data from the buffer to the stream.
559   if (!chunk.data.empty()) {
560     if (Status status = writer().Write(chunk.data); !status.ok()) {
561       PW_LOG_ERROR(
562           "Transfer %u write of %u B chunk failed with status %u; aborting "
563           "with DATA_LOSS",
564           static_cast<unsigned>(transfer_id_),
565           static_cast<unsigned>(chunk.data.size()),
566           status.code());
567       Finish(Status::DataLoss());
568       return;
569     }
570 
571     transfer_rate_.Update(chunk.data.size());
572   }
573 
574   // When the client sets remaining_bytes to 0, it indicates completion of the
575   // transfer. Acknowledge the completion through a status chunk and clean up.
576   if (chunk.IsFinalTransmitChunk()) {
577     Finish(OkStatus());
578     return;
579   }
580 
581   // Update the transfer state.
582   offset_ += chunk.data.size();
583   pending_bytes_ -= chunk.data.size();
584 
585   if (chunk.window_end_offset != 0) {
586     if (chunk.window_end_offset < offset_) {
587       PW_LOG_ERROR(
588           "Transfer %u got invalid end offset of %u (current offset %u)",
589           id_for_log(),
590           static_cast<unsigned>(chunk.window_end_offset),
591           static_cast<unsigned>(offset_));
592       Finish(Status::Internal());
593       return;
594     }
595 
596     if (chunk.window_end_offset > window_end_offset_) {
597       // A transmitter should never send a larger end offset than what the
598       // receiver has advertised. If this occurs, there is a bug in the
599       // transmitter implementation. Terminate the transfer.
600       PW_LOG_ERROR(
601           "Transfer %u transmitter sent invalid end offset of %u, "
602           "greater than receiver offset %u",
603           id_for_log(),
604           static_cast<unsigned>(chunk.window_end_offset),
605           static_cast<unsigned>(window_end_offset_));
606       Finish(Status::Internal());
607       return;
608     }
609 
610     window_end_offset_ = chunk.window_end_offset;
611     pending_bytes_ = chunk.window_end_offset - offset_;
612   }
613 
614   SetTimeout(chunk_timeout_);
615 
616   if (pending_bytes_ == 0u) {
617     // Received all pending data. Advance the transfer parameters.
618     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
619     return;
620   }
621 
622   // Once the transmitter has sent a sufficient amount of data, try to extend
623   // the window to allow it to continue sending data without blocking.
624   uint32_t remaining_window_size = window_end_offset_ - offset_;
625   bool extend_window = remaining_window_size <=
626                        window_size_ / max_parameters_->extend_window_divisor();
627 
628   if (extend_window) {
629     UpdateAndSendTransferParameters(TransmitAction::kExtend);
630     return;
631   }
632 }
633 
SendFinalStatusChunk()634 void Context::SendFinalStatusChunk() {
635   PW_DCHECK(transfer_state_ == TransferState::kCompleted);
636 
637   internal::Chunk chunk = {};
638   chunk.transfer_id = transfer_id_;
639   chunk.status = status_.code();
640   chunk.type = Chunk::Type::kTransferCompletion;
641 
642   PW_LOG_DEBUG("Sending final chunk for transfer %u with status %u",
643                static_cast<unsigned>(transfer_id_),
644                status_.code());
645   EncodeAndSendChunk(chunk);
646 }
647 
Finish(Status status)648 void Context::Finish(Status status) {
649   PW_DCHECK(active());
650 
651   PW_LOG_INFO("Transfer %u completed with status %u",
652               static_cast<unsigned>(transfer_id_),
653               status.code());
654 
655   status.Update(FinalCleanup(status));
656 
657   set_transfer_state(TransferState::kCompleted);
658   SetTimeout(kFinalChunkAckTimeout);
659   status_ = status;
660 }
661 
SetTimeout(chrono::SystemClock::duration timeout)662 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
663   next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
664 }
665 
HandleTimeout()666 void Context::HandleTimeout() {
667   ClearTimeout();
668 
669   switch (transfer_state_) {
670     case TransferState::kCompleted:
671       // A timeout occurring in a completed state indicates that the other side
672       // never ACKed the final status packet. Reset the context to inactive.
673       set_transfer_state(TransferState::kInactive);
674       return;
675 
676     case TransferState::kTransmitting:
677       // A timeout occurring in a TRANSMITTING state indicates that the transfer
678       // has waited for its inter-chunk delay and should transmit its next
679       // chunk.
680       TransmitNextChunk(/*retransmit_requested=*/false);
681       break;
682 
683     case TransferState::kWaiting:
684     case TransferState::kRecovery:
685       // A timeout occurring in a WAITING or RECOVERY state indicates that no
686       // chunk has been received from the other side. The transfer should retry
687       // its previous operation.
688       SetTimeout(chunk_timeout_);  // Finish() clears the timeout if retry fails
689       Retry();
690       break;
691 
692     case TransferState::kInactive:
693       PW_LOG_ERROR("Timeout occurred in INACTIVE state");
694       return;
695   }
696 
697   if (transfer_state_ == TransferState::kCompleted) {
698     SendFinalStatusChunk();
699   }
700 }
701 
Retry()702 void Context::Retry() {
703   if (retries_ == max_retries_) {
704     PW_LOG_ERROR("Transfer %u failed to receive a chunk after %u retries.",
705                  static_cast<unsigned>(transfer_id_),
706                  static_cast<unsigned>(retries_));
707     PW_LOG_ERROR("Canceling transfer.");
708     Finish(Status::DeadlineExceeded());
709     return;
710   }
711 
712   ++retries_;
713 
714   if (type() == TransferType::kReceive) {
715     // Resend the most recent transfer parameters.
716     PW_LOG_DEBUG(
717         "Receive transfer %u timed out waiting for chunk; resending parameters",
718         static_cast<unsigned>(transfer_id_));
719 
720     SendTransferParameters(TransmitAction::kRetransmit);
721     return;
722   }
723 
724   // In a transmit, if a data chunk has not yet been sent, the initial transfer
725   // parameters did not arrive from the receiver. Resend the initial chunk.
726   if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
727     PW_LOG_DEBUG(
728         "Transmit transfer %u timed out waiting for initial parameters",
729         static_cast<unsigned>(transfer_id_));
730     SendInitialTransmitChunk();
731     return;
732   }
733 
734   // Otherwise, resend the most recent chunk. If the reader doesn't support
735   // seeking, this isn't possible, so just terminate the transfer immediately.
736   if (!reader().Seek(last_chunk_offset_).ok()) {
737     PW_LOG_ERROR("Transmit transfer %d timed out waiting for new parameters.",
738                  static_cast<unsigned>(transfer_id_));
739     PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
740     Finish(Status::DeadlineExceeded());
741     return;
742   }
743 
744   // Rewind the transfer position and resend the chunk.
745   size_t last_size_sent = offset_ - last_chunk_offset_;
746   offset_ = last_chunk_offset_;
747   pending_bytes_ += last_size_sent;
748 
749   TransmitNextChunk(/*retransmit_requested=*/false);
750 }
751 
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const752 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
753                                     uint32_t channel_id) const {
754   // Start with the user-provided maximum chunk size, which should be the usable
755   // payload length on the RPC ingress path after any transport overhead.
756   ptrdiff_t max_size = max_chunk_size_bytes;
757 
758   // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
759   //
760   //   type:       1 byte key, 1 byte value (CLIENT_STREAM)
761   //   channel_id: 1 byte key, varint value (calculate from stream)
762   //   service_id: 1 byte key, 4 byte value
763   //   method_id:  1 byte key, 4 byte value
764   //   payload:    1 byte key, varint length (remaining space)
765   //   status:     0 bytes (not set in stream packets)
766   //
767   //   TOTAL: 14 bytes + encoded channel_id size + encoded payload length
768   //
769   max_size -= 14;
770   max_size -= varint::EncodedSize(channel_id);
771   max_size -= varint::EncodedSize(max_size);
772 
773   // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
774   // overhead calculation will be moved into an RPC helper to avoid having
775   // pw_transfer depend on RPC internals.
776   max_size -= 5;
777 
778   // Subtract the transfer service overhead for a client write chunk
779   // (pw_transfer/transfer.proto).
780   //
781   //   transfer_id: 1 byte key, varint value (calculate)
782   //   offset:      1 byte key, varint value (calculate)
783   //   data:        1 byte key, varint length (remaining space)
784   //
785   //   TOTAL: 3 + encoded transfer_id + encoded offset + encoded data length
786   //
787   max_size -= 3;
788   max_size -= varint::EncodedSize(transfer_id_);
789   max_size -= varint::EncodedSize(window_end_offset_);
790   max_size -= varint::EncodedSize(max_size);
791 
792   // A resulting value of zero (or less) renders write transfers unusable, as
793   // there is no space to send any payload. This should be considered a
794   // programmer error in the transfer service setup.
795   PW_CHECK_INT_GT(
796       max_size,
797       0,
798       "Transfer service maximum chunk size is too small to fit a payload. "
799       "Increase max_chunk_size_bytes to support write transfers.");
800 
801   return max_size;
802 }
803 
804 }  // namespace pw::transfer::internal
805 
806 PW_MODIFY_DIAGNOSTICS_POP();
807