• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17 
18 #include "pw_transfer/internal/context.h"
19 
20 #include <chrono>
21 #include <limits>
22 
23 #include "pw_assert/check.h"
24 #include "pw_chrono/system_clock.h"
25 #include "pw_log/log.h"
26 #include "pw_log/rate_limited.h"
27 #include "pw_preprocessor/compiler.h"
28 #include "pw_protobuf/serialized_size.h"
29 #include "pw_transfer/internal/config.h"
30 #include "pw_transfer/transfer.pwpb.h"
31 #include "pw_transfer/transfer_thread.h"
32 #include "pw_varint/varint.h"
33 
34 namespace pw::transfer::internal {
35 
HandleEvent(const Event & event)36 void Context::HandleEvent(const Event& event) {
37   switch (event.type) {
38     case EventType::kNewClientTransfer:
39     case EventType::kNewServerTransfer: {
40       if (active()) {
41         if (event.type == EventType::kNewServerTransfer &&
42             event.new_transfer.session_id == session_id_ &&
43             last_chunk_sent_ == Chunk::Type::kStartAck) {
44           // The client is retrying its initial chunk as the response may not
45           // have made it back. Re-send the handshake response without going
46           // through handler reinitialization.
47           RetryHandshake();
48           return;
49         }
50         Abort(Status::Aborted());
51       }
52 
53       Initialize(event.new_transfer);
54 
55       if (event.type == EventType::kNewClientTransfer) {
56         InitiateTransferAsClient();
57       } else {
58         if (StartTransferAsServer(event.new_transfer)) {
59           // TODO(frolv): This should probably be restructured.
60           HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
61                             .match_resource_id = false,  // Unused.
62                             .data = event.new_transfer.raw_chunk_data,
63                             .size = event.new_transfer.raw_chunk_size});
64         }
65       }
66       return;
67     }
68 
69     case EventType::kClientChunk:
70     case EventType::kServerChunk:
71       PW_CHECK(initialized());
72       HandleChunkEvent(event.chunk);
73       return;
74 
75     case EventType::kClientTimeout:
76     case EventType::kServerTimeout:
77       HandleTimeout();
78       return;
79 
80     case EventType::kClientEndTransfer:
81     case EventType::kServerEndTransfer:
82       if (active()) {
83         if (event.end_transfer.send_status_chunk) {
84           TerminateTransfer(event.end_transfer.status);
85         } else {
86           Abort(event.end_transfer.status);
87         }
88       }
89       return;
90 
91     case EventType::kSendStatusChunk:
92     case EventType::kAddTransferHandler:
93     case EventType::kRemoveTransferHandler:
94     case EventType::kSetStream:
95     case EventType::kTerminate:
96     case EventType::kUpdateClientTransfer:
97     case EventType::kGetResourceStatus:
98       // These events are intended for the transfer thread and should never be
99       // forwarded through to a context.
100       PW_CRASH("Transfer context received a transfer thread event");
101   }
102 }
103 
InitiateTransferAsClient()104 void Context::InitiateTransferAsClient() {
105   PW_DCHECK(active());
106 
107   SetTimeout(initial_chunk_timeout_);
108 
109   PW_LOG_INFO("Starting transfer for resource %u",
110               static_cast<unsigned>(resource_id_));
111 
112   // Receive transfers should prepare their initial parameters to be send in the
113   // initial chunk.
114   if (type() == TransferType::kReceive) {
115     UpdateTransferParameters(TransmitAction::kBegin);
116   }
117 
118   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
119     // Legacy transfers go straight into the data transfer phase without a
120     // handshake.
121     if (type() == TransferType::kReceive) {
122       SendTransferParameters(TransmitAction::kBegin);
123     } else {
124       SendInitialLegacyTransmitChunk();
125     }
126 
127     LogTransferConfiguration();
128     return;
129   }
130 
131   // In newer protocol versions, begin the initial transfer handshake.
132   Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
133   start_chunk.set_desired_session_id(session_id_);
134   start_chunk.set_resource_id(resource_id_);
135   start_chunk.set_initial_offset(offset_);
136 
137   if (type() == TransferType::kReceive) {
138     // Parameters should still be set on the initial chunk for backwards
139     // compatibility if the server only supports the legacy protocol.
140     SetTransferParameters(start_chunk);
141   }
142 
143   EncodeAndSendChunk(start_chunk);
144 }
145 
StartTransferAsServer(const NewTransferEvent & new_transfer)146 bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
147   PW_LOG_INFO("Starting %s transfer %u for resource %u with offset %u",
148               new_transfer.type == TransferType::kTransmit ? "read" : "write",
149               static_cast<unsigned>(new_transfer.session_id),
150               static_cast<unsigned>(new_transfer.resource_id),
151               static_cast<unsigned>(new_transfer.initial_offset));
152   LogTransferConfiguration();
153 
154   flags_ |= kFlagsContactMade;
155 
156   if (Status status = new_transfer.handler->Prepare(
157           new_transfer.type, new_transfer.initial_offset);
158       !status.ok()) {
159     PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
160                 static_cast<unsigned>(new_transfer.handler->id()),
161                 status.code());
162 
163     // As this failure occurs at the start of a transfer, no protocol version is
164     // yet negotiated and one must be set to send a response. It is okay to use
165     // the desired version here, as that comes from the client.
166     configured_protocol_version_ = desired_protocol_version_;
167 
168     status = (status.IsPermissionDenied() || status.IsUnimplemented() ||
169               status.IsResourceExhausted())
170                  ? status
171                  : Status::DataLoss();
172     TerminateTransfer(status, /*with_resource_id=*/true);
173     return false;
174   }
175 
176   // Initialize doesn't set the handler since it's specific to server transfers.
177   static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
178 
179   // Server transfers use the stream provided by the handler rather than the
180   // stream included in the NewTransferEvent.
181   stream_ = &new_transfer.handler->stream();
182 
183   return true;
184 }
185 
SendInitialLegacyTransmitChunk()186 void Context::SendInitialLegacyTransmitChunk() {
187   // A transmitter begins a transfer by sending the ID of the resource to which
188   // it wishes to write.
189   Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
190   chunk.set_session_id(resource_id_);
191 
192   EncodeAndSendChunk(chunk);
193 }
194 
UpdateTransferParameters(TransmitAction action)195 void Context::UpdateTransferParameters(TransmitAction action) {
196   max_chunk_size_bytes_ = MaxWriteChunkSize(
197       max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
198   uint32_t window_size = 0;
199 
200   if (max_chunk_size_bytes_ > max_parameters_->max_window_size_bytes()) {
201     window_size =
202         std::min(max_parameters_->max_window_size_bytes(),
203                  static_cast<uint32_t>(writer().ConservativeWriteLimit()));
204   } else {
205     // Adjust the window size based on the latest event in the transfer.
206     switch (action) {
207       case TransmitAction::kBegin:
208       case TransmitAction::kFirstParameters:
209         // A transfer always begins with a window size of one chunk, set during
210         // initialization. No further handling is required.
211         break;
212 
213       case TransmitAction::kExtend:
214         // Window was received successfully without packet loss and should grow.
215         // Double the window size during slow start, or increase it by a single
216         // chunk in congestion avoidance.
217         if (transmit_phase_ == TransmitPhase::kCongestionAvoidance) {
218           window_size_multiplier_ += 1;
219         } else {
220           window_size_multiplier_ *= 2;
221         }
222 
223         // The window size can never exceed the user-specified maximum bytes. If
224         // it does, reduce the multiplier to the largest size that fits.
225         if (window_size_multiplier_ * max_chunk_size_bytes_ >
226             max_parameters_->max_window_size_bytes()) {
227           window_size_multiplier_ =
228               max_parameters_->max_window_size_bytes() / max_chunk_size_bytes_;
229         }
230         break;
231 
232       case TransmitAction::kRetransmit:
233         // A packet was lost: shrink the window size. Additionally, after the
234         // first packet loss, transition from the slow start to the congestion
235         // avoidance phase of the transfer.
236         if (transmit_phase_ == TransmitPhase::kSlowStart) {
237           transmit_phase_ = TransmitPhase::kCongestionAvoidance;
238         }
239         window_size_multiplier_ =
240             std::max(window_size_multiplier_ / static_cast<uint32_t>(2),
241                      static_cast<uint32_t>(1));
242         break;
243     }
244 
245     window_size =
246         std::min({window_size_multiplier_ * max_chunk_size_bytes_,
247                   max_parameters_->max_window_size_bytes(),
248                   static_cast<uint32_t>(writer().ConservativeWriteLimit())});
249   }
250 
251   window_size_ = window_size;
252   window_end_offset_ = offset_ + window_size;
253 }
254 
SetTransferParameters(Chunk & parameters)255 void Context::SetTransferParameters(Chunk& parameters) {
256   parameters.set_window_end_offset(window_end_offset_)
257       .set_max_chunk_size_bytes(max_chunk_size_bytes_)
258       .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
259       .set_offset(offset_);
260 }
261 
UpdateAndSendTransferParameters(TransmitAction action)262 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
263   UpdateTransferParameters(action);
264 
265   return SendTransferParameters(action);
266 }
267 
SendTransferParameters(TransmitAction action)268 void Context::SendTransferParameters(TransmitAction action) {
269   Chunk::Type type = Chunk::Type::kParametersRetransmit;
270 
271   switch (action) {
272     case TransmitAction::kBegin:
273       type = Chunk::Type::kStart;
274       break;
275     case TransmitAction::kFirstParameters:
276     case TransmitAction::kRetransmit:
277       type = Chunk::Type::kParametersRetransmit;
278       break;
279     case TransmitAction::kExtend:
280       type = Chunk::Type::kParametersContinue;
281       break;
282   }
283 
284   Chunk parameters(configured_protocol_version_, type);
285   parameters.set_session_id(session_id_);
286   SetTransferParameters(parameters);
287 
288   PW_LOG_EVERY_N_DURATION(
289       PW_LOG_LEVEL_INFO,
290       log_rate_limit_,
291       "Transfer rate: %u B/s",
292       static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
293 
294   PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_INFO,
295                           log_rate_limit_,
296                           "Transfer %u sending transfer parameters: "
297                           "offset=%u, window_end_offset=%u, max_chunk_size=%u",
298                           static_cast<unsigned>(session_id_),
299                           static_cast<unsigned>(offset_),
300                           static_cast<unsigned>(window_end_offset_),
301                           static_cast<unsigned>(max_chunk_size_bytes_));
302 
303   if (log_chunks_before_rate_limit_ > 0) {
304     log_chunks_before_rate_limit_--;
305 
306     if (log_chunks_before_rate_limit_ == 0) {
307       log_rate_limit_ = log_rate_limit_cfg_;
308     }
309   }
310 
311   EncodeAndSendChunk(parameters);
312 }
313 
EncodeAndSendChunk(const Chunk & chunk)314 void Context::EncodeAndSendChunk(const Chunk& chunk) {
315   last_chunk_sent_ = chunk.type();
316 
317 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
318   if ((chunk.remaining_bytes().has_value() &&
319        chunk.remaining_bytes().value() == 0) ||
320       (chunk.type() != Chunk::Type::kData &&
321        chunk.type() != Chunk::Type::kParametersContinue)) {
322     chunk.LogChunk(false, pw::chrono::SystemClock::duration::zero());
323   }
324 #endif
325 
326 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
327   if (chunk.type() == Chunk::Type::kData ||
328       chunk.type() == Chunk::Type::kParametersContinue) {
329     chunk.LogChunk(false, log_rate_limit_);
330   }
331 #endif
332 
333   Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
334   if (!data.ok()) {
335     PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
336                  static_cast<unsigned>(chunk.session_id()),
337                  data.status().code());
338     if (active()) {
339       TerminateTransfer(Status::Internal());
340     }
341     return;
342   }
343 
344   if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
345     PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
346                  static_cast<unsigned>(chunk.session_id()),
347                  status.code());
348     if (active()) {
349       TerminateTransfer(Status::Internal());
350     }
351     return;
352   }
353 }
354 
Initialize(const NewTransferEvent & new_transfer)355 void Context::Initialize(const NewTransferEvent& new_transfer) {
356   PW_DCHECK(!active());
357 
358   PW_DCHECK_INT_NE(new_transfer.protocol_version,
359                    ProtocolVersion::kUnknown,
360                    "Cannot start a transfer with an unknown protocol");
361 
362   session_id_ = new_transfer.session_id;
363   resource_id_ = new_transfer.resource_id;
364   desired_protocol_version_ = new_transfer.protocol_version;
365   configured_protocol_version_ = ProtocolVersion::kUnknown;
366 
367   flags_ = static_cast<uint8_t>(new_transfer.type);
368   transfer_state_ = TransferState::kWaiting;
369   retries_ = 0;
370   max_retries_ = new_transfer.max_retries;
371   lifetime_retries_ = 0;
372   max_lifetime_retries_ = new_transfer.max_lifetime_retries;
373 
374   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
375     // In a legacy transfer, there is no protocol negotiation stage.
376     // Automatically configure the context to run the legacy protocol and
377     // proceed to waiting for a chunk.
378     configured_protocol_version_ = ProtocolVersion::kLegacy;
379     transfer_state_ = TransferState::kWaiting;
380   } else {
381     transfer_state_ = TransferState::kInitiating;
382   }
383 
384   rpc_writer_ = new_transfer.rpc_writer;
385   stream_ = new_transfer.stream;
386 
387   offset_ = new_transfer.initial_offset;
388   initial_offset_ = new_transfer.initial_offset;
389   window_size_ = 0;
390   window_end_offset_ = 0;
391   max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
392 
393   window_size_multiplier_ = 1;
394   transmit_phase_ = TransmitPhase::kSlowStart;
395 
396   max_parameters_ = new_transfer.max_parameters;
397   thread_ = new_transfer.transfer_thread;
398 
399   last_chunk_sent_ = Chunk::Type::kStart;
400   last_chunk_offset_ = 0;
401   chunk_timeout_ = new_transfer.timeout;
402   initial_chunk_timeout_ = new_transfer.initial_timeout;
403   interchunk_delay_ = chrono::SystemClock::for_at_least(
404       std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
405   next_timeout_ = kNoTimeout;
406   log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
407 
408   transfer_rate_.Reset();
409 }
410 
HandleChunkEvent(const ChunkEvent & event)411 void Context::HandleChunkEvent(const ChunkEvent& event) {
412   Result<Chunk> maybe_chunk =
413       Chunk::Parse(ConstByteSpan(event.data, event.size));
414   if (!maybe_chunk.ok()) {
415     return;
416   }
417 
418   Chunk chunk = *maybe_chunk;
419 
420   // Received some data. Reset the retry counter.
421   retries_ = 0;
422   flags_ |= kFlagsContactMade;
423 
424 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
425   if (chunk.type() != Chunk::Type::kData &&
426       chunk.type() != Chunk::Type::kParametersContinue) {
427     chunk.LogChunk(true, pw::chrono::SystemClock::duration::zero());
428   }
429 #endif
430 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
431   if (chunk.type() == Chunk::Type::kData ||
432       chunk.type() == Chunk::Type::kParametersContinue) {
433     chunk.LogChunk(true, log_rate_limit_);
434   }
435 #endif
436 
437   if (chunk.IsTerminatingChunk()) {
438     if (active()) {
439       HandleTermination(chunk.status().value());
440     } else {
441       PW_LOG_INFO("Got final status %d for completed transfer %d",
442                   static_cast<int>(chunk.status().value().code()),
443                   static_cast<int>(session_id_));
444     }
445     return;
446   }
447 
448   if (type() == TransferType::kTransmit) {
449     HandleTransmitChunk(chunk);
450   } else {
451     HandleReceiveChunk(chunk);
452   }
453 }
454 
PerformInitialHandshake(const Chunk & chunk)455 void Context::PerformInitialHandshake(const Chunk& chunk) {
456   switch (chunk.type()) {
457     // Initial packet sent from a client to a server.
458     case Chunk::Type::kStart: {
459       UpdateLocalProtocolConfigurationFromPeer(chunk);
460 
461       if (type() == TransferType::kReceive) {
462         // Update window end offset so it is valid.
463         window_end_offset_ = offset_;
464       }
465 
466       // This cast is safe as we know we're running in a transfer server.
467       uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
468 
469       Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
470       start_ack.set_session_id(session_id_);
471       start_ack.set_resource_id(resource_id);
472       start_ack.set_initial_offset(offset_);
473 
474       EncodeAndSendChunk(start_ack);
475       break;
476     }
477 
478     // Response packet sent from a server to a client, confirming the protocol
479     // version and session_id of the transfer.
480     case Chunk::Type::kStartAck: {
481       UpdateLocalProtocolConfigurationFromPeer(chunk);
482 
483       // This should confirm the offset we're starting at
484       if (offset_ != chunk.initial_offset()) {
485         TerminateTransfer(Status::Unimplemented());
486         break;
487       }
488 
489       Chunk start_ack_confirmation(configured_protocol_version_,
490                                    Chunk::Type::kStartAckConfirmation);
491       start_ack_confirmation.set_session_id(session_id_);
492 
493       if (type() == TransferType::kReceive) {
494         // In a receive transfer, tag the initial transfer parameters onto the
495         // confirmation chunk so that the server can immediately begin sending
496         // data.
497         UpdateTransferParameters(TransmitAction::kFirstParameters);
498         SetTransferParameters(start_ack_confirmation);
499       }
500 
501       set_transfer_state(TransferState::kWaiting);
502       EncodeAndSendChunk(start_ack_confirmation);
503       // we received a response, so we can re-up the timeout while waiting for
504       // parameters.
505       SetTimeout(chunk_timeout_);
506       break;
507     }
508 
509     // Confirmation sent by a client to a server of the configured transfer
510     // version and session ID. Completes the handshake and begins the actual
511     // data transfer.
512     case Chunk::Type::kStartAckConfirmation: {
513       set_transfer_state(TransferState::kWaiting);
514 
515       if (type() == TransferType::kTransmit) {
516         HandleTransmitChunk(chunk);
517       } else {
518         HandleReceiveChunk(chunk);
519       }
520       break;
521     }
522 
523     // If a non-handshake chunk is received during an INITIATING state, the
524     // transfer peer is running a legacy protocol version, which does not
525     // perform a handshake. End the handshake, revert to the legacy protocol,
526     // and process the chunk appropriately.
527     case Chunk::Type::kData:
528     case Chunk::Type::kParametersRetransmit:
529     case Chunk::Type::kParametersContinue:
530 
531       // Update the local session_id, which will map to the transfer_id of the
532       // legacy chunk.
533       session_id_ = chunk.session_id();
534 
535       configured_protocol_version_ = ProtocolVersion::kLegacy;
536       // Cancel if we are not using at least version 2, and we tried to start a
537       // non-zero offset transfer
538       if (chunk.initial_offset() != 0) {
539         PW_LOG_ERROR("Legacy transfer does not support offset transfers!");
540         TerminateTransfer(Status::Internal());
541         break;
542       }
543 
544       set_transfer_state(TransferState::kWaiting);
545 
546       PW_LOG_DEBUG(
547           "Transfer %u tried to start on protocol version %d, but peer only "
548           "supports legacy",
549           id_for_log(),
550           static_cast<int>(desired_protocol_version_));
551 
552       if (type() == TransferType::kTransmit) {
553         HandleTransmitChunk(chunk);
554       } else {
555         HandleReceiveChunk(chunk);
556       }
557       break;
558 
559     case Chunk::Type::kCompletion:
560     case Chunk::Type::kCompletionAck:
561       PW_CRASH(
562           "Transfer completion packets should be processed by "
563           "HandleChunkEvent()");
564       break;
565   }
566 }
567 
UpdateLocalProtocolConfigurationFromPeer(const Chunk & chunk)568 void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
569   PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
570                static_cast<int>(desired_protocol_version_),
571                static_cast<int>(chunk.protocol_version()));
572 
573   configured_protocol_version_ =
574       std::min(desired_protocol_version_, chunk.protocol_version());
575 
576   PW_LOG_INFO("Transfer %u: using protocol version %d",
577               id_for_log(),
578               static_cast<int>(configured_protocol_version_));
579 }
580 
HandleTransmitChunk(const Chunk & chunk)581 void Context::HandleTransmitChunk(const Chunk& chunk) {
582   switch (transfer_state_) {
583     case TransferState::kInactive:
584     case TransferState::kRecovery:
585       PW_CRASH("Never should handle chunk while inactive");
586 
587     case TransferState::kCompleted:
588       // In a legacy transfer, if the transfer has already completed and another
589       // chunk is received, tell the other end that the transfer is over.
590       if (!chunk.IsInitialChunk() && status_.ok()) {
591         status_ = Status::FailedPrecondition();
592       }
593 
594       SendFinalStatusChunk();
595       return;
596 
597     case TransferState::kInitiating:
598       PerformInitialHandshake(chunk);
599       return;
600 
601     case TransferState::kWaiting:
602     case TransferState::kTransmitting:
603       if (chunk.protocol_version() == configured_protocol_version_) {
604         HandleTransferParametersUpdate(chunk);
605       } else {
606         PW_LOG_ERROR(
607             "Transmit transfer %u was configured to use protocol version %d "
608             "but received a chunk with version %d",
609             id_for_log(),
610             static_cast<int>(configured_protocol_version_),
611             static_cast<int>(chunk.protocol_version()));
612         TerminateTransfer(Status::Internal());
613       }
614       return;
615 
616     case TransferState::kTerminating:
617       HandleTerminatingChunk(chunk);
618       return;
619   }
620 }
621 
HandleTransferParametersUpdate(const Chunk & chunk)622 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
623   bool retransmit = chunk.RequestsTransmissionFromOffset();
624 
625   if (retransmit) {
626     // If the offsets don't match, attempt to seek on the reader. Not all
627     // readers support seeking; abort with UNIMPLEMENTED if this handler
628     // doesn't.
629     if (offset_ != chunk.offset()) {
630       if (Status seek_status = SeekReader(chunk.offset()); !seek_status.ok()) {
631         PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
632                     static_cast<unsigned>(session_id_),
633                     static_cast<unsigned>(chunk.offset()),
634                     seek_status.code());
635 
636         // Remap status codes to return one of the following:
637         //
638         //   INTERNAL: invalid seek, never should happen
639         //   DATA_LOSS: the reader is in a bad state
640         //   UNIMPLEMENTED: seeking is not supported
641         //
642         if (seek_status.IsOutOfRange()) {
643           seek_status = Status::Internal();
644         } else if (!seek_status.IsUnimplemented()) {
645           seek_status = Status::DataLoss();
646         }
647 
648         TerminateTransfer(seek_status);
649         return;
650       }
651     }
652 
653     offset_ = chunk.offset();
654   } else if (chunk.window_end_offset() <= offset_) {
655     PW_LOG_DEBUG("Transfer %u ignoring old rolling window chunk", id_for_log());
656     SetTimeout(chunk_timeout_);
657     return;
658   }
659 
660   window_end_offset_ = chunk.window_end_offset();
661 
662   if (chunk.max_chunk_size_bytes().has_value()) {
663     max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes().value(),
664                                      max_parameters_->max_chunk_size_bytes());
665   }
666 
667   if (chunk.min_delay_microseconds().has_value()) {
668     interchunk_delay_ = chrono::SystemClock::for_at_least(
669         std::chrono::microseconds(chunk.min_delay_microseconds().value()));
670   }
671 
672   if (retransmit) {
673     PW_LOG_INFO(
674         "Transfer %u received parameters type=RETRANSMIT offset=%u "
675         "window_end_offset=%u",
676         static_cast<unsigned>(session_id_),
677         static_cast<unsigned>(chunk.offset()),
678         static_cast<unsigned>(window_end_offset_));
679   } else {
680     PW_LOG_EVERY_N_DURATION(
681         PW_LOG_LEVEL_INFO,
682         std::chrono::seconds(3),
683         "Transfer %u received parameters type=CONTINUE offset=%u "
684         "window_end_offset=%u",
685         static_cast<unsigned>(session_id_),
686         static_cast<unsigned>(chunk.offset()),
687         static_cast<unsigned>(window_end_offset_));
688   }
689 
690   // Parsed all of the parameters; start sending the window.
691   set_transfer_state(TransferState::kTransmitting);
692 
693   TransmitNextChunk(retransmit);
694 }
695 
TransmitNextChunk(bool retransmit_requested)696 void Context::TransmitNextChunk(bool retransmit_requested) {
697   Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
698   chunk.set_session_id(session_id_);
699   chunk.set_offset(offset_);
700 
701   // Reserve space for the data proto field overhead and use the remainder of
702   // the buffer for the chunk data.
703   size_t reserved_size =
704       chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
705 
706   size_t total_size = TransferSizeBytes();
707   if (total_size != std::numeric_limits<size_t>::max()) {
708     reserved_size += protobuf::SizeOfVarintField(
709         pwpb::Chunk::Fields::kRemainingBytes, total_size);
710   }
711 
712   ByteSpan buffer = thread_->encode_buffer();
713   Result<ByteSpan> data;
714 
715   if (offset_ < total_size) {
716     // Read the next chunk of data into the encode buffer.
717     ByteSpan data_buffer = buffer.subspan(reserved_size);
718     size_t max_bytes_to_send =
719         std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
720 
721     if (max_bytes_to_send < data_buffer.size()) {
722       data_buffer = data_buffer.first(max_bytes_to_send);
723     }
724 
725     data = reader().Read(data_buffer);
726   } else {
727     // The user-specified resource size has been reached: respect it.
728     data = Status::OutOfRange();
729   }
730 
731   if (data.status().IsOutOfRange()) {
732     // No more data to read.
733     chunk.set_remaining_bytes(0);
734     window_end_offset_ = offset_;
735 
736     PW_LOG_INFO("Transfer %u sending final chunk with remaining_bytes=0",
737                 static_cast<unsigned>(session_id_));
738   } else if (data.ok()) {
739     if (offset_ == window_end_offset_) {
740       if (retransmit_requested) {
741         PW_LOG_ERROR(
742             "Transfer %u: received an empty retransmit request, but there is "
743             "still data to send; aborting with RESOURCE_EXHAUSTED",
744             id_for_log());
745         TerminateTransfer(Status::ResourceExhausted());
746       } else {
747         PW_LOG_DEBUG(
748             "Transfer %u: ignoring continuation packet for transfer window "
749             "that has already been sent",
750             id_for_log());
751         SetTimeout(chunk_timeout_);
752       }
753       return;  // No data was requested, so there is nothing else to do.
754     }
755 
756     PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_DEBUG,
757                             std::chrono::seconds(3),
758                             "Transfer %u sending chunk offset=%u size=%u",
759                             static_cast<unsigned>(session_id_),
760                             static_cast<unsigned>(offset_),
761                             static_cast<unsigned>(data.value().size()));
762 
763     chunk.set_payload(data.value());
764     last_chunk_offset_ = offset_;
765     offset_ += data.value().size();
766 
767     if (total_size != std::numeric_limits<size_t>::max()) {
768       chunk.set_remaining_bytes(total_size - offset_);
769     }
770   } else {
771     PW_LOG_ERROR("Transfer %u Read() failed with status %u",
772                  static_cast<unsigned>(session_id_),
773                  data.status().code());
774     TerminateTransfer(Status::DataLoss());
775     return;
776   }
777 
778   Result<ConstByteSpan> encoded_chunk = chunk.Encode(buffer);
779   if (!encoded_chunk.ok()) {
780     PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
781                  static_cast<unsigned>(session_id_));
782     TerminateTransfer(Status::Internal());
783     return;
784   }
785 
786   if (const Status status = rpc_writer_->Write(*encoded_chunk); !status.ok()) {
787     PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
788                  static_cast<unsigned>(session_id_),
789                  status.code());
790     TerminateTransfer(Status::DataLoss());
791     return;
792   }
793 
794   last_chunk_sent_ = chunk.type();
795   flags_ |= kFlagsDataSent;
796 
797   if (offset_ == window_end_offset_ || offset_ == total_size) {
798     // Sent all requested data. Must now wait for next parameters from the
799     // receiver.
800     set_transfer_state(TransferState::kWaiting);
801     SetTimeout(chunk_timeout_);
802   } else {
803     // More data is to be sent. Set a timeout to send the next chunk following
804     // the chunk delay.
805     SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
806   }
807 }
808 
HandleReceiveChunk(const Chunk & chunk)809 void Context::HandleReceiveChunk(const Chunk& chunk) {
810   if (transfer_state_ == TransferState::kInitiating) {
811     PerformInitialHandshake(chunk);
812     return;
813   }
814 
815   if (transfer_state_ == TransferState::kCompleted) {
816     // If the transfer has already completed and another chunk is received,
817     // re-send the final status chunk.
818     SendFinalStatusChunk();
819     return;
820   }
821 
822   if (chunk.protocol_version() != configured_protocol_version_) {
823     PW_LOG_ERROR(
824         "Receive transfer %u was configured to use protocol version %d "
825         "but received a chunk with version %d",
826         id_for_log(),
827         static_cast<int>(configured_protocol_version_),
828         static_cast<int>(chunk.protocol_version()));
829     TerminateTransfer(Status::Internal());
830     return;
831   }
832 
833   switch (transfer_state_) {
834     case TransferState::kInactive:
835     case TransferState::kTransmitting:
836     case TransferState::kInitiating:
837       PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
838                static_cast<int>(transfer_state_));
839 
840     case TransferState::kCompleted:
841       // Handled earlier.
842       PW_UNREACHABLE;
843 
844     case TransferState::kRecovery:
845       if (chunk.offset() != offset_) {
846         if (last_chunk_offset_ == chunk.offset()) {
847           PW_LOG_DEBUG(
848               "Transfer %u received repeated offset %u; retry detected, "
849               "resending transfer parameters",
850               static_cast<unsigned>(session_id_),
851               static_cast<unsigned>(chunk.offset()));
852 
853           log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
854           log_rate_limit_ = kNoRateLimit;
855 
856           UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
857           if (DataTransferComplete()) {
858             return;
859           }
860           PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
861                        static_cast<unsigned>(session_id_),
862                        static_cast<unsigned>(offset_),
863                        static_cast<unsigned>(chunk.offset()));
864         }
865 
866         last_chunk_offset_ = chunk.offset();
867         SetTimeout(chunk_timeout_);
868         return;
869       }
870 
871       PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
872                    static_cast<unsigned>(session_id_),
873                    static_cast<unsigned>(offset_));
874       set_transfer_state(TransferState::kWaiting);
875 
876       // The correct chunk was received; process it normally.
877       [[fallthrough]];
878     case TransferState::kWaiting:
879       HandleReceivedData(chunk);
880       return;
881 
882     case TransferState::kTerminating:
883       HandleTerminatingChunk(chunk);
884       return;
885   }
886 }
887 
HandleReceivedData(const Chunk & chunk)888 void Context::HandleReceivedData(const Chunk& chunk) {
889   if (chunk.offset() != offset_) {
890     if (chunk.offset() + chunk.payload().size() <= offset_ &&
891         chunk.type() != Chunk::Type::kStartAckConfirmation) {
892       // If the chunk's data has already been received, don't go through a full
893       // recovery cycle to avoid shrinking the window size and potentially
894       // thrashing. The expected data may already be in-flight, so just allow
895       // the transmitter to keep going with a CONTINUE parameters chunk.
896       //
897       // However, as a retried chunk indicates a potential issue with the
898       // underlying connection, shrink the transfer window.
899       //
900       // Start ack confs do not come with an offset set, so it can get stuck
901       // here if we are doing an offset transfer.
902       PW_LOG_DEBUG("Transfer %u received duplicate chunk with offset %u",
903                    id_for_log(),
904                    static_cast<unsigned>(chunk.offset()));
905       UpdateTransferParameters(TransmitAction::kRetransmit);
906       SendTransferParameters(TransmitAction::kExtend);
907     } else {
908       // Bad offset; reset window size to send another parameters chunk.
909       PW_LOG_WARN(
910           "Transfer %u expected offset %u, received %u; entering recovery "
911           "state",
912           static_cast<unsigned>(session_id_),
913           static_cast<unsigned>(offset_),
914           static_cast<unsigned>(chunk.offset()));
915 
916       set_transfer_state(TransferState::kRecovery);
917       UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
918     }
919 
920     SetTimeout(chunk_timeout_);
921     return;
922   }
923 
924   if (chunk.offset() + chunk.payload().size() > window_end_offset_) {
925     PW_LOG_WARN(
926         "Transfer %u received more data than what was requested (%u received "
927         "for %u pending); attempting to recover.",
928         id_for_log(),
929         static_cast<unsigned>(chunk.payload().size()),
930         static_cast<unsigned>(window_end_offset_ - offset_));
931 
932     // To prevent an improperly implemented client which doesn't respect
933     // window_end_offset from entering an infinite retry loop, limit recovery
934     // attempts to the lifetime retry count.
935     lifetime_retries_++;
936     if (lifetime_retries_ <= max_lifetime_retries_) {
937       set_transfer_state(TransferState::kRecovery);
938       SetTimeout(chunk_timeout_);
939 
940       UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
941     } else {
942       TerminateTransfer(Status::Internal());
943     }
944     return;
945   }
946 
947   // Update the last offset seen so that retries can be detected.
948   last_chunk_offset_ = chunk.offset();
949 
950   // Write staged data from the buffer to the stream.
951   if (chunk.has_payload()) {
952     if (Status status = writer().Write(chunk.payload()); !status.ok()) {
953       PW_LOG_ERROR(
954           "Transfer %u write of %u B chunk failed with status %u; aborting "
955           "with DATA_LOSS",
956           static_cast<unsigned>(session_id_),
957           static_cast<unsigned>(chunk.payload().size()),
958           status.code());
959       TerminateTransfer(Status::DataLoss());
960       return;
961     }
962 
963     transfer_rate_.Update(chunk.payload().size());
964   }
965 
966   // Update the transfer state.
967   offset_ += chunk.payload().size();
968 
969   // When the client sets remaining_bytes to 0, it indicates completion of the
970   // transfer. Acknowledge the completion through a status chunk and clean up.
971   if (chunk.IsFinalTransmitChunk()) {
972     TerminateTransfer(OkStatus());
973     return;
974   }
975 
976   if (chunk.window_end_offset() != 0) {
977     if (chunk.window_end_offset() < offset_) {
978       PW_LOG_ERROR(
979           "Transfer %u got invalid end offset of %u (current offset %u)",
980           id_for_log(),
981           static_cast<unsigned>(chunk.window_end_offset()),
982           static_cast<unsigned>(offset_));
983       TerminateTransfer(Status::Internal());
984       return;
985     }
986 
987     if (chunk.window_end_offset() > window_end_offset_) {
988       // A transmitter should never send a larger end offset than what the
989       // receiver has advertised. If this occurs, there is a bug in the
990       // transmitter implementation. Terminate the transfer.
991       PW_LOG_ERROR(
992           "Transfer %u transmitter sent invalid end offset of %u, "
993           "greater than receiver offset %u",
994           id_for_log(),
995           static_cast<unsigned>(chunk.window_end_offset()),
996           static_cast<unsigned>(window_end_offset_));
997       TerminateTransfer(Status::Internal());
998       return;
999     }
1000 
1001     window_end_offset_ = chunk.window_end_offset();
1002   }
1003 
1004   SetTimeout(chunk_timeout_);
1005 
1006   if (chunk.type() == Chunk::Type::kStartAckConfirmation) {
1007     // Send the first parameters in the receive transfer.
1008     UpdateAndSendTransferParameters(TransmitAction::kFirstParameters);
1009     return;
1010   }
1011 
1012   if (offset_ == window_end_offset_) {
1013     // Received all pending data. Advance the transfer parameters.
1014     UpdateAndSendTransferParameters(TransmitAction::kExtend);
1015     return;
1016   }
1017 
1018   // Once the transmitter has sent a sufficient amount of data, try to extend
1019   // the window to allow it to continue sending data without blocking.
1020   uint32_t remaining_window_size = window_end_offset_ - offset_;
1021   bool extend_window = remaining_window_size <=
1022                        window_size_ / max_parameters_->extend_window_divisor();
1023 
1024   if (extend_window) {
1025     UpdateAndSendTransferParameters(TransmitAction::kExtend);
1026   }
1027 }
1028 
HandleTerminatingChunk(const Chunk & chunk)1029 void Context::HandleTerminatingChunk(const Chunk& chunk) {
1030   switch (chunk.type()) {
1031     case Chunk::Type::kCompletion:
1032       PW_CRASH("Completion chunks should be processed by HandleChunkEvent()");
1033 
1034     case Chunk::Type::kCompletionAck:
1035       PW_LOG_INFO(
1036           "Transfer %u completed with status %u", id_for_log(), status_.code());
1037       set_transfer_state(TransferState::kInactive);
1038       break;
1039 
1040     case Chunk::Type::kData:
1041     case Chunk::Type::kStart:
1042     case Chunk::Type::kParametersRetransmit:
1043     case Chunk::Type::kParametersContinue:
1044     case Chunk::Type::kStartAck:
1045     case Chunk::Type::kStartAckConfirmation:
1046       // If a non-completion chunk is received in a TERMINATING state, re-send
1047       // the transfer's completion chunk to the peer.
1048       EncodeAndSendChunk(
1049           Chunk::Final(configured_protocol_version_, session_id_, status_));
1050       break;
1051   }
1052 }
1053 
TerminateTransfer(Status status,bool with_resource_id)1054 void Context::TerminateTransfer(Status status, bool with_resource_id) {
1055   if (transfer_state_ == TransferState::kTerminating ||
1056       transfer_state_ == TransferState::kCompleted) {
1057     // Transfer has already been terminated; no need to do it again.
1058     return;
1059   }
1060 
1061   Finish(status);
1062 
1063   PW_LOG_INFO("Transfer %u terminating with status: %u, offset: %u",
1064               static_cast<unsigned>(session_id_),
1065               status.code(),
1066               static_cast<unsigned>(offset_));
1067 
1068   if (ShouldSkipCompletionHandshake()) {
1069     set_transfer_state(TransferState::kCompleted);
1070   } else {
1071     set_transfer_state(TransferState::kTerminating);
1072     SetTimeout(chunk_timeout_);
1073   }
1074 
1075   // Don't send a final chunk if the other end of the transfer has not yet
1076   // made contact, as there is no one to notify.
1077   if ((flags_ & kFlagsContactMade) == kFlagsContactMade) {
1078     SendFinalStatusChunk(with_resource_id);
1079   }
1080 }
1081 
HandleTermination(Status status)1082 void Context::HandleTermination(Status status) {
1083   Finish(status);
1084 
1085   PW_LOG_INFO("Transfer %u completed with status %u",
1086               static_cast<unsigned>(session_id_),
1087               status.code());
1088 
1089   if (ShouldSkipCompletionHandshake()) {
1090     set_transfer_state(TransferState::kCompleted);
1091   } else {
1092     EncodeAndSendChunk(
1093         Chunk(configured_protocol_version_, Chunk::Type::kCompletionAck)
1094             .set_session_id(session_id_));
1095 
1096     set_transfer_state(TransferState::kInactive);
1097   }
1098 }
1099 
SendFinalStatusChunk(bool with_resource_id)1100 void Context::SendFinalStatusChunk(bool with_resource_id) {
1101   PW_DCHECK(transfer_state_ == TransferState::kCompleted ||
1102             transfer_state_ == TransferState::kTerminating);
1103 
1104   if (configured_protocol_version_ == ProtocolVersion::kUnknown) {
1105     // If the transfer is ended before contact is made with the peer,
1106     // the protocol version may not yet be configured. Use the desired
1107     // version for the status chunk.
1108     configured_protocol_version_ = desired_protocol_version_;
1109     PW_LOG_WARN(
1110         "Transfer %u ending before protocol version was confirmed; using "
1111         "version %u",
1112         id_for_log(),
1113         static_cast<unsigned>(desired_protocol_version_));
1114   }
1115 
1116   PW_LOG_INFO("Sending final chunk for transfer %u with status %u",
1117               static_cast<unsigned>(session_id_),
1118               status_.code());
1119 
1120   Chunk chunk =
1121       Chunk::Final(configured_protocol_version_, session_id_, status_);
1122   if (with_resource_id) {
1123     chunk.set_resource_id(resource_id_);
1124   }
1125   EncodeAndSendChunk(chunk);
1126 }
1127 
Finish(Status status)1128 void Context::Finish(Status status) {
1129   PW_DCHECK(active());
1130 
1131   status.Update(FinalCleanup(status));
1132   status_ = status;
1133 
1134   SetTimeout(kFinalChunkAckTimeout);
1135 }
1136 
SetTimeout(chrono::SystemClock::duration timeout)1137 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
1138   next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
1139 }
1140 
HandleTimeout()1141 void Context::HandleTimeout() {
1142   ClearTimeout();
1143 
1144   switch (transfer_state_) {
1145     case TransferState::kCompleted:
1146       // A timeout occurring in a completed state indicates that the other side
1147       // never ACKed the final status packet. Reset the context to inactive.
1148       set_transfer_state(TransferState::kInactive);
1149       return;
1150 
1151     case TransferState::kTransmitting:
1152       // A timeout occurring in a TRANSMITTING state indicates that the transfer
1153       // has waited for its inter-chunk delay and should transmit its next
1154       // chunk.
1155       TransmitNextChunk(/*retransmit_requested=*/false);
1156       break;
1157 
1158     case TransferState::kInitiating:
1159     case TransferState::kWaiting:
1160     case TransferState::kRecovery:
1161     case TransferState::kTerminating:
1162       // A timeout occurring in a transfer or handshake state indicates that no
1163       // chunk has been received from the other side. The transfer should retry
1164       // its previous operation.
1165       //
1166       // The timeout is set immediately. Retry() will clear it if it fails.
1167       if (transfer_state_ == TransferState::kInitiating &&
1168           last_chunk_sent_ == Chunk::Type::kStart) {
1169         SetTimeout(initial_chunk_timeout_);
1170       } else {
1171         SetTimeout(chunk_timeout_);
1172       }
1173       Retry();
1174       break;
1175 
1176     case TransferState::kInactive:
1177       PW_LOG_ERROR("Timeout occurred in INACTIVE state");
1178       return;
1179   }
1180 }
1181 
Retry()1182 void Context::Retry() {
1183   if (retries_ == max_retries_ || lifetime_retries_ == max_lifetime_retries_) {
1184     PW_LOG_ERROR(
1185         "Transfer %u failed to receive a chunk after %u retries (lifetime %u).",
1186         id_for_log(),
1187         static_cast<unsigned>(retries_),
1188         static_cast<unsigned>(lifetime_retries_));
1189     PW_LOG_ERROR("Canceling transfer.");
1190 
1191     if (transfer_state_ == TransferState::kTerminating) {
1192       // Timeouts occurring in a TERMINATING state indicate that the completion
1193       // chunk was never ACKed. Simply clean up the transfer context.
1194       set_transfer_state(TransferState::kInactive);
1195     } else {
1196       TerminateTransfer(Status::DeadlineExceeded());
1197     }
1198     return;
1199   }
1200 
1201   ++retries_;
1202   ++lifetime_retries_;
1203 
1204   if (transfer_state_ == TransferState::kInitiating ||
1205       last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
1206     RetryHandshake();
1207     return;
1208   }
1209 
1210   if (transfer_state_ == TransferState::kTerminating) {
1211     EncodeAndSendChunk(
1212         Chunk::Final(configured_protocol_version_, session_id_, status_));
1213     return;
1214   }
1215 
1216   if (type() == TransferType::kReceive) {
1217     // Resend the most recent transfer parameters.
1218     PW_LOG_DEBUG(
1219         "Receive transfer %u timed out waiting for chunk; resending parameters",
1220         static_cast<unsigned>(session_id_));
1221 
1222     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
1223     return;
1224   }
1225 
1226   // In a transmit, if a data chunk has not yet been sent, the initial transfer
1227   // parameters did not arrive from the receiver. Resend the initial chunk.
1228   if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
1229     PW_LOG_DEBUG(
1230         "Transmit transfer %u timed out waiting for initial parameters",
1231         static_cast<unsigned>(session_id_));
1232     SendInitialLegacyTransmitChunk();
1233     return;
1234   }
1235 
1236   // Otherwise, resend the most recent chunk. If the reader doesn't support
1237   // seeking, this isn't possible, so just terminate the transfer immediately.
1238   if (!SeekReader(last_chunk_offset_).ok()) {
1239     PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
1240                  id_for_log());
1241     PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
1242     TerminateTransfer(Status::DeadlineExceeded());
1243     return;
1244   }
1245 
1246   // Rewind the transfer position and resend the chunk.
1247   offset_ = last_chunk_offset_;
1248 
1249   TransmitNextChunk(/*retransmit_requested=*/false);
1250 }
1251 
RetryHandshake()1252 void Context::RetryHandshake() {
1253   Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
1254 
1255   switch (last_chunk_sent_) {
1256     case Chunk::Type::kStart:
1257       // No protocol version is yet configured at the time of sending the start
1258       // chunk, so we use the client's desired version instead.
1259       retry_chunk.set_protocol_version(desired_protocol_version_)
1260           .set_desired_session_id(session_id_)
1261           .set_resource_id(resource_id_)
1262           .set_initial_offset(offset_);
1263       if (type() == TransferType::kReceive) {
1264         SetTransferParameters(retry_chunk);
1265       }
1266       break;
1267 
1268     case Chunk::Type::kStartAck:
1269       retry_chunk.set_session_id(session_id_)
1270           .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
1271       break;
1272 
1273     case Chunk::Type::kStartAckConfirmation:
1274       retry_chunk.set_session_id(session_id_);
1275       if (type() == TransferType::kReceive) {
1276         SetTransferParameters(retry_chunk);
1277       }
1278       break;
1279 
1280     case Chunk::Type::kData:
1281     case Chunk::Type::kParametersRetransmit:
1282     case Chunk::Type::kParametersContinue:
1283     case Chunk::Type::kCompletion:
1284     case Chunk::Type::kCompletionAck:
1285       PW_CRASH("Should not RetryHandshake() when not in handshake phase");
1286   }
1287 
1288   EncodeAndSendChunk(retry_chunk);
1289 }
1290 
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const1291 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
1292                                     uint32_t channel_id) const {
1293   // Start with the user-provided maximum chunk size, which should be the usable
1294   // payload length on the RPC ingress path after any transport overhead.
1295   ptrdiff_t max_size = max_chunk_size_bytes;
1296 
1297   // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
1298   //
1299   //   type:       1 byte key, 1 byte value (CLIENT_STREAM)
1300   //   channel_id: 1 byte key, varint value (calculate from stream)
1301   //   service_id: 1 byte key, 4 byte value
1302   //   method_id:  1 byte key, 4 byte value
1303   //   payload:    1 byte key, varint length (remaining space)
1304   //   status:     0 bytes (not set in stream packets)
1305   //
1306   //   TOTAL: 14 bytes + encoded channel_id size + encoded payload length
1307   //
1308   max_size -= 14;
1309   max_size -= varint::EncodedSize(channel_id);
1310   max_size -= varint::EncodedSize(max_size);
1311 
1312   // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
1313   // overhead calculation will be moved into an RPC helper to avoid having
1314   // pw_transfer depend on RPC internals.
1315   max_size -= 5;
1316 
1317   // Subtract the transfer service overhead for a client write chunk
1318   // (pw_transfer/transfer.proto).
1319   //
1320   //   session_id: 1 byte key, varint value (calculate)
1321   //   offset:     1 byte key, varint value (calculate)
1322   //   data:       1 byte key, varint length (remaining space)
1323   //
1324   //   TOTAL: 3 + encoded session_id + encoded offset + encoded data length
1325   //
1326   // Use a lower bound of a single chunk for the window end offset, as it will
1327   // always be at least in that range.
1328   size_t window_end_offset = std::max(window_end_offset_, max_chunk_size_bytes);
1329   max_size -= 3;
1330   max_size -= varint::EncodedSize(session_id_);
1331   max_size -= varint::EncodedSize(window_end_offset);
1332   max_size -= varint::EncodedSize(max_size);
1333 
1334   // A resulting value of zero (or less) renders write transfers unusable, as
1335   // there is no space to send any payload. This should be considered a
1336   // programmer error in the transfer service setup.
1337   PW_CHECK_INT_GT(
1338       max_size,
1339       0,
1340       "Transfer service maximum chunk size is too small to fit a payload. "
1341       "Increase max_chunk_size_bytes to support write transfers.");
1342 
1343   return max_size;
1344 }
1345 
LogTransferConfiguration()1346 void Context::LogTransferConfiguration() {
1347   PW_LOG_DEBUG(
1348       "Local transfer timing configuration: "
1349       "chunk_timeout=%ums, max_retries=%u, interchunk_delay=%uus",
1350       static_cast<unsigned>(
1351           std::chrono::ceil<std::chrono::milliseconds>(chunk_timeout_).count()),
1352       static_cast<unsigned>(max_retries_),
1353       static_cast<unsigned>(
1354           std::chrono::ceil<std::chrono::microseconds>(interchunk_delay_)
1355               .count()));
1356 
1357   PW_LOG_DEBUG(
1358       "Local transfer windowing configuration: max_window_size_bytes=%u, "
1359       "extend_window_divisor=%u, max_chunk_size_bytes=%u",
1360       static_cast<unsigned>(max_parameters_->max_window_size_bytes()),
1361       static_cast<unsigned>(max_parameters_->extend_window_divisor()),
1362       static_cast<unsigned>(max_parameters_->max_chunk_size_bytes()));
1363 }
1364 
1365 }  // namespace pw::transfer::internal
1366