• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <cstdint>
17 
18 #include "pw_assert/assert.h"
19 #include "pw_bytes/span.h"
20 #include "pw_chrono/system_clock.h"
21 #include "pw_function/function.h"
22 #include "pw_rpc/raw/client_reader_writer.h"
23 #include "pw_rpc/raw/server_reader_writer.h"
24 #include "pw_span/span.h"
25 #include "pw_sync/binary_semaphore.h"
26 #include "pw_sync/timed_thread_notification.h"
27 #include "pw_thread/thread_core.h"
28 #include "pw_transfer/handler.h"
29 #include "pw_transfer/internal/client_context.h"
30 #include "pw_transfer/internal/context.h"
31 #include "pw_transfer/internal/event.h"
32 #include "pw_transfer/internal/server_context.h"
33 
34 namespace pw::transfer {
35 
36 class Client;
37 
38 namespace internal {
39 
40 class TransferThread : public thread::ThreadCore {
41  public:
TransferThread(span<ClientContext> client_transfers,span<ServerContext> server_transfers,ByteSpan chunk_buffer,ByteSpan encode_buffer)42   TransferThread(span<ClientContext> client_transfers,
43                  span<ServerContext> server_transfers,
44                  ByteSpan chunk_buffer,
45                  ByteSpan encode_buffer)
46       : client_transfers_(client_transfers),
47         server_transfers_(server_transfers),
48         next_session_id_(1),
49         chunk_buffer_(chunk_buffer),
50         encode_buffer_(encode_buffer) {}
51 
52   void StartClientTransfer(TransferType type,
53                            ProtocolVersion version,
54                            uint32_t resource_id,
55                            uint32_t handle_id,
56                            stream::Stream* stream,
57                            const TransferParameters& max_parameters,
58                            Function<void(Status)>&& on_completion,
59                            chrono::SystemClock::duration timeout,
60                            chrono::SystemClock::duration initial_timeout,
61                            uint8_t max_retries,
62                            uint32_t max_lifetime_retries,
63                            uint32_t initial_offset = 0) {
64     StartTransfer(type,
65                   version,
66                   Context::kUnassignedSessionId,  // Assigned later.
67                   resource_id,
68                   handle_id,
69                   /*raw_chunk=*/{},
70                   stream,
71                   max_parameters,
72                   std::move(on_completion),
73                   timeout,
74                   initial_timeout,
75                   max_retries,
76                   max_lifetime_retries,
77                   initial_offset);
78   }
79 
80   void StartServerTransfer(TransferType type,
81                            ProtocolVersion version,
82                            uint32_t session_id,
83                            uint32_t resource_id,
84                            ConstByteSpan raw_chunk,
85                            const TransferParameters& max_parameters,
86                            chrono::SystemClock::duration timeout,
87                            uint8_t max_retries,
88                            uint32_t max_lifetime_retries,
89                            uint32_t initial_offset = 0) {
90     StartTransfer(type,
91                   version,
92                   session_id,
93                   resource_id,
94                   /*handle_id=*/0,
95                   raw_chunk,
96                   /*stream=*/nullptr,
97                   max_parameters,
98                   /*on_completion=*/nullptr,
99                   timeout,
100                   timeout,
101                   max_retries,
102                   max_lifetime_retries,
103                   initial_offset);
104   }
105 
ProcessClientChunk(ConstByteSpan chunk)106   void ProcessClientChunk(ConstByteSpan chunk) {
107     ProcessChunk(EventType::kClientChunk, chunk);
108   }
109 
ProcessServerChunk(ConstByteSpan chunk)110   void ProcessServerChunk(ConstByteSpan chunk) {
111     ProcessChunk(EventType::kServerChunk, chunk);
112   }
113 
SendServerStatus(TransferType type,uint32_t session_id,ProtocolVersion version,Status status)114   void SendServerStatus(TransferType type,
115                         uint32_t session_id,
116                         ProtocolVersion version,
117                         Status status) {
118     SendStatus(type == TransferType::kTransmit ? TransferStream::kServerRead
119                                                : TransferStream::kServerWrite,
120                session_id,
121                version,
122                status);
123   }
124 
CancelClientTransfer(uint32_t handle_id)125   void CancelClientTransfer(uint32_t handle_id) {
126     EndTransfer(EventType::kClientEndTransfer,
127                 IdentifierType::Handle,
128                 handle_id,
129                 Status::Cancelled(),
130                 /*send_status_chunk=*/true);
131   }
132 
133   void EndClientTransfer(uint32_t session_id,
134                          Status status,
135                          bool send_status_chunk = false) {
136     EndTransfer(EventType::kClientEndTransfer,
137                 IdentifierType::Session,
138                 session_id,
139                 status,
140                 send_status_chunk);
141   }
142 
143   void EndServerTransfer(uint32_t session_id,
144                          Status status,
145                          bool send_status_chunk = false) {
146     EndTransfer(EventType::kServerEndTransfer,
147                 IdentifierType::Session,
148                 session_id,
149                 status,
150                 send_status_chunk);
151   }
152 
153   /// Updates the transfer thread's client read stream.
154   ///
155   /// The provided stream should not have an on_next function set. Instead,
156   /// on_next is passed separately to ensure that it is only set when the new
157   /// stream becomes the transfer thread's primary stream.
158   ///
159   /// If the thread has an existing active client read stream, closes it and
160   /// terminates any transfers running on it.
SetClientReadStream(rpc::RawClientReaderWriter & read_stream,Function<void (ConstByteSpan)> && on_next)161   void SetClientReadStream(rpc::RawClientReaderWriter& read_stream,
162                            Function<void(ConstByteSpan)>&& on_next) {
163     // Clear the existing callback to prevent incoming chunks from blocking on
164     // the transfer thread and preventing the call's cleanup.
165     client_read_stream_.set_on_next(nullptr);
166     staged_client_stream_ = std::move(read_stream);
167     staged_client_on_next_ = std::move(on_next);
168     SetStream(TransferStream::kClientRead);
169   }
170 
171   /// Updates the transfer thread's client write stream.
172   ///
173   /// The provided stream should not have an on_next function set. Instead,
174   /// on_next is passed separately to ensure that it is only set when the new
175   /// stream becomes the transfer thread's primary stream.
176   ///
177   /// If the thread has an existing active client write stream, closes it and
178   /// terminates any transfers running on it.
SetClientWriteStream(rpc::RawClientReaderWriter & write_stream,Function<void (ConstByteSpan)> && on_next)179   void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream,
180                             Function<void(ConstByteSpan)>&& on_next) {
181     // Clear the existing callback to prevent incoming chunks from blocking on
182     // the transfer thread and preventing the call's cleanup.
183     client_write_stream_.set_on_next(nullptr);
184     staged_client_stream_ = std::move(write_stream);
185     staged_client_on_next_ = std::move(on_next);
186     SetStream(TransferStream::kClientWrite);
187   }
188 
189   /// Updates the transfer thread's server read stream.
190   ///
191   /// The provided stream should not have an on_next function set. Instead,
192   /// on_next is passed separately to ensure that it is only set when the new
193   /// stream becomes the transfer thread's primary stream.
194   ///
195   /// If the thread has an existing active server read stream, closes it and
196   /// terminates any transfers running on it.
SetServerReadStream(rpc::RawServerReaderWriter & read_stream,Function<void (ConstByteSpan)> && on_next)197   void SetServerReadStream(rpc::RawServerReaderWriter& read_stream,
198                            Function<void(ConstByteSpan)>&& on_next) {
199     // Clear the existing callback to prevent incoming chunks from blocking on
200     // the transfer thread and preventing the call's cleanup.
201     server_read_stream_.set_on_next(nullptr);
202     staged_server_stream_ = std::move(read_stream);
203     staged_server_on_next_ = std::move(on_next);
204     SetStream(TransferStream::kServerRead);
205   }
206 
207   /// Updates the transfer thread's server write stream.
208   ///
209   /// The provided stream should not have an on_next function set. Instead,
210   /// on_next is passed separately to ensure that it is only set when the new
211   /// stream becomes the transfer thread's primary stream.
212   ///
213   /// If the thread has an existing active server write stream, closes it and
214   /// terminates any transfers running on it.
SetServerWriteStream(rpc::RawServerReaderWriter & write_stream,Function<void (ConstByteSpan)> && on_next)215   void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream,
216                             Function<void(ConstByteSpan)>&& on_next) {
217     // Clear the existing callback to prevent incoming chunks from blocking on
218     // the transfer thread and preventing the call's cleanup.
219     server_write_stream_.set_on_next(nullptr);
220     staged_server_stream_ = std::move(write_stream);
221     staged_server_on_next_ = std::move(on_next);
222     SetStream(TransferStream::kServerWrite);
223   }
224 
AddTransferHandler(Handler & handler)225   void AddTransferHandler(Handler& handler) {
226     TransferHandlerEvent(EventType::kAddTransferHandler, handler);
227   }
228 
RemoveTransferHandler(Handler & handler)229   void RemoveTransferHandler(Handler& handler) {
230     TransferHandlerEvent(EventType::kRemoveTransferHandler, handler);
231     // Ensure this function blocks until the transfer handler is fully cleaned
232     // up.
233     WaitUntilEventIsProcessed();
234   }
235 
max_chunk_size()236   size_t max_chunk_size() const { return chunk_buffer_.size(); }
237 
238   // For testing only: terminates the transfer thread with a kTerminate event.
239   void Terminate();
240 
241   // For testing only: blocks until the next event can be acquired, which means
242   // a previously enqueued event has been processed.
WaitUntilEventIsProcessed()243   void WaitUntilEventIsProcessed() {
244     next_event_ownership_.acquire();
245     next_event_ownership_.release();
246   }
247 
248   // For testing only: simulates a timeout event for a client transfer.
SimulateClientTimeout(uint32_t session_id)249   void SimulateClientTimeout(uint32_t session_id) {
250     SimulateTimeout(EventType::kClientTimeout, session_id);
251   }
252 
253   // For testing only: simulates a timeout event for a server transfer.
SimulateServerTimeout(uint32_t session_id)254   void SimulateServerTimeout(uint32_t session_id) {
255     SimulateTimeout(EventType::kServerTimeout, session_id);
256   }
257 
258   void EnqueueResourceEvent(uint32_t resource_id,
259                             ResourceStatusCallback&& callback);
260 
261  private:
262   friend class transfer::Client;
263   friend class Context;
264 
265   // Maximum amount of time between transfer thread runs.
266   static constexpr chrono::SystemClock::duration kMaxTimeout =
267       std::chrono::seconds(2);
268 
269   void UpdateClientTransfer(uint32_t handle_id, size_t transfer_size_bytes);
270 
271   // Finds an active server or client transfer, matching against its legacy ID.
272   template <typename T>
FindActiveTransferByLegacyId(const span<T> & transfers,uint32_t session_id)273   static Context* FindActiveTransferByLegacyId(const span<T>& transfers,
274                                                uint32_t session_id) {
275     auto transfer =
276         std::find_if(transfers.begin(), transfers.end(), [session_id](auto& c) {
277           return c.initialized() && c.session_id() == session_id;
278         });
279     return transfer != transfers.end() ? &*transfer : nullptr;
280   }
281 
282   // Finds an active server or client transfer, matching against resource ID.
283   template <typename T>
FindActiveTransferByResourceId(const span<T> & transfers,uint32_t resource_id)284   static Context* FindActiveTransferByResourceId(const span<T>& transfers,
285                                                  uint32_t resource_id) {
286     auto transfer = std::find_if(
287         transfers.begin(), transfers.end(), [resource_id](auto& c) {
288           return c.initialized() && c.resource_id() == resource_id;
289         });
290     return transfer != transfers.end() ? &*transfer : nullptr;
291   }
292 
FindClientTransferByHandleId(uint32_t handle_id)293   Context* FindClientTransferByHandleId(uint32_t handle_id) const {
294     auto transfer =
295         std::find_if(client_transfers_.begin(),
296                      client_transfers_.end(),
297                      [handle_id](auto& c) {
298                        return c.initialized() && c.handle_id() == handle_id;
299                      });
300     return transfer != client_transfers_.end() ? &*transfer : nullptr;
301   }
302 
303   void SimulateTimeout(EventType type, uint32_t session_id);
304 
305   // Finds an new server or client transfer.
306   template <typename T>
FindNewTransfer(const span<T> & transfers,uint32_t session_id)307   static Context* FindNewTransfer(const span<T>& transfers,
308                                   uint32_t session_id) {
309     Context* new_transfer = nullptr;
310 
311     for (Context& context : transfers) {
312       if (context.active()) {
313         if (context.session_id() == session_id) {
314           // Restart an already active transfer.
315           return &context;
316         }
317       } else {
318         // Store the inactive context as an option, but keep checking for the
319         // restart case.
320         new_transfer = &context;
321       }
322     }
323 
324     return new_transfer;
325   }
326 
encode_buffer()327   const ByteSpan& encode_buffer() const { return encode_buffer_; }
328 
329   void Run() final;
330 
331   void HandleTimeouts();
332 
stream_for(TransferStream stream)333   rpc::Writer& stream_for(TransferStream stream) {
334     switch (stream) {
335       case TransferStream::kClientRead:
336         return client_read_stream_.as_writer();
337       case TransferStream::kClientWrite:
338         return client_write_stream_.as_writer();
339       case TransferStream::kServerRead:
340         return server_read_stream_.as_writer();
341       case TransferStream::kServerWrite:
342         return server_write_stream_.as_writer();
343     }
344     // An unknown TransferStream value was passed, which means this function
345     // was passed an invalid enum value.
346     PW_ASSERT(false);
347   }
348 
349   // Returns the earliest timeout among all active transfers, up to kMaxTimeout.
350   chrono::SystemClock::time_point GetNextTransferTimeout() const;
351 
352   uint32_t AssignSessionId();
353 
354   void StartTransfer(TransferType type,
355                      ProtocolVersion version,
356                      uint32_t session_id,
357                      uint32_t resource_id,
358                      uint32_t handle_id,
359                      ConstByteSpan raw_chunk,
360                      stream::Stream* stream,
361                      const TransferParameters& max_parameters,
362                      Function<void(Status)>&& on_completion,
363                      chrono::SystemClock::duration timeout,
364                      chrono::SystemClock::duration initial_timeout,
365                      uint8_t max_retries,
366                      uint32_t max_lifetime_retries,
367                      uint32_t initial_offset);
368 
369   void ProcessChunk(EventType type, ConstByteSpan chunk);
370 
371   void SendStatus(TransferStream stream,
372                   uint32_t session_id,
373                   ProtocolVersion version,
374                   Status status);
375 
376   void EndTransfer(EventType type,
377                    IdentifierType id_type,
378                    uint32_t session_id,
379                    Status status,
380                    bool send_status_chunk);
381 
382   void SetStream(TransferStream stream);
383   void HandleSetStreamEvent(TransferStream stream);
384 
385   void TransferHandlerEvent(EventType type, Handler& handler);
386 
387   void HandleEvent(const Event& event);
388   Context* FindContextForEvent(const Event& event) const;
389 
390   void SendStatusChunk(const SendStatusChunkEvent& event);
391 
392   void GetResourceState(uint32_t resource_id);
393 
394   sync::TimedThreadNotification event_notification_;
395   sync::BinarySemaphore next_event_ownership_;
396 
397   Event next_event_;
398   Function<void(Status)> staged_on_completion_;
399 
400   rpc::RawClientReaderWriter client_read_stream_;
401   rpc::RawClientReaderWriter client_write_stream_;
402   rpc::RawClientReaderWriter staged_client_stream_;
403   Function<void(ConstByteSpan)> staged_client_on_next_;
404 
405   rpc::RawServerReaderWriter server_read_stream_;
406   rpc::RawServerReaderWriter server_write_stream_;
407   rpc::RawServerReaderWriter staged_server_stream_;
408   Function<void(ConstByteSpan)> staged_server_on_next_;
409 
410   span<ClientContext> client_transfers_;
411   span<ServerContext> server_transfers_;
412 
413   // Identifier to use for the next started transfer, unique over the RPC
414   // channel between the transfer client and server.
415   //
416   // TODO(frolv): If we ever support changing the RPC channel, this should be
417   // reset to 1.
418   uint32_t next_session_id_;
419 
420   // All registered transfer handlers.
421   IntrusiveList<Handler> handlers_;
422 
423   // Buffer in which chunk data is staged for CHUNK events.
424   ByteSpan chunk_buffer_;
425 
426   // Buffer into which responses are encoded. Only ever used from within the
427   // transfer thread, so no locking is required.
428   ByteSpan encode_buffer_;
429 
430   ResourceStatusCallback resource_status_callback_ = nullptr;
431 };
432 
433 }  // namespace internal
434 
435 using TransferThread = internal::TransferThread;
436 
437 template <size_t kMaxConcurrentClientTransfers,
438           size_t kMaxConcurrentServerTransfers>
439 class Thread final : public internal::TransferThread {
440  public:
Thread(ByteSpan chunk_buffer,ByteSpan encode_buffer)441   Thread(ByteSpan chunk_buffer, ByteSpan encode_buffer)
442       : internal::TransferThread(
443             client_contexts_, server_contexts_, chunk_buffer, encode_buffer) {}
444 
445  private:
446   std::array<internal::ClientContext, kMaxConcurrentClientTransfers>
447       client_contexts_;
448   std::array<internal::ServerContext, kMaxConcurrentServerTransfers>
449       server_contexts_;
450 };
451 
452 }  // namespace pw::transfer
453