1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cinttypes> 17 #include <cstddef> 18 #include <limits> 19 #include <optional> 20 21 #include "pw_assert/assert.h" 22 #include "pw_chrono/system_clock.h" 23 #include "pw_rpc/writer.h" 24 #include "pw_status/status.h" 25 #include "pw_stream/stream.h" 26 #include "pw_transfer/internal/chunk.h" 27 #include "pw_transfer/internal/event.h" 28 #include "pw_transfer/rate_estimate.h" 29 30 namespace pw::transfer::internal { 31 32 class TransferThread; 33 34 class TransferParameters { 35 public: TransferParameters(uint32_t pending_bytes,uint32_t max_chunk_size_bytes,uint32_t extend_window_divisor)36 constexpr TransferParameters(uint32_t pending_bytes, 37 uint32_t max_chunk_size_bytes, 38 uint32_t extend_window_divisor) 39 : pending_bytes_(pending_bytes), 40 max_chunk_size_bytes_(max_chunk_size_bytes), 41 extend_window_divisor_(extend_window_divisor) { 42 PW_ASSERT(pending_bytes > 0); 43 PW_ASSERT(max_chunk_size_bytes > 0); 44 PW_ASSERT(extend_window_divisor > 1); 45 } 46 pending_bytes()47 uint32_t pending_bytes() const { return pending_bytes_; } set_pending_bytes(uint32_t pending_bytes)48 void set_pending_bytes(uint32_t pending_bytes) { 49 pending_bytes_ = pending_bytes; 50 } 51 max_chunk_size_bytes()52 uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; } set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes)53 void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) { 54 max_chunk_size_bytes_ = max_chunk_size_bytes; 55 } 56 extend_window_divisor()57 uint32_t extend_window_divisor() const { return extend_window_divisor_; } set_extend_window_divisor(uint32_t extend_window_divisor)58 void set_extend_window_divisor(uint32_t extend_window_divisor) { 59 PW_DASSERT(extend_window_divisor > 1); 60 extend_window_divisor_ = extend_window_divisor; 61 } 62 63 private: 64 uint32_t pending_bytes_; 65 uint32_t max_chunk_size_bytes_; 66 uint32_t extend_window_divisor_; 67 }; 68 69 // Information about a single transfer. 70 class Context { 71 public: 72 Context(const Context&) = delete; 73 Context(Context&&) = delete; 74 Context& operator=(const Context&) = delete; 75 Context& operator=(Context&&) = delete; 76 transfer_id()77 constexpr uint32_t transfer_id() const { return transfer_id_; } 78 79 // True if the context has been used for a transfer (it has an ID). initialized()80 bool initialized() const { 81 return transfer_state_ != TransferState::kInactive; 82 } 83 84 // True if the transfer is active. active()85 bool active() const { return transfer_state_ >= TransferState::kWaiting; } 86 timeout()87 std::optional<chrono::SystemClock::time_point> timeout() const { 88 return active() && next_timeout_ != kNoTimeout 89 ? std::optional(next_timeout_) 90 : std::nullopt; 91 } 92 93 // Returns true if the transfer's most recently set timeout has passed. timed_out()94 bool timed_out() const { 95 std::optional<chrono::SystemClock::time_point> next_timeout = timeout(); 96 return next_timeout.has_value() && 97 chrono::SystemClock::now() >= next_timeout.value(); 98 } 99 100 // Processes an event for this transfer. 101 void HandleEvent(const Event& event); 102 103 protected: 104 ~Context() = default; 105 Context()106 constexpr Context() 107 : transfer_id_(0), 108 flags_(0), 109 transfer_state_(TransferState::kInactive), 110 retries_(0), 111 max_retries_(0), 112 stream_(nullptr), 113 rpc_writer_(nullptr), 114 offset_(0), 115 window_size_(0), 116 window_end_offset_(0), 117 pending_bytes_(0), 118 max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()), 119 max_parameters_(nullptr), 120 thread_(nullptr), 121 last_chunk_offset_(0), 122 chunk_timeout_(chrono::SystemClock::duration::zero()), 123 interchunk_delay_(chrono::SystemClock::for_at_least( 124 std::chrono::microseconds(kDefaultChunkDelayMicroseconds))), 125 next_timeout_(kNoTimeout) {} 126 type()127 constexpr TransferType type() const { 128 return static_cast<TransferType>(flags_ & kFlagsType); 129 } 130 131 private: 132 enum class TransferState : uint8_t { 133 // This ServerContext has never been used for a transfer. It is available 134 // for use for a transfer. 135 kInactive, 136 // A transfer completed and the final status chunk was sent. The Context 137 // is 138 // available for use for a new transfer. A receive transfer uses this 139 // state 140 // to allow a transmitter to retry its last chunk if the final status 141 // chunk 142 // was dropped. 143 kCompleted, 144 // Waiting for the other end to send a chunk. 145 kWaiting, 146 // Transmitting a window of data to a receiver. 147 kTransmitting, 148 // Recovering after one or more chunks was dropped in an active transfer. 149 kRecovery, 150 }; 151 152 enum class TransmitAction { 153 // Start of a new transfer. 154 kBegin, 155 // Extend the current window length. 156 kExtend, 157 // Retransmit from a specified offset. 158 kRetransmit, 159 }; 160 set_transfer_state(TransferState state)161 void set_transfer_state(TransferState state) { transfer_state_ = state; } 162 163 // The transfer ID as unsigned instead of uint32_t so it can be used with %u. id_for_log()164 unsigned id_for_log() const { 165 static_assert(sizeof(unsigned) >= sizeof(transfer_id_)); 166 return static_cast<unsigned>(transfer_id_); 167 } 168 reader()169 stream::Reader& reader() { 170 PW_DASSERT(active() && type() == TransferType::kTransmit); 171 return static_cast<stream::Reader&>(*stream_); 172 } 173 writer()174 stream::Writer& writer() { 175 PW_DASSERT(active() && type() == TransferType::kReceive); 176 return static_cast<stream::Writer&>(*stream_); 177 } 178 179 // Calculates the maximum size of actual data that can be sent within a 180 // single client write transfer chunk, accounting for the overhead of the 181 // transfer protocol and RPC system. 182 // 183 // Note: This function relies on RPC protocol internals. This is generally a 184 // *bad* idea, but is necessary here due to limitations of the RPC system 185 // and its asymmetric ingress and egress paths. 186 // 187 // TODO(frolv): This should be investigated further and perhaps addressed 188 // within the RPC system, at the least through a helper function. 189 uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes, 190 uint32_t channel_id) const; 191 192 // Initializes a new transfer using new_transfer. The provided stream 193 // argument is used in place of the NewTransferEvent's stream. Only 194 // initializes state; no packets are sent. 195 // 196 // Precondition: context is not active. 197 void Initialize(const NewTransferEvent& new_transfer); 198 199 // Starts a new transfer from an initialized context by sending the initial 200 // transfer chunk. This is only used by transfer clients, as the transfer 201 // service cannot initiate transfers. 202 // 203 // Calls Finish(), which calls the on_completion callback, if initiating a 204 // transfer fails. 205 void InitiateTransferAsClient(); 206 207 // Starts a new transfer on the server after receiving a request from a 208 // client. 209 void StartTransferAsServer(const NewTransferEvent& new_transfer); 210 211 // Does final cleanup specific to the server or client. Returns whether the 212 // cleanup succeeded. An error in cleanup indicates that the transfer 213 // failed. 214 virtual Status FinalCleanup(Status status) = 0; 215 216 // Processes a chunk in either a transfer or receive transfer. 217 void HandleChunkEvent(const ChunkEvent& event); 218 219 // Processes a chunk in a transmit transfer. 220 void HandleTransmitChunk(const Chunk& chunk); 221 222 // Processes a transfer parameters update in a transmit transfer. 223 void HandleTransferParametersUpdate(const Chunk& chunk); 224 225 // Sends the next chunk in a transmit transfer, if any. 226 void TransmitNextChunk(bool retransmit_requested); 227 228 // Processes a chunk in a receive transfer. 229 void HandleReceiveChunk(const Chunk& chunk); 230 231 // Processes a data chunk in a received while in the kWaiting state. 232 void HandleReceivedData(const Chunk& chunk); 233 234 // Sends the first chunk in a transmit transfer. 235 void SendInitialTransmitChunk(); 236 237 // In a receive transfer, sends a parameters chunk telling the transmitter 238 // how much data they can send. 239 void SendTransferParameters(TransmitAction action); 240 241 // Updates the current receive transfer parameters from the provided object, 242 // then sends them. 243 void UpdateAndSendTransferParameters(TransmitAction action); 244 245 // Sends a final status chunk of a completed transfer without updating the 246 // the transfer. Sends status_, which MUST have been set by a previous 247 // Finish() call. 248 void SendFinalStatusChunk(); 249 250 // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to 251 // the final status for this transfer. The transfer MUST be active when this 252 // is called. 253 void Finish(Status status); 254 255 // Encodes the specified chunk to the encode buffer and sends it with the 256 // rpc_writer_. Calls Finish() with an error if the operation fails. 257 void EncodeAndSendChunk(const Chunk& chunk); 258 259 void SetTimeout(chrono::SystemClock::duration timeout); ClearTimeout()260 void ClearTimeout() { next_timeout_ = kNoTimeout; } 261 262 // Called when the transfer's timeout expires. 263 void HandleTimeout(); 264 265 // Resends the last packet or aborts the transfer if the maximum retries has 266 // been exceeded. 267 void Retry(); 268 269 static constexpr uint8_t kFlagsType = 1 << 0; 270 static constexpr uint8_t kFlagsDataSent = 1 << 1; 271 272 static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000; 273 274 // How long to wait for the other side to ACK a final transfer chunk before 275 // resetting the context so that it can be reused. During this time, the 276 // status chunk will be re-sent for every non-ACK chunk received, 277 // continually notifying the other end that the transfer is over. 278 static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout = 279 std::chrono::milliseconds(5000); 280 281 static constexpr chrono::SystemClock::time_point kNoTimeout = 282 chrono::SystemClock::time_point(chrono::SystemClock::duration(0)); 283 284 uint32_t transfer_id_; 285 uint8_t flags_; 286 TransferState transfer_state_; 287 uint8_t retries_; 288 uint8_t max_retries_; 289 290 // The stream from which to read or to which to write data. 291 stream::Stream* stream_; 292 rpc::Writer* rpc_writer_; 293 294 uint32_t offset_; 295 uint32_t window_size_; 296 uint32_t window_end_offset_; 297 // TODO(pwbug/584): Remove pending_bytes in favor of window_end_offset. 298 uint32_t pending_bytes_; 299 uint32_t max_chunk_size_bytes_; 300 301 const TransferParameters* max_parameters_; 302 TransferThread* thread_; 303 304 union { 305 Status status_; // Used when state is kCompleted. 306 uint32_t last_chunk_offset_; // Used in states kWaiting and kRecovery. 307 }; 308 309 // How long to wait for a chunk from the other end. 310 chrono::SystemClock::duration chunk_timeout_; 311 312 // How long to delay between transmitting subsequent data chunks within a 313 // window. 314 chrono::SystemClock::duration interchunk_delay_; 315 316 // Timestamp at which the transfer will next time out, or kNoTimeout. 317 chrono::SystemClock::time_point next_timeout_; 318 319 RateEstimate transfer_rate_; 320 }; 321 322 } // namespace pw::transfer::internal 323