1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16
17 #include "pw_transfer/internal/context.h"
18
19 #include <chrono>
20 #include <mutex>
21
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_log/log.h"
25 #include "pw_status/try.h"
26 #include "pw_transfer/transfer.pwpb.h"
27 #include "pw_transfer/transfer_thread.h"
28 #include "pw_varint/varint.h"
29
30 PW_MODIFY_DIAGNOSTICS_PUSH();
31 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
32
33 namespace pw::transfer::internal {
34
HandleEvent(const Event & event)35 void Context::HandleEvent(const Event& event) {
36 switch (event.type) {
37 case EventType::kNewClientTransfer:
38 case EventType::kNewServerTransfer: {
39 if (active()) {
40 Finish(Status::Aborted());
41 }
42
43 Initialize(event.new_transfer);
44
45 if (event.type == EventType::kNewClientTransfer) {
46 InitiateTransferAsClient();
47 } else {
48 StartTransferAsServer(event.new_transfer);
49 }
50 return;
51 }
52
53 case EventType::kClientChunk:
54 case EventType::kServerChunk:
55 PW_CHECK(initialized());
56 HandleChunkEvent(event.chunk);
57 return;
58
59 case EventType::kClientTimeout:
60 case EventType::kServerTimeout:
61 HandleTimeout();
62 return;
63
64 case EventType::kSendStatusChunk:
65 case EventType::kSetTransferStream:
66 case EventType::kAddTransferHandler:
67 case EventType::kRemoveTransferHandler:
68 case EventType::kTerminate:
69 // These events are intended for the transfer thread and should never be
70 // forwarded through to a context.
71 PW_CRASH("Transfer context received a transfer thread event");
72 }
73 }
74
InitiateTransferAsClient()75 void Context::InitiateTransferAsClient() {
76 PW_DCHECK(active());
77
78 SetTimeout(chunk_timeout_);
79
80 if (type() == TransferType::kReceive) {
81 // A receiver begins a new transfer with a parameters chunk telling the
82 // transmitter what to send.
83 UpdateAndSendTransferParameters(TransmitAction::kBegin);
84 } else {
85 SendInitialTransmitChunk();
86 }
87
88 // Don't send an error packet. If the transfer failed to start, then there's
89 // nothing to tell the server about.
90 }
91
StartTransferAsServer(const NewTransferEvent & new_transfer)92 void Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
93 PW_LOG_INFO("Starting transfer %u with handler %u",
94 static_cast<unsigned>(new_transfer.transfer_id),
95 static_cast<unsigned>(new_transfer.handler_id));
96
97 if (const Status status = new_transfer.handler->Prepare(new_transfer.type);
98 !status.ok()) {
99 PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
100 static_cast<unsigned>(new_transfer.handler->id()),
101 status.code());
102 Finish(status.IsPermissionDenied() ? status : Status::DataLoss());
103 // Do not send the final status packet here! On the server, a start event
104 // will immediately be followed by the server chunk event. Sending the final
105 // chunk will be handled then.
106 return;
107 }
108
109 // Initialize doesn't set the handler since it's specific to server transfers.
110 static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
111
112 // Server transfers use the stream provided by the handler rather than the
113 // stream included in the NewTransferEvent.
114 stream_ = &new_transfer.handler->stream();
115 }
116
SendInitialTransmitChunk()117 void Context::SendInitialTransmitChunk() {
118 // A transmitter begins a transfer by just sending its ID.
119 internal::Chunk chunk = {};
120 chunk.transfer_id = transfer_id_;
121 chunk.type = Chunk::Type::kTransferStart;
122
123 EncodeAndSendChunk(chunk);
124 }
125
SendTransferParameters(TransmitAction action)126 void Context::SendTransferParameters(TransmitAction action) {
127 internal::Chunk parameters = {
128 .transfer_id = transfer_id_,
129 .window_end_offset = window_end_offset_,
130 .pending_bytes = pending_bytes_,
131 .max_chunk_size_bytes = max_chunk_size_bytes_,
132 .min_delay_microseconds = kDefaultChunkDelayMicroseconds,
133 .offset = offset_,
134 };
135
136 switch (action) {
137 case TransmitAction::kBegin:
138 parameters.type = internal::Chunk::Type::kTransferStart;
139 break;
140 case TransmitAction::kRetransmit:
141 parameters.type = internal::Chunk::Type::kParametersRetransmit;
142 break;
143 case TransmitAction::kExtend:
144 parameters.type = internal::Chunk::Type::kParametersContinue;
145 break;
146 }
147
148 PW_LOG_DEBUG(
149 "Transfer %u sending transfer parameters: "
150 "offset=%u, window_end_offset=%u, pending_bytes=%u, chunk_size=%u",
151 static_cast<unsigned>(transfer_id_),
152 static_cast<unsigned>(offset_),
153 static_cast<unsigned>(window_end_offset_),
154 static_cast<unsigned>(pending_bytes_),
155 static_cast<unsigned>(max_chunk_size_bytes_));
156
157 EncodeAndSendChunk(parameters);
158 }
159
EncodeAndSendChunk(const Chunk & chunk)160 void Context::EncodeAndSendChunk(const Chunk& chunk) {
161 Result<ConstByteSpan> data =
162 internal::EncodeChunk(chunk, thread_->encode_buffer());
163 if (!data.ok()) {
164 PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
165 static_cast<unsigned>(chunk.transfer_id),
166 data.status().code());
167 if (active()) {
168 Finish(Status::Internal());
169 }
170 return;
171 }
172
173 if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
174 PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
175 static_cast<unsigned>(chunk.transfer_id),
176 status.code());
177 if (active()) {
178 Finish(Status::Internal());
179 }
180 return;
181 }
182 }
183
UpdateAndSendTransferParameters(TransmitAction action)184 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
185 size_t pending_bytes =
186 std::min(max_parameters_->pending_bytes(),
187 static_cast<uint32_t>(writer().ConservativeWriteLimit()));
188
189 window_size_ = pending_bytes;
190 window_end_offset_ = offset_ + pending_bytes;
191 pending_bytes_ = pending_bytes;
192
193 max_chunk_size_bytes_ = MaxWriteChunkSize(
194 max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
195
196 PW_LOG_INFO("Transfer rate: %u B/s",
197 static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
198
199 return SendTransferParameters(action);
200 }
201
Initialize(const NewTransferEvent & new_transfer)202 void Context::Initialize(const NewTransferEvent& new_transfer) {
203 PW_DCHECK(!active());
204
205 transfer_id_ = new_transfer.transfer_id;
206 flags_ = static_cast<uint8_t>(new_transfer.type);
207 transfer_state_ = TransferState::kWaiting;
208 retries_ = 0;
209 max_retries_ = new_transfer.max_retries;
210
211 rpc_writer_ = new_transfer.rpc_writer;
212 stream_ = new_transfer.stream;
213
214 offset_ = 0;
215 window_size_ = 0;
216 window_end_offset_ = 0;
217 pending_bytes_ = 0;
218 max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
219
220 max_parameters_ = new_transfer.max_parameters;
221 thread_ = new_transfer.transfer_thread;
222
223 last_chunk_offset_ = 0;
224 chunk_timeout_ = new_transfer.timeout;
225 interchunk_delay_ = chrono::SystemClock::for_at_least(
226 std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
227 next_timeout_ = kNoTimeout;
228
229 transfer_rate_.Reset();
230 }
231
HandleChunkEvent(const ChunkEvent & event)232 void Context::HandleChunkEvent(const ChunkEvent& event) {
233 PW_DCHECK(event.transfer_id == transfer_id_);
234
235 Chunk chunk;
236 if (!DecodeChunk(ConstByteSpan(event.data, event.size), chunk).ok()) {
237 return;
238 }
239
240 // Received some data. Reset the retry counter.
241 retries_ = 0;
242
243 if (chunk.status.has_value()) {
244 if (active()) {
245 Finish(chunk.status.value());
246 } else {
247 PW_LOG_DEBUG("Got final status %d for completed transfer %d",
248 static_cast<int>(chunk.status.value().code()),
249 static_cast<int>(transfer_id_));
250 }
251 return;
252 }
253
254 if (type() == TransferType::kTransmit) {
255 HandleTransmitChunk(chunk);
256 } else {
257 HandleReceiveChunk(chunk);
258 }
259 }
260
HandleTransmitChunk(const Chunk & chunk)261 void Context::HandleTransmitChunk(const Chunk& chunk) {
262 switch (transfer_state_) {
263 case TransferState::kInactive:
264 case TransferState::kRecovery:
265 PW_CRASH("Never should handle chunk while inactive");
266
267 case TransferState::kCompleted:
268 // If the transfer has already completed and another chunk is received,
269 // tell the other end that the transfer is over.
270 //
271 // TODO(frolv): Final status chunks should be ACKed by the other end. When
272 // that is added, this case should be updated to check if the received
273 // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
274 // Otherwise, the final status should be re-sent.
275 if (!chunk.IsInitialChunk()) {
276 status_ = Status::FailedPrecondition();
277 }
278 SendFinalStatusChunk();
279 return;
280
281 case TransferState::kWaiting:
282 case TransferState::kTransmitting:
283 HandleTransferParametersUpdate(chunk);
284 if (transfer_state_ == TransferState::kCompleted) {
285 SendFinalStatusChunk();
286 }
287 return;
288 }
289 }
290
HandleTransferParametersUpdate(const Chunk & chunk)291 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
292 if (!chunk.pending_bytes.has_value()) {
293 // Malformed chunk.
294 Finish(Status::InvalidArgument());
295 return;
296 }
297
298 bool retransmit = true;
299 if (chunk.type.has_value()) {
300 retransmit = chunk.type == Chunk::Type::kParametersRetransmit ||
301 chunk.type == Chunk::Type::kTransferStart;
302 }
303
304 if (retransmit) {
305 // If the offsets don't match, attempt to seek on the reader. Not all
306 // readers support seeking; abort with UNIMPLEMENTED if this handler
307 // doesn't.
308 if (offset_ != chunk.offset) {
309 if (Status seek_status = reader().Seek(chunk.offset); !seek_status.ok()) {
310 PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
311 static_cast<unsigned>(transfer_id_),
312 static_cast<unsigned>(chunk.offset),
313 seek_status.code());
314
315 // Remap status codes to return one of the following:
316 //
317 // INTERNAL: invalid seek, never should happen
318 // DATA_LOSS: the reader is in a bad state
319 // UNIMPLEMENTED: seeking is not supported
320 //
321 if (seek_status.IsOutOfRange()) {
322 seek_status = Status::Internal();
323 } else if (!seek_status.IsUnimplemented()) {
324 seek_status = Status::DataLoss();
325 }
326
327 Finish(seek_status);
328 return;
329 }
330 }
331
332 // Retransmit is the default behavior for older versions of the transfer
333 // protocol. The window_end_offset field is not guaranteed to be set in
334 // these versions, so it must be calculated.
335 offset_ = chunk.offset;
336 window_end_offset_ = offset_ + chunk.pending_bytes.value();
337 pending_bytes_ = chunk.pending_bytes.value();
338 } else {
339 window_end_offset_ = chunk.window_end_offset;
340 }
341
342 if (chunk.max_chunk_size_bytes.has_value()) {
343 max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes.value(),
344 max_parameters_->max_chunk_size_bytes());
345 }
346
347 if (chunk.min_delay_microseconds.has_value()) {
348 interchunk_delay_ = chrono::SystemClock::for_at_least(
349 std::chrono::microseconds(chunk.min_delay_microseconds.value()));
350 }
351
352 PW_LOG_DEBUG(
353 "Transfer %u received parameters type=%s offset=%u window_end_offset=%u",
354 static_cast<unsigned>(transfer_id_),
355 retransmit ? "RETRANSMIT" : "CONTINUE",
356 static_cast<unsigned>(chunk.offset),
357 static_cast<unsigned>(window_end_offset_));
358
359 // Parsed all of the parameters; start sending the window.
360 set_transfer_state(TransferState::kTransmitting);
361
362 TransmitNextChunk(retransmit);
363 }
364
TransmitNextChunk(bool retransmit_requested)365 void Context::TransmitNextChunk(bool retransmit_requested) {
366 ByteSpan buffer = thread_->encode_buffer();
367
368 // Begin by doing a partial encode of all the metadata fields, leaving the
369 // buffer with usable space for the chunk data at the end.
370 transfer::Chunk::MemoryEncoder encoder{buffer};
371 encoder.WriteTransferId(transfer_id_).IgnoreError();
372 encoder.WriteOffset(offset_).IgnoreError();
373
374 // TODO(frolv): Type field presence is currently meaningful, so this type must
375 // be serialized. Once all users of transfer always set chunk types, the field
376 // can be made non-optional and this write can be removed as TRANSFER_DATA has
377 // the default proto value of 0.
378 encoder.WriteType(transfer::Chunk::Type::TRANSFER_DATA).IgnoreError();
379
380 // Reserve space for the data proto field overhead and use the remainder of
381 // the buffer for the chunk data.
382 size_t reserved_size = encoder.size() + 1 /* data key */ + 5 /* data size */;
383
384 ByteSpan data_buffer = buffer.subspan(reserved_size);
385 size_t max_bytes_to_send =
386 std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
387
388 if (max_bytes_to_send < data_buffer.size()) {
389 data_buffer = data_buffer.first(max_bytes_to_send);
390 }
391
392 Result<ByteSpan> data = reader().Read(data_buffer);
393 if (data.status().IsOutOfRange()) {
394 // No more data to read.
395 encoder.WriteRemainingBytes(0).IgnoreError();
396 window_end_offset_ = offset_;
397 pending_bytes_ = 0;
398
399 PW_LOG_DEBUG("Transfer %u sending final chunk with remaining_bytes=0",
400 static_cast<unsigned>(transfer_id_));
401 } else if (data.ok()) {
402 if (offset_ == window_end_offset_) {
403 if (retransmit_requested) {
404 PW_LOG_DEBUG(
405 "Transfer %u: received an empty retransmit request, but there is "
406 "still data to send; aborting with RESOURCE_EXHAUSTED",
407 id_for_log());
408 Finish(Status::ResourceExhausted());
409 } else {
410 PW_LOG_DEBUG(
411 "Transfer %u: ignoring continuation packet for transfer window "
412 "that has already been sent",
413 id_for_log());
414 SetTimeout(chunk_timeout_);
415 }
416 return; // No data was requested, so there is nothing else to do.
417 }
418
419 PW_LOG_DEBUG("Transfer %u sending chunk offset=%u size=%u",
420 static_cast<unsigned>(transfer_id_),
421 static_cast<unsigned>(offset_),
422 static_cast<unsigned>(data.value().size()));
423
424 encoder.WriteData(data.value()).IgnoreError();
425 last_chunk_offset_ = offset_;
426 offset_ += data.value().size();
427 pending_bytes_ -= data.value().size();
428 } else {
429 PW_LOG_ERROR("Transfer %u Read() failed with status %u",
430 static_cast<unsigned>(transfer_id_),
431 data.status().code());
432 Finish(Status::DataLoss());
433 return;
434 }
435
436 if (!encoder.status().ok()) {
437 PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
438 static_cast<unsigned>(transfer_id_));
439 Finish(Status::Internal());
440 return;
441 }
442
443 if (const Status status = rpc_writer_->Write(encoder); !status.ok()) {
444 PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
445 static_cast<unsigned>(transfer_id_),
446 status.code());
447 Finish(Status::DataLoss());
448 return;
449 }
450
451 flags_ |= kFlagsDataSent;
452
453 if (offset_ == window_end_offset_) {
454 // Sent all requested data. Must now wait for next parameters from the
455 // receiver.
456 set_transfer_state(TransferState::kWaiting);
457 SetTimeout(chunk_timeout_);
458 } else {
459 // More data is to be sent. Set a timeout to send the next chunk following
460 // the chunk delay.
461 SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
462 }
463 }
464
HandleReceiveChunk(const Chunk & chunk)465 void Context::HandleReceiveChunk(const Chunk& chunk) {
466 switch (transfer_state_) {
467 case TransferState::kInactive:
468 PW_CRASH("Never should handle chunk while inactive");
469
470 case TransferState::kTransmitting:
471 PW_CRASH("Receive transfer somehow entered TRANSMITTING state");
472
473 case TransferState::kCompleted:
474 // If the transfer has already completed and another chunk is received,
475 // re-send the final status chunk.
476 //
477 // TODO(frolv): Final status chunks should be ACKed by the other end. When
478 // that is added, this case should be updated to check if the received
479 // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
480 // Otherwise, the final status should be re-sent.
481 SendFinalStatusChunk();
482 return;
483
484 case TransferState::kRecovery:
485 if (chunk.offset != offset_) {
486 if (last_chunk_offset_ == chunk.offset) {
487 PW_LOG_DEBUG(
488 "Transfer %u received repeated offset %u; retry detected, "
489 "resending transfer parameters",
490 static_cast<unsigned>(transfer_id_),
491 static_cast<unsigned>(chunk.offset));
492
493 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
494 if (transfer_state_ == TransferState::kCompleted) {
495 SendFinalStatusChunk();
496 return;
497 }
498 PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
499 static_cast<unsigned>(transfer_id_),
500 static_cast<unsigned>(offset_),
501 static_cast<unsigned>(chunk.offset));
502 }
503
504 last_chunk_offset_ = chunk.offset;
505 SetTimeout(chunk_timeout_);
506 return;
507 }
508
509 PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
510 static_cast<unsigned>(transfer_id_),
511 static_cast<unsigned>(offset_));
512 set_transfer_state(TransferState::kWaiting);
513
514 // The correct chunk was received; process it normally.
515 [[fallthrough]];
516 case TransferState::kWaiting:
517 HandleReceivedData(chunk);
518 if (transfer_state_ == TransferState::kCompleted) {
519 SendFinalStatusChunk();
520 }
521 return;
522 }
523 }
524
HandleReceivedData(const Chunk & chunk)525 void Context::HandleReceivedData(const Chunk& chunk) {
526 if (chunk.data.size() > pending_bytes_) {
527 // End the transfer, as this indicates a bug with the client implementation
528 // where it doesn't respect pending_bytes. Trying to recover from here
529 // could potentially result in an infinite transfer loop.
530 PW_LOG_ERROR(
531 "Transfer %u received more data than what was requested (%u received "
532 "for %u pending); terminating transfer.",
533 id_for_log(),
534 static_cast<unsigned>(chunk.data.size()),
535 static_cast<unsigned>(pending_bytes_));
536 Finish(Status::Internal());
537 return;
538 }
539
540 if (chunk.offset != offset_) {
541 // Bad offset; reset pending_bytes to send another parameters chunk.
542 PW_LOG_DEBUG(
543 "Transfer %u expected offset %u, received %u; entering recovery state",
544 static_cast<unsigned>(transfer_id_),
545 static_cast<unsigned>(offset_),
546 static_cast<unsigned>(chunk.offset));
547
548 set_transfer_state(TransferState::kRecovery);
549 SetTimeout(chunk_timeout_);
550
551 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
552 return;
553 }
554
555 // Update the last offset seen so that retries can be detected.
556 last_chunk_offset_ = chunk.offset;
557
558 // Write staged data from the buffer to the stream.
559 if (!chunk.data.empty()) {
560 if (Status status = writer().Write(chunk.data); !status.ok()) {
561 PW_LOG_ERROR(
562 "Transfer %u write of %u B chunk failed with status %u; aborting "
563 "with DATA_LOSS",
564 static_cast<unsigned>(transfer_id_),
565 static_cast<unsigned>(chunk.data.size()),
566 status.code());
567 Finish(Status::DataLoss());
568 return;
569 }
570
571 transfer_rate_.Update(chunk.data.size());
572 }
573
574 // When the client sets remaining_bytes to 0, it indicates completion of the
575 // transfer. Acknowledge the completion through a status chunk and clean up.
576 if (chunk.IsFinalTransmitChunk()) {
577 Finish(OkStatus());
578 return;
579 }
580
581 // Update the transfer state.
582 offset_ += chunk.data.size();
583 pending_bytes_ -= chunk.data.size();
584
585 if (chunk.window_end_offset != 0) {
586 if (chunk.window_end_offset < offset_) {
587 PW_LOG_ERROR(
588 "Transfer %u got invalid end offset of %u (current offset %u)",
589 id_for_log(),
590 static_cast<unsigned>(chunk.window_end_offset),
591 static_cast<unsigned>(offset_));
592 Finish(Status::Internal());
593 return;
594 }
595
596 if (chunk.window_end_offset > window_end_offset_) {
597 // A transmitter should never send a larger end offset than what the
598 // receiver has advertised. If this occurs, there is a bug in the
599 // transmitter implementation. Terminate the transfer.
600 PW_LOG_ERROR(
601 "Transfer %u transmitter sent invalid end offset of %u, "
602 "greater than receiver offset %u",
603 id_for_log(),
604 static_cast<unsigned>(chunk.window_end_offset),
605 static_cast<unsigned>(window_end_offset_));
606 Finish(Status::Internal());
607 return;
608 }
609
610 window_end_offset_ = chunk.window_end_offset;
611 pending_bytes_ = chunk.window_end_offset - offset_;
612 }
613
614 SetTimeout(chunk_timeout_);
615
616 if (pending_bytes_ == 0u) {
617 // Received all pending data. Advance the transfer parameters.
618 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
619 return;
620 }
621
622 // Once the transmitter has sent a sufficient amount of data, try to extend
623 // the window to allow it to continue sending data without blocking.
624 uint32_t remaining_window_size = window_end_offset_ - offset_;
625 bool extend_window = remaining_window_size <=
626 window_size_ / max_parameters_->extend_window_divisor();
627
628 if (extend_window) {
629 UpdateAndSendTransferParameters(TransmitAction::kExtend);
630 return;
631 }
632 }
633
SendFinalStatusChunk()634 void Context::SendFinalStatusChunk() {
635 PW_DCHECK(transfer_state_ == TransferState::kCompleted);
636
637 internal::Chunk chunk = {};
638 chunk.transfer_id = transfer_id_;
639 chunk.status = status_.code();
640 chunk.type = Chunk::Type::kTransferCompletion;
641
642 PW_LOG_DEBUG("Sending final chunk for transfer %u with status %u",
643 static_cast<unsigned>(transfer_id_),
644 status_.code());
645 EncodeAndSendChunk(chunk);
646 }
647
Finish(Status status)648 void Context::Finish(Status status) {
649 PW_DCHECK(active());
650
651 PW_LOG_INFO("Transfer %u completed with status %u",
652 static_cast<unsigned>(transfer_id_),
653 status.code());
654
655 status.Update(FinalCleanup(status));
656
657 set_transfer_state(TransferState::kCompleted);
658 SetTimeout(kFinalChunkAckTimeout);
659 status_ = status;
660 }
661
SetTimeout(chrono::SystemClock::duration timeout)662 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
663 next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
664 }
665
HandleTimeout()666 void Context::HandleTimeout() {
667 ClearTimeout();
668
669 switch (transfer_state_) {
670 case TransferState::kCompleted:
671 // A timeout occurring in a completed state indicates that the other side
672 // never ACKed the final status packet. Reset the context to inactive.
673 set_transfer_state(TransferState::kInactive);
674 return;
675
676 case TransferState::kTransmitting:
677 // A timeout occurring in a TRANSMITTING state indicates that the transfer
678 // has waited for its inter-chunk delay and should transmit its next
679 // chunk.
680 TransmitNextChunk(/*retransmit_requested=*/false);
681 break;
682
683 case TransferState::kWaiting:
684 case TransferState::kRecovery:
685 // A timeout occurring in a WAITING or RECOVERY state indicates that no
686 // chunk has been received from the other side. The transfer should retry
687 // its previous operation.
688 SetTimeout(chunk_timeout_); // Finish() clears the timeout if retry fails
689 Retry();
690 break;
691
692 case TransferState::kInactive:
693 PW_LOG_ERROR("Timeout occurred in INACTIVE state");
694 return;
695 }
696
697 if (transfer_state_ == TransferState::kCompleted) {
698 SendFinalStatusChunk();
699 }
700 }
701
Retry()702 void Context::Retry() {
703 if (retries_ == max_retries_) {
704 PW_LOG_ERROR("Transfer %u failed to receive a chunk after %u retries.",
705 static_cast<unsigned>(transfer_id_),
706 static_cast<unsigned>(retries_));
707 PW_LOG_ERROR("Canceling transfer.");
708 Finish(Status::DeadlineExceeded());
709 return;
710 }
711
712 ++retries_;
713
714 if (type() == TransferType::kReceive) {
715 // Resend the most recent transfer parameters.
716 PW_LOG_DEBUG(
717 "Receive transfer %u timed out waiting for chunk; resending parameters",
718 static_cast<unsigned>(transfer_id_));
719
720 SendTransferParameters(TransmitAction::kRetransmit);
721 return;
722 }
723
724 // In a transmit, if a data chunk has not yet been sent, the initial transfer
725 // parameters did not arrive from the receiver. Resend the initial chunk.
726 if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
727 PW_LOG_DEBUG(
728 "Transmit transfer %u timed out waiting for initial parameters",
729 static_cast<unsigned>(transfer_id_));
730 SendInitialTransmitChunk();
731 return;
732 }
733
734 // Otherwise, resend the most recent chunk. If the reader doesn't support
735 // seeking, this isn't possible, so just terminate the transfer immediately.
736 if (!reader().Seek(last_chunk_offset_).ok()) {
737 PW_LOG_ERROR("Transmit transfer %d timed out waiting for new parameters.",
738 static_cast<unsigned>(transfer_id_));
739 PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
740 Finish(Status::DeadlineExceeded());
741 return;
742 }
743
744 // Rewind the transfer position and resend the chunk.
745 size_t last_size_sent = offset_ - last_chunk_offset_;
746 offset_ = last_chunk_offset_;
747 pending_bytes_ += last_size_sent;
748
749 TransmitNextChunk(/*retransmit_requested=*/false);
750 }
751
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const752 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
753 uint32_t channel_id) const {
754 // Start with the user-provided maximum chunk size, which should be the usable
755 // payload length on the RPC ingress path after any transport overhead.
756 ptrdiff_t max_size = max_chunk_size_bytes;
757
758 // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
759 //
760 // type: 1 byte key, 1 byte value (CLIENT_STREAM)
761 // channel_id: 1 byte key, varint value (calculate from stream)
762 // service_id: 1 byte key, 4 byte value
763 // method_id: 1 byte key, 4 byte value
764 // payload: 1 byte key, varint length (remaining space)
765 // status: 0 bytes (not set in stream packets)
766 //
767 // TOTAL: 14 bytes + encoded channel_id size + encoded payload length
768 //
769 max_size -= 14;
770 max_size -= varint::EncodedSize(channel_id);
771 max_size -= varint::EncodedSize(max_size);
772
773 // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
774 // overhead calculation will be moved into an RPC helper to avoid having
775 // pw_transfer depend on RPC internals.
776 max_size -= 5;
777
778 // Subtract the transfer service overhead for a client write chunk
779 // (pw_transfer/transfer.proto).
780 //
781 // transfer_id: 1 byte key, varint value (calculate)
782 // offset: 1 byte key, varint value (calculate)
783 // data: 1 byte key, varint length (remaining space)
784 //
785 // TOTAL: 3 + encoded transfer_id + encoded offset + encoded data length
786 //
787 max_size -= 3;
788 max_size -= varint::EncodedSize(transfer_id_);
789 max_size -= varint::EncodedSize(window_end_offset_);
790 max_size -= varint::EncodedSize(max_size);
791
792 // A resulting value of zero (or less) renders write transfers unusable, as
793 // there is no space to send any payload. This should be considered a
794 // programmer error in the transfer service setup.
795 PW_CHECK_INT_GT(
796 max_size,
797 0,
798 "Transfer service maximum chunk size is too small to fit a payload. "
799 "Increase max_chunk_size_bytes to support write transfers.");
800
801 return max_size;
802 }
803
804 } // namespace pw::transfer::internal
805
806 PW_MODIFY_DIAGNOSTICS_POP();
807