1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cstdint> 17 18 #include "pw_assert/assert.h" 19 #include "pw_chrono/system_clock.h" 20 #include "pw_function/function.h" 21 #include "pw_preprocessor/compiler.h" 22 #include "pw_rpc/raw/client_reader_writer.h" 23 #include "pw_rpc/raw/server_reader_writer.h" 24 #include "pw_span/span.h" 25 #include "pw_sync/binary_semaphore.h" 26 #include "pw_sync/timed_thread_notification.h" 27 #include "pw_thread/thread_core.h" 28 #include "pw_transfer/handler.h" 29 #include "pw_transfer/internal/client_context.h" 30 #include "pw_transfer/internal/context.h" 31 #include "pw_transfer/internal/event.h" 32 #include "pw_transfer/internal/server_context.h" 33 34 namespace pw::transfer { 35 namespace internal { 36 37 class TransferThread : public thread::ThreadCore { 38 public: TransferThread(span<ClientContext> client_transfers,span<ServerContext> server_transfers,ByteSpan chunk_buffer,ByteSpan encode_buffer)39 TransferThread(span<ClientContext> client_transfers, 40 span<ServerContext> server_transfers, 41 ByteSpan chunk_buffer, 42 ByteSpan encode_buffer) 43 : client_transfers_(client_transfers), 44 server_transfers_(server_transfers), 45 chunk_buffer_(chunk_buffer), 46 encode_buffer_(encode_buffer) {} 47 StartClientTransfer(TransferType type,ProtocolVersion version,uint32_t resource_id,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,uint8_t max_retries,uint32_t max_lifetime_retries)48 void StartClientTransfer(TransferType type, 49 ProtocolVersion version, 50 uint32_t resource_id, 51 stream::Stream* stream, 52 const TransferParameters& max_parameters, 53 Function<void(Status)>&& on_completion, 54 chrono::SystemClock::duration timeout, 55 uint8_t max_retries, 56 uint32_t max_lifetime_retries) { 57 uint32_t session_id = version == ProtocolVersion::kLegacy 58 ? resource_id 59 : Context::kUnassignedSessionId; 60 StartTransfer(type, 61 version, 62 session_id, 63 resource_id, 64 /*raw_chunk=*/{}, 65 stream, 66 max_parameters, 67 std::move(on_completion), 68 timeout, 69 max_retries, 70 max_lifetime_retries); 71 } 72 StartServerTransfer(TransferType type,ProtocolVersion version,uint32_t session_id,uint32_t resource_id,ConstByteSpan raw_chunk,const TransferParameters & max_parameters,chrono::SystemClock::duration timeout,uint8_t max_retries,uint32_t max_lifetime_retries)73 void StartServerTransfer(TransferType type, 74 ProtocolVersion version, 75 uint32_t session_id, 76 uint32_t resource_id, 77 ConstByteSpan raw_chunk, 78 const TransferParameters& max_parameters, 79 chrono::SystemClock::duration timeout, 80 uint8_t max_retries, 81 uint32_t max_lifetime_retries) { 82 StartTransfer(type, 83 version, 84 session_id, 85 resource_id, 86 raw_chunk, 87 /*stream=*/nullptr, 88 max_parameters, 89 /*on_completion=*/nullptr, 90 timeout, 91 max_retries, 92 max_lifetime_retries); 93 } 94 ProcessClientChunk(ConstByteSpan chunk)95 void ProcessClientChunk(ConstByteSpan chunk) { 96 ProcessChunk(EventType::kClientChunk, chunk); 97 } 98 ProcessServerChunk(ConstByteSpan chunk)99 void ProcessServerChunk(ConstByteSpan chunk) { 100 ProcessChunk(EventType::kServerChunk, chunk); 101 } 102 103 void EndClientTransfer(uint32_t session_id, 104 Status status, 105 bool send_status_chunk = false) { 106 EndTransfer( 107 EventType::kClientEndTransfer, session_id, status, send_status_chunk); 108 } 109 110 void EndServerTransfer(uint32_t session_id, 111 Status status, 112 bool send_status_chunk = false) { 113 EndTransfer( 114 EventType::kServerEndTransfer, session_id, status, send_status_chunk); 115 } 116 117 // Move the read/write streams on this thread instead of the transfer thread. 118 // RPC call objects are synchronized by pw_rpc, so this move will be atomic 119 // with respect to the transfer thread. SetClientReadStream(rpc::RawClientReaderWriter & read_stream)120 void SetClientReadStream(rpc::RawClientReaderWriter& read_stream) { 121 client_read_stream_ = std::move(read_stream); 122 } 123 SetClientWriteStream(rpc::RawClientReaderWriter & write_stream)124 void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream) { 125 client_write_stream_ = std::move(write_stream); 126 } 127 SetServerReadStream(rpc::RawServerReaderWriter & read_stream)128 void SetServerReadStream(rpc::RawServerReaderWriter& read_stream) { 129 server_read_stream_ = std::move(read_stream); 130 } 131 SetServerWriteStream(rpc::RawServerReaderWriter & write_stream)132 void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream) { 133 server_write_stream_ = std::move(write_stream); 134 } 135 AddTransferHandler(Handler & handler)136 void AddTransferHandler(Handler& handler) { 137 TransferHandlerEvent(EventType::kAddTransferHandler, handler); 138 } 139 RemoveTransferHandler(Handler & handler)140 void RemoveTransferHandler(Handler& handler) { 141 TransferHandlerEvent(EventType::kRemoveTransferHandler, handler); 142 // Ensure this function blocks until the transfer handler is fully cleaned 143 // up. 144 WaitUntilEventIsProcessed(); 145 } 146 max_chunk_size()147 size_t max_chunk_size() const { return chunk_buffer_.size(); } 148 149 // For testing only: terminates the transfer thread with a kTerminate event. 150 void Terminate(); 151 152 // For testing only: blocks until the next event can be acquired, which means 153 // a previously enqueued event has been processed. WaitUntilEventIsProcessed()154 void WaitUntilEventIsProcessed() { 155 next_event_ownership_.acquire(); 156 next_event_ownership_.release(); 157 } 158 159 // For testing only: simulates a timeout event for a client transfer. SimulateClientTimeout(uint32_t session_id)160 void SimulateClientTimeout(uint32_t session_id) { 161 SimulateTimeout(EventType::kClientTimeout, session_id); 162 } 163 164 // For testing only: simulates a timeout event for a server transfer. SimulateServerTimeout(uint32_t session_id)165 void SimulateServerTimeout(uint32_t session_id) { 166 SimulateTimeout(EventType::kServerTimeout, session_id); 167 } 168 169 private: 170 friend class Context; 171 172 // Maximum amount of time between transfer thread runs. 173 static constexpr chrono::SystemClock::duration kMaxTimeout = 174 std::chrono::seconds(2); 175 176 // Finds an active server or client transfer, matching against its legacy ID. 177 template <typename T> FindActiveTransferByLegacyId(const span<T> & transfers,uint32_t session_id)178 static Context* FindActiveTransferByLegacyId(const span<T>& transfers, 179 uint32_t session_id) { 180 auto transfer = 181 std::find_if(transfers.begin(), transfers.end(), [session_id](auto& c) { 182 return c.initialized() && c.id() == session_id; 183 }); 184 return transfer != transfers.end() ? &*transfer : nullptr; 185 } 186 187 // Finds an active server or client transfer, matching against resource ID. 188 template <typename T> FindActiveTransferByResourceId(const span<T> & transfers,uint32_t resource_id)189 static Context* FindActiveTransferByResourceId(const span<T>& transfers, 190 uint32_t resource_id) { 191 auto transfer = std::find_if( 192 transfers.begin(), transfers.end(), [resource_id](auto& c) { 193 return c.initialized() && c.resource_id() == resource_id; 194 }); 195 return transfer != transfers.end() ? &*transfer : nullptr; 196 } 197 198 void SimulateTimeout(EventType type, uint32_t session_id); 199 200 // Finds an new server or client transfer. 201 template <typename T> FindNewTransfer(const span<T> & transfers,uint32_t session_id)202 static Context* FindNewTransfer(const span<T>& transfers, 203 uint32_t session_id) { 204 Context* new_transfer = nullptr; 205 206 for (Context& context : transfers) { 207 if (context.active()) { 208 if (context.session_id() == session_id) { 209 // Restart an already active transfer. 210 return &context; 211 } 212 } else { 213 // Store the inactive context as an option, but keep checking for the 214 // restart case. 215 new_transfer = &context; 216 } 217 } 218 219 return new_transfer; 220 } 221 encode_buffer()222 const ByteSpan& encode_buffer() const { return encode_buffer_; } 223 224 void Run() final; 225 226 void HandleTimeouts(); 227 stream_for(TransferStream stream)228 rpc::Writer& stream_for(TransferStream stream) { 229 switch (stream) { 230 case TransferStream::kClientRead: 231 return client_read_stream_; 232 case TransferStream::kClientWrite: 233 return client_write_stream_; 234 case TransferStream::kServerRead: 235 return server_read_stream_; 236 case TransferStream::kServerWrite: 237 return server_write_stream_; 238 } 239 // An unknown TransferStream value was passed, which means this function 240 // was passed an invalid enum value. 241 PW_ASSERT(false); 242 } 243 244 // Returns the earliest timeout among all active transfers, up to kMaxTimeout. 245 chrono::SystemClock::time_point GetNextTransferTimeout() const; 246 247 void StartTransfer(TransferType type, 248 ProtocolVersion version, 249 uint32_t session_id, 250 uint32_t resource_id, 251 ConstByteSpan raw_chunk, 252 stream::Stream* stream, 253 const TransferParameters& max_parameters, 254 Function<void(Status)>&& on_completion, 255 chrono::SystemClock::duration timeout, 256 uint8_t max_retries, 257 uint32_t max_lifetime_retries); 258 259 void ProcessChunk(EventType type, ConstByteSpan chunk); 260 261 void EndTransfer(EventType type, 262 uint32_t session_id, 263 Status status, 264 bool send_status_chunk); 265 266 void TransferHandlerEvent(EventType type, Handler& handler); 267 268 void HandleEvent(const Event& event); 269 Context* FindContextForEvent(const Event& event) const; 270 271 void SendStatusChunk(const SendStatusChunkEvent& event); 272 273 sync::TimedThreadNotification event_notification_; 274 sync::BinarySemaphore next_event_ownership_; 275 276 Event next_event_; 277 Function<void(Status)> staged_on_completion_; 278 279 rpc::RawClientReaderWriter client_read_stream_; 280 rpc::RawClientReaderWriter client_write_stream_; 281 rpc::RawServerReaderWriter server_read_stream_; 282 rpc::RawServerReaderWriter server_write_stream_; 283 284 span<ClientContext> client_transfers_; 285 span<ServerContext> server_transfers_; 286 287 // All registered transfer handlers. 288 IntrusiveList<Handler> handlers_; 289 290 // Buffer in which chunk data is staged for CHUNK events. 291 ByteSpan chunk_buffer_; 292 293 // Buffer into which responses are encoded. Only ever used from within the 294 // transfer thread, so no locking is required. 295 ByteSpan encode_buffer_; 296 }; 297 298 } // namespace internal 299 300 using TransferThread = internal::TransferThread; 301 302 template <size_t kMaxConcurrentClientTransfers, 303 size_t kMaxConcurrentServerTransfers> 304 class Thread final : public internal::TransferThread { 305 public: Thread(ByteSpan chunk_buffer,ByteSpan encode_buffer)306 Thread(ByteSpan chunk_buffer, ByteSpan encode_buffer) 307 : internal::TransferThread( 308 client_contexts_, server_contexts_, chunk_buffer, encode_buffer) {} 309 310 private: 311 std::array<internal::ClientContext, kMaxConcurrentClientTransfers> 312 client_contexts_; 313 std::array<internal::ServerContext, kMaxConcurrentServerTransfers> 314 server_contexts_; 315 }; 316 317 } // namespace pw::transfer 318