• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <cstdint>
17 
18 #include "pw_assert/assert.h"
19 #include "pw_chrono/system_clock.h"
20 #include "pw_function/function.h"
21 #include "pw_preprocessor/compiler.h"
22 #include "pw_rpc/raw/client_reader_writer.h"
23 #include "pw_rpc/raw/server_reader_writer.h"
24 #include "pw_span/span.h"
25 #include "pw_sync/binary_semaphore.h"
26 #include "pw_sync/timed_thread_notification.h"
27 #include "pw_thread/thread_core.h"
28 #include "pw_transfer/handler.h"
29 #include "pw_transfer/internal/client_context.h"
30 #include "pw_transfer/internal/context.h"
31 #include "pw_transfer/internal/event.h"
32 #include "pw_transfer/internal/server_context.h"
33 
34 namespace pw::transfer {
35 namespace internal {
36 
37 class TransferThread : public thread::ThreadCore {
38  public:
TransferThread(span<ClientContext> client_transfers,span<ServerContext> server_transfers,ByteSpan chunk_buffer,ByteSpan encode_buffer)39   TransferThread(span<ClientContext> client_transfers,
40                  span<ServerContext> server_transfers,
41                  ByteSpan chunk_buffer,
42                  ByteSpan encode_buffer)
43       : client_transfers_(client_transfers),
44         server_transfers_(server_transfers),
45         chunk_buffer_(chunk_buffer),
46         encode_buffer_(encode_buffer) {}
47 
StartClientTransfer(TransferType type,ProtocolVersion version,uint32_t resource_id,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,uint8_t max_retries,uint32_t max_lifetime_retries)48   void StartClientTransfer(TransferType type,
49                            ProtocolVersion version,
50                            uint32_t resource_id,
51                            stream::Stream* stream,
52                            const TransferParameters& max_parameters,
53                            Function<void(Status)>&& on_completion,
54                            chrono::SystemClock::duration timeout,
55                            uint8_t max_retries,
56                            uint32_t max_lifetime_retries) {
57     uint32_t session_id = version == ProtocolVersion::kLegacy
58                               ? resource_id
59                               : Context::kUnassignedSessionId;
60     StartTransfer(type,
61                   version,
62                   session_id,
63                   resource_id,
64                   /*raw_chunk=*/{},
65                   stream,
66                   max_parameters,
67                   std::move(on_completion),
68                   timeout,
69                   max_retries,
70                   max_lifetime_retries);
71   }
72 
StartServerTransfer(TransferType type,ProtocolVersion version,uint32_t session_id,uint32_t resource_id,ConstByteSpan raw_chunk,const TransferParameters & max_parameters,chrono::SystemClock::duration timeout,uint8_t max_retries,uint32_t max_lifetime_retries)73   void StartServerTransfer(TransferType type,
74                            ProtocolVersion version,
75                            uint32_t session_id,
76                            uint32_t resource_id,
77                            ConstByteSpan raw_chunk,
78                            const TransferParameters& max_parameters,
79                            chrono::SystemClock::duration timeout,
80                            uint8_t max_retries,
81                            uint32_t max_lifetime_retries) {
82     StartTransfer(type,
83                   version,
84                   session_id,
85                   resource_id,
86                   raw_chunk,
87                   /*stream=*/nullptr,
88                   max_parameters,
89                   /*on_completion=*/nullptr,
90                   timeout,
91                   max_retries,
92                   max_lifetime_retries);
93   }
94 
ProcessClientChunk(ConstByteSpan chunk)95   void ProcessClientChunk(ConstByteSpan chunk) {
96     ProcessChunk(EventType::kClientChunk, chunk);
97   }
98 
ProcessServerChunk(ConstByteSpan chunk)99   void ProcessServerChunk(ConstByteSpan chunk) {
100     ProcessChunk(EventType::kServerChunk, chunk);
101   }
102 
103   void EndClientTransfer(uint32_t session_id,
104                          Status status,
105                          bool send_status_chunk = false) {
106     EndTransfer(
107         EventType::kClientEndTransfer, session_id, status, send_status_chunk);
108   }
109 
110   void EndServerTransfer(uint32_t session_id,
111                          Status status,
112                          bool send_status_chunk = false) {
113     EndTransfer(
114         EventType::kServerEndTransfer, session_id, status, send_status_chunk);
115   }
116 
117   // Move the read/write streams on this thread instead of the transfer thread.
118   // RPC call objects are synchronized by pw_rpc, so this move will be atomic
119   // with respect to the transfer thread.
SetClientReadStream(rpc::RawClientReaderWriter & read_stream)120   void SetClientReadStream(rpc::RawClientReaderWriter& read_stream) {
121     client_read_stream_ = std::move(read_stream);
122   }
123 
SetClientWriteStream(rpc::RawClientReaderWriter & write_stream)124   void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream) {
125     client_write_stream_ = std::move(write_stream);
126   }
127 
SetServerReadStream(rpc::RawServerReaderWriter & read_stream)128   void SetServerReadStream(rpc::RawServerReaderWriter& read_stream) {
129     server_read_stream_ = std::move(read_stream);
130   }
131 
SetServerWriteStream(rpc::RawServerReaderWriter & write_stream)132   void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream) {
133     server_write_stream_ = std::move(write_stream);
134   }
135 
AddTransferHandler(Handler & handler)136   void AddTransferHandler(Handler& handler) {
137     TransferHandlerEvent(EventType::kAddTransferHandler, handler);
138   }
139 
RemoveTransferHandler(Handler & handler)140   void RemoveTransferHandler(Handler& handler) {
141     TransferHandlerEvent(EventType::kRemoveTransferHandler, handler);
142     // Ensure this function blocks until the transfer handler is fully cleaned
143     // up.
144     WaitUntilEventIsProcessed();
145   }
146 
max_chunk_size()147   size_t max_chunk_size() const { return chunk_buffer_.size(); }
148 
149   // For testing only: terminates the transfer thread with a kTerminate event.
150   void Terminate();
151 
152   // For testing only: blocks until the next event can be acquired, which means
153   // a previously enqueued event has been processed.
WaitUntilEventIsProcessed()154   void WaitUntilEventIsProcessed() {
155     next_event_ownership_.acquire();
156     next_event_ownership_.release();
157   }
158 
159   // For testing only: simulates a timeout event for a client transfer.
SimulateClientTimeout(uint32_t session_id)160   void SimulateClientTimeout(uint32_t session_id) {
161     SimulateTimeout(EventType::kClientTimeout, session_id);
162   }
163 
164   // For testing only: simulates a timeout event for a server transfer.
SimulateServerTimeout(uint32_t session_id)165   void SimulateServerTimeout(uint32_t session_id) {
166     SimulateTimeout(EventType::kServerTimeout, session_id);
167   }
168 
169  private:
170   friend class Context;
171 
172   // Maximum amount of time between transfer thread runs.
173   static constexpr chrono::SystemClock::duration kMaxTimeout =
174       std::chrono::seconds(2);
175 
176   // Finds an active server or client transfer, matching against its legacy ID.
177   template <typename T>
FindActiveTransferByLegacyId(const span<T> & transfers,uint32_t session_id)178   static Context* FindActiveTransferByLegacyId(const span<T>& transfers,
179                                                uint32_t session_id) {
180     auto transfer =
181         std::find_if(transfers.begin(), transfers.end(), [session_id](auto& c) {
182           return c.initialized() && c.id() == session_id;
183         });
184     return transfer != transfers.end() ? &*transfer : nullptr;
185   }
186 
187   // Finds an active server or client transfer, matching against resource ID.
188   template <typename T>
FindActiveTransferByResourceId(const span<T> & transfers,uint32_t resource_id)189   static Context* FindActiveTransferByResourceId(const span<T>& transfers,
190                                                  uint32_t resource_id) {
191     auto transfer = std::find_if(
192         transfers.begin(), transfers.end(), [resource_id](auto& c) {
193           return c.initialized() && c.resource_id() == resource_id;
194         });
195     return transfer != transfers.end() ? &*transfer : nullptr;
196   }
197 
198   void SimulateTimeout(EventType type, uint32_t session_id);
199 
200   // Finds an new server or client transfer.
201   template <typename T>
FindNewTransfer(const span<T> & transfers,uint32_t session_id)202   static Context* FindNewTransfer(const span<T>& transfers,
203                                   uint32_t session_id) {
204     Context* new_transfer = nullptr;
205 
206     for (Context& context : transfers) {
207       if (context.active()) {
208         if (context.session_id() == session_id) {
209           // Restart an already active transfer.
210           return &context;
211         }
212       } else {
213         // Store the inactive context as an option, but keep checking for the
214         // restart case.
215         new_transfer = &context;
216       }
217     }
218 
219     return new_transfer;
220   }
221 
encode_buffer()222   const ByteSpan& encode_buffer() const { return encode_buffer_; }
223 
224   void Run() final;
225 
226   void HandleTimeouts();
227 
stream_for(TransferStream stream)228   rpc::Writer& stream_for(TransferStream stream) {
229     switch (stream) {
230       case TransferStream::kClientRead:
231         return client_read_stream_;
232       case TransferStream::kClientWrite:
233         return client_write_stream_;
234       case TransferStream::kServerRead:
235         return server_read_stream_;
236       case TransferStream::kServerWrite:
237         return server_write_stream_;
238     }
239     // An unknown TransferStream value was passed, which means this function
240     // was passed an invalid enum value.
241     PW_ASSERT(false);
242   }
243 
244   // Returns the earliest timeout among all active transfers, up to kMaxTimeout.
245   chrono::SystemClock::time_point GetNextTransferTimeout() const;
246 
247   void StartTransfer(TransferType type,
248                      ProtocolVersion version,
249                      uint32_t session_id,
250                      uint32_t resource_id,
251                      ConstByteSpan raw_chunk,
252                      stream::Stream* stream,
253                      const TransferParameters& max_parameters,
254                      Function<void(Status)>&& on_completion,
255                      chrono::SystemClock::duration timeout,
256                      uint8_t max_retries,
257                      uint32_t max_lifetime_retries);
258 
259   void ProcessChunk(EventType type, ConstByteSpan chunk);
260 
261   void EndTransfer(EventType type,
262                    uint32_t session_id,
263                    Status status,
264                    bool send_status_chunk);
265 
266   void TransferHandlerEvent(EventType type, Handler& handler);
267 
268   void HandleEvent(const Event& event);
269   Context* FindContextForEvent(const Event& event) const;
270 
271   void SendStatusChunk(const SendStatusChunkEvent& event);
272 
273   sync::TimedThreadNotification event_notification_;
274   sync::BinarySemaphore next_event_ownership_;
275 
276   Event next_event_;
277   Function<void(Status)> staged_on_completion_;
278 
279   rpc::RawClientReaderWriter client_read_stream_;
280   rpc::RawClientReaderWriter client_write_stream_;
281   rpc::RawServerReaderWriter server_read_stream_;
282   rpc::RawServerReaderWriter server_write_stream_;
283 
284   span<ClientContext> client_transfers_;
285   span<ServerContext> server_transfers_;
286 
287   // All registered transfer handlers.
288   IntrusiveList<Handler> handlers_;
289 
290   // Buffer in which chunk data is staged for CHUNK events.
291   ByteSpan chunk_buffer_;
292 
293   // Buffer into which responses are encoded. Only ever used from within the
294   // transfer thread, so no locking is required.
295   ByteSpan encode_buffer_;
296 };
297 
298 }  // namespace internal
299 
// Exposes the internal transfer thread class under the public name
// pw::transfer::TransferThread.
using TransferThread = internal::TransferThread;
301 
302 template <size_t kMaxConcurrentClientTransfers,
303           size_t kMaxConcurrentServerTransfers>
304 class Thread final : public internal::TransferThread {
305  public:
Thread(ByteSpan chunk_buffer,ByteSpan encode_buffer)306   Thread(ByteSpan chunk_buffer, ByteSpan encode_buffer)
307       : internal::TransferThread(
308             client_contexts_, server_contexts_, chunk_buffer, encode_buffer) {}
309 
310  private:
311   std::array<internal::ClientContext, kMaxConcurrentClientTransfers>
312       client_contexts_;
313   std::array<internal::ServerContext, kMaxConcurrentServerTransfers>
314       server_contexts_;
315 };
316 
317 }  // namespace pw::transfer
318