1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16 #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17
18 #include "pw_transfer/internal/context.h"
19
20 #include <chrono>
21 #include <limits>
22
23 #include "pw_assert/check.h"
24 #include "pw_chrono/system_clock.h"
25 #include "pw_log/log.h"
26 #include "pw_log/rate_limited.h"
27 #include "pw_preprocessor/compiler.h"
28 #include "pw_protobuf/serialized_size.h"
29 #include "pw_transfer/internal/config.h"
30 #include "pw_transfer/transfer.pwpb.h"
31 #include "pw_transfer/transfer_thread.h"
32 #include "pw_varint/varint.h"
33
34 namespace pw::transfer::internal {
35
HandleEvent(const Event & event)36 void Context::HandleEvent(const Event& event) {
37 switch (event.type) {
38 case EventType::kNewClientTransfer:
39 case EventType::kNewServerTransfer: {
40 if (active()) {
41 if (event.type == EventType::kNewServerTransfer &&
42 event.new_transfer.session_id == session_id_ &&
43 last_chunk_sent_ == Chunk::Type::kStartAck) {
44 // The client is retrying its initial chunk as the response may not
45 // have made it back. Re-send the handshake response without going
46 // through handler reinitialization.
47 RetryHandshake();
48 return;
49 }
50 Abort(Status::Aborted());
51 }
52
53 Initialize(event.new_transfer);
54
55 if (event.type == EventType::kNewClientTransfer) {
56 InitiateTransferAsClient();
57 } else {
58 if (StartTransferAsServer(event.new_transfer)) {
59 // TODO(frolv): This should probably be restructured.
60 HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
61 .match_resource_id = false, // Unused.
62 .data = event.new_transfer.raw_chunk_data,
63 .size = event.new_transfer.raw_chunk_size});
64 }
65 }
66 return;
67 }
68
69 case EventType::kClientChunk:
70 case EventType::kServerChunk:
71 PW_CHECK(initialized());
72 HandleChunkEvent(event.chunk);
73 return;
74
75 case EventType::kClientTimeout:
76 case EventType::kServerTimeout:
77 HandleTimeout();
78 return;
79
80 case EventType::kClientEndTransfer:
81 case EventType::kServerEndTransfer:
82 if (active()) {
83 if (event.end_transfer.send_status_chunk) {
84 TerminateTransfer(event.end_transfer.status);
85 } else {
86 Abort(event.end_transfer.status);
87 }
88 }
89 return;
90
91 case EventType::kSendStatusChunk:
92 case EventType::kAddTransferHandler:
93 case EventType::kRemoveTransferHandler:
94 case EventType::kSetStream:
95 case EventType::kTerminate:
96 case EventType::kUpdateClientTransfer:
97 case EventType::kGetResourceStatus:
98 // These events are intended for the transfer thread and should never be
99 // forwarded through to a context.
100 PW_CRASH("Transfer context received a transfer thread event");
101 }
102 }
103
InitiateTransferAsClient()104 void Context::InitiateTransferAsClient() {
105 PW_DCHECK(active());
106
107 SetTimeout(initial_chunk_timeout_);
108
109 PW_LOG_INFO("Starting transfer for resource %u",
110 static_cast<unsigned>(resource_id_));
111
112 // Receive transfers should prepare their initial parameters to be send in the
113 // initial chunk.
114 if (type() == TransferType::kReceive) {
115 UpdateTransferParameters(TransmitAction::kBegin);
116 }
117
118 if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
119 // Legacy transfers go straight into the data transfer phase without a
120 // handshake.
121 if (type() == TransferType::kReceive) {
122 SendTransferParameters(TransmitAction::kBegin);
123 } else {
124 SendInitialLegacyTransmitChunk();
125 }
126
127 LogTransferConfiguration();
128 return;
129 }
130
131 // In newer protocol versions, begin the initial transfer handshake.
132 Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
133 start_chunk.set_desired_session_id(session_id_);
134 start_chunk.set_resource_id(resource_id_);
135 start_chunk.set_initial_offset(offset_);
136
137 if (type() == TransferType::kReceive) {
138 // Parameters should still be set on the initial chunk for backwards
139 // compatibility if the server only supports the legacy protocol.
140 SetTransferParameters(start_chunk);
141 }
142
143 EncodeAndSendChunk(start_chunk);
144 }
145
StartTransferAsServer(const NewTransferEvent & new_transfer)146 bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
147 PW_LOG_INFO("Starting %s transfer %u for resource %u with offset %u",
148 new_transfer.type == TransferType::kTransmit ? "read" : "write",
149 static_cast<unsigned>(new_transfer.session_id),
150 static_cast<unsigned>(new_transfer.resource_id),
151 static_cast<unsigned>(new_transfer.initial_offset));
152 LogTransferConfiguration();
153
154 flags_ |= kFlagsContactMade;
155
156 if (Status status = new_transfer.handler->Prepare(
157 new_transfer.type, new_transfer.initial_offset);
158 !status.ok()) {
159 PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
160 static_cast<unsigned>(new_transfer.handler->id()),
161 status.code());
162
163 // As this failure occurs at the start of a transfer, no protocol version is
164 // yet negotiated and one must be set to send a response. It is okay to use
165 // the desired version here, as that comes from the client.
166 configured_protocol_version_ = desired_protocol_version_;
167
168 status = (status.IsPermissionDenied() || status.IsUnimplemented() ||
169 status.IsResourceExhausted())
170 ? status
171 : Status::DataLoss();
172 TerminateTransfer(status, /*with_resource_id=*/true);
173 return false;
174 }
175
176 // Initialize doesn't set the handler since it's specific to server transfers.
177 static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
178
179 // Server transfers use the stream provided by the handler rather than the
180 // stream included in the NewTransferEvent.
181 stream_ = &new_transfer.handler->stream();
182
183 return true;
184 }
185
SendInitialLegacyTransmitChunk()186 void Context::SendInitialLegacyTransmitChunk() {
187 // A transmitter begins a transfer by sending the ID of the resource to which
188 // it wishes to write.
189 Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
190 chunk.set_session_id(resource_id_);
191
192 EncodeAndSendChunk(chunk);
193 }
194
UpdateTransferParameters(TransmitAction action)195 void Context::UpdateTransferParameters(TransmitAction action) {
196 max_chunk_size_bytes_ = MaxWriteChunkSize(
197 max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
198 uint32_t window_size = 0;
199
200 if (max_chunk_size_bytes_ > max_parameters_->max_window_size_bytes()) {
201 window_size =
202 std::min(max_parameters_->max_window_size_bytes(),
203 static_cast<uint32_t>(writer().ConservativeWriteLimit()));
204 } else {
205 // Adjust the window size based on the latest event in the transfer.
206 switch (action) {
207 case TransmitAction::kBegin:
208 case TransmitAction::kFirstParameters:
209 // A transfer always begins with a window size of one chunk, set during
210 // initialization. No further handling is required.
211 break;
212
213 case TransmitAction::kExtend:
214 // Window was received successfully without packet loss and should grow.
215 // Double the window size during slow start, or increase it by a single
216 // chunk in congestion avoidance.
217 if (transmit_phase_ == TransmitPhase::kCongestionAvoidance) {
218 window_size_multiplier_ += 1;
219 } else {
220 window_size_multiplier_ *= 2;
221 }
222
223 // The window size can never exceed the user-specified maximum bytes. If
224 // it does, reduce the multiplier to the largest size that fits.
225 if (window_size_multiplier_ * max_chunk_size_bytes_ >
226 max_parameters_->max_window_size_bytes()) {
227 window_size_multiplier_ =
228 max_parameters_->max_window_size_bytes() / max_chunk_size_bytes_;
229 }
230 break;
231
232 case TransmitAction::kRetransmit:
233 // A packet was lost: shrink the window size. Additionally, after the
234 // first packet loss, transition from the slow start to the congestion
235 // avoidance phase of the transfer.
236 if (transmit_phase_ == TransmitPhase::kSlowStart) {
237 transmit_phase_ = TransmitPhase::kCongestionAvoidance;
238 }
239 window_size_multiplier_ =
240 std::max(window_size_multiplier_ / static_cast<uint32_t>(2),
241 static_cast<uint32_t>(1));
242 break;
243 }
244
245 window_size =
246 std::min({window_size_multiplier_ * max_chunk_size_bytes_,
247 max_parameters_->max_window_size_bytes(),
248 static_cast<uint32_t>(writer().ConservativeWriteLimit())});
249 }
250
251 window_size_ = window_size;
252 window_end_offset_ = offset_ + window_size;
253 }
254
SetTransferParameters(Chunk & parameters)255 void Context::SetTransferParameters(Chunk& parameters) {
256 parameters.set_window_end_offset(window_end_offset_)
257 .set_max_chunk_size_bytes(max_chunk_size_bytes_)
258 .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
259 .set_offset(offset_);
260 }
261
UpdateAndSendTransferParameters(TransmitAction action)262 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
263 UpdateTransferParameters(action);
264
265 return SendTransferParameters(action);
266 }
267
SendTransferParameters(TransmitAction action)268 void Context::SendTransferParameters(TransmitAction action) {
269 Chunk::Type type = Chunk::Type::kParametersRetransmit;
270
271 switch (action) {
272 case TransmitAction::kBegin:
273 type = Chunk::Type::kStart;
274 break;
275 case TransmitAction::kFirstParameters:
276 case TransmitAction::kRetransmit:
277 type = Chunk::Type::kParametersRetransmit;
278 break;
279 case TransmitAction::kExtend:
280 type = Chunk::Type::kParametersContinue;
281 break;
282 }
283
284 Chunk parameters(configured_protocol_version_, type);
285 parameters.set_session_id(session_id_);
286 SetTransferParameters(parameters);
287
288 PW_LOG_EVERY_N_DURATION(
289 PW_LOG_LEVEL_INFO,
290 log_rate_limit_,
291 "Transfer rate: %u B/s",
292 static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
293
294 PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_INFO,
295 log_rate_limit_,
296 "Transfer %u sending transfer parameters: "
297 "offset=%u, window_end_offset=%u, max_chunk_size=%u",
298 static_cast<unsigned>(session_id_),
299 static_cast<unsigned>(offset_),
300 static_cast<unsigned>(window_end_offset_),
301 static_cast<unsigned>(max_chunk_size_bytes_));
302
303 if (log_chunks_before_rate_limit_ > 0) {
304 log_chunks_before_rate_limit_--;
305
306 if (log_chunks_before_rate_limit_ == 0) {
307 log_rate_limit_ = log_rate_limit_cfg_;
308 }
309 }
310
311 EncodeAndSendChunk(parameters);
312 }
313
EncodeAndSendChunk(const Chunk & chunk)314 void Context::EncodeAndSendChunk(const Chunk& chunk) {
315 last_chunk_sent_ = chunk.type();
316
317 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
318 if ((chunk.remaining_bytes().has_value() &&
319 chunk.remaining_bytes().value() == 0) ||
320 (chunk.type() != Chunk::Type::kData &&
321 chunk.type() != Chunk::Type::kParametersContinue)) {
322 chunk.LogChunk(false, pw::chrono::SystemClock::duration::zero());
323 }
324 #endif
325
326 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
327 if (chunk.type() == Chunk::Type::kData ||
328 chunk.type() == Chunk::Type::kParametersContinue) {
329 chunk.LogChunk(false, log_rate_limit_);
330 }
331 #endif
332
333 Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
334 if (!data.ok()) {
335 PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
336 static_cast<unsigned>(chunk.session_id()),
337 data.status().code());
338 if (active()) {
339 TerminateTransfer(Status::Internal());
340 }
341 return;
342 }
343
344 if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
345 PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
346 static_cast<unsigned>(chunk.session_id()),
347 status.code());
348 if (active()) {
349 TerminateTransfer(Status::Internal());
350 }
351 return;
352 }
353 }
354
Initialize(const NewTransferEvent & new_transfer)355 void Context::Initialize(const NewTransferEvent& new_transfer) {
356 PW_DCHECK(!active());
357
358 PW_DCHECK_INT_NE(new_transfer.protocol_version,
359 ProtocolVersion::kUnknown,
360 "Cannot start a transfer with an unknown protocol");
361
362 session_id_ = new_transfer.session_id;
363 resource_id_ = new_transfer.resource_id;
364 desired_protocol_version_ = new_transfer.protocol_version;
365 configured_protocol_version_ = ProtocolVersion::kUnknown;
366
367 flags_ = static_cast<uint8_t>(new_transfer.type);
368 transfer_state_ = TransferState::kWaiting;
369 retries_ = 0;
370 max_retries_ = new_transfer.max_retries;
371 lifetime_retries_ = 0;
372 max_lifetime_retries_ = new_transfer.max_lifetime_retries;
373
374 if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
375 // In a legacy transfer, there is no protocol negotiation stage.
376 // Automatically configure the context to run the legacy protocol and
377 // proceed to waiting for a chunk.
378 configured_protocol_version_ = ProtocolVersion::kLegacy;
379 transfer_state_ = TransferState::kWaiting;
380 } else {
381 transfer_state_ = TransferState::kInitiating;
382 }
383
384 rpc_writer_ = new_transfer.rpc_writer;
385 stream_ = new_transfer.stream;
386
387 offset_ = new_transfer.initial_offset;
388 initial_offset_ = new_transfer.initial_offset;
389 window_size_ = 0;
390 window_end_offset_ = 0;
391 max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
392
393 window_size_multiplier_ = 1;
394 transmit_phase_ = TransmitPhase::kSlowStart;
395
396 max_parameters_ = new_transfer.max_parameters;
397 thread_ = new_transfer.transfer_thread;
398
399 last_chunk_sent_ = Chunk::Type::kStart;
400 last_chunk_offset_ = 0;
401 chunk_timeout_ = new_transfer.timeout;
402 initial_chunk_timeout_ = new_transfer.initial_timeout;
403 interchunk_delay_ = chrono::SystemClock::for_at_least(
404 std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
405 next_timeout_ = kNoTimeout;
406 log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
407
408 transfer_rate_.Reset();
409 }
410
HandleChunkEvent(const ChunkEvent & event)411 void Context::HandleChunkEvent(const ChunkEvent& event) {
412 Result<Chunk> maybe_chunk =
413 Chunk::Parse(ConstByteSpan(event.data, event.size));
414 if (!maybe_chunk.ok()) {
415 return;
416 }
417
418 Chunk chunk = *maybe_chunk;
419
420 // Received some data. Reset the retry counter.
421 retries_ = 0;
422 flags_ |= kFlagsContactMade;
423
424 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
425 if (chunk.type() != Chunk::Type::kData &&
426 chunk.type() != Chunk::Type::kParametersContinue) {
427 chunk.LogChunk(true, pw::chrono::SystemClock::duration::zero());
428 }
429 #endif
430 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
431 if (chunk.type() == Chunk::Type::kData ||
432 chunk.type() == Chunk::Type::kParametersContinue) {
433 chunk.LogChunk(true, log_rate_limit_);
434 }
435 #endif
436
437 if (chunk.IsTerminatingChunk()) {
438 if (active()) {
439 HandleTermination(chunk.status().value());
440 } else {
441 PW_LOG_INFO("Got final status %d for completed transfer %d",
442 static_cast<int>(chunk.status().value().code()),
443 static_cast<int>(session_id_));
444 }
445 return;
446 }
447
448 if (type() == TransferType::kTransmit) {
449 HandleTransmitChunk(chunk);
450 } else {
451 HandleReceiveChunk(chunk);
452 }
453 }
454
PerformInitialHandshake(const Chunk & chunk)455 void Context::PerformInitialHandshake(const Chunk& chunk) {
456 switch (chunk.type()) {
457 // Initial packet sent from a client to a server.
458 case Chunk::Type::kStart: {
459 UpdateLocalProtocolConfigurationFromPeer(chunk);
460
461 if (type() == TransferType::kReceive) {
462 // Update window end offset so it is valid.
463 window_end_offset_ = offset_;
464 }
465
466 // This cast is safe as we know we're running in a transfer server.
467 uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
468
469 Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
470 start_ack.set_session_id(session_id_);
471 start_ack.set_resource_id(resource_id);
472 start_ack.set_initial_offset(offset_);
473
474 EncodeAndSendChunk(start_ack);
475 break;
476 }
477
478 // Response packet sent from a server to a client, confirming the protocol
479 // version and session_id of the transfer.
480 case Chunk::Type::kStartAck: {
481 UpdateLocalProtocolConfigurationFromPeer(chunk);
482
483 // This should confirm the offset we're starting at
484 if (offset_ != chunk.initial_offset()) {
485 TerminateTransfer(Status::Unimplemented());
486 break;
487 }
488
489 Chunk start_ack_confirmation(configured_protocol_version_,
490 Chunk::Type::kStartAckConfirmation);
491 start_ack_confirmation.set_session_id(session_id_);
492
493 if (type() == TransferType::kReceive) {
494 // In a receive transfer, tag the initial transfer parameters onto the
495 // confirmation chunk so that the server can immediately begin sending
496 // data.
497 UpdateTransferParameters(TransmitAction::kFirstParameters);
498 SetTransferParameters(start_ack_confirmation);
499 }
500
501 set_transfer_state(TransferState::kWaiting);
502 EncodeAndSendChunk(start_ack_confirmation);
503 // we received a response, so we can re-up the timeout while waiting for
504 // parameters.
505 SetTimeout(chunk_timeout_);
506 break;
507 }
508
509 // Confirmation sent by a client to a server of the configured transfer
510 // version and session ID. Completes the handshake and begins the actual
511 // data transfer.
512 case Chunk::Type::kStartAckConfirmation: {
513 set_transfer_state(TransferState::kWaiting);
514
515 if (type() == TransferType::kTransmit) {
516 HandleTransmitChunk(chunk);
517 } else {
518 HandleReceiveChunk(chunk);
519 }
520 break;
521 }
522
523 // If a non-handshake chunk is received during an INITIATING state, the
524 // transfer peer is running a legacy protocol version, which does not
525 // perform a handshake. End the handshake, revert to the legacy protocol,
526 // and process the chunk appropriately.
527 case Chunk::Type::kData:
528 case Chunk::Type::kParametersRetransmit:
529 case Chunk::Type::kParametersContinue:
530
531 // Update the local session_id, which will map to the transfer_id of the
532 // legacy chunk.
533 session_id_ = chunk.session_id();
534
535 configured_protocol_version_ = ProtocolVersion::kLegacy;
536 // Cancel if we are not using at least version 2, and we tried to start a
537 // non-zero offset transfer
538 if (chunk.initial_offset() != 0) {
539 PW_LOG_ERROR("Legacy transfer does not support offset transfers!");
540 TerminateTransfer(Status::Internal());
541 break;
542 }
543
544 set_transfer_state(TransferState::kWaiting);
545
546 PW_LOG_DEBUG(
547 "Transfer %u tried to start on protocol version %d, but peer only "
548 "supports legacy",
549 id_for_log(),
550 static_cast<int>(desired_protocol_version_));
551
552 if (type() == TransferType::kTransmit) {
553 HandleTransmitChunk(chunk);
554 } else {
555 HandleReceiveChunk(chunk);
556 }
557 break;
558
559 case Chunk::Type::kCompletion:
560 case Chunk::Type::kCompletionAck:
561 PW_CRASH(
562 "Transfer completion packets should be processed by "
563 "HandleChunkEvent()");
564 break;
565 }
566 }
567
UpdateLocalProtocolConfigurationFromPeer(const Chunk & chunk)568 void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
569 PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
570 static_cast<int>(desired_protocol_version_),
571 static_cast<int>(chunk.protocol_version()));
572
573 configured_protocol_version_ =
574 std::min(desired_protocol_version_, chunk.protocol_version());
575
576 PW_LOG_INFO("Transfer %u: using protocol version %d",
577 id_for_log(),
578 static_cast<int>(configured_protocol_version_));
579 }
580
HandleTransmitChunk(const Chunk & chunk)581 void Context::HandleTransmitChunk(const Chunk& chunk) {
582 switch (transfer_state_) {
583 case TransferState::kInactive:
584 case TransferState::kRecovery:
585 PW_CRASH("Never should handle chunk while inactive");
586
587 case TransferState::kCompleted:
588 // In a legacy transfer, if the transfer has already completed and another
589 // chunk is received, tell the other end that the transfer is over.
590 if (!chunk.IsInitialChunk() && status_.ok()) {
591 status_ = Status::FailedPrecondition();
592 }
593
594 SendFinalStatusChunk();
595 return;
596
597 case TransferState::kInitiating:
598 PerformInitialHandshake(chunk);
599 return;
600
601 case TransferState::kWaiting:
602 case TransferState::kTransmitting:
603 if (chunk.protocol_version() == configured_protocol_version_) {
604 HandleTransferParametersUpdate(chunk);
605 } else {
606 PW_LOG_ERROR(
607 "Transmit transfer %u was configured to use protocol version %d "
608 "but received a chunk with version %d",
609 id_for_log(),
610 static_cast<int>(configured_protocol_version_),
611 static_cast<int>(chunk.protocol_version()));
612 TerminateTransfer(Status::Internal());
613 }
614 return;
615
616 case TransferState::kTerminating:
617 HandleTerminatingChunk(chunk);
618 return;
619 }
620 }
621
HandleTransferParametersUpdate(const Chunk & chunk)622 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
623 bool retransmit = chunk.RequestsTransmissionFromOffset();
624
625 if (retransmit) {
626 // If the offsets don't match, attempt to seek on the reader. Not all
627 // readers support seeking; abort with UNIMPLEMENTED if this handler
628 // doesn't.
629 if (offset_ != chunk.offset()) {
630 if (Status seek_status = SeekReader(chunk.offset()); !seek_status.ok()) {
631 PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
632 static_cast<unsigned>(session_id_),
633 static_cast<unsigned>(chunk.offset()),
634 seek_status.code());
635
636 // Remap status codes to return one of the following:
637 //
638 // INTERNAL: invalid seek, never should happen
639 // DATA_LOSS: the reader is in a bad state
640 // UNIMPLEMENTED: seeking is not supported
641 //
642 if (seek_status.IsOutOfRange()) {
643 seek_status = Status::Internal();
644 } else if (!seek_status.IsUnimplemented()) {
645 seek_status = Status::DataLoss();
646 }
647
648 TerminateTransfer(seek_status);
649 return;
650 }
651 }
652
653 offset_ = chunk.offset();
654 } else if (chunk.window_end_offset() <= offset_) {
655 PW_LOG_DEBUG("Transfer %u ignoring old rolling window chunk", id_for_log());
656 SetTimeout(chunk_timeout_);
657 return;
658 }
659
660 window_end_offset_ = chunk.window_end_offset();
661
662 if (chunk.max_chunk_size_bytes().has_value()) {
663 max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes().value(),
664 max_parameters_->max_chunk_size_bytes());
665 }
666
667 if (chunk.min_delay_microseconds().has_value()) {
668 interchunk_delay_ = chrono::SystemClock::for_at_least(
669 std::chrono::microseconds(chunk.min_delay_microseconds().value()));
670 }
671
672 if (retransmit) {
673 PW_LOG_INFO(
674 "Transfer %u received parameters type=RETRANSMIT offset=%u "
675 "window_end_offset=%u",
676 static_cast<unsigned>(session_id_),
677 static_cast<unsigned>(chunk.offset()),
678 static_cast<unsigned>(window_end_offset_));
679 } else {
680 PW_LOG_EVERY_N_DURATION(
681 PW_LOG_LEVEL_INFO,
682 std::chrono::seconds(3),
683 "Transfer %u received parameters type=CONTINUE offset=%u "
684 "window_end_offset=%u",
685 static_cast<unsigned>(session_id_),
686 static_cast<unsigned>(chunk.offset()),
687 static_cast<unsigned>(window_end_offset_));
688 }
689
690 // Parsed all of the parameters; start sending the window.
691 set_transfer_state(TransferState::kTransmitting);
692
693 TransmitNextChunk(retransmit);
694 }
695
TransmitNextChunk(bool retransmit_requested)696 void Context::TransmitNextChunk(bool retransmit_requested) {
697 Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
698 chunk.set_session_id(session_id_);
699 chunk.set_offset(offset_);
700
701 // Reserve space for the data proto field overhead and use the remainder of
702 // the buffer for the chunk data.
703 size_t reserved_size =
704 chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
705
706 size_t total_size = TransferSizeBytes();
707 if (total_size != std::numeric_limits<size_t>::max()) {
708 reserved_size += protobuf::SizeOfVarintField(
709 pwpb::Chunk::Fields::kRemainingBytes, total_size);
710 }
711
712 ByteSpan buffer = thread_->encode_buffer();
713 Result<ByteSpan> data;
714
715 if (offset_ < total_size) {
716 // Read the next chunk of data into the encode buffer.
717 ByteSpan data_buffer = buffer.subspan(reserved_size);
718 size_t max_bytes_to_send =
719 std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
720
721 if (max_bytes_to_send < data_buffer.size()) {
722 data_buffer = data_buffer.first(max_bytes_to_send);
723 }
724
725 data = reader().Read(data_buffer);
726 } else {
727 // The user-specified resource size has been reached: respect it.
728 data = Status::OutOfRange();
729 }
730
731 if (data.status().IsOutOfRange()) {
732 // No more data to read.
733 chunk.set_remaining_bytes(0);
734 window_end_offset_ = offset_;
735
736 PW_LOG_INFO("Transfer %u sending final chunk with remaining_bytes=0",
737 static_cast<unsigned>(session_id_));
738 } else if (data.ok()) {
739 if (offset_ == window_end_offset_) {
740 if (retransmit_requested) {
741 PW_LOG_ERROR(
742 "Transfer %u: received an empty retransmit request, but there is "
743 "still data to send; aborting with RESOURCE_EXHAUSTED",
744 id_for_log());
745 TerminateTransfer(Status::ResourceExhausted());
746 } else {
747 PW_LOG_DEBUG(
748 "Transfer %u: ignoring continuation packet for transfer window "
749 "that has already been sent",
750 id_for_log());
751 SetTimeout(chunk_timeout_);
752 }
753 return; // No data was requested, so there is nothing else to do.
754 }
755
756 PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_DEBUG,
757 std::chrono::seconds(3),
758 "Transfer %u sending chunk offset=%u size=%u",
759 static_cast<unsigned>(session_id_),
760 static_cast<unsigned>(offset_),
761 static_cast<unsigned>(data.value().size()));
762
763 chunk.set_payload(data.value());
764 last_chunk_offset_ = offset_;
765 offset_ += data.value().size();
766
767 if (total_size != std::numeric_limits<size_t>::max()) {
768 chunk.set_remaining_bytes(total_size - offset_);
769 }
770 } else {
771 PW_LOG_ERROR("Transfer %u Read() failed with status %u",
772 static_cast<unsigned>(session_id_),
773 data.status().code());
774 TerminateTransfer(Status::DataLoss());
775 return;
776 }
777
778 Result<ConstByteSpan> encoded_chunk = chunk.Encode(buffer);
779 if (!encoded_chunk.ok()) {
780 PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
781 static_cast<unsigned>(session_id_));
782 TerminateTransfer(Status::Internal());
783 return;
784 }
785
786 if (const Status status = rpc_writer_->Write(*encoded_chunk); !status.ok()) {
787 PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
788 static_cast<unsigned>(session_id_),
789 status.code());
790 TerminateTransfer(Status::DataLoss());
791 return;
792 }
793
794 last_chunk_sent_ = chunk.type();
795 flags_ |= kFlagsDataSent;
796
797 if (offset_ == window_end_offset_ || offset_ == total_size) {
798 // Sent all requested data. Must now wait for next parameters from the
799 // receiver.
800 set_transfer_state(TransferState::kWaiting);
801 SetTimeout(chunk_timeout_);
802 } else {
803 // More data is to be sent. Set a timeout to send the next chunk following
804 // the chunk delay.
805 SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
806 }
807 }
808
HandleReceiveChunk(const Chunk & chunk)809 void Context::HandleReceiveChunk(const Chunk& chunk) {
810 if (transfer_state_ == TransferState::kInitiating) {
811 PerformInitialHandshake(chunk);
812 return;
813 }
814
815 if (transfer_state_ == TransferState::kCompleted) {
816 // If the transfer has already completed and another chunk is received,
817 // re-send the final status chunk.
818 SendFinalStatusChunk();
819 return;
820 }
821
822 if (chunk.protocol_version() != configured_protocol_version_) {
823 PW_LOG_ERROR(
824 "Receive transfer %u was configured to use protocol version %d "
825 "but received a chunk with version %d",
826 id_for_log(),
827 static_cast<int>(configured_protocol_version_),
828 static_cast<int>(chunk.protocol_version()));
829 TerminateTransfer(Status::Internal());
830 return;
831 }
832
833 switch (transfer_state_) {
834 case TransferState::kInactive:
835 case TransferState::kTransmitting:
836 case TransferState::kInitiating:
837 PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
838 static_cast<int>(transfer_state_));
839
840 case TransferState::kCompleted:
841 // Handled earlier.
842 PW_UNREACHABLE;
843
844 case TransferState::kRecovery:
845 if (chunk.offset() != offset_) {
846 if (last_chunk_offset_ == chunk.offset()) {
847 PW_LOG_DEBUG(
848 "Transfer %u received repeated offset %u; retry detected, "
849 "resending transfer parameters",
850 static_cast<unsigned>(session_id_),
851 static_cast<unsigned>(chunk.offset()));
852
853 log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
854 log_rate_limit_ = kNoRateLimit;
855
856 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
857 if (DataTransferComplete()) {
858 return;
859 }
860 PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
861 static_cast<unsigned>(session_id_),
862 static_cast<unsigned>(offset_),
863 static_cast<unsigned>(chunk.offset()));
864 }
865
866 last_chunk_offset_ = chunk.offset();
867 SetTimeout(chunk_timeout_);
868 return;
869 }
870
871 PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
872 static_cast<unsigned>(session_id_),
873 static_cast<unsigned>(offset_));
874 set_transfer_state(TransferState::kWaiting);
875
876 // The correct chunk was received; process it normally.
877 [[fallthrough]];
878 case TransferState::kWaiting:
879 HandleReceivedData(chunk);
880 return;
881
882 case TransferState::kTerminating:
883 HandleTerminatingChunk(chunk);
884 return;
885 }
886 }
887
HandleReceivedData(const Chunk & chunk)888 void Context::HandleReceivedData(const Chunk& chunk) {
889 if (chunk.offset() != offset_) {
890 if (chunk.offset() + chunk.payload().size() <= offset_ &&
891 chunk.type() != Chunk::Type::kStartAckConfirmation) {
892 // If the chunk's data has already been received, don't go through a full
893 // recovery cycle to avoid shrinking the window size and potentially
894 // thrashing. The expected data may already be in-flight, so just allow
895 // the transmitter to keep going with a CONTINUE parameters chunk.
896 //
897 // However, as a retried chunk indicates a potential issue with the
898 // underlying connection, shrink the transfer window.
899 //
900 // Start ack confs do not come with an offset set, so it can get stuck
901 // here if we are doing an offset transfer.
902 PW_LOG_DEBUG("Transfer %u received duplicate chunk with offset %u",
903 id_for_log(),
904 static_cast<unsigned>(chunk.offset()));
905 UpdateTransferParameters(TransmitAction::kRetransmit);
906 SendTransferParameters(TransmitAction::kExtend);
907 } else {
908 // Bad offset; reset window size to send another parameters chunk.
909 PW_LOG_WARN(
910 "Transfer %u expected offset %u, received %u; entering recovery "
911 "state",
912 static_cast<unsigned>(session_id_),
913 static_cast<unsigned>(offset_),
914 static_cast<unsigned>(chunk.offset()));
915
916 set_transfer_state(TransferState::kRecovery);
917 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
918 }
919
920 SetTimeout(chunk_timeout_);
921 return;
922 }
923
924 if (chunk.offset() + chunk.payload().size() > window_end_offset_) {
925 PW_LOG_WARN(
926 "Transfer %u received more data than what was requested (%u received "
927 "for %u pending); attempting to recover.",
928 id_for_log(),
929 static_cast<unsigned>(chunk.payload().size()),
930 static_cast<unsigned>(window_end_offset_ - offset_));
931
932 // To prevent an improperly implemented client which doesn't respect
933 // window_end_offset from entering an infinite retry loop, limit recovery
934 // attempts to the lifetime retry count.
935 lifetime_retries_++;
936 if (lifetime_retries_ <= max_lifetime_retries_) {
937 set_transfer_state(TransferState::kRecovery);
938 SetTimeout(chunk_timeout_);
939
940 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
941 } else {
942 TerminateTransfer(Status::Internal());
943 }
944 return;
945 }
946
947 // Update the last offset seen so that retries can be detected.
948 last_chunk_offset_ = chunk.offset();
949
950 // Write staged data from the buffer to the stream.
951 if (chunk.has_payload()) {
952 if (Status status = writer().Write(chunk.payload()); !status.ok()) {
953 PW_LOG_ERROR(
954 "Transfer %u write of %u B chunk failed with status %u; aborting "
955 "with DATA_LOSS",
956 static_cast<unsigned>(session_id_),
957 static_cast<unsigned>(chunk.payload().size()),
958 status.code());
959 TerminateTransfer(Status::DataLoss());
960 return;
961 }
962
963 transfer_rate_.Update(chunk.payload().size());
964 }
965
966 // Update the transfer state.
967 offset_ += chunk.payload().size();
968
969 // When the client sets remaining_bytes to 0, it indicates completion of the
970 // transfer. Acknowledge the completion through a status chunk and clean up.
971 if (chunk.IsFinalTransmitChunk()) {
972 TerminateTransfer(OkStatus());
973 return;
974 }
975
976 if (chunk.window_end_offset() != 0) {
977 if (chunk.window_end_offset() < offset_) {
978 PW_LOG_ERROR(
979 "Transfer %u got invalid end offset of %u (current offset %u)",
980 id_for_log(),
981 static_cast<unsigned>(chunk.window_end_offset()),
982 static_cast<unsigned>(offset_));
983 TerminateTransfer(Status::Internal());
984 return;
985 }
986
987 if (chunk.window_end_offset() > window_end_offset_) {
988 // A transmitter should never send a larger end offset than what the
989 // receiver has advertised. If this occurs, there is a bug in the
990 // transmitter implementation. Terminate the transfer.
991 PW_LOG_ERROR(
992 "Transfer %u transmitter sent invalid end offset of %u, "
993 "greater than receiver offset %u",
994 id_for_log(),
995 static_cast<unsigned>(chunk.window_end_offset()),
996 static_cast<unsigned>(window_end_offset_));
997 TerminateTransfer(Status::Internal());
998 return;
999 }
1000
1001 window_end_offset_ = chunk.window_end_offset();
1002 }
1003
1004 SetTimeout(chunk_timeout_);
1005
1006 if (chunk.type() == Chunk::Type::kStartAckConfirmation) {
1007 // Send the first parameters in the receive transfer.
1008 UpdateAndSendTransferParameters(TransmitAction::kFirstParameters);
1009 return;
1010 }
1011
1012 if (offset_ == window_end_offset_) {
1013 // Received all pending data. Advance the transfer parameters.
1014 UpdateAndSendTransferParameters(TransmitAction::kExtend);
1015 return;
1016 }
1017
1018 // Once the transmitter has sent a sufficient amount of data, try to extend
1019 // the window to allow it to continue sending data without blocking.
1020 uint32_t remaining_window_size = window_end_offset_ - offset_;
1021 bool extend_window = remaining_window_size <=
1022 window_size_ / max_parameters_->extend_window_divisor();
1023
1024 if (extend_window) {
1025 UpdateAndSendTransferParameters(TransmitAction::kExtend);
1026 }
1027 }
1028
HandleTerminatingChunk(const Chunk & chunk)1029 void Context::HandleTerminatingChunk(const Chunk& chunk) {
1030 switch (chunk.type()) {
1031 case Chunk::Type::kCompletion:
1032 PW_CRASH("Completion chunks should be processed by HandleChunkEvent()");
1033
1034 case Chunk::Type::kCompletionAck:
1035 PW_LOG_INFO(
1036 "Transfer %u completed with status %u", id_for_log(), status_.code());
1037 set_transfer_state(TransferState::kInactive);
1038 break;
1039
1040 case Chunk::Type::kData:
1041 case Chunk::Type::kStart:
1042 case Chunk::Type::kParametersRetransmit:
1043 case Chunk::Type::kParametersContinue:
1044 case Chunk::Type::kStartAck:
1045 case Chunk::Type::kStartAckConfirmation:
1046 // If a non-completion chunk is received in a TERMINATING state, re-send
1047 // the transfer's completion chunk to the peer.
1048 EncodeAndSendChunk(
1049 Chunk::Final(configured_protocol_version_, session_id_, status_));
1050 break;
1051 }
1052 }
1053
TerminateTransfer(Status status,bool with_resource_id)1054 void Context::TerminateTransfer(Status status, bool with_resource_id) {
1055 if (transfer_state_ == TransferState::kTerminating ||
1056 transfer_state_ == TransferState::kCompleted) {
1057 // Transfer has already been terminated; no need to do it again.
1058 return;
1059 }
1060
1061 Finish(status);
1062
1063 PW_LOG_INFO("Transfer %u terminating with status: %u, offset: %u",
1064 static_cast<unsigned>(session_id_),
1065 status.code(),
1066 static_cast<unsigned>(offset_));
1067
1068 if (ShouldSkipCompletionHandshake()) {
1069 set_transfer_state(TransferState::kCompleted);
1070 } else {
1071 set_transfer_state(TransferState::kTerminating);
1072 SetTimeout(chunk_timeout_);
1073 }
1074
1075 // Don't send a final chunk if the other end of the transfer has not yet
1076 // made contact, as there is no one to notify.
1077 if ((flags_ & kFlagsContactMade) == kFlagsContactMade) {
1078 SendFinalStatusChunk(with_resource_id);
1079 }
1080 }
1081
HandleTermination(Status status)1082 void Context::HandleTermination(Status status) {
1083 Finish(status);
1084
1085 PW_LOG_INFO("Transfer %u completed with status %u",
1086 static_cast<unsigned>(session_id_),
1087 status.code());
1088
1089 if (ShouldSkipCompletionHandshake()) {
1090 set_transfer_state(TransferState::kCompleted);
1091 } else {
1092 EncodeAndSendChunk(
1093 Chunk(configured_protocol_version_, Chunk::Type::kCompletionAck)
1094 .set_session_id(session_id_));
1095
1096 set_transfer_state(TransferState::kInactive);
1097 }
1098 }
1099
SendFinalStatusChunk(bool with_resource_id)1100 void Context::SendFinalStatusChunk(bool with_resource_id) {
1101 PW_DCHECK(transfer_state_ == TransferState::kCompleted ||
1102 transfer_state_ == TransferState::kTerminating);
1103
1104 if (configured_protocol_version_ == ProtocolVersion::kUnknown) {
1105 // If the transfer is ended before contact is made with the peer,
1106 // the protocol version may not yet be configured. Use the desired
1107 // version for the status chunk.
1108 configured_protocol_version_ = desired_protocol_version_;
1109 PW_LOG_WARN(
1110 "Transfer %u ending before protocol version was confirmed; using "
1111 "version %u",
1112 id_for_log(),
1113 static_cast<unsigned>(desired_protocol_version_));
1114 }
1115
1116 PW_LOG_INFO("Sending final chunk for transfer %u with status %u",
1117 static_cast<unsigned>(session_id_),
1118 status_.code());
1119
1120 Chunk chunk =
1121 Chunk::Final(configured_protocol_version_, session_id_, status_);
1122 if (with_resource_id) {
1123 chunk.set_resource_id(resource_id_);
1124 }
1125 EncodeAndSendChunk(chunk);
1126 }
1127
Finish(Status status)1128 void Context::Finish(Status status) {
1129 PW_DCHECK(active());
1130
1131 status.Update(FinalCleanup(status));
1132 status_ = status;
1133
1134 SetTimeout(kFinalChunkAckTimeout);
1135 }
1136
SetTimeout(chrono::SystemClock::duration timeout)1137 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
1138 next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
1139 }
1140
HandleTimeout()1141 void Context::HandleTimeout() {
1142 ClearTimeout();
1143
1144 switch (transfer_state_) {
1145 case TransferState::kCompleted:
1146 // A timeout occurring in a completed state indicates that the other side
1147 // never ACKed the final status packet. Reset the context to inactive.
1148 set_transfer_state(TransferState::kInactive);
1149 return;
1150
1151 case TransferState::kTransmitting:
1152 // A timeout occurring in a TRANSMITTING state indicates that the transfer
1153 // has waited for its inter-chunk delay and should transmit its next
1154 // chunk.
1155 TransmitNextChunk(/*retransmit_requested=*/false);
1156 break;
1157
1158 case TransferState::kInitiating:
1159 case TransferState::kWaiting:
1160 case TransferState::kRecovery:
1161 case TransferState::kTerminating:
1162 // A timeout occurring in a transfer or handshake state indicates that no
1163 // chunk has been received from the other side. The transfer should retry
1164 // its previous operation.
1165 //
1166 // The timeout is set immediately. Retry() will clear it if it fails.
1167 if (transfer_state_ == TransferState::kInitiating &&
1168 last_chunk_sent_ == Chunk::Type::kStart) {
1169 SetTimeout(initial_chunk_timeout_);
1170 } else {
1171 SetTimeout(chunk_timeout_);
1172 }
1173 Retry();
1174 break;
1175
1176 case TransferState::kInactive:
1177 PW_LOG_ERROR("Timeout occurred in INACTIVE state");
1178 return;
1179 }
1180 }
1181
Retry()1182 void Context::Retry() {
1183 if (retries_ == max_retries_ || lifetime_retries_ == max_lifetime_retries_) {
1184 PW_LOG_ERROR(
1185 "Transfer %u failed to receive a chunk after %u retries (lifetime %u).",
1186 id_for_log(),
1187 static_cast<unsigned>(retries_),
1188 static_cast<unsigned>(lifetime_retries_));
1189 PW_LOG_ERROR("Canceling transfer.");
1190
1191 if (transfer_state_ == TransferState::kTerminating) {
1192 // Timeouts occurring in a TERMINATING state indicate that the completion
1193 // chunk was never ACKed. Simply clean up the transfer context.
1194 set_transfer_state(TransferState::kInactive);
1195 } else {
1196 TerminateTransfer(Status::DeadlineExceeded());
1197 }
1198 return;
1199 }
1200
1201 ++retries_;
1202 ++lifetime_retries_;
1203
1204 if (transfer_state_ == TransferState::kInitiating ||
1205 last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
1206 RetryHandshake();
1207 return;
1208 }
1209
1210 if (transfer_state_ == TransferState::kTerminating) {
1211 EncodeAndSendChunk(
1212 Chunk::Final(configured_protocol_version_, session_id_, status_));
1213 return;
1214 }
1215
1216 if (type() == TransferType::kReceive) {
1217 // Resend the most recent transfer parameters.
1218 PW_LOG_DEBUG(
1219 "Receive transfer %u timed out waiting for chunk; resending parameters",
1220 static_cast<unsigned>(session_id_));
1221
1222 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
1223 return;
1224 }
1225
1226 // In a transmit, if a data chunk has not yet been sent, the initial transfer
1227 // parameters did not arrive from the receiver. Resend the initial chunk.
1228 if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
1229 PW_LOG_DEBUG(
1230 "Transmit transfer %u timed out waiting for initial parameters",
1231 static_cast<unsigned>(session_id_));
1232 SendInitialLegacyTransmitChunk();
1233 return;
1234 }
1235
1236 // Otherwise, resend the most recent chunk. If the reader doesn't support
1237 // seeking, this isn't possible, so just terminate the transfer immediately.
1238 if (!SeekReader(last_chunk_offset_).ok()) {
1239 PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
1240 id_for_log());
1241 PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
1242 TerminateTransfer(Status::DeadlineExceeded());
1243 return;
1244 }
1245
1246 // Rewind the transfer position and resend the chunk.
1247 offset_ = last_chunk_offset_;
1248
1249 TransmitNextChunk(/*retransmit_requested=*/false);
1250 }
1251
RetryHandshake()1252 void Context::RetryHandshake() {
1253 Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
1254
1255 switch (last_chunk_sent_) {
1256 case Chunk::Type::kStart:
1257 // No protocol version is yet configured at the time of sending the start
1258 // chunk, so we use the client's desired version instead.
1259 retry_chunk.set_protocol_version(desired_protocol_version_)
1260 .set_desired_session_id(session_id_)
1261 .set_resource_id(resource_id_)
1262 .set_initial_offset(offset_);
1263 if (type() == TransferType::kReceive) {
1264 SetTransferParameters(retry_chunk);
1265 }
1266 break;
1267
1268 case Chunk::Type::kStartAck:
1269 retry_chunk.set_session_id(session_id_)
1270 .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
1271 break;
1272
1273 case Chunk::Type::kStartAckConfirmation:
1274 retry_chunk.set_session_id(session_id_);
1275 if (type() == TransferType::kReceive) {
1276 SetTransferParameters(retry_chunk);
1277 }
1278 break;
1279
1280 case Chunk::Type::kData:
1281 case Chunk::Type::kParametersRetransmit:
1282 case Chunk::Type::kParametersContinue:
1283 case Chunk::Type::kCompletion:
1284 case Chunk::Type::kCompletionAck:
1285 PW_CRASH("Should not RetryHandshake() when not in handshake phase");
1286 }
1287
1288 EncodeAndSendChunk(retry_chunk);
1289 }
1290
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const1291 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
1292 uint32_t channel_id) const {
1293 // Start with the user-provided maximum chunk size, which should be the usable
1294 // payload length on the RPC ingress path after any transport overhead.
1295 ptrdiff_t max_size = max_chunk_size_bytes;
1296
1297 // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
1298 //
1299 // type: 1 byte key, 1 byte value (CLIENT_STREAM)
1300 // channel_id: 1 byte key, varint value (calculate from stream)
1301 // service_id: 1 byte key, 4 byte value
1302 // method_id: 1 byte key, 4 byte value
1303 // payload: 1 byte key, varint length (remaining space)
1304 // status: 0 bytes (not set in stream packets)
1305 //
1306 // TOTAL: 14 bytes + encoded channel_id size + encoded payload length
1307 //
1308 max_size -= 14;
1309 max_size -= varint::EncodedSize(channel_id);
1310 max_size -= varint::EncodedSize(max_size);
1311
1312 // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
1313 // overhead calculation will be moved into an RPC helper to avoid having
1314 // pw_transfer depend on RPC internals.
1315 max_size -= 5;
1316
1317 // Subtract the transfer service overhead for a client write chunk
1318 // (pw_transfer/transfer.proto).
1319 //
1320 // session_id: 1 byte key, varint value (calculate)
1321 // offset: 1 byte key, varint value (calculate)
1322 // data: 1 byte key, varint length (remaining space)
1323 //
1324 // TOTAL: 3 + encoded session_id + encoded offset + encoded data length
1325 //
1326 // Use a lower bound of a single chunk for the window end offset, as it will
1327 // always be at least in that range.
1328 size_t window_end_offset = std::max(window_end_offset_, max_chunk_size_bytes);
1329 max_size -= 3;
1330 max_size -= varint::EncodedSize(session_id_);
1331 max_size -= varint::EncodedSize(window_end_offset);
1332 max_size -= varint::EncodedSize(max_size);
1333
1334 // A resulting value of zero (or less) renders write transfers unusable, as
1335 // there is no space to send any payload. This should be considered a
1336 // programmer error in the transfer service setup.
1337 PW_CHECK_INT_GT(
1338 max_size,
1339 0,
1340 "Transfer service maximum chunk size is too small to fit a payload. "
1341 "Increase max_chunk_size_bytes to support write transfers.");
1342
1343 return max_size;
1344 }
1345
LogTransferConfiguration()1346 void Context::LogTransferConfiguration() {
1347 PW_LOG_DEBUG(
1348 "Local transfer timing configuration: "
1349 "chunk_timeout=%ums, max_retries=%u, interchunk_delay=%uus",
1350 static_cast<unsigned>(
1351 std::chrono::ceil<std::chrono::milliseconds>(chunk_timeout_).count()),
1352 static_cast<unsigned>(max_retries_),
1353 static_cast<unsigned>(
1354 std::chrono::ceil<std::chrono::microseconds>(interchunk_delay_)
1355 .count()));
1356
1357 PW_LOG_DEBUG(
1358 "Local transfer windowing configuration: max_window_size_bytes=%u, "
1359 "extend_window_divisor=%u, max_chunk_size_bytes=%u",
1360 static_cast<unsigned>(max_parameters_->max_window_size_bytes()),
1361 static_cast<unsigned>(max_parameters_->extend_window_divisor()),
1362 static_cast<unsigned>(max_parameters_->max_chunk_size_bytes()));
1363 }
1364
1365 } // namespace pw::transfer::internal
1366