• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG USB
18 
19 #include "sysdeps.h"
20 
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <sys/ioctl.h>
26 #include <sys/types.h>
27 #include <unistd.h>
28 
29 #include <linux/usb/functionfs.h>
30 #include <sys/eventfd.h>
31 
32 #include <algorithm>
33 #include <array>
34 #include <future>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <vector>
39 
40 #include <asyncio/AsyncIO.h>
41 
42 #include <android-base/logging.h>
43 #include <android-base/macros.h>
44 #include <android-base/properties.h>
45 #include <android-base/thread_annotations.h>
46 
47 #include <adbd/usb.h>
48 
49 #include "adb_unique_fd.h"
50 #include "adb_utils.h"
51 #include "sysdeps/chrono.h"
52 #include "transport.h"
53 #include "types.h"
54 
55 using android::base::StringPrintf;
56 
57 // We can't find out whether we have support for AIO on ffs endpoints until we submit a read.
58 static std::optional<bool> gFfsAioSupported;
59 
60 // Not all USB controllers support operations larger than 16k, so don't go above that.
61 // Also, each submitted operation does an allocation in the kernel of that size, so we want to
62 // minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
63 static constexpr size_t kUsbReadQueueDepth = 8;
64 static constexpr size_t kUsbReadSize = 4 * PAGE_SIZE;
65 
66 static constexpr size_t kUsbWriteQueueDepth = 8;
67 static constexpr size_t kUsbWriteSize = 4 * PAGE_SIZE;
68 
to_string(enum usb_functionfs_event_type type)69 static const char* to_string(enum usb_functionfs_event_type type) {
70     switch (type) {
71         case FUNCTIONFS_BIND:
72             return "FUNCTIONFS_BIND";
73         case FUNCTIONFS_UNBIND:
74             return "FUNCTIONFS_UNBIND";
75         case FUNCTIONFS_ENABLE:
76             return "FUNCTIONFS_ENABLE";
77         case FUNCTIONFS_DISABLE:
78             return "FUNCTIONFS_DISABLE";
79         case FUNCTIONFS_SETUP:
80             return "FUNCTIONFS_SETUP";
81         case FUNCTIONFS_SUSPEND:
82             return "FUNCTIONFS_SUSPEND";
83         case FUNCTIONFS_RESUME:
84             return "FUNCTIONFS_RESUME";
85     }
86 }
87 
88 enum class TransferDirection : uint64_t {
89     READ = 0,
90     WRITE = 1,
91 };
92 
93 struct TransferId {
94     TransferDirection direction : 1;
95     uint64_t id : 63;
96 
TransferIdTransferId97     TransferId() : TransferId(TransferDirection::READ, 0) {}
98 
99   private:
TransferIdTransferId100     TransferId(TransferDirection direction, uint64_t id) : direction(direction), id(id) {}
101 
102   public:
operator uint64_tTransferId103     explicit operator uint64_t() const {
104         uint64_t result;
105         static_assert(sizeof(*this) == sizeof(result));
106         memcpy(&result, this, sizeof(*this));
107         return result;
108     }
109 
readTransferId110     static TransferId read(uint64_t id) { return TransferId(TransferDirection::READ, id); }
writeTransferId111     static TransferId write(uint64_t id) { return TransferId(TransferDirection::WRITE, id); }
112 
from_valueTransferId113     static TransferId from_value(uint64_t value) {
114         TransferId result;
115         memcpy(&result, &value, sizeof(value));
116         return result;
117     }
118 };
119 
120 struct IoBlock {
121     bool pending = false;
122     struct iocb control = {};
123     std::shared_ptr<Block> payload;
124 
idIoBlock125     TransferId id() const { return TransferId::from_value(control.aio_data); }
126 };
127 
128 struct ScopedAioContext {
129     ScopedAioContext() = default;
~ScopedAioContextScopedAioContext130     ~ScopedAioContext() { reset(); }
131 
ScopedAioContextScopedAioContext132     ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
133     ScopedAioContext(const ScopedAioContext& copy) = delete;
134 
operator =ScopedAioContext135     ScopedAioContext& operator=(ScopedAioContext&& move) {
136         reset(move.release());
137         return *this;
138     }
139     ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;
140 
CreateScopedAioContext141     static ScopedAioContext Create(size_t max_events) {
142         aio_context_t ctx = 0;
143         if (io_setup(max_events, &ctx) != 0) {
144             PLOG(FATAL) << "failed to create aio_context_t";
145         }
146         ScopedAioContext result;
147         result.reset(ctx);
148         return result;
149     }
150 
releaseScopedAioContext151     aio_context_t release() {
152         aio_context_t result = context_;
153         context_ = 0;
154         return result;
155     }
156 
resetScopedAioContext157     void reset(aio_context_t new_context = 0) {
158         if (context_ != 0) {
159             io_destroy(context_);
160         }
161 
162         context_ = new_context;
163     }
164 
getScopedAioContext165     aio_context_t get() { return context_; }
166 
167   private:
168     aio_context_t context_ = 0;
169 };
170 
171 struct UsbFfsConnection : public Connection {
UsbFfsConnectionUsbFfsConnection172     UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
173                      std::promise<void> destruction_notifier)
174         : worker_started_(false),
175           stopped_(false),
176           destruction_notifier_(std::move(destruction_notifier)),
177           control_fd_(std::move(control)),
178           read_fd_(std::move(read)),
179           write_fd_(std::move(write)) {
180         LOG(INFO) << "UsbFfsConnection constructed";
181         worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
182         if (worker_event_fd_ == -1) {
183             PLOG(FATAL) << "failed to create eventfd";
184         }
185 
186         monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
187         if (monitor_event_fd_ == -1) {
188             PLOG(FATAL) << "failed to create eventfd";
189         }
190 
191         aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
192     }
193 
~UsbFfsConnectionUsbFfsConnection194     ~UsbFfsConnection() {
195         LOG(INFO) << "UsbFfsConnection being destroyed";
196         Stop();
197         monitor_thread_.join();
198 
199         // We need to explicitly close our file descriptors before we notify our destruction,
200         // because the thread listening on the future will immediately try to reopen the endpoint.
201         aio_context_.reset();
202         control_fd_.reset();
203         read_fd_.reset();
204         write_fd_.reset();
205 
206         destruction_notifier_.set_value();
207     }
208 
WriteUsbFfsConnection209     virtual bool Write(std::unique_ptr<apacket> packet) override final {
210         LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
211         Block header(sizeof(packet->msg));
212         memcpy(header.data(), &packet->msg, sizeof(packet->msg));
213 
214         std::lock_guard<std::mutex> lock(write_mutex_);
215         write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++));
216         if (!packet->payload.empty()) {
217             // The kernel attempts to allocate a contiguous block of memory for each write,
218             // which can fail if the write is large and the kernel heap is fragmented.
219             // Split large writes into smaller chunks to avoid this.
220             std::shared_ptr<Block> payload = std::make_shared<Block>(std::move(packet->payload));
221             size_t offset = 0;
222             size_t len = payload->size();
223 
224             while (len > 0) {
225                 size_t write_size = std::min(kUsbWriteSize, len);
226                 write_requests_.push_back(
227                         CreateWriteBlock(payload, offset, write_size, next_write_id_++));
228                 len -= write_size;
229                 offset += write_size;
230             }
231         }
232         SubmitWrites();
233         return true;
234     }
235 
StartUsbFfsConnection236     virtual void Start() override final { StartMonitor(); }
237 
StopUsbFfsConnection238     virtual void Stop() override final {
239         if (stopped_.exchange(true)) {
240             return;
241         }
242         stopped_ = true;
243         uint64_t notify = 1;
244         ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
245         if (rc < 0) {
246             PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection";
247         }
248         CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
249 
250         rc = adb_write(monitor_event_fd_.get(), &notify, sizeof(notify));
251         if (rc < 0) {
252             PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection";
253         }
254 
255         CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
256     }
257 
258   private:
StartMonitorUsbFfsConnection259     void StartMonitor() {
260         // This is a bit of a mess.
261         // It's possible for io_submit to end up blocking, if we call it as the endpoint
262         // becomes disabled. Work around this by having a monitor thread to listen for functionfs
263         // lifecycle events. If we notice an error condition (either we've become disabled, or we
264         // were never enabled in the first place), we send interruption signals to the worker thread
265         // until it dies, and then report failure to the transport via HandleError, which will
266         // eventually result in the transport being destroyed, which will result in UsbFfsConnection
267         // being destroyed, which unblocks the open thread and restarts this entire process.
268         static std::once_flag handler_once;
269         std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });
270 
271         monitor_thread_ = std::thread([this]() {
272             adb_thread_setname("UsbFfs-monitor");
273 
274             bool bound = false;
275             bool enabled = false;
276             bool running = true;
277             while (running) {
278                 adb_pollfd pfd[2] = {
279                   { .fd = control_fd_.get(), .events = POLLIN, .revents = 0 },
280                   { .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 },
281                 };
282 
283                 // If we don't see our first bind within a second, try again.
284                 int timeout_ms = bound ? -1 : 1000;
285 
286                 int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms));
287                 if (rc == -1) {
288                     PLOG(FATAL) << "poll on USB control fd failed";
289                 } else if (rc == 0) {
290                     LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again";
291                     break;
292                 }
293 
294                 if (pfd[1].revents) {
295                     // We were told to die.
296                     break;
297                 }
298 
299                 struct usb_functionfs_event event;
300                 rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event)));
301                 if (rc == -1) {
302                     PLOG(FATAL) << "failed to read functionfs event";
303                 } else if (rc == 0) {
304                     LOG(WARNING) << "hit EOF on functionfs control fd";
305                     break;
306                 } else if (rc != sizeof(event)) {
307                     LOG(FATAL) << "read functionfs event of unexpected size, expected "
308                                << sizeof(event) << ", got " << rc;
309                 }
310 
311                 LOG(INFO) << "USB event: "
312                           << to_string(static_cast<usb_functionfs_event_type>(event.type));
313 
314                 switch (event.type) {
315                     case FUNCTIONFS_BIND:
316                         if (bound) {
317                             LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?";
318                             running = false;
319                             break;
320                         }
321 
322                         if (enabled) {
323                             LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?";
324                             running = false;
325                             break;
326                         }
327 
328                         bound = true;
329                         break;
330 
331                     case FUNCTIONFS_ENABLE:
332                         if (!bound) {
333                             LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?";
334                             running = false;
335                             break;
336                         }
337 
338                         if (enabled) {
339                             LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?";
340                             running = false;
341                             break;
342                         }
343 
344                         enabled = true;
345                         StartWorker();
346                         break;
347 
348                     case FUNCTIONFS_DISABLE:
349                         if (!bound) {
350                             LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?";
351                         }
352 
353                         if (!enabled) {
354                             LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?";
355                         }
356 
357                         enabled = false;
358                         running = false;
359                         break;
360 
361                     case FUNCTIONFS_UNBIND:
362                         if (enabled) {
363                             LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?";
364                         }
365 
366                         if (!bound) {
367                             LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?";
368                         }
369 
370                         bound = false;
371                         running = false;
372                         break;
373 
374                     case FUNCTIONFS_SETUP: {
375                         LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = "
376                                   << static_cast<int>(event.u.setup.bRequestType)
377                                   << ", bRequest = " << static_cast<int>(event.u.setup.bRequest)
378                                   << ", wValue = " << static_cast<int>(event.u.setup.wValue)
379                                   << ", wIndex = " << static_cast<int>(event.u.setup.wIndex)
380                                   << ", wLength = " << static_cast<int>(event.u.setup.wLength);
381 
382                         if ((event.u.setup.bRequestType & USB_DIR_IN)) {
383                             LOG(INFO) << "acking device-to-host control transfer";
384                             ssize_t rc = adb_write(control_fd_.get(), "", 0);
385                             if (rc != 0) {
386                                 PLOG(ERROR) << "failed to write empty packet to host";
387                                 break;
388                             }
389                         } else {
390                             std::string buf;
391                             buf.resize(event.u.setup.wLength + 1);
392 
393                             ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size());
394                             if (rc != event.u.setup.wLength) {
395                                 LOG(ERROR)
396                                         << "read " << rc
397                                         << " bytes when trying to read control request, expected "
398                                         << event.u.setup.wLength;
399                             }
400 
401                             LOG(INFO) << "control request contents: " << buf;
402                             break;
403                         }
404                     }
405                 }
406             }
407 
408             StopWorker();
409             HandleError("monitor thread finished");
410         });
411     }
412 
StartWorkerUsbFfsConnection413     void StartWorker() {
414         CHECK(!worker_started_);
415         worker_started_ = true;
416         worker_thread_ = std::thread([this]() {
417             adb_thread_setname("UsbFfs-worker");
418             for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
419                 read_requests_[i] = CreateReadBlock(next_read_id_++);
420                 if (!SubmitRead(&read_requests_[i])) {
421                     return;
422                 }
423             }
424 
425             while (!stopped_) {
426                 uint64_t dummy;
427                 ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy));
428                 if (rc == -1) {
429                     PLOG(FATAL) << "failed to read from eventfd";
430                 } else if (rc == 0) {
431                     LOG(FATAL) << "hit EOF on eventfd";
432                 }
433 
434                 ReadEvents();
435             }
436         });
437     }
438 
StopWorkerUsbFfsConnection439     void StopWorker() {
440         if (!worker_started_) {
441             return;
442         }
443 
444         pthread_t worker_thread_handle = worker_thread_.native_handle();
445         while (true) {
446             int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
447             if (rc != 0) {
448                 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
449                 break;
450             }
451 
452             std::this_thread::sleep_for(100ms);
453 
454             rc = pthread_kill(worker_thread_handle, 0);
455             if (rc == 0) {
456                 continue;
457             } else if (rc == ESRCH) {
458                 break;
459             } else {
460                 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
461             }
462         }
463 
464         worker_thread_.join();
465     }
466 
PrepareReadBlockUsbFfsConnection467     void PrepareReadBlock(IoBlock* block, uint64_t id) {
468         block->pending = false;
469         block->payload = std::make_shared<Block>(kUsbReadSize);
470         block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
471         block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data());
472         block->control.aio_nbytes = block->payload->size();
473     }
474 
CreateReadBlockUsbFfsConnection475     IoBlock CreateReadBlock(uint64_t id) {
476         IoBlock block;
477         PrepareReadBlock(&block, id);
478         block.control.aio_rw_flags = 0;
479         block.control.aio_lio_opcode = IOCB_CMD_PREAD;
480         block.control.aio_reqprio = 0;
481         block.control.aio_fildes = read_fd_.get();
482         block.control.aio_offset = 0;
483         block.control.aio_flags = IOCB_FLAG_RESFD;
484         block.control.aio_resfd = worker_event_fd_.get();
485         return block;
486     }
487 
ReadEventsUsbFfsConnection488     void ReadEvents() {
489         static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
490         struct io_event events[kMaxEvents];
491         struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
492         int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
493         if (rc == -1) {
494             HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
495             return;
496         }
497 
498         for (int event_idx = 0; event_idx < rc; ++event_idx) {
499             auto& event = events[event_idx];
500             TransferId id = TransferId::from_value(event.data);
501 
502             if (event.res < 0) {
503                 std::string error =
504                         StringPrintf("%s %" PRIu64 " failed with error %s",
505                                      id.direction == TransferDirection::READ ? "read" : "write",
506                                      id.id, strerror(-event.res));
507                 HandleError(error);
508                 return;
509             }
510 
511             if (id.direction == TransferDirection::READ) {
512                 HandleRead(id, event.res);
513             } else {
514                 HandleWrite(id);
515             }
516         }
517     }
518 
HandleReadUsbFfsConnection519     void HandleRead(TransferId id, int64_t size) {
520         uint64_t read_idx = id.id % kUsbReadQueueDepth;
521         IoBlock* block = &read_requests_[read_idx];
522         block->pending = false;
523         block->payload->resize(size);
524 
525         // Notification for completed reads can be received out of order.
526         if (block->id().id != needed_read_id_) {
527             LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
528                          << needed_read_id_;
529             return;
530         }
531 
532         for (uint64_t id = needed_read_id_;; ++id) {
533             size_t read_idx = id % kUsbReadQueueDepth;
534             IoBlock* current_block = &read_requests_[read_idx];
535             if (current_block->pending) {
536                 break;
537             }
538             ProcessRead(current_block);
539             ++needed_read_id_;
540         }
541     }
542 
ProcessReadUsbFfsConnection543     void ProcessRead(IoBlock* block) {
544         if (!block->payload->empty()) {
545             if (!incoming_header_.has_value()) {
546                 CHECK_EQ(sizeof(amessage), block->payload->size());
547                 amessage msg;
548                 memcpy(&msg, block->payload->data(), sizeof(amessage));
549                 LOG(DEBUG) << "USB read:" << dump_header(&msg);
550                 incoming_header_ = msg;
551             } else {
552                 size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
553                 Block payload = std::move(*block->payload);
554                 CHECK_LE(payload.size(), bytes_left);
555                 incoming_payload_.append(std::make_unique<Block>(std::move(payload)));
556             }
557 
558             if (incoming_header_->data_length == incoming_payload_.size()) {
559                 auto packet = std::make_unique<apacket>();
560                 packet->msg = *incoming_header_;
561 
562                 // TODO: Make apacket contain an IOVector so we don't have to coalesce.
563                 packet->payload = incoming_payload_.coalesce();
564                 read_callback_(this, std::move(packet));
565 
566                 incoming_header_.reset();
567                 incoming_payload_.clear();
568             }
569         }
570 
571         PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
572         SubmitRead(block);
573     }
574 
SubmitReadUsbFfsConnection575     bool SubmitRead(IoBlock* block) {
576         block->pending = true;
577         struct iocb* iocb = &block->control;
578         if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
579             if (errno == EINVAL && !gFfsAioSupported.has_value()) {
580                 HandleError("failed to submit first read, AIO on FFS not supported");
581                 gFfsAioSupported = false;
582                 return false;
583             }
584 
585             HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
586             return false;
587         }
588 
589         gFfsAioSupported = true;
590         return true;
591     }
592 
HandleWriteUsbFfsConnection593     void HandleWrite(TransferId id) {
594         std::lock_guard<std::mutex> lock(write_mutex_);
595         auto it =
596                 std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
597                     return static_cast<uint64_t>(req->id()) == static_cast<uint64_t>(id);
598                 });
599         CHECK(it != write_requests_.end());
600 
601         write_requests_.erase(it);
602         size_t outstanding_writes = --writes_submitted_;
603         LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
604 
605         SubmitWrites();
606     }
607 
CreateWriteBlockUsbFfsConnection608     std::unique_ptr<IoBlock> CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset,
609                                               size_t len, uint64_t id) {
610         auto block = std::make_unique<IoBlock>();
611         block->payload = std::move(payload);
612         block->control.aio_data = static_cast<uint64_t>(TransferId::write(id));
613         block->control.aio_rw_flags = 0;
614         block->control.aio_lio_opcode = IOCB_CMD_PWRITE;
615         block->control.aio_reqprio = 0;
616         block->control.aio_fildes = write_fd_.get();
617         block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload->data() + offset);
618         block->control.aio_nbytes = len;
619         block->control.aio_offset = 0;
620         block->control.aio_flags = IOCB_FLAG_RESFD;
621         block->control.aio_resfd = worker_event_fd_.get();
622         return block;
623     }
624 
CreateWriteBlockUsbFfsConnection625     std::unique_ptr<IoBlock> CreateWriteBlock(Block payload, uint64_t id) {
626         std::shared_ptr<Block> block = std::make_shared<Block>(std::move(payload));
627         size_t len = block->size();
628         return CreateWriteBlock(std::move(block), 0, len, id);
629     }
630 
SubmitWritesUsbFfsConnection631     void SubmitWrites() REQUIRES(write_mutex_) {
632         if (writes_submitted_ == kUsbWriteQueueDepth) {
633             return;
634         }
635 
636         ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
637                                             write_requests_.size() - writes_submitted_);
638         CHECK_GE(writes_to_submit, 0);
639         if (writes_to_submit == 0) {
640             return;
641         }
642 
643         struct iocb* iocbs[kUsbWriteQueueDepth];
644         for (int i = 0; i < writes_to_submit; ++i) {
645             CHECK(!write_requests_[writes_submitted_ + i]->pending);
646             write_requests_[writes_submitted_ + i]->pending = true;
647             iocbs[i] = &write_requests_[writes_submitted_ + i]->control;
648             LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
649         }
650 
651         writes_submitted_ += writes_to_submit;
652 
653         int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
654         if (rc == -1) {
655             HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
656             return;
657         } else if (rc != writes_to_submit) {
658             LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
659                        << ", actually submitted " << rc;
660         }
661     }
662 
HandleErrorUsbFfsConnection663     void HandleError(const std::string& error) {
664         std::call_once(error_flag_, [&]() {
665             error_callback_(this, error);
666             if (!stopped_) {
667                 Stop();
668             }
669         });
670     }
671 
672     std::thread monitor_thread_;
673 
674     bool worker_started_;
675     std::thread worker_thread_;
676 
677     std::atomic<bool> stopped_;
678     std::promise<void> destruction_notifier_;
679     std::once_flag error_flag_;
680 
681     unique_fd worker_event_fd_;
682     unique_fd monitor_event_fd_;
683 
684     ScopedAioContext aio_context_;
685     unique_fd control_fd_;
686     unique_fd read_fd_;
687     unique_fd write_fd_;
688 
689     std::optional<amessage> incoming_header_;
690     IOVector incoming_payload_;
691 
692     std::array<IoBlock, kUsbReadQueueDepth> read_requests_;
693     IOVector read_data_;
694 
695     // ID of the next request that we're going to send out.
696     size_t next_read_id_ = 0;
697 
698     // ID of the next packet we're waiting for.
699     size_t needed_read_id_ = 0;
700 
701     std::mutex write_mutex_;
702     std::deque<std::unique_ptr<IoBlock>> write_requests_ GUARDED_BY(write_mutex_);
703     size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
704     size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;
705 
706     static constexpr int kInterruptionSignal = SIGUSR1;
707 };
708 
709 void usb_init_legacy();
710 
usb_ffs_open_thread()711 static void usb_ffs_open_thread() {
712     adb_thread_setname("usb ffs open");
713 
714     while (true) {
715         if (gFfsAioSupported.has_value() && !gFfsAioSupported.value()) {
716             LOG(INFO) << "failed to use nonblocking ffs, falling back to legacy";
717             return usb_init_legacy();
718         }
719 
720         unique_fd control;
721         unique_fd bulk_out;
722         unique_fd bulk_in;
723         if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
724             std::this_thread::sleep_for(1s);
725             continue;
726         }
727 
728         atransport* transport = new atransport();
729         transport->serial = "UsbFfs";
730         std::promise<void> destruction_notifier;
731         std::future<void> future = destruction_notifier.get_future();
732         transport->SetConnection(std::make_unique<UsbFfsConnection>(
733                 std::move(control), std::move(bulk_out), std::move(bulk_in),
734                 std::move(destruction_notifier)));
735         register_transport(transport);
736         future.wait();
737     }
738 }
739 
usb_init()740 void usb_init() {
741     bool use_nonblocking = android::base::GetBoolProperty(
742             "persist.adb.nonblocking_ffs",
743             android::base::GetBoolProperty("ro.adb.nonblocking_ffs", true));
744 
745     if (use_nonblocking) {
746         std::thread(usb_ffs_open_thread).detach();
747     } else {
748         usb_init_legacy();
749     }
750 }
751