1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16 #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17
18 #include "pw_transfer/internal/context.h"
19
20 #include <chrono>
21 #include <limits>
22
23 #include "pw_assert/check.h"
24 #include "pw_chrono/system_clock.h"
25 #include "pw_log/log.h"
26 #include "pw_log/rate_limited.h"
27 #include "pw_protobuf/serialized_size.h"
28 #include "pw_transfer/internal/config.h"
29 #include "pw_transfer/transfer.pwpb.h"
30 #include "pw_transfer/transfer_thread.h"
31 #include "pw_varint/varint.h"
32
33 namespace pw::transfer::internal {
34
HandleEvent(const Event & event)35 void Context::HandleEvent(const Event& event) {
36 switch (event.type) {
37 case EventType::kNewClientTransfer:
38 case EventType::kNewServerTransfer: {
39 if (active()) {
40 if (event.type == EventType::kNewServerTransfer &&
41 event.new_transfer.session_id == session_id_ &&
42 last_chunk_sent_ == Chunk::Type::kStartAck) {
43 // The client is retrying its initial chunk as the response may not
44 // have made it back. Re-send the handshake response without going
45 // through handler reinitialization.
46 RetryHandshake();
47 return;
48 }
49 Abort(Status::Aborted());
50 }
51
52 Initialize(event.new_transfer);
53
54 if (event.type == EventType::kNewClientTransfer) {
55 InitiateTransferAsClient();
56 } else {
57 if (StartTransferAsServer(event.new_transfer)) {
58 // TODO(frolv): This should probably be restructured.
59 HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
60 .match_resource_id = false, // Unused.
61 .data = event.new_transfer.raw_chunk_data,
62 .size = event.new_transfer.raw_chunk_size});
63 }
64 }
65 return;
66 }
67
68 case EventType::kClientChunk:
69 case EventType::kServerChunk:
70 PW_CHECK(initialized());
71 HandleChunkEvent(event.chunk);
72 return;
73
74 case EventType::kClientTimeout:
75 case EventType::kServerTimeout:
76 HandleTimeout();
77 return;
78
79 case EventType::kClientEndTransfer:
80 case EventType::kServerEndTransfer:
81 if (active()) {
82 if (event.end_transfer.send_status_chunk) {
83 TerminateTransfer(event.end_transfer.status);
84 } else {
85 Abort(event.end_transfer.status);
86 }
87 }
88 return;
89
90 case EventType::kSendStatusChunk:
91 case EventType::kAddTransferHandler:
92 case EventType::kRemoveTransferHandler:
93 case EventType::kSetStream:
94 case EventType::kTerminate:
95 case EventType::kUpdateClientTransfer:
96 case EventType::kGetResourceStatus:
97 // These events are intended for the transfer thread and should never be
98 // forwarded through to a context.
99 PW_CRASH("Transfer context received a transfer thread event");
100 }
101 }
102
InitiateTransferAsClient()103 void Context::InitiateTransferAsClient() {
104 PW_DCHECK(active());
105
106 SetTimeout(initial_chunk_timeout_);
107
108 PW_LOG_INFO("Starting transfer for resource %u",
109 static_cast<unsigned>(resource_id_));
110
111 // Receive transfers should prepare their initial parameters to be send in the
112 // initial chunk.
113 if (type() == TransferType::kReceive) {
114 UpdateTransferParameters(TransmitAction::kBegin);
115 }
116
117 if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
118 // Legacy transfers go straight into the data transfer phase without a
119 // handshake.
120 if (type() == TransferType::kReceive) {
121 SendTransferParameters(TransmitAction::kBegin);
122 } else {
123 SendInitialLegacyTransmitChunk();
124 }
125
126 LogTransferConfiguration();
127 return;
128 }
129
130 // In newer protocol versions, begin the initial transfer handshake.
131 Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
132 start_chunk.set_desired_session_id(session_id_);
133 start_chunk.set_resource_id(resource_id_);
134 start_chunk.set_initial_offset(offset_);
135
136 if (type() == TransferType::kReceive) {
137 // Parameters should still be set on the initial chunk for backwards
138 // compatibility if the server only supports the legacy protocol.
139 SetTransferParameters(start_chunk);
140 }
141
142 EncodeAndSendChunk(start_chunk);
143 }
144
StartTransferAsServer(const NewTransferEvent & new_transfer)145 bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
146 PW_LOG_INFO("Starting %s transfer %u for resource %u with offset %u",
147 new_transfer.type == TransferType::kTransmit ? "read" : "write",
148 static_cast<unsigned>(new_transfer.session_id),
149 static_cast<unsigned>(new_transfer.resource_id),
150 static_cast<unsigned>(new_transfer.initial_offset));
151 LogTransferConfiguration();
152
153 flags_ |= kFlagsContactMade;
154
155 if (Status status = new_transfer.handler->Prepare(
156 new_transfer.type, new_transfer.initial_offset);
157 !status.ok()) {
158 PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
159 static_cast<unsigned>(new_transfer.handler->id()),
160 status.code());
161
162 // As this failure occurs at the start of a transfer, no protocol version is
163 // yet negotiated and one must be set to send a response. It is okay to use
164 // the desired version here, as that comes from the client.
165 configured_protocol_version_ = desired_protocol_version_;
166
167 status = (status.IsPermissionDenied() || status.IsUnimplemented() ||
168 status.IsResourceExhausted())
169 ? status
170 : Status::DataLoss();
171 TerminateTransfer(status, /*with_resource_id=*/true);
172 return false;
173 }
174
175 // Initialize doesn't set the handler since it's specific to server transfers.
176 static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
177
178 // Server transfers use the stream provided by the handler rather than the
179 // stream included in the NewTransferEvent.
180 stream_ = &new_transfer.handler->stream();
181
182 return true;
183 }
184
SendInitialLegacyTransmitChunk()185 void Context::SendInitialLegacyTransmitChunk() {
186 // A transmitter begins a transfer by sending the ID of the resource to which
187 // it wishes to write.
188 Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
189 chunk.set_session_id(resource_id_);
190
191 EncodeAndSendChunk(chunk);
192 }
193
UpdateTransferParameters(TransmitAction action)194 void Context::UpdateTransferParameters(TransmitAction action) {
195 max_chunk_size_bytes_ = MaxWriteChunkSize(
196 max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
197 uint32_t window_size = 0;
198
199 if (max_chunk_size_bytes_ > max_parameters_->max_window_size_bytes()) {
200 window_size =
201 std::min(max_parameters_->max_window_size_bytes(),
202 static_cast<uint32_t>(writer().ConservativeWriteLimit()));
203 } else {
204 // Adjust the window size based on the latest event in the transfer.
205 switch (action) {
206 case TransmitAction::kBegin:
207 case TransmitAction::kFirstParameters:
208 // A transfer always begins with a window size of one chunk, set during
209 // initialization. No further handling is required.
210 break;
211
212 case TransmitAction::kExtend:
213 // Window was received successfully without packet loss and should grow.
214 // Double the window size during slow start, or increase it by a single
215 // chunk in congestion avoidance.
216 if (transmit_phase_ == TransmitPhase::kCongestionAvoidance) {
217 window_size_multiplier_ += 1;
218 } else {
219 window_size_multiplier_ *= 2;
220 }
221
222 // The window size can never exceed the user-specified maximum bytes. If
223 // it does, reduce the multiplier to the largest size that fits.
224 if (window_size_multiplier_ * max_chunk_size_bytes_ >
225 max_parameters_->max_window_size_bytes()) {
226 window_size_multiplier_ =
227 max_parameters_->max_window_size_bytes() / max_chunk_size_bytes_;
228 }
229 break;
230
231 case TransmitAction::kRetransmit:
232 // A packet was lost: shrink the window size. Additionally, after the
233 // first packet loss, transition from the slow start to the congestion
234 // avoidance phase of the transfer.
235 if (transmit_phase_ == TransmitPhase::kSlowStart) {
236 transmit_phase_ = TransmitPhase::kCongestionAvoidance;
237 }
238 window_size_multiplier_ =
239 std::max(window_size_multiplier_ / static_cast<uint32_t>(2),
240 static_cast<uint32_t>(1));
241 break;
242 }
243
244 window_size =
245 std::min({window_size_multiplier_ * max_chunk_size_bytes_,
246 max_parameters_->max_window_size_bytes(),
247 static_cast<uint32_t>(writer().ConservativeWriteLimit())});
248 }
249
250 window_size_ = window_size;
251 window_end_offset_ = offset_ + window_size;
252 }
253
SetTransferParameters(Chunk & parameters)254 void Context::SetTransferParameters(Chunk& parameters) {
255 parameters.set_window_end_offset(window_end_offset_)
256 .set_max_chunk_size_bytes(max_chunk_size_bytes_)
257 .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
258 .set_offset(offset_);
259 }
260
UpdateAndSendTransferParameters(TransmitAction action)261 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
262 UpdateTransferParameters(action);
263
264 return SendTransferParameters(action);
265 }
266
SendTransferParameters(TransmitAction action)267 void Context::SendTransferParameters(TransmitAction action) {
268 Chunk::Type type = Chunk::Type::kParametersRetransmit;
269
270 switch (action) {
271 case TransmitAction::kBegin:
272 type = Chunk::Type::kStart;
273 break;
274 case TransmitAction::kFirstParameters:
275 case TransmitAction::kRetransmit:
276 type = Chunk::Type::kParametersRetransmit;
277 break;
278 case TransmitAction::kExtend:
279 type = Chunk::Type::kParametersContinue;
280 break;
281 }
282
283 Chunk parameters(configured_protocol_version_, type);
284 parameters.set_session_id(session_id_);
285 SetTransferParameters(parameters);
286
287 PW_LOG_EVERY_N_DURATION(
288 PW_LOG_LEVEL_INFO,
289 log_rate_limit_,
290 "Transfer rate: %u B/s",
291 static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
292
293 PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_INFO,
294 log_rate_limit_,
295 "Transfer %u sending transfer parameters: "
296 "offset=%u, window_end_offset=%u, max_chunk_size=%u",
297 static_cast<unsigned>(session_id_),
298 static_cast<unsigned>(offset_),
299 static_cast<unsigned>(window_end_offset_),
300 static_cast<unsigned>(max_chunk_size_bytes_));
301
302 if (log_chunks_before_rate_limit_ > 0) {
303 log_chunks_before_rate_limit_--;
304
305 if (log_chunks_before_rate_limit_ == 0) {
306 log_rate_limit_ = log_rate_limit_cfg_;
307 }
308 }
309
310 EncodeAndSendChunk(parameters);
311 }
312
EncodeAndSendChunk(const Chunk & chunk)313 void Context::EncodeAndSendChunk(const Chunk& chunk) {
314 last_chunk_sent_ = chunk.type();
315
316 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
317 if ((chunk.remaining_bytes().has_value() &&
318 chunk.remaining_bytes().value() == 0) ||
319 (chunk.type() != Chunk::Type::kData &&
320 chunk.type() != Chunk::Type::kParametersContinue)) {
321 chunk.LogChunk(false, pw::chrono::SystemClock::duration::zero());
322 }
323 #endif
324
325 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
326 if (chunk.type() == Chunk::Type::kData ||
327 chunk.type() == Chunk::Type::kParametersContinue) {
328 chunk.LogChunk(false, log_rate_limit_);
329 }
330 #endif
331
332 Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
333 if (!data.ok()) {
334 PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
335 static_cast<unsigned>(chunk.session_id()),
336 data.status().code());
337 if (active()) {
338 TerminateTransfer(Status::Internal());
339 }
340 return;
341 }
342
343 if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
344 PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
345 static_cast<unsigned>(chunk.session_id()),
346 status.code());
347 if (active()) {
348 TerminateTransfer(Status::Internal());
349 }
350 return;
351 }
352 }
353
Initialize(const NewTransferEvent & new_transfer)354 void Context::Initialize(const NewTransferEvent& new_transfer) {
355 PW_DCHECK(!active());
356
357 PW_DCHECK_INT_NE(new_transfer.protocol_version,
358 ProtocolVersion::kUnknown,
359 "Cannot start a transfer with an unknown protocol");
360
361 session_id_ = new_transfer.session_id;
362 resource_id_ = new_transfer.resource_id;
363 desired_protocol_version_ = new_transfer.protocol_version;
364 configured_protocol_version_ = ProtocolVersion::kUnknown;
365
366 flags_ = static_cast<uint8_t>(new_transfer.type);
367 transfer_state_ = TransferState::kWaiting;
368 retries_ = 0;
369 max_retries_ = new_transfer.max_retries;
370 lifetime_retries_ = 0;
371 max_lifetime_retries_ = new_transfer.max_lifetime_retries;
372
373 if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
374 // In a legacy transfer, there is no protocol negotiation stage.
375 // Automatically configure the context to run the legacy protocol and
376 // proceed to waiting for a chunk.
377 configured_protocol_version_ = ProtocolVersion::kLegacy;
378 transfer_state_ = TransferState::kWaiting;
379 } else {
380 transfer_state_ = TransferState::kInitiating;
381 }
382
383 rpc_writer_ = new_transfer.rpc_writer;
384 stream_ = new_transfer.stream;
385
386 offset_ = new_transfer.initial_offset;
387 initial_offset_ = new_transfer.initial_offset;
388 window_size_ = 0;
389 window_end_offset_ = 0;
390 max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
391
392 window_size_multiplier_ = 1;
393 transmit_phase_ = TransmitPhase::kSlowStart;
394
395 max_parameters_ = new_transfer.max_parameters;
396 thread_ = new_transfer.transfer_thread;
397
398 last_chunk_sent_ = Chunk::Type::kStart;
399 last_chunk_offset_ = 0;
400 chunk_timeout_ = new_transfer.timeout;
401 initial_chunk_timeout_ = new_transfer.initial_timeout;
402 interchunk_delay_ = chrono::SystemClock::for_at_least(
403 std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
404 next_timeout_ = kNoTimeout;
405 log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
406
407 transfer_rate_.Reset();
408 }
409
HandleChunkEvent(const ChunkEvent & event)410 void Context::HandleChunkEvent(const ChunkEvent& event) {
411 Result<Chunk> maybe_chunk =
412 Chunk::Parse(ConstByteSpan(event.data, event.size));
413 if (!maybe_chunk.ok()) {
414 return;
415 }
416
417 Chunk chunk = *maybe_chunk;
418
419 // Received some data. Reset the retry counter.
420 retries_ = 0;
421 flags_ |= kFlagsContactMade;
422
423 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
424 if (chunk.type() != Chunk::Type::kData &&
425 chunk.type() != Chunk::Type::kParametersContinue) {
426 chunk.LogChunk(true, pw::chrono::SystemClock::duration::zero());
427 }
428 #endif
429 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
430 if (chunk.type() == Chunk::Type::kData ||
431 chunk.type() == Chunk::Type::kParametersContinue) {
432 chunk.LogChunk(true, log_rate_limit_);
433 }
434 #endif
435
436 if (chunk.IsTerminatingChunk()) {
437 if (active()) {
438 HandleTermination(chunk.status().value());
439 } else {
440 PW_LOG_INFO("Got final status %d for completed transfer %d",
441 static_cast<int>(chunk.status().value().code()),
442 static_cast<int>(session_id_));
443 }
444 return;
445 }
446
447 if (type() == TransferType::kTransmit) {
448 HandleTransmitChunk(chunk);
449 } else {
450 HandleReceiveChunk(chunk);
451 }
452 }
453
PerformInitialHandshake(const Chunk & chunk)454 void Context::PerformInitialHandshake(const Chunk& chunk) {
455 switch (chunk.type()) {
456 // Initial packet sent from a client to a server.
457 case Chunk::Type::kStart: {
458 UpdateLocalProtocolConfigurationFromPeer(chunk);
459
460 if (type() == TransferType::kReceive) {
461 // Update window end offset so it is valid.
462 window_end_offset_ = offset_;
463 }
464
465 // This cast is safe as we know we're running in a transfer server.
466 uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
467
468 Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
469 start_ack.set_session_id(session_id_);
470 start_ack.set_resource_id(resource_id);
471 start_ack.set_initial_offset(offset_);
472
473 EncodeAndSendChunk(start_ack);
474 break;
475 }
476
477 // Response packet sent from a server to a client, confirming the protocol
478 // version and session_id of the transfer.
479 case Chunk::Type::kStartAck: {
480 UpdateLocalProtocolConfigurationFromPeer(chunk);
481
482 // This should confirm the offset we're starting at
483 if (offset_ != chunk.initial_offset()) {
484 TerminateTransfer(Status::Unimplemented());
485 break;
486 }
487
488 Chunk start_ack_confirmation(configured_protocol_version_,
489 Chunk::Type::kStartAckConfirmation);
490 start_ack_confirmation.set_session_id(session_id_);
491
492 if (type() == TransferType::kReceive) {
493 // In a receive transfer, tag the initial transfer parameters onto the
494 // confirmation chunk so that the server can immediately begin sending
495 // data.
496 UpdateTransferParameters(TransmitAction::kFirstParameters);
497 SetTransferParameters(start_ack_confirmation);
498 }
499
500 set_transfer_state(TransferState::kWaiting);
501 EncodeAndSendChunk(start_ack_confirmation);
502 break;
503 }
504
505 // Confirmation sent by a client to a server of the configured transfer
506 // version and session ID. Completes the handshake and begins the actual
507 // data transfer.
508 case Chunk::Type::kStartAckConfirmation: {
509 set_transfer_state(TransferState::kWaiting);
510
511 if (type() == TransferType::kTransmit) {
512 HandleTransmitChunk(chunk);
513 } else {
514 HandleReceiveChunk(chunk);
515 }
516 break;
517 }
518
519 // If a non-handshake chunk is received during an INITIATING state, the
520 // transfer peer is running a legacy protocol version, which does not
521 // perform a handshake. End the handshake, revert to the legacy protocol,
522 // and process the chunk appropriately.
523 case Chunk::Type::kData:
524 case Chunk::Type::kParametersRetransmit:
525 case Chunk::Type::kParametersContinue:
526
527 // Update the local session_id, which will map to the transfer_id of the
528 // legacy chunk.
529 session_id_ = chunk.session_id();
530
531 configured_protocol_version_ = ProtocolVersion::kLegacy;
532 // Cancel if we are not using at least version 2, and we tried to start a
533 // non-zero offset transfer
534 if (chunk.initial_offset() != 0) {
535 PW_LOG_ERROR("Legacy transfer does not support offset transfers!");
536 TerminateTransfer(Status::Internal());
537 break;
538 }
539
540 set_transfer_state(TransferState::kWaiting);
541
542 PW_LOG_DEBUG(
543 "Transfer %u tried to start on protocol version %d, but peer only "
544 "supports legacy",
545 id_for_log(),
546 static_cast<int>(desired_protocol_version_));
547
548 if (type() == TransferType::kTransmit) {
549 HandleTransmitChunk(chunk);
550 } else {
551 HandleReceiveChunk(chunk);
552 }
553 break;
554
555 case Chunk::Type::kCompletion:
556 case Chunk::Type::kCompletionAck:
557 PW_CRASH(
558 "Transfer completion packets should be processed by "
559 "HandleChunkEvent()");
560 break;
561 }
562 }
563
UpdateLocalProtocolConfigurationFromPeer(const Chunk & chunk)564 void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
565 PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
566 static_cast<int>(desired_protocol_version_),
567 static_cast<int>(chunk.protocol_version()));
568
569 configured_protocol_version_ =
570 std::min(desired_protocol_version_, chunk.protocol_version());
571
572 PW_LOG_INFO("Transfer %u: using protocol version %d",
573 id_for_log(),
574 static_cast<int>(configured_protocol_version_));
575 }
576
HandleTransmitChunk(const Chunk & chunk)577 void Context::HandleTransmitChunk(const Chunk& chunk) {
578 switch (transfer_state_) {
579 case TransferState::kInactive:
580 case TransferState::kRecovery:
581 PW_CRASH("Never should handle chunk while inactive");
582
583 case TransferState::kCompleted:
584 // If the transfer has already completed and another chunk is received,
585 // tell the other end that the transfer is over.
586 //
587 // TODO(frolv): Final status chunks should be ACKed by the other end. When
588 // that is added, this case should be updated to check if the received
589 // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
590 // Otherwise, the final status should be re-sent.
591 if (!chunk.IsInitialChunk()) {
592 status_ = Status::FailedPrecondition();
593 }
594 SendFinalStatusChunk();
595 return;
596
597 case TransferState::kInitiating:
598 PerformInitialHandshake(chunk);
599 return;
600
601 case TransferState::kWaiting:
602 case TransferState::kTransmitting:
603 if (chunk.protocol_version() == configured_protocol_version_) {
604 HandleTransferParametersUpdate(chunk);
605 } else {
606 PW_LOG_ERROR(
607 "Transmit transfer %u was configured to use protocol version %d "
608 "but received a chunk with version %d",
609 id_for_log(),
610 static_cast<int>(configured_protocol_version_),
611 static_cast<int>(chunk.protocol_version()));
612 TerminateTransfer(Status::Internal());
613 }
614 return;
615
616 case TransferState::kTerminating:
617 HandleTerminatingChunk(chunk);
618 return;
619 }
620 }
621
HandleTransferParametersUpdate(const Chunk & chunk)622 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
623 bool retransmit = chunk.RequestsTransmissionFromOffset();
624
625 if (retransmit) {
626 // If the offsets don't match, attempt to seek on the reader. Not all
627 // readers support seeking; abort with UNIMPLEMENTED if this handler
628 // doesn't.
629 if (offset_ != chunk.offset()) {
630 if (Status seek_status = SeekReader(chunk.offset()); !seek_status.ok()) {
631 PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
632 static_cast<unsigned>(session_id_),
633 static_cast<unsigned>(chunk.offset()),
634 seek_status.code());
635
636 // Remap status codes to return one of the following:
637 //
638 // INTERNAL: invalid seek, never should happen
639 // DATA_LOSS: the reader is in a bad state
640 // UNIMPLEMENTED: seeking is not supported
641 //
642 if (seek_status.IsOutOfRange()) {
643 seek_status = Status::Internal();
644 } else if (!seek_status.IsUnimplemented()) {
645 seek_status = Status::DataLoss();
646 }
647
648 TerminateTransfer(seek_status);
649 return;
650 }
651 }
652
653 offset_ = chunk.offset();
654 }
655
656 window_end_offset_ = chunk.window_end_offset();
657
658 if (chunk.max_chunk_size_bytes().has_value()) {
659 max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes().value(),
660 max_parameters_->max_chunk_size_bytes());
661 }
662
663 if (chunk.min_delay_microseconds().has_value()) {
664 interchunk_delay_ = chrono::SystemClock::for_at_least(
665 std::chrono::microseconds(chunk.min_delay_microseconds().value()));
666 }
667
668 if (retransmit) {
669 PW_LOG_INFO(
670 "Transfer %u received parameters type=RETRANSMIT offset=%u "
671 "window_end_offset=%u",
672 static_cast<unsigned>(session_id_),
673 static_cast<unsigned>(chunk.offset()),
674 static_cast<unsigned>(window_end_offset_));
675 } else {
676 PW_LOG_EVERY_N_DURATION(
677 PW_LOG_LEVEL_INFO,
678 std::chrono::seconds(3),
679 "Transfer %u received parameters type=CONTINUE offset=%u "
680 "window_end_offset=%u",
681 static_cast<unsigned>(session_id_),
682 static_cast<unsigned>(chunk.offset()),
683 static_cast<unsigned>(window_end_offset_));
684 }
685
686 // Parsed all of the parameters; start sending the window.
687 set_transfer_state(TransferState::kTransmitting);
688
689 TransmitNextChunk(retransmit);
690 }
691
TransmitNextChunk(bool retransmit_requested)692 void Context::TransmitNextChunk(bool retransmit_requested) {
693 Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
694 chunk.set_session_id(session_id_);
695 chunk.set_offset(offset_);
696
697 // Reserve space for the data proto field overhead and use the remainder of
698 // the buffer for the chunk data.
699 size_t reserved_size =
700 chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
701
702 size_t total_size = TransferSizeBytes();
703 if (total_size != std::numeric_limits<size_t>::max()) {
704 reserved_size += protobuf::SizeOfVarintField(
705 pwpb::Chunk::Fields::kRemainingBytes, total_size);
706 }
707
708 ByteSpan buffer = thread_->encode_buffer();
709 Result<ByteSpan> data;
710
711 if (offset_ < total_size) {
712 // Read the next chunk of data into the encode buffer.
713 ByteSpan data_buffer = buffer.subspan(reserved_size);
714 size_t max_bytes_to_send =
715 std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
716
717 if (max_bytes_to_send < data_buffer.size()) {
718 data_buffer = data_buffer.first(max_bytes_to_send);
719 }
720
721 data = reader().Read(data_buffer);
722 } else {
723 // The user-specified resource size has been reached: respect it.
724 data = Status::OutOfRange();
725 }
726
727 if (data.status().IsOutOfRange()) {
728 // No more data to read.
729 chunk.set_remaining_bytes(0);
730 window_end_offset_ = offset_;
731
732 PW_LOG_INFO("Transfer %u sending final chunk with remaining_bytes=0",
733 static_cast<unsigned>(session_id_));
734 } else if (data.ok()) {
735 if (offset_ == window_end_offset_) {
736 if (retransmit_requested) {
737 PW_LOG_ERROR(
738 "Transfer %u: received an empty retransmit request, but there is "
739 "still data to send; aborting with RESOURCE_EXHAUSTED",
740 id_for_log());
741 TerminateTransfer(Status::ResourceExhausted());
742 } else {
743 PW_LOG_DEBUG(
744 "Transfer %u: ignoring continuation packet for transfer window "
745 "that has already been sent",
746 id_for_log());
747 SetTimeout(chunk_timeout_);
748 }
749 return; // No data was requested, so there is nothing else to do.
750 }
751
752 PW_LOG_DEBUG("Transfer %u sending chunk offset=%u size=%u",
753 static_cast<unsigned>(session_id_),
754 static_cast<unsigned>(offset_),
755 static_cast<unsigned>(data.value().size()));
756
757 chunk.set_payload(data.value());
758 last_chunk_offset_ = offset_;
759 offset_ += data.value().size();
760
761 if (total_size != std::numeric_limits<size_t>::max()) {
762 chunk.set_remaining_bytes(total_size - offset_);
763 }
764 } else {
765 PW_LOG_ERROR("Transfer %u Read() failed with status %u",
766 static_cast<unsigned>(session_id_),
767 data.status().code());
768 TerminateTransfer(Status::DataLoss());
769 return;
770 }
771
772 Result<ConstByteSpan> encoded_chunk = chunk.Encode(buffer);
773 if (!encoded_chunk.ok()) {
774 PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
775 static_cast<unsigned>(session_id_));
776 TerminateTransfer(Status::Internal());
777 return;
778 }
779
780 if (const Status status = rpc_writer_->Write(*encoded_chunk); !status.ok()) {
781 PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
782 static_cast<unsigned>(session_id_),
783 status.code());
784 TerminateTransfer(Status::DataLoss());
785 return;
786 }
787
788 last_chunk_sent_ = chunk.type();
789 flags_ |= kFlagsDataSent;
790
791 if (offset_ == window_end_offset_ || offset_ == total_size) {
792 // Sent all requested data. Must now wait for next parameters from the
793 // receiver.
794 set_transfer_state(TransferState::kWaiting);
795 SetTimeout(chunk_timeout_);
796 } else {
797 // More data is to be sent. Set a timeout to send the next chunk following
798 // the chunk delay.
799 SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
800 }
801 }
802
HandleReceiveChunk(const Chunk & chunk)803 void Context::HandleReceiveChunk(const Chunk& chunk) {
804 if (transfer_state_ == TransferState::kInitiating) {
805 PerformInitialHandshake(chunk);
806 return;
807 }
808
809 if (chunk.protocol_version() != configured_protocol_version_) {
810 PW_LOG_ERROR(
811 "Receive transfer %u was configured to use protocol version %d "
812 "but received a chunk with version %d",
813 id_for_log(),
814 static_cast<int>(configured_protocol_version_),
815 static_cast<int>(chunk.protocol_version()));
816 TerminateTransfer(Status::Internal());
817 return;
818 }
819
820 switch (transfer_state_) {
821 case TransferState::kInactive:
822 case TransferState::kTransmitting:
823 case TransferState::kInitiating:
824 PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
825 static_cast<int>(transfer_state_));
826
827 case TransferState::kCompleted:
828 // If the transfer has already completed and another chunk is received,
829 // re-send the final status chunk.
830 //
831 // TODO(frolv): Final status chunks should be ACKed by the other end. When
832 // that is added, this case should be updated to check if the received
833 // chunk is an ACK. If so, the transfer state can be reset to INACTIVE.
834 // Otherwise, the final status should be re-sent.
835 SendFinalStatusChunk();
836 return;
837
838 case TransferState::kRecovery:
839 if (chunk.offset() != offset_) {
840 if (last_chunk_offset_ == chunk.offset()) {
841 PW_LOG_DEBUG(
842 "Transfer %u received repeated offset %u; retry detected, "
843 "resending transfer parameters",
844 static_cast<unsigned>(session_id_),
845 static_cast<unsigned>(chunk.offset()));
846
847 log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
848 log_rate_limit_ = kNoRateLimit;
849
850 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
851 if (DataTransferComplete()) {
852 return;
853 }
854 PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
855 static_cast<unsigned>(session_id_),
856 static_cast<unsigned>(offset_),
857 static_cast<unsigned>(chunk.offset()));
858 }
859
860 last_chunk_offset_ = chunk.offset();
861 SetTimeout(chunk_timeout_);
862 return;
863 }
864
865 PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
866 static_cast<unsigned>(session_id_),
867 static_cast<unsigned>(offset_));
868 set_transfer_state(TransferState::kWaiting);
869
870 // The correct chunk was received; process it normally.
871 [[fallthrough]];
872 case TransferState::kWaiting:
873 HandleReceivedData(chunk);
874 return;
875
876 case TransferState::kTerminating:
877 HandleTerminatingChunk(chunk);
878 return;
879 }
880 }
881
HandleReceivedData(const Chunk & chunk)882 void Context::HandleReceivedData(const Chunk& chunk) {
883 if (chunk.offset() != offset_) {
884 // Bad offset; reset window size to send another parameters chunk.
885 PW_LOG_DEBUG(
886 "Transfer %u expected offset %u, received %u; entering recovery "
887 "state",
888 static_cast<unsigned>(session_id_),
889 static_cast<unsigned>(offset_),
890 static_cast<unsigned>(chunk.offset()));
891
892 set_transfer_state(TransferState::kRecovery);
893 SetTimeout(chunk_timeout_);
894
895 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
896 return;
897 }
898
899 if (chunk.offset() + chunk.payload().size() > window_end_offset_) {
900 PW_LOG_WARN(
901 "Transfer %u received more data than what was requested (%u received "
902 "for %u pending); attempting to recover.",
903 id_for_log(),
904 static_cast<unsigned>(chunk.payload().size()),
905 static_cast<unsigned>(window_end_offset_ - offset_));
906
907 // To prevent an improperly implemented client which doesn't respect
908 // window_end_offset from entering an infinite retry loop, limit recovery
909 // attempts to the lifetime retry count.
910 lifetime_retries_++;
911 if (lifetime_retries_ <= max_lifetime_retries_) {
912 set_transfer_state(TransferState::kRecovery);
913 SetTimeout(chunk_timeout_);
914
915 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
916 } else {
917 TerminateTransfer(Status::Internal());
918 }
919 return;
920 }
921
922 // Update the last offset seen so that retries can be detected.
923 last_chunk_offset_ = chunk.offset();
924
925 // Write staged data from the buffer to the stream.
926 if (chunk.has_payload()) {
927 if (Status status = writer().Write(chunk.payload()); !status.ok()) {
928 PW_LOG_ERROR(
929 "Transfer %u write of %u B chunk failed with status %u; aborting "
930 "with DATA_LOSS",
931 static_cast<unsigned>(session_id_),
932 static_cast<unsigned>(chunk.payload().size()),
933 status.code());
934 TerminateTransfer(Status::DataLoss());
935 return;
936 }
937
938 transfer_rate_.Update(chunk.payload().size());
939 }
940
941 // Update the transfer state.
942 offset_ += chunk.payload().size();
943
944 // When the client sets remaining_bytes to 0, it indicates completion of the
945 // transfer. Acknowledge the completion through a status chunk and clean up.
946 if (chunk.IsFinalTransmitChunk()) {
947 TerminateTransfer(OkStatus());
948 return;
949 }
950
951 if (chunk.window_end_offset() != 0) {
952 if (chunk.window_end_offset() < offset_) {
953 PW_LOG_ERROR(
954 "Transfer %u got invalid end offset of %u (current offset %u)",
955 id_for_log(),
956 static_cast<unsigned>(chunk.window_end_offset()),
957 static_cast<unsigned>(offset_));
958 TerminateTransfer(Status::Internal());
959 return;
960 }
961
962 if (chunk.window_end_offset() > window_end_offset_) {
963 // A transmitter should never send a larger end offset than what the
964 // receiver has advertised. If this occurs, there is a bug in the
965 // transmitter implementation. Terminate the transfer.
966 PW_LOG_ERROR(
967 "Transfer %u transmitter sent invalid end offset of %u, "
968 "greater than receiver offset %u",
969 id_for_log(),
970 static_cast<unsigned>(chunk.window_end_offset()),
971 static_cast<unsigned>(window_end_offset_));
972 TerminateTransfer(Status::Internal());
973 return;
974 }
975
976 window_end_offset_ = chunk.window_end_offset();
977 }
978
979 SetTimeout(chunk_timeout_);
980
981 if (chunk.type() == Chunk::Type::kStartAckConfirmation) {
982 // Send the first parameters in the receive transfer.
983 UpdateAndSendTransferParameters(TransmitAction::kFirstParameters);
984 return;
985 }
986
987 if (offset_ == window_end_offset_) {
988 // Received all pending data. Advance the transfer parameters.
989 UpdateAndSendTransferParameters(TransmitAction::kExtend);
990 return;
991 }
992
993 // Once the transmitter has sent a sufficient amount of data, try to extend
994 // the window to allow it to continue sending data without blocking.
995 uint32_t remaining_window_size = window_end_offset_ - offset_;
996 bool extend_window = remaining_window_size <=
997 window_size_ / max_parameters_->extend_window_divisor();
998
999 if (extend_window) {
1000 UpdateAndSendTransferParameters(TransmitAction::kExtend);
1001 }
1002 }
1003
HandleTerminatingChunk(const Chunk & chunk)1004 void Context::HandleTerminatingChunk(const Chunk& chunk) {
1005 switch (chunk.type()) {
1006 case Chunk::Type::kCompletion:
1007 PW_CRASH("Completion chunks should be processed by HandleChunkEvent()");
1008
1009 case Chunk::Type::kCompletionAck:
1010 PW_LOG_INFO(
1011 "Transfer %u completed with status %u", id_for_log(), status_.code());
1012 set_transfer_state(TransferState::kInactive);
1013 break;
1014
1015 case Chunk::Type::kData:
1016 case Chunk::Type::kStart:
1017 case Chunk::Type::kParametersRetransmit:
1018 case Chunk::Type::kParametersContinue:
1019 case Chunk::Type::kStartAck:
1020 case Chunk::Type::kStartAckConfirmation:
1021 // If a non-completion chunk is received in a TERMINATING state, re-send
1022 // the transfer's completion chunk to the peer.
1023 EncodeAndSendChunk(
1024 Chunk::Final(configured_protocol_version_, session_id_, status_));
1025 break;
1026 }
1027 }
1028
TerminateTransfer(Status status,bool with_resource_id)1029 void Context::TerminateTransfer(Status status, bool with_resource_id) {
1030 if (transfer_state_ == TransferState::kTerminating ||
1031 transfer_state_ == TransferState::kCompleted) {
1032 // Transfer has already been terminated; no need to do it again.
1033 return;
1034 }
1035
1036 Finish(status);
1037
1038 PW_LOG_INFO("Transfer %u terminating with status: %u, offset: %u",
1039 static_cast<unsigned>(session_id_),
1040 status.code(),
1041 static_cast<unsigned>(offset_));
1042
1043 if (ShouldSkipCompletionHandshake()) {
1044 set_transfer_state(TransferState::kCompleted);
1045 } else {
1046 set_transfer_state(TransferState::kTerminating);
1047 SetTimeout(chunk_timeout_);
1048 }
1049
1050 // Don't send a final chunk if the other end of the transfer has not yet
1051 // made contact, as there is no one to notify.
1052 if ((flags_ & kFlagsContactMade) == kFlagsContactMade) {
1053 SendFinalStatusChunk(with_resource_id);
1054 }
1055 }
1056
HandleTermination(Status status)1057 void Context::HandleTermination(Status status) {
1058 Finish(status);
1059
1060 PW_LOG_INFO("Transfer %u completed with status %u",
1061 static_cast<unsigned>(session_id_),
1062 status.code());
1063
1064 if (ShouldSkipCompletionHandshake()) {
1065 set_transfer_state(TransferState::kCompleted);
1066 } else {
1067 EncodeAndSendChunk(
1068 Chunk(configured_protocol_version_, Chunk::Type::kCompletionAck)
1069 .set_session_id(session_id_));
1070
1071 set_transfer_state(TransferState::kInactive);
1072 }
1073 }
1074
SendFinalStatusChunk(bool with_resource_id)1075 void Context::SendFinalStatusChunk(bool with_resource_id) {
1076 PW_DCHECK(transfer_state_ == TransferState::kCompleted ||
1077 transfer_state_ == TransferState::kTerminating);
1078
1079 PW_LOG_INFO("Sending final chunk for transfer %u with status %u",
1080 static_cast<unsigned>(session_id_),
1081 status_.code());
1082
1083 Chunk chunk =
1084 Chunk::Final(configured_protocol_version_, session_id_, status_);
1085 if (with_resource_id) {
1086 chunk.set_resource_id(resource_id_);
1087 }
1088 EncodeAndSendChunk(chunk);
1089 }
1090
Finish(Status status)1091 void Context::Finish(Status status) {
1092 PW_DCHECK(active());
1093
1094 status.Update(FinalCleanup(status));
1095 status_ = status;
1096
1097 SetTimeout(kFinalChunkAckTimeout);
1098 }
1099
SetTimeout(chrono::SystemClock::duration timeout)1100 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
1101 next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
1102 }
1103
HandleTimeout()1104 void Context::HandleTimeout() {
1105 ClearTimeout();
1106
1107 switch (transfer_state_) {
1108 case TransferState::kCompleted:
1109 // A timeout occurring in a completed state indicates that the other side
1110 // never ACKed the final status packet. Reset the context to inactive.
1111 set_transfer_state(TransferState::kInactive);
1112 return;
1113
1114 case TransferState::kTransmitting:
1115 // A timeout occurring in a TRANSMITTING state indicates that the transfer
1116 // has waited for its inter-chunk delay and should transmit its next
1117 // chunk.
1118 TransmitNextChunk(/*retransmit_requested=*/false);
1119 break;
1120
1121 case TransferState::kInitiating:
1122 case TransferState::kWaiting:
1123 case TransferState::kRecovery:
1124 case TransferState::kTerminating:
1125 // A timeout occurring in a transfer or handshake state indicates that no
1126 // chunk has been received from the other side. The transfer should retry
1127 // its previous operation.
1128 //
1129 // The timeout is set immediately. Retry() will clear it if it fails.
1130 if (transfer_state_ == TransferState::kInitiating &&
1131 last_chunk_sent_ == Chunk::Type::kStart) {
1132 SetTimeout(initial_chunk_timeout_);
1133 } else {
1134 SetTimeout(chunk_timeout_);
1135 }
1136 Retry();
1137 break;
1138
1139 case TransferState::kInactive:
1140 PW_LOG_ERROR("Timeout occurred in INACTIVE state");
1141 return;
1142 }
1143 }
1144
Retry()1145 void Context::Retry() {
1146 if (retries_ == max_retries_ || lifetime_retries_ == max_lifetime_retries_) {
1147 PW_LOG_ERROR(
1148 "Transfer %u failed to receive a chunk after %u retries (lifetime %u).",
1149 id_for_log(),
1150 static_cast<unsigned>(retries_),
1151 static_cast<unsigned>(lifetime_retries_));
1152 PW_LOG_ERROR("Canceling transfer.");
1153
1154 if (transfer_state_ == TransferState::kTerminating) {
1155 // Timeouts occurring in a TERMINATING state indicate that the completion
1156 // chunk was never ACKed. Simply clean up the transfer context.
1157 set_transfer_state(TransferState::kInactive);
1158 } else {
1159 TerminateTransfer(Status::DeadlineExceeded());
1160 }
1161 return;
1162 }
1163
1164 ++retries_;
1165 ++lifetime_retries_;
1166
1167 if (transfer_state_ == TransferState::kInitiating ||
1168 last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
1169 RetryHandshake();
1170 return;
1171 }
1172
1173 if (transfer_state_ == TransferState::kTerminating) {
1174 EncodeAndSendChunk(
1175 Chunk::Final(configured_protocol_version_, session_id_, status_));
1176 return;
1177 }
1178
1179 if (type() == TransferType::kReceive) {
1180 // Resend the most recent transfer parameters.
1181 PW_LOG_DEBUG(
1182 "Receive transfer %u timed out waiting for chunk; resending parameters",
1183 static_cast<unsigned>(session_id_));
1184
1185 UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
1186 return;
1187 }
1188
1189 // In a transmit, if a data chunk has not yet been sent, the initial transfer
1190 // parameters did not arrive from the receiver. Resend the initial chunk.
1191 if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
1192 PW_LOG_DEBUG(
1193 "Transmit transfer %u timed out waiting for initial parameters",
1194 static_cast<unsigned>(session_id_));
1195 SendInitialLegacyTransmitChunk();
1196 return;
1197 }
1198
1199 // Otherwise, resend the most recent chunk. If the reader doesn't support
1200 // seeking, this isn't possible, so just terminate the transfer immediately.
1201 if (!SeekReader(last_chunk_offset_).ok()) {
1202 PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
1203 id_for_log());
1204 PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
1205 TerminateTransfer(Status::DeadlineExceeded());
1206 return;
1207 }
1208
1209 // Rewind the transfer position and resend the chunk.
1210 offset_ = last_chunk_offset_;
1211
1212 TransmitNextChunk(/*retransmit_requested=*/false);
1213 }
1214
RetryHandshake()1215 void Context::RetryHandshake() {
1216 Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
1217
1218 switch (last_chunk_sent_) {
1219 case Chunk::Type::kStart:
1220 // No protocol version is yet configured at the time of sending the start
1221 // chunk, so we use the client's desired version instead.
1222 retry_chunk.set_protocol_version(desired_protocol_version_)
1223 .set_desired_session_id(session_id_)
1224 .set_resource_id(resource_id_)
1225 .set_initial_offset(offset_);
1226 if (type() == TransferType::kReceive) {
1227 SetTransferParameters(retry_chunk);
1228 }
1229 break;
1230
1231 case Chunk::Type::kStartAck:
1232 retry_chunk.set_session_id(session_id_)
1233 .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
1234 break;
1235
1236 case Chunk::Type::kStartAckConfirmation:
1237 retry_chunk.set_session_id(session_id_);
1238 if (type() == TransferType::kReceive) {
1239 SetTransferParameters(retry_chunk);
1240 }
1241 break;
1242
1243 case Chunk::Type::kData:
1244 case Chunk::Type::kParametersRetransmit:
1245 case Chunk::Type::kParametersContinue:
1246 case Chunk::Type::kCompletion:
1247 case Chunk::Type::kCompletionAck:
1248 PW_CRASH("Should not RetryHandshake() when not in handshake phase");
1249 }
1250
1251 EncodeAndSendChunk(retry_chunk);
1252 }
1253
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const1254 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
1255 uint32_t channel_id) const {
1256 // Start with the user-provided maximum chunk size, which should be the usable
1257 // payload length on the RPC ingress path after any transport overhead.
1258 ptrdiff_t max_size = max_chunk_size_bytes;
1259
1260 // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
1261 //
1262 // type: 1 byte key, 1 byte value (CLIENT_STREAM)
1263 // channel_id: 1 byte key, varint value (calculate from stream)
1264 // service_id: 1 byte key, 4 byte value
1265 // method_id: 1 byte key, 4 byte value
1266 // payload: 1 byte key, varint length (remaining space)
1267 // status: 0 bytes (not set in stream packets)
1268 //
1269 // TOTAL: 14 bytes + encoded channel_id size + encoded payload length
1270 //
1271 max_size -= 14;
1272 max_size -= varint::EncodedSize(channel_id);
1273 max_size -= varint::EncodedSize(max_size);
1274
1275 // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
1276 // overhead calculation will be moved into an RPC helper to avoid having
1277 // pw_transfer depend on RPC internals.
1278 max_size -= 5;
1279
1280 // Subtract the transfer service overhead for a client write chunk
1281 // (pw_transfer/transfer.proto).
1282 //
1283 // session_id: 1 byte key, varint value (calculate)
1284 // offset: 1 byte key, varint value (calculate)
1285 // data: 1 byte key, varint length (remaining space)
1286 //
1287 // TOTAL: 3 + encoded session_id + encoded offset + encoded data length
1288 //
1289 // Use a lower bound of a single chunk for the window end offset, as it will
1290 // always be at least in that range.
1291 size_t window_end_offset = std::max(window_end_offset_, max_chunk_size_bytes);
1292 max_size -= 3;
1293 max_size -= varint::EncodedSize(session_id_);
1294 max_size -= varint::EncodedSize(window_end_offset);
1295 max_size -= varint::EncodedSize(max_size);
1296
1297 // A resulting value of zero (or less) renders write transfers unusable, as
1298 // there is no space to send any payload. This should be considered a
1299 // programmer error in the transfer service setup.
1300 PW_CHECK_INT_GT(
1301 max_size,
1302 0,
1303 "Transfer service maximum chunk size is too small to fit a payload. "
1304 "Increase max_chunk_size_bytes to support write transfers.");
1305
1306 return max_size;
1307 }
1308
LogTransferConfiguration()1309 void Context::LogTransferConfiguration() {
1310 PW_LOG_DEBUG(
1311 "Local transfer timing configuration: "
1312 "chunk_timeout=%ums, max_retries=%u, interchunk_delay=%uus",
1313 static_cast<unsigned>(
1314 std::chrono::ceil<std::chrono::milliseconds>(chunk_timeout_).count()),
1315 static_cast<unsigned>(max_retries_),
1316 static_cast<unsigned>(
1317 std::chrono::ceil<std::chrono::microseconds>(interchunk_delay_)
1318 .count()));
1319
1320 PW_LOG_DEBUG(
1321 "Local transfer windowing configuration: max_window_size_bytes=%u, "
1322 "extend_window_divisor=%u, max_chunk_size_bytes=%u",
1323 static_cast<unsigned>(max_parameters_->max_window_size_bytes()),
1324 static_cast<unsigned>(max_parameters_->extend_window_divisor()),
1325 static_cast<unsigned>(max_parameters_->max_chunk_size_bytes()));
1326 }
1327
1328 } // namespace pw::transfer::internal
1329