/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define TRACE_TAG USB

#include "sysdeps.h"

#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>

#include <linux/usb/functionfs.h>
#include <sys/eventfd.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

#include <asyncio/AsyncIO.h>

#include <android-base/logging.h>
#include <android-base/macros.h>
#include <android-base/properties.h>
#include <android-base/thread_annotations.h>

#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "daemon/usb_ffs.h"
#include "sysdeps/chrono.h"
#include "transport.h"
#include "types.h"

using android::base::StringPrintf;

// Not all USB controllers support operations larger than 16k, so don't go above that.
// Also, each submitted operation does an allocation in the kernel of that size, so we want to
// minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
60 static constexpr size_t kUsbReadQueueDepth = 8; 61 static constexpr size_t kUsbReadSize = 4 * PAGE_SIZE; 62 63 static constexpr size_t kUsbWriteQueueDepth = 8; 64 static constexpr size_t kUsbWriteSize = 4 * PAGE_SIZE; 65 to_string(enum usb_functionfs_event_type type)66 static const char* to_string(enum usb_functionfs_event_type type) { 67 switch (type) { 68 case FUNCTIONFS_BIND: 69 return "FUNCTIONFS_BIND"; 70 case FUNCTIONFS_UNBIND: 71 return "FUNCTIONFS_UNBIND"; 72 case FUNCTIONFS_ENABLE: 73 return "FUNCTIONFS_ENABLE"; 74 case FUNCTIONFS_DISABLE: 75 return "FUNCTIONFS_DISABLE"; 76 case FUNCTIONFS_SETUP: 77 return "FUNCTIONFS_SETUP"; 78 case FUNCTIONFS_SUSPEND: 79 return "FUNCTIONFS_SUSPEND"; 80 case FUNCTIONFS_RESUME: 81 return "FUNCTIONFS_RESUME"; 82 } 83 } 84 85 enum class TransferDirection : uint64_t { 86 READ = 0, 87 WRITE = 1, 88 }; 89 90 struct TransferId { 91 TransferDirection direction : 1; 92 uint64_t id : 63; 93 TransferIdTransferId94 TransferId() : TransferId(TransferDirection::READ, 0) {} 95 96 private: TransferIdTransferId97 TransferId(TransferDirection direction, uint64_t id) : direction(direction), id(id) {} 98 99 public: operator uint64_tTransferId100 explicit operator uint64_t() const { 101 uint64_t result; 102 static_assert(sizeof(*this) == sizeof(result)); 103 memcpy(&result, this, sizeof(*this)); 104 return result; 105 } 106 readTransferId107 static TransferId read(uint64_t id) { return TransferId(TransferDirection::READ, id); } writeTransferId108 static TransferId write(uint64_t id) { return TransferId(TransferDirection::WRITE, id); } 109 from_valueTransferId110 static TransferId from_value(uint64_t value) { 111 TransferId result; 112 memcpy(&result, &value, sizeof(value)); 113 return result; 114 } 115 }; 116 117 template <class Payload> 118 struct IoBlock { 119 bool pending = false; 120 struct iocb control = {}; 121 Payload payload; 122 idIoBlock123 TransferId id() const { return TransferId::from_value(control.aio_data); } 124 }; 125 126 
using IoReadBlock = IoBlock<Block>; 127 using IoWriteBlock = IoBlock<std::shared_ptr<Block>>; 128 129 struct ScopedAioContext { 130 ScopedAioContext() = default; ~ScopedAioContextScopedAioContext131 ~ScopedAioContext() { reset(); } 132 ScopedAioContextScopedAioContext133 ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); } 134 ScopedAioContext(const ScopedAioContext& copy) = delete; 135 operator =ScopedAioContext136 ScopedAioContext& operator=(ScopedAioContext&& move) { 137 reset(move.release()); 138 return *this; 139 } 140 ScopedAioContext& operator=(const ScopedAioContext& copy) = delete; 141 CreateScopedAioContext142 static ScopedAioContext Create(size_t max_events) { 143 aio_context_t ctx = 0; 144 if (io_setup(max_events, &ctx) != 0) { 145 PLOG(FATAL) << "failed to create aio_context_t"; 146 } 147 ScopedAioContext result; 148 result.reset(ctx); 149 return result; 150 } 151 releaseScopedAioContext152 aio_context_t release() { 153 aio_context_t result = context_; 154 context_ = 0; 155 return result; 156 } 157 resetScopedAioContext158 void reset(aio_context_t new_context = 0) { 159 if (context_ != 0) { 160 io_destroy(context_); 161 } 162 163 context_ = new_context; 164 } 165 getScopedAioContext166 aio_context_t get() { return context_; } 167 168 private: 169 aio_context_t context_ = 0; 170 }; 171 172 struct UsbFfsConnection : public Connection { UsbFfsConnectionUsbFfsConnection173 UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write, 174 std::promise<void> destruction_notifier) 175 : worker_started_(false), 176 stopped_(false), 177 destruction_notifier_(std::move(destruction_notifier)), 178 control_fd_(std::move(control)), 179 read_fd_(std::move(read)), 180 write_fd_(std::move(write)) { 181 LOG(INFO) << "UsbFfsConnection constructed"; 182 worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC)); 183 if (worker_event_fd_ == -1) { 184 PLOG(FATAL) << "failed to create eventfd"; 185 } 186 187 monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC)); 188 
if (monitor_event_fd_ == -1) { 189 PLOG(FATAL) << "failed to create eventfd"; 190 } 191 192 aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth); 193 } 194 ~UsbFfsConnectionUsbFfsConnection195 ~UsbFfsConnection() { 196 LOG(INFO) << "UsbFfsConnection being destroyed"; 197 Stop(); 198 monitor_thread_.join(); 199 200 // We need to explicitly close our file descriptors before we notify our destruction, 201 // because the thread listening on the future will immediately try to reopen the endpoint. 202 aio_context_.reset(); 203 control_fd_.reset(); 204 read_fd_.reset(); 205 write_fd_.reset(); 206 207 destruction_notifier_.set_value(); 208 } 209 WriteUsbFfsConnection210 virtual bool Write(std::unique_ptr<apacket> packet) override final { 211 LOG(DEBUG) << "USB write: " << dump_header(&packet->msg); 212 auto header = std::make_shared<Block>(sizeof(packet->msg)); 213 memcpy(header->data(), &packet->msg, sizeof(packet->msg)); 214 215 std::lock_guard<std::mutex> lock(write_mutex_); 216 write_requests_.push_back( 217 CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++)); 218 if (!packet->payload.empty()) { 219 // The kernel attempts to allocate a contiguous block of memory for each write, 220 // which can fail if the write is large and the kernel heap is fragmented. 221 // Split large writes into smaller chunks to avoid this. 222 auto payload = std::make_shared<Block>(std::move(packet->payload)); 223 size_t offset = 0; 224 size_t len = payload->size(); 225 226 while (len > 0) { 227 size_t write_size = std::min(kUsbWriteSize, len); 228 write_requests_.push_back( 229 CreateWriteBlock(payload, offset, write_size, next_write_id_++)); 230 len -= write_size; 231 offset += write_size; 232 } 233 } 234 235 // Wake up the worker thread to submit writes. 
236 uint64_t notify = 1; 237 ssize_t rc = adb_write(worker_event_fd_.get(), ¬ify, sizeof(notify)); 238 if (rc < 0) { 239 PLOG(FATAL) << "failed to notify worker eventfd to submit writes"; 240 } 241 242 return true; 243 } 244 StartUsbFfsConnection245 virtual void Start() override final { StartMonitor(); } 246 StopUsbFfsConnection247 virtual void Stop() override final { 248 if (stopped_.exchange(true)) { 249 return; 250 } 251 stopped_ = true; 252 uint64_t notify = 1; 253 ssize_t rc = adb_write(worker_event_fd_.get(), ¬ify, sizeof(notify)); 254 if (rc < 0) { 255 PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection"; 256 } 257 CHECK_EQ(static_cast<size_t>(rc), sizeof(notify)); 258 259 rc = adb_write(monitor_event_fd_.get(), ¬ify, sizeof(notify)); 260 if (rc < 0) { 261 PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection"; 262 } 263 264 CHECK_EQ(static_cast<size_t>(rc), sizeof(notify)); 265 } 266 DoTlsHandshakeUsbFfsConnection267 virtual bool DoTlsHandshake(RSA* key, std::string* auth_key) override final { 268 // TODO: support TLS for usb connections. 269 LOG(FATAL) << "Not supported yet."; 270 return false; 271 } 272 273 private: StartMonitorUsbFfsConnection274 void StartMonitor() { 275 // This is a bit of a mess. 276 // It's possible for io_submit to end up blocking, if we call it as the endpoint 277 // becomes disabled. Work around this by having a monitor thread to listen for functionfs 278 // lifecycle events. If we notice an error condition (either we've become disabled, or we 279 // were never enabled in the first place), we send interruption signals to the worker thread 280 // until it dies, and then report failure to the transport via HandleError, which will 281 // eventually result in the transport being destroyed, which will result in UsbFfsConnection 282 // being destroyed, which unblocks the open thread and restarts this entire process. 
283 static std::once_flag handler_once; 284 std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); }); 285 286 monitor_thread_ = std::thread([this]() { 287 adb_thread_setname("UsbFfs-monitor"); 288 LOG(INFO) << "UsbFfs-monitor thread spawned"; 289 290 bool bound = false; 291 bool enabled = false; 292 bool running = true; 293 while (running) { 294 adb_pollfd pfd[2] = { 295 { .fd = control_fd_.get(), .events = POLLIN, .revents = 0 }, 296 { .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 }, 297 }; 298 299 // If we don't see our first bind within a second, try again. 300 int timeout_ms = bound ? -1 : 1000; 301 302 int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms)); 303 if (rc == -1) { 304 PLOG(FATAL) << "poll on USB control fd failed"; 305 } else if (rc == 0) { 306 LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again"; 307 break; 308 } 309 310 if (pfd[1].revents) { 311 // We were told to die. 312 break; 313 } 314 315 struct usb_functionfs_event event; 316 rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event))); 317 if (rc == -1) { 318 PLOG(FATAL) << "failed to read functionfs event"; 319 } else if (rc == 0) { 320 LOG(WARNING) << "hit EOF on functionfs control fd"; 321 break; 322 } else if (rc != sizeof(event)) { 323 LOG(FATAL) << "read functionfs event of unexpected size, expected " 324 << sizeof(event) << ", got " << rc; 325 } 326 327 LOG(INFO) << "USB event: " 328 << to_string(static_cast<usb_functionfs_event_type>(event.type)); 329 330 switch (event.type) { 331 case FUNCTIONFS_BIND: 332 if (bound) { 333 LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?"; 334 running = false; 335 break; 336 } 337 338 if (enabled) { 339 LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?"; 340 running = false; 341 break; 342 } 343 344 bound = true; 345 break; 346 347 case FUNCTIONFS_ENABLE: 348 if (!bound) { 349 LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?"; 
350 running = false; 351 break; 352 } 353 354 if (enabled) { 355 LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?"; 356 running = false; 357 break; 358 } 359 360 enabled = true; 361 StartWorker(); 362 break; 363 364 case FUNCTIONFS_DISABLE: 365 if (!bound) { 366 LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?"; 367 } 368 369 if (!enabled) { 370 LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?"; 371 } 372 373 enabled = false; 374 running = false; 375 break; 376 377 case FUNCTIONFS_UNBIND: 378 if (enabled) { 379 LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?"; 380 } 381 382 if (!bound) { 383 LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?"; 384 } 385 386 bound = false; 387 running = false; 388 break; 389 390 case FUNCTIONFS_SETUP: { 391 LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = " 392 << static_cast<int>(event.u.setup.bRequestType) 393 << ", bRequest = " << static_cast<int>(event.u.setup.bRequest) 394 << ", wValue = " << static_cast<int>(event.u.setup.wValue) 395 << ", wIndex = " << static_cast<int>(event.u.setup.wIndex) 396 << ", wLength = " << static_cast<int>(event.u.setup.wLength); 397 398 if ((event.u.setup.bRequestType & USB_DIR_IN)) { 399 LOG(INFO) << "acking device-to-host control transfer"; 400 ssize_t rc = adb_write(control_fd_.get(), "", 0); 401 if (rc != 0) { 402 PLOG(ERROR) << "failed to write empty packet to host"; 403 break; 404 } 405 } else { 406 std::string buf; 407 buf.resize(event.u.setup.wLength + 1); 408 409 ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size()); 410 if (rc != event.u.setup.wLength) { 411 LOG(ERROR) 412 << "read " << rc 413 << " bytes when trying to read control request, expected " 414 << event.u.setup.wLength; 415 } 416 417 LOG(INFO) << "control request contents: " << buf; 418 break; 419 } 420 } 421 } 422 } 423 424 StopWorker(); 425 HandleError("monitor thread finished"); 426 }); 427 } 428 
StartWorkerUsbFfsConnection429 void StartWorker() { 430 CHECK(!worker_started_); 431 worker_started_ = true; 432 worker_thread_ = std::thread([this]() { 433 adb_thread_setname("UsbFfs-worker"); 434 LOG(INFO) << "UsbFfs-worker thread spawned"; 435 436 for (size_t i = 0; i < kUsbReadQueueDepth; ++i) { 437 read_requests_[i] = CreateReadBlock(next_read_id_++); 438 if (!SubmitRead(&read_requests_[i])) { 439 return; 440 } 441 } 442 443 while (!stopped_) { 444 uint64_t dummy; 445 ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy)); 446 if (rc == -1) { 447 PLOG(FATAL) << "failed to read from eventfd"; 448 } else if (rc == 0) { 449 LOG(FATAL) << "hit EOF on eventfd"; 450 } 451 452 ReadEvents(); 453 454 std::lock_guard<std::mutex> lock(write_mutex_); 455 SubmitWrites(); 456 } 457 }); 458 } 459 StopWorkerUsbFfsConnection460 void StopWorker() { 461 if (!worker_started_) { 462 return; 463 } 464 465 pthread_t worker_thread_handle = worker_thread_.native_handle(); 466 while (true) { 467 int rc = pthread_kill(worker_thread_handle, kInterruptionSignal); 468 if (rc != 0) { 469 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc); 470 break; 471 } 472 473 std::this_thread::sleep_for(100ms); 474 475 rc = pthread_kill(worker_thread_handle, 0); 476 if (rc == 0) { 477 continue; 478 } else if (rc == ESRCH) { 479 break; 480 } else { 481 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc); 482 } 483 } 484 485 worker_thread_.join(); 486 } 487 PrepareReadBlockUsbFfsConnection488 void PrepareReadBlock(IoReadBlock* block, uint64_t id) { 489 block->pending = false; 490 if (block->payload.capacity() >= kUsbReadSize) { 491 block->payload.resize(kUsbReadSize); 492 } else { 493 block->payload = Block(kUsbReadSize); 494 } 495 block->control.aio_data = static_cast<uint64_t>(TransferId::read(id)); 496 block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data()); 497 block->control.aio_nbytes = block->payload.size(); 
498 } 499 CreateReadBlockUsbFfsConnection500 IoReadBlock CreateReadBlock(uint64_t id) { 501 IoReadBlock block; 502 PrepareReadBlock(&block, id); 503 block.control.aio_rw_flags = 0; 504 block.control.aio_lio_opcode = IOCB_CMD_PREAD; 505 block.control.aio_reqprio = 0; 506 block.control.aio_fildes = read_fd_.get(); 507 block.control.aio_offset = 0; 508 block.control.aio_flags = IOCB_FLAG_RESFD; 509 block.control.aio_resfd = worker_event_fd_.get(); 510 return block; 511 } 512 ReadEventsUsbFfsConnection513 void ReadEvents() { 514 static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth; 515 struct io_event events[kMaxEvents]; 516 struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0}; 517 int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout); 518 if (rc == -1) { 519 HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno))); 520 return; 521 } 522 523 for (int event_idx = 0; event_idx < rc; ++event_idx) { 524 auto& event = events[event_idx]; 525 TransferId id = TransferId::from_value(event.data); 526 527 if (event.res < 0) { 528 std::string error = 529 StringPrintf("%s %" PRIu64 " failed with error %s", 530 id.direction == TransferDirection::READ ? "read" : "write", 531 id.id, strerror(-event.res)); 532 HandleError(error); 533 return; 534 } 535 536 if (id.direction == TransferDirection::READ) { 537 if (!HandleRead(id, event.res)) { 538 return; 539 } 540 } else { 541 HandleWrite(id); 542 } 543 } 544 } 545 HandleReadUsbFfsConnection546 bool HandleRead(TransferId id, int64_t size) { 547 uint64_t read_idx = id.id % kUsbReadQueueDepth; 548 IoReadBlock* block = &read_requests_[read_idx]; 549 block->pending = false; 550 block->payload.resize(size); 551 552 // Notification for completed reads can be received out of order. 
553 if (block->id().id != needed_read_id_) { 554 LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for " 555 << needed_read_id_; 556 return true; 557 } 558 559 for (uint64_t id = needed_read_id_;; ++id) { 560 size_t read_idx = id % kUsbReadQueueDepth; 561 IoReadBlock* current_block = &read_requests_[read_idx]; 562 if (current_block->pending) { 563 break; 564 } 565 if (!ProcessRead(current_block)) { 566 return false; 567 } 568 ++needed_read_id_; 569 } 570 571 return true; 572 } 573 ProcessReadUsbFfsConnection574 bool ProcessRead(IoReadBlock* block) { 575 if (!block->payload.empty()) { 576 if (!incoming_header_.has_value()) { 577 if (block->payload.size() != sizeof(amessage)) { 578 HandleError("received packet of unexpected length while reading header"); 579 return false; 580 } 581 amessage& msg = incoming_header_.emplace(); 582 memcpy(&msg, block->payload.data(), sizeof(msg)); 583 LOG(DEBUG) << "USB read:" << dump_header(&msg); 584 incoming_header_ = msg; 585 } else { 586 size_t bytes_left = incoming_header_->data_length - incoming_payload_.size(); 587 if (block->payload.size() > bytes_left) { 588 HandleError("received too many bytes while waiting for payload"); 589 return false; 590 } 591 incoming_payload_.append(std::move(block->payload)); 592 } 593 594 if (incoming_header_->data_length == incoming_payload_.size()) { 595 auto packet = std::make_unique<apacket>(); 596 packet->msg = *incoming_header_; 597 598 // TODO: Make apacket contain an IOVector so we don't have to coalesce. 599 packet->payload = std::move(incoming_payload_).coalesce(); 600 read_callback_(this, std::move(packet)); 601 602 incoming_header_.reset(); 603 // reuse the capacity of the incoming payload while we can. 
604 auto free_block = incoming_payload_.clear(); 605 if (block->payload.capacity() == 0) { 606 block->payload = std::move(free_block); 607 } 608 } 609 } 610 611 PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth); 612 SubmitRead(block); 613 return true; 614 } 615 SubmitReadUsbFfsConnection616 bool SubmitRead(IoReadBlock* block) { 617 block->pending = true; 618 struct iocb* iocb = &block->control; 619 if (io_submit(aio_context_.get(), 1, &iocb) != 1) { 620 HandleError(StringPrintf("failed to submit read: %s", strerror(errno))); 621 return false; 622 } 623 624 return true; 625 } 626 HandleWriteUsbFfsConnection627 void HandleWrite(TransferId id) { 628 std::lock_guard<std::mutex> lock(write_mutex_); 629 auto it = 630 std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) { 631 return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id); 632 }); 633 CHECK(it != write_requests_.end()); 634 635 write_requests_.erase(it); 636 size_t outstanding_writes = --writes_submitted_; 637 LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes; 638 } 639 CreateWriteBlockUsbFfsConnection640 IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len, 641 uint64_t id) { 642 auto block = IoWriteBlock(); 643 block.payload = std::move(payload); 644 block.control.aio_data = static_cast<uint64_t>(TransferId::write(id)); 645 block.control.aio_rw_flags = 0; 646 block.control.aio_lio_opcode = IOCB_CMD_PWRITE; 647 block.control.aio_reqprio = 0; 648 block.control.aio_fildes = write_fd_.get(); 649 block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset); 650 block.control.aio_nbytes = len; 651 block.control.aio_offset = 0; 652 block.control.aio_flags = IOCB_FLAG_RESFD; 653 block.control.aio_resfd = worker_event_fd_.get(); 654 return block; 655 } 656 CreateWriteBlockUsbFfsConnection657 IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) { 658 size_t len = payload.size(); 659 return 
CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id); 660 } 661 SubmitWritesUsbFfsConnection662 void SubmitWrites() REQUIRES(write_mutex_) { 663 if (writes_submitted_ == kUsbWriteQueueDepth) { 664 return; 665 } 666 667 ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_, 668 write_requests_.size() - writes_submitted_); 669 CHECK_GE(writes_to_submit, 0); 670 if (writes_to_submit == 0) { 671 return; 672 } 673 674 struct iocb* iocbs[kUsbWriteQueueDepth]; 675 for (int i = 0; i < writes_to_submit; ++i) { 676 CHECK(!write_requests_[writes_submitted_ + i].pending); 677 write_requests_[writes_submitted_ + i].pending = true; 678 iocbs[i] = &write_requests_[writes_submitted_ + i].control; 679 LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]); 680 } 681 682 writes_submitted_ += writes_to_submit; 683 684 int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs); 685 if (rc == -1) { 686 HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno))); 687 return; 688 } else if (rc != writes_to_submit) { 689 LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit 690 << ", actually submitted " << rc; 691 } 692 } 693 HandleErrorUsbFfsConnection694 void HandleError(const std::string& error) { 695 std::call_once(error_flag_, [&]() { 696 error_callback_(this, error); 697 if (!stopped_) { 698 Stop(); 699 } 700 }); 701 } 702 703 std::thread monitor_thread_; 704 705 bool worker_started_; 706 std::thread worker_thread_; 707 708 std::atomic<bool> stopped_; 709 std::promise<void> destruction_notifier_; 710 std::once_flag error_flag_; 711 712 unique_fd worker_event_fd_; 713 unique_fd monitor_event_fd_; 714 715 ScopedAioContext aio_context_; 716 unique_fd control_fd_; 717 unique_fd read_fd_; 718 unique_fd write_fd_; 719 720 std::optional<amessage> incoming_header_; 721 IOVector incoming_payload_; 722 723 std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_; 724 
IOVector read_data_; 725 726 // ID of the next request that we're going to send out. 727 size_t next_read_id_ = 0; 728 729 // ID of the next packet we're waiting for. 730 size_t needed_read_id_ = 0; 731 732 std::mutex write_mutex_; 733 std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_); 734 size_t next_write_id_ GUARDED_BY(write_mutex_) = 0; 735 size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0; 736 737 static constexpr int kInterruptionSignal = SIGUSR1; 738 }; 739 usb_ffs_open_thread()740 static void usb_ffs_open_thread() { 741 adb_thread_setname("usb ffs open"); 742 743 while (true) { 744 unique_fd control; 745 unique_fd bulk_out; 746 unique_fd bulk_in; 747 if (!open_functionfs(&control, &bulk_out, &bulk_in)) { 748 std::this_thread::sleep_for(1s); 749 continue; 750 } 751 752 atransport* transport = new atransport(); 753 transport->serial = "UsbFfs"; 754 std::promise<void> destruction_notifier; 755 std::future<void> future = destruction_notifier.get_future(); 756 transport->SetConnection(std::make_unique<UsbFfsConnection>( 757 std::move(control), std::move(bulk_out), std::move(bulk_in), 758 std::move(destruction_notifier))); 759 register_transport(transport); 760 future.wait(); 761 } 762 } 763 usb_init()764 void usb_init() { 765 std::thread(usb_ffs_open_thread).detach(); 766 } 767