1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16
17 #include "pw_transfer/internal/context.h"
18
19 #include <chrono>
20
21 #include "pw_assert/check.h"
22 #include "pw_chrono/system_clock.h"
23 #include "pw_log/log.h"
24 #include "pw_status/try.h"
25 #include "pw_transfer/transfer.pwpb.h"
26 #include "pw_transfer/transfer_thread.h"
27 #include "pw_varint/varint.h"
28
29 namespace pw::transfer::internal {
30
HandleEvent(const Event & event)31 void Context::HandleEvent(const Event& event) {
32 switch (event.type) {
33 case EventType::kNewClientTransfer:
34 case EventType::kNewServerTransfer: {
35 if (active()) {
36 Abort(Status::Aborted());
37 }
38
39 Initialize(event.new_transfer);
40
41 if (event.type == EventType::kNewClientTransfer) {
42 InitiateTransferAsClient();
43 } else {
44 if (StartTransferAsServer(event.new_transfer)) {
45 // TODO(frolv): This should probably be restructured.
46 HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
47 .match_resource_id = false, // Unused.
48 .data = event.new_transfer.raw_chunk_data,
49 .size = event.new_transfer.raw_chunk_size});
50 }
51 }
52 return;
53 }
54
55 case EventType::kClientChunk:
56 case EventType::kServerChunk:
57 PW_CHECK(initialized());
58 HandleChunkEvent(event.chunk);
59 return;
60
61 case EventType::kClientTimeout:
62 case EventType::kServerTimeout:
63 HandleTimeout();
64 return;
65
66 case EventType::kClientEndTransfer:
67 case EventType::kServerEndTransfer:
68 if (active()) {
69 if (event.end_transfer.send_status_chunk) {
70 TerminateTransfer(event.end_transfer.status);
71 } else {
72 Abort(event.end_transfer.status);
73 }
74 }
75 return;
76
77 case EventType::kSendStatusChunk:
78 case EventType::kAddTransferHandler:
79 case EventType::kRemoveTransferHandler:
80 case EventType::kTerminate:
81 // These events are intended for the transfer thread and should never be
82 // forwarded through to a context.
83 PW_CRASH("Transfer context received a transfer thread event");
84 }
85 }
86
InitiateTransferAsClient()87 void Context::InitiateTransferAsClient() {
88 PW_DCHECK(active());
89
90 SetTimeout(chunk_timeout_);
91
92 PW_LOG_INFO("Starting transfer for resource %u",
93 static_cast<unsigned>(resource_id_));
94
95 // Receive transfers should prepare their initial parameters to be send in the
96 // initial chunk.
97 if (type() == TransferType::kReceive) {
98 UpdateTransferParameters();
99 }
100
101 if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
102 // Legacy transfers go straight into the data transfer phase without a
103 // handshake.
104 if (type() == TransferType::kReceive) {
105 SendTransferParameters(TransmitAction::kBegin);
106 } else {
107 SendInitialTransmitChunk();
108 }
109
110 LogTransferConfiguration();
111 return;
112 }
113
114 // In newer protocol versions, begin the initial transfer handshake.
115 Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
116 start_chunk.set_resource_id(resource_id_);
117
118 if (type() == TransferType::kReceive) {
119 // Parameters should still be set on the initial chunk for backwards
120 // compatibility if the server only supports the legacy protocol.
121 SetTransferParameters(start_chunk);
122 }
123
124 EncodeAndSendChunk(start_chunk);
125 }
126
StartTransferAsServer(const NewTransferEvent & new_transfer)127 bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
128 PW_LOG_INFO("Starting %s transfer %u for resource %u",
129 new_transfer.type == TransferType::kTransmit ? "read" : "write",
130 static_cast<unsigned>(new_transfer.session_id),
131 static_cast<unsigned>(new_transfer.resource_id));
132 LogTransferConfiguration();
133
134 flags_ |= kFlagsContactMade;
135
136 if (Status status = new_transfer.handler->Prepare(new_transfer.type);
137 !status.ok()) {
138 PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
139 static_cast<unsigned>(new_transfer.handler->id()),
140 status.code());
141
142 // As this failure occurs at the start of a transfer, no protocol version is
143 // yet negotiated and one must be set to send a response. It is okay to use
144 // the desired version here, as that comes from the client.
145 configured_protocol_version_ = desired_protocol_version_;
146
147 status = status.IsPermissionDenied() ? status : Status::DataLoss();
148 TerminateTransfer(status, /*with_resource_id=*/true);
149 return false;
150 }
151
152 // Initialize doesn't set the handler since it's specific to server transfers.
153 static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
154
155 // Server transfers use the stream provided by the handler rather than the
156 // stream included in the NewTransferEvent.
157 stream_ = &new_transfer.handler->stream();
158
159 return true;
160 }
161
SendInitialTransmitChunk()162 void Context::SendInitialTransmitChunk() {
163 // A transmitter begins a transfer by sending the ID of the resource to which
164 // it wishes to write.
165 Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
166 chunk.set_session_id(session_id_);
167
168 EncodeAndSendChunk(chunk);
169 }
170
UpdateTransferParameters()171 void Context::UpdateTransferParameters() {
172 size_t pending_bytes =
173 std::min(max_parameters_->pending_bytes(),
174 static_cast<uint32_t>(writer().ConservativeWriteLimit()));
175
176 window_size_ = pending_bytes;
177 window_end_offset_ = offset_ + pending_bytes;
178
179 max_chunk_size_bytes_ = MaxWriteChunkSize(
180 max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
181 }
182
SetTransferParameters(Chunk & parameters)183 void Context::SetTransferParameters(Chunk& parameters) {
184 parameters.set_window_end_offset(window_end_offset_)
185 .set_max_chunk_size_bytes(max_chunk_size_bytes_)
186 .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
187 .set_offset(offset_);
188 }
189
UpdateAndSendTransferParameters(TransmitAction action)190 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
191 UpdateTransferParameters();
192
193 PW_LOG_INFO("Transfer rate: %u B/s",
194 static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
195
196 return SendTransferParameters(action);
197 }
198
SendTransferParameters(TransmitAction action)199 void Context::SendTransferParameters(TransmitAction action) {
200 Chunk::Type type = Chunk::Type::kParametersRetransmit;
201
202 switch (action) {
203 case TransmitAction::kBegin:
204 type = Chunk::Type::kStart;
205 break;
206 case TransmitAction::kRetransmit:
207 type = Chunk::Type::kParametersRetransmit;
208 break;
209 case TransmitAction::kExtend:
210 type = Chunk::Type::kParametersContinue;
211 break;
212 }
213
214 Chunk parameters(configured_protocol_version_, type);
215 parameters.set_session_id(session_id_);
216 SetTransferParameters(parameters);
217
218 PW_LOG_DEBUG(
219 "Transfer %u sending transfer parameters: "
220 "offset=%u, window_end_offset=%u, max_chunk_size=%u",
221 static_cast<unsigned>(session_id_),
222 static_cast<unsigned>(offset_),
223 static_cast<unsigned>(window_end_offset_),
224 static_cast<unsigned>(max_chunk_size_bytes_));
225
226 EncodeAndSendChunk(parameters);
227 }
228
EncodeAndSendChunk(const Chunk & chunk)229 void Context::EncodeAndSendChunk(const Chunk& chunk) {
230 last_chunk_sent_ = chunk.type();
231
232 Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
233 if (!data.ok()) {
234 PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
235 static_cast<unsigned>(chunk.session_id()),
236 data.status().code());
237 if (active()) {
238 TerminateTransfer(Status::Internal());
239 }
240 return;
241 }
242
243 if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
244 PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
245 static_cast<unsigned>(chunk.session_id()),
246 status.code());
247 if (active()) {
248 TerminateTransfer(Status::Internal());
249 }
250 return;
251 }
252 }
253
Initialize(const NewTransferEvent & new_transfer)254 void Context::Initialize(const NewTransferEvent& new_transfer) {
255 PW_DCHECK(!active());
256
257 PW_DCHECK_INT_NE(new_transfer.protocol_version,
258 ProtocolVersion::kUnknown,
259 "Cannot start a transfer with an unknown protocol");
260
261 session_id_ = new_transfer.session_id;
262 resource_id_ = new_transfer.resource_id;
263 desired_protocol_version_ = new_transfer.protocol_version;
264 configured_protocol_version_ = ProtocolVersion::kUnknown;
265
266 flags_ = static_cast<uint8_t>(new_transfer.type);
267 transfer_state_ = TransferState::kWaiting;
268 retries_ = 0;
269 max_retries_ = new_transfer.max_retries;
270 lifetime_retries_ = 0;
271 max_lifetime_retries_ = new_transfer.max_lifetime_retries;
272
273 if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
274 // In a legacy transfer, there is no protocol negotiation stage.
275 // Automatically configure the context to run the legacy protocol and
276 // proceed to waiting for a chunk.
277 configured_protocol_version_ = ProtocolVersion::kLegacy;
278 transfer_state_ = TransferState::kWaiting;
279 } else {
280 transfer_state_ = TransferState::kInitiating;
281 }
282
283 rpc_writer_ = new_transfer.rpc_writer;
284 stream_ = new_transfer.stream;
285
286 offset_ = 0;
287 window_size_ = 0;
288 window_end_offset_ = 0;
289 max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
290
291 max_parameters_ = new_transfer.max_parameters;
292 thread_ = new_transfer.transfer_thread;
293
294 last_chunk_sent_ = Chunk::Type::kStart;
295 last_chunk_offset_ = 0;
296 chunk_timeout_ = new_transfer.timeout;
297 interchunk_delay_ = chrono::SystemClock::for_at_least(
298 std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
299 next_timeout_ = kNoTimeout;
300
301 transfer_rate_.Reset();
302 }
303
HandleChunkEvent(const ChunkEvent & event)304 void Context::HandleChunkEvent(const ChunkEvent& event) {
305 Result<Chunk> maybe_chunk =
306 Chunk::Parse(ConstByteSpan(event.data, event.size));
307 if (!maybe_chunk.ok()) {
308 return;
309 }
310
311 Chunk chunk = *maybe_chunk;
312
313 // Received some data. Reset the retry counter.
314 retries_ = 0;
315 flags_ |= kFlagsContactMade;
316
317 if (chunk.IsTerminatingChunk()) {
318 if (active()) {
319 HandleTermination(chunk.status().value());
320 } else {
321 PW_LOG_DEBUG("Got final status %d for completed transfer %d",
322 static_cast<int>(chunk.status().value().code()),
323 static_cast<int>(session_id_));
324 }
325 return;
326 }
327
328 if (type() == TransferType::kTransmit) {
329 HandleTransmitChunk(chunk);
330 } else {
331 HandleReceiveChunk(chunk);
332 }
333 }
334
PerformInitialHandshake(const Chunk & chunk)335 void Context::PerformInitialHandshake(const Chunk& chunk) {
336 switch (chunk.type()) {
337 // Initial packet sent from a client to a server.
338 case Chunk::Type::kStart: {
339 UpdateLocalProtocolConfigurationFromPeer(chunk);
340
341 // This cast is safe as we know we're running in a transfer server.
342 uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
343
344 Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
345 start_ack.set_session_id(session_id_).set_resource_id(resource_id);
346
347 EncodeAndSendChunk(start_ack);
348 break;
349 }
350
351 // Response packet sent from a server to a client. Contains the assigned
352 // session_id of the transfer.
353 case Chunk::Type::kStartAck: {
354 UpdateLocalProtocolConfigurationFromPeer(chunk);
355
356 // Accept the assigned session_id and tell the server that the transfer
357 // can begin.
358 session_id_ = chunk.session_id();
359 PW_LOG_DEBUG("Transfer for resource %u was assigned session ID %u",
360 static_cast<unsigned>(resource_id_),
361 static_cast<unsigned>(session_id_));
362
363 Chunk start_ack_confirmation(configured_protocol_version_,
364 Chunk::Type::kStartAckConfirmation);
365 start_ack_confirmation.set_session_id(session_id_);
366
367 if (type() == TransferType::kReceive) {
368 // In a receive transfer, tag the initial transfer parameters onto the
369 // confirmation chunk so that the server can immediately begin sending
370 // data.
371 UpdateTransferParameters();
372 SetTransferParameters(start_ack_confirmation);
373 }
374
375 set_transfer_state(TransferState::kWaiting);
376 EncodeAndSendChunk(start_ack_confirmation);
377 break;
378 }
379
380 // Confirmation sent by a client to a server of the configured transfer
381 // version and session ID. Completes the handshake and begins the actual
382 // data transfer.
383 case Chunk::Type::kStartAckConfirmation: {
384 set_transfer_state(TransferState::kWaiting);
385
386 if (type() == TransferType::kTransmit) {
387 HandleTransmitChunk(chunk);
388 } else {
389 HandleReceiveChunk(chunk);
390 }
391 break;
392 }
393
394 // If a non-handshake chunk is received during an INITIATING state, the
395 // transfer peer is running a legacy protocol version, which does not
396 // perform a handshake. End the handshake, revert to the legacy protocol,
397 // and process the chunk appropriately.
398 case Chunk::Type::kData:
399 case Chunk::Type::kParametersRetransmit:
400 case Chunk::Type::kParametersContinue:
401 // Update the local context's session ID in case it was expecting one to
402 // be assigned by the server.
403 session_id_ = chunk.session_id();
404
405 configured_protocol_version_ = ProtocolVersion::kLegacy;
406 set_transfer_state(TransferState::kWaiting);
407
408 PW_LOG_DEBUG(
409 "Transfer %u tried to start on protocol version %d, but peer only "
410 "supports legacy",
411 id_for_log(),
412 static_cast<int>(desired_protocol_version_));
413
414 if (type() == TransferType::kTransmit) {
415 HandleTransmitChunk(chunk);
416 } else {
417 HandleReceiveChunk(chunk);
418 }
419 break;
420
421 case Chunk::Type::kCompletion:
422 case Chunk::Type::kCompletionAck:
423 PW_CRASH(
424 "Transfer completion packets should be processed by "
425 "HandleChunkEvent()");
426 break;
427 }
428 }
429
UpdateLocalProtocolConfigurationFromPeer(const Chunk & chunk)430 void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
431 PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
432 static_cast<int>(desired_protocol_version_),
433 static_cast<int>(chunk.protocol_version()));
434
435 configured_protocol_version_ =
436 std::min(desired_protocol_version_, chunk.protocol_version());
437
438 PW_LOG_INFO("Transfer %u: using protocol version %d",
439 id_for_log(),
440 static_cast<int>(configured_protocol_version_));
441 }
442
HandleTransmitChunk(const Chunk & chunk)443 void Context::HandleTransmitChunk(const Chunk& chunk) {
444 switch (transfer_state_) {
445 case TransferState::kInactive:
446 case TransferState::kRecovery:
447 PW_CRASH("Never should handle chunk while inactive");
448
449 case TransferState::kCompleted:
450 // If the transfer has already completed and another chunk is received,
451 // tell the other end that the transfer is over.
452 //
453 // TODO(frolv): Final status chunks should be ACKed by the other end. When
454 // that is added, this case should be updated to check if the received
455 // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
456 // Otherwise, the final status should be re-sent.
457 if (!chunk.IsInitialChunk()) {
458 status_ = Status::FailedPrecondition();
459 }
460 SendFinalStatusChunk();
461 return;
462
463 case TransferState::kInitiating:
464 PerformInitialHandshake(chunk);
465 return;
466
467 case TransferState::kWaiting:
468 case TransferState::kTransmitting:
469 if (chunk.protocol_version() == configured_protocol_version_) {
470 HandleTransferParametersUpdate(chunk);
471 } else {
472 PW_LOG_ERROR(
473 "Transmit transfer %u was configured to use protocol version %d "
474 "but received a chunk with version %d",
475 id_for_log(),
476 static_cast<int>(configured_protocol_version_),
477 static_cast<int>(chunk.protocol_version()));
478 TerminateTransfer(Status::Internal());
479 }
480 return;
481
482 case TransferState::kTerminating:
483 HandleTerminatingChunk(chunk);
484 return;
485 }
486 }
487
HandleTransferParametersUpdate(const Chunk & chunk)488 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
489 bool retransmit = chunk.RequestsTransmissionFromOffset();
490
491 if (retransmit) {
492 // If the offsets don't match, attempt to seek on the reader. Not all
493 // readers support seeking; abort with UNIMPLEMENTED if this handler
494 // doesn't.
495 if (offset_ != chunk.offset()) {
496 if (Status seek_status = reader().Seek(chunk.offset());
497 !seek_status.ok()) {
498 PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
499 static_cast<unsigned>(session_id_),
500 static_cast<unsigned>(chunk.offset()),
501 seek_status.code());
502
503 // Remap status codes to return one of the following:
504 //
505 // INTERNAL: invalid seek, never should happen
506 // DATA_LOSS: the reader is in a bad state
507 // UNIMPLEMENTED: seeking is not supported
508 //
509 if (seek_status.IsOutOfRange()) {
510 seek_status = Status::Internal();
511 } else if (!seek_status.IsUnimplemented()) {
512 seek_status = Status::DataLoss();
513 }
514
515 TerminateTransfer(seek_status);
516 return;
517 }
518 }
519
520 offset_ = chunk.offset();
521 }
522
523 window_end_offset_ = chunk.window_end_offset();
524
525 if (chunk.max_chunk_size_bytes().has_value()) {
526 max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes().value(),
527 max_parameters_->max_chunk_size_bytes());
528 }
529
530 if (chunk.min_delay_microseconds().has_value()) {
531 interchunk_delay_ = chrono::SystemClock::for_at_least(
532 std::chrono::microseconds(chunk.min_delay_microseconds().value()));
533 }
534
535 PW_LOG_DEBUG(
536 "Transfer %u received parameters type=%s offset=%u window_end_offset=%u",
537 static_cast<unsigned>(session_id_),
538 retransmit ? "RETRANSMIT" : "CONTINUE",
539 static_cast<unsigned>(chunk.offset()),
540 static_cast<unsigned>(window_end_offset_));
541
542 // Parsed all of the parameters; start sending the window.
543 set_transfer_state(TransferState::kTransmitting);
544
545 TransmitNextChunk(retransmit);
546 }
547
TransmitNextChunk(bool retransmit_requested)548 void Context::TransmitNextChunk(bool retransmit_requested) {
549 Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
550 chunk.set_session_id(session_id_);
551 chunk.set_offset(offset_);
552
553 // Reserve space for the data proto field overhead and use the remainder of
554 // the buffer for the chunk data.
555 size_t reserved_size =
556 chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
557
558 ByteSpan buffer = thread_->encode_buffer();
559
560 ByteSpan data_buffer = buffer.subspan(reserved_size);
561 size_t max_bytes_to_send =
562 std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
563
564 if (max_bytes_to_send < data_buffer.size()) {
565 data_buffer = data_buffer.first(max_bytes_to_send);
566 }
567
568 Result<ByteSpan> data = reader().Read(data_buffer);
569 if (data.status().IsOutOfRange()) {
570 // No more data to read.
571 chunk.set_remaining_bytes(0);
572 window_end_offset_ = offset_;
573
574 PW_LOG_DEBUG("Transfer %u sending final chunk with remaining_bytes=0",
575 static_cast<unsigned>(session_id_));
576 } else if (data.ok()) {
577 if (offset_ == window_end_offset_) {
578 if (retransmit_requested) {
579 PW_LOG_DEBUG(
580 "Transfer %u: received an empty retransmit request, but there is "
581 "still data to send; aborting with RESOURCE_EXHAUSTED",
582 id_for_log());
583 TerminateTransfer(Status::ResourceExhausted());
584 } else {
585 PW_LOG_DEBUG(
586 "Transfer %u: ignoring continuation packet for transfer window "
587 "that has already been sent",
588 id_for_log());
589 SetTimeout(chunk_timeout_);
590 }
591 return; // No data was requested, so there is nothing else to do.
592 }
593
594 PW_LOG_DEBUG("Transfer %u sending chunk offset=%u size=%u",
595 static_cast<unsigned>(session_id_),
596 static_cast<unsigned>(offset_),
597 static_cast<unsigned>(data.value().size()));
598
599 chunk.set_payload(data.value());
600 last_chunk_offset_ = offset_;
601 offset_ += data.value().size();
602 } else {
603 PW_LOG_ERROR("Transfer %u Read() failed with status %u",
604 static_cast<unsigned>(session_id_),
605 data.status().code());
606 TerminateTransfer(Status::DataLoss());
607 return;
608 }
609
610 Result<ConstByteSpan> encoded_chunk = chunk.Encode(buffer);
611 if (!encoded_chunk.ok()) {
612 PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
613 static_cast<unsigned>(session_id_));
614 TerminateTransfer(Status::Internal());
615 return;
616 }
617
618 if (const Status status = rpc_writer_->Write(*encoded_chunk); !status.ok()) {
619 PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
620 static_cast<unsigned>(session_id_),
621 status.code());
622 TerminateTransfer(Status::DataLoss());
623 return;
624 }
625
626 last_chunk_sent_ = chunk.type();
627 flags_ |= kFlagsDataSent;
628
629 if (offset_ == window_end_offset_) {
630 // Sent all requested data. Must now wait for next parameters from the
631 // receiver.
632 set_transfer_state(TransferState::kWaiting);
633 SetTimeout(chunk_timeout_);
634 } else {
635 // More data is to be sent. Set a timeout to send the next chunk following
636 // the chunk delay.
637 SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
638 }
639 }
640
HandleReceiveChunk(const Chunk & chunk)641 void Context::HandleReceiveChunk(const Chunk& chunk) {
642 if (transfer_state_ == TransferState::kInitiating) {
643 PerformInitialHandshake(chunk);
644 return;
645 }
646
647 if (chunk.protocol_version() != configured_protocol_version_) {
648 PW_LOG_ERROR(
649 "Receive transfer %u was configured to use protocol version %d "
650 "but received a chunk with version %d",
651 id_for_log(),
652 static_cast<int>(configured_protocol_version_),
653 static_cast<int>(chunk.protocol_version()));
654 TerminateTransfer(Status::Internal());
655 return;
656 }
657
658 switch (transfer_state_) {
659 case TransferState::kInactive:
660 case TransferState::kTransmitting:
661 case TransferState::kInitiating:
662 PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
663 static_cast<int>(transfer_state_));
664
665 case TransferState::kCompleted:
666 // If the transfer has already completed and another chunk is received,
667 // re-send the final status chunk.
668 //
669 // TODO(frolv): Final status chunks should be ACKed by the other end. When
670 // that is added, this case should be updated to check if the received
671 // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
672 // Otherwise, the final status should be re-sent.
673 SendFinalStatusChunk();
674 return;
675
676 case TransferState::kRecovery:
677 if (chunk.offset() != offset_) {
678 if (last_chunk_offset_ == chunk.offset()) {
679 PW_LOG_DEBUG(
680 "Transfer %u received repeated offset %u; retry detected, "
681 "resending transfer parameters",
682 static_cast<unsigned>(session_id_),
683 static_cast<unsigned>(chunk.offset()));
684
685 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
686 if (DataTransferComplete()) {
687 return;
688 }
689 PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
690 static_cast<unsigned>(session_id_),
691 static_cast<unsigned>(offset_),
692 static_cast<unsigned>(chunk.offset()));
693 }
694
695 last_chunk_offset_ = chunk.offset();
696 SetTimeout(chunk_timeout_);
697 return;
698 }
699
700 PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
701 static_cast<unsigned>(session_id_),
702 static_cast<unsigned>(offset_));
703 set_transfer_state(TransferState::kWaiting);
704
705 // The correct chunk was received; process it normally.
706 [[fallthrough]];
707 case TransferState::kWaiting:
708 HandleReceivedData(chunk);
709 return;
710
711 case TransferState::kTerminating:
712 HandleTerminatingChunk(chunk);
713 return;
714 }
715 }
716
HandleReceivedData(const Chunk & chunk)717 void Context::HandleReceivedData(const Chunk& chunk) {
718 if (chunk.offset() != offset_) {
719 // Bad offset; reset pending_bytes to send another parameters chunk.
720 PW_LOG_DEBUG(
721 "Transfer %u expected offset %u, received %u; entering recovery state",
722 static_cast<unsigned>(session_id_),
723 static_cast<unsigned>(offset_),
724 static_cast<unsigned>(chunk.offset()));
725
726 set_transfer_state(TransferState::kRecovery);
727 SetTimeout(chunk_timeout_);
728
729 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
730 return;
731 }
732
733 if (chunk.offset() + chunk.payload().size() > window_end_offset_) {
734 // End the transfer, as this indicates a bug with the client implementation
735 // where it doesn't respect pending_bytes. Trying to recover from here
736 // could potentially result in an infinite transfer loop.
737 PW_LOG_ERROR(
738 "Transfer %u received more data than what was requested (%u received "
739 "for %u pending); terminating transfer.",
740 id_for_log(),
741 static_cast<unsigned>(chunk.payload().size()),
742 static_cast<unsigned>(window_end_offset_ - offset_));
743 TerminateTransfer(Status::Internal());
744 return;
745 }
746
747 // Update the last offset seen so that retries can be detected.
748 last_chunk_offset_ = chunk.offset();
749
750 // Write staged data from the buffer to the stream.
751 if (chunk.has_payload()) {
752 if (Status status = writer().Write(chunk.payload()); !status.ok()) {
753 PW_LOG_ERROR(
754 "Transfer %u write of %u B chunk failed with status %u; aborting "
755 "with DATA_LOSS",
756 static_cast<unsigned>(session_id_),
757 static_cast<unsigned>(chunk.payload().size()),
758 status.code());
759 TerminateTransfer(Status::DataLoss());
760 return;
761 }
762
763 transfer_rate_.Update(chunk.payload().size());
764 }
765
766 // When the client sets remaining_bytes to 0, it indicates completion of the
767 // transfer. Acknowledge the completion through a status chunk and clean up.
768 if (chunk.IsFinalTransmitChunk()) {
769 TerminateTransfer(OkStatus());
770 return;
771 }
772
773 // Update the transfer state.
774 offset_ += chunk.payload().size();
775
776 if (chunk.window_end_offset() != 0) {
777 if (chunk.window_end_offset() < offset_) {
778 PW_LOG_ERROR(
779 "Transfer %u got invalid end offset of %u (current offset %u)",
780 id_for_log(),
781 static_cast<unsigned>(chunk.window_end_offset()),
782 static_cast<unsigned>(offset_));
783 TerminateTransfer(Status::Internal());
784 return;
785 }
786
787 if (chunk.window_end_offset() > window_end_offset_) {
788 // A transmitter should never send a larger end offset than what the
789 // receiver has advertised. If this occurs, there is a bug in the
790 // transmitter implementation. Terminate the transfer.
791 PW_LOG_ERROR(
792 "Transfer %u transmitter sent invalid end offset of %u, "
793 "greater than receiver offset %u",
794 id_for_log(),
795 static_cast<unsigned>(chunk.window_end_offset()),
796 static_cast<unsigned>(window_end_offset_));
797 TerminateTransfer(Status::Internal());
798 return;
799 }
800
801 window_end_offset_ = chunk.window_end_offset();
802 }
803
804 SetTimeout(chunk_timeout_);
805
806 if (offset_ == window_end_offset_) {
807 // Received all pending data. Advance the transfer parameters.
808 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
809 return;
810 }
811
812 // Once the transmitter has sent a sufficient amount of data, try to extend
813 // the window to allow it to continue sending data without blocking.
814 uint32_t remaining_window_size = window_end_offset_ - offset_;
815 bool extend_window = remaining_window_size <=
816 window_size_ / max_parameters_->extend_window_divisor();
817
818 if (extend_window) {
819 UpdateAndSendTransferParameters(TransmitAction::kExtend);
820 return;
821 }
822 }
823
HandleTerminatingChunk(const Chunk & chunk)824 void Context::HandleTerminatingChunk(const Chunk& chunk) {
825 switch (chunk.type()) {
826 case Chunk::Type::kCompletion:
827 PW_CRASH("Completion chunks should be processed by HandleChunkEvent()");
828
829 case Chunk::Type::kCompletionAck:
830 PW_LOG_INFO(
831 "Transfer %u completed with status %u", id_for_log(), status_.code());
832 set_transfer_state(TransferState::kInactive);
833 break;
834
835 case Chunk::Type::kData:
836 case Chunk::Type::kStart:
837 case Chunk::Type::kParametersRetransmit:
838 case Chunk::Type::kParametersContinue:
839 case Chunk::Type::kStartAck:
840 case Chunk::Type::kStartAckConfirmation:
841 // If a non-completion chunk is received in a TERMINATING state, re-send
842 // the transfer's completion chunk to the peer.
843 EncodeAndSendChunk(
844 Chunk::Final(configured_protocol_version_, session_id_, status_));
845 break;
846 }
847 }
848
TerminateTransfer(Status status,bool with_resource_id)849 void Context::TerminateTransfer(Status status, bool with_resource_id) {
850 if (transfer_state_ == TransferState::kTerminating ||
851 transfer_state_ == TransferState::kCompleted) {
852 // Transfer has already been terminated; no need to do it again.
853 return;
854 }
855
856 Finish(status);
857
858 PW_LOG_INFO("Transfer %u terminating with status %u",
859 static_cast<unsigned>(session_id_),
860 status.code());
861
862 if (ShouldSkipCompletionHandshake()) {
863 set_transfer_state(TransferState::kCompleted);
864 } else {
865 set_transfer_state(TransferState::kTerminating);
866 SetTimeout(chunk_timeout_);
867 }
868
869 // Don't send a final chunk if the other end of the transfer has not yet
870 // made contact, as there is no one to notify.
871 if ((flags_ & kFlagsContactMade) == kFlagsContactMade) {
872 SendFinalStatusChunk(with_resource_id);
873 }
874 }
875
HandleTermination(Status status)876 void Context::HandleTermination(Status status) {
877 Finish(status);
878
879 PW_LOG_INFO("Transfer %u completed with status %u",
880 static_cast<unsigned>(session_id_),
881 status.code());
882
883 if (ShouldSkipCompletionHandshake()) {
884 set_transfer_state(TransferState::kCompleted);
885 } else {
886 EncodeAndSendChunk(
887 Chunk(configured_protocol_version_, Chunk::Type::kCompletionAck)
888 .set_session_id(session_id_));
889
890 set_transfer_state(TransferState::kInactive);
891 }
892 }
893
SendFinalStatusChunk(bool with_resource_id)894 void Context::SendFinalStatusChunk(bool with_resource_id) {
895 PW_DCHECK(transfer_state_ == TransferState::kCompleted ||
896 transfer_state_ == TransferState::kTerminating);
897
898 PW_LOG_DEBUG("Sending final chunk for transfer %u with status %u",
899 static_cast<unsigned>(session_id_),
900 status_.code());
901
902 Chunk chunk =
903 Chunk::Final(configured_protocol_version_, session_id_, status_);
904 if (with_resource_id) {
905 chunk.set_resource_id(resource_id_);
906 }
907 EncodeAndSendChunk(chunk);
908 }
909
Finish(Status status)910 void Context::Finish(Status status) {
911 PW_DCHECK(active());
912
913 status.Update(FinalCleanup(status));
914 status_ = status;
915
916 SetTimeout(kFinalChunkAckTimeout);
917 }
918
SetTimeout(chrono::SystemClock::duration timeout)919 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
920 next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
921 }
922
HandleTimeout()923 void Context::HandleTimeout() {
924 ClearTimeout();
925
926 switch (transfer_state_) {
927 case TransferState::kCompleted:
928 // A timeout occurring in a completed state indicates that the other side
929 // never ACKed the final status packet. Reset the context to inactive.
930 set_transfer_state(TransferState::kInactive);
931 return;
932
933 case TransferState::kTransmitting:
934 // A timeout occurring in a TRANSMITTING state indicates that the transfer
935 // has waited for its inter-chunk delay and should transmit its next
936 // chunk.
937 TransmitNextChunk(/*retransmit_requested=*/false);
938 break;
939
940 case TransferState::kInitiating:
941 case TransferState::kWaiting:
942 case TransferState::kRecovery:
943 case TransferState::kTerminating:
944 // A timeout occurring in a transfer or handshake state indicates that no
945 // chunk has been received from the other side. The transfer should retry
946 // its previous operation.
947 SetTimeout(chunk_timeout_); // Retry() clears the timeout if it fails
948 Retry();
949 break;
950
951 case TransferState::kInactive:
952 PW_LOG_ERROR("Timeout occurred in INACTIVE state");
953 return;
954 }
955 }
956
Retry()957 void Context::Retry() {
958 if (retries_ == max_retries_ || lifetime_retries_ == max_lifetime_retries_) {
959 PW_LOG_ERROR(
960 "Transfer %u failed to receive a chunk after %u retries (lifetime %u).",
961 id_for_log(),
962 static_cast<unsigned>(retries_),
963 static_cast<unsigned>(lifetime_retries_));
964 PW_LOG_ERROR("Canceling transfer.");
965
966 if (transfer_state_ == TransferState::kTerminating) {
967 // Timeouts occurring in a TERMINATING state indicate that the completion
968 // chunk was never ACKed. Simply clean up the transfer context.
969 set_transfer_state(TransferState::kInactive);
970 } else {
971 TerminateTransfer(Status::DeadlineExceeded());
972 }
973 return;
974 }
975
976 ++retries_;
977 ++lifetime_retries_;
978
979 if (transfer_state_ == TransferState::kInitiating ||
980 last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
981 RetryHandshake();
982 return;
983 }
984
985 if (transfer_state_ == TransferState::kTerminating) {
986 EncodeAndSendChunk(
987 Chunk::Final(configured_protocol_version_, session_id_, status_));
988 return;
989 }
990
991 if (type() == TransferType::kReceive) {
992 // Resend the most recent transfer parameters.
993 PW_LOG_DEBUG(
994 "Receive transfer %u timed out waiting for chunk; resending parameters",
995 static_cast<unsigned>(session_id_));
996
997 SendTransferParameters(TransmitAction::kRetransmit);
998 return;
999 }
1000
1001 // In a transmit, if a data chunk has not yet been sent, the initial transfer
1002 // parameters did not arrive from the receiver. Resend the initial chunk.
1003 if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
1004 PW_LOG_DEBUG(
1005 "Transmit transfer %u timed out waiting for initial parameters",
1006 static_cast<unsigned>(session_id_));
1007 SendInitialTransmitChunk();
1008 return;
1009 }
1010
1011 // Otherwise, resend the most recent chunk. If the reader doesn't support
1012 // seeking, this isn't possible, so just terminate the transfer immediately.
1013 if (!reader().Seek(last_chunk_offset_).ok()) {
1014 PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
1015 id_for_log());
1016 PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
1017 TerminateTransfer(Status::DeadlineExceeded());
1018 return;
1019 }
1020
1021 // Rewind the transfer position and resend the chunk.
1022 offset_ = last_chunk_offset_;
1023
1024 TransmitNextChunk(/*retransmit_requested=*/false);
1025 }
1026
RetryHandshake()1027 void Context::RetryHandshake() {
1028 Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
1029
1030 switch (last_chunk_sent_) {
1031 case Chunk::Type::kStart:
1032 // No protocol version is yet configured at the time of sending the start
1033 // chunk, so we use the client's desired version instead.
1034 retry_chunk.set_protocol_version(desired_protocol_version_)
1035 .set_resource_id(resource_id_);
1036 if (type() == TransferType::kReceive) {
1037 SetTransferParameters(retry_chunk);
1038 }
1039 break;
1040
1041 case Chunk::Type::kStartAck:
1042 retry_chunk.set_session_id(session_id_)
1043 .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
1044 break;
1045
1046 case Chunk::Type::kStartAckConfirmation:
1047 retry_chunk.set_session_id(session_id_);
1048 if (type() == TransferType::kReceive) {
1049 SetTransferParameters(retry_chunk);
1050 }
1051 break;
1052
1053 case Chunk::Type::kData:
1054 case Chunk::Type::kParametersRetransmit:
1055 case Chunk::Type::kParametersContinue:
1056 case Chunk::Type::kCompletion:
1057 case Chunk::Type::kCompletionAck:
1058 PW_CRASH("Should not RetryHandshake() when not in handshake phase");
1059 }
1060
1061 EncodeAndSendChunk(retry_chunk);
1062 }
1063
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const1064 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
1065 uint32_t channel_id) const {
1066 // Start with the user-provided maximum chunk size, which should be the usable
1067 // payload length on the RPC ingress path after any transport overhead.
1068 ptrdiff_t max_size = max_chunk_size_bytes;
1069
1070 // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
1071 //
1072 // type: 1 byte key, 1 byte value (CLIENT_STREAM)
1073 // channel_id: 1 byte key, varint value (calculate from stream)
1074 // service_id: 1 byte key, 4 byte value
1075 // method_id: 1 byte key, 4 byte value
1076 // payload: 1 byte key, varint length (remaining space)
1077 // status: 0 bytes (not set in stream packets)
1078 //
1079 // TOTAL: 14 bytes + encoded channel_id size + encoded payload length
1080 //
1081 max_size -= 14;
1082 max_size -= varint::EncodedSize(channel_id);
1083 max_size -= varint::EncodedSize(max_size);
1084
1085 // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
1086 // overhead calculation will be moved into an RPC helper to avoid having
1087 // pw_transfer depend on RPC internals.
1088 max_size -= 5;
1089
1090 // Subtract the transfer service overhead for a client write chunk
1091 // (pw_transfer/transfer.proto).
1092 //
1093 // session_id: 1 byte key, varint value (calculate)
1094 // offset: 1 byte key, varint value (calculate)
1095 // data: 1 byte key, varint length (remaining space)
1096 //
1097 // TOTAL: 3 + encoded session_id + encoded offset + encoded data length
1098 //
1099 max_size -= 3;
1100 max_size -= varint::EncodedSize(session_id_);
1101 max_size -= varint::EncodedSize(window_end_offset_);
1102 max_size -= varint::EncodedSize(max_size);
1103
1104 // A resulting value of zero (or less) renders write transfers unusable, as
1105 // there is no space to send any payload. This should be considered a
1106 // programmer error in the transfer service setup.
1107 PW_CHECK_INT_GT(
1108 max_size,
1109 0,
1110 "Transfer service maximum chunk size is too small to fit a payload. "
1111 "Increase max_chunk_size_bytes to support write transfers.");
1112
1113 return max_size;
1114 }
1115
LogTransferConfiguration()1116 void Context::LogTransferConfiguration() {
1117 PW_LOG_DEBUG(
1118 "Local transfer timing configuration: "
1119 "chunk_timeout=%ums, max_retries=%u, interchunk_delay=%uus",
1120 static_cast<unsigned>(
1121 std::chrono::ceil<std::chrono::milliseconds>(chunk_timeout_).count()),
1122 static_cast<unsigned>(max_retries_),
1123 static_cast<unsigned>(
1124 std::chrono::ceil<std::chrono::microseconds>(interchunk_delay_)
1125 .count()));
1126
1127 PW_LOG_DEBUG(
1128 "Local transfer windowing configuration: "
1129 "pending_bytes=%u, extend_window_divisor=%u, max_chunk_size_bytes=%u",
1130 static_cast<unsigned>(max_parameters_->pending_bytes()),
1131 static_cast<unsigned>(max_parameters_->extend_window_divisor()),
1132 static_cast<unsigned>(max_parameters_->max_chunk_size_bytes()));
1133 }
1134
1135 } // namespace pw::transfer::internal
1136