1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cinttypes> 17 #include <cstddef> 18 #include <limits> 19 #include <optional> 20 21 #include "pw_assert/assert.h" 22 #include "pw_chrono/system_clock.h" 23 #include "pw_rpc/writer.h" 24 #include "pw_status/status.h" 25 #include "pw_stream/stream.h" 26 #include "pw_transfer/internal/chunk.h" 27 #include "pw_transfer/internal/event.h" 28 #include "pw_transfer/internal/protocol.h" 29 #include "pw_transfer/rate_estimate.h" 30 31 namespace pw::transfer::internal { 32 33 class TransferThread; 34 35 class TransferParameters { 36 public: TransferParameters(uint32_t pending_bytes,uint32_t max_chunk_size_bytes,uint32_t extend_window_divisor)37 constexpr TransferParameters(uint32_t pending_bytes, 38 uint32_t max_chunk_size_bytes, 39 uint32_t extend_window_divisor) 40 : pending_bytes_(pending_bytes), 41 max_chunk_size_bytes_(max_chunk_size_bytes), 42 extend_window_divisor_(extend_window_divisor) { 43 PW_ASSERT(pending_bytes > 0); 44 PW_ASSERT(max_chunk_size_bytes > 0); 45 PW_ASSERT(extend_window_divisor > 1); 46 } 47 pending_bytes()48 uint32_t pending_bytes() const { return pending_bytes_; } set_pending_bytes(uint32_t pending_bytes)49 void set_pending_bytes(uint32_t pending_bytes) { 50 pending_bytes_ = pending_bytes; 51 } 52 max_chunk_size_bytes()53 uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; } set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes)54 void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) { 55 max_chunk_size_bytes_ = max_chunk_size_bytes; 56 } 57 extend_window_divisor()58 uint32_t extend_window_divisor() const { return extend_window_divisor_; } set_extend_window_divisor(uint32_t extend_window_divisor)59 void set_extend_window_divisor(uint32_t extend_window_divisor) { 60 PW_DASSERT(extend_window_divisor > 1); 61 extend_window_divisor_ = extend_window_divisor; 62 } 63 64 private: 65 uint32_t pending_bytes_; 66 uint32_t max_chunk_size_bytes_; 67 uint32_t extend_window_divisor_; 68 }; 69 70 // Information about a single transfer. 71 class Context { 72 public: 73 static constexpr uint32_t kUnassignedSessionId = 0; 74 75 Context(const Context&) = delete; 76 Context(Context&&) = delete; 77 Context& operator=(const Context&) = delete; 78 Context& operator=(Context&&) = delete; 79 session_id()80 constexpr uint32_t session_id() const { return session_id_; } resource_id()81 constexpr uint32_t resource_id() const { return resource_id_; } 82 83 // True if the context has been used for a transfer (it has an ID). initialized()84 bool initialized() const { 85 return transfer_state_ != TransferState::kInactive; 86 } 87 88 // True if the transfer is active. active()89 bool active() const { return transfer_state_ >= TransferState::kInitiating; } 90 timeout()91 std::optional<chrono::SystemClock::time_point> timeout() const { 92 return active() && next_timeout_ != kNoTimeout 93 ? std::optional(next_timeout_) 94 : std::nullopt; 95 } 96 97 // Returns true if the transfer's most recently set timeout has passed. timed_out()98 bool timed_out() const { 99 std::optional<chrono::SystemClock::time_point> next_timeout = timeout(); 100 return next_timeout.has_value() && 101 chrono::SystemClock::now() >= next_timeout.value(); 102 } 103 104 // Processes an event for this transfer. 105 void HandleEvent(const Event& event); 106 107 protected: 108 ~Context() = default; 109 Context()110 constexpr Context() 111 : session_id_(kUnassignedSessionId), 112 resource_id_(0), 113 desired_protocol_version_(ProtocolVersion::kUnknown), 114 configured_protocol_version_(ProtocolVersion::kUnknown), 115 flags_(0), 116 transfer_state_(TransferState::kInactive), 117 retries_(0), 118 max_retries_(0), 119 lifetime_retries_(0), 120 max_lifetime_retries_(0), 121 stream_(nullptr), 122 rpc_writer_(nullptr), 123 offset_(0), 124 window_size_(0), 125 window_end_offset_(0), 126 max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()), 127 max_parameters_(nullptr), 128 thread_(nullptr), 129 last_chunk_sent_(Chunk::Type::kData), 130 last_chunk_offset_(0), 131 chunk_timeout_(chrono::SystemClock::duration::zero()), 132 interchunk_delay_(chrono::SystemClock::for_at_least( 133 std::chrono::microseconds(kDefaultChunkDelayMicroseconds))), 134 next_timeout_(kNoTimeout) {} 135 type()136 constexpr TransferType type() const { 137 return static_cast<TransferType>(flags_ & kFlagsType); 138 } 139 140 private: 141 enum class TransferState : uint8_t { 142 // The context is available for use for a new transfer. 143 kInactive, 144 145 // A transfer completed and the final status chunk was sent. The Context is 146 // available for use for a new transfer. A receive transfer uses this state 147 // to allow a transmitter to retry its last chunk if the final status chunk 148 // was dropped. 149 // 150 // Only used by the legacy protocol. Starting from version 2, transfer 151 // completions are acknowledged, for which the TERMINATING state is used. 152 kCompleted, 153 154 // Transfer is starting. The server and client are performing an initial 155 // handshake and negotiating protocol and feature flags. 156 kInitiating, 157 158 // Waiting for the other end to send a chunk. 159 kWaiting, 160 161 // Transmitting a window of data to a receiver. 162 kTransmitting, 163 164 // Recovering after one or more chunks was dropped in an active transfer. 165 kRecovery, 166 167 // Transfer has completed locally and is waiting for the peer to acknowledge 168 // its final status. Only entered by the terminating side of the transfer. 169 // 170 // The context remains in a TERMINATING state until it receives an 171 // acknowledgement from the peer or times out. Either way, the context 172 // transitions to INACTIVE afterwards, fully cleaning it up for reuse. 173 // 174 // Used instead of COMPLETED starting from version 2. Unlike COMPLETED, 175 // contexts in a TERMINATING state cannot be used to start new transfers. 176 kTerminating, 177 }; 178 179 enum class TransmitAction { 180 // Start of a new transfer. 181 kBegin, 182 // Extend the current window length. 183 kExtend, 184 // Retransmit from a specified offset. 185 kRetransmit, 186 }; 187 set_transfer_state(TransferState state)188 void set_transfer_state(TransferState state) { transfer_state_ = state; } 189 190 // The session ID as unsigned instead of uint32_t so it can be used with %u. id_for_log()191 unsigned id_for_log() const { 192 static_assert(sizeof(unsigned) >= sizeof(session_id_)); 193 return static_cast<unsigned>(session_id_); 194 } 195 reader()196 stream::Reader& reader() { 197 PW_DASSERT(active() && type() == TransferType::kTransmit); 198 return static_cast<stream::Reader&>(*stream_); 199 } 200 writer()201 stream::Writer& writer() { 202 PW_DASSERT(active() && type() == TransferType::kReceive); 203 return static_cast<stream::Writer&>(*stream_); 204 } 205 DataTransferComplete()206 bool DataTransferComplete() const { 207 return transfer_state_ == TransferState::kTerminating || 208 transfer_state_ == TransferState::kCompleted; 209 } 210 ShouldSkipCompletionHandshake()211 bool ShouldSkipCompletionHandshake() const { 212 // Completion handshakes are not part of the legacy protocol. Additionally, 213 // transfers which have not yet fully established should not handshake and 214 // simply time out. 215 return configured_protocol_version_ <= ProtocolVersion::kLegacy || 216 transfer_state_ == TransferState::kInitiating; 217 } 218 219 // Calculates the maximum size of actual data that can be sent within a 220 // single client write transfer chunk, accounting for the overhead of the 221 // transfer protocol and RPC system. 222 // 223 // Note: This function relies on RPC protocol internals. This is generally a 224 // *bad* idea, but is necessary here due to limitations of the RPC system 225 // and its asymmetric ingress and egress paths. 226 // 227 // TODO(frolv): This should be investigated further and perhaps addressed 228 // within the RPC system, at the least through a helper function. 229 uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes, 230 uint32_t channel_id) const; 231 232 // Initializes a new transfer using new_transfer. The provided stream 233 // argument is used in place of the NewTransferEvent's stream. Only 234 // initializes state; no packets are sent. 235 // 236 // Precondition: context is not active. 237 void Initialize(const NewTransferEvent& new_transfer); 238 239 // Starts a new transfer from an initialized context by sending the initial 240 // transfer chunk. This is only used by transfer clients, as the transfer 241 // service cannot initiate transfers. 242 // 243 // Calls Finish(), which calls the on_completion callback, if initiating a 244 // transfer fails. 245 void InitiateTransferAsClient(); 246 247 // Starts a new transfer on the server after receiving a request from a 248 // client. 249 bool StartTransferAsServer(const NewTransferEvent& new_transfer); 250 251 // Does final cleanup specific to the server or client. Returns whether the 252 // cleanup succeeded. An error in cleanup indicates that the transfer 253 // failed. 254 virtual Status FinalCleanup(Status status) = 0; 255 256 // Processes a chunk in either a transfer or receive transfer. 257 void HandleChunkEvent(const ChunkEvent& event); 258 259 // Runs the initial three-way handshake when starting a new transfer. 260 void PerformInitialHandshake(const Chunk& chunk); 261 262 void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk); 263 264 // Processes a chunk in a transmit transfer. 265 void HandleTransmitChunk(const Chunk& chunk); 266 267 // Processes a transfer parameters update in a transmit transfer. 268 void HandleTransferParametersUpdate(const Chunk& chunk); 269 270 // Sends the next chunk in a transmit transfer, if any. 271 void TransmitNextChunk(bool retransmit_requested); 272 273 // Processes a chunk in a receive transfer. 274 void HandleReceiveChunk(const Chunk& chunk); 275 276 // Processes a data chunk in a received while in the kWaiting state. 277 void HandleReceivedData(const Chunk& chunk); 278 279 // Sends the first chunk in a transmit transfer. 280 void SendInitialTransmitChunk(); 281 282 // Updates the current receive transfer parameters based on the context's 283 // configuration. 284 void UpdateTransferParameters(); 285 286 // Populates the transfer parameters fields on a chunk object. 287 void SetTransferParameters(Chunk& parameters); 288 289 // In a receive transfer, sends a parameters chunk telling the transmitter 290 // how much data they can send. 291 void SendTransferParameters(TransmitAction action); 292 293 // Updates the current receive transfer parameters, then sends them. 294 void UpdateAndSendTransferParameters(TransmitAction action); 295 296 // Processes a chunk in a terminating state. 297 void HandleTerminatingChunk(const Chunk& chunk); 298 299 // Ends the transfer with the specified status, sending a completion chunk to 300 // the peer. 301 void TerminateTransfer(Status status, bool with_resource_id = false); 302 303 // Ends a transfer following notification of completion from the peer. 304 void HandleTermination(Status status); 305 306 // Forcefully ends a transfer locally without contacting the peer. Abort(Status status)307 void Abort(Status status) { 308 Finish(status); 309 set_transfer_state(TransferState::kCompleted); 310 } 311 312 // Sends a final status chunk of a completed transfer without updating the 313 // transfer. Sends status_, which MUST have been set by a previous Finish() 314 // call. 315 void SendFinalStatusChunk(bool with_resource_id = false); 316 317 // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to 318 // the final status for this transfer. The transfer MUST be active when this 319 // is called. 320 void Finish(Status status); 321 322 // Encodes the specified chunk to the encode buffer and sends it with the 323 // rpc_writer_. Calls Finish() with an error if the operation fails. 324 void EncodeAndSendChunk(const Chunk& chunk); 325 326 void SetTimeout(chrono::SystemClock::duration timeout); ClearTimeout()327 void ClearTimeout() { next_timeout_ = kNoTimeout; } 328 329 // Called when the transfer's timeout expires. 330 void HandleTimeout(); 331 332 // Resends the last packet or aborts the transfer if the maximum retries has 333 // been exceeded. 334 void Retry(); 335 void RetryHandshake(); 336 337 void LogTransferConfiguration(); 338 339 static constexpr uint8_t kFlagsType = 1 << 0; 340 static constexpr uint8_t kFlagsDataSent = 1 << 1; 341 static constexpr uint8_t kFlagsContactMade = 1 << 2; 342 343 static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000; 344 345 // How long to wait for the other side to ACK a final transfer chunk before 346 // resetting the context so that it can be reused. During this time, the 347 // status chunk will be re-sent for every non-ACK chunk received, 348 // continually notifying the other end that the transfer is over. 349 static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout = 350 std::chrono::milliseconds(5000); 351 352 static constexpr chrono::SystemClock::time_point kNoTimeout = 353 chrono::SystemClock::time_point(chrono::SystemClock::duration(0)); 354 355 uint32_t session_id_; 356 uint32_t resource_id_; 357 358 // The version of the transfer protocol that this node wants to run. 359 ProtocolVersion desired_protocol_version_; 360 361 // The version of the transfer protocol that the context is actually running, 362 // following negotiation with the transfer peer. 363 ProtocolVersion configured_protocol_version_; 364 365 uint8_t flags_; 366 TransferState transfer_state_; 367 uint8_t retries_; 368 uint8_t max_retries_; 369 uint32_t lifetime_retries_; 370 uint32_t max_lifetime_retries_; 371 372 // The stream from which to read or to which to write data. 373 stream::Stream* stream_; 374 rpc::Writer* rpc_writer_; 375 376 uint32_t offset_; 377 uint32_t window_size_; 378 uint32_t window_end_offset_; 379 uint32_t max_chunk_size_bytes_; 380 381 const TransferParameters* max_parameters_; 382 TransferThread* thread_; 383 384 Chunk::Type last_chunk_sent_; 385 386 union { 387 Status status_; // Used when state is kCompleted. 388 uint32_t last_chunk_offset_; // Used in states kWaiting and kRecovery. 389 }; 390 391 // How long to wait for a chunk from the other end. 392 chrono::SystemClock::duration chunk_timeout_; 393 394 // How long to delay between transmitting subsequent data chunks within a 395 // window. 396 chrono::SystemClock::duration interchunk_delay_; 397 398 // Timestamp at which the transfer will next time out, or kNoTimeout. 399 chrono::SystemClock::time_point next_timeout_; 400 401 RateEstimate transfer_rate_; 402 }; 403 404 } // namespace pw::transfer::internal 405