• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 
17 #include "pw_transfer/transfer_thread.h"
18 
19 #include "pw_assert/check.h"
20 #include "pw_log/log.h"
21 #include "pw_transfer/internal/chunk.h"
22 
23 PW_MODIFY_DIAGNOSTICS_PUSH();
24 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
25 
26 namespace pw::transfer::internal {
27 
Terminate()28 void TransferThread::Terminate() {
29   next_event_ownership_.acquire();
30   next_event_.type = EventType::kTerminate;
31   event_notification_.release();
32 }
33 
SimulateTimeout(EventType type,uint32_t session_id)34 void TransferThread::SimulateTimeout(EventType type, uint32_t session_id) {
35   next_event_ownership_.acquire();
36 
37   next_event_.type = type;
38   next_event_.chunk = {};
39   next_event_.chunk.context_identifier = session_id;
40 
41   event_notification_.release();
42 
43   WaitUntilEventIsProcessed();
44 }
45 
Run()46 void TransferThread::Run() {
47   // Next event starts freed.
48   next_event_ownership_.release();
49 
50   while (true) {
51     if (event_notification_.try_acquire_until(GetNextTransferTimeout())) {
52       HandleEvent(next_event_);
53 
54       // Sample event type before we release ownership of next_event_.
55       bool is_terminating = next_event_.type == EventType::kTerminate;
56 
57       // Finished processing the event. Allow the next_event struct to be
58       // overwritten.
59       next_event_ownership_.release();
60 
61       if (is_terminating) {
62         return;
63       }
64     }
65 
66     // Regardless of whether an event was received or not, check for any
67     // transfers which have timed out and process them if so.
68     for (Context& context : client_transfers_) {
69       if (context.timed_out()) {
70         context.HandleEvent({.type = EventType::kClientTimeout});
71       }
72     }
73     for (Context& context : server_transfers_) {
74       if (context.timed_out()) {
75         context.HandleEvent({.type = EventType::kServerTimeout});
76       }
77     }
78   }
79 }
80 
GetNextTransferTimeout() const81 chrono::SystemClock::time_point TransferThread::GetNextTransferTimeout() const {
82   chrono::SystemClock::time_point timeout =
83       chrono::SystemClock::TimePointAfterAtLeast(kMaxTimeout);
84 
85   for (Context& context : client_transfers_) {
86     auto ctx_timeout = context.timeout();
87     if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
88       timeout = ctx_timeout.value();
89     }
90   }
91   for (Context& context : server_transfers_) {
92     auto ctx_timeout = context.timeout();
93     if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
94       timeout = ctx_timeout.value();
95     }
96   }
97 
98   return timeout;
99 }
100 
StartTransfer(TransferType type,ProtocolVersion version,uint32_t session_id,uint32_t resource_id,ConstByteSpan raw_chunk,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,uint8_t max_retries,uint32_t max_lifetime_retries)101 void TransferThread::StartTransfer(TransferType type,
102                                    ProtocolVersion version,
103                                    uint32_t session_id,
104                                    uint32_t resource_id,
105                                    ConstByteSpan raw_chunk,
106                                    stream::Stream* stream,
107                                    const TransferParameters& max_parameters,
108                                    Function<void(Status)>&& on_completion,
109                                    chrono::SystemClock::duration timeout,
110                                    uint8_t max_retries,
111                                    uint32_t max_lifetime_retries) {
112   // Block until the last event has been processed.
113   next_event_ownership_.acquire();
114 
115   bool is_client_transfer = stream != nullptr;
116 
117   next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
118                                         : EventType::kNewServerTransfer;
119 
120   if (!raw_chunk.empty()) {
121     std::memcpy(chunk_buffer_.data(), raw_chunk.data(), raw_chunk.size());
122   }
123 
124   next_event_.new_transfer = {
125       .type = type,
126       .protocol_version = version,
127       .session_id = session_id,
128       .resource_id = resource_id,
129       .max_parameters = &max_parameters,
130       .timeout = timeout,
131       .max_retries = max_retries,
132       .max_lifetime_retries = max_lifetime_retries,
133       .transfer_thread = this,
134       .raw_chunk_data = chunk_buffer_.data(),
135       .raw_chunk_size = raw_chunk.size(),
136   };
137 
138   staged_on_completion_ = std::move(on_completion);
139 
140   // The transfer is initialized with either a stream (client-side) or a handler
141   // (server-side). If no stream is provided, try to find a registered handler
142   // with the specified ID.
143   if (is_client_transfer) {
144     next_event_.new_transfer.stream = stream;
145     next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
146         type == TransferType::kTransmit ? client_write_stream_
147                                         : client_read_stream_);
148   } else {
149     auto handler = std::find_if(handlers_.begin(),
150                                 handlers_.end(),
151                                 [&](auto& h) { return h.id() == resource_id; });
152     if (handler != handlers_.end()) {
153       next_event_.new_transfer.handler = &*handler;
154       next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
155           type == TransferType::kTransmit ? server_read_stream_
156                                           : server_write_stream_);
157     } else {
158       // No handler exists for the transfer: return a NOT_FOUND.
159       next_event_.type = EventType::kSendStatusChunk;
160       next_event_.send_status_chunk = {
161           // Identify the status chunk using the requested resource ID rather
162           // than the session ID. In legacy, the two are the same, whereas in
163           // v2+ the client has not yet been assigned a session.
164           .session_id = resource_id,
165           .set_resource_id = version == ProtocolVersion::kVersionTwo,
166           .protocol_version = version,
167           .status = Status::NotFound().code(),
168           .stream = type == TransferType::kTransmit
169                         ? TransferStream::kServerRead
170                         : TransferStream::kServerWrite,
171       };
172     }
173   }
174 
175   event_notification_.release();
176 }
177 
ProcessChunk(EventType type,ConstByteSpan chunk)178 void TransferThread::ProcessChunk(EventType type, ConstByteSpan chunk) {
179   // If this assert is hit, there is a bug in the transfer implementation.
180   // Contexts' max_chunk_size_bytes fields should be set based on the size of
181   // chunk_buffer_.
182   PW_CHECK(chunk.size() <= chunk_buffer_.size(),
183            "Transfer received a larger chunk than it can handle.");
184 
185   Result<Chunk::Identifier> identifier = Chunk::ExtractIdentifier(chunk);
186   if (!identifier.ok()) {
187     PW_LOG_ERROR("Received a malformed chunk without a context identifier");
188     return;
189   }
190 
191   // Block until the last event has been processed.
192   next_event_ownership_.acquire();
193 
194   std::memcpy(chunk_buffer_.data(), chunk.data(), chunk.size());
195 
196   next_event_.type = type;
197   next_event_.chunk = {
198       .context_identifier = identifier->value(),
199       .match_resource_id = identifier->is_resource(),
200       .data = chunk_buffer_.data(),
201       .size = chunk.size(),
202   };
203 
204   event_notification_.release();
205 }
206 
EndTransfer(EventType type,uint32_t session_id,Status status,bool send_status_chunk)207 void TransferThread::EndTransfer(EventType type,
208                                  uint32_t session_id,
209                                  Status status,
210                                  bool send_status_chunk) {
211   // Block until the last event has been processed.
212   next_event_ownership_.acquire();
213 
214   next_event_.type = type;
215   next_event_.end_transfer = {
216       .session_id = session_id,
217       .status = status.code(),
218       .send_status_chunk = send_status_chunk,
219   };
220 
221   event_notification_.release();
222 }
223 
TransferHandlerEvent(EventType type,Handler & handler)224 void TransferThread::TransferHandlerEvent(EventType type, Handler& handler) {
225   // Block until the last event has been processed.
226   next_event_ownership_.acquire();
227 
228   next_event_.type = type;
229   if (type == EventType::kAddTransferHandler) {
230     next_event_.add_transfer_handler = &handler;
231   } else {
232     next_event_.remove_transfer_handler = &handler;
233   }
234 
235   event_notification_.release();
236 }
237 
HandleEvent(const internal::Event & event)238 void TransferThread::HandleEvent(const internal::Event& event) {
239   switch (event.type) {
240     case EventType::kTerminate:
241       // Terminate server contexts.
242       for (ServerContext& server_context : server_transfers_) {
243         server_context.HandleEvent(Event{
244             .type = EventType::kServerEndTransfer,
245             .end_transfer =
246                 EndTransferEvent{
247                     .session_id = server_context.session_id(),
248                     .status = Status::Aborted().code(),
249                     .send_status_chunk = false,
250                 },
251         });
252       }
253 
254       // Terminate client contexts.
255       for (ClientContext& client_context : client_transfers_) {
256         client_context.HandleEvent(Event{
257             .type = EventType::kClientEndTransfer,
258             .end_transfer =
259                 EndTransferEvent{
260                     .session_id = client_context.session_id(),
261                     .status = Status::Aborted().code(),
262                     .send_status_chunk = false,
263                 },
264         });
265       }
266 
267       // Cancel/Finish streams.
268       client_read_stream_.Cancel().IgnoreError();
269       client_write_stream_.Cancel().IgnoreError();
270       server_read_stream_.Finish(Status::Aborted()).IgnoreError();
271       server_write_stream_.Finish(Status::Aborted()).IgnoreError();
272       return;
273 
274     case EventType::kSendStatusChunk:
275       SendStatusChunk(event.send_status_chunk);
276       break;
277 
278     case EventType::kAddTransferHandler:
279       handlers_.push_front(*event.add_transfer_handler);
280       return;
281 
282     case EventType::kRemoveTransferHandler:
283       for (ServerContext& server_context : server_transfers_) {
284         if (server_context.handler() == event.remove_transfer_handler) {
285           server_context.HandleEvent(Event{
286               .type = EventType::kServerEndTransfer,
287               .end_transfer =
288                   EndTransferEvent{
289                       .session_id = server_context.session_id(),
290                       .status = Status::Aborted().code(),
291                       .send_status_chunk = false,
292                   },
293           });
294         }
295       }
296       handlers_.remove(*event.remove_transfer_handler);
297       return;
298 
299     case EventType::kNewClientTransfer:
300     case EventType::kNewServerTransfer:
301     case EventType::kClientChunk:
302     case EventType::kServerChunk:
303     case EventType::kClientTimeout:
304     case EventType::kServerTimeout:
305     case EventType::kClientEndTransfer:
306     case EventType::kServerEndTransfer:
307     default:
308       // Other events are handled by individual transfer contexts.
309       break;
310   }
311 
312   Context* ctx = FindContextForEvent(event);
313   if (ctx == nullptr) {
314     // No context was found. For new transfer events, report a
315     // RESOURCE_EXHAUSTED error with starting the transfer.
316     if (event.type == EventType::kNewClientTransfer) {
317       // On the client, invoke the completion callback directly.
318       staged_on_completion_(Status::ResourceExhausted());
319     } else if (event.type == EventType::kNewServerTransfer) {
320       // On the server, send a status chunk back to the client.
321       SendStatusChunk(
322           {.session_id = event.new_transfer.resource_id,
323            .set_resource_id = event.new_transfer.protocol_version ==
324                               ProtocolVersion::kVersionTwo,
325            .protocol_version = event.new_transfer.protocol_version,
326            .status = Status::ResourceExhausted().code(),
327            .stream = event.new_transfer.type == TransferType::kTransmit
328                          ? TransferStream::kServerRead
329                          : TransferStream::kServerWrite});
330     }
331     return;
332   }
333 
334   if (event.type == EventType::kNewClientTransfer) {
335     // TODO(frolv): This is terrible.
336     static_cast<ClientContext*>(ctx)->set_on_completion(
337         std::move(staged_on_completion_));
338   }
339 
340   ctx->HandleEvent(event);
341 }
342 
FindContextForEvent(const internal::Event & event) const343 Context* TransferThread::FindContextForEvent(
344     const internal::Event& event) const {
345   switch (event.type) {
346     case EventType::kNewClientTransfer:
347       return FindNewTransfer(client_transfers_, event.new_transfer.session_id);
348     case EventType::kNewServerTransfer:
349       return FindNewTransfer(server_transfers_, event.new_transfer.session_id);
350 
351     case EventType::kClientChunk:
352       if (event.chunk.match_resource_id) {
353         return FindActiveTransferByResourceId(client_transfers_,
354                                               event.chunk.context_identifier);
355       }
356       return FindActiveTransferByLegacyId(client_transfers_,
357                                           event.chunk.context_identifier);
358 
359     case EventType::kServerChunk:
360       if (event.chunk.match_resource_id) {
361         return FindActiveTransferByResourceId(server_transfers_,
362                                               event.chunk.context_identifier);
363       }
364       return FindActiveTransferByLegacyId(server_transfers_,
365                                           event.chunk.context_identifier);
366 
367     case EventType::kClientTimeout:  // Manually triggered client timeout
368       return FindActiveTransferByLegacyId(client_transfers_,
369                                           event.chunk.context_identifier);
370     case EventType::kServerTimeout:  // Manually triggered server timeout
371       return FindActiveTransferByLegacyId(server_transfers_,
372                                           event.chunk.context_identifier);
373 
374     case EventType::kClientEndTransfer:
375       return FindActiveTransferByLegacyId(client_transfers_,
376                                           event.end_transfer.session_id);
377     case EventType::kServerEndTransfer:
378       return FindActiveTransferByLegacyId(server_transfers_,
379                                           event.end_transfer.session_id);
380 
381     case EventType::kSendStatusChunk:
382     case EventType::kAddTransferHandler:
383     case EventType::kRemoveTransferHandler:
384     case EventType::kTerminate:
385     default:
386       return nullptr;
387   }
388 }
389 
SendStatusChunk(const internal::SendStatusChunkEvent & event)390 void TransferThread::SendStatusChunk(
391     const internal::SendStatusChunkEvent& event) {
392   rpc::Writer& destination = stream_for(event.stream);
393 
394   Chunk chunk =
395       Chunk::Final(event.protocol_version, event.session_id, event.status);
396 
397   if (event.set_resource_id) {
398     chunk.set_resource_id(event.session_id);
399   }
400 
401   Result<ConstByteSpan> result = chunk.Encode(chunk_buffer_);
402   if (!result.ok()) {
403     PW_LOG_ERROR("Failed to encode final chunk for transfer %u",
404                  static_cast<unsigned>(event.session_id));
405     return;
406   }
407 
408   if (!destination.Write(result.value()).ok()) {
409     PW_LOG_ERROR("Failed to send final chunk for transfer %u",
410                  static_cast<unsigned>(event.session_id));
411     return;
412   }
413 }
414 
415 }  // namespace pw::transfer::internal
416 
417 PW_MODIFY_DIAGNOSTICS_POP();
418