• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14 #pragma once
15 
16 #include <cinttypes>
17 #include <cstddef>
18 #include <limits>
19 #include <optional>
20 
21 #include "pw_assert/assert.h"
22 #include "pw_chrono/system_clock.h"
23 #include "pw_rpc/writer.h"
24 #include "pw_status/status.h"
25 #include "pw_stream/stream.h"
26 #include "pw_transfer/internal/chunk.h"
27 #include "pw_transfer/internal/event.h"
28 #include "pw_transfer/internal/protocol.h"
29 #include "pw_transfer/rate_estimate.h"
30 
31 namespace pw::transfer::internal {
32 
33 class TransferThread;
34 
35 class TransferParameters {
36  public:
TransferParameters(uint32_t pending_bytes,uint32_t max_chunk_size_bytes,uint32_t extend_window_divisor)37   constexpr TransferParameters(uint32_t pending_bytes,
38                                uint32_t max_chunk_size_bytes,
39                                uint32_t extend_window_divisor)
40       : pending_bytes_(pending_bytes),
41         max_chunk_size_bytes_(max_chunk_size_bytes),
42         extend_window_divisor_(extend_window_divisor) {
43     PW_ASSERT(pending_bytes > 0);
44     PW_ASSERT(max_chunk_size_bytes > 0);
45     PW_ASSERT(extend_window_divisor > 1);
46   }
47 
pending_bytes()48   uint32_t pending_bytes() const { return pending_bytes_; }
set_pending_bytes(uint32_t pending_bytes)49   void set_pending_bytes(uint32_t pending_bytes) {
50     pending_bytes_ = pending_bytes;
51   }
52 
max_chunk_size_bytes()53   uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; }
set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes)54   void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) {
55     max_chunk_size_bytes_ = max_chunk_size_bytes;
56   }
57 
extend_window_divisor()58   uint32_t extend_window_divisor() const { return extend_window_divisor_; }
set_extend_window_divisor(uint32_t extend_window_divisor)59   void set_extend_window_divisor(uint32_t extend_window_divisor) {
60     PW_DASSERT(extend_window_divisor > 1);
61     extend_window_divisor_ = extend_window_divisor;
62   }
63 
64  private:
65   uint32_t pending_bytes_;
66   uint32_t max_chunk_size_bytes_;
67   uint32_t extend_window_divisor_;
68 };
69 
70 // Information about a single transfer.
71 class Context {
72  public:
73   static constexpr uint32_t kUnassignedSessionId = 0;
74 
75   Context(const Context&) = delete;
76   Context(Context&&) = delete;
77   Context& operator=(const Context&) = delete;
78   Context& operator=(Context&&) = delete;
79 
session_id()80   constexpr uint32_t session_id() const { return session_id_; }
resource_id()81   constexpr uint32_t resource_id() const { return resource_id_; }
82 
83   // True if the context has been used for a transfer (it has an ID).
initialized()84   bool initialized() const {
85     return transfer_state_ != TransferState::kInactive;
86   }
87 
88   // True if the transfer is active.
active()89   bool active() const { return transfer_state_ >= TransferState::kInitiating; }
90 
timeout()91   std::optional<chrono::SystemClock::time_point> timeout() const {
92     return active() && next_timeout_ != kNoTimeout
93                ? std::optional(next_timeout_)
94                : std::nullopt;
95   }
96 
97   // Returns true if the transfer's most recently set timeout has passed.
timed_out()98   bool timed_out() const {
99     std::optional<chrono::SystemClock::time_point> next_timeout = timeout();
100     return next_timeout.has_value() &&
101            chrono::SystemClock::now() >= next_timeout.value();
102   }
103 
104   // Processes an event for this transfer.
105   void HandleEvent(const Event& event);
106 
107  protected:
108   ~Context() = default;
109 
Context()110   constexpr Context()
111       : session_id_(kUnassignedSessionId),
112         resource_id_(0),
113         desired_protocol_version_(ProtocolVersion::kUnknown),
114         configured_protocol_version_(ProtocolVersion::kUnknown),
115         flags_(0),
116         transfer_state_(TransferState::kInactive),
117         retries_(0),
118         max_retries_(0),
119         lifetime_retries_(0),
120         max_lifetime_retries_(0),
121         stream_(nullptr),
122         rpc_writer_(nullptr),
123         offset_(0),
124         window_size_(0),
125         window_end_offset_(0),
126         max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
127         max_parameters_(nullptr),
128         thread_(nullptr),
129         last_chunk_sent_(Chunk::Type::kData),
130         last_chunk_offset_(0),
131         chunk_timeout_(chrono::SystemClock::duration::zero()),
132         interchunk_delay_(chrono::SystemClock::for_at_least(
133             std::chrono::microseconds(kDefaultChunkDelayMicroseconds))),
134         next_timeout_(kNoTimeout) {}
135 
type()136   constexpr TransferType type() const {
137     return static_cast<TransferType>(flags_ & kFlagsType);
138   }
139 
140  private:
141   enum class TransferState : uint8_t {
142     // The context is available for use for a new transfer.
143     kInactive,
144 
145     // A transfer completed and the final status chunk was sent. The Context is
146     // available for use for a new transfer. A receive transfer uses this state
147     // to allow a transmitter to retry its last chunk if the final status chunk
148     // was dropped.
149     //
150     // Only used by the legacy protocol. Starting from version 2, transfer
151     // completions are acknowledged, for which the TERMINATING state is used.
152     kCompleted,
153 
154     // Transfer is starting. The server and client are performing an initial
155     // handshake and negotiating protocol and feature flags.
156     kInitiating,
157 
158     // Waiting for the other end to send a chunk.
159     kWaiting,
160 
161     // Transmitting a window of data to a receiver.
162     kTransmitting,
163 
164     // Recovering after one or more chunks was dropped in an active transfer.
165     kRecovery,
166 
167     // Transfer has completed locally and is waiting for the peer to acknowledge
168     // its final status. Only entered by the terminating side of the transfer.
169     //
170     // The context remains in a TERMINATING state until it receives an
171     // acknowledgement from the peer or times out. Either way, the context
172     // transitions to INACTIVE afterwards, fully cleaning it up for reuse.
173     //
174     // Used instead of COMPLETED starting from version 2. Unlike COMPLETED,
175     // contexts in a TERMINATING state cannot be used to start new transfers.
176     kTerminating,
177   };
178 
179   enum class TransmitAction {
180     // Start of a new transfer.
181     kBegin,
182     // Extend the current window length.
183     kExtend,
184     // Retransmit from a specified offset.
185     kRetransmit,
186   };
187 
set_transfer_state(TransferState state)188   void set_transfer_state(TransferState state) { transfer_state_ = state; }
189 
190   // The session ID as unsigned instead of uint32_t so it can be used with %u.
id_for_log()191   unsigned id_for_log() const {
192     static_assert(sizeof(unsigned) >= sizeof(session_id_));
193     return static_cast<unsigned>(session_id_);
194   }
195 
reader()196   stream::Reader& reader() {
197     PW_DASSERT(active() && type() == TransferType::kTransmit);
198     return static_cast<stream::Reader&>(*stream_);
199   }
200 
writer()201   stream::Writer& writer() {
202     PW_DASSERT(active() && type() == TransferType::kReceive);
203     return static_cast<stream::Writer&>(*stream_);
204   }
205 
DataTransferComplete()206   bool DataTransferComplete() const {
207     return transfer_state_ == TransferState::kTerminating ||
208            transfer_state_ == TransferState::kCompleted;
209   }
210 
ShouldSkipCompletionHandshake()211   bool ShouldSkipCompletionHandshake() const {
212     // Completion handshakes are not part of the legacy protocol. Additionally,
213     // transfers which have not yet fully established should not handshake and
214     // simply time out.
215     return configured_protocol_version_ <= ProtocolVersion::kLegacy ||
216            transfer_state_ == TransferState::kInitiating;
217   }
218 
219   // Calculates the maximum size of actual data that can be sent within a
220   // single client write transfer chunk, accounting for the overhead of the
221   // transfer protocol and RPC system.
222   //
223   // Note: This function relies on RPC protocol internals. This is generally a
224   // *bad* idea, but is necessary here due to limitations of the RPC system
225   // and its asymmetric ingress and egress paths.
226   //
227   // TODO(frolv): This should be investigated further and perhaps addressed
228   // within the RPC system, at the least through a helper function.
229   uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
230                              uint32_t channel_id) const;
231 
232   // Initializes a new transfer using new_transfer. The provided stream
233   // argument is used in place of the NewTransferEvent's stream. Only
234   // initializes state; no packets are sent.
235   //
236   // Precondition: context is not active.
237   void Initialize(const NewTransferEvent& new_transfer);
238 
239   // Starts a new transfer from an initialized context by sending the initial
240   // transfer chunk. This is only used by transfer clients, as the transfer
241   // service cannot initiate transfers.
242   //
243   // Calls Finish(), which calls the on_completion callback, if initiating a
244   // transfer fails.
245   void InitiateTransferAsClient();
246 
247   // Starts a new transfer on the server after receiving a request from a
248   // client.
249   bool StartTransferAsServer(const NewTransferEvent& new_transfer);
250 
251   // Does final cleanup specific to the server or client. Returns whether the
252   // cleanup succeeded. An error in cleanup indicates that the transfer
253   // failed.
254   virtual Status FinalCleanup(Status status) = 0;
255 
256   // Processes a chunk in either a transfer or receive transfer.
257   void HandleChunkEvent(const ChunkEvent& event);
258 
259   // Runs the initial three-way handshake when starting a new transfer.
260   void PerformInitialHandshake(const Chunk& chunk);
261 
262   void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk);
263 
264   // Processes a chunk in a transmit transfer.
265   void HandleTransmitChunk(const Chunk& chunk);
266 
267   // Processes a transfer parameters update in a transmit transfer.
268   void HandleTransferParametersUpdate(const Chunk& chunk);
269 
270   // Sends the next chunk in a transmit transfer, if any.
271   void TransmitNextChunk(bool retransmit_requested);
272 
273   // Processes a chunk in a receive transfer.
274   void HandleReceiveChunk(const Chunk& chunk);
275 
276   // Processes a data chunk in a received while in the kWaiting state.
277   void HandleReceivedData(const Chunk& chunk);
278 
279   // Sends the first chunk in a transmit transfer.
280   void SendInitialTransmitChunk();
281 
282   // Updates the current receive transfer parameters based on the context's
283   // configuration.
284   void UpdateTransferParameters();
285 
286   // Populates the transfer parameters fields on a chunk object.
287   void SetTransferParameters(Chunk& parameters);
288 
289   // In a receive transfer, sends a parameters chunk telling the transmitter
290   // how much data they can send.
291   void SendTransferParameters(TransmitAction action);
292 
293   // Updates the current receive transfer parameters, then sends them.
294   void UpdateAndSendTransferParameters(TransmitAction action);
295 
296   // Processes a chunk in a terminating state.
297   void HandleTerminatingChunk(const Chunk& chunk);
298 
299   // Ends the transfer with the specified status, sending a completion chunk to
300   // the peer.
301   void TerminateTransfer(Status status, bool with_resource_id = false);
302 
303   // Ends a transfer following notification of completion from the peer.
304   void HandleTermination(Status status);
305 
306   // Forcefully ends a transfer locally without contacting the peer.
Abort(Status status)307   void Abort(Status status) {
308     Finish(status);
309     set_transfer_state(TransferState::kCompleted);
310   }
311 
312   // Sends a final status chunk of a completed transfer without updating the
313   // transfer. Sends status_, which MUST have been set by a previous Finish()
314   // call.
315   void SendFinalStatusChunk(bool with_resource_id = false);
316 
317   // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to
318   // the final status for this transfer. The transfer MUST be active when this
319   // is called.
320   void Finish(Status status);
321 
322   // Encodes the specified chunk to the encode buffer and sends it with the
323   // rpc_writer_. Calls Finish() with an error if the operation fails.
324   void EncodeAndSendChunk(const Chunk& chunk);
325 
326   void SetTimeout(chrono::SystemClock::duration timeout);
ClearTimeout()327   void ClearTimeout() { next_timeout_ = kNoTimeout; }
328 
329   // Called when the transfer's timeout expires.
330   void HandleTimeout();
331 
332   // Resends the last packet or aborts the transfer if the maximum retries has
333   // been exceeded.
334   void Retry();
335   void RetryHandshake();
336 
337   void LogTransferConfiguration();
338 
339   static constexpr uint8_t kFlagsType = 1 << 0;
340   static constexpr uint8_t kFlagsDataSent = 1 << 1;
341   static constexpr uint8_t kFlagsContactMade = 1 << 2;
342 
343   static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000;
344 
345   // How long to wait for the other side to ACK a final transfer chunk before
346   // resetting the context so that it can be reused. During this time, the
347   // status chunk will be re-sent for every non-ACK chunk received,
348   // continually notifying the other end that the transfer is over.
349   static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout =
350       std::chrono::milliseconds(5000);
351 
352   static constexpr chrono::SystemClock::time_point kNoTimeout =
353       chrono::SystemClock::time_point(chrono::SystemClock::duration(0));
354 
355   uint32_t session_id_;
356   uint32_t resource_id_;
357 
358   // The version of the transfer protocol that this node wants to run.
359   ProtocolVersion desired_protocol_version_;
360 
361   // The version of the transfer protocol that the context is actually running,
362   // following negotiation with the transfer peer.
363   ProtocolVersion configured_protocol_version_;
364 
365   uint8_t flags_;
366   TransferState transfer_state_;
367   uint8_t retries_;
368   uint8_t max_retries_;
369   uint32_t lifetime_retries_;
370   uint32_t max_lifetime_retries_;
371 
372   // The stream from which to read or to which to write data.
373   stream::Stream* stream_;
374   rpc::Writer* rpc_writer_;
375 
376   uint32_t offset_;
377   uint32_t window_size_;
378   uint32_t window_end_offset_;
379   uint32_t max_chunk_size_bytes_;
380 
381   const TransferParameters* max_parameters_;
382   TransferThread* thread_;
383 
384   Chunk::Type last_chunk_sent_;
385 
386   union {
387     Status status_;               // Used when state is kCompleted.
388     uint32_t last_chunk_offset_;  // Used in states kWaiting and kRecovery.
389   };
390 
391   // How long to wait for a chunk from the other end.
392   chrono::SystemClock::duration chunk_timeout_;
393 
394   // How long to delay between transmitting subsequent data chunks within a
395   // window.
396   chrono::SystemClock::duration interchunk_delay_;
397 
398   // Timestamp at which the transfer will next time out, or kNoTimeout.
399   chrono::SystemClock::time_point next_timeout_;
400 
401   RateEstimate transfer_rate_;
402 };
403 
404 }  // namespace pw::transfer::internal
405