1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cstdint> 17 #include <span> 18 19 #include "pw_assert/assert.h" 20 #include "pw_chrono/system_clock.h" 21 #include "pw_function/function.h" 22 #include "pw_preprocessor/compiler.h" 23 #include "pw_rpc/raw/client_reader_writer.h" 24 #include "pw_rpc/raw/server_reader_writer.h" 25 #include "pw_sync/binary_semaphore.h" 26 #include "pw_sync/timed_thread_notification.h" 27 #include "pw_thread/thread_core.h" 28 #include "pw_transfer/handler.h" 29 #include "pw_transfer/internal/client_context.h" 30 #include "pw_transfer/internal/context.h" 31 #include "pw_transfer/internal/event.h" 32 #include "pw_transfer/internal/server_context.h" 33 34 namespace pw::transfer { 35 namespace internal { 36 37 class TransferThread : public thread::ThreadCore { 38 public: TransferThread(std::span<ClientContext> client_transfers,std::span<ServerContext> server_transfers,ByteSpan chunk_buffer,ByteSpan encode_buffer)39 TransferThread(std::span<ClientContext> client_transfers, 40 std::span<ServerContext> server_transfers, 41 ByteSpan chunk_buffer, 42 ByteSpan encode_buffer) 43 : client_transfers_(client_transfers), 44 server_transfers_(server_transfers), 45 chunk_buffer_(chunk_buffer), 46 encode_buffer_(encode_buffer) {} 47 StartClientTransfer(TransferType type,uint32_t transfer_id,uint32_t handler_id,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,uint8_t max_retries)48 void StartClientTransfer(TransferType type, 49 uint32_t transfer_id, 50 uint32_t handler_id, 51 stream::Stream* stream, 52 const TransferParameters& max_parameters, 53 Function<void(Status)>&& on_completion, 54 chrono::SystemClock::duration timeout, 55 uint8_t max_retries) { 56 StartTransfer(type, 57 transfer_id, 58 handler_id, 59 stream, 60 max_parameters, 61 std::move(on_completion), 62 timeout, 63 max_retries); 64 } 65 StartServerTransfer(TransferType type,uint32_t transfer_id,uint32_t handler_id,const TransferParameters & max_parameters,chrono::SystemClock::duration timeout,uint8_t max_retries)66 void StartServerTransfer(TransferType type, 67 uint32_t transfer_id, 68 uint32_t handler_id, 69 const TransferParameters& max_parameters, 70 chrono::SystemClock::duration timeout, 71 uint8_t max_retries) { 72 StartTransfer(type, 73 transfer_id, 74 handler_id, 75 /*stream=*/nullptr, 76 max_parameters, 77 /*on_completion=*/nullptr, 78 timeout, 79 max_retries); 80 } 81 ProcessClientChunk(ConstByteSpan chunk)82 void ProcessClientChunk(ConstByteSpan chunk) { 83 ProcessChunk(EventType::kClientChunk, chunk); 84 } 85 ProcessServerChunk(ConstByteSpan chunk)86 void ProcessServerChunk(ConstByteSpan chunk) { 87 ProcessChunk(EventType::kServerChunk, chunk); 88 } 89 SetClientReadStream(rpc::RawClientReaderWriter & read_stream)90 void SetClientReadStream(rpc::RawClientReaderWriter& read_stream) { 91 SetClientStream(TransferStream::kClientRead, read_stream); 92 } 93 SetClientWriteStream(rpc::RawClientReaderWriter & write_stream)94 void SetClientWriteStream(rpc::RawClientReaderWriter& write_stream) { 95 SetClientStream(TransferStream::kClientWrite, write_stream); 96 } 97 SetServerReadStream(rpc::RawServerReaderWriter & read_stream)98 void SetServerReadStream(rpc::RawServerReaderWriter& read_stream) { 99 SetServerStream(TransferStream::kServerRead, read_stream); 100 } 101 SetServerWriteStream(rpc::RawServerReaderWriter & write_stream)102 void SetServerWriteStream(rpc::RawServerReaderWriter& write_stream) { 103 SetServerStream(TransferStream::kServerWrite, write_stream); 104 } 105 AddTransferHandler(Handler & handler)106 void AddTransferHandler(Handler& handler) { 107 TransferHandlerEvent(EventType::kAddTransferHandler, handler); 108 } 109 RemoveTransferHandler(Handler & handler)110 void RemoveTransferHandler(Handler& handler) { 111 TransferHandlerEvent(EventType::kRemoveTransferHandler, handler); 112 } 113 max_chunk_size()114 size_t max_chunk_size() const { return chunk_buffer_.size(); } 115 116 // For testing only: terminates the transfer thread with a kTerminate event. 117 void Terminate(); 118 119 // For testing only: blocks until the next event can be acquired, which means 120 // a previously enqueued event has been processed. WaitUntilEventIsProcessed()121 void WaitUntilEventIsProcessed() { 122 next_event_ownership_.acquire(); 123 next_event_ownership_.release(); 124 } 125 126 // For testing only: simulates a timeout event for a client transfer. SimulateClientTimeout(uint32_t transfer_id)127 void SimulateClientTimeout(uint32_t transfer_id) { 128 SimulateTimeout(EventType::kClientTimeout, transfer_id); 129 } 130 131 // For testing only: simulates a timeout event for a server transfer. SimulateServerTimeout(uint32_t transfer_id)132 void SimulateServerTimeout(uint32_t transfer_id) { 133 SimulateTimeout(EventType::kServerTimeout, transfer_id); 134 } 135 136 private: 137 friend class Context; 138 139 // Maximum amount of time between transfer thread runs. 140 static constexpr chrono::SystemClock::duration kMaxTimeout = 141 std::chrono::seconds(2); 142 143 // Finds an active server or client transfer. 144 template <typename T> FindActiveTransfer(const std::span<T> & transfers,uint32_t transfer_id)145 static Context* FindActiveTransfer(const std::span<T>& transfers, 146 uint32_t transfer_id) { 147 auto transfer = std::find_if( 148 transfers.begin(), transfers.end(), [transfer_id](auto& c) { 149 return c.initialized() && c.transfer_id() == transfer_id; 150 }); 151 return transfer != transfers.end() ? &*transfer : nullptr; 152 } 153 154 void SimulateTimeout(EventType type, uint32_t transfer_id); 155 156 // Finds an new server or client transfer. 157 template <typename T> FindNewTransfer(const std::span<T> & transfers,uint32_t transfer_id)158 static Context* FindNewTransfer(const std::span<T>& transfers, 159 uint32_t transfer_id) { 160 Context* new_transfer = nullptr; 161 162 for (Context& context : transfers) { 163 if (context.active()) { 164 if (context.transfer_id() == transfer_id) { 165 // Restart an already active transfer. 166 return &context; 167 } 168 } else { 169 // Store the inactive context as an option, but keep checking for the 170 // restart case. 171 new_transfer = &context; 172 } 173 } 174 175 return new_transfer; 176 } 177 encode_buffer()178 const ByteSpan& encode_buffer() const { return encode_buffer_; } 179 180 void Run() final; 181 182 void HandleTimeouts(); 183 stream_for(TransferStream stream)184 rpc::Writer& stream_for(TransferStream stream) { 185 switch (stream) { 186 case TransferStream::kClientRead: 187 return client_read_stream_; 188 case TransferStream::kClientWrite: 189 return client_write_stream_; 190 case TransferStream::kServerRead: 191 return server_read_stream_; 192 case TransferStream::kServerWrite: 193 return server_write_stream_; 194 } 195 // An unknown TransferStream value was passed, which means this function 196 // was passed an invalid enum value. 197 PW_ASSERT(false); 198 } 199 200 // Returns the earliest timeout among all active transfers, up to kMaxTimeout. 201 chrono::SystemClock::time_point GetNextTransferTimeout() const; 202 203 void StartTransfer(TransferType type, 204 uint32_t transfer_id, 205 uint32_t handler_id, 206 stream::Stream* stream, 207 const TransferParameters& max_parameters, 208 Function<void(Status)>&& on_completion, 209 chrono::SystemClock::duration timeout, 210 uint8_t max_retries); 211 212 void ProcessChunk(EventType type, ConstByteSpan chunk); 213 214 void SetClientStream(TransferStream type, rpc::RawClientReaderWriter& stream); 215 void SetServerStream(TransferStream type, rpc::RawServerReaderWriter& stream); 216 217 void TransferHandlerEvent(EventType type, Handler& handler); 218 219 void HandleEvent(const Event& event); 220 Context* FindContextForEvent(const Event& event) const; 221 222 void SendStatusChunk(const SendStatusChunkEvent& event); 223 224 sync::TimedThreadNotification event_notification_; 225 sync::BinarySemaphore next_event_ownership_; 226 227 Event next_event_; 228 Function<void(Status)> staged_on_completion_; 229 rpc::RawClientReaderWriter staged_client_stream_; 230 rpc::RawServerReaderWriter staged_server_stream_; 231 232 rpc::RawClientReaderWriter client_read_stream_; 233 rpc::RawClientReaderWriter client_write_stream_; 234 rpc::RawServerReaderWriter server_read_stream_; 235 rpc::RawServerReaderWriter server_write_stream_; 236 237 std::span<ClientContext> client_transfers_; 238 std::span<ServerContext> server_transfers_; 239 240 // All registered transfer handlers. 241 IntrusiveList<Handler> handlers_; 242 243 // Buffer in which chunk data is staged for CHUNK events. 244 ByteSpan chunk_buffer_; 245 246 // Buffer into which responses are encoded. Only ever used from within the 247 // transfer thread, so no locking is required. 248 ByteSpan encode_buffer_; 249 }; 250 251 } // namespace internal 252 253 using TransferThread = internal::TransferThread; 254 255 template <size_t kMaxConcurrentClientTransfers, 256 size_t kMaxConcurrentServerTransfers> 257 class Thread final : public internal::TransferThread { 258 public: Thread(ByteSpan chunk_buffer,ByteSpan encode_buffer)259 Thread(ByteSpan chunk_buffer, ByteSpan encode_buffer) 260 : internal::TransferThread( 261 client_contexts_, server_contexts_, chunk_buffer, encode_buffer) {} 262 263 private: 264 std::array<internal::ClientContext, kMaxConcurrentClientTransfers> 265 client_contexts_; 266 std::array<internal::ServerContext, kMaxConcurrentServerTransfers> 267 server_contexts_; 268 }; 269 270 } // namespace pw::transfer 271