• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17 
18 #include "pw_transfer/internal/context.h"
19 
20 #include <chrono>
21 #include <limits>
22 
23 #include "pw_assert/check.h"
24 #include "pw_chrono/system_clock.h"
25 #include "pw_log/log.h"
26 #include "pw_log/rate_limited.h"
27 #include "pw_protobuf/serialized_size.h"
28 #include "pw_transfer/internal/config.h"
29 #include "pw_transfer/transfer.pwpb.h"
30 #include "pw_transfer/transfer_thread.h"
31 #include "pw_varint/varint.h"
32 
33 namespace pw::transfer::internal {
34 
HandleEvent(const Event & event)35 void Context::HandleEvent(const Event& event) {
36   switch (event.type) {
37     case EventType::kNewClientTransfer:
38     case EventType::kNewServerTransfer: {
39       if (active()) {
40         if (event.type == EventType::kNewServerTransfer &&
41             event.new_transfer.session_id == session_id_ &&
42             last_chunk_sent_ == Chunk::Type::kStartAck) {
43           // The client is retrying its initial chunk as the response may not
44           // have made it back. Re-send the handshake response without going
45           // through handler reinitialization.
46           RetryHandshake();
47           return;
48         }
49         Abort(Status::Aborted());
50       }
51 
52       Initialize(event.new_transfer);
53 
54       if (event.type == EventType::kNewClientTransfer) {
55         InitiateTransferAsClient();
56       } else {
57         if (StartTransferAsServer(event.new_transfer)) {
58           // TODO(frolv): This should probably be restructured.
59           HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
60                             .match_resource_id = false,  // Unused.
61                             .data = event.new_transfer.raw_chunk_data,
62                             .size = event.new_transfer.raw_chunk_size});
63         }
64       }
65       return;
66     }
67 
68     case EventType::kClientChunk:
69     case EventType::kServerChunk:
70       PW_CHECK(initialized());
71       HandleChunkEvent(event.chunk);
72       return;
73 
74     case EventType::kClientTimeout:
75     case EventType::kServerTimeout:
76       HandleTimeout();
77       return;
78 
79     case EventType::kClientEndTransfer:
80     case EventType::kServerEndTransfer:
81       if (active()) {
82         if (event.end_transfer.send_status_chunk) {
83           TerminateTransfer(event.end_transfer.status);
84         } else {
85           Abort(event.end_transfer.status);
86         }
87       }
88       return;
89 
90     case EventType::kSendStatusChunk:
91     case EventType::kAddTransferHandler:
92     case EventType::kRemoveTransferHandler:
93     case EventType::kSetStream:
94     case EventType::kTerminate:
95     case EventType::kUpdateClientTransfer:
96     case EventType::kGetResourceStatus:
97       // These events are intended for the transfer thread and should never be
98       // forwarded through to a context.
99       PW_CRASH("Transfer context received a transfer thread event");
100   }
101 }
102 
InitiateTransferAsClient()103 void Context::InitiateTransferAsClient() {
104   PW_DCHECK(active());
105 
106   SetTimeout(initial_chunk_timeout_);
107 
108   PW_LOG_INFO("Starting transfer for resource %u",
109               static_cast<unsigned>(resource_id_));
110 
111   // Receive transfers should prepare their initial parameters to be send in the
112   // initial chunk.
113   if (type() == TransferType::kReceive) {
114     UpdateTransferParameters(TransmitAction::kBegin);
115   }
116 
117   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
118     // Legacy transfers go straight into the data transfer phase without a
119     // handshake.
120     if (type() == TransferType::kReceive) {
121       SendTransferParameters(TransmitAction::kBegin);
122     } else {
123       SendInitialLegacyTransmitChunk();
124     }
125 
126     LogTransferConfiguration();
127     return;
128   }
129 
130   // In newer protocol versions, begin the initial transfer handshake.
131   Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
132   start_chunk.set_desired_session_id(session_id_);
133   start_chunk.set_resource_id(resource_id_);
134   start_chunk.set_initial_offset(offset_);
135 
136   if (type() == TransferType::kReceive) {
137     // Parameters should still be set on the initial chunk for backwards
138     // compatibility if the server only supports the legacy protocol.
139     SetTransferParameters(start_chunk);
140   }
141 
142   EncodeAndSendChunk(start_chunk);
143 }
144 
StartTransferAsServer(const NewTransferEvent & new_transfer)145 bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
146   PW_LOG_INFO("Starting %s transfer %u for resource %u with offset %u",
147               new_transfer.type == TransferType::kTransmit ? "read" : "write",
148               static_cast<unsigned>(new_transfer.session_id),
149               static_cast<unsigned>(new_transfer.resource_id),
150               static_cast<unsigned>(new_transfer.initial_offset));
151   LogTransferConfiguration();
152 
153   flags_ |= kFlagsContactMade;
154 
155   if (Status status = new_transfer.handler->Prepare(
156           new_transfer.type, new_transfer.initial_offset);
157       !status.ok()) {
158     PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
159                 static_cast<unsigned>(new_transfer.handler->id()),
160                 status.code());
161 
162     // As this failure occurs at the start of a transfer, no protocol version is
163     // yet negotiated and one must be set to send a response. It is okay to use
164     // the desired version here, as that comes from the client.
165     configured_protocol_version_ = desired_protocol_version_;
166 
167     status = (status.IsPermissionDenied() || status.IsUnimplemented() ||
168               status.IsResourceExhausted())
169                  ? status
170                  : Status::DataLoss();
171     TerminateTransfer(status, /*with_resource_id=*/true);
172     return false;
173   }
174 
175   // Initialize doesn't set the handler since it's specific to server transfers.
176   static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
177 
178   // Server transfers use the stream provided by the handler rather than the
179   // stream included in the NewTransferEvent.
180   stream_ = &new_transfer.handler->stream();
181 
182   return true;
183 }
184 
SendInitialLegacyTransmitChunk()185 void Context::SendInitialLegacyTransmitChunk() {
186   // A transmitter begins a transfer by sending the ID of the resource to which
187   // it wishes to write.
188   Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
189   chunk.set_session_id(resource_id_);
190 
191   EncodeAndSendChunk(chunk);
192 }
193 
UpdateTransferParameters(TransmitAction action)194 void Context::UpdateTransferParameters(TransmitAction action) {
195   max_chunk_size_bytes_ = MaxWriteChunkSize(
196       max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
197   uint32_t window_size = 0;
198 
199   if (max_chunk_size_bytes_ > max_parameters_->max_window_size_bytes()) {
200     window_size =
201         std::min(max_parameters_->max_window_size_bytes(),
202                  static_cast<uint32_t>(writer().ConservativeWriteLimit()));
203   } else {
204     // Adjust the window size based on the latest event in the transfer.
205     switch (action) {
206       case TransmitAction::kBegin:
207       case TransmitAction::kFirstParameters:
208         // A transfer always begins with a window size of one chunk, set during
209         // initialization. No further handling is required.
210         break;
211 
212       case TransmitAction::kExtend:
213         // Window was received successfully without packet loss and should grow.
214         // Double the window size during slow start, or increase it by a single
215         // chunk in congestion avoidance.
216         if (transmit_phase_ == TransmitPhase::kCongestionAvoidance) {
217           window_size_multiplier_ += 1;
218         } else {
219           window_size_multiplier_ *= 2;
220         }
221 
222         // The window size can never exceed the user-specified maximum bytes. If
223         // it does, reduce the multiplier to the largest size that fits.
224         if (window_size_multiplier_ * max_chunk_size_bytes_ >
225             max_parameters_->max_window_size_bytes()) {
226           window_size_multiplier_ =
227               max_parameters_->max_window_size_bytes() / max_chunk_size_bytes_;
228         }
229         break;
230 
231       case TransmitAction::kRetransmit:
232         // A packet was lost: shrink the window size. Additionally, after the
233         // first packet loss, transition from the slow start to the congestion
234         // avoidance phase of the transfer.
235         if (transmit_phase_ == TransmitPhase::kSlowStart) {
236           transmit_phase_ = TransmitPhase::kCongestionAvoidance;
237         }
238         window_size_multiplier_ =
239             std::max(window_size_multiplier_ / static_cast<uint32_t>(2),
240                      static_cast<uint32_t>(1));
241         break;
242     }
243 
244     window_size =
245         std::min({window_size_multiplier_ * max_chunk_size_bytes_,
246                   max_parameters_->max_window_size_bytes(),
247                   static_cast<uint32_t>(writer().ConservativeWriteLimit())});
248   }
249 
250   window_size_ = window_size;
251   window_end_offset_ = offset_ + window_size;
252 }
253 
SetTransferParameters(Chunk & parameters)254 void Context::SetTransferParameters(Chunk& parameters) {
255   parameters.set_window_end_offset(window_end_offset_)
256       .set_max_chunk_size_bytes(max_chunk_size_bytes_)
257       .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
258       .set_offset(offset_);
259 }
260 
UpdateAndSendTransferParameters(TransmitAction action)261 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
262   UpdateTransferParameters(action);
263 
264   return SendTransferParameters(action);
265 }
266 
SendTransferParameters(TransmitAction action)267 void Context::SendTransferParameters(TransmitAction action) {
268   Chunk::Type type = Chunk::Type::kParametersRetransmit;
269 
270   switch (action) {
271     case TransmitAction::kBegin:
272       type = Chunk::Type::kStart;
273       break;
274     case TransmitAction::kFirstParameters:
275     case TransmitAction::kRetransmit:
276       type = Chunk::Type::kParametersRetransmit;
277       break;
278     case TransmitAction::kExtend:
279       type = Chunk::Type::kParametersContinue;
280       break;
281   }
282 
283   Chunk parameters(configured_protocol_version_, type);
284   parameters.set_session_id(session_id_);
285   SetTransferParameters(parameters);
286 
287   PW_LOG_EVERY_N_DURATION(
288       PW_LOG_LEVEL_INFO,
289       log_rate_limit_,
290       "Transfer rate: %u B/s",
291       static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
292 
293   PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_INFO,
294                           log_rate_limit_,
295                           "Transfer %u sending transfer parameters: "
296                           "offset=%u, window_end_offset=%u, max_chunk_size=%u",
297                           static_cast<unsigned>(session_id_),
298                           static_cast<unsigned>(offset_),
299                           static_cast<unsigned>(window_end_offset_),
300                           static_cast<unsigned>(max_chunk_size_bytes_));
301 
302   if (log_chunks_before_rate_limit_ > 0) {
303     log_chunks_before_rate_limit_--;
304 
305     if (log_chunks_before_rate_limit_ == 0) {
306       log_rate_limit_ = log_rate_limit_cfg_;
307     }
308   }
309 
310   EncodeAndSendChunk(parameters);
311 }
312 
EncodeAndSendChunk(const Chunk & chunk)313 void Context::EncodeAndSendChunk(const Chunk& chunk) {
314   last_chunk_sent_ = chunk.type();
315 
316 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
317   if ((chunk.remaining_bytes().has_value() &&
318        chunk.remaining_bytes().value() == 0) ||
319       (chunk.type() != Chunk::Type::kData &&
320        chunk.type() != Chunk::Type::kParametersContinue)) {
321     chunk.LogChunk(false, pw::chrono::SystemClock::duration::zero());
322   }
323 #endif
324 
325 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
326   if (chunk.type() == Chunk::Type::kData ||
327       chunk.type() == Chunk::Type::kParametersContinue) {
328     chunk.LogChunk(false, log_rate_limit_);
329   }
330 #endif
331 
332   Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
333   if (!data.ok()) {
334     PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
335                  static_cast<unsigned>(chunk.session_id()),
336                  data.status().code());
337     if (active()) {
338       TerminateTransfer(Status::Internal());
339     }
340     return;
341   }
342 
343   if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
344     PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
345                  static_cast<unsigned>(chunk.session_id()),
346                  status.code());
347     if (active()) {
348       TerminateTransfer(Status::Internal());
349     }
350     return;
351   }
352 }
353 
Initialize(const NewTransferEvent & new_transfer)354 void Context::Initialize(const NewTransferEvent& new_transfer) {
355   PW_DCHECK(!active());
356 
357   PW_DCHECK_INT_NE(new_transfer.protocol_version,
358                    ProtocolVersion::kUnknown,
359                    "Cannot start a transfer with an unknown protocol");
360 
361   session_id_ = new_transfer.session_id;
362   resource_id_ = new_transfer.resource_id;
363   desired_protocol_version_ = new_transfer.protocol_version;
364   configured_protocol_version_ = ProtocolVersion::kUnknown;
365 
366   flags_ = static_cast<uint8_t>(new_transfer.type);
367   transfer_state_ = TransferState::kWaiting;
368   retries_ = 0;
369   max_retries_ = new_transfer.max_retries;
370   lifetime_retries_ = 0;
371   max_lifetime_retries_ = new_transfer.max_lifetime_retries;
372 
373   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
374     // In a legacy transfer, there is no protocol negotiation stage.
375     // Automatically configure the context to run the legacy protocol and
376     // proceed to waiting for a chunk.
377     configured_protocol_version_ = ProtocolVersion::kLegacy;
378     transfer_state_ = TransferState::kWaiting;
379   } else {
380     transfer_state_ = TransferState::kInitiating;
381   }
382 
383   rpc_writer_ = new_transfer.rpc_writer;
384   stream_ = new_transfer.stream;
385 
386   offset_ = new_transfer.initial_offset;
387   initial_offset_ = new_transfer.initial_offset;
388   window_size_ = 0;
389   window_end_offset_ = 0;
390   max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
391 
392   window_size_multiplier_ = 1;
393   transmit_phase_ = TransmitPhase::kSlowStart;
394 
395   max_parameters_ = new_transfer.max_parameters;
396   thread_ = new_transfer.transfer_thread;
397 
398   last_chunk_sent_ = Chunk::Type::kStart;
399   last_chunk_offset_ = 0;
400   chunk_timeout_ = new_transfer.timeout;
401   initial_chunk_timeout_ = new_transfer.initial_timeout;
402   interchunk_delay_ = chrono::SystemClock::for_at_least(
403       std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
404   next_timeout_ = kNoTimeout;
405   log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
406 
407   transfer_rate_.Reset();
408 }
409 
HandleChunkEvent(const ChunkEvent & event)410 void Context::HandleChunkEvent(const ChunkEvent& event) {
411   Result<Chunk> maybe_chunk =
412       Chunk::Parse(ConstByteSpan(event.data, event.size));
413   if (!maybe_chunk.ok()) {
414     return;
415   }
416 
417   Chunk chunk = *maybe_chunk;
418 
419   // Received some data. Reset the retry counter.
420   retries_ = 0;
421   flags_ |= kFlagsContactMade;
422 
423 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
424   if (chunk.type() != Chunk::Type::kData &&
425       chunk.type() != Chunk::Type::kParametersContinue) {
426     chunk.LogChunk(true, pw::chrono::SystemClock::duration::zero());
427   }
428 #endif
429 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
430   if (chunk.type() == Chunk::Type::kData ||
431       chunk.type() == Chunk::Type::kParametersContinue) {
432     chunk.LogChunk(true, log_rate_limit_);
433   }
434 #endif
435 
436   if (chunk.IsTerminatingChunk()) {
437     if (active()) {
438       HandleTermination(chunk.status().value());
439     } else {
440       PW_LOG_INFO("Got final status %d for completed transfer %d",
441                   static_cast<int>(chunk.status().value().code()),
442                   static_cast<int>(session_id_));
443     }
444     return;
445   }
446 
447   if (type() == TransferType::kTransmit) {
448     HandleTransmitChunk(chunk);
449   } else {
450     HandleReceiveChunk(chunk);
451   }
452 }
453 
PerformInitialHandshake(const Chunk & chunk)454 void Context::PerformInitialHandshake(const Chunk& chunk) {
455   switch (chunk.type()) {
456     // Initial packet sent from a client to a server.
457     case Chunk::Type::kStart: {
458       UpdateLocalProtocolConfigurationFromPeer(chunk);
459 
460       if (type() == TransferType::kReceive) {
461         // Update window end offset so it is valid.
462         window_end_offset_ = offset_;
463       }
464 
465       // This cast is safe as we know we're running in a transfer server.
466       uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
467 
468       Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
469       start_ack.set_session_id(session_id_);
470       start_ack.set_resource_id(resource_id);
471       start_ack.set_initial_offset(offset_);
472 
473       EncodeAndSendChunk(start_ack);
474       break;
475     }
476 
477     // Response packet sent from a server to a client, confirming the protocol
478     // version and session_id of the transfer.
479     case Chunk::Type::kStartAck: {
480       UpdateLocalProtocolConfigurationFromPeer(chunk);
481 
482       // This should confirm the offset we're starting at
483       if (offset_ != chunk.initial_offset()) {
484         TerminateTransfer(Status::Unimplemented());
485         break;
486       }
487 
488       Chunk start_ack_confirmation(configured_protocol_version_,
489                                    Chunk::Type::kStartAckConfirmation);
490       start_ack_confirmation.set_session_id(session_id_);
491 
492       if (type() == TransferType::kReceive) {
493         // In a receive transfer, tag the initial transfer parameters onto the
494         // confirmation chunk so that the server can immediately begin sending
495         // data.
496         UpdateTransferParameters(TransmitAction::kFirstParameters);
497         SetTransferParameters(start_ack_confirmation);
498       }
499 
500       set_transfer_state(TransferState::kWaiting);
501       EncodeAndSendChunk(start_ack_confirmation);
502       break;
503     }
504 
505     // Confirmation sent by a client to a server of the configured transfer
506     // version and session ID. Completes the handshake and begins the actual
507     // data transfer.
508     case Chunk::Type::kStartAckConfirmation: {
509       set_transfer_state(TransferState::kWaiting);
510 
511       if (type() == TransferType::kTransmit) {
512         HandleTransmitChunk(chunk);
513       } else {
514         HandleReceiveChunk(chunk);
515       }
516       break;
517     }
518 
519     // If a non-handshake chunk is received during an INITIATING state, the
520     // transfer peer is running a legacy protocol version, which does not
521     // perform a handshake. End the handshake, revert to the legacy protocol,
522     // and process the chunk appropriately.
523     case Chunk::Type::kData:
524     case Chunk::Type::kParametersRetransmit:
525     case Chunk::Type::kParametersContinue:
526 
527       // Update the local session_id, which will map to the transfer_id of the
528       // legacy chunk.
529       session_id_ = chunk.session_id();
530 
531       configured_protocol_version_ = ProtocolVersion::kLegacy;
532       // Cancel if we are not using at least version 2, and we tried to start a
533       // non-zero offset transfer
534       if (chunk.initial_offset() != 0) {
535         PW_LOG_ERROR("Legacy transfer does not support offset transfers!");
536         TerminateTransfer(Status::Internal());
537         break;
538       }
539 
540       set_transfer_state(TransferState::kWaiting);
541 
542       PW_LOG_DEBUG(
543           "Transfer %u tried to start on protocol version %d, but peer only "
544           "supports legacy",
545           id_for_log(),
546           static_cast<int>(desired_protocol_version_));
547 
548       if (type() == TransferType::kTransmit) {
549         HandleTransmitChunk(chunk);
550       } else {
551         HandleReceiveChunk(chunk);
552       }
553       break;
554 
555     case Chunk::Type::kCompletion:
556     case Chunk::Type::kCompletionAck:
557       PW_CRASH(
558           "Transfer completion packets should be processed by "
559           "HandleChunkEvent()");
560       break;
561   }
562 }
563 
UpdateLocalProtocolConfigurationFromPeer(const Chunk & chunk)564 void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
565   PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
566                static_cast<int>(desired_protocol_version_),
567                static_cast<int>(chunk.protocol_version()));
568 
569   configured_protocol_version_ =
570       std::min(desired_protocol_version_, chunk.protocol_version());
571 
572   PW_LOG_INFO("Transfer %u: using protocol version %d",
573               id_for_log(),
574               static_cast<int>(configured_protocol_version_));
575 }
576 
HandleTransmitChunk(const Chunk & chunk)577 void Context::HandleTransmitChunk(const Chunk& chunk) {
578   switch (transfer_state_) {
579     case TransferState::kInactive:
580     case TransferState::kRecovery:
581       PW_CRASH("Never should handle chunk while inactive");
582 
583     case TransferState::kCompleted:
584       // If the transfer has already completed and another chunk is received,
585       // tell the other end that the transfer is over.
586       //
587       // TODO(frolv): Final status chunks should be ACKed by the other end. When
588       // that is added, this case should be updated to check if the received
589       // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
590       // Otherwise, the final status should be re-sent.
591       if (!chunk.IsInitialChunk()) {
592         status_ = Status::FailedPrecondition();
593       }
594       SendFinalStatusChunk();
595       return;
596 
597     case TransferState::kInitiating:
598       PerformInitialHandshake(chunk);
599       return;
600 
601     case TransferState::kWaiting:
602     case TransferState::kTransmitting:
603       if (chunk.protocol_version() == configured_protocol_version_) {
604         HandleTransferParametersUpdate(chunk);
605       } else {
606         PW_LOG_ERROR(
607             "Transmit transfer %u was configured to use protocol version %d "
608             "but received a chunk with version %d",
609             id_for_log(),
610             static_cast<int>(configured_protocol_version_),
611             static_cast<int>(chunk.protocol_version()));
612         TerminateTransfer(Status::Internal());
613       }
614       return;
615 
616     case TransferState::kTerminating:
617       HandleTerminatingChunk(chunk);
618       return;
619   }
620 }
621 
HandleTransferParametersUpdate(const Chunk & chunk)622 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
623   bool retransmit = chunk.RequestsTransmissionFromOffset();
624 
625   if (retransmit) {
626     // If the offsets don't match, attempt to seek on the reader. Not all
627     // readers support seeking; abort with UNIMPLEMENTED if this handler
628     // doesn't.
629     if (offset_ != chunk.offset()) {
630       if (Status seek_status = SeekReader(chunk.offset()); !seek_status.ok()) {
631         PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
632                     static_cast<unsigned>(session_id_),
633                     static_cast<unsigned>(chunk.offset()),
634                     seek_status.code());
635 
636         // Remap status codes to return one of the following:
637         //
638         //   INTERNAL: invalid seek, never should happen
639         //   DATA_LOSS: the reader is in a bad state
640         //   UNIMPLEMENTED: seeking is not supported
641         //
642         if (seek_status.IsOutOfRange()) {
643           seek_status = Status::Internal();
644         } else if (!seek_status.IsUnimplemented()) {
645           seek_status = Status::DataLoss();
646         }
647 
648         TerminateTransfer(seek_status);
649         return;
650       }
651     }
652 
653     offset_ = chunk.offset();
654   }
655 
656   window_end_offset_ = chunk.window_end_offset();
657 
658   if (chunk.max_chunk_size_bytes().has_value()) {
659     max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes().value(),
660                                      max_parameters_->max_chunk_size_bytes());
661   }
662 
663   if (chunk.min_delay_microseconds().has_value()) {
664     interchunk_delay_ = chrono::SystemClock::for_at_least(
665         std::chrono::microseconds(chunk.min_delay_microseconds().value()));
666   }
667 
668   if (retransmit) {
669     PW_LOG_INFO(
670         "Transfer %u received parameters type=RETRANSMIT offset=%u "
671         "window_end_offset=%u",
672         static_cast<unsigned>(session_id_),
673         static_cast<unsigned>(chunk.offset()),
674         static_cast<unsigned>(window_end_offset_));
675   } else {
676     PW_LOG_EVERY_N_DURATION(
677         PW_LOG_LEVEL_INFO,
678         std::chrono::seconds(3),
679         "Transfer %u received parameters type=CONTINUE offset=%u "
680         "window_end_offset=%u",
681         static_cast<unsigned>(session_id_),
682         static_cast<unsigned>(chunk.offset()),
683         static_cast<unsigned>(window_end_offset_));
684   }
685 
686   // Parsed all of the parameters; start sending the window.
687   set_transfer_state(TransferState::kTransmitting);
688 
689   TransmitNextChunk(retransmit);
690 }
691 
TransmitNextChunk(bool retransmit_requested)692 void Context::TransmitNextChunk(bool retransmit_requested) {
693   Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
694   chunk.set_session_id(session_id_);
695   chunk.set_offset(offset_);
696 
697   // Reserve space for the data proto field overhead and use the remainder of
698   // the buffer for the chunk data.
699   size_t reserved_size =
700       chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
701 
702   size_t total_size = TransferSizeBytes();
703   if (total_size != std::numeric_limits<size_t>::max()) {
704     reserved_size += protobuf::SizeOfVarintField(
705         pwpb::Chunk::Fields::kRemainingBytes, total_size);
706   }
707 
708   ByteSpan buffer = thread_->encode_buffer();
709   Result<ByteSpan> data;
710 
711   if (offset_ < total_size) {
712     // Read the next chunk of data into the encode buffer.
713     ByteSpan data_buffer = buffer.subspan(reserved_size);
714     size_t max_bytes_to_send =
715         std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
716 
717     if (max_bytes_to_send < data_buffer.size()) {
718       data_buffer = data_buffer.first(max_bytes_to_send);
719     }
720 
721     data = reader().Read(data_buffer);
722   } else {
723     // The user-specified resource size has been reached: respect it.
724     data = Status::OutOfRange();
725   }
726 
727   if (data.status().IsOutOfRange()) {
728     // No more data to read.
729     chunk.set_remaining_bytes(0);
730     window_end_offset_ = offset_;
731 
732     PW_LOG_INFO("Transfer %u sending final chunk with remaining_bytes=0",
733                 static_cast<unsigned>(session_id_));
734   } else if (data.ok()) {
735     if (offset_ == window_end_offset_) {
736       if (retransmit_requested) {
737         PW_LOG_ERROR(
738             "Transfer %u: received an empty retransmit request, but there is "
739             "still data to send; aborting with RESOURCE_EXHAUSTED",
740             id_for_log());
741         TerminateTransfer(Status::ResourceExhausted());
742       } else {
743         PW_LOG_DEBUG(
744             "Transfer %u: ignoring continuation packet for transfer window "
745             "that has already been sent",
746             id_for_log());
747         SetTimeout(chunk_timeout_);
748       }
749       return;  // No data was requested, so there is nothing else to do.
750     }
751 
752     PW_LOG_DEBUG("Transfer %u sending chunk offset=%u size=%u",
753                  static_cast<unsigned>(session_id_),
754                  static_cast<unsigned>(offset_),
755                  static_cast<unsigned>(data.value().size()));
756 
757     chunk.set_payload(data.value());
758     last_chunk_offset_ = offset_;
759     offset_ += data.value().size();
760 
761     if (total_size != std::numeric_limits<size_t>::max()) {
762       chunk.set_remaining_bytes(total_size - offset_);
763     }
764   } else {
765     PW_LOG_ERROR("Transfer %u Read() failed with status %u",
766                  static_cast<unsigned>(session_id_),
767                  data.status().code());
768     TerminateTransfer(Status::DataLoss());
769     return;
770   }
771 
772   Result<ConstByteSpan> encoded_chunk = chunk.Encode(buffer);
773   if (!encoded_chunk.ok()) {
774     PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
775                  static_cast<unsigned>(session_id_));
776     TerminateTransfer(Status::Internal());
777     return;
778   }
779 
780   if (const Status status = rpc_writer_->Write(*encoded_chunk); !status.ok()) {
781     PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
782                  static_cast<unsigned>(session_id_),
783                  status.code());
784     TerminateTransfer(Status::DataLoss());
785     return;
786   }
787 
788   last_chunk_sent_ = chunk.type();
789   flags_ |= kFlagsDataSent;
790 
791   if (offset_ == window_end_offset_ || offset_ == total_size) {
792     // Sent all requested data. Must now wait for next parameters from the
793     // receiver.
794     set_transfer_state(TransferState::kWaiting);
795     SetTimeout(chunk_timeout_);
796   } else {
797     // More data is to be sent. Set a timeout to send the next chunk following
798     // the chunk delay.
799     SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
800   }
801 }
802 
HandleReceiveChunk(const Chunk & chunk)803 void Context::HandleReceiveChunk(const Chunk& chunk) {
804   if (transfer_state_ == TransferState::kInitiating) {
805     PerformInitialHandshake(chunk);
806     return;
807   }
808 
809   if (chunk.protocol_version() != configured_protocol_version_) {
810     PW_LOG_ERROR(
811         "Receive transfer %u was configured to use protocol version %d "
812         "but received a chunk with version %d",
813         id_for_log(),
814         static_cast<int>(configured_protocol_version_),
815         static_cast<int>(chunk.protocol_version()));
816     TerminateTransfer(Status::Internal());
817     return;
818   }
819 
820   switch (transfer_state_) {
821     case TransferState::kInactive:
822     case TransferState::kTransmitting:
823     case TransferState::kInitiating:
824       PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
825                static_cast<int>(transfer_state_));
826 
827     case TransferState::kCompleted:
828       // If the transfer has already completed and another chunk is received,
829       // re-send the final status chunk.
830       //
831       // TODO(frolv): Final status chunks should be ACKed by the other end. When
832       // that is added, this case should be updated to check if the received
833       // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
834       // Otherwise, the final status should be re-sent.
835       SendFinalStatusChunk();
836       return;
837 
838     case TransferState::kRecovery:
839       if (chunk.offset() != offset_) {
840         if (last_chunk_offset_ == chunk.offset()) {
841           PW_LOG_DEBUG(
842               "Transfer %u received repeated offset %u; retry detected, "
843               "resending transfer parameters",
844               static_cast<unsigned>(session_id_),
845               static_cast<unsigned>(chunk.offset()));
846 
847           log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
848           log_rate_limit_ = kNoRateLimit;
849 
850           UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
851           if (DataTransferComplete()) {
852             return;
853           }
854           PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
855                        static_cast<unsigned>(session_id_),
856                        static_cast<unsigned>(offset_),
857                        static_cast<unsigned>(chunk.offset()));
858         }
859 
860         last_chunk_offset_ = chunk.offset();
861         SetTimeout(chunk_timeout_);
862         return;
863       }
864 
865       PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
866                    static_cast<unsigned>(session_id_),
867                    static_cast<unsigned>(offset_));
868       set_transfer_state(TransferState::kWaiting);
869 
870       // The correct chunk was received; process it normally.
871       [[fallthrough]];
872     case TransferState::kWaiting:
873       HandleReceivedData(chunk);
874       return;
875 
876     case TransferState::kTerminating:
877       HandleTerminatingChunk(chunk);
878       return;
879   }
880 }
881 
HandleReceivedData(const Chunk & chunk)882 void Context::HandleReceivedData(const Chunk& chunk) {
883   if (chunk.offset() != offset_) {
884     // Bad offset; reset window size to send another parameters chunk.
885     PW_LOG_DEBUG(
886         "Transfer %u expected offset %u, received %u; entering recovery "
887         "state",
888         static_cast<unsigned>(session_id_),
889         static_cast<unsigned>(offset_),
890         static_cast<unsigned>(chunk.offset()));
891 
892     set_transfer_state(TransferState::kRecovery);
893     SetTimeout(chunk_timeout_);
894 
895     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
896     return;
897   }
898 
899   if (chunk.offset() + chunk.payload().size() > window_end_offset_) {
900     PW_LOG_WARN(
901         "Transfer %u received more data than what was requested (%u received "
902         "for %u pending); attempting to recover.",
903         id_for_log(),
904         static_cast<unsigned>(chunk.payload().size()),
905         static_cast<unsigned>(window_end_offset_ - offset_));
906 
907     // To prevent an improperly implemented client which doesn't respect
908     // window_end_offset from entering an infinite retry loop, limit recovery
909     // attempts to the lifetime retry count.
910     lifetime_retries_++;
911     if (lifetime_retries_ <= max_lifetime_retries_) {
912       set_transfer_state(TransferState::kRecovery);
913       SetTimeout(chunk_timeout_);
914 
915       UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
916     } else {
917       TerminateTransfer(Status::Internal());
918     }
919     return;
920   }
921 
922   // Update the last offset seen so that retries can be detected.
923   last_chunk_offset_ = chunk.offset();
924 
925   // Write staged data from the buffer to the stream.
926   if (chunk.has_payload()) {
927     if (Status status = writer().Write(chunk.payload()); !status.ok()) {
928       PW_LOG_ERROR(
929           "Transfer %u write of %u B chunk failed with status %u; aborting "
930           "with DATA_LOSS",
931           static_cast<unsigned>(session_id_),
932           static_cast<unsigned>(chunk.payload().size()),
933           status.code());
934       TerminateTransfer(Status::DataLoss());
935       return;
936     }
937 
938     transfer_rate_.Update(chunk.payload().size());
939   }
940 
941   // Update the transfer state.
942   offset_ += chunk.payload().size();
943 
944   // When the client sets remaining_bytes to 0, it indicates completion of the
945   // transfer. Acknowledge the completion through a status chunk and clean up.
946   if (chunk.IsFinalTransmitChunk()) {
947     TerminateTransfer(OkStatus());
948     return;
949   }
950 
951   if (chunk.window_end_offset() != 0) {
952     if (chunk.window_end_offset() < offset_) {
953       PW_LOG_ERROR(
954           "Transfer %u got invalid end offset of %u (current offset %u)",
955           id_for_log(),
956           static_cast<unsigned>(chunk.window_end_offset()),
957           static_cast<unsigned>(offset_));
958       TerminateTransfer(Status::Internal());
959       return;
960     }
961 
962     if (chunk.window_end_offset() > window_end_offset_) {
963       // A transmitter should never send a larger end offset than what the
964       // receiver has advertised. If this occurs, there is a bug in the
965       // transmitter implementation. Terminate the transfer.
966       PW_LOG_ERROR(
967           "Transfer %u transmitter sent invalid end offset of %u, "
968           "greater than receiver offset %u",
969           id_for_log(),
970           static_cast<unsigned>(chunk.window_end_offset()),
971           static_cast<unsigned>(window_end_offset_));
972       TerminateTransfer(Status::Internal());
973       return;
974     }
975 
976     window_end_offset_ = chunk.window_end_offset();
977   }
978 
979   SetTimeout(chunk_timeout_);
980 
981   if (chunk.type() == Chunk::Type::kStartAckConfirmation) {
982     // Send the first parameters in the receive transfer.
983     UpdateAndSendTransferParameters(TransmitAction::kFirstParameters);
984     return;
985   }
986 
987   if (offset_ == window_end_offset_) {
988     // Received all pending data. Advance the transfer parameters.
989     UpdateAndSendTransferParameters(TransmitAction::kExtend);
990     return;
991   }
992 
993   // Once the transmitter has sent a sufficient amount of data, try to extend
994   // the window to allow it to continue sending data without blocking.
995   uint32_t remaining_window_size = window_end_offset_ - offset_;
996   bool extend_window = remaining_window_size <=
997                        window_size_ / max_parameters_->extend_window_divisor();
998 
999   if (extend_window) {
1000     UpdateAndSendTransferParameters(TransmitAction::kExtend);
1001   }
1002 }
1003 
HandleTerminatingChunk(const Chunk & chunk)1004 void Context::HandleTerminatingChunk(const Chunk& chunk) {
1005   switch (chunk.type()) {
1006     case Chunk::Type::kCompletion:
1007       PW_CRASH("Completion chunks should be processed by HandleChunkEvent()");
1008 
1009     case Chunk::Type::kCompletionAck:
1010       PW_LOG_INFO(
1011           "Transfer %u completed with status %u", id_for_log(), status_.code());
1012       set_transfer_state(TransferState::kInactive);
1013       break;
1014 
1015     case Chunk::Type::kData:
1016     case Chunk::Type::kStart:
1017     case Chunk::Type::kParametersRetransmit:
1018     case Chunk::Type::kParametersContinue:
1019     case Chunk::Type::kStartAck:
1020     case Chunk::Type::kStartAckConfirmation:
1021       // If a non-completion chunk is received in a TERMINATING state, re-send
1022       // the transfer's completion chunk to the peer.
1023       EncodeAndSendChunk(
1024           Chunk::Final(configured_protocol_version_, session_id_, status_));
1025       break;
1026   }
1027 }
1028 
TerminateTransfer(Status status,bool with_resource_id)1029 void Context::TerminateTransfer(Status status, bool with_resource_id) {
1030   if (transfer_state_ == TransferState::kTerminating ||
1031       transfer_state_ == TransferState::kCompleted) {
1032     // Transfer has already been terminated; no need to do it again.
1033     return;
1034   }
1035 
1036   Finish(status);
1037 
1038   PW_LOG_INFO("Transfer %u terminating with status: %u, offset: %u",
1039               static_cast<unsigned>(session_id_),
1040               status.code(),
1041               static_cast<unsigned>(offset_));
1042 
1043   if (ShouldSkipCompletionHandshake()) {
1044     set_transfer_state(TransferState::kCompleted);
1045   } else {
1046     set_transfer_state(TransferState::kTerminating);
1047     SetTimeout(chunk_timeout_);
1048   }
1049 
1050   // Don't send a final chunk if the other end of the transfer has not yet
1051   // made contact, as there is no one to notify.
1052   if ((flags_ & kFlagsContactMade) == kFlagsContactMade) {
1053     SendFinalStatusChunk(with_resource_id);
1054   }
1055 }
1056 
HandleTermination(Status status)1057 void Context::HandleTermination(Status status) {
1058   Finish(status);
1059 
1060   PW_LOG_INFO("Transfer %u completed with status %u",
1061               static_cast<unsigned>(session_id_),
1062               status.code());
1063 
1064   if (ShouldSkipCompletionHandshake()) {
1065     set_transfer_state(TransferState::kCompleted);
1066   } else {
1067     EncodeAndSendChunk(
1068         Chunk(configured_protocol_version_, Chunk::Type::kCompletionAck)
1069             .set_session_id(session_id_));
1070 
1071     set_transfer_state(TransferState::kInactive);
1072   }
1073 }
1074 
SendFinalStatusChunk(bool with_resource_id)1075 void Context::SendFinalStatusChunk(bool with_resource_id) {
1076   PW_DCHECK(transfer_state_ == TransferState::kCompleted ||
1077             transfer_state_ == TransferState::kTerminating);
1078 
1079   PW_LOG_INFO("Sending final chunk for transfer %u with status %u",
1080               static_cast<unsigned>(session_id_),
1081               status_.code());
1082 
1083   Chunk chunk =
1084       Chunk::Final(configured_protocol_version_, session_id_, status_);
1085   if (with_resource_id) {
1086     chunk.set_resource_id(resource_id_);
1087   }
1088   EncodeAndSendChunk(chunk);
1089 }
1090 
Finish(Status status)1091 void Context::Finish(Status status) {
1092   PW_DCHECK(active());
1093 
1094   status.Update(FinalCleanup(status));
1095   status_ = status;
1096 
1097   SetTimeout(kFinalChunkAckTimeout);
1098 }
1099 
SetTimeout(chrono::SystemClock::duration timeout)1100 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
1101   next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
1102 }
1103 
HandleTimeout()1104 void Context::HandleTimeout() {
1105   ClearTimeout();
1106 
1107   switch (transfer_state_) {
1108     case TransferState::kCompleted:
1109       // A timeout occurring in a completed state indicates that the other side
1110       // never ACKed the final status packet. Reset the context to inactive.
1111       set_transfer_state(TransferState::kInactive);
1112       return;
1113 
1114     case TransferState::kTransmitting:
1115       // A timeout occurring in a TRANSMITTING state indicates that the transfer
1116       // has waited for its inter-chunk delay and should transmit its next
1117       // chunk.
1118       TransmitNextChunk(/*retransmit_requested=*/false);
1119       break;
1120 
1121     case TransferState::kInitiating:
1122     case TransferState::kWaiting:
1123     case TransferState::kRecovery:
1124     case TransferState::kTerminating:
1125       // A timeout occurring in a transfer or handshake state indicates that no
1126       // chunk has been received from the other side. The transfer should retry
1127       // its previous operation.
1128       //
1129       // The timeout is set immediately. Retry() will clear it if it fails.
1130       if (transfer_state_ == TransferState::kInitiating &&
1131           last_chunk_sent_ == Chunk::Type::kStart) {
1132         SetTimeout(initial_chunk_timeout_);
1133       } else {
1134         SetTimeout(chunk_timeout_);
1135       }
1136       Retry();
1137       break;
1138 
1139     case TransferState::kInactive:
1140       PW_LOG_ERROR("Timeout occurred in INACTIVE state");
1141       return;
1142   }
1143 }
1144 
Retry()1145 void Context::Retry() {
1146   if (retries_ == max_retries_ || lifetime_retries_ == max_lifetime_retries_) {
1147     PW_LOG_ERROR(
1148         "Transfer %u failed to receive a chunk after %u retries (lifetime %u).",
1149         id_for_log(),
1150         static_cast<unsigned>(retries_),
1151         static_cast<unsigned>(lifetime_retries_));
1152     PW_LOG_ERROR("Canceling transfer.");
1153 
1154     if (transfer_state_ == TransferState::kTerminating) {
1155       // Timeouts occurring in a TERMINATING state indicate that the completion
1156       // chunk was never ACKed. Simply clean up the transfer context.
1157       set_transfer_state(TransferState::kInactive);
1158     } else {
1159       TerminateTransfer(Status::DeadlineExceeded());
1160     }
1161     return;
1162   }
1163 
1164   ++retries_;
1165   ++lifetime_retries_;
1166 
1167   if (transfer_state_ == TransferState::kInitiating ||
1168       last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
1169     RetryHandshake();
1170     return;
1171   }
1172 
1173   if (transfer_state_ == TransferState::kTerminating) {
1174     EncodeAndSendChunk(
1175         Chunk::Final(configured_protocol_version_, session_id_, status_));
1176     return;
1177   }
1178 
1179   if (type() == TransferType::kReceive) {
1180     // Resend the most recent transfer parameters.
1181     PW_LOG_DEBUG(
1182         "Receive transfer %u timed out waiting for chunk; resending parameters",
1183         static_cast<unsigned>(session_id_));
1184 
1185     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
1186     return;
1187   }
1188 
1189   // In a transmit, if a data chunk has not yet been sent, the initial transfer
1190   // parameters did not arrive from the receiver. Resend the initial chunk.
1191   if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
1192     PW_LOG_DEBUG(
1193         "Transmit transfer %u timed out waiting for initial parameters",
1194         static_cast<unsigned>(session_id_));
1195     SendInitialLegacyTransmitChunk();
1196     return;
1197   }
1198 
1199   // Otherwise, resend the most recent chunk. If the reader doesn't support
1200   // seeking, this isn't possible, so just terminate the transfer immediately.
1201   if (!SeekReader(last_chunk_offset_).ok()) {
1202     PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
1203                  id_for_log());
1204     PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
1205     TerminateTransfer(Status::DeadlineExceeded());
1206     return;
1207   }
1208 
1209   // Rewind the transfer position and resend the chunk.
1210   offset_ = last_chunk_offset_;
1211 
1212   TransmitNextChunk(/*retransmit_requested=*/false);
1213 }
1214 
RetryHandshake()1215 void Context::RetryHandshake() {
1216   Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
1217 
1218   switch (last_chunk_sent_) {
1219     case Chunk::Type::kStart:
1220       // No protocol version is yet configured at the time of sending the start
1221       // chunk, so we use the client's desired version instead.
1222       retry_chunk.set_protocol_version(desired_protocol_version_)
1223           .set_desired_session_id(session_id_)
1224           .set_resource_id(resource_id_)
1225           .set_initial_offset(offset_);
1226       if (type() == TransferType::kReceive) {
1227         SetTransferParameters(retry_chunk);
1228       }
1229       break;
1230 
1231     case Chunk::Type::kStartAck:
1232       retry_chunk.set_session_id(session_id_)
1233           .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
1234       break;
1235 
1236     case Chunk::Type::kStartAckConfirmation:
1237       retry_chunk.set_session_id(session_id_);
1238       if (type() == TransferType::kReceive) {
1239         SetTransferParameters(retry_chunk);
1240       }
1241       break;
1242 
1243     case Chunk::Type::kData:
1244     case Chunk::Type::kParametersRetransmit:
1245     case Chunk::Type::kParametersContinue:
1246     case Chunk::Type::kCompletion:
1247     case Chunk::Type::kCompletionAck:
1248       PW_CRASH("Should not RetryHandshake() when not in handshake phase");
1249   }
1250 
1251   EncodeAndSendChunk(retry_chunk);
1252 }
1253 
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const1254 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
1255                                     uint32_t channel_id) const {
1256   // Start with the user-provided maximum chunk size, which should be the usable
1257   // payload length on the RPC ingress path after any transport overhead.
1258   ptrdiff_t max_size = max_chunk_size_bytes;
1259 
1260   // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
1261   //
1262   //   type:       1 byte key, 1 byte value (CLIENT_STREAM)
1263   //   channel_id: 1 byte key, varint value (calculate from stream)
1264   //   service_id: 1 byte key, 4 byte value
1265   //   method_id:  1 byte key, 4 byte value
1266   //   payload:    1 byte key, varint length (remaining space)
1267   //   status:     0 bytes (not set in stream packets)
1268   //
1269   //   TOTAL: 14 bytes + encoded channel_id size + encoded payload length
1270   //
1271   max_size -= 14;
1272   max_size -= varint::EncodedSize(channel_id);
1273   max_size -= varint::EncodedSize(max_size);
1274 
1275   // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
1276   // overhead calculation will be moved into an RPC helper to avoid having
1277   // pw_transfer depend on RPC internals.
1278   max_size -= 5;
1279 
1280   // Subtract the transfer service overhead for a client write chunk
1281   // (pw_transfer/transfer.proto).
1282   //
1283   //   session_id: 1 byte key, varint value (calculate)
1284   //   offset:     1 byte key, varint value (calculate)
1285   //   data:       1 byte key, varint length (remaining space)
1286   //
1287   //   TOTAL: 3 + encoded session_id + encoded offset + encoded data length
1288   //
1289   // Use a lower bound of a single chunk for the window end offset, as it will
1290   // always be at least in that range.
1291   size_t window_end_offset = std::max(window_end_offset_, max_chunk_size_bytes);
1292   max_size -= 3;
1293   max_size -= varint::EncodedSize(session_id_);
1294   max_size -= varint::EncodedSize(window_end_offset);
1295   max_size -= varint::EncodedSize(max_size);
1296 
1297   // A resulting value of zero (or less) renders write transfers unusable, as
1298   // there is no space to send any payload. This should be considered a
1299   // programmer error in the transfer service setup.
1300   PW_CHECK_INT_GT(
1301       max_size,
1302       0,
1303       "Transfer service maximum chunk size is too small to fit a payload. "
1304       "Increase max_chunk_size_bytes to support write transfers.");
1305 
1306   return max_size;
1307 }
1308 
LogTransferConfiguration()1309 void Context::LogTransferConfiguration() {
1310   PW_LOG_DEBUG(
1311       "Local transfer timing configuration: "
1312       "chunk_timeout=%ums, max_retries=%u, interchunk_delay=%uus",
1313       static_cast<unsigned>(
1314           std::chrono::ceil<std::chrono::milliseconds>(chunk_timeout_).count()),
1315       static_cast<unsigned>(max_retries_),
1316       static_cast<unsigned>(
1317           std::chrono::ceil<std::chrono::microseconds>(interchunk_delay_)
1318               .count()));
1319 
1320   PW_LOG_DEBUG(
1321       "Local transfer windowing configuration: max_window_size_bytes=%u, "
1322       "extend_window_divisor=%u, max_chunk_size_bytes=%u",
1323       static_cast<unsigned>(max_parameters_->max_window_size_bytes()),
1324       static_cast<unsigned>(max_parameters_->extend_window_divisor()),
1325       static_cast<unsigned>(max_parameters_->max_chunk_size_bytes()));
1326 }
1327 
1328 }  // namespace pw::transfer::internal
1329