/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define TRACE_TAG USB

#include "sysdeps.h"

#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>

#include <linux/usb/functionfs.h>
#include <sys/eventfd.h>

#include <algorithm>
#include <array>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>

#include <asyncio/AsyncIO.h>

#include <android-base/logging.h>
#include <android-base/macros.h>
#include <android-base/properties.h>
#include <android-base/thread_annotations.h>

#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "daemon/usb_ffs.h"
#include "sysdeps/chrono.h"
#include "transport.h"
#include "types.h"

using android::base::StringPrintf;

// Not all USB controllers support operations larger than 16k, so don't go above that.
// Also, each submitted operation does an allocation in the kernel of that size, so we want to
// minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
static constexpr size_t kUsbReadQueueDepth = 8;
static constexpr size_t kUsbReadSize = 4 * PAGE_SIZE;

static constexpr size_t kUsbWriteQueueDepth = 8;
static constexpr size_t kUsbWriteSize = 4 * PAGE_SIZE;
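// With 4 KiB pages each submission is 16 KiB, matching the 16k limit mentioned above; at a queue
// depth of 8, each direction therefore pins at most 128 KiB of kernel buffer memory while its
// queue is full.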

static const char* to_string(enum usb_functionfs_event_type type) {
    switch (type) {
        case FUNCTIONFS_BIND:
            return "FUNCTIONFS_BIND";
        case FUNCTIONFS_UNBIND:
            return "FUNCTIONFS_UNBIND";
        case FUNCTIONFS_ENABLE:
            return "FUNCTIONFS_ENABLE";
        case FUNCTIONFS_DISABLE:
            return "FUNCTIONFS_DISABLE";
        case FUNCTIONFS_SETUP:
            return "FUNCTIONFS_SETUP";
        case FUNCTIONFS_SUSPEND:
            return "FUNCTIONFS_SUSPEND";
        case FUNCTIONFS_RESUME:
            return "FUNCTIONFS_RESUME";
    }
}

enum class TransferDirection : uint64_t {
    READ = 0,
    WRITE = 1,
};

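// TransferId packs the transfer direction and a 63-bit sequence number into the 64-bit aio_data
// field of each iocb, so completions reported by io_getevents() can be matched back to the read
// or write request that produced them.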
struct TransferId {
    TransferDirection direction : 1;
    uint64_t id : 63;

    TransferId() : TransferId(TransferDirection::READ, 0) {}

  private:
    TransferId(TransferDirection direction, uint64_t id) : direction(direction), id(id) {}

  public:
    explicit operator uint64_t() const {
        uint64_t result;
        static_assert(sizeof(*this) == sizeof(result));
        memcpy(&result, this, sizeof(*this));
        return result;
    }

    static TransferId read(uint64_t id) { return TransferId(TransferDirection::READ, id); }
    static TransferId write(uint64_t id) { return TransferId(TransferDirection::WRITE, id); }

    static TransferId from_value(uint64_t value) {
        TransferId result;
        memcpy(&result, &value, sizeof(value));
        return result;
    }
};

template <class Payload>
struct IoBlock {
    bool pending = false;
    struct iocb control = {};
    Payload payload;

    TransferId id() const { return TransferId::from_value(control.aio_data); }
};

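// Reads own their buffer outright; writes hold their payload via shared_ptr because one large
// payload gets split into multiple write requests that all point into the same underlying Block.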
using IoReadBlock = IoBlock<Block>;
using IoWriteBlock = IoBlock<std::shared_ptr<Block>>;

struct ScopedAioContext {
    ScopedAioContext() = default;
    ~ScopedAioContext() { reset(); }

    ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
    ScopedAioContext(const ScopedAioContext& copy) = delete;

    ScopedAioContext& operator=(ScopedAioContext&& move) {
        reset(move.release());
        return *this;
    }
    ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;

    static ScopedAioContext Create(size_t max_events) {
        aio_context_t ctx = 0;
        if (io_setup(max_events, &ctx) != 0) {
            PLOG(FATAL) << "failed to create aio_context_t";
        }
        ScopedAioContext result;
        result.reset(ctx);
        return result;
    }

    aio_context_t release() {
        aio_context_t result = context_;
        context_ = 0;
        return result;
    }

    void reset(aio_context_t new_context = 0) {
        if (context_ != 0) {
            io_destroy(context_);
        }

        context_ = new_context;
    }

    aio_context_t get() { return context_; }

  private:
    aio_context_t context_ = 0;
};

struct UsbFfsConnection : public Connection {
    UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
                     std::promise<void> destruction_notifier)
        : worker_started_(false),
          stopped_(false),
          destruction_notifier_(std::move(destruction_notifier)),
          control_fd_(std::move(control)),
          read_fd_(std::move(read)),
          write_fd_(std::move(write)) {
        LOG(INFO) << "UsbFfsConnection constructed";
        worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
        if (worker_event_fd_ == -1) {
            PLOG(FATAL) << "failed to create eventfd";
        }

        monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
        if (monitor_event_fd_ == -1) {
            PLOG(FATAL) << "failed to create eventfd";
        }

        aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
    }

    ~UsbFfsConnection() {
        LOG(INFO) << "UsbFfsConnection being destroyed";
        Stop();
        monitor_thread_.join();

        // We need to explicitly close our file descriptors before we notify our destruction,
        // because the thread listening on the future will immediately try to reopen the endpoint.
        aio_context_.reset();
        control_fd_.reset();
        read_fd_.reset();
        write_fd_.reset();

        destruction_notifier_.set_value();
    }

    virtual bool Write(std::unique_ptr<apacket> packet) override final {
        LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
        auto header = std::make_shared<Block>(sizeof(packet->msg));
        memcpy(header->data(), &packet->msg, sizeof(packet->msg));

        std::lock_guard<std::mutex> lock(write_mutex_);
        write_requests_.push_back(
                CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++));
        if (!packet->payload.empty()) {
            // The kernel attempts to allocate a contiguous block of memory for each write,
            // which can fail if the write is large and the kernel heap is fragmented.
            // Split large writes into smaller chunks to avoid this.
            auto payload = std::make_shared<Block>(std::move(packet->payload));
            size_t offset = 0;
            size_t len = payload->size();

            while (len > 0) {
                size_t write_size = std::min(kUsbWriteSize, len);
                write_requests_.push_back(
                        CreateWriteBlock(payload, offset, write_size, next_write_id_++));
                len -= write_size;
                offset += write_size;
            }
        }

        // Wake up the worker thread to submit writes.
        uint64_t notify = 1;
        ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify worker eventfd to submit writes";
        }

        return true;
    }

    virtual void Start() override final { StartMonitor(); }

    virtual void Stop() override final {
        if (stopped_.exchange(true)) {
            return;
        }
        stopped_ = true;
        uint64_t notify = 1;
        ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection";
        }
        CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));

        rc = adb_write(monitor_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection";
        }

        CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
    }

    virtual bool DoTlsHandshake(RSA* key, std::string* auth_key) override final {
        // TODO: support TLS for usb connections.
        LOG(FATAL) << "Not supported yet.";
        return false;
    }

  private:
    void StartMonitor() {
        // This is a bit of a mess.
        // It's possible for io_submit to end up blocking, if we call it as the endpoint
        // becomes disabled. Work around this by having a monitor thread to listen for functionfs
        // lifecycle events. If we notice an error condition (either we've become disabled, or we
        // were never enabled in the first place), we send interruption signals to the worker thread
        // until it dies, and then report failure to the transport via HandleError, which will
        // eventually result in the transport being destroyed, which will result in UsbFfsConnection
        // being destroyed, which unblocks the open thread and restarts this entire process.
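        //
        // The signal handler installed below is deliberately a no-op: SIGUSR1's default
        // disposition would terminate the process, and all StopWorker() needs is a way to deliver
        // kInterruptionSignal to the worker thread without any other side effect.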
        static std::once_flag handler_once;
        std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });

        monitor_thread_ = std::thread([this]() {
            adb_thread_setname("UsbFfs-monitor");
            LOG(INFO) << "UsbFfs-monitor thread spawned";

            bool bound = false;
            bool enabled = false;
            bool running = true;
            while (running) {
                adb_pollfd pfd[2] = {
                  { .fd = control_fd_.get(), .events = POLLIN, .revents = 0 },
                  { .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 },
                };

                // If we don't see our first bind within a second, try again.
                int timeout_ms = bound ? -1 : 1000;

                int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms));
                if (rc == -1) {
                    PLOG(FATAL) << "poll on USB control fd failed";
                } else if (rc == 0) {
                    LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again";
                    break;
                }

                if (pfd[1].revents) {
                    // We were told to die.
                    break;
                }

                struct usb_functionfs_event event;
                rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event)));
                if (rc == -1) {
                    PLOG(FATAL) << "failed to read functionfs event";
                } else if (rc == 0) {
                    LOG(WARNING) << "hit EOF on functionfs control fd";
                    break;
                } else if (rc != sizeof(event)) {
                    LOG(FATAL) << "read functionfs event of unexpected size, expected "
                               << sizeof(event) << ", got " << rc;
                }

                LOG(INFO) << "USB event: "
                          << to_string(static_cast<usb_functionfs_event_type>(event.type));

                switch (event.type) {
                    case FUNCTIONFS_BIND:
                        if (bound) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?";
                            running = false;
                            break;
                        }

                        bound = true;
                        break;

                    case FUNCTIONFS_ENABLE:
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?";
                            running = false;
                            break;
                        }

                        enabled = true;
                        StartWorker();
                        break;

                    case FUNCTIONFS_DISABLE:
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?";
                        }

                        if (!enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?";
                        }

                        enabled = false;
                        running = false;
                        break;

                    case FUNCTIONFS_UNBIND:
                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?";
                        }

                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?";
                        }

                        bound = false;
                        running = false;
                        break;

                    case FUNCTIONFS_SETUP: {
                        LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = "
                                  << static_cast<int>(event.u.setup.bRequestType)
                                  << ", bRequest = " << static_cast<int>(event.u.setup.bRequest)
                                  << ", wValue = " << static_cast<int>(event.u.setup.wValue)
                                  << ", wIndex = " << static_cast<int>(event.u.setup.wIndex)
                                  << ", wLength = " << static_cast<int>(event.u.setup.wLength);

                        if ((event.u.setup.bRequestType & USB_DIR_IN)) {
                            LOG(INFO) << "acking device-to-host control transfer";
                            ssize_t rc = adb_write(control_fd_.get(), "", 0);
                            if (rc != 0) {
                                PLOG(ERROR) << "failed to write empty packet to host";
                                break;
                            }
                        } else {
                            std::string buf;
                            buf.resize(event.u.setup.wLength + 1);

                            ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size());
                            if (rc != event.u.setup.wLength) {
                                LOG(ERROR)
                                        << "read " << rc
                                        << " bytes when trying to read control request, expected "
                                        << event.u.setup.wLength;
                            }

                            LOG(INFO) << "control request contents: " << buf;
                            break;
                        }
                    }
                }
            }

            StopWorker();
            HandleError("monitor thread finished");
        });
    }

    void StartWorker() {
        CHECK(!worker_started_);
        worker_started_ = true;
        worker_thread_ = std::thread([this]() {
            adb_thread_setname("UsbFfs-worker");
            LOG(INFO) << "UsbFfs-worker thread spawned";

            for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
                read_requests_[i] = CreateReadBlock(next_read_id_++);
                if (!SubmitRead(&read_requests_[i])) {
                    return;
                }
            }

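            // Block until worker_event_fd_ is signalled. The eventfd is written by Write() when
            // new requests are queued, by Stop(), and by the kernel (IOCB_FLAG_RESFD) when a
            // submitted AIO operation completes, so one wait covers every wake-up source.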
            while (!stopped_) {
                uint64_t dummy;
                ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy));
                if (rc == -1) {
                    PLOG(FATAL) << "failed to read from eventfd";
                } else if (rc == 0) {
                    LOG(FATAL) << "hit EOF on eventfd";
                }

                ReadEvents();

                std::lock_guard<std::mutex> lock(write_mutex_);
                SubmitWrites();
            }
        });
    }

    void StopWorker() {
        if (!worker_started_) {
            return;
        }

        pthread_t worker_thread_handle = worker_thread_.native_handle();
        while (true) {
            int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
            if (rc != 0) {
                LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
                break;
            }

            std::this_thread::sleep_for(100ms);

            rc = pthread_kill(worker_thread_handle, 0);
            if (rc == 0) {
                continue;
            } else if (rc == ESRCH) {
                break;
            } else {
                LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
            }
        }

        worker_thread_.join();
    }

    void PrepareReadBlock(IoReadBlock* block, uint64_t id) {
        block->pending = false;
        if (block->payload.capacity() >= kUsbReadSize) {
            block->payload.resize(kUsbReadSize);
        } else {
            block->payload = Block(kUsbReadSize);
        }
        block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
        block->control.aio_nbytes = block->payload.size();
    }

    IoReadBlock CreateReadBlock(uint64_t id) {
        IoReadBlock block;
        PrepareReadBlock(&block, id);
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PREAD;
        block.control.aio_reqprio = 0;
        block.control.aio_fildes = read_fd_.get();
        block.control.aio_offset = 0;
        block.control.aio_flags = IOCB_FLAG_RESFD;
        block.control.aio_resfd = worker_event_fd_.get();
        return block;
    }

    void ReadEvents() {
        static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
        struct io_event events[kMaxEvents];
        struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
        int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
        if (rc == -1) {
            HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
            return;
        }

        for (int event_idx = 0; event_idx < rc; ++event_idx) {
            auto& event = events[event_idx];
            TransferId id = TransferId::from_value(event.data);

            if (event.res < 0) {
                std::string error =
                        StringPrintf("%s %" PRIu64 " failed with error %s",
                                     id.direction == TransferDirection::READ ? "read" : "write",
                                     id.id, strerror(-event.res));
                HandleError(error);
                return;
            }

            if (id.direction == TransferDirection::READ) {
                if (!HandleRead(id, event.res)) {
                    return;
                }
            } else {
                HandleWrite(id);
            }
        }
    }

    bool HandleRead(TransferId id, int64_t size) {
        uint64_t read_idx = id.id % kUsbReadQueueDepth;
        IoReadBlock* block = &read_requests_[read_idx];
        block->pending = false;
        block->payload.resize(size);

        // Notification for completed reads can be received out of order.
        if (block->id().id != needed_read_id_) {
            LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
                         << needed_read_id_;
            return true;
        }

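        // Drain every contiguous completed read starting at needed_read_id_, in order, stopping
        // at the first block that is still pending.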
        for (uint64_t id = needed_read_id_;; ++id) {
            size_t read_idx = id % kUsbReadQueueDepth;
            IoReadBlock* current_block = &read_requests_[read_idx];
            if (current_block->pending) {
                break;
            }
            if (!ProcessRead(current_block)) {
                return false;
            }
            ++needed_read_id_;
        }

        return true;
    }

    bool ProcessRead(IoReadBlock* block) {
        if (!block->payload.empty()) {
            if (!incoming_header_.has_value()) {
                if (block->payload.size() != sizeof(amessage)) {
                    HandleError("received packet of unexpected length while reading header");
                    return false;
                }
                amessage& msg = incoming_header_.emplace();
                memcpy(&msg, block->payload.data(), sizeof(msg));
                LOG(DEBUG) << "USB read:" << dump_header(&msg);
                incoming_header_ = msg;
            } else {
                size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
                if (block->payload.size() > bytes_left) {
                    HandleError("received too many bytes while waiting for payload");
                    return false;
                }
                incoming_payload_.append(std::move(block->payload));
            }

            if (incoming_header_->data_length == incoming_payload_.size()) {
                auto packet = std::make_unique<apacket>();
                packet->msg = *incoming_header_;

                // TODO: Make apacket contain an IOVector so we don't have to coalesce.
                packet->payload = std::move(incoming_payload_).coalesce();
                read_callback_(this, std::move(packet));

                incoming_header_.reset();
                // reuse the capacity of the incoming payload while we can.
                auto free_block = incoming_payload_.clear();
                if (block->payload.capacity() == 0) {
                    block->payload = std::move(free_block);
                }
            }
        }

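        // Resubmit this block with an id one full queue depth ahead of its old one: ids stay
        // unique, and each id keeps mapping to the same slot in read_requests_
        // (id % kUsbReadQueueDepth).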
        PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
        SubmitRead(block);
        return true;
    }

    bool SubmitRead(IoReadBlock* block) {
        block->pending = true;
        struct iocb* iocb = &block->control;
        if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
            HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
            return false;
        }

        return true;
    }

    void HandleWrite(TransferId id) {
        std::lock_guard<std::mutex> lock(write_mutex_);
        auto it =
                std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
                    return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id);
                });
        CHECK(it != write_requests_.end());

        write_requests_.erase(it);
        size_t outstanding_writes = --writes_submitted_;
        LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
    }

    IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len,
                                  uint64_t id) {
        auto block = IoWriteBlock();
        block.payload = std::move(payload);
        block.control.aio_data = static_cast<uint64_t>(TransferId::write(id));
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PWRITE;
        block.control.aio_reqprio = 0;
        block.control.aio_fildes = write_fd_.get();
        block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset);
        block.control.aio_nbytes = len;
        block.control.aio_offset = 0;
        block.control.aio_flags = IOCB_FLAG_RESFD;
        block.control.aio_resfd = worker_event_fd_.get();
        return block;
    }

    IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) {
        size_t len = payload.size();
        return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id);
    }

    void SubmitWrites() REQUIRES(write_mutex_) {
        if (writes_submitted_ == kUsbWriteQueueDepth) {
            return;
        }

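        // The first writes_submitted_ entries of write_requests_ are already in flight; submit
        // from there, capped so that at most kUsbWriteQueueDepth writes are pending at once.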
        ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
                                            write_requests_.size() - writes_submitted_);
        CHECK_GE(writes_to_submit, 0);
        if (writes_to_submit == 0) {
            return;
        }

        struct iocb* iocbs[kUsbWriteQueueDepth];
        for (int i = 0; i < writes_to_submit; ++i) {
            CHECK(!write_requests_[writes_submitted_ + i].pending);
            write_requests_[writes_submitted_ + i].pending = true;
            iocbs[i] = &write_requests_[writes_submitted_ + i].control;
            LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
        }

        writes_submitted_ += writes_to_submit;

        int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
        if (rc == -1) {
            HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
            return;
        } else if (rc != writes_to_submit) {
            LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
                       << ", actually submitted " << rc;
        }
    }

    void HandleError(const std::string& error) {
        std::call_once(error_flag_, [&]() {
            error_callback_(this, error);
            if (!stopped_) {
                Stop();
            }
        });
    }

    std::thread monitor_thread_;

    bool worker_started_;
    std::thread worker_thread_;

    std::atomic<bool> stopped_;
    std::promise<void> destruction_notifier_;
    std::once_flag error_flag_;

    unique_fd worker_event_fd_;
    unique_fd monitor_event_fd_;

    ScopedAioContext aio_context_;
    unique_fd control_fd_;
    unique_fd read_fd_;
    unique_fd write_fd_;

    std::optional<amessage> incoming_header_;
    IOVector incoming_payload_;

    std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_;
    IOVector read_data_;

    // ID of the next request that we're going to send out.
    size_t next_read_id_ = 0;

    // ID of the next packet we're waiting for.
    size_t needed_read_id_ = 0;

    std::mutex write_mutex_;
    std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_);
    size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
    size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;

    static constexpr int kInterruptionSignal = SIGUSR1;
};

static void usb_ffs_open_thread() {
    adb_thread_setname("usb ffs open");

    while (true) {
        unique_fd control;
        unique_fd bulk_out;
        unique_fd bulk_in;
        if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
            std::this_thread::sleep_for(1s);
            continue;
        }

        atransport* transport = new atransport();
        transport->serial = "UsbFfs";
        std::promise<void> destruction_notifier;
        std::future<void> future = destruction_notifier.get_future();
        transport->SetConnection(std::make_unique<UsbFfsConnection>(
                std::move(control), std::move(bulk_out), std::move(bulk_in),
                std::move(destruction_notifier)));
        register_transport(transport);
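        // The connection fulfills destruction_notifier from its destructor, so this wait returns
        // once the transport has torn the connection down (e.g. after a USB disable/unbind),
        // letting the loop reopen the functionfs endpoints.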
        future.wait();
    }
}

void usb_init() {
    std::thread(usb_ffs_open_thread).detach();
}