• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <cinttypes>
17 #include <cstddef>
18 #include <limits>
19 #include <optional>
20 
21 #include "pw_assert/assert.h"
22 #include "pw_chrono/system_clock.h"
23 #include "pw_rpc/writer.h"
24 #include "pw_status/status.h"
25 #include "pw_stream/stream.h"
26 #include "pw_transfer/internal/chunk.h"
27 #include "pw_transfer/internal/event.h"
28 #include "pw_transfer/rate_estimate.h"
29 
30 namespace pw::transfer::internal {
31 
32 class TransferThread;
33 
34 class TransferParameters {
35  public:
TransferParameters(uint32_t pending_bytes,uint32_t max_chunk_size_bytes,uint32_t extend_window_divisor)36   constexpr TransferParameters(uint32_t pending_bytes,
37                                uint32_t max_chunk_size_bytes,
38                                uint32_t extend_window_divisor)
39       : pending_bytes_(pending_bytes),
40         max_chunk_size_bytes_(max_chunk_size_bytes),
41         extend_window_divisor_(extend_window_divisor) {
42     PW_ASSERT(pending_bytes > 0);
43     PW_ASSERT(max_chunk_size_bytes > 0);
44     PW_ASSERT(extend_window_divisor > 1);
45   }
46 
pending_bytes()47   uint32_t pending_bytes() const { return pending_bytes_; }
set_pending_bytes(uint32_t pending_bytes)48   void set_pending_bytes(uint32_t pending_bytes) {
49     pending_bytes_ = pending_bytes;
50   }
51 
max_chunk_size_bytes()52   uint32_t max_chunk_size_bytes() const { return max_chunk_size_bytes_; }
set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes)53   void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) {
54     max_chunk_size_bytes_ = max_chunk_size_bytes;
55   }
56 
extend_window_divisor()57   uint32_t extend_window_divisor() const { return extend_window_divisor_; }
set_extend_window_divisor(uint32_t extend_window_divisor)58   void set_extend_window_divisor(uint32_t extend_window_divisor) {
59     PW_DASSERT(extend_window_divisor > 1);
60     extend_window_divisor_ = extend_window_divisor;
61   }
62 
63  private:
64   uint32_t pending_bytes_;
65   uint32_t max_chunk_size_bytes_;
66   uint32_t extend_window_divisor_;
67 };
68 
69 // Information about a single transfer.
70 class Context {
71  public:
72   Context(const Context&) = delete;
73   Context(Context&&) = delete;
74   Context& operator=(const Context&) = delete;
75   Context& operator=(Context&&) = delete;
76 
transfer_id()77   constexpr uint32_t transfer_id() const { return transfer_id_; }
78 
79   // True if the context has been used for a transfer (it has an ID).
initialized()80   bool initialized() const {
81     return transfer_state_ != TransferState::kInactive;
82   }
83 
84   // True if the transfer is active.
active()85   bool active() const { return transfer_state_ >= TransferState::kWaiting; }
86 
timeout()87   std::optional<chrono::SystemClock::time_point> timeout() const {
88     return active() && next_timeout_ != kNoTimeout
89                ? std::optional(next_timeout_)
90                : std::nullopt;
91   }
92 
93   // Returns true if the transfer's most recently set timeout has passed.
timed_out()94   bool timed_out() const {
95     std::optional<chrono::SystemClock::time_point> next_timeout = timeout();
96     return next_timeout.has_value() &&
97            chrono::SystemClock::now() >= next_timeout.value();
98   }
99 
100   // Processes an event for this transfer.
101   void HandleEvent(const Event& event);
102 
103  protected:
104   ~Context() = default;
105 
Context()106   constexpr Context()
107       : transfer_id_(0),
108         flags_(0),
109         transfer_state_(TransferState::kInactive),
110         retries_(0),
111         max_retries_(0),
112         stream_(nullptr),
113         rpc_writer_(nullptr),
114         offset_(0),
115         window_size_(0),
116         window_end_offset_(0),
117         pending_bytes_(0),
118         max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
119         max_parameters_(nullptr),
120         thread_(nullptr),
121         last_chunk_offset_(0),
122         chunk_timeout_(chrono::SystemClock::duration::zero()),
123         interchunk_delay_(chrono::SystemClock::for_at_least(
124             std::chrono::microseconds(kDefaultChunkDelayMicroseconds))),
125         next_timeout_(kNoTimeout) {}
126 
type()127   constexpr TransferType type() const {
128     return static_cast<TransferType>(flags_ & kFlagsType);
129   }
130 
131  private:
132   enum class TransferState : uint8_t {
133     // This ServerContext has never been used for a transfer. It is available
134     // for use for a transfer.
135     kInactive,
136     // A transfer completed and the final status chunk was sent. The Context
137     // is
138     // available for use for a new transfer. A receive transfer uses this
139     // state
140     // to allow a transmitter to retry its last chunk if the final status
141     // chunk
142     // was dropped.
143     kCompleted,
144     // Waiting for the other end to send a chunk.
145     kWaiting,
146     // Transmitting a window of data to a receiver.
147     kTransmitting,
148     // Recovering after one or more chunks was dropped in an active transfer.
149     kRecovery,
150   };
151 
152   enum class TransmitAction {
153     // Start of a new transfer.
154     kBegin,
155     // Extend the current window length.
156     kExtend,
157     // Retransmit from a specified offset.
158     kRetransmit,
159   };
160 
set_transfer_state(TransferState state)161   void set_transfer_state(TransferState state) { transfer_state_ = state; }
162 
163   // The transfer ID as unsigned instead of uint32_t so it can be used with %u.
id_for_log()164   unsigned id_for_log() const {
165     static_assert(sizeof(unsigned) >= sizeof(transfer_id_));
166     return static_cast<unsigned>(transfer_id_);
167   }
168 
reader()169   stream::Reader& reader() {
170     PW_DASSERT(active() && type() == TransferType::kTransmit);
171     return static_cast<stream::Reader&>(*stream_);
172   }
173 
writer()174   stream::Writer& writer() {
175     PW_DASSERT(active() && type() == TransferType::kReceive);
176     return static_cast<stream::Writer&>(*stream_);
177   }
178 
179   // Calculates the maximum size of actual data that can be sent within a
180   // single client write transfer chunk, accounting for the overhead of the
181   // transfer protocol and RPC system.
182   //
183   // Note: This function relies on RPC protocol internals. This is generally a
184   // *bad* idea, but is necessary here due to limitations of the RPC system
185   // and its asymmetric ingress and egress paths.
186   //
187   // TODO(frolv): This should be investigated further and perhaps addressed
188   // within the RPC system, at the least through a helper function.
189   uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
190                              uint32_t channel_id) const;
191 
192   // Initializes a new transfer using new_transfer. The provided stream
193   // argument is used in place of the NewTransferEvent's stream. Only
194   // initializes state; no packets are sent.
195   //
196   // Precondition: context is not active.
197   void Initialize(const NewTransferEvent& new_transfer);
198 
199   // Starts a new transfer from an initialized context by sending the initial
200   // transfer chunk. This is only used by transfer clients, as the transfer
201   // service cannot initiate transfers.
202   //
203   // Calls Finish(), which calls the on_completion callback, if initiating a
204   // transfer fails.
205   void InitiateTransferAsClient();
206 
207   // Starts a new transfer on the server after receiving a request from a
208   // client.
209   void StartTransferAsServer(const NewTransferEvent& new_transfer);
210 
211   // Does final cleanup specific to the server or client. Returns whether the
212   // cleanup succeeded. An error in cleanup indicates that the transfer
213   // failed.
214   virtual Status FinalCleanup(Status status) = 0;
215 
216   // Processes a chunk in either a transfer or receive transfer.
217   void HandleChunkEvent(const ChunkEvent& event);
218 
219   // Processes a chunk in a transmit transfer.
220   void HandleTransmitChunk(const Chunk& chunk);
221 
222   // Processes a transfer parameters update in a transmit transfer.
223   void HandleTransferParametersUpdate(const Chunk& chunk);
224 
225   // Sends the next chunk in a transmit transfer, if any.
226   void TransmitNextChunk(bool retransmit_requested);
227 
228   // Processes a chunk in a receive transfer.
229   void HandleReceiveChunk(const Chunk& chunk);
230 
231   // Processes a data chunk in a received while in the kWaiting state.
232   void HandleReceivedData(const Chunk& chunk);
233 
234   // Sends the first chunk in a transmit transfer.
235   void SendInitialTransmitChunk();
236 
237   // In a receive transfer, sends a parameters chunk telling the transmitter
238   // how much data they can send.
239   void SendTransferParameters(TransmitAction action);
240 
241   // Updates the current receive transfer parameters from the provided object,
242   // then sends them.
243   void UpdateAndSendTransferParameters(TransmitAction action);
244 
245   // Sends a final status chunk of a completed transfer without updating the
246   // the transfer. Sends status_, which MUST have been set by a previous
247   // Finish() call.
248   void SendFinalStatusChunk();
249 
250   // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to
251   // the final status for this transfer. The transfer MUST be active when this
252   // is called.
253   void Finish(Status status);
254 
255   // Encodes the specified chunk to the encode buffer and sends it with the
256   // rpc_writer_. Calls Finish() with an error if the operation fails.
257   void EncodeAndSendChunk(const Chunk& chunk);
258 
259   void SetTimeout(chrono::SystemClock::duration timeout);
ClearTimeout()260   void ClearTimeout() { next_timeout_ = kNoTimeout; }
261 
262   // Called when the transfer's timeout expires.
263   void HandleTimeout();
264 
265   // Resends the last packet or aborts the transfer if the maximum retries has
266   // been exceeded.
267   void Retry();
268 
269   static constexpr uint8_t kFlagsType = 1 << 0;
270   static constexpr uint8_t kFlagsDataSent = 1 << 1;
271 
272   static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000;
273 
274   // How long to wait for the other side to ACK a final transfer chunk before
275   // resetting the context so that it can be reused. During this time, the
276   // status chunk will be re-sent for every non-ACK chunk received,
277   // continually notifying the other end that the transfer is over.
278   static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout =
279       std::chrono::milliseconds(5000);
280 
281   static constexpr chrono::SystemClock::time_point kNoTimeout =
282       chrono::SystemClock::time_point(chrono::SystemClock::duration(0));
283 
284   uint32_t transfer_id_;
285   uint8_t flags_;
286   TransferState transfer_state_;
287   uint8_t retries_;
288   uint8_t max_retries_;
289 
290   // The stream from which to read or to which to write data.
291   stream::Stream* stream_;
292   rpc::Writer* rpc_writer_;
293 
294   uint32_t offset_;
295   uint32_t window_size_;
296   uint32_t window_end_offset_;
297   // TODO(pwbug/584): Remove pending_bytes in favor of window_end_offset.
298   uint32_t pending_bytes_;
299   uint32_t max_chunk_size_bytes_;
300 
301   const TransferParameters* max_parameters_;
302   TransferThread* thread_;
303 
304   union {
305     Status status_;               // Used when state is kCompleted.
306     uint32_t last_chunk_offset_;  // Used in states kWaiting and kRecovery.
307   };
308 
309   // How long to wait for a chunk from the other end.
310   chrono::SystemClock::duration chunk_timeout_;
311 
312   // How long to delay between transmitting subsequent data chunks within a
313   // window.
314   chrono::SystemClock::duration interchunk_delay_;
315 
316   // Timestamp at which the transfer will next time out, or kNoTimeout.
317   chrono::SystemClock::time_point next_timeout_;
318 
319   RateEstimate transfer_rate_;
320 };
321 
322 }  // namespace pw::transfer::internal
323