• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 
17 #include "pw_transfer/internal/context.h"
18 
19 #include <chrono>
20 
21 #include "pw_assert/check.h"
22 #include "pw_chrono/system_clock.h"
23 #include "pw_log/log.h"
24 #include "pw_status/try.h"
25 #include "pw_transfer/transfer.pwpb.h"
26 #include "pw_transfer/transfer_thread.h"
27 #include "pw_varint/varint.h"
28 
29 namespace pw::transfer::internal {
30 
HandleEvent(const Event & event)31 void Context::HandleEvent(const Event& event) {
32   switch (event.type) {
33     case EventType::kNewClientTransfer:
34     case EventType::kNewServerTransfer: {
35       if (active()) {
36         Abort(Status::Aborted());
37       }
38 
39       Initialize(event.new_transfer);
40 
41       if (event.type == EventType::kNewClientTransfer) {
42         InitiateTransferAsClient();
43       } else {
44         if (StartTransferAsServer(event.new_transfer)) {
45           // TODO(frolv): This should probably be restructured.
46           HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
47                             .match_resource_id = false,  // Unused.
48                             .data = event.new_transfer.raw_chunk_data,
49                             .size = event.new_transfer.raw_chunk_size});
50         }
51       }
52       return;
53     }
54 
55     case EventType::kClientChunk:
56     case EventType::kServerChunk:
57       PW_CHECK(initialized());
58       HandleChunkEvent(event.chunk);
59       return;
60 
61     case EventType::kClientTimeout:
62     case EventType::kServerTimeout:
63       HandleTimeout();
64       return;
65 
66     case EventType::kClientEndTransfer:
67     case EventType::kServerEndTransfer:
68       if (active()) {
69         if (event.end_transfer.send_status_chunk) {
70           TerminateTransfer(event.end_transfer.status);
71         } else {
72           Abort(event.end_transfer.status);
73         }
74       }
75       return;
76 
77     case EventType::kSendStatusChunk:
78     case EventType::kAddTransferHandler:
79     case EventType::kRemoveTransferHandler:
80     case EventType::kTerminate:
81       // These events are intended for the transfer thread and should never be
82       // forwarded through to a context.
83       PW_CRASH("Transfer context received a transfer thread event");
84   }
85 }
86 
InitiateTransferAsClient()87 void Context::InitiateTransferAsClient() {
88   PW_DCHECK(active());
89 
90   SetTimeout(chunk_timeout_);
91 
92   PW_LOG_INFO("Starting transfer for resource %u",
93               static_cast<unsigned>(resource_id_));
94 
95   // Receive transfers should prepare their initial parameters to be send in the
96   // initial chunk.
97   if (type() == TransferType::kReceive) {
98     UpdateTransferParameters();
99   }
100 
101   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
102     // Legacy transfers go straight into the data transfer phase without a
103     // handshake.
104     if (type() == TransferType::kReceive) {
105       SendTransferParameters(TransmitAction::kBegin);
106     } else {
107       SendInitialTransmitChunk();
108     }
109 
110     LogTransferConfiguration();
111     return;
112   }
113 
114   // In newer protocol versions, begin the initial transfer handshake.
115   Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
116   start_chunk.set_resource_id(resource_id_);
117 
118   if (type() == TransferType::kReceive) {
119     // Parameters should still be set on the initial chunk for backwards
120     // compatibility if the server only supports the legacy protocol.
121     SetTransferParameters(start_chunk);
122   }
123 
124   EncodeAndSendChunk(start_chunk);
125 }
126 
StartTransferAsServer(const NewTransferEvent & new_transfer)127 bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
128   PW_LOG_INFO("Starting %s transfer %u for resource %u",
129               new_transfer.type == TransferType::kTransmit ? "read" : "write",
130               static_cast<unsigned>(new_transfer.session_id),
131               static_cast<unsigned>(new_transfer.resource_id));
132   LogTransferConfiguration();
133 
134   flags_ |= kFlagsContactMade;
135 
136   if (Status status = new_transfer.handler->Prepare(new_transfer.type);
137       !status.ok()) {
138     PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
139                 static_cast<unsigned>(new_transfer.handler->id()),
140                 status.code());
141 
142     // As this failure occurs at the start of a transfer, no protocol version is
143     // yet negotiated and one must be set to send a response. It is okay to use
144     // the desired version here, as that comes from the client.
145     configured_protocol_version_ = desired_protocol_version_;
146 
147     status = status.IsPermissionDenied() ? status : Status::DataLoss();
148     TerminateTransfer(status, /*with_resource_id=*/true);
149     return false;
150   }
151 
152   // Initialize doesn't set the handler since it's specific to server transfers.
153   static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
154 
155   // Server transfers use the stream provided by the handler rather than the
156   // stream included in the NewTransferEvent.
157   stream_ = &new_transfer.handler->stream();
158 
159   return true;
160 }
161 
SendInitialTransmitChunk()162 void Context::SendInitialTransmitChunk() {
163   // A transmitter begins a transfer by sending the ID of the resource to which
164   // it wishes to write.
165   Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
166   chunk.set_session_id(session_id_);
167 
168   EncodeAndSendChunk(chunk);
169 }
170 
UpdateTransferParameters()171 void Context::UpdateTransferParameters() {
172   size_t pending_bytes =
173       std::min(max_parameters_->pending_bytes(),
174                static_cast<uint32_t>(writer().ConservativeWriteLimit()));
175 
176   window_size_ = pending_bytes;
177   window_end_offset_ = offset_ + pending_bytes;
178 
179   max_chunk_size_bytes_ = MaxWriteChunkSize(
180       max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
181 }
182 
SetTransferParameters(Chunk & parameters)183 void Context::SetTransferParameters(Chunk& parameters) {
184   parameters.set_window_end_offset(window_end_offset_)
185       .set_max_chunk_size_bytes(max_chunk_size_bytes_)
186       .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
187       .set_offset(offset_);
188 }
189 
UpdateAndSendTransferParameters(TransmitAction action)190 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
191   UpdateTransferParameters();
192 
193   PW_LOG_INFO("Transfer rate: %u B/s",
194               static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
195 
196   return SendTransferParameters(action);
197 }
198 
SendTransferParameters(TransmitAction action)199 void Context::SendTransferParameters(TransmitAction action) {
200   Chunk::Type type = Chunk::Type::kParametersRetransmit;
201 
202   switch (action) {
203     case TransmitAction::kBegin:
204       type = Chunk::Type::kStart;
205       break;
206     case TransmitAction::kRetransmit:
207       type = Chunk::Type::kParametersRetransmit;
208       break;
209     case TransmitAction::kExtend:
210       type = Chunk::Type::kParametersContinue;
211       break;
212   }
213 
214   Chunk parameters(configured_protocol_version_, type);
215   parameters.set_session_id(session_id_);
216   SetTransferParameters(parameters);
217 
218   PW_LOG_DEBUG(
219       "Transfer %u sending transfer parameters: "
220       "offset=%u, window_end_offset=%u, max_chunk_size=%u",
221       static_cast<unsigned>(session_id_),
222       static_cast<unsigned>(offset_),
223       static_cast<unsigned>(window_end_offset_),
224       static_cast<unsigned>(max_chunk_size_bytes_));
225 
226   EncodeAndSendChunk(parameters);
227 }
228 
EncodeAndSendChunk(const Chunk & chunk)229 void Context::EncodeAndSendChunk(const Chunk& chunk) {
230   last_chunk_sent_ = chunk.type();
231 
232   Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
233   if (!data.ok()) {
234     PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
235                  static_cast<unsigned>(chunk.session_id()),
236                  data.status().code());
237     if (active()) {
238       TerminateTransfer(Status::Internal());
239     }
240     return;
241   }
242 
243   if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
244     PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
245                  static_cast<unsigned>(chunk.session_id()),
246                  status.code());
247     if (active()) {
248       TerminateTransfer(Status::Internal());
249     }
250     return;
251   }
252 }
253 
Initialize(const NewTransferEvent & new_transfer)254 void Context::Initialize(const NewTransferEvent& new_transfer) {
255   PW_DCHECK(!active());
256 
257   PW_DCHECK_INT_NE(new_transfer.protocol_version,
258                    ProtocolVersion::kUnknown,
259                    "Cannot start a transfer with an unknown protocol");
260 
261   session_id_ = new_transfer.session_id;
262   resource_id_ = new_transfer.resource_id;
263   desired_protocol_version_ = new_transfer.protocol_version;
264   configured_protocol_version_ = ProtocolVersion::kUnknown;
265 
266   flags_ = static_cast<uint8_t>(new_transfer.type);
267   transfer_state_ = TransferState::kWaiting;
268   retries_ = 0;
269   max_retries_ = new_transfer.max_retries;
270   lifetime_retries_ = 0;
271   max_lifetime_retries_ = new_transfer.max_lifetime_retries;
272 
273   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
274     // In a legacy transfer, there is no protocol negotiation stage.
275     // Automatically configure the context to run the legacy protocol and
276     // proceed to waiting for a chunk.
277     configured_protocol_version_ = ProtocolVersion::kLegacy;
278     transfer_state_ = TransferState::kWaiting;
279   } else {
280     transfer_state_ = TransferState::kInitiating;
281   }
282 
283   rpc_writer_ = new_transfer.rpc_writer;
284   stream_ = new_transfer.stream;
285 
286   offset_ = 0;
287   window_size_ = 0;
288   window_end_offset_ = 0;
289   max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
290 
291   max_parameters_ = new_transfer.max_parameters;
292   thread_ = new_transfer.transfer_thread;
293 
294   last_chunk_sent_ = Chunk::Type::kStart;
295   last_chunk_offset_ = 0;
296   chunk_timeout_ = new_transfer.timeout;
297   interchunk_delay_ = chrono::SystemClock::for_at_least(
298       std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
299   next_timeout_ = kNoTimeout;
300 
301   transfer_rate_.Reset();
302 }
303 
HandleChunkEvent(const ChunkEvent & event)304 void Context::HandleChunkEvent(const ChunkEvent& event) {
305   Result<Chunk> maybe_chunk =
306       Chunk::Parse(ConstByteSpan(event.data, event.size));
307   if (!maybe_chunk.ok()) {
308     return;
309   }
310 
311   Chunk chunk = *maybe_chunk;
312 
313   // Received some data. Reset the retry counter.
314   retries_ = 0;
315   flags_ |= kFlagsContactMade;
316 
317   if (chunk.IsTerminatingChunk()) {
318     if (active()) {
319       HandleTermination(chunk.status().value());
320     } else {
321       PW_LOG_DEBUG("Got final status %d for completed transfer %d",
322                    static_cast<int>(chunk.status().value().code()),
323                    static_cast<int>(session_id_));
324     }
325     return;
326   }
327 
328   if (type() == TransferType::kTransmit) {
329     HandleTransmitChunk(chunk);
330   } else {
331     HandleReceiveChunk(chunk);
332   }
333 }
334 
PerformInitialHandshake(const Chunk & chunk)335 void Context::PerformInitialHandshake(const Chunk& chunk) {
336   switch (chunk.type()) {
337     // Initial packet sent from a client to a server.
338     case Chunk::Type::kStart: {
339       UpdateLocalProtocolConfigurationFromPeer(chunk);
340 
341       // This cast is safe as we know we're running in a transfer server.
342       uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
343 
344       Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
345       start_ack.set_session_id(session_id_).set_resource_id(resource_id);
346 
347       EncodeAndSendChunk(start_ack);
348       break;
349     }
350 
351     // Response packet sent from a server to a client. Contains the assigned
352     // session_id of the transfer.
353     case Chunk::Type::kStartAck: {
354       UpdateLocalProtocolConfigurationFromPeer(chunk);
355 
356       // Accept the assigned session_id and tell the server that the transfer
357       // can begin.
358       session_id_ = chunk.session_id();
359       PW_LOG_DEBUG("Transfer for resource %u was assigned session ID %u",
360                    static_cast<unsigned>(resource_id_),
361                    static_cast<unsigned>(session_id_));
362 
363       Chunk start_ack_confirmation(configured_protocol_version_,
364                                    Chunk::Type::kStartAckConfirmation);
365       start_ack_confirmation.set_session_id(session_id_);
366 
367       if (type() == TransferType::kReceive) {
368         // In a receive transfer, tag the initial transfer parameters onto the
369         // confirmation chunk so that the server can immediately begin sending
370         // data.
371         UpdateTransferParameters();
372         SetTransferParameters(start_ack_confirmation);
373       }
374 
375       set_transfer_state(TransferState::kWaiting);
376       EncodeAndSendChunk(start_ack_confirmation);
377       break;
378     }
379 
380     // Confirmation sent by a client to a server of the configured transfer
381     // version and session ID. Completes the handshake and begins the actual
382     // data transfer.
383     case Chunk::Type::kStartAckConfirmation: {
384       set_transfer_state(TransferState::kWaiting);
385 
386       if (type() == TransferType::kTransmit) {
387         HandleTransmitChunk(chunk);
388       } else {
389         HandleReceiveChunk(chunk);
390       }
391       break;
392     }
393 
394     // If a non-handshake chunk is received during an INITIATING state, the
395     // transfer peer is running a legacy protocol version, which does not
396     // perform a handshake. End the handshake, revert to the legacy protocol,
397     // and process the chunk appropriately.
398     case Chunk::Type::kData:
399     case Chunk::Type::kParametersRetransmit:
400     case Chunk::Type::kParametersContinue:
401       // Update the local context's session ID in case it was expecting one to
402       // be assigned by the server.
403       session_id_ = chunk.session_id();
404 
405       configured_protocol_version_ = ProtocolVersion::kLegacy;
406       set_transfer_state(TransferState::kWaiting);
407 
408       PW_LOG_DEBUG(
409           "Transfer %u tried to start on protocol version %d, but peer only "
410           "supports legacy",
411           id_for_log(),
412           static_cast<int>(desired_protocol_version_));
413 
414       if (type() == TransferType::kTransmit) {
415         HandleTransmitChunk(chunk);
416       } else {
417         HandleReceiveChunk(chunk);
418       }
419       break;
420 
421     case Chunk::Type::kCompletion:
422     case Chunk::Type::kCompletionAck:
423       PW_CRASH(
424           "Transfer completion packets should be processed by "
425           "HandleChunkEvent()");
426       break;
427   }
428 }
429 
UpdateLocalProtocolConfigurationFromPeer(const Chunk & chunk)430 void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
431   PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
432                static_cast<int>(desired_protocol_version_),
433                static_cast<int>(chunk.protocol_version()));
434 
435   configured_protocol_version_ =
436       std::min(desired_protocol_version_, chunk.protocol_version());
437 
438   PW_LOG_INFO("Transfer %u: using protocol version %d",
439               id_for_log(),
440               static_cast<int>(configured_protocol_version_));
441 }
442 
HandleTransmitChunk(const Chunk & chunk)443 void Context::HandleTransmitChunk(const Chunk& chunk) {
444   switch (transfer_state_) {
445     case TransferState::kInactive:
446     case TransferState::kRecovery:
447       PW_CRASH("Never should handle chunk while inactive");
448 
449     case TransferState::kCompleted:
450       // If the transfer has already completed and another chunk is received,
451       // tell the other end that the transfer is over.
452       //
453       // TODO(frolv): Final status chunks should be ACKed by the other end. When
454       // that is added, this case should be updated to check if the received
455       // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
456       // Otherwise, the final status should be re-sent.
457       if (!chunk.IsInitialChunk()) {
458         status_ = Status::FailedPrecondition();
459       }
460       SendFinalStatusChunk();
461       return;
462 
463     case TransferState::kInitiating:
464       PerformInitialHandshake(chunk);
465       return;
466 
467     case TransferState::kWaiting:
468     case TransferState::kTransmitting:
469       if (chunk.protocol_version() == configured_protocol_version_) {
470         HandleTransferParametersUpdate(chunk);
471       } else {
472         PW_LOG_ERROR(
473             "Transmit transfer %u was configured to use protocol version %d "
474             "but received a chunk with version %d",
475             id_for_log(),
476             static_cast<int>(configured_protocol_version_),
477             static_cast<int>(chunk.protocol_version()));
478         TerminateTransfer(Status::Internal());
479       }
480       return;
481 
482     case TransferState::kTerminating:
483       HandleTerminatingChunk(chunk);
484       return;
485   }
486 }
487 
HandleTransferParametersUpdate(const Chunk & chunk)488 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
489   bool retransmit = chunk.RequestsTransmissionFromOffset();
490 
491   if (retransmit) {
492     // If the offsets don't match, attempt to seek on the reader. Not all
493     // readers support seeking; abort with UNIMPLEMENTED if this handler
494     // doesn't.
495     if (offset_ != chunk.offset()) {
496       if (Status seek_status = reader().Seek(chunk.offset());
497           !seek_status.ok()) {
498         PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
499                     static_cast<unsigned>(session_id_),
500                     static_cast<unsigned>(chunk.offset()),
501                     seek_status.code());
502 
503         // Remap status codes to return one of the following:
504         //
505         //   INTERNAL: invalid seek, never should happen
506         //   DATA_LOSS: the reader is in a bad state
507         //   UNIMPLEMENTED: seeking is not supported
508         //
509         if (seek_status.IsOutOfRange()) {
510           seek_status = Status::Internal();
511         } else if (!seek_status.IsUnimplemented()) {
512           seek_status = Status::DataLoss();
513         }
514 
515         TerminateTransfer(seek_status);
516         return;
517       }
518     }
519 
520     offset_ = chunk.offset();
521   }
522 
523   window_end_offset_ = chunk.window_end_offset();
524 
525   if (chunk.max_chunk_size_bytes().has_value()) {
526     max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes().value(),
527                                      max_parameters_->max_chunk_size_bytes());
528   }
529 
530   if (chunk.min_delay_microseconds().has_value()) {
531     interchunk_delay_ = chrono::SystemClock::for_at_least(
532         std::chrono::microseconds(chunk.min_delay_microseconds().value()));
533   }
534 
535   PW_LOG_DEBUG(
536       "Transfer %u received parameters type=%s offset=%u window_end_offset=%u",
537       static_cast<unsigned>(session_id_),
538       retransmit ? "RETRANSMIT" : "CONTINUE",
539       static_cast<unsigned>(chunk.offset()),
540       static_cast<unsigned>(window_end_offset_));
541 
542   // Parsed all of the parameters; start sending the window.
543   set_transfer_state(TransferState::kTransmitting);
544 
545   TransmitNextChunk(retransmit);
546 }
547 
TransmitNextChunk(bool retransmit_requested)548 void Context::TransmitNextChunk(bool retransmit_requested) {
549   Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
550   chunk.set_session_id(session_id_);
551   chunk.set_offset(offset_);
552 
553   // Reserve space for the data proto field overhead and use the remainder of
554   // the buffer for the chunk data.
555   size_t reserved_size =
556       chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
557 
558   ByteSpan buffer = thread_->encode_buffer();
559 
560   ByteSpan data_buffer = buffer.subspan(reserved_size);
561   size_t max_bytes_to_send =
562       std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
563 
564   if (max_bytes_to_send < data_buffer.size()) {
565     data_buffer = data_buffer.first(max_bytes_to_send);
566   }
567 
568   Result<ByteSpan> data = reader().Read(data_buffer);
569   if (data.status().IsOutOfRange()) {
570     // No more data to read.
571     chunk.set_remaining_bytes(0);
572     window_end_offset_ = offset_;
573 
574     PW_LOG_DEBUG("Transfer %u sending final chunk with remaining_bytes=0",
575                  static_cast<unsigned>(session_id_));
576   } else if (data.ok()) {
577     if (offset_ == window_end_offset_) {
578       if (retransmit_requested) {
579         PW_LOG_DEBUG(
580             "Transfer %u: received an empty retransmit request, but there is "
581             "still data to send; aborting with RESOURCE_EXHAUSTED",
582             id_for_log());
583         TerminateTransfer(Status::ResourceExhausted());
584       } else {
585         PW_LOG_DEBUG(
586             "Transfer %u: ignoring continuation packet for transfer window "
587             "that has already been sent",
588             id_for_log());
589         SetTimeout(chunk_timeout_);
590       }
591       return;  // No data was requested, so there is nothing else to do.
592     }
593 
594     PW_LOG_DEBUG("Transfer %u sending chunk offset=%u size=%u",
595                  static_cast<unsigned>(session_id_),
596                  static_cast<unsigned>(offset_),
597                  static_cast<unsigned>(data.value().size()));
598 
599     chunk.set_payload(data.value());
600     last_chunk_offset_ = offset_;
601     offset_ += data.value().size();
602   } else {
603     PW_LOG_ERROR("Transfer %u Read() failed with status %u",
604                  static_cast<unsigned>(session_id_),
605                  data.status().code());
606     TerminateTransfer(Status::DataLoss());
607     return;
608   }
609 
610   Result<ConstByteSpan> encoded_chunk = chunk.Encode(buffer);
611   if (!encoded_chunk.ok()) {
612     PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
613                  static_cast<unsigned>(session_id_));
614     TerminateTransfer(Status::Internal());
615     return;
616   }
617 
618   if (const Status status = rpc_writer_->Write(*encoded_chunk); !status.ok()) {
619     PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
620                  static_cast<unsigned>(session_id_),
621                  status.code());
622     TerminateTransfer(Status::DataLoss());
623     return;
624   }
625 
626   last_chunk_sent_ = chunk.type();
627   flags_ |= kFlagsDataSent;
628 
629   if (offset_ == window_end_offset_) {
630     // Sent all requested data. Must now wait for next parameters from the
631     // receiver.
632     set_transfer_state(TransferState::kWaiting);
633     SetTimeout(chunk_timeout_);
634   } else {
635     // More data is to be sent. Set a timeout to send the next chunk following
636     // the chunk delay.
637     SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
638   }
639 }
640 
HandleReceiveChunk(const Chunk & chunk)641 void Context::HandleReceiveChunk(const Chunk& chunk) {
642   if (transfer_state_ == TransferState::kInitiating) {
643     PerformInitialHandshake(chunk);
644     return;
645   }
646 
647   if (chunk.protocol_version() != configured_protocol_version_) {
648     PW_LOG_ERROR(
649         "Receive transfer %u was configured to use protocol version %d "
650         "but received a chunk with version %d",
651         id_for_log(),
652         static_cast<int>(configured_protocol_version_),
653         static_cast<int>(chunk.protocol_version()));
654     TerminateTransfer(Status::Internal());
655     return;
656   }
657 
658   switch (transfer_state_) {
659     case TransferState::kInactive:
660     case TransferState::kTransmitting:
661     case TransferState::kInitiating:
662       PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
663                static_cast<int>(transfer_state_));
664 
665     case TransferState::kCompleted:
666       // If the transfer has already completed and another chunk is received,
667       // re-send the final status chunk.
668       //
669       // TODO(frolv): Final status chunks should be ACKed by the other end. When
670       // that is added, this case should be updated to check if the received
671       // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
672       // Otherwise, the final status should be re-sent.
673       SendFinalStatusChunk();
674       return;
675 
676     case TransferState::kRecovery:
677       if (chunk.offset() != offset_) {
678         if (last_chunk_offset_ == chunk.offset()) {
679           PW_LOG_DEBUG(
680               "Transfer %u received repeated offset %u; retry detected, "
681               "resending transfer parameters",
682               static_cast<unsigned>(session_id_),
683               static_cast<unsigned>(chunk.offset()));
684 
685           UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
686           if (DataTransferComplete()) {
687             return;
688           }
689           PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
690                        static_cast<unsigned>(session_id_),
691                        static_cast<unsigned>(offset_),
692                        static_cast<unsigned>(chunk.offset()));
693         }
694 
695         last_chunk_offset_ = chunk.offset();
696         SetTimeout(chunk_timeout_);
697         return;
698       }
699 
700       PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
701                    static_cast<unsigned>(session_id_),
702                    static_cast<unsigned>(offset_));
703       set_transfer_state(TransferState::kWaiting);
704 
705       // The correct chunk was received; process it normally.
706       [[fallthrough]];
707     case TransferState::kWaiting:
708       HandleReceivedData(chunk);
709       return;
710 
711     case TransferState::kTerminating:
712       HandleTerminatingChunk(chunk);
713       return;
714   }
715 }
716 
HandleReceivedData(const Chunk & chunk)717 void Context::HandleReceivedData(const Chunk& chunk) {
718   if (chunk.offset() != offset_) {
719     // Bad offset; reset pending_bytes to send another parameters chunk.
720     PW_LOG_DEBUG(
721         "Transfer %u expected offset %u, received %u; entering recovery state",
722         static_cast<unsigned>(session_id_),
723         static_cast<unsigned>(offset_),
724         static_cast<unsigned>(chunk.offset()));
725 
726     set_transfer_state(TransferState::kRecovery);
727     SetTimeout(chunk_timeout_);
728 
729     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
730     return;
731   }
732 
733   if (chunk.offset() + chunk.payload().size() > window_end_offset_) {
734     // End the transfer, as this indicates a bug with the client implementation
735     // where it doesn't respect pending_bytes. Trying to recover from here
736     // could potentially result in an infinite transfer loop.
737     PW_LOG_ERROR(
738         "Transfer %u received more data than what was requested (%u received "
739         "for %u pending); terminating transfer.",
740         id_for_log(),
741         static_cast<unsigned>(chunk.payload().size()),
742         static_cast<unsigned>(window_end_offset_ - offset_));
743     TerminateTransfer(Status::Internal());
744     return;
745   }
746 
747   // Update the last offset seen so that retries can be detected.
748   last_chunk_offset_ = chunk.offset();
749 
750   // Write staged data from the buffer to the stream.
751   if (chunk.has_payload()) {
752     if (Status status = writer().Write(chunk.payload()); !status.ok()) {
753       PW_LOG_ERROR(
754           "Transfer %u write of %u B chunk failed with status %u; aborting "
755           "with DATA_LOSS",
756           static_cast<unsigned>(session_id_),
757           static_cast<unsigned>(chunk.payload().size()),
758           status.code());
759       TerminateTransfer(Status::DataLoss());
760       return;
761     }
762 
763     transfer_rate_.Update(chunk.payload().size());
764   }
765 
766   // When the client sets remaining_bytes to 0, it indicates completion of the
767   // transfer. Acknowledge the completion through a status chunk and clean up.
768   if (chunk.IsFinalTransmitChunk()) {
769     TerminateTransfer(OkStatus());
770     return;
771   }
772 
773   // Update the transfer state.
774   offset_ += chunk.payload().size();
775 
776   if (chunk.window_end_offset() != 0) {
777     if (chunk.window_end_offset() < offset_) {
778       PW_LOG_ERROR(
779           "Transfer %u got invalid end offset of %u (current offset %u)",
780           id_for_log(),
781           static_cast<unsigned>(chunk.window_end_offset()),
782           static_cast<unsigned>(offset_));
783       TerminateTransfer(Status::Internal());
784       return;
785     }
786 
787     if (chunk.window_end_offset() > window_end_offset_) {
788       // A transmitter should never send a larger end offset than what the
789       // receiver has advertised. If this occurs, there is a bug in the
790       // transmitter implementation. Terminate the transfer.
791       PW_LOG_ERROR(
792           "Transfer %u transmitter sent invalid end offset of %u, "
793           "greater than receiver offset %u",
794           id_for_log(),
795           static_cast<unsigned>(chunk.window_end_offset()),
796           static_cast<unsigned>(window_end_offset_));
797       TerminateTransfer(Status::Internal());
798       return;
799     }
800 
801     window_end_offset_ = chunk.window_end_offset();
802   }
803 
804   SetTimeout(chunk_timeout_);
805 
806   if (offset_ == window_end_offset_) {
807     // Received all pending data. Advance the transfer parameters.
808     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
809     return;
810   }
811 
812   // Once the transmitter has sent a sufficient amount of data, try to extend
813   // the window to allow it to continue sending data without blocking.
814   uint32_t remaining_window_size = window_end_offset_ - offset_;
815   bool extend_window = remaining_window_size <=
816                        window_size_ / max_parameters_->extend_window_divisor();
817 
818   if (extend_window) {
819     UpdateAndSendTransferParameters(TransmitAction::kExtend);
820     return;
821   }
822 }
823 
HandleTerminatingChunk(const Chunk & chunk)824 void Context::HandleTerminatingChunk(const Chunk& chunk) {
825   switch (chunk.type()) {
826     case Chunk::Type::kCompletion:
827       PW_CRASH("Completion chunks should be processed by HandleChunkEvent()");
828 
829     case Chunk::Type::kCompletionAck:
830       PW_LOG_INFO(
831           "Transfer %u completed with status %u", id_for_log(), status_.code());
832       set_transfer_state(TransferState::kInactive);
833       break;
834 
835     case Chunk::Type::kData:
836     case Chunk::Type::kStart:
837     case Chunk::Type::kParametersRetransmit:
838     case Chunk::Type::kParametersContinue:
839     case Chunk::Type::kStartAck:
840     case Chunk::Type::kStartAckConfirmation:
841       // If a non-completion chunk is received in a TERMINATING state, re-send
842       // the transfer's completion chunk to the peer.
843       EncodeAndSendChunk(
844           Chunk::Final(configured_protocol_version_, session_id_, status_));
845       break;
846   }
847 }
848 
TerminateTransfer(Status status,bool with_resource_id)849 void Context::TerminateTransfer(Status status, bool with_resource_id) {
850   if (transfer_state_ == TransferState::kTerminating ||
851       transfer_state_ == TransferState::kCompleted) {
852     // Transfer has already been terminated; no need to do it again.
853     return;
854   }
855 
856   Finish(status);
857 
858   PW_LOG_INFO("Transfer %u terminating with status %u",
859               static_cast<unsigned>(session_id_),
860               status.code());
861 
862   if (ShouldSkipCompletionHandshake()) {
863     set_transfer_state(TransferState::kCompleted);
864   } else {
865     set_transfer_state(TransferState::kTerminating);
866     SetTimeout(chunk_timeout_);
867   }
868 
869   // Don't send a final chunk if the other end of the transfer has not yet
870   // made contact, as there is no one to notify.
871   if ((flags_ & kFlagsContactMade) == kFlagsContactMade) {
872     SendFinalStatusChunk(with_resource_id);
873   }
874 }
875 
HandleTermination(Status status)876 void Context::HandleTermination(Status status) {
877   Finish(status);
878 
879   PW_LOG_INFO("Transfer %u completed with status %u",
880               static_cast<unsigned>(session_id_),
881               status.code());
882 
883   if (ShouldSkipCompletionHandshake()) {
884     set_transfer_state(TransferState::kCompleted);
885   } else {
886     EncodeAndSendChunk(
887         Chunk(configured_protocol_version_, Chunk::Type::kCompletionAck)
888             .set_session_id(session_id_));
889 
890     set_transfer_state(TransferState::kInactive);
891   }
892 }
893 
SendFinalStatusChunk(bool with_resource_id)894 void Context::SendFinalStatusChunk(bool with_resource_id) {
895   PW_DCHECK(transfer_state_ == TransferState::kCompleted ||
896             transfer_state_ == TransferState::kTerminating);
897 
898   PW_LOG_DEBUG("Sending final chunk for transfer %u with status %u",
899                static_cast<unsigned>(session_id_),
900                status_.code());
901 
902   Chunk chunk =
903       Chunk::Final(configured_protocol_version_, session_id_, status_);
904   if (with_resource_id) {
905     chunk.set_resource_id(resource_id_);
906   }
907   EncodeAndSendChunk(chunk);
908 }
909 
Finish(Status status)910 void Context::Finish(Status status) {
911   PW_DCHECK(active());
912 
913   status.Update(FinalCleanup(status));
914   status_ = status;
915 
916   SetTimeout(kFinalChunkAckTimeout);
917 }
918 
SetTimeout(chrono::SystemClock::duration timeout)919 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
920   next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
921 }
922 
HandleTimeout()923 void Context::HandleTimeout() {
924   ClearTimeout();
925 
926   switch (transfer_state_) {
927     case TransferState::kCompleted:
928       // A timeout occurring in a completed state indicates that the other side
929       // never ACKed the final status packet. Reset the context to inactive.
930       set_transfer_state(TransferState::kInactive);
931       return;
932 
933     case TransferState::kTransmitting:
934       // A timeout occurring in a TRANSMITTING state indicates that the transfer
935       // has waited for its inter-chunk delay and should transmit its next
936       // chunk.
937       TransmitNextChunk(/*retransmit_requested=*/false);
938       break;
939 
940     case TransferState::kInitiating:
941     case TransferState::kWaiting:
942     case TransferState::kRecovery:
943     case TransferState::kTerminating:
944       // A timeout occurring in a transfer or handshake state indicates that no
945       // chunk has been received from the other side. The transfer should retry
946       // its previous operation.
947       SetTimeout(chunk_timeout_);  // Retry() clears the timeout if it fails
948       Retry();
949       break;
950 
951     case TransferState::kInactive:
952       PW_LOG_ERROR("Timeout occurred in INACTIVE state");
953       return;
954   }
955 }
956 
Retry()957 void Context::Retry() {
958   if (retries_ == max_retries_ || lifetime_retries_ == max_lifetime_retries_) {
959     PW_LOG_ERROR(
960         "Transfer %u failed to receive a chunk after %u retries (lifetime %u).",
961         id_for_log(),
962         static_cast<unsigned>(retries_),
963         static_cast<unsigned>(lifetime_retries_));
964     PW_LOG_ERROR("Canceling transfer.");
965 
966     if (transfer_state_ == TransferState::kTerminating) {
967       // Timeouts occurring in a TERMINATING state indicate that the completion
968       // chunk was never ACKed. Simply clean up the transfer context.
969       set_transfer_state(TransferState::kInactive);
970     } else {
971       TerminateTransfer(Status::DeadlineExceeded());
972     }
973     return;
974   }
975 
976   ++retries_;
977   ++lifetime_retries_;
978 
979   if (transfer_state_ == TransferState::kInitiating ||
980       last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
981     RetryHandshake();
982     return;
983   }
984 
985   if (transfer_state_ == TransferState::kTerminating) {
986     EncodeAndSendChunk(
987         Chunk::Final(configured_protocol_version_, session_id_, status_));
988     return;
989   }
990 
991   if (type() == TransferType::kReceive) {
992     // Resend the most recent transfer parameters.
993     PW_LOG_DEBUG(
994         "Receive transfer %u timed out waiting for chunk; resending parameters",
995         static_cast<unsigned>(session_id_));
996 
997     SendTransferParameters(TransmitAction::kRetransmit);
998     return;
999   }
1000 
1001   // In a transmit, if a data chunk has not yet been sent, the initial transfer
1002   // parameters did not arrive from the receiver. Resend the initial chunk.
1003   if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
1004     PW_LOG_DEBUG(
1005         "Transmit transfer %u timed out waiting for initial parameters",
1006         static_cast<unsigned>(session_id_));
1007     SendInitialTransmitChunk();
1008     return;
1009   }
1010 
1011   // Otherwise, resend the most recent chunk. If the reader doesn't support
1012   // seeking, this isn't possible, so just terminate the transfer immediately.
1013   if (!reader().Seek(last_chunk_offset_).ok()) {
1014     PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
1015                  id_for_log());
1016     PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
1017     TerminateTransfer(Status::DeadlineExceeded());
1018     return;
1019   }
1020 
1021   // Rewind the transfer position and resend the chunk.
1022   offset_ = last_chunk_offset_;
1023 
1024   TransmitNextChunk(/*retransmit_requested=*/false);
1025 }
1026 
RetryHandshake()1027 void Context::RetryHandshake() {
1028   Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
1029 
1030   switch (last_chunk_sent_) {
1031     case Chunk::Type::kStart:
1032       // No protocol version is yet configured at the time of sending the start
1033       // chunk, so we use the client's desired version instead.
1034       retry_chunk.set_protocol_version(desired_protocol_version_)
1035           .set_resource_id(resource_id_);
1036       if (type() == TransferType::kReceive) {
1037         SetTransferParameters(retry_chunk);
1038       }
1039       break;
1040 
1041     case Chunk::Type::kStartAck:
1042       retry_chunk.set_session_id(session_id_)
1043           .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
1044       break;
1045 
1046     case Chunk::Type::kStartAckConfirmation:
1047       retry_chunk.set_session_id(session_id_);
1048       if (type() == TransferType::kReceive) {
1049         SetTransferParameters(retry_chunk);
1050       }
1051       break;
1052 
1053     case Chunk::Type::kData:
1054     case Chunk::Type::kParametersRetransmit:
1055     case Chunk::Type::kParametersContinue:
1056     case Chunk::Type::kCompletion:
1057     case Chunk::Type::kCompletionAck:
1058       PW_CRASH("Should not RetryHandshake() when not in handshake phase");
1059   }
1060 
1061   EncodeAndSendChunk(retry_chunk);
1062 }
1063 
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const1064 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
1065                                     uint32_t channel_id) const {
1066   // Start with the user-provided maximum chunk size, which should be the usable
1067   // payload length on the RPC ingress path after any transport overhead.
1068   ptrdiff_t max_size = max_chunk_size_bytes;
1069 
1070   // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
1071   //
1072   //   type:       1 byte key, 1 byte value (CLIENT_STREAM)
1073   //   channel_id: 1 byte key, varint value (calculate from stream)
1074   //   service_id: 1 byte key, 4 byte value
1075   //   method_id:  1 byte key, 4 byte value
1076   //   payload:    1 byte key, varint length (remaining space)
1077   //   status:     0 bytes (not set in stream packets)
1078   //
1079   //   TOTAL: 14 bytes + encoded channel_id size + encoded payload length
1080   //
1081   max_size -= 14;
1082   max_size -= varint::EncodedSize(channel_id);
1083   max_size -= varint::EncodedSize(max_size);
1084 
1085   // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
1086   // overhead calculation will be moved into an RPC helper to avoid having
1087   // pw_transfer depend on RPC internals.
1088   max_size -= 5;
1089 
1090   // Subtract the transfer service overhead for a client write chunk
1091   // (pw_transfer/transfer.proto).
1092   //
1093   //   session_id: 1 byte key, varint value (calculate)
1094   //   offset:     1 byte key, varint value (calculate)
1095   //   data:       1 byte key, varint length (remaining space)
1096   //
1097   //   TOTAL: 3 + encoded session_id + encoded offset + encoded data length
1098   //
1099   max_size -= 3;
1100   max_size -= varint::EncodedSize(session_id_);
1101   max_size -= varint::EncodedSize(window_end_offset_);
1102   max_size -= varint::EncodedSize(max_size);
1103 
1104   // A resulting value of zero (or less) renders write transfers unusable, as
1105   // there is no space to send any payload. This should be considered a
1106   // programmer error in the transfer service setup.
1107   PW_CHECK_INT_GT(
1108       max_size,
1109       0,
1110       "Transfer service maximum chunk size is too small to fit a payload. "
1111       "Increase max_chunk_size_bytes to support write transfers.");
1112 
1113   return max_size;
1114 }
1115 
LogTransferConfiguration()1116 void Context::LogTransferConfiguration() {
1117   PW_LOG_DEBUG(
1118       "Local transfer timing configuration: "
1119       "chunk_timeout=%ums, max_retries=%u, interchunk_delay=%uus",
1120       static_cast<unsigned>(
1121           std::chrono::ceil<std::chrono::milliseconds>(chunk_timeout_).count()),
1122       static_cast<unsigned>(max_retries_),
1123       static_cast<unsigned>(
1124           std::chrono::ceil<std::chrono::microseconds>(interchunk_delay_)
1125               .count()));
1126 
1127   PW_LOG_DEBUG(
1128       "Local transfer windowing configuration: "
1129       "pending_bytes=%u, extend_window_divisor=%u, max_chunk_size_bytes=%u",
1130       static_cast<unsigned>(max_parameters_->pending_bytes()),
1131       static_cast<unsigned>(max_parameters_->extend_window_divisor()),
1132       static_cast<unsigned>(max_parameters_->max_chunk_size_bytes()));
1133 }
1134 
1135 }  // namespace pw::transfer::internal
1136