• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
#include <array>
#include <cstddef>
#include <cstdint>
#include <span>
#include <utility>

#include "pw_assert/assert.h"
#include "pw_chrono/system_clock.h"
#include "pw_function/function.h"
#include "pw_preprocessor/compiler.h"
#include "pw_rpc/raw/client_reader_writer.h"
#include "pw_rpc/raw/server_reader_writer.h"
#include "pw_sync/binary_semaphore.h"
#include "pw_sync/timed_thread_notification.h"
#include "pw_thread/thread_core.h"
#include "pw_transfer/handler.h"
#include "pw_transfer/internal/client_context.h"
#include "pw_transfer/internal/context.h"
#include "pw_transfer/internal/event.h"
#include "pw_transfer/internal/server_context.h"
33 
34 namespace pw::transfer {
35 namespace internal {
36 
37 class TransferThread : public thread::ThreadCore {
38  public:
TransferThread(std::span<ClientContext> client_transfers,std::span<ServerContext> server_transfers,ByteSpan chunk_buffer,ByteSpan encode_buffer)39   TransferThread(std::span<ClientContext> client_transfers,
40                  std::span<ServerContext> server_transfers,
41                  ByteSpan chunk_buffer,
42                  ByteSpan encode_buffer)
43       : client_transfers_(client_transfers),
44         server_transfers_(server_transfers),
45         chunk_buffer_(chunk_buffer),
46         encode_buffer_(encode_buffer) {}
47 
StartClientTransfer(TransferType type,uint32_t transfer_id,uint32_t handler_id,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,uint8_t max_retries)48   void StartClientTransfer(TransferType type,
49                            uint32_t transfer_id,
50                            uint32_t handler_id,
51                            stream::Stream* stream,
52                            const TransferParameters& max_parameters,
53                            Function<void(Status)>&& on_completion,
54                            chrono::SystemClock::duration timeout,
55                            uint8_t max_retries) {
56     StartTransfer(type,
57                   transfer_id,
58                   handler_id,
59                   stream,
60                   max_parameters,
61                   std::move(on_completion),
62                   timeout,
63                   max_retries);
64   }
65 
StartServerTransfer(TransferType type,uint32_t transfer_id,uint32_t handler_id,const TransferParameters & max_parameters,chrono::SystemClock::duration timeout,uint8_t max_retries)66   void StartServerTransfer(TransferType type,
67                            uint32_t transfer_id,
68                            uint32_t handler_id,
69                            const TransferParameters& max_parameters,
70                            chrono::SystemClock::duration timeout,
71                            uint8_t max_retries) {
72     StartTransfer(type,
73                   transfer_id,
74                   handler_id,
75                   /*stream=*/nullptr,
76                   max_parameters,
77                   /*on_completion=*/nullptr,
78                   timeout,
79                   max_retries);
80   }
81 
ProcessClientChunk(ConstByteSpan chunk)82   void ProcessClientChunk(ConstByteSpan chunk) {
83     ProcessChunk(EventType::kClientChunk, chunk);
84   }
85 
ProcessServerChunk(ConstByteSpan chunk)86   void ProcessServerChunk(ConstByteSpan chunk) {
87     ProcessChunk(EventType::kServerChunk, chunk);
88   }
89 
SetClientReadStream(rpc::RawClientReaderWriter & read_stream)90   void SetClientReadStream(rpc::RawClientReaderWriter& read_stream) {
91     SetClientStream(TransferStream::kClientRead, read_stream);
92   }
93 
SetClientWriteStream(rpc::RawClientReaderWriter & write_stream)94   void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream) {
95     SetClientStream(TransferStream::kClientWrite, write_stream);
96   }
97 
SetServerReadStream(rpc::RawServerReaderWriter & read_stream)98   void SetServerReadStream(rpc::RawServerReaderWriter& read_stream) {
99     SetServerStream(TransferStream::kServerRead, read_stream);
100   }
101 
SetServerWriteStream(rpc::RawServerReaderWriter & write_stream)102   void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream) {
103     SetServerStream(TransferStream::kServerWrite, write_stream);
104   }
105 
AddTransferHandler(Handler & handler)106   void AddTransferHandler(Handler& handler) {
107     TransferHandlerEvent(EventType::kAddTransferHandler, handler);
108   }
109 
RemoveTransferHandler(Handler & handler)110   void RemoveTransferHandler(Handler& handler) {
111     TransferHandlerEvent(EventType::kRemoveTransferHandler, handler);
112   }
113 
max_chunk_size()114   size_t max_chunk_size() const { return chunk_buffer_.size(); }
115 
116   // For testing only: terminates the transfer thread with a kTerminate event.
117   void Terminate();
118 
119   // For testing only: blocks until the next event can be acquired, which means
120   // a previously enqueued event has been processed.
WaitUntilEventIsProcessed()121   void WaitUntilEventIsProcessed() {
122     next_event_ownership_.acquire();
123     next_event_ownership_.release();
124   }
125 
126   // For testing only: simulates a timeout event for a client transfer.
SimulateClientTimeout(uint32_t transfer_id)127   void SimulateClientTimeout(uint32_t transfer_id) {
128     SimulateTimeout(EventType::kClientTimeout, transfer_id);
129   }
130 
131   // For testing only: simulates a timeout event for a server transfer.
SimulateServerTimeout(uint32_t transfer_id)132   void SimulateServerTimeout(uint32_t transfer_id) {
133     SimulateTimeout(EventType::kServerTimeout, transfer_id);
134   }
135 
136  private:
137   friend class Context;
138 
139   // Maximum amount of time between transfer thread runs.
140   static constexpr chrono::SystemClock::duration kMaxTimeout =
141       std::chrono::seconds(2);
142 
143   // Finds an active server or client transfer.
144   template <typename T>
FindActiveTransfer(const std::span<T> & transfers,uint32_t transfer_id)145   static Context* FindActiveTransfer(const std::span<T>& transfers,
146                                      uint32_t transfer_id) {
147     auto transfer = std::find_if(
148         transfers.begin(), transfers.end(), [transfer_id](auto& c) {
149           return c.initialized() && c.transfer_id() == transfer_id;
150         });
151     return transfer != transfers.end() ? &*transfer : nullptr;
152   }
153 
154   void SimulateTimeout(EventType type, uint32_t transfer_id);
155 
156   // Finds an new server or client transfer.
157   template <typename T>
FindNewTransfer(const std::span<T> & transfers,uint32_t transfer_id)158   static Context* FindNewTransfer(const std::span<T>& transfers,
159                                   uint32_t transfer_id) {
160     Context* new_transfer = nullptr;
161 
162     for (Context& context : transfers) {
163       if (context.active()) {
164         if (context.transfer_id() == transfer_id) {
165           // Restart an already active transfer.
166           return &context;
167         }
168       } else {
169         // Store the inactive context as an option, but keep checking for the
170         // restart case.
171         new_transfer = &context;
172       }
173     }
174 
175     return new_transfer;
176   }
177 
encode_buffer()178   const ByteSpan& encode_buffer() const { return encode_buffer_; }
179 
180   void Run() final;
181 
182   void HandleTimeouts();
183 
stream_for(TransferStream stream)184   rpc::Writer& stream_for(TransferStream stream) {
185     switch (stream) {
186       case TransferStream::kClientRead:
187         return client_read_stream_;
188       case TransferStream::kClientWrite:
189         return client_write_stream_;
190       case TransferStream::kServerRead:
191         return server_read_stream_;
192       case TransferStream::kServerWrite:
193         return server_write_stream_;
194     }
195     // An unknown TransferStream value was passed, which means this function
196     // was passed an invalid enum value.
197     PW_ASSERT(false);
198   }
199 
200   // Returns the earliest timeout among all active transfers, up to kMaxTimeout.
201   chrono::SystemClock::time_point GetNextTransferTimeout() const;
202 
203   void StartTransfer(TransferType type,
204                      uint32_t transfer_id,
205                      uint32_t handler_id,
206                      stream::Stream* stream,
207                      const TransferParameters& max_parameters,
208                      Function<void(Status)>&& on_completion,
209                      chrono::SystemClock::duration timeout,
210                      uint8_t max_retries);
211 
212   void ProcessChunk(EventType type, ConstByteSpan chunk);
213 
214   void SetClientStream(TransferStream type, rpc::RawClientReaderWriter& stream);
215   void SetServerStream(TransferStream type, rpc::RawServerReaderWriter& stream);
216 
217   void TransferHandlerEvent(EventType type, Handler& handler);
218 
219   void HandleEvent(const Event& event);
220   Context* FindContextForEvent(const Event& event) const;
221 
222   void SendStatusChunk(const SendStatusChunkEvent& event);
223 
224   sync::TimedThreadNotification event_notification_;
225   sync::BinarySemaphore next_event_ownership_;
226 
227   Event next_event_;
228   Function<void(Status)> staged_on_completion_;
229   rpc::RawClientReaderWriter staged_client_stream_;
230   rpc::RawServerReaderWriter staged_server_stream_;
231 
232   rpc::RawClientReaderWriter client_read_stream_;
233   rpc::RawClientReaderWriter client_write_stream_;
234   rpc::RawServerReaderWriter server_read_stream_;
235   rpc::RawServerReaderWriter server_write_stream_;
236 
237   std::span<ClientContext> client_transfers_;
238   std::span<ServerContext> server_transfers_;
239 
240   // All registered transfer handlers.
241   IntrusiveList<Handler> handlers_;
242 
243   // Buffer in which chunk data is staged for CHUNK events.
244   ByteSpan chunk_buffer_;
245 
246   // Buffer into which responses are encoded. Only ever used from within the
247   // transfer thread, so no locking is required.
248   ByteSpan encode_buffer_;
249 };
250 
251 }  // namespace internal
252 
253 using TransferThread = internal::TransferThread;
254 
255 template <size_t kMaxConcurrentClientTransfers,
256           size_t kMaxConcurrentServerTransfers>
257 class Thread final : public internal::TransferThread {
258  public:
Thread(ByteSpan chunk_buffer,ByteSpan encode_buffer)259   Thread(ByteSpan chunk_buffer, ByteSpan encode_buffer)
260       : internal::TransferThread(
261             client_contexts_, server_contexts_, chunk_buffer, encode_buffer) {}
262 
263  private:
264   std::array<internal::ClientContext, kMaxConcurrentClientTransfers>
265       client_contexts_;
266   std::array<internal::ServerContext, kMaxConcurrentServerTransfers>
267       server_contexts_;
268 };
269 
270 }  // namespace pw::transfer
271