1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16
17 #include "pw_transfer/transfer_thread.h"
18
19 #include "pw_assert/check.h"
20 #include "pw_log/log.h"
21 #include "pw_transfer/internal/chunk.h"
22
23 PW_MODIFY_DIAGNOSTICS_PUSH();
24 PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
25
26 namespace pw::transfer::internal {
27
Terminate()28 void TransferThread::Terminate() {
29 next_event_ownership_.acquire();
30 next_event_.type = EventType::kTerminate;
31 event_notification_.release();
32 }
33
SimulateTimeout(EventType type,uint32_t session_id)34 void TransferThread::SimulateTimeout(EventType type, uint32_t session_id) {
35 next_event_ownership_.acquire();
36
37 next_event_.type = type;
38 next_event_.chunk = {};
39 next_event_.chunk.context_identifier = session_id;
40
41 event_notification_.release();
42
43 WaitUntilEventIsProcessed();
44 }
45
Run()46 void TransferThread::Run() {
47 // Next event starts freed.
48 next_event_ownership_.release();
49
50 while (true) {
51 if (event_notification_.try_acquire_until(GetNextTransferTimeout())) {
52 HandleEvent(next_event_);
53
54 // Sample event type before we release ownership of next_event_.
55 bool is_terminating = next_event_.type == EventType::kTerminate;
56
57 // Finished processing the event. Allow the next_event struct to be
58 // overwritten.
59 next_event_ownership_.release();
60
61 if (is_terminating) {
62 return;
63 }
64 }
65
66 // Regardless of whether an event was received or not, check for any
67 // transfers which have timed out and process them if so.
68 for (Context& context : client_transfers_) {
69 if (context.timed_out()) {
70 context.HandleEvent({.type = EventType::kClientTimeout});
71 }
72 }
73 for (Context& context : server_transfers_) {
74 if (context.timed_out()) {
75 context.HandleEvent({.type = EventType::kServerTimeout});
76 }
77 }
78 }
79 }
80
GetNextTransferTimeout() const81 chrono::SystemClock::time_point TransferThread::GetNextTransferTimeout() const {
82 chrono::SystemClock::time_point timeout =
83 chrono::SystemClock::TimePointAfterAtLeast(kMaxTimeout);
84
85 for (Context& context : client_transfers_) {
86 auto ctx_timeout = context.timeout();
87 if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
88 timeout = ctx_timeout.value();
89 }
90 }
91 for (Context& context : server_transfers_) {
92 auto ctx_timeout = context.timeout();
93 if (ctx_timeout.has_value() && ctx_timeout.value() < timeout) {
94 timeout = ctx_timeout.value();
95 }
96 }
97
98 return timeout;
99 }
100
StartTransfer(TransferType type,ProtocolVersion version,uint32_t session_id,uint32_t resource_id,ConstByteSpan raw_chunk,stream::Stream * stream,const TransferParameters & max_parameters,Function<void (Status)> && on_completion,chrono::SystemClock::duration timeout,uint8_t max_retries,uint32_t max_lifetime_retries)101 void TransferThread::StartTransfer(TransferType type,
102 ProtocolVersion version,
103 uint32_t session_id,
104 uint32_t resource_id,
105 ConstByteSpan raw_chunk,
106 stream::Stream* stream,
107 const TransferParameters& max_parameters,
108 Function<void(Status)>&& on_completion,
109 chrono::SystemClock::duration timeout,
110 uint8_t max_retries,
111 uint32_t max_lifetime_retries) {
112 // Block until the last event has been processed.
113 next_event_ownership_.acquire();
114
115 bool is_client_transfer = stream != nullptr;
116
117 next_event_.type = is_client_transfer ? EventType::kNewClientTransfer
118 : EventType::kNewServerTransfer;
119
120 if (!raw_chunk.empty()) {
121 std::memcpy(chunk_buffer_.data(), raw_chunk.data(), raw_chunk.size());
122 }
123
124 next_event_.new_transfer = {
125 .type = type,
126 .protocol_version = version,
127 .session_id = session_id,
128 .resource_id = resource_id,
129 .max_parameters = &max_parameters,
130 .timeout = timeout,
131 .max_retries = max_retries,
132 .max_lifetime_retries = max_lifetime_retries,
133 .transfer_thread = this,
134 .raw_chunk_data = chunk_buffer_.data(),
135 .raw_chunk_size = raw_chunk.size(),
136 };
137
138 staged_on_completion_ = std::move(on_completion);
139
140 // The transfer is initialized with either a stream (client-side) or a handler
141 // (server-side). If no stream is provided, try to find a registered handler
142 // with the specified ID.
143 if (is_client_transfer) {
144 next_event_.new_transfer.stream = stream;
145 next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
146 type == TransferType::kTransmit ? client_write_stream_
147 : client_read_stream_);
148 } else {
149 auto handler = std::find_if(handlers_.begin(),
150 handlers_.end(),
151 [&](auto& h) { return h.id() == resource_id; });
152 if (handler != handlers_.end()) {
153 next_event_.new_transfer.handler = &*handler;
154 next_event_.new_transfer.rpc_writer = &static_cast<rpc::Writer&>(
155 type == TransferType::kTransmit ? server_read_stream_
156 : server_write_stream_);
157 } else {
158 // No handler exists for the transfer: return a NOT_FOUND.
159 next_event_.type = EventType::kSendStatusChunk;
160 next_event_.send_status_chunk = {
161 // Identify the status chunk using the requested resource ID rather
162 // than the session ID. In legacy, the two are the same, whereas in
163 // v2+ the client has not yet been assigned a session.
164 .session_id = resource_id,
165 .set_resource_id = version == ProtocolVersion::kVersionTwo,
166 .protocol_version = version,
167 .status = Status::NotFound().code(),
168 .stream = type == TransferType::kTransmit
169 ? TransferStream::kServerRead
170 : TransferStream::kServerWrite,
171 };
172 }
173 }
174
175 event_notification_.release();
176 }
177
ProcessChunk(EventType type,ConstByteSpan chunk)178 void TransferThread::ProcessChunk(EventType type, ConstByteSpan chunk) {
179 // If this assert is hit, there is a bug in the transfer implementation.
180 // Contexts' max_chunk_size_bytes fields should be set based on the size of
181 // chunk_buffer_.
182 PW_CHECK(chunk.size() <= chunk_buffer_.size(),
183 "Transfer received a larger chunk than it can handle.");
184
185 Result<Chunk::Identifier> identifier = Chunk::ExtractIdentifier(chunk);
186 if (!identifier.ok()) {
187 PW_LOG_ERROR("Received a malformed chunk without a context identifier");
188 return;
189 }
190
191 // Block until the last event has been processed.
192 next_event_ownership_.acquire();
193
194 std::memcpy(chunk_buffer_.data(), chunk.data(), chunk.size());
195
196 next_event_.type = type;
197 next_event_.chunk = {
198 .context_identifier = identifier->value(),
199 .match_resource_id = identifier->is_resource(),
200 .data = chunk_buffer_.data(),
201 .size = chunk.size(),
202 };
203
204 event_notification_.release();
205 }
206
EndTransfer(EventType type,uint32_t session_id,Status status,bool send_status_chunk)207 void TransferThread::EndTransfer(EventType type,
208 uint32_t session_id,
209 Status status,
210 bool send_status_chunk) {
211 // Block until the last event has been processed.
212 next_event_ownership_.acquire();
213
214 next_event_.type = type;
215 next_event_.end_transfer = {
216 .session_id = session_id,
217 .status = status.code(),
218 .send_status_chunk = send_status_chunk,
219 };
220
221 event_notification_.release();
222 }
223
TransferHandlerEvent(EventType type,Handler & handler)224 void TransferThread::TransferHandlerEvent(EventType type, Handler& handler) {
225 // Block until the last event has been processed.
226 next_event_ownership_.acquire();
227
228 next_event_.type = type;
229 if (type == EventType::kAddTransferHandler) {
230 next_event_.add_transfer_handler = &handler;
231 } else {
232 next_event_.remove_transfer_handler = &handler;
233 }
234
235 event_notification_.release();
236 }
237
HandleEvent(const internal::Event & event)238 void TransferThread::HandleEvent(const internal::Event& event) {
239 switch (event.type) {
240 case EventType::kTerminate:
241 // Terminate server contexts.
242 for (ServerContext& server_context : server_transfers_) {
243 server_context.HandleEvent(Event{
244 .type = EventType::kServerEndTransfer,
245 .end_transfer =
246 EndTransferEvent{
247 .session_id = server_context.session_id(),
248 .status = Status::Aborted().code(),
249 .send_status_chunk = false,
250 },
251 });
252 }
253
254 // Terminate client contexts.
255 for (ClientContext& client_context : client_transfers_) {
256 client_context.HandleEvent(Event{
257 .type = EventType::kClientEndTransfer,
258 .end_transfer =
259 EndTransferEvent{
260 .session_id = client_context.session_id(),
261 .status = Status::Aborted().code(),
262 .send_status_chunk = false,
263 },
264 });
265 }
266
267 // Cancel/Finish streams.
268 client_read_stream_.Cancel().IgnoreError();
269 client_write_stream_.Cancel().IgnoreError();
270 server_read_stream_.Finish(Status::Aborted()).IgnoreError();
271 server_write_stream_.Finish(Status::Aborted()).IgnoreError();
272 return;
273
274 case EventType::kSendStatusChunk:
275 SendStatusChunk(event.send_status_chunk);
276 break;
277
278 case EventType::kAddTransferHandler:
279 handlers_.push_front(*event.add_transfer_handler);
280 return;
281
282 case EventType::kRemoveTransferHandler:
283 for (ServerContext& server_context : server_transfers_) {
284 if (server_context.handler() == event.remove_transfer_handler) {
285 server_context.HandleEvent(Event{
286 .type = EventType::kServerEndTransfer,
287 .end_transfer =
288 EndTransferEvent{
289 .session_id = server_context.session_id(),
290 .status = Status::Aborted().code(),
291 .send_status_chunk = false,
292 },
293 });
294 }
295 }
296 handlers_.remove(*event.remove_transfer_handler);
297 return;
298
299 case EventType::kNewClientTransfer:
300 case EventType::kNewServerTransfer:
301 case EventType::kClientChunk:
302 case EventType::kServerChunk:
303 case EventType::kClientTimeout:
304 case EventType::kServerTimeout:
305 case EventType::kClientEndTransfer:
306 case EventType::kServerEndTransfer:
307 default:
308 // Other events are handled by individual transfer contexts.
309 break;
310 }
311
312 Context* ctx = FindContextForEvent(event);
313 if (ctx == nullptr) {
314 // No context was found. For new transfer events, report a
315 // RESOURCE_EXHAUSTED error with starting the transfer.
316 if (event.type == EventType::kNewClientTransfer) {
317 // On the client, invoke the completion callback directly.
318 staged_on_completion_(Status::ResourceExhausted());
319 } else if (event.type == EventType::kNewServerTransfer) {
320 // On the server, send a status chunk back to the client.
321 SendStatusChunk(
322 {.session_id = event.new_transfer.resource_id,
323 .set_resource_id = event.new_transfer.protocol_version ==
324 ProtocolVersion::kVersionTwo,
325 .protocol_version = event.new_transfer.protocol_version,
326 .status = Status::ResourceExhausted().code(),
327 .stream = event.new_transfer.type == TransferType::kTransmit
328 ? TransferStream::kServerRead
329 : TransferStream::kServerWrite});
330 }
331 return;
332 }
333
334 if (event.type == EventType::kNewClientTransfer) {
335 // TODO(frolv): This is terrible.
336 static_cast<ClientContext*>(ctx)->set_on_completion(
337 std::move(staged_on_completion_));
338 }
339
340 ctx->HandleEvent(event);
341 }
342
FindContextForEvent(const internal::Event & event) const343 Context* TransferThread::FindContextForEvent(
344 const internal::Event& event) const {
345 switch (event.type) {
346 case EventType::kNewClientTransfer:
347 return FindNewTransfer(client_transfers_, event.new_transfer.session_id);
348 case EventType::kNewServerTransfer:
349 return FindNewTransfer(server_transfers_, event.new_transfer.session_id);
350
351 case EventType::kClientChunk:
352 if (event.chunk.match_resource_id) {
353 return FindActiveTransferByResourceId(client_transfers_,
354 event.chunk.context_identifier);
355 }
356 return FindActiveTransferByLegacyId(client_transfers_,
357 event.chunk.context_identifier);
358
359 case EventType::kServerChunk:
360 if (event.chunk.match_resource_id) {
361 return FindActiveTransferByResourceId(server_transfers_,
362 event.chunk.context_identifier);
363 }
364 return FindActiveTransferByLegacyId(server_transfers_,
365 event.chunk.context_identifier);
366
367 case EventType::kClientTimeout: // Manually triggered client timeout
368 return FindActiveTransferByLegacyId(client_transfers_,
369 event.chunk.context_identifier);
370 case EventType::kServerTimeout: // Manually triggered server timeout
371 return FindActiveTransferByLegacyId(server_transfers_,
372 event.chunk.context_identifier);
373
374 case EventType::kClientEndTransfer:
375 return FindActiveTransferByLegacyId(client_transfers_,
376 event.end_transfer.session_id);
377 case EventType::kServerEndTransfer:
378 return FindActiveTransferByLegacyId(server_transfers_,
379 event.end_transfer.session_id);
380
381 case EventType::kSendStatusChunk:
382 case EventType::kAddTransferHandler:
383 case EventType::kRemoveTransferHandler:
384 case EventType::kTerminate:
385 default:
386 return nullptr;
387 }
388 }
389
SendStatusChunk(const internal::SendStatusChunkEvent & event)390 void TransferThread::SendStatusChunk(
391 const internal::SendStatusChunkEvent& event) {
392 rpc::Writer& destination = stream_for(event.stream);
393
394 Chunk chunk =
395 Chunk::Final(event.protocol_version, event.session_id, event.status);
396
397 if (event.set_resource_id) {
398 chunk.set_resource_id(event.session_id);
399 }
400
401 Result<ConstByteSpan> result = chunk.Encode(chunk_buffer_);
402 if (!result.ok()) {
403 PW_LOG_ERROR("Failed to encode final chunk for transfer %u",
404 static_cast<unsigned>(event.session_id));
405 return;
406 }
407
408 if (!destination.Write(result.value()).ok()) {
409 PW_LOG_ERROR("Failed to send final chunk for transfer %u",
410 static_cast<unsigned>(event.session_id));
411 return;
412 }
413 }
414
415 } // namespace pw::transfer::internal
416
417 PW_MODIFY_DIAGNOSTICS_POP();
418