1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 
17 #include "pw_transfer/transfer_thread.h"
18 
19 #include "pw_assert/check.h"
20 #include "pw_log/log.h"
21 #include "pw_transfer/internal/chunk.h"
22 
23 PW_MODIFY_DIAGNOSTICS_PUSH();
24 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
25 
26 namespace pw::transfer::internal {
27 
Terminate()28 void TransferThread::Terminate() {
29   next_event_ownership_.acquire();
30   next_event_.type = EventType::kTerminate;
31   event_notification_.release();
32 }
33 
SimulateTimeout(EventType type,uint32_t transfer_id)34 void TransferThread::SimulateTimeout(EventType type, uint32_t transfer_id) {
35   next_event_ownership_.acquire();
36 
37   next_event_.type = type;
38   next_event_.chunk = {};
39   next_event_.chunk.transfer_id = transfer_id;
40 
41   event_notification_.release();
42 
43   WaitUntilEventIsProcessed();
44 }
45 
Run()46 void TransferThread::Run() {
47   // Next event starts freed.
48   next_event_ownership_.release();
49 
50   while (true) {
51     if (event_notification_.try_acquire_until(GetNextTransferTimeout())) {
52       if (next_event_.type == EventType::kTerminate) {
53         return;
54       }
55 
56       HandleEvent(next_event_);
57 
58       // Finished processing the event. Allow the next_event struct to be
59       // overwritten.
60       next_event_ownership_.release();
61     }
62 
63     // Regardless of whether an event was received or not, check for any
64     // transfers which have timed out and process them if so.
65     for (Context& context : client_transfers_) {
66       if (context.timed_out()) {
67         context.HandleEvent({.type = EventType::kClientTimeout});
68       }
69     }
70     for (Context& context : server_transfers_) {
71       if (context.timed_out()) {
72         context.HandleEvent({.type = EventType::kServerTimeout});
73       }
74     }
75   }
76 }
77 
GetNextTransferTimeout() const78 chrono::SystemClock::time_point TransferThread::GetNextTransferTimeout() const {
79   chrono::SystemClock::time_point timeout =
80       chrono::SystemClock::TimePointAfterAtLeast(kMaxTimeout);
81 
82   for (Context& context : client_transfers_) {
83     auto ctx_timeout = context.timeout();
84     if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
85       timeout = ctx_timeout.value();
86     }
87   }
88   for (Context& context : server_transfers_) {
89     auto ctx_timeout = context.timeout();
90     if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
91       timeout = ctx_timeout.value();
92     }
93   }
94 
95   return timeout;
96 }
97 
StartTransfer(TransferType type,uint32_t transfer_id,uint32_t handler_id,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,uint8_t max_retries)98 void TransferThread::StartTransfer(TransferType type,
99                                    uint32_t transfer_id,
100                                    uint32_t handler_id,
101                                    stream::Stream* stream,
102                                    const TransferParameters& max_parameters,
103                                    Function<void(Status)>&& on_completion,
104                                    chrono::SystemClock::duration timeout,
105                                    uint8_t max_retries) {
106   // Block until the last event has been processed.
107   next_event_ownership_.acquire();
108 
109   bool is_client_transfer = stream != nullptr;
110 
111   next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
112                                         : EventType::kNewServerTransfer;
113   next_event_.new_transfer = {
114       .type = type,
115       .transfer_id = transfer_id,
116       .handler_id = handler_id,
117       .max_parameters = &max_parameters,
118       .timeout = timeout,
119       .max_retries = max_retries,
120       .transfer_thread = this,
121   };
122 
123   staged_on_completion_ = std::move(on_completion);
124 
125   // The transfer is initialized with either a stream (client-side) or a handler
126   // (server-side). If no stream is provided, try to find a registered handler
127   // with the specified ID.
128   if (is_client_transfer) {
129     next_event_.new_transfer.stream = stream;
130     next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
131         type == TransferType::kTransmit ? client_write_stream_
132                                         : client_read_stream_);
133   } else {
134     auto handler = std::find_if(handlers_.begin(),
135                                 handlers_.end(),
136                                 [&](auto& h) { return h.id() == handler_id; });
137     if (handler != handlers_.end()) {
138       next_event_.new_transfer.handler = &*handler;
139       next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
140           type == TransferType::kTransmit ? server_read_stream_
141                                           : server_write_stream_);
142     } else {
143       // No handler exists for the transfer: return a NOT_FOUND.
144       next_event_.type = EventType::kSendStatusChunk;
145       next_event_.send_status_chunk = {
146           .transfer_id = transfer_id,
147           .status = Status::NotFound().code(),
148           .stream = type == TransferType::kTransmit
149                         ? TransferStream::kServerRead
150                         : TransferStream::kServerWrite,
151       };
152     }
153   }
154 
155   event_notification_.release();
156 }
157 
ProcessChunk(EventType type,ConstByteSpan chunk)158 void TransferThread::ProcessChunk(EventType type, ConstByteSpan chunk) {
159   // If this assert is hit, there is a bug in the transfer implementation.
160   // Contexts' max_chunk_size_bytes fields should be set based on the size of
161   // chunk_buffer_.
162   PW_CHECK(chunk.size() <= chunk_buffer_.size(),
163            "Transfer received a larger chunk than it can handle.");
164 
165   Result<uint32_t> transfer_id = ExtractTransferId(chunk);
166   if (!transfer_id.ok()) {
167     PW_LOG_ERROR("Received a malformed chunk without a transfer ID");
168     return;
169   }
170 
171   // Block until the last event has been processed.
172   next_event_ownership_.acquire();
173 
174   std::memcpy(chunk_buffer_.data(), chunk.data(), chunk.size());
175 
176   next_event_.type = type;
177   next_event_.chunk = {
178       .transfer_id = *transfer_id,
179       .data = chunk_buffer_.data(),
180       .size = chunk.size(),
181   };
182 
183   event_notification_.release();
184 }
185 
SetClientStream(TransferStream type,rpc::RawClientReaderWriter & stream)186 void TransferThread::SetClientStream(TransferStream type,
187                                      rpc::RawClientReaderWriter& stream) {
188   // Block until the last event has been processed.
189   next_event_ownership_.acquire();
190 
191   next_event_.type = EventType::kSetTransferStream;
192   next_event_.set_transfer_stream = type;
193   staged_client_stream_ = std::move(stream);
194 
195   event_notification_.release();
196 }
197 
SetServerStream(TransferStream type,rpc::RawServerReaderWriter & stream)198 void TransferThread::SetServerStream(TransferStream type,
199                                      rpc::RawServerReaderWriter& stream) {
200   // Block until the last event has been processed.
201   next_event_ownership_.acquire();
202 
203   next_event_.type = EventType::kSetTransferStream;
204   next_event_.set_transfer_stream = type;
205   staged_server_stream_ = std::move(stream);
206 
207   event_notification_.release();
208 }
209 
TransferHandlerEvent(EventType type,internal::Handler & handler)210 void TransferThread::TransferHandlerEvent(EventType type,
211                                           internal::Handler& handler) {
212   // Block until the last event has been processed.
213   next_event_ownership_.acquire();
214 
215   next_event_.type = type;
216   if (type == EventType::kAddTransferHandler) {
217     next_event_.add_transfer_handler = &handler;
218   } else {
219     next_event_.remove_transfer_handler = &handler;
220   }
221 
222   event_notification_.release();
223 }
224 
HandleEvent(const internal::Event & event)225 void TransferThread::HandleEvent(const internal::Event& event) {
226   switch (event.type) {
227     case EventType::kSendStatusChunk:
228       SendStatusChunk(event.send_status_chunk);
229       break;
230 
231     case EventType::kSetTransferStream:
232       switch (event.set_transfer_stream) {
233         case TransferStream::kClientRead:
234           client_read_stream_ = std::move(staged_client_stream_);
235           break;
236 
237         case TransferStream::kClientWrite:
238           client_write_stream_ = std::move(staged_client_stream_);
239           break;
240 
241         case TransferStream::kServerRead:
242           server_read_stream_ = std::move(staged_server_stream_);
243           break;
244 
245         case TransferStream::kServerWrite:
246           server_write_stream_ = std::move(staged_server_stream_);
247           break;
248       }
249       return;
250 
251     case EventType::kAddTransferHandler:
252       handlers_.push_front(*event.add_transfer_handler);
253       return;
254 
255     case EventType::kRemoveTransferHandler:
256       handlers_.remove(*event.remove_transfer_handler);
257       return;
258 
259     default:
260       // Other events are handled by individual transfer contexts.
261       break;
262   }
263 
264   if (Context* ctx = FindContextForEvent(event); ctx != nullptr) {
265     if (event.type == EventType::kNewClientTransfer) {
266       // TODO(frolv): This is terrible.
267       static_cast<ClientContext*>(ctx)->set_on_completion(
268           std::move(staged_on_completion_));
269     }
270 
271     ctx->HandleEvent(event);
272   }
273 }
274 
FindContextForEvent(const internal::Event & event) const275 Context* TransferThread::FindContextForEvent(
276     const internal::Event& event) const {
277   switch (event.type) {
278     case EventType::kNewClientTransfer:
279       return FindNewTransfer(client_transfers_, event.new_transfer.transfer_id);
280     case EventType::kNewServerTransfer:
281       return FindNewTransfer(server_transfers_, event.new_transfer.transfer_id);
282     case EventType::kClientChunk:
283       return FindActiveTransfer(client_transfers_, event.chunk.transfer_id);
284     case EventType::kServerChunk:
285       return FindActiveTransfer(server_transfers_, event.chunk.transfer_id);
286     case EventType::kClientTimeout:  // Manually triggered client timeout
287       return FindActiveTransfer(client_transfers_, event.chunk.transfer_id);
288     case EventType::kServerTimeout:  // Manually triggered server timeout
289       return FindActiveTransfer(server_transfers_, event.chunk.transfer_id);
290     default:
291       return nullptr;
292   }
293 }
294 
SendStatusChunk(const internal::SendStatusChunkEvent & event)295 void TransferThread::SendStatusChunk(
296     const internal::SendStatusChunkEvent& event) {
297   rpc::Writer& destination = stream_for(event.stream);
298 
299   internal::Chunk chunk = {};
300   chunk.transfer_id = event.transfer_id;
301   chunk.status = event.status;
302 
303   Result<ConstByteSpan> result = internal::EncodeChunk(chunk, chunk_buffer_);
304 
305   if (!result.ok()) {
306     PW_LOG_ERROR("Failed to encode final chunk for transfer %u",
307                  static_cast<unsigned>(event.transfer_id));
308     return;
309   }
310 
311   if (!destination.Write(result.value()).ok()) {
312     PW_LOG_ERROR("Failed to send final chunk for transfer %u",
313                  static_cast<unsigned>(event.transfer_id));
314     return;
315   }
316 }
317 
318 }  // namespace pw::transfer::internal
319 
320 PW_MODIFY_DIAGNOSTICS_POP();
321