1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cstdint> 17 18 #include "pw_assert/assert.h" 19 #include "pw_bytes/span.h" 20 #include "pw_chrono/system_clock.h" 21 #include "pw_function/function.h" 22 #include "pw_rpc/raw/client_reader_writer.h" 23 #include "pw_rpc/raw/server_reader_writer.h" 24 #include "pw_span/span.h" 25 #include "pw_sync/binary_semaphore.h" 26 #include "pw_sync/timed_thread_notification.h" 27 #include "pw_thread/thread_core.h" 28 #include "pw_transfer/handler.h" 29 #include "pw_transfer/internal/client_context.h" 30 #include "pw_transfer/internal/context.h" 31 #include "pw_transfer/internal/event.h" 32 #include "pw_transfer/internal/server_context.h" 33 34 namespace pw::transfer { 35 36 class Client; 37 38 namespace internal { 39 40 class TransferThread : public thread::ThreadCore { 41 public: TransferThread(span<ClientContext> client_transfers,span<ServerContext> server_transfers,ByteSpan chunk_buffer,ByteSpan encode_buffer)42 TransferThread(span<ClientContext> client_transfers, 43 span<ServerContext> server_transfers, 44 ByteSpan chunk_buffer, 45 ByteSpan encode_buffer) 46 : client_transfers_(client_transfers), 47 server_transfers_(server_transfers), 48 next_session_id_(1), 49 chunk_buffer_(chunk_buffer), 50 encode_buffer_(encode_buffer) {} 51 52 void StartClientTransfer(TransferType type, 53 ProtocolVersion version, 54 uint32_t resource_id, 55 uint32_t handle_id, 56 stream::Stream* stream, 57 const TransferParameters& max_parameters, 58 Function<void(Status)>&& on_completion, 59 chrono::SystemClock::duration timeout, 60 chrono::SystemClock::duration initial_timeout, 61 uint8_t max_retries, 62 uint32_t max_lifetime_retries, 63 uint32_t initial_offset = 0) { 64 StartTransfer(type, 65 version, 66 Context::kUnassignedSessionId, // Assigned later. 67 resource_id, 68 handle_id, 69 /*raw_chunk=*/{}, 70 stream, 71 max_parameters, 72 std::move(on_completion), 73 timeout, 74 initial_timeout, 75 max_retries, 76 max_lifetime_retries, 77 initial_offset); 78 } 79 80 void StartServerTransfer(TransferType type, 81 ProtocolVersion version, 82 uint32_t session_id, 83 uint32_t resource_id, 84 ConstByteSpan raw_chunk, 85 const TransferParameters& max_parameters, 86 chrono::SystemClock::duration timeout, 87 uint8_t max_retries, 88 uint32_t max_lifetime_retries, 89 uint32_t initial_offset = 0) { 90 StartTransfer(type, 91 version, 92 session_id, 93 resource_id, 94 /*handle_id=*/0, 95 raw_chunk, 96 /*stream=*/nullptr, 97 max_parameters, 98 /*on_completion=*/nullptr, 99 timeout, 100 timeout, 101 max_retries, 102 max_lifetime_retries, 103 initial_offset); 104 } 105 ProcessClientChunk(ConstByteSpan chunk)106 void ProcessClientChunk(ConstByteSpan chunk) { 107 ProcessChunk(EventType::kClientChunk, chunk); 108 } 109 ProcessServerChunk(ConstByteSpan chunk)110 void ProcessServerChunk(ConstByteSpan chunk) { 111 ProcessChunk(EventType::kServerChunk, chunk); 112 } 113 SendServerStatus(TransferType type,uint32_t session_id,ProtocolVersion version,Status status)114 void SendServerStatus(TransferType type, 115 uint32_t session_id, 116 ProtocolVersion version, 117 Status status) { 118 SendStatus(type == TransferType::kTransmit ? TransferStream::kServerRead 119 : TransferStream::kServerWrite, 120 session_id, 121 version, 122 status); 123 } 124 CancelClientTransfer(uint32_t handle_id)125 void CancelClientTransfer(uint32_t handle_id) { 126 EndTransfer(EventType::kClientEndTransfer, 127 IdentifierType::Handle, 128 handle_id, 129 Status::Cancelled(), 130 /*send_status_chunk=*/true); 131 } 132 133 void EndClientTransfer(uint32_t session_id, 134 Status status, 135 bool send_status_chunk = false) { 136 EndTransfer(EventType::kClientEndTransfer, 137 IdentifierType::Session, 138 session_id, 139 status, 140 send_status_chunk); 141 } 142 143 void EndServerTransfer(uint32_t session_id, 144 Status status, 145 bool send_status_chunk = false) { 146 EndTransfer(EventType::kServerEndTransfer, 147 IdentifierType::Session, 148 session_id, 149 status, 150 send_status_chunk); 151 } 152 153 /// Updates the transfer thread's client read stream. 154 /// 155 /// The provided stream should not have an on_next function set. Instead, 156 /// on_next is passed separately to ensure that it is only set when the new 157 /// stream becomes the transfer thread's primary stream. 158 /// 159 /// If the thread has an existing active client read stream, closes it and 160 /// terminates any transfers running on it. SetClientReadStream(rpc::RawClientReaderWriter & read_stream,Function<void (ConstByteSpan)> && on_next)161 void SetClientReadStream(rpc::RawClientReaderWriter& read_stream, 162 Function<void(ConstByteSpan)>&& on_next) { 163 // Clear the existing callback to prevent incoming chunks from blocking on 164 // the transfer thread and preventing the call's cleanup. 165 client_read_stream_.set_on_next(nullptr); 166 staged_client_stream_ = std::move(read_stream); 167 staged_client_on_next_ = std::move(on_next); 168 SetStream(TransferStream::kClientRead); 169 } 170 171 /// Updates the transfer thread's client write stream. 172 /// 173 /// The provided stream should not have an on_next function set. Instead, 174 /// on_next is passed separately to ensure that it is only set when the new 175 /// stream becomes the transfer thread's primary stream. 176 /// 177 /// If the thread has an existing active client write stream, closes it and 178 /// terminates any transfers running on it. SetClientWriteStream(rpc::RawClientReaderWriter & write_stream,Function<void (ConstByteSpan)> && on_next)179 void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream, 180 Function<void(ConstByteSpan)>&& on_next) { 181 // Clear the existing callback to prevent incoming chunks from blocking on 182 // the transfer thread and preventing the call's cleanup. 183 client_write_stream_.set_on_next(nullptr); 184 staged_client_stream_ = std::move(write_stream); 185 staged_client_on_next_ = std::move(on_next); 186 SetStream(TransferStream::kClientWrite); 187 } 188 189 /// Updates the transfer thread's server read stream. 190 /// 191 /// The provided stream should not have an on_next function set. Instead, 192 /// on_next is passed separately to ensure that it is only set when the new 193 /// stream becomes the transfer thread's primary stream. 194 /// 195 /// If the thread has an existing active server read stream, closes it and 196 /// terminates any transfers running on it. SetServerReadStream(rpc::RawServerReaderWriter & read_stream,Function<void (ConstByteSpan)> && on_next)197 void SetServerReadStream(rpc::RawServerReaderWriter& read_stream, 198 Function<void(ConstByteSpan)>&& on_next) { 199 // Clear the existing callback to prevent incoming chunks from blocking on 200 // the transfer thread and preventing the call's cleanup. 201 server_read_stream_.set_on_next(nullptr); 202 staged_server_stream_ = std::move(read_stream); 203 staged_server_on_next_ = std::move(on_next); 204 SetStream(TransferStream::kServerRead); 205 } 206 207 /// Updates the transfer thread's server write stream. 208 /// 209 /// The provided stream should not have an on_next function set. Instead, 210 /// on_next is passed separately to ensure that it is only set when the new 211 /// stream becomes the transfer thread's primary stream. 212 /// 213 /// If the thread has an existing active server write stream, closes it and 214 /// terminates any transfers running on it. SetServerWriteStream(rpc::RawServerReaderWriter & write_stream,Function<void (ConstByteSpan)> && on_next)215 void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream, 216 Function<void(ConstByteSpan)>&& on_next) { 217 // Clear the existing callback to prevent incoming chunks from blocking on 218 // the transfer thread and preventing the call's cleanup. 219 server_write_stream_.set_on_next(nullptr); 220 staged_server_stream_ = std::move(write_stream); 221 staged_server_on_next_ = std::move(on_next); 222 SetStream(TransferStream::kServerWrite); 223 } 224 AddTransferHandler(Handler & handler)225 void AddTransferHandler(Handler& handler) { 226 TransferHandlerEvent(EventType::kAddTransferHandler, handler); 227 } 228 RemoveTransferHandler(Handler & handler)229 void RemoveTransferHandler(Handler& handler) { 230 TransferHandlerEvent(EventType::kRemoveTransferHandler, handler); 231 // Ensure this function blocks until the transfer handler is fully cleaned 232 // up. 233 WaitUntilEventIsProcessed(); 234 } 235 max_chunk_size()236 size_t max_chunk_size() const { return chunk_buffer_.size(); } 237 238 // For testing only: terminates the transfer thread with a kTerminate event. 239 void Terminate(); 240 241 // For testing only: blocks until the next event can be acquired, which means 242 // a previously enqueued event has been processed. WaitUntilEventIsProcessed()243 void WaitUntilEventIsProcessed() { 244 next_event_ownership_.acquire(); 245 next_event_ownership_.release(); 246 } 247 248 // For testing only: simulates a timeout event for a client transfer. SimulateClientTimeout(uint32_t session_id)249 void SimulateClientTimeout(uint32_t session_id) { 250 SimulateTimeout(EventType::kClientTimeout, session_id); 251 } 252 253 // For testing only: simulates a timeout event for a server transfer. SimulateServerTimeout(uint32_t session_id)254 void SimulateServerTimeout(uint32_t session_id) { 255 SimulateTimeout(EventType::kServerTimeout, session_id); 256 } 257 258 void EnqueueResourceEvent(uint32_t resource_id, 259 ResourceStatusCallback&& callback); 260 261 private: 262 friend class transfer::Client; 263 friend class Context; 264 265 // Maximum amount of time between transfer thread runs. 266 static constexpr chrono::SystemClock::duration kMaxTimeout = 267 std::chrono::seconds(2); 268 269 void UpdateClientTransfer(uint32_t handle_id, size_t transfer_size_bytes); 270 271 // Finds an active server or client transfer, matching against its legacy ID. 272 template <typename T> FindActiveTransferByLegacyId(const span<T> & transfers,uint32_t session_id)273 static Context* FindActiveTransferByLegacyId(const span<T>& transfers, 274 uint32_t session_id) { 275 auto transfer = 276 std::find_if(transfers.begin(), transfers.end(), [session_id](auto& c) { 277 return c.initialized() && c.session_id() == session_id; 278 }); 279 return transfer != transfers.end() ? &*transfer : nullptr; 280 } 281 282 // Finds an active server or client transfer, matching against resource ID. 283 template <typename T> FindActiveTransferByResourceId(const span<T> & transfers,uint32_t resource_id)284 static Context* FindActiveTransferByResourceId(const span<T>& transfers, 285 uint32_t resource_id) { 286 auto transfer = std::find_if( 287 transfers.begin(), transfers.end(), [resource_id](auto& c) { 288 return c.initialized() && c.resource_id() == resource_id; 289 }); 290 return transfer != transfers.end() ? &*transfer : nullptr; 291 } 292 FindClientTransferByHandleId(uint32_t handle_id)293 Context* FindClientTransferByHandleId(uint32_t handle_id) const { 294 auto transfer = 295 std::find_if(client_transfers_.begin(), 296 client_transfers_.end(), 297 [handle_id](auto& c) { 298 return c.initialized() && c.handle_id() == handle_id; 299 }); 300 return transfer != client_transfers_.end() ? &*transfer : nullptr; 301 } 302 303 void SimulateTimeout(EventType type, uint32_t session_id); 304 305 // Finds an new server or client transfer. 306 template <typename T> FindNewTransfer(const span<T> & transfers,uint32_t session_id)307 static Context* FindNewTransfer(const span<T>& transfers, 308 uint32_t session_id) { 309 Context* new_transfer = nullptr; 310 311 for (Context& context : transfers) { 312 if (context.active()) { 313 if (context.session_id() == session_id) { 314 // Restart an already active transfer. 315 return &context; 316 } 317 } else { 318 // Store the inactive context as an option, but keep checking for the 319 // restart case. 320 new_transfer = &context; 321 } 322 } 323 324 return new_transfer; 325 } 326 encode_buffer()327 const ByteSpan& encode_buffer() const { return encode_buffer_; } 328 329 void Run() final; 330 331 void HandleTimeouts(); 332 stream_for(TransferStream stream)333 rpc::Writer& stream_for(TransferStream stream) { 334 switch (stream) { 335 case TransferStream::kClientRead: 336 return client_read_stream_.as_writer(); 337 case TransferStream::kClientWrite: 338 return client_write_stream_.as_writer(); 339 case TransferStream::kServerRead: 340 return server_read_stream_.as_writer(); 341 case TransferStream::kServerWrite: 342 return server_write_stream_.as_writer(); 343 } 344 // An unknown TransferStream value was passed, which means this function 345 // was passed an invalid enum value. 346 PW_ASSERT(false); 347 } 348 349 // Returns the earliest timeout among all active transfers, up to kMaxTimeout. 350 chrono::SystemClock::time_point GetNextTransferTimeout() const; 351 352 uint32_t AssignSessionId(); 353 354 void StartTransfer(TransferType type, 355 ProtocolVersion version, 356 uint32_t session_id, 357 uint32_t resource_id, 358 uint32_t handle_id, 359 ConstByteSpan raw_chunk, 360 stream::Stream* stream, 361 const TransferParameters& max_parameters, 362 Function<void(Status)>&& on_completion, 363 chrono::SystemClock::duration timeout, 364 chrono::SystemClock::duration initial_timeout, 365 uint8_t max_retries, 366 uint32_t max_lifetime_retries, 367 uint32_t initial_offset); 368 369 void ProcessChunk(EventType type, ConstByteSpan chunk); 370 371 void SendStatus(TransferStream stream, 372 uint32_t session_id, 373 ProtocolVersion version, 374 Status status); 375 376 void EndTransfer(EventType type, 377 IdentifierType id_type, 378 uint32_t session_id, 379 Status status, 380 bool send_status_chunk); 381 382 void SetStream(TransferStream stream); 383 void HandleSetStreamEvent(TransferStream stream); 384 385 void TransferHandlerEvent(EventType type, Handler& handler); 386 387 void HandleEvent(const Event& event); 388 Context* FindContextForEvent(const Event& event) const; 389 390 void SendStatusChunk(const SendStatusChunkEvent& event); 391 392 void GetResourceState(uint32_t resource_id); 393 394 sync::TimedThreadNotification event_notification_; 395 sync::BinarySemaphore next_event_ownership_; 396 397 Event next_event_; 398 Function<void(Status)> staged_on_completion_; 399 400 rpc::RawClientReaderWriter client_read_stream_; 401 rpc::RawClientReaderWriter client_write_stream_; 402 rpc::RawClientReaderWriter staged_client_stream_; 403 Function<void(ConstByteSpan)> staged_client_on_next_; 404 405 rpc::RawServerReaderWriter server_read_stream_; 406 rpc::RawServerReaderWriter server_write_stream_; 407 rpc::RawServerReaderWriter staged_server_stream_; 408 Function<void(ConstByteSpan)> staged_server_on_next_; 409 410 span<ClientContext> client_transfers_; 411 span<ServerContext> server_transfers_; 412 413 // Identifier to use for the next started transfer, unique over the RPC 414 // channel between the transfer client and server. 415 // 416 // TODO(frolv): If we ever support changing the RPC channel, this should be 417 // reset to 1. 418 uint32_t next_session_id_; 419 420 // All registered transfer handlers. 421 IntrusiveList<Handler> handlers_; 422 423 // Buffer in which chunk data is staged for CHUNK events. 424 ByteSpan chunk_buffer_; 425 426 // Buffer into which responses are encoded. Only ever used from within the 427 // transfer thread, so no locking is required. 428 ByteSpan encode_buffer_; 429 430 ResourceStatusCallback resource_status_callback_ = nullptr; 431 }; 432 433 } // namespace internal 434 435 using TransferThread = internal::TransferThread; 436 437 template <size_t kMaxConcurrentClientTransfers, 438 size_t kMaxConcurrentServerTransfers> 439 class Thread final : public internal::TransferThread { 440 public: Thread(ByteSpan chunk_buffer,ByteSpan encode_buffer)441 Thread(ByteSpan chunk_buffer, ByteSpan encode_buffer) 442 : internal::TransferThread( 443 client_contexts_, server_contexts_, chunk_buffer, encode_buffer) {} 444 445 private: 446 std::array<internal::ClientContext, kMaxConcurrentClientTransfers> 447 client_contexts_; 448 std::array<internal::ServerContext, kMaxConcurrentServerTransfers> 449 server_contexts_; 450 }; 451 452 } // namespace pw::transfer 453